This commit is contained in:
root 2015-12-03 11:57:34 +00:00
commit 70171e8ef9
59 changed files with 4722 additions and 56 deletions

View file

@ -3,7 +3,7 @@ import logging
logging.getLogger('boto').setLevel(logging.CRITICAL)
__title__ = 'moto'
__version__ = '0.4.18'
__version__ = '0.4.19'
from .autoscaling import mock_autoscaling # flake8: noqa
from .cloudformation import mock_cloudformation # flake8: noqa
@ -28,3 +28,4 @@ from .sns import mock_sns # flake8: noqa
from .sqs import mock_sqs # flake8: noqa
from .sts import mock_sts # flake8: noqa
from .route53 import mock_route53 # flake8: noqa
from .swf import mock_swf # flake8: noqa

View file

@ -131,7 +131,7 @@ class BaseResponse(_TemplateEnvironmentMixin):
else:
self.region = self.default_region
self.headers = dict(request.headers)
self.headers = request.headers
self.response_headers = headers
return self.call_action()
@ -142,7 +142,7 @@ class BaseResponse(_TemplateEnvironmentMixin):
# Headers are case-insensitive. Probably a better way to do this.
match = self.headers.get('x-amz-target') or self.headers.get('X-Amz-Target')
if match:
action = match.split(".")[1]
action = match.split(".")[-1]
action = camelcase_to_underscores(action)
method_names = method_names_from_class(self.__class__)

View file

@ -1,4 +1,6 @@
from __future__ import unicode_literals
import datetime
import inspect
import random
import re
@ -23,6 +25,22 @@ def camelcase_to_underscores(argument):
return result
def underscores_to_camelcase(argument):
''' Converts a camelcase param like the_new_attribute to the equivalent
camelcase version like theNewAttribute. Note that the first letter is
NOT capitalized by this function '''
result = ''
previous_was_underscore = False
for char in argument:
if char != '_':
if previous_was_underscore:
result += char.upper()
else:
result += char
previous_was_underscore = char == '_'
return result
def method_names_from_class(clazz):
# On Python 2, methods are different from functions, and the `inspect`
# predicates distinguish between them. On Python 3, methods are just
@ -87,3 +105,14 @@ def iso_8601_datetime_with_milliseconds(datetime):
def rfc_1123_datetime(datetime):
RFC1123 = '%a, %d %b %Y %H:%M:%S GMT'
return datetime.strftime(RFC1123)
def unix_time(dt=None):
dt = dt or datetime.datetime.utcnow()
epoch = datetime.datetime.utcfromtimestamp(0)
delta = dt - epoch
return (delta.days * 86400) + (delta.seconds + (delta.microseconds / 1e6))
def unix_time_millis(dt=None):
return unix_time(dt) * 1000.0

View file

@ -5,8 +5,8 @@ import json
from moto.compat import OrderedDict
from moto.core import BaseBackend
from moto.core.utils import unix_time
from .comparisons import get_comparison_func
from .utils import unix_time
class DynamoJsonEncoder(json.JSONEncoder):

View file

@ -1,6 +0,0 @@
from __future__ import unicode_literals
import calendar
def unix_time(dt):
return calendar.timegm(dt.timetuple())

View file

@ -5,8 +5,8 @@ import json
from moto.compat import OrderedDict
from moto.core import BaseBackend
from moto.core.utils import unix_time
from .comparisons import get_comparison_func
from .utils import unix_time
class DynamoJsonEncoder(json.JSONEncoder):
@ -82,7 +82,7 @@ class Item(object):
attributes = {}
for attribute_key, attribute in self.attrs.items():
attributes[attribute_key] = {
attribute.type : attribute.value
attribute.type: attribute.value
}
return {
@ -204,7 +204,7 @@ class Table(object):
keys.append(key['AttributeName'])
return keys
def put_item(self, item_attrs, expected = None, overwrite = False):
def put_item(self, item_attrs, expected=None, overwrite=False):
hash_value = DynamoType(item_attrs.get(self.hash_key_attr))
if self.has_range_key:
range_value = DynamoType(item_attrs.get(self.range_key_attr))
@ -228,13 +228,13 @@ class Table(object):
if current is None:
current_attr = {}
elif hasattr(current,'attrs'):
elif hasattr(current, 'attrs'):
current_attr = current.attrs
else:
current_attr = current
for key, val in expected.items():
if 'Exists' in val and val['Exists'] == False:
if 'Exists' in val and val['Exists'] is False:
if key in current_attr:
raise ValueError("The conditional request failed")
elif key not in current_attr:
@ -361,7 +361,7 @@ class DynamoDBBackend(BaseBackend):
table.throughput = throughput
return table
def put_item(self, table_name, item_attrs, expected = None, overwrite = False):
def put_item(self, table_name, item_attrs, expected=None, overwrite=False):
table = self.tables.get(table_name)
if not table:
return None

View file

@ -1,6 +0,0 @@
from __future__ import unicode_literals
import calendar
def unix_time(dt):
return calendar.timegm(dt.timetuple())

View file

@ -319,12 +319,6 @@ class Instance(BotoInstance, TaggedEC2Resource):
# If we are in EC2-Classic, autoassign a public IP
associate_public_ip = True
self.block_device_mapping = BlockDeviceMapping()
# Default have an instance with root volume should you not wish to override with attach volume cmd.
# However this is a ghost volume and wont show up in get_all_volumes or snapshot-able.
self.block_device_mapping['/dev/sda1'] = BlockDeviceType(volume_id=random_volume_id(), status='attached',
attach_time=utc_date_and_time())
amis = self.ec2_backend.describe_images(filters={'image-id': image_id})
ami = amis[0] if amis else None
@ -345,11 +339,23 @@ class Instance(BotoInstance, TaggedEC2Resource):
subnet = ec2_backend.get_subnet(self.subnet_id)
self.vpc_id = subnet.vpc_id
self.block_device_mapping = BlockDeviceMapping()
self.prep_nics(kwargs.get("nics", {}),
subnet_id=self.subnet_id,
private_ip=kwargs.get("private_ip"),
associate_public_ip=associate_public_ip)
def setup_defaults(self):
# Default have an instance with root volume should you not wish to override with attach volume cmd.
volume = self.ec2_backend.create_volume(8, 'us-east-1a')
self.ec2_backend.attach_volume(volume.id, self.id, '/dev/sda1')
def teardown_defaults(self):
volume_id = self.block_device_mapping['/dev/sda1'].volume_id
self.ec2_backend.detach_volume(volume_id, self.id, '/dev/sda1')
self.ec2_backend.delete_volume(volume_id)
@property
def get_block_device_mapping(self):
return self.block_device_mapping.items()
@ -420,6 +426,8 @@ class Instance(BotoInstance, TaggedEC2Resource):
for nic in self.nics.values():
nic.stop()
self.teardown_defaults()
self._state.name = "terminated"
self._state.code = 48
@ -546,6 +554,7 @@ class InstanceBackend(object):
for name in security_group_names]
security_groups.extend(self.get_security_group_from_id(sg_id)
for sg_id in kwargs.pop("security_group_ids", []))
self.reservations[new_reservation.id] = new_reservation
for index in range(count):
new_instance = Instance(
self,
@ -555,7 +564,7 @@ class InstanceBackend(object):
**kwargs
)
new_reservation.instances.append(new_instance)
self.reservations[new_reservation.id] = new_reservation
new_instance.setup_defaults()
return new_reservation
def start_instances(self, instance_ids):
@ -2597,6 +2606,7 @@ class NetworkAclAssociation(object):
subnet_id, network_acl_id):
self.ec2_backend = ec2_backend
self.id = new_association_id
self.new_association_id = new_association_id
self.subnet_id = subnet_id
self.network_acl_id = network_acl_id
super(NetworkAclAssociation, self).__init__()
@ -2609,10 +2619,12 @@ class NetworkAcl(TaggedEC2Resource):
self.vpc_id = vpc_id
self.network_acl_entries = []
self.associations = {}
self.default = default
self.default = 'true' if default is True else 'false'
def get_filter_value(self, filter_name):
if filter_name == "vpc-id":
if filter_name == "default":
return self.default
elif filter_name == "vpc-id":
return self.vpc_id
elif filter_name == "association.network-acl-id":
return self.id

View file

