diff --git a/moto/s3/exceptions.py b/moto/s3/exceptions.py
index 24704e7e..08dd0231 100644
--- a/moto/s3/exceptions.py
+++ b/moto/s3/exceptions.py
@@ -111,3 +111,30 @@ class MalformedXML(S3ClientError):
"MalformedXML",
"The XML you provided was not well-formed or did not validate against our published schema",
*args, **kwargs)
+
+
+class MalformedACLError(S3ClientError):
+ code = 400
+
+ def __init__(self, *args, **kwargs):
+ super(MalformedACLError, self).__init__(
+ "MalformedACLError",
+ "The XML you provided was not well-formed or did not validate against our published schema",
+ *args, **kwargs)
+
+
+class InvalidTargetBucketForLogging(S3ClientError):
+ code = 400
+
+ def __init__(self, msg):
+ super(InvalidTargetBucketForLogging, self).__init__("InvalidTargetBucketForLogging", msg)
+
+
+class CrossLocationLoggingProhibitted(S3ClientError):
+ code = 403
+
+ def __init__(self):
+ super(CrossLocationLoggingProhibitted, self).__init__(
+ "CrossLocationLoggingProhibitted",
+ "Cross S3 location logging not allowed."
+ )
diff --git a/moto/s3/models.py b/moto/s3/models.py
index 91d3c1e2..7eb89531 100644
--- a/moto/s3/models.py
+++ b/moto/s3/models.py
@@ -347,6 +347,7 @@ class FakeBucket(BaseModel):
self.acl = get_canned_acl('private')
self.tags = FakeTagging()
self.cors = []
+ self.logging = {}
@property
def location(self):
@@ -422,6 +423,40 @@ class FakeBucket(BaseModel):
def tagging(self):
return self.tags
+ def set_logging(self, logging_config, bucket_backend):
+ if not logging_config:
+ self.logging = {}
+ else:
+ from moto.s3.exceptions import InvalidTargetBucketForLogging, CrossLocationLoggingProhibitted
+ # Target bucket must exist in the same account (assuming all moto buckets are in the same account):
+ if not bucket_backend.buckets.get(logging_config["TargetBucket"]):
+ raise InvalidTargetBucketForLogging("The target bucket for logging does not exist.")
+
+ # Does the target bucket have the log-delivery WRITE and READ_ACP permissions?
+ write = read_acp = False
+ for grant in bucket_backend.buckets[logging_config["TargetBucket"]].acl.grants:
+ # Must be granted to: http://acs.amazonaws.com/groups/s3/LogDelivery
+ for grantee in grant.grantees:
+ if grantee.uri == "http://acs.amazonaws.com/groups/s3/LogDelivery":
+ if "WRITE" in grant.permissions or "FULL_CONTROL" in grant.permissions:
+ write = True
+
+ if "READ_ACP" in grant.permissions or "FULL_CONTROL" in grant.permissions:
+ read_acp = True
+
+ break
+
+ if not write or not read_acp:
+ raise InvalidTargetBucketForLogging("You must give the log-delivery group WRITE and READ_ACP"
+ " permissions to the target bucket")
+
+ # Buckets must also exist within the same region:
+ if bucket_backend.buckets[logging_config["TargetBucket"]].region_name != self.region_name:
+ raise CrossLocationLoggingProhibitted()
+
+ # Checks pass -- set the logging config:
+ self.logging = logging_config
+
def set_website_configuration(self, website_configuration):
self.website_configuration = website_configuration
@@ -608,6 +643,10 @@ class S3Backend(BaseBackend):
bucket = self.get_bucket(bucket_name)
bucket.set_cors(cors_rules)
+ def put_bucket_logging(self, bucket_name, logging_config):
+ bucket = self.get_bucket(bucket_name)
+ bucket.set_logging(logging_config, self)
+
def delete_bucket_cors(self, bucket_name):
bucket = self.get_bucket(bucket_name)
bucket.delete_cors()
diff --git a/moto/s3/responses.py b/moto/s3/responses.py
index 6abb4f2d..8d2caf09 100755
--- a/moto/s3/responses.py
+++ b/moto/s3/responses.py
@@ -11,11 +11,13 @@ import xmltodict
from moto.packages.httpretty.core import HTTPrettyRequest
from moto.core.responses import _TemplateEnvironmentMixin
-from moto.s3bucket_path.utils import bucket_name_from_url as bucketpath_bucket_name_from_url, parse_key_name as bucketpath_parse_key_name, is_delete_keys as bucketpath_is_delete_keys
+from moto.s3bucket_path.utils import bucket_name_from_url as bucketpath_bucket_name_from_url, \
+ parse_key_name as bucketpath_parse_key_name, is_delete_keys as bucketpath_is_delete_keys
-
-from .exceptions import BucketAlreadyExists, S3ClientError, MissingBucket, MissingKey, InvalidPartOrder
-from .models import s3_backend, get_canned_acl, FakeGrantee, FakeGrant, FakeAcl, FakeKey, FakeTagging, FakeTagSet, FakeTag
+from .exceptions import BucketAlreadyExists, S3ClientError, MissingBucket, MissingKey, InvalidPartOrder, MalformedXML, \
+ MalformedACLError
+from .models import s3_backend, get_canned_acl, FakeGrantee, FakeGrant, FakeAcl, FakeKey, FakeTagging, FakeTagSet, \
+ FakeTag
from .utils import bucket_name_from_url, metadata_from_headers
from xml.dom import minidom
@@ -70,8 +72,9 @@ class ResponseObject(_TemplateEnvironmentMixin):
match = re.match(r'^\[(.+)\](:\d+)?$', host)
if match:
- match = re.match(r'^(((?=.*(::))(?!.*\3.+\3))\3?|[\dA-F]{1,4}:)([\dA-F]{1,4}(\3|:\b)|\2){5}(([\dA-F]{1,4}(\3|:\b|$)|\2){2}|(((2[0-4]|1\d|[1-9])?\d|25[0-5])\.?\b){4})\Z',
- match.groups()[0], re.IGNORECASE)
+ match = re.match(
+ r'^(((?=.*(::))(?!.*\3.+\3))\3?|[\dA-F]{1,4}:)([\dA-F]{1,4}(\3|:\b)|\2){5}(([\dA-F]{1,4}(\3|:\b|$)|\2){2}|(((2[0-4]|1\d|[1-9])?\d|25[0-5])\.?\b){4})\Z',
+ match.groups()[0], re.IGNORECASE)
if match:
return False
@@ -229,6 +232,13 @@ class ResponseObject(_TemplateEnvironmentMixin):
return 404, {}, template.render(bucket_name=bucket_name)
template = self.response_template(S3_BUCKET_TAGGING_RESPONSE)
return template.render(bucket=bucket)
+ elif 'logging' in querystring:
+ bucket = self.backend.get_bucket(bucket_name)
+ if not bucket.logging:
+ template = self.response_template(S3_NO_LOGGING_CONFIG)
+ return 200, {}, template.render()
+ template = self.response_template(S3_LOGGING_CONFIG)
+ return 200, {}, template.render(logging=bucket.logging)
elif "cors" in querystring:
bucket = self.backend.get_bucket(bucket_name)
if len(bucket.cors) == 0:
@@ -324,8 +334,7 @@ class ResponseObject(_TemplateEnvironmentMixin):
limit = continuation_token or start_after
result_keys = self._get_results_from_token(result_keys, limit)
- result_keys, is_truncated, \
- next_continuation_token = self._truncate_result(result_keys, max_keys)
+ result_keys, is_truncated, next_continuation_token = self._truncate_result(result_keys, max_keys)
return template.render(
bucket=bucket,
@@ -380,8 +389,11 @@ class ResponseObject(_TemplateEnvironmentMixin):
self.backend.set_bucket_policy(bucket_name, body)
return 'True'
elif 'acl' in querystring:
- # TODO: Support the XML-based ACL format
- self.backend.set_bucket_acl(bucket_name, self._acl_from_headers(request.headers))
+ # Headers are first. If not set, then look at the body (consistent with the documentation):
+ acls = self._acl_from_headers(request.headers)
+ if not acls:
+ acls = self._acl_from_xml(body)
+ self.backend.set_bucket_acl(bucket_name, acls)
return ""
elif "tagging" in querystring:
tagging = self._bucket_tagging_from_xml(body)
@@ -391,12 +403,18 @@ class ResponseObject(_TemplateEnvironmentMixin):
self.backend.set_bucket_website_configuration(bucket_name, body)
return ""
elif "cors" in querystring:
- from moto.s3.exceptions import MalformedXML
try:
self.backend.put_bucket_cors(bucket_name, self._cors_from_xml(body))
return ""
except KeyError:
raise MalformedXML()
+ elif "logging" in querystring:
+ try:
+ self.backend.put_bucket_logging(bucket_name, self._logging_from_xml(body))
+ return ""
+ except KeyError:
+ raise MalformedXML()
+
else:
if body:
try:
@@ -515,6 +533,7 @@ class ResponseObject(_TemplateEnvironmentMixin):
def toint(i):
return int(i) if i else None
+
begin, end = map(toint, rspec.split('-'))
if begin is not None: # byte range
end = last if end is None else min(end, last)
@@ -731,6 +750,58 @@ class ResponseObject(_TemplateEnvironmentMixin):
else:
return 404, response_headers, ""
+ def _acl_from_xml(self, xml):
+ parsed_xml = xmltodict.parse(xml)
+ if not parsed_xml.get("AccessControlPolicy"):
+ raise MalformedACLError()
+
+ # The owner is needed for some reason...
+ if not parsed_xml["AccessControlPolicy"].get("Owner"):
+ # TODO: Validate that the Owner is actually correct.
+ raise MalformedACLError()
+
+ # If empty, then no ACLs:
+ if parsed_xml["AccessControlPolicy"].get("AccessControlList") is None:
+ return []
+
+ if not parsed_xml["AccessControlPolicy"]["AccessControlList"].get("Grant"):
+ raise MalformedACLError()
+
+ permissions = [
+ "READ",
+ "WRITE",
+ "READ_ACP",
+ "WRITE_ACP",
+ "FULL_CONTROL"
+ ]
+
+ if not isinstance(parsed_xml["AccessControlPolicy"]["AccessControlList"]["Grant"], list):
+ parsed_xml["AccessControlPolicy"]["AccessControlList"]["Grant"] = \
+ [parsed_xml["AccessControlPolicy"]["AccessControlList"]["Grant"]]
+
+ grants = self._get_grants_from_xml(parsed_xml["AccessControlPolicy"]["AccessControlList"]["Grant"],
+ MalformedACLError, permissions)
+ return FakeAcl(grants)
+
+ def _get_grants_from_xml(self, grant_list, exception_type, permissions):
+ grants = []
+ for grant in grant_list:
+ if grant.get("Permission", "") not in permissions:
+ raise exception_type()
+
+ if grant["Grantee"].get("@xsi:type", "") not in ["CanonicalUser", "AmazonCustomerByEmail", "Group"]:
+ raise exception_type()
+
+ # TODO: Verify that the proper grantee data is supplied based on the type.
+
+ grants.append(FakeGrant(
+ [FakeGrantee(id=grant["Grantee"].get("ID", ""), display_name=grant["Grantee"].get("DisplayName", ""),
+ uri=grant["Grantee"].get("URI", ""))],
+ [grant["Permission"]])
+ )
+
+ return grants
+
def _acl_from_headers(self, headers):
canned_acl = headers.get('x-amz-acl', '')
if canned_acl:
@@ -814,6 +885,42 @@ class ResponseObject(_TemplateEnvironmentMixin):
return [parsed_xml["CORSConfiguration"]["CORSRule"]]
+ def _logging_from_xml(self, xml):
+ parsed_xml = xmltodict.parse(xml)
+
+ if not parsed_xml["BucketLoggingStatus"].get("LoggingEnabled"):
+ return {}
+
+ if not parsed_xml["BucketLoggingStatus"]["LoggingEnabled"].get("TargetBucket"):
+ raise MalformedXML()
+
+ if not parsed_xml["BucketLoggingStatus"]["LoggingEnabled"].get("TargetPrefix"):
+ parsed_xml["BucketLoggingStatus"]["LoggingEnabled"]["TargetPrefix"] = ""
+
+ # Get the ACLs:
+ if parsed_xml["BucketLoggingStatus"]["LoggingEnabled"].get("TargetGrants"):
+ permissions = [
+ "READ",
+ "WRITE",
+ "FULL_CONTROL"
+ ]
+ if not isinstance(parsed_xml["BucketLoggingStatus"]["LoggingEnabled"]["TargetGrants"]["Grant"], list):
+ target_grants = self._get_grants_from_xml(
+ [parsed_xml["BucketLoggingStatus"]["LoggingEnabled"]["TargetGrants"]["Grant"]],
+ MalformedXML,
+ permissions
+ )
+ else:
+ target_grants = self._get_grants_from_xml(
+ parsed_xml["BucketLoggingStatus"]["LoggingEnabled"]["TargetGrants"]["Grant"],
+ MalformedXML,
+ permissions
+ )
+
+ parsed_xml["BucketLoggingStatus"]["LoggingEnabled"]["TargetGrants"] = target_grants
+
+ return parsed_xml["BucketLoggingStatus"]["LoggingEnabled"]
+
def _key_response_delete(self, bucket_name, query, key_name, headers):
if query.get('uploadId'):
upload_id = query['uploadId'][0]
@@ -1322,3 +1429,37 @@ S3_NO_CORS_CONFIG = """
9Gjjt1m+cjU4OPvX9O9/8RuvnG41MRb/18Oux2o5H5MY7ISNTlXN+Dz9IG62/ILVxhAGI0qyPfg=
"""
+
+S3_LOGGING_CONFIG = """
+
+
+ {{ logging["TargetBucket"] }}
+ {{ logging["TargetPrefix"] }}
+ {% if logging.get("TargetGrants") %}
+
+ {% for grant in logging["TargetGrants"] %}
+
+
+ {% if grant.grantees[0].uri %}
+ {{ grant.grantees[0].uri }}
+ {% endif %}
+ {% if grant.grantees[0].id %}
+ {{ grant.grantees[0].id }}
+ {% endif %}
+ {% if grant.grantees[0].display_name %}
+ {{ grant.grantees[0].display_name }}
+ {% endif %}
+
+ {{ grant.permissions[0] }}
+
+ {% endfor %}
+
+ {% endif %}
+
+
+"""
+
+S3_NO_LOGGING_CONFIG = """
+
+"""
diff --git a/tests/test_s3/test_s3.py b/tests/test_s3/test_s3.py
index 829941d7..33752af6 100644
--- a/tests/test_s3/test_s3.py
+++ b/tests/test_s3/test_s3.py
@@ -50,6 +50,7 @@ def reduced_min_part_size(f):
return f(*args, **kwargs)
finally:
s3model.UPLOAD_PART_MIN_SIZE = orig_size
+
return wrapped
@@ -883,11 +884,12 @@ def test_s3_object_in_public_bucket():
s3_anonymous.Object(key='file.txt', bucket_name='test-bucket').get()
exc.exception.response['Error']['Code'].should.equal('403')
- params = {'Bucket': 'test-bucket','Key': 'file.txt'}
+ params = {'Bucket': 'test-bucket', 'Key': 'file.txt'}
presigned_url = boto3.client('s3').generate_presigned_url('get_object', params, ExpiresIn=900)
response = requests.get(presigned_url)
assert response.status_code == 200
+
@mock_s3
def test_s3_object_in_private_bucket():
s3 = boto3.resource('s3')
@@ -1102,6 +1104,7 @@ def test_boto3_key_etag():
resp = s3.get_object(Bucket='mybucket', Key='steve')
resp['ETag'].should.equal('"d32bda93738f7e03adb22e66c90fbc04"')
+
@mock_s3
def test_website_redirect_location():
s3 = boto3.client('s3', region_name='us-east-1')
@@ -1116,6 +1119,7 @@ def test_website_redirect_location():
resp = s3.get_object(Bucket='mybucket', Key='steve')
resp['WebsiteRedirectLocation'].should.equal(url)
+
@mock_s3
def test_boto3_list_keys_xml_escaped():
s3 = boto3.client('s3', region_name='us-east-1')
@@ -1627,7 +1631,7 @@ def test_boto3_put_bucket_cors():
})
e = err.exception
e.response["Error"]["Code"].should.equal("InvalidRequest")
- e.response["Error"]["Message"].should.equal("Found unsupported HTTP method in CORS config. "
+ e.response["Error"]["Message"].should.equal("Found unsupported HTTP method in CORS config. "
"Unsupported method is NOTREAL")
with assert_raises(ClientError) as err:
@@ -1732,6 +1736,249 @@ def test_boto3_delete_bucket_cors():
e.response["Error"]["Message"].should.equal("The CORS configuration does not exist")
+@mock_s3
+def test_put_bucket_acl_body():
+ s3 = boto3.client("s3", region_name="us-east-1")
+ s3.create_bucket(Bucket="bucket")
+ bucket_owner = s3.get_bucket_acl(Bucket="bucket")["Owner"]
+ s3.put_bucket_acl(Bucket="bucket", AccessControlPolicy={
+ "Grants": [
+ {
+ "Grantee": {
+ "URI": "http://acs.amazonaws.com/groups/s3/LogDelivery",
+ "Type": "Group"
+ },
+ "Permission": "WRITE"
+ },
+ {
+ "Grantee": {
+ "URI": "http://acs.amazonaws.com/groups/s3/LogDelivery",
+ "Type": "Group"
+ },
+ "Permission": "READ_ACP"
+ }
+ ],
+ "Owner": bucket_owner
+ })
+
+ result = s3.get_bucket_acl(Bucket="bucket")
+ assert len(result["Grants"]) == 2
+ for g in result["Grants"]:
+ assert g["Grantee"]["URI"] == "http://acs.amazonaws.com/groups/s3/LogDelivery"
+ assert g["Grantee"]["Type"] == "Group"
+ assert g["Permission"] in ["WRITE", "READ_ACP"]
+
+ # With one:
+ s3.put_bucket_acl(Bucket="bucket", AccessControlPolicy={
+ "Grants": [
+ {
+ "Grantee": {
+ "URI": "http://acs.amazonaws.com/groups/s3/LogDelivery",
+ "Type": "Group"
+ },
+ "Permission": "WRITE"
+ }
+ ],
+ "Owner": bucket_owner
+ })
+ result = s3.get_bucket_acl(Bucket="bucket")
+ assert len(result["Grants"]) == 1
+
+ # With no owner:
+ with assert_raises(ClientError) as err:
+ s3.put_bucket_acl(Bucket="bucket", AccessControlPolicy={
+ "Grants": [
+ {
+ "Grantee": {
+ "URI": "http://acs.amazonaws.com/groups/s3/LogDelivery",
+ "Type": "Group"
+ },
+ "Permission": "WRITE"
+ }
+ ]
+ })
+ assert err.exception.response["Error"]["Code"] == "MalformedACLError"
+
+ # With incorrect permission:
+ with assert_raises(ClientError) as err:
+ s3.put_bucket_acl(Bucket="bucket", AccessControlPolicy={
+ "Grants": [
+ {
+ "Grantee": {
+ "URI": "http://acs.amazonaws.com/groups/s3/LogDelivery",
+ "Type": "Group"
+ },
+ "Permission": "lskjflkasdjflkdsjfalisdjflkdsjf"
+ }
+ ],
+ "Owner": bucket_owner
+ })
+ assert err.exception.response["Error"]["Code"] == "MalformedACLError"
+
+ # Clear the ACLs:
+ result = s3.put_bucket_acl(Bucket="bucket", AccessControlPolicy={"Grants": [], "Owner": bucket_owner})
+ assert not result.get("Grants")
+
+
+@mock_s3
+def test_boto3_put_bucket_logging():
+ s3 = boto3.client("s3", region_name="us-east-1")
+ bucket_name = "mybucket"
+ log_bucket = "logbucket"
+ wrong_region_bucket = "wrongregionlogbucket"
+ s3.create_bucket(Bucket=bucket_name)
+ s3.create_bucket(Bucket=log_bucket) # Adding the ACL for log-delivery later...
+ s3.create_bucket(Bucket=wrong_region_bucket, CreateBucketConfiguration={"LocationConstraint": "us-west-2"})
+
+ # No logging config:
+ result = s3.get_bucket_logging(Bucket=bucket_name)
+ assert not result.get("LoggingEnabled")
+
+ # A log-bucket that doesn't exist:
+ with assert_raises(ClientError) as err:
+ s3.put_bucket_logging(Bucket=bucket_name, BucketLoggingStatus={
+ "LoggingEnabled": {
+ "TargetBucket": "IAMNOTREAL",
+ "TargetPrefix": ""
+ }
+ })
+ assert err.exception.response["Error"]["Code"] == "InvalidTargetBucketForLogging"
+
+ # A log-bucket that's missing the proper ACLs for LogDelivery:
+ with assert_raises(ClientError) as err:
+ s3.put_bucket_logging(Bucket=bucket_name, BucketLoggingStatus={
+ "LoggingEnabled": {
+ "TargetBucket": log_bucket,
+ "TargetPrefix": ""
+ }
+ })
+ assert err.exception.response["Error"]["Code"] == "InvalidTargetBucketForLogging"
+ assert "log-delivery" in err.exception.response["Error"]["Message"]
+
+ # Add the proper "log-delivery" ACL to the log buckets:
+ bucket_owner = s3.get_bucket_acl(Bucket=log_bucket)["Owner"]
+ for bucket in [log_bucket, wrong_region_bucket]:
+ s3.put_bucket_acl(Bucket=bucket, AccessControlPolicy={
+ "Grants": [
+ {
+ "Grantee": {
+ "URI": "http://acs.amazonaws.com/groups/s3/LogDelivery",
+ "Type": "Group"
+ },
+ "Permission": "WRITE"
+ },
+ {
+ "Grantee": {
+ "URI": "http://acs.amazonaws.com/groups/s3/LogDelivery",
+ "Type": "Group"
+ },
+ "Permission": "READ_ACP"
+ },
+ {
+ "Grantee": {
+ "Type": "CanonicalUser",
+ "ID": bucket_owner["ID"]
+ },
+ "Permission": "FULL_CONTROL"
+ }
+ ],
+ "Owner": bucket_owner
+ })
+
+ # A log-bucket that's in the wrong region:
+ with assert_raises(ClientError) as err:
+ s3.put_bucket_logging(Bucket=bucket_name, BucketLoggingStatus={
+ "LoggingEnabled": {
+ "TargetBucket": wrong_region_bucket,
+ "TargetPrefix": ""
+ }
+ })
+ assert err.exception.response["Error"]["Code"] == "CrossLocationLoggingProhibitted"
+
+ # Correct logging:
+ s3.put_bucket_logging(Bucket=bucket_name, BucketLoggingStatus={
+ "LoggingEnabled": {
+ "TargetBucket": log_bucket,
+ "TargetPrefix": "{}/".format(bucket_name)
+ }
+ })
+ result = s3.get_bucket_logging(Bucket=bucket_name)
+ assert result["LoggingEnabled"]["TargetBucket"] == log_bucket
+ assert result["LoggingEnabled"]["TargetPrefix"] == "{}/".format(bucket_name)
+ assert not result["LoggingEnabled"].get("TargetGrants")
+
+ # And disabling:
+ s3.put_bucket_logging(Bucket=bucket_name, BucketLoggingStatus={})
+ assert not s3.get_bucket_logging(Bucket=bucket_name).get("LoggingEnabled")
+
+ # And enabling with multiple target grants:
+ s3.put_bucket_logging(Bucket=bucket_name, BucketLoggingStatus={
+ "LoggingEnabled": {
+ "TargetBucket": log_bucket,
+ "TargetPrefix": "{}/".format(bucket_name),
+ "TargetGrants": [
+ {
+ "Grantee": {
+ "ID": "SOMEIDSTRINGHERE9238748923734823917498237489237409123840983274",
+ "Type": "CanonicalUser"
+ },
+ "Permission": "READ"
+ },
+ {
+ "Grantee": {
+ "ID": "SOMEIDSTRINGHERE9238748923734823917498237489237409123840983274",
+ "Type": "CanonicalUser"
+ },
+ "Permission": "WRITE"
+ }
+ ]
+ }
+ })
+
+ result = s3.get_bucket_logging(Bucket=bucket_name)
+ assert len(result["LoggingEnabled"]["TargetGrants"]) == 2
+ assert result["LoggingEnabled"]["TargetGrants"][0]["Grantee"]["ID"] == \
+ "SOMEIDSTRINGHERE9238748923734823917498237489237409123840983274"
+
+ # Test with just 1 grant:
+ s3.put_bucket_logging(Bucket=bucket_name, BucketLoggingStatus={
+ "LoggingEnabled": {
+ "TargetBucket": log_bucket,
+ "TargetPrefix": "{}/".format(bucket_name),
+ "TargetGrants": [
+ {
+ "Grantee": {
+ "ID": "SOMEIDSTRINGHERE9238748923734823917498237489237409123840983274",
+ "Type": "CanonicalUser"
+ },
+ "Permission": "READ"
+ }
+ ]
+ }
+ })
+ result = s3.get_bucket_logging(Bucket=bucket_name)
+ assert len(result["LoggingEnabled"]["TargetGrants"]) == 1
+
+ # With an invalid grant:
+ with assert_raises(ClientError) as err:
+ s3.put_bucket_logging(Bucket=bucket_name, BucketLoggingStatus={
+ "LoggingEnabled": {
+ "TargetBucket": log_bucket,
+ "TargetPrefix": "{}/".format(bucket_name),
+ "TargetGrants": [
+ {
+ "Grantee": {
+ "ID": "SOMEIDSTRINGHERE9238748923734823917498237489237409123840983274",
+ "Type": "CanonicalUser"
+ },
+ "Permission": "NOTAREALPERM"
+ }
+ ]
+ }
+ })
+ assert err.exception.response["Error"]["Code"] == "MalformedXML"
+
+
@mock_s3
def test_boto3_put_object_tagging():
s3 = boto3.client('s3', region_name='us-east-1')
@@ -1939,11 +2186,10 @@ def test_get_stream_gzipped():
Bucket='moto-tests',
Key='keyname',
)
- res = zlib.decompress(obj['Body'].read(), 16+zlib.MAX_WBITS)
+ res = zlib.decompress(obj['Body'].read(), 16 + zlib.MAX_WBITS)
assert res == payload
-
TEST_XML = """\