Refactor timeouts processing so it will be easier to compute them in order

This commit is contained in:
Jean-Baptiste Barth 2015-11-05 02:22:02 +01:00
commit d618585790
9 changed files with 101 additions and 52 deletions

View file

@ -20,6 +20,7 @@ from .decision_task import DecisionTask
from .domain import Domain
from .generic_type import GenericType
from .history_event import HistoryEvent
from .timeout import Timeout
from .workflow_type import WorkflowType
from .workflow_execution import WorkflowExecution

View file

@ -5,6 +5,8 @@ import uuid
from ..exceptions import SWFWorkflowExecutionClosedError
from ..utils import now_timestamp
from .timeout import Timeout
class ActivityTask(object):
def __init__(self, activity_id, activity_type, scheduled_event_id,
@ -60,19 +62,23 @@ class ActivityTask(object):
def reset_heartbeat_clock(self):
self.last_heartbeat_timestamp = now_timestamp()
def has_timedout(self):
def first_timeout(self):
if not self.workflow_execution.open:
return False
return None
# TODO: handle the "NONE" case
heartbeat_timeout_at = self.last_heartbeat_timestamp + \
int(self.timeouts["heartbeatTimeout"])
return heartbeat_timeout_at < now_timestamp()
_timeout = Timeout(self, heartbeat_timeout_at, "HEARTBEAT")
if _timeout.reached:
return _timeout
def process_timeouts(self):
if self.has_timedout():
self.timeout()
_timeout = self.first_timeout()
if _timeout:
self.timeout(_timeout)
def timeout(self):
def timeout(self, _timeout):
self._check_workflow_execution_open()
self.state = "TIMED_OUT"
self.timeout_type = "HEARTBEAT"
self.timeout_type = _timeout.kind

View file

@ -5,6 +5,8 @@ import uuid
from ..exceptions import SWFWorkflowExecutionClosedError
from ..utils import now_timestamp
from .timeout import Timeout
class DecisionTask(object):
def __init__(self, workflow_execution, scheduled_event_id):
@ -50,19 +52,21 @@ class DecisionTask(object):
self._check_workflow_execution_open()
self.state = "COMPLETED"
def has_timedout(self):
def first_timeout(self):
if self.state != "STARTED" or not self.workflow_execution.open:
return False
return None
# TODO: handle the "NONE" case
start_to_close_timeout = self.started_timestamp + \
int(self.start_to_close_timeout)
return start_to_close_timeout < now_timestamp()
start_to_close_at = self.started_timestamp + int(self.start_to_close_timeout)
_timeout = Timeout(self, start_to_close_at, "START_TO_CLOSE")
if _timeout.reached:
return _timeout
def process_timeouts(self):
if self.has_timedout():
self.timeout()
_timeout = self.first_timeout()
if _timeout:
self.timeout(_timeout)
def timeout(self):
def timeout(self, _timeout):
self._check_workflow_execution_open()
self.state = "TIMED_OUT"
self.timeout_type = "START_TO_CLOSE"
self.timeout_type = _timeout.kind

View file

@ -0,0 +1,12 @@
from ..utils import now_timestamp
class Timeout(object):
def __init__(self, obj, timestamp, kind):
self.obj = obj
self.timestamp = timestamp
self.kind = kind
@property
def reached(self):
return now_timestamp() >= self.timestamp

View file

@ -18,6 +18,7 @@ from .activity_task import ActivityTask
from .activity_type import ActivityType
from .decision_task import DecisionTask
from .history_event import HistoryEvent
from .timeout import Timeout
# TODO: extract decision related logic into a Decision class
@ -151,8 +152,9 @@ class WorkflowExecution(object):
self.should_schedule_decision_next = False
# workflow execution timeout
if self.has_timedout():
self.process_timeouts()
_timeout = self.first_timeout()
if _timeout:
self.execute_timeout(_timeout)
# TODO: process child policy on child workflows here or in process_timeouts()
self._add_event(
"WorkflowExecutionTimedOut",
@ -162,7 +164,7 @@ class WorkflowExecution(object):
# decision tasks timeouts
for task in self.decision_tasks:
if task.state == "STARTED" and task.has_timedout():
if task.state == "STARTED" and task.first_timeout():
self.should_schedule_decision_next = True
task.process_timeouts()
self._add_event(
@ -174,7 +176,7 @@ class WorkflowExecution(object):
# activity tasks timeouts
for task in self.activity_tasks:
if task.open and task.has_timedout():
if task.open and task.first_timeout():
self.should_schedule_decision_next = True
task.process_timeouts()
self._add_event(
@ -522,19 +524,23 @@ class WorkflowExecution(object):
self.close_status = "TERMINATED"
self.close_cause = "OPERATOR_INITIATED"
def has_timedout(self):
def first_timeout(self):
if not self.open or not self.start_timestamp:
return False
# TODO: handle the "NONE" case
start_to_close_timeout = self.start_timestamp + \
int(self.execution_start_to_close_timeout)
return start_to_close_timeout < now_timestamp()
return None
start_to_close_at = self.start_timestamp + int(self.execution_start_to_close_timeout)
_timeout = Timeout(self, start_to_close_at, "START_TO_CLOSE")
if _timeout.reached:
return _timeout
def execute_timeout(self, timeout):
self.execution_status = "CLOSED"
self.close_status = "TIMED_OUT"
self.timeout_type = timeout.kind
def process_timeouts(self):
if self.has_timedout():
self.execution_status = "CLOSED"
self.close_status = "TIMED_OUT"
self.timeout_type = "START_TO_CLOSE"
_timeout = self.first_timeout()
if _timeout:
self.execute_timeout(_timeout)
@property
def open(self):