diff --git a/moto/secretsmanager/models.py b/moto/secretsmanager/models.py index 5542cb3e..84b25103 100644 --- a/moto/secretsmanager/models.py +++ b/moto/secretsmanager/models.py @@ -269,13 +269,7 @@ class SecretsManagerBackend(BaseBackend): return secret.to_short_dict() def create_secret( - self, - name, - secret_string=None, - secret_binary=None, - description=None, - tags=[], - **kwargs + self, name, secret_string=None, secret_binary=None, description=None, tags=[] ): # error if secret exists @@ -325,7 +319,11 @@ class SecretsManagerBackend(BaseBackend): if secret_id in self.secrets: secret = self.secrets[secret_id] secret.update(description, tags) - secret.reset_default_version(secret_version, version_id) + + if "AWSPENDING" in version_stages: + secret.versions[version_id] = secret_version + else: + secret.reset_default_version(secret_version, version_id) else: secret = FakeSecret( region_name=self.region, @@ -341,7 +339,14 @@ class SecretsManagerBackend(BaseBackend): return secret - def put_secret_value(self, secret_id, secret_string, secret_binary, version_stages): + def put_secret_value( + self, + secret_id, + secret_string, + secret_binary, + client_request_token, + version_stages, + ): if not self._is_valid_identifier(secret_id): raise SecretNotFoundException() @@ -354,6 +359,7 @@ class SecretsManagerBackend(BaseBackend): secret_id, secret_string, secret_binary, + version_id=client_request_token, description=description, tags=tags, version_stages=version_stages, @@ -410,26 +416,85 @@ class SecretsManagerBackend(BaseBackend): secret = self.secrets[secret_id] + # The rotation function must end with the versions of the secret in + # one of two states: + # + # - The AWSPENDING and AWSCURRENT staging labels are attached to the + # same version of the secret, or + # - The AWSPENDING staging label is not attached to any version of the secret. + # + # If the AWSPENDING staging label is present but not attached to the same + # version as AWSCURRENT then any later invocation of RotateSecret assumes + # that a previous rotation request is still in progress and returns an error. + try: + version = next( + version + for version in secret.versions.values() + if "AWSPENDING" in version["version_stages"] + ) + if "AWSCURRENT" in version["version_stages"]: + msg = "Previous rotation request is still in progress." + raise InvalidRequestException(msg) + + except StopIteration: + # Pending is not present in any version + pass + old_secret_version = secret.versions[secret.default_version_id] new_version_id = client_request_token or str(uuid.uuid4()) + # We add the new secret version as "pending". The previous version remains + # as "current" for now. Once we've passed the new secret through the lambda + # rotation function (if provided) we can then update the status to "current". self._add_secret( secret_id, old_secret_version["secret_string"], description=secret.description, tags=secret.tags, version_id=new_version_id, - version_stages=["AWSCURRENT"], + version_stages=["AWSPENDING"], ) - secret.rotation_lambda_arn = rotation_lambda_arn or "" if rotation_rules: secret.auto_rotate_after_days = rotation_rules.get(rotation_days, 0) if secret.auto_rotate_after_days > 0: secret.rotation_enabled = True - if "AWSCURRENT" in old_secret_version["version_stages"]: - old_secret_version["version_stages"].remove("AWSCURRENT") + # Begin the rotation process for the given secret by invoking the lambda function. + if secret.rotation_lambda_arn: + from moto.awslambda.models import lambda_backends + + lambda_backend = lambda_backends[self.region] + + request_headers = {} + response_headers = {} + + func = lambda_backend.get_function(secret.rotation_lambda_arn) + if not func: + msg = "Resource not found for ARN '{}'.".format( + secret.rotation_lambda_arn + ) + raise ResourceNotFoundException(msg) + + for step in ["create", "set", "test", "finish"]: + func.invoke( + json.dumps( + { + "Step": step + "Secret", + "SecretId": secret.name, + "ClientRequestToken": new_version_id, + } + ), + request_headers, + response_headers, + ) + + secret.set_default_version_id(new_version_id) + else: + secret.reset_default_version( + secret.versions[new_version_id], new_version_id + ) + secret.versions[new_version_id]["version_stages"] = ["AWSCURRENT"] return secret.to_short_dict() @@ -592,6 +657,54 @@ class SecretsManagerBackend(BaseBackend): return secret_id + def update_secret_version_stage( + self, secret_id, version_stage, remove_from_version_id, move_to_version_id + ): + if secret_id not in self.secrets.keys(): + raise SecretNotFoundException() + + secret = self.secrets[secret_id] + + if remove_from_version_id: + if remove_from_version_id not in secret.versions: + raise InvalidParameterException( + "Not a valid version: %s" % remove_from_version_id + ) + + stages = secret.versions[remove_from_version_id]["version_stages"] + if version_stage not in stages: + raise InvalidParameterException( + "Version stage %s not found in version %s" + % (version_stage, remove_from_version_id) + ) + + stages.remove(version_stage) + + if move_to_version_id: + if move_to_version_id not in secret.versions: + raise InvalidParameterException( + "Not a valid version: %s" % move_to_version_id + ) + + stages = secret.versions[move_to_version_id]["version_stages"] + stages.append(version_stage) + + if version_stage == "AWSCURRENT": + if remove_from_version_id: + # Whenever you move AWSCURRENT, Secrets Manager automatically + # moves the label AWSPREVIOUS to the version that AWSCURRENT + # was removed from. + secret.versions[remove_from_version_id]["version_stages"].append( + "AWSPREVIOUS" + ) + + if move_to_version_id: + stages = secret.versions[move_to_version_id]["version_stages"] + if "AWSPREVIOUS" in stages: + stages.remove("AWSPREVIOUS") + + return secret_id + @staticmethod def get_resource_policy(secret_id): resource_policy = { diff --git a/moto/secretsmanager/responses.py b/moto/secretsmanager/responses.py index a469f5ba..0433a565 100644 --- a/moto/secretsmanager/responses.py +++ b/moto/secretsmanager/responses.py @@ -106,16 +106,21 @@ class SecretsManagerResponse(BaseResponse): secret_id = self._get_param("SecretId", if_none="") secret_string = self._get_param("SecretString") secret_binary = self._get_param("SecretBinary") + client_request_token = self._get_param("ClientRequestToken") if not secret_binary and not secret_string: raise InvalidRequestException( "You must provide either SecretString or SecretBinary." ) version_stages = self._get_param("VersionStages", if_none=["AWSCURRENT"]) + if not isinstance(version_stages, list): + version_stages = [version_stages] + return secretsmanager_backends[self.region].put_secret_value( secret_id=secret_id, secret_binary=secret_binary, secret_string=secret_string, version_stages=version_stages, + client_request_token=client_request_token, ) def list_secret_version_ids(self): @@ -169,3 +174,15 @@ class SecretsManagerResponse(BaseResponse): return secretsmanager_backends[self.region].untag_resource( secret_id=secret_id, tag_keys=tag_keys ) + + def update_secret_version_stage(self): + secret_id = self._get_param("SecretId") + version_stage = self._get_param("VersionStage") + remove_from_version_id = self._get_param("RemoveFromVersionId") + move_to_version_id = self._get_param("MoveToVersionId") + return secretsmanager_backends[self.region].update_secret_version_stage( + secret_id=secret_id, + version_stage=version_stage, + remove_from_version_id=remove_from_version_id, + move_to_version_id=move_to_version_id, + ) diff --git a/tests/test_secretsmanager/test_secretsmanager.py b/tests/test_secretsmanager/test_secretsmanager.py index 1e351570..a6074294 100644 --- a/tests/test_secretsmanager/test_secretsmanager.py +++ b/tests/test_secretsmanager/test_secretsmanager.py @@ -3,7 +3,7 @@ from __future__ import unicode_literals import boto3 -from moto import mock_secretsmanager +from moto import mock_secretsmanager, mock_lambda, settings from botocore.exceptions import ClientError import string import pytz @@ -628,6 +628,131 @@ def test_rotate_secret_rotation_period_too_long(): ) +def get_rotation_zip_file(): + from tests.test_awslambda.test_lambda import _process_lambda + + func_str = """ +import boto3 +import json + +def lambda_handler(event, context): + arn = event['SecretId'] + token = event['ClientRequestToken'] + step = event['Step'] + + client = boto3.client("secretsmanager", region_name="us-west-2", endpoint_url="http://motoserver:5000") + metadata = client.describe_secret(SecretId=arn) + value = client.get_secret_value(SecretId=arn, VersionId=token, VersionStage="AWSPENDING") + + if not metadata['RotationEnabled']: + print("Secret %s is not enabled for rotation." % arn) + raise ValueError("Secret %s is not enabled for rotation." % arn) + versions = metadata['VersionIdsToStages'] + if token not in versions: + print("Secret version %s has no stage for rotation of secret %s." % (token, arn)) + raise ValueError("Secret version %s has no stage for rotation of secret %s." % (token, arn)) + if "AWSCURRENT" in versions[token]: + print("Secret version %s already set as AWSCURRENT for secret %s." % (token, arn)) + return + elif "AWSPENDING" not in versions[token]: + print("Secret version %s not set as AWSPENDING for rotation of secret %s." % (token, arn)) + raise ValueError("Secret version %s not set as AWSPENDING for rotation of secret %s." % (token, arn)) + + if step == 'createSecret': + try: + client.get_secret_value(SecretId=arn, VersionId=token, VersionStage='AWSPENDING') + except client.exceptions.ResourceNotFoundException: + client.put_secret_value( + SecretId=arn, + ClientRequestToken=token, + SecretString=json.dumps({'create': True}), + VersionStages=['AWSPENDING'] + ) + + if step == 'setSecret': + client.put_secret_value( + SecretId=arn, + ClientRequestToken=token, + SecretString='UpdatedValue', + VersionStages=["AWSPENDING"], + ) + + elif step == 'finishSecret': + current_version = next( + version + for version, stages in metadata['VersionIdsToStages'].items() + if 'AWSCURRENT' in stages + ) + print("current: %s new: %s" % (current_version, token)) + client.update_secret_version_stage( + SecretId=arn, + VersionStage='AWSCURRENT', + MoveToVersionId=token, + RemoveFromVersionId=current_version, + ) + client.update_secret_version_stage( + SecretId=arn, + VersionStage='AWSPENDING', + RemoveFromVersionId=token, + ) + """ + return _process_lambda(func_str) + + +if settings.TEST_SERVER_MODE: + + @mock_lambda + @mock_secretsmanager + def test_rotate_secret_using_lambda(): + from tests.test_awslambda.test_lambda import get_role_name + + # Passing a `RotationLambdaARN` value to `rotate_secret` should invoke lambda + lambda_conn = boto3.client( + "lambda", region_name="us-west-2", endpoint_url="http://localhost:5000", + ) + func = lambda_conn.create_function( + FunctionName="testFunction", + Runtime="python3.8", + Role=get_role_name(), + Handler="lambda_function.lambda_handler", + Code={"ZipFile": get_rotation_zip_file()}, + Description="Secret rotator", + Timeout=3, + MemorySize=128, + Publish=True, + ) + + secrets_conn = boto3.client( + "secretsmanager", + region_name="us-west-2", + endpoint_url="http://localhost:5000", + ) + secret = secrets_conn.create_secret( + Name=DEFAULT_SECRET_NAME, SecretString="InitialValue", + ) + initial_version = secret["VersionId"] + + rotated_secret = secrets_conn.rotate_secret( + SecretId=DEFAULT_SECRET_NAME, + RotationLambdaARN=func["FunctionArn"], + RotationRules=dict(AutomaticallyAfterDays=30,), + ) + + # Ensure we received an updated VersionId from `rotate_secret` + assert rotated_secret["VersionId"] != initial_version + + updated_secret = secrets_conn.get_secret_value( + SecretId=DEFAULT_SECRET_NAME, VersionStage="AWSCURRENT", + ) + rotated_version = updated_secret["VersionId"] + + assert initial_version != rotated_version + metadata = secrets_conn.describe_secret(SecretId=DEFAULT_SECRET_NAME) + assert metadata["VersionIdsToStages"][initial_version] == ["AWSPREVIOUS"] + assert metadata["VersionIdsToStages"][rotated_version] == ["AWSCURRENT"] + assert updated_secret["SecretString"] == "UpdatedValue" + + @mock_secretsmanager def test_put_secret_value_on_non_existing_secret(): conn = boto3.client("secretsmanager", region_name="us-west-2") diff --git a/tests/test_secretsmanager/test_server.py b/tests/test_secretsmanager/test_server.py index da41eb5f..d5d9223e 100644 --- a/tests/test_secretsmanager/test_server.py +++ b/tests/test_secretsmanager/test_server.py @@ -2,10 +2,14 @@ from __future__ import unicode_literals import json + +import boto3 +import pytest import sure # noqa import moto.server as server -from moto import mock_secretsmanager +from moto import mock_secretsmanager, mock_lambda, mock_iam, mock_logs, settings +from tests.test_awslambda.test_lambda import get_test_zip_file1 """ Test the different server responses for secretsmanager @@ -324,6 +328,54 @@ def test_rotate_secret_that_does_not_match(): assert json_data["__type"] == "ResourceNotFoundException" +@mock_secretsmanager +def test_rotate_secret_that_is_still_rotating(): + backend = server.create_backend_app("secretsmanager") + test_client = backend.test_client() + + create_secret = test_client.post( + "/", + data={ + "Name": DEFAULT_SECRET_NAME, + "SecretString": "foosecret", + # "VersionStages": ["AWSPENDING"], + }, + headers={"X-Amz-Target": "secretsmanager.CreateSecret"}, + ) + create_secret = json.loads(create_secret.data.decode("utf-8")) + + # Get the secret into a broken state. + version_id = create_secret["VersionId"] + test_client.post( + "/", + data={ + "SecretId": "test-secret", + "VersionStage": "AWSPENDING", + "MoveToVersionId": version_id, + }, + headers={"X-Amz-Target": "secretsmanager.UpdateSecretVersionStage"}, + ) + describe_secret = test_client.post( + "/", + data={"SecretId": DEFAULT_SECRET_NAME}, + headers={"X-Amz-Target": "secretsmanager.DescribeSecret"}, + ) + + metadata = json.loads(describe_secret.data.decode("utf-8")) + assert metadata["SecretVersionsToStages"][version_id] == [ + "AWSCURRENT", + "AWSPENDING", + ] + + # Then attempt to rotate it + rotate_secret = test_client.post( + "/", + data={"SecretId": DEFAULT_SECRET_NAME}, + headers={"X-Amz-Target": "secretsmanager.RotateSecret"}, + ) + assert rotate_secret.status_code == 400 + + @mock_secretsmanager def test_rotate_secret_client_request_token_too_short(): backend = server.create_backend_app("secretsmanager") @@ -404,6 +456,79 @@ def test_rotate_secret_rotation_lambda_arn_too_long(): assert json_data["__type"] == "InvalidParameterException" +if not settings.TEST_SERVER_MODE: + + @mock_iam + @mock_lambda + @mock_logs + @mock_secretsmanager + def test_rotate_secret_lambda_invocations(): + conn = boto3.client("iam", region_name="us-east-1") + logs_conn = boto3.client("logs", region_name="us-east-1") + role = conn.create_role( + RoleName="role", AssumeRolePolicyDocument="some policy", Path="/my-path/", + ) + + conn = boto3.client("lambda", region_name="us-east-1") + func = conn.create_function( + FunctionName="testFunction", + Code=dict(ZipFile=get_test_zip_file1()), + Handler="lambda_function.lambda_handler", + Runtime="python2.7", + Role=role["Role"]["Arn"], + ) + + secretsmanager_backend = server.create_backend_app("secretsmanager") + secretsmanager_client = secretsmanager_backend.test_client() + + secretsmanager_client.post( + "/", + data={"Name": DEFAULT_SECRET_NAME, "SecretString": "foosecret"}, + headers={"X-Amz-Target": "secretsmanager.CreateSecret"}, + ) + + with pytest.raises(logs_conn.exceptions.ResourceNotFoundException): + # The log group doesn't exist yet + logs_conn.describe_log_streams(logGroupName="/aws/lambda/testFunction") + + secretsmanager_client.post( + "/", + data={ + "SecretId": DEFAULT_SECRET_NAME, + "RotationLambdaARN": func["FunctionArn"], + }, + headers={"X-Amz-Target": "secretsmanager.RotateSecret"}, + ) + + # The log group now exists and has been logged to 4 times (for each invocation) + logs = logs_conn.describe_log_streams(logGroupName="/aws/lambda/testFunction") + assert len(logs["logStreams"]) == 4 + + @mock_iam + @mock_lambda + @mock_logs + @mock_secretsmanager + def test_rotate_secret_with_incorrect_lambda_arn(): + secretsmanager_backend = server.create_backend_app("secretsmanager") + secretsmanager_client = secretsmanager_backend.test_client() + + secretsmanager_client.post( + "/", + data={"Name": DEFAULT_SECRET_NAME, "SecretString": "foosecret"}, + headers={"X-Amz-Target": "secretsmanager.CreateSecret"}, + ) + + resp = secretsmanager_client.post( + "/", + data={"SecretId": DEFAULT_SECRET_NAME, "RotationLambdaARN": "notarealarn",}, + headers={"X-Amz-Target": "secretsmanager.RotateSecret"}, + ) + json_data = json.loads(resp.data.decode("utf-8")) + assert json_data["message"] == "Resource not found for ARN 'notarealarn'." + assert json_data["__type"] == "ResourceNotFoundException" + assert resp.status_code == 404 + + @mock_secretsmanager def test_put_secret_value_puts_new_secret(): backend = server.create_backend_app("secretsmanager") @@ -629,6 +754,175 @@ def test_get_resource_policy_secret(): assert json_data["Name"] == "test-secret" +@mock_secretsmanager +def test_update_secret_version_stage(): + custom_stage = "CUSTOM_STAGE" + backend = server.create_backend_app("secretsmanager") + test_client = backend.test_client() + create_secret = test_client.post( + "/", + data={"Name": "test-secret", "SecretString": "secret"}, + headers={"X-Amz-Target": "secretsmanager.CreateSecret"}, + ) + create_secret = json.loads(create_secret.data.decode("utf-8")) + initial_version = create_secret["VersionId"] + + # Create a new version + put_secret = test_client.post( + "/", + data={ + "SecretId": DEFAULT_SECRET_NAME, + "SecretString": "secret", + "VersionStages": [custom_stage], + }, + headers={"X-Amz-Target": "secretsmanager.PutSecretValue"}, + ) + put_secret = json.loads(put_secret.data.decode("utf-8")) + new_version = put_secret["VersionId"] + + describe_secret = test_client.post( + "/", + data={"SecretId": "test-secret"}, + headers={"X-Amz-Target": "secretsmanager.DescribeSecret"}, + ) + + json_data = json.loads(describe_secret.data.decode("utf-8")) + stages = json_data["SecretVersionsToStages"] + assert len(stages) == 2 + assert stages[initial_version] == ["AWSPREVIOUS"] + assert stages[new_version] == [custom_stage] + + test_client.post( + "/", + data={ + "SecretId": "test-secret", + "VersionStage": custom_stage, + "RemoveFromVersionId": new_version, + "MoveToVersionId": initial_version, + }, + headers={"X-Amz-Target": "secretsmanager.UpdateSecretVersionStage"}, + ) + + describe_secret = test_client.post( + "/", + data={"SecretId": "test-secret"}, + headers={"X-Amz-Target": "secretsmanager.DescribeSecret"}, + ) + + json_data = json.loads(describe_secret.data.decode("utf-8")) + stages = json_data["SecretVersionsToStages"] + assert len(stages) == 2 + assert stages[initial_version] == ["AWSPREVIOUS", custom_stage] + assert stages[new_version] == [] + + +@mock_secretsmanager +def test_update_secret_version_stage_currentversion_handling(): + backend = server.create_backend_app("secretsmanager") + test_client = backend.test_client() + create_secret = test_client.post( + "/", + data={"Name": "test-secret", "SecretString": "secret"}, + headers={"X-Amz-Target": "secretsmanager.CreateSecret"}, + ) + create_secret = json.loads(create_secret.data.decode("utf-8")) + initial_version = create_secret["VersionId"] + + # Create a new version + put_secret = test_client.post( + "/", + data={"SecretId": DEFAULT_SECRET_NAME, "SecretString": "secret",}, + headers={"X-Amz-Target": "secretsmanager.PutSecretValue"}, + ) + put_secret = json.loads(put_secret.data.decode("utf-8")) + new_version = put_secret["VersionId"] + + describe_secret = test_client.post( + "/", + data={"SecretId": "test-secret"}, + headers={"X-Amz-Target": "secretsmanager.DescribeSecret"}, + ) + + json_data = json.loads(describe_secret.data.decode("utf-8")) + stages = json_data["SecretVersionsToStages"] + assert len(stages) == 2 + assert stages[initial_version] == ["AWSPREVIOUS"] + assert stages[new_version] == ["AWSCURRENT"] + + test_client.post( + "/", + data={ + "SecretId": "test-secret", + "VersionStage": "AWSCURRENT", + "RemoveFromVersionId": new_version, + "MoveToVersionId": initial_version, + }, + headers={"X-Amz-Target": "secretsmanager.UpdateSecretVersionStage"}, + ) + + describe_secret = test_client.post( + "/", + data={"SecretId": "test-secret"}, + headers={"X-Amz-Target": "secretsmanager.DescribeSecret"}, + ) + + json_data = json.loads(describe_secret.data.decode("utf-8")) + stages = json_data["SecretVersionsToStages"] + assert len(stages) == 2 + assert stages[initial_version] == ["AWSCURRENT"] + assert stages[new_version] == ["AWSPREVIOUS"] + + +@mock_secretsmanager +def test_update_secret_version_stage_validation(): + backend = server.create_backend_app("secretsmanager") + test_client = backend.test_client() + + # Secret ID that doesn't exist + resp = test_client.post( + "/", + data={"SecretId": "nonexistent"}, + headers={"X-Amz-Target": "secretsmanager.UpdateSecretVersionStage"}, + ) + assert resp.status_code == 404 + + # Add a secret so we can run further checks + secret = test_client.post( + "/", + data={"Name": DEFAULT_SECRET_NAME, "SecretString": "secret"}, + headers={"X-Amz-Target": "secretsmanager.CreateSecret"}, + ) + secret = json.loads(secret.data.decode("utf-8")) + + # "Remove from" version ID that doesn't exist + resp = test_client.post( + "/", + data={"SecretId": DEFAULT_SECRET_NAME, "RemoveFromVersionId": "nonexistent"}, + headers={"X-Amz-Target": "secretsmanager.UpdateSecretVersionStage"}, + ) + assert resp.status_code == 400 + + # "Remove from" stage name which isn't attached to the given version + resp = test_client.post( + "/", + data={ + "SecretId": DEFAULT_SECRET_NAME, + "RemoveFromVersionId": secret["VersionId"], + "VersionStage": "nonexistent", + }, + headers={"X-Amz-Target": "secretsmanager.UpdateSecretVersionStage"}, + ) + assert resp.status_code == 400 + + # "Move to" version ID that doesn't exist + resp = test_client.post( + "/", + data={"SecretId": DEFAULT_SECRET_NAME, "MoveToVersionId": "nonexistent",}, + headers={"X-Amz-Target": "secretsmanager.UpdateSecretVersionStage"}, + ) + assert resp.status_code == 400 + + # # The following tests should work, but fail on the embedded dict in # RotationRules. The error message suggests a problem deeper in the code, which