Add tag_resource and untag_resource for ECS services

* Refactor resource ARN parsing
This commit is contained in:
Jim Shields 2019-10-03 15:16:07 -04:00
commit 6cb1173719
3 changed files with 235 additions and 9 deletions

View file

@ -959,28 +959,31 @@ class EC2ContainerServiceBackend(BaseBackend):
yield task_fam
def list_tags_for_resource(self, resource_arn):
"""Currently only implemented for task definitions"""
@staticmethod
def _parse_resource_arn(resource_arn):
match = re.match(
"^arn:aws:ecs:(?P<region>[^:]+):(?P<account_id>[^:]+):(?P<service>[^:]+)/(?P<id>.*)$",
resource_arn)
if not match:
raise JsonRESTError('InvalidParameterException', 'The ARN provided is invalid.')
return match.groupdict()
service = match.group("service")
if service == "task-definition":
def list_tags_for_resource(self, resource_arn):
"""Currently implemented only for task definitions and services"""
parsed_arn = self._parse_resource_arn(resource_arn)
if parsed_arn["service"] == "task-definition":
for task_definition in self.task_definitions.values():
for revision in task_definition.values():
if revision.arn == resource_arn:
return revision.tags
else:
raise TaskDefinitionNotFoundException()
elif service == "service":
for _service in self.services.values():
if _service.arn == resource_arn:
return _service.tags
elif parsed_arn["service"] == "service":
for service in self.services.values():
if service.arn == resource_arn:
return service.tags
else:
raise ServiceNotFoundException(service_name=match.group("id"))
raise ServiceNotFoundException(service_name=parsed_arn["id"])
raise NotImplementedError()
def _get_last_task_definition_revision_id(self, family):
@ -988,6 +991,42 @@ class EC2ContainerServiceBackend(BaseBackend):
if definitions:
return max(definitions.keys())
def tag_resource(self, resource_arn, tags):
"""Currently implemented only for services"""
parsed_arn = self._parse_resource_arn(resource_arn)
if parsed_arn["service"] == "service":
for service in self.services.values():
if service.arn == resource_arn:
service.tags = self._merge_tags(service.tags, tags)
return {}
else:
raise ServiceNotFoundException(service_name=parsed_arn["id"])
raise NotImplementedError()
def _merge_tags(self, existing_tags, new_tags):
merged_tags = new_tags
new_keys = self._get_keys(new_tags)
for existing_tag in existing_tags:
if existing_tag["key"] not in new_keys:
merged_tags.append(existing_tag)
return merged_tags
@staticmethod
def _get_keys(tags):
return [tag['key'] for tag in tags]
def untag_resource(self, resource_arn, tag_keys):
"""Currently implemented only for services"""
parsed_arn = self._parse_resource_arn(resource_arn)
if parsed_arn["service"] == "service":
for service in self.services.values():
if service.arn == resource_arn:
service.tags = [tag for tag in service.tags if tag["key"] not in tag_keys]
return {}
else:
raise ServiceNotFoundException(service_name=parsed_arn["id"])
raise NotImplementedError()
available_regions = boto3.session.Session().get_available_regions("ecs")
ecs_backends = {region: EC2ContainerServiceBackend(region) for region in available_regions}