Add EventBridge Archive CRUD endpoints (#3617)

* Add events.create_archive

* Add events.describe_archive

* Add events.list_archives

* Add events.update_archive

* Add events.delete_archive

* Add actual archive functionality

* Fix test

* Fix PR issues
This commit is contained in:
Anton Grübel 2021-01-28 11:47:53 +01:00 committed by GitHub
commit 199da2220b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 985 additions and 1 deletions

View file

@ -2,6 +2,15 @@ from __future__ import unicode_literals
from moto.core.exceptions import JsonRESTError
class InvalidEventPatternException(JsonRESTError):
code = 400
def __init__(self):
super(InvalidEventPatternException, self).__init__(
"InvalidEventPatternException", "Event pattern is not valid."
)
class ResourceNotFoundException(JsonRESTError):
code = 400
@ -11,6 +20,15 @@ class ResourceNotFoundException(JsonRESTError):
)
class ResourceAlreadyExistsException(JsonRESTError):
code = 400
def __init__(self, message):
super(ResourceAlreadyExistsException, self).__init__(
"ResourceAlreadyExistsException", message
)
class ValidationException(JsonRESTError):
code = 400

View file

@ -1,11 +1,21 @@
import copy
import os
import re
import json
import sys
from datetime import datetime
from boto3 import Session
from moto.core.exceptions import JsonRESTError
from moto.core import ACCOUNT_ID, BaseBackend, CloudFormationModel
from moto.events.exceptions import ValidationException, ResourceNotFoundException
from moto.core.utils import unix_time
from moto.events.exceptions import (
ValidationException,
ResourceNotFoundException,
ResourceAlreadyExistsException,
InvalidEventPatternException,
)
from moto.utilities.tagging_service import TaggingService
from uuid import uuid4
@ -200,6 +210,146 @@ class EventBus(CloudFormationModel):
event_backend.delete_event_bus(event_bus_name)
class Archive(CloudFormationModel):
# https://docs.aws.amazon.com/eventbridge/latest/APIReference/API_ListArchives.html#API_ListArchives_RequestParameters
VALID_STATES = [
"ENABLED",
"DISABLED",
"CREATING",
"UPDATING",
"CREATE_FAILED",
"UPDATE_FAILED",
]
def __init__(
self, region_name, name, source_arn, description, event_pattern, retention
):
self.region = region_name
self.name = name
self.source_arn = source_arn
self.description = description
self.event_pattern = event_pattern
self.retention = retention if retention else 0
self.creation_time = unix_time(datetime.utcnow())
self.state = "ENABLED"
self.events = []
self.event_bus_name = source_arn.split("/")[-1]
@property
def arn(self):
return "arn:aws:events:{region}:{account_id}:archive/{name}".format(
region=self.region, account_id=ACCOUNT_ID, name=self.name
)
def describe_short(self):
return {
"ArchiveName": self.name,
"EventSourceArn": self.source_arn,
"State": self.state,
"RetentionDays": self.retention,
"SizeBytes": sys.getsizeof(self.events) if len(self.events) > 0 else 0,
"EventCount": len(self.events),
"CreationTime": self.creation_time,
}
def describe(self):
result = {
"ArchiveArn": self.arn,
"Description": self.description,
"EventPattern": self.event_pattern,
}
result.update(self.describe_short())
return result
def update(self, description, event_pattern, retention):
if description:
self.description = description
if event_pattern:
self.event_pattern = event_pattern
if retention:
self.retention = retention
def delete(self, region_name):
event_backend = events_backends[region_name]
event_backend.archives.pop(self.name)
def matches_pattern(self, event):
if not self.event_pattern:
return True
# only works on the first level of the event dict
# logic for nested dicts needs to be implemented
for pattern_key, pattern_value in json.loads(self.event_pattern).items():
event_value = event.get(pattern_key)
if event_value not in pattern_value:
return False
def get_cfn_attribute(self, attribute_name):
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException
if attribute_name == "ArchiveName":
return self.name
elif attribute_name == "Arn":
return self.arn
raise UnformattedGetAttTemplateException()
@staticmethod
def cloudformation_name_type():
return "ArchiveName"
@staticmethod
def cloudformation_type():
# https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-events-archive.html
return "AWS::Events::Archive"
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
):
properties = cloudformation_json["Properties"]
event_backend = events_backends[region_name]
source_arn = properties.get("SourceArn")
description = properties.get("Description")
event_pattern = properties.get("EventPattern")
retention = properties.get("RetentionDays")
return event_backend.create_archive(
resource_name, source_arn, description, event_pattern, retention
)
@classmethod
def update_from_cloudformation_json(
cls, original_resource, new_resource_name, cloudformation_json, region_name
):
if new_resource_name == original_resource.name:
properties = cloudformation_json["Properties"]
original_resource.update(
properties.get("Description"),
properties.get("EventPattern"),
properties.get("Retention"),
)
return original_resource
else:
original_resource.delete(region_name)
return cls.create_from_cloudformation_json(
new_resource_name, cloudformation_json, region_name
)
@classmethod
def delete_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
):
event_backend = events_backends[region_name]
event_backend.delete_archive(resource_name)
class EventsBackend(BaseBackend):
ACCOUNT_ID = re.compile(r"^(\d{1,12}|\*)$")
STATEMENT_ID = re.compile(r"^[a-zA-Z0-9-_]{1,64}$")
@ -213,6 +363,7 @@ class EventsBackend(BaseBackend):
self.region_name = region_name
self.event_buses = {}
self.event_sources = {}
self.archives = {}
self.tagger = TaggingService()
self._add_default_event_bus()
@ -404,6 +555,22 @@ class EventsBackend(BaseBackend):
entries.append({"EventId": str(uuid4())})
# add to correct archive
# if 'EventBusName' is not espically set, it will stored in the default
event_bus_name = event.get("EventBusName", "default")
archives = [
archive
for archive in self.archives.values()
if archive.event_bus_name == event_bus_name
]
for archive in archives:
event_copy = copy.deepcopy(event)
event_copy.pop("EventBusName", None)
if archive.matches_pattern(event):
archive.events.append(event_copy)
# We dont really need to store the events yet
return entries
@ -544,6 +711,118 @@ class EventsBackend(BaseBackend):
"Rule {0} does not exist on EventBus default.".format(name)
)
def create_archive(self, name, source_arn, description, event_pattern, retention):
if len(name) > 48:
raise ValidationException(
" 1 validation error detected: "
"Value '{}' at 'archiveName' failed to satisfy constraint: "
"Member must have length less than or equal to 48".format(name)
)
if event_pattern:
self._validate_event_pattern(event_pattern)
event_bus_name = source_arn.split("/")[-1]
if event_bus_name not in self.event_buses:
raise ResourceNotFoundException(
"Event bus {} does not exist.".format(event_bus_name)
)
if name in self.archives:
raise ResourceAlreadyExistsException(
"Archive {} already exists.".format(name)
)
archive = Archive(
self.region_name, name, source_arn, description, event_pattern, retention
)
self.archives[name] = archive
return archive
def _validate_event_pattern(self, pattern):
try:
json_pattern = json.loads(pattern)
except ValueError: # json.JSONDecodeError exists since Python 3.5
raise InvalidEventPatternException
if not self._is_event_value_an_array(json_pattern):
raise InvalidEventPatternException
def _is_event_value_an_array(self, pattern):
# the values of a key in the event pattern have to be either a dict or an array
for value in pattern.values():
if isinstance(value, dict):
if not self._is_event_value_an_array(value):
return False
elif not isinstance(value, list):
return False
return True
def describe_archive(self, name):
archive = self.archives.get(name)
if not archive:
raise ResourceNotFoundException("Archive {} does not exist.".format(name))
return archive.describe()
def list_archives(self, name_prefix, source_arn, state):
if [name_prefix, source_arn, state].count(None) < 2:
raise ValidationException(
"At most one filter is allowed for ListArchives. "
"Use either : State, EventSourceArn, or NamePrefix."
)
if state and state not in Archive.VALID_STATES:
raise ValidationException(
"1 validation error detected: Value '{0}' at 'state' failed to satisfy constraint: "
"Member must satisfy enum value set: "
"[{1}]".format(state, ", ".join(Archive.VALID_STATES))
)
if [name_prefix, source_arn, state].count(None) == 3:
return [archive.describe_short() for archive in self.archives.values()]
result = []
for archive in self.archives.values():
if name_prefix and archive.name.startswith(name_prefix):
result.append(archive.describe_short())
elif source_arn and archive.source_arn == source_arn:
result.append(archive.describe_short())
elif state and archive.state == state:
result.append(archive.describe_short())
return result
def update_archive(self, name, description, event_pattern, retention):
archive = self.archives.get(name)
if not archive:
raise ResourceNotFoundException("Archive {} does not exist.".format(name))
if event_pattern:
self._validate_event_pattern(event_pattern)
archive.update(description, event_pattern, retention)
return {
"ArchiveArn": archive.arn,
"CreationTime": archive.creation_time,
"State": archive.state,
}
def delete_archive(self, name):
archive = self.archives.get(name)
if not archive:
raise ResourceNotFoundException("Archive {} does not exist.".format(name))
archive.delete(self.region_name)
events_backends = {}
for region in Session().get_available_regions("events"):

View file

@ -332,3 +332,60 @@ class EventsHandler(BaseResponse):
result = self.events_backend.untag_resource(arn, tags)
return json.dumps(result), self.response_headers
def create_archive(self):
name = self._get_param("ArchiveName")
source_arn = self._get_param("EventSourceArn")
description = self._get_param("Description")
event_pattern = self._get_param("EventPattern")
retention = self._get_param("RetentionDays")
archive = self.events_backend.create_archive(
name, source_arn, description, event_pattern, retention
)
return (
json.dumps(
{
"ArchiveArn": archive.arn,
"CreationTime": archive.creation_time,
"State": archive.state,
}
),
self.response_headers,
)
def describe_archive(self):
name = self._get_param("ArchiveName")
result = self.events_backend.describe_archive(name)
return json.dumps(result), self.response_headers
def list_archives(self):
name_prefix = self._get_param("NamePrefix")
source_arn = self._get_param("EventSourceArn")
state = self._get_param("State")
result = self.events_backend.list_archives(name_prefix, source_arn, state)
return json.dumps({"Archives": result}), self.response_headers
def update_archive(self):
name = self._get_param("ArchiveName")
description = self._get_param("Description")
event_pattern = self._get_param("EventPattern")
retention = self._get_param("RetentionDays")
result = self.events_backend.update_archive(
name, description, event_pattern, retention
)
return json.dumps(result), self.response_headers
def delete_archive(self):
name = self._get_param("ArchiveName")
self.events_backend.delete_archive(name)
return "", self.response_headers