S3 cloudformation update (#3199)
* First cut of S3 Cloudformation Update support: encryption property. * Update type support for S3. Abstract base class for CloudFormation-aware models, as designed by @bblommers, introduced to decentralize CloudFormation resource and name property values to model objects. * Blackened... * Un-renamed param in s3.models.update_from_cloudformation_json() and its call to stay compatible with other modules. Co-authored-by: Bert Blommers <bblommers@users.noreply.github.com> Co-authored-by: Joseph Weitekamp <jweite@amazon.com> Co-authored-by: Bert Blommers <info@bertblommers.nl>
This commit is contained in:
parent
06ed67a8e5
commit
3342d49a43
5 changed files with 241 additions and 19 deletions
33
moto/s3/cloud_formation.py
Normal file
33
moto/s3/cloud_formation.py
Normal file
|
|
@ -0,0 +1,33 @@
|
|||
from collections import OrderedDict
|
||||
|
||||
|
||||
def cfn_to_api_encryption(bucket_encryption_properties):
|
||||
|
||||
sse_algorithm = bucket_encryption_properties["ServerSideEncryptionConfiguration"][
|
||||
0
|
||||
]["ServerSideEncryptionByDefault"]["SSEAlgorithm"]
|
||||
kms_master_key_id = bucket_encryption_properties[
|
||||
"ServerSideEncryptionConfiguration"
|
||||
][0]["ServerSideEncryptionByDefault"].get("KMSMasterKeyID")
|
||||
apply_server_side_encryption_by_default = OrderedDict()
|
||||
apply_server_side_encryption_by_default["SSEAlgorithm"] = sse_algorithm
|
||||
if kms_master_key_id:
|
||||
apply_server_side_encryption_by_default["KMSMasterKeyID"] = kms_master_key_id
|
||||
rule = OrderedDict(
|
||||
{"ApplyServerSideEncryptionByDefault": apply_server_side_encryption_by_default}
|
||||
)
|
||||
bucket_encryption = OrderedDict(
|
||||
{"@xmlns": "http://s3.amazonaws.com/doc/2006-03-01/"}
|
||||
)
|
||||
bucket_encryption["Rule"] = rule
|
||||
return bucket_encryption
|
||||
|
||||
|
||||
def is_replacement_update(properties):
|
||||
properties_requiring_replacement_update = ["BucketName", "ObjectLockEnabled"]
|
||||
return any(
|
||||
[
|
||||
property_requiring_replacement in properties
|
||||
for property_requiring_replacement in properties_requiring_replacement_update
|
||||
]
|
||||
)
|
||||
|
|
@ -43,6 +43,7 @@ from .exceptions import (
|
|||
WrongPublicAccessBlockAccountIdError,
|
||||
NoSuchUpload,
|
||||
)
|
||||
from .cloud_formation import cfn_to_api_encryption, is_replacement_update
|
||||
from .utils import clean_key_name, _VersionedKeyStore
|
||||
|
||||
MAX_BUCKET_NAME_LENGTH = 63
|
||||
|
|
@ -1084,8 +1085,54 @@ class FakeBucket(CloudFormationModel):
|
|||
cls, resource_name, cloudformation_json, region_name
|
||||
):
|
||||
bucket = s3_backend.create_bucket(resource_name, region_name)
|
||||
|
||||
properties = cloudformation_json["Properties"]
|
||||
|
||||
if "BucketEncryption" in properties:
|
||||
bucket_encryption = cfn_to_api_encryption(properties["BucketEncryption"])
|
||||
s3_backend.put_bucket_encryption(
|
||||
bucket_name=resource_name, encryption=[bucket_encryption]
|
||||
)
|
||||
|
||||
return bucket
|
||||
|
||||
@classmethod
|
||||
def update_from_cloudformation_json(
|
||||
cls, original_resource, new_resource_name, cloudformation_json, region_name,
|
||||
):
|
||||
properties = cloudformation_json["Properties"]
|
||||
|
||||
if is_replacement_update(properties):
|
||||
resource_name_property = cls.cloudformation_name_type()
|
||||
if resource_name_property not in properties:
|
||||
properties[resource_name_property] = new_resource_name
|
||||
new_resource = cls.create_from_cloudformation_json(
|
||||
properties[resource_name_property], cloudformation_json, region_name
|
||||
)
|
||||
properties[resource_name_property] = original_resource.name
|
||||
cls.delete_from_cloudformation_json(
|
||||
original_resource.name, cloudformation_json, region_name
|
||||
)
|
||||
return new_resource
|
||||
|
||||
else: # No Interruption
|
||||
if "BucketEncryption" in properties:
|
||||
bucket_encryption = cfn_to_api_encryption(
|
||||
properties["BucketEncryption"]
|
||||
)
|
||||
s3_backend.put_bucket_encryption(
|
||||
bucket_name=original_resource.name, encryption=[bucket_encryption]
|
||||
)
|
||||
return original_resource
|
||||
|
||||
@classmethod
|
||||
def delete_from_cloudformation_json(
|
||||
cls, resource_name, cloudformation_json, region_name
|
||||
):
|
||||
properties = cloudformation_json["Properties"]
|
||||
bucket_name = properties[cls.cloudformation_name_type()]
|
||||
s3_backend.delete_bucket(bucket_name)
|
||||
|
||||
def to_config_dict(self):
|
||||
"""Return the AWS Config JSON format of this S3 bucket.
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue