* Support for CloudFormation update and delete of Kinesis Streams (#3212)

* Support for CloudFormation stack resource deletion via backend resource method
  delete_from_cloudformation_json() via parse_and_delete_resource().
* Correction to the inappropriate inclusion of EndingSequenceNumber in open shards.
  This attribute should only appear in closed shards.  This regretfully prevents
  confirmation of consistent record counts after split/merge in unit tests.
* Added parameters/decorator to CloudFormationModel method declarations to calm-down Pycharm.

Co-authored-by: Joseph Weitekamp <jweite@amazon.com>
This commit is contained in:
jweite 2020-08-03 11:04:05 -04:00 committed by GitHub
commit da07adae52
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 268 additions and 36 deletions

View file

@ -649,6 +649,23 @@ class ResourceMap(collections_abc.Mapping):
try:
if parsed_resource and hasattr(parsed_resource, "delete"):
parsed_resource.delete(self._region_name)
else:
resource_name_attribute = (
parsed_resource.cloudformation_name_type()
if hasattr(parsed_resource, "cloudformation_name_type")
else resource_name_property_from_type(parsed_resource.type)
)
if resource_name_attribute:
resource_json = self._resource_json_map[
parsed_resource.logical_resource_id
]
resource_name = resource_json["Properties"][
resource_name_attribute
]
parse_and_delete_resource(
resource_name, resource_json, self, self._region_name
)
self._parsed_resources.pop(parsed_resource.logical_resource_id)
except Exception as e:
# skip over dependency violations, and try again in a
# second pass

View file

@ -538,21 +538,25 @@ class BaseModel(object):
# Parent class for every Model that can be instantiated by CloudFormation
# On subclasses, implement the two methods as @staticmethod to ensure correct behaviour of the CF parser
class CloudFormationModel(BaseModel):
@staticmethod
@abstractmethod
def cloudformation_name_type(self):
def cloudformation_name_type():
# https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-name.html
# This must be implemented as a staticmethod with no parameters
# Return None for resources that do not have a name property
pass
@staticmethod
@abstractmethod
def cloudformation_type(self):
def cloudformation_type():
# This must be implemented as a staticmethod with no parameters
# See for example https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-dynamodb-table.html
return "AWS::SERVICE::RESOURCE"
@abstractmethod
def create_from_cloudformation_json(self):
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
):
# This must be implemented as a classmethod with parameters:
# cls, resource_name, cloudformation_json, region_name
# Extract the resource parameters from the cloudformation json
@ -560,7 +564,9 @@ class CloudFormationModel(BaseModel):
pass
@abstractmethod
def update_from_cloudformation_json(self):
def update_from_cloudformation_json(
cls, original_resource, new_resource_name, cloudformation_json, region_name
):
# This must be implemented as a classmethod with parameters:
# cls, original_resource, new_resource_name, cloudformation_json, region_name
# Extract the resource parameters from the cloudformation json,
@ -569,7 +575,9 @@ class CloudFormationModel(BaseModel):
pass
@abstractmethod
def delete_from_cloudformation_json(self):
def delete_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
):
# This must be implemented as a classmethod with parameters:
# cls, resource_name, cloudformation_json, region_name
# Extract the resource parameters from the cloudformation json

View file

@ -53,6 +53,7 @@ class Shard(BaseModel):
self.starting_hash = starting_hash
self.ending_hash = ending_hash
self.records = OrderedDict()
self.is_open = True
@property
def shard_id(self):
@ -116,29 +117,41 @@ class Shard(BaseModel):
return r.sequence_number
def to_json(self):
return {
response = {
"HashKeyRange": {
"EndingHashKey": str(self.ending_hash),
"StartingHashKey": str(self.starting_hash),
},
"SequenceNumberRange": {
"EndingSequenceNumber": self.get_max_sequence_number(),
"StartingSequenceNumber": self.get_min_sequence_number(),
},
"ShardId": self.shard_id,
}
if not self.is_open:
response["SequenceNumberRange"][
"EndingSequenceNumber"
] = self.get_max_sequence_number()
return response
class Stream(CloudFormationModel):
def __init__(self, stream_name, shard_count, region):
def __init__(self, stream_name, shard_count, region_name):
self.stream_name = stream_name
self.shard_count = shard_count
self.creation_datetime = datetime.datetime.now()
self.region = region
self.region = region_name
self.account_number = ACCOUNT_ID
self.shards = {}
self.tags = {}
self.status = "ACTIVE"
self.shard_count = None
self.update_shard_count(shard_count)
def update_shard_count(self, shard_count):
# ToDo: This was extracted from init. It's only accurate for new streams.
# It doesn't (yet) try to accurately mimic the more complex re-sharding behavior.
# It makes the stream as if it had been created with this number of shards.
# Logically consistent, but not what AWS does.
self.shard_count = shard_count
step = 2 ** 128 // shard_count
hash_ranges = itertools.chain(
@ -146,7 +159,6 @@ class Stream(CloudFormationModel):
[(shard_count - 1, (shard_count - 1) * step, 2 ** 128)],
)
for index, start, end in hash_ranges:
shard = Shard(index, start, end)
self.shards[shard.shard_id] = shard
@ -229,10 +241,65 @@ class Stream(CloudFormationModel):
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
):
properties = cloudformation_json["Properties"]
region = properties.get("Region", "us-east-1")
properties = cloudformation_json.get("Properties", {})
shard_count = properties.get("ShardCount", 1)
return Stream(properties["Name"], shard_count, region)
name = properties.get("Name", resource_name)
backend = kinesis_backends[region_name]
return backend.create_stream(name, shard_count, region_name)
@classmethod
def update_from_cloudformation_json(
cls, original_resource, new_resource_name, cloudformation_json, region_name,
):
properties = cloudformation_json["Properties"]
if Stream.is_replacement_update(properties):
resource_name_property = cls.cloudformation_name_type()
if resource_name_property not in properties:
properties[resource_name_property] = new_resource_name
new_resource = cls.create_from_cloudformation_json(
properties[resource_name_property], cloudformation_json, region_name
)
properties[resource_name_property] = original_resource.name
cls.delete_from_cloudformation_json(
original_resource.name, cloudformation_json, region_name
)
return new_resource
else: # No Interruption
if "ShardCount" in properties:
original_resource.update_shard_count(properties["ShardCount"])
return original_resource
@classmethod
def delete_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
):
backend = kinesis_backends[region_name]
properties = cloudformation_json.get("Properties", {})
stream_name = properties.get(cls.cloudformation_name_type(), resource_name)
backend.delete_stream(stream_name)
@staticmethod
def is_replacement_update(properties):
properties_requiring_replacement_update = ["BucketName", "ObjectLockEnabled"]
return any(
[
property_requiring_replacement in properties
for property_requiring_replacement in properties_requiring_replacement_update
]
)
def get_cfn_attribute(self, attribute_name):
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException
if attribute_name == "Arn":
return self.arn
raise UnformattedGetAttTemplateException()
@property
def physical_resource_id(self):
return self.stream_name
class FirehoseRecord(BaseModel):
@ -331,10 +398,10 @@ class KinesisBackend(BaseBackend):
self.streams = OrderedDict()
self.delivery_streams = {}
def create_stream(self, stream_name, shard_count, region):
def create_stream(self, stream_name, shard_count, region_name):
if stream_name in self.streams:
raise ResourceInUseError(stream_name)
stream = Stream(stream_name, shard_count, region)
stream = Stream(stream_name, shard_count, region_name)
self.streams[stream_name] = stream
return stream