import ipaddress from typing import Any, Dict, List, Optional, Set from moto.core.common_models import CloudFormationModel from moto.ec2.models.carrier_gateways import CarrierGateway from moto.ec2.models.elastic_network_interfaces import NetworkInterface from moto.ec2.models.instances import Instance from moto.ec2.models.internet_gateways import EgressOnlyInternetGateway from moto.ec2.models.managed_prefixes import ManagedPrefixList from moto.ec2.models.nat_gateways import NatGateway from moto.ec2.models.transit_gateway import TransitGateway from moto.ec2.models.vpc_peering_connections import VPCPeeringConnection from moto.ec2.models.vpn_gateway import VpnGateway from ..exceptions import ( DependencyViolationError, InvalidAssociationIdError, InvalidDestinationCIDRBlockParameterError, InvalidParameterValueErrorReplaceRoute, InvalidRouteError, InvalidRouteTableIdError, RouteAlreadyExistsError, RouteNotSupportedError, ) from ..utils import ( EC2_RESOURCE_TO_PREFIX, generate_route_id, generic_filter, random_route_table_id, random_subnet_association_id, ) from .core import TaggedEC2Resource class RouteTable(TaggedEC2Resource, CloudFormationModel): def __init__( self, ec2_backend: Any, route_table_id: str, vpc_id: str, main: bool = False ): self.ec2_backend = ec2_backend self.id = route_table_id self.vpc_id = vpc_id self.main_association_id = random_subnet_association_id() if main else None self.associations: Dict[str, str] = {} self.routes: Dict[str, Route] = {} @property def owner_id(self) -> str: return self.ec2_backend.account_id @staticmethod def cloudformation_name_type() -> str: return "" @staticmethod def cloudformation_type() -> str: # https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-ec2-routetable.html return "AWS::EC2::RouteTable" @classmethod def create_from_cloudformation_json( # type: ignore[misc] cls, resource_name: str, cloudformation_json: Any, account_id: str, region_name: str, **kwargs: Any, ) -> "RouteTable": from ..models import ec2_backends properties = cloudformation_json["Properties"] vpc_id = properties["VpcId"] ec2_backend = ec2_backends[account_id][region_name] route_table = ec2_backend.create_route_table(vpc_id=vpc_id) return route_table @property def physical_resource_id(self) -> str: return self.id def get_filter_value( self, filter_name: str, method_name: Optional[str] = None ) -> Any: if filter_name == "association.main": # Note: Boto only supports 'true'. # https://github.com/boto/boto/issues/1742 if self.main_association_id is not None: return "true" else: return "false" elif filter_name == "route-table-id": return self.id elif filter_name == "vpc-id": return self.vpc_id elif filter_name == "association.route-table-id": return self.id elif filter_name == "association.route-table-association-id": return self.all_associations_ids elif filter_name == "association.subnet-id": return self.associations.values() elif filter_name == "route.destination-cidr-block": return [ route.destination_cidr_block for route in self.routes.values() if route.destination_cidr_block is not None ] elif filter_name == "route.gateway-id": return [ route.gateway.id for route in self.routes.values() if route.gateway is not None ] elif filter_name == "route.vpc-peering-connection-id": return [ route.vpc_pcx.id for route in self.routes.values() if route.vpc_pcx is not None ] elif filter_name == "route.nat-gateway-id": return [ route.nat_gateway.id for route in self.routes.values() if route.nat_gateway is not None ] elif filter_name == "route.transit-gateway-id": return [ route.transit_gateway.id for route in self.routes.values() if route.transit_gateway is not None ] else: return super().get_filter_value(filter_name, "DescribeRouteTables") @property def all_associations_ids(self) -> Set[str]: # NOTE(yoctozepto): Doing an explicit copy to not touch the original. all_associations = set(self.associations) if self.main_association_id is not None: all_associations.add(self.main_association_id) return all_associations class Route(CloudFormationModel): def __init__( self, route_table: RouteTable, destination_cidr_block: Optional[str], destination_ipv6_cidr_block: Optional[str], destination_prefix_list: Optional[ManagedPrefixList] = None, local: bool = False, gateway: Optional[VpnGateway] = None, instance: Optional[Instance] = None, nat_gateway: Optional[NatGateway] = None, egress_only_igw: Optional[EgressOnlyInternetGateway] = None, transit_gateway: Optional[TransitGateway] = None, interface: Optional[NetworkInterface] = None, vpc_pcx: Optional[VPCPeeringConnection] = None, carrier_gateway: Optional[CarrierGateway] = None, vpc_endpoint_id: Optional[str] = None, ): self.id = generate_route_id( route_table.id, destination_cidr_block, destination_ipv6_cidr_block, destination_prefix_list.id if destination_prefix_list else None, ) self.route_table = route_table self.destination_cidr_block = destination_cidr_block self.destination_ipv6_cidr_block = destination_ipv6_cidr_block self.destination_prefix_list = destination_prefix_list self.local = local self.gateway = gateway self.instance = instance self.nat_gateway = nat_gateway self.egress_only_igw = egress_only_igw self.transit_gateway = transit_gateway self.interface = interface self.vpc_pcx = vpc_pcx self.carrier_gateway = carrier_gateway self.vpc_endpoint_id = vpc_endpoint_id @property def physical_resource_id(self) -> str: return self.id @staticmethod def cloudformation_name_type() -> str: return "" @staticmethod def cloudformation_type() -> str: # https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-ec2-route.html return "AWS::EC2::Route" @classmethod def create_from_cloudformation_json( # type: ignore[misc] cls, resource_name: str, cloudformation_json: Any, account_id: str, region_name: str, **kwargs: str, ) -> "Route": from ..models import ec2_backends properties = cloudformation_json["Properties"] gateway_id = properties.get("GatewayId") instance_id = properties.get("InstanceId") interface_id = properties.get("NetworkInterfaceId") nat_gateway_id = properties.get("NatGatewayId") egress_only_igw_id = properties.get("EgressOnlyInternetGatewayId") transit_gateway_id = properties.get("TransitGatewayId") pcx_id = properties.get("VpcPeeringConnectionId") route_table_id = properties["RouteTableId"] ec2_backend = ec2_backends[account_id][region_name] route = ec2_backend.create_route( route_table_id=route_table_id, destination_cidr_block=properties.get("DestinationCidrBlock"), gateway_id=gateway_id, instance_id=instance_id, nat_gateway_id=nat_gateway_id, egress_only_igw_id=egress_only_igw_id, transit_gateway_id=transit_gateway_id, interface_id=interface_id, vpc_peering_connection_id=pcx_id, ) return route class RouteBackend: def __init__(self) -> None: self.route_tables: Dict[str, RouteTable] = {} def create_route_table( self, vpc_id: str, tags: Optional[List[Dict[str, str]]] = None, main: bool = False, ) -> RouteTable: route_table_id = random_route_table_id() vpc = self.get_vpc(vpc_id) # type: ignore[attr-defined] # Validate VPC exists route_table = RouteTable(self, route_table_id, vpc_id, main=main) for tag in tags or []: route_table.add_tag(tag["Key"], tag["Value"]) self.route_tables[route_table_id] = route_table # creating default routes for ipv4 cirds ipv4_cidrs = vpc.get_cidr_block_association_set(ipv6=False) for ipv4_cidr in ipv4_cidrs: self.create_route(route_table_id, ipv4_cidr.get("cidr_block"), local=True) # creating default routes for ipv6 cidrs ipv6_cidrs = vpc.get_cidr_block_association_set(ipv6=True) for ipv6_cidr in ipv6_cidrs: self.create_route( route_table_id, destination_cidr_block=None, local=True, destination_ipv6_cidr_block=ipv6_cidr.get("cidr_block"), ) return route_table def get_route_table(self, route_table_id: str) -> RouteTable: route_table = self.route_tables.get(route_table_id, None) if not route_table: raise InvalidRouteTableIdError(route_table_id) return route_table def describe_route_tables( self, route_table_ids: Optional[List[str]] = None, filters: Any = None ) -> List[RouteTable]: route_tables = list(self.route_tables.values()) if route_table_ids: route_tables = [ route_table for route_table in route_tables if route_table.id in route_table_ids ] if len(route_tables) != len(route_table_ids): invalid_id = list( set(route_table_ids).difference( set([route_table.id for route_table in route_tables]) ) )[0] raise InvalidRouteTableIdError(invalid_id) return generic_filter(filters, route_tables) def delete_route_table(self, route_table_id: str) -> None: route_table = self.get_route_table(route_table_id) if route_table.associations: raise DependencyViolationError( f"The routeTable '{route_table_id}' has dependencies and cannot be deleted." ) self.route_tables.pop(route_table_id) def associate_route_table( self, route_table_id: str, gateway_id: Optional[str] = None, subnet_id: Optional[str] = None, ) -> str: # Idempotent if association already exists. route_tables_by_subnet = self.describe_route_tables( filters={"association.subnet-id": [subnet_id]} ) if route_tables_by_subnet: for association_id, check_subnet_id in route_tables_by_subnet[ 0 ].associations.items(): if subnet_id == check_subnet_id: return association_id # Association does not yet exist, so create it. route_table = self.get_route_table(route_table_id) if gateway_id is None: self.get_subnet(subnet_id) # type: ignore[attr-defined] # Validate subnet exists association_id = random_subnet_association_id() route_table.associations[association_id] = subnet_id # type: ignore[assignment] return association_id else: association_id = random_subnet_association_id() route_table.associations[association_id] = gateway_id return association_id def disassociate_route_table(self, association_id: str) -> Optional[str]: for route_table in self.route_tables.values(): if association_id in route_table.associations: return route_table.associations.pop(association_id, None) raise InvalidAssociationIdError(association_id) def replace_route_table_association( self, association_id: str, route_table_id: str ) -> str: # Idempotent if association already exists. new_route_table = self.get_route_table(route_table_id) if association_id in new_route_table.all_associations_ids: return association_id # Find route table which currently has the association, error if none. route_tables_by_association_id = self.describe_route_tables( filters={"association.route-table-association-id": [association_id]} ) if not route_tables_by_association_id: raise InvalidAssociationIdError(association_id) previous_route_table = route_tables_by_association_id[0] # Remove existing association, create new one. new_association_id = random_subnet_association_id() if previous_route_table.main_association_id == association_id: previous_route_table.main_association_id = None new_route_table.main_association_id = new_association_id else: association_target_id = previous_route_table.associations.pop( association_id ) new_route_table.associations[new_association_id] = association_target_id return new_association_id def create_route( self, route_table_id: str, destination_cidr_block: Optional[str], destination_ipv6_cidr_block: Optional[str] = None, destination_prefix_list_id: Optional[str] = None, local: bool = False, gateway_id: Optional[str] = None, instance_id: Optional[str] = None, nat_gateway_id: Optional[str] = None, egress_only_igw_id: Optional[str] = None, transit_gateway_id: Optional[str] = None, interface_id: Optional[str] = None, vpc_peering_connection_id: Optional[str] = None, carrier_gateway_id: Optional[str] = None, vpc_endpoint_id: Optional[str] = None, ) -> Route: gateway = None nat_gateway = None transit_gateway = None egress_only_igw = None interface = None destination_prefix_list = None carrier_gateway = None if vpc_endpoint_id: vpce = self.describe_vpc_endpoints(vpc_end_point_ids=[vpc_endpoint_id]) # type: ignore[attr-defined] if not vpce[0].endpoint_type == "GatewayLoadBalancer": # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ec2/client/create_route.html # VpcEndpointId (string) – The ID of a VPC endpoint. Supported for Gateway Load Balancer endpoints only. raise RouteNotSupportedError(vpc_endpoint_id) route_table = self.get_route_table(route_table_id) if interface_id: # for validating interface Id whether it is valid or not. interface = self.get_network_interface(interface_id) # type: ignore[attr-defined] else: if gateway_id: if EC2_RESOURCE_TO_PREFIX["vpn-gateway"] in gateway_id: gateway = self.get_vpn_gateway(gateway_id) # type: ignore[attr-defined] elif EC2_RESOURCE_TO_PREFIX["internet-gateway"] in gateway_id: gateway = self.get_internet_gateway(gateway_id) # type: ignore[attr-defined] elif EC2_RESOURCE_TO_PREFIX["vpc-endpoint"] in gateway_id: gateway = self.get_vpc_end_point(gateway_id) # type: ignore[attr-defined] if destination_cidr_block: self.__validate_destination_cidr_block( destination_cidr_block, route_table ) if nat_gateway_id is not None: nat_gateway = self.nat_gateways.get(nat_gateway_id) # type: ignore[attr-defined] if egress_only_igw_id is not None: egress_only_igw = self.get_egress_only_igw(egress_only_igw_id) # type: ignore[attr-defined] if transit_gateway_id is not None: transit_gateway = self.transit_gateways.get(transit_gateway_id) # type: ignore[attr-defined] if destination_prefix_list_id is not None: destination_prefix_list = self.managed_prefix_lists.get( # type: ignore[attr-defined] destination_prefix_list_id ) if carrier_gateway_id is not None: carrier_gateway = self.carrier_gateways.get(carrier_gateway_id) # type: ignore[attr-defined] route = Route( route_table, destination_cidr_block, destination_ipv6_cidr_block, destination_prefix_list, local=local, gateway=gateway, instance=self.get_instance(instance_id) if instance_id else None, # type: ignore[attr-defined] nat_gateway=nat_gateway, egress_only_igw=egress_only_igw, transit_gateway=transit_gateway, interface=interface, carrier_gateway=carrier_gateway, vpc_endpoint_id=vpc_endpoint_id, vpc_pcx=self.get_vpc_peering_connection(vpc_peering_connection_id) # type: ignore[attr-defined] if vpc_peering_connection_id else None, ) route_table.routes[route.id] = route return route def replace_route( self, route_table_id: str, destination_cidr_block: str, destination_ipv6_cidr_block: Optional[str] = None, destination_prefix_list_id: Optional[str] = None, nat_gateway_id: Optional[str] = None, egress_only_igw_id: Optional[str] = None, transit_gateway_id: Optional[str] = None, gateway_id: Optional[str] = None, instance_id: Optional[str] = None, interface_id: Optional[str] = None, vpc_peering_connection_id: Optional[str] = None, ) -> Route: cidr = destination_cidr_block if destination_ipv6_cidr_block: cidr = destination_ipv6_cidr_block if destination_prefix_list_id: cidr = destination_prefix_list_id route_table = self.get_route_table(route_table_id) route_id = generate_route_id(route_table.id, cidr, destination_ipv6_cidr_block) try: route = route_table.routes[route_id] except KeyError: # This should be 'raise InvalidRouteError(route_table_id, cidr)' in # line with the delete_route() equivalent, but for some reason AWS # returns InvalidParameterValue instead in this case. raise InvalidParameterValueErrorReplaceRoute(cidr) from None route.gateway = None route.nat_gateway = None route.egress_only_igw = None route.transit_gateway = None if gateway_id: if EC2_RESOURCE_TO_PREFIX["vpn-gateway"] in gateway_id: route.gateway = self.get_vpn_gateway(gateway_id) # type: ignore[attr-defined] elif EC2_RESOURCE_TO_PREFIX["internet-gateway"] in gateway_id: route.gateway = self.get_internet_gateway(gateway_id) # type: ignore[attr-defined] if nat_gateway_id is not None: route.nat_gateway = self.nat_gateways.get(nat_gateway_id) # type: ignore[attr-defined] if egress_only_igw_id is not None: route.egress_only_igw = self.get_egress_only_igw(egress_only_igw_id) # type: ignore[attr-defined] if transit_gateway_id is not None: route.transit_gateway = self.transit_gateways.get(transit_gateway_id) # type: ignore[attr-defined] if destination_prefix_list_id is not None: route.prefix_list = self.managed_prefix_lists.get( # type: ignore[attr-defined] destination_prefix_list_id ) route.instance = self.get_instance(instance_id) if instance_id else None # type: ignore[attr-defined] route.interface = ( self.get_network_interface(interface_id) if interface_id else None # type: ignore[attr-defined] ) route.vpc_pcx = ( self.get_vpc_peering_connection(vpc_peering_connection_id) # type: ignore[attr-defined] if vpc_peering_connection_id else None ) route_table.routes[route.id] = route return route def delete_route( self, route_table_id: str, destination_cidr_block: str, destination_ipv6_cidr_block: Optional[str] = None, destination_prefix_list_id: Optional[str] = None, ) -> Route: cidr = destination_cidr_block route_table = self.get_route_table(route_table_id) if destination_ipv6_cidr_block: cidr = destination_ipv6_cidr_block if destination_prefix_list_id: cidr = destination_prefix_list_id route_id = generate_route_id(route_table_id, cidr) deleted = route_table.routes.pop(route_id, None) if not deleted: raise InvalidRouteError(route_table_id, cidr) return deleted def __validate_destination_cidr_block( self, destination_cidr_block: str, route_table: RouteTable ) -> None: """ Utility function to check the destination CIDR block Will validate the format and check for overlap with existing routes """ try: ip_v4_network = ipaddress.IPv4Network( str(destination_cidr_block), strict=False ) except ValueError: raise InvalidDestinationCIDRBlockParameterError(destination_cidr_block) if not route_table.routes: return for route in route_table.routes.values(): if not route.destination_cidr_block: continue if not route.local and ip_v4_network == ipaddress.IPv4Network( str(route.destination_cidr_block) ): raise RouteAlreadyExistsError(destination_cidr_block)
Memory