From a9b06776671c081733ae82c21ba191a4f1965f3d Mon Sep 17 00:00:00 2001 From: addomafi Date: Thu, 5 Mar 2020 18:11:49 -0300 Subject: [PATCH 1/6] #2784 Adding missing support for EbsConfiguration on EMR run_job_flow --- moto/emr/models.py | 2 ++ tests/test_emr/test_emr_boto3.py | 23 +++++++++++++++++++++++ 2 files changed, 25 insertions(+) diff --git a/moto/emr/models.py b/moto/emr/models.py index d9ec2fd6..72c58816 100644 --- a/moto/emr/models.py +++ b/moto/emr/models.py @@ -35,6 +35,7 @@ class FakeInstanceGroup(BaseModel): name=None, id=None, bid_price=None, + ebs_configuration=None, ): self.id = id or random_instance_group_id() @@ -51,6 +52,7 @@ class FakeInstanceGroup(BaseModel): self.num_instances = instance_count self.role = instance_role self.type = instance_type + self.ebs_configuration = ebs_configuration self.creation_datetime = datetime.now(pytz.utc) self.start_datetime = datetime.now(pytz.utc) diff --git a/tests/test_emr/test_emr_boto3.py b/tests/test_emr/test_emr_boto3.py index d849247b..fc7170ba 100644 --- a/tests/test_emr/test_emr_boto3.py +++ b/tests/test_emr/test_emr_boto3.py @@ -61,6 +61,23 @@ input_instance_groups = [ "Name": "task-2", "BidPrice": "0.05", }, + { + "InstanceCount": 10, + "InstanceRole": "TASK", + "InstanceType": "c1.xlarge", + "Market": "SPOT", + "Name": "task-3", + "BidPrice": "0.05", + "EbsConfiguration": { + "EbsBlockDeviceConfigs": [ + { + "VolumeSpecification": {"VolumeType": "gp2", "SizeInGB": 800}, + "VolumesPerInstance": 6, + }, + ], + "EbsOptimized": True, + }, + }, ] @@ -447,6 +464,8 @@ def test_run_job_flow_with_instance_groups(): x["Market"].should.equal(y["Market"]) if "BidPrice" in y: x["BidPrice"].should.equal(y["BidPrice"]) + if "EbsConfiguration" in y: + x["EbsConfiguration"].should.equal(y["EbsConfiguration"]) @mock_emr @@ -604,6 +623,8 @@ def test_instance_groups(): y = input_groups[x["Name"]] if hasattr(y, "BidPrice"): x["BidPrice"].should.equal("BidPrice") + if "EbsConfiguration" in y: + x["EbsConfiguration"].should.equal(y["EbsConfiguration"]) x["CreationDateTime"].should.be.a("datetime.datetime") # x['EndDateTime'].should.be.a('datetime.datetime') x.should.have.key("InstanceGroupId") @@ -623,6 +644,8 @@ def test_instance_groups(): y = input_groups[x["Name"]] if hasattr(y, "BidPrice"): x["BidPrice"].should.equal("BidPrice") + if "EbsConfiguration" in y: + x["EbsConfiguration"].should.equal(y["EbsConfiguration"]) # Configurations # EbsBlockDevices # EbsOptimized From c8dfbe95753fcaa01578eda2798d47c62c86102f Mon Sep 17 00:00:00 2001 From: addomafi Date: Fri, 6 Mar 2020 15:12:44 -0300 Subject: [PATCH 2/6] #2784 Adding missing support for EbsConfiguration on EMR instance groups --- moto/emr/responses.py | 98 +++++++++++++++++++++++++++++++- tests/test_emr/test_emr_boto3.py | 23 ++++---- 2 files changed, 109 insertions(+), 12 deletions(-) diff --git a/moto/emr/responses.py b/moto/emr/responses.py index 38b9774e..3bb595bb 100644 --- a/moto/emr/responses.py +++ b/moto/emr/responses.py @@ -73,6 +73,8 @@ class ElasticMapReduceResponse(BaseResponse): instance_groups = self._get_list_prefix("InstanceGroups.member") for item in instance_groups: item["instance_count"] = int(item["instance_count"]) + # Adding support to EbsConfiguration + self._parse_ebs_configuration(item) instance_groups = self.backend.add_instance_groups(jobflow_id, instance_groups) template = self.response_template(ADD_INSTANCE_GROUPS_TEMPLATE) return template.render(instance_groups=instance_groups) @@ -324,6 +326,8 @@ class ElasticMapReduceResponse(BaseResponse): if instance_groups: for ig in instance_groups: ig["instance_count"] = int(ig["instance_count"]) + # Adding support to EbsConfiguration + self._parse_ebs_configuration(ig) self.backend.add_instance_groups(cluster.id, instance_groups) tags = self._get_list_prefix("Tags.member") @@ -335,6 +339,83 @@ class ElasticMapReduceResponse(BaseResponse): template = self.response_template(RUN_JOB_FLOW_TEMPLATE) return template.render(cluster=cluster) + def _has_key_prefix(self, key_prefix, value): + for key in value: # iter on both keys and values + if key.startswith(key_prefix): + return True + return False + + def _parse_ebs_configuration(self, instance_group): + key_ebs_config = "ebs_configuration" + ebs_configuration = dict() + # Filter only EBS config keys + for key in instance_group: + if key.startswith(key_ebs_config): + ebs_configuration[key] = instance_group[key] + + if len(ebs_configuration) > 0: + # Key that should be extracted + ebs_optimized = "ebs_optimized" + ebs_block_device_configs = "ebs_block_device_configs" + volume_specification = "volume_specification" + size_in_gb = "size_in_gb" + volume_type = "volume_type" + iops = "iops" + volumes_per_instance = "volumes_per_instance" + + key_ebs_optimized = f"{key_ebs_config}._{ebs_optimized}" + # EbsOptimized config + if key_ebs_optimized in ebs_configuration: + instance_group.pop(key_ebs_optimized) + ebs_configuration[ebs_optimized] = ebs_configuration.pop( + key_ebs_optimized + ) + + # Ebs Blocks + ebs_blocks = [] + idx = 1 + keyfmt = f"{key_ebs_config}._{ebs_block_device_configs}.member.{{}}" + key = keyfmt.format(idx) + while self._has_key_prefix(key, ebs_configuration): + vlespc_keyfmt = f"{key}._{volume_specification}._{{}}" + vol_size = vlespc_keyfmt.format(size_in_gb) + vol_iops = vlespc_keyfmt.format(iops) + vol_type = vlespc_keyfmt.format(volume_type) + + ebs_block = dict() + ebs_block[volume_specification] = dict() + if vol_size in ebs_configuration: + instance_group.pop(vol_size) + ebs_block[volume_specification][size_in_gb] = int( + ebs_configuration.pop(vol_size) + ) + if vol_iops in ebs_configuration: + instance_group.pop(vol_iops) + ebs_block[volume_specification][iops] = ebs_configuration.pop( + vol_iops + ) + if vol_type in ebs_configuration: + instance_group.pop(vol_type) + ebs_block[volume_specification][ + volume_type + ] = ebs_configuration.pop(vol_type) + + per_instance = f"{key}._{volumes_per_instance}" + if per_instance in ebs_configuration: + instance_group.pop(per_instance) + ebs_block[volumes_per_instance] = int( + ebs_configuration.pop(per_instance) + ) + + if len(ebs_block) > 0: + ebs_blocks.append(ebs_block) + idx += 1 + key = keyfmt.format(idx) + + if len(ebs_blocks) > 0: + ebs_configuration[ebs_block_device_configs] = ebs_blocks + instance_group[key_ebs_config] = ebs_configuration + @generate_boto3_response("SetTerminationProtection") def set_termination_protection(self): termination_protection = self._get_param("TerminationProtected") @@ -754,7 +835,22 @@ LIST_INSTANCE_GROUPS_TEMPLATE = """ Date: Fri, 6 Mar 2020 18:10:39 -0300 Subject: [PATCH 3/6] #2784 Implementing assertions for testcase with instance groups --- tests/test_emr/test_emr_boto3.py | 29 ++++++++++++++++------------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/tests/test_emr/test_emr_boto3.py b/tests/test_emr/test_emr_boto3.py index a00de164..524cdcd5 100644 --- a/tests/test_emr/test_emr_boto3.py +++ b/tests/test_emr/test_emr_boto3.py @@ -438,6 +438,19 @@ def test_run_job_flow_with_visible_to_all_users(): resp = client.describe_cluster(ClusterId=cluster_id) resp["Cluster"]["VisibleToAllUsers"].should.equal(expected) +def _do_assertion_ebs_configuration(x, y): + total_volumes = 0 + total_size = 0 + for ebs_block in y["EbsConfiguration"]["EbsBlockDeviceConfigs"]: + total_volumes += ebs_block["VolumesPerInstance"] + total_size += ebs_block["VolumeSpecification"]["SizeInGB"] + # Multiply by total volumes + total_size = total_size * total_volumes + comp_total_size = 0 + for ebs_block in x["EbsBlockDevices"]: + comp_total_size += ebs_block["VolumeSpecification"]["SizeInGB"] + len(x["EbsBlockDevices"]).should.equal(total_volumes) + comp_total_size.should.equal(comp_total_size) @mock_emr def test_run_job_flow_with_instance_groups(): @@ -456,8 +469,9 @@ def test_run_job_flow_with_instance_groups(): x["Market"].should.equal(y["Market"]) if "BidPrice" in y: x["BidPrice"].should.equal(y["BidPrice"]) + if "EbsConfiguration" in y: - x["EbsConfiguration"].should.equal(y["EbsConfiguration"]) + _do_assertion_ebs_configuration(x, y) @mock_emr @@ -635,18 +649,7 @@ def test_instance_groups(): if hasattr(y, "BidPrice"): x["BidPrice"].should.equal("BidPrice") if "EbsConfiguration" in y: - total_volumes = 0 - total_size = 0 - for ebs_block in y["EbsConfiguration"]["EbsBlockDeviceConfigs"]: - total_volumes += ebs_block["VolumesPerInstance"] - total_size += ebs_block["VolumeSpecification"]["SizeInGB"] - # Multiply by total volumes - total_size = total_size * total_volumes - comp_total_size = 0 - for ebs_block in x["EbsBlockDevices"]: - comp_total_size += ebs_block["VolumeSpecification"]["SizeInGB"] - len(x["EbsBlockDevices"]).should.equal(total_volumes) - comp_total_size.should.equal(comp_total_size) + _do_assertion_ebs_configuration(x, y) # Configurations # EbsBlockDevices # EbsOptimized From c6eca1843435413615b1650e2161a0f4890819d7 Mon Sep 17 00:00:00 2001 From: addomafi Date: Fri, 6 Mar 2020 18:11:07 -0300 Subject: [PATCH 4/6] Reformat --- tests/test_emr/test_emr_boto3.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/test_emr/test_emr_boto3.py b/tests/test_emr/test_emr_boto3.py index 524cdcd5..adfc3fa9 100644 --- a/tests/test_emr/test_emr_boto3.py +++ b/tests/test_emr/test_emr_boto3.py @@ -438,6 +438,7 @@ def test_run_job_flow_with_visible_to_all_users(): resp = client.describe_cluster(ClusterId=cluster_id) resp["Cluster"]["VisibleToAllUsers"].should.equal(expected) + def _do_assertion_ebs_configuration(x, y): total_volumes = 0 total_size = 0 @@ -452,6 +453,7 @@ def _do_assertion_ebs_configuration(x, y): len(x["EbsBlockDevices"]).should.equal(total_volumes) comp_total_size.should.equal(comp_total_size) + @mock_emr def test_run_job_flow_with_instance_groups(): input_groups = dict((g["Name"], g) for g in input_instance_groups) From 155cf82791cef359ee7d6404da14090a18396d1b Mon Sep 17 00:00:00 2001 From: addomafi Date: Sat, 7 Mar 2020 07:43:59 -0300 Subject: [PATCH 5/6] Keeping support to python 2 --- moto/emr/responses.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/moto/emr/responses.py b/moto/emr/responses.py index 3bb595bb..a9a4aae9 100644 --- a/moto/emr/responses.py +++ b/moto/emr/responses.py @@ -363,7 +363,7 @@ class ElasticMapReduceResponse(BaseResponse): iops = "iops" volumes_per_instance = "volumes_per_instance" - key_ebs_optimized = f"{key_ebs_config}._{ebs_optimized}" + key_ebs_optimized = "{0}._{1}".format(key_ebs_config, ebs_optimized) # EbsOptimized config if key_ebs_optimized in ebs_configuration: instance_group.pop(key_ebs_optimized) @@ -374,10 +374,10 @@ class ElasticMapReduceResponse(BaseResponse): # Ebs Blocks ebs_blocks = [] idx = 1 - keyfmt = f"{key_ebs_config}._{ebs_block_device_configs}.member.{{}}" + keyfmt = "{0}._{1}.member.{{}}".format(key_ebs_config, ebs_block_device_configs) key = keyfmt.format(idx) while self._has_key_prefix(key, ebs_configuration): - vlespc_keyfmt = f"{key}._{volume_specification}._{{}}" + vlespc_keyfmt = "{0}._{1}._{{}}".format(key, volume_specification) vol_size = vlespc_keyfmt.format(size_in_gb) vol_iops = vlespc_keyfmt.format(iops) vol_type = vlespc_keyfmt.format(volume_type) @@ -400,7 +400,7 @@ class ElasticMapReduceResponse(BaseResponse): volume_type ] = ebs_configuration.pop(vol_type) - per_instance = f"{key}._{volumes_per_instance}" + per_instance = "{0}._{1}".format(key, volumes_per_instance) if per_instance in ebs_configuration: instance_group.pop(per_instance) ebs_block[volumes_per_instance] = int( From a6c1d474128d4097a157b300a4b24948e32656ea Mon Sep 17 00:00:00 2001 From: addomafi Date: Sat, 7 Mar 2020 08:21:27 -0300 Subject: [PATCH 6/6] Reformat --- moto/emr/responses.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/moto/emr/responses.py b/moto/emr/responses.py index a9a4aae9..3708db0e 100644 --- a/moto/emr/responses.py +++ b/moto/emr/responses.py @@ -374,7 +374,9 @@ class ElasticMapReduceResponse(BaseResponse): # Ebs Blocks ebs_blocks = [] idx = 1 - keyfmt = "{0}._{1}.member.{{}}".format(key_ebs_config, ebs_block_device_configs) + keyfmt = "{0}._{1}.member.{{}}".format( + key_ebs_config, ebs_block_device_configs + ) key = keyfmt.format(idx) while self._has_key_prefix(key, ebs_configuration): vlespc_keyfmt = "{0}._{1}._{{}}".format(key, volume_specification)