From ef20b47f979324eed9ed3992e96efe8e2eefa431 Mon Sep 17 00:00:00 2001 From: acsbendi Date: Sun, 30 Jun 2019 17:09:55 +0200 Subject: [PATCH] Implemented checking policy documents for syntax errors. --- moto/iam/exceptions.py | 8 ++ moto/iam/models.py | 4 + moto/iam/policy_validation.py | 148 ++++++++++++++++++++++++++++++++++ 3 files changed, 160 insertions(+) create mode 100644 moto/iam/policy_validation.py diff --git a/moto/iam/exceptions.py b/moto/iam/exceptions.py index 5b13277d..4b11b0e4 100644 --- a/moto/iam/exceptions.py +++ b/moto/iam/exceptions.py @@ -34,6 +34,14 @@ class MalformedCertificate(RESTError): 'MalformedCertificate', 'Certificate {cert} is malformed'.format(cert=cert)) +class MalformedPolicyDocument(RESTError): + code = 400 + + def __init__(self, message=""): + super(MalformedPolicyDocument, self).__init__( + 'MalformedPolicyDocument', message) + + class DuplicateTags(RESTError): code = 400 diff --git a/moto/iam/models.py b/moto/iam/models.py index 86eec73f..dbd66e80 100644 --- a/moto/iam/models.py +++ b/moto/iam/models.py @@ -11,6 +11,7 @@ from cryptography.hazmat.backends import default_backend from moto.core.exceptions import RESTError from moto.core import BaseBackend, BaseModel from moto.core.utils import iso_8601_datetime_without_milliseconds, iso_8601_datetime_with_milliseconds +from moto.iam.policy_validation import IAMPolicyDocumentValidator from .aws_managed_policies import aws_managed_policies_data from .exceptions import IAMNotFoundException, IAMConflictException, IAMReportNotPresentException, MalformedCertificate, \ @@ -568,6 +569,9 @@ class IAMBackend(BaseBackend): policy.detach_from(self.get_user(user_name)) def create_policy(self, description, path, policy_document, policy_name): + iam_policy_document_validator = IAMPolicyDocumentValidator(policy_document) + iam_policy_document_validator.validate() + policy = ManagedPolicy( policy_name, description=description, diff --git a/moto/iam/policy_validation.py b/moto/iam/policy_validation.py new file mode 100644 index 00000000..b7b9f26d --- /dev/null +++ b/moto/iam/policy_validation.py @@ -0,0 +1,148 @@ +import json + +from six import string_types + +from moto.iam.exceptions import MalformedPolicyDocument + + +ALLOWED_TOP_ELEMENTS = [ + "Version", + "Id", + "Statement", + "Conditions" +] + +ALLOWED_VERSIONS = [ + "2008-10-17", + "2012-10-17" +] + +ALLOWED_STATEMENT_ELEMENTS = [ + "Sid", + "Action", + "NotAction", + "Resource", + "NotResource", + "Effect", + "Condition" +] + +ALLOWED_EFFECTS = [ + "Allow", + "Deny" +] + + +class IAMPolicyDocumentValidator: + + def __init__(self, policy_document): + self._policy_document = policy_document + self._policy_json = {} + self._statements = [] + + def validate(self): + try: + self._validate_syntax() + except Exception: + raise MalformedPolicyDocument("Syntax errors in policy.") + try: + self._validate_version() + except Exception: + raise MalformedPolicyDocument("Policy document must be version 2012-10-17 or greater.") + try: + self._validate_resource_exist() + except Exception: + raise MalformedPolicyDocument("Policy statement must contain resources.") + + def _validate_syntax(self): + self._policy_json = json.loads(self._policy_document) + assert isinstance(self._policy_json, dict) + self._validate_top_elements() + self._validate_version_syntax() + self._validate_id_syntax() + self._validate_statements_syntax() + + def _validate_top_elements(self): + top_elements = self._policy_json.keys() + for element in top_elements: + assert element in ALLOWED_TOP_ELEMENTS + + def _validate_version_syntax(self): + if "Version" in self._policy_json: + assert self._policy_json["Version"] in ALLOWED_VERSIONS + + def _validate_version(self): + assert self._policy_json["Version"] == "2012-10-17" + + def _validate_statements_syntax(self): + assert "Statement" in self._policy_json + assert isinstance(self._policy_json["Statement"], (dict, list)) + + if isinstance(self._policy_json["Statement"], dict): + self._statements.append(self._policy_json["Statement"]) + else: + self._statements += self._policy_json["Statement"] + + assert self._statements + for statement in self._statements: + self._validate_statement_syntax(statement) + + @staticmethod + def _validate_statement_syntax(statement): + assert isinstance(statement, dict) + for statement_element in statement.keys(): + assert statement_element in ALLOWED_STATEMENT_ELEMENTS + + assert ("Resource" not in statement or "NotResource" not in statement) + assert ("Action" not in statement or "NotAction" not in statement) + + IAMPolicyDocumentValidator._validate_effect_syntax(statement) + IAMPolicyDocumentValidator._validate_resource_syntax(statement) + IAMPolicyDocumentValidator._validate_not_resource_syntax(statement) + IAMPolicyDocumentValidator._validate_condition_syntax(statement) + IAMPolicyDocumentValidator._validate_sid_syntax(statement) + + @staticmethod + def _validate_effect_syntax(statement): + assert "Effect" in statement + assert isinstance(statement["Effect"], string_types) + assert statement["Effect"].lower() in [allowed_effect.lower() for allowed_effect in ALLOWED_EFFECTS] + + @staticmethod + def _validate_resource_syntax(statement): + IAMPolicyDocumentValidator._validate_resource_like_syntax(statement, "Resource") + + @staticmethod + def _validate_not_resource_syntax(statement): + IAMPolicyDocumentValidator._validate_resource_like_syntax(statement, "NotResource") + + @staticmethod + def _validate_resource_like_syntax(statement, key): + if key in statement: + assert isinstance(statement[key], (string_types, list)) + if isinstance(statement[key], list): + for resource in statement[key]: + assert isinstance(resource, string_types) + + @staticmethod + def _validate_condition_syntax(statement): + if "Condition" in statement: + assert isinstance(statement["Condition"], dict) + for condition_key, condition_value in statement["Condition"].items(): + assert isinstance(condition_value, dict) + for condition_data_key, condition_data_value in condition_value.items(): + assert isinstance(condition_data_value, (list, string_types)) + + @staticmethod + def _validate_sid_syntax(statement): + if "Sid" in statement: + assert isinstance(statement["Sid"], string_types) + + def _validate_id_syntax(self): + if "Id" in self._policy_document: + assert isinstance(self._policy_document["Id"], string_types) + + def _validate_resource_exist(self): + for statement in self._statements: + assert "Resource" in statement +