Implementation of core AWS Media Live endpoins (#3428)

* Initial attempt to mock AWS Media Live create_channel endpoint. Test fails.

* Completes basic implementation of Media Live create_channel endpoint

* Completes basic implementation of Media Live list_channels endpoint

* Adds skaffolds for describe_channel and delete_channel

* Adds unit test for delete_channel

* Adds unit test for describe_channel

* Reduces repetitive code by introducing a Channel model

* Implements MediaLive start_channel and stop_channel endpoints

* Fixes lack of support for the dash character in resource ARNs

* Implements MediaLive update_channel endpoint.

* Implements MediaLive create_input endpoint (and Input model).

* Implements MediaLive describe_input endpoint.

* Implements MediaLive list_inputs endpoint.

* Implements MediaLive update_input endpoint.

* Addse server tests for MediaLive

* Adds further url patterns for medialive

* Fixes url patterns

* Fixes url patterns
This commit is contained in:
Jordan Dimov 2021-01-19 16:11:39 +00:00 committed by GitHub
commit f11e3183bb
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 877 additions and 14 deletions

View file

@ -119,6 +119,7 @@ mock_kinesisvideo = lazy_load(".kinesisvideo", "mock_kinesisvideo")
mock_kinesisvideoarchivedmedia = lazy_load(
".kinesisvideoarchivedmedia", "mock_kinesisvideoarchivedmedia"
)
mock_medialive = lazy_load(".medialive", "mock_medialive")
# import logging
# logging.getLogger('boto').setLevel(logging.CRITICAL)

View file

@ -71,6 +71,7 @@ BACKENDS = {
"transcribe": ("transcribe", "transcribe_backends"),
"xray": ("xray", "xray_backends"),
"kinesisvideo": ("kinesisvideo", "kinesisvideo_backends"),
"medialive": ("medialive", "medialive_backends"),
"kinesis-video-archived-media": (
"kinesisvideoarchivedmedia",
"kinesisvideoarchivedmedia_backends",

View file

@ -324,7 +324,12 @@ class BaseResponse(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
def _convert(elem, is_last):
if not re.match("^{.*}$", elem):
return elem
name = elem.replace("{", "").replace("}", "").replace("+", "")
name = (
elem.replace("{", "")
.replace("}", "")
.replace("+", "")
.replace("-", "_")
)
if is_last:
return "(?P<%s>[^/]*)" % name
return "(?P<%s>.*)" % name

View file

@ -0,0 +1,6 @@
from __future__ import unicode_literals
from .models import medialive_backends
from ..core.models import base_decorator
medialive_backend = medialive_backends["us-east-1"]
mock_medialive = base_decorator(medialive_backends)

View file

@ -0,0 +1 @@
from __future__ import unicode_literals

302
moto/medialive/models.py Normal file
View file

@ -0,0 +1,302 @@
from __future__ import unicode_literals
from collections import OrderedDict
from uuid import uuid4
from boto3 import Session
from moto.core import BaseBackend, BaseModel
class Input(BaseModel):
def __init__(self, *args, **kwargs):
self.arn = kwargs.get("arn")
self.attached_channels = kwargs.get("attached_channels", [])
self.destinations = kwargs.get("destinations", [])
self.input_id = kwargs.get("input_id")
self.input_class = kwargs.get("input_class", "STANDARD")
self.input_devices = kwargs.get("input_devices", [])
self.input_source_type = kwargs.get("input_source_type", "STATIC")
self.media_connect_flows = kwargs.get("media_connect_flows", [])
self.name = kwargs.get("name")
self.role_arn = kwargs.get("role_arn")
self.security_groups = kwargs.get("security_groups", [])
self.sources = kwargs.get("sources", [])
# Possible states: 'CREATING'|'DETACHED'|'ATTACHED'|'DELETING'|'DELETED'
self.state = kwargs.get("state")
self.tags = kwargs.get("tags")
self.input_type = kwargs.get("input_type")
def to_dict(self):
data = {
"arn": self.arn,
"attachedChannels": self.attached_channels,
"destinations": self.destinations,
"id": self.input_id,
"inputClass": self.input_class,
"inputDevices": self.input_devices,
"inputSourceType": self.input_source_type,
"mediaConnectFlows": self.media_connect_flows,
"name": self.name,
"roleArn": self.role_arn,
"securityGroups": self.security_groups,
"sources": self.sources,
"state": self.state,
"tags": self.tags,
"type": self.input_type,
}
return data
def _resolve_transient_states(self):
# Resolve transient states before second call
# (to simulate AWS taking its sweet time with these things)
if self.state in ["CREATING"]:
self.state = "DETACHED" # or ATTACHED
elif self.state == "DELETING":
self.state = "DELETED"
class Channel(BaseModel):
def __init__(self, *args, **kwargs):
self.arn = kwargs.get("arn")
self.cdi_input_specification = kwargs.get("cdi_input_specification")
self.channel_class = kwargs.get("channel_class", "STANDARD")
self.destinations = kwargs.get("destinations")
self.egress_endpoints = kwargs.get("egress_endpoints", [])
self.encoder_settings = kwargs.get("encoder_settings")
self.channel_id = kwargs.get("channel_id")
self.input_attachments = kwargs.get("input_attachments")
self.input_specification = kwargs.get("input_specification")
self.log_level = kwargs.get("log_level")
self.name = kwargs.get("name")
self.pipeline_details = kwargs.get("pipeline_details", [])
self.role_arn = kwargs.get("role_arn")
self.state = kwargs.get("state")
self.tags = kwargs.get("tags")
self._previous_state = None
def to_dict(self, exclude=None):
data = {
"arn": self.arn,
"cdiInputSpecification": self.cdi_input_specification,
"channelClass": self.channel_class,
"destinations": self.destinations,
"egressEndpoints": self.egress_endpoints,
"encoderSettings": self.encoder_settings,
"id": self.channel_id,
"inputAttachments": self.input_attachments,
"inputSpecification": self.input_specification,
"logLevel": self.log_level,
"name": self.name,
"pipelineDetails": self.pipeline_details,
"pipelinesRunningCount": 1
if self.channel_class == "SINGLE_PIPELINE"
else 2,
"roleArn": self.role_arn,
"state": self.state,
"tags": self.tags,
}
if exclude:
for key in exclude:
del data[key]
return data
def _resolve_transient_states(self):
# Resolve transient states before second call
# (to simulate AWS taking its sweet time with these things)
if self.state in ["CREATING", "STOPPING"]:
self.state = "IDLE"
elif self.state == "STARTING":
self.state = "RUNNING"
elif self.state == "DELETING":
self.state = "DELETED"
elif self.state == "UPDATING":
self.state = self._previous_state
self._previous_state = None
class MediaLiveBackend(BaseBackend):
def __init__(self, region_name=None):
super(MediaLiveBackend, self).__init__()
self.region_name = region_name
self._channels = OrderedDict()
self._inputs = OrderedDict()
def reset(self):
region_name = self.region_name
self.__dict__ = {}
self.__init__(region_name)
def create_channel(
self,
cdi_input_specification,
channel_class,
destinations,
encoder_settings,
input_attachments,
input_specification,
log_level,
name,
request_id,
reserved,
role_arn,
tags,
):
channel_id = uuid4().hex
arn = "arn:aws:medialive:channel:{}".format(channel_id)
channel = Channel(
arn=arn,
cdi_input_specification=cdi_input_specification,
channel_class=channel_class or "STANDARD",
destinations=destinations,
egress_endpoints=[],
encoder_settings=encoder_settings,
channel_id=channel_id,
input_attachments=input_attachments,
input_specification=input_specification,
log_level=log_level,
name=name,
pipeline_details=[],
role_arn=role_arn,
state="CREATING",
tags=tags,
)
self._channels[channel_id] = channel
return channel
def list_channels(self, max_results, next_token):
channels = list(self._channels.values())
if max_results is not None:
channels = channels[:max_results]
response_channels = [
c.to_dict(exclude=["encoderSettings", "pipelineDetails"]) for c in channels
]
return response_channels, next_token
def describe_channel(self, channel_id):
channel = self._channels[channel_id]
channel._resolve_transient_states()
return channel.to_dict()
def delete_channel(self, channel_id):
channel = self._channels[channel_id]
channel.state = "DELETING"
return channel.to_dict()
def start_channel(self, channel_id):
channel = self._channels[channel_id]
channel.state = "STARTING"
return channel.to_dict()
def stop_channel(self, channel_id):
channel = self._channels[channel_id]
channel.state = "STOPPING"
return channel.to_dict()
def update_channel(
self,
channel_id,
cdi_input_specification,
destinations,
encoder_settings,
input_attachments,
input_specification,
log_level,
name,
role_arn,
):
channel = self._channels[channel_id]
channel.cdi_input_specification = cdi_input_specification
channel.destinations = destinations
channel.encoder_settings = encoder_settings
channel.input_attachments = input_attachments
channel.input_specification = input_specification
channel.log_level = log_level
channel.name = name
channel.role_arn = role_arn
channel._resolve_transient_states()
channel._previous_state = channel.state
channel.state = "UPDATING"
return channel
def create_input(
self,
destinations,
input_devices,
input_security_groups,
media_connect_flows,
name,
request_id,
role_arn,
sources,
tags,
type,
vpc,
):
input_id = uuid4().hex
arn = "arn:aws:medialive:input:{}".format(input_id)
a_input = Input(
arn=arn,
input_id=input_id,
destinations=destinations,
input_devices=input_devices,
input_security_groups=input_security_groups,
media_connect_flows=media_connect_flows,
name=name,
role_arn=role_arn,
sources=sources,
tags=tags,
input_type=type,
state="CREATING",
)
self._inputs[input_id] = a_input
return a_input
def describe_input(self, input_id):
a_input = self._inputs[input_id]
a_input._resolve_transient_states()
return a_input.to_dict()
def list_inputs(self, max_results, next_token):
inputs = list(self._inputs.values())
if max_results is not None:
inputs = inputs[:max_results]
response_inputs = [i.to_dict() for i in inputs]
return response_inputs, next_token
def delete_input(self, input_id):
a_input = self._inputs[input_id]
a_input.state = "DELETING"
return a_input.to_dict()
def update_input(
self,
destinations,
input_devices,
input_id,
input_security_groups,
media_connect_flows,
name,
role_arn,
sources,
):
a_input = self._inputs[input_id]
a_input.destinations = destinations
a_input.input_devices = input_devices
a_input.security_groups = input_security_groups
a_input.media_connect_flows = media_connect_flows
a_input.name = name
a_input.role_arn = role_arn
a_input.sources = sources
return a_input
medialive_backends = {}
for region in Session().get_available_regions("medialive"):
medialive_backends[region] = MediaLiveBackend()
for region in Session().get_available_regions("medialive", partition_name="aws-us-gov"):
medialive_backends[region] = MediaLiveBackend()
for region in Session().get_available_regions("medialive", partition_name="aws-cn"):
medialive_backends[region] = MediaLiveBackend()

160
moto/medialive/responses.py Normal file
View file

@ -0,0 +1,160 @@
from __future__ import unicode_literals
from moto.core.responses import BaseResponse
from .models import medialive_backends
import json
class MediaLiveResponse(BaseResponse):
SERVICE_NAME = "medialive"
@property
def medialive_backend(self):
return medialive_backends[self.region]
def create_channel(self):
cdi_input_specification = self._get_param("cdiInputSpecification")
channel_class = self._get_param("channelClass")
destinations = self._get_param("destinations")
encoder_settings = self._get_param("encoderSettings")
input_attachments = self._get_param("inputAttachments")
input_specification = self._get_param("inputSpecification")
log_level = self._get_param("logLevel")
name = self._get_param("name")
request_id = self._get_param("requestId")
reserved = self._get_param("reserved")
role_arn = self._get_param("roleArn")
tags = self._get_param("tags")
channel = self.medialive_backend.create_channel(
cdi_input_specification=cdi_input_specification,
channel_class=channel_class,
destinations=destinations,
encoder_settings=encoder_settings,
input_attachments=input_attachments,
input_specification=input_specification,
log_level=log_level,
name=name,
request_id=request_id,
reserved=reserved,
role_arn=role_arn,
tags=tags,
)
return json.dumps(
dict(channel=channel.to_dict(exclude=["pipelinesRunningCount"]))
)
def list_channels(self):
max_results = self._get_int_param("maxResults")
next_token = self._get_param("nextToken")
channels, next_token = self.medialive_backend.list_channels(
max_results=max_results, next_token=next_token,
)
return json.dumps(dict(channels=channels, nextToken=next_token))
def describe_channel(self):
channel_id = self._get_param("channelId")
return json.dumps(
self.medialive_backend.describe_channel(channel_id=channel_id,)
)
def delete_channel(self):
channel_id = self._get_param("channelId")
return json.dumps(self.medialive_backend.delete_channel(channel_id=channel_id,))
def start_channel(self):
channel_id = self._get_param("channelId")
return json.dumps(self.medialive_backend.start_channel(channel_id=channel_id,))
def stop_channel(self):
channel_id = self._get_param("channelId")
return json.dumps(self.medialive_backend.stop_channel(channel_id=channel_id,))
def update_channel(self):
channel_id = self._get_param("channelId")
cdi_input_specification = self._get_param("cdiInputSpecification")
destinations = self._get_param("destinations")
encoder_settings = self._get_param("encoderSettings")
input_attachments = self._get_param("inputAttachments")
input_specification = self._get_param("inputSpecification")
log_level = self._get_param("logLevel")
name = self._get_param("name")
role_arn = self._get_param("roleArn")
channel = self.medialive_backend.update_channel(
channel_id=channel_id,
cdi_input_specification=cdi_input_specification,
destinations=destinations,
encoder_settings=encoder_settings,
input_attachments=input_attachments,
input_specification=input_specification,
log_level=log_level,
name=name,
role_arn=role_arn,
)
return json.dumps(dict(channel=channel.to_dict()))
def create_input(self):
destinations = self._get_param("destinations")
input_devices = self._get_param("inputDevices")
input_security_groups = self._get_param("inputSecurityGroups")
media_connect_flows = self._get_param("mediaConnectFlows")
name = self._get_param("name")
request_id = self._get_param("requestId")
role_arn = self._get_param("roleArn")
sources = self._get_param("sources")
tags = self._get_param("tags")
type = self._get_param("type")
vpc = self._get_param("vpc")
a_input = self.medialive_backend.create_input(
destinations=destinations,
input_devices=input_devices,
input_security_groups=input_security_groups,
media_connect_flows=media_connect_flows,
name=name,
request_id=request_id,
role_arn=role_arn,
sources=sources,
tags=tags,
type=type,
vpc=vpc,
)
return json.dumps({"input": a_input.to_dict()})
def describe_input(self):
input_id = self._get_param("inputId")
return json.dumps(self.medialive_backend.describe_input(input_id=input_id,))
def list_inputs(self):
max_results = self._get_int_param("maxResults")
next_token = self._get_param("nextToken")
inputs, next_token = self.medialive_backend.list_inputs(
max_results=max_results, next_token=next_token,
)
return json.dumps(dict(inputs=inputs, nextToken=next_token))
def delete_input(self):
input_id = self._get_param("inputId")
self.medialive_backend.delete_input(input_id=input_id,)
return json.dumps({})
def update_input(self):
destinations = self._get_param("destinations")
input_devices = self._get_param("inputDevices")
input_id = self._get_param("inputId")
input_security_groups = self._get_param("inputSecurityGroups")
media_connect_flows = self._get_param("mediaConnectFlows")
name = self._get_param("name")
role_arn = self._get_param("roleArn")
sources = self._get_param("sources")
a_input = self.medialive_backend.update_input(
destinations=destinations,
input_devices=input_devices,
input_id=input_id,
input_security_groups=input_security_groups,
media_connect_flows=media_connect_flows,
name=name,
role_arn=role_arn,
sources=sources,
)
return json.dumps(dict(input=a_input.to_dict()))

19
moto/medialive/urls.py Normal file
View file

@ -0,0 +1,19 @@
from __future__ import unicode_literals
from .responses import MediaLiveResponse
url_bases = [
"https?://medialive.(.+).amazonaws.com",
]
response = MediaLiveResponse()
url_paths = {
"{0}/prod/channels": response.dispatch,
"{0}/prod/channels/(?P<channelid>[^/.]+)": response.dispatch,
"{0}/prod/channels/(?P<channelid>[^/.]+)/start": response.dispatch,
"{0}/prod/channels/(?P<channelid>[^/.]+)/stop": response.dispatch,
"{0}/prod/inputs": response.dispatch,
"{0}/prod/inputs/(?P<inputid>[^/.]+)": response.dispatch,
}