Merge branch 'master' into master
This commit is contained in:
commit
39c103ec0b
51 changed files with 2538 additions and 249 deletions
|
|
@ -391,6 +391,9 @@ def test_create_change_set_from_s3_url():
|
|||
TemplateURL=key_url,
|
||||
ChangeSetName='NewChangeSet',
|
||||
ChangeSetType='CREATE',
|
||||
Tags=[
|
||||
{'Key': 'tag-key', 'Value': 'tag-value'}
|
||||
],
|
||||
)
|
||||
assert 'arn:aws:cloudformation:us-west-1:123456789:changeSet/NewChangeSet/' in response['Id']
|
||||
assert 'arn:aws:cloudformation:us-east-1:123456789:stack/NewStack' in response['StackId']
|
||||
|
|
|
|||
115
tests/test_cloudformation/test_validate.py
Normal file
115
tests/test_cloudformation/test_validate.py
Normal file
|
|
@ -0,0 +1,115 @@
|
|||
from collections import OrderedDict
|
||||
import json
|
||||
import yaml
|
||||
import os
|
||||
import boto3
|
||||
from nose.tools import raises
|
||||
import botocore
|
||||
|
||||
|
||||
from moto.cloudformation.exceptions import ValidationError
|
||||
from moto.cloudformation.models import FakeStack
|
||||
from moto.cloudformation.parsing import resource_class_from_type, parse_condition, Export
|
||||
from moto.sqs.models import Queue
|
||||
from moto.s3.models import FakeBucket
|
||||
from moto.cloudformation.utils import yaml_tag_constructor
|
||||
from boto.cloudformation.stack import Output
|
||||
from moto import mock_cloudformation, mock_s3, mock_sqs, mock_ec2
|
||||
|
||||
json_template = {
|
||||
"AWSTemplateFormatVersion": "2010-09-09",
|
||||
"Description": "Stack 1",
|
||||
"Resources": {
|
||||
"EC2Instance1": {
|
||||
"Type": "AWS::EC2::Instance",
|
||||
"Properties": {
|
||||
"ImageId": "ami-d3adb33f",
|
||||
"KeyName": "dummy",
|
||||
"InstanceType": "t2.micro",
|
||||
"Tags": [
|
||||
{
|
||||
"Key": "Description",
|
||||
"Value": "Test tag"
|
||||
},
|
||||
{
|
||||
"Key": "Name",
|
||||
"Value": "Name tag for tests"
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
# One resource is required
|
||||
json_bad_template = {
|
||||
"AWSTemplateFormatVersion": "2010-09-09",
|
||||
"Description": "Stack 1"
|
||||
}
|
||||
|
||||
dummy_template_json = json.dumps(json_template)
|
||||
dummy_bad_template_json = json.dumps(json_bad_template)
|
||||
|
||||
|
||||
@mock_cloudformation
|
||||
def test_boto3_json_validate_successful():
|
||||
cf_conn = boto3.client('cloudformation', region_name='us-east-1')
|
||||
response = cf_conn.validate_template(
|
||||
TemplateBody=dummy_template_json,
|
||||
)
|
||||
assert response['Description'] == "Stack 1"
|
||||
assert response['Parameters'] == []
|
||||
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
|
||||
|
||||
@mock_cloudformation
|
||||
def test_boto3_json_invalid_missing_resource():
|
||||
cf_conn = boto3.client('cloudformation', region_name='us-east-1')
|
||||
try:
|
||||
cf_conn.validate_template(
|
||||
TemplateBody=dummy_bad_template_json,
|
||||
)
|
||||
assert False
|
||||
except botocore.exceptions.ClientError as e:
|
||||
assert str(e) == 'An error occurred (ValidationError) when calling the ValidateTemplate operation: Stack' \
|
||||
' with id Missing top level item Resources to file module does not exist'
|
||||
assert True
|
||||
|
||||
|
||||
yaml_template = """
|
||||
AWSTemplateFormatVersion: '2010-09-09'
|
||||
Description: Simple CloudFormation Test Template
|
||||
Resources:
|
||||
S3Bucket:
|
||||
Type: AWS::S3::Bucket
|
||||
Properties:
|
||||
AccessControl: PublicRead
|
||||
BucketName: cf-test-bucket-1
|
||||
"""
|
||||
|
||||
yaml_bad_template = """
|
||||
AWSTemplateFormatVersion: '2010-09-09'
|
||||
Description: Simple CloudFormation Test Template
|
||||
"""
|
||||
|
||||
@mock_cloudformation
|
||||
def test_boto3_yaml_validate_successful():
|
||||
cf_conn = boto3.client('cloudformation', region_name='us-east-1')
|
||||
response = cf_conn.validate_template(
|
||||
TemplateBody=yaml_template,
|
||||
)
|
||||
assert response['Description'] == "Simple CloudFormation Test Template"
|
||||
assert response['Parameters'] == []
|
||||
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
|
||||
|
||||
@mock_cloudformation
|
||||
def test_boto3_yaml_invalid_missing_resource():
|
||||
cf_conn = boto3.client('cloudformation', region_name='us-east-1')
|
||||
try:
|
||||
cf_conn.validate_template(
|
||||
TemplateBody=yaml_bad_template,
|
||||
)
|
||||
assert False
|
||||
except botocore.exceptions.ClientError as e:
|
||||
assert str(e) == 'An error occurred (ValidationError) when calling the ValidateTemplate operation: Stack' \
|
||||
' with id Missing top level item Resources to file module does not exist'
|
||||
assert True
|
||||
|
|
@ -1,14 +1,18 @@
|
|||
from __future__ import unicode_literals
|
||||
|
||||
import boto3
|
||||
import json
|
||||
import os
|
||||
import random
|
||||
import uuid
|
||||
|
||||
import boto3
|
||||
# noinspection PyUnresolvedReferences
|
||||
import sure # noqa
|
||||
from botocore.exceptions import ClientError
|
||||
from jose import jws
|
||||
from nose.tools import assert_raises
|
||||
|
||||
from moto import mock_cognitoidp
|
||||
import sure # noqa
|
||||
|
||||
|
||||
@mock_cognitoidp
|
||||
|
|
@ -41,6 +45,56 @@ def test_list_user_pools():
|
|||
result["UserPools"][0]["Name"].should.equal(name)
|
||||
|
||||
|
||||
@mock_cognitoidp
|
||||
def test_list_user_pools_returns_max_items():
|
||||
conn = boto3.client("cognito-idp", "us-west-2")
|
||||
|
||||
# Given 10 user pools
|
||||
pool_count = 10
|
||||
for i in range(pool_count):
|
||||
conn.create_user_pool(PoolName=str(uuid.uuid4()))
|
||||
|
||||
max_results = 5
|
||||
result = conn.list_user_pools(MaxResults=max_results)
|
||||
result["UserPools"].should.have.length_of(max_results)
|
||||
result.should.have.key("NextToken")
|
||||
|
||||
|
||||
@mock_cognitoidp
|
||||
def test_list_user_pools_returns_next_tokens():
|
||||
conn = boto3.client("cognito-idp", "us-west-2")
|
||||
|
||||
# Given 10 user pool clients
|
||||
pool_count = 10
|
||||
for i in range(pool_count):
|
||||
conn.create_user_pool(PoolName=str(uuid.uuid4()))
|
||||
|
||||
max_results = 5
|
||||
result = conn.list_user_pools(MaxResults=max_results)
|
||||
result["UserPools"].should.have.length_of(max_results)
|
||||
result.should.have.key("NextToken")
|
||||
|
||||
next_token = result["NextToken"]
|
||||
result_2 = conn.list_user_pools(MaxResults=max_results, NextToken=next_token)
|
||||
result_2["UserPools"].should.have.length_of(max_results)
|
||||
result_2.shouldnt.have.key("NextToken")
|
||||
|
||||
|
||||
@mock_cognitoidp
|
||||
def test_list_user_pools_when_max_items_more_than_total_items():
|
||||
conn = boto3.client("cognito-idp", "us-west-2")
|
||||
|
||||
# Given 10 user pool clients
|
||||
pool_count = 10
|
||||
for i in range(pool_count):
|
||||
conn.create_user_pool(PoolName=str(uuid.uuid4()))
|
||||
|
||||
max_results = pool_count + 5
|
||||
result = conn.list_user_pools(MaxResults=max_results)
|
||||
result["UserPools"].should.have.length_of(pool_count)
|
||||
result.shouldnt.have.key("NextToken")
|
||||
|
||||
|
||||
@mock_cognitoidp
|
||||
def test_describe_user_pool():
|
||||
conn = boto3.client("cognito-idp", "us-west-2")
|
||||
|
|
@ -140,6 +194,67 @@ def test_list_user_pool_clients():
|
|||
result["UserPoolClients"][0]["ClientName"].should.equal(client_name)
|
||||
|
||||
|
||||
@mock_cognitoidp
|
||||
def test_list_user_pool_clients_returns_max_items():
|
||||
conn = boto3.client("cognito-idp", "us-west-2")
|
||||
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
|
||||
|
||||
# Given 10 user pool clients
|
||||
client_count = 10
|
||||
for i in range(client_count):
|
||||
client_name = str(uuid.uuid4())
|
||||
conn.create_user_pool_client(UserPoolId=user_pool_id,
|
||||
ClientName=client_name)
|
||||
max_results = 5
|
||||
result = conn.list_user_pool_clients(UserPoolId=user_pool_id,
|
||||
MaxResults=max_results)
|
||||
result["UserPoolClients"].should.have.length_of(max_results)
|
||||
result.should.have.key("NextToken")
|
||||
|
||||
|
||||
@mock_cognitoidp
|
||||
def test_list_user_pool_clients_returns_next_tokens():
|
||||
conn = boto3.client("cognito-idp", "us-west-2")
|
||||
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
|
||||
|
||||
# Given 10 user pool clients
|
||||
client_count = 10
|
||||
for i in range(client_count):
|
||||
client_name = str(uuid.uuid4())
|
||||
conn.create_user_pool_client(UserPoolId=user_pool_id,
|
||||
ClientName=client_name)
|
||||
max_results = 5
|
||||
result = conn.list_user_pool_clients(UserPoolId=user_pool_id,
|
||||
MaxResults=max_results)
|
||||
result["UserPoolClients"].should.have.length_of(max_results)
|
||||
result.should.have.key("NextToken")
|
||||
|
||||
next_token = result["NextToken"]
|
||||
result_2 = conn.list_user_pool_clients(UserPoolId=user_pool_id,
|
||||
MaxResults=max_results,
|
||||
NextToken=next_token)
|
||||
result_2["UserPoolClients"].should.have.length_of(max_results)
|
||||
result_2.shouldnt.have.key("NextToken")
|
||||
|
||||
|
||||
@mock_cognitoidp
|
||||
def test_list_user_pool_clients_when_max_items_more_than_total_items():
|
||||
conn = boto3.client("cognito-idp", "us-west-2")
|
||||
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
|
||||
|
||||
# Given 10 user pool clients
|
||||
client_count = 10
|
||||
for i in range(client_count):
|
||||
client_name = str(uuid.uuid4())
|
||||
conn.create_user_pool_client(UserPoolId=user_pool_id,
|
||||
ClientName=client_name)
|
||||
max_results = client_count + 5
|
||||
result = conn.list_user_pool_clients(UserPoolId=user_pool_id,
|
||||
MaxResults=max_results)
|
||||
result["UserPoolClients"].should.have.length_of(client_count)
|
||||
result.shouldnt.have.key("NextToken")
|
||||
|
||||
|
||||
@mock_cognitoidp
|
||||
def test_describe_user_pool_client():
|
||||
conn = boto3.client("cognito-idp", "us-west-2")
|
||||
|
|
@ -264,6 +379,83 @@ def test_list_identity_providers():
|
|||
result["Providers"][0]["ProviderType"].should.equal(provider_type)
|
||||
|
||||
|
||||
@mock_cognitoidp
|
||||
def test_list_identity_providers_returns_max_items():
|
||||
conn = boto3.client("cognito-idp", "us-west-2")
|
||||
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
|
||||
|
||||
# Given 10 identity providers linked to a user pool
|
||||
identity_provider_count = 10
|
||||
for i in range(identity_provider_count):
|
||||
provider_name = str(uuid.uuid4())
|
||||
provider_type = "Facebook"
|
||||
conn.create_identity_provider(
|
||||
UserPoolId=user_pool_id,
|
||||
ProviderName=provider_name,
|
||||
ProviderType=provider_type,
|
||||
ProviderDetails={},
|
||||
)
|
||||
|
||||
max_results = 5
|
||||
result = conn.list_identity_providers(UserPoolId=user_pool_id,
|
||||
MaxResults=max_results)
|
||||
result["Providers"].should.have.length_of(max_results)
|
||||
result.should.have.key("NextToken")
|
||||
|
||||
|
||||
@mock_cognitoidp
|
||||
def test_list_identity_providers_returns_next_tokens():
|
||||
conn = boto3.client("cognito-idp", "us-west-2")
|
||||
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
|
||||
|
||||
# Given 10 identity providers linked to a user pool
|
||||
identity_provider_count = 10
|
||||
for i in range(identity_provider_count):
|
||||
provider_name = str(uuid.uuid4())
|
||||
provider_type = "Facebook"
|
||||
conn.create_identity_provider(
|
||||
UserPoolId=user_pool_id,
|
||||
ProviderName=provider_name,
|
||||
ProviderType=provider_type,
|
||||
ProviderDetails={},
|
||||
)
|
||||
|
||||
max_results = 5
|
||||
result = conn.list_identity_providers(UserPoolId=user_pool_id, MaxResults=max_results)
|
||||
result["Providers"].should.have.length_of(max_results)
|
||||
result.should.have.key("NextToken")
|
||||
|
||||
next_token = result["NextToken"]
|
||||
result_2 = conn.list_identity_providers(UserPoolId=user_pool_id,
|
||||
MaxResults=max_results,
|
||||
NextToken=next_token)
|
||||
result_2["Providers"].should.have.length_of(max_results)
|
||||
result_2.shouldnt.have.key("NextToken")
|
||||
|
||||
|
||||
@mock_cognitoidp
|
||||
def test_list_identity_providers_when_max_items_more_than_total_items():
|
||||
conn = boto3.client("cognito-idp", "us-west-2")
|
||||
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
|
||||
|
||||
# Given 10 identity providers linked to a user pool
|
||||
identity_provider_count = 10
|
||||
for i in range(identity_provider_count):
|
||||
provider_name = str(uuid.uuid4())
|
||||
provider_type = "Facebook"
|
||||
conn.create_identity_provider(
|
||||
UserPoolId=user_pool_id,
|
||||
ProviderName=provider_name,
|
||||
ProviderType=provider_type,
|
||||
ProviderDetails={},
|
||||
)
|
||||
|
||||
max_results = identity_provider_count + 5
|
||||
result = conn.list_identity_providers(UserPoolId=user_pool_id, MaxResults=max_results)
|
||||
result["Providers"].should.have.length_of(identity_provider_count)
|
||||
result.shouldnt.have.key("NextToken")
|
||||
|
||||
|
||||
@mock_cognitoidp
|
||||
def test_describe_identity_providers():
|
||||
conn = boto3.client("cognito-idp", "us-west-2")
|
||||
|
|
@ -323,6 +515,245 @@ def test_delete_identity_providers():
|
|||
caught.should.be.true
|
||||
|
||||
|
||||
@mock_cognitoidp
|
||||
def test_create_group():
|
||||
conn = boto3.client("cognito-idp", "us-west-2")
|
||||
|
||||
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
|
||||
group_name = str(uuid.uuid4())
|
||||
description = str(uuid.uuid4())
|
||||
role_arn = "arn:aws:iam:::role/my-iam-role"
|
||||
precedence = random.randint(0, 100000)
|
||||
|
||||
result = conn.create_group(
|
||||
GroupName=group_name,
|
||||
UserPoolId=user_pool_id,
|
||||
Description=description,
|
||||
RoleArn=role_arn,
|
||||
Precedence=precedence,
|
||||
)
|
||||
|
||||
result["Group"]["GroupName"].should.equal(group_name)
|
||||
result["Group"]["UserPoolId"].should.equal(user_pool_id)
|
||||
result["Group"]["Description"].should.equal(description)
|
||||
result["Group"]["RoleArn"].should.equal(role_arn)
|
||||
result["Group"]["Precedence"].should.equal(precedence)
|
||||
result["Group"]["LastModifiedDate"].should.be.a("datetime.datetime")
|
||||
result["Group"]["CreationDate"].should.be.a("datetime.datetime")
|
||||
|
||||
|
||||
@mock_cognitoidp
|
||||
def test_create_group_with_duplicate_name_raises_error():
|
||||
conn = boto3.client("cognito-idp", "us-west-2")
|
||||
|
||||
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
|
||||
group_name = str(uuid.uuid4())
|
||||
|
||||
conn.create_group(GroupName=group_name, UserPoolId=user_pool_id)
|
||||
|
||||
with assert_raises(ClientError) as cm:
|
||||
conn.create_group(GroupName=group_name, UserPoolId=user_pool_id)
|
||||
cm.exception.operation_name.should.equal('CreateGroup')
|
||||
cm.exception.response['Error']['Code'].should.equal('GroupExistsException')
|
||||
cm.exception.response['ResponseMetadata']['HTTPStatusCode'].should.equal(400)
|
||||
|
||||
|
||||
@mock_cognitoidp
|
||||
def test_get_group():
|
||||
conn = boto3.client("cognito-idp", "us-west-2")
|
||||
|
||||
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
|
||||
group_name = str(uuid.uuid4())
|
||||
conn.create_group(GroupName=group_name, UserPoolId=user_pool_id)
|
||||
|
||||
result = conn.get_group(GroupName=group_name, UserPoolId=user_pool_id)
|
||||
|
||||
result["Group"]["GroupName"].should.equal(group_name)
|
||||
result["Group"]["UserPoolId"].should.equal(user_pool_id)
|
||||
result["Group"]["LastModifiedDate"].should.be.a("datetime.datetime")
|
||||
result["Group"]["CreationDate"].should.be.a("datetime.datetime")
|
||||
|
||||
|
||||
@mock_cognitoidp
|
||||
def test_list_groups():
|
||||
conn = boto3.client("cognito-idp", "us-west-2")
|
||||
|
||||
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
|
||||
group_name = str(uuid.uuid4())
|
||||
conn.create_group(GroupName=group_name, UserPoolId=user_pool_id)
|
||||
|
||||
result = conn.list_groups(UserPoolId=user_pool_id)
|
||||
|
||||
result["Groups"].should.have.length_of(1)
|
||||
result["Groups"][0]["GroupName"].should.equal(group_name)
|
||||
|
||||
|
||||
@mock_cognitoidp
|
||||
def test_delete_group():
|
||||
conn = boto3.client("cognito-idp", "us-west-2")
|
||||
|
||||
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
|
||||
group_name = str(uuid.uuid4())
|
||||
conn.create_group(GroupName=group_name, UserPoolId=user_pool_id)
|
||||
|
||||
result = conn.delete_group(GroupName=group_name, UserPoolId=user_pool_id)
|
||||
list(result.keys()).should.equal(["ResponseMetadata"]) # No response expected
|
||||
|
||||
with assert_raises(ClientError) as cm:
|
||||
conn.get_group(GroupName=group_name, UserPoolId=user_pool_id)
|
||||
cm.exception.response['Error']['Code'].should.equal('ResourceNotFoundException')
|
||||
|
||||
|
||||
@mock_cognitoidp
|
||||
def test_admin_add_user_to_group():
|
||||
conn = boto3.client("cognito-idp", "us-west-2")
|
||||
|
||||
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
|
||||
group_name = str(uuid.uuid4())
|
||||
conn.create_group(GroupName=group_name, UserPoolId=user_pool_id)
|
||||
|
||||
username = str(uuid.uuid4())
|
||||
conn.admin_create_user(UserPoolId=user_pool_id, Username=username)
|
||||
|
||||
result = conn.admin_add_user_to_group(UserPoolId=user_pool_id, Username=username, GroupName=group_name)
|
||||
list(result.keys()).should.equal(["ResponseMetadata"]) # No response expected
|
||||
|
||||
|
||||
@mock_cognitoidp
|
||||
def test_admin_add_user_to_group_again_is_noop():
|
||||
conn = boto3.client("cognito-idp", "us-west-2")
|
||||
|
||||
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
|
||||
group_name = str(uuid.uuid4())
|
||||
conn.create_group(GroupName=group_name, UserPoolId=user_pool_id)
|
||||
|
||||
username = str(uuid.uuid4())
|
||||
conn.admin_create_user(UserPoolId=user_pool_id, Username=username)
|
||||
|
||||
conn.admin_add_user_to_group(UserPoolId=user_pool_id, Username=username, GroupName=group_name)
|
||||
conn.admin_add_user_to_group(UserPoolId=user_pool_id, Username=username, GroupName=group_name)
|
||||
|
||||
|
||||
@mock_cognitoidp
|
||||
def test_list_users_in_group():
|
||||
conn = boto3.client("cognito-idp", "us-west-2")
|
||||
|
||||
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
|
||||
group_name = str(uuid.uuid4())
|
||||
conn.create_group(GroupName=group_name, UserPoolId=user_pool_id)
|
||||
|
||||
username = str(uuid.uuid4())
|
||||
conn.admin_create_user(UserPoolId=user_pool_id, Username=username)
|
||||
|
||||
conn.admin_add_user_to_group(UserPoolId=user_pool_id, Username=username, GroupName=group_name)
|
||||
|
||||
result = conn.list_users_in_group(UserPoolId=user_pool_id, GroupName=group_name)
|
||||
|
||||
result["Users"].should.have.length_of(1)
|
||||
result["Users"][0]["Username"].should.equal(username)
|
||||
|
||||
|
||||
@mock_cognitoidp
|
||||
def test_list_users_in_group_ignores_deleted_user():
|
||||
conn = boto3.client("cognito-idp", "us-west-2")
|
||||
|
||||
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
|
||||
group_name = str(uuid.uuid4())
|
||||
conn.create_group(GroupName=group_name, UserPoolId=user_pool_id)
|
||||
|
||||
username = str(uuid.uuid4())
|
||||
conn.admin_create_user(UserPoolId=user_pool_id, Username=username)
|
||||
username2 = str(uuid.uuid4())
|
||||
conn.admin_create_user(UserPoolId=user_pool_id, Username=username2)
|
||||
|
||||
conn.admin_add_user_to_group(UserPoolId=user_pool_id, Username=username, GroupName=group_name)
|
||||
conn.admin_add_user_to_group(UserPoolId=user_pool_id, Username=username2, GroupName=group_name)
|
||||
conn.admin_delete_user(UserPoolId=user_pool_id, Username=username)
|
||||
|
||||
result = conn.list_users_in_group(UserPoolId=user_pool_id, GroupName=group_name)
|
||||
|
||||
result["Users"].should.have.length_of(1)
|
||||
result["Users"][0]["Username"].should.equal(username2)
|
||||
|
||||
|
||||
@mock_cognitoidp
|
||||
def test_admin_list_groups_for_user():
|
||||
conn = boto3.client("cognito-idp", "us-west-2")
|
||||
|
||||
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
|
||||
group_name = str(uuid.uuid4())
|
||||
conn.create_group(GroupName=group_name, UserPoolId=user_pool_id)
|
||||
|
||||
username = str(uuid.uuid4())
|
||||
conn.admin_create_user(UserPoolId=user_pool_id, Username=username)
|
||||
|
||||
conn.admin_add_user_to_group(UserPoolId=user_pool_id, Username=username, GroupName=group_name)
|
||||
|
||||
result = conn.admin_list_groups_for_user(Username=username, UserPoolId=user_pool_id)
|
||||
|
||||
result["Groups"].should.have.length_of(1)
|
||||
result["Groups"][0]["GroupName"].should.equal(group_name)
|
||||
|
||||
|
||||
@mock_cognitoidp
|
||||
def test_admin_list_groups_for_user_ignores_deleted_group():
|
||||
conn = boto3.client("cognito-idp", "us-west-2")
|
||||
|
||||
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
|
||||
group_name = str(uuid.uuid4())
|
||||
conn.create_group(GroupName=group_name, UserPoolId=user_pool_id)
|
||||
group_name2 = str(uuid.uuid4())
|
||||
conn.create_group(GroupName=group_name2, UserPoolId=user_pool_id)
|
||||
|
||||
username = str(uuid.uuid4())
|
||||
conn.admin_create_user(UserPoolId=user_pool_id, Username=username)
|
||||
|
||||
conn.admin_add_user_to_group(UserPoolId=user_pool_id, Username=username, GroupName=group_name)
|
||||
conn.admin_add_user_to_group(UserPoolId=user_pool_id, Username=username, GroupName=group_name2)
|
||||
conn.delete_group(GroupName=group_name, UserPoolId=user_pool_id)
|
||||
|
||||
result = conn.admin_list_groups_for_user(Username=username, UserPoolId=user_pool_id)
|
||||
|
||||
result["Groups"].should.have.length_of(1)
|
||||
result["Groups"][0]["GroupName"].should.equal(group_name2)
|
||||
|
||||
|
||||
@mock_cognitoidp
|
||||
def test_admin_remove_user_from_group():
|
||||
conn = boto3.client("cognito-idp", "us-west-2")
|
||||
|
||||
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
|
||||
group_name = str(uuid.uuid4())
|
||||
conn.create_group(GroupName=group_name, UserPoolId=user_pool_id)
|
||||
|
||||
username = str(uuid.uuid4())
|
||||
conn.admin_create_user(UserPoolId=user_pool_id, Username=username)
|
||||
|
||||
conn.admin_add_user_to_group(UserPoolId=user_pool_id, Username=username, GroupName=group_name)
|
||||
|
||||
result = conn.admin_remove_user_from_group(UserPoolId=user_pool_id, Username=username, GroupName=group_name)
|
||||
list(result.keys()).should.equal(["ResponseMetadata"]) # No response expected
|
||||
conn.list_users_in_group(UserPoolId=user_pool_id, GroupName=group_name) \
|
||||
["Users"].should.have.length_of(0)
|
||||
conn.admin_list_groups_for_user(Username=username, UserPoolId=user_pool_id) \
|
||||
["Groups"].should.have.length_of(0)
|
||||
|
||||
|
||||
@mock_cognitoidp
|
||||
def test_admin_remove_user_from_group_again_is_noop():
|
||||
conn = boto3.client("cognito-idp", "us-west-2")
|
||||
|
||||
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
|
||||
group_name = str(uuid.uuid4())
|
||||
conn.create_group(GroupName=group_name, UserPoolId=user_pool_id)
|
||||
|
||||
username = str(uuid.uuid4())
|
||||
conn.admin_create_user(UserPoolId=user_pool_id, Username=username)
|
||||
|
||||
conn.admin_add_user_to_group(UserPoolId=user_pool_id, Username=username, GroupName=group_name)
|
||||
conn.admin_add_user_to_group(UserPoolId=user_pool_id, Username=username, GroupName=group_name)
|
||||
|
||||
|
||||
@mock_cognitoidp
|
||||
def test_admin_create_user():
|
||||
conn = boto3.client("cognito-idp", "us-west-2")
|
||||
|
|
@ -396,6 +827,62 @@ def test_list_users():
|
|||
result["Users"][0]["Username"].should.equal(username)
|
||||
|
||||
|
||||
@mock_cognitoidp
|
||||
def test_list_users_returns_limit_items():
|
||||
conn = boto3.client("cognito-idp", "us-west-2")
|
||||
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
|
||||
|
||||
# Given 10 users
|
||||
user_count = 10
|
||||
for i in range(user_count):
|
||||
conn.admin_create_user(UserPoolId=user_pool_id,
|
||||
Username=str(uuid.uuid4()))
|
||||
max_results = 5
|
||||
result = conn.list_users(UserPoolId=user_pool_id, Limit=max_results)
|
||||
result["Users"].should.have.length_of(max_results)
|
||||
result.should.have.key("PaginationToken")
|
||||
|
||||
|
||||
@mock_cognitoidp
|
||||
def test_list_users_returns_pagination_tokens():
|
||||
conn = boto3.client("cognito-idp", "us-west-2")
|
||||
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
|
||||
|
||||
# Given 10 users
|
||||
user_count = 10
|
||||
for i in range(user_count):
|
||||
conn.admin_create_user(UserPoolId=user_pool_id,
|
||||
Username=str(uuid.uuid4()))
|
||||
|
||||
max_results = 5
|
||||
result = conn.list_users(UserPoolId=user_pool_id, Limit=max_results)
|
||||
result["Users"].should.have.length_of(max_results)
|
||||
result.should.have.key("PaginationToken")
|
||||
|
||||
next_token = result["PaginationToken"]
|
||||
result_2 = conn.list_users(UserPoolId=user_pool_id,
|
||||
Limit=max_results, PaginationToken=next_token)
|
||||
result_2["Users"].should.have.length_of(max_results)
|
||||
result_2.shouldnt.have.key("PaginationToken")
|
||||
|
||||
|
||||
@mock_cognitoidp
|
||||
def test_list_users_when_limit_more_than_total_items():
|
||||
conn = boto3.client("cognito-idp", "us-west-2")
|
||||
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
|
||||
|
||||
# Given 10 users
|
||||
user_count = 10
|
||||
for i in range(user_count):
|
||||
conn.admin_create_user(UserPoolId=user_pool_id,
|
||||
Username=str(uuid.uuid4()))
|
||||
|
||||
max_results = user_count + 5
|
||||
result = conn.list_users(UserPoolId=user_pool_id, Limit=max_results)
|
||||
result["Users"].should.have.length_of(user_count)
|
||||
result.shouldnt.have.key("PaginationToken")
|
||||
|
||||
|
||||
@mock_cognitoidp
|
||||
def test_admin_disable_user():
|
||||
conn = boto3.client("cognito-idp", "us-west-2")
|
||||
|
|
|
|||
|
|
@ -1000,6 +1000,11 @@ def test_delete_item():
|
|||
response = table.scan()
|
||||
assert response['Count'] == 2
|
||||
|
||||
# Test ReturnValues validation
|
||||
with assert_raises(ClientError) as ex:
|
||||
table.delete_item(Key={'client': 'client1', 'app': 'app1'},
|
||||
ReturnValues='ALL_NEW')
|
||||
|
||||
# Test deletion and returning old value
|
||||
response = table.delete_item(Key={'client': 'client1', 'app': 'app1'}, ReturnValues='ALL_OLD')
|
||||
response['Attributes'].should.contain('client')
|
||||
|
|
@ -1246,6 +1251,81 @@ def test_update_if_not_exists():
|
|||
assert resp['Items'][0]['created_at'] == 123
|
||||
|
||||
|
||||
# https://github.com/spulec/moto/issues/1937
|
||||
@mock_dynamodb2
|
||||
def test_update_return_attributes():
|
||||
dynamodb = boto3.client('dynamodb', region_name='us-east-1')
|
||||
|
||||
dynamodb.create_table(
|
||||
TableName='moto-test',
|
||||
KeySchema=[{'AttributeName': 'id', 'KeyType': 'HASH'}],
|
||||
AttributeDefinitions=[{'AttributeName': 'id', 'AttributeType': 'S'}],
|
||||
ProvisionedThroughput={'ReadCapacityUnits': 1, 'WriteCapacityUnits': 1}
|
||||
)
|
||||
|
||||
def update(col, to, rv):
|
||||
return dynamodb.update_item(
|
||||
TableName='moto-test',
|
||||
Key={'id': {'S': 'foo'}},
|
||||
AttributeUpdates={col: {'Value': {'S': to}, 'Action': 'PUT'}},
|
||||
ReturnValues=rv
|
||||
)
|
||||
|
||||
r = update('col1', 'val1', 'ALL_NEW')
|
||||
assert r['Attributes'] == {'id': {'S': 'foo'}, 'col1': {'S': 'val1'}}
|
||||
|
||||
r = update('col1', 'val2', 'ALL_OLD')
|
||||
assert r['Attributes'] == {'id': {'S': 'foo'}, 'col1': {'S': 'val1'}}
|
||||
|
||||
r = update('col2', 'val3', 'UPDATED_NEW')
|
||||
assert r['Attributes'] == {'col2': {'S': 'val3'}}
|
||||
|
||||
r = update('col2', 'val4', 'UPDATED_OLD')
|
||||
assert r['Attributes'] == {'col2': {'S': 'val3'}}
|
||||
|
||||
r = update('col1', 'val5', 'NONE')
|
||||
assert r['Attributes'] == {}
|
||||
|
||||
with assert_raises(ClientError) as ex:
|
||||
r = update('col1', 'val6', 'WRONG')
|
||||
|
||||
|
||||
@mock_dynamodb2
|
||||
def test_put_return_attributes():
|
||||
dynamodb = boto3.client('dynamodb', region_name='us-east-1')
|
||||
|
||||
dynamodb.create_table(
|
||||
TableName='moto-test',
|
||||
KeySchema=[{'AttributeName': 'id', 'KeyType': 'HASH'}],
|
||||
AttributeDefinitions=[{'AttributeName': 'id', 'AttributeType': 'S'}],
|
||||
ProvisionedThroughput={'ReadCapacityUnits': 1, 'WriteCapacityUnits': 1}
|
||||
)
|
||||
|
||||
r = dynamodb.put_item(
|
||||
TableName='moto-test',
|
||||
Item={'id': {'S': 'foo'}, 'col1': {'S': 'val1'}},
|
||||
ReturnValues='NONE'
|
||||
)
|
||||
assert 'Attributes' not in r
|
||||
|
||||
r = dynamodb.put_item(
|
||||
TableName='moto-test',
|
||||
Item={'id': {'S': 'foo'}, 'col1': {'S': 'val2'}},
|
||||
ReturnValues='ALL_OLD'
|
||||
)
|
||||
assert r['Attributes'] == {'id': {'S': 'foo'}, 'col1': {'S': 'val1'}}
|
||||
|
||||
with assert_raises(ClientError) as ex:
|
||||
dynamodb.put_item(
|
||||
TableName='moto-test',
|
||||
Item={'id': {'S': 'foo'}, 'col1': {'S': 'val3'}},
|
||||
ReturnValues='ALL_NEW'
|
||||
)
|
||||
ex.exception.response['Error']['Code'].should.equal('ValidationException')
|
||||
ex.exception.response['ResponseMetadata']['HTTPStatusCode'].should.equal(400)
|
||||
ex.exception.response['Error']['Message'].should.equal('Return values set to invalid value')
|
||||
|
||||
|
||||
@mock_dynamodb2
|
||||
def test_query_global_secondary_index_when_created_via_update_table_resource():
|
||||
dynamodb = boto3.resource('dynamodb', region_name='us-east-1')
|
||||
|
|
@ -1336,3 +1416,62 @@ def test_query_global_secondary_index_when_created_via_update_table_resource():
|
|||
assert len(forum_and_subject_items) == 1
|
||||
assert forum_and_subject_items[0] == {'user_id': Decimal('1'), 'forum_name': 'cats',
|
||||
'subject': 'my pet is the cutest'}
|
||||
|
||||
|
||||
@mock_dynamodb2
|
||||
def test_dynamodb_streams_1():
|
||||
conn = boto3.client('dynamodb', region_name='us-east-1')
|
||||
|
||||
resp = conn.create_table(
|
||||
TableName='test-streams',
|
||||
KeySchema=[{'AttributeName': 'id', 'KeyType': 'HASH'}],
|
||||
AttributeDefinitions=[{'AttributeName': 'id', 'AttributeType': 'S'}],
|
||||
ProvisionedThroughput={'ReadCapacityUnits': 1, 'WriteCapacityUnits': 1},
|
||||
StreamSpecification={
|
||||
'StreamEnabled': True,
|
||||
'StreamViewType': 'NEW_AND_OLD_IMAGES'
|
||||
}
|
||||
)
|
||||
|
||||
assert 'StreamSpecification' in resp['TableDescription']
|
||||
assert resp['TableDescription']['StreamSpecification'] == {
|
||||
'StreamEnabled': True,
|
||||
'StreamViewType': 'NEW_AND_OLD_IMAGES'
|
||||
}
|
||||
assert 'LatestStreamLabel' in resp['TableDescription']
|
||||
assert 'LatestStreamArn' in resp['TableDescription']
|
||||
|
||||
resp = conn.delete_table(TableName='test-streams')
|
||||
|
||||
assert 'StreamSpecification' in resp['TableDescription']
|
||||
|
||||
|
||||
@mock_dynamodb2
|
||||
def test_dynamodb_streams_2():
|
||||
conn = boto3.client('dynamodb', region_name='us-east-1')
|
||||
|
||||
resp = conn.create_table(
|
||||
TableName='test-stream-update',
|
||||
KeySchema=[{'AttributeName': 'id', 'KeyType': 'HASH'}],
|
||||
AttributeDefinitions=[{'AttributeName': 'id', 'AttributeType': 'S'}],
|
||||
ProvisionedThroughput={'ReadCapacityUnits': 1, 'WriteCapacityUnits': 1},
|
||||
)
|
||||
|
||||
assert 'StreamSpecification' not in resp['TableDescription']
|
||||
|
||||
resp = conn.update_table(
|
||||
TableName='test-stream-update',
|
||||
StreamSpecification={
|
||||
'StreamEnabled': True,
|
||||
'StreamViewType': 'NEW_IMAGE'
|
||||
}
|
||||
)
|
||||
|
||||
assert 'StreamSpecification' in resp['TableDescription']
|
||||
assert resp['TableDescription']['StreamSpecification'] == {
|
||||
'StreamEnabled': True,
|
||||
'StreamViewType': 'NEW_IMAGE'
|
||||
}
|
||||
assert 'LatestStreamLabel' in resp['TableDescription']
|
||||
assert 'LatestStreamArn' in resp['TableDescription']
|
||||
|
||||
|
|
|
|||
234
tests/test_dynamodbstreams/test_dynamodbstreams.py
Normal file
234
tests/test_dynamodbstreams/test_dynamodbstreams.py
Normal file
|
|
@ -0,0 +1,234 @@
|
|||
from __future__ import unicode_literals, print_function
|
||||
|
||||
from nose.tools import assert_raises
|
||||
|
||||
import boto3
|
||||
from moto import mock_dynamodb2, mock_dynamodbstreams
|
||||
|
||||
|
||||
class TestCore():
|
||||
stream_arn = None
|
||||
mocks = []
|
||||
|
||||
def setup(self):
|
||||
self.mocks = [mock_dynamodb2(), mock_dynamodbstreams()]
|
||||
for m in self.mocks:
|
||||
m.start()
|
||||
|
||||
# create a table with a stream
|
||||
conn = boto3.client('dynamodb', region_name='us-east-1')
|
||||
|
||||
resp = conn.create_table(
|
||||
TableName='test-streams',
|
||||
KeySchema=[{'AttributeName': 'id', 'KeyType': 'HASH'}],
|
||||
AttributeDefinitions=[{'AttributeName': 'id',
|
||||
'AttributeType': 'S'}],
|
||||
ProvisionedThroughput={'ReadCapacityUnits': 1,
|
||||
'WriteCapacityUnits': 1},
|
||||
StreamSpecification={
|
||||
'StreamEnabled': True,
|
||||
'StreamViewType': 'NEW_AND_OLD_IMAGES'
|
||||
}
|
||||
)
|
||||
self.stream_arn = resp['TableDescription']['LatestStreamArn']
|
||||
|
||||
def teardown(self):
|
||||
conn = boto3.client('dynamodb', region_name='us-east-1')
|
||||
conn.delete_table(TableName='test-streams')
|
||||
self.stream_arn = None
|
||||
|
||||
for m in self.mocks:
|
||||
m.stop()
|
||||
|
||||
|
||||
def test_verify_stream(self):
|
||||
conn = boto3.client('dynamodb', region_name='us-east-1')
|
||||
resp = conn.describe_table(TableName='test-streams')
|
||||
assert 'LatestStreamArn' in resp['Table']
|
||||
|
||||
def test_describe_stream(self):
|
||||
conn = boto3.client('dynamodbstreams', region_name='us-east-1')
|
||||
|
||||
resp = conn.describe_stream(StreamArn=self.stream_arn)
|
||||
assert 'StreamDescription' in resp
|
||||
desc = resp['StreamDescription']
|
||||
assert desc['StreamArn'] == self.stream_arn
|
||||
assert desc['TableName'] == 'test-streams'
|
||||
|
||||
def test_list_streams(self):
|
||||
conn = boto3.client('dynamodbstreams', region_name='us-east-1')
|
||||
|
||||
resp = conn.list_streams()
|
||||
assert resp['Streams'][0]['StreamArn'] == self.stream_arn
|
||||
|
||||
resp = conn.list_streams(TableName='no-stream')
|
||||
assert not resp['Streams']
|
||||
|
||||
def test_get_shard_iterator(self):
|
||||
conn = boto3.client('dynamodbstreams', region_name='us-east-1')
|
||||
|
||||
resp = conn.describe_stream(StreamArn=self.stream_arn)
|
||||
shard_id = resp['StreamDescription']['Shards'][0]['ShardId']
|
||||
|
||||
resp = conn.get_shard_iterator(
|
||||
StreamArn=self.stream_arn,
|
||||
ShardId=shard_id,
|
||||
ShardIteratorType='TRIM_HORIZON'
|
||||
)
|
||||
assert 'ShardIterator' in resp
|
||||
|
||||
def test_get_records_empty(self):
|
||||
conn = boto3.client('dynamodbstreams', region_name='us-east-1')
|
||||
|
||||
resp = conn.describe_stream(StreamArn=self.stream_arn)
|
||||
shard_id = resp['StreamDescription']['Shards'][0]['ShardId']
|
||||
|
||||
resp = conn.get_shard_iterator(
|
||||
StreamArn=self.stream_arn,
|
||||
ShardId=shard_id,
|
||||
ShardIteratorType='LATEST'
|
||||
)
|
||||
iterator_id = resp['ShardIterator']
|
||||
|
||||
resp = conn.get_records(ShardIterator=iterator_id)
|
||||
assert 'Records' in resp
|
||||
assert len(resp['Records']) == 0
|
||||
|
||||
def test_get_records_seq(self):
|
||||
conn = boto3.client('dynamodb', region_name='us-east-1')
|
||||
|
||||
conn.put_item(
|
||||
TableName='test-streams',
|
||||
Item={
|
||||
'id': {'S': 'entry1'},
|
||||
'first_col': {'S': 'foo'}
|
||||
}
|
||||
)
|
||||
conn.put_item(
|
||||
TableName='test-streams',
|
||||
Item={
|
||||
'id': {'S': 'entry1'},
|
||||
'first_col': {'S': 'bar'},
|
||||
'second_col': {'S': 'baz'}
|
||||
}
|
||||
)
|
||||
conn.delete_item(
|
||||
TableName='test-streams',
|
||||
Key={'id': {'S': 'entry1'}}
|
||||
)
|
||||
|
||||
conn = boto3.client('dynamodbstreams', region_name='us-east-1')
|
||||
|
||||
resp = conn.describe_stream(StreamArn=self.stream_arn)
|
||||
shard_id = resp['StreamDescription']['Shards'][0]['ShardId']
|
||||
|
||||
resp = conn.get_shard_iterator(
|
||||
StreamArn=self.stream_arn,
|
||||
ShardId=shard_id,
|
||||
ShardIteratorType='TRIM_HORIZON'
|
||||
)
|
||||
iterator_id = resp['ShardIterator']
|
||||
|
||||
resp = conn.get_records(ShardIterator=iterator_id)
|
||||
assert len(resp['Records']) == 3
|
||||
assert resp['Records'][0]['eventName'] == 'INSERT'
|
||||
assert resp['Records'][1]['eventName'] == 'MODIFY'
|
||||
assert resp['Records'][2]['eventName'] == 'DELETE'
|
||||
|
||||
# now try fetching from the next shard iterator, it should be
|
||||
# empty
|
||||
resp = conn.get_records(ShardIterator=resp['NextShardIterator'])
|
||||
assert len(resp['Records']) == 0
|
||||
|
||||
|
||||
class TestEdges():
|
||||
mocks = []
|
||||
|
||||
def setup(self):
|
||||
self.mocks = [mock_dynamodb2(), mock_dynamodbstreams()]
|
||||
for m in self.mocks:
|
||||
m.start()
|
||||
|
||||
def teardown(self):
|
||||
for m in self.mocks:
|
||||
m.stop()
|
||||
|
||||
|
||||
def test_enable_stream_on_table(self):
|
||||
conn = boto3.client('dynamodb', region_name='us-east-1')
|
||||
resp = conn.create_table(
|
||||
TableName='test-streams',
|
||||
KeySchema=[{'AttributeName': 'id', 'KeyType': 'HASH'}],
|
||||
AttributeDefinitions=[{'AttributeName': 'id',
|
||||
'AttributeType': 'S'}],
|
||||
ProvisionedThroughput={'ReadCapacityUnits': 1,
|
||||
'WriteCapacityUnits': 1}
|
||||
)
|
||||
assert 'StreamSpecification' not in resp['TableDescription']
|
||||
|
||||
resp = conn.update_table(
|
||||
TableName='test-streams',
|
||||
StreamSpecification={
|
||||
'StreamViewType': 'KEYS_ONLY'
|
||||
}
|
||||
)
|
||||
assert 'StreamSpecification' in resp['TableDescription']
|
||||
assert resp['TableDescription']['StreamSpecification'] == {
|
||||
'StreamEnabled': True,
|
||||
'StreamViewType': 'KEYS_ONLY'
|
||||
}
|
||||
assert 'LatestStreamLabel' in resp['TableDescription']
|
||||
|
||||
# now try to enable it again
|
||||
with assert_raises(conn.exceptions.ResourceInUseException):
|
||||
resp = conn.update_table(
|
||||
TableName='test-streams',
|
||||
StreamSpecification={
|
||||
'StreamViewType': 'OLD_IMAGES'
|
||||
}
|
||||
)
|
||||
|
||||
def test_stream_with_range_key(self):
|
||||
dyn = boto3.client('dynamodb', region_name='us-east-1')
|
||||
|
||||
resp = dyn.create_table(
|
||||
TableName='test-streams',
|
||||
KeySchema=[{'AttributeName': 'id', 'KeyType': 'HASH'},
|
||||
{'AttributeName': 'color', 'KeyType': 'RANGE'}],
|
||||
AttributeDefinitions=[{'AttributeName': 'id',
|
||||
'AttributeType': 'S'},
|
||||
{'AttributeName': 'color',
|
||||
'AttributeType': 'S'}],
|
||||
ProvisionedThroughput={'ReadCapacityUnits': 1,
|
||||
'WriteCapacityUnits': 1},
|
||||
StreamSpecification={
|
||||
'StreamViewType': 'NEW_IMAGES'
|
||||
}
|
||||
)
|
||||
stream_arn = resp['TableDescription']['LatestStreamArn']
|
||||
|
||||
streams = boto3.client('dynamodbstreams', region_name='us-east-1')
|
||||
resp = streams.describe_stream(StreamArn=stream_arn)
|
||||
shard_id = resp['StreamDescription']['Shards'][0]['ShardId']
|
||||
|
||||
resp = streams.get_shard_iterator(
|
||||
StreamArn=stream_arn,
|
||||
ShardId=shard_id,
|
||||
ShardIteratorType='LATEST'
|
||||
)
|
||||
iterator_id = resp['ShardIterator']
|
||||
|
||||
dyn.put_item(
|
||||
TableName='test-streams',
|
||||
Item={'id': {'S': 'row1'}, 'color': {'S': 'blue'}}
|
||||
)
|
||||
dyn.put_item(
|
||||
TableName='test-streams',
|
||||
Item={'id': {'S': 'row2'}, 'color': {'S': 'green'}}
|
||||
)
|
||||
|
||||
resp = streams.get_records(ShardIterator=iterator_id)
|
||||
assert len(resp['Records']) == 2
|
||||
assert resp['Records'][0]['eventName'] == 'INSERT'
|
||||
assert resp['Records'][1]['eventName'] == 'INSERT'
|
||||
|
||||
|
|
@ -89,7 +89,8 @@ def test_vpc_peering_connections_delete():
|
|||
verdict.should.equal(True)
|
||||
|
||||
all_vpc_pcxs = conn.get_all_vpc_peering_connections()
|
||||
all_vpc_pcxs.should.have.length_of(0)
|
||||
all_vpc_pcxs.should.have.length_of(1)
|
||||
all_vpc_pcxs[0]._status.code.should.equal('deleted')
|
||||
|
||||
with assert_raises(EC2ResponseError) as cm:
|
||||
conn.delete_vpc_peering_connection("pcx-1234abcd")
|
||||
|
|
|
|||
|
|
@ -925,6 +925,65 @@ def test_update_container_instances_state():
|
|||
status='test_status').should.throw(Exception)
|
||||
|
||||
|
||||
@mock_ec2
|
||||
@mock_ecs
|
||||
def test_update_container_instances_state_by_arn():
|
||||
ecs_client = boto3.client('ecs', region_name='us-east-1')
|
||||
ec2 = boto3.resource('ec2', region_name='us-east-1')
|
||||
|
||||
test_cluster_name = 'test_ecs_cluster'
|
||||
_ = ecs_client.create_cluster(
|
||||
clusterName=test_cluster_name
|
||||
)
|
||||
|
||||
instance_to_create = 3
|
||||
test_instance_arns = []
|
||||
for i in range(0, instance_to_create):
|
||||
test_instance = ec2.create_instances(
|
||||
ImageId="ami-1234abcd",
|
||||
MinCount=1,
|
||||
MaxCount=1,
|
||||
)[0]
|
||||
|
||||
instance_id_document = json.dumps(
|
||||
ec2_utils.generate_instance_identity_document(test_instance)
|
||||
)
|
||||
|
||||
response = ecs_client.register_container_instance(
|
||||
cluster=test_cluster_name,
|
||||
instanceIdentityDocument=instance_id_document)
|
||||
|
||||
test_instance_arns.append(response['containerInstance']['containerInstanceArn'])
|
||||
|
||||
response = ecs_client.update_container_instances_state(cluster=test_cluster_name,
|
||||
containerInstances=test_instance_arns,
|
||||
status='DRAINING')
|
||||
len(response['failures']).should.equal(0)
|
||||
len(response['containerInstances']).should.equal(instance_to_create)
|
||||
response_statuses = [ci['status'] for ci in response['containerInstances']]
|
||||
for status in response_statuses:
|
||||
status.should.equal('DRAINING')
|
||||
response = ecs_client.update_container_instances_state(cluster=test_cluster_name,
|
||||
containerInstances=test_instance_arns,
|
||||
status='DRAINING')
|
||||
len(response['failures']).should.equal(0)
|
||||
len(response['containerInstances']).should.equal(instance_to_create)
|
||||
response_statuses = [ci['status'] for ci in response['containerInstances']]
|
||||
for status in response_statuses:
|
||||
status.should.equal('DRAINING')
|
||||
response = ecs_client.update_container_instances_state(cluster=test_cluster_name,
|
||||
containerInstances=test_instance_arns,
|
||||
status='ACTIVE')
|
||||
len(response['failures']).should.equal(0)
|
||||
len(response['containerInstances']).should.equal(instance_to_create)
|
||||
response_statuses = [ci['status'] for ci in response['containerInstances']]
|
||||
for status in response_statuses:
|
||||
status.should.equal('ACTIVE')
|
||||
ecs_client.update_container_instances_state.when.called_with(cluster=test_cluster_name,
|
||||
containerInstances=test_instance_arns,
|
||||
status='test_status').should.throw(Exception)
|
||||
|
||||
|
||||
@mock_ec2
|
||||
@mock_ecs
|
||||
def test_run_task():
|
||||
|
|
|
|||
|
|
@ -14,6 +14,19 @@ from nose.tools import raises
|
|||
from tests.helpers import requires_boto_gte
|
||||
|
||||
|
||||
MOCK_CERT = """-----BEGIN CERTIFICATE-----
|
||||
MIIBpzCCARACCQCY5yOdxCTrGjANBgkqhkiG9w0BAQsFADAXMRUwEwYDVQQKDAxt
|
||||
b3RvIHRlc3RpbmcwIBcNMTgxMTA1MTkwNTIwWhgPMjI5MjA4MTkxOTA1MjBaMBcx
|
||||
FTATBgNVBAoMDG1vdG8gdGVzdGluZzCBnzANBgkqhkiG9w0BAQEFAAOBjQAwgYkC
|
||||
gYEA1Jn3g2h7LD3FLqdpcYNbFXCS4V4eDpuTCje9vKFcC3pi/01147X3zdfPy8Mt
|
||||
ZhKxcREOwm4NXykh23P9KW7fBovpNwnbYsbPqj8Hf1ZaClrgku1arTVhEnKjx8zO
|
||||
vaR/bVLCss4uE0E0VM1tJn/QGQsfthFsjuHtwx8uIWz35tUCAwEAATANBgkqhkiG
|
||||
9w0BAQsFAAOBgQBWdOQ7bDc2nWkUhFjZoNIZrqjyNdjlMUndpwREVD7FQ/DuxJMj
|
||||
FyDHrtlrS80dPUQWNYHw++oACDpWO01LGLPPrGmuO/7cOdojPEd852q5gd+7W9xt
|
||||
8vUH+pBa6IBLbvBp+szli51V3TLSWcoyy4ceJNQU2vCkTLoFdS0RLd/7tQ==
|
||||
-----END CERTIFICATE-----"""
|
||||
|
||||
|
||||
@mock_iam_deprecated()
|
||||
def test_get_all_server_certs():
|
||||
conn = boto.connect_iam()
|
||||
|
|
@ -108,6 +121,10 @@ def test_create_role_and_instance_profile():
|
|||
|
||||
conn.list_roles().roles[0].role_name.should.equal('my-role')
|
||||
|
||||
# Test with an empty path:
|
||||
profile = conn.create_instance_profile('my-other-profile')
|
||||
profile.path.should.equal('/')
|
||||
|
||||
|
||||
@mock_iam_deprecated()
|
||||
def test_remove_role_from_instance_profile():
|
||||
|
|
@ -536,6 +553,14 @@ def test_generate_credential_report():
|
|||
result['generate_credential_report_response'][
|
||||
'generate_credential_report_result']['state'].should.equal('COMPLETE')
|
||||
|
||||
@mock_iam
|
||||
def test_boto3_generate_credential_report():
|
||||
conn = boto3.client('iam', region_name='us-east-1')
|
||||
result = conn.generate_credential_report()
|
||||
result['State'].should.equal('STARTED')
|
||||
result = conn.generate_credential_report()
|
||||
result['State'].should.equal('COMPLETE')
|
||||
|
||||
|
||||
@mock_iam_deprecated()
|
||||
def test_get_credential_report():
|
||||
|
|
@ -551,6 +576,19 @@ def test_get_credential_report():
|
|||
'get_credential_report_result']['content'].encode('ascii')).decode('ascii')
|
||||
report.should.match(r'.*my-user.*')
|
||||
|
||||
@mock_iam
|
||||
def test_boto3_get_credential_report():
|
||||
conn = boto3.client('iam', region_name='us-east-1')
|
||||
conn.create_user(UserName='my-user')
|
||||
with assert_raises(ClientError):
|
||||
conn.get_credential_report()
|
||||
result = conn.generate_credential_report()
|
||||
while result['State'] != 'COMPLETE':
|
||||
result = conn.generate_credential_report()
|
||||
result = conn.get_credential_report()
|
||||
report = result['Content'].decode('utf-8')
|
||||
report.should.match(r'.*my-user.*')
|
||||
|
||||
|
||||
@requires_boto_gte('2.39')
|
||||
@mock_iam_deprecated()
|
||||
|
|
@ -700,10 +738,10 @@ def test_get_account_authorization_details():
|
|||
import json
|
||||
conn = boto3.client('iam', region_name='us-east-1')
|
||||
conn.create_role(RoleName="my-role", AssumeRolePolicyDocument="some policy", Path="/my-path/")
|
||||
conn.create_user(Path='/', UserName='testCloudAuxUser')
|
||||
conn.create_group(Path='/', GroupName='testCloudAuxGroup')
|
||||
conn.create_user(Path='/', UserName='testUser')
|
||||
conn.create_group(Path='/', GroupName='testGroup')
|
||||
conn.create_policy(
|
||||
PolicyName='testCloudAuxPolicy',
|
||||
PolicyName='testPolicy',
|
||||
Path='/',
|
||||
PolicyDocument=json.dumps({
|
||||
"Version": "2012-10-17",
|
||||
|
|
@ -715,46 +753,110 @@ def test_get_account_authorization_details():
|
|||
}
|
||||
]
|
||||
}),
|
||||
Description='Test CloudAux Policy'
|
||||
Description='Test Policy'
|
||||
)
|
||||
|
||||
conn.create_instance_profile(InstanceProfileName='ipn')
|
||||
conn.add_role_to_instance_profile(InstanceProfileName='ipn', RoleName='my-role')
|
||||
|
||||
result = conn.get_account_authorization_details(Filter=['Role'])
|
||||
len(result['RoleDetailList']) == 1
|
||||
len(result['UserDetailList']) == 0
|
||||
len(result['GroupDetailList']) == 0
|
||||
len(result['Policies']) == 0
|
||||
assert len(result['RoleDetailList']) == 1
|
||||
assert len(result['UserDetailList']) == 0
|
||||
assert len(result['GroupDetailList']) == 0
|
||||
assert len(result['Policies']) == 0
|
||||
assert len(result['RoleDetailList'][0]['InstanceProfileList']) == 1
|
||||
|
||||
result = conn.get_account_authorization_details(Filter=['User'])
|
||||
len(result['RoleDetailList']) == 0
|
||||
len(result['UserDetailList']) == 1
|
||||
len(result['GroupDetailList']) == 0
|
||||
len(result['Policies']) == 0
|
||||
assert len(result['RoleDetailList']) == 0
|
||||
assert len(result['UserDetailList']) == 1
|
||||
assert len(result['GroupDetailList']) == 0
|
||||
assert len(result['Policies']) == 0
|
||||
|
||||
result = conn.get_account_authorization_details(Filter=['Group'])
|
||||
len(result['RoleDetailList']) == 0
|
||||
len(result['UserDetailList']) == 0
|
||||
len(result['GroupDetailList']) == 1
|
||||
len(result['Policies']) == 0
|
||||
assert len(result['RoleDetailList']) == 0
|
||||
assert len(result['UserDetailList']) == 0
|
||||
assert len(result['GroupDetailList']) == 1
|
||||
assert len(result['Policies']) == 0
|
||||
|
||||
result = conn.get_account_authorization_details(Filter=['LocalManagedPolicy'])
|
||||
len(result['RoleDetailList']) == 0
|
||||
len(result['UserDetailList']) == 0
|
||||
len(result['GroupDetailList']) == 0
|
||||
len(result['Policies']) == 1
|
||||
assert len(result['RoleDetailList']) == 0
|
||||
assert len(result['UserDetailList']) == 0
|
||||
assert len(result['GroupDetailList']) == 0
|
||||
assert len(result['Policies']) == 1
|
||||
|
||||
# Check for greater than 1 since this should always be greater than one but might change.
|
||||
# See iam/aws_managed_policies.py
|
||||
result = conn.get_account_authorization_details(Filter=['AWSManagedPolicy'])
|
||||
len(result['RoleDetailList']) == 0
|
||||
len(result['UserDetailList']) == 0
|
||||
len(result['GroupDetailList']) == 0
|
||||
len(result['Policies']) > 1
|
||||
assert len(result['RoleDetailList']) == 0
|
||||
assert len(result['UserDetailList']) == 0
|
||||
assert len(result['GroupDetailList']) == 0
|
||||
assert len(result['Policies']) > 1
|
||||
|
||||
result = conn.get_account_authorization_details()
|
||||
len(result['RoleDetailList']) == 1
|
||||
len(result['UserDetailList']) == 1
|
||||
len(result['GroupDetailList']) == 1
|
||||
len(result['Policies']) > 1
|
||||
assert len(result['RoleDetailList']) == 1
|
||||
assert len(result['UserDetailList']) == 1
|
||||
assert len(result['GroupDetailList']) == 1
|
||||
assert len(result['Policies']) > 1
|
||||
|
||||
|
||||
@mock_iam
|
||||
def test_signing_certs():
|
||||
client = boto3.client('iam', region_name='us-east-1')
|
||||
|
||||
# Create the IAM user first:
|
||||
client.create_user(UserName='testing')
|
||||
|
||||
# Upload the cert:
|
||||
resp = client.upload_signing_certificate(UserName='testing', CertificateBody=MOCK_CERT)['Certificate']
|
||||
cert_id = resp['CertificateId']
|
||||
|
||||
assert resp['UserName'] == 'testing'
|
||||
assert resp['Status'] == 'Active'
|
||||
assert resp['CertificateBody'] == MOCK_CERT
|
||||
assert resp['CertificateId']
|
||||
|
||||
# Upload a the cert with an invalid body:
|
||||
with assert_raises(ClientError) as ce:
|
||||
client.upload_signing_certificate(UserName='testing', CertificateBody='notacert')
|
||||
assert ce.exception.response['Error']['Code'] == 'MalformedCertificate'
|
||||
|
||||
# Upload with an invalid user:
|
||||
with assert_raises(ClientError):
|
||||
client.upload_signing_certificate(UserName='notauser', CertificateBody=MOCK_CERT)
|
||||
|
||||
# Update:
|
||||
client.update_signing_certificate(UserName='testing', CertificateId=cert_id, Status='Inactive')
|
||||
|
||||
with assert_raises(ClientError):
|
||||
client.update_signing_certificate(UserName='notauser', CertificateId=cert_id, Status='Inactive')
|
||||
|
||||
with assert_raises(ClientError) as ce:
|
||||
client.update_signing_certificate(UserName='testing', CertificateId='x' * 32, Status='Inactive')
|
||||
|
||||
assert ce.exception.response['Error']['Message'] == 'The Certificate with id {id} cannot be found.'.format(
|
||||
id='x' * 32)
|
||||
|
||||
# List the certs:
|
||||
resp = client.list_signing_certificates(UserName='testing')['Certificates']
|
||||
assert len(resp) == 1
|
||||
assert resp[0]['CertificateBody'] == MOCK_CERT
|
||||
assert resp[0]['Status'] == 'Inactive' # Changed with the update call above.
|
||||
|
||||
with assert_raises(ClientError):
|
||||
client.list_signing_certificates(UserName='notauser')
|
||||
|
||||
# Delete:
|
||||
client.delete_signing_certificate(UserName='testing', CertificateId=cert_id)
|
||||
|
||||
with assert_raises(ClientError):
|
||||
client.delete_signing_certificate(UserName='notauser', CertificateId=cert_id)
|
||||
|
||||
with assert_raises(ClientError) as ce:
|
||||
client.delete_signing_certificate(UserName='testing', CertificateId=cert_id)
|
||||
|
||||
assert ce.exception.response['Error']['Message'] == 'The Certificate with id {id} cannot be found.'.format(
|
||||
id=cert_id)
|
||||
|
||||
# Verify that it's not in the list:
|
||||
resp = client.list_signing_certificates(UserName='testing')
|
||||
assert not resp['Certificates']
|
||||
|
|
|
|||
|
|
@ -5,6 +5,8 @@ import sure # noqa
|
|||
import boto3
|
||||
|
||||
from moto import mock_iot
|
||||
from botocore.exceptions import ClientError
|
||||
from nose.tools import assert_raises
|
||||
|
||||
|
||||
@mock_iot
|
||||
|
|
@ -261,6 +263,96 @@ def test_certs():
|
|||
res.should.have.key('certificates').which.should.have.length_of(0)
|
||||
|
||||
|
||||
@mock_iot
|
||||
def test_delete_policy_validation():
|
||||
doc = """{
|
||||
"Version": "2012-10-17",
|
||||
"Statement":[
|
||||
{
|
||||
"Effect":"Allow",
|
||||
"Action":[
|
||||
"iot: *"
|
||||
],
|
||||
"Resource":"*"
|
||||
}
|
||||
]
|
||||
}
|
||||
"""
|
||||
client = boto3.client('iot', region_name='ap-northeast-1')
|
||||
cert = client.create_keys_and_certificate(setAsActive=True)
|
||||
cert_arn = cert['certificateArn']
|
||||
policy_name = 'my-policy'
|
||||
client.create_policy(policyName=policy_name, policyDocument=doc)
|
||||
client.attach_principal_policy(policyName=policy_name, principal=cert_arn)
|
||||
|
||||
with assert_raises(ClientError) as e:
|
||||
client.delete_policy(policyName=policy_name)
|
||||
e.exception.response['Error']['Message'].should.contain(
|
||||
'The policy cannot be deleted as the policy is attached to one or more principals (name=%s)' % policy_name)
|
||||
res = client.list_policies()
|
||||
res.should.have.key('policies').which.should.have.length_of(1)
|
||||
|
||||
client.detach_principal_policy(policyName=policy_name, principal=cert_arn)
|
||||
client.delete_policy(policyName=policy_name)
|
||||
res = client.list_policies()
|
||||
res.should.have.key('policies').which.should.have.length_of(0)
|
||||
|
||||
|
||||
@mock_iot
|
||||
def test_delete_certificate_validation():
|
||||
doc = """{
|
||||
"Version": "2012-10-17",
|
||||
"Statement":[
|
||||
{
|
||||
"Effect":"Allow",
|
||||
"Action":[
|
||||
"iot: *"
|
||||
],
|
||||
"Resource":"*"
|
||||
}
|
||||
]
|
||||
}
|
||||
"""
|
||||
client = boto3.client('iot', region_name='ap-northeast-1')
|
||||
cert = client.create_keys_and_certificate(setAsActive=True)
|
||||
cert_id = cert['certificateId']
|
||||
cert_arn = cert['certificateArn']
|
||||
policy_name = 'my-policy'
|
||||
thing_name = 'thing-1'
|
||||
client.create_policy(policyName=policy_name, policyDocument=doc)
|
||||
client.attach_principal_policy(policyName=policy_name, principal=cert_arn)
|
||||
client.create_thing(thingName=thing_name)
|
||||
client.attach_thing_principal(thingName=thing_name, principal=cert_arn)
|
||||
|
||||
with assert_raises(ClientError) as e:
|
||||
client.delete_certificate(certificateId=cert_id)
|
||||
e.exception.response['Error']['Message'].should.contain(
|
||||
'Certificate must be deactivated (not ACTIVE) before deletion.')
|
||||
res = client.list_certificates()
|
||||
res.should.have.key('certificates').which.should.have.length_of(1)
|
||||
|
||||
client.update_certificate(certificateId=cert_id, newStatus='REVOKED')
|
||||
with assert_raises(ClientError) as e:
|
||||
client.delete_certificate(certificateId=cert_id)
|
||||
e.exception.response['Error']['Message'].should.contain(
|
||||
'Things must be detached before deletion (arn: %s)' % cert_arn)
|
||||
res = client.list_certificates()
|
||||
res.should.have.key('certificates').which.should.have.length_of(1)
|
||||
|
||||
client.detach_thing_principal(thingName=thing_name, principal=cert_arn)
|
||||
with assert_raises(ClientError) as e:
|
||||
client.delete_certificate(certificateId=cert_id)
|
||||
e.exception.response['Error']['Message'].should.contain(
|
||||
'Certificate policies must be detached before deletion (arn: %s)' % cert_arn)
|
||||
res = client.list_certificates()
|
||||
res.should.have.key('certificates').which.should.have.length_of(1)
|
||||
|
||||
client.detach_principal_policy(policyName=policy_name, principal=cert_arn)
|
||||
client.delete_certificate(certificateId=cert_id)
|
||||
res = client.list_certificates()
|
||||
res.should.have.key('certificates').which.should.have.length_of(0)
|
||||
|
||||
|
||||
@mock_iot
|
||||
def test_certs_create_inactive():
|
||||
client = boto3.client('iot', region_name='ap-northeast-1')
|
||||
|
|
@ -309,6 +401,47 @@ def test_policy():
|
|||
|
||||
@mock_iot
|
||||
def test_principal_policy():
|
||||
client = boto3.client('iot', region_name='ap-northeast-1')
|
||||
policy_name = 'my-policy'
|
||||
doc = '{}'
|
||||
client.create_policy(policyName=policy_name, policyDocument=doc)
|
||||
cert = client.create_keys_and_certificate(setAsActive=True)
|
||||
cert_arn = cert['certificateArn']
|
||||
|
||||
client.attach_policy(policyName=policy_name, target=cert_arn)
|
||||
|
||||
res = client.list_principal_policies(principal=cert_arn)
|
||||
res.should.have.key('policies').which.should.have.length_of(1)
|
||||
for policy in res['policies']:
|
||||
policy.should.have.key('policyName').which.should_not.be.none
|
||||
policy.should.have.key('policyArn').which.should_not.be.none
|
||||
|
||||
# do nothing if policy have already attached to certificate
|
||||
client.attach_policy(policyName=policy_name, target=cert_arn)
|
||||
|
||||
res = client.list_principal_policies(principal=cert_arn)
|
||||
res.should.have.key('policies').which.should.have.length_of(1)
|
||||
for policy in res['policies']:
|
||||
policy.should.have.key('policyName').which.should_not.be.none
|
||||
policy.should.have.key('policyArn').which.should_not.be.none
|
||||
|
||||
res = client.list_policy_principals(policyName=policy_name)
|
||||
res.should.have.key('principals').which.should.have.length_of(1)
|
||||
for principal in res['principals']:
|
||||
principal.should_not.be.none
|
||||
|
||||
client.detach_policy(policyName=policy_name, target=cert_arn)
|
||||
res = client.list_principal_policies(principal=cert_arn)
|
||||
res.should.have.key('policies').which.should.have.length_of(0)
|
||||
res = client.list_policy_principals(policyName=policy_name)
|
||||
res.should.have.key('principals').which.should.have.length_of(0)
|
||||
with assert_raises(ClientError) as e:
|
||||
client.detach_policy(policyName=policy_name, target=cert_arn)
|
||||
e.exception.response['Error']['Code'].should.equal('ResourceNotFoundException')
|
||||
|
||||
|
||||
@mock_iot
|
||||
def test_principal_policy_deprecated():
|
||||
client = boto3.client('iot', region_name='ap-northeast-1')
|
||||
policy_name = 'my-policy'
|
||||
doc = '{}'
|
||||
|
|
|
|||
|
|
@ -1553,6 +1553,24 @@ def test_boto3_put_bucket_tagging():
|
|||
})
|
||||
resp['ResponseMetadata']['HTTPStatusCode'].should.equal(200)
|
||||
|
||||
# With duplicate tag keys:
|
||||
with assert_raises(ClientError) as err:
|
||||
resp = s3.put_bucket_tagging(Bucket=bucket_name,
|
||||
Tagging={
|
||||
"TagSet": [
|
||||
{
|
||||
"Key": "TagOne",
|
||||
"Value": "ValueOne"
|
||||
},
|
||||
{
|
||||
"Key": "TagOne",
|
||||
"Value": "ValueOneAgain"
|
||||
}
|
||||
]
|
||||
})
|
||||
e = err.exception
|
||||
e.response["Error"]["Code"].should.equal("InvalidTag")
|
||||
e.response["Error"]["Message"].should.equal("Cannot provide multiple Tags with the same key")
|
||||
|
||||
@mock_s3
|
||||
def test_boto3_get_bucket_tagging():
|
||||
|
|
|
|||
|
|
@ -39,12 +39,28 @@ def test_create_secret():
|
|||
conn = boto3.client('secretsmanager', region_name='us-east-1')
|
||||
|
||||
result = conn.create_secret(Name='test-secret', SecretString="foosecret")
|
||||
assert result['ARN'] == (
|
||||
'arn:aws:secretsmanager:us-east-1:1234567890:secret:test-secret-rIjad')
|
||||
assert result['ARN']
|
||||
assert result['Name'] == 'test-secret'
|
||||
secret = conn.get_secret_value(SecretId='test-secret')
|
||||
assert secret['SecretString'] == 'foosecret'
|
||||
|
||||
@mock_secretsmanager
|
||||
def test_create_secret_with_tags():
|
||||
conn = boto3.client('secretsmanager', region_name='us-east-1')
|
||||
secret_name = 'test-secret-with-tags'
|
||||
|
||||
result = conn.create_secret(
|
||||
Name=secret_name,
|
||||
SecretString="foosecret",
|
||||
Tags=[{"Key": "Foo", "Value": "Bar"}, {"Key": "Mykey", "Value": "Myvalue"}]
|
||||
)
|
||||
assert result['ARN']
|
||||
assert result['Name'] == secret_name
|
||||
secret_value = conn.get_secret_value(SecretId=secret_name)
|
||||
assert secret_value['SecretString'] == 'foosecret'
|
||||
secret_details = conn.describe_secret(SecretId=secret_name)
|
||||
assert secret_details['Tags'] == [{"Key": "Foo", "Value": "Bar"}, {"Key": "Mykey", "Value": "Myvalue"}]
|
||||
|
||||
@mock_secretsmanager
|
||||
def test_get_random_password_default_length():
|
||||
conn = boto3.client('secretsmanager', region_name='us-west-2')
|
||||
|
|
@ -159,10 +175,17 @@ def test_describe_secret():
|
|||
conn.create_secret(Name='test-secret',
|
||||
SecretString='foosecret')
|
||||
|
||||
conn.create_secret(Name='test-secret-2',
|
||||
SecretString='barsecret')
|
||||
|
||||
secret_description = conn.describe_secret(SecretId='test-secret')
|
||||
secret_description_2 = conn.describe_secret(SecretId='test-secret-2')
|
||||
|
||||
assert secret_description # Returned dict is not empty
|
||||
assert secret_description['ARN'] == (
|
||||
'arn:aws:secretsmanager:us-west-2:1234567890:secret:test-secret-rIjad')
|
||||
assert secret_description['Name'] == ('test-secret')
|
||||
assert secret_description['ARN'] != '' # Test arn not empty
|
||||
assert secret_description_2['Name'] == ('test-secret-2')
|
||||
assert secret_description_2['ARN'] != '' # Test arn not empty
|
||||
|
||||
@mock_secretsmanager
|
||||
def test_describe_secret_that_does_not_exist():
|
||||
|
|
@ -190,9 +213,7 @@ def test_rotate_secret():
|
|||
rotated_secret = conn.rotate_secret(SecretId=secret_name)
|
||||
|
||||
assert rotated_secret
|
||||
assert rotated_secret['ARN'] == (
|
||||
'arn:aws:secretsmanager:us-west-2:1234567890:secret:test-secret-rIjad'
|
||||
)
|
||||
assert rotated_secret['ARN'] != '' # Test arn not empty
|
||||
assert rotated_secret['Name'] == secret_name
|
||||
assert rotated_secret['VersionId'] != ''
|
||||
|
||||
|
|
|
|||
|
|
@ -82,11 +82,20 @@ def test_create_secret():
|
|||
headers={
|
||||
"X-Amz-Target": "secretsmanager.CreateSecret"},
|
||||
)
|
||||
res_2 = test_client.post('/',
|
||||
data={"Name": "test-secret-2",
|
||||
"SecretString": "bar-secret"},
|
||||
headers={
|
||||
"X-Amz-Target": "secretsmanager.CreateSecret"},
|
||||
)
|
||||
|
||||
json_data = json.loads(res.data.decode("utf-8"))
|
||||
assert json_data['ARN'] == (
|
||||
'arn:aws:secretsmanager:us-east-1:1234567890:secret:test-secret-rIjad')
|
||||
assert json_data['ARN'] != ''
|
||||
assert json_data['Name'] == 'test-secret'
|
||||
|
||||
json_data_2 = json.loads(res_2.data.decode("utf-8"))
|
||||
assert json_data_2['ARN'] != ''
|
||||
assert json_data_2['Name'] == 'test-secret-2'
|
||||
|
||||
@mock_secretsmanager
|
||||
def test_describe_secret():
|
||||
|
|
@ -107,12 +116,30 @@ def test_describe_secret():
|
|||
"X-Amz-Target": "secretsmanager.DescribeSecret"
|
||||
},
|
||||
)
|
||||
|
||||
create_secret_2 = test_client.post('/',
|
||||
data={"Name": "test-secret-2",
|
||||
"SecretString": "barsecret"},
|
||||
headers={
|
||||
"X-Amz-Target": "secretsmanager.CreateSecret"
|
||||
},
|
||||
)
|
||||
describe_secret_2 = test_client.post('/',
|
||||
data={"SecretId": "test-secret-2"},
|
||||
headers={
|
||||
"X-Amz-Target": "secretsmanager.DescribeSecret"
|
||||
},
|
||||
)
|
||||
|
||||
json_data = json.loads(describe_secret.data.decode("utf-8"))
|
||||
assert json_data # Returned dict is not empty
|
||||
assert json_data['ARN'] == (
|
||||
'arn:aws:secretsmanager:us-east-1:1234567890:secret:test-secret-rIjad'
|
||||
)
|
||||
assert json_data['ARN'] != ''
|
||||
assert json_data['Name'] == 'test-secret'
|
||||
|
||||
json_data_2 = json.loads(describe_secret_2.data.decode("utf-8"))
|
||||
assert json_data_2 # Returned dict is not empty
|
||||
assert json_data_2['ARN'] != ''
|
||||
assert json_data_2['Name'] == 'test-secret-2'
|
||||
|
||||
@mock_secretsmanager
|
||||
def test_describe_secret_that_does_not_exist():
|
||||
|
|
@ -179,9 +206,7 @@ def test_rotate_secret():
|
|||
|
||||
json_data = json.loads(rotate_secret.data.decode("utf-8"))
|
||||
assert json_data # Returned dict is not empty
|
||||
assert json_data['ARN'] == (
|
||||
'arn:aws:secretsmanager:us-east-1:1234567890:secret:test-secret-rIjad'
|
||||
)
|
||||
assert json_data['ARN'] != ''
|
||||
assert json_data['Name'] == 'test-secret'
|
||||
assert json_data['VersionId'] == client_request_token
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue