Config - implement Organization Conformance Pack functionality (#3116)

* Add config.put_organization_conformance_pack

* Add config.describe_organization_conformance_packs

* Add config.get_organization_conformance_pack_detailed_status

* Add config.describe_organization_conformance_pack_statuses

* Add config.delete_organization_conformance_pack

* Add an update method to OrganizationConformancePack
This commit is contained in:
Anton Grübel 2020-07-13 10:30:55 +02:00 committed by GitHub
commit 55bb4eb08d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 547 additions and 1 deletions

View file

@ -376,3 +376,19 @@ class InvalidResultTokenException(JsonRESTError):
super(InvalidResultTokenException, self).__init__(
"InvalidResultTokenException", message
)
class ValidationException(JsonRESTError):
code = 400
def __init__(self, message):
super(ValidationException, self).__init__("ValidationException", message)
class NoSuchOrganizationConformancePackException(JsonRESTError):
code = 400
def __init__(self, message):
super(NoSuchOrganizationConformancePackException, self).__init__(
"NoSuchOrganizationConformancePackException", message
)

View file

@ -41,6 +41,8 @@ from moto.config.exceptions import (
ResourceNotDiscoveredException,
TooManyResourceKeys,
InvalidResultTokenException,
ValidationException,
NoSuchOrganizationConformancePackException,
)
from moto.core import BaseBackend, BaseModel
@ -159,7 +161,8 @@ class ConfigEmptyDictable(BaseModel):
def to_dict(self):
data = {}
for item, value in self.__dict__.items():
if value is not None:
# ignore private attributes
if not item.startswith("_") and value is not None:
if isinstance(value, ConfigEmptyDictable):
data[
snake_to_camels(
@ -367,12 +370,56 @@ class ConfigAggregationAuthorization(ConfigEmptyDictable):
self.tags = tags or {}
class OrganizationConformancePack(ConfigEmptyDictable):
def __init__(
self,
region,
name,
delivery_s3_bucket,
delivery_s3_key_prefix=None,
input_parameters=None,
excluded_accounts=None,
):
super(OrganizationConformancePack, self).__init__(
capitalize_start=True, capitalize_arn=False
)
self._status = "CREATE_SUCCESSFUL"
self._unique_pack_name = "{0}-{1}".format(name, random_string())
self.conformance_pack_input_parameters = input_parameters or []
self.delivery_s3_bucket = delivery_s3_bucket
self.delivery_s3_key_prefix = delivery_s3_key_prefix
self.excluded_accounts = excluded_accounts or []
self.last_update_time = datetime2int(datetime.utcnow())
self.organization_conformance_pack_arn = "arn:aws:config:{0}:{1}:organization-conformance-pack/{2}".format(
region, DEFAULT_ACCOUNT_ID, self._unique_pack_name
)
self.organization_conformance_pack_name = name
def update(
self,
delivery_s3_bucket,
delivery_s3_key_prefix,
input_parameters,
excluded_accounts,
):
self._status = "UPDATE_SUCCESSFUL"
self.conformance_pack_input_parameters = input_parameters
self.delivery_s3_bucket = delivery_s3_bucket
self.delivery_s3_key_prefix = delivery_s3_key_prefix
self.excluded_accounts = excluded_accounts
self.last_update_time = datetime2int(datetime.utcnow())
class ConfigBackend(BaseBackend):
def __init__(self):
self.recorders = {}
self.delivery_channels = {}
self.config_aggregators = {}
self.aggregation_authorizations = {}
self.organization_conformance_packs = {}
@staticmethod
def _validate_resource_types(resource_list):
@ -1110,6 +1157,134 @@ class ConfigBackend(BaseBackend):
"FailedEvaluations": [],
} # At this time, moto is not adding failed evaluations.
def put_organization_conformance_pack(
self,
region,
name,
template_s3_uri,
template_body,
delivery_s3_bucket,
delivery_s3_key_prefix,
input_parameters,
excluded_accounts,
):
# a real validation of the content of the template is missing at the moment
if not template_s3_uri and not template_body:
raise ValidationException("Template body is invalid")
if not re.match(r"s3://.*", template_s3_uri):
raise ValidationException(
"1 validation error detected: "
"Value '{}' at 'templateS3Uri' failed to satisfy constraint: "
"Member must satisfy regular expression pattern: "
"s3://.*".format(template_s3_uri)
)
pack = self.organization_conformance_packs.get(name)
if pack:
pack.update(
delivery_s3_bucket=delivery_s3_bucket,
delivery_s3_key_prefix=delivery_s3_key_prefix,
input_parameters=input_parameters,
excluded_accounts=excluded_accounts,
)
else:
pack = OrganizationConformancePack(
region=region,
name=name,
delivery_s3_bucket=delivery_s3_bucket,
delivery_s3_key_prefix=delivery_s3_key_prefix,
input_parameters=input_parameters,
excluded_accounts=excluded_accounts,
)
self.organization_conformance_packs[name] = pack
return {
"OrganizationConformancePackArn": pack.organization_conformance_pack_arn
}
def describe_organization_conformance_packs(self, names):
packs = []
for name in names:
pack = self.organization_conformance_packs.get(name)
if not pack:
raise NoSuchOrganizationConformancePackException(
"One or more organization conformance packs with specified names are not present. "
"Ensure your names are correct and try your request again later."
)
packs.append(pack.to_dict())
return {"OrganizationConformancePacks": packs}
def describe_organization_conformance_pack_statuses(self, names):
packs = []
statuses = []
if names:
for name in names:
pack = self.organization_conformance_packs.get(name)
if not pack:
raise NoSuchOrganizationConformancePackException(
"One or more organization conformance packs with specified names are not present. "
"Ensure your names are correct and try your request again later."
)
packs.append(pack)
else:
packs = list(self.organization_conformance_packs.values())
for pack in packs:
statuses.append(
{
"OrganizationConformancePackName": pack.organization_conformance_pack_name,
"Status": pack._status,
"LastUpdateTime": pack.last_update_time,
}
)
return {"OrganizationConformancePackStatuses": statuses}
def get_organization_conformance_pack_detailed_status(self, name):
pack = self.organization_conformance_packs.get(name)
if not pack:
raise NoSuchOrganizationConformancePackException(
"One or more organization conformance packs with specified names are not present. "
"Ensure your names are correct and try your request again later."
)
# actually here would be a list of all accounts in the organization
statuses = [
{
"AccountId": DEFAULT_ACCOUNT_ID,
"ConformancePackName": "OrgConformsPack-{0}".format(
pack._unique_pack_name
),
"Status": pack._status,
"LastUpdateTime": datetime2int(datetime.utcnow()),
}
]
return {"OrganizationConformancePackDetailedStatuses": statuses}
def delete_organization_conformance_pack(self, name):
pack = self.organization_conformance_packs.get(name)
if not pack:
raise NoSuchOrganizationConformancePackException(
"Could not find an OrganizationConformancePack for given request with resourceName {}".format(
name
)
)
self.organization_conformance_packs.pop(name)
config_backends = {}
for region in Session().get_available_regions("config"):

View file

@ -159,3 +159,46 @@ class ConfigResponse(BaseResponse):
self._get_param("TestMode"),
)
return json.dumps(evaluations)
def put_organization_conformance_pack(self):
conformance_pack = self.config_backend.put_organization_conformance_pack(
region=self.region,
name=self._get_param("OrganizationConformancePackName"),
template_s3_uri=self._get_param("TemplateS3Uri"),
template_body=self._get_param("TemplateBody"),
delivery_s3_bucket=self._get_param("DeliveryS3Bucket"),
delivery_s3_key_prefix=self._get_param("DeliveryS3KeyPrefix"),
input_parameters=self._get_param("ConformancePackInputParameters"),
excluded_accounts=self._get_param("ExcludedAccounts"),
)
return json.dumps(conformance_pack)
def describe_organization_conformance_packs(self):
conformance_packs = self.config_backend.describe_organization_conformance_packs(
self._get_param("OrganizationConformancePackNames")
)
return json.dumps(conformance_packs)
def describe_organization_conformance_pack_statuses(self):
statuses = self.config_backend.describe_organization_conformance_pack_statuses(
self._get_param("OrganizationConformancePackNames")
)
return json.dumps(statuses)
def get_organization_conformance_pack_detailed_status(self):
# 'Filters' parameter is not implemented yet
statuses = self.config_backend.get_organization_conformance_pack_detailed_status(
self._get_param("OrganizationConformancePackName")
)
return json.dumps(statuses)
def delete_organization_conformance_pack(self):
self.config_backend.delete_organization_conformance_pack(
self._get_param("OrganizationConformancePackName")
)
return ""