Merge pull request #506 from DenverJ/volume_snapshot_filters

Add filters for describeVolumes and describeSnapshots.
This commit is contained in:
Steve Pulec 2016-01-11 13:30:56 -05:00
commit defd106523
3 changed files with 162 additions and 6 deletions

View file

@ -48,6 +48,63 @@ def test_filter_volume_by_id():
vol2.should.have.length_of(2)
@mock_ec2
def test_volume_filters():
conn = boto.connect_ec2('the_key', 'the_secret')
reservation = conn.run_instances('ami-1234abcd')
instance = reservation.instances[0]
instance.update()
volume1 = conn.create_volume(80, "us-east-1a")
volume2 = conn.create_volume(36, "us-east-1b")
volume3 = conn.create_volume(20, "us-east-1c")
snapshot = volume3.create_snapshot(description='testsnap')
volume4 = conn.create_volume(25, "us-east-1a", snapshot=snapshot)
conn.create_tags([volume1.id], {'testkey1': 'testvalue1'})
conn.create_tags([volume2.id], {'testkey2': 'testvalue2'})
volume1.update()
volume2.update()
volume3.update()
volume4.update()
block_mapping = instance.block_device_mapping['/dev/sda1']
volumes_by_attach_time = conn.get_all_volumes(filters={'attachment.attach-time': block_mapping.attach_time})
set([vol.id for vol in volumes_by_attach_time]).should.equal(set([block_mapping.volume_id]))
volumes_by_attach_device = conn.get_all_volumes(filters={'attachment.device': '/dev/sda1'})
set([vol.id for vol in volumes_by_attach_device]).should.equal(set([block_mapping.volume_id]))
volumes_by_attach_instance_id = conn.get_all_volumes(filters={'attachment.instance-id': instance.id})
set([vol.id for vol in volumes_by_attach_instance_id]).should.equal(set([block_mapping.volume_id]))
volumes_by_create_time = conn.get_all_volumes(filters={'create-time': volume4.create_time})
set([vol.create_time for vol in volumes_by_create_time]).should.equal(set([volume4.create_time]))
volumes_by_size = conn.get_all_volumes(filters={'size': volume2.size})
set([vol.id for vol in volumes_by_size]).should.equal(set([volume2.id]))
volumes_by_snapshot_id = conn.get_all_volumes(filters={'snapshot-id': snapshot.id})
set([vol.id for vol in volumes_by_snapshot_id]).should.equal(set([volume4.id]))
volumes_by_status = conn.get_all_volumes(filters={'status': 'in-use'})
set([vol.id for vol in volumes_by_status]).should.equal(set([block_mapping.volume_id]))
volumes_by_tag_key = conn.get_all_volumes(filters={'tag-key': 'testkey1'})
set([vol.id for vol in volumes_by_tag_key]).should.equal(set([volume1.id]))
volumes_by_tag_value = conn.get_all_volumes(filters={'tag-value': 'testvalue1'})
set([vol.id for vol in volumes_by_tag_value]).should.equal(set([volume1.id]))
volumes_by_tag = conn.get_all_volumes(filters={'tag:testkey1': 'testvalue1'})
set([vol.id for vol in volumes_by_tag]).should.equal(set([volume1.id]))
@mock_ec2
def test_volume_attach_and_detach():
conn = boto.connect_ec2('the_key', 'the_secret')
@ -139,6 +196,44 @@ def test_filter_snapshot_by_id():
s.region.name.should.equal(conn.region.name)
@mock_ec2
def test_snapshot_filters():
conn = boto.connect_ec2('the_key', 'the_secret')
volume1 = conn.create_volume(20, "us-east-1a")
volume2 = conn.create_volume(25, "us-east-1a")
snapshot1 = volume1.create_snapshot(description='testsnapshot1')
snapshot2 = volume1.create_snapshot(description='testsnapshot2')
snapshot3 = volume2.create_snapshot(description='testsnapshot3')
conn.create_tags([snapshot1.id], {'testkey1': 'testvalue1'})
conn.create_tags([snapshot2.id], {'testkey2': 'testvalue2'})
snapshots_by_description = conn.get_all_snapshots(filters={'description': 'testsnapshot1'})
set([snap.id for snap in snapshots_by_description]).should.equal(set([snapshot1.id]))
snapshots_by_id = conn.get_all_snapshots(filters={'snapshot-id': snapshot1.id})
set([snap.id for snap in snapshots_by_id]).should.equal(set([snapshot1.id]))
snapshots_by_start_time = conn.get_all_snapshots(filters={'start-time': snapshot1.start_time})
set([snap.start_time for snap in snapshots_by_start_time]).should.equal(set([snapshot1.start_time]))
snapshots_by_volume_id = conn.get_all_snapshots(filters={'volume-id': volume1.id})
set([snap.id for snap in snapshots_by_volume_id]).should.equal(set([snapshot1.id, snapshot2.id]))
snapshots_by_volume_size = conn.get_all_snapshots(filters={'volume-size': volume1.size})
set([snap.id for snap in snapshots_by_volume_size]).should.equal(set([snapshot1.id, snapshot2.id]))
snapshots_by_tag_key = conn.get_all_snapshots(filters={'tag-key': 'testkey1'})
set([snap.id for snap in snapshots_by_tag_key]).should.equal(set([snapshot1.id]))
snapshots_by_tag_value = conn.get_all_snapshots(filters={'tag-value': 'testvalue1'})
set([snap.id for snap in snapshots_by_tag_value]).should.equal(set([snapshot1.id]))
snapshots_by_tag = conn.get_all_snapshots(filters={'tag:testkey1': 'testvalue1'})
set([snap.id for snap in snapshots_by_tag]).should.equal(set([snapshot1.id]))
@mock_ec2
def test_snapshot_attribute():
conn = boto.connect_ec2('the_key', 'the_secret')