From e94a3e39dfb68c4e8a96cd6e88e4fbc502799439 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?c=C4=83t=C4=83lin?= Date: Fri, 23 Jul 2021 11:49:52 +0200 Subject: [PATCH] Add validity dates to IoT fakecerts --- moto/iot/models.py | 163 ++++++++++++++++++++++-------------------- moto/iot/responses.py | 2 +- 2 files changed, 85 insertions(+), 80 deletions(-) diff --git a/moto/iot/models.py b/moto/iot/models.py index 17ea65dc..ba311094 100644 --- a/moto/iot/models.py +++ b/moto/iot/models.py @@ -12,6 +12,7 @@ from datetime import datetime from boto3 import Session from moto.core import BaseBackend, BaseModel +from moto.utilities.utils import random_string from .exceptions import ( CertificateStateException, DeleteConflictException, @@ -21,7 +22,6 @@ from .exceptions import ( VersionConflictException, ResourceAlreadyExistsException, ) -from moto.utilities.utils import random_string class FakeThing(BaseModel): @@ -73,12 +73,12 @@ class FakeThingType(BaseModel): class FakeThingGroup(BaseModel): def __init__( - self, - thing_group_name, - parent_group_name, - thing_group_properties, - region_name, - thing_groups, + self, + thing_group_name, + parent_group_name, + thing_group_properties, + region_name, + thing_groups, ): self.region_name = region_name self.thing_group_name = thing_group_name @@ -144,7 +144,8 @@ class FakeCertificate(BaseModel): self.transfer_data = {} self.creation_date = time.time() self.last_modified_date = self.creation_date - + self.validity_not_before = (time.time() - 86400) + self.validity_not_after = (time.time() + 86400) self.ca_certificate_id = None self.ca_certificate_pem = ca_certificate_pem if ca_certificate_pem: @@ -174,6 +175,10 @@ class FakeCertificate(BaseModel): "ownedBy": self.owner, "creationDate": self.creation_date, "lastModifiedDate": self.last_modified_date, + "validity": { + "notBefore": self.validity_not_before, + "notAfter": self.validity_not_after + }, "transferData": self.transfer_data, } @@ -250,17 +255,17 @@ class FakeJob(BaseModel): JOB_ID_REGEX = re.compile(JOB_ID_REGEX_PATTERN) def __init__( - self, - job_id, - targets, - document_source, - document, - description, - presigned_url_config, - target_selection, - job_executions_rollout_config, - document_parameters, - region_name, + self, + job_id, + targets, + document_source, + document, + description, + presigned_url_config, + target_selection, + job_executions_rollout_config, + document_parameters, + region_name, ): if not self._job_id_matcher(self.JOB_ID_REGEX, job_id): raise InvalidRequestException() @@ -326,12 +331,12 @@ class FakeJob(BaseModel): class FakeJobExecution(BaseModel): def __init__( - self, - job_id, - thing_arn, - status="QUEUED", - force_canceled=False, - status_details_map={}, + self, + job_id, + thing_arn, + status="QUEUED", + force_canceled=False, + status_details_map={}, ): self.job_id = job_id self.status = status # IN_PROGRESS | CANCELED | COMPLETED @@ -429,17 +434,17 @@ class FakeEndpoint(BaseModel): class FakeRule(BaseModel): def __init__( - self, - rule_name, - description, - created_at, - rule_disabled, - topic_pattern, - actions, - error_action, - sql, - aws_iot_sql_version, - region_name, + self, + rule_name, + description, + created_at, + rule_disabled, + topic_pattern, + actions, + error_action, + sql, + aws_iot_sql_version, + region_name, ): self.region_name = region_name self.rule_name = rule_name @@ -539,16 +544,16 @@ class IoTBackend(BaseBackend): return self.thing_types.values() def list_things( - self, attribute_name, attribute_value, thing_type_name, max_results, token + self, attribute_name, attribute_value, thing_type_name, max_results, token ): all_things = [_.to_dict() for _ in self.things.values()] if attribute_name is not None and thing_type_name is not None: filtered_things = list( filter( lambda elem: attribute_name in elem["attributes"] - and elem["attributes"][attribute_name] == attribute_value - and "thingTypeName" in elem - and elem["thingTypeName"] == thing_type_name, + and elem["attributes"][attribute_name] == attribute_value + and "thingTypeName" in elem + and elem["thingTypeName"] == thing_type_name, all_things, ) ) @@ -556,7 +561,7 @@ class IoTBackend(BaseBackend): filtered_things = list( filter( lambda elem: attribute_name in elem["attributes"] - and elem["attributes"][attribute_name] == attribute_value, + and elem["attributes"][attribute_name] == attribute_value, all_things, ) ) @@ -564,7 +569,7 @@ class IoTBackend(BaseBackend): filtered_things = list( filter( lambda elem: "thingTypeName" in elem - and elem["thingTypeName"] == thing_type_name, + and elem["thingTypeName"] == thing_type_name, all_things, ) ) @@ -578,7 +583,7 @@ class IoTBackend(BaseBackend): ) else: token = int(token) - things = filtered_things[token : token + max_results] + things = filtered_things[token: token + max_results] next_token = ( str(token + max_results) if len(filtered_things) > token + max_results @@ -624,12 +629,12 @@ class IoTBackend(BaseBackend): del self.thing_types[thing_type.arn] def update_thing( - self, - thing_name, - thing_type_name, - attribute_payload, - expected_version, - remove_thing_type, + self, + thing_name, + thing_type_name, + attribute_payload, + expected_version, + remove_thing_type, ): # if attributes payload = {}, nothing thing = self.describe_thing(thing_name) @@ -730,7 +735,7 @@ class IoTBackend(BaseBackend): ) def register_certificate( - self, certificate_pem, ca_certificate_pem, set_as_active, status + self, certificate_pem, ca_certificate_pem, set_as_active, status ): certificate = FakeCertificate( certificate_pem, @@ -954,7 +959,7 @@ class IoTBackend(BaseBackend): return thing_groups[0] def create_thing_group( - self, thing_group_name, parent_group_name, thing_group_properties + self, thing_group_name, parent_group_name, thing_group_properties ): thing_group = FakeThingGroup( thing_group_name, @@ -1012,7 +1017,7 @@ class IoTBackend(BaseBackend): ] def update_thing_group( - self, thing_group_name, thing_group_properties, expected_version + self, thing_group_name, thing_group_properties, expected_version ): thing_group = self.describe_thing_group(thing_group_name) if expected_version and expected_version != thing_group.version: @@ -1071,7 +1076,7 @@ class IoTBackend(BaseBackend): return thing def add_thing_to_thing_group( - self, thing_group_name, thing_group_arn, thing_name, thing_arn + self, thing_group_name, thing_group_arn, thing_name, thing_arn ): thing_group = self._identify_thing_group(thing_group_name, thing_group_arn) thing = self._identify_thing(thing_name, thing_arn) @@ -1081,7 +1086,7 @@ class IoTBackend(BaseBackend): thing_group.things[thing.arn] = thing def remove_thing_from_thing_group( - self, thing_group_name, thing_group_arn, thing_name, thing_arn + self, thing_group_name, thing_group_arn, thing_name, thing_arn ): thing_group = self._identify_thing_group(thing_group_name, thing_group_arn) thing = self._identify_thing(thing_name, thing_arn) @@ -1109,7 +1114,7 @@ class IoTBackend(BaseBackend): return ret def update_thing_groups_for_thing( - self, thing_name, thing_groups_to_add, thing_groups_to_remove + self, thing_name, thing_groups_to_add, thing_groups_to_remove ): thing = self.describe_thing(thing_name) for thing_group_name in thing_groups_to_add: @@ -1124,16 +1129,16 @@ class IoTBackend(BaseBackend): ) def create_job( - self, - job_id, - targets, - document_source, - document, - description, - presigned_url_config, - target_selection, - job_executions_rollout_config, - document_parameters, + self, + job_id, + targets, + document_source, + document, + description, + presigned_url_config, + target_selection, + job_executions_rollout_config, + document_parameters, ): job = FakeJob( job_id, @@ -1192,13 +1197,13 @@ class IoTBackend(BaseBackend): return self.jobs[job_id] def list_jobs( - self, - status, - target_selection, - max_results, - token, - thing_group_name, - thing_group_id, + self, + status, + target_selection, + max_results, + token, + thing_group_name, + thing_group_id, ): # TODO: implement filters all_jobs = [_.to_dict() for _ in self.jobs.values()] @@ -1209,7 +1214,7 @@ class IoTBackend(BaseBackend): next_token = str(max_results) if len(filtered_jobs) > max_results else None else: token = int(token) - jobs = filtered_jobs[token : token + max_results] + jobs = filtered_jobs[token: token + max_results] next_token = ( str(token + max_results) if len(filtered_jobs) > token + max_results @@ -1225,15 +1230,15 @@ class IoTBackend(BaseBackend): raise ResourceNotFoundException() if job_execution is None or ( - execution_number is not None - and job_execution.execution_number != execution_number + execution_number is not None + and job_execution.execution_number != execution_number ): raise ResourceNotFoundException() return job_execution def cancel_job_execution( - self, job_id, thing_name, force, expected_version, status_details + self, job_id, thing_name, force, expected_version, status_details ): job_execution = self.job_executions[(job_id, thing_name)] @@ -1288,7 +1293,7 @@ class IoTBackend(BaseBackend): next_token = str(max_results) if len(job_executions) > max_results else None else: token = int(token) - job_executions = job_executions[token : token + max_results] + job_executions = job_executions[token: token + max_results] next_token = ( str(token + max_results) if len(job_executions) > token + max_results @@ -1298,7 +1303,7 @@ class IoTBackend(BaseBackend): return job_executions, next_token def list_job_executions_for_thing( - self, thing_name, status, max_results, next_token + self, thing_name, status, max_results, next_token ): job_executions = [ self.job_executions[je].to_dict() @@ -1320,7 +1325,7 @@ class IoTBackend(BaseBackend): next_token = str(max_results) if len(job_executions) > max_results else None else: token = int(token) - job_executions = job_executions[token : token + max_results] + job_executions = job_executions[token: token + max_results] next_token = ( str(token + max_results) if len(job_executions) > token + max_results diff --git a/moto/iot/responses.py b/moto/iot/responses.py index c0de6007..3a2804d9 100644 --- a/moto/iot/responses.py +++ b/moto/iot/responses.py @@ -49,7 +49,7 @@ class IoTResponse(BaseResponse): next_token = str(max_results) if len(thing_types) > max_results else None else: token = int(previous_next_token) - result = thing_types[token : token + max_results] + result = thing_types[token: token + max_results] next_token = ( str(token + max_results) if len(thing_types) > token + max_results