Add prefix and numeric filtering logic for Archive EventPattern (#3835)

* Add prefix and numeric filtering logic for Archive EventPattern

* Pull EventPattern logic out into class and test logic more directly

* Apply black formatting

Co-authored-by: Tom Noble <tom.noble@bjss.com>
This commit is contained in:
Tom Noble 2021-04-04 19:27:54 +01:00 committed by GitHub
commit b138d9956b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 141 additions and 119 deletions

View file

@ -7,6 +7,7 @@ import warnings
from collections import namedtuple
from datetime import datetime
from enum import Enum, unique
from operator import lt, le, eq, ge, gt
from boto3 import Session
from six import string_types
@ -94,47 +95,6 @@ class Rule(CloudFormationModel):
if index is not None:
self.targets.pop(index)
def _does_event_match_filter(self, event, filter):
if not filter:
return True
items_and_filters = [(event.get(k), v) for k, v in filter.items()]
nested_filter_matches = [
self._does_event_match_filter(item, nested_filter)
for item, nested_filter in items_and_filters
if isinstance(nested_filter, dict)
]
filter_list_matches = [
self._does_item_match_filters(item, filter_list)
for item, filter_list in items_and_filters
if isinstance(filter_list, list)
]
return all(nested_filter_matches + filter_list_matches)
def _does_item_match_filters(self, item, filters):
allowed_values = [value for value in filters if isinstance(value, string_types)]
allowed_values_match = item in allowed_values if allowed_values else True
print(item, filters, allowed_values)
named_filter_matches = [
self._does_item_match_named_filter(item, filter)
for filter in filters
if isinstance(filter, dict)
]
return allowed_values_match and all(named_filter_matches)
def _does_item_match_named_filter(self, item, filter):
filter_name, filter_value = list(filter.items())[0]
if filter_name == "exists":
item_exists = item is not None
should_exist = filter_value
return item_exists if should_exist else not item_exists
else:
warnings.warn(
"'{}' filter logic unimplemented. defaulting to True".format(
filter_name
)
)
return False
def send_to_targets(self, event_bus_name, event):
event_bus_name = event_bus_name.split("/")[-1]
if event_bus_name != self.event_bus_name:
@ -230,11 +190,8 @@ class Rule(CloudFormationModel):
def _send_to_events_archive(self, resource_id, event):
archive_name, archive_uuid = resource_id.split(":")
archive = events_backends[self.region_name].archives.get(archive_name)
pattern = archive.event_pattern
if archive.uuid == archive_uuid:
event = json.loads(json.dumps(event))
pattern = json.loads(pattern) if pattern else None
if self._does_event_match_filter(event, pattern):
if archive.event_pattern.matches_event(event):
archive.events.append(event)
def _send_to_sqs_queue(self, resource_id, event, group_id=None):
@ -415,7 +372,7 @@ class Archive(CloudFormationModel):
self.name = name
self.source_arn = source_arn
self.description = description
self.event_pattern = event_pattern
self.event_pattern = EventPattern(event_pattern)
self.retention = retention if retention else 0
self.creation_time = unix_time(datetime.utcnow())
@ -446,7 +403,7 @@ class Archive(CloudFormationModel):
result = {
"ArchiveArn": self.arn,
"Description": self.description,
"EventPattern": self.event_pattern,
"EventPattern": str(self.event_pattern),
}
result.update(self.describe_short())
@ -603,6 +560,69 @@ class Replay(BaseModel):
self.end_time = unix_time(datetime.utcnow())
class EventPattern:
def __init__(self, filter):
self._filter = json.loads(filter) if filter else None
def __str__(self):
return json.dumps(self._filter)
def matches_event(self, event):
if not self._filter:
return True
event = json.loads(json.dumps(event))
return self._does_event_match(event, self._filter)
def _does_event_match(self, event, filter):
items_and_filters = [(event.get(k), v) for k, v in filter.items()]
nested_filter_matches = [
self._does_event_match(item, nested_filter)
for item, nested_filter in items_and_filters
if isinstance(nested_filter, dict)
]
filter_list_matches = [
self._does_item_match_filters(item, filter_list)
for item, filter_list in items_and_filters
if isinstance(filter_list, list)
]
return all(nested_filter_matches + filter_list_matches)
def _does_item_match_filters(self, item, filters):
allowed_values = [value for value in filters if isinstance(value, string_types)]
allowed_values_match = item in allowed_values if allowed_values else True
named_filter_matches = [
self._does_item_match_named_filter(item, filter)
for filter in filters
if isinstance(filter, dict)
]
return allowed_values_match and all(named_filter_matches)
def _does_item_match_named_filter(self, item, filter):
filter_name, filter_value = list(filter.items())[0]
if filter_name == "exists":
item_exists = item is not None
should_exist = filter_value
return item_exists if should_exist else not item_exists
if filter_name == "prefix":
prefix = filter_value
return item.startswith(prefix)
if filter_name == "numeric":
as_function = {"<": lt, "<=": le, "=": eq, ">=": ge, ">": gt}
operators_and_values = zip(filter_value[::2], filter_value[1::2])
numeric_matches = [
as_function[operator](item, value)
for operator, value in operators_and_values
]
return all(numeric_matches)
else:
warnings.warn(
"'{}' filter logic unimplemented. defaulting to True".format(
filter_name
)
)
return True
class EventsBackend(BaseBackend):
ACCOUNT_ID = re.compile(r"^(\d{1,12}|\*)$")
STATEMENT_ID = re.compile(r"^[a-zA-Z0-9-_]{1,64}$")