@ -96,7 +96,7 @@ DESCRIBE_NETWORK_ACL_RESPONSE = """
<item>
<networkAclId>{{ network_acl.id }}</networkAclId>
<vpcId>{{ network_acl.vpc_id }}</vpcId>
<default>true</default>
<default>{{ network_acl.default }}</default>
<entrySet>
{% for entry in network_acl.network_acl_entries %}
<item>

View file

@ -1,4 +1,5 @@
from __future__ import unicode_literals
import json
from moto.core.responses import BaseResponse
from .models import emr_backends
@ -11,6 +12,10 @@ class ElasticMapReduceResponse(BaseResponse):
def backend(self):
return emr_backends[self.region]
@property
def boto3_request(self):
return 'json' in self.headers.get('Content-Type', [])
def add_job_flow_steps(self):
job_flow_id = self._get_param('JobFlowId')
steps = self._get_list_prefix('Steps.member')
@ -35,6 +40,11 @@ class ElasticMapReduceResponse(BaseResponse):
if instance_groups:
self.backend.add_instance_groups(job_flow.id, instance_groups)
if self.boto3_request:
return json.dumps({
"JobFlowId": job_flow.id
})
template = self.response_template(RUN_JOB_FLOW_TEMPLATE)
return template.render(job_flow=job_flow)
@ -79,6 +89,24 @@ class ElasticMapReduceResponse(BaseResponse):
def list_clusters(self):
clusters = self.backend.list_clusters()
if self.boto3_request:
return json.dumps({
"Clusters": [
{
"Id": cluster.id,
"Name": cluster.name,
"Status": {
"State": cluster.state,
"StatusChangeReason": {},
"TimeLine": {},
},
"NormalizedInstanceHours": cluster.normalized_instance_hours,
} for cluster in clusters
],
"Marker": ""
})
template = self.response_template(LIST_CLUSTERS_TEMPLATE)
return template.render(clusters=clusters)

View file

@ -15,6 +15,7 @@ class Key(object):
self.enabled = True
self.region = region
self.account_id = "0123456789012"
self.key_rotation_status = False
@property
def arn(self):
@ -68,6 +69,22 @@ class KmsBackend(BaseBackend):
def get_all_aliases(self):
return self.key_to_aliases
def enable_key_rotation(self, key_id):
self.keys[key_id].key_rotation_status = True
def disable_key_rotation(self, key_id):
self.keys[key_id].key_rotation_status = False
def get_key_rotation_status(self, key_id):
return self.keys[key_id].key_rotation_status
def put_key_policy(self, key_id, policy):
self.keys[key_id].policy = policy
def get_key_policy(self, key_id):
return self.keys[key_id].policy
kms_backends = {}
for region in boto.kms.regions():
kms_backends[region.name] = KmsBackend()

View file

@ -39,8 +39,9 @@ class KmsResponse(BaseResponse):
try:
key = self.kms_backend.describe_key(key_id)
except KeyError:
self.headers['status'] = 404
return "{}", self.headers
headers = dict(self.headers)
headers['status'] = 404
return "{}", headers
return json.dumps(key.to_dict())
def list_keys(self):
@ -136,3 +137,88 @@ class KmsResponse(BaseResponse):
'Truncated': False,
'Aliases': response_aliases,
})
def enable_key_rotation(self):
key_id = self.parameters.get('KeyId')
_assert_valid_key_id(key_id)
try:
self.kms_backend.enable_key_rotation(key_id)
except KeyError:
raise JSONResponseError(404, 'Not Found', body={
'message': "Key 'arn:aws:kms:{region}:012345678912:key/{key_id}' does not exist".format(region=self.region,key_id=key_id),
'__type': 'NotFoundException'})
return json.dumps(None)
def disable_key_rotation(self):
key_id = self.parameters.get('KeyId')
_assert_valid_key_id(key_id)
try:
self.kms_backend.disable_key_rotation(key_id)
except KeyError:
raise JSONResponseError(404, 'Not Found', body={
'message': "Key 'arn:aws:kms:{region}:012345678912:key/{key_id}' does not exist".format(region=self.region,key_id=key_id),
'__type': 'NotFoundException'})
return json.dumps(None)
def get_key_rotation_status(self):
key_id = self.parameters.get('KeyId')
_assert_valid_key_id(key_id)
try:
rotation_enabled = self.kms_backend.get_key_rotation_status(key_id)
except KeyError:
raise JSONResponseError(404, 'Not Found', body={
'message': "Key 'arn:aws:kms:{region}:012345678912:key/{key_id}' does not exist".format(region=self.region,key_id=key_id),
'__type': 'NotFoundException'})
return json.dumps({'KeyRotationEnabled': rotation_enabled})
def put_key_policy(self):
key_id = self.parameters.get('KeyId')
policy_name = self.parameters.get('PolicyName')
policy = self.parameters.get('Policy')
_assert_valid_key_id(key_id)
_assert_default_policy(policy_name)
try:
self.kms_backend.put_key_policy(key_id, policy)
except KeyError:
raise JSONResponseError(404, 'Not Found', body={
'message': "Key 'arn:aws:kms:{region}:012345678912:key/{key_id}' does not exist".format(region=self.region,key_id=key_id),
'__type': 'NotFoundException'})
return json.dumps(None)
def get_key_policy(self):
key_id = self.parameters.get('KeyId')
policy_name = self.parameters.get('PolicyName')
_assert_valid_key_id(key_id)
_assert_default_policy(policy_name)
try:
return json.dumps({'Policy': self.kms_backend.get_key_policy(key_id)})
except KeyError:
raise JSONResponseError(404, 'Not Found', body={
'message': "Key 'arn:aws:kms:{region}:012345678912:key/{key_id}' does not exist".format(region=self.region,key_id=key_id),
'__type': 'NotFoundException'})
def list_key_policies(self):
key_id = self.parameters.get('KeyId')
_assert_valid_key_id(key_id)
try:
self.kms_backend.describe_key(key_id)
except KeyError:
raise JSONResponseError(404, 'Not Found', body={
'message': "Key 'arn:aws:kms:{region}:012345678912:key/{key_id}' does not exist".format(region=self.region,key_id=key_id),
'__type': 'NotFoundException'})
return json.dumps({'Truncated': False, 'PolicyNames': ['default']})
def _assert_valid_key_id(key_id):
if not re.match(r'^[A-F0-9]{8}-[A-F0-9]{4}-[A-F0-9]{4}-[A-F0-9]{4}-[A-F0-9]{12}$', key_id, re.IGNORECASE):
raise JSONResponseError(404, 'Not Found', body={'message': ' Invalid keyId', '__type': 'NotFoundException'})
def _assert_default_policy(policy_name):
if policy_name != 'default':
raise JSONResponseError(404, 'Not Found', body={
'message': "No such policy exists",
'__type': 'NotFoundException'})

View file

@ -8,8 +8,8 @@ from xml.sax.saxutils import escape
import boto.sqs
from moto.core import BaseBackend
from moto.core.utils import camelcase_to_underscores, get_random_message_id
from .utils import generate_receipt_handle, unix_time_millis
from moto.core.utils import camelcase_to_underscores, get_random_message_id, unix_time_millis
from .utils import generate_receipt_handle
from .exceptions import (
ReceiptHandleIsInvalid,
MessageNotInflight

View file

@ -1,5 +1,4 @@
from __future__ import unicode_literals
import datetime
import random
import string
@ -12,17 +11,6 @@ def generate_receipt_handle():
return ''.join(random.choice(string.ascii_lowercase) for x in range(length))
def unix_time(dt=None):
dt = dt or datetime.datetime.utcnow()
epoch = datetime.datetime.utcfromtimestamp(0)
delta = dt - epoch
return (delta.days * 86400) + (delta.seconds + (delta.microseconds / 1e6))
def unix_time_millis(dt=None):
return unix_time(dt) * 1000.0
def parse_message_attributes(querystring, base='', value_namespace='Value.'):
message_attributes = {}
index = 1

12
moto/swf/__init__.py Normal file
View file

@ -0,0 +1,12 @@
from __future__ import unicode_literals
from .models import swf_backends
from ..core.models import MockAWS
swf_backend = swf_backends['us-east-1']
def mock_swf(func=None):
if func:
return MockAWS(swf_backends)(func)
else:
return MockAWS(swf_backends)

85
moto/swf/constants.py Normal file
View file

@ -0,0 +1,85 @@
# List decision fields and if they're required or not
#
# See http://docs.aws.amazon.com/amazonswf/latest/apireference/API_RespondDecisionTaskCompleted.html
# and subsequent docs for each decision type.
DECISIONS_FIELDS = {
"cancelTimerDecisionAttributes": {
"timerId": { "type": "string", "required": True }
},
"cancelWorkflowExecutionDecisionAttributes": {
"details": { "type": "string", "required": False }
},
"completeWorkflowExecutionDecisionAttributes": {
"result": { "type": "string", "required": False }
},
"continueAsNewWorkflowExecutionDecisionAttributes": {
"childPolicy": { "type": "string", "required": False },
"executionStartToCloseTimeout": { "type": "string", "required": False },
"input": { "type": "string", "required": False },
"lambdaRole": { "type": "string", "required": False },
"tagList": { "type": "string", "array": True, "required": False },
"taskList": { "type": "TaskList", "required": False },
"taskPriority": { "type": "string", "required": False },
"taskStartToCloseTimeout": { "type": "string", "required": False },
"workflowTypeVersion": { "type": "string", "required": False }
},
"failWorkflowExecutionDecisionAttributes": {
"details": { "type": "string", "required": False },
"reason": { "type": "string", "required": False }
},
"recordMarkerDecisionAttributes": {
"details": { "type": "string", "required": False },
"markerName": { "type": "string", "required": True }
},
"requestCancelActivityTaskDecisionAttributes": {
"activityId": { "type": "string", "required": True }
},
"requestCancelExternalWorkflowExecutionDecisionAttributes": {
"control": { "type": "string", "required": False },
"runId": { "type": "string", "required": False },
"workflowId": { "type": "string", "required": True }
},
"scheduleActivityTaskDecisionAttributes": {
"activityId": { "type": "string", "required": True },
"activityType": { "type": "ActivityType", "required": True },
"control": { "type": "string", "required": False },
"heartbeatTimeout": { "type": "string", "required": False },
"input": { "type": "string", "required": False },
"scheduleToCloseTimeout": { "type": "string", "required": False },
"scheduleToStartTimeout": { "type": "string", "required": False },
"startToCloseTimeout": { "type": "string", "required": False },
"taskList": { "type": "TaskList", "required": False },
"taskPriority": { "type": "string", "required": False }
},
"scheduleLambdaFunctionDecisionAttributes": {
"id": { "type": "string", "required": True },
"input": { "type": "string", "required": False },
"name": { "type": "string", "required": True },
"startToCloseTimeout": { "type": "string", "required": False }
},
"signalExternalWorkflowExecutionDecisionAttributes": {
"control": { "type": "string", "required": False },
"input": { "type": "string", "required": False },
"runId": { "type": "string", "required": False },
"signalName": { "type": "string", "required": True },
"workflowId": { "type": "string", "required": True }
},
"startChildWorkflowExecutionDecisionAttributes": {
"childPolicy": { "type": "string", "required": False },
"control": { "type": "string", "required": False },
"executionStartToCloseTimeout": { "type": "string", "required": False },
"input": { "type": "string", "required": False },
"lambdaRole": { "type": "string", "required": False },
"tagList": { "type": "string", "array": True, "required": False },
"taskList": { "type": "TaskList", "required": False },
"taskPriority": { "type": "string", "required": False },
"taskStartToCloseTimeout": { "type": "string", "required": False },
"workflowId": { "type": "string", "required": True },
"workflowType": { "type": "WorkflowType", "required": True }
},
"startTimerDecisionAttributes": {
"control": { "type": "string", "required": False },
"startToFireTimeout": { "type": "string", "required": True },
"timerId": { "type": "string", "required": True }
}
}

126
moto/swf/exceptions.py Normal file
View file

@ -0,0 +1,126 @@
from __future__ import unicode_literals
from boto.exception import JSONResponseError
class SWFClientError(JSONResponseError):
def __init__(self, message, __type):
super(SWFClientError, self).__init__(
400, "Bad Request",
body={"message": message, "__type": __type}
)
class SWFUnknownResourceFault(SWFClientError):
def __init__(self, resource_type, resource_name=None):
if resource_name:
message = "Unknown {0}: {1}".format(resource_type, resource_name)
else:
message = "Unknown {0}".format(resource_type)
super(SWFUnknownResourceFault, self).__init__(
message,
"com.amazonaws.swf.base.model#UnknownResourceFault")
class SWFDomainAlreadyExistsFault(SWFClientError):
def __init__(self, domain_name):
super(SWFDomainAlreadyExistsFault, self).__init__(
domain_name,
"com.amazonaws.swf.base.model#DomainAlreadyExistsFault")
class SWFDomainDeprecatedFault(SWFClientError):
def __init__(self, domain_name):
super(SWFDomainDeprecatedFault, self).__init__(
domain_name,
"com.amazonaws.swf.base.model#DomainDeprecatedFault")
class SWFSerializationException(JSONResponseError):
def __init__(self, value):
message = "class java.lang.Foo can not be converted to an String "
message += " (not a real SWF exception ; happened on: {0})".format(value)
__type = "com.amazonaws.swf.base.model#SerializationException"
super(SWFSerializationException, self).__init__(
400, "Bad Request",
body={"Message": message, "__type": __type}
)
class SWFTypeAlreadyExistsFault(SWFClientError):
def __init__(self, _type):
super(SWFTypeAlreadyExistsFault, self).__init__(
"{0}=[name={1}, version={2}]".format(_type.__class__.__name__, _type.name, _type.version),
"com.amazonaws.swf.base.model#TypeAlreadyExistsFault")
class SWFTypeDeprecatedFault(SWFClientError):
def __init__(self, _type):
super(SWFTypeDeprecatedFault, self).__init__(
"{0}=[name={1}, version={2}]".format(_type.__class__.__name__, _type.name, _type.version),
"com.amazonaws.swf.base.model#TypeDeprecatedFault")
class SWFWorkflowExecutionAlreadyStartedFault(JSONResponseError):
def __init__(self):
super(SWFWorkflowExecutionAlreadyStartedFault, self).__init__(
400, "Bad Request",
body={"__type": "com.amazonaws.swf.base.model#WorkflowExecutionAlreadyStartedFault"}
)
class SWFDefaultUndefinedFault(SWFClientError):
def __init__(self, key):
# TODO: move that into moto.core.utils maybe?
words = key.split("_")
key_camel_case = words.pop(0)
for word in words:
key_camel_case += word.capitalize()
super(SWFDefaultUndefinedFault, self).__init__(
key_camel_case, "com.amazonaws.swf.base.model#DefaultUndefinedFault"
)
class SWFValidationException(SWFClientError):
def __init__(self, message):
super(SWFValidationException, self).__init__(
message,
"com.amazon.coral.validate#ValidationException"
)
class SWFDecisionValidationException(SWFClientError):
def __init__(self, problems):
# messages
messages = []
for pb in problems:
if pb["type"] == "null_value":
messages.append(
"Value null at '%(where)s' failed to satisfy constraint: "\
"Member must not be null" % pb
)
elif pb["type"] == "bad_decision_type":
messages.append(
"Value '%(value)s' at '%(where)s' failed to satisfy constraint: " \
"Member must satisfy enum value set: " \
"[%(possible_values)s]" % pb
)
else:
raise ValueError(
"Unhandled decision constraint type: {0}".format(pb["type"])
)
# prefix
count = len(problems)
if count < 2:
prefix = "{0} validation error detected: "
else:
prefix = "{0} validation errors detected: "
super(SWFDecisionValidationException, self).__init__(
prefix.format(count) + "; ".join(messages),
"com.amazon.coral.validate#ValidationException"
)
class SWFWorkflowExecutionClosedError(Exception):
def __str__(self):
return repr("Cannot change this object because the WorkflowExecution is closed")

337
moto/swf/models/__init__.py Normal file
View file

@ -0,0 +1,337 @@
from __future__ import unicode_literals
import six
import boto.swf
from moto.core import BaseBackend
from ..exceptions import (
SWFUnknownResourceFault,
SWFDomainAlreadyExistsFault,
SWFDomainDeprecatedFault,
SWFTypeAlreadyExistsFault,
SWFTypeDeprecatedFault,
SWFValidationException,
)
from .activity_task import ActivityTask
from .activity_type import ActivityType
from .decision_task import DecisionTask
from .domain import Domain
from .generic_type import GenericType
from .history_event import HistoryEvent
from .timeout import Timeout
from .workflow_type import WorkflowType
from .workflow_execution import WorkflowExecution
KNOWN_SWF_TYPES = {
"activity": ActivityType,
"workflow": WorkflowType,
}
class SWFBackend(BaseBackend):
def __init__(self, region_name):
self.region_name = region_name
self.domains = []
super(SWFBackend, self).__init__()
def reset(self):
region_name = self.region_name
self.__dict__ = {}
self.__init__(region_name)
def _get_domain(self, name, ignore_empty=False):
matching = [domain for domain in self.domains if domain.name == name]
if not matching and not ignore_empty:
raise SWFUnknownResourceFault("domain", name)
if matching:
return matching[0]
return None
def _process_timeouts(self):
for domain in self.domains:
for wfe in domain.workflow_executions:
wfe._process_timeouts()
def list_domains(self, status, reverse_order=None):
domains = [domain for domain in self.domains
if domain.status == status]
domains = sorted(domains, key=lambda domain: domain.name)
if reverse_order:
domains = reversed(domains)
return domains
def register_domain(self, name, workflow_execution_retention_period_in_days,
description=None):
if self._get_domain(name, ignore_empty=True):
raise SWFDomainAlreadyExistsFault(name)
domain = Domain(name, workflow_execution_retention_period_in_days,
description)
self.domains.append(domain)
def deprecate_domain(self, name):
domain = self._get_domain(name)
if domain.status == "DEPRECATED":
raise SWFDomainDeprecatedFault(name)
domain.status = "DEPRECATED"
def describe_domain(self, name):
return self._get_domain(name)
def list_types(self, kind, domain_name, status, reverse_order=None):
domain = self._get_domain(domain_name)
_types = domain.find_types(kind, status)
_types = sorted(_types, key=lambda domain: domain.name)
if reverse_order:
_types = reversed(_types)
return _types
def register_type(self, kind, domain_name, name, version, **kwargs):
domain = self._get_domain(domain_name)
_type = domain.get_type(kind, name, version, ignore_empty=True)
if _type:
raise SWFTypeAlreadyExistsFault(_type)
_class = KNOWN_SWF_TYPES[kind]
_type = _class(name, version, **kwargs)
domain.add_type(_type)
def deprecate_type(self, kind, domain_name, name, version):
domain = self._get_domain(domain_name)
_type = domain.get_type(kind, name, version)
if _type.status == "DEPRECATED":
raise SWFTypeDeprecatedFault(_type)
_type.status = "DEPRECATED"
def describe_type(self, kind, domain_name, name, version):
domain = self._get_domain(domain_name)
return domain.get_type(kind, name, version)
def start_workflow_execution(self, domain_name, workflow_id,
workflow_name, workflow_version,
tag_list=None, **kwargs):
domain = self._get_domain(domain_name)
wf_type = domain.get_type("workflow", workflow_name, workflow_version)
if wf_type.status == "DEPRECATED":
raise SWFTypeDeprecatedFault(wf_type)
wfe = WorkflowExecution(domain, wf_type, workflow_id,
tag_list=tag_list, **kwargs)
domain.add_workflow_execution(wfe)
wfe.start()
return wfe
def describe_workflow_execution(self, domain_name, run_id, workflow_id):
# process timeouts on all objects
self._process_timeouts()
domain = self._get_domain(domain_name)
return domain.get_workflow_execution(workflow_id, run_id=run_id)
def poll_for_decision_task(self, domain_name, task_list, identity=None):
# process timeouts on all objects
self._process_timeouts()
domain = self._get_domain(domain_name)
# Real SWF cases:
# - case 1: there's a decision task to return, return it
# - case 2: there's no decision task to return, so wait for timeout
# and if a new decision is schedule, start and return it
# - case 3: timeout reached, no decision, return an empty decision
# (e.g. a decision with an empty "taskToken")
#
# For the sake of simplicity, we forget case 2 for now, so either
# there's a DecisionTask to return, either we return a blank one.
#
# SWF client libraries should cope with that easily as long as tests
# aren't distributed.
#
# TODO: handle long polling (case 2) for decision tasks
candidates = []
for _task_list, tasks in domain.decision_task_lists.items():
if _task_list == task_list:
candidates += [t for t in tasks if t.state == "SCHEDULED"]
if any(candidates):
# TODO: handle task priorities (but not supported by boto for now)
task = min(candidates, key=lambda d: d.scheduled_at)
wfe = task.workflow_execution
wfe.start_decision_task(task.task_token, identity=identity)
return task
else:
return None
def count_pending_decision_tasks(self, domain_name, task_list):
# process timeouts on all objects
self._process_timeouts()
domain = self._get_domain(domain_name)
count = 0
for wfe in domain.workflow_executions:
if wfe.task_list == task_list:
count += wfe.open_counts["openDecisionTasks"]
return count
def respond_decision_task_completed(self, task_token,
decisions=None,
execution_context=None):
# process timeouts on all objects
self._process_timeouts()
# let's find decision task
decision_task = None
for domain in self.domains:
for wfe in domain.workflow_executions:
for dt in wfe.decision_tasks:
if dt.task_token == task_token:
decision_task = dt
# no decision task found
if not decision_task:
# In the real world, SWF distinguishes an obviously invalid token and a
# token that has no corresponding decision task. For the latter it seems
# to wait until a task with that token comes up (which looks like a smart
# choice in an eventually-consistent system). The call doesn't seem to
# timeout shortly, it takes 3 or 4 minutes to result in:
# BotoServerError: 500 Internal Server Error
# {"__type":"com.amazon.coral.service#InternalFailure"}
# This behavior is not documented clearly in SWF docs and we'll ignore it
# in moto, as there is no obvious reason to rely on it in tests.
raise SWFValidationException("Invalid token")
# decision task found, but WorflowExecution is CLOSED
wfe = decision_task.workflow_execution
if not wfe.open:
raise SWFUnknownResourceFault(
"execution",
"WorkflowExecution=[workflowId={0}, runId={1}]".format(
wfe.workflow_id, wfe.run_id
)
)
# decision task found, but already completed
if decision_task.state != "STARTED":
if decision_task.state == "COMPLETED":
raise SWFUnknownResourceFault(
"decision task, scheduledEventId = {0}".format(decision_task.scheduled_event_id)
)
else:
raise ValueError(
"This shouldn't happen: you have to PollForDecisionTask to get a token, "
"which changes DecisionTask status to 'STARTED' ; then it can only change "
"to 'COMPLETED'. If you didn't hack moto/swf internals, this is probably "
"a bug in moto, please report it, thanks!"
)
# everything's good
if decision_task:
wfe = decision_task.workflow_execution
wfe.complete_decision_task(decision_task.task_token,
decisions=decisions,
execution_context=execution_context)
def poll_for_activity_task(self, domain_name, task_list, identity=None):
# process timeouts on all objects
self._process_timeouts()
domain = self._get_domain(domain_name)
# Real SWF cases:
# - case 1: there's an activity task to return, return it
# - case 2: there's no activity task to return, so wait for timeout
# and if a new activity is scheduled, return it
# - case 3: timeout reached, no activity task, return an empty response
# (e.g. a response with an empty "taskToken")
#
# For the sake of simplicity, we forget case 2 for now, so either
# there's an ActivityTask to return, either we return a blank one.
#
# SWF client libraries should cope with that easily as long as tests
# aren't distributed.
#
# TODO: handle long polling (case 2) for activity tasks
candidates = []
for _task_list, tasks in domain.activity_task_lists.items():
if _task_list == task_list:
candidates += [t for t in tasks if t.state == "SCHEDULED"]
if any(candidates):
# TODO: handle task priorities (but not supported by boto for now)
task = min(candidates, key=lambda d: d.scheduled_at)
wfe = task.workflow_execution
wfe.start_activity_task(task.task_token, identity=identity)
return task
else:
return None
def count_pending_activity_tasks(self, domain_name, task_list):
# process timeouts on all objects
self._process_timeouts()
domain = self._get_domain(domain_name)
count = 0
for _task_list, tasks in domain.activity_task_lists.items():
if _task_list == task_list:
pending = [t for t in tasks if t.state in ["SCHEDULED", "STARTED"]]
count += len(pending)
return count
def _find_activity_task_from_token(self, task_token):
activity_task = None
for domain in self.domains:
for wfe in domain.workflow_executions:
for task in wfe.activity_tasks:
if task.task_token == task_token:
activity_task = task
# no task found
if not activity_task:
# Same as for decision tasks, we raise an invalid token BOTH for clearly
# wrong SWF tokens and OK tokens but not used correctly. This should not
# be a problem in moto.
raise SWFValidationException("Invalid token")
# activity task found, but WorflowExecution is CLOSED
wfe = activity_task.workflow_execution
if not wfe.open:
raise SWFUnknownResourceFault(
"execution",
"WorkflowExecution=[workflowId={0}, runId={1}]".format(
wfe.workflow_id, wfe.run_id
)
)
# activity task found, but already completed
if activity_task.state != "STARTED":
if activity_task.state == "COMPLETED":
raise SWFUnknownResourceFault(
"activity, scheduledEventId = {0}".format(activity_task.scheduled_event_id)
)
else:
raise ValueError(
"This shouldn't happen: you have to PollForActivityTask to get a token, "
"which changes ActivityTask status to 'STARTED' ; then it can only change "
"to 'COMPLETED'. If you didn't hack moto/swf internals, this is probably "
"a bug in moto, please report it, thanks!"
)
# everything's good
return activity_task
def respond_activity_task_completed(self, task_token, result=None):
# process timeouts on all objects
self._process_timeouts()
activity_task = self._find_activity_task_from_token(task_token)
wfe = activity_task.workflow_execution
wfe.complete_activity_task(activity_task.task_token, result=result)
def respond_activity_task_failed(self, task_token, reason=None, details=None):
# process timeouts on all objects
self._process_timeouts()
activity_task = self._find_activity_task_from_token(task_token)
wfe = activity_task.workflow_execution
wfe.fail_activity_task(activity_task.task_token, reason=reason, details=details)
def terminate_workflow_execution(self, domain_name, workflow_id, child_policy=None,
details=None, reason=None, run_id=None):
# process timeouts on all objects
self._process_timeouts()
domain = self._get_domain(domain_name)
wfe = domain.get_workflow_execution(workflow_id, run_id=run_id, raise_if_closed=True)
wfe.terminate(child_policy=child_policy, details=details, reason=reason)
def record_activity_task_heartbeat(self, task_token, details=None):
# process timeouts on all objects
self._process_timeouts()
activity_task = self._find_activity_task_from_token(task_token)
activity_task.reset_heartbeat_clock()
if details:
activity_task.details = details
swf_backends = {}
for region in boto.swf.regions():
swf_backends[region.name] = SWFBackend(region.name)

View file

@ -0,0 +1,83 @@
from __future__ import unicode_literals
from datetime import datetime
import uuid
from moto.core.utils import unix_time
from ..exceptions import SWFWorkflowExecutionClosedError
from .timeout import Timeout
class ActivityTask(object):
def __init__(self, activity_id, activity_type, scheduled_event_id,
workflow_execution, timeouts, input=None):
self.activity_id = activity_id
self.activity_type = activity_type
self.details = None
self.input = input
self.last_heartbeat_timestamp = unix_time()
self.scheduled_event_id = scheduled_event_id
self.started_event_id = None
self.state = "SCHEDULED"
self.task_token = str(uuid.uuid4())
self.timeouts = timeouts
self.timeout_type = None
self.workflow_execution = workflow_execution
# this is *not* necessarily coherent with workflow execution history,
# but that shouldn't be a problem for tests
self.scheduled_at = datetime.utcnow()
def _check_workflow_execution_open(self):
if not self.workflow_execution.open:
raise SWFWorkflowExecutionClosedError()
@property
def open(self):
return self.state in ["SCHEDULED", "STARTED"]
def to_full_dict(self):
hsh = {
"activityId": self.activity_id,
"activityType": self.activity_type.to_short_dict(),
"taskToken": self.task_token,
"startedEventId": self.started_event_id,
"workflowExecution": self.workflow_execution.to_short_dict(),
}
if self.input:
hsh["input"] = self.input
return hsh
def start(self, started_event_id):
self.state = "STARTED"
self.started_event_id = started_event_id
def complete(self):
self._check_workflow_execution_open()
self.state = "COMPLETED"
def fail(self):
self._check_workflow_execution_open()
self.state = "FAILED"
def reset_heartbeat_clock(self):
self.last_heartbeat_timestamp = unix_time()
def first_timeout(self):
if not self.open or not self.workflow_execution.open:
return None
# TODO: handle the "NONE" case
heartbeat_timeout_at = (self.last_heartbeat_timestamp +
int(self.timeouts["heartbeatTimeout"]))
_timeout = Timeout(self, heartbeat_timeout_at, "HEARTBEAT")
if _timeout.reached:
return _timeout
def process_timeouts(self):
_timeout = self.first_timeout()
if _timeout:
self.timeout(_timeout)
def timeout(self, _timeout):
self._check_workflow_execution_open()
self.state = "TIMED_OUT"
self.timeout_type = _timeout.kind

View file

@ -0,0 +1,16 @@
from .generic_type import GenericType
class ActivityType(GenericType):
@property
def _configuration_keys(self):
return [
"defaultTaskHeartbeatTimeout",
"defaultTaskScheduleToCloseTimeout",
"defaultTaskScheduleToStartTimeout",
"defaultTaskStartToCloseTimeout",
]
@property
def kind(self):
return "activity"

View file

@ -0,0 +1,76 @@
from __future__ import unicode_literals
from datetime import datetime
import uuid
from moto.core.utils import unix_time
from ..exceptions import SWFWorkflowExecutionClosedError
from .timeout import Timeout
class DecisionTask(object):
def __init__(self, workflow_execution, scheduled_event_id):
self.workflow_execution = workflow_execution
self.workflow_type = workflow_execution.workflow_type
self.task_token = str(uuid.uuid4())
self.scheduled_event_id = scheduled_event_id
self.previous_started_event_id = 0
self.started_event_id = None
self.started_timestamp = None
self.start_to_close_timeout = self.workflow_execution.task_start_to_close_timeout
self.state = "SCHEDULED"
# this is *not* necessarily coherent with workflow execution history,
# but that shouldn't be a problem for tests
self.scheduled_at = datetime.utcnow()
self.timeout_type = None
@property
def started(self):
return self.state == "STARTED"
def _check_workflow_execution_open(self):
if not self.workflow_execution.open:
raise SWFWorkflowExecutionClosedError()
def to_full_dict(self, reverse_order=False):
events = self.workflow_execution.events(reverse_order=reverse_order)
hsh = {
"events": [
evt.to_dict() for evt in events
],
"taskToken": self.task_token,
"previousStartedEventId": self.previous_started_event_id,
"workflowExecution": self.workflow_execution.to_short_dict(),
"workflowType": self.workflow_type.to_short_dict(),
}
if self.started_event_id:
hsh["startedEventId"] = self.started_event_id
return hsh
def start(self, started_event_id):
self.state = "STARTED"
self.started_timestamp = unix_time()
self.started_event_id = started_event_id
def complete(self):
self._check_workflow_execution_open()
self.state = "COMPLETED"
def first_timeout(self):
if not self.started or not self.workflow_execution.open:
return None
# TODO: handle the "NONE" case
start_to_close_at = self.started_timestamp + int(self.start_to_close_timeout)
_timeout = Timeout(self, start_to_close_at, "START_TO_CLOSE")
if _timeout.reached:
return _timeout
def process_timeouts(self):
_timeout = self.first_timeout()
if _timeout:
self.timeout(_timeout)
def timeout(self, _timeout):
self._check_workflow_execution_open()
self.state = "TIMED_OUT"
self.timeout_type = _timeout.kind

124
moto/swf/models/domain.py Normal file
View file

@ -0,0 +1,124 @@
from __future__ import unicode_literals
from collections import defaultdict
from ..exceptions import (
SWFUnknownResourceFault,
SWFWorkflowExecutionAlreadyStartedFault,
)
class Domain(object):
def __init__(self, name, retention, description=None):
self.name = name
self.retention = retention
self.description = description
self.status = "REGISTERED"
self.types = {
"activity": defaultdict(dict),
"workflow": defaultdict(dict),
}
# Workflow executions have an id, which unicity is guaranteed
# at domain level (not super clear in the docs, but I checked
# that against SWF API) ; hence the storage method as a dict
# of "workflow_id (client determined)" => WorkflowExecution()
# here.
self.workflow_executions = []
self.activity_task_lists = {}
self.decision_task_lists = {}
def __repr__(self):
return "Domain(name: %(name)s, status: %(status)s)" % self.__dict__
def to_short_dict(self):
hsh = {
"name": self.name,
"status": self.status,
}
if self.description:
hsh["description"] = self.description
return hsh
def to_full_dict(self):
return {
"domainInfo": self.to_short_dict(),
"configuration": {
"workflowExecutionRetentionPeriodInDays": self.retention,
}
}
def get_type(self, kind, name, version, ignore_empty=False):
try:
return self.types[kind][name][version]
except KeyError:
if not ignore_empty:
raise SWFUnknownResourceFault(
"type",
"{0}Type=[name={1}, version={2}]".format(
kind.capitalize(), name, version
)
)
def add_type(self, _type):
self.types[_type.kind][_type.name][_type.version] = _type
def find_types(self, kind, status):
_all = []
for family in self.types[kind].values():
for _type in family.values():
if _type.status == status:
_all.append(_type)
return _all
def add_workflow_execution(self, workflow_execution):
_id = workflow_execution.workflow_id
if self.get_workflow_execution(_id, raise_if_none=False):
raise SWFWorkflowExecutionAlreadyStartedFault()
self.workflow_executions.append(workflow_execution)
def get_workflow_execution(self, workflow_id, run_id=None,
raise_if_none=True, raise_if_closed=False):
# query
if run_id:
_all = [w for w in self.workflow_executions
if w.workflow_id == workflow_id and w.run_id == run_id]
else:
_all = [w for w in self.workflow_executions
if w.workflow_id == workflow_id and w.open]
# reduce
wfe = _all[0] if _all else None
# raise if closed / none
if raise_if_closed and wfe and wfe.execution_status == "CLOSED":
wfe = None
if not wfe and raise_if_none:
if run_id:
args = ["execution", "WorkflowExecution=[workflowId={0}, runId={1}]".format(
workflow_id, run_id)]
else:
args = ["execution, workflowId = {0}".format(workflow_id)]
raise SWFUnknownResourceFault(*args)
# at last return workflow execution
return wfe
def add_to_activity_task_list(self, task_list, obj):
if not task_list in self.activity_task_lists:
self.activity_task_lists[task_list] = []
self.activity_task_lists[task_list].append(obj)
@property
def activity_tasks(self):
_all = []
for tasks in self.activity_task_lists.values():
_all += tasks
return _all
def add_to_decision_task_list(self, task_list, obj):
if not task_list in self.decision_task_lists:
self.decision_task_lists[task_list] = []
self.decision_task_lists[task_list].append(obj)
@property
def decision_tasks(self):
_all = []
for tasks in self.decision_task_lists.values():
_all += tasks
return _all

View file

@ -0,0 +1,66 @@
from __future__ import unicode_literals
from moto.core.utils import camelcase_to_underscores
class GenericType(object):
def __init__(self, name, version, **kwargs):
self.name = name
self.version = version
self.status = "REGISTERED"
if "description" in kwargs:
self.description = kwargs.pop("description")
for key, value in kwargs.items():
self.__setattr__(key, value)
# default values set to none
for key in self._configuration_keys:
attr = camelcase_to_underscores(key)
if not hasattr(self, attr):
self.__setattr__(attr, None)
if not hasattr(self, "task_list"):
self.task_list = None
def __repr__(self):
cls = self.__class__.__name__
attrs = "name: %(name)s, version: %(version)s, status: %(status)s" % self.__dict__
return "{0}({1})".format(cls, attrs)
@property
def kind(self):
raise NotImplementedError()
@property
def _configuration_keys(self):
raise NotImplementedError()
def to_short_dict(self):
return {
"name": self.name,
"version": self.version,
}
def to_medium_dict(self):
hsh = {
"{0}Type".format(self.kind): self.to_short_dict(),
"creationDate": 1420066800,
"status": self.status,
}
if self.status == "DEPRECATED":
hsh["deprecationDate"] = 1422745200
if hasattr(self, "description"):
hsh["description"] = self.description
return hsh
def to_full_dict(self):
hsh = {
"typeInfo": self.to_medium_dict(),
"configuration": {}
}
if self.task_list:
hsh["configuration"]["defaultTaskList"] = {"name": self.task_list}
for key in self._configuration_keys:
attr = camelcase_to_underscores(key)
if not getattr(self, attr):
continue
hsh["configuration"][key] = getattr(self, attr)
return hsh

View file

@ -0,0 +1,65 @@
from __future__ import unicode_literals
from moto.core.utils import underscores_to_camelcase, unix_time
from ..utils import decapitalize
# We keep track of which history event types we support
# so that we'll be able to catch specific formatting
# for new events if needed.
SUPPORTED_HISTORY_EVENT_TYPES = (
"WorkflowExecutionStarted",
"DecisionTaskScheduled",
"DecisionTaskStarted",
"DecisionTaskCompleted",
"WorkflowExecutionCompleted",
"WorkflowExecutionFailed",
"ActivityTaskScheduled",
"ScheduleActivityTaskFailed",
"ActivityTaskStarted",
"ActivityTaskCompleted",
"ActivityTaskFailed",
"WorkflowExecutionTerminated",
"ActivityTaskTimedOut",
"DecisionTaskTimedOut",
"WorkflowExecutionTimedOut",
)
class HistoryEvent(object):
def __init__(self, event_id, event_type, event_timestamp=None, **kwargs):
if event_type not in SUPPORTED_HISTORY_EVENT_TYPES:
raise NotImplementedError(
"HistoryEvent does not implement attributes for type '{0}'".format(event_type)
)
self.event_id = event_id
self.event_type = event_type
if event_timestamp:
self.event_timestamp = event_timestamp
else:
self.event_timestamp = unix_time()
# pre-populate a dict: {"camelCaseKey": value}
self.event_attributes = {}
for key, value in kwargs.items():
if value:
camel_key = underscores_to_camelcase(key)
if key == "task_list":
value = {"name": value}
elif key == "workflow_type":
value = {"name": value.name, "version": value.version}
elif key == "activity_type":
value = value.to_short_dict()
self.event_attributes[camel_key] = value
def to_dict(self):
return {
"eventId": self.event_id,
"eventType": self.event_type,
"eventTimestamp": self.event_timestamp,
self._attributes_key(): self.event_attributes
}
def _attributes_key(self):
key = "{0}EventAttributes".format(self.event_type)
return decapitalize(key)

View file

@ -0,0 +1,12 @@
from moto.core.utils import unix_time
class Timeout(object):
def __init__(self, obj, timestamp, kind):
self.obj = obj
self.timestamp = timestamp
self.kind = kind
@property
def reached(self):
return unix_time() >= self.timestamp

View file

@ -0,0 +1,614 @@
from __future__ import unicode_literals
import uuid
from moto.core.utils import camelcase_to_underscores, unix_time
from ..constants import (
DECISIONS_FIELDS,
)
from ..exceptions import (
SWFDefaultUndefinedFault,
SWFValidationException,
SWFDecisionValidationException,
)
from ..utils import decapitalize
from .activity_task import ActivityTask
from .activity_type import ActivityType
from .decision_task import DecisionTask
from .history_event import HistoryEvent
from .timeout import Timeout
# TODO: extract decision related logic into a Decision class
class WorkflowExecution(object):
# NB: the list is ordered exactly as in SWF validation exceptions so we can
# mimic error messages closely ; don't reorder it without checking SWF.
KNOWN_DECISION_TYPES = [
"CompleteWorkflowExecution",
"StartTimer",
"RequestCancelExternalWorkflowExecution",
"SignalExternalWorkflowExecution",
"CancelTimer",
"RecordMarker",
"ScheduleActivityTask",
"ContinueAsNewWorkflowExecution",
"ScheduleLambdaFunction",
"FailWorkflowExecution",
"RequestCancelActivityTask",
"StartChildWorkflowExecution",
"CancelWorkflowExecution"
]
def __init__(self, domain, workflow_type, workflow_id, **kwargs):
self.domain = domain
self.workflow_id = workflow_id
self.run_id = uuid.uuid4().hex
# WorkflowExecutionInfo
self.cancel_requested = False
# TODO: check valid values among:
# COMPLETED | FAILED | CANCELED | TERMINATED | CONTINUED_AS_NEW | TIMED_OUT
# TODO: implement them all
self.close_cause = None
self.close_status = None
self.close_timestamp = None
self.execution_status = "OPEN"
self.latest_activity_task_timestamp = None
self.latest_execution_context = None
self.parent = None
self.start_timestamp = None
self.tag_list = [] # TODO
self.timeout_type = None
self.workflow_type = workflow_type
# args processing
# NB: the order follows boto/SWF order of exceptions appearance (if no
# param is set, # SWF will raise DefaultUndefinedFault errors in the
# same order as the few lines that follow)
self._set_from_kwargs_or_workflow_type(kwargs, "execution_start_to_close_timeout")
self._set_from_kwargs_or_workflow_type(kwargs, "task_list", "task_list")
self._set_from_kwargs_or_workflow_type(kwargs, "task_start_to_close_timeout")
self._set_from_kwargs_or_workflow_type(kwargs, "child_policy")
self.input = kwargs.get("input")
# counters
self.open_counts = {
"openTimers": 0,
"openDecisionTasks": 0,
"openActivityTasks": 0,
"openChildWorkflowExecutions": 0,
"openLambdaFunctions": 0,
}
# events
self._events = []
# child workflows
self.child_workflow_executions = []
def __repr__(self):
return "WorkflowExecution(run_id: {0})".format(self.run_id)
def _set_from_kwargs_or_workflow_type(self, kwargs, local_key, workflow_type_key=None):
if workflow_type_key is None:
workflow_type_key = "default_" + local_key
value = kwargs.get(local_key)
if not value and hasattr(self.workflow_type, workflow_type_key):
value = getattr(self.workflow_type, workflow_type_key)
if not value:
raise SWFDefaultUndefinedFault(local_key)
setattr(self, local_key, value)
@property
def _configuration_keys(self):
return [
"executionStartToCloseTimeout",
"childPolicy",
"taskPriority",
"taskStartToCloseTimeout",
]
def to_short_dict(self):
return {
"workflowId": self.workflow_id,
"runId": self.run_id
}
def to_medium_dict(self):
hsh = {
"execution": self.to_short_dict(),
"workflowType": self.workflow_type.to_short_dict(),
"startTimestamp": 1420066800.123,
"executionStatus": self.execution_status,
"cancelRequested": self.cancel_requested,
}
if hasattr(self, "tag_list") and self.tag_list:
hsh["tagList"] = self.tag_list
return hsh
def to_full_dict(self):
hsh = {
"executionInfo": self.to_medium_dict(),
"executionConfiguration": {
"taskList": {"name": self.task_list}
}
}
# configuration
for key in self._configuration_keys:
attr = camelcase_to_underscores(key)
if not hasattr(self, attr):
continue
if not getattr(self, attr):
continue
hsh["executionConfiguration"][key] = getattr(self, attr)
# counters
hsh["openCounts"] = self.open_counts
# latest things
if self.latest_execution_context:
hsh["latestExecutionContext"] = self.latest_execution_context
if self.latest_activity_task_timestamp:
hsh["latestActivityTaskTimestamp"] = self.latest_activity_task_timestamp
return hsh
def _process_timeouts(self):
"""
SWF timeouts can happen on different objects (workflow executions,
activity tasks, decision tasks) and should be processed in order.
A specific timeout can change the workflow execution state and have an
impact on other timeouts: for instance, if the workflow execution
timeouts, subsequent timeouts on activity or decision tasks are
irrelevant ; if an activity task timeouts, other timeouts on this task
are irrelevant, and a new decision is fired, which could well timeout
before the end of the workflow.
So the idea here is to find the earliest timeout that would have been
triggered, process it, then make the workflow state progress and repeat
the whole process.
"""
timeout_candidates = []
# workflow execution timeout
timeout_candidates.append(self.first_timeout())
# decision tasks timeouts
for task in self.decision_tasks:
timeout_candidates.append(task.first_timeout())
# activity tasks timeouts
for task in self.activity_tasks:
timeout_candidates.append(task.first_timeout())
# remove blank values (foo.first_timeout() is a Timeout or None)
timeout_candidates = list(filter(None, timeout_candidates))
# now find the first timeout to process
first_timeout = None
if timeout_candidates:
first_timeout = min(
timeout_candidates,
key=lambda t: t.timestamp
)
if first_timeout:
should_schedule_decision_next = False
if isinstance(first_timeout.obj, WorkflowExecution):
self.timeout(first_timeout)
elif isinstance(first_timeout.obj, DecisionTask):
self.timeout_decision_task(first_timeout)
should_schedule_decision_next = True
elif isinstance(first_timeout.obj, ActivityTask):
self.timeout_activity_task(first_timeout)
should_schedule_decision_next = True
else:
raise NotImplementedError("Unhandled timeout object")
# schedule decision task if needed
if should_schedule_decision_next:
self.schedule_decision_task()
# the workflow execution progressed, let's see if another
# timeout should be processed
self._process_timeouts()
def events(self, reverse_order=False):
if reverse_order:
return reversed(self._events)
else:
return self._events
def next_event_id(self):
event_ids = [evt.event_id for evt in self._events]
return max(event_ids or [0]) + 1
def _add_event(self, *args, **kwargs):
evt = HistoryEvent(self.next_event_id(), *args, **kwargs)
self._events.append(evt)
return evt
def start(self):
self.start_timestamp = unix_time()
self._add_event(
"WorkflowExecutionStarted",
child_policy=self.child_policy,
execution_start_to_close_timeout=self.execution_start_to_close_timeout,
# TODO: fix this hardcoded value
parent_initiated_event_id=0,
task_list=self.task_list,
task_start_to_close_timeout=self.task_start_to_close_timeout,
workflow_type=self.workflow_type,
)
self.schedule_decision_task()
def _schedule_decision_task(self):
evt = self._add_event(
"DecisionTaskScheduled",
start_to_close_timeout=self.task_start_to_close_timeout,
task_list=self.task_list,
)
self.domain.add_to_decision_task_list(
self.task_list,
DecisionTask(self, evt.event_id),
)
self.open_counts["openDecisionTasks"] += 1
def schedule_decision_task(self):
self._schedule_decision_task()
# Shortcut for tests: helps having auto-starting decision tasks when needed
def schedule_and_start_decision_task(self, identity=None):
self._schedule_decision_task()
decision_task = self.decision_tasks[-1]
self.start_decision_task(decision_task.task_token, identity=identity)
@property
def decision_tasks(self):
return [t for t in self.domain.decision_tasks
if t.workflow_execution == self]
@property
def activity_tasks(self):
return [t for t in self.domain.activity_tasks
if t.workflow_execution == self]
def _find_decision_task(self, task_token):
for dt in self.decision_tasks:
if dt.task_token == task_token:
return dt
raise ValueError(
"No decision task with token: {0}".format(task_token)
)
def start_decision_task(self, task_token, identity=None):
dt = self._find_decision_task(task_token)
evt = self._add_event(
"DecisionTaskStarted",
scheduled_event_id=dt.scheduled_event_id,
identity=identity
)
dt.start(evt.event_id)
def complete_decision_task(self, task_token, decisions=None, execution_context=None):
# 'decisions' can be None per boto.swf defaults, so replace it with something iterable
if not decisions:
decisions = []
# In case of a malformed or invalid decision task, SWF will raise an error and
# it won't perform any of the decisions in the decision set.
self.validate_decisions(decisions)
dt = self._find_decision_task(task_token)
evt = self._add_event(
"DecisionTaskCompleted",
scheduled_event_id=dt.scheduled_event_id,
started_event_id=dt.started_event_id,
execution_context=execution_context,
)
dt.complete()
self.should_schedule_decision_next = False
self.handle_decisions(evt.event_id, decisions)
if self.should_schedule_decision_next:
self.schedule_decision_task()
self.latest_execution_context = execution_context
def _check_decision_attributes(self, kind, value, decision_id):
problems = []
constraints = DECISIONS_FIELDS.get(kind, {})
for key, constraint in constraints.items():
if constraint["required"] and not value.get(key):
problems.append({
"type": "null_value",
"where": "decisions.{0}.member.{1}.{2}".format(
decision_id, kind, key
)
})
return problems
def validate_decisions(self, decisions):
"""
Performs some basic validations on decisions. The real SWF service
seems to break early and *not* process any decision if there's a
validation problem, such as a malformed decision for instance. I didn't
find an explicit documentation for that though, so criticisms welcome.
"""
problems = []
# check close decision is last
# (the real SWF service also works that way if you provide 2 close decision tasks)
for dcs in decisions[:-1]:
close_decision_types = [
"CompleteWorkflowExecution",
"FailWorkflowExecution",
"CancelWorkflowExecution",
]
if dcs["decisionType"] in close_decision_types:
raise SWFValidationException(
"Close must be last decision in list"
)
decision_number = 0
for dcs in decisions:
decision_number += 1
# check decision types mandatory attributes
# NB: the real SWF service seems to check attributes even for attributes list
# that are not in line with the decisionType, so we do the same
attrs_to_check = [d for d in dcs.keys() if d.endswith("DecisionAttributes")]
if dcs["decisionType"] in self.KNOWN_DECISION_TYPES:
decision_type = dcs["decisionType"]
decision_attr = "{0}DecisionAttributes".format(decapitalize(decision_type))
attrs_to_check.append(decision_attr)
for attr in attrs_to_check:
problems += self._check_decision_attributes(attr, dcs.get(attr, {}), decision_number)
# check decision type is correct
if dcs["decisionType"] not in self.KNOWN_DECISION_TYPES:
problems.append({
"type": "bad_decision_type",
"value": dcs["decisionType"],
"where": "decisions.{0}.member.decisionType".format(decision_number),
"possible_values": ", ".join(self.KNOWN_DECISION_TYPES),
})
# raise if any problem
if any(problems):
raise SWFDecisionValidationException(problems)
def handle_decisions(self, event_id, decisions):
"""
Handles a Decision according to SWF docs.
See: http://docs.aws.amazon.com/amazonswf/latest/apireference/API_Decision.html
"""
# handle each decision separately, in order
for decision in decisions:
decision_type = decision["decisionType"]
attributes_key = "{0}DecisionAttributes".format(decapitalize(decision_type))
attributes = decision.get(attributes_key, {})
if decision_type == "CompleteWorkflowExecution":
self.complete(event_id, attributes.get("result"))
elif decision_type == "FailWorkflowExecution":
self.fail(event_id, attributes.get("details"), attributes.get("reason"))
elif decision_type == "ScheduleActivityTask":
self.schedule_activity_task(event_id, attributes)
else:
# TODO: implement Decision type: CancelTimer
# TODO: implement Decision type: CancelWorkflowExecution
# TODO: implement Decision type: ContinueAsNewWorkflowExecution
# TODO: implement Decision type: RecordMarker
# TODO: implement Decision type: RequestCancelActivityTask
# TODO: implement Decision type: RequestCancelExternalWorkflowExecution
# TODO: implement Decision type: ScheduleLambdaFunction
# TODO: implement Decision type: SignalExternalWorkflowExecution
# TODO: implement Decision type: StartChildWorkflowExecution
# TODO: implement Decision type: StartTimer
raise NotImplementedError("Cannot handle decision: {0}".format(decision_type))
# finally decrement counter if and only if everything went well
self.open_counts["openDecisionTasks"] -= 1
def complete(self, event_id, result=None):
self.execution_status = "CLOSED"
self.close_status = "COMPLETED"
self.close_timestamp = unix_time()
self._add_event(
"WorkflowExecutionCompleted",
decision_task_completed_event_id=event_id,
result=result,
)
def fail(self, event_id, details=None, reason=None):
# TODO: implement lenght constraints on details/reason
self.execution_status = "CLOSED"
self.close_status = "FAILED"
self.close_timestamp = unix_time()
self._add_event(
"WorkflowExecutionFailed",
decision_task_completed_event_id=event_id,
details=details,
reason=reason,
)
def schedule_activity_task(self, event_id, attributes):
# Helper function to avoid repeating ourselves in the next sections
def fail_schedule_activity_task(_type, _cause):
# TODO: implement other possible failure mode: OPEN_ACTIVITIES_LIMIT_EXCEEDED
# NB: some failure modes are not implemented and probably won't be implemented in
# the future, such as ACTIVITY_CREATION_RATE_EXCEEDED or OPERATION_NOT_PERMITTED
self._add_event(
"ScheduleActivityTaskFailed",
activity_id=attributes["activityId"],
activity_type=_type,
cause=_cause,
decision_task_completed_event_id=event_id,
)
self.should_schedule_decision_next = True
activity_type = self.domain.get_type(
"activity",
attributes["activityType"]["name"],
attributes["activityType"]["version"],
ignore_empty=True,
)
if not activity_type:
fake_type = ActivityType(attributes["activityType"]["name"],
attributes["activityType"]["version"])
fail_schedule_activity_task(fake_type,
"ACTIVITY_TYPE_DOES_NOT_EXIST")
return
if activity_type.status == "DEPRECATED":
fail_schedule_activity_task(activity_type,
"ACTIVITY_TYPE_DEPRECATED")
return
if any(at for at in self.activity_tasks if at.activity_id == attributes["activityId"]):
fail_schedule_activity_task(activity_type,
"ACTIVITY_ID_ALREADY_IN_USE")
return
# find task list or default task list, else fail
task_list = attributes.get("taskList", {}).get("name")
if not task_list and activity_type.task_list:
task_list = activity_type.task_list
if not task_list:
fail_schedule_activity_task(activity_type,
"DEFAULT_TASK_LIST_UNDEFINED")
return
# find timeouts or default timeout, else fail
timeouts = {}
for _type in ["scheduleToStartTimeout", "scheduleToCloseTimeout", "startToCloseTimeout", "heartbeatTimeout"]:
default_key = "default_task_" + camelcase_to_underscores(_type)
default_value = getattr(activity_type, default_key)
timeouts[_type] = attributes.get(_type, default_value)
if not timeouts[_type]:
error_key = default_key.replace("default_task_", "default_")
fail_schedule_activity_task(activity_type,
"{0}_UNDEFINED".format(error_key.upper()))
return
# Only add event and increment counters now that nothing went wrong
evt = self._add_event(
"ActivityTaskScheduled",
activity_id=attributes["activityId"],
activity_type=activity_type,
control=attributes.get("control"),
decision_task_completed_event_id=event_id,
heartbeat_timeout=attributes.get("heartbeatTimeout"),
input=attributes.get("input"),
schedule_to_close_timeout=attributes.get("scheduleToCloseTimeout"),
schedule_to_start_timeout=attributes.get("scheduleToStartTimeout"),
start_to_close_timeout=attributes.get("startToCloseTimeout"),
task_list=task_list,
task_priority=attributes.get("taskPriority"),
)
task = ActivityTask(
activity_id=attributes["activityId"],
activity_type=activity_type,
input=attributes.get("input"),
scheduled_event_id=evt.event_id,
workflow_execution=self,
timeouts=timeouts,
)
self.domain.add_to_activity_task_list(task_list, task)
self.open_counts["openActivityTasks"] += 1
self.latest_activity_task_timestamp = unix_time()
def _find_activity_task(self, task_token):
for task in self.activity_tasks:
if task.task_token == task_token:
return task
raise ValueError(
"No activity task with token: {0}".format(task_token)
)
def start_activity_task(self, task_token, identity=None):
task = self._find_activity_task(task_token)
evt = self._add_event(
"ActivityTaskStarted",
scheduled_event_id=task.scheduled_event_id,
identity=identity
)
task.start(evt.event_id)
def complete_activity_task(self, task_token, result=None):
task = self._find_activity_task(task_token)
self._add_event(
"ActivityTaskCompleted",
scheduled_event_id=task.scheduled_event_id,
started_event_id=task.started_event_id,
result=result,
)
task.complete()
self.open_counts["openActivityTasks"] -= 1
# TODO: ensure we don't schedule multiple decisions at the same time!
self.schedule_decision_task()
def fail_activity_task(self, task_token, reason=None, details=None):
task = self._find_activity_task(task_token)
self._add_event(
"ActivityTaskFailed",
scheduled_event_id=task.scheduled_event_id,
started_event_id=task.started_event_id,
reason=reason,
details=details,
)
task.fail()
self.open_counts["openActivityTasks"] -= 1
# TODO: ensure we don't schedule multiple decisions at the same time!
self.schedule_decision_task()
def terminate(self, child_policy=None, details=None, reason=None):
# TODO: handle child policy for child workflows here
# TODO: handle cause="CHILD_POLICY_APPLIED"
# Until this, we set cause manually to "OPERATOR_INITIATED"
cause = "OPERATOR_INITIATED"
if not child_policy:
child_policy = self.child_policy
self._add_event(
"WorkflowExecutionTerminated",
cause=cause,
child_policy=child_policy,
details=details,
reason=reason,
)
self.execution_status = "CLOSED"
self.close_status = "TERMINATED"
self.close_cause = "OPERATOR_INITIATED"
def first_timeout(self):
if not self.open or not self.start_timestamp:
return None
start_to_close_at = self.start_timestamp + int(self.execution_start_to_close_timeout)
_timeout = Timeout(self, start_to_close_at, "START_TO_CLOSE")
if _timeout.reached:
return _timeout
def timeout(self, timeout):
# TODO: process child policy on child workflows here or in the triggering function
self.execution_status = "CLOSED"
self.close_status = "TIMED_OUT"
self.timeout_type = timeout.kind
self._add_event(
"WorkflowExecutionTimedOut",
child_policy=self.child_policy,
event_timestamp=timeout.timestamp,
timeout_type=self.timeout_type,
)
def timeout_decision_task(self, _timeout):
task = _timeout.obj
task.timeout(_timeout)
self._add_event(
"DecisionTaskTimedOut",
event_timestamp=_timeout.timestamp,
scheduled_event_id=task.scheduled_event_id,
started_event_id=task.started_event_id,
timeout_type=task.timeout_type,
)
def timeout_activity_task(self, _timeout):
task = _timeout.obj
task.timeout(_timeout)
self._add_event(
"ActivityTaskTimedOut",
details=task.details,
event_timestamp=_timeout.timestamp,
scheduled_event_id=task.scheduled_event_id,
started_event_id=task.started_event_id,
timeout_type=task.timeout_type,
)
@property
def open(self):
return self.execution_status == "OPEN"

View file

@ -0,0 +1,15 @@
from .generic_type import GenericType
class WorkflowType(GenericType):
@property
def _configuration_keys(self):
return [
"defaultChildPolicy",
"defaultExecutionStartToCloseTimeout",
"defaultTaskStartToCloseTimeout",
]
@property
def kind(self):
return "workflow"

377
moto/swf/responses.py Normal file
View file

@ -0,0 +1,377 @@
import json
import six
from moto.core.responses import BaseResponse
from werkzeug.exceptions import HTTPException
from moto.core.utils import camelcase_to_underscores, method_names_from_class
from .exceptions import SWFSerializationException
from .models import swf_backends
class SWFResponse(BaseResponse):
@property
def swf_backend(self):
return swf_backends[self.region]
# SWF parameters are passed through a JSON body, so let's ease retrieval
@property
def _params(self):
return json.loads(self.body.decode("utf-8"))
def _check_none_or_string(self, parameter):
if parameter is not None:
self._check_string(parameter)
def _check_string(self, parameter):
if not isinstance(parameter, six.string_types):
raise SWFSerializationException(parameter)
def _check_none_or_list_of_strings(self, parameter):
if parameter is not None:
self._check_list_of_strings(parameter)
def _check_list_of_strings(self, parameter):
if not isinstance(parameter, list):
raise SWFSerializationException(parameter)
for i in parameter:
if not isinstance(i, six.string_types):
raise SWFSerializationException(parameter)
def _list_types(self, kind):
domain_name = self._params["domain"]
status = self._params["registrationStatus"]
reverse_order = self._params.get("reverseOrder", None)
self._check_string(domain_name)
self._check_string(status)
types = self.swf_backend.list_types(kind, domain_name, status, reverse_order=reverse_order)
return json.dumps({
"typeInfos": [_type.to_medium_dict() for _type in types]
})
def _describe_type(self, kind):
domain = self._params["domain"]
_type_args = self._params["{0}Type".format(kind)]
name = _type_args["name"]
version = _type_args["version"]
self._check_string(domain)
self._check_string(name)
self._check_string(version)
_type = self.swf_backend.describe_type(kind, domain, name, version)
return json.dumps(_type.to_full_dict())
def _deprecate_type(self, kind):
domain = self._params["domain"]
_type_args = self._params["{0}Type".format(kind)]
name = _type_args["name"]
version = _type_args["version"]
self._check_string(domain)
self._check_string(name)
self._check_string(version)
self.swf_backend.deprecate_type(kind, domain, name, version)
return ""
# TODO: implement pagination
def list_domains(self):
status = self._params["registrationStatus"]
self._check_string(status)
reverse_order = self._params.get("reverseOrder", None)
domains = self.swf_backend.list_domains(status, reverse_order=reverse_order)
return json.dumps({
"domainInfos": [domain.to_short_dict() for domain in domains]
})
def register_domain(self):
name = self._params["name"]
retention = self._params["workflowExecutionRetentionPeriodInDays"]
description = self._params.get("description")
self._check_string(retention)
self._check_string(name)
self._check_none_or_string(description)
domain = self.swf_backend.register_domain(name, retention,
description=description)
return ""
def deprecate_domain(self):
name = self._params["name"]
self._check_string(name)
domain = self.swf_backend.deprecate_domain(name)
return ""
def describe_domain(self):
name = self._params["name"]
self._check_string(name)
domain = self.swf_backend.describe_domain(name)
return json.dumps(domain.to_full_dict())
# TODO: implement pagination
def list_activity_types(self):
return self._list_types("activity")
def register_activity_type(self):
domain = self._params["domain"]
name = self._params["name"]
version = self._params["version"]
default_task_list = self._params.get("defaultTaskList")
if default_task_list:
task_list = default_task_list.get("name")
else:
task_list = None
default_task_heartbeat_timeout = self._params.get("defaultTaskHeartbeatTimeout")
default_task_schedule_to_close_timeout = self._params.get("defaultTaskScheduleToCloseTimeout")
default_task_schedule_to_start_timeout = self._params.get("defaultTaskScheduleToStartTimeout")
default_task_start_to_close_timeout = self._params.get("defaultTaskStartToCloseTimeout")
description = self._params.get("description")
self._check_string(domain)
self._check_string(name)
self._check_string(version)
self._check_none_or_string(task_list)
self._check_none_or_string(default_task_heartbeat_timeout)
self._check_none_or_string(default_task_schedule_to_close_timeout)
self._check_none_or_string(default_task_schedule_to_start_timeout)
self._check_none_or_string(default_task_start_to_close_timeout)
self._check_none_or_string(description)
# TODO: add defaultTaskPriority when boto gets to support it
activity_type = self.swf_backend.register_type(
"activity", domain, name, version, task_list=task_list,
default_task_heartbeat_timeout=default_task_heartbeat_timeout,
default_task_schedule_to_close_timeout=default_task_schedule_to_close_timeout,
default_task_schedule_to_start_timeout=default_task_schedule_to_start_timeout,
default_task_start_to_close_timeout=default_task_start_to_close_timeout,
description=description,
)
return ""
def deprecate_activity_type(self):
return self._deprecate_type("activity")
def describe_activity_type(self):
return self._describe_type("activity")
def list_workflow_types(self):
return self._list_types("workflow")
def register_workflow_type(self):
domain = self._params["domain"]
name = self._params["name"]
version = self._params["version"]
default_task_list = self._params.get("defaultTaskList")
if default_task_list:
task_list = default_task_list.get("name")
else:
task_list = None
default_child_policy = self._params.get("defaultChildPolicy")
default_task_start_to_close_timeout = self._params.get("defaultTaskStartToCloseTimeout")
default_execution_start_to_close_timeout = self._params.get("defaultExecutionStartToCloseTimeout")
description = self._params.get("description")
self._check_string(domain)
self._check_string(name)
self._check_string(version)
self._check_none_or_string(task_list)
self._check_none_or_string(default_child_policy)
self._check_none_or_string(default_task_start_to_close_timeout)
self._check_none_or_string(default_execution_start_to_close_timeout)
self._check_none_or_string(description)
# TODO: add defaultTaskPriority when boto gets to support it
# TODO: add defaultLambdaRole when boto gets to support it
workflow_type = self.swf_backend.register_type(
"workflow", domain, name, version, task_list=task_list,
default_child_policy=default_child_policy,
default_task_start_to_close_timeout=default_task_start_to_close_timeout,
default_execution_start_to_close_timeout=default_execution_start_to_close_timeout,
description=description,
)
return ""
def deprecate_workflow_type(self):
return self._deprecate_type("workflow")
def describe_workflow_type(self):
return self._describe_type("workflow")
def start_workflow_execution(self):
domain = self._params["domain"]
workflow_id = self._params["workflowId"]
_workflow_type = self._params["workflowType"]
workflow_name = _workflow_type["name"]
workflow_version = _workflow_type["version"]
_default_task_list = self._params.get("defaultTaskList")
if _default_task_list:
task_list = _default_task_list.get("name")
else:
task_list = None
child_policy = self._params.get("childPolicy")
execution_start_to_close_timeout = self._params.get("executionStartToCloseTimeout")
input_ = self._params.get("input")
tag_list = self._params.get("tagList")
task_start_to_close_timeout = self._params.get("taskStartToCloseTimeout")
self._check_string(domain)
self._check_string(workflow_id)
self._check_string(workflow_name)
self._check_string(workflow_version)
self._check_none_or_string(task_list)
self._check_none_or_string(child_policy)
self._check_none_or_string(execution_start_to_close_timeout)
self._check_none_or_string(input_)
self._check_none_or_list_of_strings(tag_list)
self._check_none_or_string(task_start_to_close_timeout)
wfe = self.swf_backend.start_workflow_execution(
domain, workflow_id, workflow_name, workflow_version,
task_list=task_list, child_policy=child_policy,
execution_start_to_close_timeout=execution_start_to_close_timeout,
input=input_, tag_list=tag_list,
task_start_to_close_timeout=task_start_to_close_timeout
)
return json.dumps({
"runId": wfe.run_id
})
def describe_workflow_execution(self):
domain_name = self._params["domain"]
_workflow_execution = self._params["execution"]
run_id = _workflow_execution["runId"]
workflow_id = _workflow_execution["workflowId"]
self._check_string(domain_name)
self._check_string(run_id)
self._check_string(workflow_id)
wfe = self.swf_backend.describe_workflow_execution(domain_name, run_id, workflow_id)
return json.dumps(wfe.to_full_dict())
def get_workflow_execution_history(self):
domain_name = self._params["domain"]
_workflow_execution = self._params["execution"]
run_id = _workflow_execution["runId"]
workflow_id = _workflow_execution["workflowId"]
reverse_order = self._params.get("reverseOrder", None)
wfe = self.swf_backend.describe_workflow_execution(domain_name, run_id, workflow_id)
events = wfe.events(reverse_order=reverse_order)
return json.dumps({
"events": [evt.to_dict() for evt in events]
})
def poll_for_decision_task(self):
domain_name = self._params["domain"]
task_list = self._params["taskList"]["name"]
identity = self._params.get("identity")
reverse_order = self._params.get("reverseOrder", None)
self._check_string(domain_name)
self._check_string(task_list)
decision = self.swf_backend.poll_for_decision_task(
domain_name, task_list, identity=identity
)
if decision:
return json.dumps(
decision.to_full_dict(reverse_order=reverse_order)
)
else:
return json.dumps({"previousStartedEventId": 0, "startedEventId": 0})
def count_pending_decision_tasks(self):
domain_name = self._params["domain"]
task_list = self._params["taskList"]["name"]
self._check_string(domain_name)
self._check_string(task_list)
count = self.swf_backend.count_pending_decision_tasks(domain_name, task_list)
return json.dumps({"count": count, "truncated": False})
def respond_decision_task_completed(self):
task_token = self._params["taskToken"]
execution_context = self._params.get("executionContext")
decisions = self._params.get("decisions")
self._check_string(task_token)
self._check_none_or_string(execution_context)
self.swf_backend.respond_decision_task_completed(
task_token, decisions=decisions, execution_context=execution_context
)
return ""
def poll_for_activity_task(self):
domain_name = self._params["domain"]
task_list = self._params["taskList"]["name"]
identity = self._params.get("identity")
self._check_string(domain_name)
self._check_string(task_list)
self._check_none_or_string(identity)
activity_task = self.swf_backend.poll_for_activity_task(
domain_name, task_list, identity=identity
)
if activity_task:
return json.dumps(
activity_task.to_full_dict()
)
else:
return json.dumps({"startedEventId": 0})
def count_pending_activity_tasks(self):
domain_name = self._params["domain"]
task_list = self._params["taskList"]["name"]
self._check_string(domain_name)
self._check_string(task_list)
count = self.swf_backend.count_pending_activity_tasks(domain_name, task_list)
return json.dumps({"count": count, "truncated": False})
def respond_activity_task_completed(self):
task_token = self._params["taskToken"]
result = self._params.get("result")
self._check_string(task_token)
self._check_none_or_string(result)
self.swf_backend.respond_activity_task_completed(
task_token, result=result
)
return ""
def respond_activity_task_failed(self):
task_token = self._params["taskToken"]
reason = self._params.get("reason")
details = self._params.get("details")
self._check_string(task_token)
# TODO: implement length limits on reason and details (common pb with client libs)
self._check_none_or_string(reason)
self._check_none_or_string(details)
self.swf_backend.respond_activity_task_failed(
task_token, reason=reason, details=details
)
return ""
def terminate_workflow_execution(self):
domain_name = self._params["domain"]
workflow_id = self._params["workflowId"]
child_policy = self._params.get("childPolicy")
details = self._params.get("details")
reason = self._params.get("reason")
run_id = self._params.get("runId")
self._check_string(domain_name)
self._check_string(workflow_id)
self._check_none_or_string(child_policy)
self._check_none_or_string(details)
self._check_none_or_string(reason)
self._check_none_or_string(run_id)
self.swf_backend.terminate_workflow_execution(
domain_name, workflow_id, child_policy=child_policy,
details=details, reason=reason, run_id=run_id
)
return ""
def record_activity_task_heartbeat(self):
task_token = self._params["taskToken"]
details = self._params.get("details")
self._check_string(task_token)
self._check_none_or_string(details)
self.swf_backend.record_activity_task_heartbeat(
task_token, details=details
)
# TODO: make it dynamic when we implement activity tasks cancellation
return json.dumps({"cancelRequested": False})

9
moto/swf/urls.py Normal file
View file

@ -0,0 +1,9 @@
from .responses import SWFResponse
url_bases = [
"https?://swf.(.+).amazonaws.com",
]
url_paths = {
'{0}/$': SWFResponse.dispatch,
}

3
moto/swf/utils.py Normal file
View file

@ -0,0 +1,3 @@
def decapitalize(key):
return key[0].lower() + key[1:]