Merge remote-tracking branch 'upstream/master'

This commit is contained in:
Stephan 2019-04-29 12:11:47 +02:00
commit 5804441d38
104 changed files with 26177 additions and 18562 deletions

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

View file

@ -1,362 +1,367 @@
from __future__ import unicode_literals
# Ensure 'assert_raises' context manager support for Python 2.6
import tests.backport_assert_raises
from nose.tools import assert_raises
import boto3
from botocore.exceptions import ClientError
import boto
import boto.cloudformation
import boto.ec2
from boto.exception import EC2ResponseError
import sure # noqa
from moto import mock_ec2, mock_cloudformation_deprecated, mock_ec2_deprecated
from tests.helpers import requires_boto_gte
from tests.test_cloudformation.fixtures import vpc_eni
import json
@mock_ec2_deprecated
def test_elastic_network_interfaces():
conn = boto.connect_vpc('the_key', 'the_secret')
vpc = conn.create_vpc("10.0.0.0/16")
subnet = conn.create_subnet(vpc.id, "10.0.0.0/18")
with assert_raises(EC2ResponseError) as ex:
eni = conn.create_network_interface(subnet.id, dry_run=True)
ex.exception.error_code.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal(
'An error occurred (DryRunOperation) when calling the CreateNetworkInterface operation: Request would have succeeded, but DryRun flag is set')
eni = conn.create_network_interface(subnet.id)
all_enis = conn.get_all_network_interfaces()
all_enis.should.have.length_of(1)
eni = all_enis[0]
eni.groups.should.have.length_of(0)
eni.private_ip_addresses.should.have.length_of(0)
with assert_raises(EC2ResponseError) as ex:
conn.delete_network_interface(eni.id, dry_run=True)
ex.exception.error_code.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal(
'An error occurred (DryRunOperation) when calling the DeleteNetworkInterface operation: Request would have succeeded, but DryRun flag is set')
conn.delete_network_interface(eni.id)
all_enis = conn.get_all_network_interfaces()
all_enis.should.have.length_of(0)
with assert_raises(EC2ResponseError) as cm:
conn.delete_network_interface(eni.id)
cm.exception.error_code.should.equal('InvalidNetworkInterfaceID.NotFound')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
@mock_ec2_deprecated
def test_elastic_network_interfaces_subnet_validation():
conn = boto.connect_vpc('the_key', 'the_secret')
with assert_raises(EC2ResponseError) as cm:
conn.create_network_interface("subnet-abcd1234")
cm.exception.error_code.should.equal('InvalidSubnetID.NotFound')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
@mock_ec2_deprecated
def test_elastic_network_interfaces_with_private_ip():
conn = boto.connect_vpc('the_key', 'the_secret')
vpc = conn.create_vpc("10.0.0.0/16")
subnet = conn.create_subnet(vpc.id, "10.0.0.0/18")
private_ip = "54.0.0.1"
eni = conn.create_network_interface(subnet.id, private_ip)
all_enis = conn.get_all_network_interfaces()
all_enis.should.have.length_of(1)
eni = all_enis[0]
eni.groups.should.have.length_of(0)
eni.private_ip_addresses.should.have.length_of(1)
eni.private_ip_addresses[0].private_ip_address.should.equal(private_ip)
@mock_ec2_deprecated
def test_elastic_network_interfaces_with_groups():
conn = boto.connect_vpc('the_key', 'the_secret')
vpc = conn.create_vpc("10.0.0.0/16")
subnet = conn.create_subnet(vpc.id, "10.0.0.0/18")
security_group1 = conn.create_security_group(
'test security group #1', 'this is a test security group')
security_group2 = conn.create_security_group(
'test security group #2', 'this is a test security group')
conn.create_network_interface(
subnet.id, groups=[security_group1.id, security_group2.id])
all_enis = conn.get_all_network_interfaces()
all_enis.should.have.length_of(1)
eni = all_enis[0]
eni.groups.should.have.length_of(2)
set([group.id for group in eni.groups]).should.equal(
set([security_group1.id, security_group2.id]))
@requires_boto_gte("2.12.0")
@mock_ec2_deprecated
def test_elastic_network_interfaces_modify_attribute():
conn = boto.connect_vpc('the_key', 'the_secret')
vpc = conn.create_vpc("10.0.0.0/16")
subnet = conn.create_subnet(vpc.id, "10.0.0.0/18")
security_group1 = conn.create_security_group(
'test security group #1', 'this is a test security group')
security_group2 = conn.create_security_group(
'test security group #2', 'this is a test security group')
conn.create_network_interface(subnet.id, groups=[security_group1.id])
all_enis = conn.get_all_network_interfaces()
all_enis.should.have.length_of(1)
eni = all_enis[0]
eni.groups.should.have.length_of(1)
eni.groups[0].id.should.equal(security_group1.id)
with assert_raises(EC2ResponseError) as ex:
conn.modify_network_interface_attribute(
eni.id, 'groupset', [security_group2.id], dry_run=True)
ex.exception.error_code.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal(
'An error occurred (DryRunOperation) when calling the ModifyNetworkInterface operation: Request would have succeeded, but DryRun flag is set')
conn.modify_network_interface_attribute(
eni.id, 'groupset', [security_group2.id])
all_enis = conn.get_all_network_interfaces()
all_enis.should.have.length_of(1)
eni = all_enis[0]
eni.groups.should.have.length_of(1)
eni.groups[0].id.should.equal(security_group2.id)
@mock_ec2_deprecated
def test_elastic_network_interfaces_filtering():
conn = boto.connect_vpc('the_key', 'the_secret')
vpc = conn.create_vpc("10.0.0.0/16")
subnet = conn.create_subnet(vpc.id, "10.0.0.0/18")
security_group1 = conn.create_security_group(
'test security group #1', 'this is a test security group')
security_group2 = conn.create_security_group(
'test security group #2', 'this is a test security group')
eni1 = conn.create_network_interface(
subnet.id, groups=[security_group1.id, security_group2.id])
eni2 = conn.create_network_interface(
subnet.id, groups=[security_group1.id])
eni3 = conn.create_network_interface(subnet.id)
all_enis = conn.get_all_network_interfaces()
all_enis.should.have.length_of(3)
# Filter by NetworkInterfaceId
enis_by_id = conn.get_all_network_interfaces([eni1.id])
enis_by_id.should.have.length_of(1)
set([eni.id for eni in enis_by_id]).should.equal(set([eni1.id]))
# Filter by ENI ID
enis_by_id = conn.get_all_network_interfaces(
filters={'network-interface-id': eni1.id})
enis_by_id.should.have.length_of(1)
set([eni.id for eni in enis_by_id]).should.equal(set([eni1.id]))
# Filter by Security Group
enis_by_group = conn.get_all_network_interfaces(
filters={'group-id': security_group1.id})
enis_by_group.should.have.length_of(2)
set([eni.id for eni in enis_by_group]).should.equal(set([eni1.id, eni2.id]))
# Filter by ENI ID and Security Group
enis_by_group = conn.get_all_network_interfaces(
filters={'network-interface-id': eni1.id, 'group-id': security_group1.id})
enis_by_group.should.have.length_of(1)
set([eni.id for eni in enis_by_group]).should.equal(set([eni1.id]))
# Unsupported filter
conn.get_all_network_interfaces.when.called_with(
filters={'not-implemented-filter': 'foobar'}).should.throw(NotImplementedError)
@mock_ec2
def test_elastic_network_interfaces_get_by_tag_name():
ec2 = boto3.resource('ec2', region_name='us-west-2')
ec2_client = boto3.client('ec2', region_name='us-west-2')
vpc = ec2.create_vpc(CidrBlock='10.0.0.0/16')
subnet = ec2.create_subnet(
VpcId=vpc.id, CidrBlock='10.0.0.0/24', AvailabilityZone='us-west-2a')
eni1 = ec2.create_network_interface(
SubnetId=subnet.id, PrivateIpAddress='10.0.10.5')
with assert_raises(ClientError) as ex:
eni1.create_tags(Tags=[{'Key': 'Name', 'Value': 'eni1'}], DryRun=True)
ex.exception.response['Error']['Code'].should.equal('DryRunOperation')
ex.exception.response['ResponseMetadata'][
'HTTPStatusCode'].should.equal(400)
ex.exception.response['Error']['Message'].should.equal(
'An error occurred (DryRunOperation) when calling the CreateTags operation: Request would have succeeded, but DryRun flag is set')
eni1.create_tags(Tags=[{'Key': 'Name', 'Value': 'eni1'}])
# The status of the new interface should be 'available'
waiter = ec2_client.get_waiter('network_interface_available')
waiter.wait(NetworkInterfaceIds=[eni1.id])
filters = [{'Name': 'tag:Name', 'Values': ['eni1']}]
enis = list(ec2.network_interfaces.filter(Filters=filters))
enis.should.have.length_of(1)
filters = [{'Name': 'tag:Name', 'Values': ['wrong-name']}]
enis = list(ec2.network_interfaces.filter(Filters=filters))
enis.should.have.length_of(0)
@mock_ec2
def test_elastic_network_interfaces_get_by_availability_zone():
ec2 = boto3.resource('ec2', region_name='us-west-2')
ec2_client = boto3.client('ec2', region_name='us-west-2')
vpc = ec2.create_vpc(CidrBlock='10.0.0.0/16')
subnet1 = ec2.create_subnet(
VpcId=vpc.id, CidrBlock='10.0.0.0/24', AvailabilityZone='us-west-2a')
subnet2 = ec2.create_subnet(
VpcId=vpc.id, CidrBlock='10.0.1.0/24', AvailabilityZone='us-west-2b')
eni1 = ec2.create_network_interface(
SubnetId=subnet1.id, PrivateIpAddress='10.0.0.15')
eni2 = ec2.create_network_interface(
SubnetId=subnet2.id, PrivateIpAddress='10.0.1.15')
# The status of the new interface should be 'available'
waiter = ec2_client.get_waiter('network_interface_available')
waiter.wait(NetworkInterfaceIds=[eni1.id, eni2.id])
filters = [{'Name': 'availability-zone', 'Values': ['us-west-2a']}]
enis = list(ec2.network_interfaces.filter(Filters=filters))
enis.should.have.length_of(1)
filters = [{'Name': 'availability-zone', 'Values': ['us-west-2c']}]
enis = list(ec2.network_interfaces.filter(Filters=filters))
enis.should.have.length_of(0)
@mock_ec2
def test_elastic_network_interfaces_get_by_private_ip():
ec2 = boto3.resource('ec2', region_name='us-west-2')
ec2_client = boto3.client('ec2', region_name='us-west-2')
vpc = ec2.create_vpc(CidrBlock='10.0.0.0/16')
subnet = ec2.create_subnet(
VpcId=vpc.id, CidrBlock='10.0.0.0/24', AvailabilityZone='us-west-2a')
eni1 = ec2.create_network_interface(
SubnetId=subnet.id, PrivateIpAddress='10.0.10.5')
# The status of the new interface should be 'available'
waiter = ec2_client.get_waiter('network_interface_available')
waiter.wait(NetworkInterfaceIds=[eni1.id])
filters = [{'Name': 'private-ip-address', 'Values': ['10.0.10.5']}]
enis = list(ec2.network_interfaces.filter(Filters=filters))
enis.should.have.length_of(1)
filters = [{'Name': 'private-ip-address', 'Values': ['10.0.10.10']}]
enis = list(ec2.network_interfaces.filter(Filters=filters))
enis.should.have.length_of(0)
filters = [{'Name': 'addresses.private-ip-address', 'Values': ['10.0.10.5']}]
enis = list(ec2.network_interfaces.filter(Filters=filters))
enis.should.have.length_of(1)
filters = [{'Name': 'addresses.private-ip-address', 'Values': ['10.0.10.10']}]
enis = list(ec2.network_interfaces.filter(Filters=filters))
enis.should.have.length_of(0)
@mock_ec2
def test_elastic_network_interfaces_get_by_vpc_id():
ec2 = boto3.resource('ec2', region_name='us-west-2')
ec2_client = boto3.client('ec2', region_name='us-west-2')
vpc = ec2.create_vpc(CidrBlock='10.0.0.0/16')
subnet = ec2.create_subnet(
VpcId=vpc.id, CidrBlock='10.0.0.0/24', AvailabilityZone='us-west-2a')
eni1 = ec2.create_network_interface(
SubnetId=subnet.id, PrivateIpAddress='10.0.10.5')
# The status of the new interface should be 'available'
waiter = ec2_client.get_waiter('network_interface_available')
waiter.wait(NetworkInterfaceIds=[eni1.id])
filters = [{'Name': 'vpc-id', 'Values': [subnet.vpc_id]}]
enis = list(ec2.network_interfaces.filter(Filters=filters))
enis.should.have.length_of(1)
filters = [{'Name': 'vpc-id', 'Values': ['vpc-aaaa1111']}]
enis = list(ec2.network_interfaces.filter(Filters=filters))
enis.should.have.length_of(0)
@mock_ec2
def test_elastic_network_interfaces_get_by_subnet_id():
ec2 = boto3.resource('ec2', region_name='us-west-2')
ec2_client = boto3.client('ec2', region_name='us-west-2')
vpc = ec2.create_vpc(CidrBlock='10.0.0.0/16')
subnet = ec2.create_subnet(
VpcId=vpc.id, CidrBlock='10.0.0.0/24', AvailabilityZone='us-west-2a')
eni1 = ec2.create_network_interface(
SubnetId=subnet.id, PrivateIpAddress='10.0.10.5')
# The status of the new interface should be 'available'
waiter = ec2_client.get_waiter('network_interface_available')
waiter.wait(NetworkInterfaceIds=[eni1.id])
filters = [{'Name': 'subnet-id', 'Values': [subnet.id]}]
enis = list(ec2.network_interfaces.filter(Filters=filters))
enis.should.have.length_of(1)
filters = [{'Name': 'subnet-id', 'Values': ['subnet-aaaa1111']}]
enis = list(ec2.network_interfaces.filter(Filters=filters))
enis.should.have.length_of(0)
@mock_ec2_deprecated
@mock_cloudformation_deprecated
def test_elastic_network_interfaces_cloudformation():
template = vpc_eni.template
template_json = json.dumps(template)
conn = boto.cloudformation.connect_to_region("us-west-1")
conn.create_stack(
"test_stack",
template_body=template_json,
)
ec2_conn = boto.ec2.connect_to_region("us-west-1")
eni = ec2_conn.get_all_network_interfaces()[0]
stack = conn.describe_stacks()[0]
resources = stack.describe_resources()
cfn_eni = [resource for resource in resources if resource.resource_type ==
'AWS::EC2::NetworkInterface'][0]
cfn_eni.physical_resource_id.should.equal(eni.id)
from __future__ import unicode_literals
# Ensure 'assert_raises' context manager support for Python 2.6
import tests.backport_assert_raises
from nose.tools import assert_raises
import boto3
from botocore.exceptions import ClientError
import boto
import boto.cloudformation
import boto.ec2
from boto.exception import EC2ResponseError
import sure # noqa
from moto import mock_ec2, mock_cloudformation_deprecated, mock_ec2_deprecated
from tests.helpers import requires_boto_gte
from tests.test_cloudformation.fixtures import vpc_eni
import json
@mock_ec2_deprecated
def test_elastic_network_interfaces():
conn = boto.connect_vpc('the_key', 'the_secret')
vpc = conn.create_vpc("10.0.0.0/16")
subnet = conn.create_subnet(vpc.id, "10.0.0.0/18")
with assert_raises(EC2ResponseError) as ex:
eni = conn.create_network_interface(subnet.id, dry_run=True)
ex.exception.error_code.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal(
'An error occurred (DryRunOperation) when calling the CreateNetworkInterface operation: Request would have succeeded, but DryRun flag is set')
eni = conn.create_network_interface(subnet.id)
all_enis = conn.get_all_network_interfaces()
all_enis.should.have.length_of(1)
eni = all_enis[0]
eni.groups.should.have.length_of(0)
eni.private_ip_addresses.should.have.length_of(1)
eni.private_ip_addresses[0].private_ip_address.startswith('10.').should.be.true
with assert_raises(EC2ResponseError) as ex:
conn.delete_network_interface(eni.id, dry_run=True)
ex.exception.error_code.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal(
'An error occurred (DryRunOperation) when calling the DeleteNetworkInterface operation: Request would have succeeded, but DryRun flag is set')
conn.delete_network_interface(eni.id)
all_enis = conn.get_all_network_interfaces()
all_enis.should.have.length_of(0)
with assert_raises(EC2ResponseError) as cm:
conn.delete_network_interface(eni.id)
cm.exception.error_code.should.equal('InvalidNetworkInterfaceID.NotFound')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
@mock_ec2_deprecated
def test_elastic_network_interfaces_subnet_validation():
conn = boto.connect_vpc('the_key', 'the_secret')
with assert_raises(EC2ResponseError) as cm:
conn.create_network_interface("subnet-abcd1234")
cm.exception.error_code.should.equal('InvalidSubnetID.NotFound')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
@mock_ec2_deprecated
def test_elastic_network_interfaces_with_private_ip():
conn = boto.connect_vpc('the_key', 'the_secret')
vpc = conn.create_vpc("10.0.0.0/16")
subnet = conn.create_subnet(vpc.id, "10.0.0.0/18")
private_ip = "54.0.0.1"
eni = conn.create_network_interface(subnet.id, private_ip)
all_enis = conn.get_all_network_interfaces()
all_enis.should.have.length_of(1)
eni = all_enis[0]
eni.groups.should.have.length_of(0)
eni.private_ip_addresses.should.have.length_of(1)
eni.private_ip_addresses[0].private_ip_address.should.equal(private_ip)
@mock_ec2_deprecated
def test_elastic_network_interfaces_with_groups():
conn = boto.connect_vpc('the_key', 'the_secret')
vpc = conn.create_vpc("10.0.0.0/16")
subnet = conn.create_subnet(vpc.id, "10.0.0.0/18")
security_group1 = conn.create_security_group(
'test security group #1', 'this is a test security group')
security_group2 = conn.create_security_group(
'test security group #2', 'this is a test security group')
conn.create_network_interface(
subnet.id, groups=[security_group1.id, security_group2.id])
all_enis = conn.get_all_network_interfaces()
all_enis.should.have.length_of(1)
eni = all_enis[0]
eni.groups.should.have.length_of(2)
set([group.id for group in eni.groups]).should.equal(
set([security_group1.id, security_group2.id]))
@requires_boto_gte("2.12.0")
@mock_ec2_deprecated
def test_elastic_network_interfaces_modify_attribute():
conn = boto.connect_vpc('the_key', 'the_secret')
vpc = conn.create_vpc("10.0.0.0/16")
subnet = conn.create_subnet(vpc.id, "10.0.0.0/18")
security_group1 = conn.create_security_group(
'test security group #1', 'this is a test security group')
security_group2 = conn.create_security_group(
'test security group #2', 'this is a test security group')
conn.create_network_interface(subnet.id, groups=[security_group1.id])
all_enis = conn.get_all_network_interfaces()
all_enis.should.have.length_of(1)
eni = all_enis[0]
eni.groups.should.have.length_of(1)
eni.groups[0].id.should.equal(security_group1.id)
with assert_raises(EC2ResponseError) as ex:
conn.modify_network_interface_attribute(
eni.id, 'groupset', [security_group2.id], dry_run=True)
ex.exception.error_code.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal(
'An error occurred (DryRunOperation) when calling the ModifyNetworkInterface operation: Request would have succeeded, but DryRun flag is set')
conn.modify_network_interface_attribute(
eni.id, 'groupset', [security_group2.id])
all_enis = conn.get_all_network_interfaces()
all_enis.should.have.length_of(1)
eni = all_enis[0]
eni.groups.should.have.length_of(1)
eni.groups[0].id.should.equal(security_group2.id)
@mock_ec2_deprecated
def test_elastic_network_interfaces_filtering():
conn = boto.connect_vpc('the_key', 'the_secret')
vpc = conn.create_vpc("10.0.0.0/16")
subnet = conn.create_subnet(vpc.id, "10.0.0.0/18")
security_group1 = conn.create_security_group(
'test security group #1', 'this is a test security group')
security_group2 = conn.create_security_group(
'test security group #2', 'this is a test security group')
eni1 = conn.create_network_interface(
subnet.id, groups=[security_group1.id, security_group2.id])
eni2 = conn.create_network_interface(
subnet.id, groups=[security_group1.id])
eni3 = conn.create_network_interface(subnet.id)
all_enis = conn.get_all_network_interfaces()
all_enis.should.have.length_of(3)
# Filter by NetworkInterfaceId
enis_by_id = conn.get_all_network_interfaces([eni1.id])
enis_by_id.should.have.length_of(1)
set([eni.id for eni in enis_by_id]).should.equal(set([eni1.id]))
# Filter by ENI ID
enis_by_id = conn.get_all_network_interfaces(
filters={'network-interface-id': eni1.id})
enis_by_id.should.have.length_of(1)
set([eni.id for eni in enis_by_id]).should.equal(set([eni1.id]))
# Filter by Security Group
enis_by_group = conn.get_all_network_interfaces(
filters={'group-id': security_group1.id})
enis_by_group.should.have.length_of(2)
set([eni.id for eni in enis_by_group]).should.equal(set([eni1.id, eni2.id]))
# Filter by ENI ID and Security Group
enis_by_group = conn.get_all_network_interfaces(
filters={'network-interface-id': eni1.id, 'group-id': security_group1.id})
enis_by_group.should.have.length_of(1)
set([eni.id for eni in enis_by_group]).should.equal(set([eni1.id]))
# Unsupported filter
conn.get_all_network_interfaces.when.called_with(
filters={'not-implemented-filter': 'foobar'}).should.throw(NotImplementedError)
@mock_ec2
def test_elastic_network_interfaces_get_by_tag_name():
ec2 = boto3.resource('ec2', region_name='us-west-2')
ec2_client = boto3.client('ec2', region_name='us-west-2')
vpc = ec2.create_vpc(CidrBlock='10.0.0.0/16')
subnet = ec2.create_subnet(
VpcId=vpc.id, CidrBlock='10.0.0.0/24', AvailabilityZone='us-west-2a')
eni1 = ec2.create_network_interface(
SubnetId=subnet.id, PrivateIpAddress='10.0.10.5')
with assert_raises(ClientError) as ex:
eni1.create_tags(Tags=[{'Key': 'Name', 'Value': 'eni1'}], DryRun=True)
ex.exception.response['Error']['Code'].should.equal('DryRunOperation')
ex.exception.response['ResponseMetadata'][
'HTTPStatusCode'].should.equal(400)
ex.exception.response['Error']['Message'].should.equal(
'An error occurred (DryRunOperation) when calling the CreateTags operation: Request would have succeeded, but DryRun flag is set')
eni1.create_tags(Tags=[{'Key': 'Name', 'Value': 'eni1'}])
# The status of the new interface should be 'available'
waiter = ec2_client.get_waiter('network_interface_available')
waiter.wait(NetworkInterfaceIds=[eni1.id])
filters = [{'Name': 'tag:Name', 'Values': ['eni1']}]
enis = list(ec2.network_interfaces.filter(Filters=filters))
enis.should.have.length_of(1)
filters = [{'Name': 'tag:Name', 'Values': ['wrong-name']}]
enis = list(ec2.network_interfaces.filter(Filters=filters))
enis.should.have.length_of(0)
@mock_ec2
def test_elastic_network_interfaces_get_by_availability_zone():
ec2 = boto3.resource('ec2', region_name='us-west-2')
ec2_client = boto3.client('ec2', region_name='us-west-2')
vpc = ec2.create_vpc(CidrBlock='10.0.0.0/16')
subnet1 = ec2.create_subnet(
VpcId=vpc.id, CidrBlock='10.0.0.0/24', AvailabilityZone='us-west-2a')
subnet2 = ec2.create_subnet(
VpcId=vpc.id, CidrBlock='10.0.1.0/24', AvailabilityZone='us-west-2b')
eni1 = ec2.create_network_interface(
SubnetId=subnet1.id, PrivateIpAddress='10.0.0.15')
eni2 = ec2.create_network_interface(
SubnetId=subnet2.id, PrivateIpAddress='10.0.1.15')
# The status of the new interface should be 'available'
waiter = ec2_client.get_waiter('network_interface_available')
waiter.wait(NetworkInterfaceIds=[eni1.id, eni2.id])
filters = [{'Name': 'availability-zone', 'Values': ['us-west-2a']}]
enis = list(ec2.network_interfaces.filter(Filters=filters))
enis.should.have.length_of(1)
filters = [{'Name': 'availability-zone', 'Values': ['us-west-2c']}]
enis = list(ec2.network_interfaces.filter(Filters=filters))
enis.should.have.length_of(0)
@mock_ec2
def test_elastic_network_interfaces_get_by_private_ip():
ec2 = boto3.resource('ec2', region_name='us-west-2')
ec2_client = boto3.client('ec2', region_name='us-west-2')
vpc = ec2.create_vpc(CidrBlock='10.0.0.0/16')
subnet = ec2.create_subnet(
VpcId=vpc.id, CidrBlock='10.0.0.0/24', AvailabilityZone='us-west-2a')
eni1 = ec2.create_network_interface(
SubnetId=subnet.id, PrivateIpAddress='10.0.10.5')
# The status of the new interface should be 'available'
waiter = ec2_client.get_waiter('network_interface_available')
waiter.wait(NetworkInterfaceIds=[eni1.id])
filters = [{'Name': 'private-ip-address', 'Values': ['10.0.10.5']}]
enis = list(ec2.network_interfaces.filter(Filters=filters))
enis.should.have.length_of(1)
filters = [{'Name': 'private-ip-address', 'Values': ['10.0.10.10']}]
enis = list(ec2.network_interfaces.filter(Filters=filters))
enis.should.have.length_of(0)
filters = [{'Name': 'addresses.private-ip-address', 'Values': ['10.0.10.5']}]
enis = list(ec2.network_interfaces.filter(Filters=filters))
enis.should.have.length_of(1)
filters = [{'Name': 'addresses.private-ip-address', 'Values': ['10.0.10.10']}]
enis = list(ec2.network_interfaces.filter(Filters=filters))
enis.should.have.length_of(0)
@mock_ec2
def test_elastic_network_interfaces_get_by_vpc_id():
ec2 = boto3.resource('ec2', region_name='us-west-2')
ec2_client = boto3.client('ec2', region_name='us-west-2')
vpc = ec2.create_vpc(CidrBlock='10.0.0.0/16')
subnet = ec2.create_subnet(
VpcId=vpc.id, CidrBlock='10.0.0.0/24', AvailabilityZone='us-west-2a')
eni1 = ec2.create_network_interface(
SubnetId=subnet.id, PrivateIpAddress='10.0.10.5')
# The status of the new interface should be 'available'
waiter = ec2_client.get_waiter('network_interface_available')
waiter.wait(NetworkInterfaceIds=[eni1.id])
filters = [{'Name': 'vpc-id', 'Values': [subnet.vpc_id]}]
enis = list(ec2.network_interfaces.filter(Filters=filters))
enis.should.have.length_of(1)
filters = [{'Name': 'vpc-id', 'Values': ['vpc-aaaa1111']}]
enis = list(ec2.network_interfaces.filter(Filters=filters))
enis.should.have.length_of(0)
@mock_ec2
def test_elastic_network_interfaces_get_by_subnet_id():
ec2 = boto3.resource('ec2', region_name='us-west-2')
ec2_client = boto3.client('ec2', region_name='us-west-2')
vpc = ec2.create_vpc(CidrBlock='10.0.0.0/16')
subnet = ec2.create_subnet(
VpcId=vpc.id, CidrBlock='10.0.0.0/24', AvailabilityZone='us-west-2a')
eni1 = ec2.create_network_interface(
SubnetId=subnet.id, PrivateIpAddress='10.0.10.5')
# The status of the new interface should be 'available'
waiter = ec2_client.get_waiter('network_interface_available')
waiter.wait(NetworkInterfaceIds=[eni1.id])
filters = [{'Name': 'subnet-id', 'Values': [subnet.id]}]
enis = list(ec2.network_interfaces.filter(Filters=filters))
enis.should.have.length_of(1)
filters = [{'Name': 'subnet-id', 'Values': ['subnet-aaaa1111']}]
enis = list(ec2.network_interfaces.filter(Filters=filters))
enis.should.have.length_of(0)
@mock_ec2_deprecated
@mock_cloudformation_deprecated
def test_elastic_network_interfaces_cloudformation():
template = vpc_eni.template
template_json = json.dumps(template)
conn = boto.cloudformation.connect_to_region("us-west-1")
conn.create_stack(
"test_stack",
template_body=template_json,
)
ec2_conn = boto.ec2.connect_to_region("us-west-1")
eni = ec2_conn.get_all_network_interfaces()[0]
eni.private_ip_addresses.should.have.length_of(1)
stack = conn.describe_stacks()[0]
resources = stack.describe_resources()
cfn_eni = [resource for resource in resources if resource.resource_type ==
'AWS::EC2::NetworkInterface'][0]
cfn_eni.physical_resource_id.should.equal(eni.id)
outputs = {output.key: output.value for output in stack.outputs}
outputs['ENIIpAddress'].should.equal(eni.private_ip_addresses[0].private_ip_address)

File diff suppressed because it is too large Load diff

View file

@ -1,175 +1,216 @@
from __future__ import unicode_literals
import boto
import sure # noqa
from moto import mock_ec2_deprecated
@mock_ec2_deprecated
def test_default_network_acl_created_with_vpc():
conn = boto.connect_vpc('the_key', 'the secret')
vpc = conn.create_vpc("10.0.0.0/16")
all_network_acls = conn.get_all_network_acls()
all_network_acls.should.have.length_of(2)
@mock_ec2_deprecated
def test_network_acls():
conn = boto.connect_vpc('the_key', 'the secret')
vpc = conn.create_vpc("10.0.0.0/16")
network_acl = conn.create_network_acl(vpc.id)
all_network_acls = conn.get_all_network_acls()
all_network_acls.should.have.length_of(3)
@mock_ec2_deprecated
def test_new_subnet_associates_with_default_network_acl():
conn = boto.connect_vpc('the_key', 'the secret')
vpc = conn.get_all_vpcs()[0]
subnet = conn.create_subnet(vpc.id, "10.0.0.0/18")
all_network_acls = conn.get_all_network_acls()
all_network_acls.should.have.length_of(1)
acl = all_network_acls[0]
acl.associations.should.have.length_of(4)
[a.subnet_id for a in acl.associations].should.contain(subnet.id)
@mock_ec2_deprecated
def test_network_acl_entries():
conn = boto.connect_vpc('the_key', 'the secret')
vpc = conn.create_vpc("10.0.0.0/16")
network_acl = conn.create_network_acl(vpc.id)
network_acl_entry = conn.create_network_acl_entry(
network_acl.id, 110, 6,
'ALLOW', '0.0.0.0/0', False,
port_range_from='443',
port_range_to='443'
)
all_network_acls = conn.get_all_network_acls()
all_network_acls.should.have.length_of(3)
test_network_acl = next(na for na in all_network_acls
if na.id == network_acl.id)
entries = test_network_acl.network_acl_entries
entries.should.have.length_of(1)
entries[0].rule_number.should.equal('110')
entries[0].protocol.should.equal('6')
entries[0].rule_action.should.equal('ALLOW')
@mock_ec2_deprecated
def test_delete_network_acl_entry():
conn = boto.connect_vpc('the_key', 'the secret')
vpc = conn.create_vpc("10.0.0.0/16")
network_acl = conn.create_network_acl(vpc.id)
conn.create_network_acl_entry(
network_acl.id, 110, 6,
'ALLOW', '0.0.0.0/0', False,
port_range_from='443',
port_range_to='443'
)
conn.delete_network_acl_entry(
network_acl.id, 110, False
)
all_network_acls = conn.get_all_network_acls()
test_network_acl = next(na for na in all_network_acls
if na.id == network_acl.id)
entries = test_network_acl.network_acl_entries
entries.should.have.length_of(0)
@mock_ec2_deprecated
def test_replace_network_acl_entry():
conn = boto.connect_vpc('the_key', 'the secret')
vpc = conn.create_vpc("10.0.0.0/16")
network_acl = conn.create_network_acl(vpc.id)
conn.create_network_acl_entry(
network_acl.id, 110, 6,
'ALLOW', '0.0.0.0/0', False,
port_range_from='443',
port_range_to='443'
)
conn.replace_network_acl_entry(
network_acl.id, 110, -1,
'DENY', '0.0.0.0/0', False,
port_range_from='22',
port_range_to='22'
)
all_network_acls = conn.get_all_network_acls()
test_network_acl = next(na for na in all_network_acls
if na.id == network_acl.id)
entries = test_network_acl.network_acl_entries
entries.should.have.length_of(1)
entries[0].rule_number.should.equal('110')
entries[0].protocol.should.equal('-1')
entries[0].rule_action.should.equal('DENY')
@mock_ec2_deprecated
def test_associate_new_network_acl_with_subnet():
conn = boto.connect_vpc('the_key', 'the secret')
vpc = conn.create_vpc("10.0.0.0/16")
subnet = conn.create_subnet(vpc.id, "10.0.0.0/18")
network_acl = conn.create_network_acl(vpc.id)
conn.associate_network_acl(network_acl.id, subnet.id)
all_network_acls = conn.get_all_network_acls()
all_network_acls.should.have.length_of(3)
test_network_acl = next(na for na in all_network_acls
if na.id == network_acl.id)
test_network_acl.associations.should.have.length_of(1)
test_network_acl.associations[0].subnet_id.should.equal(subnet.id)
@mock_ec2_deprecated
def test_delete_network_acl():
conn = boto.connect_vpc('the_key', 'the secret')
vpc = conn.create_vpc("10.0.0.0/16")
subnet = conn.create_subnet(vpc.id, "10.0.0.0/18")
network_acl = conn.create_network_acl(vpc.id)
all_network_acls = conn.get_all_network_acls()
all_network_acls.should.have.length_of(3)
any(acl.id == network_acl.id for acl in all_network_acls).should.be.ok
conn.delete_network_acl(network_acl.id)
updated_network_acls = conn.get_all_network_acls()
updated_network_acls.should.have.length_of(2)
any(acl.id == network_acl.id for acl in updated_network_acls).shouldnt.be.ok
@mock_ec2_deprecated
def test_network_acl_tagging():
conn = boto.connect_vpc('the_key', 'the secret')
vpc = conn.create_vpc("10.0.0.0/16")
network_acl = conn.create_network_acl(vpc.id)
network_acl.add_tag("a key", "some value")
tag = conn.get_all_tags()[0]
tag.name.should.equal("a key")
tag.value.should.equal("some value")
all_network_acls = conn.get_all_network_acls()
test_network_acl = next(na for na in all_network_acls
if na.id == network_acl.id)
test_network_acl.tags.should.have.length_of(1)
test_network_acl.tags["a key"].should.equal("some value")
from __future__ import unicode_literals
import boto
import boto3
import sure # noqa
from moto import mock_ec2_deprecated, mock_ec2
@mock_ec2_deprecated
def test_default_network_acl_created_with_vpc():
conn = boto.connect_vpc('the_key', 'the secret')
vpc = conn.create_vpc("10.0.0.0/16")
all_network_acls = conn.get_all_network_acls()
all_network_acls.should.have.length_of(2)
@mock_ec2_deprecated
def test_network_acls():
conn = boto.connect_vpc('the_key', 'the secret')
vpc = conn.create_vpc("10.0.0.0/16")
network_acl = conn.create_network_acl(vpc.id)
all_network_acls = conn.get_all_network_acls()
all_network_acls.should.have.length_of(3)
@mock_ec2_deprecated
def test_new_subnet_associates_with_default_network_acl():
conn = boto.connect_vpc('the_key', 'the secret')
vpc = conn.get_all_vpcs()[0]
subnet = conn.create_subnet(vpc.id, "10.0.0.0/18")
all_network_acls = conn.get_all_network_acls()
all_network_acls.should.have.length_of(1)
acl = all_network_acls[0]
acl.associations.should.have.length_of(4)
[a.subnet_id for a in acl.associations].should.contain(subnet.id)
@mock_ec2_deprecated
def test_network_acl_entries():
conn = boto.connect_vpc('the_key', 'the secret')
vpc = conn.create_vpc("10.0.0.0/16")
network_acl = conn.create_network_acl(vpc.id)
network_acl_entry = conn.create_network_acl_entry(
network_acl.id, 110, 6,
'ALLOW', '0.0.0.0/0', False,
port_range_from='443',
port_range_to='443'
)
all_network_acls = conn.get_all_network_acls()
all_network_acls.should.have.length_of(3)
test_network_acl = next(na for na in all_network_acls
if na.id == network_acl.id)
entries = test_network_acl.network_acl_entries
entries.should.have.length_of(1)
entries[0].rule_number.should.equal('110')
entries[0].protocol.should.equal('6')
entries[0].rule_action.should.equal('ALLOW')
@mock_ec2_deprecated
def test_delete_network_acl_entry():
conn = boto.connect_vpc('the_key', 'the secret')
vpc = conn.create_vpc("10.0.0.0/16")
network_acl = conn.create_network_acl(vpc.id)
conn.create_network_acl_entry(
network_acl.id, 110, 6,
'ALLOW', '0.0.0.0/0', False,
port_range_from='443',
port_range_to='443'
)
conn.delete_network_acl_entry(
network_acl.id, 110, False
)
all_network_acls = conn.get_all_network_acls()
test_network_acl = next(na for na in all_network_acls
if na.id == network_acl.id)
entries = test_network_acl.network_acl_entries
entries.should.have.length_of(0)
@mock_ec2_deprecated
def test_replace_network_acl_entry():
conn = boto.connect_vpc('the_key', 'the secret')
vpc = conn.create_vpc("10.0.0.0/16")
network_acl = conn.create_network_acl(vpc.id)
conn.create_network_acl_entry(
network_acl.id, 110, 6,
'ALLOW', '0.0.0.0/0', False,
port_range_from='443',
port_range_to='443'
)
conn.replace_network_acl_entry(
network_acl.id, 110, -1,
'DENY', '0.0.0.0/0', False,
port_range_from='22',
port_range_to='22'
)
all_network_acls = conn.get_all_network_acls()
test_network_acl = next(na for na in all_network_acls
if na.id == network_acl.id)
entries = test_network_acl.network_acl_entries
entries.should.have.length_of(1)
entries[0].rule_number.should.equal('110')
entries[0].protocol.should.equal('-1')
entries[0].rule_action.should.equal('DENY')
@mock_ec2_deprecated
def test_associate_new_network_acl_with_subnet():
conn = boto.connect_vpc('the_key', 'the secret')
vpc = conn.create_vpc("10.0.0.0/16")
subnet = conn.create_subnet(vpc.id, "10.0.0.0/18")
network_acl = conn.create_network_acl(vpc.id)
conn.associate_network_acl(network_acl.id, subnet.id)
all_network_acls = conn.get_all_network_acls()
all_network_acls.should.have.length_of(3)
test_network_acl = next(na for na in all_network_acls
if na.id == network_acl.id)
test_network_acl.associations.should.have.length_of(1)
test_network_acl.associations[0].subnet_id.should.equal(subnet.id)
@mock_ec2_deprecated
def test_delete_network_acl():
conn = boto.connect_vpc('the_key', 'the secret')
vpc = conn.create_vpc("10.0.0.0/16")
subnet = conn.create_subnet(vpc.id, "10.0.0.0/18")
network_acl = conn.create_network_acl(vpc.id)
all_network_acls = conn.get_all_network_acls()
all_network_acls.should.have.length_of(3)
any(acl.id == network_acl.id for acl in all_network_acls).should.be.ok
conn.delete_network_acl(network_acl.id)
updated_network_acls = conn.get_all_network_acls()
updated_network_acls.should.have.length_of(2)
any(acl.id == network_acl.id for acl in updated_network_acls).shouldnt.be.ok
@mock_ec2_deprecated
def test_network_acl_tagging():
conn = boto.connect_vpc('the_key', 'the secret')
vpc = conn.create_vpc("10.0.0.0/16")
network_acl = conn.create_network_acl(vpc.id)
network_acl.add_tag("a key", "some value")
tag = conn.get_all_tags()[0]
tag.name.should.equal("a key")
tag.value.should.equal("some value")
all_network_acls = conn.get_all_network_acls()
test_network_acl = next(na for na in all_network_acls
if na.id == network_acl.id)
test_network_acl.tags.should.have.length_of(1)
test_network_acl.tags["a key"].should.equal("some value")
@mock_ec2
def test_new_subnet_in_new_vpc_associates_with_default_network_acl():
ec2 = boto3.resource('ec2', region_name='us-west-1')
new_vpc = ec2.create_vpc(CidrBlock='10.0.0.0/16')
new_vpc.reload()
subnet = ec2.create_subnet(VpcId=new_vpc.id, CidrBlock='10.0.0.0/24')
subnet.reload()
new_vpcs_default_network_acl = next(iter(new_vpc.network_acls.all()), None)
new_vpcs_default_network_acl.reload()
new_vpcs_default_network_acl.vpc_id.should.equal(new_vpc.id)
new_vpcs_default_network_acl.associations.should.have.length_of(1)
new_vpcs_default_network_acl.associations[0]['SubnetId'].should.equal(subnet.id)
@mock_ec2
def test_default_network_acl_default_entries():
ec2 = boto3.resource('ec2', region_name='us-west-1')
default_network_acl = next(iter(ec2.network_acls.all()), None)
default_network_acl.is_default.should.be.ok
default_network_acl.entries.should.have.length_of(4)
unique_entries = []
for entry in default_network_acl.entries:
entry['CidrBlock'].should.equal('0.0.0.0/0')
entry['Protocol'].should.equal('-1')
entry['RuleNumber'].should.be.within([100, 32767])
entry['RuleAction'].should.be.within(['allow', 'deny'])
assert type(entry['Egress']) is bool
if entry['RuleAction'] == 'allow':
entry['RuleNumber'].should.be.equal(100)
else:
entry['RuleNumber'].should.be.equal(32767)
if entry not in unique_entries:
unique_entries.append(entry)
unique_entries.should.have.length_of(4)

View file

@ -1,345 +1,387 @@
from __future__ import unicode_literals
import boto3
import sure # noqa
from moto import mock_ec2
def get_subnet_id(conn):
vpc = conn.create_vpc(CidrBlock="10.0.0.0/8")['Vpc']
subnet = conn.create_subnet(
VpcId=vpc['VpcId'], CidrBlock='10.0.0.0/16', AvailabilityZone='us-east-1a')['Subnet']
subnet_id = subnet['SubnetId']
return subnet_id
def spot_config(subnet_id, allocation_strategy="lowestPrice"):
return {
'ClientToken': 'string',
'SpotPrice': '0.12',
'TargetCapacity': 6,
'IamFleetRole': 'arn:aws:iam::123456789012:role/fleet',
'LaunchSpecifications': [{
'ImageId': 'ami-123',
'KeyName': 'my-key',
'SecurityGroups': [
{
'GroupId': 'sg-123'
},
],
'UserData': 'some user data',
'InstanceType': 't2.small',
'BlockDeviceMappings': [
{
'VirtualName': 'string',
'DeviceName': 'string',
'Ebs': {
'SnapshotId': 'string',
'VolumeSize': 123,
'DeleteOnTermination': True | False,
'VolumeType': 'standard',
'Iops': 123,
'Encrypted': True | False
},
'NoDevice': 'string'
},
],
'Monitoring': {
'Enabled': True
},
'SubnetId': subnet_id,
'IamInstanceProfile': {
'Arn': 'arn:aws:iam::123456789012:role/fleet'
},
'EbsOptimized': False,
'WeightedCapacity': 2.0,
'SpotPrice': '0.13'
}, {
'ImageId': 'ami-123',
'KeyName': 'my-key',
'SecurityGroups': [
{
'GroupId': 'sg-123'
},
],
'UserData': 'some user data',
'InstanceType': 't2.large',
'Monitoring': {
'Enabled': True
},
'SubnetId': subnet_id,
'IamInstanceProfile': {
'Arn': 'arn:aws:iam::123456789012:role/fleet'
},
'EbsOptimized': False,
'WeightedCapacity': 4.0,
'SpotPrice': '10.00',
}],
'AllocationStrategy': allocation_strategy,
'FulfilledCapacity': 6,
}
@mock_ec2
def test_create_spot_fleet_with_lowest_price():
conn = boto3.client("ec2", region_name='us-west-2')
subnet_id = get_subnet_id(conn)
spot_fleet_res = conn.request_spot_fleet(
SpotFleetRequestConfig=spot_config(subnet_id)
)
spot_fleet_id = spot_fleet_res['SpotFleetRequestId']
spot_fleet_requests = conn.describe_spot_fleet_requests(
SpotFleetRequestIds=[spot_fleet_id])['SpotFleetRequestConfigs']
len(spot_fleet_requests).should.equal(1)
spot_fleet_request = spot_fleet_requests[0]
spot_fleet_request['SpotFleetRequestState'].should.equal("active")
spot_fleet_config = spot_fleet_request['SpotFleetRequestConfig']
spot_fleet_config['SpotPrice'].should.equal('0.12')
spot_fleet_config['TargetCapacity'].should.equal(6)
spot_fleet_config['IamFleetRole'].should.equal(
'arn:aws:iam::123456789012:role/fleet')
spot_fleet_config['AllocationStrategy'].should.equal('lowestPrice')
spot_fleet_config['FulfilledCapacity'].should.equal(6.0)
len(spot_fleet_config['LaunchSpecifications']).should.equal(2)
launch_spec = spot_fleet_config['LaunchSpecifications'][0]
launch_spec['EbsOptimized'].should.equal(False)
launch_spec['SecurityGroups'].should.equal([{"GroupId": "sg-123"}])
launch_spec['IamInstanceProfile'].should.equal(
{"Arn": "arn:aws:iam::123456789012:role/fleet"})
launch_spec['ImageId'].should.equal("ami-123")
launch_spec['InstanceType'].should.equal("t2.small")
launch_spec['KeyName'].should.equal("my-key")
launch_spec['Monitoring'].should.equal({"Enabled": True})
launch_spec['SpotPrice'].should.equal("0.13")
launch_spec['SubnetId'].should.equal(subnet_id)
launch_spec['UserData'].should.equal("some user data")
launch_spec['WeightedCapacity'].should.equal(2.0)
instance_res = conn.describe_spot_fleet_instances(
SpotFleetRequestId=spot_fleet_id)
instances = instance_res['ActiveInstances']
len(instances).should.equal(3)
@mock_ec2
def test_create_diversified_spot_fleet():
conn = boto3.client("ec2", region_name='us-west-2')
subnet_id = get_subnet_id(conn)
diversified_config = spot_config(
subnet_id, allocation_strategy='diversified')
spot_fleet_res = conn.request_spot_fleet(
SpotFleetRequestConfig=diversified_config
)
spot_fleet_id = spot_fleet_res['SpotFleetRequestId']
instance_res = conn.describe_spot_fleet_instances(
SpotFleetRequestId=spot_fleet_id)
instances = instance_res['ActiveInstances']
len(instances).should.equal(2)
instance_types = set([instance['InstanceType'] for instance in instances])
instance_types.should.equal(set(["t2.small", "t2.large"]))
instances[0]['InstanceId'].should.contain("i-")
@mock_ec2
def test_cancel_spot_fleet_request():
conn = boto3.client("ec2", region_name='us-west-2')
subnet_id = get_subnet_id(conn)
spot_fleet_res = conn.request_spot_fleet(
SpotFleetRequestConfig=spot_config(subnet_id),
)
spot_fleet_id = spot_fleet_res['SpotFleetRequestId']
conn.cancel_spot_fleet_requests(
SpotFleetRequestIds=[spot_fleet_id], TerminateInstances=True)
spot_fleet_requests = conn.describe_spot_fleet_requests(
SpotFleetRequestIds=[spot_fleet_id])['SpotFleetRequestConfigs']
len(spot_fleet_requests).should.equal(0)
@mock_ec2
def test_modify_spot_fleet_request_up():
conn = boto3.client("ec2", region_name='us-west-2')
subnet_id = get_subnet_id(conn)
spot_fleet_res = conn.request_spot_fleet(
SpotFleetRequestConfig=spot_config(subnet_id),
)
spot_fleet_id = spot_fleet_res['SpotFleetRequestId']
conn.modify_spot_fleet_request(
SpotFleetRequestId=spot_fleet_id, TargetCapacity=20)
instance_res = conn.describe_spot_fleet_instances(
SpotFleetRequestId=spot_fleet_id)
instances = instance_res['ActiveInstances']
len(instances).should.equal(10)
spot_fleet_config = conn.describe_spot_fleet_requests(
SpotFleetRequestIds=[spot_fleet_id])['SpotFleetRequestConfigs'][0]['SpotFleetRequestConfig']
spot_fleet_config['TargetCapacity'].should.equal(20)
spot_fleet_config['FulfilledCapacity'].should.equal(20.0)
@mock_ec2
def test_modify_spot_fleet_request_up_diversified():
conn = boto3.client("ec2", region_name='us-west-2')
subnet_id = get_subnet_id(conn)
spot_fleet_res = conn.request_spot_fleet(
SpotFleetRequestConfig=spot_config(
subnet_id, allocation_strategy='diversified'),
)
spot_fleet_id = spot_fleet_res['SpotFleetRequestId']
conn.modify_spot_fleet_request(
SpotFleetRequestId=spot_fleet_id, TargetCapacity=19)
instance_res = conn.describe_spot_fleet_instances(
SpotFleetRequestId=spot_fleet_id)
instances = instance_res['ActiveInstances']
len(instances).should.equal(7)
spot_fleet_config = conn.describe_spot_fleet_requests(
SpotFleetRequestIds=[spot_fleet_id])['SpotFleetRequestConfigs'][0]['SpotFleetRequestConfig']
spot_fleet_config['TargetCapacity'].should.equal(19)
spot_fleet_config['FulfilledCapacity'].should.equal(20.0)
@mock_ec2
def test_modify_spot_fleet_request_down_no_terminate():
conn = boto3.client("ec2", region_name='us-west-2')
subnet_id = get_subnet_id(conn)
spot_fleet_res = conn.request_spot_fleet(
SpotFleetRequestConfig=spot_config(subnet_id),
)
spot_fleet_id = spot_fleet_res['SpotFleetRequestId']
conn.modify_spot_fleet_request(
SpotFleetRequestId=spot_fleet_id, TargetCapacity=1, ExcessCapacityTerminationPolicy="noTermination")
instance_res = conn.describe_spot_fleet_instances(
SpotFleetRequestId=spot_fleet_id)
instances = instance_res['ActiveInstances']
len(instances).should.equal(3)
spot_fleet_config = conn.describe_spot_fleet_requests(
SpotFleetRequestIds=[spot_fleet_id])['SpotFleetRequestConfigs'][0]['SpotFleetRequestConfig']
spot_fleet_config['TargetCapacity'].should.equal(1)
spot_fleet_config['FulfilledCapacity'].should.equal(6.0)
@mock_ec2
def test_modify_spot_fleet_request_down_odd():
conn = boto3.client("ec2", region_name='us-west-2')
subnet_id = get_subnet_id(conn)
spot_fleet_res = conn.request_spot_fleet(
SpotFleetRequestConfig=spot_config(subnet_id),
)
spot_fleet_id = spot_fleet_res['SpotFleetRequestId']
conn.modify_spot_fleet_request(
SpotFleetRequestId=spot_fleet_id, TargetCapacity=7)
conn.modify_spot_fleet_request(
SpotFleetRequestId=spot_fleet_id, TargetCapacity=5)
instance_res = conn.describe_spot_fleet_instances(
SpotFleetRequestId=spot_fleet_id)
instances = instance_res['ActiveInstances']
len(instances).should.equal(3)
spot_fleet_config = conn.describe_spot_fleet_requests(
SpotFleetRequestIds=[spot_fleet_id])['SpotFleetRequestConfigs'][0]['SpotFleetRequestConfig']
spot_fleet_config['TargetCapacity'].should.equal(5)
spot_fleet_config['FulfilledCapacity'].should.equal(6.0)
@mock_ec2
def test_modify_spot_fleet_request_down():
conn = boto3.client("ec2", region_name='us-west-2')
subnet_id = get_subnet_id(conn)
spot_fleet_res = conn.request_spot_fleet(
SpotFleetRequestConfig=spot_config(subnet_id),
)
spot_fleet_id = spot_fleet_res['SpotFleetRequestId']
conn.modify_spot_fleet_request(
SpotFleetRequestId=spot_fleet_id, TargetCapacity=1)
instance_res = conn.describe_spot_fleet_instances(
SpotFleetRequestId=spot_fleet_id)
instances = instance_res['ActiveInstances']
len(instances).should.equal(1)
spot_fleet_config = conn.describe_spot_fleet_requests(
SpotFleetRequestIds=[spot_fleet_id])['SpotFleetRequestConfigs'][0]['SpotFleetRequestConfig']
spot_fleet_config['TargetCapacity'].should.equal(1)
spot_fleet_config['FulfilledCapacity'].should.equal(2.0)
@mock_ec2
def test_modify_spot_fleet_request_down_no_terminate_after_custom_terminate():
conn = boto3.client("ec2", region_name='us-west-2')
subnet_id = get_subnet_id(conn)
spot_fleet_res = conn.request_spot_fleet(
SpotFleetRequestConfig=spot_config(subnet_id),
)
spot_fleet_id = spot_fleet_res['SpotFleetRequestId']
instance_res = conn.describe_spot_fleet_instances(
SpotFleetRequestId=spot_fleet_id)
instances = instance_res['ActiveInstances']
conn.terminate_instances(InstanceIds=[i['InstanceId'] for i in instances[1:]])
conn.modify_spot_fleet_request(
SpotFleetRequestId=spot_fleet_id, TargetCapacity=1, ExcessCapacityTerminationPolicy="noTermination")
instance_res = conn.describe_spot_fleet_instances(
SpotFleetRequestId=spot_fleet_id)
instances = instance_res['ActiveInstances']
len(instances).should.equal(1)
spot_fleet_config = conn.describe_spot_fleet_requests(
SpotFleetRequestIds=[spot_fleet_id])['SpotFleetRequestConfigs'][0]['SpotFleetRequestConfig']
spot_fleet_config['TargetCapacity'].should.equal(1)
spot_fleet_config['FulfilledCapacity'].should.equal(2.0)
@mock_ec2
def test_create_spot_fleet_without_spot_price():
conn = boto3.client("ec2", region_name='us-west-2')
subnet_id = get_subnet_id(conn)
# remove prices to force a fallback to ondemand price
spot_config_without_price = spot_config(subnet_id)
del spot_config_without_price['SpotPrice']
for spec in spot_config_without_price['LaunchSpecifications']:
del spec['SpotPrice']
spot_fleet_id = conn.request_spot_fleet(SpotFleetRequestConfig=spot_config_without_price)['SpotFleetRequestId']
spot_fleet_requests = conn.describe_spot_fleet_requests(
SpotFleetRequestIds=[spot_fleet_id])['SpotFleetRequestConfigs']
len(spot_fleet_requests).should.equal(1)
spot_fleet_request = spot_fleet_requests[0]
spot_fleet_config = spot_fleet_request['SpotFleetRequestConfig']
len(spot_fleet_config['LaunchSpecifications']).should.equal(2)
launch_spec1 = spot_fleet_config['LaunchSpecifications'][0]
launch_spec2 = spot_fleet_config['LaunchSpecifications'][1]
# AWS will figure out the price
assert 'SpotPrice' not in launch_spec1
assert 'SpotPrice' not in launch_spec2
from __future__ import unicode_literals
import boto3
import sure # noqa
from moto import mock_ec2
def get_subnet_id(conn):
vpc = conn.create_vpc(CidrBlock="10.0.0.0/8")['Vpc']
subnet = conn.create_subnet(
VpcId=vpc['VpcId'], CidrBlock='10.0.0.0/16', AvailabilityZone='us-east-1a')['Subnet']
subnet_id = subnet['SubnetId']
return subnet_id
def spot_config(subnet_id, allocation_strategy="lowestPrice"):
return {
'ClientToken': 'string',
'SpotPrice': '0.12',
'TargetCapacity': 6,
'IamFleetRole': 'arn:aws:iam::123456789012:role/fleet',
'LaunchSpecifications': [{
'ImageId': 'ami-123',
'KeyName': 'my-key',
'SecurityGroups': [
{
'GroupId': 'sg-123'
},
],
'UserData': 'some user data',
'InstanceType': 't2.small',
'BlockDeviceMappings': [
{
'VirtualName': 'string',
'DeviceName': 'string',
'Ebs': {
'SnapshotId': 'string',
'VolumeSize': 123,
'DeleteOnTermination': True | False,
'VolumeType': 'standard',
'Iops': 123,
'Encrypted': True | False
},
'NoDevice': 'string'
},
],
'Monitoring': {
'Enabled': True
},
'SubnetId': subnet_id,
'IamInstanceProfile': {
'Arn': 'arn:aws:iam::123456789012:role/fleet'
},
'EbsOptimized': False,
'WeightedCapacity': 2.0,
'SpotPrice': '0.13',
}, {
'ImageId': 'ami-123',
'KeyName': 'my-key',
'SecurityGroups': [
{
'GroupId': 'sg-123'
},
],
'UserData': 'some user data',
'InstanceType': 't2.large',
'Monitoring': {
'Enabled': True
},
'SubnetId': subnet_id,
'IamInstanceProfile': {
'Arn': 'arn:aws:iam::123456789012:role/fleet'
},
'EbsOptimized': False,
'WeightedCapacity': 4.0,
'SpotPrice': '10.00',
}],
'AllocationStrategy': allocation_strategy,
'FulfilledCapacity': 6,
}
@mock_ec2
def test_create_spot_fleet_with_lowest_price():
conn = boto3.client("ec2", region_name='us-west-2')
subnet_id = get_subnet_id(conn)
spot_fleet_res = conn.request_spot_fleet(
SpotFleetRequestConfig=spot_config(subnet_id)
)
spot_fleet_id = spot_fleet_res['SpotFleetRequestId']
spot_fleet_requests = conn.describe_spot_fleet_requests(
SpotFleetRequestIds=[spot_fleet_id])['SpotFleetRequestConfigs']
len(spot_fleet_requests).should.equal(1)
spot_fleet_request = spot_fleet_requests[0]
spot_fleet_request['SpotFleetRequestState'].should.equal("active")
spot_fleet_config = spot_fleet_request['SpotFleetRequestConfig']
spot_fleet_config['SpotPrice'].should.equal('0.12')
spot_fleet_config['TargetCapacity'].should.equal(6)
spot_fleet_config['IamFleetRole'].should.equal(
'arn:aws:iam::123456789012:role/fleet')
spot_fleet_config['AllocationStrategy'].should.equal('lowestPrice')
spot_fleet_config['FulfilledCapacity'].should.equal(6.0)
len(spot_fleet_config['LaunchSpecifications']).should.equal(2)
launch_spec = spot_fleet_config['LaunchSpecifications'][0]
launch_spec['EbsOptimized'].should.equal(False)
launch_spec['SecurityGroups'].should.equal([{"GroupId": "sg-123"}])
launch_spec['IamInstanceProfile'].should.equal(
{"Arn": "arn:aws:iam::123456789012:role/fleet"})
launch_spec['ImageId'].should.equal("ami-123")
launch_spec['InstanceType'].should.equal("t2.small")
launch_spec['KeyName'].should.equal("my-key")
launch_spec['Monitoring'].should.equal({"Enabled": True})
launch_spec['SpotPrice'].should.equal("0.13")
launch_spec['SubnetId'].should.equal(subnet_id)
launch_spec['UserData'].should.equal("some user data")
launch_spec['WeightedCapacity'].should.equal(2.0)
instance_res = conn.describe_spot_fleet_instances(
SpotFleetRequestId=spot_fleet_id)
instances = instance_res['ActiveInstances']
len(instances).should.equal(3)
@mock_ec2
def test_create_diversified_spot_fleet():
conn = boto3.client("ec2", region_name='us-west-2')
subnet_id = get_subnet_id(conn)
diversified_config = spot_config(
subnet_id, allocation_strategy='diversified')
spot_fleet_res = conn.request_spot_fleet(
SpotFleetRequestConfig=diversified_config
)
spot_fleet_id = spot_fleet_res['SpotFleetRequestId']
instance_res = conn.describe_spot_fleet_instances(
SpotFleetRequestId=spot_fleet_id)
instances = instance_res['ActiveInstances']
len(instances).should.equal(2)
instance_types = set([instance['InstanceType'] for instance in instances])
instance_types.should.equal(set(["t2.small", "t2.large"]))
instances[0]['InstanceId'].should.contain("i-")
@mock_ec2
def test_create_spot_fleet_request_with_tag_spec():
conn = boto3.client("ec2", region_name='us-west-2')
subnet_id = get_subnet_id(conn)
tag_spec = [
{
'ResourceType': 'instance',
'Tags': [
{
'Key': 'tag-1',
'Value': 'foo',
},
{
'Key': 'tag-2',
'Value': 'bar',
},
]
},
]
config = spot_config(subnet_id)
config['LaunchSpecifications'][0]['TagSpecifications'] = tag_spec
spot_fleet_res = conn.request_spot_fleet(
SpotFleetRequestConfig=config
)
spot_fleet_id = spot_fleet_res['SpotFleetRequestId']
spot_fleet_requests = conn.describe_spot_fleet_requests(
SpotFleetRequestIds=[spot_fleet_id])['SpotFleetRequestConfigs']
spot_fleet_config = spot_fleet_requests[0]['SpotFleetRequestConfig']
spot_fleet_config['LaunchSpecifications'][0]['TagSpecifications'][0][
'ResourceType'].should.equal('instance')
for tag in tag_spec[0]['Tags']:
spot_fleet_config['LaunchSpecifications'][0]['TagSpecifications'][0]['Tags'].should.contain(tag)
instance_res = conn.describe_spot_fleet_instances(
SpotFleetRequestId=spot_fleet_id)
instances = conn.describe_instances(InstanceIds=[i['InstanceId'] for i in instance_res['ActiveInstances']])
for instance in instances['Reservations'][0]['Instances']:
for tag in tag_spec[0]['Tags']:
instance['Tags'].should.contain(tag)
@mock_ec2
def test_cancel_spot_fleet_request():
conn = boto3.client("ec2", region_name='us-west-2')
subnet_id = get_subnet_id(conn)
spot_fleet_res = conn.request_spot_fleet(
SpotFleetRequestConfig=spot_config(subnet_id),
)
spot_fleet_id = spot_fleet_res['SpotFleetRequestId']
conn.cancel_spot_fleet_requests(
SpotFleetRequestIds=[spot_fleet_id], TerminateInstances=True)
spot_fleet_requests = conn.describe_spot_fleet_requests(
SpotFleetRequestIds=[spot_fleet_id])['SpotFleetRequestConfigs']
len(spot_fleet_requests).should.equal(0)
@mock_ec2
def test_modify_spot_fleet_request_up():
conn = boto3.client("ec2", region_name='us-west-2')
subnet_id = get_subnet_id(conn)
spot_fleet_res = conn.request_spot_fleet(
SpotFleetRequestConfig=spot_config(subnet_id),
)
spot_fleet_id = spot_fleet_res['SpotFleetRequestId']
conn.modify_spot_fleet_request(
SpotFleetRequestId=spot_fleet_id, TargetCapacity=20)
instance_res = conn.describe_spot_fleet_instances(
SpotFleetRequestId=spot_fleet_id)
instances = instance_res['ActiveInstances']
len(instances).should.equal(10)
spot_fleet_config = conn.describe_spot_fleet_requests(
SpotFleetRequestIds=[spot_fleet_id])['SpotFleetRequestConfigs'][0]['SpotFleetRequestConfig']
spot_fleet_config['TargetCapacity'].should.equal(20)
spot_fleet_config['FulfilledCapacity'].should.equal(20.0)
@mock_ec2
def test_modify_spot_fleet_request_up_diversified():
conn = boto3.client("ec2", region_name='us-west-2')
subnet_id = get_subnet_id(conn)
spot_fleet_res = conn.request_spot_fleet(
SpotFleetRequestConfig=spot_config(
subnet_id, allocation_strategy='diversified'),
)
spot_fleet_id = spot_fleet_res['SpotFleetRequestId']
conn.modify_spot_fleet_request(
SpotFleetRequestId=spot_fleet_id, TargetCapacity=19)
instance_res = conn.describe_spot_fleet_instances(
SpotFleetRequestId=spot_fleet_id)
instances = instance_res['ActiveInstances']
len(instances).should.equal(7)
spot_fleet_config = conn.describe_spot_fleet_requests(
SpotFleetRequestIds=[spot_fleet_id])['SpotFleetRequestConfigs'][0]['SpotFleetRequestConfig']
spot_fleet_config['TargetCapacity'].should.equal(19)
spot_fleet_config['FulfilledCapacity'].should.equal(20.0)
@mock_ec2
def test_modify_spot_fleet_request_down_no_terminate():
conn = boto3.client("ec2", region_name='us-west-2')
subnet_id = get_subnet_id(conn)
spot_fleet_res = conn.request_spot_fleet(
SpotFleetRequestConfig=spot_config(subnet_id),
)
spot_fleet_id = spot_fleet_res['SpotFleetRequestId']
conn.modify_spot_fleet_request(
SpotFleetRequestId=spot_fleet_id, TargetCapacity=1, ExcessCapacityTerminationPolicy="noTermination")
instance_res = conn.describe_spot_fleet_instances(
SpotFleetRequestId=spot_fleet_id)
instances = instance_res['ActiveInstances']
len(instances).should.equal(3)
spot_fleet_config = conn.describe_spot_fleet_requests(
SpotFleetRequestIds=[spot_fleet_id])['SpotFleetRequestConfigs'][0]['SpotFleetRequestConfig']
spot_fleet_config['TargetCapacity'].should.equal(1)
spot_fleet_config['FulfilledCapacity'].should.equal(6.0)
@mock_ec2
def test_modify_spot_fleet_request_down_odd():
conn = boto3.client("ec2", region_name='us-west-2')
subnet_id = get_subnet_id(conn)
spot_fleet_res = conn.request_spot_fleet(
SpotFleetRequestConfig=spot_config(subnet_id),
)
spot_fleet_id = spot_fleet_res['SpotFleetRequestId']
conn.modify_spot_fleet_request(
SpotFleetRequestId=spot_fleet_id, TargetCapacity=7)
conn.modify_spot_fleet_request(
SpotFleetRequestId=spot_fleet_id, TargetCapacity=5)
instance_res = conn.describe_spot_fleet_instances(
SpotFleetRequestId=spot_fleet_id)
instances = instance_res['ActiveInstances']
len(instances).should.equal(3)
spot_fleet_config = conn.describe_spot_fleet_requests(
SpotFleetRequestIds=[spot_fleet_id])['SpotFleetRequestConfigs'][0]['SpotFleetRequestConfig']
spot_fleet_config['TargetCapacity'].should.equal(5)
spot_fleet_config['FulfilledCapacity'].should.equal(6.0)
@mock_ec2
def test_modify_spot_fleet_request_down():
conn = boto3.client("ec2", region_name='us-west-2')
subnet_id = get_subnet_id(conn)
spot_fleet_res = conn.request_spot_fleet(
SpotFleetRequestConfig=spot_config(subnet_id),
)
spot_fleet_id = spot_fleet_res['SpotFleetRequestId']
conn.modify_spot_fleet_request(
SpotFleetRequestId=spot_fleet_id, TargetCapacity=1)
instance_res = conn.describe_spot_fleet_instances(
SpotFleetRequestId=spot_fleet_id)
instances = instance_res['ActiveInstances']
len(instances).should.equal(1)
spot_fleet_config = conn.describe_spot_fleet_requests(
SpotFleetRequestIds=[spot_fleet_id])['SpotFleetRequestConfigs'][0]['SpotFleetRequestConfig']
spot_fleet_config['TargetCapacity'].should.equal(1)
spot_fleet_config['FulfilledCapacity'].should.equal(2.0)
@mock_ec2
def test_modify_spot_fleet_request_down_no_terminate_after_custom_terminate():
conn = boto3.client("ec2", region_name='us-west-2')
subnet_id = get_subnet_id(conn)
spot_fleet_res = conn.request_spot_fleet(
SpotFleetRequestConfig=spot_config(subnet_id),
)
spot_fleet_id = spot_fleet_res['SpotFleetRequestId']
instance_res = conn.describe_spot_fleet_instances(
SpotFleetRequestId=spot_fleet_id)
instances = instance_res['ActiveInstances']
conn.terminate_instances(InstanceIds=[i['InstanceId'] for i in instances[1:]])
conn.modify_spot_fleet_request(
SpotFleetRequestId=spot_fleet_id, TargetCapacity=1, ExcessCapacityTerminationPolicy="noTermination")
instance_res = conn.describe_spot_fleet_instances(
SpotFleetRequestId=spot_fleet_id)
instances = instance_res['ActiveInstances']
len(instances).should.equal(1)
spot_fleet_config = conn.describe_spot_fleet_requests(
SpotFleetRequestIds=[spot_fleet_id])['SpotFleetRequestConfigs'][0]['SpotFleetRequestConfig']
spot_fleet_config['TargetCapacity'].should.equal(1)
spot_fleet_config['FulfilledCapacity'].should.equal(2.0)
@mock_ec2
def test_create_spot_fleet_without_spot_price():
conn = boto3.client("ec2", region_name='us-west-2')
subnet_id = get_subnet_id(conn)
# remove prices to force a fallback to ondemand price
spot_config_without_price = spot_config(subnet_id)
del spot_config_without_price['SpotPrice']
for spec in spot_config_without_price['LaunchSpecifications']:
del spec['SpotPrice']
spot_fleet_id = conn.request_spot_fleet(SpotFleetRequestConfig=spot_config_without_price)['SpotFleetRequestId']
spot_fleet_requests = conn.describe_spot_fleet_requests(
SpotFleetRequestIds=[spot_fleet_id])['SpotFleetRequestConfigs']
len(spot_fleet_requests).should.equal(1)
spot_fleet_request = spot_fleet_requests[0]
spot_fleet_config = spot_fleet_request['SpotFleetRequestConfig']
len(spot_fleet_config['LaunchSpecifications']).should.equal(2)
launch_spec1 = spot_fleet_config['LaunchSpecifications'][0]
launch_spec2 = spot_fleet_config['LaunchSpecifications'][1]
# AWS will figure out the price
assert 'SpotPrice' not in launch_spec1
assert 'SpotPrice' not in launch_spec2

View file

@ -1,453 +1,482 @@
from __future__ import unicode_literals
from nose.tools import assert_raises
import itertools
import boto
import boto3
from boto.exception import EC2ResponseError
from boto.ec2.instance import Reservation
import sure # noqa
from moto import mock_ec2_deprecated, mock_ec2
from nose.tools import assert_raises
@mock_ec2_deprecated
def test_add_tag():
conn = boto.connect_ec2('the_key', 'the_secret')
reservation = conn.run_instances('ami-1234abcd')
instance = reservation.instances[0]
with assert_raises(EC2ResponseError) as ex:
instance.add_tag("a key", "some value", dry_run=True)
ex.exception.error_code.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal(
'An error occurred (DryRunOperation) when calling the CreateTags operation: Request would have succeeded, but DryRun flag is set')
instance.add_tag("a key", "some value")
chain = itertools.chain.from_iterable
existing_instances = list(
chain([res.instances for res in conn.get_all_instances()]))
existing_instances.should.have.length_of(1)
existing_instance = existing_instances[0]
existing_instance.tags["a key"].should.equal("some value")
@mock_ec2_deprecated
def test_remove_tag():
conn = boto.connect_ec2('the_key', 'the_secret')
reservation = conn.run_instances('ami-1234abcd')
instance = reservation.instances[0]
instance.add_tag("a key", "some value")
tags = conn.get_all_tags()
tag = tags[0]
tag.name.should.equal("a key")
tag.value.should.equal("some value")
with assert_raises(EC2ResponseError) as ex:
instance.remove_tag("a key", dry_run=True)
ex.exception.error_code.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal(
'An error occurred (DryRunOperation) when calling the DeleteTags operation: Request would have succeeded, but DryRun flag is set')
instance.remove_tag("a key")
conn.get_all_tags().should.have.length_of(0)
instance.add_tag("a key", "some value")
conn.get_all_tags().should.have.length_of(1)
instance.remove_tag("a key", "some value")
@mock_ec2_deprecated
def test_get_all_tags():
conn = boto.connect_ec2('the_key', 'the_secret')
reservation = conn.run_instances('ami-1234abcd')
instance = reservation.instances[0]
instance.add_tag("a key", "some value")
tags = conn.get_all_tags()
tag = tags[0]
tag.name.should.equal("a key")
tag.value.should.equal("some value")
@mock_ec2_deprecated
def test_get_all_tags_with_special_characters():
conn = boto.connect_ec2('the_key', 'the_secret')
reservation = conn.run_instances('ami-1234abcd')
instance = reservation.instances[0]
instance.add_tag("a key", "some<> value")
tags = conn.get_all_tags()
tag = tags[0]
tag.name.should.equal("a key")
tag.value.should.equal("some<> value")
@mock_ec2_deprecated
def test_create_tags():
conn = boto.connect_ec2('the_key', 'the_secret')
reservation = conn.run_instances('ami-1234abcd')
instance = reservation.instances[0]
tag_dict = {'a key': 'some value',
'another key': 'some other value',
'blank key': ''}
with assert_raises(EC2ResponseError) as ex:
conn.create_tags(instance.id, tag_dict, dry_run=True)
ex.exception.error_code.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal(
'An error occurred (DryRunOperation) when calling the CreateTags operation: Request would have succeeded, but DryRun flag is set')
conn.create_tags(instance.id, tag_dict)
tags = conn.get_all_tags()
set([key for key in tag_dict]).should.equal(
set([tag.name for tag in tags]))
set([tag_dict[key] for key in tag_dict]).should.equal(
set([tag.value for tag in tags]))
@mock_ec2_deprecated
def test_tag_limit_exceeded():
conn = boto.connect_ec2('the_key', 'the_secret')
reservation = conn.run_instances('ami-1234abcd')
instance = reservation.instances[0]
tag_dict = {}
for i in range(51):
tag_dict['{0:02d}'.format(i + 1)] = ''
with assert_raises(EC2ResponseError) as cm:
conn.create_tags(instance.id, tag_dict)
cm.exception.code.should.equal('TagLimitExceeded')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
instance.add_tag("a key", "a value")
with assert_raises(EC2ResponseError) as cm:
conn.create_tags(instance.id, tag_dict)
cm.exception.code.should.equal('TagLimitExceeded')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
tags = conn.get_all_tags()
tag = tags[0]
tags.should.have.length_of(1)
tag.name.should.equal("a key")
tag.value.should.equal("a value")
@mock_ec2_deprecated
def test_invalid_parameter_tag_null():
conn = boto.connect_ec2('the_key', 'the_secret')
reservation = conn.run_instances('ami-1234abcd')
instance = reservation.instances[0]
with assert_raises(EC2ResponseError) as cm:
instance.add_tag("a key", None)
cm.exception.code.should.equal('InvalidParameterValue')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
@mock_ec2_deprecated
def test_invalid_id():
conn = boto.connect_ec2('the_key', 'the_secret')
with assert_raises(EC2ResponseError) as cm:
conn.create_tags('ami-blah', {'key': 'tag'})
cm.exception.code.should.equal('InvalidID')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
with assert_raises(EC2ResponseError) as cm:
conn.create_tags('blah-blah', {'key': 'tag'})
cm.exception.code.should.equal('InvalidID')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
@mock_ec2_deprecated
def test_get_all_tags_resource_id_filter():
conn = boto.connect_ec2('the_key', 'the_secret')
reservation = conn.run_instances('ami-1234abcd')
instance = reservation.instances[0]
instance.add_tag("an instance key", "some value")
image_id = conn.create_image(instance.id, "test-ami", "this is a test ami")
image = conn.get_image(image_id)
image.add_tag("an image key", "some value")
tags = conn.get_all_tags(filters={'resource-id': instance.id})
tag = tags[0]
tags.should.have.length_of(1)
tag.res_id.should.equal(instance.id)
tag.res_type.should.equal('instance')
tag.name.should.equal("an instance key")
tag.value.should.equal("some value")
tags = conn.get_all_tags(filters={'resource-id': image_id})
tag = tags[0]
tags.should.have.length_of(1)
tag.res_id.should.equal(image_id)
tag.res_type.should.equal('image')
tag.name.should.equal("an image key")
tag.value.should.equal("some value")
@mock_ec2_deprecated
def test_get_all_tags_resource_type_filter():
conn = boto.connect_ec2('the_key', 'the_secret')
reservation = conn.run_instances('ami-1234abcd')
instance = reservation.instances[0]
instance.add_tag("an instance key", "some value")
image_id = conn.create_image(instance.id, "test-ami", "this is a test ami")
image = conn.get_image(image_id)
image.add_tag("an image key", "some value")
tags = conn.get_all_tags(filters={'resource-type': 'instance'})
tag = tags[0]
tags.should.have.length_of(1)
tag.res_id.should.equal(instance.id)
tag.res_type.should.equal('instance')
tag.name.should.equal("an instance key")
tag.value.should.equal("some value")
tags = conn.get_all_tags(filters={'resource-type': 'image'})
tag = tags[0]
tags.should.have.length_of(1)
tag.res_id.should.equal(image_id)
tag.res_type.should.equal('image')
tag.name.should.equal("an image key")
tag.value.should.equal("some value")
@mock_ec2_deprecated
def test_get_all_tags_key_filter():
conn = boto.connect_ec2('the_key', 'the_secret')
reservation = conn.run_instances('ami-1234abcd')
instance = reservation.instances[0]
instance.add_tag("an instance key", "some value")
image_id = conn.create_image(instance.id, "test-ami", "this is a test ami")
image = conn.get_image(image_id)
image.add_tag("an image key", "some value")
tags = conn.get_all_tags(filters={'key': 'an instance key'})
tag = tags[0]
tags.should.have.length_of(1)
tag.res_id.should.equal(instance.id)
tag.res_type.should.equal('instance')
tag.name.should.equal("an instance key")
tag.value.should.equal("some value")
@mock_ec2_deprecated
def test_get_all_tags_value_filter():
conn = boto.connect_ec2('the_key', 'the_secret')
reservation = conn.run_instances('ami-1234abcd')
instance = reservation.instances[0]
instance.add_tag("an instance key", "some value")
reservation_b = conn.run_instances('ami-1234abcd')
instance_b = reservation_b.instances[0]
instance_b.add_tag("an instance key", "some other value")
reservation_c = conn.run_instances('ami-1234abcd')
instance_c = reservation_c.instances[0]
instance_c.add_tag("an instance key", "other value*")
reservation_d = conn.run_instances('ami-1234abcd')
instance_d = reservation_d.instances[0]
instance_d.add_tag("an instance key", "other value**")
reservation_e = conn.run_instances('ami-1234abcd')
instance_e = reservation_e.instances[0]
instance_e.add_tag("an instance key", "other value*?")
image_id = conn.create_image(instance.id, "test-ami", "this is a test ami")
image = conn.get_image(image_id)
image.add_tag("an image key", "some value")
tags = conn.get_all_tags(filters={'value': 'some value'})
tags.should.have.length_of(2)
tags = conn.get_all_tags(filters={'value': 'some*value'})
tags.should.have.length_of(3)
tags = conn.get_all_tags(filters={'value': '*some*value'})
tags.should.have.length_of(3)
tags = conn.get_all_tags(filters={'value': '*some*value*'})
tags.should.have.length_of(3)
tags = conn.get_all_tags(filters={'value': '*value\*'})
tags.should.have.length_of(1)
tags = conn.get_all_tags(filters={'value': '*value\*\*'})
tags.should.have.length_of(1)
tags = conn.get_all_tags(filters={'value': '*value\*\?'})
tags.should.have.length_of(1)
@mock_ec2_deprecated
def test_retrieved_instances_must_contain_their_tags():
tag_key = 'Tag name'
tag_value = 'Tag value'
tags_to_be_set = {tag_key: tag_value}
conn = boto.connect_ec2('the_key', 'the_secret')
reservation = conn.run_instances('ami-1234abcd')
reservation.should.be.a(Reservation)
reservation.instances.should.have.length_of(1)
instance = reservation.instances[0]
reservations = conn.get_all_instances()
reservations.should.have.length_of(1)
reservations[0].id.should.equal(reservation.id)
instances = reservations[0].instances
instances.should.have.length_of(1)
instances[0].id.should.equal(instance.id)
conn.create_tags([instance.id], tags_to_be_set)
reservations = conn.get_all_instances()
instance = reservations[0].instances[0]
retrieved_tags = instance.tags
# Cleanup of instance
conn.terminate_instances([instances[0].id])
# Check whether tag is present with correct value
retrieved_tags[tag_key].should.equal(tag_value)
@mock_ec2_deprecated
def test_retrieved_volumes_must_contain_their_tags():
tag_key = 'Tag name'
tag_value = 'Tag value'
tags_to_be_set = {tag_key: tag_value}
conn = boto.connect_ec2('the_key', 'the_secret')
volume = conn.create_volume(80, "us-east-1a")
all_volumes = conn.get_all_volumes()
volume = all_volumes[0]
conn.create_tags([volume.id], tags_to_be_set)
# Fetch the volume again
all_volumes = conn.get_all_volumes()
volume = all_volumes[0]
retrieved_tags = volume.tags
volume.delete()
# Check whether tag is present with correct value
retrieved_tags[tag_key].should.equal(tag_value)
@mock_ec2_deprecated
def test_retrieved_snapshots_must_contain_their_tags():
tag_key = 'Tag name'
tag_value = 'Tag value'
tags_to_be_set = {tag_key: tag_value}
conn = boto.connect_ec2(aws_access_key_id='the_key',
aws_secret_access_key='the_secret')
volume = conn.create_volume(80, "eu-west-1a")
snapshot = conn.create_snapshot(volume.id)
conn.create_tags([snapshot.id], tags_to_be_set)
# Fetch the snapshot again
all_snapshots = conn.get_all_snapshots()
snapshot = [item for item in all_snapshots if item.id == snapshot.id][0]
retrieved_tags = snapshot.tags
conn.delete_snapshot(snapshot.id)
volume.delete()
# Check whether tag is present with correct value
retrieved_tags[tag_key].should.equal(tag_value)
@mock_ec2_deprecated
def test_filter_instances_by_wildcard_tags():
conn = boto.connect_ec2(aws_access_key_id='the_key',
aws_secret_access_key='the_secret')
reservation = conn.run_instances('ami-1234abcd')
instance_a = reservation.instances[0]
instance_a.add_tag("Key1", "Value1")
reservation_b = conn.run_instances('ami-1234abcd')
instance_b = reservation_b.instances[0]
instance_b.add_tag("Key1", "Value2")
reservations = conn.get_all_instances(filters={'tag:Key1': 'Value*'})
reservations.should.have.length_of(2)
reservations = conn.get_all_instances(filters={'tag-key': 'Key*'})
reservations.should.have.length_of(2)
reservations = conn.get_all_instances(filters={'tag-value': 'Value*'})
reservations.should.have.length_of(2)
@mock_ec2
def test_create_volume_with_tags():
client = boto3.client('ec2', 'us-west-2')
response = client.create_volume(
AvailabilityZone='us-west-2',
Encrypted=False,
Size=40,
TagSpecifications=[
{
'ResourceType': 'volume',
'Tags': [
{
'Key': 'TEST_TAG',
'Value': 'TEST_VALUE'
}
],
}
]
)
assert response['Tags'][0]['Key'] == 'TEST_TAG'
@mock_ec2
def test_create_snapshot_with_tags():
client = boto3.client('ec2', 'us-west-2')
volume_id = client.create_volume(
AvailabilityZone='us-west-2',
Encrypted=False,
Size=40,
TagSpecifications=[
{
'ResourceType': 'volume',
'Tags': [
{
'Key': 'TEST_TAG',
'Value': 'TEST_VALUE'
}
],
}
]
)['VolumeId']
snapshot = client.create_snapshot(
VolumeId=volume_id,
TagSpecifications=[
{
'ResourceType': 'snapshot',
'Tags': [
{
'Key': 'TEST_SNAPSHOT_TAG',
'Value': 'TEST_SNAPSHOT_VALUE'
}
],
}
]
)
expected_tags = [{
'Key': 'TEST_SNAPSHOT_TAG',
'Value': 'TEST_SNAPSHOT_VALUE'
}]
assert snapshot['Tags'] == expected_tags
from __future__ import unicode_literals
from nose.tools import assert_raises
import itertools
import boto
import boto3
from botocore.exceptions import ClientError
from boto.exception import EC2ResponseError
from boto.ec2.instance import Reservation
import sure # noqa
from moto import mock_ec2_deprecated, mock_ec2
from nose.tools import assert_raises
@mock_ec2_deprecated
def test_add_tag():
conn = boto.connect_ec2('the_key', 'the_secret')
reservation = conn.run_instances('ami-1234abcd')
instance = reservation.instances[0]
with assert_raises(EC2ResponseError) as ex:
instance.add_tag("a key", "some value", dry_run=True)
ex.exception.error_code.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal(
'An error occurred (DryRunOperation) when calling the CreateTags operation: Request would have succeeded, but DryRun flag is set')
instance.add_tag("a key", "some value")
chain = itertools.chain.from_iterable
existing_instances = list(
chain([res.instances for res in conn.get_all_instances()]))
existing_instances.should.have.length_of(1)
existing_instance = existing_instances[0]
existing_instance.tags["a key"].should.equal("some value")
@mock_ec2_deprecated
def test_remove_tag():
conn = boto.connect_ec2('the_key', 'the_secret')
reservation = conn.run_instances('ami-1234abcd')
instance = reservation.instances[0]
instance.add_tag("a key", "some value")
tags = conn.get_all_tags()
tag = tags[0]
tag.name.should.equal("a key")
tag.value.should.equal("some value")
with assert_raises(EC2ResponseError) as ex:
instance.remove_tag("a key", dry_run=True)
ex.exception.error_code.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal(
'An error occurred (DryRunOperation) when calling the DeleteTags operation: Request would have succeeded, but DryRun flag is set')
instance.remove_tag("a key")
conn.get_all_tags().should.have.length_of(0)
instance.add_tag("a key", "some value")
conn.get_all_tags().should.have.length_of(1)
instance.remove_tag("a key", "some value")
@mock_ec2_deprecated
def test_get_all_tags():
conn = boto.connect_ec2('the_key', 'the_secret')
reservation = conn.run_instances('ami-1234abcd')
instance = reservation.instances[0]
instance.add_tag("a key", "some value")
tags = conn.get_all_tags()
tag = tags[0]
tag.name.should.equal("a key")
tag.value.should.equal("some value")
@mock_ec2_deprecated
def test_get_all_tags_with_special_characters():
conn = boto.connect_ec2('the_key', 'the_secret')
reservation = conn.run_instances('ami-1234abcd')
instance = reservation.instances[0]
instance.add_tag("a key", "some<> value")
tags = conn.get_all_tags()
tag = tags[0]
tag.name.should.equal("a key")
tag.value.should.equal("some<> value")
@mock_ec2_deprecated
def test_create_tags():
conn = boto.connect_ec2('the_key', 'the_secret')
reservation = conn.run_instances('ami-1234abcd')
instance = reservation.instances[0]
tag_dict = {'a key': 'some value',
'another key': 'some other value',
'blank key': ''}
with assert_raises(EC2ResponseError) as ex:
conn.create_tags(instance.id, tag_dict, dry_run=True)
ex.exception.error_code.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal(
'An error occurred (DryRunOperation) when calling the CreateTags operation: Request would have succeeded, but DryRun flag is set')
conn.create_tags(instance.id, tag_dict)
tags = conn.get_all_tags()
set([key for key in tag_dict]).should.equal(
set([tag.name for tag in tags]))
set([tag_dict[key] for key in tag_dict]).should.equal(
set([tag.value for tag in tags]))
@mock_ec2_deprecated
def test_tag_limit_exceeded():
conn = boto.connect_ec2('the_key', 'the_secret')
reservation = conn.run_instances('ami-1234abcd')
instance = reservation.instances[0]
tag_dict = {}
for i in range(51):
tag_dict['{0:02d}'.format(i + 1)] = ''
with assert_raises(EC2ResponseError) as cm:
conn.create_tags(instance.id, tag_dict)
cm.exception.code.should.equal('TagLimitExceeded')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
instance.add_tag("a key", "a value")
with assert_raises(EC2ResponseError) as cm:
conn.create_tags(instance.id, tag_dict)
cm.exception.code.should.equal('TagLimitExceeded')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
tags = conn.get_all_tags()
tag = tags[0]
tags.should.have.length_of(1)
tag.name.should.equal("a key")
tag.value.should.equal("a value")
@mock_ec2_deprecated
def test_invalid_parameter_tag_null():
conn = boto.connect_ec2('the_key', 'the_secret')
reservation = conn.run_instances('ami-1234abcd')
instance = reservation.instances[0]
with assert_raises(EC2ResponseError) as cm:
instance.add_tag("a key", None)
cm.exception.code.should.equal('InvalidParameterValue')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
@mock_ec2_deprecated
def test_invalid_id():
conn = boto.connect_ec2('the_key', 'the_secret')
with assert_raises(EC2ResponseError) as cm:
conn.create_tags('ami-blah', {'key': 'tag'})
cm.exception.code.should.equal('InvalidID')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
with assert_raises(EC2ResponseError) as cm:
conn.create_tags('blah-blah', {'key': 'tag'})
cm.exception.code.should.equal('InvalidID')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
@mock_ec2_deprecated
def test_get_all_tags_resource_id_filter():
conn = boto.connect_ec2('the_key', 'the_secret')
reservation = conn.run_instances('ami-1234abcd')
instance = reservation.instances[0]
instance.add_tag("an instance key", "some value")
image_id = conn.create_image(instance.id, "test-ami", "this is a test ami")
image = conn.get_image(image_id)
image.add_tag("an image key", "some value")
tags = conn.get_all_tags(filters={'resource-id': instance.id})
tag = tags[0]
tags.should.have.length_of(1)
tag.res_id.should.equal(instance.id)
tag.res_type.should.equal('instance')
tag.name.should.equal("an instance key")
tag.value.should.equal("some value")
tags = conn.get_all_tags(filters={'resource-id': image_id})
tag = tags[0]
tags.should.have.length_of(1)
tag.res_id.should.equal(image_id)
tag.res_type.should.equal('image')
tag.name.should.equal("an image key")
tag.value.should.equal("some value")
@mock_ec2_deprecated
def test_get_all_tags_resource_type_filter():
conn = boto.connect_ec2('the_key', 'the_secret')
reservation = conn.run_instances('ami-1234abcd')
instance = reservation.instances[0]
instance.add_tag("an instance key", "some value")
image_id = conn.create_image(instance.id, "test-ami", "this is a test ami")
image = conn.get_image(image_id)
image.add_tag("an image key", "some value")
tags = conn.get_all_tags(filters={'resource-type': 'instance'})
tag = tags[0]
tags.should.have.length_of(1)
tag.res_id.should.equal(instance.id)
tag.res_type.should.equal('instance')
tag.name.should.equal("an instance key")
tag.value.should.equal("some value")
tags = conn.get_all_tags(filters={'resource-type': 'image'})
tag = tags[0]
tags.should.have.length_of(1)
tag.res_id.should.equal(image_id)
tag.res_type.should.equal('image')
tag.name.should.equal("an image key")
tag.value.should.equal("some value")
@mock_ec2_deprecated
def test_get_all_tags_key_filter():
conn = boto.connect_ec2('the_key', 'the_secret')
reservation = conn.run_instances('ami-1234abcd')
instance = reservation.instances[0]
instance.add_tag("an instance key", "some value")
image_id = conn.create_image(instance.id, "test-ami", "this is a test ami")
image = conn.get_image(image_id)
image.add_tag("an image key", "some value")
tags = conn.get_all_tags(filters={'key': 'an instance key'})
tag = tags[0]
tags.should.have.length_of(1)
tag.res_id.should.equal(instance.id)
tag.res_type.should.equal('instance')
tag.name.should.equal("an instance key")
tag.value.should.equal("some value")
@mock_ec2_deprecated
def test_get_all_tags_value_filter():
conn = boto.connect_ec2('the_key', 'the_secret')
reservation = conn.run_instances('ami-1234abcd')
instance = reservation.instances[0]
instance.add_tag("an instance key", "some value")
reservation_b = conn.run_instances('ami-1234abcd')
instance_b = reservation_b.instances[0]
instance_b.add_tag("an instance key", "some other value")
reservation_c = conn.run_instances('ami-1234abcd')
instance_c = reservation_c.instances[0]
instance_c.add_tag("an instance key", "other value*")
reservation_d = conn.run_instances('ami-1234abcd')
instance_d = reservation_d.instances[0]
instance_d.add_tag("an instance key", "other value**")
reservation_e = conn.run_instances('ami-1234abcd')
instance_e = reservation_e.instances[0]
instance_e.add_tag("an instance key", "other value*?")
image_id = conn.create_image(instance.id, "test-ami", "this is a test ami")
image = conn.get_image(image_id)
image.add_tag("an image key", "some value")
tags = conn.get_all_tags(filters={'value': 'some value'})
tags.should.have.length_of(2)
tags = conn.get_all_tags(filters={'value': 'some*value'})
tags.should.have.length_of(3)
tags = conn.get_all_tags(filters={'value': '*some*value'})
tags.should.have.length_of(3)
tags = conn.get_all_tags(filters={'value': '*some*value*'})
tags.should.have.length_of(3)
tags = conn.get_all_tags(filters={'value': '*value\*'})
tags.should.have.length_of(1)
tags = conn.get_all_tags(filters={'value': '*value\*\*'})
tags.should.have.length_of(1)
tags = conn.get_all_tags(filters={'value': '*value\*\?'})
tags.should.have.length_of(1)
@mock_ec2_deprecated
def test_retrieved_instances_must_contain_their_tags():
tag_key = 'Tag name'
tag_value = 'Tag value'
tags_to_be_set = {tag_key: tag_value}
conn = boto.connect_ec2('the_key', 'the_secret')
reservation = conn.run_instances('ami-1234abcd')
reservation.should.be.a(Reservation)
reservation.instances.should.have.length_of(1)
instance = reservation.instances[0]
reservations = conn.get_all_instances()
reservations.should.have.length_of(1)
reservations[0].id.should.equal(reservation.id)
instances = reservations[0].instances
instances.should.have.length_of(1)
instances[0].id.should.equal(instance.id)
conn.create_tags([instance.id], tags_to_be_set)
reservations = conn.get_all_instances()
instance = reservations[0].instances[0]
retrieved_tags = instance.tags
# Cleanup of instance
conn.terminate_instances([instances[0].id])
# Check whether tag is present with correct value
retrieved_tags[tag_key].should.equal(tag_value)
@mock_ec2_deprecated
def test_retrieved_volumes_must_contain_their_tags():
tag_key = 'Tag name'
tag_value = 'Tag value'
tags_to_be_set = {tag_key: tag_value}
conn = boto.connect_ec2('the_key', 'the_secret')
volume = conn.create_volume(80, "us-east-1a")
all_volumes = conn.get_all_volumes()
volume = all_volumes[0]
conn.create_tags([volume.id], tags_to_be_set)
# Fetch the volume again
all_volumes = conn.get_all_volumes()
volume = all_volumes[0]
retrieved_tags = volume.tags
volume.delete()
# Check whether tag is present with correct value
retrieved_tags[tag_key].should.equal(tag_value)
@mock_ec2_deprecated
def test_retrieved_snapshots_must_contain_their_tags():
tag_key = 'Tag name'
tag_value = 'Tag value'
tags_to_be_set = {tag_key: tag_value}
conn = boto.connect_ec2(aws_access_key_id='the_key',
aws_secret_access_key='the_secret')
volume = conn.create_volume(80, "eu-west-1a")
snapshot = conn.create_snapshot(volume.id)
conn.create_tags([snapshot.id], tags_to_be_set)
# Fetch the snapshot again
all_snapshots = conn.get_all_snapshots()
snapshot = [item for item in all_snapshots if item.id == snapshot.id][0]
retrieved_tags = snapshot.tags
conn.delete_snapshot(snapshot.id)
volume.delete()
# Check whether tag is present with correct value
retrieved_tags[tag_key].should.equal(tag_value)
@mock_ec2_deprecated
def test_filter_instances_by_wildcard_tags():
conn = boto.connect_ec2(aws_access_key_id='the_key',
aws_secret_access_key='the_secret')
reservation = conn.run_instances('ami-1234abcd')
instance_a = reservation.instances[0]
instance_a.add_tag("Key1", "Value1")
reservation_b = conn.run_instances('ami-1234abcd')
instance_b = reservation_b.instances[0]
instance_b.add_tag("Key1", "Value2")
reservations = conn.get_all_instances(filters={'tag:Key1': 'Value*'})
reservations.should.have.length_of(2)
reservations = conn.get_all_instances(filters={'tag-key': 'Key*'})
reservations.should.have.length_of(2)
reservations = conn.get_all_instances(filters={'tag-value': 'Value*'})
reservations.should.have.length_of(2)
@mock_ec2
def test_create_volume_with_tags():
client = boto3.client('ec2', 'us-west-2')
response = client.create_volume(
AvailabilityZone='us-west-2',
Encrypted=False,
Size=40,
TagSpecifications=[
{
'ResourceType': 'volume',
'Tags': [
{
'Key': 'TEST_TAG',
'Value': 'TEST_VALUE'
}
],
}
]
)
assert response['Tags'][0]['Key'] == 'TEST_TAG'
@mock_ec2
def test_create_snapshot_with_tags():
client = boto3.client('ec2', 'us-west-2')
volume_id = client.create_volume(
AvailabilityZone='us-west-2',
Encrypted=False,
Size=40,
TagSpecifications=[
{
'ResourceType': 'volume',
'Tags': [
{
'Key': 'TEST_TAG',
'Value': 'TEST_VALUE'
}
],
}
]
)['VolumeId']
snapshot = client.create_snapshot(
VolumeId=volume_id,
TagSpecifications=[
{
'ResourceType': 'snapshot',
'Tags': [
{
'Key': 'TEST_SNAPSHOT_TAG',
'Value': 'TEST_SNAPSHOT_VALUE'
}
],
}
]
)
expected_tags = [{
'Key': 'TEST_SNAPSHOT_TAG',
'Value': 'TEST_SNAPSHOT_VALUE'
}]
assert snapshot['Tags'] == expected_tags
@mock_ec2
def test_create_tag_empty_resource():
# create ec2 client in us-west-1
client = boto3.client('ec2', region_name='us-west-1')
# create tag with empty resource
with assert_raises(ClientError) as ex:
client.create_tags(
Resources=[],
Tags=[{'Key': 'Value'}]
)
ex.exception.response['Error']['Code'].should.equal('MissingParameter')
ex.exception.response['Error']['Message'].should.equal('The request must contain the parameter resourceIdSet')
@mock_ec2
def test_delete_tag_empty_resource():
# create ec2 client in us-west-1
client = boto3.client('ec2', region_name='us-west-1')
# delete tag with empty resource
with assert_raises(ClientError) as ex:
client.delete_tags(
Resources=[],
Tags=[{'Key': 'Value'}]
)
ex.exception.response['Error']['Code'].should.equal('MissingParameter')
ex.exception.response['Error']['Message'].should.equal('The request must contain the parameter resourceIdSet')

View file

@ -1,132 +1,133 @@
from __future__ import unicode_literals
# Ensure 'assert_raises' context manager support for Python 2.6
import tests.backport_assert_raises
from nose.tools import assert_raises
from moto.ec2.exceptions import EC2ClientError
from botocore.exceptions import ClientError
import boto3
import boto
from boto.exception import EC2ResponseError
import sure # noqa
from moto import mock_ec2, mock_ec2_deprecated
from tests.helpers import requires_boto_gte
@requires_boto_gte("2.32.0")
@mock_ec2_deprecated
def test_vpc_peering_connections():
conn = boto.connect_vpc('the_key', 'the_secret')
vpc = conn.create_vpc("10.0.0.0/16")
peer_vpc = conn.create_vpc("11.0.0.0/16")
vpc_pcx = conn.create_vpc_peering_connection(vpc.id, peer_vpc.id)
vpc_pcx._status.code.should.equal('initiating-request')
return vpc_pcx
@requires_boto_gte("2.32.0")
@mock_ec2_deprecated
def test_vpc_peering_connections_get_all():
conn = boto.connect_vpc('the_key', 'the_secret')
vpc_pcx = test_vpc_peering_connections()
vpc_pcx._status.code.should.equal('initiating-request')
all_vpc_pcxs = conn.get_all_vpc_peering_connections()
all_vpc_pcxs.should.have.length_of(1)
all_vpc_pcxs[0]._status.code.should.equal('pending-acceptance')
@requires_boto_gte("2.32.0")
@mock_ec2_deprecated
def test_vpc_peering_connections_accept():
conn = boto.connect_vpc('the_key', 'the_secret')
vpc_pcx = test_vpc_peering_connections()
vpc_pcx = conn.accept_vpc_peering_connection(vpc_pcx.id)
vpc_pcx._status.code.should.equal('active')
with assert_raises(EC2ResponseError) as cm:
conn.reject_vpc_peering_connection(vpc_pcx.id)
cm.exception.code.should.equal('InvalidStateTransition')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
all_vpc_pcxs = conn.get_all_vpc_peering_connections()
all_vpc_pcxs.should.have.length_of(1)
all_vpc_pcxs[0]._status.code.should.equal('active')
@requires_boto_gte("2.32.0")
@mock_ec2_deprecated
def test_vpc_peering_connections_reject():
conn = boto.connect_vpc('the_key', 'the_secret')
vpc_pcx = test_vpc_peering_connections()
verdict = conn.reject_vpc_peering_connection(vpc_pcx.id)
verdict.should.equal(True)
with assert_raises(EC2ResponseError) as cm:
conn.accept_vpc_peering_connection(vpc_pcx.id)
cm.exception.code.should.equal('InvalidStateTransition')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
all_vpc_pcxs = conn.get_all_vpc_peering_connections()
all_vpc_pcxs.should.have.length_of(1)
all_vpc_pcxs[0]._status.code.should.equal('rejected')
@requires_boto_gte("2.32.1")
@mock_ec2_deprecated
def test_vpc_peering_connections_delete():
conn = boto.connect_vpc('the_key', 'the_secret')
vpc_pcx = test_vpc_peering_connections()
verdict = vpc_pcx.delete()
verdict.should.equal(True)
all_vpc_pcxs = conn.get_all_vpc_peering_connections()
all_vpc_pcxs.should.have.length_of(0)
with assert_raises(EC2ResponseError) as cm:
conn.delete_vpc_peering_connection("pcx-1234abcd")
cm.exception.code.should.equal('InvalidVpcPeeringConnectionId.NotFound')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
@mock_ec2
def test_vpc_peering_connections_cross_region():
# create vpc in us-west-1 and ap-northeast-1
ec2_usw1 = boto3.resource('ec2', region_name='us-west-1')
vpc_usw1 = ec2_usw1.create_vpc(CidrBlock='10.90.0.0/16')
ec2_apn1 = boto3.resource('ec2', region_name='ap-northeast-1')
vpc_apn1 = ec2_apn1.create_vpc(CidrBlock='10.20.0.0/16')
# create peering
vpc_pcx = ec2_usw1.create_vpc_peering_connection(
VpcId=vpc_usw1.id,
PeerVpcId=vpc_apn1.id,
PeerRegion='ap-northeast-1',
)
vpc_pcx.status['Code'].should.equal('initiating-request')
vpc_pcx.requester_vpc.id.should.equal(vpc_usw1.id)
vpc_pcx.accepter_vpc.id.should.equal(vpc_apn1.id)
@mock_ec2
def test_vpc_peering_connections_cross_region_fail():
# create vpc in us-west-1 and ap-northeast-1
ec2_usw1 = boto3.resource('ec2', region_name='us-west-1')
vpc_usw1 = ec2_usw1.create_vpc(CidrBlock='10.90.0.0/16')
ec2_apn1 = boto3.resource('ec2', region_name='ap-northeast-1')
vpc_apn1 = ec2_apn1.create_vpc(CidrBlock='10.20.0.0/16')
# create peering wrong region with no vpc
with assert_raises(ClientError) as cm:
ec2_usw1.create_vpc_peering_connection(
VpcId=vpc_usw1.id,
PeerVpcId=vpc_apn1.id,
PeerRegion='ap-northeast-2')
cm.exception.response['Error']['Code'].should.equal('InvalidVpcID.NotFound')
from __future__ import unicode_literals
# Ensure 'assert_raises' context manager support for Python 2.6
import tests.backport_assert_raises
from nose.tools import assert_raises
from moto.ec2.exceptions import EC2ClientError
from botocore.exceptions import ClientError
import boto3
import boto
from boto.exception import EC2ResponseError
import sure # noqa
from moto import mock_ec2, mock_ec2_deprecated
from tests.helpers import requires_boto_gte
@requires_boto_gte("2.32.0")
@mock_ec2_deprecated
def test_vpc_peering_connections():
conn = boto.connect_vpc('the_key', 'the_secret')
vpc = conn.create_vpc("10.0.0.0/16")
peer_vpc = conn.create_vpc("11.0.0.0/16")
vpc_pcx = conn.create_vpc_peering_connection(vpc.id, peer_vpc.id)
vpc_pcx._status.code.should.equal('initiating-request')
return vpc_pcx
@requires_boto_gte("2.32.0")
@mock_ec2_deprecated
def test_vpc_peering_connections_get_all():
conn = boto.connect_vpc('the_key', 'the_secret')
vpc_pcx = test_vpc_peering_connections()
vpc_pcx._status.code.should.equal('initiating-request')
all_vpc_pcxs = conn.get_all_vpc_peering_connections()
all_vpc_pcxs.should.have.length_of(1)
all_vpc_pcxs[0]._status.code.should.equal('pending-acceptance')
@requires_boto_gte("2.32.0")
@mock_ec2_deprecated
def test_vpc_peering_connections_accept():
conn = boto.connect_vpc('the_key', 'the_secret')
vpc_pcx = test_vpc_peering_connections()
vpc_pcx = conn.accept_vpc_peering_connection(vpc_pcx.id)
vpc_pcx._status.code.should.equal('active')
with assert_raises(EC2ResponseError) as cm:
conn.reject_vpc_peering_connection(vpc_pcx.id)
cm.exception.code.should.equal('InvalidStateTransition')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
all_vpc_pcxs = conn.get_all_vpc_peering_connections()
all_vpc_pcxs.should.have.length_of(1)
all_vpc_pcxs[0]._status.code.should.equal('active')
@requires_boto_gte("2.32.0")
@mock_ec2_deprecated
def test_vpc_peering_connections_reject():
conn = boto.connect_vpc('the_key', 'the_secret')
vpc_pcx = test_vpc_peering_connections()
verdict = conn.reject_vpc_peering_connection(vpc_pcx.id)
verdict.should.equal(True)
with assert_raises(EC2ResponseError) as cm:
conn.accept_vpc_peering_connection(vpc_pcx.id)
cm.exception.code.should.equal('InvalidStateTransition')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
all_vpc_pcxs = conn.get_all_vpc_peering_connections()
all_vpc_pcxs.should.have.length_of(1)
all_vpc_pcxs[0]._status.code.should.equal('rejected')
@requires_boto_gte("2.32.1")
@mock_ec2_deprecated
def test_vpc_peering_connections_delete():
conn = boto.connect_vpc('the_key', 'the_secret')
vpc_pcx = test_vpc_peering_connections()
verdict = vpc_pcx.delete()
verdict.should.equal(True)
all_vpc_pcxs = conn.get_all_vpc_peering_connections()
all_vpc_pcxs.should.have.length_of(1)
all_vpc_pcxs[0]._status.code.should.equal('deleted')
with assert_raises(EC2ResponseError) as cm:
conn.delete_vpc_peering_connection("pcx-1234abcd")
cm.exception.code.should.equal('InvalidVpcPeeringConnectionId.NotFound')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
@mock_ec2
def test_vpc_peering_connections_cross_region():
# create vpc in us-west-1 and ap-northeast-1
ec2_usw1 = boto3.resource('ec2', region_name='us-west-1')
vpc_usw1 = ec2_usw1.create_vpc(CidrBlock='10.90.0.0/16')
ec2_apn1 = boto3.resource('ec2', region_name='ap-northeast-1')
vpc_apn1 = ec2_apn1.create_vpc(CidrBlock='10.20.0.0/16')
# create peering
vpc_pcx = ec2_usw1.create_vpc_peering_connection(
VpcId=vpc_usw1.id,
PeerVpcId=vpc_apn1.id,
PeerRegion='ap-northeast-1',
)
vpc_pcx.status['Code'].should.equal('initiating-request')
vpc_pcx.requester_vpc.id.should.equal(vpc_usw1.id)
vpc_pcx.accepter_vpc.id.should.equal(vpc_apn1.id)
@mock_ec2
def test_vpc_peering_connections_cross_region_fail():
# create vpc in us-west-1 and ap-northeast-1
ec2_usw1 = boto3.resource('ec2', region_name='us-west-1')
vpc_usw1 = ec2_usw1.create_vpc(CidrBlock='10.90.0.0/16')
ec2_apn1 = boto3.resource('ec2', region_name='ap-northeast-1')
vpc_apn1 = ec2_apn1.create_vpc(CidrBlock='10.20.0.0/16')
# create peering wrong region with no vpc
with assert_raises(ClientError) as cm:
ec2_usw1.create_vpc_peering_connection(
VpcId=vpc_usw1.id,
PeerVpcId=vpc_apn1.id,
PeerRegion='ap-northeast-2')
cm.exception.response['Error']['Code'].should.equal('InvalidVpcID.NotFound')