Merge remote-tracking branch 'upstream/master' into fix-invalid-yaml-parsererror

This commit is contained in:
Shane 2020-07-03 10:57:57 +01:00
commit aab0e2ffa2
No known key found for this signature in database
GPG key ID: 439BE3F7ABFB2A56
3 changed files with 59 additions and 3 deletions

View file

@ -10,6 +10,7 @@ import six
import types
from io import BytesIO
from collections import defaultdict
from botocore.config import Config
from botocore.handlers import BUILTIN_HANDLERS
from botocore.awsrequest import AWSResponse
from six.moves.urllib.parse import urlparse
@ -416,6 +417,13 @@ class ServerModeMockAWS(BaseMockAWS):
import mock
def fake_boto3_client(*args, **kwargs):
region = self._get_region(*args, **kwargs)
if region:
if "config" in kwargs:
kwargs["config"].__dict__["user_agent_extra"] += " region/" + region
else:
config = Config(user_agent_extra="region/" + region)
kwargs["config"] = config
if "endpoint_url" not in kwargs:
kwargs["endpoint_url"] = "http://localhost:5000"
return real_boto3_client(*args, **kwargs)
@ -463,6 +471,14 @@ class ServerModeMockAWS(BaseMockAWS):
if six.PY2:
self._httplib_patcher.start()
def _get_region(self, *args, **kwargs):
if "region_name" in kwargs:
return kwargs["region_name"]
if type(args) == tuple and len(args) == 2:
service, region = args
return region
return None
def disable_patching(self):
if self._client_patcher:
self._client_patcher.stop()

View file

@ -188,6 +188,9 @@ class BaseResponse(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
default_region = "us-east-1"
# to extract region, use [^.]
region_regex = re.compile(r"\.(?P<region>[a-z]{2}-[a-z]+-\d{1})\.amazonaws\.com")
region_from_useragent_regex = re.compile(
r"region/(?P<region>[a-z]{2}-[a-z]+-\d{1})"
)
param_list_regex = re.compile(r"(.*)\.(\d+)\.")
access_key_regex = re.compile(
r"AWS.*(?P<access_key>(?<![A-Z0-9])[A-Z0-9]{20}(?![A-Z0-9]))[:/]"
@ -272,9 +275,14 @@ class BaseResponse(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
self.response_headers = {"server": "amazon.com"}
def get_region_from_url(self, request, full_url):
match = self.region_regex.search(full_url)
if match:
region = match.group(1)
url_match = self.region_regex.search(full_url)
user_agent_match = self.region_from_useragent_regex.search(
request.headers.get("User-Agent", "")
)
if url_match:
region = url_match.group(1)
elif user_agent_match:
region = user_agent_match.group(1)
elif (
"Authorization" in request.headers
and "AWS4" in request.headers["Authorization"]