add proper KMS encrypt, decrypt, and generate_data_key functionality and tests

This commit is contained in:
mattsb42-aws 2019-08-27 13:42:36 -07:00
commit 98581b9196
4 changed files with 550 additions and 558 deletions

View file

@ -8,6 +8,7 @@ import six
from moto.core.responses import BaseResponse
from .models import kms_backends
from .exceptions import NotFoundException, ValidationException, AlreadyExistsException, NotAuthorizedException
from .utils import decrypt, encrypt
reserved_aliases = [
'alias/aws/ebs',
@ -21,7 +22,13 @@ class KmsResponse(BaseResponse):
@property
def parameters(self):
return json.loads(self.body)
params = json.loads(self.body)
for key in ("Plaintext", "CiphertextBlob"):
if key in params:
params[key] = base64.b64decode(params[key].encode("utf-8"))
return params
@property
def kms_backend(self):
@ -224,24 +231,34 @@ class KmsResponse(BaseResponse):
return json.dumps({'Truncated': False, 'PolicyNames': ['default']})
def encrypt(self):
"""
We perform no encryption, we just encode the value as base64 and then
decode it in decrypt().
"""
value = self.parameters.get("Plaintext")
if isinstance(value, six.text_type):
value = value.encode('utf-8')
return json.dumps({"CiphertextBlob": base64.b64encode(value).decode("utf-8"), 'KeyId': 'key_id'})
key_id = self.parameters.get("KeyId")
encryption_context = self.parameters.get('EncryptionContext', {})
plaintext = self.parameters.get("Plaintext")
if isinstance(plaintext, six.text_type):
plaintext = plaintext.encode('utf-8')
ciphertext_blob, arn = self.kms_backend.encrypt(
key_id=key_id,
plaintext=plaintext,
encryption_context=encryption_context,
)
ciphertext_blob_response = base64.b64encode(ciphertext_blob).decode("utf-8")
return json.dumps({"CiphertextBlob": ciphertext_blob_response, "KeyId": arn})
def decrypt(self):
# TODO refuse decode if EncryptionContext is not the same as when it was encrypted / generated
ciphertext_blob = self.parameters.get("CiphertextBlob")
encryption_context = self.parameters.get('EncryptionContext', {})
value = self.parameters.get("CiphertextBlob")
try:
return json.dumps({"Plaintext": base64.b64decode(value).decode("utf-8"), 'KeyId': 'key_id'})
except UnicodeDecodeError:
# Generate data key will produce random bytes which when decrypted is still returned as base64
return json.dumps({"Plaintext": value})
plaintext, arn = self.kms_backend.decrypt(
ciphertext_blob=ciphertext_blob,
encryption_context=encryption_context,
)
plaintext_response = base64.b64encode(plaintext).decode("utf-8")
return json.dumps({"Plaintext": plaintext_response, 'KeyId': arn})
def disable_key(self):
key_id = self.parameters.get('KeyId')
@ -291,7 +308,7 @@ class KmsResponse(BaseResponse):
def generate_data_key(self):
key_id = self.parameters.get('KeyId')
encryption_context = self.parameters.get('EncryptionContext')
encryption_context = self.parameters.get('EncryptionContext', {})
number_of_bytes = self.parameters.get('NumberOfBytes')
key_spec = self.parameters.get('KeySpec')
grant_tokens = self.parameters.get('GrantTokens')
@ -306,27 +323,39 @@ class KmsResponse(BaseResponse):
raise NotFoundException('Invalid keyId')
if number_of_bytes and (number_of_bytes > 1024 or number_of_bytes < 0):
raise ValidationException("1 validation error detected: Value '2048' at 'numberOfBytes' failed "
"to satisfy constraint: Member must have value less than or "
"equal to 1024")
raise ValidationException((
"1 validation error detected: Value '{number_of_bytes:d}' at 'numberOfBytes' failed "
"to satisfy constraint: Member must have value less than or "
"equal to 1024"
).format(number_of_bytes=number_of_bytes)
)
if key_spec and key_spec not in ('AES_256', 'AES_128'):
raise ValidationException("1 validation error detected: Value 'AES_257' at 'keySpec' failed "
"to satisfy constraint: Member must satisfy enum value set: "
"[AES_256, AES_128]")
raise ValidationException((
"1 validation error detected: Value '{key_spec}' at 'keySpec' failed "
"to satisfy constraint: Member must satisfy enum value set: "
"[AES_256, AES_128]"
).format(key_spec=key_spec)
)
if not key_spec and not number_of_bytes:
raise ValidationException("Please specify either number of bytes or key spec.")
if key_spec and number_of_bytes:
raise ValidationException("Please specify either number of bytes or key spec.")
plaintext, key_arn = self.kms_backend.generate_data_key(key_id, encryption_context,
number_of_bytes, key_spec, grant_tokens)
plaintext, ciphertext_blob, key_arn = self.kms_backend.generate_data_key(
key_id=key_id,
encryption_context=encryption_context,
number_of_bytes=number_of_bytes,
key_spec=key_spec,
grant_tokens=grant_tokens
)
plaintext = base64.b64encode(plaintext).decode()
plaintext_response = base64.b64encode(plaintext).decode("utf-8")
ciphertext_blob_response = base64.b64encode(ciphertext_blob).decode("utf-8")
return json.dumps({
'CiphertextBlob': plaintext,
'Plaintext': plaintext,
'CiphertextBlob': ciphertext_blob_response,
'Plaintext': plaintext_response,
'KeyId': key_arn # not alias
})