This commit is contained in:
Stephan 2018-12-21 12:28:56 +01:00
commit e51d1bfade
172 changed files with 49629 additions and 49629 deletions

File diff suppressed because it is too large Load diff

View file

@ -1,387 +1,387 @@
from __future__ import unicode_literals
import boto
import boto3
from boto.exception import S3ResponseError
from boto.s3.lifecycle import Lifecycle, Transition, Expiration, Rule
import sure # noqa
from botocore.exceptions import ClientError
from datetime import datetime
from nose.tools import assert_raises
from moto import mock_s3_deprecated, mock_s3
@mock_s3_deprecated
def test_lifecycle_create():
conn = boto.s3.connect_to_region("us-west-1")
bucket = conn.create_bucket("foobar")
lifecycle = Lifecycle()
lifecycle.add_rule('myid', '', 'Enabled', 30)
bucket.configure_lifecycle(lifecycle)
response = bucket.get_lifecycle_config()
len(response).should.equal(1)
lifecycle = response[0]
lifecycle.id.should.equal('myid')
lifecycle.prefix.should.equal('')
lifecycle.status.should.equal('Enabled')
list(lifecycle.transition).should.equal([])
@mock_s3
def test_lifecycle_with_filters():
client = boto3.client("s3")
client.create_bucket(Bucket="bucket")
# Create a lifecycle rule with a Filter (no tags):
lfc = {
"Rules": [
{
"Expiration": {
"Days": 7
},
"ID": "wholebucket",
"Filter": {
"Prefix": ""
},
"Status": "Enabled"
}
]
}
client.put_bucket_lifecycle_configuration(Bucket="bucket", LifecycleConfiguration=lfc)
result = client.get_bucket_lifecycle_configuration(Bucket="bucket")
assert len(result["Rules"]) == 1
assert result["Rules"][0]["Filter"]["Prefix"] == ''
assert not result["Rules"][0]["Filter"].get("And")
assert not result["Rules"][0]["Filter"].get("Tag")
with assert_raises(KeyError):
assert result["Rules"][0]["Prefix"]
# With a tag:
lfc["Rules"][0]["Filter"]["Tag"] = {
"Key": "mytag",
"Value": "mytagvalue"
}
client.put_bucket_lifecycle_configuration(Bucket="bucket", LifecycleConfiguration=lfc)
result = client.get_bucket_lifecycle_configuration(Bucket="bucket")
assert len(result["Rules"]) == 1
assert result["Rules"][0]["Filter"]["Prefix"] == ''
assert not result["Rules"][0]["Filter"].get("And")
assert result["Rules"][0]["Filter"]["Tag"]["Key"] == "mytag"
assert result["Rules"][0]["Filter"]["Tag"]["Value"] == "mytagvalue"
with assert_raises(KeyError):
assert result["Rules"][0]["Prefix"]
# With And (single tag):
lfc["Rules"][0]["Filter"]["And"] = {
"Prefix": "some/prefix",
"Tags": [
{
"Key": "mytag",
"Value": "mytagvalue"
}
]
}
client.put_bucket_lifecycle_configuration(Bucket="bucket", LifecycleConfiguration=lfc)
result = client.get_bucket_lifecycle_configuration(Bucket="bucket")
assert len(result["Rules"]) == 1
assert result["Rules"][0]["Filter"]["Prefix"] == ""
assert result["Rules"][0]["Filter"]["And"]["Prefix"] == "some/prefix"
assert len(result["Rules"][0]["Filter"]["And"]["Tags"]) == 1
assert result["Rules"][0]["Filter"]["And"]["Tags"][0]["Key"] == "mytag"
assert result["Rules"][0]["Filter"]["And"]["Tags"][0]["Value"] == "mytagvalue"
assert result["Rules"][0]["Filter"]["Tag"]["Key"] == "mytag"
assert result["Rules"][0]["Filter"]["Tag"]["Value"] == "mytagvalue"
with assert_raises(KeyError):
assert result["Rules"][0]["Prefix"]
# With multiple And tags:
lfc["Rules"][0]["Filter"]["And"] = {
"Prefix": "some/prefix",
"Tags": [
{
"Key": "mytag",
"Value": "mytagvalue"
},
{
"Key": "mytag2",
"Value": "mytagvalue2"
}
]
}
client.put_bucket_lifecycle_configuration(Bucket="bucket", LifecycleConfiguration=lfc)
result = client.get_bucket_lifecycle_configuration(Bucket="bucket")
assert len(result["Rules"]) == 1
assert result["Rules"][0]["Filter"]["Prefix"] == ""
assert result["Rules"][0]["Filter"]["And"]["Prefix"] == "some/prefix"
assert len(result["Rules"][0]["Filter"]["And"]["Tags"]) == 2
assert result["Rules"][0]["Filter"]["And"]["Tags"][0]["Key"] == "mytag"
assert result["Rules"][0]["Filter"]["And"]["Tags"][0]["Value"] == "mytagvalue"
assert result["Rules"][0]["Filter"]["Tag"]["Key"] == "mytag"
assert result["Rules"][0]["Filter"]["Tag"]["Value"] == "mytagvalue"
assert result["Rules"][0]["Filter"]["And"]["Tags"][1]["Key"] == "mytag2"
assert result["Rules"][0]["Filter"]["And"]["Tags"][1]["Value"] == "mytagvalue2"
assert result["Rules"][0]["Filter"]["Tag"]["Key"] == "mytag"
assert result["Rules"][0]["Filter"]["Tag"]["Value"] == "mytagvalue"
with assert_raises(KeyError):
assert result["Rules"][0]["Prefix"]
# Can't have both filter and prefix:
lfc["Rules"][0]["Prefix"] = ''
with assert_raises(ClientError) as err:
client.put_bucket_lifecycle_configuration(Bucket="bucket", LifecycleConfiguration=lfc)
assert err.exception.response["Error"]["Code"] == "MalformedXML"
lfc["Rules"][0]["Prefix"] = 'some/path'
with assert_raises(ClientError) as err:
client.put_bucket_lifecycle_configuration(Bucket="bucket", LifecycleConfiguration=lfc)
assert err.exception.response["Error"]["Code"] == "MalformedXML"
# No filters -- just a prefix:
del lfc["Rules"][0]["Filter"]
client.put_bucket_lifecycle_configuration(Bucket="bucket", LifecycleConfiguration=lfc)
result = client.get_bucket_lifecycle_configuration(Bucket="bucket")
assert not result["Rules"][0].get("Filter")
assert result["Rules"][0]["Prefix"] == "some/path"
@mock_s3
def test_lifecycle_with_eodm():
client = boto3.client("s3")
client.create_bucket(Bucket="bucket")
lfc = {
"Rules": [
{
"Expiration": {
"ExpiredObjectDeleteMarker": True
},
"ID": "wholebucket",
"Filter": {
"Prefix": ""
},
"Status": "Enabled"
}
]
}
client.put_bucket_lifecycle_configuration(Bucket="bucket", LifecycleConfiguration=lfc)
result = client.get_bucket_lifecycle_configuration(Bucket="bucket")
assert len(result["Rules"]) == 1
assert result["Rules"][0]["Expiration"]["ExpiredObjectDeleteMarker"]
# Set to False:
lfc["Rules"][0]["Expiration"]["ExpiredObjectDeleteMarker"] = False
client.put_bucket_lifecycle_configuration(Bucket="bucket", LifecycleConfiguration=lfc)
result = client.get_bucket_lifecycle_configuration(Bucket="bucket")
assert len(result["Rules"]) == 1
assert not result["Rules"][0]["Expiration"]["ExpiredObjectDeleteMarker"]
# With failure:
lfc["Rules"][0]["Expiration"]["Days"] = 7
with assert_raises(ClientError) as err:
client.put_bucket_lifecycle_configuration(Bucket="bucket", LifecycleConfiguration=lfc)
assert err.exception.response["Error"]["Code"] == "MalformedXML"
del lfc["Rules"][0]["Expiration"]["Days"]
lfc["Rules"][0]["Expiration"]["Date"] = datetime(2015, 1, 1)
with assert_raises(ClientError) as err:
client.put_bucket_lifecycle_configuration(Bucket="bucket", LifecycleConfiguration=lfc)
assert err.exception.response["Error"]["Code"] == "MalformedXML"
@mock_s3
def test_lifecycle_with_nve():
client = boto3.client("s3")
client.create_bucket(Bucket="bucket")
lfc = {
"Rules": [
{
"NoncurrentVersionExpiration": {
"NoncurrentDays": 30
},
"ID": "wholebucket",
"Filter": {
"Prefix": ""
},
"Status": "Enabled"
}
]
}
client.put_bucket_lifecycle_configuration(Bucket="bucket", LifecycleConfiguration=lfc)
result = client.get_bucket_lifecycle_configuration(Bucket="bucket")
assert len(result["Rules"]) == 1
assert result["Rules"][0]["NoncurrentVersionExpiration"]["NoncurrentDays"] == 30
# Change NoncurrentDays:
lfc["Rules"][0]["NoncurrentVersionExpiration"]["NoncurrentDays"] = 10
client.put_bucket_lifecycle_configuration(Bucket="bucket", LifecycleConfiguration=lfc)
result = client.get_bucket_lifecycle_configuration(Bucket="bucket")
assert len(result["Rules"]) == 1
assert result["Rules"][0]["NoncurrentVersionExpiration"]["NoncurrentDays"] == 10
# TODO: Add test for failures due to missing children
@mock_s3
def test_lifecycle_with_nvt():
client = boto3.client("s3")
client.create_bucket(Bucket="bucket")
lfc = {
"Rules": [
{
"NoncurrentVersionTransitions": [{
"NoncurrentDays": 30,
"StorageClass": "ONEZONE_IA"
}],
"ID": "wholebucket",
"Filter": {
"Prefix": ""
},
"Status": "Enabled"
}
]
}
client.put_bucket_lifecycle_configuration(Bucket="bucket", LifecycleConfiguration=lfc)
result = client.get_bucket_lifecycle_configuration(Bucket="bucket")
assert len(result["Rules"]) == 1
assert result["Rules"][0]["NoncurrentVersionTransitions"][0]["NoncurrentDays"] == 30
assert result["Rules"][0]["NoncurrentVersionTransitions"][0]["StorageClass"] == "ONEZONE_IA"
# Change NoncurrentDays:
lfc["Rules"][0]["NoncurrentVersionTransitions"][0]["NoncurrentDays"] = 10
client.put_bucket_lifecycle_configuration(Bucket="bucket", LifecycleConfiguration=lfc)
result = client.get_bucket_lifecycle_configuration(Bucket="bucket")
assert len(result["Rules"]) == 1
assert result["Rules"][0]["NoncurrentVersionTransitions"][0]["NoncurrentDays"] == 10
# Change StorageClass:
lfc["Rules"][0]["NoncurrentVersionTransitions"][0]["StorageClass"] = "GLACIER"
client.put_bucket_lifecycle_configuration(Bucket="bucket", LifecycleConfiguration=lfc)
result = client.get_bucket_lifecycle_configuration(Bucket="bucket")
assert len(result["Rules"]) == 1
assert result["Rules"][0]["NoncurrentVersionTransitions"][0]["StorageClass"] == "GLACIER"
# With failures for missing children:
del lfc["Rules"][0]["NoncurrentVersionTransitions"][0]["NoncurrentDays"]
with assert_raises(ClientError) as err:
client.put_bucket_lifecycle_configuration(Bucket="bucket", LifecycleConfiguration=lfc)
assert err.exception.response["Error"]["Code"] == "MalformedXML"
lfc["Rules"][0]["NoncurrentVersionTransitions"][0]["NoncurrentDays"] = 30
del lfc["Rules"][0]["NoncurrentVersionTransitions"][0]["StorageClass"]
with assert_raises(ClientError) as err:
client.put_bucket_lifecycle_configuration(Bucket="bucket", LifecycleConfiguration=lfc)
assert err.exception.response["Error"]["Code"] == "MalformedXML"
@mock_s3
def test_lifecycle_with_aimu():
client = boto3.client("s3")
client.create_bucket(Bucket="bucket")
lfc = {
"Rules": [
{
"AbortIncompleteMultipartUpload": {
"DaysAfterInitiation": 7
},
"ID": "wholebucket",
"Filter": {
"Prefix": ""
},
"Status": "Enabled"
}
]
}
client.put_bucket_lifecycle_configuration(Bucket="bucket", LifecycleConfiguration=lfc)
result = client.get_bucket_lifecycle_configuration(Bucket="bucket")
assert len(result["Rules"]) == 1
assert result["Rules"][0]["AbortIncompleteMultipartUpload"]["DaysAfterInitiation"] == 7
# Change DaysAfterInitiation:
lfc["Rules"][0]["AbortIncompleteMultipartUpload"]["DaysAfterInitiation"] = 30
client.put_bucket_lifecycle_configuration(Bucket="bucket", LifecycleConfiguration=lfc)
result = client.get_bucket_lifecycle_configuration(Bucket="bucket")
assert len(result["Rules"]) == 1
assert result["Rules"][0]["AbortIncompleteMultipartUpload"]["DaysAfterInitiation"] == 30
# TODO: Add test for failures due to missing children
@mock_s3_deprecated
def test_lifecycle_with_glacier_transition():
conn = boto.s3.connect_to_region("us-west-1")
bucket = conn.create_bucket("foobar")
lifecycle = Lifecycle()
transition = Transition(days=30, storage_class='GLACIER')
rule = Rule('myid', prefix='', status='Enabled', expiration=None,
transition=transition)
lifecycle.append(rule)
bucket.configure_lifecycle(lifecycle)
response = bucket.get_lifecycle_config()
transition = response[0].transition
transition.days.should.equal(30)
transition.storage_class.should.equal('GLACIER')
transition.date.should.equal(None)
@mock_s3_deprecated
def test_lifecycle_multi():
conn = boto.s3.connect_to_region("us-west-1")
bucket = conn.create_bucket("foobar")
date = '2022-10-12T00:00:00.000Z'
sc = 'GLACIER'
lifecycle = Lifecycle()
lifecycle.add_rule("1", "1/", "Enabled", 1)
lifecycle.add_rule("2", "2/", "Enabled", Expiration(days=2))
lifecycle.add_rule("3", "3/", "Enabled", Expiration(date=date))
lifecycle.add_rule("4", "4/", "Enabled", None,
Transition(days=4, storage_class=sc))
lifecycle.add_rule("5", "5/", "Enabled", None,
Transition(date=date, storage_class=sc))
bucket.configure_lifecycle(lifecycle)
# read the lifecycle back
rules = bucket.get_lifecycle_config()
for rule in rules:
if rule.id == "1":
rule.prefix.should.equal("1/")
rule.expiration.days.should.equal(1)
elif rule.id == "2":
rule.prefix.should.equal("2/")
rule.expiration.days.should.equal(2)
elif rule.id == "3":
rule.prefix.should.equal("3/")
rule.expiration.date.should.equal(date)
elif rule.id == "4":
rule.prefix.should.equal("4/")
rule.transition.days.should.equal(4)
rule.transition.storage_class.should.equal(sc)
elif rule.id == "5":
rule.prefix.should.equal("5/")
rule.transition.date.should.equal(date)
rule.transition.storage_class.should.equal(sc)
else:
assert False, "Invalid rule id"
@mock_s3_deprecated
def test_lifecycle_delete():
conn = boto.s3.connect_to_region("us-west-1")
bucket = conn.create_bucket("foobar")
lifecycle = Lifecycle()
lifecycle.add_rule(expiration=30)
bucket.configure_lifecycle(lifecycle)
response = bucket.get_lifecycle_config()
response.should.have.length_of(1)
bucket.delete_lifecycle_configuration()
bucket.get_lifecycle_config.when.called_with().should.throw(S3ResponseError)
from __future__ import unicode_literals
import boto
import boto3
from boto.exception import S3ResponseError
from boto.s3.lifecycle import Lifecycle, Transition, Expiration, Rule
import sure # noqa
from botocore.exceptions import ClientError
from datetime import datetime
from nose.tools import assert_raises
from moto import mock_s3_deprecated, mock_s3
@mock_s3_deprecated
def test_lifecycle_create():
conn = boto.s3.connect_to_region("us-west-1")
bucket = conn.create_bucket("foobar")
lifecycle = Lifecycle()
lifecycle.add_rule('myid', '', 'Enabled', 30)
bucket.configure_lifecycle(lifecycle)
response = bucket.get_lifecycle_config()
len(response).should.equal(1)
lifecycle = response[0]
lifecycle.id.should.equal('myid')
lifecycle.prefix.should.equal('')
lifecycle.status.should.equal('Enabled')
list(lifecycle.transition).should.equal([])
@mock_s3
def test_lifecycle_with_filters():
client = boto3.client("s3")
client.create_bucket(Bucket="bucket")
# Create a lifecycle rule with a Filter (no tags):
lfc = {
"Rules": [
{
"Expiration": {
"Days": 7
},
"ID": "wholebucket",
"Filter": {
"Prefix": ""
},
"Status": "Enabled"
}
]
}
client.put_bucket_lifecycle_configuration(Bucket="bucket", LifecycleConfiguration=lfc)
result = client.get_bucket_lifecycle_configuration(Bucket="bucket")
assert len(result["Rules"]) == 1
assert result["Rules"][0]["Filter"]["Prefix"] == ''
assert not result["Rules"][0]["Filter"].get("And")
assert not result["Rules"][0]["Filter"].get("Tag")
with assert_raises(KeyError):
assert result["Rules"][0]["Prefix"]
# With a tag:
lfc["Rules"][0]["Filter"]["Tag"] = {
"Key": "mytag",
"Value": "mytagvalue"
}
client.put_bucket_lifecycle_configuration(Bucket="bucket", LifecycleConfiguration=lfc)
result = client.get_bucket_lifecycle_configuration(Bucket="bucket")
assert len(result["Rules"]) == 1
assert result["Rules"][0]["Filter"]["Prefix"] == ''
assert not result["Rules"][0]["Filter"].get("And")
assert result["Rules"][0]["Filter"]["Tag"]["Key"] == "mytag"
assert result["Rules"][0]["Filter"]["Tag"]["Value"] == "mytagvalue"
with assert_raises(KeyError):
assert result["Rules"][0]["Prefix"]
# With And (single tag):
lfc["Rules"][0]["Filter"]["And"] = {
"Prefix": "some/prefix",
"Tags": [
{
"Key": "mytag",
"Value": "mytagvalue"
}
]
}
client.put_bucket_lifecycle_configuration(Bucket="bucket", LifecycleConfiguration=lfc)
result = client.get_bucket_lifecycle_configuration(Bucket="bucket")
assert len(result["Rules"]) == 1
assert result["Rules"][0]["Filter"]["Prefix"] == ""
assert result["Rules"][0]["Filter"]["And"]["Prefix"] == "some/prefix"
assert len(result["Rules"][0]["Filter"]["And"]["Tags"]) == 1
assert result["Rules"][0]["Filter"]["And"]["Tags"][0]["Key"] == "mytag"
assert result["Rules"][0]["Filter"]["And"]["Tags"][0]["Value"] == "mytagvalue"
assert result["Rules"][0]["Filter"]["Tag"]["Key"] == "mytag"
assert result["Rules"][0]["Filter"]["Tag"]["Value"] == "mytagvalue"
with assert_raises(KeyError):
assert result["Rules"][0]["Prefix"]
# With multiple And tags:
lfc["Rules"][0]["Filter"]["And"] = {
"Prefix": "some/prefix",
"Tags": [
{
"Key": "mytag",
"Value": "mytagvalue"
},
{
"Key": "mytag2",
"Value": "mytagvalue2"
}
]
}
client.put_bucket_lifecycle_configuration(Bucket="bucket", LifecycleConfiguration=lfc)
result = client.get_bucket_lifecycle_configuration(Bucket="bucket")
assert len(result["Rules"]) == 1
assert result["Rules"][0]["Filter"]["Prefix"] == ""
assert result["Rules"][0]["Filter"]["And"]["Prefix"] == "some/prefix"
assert len(result["Rules"][0]["Filter"]["And"]["Tags"]) == 2
assert result["Rules"][0]["Filter"]["And"]["Tags"][0]["Key"] == "mytag"
assert result["Rules"][0]["Filter"]["And"]["Tags"][0]["Value"] == "mytagvalue"
assert result["Rules"][0]["Filter"]["Tag"]["Key"] == "mytag"
assert result["Rules"][0]["Filter"]["Tag"]["Value"] == "mytagvalue"
assert result["Rules"][0]["Filter"]["And"]["Tags"][1]["Key"] == "mytag2"
assert result["Rules"][0]["Filter"]["And"]["Tags"][1]["Value"] == "mytagvalue2"
assert result["Rules"][0]["Filter"]["Tag"]["Key"] == "mytag"
assert result["Rules"][0]["Filter"]["Tag"]["Value"] == "mytagvalue"
with assert_raises(KeyError):
assert result["Rules"][0]["Prefix"]
# Can't have both filter and prefix:
lfc["Rules"][0]["Prefix"] = ''
with assert_raises(ClientError) as err:
client.put_bucket_lifecycle_configuration(Bucket="bucket", LifecycleConfiguration=lfc)
assert err.exception.response["Error"]["Code"] == "MalformedXML"
lfc["Rules"][0]["Prefix"] = 'some/path'
with assert_raises(ClientError) as err:
client.put_bucket_lifecycle_configuration(Bucket="bucket", LifecycleConfiguration=lfc)
assert err.exception.response["Error"]["Code"] == "MalformedXML"
# No filters -- just a prefix:
del lfc["Rules"][0]["Filter"]
client.put_bucket_lifecycle_configuration(Bucket="bucket", LifecycleConfiguration=lfc)
result = client.get_bucket_lifecycle_configuration(Bucket="bucket")
assert not result["Rules"][0].get("Filter")
assert result["Rules"][0]["Prefix"] == "some/path"
@mock_s3
def test_lifecycle_with_eodm():
client = boto3.client("s3")
client.create_bucket(Bucket="bucket")
lfc = {
"Rules": [
{
"Expiration": {
"ExpiredObjectDeleteMarker": True
},
"ID": "wholebucket",
"Filter": {
"Prefix": ""
},
"Status": "Enabled"
}
]
}
client.put_bucket_lifecycle_configuration(Bucket="bucket", LifecycleConfiguration=lfc)
result = client.get_bucket_lifecycle_configuration(Bucket="bucket")
assert len(result["Rules"]) == 1
assert result["Rules"][0]["Expiration"]["ExpiredObjectDeleteMarker"]
# Set to False:
lfc["Rules"][0]["Expiration"]["ExpiredObjectDeleteMarker"] = False
client.put_bucket_lifecycle_configuration(Bucket="bucket", LifecycleConfiguration=lfc)
result = client.get_bucket_lifecycle_configuration(Bucket="bucket")
assert len(result["Rules"]) == 1
assert not result["Rules"][0]["Expiration"]["ExpiredObjectDeleteMarker"]
# With failure:
lfc["Rules"][0]["Expiration"]["Days"] = 7
with assert_raises(ClientError) as err:
client.put_bucket_lifecycle_configuration(Bucket="bucket", LifecycleConfiguration=lfc)
assert err.exception.response["Error"]["Code"] == "MalformedXML"
del lfc["Rules"][0]["Expiration"]["Days"]
lfc["Rules"][0]["Expiration"]["Date"] = datetime(2015, 1, 1)
with assert_raises(ClientError) as err:
client.put_bucket_lifecycle_configuration(Bucket="bucket", LifecycleConfiguration=lfc)
assert err.exception.response["Error"]["Code"] == "MalformedXML"
@mock_s3
def test_lifecycle_with_nve():
client = boto3.client("s3")
client.create_bucket(Bucket="bucket")
lfc = {
"Rules": [
{
"NoncurrentVersionExpiration": {
"NoncurrentDays": 30
},
"ID": "wholebucket",
"Filter": {
"Prefix": ""
},
"Status": "Enabled"
}
]
}
client.put_bucket_lifecycle_configuration(Bucket="bucket", LifecycleConfiguration=lfc)
result = client.get_bucket_lifecycle_configuration(Bucket="bucket")
assert len(result["Rules"]) == 1
assert result["Rules"][0]["NoncurrentVersionExpiration"]["NoncurrentDays"] == 30
# Change NoncurrentDays:
lfc["Rules"][0]["NoncurrentVersionExpiration"]["NoncurrentDays"] = 10
client.put_bucket_lifecycle_configuration(Bucket="bucket", LifecycleConfiguration=lfc)
result = client.get_bucket_lifecycle_configuration(Bucket="bucket")
assert len(result["Rules"]) == 1
assert result["Rules"][0]["NoncurrentVersionExpiration"]["NoncurrentDays"] == 10
# TODO: Add test for failures due to missing children
@mock_s3
def test_lifecycle_with_nvt():
client = boto3.client("s3")
client.create_bucket(Bucket="bucket")
lfc = {
"Rules": [
{
"NoncurrentVersionTransitions": [{
"NoncurrentDays": 30,
"StorageClass": "ONEZONE_IA"
}],
"ID": "wholebucket",
"Filter": {
"Prefix": ""
},
"Status": "Enabled"
}
]
}
client.put_bucket_lifecycle_configuration(Bucket="bucket", LifecycleConfiguration=lfc)
result = client.get_bucket_lifecycle_configuration(Bucket="bucket")
assert len(result["Rules"]) == 1
assert result["Rules"][0]["NoncurrentVersionTransitions"][0]["NoncurrentDays"] == 30
assert result["Rules"][0]["NoncurrentVersionTransitions"][0]["StorageClass"] == "ONEZONE_IA"
# Change NoncurrentDays:
lfc["Rules"][0]["NoncurrentVersionTransitions"][0]["NoncurrentDays"] = 10
client.put_bucket_lifecycle_configuration(Bucket="bucket", LifecycleConfiguration=lfc)
result = client.get_bucket_lifecycle_configuration(Bucket="bucket")
assert len(result["Rules"]) == 1
assert result["Rules"][0]["NoncurrentVersionTransitions"][0]["NoncurrentDays"] == 10
# Change StorageClass:
lfc["Rules"][0]["NoncurrentVersionTransitions"][0]["StorageClass"] = "GLACIER"
client.put_bucket_lifecycle_configuration(Bucket="bucket", LifecycleConfiguration=lfc)
result = client.get_bucket_lifecycle_configuration(Bucket="bucket")
assert len(result["Rules"]) == 1
assert result["Rules"][0]["NoncurrentVersionTransitions"][0]["StorageClass"] == "GLACIER"
# With failures for missing children:
del lfc["Rules"][0]["NoncurrentVersionTransitions"][0]["NoncurrentDays"]
with assert_raises(ClientError) as err:
client.put_bucket_lifecycle_configuration(Bucket="bucket", LifecycleConfiguration=lfc)
assert err.exception.response["Error"]["Code"] == "MalformedXML"
lfc["Rules"][0]["NoncurrentVersionTransitions"][0]["NoncurrentDays"] = 30
del lfc["Rules"][0]["NoncurrentVersionTransitions"][0]["StorageClass"]
with assert_raises(ClientError) as err:
client.put_bucket_lifecycle_configuration(Bucket="bucket", LifecycleConfiguration=lfc)
assert err.exception.response["Error"]["Code"] == "MalformedXML"
@mock_s3
def test_lifecycle_with_aimu():
client = boto3.client("s3")
client.create_bucket(Bucket="bucket")
lfc = {
"Rules": [
{
"AbortIncompleteMultipartUpload": {
"DaysAfterInitiation": 7
},
"ID": "wholebucket",
"Filter": {
"Prefix": ""
},
"Status": "Enabled"
}
]
}
client.put_bucket_lifecycle_configuration(Bucket="bucket", LifecycleConfiguration=lfc)
result = client.get_bucket_lifecycle_configuration(Bucket="bucket")
assert len(result["Rules"]) == 1
assert result["Rules"][0]["AbortIncompleteMultipartUpload"]["DaysAfterInitiation"] == 7
# Change DaysAfterInitiation:
lfc["Rules"][0]["AbortIncompleteMultipartUpload"]["DaysAfterInitiation"] = 30
client.put_bucket_lifecycle_configuration(Bucket="bucket", LifecycleConfiguration=lfc)
result = client.get_bucket_lifecycle_configuration(Bucket="bucket")
assert len(result["Rules"]) == 1
assert result["Rules"][0]["AbortIncompleteMultipartUpload"]["DaysAfterInitiation"] == 30
# TODO: Add test for failures due to missing children
@mock_s3_deprecated
def test_lifecycle_with_glacier_transition():
conn = boto.s3.connect_to_region("us-west-1")
bucket = conn.create_bucket("foobar")
lifecycle = Lifecycle()
transition = Transition(days=30, storage_class='GLACIER')
rule = Rule('myid', prefix='', status='Enabled', expiration=None,
transition=transition)
lifecycle.append(rule)
bucket.configure_lifecycle(lifecycle)
response = bucket.get_lifecycle_config()
transition = response[0].transition
transition.days.should.equal(30)
transition.storage_class.should.equal('GLACIER')
transition.date.should.equal(None)
@mock_s3_deprecated
def test_lifecycle_multi():
conn = boto.s3.connect_to_region("us-west-1")
bucket = conn.create_bucket("foobar")
date = '2022-10-12T00:00:00.000Z'
sc = 'GLACIER'
lifecycle = Lifecycle()
lifecycle.add_rule("1", "1/", "Enabled", 1)
lifecycle.add_rule("2", "2/", "Enabled", Expiration(days=2))
lifecycle.add_rule("3", "3/", "Enabled", Expiration(date=date))
lifecycle.add_rule("4", "4/", "Enabled", None,
Transition(days=4, storage_class=sc))
lifecycle.add_rule("5", "5/", "Enabled", None,
Transition(date=date, storage_class=sc))
bucket.configure_lifecycle(lifecycle)
# read the lifecycle back
rules = bucket.get_lifecycle_config()
for rule in rules:
if rule.id == "1":
rule.prefix.should.equal("1/")
rule.expiration.days.should.equal(1)
elif rule.id == "2":
rule.prefix.should.equal("2/")
rule.expiration.days.should.equal(2)
elif rule.id == "3":
rule.prefix.should.equal("3/")
rule.expiration.date.should.equal(date)
elif rule.id == "4":
rule.prefix.should.equal("4/")
rule.transition.days.should.equal(4)
rule.transition.storage_class.should.equal(sc)
elif rule.id == "5":
rule.prefix.should.equal("5/")
rule.transition.date.should.equal(date)
rule.transition.storage_class.should.equal(sc)
else:
assert False, "Invalid rule id"
@mock_s3_deprecated
def test_lifecycle_delete():
conn = boto.s3.connect_to_region("us-west-1")
bucket = conn.create_bucket("foobar")
lifecycle = Lifecycle()
lifecycle.add_rule(expiration=30)
bucket.configure_lifecycle(lifecycle)
response = bucket.get_lifecycle_config()
response.should.have.length_of(1)
bucket.delete_lifecycle_configuration()
bucket.get_lifecycle_config.when.called_with().should.throw(S3ResponseError)

