Fix:Add functionality authorize-cluster-security-group-ingress (#3742)

* Fix:Add functionality  authorize-cluster-security-group-ingress

* Added tests

* Added more test cases
This commit is contained in:
usmangani1 2021-03-10 14:16:13 +05:30 committed by GitHub
commit 433e4c0733
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 100 additions and 0 deletions

View file

@ -157,3 +157,11 @@ class UnknownSnapshotCopyRegionFaultError(RedshiftClientError):
super(UnknownSnapshotCopyRegionFaultError, self).__init__(
"UnknownSnapshotCopyRegionFault", message
)
class ClusterSecurityGroupNotFoundFaultError(RedshiftClientError):
def __init__(self):
super(ClusterSecurityGroupNotFoundFaultError, self).__init__(
"ClusterSecurityGroupNotFoundFault",
"The cluster security group name does not refer to an existing cluster security group.",
)

View file

@ -28,6 +28,7 @@ from .exceptions import (
SnapshotCopyGrantAlreadyExistsFaultError,
SnapshotCopyGrantNotFoundFaultError,
UnknownSnapshotCopyRegionFaultError,
ClusterSecurityGroupNotFoundFaultError,
)
@ -423,6 +424,7 @@ class SecurityGroup(TaggableResourceMixin, BaseModel):
super(SecurityGroup, self).__init__(region_name, tags)
self.cluster_security_group_name = cluster_security_group_name
self.description = description
self.ingress_rules = []
@property
def resource_id(self):
@ -749,6 +751,16 @@ class RedshiftBackend(BaseBackend):
return self.security_groups.pop(security_group_identifier)
raise ClusterSecurityGroupNotFoundError(security_group_identifier)
def authorize_cluster_security_group_ingress(self, security_group_name, cidr_ip):
security_group = self.security_groups.get(security_group_name)
if not security_group:
raise ClusterSecurityGroupNotFoundFaultError()
# just adding the cidr_ip as ingress rule for now as there is no security rule
security_group.ingress_rules.append(cidr_ip)
return security_group
def create_cluster_parameter_group(
self,
cluster_parameter_group_name,

View file

@ -412,6 +412,34 @@ class RedshiftResponse(BaseResponse):
}
)
def authorize_cluster_security_group_ingress(self):
cluster_security_group_name = self._get_param("ClusterSecurityGroupName")
cidr_ip = self._get_param("CIDRIP")
security_group = self.redshift_backend.authorize_cluster_security_group_ingress(
cluster_security_group_name, cidr_ip
)
return self.get_response(
{
"AuthorizeClusterSecurityGroupIngressResponse": {
"AuthorizeClusterSecurityGroupIngressResult": {
"ClusterSecurityGroup": {
"ClusterSecurityGroupName": cluster_security_group_name,
"Description": security_group.description,
"IPRanges": [
{
"Status": "authorized",
"CIDRIP": cidr_ip,
"Tags": security_group.tags,
},
],
}
}
}
}
)
def create_cluster_parameter_group(self):
cluster_parameter_group_name = self._get_param("ParameterGroupName")
group_family = self._get_param("ParameterGroupFamily")