IAM Role Tagging support

This commit is contained in:
Mike Grima 2019-01-29 18:09:31 -08:00
commit 1a36c0c377
6 changed files with 426 additions and 16 deletions

View file

@ -32,3 +32,48 @@ class MalformedCertificate(RESTError):
def __init__(self, cert):
super(MalformedCertificate, self).__init__(
'MalformedCertificate', 'Certificate {cert} is malformed'.format(cert=cert))
class DuplicateTags(RESTError):
code = 400
def __init__(self):
super(DuplicateTags, self).__init__(
'InvalidInput', 'Duplicate tag keys found. Please note that Tag keys are case insensitive.')
class TagKeyTooBig(RESTError):
code = 400
def __init__(self, tag, param='tags.X.member.key'):
super(TagKeyTooBig, self).__init__(
'ValidationError', "1 validation error detected: Value '{}' at '{}' failed to satisfy "
"constraint: Member must have length less than or equal to 128.".format(tag, param))
class TagValueTooBig(RESTError):
code = 400
def __init__(self, tag):
super(TagValueTooBig, self).__init__(
'ValidationError', "1 validation error detected: Value '{}' at 'tags.X.member.value' failed to satisfy "
"constraint: Member must have length less than or equal to 256.".format(tag))
class InvalidTagCharacters(RESTError):
code = 400
def __init__(self, tag, param='tags.X.member.key'):
message = "1 validation error detected: Value '{}' at '{}' failed to satisfy ".format(tag, param)
message += "constraint: Member must satisfy regular expression pattern: [\\p{L}\\p{Z}\\p{N}_.:/=+\\-@]+"
super(InvalidTagCharacters, self).__init__('ValidationError', message)
class TooManyTags(RESTError):
code = 400
def __init__(self, tags, param='tags'):
super(TooManyTags, self).__init__(
'ValidationError', "1 validation error detected: Value '{}' at '{}' failed to satisfy "
"constraint: Member must have length less than or equal to 50.".format(tags, param))

View file

