Merge remote-tracking branch 'spulec/moto/master' into organizations_support

This commit is contained in:
Ashley Gould 2018-09-28 08:52:22 -07:00
commit 95700d6631
71 changed files with 2645 additions and 267 deletions

View file

@ -85,3 +85,14 @@ class TesterWithSetup(unittest.TestCase):
def test_still_the_same(self):
bucket = self.conn.get_bucket('mybucket')
bucket.name.should.equal("mybucket")
@mock_s3_deprecated
class TesterWithStaticmethod(object):
@staticmethod
def static(*args):
assert not args or not isinstance(args[0], TesterWithStaticmethod)
def test_no_instance_sent_to_staticmethod(self):
self.static()

View file

@ -201,6 +201,48 @@ def test_item_add_empty_string_exception():
)
@requires_boto_gte("2.9")
@mock_dynamodb2
def test_update_item_with_empty_string_exception():
name = 'TestTable'
conn = boto3.client('dynamodb',
region_name='us-west-2',
aws_access_key_id="ak",
aws_secret_access_key="sk")
conn.create_table(TableName=name,
KeySchema=[{'AttributeName':'forum_name','KeyType':'HASH'}],
AttributeDefinitions=[{'AttributeName':'forum_name','AttributeType':'S'}],
ProvisionedThroughput={'ReadCapacityUnits':5,'WriteCapacityUnits':5})
conn.put_item(
TableName=name,
Item={
'forum_name': { 'S': 'LOLCat Forum' },
'subject': { 'S': 'Check this out!' },
'Body': { 'S': 'http://url_to_lolcat.gif'},
'SentBy': { 'S': "test" },
'ReceivedTime': { 'S': '12/9/2011 11:36:03 PM'},
}
)
with assert_raises(ClientError) as ex:
conn.update_item(
TableName=name,
Key={
'forum_name': { 'S': 'LOLCat Forum'},
},
UpdateExpression='set Body=:Body',
ExpressionAttributeValues={
':Body': {'S': ''}
})
ex.exception.response['Error']['Code'].should.equal('ValidationException')
ex.exception.response['ResponseMetadata']['HTTPStatusCode'].should.equal(400)
ex.exception.response['Error']['Message'].should.equal(
'One or more parameter values were invalid: An AttributeValue may not contain an empty string'
)
@requires_boto_gte("2.9")
@mock_dynamodb2
def test_query_invalid_table():

View file

@ -199,7 +199,7 @@ def test_igw_desribe():
@mock_ec2_deprecated
def test_igw_desribe_bad_id():
def test_igw_describe_bad_id():
""" internet gateway fail to fetch by bad id """
conn = boto.connect_vpc('the_key', 'the_secret')
with assert_raises(EC2ResponseError) as cm:

View file

@ -2,12 +2,15 @@ from __future__ import unicode_literals
# Ensure 'assert_raises' context manager support for Python 2.6
import tests.backport_assert_raises
from nose.tools import assert_raises
from moto.ec2.exceptions import EC2ClientError
from botocore.exceptions import ClientError
import boto3
import boto
from boto.exception import EC2ResponseError
import sure # noqa
from moto import mock_ec2_deprecated
from moto import mock_ec2, mock_ec2_deprecated
from tests.helpers import requires_boto_gte
@ -93,3 +96,37 @@ def test_vpc_peering_connections_delete():
cm.exception.code.should.equal('InvalidVpcPeeringConnectionId.NotFound')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
@mock_ec2
def test_vpc_peering_connections_cross_region():
# create vpc in us-west-1 and ap-northeast-1
ec2_usw1 = boto3.resource('ec2', region_name='us-west-1')
vpc_usw1 = ec2_usw1.create_vpc(CidrBlock='10.90.0.0/16')
ec2_apn1 = boto3.resource('ec2', region_name='ap-northeast-1')
vpc_apn1 = ec2_apn1.create_vpc(CidrBlock='10.20.0.0/16')
# create peering
vpc_pcx = ec2_usw1.create_vpc_peering_connection(
VpcId=vpc_usw1.id,
PeerVpcId=vpc_apn1.id,
PeerRegion='ap-northeast-1',
)
vpc_pcx.status['Code'].should.equal('initiating-request')
vpc_pcx.requester_vpc.id.should.equal(vpc_usw1.id)
vpc_pcx.accepter_vpc.id.should.equal(vpc_apn1.id)
@mock_ec2
def test_vpc_peering_connections_cross_region_fail():
# create vpc in us-west-1 and ap-northeast-1
ec2_usw1 = boto3.resource('ec2', region_name='us-west-1')
vpc_usw1 = ec2_usw1.create_vpc(CidrBlock='10.90.0.0/16')
ec2_apn1 = boto3.resource('ec2', region_name='ap-northeast-1')
vpc_apn1 = ec2_apn1.create_vpc(CidrBlock='10.20.0.0/16')
# create peering wrong region with no vpc
with assert_raises(ClientError) as cm:
ec2_usw1.create_vpc_peering_connection(
VpcId=vpc_usw1.id,
PeerVpcId=vpc_apn1.id,
PeerRegion='ap-northeast-2')
cm.exception.response['Error']['Code'].should.equal('InvalidVpcID.NotFound')

View file

