added several basic tests

This commit is contained in:
Bjorn Olsen 2019-11-01 21:16:59 +02:00
commit c02c0e4033
3 changed files with 230 additions and 47 deletions

View file

@ -8,34 +8,49 @@ from moto.compat import OrderedDict
from moto.core import BaseBackend, BaseModel
'''
Endpoints I need to test:
list_locations
list_tasks
start_task_execution
cancel_task_execution
describe_task
describe_task_execution
'''
def datasync_json_dump(datasync_object):
return json.dumps(datasync_object)
class Location(BaseModel):
def __init__(self, location_uri, region_name):
self.location_uri = location_uri
self.region_name = region_name
loc = ''.join([random.choice(string.ascii_lowercase + string.digits) for _ in range(17)])
self.arn = 'arn:aws:datasync:{0}:111222333444:location/loc-{1}'.format(region_name, loc)
def __init__(self,
location_uri,
region_name,
arn_counter=0):
self.uri = location_uri
self.region_name = region_name
# Generate ARN
self.arn = 'arn:aws:datasync:{0}:111222333444:location/loc-{1}'.format(region_name, str(arn_counter).zfill(17))
class Task(BaseModel):
def __init__(self,
source_location_arn,
destination_location_arn,
name,
region_name,
arn_counter=0):
self.source_location_arn = source_location_arn
self.destination_location_arn = destination_location_arn
self.status = 'AVAILABLE'
self.name = name
# Generate ARN
self.arn = 'arn:aws:datasync:{0}:111222333444:task/task-{1}'.format(region_name, str(arn_counter).zfill(17))
class TaskExecution(BaseModel):
def __init__(self,
task_arn,
arn_counter=0):
self.task_arn = task_arn
self.arn = '{0}/execution/exec-{1}'.format(task_arn, str(arn_counter).zfill(17))
class DataSyncBackend(BaseBackend):
def __init__(self, region_name):
self.region_name = region_name
self.locations = OrderedDict()
# Always increase when new things are created
# This ensures uniqueness
self.arn_counter = 0
self.locations = dict()
self.tasks = dict()
self.task_executions = dict()
def reset(self):
region_name = self.region_name
self._reset_model_refs()
@ -43,13 +58,38 @@ class DataSyncBackend(BaseBackend):
self.__init__(region_name)
def create_location(self, location_uri):
if location_uri in self.locations:
raise Exception('Location already exists')
location = Location(location_uri, region_name=self.region_name)
self.locations['location_uri'] = location
# TODO BJORN figure out exception
# TODO BJORN test for exception
for arn, location in self.locations.items():
if location.uri == location_uri:
raise Exception('Location already exists')
self.arn_counter = self.arn_counter + 1
location = Location(location_uri,
region_name=self.region_name,
arn_counter=self.arn_counter)
self.locations[location.arn] = location
return location.arn
def create_task(self,
source_location_arn,
destination_location_arn,
name):
self.arn_counter = self.arn_counter + 1
task = Task(source_location_arn,
destination_location_arn,
name,
region_name=self.region_name,
arn_counter=self.arn_counter
)
self.tasks[task.arn] = task
return task.arn
def start_task_execution(self, task_arn):
self.arn_counter = self.arn_counter + 1
task_execution = TaskExecution(task_arn,
arn_counter=self.arn_counter)
self.task_executions[task_execution.arn] = task_execution
return task_execution.arn
datasync_backends = {}
for region in boto3.Session().get_available_regions("datasync"):