diff --git a/moto/events/models.py b/moto/events/models.py index eb8d4142..b17d21a6 100644 --- a/moto/events/models.py +++ b/moto/events/models.py @@ -3,6 +3,7 @@ import os import re import json import sys +import warnings from collections import namedtuple from datetime import datetime from enum import Enum, unique @@ -121,7 +122,7 @@ class Rule(CloudFormationModel): # supported targets # - CloudWatch Log Group # - EventBridge Archive - # - SQS Queue (not FIFO) + # - SQS Queue + FIFO Queue for target in self.targets: arn = self._parse_arn(target["Arn"]) @@ -133,7 +134,8 @@ class Rule(CloudFormationModel): self._send_to_events_archive(archive_arn.resource_id, event) elif arn.service == "sqs": - self._send_to_sqs_queue(arn.resource_id, event) + group_id = target.get("SqsParameters", {}).get("MessageGroupId") + self._send_to_sqs_queue(arn.resource_id, event, group_id) else: raise NotImplementedError("Expr not defined for {0}".format(type(self))) @@ -211,7 +213,7 @@ class Rule(CloudFormationModel): if self._does_event_match_pattern(event, pattern): archive.events.append(event) - def _send_to_sqs_queue(self, resource_id, event): + def _send_to_sqs_queue(self, resource_id, event, group_id=None): from moto.sqs import sqs_backends event_copy = copy.deepcopy(event) @@ -219,8 +221,20 @@ class Rule(CloudFormationModel): datetime.utcfromtimestamp(event_copy["time"]) ) + if group_id: + queue_attr = sqs_backends[self.region_name].get_queue_attributes( + queue_name=resource_id, attribute_names=["ContentBasedDeduplication"] + ) + if queue_attr["ContentBasedDeduplication"] == "false": + warnings.warn( + "To let EventBridge send messages to your SQS FIFO queue, you must enable content-based deduplication." + ) + return + sqs_backends[self.region_name].send_message( - queue_name=resource_id, message_body=json.dumps(event_copy) + queue_name=resource_id, + message_body=json.dumps(event_copy), + group_id=group_id, ) def get_cfn_attribute(self, attribute_name): @@ -763,6 +777,20 @@ class EventsBackend(BaseBackend): "Reason: Provided Arn is not in correct format.".format(invalid_arn) ) + for target in targets: + arn = target["Arn"] + + if ( + ":sqs:" in arn + and arn.endswith(".fifo") + and not target.get("SqsParameters") + ): + raise ValidationException( + "Parameter(s) SqsParameters must be specified for target: {}.".format( + target["Id"] + ) + ) + rule = self.rules.get(name) if not rule: diff --git a/tests/test_events/test_events.py b/tests/test_events/test_events.py index ee92e870..43580e88 100644 --- a/tests/test_events/test_events.py +++ b/tests/test_events/test_events.py @@ -401,6 +401,35 @@ def test_put_targets_error_unknown_rule(): ) +@mock_events +def test_put_targets_error_missing_parameter_sqs_fifo(): + # given + client = boto3.client("events", "eu-central-1") + + # when + with pytest.raises(ClientError) as e: + client.put_targets( + Rule="unknown", + Targets=[ + { + "Id": "sqs-fifo", + "Arn": "arn:aws:sqs:eu-central-1:{}:test-queue.fifo".format( + ACCOUNT_ID + ), + } + ], + ) + + # then + ex = e.value + ex.operation_name.should.equal("PutTargets") + ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400) + ex.response["Error"]["Code"].should.contain("ValidationException") + ex.response["Error"]["Message"].should.equal( + "Parameter(s) SqsParameters must be specified for target: sqs-fifo." + ) + + @mock_events def test_permissions(): client = boto3.client("events", "eu-central-1") diff --git a/tests/test_events/test_events_targets.py b/tests/test_events/test_events_integration.py similarity index 56% rename from tests/test_events/test_events_targets.py rename to tests/test_events/test_events_integration.py index 08915edc..8f20c94c 100644 --- a/tests/test_events/test_events_targets.py +++ b/tests/test_events/test_events_integration.py @@ -68,6 +68,94 @@ def test_send_to_cw_log_group(): message["detail"].should.equal({"key": "value"}) +@mock_events +@mock_sqs +def test_send_to_sqs_fifo_queue(): + # given + client_events = boto3.client("events", "eu-central-1") + client_sqs = boto3.client("sqs", region_name="eu-central-1") + rule_name = "test-rule" + + queue_url = client_sqs.create_queue( + QueueName="test-queue.fifo", Attributes={"FifoQueue": "true"} + )["QueueUrl"] + queue_arn = client_sqs.get_queue_attributes( + QueueUrl=queue_url, AttributeNames=["QueueArn"] + )["Attributes"]["QueueArn"] + queue_url_dedup = client_sqs.create_queue( + QueueName="test-queue-dedup.fifo", + Attributes={"FifoQueue": "true", "ContentBasedDeduplication": "true"}, + )["QueueUrl"] + queue_arn_dedup = client_sqs.get_queue_attributes( + QueueUrl=queue_url_dedup, AttributeNames=["QueueArn"] + )["Attributes"]["QueueArn"] + + client_events.put_rule( + Name=rule_name, + EventPattern=json.dumps({"account": [ACCOUNT_ID]}), + State="ENABLED", + ) + client_events.put_targets( + Rule=rule_name, + Targets=[ + { + "Id": "sqs-fifo", + "Arn": queue_arn, + "SqsParameters": {"MessageGroupId": "group-id"}, + }, + { + "Id": "sqs-dedup-fifo", + "Arn": queue_arn_dedup, + "SqsParameters": {"MessageGroupId": "group-id"}, + }, + ], + ) + + # when + event_time = datetime(2021, 1, 1, 12, 23, 34) + client_events.put_events( + Entries=[ + { + "Time": event_time, + "Source": "source", + "DetailType": "type", + "Detail": json.dumps({"key": "value"}), + } + ], + ) + + # then + response = client_sqs.receive_message( + QueueUrl=queue_url_dedup, + AttributeNames=["MessageDeduplicationId", "MessageGroupId"], + ) + response["Messages"].should.have.length_of(1) + message = response["Messages"][0] + message["MessageId"].should_not.be.empty + message["ReceiptHandle"].should_not.be.empty + message["MD5OfBody"].should_not.be.empty + + message["Attributes"]["MessageDeduplicationId"].should_not.be.empty + message["Attributes"]["MessageGroupId"].should.equal("group-id") + + body = json.loads(message["Body"]) + body["version"].should.equal("0") + body["id"].should_not.be.empty + body["detail-type"].should.equal("type") + body["source"].should.equal("source") + body["time"].should.equal(iso_8601_datetime_without_milliseconds(event_time)) + body["region"].should.equal("eu-central-1") + body["resources"].should.be.empty + body["detail"].should.equal({"key": "value"}) + + # A FIFO queue without content-based deduplication enabled + # does not receive any event from the Event Bus + response = client_sqs.receive_message( + QueueUrl=queue_url, AttributeNames=["MessageDeduplicationId", "MessageGroupId"] + ) + response.should_not.have.key("Messages") + + @mock_events @mock_sqs def test_send_to_sqs_queue():