From a7ddcd7da314507975246a256a5ebc4aaca1f4be Mon Sep 17 00:00:00 2001 From: usmangani1 Date: Tue, 4 Aug 2020 11:20:57 +0530 Subject: [PATCH] Fix:EC2-authorize_security_group_ingress- add description to IP-Ranges (#3196) * Fix:EC2-authorize_security_group_ingress- add description to IP-Ranges * Fix:EC2-authorize_security_group_ingress- add test when description is not present. * part commit * Fix:fixed build errors * Linting * Allow for Python2 string/unicodes Co-authored-by: usmankb Co-authored-by: Bert Blommers --- moto/ec2/models.py | 26 +++++++--- moto/ec2/responses/security_groups.py | 12 ++++- tests/test_ec2/test_security_groups.py | 72 ++++++++++++++++++++++++-- 3 files changed, 96 insertions(+), 14 deletions(-) diff --git a/moto/ec2/models.py b/moto/ec2/models.py index e6c57dcd..2498726b 100644 --- a/moto/ec2/models.py +++ b/moto/ec2/models.py @@ -160,7 +160,6 @@ AMIS = _load_resource( or resource_filename(__name__, "resources/amis.json"), ) - OWNER_ID = ACCOUNT_ID @@ -1405,7 +1404,6 @@ class Ami(TaggedEC2Resource): class AmiBackend(object): - AMI_REGEX = re.compile("ami-[a-z0-9]+") def __init__(self): @@ -2118,11 +2116,16 @@ class SecurityGroupBackend(object): vpc_id=None, ): group = self.get_security_group_by_name_or_id(group_name_or_id, vpc_id) - if ip_ranges and not isinstance(ip_ranges, list): - ip_ranges = [ip_ranges] + if ip_ranges: + if isinstance(ip_ranges, str) or ( + six.PY2 and isinstance(ip_ranges, unicode) # noqa + ): + ip_ranges = [{"CidrIp": str(ip_ranges)}] + elif not isinstance(ip_ranges, list): + ip_ranges = [json.loads(ip_ranges)] if ip_ranges: for cidr in ip_ranges: - if not is_valid_cidr(cidr): + if not is_valid_cidr(cidr["CidrIp"]): raise InvalidCIDRSubnetError(cidr=cidr) self._verify_group_will_respect_rule_count_limit( @@ -2200,10 +2203,14 @@ class SecurityGroupBackend(object): group = self.get_security_group_by_name_or_id(group_name_or_id, vpc_id) if ip_ranges and not isinstance(ip_ranges, list): - ip_ranges = [ip_ranges] + + if isinstance(ip_ranges, str) and "CidrIp" not in ip_ranges: + ip_ranges = [{"CidrIp": ip_ranges}] + else: + ip_ranges = [json.loads(ip_ranges)] if ip_ranges: for cidr in ip_ranges: - if not is_valid_cidr(cidr): + if not is_valid_cidr(cidr["CidrIp"]): raise InvalidCIDRSubnetError(cidr=cidr) self._verify_group_will_respect_rule_count_limit( @@ -2259,9 +2266,13 @@ class SecurityGroupBackend(object): if source_group: source_groups.append(source_group) + for ip in ip_ranges: + ip_ranges = [ip.get("CidrIp") if ip.get("CidrIp") == "0.0.0.0/0" else ip] + security_rule = SecurityRule( ip_protocol, from_port, to_port, ip_ranges, source_groups ) + if security_rule in group.egress_rules: group.egress_rules.remove(security_rule) return security_rule @@ -3737,7 +3748,6 @@ class VPCEndPoint(TaggedEC2Resource): tag_specifications=None, private_dns_enabled=None, ): - self.id = id self.vpc_id = vpc_id self.service_name = service_name diff --git a/moto/ec2/responses/security_groups.py b/moto/ec2/responses/security_groups.py index f0002d5b..af84b773 100644 --- a/moto/ec2/responses/security_groups.py +++ b/moto/ec2/responses/security_groups.py @@ -20,7 +20,11 @@ def parse_sg_attributes_from_dict(sg_attributes): ip_ranges = [] ip_ranges_tree = sg_attributes.get("IpRanges") or {} for ip_range_idx in sorted(ip_ranges_tree.keys()): - ip_ranges.append(ip_ranges_tree[ip_range_idx]["CidrIp"][0]) + ip_range = {"CidrIp": ip_ranges_tree[ip_range_idx]["CidrIp"][0]} + if ip_ranges_tree[ip_range_idx].get("Description"): + ip_range["Description"] = ip_ranges_tree[ip_range_idx].get("Description")[0] + + ip_ranges.append(ip_range) source_groups = [] source_group_ids = [] @@ -61,6 +65,7 @@ class SecurityGroups(BaseResponse): source_groups, source_group_ids, ) = parse_sg_attributes_from_dict(querytree) + yield ( group_name_or_id, ip_protocol, @@ -211,7 +216,10 @@ DESCRIBE_SECURITY_GROUPS_RESPONSE = ( {% for ip_range in rule.ip_ranges %} - {{ ip_range }} + {{ ip_range['CidrIp'] }} + {% if ip_range['Description'] %} + {{ ip_range['Description'] }} + {% endif %} {% endfor %} diff --git a/tests/test_ec2/test_security_groups.py b/tests/test_ec2/test_security_groups.py index 7e936b7a..90f39550 100644 --- a/tests/test_ec2/test_security_groups.py +++ b/tests/test_ec2/test_security_groups.py @@ -1,6 +1,7 @@ from __future__ import unicode_literals import copy +import json # Ensure 'assert_raises' context manager support for Python 2.6 import tests.backport_assert_raises # noqa @@ -272,9 +273,10 @@ def test_authorize_ip_range_and_revoke(): # There are two egress rules associated with the security group: # the default outbound rule and the new one int(egress_security_group.rules_egress[1].to_port).should.equal(2222) - egress_security_group.rules_egress[1].grants[0].cidr_ip.should.equal( - "123.123.123.123/32" - ) + actual_cidr = egress_security_group.rules_egress[1].grants[0].cidr_ip + # Deal with Python2 dict->unicode, instead of dict->string + actual_cidr = json.loads(actual_cidr.replace("u'", "'").replace("'", '"')) + actual_cidr.should.equal({"CidrIp": "123.123.123.123/32"}) # Wrong Cidr should throw error egress_security_group.revoke.when.called_with( @@ -690,6 +692,68 @@ def test_add_same_rule_twice_throws_error(): sg.authorize_ingress(IpPermissions=ip_permissions) +@mock_ec2 +def test_description_in_ip_permissions(): + ec2 = boto3.resource("ec2", region_name="us-west-1") + conn = boto3.client("ec2", region_name="us-east-1") + vpc = ec2.create_vpc(CidrBlock="10.0.0.0/16") + sg = conn.create_security_group( + GroupName="sg1", Description="Test security group sg1", VpcId=vpc.id + ) + + ip_permissions = [ + { + "IpProtocol": "tcp", + "FromPort": 27017, + "ToPort": 27017, + "IpRanges": [{"CidrIp": "1.2.3.4/32", "Description": "testDescription"}], + } + ] + conn.authorize_security_group_ingress( + GroupId=sg["GroupId"], IpPermissions=ip_permissions + ) + + result = conn.describe_security_groups(GroupIds=[sg["GroupId"]]) + + assert ( + result["SecurityGroups"][0]["IpPermissions"][0]["IpRanges"][0]["Description"] + == "testDescription" + ) + assert ( + result["SecurityGroups"][0]["IpPermissions"][0]["IpRanges"][0]["CidrIp"] + == "1.2.3.4/32" + ) + + sg = conn.create_security_group( + GroupName="sg2", Description="Test security group sg1", VpcId=vpc.id + ) + + ip_permissions = [ + { + "IpProtocol": "tcp", + "FromPort": 27017, + "ToPort": 27017, + "IpRanges": [{"CidrIp": "1.2.3.4/32"}], + } + ] + conn.authorize_security_group_ingress( + GroupId=sg["GroupId"], IpPermissions=ip_permissions + ) + + result = conn.describe_security_groups(GroupIds=[sg["GroupId"]]) + + assert ( + result["SecurityGroups"][0]["IpPermissions"][0]["IpRanges"][0].get( + "Description" + ) + is None + ) + assert ( + result["SecurityGroups"][0]["IpPermissions"][0]["IpRanges"][0]["CidrIp"] + == "1.2.3.4/32" + ) + + @mock_ec2 def test_security_group_tagging_boto3(): conn = boto3.client("ec2", region_name="us-east-1") @@ -868,7 +932,7 @@ def test_revoke_security_group_egress(): { "FromPort": 0, "IpProtocol": "-1", - "IpRanges": [{"CidrIp": "0.0.0.0/0"},], + "IpRanges": [{"CidrIp": "0.0.0.0/0"}], "ToPort": 123, }, ]