Add support for CloudFormation resource AWS::StepFunctions::StateMachine (#3429)

Closes #3402

Co-authored-by: Bert Blommers <bblommers@users.noreply.github.com>
This commit is contained in:
Brian Pandola 2020-11-02 03:53:03 -08:00 committed by GitHub
commit 171130fe7b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 95 additions and 3 deletions

View file

@ -4,7 +4,7 @@ from datetime import datetime
from boto3 import Session
from moto.core import ACCOUNT_ID, BaseBackend
from moto.core import ACCOUNT_ID, BaseBackend, CloudFormationModel
from moto.core.utils import iso_8601_datetime_with_milliseconds
from uuid import uuid4
from .exceptions import (
@ -18,7 +18,7 @@ from .exceptions import (
from .utils import paginate
class StateMachine:
class StateMachine(CloudFormationModel):
def __init__(self, arn, name, definition, roleArn, tags=None):
self.creation_date = iso_8601_datetime_with_milliseconds(datetime.now())
self.arn = arn
@ -27,6 +27,45 @@ class StateMachine:
self.roleArn = roleArn
self.tags = tags
@property
def physical_resource_id(self):
return self.arn
def get_cfn_attribute(self, attribute_name):
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException
if attribute_name == "Name":
return self.name
raise UnformattedGetAttTemplateException()
@staticmethod
def cloudformation_name_type():
return "StateMachine"
@staticmethod
def cloudformation_type():
return "AWS::StepFunctions::StateMachine"
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
):
properties = cloudformation_json["Properties"]
name = properties.get("StateMachineName", resource_name)
definition = properties.get("DefinitionString", "")
role_arn = properties.get("RoleArn", "")
tags = properties.get("Tags", [])
tags_xform = [{k.lower(): v for k, v in d.items()} for d in tags]
sf_backend = stepfunction_backends[region_name]
return sf_backend.create_state_machine(
name, definition, role_arn, tags=tags_xform
)
@classmethod
def delete_from_cloudformation_json(cls, resource_name, _, region_name):
sf_backend = stepfunction_backends[region_name]
sf_backend.delete_state_machine(resource_name)
class Execution:
def __init__(