From ddf2f5a754455fe2b1b434089cade9083d6b7d33 Mon Sep 17 00:00:00 2001 From: Steve Pulec Date: Sun, 9 Oct 2016 20:23:56 -0400 Subject: [PATCH] Fix firehose to work without Redshift config. --- moto/kinesis/models.py | 75 ++++++++++++++++++----------- moto/kinesis/responses.py | 10 ++++ tests/test_kinesis/test_firehose.py | 54 +++++++++++++++++++++ 3 files changed, 112 insertions(+), 27 deletions(-) diff --git a/moto/kinesis/models.py b/moto/kinesis/models.py index 4de10a55..e0e20da3 100644 --- a/moto/kinesis/models.py +++ b/moto/kinesis/models.py @@ -176,17 +176,23 @@ class FirehoseRecord(object): class DeliveryStream(object): def __init__(self, stream_name, **stream_kwargs): self.name = stream_name - self.redshift_username = stream_kwargs['redshift_username'] - self.redshift_password = stream_kwargs['redshift_password'] - self.redshift_jdbc_url = stream_kwargs['redshift_jdbc_url'] - self.redshift_role_arn = stream_kwargs['redshift_role_arn'] - self.redshift_copy_command = stream_kwargs['redshift_copy_command'] + self.redshift_username = stream_kwargs.get('redshift_username') + self.redshift_password = stream_kwargs.get('redshift_password') + self.redshift_jdbc_url = stream_kwargs.get('redshift_jdbc_url') + self.redshift_role_arn = stream_kwargs.get('redshift_role_arn') + self.redshift_copy_command = stream_kwargs.get('redshift_copy_command') - self.redshift_s3_role_arn = stream_kwargs['redshift_s3_role_arn'] - self.redshift_s3_bucket_arn = stream_kwargs['redshift_s3_bucket_arn'] - self.redshift_s3_prefix = stream_kwargs['redshift_s3_prefix'] + self.s3_role_arn = stream_kwargs.get('s3_role_arn') + self.s3_bucket_arn = stream_kwargs.get('s3_bucket_arn') + self.s3_prefix = stream_kwargs.get('s3_prefix') + self.s3_compression_format = stream_kwargs.get('s3_compression_format', 'UNCOMPRESSED') + self.s3_buffering_hings = stream_kwargs.get('s3_buffering_hings') + + self.redshift_s3_role_arn = stream_kwargs.get('redshift_s3_role_arn') + self.redshift_s3_bucket_arn = stream_kwargs.get('redshift_s3_bucket_arn') + self.redshift_s3_prefix = stream_kwargs.get('redshift_s3_prefix') self.redshift_s3_compression_format = stream_kwargs.get('redshift_s3_compression_format', 'UNCOMPRESSED') - self.redshift_s3_buffering_hings = stream_kwargs['redshift_s3_buffering_hings'] + self.redshift_s3_buffering_hings = stream_kwargs.get('redshift_s3_buffering_hings') self.records = [] self.status = 'ACTIVE' @@ -197,6 +203,38 @@ class DeliveryStream(object): def arn(self): return 'arn:aws:firehose:us-east-1:123456789012:deliverystream/{0}'.format(self.name) + def destinations_to_dict(self): + if self.s3_role_arn: + return [{ + 'DestinationId': 'string', + 'S3DestinationDescription': { + 'RoleARN': self.s3_role_arn, + 'BucketARN': self.s3_bucket_arn, + 'Prefix': self.s3_prefix, + 'BufferingHints': self.s3_buffering_hings, + 'CompressionFormat': self.s3_compression_format, + } + }] + else: + return [{ + "DestinationId": "string", + "RedshiftDestinationDescription": { + "ClusterJDBCURL": self.redshift_jdbc_url, + "CopyCommand": self.redshift_copy_command, + "RoleARN": self.redshift_role_arn, + "S3DestinationDescription": { + "BucketARN": self.redshift_s3_bucket_arn, + "BufferingHints": self.redshift_s3_buffering_hings, + "CompressionFormat": self.redshift_s3_compression_format, + "Prefix": self.redshift_s3_prefix, + "RoleARN": self.redshift_s3_role_arn + }, + "Username": self.redshift_username, + }, + } + ] + + def to_dict(self): return { "DeliveryStreamDescription": { @@ -204,24 +242,7 @@ class DeliveryStream(object): "DeliveryStreamARN": self.arn, "DeliveryStreamName": self.name, "DeliveryStreamStatus": self.status, - "Destinations": [ - { - "DestinationId": "string", - "RedshiftDestinationDescription": { - "ClusterJDBCURL": self.redshift_jdbc_url, - "CopyCommand": self.redshift_copy_command, - "RoleARN": self.redshift_role_arn, - "S3DestinationDescription": { - "BucketARN": self.redshift_s3_bucket_arn, - "BufferingHints": self.redshift_s3_buffering_hings, - "CompressionFormat": self.redshift_s3_compression_format, - "Prefix": self.redshift_s3_prefix, - "RoleARN": self.redshift_s3_role_arn - }, - "Username": self.redshift_username, - }, - } - ], + "Destinations": self.destinations_to_dict(), "HasMoreDestinations": False, "LastUpdateTimestamp": time.mktime(self.last_updated.timetuple()), "VersionId": "string", diff --git a/moto/kinesis/responses.py b/moto/kinesis/responses.py index b52bdedf..264f53a2 100644 --- a/moto/kinesis/responses.py +++ b/moto/kinesis/responses.py @@ -139,6 +139,16 @@ class KinesisResponse(BaseResponse): 'redshift_s3_compression_format': redshift_s3_config.get('CompressionFormat'), 'redshift_s3_buffering_hings': redshift_s3_config['BufferingHints'], } + else: + # S3 Config + s3_config = self.parameters['S3DestinationConfiguration'] + stream_kwargs = { + 's3_role_arn': s3_config['RoleARN'], + 's3_bucket_arn': s3_config['BucketARN'], + 's3_prefix': s3_config['Prefix'], + 's3_compression_format': s3_config.get('CompressionFormat'), + 's3_buffering_hings': s3_config['BufferingHints'], + } stream = self.kinesis_backend.create_delivery_stream(stream_name, **stream_kwargs) return json.dumps({ 'DeliveryStreamARN': stream.arn diff --git a/tests/test_kinesis/test_firehose.py b/tests/test_kinesis/test_firehose.py index 37585fe5..14ee1916 100644 --- a/tests/test_kinesis/test_firehose.py +++ b/tests/test_kinesis/test_firehose.py @@ -87,6 +87,60 @@ def test_create_stream(): }) +@mock_kinesis +@freeze_time("2015-03-01") +def test_create_stream_without_redshift(): + client = boto3.client('firehose', region_name='us-east-1') + + response = client.create_delivery_stream( + DeliveryStreamName="stream1", + S3DestinationConfiguration={ + 'RoleARN': 'arn:aws:iam::123456789012:role/firehose_delivery_role', + 'BucketARN': 'arn:aws:s3:::kinesis-test', + 'Prefix': 'myFolder/', + 'BufferingHints': { + 'SizeInMBs': 123, + 'IntervalInSeconds': 124 + }, + 'CompressionFormat': 'UNCOMPRESSED', + } + ) + stream_arn = response['DeliveryStreamARN'] + + response = client.describe_delivery_stream(DeliveryStreamName='stream1') + stream_description = response['DeliveryStreamDescription'] + + # Sure and Freezegun don't play nicely together + created = stream_description.pop('CreateTimestamp') + last_updated = stream_description.pop('LastUpdateTimestamp') + from dateutil.tz import tzlocal + assert created == datetime.datetime(2015, 3, 1, tzinfo=tzlocal()) + assert last_updated == datetime.datetime(2015, 3, 1, tzinfo=tzlocal()) + + stream_description.should.equal({ + 'DeliveryStreamName': 'stream1', + 'DeliveryStreamARN': stream_arn, + 'DeliveryStreamStatus': 'ACTIVE', + 'VersionId': 'string', + 'Destinations': [ + { + 'DestinationId': 'string', + 'S3DestinationDescription': { + 'RoleARN': 'arn:aws:iam::123456789012:role/firehose_delivery_role', + 'RoleARN': 'arn:aws:iam::123456789012:role/firehose_delivery_role', + 'BucketARN': 'arn:aws:s3:::kinesis-test', + 'Prefix': 'myFolder/', + 'BufferingHints': { + 'SizeInMBs': 123, + 'IntervalInSeconds': 124 + }, + 'CompressionFormat': 'UNCOMPRESSED', + } + }, + ], + "HasMoreDestinations": False, + }) + @mock_kinesis @freeze_time("2015-03-01") def test_deescribe_non_existant_stream():