from collections import OrderedDict
from typing import Any, Dict, List, Optional

from moto.core.base_backend import BackendDict, BaseBackend
from moto.core.common_models import BaseModel
from moto.mediaconnect.exceptions import NotFoundException
from moto.moto_api._internal import mock_random as random
from moto.utilities.tagging_service import TaggingService
from moto.utilities.utils import get_partition


class Flow(BaseModel):
    """In-memory representation of a single MediaConnect flow."""

    def __init__(self, account_id: str, region_name: str, **kwargs: Any):
        """Build a flow from the keyword arguments supplied by the backend.

        Recognised keys: ``availability_zone``, ``entitlements``, ``name``,
        ``outputs``, ``source``, ``source_failover_config``, ``sources``,
        ``vpc_interfaces`` and ``maintenance``. The defaults below only apply
        when a key is absent; an explicit ``None`` is stored as-is.
        """
        self.id = random.uuid4().hex
        self.availability_zone = kwargs.get("availability_zone")
        self.entitlements = kwargs.get("entitlements", [])
        self.name = kwargs.get("name")
        self.outputs = kwargs.get("outputs", [])
        self.source = kwargs.get("source", {})
        self.source_failover_config = kwargs.get("source_failover_config", {})
        self.sources = kwargs.get("sources", [])
        self.vpc_interfaces = kwargs.get("vpc_interfaces", [])
        # one of 'STANDBY'|'ACTIVE'|'UPDATING'|'DELETING'|'STARTING'|'STOPPING'|'ERROR'
        self.status: Optional[str] = "STANDBY"
        # Status to fall back to once an 'UPDATING' state is resolved.
        self._previous_status: Optional[str] = None
        self.description = "A Moto test flow"
        self.flow_arn = f"arn:{get_partition(region_name)}:mediaconnect:{region_name}:{account_id}:flow:{self.id}:{self.name}"
        self.egress_ip = "127.0.0.1"
        self.maintenance = kwargs.get("maintenance", {})

        # A lone ``source`` doubles as the only entry of ``sources``.
        if self.source and not self.sources:
            self.sources = [
                self.source,
            ]

    def to_dict(self, include: Optional[List[str]] = None) -> Dict[str, Any]:
        """Serialise the flow to a camelCase dictionary.

        :param include: optional list of keys to keep. When given (and
            non-empty) only those keys are returned; ``"sourceType"`` is not a
            stored attribute and is emitted as ``"OWNED"`` when requested.
        :return: the full dictionary, or the filtered subset.
        """
        data = {
            "availabilityZone": self.availability_zone,
            "description": self.description,
            "egressIp": self.egress_ip,
            "entitlements": self.entitlements,
            "flowArn": self.flow_arn,
            "name": self.name,
            "outputs": self.outputs,
            "source": self.source,
            "sourceFailoverConfig": self.source_failover_config,
            "sources": self.sources,
            "status": self.status,
            "vpcInterfaces": self.vpc_interfaces,
        }
        if self.maintenance:
            data["maintenance"] = self.maintenance

        if include:
            new_data = {k: v for k, v in data.items() if k in include}
            if "sourceType" in include:
                new_data["sourceType"] = "OWNED"
            return new_data
        return data

    def resolve_transient_states(self) -> None:
        """Advance a transient status to the state it settles in.

        'STARTING' becomes 'ACTIVE', 'STOPPING' becomes 'STANDBY', and
        'UPDATING' reverts to the remembered previous status (which is then
        cleared). Any other status is left untouched.
        """
        if self.status == "STARTING":
            self.status = "ACTIVE"
        elif self.status == "STOPPING":
            self.status = "STANDBY"
        elif self.status == "UPDATING":
            self.status = self._previous_status
            self._previous_status = None


class MediaConnectBackend(BaseBackend):
    """Per-account, per-region store of MediaConnect flows and their tags."""

    def __init__(self, region_name: str, account_id: str):
        super().__init__(region_name, account_id)
        # Keyed by flow ARN.
        self._flows: Dict[str, Flow] = OrderedDict()
        self.tagger = TaggingService()

    def _get_flow(self, flow_arn: str, message: Optional[str] = None) -> Flow:
        """Return the flow stored under ``flow_arn``.

        :param message: error text to use when the flow is unknown; defaults
            to ``flow with arn=<flow_arn> not found``.
        :raises NotFoundException: if no flow is stored under ``flow_arn``.
        """
        try:
            return self._flows[flow_arn]
        except KeyError:
            raise NotFoundException(
                message=message or f"flow with arn={flow_arn} not found"
            ) from None

    def _add_source_details(
        self,
        source: Optional[Dict[str, Any]],
        flow_id: str,
        ingest_ip: str = "127.0.0.1",
    ) -> None:
        """Stamp ``sourceArn`` (and ``ingestIp``) onto ``source`` in place.

        Nothing happens for an empty/``None`` source. ``ingestIp`` is only
        set when the source carries no ``entitlementArn``.
        """
        if source:
            source["sourceArn"] = (
                f"arn:{get_partition(self.region_name)}:mediaconnect:{self.region_name}:{self.account_id}:source"
                f":{flow_id}:{source['name']}"
            )
            if not source.get("entitlementArn"):
                source["ingestIp"] = ingest_ip

    def _add_entitlement_details(
        self, entitlement: Optional[Dict[str, Any]], entitlement_id: str
    ) -> None:
        """Stamp ``entitlementArn`` onto ``entitlement`` in place (no-op if empty)."""
        if entitlement:
            entitlement["entitlementArn"] = (
                f"arn:{get_partition(self.region_name)}:mediaconnect:{self.region_name}"
                f":{self.account_id}:entitlement:{entitlement_id}"
                f":{entitlement['name']}"
            )

    def _create_flow_add_details(self, flow: Flow) -> None:
        """Fill in the generated fields of a new flow's sources, outputs and entitlements.

        ``create_flow`` always passes these collections explicitly, so any of
        them may be ``None``; each loop treats ``None`` as empty.
        """
        for index, _source in enumerate(flow.sources or []):
            self._add_source_details(_source, flow.id, f"127.0.0.{index}")

        for index, output in enumerate(flow.outputs or []):
            if output.get("protocol") in ["srt-listener", "zixi-pull"]:
                output["listenerAddress"] = f"{index}.0.0.0"
            output_id = random.uuid4().hex
            arn = (
                f"arn:{get_partition(self.region_name)}:mediaconnect:{self.region_name}"
                f":{self.account_id}:output:{output_id}:{output['name']}"
            )
            output["outputArn"] = arn

        for entitlement in flow.entitlements or []:
            entitlement_id = random.uuid4().hex
            self._add_entitlement_details(entitlement, entitlement_id)

    def create_flow(
        self,
        availability_zone: str,
        entitlements: List[Dict[str, Any]],
        name: str,
        outputs: List[Dict[str, Any]],
        source: Dict[str, Any],
        source_failover_config: Dict[str, Any],
        sources: List[Dict[str, Any]],
        vpc_interfaces: List[Dict[str, Any]],
        maintenance: Optional[List[Dict[str, Any]]] = None,
    ) -> Flow:
        """Create a flow, populate its generated details and store it by ARN."""
        flow = Flow(
            account_id=self.account_id,
            region_name=self.region_name,
            availability_zone=availability_zone,
            entitlements=entitlements,
            name=name,
            outputs=outputs,
            source=source,
            source_failover_config=source_failover_config,
            sources=sources,
            vpc_interfaces=vpc_interfaces,
            maintenance=maintenance,
        )
        self._create_flow_add_details(flow)
        self._flows[flow.flow_arn] = flow
        return flow

    def list_flows(self, max_results: Optional[int]) -> List[Dict[str, Any]]:
        """
        Pagination is not yet implemented
        """
        flows = list(self._flows.values())
        if max_results is not None:
            flows = flows[:max_results]
        return [
            fl.to_dict(
                include=[
                    "availabilityZone",
                    "description",
                    "flowArn",
                    "name",
                    "sourceType",
                    "status",
                ]
            )
            for fl in flows
        ]

    def describe_flow(self, flow_arn: str) -> Flow:
        """Return the flow, first settling any transient status.

        :raises NotFoundException: if the flow does not exist.
        """
        flow = self._get_flow(flow_arn, message="Flow not found.")
        flow.resolve_transient_states()
        return flow

    def delete_flow(self, flow_arn: str) -> Flow:
        """Remove and return the flow.

        :raises NotFoundException: if the flow does not exist.
        """
        self._get_flow(flow_arn, message="Flow not found.")
        return self._flows.pop(flow_arn)

    def start_flow(self, flow_arn: str) -> Flow:
        """Mark the flow as 'STARTING' and return it.

        :raises NotFoundException: if the flow does not exist.
        """
        flow = self._get_flow(flow_arn, message="Flow not found.")
        flow.status = "STARTING"
        return flow

    def stop_flow(self, flow_arn: str) -> Flow:
        """Mark the flow as 'STOPPING' and return it.

        :raises NotFoundException: if the flow does not exist.
        """
        flow = self._get_flow(flow_arn, message="Flow not found.")
        flow.status = "STOPPING"
        return flow

    def tag_resource(self, resource_arn: str, tags: Dict[str, Any]) -> None:
        """Attach ``tags`` to ``resource_arn`` via the tagging service."""
        tag_list = TaggingService.convert_dict_to_tags_input(tags)
        self.tagger.tag_resource(resource_arn, tag_list)

    def list_tags_for_resource(self, resource_arn: str) -> Dict[str, str]:
        """Return the tags recorded for ``resource_arn``.

        :raises NotFoundException: if the tagging service reports no tags for
            the resource.
        """
        if self.tagger.has_tags(resource_arn):
            return self.tagger.get_tag_dict_for_resource(resource_arn)
        raise NotFoundException(message="Resource not found.")

    def add_flow_vpc_interfaces(
        self, flow_arn: str, vpc_interfaces: List[Dict[str, Any]]
    ) -> Flow:
        """Set the flow's VPC interfaces to ``vpc_interfaces`` (replacing any existing).

        :raises NotFoundException: if the flow does not exist.
        """
        flow = self._get_flow(flow_arn)
        flow.vpc_interfaces = vpc_interfaces
        return flow

    def add_flow_outputs(self, flow_arn: str, outputs: List[Dict[str, Any]]) -> Flow:
        """Set the flow's outputs to ``outputs`` (replacing any existing).

        :raises NotFoundException: if the flow does not exist.
        """
        flow = self._get_flow(flow_arn)
        flow.outputs = outputs
        return flow

    def remove_flow_vpc_interface(self, flow_arn: str, vpc_interface_name: str) -> None:
        """Drop every VPC interface whose ``name`` equals ``vpc_interface_name``.

        :raises NotFoundException: if the flow does not exist.
        """
        flow = self._get_flow(flow_arn)
        flow.vpc_interfaces = [
            vpc_interface
            for vpc_interface in flow.vpc_interfaces or []
            if vpc_interface["name"] != vpc_interface_name
        ]

    def remove_flow_output(self, flow_arn: str, output_name: str) -> None:
        """Drop every output whose ``name`` equals ``output_name``.

        :raises NotFoundException: if the flow does not exist.
        """
        flow = self._get_flow(flow_arn)
        flow.outputs = [
            output for output in flow.outputs or [] if output["name"] != output_name
        ]

    def update_flow_output(
        self,
        flow_arn: str,
        output_arn: str,
        cidr_allow_list: List[str],
        description: str,
        destination: str,
        encryption: Dict[str, str],
        max_latency: int,
        media_stream_output_configuration: List[Dict[str, Any]],
        min_latency: int,
        port: int,
        protocol: str,
        remote_id: str,
        sender_control_port: int,
        sender_ip_address: str,
        smoothing_latency: int,
        stream_id: str,
        vpc_interface_attachment: Dict[str, str],
    ) -> Dict[str, Any]:
        """Overwrite the settings of the output identified by ``output_arn``.

        Every setting is written unconditionally, including ``None`` values.

        :return: the updated output dictionary.
        :raises NotFoundException: if the flow or the output does not exist.
        """
        flow = self._get_flow(flow_arn)
        for output in flow.outputs or []:
            if output["outputArn"] == output_arn:
                output["cidrAllowList"] = cidr_allow_list
                output["description"] = description
                output["destination"] = destination
                output["encryption"] = encryption
                output["maxLatency"] = max_latency
                output["mediaStreamOutputConfiguration"] = (
                    media_stream_output_configuration
                )
                output["minLatency"] = min_latency
                output["port"] = port
                output["protocol"] = protocol
                output["remoteId"] = remote_id
                output["senderControlPort"] = sender_control_port
                output["senderIpAddress"] = sender_ip_address
                output["smoothingLatency"] = smoothing_latency
                output["streamId"] = stream_id
                output["vpcInterfaceAttachment"] = vpc_interface_attachment
                return output
        raise NotFoundException(message=f"output with arn={output_arn} not found")

    def add_flow_sources(
        self, flow_arn: str, sources: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """Assign a fresh ``sourceArn`` to each source and set them on the flow.

        The flow's existing sources are replaced by ``sources``.

        :return: the same ``sources`` list, now carrying ARNs.
        :raises NotFoundException: if the flow does not exist.
        """
        flow = self._get_flow(flow_arn)
        for source in sources:
            source_id = random.uuid4().hex
            name = source["name"]
            arn = f"arn:{get_partition(self.region_name)}:mediaconnect:{self.region_name}:{self.account_id}:source:{source_id}:{name}"
            source["sourceArn"] = arn
        flow.sources = sources
        return sources

    def update_flow_source(
        self,
        flow_arn: str,
        source_arn: str,
        decryption: str,
        description: str,
        entitlement_arn: str,
        ingest_port: int,
        max_bitrate: int,
        max_latency: int,
        max_sync_buffer: int,
        media_stream_source_configurations: List[Dict[str, Any]],
        min_latency: int,
        protocol: str,
        sender_control_port: int,
        sender_ip_address: str,
        stream_id: str,
        vpc_interface_name: str,
        whitelist_cidr: str,
    ) -> Optional[Dict[str, Any]]:
        """Overwrite the settings of the source identified by ``source_arn``.

        :return: the updated source dictionary, or an empty dict when the flow
            has no source with that ARN (no exception is raised in that case).
        :raises NotFoundException: if the flow does not exist.
        """
        flow = self._get_flow(flow_arn)
        source: Optional[Dict[str, Any]] = next(
            (
                candidate
                for candidate in flow.sources or []
                if candidate["sourceArn"] == source_arn
            ),
            {},
        )
        if source:
            source["decryption"] = decryption
            source["description"] = description
            source["entitlementArn"] = entitlement_arn
            source["ingestPort"] = ingest_port
            source["maxBitrate"] = max_bitrate
            source["maxLatency"] = max_latency
            source["maxSyncBuffer"] = max_sync_buffer
            source["mediaStreamSourceConfigurations"] = (
                media_stream_source_configurations
            )
            source["minLatency"] = min_latency
            source["protocol"] = protocol
            source["senderControlPort"] = sender_control_port
            source["senderIpAddress"] = sender_ip_address
            source["streamId"] = stream_id
            source["vpcInterfaceName"] = vpc_interface_name
            source["whitelistCidr"] = whitelist_cidr
        return source

    def grant_flow_entitlements(
        self,
        flow_arn: str,
        entitlements: List[Dict[str, Any]],
    ) -> List[Dict[str, Any]]:
        """Assign a fresh ``entitlementArn`` to each entitlement and append them to the flow.

        :return: the same ``entitlements`` list, now carrying ARNs.
        :raises NotFoundException: if the flow does not exist.
        """
        flow = self._get_flow(flow_arn)
        for entitlement in entitlements:
            entitlement_id = random.uuid4().hex
            name = entitlement["name"]
            arn = f"arn:{get_partition(self.region_name)}:mediaconnect:{self.region_name}:{self.account_id}:entitlement:{entitlement_id}:{name}"
            entitlement["entitlementArn"] = arn

        # The flow may have been created with ``entitlements=None``.
        if flow.entitlements is None:
            flow.entitlements = []
        flow.entitlements += entitlements
        return entitlements

    def revoke_flow_entitlement(self, flow_arn: str, entitlement_arn: str) -> None:
        """Remove the entitlement identified by ``entitlement_arn`` from the flow.

        :raises NotFoundException: if the flow or the entitlement does not exist.
        """
        flow = self._get_flow(flow_arn)
        for entitlement in flow.entitlements or []:
            if entitlement_arn == entitlement["entitlementArn"]:
                # Safe to mutate during iteration: we return immediately.
                flow.entitlements.remove(entitlement)
                return
        raise NotFoundException(
            message=f"entitlement with arn={entitlement_arn} not found"
        )

    def update_flow_entitlement(
        self,
        flow_arn: str,
        entitlement_arn: str,
        description: str,
        encryption: Dict[str, str],
        entitlement_status: str,
        name: str,
        subscribers: List[str],
    ) -> Dict[str, Any]:
        """Overwrite the settings of the entitlement identified by ``entitlement_arn``.

        :return: the updated entitlement dictionary.
        :raises NotFoundException: if the flow or the entitlement does not exist.
        """
        flow = self._get_flow(flow_arn)
        for entitlement in flow.entitlements or []:
            if entitlement_arn == entitlement["entitlementArn"]:
                entitlement["description"] = description
                entitlement["encryption"] = encryption
                entitlement["entitlementStatus"] = entitlement_status
                entitlement["name"] = name
                entitlement["subscribers"] = subscribers
                return entitlement
        raise NotFoundException(
            message=f"entitlement with arn={entitlement_arn} not found"
        )


mediaconnect_backends = BackendDict(MediaConnectBackend, "mediaconnect")
# Memory