add Lambda tag_resource, untag_resource, and list_tags methods
This commit is contained in:
parent
f052757259
commit
98f95dd4a7
4 changed files with 170 additions and 0 deletions
|
|
@ -81,6 +81,8 @@ class LambdaFunction(BaseModel):
|
|||
self.function_arn = 'arn:aws:lambda:123456789012:function:{0}'.format(
|
||||
self.function_name)
|
||||
|
||||
self.tags = dict()
|
||||
|
||||
@property
|
||||
def vpc_config(self):
|
||||
config = self._vpc_config.copy()
|
||||
|
|
@ -278,6 +280,9 @@ class LambdaBackend(BaseBackend):
|
|||
def has_function(self, function_name):
|
||||
return function_name in self._functions
|
||||
|
||||
def has_function_arn(self, function_arn):
|
||||
return self.get_function_by_arn(function_arn) is not None
|
||||
|
||||
def create_function(self, spec):
|
||||
fn = LambdaFunction(spec)
|
||||
self._functions[fn.function_name] = fn
|
||||
|
|
@ -286,12 +291,33 @@ class LambdaBackend(BaseBackend):
|
|||
def get_function(self, function_name):
|
||||
return self._functions[function_name]
|
||||
|
||||
def get_function_by_arn(self, function_arn):
|
||||
for function in self._functions.values():
|
||||
if function.function_arn == function_arn:
|
||||
return function
|
||||
return None
|
||||
|
||||
def delete_function(self, function_name):
|
||||
del self._functions[function_name]
|
||||
|
||||
def list_functions(self):
|
||||
return self._functions.values()
|
||||
|
||||
def list_tags(self, resource):
|
||||
return self.get_function_by_arn(resource).tags
|
||||
|
||||
def tag_resource(self, resource, tags):
|
||||
self.get_function_by_arn(resource).tags.update(tags)
|
||||
|
||||
def untag_resource(self, resource, tagKeys):
|
||||
function = self.get_function_by_arn(resource)
|
||||
for key in tagKeys:
|
||||
try:
|
||||
del function.tags[key]
|
||||
except KeyError:
|
||||
pass
|
||||
# Don't care
|
||||
|
||||
|
||||
def do_validate_s3():
|
||||
return os.environ.get('VALIDATE_LAMBDA_S3', '') in ['', '1', 'true']
|
||||
|
|
|
|||
|
|
@ -3,6 +3,12 @@ from __future__ import unicode_literals
|
|||
import json
|
||||
import re
|
||||
|
||||
try:
|
||||
from urllib import unquote
|
||||
from urlparse import urlparse, parse_qs
|
||||
except:
|
||||
from urllib.parse import unquote, urlparse, parse_qs
|
||||
|
||||
from moto.core.responses import BaseResponse
|
||||
|
||||
|
||||
|
|
@ -33,6 +39,17 @@ class LambdaResponse(BaseResponse):
|
|||
else:
|
||||
raise ValueError("Cannot handle request")
|
||||
|
||||
def tag(self, request, full_url, headers):
|
||||
self.setup_class(request, full_url, headers)
|
||||
if request.method == 'GET':
|
||||
return self._list_tags(request, full_url)
|
||||
elif request.method == 'POST':
|
||||
return self._tag_resource(request, full_url)
|
||||
elif request.method == 'DELETE':
|
||||
return self._untag_resource(request, full_url)
|
||||
else:
|
||||
raise ValueError("Cannot handle {0} request".format(request.method))
|
||||
|
||||
def _invoke(self, request, full_url):
|
||||
response_headers = {}
|
||||
lambda_backend = self.get_lambda_backend(full_url)
|
||||
|
|
@ -102,3 +119,43 @@ class LambdaResponse(BaseResponse):
|
|||
return region.group(1)
|
||||
else:
|
||||
return self.default_region
|
||||
|
||||
def _list_tags(self, request, full_url):
|
||||
lambda_backend = self.get_lambda_backend(full_url)
|
||||
|
||||
path = request.path if hasattr(request, 'path') else request.path_url
|
||||
function_arn = unquote(path.split('/')[-1])
|
||||
|
||||
if lambda_backend.has_function_arn(function_arn):
|
||||
function = lambda_backend.get_function_by_arn(function_arn)
|
||||
return 200, {}, json.dumps(dict(Tags=function.tags))
|
||||
else:
|
||||
return 404, {}, "{}"
|
||||
|
||||
def _tag_resource(self, request, full_url):
|
||||
lambda_backend = self.get_lambda_backend(full_url)
|
||||
|
||||
path = request.path if hasattr(request, 'path') else request.path_url
|
||||
function_arn = unquote(path.split('/')[-1])
|
||||
|
||||
spec = json.loads(self.body)
|
||||
|
||||
if lambda_backend.has_function_arn(function_arn):
|
||||
lambda_backend.tag_resource(function_arn, spec['Tags'])
|
||||
return 200, {}, "{}"
|
||||
else:
|
||||
return 404, {}, "{}"
|
||||
|
||||
def _untag_resource(self, request, full_url):
|
||||
lambda_backend = self.get_lambda_backend(full_url)
|
||||
|
||||
path = request.path if hasattr(request, 'path') else request.path_url
|
||||
function_arn = unquote(path.split('/')[-1].split('?')[0])
|
||||
|
||||
tag_keys = parse_qs(urlparse(full_url).query)['tagKeys']
|
||||
|
||||
if lambda_backend.has_function_arn(function_arn):
|
||||
lambda_backend.untag_resource(function_arn, tag_keys)
|
||||
return 204, {}, "{}"
|
||||
else:
|
||||
return 404, {}, "{}"
|
||||
|
|
|
|||
|
|
@ -11,4 +11,5 @@ url_paths = {
|
|||
'{0}/(?P<api_version>[^/]+)/functions/?$': response.root,
|
||||
'{0}/(?P<api_version>[^/]+)/functions/(?P<function_name>[\w_-]+)/?$': response.function,
|
||||
'{0}/(?P<api_version>[^/]+)/functions/(?P<function_name>[\w_-]+)/invocations/?$': response.invoke,
|
||||
'{0}/(?P<api_version>[^/]+)/tags/(?P<resource_arn>.+)': response.tag
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue