This commit is contained in:
Steve Pulec 2017-02-23 21:37:43 -05:00
commit f37bad0e00
260 changed files with 6363 additions and 3766 deletions

View file

@ -18,6 +18,7 @@ from .utils import compose_shard_iterator, compose_new_shard_iterator, decompose
class Record(object):
def __init__(self, partition_key, data, sequence_number, explicit_hash_key):
self.partition_key = partition_key
self.data = data
@ -33,6 +34,7 @@ class Record(object):
class Shard(object):
def __init__(self, shard_id, starting_hash, ending_hash):
self._shard_id = shard_id
self.starting_hash = starting_hash
@ -64,7 +66,8 @@ class Shard(object):
else:
last_sequence_number = 0
sequence_number = last_sequence_number + 1
self.records[sequence_number] = Record(partition_key, data, sequence_number, explicit_hash_key)
self.records[sequence_number] = Record(
partition_key, data, sequence_number, explicit_hash_key)
return sequence_number
def get_min_sequence_number(self):
@ -107,8 +110,10 @@ class Stream(object):
izip_longest = itertools.izip_longest
for index, start, end in izip_longest(range(shard_count),
range(0,2**128,2**128//shard_count),
range(2**128//shard_count,2**128,2**128//shard_count),
range(0, 2**128, 2 **
128 // shard_count),
range(2**128 // shard_count, 2 **
128, 2**128 // shard_count),
fillvalue=2**128):
shard = Shard(index, start, end)
self.shards[shard.shard_id] = shard
@ -152,7 +157,8 @@ class Stream(object):
def put_record(self, partition_key, explicit_hash_key, sequence_number_for_ordering, data):
shard = self.get_shard_for_key(partition_key, explicit_hash_key)
sequence_number = shard.put_record(partition_key, data, explicit_hash_key)
sequence_number = shard.put_record(
partition_key, data, explicit_hash_key)
return sequence_number, shard.shard_id
def to_json(self):
@ -168,12 +174,14 @@ class Stream(object):
class FirehoseRecord(object):
def __init__(self, record_data):
self.record_id = 12345678
self.record_data = record_data
class DeliveryStream(object):
def __init__(self, stream_name, **stream_kwargs):
self.name = stream_name
self.redshift_username = stream_kwargs.get('redshift_username')
@ -185,14 +193,18 @@ class DeliveryStream(object):
self.s3_role_arn = stream_kwargs.get('s3_role_arn')
self.s3_bucket_arn = stream_kwargs.get('s3_bucket_arn')
self.s3_prefix = stream_kwargs.get('s3_prefix')
self.s3_compression_format = stream_kwargs.get('s3_compression_format', 'UNCOMPRESSED')
self.s3_compression_format = stream_kwargs.get(
's3_compression_format', 'UNCOMPRESSED')
self.s3_buffering_hings = stream_kwargs.get('s3_buffering_hings')
self.redshift_s3_role_arn = stream_kwargs.get('redshift_s3_role_arn')
self.redshift_s3_bucket_arn = stream_kwargs.get('redshift_s3_bucket_arn')
self.redshift_s3_bucket_arn = stream_kwargs.get(
'redshift_s3_bucket_arn')
self.redshift_s3_prefix = stream_kwargs.get('redshift_s3_prefix')
self.redshift_s3_compression_format = stream_kwargs.get('redshift_s3_compression_format', 'UNCOMPRESSED')
self.redshift_s3_buffering_hings = stream_kwargs.get('redshift_s3_buffering_hings')
self.redshift_s3_compression_format = stream_kwargs.get(
'redshift_s3_compression_format', 'UNCOMPRESSED')
self.redshift_s3_buffering_hings = stream_kwargs.get(
'redshift_s3_buffering_hings')
self.records = []
self.status = 'ACTIVE'
@ -231,9 +243,8 @@ class DeliveryStream(object):
},
"Username": self.redshift_username,
},
}
]
}
]
def to_dict(self):
return {
@ -261,10 +272,9 @@ class KinesisBackend(BaseBackend):
self.streams = {}
self.delivery_streams = {}
def create_stream(self, stream_name, shard_count, region):
if stream_name in self.streams:
raise ResourceInUseError(stream_name)
raise ResourceInUseError(stream_name)
stream = Stream(stream_name, shard_count, region)
self.streams[stream_name] = stream
return stream
@ -302,7 +312,8 @@ class KinesisBackend(BaseBackend):
records, last_sequence_id = shard.get_records(last_sequence_id, limit)
next_shard_iterator = compose_shard_iterator(stream_name, shard, last_sequence_id)
next_shard_iterator = compose_shard_iterator(
stream_name, shard, last_sequence_id)
return next_shard_iterator, records
@ -320,7 +331,7 @@ class KinesisBackend(BaseBackend):
response = {
"FailedRecordCount": 0,
"Records" : []
"Records": []
}
for record in records:
@ -342,7 +353,7 @@ class KinesisBackend(BaseBackend):
stream = self.describe_stream(stream_name)
if shard_to_split not in stream.shards:
raise ResourceNotFoundError(shard_to_split)
raise ResourceNotFoundError(shard_to_split)
if not re.match(r'0|([1-9]\d{0,38})', new_starting_hash_key):
raise InvalidArgumentError(new_starting_hash_key)
@ -350,10 +361,12 @@ class KinesisBackend(BaseBackend):
shard = stream.shards[shard_to_split]
last_id = sorted(stream.shards.values(), key=attrgetter('_shard_id'))[-1]._shard_id
last_id = sorted(stream.shards.values(),
key=attrgetter('_shard_id'))[-1]._shard_id
if shard.starting_hash < new_starting_hash_key < shard.ending_hash:
new_shard = Shard(last_id+1, new_starting_hash_key, shard.ending_hash)
new_shard = Shard(
last_id + 1, new_starting_hash_key, shard.ending_hash)
shard.ending_hash = new_starting_hash_key
stream.shards[new_shard.shard_id] = new_shard
else:
@ -372,10 +385,10 @@ class KinesisBackend(BaseBackend):
stream = self.describe_stream(stream_name)
if shard_to_merge not in stream.shards:
raise ResourceNotFoundError(shard_to_merge)
raise ResourceNotFoundError(shard_to_merge)
if adjacent_shard_to_merge not in stream.shards:
raise ResourceNotFoundError(adjacent_shard_to_merge)
raise ResourceNotFoundError(adjacent_shard_to_merge)
shard1 = stream.shards[shard_to_merge]
shard2 = stream.shards[adjacent_shard_to_merge]
@ -390,9 +403,11 @@ class KinesisBackend(BaseBackend):
del stream.shards[shard2.shard_id]
for index in shard2.records:
record = shard2.records[index]
shard1.put_record(record.partition_key, record.data, record.explicit_hash_key)
shard1.put_record(record.partition_key,
record.data, record.explicit_hash_key)
''' Firehose '''
def create_delivery_stream(self, stream_name, **stream_kwargs):
stream = DeliveryStream(stream_name, **stream_kwargs)
self.delivery_streams[stream_name] = stream
@ -416,19 +431,19 @@ class KinesisBackend(BaseBackend):
return record
def list_tags_for_stream(self, stream_name, exclusive_start_tag_key=None, limit=None):
stream = self.describe_stream(stream_name)
stream = self.describe_stream(stream_name)
tags = []
result = {
'HasMoreTags': False,
'Tags': tags
}
for key, val in sorted(stream.tags.items(), key=lambda x:x[0]):
if limit and len(res) >= limit:
result['HasMoreTags'] = True
break
if exclusive_start_tag_key and key < exexclusive_start_tag_key:
continue
for key, val in sorted(stream.tags.items(), key=lambda x: x[0]):
if limit and len(tags) >= limit:
result['HasMoreTags'] = True
break
if exclusive_start_tag_key and key < exclusive_start_tag_key:
continue
tags.append({
'Key': key,
@ -438,14 +453,14 @@ class KinesisBackend(BaseBackend):
return result
def add_tags_to_stream(self, stream_name, tags):
stream = self.describe_stream(stream_name)
stream = self.describe_stream(stream_name)
stream.tags.update(tags)
def remove_tags_from_stream(self, stream_name, tag_keys):
stream = self.describe_stream(stream_name)
stream = self.describe_stream(stream_name)
for key in tag_keys:
if key in stream.tags:
del stream.tags[key]
del stream.tags[key]
kinesis_backends = {}