Merge remote-tracking branch 'spulec/master'
Conflicts: moto/s3/responses.py
This commit is contained in:
commit
3628e40f3c
42 changed files with 1173 additions and 308 deletions
|
|
@ -365,7 +365,7 @@ def test_scan():
|
|||
'Body': 'http://url_to_lolcat.gif',
|
||||
'SentBy': 'User B',
|
||||
'ReceivedTime': '12/9/2011 11:36:03 PM',
|
||||
'Ids': {1, 2, 3},
|
||||
'Ids': set([1, 2, 3]),
|
||||
'PK': 7,
|
||||
}
|
||||
item = table.new_item(
|
||||
|
|
@ -442,7 +442,7 @@ def test_write_batch():
|
|||
'Body': 'http://url_to_lolcat.gif',
|
||||
'SentBy': 'User B',
|
||||
'ReceivedTime': '12/9/2011 11:36:03 PM',
|
||||
'Ids': {1, 2, 3},
|
||||
'Ids': set([1, 2, 3]),
|
||||
'PK': 7,
|
||||
},
|
||||
))
|
||||
|
|
@ -489,7 +489,7 @@ def test_batch_read():
|
|||
'Body': 'http://url_to_lolcat.gif',
|
||||
'SentBy': 'User B',
|
||||
'ReceivedTime': '12/9/2011 11:36:03 PM',
|
||||
'Ids': {1, 2, 3},
|
||||
'Ids': set([1, 2, 3]),
|
||||
'PK': 7,
|
||||
}
|
||||
item = table.new_item(
|
||||
|
|
|
|||
|
|
@ -282,7 +282,7 @@ def test_scan():
|
|||
'Body': 'http://url_to_lolcat.gif',
|
||||
'SentBy': 'User B',
|
||||
'ReceivedTime': '12/9/2011 11:36:03 PM',
|
||||
'Ids': {1, 2, 3},
|
||||
'Ids': set([1, 2, 3]),
|
||||
'PK': 7,
|
||||
}
|
||||
item = table.new_item(
|
||||
|
|
@ -356,7 +356,7 @@ def test_write_batch():
|
|||
'Body': 'http://url_to_lolcat.gif',
|
||||
'SentBy': 'User B',
|
||||
'ReceivedTime': '12/9/2011 11:36:03 PM',
|
||||
'Ids': {1, 2, 3},
|
||||
'Ids': set([1, 2, 3]),
|
||||
'PK': 7,
|
||||
},
|
||||
))
|
||||
|
|
@ -401,7 +401,7 @@ def test_batch_read():
|
|||
'Body': 'http://url_to_lolcat.gif',
|
||||
'SentBy': 'User B',
|
||||
'ReceivedTime': '12/9/2011 11:36:03 PM',
|
||||
'Ids': {1, 2, 3},
|
||||
'Ids': set([1, 2, 3]),
|
||||
'PK': 7,
|
||||
}
|
||||
item = table.new_item(
|
||||
|
|
|
|||
|
|
@ -1,9 +1,194 @@
|
|||
"""Test mocking of Elatic IP Address"""
|
||||
import boto
|
||||
from boto.exception import EC2ResponseError
|
||||
|
||||
import sure # noqa
|
||||
|
||||
from moto import mock_ec2
|
||||
|
||||
import logging
|
||||
import types
|
||||
|
||||
|
||||
@mock_ec2
|
||||
def test_elastic_ip_addresses():
|
||||
pass
|
||||
def test_eip_allocate_classic():
|
||||
"""Allocate/release Classic EIP"""
|
||||
conn = boto.connect_ec2('the_key', 'the_secret')
|
||||
|
||||
standard = conn.allocate_address()
|
||||
standard.should.be.a(boto.ec2.address.Address)
|
||||
standard.public_ip.should.be.a(types.UnicodeType)
|
||||
standard.instance_id.should.be.none
|
||||
standard.domain.should.be.equal("standard")
|
||||
standard.release()
|
||||
standard.should_not.be.within(conn.get_all_addresses())
|
||||
|
||||
|
||||
@mock_ec2
|
||||
def test_eip_allocate_vpc():
|
||||
"""Allocate/release VPC EIP"""
|
||||
conn = boto.connect_ec2('the_key', 'the_secret')
|
||||
|
||||
vpc = conn.allocate_address(domain="vpc")
|
||||
vpc.should.be.a(boto.ec2.address.Address)
|
||||
vpc.domain.should.be.equal("vpc")
|
||||
logging.debug("vpc alloc_id:".format(vpc.allocation_id))
|
||||
vpc.release()
|
||||
|
||||
|
||||
@mock_ec2
|
||||
def test_eip_allocate_invalid_domain():
|
||||
"""Allocate EIP invalid domain"""
|
||||
conn = boto.connect_ec2('the_key', 'the_secret')
|
||||
|
||||
conn.allocate_address.when.called_with(domain="bogus").should.throw(EC2ResponseError)
|
||||
|
||||
|
||||
@mock_ec2
|
||||
def test_eip_associate_classic():
|
||||
"""Associate/Disassociate EIP to classic instance"""
|
||||
conn = boto.connect_ec2('the_key', 'the_secret')
|
||||
|
||||
reservation = conn.run_instances('ami-1234abcd')
|
||||
instance = reservation.instances[0]
|
||||
|
||||
eip = conn.allocate_address()
|
||||
eip.instance_id.should.be.none
|
||||
conn.associate_address.when.called_with(public_ip=eip.public_ip).should.throw(EC2ResponseError)
|
||||
conn.associate_address(instance_id=instance.id, public_ip=eip.public_ip)
|
||||
eip = conn.get_all_addresses(addresses=[eip.public_ip])[0] # no .update() on address ):
|
||||
eip.instance_id.should.be.equal(instance.id)
|
||||
conn.disassociate_address(public_ip=eip.public_ip)
|
||||
eip = conn.get_all_addresses(addresses=[eip.public_ip])[0] # no .update() on address ):
|
||||
eip.instance_id.should.be.equal(u'')
|
||||
eip.release()
|
||||
eip.should_not.be.within(conn.get_all_addresses())
|
||||
eip = None
|
||||
|
||||
instance.terminate()
|
||||
|
||||
@mock_ec2
|
||||
def test_eip_associate_vpc():
|
||||
"""Associate/Disassociate EIP to VPC instance"""
|
||||
conn = boto.connect_ec2('the_key', 'the_secret')
|
||||
|
||||
reservation = conn.run_instances('ami-1234abcd')
|
||||
instance = reservation.instances[0]
|
||||
|
||||
eip = conn.allocate_address(domain='vpc')
|
||||
eip.instance_id.should.be.none
|
||||
conn.associate_address.when.called_with(allocation_id=eip.allocation_id).should.throw(EC2ResponseError)
|
||||
conn.associate_address(instance_id=instance.id, allocation_id=eip.allocation_id)
|
||||
eip = conn.get_all_addresses(addresses=[eip.public_ip])[0] # no .update() on address ):
|
||||
eip.instance_id.should.be.equal(instance.id)
|
||||
conn.disassociate_address(association_id=eip.association_id)
|
||||
eip = conn.get_all_addresses(addresses=[eip.public_ip])[0] # no .update() on address ):
|
||||
eip.instance_id.should.be.equal(u'')
|
||||
eip.association_id.should.be.none
|
||||
eip.release()
|
||||
eip = None
|
||||
|
||||
instance.terminate()
|
||||
|
||||
@mock_ec2
|
||||
def test_eip_reassociate():
|
||||
"""reassociate EIP"""
|
||||
conn = boto.connect_ec2('the_key', 'the_secret')
|
||||
|
||||
reservation = conn.run_instances('ami-1234abcd')
|
||||
instance = reservation.instances[0]
|
||||
|
||||
eip = conn.allocate_address()
|
||||
conn.associate_address(instance_id=instance.id, public_ip=eip.public_ip)
|
||||
conn.associate_address.when.called_with(instance_id=instance.id, public_ip=eip.public_ip, allow_reassociation=False).should.throw(EC2ResponseError)
|
||||
conn.associate_address.when.called_with(instance_id=instance.id, public_ip=eip.public_ip, allow_reassociation=True).should_not.throw(EC2ResponseError)
|
||||
eip.release()
|
||||
eip = None
|
||||
|
||||
instance.terminate()
|
||||
|
||||
@mock_ec2
|
||||
def test_eip_associate_invalid_args():
|
||||
"""Associate EIP, invalid args """
|
||||
conn = boto.connect_ec2('the_key', 'the_secret')
|
||||
|
||||
reservation = conn.run_instances('ami-1234abcd')
|
||||
instance = reservation.instances[0]
|
||||
|
||||
eip = conn.allocate_address()
|
||||
conn.associate_address.when.called_with(instance_id=instance.id).should.throw(EC2ResponseError)
|
||||
|
||||
instance.terminate()
|
||||
|
||||
|
||||
@mock_ec2
|
||||
def test_eip_disassociate_bogus_association():
|
||||
"""Disassociate bogus EIP"""
|
||||
conn = boto.connect_ec2('the_key', 'the_secret')
|
||||
conn.disassociate_address.when.called_with(association_id="bogus").should.throw(EC2ResponseError)
|
||||
|
||||
@mock_ec2
|
||||
def test_eip_release_bogus_eip():
|
||||
"""Release bogus EIP"""
|
||||
conn = boto.connect_ec2('the_key', 'the_secret')
|
||||
conn.release_address.when.called_with(allocation_id="bogus").should.throw(EC2ResponseError)
|
||||
|
||||
|
||||
@mock_ec2
|
||||
def test_eip_disassociate_arg_error():
|
||||
"""Invalid arguments disassociate address"""
|
||||
conn = boto.connect_ec2('the_key', 'the_secret')
|
||||
conn.disassociate_address.when.called_with().should.throw(EC2ResponseError)
|
||||
|
||||
|
||||
@mock_ec2
|
||||
def test_eip_release_arg_error():
|
||||
"""Invalid arguments release address"""
|
||||
conn = boto.connect_ec2('the_key', 'the_secret')
|
||||
conn.release_address.when.called_with().should.throw(EC2ResponseError)
|
||||
|
||||
|
||||
@mock_ec2
|
||||
def test_eip_describe():
|
||||
"""Listing of allocated Elastic IP Addresses."""
|
||||
conn = boto.connect_ec2('the_key', 'the_secret')
|
||||
eips = []
|
||||
number_of_classic_ips = 2
|
||||
number_of_vpc_ips = 2
|
||||
|
||||
#allocate some IPs
|
||||
for _ in range(number_of_classic_ips):
|
||||
eips.append(conn.allocate_address())
|
||||
for _ in range(number_of_vpc_ips):
|
||||
eips.append(conn.allocate_address(domain='vpc'))
|
||||
len(eips).should.be.equal(number_of_classic_ips + number_of_vpc_ips)
|
||||
|
||||
# Can we find each one individually?
|
||||
for eip in eips:
|
||||
if eip.allocation_id:
|
||||
lookup_addresses = conn.get_all_addresses(allocation_ids=[eip.allocation_id])
|
||||
else:
|
||||
lookup_addresses = conn.get_all_addresses(addresses=[eip.public_ip])
|
||||
len(lookup_addresses).should.be.equal(1)
|
||||
lookup_addresses[0].public_ip.should.be.equal(eip.public_ip)
|
||||
|
||||
# Can we find first two when we search for them?
|
||||
lookup_addresses = conn.get_all_addresses(addresses=[eips[0].public_ip, eips[1].public_ip])
|
||||
len(lookup_addresses).should.be.equal(2)
|
||||
lookup_addresses[0].public_ip.should.be.equal(eips[0].public_ip)
|
||||
lookup_addresses[1].public_ip.should.be.equal(eips[1].public_ip)
|
||||
|
||||
#Release all IPs
|
||||
for eip in eips:
|
||||
eip.release()
|
||||
len(conn.get_all_addresses()).should.be.equal(0)
|
||||
|
||||
|
||||
@mock_ec2
|
||||
def test_eip_describe_none():
|
||||
"""Find nothing when seach for bogus IP"""
|
||||
conn = boto.connect_ec2('the_key', 'the_secret')
|
||||
lookup_addresses = conn.get_all_addresses(addresses=["256.256.256.256"])
|
||||
len(lookup_addresses).should.be.equal(0)
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -35,6 +35,7 @@ def test_instance_launch_and_terminate():
|
|||
reservation.should.be.a(Reservation)
|
||||
reservation.instances.should.have.length_of(1)
|
||||
instance = reservation.instances[0]
|
||||
instance.state.should.equal('pending')
|
||||
|
||||
reservations = conn.get_all_instances()
|
||||
reservations.should.have.length_of(1)
|
||||
|
|
@ -42,13 +43,13 @@ def test_instance_launch_and_terminate():
|
|||
instances = reservations[0].instances
|
||||
instances.should.have.length_of(1)
|
||||
instances[0].id.should.equal(instance.id)
|
||||
instances[0].state.should.equal('pending')
|
||||
instances[0].state.should.equal('running')
|
||||
|
||||
conn.terminate_instances([instances[0].id])
|
||||
|
||||
reservations = conn.get_all_instances()
|
||||
instance = reservations[0].instances[0]
|
||||
instance.state.should.equal('shutting-down')
|
||||
instance.state.should.equal('terminated')
|
||||
|
||||
|
||||
@mock_ec2
|
||||
|
|
@ -85,18 +86,18 @@ def test_get_instances_filtering_by_state():
|
|||
|
||||
conn.terminate_instances([instance1.id])
|
||||
|
||||
reservations = conn.get_all_instances(filters={'instance-state-name': 'pending'})
|
||||
reservations = conn.get_all_instances(filters={'instance-state-name': 'running'})
|
||||
reservations.should.have.length_of(1)
|
||||
# Since we terminated instance1, only instance2 and instance3 should be returned
|
||||
instance_ids = [instance.id for instance in reservations[0].instances]
|
||||
set(instance_ids).should.equal(set([instance2.id, instance3.id]))
|
||||
|
||||
reservations = conn.get_all_instances([instance2.id], filters={'instance-state-name': 'pending'})
|
||||
reservations = conn.get_all_instances([instance2.id], filters={'instance-state-name': 'running'})
|
||||
reservations.should.have.length_of(1)
|
||||
instance_ids = [instance.id for instance in reservations[0].instances]
|
||||
instance_ids.should.equal([instance2.id])
|
||||
|
||||
reservations = conn.get_all_instances([instance2.id], filters={'instance-state-name': 'terminating'})
|
||||
reservations = conn.get_all_instances([instance2.id], filters={'instance-state-name': 'terminated'})
|
||||
list(reservations).should.equal([])
|
||||
|
||||
# get_all_instances should still return all 3
|
||||
|
|
|
|||
50
tests/test_s3bucket_path/test_bucket_path_server.py
Normal file
50
tests/test_s3bucket_path/test_bucket_path_server.py
Normal file
|
|
@ -0,0 +1,50 @@
|
|||
import sure # noqa
|
||||
|
||||
import moto.server as server
|
||||
|
||||
'''
|
||||
Test the different server responses
|
||||
'''
|
||||
server.configure_urls("s3bucket_path")
|
||||
|
||||
|
||||
def test_s3_server_get():
|
||||
test_client = server.app.test_client()
|
||||
res = test_client.get('/')
|
||||
|
||||
res.data.should.contain('ListAllMyBucketsResult')
|
||||
|
||||
|
||||
def test_s3_server_bucket_create():
|
||||
test_client = server.app.test_client()
|
||||
res = test_client.put('/foobar', 'http://localhost:5000')
|
||||
res.status_code.should.equal(200)
|
||||
|
||||
res = test_client.get('/')
|
||||
res.data.should.contain('<Name>foobar</Name>')
|
||||
|
||||
res = test_client.get('/foobar', 'http://localhost:5000')
|
||||
res.status_code.should.equal(200)
|
||||
res.data.should.contain("ListBucketResult")
|
||||
|
||||
res = test_client.put('/foobar/bar', 'http://localhost:5000', data='test value')
|
||||
res.status_code.should.equal(200)
|
||||
|
||||
res = test_client.get('/foobar/bar', 'http://localhost:5000')
|
||||
res.status_code.should.equal(200)
|
||||
res.data.should.equal("test value")
|
||||
|
||||
|
||||
def test_s3_server_post_to_bucket():
|
||||
test_client = server.app.test_client()
|
||||
res = test_client.put('/foobar', 'http://localhost:5000/')
|
||||
res.status_code.should.equal(200)
|
||||
|
||||
test_client.post('/foobar', "https://localhost:5000/", data={
|
||||
'key': 'the-key',
|
||||
'file': 'nothing'
|
||||
})
|
||||
|
||||
res = test_client.get('/foobar/the-key', 'http://localhost:5000/')
|
||||
res.status_code.should.equal(200)
|
||||
res.data.should.equal("nothing")
|
||||
281
tests/test_s3bucket_path/test_s3bucket_path.py
Normal file
281
tests/test_s3bucket_path/test_s3bucket_path.py
Normal file
|
|
@ -0,0 +1,281 @@
|
|||
import urllib2
|
||||
|
||||
import boto
|
||||
from boto.exception import S3ResponseError
|
||||
from boto.s3.key import Key
|
||||
from boto.s3.connection import OrdinaryCallingFormat
|
||||
|
||||
from freezegun import freeze_time
|
||||
import requests
|
||||
|
||||
import sure # noqa
|
||||
|
||||
from moto import mock_s3bucket_path
|
||||
|
||||
|
||||
def create_connection(key=None, secret=None):
|
||||
return boto.connect_s3(key, secret, calling_format=OrdinaryCallingFormat())
|
||||
|
||||
|
||||
class MyModel(object):
|
||||
def __init__(self, name, value):
|
||||
self.name = name
|
||||
self.value = value
|
||||
|
||||
def save(self):
|
||||
conn = create_connection('the_key', 'the_secret')
|
||||
bucket = conn.get_bucket('mybucket')
|
||||
k = Key(bucket)
|
||||
k.key = self.name
|
||||
k.set_contents_from_string(self.value)
|
||||
|
||||
|
||||
@mock_s3bucket_path
|
||||
def test_my_model_save():
|
||||
# Create Bucket so that test can run
|
||||
conn = create_connection('the_key', 'the_secret')
|
||||
conn.create_bucket('mybucket')
|
||||
####################################
|
||||
|
||||
model_instance = MyModel('steve', 'is awesome')
|
||||
model_instance.save()
|
||||
|
||||
conn.get_bucket('mybucket').get_key('steve').get_contents_as_string().should.equal('is awesome')
|
||||
|
||||
|
||||
@mock_s3bucket_path
|
||||
def test_missing_key():
|
||||
conn = create_connection('the_key', 'the_secret')
|
||||
bucket = conn.create_bucket("foobar")
|
||||
bucket.get_key("the-key").should.equal(None)
|
||||
|
||||
|
||||
@mock_s3bucket_path
|
||||
def test_missing_key_urllib2():
|
||||
conn = create_connection('the_key', 'the_secret')
|
||||
conn.create_bucket("foobar")
|
||||
|
||||
urllib2.urlopen.when.called_with("http://s3.amazonaws.com/foobar/the-key").should.throw(urllib2.HTTPError)
|
||||
|
||||
|
||||
@mock_s3bucket_path
|
||||
def test_empty_key():
|
||||
conn = create_connection('the_key', 'the_secret')
|
||||
bucket = conn.create_bucket("foobar")
|
||||
key = Key(bucket)
|
||||
key.key = "the-key"
|
||||
key.set_contents_from_string("")
|
||||
|
||||
bucket.get_key("the-key").get_contents_as_string().should.equal('')
|
||||
|
||||
|
||||
@mock_s3bucket_path
|
||||
def test_empty_key_set_on_existing_key():
|
||||
conn = create_connection('the_key', 'the_secret')
|
||||
bucket = conn.create_bucket("foobar")
|
||||
key = Key(bucket)
|
||||
key.key = "the-key"
|
||||
key.set_contents_from_string("foobar")
|
||||
|
||||
bucket.get_key("the-key").get_contents_as_string().should.equal('foobar')
|
||||
|
||||
key.set_contents_from_string("")
|
||||
bucket.get_key("the-key").get_contents_as_string().should.equal('')
|
||||
|
||||
|
||||
@mock_s3bucket_path
|
||||
def test_large_key_save():
|
||||
conn = create_connection('the_key', 'the_secret')
|
||||
bucket = conn.create_bucket("foobar")
|
||||
key = Key(bucket)
|
||||
key.key = "the-key"
|
||||
key.set_contents_from_string("foobar" * 100000)
|
||||
|
||||
bucket.get_key("the-key").get_contents_as_string().should.equal('foobar' * 100000)
|
||||
|
||||
|
||||
@mock_s3bucket_path
|
||||
def test_copy_key():
|
||||
conn = create_connection('the_key', 'the_secret')
|
||||
bucket = conn.create_bucket("foobar")
|
||||
key = Key(bucket)
|
||||
key.key = "the-key"
|
||||
key.set_contents_from_string("some value")
|
||||
|
||||
bucket.copy_key('new-key', 'foobar', 'the-key')
|
||||
|
||||
bucket.get_key("the-key").get_contents_as_string().should.equal("some value")
|
||||
bucket.get_key("new-key").get_contents_as_string().should.equal("some value")
|
||||
|
||||
|
||||
@mock_s3bucket_path
|
||||
def test_set_metadata():
|
||||
conn = create_connection('the_key', 'the_secret')
|
||||
bucket = conn.create_bucket("foobar")
|
||||
key = Key(bucket)
|
||||
key.key = 'the-key'
|
||||
key.set_metadata('md', 'Metadatastring')
|
||||
key.set_contents_from_string("Testval")
|
||||
|
||||
bucket.get_key('the-key').get_metadata('md').should.equal('Metadatastring')
|
||||
|
||||
|
||||
@freeze_time("2012-01-01 12:00:00")
|
||||
@mock_s3bucket_path
|
||||
def test_last_modified():
|
||||
# See https://github.com/boto/boto/issues/466
|
||||
conn = create_connection()
|
||||
bucket = conn.create_bucket("foobar")
|
||||
key = Key(bucket)
|
||||
key.key = "the-key"
|
||||
key.set_contents_from_string("some value")
|
||||
|
||||
rs = bucket.get_all_keys()
|
||||
rs[0].last_modified.should.equal('2012-01-01T12:00:00Z')
|
||||
|
||||
bucket.get_key("the-key").last_modified.should.equal('Sun, 01 Jan 2012 12:00:00 GMT')
|
||||
|
||||
|
||||
@mock_s3bucket_path
|
||||
def test_missing_bucket():
|
||||
conn = create_connection('the_key', 'the_secret')
|
||||
conn.get_bucket.when.called_with('mybucket').should.throw(S3ResponseError)
|
||||
|
||||
|
||||
@mock_s3bucket_path
|
||||
def test_bucket_with_dash():
|
||||
conn = create_connection('the_key', 'the_secret')
|
||||
conn.get_bucket.when.called_with('mybucket-test').should.throw(S3ResponseError)
|
||||
|
||||
|
||||
@mock_s3bucket_path
|
||||
def test_bucket_deletion():
|
||||
conn = create_connection('the_key', 'the_secret')
|
||||
bucket = conn.create_bucket("foobar")
|
||||
|
||||
key = Key(bucket)
|
||||
key.key = "the-key"
|
||||
key.set_contents_from_string("some value")
|
||||
|
||||
# Try to delete a bucket that still has keys
|
||||
conn.delete_bucket.when.called_with("foobar").should.throw(S3ResponseError)
|
||||
|
||||
bucket.delete_key("the-key")
|
||||
conn.delete_bucket("foobar")
|
||||
|
||||
# Get non-existing bucket
|
||||
conn.get_bucket.when.called_with("foobar").should.throw(S3ResponseError)
|
||||
|
||||
# Delete non-existant bucket
|
||||
conn.delete_bucket.when.called_with("foobar").should.throw(S3ResponseError)
|
||||
|
||||
|
||||
@mock_s3bucket_path
|
||||
def test_get_all_buckets():
|
||||
conn = create_connection('the_key', 'the_secret')
|
||||
conn.create_bucket("foobar")
|
||||
conn.create_bucket("foobar2")
|
||||
buckets = conn.get_all_buckets()
|
||||
|
||||
buckets.should.have.length_of(2)
|
||||
|
||||
|
||||
@mock_s3bucket_path
|
||||
def test_post_to_bucket():
|
||||
conn = create_connection('the_key', 'the_secret')
|
||||
bucket = conn.create_bucket("foobar")
|
||||
|
||||
requests.post("https://s3.amazonaws.com/foobar", {
|
||||
'key': 'the-key',
|
||||
'file': 'nothing'
|
||||
})
|
||||
|
||||
bucket.get_key('the-key').get_contents_as_string().should.equal('nothing')
|
||||
|
||||
|
||||
@mock_s3bucket_path
|
||||
def test_post_with_metadata_to_bucket():
|
||||
conn = create_connection('the_key', 'the_secret')
|
||||
bucket = conn.create_bucket("foobar")
|
||||
|
||||
requests.post("https://s3.amazonaws.com/foobar", {
|
||||
'key': 'the-key',
|
||||
'file': 'nothing',
|
||||
'x-amz-meta-test': 'metadata'
|
||||
})
|
||||
|
||||
bucket.get_key('the-key').get_metadata('test').should.equal('metadata')
|
||||
|
||||
|
||||
@mock_s3bucket_path
|
||||
def test_bucket_method_not_implemented():
|
||||
requests.patch.when.called_with("https://s3.amazonaws.com/foobar").should.throw(NotImplementedError)
|
||||
|
||||
|
||||
@mock_s3bucket_path
|
||||
def test_key_method_not_implemented():
|
||||
requests.post.when.called_with("https://s3.amazonaws.com/foobar/foo").should.throw(NotImplementedError)
|
||||
|
||||
|
||||
@mock_s3bucket_path
|
||||
def test_bucket_name_with_dot():
|
||||
conn = create_connection()
|
||||
bucket = conn.create_bucket('firstname.lastname')
|
||||
|
||||
k = Key(bucket, 'somekey')
|
||||
k.set_contents_from_string('somedata')
|
||||
|
||||
|
||||
@mock_s3bucket_path
|
||||
def test_key_with_special_characters():
|
||||
conn = create_connection()
|
||||
bucket = conn.create_bucket('test_bucket_name')
|
||||
|
||||
key = Key(bucket, 'test_list_keys_2/x?y')
|
||||
key.set_contents_from_string('value1')
|
||||
|
||||
key_list = bucket.list('test_list_keys_2/', '/')
|
||||
keys = [x for x in key_list]
|
||||
keys[0].name.should.equal("test_list_keys_2/x?y")
|
||||
|
||||
|
||||
@mock_s3bucket_path
|
||||
def test_bucket_key_listing_order():
|
||||
conn = create_connection()
|
||||
bucket = conn.create_bucket('test_bucket')
|
||||
prefix = 'toplevel/'
|
||||
|
||||
def store(name):
|
||||
k = Key(bucket, prefix + name)
|
||||
k.set_contents_from_string('somedata')
|
||||
|
||||
names = ['x/key', 'y.key1', 'y.key2', 'y.key3', 'x/y/key', 'x/y/z/key']
|
||||
|
||||
for name in names:
|
||||
store(name)
|
||||
|
||||
delimiter = None
|
||||
keys = [x.name for x in bucket.list(prefix, delimiter)]
|
||||
keys.should.equal([
|
||||
'toplevel/x/key', 'toplevel/x/y/key', 'toplevel/x/y/z/key',
|
||||
'toplevel/y.key1', 'toplevel/y.key2', 'toplevel/y.key3'
|
||||
])
|
||||
|
||||
delimiter = '/'
|
||||
keys = [x.name for x in bucket.list(prefix, delimiter)]
|
||||
keys.should.equal([
|
||||
'toplevel/y.key1', 'toplevel/y.key2', 'toplevel/y.key3', 'toplevel/x/'
|
||||
])
|
||||
|
||||
# Test delimiter with no prefix
|
||||
delimiter = '/'
|
||||
keys = [x.name for x in bucket.list(prefix=None, delimiter=delimiter)]
|
||||
keys.should.equal(['toplevel'])
|
||||
|
||||
delimiter = None
|
||||
keys = [x.name for x in bucket.list(prefix + 'x', delimiter)]
|
||||
keys.should.equal([u'toplevel/x/key', u'toplevel/x/y/key', u'toplevel/x/y/z/key'])
|
||||
|
||||
delimiter = '/'
|
||||
keys = [x.name for x in bucket.list(prefix + 'x', delimiter)]
|
||||
keys.should.equal([u'toplevel/x/'])
|
||||
14
tests/test_s3bucket_path/test_s3bucket_path_utils.py
Normal file
14
tests/test_s3bucket_path/test_s3bucket_path_utils.py
Normal file
|
|
@ -0,0 +1,14 @@
|
|||
from sure import expect
|
||||
from moto.s3bucket_path.utils import bucket_name_from_url
|
||||
|
||||
|
||||
def test_base_url():
|
||||
expect(bucket_name_from_url('https://s3.amazonaws.com/')).should.equal(None)
|
||||
|
||||
|
||||
def test_localhost_bucket():
|
||||
expect(bucket_name_from_url('https://localhost:5000/wfoobar/abc')).should.equal("wfoobar")
|
||||
|
||||
|
||||
def test_localhost_without_bucket():
|
||||
expect(bucket_name_from_url('https://www.localhost:5000')).should.equal(None)
|
||||
Loading…
Add table
Add a link
Reference in a new issue