Support rotating secrets using Lambda [#3905] (#3912)

* Support rotating secrets using Lambda

The Secrets manager rotation process uses an AWS Lambda function
to perform the rotation of a secret. [1]

In fact, it's not possible to trigger rotation of a Secret
without specifying a Lambda function at some point in the life
of the secret:

```
$ aws secretsmanager rotate-secret --secret-id /rotationTest

An error occurred (InvalidRequestException) when calling the RotateSecret operation: No Lambda rotation function ARN is associated with this secret.
```

`moto` can be a little more lenient in this regard and allow
`rotate_secret` to be called without a Lambda function being
present, if only to allow simulation of the `AWSCURRENT` and
`AWSPREVIOUS` labels moving across versions.

However, if a lambda function _has_ been specified when calling
`rotate_secret`, it should be invoked therefore providing the
developer with the full multi-stage process [3] which can be
used to test the Lambda function itself and ensuring that full
end-to-end testing is performed. Without this there's no easy
way to configure the Secret in the state needed to provide the
Lambda function with the data in the format it needs to be in
at each step of the invocation process.

[1]: https://docs.aws.amazon.com/secretsmanager/latest/userguide/rotating-secrets-lambda-function-overview.html
[2]: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/secretsmanager.html#SecretsManager.Client.rotate_secret
[3]: https://docs.aws.amazon.com/secretsmanager/latest/userguide/rotating-secrets-lambda-function-overview.html#rotation-explanation-of-steps

* Run `black` over `secretsmanager/models.py`

* Make `lambda_backends` import local to the condition

* Implement `update_secret_version_stage`

Allow a staging label to be moved across versions.

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/secretsmanager.html#SecretsManager.Client.update_secret_version_stage

* Add an integration test for Secrets Manager & Lambda

* Support passing `ClientRequestToken` to `put_secret_value`

By passing `ClientRequestToken` to `put_secret_value` within
the lambda function  invoked by calling `rotate_secret`, one
can update the value associated with the existing (pending)
version, without causing a new secret version to be created.

* Add application logic for `AWSPENDING`

The rotation function must end with the versions of the secret
in one of two states:

 - The `AWSPENDING` and `AWSCURRENT` staging labels are
   attached to the same version of the secret, or
 - The `AWSPENDING` staging label is not attached to any
   version of the secret.

If the `AWSPENDING` staging label is present but not attached
to the same version as `AWSCURRENT` then any later invocation
of RotateSecret assumes that a previous rotation request is
still in progress and returns an error.

* Update `default_version_id` after Lambda rotation concludes

Call `set_default_version_id` directly, rather than going 
through `reset_default_version` as the Lambda function is 
responsible for moving the version labels around, not `rotate_secret`.

* Run `black` over changed files

* Fix Python 2.7 compatibility

* Add additional test coverage for Secrets Manager

* Fix bug found by tests

AWSPENDING + AWSCURRENT check wasn't using `version_stages`.
Also tidy up the AWSCURRENT moving in `update_secret_version_stage`
to remove AWSPREVIOUS it from the new stage.

* Run `black` over changed files

* Add additional `rotate_secret` tests

* Skip `test_rotate_secret_lambda_invocations` in test server mode

* Add test for invalid Lambda ARN
This commit is contained in:
Daniel Samuels 2021-05-11 12:08:01 +01:00 committed by GitHub
commit a4b1498665
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 564 additions and 15 deletions

View file

@ -269,13 +269,7 @@ class SecretsManagerBackend(BaseBackend):
return secret.to_short_dict()
def create_secret(
self,
name,
secret_string=None,
secret_binary=None,
description=None,
tags=[],
**kwargs
self, name, secret_string=None, secret_binary=None, description=None, tags=[]
):
# error if secret exists
@ -325,7 +319,11 @@ class SecretsManagerBackend(BaseBackend):
if secret_id in self.secrets:
secret = self.secrets[secret_id]
secret.update(description, tags)
secret.reset_default_version(secret_version, version_id)
if "AWSPENDING" in version_stages:
secret.versions[version_id] = secret_version
else:
secret.reset_default_version(secret_version, version_id)
else:
secret = FakeSecret(
region_name=self.region,
@ -341,7 +339,14 @@ class SecretsManagerBackend(BaseBackend):
return secret
def put_secret_value(self, secret_id, secret_string, secret_binary, version_stages):
def put_secret_value(
self,
secret_id,
secret_string,
secret_binary,
client_request_token,
version_stages,
):
if not self._is_valid_identifier(secret_id):
raise SecretNotFoundException()
@ -354,6 +359,7 @@ class SecretsManagerBackend(BaseBackend):
secret_id,
secret_string,
secret_binary,
version_id=client_request_token,
description=description,
tags=tags,
version_stages=version_stages,
@ -410,26 +416,85 @@ class SecretsManagerBackend(BaseBackend):
secret = self.secrets[secret_id]
# The rotation function must end with the versions of the secret in
# one of two states:
#
# - The AWSPENDING and AWSCURRENT staging labels are attached to the
# same version of the secret, or
# - The AWSPENDING staging label is not attached to any version of the secret.
#
# If the AWSPENDING staging label is present but not attached to the same
# version as AWSCURRENT then any later invocation of RotateSecret assumes
# that a previous rotation request is still in progress and returns an error.
try:
version = next(
version
for version in secret.versions.values()
if "AWSPENDING" in version["version_stages"]
)
if "AWSCURRENT" in version["version_stages"]:
msg = "Previous rotation request is still in progress."
raise InvalidRequestException(msg)
except StopIteration:
# Pending is not present in any version
pass
old_secret_version = secret.versions[secret.default_version_id]
new_version_id = client_request_token or str(uuid.uuid4())
# We add the new secret version as "pending". The previous version remains
# as "current" for now. Once we've passed the new secret through the lambda
# rotation function (if provided) we can then update the status to "current".
self._add_secret(
secret_id,
old_secret_version["secret_string"],
description=secret.description,
tags=secret.tags,
version_id=new_version_id,
version_stages=["AWSCURRENT"],
version_stages=["AWSPENDING"],
)
secret.rotation_lambda_arn = rotation_lambda_arn or ""
if rotation_rules:
secret.auto_rotate_after_days = rotation_rules.get(rotation_days, 0)
if secret.auto_rotate_after_days > 0:
secret.rotation_enabled = True
if "AWSCURRENT" in old_secret_version["version_stages"]:
old_secret_version["version_stages"].remove("AWSCURRENT")
# Begin the rotation process for the given secret by invoking the lambda function.
if secret.rotation_lambda_arn:
from moto.awslambda.models import lambda_backends
lambda_backend = lambda_backends[self.region]
request_headers = {}
response_headers = {}
func = lambda_backend.get_function(secret.rotation_lambda_arn)
if not func:
msg = "Resource not found for ARN '{}'.".format(
secret.rotation_lambda_arn
)
raise ResourceNotFoundException(msg)
for step in ["create", "set", "test", "finish"]:
func.invoke(
json.dumps(
{
"Step": step + "Secret",
"SecretId": secret.name,
"ClientRequestToken": new_version_id,
}
),
request_headers,
response_headers,
)
secret.set_default_version_id(new_version_id)
else:
secret.reset_default_version(
secret.versions[new_version_id], new_version_id
)
secret.versions[new_version_id]["version_stages"] = ["AWSCURRENT"]
return secret.to_short_dict()
@ -592,6 +657,54 @@ class SecretsManagerBackend(BaseBackend):
return secret_id
def update_secret_version_stage(
self, secret_id, version_stage, remove_from_version_id, move_to_version_id
):
if secret_id not in self.secrets.keys():
raise SecretNotFoundException()
secret = self.secrets[secret_id]
if remove_from_version_id:
if remove_from_version_id not in secret.versions:
raise InvalidParameterException(
"Not a valid version: %s" % remove_from_version_id
)
stages = secret.versions[remove_from_version_id]["version_stages"]
if version_stage not in stages:
raise InvalidParameterException(
"Version stage %s not found in version %s"
% (version_stage, remove_from_version_id)
)
stages.remove(version_stage)
if move_to_version_id:
if move_to_version_id not in secret.versions:
raise InvalidParameterException(
"Not a valid version: %s" % move_to_version_id
)
stages = secret.versions[move_to_version_id]["version_stages"]
stages.append(version_stage)
if version_stage == "AWSCURRENT":
if remove_from_version_id:
# Whenever you move AWSCURRENT, Secrets Manager automatically
# moves the label AWSPREVIOUS to the version that AWSCURRENT
# was removed from.
secret.versions[remove_from_version_id]["version_stages"].append(
"AWSPREVIOUS"
)
if move_to_version_id:
stages = secret.versions[move_to_version_id]["version_stages"]
if "AWSPREVIOUS" in stages:
stages.remove("AWSPREVIOUS")
return secret_id
@staticmethod
def get_resource_policy(secret_id):
resource_policy = {

View file

@ -106,16 +106,21 @@ class SecretsManagerResponse(BaseResponse):
secret_id = self._get_param("SecretId", if_none="")
secret_string = self._get_param("SecretString")
secret_binary = self._get_param("SecretBinary")
client_request_token = self._get_param("ClientRequestToken")
if not secret_binary and not secret_string:
raise InvalidRequestException(
"You must provide either SecretString or SecretBinary."
)
version_stages = self._get_param("VersionStages", if_none=["AWSCURRENT"])
if not isinstance(version_stages, list):
version_stages = [version_stages]
return secretsmanager_backends[self.region].put_secret_value(
secret_id=secret_id,
secret_binary=secret_binary,
secret_string=secret_string,
version_stages=version_stages,
client_request_token=client_request_token,
)
def list_secret_version_ids(self):
@ -169,3 +174,15 @@ class SecretsManagerResponse(BaseResponse):
return secretsmanager_backends[self.region].untag_resource(
secret_id=secret_id, tag_keys=tag_keys
)
def update_secret_version_stage(self):
secret_id = self._get_param("SecretId")
version_stage = self._get_param("VersionStage")
remove_from_version_id = self._get_param("RemoveFromVersionId")
move_to_version_id = self._get_param("MoveToVersionId")
return secretsmanager_backends[self.region].update_secret_version_stage(
secret_id=secret_id,
version_stage=version_stage,
remove_from_version_id=remove_from_version_id,
move_to_version_id=move_to_version_id,
)