Implement heartbeat timeout on SWF activity tasks

This commit is contained in:
Jean-Baptiste Barth 2015-11-03 00:28:13 +01:00
commit 90c8797abd
8 changed files with 178 additions and 14 deletions

View file

@ -61,6 +61,11 @@ class SWFBackend(BaseBackend):
if not isinstance(i, basestring):
raise SWFSerializationException(parameter)
def _process_timeouts(self):
    """
    Run timeout processing on every workflow execution of every domain
    held by this backend.
    """
    # Lazily walk domains, then each domain's workflow executions, in order
    all_executions = (
        execution
        for domain in self.domains
        for execution in domain.workflow_executions
    )
    for execution in all_executions:
        execution._process_timeouts()
def list_domains(self, status, reverse_order=None):
self._check_string(status)
domains = [domain for domain in self.domains
@ -159,12 +164,16 @@ class SWFBackend(BaseBackend):
self._check_string(domain_name)
self._check_string(run_id)
self._check_string(workflow_id)
# process timeouts on all objects
self._process_timeouts()
domain = self._get_domain(domain_name)
return domain.get_workflow_execution(workflow_id, run_id=run_id)
def poll_for_decision_task(self, domain_name, task_list, identity=None):
self._check_string(domain_name)
self._check_string(task_list)
# process timeouts on all objects
self._process_timeouts()
domain = self._get_domain(domain_name)
# Real SWF cases:
# - case 1: there's a decision task to return, return it
@ -196,6 +205,8 @@ class SWFBackend(BaseBackend):
def count_pending_decision_tasks(self, domain_name, task_list):
self._check_string(domain_name)
self._check_string(task_list)
# process timeouts on all objects
self._process_timeouts()
domain = self._get_domain(domain_name)
count = 0
for wfe in domain.workflow_executions:
@ -208,6 +219,8 @@ class SWFBackend(BaseBackend):
execution_context=None):
self._check_string(task_token)
self._check_none_or_string(execution_context)
# process timeouts on all objects
self._process_timeouts()
# let's find decision task
decision_task = None
for domain in self.domains:
@ -259,6 +272,8 @@ class SWFBackend(BaseBackend):
def poll_for_activity_task(self, domain_name, task_list, identity=None):
self._check_string(domain_name)
self._check_string(task_list)
# process timeouts on all objects
self._process_timeouts()
domain = self._get_domain(domain_name)
# Real SWF cases:
# - case 1: there's an activity task to return, return it
@ -290,6 +305,8 @@ class SWFBackend(BaseBackend):
def count_pending_activity_tasks(self, domain_name, task_list):
self._check_string(domain_name)
self._check_string(task_list)
# process timeouts on all objects
self._process_timeouts()
domain = self._get_domain(domain_name)
count = 0
for _task_list, tasks in domain.activity_task_lists.iteritems():
@ -339,6 +356,8 @@ class SWFBackend(BaseBackend):
def respond_activity_task_completed(self, task_token, result=None):
self._check_string(task_token)
self._check_none_or_string(result)
# process timeouts on all objects
self._process_timeouts()
activity_task = self._find_activity_task_from_token(task_token)
wfe = activity_task.workflow_execution
wfe.complete_activity_task(activity_task.task_token, result=result)
@ -348,6 +367,8 @@ class SWFBackend(BaseBackend):
# TODO: implement length limits on reason and details (common pb with client libs)
self._check_none_or_string(reason)
self._check_none_or_string(details)
# process timeouts on all objects
self._process_timeouts()
activity_task = self._find_activity_task_from_token(task_token)
wfe = activity_task.workflow_execution
wfe.fail_activity_task(activity_task.task_token, reason=reason, details=details)
@ -360,6 +381,8 @@ class SWFBackend(BaseBackend):
self._check_none_or_string(details)
self._check_none_or_string(reason)
self._check_none_or_string(run_id)
# process timeouts on all objects
self._process_timeouts()
domain = self._get_domain(domain_name)
wfe = domain.get_workflow_execution(workflow_id, run_id=run_id, raise_if_closed=True)
wfe.terminate(child_policy=child_policy, details=details, reason=reason)
@ -367,8 +390,12 @@ class SWFBackend(BaseBackend):
def record_activity_task_heartbeat(self, task_token, details=None):
    """
    Record a heartbeat for the activity task identified by ``task_token``.

    Validates both arguments, processes pending timeouts, then resets the
    task's heartbeat clock. ``details`` replaces the task's stored details
    only when it is truthy (``None`` or an empty string leave the previous
    value untouched).
    """
    self._check_string(task_token)
    self._check_none_or_string(details)
    # process timeouts on all objects
    self._process_timeouts()
    task = self._find_activity_task_from_token(task_token)
    task.reset_heartbeat_clock()
    if not details:
        # nothing new to store: keep whatever details were recorded before
        return
    task.details = details
swf_backends = {}

View file

