Merge pull request #2996 from bblommers/athena_start_stop_execution

Athena - Start/Stop query execution, get_work_group
This commit is contained in:
Steve Pulec 2020-05-16 15:37:42 -05:00 committed by GitHub
commit 134cceeb12
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 239 additions and 33 deletions

View file

@ -2,10 +2,9 @@ from __future__ import unicode_literals
import time
from boto3 import Session
from moto.core import BaseBackend, BaseModel, ACCOUNT_ID
from moto.core import BaseBackend, BaseModel
from moto.core import ACCOUNT_ID
from uuid import uuid4
class TaggableResourceMixin(object):
@ -50,6 +49,17 @@ class WorkGroup(TaggableResourceMixin, BaseModel):
self.configuration = configuration
class Execution(BaseModel):
def __init__(self, query, context, config, workgroup):
self.id = str(uuid4())
self.query = query
self.context = context
self.config = config
self.workgroup = workgroup
self.start_time = time.time()
self.status = "QUEUED"
class AthenaBackend(BaseBackend):
region_name = None
@ -57,6 +67,7 @@ class AthenaBackend(BaseBackend):
if region_name is not None:
self.region_name = region_name
self.work_groups = {}
self.executions = {}
def create_work_group(self, name, configuration, description, tags):
if name in self.work_groups:
@ -76,6 +87,32 @@ class AthenaBackend(BaseBackend):
for wg in self.work_groups.values()
]
def get_work_group(self, name):
if name not in self.work_groups:
return None
wg = self.work_groups[name]
return {
"Name": wg.name,
"State": wg.state,
"Configuration": wg.configuration,
"Description": wg.description,
"CreationTime": time.time(),
}
def start_query_execution(self, query, context, config, workgroup):
execution = Execution(
query=query, context=context, config=config, workgroup=workgroup
)
self.executions[execution.id] = execution
return execution.id
def get_execution(self, exec_id):
return self.executions[exec_id]
def stop_query_execution(self, exec_id):
execution = self.executions[exec_id]
execution.status = "CANCELLED"
athena_backends = {}
for region in Session().get_available_regions("athena"):

View file

@ -18,15 +18,7 @@ class AthenaResponse(BaseResponse):
name, configuration, description, tags
)
if not work_group:
return (
json.dumps(
{
"__type": "InvalidRequestException",
"Message": "WorkGroup already exists",
}
),
dict(status=400),
)
return self.error("WorkGroup already exists", 400)
return json.dumps(
{
"CreateWorkGroupResponse": {
@ -39,3 +31,57 @@ class AthenaResponse(BaseResponse):
def list_work_groups(self):
return json.dumps({"WorkGroups": self.athena_backend.list_work_groups()})
def get_work_group(self):
name = self._get_param("WorkGroup")
return json.dumps({"WorkGroup": self.athena_backend.get_work_group(name)})
def start_query_execution(self):
query = self._get_param("QueryString")
context = self._get_param("QueryExecutionContext")
config = self._get_param("ResultConfiguration")
workgroup = self._get_param("WorkGroup")
if workgroup and not self.athena_backend.get_work_group(workgroup):
return self.error("WorkGroup does not exist", 400)
id = self.athena_backend.start_query_execution(
query=query, context=context, config=config, workgroup=workgroup
)
return json.dumps({"QueryExecutionId": id})
def get_query_execution(self):
exec_id = self._get_param("QueryExecutionId")
execution = self.athena_backend.get_execution(exec_id)
result = {
"QueryExecution": {
"QueryExecutionId": exec_id,
"Query": execution.query,
"StatementType": "DDL",
"ResultConfiguration": execution.config,
"QueryExecutionContext": execution.context,
"Status": {
"State": execution.status,
"SubmissionDateTime": execution.start_time,
},
"Statistics": {
"EngineExecutionTimeInMillis": 0,
"DataScannedInBytes": 0,
"TotalExecutionTimeInMillis": 0,
"QueryQueueTimeInMillis": 0,
"QueryPlanningTimeInMillis": 0,
"ServiceProcessingTimeInMillis": 0,
},
"WorkGroup": execution.workgroup,
}
}
return json.dumps(result)
def stop_query_execution(self):
exec_id = self._get_param("QueryExecutionId")
self.athena_backend.stop_query_execution(exec_id)
return json.dumps({})
def error(self, msg, status):
return (
json.dumps({"__type": "InvalidRequestException", "Message": msg,}),
dict(status=status),
)