Add SWF endpoint: DescribeWorkflowExecution

This commit is contained in:
Jean-Baptiste Barth 2015-10-02 17:42:28 +02:00
commit 2878252816
4 changed files with 153 additions and 1 deletions

View file

@ -84,6 +84,17 @@ class Domain(object):
raise SWFWorkflowExecutionAlreadyStartedFault()
self.workflow_executions[_id] = workflow_execution
def get_workflow_execution(self, run_id, workflow_id):
wfe = self.workflow_executions.get(workflow_id)
if not wfe or wfe.run_id != run_id:
raise SWFUnknownResourceFault(
"execution",
"WorkflowExecution=[workflowId={}, runId={}]".format(
workflow_id, run_id
)
)
return wfe
class GenericType(object):
def __init__(self, name, version, **kwargs):
@ -177,12 +188,68 @@ class WorkflowExecution(object):
self.workflow_type = workflow_type
self.workflow_id = workflow_id
self.run_id = uuid.uuid4().hex
self.execution_status = "OPEN"
self.cancel_requested = False
#config
for key, value in kwargs.iteritems():
self.__setattr__(key, value)
#counters
self.open_counts = {
"openTimers": 0,
"openDecisionTasks": 0,
"openActivityTasks": 0,
"openChildWorkflowExecutions": 0,
}
def __repr__(self):
return "WorkflowExecution(run_id: {})".format(self.run_id)
@property
def _configuration_keys(self):
return [
"executionStartToCloseTimeout",
"childPolicy",
"taskPriority",
"taskStartToCloseTimeout",
]
def to_short_dict(self):
return {
"workflowId": self.workflow_id,
"runId": self.run_id
}
def to_medium_dict(self):
hsh = {
"execution": self.to_short_dict(),
"workflowType": self.workflow_type.to_short_dict(),
"startTimestamp": 1420066800.123,
"executionStatus": self.execution_status,
"cancelRequested": self.cancel_requested,
}
if hasattr(self, "tag_list"):
hsh["tagList"] = self.tag_list
return hsh
def to_full_dict(self):
hsh = {
"executionInfo": self.to_medium_dict(),
"executionConfiguration": {}
}
#configuration
if hasattr(self, "task_list"):
hsh["executionConfiguration"]["taskList"] = {"name": self.task_list}
for key in self._configuration_keys:
attr = camelcase_to_underscores(key)
if not hasattr(self, attr):
continue
if not getattr(self, attr):
continue
hsh["executionConfiguration"][key] = getattr(self, attr)
#counters
hsh["openCounts"] = self.open_counts
return hsh
class SWFBackend(BaseBackend):
def __init__(self, region_name):
@ -318,6 +385,13 @@ class SWFBackend(BaseBackend):
return wfe
def describe_workflow_execution(self, domain_name, run_id, workflow_id):
self._check_string(domain_name)
self._check_string(run_id)
self._check_string(workflow_id)
domain = self._get_domain(domain_name)
return domain.get_workflow_execution(run_id, workflow_id)
swf_backends = {}
for region in boto.swf.regions():

View file

@ -139,7 +139,6 @@ class SWFResponse(BaseResponse):
def describe_activity_type(self):
return self._describe_type("activity")
# TODO: refactor with list_activity_types()
def list_workflow_types(self):
return self._list_types("workflow")
@ -201,3 +200,12 @@ class SWFResponse(BaseResponse):
return json.dumps({
"runId": wfe.run_id
})
def describe_workflow_execution(self):
domain_name = self._params["domain"]
_workflow_execution = self._params["execution"]
run_id = _workflow_execution["runId"]
workflow_id = _workflow_execution["workflowId"]
wfe = self.swf_backend.describe_workflow_execution(domain_name, run_id, workflow_id)
return json.dumps(wfe.to_full_dict())