Implemented S3 Public Access Block

This commit is contained in:
Mike Grima 2019-12-09 17:38:26 -08:00
commit 84ccdbd1cd
7 changed files with 353 additions and 33 deletions

View file

@ -1,5 +1,8 @@
from __future__ import unicode_literals
import copy
import sys
import sure # noqa
from freezegun import freeze_time
@ -7,6 +10,7 @@ from moto.core.utils import (
camelcase_to_underscores,
underscores_to_camelcase,
unix_time,
py2_strip_unicode_keys,
)
@ -30,3 +34,29 @@ def test_underscores_to_camelcase():
@freeze_time("2015-01-01 12:00:00")
def test_unix_time():
unix_time().should.equal(1420113600.0)
if sys.version_info[0] < 3:
# Tests for unicode removals (Python 2 only)
def _verify_no_unicode(blob):
"""Verify that no unicode values exist"""
if type(blob) == dict:
for key, value in blob.items():
assert type(key) != unicode
_verify_no_unicode(value)
elif type(blob) in [list, set]:
for item in blob:
_verify_no_unicode(item)
assert blob != unicode
def test_py2_strip_unicode_keys():
bad_dict = {
"some": "value",
"a": {"nested": ["List", "of", {"unicode": "values"}]},
"and a": {"nested", "set", "of", 5, "values"},
}
result = py2_strip_unicode_keys(copy.deepcopy(bad_dict))
_verify_no_unicode(result)

View file

@ -32,9 +32,10 @@ from nose.tools import assert_raises
import sure # noqa
from moto import settings, mock_s3, mock_s3_deprecated
from moto import settings, mock_s3, mock_s3_deprecated, mock_config
import moto.s3.models as s3model
from moto.core.exceptions import InvalidNextTokenException
from moto.core.utils import py2_strip_unicode_keys
if settings.TEST_SERVER_MODE:
REDUCED_PART_SIZE = s3model.UPLOAD_PART_MIN_SIZE
@ -3278,6 +3279,148 @@ def test_delete_objects_with_url_encoded_key(key):
assert_deleted()
@mock_s3
@mock_config
def test_public_access_block():
client = boto3.client("s3")
client.create_bucket(Bucket="mybucket")
# Try to get the public access block (should not exist by default)
with assert_raises(ClientError) as ce:
client.get_public_access_block(Bucket="mybucket")
assert (
ce.exception.response["Error"]["Code"] == "NoSuchPublicAccessBlockConfiguration"
)
assert (
ce.exception.response["Error"]["Message"]
== "The public access block configuration was not found"
)
assert ce.exception.response["ResponseMetadata"]["HTTPStatusCode"] == 404
# Put a public block in place:
test_map = {
"BlockPublicAcls": False,
"IgnorePublicAcls": False,
"BlockPublicPolicy": False,
"RestrictPublicBuckets": False,
}
for field in test_map.keys():
# Toggle:
test_map[field] = True
client.put_public_access_block(
Bucket="mybucket", PublicAccessBlockConfiguration=test_map
)
# Test:
assert (
test_map
== client.get_public_access_block(Bucket="mybucket")[
"PublicAccessBlockConfiguration"
]
)
# Assume missing values are default False:
client.put_public_access_block(
Bucket="mybucket", PublicAccessBlockConfiguration={"BlockPublicAcls": True}
)
assert client.get_public_access_block(Bucket="mybucket")[
"PublicAccessBlockConfiguration"
] == {
"BlockPublicAcls": True,
"IgnorePublicAcls": False,
"BlockPublicPolicy": False,
"RestrictPublicBuckets": False,
}
# Test with a blank PublicAccessBlockConfiguration:
with assert_raises(ClientError) as ce:
client.put_public_access_block(
Bucket="mybucket", PublicAccessBlockConfiguration={}
)
assert ce.exception.response["Error"]["Code"] == "InvalidRequest"
assert (
ce.exception.response["Error"]["Message"]
== "Must specify at least one configuration."
)
assert ce.exception.response["ResponseMetadata"]["HTTPStatusCode"] == 400
# Test that things work with AWS Config:
config_client = boto3.client("config", region_name="us-east-1")
result = config_client.get_resource_config_history(
resourceType="AWS::S3::Bucket", resourceId="mybucket"
)
pub_block_config = json.loads(
result["configurationItems"][0]["supplementaryConfiguration"][
"PublicAccessBlockConfiguration"
]
)
assert pub_block_config == {
"blockPublicAcls": True,
"ignorePublicAcls": False,
"blockPublicPolicy": False,
"restrictPublicBuckets": False,
}
# Delete:
client.delete_public_access_block(Bucket="mybucket")
with assert_raises(ClientError) as ce:
client.get_public_access_block(Bucket="mybucket")
assert (
ce.exception.response["Error"]["Code"] == "NoSuchPublicAccessBlockConfiguration"
)
@mock_s3
def test_s3_public_access_block_to_config_dict():
from moto.s3.config import s3_config_query
# With 1 bucket in us-west-2:
s3_config_query.backends["global"].create_bucket("bucket1", "us-west-2")
public_access_block = {
"BlockPublicAcls": "True",
"IgnorePublicAcls": "False",
"BlockPublicPolicy": "True",
"RestrictPublicBuckets": "False",
}
# Python 2 unicode issues:
if sys.version_info[0] < 3:
public_access_block = py2_strip_unicode_keys(public_access_block)
# Add a public access block:
s3_config_query.backends["global"].put_bucket_public_access_block(
"bucket1", public_access_block
)
result = (
s3_config_query.backends["global"]
.buckets["bucket1"]
.public_access_block.to_config_dict()
)
convert_bool = lambda x: x == "True"
for key, value in public_access_block.items():
assert result[
"{lowercase}{rest}".format(lowercase=key[0].lower(), rest=key[1:])
] == convert_bool(value)
# Verify that this resides in the full bucket's to_config_dict:
full_result = s3_config_query.backends["global"].buckets["bucket1"].to_config_dict()
assert (
json.loads(
full_result["supplementaryConfiguration"]["PublicAccessBlockConfiguration"]
)
== result
)
@mock_s3
def test_list_config_discovered_resources():
from moto.s3.config import s3_config_query