First version of datapipelines.

This commit is contained in:
Steve Pulec 2015-09-16 10:00:38 -04:00
commit 95169c6011
9 changed files with 371 additions and 0 deletions

View file

@ -8,6 +8,7 @@ __version__ = '0.4.12'
from .autoscaling import mock_autoscaling # flake8: noqa
from .cloudformation import mock_cloudformation # flake8: noqa
from .cloudwatch import mock_cloudwatch # flake8: noqa
from .datapipeline import mock_datapipeline # flake8: noqa
from .dynamodb import mock_dynamodb # flake8: noqa
from .dynamodb2 import mock_dynamodb2 # flake8: noqa
from .ec2 import mock_ec2 # flake8: noqa

View file

@ -2,6 +2,7 @@ from __future__ import unicode_literals
from moto.autoscaling import autoscaling_backend
from moto.cloudwatch import cloudwatch_backend
from moto.cloudformation import cloudformation_backend
from moto.datapipeline import datapipeline_backend
from moto.dynamodb import dynamodb_backend
from moto.dynamodb2 import dynamodb_backend2
from moto.ec2 import ec2_backend
@ -25,6 +26,7 @@ BACKENDS = {
'autoscaling': autoscaling_backend,
'cloudformation': cloudformation_backend,
'cloudwatch': cloudwatch_backend,
'datapipeline': datapipeline_backend,
'dynamodb': dynamodb_backend,
'dynamodb2': dynamodb_backend2,
'ec2': ec2_backend,

View file

@ -0,0 +1,12 @@
from __future__ import unicode_literals
from .models import datapipeline_backends
from ..core.models import MockAWS
datapipeline_backend = datapipeline_backends['us-east-1']
def mock_datapipeline(func=None):
if func:
return MockAWS(datapipeline_backends)(func)
else:
return MockAWS(datapipeline_backends)

122
moto/datapipeline/models.py Normal file
View file

@ -0,0 +1,122 @@
from __future__ import unicode_literals
import datetime
import boto.datapipeline
from moto.core import BaseBackend
from .utils import get_random_pipeline_id
class PipelineObject(object):
def __init__(self, object_id, name, fields):
self.object_id = object_id
self.name = name
self.fields = fields
def to_json(self):
return {
"Fields": self.fields,
"Id": self.object_id,
"Name": self.name,
}
class Pipeline(object):
def __init__(self, name, unique_id):
self.name = name
self.unique_id = unique_id
self.description = ""
self.pipeline_id = get_random_pipeline_id()
self.creation_time = datetime.datetime.utcnow()
self.objects = []
def to_json(self):
return {
"Description": self.description,
"Fields": [{
"key": "@pipelineState",
"stringValue": "SCHEDULED"
}, {
"key": "description",
"stringValue": self.description
}, {
"key": "name",
"stringValue": self.name
}, {
"key": "@creationTime",
"stringValue": datetime.datetime.strftime(self.creation_time, '%Y-%m-%dT%H-%M-%S'),
}, {
"key": "@id",
"stringValue": self.pipeline_id,
}, {
"key": "@sphere",
"stringValue": "PIPELINE"
}, {
"key": "@version",
"stringValue": "1"
}, {
"key": "@userId",
"stringValue": "924374875933"
}, {
"key": "@accountId",
"stringValue": "924374875933"
}, {
"key": "uniqueId",
"stringValue": self.unique_id
}],
"Name": self.name,
"PipelineId": self.pipeline_id,
"Tags": [
]
}
def set_pipeline_objects(self, pipeline_objects):
self.objects = [
PipelineObject(pipeline_object['id'], pipeline_object['name'], pipeline_object['fields'])
for pipeline_object in pipeline_objects
]
def activate(self):
pass
class DataPipelineBackend(BaseBackend):
def __init__(self):
self.pipelines = {}
def create_pipeline(self, name, unique_id):
pipeline = Pipeline(name, unique_id)
self.pipelines[pipeline.pipeline_id] = pipeline
return pipeline
def describe_pipelines(self, pipeline_ids):
pipelines = [pipeline for pipeline in self.pipelines.values() if pipeline.pipeline_id in pipeline_ids]
return pipelines
def get_pipeline(self, pipeline_id):
return self.pipelines[pipeline_id]
def put_pipeline_definition(self, pipeline_id, pipeline_objects):
pipeline = self.get_pipeline(pipeline_id)
pipeline.set_pipeline_objects(pipeline_objects)
def get_pipeline_definition(self, pipeline_id):
pipeline = self.get_pipeline(pipeline_id)
return pipeline.objects
def describe_objects(self, object_ids, pipeline_id):
pipeline = self.get_pipeline(pipeline_id)
pipeline_objects = [
pipeline_object for pipeline_object in pipeline.objects
if pipeline_object.object_id in object_ids
]
return pipeline_objects
def activate_pipeline(self, pipeline_id):
pipeline = self.get_pipeline(pipeline_id)
pipeline.activate()
datapipeline_backends = {}
for region in boto.datapipeline.regions():
datapipeline_backends[region.name] = DataPipelineBackend()

View file

@ -0,0 +1,68 @@
from __future__ import unicode_literals
import json
from moto.core.responses import BaseResponse
from .models import datapipeline_backends
class DataPipelineResponse(BaseResponse):
@property
def parameters(self):
return json.loads(self.body.decode("utf-8"))
@property
def datapipeline_backend(self):
return datapipeline_backends[self.region]
def create_pipeline(self):
name = self.parameters['name']
unique_id = self.parameters['uniqueId']
pipeline = self.datapipeline_backend.create_pipeline(name, unique_id)
return json.dumps({
"pipelineId": pipeline.pipeline_id,
})
def describe_pipelines(self):
pipeline_ids = self.parameters["pipelineIds"]
pipelines = self.datapipeline_backend.describe_pipelines(pipeline_ids)
return json.dumps({
"PipelineDescriptionList": [
pipeline.to_json() for pipeline in pipelines
]
})
def put_pipeline_definition(self):
pipeline_id = self.parameters["pipelineId"]
pipeline_objects = self.parameters["pipelineObjects"]
self.datapipeline_backend.put_pipeline_definition(pipeline_id, pipeline_objects)
return json.dumps({"errored": False})
def get_pipeline_definition(self):
pipeline_id = self.parameters["pipelineId"]
pipeline_definition = self.datapipeline_backend.get_pipeline_definition(pipeline_id)
return json.dumps({
"pipelineObjects": [pipeline_object.to_json() for pipeline_object in pipeline_definition]
})
def describe_objects(self):
pipeline_id = self.parameters["pipelineId"]
object_ids = self.parameters["objectIds"]
pipeline_objects = self.datapipeline_backend.describe_objects(object_ids, pipeline_id)
return json.dumps({
"HasMoreResults": False,
"Marker": None,
"PipelineObjects": [
pipeline_object.to_json() for pipeline_object in pipeline_objects
]
})
def activate_pipeline(self):
pipeline_id = self.parameters["pipelineId"]
self.datapipeline_backend.activate_pipeline(pipeline_id)
return json.dumps({})

10
moto/datapipeline/urls.py Normal file
View file

@ -0,0 +1,10 @@
from __future__ import unicode_literals
from .responses import DataPipelineResponse
url_bases = [
"https?://datapipeline.(.+).amazonaws.com",
]
url_paths = {
'{0}/$': DataPipelineResponse.dispatch,
}

View file

@ -0,0 +1,5 @@
from moto.core.utils import get_random_hex
def get_random_pipeline_id():
return "df-{0}".format(get_random_hex(length=19))