@ -3,6 +3,7 @@ import base64
import sys
from datetime import datetime
import json
import re
from cryptography import x509
from cryptography.hazmat.backends import default_backend
@ -12,7 +13,8 @@ from moto.core import BaseBackend, BaseModel
from moto.core.utils import iso_8601_datetime_without_milliseconds
from .aws_managed_policies import aws_managed_policies_data
from .exceptions import IAMNotFoundException, IAMConflictException, IAMReportNotPresentException, MalformedCertificate
from .exceptions import IAMNotFoundException, IAMConflictException, IAMReportNotPresentException, MalformedCertificate, \
DuplicateTags, TagKeyTooBig, InvalidTagCharacters, TooManyTags, TagValueTooBig
from .utils import random_access_key, random_alphanumeric, random_resource_id, random_policy_id
ACCOUNT_ID = 123456789012
@ -32,7 +34,6 @@ class MFADevice(object):
class Policy(BaseModel):
is_attachable = False
def __init__(self,
@ -132,6 +133,7 @@ class Role(BaseModel):
self.policies = {}
self.managed_policies = {}
self.create_date = datetime.now(pytz.utc)
self.tags = {}
@classmethod
def create_from_cloudformation_json(cls, resource_name, cloudformation_json, region_name):
@ -175,6 +177,9 @@ class Role(BaseModel):
raise NotImplementedError('"Fn::GetAtt" : [ "{0}" , "Arn" ]"')
raise UnformattedGetAttTemplateException()
def get_tags(self):
return [self.tags[tag] for tag in self.tags]
class InstanceProfile(BaseModel):
@ -614,6 +619,86 @@ class IAMBackend(BaseBackend):
role = self.get_role(role_name)
return role.policies.keys()
def _validate_tag_key(self, tag_key, exception_param='tags.X.member.key'):
"""Validates the tag key.
:param all_tags: Dict to check if there is a duplicate tag.
:param tag_key: The tag key to check against.
:param exception_param: The exception parameter to send over to help format the message. This is to reflect
the difference between the tag and untag APIs.
:return:
"""
# Validate that the key length is correct:
if len(tag_key) > 128:
raise TagKeyTooBig(tag_key, param=exception_param)
# Validate that the tag key fits the proper Regex:
# [\w\s_.:/=+\-@]+ SHOULD be the same as the Java regex on the AWS documentation: [\p{L}\p{Z}\p{N}_.:/=+\-@]+
match = re.findall(r'[\w\s_.:/=+\-@]+', tag_key)
# Kudos if you can come up with a better way of doing a global search :)
if not len(match) or len(match[0]) < len(tag_key):
raise InvalidTagCharacters(tag_key, param=exception_param)
def _check_tag_duplicate(self, all_tags, tag_key):
"""Validates that a tag key is not a duplicate
:param all_tags: Dict to check if there is a duplicate tag.
:param tag_key: The tag key to check against.
:return:
"""
if tag_key in all_tags:
raise DuplicateTags()
def list_role_tags(self, role_name, marker, max_items=100):
role = self.get_role(role_name)
max_items = int(max_items)
tag_index = sorted(role.tags)
start_idx = int(marker) if marker else 0
tag_index = tag_index[start_idx:start_idx + max_items]
if len(role.tags) <= (start_idx + max_items):
marker = None
else:
marker = str(start_idx + max_items)
# Make the tag list of dict's:
tags = [role.tags[tag] for tag in tag_index]
return tags, marker
def tag_role(self, role_name, tags):
if len(tags) > 50:
raise TooManyTags(tags)
role = self.get_role(role_name)
tag_keys = {}
for tag in tags:
# Need to index by the lowercase tag key since the keys are case insensitive, but their case is retained.
ref_key = tag['Key'].lower()
self._check_tag_duplicate(tag_keys, ref_key)
self._validate_tag_key(tag['Key'])
if len(tag['Value']) > 256:
raise TagValueTooBig(tag['Value'])
tag_keys[ref_key] = tag
role.tags.update(tag_keys)
def untag_role(self, role_name, tag_keys):
if len(tag_keys) > 50:
raise TooManyTags(tag_keys, param='tagKeys')
role = self.get_role(role_name)
for key in tag_keys:
ref_key = key.lower()
self._validate_tag_key(key, exception_param='tagKeys')
role.tags.pop(ref_key, None)
def create_policy_version(self, policy_arn, policy_document, set_as_default):
policy = self.get_policy(policy_arn)
if not policy:

View file

@ -625,6 +625,34 @@ class IamResponse(BaseResponse):
template = self.response_template(LIST_SIGNING_CERTIFICATES_TEMPLATE)
return template.render(user_name=user_name, certificates=certs)
def list_role_tags(self):
role_name = self._get_param('RoleName')
marker = self._get_param('Marker')
max_items = self._get_param('MaxItems', 100)
tags, marker = iam_backend.list_role_tags(role_name, marker, max_items)
template = self.response_template(LIST_ROLE_TAG_TEMPLATE)
return template.render(tags=tags, marker=marker)
def tag_role(self):
role_name = self._get_param('RoleName')
tags = self._get_multi_param('Tags.member')
iam_backend.tag_role(role_name, tags)
template = self.response_template(TAG_ROLE_TEMPLATE)
return template.render()
def untag_role(self):
role_name = self._get_param('RoleName')
tag_keys = self._get_multi_param('TagKeys.member')
iam_backend.untag_role(role_name, tag_keys)
template = self.response_template(UNTAG_ROLE_TEMPLATE)
return template.render()
ATTACH_ROLE_POLICY_TEMPLATE = """<AttachRolePolicyResponse>
<ResponseMetadata>
@ -878,6 +906,16 @@ GET_ROLE_TEMPLATE = """<GetRoleResponse xmlns="https://iam.amazonaws.com/doc/201
<AssumeRolePolicyDocument>{{ role.assume_role_policy_document }}</AssumeRolePolicyDocument>
<CreateDate>{{ role.create_date }}</CreateDate>
<RoleId>{{ role.id }}</RoleId>
{% if role.tags %}
<Tags>
{% for tag in role.get_tags() %}
<member>
<Key>{{ tag['Key'] }}</Key>
<Value>{{ tag['Value'] }}</Value>
</member>
{% endfor %}
</Tags>
{% endif %}
</Role>
</GetRoleResult>
<ResponseMetadata>
@ -1503,6 +1541,14 @@ GET_ACCOUNT_AUTHORIZATION_DETAILS_TEMPLATE = """<GetAccountAuthorizationDetailsR
</member>
{% endfor %}
</AttachedManagedPolicies>
<Tags>
{% for tag in role.get_tags() %}
<member>
<Key>{{ tag['Key'] }}</Key>
<Value>{{ tag['Value'] }}</Value>
</member>
{% endfor %}
</Tags>
<InstanceProfileList>
{% for profile in instance_profiles %}
<member>
@ -1671,3 +1717,38 @@ LIST_SIGNING_CERTIFICATES_TEMPLATE = """<ListSigningCertificatesResponse>
<RequestId>7a62c49f-347e-4fc4-9331-6e8eEXAMPLE</RequestId>
</ResponseMetadata>
</ListSigningCertificatesResponse>"""
TAG_ROLE_TEMPLATE = """<TagRoleResponse xmlns="https://iam.amazonaws.com/doc/2010-05-08/">
<ResponseMetadata>
<RequestId>EXAMPLE8-90ab-cdef-fedc-ba987EXAMPLE</RequestId>
</ResponseMetadata>
</TagRoleResponse>"""
LIST_ROLE_TAG_TEMPLATE = """<ListRoleTagsResponse xmlns="https://iam.amazonaws.com/doc/2010-05-08/">
<ListRoleTagsResult>
<IsTruncated>{{ 'true' if marker else 'false' }}</IsTruncated>
{% if marker %}
<Marker>{{ marker }}</Marker>
{% endif %}
<Tags>
{% for tag in tags %}
<member>
<Key>{{ tag['Key'] }}</Key>
<Value>{{ tag['Value'] }}</Value>
</member>
{% endfor %}
</Tags>
</ListRoleTagsResult>
<ResponseMetadata>
<RequestId>EXAMPLE8-90ab-cdef-fedc-ba987EXAMPLE</RequestId>
</ResponseMetadata>
</ListRoleTagsResponse>"""
UNTAG_ROLE_TEMPLATE = """<UntagRoleResponse xmlns="https://iam.amazonaws.com/doc/2010-05-08/">
<ResponseMetadata>
<RequestId>EXAMPLE8-90ab-cdef-fedc-ba987EXAMPLE</RequestId>
</ResponseMetadata>
</UntagRoleResponse>"""