import re from copy import copy from datetime import datetime, timezone from os import getenv from typing import Any, Dict, Iterator, List, Optional, Tuple from moto import settings from moto.core.base_backend import BackendDict, BaseBackend from moto.core.common_models import BaseModel, CloudFormationModel from moto.core.exceptions import JsonRESTError from moto.core.utils import pascal_to_camelcase, remap_nested_keys, unix_time from moto.ec2 import ec2_backends from moto.moto_api._internal import mock_random from moto.moto_api._internal.managed_state_model import ManagedState from moto.utilities.utils import ARN_PARTITION_REGEX, get_partition from ..ec2.exceptions import InvalidSecurityGroupNotFoundError, InvalidSubnetIdError from ..ec2.utils import random_private_ip from .exceptions import ( ClusterNotFoundException, EcsClientException, InvalidParameterException, RevisionNotFoundException, ServiceNotFoundException, TaskDefinitionMemoryError, TaskDefinitionMissingPropertyError, TaskDefinitionNotFoundException, TaskSetNotFoundException, UnknownAccountSettingException, ) class BaseObject(BaseModel): def camelCase(self, key: str) -> str: words = [] for i, word in enumerate(key.split("_")): if i > 0: words.append(word.title()) else: words.append(word) return "".join(words) def gen_response_object(self) -> Dict[str, Any]: response_object = copy(self.__dict__) for key, value in self.__dict__.items(): if key.startswith("_"): del response_object[key] elif "_" in key: response_object[self.camelCase(key)] = value del response_object[key] return response_object @property def response_object(self) -> Dict[str, Any]: # type: ignore[misc] return self.gen_response_object() class AccountSetting(BaseObject): def __init__(self, name: str, value: str): self.name = name self.value = value class Cluster(BaseObject, CloudFormationModel): def __init__( self, cluster_name: str, account_id: str, region_name: str, cluster_settings: Optional[List[Dict[str, str]]] = None, configuration: Optional[Dict[str, Any]] = None, capacity_providers: Optional[List[str]] = None, default_capacity_provider_strategy: Optional[List[Dict[str, Any]]] = None, tags: Optional[List[Dict[str, str]]] = None, service_connect_defaults: Optional[Dict[str, str]] = None, ): self.active_services_count = 0 self.arn = f"arn:{get_partition(region_name)}:ecs:{region_name}:{account_id}:cluster/{cluster_name}" self.name = cluster_name self.pending_tasks_count = 0 self.registered_container_instances_count = 0 self.running_tasks_count = 0 self.status = "ACTIVE" self.region_name = region_name self.settings = cluster_settings or [ {"name": "containerInsights", "value": "disabled"} ] self.configuration = configuration self.capacity_providers = capacity_providers self.default_capacity_provider_strategy = default_capacity_provider_strategy self.tags = tags self.service_connect_defaults = service_connect_defaults @property def physical_resource_id(self) -> str: return self.name @property def response_object(self) -> Dict[str, Any]: # type: ignore[misc] response_object = self.gen_response_object() response_object["clusterArn"] = self.arn response_object["clusterName"] = self.name response_object["capacityProviders"] = self.capacity_providers response_object["defaultCapacityProviderStrategy"] = ( self.default_capacity_provider_strategy ) del response_object["arn"], response_object["name"] return response_object @staticmethod def cloudformation_name_type() -> str: return "ClusterName" @staticmethod def cloudformation_type() -> str: # https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-ecs-cluster.html return "AWS::ECS::Cluster" @classmethod def create_from_cloudformation_json( # type: ignore[misc] cls, resource_name: str, cloudformation_json: Any, account_id: str, region_name: str, **kwargs: Any, ) -> "Cluster": ecs_backend = ecs_backends[account_id][region_name] return ecs_backend.create_cluster( # ClusterName is optional in CloudFormation, thus create a random # name if necessary cluster_name=resource_name ) @classmethod def update_from_cloudformation_json( # type: ignore[misc] cls, original_resource: Any, new_resource_name: str, cloudformation_json: Any, account_id: str, region_name: str, ) -> "Cluster": if original_resource.name != new_resource_name: ecs_backend = ecs_backends[account_id][region_name] ecs_backend.delete_cluster(original_resource.arn) return ecs_backend.create_cluster( # ClusterName is optional in CloudFormation, thus create a # random name if necessary cluster_name=new_resource_name ) else: # no-op when nothing changed between old and new resources return original_resource @classmethod def has_cfn_attr(cls, attr: str) -> bool: return attr in ["Arn"] def get_cfn_attribute(self, attribute_name: str) -> str: from moto.cloudformation.exceptions import UnformattedGetAttTemplateException if attribute_name == "Arn": return self.arn raise UnformattedGetAttTemplateException() class TaskDefinition(BaseObject, CloudFormationModel): def __init__( self, family: str, revision: int, container_definitions: List[Dict[str, Any]], account_id: str, region_name: str, network_mode: Optional[str] = None, volumes: Optional[List[Dict[str, Any]]] = None, tags: Optional[List[Dict[str, str]]] = None, placement_constraints: Optional[List[Dict[str, str]]] = None, requires_compatibilities: Optional[List[str]] = None, cpu: Optional[str] = None, memory: Optional[str] = None, task_role_arn: Optional[str] = None, execution_role_arn: Optional[str] = None, proxy_configuration: Optional[Dict[str, Any]] = None, inference_accelerators: Optional[List[Dict[str, str]]] = None, runtime_platform: Optional[Dict[str, str]] = None, ipc_mode: Optional[str] = None, pid_mode: Optional[str] = None, ephemeral_storage: Optional[Dict[str, int]] = None, ): self.family = family self.revision = revision self.arn = f"arn:{get_partition(region_name)}:ecs:{region_name}:{account_id}:task-definition/{family}:{revision}" default_container_definition = { "cpu": 0, "portMappings": [], "essential": True, "environment": [], "mountPoints": [], "volumesFrom": [], } self.container_definitions = [] for container_definition in container_definitions: full_definition = default_container_definition.copy() full_definition.update(container_definition) self.container_definitions.append(full_definition) self.tags = tags if tags is not None else [] if volumes is None: self.volumes = [] else: self.volumes = volumes for volume in volumes: if "efsVolumeConfiguration" in volume: # We should reach into EFS to verify this volume exists efs_config = volume["efsVolumeConfiguration"] if "rootDirectory" not in efs_config: efs_config["rootDirectory"] = "/" if not requires_compatibilities or requires_compatibilities == ["EC2"]: self.compatibilities = ["EC2"] else: self.compatibilities = ["EC2", "FARGATE"] if network_mode is None and "FARGATE" not in self.compatibilities: self.network_mode: Optional[str] = "bridge" elif "FARGATE" in self.compatibilities: self.network_mode: Optional[str] = "awsvpc" # type: ignore[no-redef] else: self.network_mode = network_mode if task_role_arn is not None: self.task_role_arn = task_role_arn if execution_role_arn is not None: self.execution_role_arn = execution_role_arn self.placement_constraints = ( placement_constraints if placement_constraints is not None else [] ) self.requires_compatibilities = requires_compatibilities self.proxy_configuration = proxy_configuration self.inference_accelerators = inference_accelerators self.runtime_platform = runtime_platform self.ipc_mode = ipc_mode self.pid_mode = pid_mode self.ephemeral_storage = ephemeral_storage self.cpu = cpu self.memory = memory self.status = "ACTIVE" @property def response_object(self) -> Dict[str, Any]: # type: ignore[misc] response_object = self.gen_response_object() response_object["taskDefinitionArn"] = response_object.pop("arn") if not response_object["requiresCompatibilities"]: del response_object["requiresCompatibilities"] if not response_object["cpu"]: del response_object["cpu"] if not response_object["memory"]: del response_object["memory"] return { "taskDefinition": response_object, "tags": response_object.get("tags", []), } @property def physical_resource_id(self) -> str: return self.arn @staticmethod def cloudformation_name_type() -> str: return "" @staticmethod def cloudformation_type() -> str: # https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-ecs-taskdefinition.html return "AWS::ECS::TaskDefinition" @classmethod def create_from_cloudformation_json( # type: ignore[misc] cls, resource_name: str, cloudformation_json: Any, account_id: str, region_name: str, **kwargs: Any, ) -> "TaskDefinition": properties = cloudformation_json["Properties"] family = properties.get( "Family", f"task-definition-{int(mock_random.random() * 10**6)}" ) container_definitions = remap_nested_keys( properties.get("ContainerDefinitions", []), pascal_to_camelcase ) volumes = remap_nested_keys(properties.get("Volumes", []), pascal_to_camelcase) memory = properties.get("Memory") ecs_backend = ecs_backends[account_id][region_name] return ecs_backend.register_task_definition( family=family, container_definitions=container_definitions, volumes=volumes, memory=memory, ) @classmethod def update_from_cloudformation_json( # type: ignore[misc] cls, original_resource: Any, new_resource_name: str, cloudformation_json: Any, account_id: str, region_name: str, ) -> "TaskDefinition": properties = cloudformation_json["Properties"] family = properties.get( "Family", f"task-definition-{int(mock_random.random() * 10**6)}" ) container_definitions = properties["ContainerDefinitions"] volumes = properties.get("Volumes") memory = properties.get("Memory") if ( original_resource.family != family or original_resource.container_definitions != container_definitions or original_resource.volumes != volumes ): # currently TaskRoleArn isn't stored at TaskDefinition # instances ecs_backend = ecs_backends[account_id][region_name] ecs_backend.deregister_task_definition(original_resource.arn) return ecs_backend.register_task_definition( family=family, container_definitions=container_definitions, volumes=volumes, memory=memory, ) else: # no-op when nothing changed between old and new resources return original_resource class Task(BaseObject, ManagedState): def __init__( self, cluster: Cluster, task_definition: TaskDefinition, container_instance_arn: Optional[str], resource_requirements: Optional[Dict[str, str]], backend: "EC2ContainerServiceBackend", launch_type: str = "", overrides: Optional[Dict[str, Any]] = None, started_by: str = "", tags: Optional[List[Dict[str, str]]] = None, networking_configuration: Optional[Dict[str, Any]] = None, ): # Configure ManagedState # https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-lifecycle.html super().__init__( model_name="ecs::task", transitions=[ # We start in RUNNING state in order not to break existing tests. # ("PROVISIONING", "PENDING"), # ("PENDING", "ACTIVATING"), # ("ACTIVATING", "RUNNING"), ("RUNNING", "DEACTIVATING"), ("DEACTIVATING", "STOPPING"), ("STOPPING", "DEPROVISIONING"), ("DEPROVISIONING", "STOPPED"), # There seems to be race condition, where the waiter expects the task to be in # STOPPED state, but it is already in DELETED state. # ("STOPPED", "DELETED"), ], ) self.id = str(mock_random.uuid4()) self.cluster_name = cluster.name self.cluster_arn = cluster.arn self.container_instance_arn = container_instance_arn self.desired_status = "RUNNING" self.task_definition_arn = task_definition.arn self.overrides = overrides or {} self.containers = [Container(task_definition)] self.started_by = started_by self.tags = tags or [] self.launch_type = launch_type self.stopped_reason = "" self.resource_requirements = resource_requirements self.region_name = cluster.region_name self._account_id = backend.account_id self._backend = backend self.attachments = [] if task_definition.network_mode == "awsvpc": if not networking_configuration: raise InvalidParameterException( "Network Configuration must be provided when networkMode 'awsvpc' is specified." ) self.network_configuration = networking_configuration net_conf = networking_configuration["awsvpcConfiguration"] ec2_backend = ec2_backends[self._account_id][self.region_name] eni = ec2_backend.create_network_interface( subnet=net_conf["subnets"][0], private_ip_address=random_private_ip(), group_ids=net_conf["securityGroups"], description="moto ECS", ) eni.status = "in-use" eni.device_index = 0 self.attachments.append( { "id": str(mock_random.uuid4()), "type": "ElasticNetworkInterface", "status": "ATTACHED", "details": [ {"name": "subnetId", "value": net_conf["subnets"][0]}, {"name": "networkInterfaceId", "value": eni.id}, {"name": "macAddress", "value": eni.mac_address}, {"name": "privateDnsName", "value": eni.private_dns_name}, {"name": "privateIPv4Address", "value": eni.private_ip_address}, ], } ) @property def last_status(self) -> Optional[str]: return self.status # managed state @last_status.setter def last_status(self, value: Optional[str]) -> None: self.status = value @property def task_arn(self) -> str: if self._backend.enable_long_arn_for_name(name="taskLongArnFormat"): return f"arn:{get_partition(self.region_name)}:ecs:{self.region_name}:{self._account_id}:task/{self.cluster_name}/{self.id}" return f"arn:{get_partition(self.region_name)}:ecs:{self.region_name}:{self._account_id}:task/{self.id}" def response_object(self, include_tags: bool = True) -> Dict[str, Any]: # type: ignore response_object = self.gen_response_object() if not include_tags: response_object.pop("tags", None) response_object["taskArn"] = self.task_arn response_object["lastStatus"] = self.last_status response_object["containers"] = [self.containers[0].response_object] return response_object class CapacityProvider(BaseObject): def __init__( self, account_id: str, region_name: str, name: str, asg_details: Dict[str, Any], tags: Optional[List[Dict[str, str]]], ): self._id = str(mock_random.uuid4()) self.capacity_provider_arn = f"arn:{get_partition(region_name)}:ecs:{region_name}:{account_id}:capacity-provider/{name}" self.name = name self.status = "ACTIVE" self.auto_scaling_group_provider = self._prepare_asg_provider(asg_details) self.tags = tags self.update_status: Optional[str] = None def _prepare_asg_provider(self, asg_details: Dict[str, Any]) -> Dict[str, Any]: if "managedScaling" not in asg_details: asg_details["managedScaling"] = {} if asg_details["managedScaling"].get("instanceWarmupPeriod") is None: asg_details["managedScaling"]["instanceWarmupPeriod"] = 300 if not asg_details["managedScaling"].get("minimumScalingStepSize"): asg_details["managedScaling"]["minimumScalingStepSize"] = 1 if not asg_details["managedScaling"].get("maximumScalingStepSize"): asg_details["managedScaling"]["maximumScalingStepSize"] = 10000 if not asg_details["managedScaling"].get("targetCapacity"): asg_details["managedScaling"]["targetCapacity"] = 100 if not asg_details["managedScaling"].get("status"): asg_details["managedScaling"]["status"] = "DISABLED" if "managedTerminationProtection" not in asg_details: asg_details["managedTerminationProtection"] = "DISABLED" return asg_details def update(self, asg_details: Dict[str, Any]) -> None: if "managedTerminationProtection" in asg_details: self.auto_scaling_group_provider["managedTerminationProtection"] = ( asg_details["managedTerminationProtection"] ) if "managedScaling" in asg_details: scaling_props = [ "status", "targetCapacity", "minimumScalingStepSize", "maximumScalingStepSize", "instanceWarmupPeriod", ] for prop in scaling_props: if prop in asg_details["managedScaling"]: self.auto_scaling_group_provider["managedScaling"][prop] = ( asg_details["managedScaling"][prop] ) self.auto_scaling_group_provider = self._prepare_asg_provider( self.auto_scaling_group_provider ) self.update_status = "UPDATE_COMPLETE" class CapacityProviderFailure(BaseObject): def __init__(self, reason: str, name: str, account_id: str, region_name: str): self.reason = reason self.arn = f"arn:{get_partition(region_name)}:ecs:{region_name}:{account_id}:capacity_provider/{name}" @property def response_object(self) -> Dict[str, Any]: # type: ignore[misc] response_object = self.gen_response_object() response_object["reason"] = self.reason response_object["arn"] = self.arn return response_object class Service(BaseObject, CloudFormationModel): def __init__( self, cluster: Cluster, service_name: str, desired_count: int, backend: "EC2ContainerServiceBackend", task_definition: Optional[TaskDefinition] = None, load_balancers: Optional[List[Dict[str, Any]]] = None, scheduling_strategy: Optional[List[Dict[str, Any]]] = None, tags: Optional[List[Dict[str, str]]] = None, deployment_controller: Optional[Dict[str, str]] = None, launch_type: Optional[str] = None, service_registries: Optional[List[Dict[str, Any]]] = None, platform_version: Optional[str] = None, network_configuration: Optional[Dict[str, Dict[str, List[str]]]] = None, propagate_tags: str = "NONE", role_arn: Optional[str] = None, ): self.cluster_name = cluster.name self.cluster_arn = cluster.arn self.name = service_name self.status = "ACTIVE" self.task_definition = task_definition.arn if task_definition else None self.desired_count = desired_count self.task_sets: List[TaskSet] = [] self.deployment_controller = deployment_controller or {"type": "ECS"} self.events: List[Dict[str, Any]] = [] self.launch_type = launch_type self.service_registries = service_registries or [] self.load_balancers = load_balancers if load_balancers is not None else [] self.scheduling_strategy = ( scheduling_strategy if scheduling_strategy is not None else "REPLICA" ) self.platform_version = platform_version self.tags = tags if tags is not None else [] self.region_name = cluster.region_name self._account_id = backend.account_id self._backend = backend self.role_arn = role_arn try: # negative running count not allowed, set to 0 if so ecs_running_count = max(int(getenv("MOTO_ECS_SERVICE_RUNNING", 0)), 0) except ValueError: # Unable to parse value of MOTO_ECS_SERVICE_RUNNING as an integer, set to default 0 ecs_running_count = 0 self.running_count = ecs_running_count self.pending_count = desired_count - ecs_running_count if self.deployment_controller["type"] == "ECS": self.deployments = [ { "createdAt": datetime.now(timezone.utc), "desiredCount": self.desired_count, "id": f"ecs-svc/{mock_random.randint(0, 32**12)}", "launchType": self.launch_type, "pendingCount": self.pending_count, "runningCount": ecs_running_count, "status": "PRIMARY", "taskDefinition": self.task_definition, "updatedAt": datetime.now(timezone.utc), } ] else: self.deployments = [] self.propagate_tags = propagate_tags if network_configuration is not None: self.network_configuration = self._validate_network(network_configuration) else: self.network_configuration = {} def _validate_network( self, nc: Dict[str, Dict[str, List[str]]] ) -> Dict[str, Dict[str, List[str]]]: c = nc["awsvpcConfiguration"] if len(c["subnets"]) == 0: raise InvalidParameterException("subnets can not be empty.") ec2_backend = ec2_backends[self._account_id][self.region_name] try: ec2_backend.describe_subnets(subnet_ids=c["subnets"]) except InvalidSubnetIdError as exc: subnet_id = exc.message.split("'")[1] raise InvalidParameterException( f"Error retrieving subnet information for [{subnet_id}]: {exc.message} (ErrorCode: {exc.error_type})" ) try: ec2_backend.describe_security_groups(group_ids=c["securityGroups"]) except InvalidSecurityGroupNotFoundError as exc: sg = exc.message.split("'{'")[1].split("'}'")[0] raise InvalidParameterException( f"Error retrieving security group information for [{sg}]: " f"The security group '{sg}' does not exist (ErrorCode: InvalidGroup.NotFound)" ) return nc @property def arn(self) -> str: if self._backend.enable_long_arn_for_name(name="serviceLongArnFormat"): return f"arn:{get_partition(self.region_name)}:ecs:{self.region_name}:{self._account_id}:service/{self.cluster_name}/{self.name}" return f"arn:{get_partition(self.region_name)}:ecs:{self.region_name}:{self._account_id}:service/{self.name}" @property def physical_resource_id(self) -> str: return self.arn @property def response_object(self) -> Dict[str, Any]: # type: ignore[misc] response_object = self.gen_response_object() del response_object["name"], response_object["tags"] response_object["serviceName"] = self.name response_object["serviceArn"] = self.arn response_object["schedulingStrategy"] = self.scheduling_strategy response_object["platformVersion"] = self.platform_version if response_object["deploymentController"]["type"] == "ECS": del response_object["deploymentController"] del response_object["taskSets"] else: response_object["taskSets"] = [ t.response_object for t in response_object["taskSets"] ] for deployment in response_object["deployments"]: if isinstance(deployment["createdAt"], datetime): deployment["createdAt"] = unix_time( deployment["createdAt"].replace(tzinfo=None) ) if isinstance(deployment["updatedAt"], datetime): deployment["updatedAt"] = unix_time( deployment["updatedAt"].replace(tzinfo=None) ) response_object["networkConfiguration"] = self.network_configuration if self.role_arn: response_object["roleArn"] = self.role_arn return response_object @staticmethod def cloudformation_name_type() -> str: return "ServiceName" @staticmethod def cloudformation_type() -> str: # https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-ecs-service.html return "AWS::ECS::Service" @classmethod def create_from_cloudformation_json( # type: ignore[misc] cls, resource_name: str, cloudformation_json: Any, account_id: str, region_name: str, **kwargs: Any, ) -> "Service": properties = cloudformation_json["Properties"] if isinstance(properties["Cluster"], Cluster): cluster = properties["Cluster"].name else: cluster = properties["Cluster"] if isinstance(properties["TaskDefinition"], TaskDefinition): task_definition = properties["TaskDefinition"].family else: task_definition = properties["TaskDefinition"] desired_count = properties.get("DesiredCount", None) # TODO: LoadBalancers # TODO: Role ecs_backend = ecs_backends[account_id][region_name] return ecs_backend.create_service( cluster, resource_name, desired_count, task_definition_str=task_definition ) @classmethod def update_from_cloudformation_json( # type: ignore[misc] cls, original_resource: Any, new_resource_name: str, cloudformation_json: Any, account_id: str, region_name: str, ) -> "Service": properties = cloudformation_json["Properties"] if isinstance(properties["Cluster"], Cluster): cluster_name = properties["Cluster"].name else: cluster_name = properties["Cluster"] if isinstance(properties["TaskDefinition"], TaskDefinition): task_definition = properties["TaskDefinition"].family else: task_definition = properties["TaskDefinition"] desired_count = properties.get("DesiredCount", None) ecs_backend = ecs_backends[account_id][region_name] service_name = original_resource.name if ( original_resource.cluster_arn != Cluster(cluster_name, account_id, region_name).arn ): # TODO: LoadBalancers # TODO: Role ecs_backend.delete_service( original_resource.cluster_name, service_name, force=True ) return ecs_backend.create_service( cluster_name, new_resource_name, desired_count, task_definition_str=task_definition, ) else: return ecs_backend.update_service( { "cluster": cluster_name, "service": service_name, "task_definition": task_definition, "desired_count": desired_count, } ) @classmethod def has_cfn_attr(cls, attr: str) -> bool: return attr in ["Name"] def get_cfn_attribute(self, attribute_name: str) -> str: from moto.cloudformation.exceptions import UnformattedGetAttTemplateException if attribute_name == "Name": return self.name raise UnformattedGetAttTemplateException() class Container(BaseObject, CloudFormationModel): def __init__( self, task_def: TaskDefinition, ): self.container_arn = f"{task_def.arn}/{str(mock_random.uuid4())}" self.task_arn = task_def.arn container_def = task_def.container_definitions[0] self.image = container_def.get("image") self.last_status = "PENDING" self.exitCode = 0 self.network_interfaces: List[Dict[str, Any]] = [] self.health_status = "HEALTHY" self.cpu = container_def.get("cpu") self.memory = container_def.get("memory") self.environment = container_def.get("environment") self.name = container_def.get("name") self.command = container_def.get("command") class ContainerInstance(BaseObject): def __init__( self, ec2_instance_id: str, account_id: str, region_name: str, cluster_name: str, backend: "EC2ContainerServiceBackend", ): self.ec2_instance_id = ec2_instance_id self.agent_connected = True self.status = "ACTIVE" self.registered_resources: List[Dict[str, Any]] = [ { "doubleValue": 0.0, "integerValue": 4096, "longValue": 0, "name": "CPU", "type": "INTEGER", }, { "doubleValue": 0.0, "integerValue": 7482, "longValue": 0, "name": "MEMORY", "type": "INTEGER", }, { "doubleValue": 0.0, "integerValue": 0, "longValue": 0, "name": "PORTS", "stringSetValue": ["22", "2376", "2375", "51678", "51679"], "type": "STRINGSET", }, { "doubleValue": 0.0, "integerValue": 0, "longValue": 0, "name": "PORTS_UDP", "stringSetValue": [], "type": "STRINGSET", }, ] self.pending_tasks_count = 0 self.remaining_resources: List[Dict[str, Any]] = [ { "doubleValue": 0.0, "integerValue": 4096, "longValue": 0, "name": "CPU", "type": "INTEGER", }, { "doubleValue": 0.0, "integerValue": 7482, "longValue": 0, "name": "MEMORY", "type": "INTEGER", }, { "doubleValue": 0.0, "integerValue": 0, "longValue": 0, "name": "PORTS", "stringSetValue": ["22", "2376", "2375", "51678", "51679"], "type": "STRINGSET", }, { "doubleValue": 0.0, "integerValue": 0, "longValue": 0, "name": "PORTS_UDP", "stringSetValue": [], "type": "STRINGSET", }, ] self.running_tasks_count = 0 self.version_info = { "agentVersion": "1.0.0", "agentHash": "4023248", "dockerVersion": "DockerVersion: 1.5.0", } ec2_backend = ec2_backends[account_id][region_name] ec2_instance = ec2_backend.get_instance(ec2_instance_id) self.attributes = { "ecs.ami-id": ec2_instance.image_id, "ecs.availability-zone": ec2_instance.placement, "ecs.instance-type": ec2_instance.instance_type, "ecs.os-type": ec2_instance.platform if ec2_instance.platform == "windows" else "linux", # options are windows and linux, linux is default } self.registered_at = datetime.now(timezone.utc) self.region_name = region_name self.id = str(mock_random.uuid4()) self.cluster_name = cluster_name self._account_id = backend.account_id self._backend = backend @property def container_instance_arn(self) -> str: if self._backend.enable_long_arn_for_name( name="containerInstanceLongArnFormat" ): return f"arn:{get_partition(self.region_name)}:ecs:{self.region_name}:{self._account_id}:container-instance/{self.cluster_name}/{self.id}" return f"arn:{get_partition(self.region_name)}:ecs:{self.region_name}:{self._account_id}:container-instance/{self.id}" @property def response_object(self) -> Dict[str, Any]: # type: ignore[misc] response_object = self.gen_response_object() response_object["containerInstanceArn"] = self.container_instance_arn response_object["attributes"] = [ self._format_attribute(name, value) for name, value in response_object["attributes"].items() ] if isinstance(response_object["registeredAt"], datetime): response_object["registeredAt"] = unix_time( response_object["registeredAt"].replace(tzinfo=None) ) return response_object def _format_attribute(self, name: str, value: Optional[str]) -> Dict[str, str]: formatted_attr = {"name": name} if value is not None: formatted_attr["value"] = value return formatted_attr class ClusterFailure(BaseObject): def __init__( self, reason: str, cluster_name: str, account_id: str, region_name: str ): self.reason = reason self.arn = f"arn:{get_partition(region_name)}:ecs:{region_name}:{account_id}:cluster/{cluster_name}" @property def response_object(self) -> Dict[str, Any]: # type: ignore[misc] response_object = self.gen_response_object() response_object["reason"] = self.reason response_object["arn"] = self.arn return response_object class ContainerInstanceFailure(BaseObject): def __init__( self, reason: str, container_instance_id: str, account_id: str, region_name: str ): self.reason = reason self.arn = f"arn:{get_partition(region_name)}:ecs:{region_name}:{account_id}:container-instance/{container_instance_id}" @property def response_object(self) -> Dict[str, Any]: # type: ignore[misc] response_object = self.gen_response_object() response_object["reason"] = self.reason response_object["arn"] = self.arn return response_object class TaskSet(BaseObject): def __init__( self, service: str, cluster: str, task_definition: str, account_id: str, region_name: str, external_id: Optional[str] = None, network_configuration: Optional[Dict[str, Any]] = None, load_balancers: Optional[List[Dict[str, Any]]] = None, service_registries: Optional[List[Dict[str, Any]]] = None, launch_type: Optional[str] = None, capacity_provider_strategy: Optional[List[Dict[str, Any]]] = None, platform_version: Optional[str] = None, scale: Optional[Dict[str, Any]] = None, client_token: Optional[str] = None, tags: Optional[List[Dict[str, str]]] = None, ): self.service = service self.cluster = cluster self.status = "ACTIVE" self.task_definition = task_definition or "" self.region_name = region_name self.external_id = external_id or "" self.network_configuration = network_configuration or None self.load_balancers = load_balancers or [] self.service_registries = service_registries or [] self.launch_type = launch_type self.capacity_provider_strategy = capacity_provider_strategy or [] self.platform_version = platform_version or "LATEST" self.scale = scale or {"value": 100.0, "unit": "PERCENT"} self.client_token = client_token or "" self.tags = tags or [] self.stabilityStatus = "STEADY_STATE" self.createdAt = datetime.now(timezone.utc) self.updatedAt = datetime.now(timezone.utc) self.stabilityStatusAt = datetime.now(timezone.utc) self.id = f"ecs-svc/{mock_random.randint(0, 32**12)}" self.service_arn = "" self.cluster_arn = "" cluster_name = self.cluster.split("/")[-1] service_name = self.service.split("/")[-1] self.task_set_arn = f"arn:{get_partition(region_name)}:ecs:{region_name}:{account_id}:task-set/{cluster_name}/{service_name}/{self.id}" @property def response_object(self) -> Dict[str, Any]: # type: ignore[misc] response_object = self.gen_response_object() if isinstance(response_object["createdAt"], datetime): response_object["createdAt"] = unix_time( self.createdAt.replace(tzinfo=None) ) if isinstance(response_object["updatedAt"], datetime): response_object["updatedAt"] = unix_time( self.updatedAt.replace(tzinfo=None) ) if isinstance(response_object["stabilityStatusAt"], datetime): response_object["stabilityStatusAt"] = unix_time( self.stabilityStatusAt.replace(tzinfo=None) ) del response_object["service"] del response_object["cluster"] return response_object class EC2ContainerServiceBackend(BaseBackend): """ ECS resources use the new ARN format by default. Use the following environment variable to revert back to the old/short ARN format: `MOTO_ECS_NEW_ARN=false` AWS reference: https://aws.amazon.com/blogs/compute/migrating-your-amazon-ecs-deployment-to-the-new-arn-and-resource-id-format-2/ Set the environment variable MOTO_ECS_SERVICE_RUNNING to a number of running tasks you want. For example: MOTO_ECS_SERVICE_RUNNING=2 Every describe_services() will return runningCount AND deployment of 2 """ def __init__(self, region_name: str, account_id: str): super().__init__(region_name, account_id) self.account_settings: Dict[str, AccountSetting] = dict() self.capacity_providers: Dict[str, CapacityProvider] = dict() self.clusters: Dict[str, Cluster] = {} self.task_definitions: Dict[str, Dict[int, TaskDefinition]] = {} self.tasks: Dict[str, Dict[str, Task]] = {} self.services: Dict[str, Service] = {} self.container_instances: Dict[str, Dict[str, ContainerInstance]] = {} def _get_cluster(self, name: str) -> Cluster: # short name or full ARN of the cluster cluster_name = name.split("/")[-1] cluster = self.clusters.get(cluster_name) if not cluster: raise ClusterNotFoundException return cluster def create_capacity_provider( self, name: str, asg_details: Dict[str, Any], tags: Optional[List[Dict[str, str]]], ) -> CapacityProvider: capacity_provider = CapacityProvider( self.account_id, self.region_name, name, asg_details, tags ) self.capacity_providers[name] = capacity_provider return capacity_provider def describe_task_definition(self, task_definition_str: str) -> TaskDefinition: task_definition_name = task_definition_str.split("/")[-1] if ":" in task_definition_name: family, rev = task_definition_name.split(":") revision = int(rev) else: family = task_definition_name revision = self._get_last_task_definition_revision_id(family) if ( family in self.task_definitions and revision in self.task_definitions[family] ): return self.task_definitions[family][revision] else: raise TaskDefinitionNotFoundException() def create_cluster( self, cluster_name: str, tags: Any = None, cluster_settings: Any = None, configuration: Optional[Dict[str, Any]] = None, capacity_providers: Optional[List[str]] = None, default_capacity_provider_strategy: Optional[List[Dict[str, Any]]] = None, service_connect_defaults: Optional[Dict[str, str]] = None, ) -> Cluster: cluster = Cluster( cluster_name, self.account_id, self.region_name, cluster_settings, configuration, capacity_providers, default_capacity_provider_strategy, tags, service_connect_defaults=service_connect_defaults, ) self.clusters[cluster_name] = cluster return cluster def update_cluster( self, cluster_name: str, cluster_settings: Optional[List[Dict[str, str]]], configuration: Optional[Dict[str, Any]], service_connect_defaults: Optional[Dict[str, str]], ) -> Cluster: """ The serviceConnectDefaults-parameter is not yet implemented """ cluster = self._get_cluster(cluster_name) if cluster_settings is not None: cluster.settings = cluster_settings if configuration is not None: cluster.configuration = configuration if service_connect_defaults is not None: cluster.service_connect_defaults = service_connect_defaults return cluster def put_cluster_capacity_providers( self, cluster_name: str, capacity_providers: Optional[List[str]], default_capacity_provider_strategy: Optional[List[Dict[str, Any]]], ) -> Cluster: cluster = self._get_cluster(cluster_name) if capacity_providers is not None: cluster.capacity_providers = capacity_providers if default_capacity_provider_strategy is not None: cluster.default_capacity_provider_strategy = ( default_capacity_provider_strategy ) return cluster def _get_provider(self, name_or_arn: str) -> Optional[CapacityProvider]: for provider in self.capacity_providers.values(): if ( provider.name == name_or_arn or provider.capacity_provider_arn == name_or_arn ): return provider return None def describe_capacity_providers( self, names: List[str] ) -> Tuple[List[CapacityProvider], List[CapacityProviderFailure]]: providers = [] failures = [] for name in names: provider = self._get_provider(name) if provider: providers.append(provider) else: failures.append( CapacityProviderFailure( "MISSING", name, self.account_id, self.region_name ) ) return providers, failures def delete_capacity_provider(self, name_or_arn: str) -> CapacityProvider: provider: CapacityProvider = self._get_provider(name_or_arn) # type: ignore[assignment] self.capacity_providers.pop(provider.name) return provider def update_capacity_provider( self, name_or_arn: str, asg_provider: Dict[str, Any] ) -> CapacityProvider: provider: CapacityProvider = self._get_provider(name_or_arn) # type: ignore[assignment] provider.update(asg_provider) return provider def list_clusters(self) -> List[str]: """ maxSize and pagination not implemented """ return [cluster.arn for cluster in self.clusters.values()] def describe_clusters( self, list_clusters_name: Optional[List[str]] = None, include: Optional[List[str]] = None, ) -> Tuple[List[Dict[str, Any]], List[ClusterFailure]]: """ Only include=TAGS is currently supported. """ list_clusters = [] failures = [] if list_clusters_name is None: if "default" in self.clusters: list_clusters.append(self.clusters["default"].response_object) else: for cluster_name in list_clusters_name: cluster_name = cluster_name.split("/")[-1] if cluster_name in self.clusters: list_clusters.append(self.clusters[cluster_name].response_object) else: failures.append( ClusterFailure( "MISSING", cluster_name, self.account_id, self.region_name ) ) if not include or "TAGS" not in (include): for cluster in list_clusters: cluster["tags"] = None return list_clusters, failures def delete_cluster(self, cluster_str: str) -> Cluster: cluster = self._get_cluster(cluster_str) # A cluster is not immediately removed - just marked as inactive # It is only deleted later on # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html#ECS.Client.delete_cluster cluster.status = "INACTIVE" return cluster def register_task_definition( self, family: str, container_definitions: List[Dict[str, Any]], volumes: Optional[List[Dict[str, Any]]] = None, network_mode: Optional[str] = None, tags: Optional[List[Dict[str, str]]] = None, placement_constraints: Optional[List[Dict[str, str]]] = None, requires_compatibilities: Optional[List[str]] = None, cpu: Optional[str] = None, memory: Optional[str] = None, task_role_arn: Optional[str] = None, execution_role_arn: Optional[str] = None, proxy_configuration: Optional[Dict[str, Any]] = None, inference_accelerators: Optional[List[Dict[str, str]]] = None, runtime_platform: Optional[Dict[str, str]] = None, ipc_mode: Optional[str] = None, pid_mode: Optional[str] = None, ephemeral_storage: Optional[Dict[str, int]] = None, ) -> TaskDefinition: if requires_compatibilities and "FARGATE" in requires_compatibilities: # TODO need more validation for Fargate if pid_mode and pid_mode != "task": raise EcsClientException( f"Tasks using the Fargate launch type do not support pidMode '{pid_mode}'. The supported value for pidMode is 'task'." ) self._validate_container_defs( memory, container_definitions, requires_compatibilities ) if family in self.task_definitions: last_id = self._get_last_task_definition_revision_id(family) revision = (last_id or 0) + 1 else: self.task_definitions[family] = {} revision = 1 task_definition = TaskDefinition( family, revision, container_definitions, self.account_id, self.region_name, volumes=volumes, network_mode=network_mode, tags=tags, placement_constraints=placement_constraints, requires_compatibilities=requires_compatibilities, cpu=cpu, memory=memory, task_role_arn=task_role_arn, execution_role_arn=execution_role_arn, proxy_configuration=proxy_configuration, inference_accelerators=inference_accelerators, runtime_platform=runtime_platform, ipc_mode=ipc_mode, pid_mode=pid_mode, ephemeral_storage=ephemeral_storage, ) self.task_definitions[family][revision] = task_definition return task_definition @staticmethod def _validate_container_defs( # type: ignore[misc] memory: Optional[str], container_definitions: List[Dict[str, Any]], requires_compatibilities: Optional[List[str]], ) -> None: # The capitalised keys are passed by Cloudformation for cd in container_definitions: if "name" not in cd and "Name" not in cd: raise TaskDefinitionMissingPropertyError("name") if "image" not in cd and "Image" not in cd: raise TaskDefinitionMissingPropertyError("image") if ( requires_compatibilities and "EC2" in requires_compatibilities and ("memory" not in cd and "Memory" not in cd and not memory) ): raise TaskDefinitionMemoryError(cd["name"]) if ( "memory" not in cd and "Memory" not in cd and "memoryReservation" not in cd and "MemoryReservation" not in cd and not memory ): raise TaskDefinitionMemoryError(cd["name"]) def list_task_definitions(self, family_prefix: str) -> List[str]: task_arns = [] for task_definition_list in self.task_definitions.values(): task_arns.extend( [ task_definition.arn for task_definition in task_definition_list.values() if family_prefix is None or task_definition.family == family_prefix ] ) return task_arns def deregister_task_definition(self, task_definition_str: str) -> TaskDefinition: task_definition_name = task_definition_str.split("/")[-1] try: family, rev = task_definition_name.split(":") except ValueError: raise RevisionNotFoundException try: revision = int(rev) except ValueError: raise InvalidParameterException("Invalid revision number. Number: " + rev) if ( family in self.task_definitions and revision in self.task_definitions[family] ): # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html#ECS.Client.deregister_task_definition # At this time, INACTIVE task definitions remain discoverable in your account indefinitely. task_definition = self.task_definitions[family][revision] task_definition.status = "INACTIVE" return task_definition else: raise TaskDefinitionNotFoundException def run_task( self, cluster_str: str, task_definition_str: str, count: int, overrides: Optional[Dict[str, Any]], started_by: str, tags: Optional[List[Dict[str, str]]], launch_type: Optional[str], networking_configuration: Optional[Dict[str, Any]] = None, ) -> List[Task]: if launch_type and launch_type not in ["EC2", "FARGATE", "EXTERNAL"]: raise InvalidParameterException( "launch type should be one of [EC2,FARGATE,EXTERNAL]" ) cluster = self._get_cluster(cluster_str) task_definition = self.describe_task_definition(task_definition_str) resource_requirements = self._calculate_task_resource_requirements( task_definition ) if cluster.name not in self.tasks: self.tasks[cluster.name] = {} tasks = [] if launch_type == "FARGATE": for _ in range(count): task = Task( cluster=cluster, task_definition=task_definition, container_instance_arn=None, resource_requirements=resource_requirements, backend=self, overrides=overrides or {}, started_by=started_by or "", tags=tags or [], launch_type=launch_type or "", networking_configuration=networking_configuration, ) tasks.append(task) self.tasks[cluster.name][task.task_arn] = task return tasks container_instances = list( self.container_instances.get(cluster.name, {}).keys() ) if not container_instances: raise Exception(f"No instances found in cluster {cluster.name}") active_container_instances = [ x for x in container_instances if self.container_instances[cluster.name][x].status == "ACTIVE" ] # TODO: return event about unable to place task if not able to place enough tasks to meet count placed_count = 0 for name in active_container_instances: container_instance = self.container_instances[cluster.name][name] container_instance_arn = container_instance.container_instance_arn try_to_place = True while try_to_place: can_be_placed = self._can_be_placed( container_instance, resource_requirements ) if can_be_placed: task = Task( cluster, task_definition, container_instance_arn, resource_requirements, backend=self, overrides=overrides or {}, started_by=started_by or "", tags=tags or [], launch_type=launch_type or "", networking_configuration=networking_configuration, ) self.update_container_instance_resources( container_instance, resource_requirements ) tasks.append(task) self.tasks[cluster.name][task.task_arn] = task placed_count += 1 if placed_count == count: return tasks else: try_to_place = False return tasks @staticmethod def _calculate_task_resource_requirements( # type: ignore[misc] task_definition: TaskDefinition, ) -> Dict[str, Any]: resource_requirements: Dict[str, Any] = { "CPU": 0, "MEMORY": 0, "PORTS": [], "PORTS_UDP": [], } for container_definition in task_definition.container_definitions: # cloudformation uses capitalized properties, while boto uses all lower case # CPU is optional resource_requirements["CPU"] += container_definition.get( "cpu", container_definition.get("Cpu", 0) ) # either memory or memory reservation must be provided if ( "Memory" in container_definition or "MemoryReservation" in container_definition ): resource_requirements["MEMORY"] += container_definition.get( "Memory", container_definition.get("MemoryReservation") ) else: resource_requirements["MEMORY"] += container_definition.get( "memory", container_definition.get("memoryReservation") ) port_mapping_key = ( "PortMappings" if "PortMappings" in container_definition else "portMappings" ) for port_mapping in container_definition.get(port_mapping_key, []): # type: ignore[attr-defined] if "hostPort" in port_mapping: resource_requirements["PORTS"].append(port_mapping.get("hostPort")) elif "HostPort" in port_mapping: resource_requirements["PORTS"].append(port_mapping.get("HostPort")) return resource_requirements @staticmethod def _can_be_placed( # type: ignore[misc] container_instance: ContainerInstance, task_resource_requirements: Dict[str, Any], ) -> bool: """ :param container_instance: The container instance trying to be placed onto :param task_resource_requirements: The calculated resource requirements of the task in the form of a dict :return: A boolean stating whether the given container instance has enough resources to have the task placed on it as well as a description, if it cannot be placed this will describe why. """ # TODO: Implement default and other placement strategies as well as constraints: # docs.aws.amazon.com/AmazonECS/latest/developerguide/task-placement.html remaining_cpu = 0 remaining_memory = 0 reserved_ports: List[str] = [] for resource in container_instance.remaining_resources: if resource.get("name") == "CPU": remaining_cpu = resource.get("integerValue") # type: ignore[assignment] elif resource.get("name") == "MEMORY": remaining_memory = resource.get("integerValue") # type: ignore[assignment] elif resource.get("name") == "PORTS": reserved_ports = resource.get("stringSetValue") # type: ignore[assignment] if task_resource_requirements.get("CPU") > remaining_cpu: # type: ignore[operator] return False if task_resource_requirements.get("MEMORY") > remaining_memory: # type: ignore[operator] return False ports_needed = task_resource_requirements.get("PORTS") for port in ports_needed: # type: ignore[union-attr] if str(port) in reserved_ports: return False return True def start_task( self, cluster_str: str, task_definition_str: str, container_instances: List[str], overrides: Dict[str, Any], started_by: str, tags: Optional[List[Dict[str, str]]] = None, ) -> List[Task]: cluster = self._get_cluster(cluster_str) task_definition = self.describe_task_definition(task_definition_str) if cluster.name not in self.tasks: self.tasks[cluster.name] = {} tasks = [] if not container_instances: raise EcsClientException("Container Instances cannot be empty.") container_instance_ids = [x.split("/")[-1] for x in container_instances] resource_requirements = self._calculate_task_resource_requirements( task_definition ) for container_instance_id in container_instance_ids: container_instance = self.container_instances[cluster.name][ container_instance_id ] task = Task( cluster, task_definition, container_instance.container_instance_arn, resource_requirements, backend=self, overrides=overrides or {}, started_by=started_by or "", tags=tags, ) tasks.append(task) self.update_container_instance_resources( container_instance, resource_requirements ) self.tasks[cluster.name][task.task_arn] = task return tasks def describe_tasks(self, cluster_str: str, tasks: Optional[str]) -> List[Task]: """ Only include=TAGS is currently supported. """ self._get_cluster(cluster_str) if not tasks: raise InvalidParameterException("Tasks cannot be empty.") response = [] for cluster_tasks in self.tasks.values(): for task_arn, task in cluster_tasks.items(): task_id = task_arn.split("/")[-1] if ( task_arn in tasks or task.task_arn in tasks or any(task_id in task for task in tasks) ): task.advance() response.append(task) return response def list_tasks( self, cluster_str: Optional[str] = None, container_instance: Optional[str] = None, family: Optional[str] = None, started_by: Optional[str] = None, service_name: Optional[str] = None, desiredStatus: Optional[str] = None, ) -> List[Task]: filtered_tasks = [] for tasks in self.tasks.values(): for task in tasks.values(): filtered_tasks.append(task) if cluster_str: cluster = self._get_cluster(cluster_str) filtered_tasks = list( filter(lambda t: cluster.name in t.cluster_arn, filtered_tasks) ) if container_instance: filtered_tasks = list( filter( lambda t: container_instance in t.container_instance_arn, # type: ignore filtered_tasks, ) ) if family: task_definition_arns = self.list_task_definitions(family) filtered_tasks = list( filter( lambda t: t.task_definition_arn in task_definition_arns, filtered_tasks, ) ) if started_by: filtered_tasks = list( filter(lambda t: started_by == t.started_by, filtered_tasks) ) if service_name: # TODO: We can't filter on `service_name` until the backend actually # launches tasks as part of the service creation process. pass if desiredStatus: filtered_tasks = list( filter(lambda t: t.desired_status == desiredStatus, filtered_tasks) ) return filtered_tasks def stop_task(self, cluster_str: str, task_str: str, reason: str) -> Task: cluster = self._get_cluster(cluster_str) task_id = task_str.split("/")[-1] tasks = self.tasks.get(cluster.name, None) if not tasks: raise Exception(f"Cluster {cluster.name} has no registered tasks") for task in tasks.keys(): if task.endswith(task_id): container_instance_arn = tasks[task].container_instance_arn if container_instance_arn: container_instance = self.container_instances[cluster.name][ container_instance_arn.split("/")[-1] ] self.update_container_instance_resources( container_instance, tasks[task].resource_requirements, # type: ignore[arg-type] removing=True, ) tasks[task].last_status = "STOPPED" tasks[task].desired_status = "STOPPED" tasks[task].stopped_reason = reason return tasks[task] raise Exception(f"Could not find task {task_str} on cluster {cluster.name}") def _get_service(self, cluster_str: str, service_str: str) -> Service: cluster = self._get_cluster(cluster_str) for service in self.services.values(): if service.cluster_name == cluster.name and ( service.name == service_str or service.arn == service_str ): return service raise ServiceNotFoundException def create_service( self, cluster_str: str, service_name: str, desired_count: int, task_definition_str: Optional[str] = None, load_balancers: Optional[List[Dict[str, Any]]] = None, scheduling_strategy: Optional[List[Dict[str, Any]]] = None, tags: Optional[List[Dict[str, str]]] = None, deployment_controller: Optional[Dict[str, str]] = None, launch_type: Optional[str] = None, service_registries: Optional[List[Dict[str, Any]]] = None, platform_version: Optional[str] = None, propagate_tags: str = "NONE", network_configuration: Optional[Dict[str, Dict[str, List[str]]]] = None, role_arn: Optional[str] = None, ) -> Service: cluster = self._get_cluster(cluster_str) if task_definition_str: task_definition = self.describe_task_definition(task_definition_str) else: task_definition = None desired_count = desired_count if desired_count is not None else 0 launch_type = launch_type if launch_type is not None else "EC2" if launch_type not in ["EC2", "FARGATE"]: raise EcsClientException("launch type should be one of [EC2,FARGATE]") service = Service( cluster=cluster, service_name=service_name, desired_count=desired_count, task_definition=task_definition, load_balancers=load_balancers, scheduling_strategy=scheduling_strategy, tags=tags, deployment_controller=deployment_controller, launch_type=launch_type, backend=self, service_registries=service_registries, platform_version=platform_version, propagate_tags=propagate_tags, network_configuration=network_configuration, role_arn=role_arn, ) cluster_service_pair = f"{cluster.name}:{service_name}" self.services[cluster_service_pair] = service return service def list_services( self, cluster_str: str, scheduling_strategy: Optional[str] = None, launch_type: Optional[str] = None, ) -> List[str]: cluster = self._get_cluster(cluster_str) service_arns = [] for key, service in self.services.items(): if cluster.name + ":" not in key: continue if ( scheduling_strategy is not None and service.scheduling_strategy != scheduling_strategy ): continue if launch_type is not None and service.launch_type != launch_type: continue service_arns.append(service.arn) return sorted(service_arns) def describe_services( self, cluster_str: str, service_names_or_arns: List[str] ) -> Tuple[List[Service], List[Dict[str, str]]]: cluster = self._get_cluster(cluster_str) result = [] failures = [] for name_or_arn in service_names_or_arns: name = name_or_arn.split("/")[-1] cluster_service_pair = f"{cluster.name}:{name}" if cluster_service_pair in self.services: result.append(self.services[cluster_service_pair]) else: if re.match(ARN_PARTITION_REGEX + ":ecs", name_or_arn): missing_arn = name_or_arn else: missing_arn = f"arn:{get_partition(self.region_name)}:ecs:{self.region_name}:{self.account_id}:service/{name}" failures.append({"arn": missing_arn, "reason": "MISSING"}) return result, failures def update_service(self, service_properties: Dict[str, Any]) -> Service: cluster_str = service_properties.pop("cluster", "default") task_definition_str = service_properties.pop("task_definition", None) cluster = self._get_cluster(cluster_str) service_name = service_properties.pop("service").split("/")[-1] force_new_deployment = service_properties.pop("force_new_deployment", False) cluster_service_pair = f"{cluster.name}:{service_name}" if cluster_service_pair in self.services: current_service = self.services[cluster_service_pair] for prop_name, prop_val in service_properties.items(): if prop_val is not None: current_service.__setattr__(prop_name, prop_val) if prop_name == "desired_count": current_service.__setattr__("running_count", prop_val) current_service.__setattr__("pending_count", 0) if task_definition_str: self.describe_task_definition(task_definition_str) current_service.task_definition = task_definition_str if force_new_deployment and current_service.deployments: deployment = current_service.deployments[0] deployment["id"] = f"ecs-svc/{mock_random.randint(0, 32**12)}" now = datetime.now(timezone.utc) deployment["createdAt"] = now deployment["updatedAt"] = now return current_service else: raise ServiceNotFoundException def delete_service( self, cluster_name: str, service_name: str, force: bool ) -> Service: cluster = self._get_cluster(cluster_name) service = self._get_service(cluster_name, service_name) cluster_service_pair = f"{cluster.name}:{service.name}" service = self.services[cluster_service_pair] if service.desired_count > 0 and not force: raise InvalidParameterException( "The service cannot be stopped while it is scaled above 0." ) else: # A service is not immediately removed - just marked as inactive # It is only deleted later on # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html#ECS.Client.delete_service service.status = "INACTIVE" service.pending_count = 0 return service def register_container_instance( self, cluster_str: str, ec2_instance_id: str ) -> ContainerInstance: cluster_name = cluster_str.split("/")[-1] if cluster_name not in self.clusters: raise Exception(f"{cluster_name} is not a cluster") container_instance = ContainerInstance( ec2_instance_id, self.account_id, self.region_name, cluster_name, backend=self, ) if not self.container_instances.get(cluster_name): self.container_instances[cluster_name] = {} container_instance_id = container_instance.container_instance_arn.split("/")[-1] self.container_instances[cluster_name][container_instance_id] = ( container_instance ) self.clusters[cluster_name].registered_container_instances_count += 1 return container_instance def list_container_instances(self, cluster_str: str) -> List[str]: cluster_name = cluster_str.split("/")[-1] container_instances_values = self.container_instances.get( cluster_name, {} ).values() container_instances = [ ci.container_instance_arn for ci in container_instances_values ] return sorted(container_instances) def describe_container_instances( self, cluster_str: str, list_container_instance_ids: List[str] ) -> Tuple[List[ContainerInstance], List[ContainerInstanceFailure]]: cluster = self._get_cluster(cluster_str) if not list_container_instance_ids: raise EcsClientException("Container Instances cannot be empty.") failures = [] container_instance_objects = [] for container_instance_id in list_container_instance_ids: container_instance_id = container_instance_id.split("/")[-1] container_instance = self.container_instances[cluster.name].get( container_instance_id, None ) if container_instance is not None: container_instance_objects.append(container_instance) else: failures.append( ContainerInstanceFailure( "MISSING", container_instance_id, self.account_id, self.region_name, ) ) return container_instance_objects, failures def update_container_instances_state( self, cluster_str: str, list_container_instance_ids: List[str], status: str ) -> Tuple[List[ContainerInstance], List[ContainerInstanceFailure]]: cluster = self._get_cluster(cluster_str) status = status.upper() if status not in ["ACTIVE", "DRAINING"]: raise InvalidParameterException( "Container instance status should be one of [ACTIVE, DRAINING]" ) failures = [] container_instance_objects = [] list_container_instance_ids = [ x.split("/")[-1] for x in list_container_instance_ids ] for container_instance_id in list_container_instance_ids: container_instance = self.container_instances[cluster.name].get( container_instance_id, None ) if container_instance is not None: container_instance.status = status container_instance_objects.append(container_instance) else: failures.append( ContainerInstanceFailure( "MISSING", container_instance_id, self.account_id, self.region_name, ) ) return container_instance_objects, failures def update_container_instance_resources( self, container_instance: ContainerInstance, task_resources: Dict[str, Any], removing: bool = False, ) -> None: resource_multiplier = 1 if removing: resource_multiplier = -1 for resource in container_instance.remaining_resources: if resource.get("name") == "CPU": resource["integerValue"] -= ( task_resources.get("CPU") * resource_multiplier # type: ignore[operator] ) elif resource.get("name") == "MEMORY": resource["integerValue"] -= ( task_resources.get("MEMORY") * resource_multiplier # type: ignore[operator] ) elif resource.get("name") == "PORTS": for port in task_resources.get("PORTS"): # type: ignore[union-attr] if removing: resource["stringSetValue"].remove(str(port)) else: resource["stringSetValue"].append(str(port)) container_instance.running_tasks_count += resource_multiplier * 1 def deregister_container_instance( self, cluster_str: str, container_instance_str: str, force: bool ) -> ContainerInstance: cluster = self._get_cluster(cluster_str) container_instance_id = container_instance_str.split("/")[-1] container_instance = self.container_instances[cluster.name].get( container_instance_id ) if container_instance is None: raise Exception("{0} is not a container id in the cluster") if not force and container_instance.running_tasks_count > 0: raise JsonRESTError( error_type="InvalidParameter", message="Found running tasks on the instance.", ) # Currently assume that people might want to do something based around deregistered instances # with tasks left running on them - but nothing if no tasks were running already elif force and container_instance.running_tasks_count > 0: if not self.container_instances.get("orphaned"): self.container_instances["orphaned"] = {} self.container_instances["orphaned"][container_instance_id] = ( container_instance ) del self.container_instances[cluster.name][container_instance_id] self._respond_to_cluster_state_update(cluster_str) return container_instance def _respond_to_cluster_state_update(self, cluster_str: str) -> None: self._get_cluster(cluster_str) pass def put_attributes( self, cluster_name: str, attributes: Optional[List[Dict[str, Any]]] = None ) -> None: cluster = self._get_cluster(cluster_name) if attributes is None: raise InvalidParameterException("attributes can not be empty") for attr in attributes: self._put_attribute( cluster.name, attr["name"], attr.get("value"), attr.get("targetId"), attr.get("targetType"), ) def _put_attribute( self, cluster_name: str, name: str, value: Optional[str] = None, target_id: Optional[str] = None, target_type: Optional[str] = None, ) -> None: if target_id is None and target_type is None: for instance in self.container_instances[cluster_name].values(): instance.attributes[name] = value elif target_type is None: # targetId is full container instance arn try: arn = target_id.rsplit("/", 1)[-1] # type: ignore[union-attr] self.container_instances[cluster_name][arn].attributes[name] = value except KeyError: raise JsonRESTError( "TargetNotFoundException", f"Could not find {target_id}" ) else: # targetId is container uuid, targetType must be container-instance try: if target_type != "container-instance": raise JsonRESTError( "TargetNotFoundException", f"Could not find {target_id}" ) self.container_instances[cluster_name][target_id].attributes[ # type: ignore[index] name ] = value except KeyError: raise JsonRESTError( "TargetNotFoundException", f"Could not find {target_id}" ) def list_attributes( self, target_type: str, cluster_name: Optional[str] = None, attr_name: Optional[str] = None, attr_value: Optional[str] = None, ) -> Any: """ Pagination is not yet implemented """ if target_type != "container-instance": raise JsonRESTError( "InvalidParameterException", "targetType must be container-instance" ) filters = [lambda x: True] # item will be {0 cluster_name, 1 arn, 2 name, 3 value} if cluster_name is not None: filters.append(lambda item: item[0] == cluster_name) if attr_name: filters.append(lambda item: item[2] == attr_name) if attr_name: filters.append(lambda item: item[3] == attr_value) all_attrs = [] for cluster_name, cobj in self.container_instances.items(): for container_instance in cobj.values(): for key, value in container_instance.attributes.items(): all_attrs.append( ( cluster_name, container_instance.container_instance_arn, key, value, ) ) return filter(lambda x: all(f(x) for f in filters), all_attrs) # type: ignore def delete_attributes( self, cluster_name: str, attributes: Optional[List[Dict[str, Any]]] = None ) -> None: cluster = self._get_cluster(cluster_name) if attributes is None: raise JsonRESTError( "InvalidParameterException", "attributes value is required" ) for attr in attributes: self._delete_attribute( cluster.name, attr["name"], attr.get("value"), attr.get("targetId"), attr.get("targetType"), ) def _delete_attribute( self, cluster_name: str, name: str, value: Optional[str] = None, target_id: Optional[str] = None, target_type: Optional[str] = None, ) -> None: if target_id is None and target_type is None: for instance in self.container_instances[cluster_name].values(): if name in instance.attributes and instance.attributes[name] == value: del instance.attributes[name] elif target_type is None: # targetId is full container instance arn try: arn = target_id.rsplit("/", 1)[-1] # type: ignore[union-attr] instance = self.container_instances[cluster_name][arn] if name in instance.attributes and instance.attributes[name] == value: del instance.attributes[name] except KeyError: raise JsonRESTError( "TargetNotFoundException", f"Could not find {target_id}" ) else: # targetId is container uuid, targetType must be container-instance try: if target_type != "container-instance": raise JsonRESTError( "TargetNotFoundException", f"Could not find {target_id}" ) instance = self.container_instances[cluster_name][target_id] # type: ignore[index] if name in instance.attributes and instance.attributes[name] == value: del instance.attributes[name] except KeyError: raise JsonRESTError( "TargetNotFoundException", f"Could not find {target_id}" ) def list_task_definition_families( self, family_prefix: Optional[str] = None ) -> Iterator[str]: """ The Status and pagination parameters are not yet implemented """ for task_fam in self.task_definitions: if family_prefix is not None and not task_fam.startswith(family_prefix): continue yield task_fam @staticmethod def _parse_resource_arn(resource_arn: str) -> Dict[str, str]: regexes = [ ARN_PARTITION_REGEX + ":ecs:(?P<region>[^:]+):(?P<account_id>[^:]+):(?P<service>[^:]+)/(?P<cluster_id>[^:]+)/(?P<service_id>[^:]+)/ecs-svc/(?P<id>.*)$", ARN_PARTITION_REGEX + ":ecs:(?P<region>[^:]+):(?P<account_id>[^:]+):(?P<service>[^:]+)/(?P<cluster_id>[^:]+)/(?P<id>.*)$", ARN_PARTITION_REGEX + ":ecs:(?P<region>[^:]+):(?P<account_id>[^:]+):(?P<service>[^:]+)/(?P<id>.*)$", ] for regex in regexes: match = re.match(regex, resource_arn) if match: return match.groupdict() raise JsonRESTError("InvalidParameterException", "The ARN provided is invalid.") def _get_resource(self, resource_arn: str, parsed_arn: Dict[str, str]) -> Any: if parsed_arn["service"] == "cluster": return self._get_cluster(parsed_arn["id"]) if parsed_arn["service"] == "service": for service in self.services.values(): if service.arn == resource_arn: return service raise ServiceNotFoundException elif parsed_arn["service"] == "task-set": c_id = parsed_arn["cluster_id"] s_id = parsed_arn["service_id"] services, _ = self.describe_services( cluster_str=c_id, service_names_or_arns=[s_id] ) for service in services: for task_set in service.task_sets: if task_set.task_set_arn == resource_arn: return task_set raise ServiceNotFoundException elif parsed_arn["service"] == "task-definition": task_def = self.describe_task_definition( task_definition_str=parsed_arn["id"] ) return task_def elif parsed_arn["service"] == "capacity-provider": return self._get_provider(parsed_arn["id"]) elif parsed_arn["service"] == "task": for task in self.list_tasks(): if task.task_arn == resource_arn: return task raise NotImplementedError() def list_tags_for_resource(self, resource_arn: str) -> List[Dict[str, str]]: """Currently implemented only for task definitions and services""" parsed_arn = self._parse_resource_arn(resource_arn) resource = self._get_resource(resource_arn, parsed_arn) return resource.tags def _get_last_task_definition_revision_id(self, family: str) -> int: # type: ignore[return] definitions = self.task_definitions.get(family) if definitions: return max(definitions.keys()) def tag_resource(self, resource_arn: str, tags: List[Dict[str, str]]) -> None: parsed_arn = self._parse_resource_arn(resource_arn) resource = self._get_resource(resource_arn, parsed_arn) resource.tags = self._merge_tags(resource.tags or [], tags) def _merge_tags( self, existing_tags: List[Dict[str, str]], new_tags: List[Dict[str, str]] ) -> List[Dict[str, str]]: merged_tags = new_tags new_keys = self._get_keys(new_tags) for existing_tag in existing_tags: if existing_tag["key"] not in new_keys: merged_tags.append(existing_tag) return merged_tags @staticmethod def _get_keys(tags: List[Dict[str, str]]) -> List[str]: return [tag["key"] for tag in tags] def untag_resource(self, resource_arn: str, tag_keys: List[str]) -> None: parsed_arn = self._parse_resource_arn(resource_arn) resource = self._get_resource(resource_arn, parsed_arn) resource.tags = [tag for tag in resource.tags if tag["key"] not in tag_keys] def create_task_set( self, service: str, cluster_str: str, task_definition: str, external_id: Optional[str] = None, network_configuration: Optional[Dict[str, Any]] = None, load_balancers: Optional[List[Dict[str, Any]]] = None, service_registries: Optional[List[Dict[str, Any]]] = None, launch_type: Optional[str] = None, capacity_provider_strategy: Optional[List[Dict[str, Any]]] = None, platform_version: Optional[str] = None, scale: Optional[Dict[str, Any]] = None, client_token: Optional[str] = None, tags: Optional[List[Dict[str, str]]] = None, ) -> TaskSet: launch_type = launch_type if launch_type is not None else "EC2" if launch_type not in ["EC2", "FARGATE"]: raise EcsClientException("launch type should be one of [EC2,FARGATE]") task_set = TaskSet( service, cluster_str, task_definition, self.account_id, self.region_name, external_id=external_id, network_configuration=network_configuration, load_balancers=load_balancers, service_registries=service_registries, launch_type=launch_type, capacity_provider_strategy=capacity_provider_strategy, platform_version=platform_version, scale=scale, client_token=client_token, tags=tags, ) service_name = service.split("/")[-1] cluster_obj = self._get_cluster(cluster_str) service_obj = self.services.get(f"{cluster_obj.name}:{service_name}") if not service_obj: raise ServiceNotFoundException task_def_obj = self.describe_task_definition(task_definition) task_set.task_definition = task_def_obj.arn task_set.service_arn = service_obj.arn task_set.cluster_arn = cluster_obj.arn service_obj.task_sets.append(task_set) # TODO: validate load balancers if scale: if scale.get("unit") == "PERCENT": desired_count = service_obj.desired_count nr_of_tasks = int(desired_count * (scale["value"] / 100)) all_tags = {} if service_obj.propagate_tags == "TASK_DEFINITION": all_tags.update({t["key"]: t["value"] for t in task_def_obj.tags}) if service_obj.propagate_tags == "SERVICE": all_tags.update({t["key"]: t["value"] for t in service_obj.tags}) all_tags.update({t["key"]: t["value"] for t in (tags or [])}) self.run_task( cluster_str=cluster_str, task_definition_str=task_definition, count=nr_of_tasks, overrides=None, started_by=self.account_id, tags=[{"key": k, "value": v} for k, v in all_tags.items()], launch_type=launch_type, networking_configuration=network_configuration, ) return task_set def describe_task_sets( self, cluster_str: str, service: str, task_sets: Optional[List[str]] = None ) -> List[TaskSet]: task_sets = task_sets or [] cluster_obj = self._get_cluster(cluster_str) service_name = service.split("/")[-1] service_key = f"{cluster_obj.name}:{service_name}" service_obj = self.services.get(service_key) if not service_obj: raise ServiceNotFoundException task_set_results = [] if task_sets: for task_set in service_obj.task_sets: # Match full ARN if task_set.task_set_arn in task_sets: task_set_results.append(task_set) # Match partial ARN if only the taskset ID is provided elif "/".join(task_set.task_set_arn.split("/")[-2:]) in task_sets: task_set_results.append(task_set) else: task_set_results = service_obj.task_sets return task_set_results def delete_task_set(self, cluster: str, service: str, task_set: str) -> TaskSet: """ The Force-parameter is not yet implemented """ cluster_name = cluster.split("/")[-1] service_name = service.split("/")[-1] service_key = f"{cluster_name}:{service_name}" task_set_element = None for i, ts in enumerate(self.services[service_key].task_sets): if task_set == ts.task_set_arn or task_set == "/".join( ts.task_set_arn.split("/")[-2:] ): task_set_element = i if task_set_element is not None: deleted_task_set = self.services[service_key].task_sets.pop( task_set_element ) else: raise TaskSetNotFoundException # TODO: add logic for `force` to raise an exception if `PRIMARY` task has not been scaled to 0. return deleted_task_set def update_task_set( self, cluster: str, service: str, task_set: str, scale: Dict[str, Any] ) -> TaskSet: cluster_name = cluster.split("/")[-1] service_name = service.split("/")[-1] task_set_obj = self.describe_task_sets( cluster_name, service_name, task_sets=[task_set] )[0] task_set_obj.scale = scale return task_set_obj def update_service_primary_task_set( self, cluster: str, service: str, primary_task_set: str ) -> TaskSet: """Updates task sets be PRIMARY or ACTIVE for given cluster:service task sets""" cluster_name = cluster.split("/")[-1] service_name = service.split("/")[-1] task_set_obj = self.describe_task_sets( cluster_name, service_name, task_sets=[primary_task_set] )[0] services, _ = self.describe_services(cluster, [service]) service_obj = services[0] service_obj.load_balancers = task_set_obj.load_balancers service_obj.task_definition = task_set_obj.task_definition for task_set in service_obj.task_sets: if task_set.task_set_arn == primary_task_set: task_set.status = "PRIMARY" else: task_set.status = "ACTIVE" return task_set_obj def list_account_settings( self, name: Optional[str] = None, value: Optional[str] = None ) -> List[AccountSetting]: expected_names = [ "serviceLongArnFormat", "taskLongArnFormat", "containerInstanceLongArnFormat", "containerLongArnFormat", "awsvpcTrunking", "containerInsights", "dualStackIPv6", ] if name and name not in expected_names: raise UnknownAccountSettingException() all_settings = self.account_settings.values() return [ s for s in all_settings if (not name or s.name == name) and (not value or s.value == value) ] def put_account_setting(self, name: str, value: str) -> AccountSetting: account_setting = AccountSetting(name, value) self.account_settings[name] = account_setting return account_setting def delete_account_setting(self, name: str) -> None: self.account_settings.pop(name, None) def enable_long_arn_for_name(self, name: str) -> bool: account = self.account_settings.get(name, None) if account and account.value == "disabled": return False return settings.ecs_new_arn_format() ecs_backends = BackendDict(EC2ContainerServiceBackend, "ecs")
Memory