Merge pull request #2443 from mikegrima/configquery

Adding support for querying AWS Config
This commit is contained in:
Mike Grima 2019-10-03 13:38:43 -07:00 committed by GitHub
commit d925335f05
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 675 additions and 2 deletions

View file

@ -230,3 +230,27 @@ class TooManyTags(JsonRESTError):
super(TooManyTags, self).__init__(
'ValidationException', "1 validation error detected: Value '{}' at '{}' failed to satisfy "
"constraint: Member must have length less than or equal to 50.".format(tags, param))
class InvalidResourceParameters(JsonRESTError):
code = 400
def __init__(self):
super(InvalidResourceParameters, self).__init__('ValidationException', 'Both Resource ID and Resource Name '
'cannot be specified in the request')
class InvalidLimit(JsonRESTError):
code = 400
def __init__(self, value):
super(InvalidLimit, self).__init__('ValidationException', 'Value \'{value}\' at \'limit\' failed to satisify constraint: Member'
' must have value less than or equal to 100'.format(value=value))
class TooManyResourceIds(JsonRESTError):
code = 400
def __init__(self):
super(TooManyResourceIds, self).__init__('ValidationException', "The specified list had more than 20 resource ID's. "
"It must have '20' or less items")

View file

@ -17,11 +17,12 @@ from moto.config.exceptions import InvalidResourceTypeException, InvalidDelivery
InvalidSNSTopicARNException, MaxNumberOfDeliveryChannelsExceededException, NoAvailableDeliveryChannelException, \
NoSuchDeliveryChannelException, LastDeliveryChannelDeleteFailedException, TagKeyTooBig, \
TooManyTags, TagValueTooBig, TooManyAccountSources, InvalidParameterValueException, InvalidNextTokenException, \
NoSuchConfigurationAggregatorException, InvalidTagCharacters, DuplicateTags
NoSuchConfigurationAggregatorException, InvalidTagCharacters, DuplicateTags, InvalidLimit, InvalidResourceParameters, TooManyResourceIds
from moto.core import BaseBackend, BaseModel
from moto.s3.config import s3_config_query
DEFAULT_ACCOUNT_ID = 123456789012
DEFAULT_ACCOUNT_ID = '123456789012'
POP_STRINGS = [
'capitalizeStart',
'CapitalizeStart',
@ -32,6 +33,11 @@ POP_STRINGS = [
]
DEFAULT_PAGE_SIZE = 100
# Map the Config resource type to a backend:
RESOURCE_MAP = {
'AWS::S3::Bucket': s3_config_query
}
def datetime2int(date):
return int(time.mktime(date.timetuple()))
@ -680,6 +686,110 @@ class ConfigBackend(BaseBackend):
del self.delivery_channels[channel_name]
def list_discovered_resources(self, resource_type, backend_region, resource_ids, resource_name, limit, next_token):
"""This will query against the mocked AWS Config listing function that must exist for the resource backend.
:param resource_type:
:param backend_region:
:param ids:
:param name:
:param limit:
:param next_token:
:return:
"""
identifiers = []
new_token = None
limit = limit or DEFAULT_PAGE_SIZE
if limit > DEFAULT_PAGE_SIZE:
raise InvalidLimit(limit)
if resource_ids and resource_name:
raise InvalidResourceParameters()
# Only 20 maximum Resource IDs:
if resource_ids and len(resource_ids) > 20:
raise TooManyResourceIds()
# If the resource type exists and the backend region is implemented in moto, then
# call upon the resource type's Config Query class to retrieve the list of resources that match the criteria:
if RESOURCE_MAP.get(resource_type, {}):
# Is this a global resource type? -- if so, re-write the region to 'global':
if RESOURCE_MAP[resource_type].backends.get('global'):
backend_region = 'global'
# For non-aggregated queries, the we only care about the backend_region. Need to verify that moto has implemented
# the region for the given backend:
if RESOURCE_MAP[resource_type].backends.get(backend_region):
# Fetch the resources for the backend's region:
identifiers, new_token = \
RESOURCE_MAP[resource_type].list_config_service_resources(resource_ids, resource_name, limit, next_token)
result = {'resourceIdentifiers': [
{
'resourceType': identifier['type'],
'resourceId': identifier['id'],
'resourceName': identifier['name']
}
for identifier in identifiers]
}
if new_token:
result['nextToken'] = new_token
return result
def list_aggregate_discovered_resources(self, aggregator_name, resource_type, filters, limit, next_token):
"""This will query against the mocked AWS Config listing function that must exist for the resource backend.
As far a moto goes -- the only real difference between this function and the `list_discovered_resources` function is that
this will require a Config Aggregator be set up a priori and can search based on resource regions.
:param aggregator_name:
:param resource_type:
:param filters:
:param limit:
:param next_token:
:return:
"""
if not self.config_aggregators.get(aggregator_name):
raise NoSuchConfigurationAggregatorException()
identifiers = []
new_token = None
filters = filters or {}
limit = limit or DEFAULT_PAGE_SIZE
if limit > DEFAULT_PAGE_SIZE:
raise InvalidLimit(limit)
# If the resource type exists and the backend region is implemented in moto, then
# call upon the resource type's Config Query class to retrieve the list of resources that match the criteria:
if RESOURCE_MAP.get(resource_type, {}):
# We only care about a filter's Region, Resource Name, and Resource ID:
resource_region = filters.get('Region')
resource_id = [filters['ResourceId']] if filters.get('ResourceId') else None
resource_name = filters.get('ResourceName')
identifiers, new_token = \
RESOURCE_MAP[resource_type].list_config_service_resources(resource_id, resource_name, limit, next_token,
resource_region=resource_region)
result = {'ResourceIdentifiers': [
{
'SourceAccountId': DEFAULT_ACCOUNT_ID,
'SourceRegion': identifier['region'],
'ResourceType': identifier['type'],
'ResourceId': identifier['id'],
'ResourceName': identifier['name']
}
for identifier in identifiers]
}
if new_token:
result['NextToken'] = new_token
return result
config_backends = {}
boto3_session = Session()

View file

@ -84,3 +84,34 @@ class ConfigResponse(BaseResponse):
def stop_configuration_recorder(self):
self.config_backend.stop_configuration_recorder(self._get_param('ConfigurationRecorderName'))
return ""
def list_discovered_resources(self):
schema = self.config_backend.list_discovered_resources(self._get_param('resourceType'),
self.region,
self._get_param('resourceIds'),
self._get_param('resourceName'),
self._get_param('limit'),
self._get_param('nextToken'))
return json.dumps(schema)
def list_aggregate_discovered_resources(self):
schema = self.config_backend.list_aggregate_discovered_resources(self._get_param('ConfigurationAggregatorName'),
self._get_param('ResourceType'),
self._get_param('Filters'),
self._get_param('Limit'),
self._get_param('NextToken'))
return json.dumps(schema)
"""
def batch_get_resource_config(self):
# TODO implement me!
return ""
def batch_get_aggregate_resource_config(self):
# TODO implement me!
return ""
def get_resource_config_history(self):
# TODO implement me!
return ""
"""

View file

@ -104,3 +104,11 @@ class AuthFailureError(RESTError):
super(AuthFailureError, self).__init__(
'AuthFailure',
"AWS was not able to validate the provided access credentials")
class InvalidNextTokenException(JsonRESTError):
"""For AWS Config resource listing. This will be used by many different resource types, and so it is in moto.core."""
code = 400
def __init__(self):
super(InvalidNextTokenException, self).__init__('InvalidNextTokenException', 'The nextToken provided is invalid')

View file

@ -538,6 +538,65 @@ class BaseBackend(object):
else:
return HttprettyMockAWS({'global': self})
# def list_config_service_resources(self, resource_ids, resource_name, limit, next_token):
# """For AWS Config. This will list all of the resources of the given type and optional resource name and region"""
# raise NotImplementedError()
class ConfigQueryModel(object):
def __init__(self, backends):
"""Inits based on the resource type's backends (1 for each region if applicable)"""
self.backends = backends
def list_config_service_resources(self, resource_ids, resource_name, limit, next_token, backend_region=None, resource_region=None):
"""For AWS Config. This will list all of the resources of the given type and optional resource name and region.
This supports both aggregated and non-aggregated listing. The following notes the difference:
- Non Aggregated Listing -
This only lists resources within a region. The way that this is implemented in moto is based on the region
for the resource backend.
You must set the `backend_region` to the region that the API request arrived from. resource_region can be set to `None`.
- Aggregated Listing -
This lists resources from all potential regional backends. For non-global resource types, this should collect a full
list of resources from all the backends, and then be able to filter from the resource region. This is because an
aggregator can aggregate resources from multiple regions. In moto, aggregated regions will *assume full aggregation
from all resources in all regions for a given resource type*.
The `backend_region` should be set to `None` for these queries, and the `resource_region` should optionally be set to
the `Filters` region parameter to filter out resources that reside in a specific region.
For aggregated listings, pagination logic should be set such that the next page can properly span all the region backends.
As such, the proper way to implement is to first obtain a full list of results from all the region backends, and then filter
from there. It may be valuable to make this a concatenation of the region and resource name.
:param resource_region:
:param resource_ids:
:param resource_name:
:param limit:
:param next_token:
:param backend_region: The region for the backend to pull results from. Set to `None` if this is an aggregated query.
:return: This should return a list of Dicts that have the following fields:
[
{
'type': 'AWS::The AWS Config data type',
'name': 'The name of the resource',
'id': 'The ID of the resource',
'region': 'The region of the resource -- if global, then you may want to have the calling logic pass in the
aggregator region in for the resource region -- or just us-east-1 :P'
}
, ...
]
"""
raise NotImplementedError()
def get_config_resource(self):
"""TODO implement me."""
raise NotImplementedError()
class base_decorator(object):
mock_backend = MockAWS

70
moto/s3/config.py Normal file
View file

@ -0,0 +1,70 @@
from moto.core.exceptions import InvalidNextTokenException
from moto.core.models import ConfigQueryModel
from moto.s3 import s3_backends
class S3ConfigQuery(ConfigQueryModel):
def list_config_service_resources(self, resource_ids, resource_name, limit, next_token, backend_region=None, resource_region=None):
# S3 need not care about "backend_region" as S3 is global. The resource_region only matters for aggregated queries as you can
# filter on bucket regions for them. For other resource types, you would need to iterate appropriately for the backend_region.
# Resource IDs are the same as S3 bucket names
# For aggregation -- did we get both a resource ID and a resource name?
if resource_ids and resource_name:
# If the values are different, then return an empty list:
if resource_name not in resource_ids:
return [], None
# If no filter was passed in for resource names/ids then return them all:
if not resource_ids and not resource_name:
bucket_list = list(self.backends['global'].buckets.keys())
else:
# Match the resource name / ID:
bucket_list = []
filter_buckets = [resource_name] if resource_name else resource_ids
for bucket in self.backends['global'].buckets.keys():
if bucket in filter_buckets:
bucket_list.append(bucket)
# If a resource_region was supplied (aggregated only), then filter on bucket region too:
if resource_region:
region_buckets = []
for bucket in bucket_list:
if self.backends['global'].buckets[bucket].region_name == resource_region:
region_buckets.append(bucket)
bucket_list = region_buckets
if not bucket_list:
return [], None
# Pagination logic:
sorted_buckets = sorted(bucket_list)
new_token = None
# Get the start:
if not next_token:
start = 0
else:
# Tokens for this moto feature is just the bucket name:
# For OTHER non-global resource types, it's the region concatenated with the resource ID.
if next_token not in sorted_buckets:
raise InvalidNextTokenException()
start = sorted_buckets.index(next_token)
# Get the list of items to collect:
bucket_list = sorted_buckets[start:(start + limit)]
if len(sorted_buckets) > (start + limit):
new_token = sorted_buckets[start + limit]
return [{'type': 'AWS::S3::Bucket', 'id': bucket, 'name': bucket, 'region': self.backends['global'].buckets[bucket].region_name}
for bucket in bucket_list], new_token
s3_config_query = S3ConfigQuery(s3_backends)