#2546 - AWS Lambda: Add Role validation when creating functions

This commit is contained in:
Bert Blommers 2019-11-07 17:11:13 +00:00
commit 40aa73a12b
6 changed files with 149 additions and 50 deletions

View file

@ -15,6 +15,7 @@ from freezegun import freeze_time
from moto import (
mock_dynamodb2,
mock_lambda,
mock_iam,
mock_s3,
mock_ec2,
mock_sns,
@ -22,6 +23,7 @@ from moto import (
settings,
mock_sqs,
)
from moto.sts.models import ACCOUNT_ID
from nose.tools import assert_raises
from botocore.exceptions import ClientError
@ -96,7 +98,7 @@ def test_invoke_requestresponse_function():
conn.create_function(
FunctionName="testFunction",
Runtime="python2.7",
Role="test-iam-role",
Role=get_role_name(),
Handler="lambda_function.lambda_handler",
Code={"ZipFile": get_test_zip_file1()},
Description="test lambda function",
@ -129,7 +131,7 @@ def test_invoke_event_function():
conn.create_function(
FunctionName="testFunction",
Runtime="python2.7",
Role="test-iam-role",
Role=get_role_name(),
Handler="lambda_function.lambda_handler",
Code={"ZipFile": get_test_zip_file1()},
Description="test lambda function",
@ -163,7 +165,7 @@ if settings.TEST_SERVER_MODE:
conn.create_function(
FunctionName="testFunction",
Runtime="python2.7",
Role="test-iam-role",
Role=get_role_name(),
Handler="lambda_function.lambda_handler",
Code={"ZipFile": get_test_zip_file2()},
Description="test lambda function",
@ -216,7 +218,7 @@ def test_invoke_function_from_sns():
result = conn.create_function(
FunctionName="testFunction",
Runtime="python2.7",
Role="test-iam-role",
Role=get_role_name(),
Handler="lambda_function.lambda_handler",
Code={"ZipFile": get_test_zip_file3()},
Description="test lambda function",
@ -260,7 +262,7 @@ def test_create_based_on_s3_with_missing_bucket():
conn.create_function.when.called_with(
FunctionName="testFunction",
Runtime="python2.7",
Role="test-iam-role",
Role=get_role_name(),
Handler="lambda_function.lambda_handler",
Code={"S3Bucket": "this-bucket-does-not-exist", "S3Key": "test.zip"},
Description="test lambda function",
@ -285,7 +287,7 @@ def test_create_function_from_aws_bucket():
result = conn.create_function(
FunctionName="testFunction",
Runtime="python2.7",
Role="test-iam-role",
Role=get_role_name(),
Handler="lambda_function.lambda_handler",
Code={"S3Bucket": "test-bucket", "S3Key": "test.zip"},
Description="test lambda function",
@ -306,7 +308,7 @@ def test_create_function_from_aws_bucket():
_lambda_region
),
"Runtime": "python2.7",
"Role": "test-iam-role",
"Role": result["Role"],
"Handler": "lambda_function.lambda_handler",
"CodeSha256": hashlib.sha256(zip_content).hexdigest(),
"CodeSize": len(zip_content),
@ -332,7 +334,7 @@ def test_create_function_from_zipfile():
result = conn.create_function(
FunctionName="testFunction",
Runtime="python2.7",
Role="test-iam-role",
Role=get_role_name(),
Handler="lambda_function.lambda_handler",
Code={"ZipFile": zip_content},
Description="test lambda function",
@ -353,7 +355,7 @@ def test_create_function_from_zipfile():
_lambda_region
),
"Runtime": "python2.7",
"Role": "test-iam-role",
"Role": result["Role"],
"Handler": "lambda_function.lambda_handler",
"CodeSize": len(zip_content),
"Description": "test lambda function",
@ -381,7 +383,7 @@ def test_get_function():
conn.create_function(
FunctionName="testFunction",
Runtime="python2.7",
Role="test-iam-role",
Role=get_role_name(),
Handler="lambda_function.lambda_handler",
Code={"S3Bucket": "test-bucket", "S3Key": "test.zip"},
Description="test lambda function",
@ -412,7 +414,7 @@ def test_get_function():
result["Configuration"]["FunctionName"].should.equal("testFunction")
result["Configuration"]["Handler"].should.equal("lambda_function.lambda_handler")
result["Configuration"]["MemorySize"].should.equal(128)
result["Configuration"]["Role"].should.equal("test-iam-role")
result["Configuration"]["Role"].should.equal(get_role_name())
result["Configuration"]["Runtime"].should.equal("python2.7")
result["Configuration"]["Timeout"].should.equal(3)
result["Configuration"]["Version"].should.equal("$LATEST")
@ -449,7 +451,7 @@ def test_get_function_by_arn():
fnc = conn.create_function(
FunctionName="testFunction",
Runtime="python2.7",
Role="test-iam-role",
Role=get_role_name(),
Handler="lambda_function.lambda_handler",
Code={"S3Bucket": bucket_name, "S3Key": "test.zip"},
Description="test lambda function",
@ -475,7 +477,7 @@ def test_delete_function():
conn.create_function(
FunctionName="testFunction",
Runtime="python2.7",
Role="test-iam-role",
Role=get_role_name(),
Handler="lambda_function.lambda_handler",
Code={"S3Bucket": "test-bucket", "S3Key": "test.zip"},
Description="test lambda function",
@ -510,7 +512,7 @@ def test_delete_function_by_arn():
fnc = conn.create_function(
FunctionName="testFunction",
Runtime="python2.7",
Role="test-iam-role",
Role=get_role_name(),
Handler="lambda_function.lambda_handler",
Code={"S3Bucket": bucket_name, "S3Key": "test.zip"},
Description="test lambda function",
@ -545,7 +547,7 @@ def test_publish():
conn.create_function(
FunctionName="testFunction",
Runtime="python2.7",
Role="test-iam-role",
Role=get_role_name(),
Handler="lambda_function.lambda_handler",
Code={"S3Bucket": "test-bucket", "S3Key": "test.zip"},
Description="test lambda function",
@ -597,7 +599,7 @@ def test_list_create_list_get_delete_list():
conn.create_function(
FunctionName="testFunction",
Runtime="python2.7",
Role="test-iam-role",
Role=get_role_name(),
Handler="lambda_function.lambda_handler",
Code={"S3Bucket": "test-bucket", "S3Key": "test.zip"},
Description="test lambda function",
@ -622,7 +624,7 @@ def test_list_create_list_get_delete_list():
"FunctionName": "testFunction",
"Handler": "lambda_function.lambda_handler",
"MemorySize": 128,
"Role": "test-iam-role",
"Role": get_role_name(),
"Runtime": "python2.7",
"Timeout": 3,
"Version": "$LATEST",
@ -663,7 +665,7 @@ def lambda_handler(event, context):
client.create_function(
FunctionName="test-lambda-fx",
Runtime="python2.7",
Role="test-iam-role",
Role=get_role_name(),
Handler="lambda_function.lambda_handler",
Description="test lambda function",
Timeout=3,
@ -696,7 +698,7 @@ def test_tags():
function = conn.create_function(
FunctionName="testFunction",
Runtime="python2.7",
Role="test-iam-role",
Role=get_role_name(),
Handler="lambda_function.handler",
Code={"S3Bucket": "test-bucket", "S3Key": "test.zip"},
Description="test lambda function",
@ -764,7 +766,7 @@ def test_invoke_async_function():
conn.create_function(
FunctionName="testFunction",
Runtime="python2.7",
Role="test-iam-role",
Role=get_role_name(),
Handler="lambda_function.lambda_handler",
Code={"ZipFile": get_test_zip_file1()},
Description="test lambda function",
@ -788,7 +790,7 @@ def test_get_function_created_with_zipfile():
result = conn.create_function(
FunctionName="testFunction",
Runtime="python2.7",
Role="test-iam-role",
Role=get_role_name(),
Handler="lambda_function.handler",
Code={"ZipFile": zip_content},
Description="test lambda function",
@ -817,7 +819,7 @@ def test_get_function_created_with_zipfile():
"FunctionName": "testFunction",
"Handler": "lambda_function.handler",
"MemorySize": 128,
"Role": "test-iam-role",
"Role": get_role_name(),
"Runtime": "python2.7",
"Timeout": 3,
"Version": "$LATEST",
@ -833,7 +835,7 @@ def test_add_function_permission():
conn.create_function(
FunctionName="testFunction",
Runtime="python2.7",
Role="test-iam-role",
Role=(get_role_name()),
Handler="lambda_function.handler",
Code={"ZipFile": zip_content},
Description="test lambda function",
@ -864,7 +866,7 @@ def test_get_function_policy():
conn.create_function(
FunctionName="testFunction",
Runtime="python2.7",
Role="test-iam-role",
Role=get_role_name(),
Handler="lambda_function.handler",
Code={"ZipFile": zip_content},
Description="test lambda function",
@ -904,7 +906,7 @@ def test_list_versions_by_function():
conn.create_function(
FunctionName="testFunction",
Runtime="python2.7",
Role="arn:aws:iam::123456789012:role/test-iam-role",
Role=get_role_name(),
Handler="lambda_function.lambda_handler",
Code={"S3Bucket": "test-bucket", "S3Key": "test.zip"},
Description="test lambda function",
@ -933,7 +935,7 @@ def test_list_versions_by_function():
conn.create_function(
FunctionName="testFunction_2",
Runtime="python2.7",
Role="arn:aws:iam::123456789012:role/test-iam-role",
Role=get_role_name(),
Handler="lambda_function.lambda_handler",
Code={"S3Bucket": "test-bucket", "S3Key": "test.zip"},
Description="test lambda function",
@ -962,7 +964,7 @@ def test_create_function_with_already_exists():
conn.create_function(
FunctionName="testFunction",
Runtime="python2.7",
Role="test-iam-role",
Role=get_role_name(),
Handler="lambda_function.lambda_handler",
Code={"S3Bucket": "test-bucket", "S3Key": "test.zip"},
Description="test lambda function",
@ -974,7 +976,7 @@ def test_create_function_with_already_exists():
response = conn.create_function(
FunctionName="testFunction",
Runtime="python2.7",
Role="test-iam-role",
Role=get_role_name(),
Handler="lambda_function.lambda_handler",
Code={"S3Bucket": "test-bucket", "S3Key": "test.zip"},
Description="test lambda function",
@ -1006,7 +1008,7 @@ def test_create_event_source_mapping():
func = conn.create_function(
FunctionName="testFunction",
Runtime="python2.7",
Role="test-iam-role",
Role=get_role_name(),
Handler="lambda_function.lambda_handler",
Code={"ZipFile": get_test_zip_file3()},
Description="test lambda function",
@ -1036,7 +1038,7 @@ def test_invoke_function_from_sqs():
func = conn.create_function(
FunctionName="testFunction",
Runtime="python2.7",
Role="test-iam-role",
Role=get_role_name(),
Handler="lambda_function.lambda_handler",
Code={"ZipFile": get_test_zip_file3()},
Description="test lambda function",
@ -1096,7 +1098,7 @@ def test_invoke_function_from_dynamodb():
func = conn.create_function(
FunctionName="testFunction",
Runtime="python2.7",
Role="test-iam-role",
Role=get_role_name(),
Handler="lambda_function.lambda_handler",
Code={"ZipFile": get_test_zip_file3()},
Description="test lambda function executed after a DynamoDB table is updated",
@ -1147,7 +1149,7 @@ def test_invoke_function_from_sqs_exception():
func = conn.create_function(
FunctionName="testFunction",
Runtime="python2.7",
Role="test-iam-role",
Role=get_role_name(),
Handler="lambda_function.lambda_handler",
Code={"ZipFile": get_test_zip_file4()},
Description="test lambda function",
@ -1206,7 +1208,7 @@ def test_list_event_source_mappings():
func = conn.create_function(
FunctionName="testFunction",
Runtime="python2.7",
Role="test-iam-role",
Role=get_role_name(),
Handler="lambda_function.lambda_handler",
Code={"ZipFile": get_test_zip_file3()},
Description="test lambda function",
@ -1238,7 +1240,7 @@ def test_get_event_source_mapping():
func = conn.create_function(
FunctionName="testFunction",
Runtime="python2.7",
Role="test-iam-role",
Role=get_role_name(),
Handler="lambda_function.lambda_handler",
Code={"ZipFile": get_test_zip_file3()},
Description="test lambda function",
@ -1268,7 +1270,7 @@ def test_update_event_source_mapping():
func1 = conn.create_function(
FunctionName="testFunction",
Runtime="python2.7",
Role="test-iam-role",
Role=get_role_name(),
Handler="lambda_function.lambda_handler",
Code={"ZipFile": get_test_zip_file3()},
Description="test lambda function",
@ -1279,7 +1281,7 @@ def test_update_event_source_mapping():
func2 = conn.create_function(
FunctionName="testFunction2",
Runtime="python2.7",
Role="test-iam-role",
Role=get_role_name(),
Handler="lambda_function.lambda_handler",
Code={"ZipFile": get_test_zip_file3()},
Description="test lambda function",
@ -1312,7 +1314,7 @@ def test_delete_event_source_mapping():
func1 = conn.create_function(
FunctionName="testFunction",
Runtime="python2.7",
Role="test-iam-role",
Role=get_role_name(),
Handler="lambda_function.lambda_handler",
Code={"ZipFile": get_test_zip_file3()},
Description="test lambda function",
@ -1348,7 +1350,7 @@ def test_update_configuration():
fxn = conn.create_function(
FunctionName="testFunction",
Runtime="python2.7",
Role="test-iam-role",
Role=get_role_name(),
Handler="lambda_function.lambda_handler",
Code={"S3Bucket": "test-bucket", "S3Key": "test.zip"},
Description="test lambda function",
@ -1393,7 +1395,7 @@ def test_update_function_zip():
fxn = conn.create_function(
FunctionName="testFunctionZip",
Runtime="python2.7",
Role="test-iam-role",
Role=get_role_name(),
Handler="lambda_function.lambda_handler",
Code={"ZipFile": zip_content_one},
Description="test lambda function",
@ -1428,7 +1430,7 @@ def test_update_function_zip():
"FunctionName": "testFunctionZip",
"Handler": "lambda_function.lambda_handler",
"MemorySize": 128,
"Role": "test-iam-role",
"Role": fxn["Role"],
"Runtime": "python2.7",
"Timeout": 3,
"Version": "2",
@ -1451,7 +1453,7 @@ def test_update_function_s3():
fxn = conn.create_function(
FunctionName="testFunctionS3",
Runtime="python2.7",
Role="test-iam-role",
Role=get_role_name(),
Handler="lambda_function.lambda_handler",
Code={"S3Bucket": "test-bucket", "S3Key": "test.zip"},
Description="test lambda function",
@ -1490,10 +1492,67 @@ def test_update_function_s3():
"FunctionName": "testFunctionS3",
"Handler": "lambda_function.lambda_handler",
"MemorySize": 128,
"Role": "test-iam-role",
"Role": fxn["Role"],
"Runtime": "python2.7",
"Timeout": 3,
"Version": "2",
"VpcConfig": {"SecurityGroupIds": [], "SubnetIds": []},
}
)
@mock_lambda
def test_create_function_with_invalid_arn():
err = create_invalid_lambda("test-iam-role")
err.exception.response["Error"]["Code"].should.equal("ValidationException")
err.exception.response["Error"]["Message"].should.equal(
"1 validation error detected: Value 'test-iam-role' at 'role' failed to satisfy constraint: Member must satisfy regular expression pattern: arn:(aws[a-zA-Z-]*)?:iam::(\d{12}):role/?[a-zA-Z_0-9+=,.@\-_/]+"
)
@mock_lambda
def test_create_function_with_arn_from_different_account():
err = create_invalid_lambda("arn:aws:iam::000000000000:role/example_role")
err.exception.response["Error"]["Code"].should.equal("AccessDeniedException")
err.exception.response["Error"]["Message"].should.equal(
"Cross-account pass role is not allowed."
)
@mock_lambda
def test_create_function_with_unknown_arn():
err = create_invalid_lambda(
"arn:aws:iam::" + str(ACCOUNT_ID) + ":role/service-role/unknown_role"
)
err.exception.response["Error"]["Code"].should.equal(
"InvalidParameterValueException"
)
err.exception.response["Error"]["Message"].should.equal(
"The role defined for the function cannot be assumed by Lambda."
)
def create_invalid_lambda(role):
conn = boto3.client("lambda", "us-west-2")
zip_content = get_test_zip_file1()
with assert_raises(ClientError) as err:
conn.create_function(
FunctionName="testFunction",
Runtime="python2.7",
Role=role,
Handler="lambda_function.handler",
Code={"ZipFile": zip_content},
Description="test lambda function",
Timeout=3,
MemorySize=128,
Publish=True,
)
return err
def get_role_name():
with mock_iam():
iam = boto3.client("iam")
return iam.create_role(
RoleName="my-role", AssumeRolePolicyDocument="some policy", Path="/my-path/"
)["Role"]["Arn"]