Implement filters and pagers for some EMR end points

This commit is contained in:
Taro Sato 2016-10-18 16:47:02 -07:00
commit 484faa54c4
6 changed files with 342 additions and 115 deletions

View file

@ -1,6 +1,9 @@
from __future__ import unicode_literals
import time
from datetime import datetime
import boto
import pytz
from boto.emr.bootstrap_action import BootstrapAction
from boto.emr.instance_group import InstanceGroup
from boto.emr.step import StreamingStep
@ -104,18 +107,53 @@ def test_describe_cluster():
@mock_emr
def test_describe_jobflows():
conn = boto.connect_emr()
job1_id = conn.run_jobflow(**run_jobflow_args)
job2_id = conn.run_jobflow(**run_jobflow_args)
args = run_jobflow_args.copy()
expected = {}
for idx in range(400):
cluster_name = 'cluster' + str(idx)
args['name'] = cluster_name
cluster_id = conn.run_jobflow(**args)
expected[cluster_id] = {
'id': cluster_id,
'name': cluster_name,
'state': 'WAITING'
}
# need sleep since it appears the timestamp is always rounded to
# the nearest second internally
time.sleep(1)
timestamp = datetime.now(pytz.utc)
time.sleep(1)
for idx in range(400, 600):
cluster_name = 'cluster' + str(idx)
args['name'] = cluster_name
cluster_id = conn.run_jobflow(**args)
conn.terminate_jobflow(cluster_id)
expected[cluster_id] = {
'id': cluster_id,
'name': cluster_name,
'state': 'TERMINATED'
}
jobs = conn.describe_jobflows()
jobs.should.have.length_of(2)
jobs.should.have.length_of(512)
jobs = conn.describe_jobflows(jobflow_ids=[job2_id])
jobs.should.have.length_of(1)
jobs[0].jobflowid.should.equal(job2_id)
for cluster_id, y in expected.items():
resp = conn.describe_jobflows(jobflow_ids=[cluster_id])
resp.should.have.length_of(1)
resp[0].jobflowid.should.equal(cluster_id)
first_job = conn.describe_jobflow(job1_id)
first_job.jobflowid.should.equal(job1_id)
resp = conn.describe_jobflows(states=['WAITING'])
resp.should.have.length_of(400)
for x in resp:
x.state.should.equal('WAITING')
resp = conn.describe_jobflows(created_before=timestamp)
resp.should.have.length_of(400)
resp = conn.describe_jobflows(created_after=timestamp)
resp.should.have.length_of(200)
@mock_emr
@ -204,43 +242,69 @@ def test_describe_jobflow():
@mock_emr
def test_list_clusters():
conn = boto.connect_emr()
args = run_jobflow_args.copy()
args['name'] = 'jobflow1'
cluster1_id = conn.run_jobflow(**args)
args['name'] = 'jobflow2'
cluster2_id = conn.run_jobflow(**args)
conn.terminate_jobflow(cluster2_id)
expected = {}
summary = conn.list_clusters()
clusters = summary.clusters
clusters.should.have.length_of(2)
for idx in range(40):
cluster_name = 'jobflow' + str(idx)
args['name'] = cluster_name
cluster_id = conn.run_jobflow(**args)
expected[cluster_id] = {
'id': cluster_id,
'name': cluster_name,
'normalizedinstancehours': '0',
'state': 'WAITING'
}
expected = {
cluster1_id: {
'id': cluster1_id,
'name': 'jobflow1',
'normalizedinstancehours': 0,
'state': 'WAITING'},
cluster2_id: {
'id': cluster2_id,
'name': 'jobflow2',
'normalizedinstancehours': 0,
'state': 'TERMINATED'},
}
# need sleep since it appears the timestamp is always rounded to
# the nearest second internally
time.sleep(1)
timestamp = datetime.now(pytz.utc)
time.sleep(1)
for x in clusters:
y = expected[x.id]
x.id.should.equal(y['id'])
x.name.should.equal(y['name'])
int(x.normalizedinstancehours).should.equal(y['normalizedinstancehours'])
x.status.state.should.equal(y['state'])
x.status.timeline.creationdatetime.should.be.a(six.string_types)
if y['state'] == 'TERMINATED':
x.status.timeline.enddatetime.should.be.a(six.string_types)
else:
x.status.timeline.shouldnt.have.property('enddatetime')
x.status.timeline.readydatetime.should.be.a(six.string_types)
for idx in range(40, 70):
cluster_name = 'jobflow' + str(idx)
args['name'] = cluster_name
cluster_id = conn.run_jobflow(**args)
conn.terminate_jobflow(cluster_id)
expected[cluster_id] = {
'id': cluster_id,
'name': cluster_name,
'normalizedinstancehours': '0',
'state': 'TERMINATED'
}
args = {}
while 1:
resp = conn.list_clusters(**args)
clusters = resp.clusters
len(clusters).should.be.lower_than_or_equal_to(50)
for x in clusters:
y = expected[x.id]
x.id.should.equal(y['id'])
x.name.should.equal(y['name'])
x.normalizedinstancehours.should.equal(y['normalizedinstancehours'])
x.status.state.should.equal(y['state'])
x.status.timeline.creationdatetime.should.be.a(six.string_types)
if y['state'] == 'TERMINATED':
x.status.timeline.enddatetime.should.be.a(six.string_types)
else:
x.status.timeline.shouldnt.have.property('enddatetime')
x.status.timeline.readydatetime.should.be.a(six.string_types)
if not hasattr(resp, 'marker'):
break
args = {'marker': resp.marker}
resp = conn.list_clusters(cluster_states=['TERMINATED'])
resp.clusters.should.have.length_of(30)
for x in resp.clusters:
x.status.state.should.equal('TERMINATED')
resp = conn.list_clusters(created_before=timestamp)
resp.clusters.should.have.length_of(40)
resp = conn.list_clusters(created_after=timestamp)
resp.clusters.should.have.length_of(30)
@mock_emr
@ -516,7 +580,8 @@ def test_steps():
expected = dict((s.name, s) for s in input_steps)
for x in conn.list_steps(cluster_id).steps:
steps = conn.list_steps(cluster_id).steps
for x in steps:
y = expected[x.name]
# actiononfailure
list(arg.value for arg in x.config.args).should.equal([
@ -554,6 +619,17 @@ def test_steps():
# x.status.timeline.enddatetime.should.be.a(six.string_types)
# x.status.timeline.startdatetime.should.be.a(six.string_types)
@requires_boto_gte('2.39')
def test_list_steps_with_states():
# boto's list_steps prior to 2.39 has a bug that ignores
# step_states argument.
steps = conn.list_steps(cluster_id).steps
step_id = steps[0].id
steps = conn.list_steps(cluster_id, step_states=['STARTING']).steps
steps.should.have.length_of(1)
steps[0].id.should.equal(step_id)
test_list_steps_with_states()
@mock_emr
def test_tags():