Merge pull request #1197 from terrycain/batch

AWS Batch
This commit is contained in:
Jack Danger 2017-10-23 09:43:10 -07:00 committed by GitHub
commit 2eec3a5c36
16 changed files with 2539 additions and 9 deletions

View file

@ -40,6 +40,7 @@ from .route53 import mock_route53, mock_route53_deprecated # flake8: noqa
from .swf import mock_swf, mock_swf_deprecated # flake8: noqa
from .xray import mock_xray, mock_xray_client, XRaySegment # flake8: noqa
from .logs import mock_logs, mock_logs_deprecated # flake8: noqa
from .batch import mock_batch # flake8: noqa
try:

View file

@ -35,11 +35,13 @@ from moto.sqs import sqs_backends
from moto.ssm import ssm_backends
from moto.sts import sts_backends
from moto.xray import xray_backends
from moto.batch import batch_backends
BACKENDS = {
'acm': acm_backends,
'apigateway': apigateway_backends,
'autoscaling': autoscaling_backends,
'batch': batch_backends,
'cloudformation': cloudformation_backends,
'cloudwatch': cloudwatch_backends,
'datapipeline': datapipeline_backends,

6
moto/batch/__init__.py Normal file
View file

@ -0,0 +1,6 @@
from __future__ import unicode_literals
from .models import batch_backends
from ..core.models import base_decorator
batch_backend = batch_backends['us-east-1']
mock_batch = base_decorator(batch_backends)

37
moto/batch/exceptions.py Normal file
View file

@ -0,0 +1,37 @@
from __future__ import unicode_literals
import json
class AWSError(Exception):
CODE = None
STATUS = 400
def __init__(self, message, code=None, status=None):
self.message = message
self.code = code if code is not None else self.CODE
self.status = status if status is not None else self.STATUS
def response(self):
return json.dumps({'__type': self.code, 'message': self.message}), dict(status=self.status)
class InvalidRequestException(AWSError):
CODE = 'InvalidRequestException'
class InvalidParameterValueException(AWSError):
CODE = 'InvalidParameterValue'
class ValidationError(AWSError):
CODE = 'ValidationError'
class InternalFailure(AWSError):
CODE = 'InternalFailure'
STATUS = 500
class ClientException(AWSError):
CODE = 'ClientException'
STATUS = 400

1042
moto/batch/models.py Normal file

File diff suppressed because it is too large Load diff

296
moto/batch/responses.py Normal file
View file

@ -0,0 +1,296 @@
from __future__ import unicode_literals
from moto.core.responses import BaseResponse
from .models import batch_backends
from six.moves.urllib.parse import urlsplit
from .exceptions import AWSError
import json
class BatchResponse(BaseResponse):
def _error(self, code, message):
return json.dumps({'__type': code, 'message': message}), dict(status=400)
@property
def batch_backend(self):
"""
:return: Batch Backend
:rtype: moto.batch.models.BatchBackend
"""
return batch_backends[self.region]
@property
def json(self):
if self.body is None or self.body == '':
self._json = {}
elif not hasattr(self, '_json'):
try:
self._json = json.loads(self.body)
except json.JSONDecodeError:
print()
return self._json
def _get_param(self, param_name, if_none=None):
val = self.json.get(param_name)
if val is not None:
return val
return if_none
def _get_action(self):
# Return element after the /v1/*
return urlsplit(self.uri).path.lstrip('/').split('/')[1]
# CreateComputeEnvironment
def createcomputeenvironment(self):
compute_env_name = self._get_param('computeEnvironmentName')
compute_resource = self._get_param('computeResources')
service_role = self._get_param('serviceRole')
state = self._get_param('state')
_type = self._get_param('type')
try:
name, arn = self.batch_backend.create_compute_environment(
compute_environment_name=compute_env_name,
_type=_type, state=state,
compute_resources=compute_resource,
service_role=service_role
)
except AWSError as err:
return err.response()
result = {
'computeEnvironmentArn': arn,
'computeEnvironmentName': name
}
return json.dumps(result)
# DescribeComputeEnvironments
def describecomputeenvironments(self):
compute_environments = self._get_param('computeEnvironments')
max_results = self._get_param('maxResults') # Ignored, should be int
next_token = self._get_param('nextToken') # Ignored
envs = self.batch_backend.describe_compute_environments(compute_environments, max_results=max_results, next_token=next_token)
result = {'computeEnvironments': envs}
return json.dumps(result)
# DeleteComputeEnvironment
def deletecomputeenvironment(self):
compute_environment = self._get_param('computeEnvironment')
try:
self.batch_backend.delete_compute_environment(compute_environment)
except AWSError as err:
return err.response()
return ''
# UpdateComputeEnvironment
def updatecomputeenvironment(self):
compute_env_name = self._get_param('computeEnvironment')
compute_resource = self._get_param('computeResources')
service_role = self._get_param('serviceRole')
state = self._get_param('state')
try:
name, arn = self.batch_backend.update_compute_environment(
compute_environment_name=compute_env_name,
compute_resources=compute_resource,
service_role=service_role,
state=state
)
except AWSError as err:
return err.response()
result = {
'computeEnvironmentArn': arn,
'computeEnvironmentName': name
}
return json.dumps(result)
# CreateJobQueue
def createjobqueue(self):
compute_env_order = self._get_param('computeEnvironmentOrder')
queue_name = self._get_param('jobQueueName')
priority = self._get_param('priority')
state = self._get_param('state')
try:
name, arn = self.batch_backend.create_job_queue(
queue_name=queue_name,
priority=priority,
state=state,
compute_env_order=compute_env_order
)
except AWSError as err:
return err.response()
result = {
'jobQueueArn': arn,
'jobQueueName': name
}
return json.dumps(result)
# DescribeJobQueues
def describejobqueues(self):
job_queues = self._get_param('jobQueues')
max_results = self._get_param('maxResults') # Ignored, should be int
next_token = self._get_param('nextToken') # Ignored
queues = self.batch_backend.describe_job_queues(job_queues, max_results=max_results, next_token=next_token)
result = {'jobQueues': queues}
return json.dumps(result)
# UpdateJobQueue
def updatejobqueue(self):
compute_env_order = self._get_param('computeEnvironmentOrder')
queue_name = self._get_param('jobQueue')
priority = self._get_param('priority')
state = self._get_param('state')
try:
name, arn = self.batch_backend.update_job_queue(
queue_name=queue_name,
priority=priority,
state=state,
compute_env_order=compute_env_order
)
except AWSError as err:
return err.response()
result = {
'jobQueueArn': arn,
'jobQueueName': name
}
return json.dumps(result)
# DeleteJobQueue
def deletejobqueue(self):
queue_name = self._get_param('jobQueue')
self.batch_backend.delete_job_queue(queue_name)
return ''
# RegisterJobDefinition
def registerjobdefinition(self):
container_properties = self._get_param('containerProperties')
def_name = self._get_param('jobDefinitionName')
parameters = self._get_param('parameters')
retry_strategy = self._get_param('retryStrategy')
_type = self._get_param('type')
try:
name, arn, revision = self.batch_backend.register_job_definition(
def_name=def_name,
parameters=parameters,
_type=_type,
retry_strategy=retry_strategy,
container_properties=container_properties
)
except AWSError as err:
return err.response()
result = {
'jobDefinitionArn': arn,
'jobDefinitionName': name,
'revision': revision
}
return json.dumps(result)
# DeregisterJobDefinition
def deregisterjobdefinition(self):
queue_name = self._get_param('jobDefinition')
self.batch_backend.deregister_job_definition(queue_name)
return ''
# DescribeJobDefinitions
def describejobdefinitions(self):
job_def_name = self._get_param('jobDefinitionName')
job_def_list = self._get_param('jobDefinitions')
max_results = self._get_param('maxResults')
next_token = self._get_param('nextToken')
status = self._get_param('status')
job_defs = self.batch_backend.describe_job_definitions(job_def_name, job_def_list, status, max_results, next_token)
result = {'jobDefinitions': [job.describe() for job in job_defs]}
return json.dumps(result)
# SubmitJob
def submitjob(self):
container_overrides = self._get_param('containerOverrides')
depends_on = self._get_param('dependsOn')
job_def = self._get_param('jobDefinition')
job_name = self._get_param('jobName')
job_queue = self._get_param('jobQueue')
parameters = self._get_param('parameters')
retries = self._get_param('retryStrategy')
try:
name, job_id = self.batch_backend.submit_job(
job_name, job_def, job_queue,
parameters=parameters,
retries=retries,
depends_on=depends_on,
container_overrides=container_overrides
)
except AWSError as err:
return err.response()
result = {
'jobId': job_id,
'jobName': name,
}
return json.dumps(result)
# DescribeJobs
def describejobs(self):
jobs = self._get_param('jobs')
try:
return json.dumps({'jobs': self.batch_backend.describe_jobs(jobs)})
except AWSError as err:
return err.response()
# ListJobs
def listjobs(self):
job_queue = self._get_param('jobQueue')
job_status = self._get_param('jobStatus')
max_results = self._get_param('maxResults')
next_token = self._get_param('nextToken')
try:
jobs = self.batch_backend.list_jobs(job_queue, job_status, max_results, next_token)
except AWSError as err:
return err.response()
result = {'jobSummaryList': [{'jobId': job.job_id, 'jobName': job.job_name} for job in jobs]}
return json.dumps(result)
# TerminateJob
def terminatejob(self):
job_id = self._get_param('jobId')
reason = self._get_param('reason')
try:
self.batch_backend.terminate_job(job_id, reason)
except AWSError as err:
return err.response()
return ''
# CancelJob
def canceljob(self): # Theres some AWS semantics on the differences but for us they're identical ;-)
return self.terminatejob()

25
moto/batch/urls.py Normal file
View file

@ -0,0 +1,25 @@
from __future__ import unicode_literals
from .responses import BatchResponse
url_bases = [
"https?://batch.(.+).amazonaws.com",
]
url_paths = {
'{0}/v1/createcomputeenvironment$': BatchResponse.dispatch,
'{0}/v1/describecomputeenvironments$': BatchResponse.dispatch,
'{0}/v1/deletecomputeenvironment': BatchResponse.dispatch,
'{0}/v1/updatecomputeenvironment': BatchResponse.dispatch,
'{0}/v1/createjobqueue': BatchResponse.dispatch,
'{0}/v1/describejobqueues': BatchResponse.dispatch,
'{0}/v1/updatejobqueue': BatchResponse.dispatch,
'{0}/v1/deletejobqueue': BatchResponse.dispatch,
'{0}/v1/registerjobdefinition': BatchResponse.dispatch,
'{0}/v1/deregisterjobdefinition': BatchResponse.dispatch,
'{0}/v1/describejobdefinitions': BatchResponse.dispatch,
'{0}/v1/submitjob': BatchResponse.dispatch,
'{0}/v1/describejobs': BatchResponse.dispatch,
'{0}/v1/listjobs': BatchResponse.dispatch,
'{0}/v1/terminatejob': BatchResponse.dispatch,
'{0}/v1/canceljob': BatchResponse.dispatch,
}

22
moto/batch/utils.py Normal file
View file

@ -0,0 +1,22 @@
from __future__ import unicode_literals
def make_arn_for_compute_env(account_id, name, region_name):
return "arn:aws:batch:{0}:{1}:compute-environment/{2}".format(region_name, account_id, name)
def make_arn_for_job_queue(account_id, name, region_name):
return "arn:aws:batch:{0}:{1}:job-queue/{2}".format(region_name, account_id, name)
def make_arn_for_task_def(account_id, name, revision, region_name):
return "arn:aws:batch:{0}:{1}:job-definition/{2}:{3}".format(region_name, account_id, name, revision)
def lowercase_first_key(some_dict):
new_dict = {}
for key, value in some_dict.items():
new_key = key[0].lower() + key[1:]
new_dict[new_key] = value
return new_dict

View file

@ -8,6 +8,7 @@ import re
from moto.autoscaling import models as autoscaling_models
from moto.awslambda import models as lambda_models
from moto.batch import models as batch_models
from moto.cloudwatch import models as cloudwatch_models
from moto.datapipeline import models as datapipeline_models
from moto.dynamodb import models as dynamodb_models
@ -31,6 +32,9 @@ from boto.cloudformation.stack import Output
MODEL_MAP = {
"AWS::AutoScaling::AutoScalingGroup": autoscaling_models.FakeAutoScalingGroup,
"AWS::AutoScaling::LaunchConfiguration": autoscaling_models.FakeLaunchConfiguration,
"AWS::Batch::JobDefinition": batch_models.JobDefinition,
"AWS::Batch::JobQueue": batch_models.JobQueue,
"AWS::Batch::ComputeEnvironment": batch_models.ComputeEnvironment,
"AWS::DynamoDB::Table": dynamodb_models.Table,
"AWS::Kinesis::Stream": kinesis_models.Stream,
"AWS::Lambda::EventSourceMapping": lambda_models.EventSourceMapping,

View file

@ -1,10 +1,10 @@
from __future__ import unicode_literals
from .responses import ELBV2Response
from ..elb.urls import api_version_elb_backend
url_bases = [
"https?://elasticloadbalancing.(.+).amazonaws.com",
]
url_paths = {
'{0}/$': ELBV2Response.dispatch,
'{0}/$': api_version_elb_backend,
}

View file

@ -528,6 +528,12 @@ class IAMBackend(BaseBackend):
return role
raise IAMNotFoundException("Role {0} not found".format(role_name))
def get_role_by_arn(self, arn):
for role in self.get_roles():
if role.arn == arn:
return role
raise IAMNotFoundException("Role {0} not found".format(arn))
def delete_role(self, role_name):
for role in self.get_roles():
if role.name == role_name:

View file

@ -22,6 +22,13 @@ class LogEvent:
"timestamp": self.timestamp
}
def to_response_dict(self):
return {
"ingestionTime": self.ingestionTime,
"message": self.message,
"timestamp": self.timestamp
}
class LogStream:
_log_ids = 0
@ -41,7 +48,14 @@ class LogStream:
self.__class__._log_ids += 1
def _update(self):
self.firstEventTimestamp = min([x.timestamp for x in self.events])
self.lastEventTimestamp = max([x.timestamp for x in self.events])
def to_describe_dict(self):
# Compute start and end times
self._update()
return {
"arn": self.arn,
"creationTime": self.creationTime,
@ -79,7 +93,7 @@ class LogStream:
if next_token is None:
next_token = 0
events_page = events[next_token: next_token + limit]
events_page = [event.to_response_dict() for event in events[next_token: next_token + limit]]
next_token += limit
if next_token >= len(self.events):
next_token = None
@ -120,17 +134,17 @@ class LogGroup:
del self.streams[log_stream_name]
def describe_log_streams(self, descending, limit, log_group_name, log_stream_name_prefix, next_token, order_by):
log_streams = [stream.to_describe_dict() for name, stream in self.streams.items() if name.startswith(log_stream_name_prefix)]
log_streams = [(name, stream.to_describe_dict()) for name, stream in self.streams.items() if name.startswith(log_stream_name_prefix)]
def sorter(stream):
return stream.name if order_by == 'logStreamName' else stream.lastEventTimestamp
def sorter(item):
return item[0] if order_by == 'logStreamName' else item[1]['lastEventTimestamp']
if next_token is None:
next_token = 0
log_streams = sorted(log_streams, key=sorter, reverse=descending)
new_token = next_token + limit
log_streams_page = log_streams[next_token: new_token]
log_streams_page = [x[1] for x in log_streams[next_token: new_token]]
if new_token >= len(log_streams):
new_token = None

View file

@ -47,7 +47,7 @@ class LogsResponse(BaseResponse):
def describe_log_streams(self):
log_group_name = self._get_param('logGroupName')
log_stream_name_prefix = self._get_param('logStreamNamePrefix')
log_stream_name_prefix = self._get_param('logStreamNamePrefix', '')
descending = self._get_param('descending', False)
limit = self._get_param('limit', 50)
assert limit <= 50
@ -83,7 +83,7 @@ class LogsResponse(BaseResponse):
limit = self._get_param('limit', 10000)
assert limit <= 10000
next_token = self._get_param('nextToken')
start_from_head = self._get_param('startFromHead')
start_from_head = self._get_param('startFromHead', False)
events, next_backward_token, next_foward_token = \
self.logs_backend.get_log_events(log_group_name, log_stream_name, start_time, end_time, limit, next_token, start_from_head)