@ -45,7 +45,8 @@ def _create_image_manifest():
{
"mediaType": "application/vnd.docker.image.rootfs.diff.tar.gzip",
"size": 73109,
"digest": _create_image_digest("layer3")
# randomize image digest
"digest": _create_image_digest()
}
]
}
@ -197,6 +198,47 @@ def test_put_image():
response['image']['repositoryName'].should.equal('test_repository')
response['image']['registryId'].should.equal('012345678910')
@mock_ecr
def test_put_image_with_multiple_tags():
client = boto3.client('ecr', region_name='us-east-1')
_ = client.create_repository(
repositoryName='test_repository'
)
manifest = _create_image_manifest()
response = client.put_image(
repositoryName='test_repository',
imageManifest=json.dumps(manifest),
imageTag='v1'
)
response['image']['imageId']['imageTag'].should.equal('v1')
response['image']['imageId']['imageDigest'].should.contain("sha")
response['image']['repositoryName'].should.equal('test_repository')
response['image']['registryId'].should.equal('012345678910')
response1 = client.put_image(
repositoryName='test_repository',
imageManifest=json.dumps(manifest),
imageTag='latest'
)
response1['image']['imageId']['imageTag'].should.equal('latest')
response1['image']['imageId']['imageDigest'].should.contain("sha")
response1['image']['repositoryName'].should.equal('test_repository')
response1['image']['registryId'].should.equal('012345678910')
response2 = client.describe_images(repositoryName='test_repository')
type(response2['imageDetails']).should.be(list)
len(response2['imageDetails']).should.be(1)
response2['imageDetails'][0]['imageDigest'].should.contain("sha")
response2['imageDetails'][0]['registryId'].should.equal("012345678910")
response2['imageDetails'][0]['repositoryName'].should.equal("test_repository")
len(response2['imageDetails'][0]['imageTags']).should.be(2)
response2['imageDetails'][0]['imageTags'].should.be.equal(['v1', 'latest'])
@mock_ecr
def test_list_images():
@ -281,6 +323,11 @@ def test_describe_images():
repositoryName='test_repository'
)
_ = client.put_image(
repositoryName='test_repository',
imageManifest=json.dumps(_create_image_manifest())
)
_ = client.put_image(
repositoryName='test_repository',
imageManifest=json.dumps(_create_image_manifest()),
@ -301,32 +348,37 @@ def test_describe_images():
response = client.describe_images(repositoryName='test_repository')
type(response['imageDetails']).should.be(list)
len(response['imageDetails']).should.be(3)
len(response['imageDetails']).should.be(4)
response['imageDetails'][0]['imageDigest'].should.contain("sha")
response['imageDetails'][1]['imageDigest'].should.contain("sha")
response['imageDetails'][2]['imageDigest'].should.contain("sha")
response['imageDetails'][3]['imageDigest'].should.contain("sha")
response['imageDetails'][0]['registryId'].should.equal("012345678910")
response['imageDetails'][1]['registryId'].should.equal("012345678910")
response['imageDetails'][2]['registryId'].should.equal("012345678910")
response['imageDetails'][3]['registryId'].should.equal("012345678910")
response['imageDetails'][0]['repositoryName'].should.equal("test_repository")
response['imageDetails'][1]['repositoryName'].should.equal("test_repository")
response['imageDetails'][2]['repositoryName'].should.equal("test_repository")
response['imageDetails'][3]['repositoryName'].should.equal("test_repository")
len(response['imageDetails'][0]['imageTags']).should.be(1)
response['imageDetails'][0].should_not.have.key('imageTags')
len(response['imageDetails'][1]['imageTags']).should.be(1)
len(response['imageDetails'][2]['imageTags']).should.be(1)
len(response['imageDetails'][3]['imageTags']).should.be(1)
image_tags = ['latest', 'v1', 'v2']
set([response['imageDetails'][0]['imageTags'][0],
response['imageDetails'][1]['imageTags'][0],
response['imageDetails'][2]['imageTags'][0]]).should.equal(set(image_tags))
set([response['imageDetails'][1]['imageTags'][0],
response['imageDetails'][2]['imageTags'][0],
response['imageDetails'][3]['imageTags'][0]]).should.equal(set(image_tags))
response['imageDetails'][0]['imageSizeInBytes'].should.equal(52428800)
response['imageDetails'][1]['imageSizeInBytes'].should.equal(52428800)
response['imageDetails'][2]['imageSizeInBytes'].should.equal(52428800)
response['imageDetails'][3]['imageSizeInBytes'].should.equal(52428800)
@mock_ecr
@ -355,6 +407,68 @@ def test_describe_images_by_tag():
image_detail['imageDigest'].should.equal(put_response['imageId']['imageDigest'])
@mock_ecr
def test_describe_images_tags_should_not_contain_empty_tag1():
client = boto3.client('ecr', region_name='us-east-1')
_ = client.create_repository(
repositoryName='test_repository'
)
manifest = _create_image_manifest()
client.put_image(
repositoryName='test_repository',
imageManifest=json.dumps(manifest)
)
tags = ['v1', 'v2', 'latest']
for tag in tags:
client.put_image(
repositoryName='test_repository',
imageManifest=json.dumps(manifest),
imageTag=tag
)
response = client.describe_images(repositoryName='test_repository', imageIds=[{'imageTag': tag}])
len(response['imageDetails']).should.be(1)
image_detail = response['imageDetails'][0]
len(image_detail['imageTags']).should.equal(3)
image_detail['imageTags'].should.be.equal(tags)
@mock_ecr
def test_describe_images_tags_should_not_contain_empty_tag2():
client = boto3.client('ecr', region_name='us-east-1')
_ = client.create_repository(
repositoryName='test_repository'
)
manifest = _create_image_manifest()
tags = ['v1', 'v2']
for tag in tags:
client.put_image(
repositoryName='test_repository',
imageManifest=json.dumps(manifest),
imageTag=tag
)
client.put_image(
repositoryName='test_repository',
imageManifest=json.dumps(manifest)
)
client.put_image(
repositoryName='test_repository',
imageManifest=json.dumps(manifest),
imageTag='latest'
)
response = client.describe_images(repositoryName='test_repository', imageIds=[{'imageTag': tag}])
len(response['imageDetails']).should.be(1)
image_detail = response['imageDetails'][0]
len(image_detail['imageTags']).should.equal(3)
image_detail['imageTags'].should.be.equal(['v1', 'v2', 'latest'])
@mock_ecr
def test_describe_repository_that_doesnt_exist():
client = boto3.client('ecr', region_name='us-east-1')

View file

@ -304,6 +304,52 @@ def test_create_service():
response['service']['status'].should.equal('ACTIVE')
response['service']['taskDefinition'].should.equal(
'arn:aws:ecs:us-east-1:012345678910:task-definition/test_ecs_task:1')
response['service']['schedulingStrategy'].should.equal('REPLICA')
@mock_ecs
def test_create_service_scheduling_strategy():
client = boto3.client('ecs', region_name='us-east-1')
_ = client.create_cluster(
clusterName='test_ecs_cluster'
)
_ = client.register_task_definition(
family='test_ecs_task',
containerDefinitions=[
{
'name': 'hello_world',
'image': 'docker/hello-world:latest',
'cpu': 1024,
'memory': 400,
'essential': True,
'environment': [{
'name': 'AWS_ACCESS_KEY_ID',
'value': 'SOME_ACCESS_KEY'
}],
'logConfiguration': {'logDriver': 'json-file'}
}
]
)
response = client.create_service(
cluster='test_ecs_cluster',
serviceName='test_ecs_service',
taskDefinition='test_ecs_task',
desiredCount=2,
schedulingStrategy='DAEMON',
)
response['service']['clusterArn'].should.equal(
'arn:aws:ecs:us-east-1:012345678910:cluster/test_ecs_cluster')
response['service']['desiredCount'].should.equal(2)
len(response['service']['events']).should.equal(0)
len(response['service']['loadBalancers']).should.equal(0)
response['service']['pendingCount'].should.equal(0)
response['service']['runningCount'].should.equal(0)
response['service']['serviceArn'].should.equal(
'arn:aws:ecs:us-east-1:012345678910:service/test_ecs_service')
response['service']['serviceName'].should.equal('test_ecs_service')
response['service']['status'].should.equal('ACTIVE')
response['service']['taskDefinition'].should.equal(
'arn:aws:ecs:us-east-1:012345678910:task-definition/test_ecs_task:1')
response['service']['schedulingStrategy'].should.equal('DAEMON')
@mock_ecs
@ -411,6 +457,72 @@ def test_describe_services():
response['services'][0]['deployments'][0]['status'].should.equal('PRIMARY')
@mock_ecs
def test_describe_services_scheduling_strategy():
client = boto3.client('ecs', region_name='us-east-1')
_ = client.create_cluster(
clusterName='test_ecs_cluster'
)
_ = client.register_task_definition(
family='test_ecs_task',
containerDefinitions=[
{
'name': 'hello_world',
'image': 'docker/hello-world:latest',
'cpu': 1024,
'memory': 400,
'essential': True,
'environment': [{
'name': 'AWS_ACCESS_KEY_ID',
'value': 'SOME_ACCESS_KEY'
}],
'logConfiguration': {'logDriver': 'json-file'}
}
]
)
_ = client.create_service(
cluster='test_ecs_cluster',
serviceName='test_ecs_service1',
taskDefinition='test_ecs_task',
desiredCount=2
)
_ = client.create_service(
cluster='test_ecs_cluster',
serviceName='test_ecs_service2',
taskDefinition='test_ecs_task',
desiredCount=2,
schedulingStrategy='DAEMON'
)
_ = client.create_service(
cluster='test_ecs_cluster',
serviceName='test_ecs_service3',
taskDefinition='test_ecs_task',
desiredCount=2
)
response = client.describe_services(
cluster='test_ecs_cluster',
services=['test_ecs_service1',
'arn:aws:ecs:us-east-1:012345678910:service/test_ecs_service2',
'test_ecs_service3']
)
len(response['services']).should.equal(3)
response['services'][0]['serviceArn'].should.equal(
'arn:aws:ecs:us-east-1:012345678910:service/test_ecs_service1')
response['services'][0]['serviceName'].should.equal('test_ecs_service1')
response['services'][1]['serviceArn'].should.equal(
'arn:aws:ecs:us-east-1:012345678910:service/test_ecs_service2')
response['services'][1]['serviceName'].should.equal('test_ecs_service2')
response['services'][0]['deployments'][0]['desiredCount'].should.equal(2)
response['services'][0]['deployments'][0]['pendingCount'].should.equal(2)
response['services'][0]['deployments'][0]['runningCount'].should.equal(0)
response['services'][0]['deployments'][0]['status'].should.equal('PRIMARY')
response['services'][0]['schedulingStrategy'].should.equal('REPLICA')
response['services'][1]['schedulingStrategy'].should.equal('DAEMON')
response['services'][2]['schedulingStrategy'].should.equal('REPLICA')
@mock_ecs
def test_update_service():
client = boto3.client('ecs', region_name='us-east-1')
@ -449,6 +561,7 @@ def test_update_service():
desiredCount=0
)
response['service']['desiredCount'].should.equal(0)
response['service']['schedulingStrategy'].should.equal('REPLICA')
@mock_ecs
@ -515,8 +628,10 @@ def test_delete_service():
'arn:aws:ecs:us-east-1:012345678910:service/test_ecs_service')
response['service']['serviceName'].should.equal('test_ecs_service')
response['service']['status'].should.equal('ACTIVE')
response['service']['schedulingStrategy'].should.equal('REPLICA')
response['service']['taskDefinition'].should.equal(
'arn:aws:ecs:us-east-1:012345678910:task-definition/test_ecs_task:1')
@mock_ec2

View file

