WIP: Introducing VPC Flow Logs (#3337)

* Start working on flow logs

* Change test

* Constructing tests

* Changing exceptions and adding more tests

* Adding more tests

* Changing model and adding more tests

* Adding support for tags

* Mocking Access error with non-existing Log Group Name

* Adding FlowLogAlreadyExists support

* Changing style

* Reformatted code

* Reformatted tests

* Removing needless test

* Adding support for CloudFormation

* Reformatting slightly

* Removing arnparse and using split

* Rearranging tests

* Fixing FilterNotImplementedError test

* Moving imports to 'if' clauses and adding explicit test for 'cloud-watch-logs' type

* Setting names matching boto3 API and restoring 'not-implementd-filter' test

* Reformatting tests with black
This commit is contained in:
ljakimczuk 2020-09-28 08:16:06 +02:00 committed by GitHub
commit 3bc18455a2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 1164 additions and 0 deletions

View file

@ -71,6 +71,24 @@ class InvalidSubnetIdError(EC2ClientError):
)
class InvalidFlowLogIdError(EC2ClientError):
def __init__(self, count, flow_log_ids):
super(InvalidFlowLogIdError, self).__init__(
"InvalidFlowLogId.NotFound",
"These flow log ids in the input list are not found: [TotalCount: {0}] {1}".format(
count, flow_log_ids
),
)
class FlowLogAlreadyExists(EC2ClientError):
def __init__(self):
super(FlowLogAlreadyExists, self).__init__(
"FlowLogAlreadyExists",
"Error. There is an existing Flow Log with the same configuration and log destination.",
)
class InvalidNetworkAclIdError(EC2ClientError):
def __init__(self, network_acl_id):
super(InvalidNetworkAclIdError, self).__init__(
@ -263,6 +281,14 @@ class InvalidAddressError(EC2ClientError):
)
class LogDestinationNotFoundError(EC2ClientError):
def __init__(self, bucket_name):
super(LogDestinationNotFoundError, self).__init__(
"LogDestinationNotFoundException",
"LogDestination: '{0}' does not exist.".format(bucket_name),
)
class InvalidAllocationIdError(EC2ClientError):
def __init__(self, allocation_id):
super(InvalidAllocationIdError, self).__init__(
@ -309,6 +335,33 @@ class InvalidVPCPeeringConnectionStateTransitionError(EC2ClientError):
)
class InvalidDependantParameterError(EC2ClientError):
def __init__(self, dependant_parameter, parameter, parameter_value):
super(InvalidDependantParameterError, self).__init__(
"InvalidParameter",
"{0} can't be empty if {1} is {2}.".format(
dependant_parameter, parameter, parameter_value,
),
)
class InvalidDependantParameterTypeError(EC2ClientError):
def __init__(self, dependant_parameter, parameter_value, parameter):
super(InvalidDependantParameterTypeError, self).__init__(
"InvalidParameter",
"{0} type must be {1} if {2} is provided.".format(
dependant_parameter, parameter_value, parameter,
),
)
class InvalidAggregationIntervalParameterError(EC2ClientError):
def __init__(self, parameter):
super(InvalidAggregationIntervalParameterError, self).__init__(
"InvalidParameter", "Invalid {0}".format(parameter),
)
class InvalidParameterValueError(EC2ClientError):
def __init__(self, parameter_value):
super(InvalidParameterValueError, self).__init__(

View file

@ -28,11 +28,13 @@ from moto.core.utils import (
camelcase_to_underscores,
)
from moto.core import ACCOUNT_ID
from .exceptions import (
CidrLimitExceeded,
DependencyViolationError,
EC2ClientError,
FilterNotImplementedError,
FlowLogAlreadyExists,
GatewayNotAttachedError,
InvalidAddressError,
InvalidAllocationIdError,
@ -52,6 +54,10 @@ from .exceptions import (
InvalidKeyPairDuplicateError,
InvalidKeyPairFormatError,
InvalidKeyPairNameError,
InvalidAggregationIntervalParameterError,
InvalidDependantParameterError,
InvalidDependantParameterTypeError,
InvalidFlowLogIdError,
InvalidLaunchTemplateNameError,
InvalidNetworkAclIdError,
InvalidNetworkAttachmentIdError,
@ -123,6 +129,7 @@ from .utils import (
random_spot_request_id,
random_subnet_id,
random_subnet_association_id,
random_flow_log_id,
random_volume_id,
random_vpc_id,
random_vpc_cidr_association_id,
@ -1176,6 +1183,7 @@ class TagBackend(object):
"subnet",
"volume",
"vpc",
"vpc-flow-log",
"vpc-peering-connection" "vpn-connection",
"vpn-gateway",
]
@ -3524,6 +3532,301 @@ class SubnetBackend(object):
raise InvalidParameterValueError(attr_name)
class Unsuccessful(object):
def __init__(
self, resource_id, error_code, error_message,
):
self.resource_id = resource_id
self.error_code = error_code
self.error_message = error_message
class FlowLogs(TaggedEC2Resource, CloudFormationModel):
def __init__(
self,
ec2_backend,
flow_log_id,
resource_id,
traffic_type,
log_destination,
log_group_name,
deliver_logs_permission_arn,
max_aggregation_interval,
log_destination_type,
log_format,
deliver_logs_status="SUCCESS",
deliver_logs_error_message=None,
):
self.ec2_backend = ec2_backend
self.id = flow_log_id
self.resource_id = resource_id
self.traffic_type = traffic_type
self.log_destination = log_destination
self.log_group_name = log_group_name
self.deliver_logs_permission_arn = deliver_logs_permission_arn
self.deliver_logs_status = deliver_logs_status
self.deliver_logs_error_message = deliver_logs_error_message
self.max_aggregation_interval = max_aggregation_interval
self.log_destination_type = log_destination_type
self.log_format = log_format
self.created_at = utc_date_and_time()
@staticmethod
def cloudformation_name_type():
return None
@staticmethod
def cloudformation_type():
# https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-ec2-flowlog.html
return "AWS::EC2::FlowLog"
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
):
properties = cloudformation_json["Properties"]
resource_type = properties.get("ResourceType")
resource_id = [properties.get("ResourceId")]
traffic_type = properties.get("TrafficType")
deliver_logs_permission_arn = properties.get("DeliverLogsPermissionArn")
log_destination_type = properties.get("LogDestinationType")
log_destination = properties.get("LogDestination")
log_group_name = properties.get("LogGroupName")
log_format = properties.get("LogFormat")
max_aggregation_interval = properties.get("MaxAggregationInterval")
ec2_backend = ec2_backends[region_name]
flow_log, _ = ec2_backend.create_flow_logs(
resource_type,
resource_id,
traffic_type,
deliver_logs_permission_arn,
log_destination_type,
log_destination,
log_group_name,
log_format,
max_aggregation_interval,
)
for tag in properties.get("Tags", []):
tag_key = tag["Key"]
tag_value = tag["Value"]
flow_log[0].add_tag(tag_key, tag_value)
return flow_log[0]
@property
def physical_resource_id(self):
return self.id
def get_filter_value(self, filter_name):
"""
API Version 2016-11-15 defines the following filters for DescribeFlowLogs:
* deliver-log-status
* log-destination-type
* flow-log-id
* log-group-name
* resource-id
* traffic-type
* tag:key=value
* tag-key
Taken from: https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeFlowLogs.html
"""
if filter_name == "resource-id":
return self.resource_id
elif filter_name == "traffic-type":
return self.traffic_type
elif filter_name == "log-destination-type":
return self.log_destination_type
elif filter_name == "flow-log-id":
return self.id
elif filter_name == "log-group-name":
return self.log_group_name
elif filter_name == "deliver-log-status":
return "SUCCESS"
else:
return super(FlowLogs, self).get_filter_value(
filter_name, "DescribeFlowLogs"
)
class FlowLogsBackend(object):
def __init__(self):
self.flow_logs = defaultdict(dict)
super(FlowLogsBackend, self).__init__()
def _validate_request(
self,
log_group_name,
log_destination,
log_destination_type,
max_aggregation_interval,
deliver_logs_permission_arn,
):
if log_group_name is None and log_destination is None:
raise InvalidDependantParameterError(
"LogDestination", "LogGroupName", "not provided",
)
if log_destination_type == "s3":
if log_group_name is not None:
raise InvalidDependantParameterTypeError(
"LogDestination", "cloud-watch-logs", "LogGroupName",
)
elif log_destination_type == "cloud-watch-logs":
if deliver_logs_permission_arn is None:
raise InvalidDependantParameterError(
"DeliverLogsPermissionArn",
"LogDestinationType",
"cloud-watch-logs",
)
if max_aggregation_interval not in ["60", "600"]:
raise InvalidAggregationIntervalParameterError(
"Flow Log Max Aggregation Interval"
)
def create_flow_logs(
self,
resource_type,
resource_ids,
traffic_type,
deliver_logs_permission_arn,
log_destination_type,
log_destination,
log_group_name,
log_format,
max_aggregation_interval,
):
# Guess it's best to put it here due to possible
# lack of them in the CloudFormation template
max_aggregation_interval = (
"600" if max_aggregation_interval is None else max_aggregation_interval
)
log_destination_type = (
"cloud-watch-logs" if log_destination_type is None else log_destination_type
)
log_format = (
"${version} ${account-id} ${interface-id} ${srcaddr} ${dstaddr} ${srcport} ${dstport} ${protocol} ${packets} ${bytes} ${start} ${end} ${action} ${log-status}"
if log_format is None
else log_format
)
# Validate the requests paremeters
self._validate_request(
log_group_name,
log_destination,
log_destination_type,
max_aggregation_interval,
deliver_logs_permission_arn,
)
flow_logs_set = []
unsuccessful = []
for resource_id in resource_ids:
deliver_logs_status = "SUCCESS"
deliver_logs_error_message = None
flow_log_id = random_flow_log_id()
if resource_type == "VPC":
# Validate VPCs exist
self.get_vpc(resource_id)
elif resource_type == "Subnet":
# Validate Subnets exist
self.get_subnet(resource_id)
elif resource_type == "NetworkInterface":
# Validate NetworkInterfaces exist
self.get_network_interface(resource_id)
if log_destination_type == "s3":
from moto.s3.models import s3_backend
from moto.s3.exceptions import MissingBucket
arn = log_destination.split(":", 5)[5]
try:
s3_backend.get_bucket(arn)
except MissingBucket:
unsuccessful.append(
# Instead of creating FlowLog report
# the unsuccessful status for the
# given resource_id
Unsuccessful(
resource_id,
"400",
"LogDestination: {0} does not exist.".format(arn),
)
)
continue
elif log_destination_type == "cloud-watch-logs":
from moto.logs.models import logs_backends
from moto.logs.exceptions import ResourceNotFoundException
# API allows to create a FlowLog with a
# non-existing LogGroup. It however later
# on reports the FAILED delivery status.
try:
# Need something easy to check the group exists.
# The list_tags_log_group seems to do the trick.
logs_backends[self.region_name].list_tags_log_group(log_group_name)
except ResourceNotFoundException:
deliver_logs_status = "FAILED"
deliver_logs_error_message = "Access error"
all_flow_logs = self.describe_flow_logs()
if any(
fl.resource_id == resource_id
and (
fl.log_group_name == log_group_name
or fl.log_destination == log_destination
)
for fl in all_flow_logs
):
raise FlowLogAlreadyExists()
flow_logs = FlowLogs(
self,
flow_log_id,
resource_id,
traffic_type,
log_destination,
log_group_name,
deliver_logs_permission_arn,
max_aggregation_interval,
log_destination_type,
log_format,
deliver_logs_status,
deliver_logs_error_message,
)
self.flow_logs[flow_log_id] = flow_logs
flow_logs_set.append(flow_logs)
return flow_logs_set, unsuccessful
def describe_flow_logs(self, flow_log_ids=None, filters=None):
matches = itertools.chain([i for i in self.flow_logs.values()])
if flow_log_ids:
matches = [flow_log for flow_log in matches if flow_log.id in flow_log_ids]
if filters:
matches = generic_filter(filters, matches)
return matches
def delete_flow_logs(self, flow_log_ids):
non_existing = []
for flow_log in flow_log_ids:
if flow_log in self.flow_logs:
self.flow_logs.pop(flow_log, None)
else:
non_existing.append(flow_log)
if non_existing:
raise InvalidFlowLogIdError(
len(flow_log_ids), " ".join(x for x in flow_log_ids),
)
return True
class SubnetRouteTableAssociation(CloudFormationModel):
def __init__(self, route_table_id, subnet_id):
self.route_table_id = route_table_id
@ -5530,6 +5833,7 @@ class EC2Backend(
VPCBackend,
SubnetBackend,
SubnetRouteTableAssociationBackend,
FlowLogsBackend,
NetworkInterfaceBackend,
VPNConnectionBackend,
VPCPeeringConnectionBackend,

View file

@ -24,6 +24,7 @@ from .security_groups import SecurityGroups
from .spot_fleets import SpotFleets
from .spot_instances import SpotInstances
from .subnets import Subnets
from .flow_logs import FlowLogs
from .tags import TagResponse
from .virtual_private_gateways import VirtualPrivateGateways
from .vm_export import VMExport
@ -60,6 +61,7 @@ class EC2Response(
SpotFleets,
SpotInstances,
Subnets,
FlowLogs,
TagResponse,
VirtualPrivateGateways,
VMExport,

View file

@ -0,0 +1,122 @@
from __future__ import unicode_literals
from moto.core.responses import BaseResponse
from moto.ec2.models import validate_resource_ids
from moto.ec2.utils import filters_from_querystring
class FlowLogs(BaseResponse):
def create_flow_logs(self):
resource_type = self._get_param("ResourceType")
resource_ids = self._get_multi_param("ResourceId")
traffic_type = self._get_param("TrafficType")
deliver_logs_permission_arn = self._get_param("DeliverLogsPermissionArn")
log_destination_type = self._get_param("LogDestinationType")
log_destination = self._get_param("LogDestination")
log_group_name = self._get_param("LogGroupName")
log_format = self._get_param("LogFormat")
max_aggregation_interval = self._get_param("MaxAggregationInterval")
validate_resource_ids(resource_ids)
tags = self._parse_tag_specification("TagSpecification")
tags = tags.get("vpc-flow-log", {})
if self.is_not_dryrun("CreateFlowLogs"):
flow_logs, errors = self.ec2_backend.create_flow_logs(
resource_type=resource_type,
resource_ids=resource_ids,
traffic_type=traffic_type,
deliver_logs_permission_arn=deliver_logs_permission_arn,
log_destination_type=log_destination_type,
log_destination=log_destination,
log_group_name=log_group_name,
log_format=log_format,
max_aggregation_interval=max_aggregation_interval,
)
for fl in flow_logs:
fl.add_tags(tags)
template = self.response_template(CREATE_FLOW_LOGS_RESPONSE)
return template.render(flow_logs=flow_logs, errors=errors)
def describe_flow_logs(self):
flow_log_ids = self._get_multi_param("FlowLogId")
filters = filters_from_querystring(self.querystring)
flow_logs = self.ec2_backend.describe_flow_logs(flow_log_ids, filters)
if self.is_not_dryrun("DescribeFlowLogs"):
template = self.response_template(DESCRIBE_FLOW_LOGS_RESPONSE)
return template.render(flow_logs=flow_logs)
def delete_flow_logs(self):
flow_log_ids = self._get_multi_param("FlowLogId")
self.ec2_backend.delete_flow_logs(flow_log_ids)
if self.is_not_dryrun("DeleteFlowLogs"):
template = self.response_template(DELETE_FLOW_LOGS_RESPONSE)
return template.render()
CREATE_FLOW_LOGS_RESPONSE = """
<CreateFlowLogsResponse xmlns="http://ec2.amazonaws.com/doc/2016-11-15/">
<requestId>2d96dae3-504b-4fc4-bf50-266EXAMPLE</requestId>
<unsuccessful>
{% for error in errors %}
<item>
<error>
<code>{{ error.error_code }}</code>
<message>{{ error.error_message }}</message>
</error>
<resourceId>{{ error.resource_id }}</resourceId>
</item>
{% endfor %}
</unsuccessful>
<flowLogIdSet>
{% for flow_log in flow_logs %}
<item>{{ flow_log.id }}</item>
{% endfor %}
</flowLogIdSet>
</CreateFlowLogsResponse>"""
DELETE_FLOW_LOGS_RESPONSE = """
<DeleteFlowLogsResponse xmlns="http://ec2.amazonaws.com/doc/2016-11-15/">
<requestId>c5c4f51f-f4e9-42bc-8700-EXAMPLE</requestId>
<unsuccessful/>
</DeleteFlowLogsResponse>"""
DESCRIBE_FLOW_LOGS_RESPONSE = """
<DescribeFlowLogsResponse xmlns="http://ec2.amazonaws.com/doc/2016-11-15/">
<requestId>3cb46f23-099e-4bf0-891c-EXAMPLE</requestId>
<flowLogSet>
{% for flow_log in flow_logs %}
<item>
{% if flow_log.log_destination is not none %}
<logDestination>{{ flow_log.log_destination }}</logDestination>
{% endif %}
<resourceId>{{ flow_log.resource_id }}</resourceId>
<logDestinationType>{{ flow_log.log_destination_type }}</logDestinationType>
<creationTime>{{ flow_log.created_at }}</creationTime>
<trafficType>{{ flow_log.traffic_type }}</trafficType>
<deliverLogsStatus>{{ flow_log.deliver_logs_status }}</deliverLogsStatus>
{% if flow_log.deliver_logs_error_message is not none %}
<deliverLogsErrorMessage>{{ flow_log.deliver_logs_error_message }}</deliverLogsErrorMessage>
{% endif %}
<logFormat>{{ flow_log.log_format }}</logFormat>
<flowLogStatus>ACTIVE</flowLogStatus>
<flowLogId>{{ flow_log.id }}</flowLogId>
<maxAggregationInterval>{{ flow_log.max_aggregation_interval }}</maxAggregationInterval>
{% if flow_log.deliver_logs_permission_arn is not none %}
<deliverLogsPermissionArn>{{ flow_log.deliver_logs_permission_arn }}</deliverLogsPermissionArn>
{% endif %}
{% if flow_log.log_group_name is not none %}
<logGroupName>{{ flow_log.log_group_name }}</logGroupName>
{% endif %}
{% if flow_log.get_tags() %}
<tagSet>
{% for tag in flow_log.get_tags() %}
<item>
<key>{{ tag.key }}</key>
<value>{{ tag.value }}</value>
</item>
{% endfor %}
</tagSet>
{% endif %}
</item>
{% endfor %}
</flowLogSet>
</DescribeFlowLogsResponse>"""

View file

@ -16,6 +16,7 @@ from moto.core import ACCOUNT_ID
EC2_RESOURCE_TO_PREFIX = {
"customer-gateway": "cgw",
"dhcp-options": "dopt",
"flow-logs": "fl",
"image": "ami",
"instance": "i",
"internet-gateway": "igw",
@ -74,6 +75,10 @@ def random_security_group_id():
return random_id(prefix=EC2_RESOURCE_TO_PREFIX["security-group"])
def random_flow_log_id():
return random_id(prefix=EC2_RESOURCE_TO_PREFIX["flow-logs"])
def random_snapshot_id():
return random_id(prefix=EC2_RESOURCE_TO_PREFIX["snapshot"])