View file

@ -1,106 +1,106 @@
from __future__ import unicode_literals
import boto
import boto3
from boto.exception import S3CreateError, S3ResponseError
from boto.s3.lifecycle import Lifecycle, Transition, Expiration, Rule
import sure # noqa
from botocore.exceptions import ClientError
from datetime import datetime
from nose.tools import assert_raises
from moto import mock_s3_deprecated, mock_s3
@mock_s3
def test_s3_storage_class_standard():
s3 = boto3.client("s3")
s3.create_bucket(Bucket="Bucket")
# add an object to the bucket with standard storage
s3.put_object(Bucket="Bucket", Key="my_key", Body="my_value")
list_of_objects = s3.list_objects(Bucket="Bucket")
list_of_objects['Contents'][0]["StorageClass"].should.equal("STANDARD")
@mock_s3
def test_s3_storage_class_infrequent_access():
s3 = boto3.client("s3")
s3.create_bucket(Bucket="Bucket")
# add an object to the bucket with standard storage
s3.put_object(Bucket="Bucket", Key="my_key_infrequent", Body="my_value_infrequent", StorageClass="STANDARD_IA")
D = s3.list_objects(Bucket="Bucket")
D['Contents'][0]["StorageClass"].should.equal("STANDARD_IA")
@mock_s3
def test_s3_storage_class_copy():
s3 = boto3.client("s3")
s3.create_bucket(Bucket="Bucket")
s3.put_object(Bucket="Bucket", Key="First_Object", Body="Body", StorageClass="STANDARD")
s3.create_bucket(Bucket="Bucket2")
# second object is originally of storage class REDUCED_REDUNDANCY
s3.put_object(Bucket="Bucket2", Key="Second_Object", Body="Body2")
s3.copy_object(CopySource = {"Bucket": "Bucket", "Key": "First_Object"}, Bucket="Bucket2", Key="Second_Object", StorageClass="ONEZONE_IA")
list_of_copied_objects = s3.list_objects(Bucket="Bucket2")
# checks that a copied object can be properly copied
list_of_copied_objects["Contents"][0]["StorageClass"].should.equal("ONEZONE_IA")
@mock_s3
def test_s3_invalid_copied_storage_class():
s3 = boto3.client("s3")
s3.create_bucket(Bucket="Bucket")
s3.put_object(Bucket="Bucket", Key="First_Object", Body="Body", StorageClass="STANDARD")
s3.create_bucket(Bucket="Bucket2")
s3.put_object(Bucket="Bucket2", Key="Second_Object", Body="Body2", StorageClass="REDUCED_REDUNDANCY")
# Try to copy an object with an invalid storage class
with assert_raises(ClientError) as err:
s3.copy_object(CopySource = {"Bucket": "Bucket", "Key": "First_Object"}, Bucket="Bucket2", Key="Second_Object", StorageClass="STANDARD2")
e = err.exception
e.response["Error"]["Code"].should.equal("InvalidStorageClass")
e.response["Error"]["Message"].should.equal("The storage class you specified is not valid")
@mock_s3
def test_s3_invalid_storage_class():
s3 = boto3.client("s3")
s3.create_bucket(Bucket="Bucket")
# Try to add an object with an invalid storage class
with assert_raises(ClientError) as err:
s3.put_object(Bucket="Bucket", Key="First_Object", Body="Body", StorageClass="STANDARDD")
e = err.exception
e.response["Error"]["Code"].should.equal("InvalidStorageClass")
e.response["Error"]["Message"].should.equal("The storage class you specified is not valid")
@mock_s3
def test_s3_default_storage_class():
s3 = boto3.client("s3")
s3.create_bucket(Bucket="Bucket")
s3.put_object(Bucket="Bucket", Key="First_Object", Body="Body")
list_of_objects = s3.list_objects(Bucket="Bucket")
# tests that the default storage class is still STANDARD
list_of_objects["Contents"][0]["StorageClass"].should.equal("STANDARD")
from __future__ import unicode_literals
import boto
import boto3
from boto.exception import S3CreateError, S3ResponseError
from boto.s3.lifecycle import Lifecycle, Transition, Expiration, Rule
import sure # noqa
from botocore.exceptions import ClientError
from datetime import datetime
from nose.tools import assert_raises
from moto import mock_s3_deprecated, mock_s3
@mock_s3
def test_s3_storage_class_standard():
s3 = boto3.client("s3")
s3.create_bucket(Bucket="Bucket")
# add an object to the bucket with standard storage
s3.put_object(Bucket="Bucket", Key="my_key", Body="my_value")
list_of_objects = s3.list_objects(Bucket="Bucket")
list_of_objects['Contents'][0]["StorageClass"].should.equal("STANDARD")
@mock_s3
def test_s3_storage_class_infrequent_access():
s3 = boto3.client("s3")
s3.create_bucket(Bucket="Bucket")
# add an object to the bucket with standard storage
s3.put_object(Bucket="Bucket", Key="my_key_infrequent", Body="my_value_infrequent", StorageClass="STANDARD_IA")
D = s3.list_objects(Bucket="Bucket")
D['Contents'][0]["StorageClass"].should.equal("STANDARD_IA")
@mock_s3
def test_s3_storage_class_copy():
s3 = boto3.client("s3")
s3.create_bucket(Bucket="Bucket")
s3.put_object(Bucket="Bucket", Key="First_Object", Body="Body", StorageClass="STANDARD")
s3.create_bucket(Bucket="Bucket2")
# second object is originally of storage class REDUCED_REDUNDANCY
s3.put_object(Bucket="Bucket2", Key="Second_Object", Body="Body2")
s3.copy_object(CopySource = {"Bucket": "Bucket", "Key": "First_Object"}, Bucket="Bucket2", Key="Second_Object", StorageClass="ONEZONE_IA")
list_of_copied_objects = s3.list_objects(Bucket="Bucket2")
# checks that a copied object can be properly copied
list_of_copied_objects["Contents"][0]["StorageClass"].should.equal("ONEZONE_IA")
@mock_s3
def test_s3_invalid_copied_storage_class():
s3 = boto3.client("s3")
s3.create_bucket(Bucket="Bucket")
s3.put_object(Bucket="Bucket", Key="First_Object", Body="Body", StorageClass="STANDARD")
s3.create_bucket(Bucket="Bucket2")
s3.put_object(Bucket="Bucket2", Key="Second_Object", Body="Body2", StorageClass="REDUCED_REDUNDANCY")
# Try to copy an object with an invalid storage class
with assert_raises(ClientError) as err:
s3.copy_object(CopySource = {"Bucket": "Bucket", "Key": "First_Object"}, Bucket="Bucket2", Key="Second_Object", StorageClass="STANDARD2")
e = err.exception
e.response["Error"]["Code"].should.equal("InvalidStorageClass")
e.response["Error"]["Message"].should.equal("The storage class you specified is not valid")
@mock_s3
def test_s3_invalid_storage_class():
s3 = boto3.client("s3")
s3.create_bucket(Bucket="Bucket")
# Try to add an object with an invalid storage class
with assert_raises(ClientError) as err:
s3.put_object(Bucket="Bucket", Key="First_Object", Body="Body", StorageClass="STANDARDD")
e = err.exception
e.response["Error"]["Code"].should.equal("InvalidStorageClass")
e.response["Error"]["Message"].should.equal("The storage class you specified is not valid")
@mock_s3
def test_s3_default_storage_class():
s3 = boto3.client("s3")
s3.create_bucket(Bucket="Bucket")
s3.put_object(Bucket="Bucket", Key="First_Object", Body="Body")
list_of_objects = s3.list_objects(Bucket="Bucket")
# tests that the default storage class is still STANDARD
list_of_objects["Contents"][0]["StorageClass"].should.equal("STANDARD")