@ -723,6 +723,40 @@ def test_describe_instance_health():
instances_health[0].state.should.equal('InService')
@mock_ec2
@mock_elb
def test_describe_instance_health_boto3():
elb = boto3.client('elb', region_name="us-east-1")
ec2 = boto3.client('ec2', region_name="us-east-1")
instances = ec2.run_instances(MinCount=2, MaxCount=2)['Instances']
lb_name = "my_load_balancer"
elb.create_load_balancer(
Listeners=[{
'InstancePort': 80,
'LoadBalancerPort': 8080,
'Protocol': 'HTTP'
}],
LoadBalancerName=lb_name,
)
elb.register_instances_with_load_balancer(
LoadBalancerName=lb_name,
Instances=[{'InstanceId': instances[0]['InstanceId']}]
)
instances_health = elb.describe_instance_health(
LoadBalancerName=lb_name,
Instances=[{'InstanceId': instance['InstanceId']} for instance in instances]
)
instances_health['InstanceStates'].should.have.length_of(2)
instances_health['InstanceStates'][0]['InstanceId'].\
should.equal(instances[0]['InstanceId'])
instances_health['InstanceStates'][0]['State'].\
should.equal('InService')
instances_health['InstanceStates'][1]['InstanceId'].\
should.equal(instances[1]['InstanceId'])
instances_health['InstanceStates'][1]['State'].\
should.equal('Unknown')
@mock_elb
def test_add_remove_tags():
client = boto3.client('elb', region_name='us-east-1')

View file

@ -0,0 +1 @@
from __future__ import unicode_literals

View file

@ -0,0 +1 @@
from __future__ import unicode_literals

View file

@ -0,0 +1,31 @@
from __future__ import unicode_literals
TABLE_INPUT = {
'Owner': 'a_fake_owner',
'Parameters': {
'EXTERNAL': 'TRUE',
},
'Retention': 0,
'StorageDescriptor': {
'BucketColumns': [],
'Compressed': False,
'InputFormat': 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat',
'NumberOfBuckets': -1,
'OutputFormat': 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat',
'Parameters': {},
'SerdeInfo': {
'Parameters': {
'serialization.format': '1'
},
'SerializationLibrary': 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
},
'SkewedInfo': {
'SkewedColumnNames': [],
'SkewedColumnValueLocationMaps': {},
'SkewedColumnValues': []
},
'SortColumns': [],
'StoredAsSubDirectories': False
},
'TableType': 'EXTERNAL_TABLE',
}

View file

@ -0,0 +1,46 @@
from __future__ import unicode_literals
import copy
from .fixtures.datacatalog import TABLE_INPUT
def create_database(client, database_name):
return client.create_database(
DatabaseInput={
'Name': database_name
}
)
def get_database(client, database_name):
return client.get_database(Name=database_name)
def create_table_input(table_name, s3_location, columns=[], partition_keys=[]):
table_input = copy.deepcopy(TABLE_INPUT)
table_input['Name'] = table_name
table_input['PartitionKeys'] = partition_keys
table_input['StorageDescriptor']['Columns'] = columns
table_input['StorageDescriptor']['Location'] = s3_location
return table_input
def create_table(client, database_name, table_name, table_input):
return client.create_table(
DatabaseName=database_name,
TableInput=table_input
)
def get_table(client, database_name, table_name):
return client.get_table(
DatabaseName=database_name,
Name=table_name
)
def get_tables(client, database_name):
return client.get_tables(
DatabaseName=database_name
)

View file

@ -0,0 +1,108 @@
from __future__ import unicode_literals
import sure # noqa
from nose.tools import assert_raises
import boto3
from botocore.client import ClientError
from moto import mock_glue
from . import helpers
@mock_glue
def test_create_database():
client = boto3.client('glue', region_name='us-east-1')
database_name = 'myspecialdatabase'
helpers.create_database(client, database_name)
response = helpers.get_database(client, database_name)
database = response['Database']
database.should.equal({'Name': database_name})
@mock_glue
def test_create_database_already_exists():
client = boto3.client('glue', region_name='us-east-1')
database_name = 'cantcreatethisdatabasetwice'
helpers.create_database(client, database_name)
with assert_raises(ClientError) as exc:
helpers.create_database(client, database_name)
exc.exception.response['Error']['Code'].should.equal('DatabaseAlreadyExistsException')
@mock_glue
def test_create_table():
client = boto3.client('glue', region_name='us-east-1')
database_name = 'myspecialdatabase'
helpers.create_database(client, database_name)
table_name = 'myspecialtable'
s3_location = 's3://my-bucket/{database_name}/{table_name}'.format(
database_name=database_name,
table_name=table_name
)
table_input = helpers.create_table_input(table_name, s3_location)
helpers.create_table(client, database_name, table_name, table_input)
response = helpers.get_table(client, database_name, table_name)
table = response['Table']
table['Name'].should.equal(table_input['Name'])
table['StorageDescriptor'].should.equal(table_input['StorageDescriptor'])
table['PartitionKeys'].should.equal(table_input['PartitionKeys'])
@mock_glue
def test_create_table_already_exists():
client = boto3.client('glue', region_name='us-east-1')
database_name = 'myspecialdatabase'
helpers.create_database(client, database_name)
table_name = 'cantcreatethistabletwice'
s3_location = 's3://my-bucket/{database_name}/{table_name}'.format(
database_name=database_name,
table_name=table_name
)
table_input = helpers.create_table_input(table_name, s3_location)
helpers.create_table(client, database_name, table_name, table_input)
with assert_raises(ClientError) as exc:
helpers.create_table(client, database_name, table_name, table_input)
exc.exception.response['Error']['Code'].should.equal('TableAlreadyExistsException')
@mock_glue
def test_get_tables():
client = boto3.client('glue', region_name='us-east-1')
database_name = 'myspecialdatabase'
helpers.create_database(client, database_name)
table_names = ['myfirsttable', 'mysecondtable', 'mythirdtable']
table_inputs = {}
for table_name in table_names:
s3_location = 's3://my-bucket/{database_name}/{table_name}'.format(
database_name=database_name,
table_name=table_name
)
table_input = helpers.create_table_input(table_name, s3_location)
table_inputs[table_name] = table_input
helpers.create_table(client, database_name, table_name, table_input)
response = helpers.get_tables(client, database_name)
tables = response['TableList']
assert len(tables) == 3
for table in tables:
table_name = table['Name']
table_name.should.equal(table_inputs[table_name]['Name'])
table['StorageDescriptor'].should.equal(table_inputs[table_name]['StorageDescriptor'])
table['PartitionKeys'].should.equal(table_inputs[table_name]['PartitionKeys'])

View file

@ -286,6 +286,16 @@ def test_create_policy_versions():
PolicyDocument='{"some":"policy"}')
version.get('PolicyVersion').get('Document').should.equal({'some': 'policy'})
@mock_iam
def test_get_policy():
conn = boto3.client('iam', region_name='us-east-1')
response = conn.create_policy(
PolicyName="TestGetPolicy",
PolicyDocument='{"some":"policy"}')
policy = conn.get_policy(
PolicyArn="arn:aws:iam::123456789012:policy/TestGetPolicy")
response['Policy']['Arn'].should.equal("arn:aws:iam::123456789012:policy/TestGetPolicy")
@mock_iam
def test_get_policy_version():
@ -314,17 +324,22 @@ def test_list_policy_versions():
PolicyArn="arn:aws:iam::123456789012:policy/TestListPolicyVersions")
conn.create_policy(
PolicyName="TestListPolicyVersions",
PolicyDocument='{"some":"policy"}')
conn.create_policy_version(
PolicyArn="arn:aws:iam::123456789012:policy/TestListPolicyVersions",
PolicyDocument='{"first":"policy"}')
versions = conn.list_policy_versions(
PolicyArn="arn:aws:iam::123456789012:policy/TestListPolicyVersions")
versions.get('Versions')[0].get('VersionId').should.equal('v1')
conn.create_policy_version(
PolicyArn="arn:aws:iam::123456789012:policy/TestListPolicyVersions",
PolicyDocument='{"second":"policy"}')
conn.create_policy_version(
PolicyArn="arn:aws:iam::123456789012:policy/TestListPolicyVersions",
PolicyDocument='{"third":"policy"}')
versions = conn.list_policy_versions(
PolicyArn="arn:aws:iam::123456789012:policy/TestListPolicyVersions")
versions.get('Versions')[0].get('Document').should.equal({'first': 'policy'})
print(versions.get('Versions'))
versions.get('Versions')[1].get('Document').should.equal({'second': 'policy'})
versions.get('Versions')[2].get('Document').should.equal({'third': 'policy'})
@mock_iam
@ -332,20 +347,20 @@ def test_delete_policy_version():
conn = boto3.client('iam', region_name='us-east-1')
conn.create_policy(
PolicyName="TestDeletePolicyVersion",
PolicyDocument='{"some":"policy"}')
PolicyDocument='{"first":"policy"}')
conn.create_policy_version(
PolicyArn="arn:aws:iam::123456789012:policy/TestDeletePolicyVersion",
PolicyDocument='{"first":"policy"}')
PolicyDocument='{"second":"policy"}')
with assert_raises(ClientError):
conn.delete_policy_version(
PolicyArn="arn:aws:iam::123456789012:policy/TestDeletePolicyVersion",
VersionId='v2-nope-this-does-not-exist')
conn.delete_policy_version(
PolicyArn="arn:aws:iam::123456789012:policy/TestDeletePolicyVersion",
VersionId='v1')
VersionId='v2')
versions = conn.list_policy_versions(
PolicyArn="arn:aws:iam::123456789012:policy/TestDeletePolicyVersion")
len(versions.get('Versions')).should.equal(0)
len(versions.get('Versions')).should.equal(1)
@mock_iam_deprecated()
@ -678,3 +693,68 @@ def test_update_access_key():
Status='Inactive')
resp = client.list_access_keys(UserName=username)
resp['AccessKeyMetadata'][0]['Status'].should.equal('Inactive')
@mock_iam
def test_get_account_authorization_details():
import json
conn = boto3.client('iam', region_name='us-east-1')
conn.create_role(RoleName="my-role", AssumeRolePolicyDocument="some policy", Path="/my-path/")
conn.create_user(Path='/', UserName='testCloudAuxUser')
conn.create_group(Path='/', GroupName='testCloudAuxGroup')
conn.create_policy(
PolicyName='testCloudAuxPolicy',
Path='/',
PolicyDocument=json.dumps({
"Version": "2012-10-17",
"Statement": [
{
"Action": "s3:ListBucket",
"Resource": "*",
"Effect": "Allow",
}
]
}),
Description='Test CloudAux Policy'
)
result = conn.get_account_authorization_details(Filter=['Role'])
len(result['RoleDetailList']) == 1
len(result['UserDetailList']) == 0
len(result['GroupDetailList']) == 0
len(result['Policies']) == 0
result = conn.get_account_authorization_details(Filter=['User'])
len(result['RoleDetailList']) == 0
len(result['UserDetailList']) == 1
len(result['GroupDetailList']) == 0
len(result['Policies']) == 0
result = conn.get_account_authorization_details(Filter=['Group'])
len(result['RoleDetailList']) == 0
len(result['UserDetailList']) == 0
len(result['GroupDetailList']) == 1
len(result['Policies']) == 0
result = conn.get_account_authorization_details(Filter=['LocalManagedPolicy'])
len(result['RoleDetailList']) == 0
len(result['UserDetailList']) == 0
len(result['GroupDetailList']) == 0
len(result['Policies']) == 1
# Check for greater than 1 since this should always be greater than one but might change.
# See iam/aws_managed_policies.py
result = conn.get_account_authorization_details(Filter=['AWSManagedPolicy'])
len(result['RoleDetailList']) == 0
len(result['UserDetailList']) == 0
len(result['GroupDetailList']) == 0
len(result['Policies']) > 1
result = conn.get_account_authorization_details()
len(result['RoleDetailList']) == 1
len(result['UserDetailList']) == 1
len(result['GroupDetailList']) == 1
len(result['Policies']) > 1

View file

@ -1,8 +1,9 @@
from __future__ import unicode_literals
import boto3
import sure # noqa
import json
import sure # noqa
import boto3
from moto import mock_iot
@ -63,6 +64,166 @@ def test_things():
res.should.have.key('thingTypes').which.should.have.length_of(0)
@mock_iot
def test_list_thing_types():
client = boto3.client('iot', region_name='ap-northeast-1')
for i in range(0, 100):
client.create_thing_type(thingTypeName=str(i + 1))
thing_types = client.list_thing_types()
thing_types.should.have.key('nextToken')
thing_types.should.have.key('thingTypes').which.should.have.length_of(50)
thing_types['thingTypes'][0]['thingTypeName'].should.equal('1')
thing_types['thingTypes'][-1]['thingTypeName'].should.equal('50')
thing_types = client.list_thing_types(nextToken=thing_types['nextToken'])
thing_types.should.have.key('thingTypes').which.should.have.length_of(50)
thing_types.should_not.have.key('nextToken')
thing_types['thingTypes'][0]['thingTypeName'].should.equal('51')
thing_types['thingTypes'][-1]['thingTypeName'].should.equal('100')
@mock_iot
def test_list_thing_types_with_typename_filter():
client = boto3.client('iot', region_name='ap-northeast-1')
client.create_thing_type(thingTypeName='thing')
client.create_thing_type(thingTypeName='thingType')
client.create_thing_type(thingTypeName='thingTypeName')
client.create_thing_type(thingTypeName='thingTypeNameGroup')
client.create_thing_type(thingTypeName='shouldNotFind')
client.create_thing_type(thingTypeName='find me it shall not')
thing_types = client.list_thing_types(thingTypeName='thing')
thing_types.should_not.have.key('nextToken')
thing_types.should.have.key('thingTypes').which.should.have.length_of(4)
thing_types['thingTypes'][0]['thingTypeName'].should.equal('thing')
thing_types['thingTypes'][-1]['thingTypeName'].should.equal('thingTypeNameGroup')
thing_types = client.list_thing_types(thingTypeName='thingTypeName')
thing_types.should_not.have.key('nextToken')
thing_types.should.have.key('thingTypes').which.should.have.length_of(2)
thing_types['thingTypes'][0]['thingTypeName'].should.equal('thingTypeName')
thing_types['thingTypes'][-1]['thingTypeName'].should.equal('thingTypeNameGroup')
@mock_iot
def test_list_things_with_next_token():
client = boto3.client('iot', region_name='ap-northeast-1')
for i in range(0, 200):
client.create_thing(thingName=str(i + 1))
things = client.list_things()
things.should.have.key('nextToken')
things.should.have.key('things').which.should.have.length_of(50)
things['things'][0]['thingName'].should.equal('1')
things['things'][0]['thingArn'].should.equal('arn:aws:iot:ap-northeast-1:1:thing/1')
things['things'][-1]['thingName'].should.equal('50')
things['things'][-1]['thingArn'].should.equal('arn:aws:iot:ap-northeast-1:1:thing/50')
things = client.list_things(nextToken=things['nextToken'])
things.should.have.key('nextToken')
things.should.have.key('things').which.should.have.length_of(50)
things['things'][0]['thingName'].should.equal('51')
things['things'][0]['thingArn'].should.equal('arn:aws:iot:ap-northeast-1:1:thing/51')
things['things'][-1]['thingName'].should.equal('100')
things['things'][-1]['thingArn'].should.equal('arn:aws:iot:ap-northeast-1:1:thing/100')
things = client.list_things(nextToken=things['nextToken'])
things.should.have.key('nextToken')
things.should.have.key('things').which.should.have.length_of(50)
things['things'][0]['thingName'].should.equal('101')
things['things'][0]['thingArn'].should.equal('arn:aws:iot:ap-northeast-1:1:thing/101')
things['things'][-1]['thingName'].should.equal('150')
things['things'][-1]['thingArn'].should.equal('arn:aws:iot:ap-northeast-1:1:thing/150')
things = client.list_things(nextToken=things['nextToken'])
things.should_not.have.key('nextToken')
things.should.have.key('things').which.should.have.length_of(50)
things['things'][0]['thingName'].should.equal('151')
things['things'][0]['thingArn'].should.equal('arn:aws:iot:ap-northeast-1:1:thing/151')
things['things'][-1]['thingName'].should.equal('200')
things['things'][-1]['thingArn'].should.equal('arn:aws:iot:ap-northeast-1:1:thing/200')
@mock_iot
def test_list_things_with_attribute_and_thing_type_filter_and_next_token():
client = boto3.client('iot', region_name='ap-northeast-1')
client.create_thing_type(thingTypeName='my-thing-type')
for i in range(0, 200):
if not (i + 1) % 3:
attribute_payload = {
'attributes': {
'foo': 'bar'
}
}
elif not (i + 1) % 5:
attribute_payload = {
'attributes': {
'bar': 'foo'
}
}
else:
attribute_payload = {}
if not (i + 1) % 2:
thing_type_name = 'my-thing-type'
client.create_thing(thingName=str(i + 1), thingTypeName=thing_type_name, attributePayload=attribute_payload)
else:
client.create_thing(thingName=str(i + 1), attributePayload=attribute_payload)
# Test filter for thingTypeName
things = client.list_things(thingTypeName=thing_type_name)
things.should.have.key('nextToken')
things.should.have.key('things').which.should.have.length_of(50)
things['things'][0]['thingName'].should.equal('2')
things['things'][0]['thingArn'].should.equal('arn:aws:iot:ap-northeast-1:1:thing/2')
things['things'][-1]['thingName'].should.equal('100')
things['things'][-1]['thingArn'].should.equal('arn:aws:iot:ap-northeast-1:1:thing/100')
all(item['thingTypeName'] == thing_type_name for item in things['things'])
things = client.list_things(nextToken=things['nextToken'], thingTypeName=thing_type_name)
things.should_not.have.key('nextToken')
things.should.have.key('things').which.should.have.length_of(50)
things['things'][0]['thingName'].should.equal('102')
things['things'][0]['thingArn'].should.equal('arn:aws:iot:ap-northeast-1:1:thing/102')
things['things'][-1]['thingName'].should.equal('200')
things['things'][-1]['thingArn'].should.equal('arn:aws:iot:ap-northeast-1:1:thing/200')
all(item['thingTypeName'] == thing_type_name for item in things['things'])
# Test filter for attributes
things = client.list_things(attributeName='foo', attributeValue='bar')
things.should.have.key('nextToken')
things.should.have.key('things').which.should.have.length_of(50)
things['things'][0]['thingName'].should.equal('3')
things['things'][0]['thingArn'].should.equal('arn:aws:iot:ap-northeast-1:1:thing/3')
things['things'][-1]['thingName'].should.equal('150')
things['things'][-1]['thingArn'].should.equal('arn:aws:iot:ap-northeast-1:1:thing/150')
all(item['attributes'] == {'foo': 'bar'} for item in things['things'])
things = client.list_things(nextToken=things['nextToken'], attributeName='foo', attributeValue='bar')
things.should_not.have.key('nextToken')
things.should.have.key('things').which.should.have.length_of(16)
things['things'][0]['thingName'].should.equal('153')
things['things'][0]['thingArn'].should.equal('arn:aws:iot:ap-northeast-1:1:thing/153')
things['things'][-1]['thingName'].should.equal('198')
things['things'][-1]['thingArn'].should.equal('arn:aws:iot:ap-northeast-1:1:thing/198')
all(item['attributes'] == {'foo': 'bar'} for item in things['things'])
# Test filter for attributes and thingTypeName
things = client.list_things(thingTypeName=thing_type_name, attributeName='foo', attributeValue='bar')
things.should_not.have.key('nextToken')
things.should.have.key('things').which.should.have.length_of(33)
things['things'][0]['thingName'].should.equal('6')
things['things'][0]['thingArn'].should.equal('arn:aws:iot:ap-northeast-1:1:thing/6')
things['things'][-1]['thingName'].should.equal('198')
things['things'][-1]['thingArn'].should.equal('arn:aws:iot:ap-northeast-1:1:thing/198')
all(item['attributes'] == {'foo': 'bar'} and item['thingTypeName'] == thing_type_name for item in things['things'])
@mock_iot
def test_certs():
client = boto3.client('iot', region_name='ap-northeast-1')
@ -204,7 +365,6 @@ def test_principal_thing():
@mock_iot
def test_thing_groups():
client = boto3.client('iot', region_name='ap-northeast-1')
name = 'my-thing'
group_name = 'my-group-name'
# thing group
@ -424,6 +584,7 @@ def test_create_job():
job.should.have.key('jobArn')
job.should.have.key('description')
@mock_iot
def test_describe_job():
client = boto3.client('iot', region_name='eu-west-1')

