Merge pull request #2694 from brady-gsa/events-tagging

adds tagging support for cloudwatch events service
This commit is contained in:
Steve Pulec 2020-02-17 18:51:09 -06:00 committed by GitHub
commit 4a89131ec4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 249 additions and 12 deletions

View file

@ -6,6 +6,7 @@ from boto3 import Session
from moto.core.exceptions import JsonRESTError
from moto.core import BaseBackend, BaseModel
from moto.sts.models import ACCOUNT_ID
from moto.utilities.tagging_service import TaggingService
class Rule(BaseModel):
@ -104,6 +105,7 @@ class EventsBackend(BaseBackend):
self.region_name = region_name
self.event_buses = {}
self.event_sources = {}
self.tagger = TaggingService()
self._add_default_event_bus()
@ -141,6 +143,9 @@ class EventsBackend(BaseBackend):
def delete_rule(self, name):
self.rules_order.pop(self.rules_order.index(name))
arn = self.rules.get(name).arn
if self.tagger.has_tags(arn):
self.tagger.delete_all_tags_for_resource(arn)
return self.rules.pop(name) is not None
def describe_rule(self, name):
@ -361,6 +366,32 @@ class EventsBackend(BaseBackend):
self.event_buses.pop(name, None)
def list_tags_for_resource(self, arn):
name = arn.split("/")[-1]
if name in self.rules:
return self.tagger.list_tags_for_resource(self.rules[name].arn)
raise JsonRESTError(
"ResourceNotFoundException", "An entity that you specified does not exist."
)
def tag_resource(self, arn, tags):
name = arn.split("/")[-1]
if name in self.rules:
self.tagger.tag_resource(self.rules[name].arn, tags)
return {}
raise JsonRESTError(
"ResourceNotFoundException", "An entity that you specified does not exist."
)
def untag_resource(self, arn, tag_names):
name = arn.split("/")[-1]
if name in self.rules:
self.tagger.untag_resource_using_names(self.rules[name].arn, tag_names)
return {}
raise JsonRESTError(
"ResourceNotFoundException", "An entity that you specified does not exist."
)
events_backends = {}
for region in Session().get_available_regions("events"):

View file

@ -297,3 +297,26 @@ class EventsHandler(BaseResponse):
self.events_backend.delete_event_bus(name)
return "", self.response_headers
def list_tags_for_resource(self):
arn = self._get_param("ResourceARN")
result = self.events_backend.list_tags_for_resource(arn)
return json.dumps(result), self.response_headers
def tag_resource(self):
arn = self._get_param("ResourceARN")
tags = self._get_param("Tags")
result = self.events_backend.tag_resource(arn, tags)
return json.dumps(result), self.response_headers
def untag_resource(self):
arn = self._get_param("ResourceARN")
tags = self._get_param("TagKeys")
result = self.events_backend.untag_resource(arn, tags)
return json.dumps(result), self.response_headers

View file

View file

@ -0,0 +1,62 @@
class TaggingService:
def __init__(self, tagName="Tags", keyName="Key", valueName="Value"):
self.tagName = tagName
self.keyName = keyName
self.valueName = valueName
self.tags = {}
def list_tags_for_resource(self, arn):
result = []
if arn in self.tags:
for k, v in self.tags[arn].items():
result.append({self.keyName: k, self.valueName: v})
return {self.tagName: result}
def delete_all_tags_for_resource(self, arn):
del self.tags[arn]
def has_tags(self, arn):
return arn in self.tags
def tag_resource(self, arn, tags):
if arn not in self.tags:
self.tags[arn] = {}
for t in tags:
if self.valueName in t:
self.tags[arn][t[self.keyName]] = t[self.valueName]
else:
self.tags[arn][t[self.keyName]] = None
def untag_resource_using_names(self, arn, tag_names):
for name in tag_names:
if name in self.tags.get(arn, {}):
del self.tags[arn][name]
def untag_resource_using_tags(self, arn, tags):
m = self.tags.get(arn, {})
for t in tags:
if self.keyName in t:
if t[self.keyName] in m:
if self.valueName in t:
if m[t[self.keyName]] != t[self.valueName]:
continue
# If both key and value are provided, match both before deletion
del m[t[self.keyName]]
def extract_tag_names(self, tags):
results = []
if len(tags) == 0:
return results
for tag in tags:
if self.keyName in tag:
results.append(tag[self.keyName])
return results
def flatten_tag_list(self, tags):
result = {}
for t in tags:
if self.valueName in t:
result[t[self.keyName]] = t[self.valueName]
else:
result[t[self.keyName]] = None
return result