Added AWS DataSync mocks and tests

This commit is contained in:
Bjorn Olsen 2019-11-02 21:34:35 +02:00
commit 97c20dd11d
10 changed files with 544 additions and 208 deletions

View file

@ -2,134 +2,326 @@ import logging
import boto
import boto3
from botocore.exceptions import ClientError
from moto import mock_datasync
from nose.tools import assert_raises
'''
Endpoints I need to test:
start_task_execution
cancel_task_execution
describe_task_execution
'''
def create_locations(client, create_smb=False, create_s3=False):
"""
Convenience function for creating locations.
Locations must exist before tasks can be created.
"""
smb_arn = None
s3_arn = None
if create_smb:
response = client.create_location_smb(
ServerHostname="host",
Subdirectory="somewhere",
User="",
Password="",
AgentArns=["stuff"],
)
smb_arn = response["LocationArn"]
if create_s3:
response = client.create_location_s3(
S3BucketArn="arn:aws:s3:::my_bucket",
Subdirectory="dir",
S3Config={"BucketAccessRoleArn": "role"},
)
s3_arn = response["LocationArn"]
return {"smb_arn": smb_arn, "s3_arn": s3_arn}
@mock_datasync
def test_create_location_smb():
client = boto3.client("datasync", region_name="us-east-1")
response = client.create_location_smb(ServerHostname='host',
Subdirectory='somewhere',
User='',
Password='',
AgentArns=['stuff'])
assert 'LocationArn' in response
response = client.create_location_smb(
ServerHostname="host",
Subdirectory="somewhere",
User="",
Password="",
AgentArns=["stuff"],
)
assert "LocationArn" in response
@mock_datasync
def test_describe_location_smb():
client = boto3.client("datasync", region_name="us-east-1")
agent_arns = ["stuff"]
user = "user"
response = client.create_location_smb(
ServerHostname="host",
Subdirectory="somewhere",
User=user,
Password="",
AgentArns=agent_arns,
)
response = client.describe_location_smb(LocationArn=response["LocationArn"])
assert "LocationArn" in response
assert "LocationUri" in response
assert response["User"] == user
assert response["AgentArns"] == agent_arns
@mock_datasync
def test_create_location_s3():
client = boto3.client("datasync", region_name="us-east-1")
response = client.create_location_s3(S3BucketArn='arn:aws:s3:::my_bucket',
Subdirectory='dir',
S3Config={'BucketAccessRoleArn':'role'})
assert 'LocationArn' in response
response = client.create_location_s3(
S3BucketArn="arn:aws:s3:::my_bucket",
Subdirectory="dir",
S3Config={"BucketAccessRoleArn": "role"},
)
assert "LocationArn" in response
@mock_datasync
def test_describe_location_s3():
client = boto3.client("datasync", region_name="us-east-1")
s3_config = {"BucketAccessRoleArn": "role"}
response = client.create_location_s3(
S3BucketArn="arn:aws:s3:::my_bucket", Subdirectory="dir", S3Config=s3_config
)
response = client.describe_location_s3(LocationArn=response["LocationArn"])
assert "LocationArn" in response
assert "LocationUri" in response
assert response["S3Config"] == s3_config
@mock_datasync
def test_describe_location_wrong():
client = boto3.client("datasync", region_name="us-east-1")
agent_arns = ["stuff"]
user = "user"
response = client.create_location_smb(
ServerHostname="host",
Subdirectory="somewhere",
User=user,
Password="",
AgentArns=agent_arns,
)
with assert_raises(ClientError) as e:
response = client.describe_location_s3(LocationArn=response["LocationArn"])
@mock_datasync
def test_list_locations():
client = boto3.client("datasync", region_name="us-east-1")
response = client.list_locations()
# TODO BJORN check if Locations exists when there are none
assert len(response['Locations']) == 0
assert len(response["Locations"]) == 0
response = client.create_location_smb(ServerHostname='host',
Subdirectory='somewhere',
User='',
Password='',
AgentArns=['stuff'])
create_locations(client, create_smb=True)
response = client.list_locations()
assert len(response['Locations']) == 1
assert response['Locations'][0]['LocationUri'] == 'smb://host/somewhere'
response = client.create_location_s3(S3BucketArn='arn:aws:s3:::my_bucket',
S3Config={'BucketAccessRoleArn':'role'})
assert len(response["Locations"]) == 1
assert response["Locations"][0]["LocationUri"] == "smb://host/somewhere"
create_locations(client, create_s3=True)
response = client.list_locations()
assert len(response['Locations']) == 2
assert response['Locations'][1]['LocationUri'] == 's3://my_bucket'
response = client.create_location_s3(S3BucketArn='arn:aws:s3:::my_bucket',
Subdirectory='subdir',
S3Config={'BucketAccessRoleArn':'role'})
assert len(response["Locations"]) == 2
assert response["Locations"][1]["LocationUri"] == "s3://my_bucket/dir"
create_locations(client, create_s3=True)
response = client.list_locations()
assert len(response['Locations']) == 3
assert response['Locations'][2]['LocationUri'] == 's3://my_bucket/subdir'
assert len(response["Locations"]) == 3
assert response["Locations"][2]["LocationUri"] == "s3://my_bucket/dir"
@mock_datasync
def test_create_task():
client = boto3.client("datasync", region_name="us-east-1")
# TODO BJORN check if task can be created when there are no locations
locations = create_locations(client, create_smb=True, create_s3=True)
response = client.create_task(
SourceLocationArn='1',
DestinationLocationArn='2'
SourceLocationArn=locations["smb_arn"],
DestinationLocationArn=locations["s3_arn"],
)
assert 'TaskArn' in response
assert "TaskArn" in response
@mock_datasync
def test_create_task_fail():
""" Test that Locations must exist before a Task can be created """
client = boto3.client("datasync", region_name="us-east-1")
locations = create_locations(client, create_smb=True, create_s3=True)
with assert_raises(ClientError) as e:
response = client.create_task(
SourceLocationArn="1", DestinationLocationArn=locations["s3_arn"]
)
with assert_raises(ClientError) as e:
response = client.create_task(
SourceLocationArn=locations["smb_arn"], DestinationLocationArn="2"
)
@mock_datasync
def test_list_tasks():
client = boto3.client("datasync", region_name="us-east-1")
locations = create_locations(client, create_s3=True, create_smb=True)
response = client.create_task(
SourceLocationArn='1',
DestinationLocationArn='2',
SourceLocationArn=locations["smb_arn"],
DestinationLocationArn=locations["s3_arn"],
)
response = client.create_task(
SourceLocationArn='3',
DestinationLocationArn='4',
Name='task_name'
SourceLocationArn=locations["s3_arn"],
DestinationLocationArn=locations["smb_arn"],
Name="task_name",
)
response = client.list_tasks()
tasks = response['Tasks']
tasks = response["Tasks"]
assert len(tasks) == 2
task = tasks[0]
assert task['Status'] == 'AVAILABLE'
assert 'Name' not in task
assert task["Status"] == "AVAILABLE"
assert "Name" not in task
task = tasks[1]
assert task['Status'] == 'AVAILABLE'
assert task['Name'] == 'task_name'
assert task["Status"] == "AVAILABLE"
assert task["Name"] == "task_name"
@mock_datasync
def test_describe_task():
client = boto3.client("datasync", region_name="us-east-1")
response = client.create_task(
SourceLocationArn='3',
DestinationLocationArn='4',
Name='task_name'
)
task_arn = response['TaskArn']
locations = create_locations(client, create_s3=True, create_smb=True)
response = client.describe_task(
TaskArn=task_arn
response = client.create_task(
SourceLocationArn=locations["smb_arn"],
DestinationLocationArn=locations["s3_arn"],
Name="task_name",
)
assert 'TaskArn' in response
assert 'Status' in response
assert 'SourceLocationArn' in response
assert 'DestinationLocationArn' in response
task_arn = response["TaskArn"]
response = client.describe_task(TaskArn=task_arn)
assert "TaskArn" in response
assert "Status" in response
assert "SourceLocationArn" in response
assert "DestinationLocationArn" in response
@mock_datasync
def test_describe_task_not_exist():
client = boto3.client("datasync", region_name="us-east-1")
with assert_raises(ClientError) as e:
client.describe_task(TaskArn="abc")
@mock_datasync
def test_start_task_execution():
client = boto3.client("datasync", region_name="us-east-1")
locations = create_locations(client, create_s3=True, create_smb=True)
response = client.create_task(
SourceLocationArn='3',
DestinationLocationArn='4',
Name='task_name'
)
task_arn = response['TaskArn']
response = client.start_task_execution(
TaskArn=task_arn
SourceLocationArn=locations["smb_arn"],
DestinationLocationArn=locations["s3_arn"],
Name="task_name",
)
assert 'TaskExecutionArn' in response
task_arn = response["TaskArn"]
response = client.describe_task(TaskArn=task_arn)
assert "CurrentTaskExecutionArn" not in response
response = client.start_task_execution(TaskArn=task_arn)
assert "TaskExecutionArn" in response
task_execution_arn = response["TaskExecutionArn"]
response = client.describe_task(TaskArn=task_arn)
assert response["CurrentTaskExecutionArn"] == task_execution_arn
@mock_datasync
def test_start_task_execution_twice():
client = boto3.client("datasync", region_name="us-east-1")
locations = create_locations(client, create_s3=True, create_smb=True)
response = client.create_task(
SourceLocationArn=locations["smb_arn"],
DestinationLocationArn=locations["s3_arn"],
Name="task_name",
)
task_arn = response["TaskArn"]
response = client.start_task_execution(TaskArn=task_arn)
assert "TaskExecutionArn" in response
task_execution_arn = response["TaskExecutionArn"]
with assert_raises(ClientError) as e:
response = client.start_task_execution(TaskArn=task_arn)
@mock_datasync
def test_describe_task_execution():
client = boto3.client("datasync", region_name="us-east-1")
locations = create_locations(client, create_s3=True, create_smb=True)
response = client.create_task(
SourceLocationArn=locations["smb_arn"],
DestinationLocationArn=locations["s3_arn"],
Name="task_name",
)
task_arn = response["TaskArn"]
response = client.start_task_execution(TaskArn=task_arn)
task_execution_arn = response["TaskExecutionArn"]
# Each time task_execution is described the Status will increment
# This is a simple way to simulate a task being executed
response = client.describe_task_execution(TaskExecutionArn=task_execution_arn)
assert response["TaskExecutionArn"] == task_execution_arn
assert response["Status"] == "INITIALIZING"
response = client.describe_task_execution(TaskExecutionArn=task_execution_arn)
assert response["TaskExecutionArn"] == task_execution_arn
assert response["Status"] == "PREPARING"
response = client.describe_task_execution(TaskExecutionArn=task_execution_arn)
assert response["TaskExecutionArn"] == task_execution_arn
assert response["Status"] == "TRANSFERRING"
response = client.describe_task_execution(TaskExecutionArn=task_execution_arn)
assert response["TaskExecutionArn"] == task_execution_arn
assert response["Status"] == "VERIFYING"
response = client.describe_task_execution(TaskExecutionArn=task_execution_arn)
assert response["TaskExecutionArn"] == task_execution_arn
assert response["Status"] == "SUCCESS"
response = client.describe_task_execution(TaskExecutionArn=task_execution_arn)
assert response["TaskExecutionArn"] == task_execution_arn
assert response["Status"] == "SUCCESS"
@mock_datasync
def test_describe_task_execution_not_exist():
client = boto3.client("datasync", region_name="us-east-1")
with assert_raises(ClientError) as e:
client.describe_task_execution(TaskExecutionArn="abc")
@mock_datasync
def test_cancel_task_execution():
client = boto3.client("datasync", region_name="us-east-1")
locations = create_locations(client, create_s3=True, create_smb=True)
response = client.create_task(
SourceLocationArn=locations["smb_arn"],
DestinationLocationArn=locations["s3_arn"],
Name="task_name",
)
task_arn = response["TaskArn"]
response = client.start_task_execution(TaskArn=task_arn)
task_execution_arn = response["TaskExecutionArn"]
response = client.describe_task(TaskArn=task_arn)
assert response["CurrentTaskExecutionArn"] == task_execution_arn
response = client.cancel_task_execution(TaskExecutionArn=task_execution_arn)
response = client.describe_task(TaskArn=task_arn)
assert "CurrentTaskExecutionArn" not in response
response = client.describe_task_execution(TaskExecutionArn=task_execution_arn)
assert response["Status"] == "ERROR"

View file

@ -1,27 +1,26 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
import base64
import json
import os
import time
import uuid
import boto
import boto3
import botocore.exceptions
import six
from botocore.exceptions import ClientError
from boto.exception import SQSError
from boto.sqs.message import RawMessage, Message
from freezegun import freeze_time
import base64
import json
import sure # noqa
import time
import uuid
from moto import settings, mock_sqs, mock_sqs_deprecated
from tests.helpers import requires_boto_gte
import tests.backport_assert_raises # noqa
from nose.tools import assert_raises
from boto.exception import SQSError
from boto.sqs.message import Message, RawMessage
from botocore.exceptions import ClientError
from freezegun import freeze_time
from moto import mock_sqs, mock_sqs_deprecated, settings
from nose import SkipTest
from nose.tools import assert_raises
from tests.helpers import requires_boto_gte
@mock_sqs
@ -33,7 +32,7 @@ def test_create_fifo_queue_fail():
except botocore.exceptions.ClientError as err:
err.response["Error"]["Code"].should.equal("InvalidParameterValue")
else:
raise RuntimeError("Should of raised InvalidParameterValue Exception")z
raise RuntimeError("Should of raised InvalidParameterValue Exception")
@mock_sqs