Add validity dates to IoT fakecerts

This commit is contained in:
cătălin 2021-07-23 11:49:52 +02:00
commit e94a3e39df
2 changed files with 85 additions and 80 deletions

View file

@ -12,6 +12,7 @@ from datetime import datetime
from boto3 import Session
from moto.core import BaseBackend, BaseModel
from moto.utilities.utils import random_string
from .exceptions import (
CertificateStateException,
DeleteConflictException,
@ -21,7 +22,6 @@ from .exceptions import (
VersionConflictException,
ResourceAlreadyExistsException,
)
from moto.utilities.utils import random_string
class FakeThing(BaseModel):
@ -73,12 +73,12 @@ class FakeThingType(BaseModel):
class FakeThingGroup(BaseModel):
def __init__(
self,
thing_group_name,
parent_group_name,
thing_group_properties,
region_name,
thing_groups,
self,
thing_group_name,
parent_group_name,
thing_group_properties,
region_name,
thing_groups,
):
self.region_name = region_name
self.thing_group_name = thing_group_name
@ -144,7 +144,8 @@ class FakeCertificate(BaseModel):
self.transfer_data = {}
self.creation_date = time.time()
self.last_modified_date = self.creation_date
self.validity_not_before = (time.time() - 86400)
self.validity_not_after = (time.time() + 86400)
self.ca_certificate_id = None
self.ca_certificate_pem = ca_certificate_pem
if ca_certificate_pem:
@ -174,6 +175,10 @@ class FakeCertificate(BaseModel):
"ownedBy": self.owner,
"creationDate": self.creation_date,
"lastModifiedDate": self.last_modified_date,
"validity": {
"notBefore": self.validity_not_before,
"notAfter": self.validity_not_after
},
"transferData": self.transfer_data,
}
@ -250,17 +255,17 @@ class FakeJob(BaseModel):
JOB_ID_REGEX = re.compile(JOB_ID_REGEX_PATTERN)
def __init__(
self,
job_id,
targets,
document_source,
document,
description,
presigned_url_config,
target_selection,
job_executions_rollout_config,
document_parameters,
region_name,
self,
job_id,
targets,
document_source,
document,
description,
presigned_url_config,
target_selection,
job_executions_rollout_config,
document_parameters,
region_name,
):
if not self._job_id_matcher(self.JOB_ID_REGEX, job_id):
raise InvalidRequestException()
@ -326,12 +331,12 @@ class FakeJob(BaseModel):
class FakeJobExecution(BaseModel):
def __init__(
self,
job_id,
thing_arn,
status="QUEUED",
force_canceled=False,
status_details_map={},
self,
job_id,
thing_arn,
status="QUEUED",
force_canceled=False,
status_details_map={},
):
self.job_id = job_id
self.status = status # IN_PROGRESS | CANCELED | COMPLETED
@ -429,17 +434,17 @@ class FakeEndpoint(BaseModel):
class FakeRule(BaseModel):
def __init__(
self,
rule_name,
description,
created_at,
rule_disabled,
topic_pattern,
actions,
error_action,
sql,
aws_iot_sql_version,
region_name,
self,
rule_name,
description,
created_at,
rule_disabled,
topic_pattern,
actions,
error_action,
sql,
aws_iot_sql_version,
region_name,
):
self.region_name = region_name
self.rule_name = rule_name
@ -539,16 +544,16 @@ class IoTBackend(BaseBackend):
return self.thing_types.values()
def list_things(
self, attribute_name, attribute_value, thing_type_name, max_results, token
self, attribute_name, attribute_value, thing_type_name, max_results, token
):
all_things = [_.to_dict() for _ in self.things.values()]
if attribute_name is not None and thing_type_name is not None:
filtered_things = list(
filter(
lambda elem: attribute_name in elem["attributes"]
and elem["attributes"][attribute_name] == attribute_value
and "thingTypeName" in elem
and elem["thingTypeName"] == thing_type_name,
and elem["attributes"][attribute_name] == attribute_value
and "thingTypeName" in elem
and elem["thingTypeName"] == thing_type_name,
all_things,
)
)
@ -556,7 +561,7 @@ class IoTBackend(BaseBackend):
filtered_things = list(
filter(
lambda elem: attribute_name in elem["attributes"]
and elem["attributes"][attribute_name] == attribute_value,
and elem["attributes"][attribute_name] == attribute_value,
all_things,
)
)
@ -564,7 +569,7 @@ class IoTBackend(BaseBackend):
filtered_things = list(
filter(
lambda elem: "thingTypeName" in elem
and elem["thingTypeName"] == thing_type_name,
and elem["thingTypeName"] == thing_type_name,
all_things,
)
)
@ -578,7 +583,7 @@ class IoTBackend(BaseBackend):
)
else:
token = int(token)
things = filtered_things[token : token + max_results]
things = filtered_things[token: token + max_results]
next_token = (
str(token + max_results)
if len(filtered_things) > token + max_results
@ -624,12 +629,12 @@ class IoTBackend(BaseBackend):
del self.thing_types[thing_type.arn]
def update_thing(
self,
thing_name,
thing_type_name,
attribute_payload,
expected_version,
remove_thing_type,
self,
thing_name,
thing_type_name,
attribute_payload,
expected_version,
remove_thing_type,
):
# if attributes payload = {}, nothing
thing = self.describe_thing(thing_name)
@ -730,7 +735,7 @@ class IoTBackend(BaseBackend):
)
def register_certificate(
self, certificate_pem, ca_certificate_pem, set_as_active, status
self, certificate_pem, ca_certificate_pem, set_as_active, status
):
certificate = FakeCertificate(
certificate_pem,
@ -954,7 +959,7 @@ class IoTBackend(BaseBackend):
return thing_groups[0]
def create_thing_group(
self, thing_group_name, parent_group_name, thing_group_properties
self, thing_group_name, parent_group_name, thing_group_properties
):
thing_group = FakeThingGroup(
thing_group_name,
@ -1012,7 +1017,7 @@ class IoTBackend(BaseBackend):
]
def update_thing_group(
self, thing_group_name, thing_group_properties, expected_version
self, thing_group_name, thing_group_properties, expected_version
):
thing_group = self.describe_thing_group(thing_group_name)
if expected_version and expected_version != thing_group.version:
@ -1071,7 +1076,7 @@ class IoTBackend(BaseBackend):
return thing
def add_thing_to_thing_group(
self, thing_group_name, thing_group_arn, thing_name, thing_arn
self, thing_group_name, thing_group_arn, thing_name, thing_arn
):
thing_group = self._identify_thing_group(thing_group_name, thing_group_arn)
thing = self._identify_thing(thing_name, thing_arn)
@ -1081,7 +1086,7 @@ class IoTBackend(BaseBackend):
thing_group.things[thing.arn] = thing
def remove_thing_from_thing_group(
self, thing_group_name, thing_group_arn, thing_name, thing_arn
self, thing_group_name, thing_group_arn, thing_name, thing_arn
):
thing_group = self._identify_thing_group(thing_group_name, thing_group_arn)
thing = self._identify_thing(thing_name, thing_arn)
@ -1109,7 +1114,7 @@ class IoTBackend(BaseBackend):
return ret
def update_thing_groups_for_thing(
self, thing_name, thing_groups_to_add, thing_groups_to_remove
self, thing_name, thing_groups_to_add, thing_groups_to_remove
):
thing = self.describe_thing(thing_name)
for thing_group_name in thing_groups_to_add:
@ -1124,16 +1129,16 @@ class IoTBackend(BaseBackend):
)
def create_job(
self,
job_id,
targets,
document_source,
document,
description,
presigned_url_config,
target_selection,
job_executions_rollout_config,
document_parameters,
self,
job_id,
targets,
document_source,
document,
description,
presigned_url_config,
target_selection,
job_executions_rollout_config,
document_parameters,
):
job = FakeJob(
job_id,
@ -1192,13 +1197,13 @@ class IoTBackend(BaseBackend):
return self.jobs[job_id]
def list_jobs(
self,
status,
target_selection,
max_results,
token,
thing_group_name,
thing_group_id,
self,
status,
target_selection,
max_results,
token,
thing_group_name,
thing_group_id,
):
# TODO: implement filters
all_jobs = [_.to_dict() for _ in self.jobs.values()]
@ -1209,7 +1214,7 @@ class IoTBackend(BaseBackend):
next_token = str(max_results) if len(filtered_jobs) > max_results else None
else:
token = int(token)
jobs = filtered_jobs[token : token + max_results]
jobs = filtered_jobs[token: token + max_results]
next_token = (
str(token + max_results)
if len(filtered_jobs) > token + max_results
@ -1225,15 +1230,15 @@ class IoTBackend(BaseBackend):
raise ResourceNotFoundException()
if job_execution is None or (
execution_number is not None
and job_execution.execution_number != execution_number
execution_number is not None
and job_execution.execution_number != execution_number
):
raise ResourceNotFoundException()
return job_execution
def cancel_job_execution(
self, job_id, thing_name, force, expected_version, status_details
self, job_id, thing_name, force, expected_version, status_details
):
job_execution = self.job_executions[(job_id, thing_name)]
@ -1288,7 +1293,7 @@ class IoTBackend(BaseBackend):
next_token = str(max_results) if len(job_executions) > max_results else None
else:
token = int(token)
job_executions = job_executions[token : token + max_results]
job_executions = job_executions[token: token + max_results]
next_token = (
str(token + max_results)
if len(job_executions) > token + max_results
@ -1298,7 +1303,7 @@ class IoTBackend(BaseBackend):
return job_executions, next_token
def list_job_executions_for_thing(
self, thing_name, status, max_results, next_token
self, thing_name, status, max_results, next_token
):
job_executions = [
self.job_executions[je].to_dict()
@ -1320,7 +1325,7 @@ class IoTBackend(BaseBackend):
next_token = str(max_results) if len(job_executions) > max_results else None
else:
token = int(token)
job_executions = job_executions[token : token + max_results]
job_executions = job_executions[token: token + max_results]
next_token = (
str(token + max_results)
if len(job_executions) > token + max_results

View file

@ -49,7 +49,7 @@ class IoTResponse(BaseResponse):
next_token = str(max_results) if len(thing_types) > max_results else None
else:
token = int(previous_next_token)
result = thing_types[token : token + max_results]
result = thing_types[token: token + max_results]
next_token = (
str(token + max_results)
if len(thing_types) > token + max_results