from jinja2 import Template
from .models import s3_backend
from moto.core.utils import headers_to_dict
from .utils import bucket_name_from_hostname
def all_buckets(uri, body, method):
    """Render the listing of every bucket held by the S3 backend.

    Used when the request does not name a bucket. The ``uri``, ``body`` and
    ``method`` arguments are accepted but not consulted.

    Returns:
        The ``S3_ALL_BUCKETS`` template rendered with the backend's buckets.
    """
    buckets = s3_backend.get_all_buckets()
    return Template(S3_ALL_BUCKETS).render(buckets=buckets)
def bucket_response(uri, body, headers):
    """Handle a bucket-level request (GET, PUT or DELETE).

    The bucket name is derived from ``uri.hostname`` and the HTTP verb is
    read from ``uri.method``. ``body`` and ``headers`` are accepted but not
    used by this handler.

    Returns:
        Either the rendered response body, or a ``(body, extras)`` tuple
        where ``extras`` is a dict carrying the ``status`` code to send.

    Raises:
        NotImplementedError: for any verb other than GET, PUT or DELETE.
    """
    hostname = uri.hostname
    method = uri.method
    bucket_name = bucket_name_from_hostname(hostname)

    if method == 'GET':
        bucket = s3_backend.get_bucket(bucket_name)
        if bucket:
            template = Template(S3_BUCKET_GET_RESPONSE)
            return template.render(bucket=bucket)
        else:
            return "", dict(status=404)
    elif method == 'PUT':
        new_bucket = s3_backend.create_bucket(bucket_name)
        template = Template(S3_BUCKET_CREATE_RESPONSE)
        return template.render(bucket=new_bucket)
    elif method == 'DELETE':
        removed_bucket = s3_backend.delete_bucket(bucket_name)
        if removed_bucket is None:
            # Non-existent bucket
            template = Template(S3_DELETE_NON_EXISTING_BUCKET)
            return template.render(bucket_name=bucket_name), dict(status=404)
        elif removed_bucket:
            # Bucket existed and was removed
            template = Template(S3_DELETE_BUCKET_SUCCESS)
            return template.render(bucket=removed_bucket), dict(status=204)
        else:
            # Tried to delete a bucket that still has keys.
            # ``removed_bucket`` is falsy here, so it cannot supply the
            # ``bucket.name`` the error template renders. Pass the requested
            # name instead; Jinja2 resolves ``bucket.name`` on a mapping via
            # item lookup.
            template = Template(S3_DELETE_BUCKET_WITH_ITEMS_ERROR)
            return template.render(bucket=dict(name=bucket_name)), dict(status=409)
    else:
        raise NotImplementedError("Method {} has not been impelemented in the S3 backend yet".format(method))
def key_response(uri_info, body, headers):
    """Handle a key-level (object) request: GET, PUT, HEAD or DELETE.

    The key name is ``uri_info.path`` without its leading slashes, the
    bucket name is derived from ``uri_info.hostname`` and the HTTP verb is
    read from ``uri_info.method``. ``headers`` is normalised with
    ``headers_to_dict`` before use.

    Returns:
        Either the response body, or a ``(body, extras)`` tuple where
        ``extras`` is a dict carrying ``status`` and/or ``etag``.

    Raises:
        NotImplementedError: for any verb other than GET, PUT, HEAD or
            DELETE.
    """
    key_name = uri_info.path.lstrip('/')
    hostname = uri_info.hostname
    method = uri_info.method
    headers = headers_to_dict(headers)
    bucket_name = bucket_name_from_hostname(hostname)

    if method == 'GET':
        key = s3_backend.get_key(bucket_name, key_name)
        if key:
            return key.value
        else:
            # Missing key: answer 404 like the HEAD branch does instead of
            # failing on attribute access.
            return "", dict(status=404)
    if method == 'PUT':
        if 'x-amz-copy-source' in headers:
            # Copy key. The source is "<bucket>/<key>", optionally with a
            # leading slash; split only on the first separator because the
            # key itself may contain slashes.
            copy_source = headers["x-amz-copy-source"].lstrip("/")
            src_bucket, src_key = copy_source.split("/", 1)
            s3_backend.copy_key(src_bucket, src_key, bucket_name, key_name)
            return S3_OBJECT_COPY_RESPONSE
        if body:
            new_key = s3_backend.set_key(bucket_name, key_name, body)
            return S3_OBJECT_RESPONSE, dict(etag=new_key.etag)
        # No body supplied: nothing is stored; report the etag of the key
        # if one already exists.
        key = s3_backend.get_key(bucket_name, key_name)
        if key:
            return "", dict(etag=key.etag)
        else:
            return ""
    elif method == 'HEAD':
        key = s3_backend.get_key(bucket_name, key_name)
        if key:
            return S3_OBJECT_RESPONSE, dict(etag=key.etag)
        else:
            return "", dict(status=404)
    elif method == 'DELETE':
        removed_key = s3_backend.delete_key(bucket_name, key_name)
        template = Template(S3_DELETE_OBJECT_SUCCESS)
        return template.render(bucket=removed_key), dict(status=204)
    else:
        raise NotImplementedError("Method {} has not been impelemented in the S3 backend yet".format(method))
