implement s3 object tagging

This commit is contained in:
eric-weaver 2017-07-15 22:36:12 -04:00
commit abf3078c28
4 changed files with 175 additions and 2 deletions

View file

@ -14,7 +14,7 @@ from moto.s3bucket_path.utils import bucket_name_from_url as bucketpath_bucket_n
from .exceptions import BucketAlreadyExists, S3ClientError, MissingKey, InvalidPartOrder
from .models import s3_backend, get_canned_acl, FakeGrantee, FakeGrant, FakeAcl, FakeKey
from .models import s3_backend, get_canned_acl, FakeGrantee, FakeGrant, FakeAcl, FakeKey, FakeTagging, FakeTagSet, FakeTag
from .utils import bucket_name_from_url, metadata_from_headers
from xml.dom import minidom
@ -520,6 +520,9 @@ class ResponseObject(_TemplateEnvironmentMixin):
if 'acl' in query:
template = self.response_template(S3_OBJECT_ACL_RESPONSE)
return 200, response_headers, template.render(obj=key)
if 'tagging' in query:
template = self.response_template(S3_OBJECT_TAGGING_RESPONSE)
return 200, response_headers, template.render(obj=key)
response_headers.update(key.metadata)
response_headers.update(key.response_dict)
@ -556,6 +559,7 @@ class ResponseObject(_TemplateEnvironmentMixin):
storage_class = request.headers.get('x-amz-storage-class', 'STANDARD')
acl = self._acl_from_headers(request.headers)
tagging = self._tagging_from_headers(request.headers)
if 'acl' in query:
key = self.backend.get_key(bucket_name, key_name)
@ -563,6 +567,11 @@ class ResponseObject(_TemplateEnvironmentMixin):
key.set_acl(acl)
return 200, response_headers, ""
if 'tagging' in query:
tagging = self._tagging_from_xml(body)
self.backend.set_key_tagging(bucket_name, key_name, tagging)
return 200, response_headers, ""
if 'x-amz-copy-source' in request.headers:
# Copy key
src_key_parsed = urlparse(request.headers.get("x-amz-copy-source"))
@ -596,6 +605,7 @@ class ResponseObject(_TemplateEnvironmentMixin):
new_key.set_metadata(metadata)
new_key.set_acl(acl)
new_key.website_redirect_location = request.headers.get('x-amz-website-redirect-location')
new_key.set_tagging(tagging)
template = self.response_template(S3_OBJECT_RESPONSE)
response_headers.update(new_key.response_dict)
@ -655,6 +665,30 @@ class ResponseObject(_TemplateEnvironmentMixin):
else:
return None
def _tagging_from_headers(self, headers):
if headers.get('x-amz-tagging'):
parsed_header = parse_qs(headers['x-amz-tagging'], keep_blank_values=True)
tags = []
for tag in parsed_header.items():
tags.append(FakeTag(tag[0], tag[1][0]))
tag_set = FakeTagSet(tags)
tagging = FakeTagging(tag_set)
return tagging
else:
return FakeTagging()
def _tagging_from_xml(self, xml):
parsed_xml = xmltodict.parse(xml)
tags = []
for tag in parsed_xml['Tagging']['TagSet']['Tag']:
tags.append(FakeTag(tag['Key'], tag['Value']))
tag_set = FakeTagSet(tags)
tagging = FakeTagging(tag_set)
return tagging
def _key_response_delete(self, bucket_name, query, key_name, headers):
if query.get('uploadId'):
upload_id = query['uploadId'][0]
@ -968,6 +1002,19 @@ S3_OBJECT_ACL_RESPONSE = """<?xml version="1.0" encoding="UTF-8"?>
</AccessControlList>
</AccessControlPolicy>"""
S3_OBJECT_TAGGING_RESPONSE = """\
<?xml version="1.0" encoding="UTF-8"?>
<Tagging xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
<TagSet>
{% for tag in obj.tagging.tag_set.tags %}
<Tag>
<Key>{{ tag.key }}</Key>
<Value>{{ tag.value }}</Value>
</Tag>
{% endfor %}
</TagSet>
</Tagging>"""
S3_OBJECT_COPY_RESPONSE = """\
<CopyObjectResult xmlns="http://doc.s3.amazonaws.com/2006-03-01">
<ETag>{{ key.etag }}</ETag>