Use TaggingService for S3 - Cleanup

This commit is contained in:
Bert Blommers 2020-04-01 15:35:25 +01:00
commit 8dbfd43c5c
4 changed files with 40 additions and 92 deletions

View file

@ -24,6 +24,7 @@ from moto.s3bucket_path.utils import (
from .exceptions import (
BucketAlreadyExists,
DuplicateTagKeys,
S3ClientError,
MissingBucket,
MissingKey,
@ -43,9 +44,6 @@ from .models import (
FakeGrant,
FakeAcl,
FakeKey,
FakeTagging,
FakeTagSet,
FakeTag,
)
from .utils import (
bucket_name_from_url,
@ -652,7 +650,7 @@ class ResponseObject(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
return ""
elif "tagging" in querystring:
tagging = self._bucket_tagging_from_xml(body)
self.backend.put_bucket_tagging(bucket_name, tagging)
self.backend.put_bucket_tags(bucket_name, tagging)
return ""
elif "website" in querystring:
self.backend.set_bucket_website_configuration(bucket_name, body)
@ -1361,55 +1359,45 @@ class ResponseObject(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
return None
def _tagging_from_headers(self, headers):
tags = {}
if headers.get("x-amz-tagging"):
parsed_header = parse_qs(headers["x-amz-tagging"], keep_blank_values=True)
tags = []
for tag in parsed_header.items():
tags.append(FakeTag(tag[0], tag[1][0]))
tag_set = FakeTagSet(tags)
tagging = FakeTagging(tag_set)
return tagging
else:
return FakeTagging()
tags[tag[0]] = tag[1][0]
return tags
def _tagging_from_xml(self, xml):
parsed_xml = xmltodict.parse(xml, force_list={"Tag": True})
tags = []
tags = {}
for tag in parsed_xml["Tagging"]["TagSet"]["Tag"]:
tags.append(FakeTag(tag["Key"], tag["Value"]))
tags[tag["Key"]] = tag["Value"]
tag_set = FakeTagSet(tags)
tagging = FakeTagging(tag_set)
return tagging
return tags
def _bucket_tagging_from_xml(self, xml):
parsed_xml = xmltodict.parse(xml)
tags = []
tags = {}
# Optional if no tags are being sent:
if parsed_xml["Tagging"].get("TagSet"):
# If there is only 1 tag, then it's not a list:
if not isinstance(parsed_xml["Tagging"]["TagSet"]["Tag"], list):
tags.append(
FakeTag(
parsed_xml["Tagging"]["TagSet"]["Tag"]["Key"],
parsed_xml["Tagging"]["TagSet"]["Tag"]["Value"],
)
)
tags[parsed_xml["Tagging"]["TagSet"]["Tag"]["Key"]] = parsed_xml[
"Tagging"
]["TagSet"]["Tag"]["Value"]
else:
for tag in parsed_xml["Tagging"]["TagSet"]["Tag"]:
tags.append(FakeTag(tag["Key"], tag["Value"]))
if tag["Key"] in tags:
raise DuplicateTagKeys()
tags[tag["Key"]] = tag["Value"]
# Verify that "aws:" is not in the tags. If so, then this is a problem:
for tag in tags:
if tag.key.startswith("aws:"):
for key, _ in tags.items():
if key.startswith("aws:"):
raise NoSystemTags()
tag_set = FakeTagSet(tags)
tagging = FakeTagging(tag_set)
return tagging
return tags
def _cors_from_xml(self, xml):
parsed_xml = xmltodict.parse(xml)
@ -1730,10 +1718,10 @@ S3_BUCKET_LIFECYCLE_CONFIGURATION = """<?xml version="1.0" encoding="UTF-8"?>
{% if rule.filter.prefix != None %}
<Prefix>{{ rule.filter.prefix }}</Prefix>
{% endif %}
{% if rule.filter.tag %}
{% if rule.filter.tag_key %}
<Tag>
<Key>{{ rule.filter.tag.key }}</Key>
<Value>{{ rule.filter.tag.value }}</Value>
<Key>{{ rule.filter.tag_key }}</Key>
<Value>{{ rule.filter.tag_value }}</Value>
</Tag>
{% endif %}
{% if rule.filter.and_filter %}
@ -1741,10 +1729,10 @@ S3_BUCKET_LIFECYCLE_CONFIGURATION = """<?xml version="1.0" encoding="UTF-8"?>
{% if rule.filter.and_filter.prefix != None %}
<Prefix>{{ rule.filter.and_filter.prefix }}</Prefix>
{% endif %}
{% for tag in rule.filter.and_filter.tags %}
{% for key, value in rule.filter.and_filter.tags.items() %}
<Tag>
<Key>{{ tag.key }}</Key>
<Value>{{ tag.value }}</Value>
<Key>{{ key }}</Key>
<Value>{{ value }}</Value>
</Tag>
{% endfor %}
</And>