DataSync: Task metadata and update_task

Travis: Moved lint check before build
Datasync: Added delete_task and delete_location
This commit is contained in:
Bjorn Olsen 2019-11-05 12:30:05 +02:00
commit 5cfbe2bb3d
4 changed files with 211 additions and 53 deletions

View file

@ -27,12 +27,14 @@ class Task(BaseModel):
name,
region_name,
arn_counter=0,
metadata=None,
):
self.source_location_arn = source_location_arn
self.destination_location_arn = destination_location_arn
self.name = name
self.metadata = metadata
# For simplicity Tasks are either available or running
self.status = "AVAILABLE"
self.name = name
self.current_task_execution_arn = None
# Generate ARN
self.arn = "arn:aws:datasync:{0}:111222333444:task/task-{1}".format(
@ -129,7 +131,27 @@ class DataSyncBackend(BaseBackend):
self.locations[location.arn] = location
return location.arn
def create_task(self, source_location_arn, destination_location_arn, name):
def _get_location(self, location_arn, typ):
if location_arn not in self.locations:
raise InvalidRequestException(
"Location {0} is not found.".format(location_arn)
)
location = self.locations[location_arn]
if location.typ != typ:
raise InvalidRequestException(
"Invalid Location type: {0}".format(location.typ)
)
return location
def delete_location(self, location_arn):
if location_arn in self.locations:
del self.locations[location_arn]
else:
raise InvalidRequestException
def create_task(
self, source_location_arn, destination_location_arn, name, metadata=None
):
if source_location_arn not in self.locations:
raise InvalidRequestException(
"Location {0} not found.".format(source_location_arn)
@ -145,10 +167,33 @@ class DataSyncBackend(BaseBackend):
name,
region_name=self.region_name,
arn_counter=self.arn_counter,
metadata=metadata,
)
self.tasks[task.arn] = task
return task.arn
def _get_task(self, task_arn):
if task_arn in self.tasks:
return self.tasks[task_arn]
else:
raise InvalidRequestException
def update_task(self, task_arn, name, metadata):
if task_arn in self.tasks:
task = self.tasks[task_arn]
task.name = name
task.metadata = metadata
else:
raise InvalidRequestException(
"Sync task {0} is not found.".format(task_arn)
)
def delete_task(self, task_arn):
if task_arn in self.tasks:
del self.tasks[task_arn]
else:
raise InvalidRequestException
def start_task_execution(self, task_arn):
self.arn_counter = self.arn_counter + 1
if task_arn in self.tasks:
@ -161,12 +206,19 @@ class DataSyncBackend(BaseBackend):
return task_execution.arn
raise InvalidRequestException("Invalid request.")
def _get_task_execution(self, task_execution_arn):
if task_execution_arn in self.task_executions:
return self.task_executions[task_execution_arn]
else:
raise InvalidRequestException
def cancel_task_execution(self, task_execution_arn):
if task_execution_arn in self.task_executions:
task_execution = self.task_executions[task_execution_arn]
task_execution.cancel()
task_arn = task_execution.task_arn
self.tasks[task_arn].current_task_execution_arn = None
self.tasks[task_arn].status = "AVAILABLE"
return
raise InvalidRequestException(
"Sync task {0} is not found.".format(task_execution_arn)

View file

@ -2,7 +2,6 @@ import json
from moto.core.responses import BaseResponse
from .exceptions import InvalidRequestException
from .models import datasync_backends
@ -18,17 +17,7 @@ class DataSyncResponse(BaseResponse):
return json.dumps({"Locations": locations})
def _get_location(self, location_arn, typ):
location_arn = self._get_param("LocationArn")
if location_arn not in self.datasync_backend.locations:
raise InvalidRequestException(
"Location {0} is not found.".format(location_arn)
)
location = self.datasync_backend.locations[location_arn]
if location.typ != typ:
raise InvalidRequestException(
"Invalid Location type: {0}".format(location.typ)
)
return location
return self.datasync_backend._get_location(location_arn, typ)
def create_location_s3(self):
# s3://bucket_name/folder/
@ -86,16 +75,40 @@ class DataSyncResponse(BaseResponse):
}
)
def delete_location(self):
location_arn = self._get_param("LocationArn")
self.datasync_backend.delete_location(location_arn)
return json.dumps({})
def create_task(self):
destination_location_arn = self._get_param("DestinationLocationArn")
source_location_arn = self._get_param("SourceLocationArn")
name = self._get_param("Name")
metadata = {
"CloudWatchLogGroupArn": self._get_param("CloudWatchLogGroupArn"),
"Options": self._get_param("Options"),
"Excludes": self._get_param("Excludes"),
"Tags": self._get_param("Tags"),
}
arn = self.datasync_backend.create_task(
source_location_arn, destination_location_arn, name
source_location_arn, destination_location_arn, name, metadata=metadata
)
return json.dumps({"TaskArn": arn})
def update_task(self):
task_arn = self._get_param("TaskArn")
self.datasync_backend.update_task(
task_arn,
name=self._get_param("Name"),
metadata={
"CloudWatchLogGroupArn": self._get_param("CloudWatchLogGroupArn"),
"Options": self._get_param("Options"),
"Excludes": self._get_param("Excludes"),
"Tags": self._get_param("Tags"),
},
)
return json.dumps({})
def list_tasks(self):
tasks = list()
for arn, task in self.datasync_backend.tasks.items():
@ -104,29 +117,32 @@ class DataSyncResponse(BaseResponse):
)
return json.dumps({"Tasks": tasks})
def delete_task(self):
task_arn = self._get_param("TaskArn")
self.datasync_backend.delete_task(task_arn)
return json.dumps({})
def describe_task(self):
task_arn = self._get_param("TaskArn")
if task_arn in self.datasync_backend.tasks:
task = self.datasync_backend.tasks[task_arn]
return json.dumps(
{
"TaskArn": task.arn,
"Name": task.name,
"CurrentTaskExecutionArn": task.current_task_execution_arn,
"Status": task.status,
"SourceLocationArn": task.source_location_arn,
"DestinationLocationArn": task.destination_location_arn,
}
)
raise InvalidRequestException
task = self.datasync_backend._get_task(task_arn)
return json.dumps(
{
"TaskArn": task.arn,
"Status": task.status,
"Name": task.name,
"CurrentTaskExecutionArn": task.current_task_execution_arn,
"SourceLocationArn": task.source_location_arn,
"DestinationLocationArn": task.destination_location_arn,
"CloudWatchLogGroupArn": task.metadata["CloudWatchLogGroupArn"],
"Options": task.metadata["Options"],
"Excludes": task.metadata["Excludes"],
}
)
def start_task_execution(self):
task_arn = self._get_param("TaskArn")
if task_arn in self.datasync_backend.tasks:
arn = self.datasync_backend.start_task_execution(task_arn)
if arn:
return json.dumps({"TaskExecutionArn": arn})
raise InvalidRequestException("Invalid request.")
arn = self.datasync_backend.start_task_execution(task_arn)
return json.dumps({"TaskExecutionArn": arn})
def cancel_task_execution(self):
task_execution_arn = self._get_param("TaskExecutionArn")
@ -135,21 +151,12 @@ class DataSyncResponse(BaseResponse):
def describe_task_execution(self):
task_execution_arn = self._get_param("TaskExecutionArn")
if task_execution_arn in self.datasync_backend.task_executions:
task_execution = self.datasync_backend.task_executions[task_execution_arn]
if task_execution:
result = json.dumps(
{
"TaskExecutionArn": task_execution.arn,
"Status": task_execution.status,
}
)
if task_execution.status == "SUCCESS":
self.datasync_backend.tasks[
task_execution.task_arn
].status = "AVAILABLE"
# Simulate task being executed
task_execution.iterate_status()
return result
raise InvalidRequestException
task_execution = self.datasync_backend._get_task_execution(task_execution_arn)
result = json.dumps(
{"TaskExecutionArn": task_execution.arn, "Status": task_execution.status,}
)
if task_execution.status == "SUCCESS":
self.datasync_backend.tasks[task_execution.task_arn].status = "AVAILABLE"
# Simulate task being executed
task_execution.iterate_status()
return result