Add exists filtering functionality to Archive (#3832)

* Add exists filtering functionality to Archive. Add test case and refactor existing Archive EventPattern test cases

* Apply black formatting

* Change NotImplementedError to warning

* Simplify unimplemented warning for filters

* Change str check to six.string_types check for python2.7

Co-authored-by: Tom Noble <tom.noble@bjss.com>
This commit is contained in:
Tom Noble 2021-04-02 18:32:01 +01:00 committed by GitHub
commit 15eda737d6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 103 additions and 51 deletions

View file

@ -9,6 +9,7 @@ from datetime import datetime
from enum import Enum, unique
from boto3 import Session
from six import string_types
from moto.core.exceptions import JsonRESTError
from moto.core import ACCOUNT_ID, BaseBackend, CloudFormationModel, BaseModel
@ -93,23 +94,46 @@ class Rule(CloudFormationModel):
if index is not None:
self.targets.pop(index)
def _does_event_match_pattern(self, event, pattern):
if not pattern:
def _does_event_match_filter(self, event, filter):
if not filter:
return True
event_pattern_pairs = [(event.get(k), v) for k, v in pattern.items()]
for event_item, pattern_item in event_pattern_pairs:
if not self._does_event_item_match_pattern_item(event_item, pattern_item):
return False
return True
items_and_filters = [(event.get(k), v) for k, v in filter.items()]
nested_filter_matches = [
self._does_event_match_filter(item, nested_filter)
for item, nested_filter in items_and_filters
if isinstance(nested_filter, dict)
]
filter_list_matches = [
self._does_item_match_filters(item, filter_list)
for item, filter_list in items_and_filters
if isinstance(filter_list, list)
]
return all(nested_filter_matches + filter_list_matches)
def _does_event_item_match_pattern_item(self, event_item, pattern_item):
# Only supports "key: [value]" filters currently
if not event_item:
def _does_item_match_filters(self, item, filters):
allowed_values = [value for value in filters if isinstance(value, string_types)]
allowed_values_match = item in allowed_values if allowed_values else True
print(item, filters, allowed_values)
named_filter_matches = [
self._does_item_match_named_filter(item, filter)
for filter in filters
if isinstance(filter, dict)
]
return allowed_values_match and all(named_filter_matches)
def _does_item_match_named_filter(self, item, filter):
filter_name, filter_value = list(filter.items())[0]
if filter_name == "exists":
item_exists = item is not None
should_exist = filter_value
return item_exists if should_exist else not item_exists
else:
warnings.warn(
"'{}' filter logic unimplemented. defaulting to True".format(
filter_name
)
)
return False
if isinstance(pattern_item, list):
return event_item in pattern_item
if isinstance(pattern_item, dict):
return self._does_event_match_pattern(event_item, pattern_item)
def send_to_targets(self, event_bus_name, event):
event_bus_name = event_bus_name.split("/")[-1]
@ -210,7 +234,7 @@ class Rule(CloudFormationModel):
if archive.uuid == archive_uuid:
event = json.loads(json.dumps(event))
pattern = json.loads(pattern) if pattern else None
if self._does_event_match_pattern(event, pattern):
if self._does_event_match_filter(event, pattern):
archive.events.append(event)
def _send_to_sqs_queue(self, resource_id, event, group_id=None):