Add SWF endpoint PollForActivityTask
This commit is contained in:
parent
d650f71d9c
commit
761ab816f9
8 changed files with 196 additions and 37 deletions
|
|
@ -256,6 +256,37 @@ class SWFBackend(BaseBackend):
|
|||
decisions=decisions,
|
||||
execution_context=execution_context)
|
||||
|
||||
def poll_for_activity_task(self, domain_name, task_list, identity=None):
|
||||
self._check_string(domain_name)
|
||||
self._check_string(task_list)
|
||||
domain = self._get_domain(domain_name)
|
||||
# Real SWF cases:
|
||||
# - case 1: there's an activity task to return, return it
|
||||
# - case 2: there's no activity task to return, so wait for timeout
|
||||
# and if a new activity is scheduled, return it
|
||||
# - case 3: timeout reached, no activity task, return an empty response
|
||||
# (e.g. a response with an empty "taskToken")
|
||||
#
|
||||
# For the sake of simplicity, we forget case 2 for now, so either
|
||||
# there's an ActivityTask to return, either we return a blank one.
|
||||
#
|
||||
# SWF client libraries should cope with that easily as long as tests
|
||||
# aren't distributed.
|
||||
#
|
||||
# TODO: handle long polling (case 2) for activity tasks
|
||||
candidates = []
|
||||
for _task_list, tasks in domain.activity_task_lists.iteritems():
|
||||
if _task_list == task_list:
|
||||
candidates += filter(lambda t: t.state == "SCHEDULED", tasks)
|
||||
if any(candidates):
|
||||
# TODO: handle task priorities (but not supported by boto for now)
|
||||
task = min(candidates, key=lambda d: d.scheduled_at)
|
||||
wfe = task.workflow_execution
|
||||
wfe.start_activity_task(task.task_token, identity=identity)
|
||||
return task
|
||||
else:
|
||||
return None
|
||||
|
||||
|
||||
swf_backends = {}
|
||||
for region in boto.swf.regions():
|
||||
|
|
|
|||
|
|
@ -1,16 +1,34 @@
|
|||
from __future__ import unicode_literals
|
||||
from datetime import datetime
|
||||
import uuid
|
||||
|
||||
|
||||
class ActivityTask(object):
|
||||
def __init__(self, activity_id, activity_type, workflow_execution, input=None):
|
||||
def __init__(self, activity_id, activity_type, scheduled_event_id,
|
||||
workflow_execution, input=None):
|
||||
self.activity_id = activity_id
|
||||
self.activity_type = activity_type
|
||||
self.input = input
|
||||
self.scheduled_event_id = scheduled_event_id
|
||||
self.started_event_id = None
|
||||
self.state = "SCHEDULED"
|
||||
self.task_token = str(uuid.uuid4())
|
||||
self.workflow_execution = workflow_execution
|
||||
# this is *not* necessarily coherent with workflow execution history,
|
||||
# but that shouldn't be a problem for tests
|
||||
self.scheduled_at = datetime.now()
|
||||
|
||||
def to_full_dict(self):
|
||||
hsh = {
|
||||
"activityId": self.activity_id,
|
||||
"activityType": self.activity_type.to_short_dict(),
|
||||
"taskToken": self.task_token,
|
||||
"startedEventId": self.started_event_id,
|
||||
"workflowExecution": self.workflow_execution.to_short_dict(),
|
||||
}
|
||||
if self.input:
|
||||
hsh["input"] = self.input
|
||||
return hsh
|
||||
|
||||
def start(self, started_event_id):
|
||||
self.state = "STARTED"
|
||||
|
|
|
|||
|
|
@ -105,6 +105,14 @@ class HistoryEvent(object):
|
|||
"cause": self.cause,
|
||||
"decisionTaskCompletedEventId": self.decision_task_completed_event_id,
|
||||
}
|
||||
elif self.event_type == "ActivityTaskStarted":
|
||||
# TODO: merge it with DecisionTaskStarted
|
||||
hsh = {
|
||||
"scheduledEventId": self.scheduled_event_id
|
||||
}
|
||||
if hasattr(self, "identity") and self.identity:
|
||||
hsh["identity"] = self.identity
|
||||
return hsh
|
||||
else:
|
||||
raise NotImplementedError(
|
||||
"HistoryEvent does not implement attributes for type '{}'".format(self.event_type)
|
||||
|
|
|
|||
|
|
@ -396,19 +396,37 @@ class WorkflowExecution(object):
|
|||
"{}_UNDEFINED".format(error_key.upper()))
|
||||
return
|
||||
|
||||
task = ActivityTask(
|
||||
activity_id=attributes["activityId"],
|
||||
activity_type=activity_type,
|
||||
input=attributes.get("input"),
|
||||
workflow_execution=self,
|
||||
)
|
||||
# Only add event and increment counters if nothing went wrong
|
||||
self.domain.add_to_activity_task_list(task_list, task)
|
||||
self._add_event(
|
||||
# Only add event and increment counters now that nothing went wrong
|
||||
evt = self._add_event(
|
||||
"ActivityTaskScheduled",
|
||||
decision_task_completed_event_id=event_id,
|
||||
activity_type=activity_type,
|
||||
attributes=attributes,
|
||||
task_list=task_list,
|
||||
)
|
||||
task = ActivityTask(
|
||||
activity_id=attributes["activityId"],
|
||||
activity_type=activity_type,
|
||||
input=attributes.get("input"),
|
||||
scheduled_event_id=evt.event_id,
|
||||
workflow_execution=self,
|
||||
)
|
||||
self.domain.add_to_activity_task_list(task_list, task)
|
||||
self.open_counts["openActivityTasks"] += 1
|
||||
|
||||
def _find_activity_task(self, task_token):
|
||||
for task in self.activity_tasks:
|
||||
if task.task_token == task_token:
|
||||
return task
|
||||
raise ValueError(
|
||||
"No activity task with token: {}".format(task_token)
|
||||
)
|
||||
|
||||
def start_activity_task(self, task_token, identity=None):
|
||||
task = self._find_activity_task(task_token)
|
||||
evt = self._add_event(
|
||||
"ActivityTaskStarted",
|
||||
scheduled_event_id=task.scheduled_event_id,
|
||||
identity=identity
|
||||
)
|
||||
task.start(evt.event_id)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue