Implement additional Step Functions endpoints (#3437)

* Implement tagging/untagging for State Machine resources

* Implement `stepfunctions:UpdateStateMachine` endpoint
This commit is contained in:
Brian Pandola 2020-11-05 02:16:48 -08:00 committed by GitHub
commit 574f46e212
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 208 additions and 4 deletions

View file

@ -13,6 +13,7 @@ from .exceptions import (
InvalidArn,
InvalidExecutionInput,
InvalidName,
ResourceNotFound,
StateMachineDoesNotExist,
)
from .utils import paginate
@ -21,11 +22,41 @@ from .utils import paginate
class StateMachine(CloudFormationModel):
def __init__(self, arn, name, definition, roleArn, tags=None):
self.creation_date = iso_8601_datetime_with_milliseconds(datetime.now())
self.update_date = self.creation_date
self.arn = arn
self.name = name
self.definition = definition
self.roleArn = roleArn
self.tags = tags
self.tags = []
if tags:
self.add_tags(tags)
def update(self, **kwargs):
for key, value in kwargs.items():
if value is not None:
setattr(self, key, value)
self.update_date = iso_8601_datetime_with_milliseconds(datetime.now())
def add_tags(self, tags):
merged_tags = []
for tag in self.tags:
replacement_index = next(
(index for (index, d) in enumerate(tags) if d["key"] == tag["key"]),
None,
)
if replacement_index is not None:
replacement = tags.pop(replacement_index)
merged_tags.append(replacement)
else:
merged_tags.append(tag)
for tag in tags:
merged_tags.append(tag)
self.tags = merged_tags
return self.tags
def remove_tags(self, tag_keys):
self.tags = [tag_set for tag_set in self.tags if tag_set["key"] not in tag_keys]
return self.tags
@property
def physical_resource_id(self):
@ -249,6 +280,15 @@ class StepFunctionBackend(BaseBackend):
if sm:
self.state_machines.remove(sm)
def update_state_machine(self, arn, definition=None, role_arn=None):
sm = self.describe_state_machine(arn)
updates = {
"definition": definition,
"roleArn": role_arn,
}
sm.update(**updates)
return sm
def start_execution(self, state_machine_arn, name=None, execution_input=None):
state_machine_name = self.describe_state_machine(state_machine_arn).name
self._ensure_execution_name_doesnt_exist(name)
@ -296,6 +336,20 @@ class StepFunctionBackend(BaseBackend):
raise ExecutionDoesNotExist("Execution Does Not Exist: '" + arn + "'")
return exctn
def tag_resource(self, resource_arn, tags):
try:
state_machine = self.describe_state_machine(resource_arn)
state_machine.add_tags(tags)
except StateMachineDoesNotExist:
raise ResourceNotFound(resource_arn)
def untag_resource(self, resource_arn, tag_keys):
try:
state_machine = self.describe_state_machine(resource_arn)
state_machine.remove_tags(tag_keys)
except StateMachineDoesNotExist:
raise ResourceNotFound(resource_arn)
def reset(self):
region_name = self.region_name
self.__dict__ = {}