Merge https://github.com/spulec/moto into support_optin_regions

This commit is contained in:
Matthew Gladney 2020-04-27 18:27:40 -04:00
commit ff1beea280
60 changed files with 4652 additions and 515 deletions

View file

@ -69,6 +69,22 @@ def test_create_rest_api_with_tags():
response["tags"].should.equal({"MY_TAG1": "MY_VALUE1"})
@mock_apigateway
def test_create_rest_api_with_policy():
client = boto3.client("apigateway", region_name="us-west-2")
policy = '{"Version": "2012-10-17","Statement": []}'
response = client.create_rest_api(
name="my_api", description="this is my api", policy=policy
)
api_id = response["id"]
response = client.get_rest_api(restApiId=api_id)
assert "policy" in response
response["policy"].should.equal(policy)
@mock_apigateway
def test_create_rest_api_invalid_apikeysource():
client = boto3.client("apigateway", region_name="us-west-2")

View file

@ -1677,6 +1677,42 @@ def test_create_function_with_unknown_arn():
)
@mock_lambda
def test_remove_function_permission():
conn = boto3.client("lambda", _lambda_region)
zip_content = get_test_zip_file1()
conn.create_function(
FunctionName="testFunction",
Runtime="python2.7",
Role=(get_role_name()),
Handler="lambda_function.handler",
Code={"ZipFile": zip_content},
Description="test lambda function",
Timeout=3,
MemorySize=128,
Publish=True,
)
conn.add_permission(
FunctionName="testFunction",
StatementId="1",
Action="lambda:InvokeFunction",
Principal="432143214321",
SourceArn="arn:aws:lambda:us-west-2:account-id:function:helloworld",
SourceAccount="123412341234",
EventSourceToken="blah",
Qualifier="2",
)
remove = conn.remove_permission(
FunctionName="testFunction", StatementId="1", Qualifier="2",
)
remove["ResponseMetadata"]["HTTPStatusCode"].should.equal(204)
policy = conn.get_policy(FunctionName="testFunction", Qualifier="2")["Policy"]
policy = json.loads(policy)
policy["Statement"].should.equal([])
def create_invalid_lambda(role):
conn = boto3.client("lambda", _lambda_region)
zip_content = get_test_zip_file1()

View file

@ -835,8 +835,10 @@ def test_describe_change_set():
)
stack = cf_conn.describe_change_set(ChangeSetName="NewChangeSet")
stack["ChangeSetName"].should.equal("NewChangeSet")
stack["StackName"].should.equal("NewStack")
stack["Status"].should.equal("REVIEW_IN_PROGRESS")
cf_conn.create_change_set(
StackName="NewStack",
@ -851,15 +853,30 @@ def test_describe_change_set():
@mock_cloudformation
@mock_ec2
def test_execute_change_set_w_arn():
cf_conn = boto3.client("cloudformation", region_name="us-east-1")
ec2 = boto3.client("ec2", region_name="us-east-1")
# Verify no instances exist at the moment
ec2.describe_instances()["Reservations"].should.have.length_of(0)
# Create a Change set, and verify no resources have been created yet
change_set = cf_conn.create_change_set(
StackName="NewStack",
TemplateBody=dummy_template_json,
ChangeSetName="NewChangeSet",
ChangeSetType="CREATE",
)
ec2.describe_instances()["Reservations"].should.have.length_of(0)
cf_conn.describe_change_set(ChangeSetName="NewChangeSet")["Status"].should.equal(
"REVIEW_IN_PROGRESS"
)
# Execute change set
cf_conn.execute_change_set(ChangeSetName=change_set["Id"])
# Verify that the status has changed, and the appropriate resources have been created
cf_conn.describe_change_set(ChangeSetName="NewChangeSet")["Status"].should.equal(
"CREATE_COMPLETE"
)
ec2.describe_instances()["Reservations"].should.have.length_of(1)
@mock_cloudformation

View file

@ -1,9 +1,10 @@
import boto
from boto.ec2.cloudwatch.alarm import MetricAlarm
from boto.s3.key import Key
from datetime import datetime
import sure # noqa
from moto import mock_cloudwatch_deprecated
from moto import mock_cloudwatch_deprecated, mock_s3_deprecated
def alarm_fixture(name="tester", action=None):
@ -83,7 +84,8 @@ def test_put_metric_data():
)
metrics = conn.list_metrics()
metrics.should.have.length_of(1)
metric_names = [m for m in metrics if m.name == "metric"]
metric_names.should.have(1)
metric = metrics[0]
metric.namespace.should.equal("tester")
metric.name.should.equal("metric")
@ -153,3 +155,36 @@ def test_get_metric_statistics():
datapoint = datapoints[0]
datapoint.should.have.key("Minimum").which.should.equal(1.5)
datapoint.should.have.key("Timestamp").which.should.equal(metric_timestamp)
# TODO: THIS IS CURRENTLY BROKEN!
# @mock_s3_deprecated
# @mock_cloudwatch_deprecated
# def test_cloudwatch_return_s3_metrics():
#
# region = "us-east-1"
#
# cw = boto.ec2.cloudwatch.connect_to_region(region)
# s3 = boto.s3.connect_to_region(region)
#
# bucket_name_1 = "test-bucket-1"
# bucket_name_2 = "test-bucket-2"
#
# bucket1 = s3.create_bucket(bucket_name=bucket_name_1)
# key = Key(bucket1)
# key.key = "the-key"
# key.set_contents_from_string("foobar" * 4)
# s3.create_bucket(bucket_name=bucket_name_2)
#
# metrics_s3_bucket_1 = cw.list_metrics(dimensions={"BucketName": bucket_name_1})
# # Verify that the OOTB S3 metrics are available for the created buckets
# len(metrics_s3_bucket_1).should.be(2)
# metric_names = [m.name for m in metrics_s3_bucket_1]
# sorted(metric_names).should.equal(
# ["Metric:BucketSizeBytes", "Metric:NumberOfObjects"]
# )
#
# # Explicit clean up - the metrics for these buckets are messing with subsequent tests
# key.delete()
# s3.delete_bucket(bucket_name_1)
# s3.delete_bucket(bucket_name_2)

View file

@ -3,6 +3,7 @@
import boto3
from botocore.exceptions import ClientError
from datetime import datetime, timedelta
from freezegun import freeze_time
from nose.tools import assert_raises
from uuid import uuid4
import pytz
@ -211,6 +212,35 @@ def test_get_metric_statistics():
datapoint["Sum"].should.equal(1.5)
@mock_cloudwatch
@freeze_time("2020-02-10 18:44:05")
def test_custom_timestamp():
utc_now = datetime.now(tz=pytz.utc)
time = "2020-02-10T18:44:09Z"
cw = boto3.client("cloudwatch", "eu-west-1")
cw.put_metric_data(
Namespace="tester",
MetricData=[dict(MetricName="metric1", Value=1.5, Timestamp=time)],
)
cw.put_metric_data(
Namespace="tester",
MetricData=[
dict(MetricName="metric2", Value=1.5, Timestamp=datetime(2020, 2, 10))
],
)
stats = cw.get_metric_statistics(
Namespace="tester",
MetricName="metric",
StartTime=utc_now - timedelta(seconds=60),
EndTime=utc_now + timedelta(seconds=60),
Period=60,
Statistics=["SampleCount", "Sum"],
)
@mock_cloudwatch
def test_list_metrics():
cloudwatch = boto3.client("cloudwatch", "eu-west-1")
@ -233,8 +263,16 @@ def test_list_metrics():
# Verify format
res.should.equal(
[
{u"Namespace": "list_test_1/", u"Dimensions": [], u"MetricName": "metric1"},
{u"Namespace": "list_test_1/", u"Dimensions": [], u"MetricName": "metric1"},
{
u"Namespace": "list_test_1/",
u"Dimensions": [],
u"MetricName": "metric1",
},
{
u"Namespace": "list_test_1/",
u"Dimensions": [],
u"MetricName": "metric1",
},
]
)
# Verify unknown namespace still has no results
@ -292,3 +330,232 @@ def create_metrics(cloudwatch, namespace, metrics=5, data_points=5):
Namespace=namespace,
MetricData=[{"MetricName": metric_name, "Value": j, "Unit": "Seconds"}],
)
@mock_cloudwatch
def test_get_metric_data_within_timeframe():
utc_now = datetime.now(tz=pytz.utc)
cloudwatch = boto3.client("cloudwatch", "eu-west-1")
namespace1 = "my_namespace/"
# put metric data
values = [0, 2, 4, 3.5, 7, 100]
cloudwatch.put_metric_data(
Namespace=namespace1,
MetricData=[
{"MetricName": "metric1", "Value": val, "Unit": "Seconds"} for val in values
],
)
# get_metric_data
stats = ["Average", "Sum", "Minimum", "Maximum"]
response = cloudwatch.get_metric_data(
MetricDataQueries=[
{
"Id": "result_" + stat,
"MetricStat": {
"Metric": {"Namespace": namespace1, "MetricName": "metric1"},
"Period": 60,
"Stat": stat,
},
}
for stat in stats
],
StartTime=utc_now - timedelta(seconds=60),
EndTime=utc_now + timedelta(seconds=60),
)
#
# Assert Average/Min/Max/Sum is returned as expected
avg = [
res for res in response["MetricDataResults"] if res["Id"] == "result_Average"
][0]
avg["Label"].should.equal("metric1 Average")
avg["StatusCode"].should.equal("Complete")
[int(val) for val in avg["Values"]].should.equal([19])
sum_ = [res for res in response["MetricDataResults"] if res["Id"] == "result_Sum"][
0
]
sum_["Label"].should.equal("metric1 Sum")
sum_["StatusCode"].should.equal("Complete")
[val for val in sum_["Values"]].should.equal([sum(values)])
min_ = [
res for res in response["MetricDataResults"] if res["Id"] == "result_Minimum"
][0]
min_["Label"].should.equal("metric1 Minimum")
min_["StatusCode"].should.equal("Complete")
[int(val) for val in min_["Values"]].should.equal([0])
max_ = [
res for res in response["MetricDataResults"] if res["Id"] == "result_Maximum"
][0]
max_["Label"].should.equal("metric1 Maximum")
max_["StatusCode"].should.equal("Complete")
[int(val) for val in max_["Values"]].should.equal([100])
@mock_cloudwatch
def test_get_metric_data_partially_within_timeframe():
utc_now = datetime.now(tz=pytz.utc)
yesterday = utc_now - timedelta(days=1)
last_week = utc_now - timedelta(days=7)
cloudwatch = boto3.client("cloudwatch", "eu-west-1")
namespace1 = "my_namespace/"
# put metric data
values = [0, 2, 4, 3.5, 7, 100]
cloudwatch.put_metric_data(
Namespace=namespace1,
MetricData=[
{
"MetricName": "metric1",
"Value": 10,
"Unit": "Seconds",
"Timestamp": utc_now,
}
],
)
cloudwatch.put_metric_data(
Namespace=namespace1,
MetricData=[
{
"MetricName": "metric1",
"Value": 20,
"Unit": "Seconds",
"Timestamp": yesterday,
}
],
)
cloudwatch.put_metric_data(
Namespace=namespace1,
MetricData=[
{
"MetricName": "metric1",
"Value": 50,
"Unit": "Seconds",
"Timestamp": last_week,
}
],
)
# get_metric_data
response = cloudwatch.get_metric_data(
MetricDataQueries=[
{
"Id": "result",
"MetricStat": {
"Metric": {"Namespace": namespace1, "MetricName": "metric1"},
"Period": 60,
"Stat": "Sum",
},
}
],
StartTime=yesterday - timedelta(seconds=60),
EndTime=utc_now + timedelta(seconds=60),
)
#
# Assert Last week's data is not returned
len(response["MetricDataResults"]).should.equal(1)
sum_ = response["MetricDataResults"][0]
sum_["Label"].should.equal("metric1 Sum")
sum_["StatusCode"].should.equal("Complete")
sum_["Values"].should.equal([30.0])
@mock_cloudwatch
def test_get_metric_data_outside_timeframe():
utc_now = datetime.now(tz=pytz.utc)
last_week = utc_now - timedelta(days=7)
cloudwatch = boto3.client("cloudwatch", "eu-west-1")
namespace1 = "my_namespace/"
# put metric data
cloudwatch.put_metric_data(
Namespace=namespace1,
MetricData=[
{
"MetricName": "metric1",
"Value": 50,
"Unit": "Seconds",
"Timestamp": last_week,
}
],
)
# get_metric_data
response = cloudwatch.get_metric_data(
MetricDataQueries=[
{
"Id": "result",
"MetricStat": {
"Metric": {"Namespace": namespace1, "MetricName": "metric1"},
"Period": 60,
"Stat": "Sum",
},
}
],
StartTime=utc_now - timedelta(seconds=60),
EndTime=utc_now + timedelta(seconds=60),
)
#
# Assert Last week's data is not returned
len(response["MetricDataResults"]).should.equal(1)
response["MetricDataResults"][0]["Id"].should.equal("result")
response["MetricDataResults"][0]["StatusCode"].should.equal("Complete")
response["MetricDataResults"][0]["Values"].should.equal([])
@mock_cloudwatch
def test_get_metric_data_for_multiple_metrics():
utc_now = datetime.now(tz=pytz.utc)
cloudwatch = boto3.client("cloudwatch", "eu-west-1")
namespace = "my_namespace/"
# put metric data
cloudwatch.put_metric_data(
Namespace=namespace,
MetricData=[
{
"MetricName": "metric1",
"Value": 50,
"Unit": "Seconds",
"Timestamp": utc_now,
}
],
)
cloudwatch.put_metric_data(
Namespace=namespace,
MetricData=[
{
"MetricName": "metric2",
"Value": 25,
"Unit": "Seconds",
"Timestamp": utc_now,
}
],
)
# get_metric_data
response = cloudwatch.get_metric_data(
MetricDataQueries=[
{
"Id": "result1",
"MetricStat": {
"Metric": {"Namespace": namespace, "MetricName": "metric1"},
"Period": 60,
"Stat": "Sum",
},
},
{
"Id": "result2",
"MetricStat": {
"Metric": {"Namespace": namespace, "MetricName": "metric2"},
"Period": 60,
"Stat": "Sum",
},
},
],
StartTime=utc_now - timedelta(seconds=60),
EndTime=utc_now + timedelta(seconds=60),
)
#
len(response["MetricDataResults"]).should.equal(2)
res1 = [res for res in response["MetricDataResults"] if res["Id"] == "result1"][0]
res1["Values"].should.equal([50.0])
res2 = [res for res in response["MetricDataResults"] if res["Id"] == "result2"][0]
res2["Values"].should.equal([25.0])

View file

@ -7,6 +7,7 @@ from nose.tools import assert_raises
from moto import mock_cognitoidentity
from moto.cognitoidentity.utils import get_random_identity_id
from moto.core import ACCOUNT_ID
from uuid import UUID
@mock_cognitoidentity
@ -83,8 +84,10 @@ def test_describe_identity_pool_with_invalid_id_raises_error():
# testing a helper function
def test_get_random_identity_id():
assert len(get_random_identity_id("us-west-2")) > 0
assert len(get_random_identity_id("us-west-2").split(":")[1]) == 19
identity_id = get_random_identity_id("us-west-2")
region, id = identity_id.split(":")
region.should.equal("us-west-2")
UUID(id, version=4) # Will throw an error if it's not a valid UUID
@mock_cognitoidentity
@ -96,7 +99,6 @@ def test_get_id():
IdentityPoolId="us-west-2:12345",
Logins={"someurl": "12345"},
)
print(result)
assert (
result.get("IdentityId", "").startswith("us-west-2")
or result.get("ResponseMetadata").get("HTTPStatusCode") == 200

View file

@ -48,6 +48,5 @@ def test_get_id():
},
)
print(res.data)
json_data = json.loads(res.data.decode("utf-8"))
assert ":" in json_data["IdentityId"]

View file

@ -11,6 +11,8 @@ from moto import mock_s3
from moto.config import mock_config
from moto.core import ACCOUNT_ID
import sure # noqa
@mock_config
def test_put_configuration_recorder():

View file

