Improve implementation coverage (and layout)

This commit is contained in:
Bert Blommers 2020-05-12 14:58:35 +01:00
commit ddb5c30d34
14 changed files with 1243 additions and 387 deletions

View file

@ -49,9 +49,7 @@ mock_dynamodbstreams = lazy_load(".dynamodbstreams", "mock_dynamodbstreams")
mock_elasticbeanstalk = lazy_load(".elasticbeanstalk", "mock_elasticbeanstalk")
mock_ec2 = lazy_load(".ec2", "mock_ec2")
mock_ec2_deprecated = lazy_load(".ec2", "mock_ec2_deprecated")
mock_ec2_instance_connect = lazy_load(
".ec2_instance_connect", "mock_ec2_instance_connect"
)
mock_ec2instanceconnect = lazy_load(".ec2instanceconnect", "mock_ec2instanceconnect")
mock_ecr = lazy_load(".ecr", "mock_ecr")
mock_ecr_deprecated = lazy_load(".ecr", "mock_ecr_deprecated")
mock_ecs = lazy_load(".ecs", "mock_ecs")

View file

@ -21,7 +21,7 @@ BACKENDS = {
"dynamodb2": ("dynamodb2", "dynamodb_backends2"),
"dynamodbstreams": ("dynamodbstreams", "dynamodbstreams_backends"),
"ec2": ("ec2", "ec2_backends"),
"ec2_instance_connect": ("ec2_instance_connect", "ec2_instance_connect_backends"),
"ec2instanceconnect": ("ec2instanceconnect", "ec2instanceconnect_backends"),
"ecr": ("ecr", "ecr_backends"),
"ecs": ("ecs", "ecs_backends"),
"elasticbeanstalk": ("elasticbeanstalk", "eb_backends"),

View file

@ -824,6 +824,42 @@ class DynamoDBBackend(BaseBackend):
required_table = self.tables[table]
return required_table.tags
def list_tables(self, limit, exclusive_start_table_name):
all_tables = list(self.tables.keys())
if exclusive_start_table_name:
try:
last_table_index = all_tables.index(exclusive_start_table_name)
except ValueError:
start = len(all_tables)
else:
start = last_table_index + 1
else:
start = 0
if limit:
tables = all_tables[start : start + limit]
else:
tables = all_tables[start:]
if limit and len(all_tables) > start + limit:
return tables, tables[-1]
return tables, None
def describe_table(self, name):
table = self.tables[name]
return table.describe(base_key="Table")
def update_table(self, name, global_index, throughput, stream_spec):
table = self.get_table(name)
if global_index:
table = self.update_table_global_indexes(name, global_index)
if throughput:
table = self.update_table_throughput(name, throughput)
if stream_spec:
table = self.update_table_streams(name, stream_spec)
return table
def update_table_throughput(self, name, throughput):
table = self.tables[name]
table.throughput = throughput
@ -1134,7 +1170,7 @@ class DynamoDBBackend(BaseBackend):
return table.delete_item(hash_value, range_value)
def update_ttl(self, table_name, ttl_spec):
def update_time_to_live(self, table_name, ttl_spec):
table = self.tables.get(table_name)
if table is None:
raise JsonRESTError("ResourceNotFound", "Table not found")
@ -1151,7 +1187,7 @@ class DynamoDBBackend(BaseBackend):
table.ttl["TimeToLiveStatus"] = "DISABLED"
table.ttl["AttributeName"] = ttl_spec["AttributeName"]
def describe_ttl(self, table_name):
def describe_time_to_live(self, table_name):
table = self.tables.get(table_name)
if table is None:
raise JsonRESTError("ResourceNotFound", "Table not found")
@ -1246,6 +1282,21 @@ class DynamoDBBackend(BaseBackend):
self.tables = original_table_state
raise
######################
# LIST of methods where the logic completely resides in responses.py
# Duplicated here so that the implementation coverage script is aware
# TODO: Move logic here
######################
def batch_get_item(self):
pass
def batch_write_item(self):
pass
def transact_get_items(self):
pass
dynamodb_backends = {}
for region in Session().get_available_regions("dynamodb"):

View file

@ -92,27 +92,14 @@ class DynamoHandler(BaseResponse):
def list_tables(self):
body = self.body
limit = body.get("Limit", 100)
all_tables = list(self.dynamodb_backend.tables.keys())
exclusive_start_table_name = body.get("ExclusiveStartTableName")
if exclusive_start_table_name:
try:
last_table_index = all_tables.index(exclusive_start_table_name)
except ValueError:
start = len(all_tables)
else:
start = last_table_index + 1
else:
start = 0
if limit:
tables = all_tables[start : start + limit]
else:
tables = all_tables[start:]
tables, last_eval = self.dynamodb_backend.list_tables(
limit, exclusive_start_table_name
)
response = {"TableNames": tables}
if limit and len(all_tables) > start + limit:
response["LastEvaluatedTableName"] = tables[-1]
if last_eval:
response["LastEvaluatedTableName"] = last_eval
return dynamo_json_dump(response)
@ -232,33 +219,29 @@ class DynamoHandler(BaseResponse):
def update_table(self):
name = self.body["TableName"]
table = self.dynamodb_backend.get_table(name)
if "GlobalSecondaryIndexUpdates" in self.body:
table = self.dynamodb_backend.update_table_global_indexes(
name, self.body["GlobalSecondaryIndexUpdates"]
global_index = self.body.get("GlobalSecondaryIndexUpdates", None)
throughput = self.body.get("ProvisionedThroughput", None)
stream_spec = self.body.get("StreamSpecification", None)
try:
table = self.dynamodb_backend.update_table(
name=name,
global_index=global_index,
throughput=throughput,
stream_spec=stream_spec,
)
if "ProvisionedThroughput" in self.body:
throughput = self.body["ProvisionedThroughput"]
table = self.dynamodb_backend.update_table_throughput(name, throughput)
if "StreamSpecification" in self.body:
try:
table = self.dynamodb_backend.update_table_streams(
name, self.body["StreamSpecification"]
)
except ValueError:
er = "com.amazonaws.dynamodb.v20111205#ResourceInUseException"
return self.error(er, "Cannot enable stream")
return dynamo_json_dump(table.describe())
return dynamo_json_dump(table.describe())
except ValueError:
er = "com.amazonaws.dynamodb.v20111205#ResourceInUseException"
return self.error(er, "Cannot enable stream")
def describe_table(self):
name = self.body["TableName"]
try:
table = self.dynamodb_backend.tables[name]
table = self.dynamodb_backend.describe_table(name)
return dynamo_json_dump(table)
except KeyError:
er = "com.amazonaws.dynamodb.v20111205#ResourceNotFoundException"
return self.error(er, "Requested resource not found")
return dynamo_json_dump(table.describe(base_key="Table"))
def put_item(self):
name = self.body["TableName"]
@ -850,14 +833,14 @@ class DynamoHandler(BaseResponse):
name = self.body["TableName"]
ttl_spec = self.body["TimeToLiveSpecification"]
self.dynamodb_backend.update_ttl(name, ttl_spec)
self.dynamodb_backend.update_time_to_live(name, ttl_spec)
return json.dumps({"TimeToLiveSpecification": ttl_spec})
def describe_time_to_live(self):
name = self.body["TableName"]
ttl_spec = self.dynamodb_backend.describe_ttl(name)
ttl_spec = self.dynamodb_backend.describe_time_to_live(name)
return json.dumps({"TimeToLiveDescription": ttl_spec})

View file

@ -1,4 +0,0 @@
from ..core.models import base_decorator
from .models import ec2_instance_connect_backends
mock_ec2_instance_connect = base_decorator(ec2_instance_connect_backends)

View file

@ -1,11 +0,0 @@
import boto.ec2
from moto.core import BaseBackend
class Ec2InstanceConnectBackend(BaseBackend):
pass
ec2_instance_connect_backends = {}
for region in boto.ec2.regions():
ec2_instance_connect_backends[region.name] = Ec2InstanceConnectBackend()

View file

@ -1,9 +0,0 @@
import json
from moto.core.responses import BaseResponse
class Ec2InstanceConnectResponse(BaseResponse):
def send_ssh_public_key(self):
return json.dumps(
{"RequestId": "example-2a47-4c91-9700-e37e85162cb6", "Success": True}
)

View file

@ -0,0 +1,4 @@
from ..core.models import base_decorator
from .models import ec2instanceconnect_backends
mock_ec2instanceconnect = base_decorator(ec2instanceconnect_backends)

View file

@ -0,0 +1,15 @@
import boto.ec2
import json
from moto.core import BaseBackend
class Ec2InstanceConnectBackend(BaseBackend):
def send_ssh_public_key(self):
return json.dumps(
{"RequestId": "example-2a47-4c91-9700-e37e85162cb6", "Success": True}
)
ec2instanceconnect_backends = {}
for region in boto.ec2.regions():
ec2instanceconnect_backends[region.name] = Ec2InstanceConnectBackend()

View file

@ -0,0 +1,11 @@
from moto.core.responses import BaseResponse
from .models import ec2instanceconnect_backends
class Ec2InstanceConnectResponse(BaseResponse):
@property
def ec2instanceconnect_backend(self):
return ec2instanceconnect_backends[self.region]
def send_ssh_public_key(self):
return self.ec2instanceconnect_backend.send_ssh_public_key()