Add KMS policy actions

Adds the following to the KMS service
* PutKeyPolicy
* GetKeyPolicy
* ListKeyPolicies

Signed-off-by: Jesse Szwedko <jesse.szwedko@getbraintree.com>
This commit is contained in:
Andrew Garrett 2015-11-25 19:50:55 +00:00 committed by Jesse Szwedko
commit 136f622b3b
3 changed files with 95 additions and 0 deletions

View file

@ -78,6 +78,12 @@ class KmsBackend(BaseBackend):
def get_key_rotation_status(self, key_id):
return self.keys[key_id].key_rotation_status
def put_key_policy(self, key_id, policy):
self.keys[key_id].policy = policy
def get_key_policy(self, key_id):
return self.keys[key_id].policy
kms_backends = {}
for region in boto.kms.regions():

View file

@ -171,6 +171,53 @@ class KmsResponse(BaseResponse):
'__type': 'NotFoundException'})
return json.dumps({'KeyRotationEnabled': rotation_enabled})
def put_key_policy(self):
key_id = self.parameters.get('KeyId')
policy_name = self.parameters.get('PolicyName')
policy = self.parameters.get('Policy')
_assert_valid_key_id(key_id)
_assert_default_policy(policy_name)
try:
self.kms_backend.put_key_policy(key_id, policy)
except KeyError:
raise JSONResponseError(404, 'Not Found', body={
'message': "Key 'arn:aws:kms:{region}:012345678912:key/{key_id}' does not exist".format(region=self.region,key_id=key_id),
'__type': 'NotFoundException'})
return json.dumps(None)
def get_key_policy(self):
key_id = self.parameters.get('KeyId')
policy_name = self.parameters.get('PolicyName')
_assert_valid_key_id(key_id)
_assert_default_policy(policy_name)
try:
return json.dumps({'Policy': self.kms_backend.get_key_policy(key_id)})
except KeyError:
raise JSONResponseError(404, 'Not Found', body={
'message': "Key 'arn:aws:kms:{region}:012345678912:key/{key_id}' does not exist".format(region=self.region,key_id=key_id),
'__type': 'NotFoundException'})
def list_key_policies(self):
key_id = self.parameters.get('KeyId')
_assert_valid_key_id(key_id)
try:
self.kms_backend.describe_key(key_id)
except KeyError:
raise JSONResponseError(404, 'Not Found', body={
'message': "Key 'arn:aws:kms:{region}:012345678912:key/{key_id}' does not exist".format(region=self.region,key_id=key_id),
'__type': 'NotFoundException'})
return json.dumps({'Truncated': False, 'PolicyNames': ['default']})
def _assert_valid_key_id(key_id):
if not re.match(r'^[A-F0-9]{8}-[A-F0-9]{4}-[A-F0-9]{4}-[A-F0-9]{4}-[A-F0-9]{12}$', key_id, re.IGNORECASE):
raise JSONResponseError(404, 'Not Found', body={'message': ' Invalid keyId', '__type': 'NotFoundException'})
def _assert_default_policy(policy_name):
if policy_name != 'default':
raise JSONResponseError(404, 'Not Found', body={
'message': "No such policy exists",
'__type': 'NotFoundException'})