Merge remote-tracking branch 'upstream/master'

This commit is contained in:
Stephan Huber 2018-09-12 11:48:19 +02:00
commit 88596518f5
61 changed files with 2231 additions and 264 deletions

View file

@ -981,7 +981,7 @@ def test_api_keys():
apikey['value'].should.equal(apikey_value)
apikey_name = 'TESTKEY2'
payload = {'name': apikey_name, 'generateDistinctId': True}
payload = {'name': apikey_name }
response = client.create_api_key(**payload)
apikey_id = response['id']
apikey = client.get_api_key(apiKey=apikey_id)
@ -995,3 +995,92 @@ def test_api_keys():
response = client.get_api_keys()
len(response['items']).should.equal(1)
@mock_apigateway
def test_usage_plans():
region_name = 'us-west-2'
client = boto3.client('apigateway', region_name=region_name)
response = client.get_usage_plans()
len(response['items']).should.equal(0)
usage_plan_name = 'TEST-PLAN'
payload = {'name': usage_plan_name}
response = client.create_usage_plan(**payload)
usage_plan = client.get_usage_plan(usagePlanId=response['id'])
usage_plan['name'].should.equal(usage_plan_name)
usage_plan['apiStages'].should.equal([])
usage_plan_name = 'TEST-PLAN-2'
usage_plan_description = 'Description'
usage_plan_quota = {'limit': 10, 'period': 'DAY', 'offset': 0}
usage_plan_throttle = {'rateLimit': 2, 'burstLimit': 1}
usage_plan_api_stages = [{'apiId': 'foo', 'stage': 'bar'}]
payload = {'name': usage_plan_name, 'description': usage_plan_description, 'quota': usage_plan_quota, 'throttle': usage_plan_throttle, 'apiStages': usage_plan_api_stages}
response = client.create_usage_plan(**payload)
usage_plan_id = response['id']
usage_plan = client.get_usage_plan(usagePlanId=usage_plan_id)
usage_plan['name'].should.equal(usage_plan_name)
usage_plan['description'].should.equal(usage_plan_description)
usage_plan['apiStages'].should.equal(usage_plan_api_stages)
usage_plan['throttle'].should.equal(usage_plan_throttle)
usage_plan['quota'].should.equal(usage_plan_quota)
response = client.get_usage_plans()
len(response['items']).should.equal(2)
client.delete_usage_plan(usagePlanId=usage_plan_id)
response = client.get_usage_plans()
len(response['items']).should.equal(1)
@mock_apigateway
def test_usage_plan_keys():
region_name = 'us-west-2'
usage_plan_id = 'test_usage_plan_id'
client = boto3.client('apigateway', region_name=region_name)
usage_plan_id = "test"
# Create an API key so we can use it
key_name = 'test-api-key'
response = client.create_api_key(name=key_name)
key_id = response["id"]
key_value = response["value"]
# Get current plan keys (expect none)
response = client.get_usage_plan_keys(usagePlanId=usage_plan_id)
len(response['items']).should.equal(0)
# Create usage plan key
key_type = 'API_KEY'
payload = {'usagePlanId': usage_plan_id, 'keyId': key_id, 'keyType': key_type }
response = client.create_usage_plan_key(**payload)
usage_plan_key_id = response["id"]
# Get current plan keys (expect 1)
response = client.get_usage_plan_keys(usagePlanId=usage_plan_id)
len(response['items']).should.equal(1)
# Get a single usage plan key and check it matches the created one
usage_plan_key = client.get_usage_plan_key(usagePlanId=usage_plan_id, keyId=usage_plan_key_id)
usage_plan_key['name'].should.equal(key_name)
usage_plan_key['id'].should.equal(key_id)
usage_plan_key['type'].should.equal(key_type)
usage_plan_key['value'].should.equal(key_value)
# Delete usage plan key
client.delete_usage_plan_key(usagePlanId=usage_plan_id, keyId=key_id)
# Get current plan keys (expect none)
response = client.get_usage_plan_keys(usagePlanId=usage_plan_id)
len(response['items']).should.equal(0)
@mock_apigateway
def test_create_usage_plan_key_non_existent_api_key():
region_name = 'us-west-2'
usage_plan_id = 'test_usage_plan_id'
client = boto3.client('apigateway', region_name=region_name)
usage_plan_id = "test"
# Attempt to create a usage plan key for a API key that doesn't exists
payload = {'usagePlanId': usage_plan_id, 'keyId': 'non-existent', 'keyType': 'API_KEY' }
client.create_usage_plan_key.when.called_with(**payload).should.throw(ClientError)

View file

@ -1,5 +1,6 @@
from __future__ import unicode_literals
import sure # noqa
import json
import moto.server as server
@ -9,8 +10,82 @@ Test the different server responses
def test_list_apis():
backend = server.create_backend_app("apigateway")
backend = server.create_backend_app('apigateway')
test_client = backend.test_client()
res = test_client.get('/restapis')
res.data.should.equal(b'{"item": []}')
def test_usage_plans_apis():
backend = server.create_backend_app('apigateway')
test_client = backend.test_client()
# List usage plans (expect empty)
res = test_client.get('/usageplans')
json.loads(res.data)["item"].should.have.length_of(0)
# Create usage plan
res = test_client.post('/usageplans', data=json.dumps({'name': 'test'}))
created_plan = json.loads(res.data)
created_plan['name'].should.equal('test')
# List usage plans (expect 1 plan)
res = test_client.get('/usageplans')
json.loads(res.data)["item"].should.have.length_of(1)
# Get single usage plan
res = test_client.get('/usageplans/{0}'.format(created_plan["id"]))
fetched_plan = json.loads(res.data)
fetched_plan.should.equal(created_plan)
# Delete usage plan
res = test_client.delete('/usageplans/{0}'.format(created_plan["id"]))
res.data.should.equal(b'{}')
# List usage plans (expect empty again)
res = test_client.get('/usageplans')
json.loads(res.data)["item"].should.have.length_of(0)
def test_usage_plans_keys():
backend = server.create_backend_app('apigateway')
test_client = backend.test_client()
usage_plan_id = 'test_plan_id'
# Create API key to be used in tests
res = test_client.post('/apikeys', data=json.dumps({'name': 'test'}))
created_api_key = json.loads(res.data)
# List usage plans keys (expect empty)
res = test_client.get('/usageplans/{0}/keys'.format(usage_plan_id))
json.loads(res.data)["item"].should.have.length_of(0)
# Create usage plan key
res = test_client.post('/usageplans/{0}/keys'.format(usage_plan_id), data=json.dumps({'keyId': created_api_key["id"], 'keyType': 'API_KEY'}))
created_usage_plan_key = json.loads(res.data)
# List usage plans keys (expect 1 key)
res = test_client.get('/usageplans/{0}/keys'.format(usage_plan_id))
json.loads(res.data)["item"].should.have.length_of(1)
# Get single usage plan key
res = test_client.get('/usageplans/{0}/keys/{1}'.format(usage_plan_id, created_api_key["id"]))
fetched_plan_key = json.loads(res.data)
fetched_plan_key.should.equal(created_usage_plan_key)
# Delete usage plan key
res = test_client.delete('/usageplans/{0}/keys/{1}'.format(usage_plan_id, created_api_key["id"]))
res.data.should.equal(b'{}')
# List usage plans keys (expect to be empty again)
res = test_client.get('/usageplans/{0}/keys'.format(usage_plan_id))
json.loads(res.data)["item"].should.have.length_of(0)
def test_create_usage_plans_key_non_existent_api_key():
backend = server.create_backend_app('apigateway')
test_client = backend.test_client()
usage_plan_id = 'test_plan_id'
# Create usage plan key with non-existent api key
res = test_client.post('/usageplans/{0}/keys'.format(usage_plan_id), data=json.dumps({'keyId': 'non-existent', 'keyType': 'API_KEY'}))
res.status_code.should.equal(404)

View file

@ -0,0 +1,39 @@
from __future__ import unicode_literals
template = {
"AWSTemplateFormatVersion": "2010-09-09",
"Description": "AWS CloudFormation Sample Template to create a KMS Key. The Fn::GetAtt is used to retrieve the ARN",
"Resources" : {
"myKey" : {
"Type" : "AWS::KMS::Key",
"Properties" : {
"Description": "Sample KmsKey",
"EnableKeyRotation": False,
"Enabled": True,
"KeyPolicy" : {
"Version": "2012-10-17",
"Id": "key-default-1",
"Statement": [
{
"Sid": "Enable IAM User Permissions",
"Effect": "Allow",
"Principal": {
"AWS": { "Fn::Join" : ["" , ["arn:aws:iam::", {"Ref" : "AWS::AccountId"} ,":root" ]] }
},
"Action": "kms:*",
"Resource": "*"
}
]
}
}
}
},
"Outputs" : {
"KeyArn" : {
"Description": "Generated Key Arn",
"Value" : { "Fn::GetAtt" : [ "myKey", "Arn" ] }
}
}
}

View file

@ -254,6 +254,21 @@ def test_parse_stack_with_get_attribute_outputs():
output.should.be.a(Output)
output.value.should.equal("my-queue")
def test_parse_stack_with_get_attribute_kms():
from .fixtures.kms_key import template
template_json = json.dumps(template)
stack = FakeStack(
stack_id="test_id",
name="test_stack",
template=template_json,
parameters={},
region_name='us-west-1')
stack.output_map.should.have.length_of(1)
list(stack.output_map.keys())[0].should.equal('KeyArn')
output = list(stack.output_map.values())[0]
output.should.be.a(Output)
def test_parse_stack_with_get_availability_zones():
stack = FakeStack(

View file

@ -596,7 +596,50 @@ def test_boto3_conditions():
@mock_dynamodb2
def test_boto3_put_item_conditions_fails():
def test_boto3_put_item_conditions_pass():
table = _create_user_table()
table.put_item(Item={'username': 'johndoe', 'foo': 'bar'})
table.put_item(
Item={'username': 'johndoe', 'foo': 'baz'},
Expected={
'foo': {
'ComparisonOperator': 'EQ',
'AttributeValueList': ['bar']
}
})
final_item = table.get_item(Key={'username': 'johndoe'})
assert dict(final_item)['Item']['foo'].should.equal("baz")
@mock_dynamodb2
def test_boto3_put_item_conditions_pass_because_expect_not_exists_by_compare_to_null():
table = _create_user_table()
table.put_item(Item={'username': 'johndoe', 'foo': 'bar'})
table.put_item(
Item={'username': 'johndoe', 'foo': 'baz'},
Expected={
'whatever': {
'ComparisonOperator': 'NULL',
}
})
final_item = table.get_item(Key={'username': 'johndoe'})
assert dict(final_item)['Item']['foo'].should.equal("baz")
@mock_dynamodb2
def test_boto3_put_item_conditions_pass_because_expect_exists_by_compare_to_not_null():
table = _create_user_table()
table.put_item(Item={'username': 'johndoe', 'foo': 'bar'})
table.put_item(
Item={'username': 'johndoe', 'foo': 'baz'},
Expected={
'foo': {
'ComparisonOperator': 'NOT_NULL',
}
})
final_item = table.get_item(Key={'username': 'johndoe'})
assert dict(final_item)['Item']['foo'].should.equal("baz")
@mock_dynamodb2
def test_boto3_put_item_conditions_fail():
table = _create_user_table()
table.put_item(Item={'username': 'johndoe', 'foo': 'bar'})
table.put_item.when.called_with(
@ -609,7 +652,7 @@ def test_boto3_put_item_conditions_fails():
}).should.throw(botocore.client.ClientError)
@mock_dynamodb2
def test_boto3_update_item_conditions_fails():
def test_boto3_update_item_conditions_fail():
table = _create_user_table()
table.put_item(Item={'username': 'johndoe', 'foo': 'baz'})
table.update_item.when.called_with(
@ -622,7 +665,7 @@ def test_boto3_update_item_conditions_fails():
}).should.throw(botocore.client.ClientError)
@mock_dynamodb2
def test_boto3_update_item_conditions_fails_because_expect_not_exists():
def test_boto3_update_item_conditions_fail_because_expect_not_exists():
table = _create_user_table()
table.put_item(Item={'username': 'johndoe', 'foo': 'baz'})
table.update_item.when.called_with(
@ -634,6 +677,19 @@ def test_boto3_update_item_conditions_fails_because_expect_not_exists():
}
}).should.throw(botocore.client.ClientError)
@mock_dynamodb2
def test_boto3_update_item_conditions_fail_because_expect_not_exists_by_compare_to_null():
table = _create_user_table()
table.put_item(Item={'username': 'johndoe', 'foo': 'baz'})
table.update_item.when.called_with(
Key={'username': 'johndoe'},
UpdateExpression='SET foo=bar',
Expected={
'foo': {
'ComparisonOperator': 'NULL',
}
}).should.throw(botocore.client.ClientError)
@mock_dynamodb2
def test_boto3_update_item_conditions_pass():
table = _create_user_table()
@ -650,7 +706,7 @@ def test_boto3_update_item_conditions_pass():
assert dict(returned_item)['Item']['foo'].should.equal("baz")
@mock_dynamodb2
def test_boto3_update_item_conditions_pass_because_expext_not_exists():
def test_boto3_update_item_conditions_pass_because_expect_not_exists():
table = _create_user_table()
table.put_item(Item={'username': 'johndoe', 'foo': 'bar'})
table.update_item(
@ -664,6 +720,36 @@ def test_boto3_update_item_conditions_pass_because_expext_not_exists():
returned_item = table.get_item(Key={'username': 'johndoe'})
assert dict(returned_item)['Item']['foo'].should.equal("baz")
@mock_dynamodb2
def test_boto3_update_item_conditions_pass_because_expect_not_exists_by_compare_to_null():
table = _create_user_table()
table.put_item(Item={'username': 'johndoe', 'foo': 'bar'})
table.update_item(
Key={'username': 'johndoe'},
UpdateExpression='SET foo=baz',
Expected={
'whatever': {
'ComparisonOperator': 'NULL',
}
})
returned_item = table.get_item(Key={'username': 'johndoe'})
assert dict(returned_item)['Item']['foo'].should.equal("baz")
@mock_dynamodb2
def test_boto3_update_item_conditions_pass_because_expect_exists_by_compare_to_not_null():
table = _create_user_table()
table.put_item(Item={'username': 'johndoe', 'foo': 'bar'})
table.update_item(
Key={'username': 'johndoe'},
UpdateExpression='SET foo=baz',
Expected={
'foo': {
'ComparisonOperator': 'NOT_NULL',
}
})
returned_item = table.get_item(Key={'username': 'johndoe'})
assert dict(returned_item)['Item']['foo'].should.equal("baz")
@mock_dynamodb2
def test_boto3_put_item_conditions_pass():
table = _create_user_table()

View file

@ -199,7 +199,7 @@ def test_igw_desribe():
@mock_ec2_deprecated
def test_igw_desribe_bad_id():
def test_igw_describe_bad_id():
""" internet gateway fail to fetch by bad id """
conn = boto.connect_vpc('the_key', 'the_secret')
with assert_raises(EC2ResponseError) as cm:

View file

@ -289,9 +289,31 @@ def test_list_images():
len(response['imageIds']).should.be(1)
response['imageIds'][0]['imageTag'].should.equal('oldest')
response = client.list_images(repositoryName='test_repository_2', registryId='109876543210')
type(response['imageIds']).should.be(list)
len(response['imageIds']).should.be(0)
@mock_ecr
def test_list_images_from_repository_that_doesnt_exist():
client = boto3.client('ecr', region_name='us-east-1')
_ = client.create_repository(
repositoryName='test_repository_1'
)
# non existing repo
error_msg = re.compile(
r".*The repository with name 'repo-that-doesnt-exist' does not exist in the registry with id '123'.*",
re.MULTILINE)
client.list_images.when.called_with(
repositoryName='repo-that-doesnt-exist',
registryId='123',
).should.throw(Exception, error_msg)
# repo does not exist in specified registry
error_msg = re.compile(
r".*The repository with name 'test_repository_1' does not exist in the registry with id '222'.*",
re.MULTILINE)
client.list_images.when.called_with(
repositoryName='test_repository_1',
registryId='222',
).should.throw(Exception, error_msg)
@mock_ecr

View file

@ -2,6 +2,7 @@ from __future__ import unicode_literals
from copy import deepcopy
from botocore.exceptions import ClientError
import boto3
import sure # noqa
import json
@ -450,6 +451,21 @@ def test_update_service():
response['service']['desiredCount'].should.equal(0)
@mock_ecs
def test_update_missing_service():
client = boto3.client('ecs', region_name='us-east-1')
_ = client.create_cluster(
clusterName='test_ecs_cluster'
)
client.update_service.when.called_with(
cluster='test_ecs_cluster',
service='test_ecs_service',
taskDefinition='test_ecs_task',
desiredCount=0
).should.throw(ClientError)
@mock_ecs
def test_delete_service():
client = boto3.client('ecs', region_name='us-east-1')
@ -1054,6 +1070,13 @@ def test_describe_tasks():
set([response['tasks'][0]['taskArn'], response['tasks']
[1]['taskArn']]).should.equal(set(tasks_arns))
# Test we can pass task ids instead of ARNs
response = client.describe_tasks(
cluster='test_ecs_cluster',
tasks=[tasks_arns[0].split("/")[-1]]
)
len(response['tasks']).should.equal(1)
@mock_ecs
def describe_task_definition():

View file

@ -0,0 +1 @@
from __future__ import unicode_literals

View file

@ -0,0 +1 @@
from __future__ import unicode_literals

View file

@ -0,0 +1,31 @@
from __future__ import unicode_literals
TABLE_INPUT = {
'Owner': 'a_fake_owner',
'Parameters': {
'EXTERNAL': 'TRUE',
},
'Retention': 0,
'StorageDescriptor': {
'BucketColumns': [],
'Compressed': False,
'InputFormat': 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat',
'NumberOfBuckets': -1,
'OutputFormat': 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat',
'Parameters': {},
'SerdeInfo': {
'Parameters': {
'serialization.format': '1'
},
'SerializationLibrary': 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
},
'SkewedInfo': {
'SkewedColumnNames': [],
'SkewedColumnValueLocationMaps': {},
'SkewedColumnValues': []
},
'SortColumns': [],
'StoredAsSubDirectories': False
},
'TableType': 'EXTERNAL_TABLE',
}

View file

@ -0,0 +1,46 @@
from __future__ import unicode_literals
import copy
from .fixtures.datacatalog import TABLE_INPUT
def create_database(client, database_name):
return client.create_database(
DatabaseInput={
'Name': database_name
}
)
def get_database(client, database_name):
return client.get_database(Name=database_name)
def create_table_input(table_name, s3_location, columns=[], partition_keys=[]):
table_input = copy.deepcopy(TABLE_INPUT)
table_input['Name'] = table_name
table_input['PartitionKeys'] = partition_keys
table_input['StorageDescriptor']['Columns'] = columns
table_input['StorageDescriptor']['Location'] = s3_location
return table_input
def create_table(client, database_name, table_name, table_input):
return client.create_table(
DatabaseName=database_name,
TableInput=table_input
)
def get_table(client, database_name, table_name):
return client.get_table(
DatabaseName=database_name,
Name=table_name
)
def get_tables(client, database_name):
return client.get_tables(
DatabaseName=database_name
)

View file

@ -0,0 +1,108 @@
from __future__ import unicode_literals
import sure # noqa
from nose.tools import assert_raises
import boto3
from botocore.client import ClientError
from moto import mock_glue
from . import helpers
@mock_glue
def test_create_database():
client = boto3.client('glue', region_name='us-east-1')
database_name = 'myspecialdatabase'
helpers.create_database(client, database_name)
response = helpers.get_database(client, database_name)
database = response['Database']
database.should.equal({'Name': database_name})
@mock_glue
def test_create_database_already_exists():
client = boto3.client('glue', region_name='us-east-1')
database_name = 'cantcreatethisdatabasetwice'
helpers.create_database(client, database_name)
with assert_raises(ClientError) as exc:
helpers.create_database(client, database_name)
exc.exception.response['Error']['Code'].should.equal('DatabaseAlreadyExistsException')
@mock_glue
def test_create_table():
client = boto3.client('glue', region_name='us-east-1')
database_name = 'myspecialdatabase'
helpers.create_database(client, database_name)
table_name = 'myspecialtable'
s3_location = 's3://my-bucket/{database_name}/{table_name}'.format(
database_name=database_name,
table_name=table_name
)
table_input = helpers.create_table_input(table_name, s3_location)
helpers.create_table(client, database_name, table_name, table_input)
response = helpers.get_table(client, database_name, table_name)
table = response['Table']
table['Name'].should.equal(table_input['Name'])
table['StorageDescriptor'].should.equal(table_input['StorageDescriptor'])
table['PartitionKeys'].should.equal(table_input['PartitionKeys'])
@mock_glue
def test_create_table_already_exists():
client = boto3.client('glue', region_name='us-east-1')
database_name = 'myspecialdatabase'
helpers.create_database(client, database_name)
table_name = 'cantcreatethistabletwice'
s3_location = 's3://my-bucket/{database_name}/{table_name}'.format(
database_name=database_name,
table_name=table_name
)
table_input = helpers.create_table_input(table_name, s3_location)
helpers.create_table(client, database_name, table_name, table_input)
with assert_raises(ClientError) as exc:
helpers.create_table(client, database_name, table_name, table_input)
exc.exception.response['Error']['Code'].should.equal('TableAlreadyExistsException')
@mock_glue
def test_get_tables():
client = boto3.client('glue', region_name='us-east-1')
database_name = 'myspecialdatabase'
helpers.create_database(client, database_name)
table_names = ['myfirsttable', 'mysecondtable', 'mythirdtable']
table_inputs = {}
for table_name in table_names:
s3_location = 's3://my-bucket/{database_name}/{table_name}'.format(
database_name=database_name,
table_name=table_name
)
table_input = helpers.create_table_input(table_name, s3_location)
table_inputs[table_name] = table_input
helpers.create_table(client, database_name, table_name, table_input)
response = helpers.get_tables(client, database_name)
tables = response['TableList']
assert len(tables) == 3
for table in tables:
table_name = table['Name']
table_name.should.equal(table_inputs[table_name]['Name'])
table['StorageDescriptor'].should.equal(table_inputs[table_name]['StorageDescriptor'])
table['PartitionKeys'].should.equal(table_inputs[table_name]['PartitionKeys'])

View file

@ -262,18 +262,27 @@ def test_update_assume_role_policy():
role.assume_role_policy_document.should.equal("my-policy")
@mock_iam
def test_create_policy():
conn = boto3.client('iam', region_name='us-east-1')
response = conn.create_policy(
PolicyName="TestCreatePolicy",
PolicyDocument='{"some":"policy"}')
response['Policy']['Arn'].should.equal("arn:aws:iam::123456789012:policy/TestCreatePolicy")
@mock_iam
def test_create_policy_versions():
conn = boto3.client('iam', region_name='us-east-1')
with assert_raises(ClientError):
conn.create_policy_version(
PolicyArn="arn:aws:iam::aws:policy/TestCreatePolicyVersion",
PolicyArn="arn:aws:iam::123456789012:policy/TestCreatePolicyVersion",
PolicyDocument='{"some":"policy"}')
conn.create_policy(
PolicyName="TestCreatePolicyVersion",
PolicyDocument='{"some":"policy"}')
version = conn.create_policy_version(
PolicyArn="arn:aws:iam::aws:policy/TestCreatePolicyVersion",
PolicyArn="arn:aws:iam::123456789012:policy/TestCreatePolicyVersion",
PolicyDocument='{"some":"policy"}')
version.get('PolicyVersion').get('Document').should.equal({'some': 'policy'})
@ -285,14 +294,14 @@ def test_get_policy_version():
PolicyName="TestGetPolicyVersion",
PolicyDocument='{"some":"policy"}')
version = conn.create_policy_version(
PolicyArn="arn:aws:iam::aws:policy/TestGetPolicyVersion",
PolicyArn="arn:aws:iam::123456789012:policy/TestGetPolicyVersion",
PolicyDocument='{"some":"policy"}')
with assert_raises(ClientError):
conn.get_policy_version(
PolicyArn="arn:aws:iam::aws:policy/TestGetPolicyVersion",
PolicyArn="arn:aws:iam::123456789012:policy/TestGetPolicyVersion",
VersionId='v2-does-not-exist')
retrieved = conn.get_policy_version(
PolicyArn="arn:aws:iam::aws:policy/TestGetPolicyVersion",
PolicyArn="arn:aws:iam::123456789012:policy/TestGetPolicyVersion",
VersionId=version.get('PolicyVersion').get('VersionId'))
retrieved.get('PolicyVersion').get('Document').should.equal({'some': 'policy'})
@ -302,18 +311,18 @@ def test_list_policy_versions():
conn = boto3.client('iam', region_name='us-east-1')
with assert_raises(ClientError):
versions = conn.list_policy_versions(
PolicyArn="arn:aws:iam::aws:policy/TestListPolicyVersions")
PolicyArn="arn:aws:iam::123456789012:policy/TestListPolicyVersions")
conn.create_policy(
PolicyName="TestListPolicyVersions",
PolicyDocument='{"some":"policy"}')
conn.create_policy_version(
PolicyArn="arn:aws:iam::aws:policy/TestListPolicyVersions",
PolicyArn="arn:aws:iam::123456789012:policy/TestListPolicyVersions",
PolicyDocument='{"first":"policy"}')
conn.create_policy_version(
PolicyArn="arn:aws:iam::aws:policy/TestListPolicyVersions",
PolicyArn="arn:aws:iam::123456789012:policy/TestListPolicyVersions",
PolicyDocument='{"second":"policy"}')
versions = conn.list_policy_versions(
PolicyArn="arn:aws:iam::aws:policy/TestListPolicyVersions")
PolicyArn="arn:aws:iam::123456789012:policy/TestListPolicyVersions")
versions.get('Versions')[0].get('Document').should.equal({'first': 'policy'})
versions.get('Versions')[1].get('Document').should.equal({'second': 'policy'})
@ -325,17 +334,17 @@ def test_delete_policy_version():
PolicyName="TestDeletePolicyVersion",
PolicyDocument='{"some":"policy"}')
conn.create_policy_version(
PolicyArn="arn:aws:iam::aws:policy/TestDeletePolicyVersion",
PolicyArn="arn:aws:iam::123456789012:policy/TestDeletePolicyVersion",
PolicyDocument='{"first":"policy"}')
with assert_raises(ClientError):
conn.delete_policy_version(
PolicyArn="arn:aws:iam::aws:policy/TestDeletePolicyVersion",
PolicyArn="arn:aws:iam::123456789012:policy/TestDeletePolicyVersion",
VersionId='v2-nope-this-does-not-exist')
conn.delete_policy_version(
PolicyArn="arn:aws:iam::aws:policy/TestDeletePolicyVersion",
PolicyArn="arn:aws:iam::123456789012:policy/TestDeletePolicyVersion",
VersionId='v1')
versions = conn.list_policy_versions(
PolicyArn="arn:aws:iam::aws:policy/TestDeletePolicyVersion")
PolicyArn="arn:aws:iam::123456789012:policy/TestDeletePolicyVersion")
len(versions.get('Versions')).should.equal(0)
@ -669,3 +678,68 @@ def test_update_access_key():
Status='Inactive')
resp = client.list_access_keys(UserName=username)
resp['AccessKeyMetadata'][0]['Status'].should.equal('Inactive')
@mock_iam
def test_get_account_authorization_details():
import json
conn = boto3.client('iam', region_name='us-east-1')
conn.create_role(RoleName="my-role", AssumeRolePolicyDocument="some policy", Path="/my-path/")
conn.create_user(Path='/', UserName='testCloudAuxUser')
conn.create_group(Path='/', GroupName='testCloudAuxGroup')
conn.create_policy(
PolicyName='testCloudAuxPolicy',
Path='/',
PolicyDocument=json.dumps({
"Version": "2012-10-17",
"Statement": [
{
"Action": "s3:ListBucket",
"Resource": "*",
"Effect": "Allow",
}
]
}),
Description='Test CloudAux Policy'
)
result = conn.get_account_authorization_details(Filter=['Role'])
len(result['RoleDetailList']) == 1
len(result['UserDetailList']) == 0
len(result['GroupDetailList']) == 0
len(result['Policies']) == 0
result = conn.get_account_authorization_details(Filter=['User'])
len(result['RoleDetailList']) == 0
len(result['UserDetailList']) == 1
len(result['GroupDetailList']) == 0
len(result['Policies']) == 0
result = conn.get_account_authorization_details(Filter=['Group'])
len(result['RoleDetailList']) == 0
len(result['UserDetailList']) == 0
len(result['GroupDetailList']) == 1
len(result['Policies']) == 0
result = conn.get_account_authorization_details(Filter=['LocalManagedPolicy'])
len(result['RoleDetailList']) == 0
len(result['UserDetailList']) == 0
len(result['GroupDetailList']) == 0
len(result['Policies']) == 1
# Check for greater than 1 since this should always be greater than one but might change.
# See iam/aws_managed_policies.py
result = conn.get_account_authorization_details(Filter=['AWSManagedPolicy'])
len(result['RoleDetailList']) == 0
len(result['UserDetailList']) == 0
len(result['GroupDetailList']) == 0
len(result['Policies']) > 1
result = conn.get_account_authorization_details()
len(result['RoleDetailList']) == 1
len(result['UserDetailList']) == 1
len(result['GroupDetailList']) == 1
len(result['Policies']) > 1

View file

@ -89,6 +89,7 @@ def test_basic_shard_iterator():
response = conn.get_records(shard_iterator)
shard_iterator = response['NextShardIterator']
response['Records'].should.equal([])
response['MillisBehindLatest'].should.equal(0)
@mock_kinesis_deprecated
@ -225,6 +226,7 @@ def test_get_records_after_sequence_number():
response = conn.get_records(shard_iterator)
# And the first result returned should be the third item
response['Records'][0]['Data'].should.equal('3')
response['MillisBehindLatest'].should.equal(0)
@mock_kinesis_deprecated
@ -262,6 +264,7 @@ def test_get_records_latest():
response['Records'].should.have.length_of(1)
response['Records'][0]['PartitionKey'].should.equal('last_record')
response['Records'][0]['Data'].should.equal('last_record')
response['MillisBehindLatest'].should.equal(0)
@mock_kinesis
@ -305,6 +308,7 @@ def test_get_records_at_timestamp():
response['Records'].should.have.length_of(len(keys))
partition_keys = [r['PartitionKey'] for r in response['Records']]
partition_keys.should.equal(keys)
response['MillisBehindLatest'].should.equal(0)
@mock_kinesis
@ -330,10 +334,69 @@ def test_get_records_at_very_old_timestamp():
shard_iterator = response['ShardIterator']
response = conn.get_records(ShardIterator=shard_iterator)
response['Records'].should.have.length_of(len(keys))
partition_keys = [r['PartitionKey'] for r in response['Records']]
partition_keys.should.equal(keys)
response['MillisBehindLatest'].should.equal(0)
@mock_kinesis
def test_get_records_timestamp_filtering():
conn = boto3.client('kinesis', region_name="us-west-2")
stream_name = "my_stream"
conn.create_stream(StreamName=stream_name, ShardCount=1)
conn.put_record(StreamName=stream_name,
Data='0',
PartitionKey='0')
time.sleep(1.0)
timestamp = datetime.datetime.utcnow()
conn.put_record(StreamName=stream_name,
Data='1',
PartitionKey='1')
response = conn.describe_stream(StreamName=stream_name)
shard_id = response['StreamDescription']['Shards'][0]['ShardId']
response = conn.get_shard_iterator(StreamName=stream_name,
ShardId=shard_id,
ShardIteratorType='AT_TIMESTAMP',
Timestamp=timestamp)
shard_iterator = response['ShardIterator']
response = conn.get_records(ShardIterator=shard_iterator)
response['Records'].should.have.length_of(1)
response['Records'][0]['PartitionKey'].should.equal('1')
response['Records'][0]['ApproximateArrivalTimestamp'].should.be.\
greater_than(timestamp)
response['MillisBehindLatest'].should.equal(0)
@mock_kinesis
def test_get_records_millis_behind_latest():
conn = boto3.client('kinesis', region_name="us-west-2")
stream_name = "my_stream"
conn.create_stream(StreamName=stream_name, ShardCount=1)
conn.put_record(StreamName=stream_name,
Data='0',
PartitionKey='0')
time.sleep(1.0)
conn.put_record(StreamName=stream_name,
Data='1',
PartitionKey='1')
response = conn.describe_stream(StreamName=stream_name)
shard_id = response['StreamDescription']['Shards'][0]['ShardId']
response = conn.get_shard_iterator(StreamName=stream_name,
ShardId=shard_id,
ShardIteratorType='TRIM_HORIZON')
shard_iterator = response['ShardIterator']
response = conn.get_records(ShardIterator=shard_iterator, Limit=1)
response['Records'].should.have.length_of(1)
response['MillisBehindLatest'].should.be.greater_than(0)
@mock_kinesis
@ -363,6 +426,7 @@ def test_get_records_at_very_new_timestamp():
response = conn.get_records(ShardIterator=shard_iterator)
response['Records'].should.have.length_of(0)
response['MillisBehindLatest'].should.equal(0)
@mock_kinesis
@ -385,6 +449,7 @@ def test_get_records_from_empty_stream_at_timestamp():
response = conn.get_records(ShardIterator=shard_iterator)
response['Records'].should.have.length_of(0)
response['MillisBehindLatest'].should.equal(0)
@mock_kinesis_deprecated

View file

@ -225,6 +225,29 @@ def test_multipart_invalid_order():
bucket.complete_multipart_upload.when.called_with(
multipart.key_name, multipart.id, xml).should.throw(S3ResponseError)
@mock_s3_deprecated
@reduced_min_part_size
def test_multipart_etag_quotes_stripped():
# Create Bucket so that test can run
conn = boto.connect_s3('the_key', 'the_secret')
bucket = conn.create_bucket('mybucket')
multipart = bucket.initiate_multipart_upload("the-key")
part1 = b'0' * REDUCED_PART_SIZE
etag1 = multipart.upload_part_from_file(BytesIO(part1), 1).etag
# last part, can be less than 5 MB
part2 = b'1'
etag2 = multipart.upload_part_from_file(BytesIO(part2), 2).etag
# Strip quotes from etags
etag1 = etag1.replace('"','')
etag2 = etag2.replace('"','')
xml = "<Part><PartNumber>{0}</PartNumber><ETag>{1}</ETag></Part>"
xml = xml.format(1, etag1) + xml.format(2, etag2)
xml = "<CompleteMultipartUpload>{0}</CompleteMultipartUpload>".format(xml)
bucket.complete_multipart_upload.when.called_with(
multipart.key_name, multipart.id, xml).should_not.throw(S3ResponseError)
# we should get both parts as the key contents
bucket.get_key("the-key").etag.should.equal(EXPECTED_ETAG)
@mock_s3_deprecated
@reduced_min_part_size
@ -2362,6 +2385,35 @@ def test_boto3_list_object_versions():
response['Body'].read().should.equal(items[-1])
@mock_s3
def test_boto3_bad_prefix_list_object_versions():
s3 = boto3.client('s3', region_name='us-east-1')
bucket_name = 'mybucket'
key = 'key-with-versions'
bad_prefix = 'key-that-does-not-exist'
s3.create_bucket(Bucket=bucket_name)
s3.put_bucket_versioning(
Bucket=bucket_name,
VersioningConfiguration={
'Status': 'Enabled'
}
)
items = (six.b('v1'), six.b('v2'))
for body in items:
s3.put_object(
Bucket=bucket_name,
Key=key,
Body=body
)
response = s3.list_object_versions(
Bucket=bucket_name,
Prefix=bad_prefix,
)
response['ResponseMetadata']['HTTPStatusCode'].should.equal(200)
response.should_not.contain('Versions')
response.should_not.contain('DeleteMarkers')
@mock_s3
def test_boto3_delete_markers():
s3 = boto3.client('s3', region_name='us-east-1')

