import json import time from typing import Any, Dict, List, Optional, Tuple from moto.core.base_backend import BackendDict, BaseBackend from moto.core.common_models import BaseModel from moto.core.utils import merge_dicts from moto.iot.models import IoTBackend, iot_backends from .exceptions import ( ConflictException, InvalidRequestException, ResourceNotFoundException, ) class FakeShadow(BaseModel): """See the specification: http://docs.aws.amazon.com/iot/latest/developerguide/thing-shadow-document-syntax.html """ def __init__( self, desired: Optional[str], reported: Optional[str], requested_payload: Optional[Dict[str, Any]], version: int, deleted: bool = False, ): self.desired = desired self.reported = reported self.requested_payload = requested_payload self.version = version self.timestamp = int(time.time()) self.deleted = deleted self.metadata_desired = self._create_metadata_from_state( self.desired, self.timestamp ) self.metadata_reported = self._create_metadata_from_state( self.reported, self.timestamp ) @classmethod def create_from_previous_version( # type: ignore[misc] cls, previous_shadow: Optional["FakeShadow"], payload: Optional[Dict[str, Any]] ) -> "FakeShadow": """ set None to payload when you want to delete shadow """ version, previous_payload = ( (previous_shadow.version + 1, previous_shadow.to_dict(include_delta=False)) if previous_shadow else (1, {}) ) if payload is None: # if given payload is None, delete existing payload # this means the request was delete_thing_shadow shadow = FakeShadow(None, None, None, version, deleted=True) return shadow # Updates affect only the fields specified in the request state document. # Any field with a value of None is removed from the device's shadow. state_document = previous_payload.copy() merge_dicts(state_document, payload, remove_nulls=True) desired = state_document.get("state", {}).get("desired") reported = state_document.get("state", {}).get("reported") return FakeShadow(desired, reported, payload, version) @classmethod def parse_payload(cls, desired: Any, reported: Any) -> Any: # type: ignore[misc] if not desired and not reported: delta = None elif reported is None and desired: delta = desired elif desired and reported: delta = cls._compute_delta_dict(desired, reported) else: delta = None return delta @classmethod def _compute_delta_dict(cls, desired: Any, reported: Any) -> Dict[str, Any]: # type: ignore[misc] delta = {} for key, value in desired.items(): delta_value = cls._compute_delta(reported.get(key), value) if delta_value is not None: delta[key] = delta_value return delta @classmethod def _compute_delta(cls, reported_value: Any, desired_value: Any) -> Any: # type: ignore[misc] if reported_value == desired_value: return None if isinstance(desired_value, dict) and isinstance(reported_value, dict): return cls._compute_delta_dict(desired_value, reported_value) # Types are different, or # Both types are intrinsic values (str, int, etc), or # Both types are lists: # # Just return the desired value return desired_value def _create_metadata_from_state(self, state: Any, ts: Any) -> Any: """ state must be desired or reported stype dict object replaces primitive type with {"timestamp": ts} in dict """ if state is None: return None def _f(elem: Any, ts: Any) -> Any: if isinstance(elem, dict): return {_: _f(elem[_], ts) for _ in elem.keys()} if isinstance(elem, list): return [_f(_, ts) for _ in elem] return {"timestamp": ts} return _f(state, ts) def to_response_dict(self) -> Dict[str, Any]: desired = self.requested_payload["state"].get("desired", None) # type: ignore reported = self.requested_payload["state"].get("reported", None) # type: ignore payload = {} if desired is not None: payload["desired"] = desired if reported is not None: payload["reported"] = reported metadata = {} if desired is not None: metadata["desired"] = self._create_metadata_from_state( desired, self.timestamp ) if reported is not None: metadata["reported"] = self._create_metadata_from_state( reported, self.timestamp ) return { "state": payload, "metadata": metadata, "timestamp": self.timestamp, "version": self.version, } def to_dict(self, include_delta: bool = True) -> Dict[str, Any]: """returning nothing except for just top-level keys for now.""" if self.deleted: return {"timestamp": self.timestamp, "version": self.version} delta = self.parse_payload(self.desired, self.reported) payload = {} if self.desired: payload["desired"] = self.desired if self.reported: payload["reported"] = self.reported if include_delta and delta: payload["delta"] = delta metadata = {} if self.metadata_desired is not None: metadata["desired"] = self.metadata_desired if self.metadata_reported is not None: metadata["reported"] = self.metadata_reported return { "state": payload, "metadata": metadata, "timestamp": self.timestamp, "version": self.version, } class IoTDataPlaneBackend(BaseBackend): def __init__(self, region_name: str, account_id: str): super().__init__(region_name, account_id) self.published_payloads: List[Tuple[str, bytes]] = list() @property def iot_backend(self) -> IoTBackend: return iot_backends[self.account_id][self.region_name] def update_thing_shadow( self, thing_name: str, payload: str, shadow_name: Optional[str] ) -> FakeShadow: """ spec of payload: - need node `state` - state node must be an Object - State contains an invalid node: 'foo' """ thing = self.iot_backend.describe_thing(thing_name) # validate try: _payload = json.loads(payload) except ValueError: raise InvalidRequestException("invalid json") if "state" not in _payload: raise InvalidRequestException("need node `state`") if not isinstance(_payload["state"], dict): raise InvalidRequestException("state node must be an Object") if any(_ for _ in _payload["state"].keys() if _ not in ["desired", "reported"]): raise InvalidRequestException("State contains an invalid node") thing_shadow = thing.thing_shadows.get(shadow_name) if "version" in _payload and thing_shadow.version != _payload["version"]: # type: ignore raise ConflictException("Version conflict") new_shadow = FakeShadow.create_from_previous_version(thing_shadow, _payload) thing.thing_shadows[shadow_name] = new_shadow return new_shadow def get_thing_shadow( self, thing_name: str, shadow_name: Optional[str] ) -> FakeShadow: thing = self.iot_backend.describe_thing(thing_name) thing_shadow = thing.thing_shadows.get(shadow_name) if thing_shadow is None or thing_shadow.deleted: raise ResourceNotFoundException() return thing_shadow def delete_thing_shadow( self, thing_name: str, shadow_name: Optional[str] ) -> FakeShadow: thing = self.iot_backend.describe_thing(thing_name) thing_shadow = thing.thing_shadows.get(shadow_name) if thing_shadow is None: raise ResourceNotFoundException() payload = None new_shadow = FakeShadow.create_from_previous_version(thing_shadow, payload) thing.thing_shadows[shadow_name] = new_shadow return new_shadow def publish(self, topic: str, payload: bytes) -> None: self.published_payloads.append((topic, payload)) def list_named_shadows_for_thing(self, thing_name: str) -> List[str]: thing = self.iot_backend.describe_thing(thing_name) return [name for name in thing.thing_shadows.keys() if name is not None] iotdata_backends = BackendDict(IoTDataPlaneBackend, "iot-data")
Memory