Nearly finished implementation and tests

This commit is contained in:
Terry Cain 2017-09-22 11:21:36 +01:00
commit edbbbf6d20
3 changed files with 287 additions and 10 deletions

View file

@ -43,6 +43,11 @@ MqO5tzHpCvX2HzLc
# so for now a cheap response is just give any old root CA
def datetime_to_epoch(date):
# As only Py3 has datetime.timestamp()
return int((date - datetime.datetime(1970, 1, 1)).total_seconds())
class AWSError(Exception):
TYPE = None
STATUS = 400
@ -64,7 +69,8 @@ class AWSResourceNotFoundException(AWSError):
class CertBundle(BaseModel):
def __init__(self, certificate, private_key, chain=None, region='us-east-1', arn=None):
def __init__(self, certificate, private_key, chain=None, region='us-east-1', arn=None, cert_type='IMPORTED'):
self.created_at = datetime.datetime.now()
self.cert = certificate
self._cert = None
self.common_name = None
@ -73,6 +79,7 @@ class CertBundle(BaseModel):
self.chain = chain
self.tags = {}
self._chain = None
self.type = cert_type # Should really be an enum
# AWS always returns your chain + root CA
if self.chain is None:
@ -132,10 +139,13 @@ class CertBundle(BaseModel):
self._chain = []
for cert_armored in self.chain.split(b'-\n-'):
# Would leave encoded but Py2 does not have raw binary strings
cert_armored = cert_armored.decode()
# Fix missing -'s on split
cert_armored = re.sub(rb'^----B', b'-----B', cert_armored)
cert_armored = re.sub(rb'E----$', b'E-----', cert_armored)
cert = cryptography.x509.load_pem_x509_certificate(cert_armored, default_backend())
cert_armored = re.sub(r'^----B', '-----B', cert_armored)
cert_armored = re.sub(r'E----$', 'E-----', cert_armored)
cert = cryptography.x509.load_pem_x509_certificate(cert_armored.encode(), default_backend())
self._chain.append(cert)
now = datetime.datetime.now()
@ -150,6 +160,42 @@ class CertBundle(BaseModel):
raise
raise AWSValidationException('The certificate is not PEM-encoded or is not valid.')
def describe(self):
#'RenewalSummary': {}, # Only when cert is amazon issued
if self._key.key_size == 1024:
key_algo = 'RSA_1024'
elif self._key.key_size == 2048:
key_algo = 'RSA_2048'
else:
key_algo = 'EC_prime256v1'
result = {
'Certificate': {
'CertificateArn': self.arn,
'DomainName': self.common_name,
'InUseBy': [],
'Issuer': self._cert.issuer.get_attributes_for_oid(cryptography.x509.OID_COMMON_NAME)[0].value,
'KeyAlgorithm': key_algo,
'NotAfter': datetime_to_epoch(self._cert.not_valid_after),
'NotBefore': datetime_to_epoch(self._cert.not_valid_before),
'Serial': self._cert.serial,
'SignatureAlgorithm': self._cert.signature_algorithm_oid._name.upper().replace('ENCRYPTION', ''),
'Status': 'ISSUED', # One of PENDING_VALIDATION, ISSUED, INACTIVE, EXPIRED, VALIDATION_TIMED_OUT, REVOKED, FAILED.
'Subject': 'CN={0}'.format(self.common_name),
'SubjectAlternativeNames': [],
'Type': self.type # One of IMPORTED, AMAZON_ISSUED
}
}
if self.type == 'IMPORTED':
result['Certificate']['ImportedAt'] = datetime_to_epoch(self.created_at)
else:
result['Certificate']['CreatedAt'] = datetime_to_epoch(self.created_at)
result['Certificate']['IssuedAt'] = datetime_to_epoch(self.created_at)
return result
def __str__(self):
return self.arn

View file

@ -30,7 +30,7 @@ class AWSCertificateManagerResponse(BaseResponse):
def add_tags_to_certificate(self):
arn = self._get_param('CertificateArn')
tags = self._get_list_prefix('Tags')
tags = self._get_param('Tags')
if arn is None:
msg = 'A required parameter for the specified action is not supplied.'
@ -58,7 +58,18 @@ class AWSCertificateManagerResponse(BaseResponse):
return ''
def describe_certificate(self):
raise NotImplementedError()
arn = self._get_param('CertificateArn')
if arn is None:
msg = 'A required parameter for the specified action is not supplied.'
return {'__type': 'MissingParameter', 'message': msg}, dict(status=400)
try:
cert_bundle = self.acm_backend.get_certificate(arn)
except AWSError as err:
return err.response()
return json.dumps(cert_bundle.describe())
def get_certificate(self):
arn = self._get_param('CertificateArn')
@ -79,7 +90,18 @@ class AWSCertificateManagerResponse(BaseResponse):
return json.dumps(result)
def import_certificate(self):
# TODO comment on what raises exceptions for all branches
"""
Returns errors on:
Certificate, PrivateKey or Chain not being properly formatted
Arn not existing if its provided
PrivateKey size > 2048
Certificate expired or is not yet in effect
Does not return errors on:
Checking Certificate is legit, or a selfsigned chain is provided
:return: str(JSON) for response
"""
certificate = self._get_param('Certificate')
private_key = self._get_param('PrivateKey')
chain = self._get_param('CertificateChain') # Optional
@ -134,7 +156,7 @@ class AWSCertificateManagerResponse(BaseResponse):
result = {'Tags': []}
# Tag "objects" can not contain the Value part
for key, value in cert_bundle.tags:
for key, value in cert_bundle.tags.items():
tag_dict = {'Key': key}
if value is not None:
tag_dict['Value'] = value
@ -144,7 +166,7 @@ class AWSCertificateManagerResponse(BaseResponse):
def remove_tags_from_certificate(self):
arn = self._get_param('CertificateArn')
tags = self._get_list_prefix('Tags')
tags = self._get_param('Tags')
if arn is None:
msg = 'A required parameter for the specified action is not supplied.'