View file

@ -1,5 +1,6 @@
import boto3
import sure # noqa
import six
from botocore.exceptions import ClientError
from moto import mock_logs, settings
@ -47,7 +48,7 @@ def test_exceptions():
logEvents=[
{
'timestamp': 0,
'message': 'line'
'message': 'line'
},
],
)
@ -79,7 +80,7 @@ def test_put_logs():
{'timestamp': 0, 'message': 'hello'},
{'timestamp': 0, 'message': 'world'}
]
conn.put_log_events(
putRes = conn.put_log_events(
logGroupName=log_group_name,
logStreamName=log_stream_name,
logEvents=messages
@ -89,6 +90,9 @@ def test_put_logs():
logStreamName=log_stream_name
)
events = res['events']
nextSequenceToken = putRes['nextSequenceToken']
assert isinstance(nextSequenceToken, six.string_types) == True
assert len(nextSequenceToken) == 56
events.should.have.length_of(2)

View file

@ -33,6 +33,7 @@ def test_create_database():
db_instance['DBInstanceIdentifier'].should.equal("db-master-1")
db_instance['IAMDatabaseAuthenticationEnabled'].should.equal(False)
db_instance['DbiResourceId'].should.contain("db-")
db_instance['CopyTagsToSnapshot'].should.equal(False)
@mock_rds2
@ -339,6 +340,49 @@ def test_create_db_snapshots():
snapshot.get('Engine').should.equal('postgres')
snapshot.get('DBInstanceIdentifier').should.equal('db-primary-1')
snapshot.get('DBSnapshotIdentifier').should.equal('g-1')
result = conn.list_tags_for_resource(ResourceName=snapshot['DBSnapshotArn'])
result['TagList'].should.equal([])
@mock_rds2
def test_create_db_snapshots_copy_tags():
conn = boto3.client('rds', region_name='us-west-2')
conn.create_db_snapshot.when.called_with(
DBInstanceIdentifier='db-primary-1',
DBSnapshotIdentifier='snapshot-1').should.throw(ClientError)
conn.create_db_instance(DBInstanceIdentifier='db-primary-1',
AllocatedStorage=10,
Engine='postgres',
DBName='staging-postgres',
DBInstanceClass='db.m1.small',
MasterUsername='root',
MasterUserPassword='hunter2',
Port=1234,
DBSecurityGroups=["my_sg"],
CopyTagsToSnapshot=True,
Tags=[
{
'Key': 'foo',
'Value': 'bar',
},
{
'Key': 'foo1',
'Value': 'bar1',
},
])
snapshot = conn.create_db_snapshot(DBInstanceIdentifier='db-primary-1',
DBSnapshotIdentifier='g-1').get('DBSnapshot')
snapshot.get('Engine').should.equal('postgres')
snapshot.get('DBInstanceIdentifier').should.equal('db-primary-1')
snapshot.get('DBSnapshotIdentifier').should.equal('g-1')
result = conn.list_tags_for_resource(ResourceName=snapshot['DBSnapshotArn'])
result['TagList'].should.equal([{'Value': 'bar',
'Key': 'foo'},
{'Value': 'bar1',
'Key': 'foo1'}])
@mock_rds2
@ -656,6 +700,117 @@ def test_remove_tags_db():
len(result['TagList']).should.equal(1)
@mock_rds2
def test_list_tags_snapshot():
conn = boto3.client('rds', region_name='us-west-2')
result = conn.list_tags_for_resource(
ResourceName='arn:aws:rds:us-west-2:1234567890:snapshot:foo')
result['TagList'].should.equal([])
conn.create_db_instance(DBInstanceIdentifier='db-primary-1',
AllocatedStorage=10,
Engine='postgres',
DBName='staging-postgres',
DBInstanceClass='db.m1.small',
MasterUsername='root',
MasterUserPassword='hunter2',
Port=1234,
DBSecurityGroups=["my_sg"])
snapshot = conn.create_db_snapshot(DBInstanceIdentifier='db-primary-1',
DBSnapshotIdentifier='snapshot-with-tags',
Tags=[
{
'Key': 'foo',
'Value': 'bar',
},
{
'Key': 'foo1',
'Value': 'bar1',
},
])
result = conn.list_tags_for_resource(ResourceName=snapshot['DBSnapshot']['DBSnapshotArn'])
result['TagList'].should.equal([{'Value': 'bar',
'Key': 'foo'},
{'Value': 'bar1',
'Key': 'foo1'}])
@mock_rds2
def test_add_tags_snapshot():
conn = boto3.client('rds', region_name='us-west-2')
conn.create_db_instance(DBInstanceIdentifier='db-primary-1',
AllocatedStorage=10,
Engine='postgres',
DBName='staging-postgres',
DBInstanceClass='db.m1.small',
MasterUsername='root',
MasterUserPassword='hunter2',
Port=1234,
DBSecurityGroups=["my_sg"])
snapshot = conn.create_db_snapshot(DBInstanceIdentifier='db-primary-1',
DBSnapshotIdentifier='snapshot-without-tags',
Tags=[
{
'Key': 'foo',
'Value': 'bar',
},
{
'Key': 'foo1',
'Value': 'bar1',
},
])
result = conn.list_tags_for_resource(
ResourceName='arn:aws:rds:us-west-2:1234567890:snapshot:snapshot-without-tags')
list(result['TagList']).should.have.length_of(2)
conn.add_tags_to_resource(ResourceName='arn:aws:rds:us-west-2:1234567890:snapshot:snapshot-without-tags',
Tags=[
{
'Key': 'foo',
'Value': 'fish',
},
{
'Key': 'foo2',
'Value': 'bar2',
},
])
result = conn.list_tags_for_resource(
ResourceName='arn:aws:rds:us-west-2:1234567890:snapshot:snapshot-without-tags')
list(result['TagList']).should.have.length_of(3)
@mock_rds2
def test_remove_tags_snapshot():
conn = boto3.client('rds', region_name='us-west-2')
conn.create_db_instance(DBInstanceIdentifier='db-primary-1',
AllocatedStorage=10,
Engine='postgres',
DBName='staging-postgres',
DBInstanceClass='db.m1.small',
MasterUsername='root',
MasterUserPassword='hunter2',
Port=1234,
DBSecurityGroups=["my_sg"])
snapshot = conn.create_db_snapshot(DBInstanceIdentifier='db-primary-1',
DBSnapshotIdentifier='snapshot-with-tags',
Tags=[
{
'Key': 'foo',
'Value': 'bar',
},
{
'Key': 'foo1',
'Value': 'bar1',
},
])
result = conn.list_tags_for_resource(
ResourceName='arn:aws:rds:us-west-2:1234567890:snapshot:snapshot-with-tags')
list(result['TagList']).should.have.length_of(2)
conn.remove_tags_from_resource(
ResourceName='arn:aws:rds:us-west-2:1234567890:snapshot:snapshot-with-tags', TagKeys=['foo'])
result = conn.list_tags_for_resource(
ResourceName='arn:aws:rds:us-west-2:1234567890:snapshot:snapshot-with-tags')
len(result['TagList']).should.equal(1)
@mock_rds2
def test_add_tags_option_group():
conn = boto3.client('rds', region_name='us-west-2')

View file

@ -1,5 +1,7 @@
from __future__ import unicode_literals
import datetime
import boto
import boto3
from boto.redshift.exceptions import (
@ -32,6 +34,8 @@ def test_create_cluster_boto3():
MasterUserPassword='password',
)
response['Cluster']['NodeType'].should.equal('ds2.xlarge')
create_time = response['Cluster']['ClusterCreateTime']
create_time.should.be.lower_than(datetime.datetime.now(create_time.tzinfo))
@mock_redshift

View file

@ -2471,6 +2471,72 @@ def test_boto3_delete_markers():
oldest['Key'].should.equal('key-with-versions-and-unicode-ó')
@mock_s3
def test_boto3_multiple_delete_markers():
s3 = boto3.client('s3', region_name='us-east-1')
bucket_name = 'mybucket'
key = u'key-with-versions-and-unicode-ó'
s3.create_bucket(Bucket=bucket_name)
s3.put_bucket_versioning(
Bucket=bucket_name,
VersioningConfiguration={
'Status': 'Enabled'
}
)
items = (six.b('v1'), six.b('v2'))
for body in items:
s3.put_object(
Bucket=bucket_name,
Key=key,
Body=body
)
# Delete the object twice to add multiple delete markers
s3.delete_object(Bucket=bucket_name, Key=key)
s3.delete_object(Bucket=bucket_name, Key=key)
response = s3.list_object_versions(Bucket=bucket_name)
response['DeleteMarkers'].should.have.length_of(2)
with assert_raises(ClientError) as e:
s3.get_object(
Bucket=bucket_name,
Key=key
)
e.response['Error']['Code'].should.equal('404')
# Remove both delete markers to restore the object
s3.delete_object(
Bucket=bucket_name,
Key=key,
VersionId='2'
)
s3.delete_object(
Bucket=bucket_name,
Key=key,
VersionId='3'
)
response = s3.get_object(
Bucket=bucket_name,
Key=key
)
response['Body'].read().should.equal(items[-1])
response = s3.list_object_versions(Bucket=bucket_name)
response['Versions'].should.have.length_of(2)
# We've asserted there is only 2 records so one is newest, one is oldest
latest = list(filter(lambda item: item['IsLatest'], response['Versions']))[0]
oldest = list(filter(lambda item: not item['IsLatest'], response['Versions']))[0]
# Double check ordering of version ID's
latest['VersionId'].should.equal('1')
oldest['VersionId'].should.equal('0')
# Double check the name is still unicode
latest['Key'].should.equal('key-with-versions-and-unicode-ó')
oldest['Key'].should.equal('key-with-versions-and-unicode-ó')
@mock_s3
def test_get_stream_gzipped():
payload = b"this is some stuff here"

View file

@ -101,6 +101,6 @@ def test_s3_default_storage_class():
# tests that the default storage class is still STANDARD
list_of_objects["Contents"][0]["StorageClass"].should.equal("STANDARD")

View file

@ -21,7 +21,7 @@ def test_force_ignore_subdomain_for_bucketnames():
os.environ['S3_IGNORE_SUBDOMAIN_BUCKETNAME'] = '1'
expect(bucket_name_from_url('https://subdomain.localhost:5000/abc/resource')).should.equal(None)
del(os.environ['S3_IGNORE_SUBDOMAIN_BUCKETNAME'])
def test_versioned_key_store():

View file

@ -25,6 +25,15 @@ def test_get_secret_that_does_not_exist():
with assert_raises(ClientError):
result = conn.get_secret_value(SecretId='i-dont-exist')
@mock_secretsmanager
def test_get_secret_that_does_not_match():
conn = boto3.client('secretsmanager', region_name='us-west-2')
create_secret = conn.create_secret(Name='java-util-test-password',
SecretString="foosecret")
with assert_raises(ClientError):
result = conn.get_secret_value(SecretId='i-dont-match')
@mock_secretsmanager
def test_create_secret():
conn = boto3.client('secretsmanager', region_name='us-east-1')
@ -143,3 +152,135 @@ def test_get_random_too_long_password():
with assert_raises(Exception):
random_password = conn.get_random_password(PasswordLength=5555)
@mock_secretsmanager
def test_describe_secret():
conn = boto3.client('secretsmanager', region_name='us-west-2')
conn.create_secret(Name='test-secret',
SecretString='foosecret')
secret_description = conn.describe_secret(SecretId='test-secret')
assert secret_description # Returned dict is not empty
assert secret_description['ARN'] == (
'arn:aws:secretsmanager:us-west-2:1234567890:secret:test-secret-rIjad')
@mock_secretsmanager
def test_describe_secret_that_does_not_exist():
conn = boto3.client('secretsmanager', region_name='us-west-2')
with assert_raises(ClientError):
result = conn.get_secret_value(SecretId='i-dont-exist')
@mock_secretsmanager
def test_describe_secret_that_does_not_match():
conn = boto3.client('secretsmanager', region_name='us-west-2')
conn.create_secret(Name='test-secret',
SecretString='foosecret')
with assert_raises(ClientError):
result = conn.get_secret_value(SecretId='i-dont-match')
@mock_secretsmanager
def test_rotate_secret():
secret_name = 'test-secret'
conn = boto3.client('secretsmanager', region_name='us-west-2')
conn.create_secret(Name=secret_name,
SecretString='foosecret')
rotated_secret = conn.rotate_secret(SecretId=secret_name)
assert rotated_secret
assert rotated_secret['ARN'] == (
'arn:aws:secretsmanager:us-west-2:1234567890:secret:test-secret-rIjad'
)
assert rotated_secret['Name'] == secret_name
assert rotated_secret['VersionId'] != ''
@mock_secretsmanager
def test_rotate_secret_enable_rotation():
secret_name = 'test-secret'
conn = boto3.client('secretsmanager', region_name='us-west-2')
conn.create_secret(Name=secret_name,
SecretString='foosecret')
initial_description = conn.describe_secret(SecretId=secret_name)
assert initial_description
assert initial_description['RotationEnabled'] is False
assert initial_description['RotationRules']['AutomaticallyAfterDays'] == 0
conn.rotate_secret(SecretId=secret_name,
RotationRules={'AutomaticallyAfterDays': 42})
rotated_description = conn.describe_secret(SecretId=secret_name)
assert rotated_description
assert rotated_description['RotationEnabled'] is True
assert rotated_description['RotationRules']['AutomaticallyAfterDays'] == 42
@mock_secretsmanager
def test_rotate_secret_that_does_not_exist():
conn = boto3.client('secretsmanager', 'us-west-2')
with assert_raises(ClientError):
result = conn.rotate_secret(SecretId='i-dont-exist')
@mock_secretsmanager
def test_rotate_secret_that_does_not_match():
conn = boto3.client('secretsmanager', region_name='us-west-2')
conn.create_secret(Name='test-secret',
SecretString='foosecret')
with assert_raises(ClientError):
result = conn.rotate_secret(SecretId='i-dont-match')
@mock_secretsmanager
def test_rotate_secret_client_request_token_too_short():
# Test is intentionally empty. Boto3 catches too short ClientRequestToken
# and raises ParamValidationError before Moto can see it.
# test_server actually handles this error.
assert True
@mock_secretsmanager
def test_rotate_secret_client_request_token_too_long():
secret_name = 'test-secret'
conn = boto3.client('secretsmanager', region_name='us-west-2')
conn.create_secret(Name=secret_name,
SecretString='foosecret')
client_request_token = (
'ED9F8B6C-85B7-446A-B7E4-38F2A3BEB13C-'
'ED9F8B6C-85B7-446A-B7E4-38F2A3BEB13C'
)
with assert_raises(ClientError):
result = conn.rotate_secret(SecretId=secret_name,
ClientRequestToken=client_request_token)
@mock_secretsmanager
def test_rotate_secret_rotation_lambda_arn_too_long():
secret_name = 'test-secret'
conn = boto3.client('secretsmanager', region_name='us-west-2')
conn.create_secret(Name=secret_name,
SecretString='foosecret')
rotation_lambda_arn = '85B7-446A-B7E4' * 147 # == 2058 characters
with assert_raises(ClientError):
result = conn.rotate_secret(SecretId=secret_name,
RotationLambdaARN=rotation_lambda_arn)
@mock_secretsmanager
def test_rotate_secret_rotation_period_zero():
# Test is intentionally empty. Boto3 catches zero day rotation period
# and raises ParamValidationError before Moto can see it.
# test_server actually handles this error.
assert True
@mock_secretsmanager
def test_rotate_secret_rotation_period_too_long():
secret_name = 'test-secret'
conn = boto3.client('secretsmanager', region_name='us-west-2')
conn.create_secret(Name=secret_name,
SecretString='foosecret')
rotation_rules = {'AutomaticallyAfterDays': 1001}
with assert_raises(ClientError):
result = conn.rotate_secret(SecretId=secret_name,
RotationRules=rotation_rules)

View file

@ -49,6 +49,27 @@ def test_get_secret_that_does_not_exist():
assert json_data['message'] == "Secrets Manager can't find the specified secret"
assert json_data['__type'] == 'ResourceNotFoundException'
@mock_secretsmanager
def test_get_secret_that_does_not_match():
backend = server.create_backend_app("secretsmanager")
test_client = backend.test_client()
create_secret = test_client.post('/',
data={"Name": "test-secret",
"SecretString": "foo-secret"},
headers={
"X-Amz-Target": "secretsmanager.CreateSecret"},
)
get_secret = test_client.post('/',
data={"SecretId": "i-dont-match",
"VersionStage": "AWSCURRENT"},
headers={
"X-Amz-Target": "secretsmanager.GetSecretValue"},
)
json_data = json.loads(get_secret.data.decode("utf-8"))
assert json_data['message'] == "Secrets Manager can't find the specified secret"
assert json_data['__type'] == 'ResourceNotFoundException'
@mock_secretsmanager
def test_create_secret():
@ -66,3 +87,335 @@ def test_create_secret():
assert json_data['ARN'] == (
'arn:aws:secretsmanager:us-east-1:1234567890:secret:test-secret-rIjad')
assert json_data['Name'] == 'test-secret'
@mock_secretsmanager
def test_describe_secret():
backend = server.create_backend_app('secretsmanager')
test_client = backend.test_client()
create_secret = test_client.post('/',
data={"Name": "test-secret",
"SecretString": "foosecret"},
headers={
"X-Amz-Target": "secretsmanager.CreateSecret"
},
)
describe_secret = test_client.post('/',
data={"SecretId": "test-secret"},
headers={
"X-Amz-Target": "secretsmanager.DescribeSecret"
},
)
json_data = json.loads(describe_secret.data.decode("utf-8"))
assert json_data # Returned dict is not empty
assert json_data['ARN'] == (
'arn:aws:secretsmanager:us-east-1:1234567890:secret:test-secret-rIjad'
)
@mock_secretsmanager
def test_describe_secret_that_does_not_exist():
backend = server.create_backend_app('secretsmanager')
test_client = backend.test_client()
describe_secret = test_client.post('/',
data={"SecretId": "i-dont-exist"},
headers={
"X-Amz-Target": "secretsmanager.DescribeSecret"
},
)
json_data = json.loads(describe_secret.data.decode("utf-8"))
assert json_data['message'] == "Secrets Manager can't find the specified secret"
assert json_data['__type'] == 'ResourceNotFoundException'
@mock_secretsmanager
def test_describe_secret_that_does_not_match():
backend = server.create_backend_app('secretsmanager')
test_client = backend.test_client()
create_secret = test_client.post('/',
data={"Name": "test-secret",
"SecretString": "foosecret"},
headers={
"X-Amz-Target": "secretsmanager.CreateSecret"
},
)
describe_secret = test_client.post('/',
data={"SecretId": "i-dont-match"},
headers={
"X-Amz-Target": "secretsmanager.DescribeSecret"
},
)
json_data = json.loads(describe_secret.data.decode("utf-8"))
assert json_data['message'] == "Secrets Manager can't find the specified secret"
assert json_data['__type'] == 'ResourceNotFoundException'
@mock_secretsmanager
def test_rotate_secret():
backend = server.create_backend_app('secretsmanager')
test_client = backend.test_client()
create_secret = test_client.post('/',
data={"Name": "test-secret",
"SecretString": "foosecret"},
headers={
"X-Amz-Target": "secretsmanager.CreateSecret"
},
)
client_request_token = "EXAMPLE2-90ab-cdef-fedc-ba987SECRET2"
rotate_secret = test_client.post('/',
data={"SecretId": "test-secret",
"ClientRequestToken": client_request_token},
headers={
"X-Amz-Target": "secretsmanager.RotateSecret"
},
)
json_data = json.loads(rotate_secret.data.decode("utf-8"))
assert json_data # Returned dict is not empty
assert json_data['ARN'] == (
'arn:aws:secretsmanager:us-east-1:1234567890:secret:test-secret-rIjad'
)
assert json_data['Name'] == 'test-secret'
assert json_data['VersionId'] == client_request_token
# @mock_secretsmanager
# def test_rotate_secret_enable_rotation():
# backend = server.create_backend_app('secretsmanager')
# test_client = backend.test_client()
# create_secret = test_client.post(
# '/',
# data={
# "Name": "test-secret",
# "SecretString": "foosecret"
# },
# headers={
# "X-Amz-Target": "secretsmanager.CreateSecret"
# },
# )
# initial_description = test_client.post(
# '/',
# data={
# "SecretId": "test-secret"
# },
# headers={
# "X-Amz-Target": "secretsmanager.DescribeSecret"
# },
# )
# json_data = json.loads(initial_description.data.decode("utf-8"))
# assert json_data # Returned dict is not empty
# assert json_data['RotationEnabled'] is False
# assert json_data['RotationRules']['AutomaticallyAfterDays'] == 0
# rotate_secret = test_client.post(
# '/',
# data={
# "SecretId": "test-secret",
# "RotationRules": {"AutomaticallyAfterDays": 42}
# },
# headers={
# "X-Amz-Target": "secretsmanager.RotateSecret"
# },
# )
# rotated_description = test_client.post(
# '/',
# data={
# "SecretId": "test-secret"
# },
# headers={
# "X-Amz-Target": "secretsmanager.DescribeSecret"
# },
# )
# json_data = json.loads(rotated_description.data.decode("utf-8"))
# assert json_data # Returned dict is not empty
# assert json_data['RotationEnabled'] is True
# assert json_data['RotationRules']['AutomaticallyAfterDays'] == 42
@mock_secretsmanager
def test_rotate_secret_that_does_not_exist():
backend = server.create_backend_app('secretsmanager')
test_client = backend.test_client()
rotate_secret = test_client.post('/',
data={"SecretId": "i-dont-exist"},
headers={
"X-Amz-Target": "secretsmanager.RotateSecret"
},
)
json_data = json.loads(rotate_secret.data.decode("utf-8"))
assert json_data['message'] == "Secrets Manager can't find the specified secret"
assert json_data['__type'] == 'ResourceNotFoundException'
@mock_secretsmanager
def test_rotate_secret_that_does_not_match():
backend = server.create_backend_app('secretsmanager')
test_client = backend.test_client()
create_secret = test_client.post('/',
data={"Name": "test-secret",
"SecretString": "foosecret"},
headers={
"X-Amz-Target": "secretsmanager.CreateSecret"
},
)
rotate_secret = test_client.post('/',
data={"SecretId": "i-dont-match"},
headers={
"X-Amz-Target": "secretsmanager.RotateSecret"
},
)
json_data = json.loads(rotate_secret.data.decode("utf-8"))
assert json_data['message'] == "Secrets Manager can't find the specified secret"
assert json_data['__type'] == 'ResourceNotFoundException'
@mock_secretsmanager
def test_rotate_secret_client_request_token_too_short():
backend = server.create_backend_app('secretsmanager')
test_client = backend.test_client()
create_secret = test_client.post('/',
data={"Name": "test-secret",
"SecretString": "foosecret"},
headers={
"X-Amz-Target": "secretsmanager.CreateSecret"
},
)
client_request_token = "ED9F8B6C-85B7-B7E4-38F2A3BEB13C"
rotate_secret = test_client.post('/',
data={"SecretId": "test-secret",
"ClientRequestToken": client_request_token},
headers={
"X-Amz-Target": "secretsmanager.RotateSecret"
},
)
json_data = json.loads(rotate_secret.data.decode("utf-8"))
assert json_data['message'] == "ClientRequestToken must be 32-64 characters long."
assert json_data['__type'] == 'InvalidParameterException'
@mock_secretsmanager
def test_rotate_secret_client_request_token_too_long():
backend = server.create_backend_app('secretsmanager')
test_client = backend.test_client()
create_secret = test_client.post('/',
data={"Name": "test-secret",
"SecretString": "foosecret"},
headers={
"X-Amz-Target": "secretsmanager.CreateSecret"
},
)
client_request_token = (
'ED9F8B6C-85B7-446A-B7E4-38F2A3BEB13C-'
'ED9F8B6C-85B7-446A-B7E4-38F2A3BEB13C'
)
rotate_secret = test_client.post('/',
data={"SecretId": "test-secret",
"ClientRequestToken": client_request_token},
headers={
"X-Amz-Target": "secretsmanager.RotateSecret"
},
)
json_data = json.loads(rotate_secret.data.decode("utf-8"))
assert json_data['message'] == "ClientRequestToken must be 32-64 characters long."
assert json_data['__type'] == 'InvalidParameterException'
@mock_secretsmanager
def test_rotate_secret_rotation_lambda_arn_too_long():
backend = server.create_backend_app('secretsmanager')
test_client = backend.test_client()
create_secret = test_client.post('/',
data={"Name": "test-secret",
"SecretString": "foosecret"},
headers={
"X-Amz-Target": "secretsmanager.CreateSecret"
},
)
rotation_lambda_arn = '85B7-446A-B7E4' * 147 # == 2058 characters
rotate_secret = test_client.post('/',
data={"SecretId": "test-secret",
"RotationLambdaARN": rotation_lambda_arn},
headers={
"X-Amz-Target": "secretsmanager.RotateSecret"
},
)
json_data = json.loads(rotate_secret.data.decode("utf-8"))
assert json_data['message'] == "RotationLambdaARN must <= 2048 characters long."
assert json_data['__type'] == 'InvalidParameterException'
#
# The following tests should work, but fail on the embedded dict in
# RotationRules. The error message suggests a problem deeper in the code, which
# needs further investigation.
#
# @mock_secretsmanager
# def test_rotate_secret_rotation_period_zero():
# backend = server.create_backend_app('secretsmanager')
# test_client = backend.test_client()
# create_secret = test_client.post('/',
# data={"Name": "test-secret",
# "SecretString": "foosecret"},
# headers={
# "X-Amz-Target": "secretsmanager.CreateSecret"
# },
# )
# rotate_secret = test_client.post('/',
# data={"SecretId": "test-secret",
# "RotationRules": {"AutomaticallyAfterDays": 0}},
# headers={
# "X-Amz-Target": "secretsmanager.RotateSecret"
# },
# )
# json_data = json.loads(rotate_secret.data.decode("utf-8"))
# assert json_data['message'] == "RotationRules.AutomaticallyAfterDays must be within 1-1000."
# assert json_data['__type'] == 'InvalidParameterException'
# @mock_secretsmanager
# def test_rotate_secret_rotation_period_too_long():
# backend = server.create_backend_app('secretsmanager')
# test_client = backend.test_client()
# create_secret = test_client.post('/',
# data={"Name": "test-secret",
# "SecretString": "foosecret"},
# headers={
# "X-Amz-Target": "secretsmanager.CreateSecret"
# },
# )
# rotate_secret = test_client.post('/',
# data={"SecretId": "test-secret",
# "RotationRules": {"AutomaticallyAfterDays": 1001}},
# headers={
# "X-Amz-Target": "secretsmanager.RotateSecret"
# },
# )
# json_data = json.loads(rotate_secret.data.decode("utf-8"))
# assert json_data['message'] == "RotationRules.AutomaticallyAfterDays must be within 1-1000."
# assert json_data['__type'] == 'InvalidParameterException'

View file

@ -40,6 +40,33 @@ def test_create_fifo_queue_fail():
raise RuntimeError('Should of raised InvalidParameterValue Exception')
@mock_sqs
def test_create_queue_with_same_attributes():
sqs = boto3.client('sqs', region_name='us-east-1')
dlq_url = sqs.create_queue(QueueName='test-queue-dlq')['QueueUrl']
dlq_arn = sqs.get_queue_attributes(QueueUrl=dlq_url)['Attributes']['QueueArn']
attributes = {
'DelaySeconds': '900',
'MaximumMessageSize': '262144',
'MessageRetentionPeriod': '1209600',
'ReceiveMessageWaitTimeSeconds': '20',
'RedrivePolicy': '{"deadLetterTargetArn": "%s", "maxReceiveCount": 100}' % (dlq_arn),
'VisibilityTimeout': '43200'
}
sqs.create_queue(
QueueName='test-queue',
Attributes=attributes
)
sqs.create_queue(
QueueName='test-queue',
Attributes=attributes
)
@mock_sqs
def test_create_queue_with_different_attributes_fail():
sqs = boto3.client('sqs', region_name='us-east-1')
@ -1195,3 +1222,16 @@ def test_receive_messages_with_message_group_id_on_visibility_timeout():
messages = queue.receive_messages()
messages.should.have.length_of(1)
messages[0].message_id.should.equal(message.message_id)
@mock_sqs
def test_receive_message_for_queue_with_receive_message_wait_time_seconds_set():
sqs = boto3.resource('sqs', region_name='us-east-1')
queue = sqs.create_queue(
QueueName='test-queue',
Attributes={
'ReceiveMessageWaitTimeSeconds': '2',
}
)
queue.receive_messages()

View file

@ -5,11 +5,12 @@ import botocore.exceptions
import sure # noqa
import datetime
import uuid
import json
from botocore.exceptions import ClientError
from nose.tools import assert_raises
from moto import mock_ssm
from moto import mock_ssm, mock_cloudformation
@mock_ssm
@ -668,3 +669,118 @@ def test_list_commands():
with assert_raises(ClientError):
response = client.list_commands(
CommandId=str(uuid.uuid4()))
@mock_ssm
def test_get_command_invocation():
client = boto3.client('ssm', region_name='us-east-1')
ssm_document = 'AWS-RunShellScript'
params = {'commands': ['#!/bin/bash\necho \'hello world\'']}
response = client.send_command(
InstanceIds=['i-123456', 'i-234567', 'i-345678'],
DocumentName=ssm_document,
Parameters=params,
OutputS3Region='us-east-2',
OutputS3BucketName='the-bucket',
OutputS3KeyPrefix='pref')
cmd = response['Command']
cmd_id = cmd['CommandId']
instance_id = 'i-345678'
invocation_response = client.get_command_invocation(
CommandId=cmd_id,
InstanceId=instance_id,
PluginName='aws:runShellScript')
invocation_response['CommandId'].should.equal(cmd_id)
invocation_response['InstanceId'].should.equal(instance_id)
# test the error case for an invalid instance id
with assert_raises(ClientError):
invocation_response = client.get_command_invocation(
CommandId=cmd_id,
InstanceId='i-FAKE')
# test the error case for an invalid plugin name
with assert_raises(ClientError):
invocation_response = client.get_command_invocation(
CommandId=cmd_id,
InstanceId=instance_id,
PluginName='FAKE')
@mock_ssm
@mock_cloudformation
def test_get_command_invocations_from_stack():
stack_template = {
"AWSTemplateFormatVersion": "2010-09-09",
"Description": "Test Stack",
"Resources": {
"EC2Instance1": {
"Type": "AWS::EC2::Instance",
"Properties": {
"ImageId": "ami-test-image-id",
"KeyName": "test",
"InstanceType": "t2.micro",
"Tags": [
{
"Key": "Test Description",
"Value": "Test tag"
},
{
"Key": "Test Name",
"Value": "Name tag for tests"
}
]
}
}
},
"Outputs": {
"test": {
"Description": "Test Output",
"Value": "Test output value",
"Export": {
"Name": "Test value to export"
}
},
"PublicIP": {
"Value": "Test public ip"
}
}
}
cloudformation_client = boto3.client(
'cloudformation',
region_name='us-east-1')
stack_template_str = json.dumps(stack_template)
response = cloudformation_client.create_stack(
StackName='test_stack',
TemplateBody=stack_template_str,
Capabilities=('CAPABILITY_IAM', ))
client = boto3.client('ssm', region_name='us-east-1')
ssm_document = 'AWS-RunShellScript'
params = {'commands': ['#!/bin/bash\necho \'hello world\'']}
response = client.send_command(
Targets=[{
'Key': 'tag:aws:cloudformation:stack-name',
'Values': ('test_stack', )}],
DocumentName=ssm_document,
Parameters=params,
OutputS3Region='us-east-2',
OutputS3BucketName='the-bucket',
OutputS3KeyPrefix='pref')
cmd = response['Command']
cmd_id = cmd['CommandId']
instance_ids = cmd['InstanceIds']
invocation_response = client.get_command_invocation(
CommandId=cmd_id,
InstanceId=instance_ids[0],
PluginName='aws:runShellScript')