from __future__ import unicode_literals
import re
import sys
import six
from botocore.awsrequest import AWSPreparedRequest
from moto.core.utils import str_to_rfc_1123_datetime, py2_strip_unicode_keys
from six.moves.urllib.parse import parse_qs, urlparse, unquote, parse_qsl
import xmltodict
from moto.packages.httpretty.core import HTTPrettyRequest
from moto.core.responses import _TemplateEnvironmentMixin, ActionAuthenticatorMixin
from moto.core.utils import path_url
from moto.core import ACCOUNT_ID
from moto.s3bucket_path.utils import (
bucket_name_from_url as bucketpath_bucket_name_from_url,
parse_key_name as bucketpath_parse_key_name,
is_delete_keys as bucketpath_is_delete_keys,
)
from .exceptions import (
BucketAlreadyExists,
DuplicateTagKeys,
S3ClientError,
MissingBucket,
MissingKey,
InvalidPartOrder,
MalformedXML,
MalformedACLError,
IllegalLocationConstraintException,
InvalidNotificationARN,
InvalidNotificationEvent,
ObjectNotInActiveTierError,
NoSystemTags,
)
from .models import (
s3_backend,
get_canned_acl,
FakeGrantee,
FakeGrant,
FakeAcl,
FakeKey,
)
from .utils import (
bucket_name_from_url,
clean_key_name,
undo_clean_key_name,
metadata_from_headers,
parse_region_from_url,
)
from xml.dom import minidom
# Region the bucket handlers treat specially: it is reported as a None
# location and is rejected when passed as an explicit location constraint.
DEFAULT_REGION_NAME = "us-east-1"

# Resource type -> HTTP method -> query-string flag -> action name.
# ResponseObject._set_action() stores the selected name in self.data["Action"];
# the "DEFAULT" entry is used when none of a method's flags is present.
ACTION_MAP = {
    "BUCKET": {
        # NOTE(review): the bucket handlers test for "publicAccessBlock" in the
        # query string, so the "public_access_block" keys below never match and
        # those requests fall through to "DEFAULT" - confirm which spelling is
        # intended before changing the keys.
        "GET": {
            "uploads": "ListBucketMultipartUploads",
            "location": "GetBucketLocation",
            "lifecycle": "GetLifecycleConfiguration",
            "versioning": "GetBucketVersioning",
            "policy": "GetBucketPolicy",
            "website": "GetBucketWebsite",
            "acl": "GetBucketAcl",
            "tagging": "GetBucketTagging",
            "logging": "GetBucketLogging",
            "cors": "GetBucketCORS",
            "notification": "GetBucketNotification",
            "accelerate": "GetAccelerateConfiguration",
            "versions": "ListBucketVersions",
            "public_access_block": "GetPublicAccessBlock",
            "DEFAULT": "ListBucket",
        },
        "PUT": {
            "lifecycle": "PutLifecycleConfiguration",
            "versioning": "PutBucketVersioning",
            "policy": "PutBucketPolicy",
            "website": "PutBucketWebsite",
            "acl": "PutBucketAcl",
            "tagging": "PutBucketTagging",
            "logging": "PutBucketLogging",
            "cors": "PutBucketCORS",
            "notification": "PutBucketNotification",
            "accelerate": "PutAccelerateConfiguration",
            "public_access_block": "PutPublicAccessBlock",
            "DEFAULT": "CreateBucket",
        },
        "DELETE": {
            "lifecycle": "PutLifecycleConfiguration",
            "policy": "DeleteBucketPolicy",
            "tagging": "PutBucketTagging",
            "cors": "PutBucketCORS",
            "public_access_block": "DeletePublicAccessBlock",
            "DEFAULT": "DeleteBucket",
        },
    },
    "KEY": {
        "GET": {
            "uploadId": "ListMultipartUploadParts",
            "acl": "GetObjectAcl",
            "tagging": "GetObjectTagging",
            "versionId": "GetObjectVersion",
            "DEFAULT": "GetObject",
        },
        "PUT": {
            "acl": "PutObjectAcl",
            "tagging": "PutObjectTagging",
            "DEFAULT": "PutObject",
        },
        "DELETE": {
            "uploadId": "AbortMultipartUpload",
            "versionId": "DeleteObjectVersion",
            # Was " DeleteObject" (leading space), which can never equal the
            # real action name.
            "DEFAULT": "DeleteObject",
        },
        # No "DEFAULT" entry: _set_action() raises KeyError for a POST that
        # carries none of these flags.
        "POST": {
            "uploads": "PutObject",
            "restore": "RestoreObject",
            "uploadId": "PutObject",
        },
    },
    "CONTROL": {
        "GET": {"publicAccessBlock": "GetPublicAccessBlock"},
        "PUT": {"publicAccessBlock": "PutPublicAccessBlock"},
        "DELETE": {"publicAccessBlock": "DeletePublicAccessBlock"},
    },
}
def parse_key_name(pth):
    """Return ``pth`` without its first "/" (the one urlparse leaves on a path).

    Only a single leading slash is removed; a path that does not start with
    "/" is returned unchanged.
    """
    if pth.startswith("/"):
        return pth[1:]
    return pth