View file

@ -1,80 +1,80 @@
from __future__ import unicode_literals
import os
from sure import expect
from moto.s3.utils import bucket_name_from_url, _VersionedKeyStore, parse_region_from_url
def test_base_url():
expect(bucket_name_from_url('https://s3.amazonaws.com/')).should.equal(None)
def test_localhost_bucket():
expect(bucket_name_from_url('https://wfoobar.localhost:5000/abc')
).should.equal("wfoobar")
def test_localhost_without_bucket():
expect(bucket_name_from_url(
'https://www.localhost:5000/def')).should.equal(None)
def test_force_ignore_subdomain_for_bucketnames():
os.environ['S3_IGNORE_SUBDOMAIN_BUCKETNAME'] = '1'
expect(bucket_name_from_url('https://subdomain.localhost:5000/abc/resource')).should.equal(None)
del(os.environ['S3_IGNORE_SUBDOMAIN_BUCKETNAME'])
def test_versioned_key_store():
d = _VersionedKeyStore()
d.should.have.length_of(0)
d['key'] = [1]
d.should.have.length_of(1)
d['key'] = 2
d.should.have.length_of(1)
d.should.have.key('key').being.equal(2)
d.get.when.called_with('key').should.return_value(2)
d.get.when.called_with('badkey').should.return_value(None)
d.get.when.called_with('badkey', 'HELLO').should.return_value('HELLO')
# Tests key[
d.shouldnt.have.key('badkey')
d.__getitem__.when.called_with('badkey').should.throw(KeyError)
d.getlist('key').should.have.length_of(2)
d.getlist('key').should.be.equal([[1], 2])
d.getlist('badkey').should.be.none
d.setlist('key', 1)
d.getlist('key').should.be.equal([1])
d.setlist('key', (1, 2))
d.getlist('key').shouldnt.be.equal((1, 2))
d.getlist('key').should.be.equal([1, 2])
d.setlist('key', [[1], [2]])
d['key'].should.have.length_of(1)
d.getlist('key').should.be.equal([[1], [2]])
def test_parse_region_from_url():
expected = 'us-west-2'
for url in ['http://s3-us-west-2.amazonaws.com/bucket',
'http://s3.us-west-2.amazonaws.com/bucket',
'http://bucket.s3-us-west-2.amazonaws.com',
'https://s3-us-west-2.amazonaws.com/bucket',
'https://s3.us-west-2.amazonaws.com/bucket',
'https://bucket.s3-us-west-2.amazonaws.com']:
parse_region_from_url(url).should.equal(expected)
expected = 'us-east-1'
for url in ['http://s3.amazonaws.com/bucket',
'http://bucket.s3.amazonaws.com',
'https://s3.amazonaws.com/bucket',
'https://bucket.s3.amazonaws.com']:
parse_region_from_url(url).should.equal(expected)
from __future__ import unicode_literals
import os
from sure import expect
from moto.s3.utils import bucket_name_from_url, _VersionedKeyStore, parse_region_from_url
def test_base_url():
expect(bucket_name_from_url('https://s3.amazonaws.com/')).should.equal(None)
def test_localhost_bucket():
expect(bucket_name_from_url('https://wfoobar.localhost:5000/abc')
).should.equal("wfoobar")
def test_localhost_without_bucket():
expect(bucket_name_from_url(
'https://www.localhost:5000/def')).should.equal(None)
def test_force_ignore_subdomain_for_bucketnames():
os.environ['S3_IGNORE_SUBDOMAIN_BUCKETNAME'] = '1'
expect(bucket_name_from_url('https://subdomain.localhost:5000/abc/resource')).should.equal(None)
del(os.environ['S3_IGNORE_SUBDOMAIN_BUCKETNAME'])
def test_versioned_key_store():
d = _VersionedKeyStore()
d.should.have.length_of(0)
d['key'] = [1]
d.should.have.length_of(1)
d['key'] = 2
d.should.have.length_of(1)
d.should.have.key('key').being.equal(2)
d.get.when.called_with('key').should.return_value(2)
d.get.when.called_with('badkey').should.return_value(None)
d.get.when.called_with('badkey', 'HELLO').should.return_value('HELLO')
# Tests key[
d.shouldnt.have.key('badkey')
d.__getitem__.when.called_with('badkey').should.throw(KeyError)
d.getlist('key').should.have.length_of(2)
d.getlist('key').should.be.equal([[1], 2])
d.getlist('badkey').should.be.none
d.setlist('key', 1)
d.getlist('key').should.be.equal([1])
d.setlist('key', (1, 2))
d.getlist('key').shouldnt.be.equal((1, 2))
d.getlist('key').should.be.equal([1, 2])
d.setlist('key', [[1], [2]])
d['key'].should.have.length_of(1)
d.getlist('key').should.be.equal([[1], [2]])
def test_parse_region_from_url():
expected = 'us-west-2'
for url in ['http://s3-us-west-2.amazonaws.com/bucket',
'http://s3.us-west-2.amazonaws.com/bucket',
'http://bucket.s3-us-west-2.amazonaws.com',
'https://s3-us-west-2.amazonaws.com/bucket',
'https://s3.us-west-2.amazonaws.com/bucket',
'https://bucket.s3-us-west-2.amazonaws.com']:
parse_region_from_url(url).should.equal(expected)
expected = 'us-east-1'
for url in ['http://s3.amazonaws.com/bucket',
'http://bucket.s3.amazonaws.com',
'https://s3.amazonaws.com/bucket',
'https://bucket.s3.amazonaws.com']:
parse_region_from_url(url).should.equal(expected)

