Add Tag support for other resource types then an account (#3982)

* - Adding checking for resource type in tag functions
- Adding TargetNotFoundException when no resource found
- Adding support for tags for root OU, OU and Policies
- Adding tests covering the new code
- Adding test for deletion of a tag

* fixed linting issue

* - renamed helper function to a more logical name
- added tests for helper function
- fixed bugs in tests for tag functions

Co-authored-by: Sjoerd Tromp <stromp@schubergphilis.com>
This commit is contained in:
stromp 2021-06-05 16:12:17 +02:00 committed by GitHub
commit 9e4972b43f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 236 additions and 34 deletions

View file

@ -71,7 +71,7 @@ class FakeAccount(BaseModel):
self.joined_method = "CREATED"
self.parent_id = organization.root_id
self.attached_policies = []
self.tags = {}
self.tags = {tag["Key"]: tag["Value"] for tag in kwargs.get("Tags", [])}
@property
def arn(self):
@ -114,6 +114,7 @@ class FakeOrganizationalUnit(BaseModel):
self.parent_id = kwargs.get("ParentId")
self._arn_format = utils.OU_ARN_FORMAT
self.attached_policies = []
self.tags = {tag["Key"]: tag["Value"] for tag in kwargs.get("Tags", [])}
@property
def arn(self):
@ -143,6 +144,7 @@ class FakeRoot(FakeOrganizationalUnit):
self.policy_types = [{"Type": "SERVICE_CONTROL_POLICY", "Status": "ENABLED"}]
self._arn_format = utils.ROOT_ARN_FORMAT
self.attached_policies = []
self.tags = {tag["Key"]: tag["Value"] for tag in kwargs.get("Tags", [])}
def describe(self):
return {
@ -654,6 +656,25 @@ class OrganizationsBackend(BaseBackend):
]
)
def _get_resource_for_tagging(self, resource_id):
if re.compile(utils.OU_ID_REGEX).fullmatch(resource_id) or re.fullmatch(
utils.ROOT_ID_REGEX, resource_id
):
resource = next((a for a in self.ou if a.id == resource_id), None)
elif re.compile(utils.ACCOUNT_ID_REGEX).fullmatch(resource_id):
resource = next((a for a in self.accounts if a.id == resource_id), None)
elif re.compile(utils.POLICY_ID_REGEX).fullmatch(resource_id):
resource = next((a for a in self.policies if a.id == resource_id), None)
else:
raise InvalidInputException(
"You provided a value that does not match the required pattern."
)
if resource is None:
raise TargetNotFoundException
return resource
def list_targets_for_policy(self, **kwargs):
if re.compile(utils.POLICY_ID_REGEX).match(kwargs["PolicyId"]):
policy = next(
@ -673,37 +694,19 @@ class OrganizationsBackend(BaseBackend):
return dict(Targets=objects)
def tag_resource(self, **kwargs):
account = next((a for a in self.accounts if a.id == kwargs["ResourceId"]), None)
if account is None:
raise InvalidInputException(
"You provided a value that does not match the required pattern."
)
resource = self._get_resource_for_tagging(kwargs["ResourceId"])
new_tags = {tag["Key"]: tag["Value"] for tag in kwargs["Tags"]}
account.tags.update(new_tags)
resource.tags.update(new_tags)
def list_tags_for_resource(self, **kwargs):
account = next((a for a in self.accounts if a.id == kwargs["ResourceId"]), None)
if account is None:
raise InvalidInputException(
"You provided a value that does not match the required pattern."
)
tags = [{"Key": key, "Value": value} for key, value in account.tags.items()]
resource = self._get_resource_for_tagging(kwargs["ResourceId"])
tags = [{"Key": key, "Value": value} for key, value in resource.tags.items()]
return dict(Tags=tags)
def untag_resource(self, **kwargs):
account = next((a for a in self.accounts if a.id == kwargs["ResourceId"]), None)
if account is None:
raise InvalidInputException(
"You provided a value that does not match the required pattern."
)
resource = self._get_resource_for_tagging(kwargs["ResourceId"])
for key in kwargs["TagKeys"]:
account.tags.pop(key, None)
resource.tags.pop(key, None)
def enable_aws_service_access(self, **kwargs):
service = FakeServiceAccess(**kwargs)