Basic Kinesis Stream CRUD.
This commit is contained in:
parent
5c589b24af
commit
da15fb711d
10 changed files with 246 additions and 0 deletions
12
moto/kinesis/__init__.py
Normal file
12
moto/kinesis/__init__.py
Normal file
|
|
@ -0,0 +1,12 @@
|
|||
from __future__ import unicode_literals
|
||||
from .models import kinesis_backends
|
||||
from ..core.models import MockAWS
|
||||
|
||||
kinesis_backend = kinesis_backends['us-east-1']
|
||||
|
||||
|
||||
def mock_kinesis(func=None):
|
||||
if func:
|
||||
return MockAWS(kinesis_backends)(func)
|
||||
else:
|
||||
return MockAWS(kinesis_backends)
|
||||
19
moto/kinesis/exceptions.py
Normal file
19
moto/kinesis/exceptions.py
Normal file
|
|
@ -0,0 +1,19 @@
|
|||
from __future__ import unicode_literals
|
||||
|
||||
import json
|
||||
from werkzeug.exceptions import BadRequest
|
||||
|
||||
|
||||
class ResourceNotFoundError(BadRequest):
|
||||
def __init__(self, message):
|
||||
super(ResourceNotFoundError, self).__init__()
|
||||
self.description = json.dumps({
|
||||
"message": message,
|
||||
'__type': 'ResourceNotFoundException',
|
||||
})
|
||||
|
||||
|
||||
class StreamNotFoundError(ResourceNotFoundError):
|
||||
def __init__(self, stream_name):
|
||||
super(StreamNotFoundError, self).__init__(
|
||||
'Stream {} under account 123456789012 not found.'.format(stream_name))
|
||||
81
moto/kinesis/models.py
Normal file
81
moto/kinesis/models.py
Normal file
|
|
@ -0,0 +1,81 @@
|
|||
from __future__ import unicode_literals
|
||||
|
||||
import boto.kinesis
|
||||
from moto.core import BaseBackend
|
||||
from .exceptions import StreamNotFoundError
|
||||
|
||||
|
||||
class Stream(object):
|
||||
def __init__(self, stream_name, shard_count, region):
|
||||
self.stream_name = stream_name
|
||||
self.shard_count = shard_count
|
||||
self.region = region
|
||||
self.account_number = "123456789012"
|
||||
|
||||
@property
|
||||
def arn(self):
|
||||
return "arn:aws:kinesis:{region}:{account_number}:{stream_name}".format(
|
||||
region=self.region,
|
||||
account_number=self.account_number,
|
||||
stream_name=self.stream_name
|
||||
)
|
||||
|
||||
def to_json(self):
|
||||
return {
|
||||
"StreamDescription": {
|
||||
"StreamARN": self.arn,
|
||||
"StreamName": self.stream_name,
|
||||
"StreamStatus": "ACTIVE",
|
||||
"HasMoreShards": False,
|
||||
"Shards": [{
|
||||
"HashKeyRange": {
|
||||
"EndingHashKey": "113427455640312821154458202477256070484",
|
||||
"StartingHashKey": "0"
|
||||
},
|
||||
"SequenceNumberRange": {
|
||||
"EndingSequenceNumber": "21269319989741826081360214168359141376",
|
||||
"StartingSequenceNumber": "21267647932558653966460912964485513216"
|
||||
},
|
||||
"ShardId": "shardId-000000000000"
|
||||
}, {
|
||||
"HashKeyRange": {
|
||||
"EndingHashKey": "226854911280625642308916404954512140969",
|
||||
"StartingHashKey": "113427455640312821154458202477256070485"
|
||||
},
|
||||
"SequenceNumberRange": {
|
||||
"StartingSequenceNumber": "21267647932558653966460912964485513217"
|
||||
},
|
||||
"ShardId": "shardId-000000000001"
|
||||
}],
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
class KinesisBackend(BaseBackend):
|
||||
|
||||
def __init__(self):
|
||||
self.streams = {}
|
||||
|
||||
def create_stream(self, stream_name, shard_count, region):
|
||||
stream = Stream(stream_name, shard_count, region)
|
||||
self.streams[stream_name] = stream
|
||||
return stream
|
||||
|
||||
def describe_stream(self, stream_name):
|
||||
if stream_name in self.streams:
|
||||
return self.streams[stream_name]
|
||||
else:
|
||||
raise StreamNotFoundError(stream_name)
|
||||
|
||||
def list_streams(self):
|
||||
return self.streams.values()
|
||||
|
||||
def delete_stream(self, stream_name):
|
||||
if stream_name in self.streams:
|
||||
return self.streams.pop(stream_name)
|
||||
raise StreamNotFoundError(stream_name)
|
||||
|
||||
|
||||
kinesis_backends = {}
|
||||
for region in boto.kinesis.regions():
|
||||
kinesis_backends[region.name] = KinesisBackend()
|
||||
42
moto/kinesis/responses.py
Normal file
42
moto/kinesis/responses.py
Normal file
|
|
@ -0,0 +1,42 @@
|
|||
from __future__ import unicode_literals
|
||||
|
||||
import json
|
||||
|
||||
from moto.core.responses import BaseResponse
|
||||
from .models import kinesis_backends
|
||||
|
||||
|
||||
class KinesisResponse(BaseResponse):
|
||||
|
||||
@property
|
||||
def parameters(self):
|
||||
return json.loads(self.body)
|
||||
|
||||
@property
|
||||
def kinesis_backend(self):
|
||||
return kinesis_backends[self.region]
|
||||
|
||||
def create_stream(self):
|
||||
stream_name = self.parameters.get('StreamName')
|
||||
shard_count = self.parameters.get('ShardCount')
|
||||
self.kinesis_backend.create_stream(stream_name, shard_count, self.region)
|
||||
return ""
|
||||
|
||||
def describe_stream(self):
|
||||
stream_name = self.parameters.get('StreamName')
|
||||
stream = self.kinesis_backend.describe_stream(stream_name)
|
||||
return json.dumps(stream.to_json())
|
||||
|
||||
def list_streams(self):
|
||||
streams = self.kinesis_backend.list_streams()
|
||||
|
||||
return json.dumps({
|
||||
"HasMoreStreams": False,
|
||||
"StreamNames": [stream.stream_name for stream in streams],
|
||||
})
|
||||
|
||||
def delete_stream(self):
|
||||
stream_name = self.parameters.get("StreamName")
|
||||
self.kinesis_backend.delete_stream(stream_name)
|
||||
|
||||
return ""
|
||||
10
moto/kinesis/urls.py
Normal file
10
moto/kinesis/urls.py
Normal file
|
|
@ -0,0 +1,10 @@
|
|||
from __future__ import unicode_literals
|
||||
from .responses import KinesisResponse
|
||||
|
||||
url_bases = [
|
||||
"https?://kinesis.(.+).amazonaws.com",
|
||||
]
|
||||
|
||||
url_paths = {
|
||||
'{0}/$': KinesisResponse().dispatch,
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue