From 761ab816f96b4709701b31a34ff08a736440b453 Mon Sep 17 00:00:00 2001 From: Jean-Baptiste Barth Date: Mon, 26 Oct 2015 23:16:59 +0100 Subject: [PATCH] Add SWF endpoint PollForActivityTask --- moto/swf/models/__init__.py | 31 ++++++++++++ moto/swf/models/activity_task.py | 20 +++++++- moto/swf/models/history_event.py | 8 +++ moto/swf/models/workflow_execution.py | 36 +++++++++---- moto/swf/responses.py | 14 ++++++ tests/test_swf/models/test_activity_task.py | 30 ++++++++++- .../models/test_workflow_execution.py | 50 +++++++++---------- .../test_swf/responses/test_activity_tasks.py | 46 +++++++++++++++++ 8 files changed, 197 insertions(+), 38 deletions(-) create mode 100644 tests/test_swf/responses/test_activity_tasks.py diff --git a/moto/swf/models/__init__.py b/moto/swf/models/__init__.py index 9fff96eb..c8cb7916 100644 --- a/moto/swf/models/__init__.py +++ b/moto/swf/models/__init__.py @@ -256,6 +256,37 @@ class SWFBackend(BaseBackend): decisions=decisions, execution_context=execution_context) + def poll_for_activity_task(self, domain_name, task_list, identity=None): + self._check_string(domain_name) + self._check_string(task_list) + domain = self._get_domain(domain_name) + # Real SWF cases: + # - case 1: there's an activity task to return, return it + # - case 2: there's no activity task to return, so wait for timeout + # and if a new activity is scheduled, return it + # - case 3: timeout reached, no activity task, return an empty response + # (e.g. a response with an empty "taskToken") + # + # For the sake of simplicity, we forget case 2 for now, so either + # there's an ActivityTask to return, either we return a blank one. + # + # SWF client libraries should cope with that easily as long as tests + # aren't distributed. + # + # TODO: handle long polling (case 2) for activity tasks + candidates = [] + for _task_list, tasks in domain.activity_task_lists.iteritems(): + if _task_list == task_list: + candidates += filter(lambda t: t.state == "SCHEDULED", tasks) + if any(candidates): + # TODO: handle task priorities (but not supported by boto for now) + task = min(candidates, key=lambda d: d.scheduled_at) + wfe = task.workflow_execution + wfe.start_activity_task(task.task_token, identity=identity) + return task + else: + return None + swf_backends = {} for region in boto.swf.regions(): diff --git a/moto/swf/models/activity_task.py b/moto/swf/models/activity_task.py index 298984a2..c7b68d9c 100644 --- a/moto/swf/models/activity_task.py +++ b/moto/swf/models/activity_task.py @@ -1,16 +1,34 @@ from __future__ import unicode_literals +from datetime import datetime import uuid class ActivityTask(object): - def __init__(self, activity_id, activity_type, workflow_execution, input=None): + def __init__(self, activity_id, activity_type, scheduled_event_id, + workflow_execution, input=None): self.activity_id = activity_id self.activity_type = activity_type self.input = input + self.scheduled_event_id = scheduled_event_id self.started_event_id = None self.state = "SCHEDULED" self.task_token = str(uuid.uuid4()) self.workflow_execution = workflow_execution + # this is *not* necessarily coherent with workflow execution history, + # but that shouldn't be a problem for tests + self.scheduled_at = datetime.now() + + def to_full_dict(self): + hsh = { + "activityId": self.activity_id, + "activityType": self.activity_type.to_short_dict(), + "taskToken": self.task_token, + "startedEventId": self.started_event_id, + "workflowExecution": self.workflow_execution.to_short_dict(), + } + if self.input: + hsh["input"] = self.input + return hsh def start(self, started_event_id): self.state = "STARTED" diff --git a/moto/swf/models/history_event.py b/moto/swf/models/history_event.py index eca00fb0..45a83903 100644 --- a/moto/swf/models/history_event.py +++ b/moto/swf/models/history_event.py @@ -105,6 +105,14 @@ class HistoryEvent(object): "cause": self.cause, "decisionTaskCompletedEventId": self.decision_task_completed_event_id, } + elif self.event_type == "ActivityTaskStarted": + # TODO: merge it with DecisionTaskStarted + hsh = { + "scheduledEventId": self.scheduled_event_id + } + if hasattr(self, "identity") and self.identity: + hsh["identity"] = self.identity + return hsh else: raise NotImplementedError( "HistoryEvent does not implement attributes for type '{}'".format(self.event_type) diff --git a/moto/swf/models/workflow_execution.py b/moto/swf/models/workflow_execution.py index 0b1984af..dfa7380f 100644 --- a/moto/swf/models/workflow_execution.py +++ b/moto/swf/models/workflow_execution.py @@ -396,19 +396,37 @@ class WorkflowExecution(object): "{}_UNDEFINED".format(error_key.upper())) return - task = ActivityTask( - activity_id=attributes["activityId"], - activity_type=activity_type, - input=attributes.get("input"), - workflow_execution=self, - ) - # Only add event and increment counters if nothing went wrong - self.domain.add_to_activity_task_list(task_list, task) - self._add_event( + # Only add event and increment counters now that nothing went wrong + evt = self._add_event( "ActivityTaskScheduled", decision_task_completed_event_id=event_id, activity_type=activity_type, attributes=attributes, task_list=task_list, ) + task = ActivityTask( + activity_id=attributes["activityId"], + activity_type=activity_type, + input=attributes.get("input"), + scheduled_event_id=evt.event_id, + workflow_execution=self, + ) + self.domain.add_to_activity_task_list(task_list, task) self.open_counts["openActivityTasks"] += 1 + + def _find_activity_task(self, task_token): + for task in self.activity_tasks: + if task.task_token == task_token: + return task + raise ValueError( + "No activity task with token: {}".format(task_token) + ) + + def start_activity_task(self, task_token, identity=None): + task = self._find_activity_task(task_token) + evt = self._add_event( + "ActivityTaskStarted", + scheduled_event_id=task.scheduled_event_id, + identity=identity + ) + task.start(evt.event_id) diff --git a/moto/swf/responses.py b/moto/swf/responses.py index 334bac21..66493aef 100644 --- a/moto/swf/responses.py +++ b/moto/swf/responses.py @@ -251,3 +251,17 @@ class SWFResponse(BaseResponse): task_token, decisions=decisions, execution_context=execution_context ) return "" + + def poll_for_activity_task(self): + domain_name = self._params["domain"] + task_list = self._params["taskList"]["name"] + identity = self._params.get("identity") + activity_task = self.swf_backend.poll_for_activity_task( + domain_name, task_list, identity=identity + ) + if activity_task: + return json.dumps( + activity_task.to_full_dict() + ) + else: + return json.dumps({"startedEventId": 0}) diff --git a/tests/test_swf/models/test_activity_task.py b/tests/test_swf/models/test_activity_task.py index 2e2bba2f..d691cc05 100644 --- a/tests/test_swf/models/test_activity_task.py +++ b/tests/test_swf/models/test_activity_task.py @@ -1,6 +1,9 @@ from sure import expect -from moto.swf.models import ActivityTask +from moto.swf.models import ( + ActivityTask, + ActivityType, +) from ..utils import make_workflow_execution @@ -11,6 +14,7 @@ def test_activity_task_creation(): activity_id="my-activity-123", activity_type="foo", input="optional", + scheduled_event_id=117, workflow_execution=wfe, ) task.workflow_execution.should.equal(wfe) @@ -24,3 +28,27 @@ def test_activity_task_creation(): task.complete() task.state.should.equal("COMPLETED") + +def test_activity_task_full_dict_representation(): + wfe = make_workflow_execution() + wft = wfe.workflow_type + at = ActivityTask( + activity_id="my-activity-123", + activity_type=ActivityType("foo", "v1.0"), + input="optional", + scheduled_event_id=117, + workflow_execution=wfe, + ) + at.start(1234) + + fd = at.to_full_dict() + fd["activityId"].should.equal("my-activity-123") + fd["activityType"]["version"].should.equal("v1.0") + fd["input"].should.equal("optional") + fd["startedEventId"].should.equal(1234) + fd.should.contain("taskToken") + fd["workflowExecution"].should.equal(wfe.to_short_dict()) + + at.start(1234) + fd = at.to_full_dict() + fd["startedEventId"].should.equal(1234) diff --git a/tests/test_swf/models/test_workflow_execution.py b/tests/test_swf/models/test_workflow_execution.py index c6660ab9..edacade8 100644 --- a/tests/test_swf/models/test_workflow_execution.py +++ b/tests/test_swf/models/test_workflow_execution.py @@ -17,6 +17,16 @@ from ..utils import ( ) +VALID_ACTIVITY_TASK_ATTRIBUTES = { + "activityId": "my-activity-001", + "activityType": { "name": "test-activity", "version": "v1.1" }, + "taskList": { "name": "task-list-name" }, + "scheduleToStartTimeout": "600", + "scheduleToCloseTimeout": "600", + "startToCloseTimeout": "600", + "heartbeatTimeout": "300", +} + def test_workflow_execution_creation(): domain = get_basic_domain() wft = get_basic_workflow_type() @@ -187,15 +197,7 @@ def test_workflow_execution_fail(): def test_workflow_execution_schedule_activity_task(): wfe = make_workflow_execution() - wfe.schedule_activity_task(123, { - "activityId": "my-activity-001", - "activityType": { "name": "test-activity", "version": "v1.1" }, - "taskList": { "name": "task-list-name" }, - "scheduleToStartTimeout": "600", - "scheduleToCloseTimeout": "600", - "startToCloseTimeout": "600", - "heartbeatTimeout": "300", - }) + wfe.schedule_activity_task(123, VALID_ACTIVITY_TASK_ATTRIBUTES) wfe.open_counts["openActivityTasks"].should.equal(1) last_event = wfe.events()[-1] @@ -330,29 +332,23 @@ def test_workflow_execution_schedule_activity_task_failure_triggers_new_decision def test_workflow_execution_schedule_activity_task_with_same_activity_id(): wfe = make_workflow_execution() - wfe.schedule_activity_task(123, { - "activityId": "my-activity-001", - "activityType": { "name": "test-activity", "version": "v1.1" }, - "taskList": { "name": "task-list-name" }, - "scheduleToStartTimeout": "600", - "scheduleToCloseTimeout": "600", - "startToCloseTimeout": "600", - "heartbeatTimeout": "300", - }) + wfe.schedule_activity_task(123, VALID_ACTIVITY_TASK_ATTRIBUTES) wfe.open_counts["openActivityTasks"].should.equal(1) last_event = wfe.events()[-1] last_event.event_type.should.equal("ActivityTaskScheduled") - wfe.schedule_activity_task(123, { - "activityId": "my-activity-001", - "activityType": { "name": "test-activity", "version": "v1.1" }, - "taskList": { "name": "task-list-name" }, - "scheduleToStartTimeout": "600", - "scheduleToCloseTimeout": "600", - "startToCloseTimeout": "600", - "heartbeatTimeout": "300", - }) + wfe.schedule_activity_task(123, VALID_ACTIVITY_TASK_ATTRIBUTES) wfe.open_counts["openActivityTasks"].should.equal(1) last_event = wfe.events()[-1] last_event.event_type.should.equal("ScheduleActivityTaskFailed") last_event.cause.should.equal("ACTIVITY_ID_ALREADY_IN_USE") + +def test_workflow_execution_start_activity_task(): + wfe = make_workflow_execution() + wfe.schedule_activity_task(123, VALID_ACTIVITY_TASK_ATTRIBUTES) + task_token = wfe.activity_tasks[-1].task_token + wfe.start_activity_task(task_token, identity="worker01") + task = wfe.activity_tasks[-1] + task.state.should.equal("STARTED") + wfe.events()[-1].event_type.should.equal("ActivityTaskStarted") + wfe.events()[-1].identity.should.equal("worker01") diff --git a/tests/test_swf/responses/test_activity_tasks.py b/tests/test_swf/responses/test_activity_tasks.py new file mode 100644 index 00000000..ea95c556 --- /dev/null +++ b/tests/test_swf/responses/test_activity_tasks.py @@ -0,0 +1,46 @@ +import boto +from sure import expect + +from moto import mock_swf +from moto.swf.exceptions import SWFUnknownResourceFault + +from ..utils import setup_workflow + + +# PollForActivityTask endpoint +@mock_swf +def test_poll_for_activity_task_when_one(): + conn = setup_workflow() + decision_token = conn.poll_for_decision_task("test-domain", "queue")["taskToken"] + conn.respond_decision_task_completed(decision_token, decisions=[ + { + "decisionType": "ScheduleActivityTask", + "scheduleActivityTaskDecisionAttributes": { + "activityId": "my-activity-001", + "activityType": { "name": "test-activity", "version": "v1.1" }, + "taskList": { "name": "activity-task-list" }, + } + } + ]) + resp = conn.poll_for_activity_task("test-domain", "activity-task-list", identity="surprise") + resp["activityId"].should.equal("my-activity-001") + resp["taskToken"].should_not.be.none + + resp = conn.get_workflow_execution_history("test-domain", conn.run_id, "uid-abcd1234") + resp["events"][-1]["eventType"].should.equal("ActivityTaskStarted") + resp["events"][-1]["activityTaskStartedEventAttributes"].should.equal( + { "identity": "surprise", "scheduledEventId": 5 } + ) + +@mock_swf +def test_poll_for_activity_task_when_none(): + conn = setup_workflow() + resp = conn.poll_for_activity_task("test-domain", "activity-task-list") + resp.should.equal({"startedEventId": 0}) + +@mock_swf +def test_poll_for_activity_task_on_non_existent_queue(): + conn = setup_workflow() + resp = conn.poll_for_activity_task("test-domain", "non-existent-queue") + resp.should.equal({"startedEventId": 0}) +