From 433e4c07335326b61ab6f6785907d2039ef67044 Mon Sep 17 00:00:00 2001 From: usmangani1 Date: Wed, 10 Mar 2021 14:16:13 +0530 Subject: [PATCH] Fix:Add functionality authorize-cluster-security-group-ingress (#3742) * Fix:Add functionality authorize-cluster-security-group-ingress * Added tests * Added more test cases --- moto/redshift/exceptions.py | 8 +++++ moto/redshift/models.py | 12 +++++++ moto/redshift/responses.py | 28 +++++++++++++++ tests/test_redshift/test_redshift.py | 52 ++++++++++++++++++++++++++++ 4 files changed, 100 insertions(+) diff --git a/moto/redshift/exceptions.py b/moto/redshift/exceptions.py index eb6cea99..02d93e7e 100644 --- a/moto/redshift/exceptions.py +++ b/moto/redshift/exceptions.py @@ -157,3 +157,11 @@ class UnknownSnapshotCopyRegionFaultError(RedshiftClientError): super(UnknownSnapshotCopyRegionFaultError, self).__init__( "UnknownSnapshotCopyRegionFault", message ) + + +class ClusterSecurityGroupNotFoundFaultError(RedshiftClientError): + def __init__(self): + super(ClusterSecurityGroupNotFoundFaultError, self).__init__( + "ClusterSecurityGroupNotFoundFault", + "The cluster security group name does not refer to an existing cluster security group.", + ) diff --git a/moto/redshift/models.py b/moto/redshift/models.py index 0574112f..4ff3aeb1 100644 --- a/moto/redshift/models.py +++ b/moto/redshift/models.py @@ -28,6 +28,7 @@ from .exceptions import ( SnapshotCopyGrantAlreadyExistsFaultError, SnapshotCopyGrantNotFoundFaultError, UnknownSnapshotCopyRegionFaultError, + ClusterSecurityGroupNotFoundFaultError, ) @@ -423,6 +424,7 @@ class SecurityGroup(TaggableResourceMixin, BaseModel): super(SecurityGroup, self).__init__(region_name, tags) self.cluster_security_group_name = cluster_security_group_name self.description = description + self.ingress_rules = [] @property def resource_id(self): @@ -749,6 +751,16 @@ class RedshiftBackend(BaseBackend): return self.security_groups.pop(security_group_identifier) raise ClusterSecurityGroupNotFoundError(security_group_identifier) + def authorize_cluster_security_group_ingress(self, security_group_name, cidr_ip): + security_group = self.security_groups.get(security_group_name) + if not security_group: + raise ClusterSecurityGroupNotFoundFaultError() + + # just adding the cidr_ip as ingress rule for now as there is no security rule + security_group.ingress_rules.append(cidr_ip) + + return security_group + def create_cluster_parameter_group( self, cluster_parameter_group_name, diff --git a/moto/redshift/responses.py b/moto/redshift/responses.py index 9397d5b4..cbd6d01a 100644 --- a/moto/redshift/responses.py +++ b/moto/redshift/responses.py @@ -412,6 +412,34 @@ class RedshiftResponse(BaseResponse): } ) + def authorize_cluster_security_group_ingress(self): + cluster_security_group_name = self._get_param("ClusterSecurityGroupName") + cidr_ip = self._get_param("CIDRIP") + + security_group = self.redshift_backend.authorize_cluster_security_group_ingress( + cluster_security_group_name, cidr_ip + ) + + return self.get_response( + { + "AuthorizeClusterSecurityGroupIngressResponse": { + "AuthorizeClusterSecurityGroupIngressResult": { + "ClusterSecurityGroup": { + "ClusterSecurityGroupName": cluster_security_group_name, + "Description": security_group.description, + "IPRanges": [ + { + "Status": "authorized", + "CIDRIP": cidr_ip, + "Tags": security_group.tags, + }, + ], + } + } + } + } + ) + def create_cluster_parameter_group(self): cluster_parameter_group_name = self._get_param("ParameterGroupName") group_family = self._get_param("ParameterGroupFamily") diff --git a/tests/test_redshift/test_redshift.py b/tests/test_redshift/test_redshift.py index 62613458..dfb3186a 100644 --- a/tests/test_redshift/test_redshift.py +++ b/tests/test_redshift/test_redshift.py @@ -616,6 +616,58 @@ def test_create_cluster_subnet_group(): set(subnet_ids).should.equal(set([subnet1.id, subnet2.id])) +@mock_redshift +def test_authorize_security_group_ingress(): + iam_roles_arn = ["arn:aws:iam:::role/my-iam-role"] + client = boto3.client("redshift", region_name="us-east-1") + cluster_identifier = "my_cluster" + + client.create_cluster( + ClusterIdentifier=cluster_identifier, + NodeType="single-node", + MasterUsername="username", + MasterUserPassword="password", + IamRoles=iam_roles_arn, + ) + + client.create_cluster_security_group( + ClusterSecurityGroupName="security_group", + Description="security_group_description", + ) + + response = client.authorize_cluster_security_group_ingress( + ClusterSecurityGroupName="security_group", CIDRIP="192.168.10.0/28" + ) + + assert ( + response.get("ClusterSecurityGroup").get("ClusterSecurityGroupName") + == "security_group" + ) + assert ( + response.get("ClusterSecurityGroup").get("Description") + == "security_group_description" + ) + assert ( + response.get("ClusterSecurityGroup").get("IPRanges")[0].get("Status") + == "authorized" + ) + assert ( + response.get("ClusterSecurityGroup").get("IPRanges")[0].get("CIDRIP") + == "192.168.10.0/28" + ) + + with pytest.raises(ClientError) as ex: + client.authorize_cluster_security_group_ingress( + ClusterSecurityGroupName="invalid_security_group", CIDRIP="192.168.10.0/28" + ) + assert ex.value.response["Error"]["Code"] == "ClusterSecurityGroupNotFoundFault" + + assert ( + ex.value.response["Error"]["Message"] + == "The cluster security group name does not refer to an existing cluster security group." + ) + + @mock_redshift_deprecated @mock_ec2_deprecated def test_create_invalid_cluster_subnet_group():