Fix sqs permission handling & add more error handling

This commit is contained in:
gruebel 2020-01-30 22:42:27 +01:00
commit 44024ab74b
3 changed files with 301 additions and 19 deletions

View file

@ -30,6 +30,9 @@ from .exceptions import (
BatchEntryIdsNotDistinct,
TooManyEntriesInBatchRequest,
InvalidAttributeName,
InvalidParameterValue,
MissingParameter,
OverLimit,
)
from moto.core import ACCOUNT_ID as DEFAULT_ACCOUNT_ID
@ -183,6 +186,7 @@ class Queue(BaseModel):
"MaximumMessageSize",
"MessageRetentionPeriod",
"QueueArn",
"Policy",
"RedrivePolicy",
"ReceiveMessageWaitTimeSeconds",
"VisibilityTimeout",
@ -195,6 +199,8 @@ class Queue(BaseModel):
"DeleteMessage",
"GetQueueAttributes",
"GetQueueUrl",
"ListDeadLetterSourceQueues",
"PurgeQueue",
"ReceiveMessage",
"SendMessage",
)
@ -273,7 +279,7 @@ class Queue(BaseModel):
if key in bool_fields:
value = value == "true"
if key == "RedrivePolicy" and value is not None:
if key in ["Policy", "RedrivePolicy"] and value is not None:
continue
setattr(self, camelcase_to_underscores(key), value)
@ -281,6 +287,9 @@ class Queue(BaseModel):
if attributes.get("RedrivePolicy", None):
self._setup_dlq(attributes["RedrivePolicy"])
if attributes.get("Policy"):
self.policy = attributes["Policy"]
self.last_modified_timestamp = now
def _setup_dlq(self, policy):
@ -472,6 +481,24 @@ class Queue(BaseModel):
return self.name
raise UnformattedGetAttTemplateException()
@property
def policy(self):
if self._policy_json.get("Statement"):
return json.dumps(self._policy_json)
else:
return None
@policy.setter
def policy(self, policy):
if policy:
self._policy_json = json.loads(policy)
else:
self._policy_json = {
"Version": "2012-10-17",
"Id": "{}/SQSDefaultPolicy".format(self.queue_arn),
"Statement": [],
}
class SQSBackend(BaseBackend):
def __init__(self, region_name):
@ -802,25 +829,75 @@ class SQSBackend(BaseBackend):
def add_permission(self, queue_name, actions, account_ids, label):
queue = self.get_queue(queue_name)
if actions is None or len(actions) == 0:
raise RESTError("InvalidParameterValue", "Need at least one Action")
if account_ids is None or len(account_ids) == 0:
raise RESTError("InvalidParameterValue", "Need at least one Account ID")
if not actions:
raise MissingParameter()
if not all([item in Queue.ALLOWED_PERMISSIONS for item in actions]):
raise RESTError("InvalidParameterValue", "Invalid permissions")
if not account_ids:
raise InvalidParameterValue(
"Value [] for parameter PrincipalId is invalid. Reason: Unable to verify."
)
queue.permissions[label] = (account_ids, actions)
count = len(actions)
if count > 7:
raise OverLimit(count)
invalid_action = next(
(action for action in actions if action not in Queue.ALLOWED_PERMISSIONS),
None,
)
if invalid_action:
raise InvalidParameterValue(
"Value SQS:{} for parameter ActionName is invalid. "
"Reason: Only the queue owner is allowed to invoke this action.".format(
invalid_action
)
)
policy = queue._policy_json
statement = next(
(
statement
for statement in policy["Statement"]
if statement["Sid"] == label
),
None,
)
if statement:
raise InvalidParameterValue(
"Value {} for parameter Label is invalid. "
"Reason: Already exists.".format(label)
)
principals = [
"arn:aws:iam::{}:root".format(account_id) for account_id in account_ids
]
actions = ["SQS:{}".format(action) for action in actions]
statement = {
"Sid": label,
"Effect": "Allow",
"Principal": {"AWS": principals[0] if len(principals) == 1 else principals},
"Action": actions[0] if len(actions) == 1 else actions,
"Resource": queue.queue_arn,
}
queue._policy_json["Statement"].append(statement)
def remove_permission(self, queue_name, label):
queue = self.get_queue(queue_name)
if label not in queue.permissions:
raise RESTError(
"InvalidParameterValue", "Permission doesnt exist for the given label"
statements = queue._policy_json["Statement"]
statements_new = [
statement for statement in statements if statement["Sid"] != label
]
if len(statements) == len(statements_new):
raise InvalidParameterValue(
"Value {} for parameter Label is invalid. "
"Reason: can't find label on existing policy.".format(label)
)
del queue.permissions[label]
queue._policy_json["Statement"] = statements_new
def tag_queue(self, queue_name, tags):
queue = self.get_queue(queue_name)