Use TaggingService for S3 Objects

This commit is contained in:
Bert Blommers 2020-03-31 12:04:04 +01:00
commit f7ad4cbc09
5 changed files with 71 additions and 33 deletions

View file

@ -3255,7 +3255,8 @@ def test_boto3_put_object_tagging_on_earliest_version():
# Older version has tags while the most recent does not
resp = s3.get_object_tagging(Bucket=bucket_name, Key=key, VersionId=first_object.id)
resp["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
resp["TagSet"].should.equal(
sorted_tagset = sorted(resp["TagSet"], key=lambda t: t["Key"])
sorted_tagset.should.equal(
[{"Key": "item1", "Value": "foo"}, {"Key": "item2", "Value": "bar"}]
)
@ -3333,7 +3334,8 @@ def test_boto3_put_object_tagging_on_both_version():
resp = s3.get_object_tagging(Bucket=bucket_name, Key=key, VersionId=first_object.id)
resp["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
resp["TagSet"].should.equal(
sorted_tagset = sorted(resp["TagSet"], key=lambda t: t["Key"])
sorted_tagset.should.equal(
[{"Key": "item1", "Value": "foo"}, {"Key": "item2", "Value": "bar"}]
)
@ -3341,7 +3343,8 @@ def test_boto3_put_object_tagging_on_both_version():
Bucket=bucket_name, Key=key, VersionId=second_object.id
)
resp["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
resp["TagSet"].should.equal(
sorted_tagset = sorted(resp["TagSet"], key=lambda t: t["Key"])
sorted_tagset.should.equal(
[{"Key": "item1", "Value": "baz"}, {"Key": "item2", "Value": "bin"}]
)

View file

@ -77,3 +77,34 @@ def test_extract_tag_names():
expected = ["key1", "key2"]
expected.should.be.equal(actual)
def test_copy_non_existing_arn():
svc = TaggingService()
tags = [{"Key": "key1", "Value": "value1"}, {"Key": "key2", "Value": "value2"}]
svc.tag_resource("new_arn", tags)
#
svc.copy_tags("non_existing_arn", "new_arn")
# Copying from a non-existing ARN should a NOOP
# Assert the old tags still exist
actual = sorted(
svc.list_tags_for_resource("new_arn")["Tags"], key=lambda t: t["Key"]
)
actual.should.equal(tags)
def test_copy_existing_arn():
svc = TaggingService()
tags_old_arn = [{"Key": "key1", "Value": "value1"}]
tags_new_arn = [{"Key": "key2", "Value": "value2"}]
svc.tag_resource("old_arn", tags_old_arn)
svc.tag_resource("new_arn", tags_new_arn)
#
svc.copy_tags("old_arn", "new_arn")
# Assert the old tags still exist
actual = sorted(
svc.list_tags_for_resource("new_arn")["Tags"], key=lambda t: t["Key"]
)
actual.should.equal(
[{"Key": "key1", "Value": "value1"}, {"Key": "key2", "Value": "value2"}]
)