Merge pull request #2457 from bblommers/feature/dynamodb_streams_invoke_lambda

Feature: Add option for DynamoDB stream to kick off lambda
This commit is contained in:
Steve Pulec 2019-10-10 17:05:43 -05:00 committed by GitHub
commit 31198c75b8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 83 additions and 1 deletions

View file

@ -33,6 +33,8 @@ from moto.s3.exceptions import MissingBucket, MissingKey
from moto import settings
from .utils import make_function_arn, make_function_ver_arn
from moto.sqs import sqs_backends
from moto.dynamodb2 import dynamodb_backends2
from moto.dynamodbstreams import dynamodbstreams_backends
logger = logging.getLogger(__name__)
@ -748,6 +750,15 @@ class LambdaBackend(BaseBackend):
queue.lambda_event_source_mappings[esm.function_arn] = esm
return esm
for stream in json.loads(dynamodbstreams_backends[self.region_name].list_streams())['Streams']:
if stream['StreamArn'] == spec['EventSourceArn']:
spec.update({'FunctionArn': func.function_arn})
esm = EventSourceMapping(spec)
self._event_source_mappings[esm.uuid] = esm
table_name = stream['TableName']
table = dynamodb_backends2[self.region_name].get_table(table_name)
table.lambda_event_source_mappings[esm.function_arn] = esm
return esm
raise RESTError('ResourceNotFoundException', 'Invalid EventSourceArn')
def publish_function(self, function_name):
@ -867,6 +878,19 @@ class LambdaBackend(BaseBackend):
func = self._lambdas.get_function(function_name, qualifier)
func.invoke(json.dumps(event), {}, {})
def send_dynamodb_items(self, function_arn, items, source):
event = {'Records': [
{
'eventID': item.to_json()['eventID'],
'eventName': 'INSERT',
'eventVersion': item.to_json()['eventVersion'],
'eventSource': item.to_json()['eventSource'],
'awsRegion': self.region_name,
'dynamodb': item.to_json()['dynamodb'],
'eventSourceARN': source} for item in items]}
func = self._lambdas.get_arn(function_arn)
func.invoke(json.dumps(event), {}, {})
def list_tags(self, resource):
return self.get_function_by_arn(resource).tags