import weakref from collections import defaultdict from typing import Any, Dict, Iterator, List, Optional from moto.core.common_models import CloudFormationModel from ..exceptions import ( InvalidVPCPeeringConnectionIdError, InvalidVPCPeeringConnectionStateTransitionError, OperationNotPermitted2, OperationNotPermitted3, OperationNotPermitted5, ) from ..utils import random_vpc_peering_connection_id from .core import TaggedEC2Resource from .vpcs import VPC class PeeringConnectionStatus: def __init__( self, accepter_id: str, code: str = "initiating-request", message: str = "" ): self.accepter_id = accepter_id self.code = code self.message = message def deleted(self, deleter_id: str) -> None: self.code = "deleted" self.message = f"Deleted by {deleter_id}" def initiating(self) -> None: self.code = "initiating-request" self.message = f"Initiating Request to {self.accepter_id}" def pending(self) -> None: self.code = "pending-acceptance" self.message = f"Pending Acceptance by {self.accepter_id}" def accept(self) -> None: self.code = "active" self.message = "Active" def reject(self) -> None: self.code = "rejected" self.message = "Inactive" class VPCPeeringConnection(TaggedEC2Resource, CloudFormationModel): DEFAULT_OPTIONS = { "AllowEgressFromLocalClassicLinkToRemoteVpc": "false", "AllowEgressFromLocalVpcToRemoteClassicLink": "false", "AllowDnsResolutionFromRemoteVpc": "false", } def __init__( self, backend: Any, vpc_pcx_id: str, vpc: VPC, peer_vpc: VPC, tags: Optional[Dict[str, str]] = None, ): self.id = vpc_pcx_id self.ec2_backend = backend self.vpc = vpc self.peer_vpc = peer_vpc self.requester_options = self.DEFAULT_OPTIONS.copy() self.accepter_options = self.DEFAULT_OPTIONS.copy() self.add_tags(tags or {}) self._status = PeeringConnectionStatus(accepter_id=peer_vpc.owner_id) @staticmethod def cloudformation_name_type() -> str: return "" @staticmethod def cloudformation_type() -> str: # https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-ec2-vpcpeeringconnection.html return "AWS::EC2::VPCPeeringConnection" @classmethod def create_from_cloudformation_json( # type: ignore[misc] cls, resource_name: str, cloudformation_json: Any, account_id: str, region_name: str, **kwargs: Any, ) -> "VPCPeeringConnection": from ..models import ec2_backends properties = cloudformation_json["Properties"] ec2_backend = ec2_backends[account_id][region_name] vpc = ec2_backend.get_vpc(properties["VpcId"]) peer_vpc = ec2_backend.get_vpc(properties["PeerVpcId"]) vpc_pcx = ec2_backend.create_vpc_peering_connection(vpc, peer_vpc) return vpc_pcx @property def physical_resource_id(self) -> str: return self.id class VPCPeeringConnectionBackend: # for cross region vpc reference vpc_pcx_refs = defaultdict(set) # type: ignore def __init__(self) -> None: self.vpc_pcxs: Dict[str, VPCPeeringConnection] = {} self.vpc_pcx_refs[self.__class__].add(weakref.ref(self)) @classmethod def get_vpc_pcx_refs(cls) -> Iterator[VPCPeeringConnection]: for inst_ref in cls.vpc_pcx_refs[cls]: inst = inst_ref() if inst is not None: yield inst def create_vpc_peering_connection( self, vpc: VPC, peer_vpc: VPC, tags: Optional[Dict[str, str]] = None ) -> VPCPeeringConnection: vpc_pcx_id = random_vpc_peering_connection_id() vpc_pcx = VPCPeeringConnection(self, vpc_pcx_id, vpc, peer_vpc, tags) vpc_pcx._status.pending() self.vpc_pcxs[vpc_pcx_id] = vpc_pcx # insert cross-account/cross-region peering info if vpc.owner_id != peer_vpc.owner_id or vpc.region != peer_vpc.region: for backend in peer_vpc.ec2_backend.get_vpc_pcx_refs(): if ( backend.account_id == peer_vpc.owner_id and backend.region_name == peer_vpc.region ): backend.vpc_pcxs[vpc_pcx_id] = vpc_pcx return vpc_pcx def describe_vpc_peering_connections( self, vpc_peering_ids: Optional[List[str]] = None ) -> List[VPCPeeringConnection]: all_pcxs = list(self.vpc_pcxs.values()) if vpc_peering_ids: return [pcx for pcx in all_pcxs if pcx.id in vpc_peering_ids] return all_pcxs def get_vpc_peering_connection(self, vpc_pcx_id: str) -> VPCPeeringConnection: if vpc_pcx_id not in self.vpc_pcxs: raise InvalidVPCPeeringConnectionIdError(vpc_pcx_id) return self.vpc_pcxs[vpc_pcx_id] def delete_vpc_peering_connection(self, vpc_pcx_id: str) -> VPCPeeringConnection: deleted = self.get_vpc_peering_connection(vpc_pcx_id) deleted._status.deleted(deleter_id=self.account_id) # type: ignore[attr-defined] return deleted def accept_vpc_peering_connection(self, vpc_pcx_id: str) -> VPCPeeringConnection: vpc_pcx = self.get_vpc_peering_connection(vpc_pcx_id) # validate cross-account acceptance req_account_id = vpc_pcx.vpc.owner_id acp_account_id = vpc_pcx.peer_vpc.owner_id if req_account_id != acp_account_id and self.account_id != acp_account_id: # type: ignore[attr-defined] raise OperationNotPermitted5(self.account_id, vpc_pcx_id, "accept") # type: ignore[attr-defined] # validate cross-region acceptance pcx_req_region = vpc_pcx.vpc.region pcx_acp_region = vpc_pcx.peer_vpc.region if pcx_req_region != pcx_acp_region and self.region_name == pcx_req_region: # type: ignore[attr-defined] raise OperationNotPermitted2(self.region_name, vpc_pcx.id, pcx_acp_region) # type: ignore[attr-defined] if vpc_pcx._status.code != "pending-acceptance": raise InvalidVPCPeeringConnectionStateTransitionError(vpc_pcx.id) vpc_pcx._status.accept() return vpc_pcx def reject_vpc_peering_connection(self, vpc_pcx_id: str) -> VPCPeeringConnection: vpc_pcx = self.get_vpc_peering_connection(vpc_pcx_id) # validate cross-account rejection req_account_id = vpc_pcx.vpc.owner_id acp_account_id = vpc_pcx.peer_vpc.owner_id if req_account_id != acp_account_id and self.account_id != acp_account_id: # type: ignore[attr-defined] raise OperationNotPermitted5(self.account_id, vpc_pcx_id, "reject") # type: ignore[attr-defined] # validate cross-region acceptance pcx_req_region = vpc_pcx.vpc.region pcx_acp_region = vpc_pcx.peer_vpc.region if pcx_req_region != pcx_acp_region and self.region_name == pcx_req_region: # type: ignore[attr-defined] raise OperationNotPermitted3(self.region_name, vpc_pcx.id, pcx_acp_region) # type: ignore[attr-defined] if vpc_pcx._status.code != "pending-acceptance": raise InvalidVPCPeeringConnectionStateTransitionError(vpc_pcx.id) vpc_pcx._status.reject() return vpc_pcx def modify_vpc_peering_connection_options( self, vpc_pcx_id: str, accepter_options: Optional[Dict[str, Any]] = None, requester_options: Optional[Dict[str, Any]] = None, ) -> None: vpc_pcx = self.get_vpc_peering_connection(vpc_pcx_id) if not vpc_pcx: raise InvalidVPCPeeringConnectionIdError(vpc_pcx_id) # TODO: check if actual vpc has this options enabled if accepter_options: vpc_pcx.accepter_options.update(accepter_options) if requester_options: vpc_pcx.requester_options.update(requester_options)
Memory