Add eventbridge replay (#3735)
* Add events.start_replay * Add events.describe_replay * Add events.list_replays * Add events.cancel_replay * implement actual replay functionality * Fix Python 2.7 issues
This commit is contained in:
parent
6da8dc0aec
commit
3c810ad152
4 changed files with 1038 additions and 49 deletions
|
|
@ -2,6 +2,13 @@ from __future__ import unicode_literals
|
|||
from moto.core.exceptions import JsonRESTError
|
||||
|
||||
|
||||
class IllegalStatusException(JsonRESTError):
|
||||
code = 400
|
||||
|
||||
def __init__(self, message):
|
||||
super(IllegalStatusException, self).__init__("IllegalStatusException", message)
|
||||
|
||||
|
||||
class InvalidEventPatternException(JsonRESTError):
|
||||
code = 400
|
||||
|
||||
|
|
|
|||
|
|
@ -3,18 +3,21 @@ import os
|
|||
import re
|
||||
import json
|
||||
import sys
|
||||
from collections import namedtuple
|
||||
from datetime import datetime
|
||||
from enum import Enum, unique
|
||||
|
||||
from boto3 import Session
|
||||
|
||||
from moto.core.exceptions import JsonRESTError
|
||||
from moto.core import ACCOUNT_ID, BaseBackend, CloudFormationModel
|
||||
from moto.core.utils import unix_time
|
||||
from moto.core import ACCOUNT_ID, BaseBackend, CloudFormationModel, BaseModel
|
||||
from moto.core.utils import unix_time, iso_8601_datetime_without_milliseconds
|
||||
from moto.events.exceptions import (
|
||||
ValidationException,
|
||||
ResourceNotFoundException,
|
||||
ResourceAlreadyExistsException,
|
||||
InvalidEventPatternException,
|
||||
IllegalStatusException,
|
||||
)
|
||||
from moto.utilities.tagging_service import TaggingService
|
||||
|
||||
|
|
@ -22,6 +25,8 @@ from uuid import uuid4
|
|||
|
||||
|
||||
class Rule(CloudFormationModel):
|
||||
Arn = namedtuple("Arn", ["service", "resource_type", "resource_id"])
|
||||
|
||||
def _generate_arn(self, name):
|
||||
return "arn:aws:events:{region_name}:111111111111:rule/{name}".format(
|
||||
region_name=self.region_name, name=name
|
||||
|
|
@ -36,7 +41,7 @@ class Rule(CloudFormationModel):
|
|||
self.state = kwargs.get("State") or "ENABLED"
|
||||
self.description = kwargs.get("Description")
|
||||
self.role_arn = kwargs.get("RoleArn")
|
||||
self.event_bus_name = kwargs.get("EventBusName", "default")
|
||||
self.event_bus_name = kwargs.get("EventBusName") or "default"
|
||||
self.targets = []
|
||||
|
||||
@property
|
||||
|
|
@ -76,6 +81,98 @@ class Rule(CloudFormationModel):
|
|||
if index is not None:
|
||||
self.targets.pop(index)
|
||||
|
||||
def send_to_targets(self, event_bus_name, event):
|
||||
if event_bus_name != self.event_bus_name:
|
||||
return
|
||||
|
||||
if not self._validate_event(event):
|
||||
return
|
||||
|
||||
# for now only CW Log groups are supported
|
||||
for target in self.targets:
|
||||
arn = self._parse_arn(target["Arn"])
|
||||
|
||||
if arn.service == "logs" and arn.resource_type == "log-group":
|
||||
self._send_to_cw_log_group(arn.resource_id, event)
|
||||
elif arn.service == "events" and not arn.resource_type:
|
||||
input_template = json.loads(target["InputTransformer"]["InputTemplate"])
|
||||
archive_arn = self._parse_arn(input_template["archive-arn"])
|
||||
|
||||
self._send_to_events_archive(archive_arn.resource_id, event)
|
||||
else:
|
||||
raise NotImplementedError("Expr not defined for {0}".format(type(self)))
|
||||
|
||||
def _validate_event(self, event):
|
||||
for field, pattern in json.loads(self.event_pattern).items():
|
||||
if not isinstance(pattern, list):
|
||||
# to keep it simple at the beginning only pattern with 1 level of depth are validated
|
||||
continue
|
||||
|
||||
if isinstance(pattern[0], dict):
|
||||
if "exists" in pattern[0]:
|
||||
if pattern[0]["exists"] and field not in event:
|
||||
return False
|
||||
elif not pattern[0]["exists"] and field in event:
|
||||
return False
|
||||
elif event.get(field) not in pattern:
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def _parse_arn(self, arn):
|
||||
# http://docs.aws.amazon.com/general/latest/gr/aws-arns-and-namespaces.html
|
||||
# this method needs probably some more fine tuning,
|
||||
# when also other targets are supported
|
||||
elements = arn.split(":", 5)
|
||||
|
||||
service = elements[2]
|
||||
resource = elements[5]
|
||||
|
||||
if ":" in resource and "/" in resource:
|
||||
if resource.index(":") < resource.index("/"):
|
||||
resource_type, resource_id = resource.split(":", 1)
|
||||
else:
|
||||
resource_type, resource_id = resource.split("/", 1)
|
||||
elif ":" in resource:
|
||||
resource_type, resource_id = resource.split(":", 1)
|
||||
elif "/" in resource:
|
||||
resource_type, resource_id = resource.split("/", 1)
|
||||
else:
|
||||
resource_type = None
|
||||
resource_id = resource
|
||||
|
||||
return self.Arn(
|
||||
service=service, resource_type=resource_type, resource_id=resource_id
|
||||
)
|
||||
|
||||
def _send_to_cw_log_group(self, name, event):
|
||||
from moto.logs import logs_backends
|
||||
|
||||
event_copy = copy.deepcopy(event)
|
||||
event_copy["time"] = iso_8601_datetime_without_milliseconds(
|
||||
datetime.utcfromtimestamp(event_copy["time"])
|
||||
)
|
||||
|
||||
log_stream_name = str(uuid4())
|
||||
log_events = [
|
||||
{
|
||||
"timestamp": unix_time(datetime.utcnow()),
|
||||
"message": json.dumps(event_copy),
|
||||
}
|
||||
]
|
||||
|
||||
logs_backends[self.region_name].create_log_stream(name, log_stream_name)
|
||||
logs_backends[self.region_name].put_log_events(
|
||||
name, log_stream_name, log_events, None
|
||||
)
|
||||
|
||||
def _send_to_events_archive(self, resource_id, event):
|
||||
archive_name, archive_uuid = resource_id.split(":")
|
||||
archive = events_backends[self.region_name].archives.get(archive_name)
|
||||
|
||||
if archive.uuid == archive_uuid:
|
||||
archive.events.append(event)
|
||||
|
||||
def get_cfn_attribute(self, attribute_name):
|
||||
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException
|
||||
|
||||
|
|
@ -233,6 +330,7 @@ class Archive(CloudFormationModel):
|
|||
|
||||
self.creation_time = unix_time(datetime.utcnow())
|
||||
self.state = "ENABLED"
|
||||
self.uuid = str(uuid4())
|
||||
|
||||
self.events = []
|
||||
self.event_bus_name = source_arn.split("/")[-1]
|
||||
|
|
@ -276,19 +374,6 @@ class Archive(CloudFormationModel):
|
|||
event_backend = events_backends[region_name]
|
||||
event_backend.archives.pop(self.name)
|
||||
|
||||
def matches_pattern(self, event):
|
||||
if not self.event_pattern:
|
||||
return True
|
||||
|
||||
# only works on the first level of the event dict
|
||||
# logic for nested dicts needs to be implemented
|
||||
for pattern_key, pattern_value in json.loads(self.event_pattern).items():
|
||||
event_value = event.get(pattern_key)
|
||||
if event_value not in pattern_value:
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def get_cfn_attribute(self, attribute_name):
|
||||
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException
|
||||
|
||||
|
|
@ -352,6 +437,82 @@ class Archive(CloudFormationModel):
|
|||
event_backend.delete_archive(resource_name)
|
||||
|
||||
|
||||
@unique
|
||||
class ReplayState(Enum):
|
||||
# https://docs.aws.amazon.com/eventbridge/latest/APIReference/API_ListReplays.html#API_ListReplays_RequestParameters
|
||||
STARTING = "STARTING"
|
||||
RUNNING = "RUNNING"
|
||||
CANCELLING = "CANCELLING"
|
||||
COMPLETED = "COMPLETED"
|
||||
CANCELLED = "CANCELLED"
|
||||
FAILED = "FAILED"
|
||||
|
||||
|
||||
class Replay(BaseModel):
|
||||
def __init__(
|
||||
self,
|
||||
region_name,
|
||||
name,
|
||||
description,
|
||||
source_arn,
|
||||
start_time,
|
||||
end_time,
|
||||
destination,
|
||||
):
|
||||
self.region = region_name
|
||||
self.name = name
|
||||
self.description = description
|
||||
self.source_arn = source_arn
|
||||
self.event_start_time = start_time
|
||||
self.event_end_time = end_time
|
||||
self.destination = destination
|
||||
|
||||
self.state = ReplayState.STARTING
|
||||
self.start_time = unix_time(datetime.utcnow())
|
||||
self.end_time = None
|
||||
|
||||
@property
|
||||
def arn(self):
|
||||
return "arn:aws:events:{region}:{account_id}:replay/{name}".format(
|
||||
region=self.region, account_id=ACCOUNT_ID, name=self.name
|
||||
)
|
||||
|
||||
def describe_short(self):
|
||||
return {
|
||||
"ReplayName": self.name,
|
||||
"EventSourceArn": self.source_arn,
|
||||
"State": self.state.value,
|
||||
"EventStartTime": self.event_start_time,
|
||||
"EventEndTime": self.event_end_time,
|
||||
"ReplayStartTime": self.start_time,
|
||||
"ReplayEndTime": self.end_time,
|
||||
}
|
||||
|
||||
def describe(self):
|
||||
result = {
|
||||
"ReplayArn": self.arn,
|
||||
"Description": self.description,
|
||||
"Destination": self.destination,
|
||||
}
|
||||
|
||||
result.update(self.describe_short())
|
||||
|
||||
return result
|
||||
|
||||
def replay_events(self, archive):
|
||||
event_bus_name = self.destination["Arn"].split("/")[-1]
|
||||
|
||||
for event in archive.events:
|
||||
for rule in events_backends[self.region].rules.values():
|
||||
rule.send_to_targets(
|
||||
event_bus_name,
|
||||
dict(event, **{"id": str(uuid4()), "replay-name": self.name}),
|
||||
)
|
||||
|
||||
self.state = ReplayState.COMPLETED
|
||||
self.end_time = unix_time(datetime.utcnow())
|
||||
|
||||
|
||||
class EventsBackend(BaseBackend):
|
||||
ACCOUNT_ID = re.compile(r"^(\d{1,12}|\*)$")
|
||||
STATEMENT_ID = re.compile(r"^[a-zA-Z0-9-_]{1,64}$")
|
||||
|
|
@ -366,6 +527,7 @@ class EventsBackend(BaseBackend):
|
|||
self.event_buses = {}
|
||||
self.event_sources = {}
|
||||
self.archives = {}
|
||||
self.replays = {}
|
||||
self.tagger = TaggingService()
|
||||
|
||||
self._add_default_event_bus()
|
||||
|
|
@ -402,6 +564,24 @@ class EventsBackend(BaseBackend):
|
|||
|
||||
return start_index, end_index, new_next_token
|
||||
|
||||
def _get_event_bus(self, name):
|
||||
event_bus_name = name.split("/")[-1]
|
||||
|
||||
event_bus = self.event_buses.get(event_bus_name)
|
||||
if not event_bus:
|
||||
raise ResourceNotFoundException(
|
||||
"Event bus {} does not exist.".format(event_bus_name)
|
||||
)
|
||||
|
||||
return event_bus
|
||||
|
||||
def _get_replay(self, name):
|
||||
replay = self.replays.get(name)
|
||||
if not replay:
|
||||
raise ResourceNotFoundException("Replay {} does not exist.".format(name))
|
||||
|
||||
return replay
|
||||
|
||||
def delete_rule(self, name):
|
||||
self.rules_order.pop(self.rules_order.index(name))
|
||||
arn = self.rules.get(name).arn
|
||||
|
|
@ -510,9 +690,7 @@ class EventsBackend(BaseBackend):
|
|||
def put_events(self, events):
|
||||
num_events = len(events)
|
||||
|
||||
if num_events < 1:
|
||||
raise JsonRESTError("ValidationError", "Need at least 1 event")
|
||||
elif num_events > 10:
|
||||
if num_events > 10:
|
||||
# the exact error text is longer, the Value list consists of all the put events
|
||||
raise ValidationException(
|
||||
"1 validation error detected: "
|
||||
|
|
@ -555,25 +733,28 @@ class EventsBackend(BaseBackend):
|
|||
)
|
||||
continue
|
||||
|
||||
entries.append({"EventId": str(uuid4())})
|
||||
event_id = str(uuid4())
|
||||
entries.append({"EventId": event_id})
|
||||
|
||||
# add to correct archive
|
||||
# if 'EventBusName' is not espically set, it will stored in the default
|
||||
# if 'EventBusName' is not especially set, it will be sent to the default one
|
||||
event_bus_name = event.get("EventBusName", "default")
|
||||
archives = [
|
||||
archive
|
||||
for archive in self.archives.values()
|
||||
if archive.event_bus_name == event_bus_name
|
||||
]
|
||||
|
||||
for archive in archives:
|
||||
event_copy = copy.deepcopy(event)
|
||||
event_copy.pop("EventBusName", None)
|
||||
for rule in self.rules.values():
|
||||
rule.send_to_targets(
|
||||
event_bus_name,
|
||||
{
|
||||
"version": "0",
|
||||
"id": event_id,
|
||||
"detail-type": event["DetailType"],
|
||||
"source": event["Source"],
|
||||
"account": ACCOUNT_ID,
|
||||
"time": event.get("Time", unix_time(datetime.utcnow())),
|
||||
"region": self.region_name,
|
||||
"resources": event.get("Resources", []),
|
||||
"detail": json.loads(event["Detail"]),
|
||||
},
|
||||
)
|
||||
|
||||
if archive.matches_pattern(event):
|
||||
archive.events.append(event_copy)
|
||||
|
||||
# We dont really need to store the events yet
|
||||
return entries
|
||||
|
||||
def remove_targets(self, name, ids):
|
||||
|
|
@ -639,12 +820,7 @@ class EventsBackend(BaseBackend):
|
|||
if not name:
|
||||
name = "default"
|
||||
|
||||
event_bus = self.event_buses.get(name)
|
||||
|
||||
if not event_bus:
|
||||
raise JsonRESTError(
|
||||
"ResourceNotFoundException", "Event bus {} does not exist.".format(name)
|
||||
)
|
||||
event_bus = self._get_event_bus(name)
|
||||
|
||||
return event_bus
|
||||
|
||||
|
|
@ -724,11 +900,7 @@ class EventsBackend(BaseBackend):
|
|||
if event_pattern:
|
||||
self._validate_event_pattern(event_pattern)
|
||||
|
||||
event_bus_name = source_arn.split("/")[-1]
|
||||
if event_bus_name not in self.event_buses:
|
||||
raise ResourceNotFoundException(
|
||||
"Event bus {} does not exist.".format(event_bus_name)
|
||||
)
|
||||
event_bus = self._get_event_bus(source_arn)
|
||||
|
||||
if name in self.archives:
|
||||
raise ResourceAlreadyExistsException(
|
||||
|
|
@ -739,6 +911,38 @@ class EventsBackend(BaseBackend):
|
|||
self.region_name, name, source_arn, description, event_pattern, retention
|
||||
)
|
||||
|
||||
rule_event_pattern = json.loads(event_pattern or "{}")
|
||||
rule_event_pattern["replay-name"] = [{"exists": False}]
|
||||
|
||||
rule = self.put_rule(
|
||||
"Events-Archive-{}".format(name),
|
||||
**{
|
||||
"EventPattern": json.dumps(rule_event_pattern),
|
||||
"EventBusName": event_bus.name,
|
||||
}
|
||||
)
|
||||
self.put_targets(
|
||||
rule.name,
|
||||
[
|
||||
{
|
||||
"Id": rule.name,
|
||||
"Arn": "arn:aws:events:{}:::".format(self.region_name),
|
||||
"InputTransformer": {
|
||||
"InputPathsMap": {},
|
||||
"InputTemplate": json.dumps(
|
||||
{
|
||||
"archive-arn": "{0}:{1}".format(
|
||||
archive.arn, archive.uuid
|
||||
),
|
||||
"event": "<aws.events.event.json>",
|
||||
"ingestion-time": "<aws.events.event.ingestion-time>",
|
||||
}
|
||||
),
|
||||
},
|
||||
}
|
||||
],
|
||||
)
|
||||
|
||||
self.archives[name] = archive
|
||||
|
||||
return archive
|
||||
|
|
@ -780,7 +984,8 @@ class EventsBackend(BaseBackend):
|
|||
|
||||
if state and state not in Archive.VALID_STATES:
|
||||
raise ValidationException(
|
||||
"1 validation error detected: Value '{0}' at 'state' failed to satisfy constraint: "
|
||||
"1 validation error detected: "
|
||||
"Value '{0}' at 'state' failed to satisfy constraint: "
|
||||
"Member must satisfy enum value set: "
|
||||
"[{1}]".format(state, ", ".join(Archive.VALID_STATES))
|
||||
)
|
||||
|
|
@ -825,6 +1030,119 @@ class EventsBackend(BaseBackend):
|
|||
|
||||
archive.delete(self.region_name)
|
||||
|
||||
def start_replay(
|
||||
self, name, description, source_arn, start_time, end_time, destination
|
||||
):
|
||||
event_bus_arn = destination["Arn"]
|
||||
event_bus_arn_pattern = r"^arn:aws:events:[a-zA-Z0-9-]+:\d{12}:event-bus/"
|
||||
if not re.match(event_bus_arn_pattern, event_bus_arn):
|
||||
raise ValidationException(
|
||||
"Parameter Destination.Arn is not valid. "
|
||||
"Reason: Must contain an event bus ARN."
|
||||
)
|
||||
|
||||
self._get_event_bus(event_bus_arn)
|
||||
|
||||
archive_name = source_arn.split("/")[-1]
|
||||
archive = self.archives.get(archive_name)
|
||||
if not archive:
|
||||
raise ValidationException(
|
||||
"Parameter EventSourceArn is not valid. "
|
||||
"Reason: Archive {} does not exist.".format(archive_name)
|
||||
)
|
||||
|
||||
if event_bus_arn != archive.source_arn:
|
||||
raise ValidationException(
|
||||
"Parameter Destination.Arn is not valid. "
|
||||
"Reason: Cross event bus replay is not permitted."
|
||||
)
|
||||
|
||||
if start_time > end_time:
|
||||
raise ValidationException(
|
||||
"Parameter EventEndTime is not valid. "
|
||||
"Reason: EventStartTime must be before EventEndTime."
|
||||
)
|
||||
|
||||
if name in self.replays:
|
||||
raise ResourceAlreadyExistsException(
|
||||
"Replay {} already exists.".format(name)
|
||||
)
|
||||
|
||||
replay = Replay(
|
||||
self.region_name,
|
||||
name,
|
||||
description,
|
||||
source_arn,
|
||||
start_time,
|
||||
end_time,
|
||||
destination,
|
||||
)
|
||||
|
||||
self.replays[name] = replay
|
||||
|
||||
replay.replay_events(archive)
|
||||
|
||||
return {
|
||||
"ReplayArn": replay.arn,
|
||||
"ReplayStartTime": replay.start_time,
|
||||
"State": ReplayState.STARTING.value, # the replay will be done before returning the response
|
||||
}
|
||||
|
||||
def describe_replay(self, name):
|
||||
replay = self._get_replay(name)
|
||||
|
||||
return replay.describe()
|
||||
|
||||
def list_replays(self, name_prefix, source_arn, state):
|
||||
if [name_prefix, source_arn, state].count(None) < 2:
|
||||
raise ValidationException(
|
||||
"At most one filter is allowed for ListReplays. "
|
||||
"Use either : State, EventSourceArn, or NamePrefix."
|
||||
)
|
||||
|
||||
valid_states = sorted([item.value for item in ReplayState])
|
||||
if state and state not in valid_states:
|
||||
raise ValidationException(
|
||||
"1 validation error detected: "
|
||||
"Value '{0}' at 'state' failed to satisfy constraint: "
|
||||
"Member must satisfy enum value set: "
|
||||
"[{1}]".format(state, ", ".join(valid_states))
|
||||
)
|
||||
|
||||
if [name_prefix, source_arn, state].count(None) == 3:
|
||||
return [replay.describe_short() for replay in self.replays.values()]
|
||||
|
||||
result = []
|
||||
|
||||
for replay in self.replays.values():
|
||||
if name_prefix and replay.name.startswith(name_prefix):
|
||||
result.append(replay.describe_short())
|
||||
elif source_arn and replay.source_arn == source_arn:
|
||||
result.append(replay.describe_short())
|
||||
elif state and replay.state == state:
|
||||
result.append(replay.describe_short())
|
||||
|
||||
return result
|
||||
|
||||
def cancel_replay(self, name):
|
||||
replay = self._get_replay(name)
|
||||
|
||||
# replays in the state 'COMPLETED' can't be canceled,
|
||||
# but the implementation is done synchronously,
|
||||
# so they are done right after the start
|
||||
if replay.state not in [
|
||||
ReplayState.STARTING,
|
||||
ReplayState.RUNNING,
|
||||
ReplayState.COMPLETED,
|
||||
]:
|
||||
raise IllegalStatusException(
|
||||
"Replay {} is not in a valid state for this operation.".format(name)
|
||||
)
|
||||
|
||||
replay.state = ReplayState.CANCELLED
|
||||
|
||||
return {"ReplayArn": replay.arn, "State": ReplayState.CANCELLING.value}
|
||||
|
||||
|
||||
events_backends = {}
|
||||
for region in Session().get_available_regions("events"):
|
||||
|
|
|
|||
|
|
@ -389,3 +389,40 @@ class EventsHandler(BaseResponse):
|
|||
self.events_backend.delete_archive(name)
|
||||
|
||||
return "", self.response_headers
|
||||
|
||||
def start_replay(self):
|
||||
name = self._get_param("ReplayName")
|
||||
description = self._get_param("Description")
|
||||
source_arn = self._get_param("EventSourceArn")
|
||||
start_time = self._get_param("EventStartTime")
|
||||
end_time = self._get_param("EventEndTime")
|
||||
destination = self._get_param("Destination")
|
||||
|
||||
result = self.events_backend.start_replay(
|
||||
name, description, source_arn, start_time, end_time, destination
|
||||
)
|
||||
|
||||
return json.dumps(result), self.response_headers
|
||||
|
||||
def describe_replay(self):
|
||||
name = self._get_param("ReplayName")
|
||||
|
||||
result = self.events_backend.describe_replay(name)
|
||||
|
||||
return json.dumps(result), self.response_headers
|
||||
|
||||
def list_replays(self):
|
||||
name_prefix = self._get_param("NamePrefix")
|
||||
source_arn = self._get_param("EventSourceArn")
|
||||
state = self._get_param("State")
|
||||
|
||||
result = self.events_backend.list_replays(name_prefix, source_arn, state)
|
||||
|
||||
return json.dumps({"Replays": result}), self.response_headers
|
||||
|
||||
def cancel_replay(self):
|
||||
name = self._get_param("ReplayName")
|
||||
|
||||
result = self.events_backend.cancel_replay(name)
|
||||
|
||||
return json.dumps(result), self.response_headers
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue