Better EMR coverage and boto3 request/response handling

This revision includes:

- A handler for requests for which content-type is JSON (from boto3).

- A decorator (generate_boto3_response) to convert XML responses to
  JSON (for boto3). This way, existing response templates for boto can
  be shared for generating boto3 response.

- Utility class/functions to use botocore's service specification data
  (accessible under botocore.data) for type casting, from query
  parameters to Python objects and XML to JSON.

- Updates to response handlers/models to cover more EMR endpoints and
  mockable parameters.
This commit is contained in:
Taro Sato 2016-09-21 20:59:19 -07:00
commit 7cd404808b
10 changed files with 2399 additions and 841 deletions

View file

@ -1,47 +1,227 @@
from __future__ import unicode_literals
from datetime import datetime
import boto.emr
import pytz
from moto.core import BaseBackend
from .utils import random_instance_group_id, random_job_id
from .utils import random_instance_group_id, random_cluster_id, random_step_id
DEFAULT_JOB_FLOW_ROLE = 'EMRJobflowDefault'
# Plain record describing one application attached to a fake cluster.
class FakeApplication(object):
# name and version are stored as given; a missing or falsy args becomes a
# new list and a missing or falsy additional_info becomes a new dict.
def __init__(self, name, version, args=None, additional_info=None):
self.additional_info = additional_info or {}
self.args = args or []
self.name = name
self.version = version
# Plain record describing one bootstrap action of a fake cluster.
class FakeBootstrapAction(object):
# args is a required parameter, but a falsy value (e.g. None) is replaced
# by a new empty list; name and script_path are stored as given.
def __init__(self, args, name, script_path):
self.args = args or []
self.name = name
self.script_path = script_path
class FakeInstanceGroup(object):
    """In-memory record of one EMR instance group.

    The group is created already in the ``RUNNING`` state with its creation,
    start and ready timestamps set to the current UTC time.
    """

    def __init__(self, instance_count, instance_role, instance_type, market, name,
                 id=None, bid_price=None):
        """Store the group attributes.

        :param instance_count: number of instances, kept as ``num_instances``
        :param instance_role: group role, kept as ``role``
        :param instance_type: instance type, kept as ``type``
        :param market: market value, stored as given
        :param name: group name
        :param id: explicit group id; a random one is generated when falsy
        :param bid_price: optional bid price, stored as given
        """
        # ``id`` shadows the builtin but is part of the keyword interface
        # callers use (groups are built with ``FakeInstanceGroup(**dict)``).
        self.id = id or random_instance_group_id()

        self.bid_price = bid_price
        self.market = market
        self.name = name
        self.num_instances = instance_count
        self.role = instance_role
        self.type = instance_type

        self.creation_datetime = datetime.now(pytz.utc)
        self.start_datetime = datetime.now(pytz.utc)
        self.ready_datetime = datetime.now(pytz.utc)
        self.end_datetime = None
        self.state = 'RUNNING'

    def set_instance_count(self, instance_count):
        """Replace the stored instance count."""
        self.num_instances = instance_count
class Cluster(object):
def __init__(self, id, name, availability_zone, ec2_key_name, subnet_id,
ec2_iam_profile, log_uri):
self.id = id
# Plain record describing one step of a fake cluster.
class FakeStep(object):
# state is the only required argument; args falls back to a new list and
# properties to a new dict when omitted or falsy.
def __init__(self,
state,
name='',
jar='',
args=None,
properties=None,
action_on_failure='TERMINATE_CLUSTER'):
# Every step gets a freshly generated id.
self.id = random_step_id()
self.action_on_failure = action_on_failure
self.args = args or []
self.name = name
self.jar = jar
self.properties = properties or {}
# Only the creation time is stamped (current UTC time); the remaining
# timestamps start out unset.
self.creation_datetime = datetime.now(pytz.utc)
self.end_datetime = None
self.ready_datetime = None
self.start_datetime = None
self.state = state
class FakeCluster(object):
def __init__(self,
emr_backend,
name,
log_uri,
job_flow_role,
service_role,
steps,
instance_attrs,
bootstrap_actions=None,
configurations=None,
cluster_id=None,
visible_to_all_users='false',
release_label=None,
requested_ami_version=None,
running_ami_version=None):
self.id = cluster_id or random_cluster_id()
emr_backend.clusters[self.id] = self
self.emr_backend = emr_backend
self.applications = []
self.auto_terminate = "false"
self.availability_zone = availability_zone
self.subnet_id = subnet_id
self.ec2_iam_profile = ec2_iam_profile
self.log_uri = log_uri
self.master_public_dns_name = ""
self.normalized_instance_hours = 0
self.requested_ami_version = "2.4.2"
self.running_ami_version = "2.4.2"
self.service_role = "my-service-role"
self.state = "RUNNING"
self.bootstrap_actions = []
for bootstrap_action in (bootstrap_actions or []):
self.add_bootstrap_action(bootstrap_action)
self.configurations = configurations or []
self.tags = {}
self.termination_protected = "false"
self.visible_to_all_users = "false"
self.log_uri = log_uri
self.name = name
self.normalized_instance_hours = 0
self.steps = []
self.add_steps(steps)
self.set_visibility(visible_to_all_users)
self.instance_group_ids = []
self.master_instance_group_id = None
self.core_instance_group_id = None
if 'master_instance_type' in instance_attrs and instance_attrs['master_instance_type']:
self.emr_backend.add_instance_groups(
self.id,
[{'instance_count': 1,
'instance_role': 'MASTER',
'instance_type': instance_attrs['master_instance_type'],
'market': 'ON_DEMAND',
'name': 'master'}])
if 'slave_instance_type' in instance_attrs and instance_attrs['slave_instance_type']:
self.emr_backend.add_instance_groups(
self.id,
[{'instance_count': instance_attrs['instance_count'] - 1,
'instance_role': 'CORE',
'instance_type': instance_attrs['slave_instance_type'],
'market': 'ON_DEMAND',
'name': 'slave'}])
self.additional_master_security_groups = instance_attrs.get('additional_master_security_groups')
self.additional_slave_security_groups = instance_attrs.get('additional_slave_security_groups')
self.availability_zone = instance_attrs.get('availability_zone')
self.ec2_key_name = instance_attrs.get('ec2_key_name')
self.ec2_subnet_id = instance_attrs.get('ec2_subnet_id')
self.hadoop_version = instance_attrs.get('hadoop_version')
self.keep_job_flow_alive_when_no_steps = instance_attrs.get('keep_job_flow_alive_when_no_steps')
self.master_security_group = instance_attrs.get('emr_managed_master_security_group')
self.service_access_security_group = instance_attrs.get('service_access_security_group')
self.slave_security_group = instance_attrs.get('emr_managed_slave_security_group')
self.termination_protected = instance_attrs.get('termination_protected')
self.release_label = release_label
self.requested_ami_version = requested_ami_version
self.running_ami_version = running_ami_version
self.role = job_flow_role or 'EMRJobflowDefault'
self.service_role = service_role
self.creation_datetime = datetime.now(pytz.utc)
self.start_datetime = None
self.ready_datetime = None
self.end_datetime = None
self.state = None
self.start_cluster()
self.run_bootstrap_actions()
# Instance groups of this cluster, fetched from the backend using the ids
# collected in instance_group_ids.
@property
def instance_groups(self):
return self.emr_backend.get_instance_groups(self.instance_group_ids)
# Type of the group registered under master_instance_group_id.
# NOTE(review): that id starts out as None, so this lookup presumably raises
# KeyError until a MASTER group has been added -- confirm callers expect that.
@property
def master_instance_type(self):
return self.emr_backend.instance_groups[self.master_instance_group_id].type
# Type of the group registered under core_instance_group_id.
# NOTE(review): that id starts out as None, so this lookup presumably raises
# KeyError until a CORE group has been added -- confirm callers expect that.
@property
def slave_instance_type(self):
return self.emr_backend.instance_groups[self.core_instance_group_id].type
# Total of num_instances over all instance groups of this cluster
# (0 when the cluster has no groups).
@property
def instance_count(self):
return sum(group.num_instances for group in self.instance_groups)
# Mark the cluster as STARTING and stamp start_datetime with the current
# UTC time.
def start_cluster(self):
self.state = 'STARTING'
self.start_datetime = datetime.now(pytz.utc)
# Step the state through BOOTSTRAPPING to WAITING in one call, stamping
# ready_datetime with the current UTC time in between.
def run_bootstrap_actions(self):
self.state = 'BOOTSTRAPPING'
self.ready_datetime = datetime.now(pytz.utc)
self.state = 'WAITING'
# A cluster with no steps is terminated right away unless it was asked to
# stay alive without steps.
if not self.steps:
if not self.keep_job_flow_alive_when_no_steps:
self.terminate()
# Stamp end_datetime with the current UTC time and leave the cluster in the
# TERMINATED state (TERMINATING is only set transiently).
def terminate(self):
self.state = 'TERMINATING'
self.end_datetime = datetime.now(pytz.utc)
self.state = 'TERMINATED'
def add_applications(self, applications):
    """Append one FakeApplication per application description.

    :param applications: iterable of dicts; the keys ``name``, ``version``,
        ``args`` and ``additional_info`` are read, each falling back to an
        empty value when absent.
    """
    self.applications.extend(
        FakeApplication(
            name=app.get('name', ''),
            version=app.get('version', ''),
            args=app.get('args', []),
            # The key used to be misspelled 'additiona_info', so any supplied
            # additional info was silently replaced by an empty dict.
            additional_info=app.get('additional_info', {}))
        for app in applications)
# Wrap the given dict in a FakeBootstrapAction (its keys are passed as
# keyword arguments) and append it to bootstrap_actions.
def add_bootstrap_action(self, bootstrap_action):
self.bootstrap_actions.append(FakeBootstrapAction(**bootstrap_action))
# Register an instance group object with this cluster by id.
# A cluster tracks one MASTER and one CORE group id; a generic Exception is
# raised when a second group of either role is added. Groups with any other
# role are only appended to instance_group_ids.
# NOTE(review): indentation is lost in this view; the id assignments are
# presumed to sit inside their respective role checks.
def add_instance_group(self, instance_group):
if instance_group.role == 'MASTER':
if self.master_instance_group_id:
raise Exception('Cannot add another master instance group')
self.master_instance_group_id = instance_group.id
if instance_group.role == 'CORE':
if self.core_instance_group_id:
raise Exception('Cannot add another core instance group')
self.core_instance_group_id = instance_group.id
self.instance_group_ids.append(instance_group.id)
# Build a FakeStep from each step dict (keys are passed as keyword
# arguments), append it to self.steps and return only the newly added steps.
# The first step the cluster ever receives is created as STARTING; later
# ones are created as PENDING.
# NOTE(review): indentation is lost in this view; the RUNNING assignment
# below is presumed to follow the loop rather than sit inside it -- confirm.
def add_steps(self, steps):
added_steps = []
for step in steps:
if self.steps:
# If we already have other steps, this one is pending
fake = FakeStep(state='PENDING', **step)
else:
fake = FakeStep(state='STARTING', **step)
self.steps.append(fake)
added_steps.append(fake)
self.state = 'RUNNING'
return added_steps
# Merge the given mapping into the cluster's tags; existing keys are
# overwritten.
def add_tags(self, tags):
self.tags.update(tags)
@ -50,166 +230,61 @@ class Cluster(object):
for key in tag_keys:
self.tags.pop(key, None)
class FakeStep(object):
def __init__(self, state, **kwargs):
# 'Steps.member.1.HadoopJarStep.Jar': ['/home/hadoop/contrib/streaming/hadoop-streaming.jar'],
# 'Steps.member.1.HadoopJarStep.Args.member.1': ['-mapper'],
# 'Steps.member.1.HadoopJarStep.Args.member.2': ['s3n://elasticmapreduce/samples/wordcount/wordSplitter.py'],
# 'Steps.member.1.HadoopJarStep.Args.member.3': ['-reducer'],
# 'Steps.member.1.HadoopJarStep.Args.member.4': ['aggregate'],
# 'Steps.member.1.HadoopJarStep.Args.member.5': ['-input'],
# 'Steps.member.1.HadoopJarStep.Args.member.6': ['s3n://elasticmapreduce/samples/wordcount/input'],
# 'Steps.member.1.HadoopJarStep.Args.member.7': ['-output'],
# 'Steps.member.1.HadoopJarStep.Args.member.8': ['s3n://<my output bucket>/output/wordcount_output'],
# 'Steps.member.1.ActionOnFailure': ['TERMINATE_JOB_FLOW'],
# 'Steps.member.1.Name': ['My wordcount example']}
self.action_on_failure = kwargs['action_on_failure']
self.name = kwargs['name']
self.jar = kwargs['hadoop_jar_step._jar']
self.args = []
self.state = state
arg_index = 1
while True:
arg = kwargs.get('hadoop_jar_step._args.member.{0}'.format(arg_index))
if arg:
self.args.append(arg)
arg_index += 1
else:
break
class FakeJobFlow(object):
def __init__(self, job_id, name, log_uri, job_flow_role, visible_to_all_users, steps, instance_attrs, emr_backend):
self.id = job_id
self.name = name
self.log_uri = log_uri
self.role = job_flow_role or DEFAULT_JOB_FLOW_ROLE
self.state = "STARTING"
self.steps = []
self.add_steps(steps)
self.initial_instance_count = instance_attrs.get('instance_count', 0)
self.initial_master_instance_type = instance_attrs.get('master_instance_type')
self.initial_slave_instance_type = instance_attrs.get('slave_instance_type')
self.set_visibility(visible_to_all_users)
self.normalized_instance_hours = 0
self.ec2_key_name = instance_attrs.get('ec2_key_name')
self.availability_zone = instance_attrs.get('placement.availability_zone')
self.subnet_id = instance_attrs.get('ec2_subnet_id')
self.keep_job_flow_alive_when_no_steps = instance_attrs.get('keep_job_flow_alive_when_no_steps')
self.termination_protected = instance_attrs.get('termination_protected')
self.instance_group_ids = []
self.emr_backend = emr_backend
def create_cluster(self):
cluster = Cluster(
id=self.id,
name=self.name,
availability_zone=self.availability_zone,
ec2_key_name=self.ec2_key_name,
subnet_id=self.subnet_id,
ec2_iam_profile=self.role,
log_uri=self.log_uri,
)
return cluster
def terminate(self):
self.state = 'TERMINATED'
def set_visibility(self, visibility):
if visibility == 'true':
self.visible_to_all_users = True
else:
self.visible_to_all_users = False
def set_termination_protection(self, value):
self.termination_protected = value
def add_steps(self, steps):
for index, step in enumerate(steps):
if self.steps:
# If we already have other steps, this one is pending
self.steps.append(FakeStep(state='PENDING', **step))
else:
self.steps.append(FakeStep(state='STARTING', **step))
def add_instance_group(self, instance_group_id):
self.instance_group_ids.append(instance_group_id)
@property
def instance_groups(self):
return self.emr_backend.get_instance_groups(self.instance_group_ids)
@property
def master_instance_type(self):
groups = self.instance_groups
if groups:
return groups[0].type
else:
return self.initial_master_instance_type
@property
def slave_instance_type(self):
groups = self.instance_groups
if groups:
return groups[0].type
else:
return self.initial_slave_instance_type
@property
def instance_count(self):
groups = self.instance_groups
if not groups:
# No groups,return initial instance count
return self.initial_instance_count
count = 0
for group in groups:
count += int(group.num_instances)
return count
# Store the visibility value exactly as received (no conversion is done).
def set_visibility(self, visibility):
self.visible_to_all_users = visibility
class ElasticMapReduceBackend(BaseBackend):
def __init__(self, region_name):
    """Create an empty backend for one region.

    :param region_name: name of the region this backend serves; kept so
        that ``reset`` can re-initialise the backend for the same region.
    """
    super(ElasticMapReduceBackend, self).__init__()
    self.region_name = region_name
    # Both registries are keyed by id: clusters by cluster id, instance
    # groups by instance group id.
    self.clusters = {}
    self.instance_groups = {}
def run_job_flow(self, name, log_uri, job_flow_role, visible_to_all_users, steps, instance_attrs):
job_id = random_job_id()
job_flow = FakeJobFlow(
job_id, name, log_uri, job_flow_role, visible_to_all_users, steps, instance_attrs, self)
self.job_flows[job_id] = job_flow
cluster = job_flow.create_cluster()
self.clusters[cluster.id] = cluster
return job_flow
# Drop all state and re-run __init__ for the same region. region_name is
# read first because clearing __dict__ would otherwise lose it.
def reset(self):
region_name = self.region_name
self.__dict__ = {}
self.__init__(region_name)
# Look the cluster up by id and forward the application descriptions to it.
def add_applications(self, cluster_id, applications):
cluster = self.get_cluster(cluster_id)
cluster.add_applications(applications)
# Create a FakeInstanceGroup from each dict (keys are passed as keyword
# arguments), register it on the backend by id, attach it to the cluster
# and return the created groups.
# An unknown cluster_id raises KeyError from the dict lookup.
def add_instance_groups(self, cluster_id, instance_groups):
cluster = self.clusters[cluster_id]
result_groups = []
for instance_group in instance_groups:
group = FakeInstanceGroup(**instance_group)
self.instance_groups[group.id] = group
cluster.add_instance_group(group)
result_groups.append(group)
return result_groups
def add_job_flow_steps(self, job_flow_id, steps):
    """Add steps to a cluster and return the steps that were created.

    :param job_flow_id: id of the target cluster
    :param steps: step descriptions, forwarded to the cluster's ``add_steps``
    :returns: whatever the cluster's ``add_steps`` returns
    :raises KeyError: if no cluster with that id is registered
    """
    # Job flows are stored as clusters; the earlier ``job_flows`` lookup and
    # its early return made this code unreachable.
    cluster = self.clusters[job_flow_id]
    return cluster.add_steps(steps)
# Look the cluster up by id and merge the given tags into it.
def add_tags(self, cluster_id, tags):
cluster = self.get_cluster(cluster_id)
cluster.add_tags(tags)
def describe_job_flows(self, job_flow_ids=None):
    """Return registered clusters, optionally restricted to the given ids.

    :param job_flow_ids: collection of cluster ids to keep; when empty or
        ``None`` every cluster is returned
    :returns: a list of clusters in both cases
    """
    clusters = self.clusters.values()
    if job_flow_ids:
        return [cluster for cluster in clusters if cluster.id in job_flow_ids]
    # Copy into a list so both branches return the same type; on Python 3
    # ``dict.values()`` would otherwise hand back a live view.
    return list(clusters)
def terminate_job_flows(self, job_ids):
flows = [flow for flow in self.describe_job_flows() if flow.id in job_ids]
for flow in flows:
flow.terminate()
return flows
def list_clusters(self):
return self.clusters.values()
# Linear search of the cluster's steps for one whose id equals step_id.
# An unknown cluster_id raises KeyError from the dict lookup.
# NOTE(review): there is no return after the loop, so a step id that is not
# found presumably yields None -- confirm callers handle that.
def describe_step(self, cluster_id, step_id):
cluster = self.clusters[cluster_id]
for step in cluster.steps:
if step.id == step_id:
return step
# Return the cluster registered under cluster_id; an unknown id raises
# KeyError from the dict lookup.
def get_cluster(self, cluster_id):
return self.clusters[cluster_id]
@ -221,43 +296,50 @@ class ElasticMapReduceBackend(BaseBackend):
if group_id in instance_group_ids
]
def add_instance_groups(self, job_flow_id, instance_groups):
job_flow = self.job_flows[job_flow_id]
result_groups = []
for instance_group in instance_groups:
instance_group_id = random_instance_group_id()
group = FakeInstanceGroup(instance_group_id, **instance_group)
self.instance_groups[instance_group_id] = group
job_flow.add_instance_group(instance_group_id)
result_groups.append(group)
return result_groups
# Return the bootstrap_actions attribute of the cluster with the given id
# (the stored object itself, not a copy).
def list_bootstrap_actions(self, cluster_id):
return self.clusters[cluster_id].bootstrap_actions
# Return every registered cluster, as produced by dict.values().
def list_clusters(self):
return self.clusters.values()
# Return the instance_groups attribute of the cluster with the given id.
def list_instance_groups(self, cluster_id):
return self.clusters[cluster_id].instance_groups
def list_steps(self, cluster_id, step_states=None):
    """Return the steps of a cluster, optionally filtered by state.

    :param cluster_id: id of the cluster whose steps are listed
    :param step_states: collection of state names to keep; when empty or
        ``None`` all steps are returned
    :returns: the cluster's own step list when unfiltered, otherwise a new
        list holding only the steps whose ``state`` is in ``step_states``
    :raises KeyError: if no cluster with that id is registered
    """
    steps = self.clusters[cluster_id].steps
    if step_states:
        # The filter argument used to be accepted but never applied.
        return [step for step in steps if step.state in step_states]
    return steps
def modify_instance_groups(self, instance_groups):
    """Resize existing instance groups and return the groups touched.

    :param instance_groups: iterable of dicts carrying ``instance_group_id``
        and ``instance_count``; the count may arrive as a string and is
        converted with ``int``
    :returns: list of the modified group objects, in request order
    :raises KeyError: if a referenced group id is not registered
    """
    result_groups = []
    for instance_group in instance_groups:
        group = self.instance_groups[instance_group['instance_group_id']]
        group.set_instance_count(int(instance_group['instance_count']))
        # The result list was previously never filled, so callers always
        # received an empty list.
        result_groups.append(group)
    return result_groups
def set_visible_to_all_users(self, job_ids, visible_to_all_users):
for job_id in job_ids:
job = self.job_flows[job_id]
job.set_visibility(visible_to_all_users)
def set_termination_protection(self, job_ids, value):
for job_id in job_ids:
job = self.job_flows[job_id]
job.set_termination_protection(value)
def add_tags(self, cluster_id, tags):
cluster = self.get_cluster(cluster_id)
cluster.add_tags(tags)
# Look the cluster up by id and ask it to drop the given tag keys.
def remove_tags(self, cluster_id, tag_keys):
cluster = self.get_cluster(cluster_id)
cluster.remove_tags(tag_keys)
# Create and return a FakeCluster bound to this backend; all keyword
# arguments are forwarded to the FakeCluster constructor unchanged.
def run_job_flow(self, **kwargs):
return FakeCluster(self, **kwargs)
# Apply the same visibility value to every listed cluster; an unknown id
# raises KeyError from the dict lookup.
def set_visible_to_all_users(self, job_flow_ids, visible_to_all_users):
for job_flow_id in job_flow_ids:
cluster = self.clusters[job_flow_id]
cluster.set_visibility(visible_to_all_users)
# Forward the same termination-protection value to every listed cluster;
# an unknown id raises KeyError from the dict lookup.
def set_termination_protection(self, job_flow_ids, value):
for job_flow_id in job_flow_ids:
cluster = self.clusters[job_flow_id]
cluster.set_termination_protection(value)
# Terminate every registered cluster whose id is listed and return those
# clusters; ids that match no cluster are ignored rather than raising.
def terminate_job_flows(self, job_flow_ids):
clusters = [cluster for cluster in self.describe_job_flows()
if cluster.id in job_flow_ids]
for cluster in clusters:
cluster.terminate()
return clusters
# One backend per region reported by boto, keyed by region name. The
# backend constructor requires the region name, so it is passed explicitly.
emr_backends = {}
for region in boto.emr.regions():
    emr_backends[region.name] = ElasticMapReduceBackend(region.name)

File diff suppressed because it is too large Load diff

View file

@ -2,6 +2,7 @@ from __future__ import unicode_literals
from .responses import ElasticMapReduceResponse
url_bases = [
"https?://(.+).elasticmapreduce.amazonaws.com",
"https?://elasticmapreduce.(.+).amazonaws.com",
]

View file

@ -1,19 +1,25 @@
from __future__ import unicode_literals
import random
import string
import six
def random_id(size=13):
    """Return a random identifier body.

    :param size: number of characters to generate
    :returns: a string of ``size`` characters drawn from ``0-9`` and ``A-Z``
        (empty when ``size`` is 0)
    """
    # A single string alphabet avoids mixing ints and letters and then
    # converting every pick to text.
    chars = string.digits + string.ascii_uppercase
    return ''.join(random.choice(chars) for _ in range(size))
def random_cluster_id(size=13):
    """Return a random cluster id: ``j-`` followed by ``size`` characters."""
    # ``size`` was previously accepted but not forwarded, so the id length
    # could not be changed.
    return 'j-{0}'.format(random_id(size))
def random_step_id(size=13):
    """Return a random step id: ``s-`` followed by ``size`` characters."""
    # ``size`` was previously accepted but not forwarded, so the id length
    # could not be changed.
    return 's-{0}'.format(random_id(size))
def random_instance_group_id(size=13):
    """Return a random instance group id: ``i-`` plus ``size`` characters."""
    # Delegates to random_id and forwards ``size``; the hand-rolled copy of
    # the generator that preceded this return is no longer needed.
    return 'i-{0}'.format(random_id(size))
def tags_from_query_string(querystring_dict):
@ -30,3 +36,18 @@ def tags_from_query_string(querystring_dict):
else:
response_values[tag_key] = None
return response_values
def steps_from_query_string(querystring_dict):
    """Normalise parsed step parameters into keyword dicts for steps.

    For every input dict the result holds a dict in which

    - ``hadoop_jar_step._jar`` has been renamed to ``jar``,
    - the numbered ``hadoop_jar_step._args.member.N`` entries (N = 1, 2, ...
      with no gaps) have been collected, in order, into a list under ``args``,
    - ``properties`` has been turned from a list of ``{'Key', 'Value'}``
      dicts into a plain mapping.

    All other keys are carried over unchanged.

    :param querystring_dict: iterable of dicts, one per step
    :returns: list of new dicts; the input dicts are left untouched
    :raises KeyError: if a step has no ``hadoop_jar_step._jar`` entry
    """
    arg_key = 'hadoop_jar_step._args.member.{0}'
    steps = []
    for raw_step in querystring_dict:
        # Work on a shallow copy so the caller's parsed request data is not
        # rewritten as a side effect.
        step = dict(raw_step)
        step['jar'] = step.pop('hadoop_jar_step._jar')
        # ``or []`` also covers an explicit ``properties=None``.
        step['properties'] = dict(
            (item['Key'], item['Value'])
            for item in (step.get('properties') or []))
        args = []
        idx = 1
        while arg_key.format(idx) in step:
            args.append(step.pop(arg_key.format(idx)))
            idx += 1
        step['args'] = args
        steps.append(step)
    return steps