View file

@ -1,105 +1,105 @@
# coding=utf-8
from __future__ import unicode_literals
import sure # noqa
from flask.testing import FlaskClient
import moto.server as server
'''
Test the different server responses
'''
class AuthenticatedClient(FlaskClient):
def open(self, *args, **kwargs):
kwargs['headers'] = kwargs.get('headers', {})
kwargs['headers']['Authorization'] = "Any authorization header"
return super(AuthenticatedClient, self).open(*args, **kwargs)
def authenticated_client():
backend = server.create_backend_app("s3")
backend.test_client_class = AuthenticatedClient
return backend.test_client()
def test_s3_server_get():
test_client = authenticated_client()
res = test_client.get('/')
res.data.should.contain(b'ListAllMyBucketsResult')
def test_s3_server_bucket_create():
test_client = authenticated_client()
res = test_client.put('/', 'http://foobaz.localhost:5000/')
res.status_code.should.equal(200)
res = test_client.get('/')
res.data.should.contain(b'<Name>foobaz</Name>')
res = test_client.get('/', 'http://foobaz.localhost:5000/')
res.status_code.should.equal(200)
res.data.should.contain(b"ListBucketResult")
res = test_client.put(
'/bar', 'http://foobaz.localhost:5000/', data='test value')
res.status_code.should.equal(200)
assert 'ETag' in dict(res.headers)
res = test_client.get('/bar', 'http://foobaz.localhost:5000/')
res.status_code.should.equal(200)
res.data.should.equal(b"test value")
def test_s3_server_bucket_versioning():
test_client = authenticated_client()
# Just enough XML to enable versioning
body = '<Status>Enabled</Status>'
res = test_client.put(
'/?versioning', 'http://foobaz.localhost:5000', data=body)
res.status_code.should.equal(200)
def test_s3_server_post_to_bucket():
test_client = authenticated_client()
res = test_client.put('/', 'http://tester.localhost:5000/')
res.status_code.should.equal(200)
test_client.post('/', "https://tester.localhost:5000/", data={
'key': 'the-key',
'file': 'nothing'
})
res = test_client.get('/the-key', 'http://tester.localhost:5000/')
res.status_code.should.equal(200)
res.data.should.equal(b"nothing")
def test_s3_server_post_without_content_length():
test_client = authenticated_client()
res = test_client.put('/', 'http://tester.localhost:5000/', environ_overrides={'CONTENT_LENGTH': ''})
res.status_code.should.equal(411)
res = test_client.post('/', "https://tester.localhost:5000/", environ_overrides={'CONTENT_LENGTH': ''})
res.status_code.should.equal(411)
def test_s3_server_post_unicode_bucket_key():
# Make sure that we can deal with non-ascii characters in request URLs (e.g., S3 object names)
dispatcher = server.DomainDispatcherApplication(server.create_backend_app)
backend_app = dispatcher.get_application({
'HTTP_HOST': 's3.amazonaws.com',
'PATH_INFO': '/test-bucket/test-object-てすと'
})
assert backend_app
backend_app = dispatcher.get_application({
'HTTP_HOST': 's3.amazonaws.com',
'PATH_INFO': '/test-bucket/test-object-てすと'.encode('utf-8')
})
assert backend_app
# coding=utf-8
from __future__ import unicode_literals
import sure # noqa
from flask.testing import FlaskClient
import moto.server as server
'''
Test the different server responses
'''
class AuthenticatedClient(FlaskClient):
def open(self, *args, **kwargs):
kwargs['headers'] = kwargs.get('headers', {})
kwargs['headers']['Authorization'] = "Any authorization header"
return super(AuthenticatedClient, self).open(*args, **kwargs)
def authenticated_client():
backend = server.create_backend_app("s3")
backend.test_client_class = AuthenticatedClient
return backend.test_client()
def test_s3_server_get():
test_client = authenticated_client()
res = test_client.get('/')
res.data.should.contain(b'ListAllMyBucketsResult')
def test_s3_server_bucket_create():
test_client = authenticated_client()
res = test_client.put('/', 'http://foobaz.localhost:5000/')
res.status_code.should.equal(200)
res = test_client.get('/')
res.data.should.contain(b'<Name>foobaz</Name>')
res = test_client.get('/', 'http://foobaz.localhost:5000/')
res.status_code.should.equal(200)
res.data.should.contain(b"ListBucketResult")
res = test_client.put(
'/bar', 'http://foobaz.localhost:5000/', data='test value')
res.status_code.should.equal(200)
assert 'ETag' in dict(res.headers)
res = test_client.get('/bar', 'http://foobaz.localhost:5000/')
res.status_code.should.equal(200)
res.data.should.equal(b"test value")
def test_s3_server_bucket_versioning():
test_client = authenticated_client()
# Just enough XML to enable versioning
body = '<Status>Enabled</Status>'
res = test_client.put(
'/?versioning', 'http://foobaz.localhost:5000', data=body)
res.status_code.should.equal(200)
def test_s3_server_post_to_bucket():
test_client = authenticated_client()
res = test_client.put('/', 'http://tester.localhost:5000/')
res.status_code.should.equal(200)
test_client.post('/', "https://tester.localhost:5000/", data={
'key': 'the-key',
'file': 'nothing'
})
res = test_client.get('/the-key', 'http://tester.localhost:5000/')
res.status_code.should.equal(200)
res.data.should.equal(b"nothing")
def test_s3_server_post_without_content_length():
test_client = authenticated_client()
res = test_client.put('/', 'http://tester.localhost:5000/', environ_overrides={'CONTENT_LENGTH': ''})
res.status_code.should.equal(411)
res = test_client.post('/', "https://tester.localhost:5000/", environ_overrides={'CONTENT_LENGTH': ''})
res.status_code.should.equal(411)
def test_s3_server_post_unicode_bucket_key():
# Make sure that we can deal with non-ascii characters in request URLs (e.g., S3 object names)
dispatcher = server.DomainDispatcherApplication(server.create_backend_app)
backend_app = dispatcher.get_application({
'HTTP_HOST': 's3.amazonaws.com',
'PATH_INFO': '/test-bucket/test-object-てすと'
})
assert backend_app
backend_app = dispatcher.get_application({
'HTTP_HOST': 's3.amazonaws.com',
'PATH_INFO': '/test-bucket/test-object-てすと'.encode('utf-8')
})
assert backend_app