diff --git a/moto/s3/models.py b/moto/s3/models.py
index 3020fd45..25ead4f5 100644
--- a/moto/s3/models.py
+++ b/moto/s3/models.py
@@ -778,6 +778,7 @@ class FakeBucket(BaseModel):
self.payer = "BucketOwner"
self.creation_date = datetime.datetime.utcnow()
self.public_access_block = None
+ self.encryption = None
@property
def location(self):
@@ -1227,6 +1228,9 @@ class S3Backend(BaseBackend):
def get_bucket_versioning(self, bucket_name):
return self.get_bucket(bucket_name).versioning_status
+ def get_bucket_encryption(self, bucket_name):
+ return self.get_bucket(bucket_name).encryption
+
def get_bucket_latest_versions(self, bucket_name):
versions = self.get_bucket_versions(bucket_name)
latest_modified_per_key = {}
@@ -1275,6 +1279,12 @@ class S3Backend(BaseBackend):
bucket = self.get_bucket(bucket_name)
bucket.policy = None
+ def put_bucket_encryption(self, bucket_name, encryption):
+ self.get_bucket(bucket_name).encryption = encryption
+
+ def delete_bucket_encryption(self, bucket_name):
+ self.get_bucket(bucket_name).encryption = None
+
def set_bucket_lifecycle(self, bucket_name, rules):
bucket = self.get_bucket(bucket_name)
bucket.set_lifecycle(rules)
diff --git a/moto/s3/responses.py b/moto/s3/responses.py
index 98f28f01..4aaba1fc 100644
--- a/moto/s3/responses.py
+++ b/moto/s3/responses.py
@@ -466,6 +466,13 @@ class ResponseObject(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
is_truncated="false",
),
)
+ elif "encryption" in querystring:
+ encryption = self.backend.get_bucket_encryption(bucket_name)
+ if not encryption:
+ template = self.response_template(S3_NO_ENCRYPTION)
+ return 404, {}, template.render(bucket_name=bucket_name)
+ template = self.response_template(S3_ENCRYPTION_CONFIG)
+ return 200, {}, template.render(encryption=encryption)
elif querystring.get("list-type", [None])[0] == "2":
return 200, {}, self._handle_list_objects_v2(bucket_name, querystring)
@@ -703,7 +710,16 @@ class ResponseObject(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
bucket_name, pab_config["PublicAccessBlockConfiguration"]
)
return ""
-
+ elif "encryption" in querystring:
+ try:
+ self.backend.put_bucket_encryption(
+ bucket_name, self._encryption_config_from_xml(body)
+ )
+ return ""
+ except KeyError:
+ raise MalformedXML()
+ except Exception as e:
+ raise e
else:
# us-east-1, the default AWS region behaves a bit differently
# - you should not use it as a location constraint --> it fails
@@ -768,6 +784,9 @@ class ResponseObject(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
elif "publicAccessBlock" in querystring:
self.backend.delete_bucket_public_access_block(bucket_name)
return 204, {}, ""
+ elif "encryption" in querystring:
+ bucket = self.backend.delete_bucket_encryption(bucket_name)
+ return 204, {}, ""
removed_bucket = self.backend.delete_bucket(bucket_name)
@@ -1427,6 +1446,22 @@ class ResponseObject(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
return [parsed_xml["CORSConfiguration"]["CORSRule"]]
+ def _encryption_config_from_xml(self, xml):
+ parsed_xml = xmltodict.parse(xml)
+
+ if (
+ not parsed_xml["ServerSideEncryptionConfiguration"].get("Rule")
+ or not parsed_xml["ServerSideEncryptionConfiguration"]["Rule"].get(
+ "ApplyServerSideEncryptionByDefault"
+ )
+ or not parsed_xml["ServerSideEncryptionConfiguration"]["Rule"][
+ "ApplyServerSideEncryptionByDefault"
+ ].get("SSEAlgorithm")
+ ):
+ raise MalformedXML()
+
+ return [parsed_xml["ServerSideEncryptionConfiguration"]]
+
def _logging_from_xml(self, xml):
parsed_xml = xmltodict.parse(xml)
@@ -2130,6 +2165,31 @@ S3_NO_LOGGING_CONFIG = """
"""
+S3_ENCRYPTION_CONFIG = """
+
+ {% for entry in encryption %}
+
+
+ {{ entry["Rule"]["ApplyServerSideEncryptionByDefault"]["SSEAlgorithm"] }}
+ {% if entry["Rule"]["ApplyServerSideEncryptionByDefault"].get("KMSMasterKeyID") %}
+ {{ entry["Rule"]["ApplyServerSideEncryptionByDefault"]["KMSMasterKeyID"] }}
+ {% endif %}
+
+
+ {% endfor %}
+
+"""
+
+S3_NO_ENCRYPTION = """
+
+ ServerSideEncryptionConfigurationNotFoundError
+ The server side encryption configuration was not found
+ {{ bucket_name }}
+ 0D68A23BB2E2215B
+ 9Gjjt1m+cjU4OPvX9O9/8RuvnG41MRb/18Oux2o5H5MY7ISNTlXN+Dz9IG62/ILVxhAGI0qyPfg=
+
+"""
+
S3_GET_BUCKET_NOTIFICATION_CONFIG = """
{% for topic in bucket.notification_configuration.topic %}
diff --git a/tests/test_s3/test_s3.py b/tests/test_s3/test_s3.py
index bcb9da87..363ccc02 100644
--- a/tests/test_s3/test_s3.py
+++ b/tests/test_s3/test_s3.py
@@ -4521,3 +4521,36 @@ def test_creating_presigned_post():
].read()
== fdata
)
+
+
+@mock_s3
+def test_encryption():
+ # Create Bucket so that test can run
+ conn = boto3.client("s3", region_name="us-east-1")
+ conn.create_bucket(Bucket="mybucket")
+
+ with assert_raises(ClientError) as exc:
+ conn.get_bucket_encryption(Bucket="mybucket")
+
+ sse_config = {
+ "Rules": [
+ {
+ "ApplyServerSideEncryptionByDefault": {
+ "SSEAlgorithm": "aws:kms",
+ "KMSMasterKeyID": "12345678",
+ }
+ }
+ ]
+ }
+
+ conn.put_bucket_encryption(
+ Bucket="mybucket", ServerSideEncryptionConfiguration=sse_config
+ )
+
+ resp = conn.get_bucket_encryption(Bucket="mybucket")
+ assert "ServerSideEncryptionConfiguration" in resp
+ assert resp["ServerSideEncryptionConfiguration"] == sse_config
+
+ conn.delete_bucket_encryption(Bucket="mybucket")
+ with assert_raises(ClientError) as exc:
+ conn.get_bucket_encryption(Bucket="mybucket")