View file

@ -101,6 +101,6 @@ def test_s3_default_storage_class():
# tests that the default storage class is still STANDARD
list_of_objects["Contents"][0]["StorageClass"].should.equal("STANDARD")

View file

@ -21,7 +21,7 @@ def test_force_ignore_subdomain_for_bucketnames():
os.environ['S3_IGNORE_SUBDOMAIN_BUCKETNAME'] = '1'
expect(bucket_name_from_url('https://subdomain.localhost:5000/abc/resource')).should.equal(None)
del(os.environ['S3_IGNORE_SUBDOMAIN_BUCKETNAME'])
def test_versioned_key_store():

View file

@ -3,11 +3,179 @@ from __future__ import unicode_literals
import boto3
from moto import mock_secretsmanager
from botocore.exceptions import ClientError
import sure # noqa
import string
import unittest
from nose.tools import assert_raises
@mock_secretsmanager
def test_get_secret_value():
conn = boto3.client('secretsmanager', region_name='us-west-2')
create_secret = conn.create_secret(Name='java-util-test-password',
SecretString="foosecret")
result = conn.get_secret_value(SecretId='java-util-test-password')
assert result['SecretString'] == 'mysecretstring'
assert result['SecretString'] == 'foosecret'
@mock_secretsmanager
def test_get_secret_that_does_not_exist():
conn = boto3.client('secretsmanager', region_name='us-west-2')
with assert_raises(ClientError):
result = conn.get_secret_value(SecretId='i-dont-exist')
@mock_secretsmanager
def test_get_secret_with_mismatched_id():
conn = boto3.client('secretsmanager', region_name='us-west-2')
create_secret = conn.create_secret(Name='java-util-test-password',
SecretString="foosecret")
with assert_raises(ClientError):
result = conn.get_secret_value(SecretId='i-dont-exist')
@mock_secretsmanager
def test_create_secret():
conn = boto3.client('secretsmanager', region_name='us-east-1')
result = conn.create_secret(Name='test-secret', SecretString="foosecret")
assert result['ARN'] == (
'arn:aws:secretsmanager:us-east-1:1234567890:secret:test-secret-rIjad')
assert result['Name'] == 'test-secret'
secret = conn.get_secret_value(SecretId='test-secret')
assert secret['SecretString'] == 'foosecret'
@mock_secretsmanager
def test_get_random_password_default_length():
conn = boto3.client('secretsmanager', region_name='us-west-2')
random_password = conn.get_random_password()
assert len(random_password['RandomPassword']) == 32
@mock_secretsmanager
def test_get_random_password_default_requirements():
# When require_each_included_type, default true
conn = boto3.client('secretsmanager', region_name='us-west-2')
random_password = conn.get_random_password()
# Should contain lowercase, upppercase, digit, special character
assert any(c.islower() for c in random_password['RandomPassword'])
assert any(c.isupper() for c in random_password['RandomPassword'])
assert any(c.isdigit() for c in random_password['RandomPassword'])
assert any(c in string.punctuation
for c in random_password['RandomPassword'])
@mock_secretsmanager
def test_get_random_password_custom_length():
conn = boto3.client('secretsmanager', region_name='us-west-2')
random_password = conn.get_random_password(PasswordLength=50)
assert len(random_password['RandomPassword']) == 50
@mock_secretsmanager
def test_get_random_exclude_lowercase():
conn = boto3.client('secretsmanager', region_name='us-west-2')
random_password = conn.get_random_password(PasswordLength=55,
ExcludeLowercase=True)
assert any(c.islower() for c in random_password['RandomPassword']) == False
@mock_secretsmanager
def test_get_random_exclude_uppercase():
conn = boto3.client('secretsmanager', region_name='us-west-2')
random_password = conn.get_random_password(PasswordLength=55,
ExcludeUppercase=True)
assert any(c.isupper() for c in random_password['RandomPassword']) == False
@mock_secretsmanager
def test_get_random_exclude_characters_and_symbols():
conn = boto3.client('secretsmanager', region_name='us-west-2')
random_password = conn.get_random_password(PasswordLength=20,
ExcludeCharacters='xyzDje@?!.')
assert any(c in 'xyzDje@?!.' for c in random_password['RandomPassword']) == False
@mock_secretsmanager
def test_get_random_exclude_numbers():
conn = boto3.client('secretsmanager', region_name='us-west-2')
random_password = conn.get_random_password(PasswordLength=100,
ExcludeNumbers=True)
assert any(c.isdigit() for c in random_password['RandomPassword']) == False
@mock_secretsmanager
def test_get_random_exclude_punctuation():
conn = boto3.client('secretsmanager', region_name='us-west-2')
random_password = conn.get_random_password(PasswordLength=100,
ExcludePunctuation=True)
assert any(c in string.punctuation
for c in random_password['RandomPassword']) == False
@mock_secretsmanager
def test_get_random_include_space_false():
conn = boto3.client('secretsmanager', region_name='us-west-2')
random_password = conn.get_random_password(PasswordLength=300)
assert any(c.isspace() for c in random_password['RandomPassword']) == False
@mock_secretsmanager
def test_get_random_include_space_true():
conn = boto3.client('secretsmanager', region_name='us-west-2')
random_password = conn.get_random_password(PasswordLength=4,
IncludeSpace=True)
assert any(c.isspace() for c in random_password['RandomPassword']) == True
@mock_secretsmanager
def test_get_random_require_each_included_type():
conn = boto3.client('secretsmanager', region_name='us-west-2')
random_password = conn.get_random_password(PasswordLength=4,
RequireEachIncludedType=True)
assert any(c in string.punctuation for c in random_password['RandomPassword']) == True
assert any(c in string.ascii_lowercase for c in random_password['RandomPassword']) == True
assert any(c in string.ascii_uppercase for c in random_password['RandomPassword']) == True
assert any(c in string.digits for c in random_password['RandomPassword']) == True
@mock_secretsmanager
def test_get_random_too_short_password():
conn = boto3.client('secretsmanager', region_name='us-west-2')
with assert_raises(ClientError):
random_password = conn.get_random_password(PasswordLength=3)
@mock_secretsmanager
def test_get_random_too_long_password():
conn = boto3.client('secretsmanager', region_name='us-west-2')
with assert_raises(Exception):
random_password = conn.get_random_password(PasswordLength=5555)
@mock_secretsmanager
def test_describe_secret():
conn = boto3.client('secretsmanager', region_name='us-west-2')
conn.create_secret(Name='test-secret',
SecretString='foosecret')
secret_description = conn.describe_secret(SecretId='test-secret')
assert secret_description # Returned dict is not empty
assert secret_description['ARN'] == (
'arn:aws:secretsmanager:us-west-2:1234567890:secret:test-secret-rIjad')
@mock_secretsmanager
def test_describe_secret_that_does_not_exist():
conn = boto3.client('secretsmanager', region_name='us-west-2')
with assert_raises(ClientError):
result = conn.get_secret_value(SecretId='i-dont-exist')
@mock_secretsmanager
def test_describe_secret_that_does_not_match():
conn = boto3.client('secretsmanager', region_name='us-west-2')
conn.create_secret(Name='test-secret',
SecretString='foosecret')
with assert_raises(ClientError):
result = conn.get_secret_value(SecretId='i-dont-match')

View file

@ -7,7 +7,7 @@ import moto.server as server
from moto import mock_secretsmanager
'''
Test the different server responses
Test the different server responses for secretsmanager
'''
@ -17,11 +17,119 @@ def test_get_secret_value():
backend = server.create_backend_app("secretsmanager")
test_client = backend.test_client()
res = test_client.post('/',
data={"SecretId": "test", "VersionStage": "AWSCURRENT"},
create_secret = test_client.post('/',
data={"Name": "test-secret",
"SecretString": "foo-secret"},
headers={
"X-Amz-Target": "secretsmanager.CreateSecret"},
)
get_secret = test_client.post('/',
data={"SecretId": "test-secret",
"VersionStage": "AWSCURRENT"},
headers={
"X-Amz-Target": "secretsmanager.GetSecretValue"},
)
json_data = json.loads(get_secret.data.decode("utf-8"))
assert json_data['SecretString'] == 'foo-secret'
@mock_secretsmanager
def test_get_secret_that_does_not_exist():
backend = server.create_backend_app("secretsmanager")
test_client = backend.test_client()
get_secret = test_client.post('/',
data={"SecretId": "i-dont-exist",
"VersionStage": "AWSCURRENT"},
headers={
"X-Amz-Target": "secretsmanager.GetSecretValue"},
)
json_data = json.loads(get_secret.data.decode("utf-8"))
assert json_data['message'] == "Secrets Manager can't find the specified secret"
assert json_data['__type'] == 'ResourceNotFoundException'
@mock_secretsmanager
def test_create_secret():
backend = server.create_backend_app("secretsmanager")
test_client = backend.test_client()
res = test_client.post('/',
data={"Name": "test-secret",
"SecretString": "foo-secret"},
headers={
"X-Amz-Target": "secretsmanager.CreateSecret"},
)
json_data = json.loads(res.data.decode("utf-8"))
assert json_data['SecretString'] == "mysecretstring"
assert json_data['ARN'] == (
'arn:aws:secretsmanager:us-east-1:1234567890:secret:test-secret-rIjad')
assert json_data['Name'] == 'test-secret'
@mock_secretsmanager
def test_describe_secret():
backend = server.create_backend_app('secretsmanager')
test_client = backend.test_client()
create_secret = test_client.post('/',
data={"Name": "test-secret",
"SecretString": "foosecret"},
headers={
"X-Amz-Target": "secretsmanager.CreateSecret"
},
)
describe_secret = test_client.post('/',
data={"SecretId": "test-secret"},
headers={
"X-Amz-Target": "secretsmanager.DescribeSecret"
},
)
json_data = json.loads(describe_secret.data.decode("utf-8"))
assert json_data # Returned dict is not empty
assert json_data['ARN'] == (
'arn:aws:secretsmanager:us-east-1:1234567890:secret:test-secret-rIjad'
)
@mock_secretsmanager
def test_describe_secret_that_does_not_exist():
backend = server.create_backend_app('secretsmanager')
test_client = backend.test_client()
describe_secret = test_client.post('/',
data={"SecretId": "i-dont-exist"},
headers={
"X-Amz-Target": "secretsmanager.DescribeSecret"
},
)
json_data = json.loads(describe_secret.data.decode("utf-8"))
assert json_data['message'] == "Secrets Manager can't find the specified secret"
assert json_data['__type'] == 'ResourceNotFoundException'
@mock_secretsmanager
def test_describe_secret_that_does_not_match():
backend = server.create_backend_app('secretsmanager')
test_client = backend.test_client()
create_secret = test_client.post('/',
data={"Name": "test-secret",
"SecretString": "foosecret"},
headers={
"X-Amz-Target": "secretsmanager.CreateSecret"
},
)
describe_secret = test_client.post('/',
data={"SecretId": "i-dont-match"},
headers={
"X-Amz-Target": "secretsmanager.DescribeSecret"
},
)
json_data = json.loads(describe_secret.data.decode("utf-8"))
assert json_data['message'] == "Secrets Manager can't find the specified secret"
assert json_data['__type'] == 'ResourceNotFoundException'

View file

@ -182,6 +182,72 @@ def test_subscription_paging():
topic1_subscriptions.shouldnt.have("NextToken")
@mock_sns
def test_creating_subscription_with_attributes():
conn = boto3.client('sns', region_name='us-east-1')
conn.create_topic(Name="some-topic")
response = conn.list_topics()
topic_arn = response["Topics"][0]['TopicArn']
delivery_policy = json.dumps({
'healthyRetryPolicy': {
"numRetries": 10,
"minDelayTarget": 1,
"maxDelayTarget":2
}
})
filter_policy = json.dumps({
"store": ["example_corp"],
"event": ["order_cancelled"],
"encrypted": [False],
"customer_interests": ["basketball", "baseball"]
})
conn.subscribe(TopicArn=topic_arn,
Protocol="http",
Endpoint="http://example.com/",
Attributes={
'RawMessageDelivery': 'true',
'DeliveryPolicy': delivery_policy,
'FilterPolicy': filter_policy
})
subscriptions = conn.list_subscriptions()["Subscriptions"]
subscriptions.should.have.length_of(1)
subscription = subscriptions[0]
subscription["TopicArn"].should.equal(topic_arn)
subscription["Protocol"].should.equal("http")
subscription["SubscriptionArn"].should.contain(topic_arn)
subscription["Endpoint"].should.equal("http://example.com/")
# Test the subscription attributes have been set
subscription_arn = subscription["SubscriptionArn"]
attrs = conn.get_subscription_attributes(
SubscriptionArn=subscription_arn
)
attrs['Attributes']['RawMessageDelivery'].should.equal('true')
attrs['Attributes']['DeliveryPolicy'].should.equal(delivery_policy)
attrs['Attributes']['FilterPolicy'].should.equal(filter_policy)
# Now unsubscribe the subscription
conn.unsubscribe(SubscriptionArn=subscription["SubscriptionArn"])
# And there should be zero subscriptions left
subscriptions = conn.list_subscriptions()["Subscriptions"]
subscriptions.should.have.length_of(0)
# invalid attr name
with assert_raises(ClientError):
conn.subscribe(TopicArn=topic_arn,
Protocol="http",
Endpoint="http://example.com/",
Attributes={
'InvalidName': 'true'
})
@mock_sns
def test_set_subscription_attributes():
conn = boto3.client('sns', region_name='us-east-1')

View file

@ -4,6 +4,10 @@ import boto3
import botocore.exceptions
import sure # noqa
import datetime
import uuid
from botocore.exceptions import ClientError
from nose.tools import assert_raises
from moto import mock_ssm
@ -95,6 +99,27 @@ def test_get_parameters_by_path():
Type='SecureString',
KeyId='alias/aws/ssm')
client.put_parameter(
Name='foo',
Description='A test parameter',
Value='bar',
Type='String')
client.put_parameter(
Name='baz',
Description='A test parameter',
Value='qux',
Type='String')
response = client.get_parameters_by_path(Path='/', Recursive=False)
len(response['Parameters']).should.equal(2)
{p['Value'] for p in response['Parameters']}.should.equal(
set(['bar', 'qux'])
)
response = client.get_parameters_by_path(Path='/', Recursive=True)
len(response['Parameters']).should.equal(9)
response = client.get_parameters_by_path(Path='/foo')
len(response['Parameters']).should.equal(2)
{p['Value'] for p in response['Parameters']}.should.equal(
@ -417,6 +442,7 @@ def test_describe_parameters_filter_keyid():
response['Parameters'][0]['Type'].should.equal('SecureString')
''.should.equal(response.get('NextToken', ''))
@mock_ssm
def test_describe_parameters_attributes():
client = boto3.client('ssm', region_name='us-east-1')
@ -445,6 +471,7 @@ def test_describe_parameters_attributes():
response['Parameters'][1].get('Description').should.be.none
response['Parameters'][1]['Version'].should.equal(1)
@mock_ssm
def test_get_parameter_invalid():
client = client = boto3.client('ssm', region_name='us-east-1')
@ -585,3 +612,59 @@ def test_send_command():
cmd['OutputS3KeyPrefix'].should.equal('pref')
cmd['ExpiresAfter'].should.be.greater_than(before)
# test sending a command without any optional parameters
response = client.send_command(
DocumentName=ssm_document)
cmd = response['Command']
cmd['CommandId'].should_not.be(None)
cmd['DocumentName'].should.equal(ssm_document)
@mock_ssm
def test_list_commands():
client = boto3.client('ssm', region_name='us-east-1')
ssm_document = 'AWS-RunShellScript'
params = {'commands': ['#!/bin/bash\necho \'hello world\'']}
response = client.send_command(
InstanceIds=['i-123456'],
DocumentName=ssm_document,
Parameters=params,
OutputS3Region='us-east-2',
OutputS3BucketName='the-bucket',
OutputS3KeyPrefix='pref')
cmd = response['Command']
cmd_id = cmd['CommandId']
# get the command by id
response = client.list_commands(
CommandId=cmd_id)
cmds = response['Commands']
len(cmds).should.equal(1)
cmds[0]['CommandId'].should.equal(cmd_id)
# add another command with the same instance id to test listing by
# instance id
client.send_command(
InstanceIds=['i-123456'],
DocumentName=ssm_document)
response = client.list_commands(
InstanceId='i-123456')
cmds = response['Commands']
len(cmds).should.equal(2)
for cmd in cmds:
cmd['InstanceIds'].should.contain('i-123456')
# test the error case for an invalid command id
with assert_raises(ClientError):
response = client.list_commands(
CommandId=str(uuid.uuid4()))