Add dynamodb continuous backups (#2976)

* remove print statement

* Add dynamodb.describe_continuous_backups

* Add dynamodb.update_continuous_backups

* Fix Python 2 timestamp error
This commit is contained in:
Anton Grübel 2020-05-08 16:57:48 +02:00 committed by GitHub
commit 65e790c4eb
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 198 additions and 1 deletions

View file

@ -316,6 +316,12 @@ class Table(BaseModel):
}
self.set_stream_specification(streams)
self.lambda_event_source_mappings = {}
self.continuous_backups = {
"ContinuousBackupsStatus": "ENABLED", # One of 'ENABLED'|'DISABLED', it's enabled by default
"PointInTimeRecoveryDescription": {
"PointInTimeRecoveryStatus": "DISABLED" # One of 'ENABLED'|'DISABLED'
},
}
@classmethod
def create_from_cloudformation_json(
@ -1246,6 +1252,33 @@ class DynamoDBBackend(BaseBackend):
self.tables = original_table_state
raise
def describe_continuous_backups(self, table_name):
table = self.get_table(table_name)
return table.continuous_backups
def update_continuous_backups(self, table_name, point_in_time_spec):
table = self.get_table(table_name)
if (
point_in_time_spec["PointInTimeRecoveryEnabled"]
and table.continuous_backups["PointInTimeRecoveryDescription"][
"PointInTimeRecoveryStatus"
]
== "DISABLED"
):
table.continuous_backups["PointInTimeRecoveryDescription"] = {
"PointInTimeRecoveryStatus": "ENABLED",
"EarliestRestorableDateTime": unix_time(),
"LatestRestorableDateTime": unix_time(),
}
elif not point_in_time_spec["PointInTimeRecoveryEnabled"]:
table.continuous_backups["PointInTimeRecoveryDescription"] = {
"PointInTimeRecoveryStatus": "DISABLED"
}
return table.continuous_backups
dynamodb_backends = {}
for region in Session().get_available_regions("dynamodb"):

View file

@ -936,3 +936,32 @@ class DynamoHandler(BaseResponse):
)
response = {"ConsumedCapacity": [], "ItemCollectionMetrics": {}}
return dynamo_json_dump(response)
def describe_continuous_backups(self):
name = self.body["TableName"]
if self.dynamodb_backend.get_table(name) is None:
return self.error(
"com.amazonaws.dynamodb.v20111205#TableNotFoundException",
"Table not found: {}".format(name),
)
response = self.dynamodb_backend.describe_continuous_backups(name)
return json.dumps({"ContinuousBackupsDescription": response})
def update_continuous_backups(self):
name = self.body["TableName"]
point_in_time_spec = self.body["PointInTimeRecoverySpecification"]
if self.dynamodb_backend.get_table(name) is None:
return self.error(
"com.amazonaws.dynamodb.v20111205#TableNotFoundException",
"Table not found: {}".format(name),
)
response = self.dynamodb_backend.update_continuous_backups(
name, point_in_time_spec
)
return json.dumps({"ContinuousBackupsDescription": response})