diff --git a/moto/swf/models/activity_task.py b/moto/swf/models/activity_task.py index 635c371c..ccddb0ba 100644 --- a/moto/swf/models/activity_task.py +++ b/moto/swf/models/activity_task.py @@ -54,6 +54,7 @@ class ActivityTask(object): self.last_heartbeat_timestamp = now_timestamp() def has_timedout(self): + # TODO: handle the "NONE" case heartbeat_timeout_at = self.last_heartbeat_timestamp + \ int(self.timeouts["heartbeatTimeout"]) return heartbeat_timeout_at < now_timestamp() diff --git a/moto/swf/models/decision_task.py b/moto/swf/models/decision_task.py index 967c94fa..dfc8f568 100644 --- a/moto/swf/models/decision_task.py +++ b/moto/swf/models/decision_task.py @@ -2,6 +2,8 @@ from __future__ import unicode_literals from datetime import datetime import uuid +from ..utils import now_timestamp + class DecisionTask(object): def __init__(self, workflow_execution, scheduled_event_id): @@ -11,10 +13,13 @@ class DecisionTask(object): self.scheduled_event_id = scheduled_event_id self.previous_started_event_id = 0 self.started_event_id = None + self.started_timestamp = None + self.start_to_close_timeout = self.workflow_execution.task_start_to_close_timeout self.state = "SCHEDULED" # this is *not* necessarily coherent with workflow execution history, # but that shouldn't be a problem for tests self.scheduled_at = datetime.now() + self.timeout_type = None def to_full_dict(self, reverse_order=False): events = self.workflow_execution.events(reverse_order=reverse_order) @@ -33,7 +38,21 @@ class DecisionTask(object): def start(self, started_event_id): self.state = "STARTED" + self.started_timestamp = now_timestamp() self.started_event_id = started_event_id def complete(self): self.state = "COMPLETED" + + def has_timedout(self): + if self.state != "STARTED": + return False + # TODO: handle the "NONE" case + start_to_close_timeout = self.started_timestamp + \ + int(self.start_to_close_timeout) + return start_to_close_timeout < now_timestamp() + + def process_timeouts(self): + if self.has_timedout(): + self.state = "TIMED_OUT" + self.timeout_type = "START_TO_CLOSE" diff --git a/moto/swf/models/history_event.py b/moto/swf/models/history_event.py index 0b53f865..c4d1e61a 100644 --- a/moto/swf/models/history_event.py +++ b/moto/swf/models/history_event.py @@ -152,6 +152,12 @@ class HistoryEvent(object): if self.details: hsh["details"] = self.details return hsh + elif self.event_type == "DecisionTaskTimedOut": + return { + "scheduledEventId": self.scheduled_event_id, + "startedEventId": self.started_event_id, + "timeoutType": self.timeout_type, + } else: raise NotImplementedError( "HistoryEvent does not implement attributes for type '{0}'".format(self.event_type) diff --git a/moto/swf/models/workflow_execution.py b/moto/swf/models/workflow_execution.py index cdc356ab..0de75755 100644 --- a/moto/swf/models/workflow_execution.py +++ b/moto/swf/models/workflow_execution.py @@ -148,8 +148,21 @@ class WorkflowExecution(object): def _process_timeouts(self): self.should_schedule_decision_next = False + # TODO: process timeouts on workflow itself - # TODO: process timeouts on decision tasks + + # decision tasks timeouts + for task in self.decision_tasks: + if task.state == "STARTED" and task.has_timedout(): + self.should_schedule_decision_next = True + task.process_timeouts() + self._add_event( + "DecisionTaskTimedOut", + scheduled_event_id=task.scheduled_event_id, + started_event_id=task.started_event_id, + timeout_type=task.timeout_type, + ) + # activity tasks timeouts for task in self.activity_tasks: if task.open and task.has_timedout(): diff --git a/tests/test_swf/models/test_decision_task.py b/tests/test_swf/models/test_decision_task.py index f1156a19..64268b38 100644 --- a/tests/test_swf/models/test_decision_task.py +++ b/tests/test_swf/models/test_decision_task.py @@ -1,3 +1,4 @@ +from freezegun import freeze_time from sure import expect from moto.swf.models import DecisionTask @@ -29,3 +30,20 @@ def test_decision_task_full_dict_representation(): dt.start(1234) fd = dt.to_full_dict() fd["startedEventId"].should.equal(1234) + +def test_decision_task_has_timedout(): + wfe = make_workflow_execution() + wft = wfe.workflow_type + dt = DecisionTask(wfe, 123) + dt.has_timedout().should.equal(False) + + with freeze_time("2015-01-01 12:00:00"): + dt.start(1234) + dt.has_timedout().should.equal(False) + + # activity task timeout is 300s == 5mins + with freeze_time("2015-01-01 12:06:00"): + dt.has_timedout().should.equal(True) + + dt.complete() + dt.has_timedout().should.equal(False) diff --git a/tests/test_swf/responses/test_timeouts.py b/tests/test_swf/responses/test_timeouts.py index ca237779..97593d2c 100644 --- a/tests/test_swf/responses/test_timeouts.py +++ b/tests/test_swf/responses/test_timeouts.py @@ -32,3 +32,34 @@ def test_activity_task_heartbeat_timeout(): attrs["timeoutType"].should.equal("HEARTBEAT") resp["events"][-1]["eventType"].should.equal("DecisionTaskScheduled") + +# Decision Task Start to Close timeout +# Default value in workflow helpers: 5 mins +@mock_swf +def test_decision_task_start_to_close_timeout(): + pass + with freeze_time("2015-01-01 12:00:00"): + conn = setup_workflow() + conn.poll_for_decision_task("test-domain", "queue")["taskToken"] + + with freeze_time("2015-01-01 12:04:30"): + resp = conn.get_workflow_execution_history("test-domain", conn.run_id, "uid-abcd1234") + + event_types = [evt["eventType"] for evt in resp["events"]] + event_types.should.equal( + ["WorkflowExecutionStarted", "DecisionTaskScheduled", "DecisionTaskStarted"] + ) + + with freeze_time("2015-01-01 12:05:30"): + # => Decision Task Start to Close timeout reached!! + resp = conn.get_workflow_execution_history("test-domain", conn.run_id, "uid-abcd1234") + + event_types = [evt["eventType"] for evt in resp["events"]] + event_types.should.equal( + ["WorkflowExecutionStarted", "DecisionTaskScheduled", "DecisionTaskStarted", + "DecisionTaskTimedOut", "DecisionTaskScheduled"] + ) + attrs = resp["events"][-2]["decisionTaskTimedOutEventAttributes"] + attrs.should.equal({ + "scheduledEventId": 2, "startedEventId": 3, "timeoutType": "START_TO_CLOSE" + })