Lambda reserved concurrency (#3215)

* lambda-responses: add method to dispatch concurrency calls

* lambda-resources: add route to handle concurrency requests

* lambda-model: implement put_function_concurrency and concurrency attribute

* put-concurrency-tests: add one simple test

* get_function: add concurrency entry - with test

* lambda-reserved-concurrency: cloudformation support

* lambda-concurrency: implement delete_reserved with tests

* lambda-concurrency: implement get_reserved with tests

* lint

* implementation-cov: mark delete_function_concurrency, put_function_concurrency and get_function_concurrency

* botocore doesn't display concurrency entry for lambdas without it

* lambda(refactor): improvements on response's handler
This commit is contained in:
Guilherme Martins Crocetti 2020-08-26 07:06:53 -03:00 committed by GitHub
commit f744356da7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 170 additions and 8 deletions

View file

@ -165,6 +165,7 @@ class LambdaFunction(CloudFormationModel):
self.docker_client = docker.from_env()
self.policy = None
self.state = "Active"
self.reserved_concurrency = spec.get("ReservedConcurrentExecutions", None)
# Unfortunately mocking replaces this method w/o fallback enabled, so we
# need to replace it if we detect it's been mocked
@ -285,7 +286,7 @@ class LambdaFunction(CloudFormationModel):
return config
def get_code(self):
return {
code = {
"Code": {
"Location": "s3://awslambda-{0}-tasks.s3-{0}.amazonaws.com/{1}".format(
self.region, self.code["S3Key"]
@ -294,6 +295,15 @@ class LambdaFunction(CloudFormationModel):
},
"Configuration": self.get_configuration(),
}
if self.reserved_concurrency:
code.update(
{
"Concurrency": {
"ReservedConcurrentExecutions": self.reserved_concurrency
}
}
)
return code
def update_configuration(self, config_updates):
for key, value in config_updates.items():
@ -511,6 +521,15 @@ class LambdaFunction(CloudFormationModel):
cls, resource_name, cloudformation_json, region_name
):
properties = cloudformation_json["Properties"]
optional_properties = (
"Description",
"MemorySize",
"Publish",
"Timeout",
"VpcConfig",
"Environment",
"ReservedConcurrentExecutions",
)
# required
spec = {
@ -520,9 +539,7 @@ class LambdaFunction(CloudFormationModel):
"Role": properties["Role"],
"Runtime": properties["Runtime"],
}
optional_properties = (
"Description MemorySize Publish Timeout VpcConfig Environment".split()
)
# NOTE: Not doing `properties.get(k, DEFAULT)` to avoid duplicating the
# default logic
for prop in optional_properties:
@ -1157,6 +1174,20 @@ class LambdaBackend(BaseBackend):
else:
return None
def put_function_concurrency(self, function_name, reserved_concurrency):
fn = self.get_function(function_name)
fn.reserved_concurrency = reserved_concurrency
return fn.reserved_concurrency
def delete_function_concurrency(self, function_name):
fn = self.get_function(function_name)
fn.reserved_concurrency = None
return fn.reserved_concurrency
def get_function_concurrency(self, function_name):
fn = self.get_function(function_name)
return fn.reserved_concurrency
def do_validate_s3():
return os.environ.get("VALIDATE_LAMBDA_S3", "") in ["", "1", "true"]

View file

@ -141,6 +141,19 @@ class LambdaResponse(BaseResponse):
else:
raise ValueError("Cannot handle request")
def function_concurrency(self, request, full_url, headers):
http_method = request.method
self.setup_class(request, full_url, headers)
if http_method == "GET":
return self._get_function_concurrency(request)
elif http_method == "DELETE":
return self._delete_function_concurrency(request)
elif http_method == "PUT":
return self._put_function_concurrency(request)
else:
raise ValueError("Cannot handle request")
def _add_policy(self, request, full_url, headers):
path = request.path if hasattr(request, "path") else path_url(request.url)
function_name = path.split("/")[-2]
@ -359,3 +372,38 @@ class LambdaResponse(BaseResponse):
return 200, {}, json.dumps(resp)
else:
return 404, {}, "{}"
def _get_function_concurrency(self, request):
path_function_name = self.path.rsplit("/", 2)[-2]
function_name = self.lambda_backend.get_function(path_function_name)
if function_name is None:
return 404, {}, "{}"
resp = self.lambda_backend.get_function_concurrency(path_function_name)
return 200, {}, json.dumps({"ReservedConcurrentExecutions": resp})
def _delete_function_concurrency(self, request):
path_function_name = self.path.rsplit("/", 2)[-2]
function_name = self.lambda_backend.get_function(path_function_name)
if function_name is None:
return 404, {}, "{}"
self.lambda_backend.delete_function_concurrency(path_function_name)
return 204, {}, "{}"
def _put_function_concurrency(self, request):
path_function_name = self.path.rsplit("/", 2)[-2]
function = self.lambda_backend.get_function(path_function_name)
if function is None:
return 404, {}, "{}"
concurrency = self._get_param("ReservedConcurrentExecutions", None)
resp = self.lambda_backend.put_function_concurrency(
path_function_name, concurrency
)
return 200, {}, json.dumps({"ReservedConcurrentExecutions": resp})

View file

@ -19,4 +19,5 @@ url_paths = {
r"{0}/(?P<api_version>[^/]+)/functions/(?P<function_name>[\w_-]+)/policy/?$": response.policy,
r"{0}/(?P<api_version>[^/]+)/functions/(?P<function_name>[\w_-]+)/configuration/?$": response.configuration,
r"{0}/(?P<api_version>[^/]+)/functions/(?P<function_name>[\w_-]+)/code/?$": response.code,
r"{0}/(?P<api_version>[^/]+)/functions/(?P<function_name>[\w_-]+)/concurrency/?$": response.function_concurrency,
}