AWS Config Aggregator support

- Added support for the following APIs:
	- put_configuration_aggregator
	- describe_configuration_aggregators
	- delete_configuration_aggregator
	- put_aggregation_authorization
	- describe_aggregation_authorizations
	- delete_aggregation_authorization
This commit is contained in:
Mike Grima 2019-07-29 16:36:57 -07:00
commit 188969a048
7 changed files with 1090 additions and 12 deletions

View file

@ -123,6 +123,526 @@ def test_put_configuration_recorder():
assert "maximum number of configuration recorders: 1 is reached." in ce.exception.response['Error']['Message']
@mock_config
def test_put_configuration_aggregator():
client = boto3.client('config', region_name='us-west-2')
# With too many aggregation sources:
with assert_raises(ClientError) as ce:
client.put_configuration_aggregator(
ConfigurationAggregatorName='testing',
AccountAggregationSources=[
{
'AccountIds': [
'012345678910',
'111111111111',
'222222222222'
],
'AwsRegions': [
'us-east-1',
'us-west-2'
]
},
{
'AccountIds': [
'012345678910',
'111111111111',
'222222222222'
],
'AwsRegions': [
'us-east-1',
'us-west-2'
]
}
]
)
assert 'Member must have length less than or equal to 1' in ce.exception.response['Error']['Message']
assert ce.exception.response['Error']['Code'] == 'ValidationException'
# With an invalid region config (no regions defined):
with assert_raises(ClientError) as ce:
client.put_configuration_aggregator(
ConfigurationAggregatorName='testing',
AccountAggregationSources=[
{
'AccountIds': [
'012345678910',
'111111111111',
'222222222222'
],
'AllAwsRegions': False
}
]
)
assert 'Your request does not specify any regions' in ce.exception.response['Error']['Message']
assert ce.exception.response['Error']['Code'] == 'InvalidParameterValueException'
with assert_raises(ClientError) as ce:
client.put_configuration_aggregator(
ConfigurationAggregatorName='testing',
OrganizationAggregationSource={
'RoleArn': 'arn:aws:iam::012345678910:role/SomeRole'
}
)
assert 'Your request does not specify any regions' in ce.exception.response['Error']['Message']
assert ce.exception.response['Error']['Code'] == 'InvalidParameterValueException'
# With both region flags defined:
with assert_raises(ClientError) as ce:
client.put_configuration_aggregator(
ConfigurationAggregatorName='testing',
AccountAggregationSources=[
{
'AccountIds': [
'012345678910',
'111111111111',
'222222222222'
],
'AwsRegions': [
'us-east-1',
'us-west-2'
],
'AllAwsRegions': True
}
]
)
assert 'You must choose one of these options' in ce.exception.response['Error']['Message']
assert ce.exception.response['Error']['Code'] == 'InvalidParameterValueException'
with assert_raises(ClientError) as ce:
client.put_configuration_aggregator(
ConfigurationAggregatorName='testing',
OrganizationAggregationSource={
'RoleArn': 'arn:aws:iam::012345678910:role/SomeRole',
'AwsRegions': [
'us-east-1',
'us-west-2'
],
'AllAwsRegions': True
}
)
assert 'You must choose one of these options' in ce.exception.response['Error']['Message']
assert ce.exception.response['Error']['Code'] == 'InvalidParameterValueException'
# Name too long:
with assert_raises(ClientError) as ce:
client.put_configuration_aggregator(
ConfigurationAggregatorName='a' * 257,
AccountAggregationSources=[
{
'AccountIds': [
'012345678910',
],
'AllAwsRegions': True
}
]
)
assert 'configurationAggregatorName' in ce.exception.response['Error']['Message']
assert ce.exception.response['Error']['Code'] == 'ValidationException'
# Too many tags (>50):
with assert_raises(ClientError) as ce:
client.put_configuration_aggregator(
ConfigurationAggregatorName='testing',
AccountAggregationSources=[
{
'AccountIds': [
'012345678910',
],
'AllAwsRegions': True
}
],
Tags=[{'Key': '{}'.format(x), 'Value': '{}'.format(x)} for x in range(0, 51)]
)
assert 'Member must have length less than or equal to 50' in ce.exception.response['Error']['Message']
assert ce.exception.response['Error']['Code'] == 'ValidationException'
# Tag key is too big (>128 chars):
with assert_raises(ClientError) as ce:
client.put_configuration_aggregator(
ConfigurationAggregatorName='testing',
AccountAggregationSources=[
{
'AccountIds': [
'012345678910',
],
'AllAwsRegions': True
}
],
Tags=[{'Key': 'a' * 129, 'Value': 'a'}]
)
assert 'Member must have length less than or equal to 128' in ce.exception.response['Error']['Message']
assert ce.exception.response['Error']['Code'] == 'ValidationException'
# Tag value is too big (>256 chars):
with assert_raises(ClientError) as ce:
client.put_configuration_aggregator(
ConfigurationAggregatorName='testing',
AccountAggregationSources=[
{
'AccountIds': [
'012345678910',
],
'AllAwsRegions': True
}
],
Tags=[{'Key': 'tag', 'Value': 'a' * 257}]
)
assert 'Member must have length less than or equal to 256' in ce.exception.response['Error']['Message']
assert ce.exception.response['Error']['Code'] == 'ValidationException'
# Duplicate Tags:
with assert_raises(ClientError) as ce:
client.put_configuration_aggregator(
ConfigurationAggregatorName='testing',
AccountAggregationSources=[
{
'AccountIds': [
'012345678910',
],
'AllAwsRegions': True
}
],
Tags=[{'Key': 'a', 'Value': 'a'}, {'Key': 'a', 'Value': 'a'}]
)
assert 'Duplicate tag keys found.' in ce.exception.response['Error']['Message']
assert ce.exception.response['Error']['Code'] == 'InvalidInput'
# Invalid characters in the tag key:
with assert_raises(ClientError) as ce:
client.put_configuration_aggregator(
ConfigurationAggregatorName='testing',
AccountAggregationSources=[
{
'AccountIds': [
'012345678910',
],
'AllAwsRegions': True
}
],
Tags=[{'Key': '!', 'Value': 'a'}]
)
assert 'Member must satisfy regular expression pattern:' in ce.exception.response['Error']['Message']
assert ce.exception.response['Error']['Code'] == 'ValidationException'
# If it contains both the AccountAggregationSources and the OrganizationAggregationSource
with assert_raises(ClientError) as ce:
client.put_configuration_aggregator(
ConfigurationAggregatorName='testing',
AccountAggregationSources=[
{
'AccountIds': [
'012345678910',
],
'AllAwsRegions': False
}
],
OrganizationAggregationSource={
'RoleArn': 'arn:aws:iam::012345678910:role/SomeRole',
'AllAwsRegions': False
}
)
assert 'AccountAggregationSource and the OrganizationAggregationSource' in ce.exception.response['Error']['Message']
assert ce.exception.response['Error']['Code'] == 'InvalidParameterValueException'
# If it contains neither:
with assert_raises(ClientError) as ce:
client.put_configuration_aggregator(
ConfigurationAggregatorName='testing',
)
assert 'AccountAggregationSource or the OrganizationAggregationSource' in ce.exception.response['Error']['Message']
assert ce.exception.response['Error']['Code'] == 'InvalidParameterValueException'
# Just make one:
account_aggregation_source = {
'AccountIds': [
'012345678910',
'111111111111',
'222222222222'
],
'AwsRegions': [
'us-east-1',
'us-west-2'
],
'AllAwsRegions': False
}
result = client.put_configuration_aggregator(
ConfigurationAggregatorName='testing',
AccountAggregationSources=[account_aggregation_source],
)
assert result['ConfigurationAggregator']['ConfigurationAggregatorName'] == 'testing'
assert result['ConfigurationAggregator']['AccountAggregationSources'] == [account_aggregation_source]
assert 'arn:aws:config:us-west-2:123456789012:config-aggregator/config-aggregator-' in \
result['ConfigurationAggregator']['ConfigurationAggregatorArn']
assert result['ConfigurationAggregator']['CreationTime'] == result['ConfigurationAggregator']['LastUpdatedTime']
# Update the existing one:
original_arn = result['ConfigurationAggregator']['ConfigurationAggregatorArn']
account_aggregation_source.pop('AwsRegions')
account_aggregation_source['AllAwsRegions'] = True
result = client.put_configuration_aggregator(
ConfigurationAggregatorName='testing',
AccountAggregationSources=[account_aggregation_source]
)
assert result['ConfigurationAggregator']['ConfigurationAggregatorName'] == 'testing'
assert result['ConfigurationAggregator']['AccountAggregationSources'] == [account_aggregation_source]
assert result['ConfigurationAggregator']['ConfigurationAggregatorArn'] == original_arn
# Make an org one:
result = client.put_configuration_aggregator(
ConfigurationAggregatorName='testingOrg',
OrganizationAggregationSource={
'RoleArn': 'arn:aws:iam::012345678910:role/SomeRole',
'AwsRegions': ['us-east-1', 'us-west-2']
}
)
assert result['ConfigurationAggregator']['ConfigurationAggregatorName'] == 'testingOrg'
assert result['ConfigurationAggregator']['OrganizationAggregationSource'] == {
'RoleArn': 'arn:aws:iam::012345678910:role/SomeRole',
'AwsRegions': [
'us-east-1',
'us-west-2'
],
'AllAwsRegions': False
}
@mock_config
def test_describe_configuration_aggregators():
client = boto3.client('config', region_name='us-west-2')
# Without any config aggregators:
assert not client.describe_configuration_aggregators()['ConfigurationAggregators']
# Make 10 config aggregators:
for x in range(0, 10):
client.put_configuration_aggregator(
ConfigurationAggregatorName='testing{}'.format(x),
AccountAggregationSources=[
{
'AccountIds': [
'012345678910',
],
'AllAwsRegions': True
}
]
)
# Describe with an incorrect name:
with assert_raises(ClientError) as ce:
client.describe_configuration_aggregators(ConfigurationAggregatorNames=['DoesNotExist'])
assert 'The configuration aggregator does not exist.' in ce.exception.response['Error']['Message']
assert ce.exception.response['Error']['Code'] == 'NoSuchConfigurationAggregatorException'
# Error describe with more than 1 item in the list:
with assert_raises(ClientError) as ce:
client.describe_configuration_aggregators(ConfigurationAggregatorNames=['testing0', 'DoesNotExist'])
assert 'At least one of the configuration aggregators does not exist.' in ce.exception.response['Error']['Message']
assert ce.exception.response['Error']['Code'] == 'NoSuchConfigurationAggregatorException'
# Get the normal list:
result = client.describe_configuration_aggregators()
assert not result.get('NextToken')
assert len(result['ConfigurationAggregators']) == 10
# Test filtered list:
agg_names = ['testing0', 'testing1', 'testing2']
result = client.describe_configuration_aggregators(ConfigurationAggregatorNames=agg_names)
assert not result.get('NextToken')
assert len(result['ConfigurationAggregators']) == 3
assert [agg['ConfigurationAggregatorName'] for agg in result['ConfigurationAggregators']] == agg_names
# Test Pagination:
result = client.describe_configuration_aggregators(Limit=4)
assert len(result['ConfigurationAggregators']) == 4
assert result['NextToken'] == 'testing4'
assert [agg['ConfigurationAggregatorName'] for agg in result['ConfigurationAggregators']] == \
['testing{}'.format(x) for x in range(0, 4)]
result = client.describe_configuration_aggregators(Limit=4, NextToken='testing4')
assert len(result['ConfigurationAggregators']) == 4
assert result['NextToken'] == 'testing8'
assert [agg['ConfigurationAggregatorName'] for agg in result['ConfigurationAggregators']] == \
['testing{}'.format(x) for x in range(4, 8)]
result = client.describe_configuration_aggregators(Limit=4, NextToken='testing8')
assert len(result['ConfigurationAggregators']) == 2
assert not result.get('NextToken')
assert [agg['ConfigurationAggregatorName'] for agg in result['ConfigurationAggregators']] == \
['testing{}'.format(x) for x in range(8, 10)]
# Test Pagination with Filtering:
result = client.describe_configuration_aggregators(ConfigurationAggregatorNames=['testing2', 'testing4'], Limit=1)
assert len(result['ConfigurationAggregators']) == 1
assert result['NextToken'] == 'testing4'
assert result['ConfigurationAggregators'][0]['ConfigurationAggregatorName'] == 'testing2'
result = client.describe_configuration_aggregators(ConfigurationAggregatorNames=['testing2', 'testing4'], Limit=1, NextToken='testing4')
assert not result.get('NextToken')
assert result['ConfigurationAggregators'][0]['ConfigurationAggregatorName'] == 'testing4'
# Test with an invalid filter:
with assert_raises(ClientError) as ce:
client.describe_configuration_aggregators(NextToken='WRONG')
assert 'The nextToken provided is invalid' == ce.exception.response['Error']['Message']
assert ce.exception.response['Error']['Code'] == 'InvalidNextTokenException'
@mock_config
def test_put_aggregation_authorization():
client = boto3.client('config', region_name='us-west-2')
# Too many tags (>50):
with assert_raises(ClientError) as ce:
client.put_aggregation_authorization(
AuthorizedAccountId='012345678910',
AuthorizedAwsRegion='us-west-2',
Tags=[{'Key': '{}'.format(x), 'Value': '{}'.format(x)} for x in range(0, 51)]
)
assert 'Member must have length less than or equal to 50' in ce.exception.response['Error']['Message']
assert ce.exception.response['Error']['Code'] == 'ValidationException'
# Tag key is too big (>128 chars):
with assert_raises(ClientError) as ce:
client.put_aggregation_authorization(
AuthorizedAccountId='012345678910',
AuthorizedAwsRegion='us-west-2',
Tags=[{'Key': 'a' * 129, 'Value': 'a'}]
)
assert 'Member must have length less than or equal to 128' in ce.exception.response['Error']['Message']
assert ce.exception.response['Error']['Code'] == 'ValidationException'
# Tag value is too big (>256 chars):
with assert_raises(ClientError) as ce:
client.put_aggregation_authorization(
AuthorizedAccountId='012345678910',
AuthorizedAwsRegion='us-west-2',
Tags=[{'Key': 'tag', 'Value': 'a' * 257}]
)
assert 'Member must have length less than or equal to 256' in ce.exception.response['Error']['Message']
assert ce.exception.response['Error']['Code'] == 'ValidationException'
# Duplicate Tags:
with assert_raises(ClientError) as ce:
client.put_aggregation_authorization(
AuthorizedAccountId='012345678910',
AuthorizedAwsRegion='us-west-2',
Tags=[{'Key': 'a', 'Value': 'a'}, {'Key': 'a', 'Value': 'a'}]
)
assert 'Duplicate tag keys found.' in ce.exception.response['Error']['Message']
assert ce.exception.response['Error']['Code'] == 'InvalidInput'
# Invalid characters in the tag key:
with assert_raises(ClientError) as ce:
client.put_aggregation_authorization(
AuthorizedAccountId='012345678910',
AuthorizedAwsRegion='us-west-2',
Tags=[{'Key': '!', 'Value': 'a'}]
)
assert 'Member must satisfy regular expression pattern:' in ce.exception.response['Error']['Message']
assert ce.exception.response['Error']['Code'] == 'ValidationException'
# Put a normal one there:
result = client.put_aggregation_authorization(AuthorizedAccountId='012345678910', AuthorizedAwsRegion='us-east-1',
Tags=[{'Key': 'tag', 'Value': 'a'}])
assert result['AggregationAuthorization']['AggregationAuthorizationArn'] == 'arn:aws:config:us-west-2:123456789012:' \
'aggregation-authorization/012345678910/us-east-1'
assert result['AggregationAuthorization']['AuthorizedAccountId'] == '012345678910'
assert result['AggregationAuthorization']['AuthorizedAwsRegion'] == 'us-east-1'
assert isinstance(result['AggregationAuthorization']['CreationTime'], datetime)
creation_date = result['AggregationAuthorization']['CreationTime']
# And again:
result = client.put_aggregation_authorization(AuthorizedAccountId='012345678910', AuthorizedAwsRegion='us-east-1')
assert result['AggregationAuthorization']['AggregationAuthorizationArn'] == 'arn:aws:config:us-west-2:123456789012:' \
'aggregation-authorization/012345678910/us-east-1'
assert result['AggregationAuthorization']['AuthorizedAccountId'] == '012345678910'
assert result['AggregationAuthorization']['AuthorizedAwsRegion'] == 'us-east-1'
assert result['AggregationAuthorization']['CreationTime'] == creation_date
@mock_config
def test_describe_aggregation_authorizations():
client = boto3.client('config', region_name='us-west-2')
# With no aggregation authorizations:
assert not client.describe_aggregation_authorizations()['AggregationAuthorizations']
# Make 10 account authorizations:
for i in range(0, 10):
client.put_aggregation_authorization(AuthorizedAccountId='{}'.format(str(i) * 12), AuthorizedAwsRegion='us-west-2')
result = client.describe_aggregation_authorizations()
assert len(result['AggregationAuthorizations']) == 10
assert not result.get('NextToken')
for i in range(0, 10):
assert result['AggregationAuthorizations'][i]['AuthorizedAccountId'] == str(i) * 12
# Test Pagination:
result = client.describe_aggregation_authorizations(Limit=4)
assert len(result['AggregationAuthorizations']) == 4
assert result['NextToken'] == ('4' * 12) + '/us-west-2'
assert [auth['AuthorizedAccountId'] for auth in result['AggregationAuthorizations']] == ['{}'.format(str(x) * 12) for x in range(0, 4)]
result = client.describe_aggregation_authorizations(Limit=4, NextToken=('4' * 12) + '/us-west-2')
assert len(result['AggregationAuthorizations']) == 4
assert result['NextToken'] == ('8' * 12) + '/us-west-2'
assert [auth['AuthorizedAccountId'] for auth in result['AggregationAuthorizations']] == ['{}'.format(str(x) * 12) for x in range(4, 8)]
result = client.describe_aggregation_authorizations(Limit=4, NextToken=('8' * 12) + '/us-west-2')
assert len(result['AggregationAuthorizations']) == 2
assert not result.get('NextToken')
assert [auth['AuthorizedAccountId'] for auth in result['AggregationAuthorizations']] == ['{}'.format(str(x) * 12) for x in range(8, 10)]
# Test with an invalid filter:
with assert_raises(ClientError) as ce:
client.describe_aggregation_authorizations(NextToken='WRONG')
assert 'The nextToken provided is invalid' == ce.exception.response['Error']['Message']
assert ce.exception.response['Error']['Code'] == 'InvalidNextTokenException'
@mock_config
def test_delete_aggregation_authorization():
client = boto3.client('config', region_name='us-west-2')
client.put_aggregation_authorization(AuthorizedAccountId='012345678910', AuthorizedAwsRegion='us-west-2')
# Delete it:
client.delete_aggregation_authorization(AuthorizedAccountId='012345678910', AuthorizedAwsRegion='us-west-2')
# Verify that none are there:
assert not client.describe_aggregation_authorizations()['AggregationAuthorizations']
# Try it again -- nothing should happen:
client.delete_aggregation_authorization(AuthorizedAccountId='012345678910', AuthorizedAwsRegion='us-west-2')
@mock_config
def test_delete_configuration_aggregator():
client = boto3.client('config', region_name='us-west-2')
client.put_configuration_aggregator(
ConfigurationAggregatorName='testing',
AccountAggregationSources=[
{
'AccountIds': [
'012345678910',
],
'AllAwsRegions': True
}
]
)
client.delete_configuration_aggregator(ConfigurationAggregatorName='testing')
# And again to confirm that it's deleted:
with assert_raises(ClientError) as ce:
client.delete_configuration_aggregator(ConfigurationAggregatorName='testing')
assert 'The configuration aggregator does not exist.' in ce.exception.response['Error']['Message']
assert ce.exception.response['Error']['Code'] == 'NoSuchConfigurationAggregatorException'
@mock_config
def test_describe_configurations():
client = boto3.client('config', region_name='us-west-2')