"""Dataclasses modelling a virtual node and the nested structures of its spec.

Every class exposes a ``to_dict`` method (or, for the metadata class,
``formatted_for_*`` methods) that renders its fields as a plain dictionary.

Two helpers imported from elsewhere in the project are used throughout:

* ``MissingField()`` stands in for an optional nested object that is ``None``
  so that ``.to_dict()`` can still be called on it.
* ``clean_dict(...)`` post-processes the resulting dictionary.

NOTE(review): presumably ``clean_dict`` drops the placeholder/empty entries
produced by ``MissingField`` and ``None`` values -- confirm against
``moto.appmesh.utils.common`` and ``moto.appmesh.dataclasses.shared``.
"""

from dataclasses import asdict, dataclass, field
from typing import Any, Dict, List, Optional

from moto.appmesh.dataclasses.shared import (
    Duration,
    Metadata,
    MissingField,
    Status,
    Timeout,
)
from moto.appmesh.utils.common import clean_dict


@dataclass
class CertificateFile:
    """Certificate referenced by a certificate-chain value."""

    certificate_chain: str

    def to_dict(self) -> Dict[str, Any]:  # type: ignore[misc]
        """Return ``{"certificateChain": ...}``."""
        return {"certificateChain": self.certificate_chain}


@dataclass
class CertificateFileWithPrivateKey(CertificateFile):
    """Certificate file that additionally carries a private key."""

    private_key: str

    def to_dict(self) -> Dict[str, Any]:  # type: ignore[misc]
        """Return the certificate chain and private key under camelCase keys."""
        return {
            "certificateChain": self.certificate_chain,
            "privateKey": self.private_key,
        }


@dataclass
class SDS:
    """Certificate source identified by a secret name."""

    secret_name: str

    def to_dict(self) -> Dict[str, Any]:  # type: ignore[misc]
        """Return ``{"secretName": ...}``."""
        return {"secretName": self.secret_name}


@dataclass
class Certificate:
    """Client certificate given either as a file or as an SDS secret."""

    file: Optional[CertificateFileWithPrivateKey]
    sds: Optional[SDS]

    def to_dict(self) -> Dict[str, Any]:  # type: ignore[misc]
        """Serialize both optional sources and pass the result to ``clean_dict``."""
        return clean_dict(
            {
                "file": (self.file or MissingField()).to_dict(),
                "sds": (self.sds or MissingField()).to_dict(),
            }
        )


@dataclass
class ListenerCertificateACM:
    """Listener certificate identified by a certificate ARN."""

    certificate_arn: str

    def to_dict(self) -> Dict[str, Any]:  # type: ignore[misc]
        """Return ``{"certificateArn": ...}``."""
        return {"certificateArn": self.certificate_arn}


@dataclass
class TLSListenerCertificate(Certificate):
    """Listener certificate: ``Certificate`` plus an optional ACM source."""

    acm: Optional[ListenerCertificateACM]

    def to_dict(self) -> Dict[str, Any]:  # type: ignore[misc]
        """Serialize the acm, file and sds sources via ``clean_dict``."""
        return clean_dict(
            {
                "acm": (self.acm or MissingField()).to_dict(),
                "file": (self.file or MissingField()).to_dict(),
                "sds": (self.sds or MissingField()).to_dict(),
            }
        )


@dataclass
class Match:
    """List of exact-match strings."""

    exact: List[str]

    # ``asdict`` bound as a method: ``instance.to_dict()`` == ``asdict(instance)``.
    to_dict = asdict


@dataclass
class SubjectAlternativeNames:
    """Subject alternative names expressed as a ``Match``."""

    match: Match

    def to_dict(self) -> Dict[str, Any]:  # type: ignore[misc]
        """Return ``{"match": <serialized Match>}``."""
        return {"match": self.match.to_dict()}


@dataclass
class ACM:
    """Trust source given as a list of certificate authority ARNs."""

    certificate_authority_arns: List[str]

    def to_dict(self) -> Dict[str, Any]:  # type: ignore[misc]
        """Return ``{"certificateAuthorityArns": [...]}``."""
        return {"certificateAuthorityArns": self.certificate_authority_arns}


@dataclass
class Trust:
    """Trust given either as a certificate file or as an SDS secret."""

    file: Optional[CertificateFile]
    sds: Optional[SDS]

    def to_dict(self) -> Dict[str, Any]:  # type: ignore[misc]
        """Serialize both optional sources and pass the result to ``clean_dict``."""
        return clean_dict(
            {
                "file": (self.file or MissingField()).to_dict(),
                "sds": (self.sds or MissingField()).to_dict(),
            }
        )


@dataclass
class BackendTrust(Trust):
    """Backend trust: ``Trust`` plus an optional ACM source."""

    acm: Optional[ACM]

    def to_dict(self) -> Dict[str, Any]:  # type: ignore[misc]
        """Serialize the acm, file and sds sources via ``clean_dict``."""
        return clean_dict(
            {
                "acm": (self.acm or MissingField()).to_dict(),
                "file": (self.file or MissingField()).to_dict(),
                "sds": (self.sds or MissingField()).to_dict(),
            }
        )


@dataclass
class Validation:
    """Base validation settings holding optional subject alternative names."""

    subject_alternative_names: Optional[SubjectAlternativeNames]

    def to_dict(self) -> Dict[str, Any]:  # type: ignore[misc]
        """Serialize ``subjectAlternativeNames`` via ``clean_dict``."""
        return clean_dict(
            {
                "subjectAlternativeNames": (
                    self.subject_alternative_names or MissingField()
                ).to_dict()
            }
        )


@dataclass
class TLSBackendValidation(Validation):
    """Validation for a backend: adds a required ``BackendTrust``."""

    trust: BackendTrust

    def to_dict(self) -> Dict[str, Any]:  # type: ignore[misc]
        """Serialize ``subjectAlternativeNames`` and ``trust`` via ``clean_dict``."""
        return clean_dict(
            {
                "subjectAlternativeNames": (
                    self.subject_alternative_names or MissingField()
                ).to_dict(),
                "trust": self.trust.to_dict(),
            }
        )


@dataclass
class TLSListenerValidation(Validation):
    """Validation for a listener: adds a required ``Trust``."""

    trust: Trust

    def to_dict(self) -> Dict[str, Any]:  # type: ignore[misc]
        """Serialize ``subjectAlternativeNames`` and ``trust`` via ``clean_dict``."""
        return clean_dict(
            {
                "subjectAlternativeNames": (
                    self.subject_alternative_names or MissingField()
                ).to_dict(),
                "trust": self.trust.to_dict(),
            }
        )


@dataclass
class TLSClientPolicy:
    """TLS client policy: optional certificate/enforce/ports, required validation."""

    certificate: Optional[Certificate]
    enforce: Optional[bool]
    ports: Optional[List[int]]
    validation: TLSBackendValidation

    def to_dict(self) -> Dict[str, Any]:  # type: ignore[misc]
        """Serialize all four fields via ``clean_dict``."""
        return clean_dict(
            {
                "certificate": (self.certificate or MissingField()).to_dict(),
                "enforce": self.enforce,
                "ports": self.ports,
                "validation": self.validation.to_dict(),
            }
        )


@dataclass
class ClientPolicy:
    """Client policy wrapping an optional ``TLSClientPolicy``."""

    tls: Optional[TLSClientPolicy]

    def to_dict(self) -> Dict[str, Any]:  # type: ignore[misc]
        """Serialize ``tls`` via ``clean_dict``."""
        return clean_dict({"tls": (self.tls or MissingField()).to_dict()})


@dataclass
class BackendDefaults:
    """Backend defaults wrapping an optional ``ClientPolicy``."""

    client_policy: Optional[ClientPolicy]

    def to_dict(self) -> Dict[str, Any]:  # type: ignore[misc]
        """Serialize ``clientPolicy`` via ``clean_dict``."""
        return clean_dict(
            {"clientPolicy": (self.client_policy or MissingField()).to_dict()}
        )


@dataclass
class VirtualService:
    """Virtual service backend: a name plus an optional client policy."""

    client_policy: Optional[ClientPolicy]
    virtual_service_name: str

    def to_dict(self) -> Dict[str, Any]:  # type: ignore[misc]
        """Serialize ``clientPolicy`` and ``virtualServiceName`` via ``clean_dict``."""
        return clean_dict(
            {
                "clientPolicy": (self.client_policy or MissingField()).to_dict(),
                "virtualServiceName": self.virtual_service_name,
            }
        )


@dataclass
class Backend:
    """Backend wrapping an optional ``VirtualService``."""

    virtual_service: Optional[VirtualService]

    def to_dict(self) -> Dict[str, Any]:  # type: ignore[misc]
        """Serialize ``virtualService`` via ``clean_dict``."""
        return clean_dict(
            {"virtualService": (self.virtual_service or MissingField()).to_dict()}
        )


@dataclass
class HTTPConnection:
    """HTTP connection pool limits."""

    max_connections: int
    max_pending_requests: Optional[int]

    def to_dict(self) -> Dict[str, Any]:  # type: ignore[misc]
        """Serialize ``maxConnections`` and ``maxPendingRequests`` via ``clean_dict``."""
        return clean_dict(
            {
                "maxConnections": self.max_connections,
                "maxPendingRequests": self.max_pending_requests,
            }
        )


@dataclass
class GRPCOrHTTP2Connection:
    """Connection pool limit shared by the grpc and http2 pools."""

    max_requests: int

    def to_dict(self) -> Dict[str, Any]:  # type: ignore[misc]
        """Return ``{"maxRequests": ...}``."""
        return {"maxRequests": self.max_requests}


@dataclass
class TCPConnection:
    """TCP connection pool limit."""

    max_connections: int

    def to_dict(self) -> Dict[str, Any]:  # type: ignore[misc]
        """Return ``{"maxConnections": ...}``."""
        return {"maxConnections": self.max_connections}


@dataclass
class ConnectionPool:
    """Per-protocol connection pools; each protocol entry is optional."""

    grpc: Optional[GRPCOrHTTP2Connection]
    http: Optional[HTTPConnection]
    http2: Optional[GRPCOrHTTP2Connection]
    tcp: Optional[TCPConnection]

    def to_dict(self) -> Dict[str, Any]:  # type: ignore[misc]
        """Serialize the four protocol pools via ``clean_dict``."""
        return clean_dict(
            {
                "grpc": (self.grpc or MissingField()).to_dict(),
                "http": (self.http or MissingField()).to_dict(),
                "http2": (self.http2 or MissingField()).to_dict(),
                "tcp": (self.tcp or MissingField()).to_dict(),
            }
        )


@dataclass
class HealthCheck:
    """Listener health-check settings; ``path`` and ``port`` are optional."""

    healthy_threshold: int
    interval_millis: int
    path: Optional[str]
    port: Optional[int]
    protocol: str
    timeout_millis: int
    unhealthy_threshold: int

    def to_dict(self) -> Dict[str, Any]:  # type: ignore[misc]
        """Serialize every field under its camelCase key via ``clean_dict``."""
        return clean_dict(
            {
                "healthyThreshold": self.healthy_threshold,
                "intervalMillis": self.interval_millis,
                "path": self.path,
                "port": self.port,
                "protocol": self.protocol,
                "timeoutMillis": self.timeout_millis,
                "unhealthyThreshold": self.unhealthy_threshold,
            }
        )


@dataclass
class OutlierDetection:
    """Listener outlier-detection settings (all fields required)."""

    base_ejection_duration: Duration
    interval: Duration
    max_ejection_percent: int
    max_server_errors: int

    def to_dict(self) -> Dict[str, Any]:  # type: ignore[misc]
        """Serialize every field; the two durations via their own ``to_dict``."""
        return {
            "baseEjectionDuration": self.base_ejection_duration.to_dict(),
            "interval": self.interval.to_dict(),
            "maxEjectionPercent": self.max_ejection_percent,
            "maxServerErrors": self.max_server_errors,
        }


@dataclass
class PortMapping:
    """Port number and protocol of a listener."""

    port: int
    protocol: str

    # ``asdict`` bound as a method: ``instance.to_dict()`` == ``asdict(instance)``.
    to_dict = asdict


@dataclass
class TCPTimeout:
    """TCP timeout holding an idle ``Duration``."""

    idle: Duration

    def to_dict(self) -> Dict[str, Any]:  # type: ignore[misc]
        """Return ``{"idle": <serialized Duration>}``."""
        return {"idle": self.idle.to_dict()}


@dataclass
class ProtocolTimeouts:
    """Per-protocol timeouts; each protocol entry is optional."""

    grpc: Optional[Timeout]
    http: Optional[Timeout]
    http2: Optional[Timeout]
    tcp: Optional[TCPTimeout]

    def to_dict(self) -> Dict[str, Any]:  # type: ignore[misc]
        """Serialize the four protocol timeouts via ``clean_dict``."""
        return clean_dict(
            {
                "grpc": (self.grpc or MissingField()).to_dict(),
                "http": (self.http or MissingField()).to_dict(),
                "http2": (self.http2 or MissingField()).to_dict(),
                "tcp": (self.tcp or MissingField()).to_dict(),
            }
        )


@dataclass
class ListenerTLS:
    """Listener TLS settings: certificate and mode, with optional validation."""

    certificate: TLSListenerCertificate
    mode: str
    validation: Optional[TLSListenerValidation]

    def to_dict(self) -> Dict[str, Any]:  # type: ignore[misc]
        """Serialize ``certificate``, ``mode`` and ``validation`` via ``clean_dict``."""
        return clean_dict(
            {
                "certificate": self.certificate.to_dict(),
                "mode": self.mode,
                "validation": (self.validation or MissingField()).to_dict(),
            }
        )


@dataclass
class Listener:
    """Listener definition; only ``port_mapping`` is required."""

    connection_pool: Optional[ConnectionPool]
    health_check: Optional[HealthCheck]
    outlier_detection: Optional[OutlierDetection]
    port_mapping: PortMapping
    timeout: Optional[ProtocolTimeouts]
    tls: Optional[ListenerTLS]

    def to_dict(self) -> Dict[str, Any]:  # type: ignore[misc]
        """Serialize every nested section under its camelCase key via ``clean_dict``."""
        return clean_dict(
            {
                "connectionPool": (self.connection_pool or MissingField()).to_dict(),
                "healthCheck": (self.health_check or MissingField()).to_dict(),
                "outlierDetection": (
                    self.outlier_detection or MissingField()
                ).to_dict(),
                "portMapping": self.port_mapping.to_dict(),
                "timeout": (self.timeout or MissingField()).to_dict(),
                "tls": (self.tls or MissingField()).to_dict(),
            }
        )


@dataclass
class KeyValue:
    """Simple key/value pair."""

    key: str
    value: str

    # ``asdict`` bound as a method: ``instance.to_dict()`` == ``asdict(instance)``.
    to_dict = asdict


@dataclass
class LoggingFormat:
    """Logging format given as JSON key/value pairs and/or a text string."""

    json: Optional[List[KeyValue]]
    text: Optional[str]

    def to_dict(self) -> Dict[str, Any]:  # type: ignore[misc]
        """Serialize ``json`` (an empty list when ``None``) and ``text``."""
        return clean_dict(
            {"json": [pair.to_dict() for pair in self.json or []], "text": self.text}
        )


@dataclass
class AccessLogFile:
    """Access-log file: a path plus an optional logging format."""

    format: Optional[LoggingFormat]
    path: str

    def to_dict(self) -> Dict[str, Any]:  # type: ignore[misc]
        """Serialize ``format`` and ``path`` via ``clean_dict``."""
        return clean_dict(
            {"format": (self.format or MissingField()).to_dict(), "path": self.path}
        )


@dataclass
class AccessLog:
    """Access log wrapping an optional ``AccessLogFile``."""

    file: Optional[AccessLogFile]

    def to_dict(self) -> Dict[str, Any]:  # type: ignore[misc]
        """Serialize ``file`` via ``clean_dict``."""
        return clean_dict({"file": (self.file or MissingField()).to_dict()})


@dataclass
class Logging:
    """Logging settings wrapping an optional ``AccessLog``."""

    access_log: Optional[AccessLog]

    def to_dict(self) -> Dict[str, Any]:  # type: ignore[misc]
        """Serialize ``accessLog`` via ``clean_dict``."""
        return clean_dict({"accessLog": (self.access_log or MissingField()).to_dict()})


@dataclass
class AWSCloudMap:
    """Cloud Map service discovery: namespace and service name plus options."""

    attributes: Optional[List[KeyValue]]
    ip_preference: Optional[str]
    namespace_name: str
    service_name: str

    def to_dict(self) -> Dict[str, Any]:  # type: ignore[misc]
        """Serialize all fields; ``attributes`` becomes an empty list when ``None``."""
        return clean_dict(
            {
                "attributes": [
                    attribute.to_dict() for attribute in self.attributes or []
                ],
                "ipPreference": self.ip_preference,
                "namespaceName": self.namespace_name,
                "serviceName": self.service_name,
            }
        )


@dataclass
class DNS:
    """DNS service discovery: a hostname plus optional preferences."""

    hostname: str
    ip_preference: Optional[str]
    response_type: Optional[str]

    def to_dict(self) -> Dict[str, Any]:  # type: ignore[misc]
        """Serialize ``hostname``, ``ipPreference`` and ``responseType``."""
        return clean_dict(
            {
                "hostname": self.hostname,
                "ipPreference": self.ip_preference,
                "responseType": self.response_type,
            }
        )


@dataclass
class ServiceDiscovery:
    """Service discovery given as Cloud Map and/or DNS settings."""

    aws_cloud_map: Optional[AWSCloudMap]
    dns: Optional[DNS]

    def to_dict(self) -> Dict[str, Any]:  # type: ignore[misc]
        """Serialize ``awsCloudMap`` and ``dns`` via ``clean_dict``."""
        return clean_dict(
            {
                "awsCloudMap": (self.aws_cloud_map or MissingField()).to_dict(),
                "dns": (self.dns or MissingField()).to_dict(),
            }
        )


@dataclass
class VirtualNodeSpec:
    """Specification of a virtual node; every section is optional."""

    backend_defaults: Optional[BackendDefaults]
    backends: Optional[List[Backend]]
    listeners: Optional[List[Listener]]
    logging: Optional[Logging]
    service_discovery: Optional[ServiceDiscovery]

    def to_dict(self) -> Dict[str, Any]:  # type: ignore[misc]
        """Serialize every section; list sections become ``[]`` when ``None``."""
        return clean_dict(
            {
                "backendDefaults": (self.backend_defaults or MissingField()).to_dict(),
                "backends": [backend.to_dict() for backend in self.backends or []],
                "listeners": [listener.to_dict() for listener in self.listeners or []],
                "logging": (self.logging or MissingField()).to_dict(),
                "serviceDiscovery": (
                    self.service_discovery or MissingField()
                ).to_dict(),
            }
        )


@dataclass
class VirtualNodeMetadata(Metadata):
    """``Metadata`` extended with the mesh name and virtual node name.

    Both added fields default to ``""`` at the dataclass level and are then
    rejected in ``__post_init__`` when left empty.
    NOTE(review): presumably the defaults exist only because ``Metadata``
    already declares defaulted fields -- confirm against the base class.
    """

    mesh_name: str = field(default="")
    virtual_node_name: str = field(default="")

    def __post_init__(self) -> None:
        """Reject empty ``mesh_name``, ``mesh_owner`` or ``virtual_node_name``.

        Raises:
            TypeError: if any of the three fields is the empty string. The
                message names the offending field.
        """
        if self.mesh_name == "":
            raise TypeError("__init__ missing 1 required argument: 'mesh_name'")
        if self.mesh_owner == "":
            # The message previously named 'route_name', a field this class
            # does not have; report the field that is actually being checked.
            raise TypeError("__init__ missing 1 required argument: 'mesh_owner'")
        if self.virtual_node_name == "":
            raise TypeError("__init__ missing 1 required argument: 'virtual_node_name'")

    def formatted_for_list_api(self) -> Dict[str, Any]:  # type: ignore
        """Return the metadata dictionary used for list-style responses.

        Includes the mesh and virtual node names; omits ``uid``. Timestamps
        are rendered with the ``"%d/%m/%Y, %H:%M:%S"`` format.
        """
        return {
            "arn": self.arn,
            "createdAt": self.created_at.strftime("%d/%m/%Y, %H:%M:%S"),
            "lastUpdatedAt": self.last_updated_at.strftime("%d/%m/%Y, %H:%M:%S"),
            "meshName": self.mesh_name,
            "meshOwner": self.mesh_owner,
            "resourceOwner": self.resource_owner,
            "version": self.version,
            "virtualNodeName": self.virtual_node_name,
        }

    def formatted_for_crud_apis(self) -> Dict[str, Any]:  # type: ignore
        """Return the metadata dictionary used for create/read/update/delete.

        Includes ``uid``; omits the mesh and virtual node names. Timestamps
        are rendered with the ``"%d/%m/%Y, %H:%M:%S"`` format.
        """
        return {
            "arn": self.arn,
            "createdAt": self.created_at.strftime("%d/%m/%Y, %H:%M:%S"),
            "lastUpdatedAt": self.last_updated_at.strftime("%d/%m/%Y, %H:%M:%S"),
            "meshOwner": self.mesh_owner,
            "resourceOwner": self.resource_owner,
            "uid": self.uid,
            "version": self.version,
        }


@dataclass
class VirtualNode:
    """A virtual node: identifying names, metadata, spec, status and tags."""

    mesh_name: str
    mesh_owner: str
    metadata: VirtualNodeMetadata
    spec: VirtualNodeSpec
    virtual_node_name: str
    # A fresh ``{"status": "ACTIVE"}`` dict is built for each instance.
    status: Status = field(default_factory=lambda: {"status": "ACTIVE"})
    tags: List[Dict[str, str]] = field(default_factory=list)

    def to_dict(self) -> Dict[str, Any]:  # type: ignore[misc]
        """Serialize the node via ``clean_dict``.

        The output carries ``meshName``, ``metadata`` (CRUD formatting),
        ``spec``, ``status`` and ``virtualNodeName``; ``mesh_owner`` and
        ``tags`` are not part of it.
        """
        return clean_dict(
            {
                "meshName": self.mesh_name,
                "metadata": self.metadata.formatted_for_crud_apis(),
                "spec": self.spec.to_dict(),
                "status": self.status,
                "virtualNodeName": self.virtual_node_name,
            }
        )
Memory