Implement Redshift Taggable Resources (#1217)

- Implement create_tags, describe_tags, and delete_tags endpoints
- Clusters, Parameter Groups, Security Groups, Snapshots, and Subnet Groups can all be tagged
- Test Suite updated
- Minor clean-up of restore_from_cluster_snapshot endpoint
- Miscellaneous typo fixes
This commit is contained in:
Brian Pandola 2017-09-27 17:18:28 -07:00 committed by Jack Danger
commit 5bb6b98f6d
4 changed files with 597 additions and 94 deletions

View file

@ -71,3 +71,25 @@ class ClusterSnapshotAlreadyExistsError(RedshiftClientError):
'ClusterSnapshotAlreadyExists',
"Cannot create the snapshot because a snapshot with the "
"identifier {0} already exists".format(snapshot_identifier))
class InvalidParameterValueError(RedshiftClientError):
def __init__(self, message):
super(InvalidParameterValueError, self).__init__(
'InvalidParameterValue',
message)
class ResourceNotFoundFaultError(RedshiftClientError):
code = 404
def __init__(self, resource_type=None, resource_name=None, message=None):
if resource_type and not resource_name:
msg = "resource of type '{0}' not found.".format(resource_type)
else:
msg = "{0} ({1}) not found.".format(resource_type, resource_name)
if message:
msg = message
super(ResourceNotFoundFaultError, self).__init__(
'ResourceNotFoundFault', msg)

View file

@ -15,11 +15,51 @@ from .exceptions import (
ClusterSnapshotAlreadyExistsError,
ClusterSnapshotNotFoundError,
ClusterSubnetGroupNotFoundError,
InvalidParameterValueError,
InvalidSubnetError,
ResourceNotFoundFaultError
)
class Cluster(BaseModel):
ACCOUNT_ID = 123456789012
class TaggableResourceMixin(object):
resource_type = None
def __init__(self, region_name, tags):
self.region = region_name
self.tags = tags or []
@property
def resource_id(self):
return None
@property
def arn(self):
return "arn:aws:redshift:{region}:{account_id}:{resource_type}:{resource_id}".format(
region=self.region,
account_id=ACCOUNT_ID,
resource_type=self.resource_type,
resource_id=self.resource_id)
def create_tags(self, tags):
new_keys = [tag_set['Key'] for tag_set in tags]
self.tags = [tag_set for tag_set in self.tags
if tag_set['Key'] not in new_keys]
self.tags.extend(tags)
return self.tags
def delete_tags(self, tag_keys):
self.tags = [tag_set for tag_set in self.tags
if tag_set['Key'] not in tag_keys]
return self.tags
class Cluster(TaggableResourceMixin, BaseModel):
resource_type = 'cluster'
def __init__(self, redshift_backend, cluster_identifier, node_type, master_username,
master_user_password, db_name, cluster_type, cluster_security_groups,
@ -27,7 +67,8 @@ class Cluster(BaseModel):
preferred_maintenance_window, cluster_parameter_group_name,
automated_snapshot_retention_period, port, cluster_version,
allow_version_upgrade, number_of_nodes, publicly_accessible,
encrypted, region):
encrypted, region_name, tags=None):
super(Cluster, self).__init__(region_name, tags)
self.redshift_backend = redshift_backend
self.cluster_identifier = cluster_identifier
self.status = 'available'
@ -57,13 +98,12 @@ class Cluster(BaseModel):
else:
self.cluster_security_groups = ["Default"]
self.region = region
if availability_zone:
self.availability_zone = availability_zone
else:
# This could probably be smarter, but there doesn't appear to be a
# way to pull AZs for a region in boto
self.availability_zone = region + "a"
self.availability_zone = region_name + "a"
if cluster_type == 'single-node':
self.number_of_nodes = 1
@ -106,7 +146,7 @@ class Cluster(BaseModel):
number_of_nodes=properties.get('NumberOfNodes'),
publicly_accessible=properties.get("PubliclyAccessible"),
encrypted=properties.get("Encrypted"),
region=region_name,
region_name=region_name,
)
return cluster
@ -149,6 +189,10 @@ class Cluster(BaseModel):
if parameter_group.cluster_parameter_group_name in self.cluster_parameter_group_name
]
@property
def resource_id(self):
return self.cluster_identifier
def to_json(self):
return {
"MasterUsername": self.master_username,
@ -180,18 +224,21 @@ class Cluster(BaseModel):
"ClusterIdentifier": self.cluster_identifier,
"AllowVersionUpgrade": self.allow_version_upgrade,
"Endpoint": {
"Address": '{}.{}.redshift.amazonaws.com'.format(
self.cluster_identifier,
self.region),
"Address": self.endpoint,
"Port": self.port
},
"PendingModifiedValues": []
"PendingModifiedValues": [],
"Tags": self.tags
}
class SubnetGroup(BaseModel):
class SubnetGroup(TaggableResourceMixin, BaseModel):
def __init__(self, ec2_backend, cluster_subnet_group_name, description, subnet_ids):
resource_type = 'subnetgroup'
def __init__(self, ec2_backend, cluster_subnet_group_name, description, subnet_ids,
region_name, tags=None):
super(SubnetGroup, self).__init__(region_name, tags)
self.ec2_backend = ec2_backend
self.cluster_subnet_group_name = cluster_subnet_group_name
self.description = description
@ -208,6 +255,7 @@ class SubnetGroup(BaseModel):
cluster_subnet_group_name=resource_name,
description=properties.get("Description"),
subnet_ids=properties.get("SubnetIds", []),
region_name=region_name
)
return subnet_group
@ -219,6 +267,10 @@ class SubnetGroup(BaseModel):
def vpc_id(self):
return self.subnets[0].vpc_id
@property
def resource_id(self):
return self.cluster_subnet_group_name
def to_json(self):
return {
"VpcId": self.vpc_id,
@ -232,27 +284,39 @@ class SubnetGroup(BaseModel):
"Name": subnet.availability_zone
},
} for subnet in self.subnets],
"Tags": self.tags
}
class SecurityGroup(BaseModel):
class SecurityGroup(TaggableResourceMixin, BaseModel):
def __init__(self, cluster_security_group_name, description):
resource_type = 'securitygroup'
def __init__(self, cluster_security_group_name, description, region_name, tags=None):
super(SecurityGroup, self).__init__(region_name, tags)
self.cluster_security_group_name = cluster_security_group_name
self.description = description
@property
def resource_id(self):
return self.cluster_security_group_name
def to_json(self):
return {
"EC2SecurityGroups": [],
"IPRanges": [],
"Description": self.description,
"ClusterSecurityGroupName": self.cluster_security_group_name,
"Tags": self.tags
}
class ParameterGroup(BaseModel):
class ParameterGroup(TaggableResourceMixin, BaseModel):
def __init__(self, cluster_parameter_group_name, group_family, description):
resource_type = 'parametergroup'
def __init__(self, cluster_parameter_group_name, group_family, description, region_name, tags=None):
super(ParameterGroup, self).__init__(region_name, tags)
self.cluster_parameter_group_name = cluster_parameter_group_name
self.group_family = group_family
self.description = description
@ -266,34 +330,41 @@ class ParameterGroup(BaseModel):
cluster_parameter_group_name=resource_name,
description=properties.get("Description"),
group_family=properties.get("ParameterGroupFamily"),
region_name=region_name
)
return parameter_group
@property
def resource_id(self):
return self.cluster_parameter_group_name
def to_json(self):
return {
"ParameterGroupFamily": self.group_family,
"Description": self.description,
"ParameterGroupName": self.cluster_parameter_group_name,
"Tags": self.tags
}
class Snapshot(BaseModel):
class Snapshot(TaggableResourceMixin, BaseModel):
def __init__(self, cluster, snapshot_identifier, tags=None):
resource_type = 'snapshot'
def __init__(self, cluster, snapshot_identifier, region_name, tags=None):
super(Snapshot, self).__init__(region_name, tags)
self.cluster = copy.copy(cluster)
self.snapshot_identifier = snapshot_identifier
self.snapshot_type = 'manual'
self.status = 'available'
self.tags = tags or []
self.create_time = iso_8601_datetime_with_milliseconds(
datetime.datetime.now())
@property
def arn(self):
return "arn:aws:redshift:{0}:1234567890:snapshot:{1}/{2}".format(
self.cluster.region,
self.cluster.cluster_identifier,
self.snapshot_identifier)
def resource_id(self):
return "{cluster_id}/{snapshot_id}".format(
cluster_id=self.cluster.cluster_identifier,
snapshot_id=self.snapshot_identifier)
def to_json(self):
return {
@ -315,26 +386,36 @@ class Snapshot(BaseModel):
class RedshiftBackend(BaseBackend):
def __init__(self, ec2_backend):
def __init__(self, ec2_backend, region_name):
self.region = region_name
self.clusters = {}
self.subnet_groups = {}
self.security_groups = {
"Default": SecurityGroup("Default", "Default Redshift Security Group")
"Default": SecurityGroup("Default", "Default Redshift Security Group", self.region)
}
self.parameter_groups = {
"default.redshift-1.0": ParameterGroup(
"default.redshift-1.0",
"redshift-1.0",
"Default Redshift parameter group",
self.region
)
}
self.ec2_backend = ec2_backend
self.snapshots = OrderedDict()
self.RESOURCE_TYPE_MAP = {
'cluster': self.clusters,
'parametergroup': self.parameter_groups,
'securitygroup': self.security_groups,
'snapshot': self.snapshots,
'subnetgroup': self.subnet_groups
}
def reset(self):
ec2_backend = self.ec2_backend
region_name = self.region
self.__dict__ = {}
self.__init__(ec2_backend)
self.__init__(ec2_backend, region_name)
def create_cluster(self, **cluster_kwargs):
cluster_identifier = cluster_kwargs['cluster_identifier']
@ -373,9 +454,10 @@ class RedshiftBackend(BaseBackend):
return self.clusters.pop(cluster_identifier)
raise ClusterNotFoundError(cluster_identifier)
def create_cluster_subnet_group(self, cluster_subnet_group_name, description, subnet_ids):
def create_cluster_subnet_group(self, cluster_subnet_group_name, description, subnet_ids,
region_name, tags=None):
subnet_group = SubnetGroup(
self.ec2_backend, cluster_subnet_group_name, description, subnet_ids)
self.ec2_backend, cluster_subnet_group_name, description, subnet_ids, region_name, tags)
self.subnet_groups[cluster_subnet_group_name] = subnet_group
return subnet_group
@ -393,9 +475,9 @@ class RedshiftBackend(BaseBackend):
return self.subnet_groups.pop(subnet_identifier)
raise ClusterSubnetGroupNotFoundError(subnet_identifier)
def create_cluster_security_group(self, cluster_security_group_name, description):
def create_cluster_security_group(self, cluster_security_group_name, description, region_name, tags=None):
security_group = SecurityGroup(
cluster_security_group_name, description)
cluster_security_group_name, description, region_name, tags)
self.security_groups[cluster_security_group_name] = security_group
return security_group
@ -414,9 +496,9 @@ class RedshiftBackend(BaseBackend):
raise ClusterSecurityGroupNotFoundError(security_group_identifier)
def create_cluster_parameter_group(self, cluster_parameter_group_name,
group_family, description):
group_family, description, region_name, tags=None):
parameter_group = ParameterGroup(
cluster_parameter_group_name, group_family, description)
cluster_parameter_group_name, group_family, description, region_name, tags)
self.parameter_groups[cluster_parameter_group_name] = parameter_group
return parameter_group
@ -435,17 +517,17 @@ class RedshiftBackend(BaseBackend):
return self.parameter_groups.pop(parameter_group_name)
raise ClusterParameterGroupNotFoundError(parameter_group_name)
def create_snapshot(self, cluster_identifier, snapshot_identifier, tags):
def create_cluster_snapshot(self, cluster_identifier, snapshot_identifier, region_name, tags):
cluster = self.clusters.get(cluster_identifier)
if not cluster:
raise ClusterNotFoundError(cluster_identifier)
if self.snapshots.get(snapshot_identifier) is not None:
raise ClusterSnapshotAlreadyExistsError(snapshot_identifier)
snapshot = Snapshot(cluster, snapshot_identifier, tags)
snapshot = Snapshot(cluster, snapshot_identifier, region_name, tags)
self.snapshots[snapshot_identifier] = snapshot
return snapshot
def describe_snapshots(self, cluster_identifier, snapshot_identifier):
def describe_cluster_snapshots(self, cluster_identifier=None, snapshot_identifier=None):
if cluster_identifier:
for snapshot in self.snapshots.values():
if snapshot.cluster.cluster_identifier == cluster_identifier:
@ -459,7 +541,7 @@ class RedshiftBackend(BaseBackend):
return self.snapshots.values()
def delete_snapshot(self, snapshot_identifier):
def delete_cluster_snapshot(self, snapshot_identifier):
if snapshot_identifier not in self.snapshots:
raise ClusterSnapshotNotFoundError(snapshot_identifier)
@ -467,23 +549,105 @@ class RedshiftBackend(BaseBackend):
deleted_snapshot.status = 'deleted'
return deleted_snapshot
def describe_tags_for_resource_type(self, resource_type):
def restore_from_cluster_snapshot(self, **kwargs):
snapshot_identifier = kwargs.pop('snapshot_identifier')
snapshot = self.describe_cluster_snapshots(snapshot_identifier=snapshot_identifier)[0]
create_kwargs = {
"node_type": snapshot.cluster.node_type,
"master_username": snapshot.cluster.master_username,
"master_user_password": snapshot.cluster.master_user_password,
"db_name": snapshot.cluster.db_name,
"cluster_type": 'multi-node' if snapshot.cluster.number_of_nodes > 1 else 'single-node',
"availability_zone": snapshot.cluster.availability_zone,
"port": snapshot.cluster.port,
"cluster_version": snapshot.cluster.cluster_version,
"number_of_nodes": snapshot.cluster.number_of_nodes,
"encrypted": snapshot.cluster.encrypted,
"tags": snapshot.cluster.tags
}
create_kwargs.update(kwargs)
return self.create_cluster(**create_kwargs)
def _get_resource_from_arn(self, arn):
try:
arn_breakdown = arn.split(':')
resource_type = arn_breakdown[5]
if resource_type == 'snapshot':
resource_id = arn_breakdown[6].split('/')[1]
else:
resource_id = arn_breakdown[6]
except IndexError:
resource_type = resource_id = arn
resources = self.RESOURCE_TYPE_MAP.get(resource_type)
if resources is None:
message = (
"Tagging is not supported for this type of resource: '{0}' "
"(the ARN is potentially malformed, please check the ARN "
"documentation for more information)".format(resource_type))
raise ResourceNotFoundFaultError(message=message)
try:
resource = resources[resource_id]
except KeyError:
raise ResourceNotFoundFaultError(resource_type, resource_id)
else:
return resource
@staticmethod
def _describe_tags_for_resources(resources):
tagged_resources = []
if resource_type == 'Snapshot':
for snapshot in self.snapshots.values():
for tag in snapshot.tags:
data = {
'ResourceName': snapshot.arn,
'ResourceType': 'snapshot',
'Tag': {
'Key': tag['Key'],
'Value': tag['Value']
}
for resource in resources:
for tag in resource.tags:
data = {
'ResourceName': resource.arn,
'ResourceType': resource.resource_type,
'Tag': {
'Key': tag['Key'],
'Value': tag['Value']
}
tagged_resources.append(data)
}
tagged_resources.append(data)
return tagged_resources
def _describe_tags_for_resource_type(self, resource_type):
resources = self.RESOURCE_TYPE_MAP.get(resource_type)
if not resources:
raise ResourceNotFoundFaultError(resource_type=resource_type)
return self._describe_tags_for_resources(resources.values())
def _describe_tags_for_resource_name(self, resource_name):
resource = self._get_resource_from_arn(resource_name)
return self._describe_tags_for_resources([resource])
def create_tags(self, resource_name, tags):
resource = self._get_resource_from_arn(resource_name)
resource.create_tags(tags)
def describe_tags(self, resource_name, resource_type):
if resource_name and resource_type:
raise InvalidParameterValueError(
"You cannot filter a list of resources using an Amazon "
"Resource Name (ARN) and a resource type together in the "
"same request. Retry the request using either an ARN or "
"a resource type, but not both.")
if resource_type:
return self._describe_tags_for_resource_type(resource_type.lower())
if resource_name:
return self._describe_tags_for_resource_name(resource_name)
# If name and type are not specified, return all tagged resources.
# TODO: Implement aws marker pagination
tagged_resources = []
for resource_type in self.RESOURCE_TYPE_MAP:
try:
tagged_resources += self._describe_tags_for_resource_type(resource_type)
except ResourceNotFoundFaultError:
pass
return tagged_resources
def delete_tags(self, resource_name, tag_keys):
resource = self._get_resource_from_arn(resource_name)
resource.delete_tags(tag_keys)
redshift_backends = {}
for region in boto.redshift.regions():
redshift_backends[region.name] = RedshiftBackend(ec2_backends[region.name])
redshift_backends[region.name] = RedshiftBackend(ec2_backends[region.name], region.name)

View file

@ -57,6 +57,15 @@ class RedshiftResponse(BaseResponse):
count += 1
return unpacked_list
def unpack_list_params(self, label):
unpacked_list = list()
count = 1
while self._get_param('{0}.{1}'.format(label, count)):
unpacked_list.append(self._get_param(
'{0}.{1}'.format(label, count)))
count += 1
return unpacked_list
def create_cluster(self):
cluster_kwargs = {
"cluster_identifier": self._get_param('ClusterIdentifier'),
@ -78,7 +87,8 @@ class RedshiftResponse(BaseResponse):
"number_of_nodes": self._get_int_param('NumberOfNodes'),
"publicly_accessible": self._get_param("PubliclyAccessible"),
"encrypted": self._get_param("Encrypted"),
"region": self.region,
"region_name": self.region,
"tags": self.unpack_complex_list_params('Tags.Tag', ('Key', 'Value'))
}
cluster = self.redshift_backend.create_cluster(**cluster_kwargs).to_json()
cluster['ClusterStatus'] = 'creating'
@ -94,23 +104,8 @@ class RedshiftResponse(BaseResponse):
})
def restore_from_cluster_snapshot(self):
snapshot_identifier = self._get_param('SnapshotIdentifier')
snapshots = self.redshift_backend.describe_snapshots(
None,
snapshot_identifier)
snapshot = snapshots[0]
kwargs_from_snapshot = {
"node_type": snapshot.cluster.node_type,
"master_username": snapshot.cluster.master_username,
"master_user_password": snapshot.cluster.master_user_password,
"db_name": snapshot.cluster.db_name,
"cluster_type": 'multi-node' if snapshot.cluster.number_of_nodes > 1 else 'single-node',
"availability_zone": snapshot.cluster.availability_zone,
"port": snapshot.cluster.port,
"cluster_version": snapshot.cluster.cluster_version,
"number_of_nodes": snapshot.cluster.number_of_nodes,
}
kwargs_from_request = {
restore_kwargs = {
"snapshot_identifier": self._get_param('SnapshotIdentifier'),
"cluster_identifier": self._get_param('ClusterIdentifier'),
"port": self._get_int_param('Port'),
"availability_zone": self._get_param('AvailabilityZone'),
@ -129,12 +124,9 @@ class RedshiftResponse(BaseResponse):
'PreferredMaintenanceWindow'),
"automated_snapshot_retention_period": self._get_int_param(
'AutomatedSnapshotRetentionPeriod'),
"region": self.region,
"encrypted": False,
"region_name": self.region,
}
kwargs_from_snapshot.update(kwargs_from_request)
cluster_kwargs = kwargs_from_snapshot
cluster = self.redshift_backend.create_cluster(**cluster_kwargs).to_json()
cluster = self.redshift_backend.restore_from_cluster_snapshot(**restore_kwargs).to_json()
cluster['ClusterStatus'] = 'creating'
return self.get_response({
"RestoreFromClusterSnapshotResponse": {
@ -230,11 +222,14 @@ class RedshiftResponse(BaseResponse):
# according to the AWS documentation
if not subnet_ids:
subnet_ids = self._get_multi_param('SubnetIds.SubnetIdentifier')
tags = self.unpack_complex_list_params('Tags.Tag', ('Key', 'Value'))
subnet_group = self.redshift_backend.create_cluster_subnet_group(
cluster_subnet_group_name=cluster_subnet_group_name,
description=description,
subnet_ids=subnet_ids,
region_name=self.region,
tags=tags
)
return self.get_response({
@ -280,10 +275,13 @@ class RedshiftResponse(BaseResponse):
cluster_security_group_name = self._get_param(
'ClusterSecurityGroupName')
description = self._get_param('Description')
tags = self.unpack_complex_list_params('Tags.Tag', ('Key', 'Value'))
security_group = self.redshift_backend.create_cluster_security_group(
cluster_security_group_name=cluster_security_group_name,
description=description,
region_name=self.region,
tags=tags
)
return self.get_response({
@ -331,11 +329,14 @@ class RedshiftResponse(BaseResponse):
cluster_parameter_group_name = self._get_param('ParameterGroupName')
group_family = self._get_param('ParameterGroupFamily')
description = self._get_param('Description')
tags = self.unpack_complex_list_params('Tags.Tag', ('Key', 'Value'))
parameter_group = self.redshift_backend.create_cluster_parameter_group(
cluster_parameter_group_name,
group_family,
description,
self.region,
tags
)
return self.get_response({
@ -381,11 +382,12 @@ class RedshiftResponse(BaseResponse):
def create_cluster_snapshot(self):
cluster_identifier = self._get_param('ClusterIdentifier')
snapshot_identifier = self._get_param('SnapshotIdentifier')
tags = self.unpack_complex_list_params(
'Tags.Tag', ('Key', 'Value'))
snapshot = self.redshift_backend.create_snapshot(cluster_identifier,
snapshot_identifier,
tags)
tags = self.unpack_complex_list_params('Tags.Tag', ('Key', 'Value'))
snapshot = self.redshift_backend.create_cluster_snapshot(cluster_identifier,
snapshot_identifier,
self.region,
tags)
return self.get_response({
'CreateClusterSnapshotResponse': {
"CreateClusterSnapshotResult": {
@ -399,9 +401,9 @@ class RedshiftResponse(BaseResponse):
def describe_cluster_snapshots(self):
cluster_identifier = self._get_param('ClusterIdentifier')
snapshot_identifier = self._get_param('DBSnapshotIdentifier')
snapshots = self.redshift_backend.describe_snapshots(cluster_identifier,
snapshot_identifier)
snapshot_identifier = self._get_param('SnapshotIdentifier')
snapshots = self.redshift_backend.describe_cluster_snapshots(cluster_identifier,
snapshot_identifier)
return self.get_response({
"DescribeClusterSnapshotsResponse": {
"DescribeClusterSnapshotsResult": {
@ -415,7 +417,7 @@ class RedshiftResponse(BaseResponse):
def delete_cluster_snapshot(self):
snapshot_identifier = self._get_param('SnapshotIdentifier')
snapshot = self.redshift_backend.delete_snapshot(snapshot_identifier)
snapshot = self.redshift_backend.delete_cluster_snapshot(snapshot_identifier)
return self.get_response({
"DeleteClusterSnapshotResponse": {
@ -428,13 +430,26 @@ class RedshiftResponse(BaseResponse):
}
})
def create_tags(self):
resource_name = self._get_param('ResourceName')
tags = self.unpack_complex_list_params('Tags.Tag', ('Key', 'Value'))
self.redshift_backend.create_tags(resource_name, tags)
return self.get_response({
"CreateTagsResponse": {
"ResponseMetadata": {
"RequestId": "384ac68d-3775-11df-8963-01868b7c937a",
}
}
})
def describe_tags(self):
resource_name = self._get_param('ResourceName')
resource_type = self._get_param('ResourceType')
if resource_type != 'Snapshot':
raise NotImplementedError(
"The describe_tags action has not been fully implemented.")
tagged_resources = \
self.redshift_backend.describe_tags_for_resource_type(resource_type)
tagged_resources = self.redshift_backend.describe_tags(resource_name,
resource_type)
return self.get_response({
"DescribeTagsResponse": {
"DescribeTagsResult": {
@ -445,3 +460,17 @@ class RedshiftResponse(BaseResponse):
}
}
})
def delete_tags(self):
resource_name = self._get_param('ResourceName')
tag_keys = self.unpack_list_params('TagKeys.TagKey')
self.redshift_backend.delete_tags(resource_name, tag_keys)
return self.get_response({
"DeleteTagsResponse": {
"ResponseMetadata": {
"RequestId": "384ac68d-3775-11df-8963-01868b7c937a",
}
}
})