Add events.list_event_buses

This commit is contained in:
gruebel 2019-11-03 20:42:31 +01:00
commit 831577350d
4 changed files with 95 additions and 7 deletions

View file

@ -344,6 +344,16 @@ class EventsBackend(BaseBackend):
return self.event_buses[name]
def list_event_buses(self, name_prefix):
if name_prefix:
return [
event_bus
for event_bus in self.event_buses.values()
if event_bus.name.startswith(name_prefix)
]
return list(self.event_buses.values())
available_regions = boto3.session.Session().get_available_regions("events")
events_backends = {region: EventsBackend(region) for region in available_regions}

View file

@ -278,3 +278,21 @@ class EventsHandler(BaseResponse):
event_bus = self.events_backend.create_event_bus(name, event_source_name)
return json.dumps({"EventBusArn": event_bus.arn}), self.response_headers
def list_event_buses(self):
name_prefix = self._get_param("NamePrefix")
# ToDo: add 'NextToken' & 'Limit' parameters
response = []
for event_bus in self.events_backend.list_event_buses(name_prefix):
event_bus_response = {
"Name": event_bus.name,
"Arn": event_bus.arn,
}
if event_bus.policy:
event_bus_response["Policy"] = event_bus.policy
response.append(event_bus_response)
return json.dumps({"EventBuses": response}), self.response_headers