Add SWF endpoint GetWorkflowExecutionHistory and associated HistoryEvent model

This commit is contained in:
Jean-Baptiste Barth 2015-10-04 23:37:50 +02:00
commit 464aef293c
8 changed files with 173 additions and 11 deletions

View file

@ -15,6 +15,7 @@ from ..exceptions import (
from .activity_type import ActivityType
from .domain import Domain
from .generic_type import GenericType
from .history_event import HistoryEvent
from .workflow_type import WorkflowType
from .workflow_execution import WorkflowExecution
@ -150,6 +151,7 @@ class SWFBackend(BaseBackend):
wfe = WorkflowExecution(wf_type, workflow_id,
tag_list=tag_list, **kwargs)
domain.add_workflow_execution(wfe)
wfe.start()
return wfe

View file

@ -0,0 +1,59 @@
from __future__ import unicode_literals
from datetime import datetime
from time import mktime
class HistoryEvent(object):
def __init__(self, event_id, event_type, **kwargs):
self.event_id = event_id
self.event_type = event_type
self.event_timestamp = float(mktime(datetime.now().timetuple()))
for key, value in kwargs.iteritems():
self.__setattr__(key, value)
# break soon if attributes are not valid
self.event_attributes()
def to_dict(self):
return {
"eventId": self.event_id,
"eventType": self.event_type,
"eventTimestamp": self.event_timestamp,
self._attributes_key(): self.event_attributes()
}
def _attributes_key(self):
key = "{}EventAttributes".format(self.event_type)
key = key[0].lower() + key[1:]
return key
def event_attributes(self):
if self.event_type == "WorkflowExecutionStarted":
wfe = self.workflow_execution
hsh = {
"childPolicy": wfe.child_policy,
"executionStartToCloseTimeout": wfe.execution_start_to_close_timeout,
"parentInitiatedEventId": 0,
"taskList": {
"name": wfe.task_list
},
"taskStartToCloseTimeout": wfe.task_start_to_close_timeout,
"workflowType": {
"name": wfe.workflow_type.name,
"version": wfe.workflow_type.version
}
}
return hsh
elif self.event_type == "DecisionTaskScheduled":
wfe = self.workflow_execution
return {
"startToCloseTimeout": wfe.task_start_to_close_timeout,
"taskList": {"name": wfe.task_list}
}
elif self.event_type == "DecisionTaskStarted":
return {
"scheduledEventId": self.scheduled_event_id
}
else:
raise NotImplementedError(
"HistoryEvent does not implement attributes for type '{}'".format(self.event_type)
)

View file

@ -4,6 +4,7 @@ import uuid
from moto.core.utils import camelcase_to_underscores
from ..exceptions import SWFDefaultUndefinedFault
from .history_event import HistoryEvent
class WorkflowExecution(object):
@ -29,6 +30,8 @@ class WorkflowExecution(object):
"openActivityTasks": 0,
"openChildWorkflowExecutions": 0,
}
# events
self.events = []
def __repr__(self):
return "WorkflowExecution(run_id: {})".format(self.run_id)
@ -88,3 +91,21 @@ class WorkflowExecution(object):
#counters
hsh["openCounts"] = self.open_counts
return hsh
def next_event_id(self):
event_ids = [evt.event_id for evt in self.events]
return max(event_ids or [0])
def _add_event(self, *args, **kwargs):
evt = HistoryEvent(self.next_event_id(), *args, **kwargs)
self.events.append(evt)
def start(self):
self._add_event(
"WorkflowExecutionStarted",
workflow_execution=self,
)
self._add_event(
"DecisionTaskScheduled",
workflow_execution=self,
)