def is_delete_keys(request, path, bucket_name):
    """Tell whether a POST is a multi-object delete request.

    ``bucket_name`` is accepted for signature parity with the path-based
    variant and is not used here.
    """
    # GOlang sends a request as url/?delete= (treating it as a normal key=value, even if the value is empty)
    # Python sends a request as url/?delete (treating it as a flag)
    # https://github.com/spulec/moto/issues/2937
    if path in ("/?delete", "/?delete="):
        return True
    # Some request objects keep the flag in ``query_string`` instead of the path.
    return path == "/" and getattr(request, "query_string", "") == "delete"
class ResponseObject(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
def __init__(self, backend):
    """Bind the handler to ``backend`` and start with empty request state."""
    super(ResponseObject, self).__init__()
    # Request-scoped fields; bucket_response() overwrites method, path and
    # headers, and the handlers record "Action"/"BucketName" in ``data``.
    self.method, self.path = "", ""
    self.data, self.headers = {}, {}
    self.backend = backend
@property
def should_autoescape(self):
    """Always ``True`` for this handler.

    NOTE(review): presumably read by the template-environment mixin to turn
    on autoescaping - confirm against the mixin.
    """
    return True
def all_buckets(self):
    """Render the listing of every bucket the backend holds."""
    # The action name is recorded before the authorization call is made.
    self.data["Action"] = "ListAllMyBuckets"
    self._authenticate_and_authorize_s3_action()

    # No bucket specified. Listing all buckets
    buckets = self.backend.get_all_buckets()
    return self.response_template(S3_ALL_BUCKETS).render(buckets=buckets)
def subdomain_based_buckets(self, request):
    """Decide whether the bucket name is carried in the request's host name.

    The host comes from the "host"/"Host" header, falling back to the netloc
    of ``request.url``.  Returns False (path-based addressing) for local and
    cluster hosts, literal IP addresses and the plain S3 endpoints; True for
    every other host.
    """
    headers = request.headers
    host = headers.get("host", headers.get("Host"))
    if not host:
        host = urlparse(request.url).netloc

    # Default to path-based buckets for (1) localhost, (2) localstack hosts (e.g. localstack.dev),
    # (3) local host names that do not contain a "." (e.g., Docker container host names), or
    # (4) kubernetes host names
    if not host:
        return False
    if host.startswith(("localhost", "localstack")):
        return False
    if re.match(r"^[^.]+$", host) or re.match(
        r"^.*\.svc\.cluster\.local:?\d*$", host
    ):
        return False

    # Un-bracketed host with an optional port: path-based when it is an IPv4 address.
    plain_host = re.match(r"^([^\[\]:]+)(:\d+)?$", host)
    if plain_host and re.match(
        r"((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)(\.|$)){4}", plain_host.group(1)
    ):
        return False

    # Bracketed host with an optional port: path-based when it is an IPv6 address.
    bracketed_host = re.match(r"^\[(.+)\](:\d+)?$", host)
    if bracketed_host and re.match(
        r"^(((?=.*(::))(?!.*\3.+\3))\3?|[\dA-F]{1,4}:)([\dA-F]{1,4}(\3|:\b)|\2){5}(([\dA-F]{1,4}(\3|:\b|$)|\2){2}|(((2[0-4]|1\d|[1-9])?\d|25[0-5])\.?\b){4})\Z",
        bracketed_host.group(1),
        re.IGNORECASE,
    ):
        return False

    # The bare S3 endpoints (global or regional) carry the bucket in the path.
    is_s3_endpoint = host == "s3.amazonaws.com" or re.match(
        r"s3[\.\-]([^.]*)\.amazonaws\.com", host
    )
    return not is_s3_endpoint
def is_delete_keys(self, request, path, bucket_name):
    """Run the multi-delete check that matches the request's addressing style."""
    checker = (
        is_delete_keys
        if self.subdomain_based_buckets(request)
        else bucketpath_is_delete_keys
    )
    return checker(request, path, bucket_name)
def parse_bucket_name_from_url(self, request, url):
    """Extract the bucket name from ``url`` using the request's addressing style."""
    extractor = (
        bucket_name_from_url
        if self.subdomain_based_buckets(request)
        else bucketpath_bucket_name_from_url
    )
    return extractor(url)
def parse_key_name(self, request, url):
    """Extract the key name from ``url`` using the request's addressing style."""
    extractor = (
        parse_key_name
        if self.subdomain_based_buckets(request)
        else bucketpath_parse_key_name
    )
    return extractor(url)
def ambiguous_response(self, request, full_url, headers):
    """Route a URL that may address either a bucket or a key."""
    # Depending on which calling format the client is using, we don't know
    # if this is a bucket or key request so we have to check
    if not self.subdomain_based_buckets(request):
        # Using path-based buckets
        return self.bucket_response(request, full_url, headers)
    return self.key_or_control_response(request, full_url, headers)
def bucket_response(self, request, full_url, headers):
    """Entry point for bucket-level requests.

    Records the request's method, path and headers on the instance, runs the
    bucket handler and converts an ``S3ClientError`` into a
    ``(code, {}, description)`` result before normalising the response.
    """
    self.method = request.method
    self.path = self._get_path(request)
    self.headers = request.headers
    if "host" not in self.headers:
        # This writes into the request's own header mapping.
        self.headers["host"] = urlparse(full_url).netloc

    try:
        outcome = self._bucket_response(request, full_url, headers)
    except S3ClientError as s3error:
        outcome = (s3error.code, {}, s3error.description)

    return self._send_response(outcome)
@staticmethod
def _send_response(response):
    """Normalise a handler result into ``(status, headers, bytes)``.

    A bare string becomes a 200 with no headers; a 3-tuple is passed through
    with its body UTF-8 encoded when it is not already binary.
    """
    if isinstance(response, six.string_types):
        return 200, {}, response.encode("utf-8")

    status_code, headers, payload = response
    if isinstance(payload, six.binary_type):
        return status_code, headers, payload
    return status_code, headers, payload.encode("utf-8")
def _bucket_response(self, request, full_url, headers):
    """Dispatch a bucket-level request to the handler for its HTTP method.

    Without a bucket name in the URL the all-buckets listing is returned.
    The request body is handed to the handlers as UTF-8 encoded bytes.

    Raises:
        NotImplementedError: for any method other than HEAD, GET, PUT,
            DELETE or POST.
    """
    querystring = self._get_querystring(full_url)
    method = request.method
    region_name = parse_region_from_url(full_url)

    bucket_name = self.parse_bucket_name_from_url(request, full_url)
    if not bucket_name:
        # If no bucket specified, list all buckets
        return self.all_buckets()

    self.data["BucketName"] = bucket_name

    # Boto requests expose ``body``; the Flask server exposes ``data``.
    body = request.body if hasattr(request, "body") else request.data
    if body is None:
        body = b""
    if isinstance(body, six.binary_type):
        body = body.decode("utf-8")
    body = "{0}".format(body).encode("utf-8")

    if method == "HEAD":
        return self._bucket_response_head(bucket_name)
    if method == "GET":
        return self._bucket_response_get(bucket_name, querystring)
    if method == "PUT":
        return self._bucket_response_put(
            request, body, region_name, bucket_name, querystring
        )
    if method == "DELETE":
        return self._bucket_response_delete(body, bucket_name, querystring)
    if method == "POST":
        return self._bucket_response_post(request, body, bucket_name)

    raise NotImplementedError(
        "Method {0} has not been implemented in the S3 backend yet".format(method)
    )
@staticmethod
def _get_querystring(full_url):
parsed_url = urlparse(full_url)
querystring = parse_qs(parsed_url.query, keep_blank_values=True)
return querystring
def _bucket_response_head(self, bucket_name):
try:
self.backend.get_bucket(bucket_name)
except MissingBucket:
# Unless we do this, boto3 does not raise ClientError on
# HEAD (which the real API responds with), and instead
# raises NoSuchBucket, leading to inconsistency in
# error response between real and mocked responses.
return 404, {}, ""
return 200, {}, ""
def _bucket_response_get(self, bucket_name, querystring):
self._set_action("BUCKET", "GET", querystring)
self._authenticate_and_authorize_s3_action()
if "uploads" in querystring:
for unsup in ("delimiter", "max-uploads"):
if unsup in querystring:
raise NotImplementedError(
"Listing multipart uploads with {} has not been implemented yet.".format(
unsup
)
)
multiparts = list(self.backend.get_all_multiparts(bucket_name).values())
if "prefix" in querystring:
prefix = querystring.get("prefix", [None])[0]
multiparts = [
upload
for upload in multiparts
if upload.key_name.startswith(prefix)
]
template = self.response_template(S3_ALL_MULTIPARTS)
return template.render(bucket_name=bucket_name, uploads=multiparts)
elif "location" in querystring:
bucket = self.backend.get_bucket(bucket_name)
template = self.response_template(S3_BUCKET_LOCATION)
location = bucket.location
# us-east-1 is different - returns a None location
if location == DEFAULT_REGION_NAME:
location = None
return template.render(location=location)
elif "lifecycle" in querystring:
bucket = self.backend.get_bucket(bucket_name)
if not bucket.rules:
template = self.response_template(S3_NO_LIFECYCLE)
return 404, {}, template.render(bucket_name=bucket_name)
template = self.response_template(S3_BUCKET_LIFECYCLE_CONFIGURATION)
return template.render(rules=bucket.rules)
elif "versioning" in querystring:
versioning = self.backend.get_bucket_versioning(bucket_name)
template = self.response_template(S3_BUCKET_GET_VERSIONING)
return template.render(status=versioning)
elif "policy" in querystring:
policy = self.backend.get_bucket_policy(bucket_name)
if not policy:
template = self.response_template(S3_NO_POLICY)
return 404, {}, template.render(bucket_name=bucket_name)
return 200, {}, policy
elif "website" in querystring:
website_configuration = self.backend.get_bucket_website_configuration(
bucket_name
)
if not website_configuration:
template = self.response_template(S3_NO_BUCKET_WEBSITE_CONFIG)
return 404, {}, template.render(bucket_name=bucket_name)
return 200, {}, website_configuration
elif "acl" in querystring:
bucket = self.backend.get_bucket(bucket_name)
template = self.response_template(S3_OBJECT_ACL_RESPONSE)
return template.render(obj=bucket)
elif "tagging" in querystring:
tags = self.backend.get_bucket_tags(bucket_name)["Tags"]
# "Special Error" if no tags:
if len(tags) == 0:
template = self.response_template(S3_NO_BUCKET_TAGGING)
return 404, {}, template.render(bucket_name=bucket_name)
template = self.response_template(S3_OBJECT_TAGGING_RESPONSE)
return template.render(tags=tags)
elif "logging" in querystring:
bucket = self.backend.get_bucket(bucket_name)
if not bucket.logging:
template = self.response_template(S3_NO_LOGGING_CONFIG)
return 200, {}, template.render()
template = self.response_template(S3_LOGGING_CONFIG)
return 200, {}, template.render(logging=bucket.logging)
elif "cors" in querystring:
bucket = self.backend.get_bucket(bucket_name)
if len(bucket.cors) == 0:
template = self.response_template(S3_NO_CORS_CONFIG)
return 404, {}, template.render(bucket_name=bucket_name)
template = self.response_template(S3_BUCKET_CORS_RESPONSE)
return template.render(bucket=bucket)
elif "notification" in querystring:
bucket = self.backend.get_bucket(bucket_name)
if not bucket.notification_configuration:
return 200, {}, ""
template = self.response_template(S3_GET_BUCKET_NOTIFICATION_CONFIG)
return template.render(bucket=bucket)
elif "accelerate" in querystring:
bucket = self.backend.get_bucket(bucket_name)
if bucket.accelerate_configuration is None:
template = self.response_template(S3_BUCKET_ACCELERATE_NOT_SET)
return 200, {}, template.render()
template = self.response_template(S3_BUCKET_ACCELERATE)
return template.render(bucket=bucket)
elif "publicAccessBlock" in querystring:
public_block_config = self.backend.get_bucket_public_access_block(
bucket_name
)
template = self.response_template(S3_PUBLIC_ACCESS_BLOCK_CONFIGURATION)
return template.render(public_block_config=public_block_config)
elif "versions" in querystring:
delimiter = querystring.get("delimiter", [None])[0]
encoding_type = querystring.get("encoding-type", [None])[0]
key_marker = querystring.get("key-marker", [None])[0]
max_keys = querystring.get("max-keys", [None])[0]
prefix = querystring.get("prefix", [""])[0]
version_id_marker = querystring.get("version-id-marker", [None])[0]
bucket = self.backend.get_bucket(bucket_name)
versions = self.backend.get_bucket_versions(
bucket_name,
delimiter=delimiter,
encoding_type=encoding_type,
key_marker=key_marker,
max_keys=max_keys,
version_id_marker=version_id_marker,
prefix=prefix,
)
latest_versions = self.backend.get_bucket_latest_versions(
bucket_name=bucket_name
)
key_list = []
delete_marker_list = []
for version in versions:
if isinstance(version, FakeKey):
key_list.append(version)
else:
delete_marker_list.append(version)
template = self.response_template(S3_BUCKET_GET_VERSIONS)
return (
200,
{},
template.render(
key_list=key_list,
delete_marker_list=delete_marker_list,
latest_versions=latest_versions,
bucket=bucket,
prefix="",
max_keys=1000,
delimiter="",
is_truncated="false",
),
)
elif "encryption" in querystring:
encryption = self.backend.get_bucket_encryption(bucket_name)
if not encryption:
template = self.response_template(S3_NO_ENCRYPTION)
return 404, {}, template.render(bucket_name=bucket_name)
template = self.response_template(S3_ENCRYPTION_CONFIG)
return 200, {}, template.render(encryption=encryption)
elif querystring.get("list-type", [None])[0] == "2":
return 200, {}, self._handle_list_objects_v2(bucket_name, querystring)
bucket = self.backend.get_bucket(bucket_name)
prefix = querystring.get("prefix", [None])[0]
if prefix and isinstance(prefix, six.binary_type):
prefix = prefix.decode("utf-8")
delimiter = querystring.get("delimiter", [None])[0]
max_keys = int(querystring.get("max-keys", [1000])[0])
marker = querystring.get("marker", [None])[0]
result_keys, result_folders = self.backend.prefix_query(
bucket, prefix, delimiter
)
if marker:
result_keys = self._get_results_from_token(result_keys, marker)
result_keys, is_truncated, next_marker = self._truncate_result(
result_keys, max_keys
)
template = self.response_template(S3_BUCKET_GET_RESPONSE)
return (
200,
{},
template.render(
bucket=bucket,
prefix=prefix,
delimiter=delimiter,
result_keys=result_keys,
result_folders=result_folders,
is_truncated=is_truncated,
next_marker=next_marker,
max_keys=max_keys,
),
)
def _set_action(self, action_resource_type, method, querystring):
    """Store in ``self.data["Action"]`` the action name for this request.

    Every flag listed in ACTION_MAP for the resource type and method is
    looked up in ``querystring``; when several are present the last one in
    map order wins, and when none is present the "DEFAULT" entry is used.
    """
    flag_to_action = ACTION_MAP[action_resource_type][method]
    matched = [
        action for flag, action in flag_to_action.items() if flag in querystring
    ]
    self.data["Action"] = matched[-1] if matched else flag_to_action["DEFAULT"]
def _handle_list_objects_v2(self, bucket_name, querystring):
    """Render the ListObjectsV2 result for ``bucket_name``.

    Keys and common-prefix folders are merged and sorted by name, cut off
    after the continuation token (or ``start-after``), capped at
    ``max-keys`` and split back into keys and folders for the template.
    """
    template = self.response_template(S3_BUCKET_GET_RESPONSE_V2)
    bucket = self.backend.get_bucket(bucket_name)

    def first_value(name, default=None):
        # parse_qs stores a list per parameter; only the first value is used.
        return querystring.get(name, [default])[0]

    prefix = first_value("prefix")
    if prefix and isinstance(prefix, six.binary_type):
        prefix = prefix.decode("utf-8")
    delimiter = first_value("delimiter")
    result_keys, result_folders = self.backend.prefix_query(
        bucket, prefix, delimiter
    )

    fetch_owner = first_value("fetch-owner", False)
    max_keys = int(first_value("max-keys", 1000))
    continuation_token = first_value("continuation-token")
    start_after = first_value("start-after")

    # sort the combination of folders and keys into lexicographical order
    all_keys = sorted(result_keys + result_folders, key=self._get_name)

    # The continuation token takes precedence over start-after.
    limit = continuation_token or start_after
    if limit:
        all_keys = self._get_results_from_token(all_keys, limit)

    truncated_keys, is_truncated, next_continuation_token = self._truncate_result(
        all_keys, max_keys
    )
    result_keys, result_folders = self._split_truncated_keys(truncated_keys)

    return template.render(
        bucket=bucket,
        prefix=prefix or "",
        delimiter=delimiter,
        key_count=len(result_keys) + len(result_folders),
        result_keys=result_keys,
        result_folders=result_folders,
        fetch_owner=fetch_owner,
        max_keys=max_keys,
        is_truncated=is_truncated,
        next_continuation_token=next_continuation_token,
        start_after=None if continuation_token else start_after,
    )
@staticmethod
def _get_name(key):
    """Return the name of a listing entry: ``key.name`` for a FakeKey, else the entry itself."""
    return key.name if isinstance(key, FakeKey) else key
@staticmethod
def _split_truncated_keys(truncated_keys):
result_keys = []
result_folders = []
for key in truncated_keys:
if isinstance(key, FakeKey):
result_keys.append(key)
else:
result_folders.append(key)
return result_keys, result_folders
def _get_results_from_token(self, result_keys, token):
continuation_index = 0
for key in result_keys:
if (key.name if isinstance(key, FakeKey) else key) > token:
break
continuation_index += 1
return result_keys[continuation_index:]
def _truncate_result(self, result_keys, max_keys):
if len(result_keys) > max_keys:
is_truncated = "true"
result_keys = result_keys[:max_keys]
item = result_keys[-1]
next_continuation_token = item.name if isinstance(item, FakeKey) else item
else:
is_truncated = "false"
next_continuation_token = None
return result_keys, is_truncated, next_continuation_token
def _body_contains_location_constraint(self, body):
if body:
try:
xmltodict.parse(body)["CreateBucketConfiguration"]["LocationConstraint"]
return True
except KeyError:
pass
return False
def _parse_pab_config(self, body):
    """Parse a PublicAccessBlockConfiguration document into a dict.

    The "@xmlns" attribute is dropped from the configuration element.
    """
    parsed_xml = xmltodict.parse(body)
    config = parsed_xml["PublicAccessBlockConfiguration"]
    config.pop("@xmlns", None)

    if sys.version_info[0] >= 3:
        return parsed_xml

    # If Python 2, fix the unicode strings:
    return {"PublicAccessBlockConfiguration": py2_strip_unicode_keys(dict(config))}
def _bucket_response_put(
self, request, body, region_name, bucket_name, querystring
):
if not request.headers.get("Content-Length"):
return 411, {}, "Content-Length required"
self._set_action("BUCKET", "PUT", querystring)
self._authenticate_and_authorize_s3_action()
if "versioning" in querystring:
ver = re.search("([A-Za-z]+)", body.decode())
if ver:
self.backend.set_bucket_versioning(bucket_name, ver.group(1))
template = self.response_template(S3_BUCKET_VERSIONING)
return template.render(bucket_versioning_status=ver.group(1))
else:
return 404, {}, ""
elif "lifecycle" in querystring:
rules = xmltodict.parse(body)["LifecycleConfiguration"]["Rule"]
if not isinstance(rules, list):
# If there is only one rule, xmldict returns just the item
rules = [rules]
self.backend.set_bucket_lifecycle(bucket_name, rules)
return ""
elif "policy" in querystring:
self.backend.set_bucket_policy(bucket_name, body)
return "True"
elif "acl" in querystring:
# Headers are first. If not set, then look at the body (consistent with the documentation):
acls = self._acl_from_headers(request.headers)
if not acls:
acls = self._acl_from_xml(body)
self.backend.set_bucket_acl(bucket_name, acls)
return ""
elif "tagging" in querystring:
tagging = self._bucket_tagging_from_xml(body)
self.backend.put_bucket_tags(bucket_name, tagging)
return ""
elif "website" in querystring:
self.backend.set_bucket_website_configuration(bucket_name, body)
return ""
elif "cors" in querystring:
try:
self.backend.put_bucket_cors(bucket_name, self._cors_from_xml(body))
return ""
except KeyError:
raise MalformedXML()
elif "logging" in querystring:
try:
self.backend.put_bucket_logging(
bucket_name, self._logging_from_xml(body)
)
return ""
except KeyError:
raise MalformedXML()
elif "notification" in querystring:
try:
self.backend.put_bucket_notification_configuration(
bucket_name, self._notification_config_from_xml(body)
)
return ""
except KeyError:
raise MalformedXML()
except Exception as e:
raise e
elif "accelerate" in querystring:
try:
accelerate_status = self._accelerate_config_from_xml(body)
self.backend.put_bucket_accelerate_configuration(
bucket_name, accelerate_status
)
return ""
except KeyError:
raise MalformedXML()
except Exception as e:
raise e
elif "publicAccessBlock" in querystring:
pab_config = self._parse_pab_config(body)
self.backend.put_bucket_public_access_block(
bucket_name, pab_config["PublicAccessBlockConfiguration"]
)
return ""
elif "encryption" in querystring:
try:
self.backend.put_bucket_encryption(
bucket_name, self._encryption_config_from_xml(body)
)
return ""
except KeyError:
raise MalformedXML()
except Exception as e:
raise e
else:
# us-east-1, the default AWS region behaves a bit differently
# - you should not use it as a location constraint --> it fails
# - querying the location constraint returns None
# - LocationConstraint has to be specified if outside us-east-1
if (
region_name != DEFAULT_REGION_NAME
and not self._body_contains_location_constraint(body)
):
raise IllegalLocationConstraintException()
if body:
try:
forced_region = xmltodict.parse(body)["CreateBucketConfiguration"][
"LocationConstraint"
]
if forced_region == DEFAULT_REGION_NAME:
raise S3ClientError(
"InvalidLocationConstraint",
"The specified location-constraint is not valid",
)
else:
region_name = forced_region
except KeyError:
pass
try:
new_bucket = self.backend.create_bucket(bucket_name, region_name)
except BucketAlreadyExists:
if region_name == DEFAULT_REGION_NAME:
# us-east-1 has different behavior
new_bucket = self.backend.get_bucket(bucket_name)
else:
raise
if "x-amz-acl" in request.headers:
# TODO: Support the XML-based ACL format
self.backend.set_bucket_acl(
bucket_name, self._acl_from_headers(request.headers)
)
template = self.response_template(S3_BUCKET_CREATE_RESPONSE)
return 200, {}, template.render(bucket=new_bucket)
def _bucket_response_delete(self, body, bucket_name, querystring):
self._set_action("BUCKET", "DELETE", querystring)
self._authenticate_and_authorize_s3_action()
if "policy" in querystring:
self.backend.delete_bucket_policy(bucket_name, body)
return 204, {}, ""
elif "tagging" in querystring:
self.backend.delete_bucket_tagging(bucket_name)
return 204, {}, ""
elif "cors" in querystring:
self.backend.delete_bucket_cors(bucket_name)
return 204, {}, ""
elif "lifecycle" in querystring:
bucket = self.backend.get_bucket(bucket_name)
bucket.delete_lifecycle()
return 204, {}, ""
elif "publicAccessBlock" in querystring:
self.backend.delete_bucket_public_access_block(bucket_name)
return 204, {}, ""
elif "encryption" in querystring:
bucket = self.backend.delete_bucket_encryption(bucket_name)
return 204, {}, ""
removed_bucket = self.backend.delete_bucket(bucket_name)
if removed_bucket:
# Bucket exists
template = self.response_template(S3_DELETE_BUCKET_SUCCESS)
return 204, {}, template.render(bucket=removed_bucket)
else:
# Tried to delete a bucket that still has keys
template = self.response_template(S3_DELETE_BUCKET_WITH_ITEMS_ERROR)
return 409, {}, template.render(bucket=removed_bucket)
def _bucket_response_post(self, request, body, bucket_name):
response_headers = {}
if not request.headers.get("Content-Length"):
return 411, {}, "Content-Length required"
path = self._get_path(request)
if self.is_delete_keys(request, path, bucket_name):
self.data["Action"] = "DeleteObject"
self._authenticate_and_authorize_s3_action()
return self._bucket_response_delete_keys(request, body, bucket_name)
self.data["Action"] = "PutObject"
self._authenticate_and_authorize_s3_action()
# POST to bucket-url should create file from form
if hasattr(request, "form"):
# Not HTTPretty
form = request.form
else:
# HTTPretty, build new form object
body = body.decode()
form = dict(parse_qsl(body))
key = form["key"]
if "file" in form:
f = form["file"]
else:
f = request.files["file"].stream.read()
if "success_action_redirect" in form:
response_headers["Location"] = form["success_action_redirect"]
if "success_action_status" in form:
status_code = form["success_action_status"]
elif "success_action_redirect" in form:
status_code = 303
else:
status_code = 204
new_key = self.backend.set_key(bucket_name, key, f)
# Metadata
metadata = metadata_from_headers(form)
new_key.set_metadata(metadata)
return status_code, response_headers, ""
@staticmethod
def _get_path(request):
    """Return the request path (including any query part the source provides).

    HTTPretty requests supply ``path``; other requests supply ``full_path``
    when they have it, otherwise the path is derived from ``request.url``.
    """
    if isinstance(request, HTTPrettyRequest):
        return request.path
    if hasattr(request, "full_path"):
        return request.full_path
    return path_url(request.url)
def _bucket_response_delete_keys(self, request, body, bucket_name):
template = self.response_template(S3_DELETE_KEYS_RESPONSE)
body_dict = xmltodict.parse(body)
objects = body_dict["Delete"].get("Object", [])
if not isinstance(objects, list):
# We expect a list of objects, but when there is a single