Add events target integration for sqs queue (#3815)
This commit is contained in:
parent
4f34439170
commit
9c89c24caf
4 changed files with 239 additions and 43 deletions
|
|
@ -100,7 +100,10 @@ class Rule(CloudFormationModel):
|
|||
if not self._validate_event(event):
|
||||
return
|
||||
|
||||
# for now only CW Log groups are supported
|
||||
# supported targets
|
||||
# - CloudWatch Log Group
|
||||
# - EventBridge Archive
|
||||
# - SQS Queue (not FIFO)
|
||||
for target in self.targets:
|
||||
arn = self._parse_arn(target["Arn"])
|
||||
|
||||
|
|
@ -111,6 +114,8 @@ class Rule(CloudFormationModel):
|
|||
archive_arn = self._parse_arn(input_template["archive-arn"])
|
||||
|
||||
self._send_to_events_archive(archive_arn.resource_id, event)
|
||||
elif arn.service == "sqs":
|
||||
self._send_to_sqs_queue(arn.resource_id, event)
|
||||
else:
|
||||
raise NotImplementedError("Expr not defined for {0}".format(type(self)))
|
||||
|
||||
|
|
@ -185,6 +190,18 @@ class Rule(CloudFormationModel):
|
|||
if archive.uuid == archive_uuid:
|
||||
archive.events.append(event)
|
||||
|
||||
def _send_to_sqs_queue(self, resource_id, event):
|
||||
from moto.sqs import sqs_backends
|
||||
|
||||
event_copy = copy.deepcopy(event)
|
||||
event_copy["time"] = iso_8601_datetime_without_milliseconds(
|
||||
datetime.utcfromtimestamp(event_copy["time"])
|
||||
)
|
||||
|
||||
sqs_backends[self.region_name].send_message(
|
||||
queue_name=resource_id, message_body=json.dumps(event_copy)
|
||||
)
|
||||
|
||||
def get_cfn_attribute(self, attribute_name):
|
||||
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException
|
||||
|
||||
|
|
@ -709,14 +726,30 @@ class EventsBackend(BaseBackend):
|
|||
self.rules_order.append(new_rule.name)
|
||||
return new_rule
|
||||
|
||||
def put_targets(self, name, targets):
|
||||
def put_targets(self, name, event_bus_name, targets):
|
||||
# super simple ARN check
|
||||
invalid_arn = next(
|
||||
(
|
||||
target["Arn"]
|
||||
for target in targets
|
||||
if not re.match(r"arn:[\d\w:\-/]*", target["Arn"])
|
||||
),
|
||||
None,
|
||||
)
|
||||
if invalid_arn:
|
||||
raise ValidationException(
|
||||
"Parameter {} is not valid. "
|
||||
"Reason: Provided Arn is not in correct format.".format(invalid_arn)
|
||||
)
|
||||
|
||||
rule = self.rules.get(name)
|
||||
|
||||
if rule:
|
||||
rule.put_targets(targets)
|
||||
return True
|
||||
if not rule:
|
||||
raise ResourceNotFoundException(
|
||||
"Rule {0} does not exist on EventBus {1}.".format(name, event_bus_name)
|
||||
)
|
||||
|
||||
return False
|
||||
rule.put_targets(targets)
|
||||
|
||||
def put_events(self, events):
|
||||
num_events = len(events)
|
||||
|
|
@ -788,18 +821,16 @@ class EventsBackend(BaseBackend):
|
|||
|
||||
return entries
|
||||
|
||||
def remove_targets(self, name, ids):
|
||||
def remove_targets(self, name, event_bus_name, ids):
|
||||
rule = self.rules.get(name)
|
||||
|
||||
if rule:
|
||||
rule.remove_targets(ids)
|
||||
return {"FailedEntries": [], "FailedEntryCount": 0}
|
||||
else:
|
||||
raise JsonRESTError(
|
||||
"ResourceNotFoundException",
|
||||
"An entity that you specified does not exist",
|
||||
if not rule:
|
||||
raise ResourceNotFoundException(
|
||||
"Rule {0} does not exist on EventBus {1}.".format(name, event_bus_name)
|
||||
)
|
||||
|
||||
rule.remove_targets(ids)
|
||||
|
||||
def test_event_pattern(self):
|
||||
raise NotImplementedError()
|
||||
|
||||
|
|
@ -955,6 +986,7 @@ class EventsBackend(BaseBackend):
|
|||
)
|
||||
self.put_targets(
|
||||
rule.name,
|
||||
rule.event_bus_name,
|
||||
[
|
||||
{
|
||||
"Id": rule.name,
|
||||
|
|
|
|||
|
|
@ -207,18 +207,10 @@ class EventsHandler(BaseResponse):
|
|||
|
||||
def put_targets(self):
|
||||
rule_name = self._get_param("Rule")
|
||||
event_bus_name = self._get_param("EventBusName", "default")
|
||||
targets = self._get_param("Targets")
|
||||
|
||||
if not rule_name:
|
||||
return self.error("ValidationException", "Parameter Rule is required.")
|
||||
|
||||
if not targets:
|
||||
return self.error("ValidationException", "Parameter Targets is required.")
|
||||
|
||||
if not self.events_backend.put_targets(rule_name, targets):
|
||||
return self.error(
|
||||
"ResourceNotFoundException", "Rule " + rule_name + " does not exist."
|
||||
)
|
||||
self.events_backend.put_targets(rule_name, event_bus_name, targets)
|
||||
|
||||
return (
|
||||
json.dumps({"FailedEntryCount": 0, "FailedEntries": []}),
|
||||
|
|
@ -227,18 +219,10 @@ class EventsHandler(BaseResponse):
|
|||
|
||||
def remove_targets(self):
|
||||
rule_name = self._get_param("Rule")
|
||||
event_bus_name = self._get_param("EventBusName", "default")
|
||||
ids = self._get_param("Ids")
|
||||
|
||||
if not rule_name:
|
||||
return self.error("ValidationException", "Parameter Rule is required.")
|
||||
|
||||
if not ids:
|
||||
return self.error("ValidationException", "Parameter Ids is required.")
|
||||
|
||||
if not self.events_backend.remove_targets(rule_name, ids):
|
||||
return self.error(
|
||||
"ResourceNotFoundException", "Rule " + rule_name + " does not exist."
|
||||
)
|
||||
self.events_backend.remove_targets(rule_name, event_bus_name, ids)
|
||||
|
||||
return (
|
||||
json.dumps({"FailedEntryCount": 0, "FailedEntries": []}),
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue