Merge pull request #1316 from terrycain/lotsa_stuff

Lotsa stuff
This commit is contained in:
Steve Pulec 2017-11-04 15:15:20 -04:00 committed by GitHub
commit adf41244cd
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
21 changed files with 2260 additions and 242 deletions

View file

@ -4,6 +4,7 @@ import os
import boto3
from freezegun import freeze_time
import sure # noqa
import uuid
from botocore.exceptions import ClientError
@ -281,11 +282,23 @@ def test_resend_validation_email_invalid():
def test_request_certificate():
client = boto3.client('acm', region_name='eu-central-1')
token = str(uuid.uuid4())
resp = client.request_certificate(
DomainName='google.com',
IdempotencyToken=token,
SubjectAlternativeNames=['google.com', 'www.google.com', 'mail.google.com'],
)
resp.should.contain('CertificateArn')
arn = resp['CertificateArn']
resp = client.request_certificate(
DomainName='google.com',
IdempotencyToken=token,
SubjectAlternativeNames=['google.com', 'www.google.com', 'mail.google.com'],
)
resp['CertificateArn'].should.equal(arn)
@mock_acm
def test_request_certificate_no_san():

View file

@ -28,13 +28,13 @@ except ImportError:
@mock_dynamodb2_deprecated
def test_list_tables():
name = 'TestTable'
#{'schema': }
# Should make tables properly with boto
dynamodb_backend2.create_table(name, schema=[
{u'KeyType': u'HASH', u'AttributeName': u'forum_name'},
{u'KeyType': u'RANGE', u'AttributeName': u'subject'}
])
conn = boto.dynamodb2.connect_to_region(
'us-west-2',
'us-east-1',
aws_access_key_id="ak",
aws_secret_access_key="sk")
assert conn.list_tables()["TableNames"] == [name]
@ -43,6 +43,7 @@ def test_list_tables():
@requires_boto_gte("2.9")
@mock_dynamodb2_deprecated
def test_list_tables_layer_1():
# Should make tables properly with boto
dynamodb_backend2.create_table("test_1", schema=[
{u'KeyType': u'HASH', u'AttributeName': u'name'}
])
@ -50,7 +51,7 @@ def test_list_tables_layer_1():
{u'KeyType': u'HASH', u'AttributeName': u'name'}
])
conn = boto.dynamodb2.connect_to_region(
'us-west-2',
'us-east-1',
aws_access_key_id="ak",
aws_secret_access_key="sk")
@ -88,12 +89,22 @@ def test_list_table_tags():
ProvisionedThroughput={'ReadCapacityUnits':5,'WriteCapacityUnits':5})
table_description = conn.describe_table(TableName=name)
arn = table_description['Table']['TableArn']
tags = [{'Key':'TestTag', 'Value': 'TestValue'}]
conn.tag_resource(ResourceArn=arn,
Tags=tags)
# Tag table
tags = [{'Key': 'TestTag', 'Value': 'TestValue'}, {'Key': 'TestTag2', 'Value': 'TestValue2'}]
conn.tag_resource(ResourceArn=arn, Tags=tags)
# Check tags
resp = conn.list_tags_of_resource(ResourceArn=arn)
assert resp["Tags"] == tags
# Remove 1 tag
conn.untag_resource(ResourceArn=arn, TagKeys=['TestTag'])
# Check tags
resp = conn.list_tags_of_resource(ResourceArn=arn)
assert resp["Tags"] == [{'Key': 'TestTag2', 'Value': 'TestValue2'}]
@requires_boto_gte("2.9")
@mock_dynamodb2
@ -868,3 +879,50 @@ def test_delete_item():
response = table.scan()
assert response['Count'] == 0
@mock_dynamodb2
def test_describe_limits():
client = boto3.client('dynamodb', region_name='eu-central-1')
resp = client.describe_limits()
resp['AccountMaxReadCapacityUnits'].should.equal(20000)
resp['AccountMaxWriteCapacityUnits'].should.equal(20000)
resp['TableMaxWriteCapacityUnits'].should.equal(10000)
resp['TableMaxReadCapacityUnits'].should.equal(10000)
@mock_dynamodb2
def test_set_ttl():
client = boto3.client('dynamodb', region_name='us-east-1')
# Create the DynamoDB table.
client.create_table(
TableName='test1',
AttributeDefinitions=[{'AttributeName': 'client', 'AttributeType': 'S'}, {'AttributeName': 'app', 'AttributeType': 'S'}],
KeySchema=[{'AttributeName': 'client', 'KeyType': 'HASH'}, {'AttributeName': 'app', 'KeyType': 'RANGE'}],
ProvisionedThroughput={'ReadCapacityUnits': 123, 'WriteCapacityUnits': 123}
)
client.update_time_to_live(
TableName='test1',
TimeToLiveSpecification={
'Enabled': True,
'AttributeName': 'expire'
}
)
resp = client.describe_time_to_live(TableName='test1')
resp['TimeToLiveDescription']['TimeToLiveStatus'].should.equal('ENABLED')
resp['TimeToLiveDescription']['AttributeName'].should.equal('expire')
client.update_time_to_live(
TableName='test1',
TimeToLiveSpecification={
'Enabled': False,
'AttributeName': 'expire'
}
)
resp = client.describe_time_to_live(TableName='test1')
resp['TimeToLiveDescription']['TimeToLiveStatus'].should.equal('DISABLED')

View file

@ -54,7 +54,7 @@ def test_create_table():
}
}
conn = boto.dynamodb2.connect_to_region(
'us-west-2',
'us-east-1',
aws_access_key_id="ak",
aws_secret_access_key="sk"
)
@ -425,7 +425,7 @@ def test_get_special_item():
@mock_dynamodb2_deprecated
def test_update_item_remove():
conn = boto.dynamodb2.connect_to_region("us-west-2")
conn = boto.dynamodb2.connect_to_region("us-east-1")
table = Table.create('messages', schema=[
HashKey('username')
])
@ -452,7 +452,7 @@ def test_update_item_remove():
@mock_dynamodb2_deprecated
def test_update_item_set():
conn = boto.dynamodb2.connect_to_region("us-west-2")
conn = boto.dynamodb2.connect_to_region("us-east-1")
table = Table.create('messages', schema=[
HashKey('username')
])

View file

@ -1611,6 +1611,152 @@ def test_update_service_through_cloudformation_should_trigger_replacement():
len(resp['serviceArns']).should.equal(1)
@mock_ec2
@mock_ecs
def test_attributes():
# Combined put, list delete attributes into the same test due to the amount of setup
ecs_client = boto3.client('ecs', region_name='us-east-1')
ec2 = boto3.resource('ec2', region_name='us-east-1')
test_cluster_name = 'test_ecs_cluster'
_ = ecs_client.create_cluster(
clusterName=test_cluster_name
)
test_instance = ec2.create_instances(
ImageId="ami-1234abcd",
MinCount=1,
MaxCount=1,
)[0]
instance_id_document = json.dumps(
ec2_utils.generate_instance_identity_document(test_instance)
)
response = ecs_client.register_container_instance(
cluster=test_cluster_name,
instanceIdentityDocument=instance_id_document
)
response['containerInstance'][
'ec2InstanceId'].should.equal(test_instance.id)
full_arn1 = response['containerInstance']['containerInstanceArn']
test_instance = ec2.create_instances(
ImageId="ami-1234abcd",
MinCount=1,
MaxCount=1,
)[0]
instance_id_document = json.dumps(
ec2_utils.generate_instance_identity_document(test_instance)
)
response = ecs_client.register_container_instance(
cluster=test_cluster_name,
instanceIdentityDocument=instance_id_document
)
response['containerInstance'][
'ec2InstanceId'].should.equal(test_instance.id)
full_arn2 = response['containerInstance']['containerInstanceArn']
partial_arn2 = full_arn2.rsplit('/', 1)[-1]
full_arn2.should_not.equal(full_arn1) # uuid1 isnt unique enough when the pc is fast ;-)
# Ok set instance 1 with 1 attribute, instance 2 with another, and all of them with a 3rd.
ecs_client.put_attributes(
cluster=test_cluster_name,
attributes=[
{'name': 'env', 'value': 'prod'},
{'name': 'attr1', 'value': 'instance1', 'targetId': full_arn1},
{'name': 'attr1', 'value': 'instance2', 'targetId': partial_arn2, 'targetType': 'container-instance'}
]
)
resp = ecs_client.list_attributes(
cluster=test_cluster_name,
targetType='container-instance'
)
attrs = resp['attributes']
len(attrs).should.equal(4)
# Tests that the attrs have been set properly
len(list(filter(lambda item: item['name'] == 'env', attrs))).should.equal(2)
len(list(filter(lambda item: item['name'] == 'attr1' and item['value'] == 'instance1', attrs))).should.equal(1)
ecs_client.delete_attributes(
cluster=test_cluster_name,
attributes=[
{'name': 'attr1', 'value': 'instance2', 'targetId': partial_arn2, 'targetType': 'container-instance'}
]
)
resp = ecs_client.list_attributes(
cluster=test_cluster_name,
targetType='container-instance'
)
attrs = resp['attributes']
len(attrs).should.equal(3)
@mock_ecs
def test_poll_endpoint():
# Combined put, list delete attributes into the same test due to the amount of setup
ecs_client = boto3.client('ecs', region_name='us-east-1')
# Just a placeholder until someone actually wants useless data, just testing it doesnt raise an exception
resp = ecs_client.discover_poll_endpoint(cluster='blah', containerInstance='blah')
resp.should.contain('endpoint')
resp.should.contain('telemetryEndpoint')
@mock_ecs
def test_list_task_definition_families():
client = boto3.client('ecs', region_name='us-east-1')
client.register_task_definition(
family='test_ecs_task',
containerDefinitions=[
{
'name': 'hello_world',
'image': 'docker/hello-world:latest',
'cpu': 1024,
'memory': 400,
'essential': True,
'environment': [{
'name': 'AWS_ACCESS_KEY_ID',
'value': 'SOME_ACCESS_KEY'
}],
'logConfiguration': {'logDriver': 'json-file'}
}
]
)
client.register_task_definition(
family='alt_test_ecs_task',
containerDefinitions=[
{
'name': 'hello_world',
'image': 'docker/hello-world:latest',
'cpu': 1024,
'memory': 400,
'essential': True,
'environment': [{
'name': 'AWS_ACCESS_KEY_ID',
'value': 'SOME_ACCESS_KEY'
}],
'logConfiguration': {'logDriver': 'json-file'}
}
]
)
resp1 = client.list_task_definition_families()
resp2 = client.list_task_definition_families(familyPrefix='alt')
len(resp1['families']).should.equal(2)
len(resp2['families']).should.equal(1)
def _fetch_container_instance_resources(container_instance_description):
remaining_resources = {}
registered_resources = {}

View file

@ -1,11 +1,13 @@
from __future__ import unicode_literals
import os
import boto3
import botocore
from botocore.exceptions import ClientError
from nose.tools import assert_raises
import sure # noqa
from moto import mock_elbv2, mock_ec2
from moto import mock_elbv2, mock_ec2, mock_acm
from moto.elbv2 import elbv2_backends
@mock_elbv2
@ -1030,3 +1032,373 @@ def test_describe_invalid_target_group():
# Check error raises correctly
with assert_raises(ClientError):
conn.describe_target_groups(Names=['invalid'])
@mock_elbv2
def test_describe_account_limits():
client = boto3.client('elbv2', region_name='eu-central-1')
resp = client.describe_account_limits()
resp['Limits'][0].should.contain('Name')
resp['Limits'][0].should.contain('Max')
@mock_elbv2
def test_describe_ssl_policies():
client = boto3.client('elbv2', region_name='eu-central-1')
resp = client.describe_ssl_policies()
len(resp['SslPolicies']).should.equal(5)
resp = client.describe_ssl_policies(Names=['ELBSecurityPolicy-TLS-1-2-2017-01', 'ELBSecurityPolicy-2016-08'])
len(resp['SslPolicies']).should.equal(2)
@mock_elbv2
@mock_ec2
def test_set_ip_address_type():
client = boto3.client('elbv2', region_name='us-east-1')
ec2 = boto3.resource('ec2', region_name='us-east-1')
security_group = ec2.create_security_group(
GroupName='a-security-group', Description='First One')
vpc = ec2.create_vpc(CidrBlock='172.28.7.0/24', InstanceTenancy='default')
subnet1 = ec2.create_subnet(
VpcId=vpc.id,
CidrBlock='172.28.7.192/26',
AvailabilityZone='us-east-1a')
subnet2 = ec2.create_subnet(
VpcId=vpc.id,
CidrBlock='172.28.7.192/26',
AvailabilityZone='us-east-1b')
response = client.create_load_balancer(
Name='my-lb',
Subnets=[subnet1.id, subnet2.id],
SecurityGroups=[security_group.id],
Scheme='internal',
Tags=[{'Key': 'key_name', 'Value': 'a_value'}])
arn = response['LoadBalancers'][0]['LoadBalancerArn']
# Internal LBs cant be dualstack yet
with assert_raises(ClientError):
client.set_ip_address_type(
LoadBalancerArn=arn,
IpAddressType='dualstack'
)
# Create internet facing one
response = client.create_load_balancer(
Name='my-lb2',
Subnets=[subnet1.id, subnet2.id],
SecurityGroups=[security_group.id],
Scheme='internet-facing',
Tags=[{'Key': 'key_name', 'Value': 'a_value'}])
arn = response['LoadBalancers'][0]['LoadBalancerArn']
client.set_ip_address_type(
LoadBalancerArn=arn,
IpAddressType='dualstack'
)
@mock_elbv2
@mock_ec2
def test_set_security_groups():
client = boto3.client('elbv2', region_name='us-east-1')
ec2 = boto3.resource('ec2', region_name='us-east-1')
security_group = ec2.create_security_group(
GroupName='a-security-group', Description='First One')
security_group2 = ec2.create_security_group(
GroupName='b-security-group', Description='Second One')
vpc = ec2.create_vpc(CidrBlock='172.28.7.0/24', InstanceTenancy='default')
subnet1 = ec2.create_subnet(
VpcId=vpc.id,
CidrBlock='172.28.7.192/26',
AvailabilityZone='us-east-1a')
subnet2 = ec2.create_subnet(
VpcId=vpc.id,
CidrBlock='172.28.7.192/26',
AvailabilityZone='us-east-1b')
response = client.create_load_balancer(
Name='my-lb',
Subnets=[subnet1.id, subnet2.id],
SecurityGroups=[security_group.id],
Scheme='internal',
Tags=[{'Key': 'key_name', 'Value': 'a_value'}])
arn = response['LoadBalancers'][0]['LoadBalancerArn']
client.set_security_groups(
LoadBalancerArn=arn,
SecurityGroups=[security_group.id, security_group2.id]
)
resp = client.describe_load_balancers(LoadBalancerArns=[arn])
len(resp['LoadBalancers'][0]['SecurityGroups']).should.equal(2)
with assert_raises(ClientError):
client.set_security_groups(
LoadBalancerArn=arn,
SecurityGroups=['non_existant']
)
@mock_elbv2
@mock_ec2
def test_set_subnets():
client = boto3.client('elbv2', region_name='us-east-1')
ec2 = boto3.resource('ec2', region_name='us-east-1')
security_group = ec2.create_security_group(
GroupName='a-security-group', Description='First One')
vpc = ec2.create_vpc(CidrBlock='172.28.7.0/24', InstanceTenancy='default')
subnet1 = ec2.create_subnet(
VpcId=vpc.id,
CidrBlock='172.28.7.192/26',
AvailabilityZone='us-east-1a')
subnet2 = ec2.create_subnet(
VpcId=vpc.id,
CidrBlock='172.28.7.192/26',
AvailabilityZone='us-east-1b')
subnet3 = ec2.create_subnet(
VpcId=vpc.id,
CidrBlock='172.28.7.192/26',
AvailabilityZone='us-east-1c')
response = client.create_load_balancer(
Name='my-lb',
Subnets=[subnet1.id, subnet2.id],
SecurityGroups=[security_group.id],
Scheme='internal',
Tags=[{'Key': 'key_name', 'Value': 'a_value'}])
arn = response['LoadBalancers'][0]['LoadBalancerArn']
client.set_subnets(
LoadBalancerArn=arn,
Subnets=[subnet1.id, subnet2.id, subnet3.id]
)
resp = client.describe_load_balancers(LoadBalancerArns=[arn])
len(resp['LoadBalancers'][0]['AvailabilityZones']).should.equal(3)
# Only 1 AZ
with assert_raises(ClientError):
client.set_subnets(
LoadBalancerArn=arn,
Subnets=[subnet1.id]
)
# Multiple subnets in same AZ
with assert_raises(ClientError):
client.set_subnets(
LoadBalancerArn=arn,
Subnets=[subnet1.id, subnet2.id, subnet2.id]
)
@mock_elbv2
@mock_ec2
def test_set_subnets():
client = boto3.client('elbv2', region_name='us-east-1')
ec2 = boto3.resource('ec2', region_name='us-east-1')
security_group = ec2.create_security_group(
GroupName='a-security-group', Description='First One')
vpc = ec2.create_vpc(CidrBlock='172.28.7.0/24', InstanceTenancy='default')
subnet1 = ec2.create_subnet(
VpcId=vpc.id,
CidrBlock='172.28.7.192/26',
AvailabilityZone='us-east-1a')
subnet2 = ec2.create_subnet(
VpcId=vpc.id,
CidrBlock='172.28.7.192/26',
AvailabilityZone='us-east-1b')
response = client.create_load_balancer(
Name='my-lb',
Subnets=[subnet1.id, subnet2.id],
SecurityGroups=[security_group.id],
Scheme='internal',
Tags=[{'Key': 'key_name', 'Value': 'a_value'}])
arn = response['LoadBalancers'][0]['LoadBalancerArn']
client.modify_load_balancer_attributes(
LoadBalancerArn=arn,
Attributes=[{'Key': 'idle_timeout.timeout_seconds', 'Value': '600'}]
)
# Check its 600 not 60
response = client.describe_load_balancer_attributes(
LoadBalancerArn=arn
)
idle_timeout = list(filter(lambda item: item['Key'] == 'idle_timeout.timeout_seconds', response['Attributes']))[0]
idle_timeout['Value'].should.equal('600')
@mock_elbv2
@mock_ec2
def test_modify_target_group():
client = boto3.client('elbv2', region_name='us-east-1')
ec2 = boto3.resource('ec2', region_name='us-east-1')
vpc = ec2.create_vpc(CidrBlock='172.28.7.0/24', InstanceTenancy='default')
response = client.create_target_group(
Name='a-target',
Protocol='HTTP',
Port=8080,
VpcId=vpc.id,
HealthCheckProtocol='HTTP',
HealthCheckPort='8080',
HealthCheckPath='/',
HealthCheckIntervalSeconds=5,
HealthCheckTimeoutSeconds=5,
HealthyThresholdCount=5,
UnhealthyThresholdCount=2,
Matcher={'HttpCode': '200'})
arn = response.get('TargetGroups')[0]['TargetGroupArn']
client.modify_target_group(
TargetGroupArn=arn,
HealthCheckProtocol='HTTPS',
HealthCheckPort='8081',
HealthCheckPath='/status',
HealthCheckIntervalSeconds=10,
HealthCheckTimeoutSeconds=10,
HealthyThresholdCount=10,
UnhealthyThresholdCount=4,
Matcher={'HttpCode': '200-399'}
)
response = client.describe_target_groups(
TargetGroupArns=[arn]
)
response['TargetGroups'][0]['Matcher']['HttpCode'].should.equal('200-399')
response['TargetGroups'][0]['HealthCheckIntervalSeconds'].should.equal(10)
response['TargetGroups'][0]['HealthCheckPath'].should.equal('/status')
response['TargetGroups'][0]['HealthCheckPort'].should.equal('8081')
response['TargetGroups'][0]['HealthCheckProtocol'].should.equal('HTTPS')
response['TargetGroups'][0]['HealthCheckTimeoutSeconds'].should.equal(10)
response['TargetGroups'][0]['HealthyThresholdCount'].should.equal(10)
response['TargetGroups'][0]['UnhealthyThresholdCount'].should.equal(4)
@mock_elbv2
@mock_ec2
@mock_acm
def test_modify_listener_http_to_https():
client = boto3.client('elbv2', region_name='eu-central-1')
acm = boto3.client('acm', region_name='eu-central-1')
ec2 = boto3.resource('ec2', region_name='eu-central-1')
security_group = ec2.create_security_group(
GroupName='a-security-group', Description='First One')
vpc = ec2.create_vpc(CidrBlock='172.28.7.0/24', InstanceTenancy='default')
subnet1 = ec2.create_subnet(
VpcId=vpc.id,
CidrBlock='172.28.7.192/26',
AvailabilityZone='eu-central-1a')
subnet2 = ec2.create_subnet(
VpcId=vpc.id,
CidrBlock='172.28.7.192/26',
AvailabilityZone='eu-central-1b')
response = client.create_load_balancer(
Name='my-lb',
Subnets=[subnet1.id, subnet2.id],
SecurityGroups=[security_group.id],
Scheme='internal',
Tags=[{'Key': 'key_name', 'Value': 'a_value'}])
load_balancer_arn = response.get('LoadBalancers')[0].get('LoadBalancerArn')
response = client.create_target_group(
Name='a-target',
Protocol='HTTP',
Port=8080,
VpcId=vpc.id,
HealthCheckProtocol='HTTP',
HealthCheckPort='8080',
HealthCheckPath='/',
HealthCheckIntervalSeconds=5,
HealthCheckTimeoutSeconds=5,
HealthyThresholdCount=5,
UnhealthyThresholdCount=2,
Matcher={'HttpCode': '200'})
target_group = response.get('TargetGroups')[0]
target_group_arn = target_group['TargetGroupArn']
# Plain HTTP listener
response = client.create_listener(
LoadBalancerArn=load_balancer_arn,
Protocol='HTTP',
Port=80,
DefaultActions=[{'Type': 'forward', 'TargetGroupArn': target_group_arn}]
)
listener_arn = response['Listeners'][0]['ListenerArn']
response = acm.request_certificate(
DomainName='google.com',
SubjectAlternativeNames=['google.com', 'www.google.com', 'mail.google.com'],
)
google_arn = response['CertificateArn']
response = acm.request_certificate(
DomainName='yahoo.com',
SubjectAlternativeNames=['yahoo.com', 'www.yahoo.com', 'mail.yahoo.com'],
)
yahoo_arn = response['CertificateArn']
response = client.modify_listener(
ListenerArn=listener_arn,
Port=443,
Protocol='HTTPS',
SslPolicy='ELBSecurityPolicy-TLS-1-2-2017-01',
Certificates=[
{'CertificateArn': google_arn, 'IsDefault': False},
{'CertificateArn': yahoo_arn, 'IsDefault': True}
],
DefaultActions=[
{'Type': 'forward', 'TargetGroupArn': target_group_arn}
]
)
response['Listeners'][0]['Port'].should.equal(443)
response['Listeners'][0]['Protocol'].should.equal('HTTPS')
response['Listeners'][0]['SslPolicy'].should.equal('ELBSecurityPolicy-TLS-1-2-2017-01')
len(response['Listeners'][0]['Certificates']).should.equal(2)
# Check default cert, can't do this in server mode
if os.environ.get('TEST_SERVER_MODE', 'false').lower() == 'false':
listener = elbv2_backends['eu-central-1'].load_balancers[load_balancer_arn].listeners[listener_arn]
listener.certificate.should.equal(yahoo_arn)
# No default cert
with assert_raises(ClientError):
client.modify_listener(
ListenerArn=listener_arn,
Port=443,
Protocol='HTTPS',
SslPolicy='ELBSecurityPolicy-TLS-1-2-2017-01',
Certificates=[
{'CertificateArn': google_arn, 'IsDefault': False}
],
DefaultActions=[
{'Type': 'forward', 'TargetGroupArn': target_group_arn}
]
)
# Bad cert
with assert_raises(ClientError):
client.modify_listener(
ListenerArn=listener_arn,
Port=443,
Protocol='HTTPS',
SslPolicy='ELBSecurityPolicy-TLS-1-2-2017-01',
Certificates=[
{'CertificateArn': 'lalala', 'IsDefault': True}
],
DefaultActions=[
{'Type': 'forward', 'TargetGroupArn': target_group_arn}
]
)

View file

@ -3,6 +3,8 @@ import random
import boto3
from moto.events import mock_events
from botocore.exceptions import ClientError
from nose.tools import assert_raises
RULES = [
@ -171,11 +173,36 @@ def test_remove_targets():
assert(targets_before - 1 == targets_after)
if __name__ == '__main__':
test_list_rules()
test_describe_rule()
test_enable_disable_rule()
test_list_rule_names_by_target()
test_list_rules()
test_list_targets_by_rule()
test_remove_targets()
@mock_events
def test_permissions():
client = boto3.client('events', 'eu-central-1')
client.put_permission(Action='PutEvents', Principal='111111111111', StatementId='Account1')
client.put_permission(Action='PutEvents', Principal='222222222222', StatementId='Account2')
resp = client.describe_event_bus()
assert len(resp['Policy']['Statement']) == 2
client.remove_permission(StatementId='Account2')
resp = client.describe_event_bus()
assert len(resp['Policy']['Statement']) == 1
assert resp['Policy']['Statement'][0]['Sid'] == 'Account1'
@mock_events
def test_put_events():
client = boto3.client('events', 'eu-central-1')
event = {
"Source": "com.mycompany.myapp",
"Detail": '{"key1": "value3", "key2": "value4"}',
"Resources": ["resource1", "resource2"],
"DetailType": "myDetailType"
}
client.put_events(Entries=[event])
# Boto3 would error if it didn't return 200 OK
with assert_raises(ClientError):
client.put_events(Entries=[event]*20)

View file

@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
import os
import boto
import boto3
@ -8,14 +9,18 @@ from botocore.exceptions import ClientError
from boto.exception import SQSError
from boto.sqs.message import RawMessage, Message
from freezegun import freeze_time
import base64
import json
import sure # noqa
import time
import uuid
from moto import settings, mock_sqs, mock_sqs_deprecated
from tests.helpers import requires_boto_gte
import tests.backport_assert_raises # noqa
from nose.tools import assert_raises
from nose import SkipTest
@mock_sqs
@ -93,8 +98,6 @@ def test_message_send_without_attributes():
msg.get('MD5OfMessageBody').should.equal(
'58fd9edd83341c29f1aebba81c31e257')
msg.shouldnt.have.key('MD5OfMessageAttributes')
msg.get('ResponseMetadata', {}).get('RequestId').should.equal(
'27daac76-34dd-47df-bd01-1f6e873584a0')
msg.get('MessageId').should_not.contain(' \n')
messages = queue.receive_messages()
@ -118,8 +121,6 @@ def test_message_send_with_attributes():
'58fd9edd83341c29f1aebba81c31e257')
msg.get('MD5OfMessageAttributes').should.equal(
'235c5c510d26fb653d073faed50ae77c')
msg.get('ResponseMetadata', {}).get('RequestId').should.equal(
'27daac76-34dd-47df-bd01-1f6e873584a0')
msg.get('MessageId').should_not.contain(' \n')
messages = queue.receive_messages()
@ -143,8 +144,6 @@ def test_message_with_complex_attributes():
'58fd9edd83341c29f1aebba81c31e257')
msg.get('MD5OfMessageAttributes').should.equal(
'8ae21a7957029ef04146b42aeaa18a22')
msg.get('ResponseMetadata', {}).get('RequestId').should.equal(
'27daac76-34dd-47df-bd01-1f6e873584a0')
msg.get('MessageId').should_not.contain(' \n')
messages = queue.receive_messages()
@ -755,3 +754,181 @@ def test_delete_message_after_visibility_timeout():
m1_retrieved.delete()
assert new_queue.count() == 0
@mock_sqs
def test_batch_change_message_visibility():
if os.environ.get('TEST_SERVER_MODE', 'false').lower() == 'true':
raise SkipTest('Cant manipulate time in server mode')
with freeze_time("2015-01-01 12:00:00"):
sqs = boto3.client('sqs', region_name='us-east-1')
resp = sqs.create_queue(
QueueName='test-dlr-queue.fifo',
Attributes={'FifoQueue': 'true'}
)
queue_url = resp['QueueUrl']
sqs.send_message(QueueUrl=queue_url, MessageBody='msg1')
sqs.send_message(QueueUrl=queue_url, MessageBody='msg2')
sqs.send_message(QueueUrl=queue_url, MessageBody='msg3')
with freeze_time("2015-01-01 12:01:00"):
receive_resp = sqs.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=2)
len(receive_resp['Messages']).should.equal(2)
handles = [item['ReceiptHandle'] for item in receive_resp['Messages']]
entries = [{'Id': str(uuid.uuid4()), 'ReceiptHandle': handle, 'VisibilityTimeout': 43200} for handle in handles]
resp = sqs.change_message_visibility_batch(QueueUrl=queue_url, Entries=entries)
len(resp['Successful']).should.equal(2)
with freeze_time("2015-01-01 14:00:00"):
resp = sqs.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=3)
len(resp['Messages']).should.equal(1)
with freeze_time("2015-01-01 16:00:00"):
resp = sqs.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=3)
len(resp['Messages']).should.equal(1)
with freeze_time("2015-01-02 12:00:00"):
resp = sqs.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=3)
len(resp['Messages']).should.equal(3)
@mock_sqs
def test_permissions():
client = boto3.client('sqs', region_name='us-east-1')
resp = client.create_queue(
QueueName='test-dlr-queue.fifo',
Attributes={'FifoQueue': 'true'}
)
queue_url = resp['QueueUrl']
client.add_permission(QueueUrl=queue_url, Label='account1', AWSAccountIds=['111111111111'], Actions=['*'])
client.add_permission(QueueUrl=queue_url, Label='account2', AWSAccountIds=['222211111111'], Actions=['SendMessage'])
with assert_raises(ClientError):
client.add_permission(QueueUrl=queue_url, Label='account2', AWSAccountIds=['222211111111'], Actions=['SomeRubbish'])
client.remove_permission(QueueUrl=queue_url, Label='account2')
with assert_raises(ClientError):
client.remove_permission(QueueUrl=queue_url, Label='non_existant')
@mock_sqs
def test_tags():
client = boto3.client('sqs', region_name='us-east-1')
resp = client.create_queue(
QueueName='test-dlr-queue.fifo',
Attributes={'FifoQueue': 'true'}
)
queue_url = resp['QueueUrl']
client.tag_queue(
QueueUrl=queue_url,
Tags={
'test1': 'value1',
'test2': 'value2',
}
)
resp = client.list_queue_tags(QueueUrl=queue_url)
resp['Tags'].should.contain('test1')
resp['Tags'].should.contain('test2')
client.untag_queue(
QueueUrl=queue_url,
TagKeys=['test2']
)
resp = client.list_queue_tags(QueueUrl=queue_url)
resp['Tags'].should.contain('test1')
resp['Tags'].should_not.contain('test2')
@mock_sqs
def test_create_fifo_queue_with_dlq():
sqs = boto3.client('sqs', region_name='us-east-1')
resp = sqs.create_queue(
QueueName='test-dlr-queue.fifo',
Attributes={'FifoQueue': 'true'}
)
queue_url1 = resp['QueueUrl']
queue_arn1 = sqs.get_queue_attributes(QueueUrl=queue_url1)['Attributes']['QueueArn']
resp = sqs.create_queue(
QueueName='test-dlr-queue',
Attributes={'FifoQueue': 'false'}
)
queue_url2 = resp['QueueUrl']
queue_arn2 = sqs.get_queue_attributes(QueueUrl=queue_url2)['Attributes']['QueueArn']
sqs.create_queue(
QueueName='test-queue.fifo',
Attributes={
'FifoQueue': 'true',
'RedrivePolicy': json.dumps({'deadLetterTargetArn': queue_arn1, 'maxReceiveCount': 2})
}
)
# Cant have fifo queue with non fifo DLQ
with assert_raises(ClientError):
sqs.create_queue(
QueueName='test-queue2.fifo',
Attributes={
'FifoQueue': 'true',
'RedrivePolicy': json.dumps({'deadLetterTargetArn': queue_arn2, 'maxReceiveCount': 2})
}
)
@mock_sqs
def test_queue_with_dlq():
if os.environ.get('TEST_SERVER_MODE', 'false').lower() == 'true':
raise SkipTest('Cant manipulate time in server mode')
sqs = boto3.client('sqs', region_name='us-east-1')
with freeze_time("2015-01-01 12:00:00"):
resp = sqs.create_queue(
QueueName='test-dlr-queue.fifo',
Attributes={'FifoQueue': 'true'}
)
queue_url1 = resp['QueueUrl']
queue_arn1 = sqs.get_queue_attributes(QueueUrl=queue_url1)['Attributes']['QueueArn']
resp = sqs.create_queue(
QueueName='test-queue.fifo',
Attributes={
'FifoQueue': 'true',
'RedrivePolicy': json.dumps({'deadLetterTargetArn': queue_arn1, 'maxReceiveCount': 2})
}
)
queue_url2 = resp['QueueUrl']
sqs.send_message(QueueUrl=queue_url2, MessageBody='msg1')
sqs.send_message(QueueUrl=queue_url2, MessageBody='msg2')
with freeze_time("2015-01-01 13:00:00"):
resp = sqs.receive_message(QueueUrl=queue_url2, VisibilityTimeout=30, WaitTimeSeconds=0)
resp['Messages'][0]['Body'].should.equal('msg1')
with freeze_time("2015-01-01 13:01:00"):
resp = sqs.receive_message(QueueUrl=queue_url2, VisibilityTimeout=30, WaitTimeSeconds=0)
resp['Messages'][0]['Body'].should.equal('msg1')
with freeze_time("2015-01-01 13:02:00"):
resp = sqs.receive_message(QueueUrl=queue_url2, VisibilityTimeout=30, WaitTimeSeconds=0)
len(resp['Messages']).should.equal(1)
resp = sqs.receive_message(QueueUrl=queue_url1, VisibilityTimeout=30, WaitTimeSeconds=0)
resp['Messages'][0]['Body'].should.equal('msg1')
# Might as well test list source queues
resp = sqs.list_dead_letter_source_queues(QueueUrl=queue_url1)
resp['queueUrls'][0].should.equal(queue_url2)