Merge branch 'master' of https://github.com/spulec/moto
This commit is contained in:
commit
bd9e7deb95
19 changed files with 524 additions and 120 deletions
|
|
@ -1,51 +1,66 @@
|
|||
from __future__ import unicode_literals
|
||||
|
||||
import base64
|
||||
import botocore.client
|
||||
import boto3
|
||||
import hashlib
|
||||
import io
|
||||
import json
|
||||
import zipfile
|
||||
import sure # noqa
|
||||
|
||||
from freezegun import freeze_time
|
||||
from moto import mock_lambda, mock_s3
|
||||
from moto import mock_lambda, mock_s3, mock_ec2
|
||||
|
||||
|
||||
def get_test_zip_file():
|
||||
def _process_lamda(pfunc):
|
||||
zip_output = io.BytesIO()
|
||||
zip_file = zipfile.ZipFile(zip_output, 'w')
|
||||
zip_file.writestr('lambda_function.py', b'''\
|
||||
def handler(event, context):
|
||||
return "hello world"
|
||||
''')
|
||||
zip_file = zipfile.ZipFile(zip_output, 'w', zipfile.ZIP_DEFLATED)
|
||||
zip_file.writestr('lambda_function.zip', pfunc)
|
||||
zip_file.close()
|
||||
zip_output.seek(0)
|
||||
return zip_output.read()
|
||||
|
||||
|
||||
@mock_lambda
|
||||
def test_list_functions():
|
||||
conn = boto3.client('lambda', 'us-west-2')
|
||||
def get_test_zip_file1():
|
||||
pfunc = """
|
||||
def lambda_handler(event, context):
|
||||
return (event, context)
|
||||
"""
|
||||
return _process_lamda(pfunc)
|
||||
|
||||
result = conn.list_functions()
|
||||
|
||||
result['Functions'].should.have.length_of(0)
|
||||
def get_test_zip_file2():
|
||||
pfunc = """
|
||||
def lambda_handler(event, context):
|
||||
volume_id = event.get('volume_id')
|
||||
print('get volume details for %s' % volume_id)
|
||||
import boto3
|
||||
ec2 = boto3.resource('ec2', region_name='us-west-2')
|
||||
vol = ec2.Volume(volume_id)
|
||||
print('Volume - %s state=%s, size=%s' % (volume_id, vol.state, vol.size))
|
||||
"""
|
||||
return _process_lamda(pfunc)
|
||||
|
||||
|
||||
@mock_lambda
|
||||
@mock_s3
|
||||
@freeze_time('2015-01-01 00:00:00')
|
||||
def test_invoke_function():
|
||||
def test_list_functions():
|
||||
conn = boto3.client('lambda', 'us-west-2')
|
||||
result = conn.list_functions()
|
||||
result['Functions'].should.have.length_of(0)
|
||||
|
||||
zip_content = get_test_zip_file()
|
||||
@mock_lambda
|
||||
@freeze_time('2015-01-01 00:00:00')
|
||||
def test_invoke_event_function():
|
||||
conn = boto3.client('lambda', 'us-west-2')
|
||||
conn.create_function(
|
||||
FunctionName='testFunction',
|
||||
Runtime='python2.7',
|
||||
Role='test-iam-role',
|
||||
Handler='lambda_function.handler',
|
||||
Code={
|
||||
'ZipFile': zip_content,
|
||||
'ZipFile': get_test_zip_file1(),
|
||||
},
|
||||
Description='test lambda function',
|
||||
Timeout=3,
|
||||
|
|
@ -53,8 +68,8 @@ def test_invoke_function():
|
|||
Publish=True,
|
||||
)
|
||||
|
||||
success_result = conn.invoke(FunctionName='testFunction', InvocationType='Event', Payload='{}')
|
||||
success_result["StatusCode"].should.equal(200)
|
||||
success_result = conn.invoke(FunctionName='testFunction', InvocationType='Event', Payload=json.dumps({'msg': 'Mostly Harmless'}))
|
||||
success_result["StatusCode"].should.equal(202)
|
||||
|
||||
conn.invoke.when.called_with(
|
||||
FunctionName='notAFunction',
|
||||
|
|
@ -62,11 +77,63 @@ def test_invoke_function():
|
|||
Payload='{}'
|
||||
).should.throw(botocore.client.ClientError)
|
||||
|
||||
success_result = conn.invoke(FunctionName='testFunction', InvocationType='RequestResponse', Payload='{}')
|
||||
success_result["StatusCode"].should.equal(200)
|
||||
|
||||
@mock_lambda
|
||||
@freeze_time('2015-01-01 00:00:00')
|
||||
def test_invoke_requestresponse_function():
|
||||
conn = boto3.client('lambda', 'us-west-2')
|
||||
conn.create_function(
|
||||
FunctionName='testFunction',
|
||||
Runtime='python2.7',
|
||||
Role='test-iam-role',
|
||||
Handler='lambda_function.handler',
|
||||
Code={
|
||||
'ZipFile': get_test_zip_file1(),
|
||||
},
|
||||
Description='test lambda function',
|
||||
Timeout=3,
|
||||
MemorySize=128,
|
||||
Publish=True,
|
||||
)
|
||||
|
||||
success_result = conn.invoke(FunctionName='testFunction', InvocationType='RequestResponse',
|
||||
Payload=json.dumps({'msg': 'So long and thanks for all the fish'}))
|
||||
success_result["StatusCode"].should.equal(202)
|
||||
|
||||
#nasty hack - hope someone has better solution dealing with unicode tests working for Py2 and Py3.
|
||||
base64.b64decode(success_result["LogResult"]).decode('utf-8').replace("u'", "'").should.equal("({'msg': 'So long and thanks for all the fish'}, {})\n\n")
|
||||
|
||||
@mock_ec2
|
||||
@mock_lambda
|
||||
@freeze_time('2015-01-01 00:00:00')
|
||||
def test_invoke_function_get_ec2_volume():
|
||||
conn = boto3.resource("ec2", "us-west-2")
|
||||
vol = conn.create_volume(Size=99, AvailabilityZone='us-west-2')
|
||||
vol = conn.Volume(vol.id)
|
||||
|
||||
conn = boto3.client('lambda', 'us-west-2')
|
||||
conn.create_function(
|
||||
FunctionName='testFunction',
|
||||
Runtime='python2.7',
|
||||
Role='test-iam-role',
|
||||
Handler='lambda_function.handler',
|
||||
Code={
|
||||
'ZipFile': get_test_zip_file2(),
|
||||
},
|
||||
Description='test lambda function',
|
||||
Timeout=3,
|
||||
MemorySize=128,
|
||||
Publish=True,
|
||||
)
|
||||
|
||||
import json
|
||||
success_result = conn.invoke(FunctionName='testFunction', InvocationType='RequestResponse', Payload=json.dumps({'volume_id': vol.id}))
|
||||
success_result["StatusCode"].should.equal(202)
|
||||
|
||||
import base64
|
||||
base64.b64decode(success_result["LogResult"]).decode('utf-8').should.equal("Some log file output...")
|
||||
msg = 'get volume details for %s\nVolume - %s state=%s, size=%s\nNone\n\n' % (vol.id, vol.id, vol.state, vol.size)
|
||||
# yet again hacky solution to allow code to run tests for python2 and python3 - pls someone fix :(
|
||||
base64.b64decode(success_result["LogResult"]).decode('utf-8').replace("u'", "'").should.equal(msg)
|
||||
|
||||
|
||||
@mock_lambda
|
||||
|
|
@ -100,8 +167,8 @@ def test_create_based_on_s3_with_missing_bucket():
|
|||
def test_create_function_from_aws_bucket():
|
||||
s3_conn = boto3.client('s3', 'us-west-2')
|
||||
s3_conn.create_bucket(Bucket='test-bucket')
|
||||
zip_content = get_test_zip_file2()
|
||||
|
||||
zip_content = get_test_zip_file()
|
||||
s3_conn.put_object(Bucket='test-bucket', Key='test.zip', Body=zip_content)
|
||||
conn = boto3.client('lambda', 'us-west-2')
|
||||
|
||||
|
|
@ -123,7 +190,8 @@ def test_create_function_from_aws_bucket():
|
|||
"SubnetIds": ["subnet-123abc"],
|
||||
},
|
||||
)
|
||||
result['ResponseMetadata'].pop('HTTPHeaders', None) # this is hard to match against, so remove it
|
||||
result['ResponseMetadata'].pop('HTTPHeaders', None) # this is hard to match against, so remove it
|
||||
result['ResponseMetadata'].pop('RetryAttempts', None) # Botocore inserts retry attempts not seen in Python27
|
||||
result.should.equal({
|
||||
'FunctionName': 'testFunction',
|
||||
'FunctionArn': 'arn:aws:lambda:123456789012:function:testFunction',
|
||||
|
|
@ -142,7 +210,6 @@ def test_create_function_from_aws_bucket():
|
|||
"SubnetIds": ["subnet-123abc"],
|
||||
"VpcId": "vpc-123abc"
|
||||
},
|
||||
|
||||
'ResponseMetadata': {'HTTPStatusCode': 201},
|
||||
})
|
||||
|
||||
|
|
@ -151,8 +218,7 @@ def test_create_function_from_aws_bucket():
|
|||
@freeze_time('2015-01-01 00:00:00')
|
||||
def test_create_function_from_zipfile():
|
||||
conn = boto3.client('lambda', 'us-west-2')
|
||||
|
||||
zip_content = get_test_zip_file()
|
||||
zip_content = get_test_zip_file1()
|
||||
result = conn.create_function(
|
||||
FunctionName='testFunction',
|
||||
Runtime='python2.7',
|
||||
|
|
@ -166,7 +232,9 @@ def test_create_function_from_zipfile():
|
|||
MemorySize=128,
|
||||
Publish=True,
|
||||
)
|
||||
result['ResponseMetadata'].pop('HTTPHeaders', None) # this is hard to match against, so remove it
|
||||
result['ResponseMetadata'].pop('HTTPHeaders', None) # this is hard to match against, so remove it
|
||||
result['ResponseMetadata'].pop('RetryAttempts', None) # Botocore inserts retry attempts not seen in Python27
|
||||
|
||||
result.should.equal({
|
||||
'FunctionName': 'testFunction',
|
||||
'FunctionArn': 'arn:aws:lambda:123456789012:function:testFunction',
|
||||
|
|
@ -196,7 +264,7 @@ def test_get_function():
|
|||
s3_conn = boto3.client('s3', 'us-west-2')
|
||||
s3_conn.create_bucket(Bucket='test-bucket')
|
||||
|
||||
zip_content = get_test_zip_file()
|
||||
zip_content = get_test_zip_file1()
|
||||
s3_conn.put_object(Bucket='test-bucket', Key='test.zip', Body=zip_content)
|
||||
conn = boto3.client('lambda', 'us-west-2')
|
||||
|
||||
|
|
@ -216,7 +284,8 @@ def test_get_function():
|
|||
)
|
||||
|
||||
result = conn.get_function(FunctionName='testFunction')
|
||||
result['ResponseMetadata'].pop('HTTPHeaders', None) # this is hard to match against, so remove it
|
||||
result['ResponseMetadata'].pop('HTTPHeaders', None) # this is hard to match against, so remove it
|
||||
result['ResponseMetadata'].pop('RetryAttempts', None) # Botocore inserts retry attempts not seen in Python27
|
||||
|
||||
result.should.equal({
|
||||
"Code": {
|
||||
|
|
@ -245,14 +314,13 @@ def test_get_function():
|
|||
})
|
||||
|
||||
|
||||
|
||||
@mock_lambda
|
||||
@mock_s3
|
||||
def test_delete_function():
|
||||
s3_conn = boto3.client('s3', 'us-west-2')
|
||||
s3_conn.create_bucket(Bucket='test-bucket')
|
||||
|
||||
zip_content = get_test_zip_file()
|
||||
zip_content = get_test_zip_file2()
|
||||
s3_conn.put_object(Bucket='test-bucket', Key='test.zip', Body=zip_content)
|
||||
conn = boto3.client('lambda', 'us-west-2')
|
||||
|
||||
|
|
@ -272,7 +340,9 @@ def test_delete_function():
|
|||
)
|
||||
|
||||
success_result = conn.delete_function(FunctionName='testFunction')
|
||||
success_result['ResponseMetadata'].pop('HTTPHeaders', None) # this is hard to match against, so remove it
|
||||
success_result['ResponseMetadata'].pop('HTTPHeaders', None) # this is hard to match against, so remove it
|
||||
success_result['ResponseMetadata'].pop('RetryAttempts', None) # Botocore inserts retry attempts not seen in Python27
|
||||
|
||||
success_result.should.equal({'ResponseMetadata': {'HTTPStatusCode': 204}})
|
||||
|
||||
conn.delete_function.when.called_with(FunctionName='testFunctionThatDoesntExist').should.throw(botocore.client.ClientError)
|
||||
|
|
@ -289,7 +359,7 @@ def test_list_create_list_get_delete_list():
|
|||
s3_conn = boto3.client('s3', 'us-west-2')
|
||||
s3_conn.create_bucket(Bucket='test-bucket')
|
||||
|
||||
zip_content = get_test_zip_file()
|
||||
zip_content = get_test_zip_file2()
|
||||
s3_conn.put_object(Bucket='test-bucket', Key='test.zip', Body=zip_content)
|
||||
conn = boto3.client('lambda', 'us-west-2')
|
||||
|
||||
|
|
@ -337,7 +407,9 @@ def test_list_create_list_get_delete_list():
|
|||
conn.list_functions()['Functions'].should.equal([expected_function_result['Configuration']])
|
||||
|
||||
func = conn.get_function(FunctionName='testFunction')
|
||||
func['ResponseMetadata'].pop('HTTPHeaders', None) # this is hard to match against, so remove it
|
||||
func['ResponseMetadata'].pop('HTTPHeaders', None) # this is hard to match against, so remove it
|
||||
func['ResponseMetadata'].pop('RetryAttempts', None) # Botocore inserts retry attempts not seen in Python27
|
||||
|
||||
func.should.equal(expected_function_result)
|
||||
conn.delete_function(FunctionName='testFunction')
|
||||
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
from __future__ import unicode_literals
|
||||
import json
|
||||
|
||||
import base64
|
||||
import boto
|
||||
import boto.cloudformation
|
||||
import boto.datapipeline
|
||||
|
|
@ -1724,10 +1725,29 @@ def test_datapipeline():
|
|||
stack_resources.should.have.length_of(1)
|
||||
stack_resources[0].physical_resource_id.should.equal(data_pipelines['pipelineIdList'][0]['id'])
|
||||
|
||||
def _process_lamda(pfunc):
|
||||
import io
|
||||
import zipfile
|
||||
zip_output = io.BytesIO()
|
||||
zip_file = zipfile.ZipFile(zip_output, 'w', zipfile.ZIP_DEFLATED)
|
||||
zip_file.writestr('lambda_function.zip', pfunc)
|
||||
zip_file.close()
|
||||
zip_output.seek(0)
|
||||
return zip_output.read()
|
||||
|
||||
|
||||
def get_test_zip_file1():
|
||||
pfunc = """
|
||||
def lambda_handler(event, context):
|
||||
return (event, context)
|
||||
"""
|
||||
return _process_lamda(pfunc)
|
||||
|
||||
|
||||
@mock_cloudformation
|
||||
@mock_lambda
|
||||
def test_lambda_function():
|
||||
# switch this to python as backend lambda only supports python execution.
|
||||
conn = boto3.client('lambda', 'us-east-1')
|
||||
template = {
|
||||
"AWSTemplateFormatVersion": "2010-09-09",
|
||||
|
|
@ -1736,22 +1756,15 @@ def test_lambda_function():
|
|||
"Type": "AWS::Lambda::Function",
|
||||
"Properties": {
|
||||
"Code": {
|
||||
"ZipFile": {"Fn::Join": [
|
||||
"\n",
|
||||
"""
|
||||
exports.handler = function(event, context) {
|
||||
context.succeed();
|
||||
}
|
||||
""".splitlines()
|
||||
]}
|
||||
"ZipFile": base64.b64encode(get_test_zip_file1()).decode('utf-8')
|
||||
},
|
||||
"Handler": "index.handler",
|
||||
"Handler": "lambda_function.handler",
|
||||
"Description": "Test function",
|
||||
"MemorySize": 128,
|
||||
"Role": "test-role",
|
||||
"Runtime": "nodejs",
|
||||
"Runtime": "python2.7"
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1765,10 +1778,10 @@ def test_lambda_function():
|
|||
result = conn.list_functions()
|
||||
result['Functions'].should.have.length_of(1)
|
||||
result['Functions'][0]['Description'].should.equal('Test function')
|
||||
result['Functions'][0]['Handler'].should.equal('index.handler')
|
||||
result['Functions'][0]['Handler'].should.equal('lambda_function.handler')
|
||||
result['Functions'][0]['MemorySize'].should.equal(128)
|
||||
result['Functions'][0]['Role'].should.equal('test-role')
|
||||
result['Functions'][0]['Runtime'].should.equal('nodejs')
|
||||
result['Functions'][0]['Runtime'].should.equal('python2.7')
|
||||
|
||||
|
||||
@mock_cloudformation
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@ from freezegun import freeze_time
|
|||
from boto.exception import JSONResponseError
|
||||
from moto import mock_dynamodb2
|
||||
from tests.helpers import requires_boto_gte
|
||||
import botocore
|
||||
try:
|
||||
from boto.dynamodb2.fields import HashKey
|
||||
from boto.dynamodb2.table import Table
|
||||
|
|
@ -469,6 +470,7 @@ def test_update_item_set():
|
|||
})
|
||||
|
||||
|
||||
|
||||
@mock_dynamodb2
|
||||
def test_failed_overwrite():
|
||||
table = Table.create('messages', schema=[
|
||||
|
|
@ -585,6 +587,37 @@ def test_boto3_conditions():
|
|||
response['Items'][0].should.equal({"username": "johndoe"})
|
||||
|
||||
|
||||
@mock_dynamodb2
|
||||
def test_boto3_put_item_conditions_fails():
|
||||
table = _create_user_table()
|
||||
table.put_item(Item={'username': 'johndoe', 'foo': 'bar'})
|
||||
table.put_item.when.called_with(
|
||||
Item={'username': 'johndoe', 'foo': 'baz'},
|
||||
Expected={
|
||||
'foo': {
|
||||
'ComparisonOperator': 'NE',
|
||||
'AttributeValueList': ['bar']
|
||||
}
|
||||
}).should.throw(botocore.client.ClientError)
|
||||
|
||||
|
||||
@mock_dynamodb2
|
||||
def test_boto3_put_item_conditions_pass():
|
||||
table = _create_user_table()
|
||||
table.put_item(Item={'username': 'johndoe', 'foo': 'bar'})
|
||||
table.put_item(
|
||||
Item={'username': 'johndoe', 'foo': 'baz'},
|
||||
Expected={
|
||||
'foo': {
|
||||
'ComparisonOperator': 'EQ',
|
||||
'AttributeValueList': ['bar']
|
||||
}
|
||||
})
|
||||
returned_item = table.get_item(Key={'username': 'johndoe'})
|
||||
assert dict(returned_item)['Item']['foo'].should.equal("baz")
|
||||
|
||||
|
||||
|
||||
@mock_dynamodb2
|
||||
def test_scan_pagination():
|
||||
table = _create_user_table()
|
||||
|
|
|
|||
|
|
@ -73,6 +73,13 @@ def test_instance_launch_and_terminate():
|
|||
instance = reservations[0].instances[0]
|
||||
instance.state.should.equal('terminated')
|
||||
|
||||
|
||||
@mock_ec2
|
||||
def test_terminate_empty_instances():
|
||||
conn = boto.connect_ec2('the_key', 'the_secret')
|
||||
conn.terminate_instances.when.called_with([]).should.throw(EC2ResponseError)
|
||||
|
||||
|
||||
@freeze_time("2014-01-01 05:00:00")
|
||||
@mock_ec2
|
||||
def test_instance_attach_volume():
|
||||
|
|
@ -330,6 +337,7 @@ def test_get_instances_filtering_by_tag():
|
|||
reservations[0].instances[0].id.should.equal(instance1.id)
|
||||
reservations[0].instances[1].id.should.equal(instance3.id)
|
||||
|
||||
|
||||
@mock_ec2
|
||||
def test_get_instances_filtering_by_tag_value():
|
||||
conn = boto.connect_ec2()
|
||||
|
|
|
|||
|
|
@ -87,6 +87,60 @@ def test_create_stream():
|
|||
})
|
||||
|
||||
|
||||
@mock_kinesis
|
||||
@freeze_time("2015-03-01")
|
||||
def test_create_stream_without_redshift():
|
||||
client = boto3.client('firehose', region_name='us-east-1')
|
||||
|
||||
response = client.create_delivery_stream(
|
||||
DeliveryStreamName="stream1",
|
||||
S3DestinationConfiguration={
|
||||
'RoleARN': 'arn:aws:iam::123456789012:role/firehose_delivery_role',
|
||||
'BucketARN': 'arn:aws:s3:::kinesis-test',
|
||||
'Prefix': 'myFolder/',
|
||||
'BufferingHints': {
|
||||
'SizeInMBs': 123,
|
||||
'IntervalInSeconds': 124
|
||||
},
|
||||
'CompressionFormat': 'UNCOMPRESSED',
|
||||
}
|
||||
)
|
||||
stream_arn = response['DeliveryStreamARN']
|
||||
|
||||
response = client.describe_delivery_stream(DeliveryStreamName='stream1')
|
||||
stream_description = response['DeliveryStreamDescription']
|
||||
|
||||
# Sure and Freezegun don't play nicely together
|
||||
created = stream_description.pop('CreateTimestamp')
|
||||
last_updated = stream_description.pop('LastUpdateTimestamp')
|
||||
from dateutil.tz import tzlocal
|
||||
assert created == datetime.datetime(2015, 3, 1, tzinfo=tzlocal())
|
||||
assert last_updated == datetime.datetime(2015, 3, 1, tzinfo=tzlocal())
|
||||
|
||||
stream_description.should.equal({
|
||||
'DeliveryStreamName': 'stream1',
|
||||
'DeliveryStreamARN': stream_arn,
|
||||
'DeliveryStreamStatus': 'ACTIVE',
|
||||
'VersionId': 'string',
|
||||
'Destinations': [
|
||||
{
|
||||
'DestinationId': 'string',
|
||||
'S3DestinationDescription': {
|
||||
'RoleARN': 'arn:aws:iam::123456789012:role/firehose_delivery_role',
|
||||
'RoleARN': 'arn:aws:iam::123456789012:role/firehose_delivery_role',
|
||||
'BucketARN': 'arn:aws:s3:::kinesis-test',
|
||||
'Prefix': 'myFolder/',
|
||||
'BufferingHints': {
|
||||
'SizeInMBs': 123,
|
||||
'IntervalInSeconds': 124
|
||||
},
|
||||
'CompressionFormat': 'UNCOMPRESSED',
|
||||
}
|
||||
},
|
||||
],
|
||||
"HasMoreDestinations": False,
|
||||
})
|
||||
|
||||
@mock_kinesis
|
||||
@freeze_time("2015-03-01")
|
||||
def test_deescribe_non_existant_stream():
|
||||
|
|
|
|||
|
|
@ -30,6 +30,39 @@ def test_describe_key():
|
|||
key['KeyMetadata']['KeyUsage'].should.equal("ENCRYPT_DECRYPT")
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_describe_key_via_alias():
|
||||
conn = boto.kms.connect_to_region("us-west-2")
|
||||
key = conn.create_key(policy="my policy", description="my key", key_usage='ENCRYPT_DECRYPT')
|
||||
conn.create_alias(alias_name='alias/my-key-alias', target_key_id=key['KeyMetadata']['KeyId'])
|
||||
|
||||
alias_key = conn.describe_key('alias/my-key-alias')
|
||||
alias_key['KeyMetadata']['Description'].should.equal("my key")
|
||||
alias_key['KeyMetadata']['KeyUsage'].should.equal("ENCRYPT_DECRYPT")
|
||||
alias_key['KeyMetadata']['Arn'].should.equal(key['KeyMetadata']['Arn'])
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_describe_key_via_alias_not_found():
|
||||
conn = boto.kms.connect_to_region("us-west-2")
|
||||
key = conn.create_key(policy="my policy", description="my key", key_usage='ENCRYPT_DECRYPT')
|
||||
conn.create_alias(alias_name='alias/my-key-alias', target_key_id=key['KeyMetadata']['KeyId'])
|
||||
|
||||
conn.describe_key.when.called_with('alias/not-found-alias').should.throw(JSONResponseError)
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_describe_key_via_arn():
|
||||
conn = boto.kms.connect_to_region("us-west-2")
|
||||
key = conn.create_key(policy="my policy", description="my key", key_usage='ENCRYPT_DECRYPT')
|
||||
arn = key['KeyMetadata']['Arn']
|
||||
|
||||
the_key = conn.describe_key(arn)
|
||||
the_key['KeyMetadata']['Description'].should.equal("my key")
|
||||
the_key['KeyMetadata']['KeyUsage'].should.equal("ENCRYPT_DECRYPT")
|
||||
the_key['KeyMetadata']['KeyId'].should.equal(key['KeyMetadata']['KeyId'])
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_describe_missing_key():
|
||||
conn = boto.kms.connect_to_region("us-west-2")
|
||||
|
|
@ -58,6 +91,18 @@ def test_enable_key_rotation():
|
|||
|
||||
conn.get_key_rotation_status(key_id)['KeyRotationEnabled'].should.equal(True)
|
||||
|
||||
@mock_kms
|
||||
def test_enable_key_rotation_via_arn():
|
||||
conn = boto.kms.connect_to_region("us-west-2")
|
||||
|
||||
key = conn.create_key(policy="my policy", description="my key", key_usage='ENCRYPT_DECRYPT')
|
||||
key_id = key['KeyMetadata']['Arn']
|
||||
|
||||
conn.enable_key_rotation(key_id)
|
||||
|
||||
conn.get_key_rotation_status(key_id)['KeyRotationEnabled'].should.equal(True)
|
||||
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_enable_key_rotation_with_missing_key():
|
||||
|
|
@ -65,6 +110,18 @@ def test_enable_key_rotation_with_missing_key():
|
|||
conn.enable_key_rotation.when.called_with("not-a-key").should.throw(JSONResponseError)
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_enable_key_rotation_with_alias_name_should_fail():
|
||||
conn = boto.kms.connect_to_region("us-west-2")
|
||||
key = conn.create_key(policy="my policy", description="my key", key_usage='ENCRYPT_DECRYPT')
|
||||
conn.create_alias(alias_name='alias/my-key-alias', target_key_id=key['KeyMetadata']['KeyId'])
|
||||
|
||||
alias_key = conn.describe_key('alias/my-key-alias')
|
||||
alias_key['KeyMetadata']['Arn'].should.equal(key['KeyMetadata']['Arn'])
|
||||
|
||||
conn.enable_key_rotation.when.called_with('alias/my-alias').should.throw(JSONResponseError)
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_disable_key_rotation():
|
||||
conn = boto.kms.connect_to_region("us-west-2")
|
||||
|
|
@ -121,6 +178,14 @@ def test_get_key_policy():
|
|||
policy = conn.get_key_policy(key_id, 'default')
|
||||
policy['Policy'].should.equal('my policy')
|
||||
|
||||
@mock_kms
|
||||
def test_get_key_policy_via_arn():
|
||||
conn = boto.kms.connect_to_region('us-west-2')
|
||||
|
||||
key = conn.create_key(policy='my policy', description='my key1', key_usage='ENCRYPT_DECRYPT')
|
||||
policy = conn.get_key_policy(key['KeyMetadata']['Arn'], 'default')
|
||||
|
||||
policy['Policy'].should.equal('my policy')
|
||||
|
||||
@mock_kms
|
||||
def test_put_key_policy():
|
||||
|
|
@ -134,6 +199,42 @@ def test_put_key_policy():
|
|||
policy['Policy'].should.equal('new policy')
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_put_key_policy_via_arn():
|
||||
conn = boto.kms.connect_to_region('us-west-2')
|
||||
|
||||
key = conn.create_key(policy='my policy', description='my key1', key_usage='ENCRYPT_DECRYPT')
|
||||
key_id = key['KeyMetadata']['Arn']
|
||||
|
||||
conn.put_key_policy(key_id, 'default', 'new policy')
|
||||
policy = conn.get_key_policy(key_id, 'default')
|
||||
policy['Policy'].should.equal('new policy')
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_put_key_policy_via_alias_should_not_update():
|
||||
conn = boto.kms.connect_to_region('us-west-2')
|
||||
|
||||
key = conn.create_key(policy='my policy', description='my key1', key_usage='ENCRYPT_DECRYPT')
|
||||
conn.create_alias(alias_name='alias/my-key-alias', target_key_id=key['KeyMetadata']['KeyId'])
|
||||
|
||||
conn.put_key_policy.when.called_with('alias/my-key-alias', 'default', 'new policy').should.throw(JSONResponseError)
|
||||
|
||||
policy = conn.get_key_policy(key['KeyMetadata']['KeyId'], 'default')
|
||||
policy['Policy'].should.equal('my policy')
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_put_key_policy():
|
||||
conn = boto.kms.connect_to_region('us-west-2')
|
||||
|
||||
key = conn.create_key(policy='my policy', description='my key1', key_usage='ENCRYPT_DECRYPT')
|
||||
conn.put_key_policy(key['KeyMetadata']['Arn'], 'default', 'new policy')
|
||||
|
||||
policy = conn.get_key_policy(key['KeyMetadata']['KeyId'], 'default')
|
||||
policy['Policy'].should.equal('new policy')
|
||||
|
||||
|
||||
@mock_kms
|
||||
def test_list_key_policies():
|
||||
conn = boto.kms.connect_to_region('us-west-2')
|
||||
|
|
|
|||
|
|
@ -354,6 +354,8 @@ def test_list_or_change_tags_for_resource_request():
|
|||
response['ResourceTagSet']['Tags'].should.contain(tag1)
|
||||
response['ResourceTagSet']['Tags'].should.contain(tag2)
|
||||
|
||||
len(response['ResourceTagSet']['Tags']).should.equal(2)
|
||||
|
||||
# Try to remove the tags
|
||||
conn.change_tags_for_resource(
|
||||
ResourceType='healthcheck',
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue