Implemented checking if S3 action is permitted.

This commit is contained in:
acsbendi 2019-07-04 16:38:43 +02:00
commit 5dbec8aee5
5 changed files with 260 additions and 38 deletions

View file

@ -1,8 +1,9 @@
import json
import re
from abc import ABC, abstractmethod
from enum import Enum
from botocore.auth import SigV4Auth
from botocore.auth import SigV4Auth, S3SigV4Auth
from botocore.awsrequest import AWSRequest
from botocore.credentials import Credentials
from moto.iam.models import ACCOUNT_ID, Policy
@ -10,6 +11,7 @@ from moto.iam.models import ACCOUNT_ID, Policy
from moto.iam import iam_backend
from moto.core.exceptions import SignatureDoesNotMatchError, AccessDeniedError, InvalidClientTokenIdError
from moto.s3.exceptions import BucketAccessDeniedError, S3AccessDeniedError
ACCESS_KEY_STORE = {
"AKIAJDULPKHCC4KGTYVA": {
@ -19,7 +21,7 @@ ACCESS_KEY_STORE = {
}
class IAMRequest:
class IAMRequestBase(ABC):
def __init__(self, method, path, data, headers):
print(f"Creating {IAMRequest.__name__} with method={method}, path={path}, data={data}, headers={headers}")
@ -56,12 +58,9 @@ class IAMRequest:
if not permitted:
self._raise_access_denied(iam_user_name)
@abstractmethod
def _raise_access_denied(self, iam_user_name):
raise AccessDeniedError(
account_id=ACCOUNT_ID,
iam_user_name=iam_user_name,
action=self._action
)
pass
@staticmethod
def _collect_policies_for_iam_user(iam_user_name):
@ -87,13 +86,9 @@ class IAMRequest:
return user_policies
def _create_auth(self):
if self._access_key not in ACCESS_KEY_STORE:
raise InvalidClientTokenIdError()
secret_key = ACCESS_KEY_STORE[self._access_key]["secret_access_key"]
credentials = Credentials(self._access_key, secret_key)
return SigV4Auth(credentials, self._service, self._region)
@abstractmethod
def _create_auth(self, credentials):
pass
@staticmethod
def _create_headers_for_aws_request(signed_headers, original_headers):
@ -112,7 +107,12 @@ class IAMRequest:
return request
def _calculate_signature(self):
auth = self._create_auth()
if self._access_key not in ACCESS_KEY_STORE:
raise InvalidClientTokenIdError()
secret_key = ACCESS_KEY_STORE[self._access_key]["secret_access_key"]
credentials = Credentials(self._access_key, secret_key)
auth = self._create_auth(credentials)
request = self._create_aws_request()
canonical_request = auth.canonical_request(request)
string_to_sign = auth.string_to_sign(request, canonical_request)
@ -123,6 +123,31 @@ class IAMRequest:
return string.partition(first_separator)[2].partition(second_separator)[0]
class IAMRequest(IAMRequestBase):
def _create_auth(self, credentials):
return SigV4Auth(credentials, self._service, self._region)
def _raise_access_denied(self, iam_user_name):
raise AccessDeniedError(
account_id=ACCOUNT_ID,
iam_user_name=iam_user_name,
action=self._action
)
class S3IAMRequest(IAMRequestBase):
def _create_auth(self, credentials):
return S3SigV4Auth(credentials, self._service, self._region)
def _raise_access_denied(self, _):
if "BucketName" in self._data:
raise BucketAccessDeniedError(bucket=self._data["BucketName"])
else:
raise S3AccessDeniedError()
class IAMPolicy:
def __init__(self, policy):