Add more shard iterator types.
This commit is contained in:
parent
38a4734f95
commit
57d45aa4b8
5 changed files with 402 additions and 22 deletions
|
|
@ -17,3 +17,18 @@ class StreamNotFoundError(ResourceNotFoundError):
|
|||
def __init__(self, stream_name):
|
||||
super(StreamNotFoundError, self).__init__(
|
||||
'Stream {0} under account 123456789012 not found.'.format(stream_name))
|
||||
|
||||
|
||||
class ShardNotFoundError(ResourceNotFoundError):
|
||||
def __init__(self, shard_id):
|
||||
super(ShardNotFoundError, self).__init__(
|
||||
'Shard {0} under account 123456789012 not found.'.format(shard_id))
|
||||
|
||||
|
||||
class InvalidArgumentError(BadRequest):
|
||||
def __init__(self, message):
|
||||
super(InvalidArgumentError, self).__init__()
|
||||
self.description = json.dumps({
|
||||
"message": message,
|
||||
'__type': 'InvalidArgumentException',
|
||||
})
|
||||
|
|
|
|||
|
|
@ -2,7 +2,74 @@ from __future__ import unicode_literals
|
|||
|
||||
import boto.kinesis
|
||||
from moto.core import BaseBackend
|
||||
from .exceptions import StreamNotFoundError
|
||||
from .exceptions import StreamNotFoundError, ShardNotFoundError
|
||||
from .utils import compose_shard_iterator, compose_new_shard_iterator, decompose_shard_iterator
|
||||
|
||||
try:
|
||||
from collections import OrderedDict
|
||||
except ImportError:
|
||||
# python 2.6 or earlier, use backport
|
||||
from ordereddict import OrderedDict
|
||||
|
||||
|
||||
class Record(object):
|
||||
def __init__(self, partition_key, data, sequence_number):
|
||||
self.partition_key = partition_key
|
||||
self.data = data
|
||||
self.sequence_number = sequence_number
|
||||
|
||||
def to_json(self):
|
||||
return {
|
||||
"Data": self.data,
|
||||
"PartitionKey": self.partition_key,
|
||||
"SequenceNumber": str(self.sequence_number),
|
||||
}
|
||||
|
||||
|
||||
class Shard(object):
|
||||
def __init__(self, shard_id):
|
||||
self.shard_id = shard_id
|
||||
self.records = OrderedDict()
|
||||
|
||||
def get_records(self, last_sequence_id, limit):
|
||||
last_sequence_id = int(last_sequence_id)
|
||||
results = []
|
||||
|
||||
for sequence_number, record in self.records.items():
|
||||
if sequence_number > last_sequence_id:
|
||||
results.append(record)
|
||||
last_sequence_id = sequence_number
|
||||
|
||||
if len(results) == limit:
|
||||
break
|
||||
|
||||
return results, last_sequence_id
|
||||
|
||||
def put_record(self, partition_key, data):
|
||||
# Note: this function is not safe for concurrency
|
||||
if self.records:
|
||||
last_sequence_number = self.get_max_sequence_number()
|
||||
else:
|
||||
last_sequence_number = 0
|
||||
sequence_number = last_sequence_number + 1
|
||||
self.records[sequence_number] = Record(partition_key, data, sequence_number)
|
||||
return sequence_number
|
||||
|
||||
def get_max_sequence_number(self):
|
||||
return self.records.keys()[-1]
|
||||
|
||||
def to_json(self):
|
||||
return {
|
||||
"HashKeyRange": {
|
||||
"EndingHashKey": "113427455640312821154458202477256070484",
|
||||
"StartingHashKey": "0"
|
||||
},
|
||||
"SequenceNumberRange": {
|
||||
"EndingSequenceNumber": "21269319989741826081360214168359141376",
|
||||
"StartingSequenceNumber": "21267647932558653966460912964485513216"
|
||||
},
|
||||
"ShardId": self.shard_id
|
||||
}
|
||||
|
||||
|
||||
class Stream(object):
|
||||
|
|
@ -11,6 +78,11 @@ class Stream(object):
|
|||
self.shard_count = shard_count
|
||||
self.region = region
|
||||
self.account_number = "123456789012"
|
||||
self.shards = {}
|
||||
|
||||
for index in range(shard_count):
|
||||
shard_id = "shardId-{}".format(str(index).zfill(12))
|
||||
self.shards[shard_id] = Shard(shard_id)
|
||||
|
||||
@property
|
||||
def arn(self):
|
||||
|
|
@ -20,6 +92,24 @@ class Stream(object):
|
|||
stream_name=self.stream_name
|
||||
)
|
||||
|
||||
def get_shard(self, shard_id):
|
||||
if shard_id in self.shards:
|
||||
return self.shards[shard_id]
|
||||
else:
|
||||
raise ShardNotFoundError(shard_id)
|
||||
|
||||
def get_shard_for_key(self, partition_key):
|
||||
# TODO implement sharding
|
||||
shard = self.shards.values()[0]
|
||||
return shard
|
||||
|
||||
def put_record(self, partition_key, explicit_hash_key, sequence_number_for_ordering, data):
|
||||
partition_key = explicit_hash_key if explicit_hash_key else partition_key
|
||||
shard = self.get_shard_for_key(partition_key)
|
||||
|
||||
sequence_number = shard.put_record(partition_key, data)
|
||||
return sequence_number, shard.shard_id
|
||||
|
||||
def to_json(self):
|
||||
return {
|
||||
"StreamDescription": {
|
||||
|
|
@ -27,26 +117,7 @@ class Stream(object):
|
|||
"StreamName": self.stream_name,
|
||||
"StreamStatus": "ACTIVE",
|
||||
"HasMoreShards": False,
|
||||
"Shards": [{
|
||||
"HashKeyRange": {
|
||||
"EndingHashKey": "113427455640312821154458202477256070484",
|
||||
"StartingHashKey": "0"
|
||||
},
|
||||
"SequenceNumberRange": {
|
||||
"EndingSequenceNumber": "21269319989741826081360214168359141376",
|
||||
"StartingSequenceNumber": "21267647932558653966460912964485513216"
|
||||
},
|
||||
"ShardId": "shardId-000000000000"
|
||||
}, {
|
||||
"HashKeyRange": {
|
||||
"EndingHashKey": "226854911280625642308916404954512140969",
|
||||
"StartingHashKey": "113427455640312821154458202477256070485"
|
||||
},
|
||||
"SequenceNumberRange": {
|
||||
"StartingSequenceNumber": "21267647932558653966460912964485513217"
|
||||
},
|
||||
"ShardId": "shardId-000000000001"
|
||||
}],
|
||||
"Shards": [shard.to_json() for shard in self.shards.values()],
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -75,6 +146,37 @@ class KinesisBackend(BaseBackend):
|
|||
return self.streams.pop(stream_name)
|
||||
raise StreamNotFoundError(stream_name)
|
||||
|
||||
def get_shard_iterator(self, stream_name, shard_id, shard_iterator_type, starting_sequence_number):
|
||||
# Validate params
|
||||
stream = self.describe_stream(stream_name)
|
||||
shard = stream.get_shard(shard_id)
|
||||
|
||||
shard_iterator = compose_new_shard_iterator(
|
||||
stream_name, shard, shard_iterator_type, starting_sequence_number
|
||||
)
|
||||
return shard_iterator
|
||||
|
||||
def get_records(self, shard_iterator, limit):
|
||||
decomposed = decompose_shard_iterator(shard_iterator)
|
||||
stream_name, shard_id, last_sequence_id = decomposed
|
||||
|
||||
stream = self.describe_stream(stream_name)
|
||||
shard = stream.get_shard(shard_id)
|
||||
|
||||
records, last_sequence_id = shard.get_records(last_sequence_id, limit)
|
||||
|
||||
next_shard_iterator = compose_shard_iterator(stream_name, shard, last_sequence_id)
|
||||
|
||||
return next_shard_iterator, records
|
||||
|
||||
def put_record(self, stream_name, partition_key, explicit_hash_key, sequence_number_for_ordering, data):
|
||||
stream = self.describe_stream(stream_name)
|
||||
|
||||
sequence_number, shard_id = stream.put_record(
|
||||
partition_key, explicit_hash_key, sequence_number_for_ordering, data
|
||||
)
|
||||
|
||||
return sequence_number, shard_id
|
||||
|
||||
kinesis_backends = {}
|
||||
for region in boto.kinesis.regions():
|
||||
|
|
|
|||
|
|
@ -40,3 +40,44 @@ class KinesisResponse(BaseResponse):
|
|||
self.kinesis_backend.delete_stream(stream_name)
|
||||
|
||||
return ""
|
||||
|
||||
def get_shard_iterator(self):
|
||||
stream_name = self.parameters.get("StreamName")
|
||||
shard_id = self.parameters.get("ShardId")
|
||||
shard_iterator_type = self.parameters.get("ShardIteratorType")
|
||||
starting_sequence_number = self.parameters.get("StartingSequenceNumber")
|
||||
|
||||
shard_iterator = self.kinesis_backend.get_shard_iterator(
|
||||
stream_name, shard_id, shard_iterator_type, starting_sequence_number,
|
||||
)
|
||||
|
||||
return json.dumps({
|
||||
"ShardIterator": shard_iterator
|
||||
})
|
||||
|
||||
def get_records(self):
|
||||
shard_iterator = self.parameters.get("ShardIterator")
|
||||
limit = self.parameters.get("Limit")
|
||||
|
||||
next_shard_iterator, records = self.kinesis_backend.get_records(shard_iterator, limit)
|
||||
|
||||
return json.dumps({
|
||||
"NextShardIterator": next_shard_iterator,
|
||||
"Records": [record.to_json() for record in records]
|
||||
})
|
||||
|
||||
def put_record(self):
|
||||
stream_name = self.parameters.get("StreamName")
|
||||
partition_key = self.parameters.get("PartitionKey")
|
||||
explicit_hash_key = self.parameters.get("ExplicitHashKey")
|
||||
sequence_number_for_ordering = self.parameters.get("SequenceNumberForOrdering")
|
||||
data = self.parameters.get("Data")
|
||||
|
||||
sequence_number, shard_id = self.kinesis_backend.put_record(
|
||||
stream_name, partition_key, explicit_hash_key, sequence_number_for_ordering, data
|
||||
)
|
||||
|
||||
return json.dumps({
|
||||
"SequenceNumber": sequence_number,
|
||||
"ShardId": shard_id,
|
||||
})
|
||||
|
|
|
|||
31
moto/kinesis/utils.py
Normal file
31
moto/kinesis/utils.py
Normal file
|
|
@ -0,0 +1,31 @@
|
|||
import base64
|
||||
|
||||
from .exceptions import InvalidArgumentError
|
||||
|
||||
|
||||
def compose_new_shard_iterator(stream_name, shard, shard_iterator_type, starting_sequence_number):
|
||||
if shard_iterator_type == "AT_SEQUENCE_NUMBER":
|
||||
last_sequence_id = int(starting_sequence_number) - 1
|
||||
elif shard_iterator_type == "AFTER_SEQUENCE_NUMBER":
|
||||
last_sequence_id = int(starting_sequence_number)
|
||||
elif shard_iterator_type == "TRIM_HORIZON":
|
||||
last_sequence_id = 0
|
||||
elif shard_iterator_type == "LATEST":
|
||||
last_sequence_id = shard.get_max_sequence_number()
|
||||
else:
|
||||
raise InvalidArgumentError("Invalid ShardIteratorType: {0}".format(shard_iterator_type))
|
||||
return compose_shard_iterator(stream_name, shard, last_sequence_id)
|
||||
|
||||
|
||||
def compose_shard_iterator(stream_name, shard, last_sequence_id):
|
||||
return base64.encodestring(
|
||||
"{0}:{1}:{2}".format(
|
||||
stream_name,
|
||||
shard.shard_id,
|
||||
last_sequence_id,
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def decompose_shard_iterator(shard_iterator):
|
||||
return base64.decodestring(shard_iterator).split(":")
|
||||
Loading…
Add table
Add a link
Reference in a new issue