Merge pull request #2751 from mikegrima/s3control
Implemented S3 Account-level public access block.
This commit is contained in:
commit
a1ffb47ae4
11 changed files with 745 additions and 50 deletions
|
|
@ -46,4 +46,4 @@ def test_domain_dispatched_with_service():
|
|||
dispatcher = DomainDispatcherApplication(create_backend_app, service="s3")
|
||||
backend_app = dispatcher.get_application({"HTTP_HOST": "s3.us-east1.amazonaws.com"})
|
||||
keys = set(backend_app.view_functions.keys())
|
||||
keys.should.contain("ResponseObject.key_response")
|
||||
keys.should.contain("ResponseObject.key_or_control_response")
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ import datetime
|
|||
import os
|
||||
import sys
|
||||
|
||||
from boto3 import Session
|
||||
from six.moves.urllib.request import urlopen
|
||||
from six.moves.urllib.error import HTTPError
|
||||
from functools import wraps
|
||||
|
|
@ -1135,6 +1136,380 @@ if not settings.TEST_SERVER_MODE:
|
|||
"The unspecified location constraint is incompatible for the region specific endpoint this request was sent to."
|
||||
)
|
||||
|
||||
# All tests for s3-control cannot be run under the server without a modification of the
|
||||
# hosts file on your system. This is due to the fact that the URL to the host is in the form of:
|
||||
# ACCOUNT_ID.s3-control.amazonaws.com <-- That Account ID part is the problem. If you want to
|
||||
# make use of the moto server, update your hosts file for `THE_ACCOUNT_ID_FOR_MOTO.localhost`
|
||||
# and this will work fine.
|
||||
|
||||
@mock_s3
|
||||
def test_get_public_access_block_for_account():
|
||||
from moto.s3.models import ACCOUNT_ID
|
||||
|
||||
client = boto3.client("s3control", region_name="us-west-2")
|
||||
|
||||
# With an invalid account ID:
|
||||
with assert_raises(ClientError) as ce:
|
||||
client.get_public_access_block(AccountId="111111111111")
|
||||
assert ce.exception.response["Error"]["Code"] == "AccessDenied"
|
||||
|
||||
# Without one defined:
|
||||
with assert_raises(ClientError) as ce:
|
||||
client.get_public_access_block(AccountId=ACCOUNT_ID)
|
||||
assert (
|
||||
ce.exception.response["Error"]["Code"]
|
||||
== "NoSuchPublicAccessBlockConfiguration"
|
||||
)
|
||||
|
||||
# Put a with an invalid account ID:
|
||||
with assert_raises(ClientError) as ce:
|
||||
client.put_public_access_block(
|
||||
AccountId="111111111111",
|
||||
PublicAccessBlockConfiguration={"BlockPublicAcls": True},
|
||||
)
|
||||
assert ce.exception.response["Error"]["Code"] == "AccessDenied"
|
||||
|
||||
# Put with an invalid PAB:
|
||||
with assert_raises(ClientError) as ce:
|
||||
client.put_public_access_block(
|
||||
AccountId=ACCOUNT_ID, PublicAccessBlockConfiguration={}
|
||||
)
|
||||
assert ce.exception.response["Error"]["Code"] == "InvalidRequest"
|
||||
assert (
|
||||
"Must specify at least one configuration."
|
||||
in ce.exception.response["Error"]["Message"]
|
||||
)
|
||||
|
||||
# Correct PAB:
|
||||
client.put_public_access_block(
|
||||
AccountId=ACCOUNT_ID,
|
||||
PublicAccessBlockConfiguration={
|
||||
"BlockPublicAcls": True,
|
||||
"IgnorePublicAcls": True,
|
||||
"BlockPublicPolicy": True,
|
||||
"RestrictPublicBuckets": True,
|
||||
},
|
||||
)
|
||||
|
||||
# Get the correct PAB (for all regions):
|
||||
for region in Session().get_available_regions("s3control"):
|
||||
region_client = boto3.client("s3control", region_name=region)
|
||||
assert region_client.get_public_access_block(AccountId=ACCOUNT_ID)[
|
||||
"PublicAccessBlockConfiguration"
|
||||
] == {
|
||||
"BlockPublicAcls": True,
|
||||
"IgnorePublicAcls": True,
|
||||
"BlockPublicPolicy": True,
|
||||
"RestrictPublicBuckets": True,
|
||||
}
|
||||
|
||||
# Delete with an invalid account ID:
|
||||
with assert_raises(ClientError) as ce:
|
||||
client.delete_public_access_block(AccountId="111111111111")
|
||||
assert ce.exception.response["Error"]["Code"] == "AccessDenied"
|
||||
|
||||
# Delete successfully:
|
||||
client.delete_public_access_block(AccountId=ACCOUNT_ID)
|
||||
|
||||
# Confirm that it's deleted:
|
||||
with assert_raises(ClientError) as ce:
|
||||
client.get_public_access_block(AccountId=ACCOUNT_ID)
|
||||
assert (
|
||||
ce.exception.response["Error"]["Code"]
|
||||
== "NoSuchPublicAccessBlockConfiguration"
|
||||
)
|
||||
|
||||
@mock_s3
|
||||
@mock_config
|
||||
def test_config_list_account_pab():
|
||||
from moto.s3.models import ACCOUNT_ID
|
||||
|
||||
client = boto3.client("s3control", region_name="us-west-2")
|
||||
config_client = boto3.client("config", region_name="us-west-2")
|
||||
|
||||
# Create the aggregator:
|
||||
account_aggregation_source = {
|
||||
"AccountIds": [ACCOUNT_ID],
|
||||
"AllAwsRegions": True,
|
||||
}
|
||||
config_client.put_configuration_aggregator(
|
||||
ConfigurationAggregatorName="testing",
|
||||
AccountAggregationSources=[account_aggregation_source],
|
||||
)
|
||||
|
||||
# Without a PAB in place:
|
||||
result = config_client.list_discovered_resources(
|
||||
resourceType="AWS::S3::AccountPublicAccessBlock"
|
||||
)
|
||||
assert not result["resourceIdentifiers"]
|
||||
result = config_client.list_aggregate_discovered_resources(
|
||||
ResourceType="AWS::S3::AccountPublicAccessBlock",
|
||||
ConfigurationAggregatorName="testing",
|
||||
)
|
||||
assert not result["ResourceIdentifiers"]
|
||||
|
||||
# Create a PAB:
|
||||
client.put_public_access_block(
|
||||
AccountId=ACCOUNT_ID,
|
||||
PublicAccessBlockConfiguration={
|
||||
"BlockPublicAcls": True,
|
||||
"IgnorePublicAcls": True,
|
||||
"BlockPublicPolicy": True,
|
||||
"RestrictPublicBuckets": True,
|
||||
},
|
||||
)
|
||||
|
||||
# Test that successful queries work (non-aggregated):
|
||||
result = config_client.list_discovered_resources(
|
||||
resourceType="AWS::S3::AccountPublicAccessBlock"
|
||||
)
|
||||
assert result["resourceIdentifiers"] == [
|
||||
{
|
||||
"resourceType": "AWS::S3::AccountPublicAccessBlock",
|
||||
"resourceId": ACCOUNT_ID,
|
||||
}
|
||||
]
|
||||
result = config_client.list_discovered_resources(
|
||||
resourceType="AWS::S3::AccountPublicAccessBlock",
|
||||
resourceIds=[ACCOUNT_ID, "nope"],
|
||||
)
|
||||
assert result["resourceIdentifiers"] == [
|
||||
{
|
||||
"resourceType": "AWS::S3::AccountPublicAccessBlock",
|
||||
"resourceId": ACCOUNT_ID,
|
||||
}
|
||||
]
|
||||
result = config_client.list_discovered_resources(
|
||||
resourceType="AWS::S3::AccountPublicAccessBlock", resourceName=""
|
||||
)
|
||||
assert result["resourceIdentifiers"] == [
|
||||
{
|
||||
"resourceType": "AWS::S3::AccountPublicAccessBlock",
|
||||
"resourceId": ACCOUNT_ID,
|
||||
}
|
||||
]
|
||||
|
||||
# Test that successful queries work (aggregated):
|
||||
result = config_client.list_aggregate_discovered_resources(
|
||||
ResourceType="AWS::S3::AccountPublicAccessBlock",
|
||||
ConfigurationAggregatorName="testing",
|
||||
)
|
||||
regions = {region for region in Session().get_available_regions("config")}
|
||||
for r in result["ResourceIdentifiers"]:
|
||||
regions.remove(r.pop("SourceRegion"))
|
||||
assert r == {
|
||||
"ResourceType": "AWS::S3::AccountPublicAccessBlock",
|
||||
"SourceAccountId": ACCOUNT_ID,
|
||||
"ResourceId": ACCOUNT_ID,
|
||||
}
|
||||
|
||||
# Just check that the len is the same -- this should be reasonable
|
||||
regions = {region for region in Session().get_available_regions("config")}
|
||||
result = config_client.list_aggregate_discovered_resources(
|
||||
ResourceType="AWS::S3::AccountPublicAccessBlock",
|
||||
ConfigurationAggregatorName="testing",
|
||||
Filters={"ResourceName": ""},
|
||||
)
|
||||
assert len(regions) == len(result["ResourceIdentifiers"])
|
||||
result = config_client.list_aggregate_discovered_resources(
|
||||
ResourceType="AWS::S3::AccountPublicAccessBlock",
|
||||
ConfigurationAggregatorName="testing",
|
||||
Filters={"ResourceName": "", "ResourceId": ACCOUNT_ID},
|
||||
)
|
||||
assert len(regions) == len(result["ResourceIdentifiers"])
|
||||
result = config_client.list_aggregate_discovered_resources(
|
||||
ResourceType="AWS::S3::AccountPublicAccessBlock",
|
||||
ConfigurationAggregatorName="testing",
|
||||
Filters={
|
||||
"ResourceName": "",
|
||||
"ResourceId": ACCOUNT_ID,
|
||||
"Region": "us-west-2",
|
||||
},
|
||||
)
|
||||
assert (
|
||||
result["ResourceIdentifiers"][0]["SourceRegion"] == "us-west-2"
|
||||
and len(result["ResourceIdentifiers"]) == 1
|
||||
)
|
||||
|
||||
# Test aggregator pagination:
|
||||
result = config_client.list_aggregate_discovered_resources(
|
||||
ResourceType="AWS::S3::AccountPublicAccessBlock",
|
||||
ConfigurationAggregatorName="testing",
|
||||
Limit=1,
|
||||
)
|
||||
regions = sorted(
|
||||
[region for region in Session().get_available_regions("config")]
|
||||
)
|
||||
assert result["ResourceIdentifiers"][0] == {
|
||||
"ResourceType": "AWS::S3::AccountPublicAccessBlock",
|
||||
"SourceAccountId": ACCOUNT_ID,
|
||||
"ResourceId": ACCOUNT_ID,
|
||||
"SourceRegion": regions[0],
|
||||
}
|
||||
assert result["NextToken"] == regions[1]
|
||||
|
||||
# Get the next region:
|
||||
result = config_client.list_aggregate_discovered_resources(
|
||||
ResourceType="AWS::S3::AccountPublicAccessBlock",
|
||||
ConfigurationAggregatorName="testing",
|
||||
Limit=1,
|
||||
NextToken=regions[1],
|
||||
)
|
||||
assert result["ResourceIdentifiers"][0] == {
|
||||
"ResourceType": "AWS::S3::AccountPublicAccessBlock",
|
||||
"SourceAccountId": ACCOUNT_ID,
|
||||
"ResourceId": ACCOUNT_ID,
|
||||
"SourceRegion": regions[1],
|
||||
}
|
||||
|
||||
# Non-aggregated with incorrect info:
|
||||
result = config_client.list_discovered_resources(
|
||||
resourceType="AWS::S3::AccountPublicAccessBlock", resourceName="nope"
|
||||
)
|
||||
assert not result["resourceIdentifiers"]
|
||||
result = config_client.list_discovered_resources(
|
||||
resourceType="AWS::S3::AccountPublicAccessBlock", resourceIds=["nope"]
|
||||
)
|
||||
assert not result["resourceIdentifiers"]
|
||||
|
||||
# Aggregated with incorrect info:
|
||||
result = config_client.list_aggregate_discovered_resources(
|
||||
ResourceType="AWS::S3::AccountPublicAccessBlock",
|
||||
ConfigurationAggregatorName="testing",
|
||||
Filters={"ResourceName": "nope"},
|
||||
)
|
||||
assert not result["ResourceIdentifiers"]
|
||||
result = config_client.list_aggregate_discovered_resources(
|
||||
ResourceType="AWS::S3::AccountPublicAccessBlock",
|
||||
ConfigurationAggregatorName="testing",
|
||||
Filters={"ResourceId": "nope"},
|
||||
)
|
||||
assert not result["ResourceIdentifiers"]
|
||||
result = config_client.list_aggregate_discovered_resources(
|
||||
ResourceType="AWS::S3::AccountPublicAccessBlock",
|
||||
ConfigurationAggregatorName="testing",
|
||||
Filters={"Region": "Nope"},
|
||||
)
|
||||
assert not result["ResourceIdentifiers"]
|
||||
|
||||
@mock_s3
|
||||
@mock_config
|
||||
def test_config_get_account_pab():
|
||||
from moto.s3.models import ACCOUNT_ID
|
||||
|
||||
client = boto3.client("s3control", region_name="us-west-2")
|
||||
config_client = boto3.client("config", region_name="us-west-2")
|
||||
|
||||
# Create the aggregator:
|
||||
account_aggregation_source = {
|
||||
"AccountIds": [ACCOUNT_ID],
|
||||
"AllAwsRegions": True,
|
||||
}
|
||||
config_client.put_configuration_aggregator(
|
||||
ConfigurationAggregatorName="testing",
|
||||
AccountAggregationSources=[account_aggregation_source],
|
||||
)
|
||||
|
||||
# Without a PAB in place:
|
||||
with assert_raises(ClientError) as ce:
|
||||
config_client.get_resource_config_history(
|
||||
resourceType="AWS::S3::AccountPublicAccessBlock", resourceId=ACCOUNT_ID
|
||||
)
|
||||
assert (
|
||||
ce.exception.response["Error"]["Code"] == "ResourceNotDiscoveredException"
|
||||
)
|
||||
# aggregate
|
||||
result = config_client.batch_get_resource_config(
|
||||
resourceKeys=[
|
||||
{
|
||||
"resourceType": "AWS::S3::AccountPublicAccessBlock",
|
||||
"resourceId": "ACCOUNT_ID",
|
||||
}
|
||||
]
|
||||
)
|
||||
assert not result["baseConfigurationItems"]
|
||||
result = config_client.batch_get_aggregate_resource_config(
|
||||
ConfigurationAggregatorName="testing",
|
||||
ResourceIdentifiers=[
|
||||
{
|
||||
"SourceAccountId": ACCOUNT_ID,
|
||||
"SourceRegion": "us-west-2",
|
||||
"ResourceId": ACCOUNT_ID,
|
||||
"ResourceType": "AWS::S3::AccountPublicAccessBlock",
|
||||
"ResourceName": "",
|
||||
}
|
||||
],
|
||||
)
|
||||
assert not result["BaseConfigurationItems"]
|
||||
|
||||
# Create a PAB:
|
||||
client.put_public_access_block(
|
||||
AccountId=ACCOUNT_ID,
|
||||
PublicAccessBlockConfiguration={
|
||||
"BlockPublicAcls": True,
|
||||
"IgnorePublicAcls": True,
|
||||
"BlockPublicPolicy": True,
|
||||
"RestrictPublicBuckets": True,
|
||||
},
|
||||
)
|
||||
|
||||
# Get the proper config:
|
||||
proper_config = {
|
||||
"blockPublicAcls": True,
|
||||
"ignorePublicAcls": True,
|
||||
"blockPublicPolicy": True,
|
||||
"restrictPublicBuckets": True,
|
||||
}
|
||||
result = config_client.get_resource_config_history(
|
||||
resourceType="AWS::S3::AccountPublicAccessBlock", resourceId=ACCOUNT_ID
|
||||
)
|
||||
assert (
|
||||
json.loads(result["configurationItems"][0]["configuration"])
|
||||
== proper_config
|
||||
)
|
||||
assert (
|
||||
result["configurationItems"][0]["accountId"]
|
||||
== result["configurationItems"][0]["resourceId"]
|
||||
== ACCOUNT_ID
|
||||
)
|
||||
result = config_client.batch_get_resource_config(
|
||||
resourceKeys=[
|
||||
{
|
||||
"resourceType": "AWS::S3::AccountPublicAccessBlock",
|
||||
"resourceId": ACCOUNT_ID,
|
||||
}
|
||||
]
|
||||
)
|
||||
assert len(result["baseConfigurationItems"]) == 1
|
||||
assert (
|
||||
json.loads(result["baseConfigurationItems"][0]["configuration"])
|
||||
== proper_config
|
||||
)
|
||||
assert (
|
||||
result["baseConfigurationItems"][0]["accountId"]
|
||||
== result["baseConfigurationItems"][0]["resourceId"]
|
||||
== ACCOUNT_ID
|
||||
)
|
||||
|
||||
for region in Session().get_available_regions("s3control"):
|
||||
result = config_client.batch_get_aggregate_resource_config(
|
||||
ConfigurationAggregatorName="testing",
|
||||
ResourceIdentifiers=[
|
||||
{
|
||||
"SourceAccountId": ACCOUNT_ID,
|
||||
"SourceRegion": region,
|
||||
"ResourceId": ACCOUNT_ID,
|
||||
"ResourceType": "AWS::S3::AccountPublicAccessBlock",
|
||||
"ResourceName": "",
|
||||
}
|
||||
],
|
||||
)
|
||||
assert len(result["BaseConfigurationItems"]) == 1
|
||||
assert (
|
||||
json.loads(result["BaseConfigurationItems"][0]["configuration"])
|
||||
== proper_config
|
||||
)
|
||||
|
||||
|
||||
@mock_s3_deprecated
|
||||
def test_ranged_get():
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue