Add basic get_execution_history implementation for Step Functions (#3507)

* Add format command to makefile

* Refactor executions to be a attribute of StateMachine

* Begin to add tests for execution history

* Add tests for failed and successful event histories, with implementations

* Add failure case to environment var check

* Skip test if in server mode and update implementation coverage

* Add conditional import for mock to cover python 2

* Refactor stop execution logic into StateMachine

* Refactor event history environment variable into settings.py

* Remove typing and os import
This commit is contained in:
Ciaran Evans 2020-12-03 18:32:06 +00:00 committed by GitHub
commit 48df5bd5af
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 383 additions and 46 deletions

View file

@ -1,6 +1,7 @@
import json
import re
from datetime import datetime
from dateutil.tz import tzlocal
from boto3 import Session
@ -17,6 +18,7 @@ from .exceptions import (
StateMachineDoesNotExist,
)
from .utils import paginate, api_to_cfn_tags, cfn_to_api_tags
from moto import settings
class StateMachine(CloudFormationModel):
@ -27,10 +29,51 @@ class StateMachine(CloudFormationModel):
self.name = name
self.definition = definition
self.roleArn = roleArn
self.executions = []
self.tags = []
if tags:
self.add_tags(tags)
def start_execution(self, region_name, account_id, execution_name, execution_input):
self._ensure_execution_name_doesnt_exist(execution_name)
self._validate_execution_input(execution_input)
execution = Execution(
region_name=region_name,
account_id=account_id,
state_machine_name=self.name,
execution_name=execution_name,
state_machine_arn=self.arn,
execution_input=execution_input,
)
self.executions.append(execution)
return execution
def stop_execution(self, execution_arn):
execution = next(
(x for x in self.executions if x.execution_arn == execution_arn), None
)
if not execution:
raise ExecutionDoesNotExist(
"Execution Does Not Exist: '" + execution_arn + "'"
)
execution.stop()
return execution
def _ensure_execution_name_doesnt_exist(self, name):
for execution in self.executions:
if execution.name == name:
raise ExecutionAlreadyExists(
"Execution Already Exists: '" + execution.execution_arn + "'"
)
def _validate_execution_input(self, execution_input):
try:
json.loads(execution_input)
except Exception as ex:
raise InvalidExecutionInput(
"Invalid State Machine Execution Input: '" + str(ex) + "'"
)
def update(self, **kwargs):
for key, value in kwargs.items():
if value is not None:
@ -176,6 +219,104 @@ class Execution:
self.status = "RUNNING"
self.stop_date = None
def get_execution_history(self, roleArn):
sf_execution_history_type = settings.get_sf_execution_history_type()
if sf_execution_history_type == "SUCCESS":
return [
{
"timestamp": iso_8601_datetime_with_milliseconds(
datetime(2020, 1, 1, 0, 0, 0, tzinfo=tzlocal())
),
"type": "ExecutionStarted",
"id": 1,
"previousEventId": 0,
"executionStartedEventDetails": {
"input": "{}",
"inputDetails": {"truncated": False},
"roleArn": roleArn,
},
},
{
"timestamp": iso_8601_datetime_with_milliseconds(
datetime(2020, 1, 1, 0, 0, 10, tzinfo=tzlocal())
),
"type": "PassStateEntered",
"id": 2,
"previousEventId": 0,
"stateEnteredEventDetails": {
"name": "A State",
"input": "{}",
"inputDetails": {"truncated": False},
},
},
{
"timestamp": iso_8601_datetime_with_milliseconds(
datetime(2020, 1, 1, 0, 0, 10, tzinfo=tzlocal())
),
"type": "PassStateExited",
"id": 3,
"previousEventId": 2,
"stateExitedEventDetails": {
"name": "A State",
"output": "An output",
"outputDetails": {"truncated": False},
},
},
{
"timestamp": iso_8601_datetime_with_milliseconds(
datetime(2020, 1, 1, 0, 0, 20, tzinfo=tzlocal())
),
"type": "ExecutionSucceeded",
"id": 4,
"previousEventId": 3,
"executionSucceededEventDetails": {
"output": "An output",
"outputDetails": {"truncated": False},
},
},
]
elif sf_execution_history_type == "FAILURE":
return [
{
"timestamp": iso_8601_datetime_with_milliseconds(
datetime(2020, 1, 1, 0, 0, 0, tzinfo=tzlocal())
),
"type": "ExecutionStarted",
"id": 1,
"previousEventId": 0,
"executionStartedEventDetails": {
"input": "{}",
"inputDetails": {"truncated": False},
"roleArn": roleArn,
},
},
{
"timestamp": iso_8601_datetime_with_milliseconds(
datetime(2020, 1, 1, 0, 0, 10, tzinfo=tzlocal())
),
"type": "FailStateEntered",
"id": 2,
"previousEventId": 0,
"stateEnteredEventDetails": {
"name": "A State",
"input": "{}",
"inputDetails": {"truncated": False},
},
},
{
"timestamp": iso_8601_datetime_with_milliseconds(
datetime(2020, 1, 1, 0, 0, 10, tzinfo=tzlocal())
),
"type": "ExecutionFailed",
"id": 3,
"previousEventId": 2,
"executionFailedEventDetails": {
"error": "AnError",
"cause": "An error occurred!",
},
},
]
def stop(self):
self.status = "ABORTED"
self.stop_date = iso_8601_datetime_with_milliseconds(datetime.now())
@ -346,38 +487,23 @@ class StepFunctionBackend(BaseBackend):
return sm
def start_execution(self, state_machine_arn, name=None, execution_input=None):
state_machine_name = self.describe_state_machine(state_machine_arn).name
self._ensure_execution_name_doesnt_exist(name)
self._validate_execution_input(execution_input)
execution = Execution(
state_machine = self.describe_state_machine(state_machine_arn)
execution = state_machine.start_execution(
region_name=self.region_name,
account_id=self._get_account_id(),
state_machine_name=state_machine_name,
execution_name=name or str(uuid4()),
state_machine_arn=state_machine_arn,
execution_input=execution_input,
)
self.executions.append(execution)
return execution
def stop_execution(self, execution_arn):
execution = next(
(x for x in self.executions if x.execution_arn == execution_arn), None
)
if not execution:
raise ExecutionDoesNotExist(
"Execution Does Not Exist: '" + execution_arn + "'"
)
execution.stop()
return execution
self._validate_execution_arn(execution_arn)
state_machine = self._get_state_machine_for_execution(execution_arn)
return state_machine.stop_execution(execution_arn)
@paginate
def list_executions(self, state_machine_arn, status_filter=None):
executions = [
execution
for execution in self.executions
if execution.state_machine_arn == state_machine_arn
]
executions = self.describe_state_machine(state_machine_arn).executions
if status_filter:
executions = list(filter(lambda e: e.status == status_filter, executions))
@ -385,13 +511,32 @@ class StepFunctionBackend(BaseBackend):
executions = sorted(executions, key=lambda x: x.start_date, reverse=True)
return executions
def describe_execution(self, arn):
self._validate_execution_arn(arn)
exctn = next((x for x in self.executions if x.execution_arn == arn), None)
def describe_execution(self, execution_arn):
self._validate_execution_arn(execution_arn)
state_machine = self._get_state_machine_for_execution(execution_arn)
exctn = next(
(x for x in state_machine.executions if x.execution_arn == execution_arn),
None,
)
if not exctn:
raise ExecutionDoesNotExist("Execution Does Not Exist: '" + arn + "'")
raise ExecutionDoesNotExist(
"Execution Does Not Exist: '" + execution_arn + "'"
)
return exctn
def get_execution_history(self, execution_arn):
self._validate_execution_arn(execution_arn)
state_machine = self._get_state_machine_for_execution(execution_arn)
execution = next(
(x for x in state_machine.executions if x.execution_arn == execution_arn),
None,
)
if not execution:
raise ExecutionDoesNotExist(
"Execution Does Not Exist: '" + execution_arn + "'"
)
return execution.get_execution_history(state_machine.roleArn)
def tag_resource(self, resource_arn, tags):
try:
state_machine = self.describe_state_machine(resource_arn)
@ -444,20 +589,18 @@ class StepFunctionBackend(BaseBackend):
if not arn or not match:
raise InvalidArn(invalid_msg)
def _ensure_execution_name_doesnt_exist(self, name):
for execution in self.executions:
if execution.name == name:
raise ExecutionAlreadyExists(
"Execution Already Exists: '" + execution.execution_arn + "'"
)
def _validate_execution_input(self, execution_input):
try:
json.loads(execution_input)
except Exception as ex:
raise InvalidExecutionInput(
"Invalid State Machine Execution Input: '" + str(ex) + "'"
def _get_state_machine_for_execution(self, execution_arn):
state_machine_name = execution_arn.split(":")[6]
state_machine_arn = next(
(x.arn for x in self.state_machines if x.name == state_machine_name), None
)
if not state_machine_arn:
# Assume that if the state machine arn is not present, then neither will the
# execution
raise ExecutionDoesNotExist(
"Execution Does Not Exist: '" + execution_arn + "'"
)
return self.describe_state_machine(state_machine_arn)
def _get_account_id(self):
return ACCOUNT_ID

View file

@ -208,6 +208,21 @@ class StepFunctionResponse(BaseResponse):
@amzn_request_id
def stop_execution(self):
arn = self._get_param("executionArn")
execution = self.stepfunction_backend.stop_execution(arn)
response = {"stopDate": execution.stop_date}
return 200, {}, json.dumps(response)
try:
execution = self.stepfunction_backend.stop_execution(arn)
response = {"stopDate": execution.stop_date}
return 200, {}, json.dumps(response)
except AWSError as err:
return err.response()
@amzn_request_id
def get_execution_history(self):
execution_arn = self._get_param("executionArn")
try:
execution_history = self.stepfunction_backend.get_execution_history(
execution_arn
)
response = {"events": execution_history}
return 200, {}, json.dumps(response)
except AWSError as err:
return err.response()