Add CloudWatch logs subscription filters (#2982)
* Add logs.describe_subscription_filters * Add logs.put_subscription_filter * Add logs.delete_subscription_filter * Change to usage of ACCOUNT_ID
This commit is contained in:
parent
1e0a7380d5
commit
e73a694219
5 changed files with 585 additions and 4 deletions
|
|
@ -5,6 +5,8 @@ import time
|
|||
from collections import defaultdict
|
||||
import copy
|
||||
import datetime
|
||||
from gzip import GzipFile
|
||||
|
||||
import docker
|
||||
import docker.errors
|
||||
import hashlib
|
||||
|
|
@ -983,6 +985,28 @@ class LambdaBackend(BaseBackend):
|
|||
func = self._lambdas.get_arn(function_arn)
|
||||
return func.invoke(json.dumps(event), {}, {})
|
||||
|
||||
def send_log_event(
|
||||
self, function_arn, filter_name, log_group_name, log_stream_name, log_events
|
||||
):
|
||||
data = {
|
||||
"messageType": "DATA_MESSAGE",
|
||||
"owner": ACCOUNT_ID,
|
||||
"logGroup": log_group_name,
|
||||
"logStream": log_stream_name,
|
||||
"subscriptionFilters": [filter_name],
|
||||
"logEvents": log_events,
|
||||
}
|
||||
|
||||
output = io.BytesIO()
|
||||
with GzipFile(fileobj=output, mode="w") as f:
|
||||
f.write(json.dumps(data, separators=(",", ":")).encode("utf-8"))
|
||||
payload_gz_encoded = base64.b64encode(output.getvalue()).decode("utf-8")
|
||||
|
||||
event = {"awslogs": {"data": payload_gz_encoded}}
|
||||
|
||||
func = self._lambdas.get_arn(function_arn)
|
||||
return func.invoke(json.dumps(event), {}, {})
|
||||
|
||||
def list_tags(self, resource):
|
||||
return self.get_function_by_arn(resource).tags
|
||||
|
||||
|
|
|
|||
|
|
@ -7,10 +7,10 @@ class LogsClientError(JsonRESTError):
|
|||
|
||||
|
||||
class ResourceNotFoundException(LogsClientError):
|
||||
def __init__(self):
|
||||
def __init__(self, msg=None):
|
||||
self.code = 400
|
||||
super(ResourceNotFoundException, self).__init__(
|
||||
"ResourceNotFoundException", "The specified resource does not exist"
|
||||
"ResourceNotFoundException", msg or "The specified log group does not exist"
|
||||
)
|
||||
|
||||
|
||||
|
|
@ -28,3 +28,11 @@ class ResourceAlreadyExistsException(LogsClientError):
|
|||
super(ResourceAlreadyExistsException, self).__init__(
|
||||
"ResourceAlreadyExistsException", "The specified log group already exists"
|
||||
)
|
||||
|
||||
|
||||
class LimitExceededException(LogsClientError):
|
||||
def __init__(self):
|
||||
self.code = 400
|
||||
super(LimitExceededException, self).__init__(
|
||||
"LimitExceededException", "Resource limit exceeded."
|
||||
)
|
||||
|
|
|
|||
|
|
@ -6,6 +6,7 @@ from .exceptions import (
|
|||
ResourceNotFoundException,
|
||||
ResourceAlreadyExistsException,
|
||||
InvalidParameterException,
|
||||
LimitExceededException,
|
||||
)
|
||||
|
||||
|
||||
|
|
@ -57,6 +58,8 @@ class LogStream:
|
|||
0 # I'm guessing this is token needed for sequenceToken by put_events
|
||||
)
|
||||
self.events = []
|
||||
self.destination_arn = None
|
||||
self.filter_name = None
|
||||
|
||||
self.__class__._log_ids += 1
|
||||
|
||||
|
|
@ -97,11 +100,32 @@ class LogStream:
|
|||
self.lastIngestionTime = int(unix_time_millis())
|
||||
# TODO: make this match AWS if possible
|
||||
self.storedBytes += sum([len(log_event["message"]) for log_event in log_events])
|
||||
self.events += [
|
||||
events = [
|
||||
LogEvent(self.lastIngestionTime, log_event) for log_event in log_events
|
||||
]
|
||||
self.events += events
|
||||
self.uploadSequenceToken += 1
|
||||
|
||||
if self.destination_arn and self.destination_arn.split(":")[2] == "lambda":
|
||||
from moto.awslambda import lambda_backends # due to circular dependency
|
||||
|
||||
lambda_log_events = [
|
||||
{
|
||||
"id": event.eventId,
|
||||
"timestamp": event.timestamp,
|
||||
"message": event.message,
|
||||
}
|
||||
for event in events
|
||||
]
|
||||
|
||||
lambda_backends[self.region].send_log_event(
|
||||
self.destination_arn,
|
||||
self.filter_name,
|
||||
log_group_name,
|
||||
log_stream_name,
|
||||
lambda_log_events,
|
||||
)
|
||||
|
||||
return "{:056d}".format(self.uploadSequenceToken)
|
||||
|
||||
def get_log_events(
|
||||
|
|
@ -227,6 +251,7 @@ class LogGroup:
|
|||
self.retention_in_days = kwargs.get(
|
||||
"RetentionInDays"
|
||||
) # AWS defaults to Never Expire for log group retention
|
||||
self.subscription_filters = []
|
||||
|
||||
def create_log_stream(self, log_stream_name):
|
||||
if log_stream_name in self.streams:
|
||||
|
|
@ -386,6 +411,48 @@ class LogGroup:
|
|||
k: v for (k, v) in self.tags.items() if k not in tags_to_remove
|
||||
}
|
||||
|
||||
def describe_subscription_filters(self):
|
||||
return self.subscription_filters
|
||||
|
||||
def put_subscription_filter(
|
||||
self, filter_name, filter_pattern, destination_arn, role_arn
|
||||
):
|
||||
creation_time = int(unix_time_millis())
|
||||
|
||||
# only one subscription filter can be associated with a log group
|
||||
if self.subscription_filters:
|
||||
if self.subscription_filters[0]["filterName"] == filter_name:
|
||||
creation_time = self.subscription_filters[0]["creationTime"]
|
||||
else:
|
||||
raise LimitExceededException
|
||||
|
||||
for stream in self.streams.values():
|
||||
stream.destination_arn = destination_arn
|
||||
stream.filter_name = filter_name
|
||||
|
||||
self.subscription_filters = [
|
||||
{
|
||||
"filterName": filter_name,
|
||||
"logGroupName": self.name,
|
||||
"filterPattern": filter_pattern,
|
||||
"destinationArn": destination_arn,
|
||||
"roleArn": role_arn,
|
||||
"distribution": "ByLogStream",
|
||||
"creationTime": creation_time,
|
||||
}
|
||||
]
|
||||
|
||||
def delete_subscription_filter(self, filter_name):
|
||||
if (
|
||||
not self.subscription_filters
|
||||
or self.subscription_filters[0]["filterName"] != filter_name
|
||||
):
|
||||
raise ResourceNotFoundException(
|
||||
"The specified subscription filter does not exist."
|
||||
)
|
||||
|
||||
self.subscription_filters = []
|
||||
|
||||
|
||||
class LogsBackend(BaseBackend):
|
||||
def __init__(self, region_name):
|
||||
|
|
@ -557,6 +624,46 @@ class LogsBackend(BaseBackend):
|
|||
log_group = self.groups[log_group_name]
|
||||
log_group.untag(tags)
|
||||
|
||||
def describe_subscription_filters(self, log_group_name):
|
||||
log_group = self.groups.get(log_group_name)
|
||||
|
||||
if not log_group:
|
||||
raise ResourceNotFoundException()
|
||||
|
||||
return log_group.describe_subscription_filters()
|
||||
|
||||
def put_subscription_filter(
|
||||
self, log_group_name, filter_name, filter_pattern, destination_arn, role_arn
|
||||
):
|
||||
# TODO: support other destinations like Kinesis stream
|
||||
from moto.awslambda import lambda_backends # due to circular dependency
|
||||
|
||||
log_group = self.groups.get(log_group_name)
|
||||
|
||||
if not log_group:
|
||||
raise ResourceNotFoundException()
|
||||
|
||||
lambda_func = lambda_backends[self.region_name].get_function(destination_arn)
|
||||
|
||||
# no specific permission check implemented
|
||||
if not lambda_func:
|
||||
raise InvalidParameterException(
|
||||
"Could not execute the lambda function. "
|
||||
"Make sure you have given CloudWatch Logs permission to execute your function."
|
||||
)
|
||||
|
||||
log_group.put_subscription_filter(
|
||||
filter_name, filter_pattern, destination_arn, role_arn
|
||||
)
|
||||
|
||||
def delete_subscription_filter(self, log_group_name, filter_name):
|
||||
log_group = self.groups.get(log_group_name)
|
||||
|
||||
if not log_group:
|
||||
raise ResourceNotFoundException()
|
||||
|
||||
log_group.delete_subscription_filter(filter_name)
|
||||
|
||||
|
||||
logs_backends = {}
|
||||
for region in Session().get_available_regions("logs"):
|
||||
|
|
|
|||
|
|
@ -178,3 +178,33 @@ class LogsResponse(BaseResponse):
|
|||
tags = self._get_param("tags")
|
||||
self.logs_backend.untag_log_group(log_group_name, tags)
|
||||
return ""
|
||||
|
||||
def describe_subscription_filters(self):
|
||||
log_group_name = self._get_param("logGroupName")
|
||||
|
||||
subscription_filters = self.logs_backend.describe_subscription_filters(
|
||||
log_group_name
|
||||
)
|
||||
|
||||
return json.dumps({"subscriptionFilters": subscription_filters})
|
||||
|
||||
def put_subscription_filter(self):
|
||||
log_group_name = self._get_param("logGroupName")
|
||||
filter_name = self._get_param("filterName")
|
||||
filter_pattern = self._get_param("filterPattern")
|
||||
destination_arn = self._get_param("destinationArn")
|
||||
role_arn = self._get_param("roleArn")
|
||||
|
||||
self.logs_backend.put_subscription_filter(
|
||||
log_group_name, filter_name, filter_pattern, destination_arn, role_arn
|
||||
)
|
||||
|
||||
return ""
|
||||
|
||||
def delete_subscription_filter(self):
|
||||
log_group_name = self._get_param("logGroupName")
|
||||
filter_name = self._get_param("filterName")
|
||||
|
||||
self.logs_backend.delete_subscription_filter(log_group_name, filter_name)
|
||||
|
||||
return ""
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue