import hashlib import json import re import time from collections import OrderedDict from dataclasses import asdict, dataclass, field, replace from datetime import datetime, timedelta from json import JSONDecodeError from typing import ( TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Pattern, Tuple, Union, ) from cryptography import x509 from cryptography.hazmat._oid import NameOID from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import rsa from moto.core.base_backend import BackendDict, BaseBackend from moto.core.common_models import BaseModel, CloudFormationModel from moto.core.utils import utcnow from moto.moto_api._internal import mock_random as random from moto.settings import iot_use_valid_cert from moto.utilities.paginator import paginate from moto.utilities.utils import get_partition from .exceptions import ( CertificateStateException, ConflictException, DeleteConflictException, InvalidRequestException, InvalidStateTransitionException, ResourceAlreadyExistsException, ResourceNotFoundException, ThingStillAttached, VersionConflictException, VersionsLimitExceededException, ) from .utils import PAGINATION_MODEL, decapitalize_dict if TYPE_CHECKING: from moto.iotdata.models import FakeShadow class FakeThing(CloudFormationModel): def __init__( self, thing_name: str, thing_type: Optional["FakeThingType"], attributes: Dict[str, Any], account_id: str, region_name: str, ): self.region_name = region_name self.account_id = account_id self.thing_id = str(random.uuid4()) self.thing_name = thing_name self.thing_type = thing_type self.attributes = attributes self.arn = f"arn:{get_partition(region_name)}:iot:{region_name}:{account_id}:thing/{thing_name}" self.version = 1 # TODO: we need to handle "version"? # for iot-data self.thing_shadows: Dict[Optional[str], FakeShadow] = {} def matches(self, query_string: str) -> bool: if query_string == "*": return True if query_string.startswith("thingTypeName:"): if not self.thing_type: return False qs = query_string[14:].replace("*", ".*").replace("?", ".") return re.search(f"^{qs}$", self.thing_type.thing_type_name) is not None if query_string.startswith("thingName:"): qs = query_string[10:].replace("*", ".*").replace("?", ".") return re.search(f"^{qs}$", self.thing_name) is not None if query_string.startswith("attributes."): k, v = query_string[11:].split(":") return self.attributes.get(k) == v return query_string in self.thing_name def to_dict( self, include_default_client_id: bool = False, include_connectivity: bool = False, include_thing_id: bool = False, include_thing_group_names: bool = False, include_shadows_as_json: bool = False, ) -> Dict[str, Any]: obj = { "thingName": self.thing_name, "thingArn": self.arn, "attributes": self.attributes, "version": self.version, } if self.thing_type: obj["thingTypeName"] = self.thing_type.thing_type_name if include_default_client_id: obj["defaultClientId"] = self.thing_name if include_connectivity: obj["connectivity"] = { "connected": True, "timestamp": time.mktime(utcnow().timetuple()), } if include_thing_id: obj["thingId"] = self.thing_id if include_thing_group_names: iot_backend = iot_backends[self.account_id][self.region_name] obj["thingGroupNames"] = [ thing_group.thing_group_name for thing_group in iot_backend.list_thing_groups(None, None, None) if self.arn in thing_group.things ] if include_shadows_as_json: named_shadows = { shadow_name: shadow.to_dict() for shadow_name, shadow in self.thing_shadows.items() if shadow_name is not None } converted_response_format = { shadow_name: { **shadow_data["state"], "metadata": shadow_data["metadata"], "version": shadow_data["version"], "hasDelta": "delta" in shadow_data["state"], } for shadow_name, shadow_data in named_shadows.items() } obj["shadow"] = json.dumps({"name": converted_response_format}) return obj @staticmethod def cloudformation_name_type() -> str: return "Thing" @staticmethod def cloudformation_type() -> str: return "AWS::IoT::Thing" @classmethod def has_cfn_attr(cls, attr: str) -> bool: return attr in [ "Arn", "Id", ] def get_cfn_attribute(self, attribute_name: str) -> Any: from moto.cloudformation.exceptions import UnformattedGetAttTemplateException if attribute_name == "Arn": return self.arn elif attribute_name == "Id": return self.thing_id raise UnformattedGetAttTemplateException() @classmethod def create_from_cloudformation_json( # type: ignore[misc] cls, resource_name: str, cloudformation_json: Any, account_id: str, region_name: str, **kwargs: Any, ) -> "FakeThing": iot_backend = iot_backends[account_id][region_name] properties = cloudformation_json["Properties"] thing_name = properties.get("ThingName", resource_name) attribute_payload = properties.get("AttributePayload", "") try: attributes = json.loads(attribute_payload) except JSONDecodeError: attributes = None return iot_backend.create_thing( thing_name=thing_name, thing_type_name="", attribute_payload=attributes ) @classmethod def update_from_cloudformation_json( # type: ignore[misc] cls, original_resource: "FakeThing", new_resource_name: str, cloudformation_json: Any, account_id: str, region_name: str, ) -> "FakeThing": iot_backend = iot_backends[account_id][region_name] properties = cloudformation_json["Properties"] thing_name = properties.get("ThingName", new_resource_name) attribute_payload = properties.get("AttributePayload", "") try: attributes = json.loads(attribute_payload) except JSONDecodeError: attributes = None if thing_name != original_resource.thing_name: iot_backend.delete_thing(original_resource.thing_name) return cls.create_from_cloudformation_json( new_resource_name, cloudformation_json, account_id, region_name ) else: iot_backend.update_thing( thing_name=thing_name, thing_type_name="", attribute_payload=attributes, remove_thing_type=False, ) return original_resource @classmethod def delete_from_cloudformation_json( # type: ignore[misc] cls, resource_name: str, cloudformation_json: Any, account_id: str, region_name: str, ) -> None: iot_backend = iot_backends[account_id][region_name] properties = cloudformation_json["Properties"] thing_name = properties.get("ThingName", resource_name) iot_backend.delete_thing(thing_name=thing_name) class FakeThingType(CloudFormationModel): def __init__( self, thing_type_name: str, thing_type_properties: Optional[Dict[str, Any]], account_id: str, region_name: str, ): self.account_id = account_id self.region_name = region_name self.thing_type_name = thing_type_name self.thing_type_properties = thing_type_properties self.thing_type_id = str(random.uuid4()) # I don't know the rule of id t = time.time() self.metadata = {"deprecated": False, "creationDate": int(t * 1000) / 1000.0} self.arn = f"arn:{get_partition(self.region_name)}:iot:{self.region_name}:{self.account_id}:thingtype/{thing_type_name}" def to_dict(self) -> Dict[str, Any]: return { "thingTypeName": self.thing_type_name, "thingTypeId": self.thing_type_id, "thingTypeProperties": self.thing_type_properties, "thingTypeMetadata": self.metadata, "thingTypeArn": self.arn, } @staticmethod def cloudformation_name_type() -> str: return "ThingType" @staticmethod def cloudformation_type() -> str: return "AWS::IoT::ThingType" @classmethod def has_cfn_attr(cls, attr: str) -> bool: return attr in [ "Arn", "Id", ] def get_cfn_attribute(self, attribute_name: str) -> Any: from moto.cloudformation.exceptions import UnformattedGetAttTemplateException if attribute_name == "Arn": return self.arn elif attribute_name == "Id": return self.thing_type_id raise UnformattedGetAttTemplateException() @classmethod def create_from_cloudformation_json( # type: ignore[misc] cls, resource_name: str, cloudformation_json: Any, account_id: str, region_name: str, **kwargs: Any, ) -> "FakeThingType": iot_backend = iot_backends[account_id][region_name] properties = cloudformation_json["Properties"] thing_type_name = properties.get("ThingTypeName", resource_name) thing_type_properties = properties.get("ThingTypeProperties", {}) type_name, type_arn = iot_backend.create_thing_type( thing_type_name=thing_type_name, thing_type_properties=thing_type_properties ) return iot_backend.thing_types[type_arn] @classmethod def update_from_cloudformation_json( # type: ignore[misc] cls, original_resource: "FakeThingType", new_resource_name: str, cloudformation_json: Any, account_id: str, region_name: str, ) -> "FakeThingType": iot_backend = iot_backends[account_id][region_name] iot_backend.delete_thing_type(thing_type_name=original_resource.thing_type_name) return cls.create_from_cloudformation_json( new_resource_name, cloudformation_json, account_id, region_name ) @classmethod def delete_from_cloudformation_json( # type: ignore[misc] cls, resource_name: str, cloudformation_json: Any, account_id: str, region_name: str, ) -> None: properties = cloudformation_json["Properties"] thing_type_name = properties.get("ThingTypeName", resource_name) iot_backend = iot_backends[account_id][region_name] iot_backend.delete_thing_type(thing_type_name=thing_type_name) class FakeThingGroup(BaseModel): def __init__( self, thing_group_name: str, parent_group_name: str, thing_group_properties: Dict[str, str], account_id: str, region_name: str, thing_groups: Dict[str, "FakeThingGroup"], ): self.account_id = account_id self.region_name = region_name self.thing_group_name = thing_group_name self.thing_group_id = str(random.uuid4()) # I don't know the rule of id self.version = 1 # TODO: tmp self.parent_group_name = parent_group_name self.thing_group_properties = thing_group_properties or {} t = time.time() self.metadata: Dict[str, Any] = {"creationDate": int(t * 1000) / 1000.0} if parent_group_name: self.metadata["parentGroupName"] = parent_group_name # initilize rootToParentThingGroups if "rootToParentThingGroups" not in self.metadata: self.metadata["rootToParentThingGroups"] = [] # search for parent arn for thing_group in thing_groups.values(): if thing_group.thing_group_name == parent_group_name: parent_thing_group_structure = thing_group break # if parent arn found (should always be found) if parent_thing_group_structure: # copy parent's rootToParentThingGroups if "rootToParentThingGroups" in parent_thing_group_structure.metadata: self.metadata["rootToParentThingGroups"].extend( parent_thing_group_structure.metadata["rootToParentThingGroups"] ) self.metadata["rootToParentThingGroups"].extend( [ { "groupName": parent_group_name, "groupArn": parent_thing_group_structure.arn, # type: ignore } ] ) self.arn = f"arn:{get_partition(self.region_name)}:iot:{self.region_name}:{self.account_id}:thinggroup/{thing_group_name}" self.things: Dict[str, FakeThing] = OrderedDict() def to_dict(self) -> Dict[str, Any]: return { "thingGroupName": self.thing_group_name, "thingGroupId": self.thing_group_id, "version": self.version, "thingGroupProperties": self.thing_group_properties, "thingGroupMetadata": self.metadata, "thingGroupArn": self.arn, } class FakeCertificate(BaseModel): def __init__( self, certificate_pem: str, status: str, account_id: str, region_name: str, ca_certificate_id: Optional[str] = None, ): self.certificate_id = self.compute_cert_id(certificate_pem) self.arn = f"arn:{get_partition(region_name)}:iot:{region_name}:{account_id}:cert/{self.certificate_id}" self.certificate_pem = certificate_pem self.status = status self.owner = account_id self.transfer_data: Dict[str, str] = {} self.creation_date = time.time() self.last_modified_date = self.creation_date self.validity_not_before = time.time() - 86400 self.validity_not_after = time.time() + 86400 self.ca_certificate_id = ca_certificate_id def compute_cert_id(self, certificate_pem: str) -> str: if iot_use_valid_cert(): return self.compute_der_cert_id(certificate_pem) else: return self.compute_pem_cert_id(certificate_pem) def compute_pem_cert_id(self, certificate_pem: str) -> str: """ Original (incorrect) PEM-based hash kept for backwards compatibility with existing tests which may or may not use valid certs. Also useful for simplifying tests that don't require a real cert. """ m = hashlib.sha256() m.update(certificate_pem.encode("utf-8")) return m.hexdigest() def compute_der_cert_id(self, certificate_pem: str) -> str: """ Compute the certificate hash based on DER encoding """ return hashlib.sha256( x509.load_pem_x509_certificate( certificate_pem.encode("utf-8") ).public_bytes(serialization.Encoding.DER) ).hexdigest() def to_dict(self) -> Dict[str, Any]: return { "certificateArn": self.arn, "certificateId": self.certificate_id, "caCertificateId": self.ca_certificate_id, "status": self.status, "creationDate": self.creation_date, } def to_description_dict(self) -> Dict[str, Any]: """ You might need keys below in some situation - caCertificateId - previousOwnedBy """ return { "certificateArn": self.arn, "certificateId": self.certificate_id, "status": self.status, "certificatePem": self.certificate_pem, "ownedBy": self.owner, "creationDate": self.creation_date, "lastModifiedDate": self.last_modified_date, "validity": { "notBefore": self.validity_not_before, "notAfter": self.validity_not_after, }, "transferData": self.transfer_data, } class FakeCaCertificate(FakeCertificate): def __init__( self, ca_certificate: str, status: str, account_id: str, region_name: str, registration_config: Dict[str, str], ): super().__init__( certificate_pem=ca_certificate, status=status, account_id=account_id, region_name=region_name, ca_certificate_id=None, ) self.registration_config = registration_config class FakePolicy(CloudFormationModel): def __init__( self, name: str, document: Dict[str, Any], account_id: str, region_name: str, default_version_id: str = "1", ): self.name = name self.document = document self.arn = f"arn:{get_partition(region_name)}:iot:{region_name}:{account_id}:policy/{name}" self.default_version_id = default_version_id self.versions = [ FakePolicyVersion(self.name, document, True, account_id, region_name) ] self._max_version_id = self.versions[0]._version_id def to_get_dict(self) -> Dict[str, Any]: return { "policyName": self.name, "policyArn": self.arn, "policyDocument": self.document, "defaultVersionId": self.default_version_id, } def to_dict_at_creation(self) -> Dict[str, Any]: return { "policyName": self.name, "policyArn": self.arn, "policyDocument": self.document, "policyVersionId": self.default_version_id, } def to_dict(self) -> Dict[str, str]: return {"policyName": self.name, "policyArn": self.arn} @staticmethod def cloudformation_name_type() -> str: return "Policy" @staticmethod def cloudformation_type() -> str: return "AWS::IoT::Policy" @classmethod def has_cfn_attr(cls, attr: str) -> bool: return attr in [ "Arn", "Id", ] def get_cfn_attribute(self, attribute_name: str) -> Any: from moto.cloudformation.exceptions import UnformattedGetAttTemplateException if attribute_name == "Arn": return self.arn elif attribute_name == "Id": return self.name + "_" + str(self._max_version_id) raise UnformattedGetAttTemplateException() @classmethod def create_from_cloudformation_json( # type: ignore[misc] cls, resource_name: str, cloudformation_json: Any, account_id: str, region_name: str, **kwargs: Any, ) -> "FakePolicy": iot_backend = iot_backends[account_id][region_name] properties = cloudformation_json["Properties"] policy_name = properties.get("PolicyName", resource_name) policy_document = properties.get("PolicyDocument", {}) return iot_backend.create_policy( policy_name=policy_name, policy_document=policy_document ) @classmethod def update_from_cloudformation_json( # type: ignore[misc] cls, original_resource: "FakePolicy", new_resource_name: str, cloudformation_json: Any, account_id: str, region_name: str, ) -> "FakePolicy": iot_backend = iot_backends[account_id][region_name] properties = cloudformation_json["Properties"] new_policy_name = properties.get("PolicyName", new_resource_name) policy_document = properties.get("PolicyDocument", {}) if original_resource.name != new_policy_name: iot_backend.delete_policy(policy_name=original_resource.name) return iot_backend.create_policy( policy_name=new_policy_name, policy_document=policy_document ) else: iot_backend.create_policy_version( policy_name=original_resource.name, policy_document=policy_document, set_as_default=True, ) return original_resource @classmethod def delete_from_cloudformation_json( # type: ignore[misc] cls, resource_name: str, cloudformation_json: Any, account_id: str, region_name: str, ) -> None: properties = cloudformation_json["Properties"] policy_name = properties.get("PolicyName", resource_name) iot_backend = iot_backends[account_id][region_name] iot_backend.delete_policy(policy_name=policy_name) class FakePolicyVersion: def __init__( self, policy_name: str, document: Dict[str, Any], is_default: bool, account_id: str, region_name: str, version_id: int = 1, ): self.name = policy_name self.arn = f"arn:{get_partition(region_name)}:iot:{region_name}:{account_id}:policy/{policy_name}" self.document = document or {} self.is_default = is_default self._version_id = version_id self.create_datetime = time.mktime(datetime(2015, 1, 1).timetuple()) self.last_modified_datetime = time.mktime(datetime(2015, 1, 2).timetuple()) @property def version_id(self) -> str: return str(self._version_id) def to_get_dict(self) -> Dict[str, Any]: return { "policyName": self.name, "policyArn": self.arn, "policyDocument": self.document, "policyVersionId": self.version_id, "isDefaultVersion": self.is_default, "creationDate": self.create_datetime, "lastModifiedDate": self.last_modified_datetime, "generationId": self.version_id, } def to_dict_at_creation(self) -> Dict[str, Any]: return { "policyArn": self.arn, "policyDocument": self.document, "policyVersionId": self.version_id, "isDefaultVersion": self.is_default, } def to_dict(self) -> Dict[str, Any]: return { "versionId": self.version_id, "isDefaultVersion": self.is_default, "createDate": self.create_datetime, } class FakeJob(BaseModel): JOB_ID_REGEX_PATTERN = "[a-zA-Z0-9_-]" JOB_ID_REGEX = re.compile(JOB_ID_REGEX_PATTERN) def __init__( self, job_id: str, targets: List[str], document_source: str, document: str, description: str, presigned_url_config: Dict[str, Any], target_selection: str, job_executions_rollout_config: Dict[str, Any], document_parameters: Dict[str, str], abort_config: Dict[str, List[Dict[str, Any]]], job_execution_retry_config: Dict[str, Any], scheduling_config: Dict[str, Any], timeout_config: Dict[str, Any], account_id: str, region_name: str, ): if not self._job_id_matcher(self.JOB_ID_REGEX, job_id): raise InvalidRequestException() self.account_id = account_id self.region_name = region_name self.job_id = job_id self.job_arn = f"arn:{get_partition(self.region_name)}:iot:{self.region_name}:{self.account_id}:job/{job_id}" self.targets = targets self.document_source = document_source self.document = document self.force = False self.description = description self.presigned_url_config = presigned_url_config self.target_selection = target_selection self.job_executions_rollout_config = job_executions_rollout_config self.abort_config = abort_config self.job_execution_retry_config = job_execution_retry_config self.scheduling_config = scheduling_config self.timeout_config = timeout_config self.status = "QUEUED" # IN_PROGRESS | CANCELED | COMPLETED self.comment: Optional[str] = None self.reason_code: Optional[str] = None self.created_at = time.mktime(datetime(2015, 1, 1).timetuple()) self.last_updated_at = time.mktime(datetime(2015, 1, 1).timetuple()) self.completed_at = None self.job_process_details = { "processingTargets": targets, "numberOfQueuedThings": 1, "numberOfCanceledThings": 0, "numberOfSucceededThings": 0, "numberOfFailedThings": 0, "numberOfRejectedThings": 0, "numberOfInProgressThings": 0, "numberOfRemovedThings": 0, } self.document_parameters = document_parameters def to_dict(self) -> Dict[str, Any]: obj = { "jobArn": self.job_arn, "jobId": self.job_id, "targets": self.targets, "description": self.description, "presignedUrlConfig": self.presigned_url_config, "targetSelection": self.target_selection, "timeoutConfig": self.timeout_config, "status": self.status, "comment": self.comment, "forceCanceled": self.force, "reasonCode": self.reason_code, "createdAt": self.created_at, "lastUpdatedAt": self.last_updated_at, "completedAt": self.completed_at, "jobProcessDetails": self.job_process_details, "documentParameters": self.document_parameters, "document": self.document, "documentSource": self.document_source, } return obj def _job_id_matcher(self, regex: Pattern[str], argument: str) -> bool: regex_match = regex.match(argument) is not None length_match = len(argument) <= 64 return regex_match and length_match class FakeJobExecution(BaseModel): def __init__( self, job_id: str, thing_arn: str, status: str = "QUEUED", force_canceled: bool = False, status_details_map: Optional[Dict[str, Any]] = None, ): self.job_id = job_id self.status = status # IN_PROGRESS | CANCELED | COMPLETED self.force_canceled = force_canceled self.status_details_map = status_details_map or {} self.thing_arn = thing_arn self.queued_at = time.mktime(datetime(2015, 1, 1).timetuple()) self.started_at = time.mktime(datetime(2015, 1, 1).timetuple()) self.last_updated_at = time.mktime(datetime(2015, 1, 1).timetuple()) self.execution_number = 123 self.version_number = 123 self.approximate_seconds_before_time_out = 123 def to_get_dict(self) -> Dict[str, Any]: return { "jobId": self.job_id, "status": self.status, "forceCanceled": self.force_canceled, "statusDetails": {"detailsMap": self.status_details_map}, "thingArn": self.thing_arn, "queuedAt": self.queued_at, "startedAt": self.started_at, "lastUpdatedAt": self.last_updated_at, "executionNumber": self.execution_number, "versionNumber": self.version_number, "approximateSecondsBeforeTimedOut": self.approximate_seconds_before_time_out, } def to_dict(self) -> Dict[str, Any]: return { "jobId": self.job_id, "thingArn": self.thing_arn, "jobExecutionSummary": { "status": self.status, "queuedAt": self.queued_at, "startedAt": self.started_at, "lastUpdatedAt": self.last_updated_at, "executionNumber": self.execution_number, }, } class FakeJobTemplate(CloudFormationModel): JOB_TEMPLATE_ID_REGEX_PATTERN = "[a-zA-Z0-9_-]+" JOB_TEMPLATE_ID_REGEX = re.compile(JOB_TEMPLATE_ID_REGEX_PATTERN) def __init__( self, job_template_id: str, document_source: str, document: str, description: str, presigned_url_config: Dict[str, Any], job_executions_rollout_config: Dict[str, Any], abort_config: Dict[str, List[Dict[str, Any]]], job_execution_retry_config: Dict[str, Any], timeout_config: Dict[str, Any], account_id: str, region_name: str, ): if not self._job_template_id_matcher(job_template_id): raise InvalidRequestException() self.account_id = account_id self.region_name = region_name self.job_template_id = job_template_id self.job_template_arn = f"arn:{get_partition(self.region_name)}:iot:{self.region_name}:{self.account_id}:jobtemplate/{job_template_id}" self.document_source = document_source self.document = document self.description = description self.presigned_url_config = presigned_url_config self.job_executions_rollout_config = job_executions_rollout_config self.abort_config = abort_config self.job_execution_retry_config = job_execution_retry_config self.timeout_config = timeout_config self.created_at = time.mktime(datetime(2015, 1, 1).timetuple()) def to_dict(self) -> Dict[str, Union[str, float]]: return { "jobTemplateArn": self.job_template_arn, "jobTemplateId": self.job_template_id, "description": self.description, "createdAt": self.created_at, } def _job_template_id_matcher(self, argument: str) -> bool: return ( self.JOB_TEMPLATE_ID_REGEX.fullmatch(argument) is not None and len(argument) <= 64 ) @staticmethod def cloudformation_name_type() -> str: return "JobTemplate" @staticmethod def cloudformation_type() -> str: return "AWS::IoT::JobTemplate" @classmethod def has_cfn_attr(cls, attr: str) -> bool: return attr in [ "Arn", ] def get_cfn_attribute(self, attribute_name: str) -> Any: from moto.cloudformation.exceptions import UnformattedGetAttTemplateException if attribute_name == "Arn": return self.job_template_arn raise UnformattedGetAttTemplateException() @classmethod def create_from_cloudformation_json( # type: ignore[misc] cls, resource_name: str, cloudformation_json: Any, account_id: str, region_name: str, **kwargs: Any, ) -> "FakeJobTemplate": iot_backend = iot_backends[account_id][region_name] properties = cloudformation_json["Properties"] return iot_backend.create_job_template( job_template_id=properties.get("JobTemplateId", resource_name), document_source=properties.get("DocumentSource", ""), document=properties.get("Document"), description=properties.get("Description"), presigned_url_config=decapitalize_dict( properties.get("PresignedUrlConfig", {}) ), job_executions_rollout_config=decapitalize_dict( properties.get("JobExecutionsRolloutConfig", {}) ), abort_config=decapitalize_dict(properties.get("AbortConfig", {})), job_execution_retry_config=decapitalize_dict( properties.get("JobExecutionsRetryConfig", {}) ), timeout_config=decapitalize_dict(properties.get("TimeoutConfig", {})), ) @classmethod def update_from_cloudformation_json( # type: ignore[misc] cls, original_resource: "FakeJobTemplate", new_resource_name: str, cloudformation_json: Any, account_id: str, region_name: str, ) -> "FakeJobTemplate": iot_backend = iot_backends[account_id][region_name] iot_backend.delete_job_template( job_template_id=original_resource.job_template_id ) return cls.create_from_cloudformation_json( new_resource_name, cloudformation_json, account_id, region_name ) @classmethod def delete_from_cloudformation_json( # type: ignore[misc] cls, resource_name: str, cloudformation_json: Any, account_id: str, region_name: str, ) -> None: properties = cloudformation_json["Properties"] job_template_id = properties.get("JobTemplateId", resource_name) iot_backend = iot_backends[account_id][region_name] iot_backend.delete_job_template(job_template_id=job_template_id) class FakeEndpoint(BaseModel): def __init__(self, endpoint_type: str, region_name: str): if endpoint_type not in [ "iot:Data", "iot:Data-ATS", "iot:CredentialProvider", "iot:Jobs", ]: raise InvalidRequestException( " An error occurred (InvalidRequestException) when calling the DescribeEndpoint " f"operation: Endpoint type {endpoint_type} not recognized." ) self.region_name = region_name identifier = random.get_random_string(length=14, lower_case=True) if endpoint_type == "iot:Data": self.endpoint = f"{identifier}.iot.{self.region_name}.amazonaws.com" elif "iot:Data-ATS" in endpoint_type: self.endpoint = f"{identifier}-ats.iot.{self.region_name}.amazonaws.com" elif "iot:CredentialProvider" in endpoint_type: self.endpoint = ( f"{identifier}.credentials.iot.{self.region_name}.amazonaws.com" ) elif "iot:Jobs" in endpoint_type: self.endpoint = f"{identifier}.jobs.iot.{self.region_name}.amazonaws.com" self.endpoint_type = endpoint_type def to_get_dict(self) -> Dict[str, str]: return { "endpointAddress": self.endpoint, } def to_dict(self) -> Dict[str, str]: return { "endpointAddress": self.endpoint, } class FakeRule(BaseModel): def __init__( self, rule_name: str, description: str, created_at: int, rule_disabled: bool, topic_pattern: Optional[str], actions: List[Dict[str, Any]], error_action: Dict[str, Any], sql: str, aws_iot_sql_version: str, account_id: str, region_name: str, ): self.account_id = account_id self.region_name = region_name self.rule_name = rule_name self.description = description or "" self.created_at = created_at self.rule_disabled = bool(rule_disabled) self.topic_pattern = topic_pattern self.actions = actions or [] self.error_action = error_action or {} self.sql = sql self.aws_iot_sql_version = aws_iot_sql_version or "2016-03-23" self.arn = f"arn:{get_partition(self.region_name)}:iot:{self.region_name}:{self.account_id}:rule/{rule_name}" def to_get_dict(self) -> Dict[str, Any]: return { "rule": { "actions": self.actions, "awsIotSqlVersion": self.aws_iot_sql_version, "createdAt": self.created_at, "description": self.description, "errorAction": self.error_action, "ruleDisabled": self.rule_disabled, "ruleName": self.rule_name, "sql": self.sql, }, "ruleArn": self.arn, } def to_dict(self) -> Dict[str, Any]: return { "ruleName": self.rule_name, "createdAt": self.created_at, "ruleArn": self.arn, "ruleDisabled": self.rule_disabled, "topicPattern": self.topic_pattern, } class FakeDomainConfiguration(BaseModel): def __init__( self, account_id: str, region_name: str, domain_configuration_name: str, domain_name: str, server_certificate_arns: List[str], domain_configuration_status: str, service_type: str, authorizer_config: Optional[Dict[str, Any]], domain_type: str, ): if service_type and service_type not in ["DATA", "CREDENTIAL_PROVIDER", "JOBS"]: raise InvalidRequestException( "An error occurred (InvalidRequestException) when calling the DescribeDomainConfiguration " f"operation: Service type {service_type} not recognized." ) self.domain_configuration_name = domain_configuration_name self.domain_configuration_arn = f"arn:{get_partition(region_name)}:iot:{region_name}:{account_id}:domainconfiguration/{domain_configuration_name}/{random.get_random_string(length=5)}" self.domain_name = domain_name self.server_certificates = [] if server_certificate_arns: for sc in server_certificate_arns: self.server_certificates.append( {"serverCertificateArn": sc, "serverCertificateStatus": "VALID"} ) self.domain_configuration_status = domain_configuration_status self.service_type = service_type self.authorizer_config = authorizer_config self.domain_type = domain_type self.last_status_change_date = time.time() def to_description_dict(self) -> Dict[str, Any]: return { "domainConfigurationName": self.domain_configuration_name, "domainConfigurationArn": self.domain_configuration_arn, "domainName": self.domain_name, "serverCertificates": self.server_certificates, "authorizerConfig": self.authorizer_config, "domainConfigurationStatus": self.domain_configuration_status, "serviceType": self.service_type, "domainType": self.domain_type, "lastStatusChangeDate": self.last_status_change_date, } def to_dict(self) -> Dict[str, Any]: return { "domainConfigurationName": self.domain_configuration_name, "domainConfigurationArn": self.domain_configuration_arn, } class FakeRoleAlias(CloudFormationModel): def __init__( self, role_alias: str, role_arn: str, account_id: str, region_name: str, credential_duration_seconds: int = 3600, ): self.role_alias = role_alias self.role_arn = role_arn self.credential_duration_seconds = credential_duration_seconds self.account_id = account_id self.region_name = region_name self.creation_date = time.time() self.last_modified_date = self.creation_date self.arn = f"arn:{get_partition(self.region_name)}:iot:{self.region_name}:{self.account_id}:rolealias/{role_alias}" def to_dict(self) -> Dict[str, Any]: return { "roleAlias": self.role_alias, "roleAliasArn": self.arn, "roleArn": self.role_arn, "owner": self.account_id, "credentialDurationSeconds": self.credential_duration_seconds, "creationDate": self.creation_date, "lastModifiedDate": self.last_modified_date, } @staticmethod def cloudformation_name_type() -> str: return "RoleAlias" @staticmethod def cloudformation_type() -> str: return "AWS::IoT::RoleAlias" @classmethod def has_cfn_attr(cls, attr: str) -> bool: return attr in [ "RoleAliasArn", ] def get_cfn_attribute(self, attribute_name: str) -> Any: from moto.cloudformation.exceptions import UnformattedGetAttTemplateException if attribute_name == "RoleAliasArn": return self.arn raise UnformattedGetAttTemplateException() @classmethod def create_from_cloudformation_json( # type: ignore[misc] cls, resource_name: str, cloudformation_json: Any, account_id: str, region_name: str, **kwargs: Any, ) -> "FakeRoleAlias": iot_backend = iot_backends[account_id][region_name] properties = cloudformation_json["Properties"] role_alias_name = properties.get("RoleAlias", resource_name) role_arn = properties.get("RoleArn") credential_duration_seconds = properties.get("CredentialDurationSeconds", 3600) return iot_backend.create_role_alias( role_alias_name=role_alias_name, role_arn=role_arn, credential_duration_seconds=credential_duration_seconds, ) @classmethod def update_from_cloudformation_json( # type: ignore[misc] cls, original_resource: "FakeRoleAlias", new_resource_name: str, cloudformation_json: Any, account_id: str, region_name: str, ) -> "FakeRoleAlias": iot_backend = iot_backends[account_id][region_name] iot_backend.delete_role_alias(role_alias_name=original_resource.role_alias) return cls.create_from_cloudformation_json( new_resource_name, cloudformation_json, account_id, region_name ) @classmethod def delete_from_cloudformation_json( # type: ignore[misc] cls, resource_name: str, cloudformation_json: Any, account_id: str, region_name: str, ) -> None: properties = cloudformation_json["Properties"] role_alias_name = properties.get("RoleAlias", resource_name) iot_backend = iot_backends[account_id][region_name] iot_backend.delete_role_alias(role_alias_name=role_alias_name) @dataclass(frozen=True) class FakeConfigField: name: str type: str @dataclass(frozen=True) class FakeThingGroupIndexingConfiguration: customFields: List[FakeConfigField] = field(default_factory=list) managedFields: List[FakeConfigField] = field(default_factory=list) thingGroupIndexingMode: str = "OFF" @dataclass(frozen=True) class FakeThingIndexingConfigurationFilterGeoLocations: name: str order: str @dataclass(frozen=True) class FakeThingIndexingConfigurationFilter: geoLocations: List[FakeThingIndexingConfigurationFilterGeoLocations] = field( default_factory=list ) namedShadowNames: List[str] = field(default_factory=list) @dataclass(frozen=True) class FakeThingIndexingConfiguration: customFields: List[FakeConfigField] = field(default_factory=list) managedFields: List[FakeConfigField] = field(default_factory=list) filter: FakeThingIndexingConfigurationFilter = field( default_factory=FakeThingIndexingConfigurationFilter ) deviceDefenderIndexingMode: str = "OFF" namedShadowIndexingMode: str = "OFF" thingConnectivityIndexingMode: str = "OFF" thingIndexingMode: str = "OFF" @dataclass(frozen=True) class FakeIndexingConfigurationData: thingGroupIndexingConfiguration: FakeThingGroupIndexingConfiguration = field( default_factory=FakeThingGroupIndexingConfiguration ) thingIndexingConfiguration: FakeThingIndexingConfiguration = field( default_factory=FakeThingIndexingConfiguration ) class FakeIndexingConfiguration(BaseModel): def __init__(self, region_name: str, account_id: str) -> None: self.region_name = region_name self.account_id = account_id self.configuration = FakeIndexingConfigurationData() def to_dict(self) -> Dict[str, Any]: return asdict(self.configuration) def update_configuration( self, thingIndexingConfiguration: Dict[str, Any], thingGroupIndexingConfiguration: Dict[str, Any], ) -> None: self.configuration = replace( self.configuration, thingIndexingConfiguration=replace( self.configuration.thingIndexingConfiguration, **thingIndexingConfiguration, ), thingGroupIndexingConfiguration=replace( self.configuration.thingGroupIndexingConfiguration, **thingGroupIndexingConfiguration, ), ) class IoTBackend(BaseBackend): def __init__(self, region_name: str, account_id: str): super().__init__(region_name, account_id) self.things: Dict[str, FakeThing] = OrderedDict() self.jobs: Dict[str, FakeJob] = OrderedDict() self.jobs_templates: Dict[str, FakeJobTemplate] = OrderedDict() self.job_executions: Dict[Tuple[str, str], FakeJobExecution] = OrderedDict() self.thing_types: Dict[str, FakeThingType] = OrderedDict() self.thing_groups: Dict[str, FakeThingGroup] = OrderedDict() self.ca_certificates: Dict[str, FakeCaCertificate] = OrderedDict() self.certificates: Dict[str, FakeCertificate] = OrderedDict() self.policies: Dict[str, FakePolicy] = OrderedDict() self.principal_policies: Dict[Tuple[str, str], Tuple[str, FakePolicy]] = ( OrderedDict() ) self.principal_things: Dict[Tuple[str, str], Tuple[str, FakeThing]] = ( OrderedDict() ) self.rules: Dict[str, FakeRule] = OrderedDict() self.role_aliases: Dict[str, FakeRoleAlias] = OrderedDict() self.endpoint: Optional[FakeEndpoint] = None self.domain_configurations: Dict[str, FakeDomainConfiguration] = OrderedDict() self.indexing_configuration = FakeIndexingConfiguration(region_name, account_id) @staticmethod def default_vpc_endpoint_service( service_region: str, zones: List[str] ) -> List[Dict[str, str]]: """Default VPC endpoint service.""" return BaseBackend.default_vpc_endpoint_service_factory( service_region, zones, "iot" ) + BaseBackend.default_vpc_endpoint_service_factory( service_region, zones, "data.iot", private_dns_names=False, special_service_name="iot.data", policy_supported=False, ) def create_certificate_from_csr( self, csr: str, set_as_active: bool ) -> FakeCertificate: cert = x509.load_pem_x509_csr(csr.encode("utf-8"), default_backend()) pem = self._generate_certificate_pem( domain_name="example.com", subject=cert.subject ) return self.register_certificate( pem, ca_certificate_pem=None, set_as_active=set_as_active, status="INACTIVE" ) def _generate_certificate_pem( self, domain_name: str, subject: x509.Name, key: Optional[rsa.RSAPrivateKey] = None, ) -> str: sans = [x509.DNSName(domain_name)] key = key or rsa.generate_private_key( public_exponent=65537, key_size=2048, backend=default_backend() ) issuer = x509.Name( [ # C = US, O = Moto, OU = Server CA 1B, CN = Moto x509.NameAttribute(x509.NameOID.COUNTRY_NAME, "US"), x509.NameAttribute(x509.NameOID.ORGANIZATION_NAME, "Moto"), x509.NameAttribute( x509.NameOID.ORGANIZATIONAL_UNIT_NAME, "Server CA 1B" ), x509.NameAttribute(x509.NameOID.COMMON_NAME, "Moto"), ] ) cert = ( x509.CertificateBuilder() .subject_name(subject) .issuer_name(issuer) .public_key(key.public_key()) .serial_number(x509.random_serial_number()) .not_valid_before(utcnow()) .not_valid_after(utcnow() + timedelta(days=365)) .add_extension(x509.SubjectAlternativeName(sans), critical=False) .sign(key, hashes.SHA512(), default_backend()) ) return cert.public_bytes(serialization.Encoding.PEM).decode("utf-8") def create_thing( self, thing_name: str, thing_type_name: str, attribute_payload: Optional[Dict[str, Any]], ) -> FakeThing: thing_types = self.list_thing_types() thing_type = None if thing_type_name: filtered_thing_types = [ _ for _ in thing_types if _.thing_type_name == thing_type_name ] if len(filtered_thing_types) == 0: raise ResourceNotFoundException() thing_type = filtered_thing_types[0] if thing_type.metadata["deprecated"]: # note - typo (depreated) exists also in the original exception. raise InvalidRequestException( msg=f"Can not create new thing with depreated thing type:{thing_type_name}" ) if attribute_payload is None: attributes: Dict[str, Any] = {} elif "attributes" not in attribute_payload: attributes = {} else: attributes = attribute_payload["attributes"] thing = FakeThing( thing_name, thing_type, attributes, self.account_id, self.region_name ) self.things[thing.arn] = thing return thing def create_thing_type( self, thing_type_name: str, thing_type_properties: Dict[str, Any] ) -> Tuple[str, str]: if thing_type_properties is None: thing_type_properties = {} thing_type = FakeThingType( thing_type_name, thing_type_properties, self.account_id, self.region_name ) self.thing_types[thing_type.arn] = thing_type return thing_type.thing_type_name, thing_type.arn def list_thing_types( self, thing_type_name: Optional[str] = None ) -> Iterable[FakeThingType]: if thing_type_name: # It's weird but thing_type_name is filtered by forward match, not complete match return [ _ for _ in self.thing_types.values() if _.thing_type_name.startswith(thing_type_name) ] return self.thing_types.values() def list_things( self, attribute_name: str, attribute_value: str, thing_type_name: str, max_results: int, token: Optional[str], ) -> Tuple[Iterable[FakeThing], Optional[str]]: all_things = [_.to_dict() for _ in self.things.values()] if attribute_name is not None and thing_type_name is not None: filtered_things = list( filter( lambda elem: attribute_name in elem["attributes"] and elem["attributes"][attribute_name] == attribute_value and "thingTypeName" in elem and elem["thingTypeName"] == thing_type_name, all_things, ) ) elif attribute_name is not None and thing_type_name is None: filtered_things = list( filter( lambda elem: attribute_name in elem["attributes"] and elem["attributes"][attribute_name] == attribute_value, all_things, ) ) elif attribute_name is None and thing_type_name is not None: filtered_things = list( filter( lambda elem: "thingTypeName" in elem and elem["thingTypeName"] == thing_type_name, all_things, ) ) else: filtered_things = all_things if token is None: things = filtered_things[0:max_results] next_token = ( str(max_results) if len(filtered_things) > max_results else None ) else: int_token = int(token) things = filtered_things[int_token : int_token + max_results] next_token = ( str(int_token + max_results) if len(filtered_things) > int_token + max_results else None ) return things, next_token def describe_thing(self, thing_name: str) -> FakeThing: things = [_ for _ in self.things.values() if _.thing_name == thing_name] if len(things) == 0: raise ResourceNotFoundException() return things[0] def describe_thing_type(self, thing_type_name: str) -> FakeThingType: thing_types = [ _ for _ in self.thing_types.values() if _.thing_type_name == thing_type_name ] if len(thing_types) == 0: raise ResourceNotFoundException() return thing_types[0] def describe_endpoint(self, endpoint_type: str) -> FakeEndpoint: self.endpoint = FakeEndpoint(endpoint_type, self.region_name) return self.endpoint def delete_thing(self, thing_name: str) -> None: """ The ExpectedVersion-parameter is not yet implemented """ # can raise ResourceNotFoundError thing = self.describe_thing(thing_name) for k in list(self.principal_things.keys()): if k[1] == thing_name: raise ThingStillAttached(thing_name) del self.things[thing.arn] def delete_thing_type(self, thing_type_name: str) -> None: # can raise ResourceNotFoundError thing_type = self.describe_thing_type(thing_type_name) del self.thing_types[thing_type.arn] def deprecate_thing_type( self, thing_type_name: str, undo_deprecate: bool ) -> FakeThingType: thing_types = [ _ for _ in self.thing_types.values() if _.thing_type_name == thing_type_name ] if len(thing_types) == 0: raise ResourceNotFoundException() thing_types[0].metadata["deprecated"] = not undo_deprecate return thing_types[0] def update_thing( self, thing_name: str, thing_type_name: str, attribute_payload: Optional[Dict[str, Any]], remove_thing_type: bool, ) -> None: """ The ExpectedVersion-parameter is not yet implemented """ # if attributes payload = {}, nothing thing = self.describe_thing(thing_name) if remove_thing_type and thing_type_name: raise InvalidRequestException() # thing_type if thing_type_name: thing_types = self.list_thing_types() filtered_thing_types = [ _ for _ in thing_types if _.thing_type_name == thing_type_name ] if len(filtered_thing_types) == 0: raise ResourceNotFoundException() thing_type = filtered_thing_types[0] if thing_type.metadata["deprecated"]: raise InvalidRequestException( msg=f"Can not update a thing to use deprecated thing type: {thing_type_name}" ) thing.thing_type = thing_type if remove_thing_type: thing.thing_type = None # attribute if attribute_payload is not None and "attributes" in attribute_payload: do_merge = attribute_payload.get("merge", False) attributes = attribute_payload["attributes"] if not do_merge: thing.attributes = attributes else: thing.attributes.update(attributes) thing.attributes = {k: v for k, v in thing.attributes.items() if v} def create_keys_and_certificate( self, set_as_active: bool ) -> Tuple[FakeCertificate, Dict[str, str]]: # implement here # caCertificate can be blank private_key = rsa.generate_private_key( public_exponent=65537, key_size=2048, backend=default_backend() ) key_pair = { "PublicKey": private_key.public_key() .public_bytes( encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo, ) .decode("utf-8"), "PrivateKey": private_key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.TraditionalOpenSSL, encryption_algorithm=serialization.NoEncryption(), ).decode("utf-8"), } subject = x509.Name( [x509.NameAttribute(NameOID.COMMON_NAME, "AWS IoT Certificate")] ) certificate_pem = self._generate_certificate_pem( "getmoto.org", subject, key=private_key ) status = "ACTIVE" if set_as_active else "INACTIVE" certificate = FakeCertificate( certificate_pem, status, self.account_id, self.region_name ) self.certificates[certificate.certificate_id] = certificate return certificate, key_pair def delete_ca_certificate(self, certificate_id: str) -> None: cert = self.describe_ca_certificate(certificate_id) self._validation_delete(cert) del self.ca_certificates[certificate_id] def delete_certificate(self, certificate_id: str, force_delete: bool) -> None: cert = self.describe_certificate(certificate_id) self._validation_delete(cert, force_delete) del self.certificates[certificate_id] def _validation_delete( self, cert: FakeCertificate, force_delete: bool = False ) -> None: if cert.status == "ACTIVE": raise CertificateStateException( "Certificate must be deactivated (not ACTIVE) before deletion.", cert.certificate_id, ) certs = [ k[0] for k, v in self.principal_things.items() if self._get_principal(k[0]).certificate_id == cert.certificate_id ] if len(certs) > 0: raise DeleteConflictException( f"Things must be detached before deletion (arn: {certs[0]})" ) certs = [ k[0] for k, v in self.principal_policies.items() if self._get_principal(k[0]).certificate_id == cert.certificate_id ] if len(certs) > 0 and not force_delete: raise DeleteConflictException( "Certificate policies must be detached before deletion (arn: %s)" % certs[0] ) def describe_ca_certificate(self, certificate_id: str) -> FakeCaCertificate: if certificate_id not in self.ca_certificates: raise ResourceNotFoundException() return self.ca_certificates[certificate_id] def describe_certificate(self, certificate_id: str) -> FakeCertificate: certs = [ _ for _ in self.certificates.values() if _.certificate_id == certificate_id ] if len(certs) == 0: raise ResourceNotFoundException() return certs[0] def get_registration_code(self) -> str: return str(random.uuid4()) def list_certificates(self) -> Iterable[FakeCertificate]: """ Pagination is not yet implemented """ return self.certificates.values() def list_certificates_by_ca(self, ca_certificate_id: str) -> List[FakeCertificate]: """ Pagination is not yet implemented """ return [ cert for cert in self.certificates.values() if cert.ca_certificate_id == ca_certificate_id ] def __raise_if_certificate_already_exists( self, certificate_id: str, certificate_arn: str ) -> None: if certificate_id in self.certificates: raise ResourceAlreadyExistsException( "The certificate is already provisioned or registered", certificate_id, certificate_arn, ) def register_ca_certificate( self, ca_certificate: str, set_as_active: bool, registration_config: Dict[str, str], ) -> FakeCaCertificate: """ The VerificationCertificate-parameter is not yet implemented """ certificate = FakeCaCertificate( ca_certificate=ca_certificate, status="ACTIVE" if set_as_active else "INACTIVE", account_id=self.account_id, region_name=self.region_name, registration_config=registration_config, ) self.ca_certificates[certificate.certificate_id] = certificate return certificate def _find_ca_certificate(self, ca_certificate_pem: Optional[str]) -> Optional[str]: for ca_cert in self.ca_certificates.values(): if ca_cert.certificate_pem == ca_certificate_pem: return ca_cert.certificate_id return None def register_certificate( self, certificate_pem: str, ca_certificate_pem: Optional[str], set_as_active: bool, status: str, ) -> FakeCertificate: ca_certificate_id = self._find_ca_certificate(ca_certificate_pem) certificate = FakeCertificate( certificate_pem, "ACTIVE" if set_as_active else status, self.account_id, self.region_name, ca_certificate_id, ) self.__raise_if_certificate_already_exists( certificate.certificate_id, certificate_arn=certificate.arn ) self.certificates[certificate.certificate_id] = certificate return certificate def register_certificate_without_ca( self, certificate_pem: str, status: str ) -> FakeCertificate: certificate = FakeCertificate( certificate_pem, status, self.account_id, self.region_name ) self.__raise_if_certificate_already_exists( certificate.certificate_id, certificate_arn=certificate.arn ) self.certificates[certificate.certificate_id] = certificate return certificate def update_ca_certificate( self, certificate_id: str, new_status: Optional[str], config: Optional[Dict[str, str]], ) -> None: """ The newAutoRegistrationStatus and removeAutoRegistration-parameters are not yet implemented """ cert = self.describe_ca_certificate(certificate_id) if new_status is not None: cert.status = new_status if config is not None: cert.registration_config = config def update_certificate(self, certificate_id: str, new_status: str) -> None: cert = self.describe_certificate(certificate_id) # TODO: validate new_status cert.status = new_status def create_policy( self, policy_name: str, policy_document: Dict[str, Any] ) -> FakePolicy: if policy_name in self.policies: current_policy = self.policies[policy_name] raise ResourceAlreadyExistsException( f"Policy cannot be created - name already exists (name={policy_name})", current_policy.name, current_policy.arn, ) policy = FakePolicy( policy_name, policy_document, self.account_id, self.region_name ) self.policies[policy.name] = policy return policy def attach_policy(self, policy_name: str, target: str) -> None: principal = self._get_principal(target) policy = self.get_policy(policy_name) k = (target, policy_name) if k in self.principal_policies: return self.principal_policies[k] = (principal, policy) def detach_policy(self, policy_name: str, target: str) -> None: # this may raise ResourceNotFoundException self._get_principal(target) self.get_policy(policy_name) k = (target, policy_name) if k not in self.principal_policies: raise ResourceNotFoundException() del self.principal_policies[k] def list_attached_policies(self, target: str) -> List[FakePolicy]: """ Pagination is not yet implemented """ return [v[1] for k, v in self.principal_policies.items() if k[0] == target] def list_policies(self) -> Iterable[FakePolicy]: """ Pagination is not yet implemented """ return self.policies.values() def get_policy(self, policy_name: str) -> FakePolicy: policies = [_ for _ in self.policies.values() if _.name == policy_name] if len(policies) == 0: raise ResourceNotFoundException() return policies[0] def delete_policy(self, policy_name: str) -> None: policies = [ k[1] for k, v in self.principal_policies.items() if k[1] == policy_name ] if len(policies) > 0: raise DeleteConflictException( "The policy cannot be deleted as the policy is attached to one or more principals (name=%s)" % policy_name ) policy = self.get_policy(policy_name) if len(policy.versions) > 1: raise DeleteConflictException( "Cannot delete the policy because it has one or more policy versions attached to it (name=%s)" % policy_name ) del self.policies[policy.name] def create_policy_version( self, policy_name: str, policy_document: Dict[str, Any], set_as_default: bool ) -> FakePolicyVersion: policy = self.get_policy(policy_name) if not policy: raise ResourceNotFoundException() if len(policy.versions) >= 5: raise VersionsLimitExceededException(policy_name) policy._max_version_id += 1 version = FakePolicyVersion( policy_name, policy_document, set_as_default, self.account_id, self.region_name, version_id=policy._max_version_id, ) policy.versions.append(version) if set_as_default: self.set_default_policy_version(policy_name, version.version_id) return version def set_default_policy_version(self, policy_name: str, version_id: str) -> None: policy = self.get_policy(policy_name) if not policy: raise ResourceNotFoundException() for version in policy.versions: if version.version_id == version_id: version.is_default = True policy.default_version_id = version.version_id policy.document = version.document else: version.is_default = False def get_policy_version( self, policy_name: str, version_id: str ) -> FakePolicyVersion: policy = self.get_policy(policy_name) if not policy: raise ResourceNotFoundException() for version in policy.versions: if version.version_id == version_id: return version raise ResourceNotFoundException() def list_policy_versions(self, policy_name: str) -> Iterable[FakePolicyVersion]: policy = self.get_policy(policy_name) if not policy: raise ResourceNotFoundException() return policy.versions def delete_policy_version(self, policy_name: str, version_id: str) -> None: policy = self.get_policy(policy_name) if not policy: raise ResourceNotFoundException() if version_id == policy.default_version_id: raise InvalidRequestException( "Cannot delete the default version of a policy" ) for i, v in enumerate(policy.versions): if v.version_id == version_id: del policy.versions[i] return raise ResourceNotFoundException() def _get_principal(self, principal_arn: str) -> Any: """ raise ResourceNotFoundException """ if ":cert/" in principal_arn: certs = [_ for _ in self.certificates.values() if _.arn == principal_arn] if len(certs) == 0: raise ResourceNotFoundException() principal = certs[0] return principal if ":thinggroup/" in principal_arn: try: return self.thing_groups[principal_arn] except KeyError: raise ResourceNotFoundException( f"No thing group with ARN {principal_arn} exists" ) from moto.cognitoidentity import cognitoidentity_backends cognito = cognitoidentity_backends[self.account_id][self.region_name] identities = [] for identity_pool in cognito.identity_pools: pool_identities = cognito.pools_identities.get(identity_pool, {}) identities.extend( [pi["IdentityId"] for pi in pool_identities.get("Identities", [])] ) if principal_arn in identities: return {"IdentityId": principal_arn} raise ResourceNotFoundException() def attach_principal_policy(self, policy_name: str, principal_arn: str) -> None: principal = self._get_principal(principal_arn) policy = self.get_policy(policy_name) k = (principal_arn, policy_name) if k in self.principal_policies: return self.principal_policies[k] = (principal, policy) def detach_principal_policy(self, policy_name: str, principal_arn: str) -> None: # this may raises ResourceNotFoundException self._get_principal(principal_arn) self.get_policy(policy_name) k = (principal_arn, policy_name) if k not in self.principal_policies: raise ResourceNotFoundException() del self.principal_policies[k] def list_principal_policies(self, principal_arn: str) -> List[FakePolicy]: """ Pagination is not yet implemented """ policies = [ v[1] for k, v in self.principal_policies.items() if k[0] == principal_arn ] return policies def list_policy_principals(self, policy_name: str) -> List[str]: """ Pagination is not yet implemented """ # this action is deprecated # https://docs.aws.amazon.com/iot/latest/apireference/API_ListTargetsForPolicy.html # should use ListTargetsForPolicy instead principals = [ k[0] for k, v in self.principal_policies.items() if k[1] == policy_name ] return principals def list_targets_for_policy(self, policy_name: str) -> List[str]: """ Pagination is not yet implemented """ # This behaviour is different to list_policy_principals which will just return an empty list if policy_name not in self.policies: raise ResourceNotFoundException("Policy not found") return self.list_policy_principals(policy_name=policy_name) def attach_thing_principal(self, thing_name: str, principal_arn: str) -> None: principal = self._get_principal(principal_arn) thing = self.describe_thing(thing_name) k = (principal_arn, thing_name) if k in self.principal_things: return self.principal_things[k] = (principal, thing) def detach_thing_principal(self, thing_name: str, principal_arn: str) -> None: # this may raises ResourceNotFoundException self._get_principal(principal_arn) self.describe_thing(thing_name) k = (principal_arn, thing_name) if k not in self.principal_things: raise ResourceNotFoundException() del self.principal_things[k] def list_principal_things(self, principal_arn: str) -> List[str]: thing_names = [ k[1] for k, v in self.principal_things.items() if k[0] == principal_arn ] return thing_names def list_thing_principals(self, thing_name: str) -> List[str]: things = [_ for _ in self.things.values() if _.thing_name == thing_name] if len(things) == 0: raise ResourceNotFoundException( "Failed to list principals for thing %s because the thing does not exist in your account" % thing_name ) principals = [ k[0] for k, v in self.principal_things.items() if k[1] == thing_name ] return principals def describe_thing_group(self, thing_group_name: str) -> FakeThingGroup: thing_groups = [ _ for _ in self.thing_groups.values() if _.thing_group_name == thing_group_name ] if len(thing_groups) == 0: raise ResourceNotFoundException() return thing_groups[0] def create_thing_group( self, thing_group_name: str, parent_group_name: str, thing_group_properties: Dict[str, Any], ) -> Tuple[str, str, str]: thing_group = FakeThingGroup( thing_group_name, parent_group_name, thing_group_properties, self.account_id, self.region_name, self.thing_groups, ) # this behavior is not documented, but AWS does it like that # if a thing group with the same name exists, it's properties are compared # if they differ, an error is returned. # Otherwise, the old thing group is returned if thing_group.arn in self.thing_groups: current_thing_group = self.thing_groups[thing_group.arn] if current_thing_group.thing_group_properties != thing_group_properties: raise ResourceAlreadyExistsException( msg=f"Thing Group {thing_group_name} already exists in current account with different properties", resource_arn=thing_group.arn, resource_id=current_thing_group.thing_group_id, ) thing_group = current_thing_group else: self.thing_groups[thing_group.arn] = thing_group return thing_group.thing_group_name, thing_group.arn, thing_group.thing_group_id def delete_thing_group(self, thing_group_name: str) -> None: """ The ExpectedVersion-parameter is not yet implemented """ child_groups = [ thing_group for _, thing_group in self.thing_groups.items() if thing_group.parent_group_name == thing_group_name ] if len(child_groups) > 0: raise InvalidRequestException( " Cannot delete thing group : " + thing_group_name + " when there are still child groups attached to it" ) try: thing_group = self.describe_thing_group(thing_group_name) del self.thing_groups[thing_group.arn] except ResourceNotFoundException: # AWS returns success even if the thing group does not exist. pass def list_thing_groups( self, parent_group: Optional[str], name_prefix_filter: Optional[str], recursive: Optional[bool], ) -> List[FakeThingGroup]: if recursive is None: recursive = True if name_prefix_filter is None: name_prefix_filter = "" if parent_group and parent_group not in [ _.thing_group_name for _ in self.thing_groups.values() ]: raise ResourceNotFoundException() thing_groups = [ _ for _ in self.thing_groups.values() if _.parent_group_name == parent_group ] if recursive: for g in thing_groups: thing_groups.extend( self.list_thing_groups( parent_group=g.thing_group_name, name_prefix_filter=None, recursive=False, ) ) # thing_groups = groups_to_process.values() return [ _ for _ in thing_groups if _.thing_group_name.startswith(name_prefix_filter) ] def update_thing_group( self, thing_group_name: str, thing_group_properties: Dict[str, Any], expected_version: int, ) -> int: thing_group = self.describe_thing_group(thing_group_name) if expected_version and expected_version != thing_group.version: raise VersionConflictException(thing_group_name) attribute_payload = thing_group_properties.get("attributePayload", None) if attribute_payload is not None and "attributes" in attribute_payload: do_merge = attribute_payload.get("merge", False) attributes = attribute_payload["attributes"] if attributes: # might not exist yet, for example when the thing group was created without attributes current_attribute_payload = ( thing_group.thing_group_properties.setdefault( "attributePayload", {"attributes": {}}, # type: ignore ) ) if not do_merge: current_attribute_payload["attributes"] = attributes # type: ignore else: current_attribute_payload["attributes"].update(attributes) # type: ignore elif attribute_payload is not None and "attributes" not in attribute_payload: thing_group.attributes = {} # type: ignore if "thingGroupDescription" in thing_group_properties: thing_group.thing_group_properties["thingGroupDescription"] = ( thing_group_properties["thingGroupDescription"] ) thing_group.version = thing_group.version + 1 return thing_group.version def _identify_thing_group( self, thing_group_name: Optional[str], thing_group_arn: Optional[str] ) -> FakeThingGroup: # identify thing group if thing_group_name is None and thing_group_arn is None: raise InvalidRequestException( " Both thingGroupArn and thingGroupName are empty. Need to specify at least one of them" ) if thing_group_name is not None: thing_group = self.describe_thing_group(thing_group_name) if thing_group_arn and thing_group.arn != thing_group_arn: raise InvalidRequestException( "ThingGroupName thingGroupArn does not match specified thingGroupName in request" ) elif thing_group_arn is not None: if thing_group_arn not in self.thing_groups: raise InvalidRequestException() thing_group = self.thing_groups[thing_group_arn] return thing_group def _identify_thing( self, thing_name: Optional[str], thing_arn: Optional[str] ) -> FakeThing: # identify thing if thing_name is None and thing_arn is None: raise InvalidRequestException( "Both thingArn and thingName are empty. Need to specify at least one of them" ) if thing_name is not None: thing = self.describe_thing(thing_name) if thing_arn and thing.arn != thing_arn: raise InvalidRequestException( "ThingName thingArn does not match specified thingName in request" ) elif thing_arn is not None: if thing_arn not in self.things: raise InvalidRequestException() thing = self.things[thing_arn] return thing def add_thing_to_thing_group( self, thing_group_name: str, thing_group_arn: Optional[str], thing_name: str, thing_arn: Optional[str], ) -> None: thing_group = self._identify_thing_group(thing_group_name, thing_group_arn) thing = self._identify_thing(thing_name, thing_arn) if thing.arn in thing_group.things: # aws ignores duplicate registration return thing_group.things[thing.arn] = thing def remove_thing_from_thing_group( self, thing_group_name: str, thing_group_arn: Optional[str], thing_name: str, thing_arn: Optional[str], ) -> None: thing_group = self._identify_thing_group(thing_group_name, thing_group_arn) thing = self._identify_thing(thing_name, thing_arn) if thing.arn not in thing_group.things: # aws ignores non-registered thing return del thing_group.things[thing.arn] def list_things_in_thing_group(self, thing_group_name: str) -> Iterable[FakeThing]: """ Pagination and the recursive-parameter is not yet implemented """ thing_group = self.describe_thing_group(thing_group_name) return thing_group.things.values() def list_thing_groups_for_thing(self, thing_name: str) -> List[Dict[str, str]]: """ Pagination is not yet implemented """ thing = self.describe_thing(thing_name) all_thing_groups = self.list_thing_groups(None, None, None) ret = [] for thing_group in all_thing_groups: if thing.arn in thing_group.things: ret.append( { "groupName": thing_group.thing_group_name, "groupArn": thing_group.arn, } ) return ret def update_thing_groups_for_thing( self, thing_name: str, thing_groups_to_add: List[str], thing_groups_to_remove: List[str], ) -> None: thing = self.describe_thing(thing_name) for thing_group_name in thing_groups_to_add: thing_group = self.describe_thing_group(thing_group_name) self.add_thing_to_thing_group( thing_group.thing_group_name, None, thing.thing_name, None ) for thing_group_name in thing_groups_to_remove: thing_group = self.describe_thing_group(thing_group_name) self.remove_thing_from_thing_group( thing_group.thing_group_name, None, thing.thing_name, None ) def create_job( self, job_id: str, targets: List[str], document_source: str, document: str, description: str, presigned_url_config: Dict[str, Any], target_selection: str, job_executions_rollout_config: Dict[str, Any], document_parameters: Dict[str, str], abort_config: Dict[str, List[Dict[str, Any]]], job_execution_retry_config: Dict[str, Any], scheduling_config: Dict[str, Any], timeout_config: Dict[str, Any], ) -> Tuple[str, str, str]: job = FakeJob( job_id=job_id, targets=targets, document_source=document_source, document=document, description=description, presigned_url_config=presigned_url_config, target_selection=target_selection, job_executions_rollout_config=job_executions_rollout_config, abort_config=abort_config, job_execution_retry_config=job_execution_retry_config, scheduling_config=scheduling_config, timeout_config=timeout_config, document_parameters=document_parameters, account_id=self.account_id, region_name=self.region_name, ) self.jobs[job_id] = job for thing_arn in targets: thing_name = thing_arn.split(":")[-1].split("/")[-1] job_execution = FakeJobExecution(job_id, thing_arn) self.job_executions[(job_id, thing_name)] = job_execution return job.job_arn, job_id, description def describe_job(self, job_id: str) -> FakeJob: jobs = [_ for _ in self.jobs.values() if _.job_id == job_id] if len(jobs) == 0: raise ResourceNotFoundException() return jobs[0] def delete_job(self, job_id: str, force: bool) -> None: job = self.jobs[job_id] if job.status == "IN_PROGRESS" and force: del self.jobs[job_id] elif job.status != "IN_PROGRESS": del self.jobs[job_id] else: raise InvalidStateTransitionException() def cancel_job( self, job_id: str, reason_code: str, comment: str, force: bool ) -> FakeJob: job = self.jobs[job_id] job.reason_code = reason_code if reason_code is not None else job.reason_code job.comment = comment if comment is not None else job.comment job.force = force if force is not None and force != job.force else job.force job.status = "CANCELED" if job.status == "IN_PROGRESS" and force: self.jobs[job_id] = job elif job.status != "IN_PROGRESS": self.jobs[job_id] = job else: raise InvalidStateTransitionException() return job def get_job_document(self, job_id: str) -> FakeJob: return self.jobs[job_id] @paginate(PAGINATION_MODEL) # type: ignore[misc] def list_jobs(self) -> List[FakeJob]: """ The following parameter are not yet implemented: Status, TargetSelection, ThingGroupName, ThingGroupId """ return [_ for _ in self.jobs.values()] def describe_job_execution( self, job_id: str, thing_name: str, execution_number: int ) -> FakeJobExecution: try: job_execution = self.job_executions[(job_id, thing_name)] except KeyError: raise ResourceNotFoundException() if job_execution is None or ( execution_number is not None and job_execution.execution_number != execution_number ): raise ResourceNotFoundException() return job_execution def cancel_job_execution(self, job_id: str, thing_name: str, force: bool) -> None: """ The parameters ExpectedVersion and StatusDetails are not yet implemented """ job_execution = self.job_executions[(job_id, thing_name)] if job_execution is None: raise ResourceNotFoundException() job_execution.force_canceled = ( force if force is not None else job_execution.force_canceled ) # TODO: implement expected_version and status_details (at most 10 can be specified) if job_execution.status == "IN_PROGRESS" and force: job_execution.status = "CANCELED" self.job_executions[(job_id, thing_name)] = job_execution elif job_execution.status != "IN_PROGRESS": job_execution.status = "CANCELED" self.job_executions[(job_id, thing_name)] = job_execution else: raise InvalidStateTransitionException() def delete_job_execution( self, job_id: str, thing_name: str, execution_number: int, force: bool ) -> None: job_execution = self.job_executions[(job_id, thing_name)] if job_execution.execution_number != execution_number: raise ResourceNotFoundException() if job_execution.status == "IN_PROGRESS" and force: del self.job_executions[(job_id, thing_name)] elif job_execution.status != "IN_PROGRESS": del self.job_executions[(job_id, thing_name)] else: raise InvalidStateTransitionException() @paginate(PAGINATION_MODEL) def list_job_executions_for_job( self, job_id: str, status: str ) -> List[FakeJobExecution]: return [ job_exec for (_id, _), job_exec in self.job_executions.items() if _id == job_id and (not status or job_exec.status == status) ] @paginate(PAGINATION_MODEL) # type: ignore[misc] def list_job_executions_for_thing( self, thing_name: str, status: Optional[str] ) -> List[FakeJobExecution]: return [ job_exec for (_, name), job_exec in self.job_executions.items() if name == thing_name and (not status or job_exec.status == status) ] def list_topic_rules(self) -> List[Dict[str, Any]]: return [r.to_dict() for r in self.rules.values()] def get_topic_rule(self, rule_name: str) -> Dict[str, Any]: if rule_name not in self.rules: raise ResourceNotFoundException() return self.rules[rule_name].to_get_dict() def create_topic_rule(self, rule_name: str, sql: str, **kwargs: Any) -> None: if not re.match("^[a-zA-Z0-9_]+$", rule_name): msg = f"1 validation error detected: Value '{rule_name}' at 'ruleName' failed to satisfy constraint: Member must satisfy regular expression pattern: ^[a-zA-Z0-9_]+$" raise InvalidRequestException(msg) if rule_name in self.rules: raise ResourceAlreadyExistsException( "Rule with given name already exists", "", self.rules[rule_name].arn ) result = re.search(r"FROM\s+([^\s]*)", sql) topic = result.group(1).strip("'") if result else None self.rules[rule_name] = FakeRule( rule_name=rule_name, created_at=int(time.time()), topic_pattern=topic, sql=sql, account_id=self.account_id, region_name=self.region_name, **kwargs, ) def replace_topic_rule(self, rule_name: str, **kwargs: Any) -> None: self.delete_topic_rule(rule_name) self.create_topic_rule(rule_name, **kwargs) def delete_topic_rule(self, rule_name: str) -> None: if rule_name not in self.rules: raise ResourceNotFoundException() del self.rules[rule_name] def enable_topic_rule(self, rule_name: str) -> None: if rule_name not in self.rules: raise ResourceNotFoundException() self.rules[rule_name].rule_disabled = False def disable_topic_rule(self, rule_name: str) -> None: if rule_name not in self.rules: raise ResourceNotFoundException() self.rules[rule_name].rule_disabled = True def create_domain_configuration( self, domain_configuration_name: str, domain_name: str, server_certificate_arns: List[str], authorizer_config: Dict[str, Any], service_type: str, ) -> FakeDomainConfiguration: """ The ValidationCertificateArn-parameter is not yet implemented """ if domain_configuration_name in self.domain_configurations: raise ResourceAlreadyExistsException( "Domain configuration with given name already exists.", self.domain_configurations[ domain_configuration_name ].domain_configuration_name, self.domain_configurations[ domain_configuration_name ].domain_configuration_arn, ) self.domain_configurations[domain_configuration_name] = FakeDomainConfiguration( self.account_id, self.region_name, domain_configuration_name, domain_name, server_certificate_arns, "ENABLED", service_type, authorizer_config, "CUSTOMER_MANAGED", ) return self.domain_configurations[domain_configuration_name] def delete_domain_configuration(self, domain_configuration_name: str) -> None: if domain_configuration_name not in self.domain_configurations: raise ResourceNotFoundException("The specified resource does not exist.") del self.domain_configurations[domain_configuration_name] def describe_domain_configuration( self, domain_configuration_name: str ) -> FakeDomainConfiguration: if domain_configuration_name not in self.domain_configurations: raise ResourceNotFoundException("The specified resource does not exist.") return self.domain_configurations[domain_configuration_name] def list_domain_configurations(self) -> List[Dict[str, Any]]: return [_.to_dict() for _ in self.domain_configurations.values()] def update_domain_configuration( self, domain_configuration_name: str, authorizer_config: Dict[str, Any], domain_configuration_status: str, remove_authorizer_config: Optional[bool], ) -> FakeDomainConfiguration: if domain_configuration_name not in self.domain_configurations: raise ResourceNotFoundException("The specified resource does not exist.") domain_configuration = self.domain_configurations[domain_configuration_name] if authorizer_config is not None: domain_configuration.authorizer_config = authorizer_config if domain_configuration_status is not None: domain_configuration.domain_configuration_status = ( domain_configuration_status ) if remove_authorizer_config is not None and remove_authorizer_config is True: domain_configuration.authorizer_config = None return domain_configuration def search_index(self, query_string: str) -> List[Dict[str, Any]]: """ Pagination is not yet implemented. Only basic search queries are supported for now. """ things = [ thing for thing in self.things.values() if thing.matches(query_string) ] return [ t.to_dict( include_connectivity=True, include_thing_id=True, include_thing_group_names=True, include_shadows_as_json=True, ) for t in things ] def create_role_alias( self, role_alias_name: str, role_arn: str, credential_duration_seconds: int = 3600, ) -> FakeRoleAlias: if role_alias_name in self.role_aliases: current_role_alias = self.role_aliases[role_alias_name] raise ResourceAlreadyExistsException( f"RoleAlias cannot be created - already exists (name={role_alias_name})", current_role_alias.role_alias, current_role_alias.arn, ) new_role_alias = FakeRoleAlias( role_alias=role_alias_name, role_arn=role_arn, account_id=self.account_id, region_name=self.region_name, credential_duration_seconds=credential_duration_seconds, ) self.role_aliases[role_alias_name] = new_role_alias return new_role_alias def list_role_aliases(self) -> Iterable[FakeRoleAlias]: return self.role_aliases.values() def describe_role_alias(self, role_alias_name: str) -> FakeRoleAlias: if role_alias_name not in self.role_aliases: raise ResourceNotFoundException( f"RoleAlias not found (name= {role_alias_name})", ) return self.role_aliases[role_alias_name] def update_role_alias( self, role_alias_name: str, role_arn: Optional[str] = None, credential_duration_seconds: Optional[int] = None, ) -> FakeRoleAlias: role_alias = self.describe_role_alias(role_alias_name=role_alias_name) if role_arn: role_alias.role_arn = role_arn if credential_duration_seconds: role_alias.credential_duration_seconds = credential_duration_seconds return role_alias def delete_role_alias(self, role_alias_name: str) -> None: self.describe_role_alias(role_alias_name=role_alias_name) del self.role_aliases[role_alias_name] def get_indexing_configuration(self) -> Dict[str, Any]: return self.indexing_configuration.to_dict() def update_indexing_configuration( self, thingIndexingConfiguration: Dict[str, Any], thingGroupIndexingConfiguration: Dict[str, Any], ) -> None: self.indexing_configuration.update_configuration( thingIndexingConfiguration, thingGroupIndexingConfiguration ) def create_job_template( self, job_template_id: str, document_source: str, document: str, description: str, presigned_url_config: Dict[str, Any], job_executions_rollout_config: Dict[str, Any], abort_config: Dict[str, List[Dict[str, Any]]], job_execution_retry_config: Dict[str, Any], timeout_config: Dict[str, Any], ) -> "FakeJobTemplate": if job_template_id in self.jobs_templates: raise ConflictException(job_template_id) job_template = FakeJobTemplate( job_template_id=job_template_id, document_source=document_source, document=document, description=description, presigned_url_config=presigned_url_config, job_executions_rollout_config=job_executions_rollout_config, abort_config=abort_config, job_execution_retry_config=job_execution_retry_config, timeout_config=timeout_config, account_id=self.account_id, region_name=self.region_name, ) self.jobs_templates[job_template_id] = job_template return job_template @paginate(PAGINATION_MODEL) # type: ignore[misc] def list_job_templates(self) -> List[Dict[str, Union[str, float]]]: return [_.to_dict() for _ in self.jobs_templates.values()] def delete_job_template(self, job_template_id: str) -> None: if job_template_id not in self.jobs_templates: raise ResourceNotFoundException(f"Job template {job_template_id} not found") del self.jobs_templates[job_template_id] def describe_job_template(self, job_template_id: str) -> FakeJobTemplate: if job_template_id not in self.jobs_templates: raise ResourceNotFoundException(f"Job template {job_template_id} not found") return self.jobs_templates[job_template_id] iot_backends = BackendDict(IoTBackend, "iot")
Memory