improved SWF support

This commit is contained in:
George Ionita 2017-12-23 05:45:05 +02:00
commit 3cede60f5b
8 changed files with 66 additions and 6 deletions

View file

@ -21,7 +21,7 @@ from .history_event import HistoryEvent # flake8: noqa
from .timeout import Timeout # flake8: noqa
from .workflow_type import WorkflowType # flake8: noqa
from .workflow_execution import WorkflowExecution # flake8: noqa
from time import sleep
KNOWN_SWF_TYPES = {
"activity": ActivityType,
@ -198,6 +198,9 @@ class SWFBackend(BaseBackend):
wfe.start_decision_task(task.task_token, identity=identity)
return task
else:
# Sleeping here will prevent clients that rely on the timeout from
# entering in a busy waiting loop.
sleep(1)
return None
def count_pending_decision_tasks(self, domain_name, task_list):
@ -293,6 +296,9 @@ class SWFBackend(BaseBackend):
wfe.start_activity_task(task.task_token, identity=identity)
return task
else:
# Sleeping here will prevent clients that rely on the timeout from
# entering in a busy waiting loop.
sleep(1)
return None
def count_pending_activity_tasks(self, domain_name, task_list):
@ -379,6 +385,14 @@ class SWFBackend(BaseBackend):
if details:
activity_task.details = details
def signal_workflow_execution(self, domain_name, signal_name, workflow_id, input=None, run_id=None):
# process timeouts on all objects
self._process_timeouts()
domain = self._get_domain(domain_name)
wfe = domain.get_workflow_execution(
workflow_id, run_id=run_id, raise_if_closed=True)
wfe.signal(signal_name, input)
swf_backends = {}
for region in boto.swf.regions():

View file

@ -25,6 +25,7 @@ SUPPORTED_HISTORY_EVENT_TYPES = (
"ActivityTaskTimedOut",
"DecisionTaskTimedOut",
"WorkflowExecutionTimedOut",
"WorkflowExecutionSignaled"
)

View file

@ -599,6 +599,14 @@ class WorkflowExecution(BaseModel):
self.close_status = "TERMINATED"
self.close_cause = "OPERATOR_INITIATED"
def signal(self, signal_name, input):
self._add_event(
"WorkflowExecutionSignaled",
signal_name=signal_name,
input=input,
)
self.schedule_decision_task()
def first_timeout(self):
if not self.open or not self.start_timestamp:
return None

View file

@ -326,9 +326,9 @@ class SWFResponse(BaseResponse):
_workflow_type = self._params["workflowType"]
workflow_name = _workflow_type["name"]
workflow_version = _workflow_type["version"]
_default_task_list = self._params.get("defaultTaskList")
if _default_task_list:
task_list = _default_task_list.get("name")
_task_list = self._params.get("taskList")
if _task_list:
task_list = _task_list.get("name")
else:
task_list = None
child_policy = self._params.get("childPolicy")
@ -507,3 +507,20 @@ class SWFResponse(BaseResponse):
)
# TODO: make it dynamic when we implement activity tasks cancellation
return json.dumps({"cancelRequested": False})
def signal_workflow_execution(self):
domain_name = self._params["domain"]
signal_name = self._params["signalName"]
workflow_id = self._params["workflowId"]
_input = self._params["input"]
run_id = self._params["runId"]
self._check_string(domain_name)
self._check_string(signal_name)
self._check_string(workflow_id)
self._check_none_or_string(_input)
self._check_none_or_string(run_id)
self.swf_backend.signal_workflow_execution(
domain_name, signal_name, workflow_id, _input, run_id)
return ""