VPC IPv4 validation (#2026)

* Implemented throwing invalid subnet range error and fixed breaking tests.

* Implemented throwing invalid CIDR block parameter error for vpcs and subnets.

* Implemented throwing invalid destination CIDR block error.

* IPv6 addresses not accepted, strict checking disabled.

* Implemented throwing invalid subnet conflict error and fixed breaking tests.

* Implemented throwing invalid VPC range error and fixed breaking tests.

* Fixed accidentally removed ).

* Fixed test case trying to create two subnets with the same CIDR range.
This commit is contained in:
Bendegúz Ács 2019-05-25 19:35:07 +02:00 committed by Terry Cain
commit f408709ef9
14 changed files with 215 additions and 45 deletions

View file

@ -430,6 +430,51 @@ class OperationNotPermitted(EC2ClientError):
)
class InvalidSubnetRangeError(EC2ClientError):
def __init__(self, cidr_block):
super(InvalidSubnetRangeError, self).__init__(
"InvalidSubnet.Range",
"The CIDR '{}' is invalid.".format(cidr_block)
)
class InvalidCIDRBlockParameterError(EC2ClientError):
def __init__(self, cidr_block):
super(InvalidCIDRBlockParameterError, self).__init__(
"InvalidParameterValue",
"Value ({}) for parameter cidrBlock is invalid. This is not a valid CIDR block.".format(cidr_block)
)
class InvalidDestinationCIDRBlockParameterError(EC2ClientError):
def __init__(self, cidr_block):
super(InvalidDestinationCIDRBlockParameterError, self).__init__(
"InvalidParameterValue",
"Value ({}) for parameter destinationCidrBlock is invalid. This is not a valid CIDR block.".format(cidr_block)
)
class InvalidSubnetConflictError(EC2ClientError):
def __init__(self, cidr_block):
super(InvalidSubnetConflictError, self).__init__(
"InvalidSubnet.Conflict",
"The CIDR '{}' conflicts with another subnet".format(cidr_block)
)
class InvalidVPCRangeError(EC2ClientError):
def __init__(self, cidr_block):
super(InvalidVPCRangeError, self).__init__(
"InvalidVpc.Range",
"The CIDR '{}' is invalid.".format(cidr_block)
)
# accept exception
class OperationNotPermitted2(EC2ClientError):
def __init__(self, client_region, pcx_id, acceptor_region):

View file

@ -36,8 +36,10 @@ from .exceptions import (
InvalidAMIIdError,
InvalidAMIAttributeItemValueError,
InvalidAssociationIdError,
InvalidCIDRBlockParameterError,
InvalidCIDRSubnetError,
InvalidCustomerGatewayIdError,
InvalidDestinationCIDRBlockParameterError,
InvalidDHCPOptionsIdError,
InvalidDomainError,
InvalidID,
@ -58,13 +60,16 @@ from .exceptions import (
InvalidSecurityGroupDuplicateError,
InvalidSecurityGroupNotFoundError,
InvalidSnapshotIdError,
InvalidSubnetConflictError,
InvalidSubnetIdError,
InvalidSubnetRangeError,
InvalidVolumeIdError,
InvalidVolumeAttachmentError,
InvalidVpcCidrBlockAssociationIdError,
InvalidVPCPeeringConnectionIdError,
InvalidVPCPeeringConnectionStateTransitionError,
InvalidVPCIdError,
InvalidVPCRangeError,
InvalidVpnGatewayIdError,
InvalidVpnConnectionIdError,
MalformedAMIIdError,
@ -2151,6 +2156,12 @@ class VPCBackend(object):
def create_vpc(self, cidr_block, instance_tenancy='default', amazon_provided_ipv6_cidr_block=False):
vpc_id = random_vpc_id()
try:
vpc_cidr_block = ipaddress.IPv4Network(six.text_type(cidr_block), strict=False)
except ValueError:
raise InvalidCIDRBlockParameterError(cidr_block)
if vpc_cidr_block.prefixlen < 16 or vpc_cidr_block.prefixlen > 28:
raise InvalidVPCRangeError(cidr_block)
vpc = VPC(self, vpc_id, cidr_block, len(self.vpcs) == 0, instance_tenancy, amazon_provided_ipv6_cidr_block)
self.vpcs[vpc_id] = vpc
@ -2367,7 +2378,7 @@ class Subnet(TaggedEC2Resource):
self.id = subnet_id
self.vpc_id = vpc_id
self.cidr_block = cidr_block
self.cidr = ipaddress.ip_network(six.text_type(self.cidr_block))
self.cidr = ipaddress.IPv4Network(six.text_type(self.cidr_block), strict=False)
self._availability_zone = availability_zone
self.default_for_az = default_for_az
self.map_public_ip_on_launch = map_public_ip_on_launch
@ -2499,7 +2510,19 @@ class SubnetBackend(object):
def create_subnet(self, vpc_id, cidr_block, availability_zone):
subnet_id = random_subnet_id()
self.get_vpc(vpc_id) # Validate VPC exists
vpc = self.get_vpc(vpc_id) # Validate VPC exists and the supplied CIDR block is a subnet of the VPC's
vpc_cidr_block = ipaddress.IPv4Network(six.text_type(vpc.cidr_block), strict=False)
try:
subnet_cidr_block = ipaddress.IPv4Network(six.text_type(cidr_block), strict=False)
except ValueError:
raise InvalidCIDRBlockParameterError(cidr_block)
if not (vpc_cidr_block.network_address <= subnet_cidr_block.network_address and
vpc_cidr_block.broadcast_address >= subnet_cidr_block.broadcast_address):
raise InvalidSubnetRangeError(cidr_block)
for subnet in self.get_all_subnets(filters={'vpc-id': vpc_id}):
if subnet.cidr.overlaps(subnet_cidr_block):
raise InvalidSubnetConflictError(cidr_block)
# if this is the first subnet for an availability zone,
# consider it the default
@ -2759,6 +2782,11 @@ class RouteBackend(object):
elif EC2_RESOURCE_TO_PREFIX['internet-gateway'] in gateway_id:
gateway = self.get_internet_gateway(gateway_id)
try:
ipaddress.IPv4Network(six.text_type(destination_cidr_block), strict=False)
except ValueError:
raise InvalidDestinationCIDRBlockParameterError(destination_cidr_block)
route = Route(route_table, destination_cidr_block, local=local,
gateway=gateway,
instance=self.get_instance(