Merge pull request #1151 from JackDanger/jack/enforce-s3-acls

enforce s3 acls
This commit is contained in:
Jack Danger 2017-09-16 20:16:54 -07:00 committed by GitHub
commit 2937cf4c45
5 changed files with 119 additions and 23 deletions

View file

@ -16,6 +16,7 @@ import boto3
from botocore.client import ClientError
import botocore.exceptions
from boto.exception import S3CreateError, S3ResponseError
from botocore.handlers import disable_signing
from boto.s3.connection import S3Connection
from boto.s3.key import Key
from freezegun import freeze_time
@ -864,6 +865,45 @@ def test_bucket_acl_switching():
g.permission == 'READ' for g in grants), grants
@mock_s3
def test_s3_object_in_public_bucket():
s3 = boto3.resource('s3')
bucket = s3.Bucket('test-bucket')
bucket.create(ACL='public-read')
bucket.put_object(ACL='public-read', Body=b'ABCD', Key='file.txt')
s3_anonymous = boto3.resource('s3')
s3_anonymous.meta.client.meta.events.register('choose-signer.s3.*', disable_signing)
contents = s3_anonymous.Object(key='file.txt', bucket_name='test-bucket').get()['Body'].read()
contents.should.equal(b'ABCD')
bucket.put_object(ACL='private', Body=b'ABCD', Key='file.txt')
with assert_raises(ClientError) as exc:
s3_anonymous.Object(key='file.txt', bucket_name='test-bucket').get()
exc.exception.response['Error']['Code'].should.equal('403')
@mock_s3
def test_s3_object_in_private_bucket():
s3 = boto3.resource('s3')
bucket = s3.Bucket('test-bucket')
bucket.create(ACL='private')
bucket.put_object(ACL='private', Body=b'ABCD', Key='file.txt')
s3_anonymous = boto3.resource('s3')
s3_anonymous.meta.client.meta.events.register('choose-signer.s3.*', disable_signing)
with assert_raises(ClientError) as exc:
s3_anonymous.Object(key='file.txt', bucket_name='test-bucket').get()
exc.exception.response['Error']['Code'].should.equal('403')
bucket.put_object(ACL='public-read', Body=b'ABCD', Key='file.txt')
contents = s3_anonymous.Object(key='file.txt', bucket_name='test-bucket').get()['Body'].read()
contents.should.equal(b'ABCD')
@mock_s3_deprecated
def test_unicode_key():
conn = boto.connect_s3()

View file

@ -3,6 +3,7 @@
from __future__ import unicode_literals
import sure # noqa
from flask.testing import FlaskClient
import moto.server as server
'''
@ -10,18 +11,28 @@ Test the different server responses
'''
def test_s3_server_get():
backend = server.create_backend_app("s3")
test_client = backend.test_client()
class AuthenticatedClient(FlaskClient):
def open(self, *args, **kwargs):
kwargs['headers'] = kwargs.get('headers', {})
kwargs['headers']['Authorization'] = "Any authorization header"
return super(AuthenticatedClient, self).open(*args, **kwargs)
def authenticated_client():
backend = server.create_backend_app("s3")
backend.test_client_class = AuthenticatedClient
return backend.test_client()
def test_s3_server_get():
test_client = authenticated_client()
res = test_client.get('/')
res.data.should.contain(b'ListAllMyBucketsResult')
def test_s3_server_bucket_create():
backend = server.create_backend_app("s3")
test_client = backend.test_client()
test_client = authenticated_client()
res = test_client.put('/', 'http://foobaz.localhost:5000/')
res.status_code.should.equal(200)
@ -44,8 +55,7 @@ def test_s3_server_bucket_create():
def test_s3_server_bucket_versioning():
backend = server.create_backend_app("s3")
test_client = backend.test_client()
test_client = authenticated_client()
# Just enough XML to enable versioning
body = '<Status>Enabled</Status>'
@ -55,8 +65,7 @@ def test_s3_server_bucket_versioning():
def test_s3_server_post_to_bucket():
backend = server.create_backend_app("s3")
test_client = backend.test_client()
test_client = authenticated_client()
res = test_client.put('/', 'http://tester.localhost:5000/')
res.status_code.should.equal(200)
@ -72,8 +81,7 @@ def test_s3_server_post_to_bucket():
def test_s3_server_post_without_content_length():
backend = server.create_backend_app("s3")
test_client = backend.test_client()
test_client = authenticated_client()
res = test_client.put('/', 'http://tester.localhost:5000/', environ_overrides={'CONTENT_LENGTH': ''})
res.status_code.should.equal(411)

View file

@ -1,6 +1,7 @@
from __future__ import unicode_literals
import sure # noqa
from flask.testing import FlaskClient
import moto.server as server
'''
@ -8,9 +9,21 @@ Test the different server responses
'''
def test_s3_server_get():
class AuthenticatedClient(FlaskClient):
def open(self, *args, **kwargs):
kwargs['headers'] = kwargs.get('headers', {})
kwargs['headers']['Authorization'] = "Any authorization header"
return super(AuthenticatedClient, self).open(*args, **kwargs)
def authenticated_client():
backend = server.create_backend_app("s3bucket_path")
test_client = backend.test_client()
backend.test_client_class = AuthenticatedClient
return backend.test_client()
def test_s3_server_get():
test_client = authenticated_client()
res = test_client.get('/')
@ -18,8 +31,7 @@ def test_s3_server_get():
def test_s3_server_bucket_create():
backend = server.create_backend_app("s3bucket_path")
test_client = backend.test_client()
test_client = authenticated_client()
res = test_client.put('/foobar', 'http://localhost:5000')
res.status_code.should.equal(200)
@ -54,8 +66,7 @@ def test_s3_server_bucket_create():
def test_s3_server_post_to_bucket():
backend = server.create_backend_app("s3bucket_path")
test_client = backend.test_client()
test_client = authenticated_client()
res = test_client.put('/foobar2', 'http://localhost:5000/')
res.status_code.should.equal(200)
@ -71,8 +82,7 @@ def test_s3_server_post_to_bucket():
def test_s3_server_put_ipv6():
backend = server.create_backend_app("s3bucket_path")
test_client = backend.test_client()
test_client = authenticated_client()
res = test_client.put('/foobar2', 'http://[::]:5000/')
res.status_code.should.equal(200)
@ -88,8 +98,7 @@ def test_s3_server_put_ipv6():
def test_s3_server_put_ipv4():
backend = server.create_backend_app("s3bucket_path")
test_client = backend.test_client()
test_client = authenticated_client()
res = test_client.put('/foobar2', 'http://127.0.0.1:5000/')
res.status_code.should.equal(200)