@ -7,20 +7,27 @@ from ..utils import now_timestamp
class ActivityTask(object):
def __init__(self, activity_id, activity_type, scheduled_event_id,
             workflow_execution, timeouts, input=None):
    """
    Create an activity task in the "SCHEDULED" state.

    :param activity_id: identifier stored on the task as ``activity_id``
    :param activity_type: stored on the task as ``activity_type``
    :param scheduled_event_id: id of the history event that scheduled
        this task
    :param workflow_execution: the workflow execution owning this task
    :param timeouts: mapping of timeout values; ``has_timedout()`` reads
        its "heartbeatTimeout" entry
    :param input: optional input stored on the task
    """
    self.activity_id = activity_id
    self.activity_type = activity_type
    self.details = None
    self.input = input
    # the heartbeat clock starts ticking as soon as the task is created
    self.last_heartbeat_timestamp = now_timestamp()
    self.scheduled_event_id = scheduled_event_id
    self.started_event_id = None
    self.state = "SCHEDULED"
    self.task_token = str(uuid.uuid4())
    self.timeouts = timeouts
    # set once the task times out (see process_timeouts)
    self.timeout_type = None
    self.workflow_execution = workflow_execution
    # this is *not* necessarily coherent with workflow execution history,
    # but that shouldn't be a problem for tests
    self.scheduled_at = datetime.now()
@property
def open(self):
    """Whether the task is still in one of the "SCHEDULED" / "STARTED" states."""
    open_states = ("SCHEDULED", "STARTED")
    return self.state in open_states
def to_full_dict(self):
hsh = {
"activityId": self.activity_id,
@ -45,3 +52,13 @@ class ActivityTask(object):
def reset_heartbeat_clock(self):
    """Restart the heartbeat clock by stamping the task with the current timestamp."""
    current_timestamp = now_timestamp()
    self.last_heartbeat_timestamp = current_timestamp
def has_timedout(self):
    """
    Tell whether the heartbeat deadline of this task has passed.

    The deadline is ``last_heartbeat_timestamp`` plus the task's
    "heartbeatTimeout" (presumably seconds, like ``now_timestamp()`` --
    confirm against ``..utils``). SWF accepts the literal string "NONE"
    for this timeout, meaning unlimited duration; such a task never times
    out on heartbeat.

    :return: True when the deadline is strictly earlier than now,
        False otherwise (including when the timeout is "NONE")
    """
    heartbeat_timeout = self.timeouts["heartbeatTimeout"]
    # "NONE" is not a number: int("NONE") would raise ValueError
    if heartbeat_timeout == "NONE":
        return False
    heartbeat_timeout_at = self.last_heartbeat_timestamp + \
        int(heartbeat_timeout)
    return heartbeat_timeout_at < now_timestamp()
def process_timeouts(self):
    """
    Mark the task as timed out on heartbeat when ``has_timedout()`` says
    so; otherwise leave it untouched.
    """
    if not self.has_timedout():
        return
    self.state = "TIMED_OUT"
    self.timeout_type = "HEARTBEAT"

View file

@ -143,6 +143,15 @@ class HistoryEvent(object):
if self.reason:
hsh["reason"] = self.reason
return hsh
elif self.event_type == "ActivityTaskTimedOut":
hsh = {
"scheduledEventId": self.scheduled_event_id,
"startedEventId": self.started_event_id,
"timeoutType": self.timeout_type,
}
if self.details:
hsh["details"] = self.details
return hsh
else:
raise NotImplementedError(
"HistoryEvent does not implement attributes for type '{}'".format(self.event_type)

View file

@ -146,6 +146,26 @@ class WorkflowExecution(object):
hsh["latestActivityTaskTimestamp"] = self.latest_activity_task_timestamp
return hsh
def _process_timeouts(self):
    """
    Time out every open activity task whose heartbeat deadline has passed.

    Each such task is updated through its own ``process_timeouts()`` and an
    "ActivityTaskTimedOut" event is added for it. If at least one task
    timed out, a decision task is scheduled once, after the loop.
    ``should_schedule_decision_next`` is reset on entry and left reflecting
    whether a timeout occurred.
    """
    # TODO: process timeouts on workflow itself
    # TODO: process timeouts on decision tasks
    self.should_schedule_decision_next = False

    # activity tasks timeouts
    # NOTE(review): scheduling an activity task increments
    # open_counts["openActivityTasks"]; nothing here decrements it when a
    # task times out -- confirm whether that is intended.
    for activity_task in self.activity_tasks:
        if not activity_task.open:
            continue
        if not activity_task.has_timedout():
            continue
        self.should_schedule_decision_next = True
        activity_task.process_timeouts()
        event_attributes = dict(
            details=activity_task.details,
            scheduled_event_id=activity_task.scheduled_event_id,
            started_event_id=activity_task.started_event_id,
            timeout_type=activity_task.timeout_type,
        )
        self._add_event("ActivityTaskTimedOut", **event_attributes)

    # schedule decision task if needed
    if not self.should_schedule_decision_next:
        return
    self.schedule_decision_task()
def events(self, reverse_order=False):
if reverse_order:
return reversed(self._events)
@ -416,6 +436,7 @@ class WorkflowExecution(object):
input=attributes.get("input"),
scheduled_event_id=evt.event_id,
workflow_execution=self,
timeouts=timeouts,
)
self.domain.add_to_activity_task_list(task_list, task)
self.open_counts["openActivityTasks"] += 1