# Jinja2 template rendered by all_buckets() with a ``buckets`` iterable.
# Emits each bucket's name followed by a fixed timestamp; the two leading
# lines are fixed values.
# NOTE(review): no XML markup is present in this literal as seen here --
# confirm the template text is complete.
S3_ALL_BUCKETS = """
bcaf1ffd86f41161ca5fb16fd081034f
webfile
{% for bucket in buckets %}
{{ bucket.name }}
2006-02-03T16:45:09.000Z
{% endfor %}
"""
# Jinja2 template rendered by bucket_response() for a GET on an existing
# bucket, with ``bucket`` in the context. Only ``bucket.name`` is dynamic;
# every other value is fixed. The trailing backslashes are line
# continuations, so the resulting string contains no newlines.
# NOTE(review): no XML markup is present in this literal as seen here --
# confirm the template text is complete.
S3_BUCKET_GET_RESPONSE = """\
{{ bucket.name }}\
notes/\
/\
1000\
AKIAIOSFODNN7EXAMPLE\
2006-03-01T12:00:00.183Z\
Iuyz3d3P0aTou39dzbqaEXAMPLE=\
"""
# Jinja2 template rendered by bucket_response() after a PUT creates a
# bucket; outputs the new bucket's name.
S3_BUCKET_CREATE_RESPONSE = """
{{ bucket.name }}
"""
# Body rendered by bucket_response() when a bucket DELETE succeeds (sent
# with status 204). Contains no template variables.
S3_DELETE_BUCKET_SUCCESS = """
204
No Content
"""
# Jinja2 template rendered by bucket_response() when DELETE targets a
# bucket that does not exist (sent with status 404). Expects
# ``bucket_name`` in the context; the last two lines are fixed filler
# values.
S3_DELETE_NON_EXISTING_BUCKET = """
NoSuchBucket
The specified bucket does not exist
{{ bucket_name }}
asdfasdfsadf
asfasdfsfsafasdf
"""
# Jinja2 template rendered by bucket_response() when DELETE targets a
# bucket that still has keys (sent with status 409). Reads ``bucket.name``
# from the context; the last two lines are fixed filler values.
S3_DELETE_BUCKET_WITH_ITEMS_ERROR = """
BucketNotEmpty
The bucket you tried to delete is not empty
{{ bucket.name }}
asdfasdfsdafds
sdfgdsfgdsfgdfsdsfgdfs
"""
# Body rendered by key_response() after a key DELETE. Contains no template
# variables.
# NOTE(review): the body text says 200/OK while key_response() sends it
# with status 204 -- confirm which is intended.
S3_DELETE_OBJECT_SUCCESS = """
200
OK
"""
# Static body returned as-is (not passed through Template) by
# key_response() for a PUT with a body and for a HEAD on an existing key.
# The quoted value and timestamp are fixed; the real etag is returned
# separately by key_response().
S3_OBJECT_RESPONSE = """
"asdlfkdalsjfsalfkjsadlfjsdjkk"
2006-03-01T12:00:00.183Z
"""
# Static body returned as-is (not passed through Template) by
# key_response() after a PUT that carries an ``x-amz-copy-source`` header.
# The quoted value and timestamp are fixed.
S3_OBJECT_COPY_RESPONSE = """
"asdfadsfdsafjsadfdafsadf"
2008-02-18T13:54:10.183Z
"""