Support iot and iot-data (#1303)
* append appropriate urls when scaffolding * make dispatch for rest-api * fix dispatch for rest-json * fix moto/core/response to obtain path and body parameters * small fixes * remove unused import * fix get_int_param * Add features of things and thing-types * fix scaffold * basic crud of cert * support basic CRUD of policy * refactor * fix formatting of scaffold * support principal_pocicy * support thing_principal * update readme * escape service to handle service w/ hyphen like iot-data * escape service w/ hyphen * fix regexp to extract region from url * escape service * Implement basic iota-data feature * iot-data shadow delta * update readme * remove unused import * remove comment * fix syntax * specify region when creating boto3 client for test * use uuid for seed of generating cert id * specify region_name to iotdata client in test * specify region to boto3 client in moto response * excude iot and iotdata tests on server mode * fix handling of thingTypeName in describe-thing * test if server is up for iot
This commit is contained in:
parent
884fc6f260
commit
0de2e55b13
20 changed files with 1260 additions and 4 deletions
6
moto/iot/__init__.py
Normal file
6
moto/iot/__init__.py
Normal file
|
|
@ -0,0 +1,6 @@
|
|||
from __future__ import unicode_literals
|
||||
from .models import iot_backends
|
||||
from ..core.models import base_decorator
|
||||
|
||||
iot_backend = iot_backends['us-east-1']
|
||||
mock_iot = base_decorator(iot_backends)
|
||||
24
moto/iot/exceptions.py
Normal file
24
moto/iot/exceptions.py
Normal file
|
|
@ -0,0 +1,24 @@
|
|||
from __future__ import unicode_literals
|
||||
from moto.core.exceptions import RESTError
|
||||
|
||||
|
||||
class IoTClientError(RESTError):
|
||||
code = 400
|
||||
|
||||
|
||||
class ResourceNotFoundException(IoTClientError):
|
||||
def __init__(self):
|
||||
self.code = 400
|
||||
super(ResourceNotFoundException, self).__init__(
|
||||
"ResourceNotFoundException",
|
||||
"The specified resource does not exist"
|
||||
)
|
||||
|
||||
|
||||
class InvalidRequestException(IoTClientError):
|
||||
def __init__(self):
|
||||
self.code = 400
|
||||
super(InvalidRequestException, self).__init__(
|
||||
"InvalidRequestException",
|
||||
"The request is not valid."
|
||||
)
|
||||
364
moto/iot/models.py
Normal file
364
moto/iot/models.py
Normal file
|
|
@ -0,0 +1,364 @@
|
|||
from __future__ import unicode_literals
|
||||
import time
|
||||
import boto3
|
||||
import string
|
||||
import random
|
||||
import hashlib
|
||||
import uuid
|
||||
from moto.core import BaseBackend, BaseModel
|
||||
from collections import OrderedDict
|
||||
from .exceptions import (
|
||||
ResourceNotFoundException,
|
||||
InvalidRequestException
|
||||
)
|
||||
|
||||
|
||||
class FakeThing(BaseModel):
|
||||
def __init__(self, thing_name, thing_type, attributes, region_name):
|
||||
self.region_name = region_name
|
||||
self.thing_name = thing_name
|
||||
self.thing_type = thing_type
|
||||
self.attributes = attributes
|
||||
self.arn = 'arn:aws:iot:%s:1:thing/%s' % (self.region_name, thing_name)
|
||||
self.version = 1
|
||||
# TODO: we need to handle 'version'?
|
||||
|
||||
# for iot-data
|
||||
self.thing_shadow = None
|
||||
|
||||
def to_dict(self, include_default_client_id=False):
|
||||
obj = {
|
||||
'thingName': self.thing_name,
|
||||
'attributes': self.attributes,
|
||||
'version': self.version
|
||||
}
|
||||
if self.thing_type:
|
||||
obj['thingTypeName'] = self.thing_type.thing_type_name
|
||||
if include_default_client_id:
|
||||
obj['defaultClientId'] = self.thing_name
|
||||
return obj
|
||||
|
||||
|
||||
class FakeThingType(BaseModel):
|
||||
def __init__(self, thing_type_name, thing_type_properties, region_name):
|
||||
self.region_name = region_name
|
||||
self.thing_type_name = thing_type_name
|
||||
self.thing_type_properties = thing_type_properties
|
||||
t = time.time()
|
||||
self.metadata = {
|
||||
'deprecated': False,
|
||||
'creationData': int(t * 1000) / 1000.0
|
||||
}
|
||||
self.arn = 'arn:aws:iot:%s:1:thingtype/%s' % (self.region_name, thing_type_name)
|
||||
|
||||
def to_dict(self):
|
||||
return {
|
||||
'thingTypeName': self.thing_type_name,
|
||||
'thingTypeProperties': self.thing_type_properties,
|
||||
'thingTypeMetadata': self.metadata
|
||||
}
|
||||
|
||||
|
||||
class FakeCertificate(BaseModel):
|
||||
def __init__(self, certificate_pem, status, region_name):
|
||||
m = hashlib.sha256()
|
||||
m.update(str(uuid.uuid4()).encode('utf-8'))
|
||||
self.certificate_id = m.hexdigest()
|
||||
self.arn = 'arn:aws:iot:%s:1:cert/%s' % (region_name, self.certificate_id)
|
||||
self.certificate_pem = certificate_pem
|
||||
self.status = status
|
||||
|
||||
# TODO: must adjust
|
||||
self.owner = '1'
|
||||
self.transfer_data = {}
|
||||
self.creation_date = time.time()
|
||||
self.last_modified_date = self.creation_date
|
||||
self.ca_certificate_id = None
|
||||
|
||||
def to_dict(self):
|
||||
return {
|
||||
'certificateArn': self.arn,
|
||||
'certificateId': self.certificate_id,
|
||||
'status': self.status,
|
||||
'creationDate': self.creation_date
|
||||
}
|
||||
|
||||
def to_description_dict(self):
|
||||
"""
|
||||
You might need keys below in some situation
|
||||
- caCertificateId
|
||||
- previousOwnedBy
|
||||
"""
|
||||
return {
|
||||
'certificateArn': self.arn,
|
||||
'certificateId': self.certificate_id,
|
||||
'status': self.status,
|
||||
'certificatePem': self.certificate_pem,
|
||||
'ownedBy': self.owner,
|
||||
'creationDate': self.creation_date,
|
||||
'lastModifiedDate': self.last_modified_date,
|
||||
'transferData': self.transfer_data
|
||||
}
|
||||
|
||||
|
||||
class FakePolicy(BaseModel):
|
||||
def __init__(self, name, document, region_name):
|
||||
self.name = name
|
||||
self.document = document
|
||||
self.arn = 'arn:aws:iot:%s:1:policy/%s' % (region_name, name)
|
||||
self.version = '1' # TODO: handle version
|
||||
|
||||
def to_get_dict(self):
|
||||
return {
|
||||
'policyName': self.name,
|
||||
'policyArn': self.arn,
|
||||
'policyDocument': self.document,
|
||||
'defaultVersionId': self.version
|
||||
}
|
||||
|
||||
def to_dict_at_creation(self):
|
||||
return {
|
||||
'policyName': self.name,
|
||||
'policyArn': self.arn,
|
||||
'policyDocument': self.document,
|
||||
'policyVersionId': self.version
|
||||
}
|
||||
|
||||
def to_dict(self):
|
||||
return {
|
||||
'policyName': self.name,
|
||||
'policyArn': self.arn,
|
||||
}
|
||||
|
||||
|
||||
class IoTBackend(BaseBackend):
|
||||
def __init__(self, region_name=None):
|
||||
super(IoTBackend, self).__init__()
|
||||
self.region_name = region_name
|
||||
self.things = OrderedDict()
|
||||
self.thing_types = OrderedDict()
|
||||
self.certificates = OrderedDict()
|
||||
self.policies = OrderedDict()
|
||||
self.principal_policies = OrderedDict()
|
||||
self.principal_things = OrderedDict()
|
||||
|
||||
def reset(self):
|
||||
region_name = self.region_name
|
||||
self.__dict__ = {}
|
||||
self.__init__(region_name)
|
||||
|
||||
def create_thing(self, thing_name, thing_type_name, attribute_payload):
|
||||
thing_types = self.list_thing_types()
|
||||
thing_type = None
|
||||
if thing_type_name:
|
||||
filtered_thing_types = [_ for _ in thing_types if _.thing_type_name == thing_type_name]
|
||||
if len(filtered_thing_types) == 0:
|
||||
raise ResourceNotFoundException()
|
||||
thing_type = filtered_thing_types[0]
|
||||
if attribute_payload is None:
|
||||
attributes = {}
|
||||
elif 'attributes' not in attribute_payload:
|
||||
attributes = {}
|
||||
else:
|
||||
attributes = attribute_payload['attributes']
|
||||
thing = FakeThing(thing_name, thing_type, attributes, self.region_name)
|
||||
self.things[thing.arn] = thing
|
||||
return thing.thing_name, thing.arn
|
||||
|
||||
def create_thing_type(self, thing_type_name, thing_type_properties):
|
||||
if thing_type_properties is None:
|
||||
thing_type_properties = {}
|
||||
thing_type = FakeThingType(thing_type_name, thing_type_properties, self.region_name)
|
||||
self.thing_types[thing_type.arn] = thing_type
|
||||
return thing_type.thing_type_name, thing_type.arn
|
||||
|
||||
def list_thing_types(self, thing_type_name=None):
|
||||
if thing_type_name:
|
||||
# It's wierd but thing_type_name is filterd by forward match, not complete match
|
||||
return [_ for _ in self.thing_types.values() if _.thing_type_name.startswith(thing_type_name)]
|
||||
thing_types = self.thing_types.values()
|
||||
return thing_types
|
||||
|
||||
def list_things(self, attribute_name, attribute_value, thing_type_name):
|
||||
# TODO: filter by attributess or thing_type
|
||||
things = self.things.values()
|
||||
return things
|
||||
|
||||
def describe_thing(self, thing_name):
|
||||
things = [_ for _ in self.things.values() if _.thing_name == thing_name]
|
||||
if len(things) == 0:
|
||||
raise ResourceNotFoundException()
|
||||
return things[0]
|
||||
|
||||
def describe_thing_type(self, thing_type_name):
|
||||
thing_types = [_ for _ in self.thing_types.values() if _.thing_type_name == thing_type_name]
|
||||
if len(thing_types) == 0:
|
||||
raise ResourceNotFoundException()
|
||||
return thing_types[0]
|
||||
|
||||
def delete_thing(self, thing_name, expected_version):
|
||||
# TODO: handle expected_version
|
||||
|
||||
# can raise ResourceNotFoundError
|
||||
thing = self.describe_thing(thing_name)
|
||||
del self.things[thing.arn]
|
||||
|
||||
def delete_thing_type(self, thing_type_name):
|
||||
# can raise ResourceNotFoundError
|
||||
thing_type = self.describe_thing_type(thing_type_name)
|
||||
del self.thing_types[thing_type.arn]
|
||||
|
||||
def update_thing(self, thing_name, thing_type_name, attribute_payload, expected_version, remove_thing_type):
|
||||
# if attributes payload = {}, nothing
|
||||
thing = self.describe_thing(thing_name)
|
||||
thing_type = None
|
||||
|
||||
if remove_thing_type and thing_type_name:
|
||||
raise InvalidRequestException()
|
||||
|
||||
# thing_type
|
||||
if thing_type_name:
|
||||
thing_types = self.list_thing_types()
|
||||
filtered_thing_types = [_ for _ in thing_types if _.thing_type_name == thing_type_name]
|
||||
if len(filtered_thing_types) == 0:
|
||||
raise ResourceNotFoundException()
|
||||
thing_type = filtered_thing_types[0]
|
||||
thing.thing_type = thing_type
|
||||
|
||||
if remove_thing_type:
|
||||
thing.thing_type = None
|
||||
|
||||
# attribute
|
||||
if attribute_payload is not None and 'attributes' in attribute_payload:
|
||||
do_merge = attribute_payload.get('merge', False)
|
||||
attributes = attribute_payload['attributes']
|
||||
if not do_merge:
|
||||
thing.attributes = attributes
|
||||
else:
|
||||
thing.attributes.update(attributes)
|
||||
|
||||
def _random_string(self):
|
||||
n = 20
|
||||
random_str = ''.join([random.choice(string.ascii_letters + string.digits) for i in range(n)])
|
||||
return random_str
|
||||
|
||||
def create_keys_and_certificate(self, set_as_active):
|
||||
# implement here
|
||||
# caCertificate can be blank
|
||||
key_pair = {
|
||||
'PublicKey': self._random_string(),
|
||||
'PrivateKey': self._random_string()
|
||||
}
|
||||
certificate_pem = self._random_string()
|
||||
status = 'ACTIVE' if set_as_active else 'INACTIVE'
|
||||
certificate = FakeCertificate(certificate_pem, status, self.region_name)
|
||||
self.certificates[certificate.certificate_id] = certificate
|
||||
return certificate, key_pair
|
||||
|
||||
def delete_certificate(self, certificate_id):
|
||||
self.describe_certificate(certificate_id)
|
||||
del self.certificates[certificate_id]
|
||||
|
||||
def describe_certificate(self, certificate_id):
|
||||
certs = [_ for _ in self.certificates.values() if _.certificate_id == certificate_id]
|
||||
if len(certs) == 0:
|
||||
raise ResourceNotFoundException()
|
||||
return certs[0]
|
||||
|
||||
def list_certificates(self):
|
||||
return self.certificates.values()
|
||||
|
||||
def update_certificate(self, certificate_id, new_status):
|
||||
cert = self.describe_certificate(certificate_id)
|
||||
# TODO: validate new_status
|
||||
cert.status = new_status
|
||||
|
||||
def create_policy(self, policy_name, policy_document):
|
||||
policy = FakePolicy(policy_name, policy_document, self.region_name)
|
||||
self.policies[policy.name] = policy
|
||||
return policy
|
||||
|
||||
def list_policies(self):
|
||||
policies = self.policies.values()
|
||||
return policies
|
||||
|
||||
def get_policy(self, policy_name):
|
||||
policies = [_ for _ in self.policies.values() if _.name == policy_name]
|
||||
if len(policies) == 0:
|
||||
raise ResourceNotFoundException()
|
||||
return policies[0]
|
||||
|
||||
def delete_policy(self, policy_name):
|
||||
policy = self.get_policy(policy_name)
|
||||
del self.policies[policy.name]
|
||||
|
||||
def _get_principal(self, principal_arn):
|
||||
"""
|
||||
raise ResourceNotFoundException
|
||||
"""
|
||||
if ':cert/' in principal_arn:
|
||||
certs = [_ for _ in self.certificates.values() if _.arn == principal_arn]
|
||||
if len(certs) == 0:
|
||||
raise ResourceNotFoundException()
|
||||
principal = certs[0]
|
||||
return principal
|
||||
else:
|
||||
# TODO: search for cognito_ids
|
||||
pass
|
||||
raise ResourceNotFoundException()
|
||||
|
||||
def attach_principal_policy(self, policy_name, principal_arn):
|
||||
principal = self._get_principal(principal_arn)
|
||||
policy = self.get_policy(policy_name)
|
||||
k = (principal_arn, policy_name)
|
||||
if k in self.principal_policies:
|
||||
return
|
||||
self.principal_policies[k] = (principal, policy)
|
||||
|
||||
def detach_principal_policy(self, policy_name, principal_arn):
|
||||
# this may raises ResourceNotFoundException
|
||||
self._get_principal(principal_arn)
|
||||
self.get_policy(policy_name)
|
||||
|
||||
k = (principal_arn, policy_name)
|
||||
if k not in self.principal_policies:
|
||||
raise ResourceNotFoundException()
|
||||
del self.principal_policies[k]
|
||||
|
||||
def list_principal_policies(self, principal_arn):
|
||||
policies = [v[1] for k, v in self.principal_policies.items() if k[0] == principal_arn]
|
||||
return policies
|
||||
|
||||
def list_policy_principals(self, policy_name):
|
||||
principals = [k[0] for k, v in self.principal_policies.items() if k[1] == policy_name]
|
||||
return principals
|
||||
|
||||
def attach_thing_principal(self, thing_name, principal_arn):
|
||||
principal = self._get_principal(principal_arn)
|
||||
thing = self.describe_thing(thing_name)
|
||||
k = (principal_arn, thing_name)
|
||||
if k in self.principal_things:
|
||||
return
|
||||
self.principal_things[k] = (principal, thing)
|
||||
|
||||
def detach_thing_principal(self, thing_name, principal_arn):
|
||||
# this may raises ResourceNotFoundException
|
||||
self._get_principal(principal_arn)
|
||||
self.describe_thing(thing_name)
|
||||
|
||||
k = (principal_arn, thing_name)
|
||||
if k not in self.principal_things:
|
||||
raise ResourceNotFoundException()
|
||||
del self.principal_things[k]
|
||||
|
||||
def list_principal_things(self, principal_arn):
|
||||
thing_names = [k[0] for k, v in self.principal_things.items() if k[0] == principal_arn]
|
||||
return thing_names
|
||||
|
||||
def list_thing_principals(self, thing_name):
|
||||
principals = [k[0] for k, v in self.principal_things.items() if k[1] == thing_name]
|
||||
return principals
|
||||
|
||||
|
||||
available_regions = boto3.session.Session().get_available_regions("iot")
|
||||
iot_backends = {region: IoTBackend(region) for region in available_regions}
|
||||
258
moto/iot/responses.py
Normal file
258
moto/iot/responses.py
Normal file
|
|
@ -0,0 +1,258 @@
|
|||
from __future__ import unicode_literals
|
||||
from moto.core.responses import BaseResponse
|
||||
from .models import iot_backends
|
||||
import json
|
||||
|
||||
|
||||
class IoTResponse(BaseResponse):
|
||||
SERVICE_NAME = 'iot'
|
||||
|
||||
@property
|
||||
def iot_backend(self):
|
||||
return iot_backends[self.region]
|
||||
|
||||
def create_thing(self):
|
||||
thing_name = self._get_param("thingName")
|
||||
thing_type_name = self._get_param("thingTypeName")
|
||||
attribute_payload = self._get_param("attributePayload")
|
||||
thing_name, thing_arn = self.iot_backend.create_thing(
|
||||
thing_name=thing_name,
|
||||
thing_type_name=thing_type_name,
|
||||
attribute_payload=attribute_payload,
|
||||
)
|
||||
return json.dumps(dict(thingName=thing_name, thingArn=thing_arn))
|
||||
|
||||
def create_thing_type(self):
|
||||
thing_type_name = self._get_param("thingTypeName")
|
||||
thing_type_properties = self._get_param("thingTypeProperties")
|
||||
thing_type_name, thing_type_arn = self.iot_backend.create_thing_type(
|
||||
thing_type_name=thing_type_name,
|
||||
thing_type_properties=thing_type_properties,
|
||||
)
|
||||
return json.dumps(dict(thingTypeName=thing_type_name, thingTypeArn=thing_type_arn))
|
||||
|
||||
def list_thing_types(self):
|
||||
# previous_next_token = self._get_param("nextToken")
|
||||
# max_results = self._get_int_param("maxResults")
|
||||
thing_type_name = self._get_param("thingTypeName")
|
||||
thing_types = self.iot_backend.list_thing_types(
|
||||
thing_type_name=thing_type_name
|
||||
)
|
||||
|
||||
# TODO: support next_token and max_results
|
||||
next_token = None
|
||||
return json.dumps(dict(thingTypes=[_.to_dict() for _ in thing_types], nextToken=next_token))
|
||||
|
||||
def list_things(self):
|
||||
# previous_next_token = self._get_param("nextToken")
|
||||
# max_results = self._get_int_param("maxResults")
|
||||
attribute_name = self._get_param("attributeName")
|
||||
attribute_value = self._get_param("attributeValue")
|
||||
thing_type_name = self._get_param("thingTypeName")
|
||||
things = self.iot_backend.list_things(
|
||||
attribute_name=attribute_name,
|
||||
attribute_value=attribute_value,
|
||||
thing_type_name=thing_type_name,
|
||||
)
|
||||
# TODO: support next_token and max_results
|
||||
next_token = None
|
||||
return json.dumps(dict(things=[_.to_dict() for _ in things], nextToken=next_token))
|
||||
|
||||
def describe_thing(self):
|
||||
thing_name = self._get_param("thingName")
|
||||
thing = self.iot_backend.describe_thing(
|
||||
thing_name=thing_name,
|
||||
)
|
||||
print(thing.to_dict(include_default_client_id=True))
|
||||
return json.dumps(thing.to_dict(include_default_client_id=True))
|
||||
|
||||
def describe_thing_type(self):
|
||||
thing_type_name = self._get_param("thingTypeName")
|
||||
thing_type = self.iot_backend.describe_thing_type(
|
||||
thing_type_name=thing_type_name,
|
||||
)
|
||||
return json.dumps(thing_type.to_dict())
|
||||
|
||||
def delete_thing(self):
|
||||
thing_name = self._get_param("thingName")
|
||||
expected_version = self._get_param("expectedVersion")
|
||||
self.iot_backend.delete_thing(
|
||||
thing_name=thing_name,
|
||||
expected_version=expected_version,
|
||||
)
|
||||
return json.dumps(dict())
|
||||
|
||||
def delete_thing_type(self):
|
||||
thing_type_name = self._get_param("thingTypeName")
|
||||
self.iot_backend.delete_thing_type(
|
||||
thing_type_name=thing_type_name,
|
||||
)
|
||||
return json.dumps(dict())
|
||||
|
||||
def update_thing(self):
|
||||
thing_name = self._get_param("thingName")
|
||||
thing_type_name = self._get_param("thingTypeName")
|
||||
attribute_payload = self._get_param("attributePayload")
|
||||
expected_version = self._get_param("expectedVersion")
|
||||
remove_thing_type = self._get_param("removeThingType")
|
||||
self.iot_backend.update_thing(
|
||||
thing_name=thing_name,
|
||||
thing_type_name=thing_type_name,
|
||||
attribute_payload=attribute_payload,
|
||||
expected_version=expected_version,
|
||||
remove_thing_type=remove_thing_type,
|
||||
)
|
||||
return json.dumps(dict())
|
||||
|
||||
def create_keys_and_certificate(self):
|
||||
set_as_active = self._get_param("setAsActive")
|
||||
cert, key_pair = self.iot_backend.create_keys_and_certificate(
|
||||
set_as_active=set_as_active,
|
||||
)
|
||||
return json.dumps(dict(
|
||||
certificateArn=cert.arn,
|
||||
certificateId=cert.certificate_id,
|
||||
certificatePem=cert.certificate_pem,
|
||||
keyPair=key_pair
|
||||
))
|
||||
|
||||
def delete_certificate(self):
|
||||
certificate_id = self._get_param("certificateId")
|
||||
self.iot_backend.delete_certificate(
|
||||
certificate_id=certificate_id,
|
||||
)
|
||||
return json.dumps(dict())
|
||||
|
||||
def describe_certificate(self):
|
||||
certificate_id = self._get_param("certificateId")
|
||||
certificate = self.iot_backend.describe_certificate(
|
||||
certificate_id=certificate_id,
|
||||
)
|
||||
return json.dumps(dict(certificateDescription=certificate.to_description_dict()))
|
||||
|
||||
def list_certificates(self):
|
||||
# page_size = self._get_int_param("pageSize")
|
||||
# marker = self._get_param("marker")
|
||||
# ascending_order = self._get_param("ascendingOrder")
|
||||
certificates = self.iot_backend.list_certificates()
|
||||
# TODO: handle pagination
|
||||
return json.dumps(dict(certificates=[_.to_dict() for _ in certificates]))
|
||||
|
||||
def update_certificate(self):
|
||||
certificate_id = self._get_param("certificateId")
|
||||
new_status = self._get_param("newStatus")
|
||||
self.iot_backend.update_certificate(
|
||||
certificate_id=certificate_id,
|
||||
new_status=new_status,
|
||||
)
|
||||
return json.dumps(dict())
|
||||
|
||||
def create_policy(self):
|
||||
policy_name = self._get_param("policyName")
|
||||
policy_document = self._get_param("policyDocument")
|
||||
policy = self.iot_backend.create_policy(
|
||||
policy_name=policy_name,
|
||||
policy_document=policy_document,
|
||||
)
|
||||
return json.dumps(policy.to_dict_at_creation())
|
||||
|
||||
def list_policies(self):
|
||||
# marker = self._get_param("marker")
|
||||
# page_size = self._get_int_param("pageSize")
|
||||
# ascending_order = self._get_param("ascendingOrder")
|
||||
policies = self.iot_backend.list_policies()
|
||||
|
||||
# TODO: handle pagination
|
||||
return json.dumps(dict(policies=[_.to_dict() for _ in policies]))
|
||||
|
||||
def get_policy(self):
|
||||
policy_name = self._get_param("policyName")
|
||||
policy = self.iot_backend.get_policy(
|
||||
policy_name=policy_name,
|
||||
)
|
||||
return json.dumps(policy.to_get_dict())
|
||||
|
||||
def delete_policy(self):
|
||||
policy_name = self._get_param("policyName")
|
||||
self.iot_backend.delete_policy(
|
||||
policy_name=policy_name,
|
||||
)
|
||||
return json.dumps(dict())
|
||||
|
||||
def attach_principal_policy(self):
|
||||
policy_name = self._get_param("policyName")
|
||||
principal = self.headers.get('x-amzn-iot-principal')
|
||||
self.iot_backend.attach_principal_policy(
|
||||
policy_name=policy_name,
|
||||
principal_arn=principal,
|
||||
)
|
||||
return json.dumps(dict())
|
||||
|
||||
def detach_principal_policy(self):
|
||||
policy_name = self._get_param("policyName")
|
||||
principal = self.headers.get('x-amzn-iot-principal')
|
||||
self.iot_backend.detach_principal_policy(
|
||||
policy_name=policy_name,
|
||||
principal_arn=principal,
|
||||
)
|
||||
return json.dumps(dict())
|
||||
|
||||
def list_principal_policies(self):
|
||||
principal = self.headers.get('x-amzn-iot-principal')
|
||||
# marker = self._get_param("marker")
|
||||
# page_size = self._get_int_param("pageSize")
|
||||
# ascending_order = self._get_param("ascendingOrder")
|
||||
policies = self.iot_backend.list_principal_policies(
|
||||
principal_arn=principal
|
||||
)
|
||||
# TODO: handle pagination
|
||||
next_marker = None
|
||||
return json.dumps(dict(policies=[_.to_dict() for _ in policies], nextMarker=next_marker))
|
||||
|
||||
def list_policy_principals(self):
|
||||
policy_name = self.headers.get('x-amzn-iot-policy')
|
||||
# marker = self._get_param("marker")
|
||||
# page_size = self._get_int_param("pageSize")
|
||||
# ascending_order = self._get_param("ascendingOrder")
|
||||
principals = self.iot_backend.list_policy_principals(
|
||||
policy_name=policy_name,
|
||||
)
|
||||
# TODO: handle pagination
|
||||
next_marker = None
|
||||
return json.dumps(dict(principals=principals, nextMarker=next_marker))
|
||||
|
||||
def attach_thing_principal(self):
|
||||
thing_name = self._get_param("thingName")
|
||||
principal = self.headers.get('x-amzn-principal')
|
||||
self.iot_backend.attach_thing_principal(
|
||||
thing_name=thing_name,
|
||||
principal_arn=principal,
|
||||
)
|
||||
return json.dumps(dict())
|
||||
|
||||
def detach_thing_principal(self):
|
||||
thing_name = self._get_param("thingName")
|
||||
principal = self.headers.get('x-amzn-principal')
|
||||
self.iot_backend.detach_thing_principal(
|
||||
thing_name=thing_name,
|
||||
principal_arn=principal,
|
||||
)
|
||||
return json.dumps(dict())
|
||||
|
||||
def list_principal_things(self):
|
||||
next_token = self._get_param("nextToken")
|
||||
# max_results = self._get_int_param("maxResults")
|
||||
principal = self.headers.get('x-amzn-principal')
|
||||
things = self.iot_backend.list_principal_things(
|
||||
principal_arn=principal,
|
||||
)
|
||||
# TODO: handle pagination
|
||||
next_token = None
|
||||
return json.dumps(dict(things=things, nextToken=next_token))
|
||||
|
||||
def list_thing_principals(self):
|
||||
thing_name = self._get_param("thingName")
|
||||
principals = self.iot_backend.list_thing_principals(
|
||||
thing_name=thing_name,
|
||||
)
|
||||
return json.dumps(dict(principals=principals))
|
||||
14
moto/iot/urls.py
Normal file
14
moto/iot/urls.py
Normal file
|
|
@ -0,0 +1,14 @@
|
|||
from __future__ import unicode_literals
|
||||
from .responses import IoTResponse
|
||||
|
||||
url_bases = [
|
||||
"https?://iot.(.+).amazonaws.com",
|
||||
]
|
||||
|
||||
|
||||
response = IoTResponse()
|
||||
|
||||
|
||||
url_paths = {
|
||||
'{0}/.*$': response.dispatch,
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue