From 36d8f118e36a7cbdb1dd99667f599fd91aa26bcc Mon Sep 17 00:00:00 2001 From: Stephan Huber Date: Wed, 24 Oct 2018 14:53:08 +0200 Subject: [PATCH] implement `attach_policy`, `detach_policy` and `list_attached_policy` --- moto/iot/models.py | 22 +++++++++++++++++++ moto/iot/responses.py | 30 ++++++++++++++++++++++++++ tests/test_iot/test_iot.py | 44 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 96 insertions(+) diff --git a/moto/iot/models.py b/moto/iot/models.py index 931af192..4789e045 100644 --- a/moto/iot/models.py +++ b/moto/iot/models.py @@ -400,6 +400,28 @@ class IoTBackend(BaseBackend): self.policies[policy.name] = policy return policy + def attach_policy(self, policy_name, target): + principal = self._get_principal(target) + policy = self.get_policy(policy_name) + k = (target, policy_name) + if k in self.principal_policies: + return + self.principal_policies[k] = (principal, policy) + + def detach_policy(self, policy_name, target): + # this may raises ResourceNotFoundException + self._get_principal(target) + self.get_policy(policy_name) + + k = (target, policy_name) + if k not in self.principal_policies: + raise ResourceNotFoundException() + del self.principal_policies[k] + + def list_attached_policies(self, target): + policies = [v[1] for k, v in self.principal_policies.items() if k[0] == target] + return policies + def list_policies(self): policies = self.policies.values() return policies diff --git a/moto/iot/responses.py b/moto/iot/responses.py index c71d4942..f5e25fdb 100644 --- a/moto/iot/responses.py +++ b/moto/iot/responses.py @@ -1,6 +1,7 @@ from __future__ import unicode_literals import json +from urllib.parse import unquote from moto.core.responses import BaseResponse from .models import iot_backends @@ -234,6 +235,35 @@ class IoTResponse(BaseResponse): ) return json.dumps(dict()) + def attach_policy(self): + policy_name = self._get_param("policyName") + principal = self._get_param('target') + self.iot_backend.attach_policy( + policy_name=policy_name, + target=principal, + ) + return json.dumps(dict()) + + def detach_policy(self): + policy_name = self._get_param("policyName") + principal = self._get_param('target') + self.iot_backend.detach_policy( + policy_name=policy_name, + target=principal, + ) + return json.dumps(dict()) + + def list_attached_policies(self): + principal = unquote(self._get_param('target')) + # marker = self._get_param("marker") + # page_size = self._get_int_param("pageSize") + policies = self.iot_backend.list_attached_policies( + target=principal + ) + # TODO: implement pagination in the future + next_marker = None + return json.dumps(dict(policies=[_.to_dict() for _ in policies], nextMarker=next_marker)) + def attach_principal_policy(self): policy_name = self._get_param("policyName") principal = self.headers.get('x-amzn-iot-principal') diff --git a/tests/test_iot/test_iot.py b/tests/test_iot/test_iot.py index 759c7d3c..7fbd6696 100644 --- a/tests/test_iot/test_iot.py +++ b/tests/test_iot/test_iot.py @@ -8,6 +8,50 @@ import boto3 from moto import mock_iot +@mock_iot +def test_attach_policy(): + client = boto3.client('iot', region_name='ap-northeast-1') + policy_name = 'my-policy' + doc = '{}' + + cert = client.create_keys_and_certificate(setAsActive=True) + cert_arn = cert['certificateArn'] + client.create_policy(policyName=policy_name, policyDocument=doc) + client.attach_policy(policyName=policy_name, target=cert_arn) + + res = client.list_attached_policies(target=cert_arn) + res.should.have.key('policies').which.should.have.length_of(1) + res['policies'][0]['policyName'].should.equal('my-policy') + + +@mock_iot +def test_detach_policy(): + client = boto3.client('iot', region_name='ap-northeast-1') + policy_name = 'my-policy' + doc = '{}' + + cert = client.create_keys_and_certificate(setAsActive=True) + cert_arn = cert['certificateArn'] + client.create_policy(policyName=policy_name, policyDocument=doc) + client.attach_policy(policyName=policy_name, target=cert_arn) + + res = client.list_attached_policies(target=cert_arn) + res.should.have.key('policies').which.should.have.length_of(1) + res['policies'][0]['policyName'].should.equal('my-policy') + + client.detach_policy(policyName=policy_name, target=cert_arn) + res = client.list_attached_policies(target=cert_arn) + res.should.have.key('policies').which.should.be.empty + + +@mock_iot +def test_list_attached_policies(): + client = boto3.client('iot', region_name='ap-northeast-1') + cert = client.create_keys_and_certificate(setAsActive=True) + policies = client.list_attached_policies(target=cert['certificateArn']) + policies['policies'].should.be.empty + + @mock_iot def test_things(): client = boto3.client('iot', region_name='ap-northeast-1')