Fix:EC2-authorize_security_group_ingress- add description to IP-Ranges (#3196)

* Fix:EC2-authorize_security_group_ingress- add description to IP-Ranges

* Fix:EC2-authorize_security_group_ingress- add test when description is not present.

* part commit

* Fix:fixed build errors

* Linting

* Allow for Python2 string/unicodes

Co-authored-by: usmankb <usman@krazybee.com>
Co-authored-by: Bert Blommers <info@bertblommers.nl>
This commit is contained in:
usmangani1 2020-08-04 11:20:57 +05:30 committed by GitHub
commit a7ddcd7da3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 96 additions and 14 deletions

View file

@ -160,7 +160,6 @@ AMIS = _load_resource(
or resource_filename(__name__, "resources/amis.json"),
)
OWNER_ID = ACCOUNT_ID
@ -1405,7 +1404,6 @@ class Ami(TaggedEC2Resource):
class AmiBackend(object):
AMI_REGEX = re.compile("ami-[a-z0-9]+")
def __init__(self):
@ -2118,11 +2116,16 @@ class SecurityGroupBackend(object):
vpc_id=None,
):
group = self.get_security_group_by_name_or_id(group_name_or_id, vpc_id)
if ip_ranges and not isinstance(ip_ranges, list):
ip_ranges = [ip_ranges]
if ip_ranges:
if isinstance(ip_ranges, str) or (
six.PY2 and isinstance(ip_ranges, unicode) # noqa
):
ip_ranges = [{"CidrIp": str(ip_ranges)}]
elif not isinstance(ip_ranges, list):
ip_ranges = [json.loads(ip_ranges)]
if ip_ranges:
for cidr in ip_ranges:
if not is_valid_cidr(cidr):
if not is_valid_cidr(cidr["CidrIp"]):
raise InvalidCIDRSubnetError(cidr=cidr)
self._verify_group_will_respect_rule_count_limit(
@ -2200,10 +2203,14 @@ class SecurityGroupBackend(object):
group = self.get_security_group_by_name_or_id(group_name_or_id, vpc_id)
if ip_ranges and not isinstance(ip_ranges, list):
ip_ranges = [ip_ranges]
if isinstance(ip_ranges, str) and "CidrIp" not in ip_ranges:
ip_ranges = [{"CidrIp": ip_ranges}]
else:
ip_ranges = [json.loads(ip_ranges)]
if ip_ranges:
for cidr in ip_ranges:
if not is_valid_cidr(cidr):
if not is_valid_cidr(cidr["CidrIp"]):
raise InvalidCIDRSubnetError(cidr=cidr)
self._verify_group_will_respect_rule_count_limit(
@ -2259,9 +2266,13 @@ class SecurityGroupBackend(object):
if source_group:
source_groups.append(source_group)
for ip in ip_ranges:
ip_ranges = [ip.get("CidrIp") if ip.get("CidrIp") == "0.0.0.0/0" else ip]
security_rule = SecurityRule(
ip_protocol, from_port, to_port, ip_ranges, source_groups
)
if security_rule in group.egress_rules:
group.egress_rules.remove(security_rule)
return security_rule
@ -3737,7 +3748,6 @@ class VPCEndPoint(TaggedEC2Resource):
tag_specifications=None,
private_dns_enabled=None,
):
self.id = id
self.vpc_id = vpc_id
self.service_name = service_name

View file

@ -20,7 +20,11 @@ def parse_sg_attributes_from_dict(sg_attributes):
ip_ranges = []
ip_ranges_tree = sg_attributes.get("IpRanges") or {}
for ip_range_idx in sorted(ip_ranges_tree.keys()):
ip_ranges.append(ip_ranges_tree[ip_range_idx]["CidrIp"][0])
ip_range = {"CidrIp": ip_ranges_tree[ip_range_idx]["CidrIp"][0]}
if ip_ranges_tree[ip_range_idx].get("Description"):
ip_range["Description"] = ip_ranges_tree[ip_range_idx].get("Description")[0]
ip_ranges.append(ip_range)
source_groups = []
source_group_ids = []
@ -61,6 +65,7 @@ class SecurityGroups(BaseResponse):
source_groups,
source_group_ids,
) = parse_sg_attributes_from_dict(querytree)
yield (
group_name_or_id,
ip_protocol,
@ -211,7 +216,10 @@ DESCRIBE_SECURITY_GROUPS_RESPONSE = (
<ipRanges>
{% for ip_range in rule.ip_ranges %}
<item>
<cidrIp>{{ ip_range }}</cidrIp>
<cidrIp>{{ ip_range['CidrIp'] }}</cidrIp>
{% if ip_range['Description'] %}
<description>{{ ip_range['Description'] }}</description>
{% endif %}
</item>
{% endfor %}
</ipRanges>