Merge pull request #1913 from cm-iwata/add_iot_delete_validation

Fix #1908 add some validation for IoT delete operations
This commit is contained in:
Steve Pulec 2018-12-28 19:52:24 -05:00 committed by GitHub
commit 811197a9bb
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 138 additions and 1 deletions

View file

@ -31,3 +31,20 @@ class VersionConflictException(IoTClientError):
'VersionConflictException',
'The version for thing %s does not match the expected version.' % name
)
class CertificateStateException(IoTClientError):
def __init__(self, msg, cert_id):
self.code = 406
super(CertificateStateException, self).__init__(
'CertificateStateException',
'%s Id: %s' % (msg, cert_id)
)
class DeleteConflictException(IoTClientError):
def __init__(self, msg):
self.code = 409
super(DeleteConflictException, self).__init__(
'DeleteConflictException', msg
)

View file

@ -13,6 +13,8 @@ import boto3
from moto.core import BaseBackend, BaseModel
from .exceptions import (
CertificateStateException,
DeleteConflictException,
ResourceNotFoundException,
InvalidRequestException,
VersionConflictException
@ -378,7 +380,25 @@ class IoTBackend(BaseBackend):
return certificate, key_pair
def delete_certificate(self, certificate_id):
self.describe_certificate(certificate_id)
cert = self.describe_certificate(certificate_id)
if cert.status == 'ACTIVE':
raise CertificateStateException(
'Certificate must be deactivated (not ACTIVE) before deletion.', certificate_id)
certs = [k[0] for k, v in self.principal_things.items()
if self._get_principal(k[0]).certificate_id == certificate_id]
if len(certs) > 0:
raise DeleteConflictException(
'Things must be detached before deletion (arn: %s)' % certs[0]
)
certs = [k[0] for k, v in self.principal_policies.items()
if self._get_principal(k[0]).certificate_id == certificate_id]
if len(certs) > 0:
raise DeleteConflictException(
'Certificate policies must be detached before deletion (arn: %s)' % certs[0]
)
del self.certificates[certificate_id]
def describe_certificate(self, certificate_id):
@ -411,6 +431,14 @@ class IoTBackend(BaseBackend):
return policies[0]
def delete_policy(self, policy_name):
policies = [k[1] for k, v in self.principal_policies.items() if k[1] == policy_name]
if len(policies) > 0:
raise DeleteConflictException(
'The policy cannot be deleted as the policy is attached to one or more principals (name=%s)'
% policy_name
)
policy = self.get_policy(policy_name)
del self.policies[policy.name]