from typing import Any, Dict, List, Optional from moto.core.common_models import CloudFormationModel from ..exceptions import ( FilterNotImplementedError, InvalidAddressError, InvalidAllocationIdError, InvalidAssociationIdError, ResourceAlreadyAssociatedError, ) from ..utils import ( generic_filter, random_eip_allocation_id, random_eip_association_id, random_ip, ) from .core import TaggedEC2Resource class ElasticAddress(TaggedEC2Resource, CloudFormationModel): def __init__( self, ec2_backend: Any, domain: str, address: Optional[str] = None, tags: Optional[Dict[str, str]] = None, ): self.ec2_backend = ec2_backend if address: self.public_ip = address else: self.public_ip = random_ip() self.allocation_id = random_eip_allocation_id() if domain == "vpc" else None self.id = self.allocation_id self.domain = domain self.instance = None self.eni = None self.association_id: Optional[str] = None self.add_tags(tags or {}) @staticmethod def cloudformation_name_type() -> str: return "" @staticmethod def cloudformation_type() -> str: # https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-ec2-eip.html return "AWS::EC2::EIP" @classmethod def create_from_cloudformation_json( # type: ignore[misc] cls, resource_name: str, cloudformation_json: Any, account_id: str, region_name: str, **kwargs: Any, ) -> "ElasticAddress": from ..models import ec2_backends ec2_backend = ec2_backends[account_id][region_name] properties = cloudformation_json.get("Properties") instance_id = None if properties: domain = properties.get("Domain") # TODO: support tags from cloudformation template eip = ec2_backend.allocate_address(domain=domain if domain else "standard") instance_id = properties.get("InstanceId") else: eip = ec2_backend.allocate_address(domain="standard") if instance_id: instance = ec2_backend.get_instance_by_id(instance_id) ec2_backend.associate_address(instance, address=eip.public_ip) return eip @property def physical_resource_id(self) -> str: return self.public_ip @classmethod def has_cfn_attr(cls, attr: str) -> bool: return attr in ["AllocationId"] def get_cfn_attribute(self, attribute_name: str) -> Any: from moto.cloudformation.exceptions import UnformattedGetAttTemplateException if attribute_name == "AllocationId": return self.allocation_id raise UnformattedGetAttTemplateException() def get_filter_value( self, filter_name: str, method_name: Optional[str] = None ) -> Any: if filter_name == "allocation-id": return self.allocation_id elif filter_name == "association-id": return self.association_id elif filter_name == "domain": return self.domain elif filter_name == "instance-id": if self.instance: return self.instance.id return None elif filter_name == "network-interface-id": if self.eni: return self.eni.id return None elif filter_name == "private-ip-address": if self.eni: return self.eni.private_ip_address return None elif filter_name == "public-ip": return self.public_ip elif filter_name == "network-interface-owner-id": # TODO: implement network-interface-owner-id raise FilterNotImplementedError(filter_name, "DescribeAddresses") else: return super().get_filter_value(filter_name, "DescribeAddresses") class ElasticAddressBackend: def __init__(self) -> None: self.addresses: List[ElasticAddress] = [] def allocate_address( self, domain: str, address: Optional[str] = None, tags: Optional[Dict[str, str]] = None, ) -> ElasticAddress: if domain not in ["standard", "vpc"]: domain = "vpc" if address: ea = ElasticAddress(self, domain=domain, address=address, tags=tags) else: ea = ElasticAddress(self, domain=domain, tags=tags) self.addresses.append(ea) return ea def address_by_ip( self, ips: List[str], fail_if_not_found: bool = True ) -> List[ElasticAddress]: eips = [ address for address in self.addresses.copy() if address.public_ip in ips ] # TODO: Trim error message down to specific invalid address. if (not eips or len(ips) > len(eips)) and fail_if_not_found: raise InvalidAddressError(ips) return eips def address_by_allocation(self, allocation_ids: List[str]) -> List[ElasticAddress]: eips = [ address for address in self.addresses if address.allocation_id in allocation_ids ] # TODO: Trim error message down to specific invalid id. if not eips or len(allocation_ids) > len(eips): raise InvalidAllocationIdError(allocation_ids) return eips def address_by_association( self, association_ids: List[str] ) -> List[ElasticAddress]: eips = [ address for address in self.addresses if address.association_id in association_ids ] # TODO: Trim error message down to specific invalid id. if not eips or len(association_ids) > len(eips): raise InvalidAssociationIdError(association_ids) return eips def associate_address( self, instance: Any = None, eni: Any = None, address: Optional[str] = None, allocation_id: Optional[str] = None, reassociate: bool = False, ) -> ElasticAddress: eips = [] if address: eips = self.address_by_ip([address]) elif allocation_id: eips = self.address_by_allocation([allocation_id]) eip = eips[0] new_instance_association = bool( instance and (not eip.instance or eip.instance.id == instance.id) ) new_eni_association = bool(eni and (not eip.eni or eni.id == eip.eni.id)) if new_instance_association or new_eni_association or reassociate: eip.instance = instance eip.eni = eni if not eip.eni and instance: # default to primary network interface eip.eni = instance.nics[0] if eip.eni: eip.eni.public_ip = eip.public_ip if eip.domain == "vpc": eip.association_id = random_eip_association_id() return eip raise ResourceAlreadyAssociatedError(eip.public_ip) def describe_addresses( self, allocation_ids: Optional[List[str]] = None, public_ips: Optional[List[str]] = None, filters: Any = None, ) -> List[ElasticAddress]: matches = self.addresses.copy() if allocation_ids: matches = [addr for addr in matches if addr.allocation_id in allocation_ids] if len(allocation_ids) > len(matches): unknown_ids = set(allocation_ids) - set(matches) # type: ignore[arg-type] raise InvalidAllocationIdError(unknown_ids) if public_ips: matches = [addr for addr in matches if addr.public_ip in public_ips] if len(public_ips) > len(matches): unknown_ips = set(public_ips) - set(matches) # type: ignore[arg-type] raise InvalidAddressError(unknown_ips) if filters: matches = generic_filter(filters, matches) return matches def describe_addresses_attribute( self, allocation_ids: Optional[List[str]] = None ) -> List[ElasticAddress]: return self.describe_addresses(allocation_ids) def disassociate_address( self, address: Optional[str] = None, association_id: Optional[str] = None ) -> None: eips = [] if address: eips = self.address_by_ip([address]) elif association_id: eips = self.address_by_association([association_id]) eip = eips[0] if eip.eni: eip.eni.public_ip = None if eip.eni.instance and eip.eni.instance._state.name == "running": eip.eni.check_auto_public_ip() eip.eni = None eip.instance = None eip.association_id = None def release_address( self, address: Optional[str] = None, allocation_id: Optional[str] = None ) -> None: eips = [] if address: eips = self.address_by_ip([address]) elif allocation_id: eips = self.address_by_allocation([allocation_id]) eip = eips[0] self.disassociate_address(address=eip.public_ip) eip.allocation_id = None self.addresses.remove(eip)
Memory