@ -1,21 +1,17 @@
from __future__ import unicode_literals, print_function
import re
from decimal import Decimal
import six
import boto
import boto3
from boto3.dynamodb.conditions import Attr, Key
import re
import requests
import sure # noqa
from moto import mock_dynamodb2, mock_dynamodb2_deprecated
from moto.dynamodb2 import dynamodb_backend2, dynamodb_backends2
from boto.exception import JSONResponseError
from botocore.exceptions import ClientError, ParamValidationError
from tests.helpers import requires_boto_gte
import tests.backport_assert_raises
import moto.dynamodb2.comparisons
import moto.dynamodb2.models
@ -1454,6 +1450,13 @@ def test_filter_expression():
filter_expr.expr(row1).should.be(True)
filter_expr.expr(row2).should.be(False)
# lowercase AND test
filter_expr = moto.dynamodb2.comparisons.get_filter_expression(
"Id > :v0 and Subs < :v1", {}, {":v0": {"N": "5"}, ":v1": {"N": "7"}}
)
filter_expr.expr(row1).should.be(True)
filter_expr.expr(row2).should.be(False)
# OR test
filter_expr = moto.dynamodb2.comparisons.get_filter_expression(
"Id = :v0 OR Id=:v1", {}, {":v0": {"N": "5"}, ":v1": {"N": "8"}}
@ -2785,7 +2788,7 @@ def test_query_gsi_with_range_key():
res = dynamodb.query(
TableName="test",
IndexName="test_gsi",
KeyConditionExpression="gsi_hash_key = :gsi_hash_key AND gsi_range_key = :gsi_range_key",
KeyConditionExpression="gsi_hash_key = :gsi_hash_key and gsi_range_key = :gsi_range_key",
ExpressionAttributeValues={
":gsi_hash_key": {"S": "key1"},
":gsi_range_key": {"S": "range1"},
@ -3214,6 +3217,25 @@ def test_remove_top_level_attribute():
result.should.equal({"id": {"S": "foo"}})
@mock_dynamodb2
def test_remove_top_level_attribute_non_existent():
"""
Remove statements do not require attribute to exist they silently pass
"""
table_name = "test_remove"
client = create_table_with_list(table_name)
ddb_item = {"id": {"S": "foo"}, "item": {"S": "bar"}}
client.put_item(TableName=table_name, Item=ddb_item)
client.update_item(
TableName=table_name,
Key={"id": {"S": "foo"}},
UpdateExpression="REMOVE non_existent_attribute",
ExpressionAttributeNames={"#i": "item"},
)
result = client.get_item(TableName=table_name, Key={"id": {"S": "foo"}})["Item"]
result.should.equal(ddb_item)
@mock_dynamodb2
def test_remove_list_index__remove_existing_index():
table_name = "test_list_index_access"
@ -4212,6 +4234,396 @@ def test_gsi_verify_negative_number_order():
)
@mock_dynamodb2
def test_transact_write_items_put():
table_schema = {
"KeySchema": [{"AttributeName": "id", "KeyType": "HASH"}],
"AttributeDefinitions": [{"AttributeName": "id", "AttributeType": "S"},],
}
dynamodb = boto3.client("dynamodb", region_name="us-east-1")
dynamodb.create_table(
TableName="test-table", BillingMode="PAY_PER_REQUEST", **table_schema
)
# Put multiple items
dynamodb.transact_write_items(
TransactItems=[
{
"Put": {
"Item": {"id": {"S": "foo{}".format(str(i))}, "foo": {"S": "bar"},},
"TableName": "test-table",
}
}
for i in range(0, 5)
]
)
# Assert all are present
items = dynamodb.scan(TableName="test-table")["Items"]
items.should.have.length_of(5)
@mock_dynamodb2
def test_transact_write_items_put_conditional_expressions():
table_schema = {
"KeySchema": [{"AttributeName": "id", "KeyType": "HASH"}],
"AttributeDefinitions": [{"AttributeName": "id", "AttributeType": "S"},],
}
dynamodb = boto3.client("dynamodb", region_name="us-east-1")
dynamodb.create_table(
TableName="test-table", BillingMode="PAY_PER_REQUEST", **table_schema
)
dynamodb.put_item(
TableName="test-table", Item={"id": {"S": "foo2"},},
)
# Put multiple items
with assert_raises(ClientError) as ex:
dynamodb.transact_write_items(
TransactItems=[
{
"Put": {
"Item": {
"id": {"S": "foo{}".format(str(i))},
"foo": {"S": "bar"},
},
"TableName": "test-table",
"ConditionExpression": "#i <> :i",
"ExpressionAttributeNames": {"#i": "id"},
"ExpressionAttributeValues": {
":i": {
"S": "foo2"
} # This item already exist, so the ConditionExpression should fail
},
}
}
for i in range(0, 5)
]
)
# Assert the exception is correct
ex.exception.response["Error"]["Code"].should.equal(
"ConditionalCheckFailedException"
)
ex.exception.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.exception.response["Error"]["Message"].should.equal(
"A condition specified in the operation could not be evaluated."
)
# Assert all are present
items = dynamodb.scan(TableName="test-table")["Items"]
items.should.have.length_of(1)
items[0].should.equal({"id": {"S": "foo2"}})
@mock_dynamodb2
def test_transact_write_items_conditioncheck_passes():
table_schema = {
"KeySchema": [{"AttributeName": "id", "KeyType": "HASH"}],
"AttributeDefinitions": [{"AttributeName": "id", "AttributeType": "S"},],
}
dynamodb = boto3.client("dynamodb", region_name="us-east-1")
dynamodb.create_table(
TableName="test-table", BillingMode="PAY_PER_REQUEST", **table_schema
)
# Insert an item without email address
dynamodb.put_item(
TableName="test-table", Item={"id": {"S": "foo"},},
)
# Put an email address, after verifying it doesn't exist yet
dynamodb.transact_write_items(
TransactItems=[
{
"ConditionCheck": {
"Key": {"id": {"S": "foo"}},
"TableName": "test-table",
"ConditionExpression": "attribute_not_exists(#e)",
"ExpressionAttributeNames": {"#e": "email_address"},
}
},
{
"Put": {
"Item": {
"id": {"S": "foo"},
"email_address": {"S": "test@moto.com"},
},
"TableName": "test-table",
}
},
]
)
# Assert all are present
items = dynamodb.scan(TableName="test-table")["Items"]
items.should.have.length_of(1)
items[0].should.equal({"email_address": {"S": "test@moto.com"}, "id": {"S": "foo"}})
@mock_dynamodb2
def test_transact_write_items_conditioncheck_fails():
table_schema = {
"KeySchema": [{"AttributeName": "id", "KeyType": "HASH"}],
"AttributeDefinitions": [{"AttributeName": "id", "AttributeType": "S"},],
}
dynamodb = boto3.client("dynamodb", region_name="us-east-1")
dynamodb.create_table(
TableName="test-table", BillingMode="PAY_PER_REQUEST", **table_schema
)
# Insert an item with email address
dynamodb.put_item(
TableName="test-table",
Item={"id": {"S": "foo"}, "email_address": {"S": "test@moto.com"}},
)
# Try to put an email address, but verify whether it exists
# ConditionCheck should fail
with assert_raises(ClientError) as ex:
dynamodb.transact_write_items(
TransactItems=[
{
"ConditionCheck": {
"Key": {"id": {"S": "foo"}},
"TableName": "test-table",
"ConditionExpression": "attribute_not_exists(#e)",
"ExpressionAttributeNames": {"#e": "email_address"},
}
},
{
"Put": {
"Item": {
"id": {"S": "foo"},
"email_address": {"S": "update@moto.com"},
},
"TableName": "test-table",
}
},
]
)
# Assert the exception is correct
ex.exception.response["Error"]["Code"].should.equal(
"ConditionalCheckFailedException"
)
ex.exception.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.exception.response["Error"]["Message"].should.equal(
"A condition specified in the operation could not be evaluated."
)
# Assert the original email address is still present
items = dynamodb.scan(TableName="test-table")["Items"]
items.should.have.length_of(1)
items[0].should.equal({"email_address": {"S": "test@moto.com"}, "id": {"S": "foo"}})
@mock_dynamodb2
def test_transact_write_items_delete():
table_schema = {
"KeySchema": [{"AttributeName": "id", "KeyType": "HASH"}],
"AttributeDefinitions": [{"AttributeName": "id", "AttributeType": "S"},],
}
dynamodb = boto3.client("dynamodb", region_name="us-east-1")
dynamodb.create_table(
TableName="test-table", BillingMode="PAY_PER_REQUEST", **table_schema
)
# Insert an item
dynamodb.put_item(
TableName="test-table", Item={"id": {"S": "foo"},},
)
# Delete the item
dynamodb.transact_write_items(
TransactItems=[
{"Delete": {"Key": {"id": {"S": "foo"}}, "TableName": "test-table",}}
]
)
# Assert the item is deleted
items = dynamodb.scan(TableName="test-table")["Items"]
items.should.have.length_of(0)
@mock_dynamodb2
def test_transact_write_items_delete_with_successful_condition_expression():
table_schema = {
"KeySchema": [{"AttributeName": "id", "KeyType": "HASH"}],
"AttributeDefinitions": [{"AttributeName": "id", "AttributeType": "S"},],
}
dynamodb = boto3.client("dynamodb", region_name="us-east-1")
dynamodb.create_table(
TableName="test-table", BillingMode="PAY_PER_REQUEST", **table_schema
)
# Insert an item without email address
dynamodb.put_item(
TableName="test-table", Item={"id": {"S": "foo"},},
)
# ConditionExpression will pass - no email address has been specified yet
dynamodb.transact_write_items(
TransactItems=[
{
"Delete": {
"Key": {"id": {"S": "foo"},},
"TableName": "test-table",
"ConditionExpression": "attribute_not_exists(#e)",
"ExpressionAttributeNames": {"#e": "email_address"},
}
}
]
)
# Assert the item is deleted
items = dynamodb.scan(TableName="test-table")["Items"]
items.should.have.length_of(0)
@mock_dynamodb2
def test_transact_write_items_delete_with_failed_condition_expression():
table_schema = {
"KeySchema": [{"AttributeName": "id", "KeyType": "HASH"}],
"AttributeDefinitions": [{"AttributeName": "id", "AttributeType": "S"},],
}
dynamodb = boto3.client("dynamodb", region_name="us-east-1")
dynamodb.create_table(
TableName="test-table", BillingMode="PAY_PER_REQUEST", **table_schema
)
# Insert an item with email address
dynamodb.put_item(
TableName="test-table",
Item={"id": {"S": "foo"}, "email_address": {"S": "test@moto.com"}},
)
# Try to delete an item that does not have an email address
# ConditionCheck should fail
with assert_raises(ClientError) as ex:
dynamodb.transact_write_items(
TransactItems=[
{
"Delete": {
"Key": {"id": {"S": "foo"},},
"TableName": "test-table",
"ConditionExpression": "attribute_not_exists(#e)",
"ExpressionAttributeNames": {"#e": "email_address"},
}
}
]
)
# Assert the exception is correct
ex.exception.response["Error"]["Code"].should.equal(
"ConditionalCheckFailedException"
)
ex.exception.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.exception.response["Error"]["Message"].should.equal(
"A condition specified in the operation could not be evaluated."
)
# Assert the original item is still present
items = dynamodb.scan(TableName="test-table")["Items"]
items.should.have.length_of(1)
items[0].should.equal({"email_address": {"S": "test@moto.com"}, "id": {"S": "foo"}})
@mock_dynamodb2
def test_transact_write_items_update():
table_schema = {
"KeySchema": [{"AttributeName": "id", "KeyType": "HASH"}],
"AttributeDefinitions": [{"AttributeName": "id", "AttributeType": "S"},],
}
dynamodb = boto3.client("dynamodb", region_name="us-east-1")
dynamodb.create_table(
TableName="test-table", BillingMode="PAY_PER_REQUEST", **table_schema
)
# Insert an item
dynamodb.put_item(TableName="test-table", Item={"id": {"S": "foo"}})
# Update the item
dynamodb.transact_write_items(
TransactItems=[
{
"Update": {
"Key": {"id": {"S": "foo"}},
"TableName": "test-table",
"UpdateExpression": "SET #e = :v",
"ExpressionAttributeNames": {"#e": "email_address"},
"ExpressionAttributeValues": {":v": {"S": "test@moto.com"}},
}
}
]
)
# Assert the item is updated
items = dynamodb.scan(TableName="test-table")["Items"]
items.should.have.length_of(1)
items[0].should.equal({"id": {"S": "foo"}, "email_address": {"S": "test@moto.com"}})
@mock_dynamodb2
def test_transact_write_items_update_with_failed_condition_expression():
table_schema = {
"KeySchema": [{"AttributeName": "id", "KeyType": "HASH"}],
"AttributeDefinitions": [{"AttributeName": "id", "AttributeType": "S"},],
}
dynamodb = boto3.client("dynamodb", region_name="us-east-1")
dynamodb.create_table(
TableName="test-table", BillingMode="PAY_PER_REQUEST", **table_schema
)
# Insert an item with email address
dynamodb.put_item(
TableName="test-table",
Item={"id": {"S": "foo"}, "email_address": {"S": "test@moto.com"}},
)
# Try to update an item that does not have an email address
# ConditionCheck should fail
with assert_raises(ClientError) as ex:
dynamodb.transact_write_items(
TransactItems=[
{
"Update": {
"Key": {"id": {"S": "foo"}},
"TableName": "test-table",
"UpdateExpression": "SET #e = :v",
"ConditionExpression": "attribute_not_exists(#e)",
"ExpressionAttributeNames": {"#e": "email_address"},
"ExpressionAttributeValues": {":v": {"S": "update@moto.com"}},
}
}
]
)
# Assert the exception is correct
ex.exception.response["Error"]["Code"].should.equal(
"ConditionalCheckFailedException"
)
ex.exception.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.exception.response["Error"]["Message"].should.equal(
"A condition specified in the operation could not be evaluated."
)
# Assert the original item is still present
items = dynamodb.scan(TableName="test-table")["Items"]
items.should.have.length_of(1)
items[0].should.equal({"email_address": {"S": "test@moto.com"}, "id": {"S": "foo"}})
@mock_dynamodb2
def test_dynamodb_max_1mb_limit():
ddb = boto3.resource("dynamodb", region_name="eu-west-1")
table_name = "populated-mock-table"
table = ddb.create_table(
TableName=table_name,
KeySchema=[
{"AttributeName": "partition_key", "KeyType": "HASH"},
{"AttributeName": "sort_key", "KeyType": "RANGE"},
],
AttributeDefinitions=[
{"AttributeName": "partition_key", "AttributeType": "S"},
{"AttributeName": "sort_key", "AttributeType": "S"},
],
BillingMode="PAY_PER_REQUEST",
)
# Populate the table
items = [
{
"partition_key": "partition_key_val", # size=30
"sort_key": "sort_key_value____" + str(i), # size=30
}
for i in range(10000, 29999)
]
with table.batch_writer() as batch:
for item in items:
batch.put_item(Item=item)
response = table.query(
KeyConditionExpression=Key("partition_key").eq("partition_key_val")
)
# We shouldn't get everything back - the total result set is well over 1MB
len(items).should.be.greater_than(response["Count"])
response["LastEvaluatedKey"].shouldnt.be(None)
def assert_raise_syntax_error(client_error, token, near):
"""
Assert whether a client_error is as expected Syntax error. Syntax error looks like: `syntax_error_template`
@ -4286,3 +4698,251 @@ def test_list_tables_exclusive_start_table_name_empty():
resp = client.list_tables(Limit=1, ExclusiveStartTableName="whatever")
len(resp["TableNames"]).should.equal(0)
def assert_correct_client_error(
client_error, code, message_template, message_values=None, braces=None
):
"""
Assert whether a client_error is as expected. Allow for a list of values to be passed into the message
Args:
client_error(ClientError): The ClientError exception that was raised
code(str): The code for the error (e.g. ValidationException)
message_template(str): Error message template. if message_values is not None then this template has a {values}
as placeholder. For example:
'Value provided in ExpressionAttributeValues unused in expressions: keys: {values}'
message_values(list of str|None): The values that are passed in the error message
braces(list of str|None): List of length 2 with opening and closing brace for the values. By default it will be
surrounded by curly brackets
"""
braces = braces or ["{", "}"]
assert client_error.response["Error"]["Code"] == code
if message_values is not None:
values_string = "{open_brace}(?P<values>.*){close_brace}".format(
open_brace=braces[0], close_brace=braces[1]
)
re_msg = re.compile(message_template.format(values=values_string))
match_result = re_msg.match(client_error.response["Error"]["Message"])
assert match_result is not None
values_string = match_result.groupdict()["values"]
values = [key for key in values_string.split(", ")]
assert len(message_values) == len(values)
for value in message_values:
assert value in values
else:
assert client_error.response["Error"]["Message"] == message_template
def create_simple_table_and_return_client():
dynamodb = boto3.client("dynamodb", region_name="eu-west-1")
dynamodb.create_table(
TableName="moto-test",
KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"},],
ProvisionedThroughput={"ReadCapacityUnits": 1, "WriteCapacityUnits": 1},
)
dynamodb.put_item(
TableName="moto-test",
Item={"id": {"S": "1"}, "myNum": {"N": "1"}, "MyStr": {"S": "1"},},
)
return dynamodb
# https://github.com/spulec/moto/issues/2806
# https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_UpdateItem.html
# #DDB-UpdateItem-request-UpdateExpression
@mock_dynamodb2
def test_update_item_with_attribute_in_right_hand_side_and_operation():
dynamodb = create_simple_table_and_return_client()
dynamodb.update_item(
TableName="moto-test",
Key={"id": {"S": "1"}},
UpdateExpression="SET myNum = myNum+:val",
ExpressionAttributeValues={":val": {"N": "3"}},
)
result = dynamodb.get_item(TableName="moto-test", Key={"id": {"S": "1"}})
assert result["Item"]["myNum"]["N"] == "4"
dynamodb.update_item(
TableName="moto-test",
Key={"id": {"S": "1"}},
UpdateExpression="SET myNum = myNum - :val",
ExpressionAttributeValues={":val": {"N": "1"}},
)
result = dynamodb.get_item(TableName="moto-test", Key={"id": {"S": "1"}})
assert result["Item"]["myNum"]["N"] == "3"
@mock_dynamodb2
def test_non_existing_attribute_should_raise_exception():
"""
Does error message get correctly raised if attribute is referenced but it does not exist for the item.
"""
dynamodb = create_simple_table_and_return_client()
try:
dynamodb.update_item(
TableName="moto-test",
Key={"id": {"S": "1"}},
UpdateExpression="SET MyStr = no_attr + MyStr",
)
assert False, "Validation exception not thrown"
except dynamodb.exceptions.ClientError as e:
assert_correct_client_error(
e,
"ValidationException",
"The provided expression refers to an attribute that does not exist in the item",
)
@mock_dynamodb2
def test_update_expression_with_plus_in_attribute_name():
"""
Does error message get correctly raised if attribute contains a plus and is passed in without an AttributeName. And
lhs & rhs are not attribute IDs by themselve.
"""
dynamodb = create_simple_table_and_return_client()
dynamodb.put_item(
TableName="moto-test",
Item={"id": {"S": "1"}, "my+Num": {"S": "1"}, "MyStr": {"S": "aaa"},},
)
try:
dynamodb.update_item(
TableName="moto-test",
Key={"id": {"S": "1"}},
UpdateExpression="SET MyStr = my+Num",
)
assert False, "Validation exception not thrown"
except dynamodb.exceptions.ClientError as e:
assert_correct_client_error(
e,
"ValidationException",
"The provided expression refers to an attribute that does not exist in the item",
)
@mock_dynamodb2
def test_update_expression_with_minus_in_attribute_name():
"""
Does error message get correctly raised if attribute contains a minus and is passed in without an AttributeName. And
lhs & rhs are not attribute IDs by themselve.
"""
dynamodb = create_simple_table_and_return_client()
dynamodb.put_item(
TableName="moto-test",
Item={"id": {"S": "1"}, "my-Num": {"S": "1"}, "MyStr": {"S": "aaa"},},
)
try:
dynamodb.update_item(
TableName="moto-test",
Key={"id": {"S": "1"}},
UpdateExpression="SET MyStr = my-Num",
)
assert False, "Validation exception not thrown"
except dynamodb.exceptions.ClientError as e:
assert_correct_client_error(
e,
"ValidationException",
"The provided expression refers to an attribute that does not exist in the item",
)
@mock_dynamodb2
def test_update_expression_with_space_in_attribute_name():
"""
Does error message get correctly raised if attribute contains a space and is passed in without an AttributeName. And
lhs & rhs are not attribute IDs by themselves.
"""
dynamodb = create_simple_table_and_return_client()
dynamodb.put_item(
TableName="moto-test",
Item={"id": {"S": "1"}, "my Num": {"S": "1"}, "MyStr": {"S": "aaa"},},
)
try:
dynamodb.update_item(
TableName="moto-test",
Key={"id": {"S": "1"}},
UpdateExpression="SET MyStr = my Num",
)
assert False, "Validation exception not thrown"
except dynamodb.exceptions.ClientError as e:
assert_raise_syntax_error(e, "Num", "my Num")
@mock_dynamodb2
def test_summing_up_2_strings_raises_exception():
"""
Update set supports different DynamoDB types but some operations are not supported. For example summing up 2 strings
raises an exception. It results in ClientError with code ValidationException:
Saying An operand in the update expression has an incorrect data type
"""
dynamodb = create_simple_table_and_return_client()
try:
dynamodb.update_item(
TableName="moto-test",
Key={"id": {"S": "1"}},
UpdateExpression="SET MyStr = MyStr + MyStr",
)
assert False, "Validation exception not thrown"
except dynamodb.exceptions.ClientError as e:
assert_correct_client_error(
e,
"ValidationException",
"An operand in the update expression has an incorrect data type",
)
# https://github.com/spulec/moto/issues/2806
@mock_dynamodb2
def test_update_item_with_attribute_in_right_hand_side():
"""
After tokenization and building expression make sure referenced attributes are replaced with their current value
"""
dynamodb = create_simple_table_and_return_client()
# Make sure there are 2 values
dynamodb.put_item(
TableName="moto-test",
Item={"id": {"S": "1"}, "myVal1": {"S": "Value1"}, "myVal2": {"S": "Value2"}},
)
dynamodb.update_item(
TableName="moto-test",
Key={"id": {"S": "1"}},
UpdateExpression="SET myVal1 = myVal2",
)
result = dynamodb.get_item(TableName="moto-test", Key={"id": {"S": "1"}})
assert result["Item"]["myVal1"]["S"] == result["Item"]["myVal2"]["S"] == "Value2"
@mock_dynamodb2
def test_multiple_updates():
dynamodb = create_simple_table_and_return_client()
dynamodb.put_item(
TableName="moto-test",
Item={"id": {"S": "1"}, "myNum": {"N": "1"}, "path": {"N": "6"}},
)
dynamodb.update_item(
TableName="moto-test",
Key={"id": {"S": "1"}},
UpdateExpression="SET myNum = #p + :val, newAttr = myNum",
ExpressionAttributeValues={":val": {"N": "1"}},
ExpressionAttributeNames={"#p": "path"},
)
result = dynamodb.get_item(TableName="moto-test", Key={"id": {"S": "1"}})["Item"]
expected_result = {
"myNum": {"N": "7"},
"newAttr": {"N": "1"},
"path": {"N": "6"},
"id": {"S": "1"},
}
assert result == expected_result

View file

@ -0,0 +1,446 @@
from moto.dynamodb2.exceptions import IncorrectOperandType, IncorrectDataType
from moto.dynamodb2.models import Item, DynamoType
from moto.dynamodb2.parsing.executors import UpdateExpressionExecutor
from moto.dynamodb2.parsing.expressions import UpdateExpressionParser
from moto.dynamodb2.parsing.validators import UpdateExpressionValidator
from parameterized import parameterized
def test_execution_of_if_not_exists_not_existing_value():
update_expression = "SET a = if_not_exists(b, a)"
update_expression_ast = UpdateExpressionParser.make(update_expression)
item = Item(
hash_key=DynamoType({"S": "id"}),
hash_key_type="TYPE",
range_key=None,
range_key_type=None,
attrs={"id": {"S": "1"}, "a": {"S": "A"}},
)
validated_ast = UpdateExpressionValidator(
update_expression_ast,
expression_attribute_names=None,
expression_attribute_values=None,
item=item,
).validate()
UpdateExpressionExecutor(validated_ast, item, None).execute()
expected_item = Item(
hash_key=DynamoType({"S": "id"}),
hash_key_type="TYPE",
range_key=None,
range_key_type=None,
attrs={"id": {"S": "1"}, "a": {"S": "A"}},
)
assert expected_item == item
def test_execution_of_if_not_exists_with_existing_attribute_should_return_attribute():
update_expression = "SET a = if_not_exists(b, a)"
update_expression_ast = UpdateExpressionParser.make(update_expression)
item = Item(
hash_key=DynamoType({"S": "id"}),
hash_key_type="TYPE",
range_key=None,
range_key_type=None,
attrs={"id": {"S": "1"}, "a": {"S": "A"}, "b": {"S": "B"}},
)
validated_ast = UpdateExpressionValidator(
update_expression_ast,
expression_attribute_names=None,
expression_attribute_values=None,
item=item,
).validate()
UpdateExpressionExecutor(validated_ast, item, None).execute()
expected_item = Item(
hash_key=DynamoType({"S": "id"}),
hash_key_type="TYPE",
range_key=None,
range_key_type=None,
attrs={"id": {"S": "1"}, "a": {"S": "B"}, "b": {"S": "B"}},
)
assert expected_item == item
def test_execution_of_if_not_exists_with_existing_attribute_should_return_value():
update_expression = "SET a = if_not_exists(b, :val)"
update_expression_values = {":val": {"N": "4"}}
update_expression_ast = UpdateExpressionParser.make(update_expression)
item = Item(
hash_key=DynamoType({"S": "id"}),
hash_key_type="TYPE",
range_key=None,
range_key_type=None,
attrs={"id": {"S": "1"}, "b": {"N": "3"}},
)
validated_ast = UpdateExpressionValidator(
update_expression_ast,
expression_attribute_names=None,
expression_attribute_values=update_expression_values,
item=item,
).validate()
UpdateExpressionExecutor(validated_ast, item, None).execute()
expected_item = Item(
hash_key=DynamoType({"S": "id"}),
hash_key_type="TYPE",
range_key=None,
range_key_type=None,
attrs={"id": {"S": "1"}, "b": {"N": "3"}, "a": {"N": "3"}},
)
assert expected_item == item
def test_execution_of_if_not_exists_with_non_existing_attribute_should_return_value():
update_expression = "SET a = if_not_exists(b, :val)"
update_expression_values = {":val": {"N": "4"}}
update_expression_ast = UpdateExpressionParser.make(update_expression)
item = Item(
hash_key=DynamoType({"S": "id"}),
hash_key_type="TYPE",
range_key=None,
range_key_type=None,
attrs={"id": {"S": "1"}},
)
validated_ast = UpdateExpressionValidator(
update_expression_ast,
expression_attribute_names=None,
expression_attribute_values=update_expression_values,
item=item,
).validate()
UpdateExpressionExecutor(validated_ast, item, None).execute()
expected_item = Item(
hash_key=DynamoType({"S": "id"}),
hash_key_type="TYPE",
range_key=None,
range_key_type=None,
attrs={"id": {"S": "1"}, "a": {"N": "4"}},
)
assert expected_item == item
def test_execution_of_sum_operation():
update_expression = "SET a = a + b"
update_expression_ast = UpdateExpressionParser.make(update_expression)
item = Item(
hash_key=DynamoType({"S": "id"}),
hash_key_type="TYPE",
range_key=None,
range_key_type=None,
attrs={"id": {"S": "1"}, "a": {"N": "3"}, "b": {"N": "4"}},
)
validated_ast = UpdateExpressionValidator(
update_expression_ast,
expression_attribute_names=None,
expression_attribute_values=None,
item=item,
).validate()
UpdateExpressionExecutor(validated_ast, item, None).execute()
expected_item = Item(
hash_key=DynamoType({"S": "id"}),
hash_key_type="TYPE",
range_key=None,
range_key_type=None,
attrs={"id": {"S": "1"}, "a": {"N": "7"}, "b": {"N": "4"}},
)
assert expected_item == item
def test_execution_of_remove():
update_expression = "Remove a"
update_expression_ast = UpdateExpressionParser.make(update_expression)
item = Item(
hash_key=DynamoType({"S": "id"}),
hash_key_type="TYPE",
range_key=None,
range_key_type=None,
attrs={"id": {"S": "1"}, "a": {"N": "3"}, "b": {"N": "4"}},
)
validated_ast = UpdateExpressionValidator(
update_expression_ast,
expression_attribute_names=None,
expression_attribute_values=None,
item=item,
).validate()
UpdateExpressionExecutor(validated_ast, item, None).execute()
expected_item = Item(
hash_key=DynamoType({"S": "id"}),
hash_key_type="TYPE",
range_key=None,
range_key_type=None,
attrs={"id": {"S": "1"}, "b": {"N": "4"}},
)
assert expected_item == item
def test_execution_of_remove_in_map():
update_expression = "Remove itemmap.itemlist[1].foo11"
update_expression_ast = UpdateExpressionParser.make(update_expression)
item = Item(
hash_key=DynamoType({"S": "id"}),
hash_key_type="TYPE",
range_key=None,
range_key_type=None,
attrs={
"id": {"S": "foo2"},
"itemmap": {
"M": {
"itemlist": {
"L": [
{"M": {"foo00": {"S": "bar1"}, "foo01": {"S": "bar2"}}},
{"M": {"foo10": {"S": "bar1"}, "foo11": {"S": "bar2"}}},
]
}
}
},
},
)
validated_ast = UpdateExpressionValidator(
update_expression_ast,
expression_attribute_names=None,
expression_attribute_values=None,
item=item,
).validate()
UpdateExpressionExecutor(validated_ast, item, None).execute()
expected_item = Item(
hash_key=DynamoType({"S": "id"}),
hash_key_type="TYPE",
range_key=None,
range_key_type=None,
attrs={
"id": {"S": "foo2"},
"itemmap": {
"M": {
"itemlist": {
"L": [
{"M": {"foo00": {"S": "bar1"}, "foo01": {"S": "bar2"}}},
{"M": {"foo10": {"S": "bar1"},}},
]
}
}
},
},
)
assert expected_item == item
def test_execution_of_remove_in_list():
update_expression = "Remove itemmap.itemlist[1]"
update_expression_ast = UpdateExpressionParser.make(update_expression)
item = Item(
hash_key=DynamoType({"S": "id"}),
hash_key_type="TYPE",
range_key=None,
range_key_type=None,
attrs={
"id": {"S": "foo2"},
"itemmap": {
"M": {
"itemlist": {
"L": [
{"M": {"foo00": {"S": "bar1"}, "foo01": {"S": "bar2"}}},
{"M": {"foo10": {"S": "bar1"}, "foo11": {"S": "bar2"}}},
]
}
}
},
},
)
validated_ast = UpdateExpressionValidator(
update_expression_ast,
expression_attribute_names=None,
expression_attribute_values=None,
item=item,
).validate()
UpdateExpressionExecutor(validated_ast, item, None).execute()
expected_item = Item(
hash_key=DynamoType({"S": "id"}),
hash_key_type="TYPE",
range_key=None,
range_key_type=None,
attrs={
"id": {"S": "foo2"},
"itemmap": {
"M": {
"itemlist": {
"L": [{"M": {"foo00": {"S": "bar1"}, "foo01": {"S": "bar2"}}},]
}
}
},
},
)
assert expected_item == item
def test_execution_of_delete_element_from_set():
update_expression = "delete s :value"
update_expression_ast = UpdateExpressionParser.make(update_expression)
item = Item(
hash_key=DynamoType({"S": "id"}),
hash_key_type="TYPE",
range_key=None,
range_key_type=None,
attrs={"id": {"S": "foo2"}, "s": {"SS": ["value1", "value2", "value3"]},},
)
validated_ast = UpdateExpressionValidator(
update_expression_ast,
expression_attribute_names=None,
expression_attribute_values={":value": {"SS": ["value2", "value5"]}},
item=item,
).validate()
UpdateExpressionExecutor(validated_ast, item, None).execute()
expected_item = Item(
hash_key=DynamoType({"S": "id"}),
hash_key_type="TYPE",
range_key=None,
range_key_type=None,
attrs={"id": {"S": "foo2"}, "s": {"SS": ["value1", "value3"]},},
)
assert expected_item == item
def test_execution_of_add_number():
update_expression = "add s :value"
update_expression_ast = UpdateExpressionParser.make(update_expression)
item = Item(
hash_key=DynamoType({"S": "id"}),
hash_key_type="TYPE",
range_key=None,
range_key_type=None,
attrs={"id": {"S": "foo2"}, "s": {"N": "5"},},
)
validated_ast = UpdateExpressionValidator(
update_expression_ast,
expression_attribute_names=None,
expression_attribute_values={":value": {"N": "10"}},
item=item,
).validate()
UpdateExpressionExecutor(validated_ast, item, None).execute()
expected_item = Item(
hash_key=DynamoType({"S": "id"}),
hash_key_type="TYPE",
range_key=None,
range_key_type=None,
attrs={"id": {"S": "foo2"}, "s": {"N": "15"}},
)
assert expected_item == item
def test_execution_of_add_set_to_a_number():
update_expression = "add s :value"
update_expression_ast = UpdateExpressionParser.make(update_expression)
item = Item(
hash_key=DynamoType({"S": "id"}),
hash_key_type="TYPE",
range_key=None,
range_key_type=None,
attrs={"id": {"S": "foo2"}, "s": {"N": "5"},},
)
try:
validated_ast = UpdateExpressionValidator(
update_expression_ast,
expression_attribute_names=None,
expression_attribute_values={":value": {"SS": ["s1"]}},
item=item,
).validate()
UpdateExpressionExecutor(validated_ast, item, None).execute()
expected_item = Item(
hash_key=DynamoType({"S": "id"}),
hash_key_type="TYPE",
range_key=None,
range_key_type=None,
attrs={"id": {"S": "foo2"}, "s": {"N": "15"}},
)
assert expected_item == item
assert False
except IncorrectDataType:
assert True
def test_execution_of_add_to_a_set():
update_expression = "ADD s :value"
update_expression_ast = UpdateExpressionParser.make(update_expression)
item = Item(
hash_key=DynamoType({"S": "id"}),
hash_key_type="TYPE",
range_key=None,
range_key_type=None,
attrs={"id": {"S": "foo2"}, "s": {"SS": ["value1", "value2", "value3"]},},
)
validated_ast = UpdateExpressionValidator(
update_expression_ast,
expression_attribute_names=None,
expression_attribute_values={":value": {"SS": ["value2", "value5"]}},
item=item,
).validate()
UpdateExpressionExecutor(validated_ast, item, None).execute()
expected_item = Item(
hash_key=DynamoType({"S": "id"}),
hash_key_type="TYPE",
range_key=None,
range_key_type=None,
attrs={
"id": {"S": "foo2"},
"s": {"SS": ["value1", "value2", "value3", "value5"]},
},
)
assert expected_item == item
@parameterized(
[
({":value": {"S": "10"}}, "STRING",),
({":value": {"N": "10"}}, "NUMBER",),
({":value": {"B": "10"}}, "BINARY",),
({":value": {"BOOL": True}}, "BOOLEAN",),
({":value": {"NULL": True}}, "NULL",),
({":value": {"M": {"el0": {"S": "10"}}}}, "MAP",),
({":value": {"L": []}}, "LIST",),
]
)
def test_execution_of__delete_element_from_set_invalid_value(
expression_attribute_values, unexpected_data_type
):
"""A delete statement must use a value of type SS in order to delete elements from a set."""
update_expression = "delete s :value"
update_expression_ast = UpdateExpressionParser.make(update_expression)
item = Item(
hash_key=DynamoType({"S": "id"}),
hash_key_type="TYPE",
range_key=None,
range_key_type=None,
attrs={"id": {"S": "foo2"}, "s": {"SS": ["value1", "value2", "value3"]},},
)
try:
validated_ast = UpdateExpressionValidator(
update_expression_ast,
expression_attribute_names=None,
expression_attribute_values=expression_attribute_values,
item=item,
).validate()
UpdateExpressionExecutor(validated_ast, item, None).execute()
assert False, "Must raise exception"
except IncorrectOperandType as e:
assert e.operator_or_function == "operator: DELETE"
assert e.operand_type == unexpected_data_type
def test_execution_of_delete_element_from_a_string_attribute():
"""A delete statement must use a value of type SS in order to delete elements from a set."""
update_expression = "delete s :value"
update_expression_ast = UpdateExpressionParser.make(update_expression)
item = Item(
hash_key=DynamoType({"S": "id"}),
hash_key_type="TYPE",
range_key=None,
range_key_type=None,
attrs={"id": {"S": "foo2"}, "s": {"S": "5"},},
)
try:
validated_ast = UpdateExpressionValidator(
update_expression_ast,
expression_attribute_names=None,
expression_attribute_values={":value": {"SS": ["value2"]}},
item=item,
).validate()
UpdateExpressionExecutor(validated_ast, item, None).execute()
assert False, "Must raise exception"
except IncorrectDataType:
assert True

View file

@ -8,6 +8,8 @@ from boto3.dynamodb.conditions import Key
from botocore.exceptions import ClientError
import sure # noqa
from freezegun import freeze_time
from nose.tools import assert_raises
from moto import mock_dynamodb2, mock_dynamodb2_deprecated
from boto.exception import JSONResponseError
from tests.helpers import requires_boto_gte
@ -1273,6 +1275,15 @@ def test_update_item_with_expression():
)
def assert_failure_due_to_key_not_in_schema(func, **kwargs):
with assert_raises(ClientError) as ex:
func(**kwargs)
ex.exception.response["Error"]["Code"].should.equal("ValidationException")
ex.exception.response["Error"]["Message"].should.equal(
"The provided key element does not match the schema"
)
@mock_dynamodb2
def test_update_item_add_with_expression():
table = _create_table_with_range_key()
@ -1299,14 +1310,13 @@ def test_update_item_add_with_expression():
dict(table.get_item(Key=item_key)["Item"]).should.equal(current_item)
# Update item to add a string value to a non-existing set
# Should just create the set in the background
table.update_item(
# Should throw: 'The provided key element does not match the schema'
assert_failure_due_to_key_not_in_schema(
table.update_item,
Key=item_key,
UpdateExpression="ADD non_existing_str_set :v",
ExpressionAttributeValues={":v": {"item4"}},
)
current_item["non_existing_str_set"] = {"item4"}
dict(table.get_item(Key=item_key)["Item"]).should.equal(current_item)
# Update item to add a num value to a num set
table.update_item(
@ -1381,15 +1391,14 @@ def test_update_item_add_with_nested_sets():
dict(table.get_item(Key=item_key)["Item"]).should.equal(current_item)
# Update item to add a string value to a non-existing set
# Should just create the set in the background
table.update_item(
# Should raise
assert_failure_due_to_key_not_in_schema(
table.update_item,
Key=item_key,
UpdateExpression="ADD #ns.#ne :v",
ExpressionAttributeNames={"#ns": "nested", "#ne": "non_existing_str_set"},
ExpressionAttributeValues={":v": {"new_item"}},
)
current_item["nested"]["non_existing_str_set"] = {"new_item"}
dict(table.get_item(Key=item_key)["Item"]).should.equal(current_item)
@mock_dynamodb2

130
tests/test_eb/test_eb.py Normal file
View file

@ -0,0 +1,130 @@
import boto3
import sure # noqa
from botocore.exceptions import ClientError
from moto import mock_elasticbeanstalk
@mock_elasticbeanstalk
def test_create_application():
# Create Elastic Beanstalk Application
conn = boto3.client("elasticbeanstalk", region_name="us-east-1")
app = conn.create_application(ApplicationName="myapp",)
app["Application"]["ApplicationName"].should.equal("myapp")
@mock_elasticbeanstalk
def test_create_application_dup():
conn = boto3.client("elasticbeanstalk", region_name="us-east-1")
conn.create_application(ApplicationName="myapp",)
conn.create_application.when.called_with(ApplicationName="myapp",).should.throw(
ClientError
)
@mock_elasticbeanstalk
def test_describe_applications():
# Create Elastic Beanstalk Application
conn = boto3.client("elasticbeanstalk", region_name="us-east-1")
conn.create_application(ApplicationName="myapp",)
apps = conn.describe_applications()
len(apps["Applications"]).should.equal(1)
apps["Applications"][0]["ApplicationName"].should.equal("myapp")
@mock_elasticbeanstalk
def test_create_environment():
# Create Elastic Beanstalk Environment
conn = boto3.client("elasticbeanstalk", region_name="us-east-1")
app = conn.create_application(ApplicationName="myapp",)
env = conn.create_environment(ApplicationName="myapp", EnvironmentName="myenv",)
env["EnvironmentName"].should.equal("myenv")
@mock_elasticbeanstalk
def test_describe_environments():
# List Elastic Beanstalk Envs
conn = boto3.client("elasticbeanstalk", region_name="us-east-1")
conn.create_application(ApplicationName="myapp",)
conn.create_environment(
ApplicationName="myapp", EnvironmentName="myenv",
)
envs = conn.describe_environments()
envs = envs["Environments"]
len(envs).should.equal(1)
envs[0]["ApplicationName"].should.equal("myapp")
envs[0]["EnvironmentName"].should.equal("myenv")
def tags_dict_to_list(tag_dict):
tag_list = []
for key, value in tag_dict.items():
tag_list.append({"Key": key, "Value": value})
return tag_list
def tags_list_to_dict(tag_list):
tag_dict = {}
for tag in tag_list:
tag_dict[tag["Key"]] = tag["Value"]
return tag_dict
@mock_elasticbeanstalk
def test_create_environment_tags():
conn = boto3.client("elasticbeanstalk", region_name="us-east-1")
conn.create_application(ApplicationName="myapp",)
env_tags = {"initial key": "initial value"}
env = conn.create_environment(
ApplicationName="myapp",
EnvironmentName="myenv",
Tags=tags_dict_to_list(env_tags),
)
tags = conn.list_tags_for_resource(ResourceArn=env["EnvironmentArn"],)
tags["ResourceArn"].should.equal(env["EnvironmentArn"])
tags_list_to_dict(tags["ResourceTags"]).should.equal(env_tags)
@mock_elasticbeanstalk
def test_update_tags():
conn = boto3.client("elasticbeanstalk", region_name="us-east-1")
conn.create_application(ApplicationName="myapp",)
env_tags = {
"initial key": "initial value",
"to remove": "delete me",
"to update": "original",
}
env = conn.create_environment(
ApplicationName="myapp",
EnvironmentName="myenv",
Tags=tags_dict_to_list(env_tags),
)
extra_env_tags = {
"to update": "new",
"extra key": "extra value",
}
conn.update_tags_for_resource(
ResourceArn=env["EnvironmentArn"],
TagsToAdd=tags_dict_to_list(extra_env_tags),
TagsToRemove=["to remove"],
)
total_env_tags = env_tags.copy()
total_env_tags.update(extra_env_tags)
del total_env_tags["to remove"]
tags = conn.list_tags_for_resource(ResourceArn=env["EnvironmentArn"],)
tags["ResourceArn"].should.equal(env["EnvironmentArn"])
tags_list_to_dict(tags["ResourceTags"]).should.equal(total_env_tags)
@mock_elasticbeanstalk
def test_list_available_solution_stacks():
conn = boto3.client("elasticbeanstalk", region_name="us-east-1")
stacks = conn.list_available_solution_stacks()
len(stacks["SolutionStacks"]).should.be.greater_than(0)
len(stacks["SolutionStacks"]).should.be.equal(len(stacks["SolutionStackDetails"]))

View file

@ -9,6 +9,7 @@ from nose.tools import assert_raises
import base64
import datetime
import ipaddress
import json
import six
import boto
@ -18,7 +19,7 @@ from boto.exception import EC2ResponseError, EC2ResponseError
from freezegun import freeze_time
import sure # noqa
from moto import mock_ec2_deprecated, mock_ec2
from moto import mock_ec2_deprecated, mock_ec2, mock_cloudformation
from tests.helpers import requires_boto_gte
@ -1334,6 +1335,12 @@ def test_create_instance_ebs_optimized():
instance.load()
instance.ebs_optimized.should.be(False)
instance = ec2_resource.create_instances(
ImageId="ami-12345678", MaxCount=1, MinCount=1,
)[0]
instance.load()
instance.ebs_optimized.should.be(False)
@mock_ec2
def test_run_multiple_instances_in_same_command():
@ -1414,3 +1421,40 @@ def test_describe_instance_attribute():
invalid_instance_attribute=invalid_instance_attribute
)
ex.exception.response["Error"]["Message"].should.equal(message)
@mock_ec2
@mock_cloudformation
def test_volume_size_through_cloudformation():
ec2 = boto3.client("ec2", region_name="us-east-1")
cf = boto3.client("cloudformation", region_name="us-east-1")
volume_template = {
"AWSTemplateFormatVersion": "2010-09-09",
"Resources": {
"testInstance": {
"Type": "AWS::EC2::Instance",
"Properties": {
"ImageId": "ami-d3adb33f",
"KeyName": "dummy",
"InstanceType": "t2.micro",
"BlockDeviceMappings": [
{"DeviceName": "/dev/sda2", "Ebs": {"VolumeSize": "50"}}
],
"Tags": [
{"Key": "foo", "Value": "bar"},
{"Key": "blah", "Value": "baz"},
],
},
}
},
}
template_json = json.dumps(volume_template)
cf.create_stack(StackName="test_stack", TemplateBody=template_json)
instances = ec2.describe_instances()
volume = instances["Reservations"][0]["Instances"][0]["BlockDeviceMappings"][0][
"Ebs"
]
volumes = ec2.describe_volumes(VolumeIds=[volume["VolumeId"]])
volumes["Volumes"][0]["Size"].should.equal(50)

View file

@ -2218,6 +2218,29 @@ def test_boto3_deleted_versionings_list():
assert len(listed["Contents"]) == 1
@mock_s3
def test_boto3_delete_objects_for_specific_version_id():
client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
client.create_bucket(Bucket="blah")
client.put_bucket_versioning(
Bucket="blah", VersioningConfiguration={"Status": "Enabled"}
)
client.put_object(Bucket="blah", Key="test1", Body=b"test1a")
client.put_object(Bucket="blah", Key="test1", Body=b"test1b")
response = client.list_object_versions(Bucket="blah", Prefix="test1")
id_to_delete = [v["VersionId"] for v in response["Versions"] if v["IsLatest"]][0]
response = client.delete_objects(
Bucket="blah", Delete={"Objects": [{"Key": "test1", "VersionId": id_to_delete}]}
)
assert response["Deleted"] == [{"Key": "test1", "VersionId": id_to_delete}]
listed = client.list_objects_v2(Bucket="blah")
assert len(listed["Contents"]) == 1
@mock_s3
def test_boto3_delete_versioned_bucket():
client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
@ -3256,7 +3279,8 @@ def test_boto3_put_object_tagging_on_earliest_version():
# Older version has tags while the most recent does not
resp = s3.get_object_tagging(Bucket=bucket_name, Key=key, VersionId=first_object.id)
resp["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
resp["TagSet"].should.equal(
sorted_tagset = sorted(resp["TagSet"], key=lambda t: t["Key"])
sorted_tagset.should.equal(
[{"Key": "item1", "Value": "foo"}, {"Key": "item2", "Value": "bar"}]
)
@ -3334,7 +3358,8 @@ def test_boto3_put_object_tagging_on_both_version():
resp = s3.get_object_tagging(Bucket=bucket_name, Key=key, VersionId=first_object.id)
resp["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
resp["TagSet"].should.equal(
sorted_tagset = sorted(resp["TagSet"], key=lambda t: t["Key"])
sorted_tagset.should.equal(
[{"Key": "item1", "Value": "foo"}, {"Key": "item2", "Value": "bar"}]
)
@ -3342,7 +3367,8 @@ def test_boto3_put_object_tagging_on_both_version():
Bucket=bucket_name, Key=key, VersionId=second_object.id
)
resp["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
resp["TagSet"].should.equal(
sorted_tagset = sorted(resp["TagSet"], key=lambda t: t["Key"])
sorted_tagset.should.equal(
[{"Key": "item1", "Value": "baz"}, {"Key": "item2", "Value": "bin"}]
)
@ -3744,6 +3770,28 @@ def test_root_dir_with_empty_name_works():
store_and_read_back_a_key("/")
@parameterized(["mybucket", "my.bucket"])
@mock_s3
def test_leading_slashes_not_removed(bucket_name):
"""Make sure that leading slashes are not removed internally."""
s3 = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
s3.create_bucket(Bucket=bucket_name)
uploaded_key = "/key"
invalid_key_1 = "key"
invalid_key_2 = "//key"
s3.put_object(Bucket=bucket_name, Key=uploaded_key, Body=b"Some body")
with assert_raises(ClientError) as e:
s3.get_object(Bucket=bucket_name, Key=invalid_key_1)
e.exception.response["Error"]["Code"].should.equal("NoSuchKey")
with assert_raises(ClientError) as e:
s3.get_object(Bucket=bucket_name, Key=invalid_key_2)
e.exception.response["Error"]["Code"].should.equal("NoSuchKey")
@parameterized(
[("foo/bar/baz",), ("foo",), ("foo/run_dt%3D2019-01-01%252012%253A30%253A00",)]
)
@ -4293,24 +4341,17 @@ def test_s3_config_dict():
FakeAcl,
FakeGrant,
FakeGrantee,
FakeTag,
FakeTagging,
FakeTagSet,
OWNER,
)
# Without any buckets:
assert not s3_config_query.get_config_resource("some_bucket")
tags = FakeTagging(
FakeTagSet(
[FakeTag("someTag", "someValue"), FakeTag("someOtherTag", "someOtherValue")]
)
)
tags = {"someTag": "someValue", "someOtherTag": "someOtherValue"}
# With 1 bucket in us-west-2:
s3_config_query.backends["global"].create_bucket("bucket1", "us-west-2")
s3_config_query.backends["global"].put_bucket_tagging("bucket1", tags)
s3_config_query.backends["global"].put_bucket_tags("bucket1", tags)
# With a log bucket:
s3_config_query.backends["global"].create_bucket("logbucket", "us-west-2")

View file

@ -137,6 +137,45 @@ def test_create_secret_with_tags():
]
@mock_secretsmanager
def test_create_secret_with_description():
conn = boto3.client("secretsmanager", region_name="us-east-1")
secret_name = "test-secret-with-tags"
result = conn.create_secret(
Name=secret_name, SecretString="foosecret", Description="desc"
)
assert result["ARN"]
assert result["Name"] == secret_name
secret_value = conn.get_secret_value(SecretId=secret_name)
assert secret_value["SecretString"] == "foosecret"
secret_details = conn.describe_secret(SecretId=secret_name)
assert secret_details["Description"] == "desc"
@mock_secretsmanager
def test_create_secret_with_tags_and_description():
conn = boto3.client("secretsmanager", region_name="us-east-1")
secret_name = "test-secret-with-tags"
result = conn.create_secret(
Name=secret_name,
SecretString="foosecret",
Description="desc",
Tags=[{"Key": "Foo", "Value": "Bar"}, {"Key": "Mykey", "Value": "Myvalue"}],
)
assert result["ARN"]
assert result["Name"] == secret_name
secret_value = conn.get_secret_value(SecretId=secret_name)
assert secret_value["SecretString"] == "foosecret"
secret_details = conn.describe_secret(SecretId=secret_name)
assert secret_details["Tags"] == [
{"Key": "Foo", "Value": "Bar"},
{"Key": "Mykey", "Value": "Myvalue"},
]
assert secret_details["Description"] == "desc"
@mock_secretsmanager
def test_delete_secret():
conn = boto3.client("secretsmanager", region_name="us-west-2")
@ -690,6 +729,31 @@ def test_put_secret_value_versions_differ_if_same_secret_put_twice():
assert first_version_id != second_version_id
@mock_secretsmanager
def test_put_secret_value_maintains_description_and_tags():
conn = boto3.client("secretsmanager", region_name="us-west-2")
conn.create_secret(
Name=DEFAULT_SECRET_NAME,
SecretString="foosecret",
Description="desc",
Tags=[{"Key": "Foo", "Value": "Bar"}, {"Key": "Mykey", "Value": "Myvalue"}],
)
conn = boto3.client("secretsmanager", region_name="us-west-2")
conn.put_secret_value(
SecretId=DEFAULT_SECRET_NAME,
SecretString="dupe_secret",
VersionStages=["AWSCURRENT"],
)
secret_details = conn.describe_secret(SecretId=DEFAULT_SECRET_NAME)
assert secret_details["Tags"] == [
{"Key": "Foo", "Value": "Bar"},
{"Key": "Mykey", "Value": "Myvalue"},
]
assert secret_details["Description"] == "desc"
@mock_secretsmanager
def test_can_list_secret_version_ids():
conn = boto3.client("secretsmanager", region_name="us-west-2")
@ -739,6 +803,43 @@ def test_update_secret():
assert created_secret["VersionId"] != updated_secret["VersionId"]
@mock_secretsmanager
def test_update_secret_with_tags_and_description():
conn = boto3.client("secretsmanager", region_name="us-west-2")
created_secret = conn.create_secret(
Name="test-secret",
SecretString="foosecret",
Description="desc",
Tags=[{"Key": "Foo", "Value": "Bar"}, {"Key": "Mykey", "Value": "Myvalue"}],
)
assert created_secret["ARN"]
assert created_secret["Name"] == "test-secret"
assert created_secret["VersionId"] != ""
secret = conn.get_secret_value(SecretId="test-secret")
assert secret["SecretString"] == "foosecret"
updated_secret = conn.update_secret(
SecretId="test-secret", SecretString="barsecret"
)
assert updated_secret["ARN"]
assert updated_secret["Name"] == "test-secret"
assert updated_secret["VersionId"] != ""
secret = conn.get_secret_value(SecretId="test-secret")
assert secret["SecretString"] == "barsecret"
assert created_secret["VersionId"] != updated_secret["VersionId"]
secret_details = conn.describe_secret(SecretId="test-secret")
assert secret_details["Tags"] == [
{"Key": "Foo", "Value": "Bar"},
{"Key": "Mykey", "Value": "Myvalue"},
]
assert secret_details["Description"] == "desc"
@mock_secretsmanager
def test_update_secret_which_does_not_exit():
conn = boto3.client("secretsmanager", region_name="us-west-2")

View file

@ -1,4 +1,5 @@
from __future__ import unicode_literals
from base64 import b64encode
import json
import boto
@ -103,6 +104,128 @@ def test_assume_role():
)
@freeze_time("2012-01-01 12:00:00")
@mock_sts
def test_assume_role_with_saml():
client = boto3.client("sts", region_name="us-east-1")
session_name = "session-name"
policy = json.dumps(
{
"Statement": [
{
"Sid": "Stmt13690092345534",
"Action": ["S3:ListBucket"],
"Effect": "Allow",
"Resource": ["arn:aws:s3:::foobar-tester"],
}
]
}
)
role_name = "test-role"
provider_name = "TestProvFed"
user_name = "testuser"
role_input = "arn:aws:iam::{account_id}:role/{role_name}".format(
account_id=ACCOUNT_ID, role_name=role_name
)
principal_role = "arn:aws:iam:{account_id}:saml-provider/{provider_name}".format(
account_id=ACCOUNT_ID, provider_name=provider_name
)
saml_assertion = """
<?xml version="1.0"?>
<samlp:Response xmlns:samlp="urn:oasis:names:tc:SAML:2.0:protocol" ID="_00000000-0000-0000-0000-000000000000" Version="2.0" IssueInstant="2012-01-01T12:00:00.000Z" Destination="https://signin.aws.amazon.com/saml" Consent="urn:oasis:names:tc:SAML:2.0:consent:unspecified">
<Issuer xmlns="urn:oasis:names:tc:SAML:2.0:assertion">http://localhost/</Issuer>
<samlp:Status>
<samlp:StatusCode Value="urn:oasis:names:tc:SAML:2.0:status:Success"/>
</samlp:Status>
<Assertion xmlns="urn:oasis:names:tc:SAML:2.0:assertion" ID="_00000000-0000-0000-0000-000000000000" IssueInstant="2012-12-01T12:00:00.000Z" Version="2.0">
<Issuer>http://localhost:3000/</Issuer>
<ds:Signature xmlns:ds="http://www.w3.org/2000/09/xmldsig#">
<ds:SignedInfo>
<ds:CanonicalizationMethod Algorithm="http://www.w3.org/2001/10/xml-exc-c14n#"/>
<ds:SignatureMethod Algorithm="http://www.w3.org/2001/04/xmldsig-more#rsa-sha256"/>
<ds:Reference URI="#_00000000-0000-0000-0000-000000000000">
<ds:Transforms>
<ds:Transform Algorithm="http://www.w3.org/2000/09/xmldsig#enveloped-signature"/>
<ds:Transform Algorithm="http://www.w3.org/2001/10/xml-exc-c14n#"/>
</ds:Transforms>
<ds:DigestMethod Algorithm="http://www.w3.org/2001/04/xmlenc#sha256"/>
<ds:DigestValue>NTIyMzk0ZGI4MjI0ZjI5ZGNhYjkyOGQyZGQ1NTZjODViZjk5YTY4ODFjOWRjNjkyYzZmODY2ZDQ4NjlkZjY3YSAgLQo=</ds:DigestValue>
</ds:Reference>
</ds:SignedInfo>
<ds:SignatureValue>NTIyMzk0ZGI4MjI0ZjI5ZGNhYjkyOGQyZGQ1NTZjODViZjk5YTY4ODFjOWRjNjkyYzZmODY2ZDQ4NjlkZjY3YSAgLQo=</ds:SignatureValue>
<KeyInfo xmlns="http://www.w3.org/2000/09/xmldsig#">
<ds:X509Data>
<ds:X509Certificate>NTIyMzk0ZGI4MjI0ZjI5ZGNhYjkyOGQyZGQ1NTZjODViZjk5YTY4ODFjOWRjNjkyYzZmODY2ZDQ4NjlkZjY3YSAgLQo=</ds:X509Certificate>
</ds:X509Data>
</KeyInfo>
</ds:Signature>
<Subject>
<NameID Format="urn:oasis:names:tc:SAML:2.0:nameid-format:persistent">{username}</NameID>
<SubjectConfirmation Method="urn:oasis:names:tc:SAML:2.0:cm:bearer">
<SubjectConfirmationData NotOnOrAfter="2012-01-01T13:00:00.000Z" Recipient="https://signin.aws.amazon.com/saml"/>
</SubjectConfirmation>
</Subject>
<Conditions NotBefore="2012-01-01T12:00:00.000Z" NotOnOrAfter="2012-01-01T13:00:00.000Z">
<AudienceRestriction>
<Audience>urn:amazon:webservices</Audience>
</AudienceRestriction>
</Conditions>
<AttributeStatement>
<Attribute Name="https://aws.amazon.com/SAML/Attributes/RoleSessionName">
<AttributeValue>{username}@localhost</AttributeValue>
</Attribute>
<Attribute Name="https://aws.amazon.com/SAML/Attributes/Role">
<AttributeValue>arn:aws:iam::{account_id}:saml-provider/{provider_name},arn:aws:iam::{account_id}:role/{role_name}</AttributeValue>
</Attribute>
<Attribute Name="https://aws.amazon.com/SAML/Attributes/SessionDuration">
<AttributeValue>900</AttributeValue>
</Attribute>
</AttributeStatement>
<AuthnStatement AuthnInstant="2012-01-01T12:00:00.000Z" SessionIndex="_00000000-0000-0000-0000-000000000000">
<AuthnContext>
<AuthnContextClassRef>urn:oasis:names:tc:SAML:2.0:ac:classes:PasswordProtectedTransport</AuthnContextClassRef>
</AuthnContext>
</AuthnStatement>
</Assertion>
</samlp:Response>""".format(
account_id=ACCOUNT_ID,
role_name=role_name,
provider_name=provider_name,
username=user_name,
).replace(
"\n", ""
)
assume_role_response = client.assume_role_with_saml(
RoleArn=role_input,
PrincipalArn=principal_role,
SAMLAssertion=b64encode(saml_assertion.encode("utf-8")).decode("utf-8"),
)
credentials = assume_role_response["Credentials"]
if not settings.TEST_SERVER_MODE:
credentials["Expiration"].isoformat().should.equal("2012-01-01T12:15:00+00:00")
credentials["SessionToken"].should.have.length_of(356)
assert credentials["SessionToken"].startswith("FQoGZXIvYXdzE")
credentials["AccessKeyId"].should.have.length_of(20)
assert credentials["AccessKeyId"].startswith("ASIA")
credentials["SecretAccessKey"].should.have.length_of(40)
assume_role_response["AssumedRoleUser"]["Arn"].should.equal(
"arn:aws:sts::{account_id}:assumed-role/{role_name}/{fed_name}@localhost".format(
account_id=ACCOUNT_ID, role_name=role_name, fed_name=user_name
)
)
assert assume_role_response["AssumedRoleUser"]["AssumedRoleId"].startswith("AROA")
assert assume_role_response["AssumedRoleUser"]["AssumedRoleId"].endswith(
":{fed_name}@localhost".format(fed_name=user_name)
)
assume_role_response["AssumedRoleUser"]["AssumedRoleId"].should.have.length_of(
21 + 1 + len("{fed_name}@localhost".format(fed_name=user_name))
)
@freeze_time("2012-01-01 12:00:00")
@mock_sts_deprecated
def test_assume_role_with_web_identity():

View file

@ -77,3 +77,34 @@ def test_extract_tag_names():
expected = ["key1", "key2"]
expected.should.be.equal(actual)
def test_copy_non_existing_arn():
svc = TaggingService()
tags = [{"Key": "key1", "Value": "value1"}, {"Key": "key2", "Value": "value2"}]
svc.tag_resource("new_arn", tags)
#
svc.copy_tags("non_existing_arn", "new_arn")
# Copying from a non-existing ARN should a NOOP
# Assert the old tags still exist
actual = sorted(
svc.list_tags_for_resource("new_arn")["Tags"], key=lambda t: t["Key"]
)
actual.should.equal(tags)
def test_copy_existing_arn():
svc = TaggingService()
tags_old_arn = [{"Key": "key1", "Value": "value1"}]
tags_new_arn = [{"Key": "key2", "Value": "value2"}]
svc.tag_resource("old_arn", tags_old_arn)
svc.tag_resource("new_arn", tags_new_arn)
#
svc.copy_tags("old_arn", "new_arn")
# Assert the old tags still exist
actual = sorted(
svc.list_tags_for_resource("new_arn")["Tags"], key=lambda t: t["Key"]
)
actual.should.equal(
[{"Key": "key1", "Value": "value1"}, {"Key": "key2", "Value": "value2"}]
)