import base64
import json
import uuid
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Union
from dateutil.tz import tzutc
from moto.core.base_backend import BackendDict, BaseBackend
from moto.core.common_models import BaseModel
from moto.moto_api._internal.managed_state_model import ManagedState
from moto.panorama.utils import (
arn_formatter,
deep_convert_datetime_to_isoformat,
generate_package_id,
hash_name,
)
from moto.utilities.paginator import paginate
from .exceptions import (
ValidationError,
)
PAGINATION_MODEL = {
"list_devices": {
"input_token": "next_token",
"limit_key": "max_results",
"limit_default": 123,
"unique_attribute": "device_id",
},
"list_nodes": {
"input_token": "next_token",
"limit_key": "max_results",
"limit_default": 123,
"unique_attribute": "package_id",
},
"list_application_instances": {
"input_token": "next_token",
"limit_key": "max_results",
"limit_default": 123,
"unique_attribute": "application_instance_id",
},
}
class BaseObject(BaseModel):
def camelCase(self, key: str) -> str:
words = []
for word in key.split("_"):
words.append(word.title())
return "".join(words)
def update(self, details_json: str) -> None:
details = json.loads(details_json)
for k in details.keys():
setattr(self, k, details[k])
def gen_response_object(self) -> Dict[str, Any]:
response_object: Dict[str, Any] = dict()
for key, value in self.__dict__.items():
if "_" in key:
response_object[self.camelCase(key)] = value
else:
response_object[key[0].upper() + key[1:]] = value
return response_object
def response_object(self) -> Dict[str, Any]:
return self.gen_response_object()
class Device(BaseObject):
def __init__(
self,
account_id: str,
region_name: str,
description: Optional[str],
name: str,
network_configuration: Optional[Dict[str, Any]],
tags: Optional[Dict[str, str]],
) -> None:
# ManagedState is a class that helps us manage the state of a resource.
# A Panorama Device has a lot of different states that has their own lifecycle.
# To make all this states in the same object, and avoid changing ManagedState,
# we use a ManagedState for each state and we manage them in the Device class.
# Each ManagedState has a name composed with attribute name and device name to make subscription easier.
self.__device_aggregated_status_manager = ManagedState(
model_name=f"panorama::device_{name}_aggregated_status",
transitions=[
("NOT-A-STATUS", "AWAITING_PROVISIONING"),
("AWAITING_PROVISIONING", "PENDING"),
("PENDING", "ONLINE"),
],
)
self.__device_provisioning_status_manager = ManagedState(
model_name=f"panorama::device_{name}_provisioning_status",
transitions=[
("NOT-A-STATUS", "AWAITING_PROVISIONING"),
("AWAITING_PROVISIONING", "PENDING"),
("PENDING", "SUCCEEDED"),
],
)
self.account_id = account_id
self.region_name = region_name
self.description = description
self.name = name
self.network_configuration = network_configuration
self.tags = tags
self.certificates = base64.b64encode("certificate".encode("utf-8")).decode(
"utf-8"
)
self.arn = arn_formatter("device", self.name, self.account_id, self.region_name)
self.device_id = f"device-{hash_name(name)}"
self.iot_thing_name = ""
self.alternate_softwares = [
{"Version": "0.2.1"},
]
self.brand: str = "AWS_PANORAMA" # AWS_PANORAMA | LENOVO
self.created_time = datetime.now(tzutc())
self.last_updated_time = datetime.now(tzutc())
self.current_networking_status = {
"Ethernet0Status": {
"ConnectionStatus": "CONNECTED",
"HwAddress": "8C:0F:5F:60:F5:C4",
"IpAddress": "192.168.1.300/24",
},
"Ethernet1Status": {
"ConnectionStatus": "NOT_CONNECTED",
"HwAddress": "8C:0F:6F:60:F4:F1",
"IpAddress": "--",
},
"LastUpdatedTime": datetime.now(tzutc()),
"NtpStatus": {
"ConnectionStatus": "CONNECTED",
"IpAddress": "91.224.149.41:123",
"NtpServerName": "0.pool.ntp.org",
},
}
self.current_software = "6.2.1"
self.device_connection_status: str = "ONLINE" # "ONLINE"|"OFFLINE"|"AWAITING_CREDENTIALS"|"NOT_AVAILABLE"|"ERROR"
self.latest_device_job = {"JobType": "REBOOT", "Status": "COMPLETED"}
self.latest_software = "6.2.1"
self.lease_expiration_time = datetime.now(tzutc()) + timedelta(days=5)
self.serial_number = "GAD81E29013274749"
self.type: str = "PANORAMA_APPLIANCE" # "PANORAMA_APPLIANCE_DEVELOPER_KIT", "PANORAMA_APPLIANCE"
@property
def device_aggregated_status(self) -> str:
self.__device_aggregated_status_manager.advance()
return self.__device_aggregated_status_manager.status # type: ignore[return-value]
@property
def provisioning_status(self) -> str:
self.__device_provisioning_status_manager.advance()
return self.__device_provisioning_status_manager.status # type: ignore[return-value]
def response_object(self) -> Dict[str, Any]:
response_object = super().gen_response_object()
response_object = deep_convert_datetime_to_isoformat(response_object)
static_response_fields = [
"AlternateSoftwares",
"Arn",
"Brand",
"CreatedTime",
"CurrentNetworkingStatus",
"CurrentSoftware",
"Description",
"DeviceConnectionStatus",
"DeviceId",
"LatestAlternateSoftware",
"LatestDeviceJob",
"LatestSoftware",
"LeaseExpirationTime",
"Name",
"NetworkConfiguration",
"SerialNumber",
"Tags",
"Type",
]
return {
**{
k: v
for k, v in response_object.items()
if v is not None and k in static_response_fields
},
**{
"DeviceAggregatedStatus": self.device_aggregated_status,
"ProvisioningStatus": self.provisioning_status,
},
}
def response_listed(self) -> Dict[str, Any]:
response_object = super().gen_response_object()
response_object = deep_convert_datetime_to_isoformat(response_object)
static_response_fields = [
"Brand",
"CreatedTime",
"CurrentSoftware",
"Description",
"DeviceId",
"LastUpdatedTime",
"LatestDeviceJob",
"LeaseExpirationTime",
"Name",
"Tags",
"Type",
]
return {
**{
k: v
for k, v in response_object.items()
if v is not None and k in static_response_fields
},
**{
"DeviceAggregatedStatus": self.device_aggregated_status,
"ProvisioningStatus": self.provisioning_status,
},
}
@property
def response_provision(self) -> Dict[str, Union[str, bytes]]:
return {
"Arn": self.arn,
"Certificates": self.certificates,
"DeviceId": self.device_id,
"IotThingName": self.iot_thing_name,
"Status": self.provisioning_status,
}
@property
def response_updated(self) -> Dict[str, str]:
return {"DeviceId": self.device_id}
@property
def response_deleted(self) -> Dict[str, str]:
return {"DeviceId": self.device_id}
class Package(BaseObject):
def __init__(
self,
category: str,
description: str,
name: str,
account_id: str,
region_name: str,
package_name: str,
package_version: str,
):
self.category = category
self.description = description
self.name = name
now = datetime.now(tzutc())
self.created_time = now
self.last_updated_time = now
self.package_name = package_name
self.patch_version = generate_package_id(self.package_name)
self.package_version = package_version
self.output_package_name = f"{self.package_name}-{self.package_version}-{self.patch_version[:8]}-{self.name}"
self.owner_account = account_id
self.package_id = f"package-{hash_name(package_name)}"
self.package_arn = arn_formatter(
"package", self.package_id, account_id, region_name
)
def response_object(self) -> Dict[str, Any]:
response_object = super().gen_response_object()
response_object = deep_convert_datetime_to_isoformat(response_object)
return response_object
def response_listed(self) -> Dict[str, Any]:
package_response = self.response_object()
return package_response
class ApplicationInstance(BaseObject):
def __init__(
self,
account_id: str,
region_name: str,
default_runtime_context_device: str,
default_runtime_context_device_name: str,
description: str,
manifest_overrides_payload: Dict[str, str],
manifest_payload: Dict[str, str],
name: str,
runtime_role_arn: str,
tags: Dict[str, str],
) -> None:
self.default_runtime_context_device = default_runtime_context_device
self.default_runtime_context_device_name = default_runtime_context_device_name
self.description = description
self.manifest_overrides_payload = manifest_overrides_payload
self.manifest_payload = manifest_payload
self.name = name
self.runtime_role_arn = runtime_role_arn
self.tags = tags
now = datetime.now(tzutc())
self.created_time = now
self.last_updated_time = now
name = f"{self.name}-{self.created_time}"
self.application_instance_id = f"applicationInstance-{hash_name(name).lower()}"
self.arn = arn_formatter(
"application-instance",
self.application_instance_id,
account_id=account_id,
region_name=region_name,
)
self.health_status = "RUNNING"
self.status = "DEPLOYMENT_SUCCEEDED"
self.status_description = "string"
self.runtime_context_states = [
{
"DesiredState": "RUNNING",
"DeviceReportedStatus": "RUNNING",
"DeviceReportedTime": now,
"RuntimeContextName": "string",
},
]
def add_new_runtime_context_states(
self, desired_state: str, device_reported_status: str
) -> None:
now = datetime.now(tzutc())
self.runtime_context_states.append(
{
"DesiredState": desired_state,
"DeviceReportedStatus": device_reported_status,
"DeviceReportedTime": now,
"RuntimeContextName": "string",
}
)
def response_object(self) -> Dict[str, Any]:
response_object = super().gen_response_object()
response_object = deep_convert_datetime_to_isoformat(response_object)
return response_object
def response_listed(self) -> Dict[str, Any]:
package_response = self.response_object()
return package_response
def response_created(self) -> Dict[str, str]:
return {"ApplicationInstanceId": self.application_instance_id}
def response_describe(self) -> Dict[str, str]:
response_object = self.response_object()
return response_object
class Node(BaseObject):
def __init__(
self,
job_id: str,
job_tags: List[Dict[str, Union[str, Dict[str, str]]]],
node_description: str,
node_name: str,
output_package_name: str,
output_package_version: str,
template_parameters: Dict[str, str],
template_type: str,
) -> None:
self.job_id = job_id
now = datetime.now(tzutc())
self.created_time = now
self.last_updated_time = now
self.job_tags = job_tags
self.node_description = node_name
self.status = "PENDING"
self.node_name = node_name
self.output_package_name = output_package_name
self.output_package_version = output_package_version
self.template_parameters = self.protect_secrets(template_parameters)
self.template_type = template_type
def response_object(self) -> Dict[str, Any]:
response_object = super().gen_response_object()
response_object = deep_convert_datetime_to_isoformat(response_object)
return response_object
def response_created(self) -> Dict[str, str]:
return {"JobId": self.job_id}
def response_described(self) -> Dict[str, Any]:
return self.response_object()
@staticmethod
def protect_secrets(template_parameters: Dict[str, str]) -> Dict[str, str]:
for key in template_parameters.keys():
if key.lower() == "password":
template_parameters[key] = "SAVED_AS_SECRET"
if key.lower() == "username":
template_parameters[key] = "SAVED_AS_SECRET"
return template_parameters
class PanoramaBackend(BaseBackend):
def __init__(self, region_name: str, account_id: str):
super().__init__(region_name, account_id)
self.devices_memory: Dict[str, Device] = {}
self.node_from_template_memory: Dict[str, Node] = {}
self.nodes_memory: Dict[str, Package] = {}
self.application_instances_memory: Dict[str, ApplicationInstance] = {}
def provision_device(
self,
description: Optional[str],
name: str,
networking_configuration: Optional[Dict[str, Any]],
tags: Optional[Dict[str, str]],
) -> Device:
device_obj = Device(
account_id=self.account_id,
region_name=self.region_name,
description=description,
name=name,
network_configuration=networking_configuration,
tags=tags,
)
self.devices_memory[device_obj.device_id] = device_obj
return device_obj
def describe_device(self, device_id: str) -> Device:
device = self.devices_memory.get(device_id)
if device is None:
raise ValidationError(f"Device {device_id} not found")
return device
@paginate(pagination_model=PAGINATION_MODEL)
def list_devices(
self,
device_aggregated_status_filter: str,
name_filter: str,
sort_by: str, # "DEVICE_ID", "CREATED_TIME", "NAME", "DEVICE_AGGREGATED_STATUS"
sort_order: str, # "ASCENDING", "DESCENDING"
) -> List[Device]:
devices_list = list(
filter(
lambda x: (name_filter is None or x.name.startswith(name_filter))
and (
device_aggregated_status_filter is None
or x.device_aggregated_status == device_aggregated_status_filter
),
self.devices_memory.values(),
)
)
devices_list = list(
sorted(
devices_list,
key={
"DEVICE_ID": lambda x: x.device_id,
"CREATED_TIME": lambda x: x.created_time,
"NAME": lambda x: x.name,
"DEVICE_AGGREGATED_STATUS": lambda x: x.device_aggregated_status,
None: lambda x: x.created_time,
}[sort_by],
reverse=sort_order == "DESCENDING",
)
)
return devices_list
def update_device_metadata(self, device_id: str, description: str) -> Device:
self.devices_memory[device_id].description = description
return self.devices_memory[device_id]
def delete_device(self, device_id: str) -> Device:
return self.devices_memory.pop(device_id)
def create_node_from_template_job(
self,
job_tags: List[Dict[str, Union[str, Dict[str, str]]]],
node_description: str,
node_name: str,
output_package_name: str,
output_package_version: str,
template_parameters: Dict[str, str],
template_type: str,
) -> Node:
job_id = str(uuid.uuid4()).lower()
self.node_from_template_memory[job_id] = Node(
job_id=job_id,
job_tags=job_tags,
node_description=node_description,
node_name=node_name,
output_package_name=output_package_name,
output_package_version=output_package_version,
template_parameters=template_parameters,
template_type=template_type,
)
if template_type == "RTSP_CAMERA_STREAM":
package = Package(
category="MEDIA_SOURCE",
description=node_description,
name=node_name,
account_id=self.account_id,
region_name=self.region_name,
package_name=output_package_name,
package_version=output_package_version,
)
self.nodes_memory[package.package_id] = package
return self.node_from_template_memory[job_id]
def describe_node_from_template_job(self, job_id: str) -> Node:
return self.node_from_template_memory[job_id]
@paginate(pagination_model=PAGINATION_MODEL)
def list_nodes(self, category: str) -> List[Package]:
category_nodes = list(
filter(
lambda x: x.category == category,
self.nodes_memory.values(),
)
)
return category_nodes
def create_application_instance(
self,
application_instance_id_to_replace: Optional[str],
default_runtime_context_device: str,
description: str,
manifest_overrides_payload: Dict[str, str],
manifest_payload: Dict[str, str],
name: str,
runtime_role_arn: str,
tags: Dict[str, str],
) -> ApplicationInstance:
device = self.devices_memory.get(default_runtime_context_device)
if device is None:
raise ValidationError(f"Device {default_runtime_context_device} not found")
if (
application_instance_id_to_replace
and application_instance_id_to_replace in self.application_instances_memory
):
removed_application_instance = self.application_instances_memory[
application_instance_id_to_replace
]
removed_application_instance.status = "REMOVAL_SUCCEEDED"
removed_application_instance.add_new_runtime_context_states(
desired_state="REMOVED", device_reported_status="REMOVAL_IN_PROGRESS"
)
application_instance = ApplicationInstance(
account_id=self.account_id,
region_name=self.region_name,
default_runtime_context_device=default_runtime_context_device,
default_runtime_context_device_name=device.name,
description=description,
manifest_overrides_payload=manifest_overrides_payload,
manifest_payload=manifest_payload,
name=name,
runtime_role_arn=runtime_role_arn,
tags=tags,
)
self.application_instances_memory[
application_instance.application_instance_id
] = application_instance
return application_instance
def describe_application_instance(
self, application_instance_id: str
) -> ApplicationInstance:
application_instance = self.application_instances_memory[
application_instance_id
]
return application_instance
@paginate(pagination_model=PAGINATION_MODEL)
def list_application_instances(
self,
device_id: Optional[str],
status_filter: Optional[str],
) -> List[ApplicationInstance]:
filtered_application_instances = filter(
lambda x: x.status == status_filter if status_filter else True,
filter(
lambda x: x.default_runtime_context_device == device_id
if device_id
else True,
self.application_instances_memory.values(),
),
)
return list(filtered_application_instances)
panorama_backends = BackendDict(
PanoramaBackend,
"panorama",
False,
additional_regions=[
"us-east-1",
"us-west-2",
"ca-central-1",
"eu-west-1",
"ap-southeast-2",
"ap-southeast-1",
],
)