lambda + SNS enhancements (#1048)
* updates - support lambda messages from SNS - run lambda in docker container * decode output * populate timeout * simplify * whoops * skeletons of cloudwatchlogs * impl filter log streams * fix logging * PEP fixes * PEP fixes * fix reset * fix reset * add new endpoint * fix region name * add docker * try to fix tests * try to fix travis issue with boto * fix escaping in urls * fix environment variables * fix PEP * more pep * switch back to precise * another fix attempt * fix typo * fix lambda invoke * fix more unittests * work on getting this to work in new scheme * fix py2 * fix error * fix tests when running in server mode * more lambda fixes * try running with latest docker adapted from aiodocker * switch to docker python client * pep fixes * switch to docker volume * fix unittest * fix invoke from sns * fix zip2tar * add hack impl for get_function with zip * try fix * fix for py < 3.6 * add volume refcount * try to fix travis * docker test * fix yaml * try fix * update endpoints * fix * another attempt * try again * fix recursive import * refactor fix * revert changes with better fix * more reverts * wait for service to come up * add back detached mode * sleep and add another exception type * put this back for logging * put back with note * whoops :) * docker in docker! * fix invalid url * hopefully last fix! * fix lambda regions * fix protocol * travis!!!! * just run lambda test for now * use one print * fix escaping * another attempt * yet another * re-enable all tests * fixes * fix for py2 * revert change * fix for py2.7 * fix output ordering * remove this given there's a new unittest that covers it * changes based on review - add skeleton logs test file - switch to docker image that matches test env - fix mock_logs import * add readme entry
This commit is contained in:
parent
ca8ce8705b
commit
9008b85299
21 changed files with 836 additions and 167 deletions
5
moto/logs/__init__.py
Normal file
5
moto/logs/__init__.py
Normal file
|
|
@ -0,0 +1,5 @@
|
|||
from .models import logs_backends
|
||||
from ..core.models import base_decorator, deprecated_base_decorator
|
||||
|
||||
mock_logs = base_decorator(logs_backends)
|
||||
mock_logs_deprecated = deprecated_base_decorator(logs_backends)
|
||||
228
moto/logs/models.py
Normal file
228
moto/logs/models.py
Normal file
|
|
@ -0,0 +1,228 @@
|
|||
from moto.core import BaseBackend
|
||||
import boto.logs
|
||||
from moto.core.utils import unix_time_millis
|
||||
|
||||
|
||||
class LogEvent:
|
||||
_event_id = 0
|
||||
|
||||
def __init__(self, ingestion_time, log_event):
|
||||
self.ingestionTime = ingestion_time
|
||||
self.timestamp = log_event["timestamp"]
|
||||
self.message = log_event['message']
|
||||
self.eventId = self.__class__._event_id
|
||||
self.__class__._event_id += 1
|
||||
|
||||
def to_filter_dict(self):
|
||||
return {
|
||||
"eventId": self.eventId,
|
||||
"ingestionTime": self.ingestionTime,
|
||||
# "logStreamName":
|
||||
"message": self.message,
|
||||
"timestamp": self.timestamp
|
||||
}
|
||||
|
||||
|
||||
class LogStream:
|
||||
_log_ids = 0
|
||||
|
||||
def __init__(self, region, log_group, name):
|
||||
self.region = region
|
||||
self.arn = "arn:aws:logs:{region}:{id}:log-group:{log_group}:log-stream:{log_stream}".format(
|
||||
region=region, id=self.__class__._log_ids, log_group=log_group, log_stream=name)
|
||||
self.creationTime = unix_time_millis()
|
||||
self.firstEventTimestamp = None
|
||||
self.lastEventTimestamp = None
|
||||
self.lastIngestionTime = None
|
||||
self.logStreamName = name
|
||||
self.storedBytes = 0
|
||||
self.uploadSequenceToken = 0 # I'm guessing this is token needed for sequenceToken by put_events
|
||||
self.events = []
|
||||
|
||||
self.__class__._log_ids += 1
|
||||
|
||||
def to_describe_dict(self):
|
||||
return {
|
||||
"arn": self.arn,
|
||||
"creationTime": self.creationTime,
|
||||
"firstEventTimestamp": self.firstEventTimestamp,
|
||||
"lastEventTimestamp": self.lastEventTimestamp,
|
||||
"lastIngestionTime": self.lastIngestionTime,
|
||||
"logStreamName": self.logStreamName,
|
||||
"storedBytes": self.storedBytes,
|
||||
"uploadSequenceToken": str(self.uploadSequenceToken),
|
||||
}
|
||||
|
||||
def put_log_events(self, log_group_name, log_stream_name, log_events, sequence_token):
|
||||
# TODO: ensure sequence_token
|
||||
# TODO: to be thread safe this would need a lock
|
||||
self.lastIngestionTime = unix_time_millis()
|
||||
# TODO: make this match AWS if possible
|
||||
self.storedBytes += sum([len(log_event["message"]) for log_event in log_events])
|
||||
self.events += [LogEvent(self.lastIngestionTime, log_event) for log_event in log_events]
|
||||
self.uploadSequenceToken += 1
|
||||
|
||||
return self.uploadSequenceToken
|
||||
|
||||
def get_log_events(self, log_group_name, log_stream_name, start_time, end_time, limit, next_token, start_from_head):
|
||||
def filter_func(event):
|
||||
if start_time and event.timestamp < start_time:
|
||||
return False
|
||||
|
||||
if end_time and event.timestamp > end_time:
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
events = sorted(filter(filter_func, self.events), key=lambda event: event.timestamp, reverse=start_from_head)
|
||||
back_token = next_token
|
||||
if next_token is None:
|
||||
next_token = 0
|
||||
|
||||
events_page = events[next_token: next_token + limit]
|
||||
next_token += limit
|
||||
if next_token >= len(self.events):
|
||||
next_token = None
|
||||
|
||||
return events_page, back_token, next_token
|
||||
|
||||
def filter_log_events(self, log_group_name, log_stream_names, start_time, end_time, limit, next_token, filter_pattern, interleaved):
|
||||
def filter_func(event):
|
||||
if start_time and event.timestamp < start_time:
|
||||
return False
|
||||
|
||||
if end_time and event.timestamp > end_time:
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
events = []
|
||||
for event in sorted(filter(filter_func, self.events), key=lambda x: x.timestamp):
|
||||
event_obj = event.to_filter_dict()
|
||||
event_obj['logStreamName'] = self.logStreamName
|
||||
events.append(event_obj)
|
||||
return events
|
||||
|
||||
|
||||
class LogGroup:
|
||||
def __init__(self, region, name, tags):
|
||||
self.name = name
|
||||
self.region = region
|
||||
self.tags = tags
|
||||
self.streams = dict() # {name: LogStream}
|
||||
|
||||
def create_log_stream(self, log_stream_name):
|
||||
assert log_stream_name not in self.streams
|
||||
self.streams[log_stream_name] = LogStream(self.region, self.name, log_stream_name)
|
||||
|
||||
def delete_log_stream(self, log_stream_name):
|
||||
assert log_stream_name in self.streams
|
||||
del self.streams[log_stream_name]
|
||||
|
||||
def describe_log_streams(self, descending, limit, log_group_name, log_stream_name_prefix, next_token, order_by):
|
||||
log_streams = [stream.to_describe_dict() for name, stream in self.streams.items() if name.startswith(log_stream_name_prefix)]
|
||||
|
||||
def sorter(stream):
|
||||
return stream.name if order_by == 'logStreamName' else stream.lastEventTimestamp
|
||||
|
||||
if next_token is None:
|
||||
next_token = 0
|
||||
|
||||
log_streams = sorted(log_streams, key=sorter, reverse=descending)
|
||||
new_token = next_token + limit
|
||||
log_streams_page = log_streams[next_token: new_token]
|
||||
if new_token >= len(log_streams):
|
||||
new_token = None
|
||||
|
||||
return log_streams_page, new_token
|
||||
|
||||
def put_log_events(self, log_group_name, log_stream_name, log_events, sequence_token):
|
||||
assert log_stream_name in self.streams
|
||||
stream = self.streams[log_stream_name]
|
||||
return stream.put_log_events(log_group_name, log_stream_name, log_events, sequence_token)
|
||||
|
||||
def get_log_events(self, log_group_name, log_stream_name, start_time, end_time, limit, next_token, start_from_head):
|
||||
assert log_stream_name in self.streams
|
||||
stream = self.streams[log_stream_name]
|
||||
return stream.get_log_events(log_group_name, log_stream_name, start_time, end_time, limit, next_token, start_from_head)
|
||||
|
||||
def filter_log_events(self, log_group_name, log_stream_names, start_time, end_time, limit, next_token, filter_pattern, interleaved):
|
||||
assert not filter_pattern # TODO: impl
|
||||
|
||||
streams = [stream for name, stream in self.streams.items() if not log_stream_names or name in log_stream_names]
|
||||
|
||||
events = []
|
||||
for stream in streams:
|
||||
events += stream.filter_log_events(log_group_name, log_stream_names, start_time, end_time, limit, next_token, filter_pattern, interleaved)
|
||||
|
||||
if interleaved:
|
||||
events = sorted(events, key=lambda event: event.timestamp)
|
||||
|
||||
if next_token is None:
|
||||
next_token = 0
|
||||
|
||||
events_page = events[next_token: next_token + limit]
|
||||
next_token += limit
|
||||
if next_token >= len(events):
|
||||
next_token = None
|
||||
|
||||
searched_streams = [{"logStreamName": stream.logStreamName, "searchedCompletely": True} for stream in streams]
|
||||
return events_page, next_token, searched_streams
|
||||
|
||||
|
||||
class LogsBackend(BaseBackend):
|
||||
def __init__(self, region_name):
|
||||
self.region_name = region_name
|
||||
self.groups = dict() # { logGroupName: LogGroup}
|
||||
|
||||
def reset(self):
|
||||
region_name = self.region_name
|
||||
self.__dict__ = {}
|
||||
self.__init__(region_name)
|
||||
|
||||
def create_log_group(self, log_group_name, tags):
|
||||
assert log_group_name not in self.groups
|
||||
self.groups[log_group_name] = LogGroup(self.region_name, log_group_name, tags)
|
||||
|
||||
def ensure_log_group(self, log_group_name, tags):
|
||||
if log_group_name in self.groups:
|
||||
return
|
||||
self.groups[log_group_name] = LogGroup(self.region_name, log_group_name, tags)
|
||||
|
||||
def delete_log_group(self, log_group_name):
|
||||
assert log_group_name in self.groups
|
||||
del self.groups[log_group_name]
|
||||
|
||||
def create_log_stream(self, log_group_name, log_stream_name):
|
||||
assert log_group_name in self.groups
|
||||
log_group = self.groups[log_group_name]
|
||||
return log_group.create_log_stream(log_stream_name)
|
||||
|
||||
def delete_log_stream(self, log_group_name, log_stream_name):
|
||||
assert log_group_name in self.groups
|
||||
log_group = self.groups[log_group_name]
|
||||
return log_group.delete_log_stream(log_stream_name)
|
||||
|
||||
def describe_log_streams(self, descending, limit, log_group_name, log_stream_name_prefix, next_token, order_by):
|
||||
assert log_group_name in self.groups
|
||||
log_group = self.groups[log_group_name]
|
||||
return log_group.describe_log_streams(descending, limit, log_group_name, log_stream_name_prefix, next_token, order_by)
|
||||
|
||||
def put_log_events(self, log_group_name, log_stream_name, log_events, sequence_token):
|
||||
# TODO: add support for sequence_tokens
|
||||
assert log_group_name in self.groups
|
||||
log_group = self.groups[log_group_name]
|
||||
return log_group.put_log_events(log_group_name, log_stream_name, log_events, sequence_token)
|
||||
|
||||
def get_log_events(self, log_group_name, log_stream_name, start_time, end_time, limit, next_token, start_from_head):
|
||||
assert log_group_name in self.groups
|
||||
log_group = self.groups[log_group_name]
|
||||
return log_group.get_log_events(log_group_name, log_stream_name, start_time, end_time, limit, next_token, start_from_head)
|
||||
|
||||
def filter_log_events(self, log_group_name, log_stream_names, start_time, end_time, limit, next_token, filter_pattern, interleaved):
|
||||
assert log_group_name in self.groups
|
||||
log_group = self.groups[log_group_name]
|
||||
return log_group.filter_log_events(log_group_name, log_stream_names, start_time, end_time, limit, next_token, filter_pattern, interleaved)
|
||||
|
||||
|
||||
logs_backends = {region.name: LogsBackend(region.name) for region in boto.logs.regions()}
|
||||
114
moto/logs/responses.py
Normal file
114
moto/logs/responses.py
Normal file
|
|
@ -0,0 +1,114 @@
|
|||
from moto.core.responses import BaseResponse
|
||||
from .models import logs_backends
|
||||
import json
|
||||
|
||||
|
||||
# See http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/Welcome.html
|
||||
|
||||
class LogsResponse(BaseResponse):
|
||||
@property
|
||||
def logs_backend(self):
|
||||
return logs_backends[self.region]
|
||||
|
||||
@property
|
||||
def request_params(self):
|
||||
try:
|
||||
return json.loads(self.body)
|
||||
except ValueError:
|
||||
return {}
|
||||
|
||||
def _get_param(self, param, if_none=None):
|
||||
return self.request_params.get(param, if_none)
|
||||
|
||||
def create_log_group(self):
|
||||
log_group_name = self._get_param('logGroupName')
|
||||
tags = self._get_param('tags')
|
||||
assert 1 <= len(log_group_name) <= 512 # TODO: assert pattern
|
||||
|
||||
self.logs_backend.create_log_group(log_group_name, tags)
|
||||
return ''
|
||||
|
||||
def delete_log_group(self):
|
||||
log_group_name = self._get_param('logGroupName')
|
||||
self.logs_backend.delete_log_group(log_group_name)
|
||||
return ''
|
||||
|
||||
def create_log_stream(self):
|
||||
log_group_name = self._get_param('logGroupName')
|
||||
log_stream_name = self._get_param('logStreamName')
|
||||
self.logs_backend.create_log_stream(log_group_name, log_stream_name)
|
||||
return ''
|
||||
|
||||
def delete_log_stream(self):
|
||||
log_group_name = self._get_param('logGroupName')
|
||||
log_stream_name = self._get_param('logStreamName')
|
||||
self.logs_backend.delete_log_stream(log_group_name, log_stream_name)
|
||||
return ''
|
||||
|
||||
def describe_log_streams(self):
|
||||
log_group_name = self._get_param('logGroupName')
|
||||
log_stream_name_prefix = self._get_param('logStreamNamePrefix')
|
||||
descending = self._get_param('descending', False)
|
||||
limit = self._get_param('limit', 50)
|
||||
assert limit <= 50
|
||||
next_token = self._get_param('nextToken')
|
||||
order_by = self._get_param('orderBy', 'LogStreamName')
|
||||
assert order_by in {'LogStreamName', 'LastEventTime'}
|
||||
|
||||
if order_by == 'LastEventTime':
|
||||
assert not log_stream_name_prefix
|
||||
|
||||
streams, next_token = self.logs_backend.describe_log_streams(
|
||||
descending, limit, log_group_name, log_stream_name_prefix,
|
||||
next_token, order_by)
|
||||
return json.dumps({
|
||||
"logStreams": streams,
|
||||
"nextToken": next_token
|
||||
})
|
||||
|
||||
def put_log_events(self):
|
||||
log_group_name = self._get_param('logGroupName')
|
||||
log_stream_name = self._get_param('logStreamName')
|
||||
log_events = self._get_param('logEvents')
|
||||
sequence_token = self._get_param('sequenceToken')
|
||||
|
||||
next_sequence_token = self.logs_backend.put_log_events(log_group_name, log_stream_name, log_events, sequence_token)
|
||||
return json.dumps({'nextSequenceToken': next_sequence_token})
|
||||
|
||||
def get_log_events(self):
|
||||
log_group_name = self._get_param('logGroupName')
|
||||
log_stream_name = self._get_param('logStreamName')
|
||||
start_time = self._get_param('startTime')
|
||||
end_time = self._get_param("endTime")
|
||||
limit = self._get_param('limit', 10000)
|
||||
assert limit <= 10000
|
||||
next_token = self._get_param('nextToken')
|
||||
start_from_head = self._get_param('startFromHead')
|
||||
|
||||
events, next_backward_token, next_foward_token = \
|
||||
self.logs_backend.get_log_events(log_group_name, log_stream_name, start_time, end_time, limit, next_token, start_from_head)
|
||||
|
||||
return json.dumps({
|
||||
"events": events,
|
||||
"nextBackwardToken": next_backward_token,
|
||||
"nextForwardToken": next_foward_token
|
||||
})
|
||||
|
||||
def filter_log_events(self):
|
||||
log_group_name = self._get_param('logGroupName')
|
||||
log_stream_names = self._get_param('logStreamNames', [])
|
||||
start_time = self._get_param('startTime')
|
||||
# impl, see: http://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/FilterAndPatternSyntax.html
|
||||
filter_pattern = self._get_param('filterPattern')
|
||||
interleaved = self._get_param('interleaved', False)
|
||||
end_time = self._get_param("endTime")
|
||||
limit = self._get_param('limit', 10000)
|
||||
assert limit <= 10000
|
||||
next_token = self._get_param('nextToken')
|
||||
|
||||
events, next_token, searched_streams = self.logs_backend.filter_log_events(log_group_name, log_stream_names, start_time, end_time, limit, next_token, filter_pattern, interleaved)
|
||||
return json.dumps({
|
||||
"events": events,
|
||||
"nextToken": next_token,
|
||||
"searchedLogStreams": searched_streams
|
||||
})
|
||||
9
moto/logs/urls.py
Normal file
9
moto/logs/urls.py
Normal file
|
|
@ -0,0 +1,9 @@
|
|||
from .responses import LogsResponse
|
||||
|
||||
url_bases = [
|
||||
"https?://logs.(.+).amazonaws.com",
|
||||
]
|
||||
|
||||
url_paths = {
|
||||
'{0}/$': LogsResponse.dispatch,
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue