Merge remote-tracking branch 'upstream/master'

This commit is contained in:
Stephan 2019-05-28 08:55:50 +02:00
commit d0de38601d
110 changed files with 22566 additions and 20240 deletions

View file

15
tests/test_ec2/helpers.py Normal file
View file

@ -0,0 +1,15 @@
import six
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
def rsa_check_private_key(private_key_material):
assert isinstance(private_key_material, six.string_types)
private_key = serialization.load_pem_private_key(
data=private_key_material.encode('ascii'),
backend=default_backend(),
password=None)
assert isinstance(private_key, rsa.RSAPrivateKey)

View file

@ -16,7 +16,7 @@ from moto import mock_ec2_deprecated, mock_ec2
@mock_ec2_deprecated
def test_create_and_delete_volume():
conn = boto.connect_ec2('the_key', 'the_secret')
conn = boto.ec2.connect_to_region("us-east-1")
volume = conn.create_volume(80, "us-east-1a")
all_volumes = conn.get_all_volumes()
@ -52,7 +52,7 @@ def test_create_and_delete_volume():
@mock_ec2_deprecated
def test_create_encrypted_volume_dryrun():
conn = boto.connect_ec2('the_key', 'the_secret')
conn = boto.ec2.connect_to_region("us-east-1")
with assert_raises(EC2ResponseError) as ex:
conn.create_volume(80, "us-east-1a", encrypted=True, dry_run=True)
ex.exception.error_code.should.equal('DryRunOperation')
@ -63,7 +63,7 @@ def test_create_encrypted_volume_dryrun():
@mock_ec2_deprecated
def test_create_encrypted_volume():
conn = boto.connect_ec2('the_key', 'the_secret')
conn = boto.ec2.connect_to_region("us-east-1")
volume = conn.create_volume(80, "us-east-1a", encrypted=True)
with assert_raises(EC2ResponseError) as ex:
@ -79,7 +79,7 @@ def test_create_encrypted_volume():
@mock_ec2_deprecated
def test_filter_volume_by_id():
conn = boto.connect_ec2('the_key', 'the_secret')
conn = boto.ec2.connect_to_region("us-east-1")
volume1 = conn.create_volume(80, "us-east-1a")
volume2 = conn.create_volume(36, "us-east-1b")
volume3 = conn.create_volume(20, "us-east-1c")
@ -99,7 +99,7 @@ def test_filter_volume_by_id():
@mock_ec2_deprecated
def test_volume_filters():
conn = boto.connect_ec2('the_key', 'the_secret')
conn = boto.ec2.connect_to_region("us-east-1")
reservation = conn.run_instances('ami-1234abcd')
instance = reservation.instances[0]
@ -196,7 +196,7 @@ def test_volume_filters():
@mock_ec2_deprecated
def test_volume_attach_and_detach():
conn = boto.connect_ec2('the_key', 'the_secret')
conn = boto.ec2.connect_to_region("us-east-1")
reservation = conn.run_instances('ami-1234abcd')
instance = reservation.instances[0]
volume = conn.create_volume(80, "us-east-1a")
@ -252,7 +252,7 @@ def test_volume_attach_and_detach():
@mock_ec2_deprecated
def test_create_snapshot():
conn = boto.connect_ec2('the_key', 'the_secret')
conn = boto.ec2.connect_to_region("us-east-1")
volume = conn.create_volume(80, "us-east-1a")
with assert_raises(EC2ResponseError) as ex:
@ -291,7 +291,7 @@ def test_create_snapshot():
@mock_ec2_deprecated
def test_create_encrypted_snapshot():
conn = boto.connect_ec2('the_key', 'the_secret')
conn = boto.ec2.connect_to_region("us-east-1")
volume = conn.create_volume(80, "us-east-1a", encrypted=True)
snapshot = volume.create_snapshot('a test snapshot')
snapshot.update()
@ -306,7 +306,7 @@ def test_create_encrypted_snapshot():
@mock_ec2_deprecated
def test_filter_snapshot_by_id():
conn = boto.connect_ec2('the_key', 'the_secret')
conn = boto.ec2.connect_to_region("us-east-1")
volume1 = conn.create_volume(36, "us-east-1a")
snap1 = volume1.create_snapshot('a test snapshot 1')
volume2 = conn.create_volume(42, 'us-east-1a')
@ -333,7 +333,7 @@ def test_filter_snapshot_by_id():
@mock_ec2_deprecated
def test_snapshot_filters():
conn = boto.connect_ec2('the_key', 'the_secret')
conn = boto.ec2.connect_to_region("us-east-1")
volume1 = conn.create_volume(20, "us-east-1a", encrypted=False)
volume2 = conn.create_volume(25, "us-east-1a", encrypted=True)
@ -394,12 +394,17 @@ def test_snapshot_filters():
set([snap.id for snap in snapshots_by_encrypted]
).should.equal({snapshot3.id})
snapshots_by_owner_id = conn.get_all_snapshots(
filters={'owner-id': '123456789012'})
set([snap.id for snap in snapshots_by_owner_id]
).should.equal({snapshot1.id, snapshot2.id, snapshot3.id})
@mock_ec2_deprecated
def test_snapshot_attribute():
import copy
conn = boto.connect_ec2('the_key', 'the_secret')
conn = boto.ec2.connect_to_region("us-east-1")
volume = conn.create_volume(80, "us-east-1a")
snapshot = volume.create_snapshot()
@ -502,7 +507,7 @@ def test_snapshot_attribute():
@mock_ec2_deprecated
def test_create_volume_from_snapshot():
conn = boto.connect_ec2('the_key', 'the_secret')
conn = boto.ec2.connect_to_region("us-east-1")
volume = conn.create_volume(80, "us-east-1a")
snapshot = volume.create_snapshot('a test snapshot')
@ -524,7 +529,7 @@ def test_create_volume_from_snapshot():
@mock_ec2_deprecated
def test_create_volume_from_encrypted_snapshot():
conn = boto.connect_ec2('the_key', 'the_secret')
conn = boto.ec2.connect_to_region("us-east-1")
volume = conn.create_volume(80, "us-east-1a", encrypted=True)
snapshot = volume.create_snapshot('a test snapshot')
@ -569,7 +574,7 @@ def test_modify_attribute_blockDeviceMapping():
@mock_ec2_deprecated
def test_volume_tag_escaping():
conn = boto.connect_ec2('the_key', 'the_secret')
conn = boto.ec2.connect_to_region("us-east-1")
vol = conn.create_volume(10, 'us-east-1a')
snapshot = conn.create_snapshot(vol.id, 'Desc')

View file

@ -42,7 +42,7 @@ def test_add_servers():
@freeze_time("2014-01-01 05:00:00")
@mock_ec2_deprecated
def test_instance_launch_and_terminate():
conn = boto.connect_ec2('the_key', 'the_secret')
conn = boto.ec2.connect_to_region("us-east-1")
with assert_raises(EC2ResponseError) as ex:
reservation = conn.run_instances('ami-1234abcd', dry_run=True)
@ -820,7 +820,7 @@ def test_run_instance_with_instance_type():
@mock_ec2_deprecated
def test_run_instance_with_default_placement():
conn = boto.connect_ec2('the_key', 'the_secret')
conn = boto.ec2.connect_to_region("us-east-1")
reservation = conn.run_instances('ami-1234abcd')
instance = reservation.instances[0]

View file

@ -1,151 +1,224 @@
from __future__ import unicode_literals
# Ensure 'assert_raises' context manager support for Python 2.6
import tests.backport_assert_raises
from nose.tools import assert_raises
import boto
import six
import sure # noqa
from boto.exception import EC2ResponseError
from moto import mock_ec2_deprecated
@mock_ec2_deprecated
def test_key_pairs_empty():
conn = boto.connect_ec2('the_key', 'the_secret')
assert len(conn.get_all_key_pairs()) == 0
@mock_ec2_deprecated
def test_key_pairs_invalid_id():
conn = boto.connect_ec2('the_key', 'the_secret')
with assert_raises(EC2ResponseError) as cm:
conn.get_all_key_pairs('foo')
cm.exception.code.should.equal('InvalidKeyPair.NotFound')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
@mock_ec2_deprecated
def test_key_pairs_create():
conn = boto.connect_ec2('the_key', 'the_secret')
with assert_raises(EC2ResponseError) as ex:
kp = conn.create_key_pair('foo', dry_run=True)
ex.exception.error_code.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal(
'An error occurred (DryRunOperation) when calling the CreateKeyPair operation: Request would have succeeded, but DryRun flag is set')
kp = conn.create_key_pair('foo')
assert kp.material.startswith('---- BEGIN RSA PRIVATE KEY ----')
kps = conn.get_all_key_pairs()
assert len(kps) == 1
assert kps[0].name == 'foo'
@mock_ec2_deprecated
def test_key_pairs_create_two():
conn = boto.connect_ec2('the_key', 'the_secret')
kp = conn.create_key_pair('foo')
kp = conn.create_key_pair('bar')
assert kp.material.startswith('---- BEGIN RSA PRIVATE KEY ----')
kps = conn.get_all_key_pairs()
kps.should.have.length_of(2)
[i.name for i in kps].should.contain('foo')
[i.name for i in kps].should.contain('bar')
kps = conn.get_all_key_pairs('foo')
kps.should.have.length_of(1)
kps[0].name.should.equal('foo')
@mock_ec2_deprecated
def test_key_pairs_create_exist():
conn = boto.connect_ec2('the_key', 'the_secret')
kp = conn.create_key_pair('foo')
assert kp.material.startswith('---- BEGIN RSA PRIVATE KEY ----')
assert len(conn.get_all_key_pairs()) == 1
with assert_raises(EC2ResponseError) as cm:
conn.create_key_pair('foo')
cm.exception.code.should.equal('InvalidKeyPair.Duplicate')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
@mock_ec2_deprecated
def test_key_pairs_delete_no_exist():
conn = boto.connect_ec2('the_key', 'the_secret')
assert len(conn.get_all_key_pairs()) == 0
r = conn.delete_key_pair('foo')
r.should.be.ok
@mock_ec2_deprecated
def test_key_pairs_delete_exist():
conn = boto.connect_ec2('the_key', 'the_secret')
conn.create_key_pair('foo')
with assert_raises(EC2ResponseError) as ex:
r = conn.delete_key_pair('foo', dry_run=True)
ex.exception.error_code.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal(
'An error occurred (DryRunOperation) when calling the DeleteKeyPair operation: Request would have succeeded, but DryRun flag is set')
r = conn.delete_key_pair('foo')
r.should.be.ok
assert len(conn.get_all_key_pairs()) == 0
@mock_ec2_deprecated
def test_key_pairs_import():
conn = boto.connect_ec2('the_key', 'the_secret')
with assert_raises(EC2ResponseError) as ex:
kp = conn.import_key_pair('foo', b'content', dry_run=True)
ex.exception.error_code.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal(
'An error occurred (DryRunOperation) when calling the ImportKeyPair operation: Request would have succeeded, but DryRun flag is set')
kp = conn.import_key_pair('foo', b'content')
assert kp.name == 'foo'
kps = conn.get_all_key_pairs()
assert len(kps) == 1
assert kps[0].name == 'foo'
@mock_ec2_deprecated
def test_key_pairs_import_exist():
conn = boto.connect_ec2('the_key', 'the_secret')
kp = conn.import_key_pair('foo', b'content')
assert kp.name == 'foo'
assert len(conn.get_all_key_pairs()) == 1
with assert_raises(EC2ResponseError) as cm:
conn.create_key_pair('foo')
cm.exception.code.should.equal('InvalidKeyPair.Duplicate')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
@mock_ec2_deprecated
def test_key_pair_filters():
conn = boto.connect_ec2('the_key', 'the_secret')
_ = conn.create_key_pair('kpfltr1')
kp2 = conn.create_key_pair('kpfltr2')
kp3 = conn.create_key_pair('kpfltr3')
kp_by_name = conn.get_all_key_pairs(
filters={'key-name': 'kpfltr2'})
set([kp.name for kp in kp_by_name]
).should.equal(set([kp2.name]))
kp_by_name = conn.get_all_key_pairs(
filters={'fingerprint': kp3.fingerprint})
set([kp.name for kp in kp_by_name]
).should.equal(set([kp3.name]))
from __future__ import unicode_literals
# Ensure 'assert_raises' context manager support for Python 2.6
import tests.backport_assert_raises
from nose.tools import assert_raises
import boto
import sure # noqa
from boto.exception import EC2ResponseError
from moto import mock_ec2_deprecated
from .helpers import rsa_check_private_key
RSA_PUBLIC_KEY_OPENSSH = b"""\
ssh-rsa \
AAAAB3NzaC1yc2EAAAADAQABAAABAQDusXfgTE4eBP50NglSzCSEGnIL6+cr6m3H\
6cZANOQ+P1o/W4BdtcAL3sor4iGi7SOeJgo\8kweyMQrhrt6HaKGgromRiz37LQx\
4YIAcBi4Zd023mO/V7Rc2Chh18mWgLSmA6ng+j37ip6452zxtv0jHAz9pJolbKBp\
JzbZlPN45ZCTk9ck0fSVHRl6VRSSPQcpqi65XpRf+35zNOCGCc1mAOOTmw59Q2a6\
A3t8mL7r91aM5q6QOQm219lctFM8O7HRJnDgmhGpnjRwE1LyKktWTbgFZ4SNWU2X\
qusUO07jKuSxzPumXBeU+JEtx0J1tqZwJlpGt2R+0qN7nKnPl2+hx \
moto@github.com"""
RSA_PUBLIC_KEY_RFC4716 = b"""\
---- BEGIN SSH2 PUBLIC KEY ----
AAAAB3NzaC1yc2EAAAADAQABAAABAQDusXfgTE4eBP50NglSzCSEGnIL6+cr6m3H6cZANO
Q+P1o/W4BdtcAL3sor4iGi7SOeJgo8kweyMQrhrt6HaKGgromRiz37LQx4YIAcBi4Zd023
mO/V7Rc2Chh18mWgLSmA6ng+j37ip6452zxtv0jHAz9pJolbKBpJzbZlPN45ZCTk9ck0fS
VHRl6VRSSPQcpqi65XpRf+35zNOCGCc1mAOOTmw59Q2a6A3t8mL7r91aM5q6QOQm219lct
FM8O7HRJnDgmhGpnjRwE1LyKktWTbgFZ4SNWU2XqusUO07jKuSxzPumXBeU+JEtx0J1tqZ
wJlpGt2R+0qN7nKnPl2+hx
---- END SSH2 PUBLIC KEY ----
"""
RSA_PUBLIC_KEY_FINGERPRINT = "6a:49:07:1c:7e:bd:d2:bd:96:25:fe:b5:74:83:ae:fd"
DSA_PUBLIC_KEY_OPENSSH = b"""ssh-dss \
AAAAB3NzaC1kc3MAAACBAJ0aXctVwbN6VB81gpo8R7DUk8zXRjZvrkg8Y8vEGt63gklpNJNsLXtEUXkl5D4c0nD2FZO1rJNqFoe\
OQOCoGSfclHvt9w4yPl/lUEtb3Qtj1j80MInETHr19vaSunRk5R+M+8YH+LLcdYdz7MijuGey02mbi0H9K5nUIcuLMArVAAAAFQ\
D0RDvsObRWBlnaW8645obZBM86jwAAAIBNZwf3B4krIzAwVfkMHLDSdAvs7lOWE7o8SJLzr9t4a9HhYp9SLbMzJ815KWfidEYV2\
+s4ZaPCfcZ1GENFRbE8rixz5eMAjEUXEPMJkblDZTHzMsH96z2cOCQZ0vfOmgznsf18Uf725pqo9OqAioEsTJjX8jtI2qNPEBU0\
uhMSZQAAAIBBMGhDu5CWPUlS2QG7vzmzw81XasmHE/s2YPDRbolkriwlunpgwZhCscoQP8HFHY+DLUVvUb+GZwBmFt4l1uHl03b\
ffsm7UIHtCBYERr9Nx0u20ldfhkgB1lhaJb5o0ZJ3pmJ38KChfyHe5EUcqRdEFo89Mp72VI2Z6UHyL175RA== \
moto@github.com"""
@mock_ec2_deprecated
def test_key_pairs_empty():
conn = boto.connect_ec2('the_key', 'the_secret')
assert len(conn.get_all_key_pairs()) == 0
@mock_ec2_deprecated
def test_key_pairs_invalid_id():
conn = boto.connect_ec2('the_key', 'the_secret')
with assert_raises(EC2ResponseError) as cm:
conn.get_all_key_pairs('foo')
cm.exception.code.should.equal('InvalidKeyPair.NotFound')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
@mock_ec2_deprecated
def test_key_pairs_create():
conn = boto.connect_ec2('the_key', 'the_secret')
with assert_raises(EC2ResponseError) as ex:
conn.create_key_pair('foo', dry_run=True)
ex.exception.error_code.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal(
'An error occurred (DryRunOperation) when calling the CreateKeyPair operation: Request would have succeeded, but DryRun flag is set')
kp = conn.create_key_pair('foo')
rsa_check_private_key(kp.material)
kps = conn.get_all_key_pairs()
assert len(kps) == 1
assert kps[0].name == 'foo'
@mock_ec2_deprecated
def test_key_pairs_create_two():
conn = boto.connect_ec2('the_key', 'the_secret')
kp1 = conn.create_key_pair('foo')
rsa_check_private_key(kp1.material)
kp2 = conn.create_key_pair('bar')
rsa_check_private_key(kp2.material)
assert kp1.material != kp2.material
kps = conn.get_all_key_pairs()
kps.should.have.length_of(2)
assert {i.name for i in kps} == {'foo', 'bar'}
kps = conn.get_all_key_pairs('foo')
kps.should.have.length_of(1)
kps[0].name.should.equal('foo')
@mock_ec2_deprecated
def test_key_pairs_create_exist():
conn = boto.connect_ec2('the_key', 'the_secret')
conn.create_key_pair('foo')
assert len(conn.get_all_key_pairs()) == 1
with assert_raises(EC2ResponseError) as cm:
conn.create_key_pair('foo')
cm.exception.code.should.equal('InvalidKeyPair.Duplicate')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
@mock_ec2_deprecated
def test_key_pairs_delete_no_exist():
conn = boto.connect_ec2('the_key', 'the_secret')
assert len(conn.get_all_key_pairs()) == 0
r = conn.delete_key_pair('foo')
r.should.be.ok
@mock_ec2_deprecated
def test_key_pairs_delete_exist():
conn = boto.connect_ec2('the_key', 'the_secret')
conn.create_key_pair('foo')
with assert_raises(EC2ResponseError) as ex:
r = conn.delete_key_pair('foo', dry_run=True)
ex.exception.error_code.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal(
'An error occurred (DryRunOperation) when calling the DeleteKeyPair operation: Request would have succeeded, but DryRun flag is set')
r = conn.delete_key_pair('foo')
r.should.be.ok
assert len(conn.get_all_key_pairs()) == 0
@mock_ec2_deprecated
def test_key_pairs_import():
conn = boto.connect_ec2('the_key', 'the_secret')
with assert_raises(EC2ResponseError) as ex:
conn.import_key_pair('foo', RSA_PUBLIC_KEY_OPENSSH, dry_run=True)
ex.exception.error_code.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal(
'An error occurred (DryRunOperation) when calling the ImportKeyPair operation: Request would have succeeded, but DryRun flag is set')
kp1 = conn.import_key_pair('foo', RSA_PUBLIC_KEY_OPENSSH)
assert kp1.name == 'foo'
assert kp1.fingerprint == RSA_PUBLIC_KEY_FINGERPRINT
kp2 = conn.import_key_pair('foo2', RSA_PUBLIC_KEY_RFC4716)
assert kp2.name == 'foo2'
assert kp2.fingerprint == RSA_PUBLIC_KEY_FINGERPRINT
kps = conn.get_all_key_pairs()
assert len(kps) == 2
assert kps[0].name == kp1.name
assert kps[1].name == kp2.name
@mock_ec2_deprecated
def test_key_pairs_import_exist():
conn = boto.connect_ec2('the_key', 'the_secret')
kp = conn.import_key_pair('foo', RSA_PUBLIC_KEY_OPENSSH)
assert kp.name == 'foo'
assert len(conn.get_all_key_pairs()) == 1
with assert_raises(EC2ResponseError) as cm:
conn.create_key_pair('foo')
cm.exception.code.should.equal('InvalidKeyPair.Duplicate')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
@mock_ec2_deprecated
def test_key_pairs_invalid():
conn = boto.connect_ec2('the_key', 'the_secret')
with assert_raises(EC2ResponseError) as ex:
conn.import_key_pair('foo', b'')
ex.exception.error_code.should.equal('InvalidKeyPair.Format')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal(
'Key is not in valid OpenSSH public key format')
with assert_raises(EC2ResponseError) as ex:
conn.import_key_pair('foo', b'garbage')
ex.exception.error_code.should.equal('InvalidKeyPair.Format')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal(
'Key is not in valid OpenSSH public key format')
with assert_raises(EC2ResponseError) as ex:
conn.import_key_pair('foo', DSA_PUBLIC_KEY_OPENSSH)
ex.exception.error_code.should.equal('InvalidKeyPair.Format')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal(
'Key is not in valid OpenSSH public key format')
@mock_ec2_deprecated
def test_key_pair_filters():
conn = boto.connect_ec2('the_key', 'the_secret')
_ = conn.create_key_pair('kpfltr1')
kp2 = conn.create_key_pair('kpfltr2')
kp3 = conn.create_key_pair('kpfltr3')
kp_by_name = conn.get_all_key_pairs(
filters={'key-name': 'kpfltr2'})
set([kp.name for kp in kp_by_name]
).should.equal(set([kp2.name]))
kp_by_name = conn.get_all_key_pairs(
filters={'fingerprint': kp3.fingerprint})
set([kp.name for kp in kp_by_name]
).should.equal(set([kp3.name]))

View file

@ -2,6 +2,8 @@ from __future__ import unicode_literals
import boto
import boto3
import sure # noqa
from nose.tools import assert_raises
from botocore.exceptions import ClientError
from moto import mock_ec2_deprecated, mock_ec2
@ -28,7 +30,7 @@ def test_new_subnet_associates_with_default_network_acl():
conn = boto.connect_vpc('the_key', 'the secret')
vpc = conn.get_all_vpcs()[0]
subnet = conn.create_subnet(vpc.id, "10.0.0.0/18")
subnet = conn.create_subnet(vpc.id, "172.31.48.0/20")
all_network_acls = conn.get_all_network_acls()
all_network_acls.should.have.length_of(1)
@ -214,3 +216,37 @@ def test_default_network_acl_default_entries():
unique_entries.append(entry)
unique_entries.should.have.length_of(4)
@mock_ec2
def test_delete_default_network_acl_default_entry():
ec2 = boto3.resource('ec2', region_name='us-west-1')
default_network_acl = next(iter(ec2.network_acls.all()), None)
default_network_acl.is_default.should.be.ok
default_network_acl.entries.should.have.length_of(4)
first_default_network_acl_entry = default_network_acl.entries[0]
default_network_acl.delete_entry(Egress=first_default_network_acl_entry['Egress'],
RuleNumber=first_default_network_acl_entry['RuleNumber'])
default_network_acl.entries.should.have.length_of(3)
@mock_ec2
def test_duplicate_network_acl_entry():
ec2 = boto3.resource('ec2', region_name='us-west-1')
default_network_acl = next(iter(ec2.network_acls.all()), None)
default_network_acl.is_default.should.be.ok
rule_number = 200
egress = True
default_network_acl.create_entry(CidrBlock="0.0.0.0/0", Egress=egress, Protocol="-1", RuleAction="allow", RuleNumber=rule_number)
with assert_raises(ClientError) as ex:
default_network_acl.create_entry(CidrBlock="10.0.0.0/0", Egress=egress, Protocol="-1", RuleAction="deny", RuleNumber=rule_number)
str(ex.exception).should.equal(
"An error occurred (NetworkAclEntryAlreadyExists) when calling the CreateNetworkAclEntry "
"operation: The network acl entry identified by {} already exists.".format(rule_number))

View file

@ -1,148 +1,150 @@
from __future__ import unicode_literals
import boto.ec2
import boto.ec2.autoscale
import boto.ec2.elb
import sure
from moto import mock_ec2_deprecated, mock_autoscaling_deprecated, mock_elb_deprecated
from moto.ec2 import ec2_backends
def test_use_boto_regions():
boto_regions = {r.name for r in boto.ec2.regions()}
moto_regions = set(ec2_backends)
moto_regions.should.equal(boto_regions)
def add_servers_to_region(ami_id, count, region):
conn = boto.ec2.connect_to_region(region)
for index in range(count):
conn.run_instances(ami_id)
@mock_ec2_deprecated
def test_add_servers_to_a_single_region():
region = 'ap-northeast-1'
add_servers_to_region('ami-1234abcd', 1, region)
add_servers_to_region('ami-5678efgh', 1, region)
conn = boto.ec2.connect_to_region(region)
reservations = conn.get_all_instances()
len(reservations).should.equal(2)
reservations.sort(key=lambda x: x.instances[0].image_id)
reservations[0].instances[0].image_id.should.equal('ami-1234abcd')
reservations[1].instances[0].image_id.should.equal('ami-5678efgh')
@mock_ec2_deprecated
def test_add_servers_to_multiple_regions():
region1 = 'us-east-1'
region2 = 'ap-northeast-1'
add_servers_to_region('ami-1234abcd', 1, region1)
add_servers_to_region('ami-5678efgh', 1, region2)
us_conn = boto.ec2.connect_to_region(region1)
ap_conn = boto.ec2.connect_to_region(region2)
us_reservations = us_conn.get_all_instances()
ap_reservations = ap_conn.get_all_instances()
len(us_reservations).should.equal(1)
len(ap_reservations).should.equal(1)
us_reservations[0].instances[0].image_id.should.equal('ami-1234abcd')
ap_reservations[0].instances[0].image_id.should.equal('ami-5678efgh')
@mock_autoscaling_deprecated
@mock_elb_deprecated
def test_create_autoscaling_group():
elb_conn = boto.ec2.elb.connect_to_region('us-east-1')
elb_conn.create_load_balancer(
'us_test_lb', zones=[], listeners=[(80, 8080, 'http')])
elb_conn = boto.ec2.elb.connect_to_region('ap-northeast-1')
elb_conn.create_load_balancer(
'ap_test_lb', zones=[], listeners=[(80, 8080, 'http')])
us_conn = boto.ec2.autoscale.connect_to_region('us-east-1')
config = boto.ec2.autoscale.LaunchConfiguration(
name='us_tester',
image_id='ami-abcd1234',
instance_type='m1.small',
)
us_conn.create_launch_configuration(config)
group = boto.ec2.autoscale.AutoScalingGroup(
name='us_tester_group',
availability_zones=['us-east-1c'],
default_cooldown=60,
desired_capacity=2,
health_check_period=100,
health_check_type="EC2",
max_size=2,
min_size=2,
launch_config=config,
load_balancers=["us_test_lb"],
placement_group="us_test_placement",
vpc_zone_identifier='subnet-1234abcd',
termination_policies=["OldestInstance", "NewestInstance"],
)
us_conn.create_auto_scaling_group(group)
ap_conn = boto.ec2.autoscale.connect_to_region('ap-northeast-1')
config = boto.ec2.autoscale.LaunchConfiguration(
name='ap_tester',
image_id='ami-efgh5678',
instance_type='m1.small',
)
ap_conn.create_launch_configuration(config)
group = boto.ec2.autoscale.AutoScalingGroup(
name='ap_tester_group',
availability_zones=['ap-northeast-1a'],
default_cooldown=60,
desired_capacity=2,
health_check_period=100,
health_check_type="EC2",
max_size=2,
min_size=2,
launch_config=config,
load_balancers=["ap_test_lb"],
placement_group="ap_test_placement",
vpc_zone_identifier='subnet-5678efgh',
termination_policies=["OldestInstance", "NewestInstance"],
)
ap_conn.create_auto_scaling_group(group)
len(us_conn.get_all_groups()).should.equal(1)
len(ap_conn.get_all_groups()).should.equal(1)
us_group = us_conn.get_all_groups()[0]
us_group.name.should.equal('us_tester_group')
list(us_group.availability_zones).should.equal(['us-east-1c'])
us_group.desired_capacity.should.equal(2)
us_group.max_size.should.equal(2)
us_group.min_size.should.equal(2)
us_group.vpc_zone_identifier.should.equal('subnet-1234abcd')
us_group.launch_config_name.should.equal('us_tester')
us_group.default_cooldown.should.equal(60)
us_group.health_check_period.should.equal(100)
us_group.health_check_type.should.equal("EC2")
list(us_group.load_balancers).should.equal(["us_test_lb"])
us_group.placement_group.should.equal("us_test_placement")
list(us_group.termination_policies).should.equal(
["OldestInstance", "NewestInstance"])
ap_group = ap_conn.get_all_groups()[0]
ap_group.name.should.equal('ap_tester_group')
list(ap_group.availability_zones).should.equal(['ap-northeast-1a'])
ap_group.desired_capacity.should.equal(2)
ap_group.max_size.should.equal(2)
ap_group.min_size.should.equal(2)
ap_group.vpc_zone_identifier.should.equal('subnet-5678efgh')
ap_group.launch_config_name.should.equal('ap_tester')
ap_group.default_cooldown.should.equal(60)
ap_group.health_check_period.should.equal(100)
ap_group.health_check_type.should.equal("EC2")
list(ap_group.load_balancers).should.equal(["ap_test_lb"])
ap_group.placement_group.should.equal("ap_test_placement")
list(ap_group.termination_policies).should.equal(
["OldestInstance", "NewestInstance"])
from __future__ import unicode_literals
import boto.ec2
import boto.ec2.autoscale
import boto.ec2.elb
import sure
from moto import mock_ec2_deprecated, mock_autoscaling_deprecated, mock_elb_deprecated
from moto.ec2 import ec2_backends
def test_use_boto_regions():
boto_regions = {r.name for r in boto.ec2.regions()}
moto_regions = set(ec2_backends)
moto_regions.should.equal(boto_regions)
def add_servers_to_region(ami_id, count, region):
conn = boto.ec2.connect_to_region(region)
for index in range(count):
conn.run_instances(ami_id)
@mock_ec2_deprecated
def test_add_servers_to_a_single_region():
region = 'ap-northeast-1'
add_servers_to_region('ami-1234abcd', 1, region)
add_servers_to_region('ami-5678efgh', 1, region)
conn = boto.ec2.connect_to_region(region)
reservations = conn.get_all_instances()
len(reservations).should.equal(2)
reservations.sort(key=lambda x: x.instances[0].image_id)
reservations[0].instances[0].image_id.should.equal('ami-1234abcd')
reservations[1].instances[0].image_id.should.equal('ami-5678efgh')
@mock_ec2_deprecated
def test_add_servers_to_multiple_regions():
region1 = 'us-east-1'
region2 = 'ap-northeast-1'
add_servers_to_region('ami-1234abcd', 1, region1)
add_servers_to_region('ami-5678efgh', 1, region2)
us_conn = boto.ec2.connect_to_region(region1)
ap_conn = boto.ec2.connect_to_region(region2)
us_reservations = us_conn.get_all_instances()
ap_reservations = ap_conn.get_all_instances()
len(us_reservations).should.equal(1)
len(ap_reservations).should.equal(1)
us_reservations[0].instances[0].image_id.should.equal('ami-1234abcd')
ap_reservations[0].instances[0].image_id.should.equal('ami-5678efgh')
@mock_autoscaling_deprecated
@mock_elb_deprecated
def test_create_autoscaling_group():
elb_conn = boto.ec2.elb.connect_to_region('us-east-1')
elb_conn.create_load_balancer(
'us_test_lb', zones=[], listeners=[(80, 8080, 'http')])
elb_conn = boto.ec2.elb.connect_to_region('ap-northeast-1')
elb_conn.create_load_balancer(
'ap_test_lb', zones=[], listeners=[(80, 8080, 'http')])
us_conn = boto.ec2.autoscale.connect_to_region('us-east-1')
config = boto.ec2.autoscale.LaunchConfiguration(
name='us_tester',
image_id='ami-abcd1234',
instance_type='m1.small',
)
x = us_conn.create_launch_configuration(config)
us_subnet_id = list(ec2_backends['us-east-1'].subnets['us-east-1c'].keys())[0]
ap_subnet_id = list(ec2_backends['ap-northeast-1'].subnets['ap-northeast-1a'].keys())[0]
group = boto.ec2.autoscale.AutoScalingGroup(
name='us_tester_group',
availability_zones=['us-east-1c'],
default_cooldown=60,
desired_capacity=2,
health_check_period=100,
health_check_type="EC2",
max_size=2,
min_size=2,
launch_config=config,
load_balancers=["us_test_lb"],
placement_group="us_test_placement",
vpc_zone_identifier=us_subnet_id,
termination_policies=["OldestInstance", "NewestInstance"],
)
us_conn.create_auto_scaling_group(group)
ap_conn = boto.ec2.autoscale.connect_to_region('ap-northeast-1')
config = boto.ec2.autoscale.LaunchConfiguration(
name='ap_tester',
image_id='ami-efgh5678',
instance_type='m1.small',
)
ap_conn.create_launch_configuration(config)
group = boto.ec2.autoscale.AutoScalingGroup(
name='ap_tester_group',
availability_zones=['ap-northeast-1a'],
default_cooldown=60,
desired_capacity=2,
health_check_period=100,
health_check_type="EC2",
max_size=2,
min_size=2,
launch_config=config,
load_balancers=["ap_test_lb"],
placement_group="ap_test_placement",
vpc_zone_identifier=ap_subnet_id,
termination_policies=["OldestInstance", "NewestInstance"],
)
ap_conn.create_auto_scaling_group(group)
len(us_conn.get_all_groups()).should.equal(1)
len(ap_conn.get_all_groups()).should.equal(1)
us_group = us_conn.get_all_groups()[0]
us_group.name.should.equal('us_tester_group')
list(us_group.availability_zones).should.equal(['us-east-1c'])
us_group.desired_capacity.should.equal(2)
us_group.max_size.should.equal(2)
us_group.min_size.should.equal(2)
us_group.vpc_zone_identifier.should.equal(us_subnet_id)
us_group.launch_config_name.should.equal('us_tester')
us_group.default_cooldown.should.equal(60)
us_group.health_check_period.should.equal(100)
us_group.health_check_type.should.equal("EC2")
list(us_group.load_balancers).should.equal(["us_test_lb"])
us_group.placement_group.should.equal("us_test_placement")
list(us_group.termination_policies).should.equal(
["OldestInstance", "NewestInstance"])
ap_group = ap_conn.get_all_groups()[0]
ap_group.name.should.equal('ap_tester_group')
list(ap_group.availability_zones).should.equal(['ap-northeast-1a'])
ap_group.desired_capacity.should.equal(2)
ap_group.max_size.should.equal(2)
ap_group.min_size.should.equal(2)
ap_group.vpc_zone_identifier.should.equal(ap_subnet_id)
ap_group.launch_config_name.should.equal('ap_tester')
ap_group.default_cooldown.should.equal(60)
ap_group.health_check_period.should.equal(100)
ap_group.health_check_type.should.equal("EC2")
list(ap_group.load_balancers).should.equal(["ap_test_lb"])
ap_group.placement_group.should.equal("ap_test_placement")
list(ap_group.termination_policies).should.equal(
["OldestInstance", "NewestInstance"])

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

View file

@ -7,7 +7,7 @@ from moto import mock_ec2
def get_subnet_id(conn):
vpc = conn.create_vpc(CidrBlock="10.0.0.0/8")['Vpc']
vpc = conn.create_vpc(CidrBlock="10.0.0.0/16")['Vpc']
subnet = conn.create_subnet(
VpcId=vpc['VpcId'], CidrBlock='10.0.0.0/16', AvailabilityZone='us-east-1a')['Subnet']
subnet_id = subnet['SubnetId']

View file

@ -1,268 +1,268 @@
from __future__ import unicode_literals
from nose.tools import assert_raises
import datetime
import boto
import boto3
from boto.exception import EC2ResponseError
from botocore.exceptions import ClientError
import pytz
import sure # noqa
from moto import mock_ec2, mock_ec2_deprecated
from moto.backends import get_model
from moto.core.utils import iso_8601_datetime_with_milliseconds
@mock_ec2
def test_request_spot_instances():
conn = boto3.client('ec2', 'us-east-1')
vpc = conn.create_vpc(CidrBlock="10.0.0.0/8")['Vpc']
subnet = conn.create_subnet(
VpcId=vpc['VpcId'], CidrBlock='10.0.0.0/16', AvailabilityZone='us-east-1a')['Subnet']
subnet_id = subnet['SubnetId']
conn.create_security_group(GroupName='group1', Description='description')
conn.create_security_group(GroupName='group2', Description='description')
start_dt = datetime.datetime(2013, 1, 1).replace(tzinfo=pytz.utc)
end_dt = datetime.datetime(2013, 1, 2).replace(tzinfo=pytz.utc)
start = iso_8601_datetime_with_milliseconds(start_dt)
end = iso_8601_datetime_with_milliseconds(end_dt)
with assert_raises(ClientError) as ex:
request = conn.request_spot_instances(
SpotPrice="0.5", InstanceCount=1, Type='one-time',
ValidFrom=start, ValidUntil=end, LaunchGroup="the-group",
AvailabilityZoneGroup='my-group',
LaunchSpecification={
"ImageId": 'ami-abcd1234',
"KeyName": "test",
"SecurityGroups": ['group1', 'group2'],
"UserData": "some test data",
"InstanceType": 'm1.small',
"Placement": {
"AvailabilityZone": 'us-east-1c',
},
"KernelId": "test-kernel",
"RamdiskId": "test-ramdisk",
"Monitoring": {
"Enabled": True,
},
"SubnetId": subnet_id,
},
DryRun=True,
)
ex.exception.response['Error']['Code'].should.equal('DryRunOperation')
ex.exception.response['ResponseMetadata'][
'HTTPStatusCode'].should.equal(400)
ex.exception.response['Error']['Message'].should.equal(
'An error occurred (DryRunOperation) when calling the RequestSpotInstance operation: Request would have succeeded, but DryRun flag is set')
request = conn.request_spot_instances(
SpotPrice="0.5", InstanceCount=1, Type='one-time',
ValidFrom=start, ValidUntil=end, LaunchGroup="the-group",
AvailabilityZoneGroup='my-group',
LaunchSpecification={
"ImageId": 'ami-abcd1234',
"KeyName": "test",
"SecurityGroups": ['group1', 'group2'],
"UserData": "some test data",
"InstanceType": 'm1.small',
"Placement": {
"AvailabilityZone": 'us-east-1c',
},
"KernelId": "test-kernel",
"RamdiskId": "test-ramdisk",
"Monitoring": {
"Enabled": True,
},
"SubnetId": subnet_id,
},
)
requests = conn.describe_spot_instance_requests()['SpotInstanceRequests']
requests.should.have.length_of(1)
request = requests[0]
request['State'].should.equal("open")
request['SpotPrice'].should.equal("0.5")
request['Type'].should.equal('one-time')
request['ValidFrom'].should.equal(start_dt)
request['ValidUntil'].should.equal(end_dt)
request['LaunchGroup'].should.equal("the-group")
request['AvailabilityZoneGroup'].should.equal('my-group')
launch_spec = request['LaunchSpecification']
security_group_names = [group['GroupName']
for group in launch_spec['SecurityGroups']]
set(security_group_names).should.equal(set(['group1', 'group2']))
launch_spec['ImageId'].should.equal('ami-abcd1234')
launch_spec['KeyName'].should.equal("test")
launch_spec['InstanceType'].should.equal('m1.small')
launch_spec['KernelId'].should.equal("test-kernel")
launch_spec['RamdiskId'].should.equal("test-ramdisk")
launch_spec['SubnetId'].should.equal(subnet_id)
@mock_ec2
def test_request_spot_instances_default_arguments():
"""
Test that moto set the correct default arguments
"""
conn = boto3.client('ec2', 'us-east-1')
request = conn.request_spot_instances(
SpotPrice="0.5",
LaunchSpecification={
"ImageId": 'ami-abcd1234',
}
)
requests = conn.describe_spot_instance_requests()['SpotInstanceRequests']
requests.should.have.length_of(1)
request = requests[0]
request['State'].should.equal("open")
request['SpotPrice'].should.equal("0.5")
request['Type'].should.equal('one-time')
request.shouldnt.contain('ValidFrom')
request.shouldnt.contain('ValidUntil')
request.shouldnt.contain('LaunchGroup')
request.shouldnt.contain('AvailabilityZoneGroup')
launch_spec = request['LaunchSpecification']
security_group_names = [group['GroupName']
for group in launch_spec['SecurityGroups']]
security_group_names.should.equal(["default"])
launch_spec['ImageId'].should.equal('ami-abcd1234')
request.shouldnt.contain('KeyName')
launch_spec['InstanceType'].should.equal('m1.small')
request.shouldnt.contain('KernelId')
request.shouldnt.contain('RamdiskId')
request.shouldnt.contain('SubnetId')
@mock_ec2_deprecated
def test_cancel_spot_instance_request():
conn = boto.connect_ec2()
conn.request_spot_instances(
price=0.5, image_id='ami-abcd1234',
)
requests = conn.get_all_spot_instance_requests()
requests.should.have.length_of(1)
with assert_raises(EC2ResponseError) as ex:
conn.cancel_spot_instance_requests([requests[0].id], dry_run=True)
ex.exception.error_code.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal(
'An error occurred (DryRunOperation) when calling the CancelSpotInstance operation: Request would have succeeded, but DryRun flag is set')
conn.cancel_spot_instance_requests([requests[0].id])
requests = conn.get_all_spot_instance_requests()
requests.should.have.length_of(0)
@mock_ec2_deprecated
def test_request_spot_instances_fulfilled():
"""
Test that moto correctly fullfills a spot instance request
"""
conn = boto.ec2.connect_to_region("us-east-1")
request = conn.request_spot_instances(
price=0.5, image_id='ami-abcd1234',
)
requests = conn.get_all_spot_instance_requests()
requests.should.have.length_of(1)
request = requests[0]
request.state.should.equal("open")
get_model('SpotInstanceRequest', 'us-east-1')[0].state = 'active'
requests = conn.get_all_spot_instance_requests()
requests.should.have.length_of(1)
request = requests[0]
request.state.should.equal("active")
@mock_ec2_deprecated
def test_tag_spot_instance_request():
"""
Test that moto correctly tags a spot instance request
"""
conn = boto.connect_ec2()
request = conn.request_spot_instances(
price=0.5, image_id='ami-abcd1234',
)
request[0].add_tag('tag1', 'value1')
request[0].add_tag('tag2', 'value2')
requests = conn.get_all_spot_instance_requests()
requests.should.have.length_of(1)
request = requests[0]
tag_dict = dict(request.tags)
tag_dict.should.equal({'tag1': 'value1', 'tag2': 'value2'})
@mock_ec2_deprecated
def test_get_all_spot_instance_requests_filtering():
"""
Test that moto correctly filters spot instance requests
"""
conn = boto.connect_ec2()
request1 = conn.request_spot_instances(
price=0.5, image_id='ami-abcd1234',
)
request2 = conn.request_spot_instances(
price=0.5, image_id='ami-abcd1234',
)
conn.request_spot_instances(
price=0.5, image_id='ami-abcd1234',
)
request1[0].add_tag('tag1', 'value1')
request1[0].add_tag('tag2', 'value2')
request2[0].add_tag('tag1', 'value1')
request2[0].add_tag('tag2', 'wrong')
requests = conn.get_all_spot_instance_requests(filters={'state': 'active'})
requests.should.have.length_of(0)
requests = conn.get_all_spot_instance_requests(filters={'state': 'open'})
requests.should.have.length_of(3)
requests = conn.get_all_spot_instance_requests(
filters={'tag:tag1': 'value1'})
requests.should.have.length_of(2)
requests = conn.get_all_spot_instance_requests(
filters={'tag:tag1': 'value1', 'tag:tag2': 'value2'})
requests.should.have.length_of(1)
@mock_ec2_deprecated
def test_request_spot_instances_setting_instance_id():
conn = boto.ec2.connect_to_region("us-east-1")
request = conn.request_spot_instances(
price=0.5, image_id='ami-abcd1234')
req = get_model('SpotInstanceRequest', 'us-east-1')[0]
req.state = 'active'
req.instance_id = 'i-12345678'
request = conn.get_all_spot_instance_requests()[0]
assert request.state == 'active'
assert request.instance_id == 'i-12345678'
from __future__ import unicode_literals
from nose.tools import assert_raises
import datetime
import boto
import boto3
from boto.exception import EC2ResponseError
from botocore.exceptions import ClientError
import pytz
import sure # noqa
from moto import mock_ec2, mock_ec2_deprecated
from moto.backends import get_model
from moto.core.utils import iso_8601_datetime_with_milliseconds
@mock_ec2
def test_request_spot_instances():
conn = boto3.client('ec2', 'us-east-1')
vpc = conn.create_vpc(CidrBlock="10.0.0.0/16")['Vpc']
subnet = conn.create_subnet(
VpcId=vpc['VpcId'], CidrBlock='10.0.0.0/16', AvailabilityZone='us-east-1a')['Subnet']
subnet_id = subnet['SubnetId']
conn.create_security_group(GroupName='group1', Description='description')
conn.create_security_group(GroupName='group2', Description='description')
start_dt = datetime.datetime(2013, 1, 1).replace(tzinfo=pytz.utc)
end_dt = datetime.datetime(2013, 1, 2).replace(tzinfo=pytz.utc)
start = iso_8601_datetime_with_milliseconds(start_dt)
end = iso_8601_datetime_with_milliseconds(end_dt)
with assert_raises(ClientError) as ex:
request = conn.request_spot_instances(
SpotPrice="0.5", InstanceCount=1, Type='one-time',
ValidFrom=start, ValidUntil=end, LaunchGroup="the-group",
AvailabilityZoneGroup='my-group',
LaunchSpecification={
"ImageId": 'ami-abcd1234',
"KeyName": "test",
"SecurityGroups": ['group1', 'group2'],
"UserData": "some test data",
"InstanceType": 'm1.small',
"Placement": {
"AvailabilityZone": 'us-east-1c',
},
"KernelId": "test-kernel",
"RamdiskId": "test-ramdisk",
"Monitoring": {
"Enabled": True,
},
"SubnetId": subnet_id,
},
DryRun=True,
)
ex.exception.response['Error']['Code'].should.equal('DryRunOperation')
ex.exception.response['ResponseMetadata'][
'HTTPStatusCode'].should.equal(400)
ex.exception.response['Error']['Message'].should.equal(
'An error occurred (DryRunOperation) when calling the RequestSpotInstance operation: Request would have succeeded, but DryRun flag is set')
request = conn.request_spot_instances(
SpotPrice="0.5", InstanceCount=1, Type='one-time',
ValidFrom=start, ValidUntil=end, LaunchGroup="the-group",
AvailabilityZoneGroup='my-group',
LaunchSpecification={
"ImageId": 'ami-abcd1234',
"KeyName": "test",
"SecurityGroups": ['group1', 'group2'],
"UserData": "some test data",
"InstanceType": 'm1.small',
"Placement": {
"AvailabilityZone": 'us-east-1c',
},
"KernelId": "test-kernel",
"RamdiskId": "test-ramdisk",
"Monitoring": {
"Enabled": True,
},
"SubnetId": subnet_id,
},
)
requests = conn.describe_spot_instance_requests()['SpotInstanceRequests']
requests.should.have.length_of(1)
request = requests[0]
request['State'].should.equal("open")
request['SpotPrice'].should.equal("0.5")
request['Type'].should.equal('one-time')
request['ValidFrom'].should.equal(start_dt)
request['ValidUntil'].should.equal(end_dt)
request['LaunchGroup'].should.equal("the-group")
request['AvailabilityZoneGroup'].should.equal('my-group')
launch_spec = request['LaunchSpecification']
security_group_names = [group['GroupName']
for group in launch_spec['SecurityGroups']]
set(security_group_names).should.equal(set(['group1', 'group2']))
launch_spec['ImageId'].should.equal('ami-abcd1234')
launch_spec['KeyName'].should.equal("test")
launch_spec['InstanceType'].should.equal('m1.small')
launch_spec['KernelId'].should.equal("test-kernel")
launch_spec['RamdiskId'].should.equal("test-ramdisk")
launch_spec['SubnetId'].should.equal(subnet_id)
@mock_ec2
def test_request_spot_instances_default_arguments():
"""
Test that moto set the correct default arguments
"""
conn = boto3.client('ec2', 'us-east-1')
request = conn.request_spot_instances(
SpotPrice="0.5",
LaunchSpecification={
"ImageId": 'ami-abcd1234',
}
)
requests = conn.describe_spot_instance_requests()['SpotInstanceRequests']
requests.should.have.length_of(1)
request = requests[0]
request['State'].should.equal("open")
request['SpotPrice'].should.equal("0.5")
request['Type'].should.equal('one-time')
request.shouldnt.contain('ValidFrom')
request.shouldnt.contain('ValidUntil')
request.shouldnt.contain('LaunchGroup')
request.shouldnt.contain('AvailabilityZoneGroup')
launch_spec = request['LaunchSpecification']
security_group_names = [group['GroupName']
for group in launch_spec['SecurityGroups']]
security_group_names.should.equal(["default"])
launch_spec['ImageId'].should.equal('ami-abcd1234')
request.shouldnt.contain('KeyName')
launch_spec['InstanceType'].should.equal('m1.small')
request.shouldnt.contain('KernelId')
request.shouldnt.contain('RamdiskId')
request.shouldnt.contain('SubnetId')
@mock_ec2_deprecated
def test_cancel_spot_instance_request():
conn = boto.connect_ec2()
conn.request_spot_instances(
price=0.5, image_id='ami-abcd1234',
)
requests = conn.get_all_spot_instance_requests()
requests.should.have.length_of(1)
with assert_raises(EC2ResponseError) as ex:
conn.cancel_spot_instance_requests([requests[0].id], dry_run=True)
ex.exception.error_code.should.equal('DryRunOperation')
ex.exception.status.should.equal(400)
ex.exception.message.should.equal(
'An error occurred (DryRunOperation) when calling the CancelSpotInstance operation: Request would have succeeded, but DryRun flag is set')
conn.cancel_spot_instance_requests([requests[0].id])
requests = conn.get_all_spot_instance_requests()
requests.should.have.length_of(0)
@mock_ec2_deprecated
def test_request_spot_instances_fulfilled():
"""
Test that moto correctly fullfills a spot instance request
"""
conn = boto.ec2.connect_to_region("us-east-1")
request = conn.request_spot_instances(
price=0.5, image_id='ami-abcd1234',
)
requests = conn.get_all_spot_instance_requests()
requests.should.have.length_of(1)
request = requests[0]
request.state.should.equal("open")
get_model('SpotInstanceRequest', 'us-east-1')[0].state = 'active'
requests = conn.get_all_spot_instance_requests()
requests.should.have.length_of(1)
request = requests[0]
request.state.should.equal("active")
@mock_ec2_deprecated
def test_tag_spot_instance_request():
"""
Test that moto correctly tags a spot instance request
"""
conn = boto.connect_ec2()
request = conn.request_spot_instances(
price=0.5, image_id='ami-abcd1234',
)
request[0].add_tag('tag1', 'value1')
request[0].add_tag('tag2', 'value2')
requests = conn.get_all_spot_instance_requests()
requests.should.have.length_of(1)
request = requests[0]
tag_dict = dict(request.tags)
tag_dict.should.equal({'tag1': 'value1', 'tag2': 'value2'})
@mock_ec2_deprecated
def test_get_all_spot_instance_requests_filtering():
"""
Test that moto correctly filters spot instance requests
"""
conn = boto.connect_ec2()
request1 = conn.request_spot_instances(
price=0.5, image_id='ami-abcd1234',
)
request2 = conn.request_spot_instances(
price=0.5, image_id='ami-abcd1234',
)
conn.request_spot_instances(
price=0.5, image_id='ami-abcd1234',
)
request1[0].add_tag('tag1', 'value1')
request1[0].add_tag('tag2', 'value2')
request2[0].add_tag('tag1', 'value1')
request2[0].add_tag('tag2', 'wrong')
requests = conn.get_all_spot_instance_requests(filters={'state': 'active'})
requests.should.have.length_of(0)
requests = conn.get_all_spot_instance_requests(filters={'state': 'open'})
requests.should.have.length_of(3)
requests = conn.get_all_spot_instance_requests(
filters={'tag:tag1': 'value1'})
requests.should.have.length_of(2)
requests = conn.get_all_spot_instance_requests(
filters={'tag:tag1': 'value1', 'tag:tag2': 'value2'})
requests.should.have.length_of(1)
@mock_ec2_deprecated
def test_request_spot_instances_setting_instance_id():
conn = boto.ec2.connect_to_region("us-east-1")
request = conn.request_spot_instances(
price=0.5, image_id='ami-abcd1234')
req = get_model('SpotInstanceRequest', 'us-east-1')[0]
req.state = 'active'
req.instance_id = 'i-12345678'
request = conn.get_all_spot_instance_requests()[0]
assert request.state == 'active'
assert request.instance_id == 'i-12345678'

View file

@ -1,291 +1,340 @@
from __future__ import unicode_literals
# Ensure 'assert_raises' context manager support for Python 2.6
import tests.backport_assert_raises # noqa
from nose.tools import assert_raises
import boto3
import boto
import boto.vpc
from boto.exception import EC2ResponseError
from botocore.exceptions import ParamValidationError
import json
import sure # noqa
from moto import mock_cloudformation_deprecated, mock_ec2, mock_ec2_deprecated
@mock_ec2_deprecated
def test_subnets():
ec2 = boto.connect_ec2('the_key', 'the_secret')
conn = boto.connect_vpc('the_key', 'the_secret')
vpc = conn.create_vpc("10.0.0.0/16")
subnet = conn.create_subnet(vpc.id, "10.0.0.0/18")
all_subnets = conn.get_all_subnets()
all_subnets.should.have.length_of(1 + len(ec2.get_all_zones()))
conn.delete_subnet(subnet.id)
all_subnets = conn.get_all_subnets()
all_subnets.should.have.length_of(0 + len(ec2.get_all_zones()))
with assert_raises(EC2ResponseError) as cm:
conn.delete_subnet(subnet.id)
cm.exception.code.should.equal('InvalidSubnetID.NotFound')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
@mock_ec2_deprecated
def test_subnet_create_vpc_validation():
conn = boto.connect_vpc('the_key', 'the_secret')
with assert_raises(EC2ResponseError) as cm:
conn.create_subnet("vpc-abcd1234", "10.0.0.0/18")
cm.exception.code.should.equal('InvalidVpcID.NotFound')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
@mock_ec2_deprecated
def test_subnet_tagging():
conn = boto.connect_vpc('the_key', 'the_secret')
vpc = conn.create_vpc("10.0.0.0/16")
subnet = conn.create_subnet(vpc.id, "10.0.0.0/18")
subnet.add_tag("a key", "some value")
tag = conn.get_all_tags()[0]
tag.name.should.equal("a key")
tag.value.should.equal("some value")
# Refresh the subnet
subnet = conn.get_all_subnets(subnet_ids=[subnet.id])[0]
subnet.tags.should.have.length_of(1)
subnet.tags["a key"].should.equal("some value")
@mock_ec2_deprecated
def test_subnet_should_have_proper_availability_zone_set():
conn = boto.vpc.connect_to_region('us-west-1')
vpcA = conn.create_vpc("10.0.0.0/16")
subnetA = conn.create_subnet(
vpcA.id, "10.0.0.0/24", availability_zone='us-west-1b')
subnetA.availability_zone.should.equal('us-west-1b')
@mock_ec2
def test_default_subnet():
ec2 = boto3.resource('ec2', region_name='us-west-1')
default_vpc = list(ec2.vpcs.all())[0]
default_vpc.cidr_block.should.equal('172.31.0.0/16')
default_vpc.reload()
default_vpc.is_default.should.be.ok
subnet = ec2.create_subnet(
VpcId=default_vpc.id, CidrBlock='172.31.0.0/20', AvailabilityZone='us-west-1a')
subnet.reload()
subnet.map_public_ip_on_launch.shouldnt.be.ok
@mock_ec2_deprecated
def test_non_default_subnet():
vpc_cli = boto.vpc.connect_to_region('us-west-1')
# Create the non default VPC
vpc = vpc_cli.create_vpc("10.0.0.0/16")
vpc.is_default.shouldnt.be.ok
subnet = vpc_cli.create_subnet(vpc.id, "10.0.0.0/24")
subnet = vpc_cli.get_all_subnets(subnet_ids=[subnet.id])[0]
subnet.mapPublicIpOnLaunch.should.equal('false')
@mock_ec2
def test_boto3_non_default_subnet():
ec2 = boto3.resource('ec2', region_name='us-west-1')
# Create the non default VPC
vpc = ec2.create_vpc(CidrBlock='10.0.0.0/16')
vpc.reload()
vpc.is_default.shouldnt.be.ok
subnet = ec2.create_subnet(
VpcId=vpc.id, CidrBlock='10.0.0.0/24', AvailabilityZone='us-west-1a')
subnet.reload()
subnet.map_public_ip_on_launch.shouldnt.be.ok
@mock_ec2
def test_modify_subnet_attribute():
ec2 = boto3.resource('ec2', region_name='us-west-1')
client = boto3.client('ec2', region_name='us-west-1')
# Get the default VPC
vpc = list(ec2.vpcs.all())[0]
subnet = ec2.create_subnet(
VpcId=vpc.id, CidrBlock='10.0.0.0/24', AvailabilityZone='us-west-1a')
# 'map_public_ip_on_launch' is set when calling 'DescribeSubnets' action
subnet.reload()
# For non default subnet, attribute value should be 'False'
subnet.map_public_ip_on_launch.shouldnt.be.ok
client.modify_subnet_attribute(
SubnetId=subnet.id, MapPublicIpOnLaunch={'Value': False})
subnet.reload()
subnet.map_public_ip_on_launch.shouldnt.be.ok
client.modify_subnet_attribute(
SubnetId=subnet.id, MapPublicIpOnLaunch={'Value': True})
subnet.reload()
subnet.map_public_ip_on_launch.should.be.ok
@mock_ec2
def test_modify_subnet_attribute_validation():
ec2 = boto3.resource('ec2', region_name='us-west-1')
client = boto3.client('ec2', region_name='us-west-1')
vpc = ec2.create_vpc(CidrBlock='10.0.0.0/16')
subnet = ec2.create_subnet(
VpcId=vpc.id, CidrBlock='10.0.0.0/24', AvailabilityZone='us-west-1a')
with assert_raises(ParamValidationError):
client.modify_subnet_attribute(
SubnetId=subnet.id, MapPublicIpOnLaunch={'Value': 'invalid'})
@mock_ec2_deprecated
def test_subnet_get_by_id():
ec2 = boto.ec2.connect_to_region('us-west-1')
conn = boto.vpc.connect_to_region('us-west-1')
vpcA = conn.create_vpc("10.0.0.0/16")
subnetA = conn.create_subnet(
vpcA.id, "10.0.0.0/24", availability_zone='us-west-1a')
vpcB = conn.create_vpc("10.0.0.0/16")
subnetB1 = conn.create_subnet(
vpcB.id, "10.0.0.0/24", availability_zone='us-west-1a')
subnetB2 = conn.create_subnet(
vpcB.id, "10.0.1.0/24", availability_zone='us-west-1b')
subnets_by_id = conn.get_all_subnets(subnet_ids=[subnetA.id, subnetB1.id])
subnets_by_id.should.have.length_of(2)
subnets_by_id = tuple(map(lambda s: s.id, subnets_by_id))
subnetA.id.should.be.within(subnets_by_id)
subnetB1.id.should.be.within(subnets_by_id)
with assert_raises(EC2ResponseError) as cm:
conn.get_all_subnets(subnet_ids=['subnet-does_not_exist'])
cm.exception.code.should.equal('InvalidSubnetID.NotFound')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
@mock_ec2_deprecated
def test_get_subnets_filtering():
ec2 = boto.ec2.connect_to_region('us-west-1')
conn = boto.vpc.connect_to_region('us-west-1')
vpcA = conn.create_vpc("10.0.0.0/16")
subnetA = conn.create_subnet(
vpcA.id, "10.0.0.0/24", availability_zone='us-west-1a')
vpcB = conn.create_vpc("10.0.0.0/16")
subnetB1 = conn.create_subnet(
vpcB.id, "10.0.0.0/24", availability_zone='us-west-1a')
subnetB2 = conn.create_subnet(
vpcB.id, "10.0.1.0/24", availability_zone='us-west-1b')
all_subnets = conn.get_all_subnets()
all_subnets.should.have.length_of(3 + len(ec2.get_all_zones()))
# Filter by VPC ID
subnets_by_vpc = conn.get_all_subnets(filters={'vpc-id': vpcB.id})
subnets_by_vpc.should.have.length_of(2)
set([subnet.id for subnet in subnets_by_vpc]).should.equal(
set([subnetB1.id, subnetB2.id]))
# Filter by CIDR variations
subnets_by_cidr1 = conn.get_all_subnets(filters={'cidr': "10.0.0.0/24"})
subnets_by_cidr1.should.have.length_of(2)
set([subnet.id for subnet in subnets_by_cidr1]
).should.equal(set([subnetA.id, subnetB1.id]))
subnets_by_cidr2 = conn.get_all_subnets(
filters={'cidr-block': "10.0.0.0/24"})
subnets_by_cidr2.should.have.length_of(2)
set([subnet.id for subnet in subnets_by_cidr2]
).should.equal(set([subnetA.id, subnetB1.id]))
subnets_by_cidr3 = conn.get_all_subnets(
filters={'cidrBlock': "10.0.0.0/24"})
subnets_by_cidr3.should.have.length_of(2)
set([subnet.id for subnet in subnets_by_cidr3]
).should.equal(set([subnetA.id, subnetB1.id]))
# Filter by VPC ID and CIDR
subnets_by_vpc_and_cidr = conn.get_all_subnets(
filters={'vpc-id': vpcB.id, 'cidr': "10.0.0.0/24"})
subnets_by_vpc_and_cidr.should.have.length_of(1)
set([subnet.id for subnet in subnets_by_vpc_and_cidr]
).should.equal(set([subnetB1.id]))
# Filter by subnet ID
subnets_by_id = conn.get_all_subnets(filters={'subnet-id': subnetA.id})
subnets_by_id.should.have.length_of(1)
set([subnet.id for subnet in subnets_by_id]).should.equal(set([subnetA.id]))
# Filter by availabilityZone
subnets_by_az = conn.get_all_subnets(
filters={'availabilityZone': 'us-west-1a', 'vpc-id': vpcB.id})
subnets_by_az.should.have.length_of(1)
set([subnet.id for subnet in subnets_by_az]
).should.equal(set([subnetB1.id]))
# Filter by defaultForAz
subnets_by_az = conn.get_all_subnets(filters={'defaultForAz': "true"})
subnets_by_az.should.have.length_of(len(conn.get_all_zones()))
# Unsupported filter
conn.get_all_subnets.when.called_with(
filters={'not-implemented-filter': 'foobar'}).should.throw(NotImplementedError)
@mock_ec2_deprecated
@mock_cloudformation_deprecated
def test_subnet_tags_through_cloudformation():
vpc_conn = boto.vpc.connect_to_region('us-west-1')
vpc = vpc_conn.create_vpc("10.0.0.0/16")
subnet_template = {
"AWSTemplateFormatVersion": "2010-09-09",
"Resources": {
"testSubnet": {
"Type": "AWS::EC2::Subnet",
"Properties": {
"VpcId": vpc.id,
"CidrBlock": "10.0.0.0/24",
"AvailabilityZone": "us-west-1b",
"Tags": [{
"Key": "foo",
"Value": "bar",
}, {
"Key": "blah",
"Value": "baz",
}]
}
}
}
}
cf_conn = boto.cloudformation.connect_to_region("us-west-1")
template_json = json.dumps(subnet_template)
cf_conn.create_stack(
"test_stack",
template_body=template_json,
)
subnet = vpc_conn.get_all_subnets(filters={'cidrBlock': '10.0.0.0/24'})[0]
subnet.tags["foo"].should.equal("bar")
subnet.tags["blah"].should.equal("baz")
from __future__ import unicode_literals
# Ensure 'assert_raises' context manager support for Python 2.6
import tests.backport_assert_raises # noqa
from nose.tools import assert_raises
import boto3
import boto
import boto.vpc
from boto.exception import EC2ResponseError
from botocore.exceptions import ParamValidationError, ClientError
import json
import sure # noqa
from moto import mock_cloudformation_deprecated, mock_ec2, mock_ec2_deprecated
@mock_ec2_deprecated
def test_subnets():
ec2 = boto.connect_ec2('the_key', 'the_secret')
conn = boto.connect_vpc('the_key', 'the_secret')
vpc = conn.create_vpc("10.0.0.0/16")
subnet = conn.create_subnet(vpc.id, "10.0.0.0/18")
all_subnets = conn.get_all_subnets()
all_subnets.should.have.length_of(1 + len(ec2.get_all_zones()))
conn.delete_subnet(subnet.id)
all_subnets = conn.get_all_subnets()
all_subnets.should.have.length_of(0 + len(ec2.get_all_zones()))
with assert_raises(EC2ResponseError) as cm:
conn.delete_subnet(subnet.id)
cm.exception.code.should.equal('InvalidSubnetID.NotFound')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
@mock_ec2_deprecated
def test_subnet_create_vpc_validation():
conn = boto.connect_vpc('the_key', 'the_secret')
with assert_raises(EC2ResponseError) as cm:
conn.create_subnet("vpc-abcd1234", "10.0.0.0/18")
cm.exception.code.should.equal('InvalidVpcID.NotFound')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
@mock_ec2_deprecated
def test_subnet_tagging():
conn = boto.connect_vpc('the_key', 'the_secret')
vpc = conn.create_vpc("10.0.0.0/16")
subnet = conn.create_subnet(vpc.id, "10.0.0.0/18")
subnet.add_tag("a key", "some value")
tag = conn.get_all_tags()[0]
tag.name.should.equal("a key")
tag.value.should.equal("some value")
# Refresh the subnet
subnet = conn.get_all_subnets(subnet_ids=[subnet.id])[0]
subnet.tags.should.have.length_of(1)
subnet.tags["a key"].should.equal("some value")
@mock_ec2_deprecated
def test_subnet_should_have_proper_availability_zone_set():
conn = boto.vpc.connect_to_region('us-west-1')
vpcA = conn.create_vpc("10.0.0.0/16")
subnetA = conn.create_subnet(
vpcA.id, "10.0.0.0/24", availability_zone='us-west-1b')
subnetA.availability_zone.should.equal('us-west-1b')
@mock_ec2
def test_default_subnet():
ec2 = boto3.resource('ec2', region_name='us-west-1')
default_vpc = list(ec2.vpcs.all())[0]
default_vpc.cidr_block.should.equal('172.31.0.0/16')
default_vpc.reload()
default_vpc.is_default.should.be.ok
subnet = ec2.create_subnet(
VpcId=default_vpc.id, CidrBlock='172.31.48.0/20', AvailabilityZone='us-west-1a')
subnet.reload()
subnet.map_public_ip_on_launch.shouldnt.be.ok
@mock_ec2_deprecated
def test_non_default_subnet():
vpc_cli = boto.vpc.connect_to_region('us-west-1')
# Create the non default VPC
vpc = vpc_cli.create_vpc("10.0.0.0/16")
vpc.is_default.shouldnt.be.ok
subnet = vpc_cli.create_subnet(vpc.id, "10.0.0.0/24")
subnet = vpc_cli.get_all_subnets(subnet_ids=[subnet.id])[0]
subnet.mapPublicIpOnLaunch.should.equal('false')
@mock_ec2
def test_boto3_non_default_subnet():
ec2 = boto3.resource('ec2', region_name='us-west-1')
# Create the non default VPC
vpc = ec2.create_vpc(CidrBlock='10.0.0.0/16')
vpc.reload()
vpc.is_default.shouldnt.be.ok
subnet = ec2.create_subnet(
VpcId=vpc.id, CidrBlock='10.0.0.0/24', AvailabilityZone='us-west-1a')
subnet.reload()
subnet.map_public_ip_on_launch.shouldnt.be.ok
@mock_ec2
def test_modify_subnet_attribute():
ec2 = boto3.resource('ec2', region_name='us-west-1')
client = boto3.client('ec2', region_name='us-west-1')
# Get the default VPC
vpc = list(ec2.vpcs.all())[0]
subnet = ec2.create_subnet(
VpcId=vpc.id, CidrBlock="172.31.48.0/20", AvailabilityZone='us-west-1a')
# 'map_public_ip_on_launch' is set when calling 'DescribeSubnets' action
subnet.reload()
# For non default subnet, attribute value should be 'False'
subnet.map_public_ip_on_launch.shouldnt.be.ok
client.modify_subnet_attribute(
SubnetId=subnet.id, MapPublicIpOnLaunch={'Value': False})
subnet.reload()
subnet.map_public_ip_on_launch.shouldnt.be.ok
client.modify_subnet_attribute(
SubnetId=subnet.id, MapPublicIpOnLaunch={'Value': True})
subnet.reload()
subnet.map_public_ip_on_launch.should.be.ok
@mock_ec2
def test_modify_subnet_attribute_validation():
ec2 = boto3.resource('ec2', region_name='us-west-1')
client = boto3.client('ec2', region_name='us-west-1')
vpc = ec2.create_vpc(CidrBlock='10.0.0.0/16')
subnet = ec2.create_subnet(
VpcId=vpc.id, CidrBlock='10.0.0.0/24', AvailabilityZone='us-west-1a')
with assert_raises(ParamValidationError):
client.modify_subnet_attribute(
SubnetId=subnet.id, MapPublicIpOnLaunch={'Value': 'invalid'})
@mock_ec2_deprecated
def test_subnet_get_by_id():
ec2 = boto.ec2.connect_to_region('us-west-1')
conn = boto.vpc.connect_to_region('us-west-1')
vpcA = conn.create_vpc("10.0.0.0/16")
subnetA = conn.create_subnet(
vpcA.id, "10.0.0.0/24", availability_zone='us-west-1a')
vpcB = conn.create_vpc("10.0.0.0/16")
subnetB1 = conn.create_subnet(
vpcB.id, "10.0.0.0/24", availability_zone='us-west-1a')
subnetB2 = conn.create_subnet(
vpcB.id, "10.0.1.0/24", availability_zone='us-west-1b')
subnets_by_id = conn.get_all_subnets(subnet_ids=[subnetA.id, subnetB1.id])
subnets_by_id.should.have.length_of(2)
subnets_by_id = tuple(map(lambda s: s.id, subnets_by_id))
subnetA.id.should.be.within(subnets_by_id)
subnetB1.id.should.be.within(subnets_by_id)
with assert_raises(EC2ResponseError) as cm:
conn.get_all_subnets(subnet_ids=['subnet-does_not_exist'])
cm.exception.code.should.equal('InvalidSubnetID.NotFound')
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
@mock_ec2_deprecated
def test_get_subnets_filtering():
ec2 = boto.ec2.connect_to_region('us-west-1')
conn = boto.vpc.connect_to_region('us-west-1')
vpcA = conn.create_vpc("10.0.0.0/16")
subnetA = conn.create_subnet(
vpcA.id, "10.0.0.0/24", availability_zone='us-west-1a')
vpcB = conn.create_vpc("10.0.0.0/16")
subnetB1 = conn.create_subnet(
vpcB.id, "10.0.0.0/24", availability_zone='us-west-1a')
subnetB2 = conn.create_subnet(
vpcB.id, "10.0.1.0/24", availability_zone='us-west-1b')
all_subnets = conn.get_all_subnets()
all_subnets.should.have.length_of(3 + len(ec2.get_all_zones()))
# Filter by VPC ID
subnets_by_vpc = conn.get_all_subnets(filters={'vpc-id': vpcB.id})
subnets_by_vpc.should.have.length_of(2)
set([subnet.id for subnet in subnets_by_vpc]).should.equal(
set([subnetB1.id, subnetB2.id]))
# Filter by CIDR variations
subnets_by_cidr1 = conn.get_all_subnets(filters={'cidr': "10.0.0.0/24"})
subnets_by_cidr1.should.have.length_of(2)
set([subnet.id for subnet in subnets_by_cidr1]
).should.equal(set([subnetA.id, subnetB1.id]))
subnets_by_cidr2 = conn.get_all_subnets(
filters={'cidr-block': "10.0.0.0/24"})
subnets_by_cidr2.should.have.length_of(2)
set([subnet.id for subnet in subnets_by_cidr2]
).should.equal(set([subnetA.id, subnetB1.id]))
subnets_by_cidr3 = conn.get_all_subnets(
filters={'cidrBlock': "10.0.0.0/24"})
subnets_by_cidr3.should.have.length_of(2)
set([subnet.id for subnet in subnets_by_cidr3]
).should.equal(set([subnetA.id, subnetB1.id]))
# Filter by VPC ID and CIDR
subnets_by_vpc_and_cidr = conn.get_all_subnets(
filters={'vpc-id': vpcB.id, 'cidr': "10.0.0.0/24"})
subnets_by_vpc_and_cidr.should.have.length_of(1)
set([subnet.id for subnet in subnets_by_vpc_and_cidr]
).should.equal(set([subnetB1.id]))
# Filter by subnet ID
subnets_by_id = conn.get_all_subnets(filters={'subnet-id': subnetA.id})
subnets_by_id.should.have.length_of(1)
set([subnet.id for subnet in subnets_by_id]).should.equal(set([subnetA.id]))
# Filter by availabilityZone
subnets_by_az = conn.get_all_subnets(
filters={'availabilityZone': 'us-west-1a', 'vpc-id': vpcB.id})
subnets_by_az.should.have.length_of(1)
set([subnet.id for subnet in subnets_by_az]
).should.equal(set([subnetB1.id]))
# Filter by defaultForAz
subnets_by_az = conn.get_all_subnets(filters={'defaultForAz': "true"})
subnets_by_az.should.have.length_of(len(conn.get_all_zones()))
# Unsupported filter
conn.get_all_subnets.when.called_with(
filters={'not-implemented-filter': 'foobar'}).should.throw(NotImplementedError)
@mock_ec2_deprecated
@mock_cloudformation_deprecated
def test_subnet_tags_through_cloudformation():
vpc_conn = boto.vpc.connect_to_region('us-west-1')
vpc = vpc_conn.create_vpc("10.0.0.0/16")
subnet_template = {
"AWSTemplateFormatVersion": "2010-09-09",
"Resources": {
"testSubnet": {
"Type": "AWS::EC2::Subnet",
"Properties": {
"VpcId": vpc.id,
"CidrBlock": "10.0.0.0/24",
"AvailabilityZone": "us-west-1b",
"Tags": [{
"Key": "foo",
"Value": "bar",
}, {
"Key": "blah",
"Value": "baz",
}]
}
}
}
}
cf_conn = boto.cloudformation.connect_to_region("us-west-1")
template_json = json.dumps(subnet_template)
cf_conn.create_stack(
"test_stack",
template_body=template_json,
)
subnet = vpc_conn.get_all_subnets(filters={'cidrBlock': '10.0.0.0/24'})[0]
subnet.tags["foo"].should.equal("bar")
subnet.tags["blah"].should.equal("baz")
@mock_ec2
def test_create_subnet_with_invalid_cidr_range():
ec2 = boto3.resource('ec2', region_name='us-west-1')
vpc = ec2.create_vpc(CidrBlock='10.0.0.0/16')
vpc.reload()
vpc.is_default.shouldnt.be.ok
subnet_cidr_block = '10.1.0.0/20'
with assert_raises(ClientError) as ex:
subnet = ec2.create_subnet(VpcId=vpc.id, CidrBlock=subnet_cidr_block)
str(ex.exception).should.equal(
"An error occurred (InvalidSubnet.Range) when calling the CreateSubnet "
"operation: The CIDR '{}' is invalid.".format(subnet_cidr_block))
@mock_ec2
def test_create_subnet_with_invalid_cidr_block_parameter():
ec2 = boto3.resource('ec2', region_name='us-west-1')
vpc = ec2.create_vpc(CidrBlock='10.0.0.0/16')
vpc.reload()
vpc.is_default.shouldnt.be.ok
subnet_cidr_block = '1000.1.0.0/20'
with assert_raises(ClientError) as ex:
subnet = ec2.create_subnet(VpcId=vpc.id, CidrBlock=subnet_cidr_block)
str(ex.exception).should.equal(
"An error occurred (InvalidParameterValue) when calling the CreateSubnet "
"operation: Value ({}) for parameter cidrBlock is invalid. This is not a valid CIDR block.".format(subnet_cidr_block))
@mock_ec2
def test_create_subnets_with_overlapping_cidr_blocks():
ec2 = boto3.resource('ec2', region_name='us-west-1')
vpc = ec2.create_vpc(CidrBlock='10.0.0.0/16')
vpc.reload()
vpc.is_default.shouldnt.be.ok
subnet_cidr_block = '10.0.0.0/24'
with assert_raises(ClientError) as ex:
subnet1 = ec2.create_subnet(VpcId=vpc.id, CidrBlock=subnet_cidr_block)
subnet2 = ec2.create_subnet(VpcId=vpc.id, CidrBlock=subnet_cidr_block)
str(ex.exception).should.equal(
"An error occurred (InvalidSubnet.Conflict) when calling the CreateSubnet "
"operation: The CIDR '{}' conflicts with another subnet".format(subnet_cidr_block))

View file

@ -1,8 +1,12 @@
from moto.ec2 import utils
def test_random_key_pair():
key_pair = utils.random_key_pair()
assert len(key_pair['fingerprint']) == 59
assert key_pair['material'].startswith('---- BEGIN RSA PRIVATE KEY ----')
assert key_pair['material'].endswith('-----END RSA PRIVATE KEY-----')
from moto.ec2 import utils
from .helpers import rsa_check_private_key
def test_random_key_pair():
key_pair = utils.random_key_pair()
rsa_check_private_key(key_pair['material'])
# AWS uses MD5 fingerprints, which are 47 characters long, *not* SHA1
# fingerprints with 59 characters.
assert len(key_pair['fingerprint']) == 47

View file

@ -107,14 +107,19 @@ def test_vpc_peering_connections_cross_region():
ec2_apn1 = boto3.resource('ec2', region_name='ap-northeast-1')
vpc_apn1 = ec2_apn1.create_vpc(CidrBlock='10.20.0.0/16')
# create peering
vpc_pcx = ec2_usw1.create_vpc_peering_connection(
vpc_pcx_usw1 = ec2_usw1.create_vpc_peering_connection(
VpcId=vpc_usw1.id,
PeerVpcId=vpc_apn1.id,
PeerRegion='ap-northeast-1',
)
vpc_pcx.status['Code'].should.equal('initiating-request')
vpc_pcx.requester_vpc.id.should.equal(vpc_usw1.id)
vpc_pcx.accepter_vpc.id.should.equal(vpc_apn1.id)
vpc_pcx_usw1.status['Code'].should.equal('initiating-request')
vpc_pcx_usw1.requester_vpc.id.should.equal(vpc_usw1.id)
vpc_pcx_usw1.accepter_vpc.id.should.equal(vpc_apn1.id)
# test cross region vpc peering connection exist
vpc_pcx_apn1 = ec2_apn1.VpcPeeringConnection(vpc_pcx_usw1.id)
vpc_pcx_apn1.id.should.equal(vpc_pcx_usw1.id)
vpc_pcx_apn1.requester_vpc.id.should.equal(vpc_usw1.id)
vpc_pcx_apn1.accepter_vpc.id.should.equal(vpc_apn1.id)
@mock_ec2
@ -131,3 +136,148 @@ def test_vpc_peering_connections_cross_region_fail():
PeerVpcId=vpc_apn1.id,
PeerRegion='ap-northeast-2')
cm.exception.response['Error']['Code'].should.equal('InvalidVpcID.NotFound')
@mock_ec2
def test_vpc_peering_connections_cross_region_accept():
# create vpc in us-west-1 and ap-northeast-1
ec2_usw1 = boto3.resource('ec2', region_name='us-west-1')
vpc_usw1 = ec2_usw1.create_vpc(CidrBlock='10.90.0.0/16')
ec2_apn1 = boto3.resource('ec2', region_name='ap-northeast-1')
vpc_apn1 = ec2_apn1.create_vpc(CidrBlock='10.20.0.0/16')
# create peering
vpc_pcx_usw1 = ec2_usw1.create_vpc_peering_connection(
VpcId=vpc_usw1.id,
PeerVpcId=vpc_apn1.id,
PeerRegion='ap-northeast-1',
)
# accept peering from ap-northeast-1
ec2_apn1 = boto3.client('ec2', region_name='ap-northeast-1')
ec2_usw1 = boto3.client('ec2', region_name='us-west-1')
acp_pcx_apn1 = ec2_apn1.accept_vpc_peering_connection(
VpcPeeringConnectionId=vpc_pcx_usw1.id
)
des_pcx_apn1 = ec2_usw1.describe_vpc_peering_connections(
VpcPeeringConnectionIds=[vpc_pcx_usw1.id]
)
des_pcx_usw1 = ec2_usw1.describe_vpc_peering_connections(
VpcPeeringConnectionIds=[vpc_pcx_usw1.id]
)
acp_pcx_apn1['VpcPeeringConnection']['Status']['Code'].should.equal('active')
des_pcx_apn1['VpcPeeringConnections'][0]['Status']['Code'].should.equal('active')
des_pcx_usw1['VpcPeeringConnections'][0]['Status']['Code'].should.equal('active')
@mock_ec2
def test_vpc_peering_connections_cross_region_reject():
# create vpc in us-west-1 and ap-northeast-1
ec2_usw1 = boto3.resource('ec2', region_name='us-west-1')
vpc_usw1 = ec2_usw1.create_vpc(CidrBlock='10.90.0.0/16')
ec2_apn1 = boto3.resource('ec2', region_name='ap-northeast-1')
vpc_apn1 = ec2_apn1.create_vpc(CidrBlock='10.20.0.0/16')
# create peering
vpc_pcx_usw1 = ec2_usw1.create_vpc_peering_connection(
VpcId=vpc_usw1.id,
PeerVpcId=vpc_apn1.id,
PeerRegion='ap-northeast-1',
)
# reject peering from ap-northeast-1
ec2_apn1 = boto3.client('ec2', region_name='ap-northeast-1')
ec2_usw1 = boto3.client('ec2', region_name='us-west-1')
rej_pcx_apn1 = ec2_apn1.reject_vpc_peering_connection(
VpcPeeringConnectionId=vpc_pcx_usw1.id
)
des_pcx_apn1 = ec2_usw1.describe_vpc_peering_connections(
VpcPeeringConnectionIds=[vpc_pcx_usw1.id]
)
des_pcx_usw1 = ec2_usw1.describe_vpc_peering_connections(
VpcPeeringConnectionIds=[vpc_pcx_usw1.id]
)
rej_pcx_apn1['Return'].should.equal(True)
des_pcx_apn1['VpcPeeringConnections'][0]['Status']['Code'].should.equal('rejected')
des_pcx_usw1['VpcPeeringConnections'][0]['Status']['Code'].should.equal('rejected')
@mock_ec2
def test_vpc_peering_connections_cross_region_delete():
# create vpc in us-west-1 and ap-northeast-1
ec2_usw1 = boto3.resource('ec2', region_name='us-west-1')
vpc_usw1 = ec2_usw1.create_vpc(CidrBlock='10.90.0.0/16')
ec2_apn1 = boto3.resource('ec2', region_name='ap-northeast-1')
vpc_apn1 = ec2_apn1.create_vpc(CidrBlock='10.20.0.0/16')
# create peering
vpc_pcx_usw1 = ec2_usw1.create_vpc_peering_connection(
VpcId=vpc_usw1.id,
PeerVpcId=vpc_apn1.id,
PeerRegion='ap-northeast-1',
)
# reject peering from ap-northeast-1
ec2_apn1 = boto3.client('ec2', region_name='ap-northeast-1')
ec2_usw1 = boto3.client('ec2', region_name='us-west-1')
del_pcx_apn1 = ec2_apn1.delete_vpc_peering_connection(
VpcPeeringConnectionId=vpc_pcx_usw1.id
)
des_pcx_apn1 = ec2_usw1.describe_vpc_peering_connections(
VpcPeeringConnectionIds=[vpc_pcx_usw1.id]
)
des_pcx_usw1 = ec2_usw1.describe_vpc_peering_connections(
VpcPeeringConnectionIds=[vpc_pcx_usw1.id]
)
del_pcx_apn1['Return'].should.equal(True)
des_pcx_apn1['VpcPeeringConnections'][0]['Status']['Code'].should.equal('deleted')
des_pcx_usw1['VpcPeeringConnections'][0]['Status']['Code'].should.equal('deleted')
@mock_ec2
def test_vpc_peering_connections_cross_region_accept_wrong_region():
# create vpc in us-west-1 and ap-northeast-1
ec2_usw1 = boto3.resource('ec2', region_name='us-west-1')
vpc_usw1 = ec2_usw1.create_vpc(CidrBlock='10.90.0.0/16')
ec2_apn1 = boto3.resource('ec2', region_name='ap-northeast-1')
vpc_apn1 = ec2_apn1.create_vpc(CidrBlock='10.20.0.0/16')
# create peering
vpc_pcx_usw1 = ec2_usw1.create_vpc_peering_connection(
VpcId=vpc_usw1.id,
PeerVpcId=vpc_apn1.id,
PeerRegion='ap-northeast-1',
)
# accept wrong peering from us-west-1 which will raise error
ec2_apn1 = boto3.client('ec2', region_name='ap-northeast-1')
ec2_usw1 = boto3.client('ec2', region_name='us-west-1')
with assert_raises(ClientError) as cm:
ec2_usw1.accept_vpc_peering_connection(
VpcPeeringConnectionId=vpc_pcx_usw1.id
)
cm.exception.response['Error']['Code'].should.equal('OperationNotPermitted')
exp_msg = 'Incorrect region ({0}) specified for this request.VPC ' \
'peering connection {1} must be ' \
'accepted in region {2}'.format('us-west-1', vpc_pcx_usw1.id, 'ap-northeast-1')
cm.exception.response['Error']['Message'].should.equal(exp_msg)
@mock_ec2
def test_vpc_peering_connections_cross_region_reject_wrong_region():
# create vpc in us-west-1 and ap-northeast-1
ec2_usw1 = boto3.resource('ec2', region_name='us-west-1')
vpc_usw1 = ec2_usw1.create_vpc(CidrBlock='10.90.0.0/16')
ec2_apn1 = boto3.resource('ec2', region_name='ap-northeast-1')
vpc_apn1 = ec2_apn1.create_vpc(CidrBlock='10.20.0.0/16')
# create peering
vpc_pcx_usw1 = ec2_usw1.create_vpc_peering_connection(
VpcId=vpc_usw1.id,
PeerVpcId=vpc_apn1.id,
PeerRegion='ap-northeast-1',
)
# reject wrong peering from us-west-1 which will raise error
ec2_apn1 = boto3.client('ec2', region_name='ap-northeast-1')
ec2_usw1 = boto3.client('ec2', region_name='us-west-1')
with assert_raises(ClientError) as cm:
ec2_usw1.reject_vpc_peering_connection(
VpcPeeringConnectionId=vpc_pcx_usw1.id
)
cm.exception.response['Error']['Code'].should.equal('OperationNotPermitted')
exp_msg = 'Incorrect region ({0}) specified for this request.VPC ' \
'peering connection {1} must be accepted or ' \
'rejected in region {2}'.format('us-west-1', vpc_pcx_usw1.id, 'ap-northeast-1')
cm.exception.response['Error']['Message'].should.equal(exp_msg)

File diff suppressed because it is too large Load diff