import re import time from collections import OrderedDict from enum import Enum, unique from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union from moto.core.base_backend import BackendDict, BaseBackend from moto.core.common_models import BaseModel from moto.ecs import ecs_backends from moto.moto_api._internal import mock_random from moto.utilities.utils import ARN_PARTITION_REGEX, get_partition from .exceptions import AWSValidationException if TYPE_CHECKING: from moto.cloudwatch.models import FakeAlarm @unique class ResourceTypeExceptionValueSet(Enum): RESOURCE_TYPE = "ResourceType" # MSK currently only has the "broker-storage" resource type which is not part of the resource_id KAFKA_BROKER_STORAGE = "broker-storage" @unique class ServiceNamespaceValueSet(Enum): APPSTREAM = "appstream" RDS = "rds" LAMBDA = "lambda" CASSANDRA = "cassandra" DYNAMODB = "dynamodb" CUSTOM_RESOURCE = "custom-resource" ELASTICMAPREDUCE = "elasticmapreduce" EC2 = "ec2" COMPREHEND = "comprehend" ECS = "ecs" SAGEMAKER = "sagemaker" KAFKA = "kafka" @unique class ScalableDimensionValueSet(Enum): CASSANDRA_TABLE_READ_CAPACITY_UNITS = "cassandra:table:ReadCapacityUnits" CASSANDRA_TABLE_WRITE_CAPACITY_UNITS = "cassandra:table:WriteCapacityUnits" DYNAMODB_INDEX_READ_CAPACITY_UNITS = "dynamodb:index:ReadCapacityUnits" DYNAMODB_INDEX_WRITE_CAPACITY_UNITS = "dynamodb:index:WriteCapacityUnits" DYNAMODB_TABLE_READ_CAPACITY_UNITS = "dynamodb:table:ReadCapacityUnits" DYNAMODB_TABLE_WRITE_CAPACITY_UNITS = "dynamodb:table:WriteCapacityUnits" RDS_CLUSTER_READ_REPLICA_COUNT = "rds:cluster:ReadReplicaCount" RDS_CLUSTER_CAPACITY = "rds:cluster:Capacity" COMPREHEND_DOCUMENT_CLASSIFIER_ENDPOINT_DESIRED_INFERENCE_UNITS = ( "comprehend:document-classifier-endpoint:DesiredInferenceUnits" ) ELASTICMAPREDUCE_INSTANCE_FLEET_ON_DEMAND_CAPACITY = ( "elasticmapreduce:instancefleet:OnDemandCapacity" ) ELASTICMAPREDUCE_INSTANCE_FLEET_SPOT_CAPACITY = ( "elasticmapreduce:instancefleet:SpotCapacity" ) ELASTICMAPREDUCE_INSTANCE_GROUP_INSTANCE_COUNT = ( "elasticmapreduce:instancegroup:InstanceCount" ) LAMBDA_FUNCTION_PROVISIONED_CONCURRENCY = "lambda:function:ProvisionedConcurrency" APPSTREAM_FLEET_DESIRED_CAPACITY = "appstream:fleet:DesiredCapacity" CUSTOM_RESOURCE_RESOURCE_TYPE_PROPERTY = "custom-resource:ResourceType:Property" SAGEMAKER_VARIANT_DESIRED_INSTANCE_COUNT = "sagemaker:variant:DesiredInstanceCount" EC2_SPOT_FLEET_REQUEST_TARGET_CAPACITY = "ec2:spot-fleet-request:TargetCapacity" ECS_SERVICE_DESIRED_COUNT = "ecs:service:DesiredCount" KAFKA_BROKER_STORAGE_VOLUME_SIZE = "kafka:broker-storage:VolumeSize" class ApplicationAutoscalingBackend(BaseBackend): def __init__(self, region_name: str, account_id: str) -> None: super().__init__(region_name, account_id) self.ecs_backend = ecs_backends[account_id][region_name] self.targets: Dict[str, Dict[str, FakeScalableTarget]] = OrderedDict() self.policies: Dict[str, FakeApplicationAutoscalingPolicy] = {} self.scheduled_actions: List[FakeScheduledAction] = list() def describe_scalable_targets( self, namespace: str, r_ids: Union[None, List[str]], dimension: Union[None, str] ) -> List["FakeScalableTarget"]: if r_ids is None: r_ids = [] targets = self._flatten_scalable_targets(namespace) if dimension is not None: targets = [t for t in targets if t.scalable_dimension == dimension] if len(r_ids) > 0: targets = [t for t in targets if t.resource_id in r_ids] return targets def _flatten_scalable_targets(self, namespace: str) -> List["FakeScalableTarget"]: """Flatten scalable targets for a given service namespace down to a list.""" targets = [] for dimension in self.targets.keys(): for resource_id in self.targets[dimension].keys(): targets.append(self.targets[dimension][resource_id]) targets = [t for t in targets if t.service_namespace == namespace] return targets def register_scalable_target( self, namespace: str, r_id: str, dimension: str, min_capacity: Optional[int], max_capacity: Optional[int], role_arn: str, suspended_state: str, ) -> "FakeScalableTarget": _ = _target_params_are_valid(namespace, r_id, dimension) if namespace == ServiceNamespaceValueSet.ECS.value: _ = self._ecs_service_exists_for_target(r_id) if self._scalable_target_exists(r_id, dimension): target = self.targets[dimension][r_id] target.update(min_capacity, max_capacity, suspended_state) else: target = FakeScalableTarget( self, namespace, r_id, dimension, min_capacity, max_capacity, role_arn, suspended_state, ) self._add_scalable_target(target) return target def _scalable_target_exists(self, r_id: str, dimension: str) -> bool: return r_id in self.targets.get(dimension, []) def _ecs_service_exists_for_target(self, r_id: str) -> bool: """Raises a ValidationException if an ECS service does not exist for the specified resource ID. """ _, cluster, service = r_id.split("/") result, _ = self.ecs_backend.describe_services(cluster, [service]) if len(result) != 1: raise AWSValidationException(f"ECS service doesn't exist: {r_id}") return True def _add_scalable_target( self, target: "FakeScalableTarget" ) -> "FakeScalableTarget": if target.scalable_dimension not in self.targets: self.targets[target.scalable_dimension] = OrderedDict() if target.resource_id not in self.targets[target.scalable_dimension]: self.targets[target.scalable_dimension][target.resource_id] = target return target def deregister_scalable_target( self, namespace: str, r_id: str, dimension: str ) -> None: if self._scalable_target_exists(r_id, dimension): del self.targets[dimension][r_id] else: raise AWSValidationException( f"No scalable target found for service namespace: {namespace}, resource ID: {r_id}, scalable dimension: {dimension}" ) def put_scaling_policy( self, policy_name: str, service_namespace: str, resource_id: str, scalable_dimension: str, policy_body: Dict[str, Any], policy_type: Optional[None], ) -> "FakeApplicationAutoscalingPolicy": policy_key = FakeApplicationAutoscalingPolicy.formulate_key( service_namespace, resource_id, scalable_dimension, policy_name ) if policy_key in self.policies: old_policy = self.policies[policy_key] policy = FakeApplicationAutoscalingPolicy( account_id=self.account_id, region_name=self.region_name, policy_name=policy_name, service_namespace=service_namespace, resource_id=resource_id, scalable_dimension=scalable_dimension, policy_type=policy_type if policy_type else old_policy.policy_type, policy_body=policy_body if policy_body else old_policy._policy_body, ) else: policy = FakeApplicationAutoscalingPolicy( account_id=self.account_id, region_name=self.region_name, policy_name=policy_name, service_namespace=service_namespace, resource_id=resource_id, scalable_dimension=scalable_dimension, policy_type=policy_type, policy_body=policy_body, ) self.policies[policy_key] = policy return policy def describe_scaling_policies( self, service_namespace: str, resource_id: str, scalable_dimension: str, max_results: Optional[int], next_token: str, ) -> Tuple[Optional[str], List["FakeApplicationAutoscalingPolicy"]]: max_results = max_results or 100 policies = [ policy for policy in self.policies.values() if policy.service_namespace == service_namespace ] if resource_id: policies = [ policy for policy in policies if policy.resource_id in resource_id ] if scalable_dimension: policies = [ policy for policy in policies if policy.scalable_dimension in scalable_dimension ] starting_point = int(next_token) if next_token else 0 ending_point = starting_point + max_results policies_page = policies[starting_point:ending_point] new_next_token = str(ending_point) if ending_point < len(policies) else None return new_next_token, policies_page def delete_scaling_policy( self, policy_name: str, service_namespace: str, resource_id: str, scalable_dimension: str, ) -> None: policy_key = FakeApplicationAutoscalingPolicy.formulate_key( service_namespace, resource_id, scalable_dimension, policy_name ) if policy_key in self.policies: policy = self.policies[policy_key] policy.delete_alarms(self.account_id, self.region_name) del self.policies[policy_key] else: raise AWSValidationException( f"No scaling policy found for service namespace: {service_namespace}, resource ID: {resource_id}, scalable dimension: {scalable_dimension}, policy name: {policy_name}" ) def delete_scheduled_action( self, service_namespace: str, scheduled_action_name: str, resource_id: str, scalable_dimension: str, ) -> None: self.scheduled_actions = [ a for a in self.scheduled_actions if not ( a.service_namespace == service_namespace and a.scheduled_action_name == scheduled_action_name and a.resource_id == resource_id and a.scalable_dimension == scalable_dimension ) ] def describe_scheduled_actions( self, scheduled_action_names: str, service_namespace: str, resource_id: str, scalable_dimension: str, ) -> List["FakeScheduledAction"]: """ Pagination is not yet implemented """ result = [ a for a in self.scheduled_actions if a.service_namespace == service_namespace ] if scheduled_action_names: result = [ a for a in result if a.scheduled_action_name in scheduled_action_names ] if resource_id: result = [a for a in result if a.resource_id == resource_id] if scalable_dimension: result = [a for a in result if a.scalable_dimension == scalable_dimension] return result def put_scheduled_action( self, service_namespace: str, schedule: str, timezone: str, scheduled_action_name: str, resource_id: str, scalable_dimension: str, start_time: str, end_time: str, scalable_target_action: str, ) -> None: existing_action = next( ( a for a in self.scheduled_actions if a.service_namespace == service_namespace and a.scheduled_action_name == scheduled_action_name and a.resource_id == resource_id and a.scalable_dimension == scalable_dimension ), None, ) if existing_action: existing_action.update( schedule, timezone, scheduled_action_name, start_time, end_time, scalable_target_action, ) else: action = FakeScheduledAction( service_namespace, schedule, timezone, scheduled_action_name, resource_id, scalable_dimension, start_time, end_time, scalable_target_action, self.account_id, self.region_name, ) self.scheduled_actions.append(action) def _target_params_are_valid(namespace: str, r_id: str, dimension: str) -> bool: """Check whether namespace, resource_id and dimension are valid and consistent with each other.""" is_valid = True valid_namespaces = [n.value for n in ServiceNamespaceValueSet] if namespace not in valid_namespaces: is_valid = False if dimension is not None: try: valid_dimensions = [d.value for d in ScalableDimensionValueSet] resource_type_exceptions = [r.value for r in ResourceTypeExceptionValueSet] d_namespace, d_resource_type, _ = dimension.split(":") if d_resource_type not in resource_type_exceptions: resource_type = _get_resource_type_from_resource_id(r_id) else: resource_type = d_resource_type if ( dimension not in valid_dimensions or d_namespace != namespace or resource_type != d_resource_type ): is_valid = False except ValueError: is_valid = False if not is_valid: raise AWSValidationException( "Unsupported service namespace, resource type or scalable dimension" ) return is_valid def _get_resource_type_from_resource_id(resource_id: str) -> str: # AWS Application Autoscaling resource_ids are multi-component (path-like) identifiers that vary in format, # depending on the type of resource it identifies. resource_type is one of its components. # resource_id format variations are described in # https://docs.aws.amazon.com/autoscaling/application/APIReference/API_RegisterScalableTarget.html # In a nutshell: # - Most use slash separators, but some use colon separators. # - The resource type is usually the first component of the resource_id... # - ...except for sagemaker endpoints, dynamodb GSIs and keyspaces tables, where it's the third. # - Comprehend uses an arn, with the resource type being the last element. if re.match(ARN_PARTITION_REGEX + ":comprehend", resource_id): resource_id = resource_id.split(":")[-1] resource_split = ( resource_id.split("/") if "/" in resource_id else resource_id.split(":") ) if ( resource_split[0] == "endpoint" or (resource_split[0] == "table" and len(resource_split) > 2) or (resource_split[0] == "keyspace") ): resource_type = resource_split[2] else: resource_type = resource_split[0] return resource_type class FakeScalableTarget(BaseModel): def __init__( self, backend: ApplicationAutoscalingBackend, service_namespace: str, resource_id: str, scalable_dimension: str, min_capacity: Optional[int], max_capacity: Optional[int], role_arn: str, suspended_state: str, ) -> None: self.applicationautoscaling_backend = backend self.service_namespace = service_namespace self.resource_id = resource_id self.scalable_dimension = scalable_dimension self.min_capacity = min_capacity self.max_capacity = max_capacity self.role_arn = role_arn self.suspended_state = suspended_state self.creation_time = time.time() self.arn = f"arn:{get_partition(backend.region_name)}:application-autoscaling:{backend.region_name}:{backend.account_id}:scalable-target/{mock_random.get_random_string(length=36, lower_case=True)}" def update( self, min_capacity: Optional[int], max_capacity: Optional[int], suspended_state: str, ) -> None: if min_capacity is not None: self.min_capacity = min_capacity if max_capacity is not None: self.max_capacity = max_capacity if suspended_state is not None: self.suspended_state = suspended_state class FakeApplicationAutoscalingPolicy(BaseModel): def __init__( self, account_id: str, region_name: str, policy_name: str, service_namespace: str, resource_id: str, scalable_dimension: str, policy_type: Optional[str], policy_body: Dict[str, Any], ) -> None: self.step_scaling_policy_configuration = None self.target_tracking_scaling_policy_configuration = None if policy_type == "StepScaling": self.step_scaling_policy_configuration = policy_body self.target_tracking_scaling_policy_configuration = None elif policy_type == "TargetTrackingScaling": self.step_scaling_policy_configuration = None self.target_tracking_scaling_policy_configuration = policy_body else: raise AWSValidationException( f"1 validation error detected: Value '{policy_type}' at 'policyType' failed to satisfy constraint: Member must satisfy enum value set: [PredictiveScaling, StepScaling, TargetTrackingScaling]" ) self._policy_body = policy_body self.service_namespace = service_namespace self.resource_id = resource_id self.scalable_dimension = scalable_dimension self.policy_name = policy_name self.policy_type = policy_type self._guid = mock_random.uuid4() self.policy_arn = f"arn:{get_partition(region_name)}:autoscaling:{region_name}:{account_id}:scalingPolicy:{self._guid}:resource/{self.service_namespace}/{self.resource_id}:policyName/{self.policy_name}" self.creation_time = time.time() self.alarms: List["FakeAlarm"] = [] self.account_id = account_id self.region_name = region_name self.create_alarms() def create_alarms(self) -> None: if self.policy_type == "TargetTrackingScaling": if self.service_namespace == "dynamodb": self.alarms.extend(self._generate_dynamodb_alarms()) if self.service_namespace == "ecs": self.alarms.extend(self._generate_ecs_alarms()) def _generate_dynamodb_alarms(self) -> List["FakeAlarm"]: from moto.cloudwatch.models import CloudWatchBackend, cloudwatch_backends cloudwatch: CloudWatchBackend = cloudwatch_backends[self.account_id][ self.region_name ] alarms = [] table_name = self.resource_id.split("/")[-1] alarm_action = f"{self.policy_arn}:createdBy/{mock_random.uuid4()}" alarm1 = cloudwatch.put_metric_alarm( name=f"TargetTracking-table/{table_name}-AlarmHigh-{mock_random.uuid4()}", namespace="AWS/DynamoDB", metric_name="ConsumedReadCapacityUnits", metric_data_queries=[], comparison_operator="GreaterThanThreshold", evaluation_periods=2, period=60, threshold=42.0, statistic="Sum", description=f"DO NOT EDIT OR DELETE. For TargetTrackingScaling policy {alarm_action}", dimensions=[{"name": "TableName", "value": table_name}], alarm_actions=[alarm_action], ) alarms.append(alarm1) alarm2 = cloudwatch.put_metric_alarm( name=f"TargetTracking-table/{table_name}-AlarmLow-{mock_random.uuid4()}", namespace="AWS/DynamoDB", metric_name="ConsumedReadCapacityUnits", metric_data_queries=[], comparison_operator="LessThanThreshold", evaluation_periods=15, period=60, threshold=30.0, statistic="Sum", description=f"DO NOT EDIT OR DELETE. For TargetTrackingScaling policy {alarm_action}", dimensions=[{"name": "TableName", "value": table_name}], alarm_actions=[alarm_action], ) alarms.append(alarm2) alarm3 = cloudwatch.put_metric_alarm( name=f"TargetTracking-table/{table_name}-ProvisionedCapacityHigh-{mock_random.uuid4()}", namespace="AWS/DynamoDB", metric_name="ProvisionedReadCapacityUnits", metric_data_queries=[], comparison_operator="GreaterThanThreshold", evaluation_periods=2, period=300, threshold=1.0, statistic="Average", description=f"DO NOT EDIT OR DELETE. For TargetTrackingScaling policy {alarm_action}", dimensions=[{"name": "TableName", "value": table_name}], alarm_actions=[alarm_action], ) alarms.append(alarm3) alarm4 = cloudwatch.put_metric_alarm( name=f"TargetTracking-table/{table_name}-ProvisionedCapacityLow-{mock_random.uuid4()}", namespace="AWS/DynamoDB", metric_name="ProvisionedReadCapacityUnits", metric_data_queries=[], comparison_operator="LessThanThreshold", evaluation_periods=3, period=300, threshold=1.0, statistic="Average", description=f"DO NOT EDIT OR DELETE. For TargetTrackingScaling policy {alarm_action}", dimensions=[{"name": "TableName", "value": table_name}], alarm_actions=[alarm_action], ) alarms.append(alarm4) return alarms def _generate_ecs_alarms(self) -> List["FakeAlarm"]: from moto.cloudwatch.models import CloudWatchBackend, cloudwatch_backends cloudwatch: CloudWatchBackend = cloudwatch_backends[self.account_id][ self.region_name ] alarms: List["FakeAlarm"] = [] alarm_action = f"{self.policy_arn}:createdBy/{mock_random.uuid4()}" config = self.target_tracking_scaling_policy_configuration or {} metric_spec = config.get("PredefinedMetricSpecification", {}) if "Memory" in metric_spec.get("PredefinedMetricType", ""): metric_name = "MemoryUtilization" else: metric_name = "CPUUtilization" _, cluster_name, service_name = self.resource_id.split("/") alarm1 = cloudwatch.put_metric_alarm( name=f"TargetTracking-{self.resource_id}-AlarmHigh-{mock_random.uuid4()}", namespace="AWS/ECS", metric_name=metric_name, metric_data_queries=[], comparison_operator="GreaterThanThreshold", evaluation_periods=3, period=60, threshold=6, unit="Percent", statistic="Average", description=f"DO NOT EDIT OR DELETE. For TargetTrackingScaling policy {alarm_action}", dimensions=[ {"name": "ClusterName", "value": cluster_name}, {"name": "ServiceName", "value": service_name}, ], alarm_actions=[alarm_action], ) alarms.append(alarm1) alarm2 = cloudwatch.put_metric_alarm( name=f"TargetTracking-{self.resource_id}-AlarmLow-{mock_random.uuid4()}", namespace="AWS/ECS", metric_name=metric_name, metric_data_queries=[], comparison_operator="LessThanThreshold", evaluation_periods=15, period=60, threshold=6, unit="Percent", statistic="Average", description=f"DO NOT EDIT OR DELETE. For TargetTrackingScaling policy {alarm_action}", dimensions=[ {"name": "ClusterName", "value": cluster_name}, {"name": "ServiceName", "value": service_name}, ], alarm_actions=[alarm_action], ) alarms.append(alarm2) return alarms def delete_alarms(self, account_id: str, region_name: str) -> None: from moto.cloudwatch.models import CloudWatchBackend, cloudwatch_backends cloudwatch: CloudWatchBackend = cloudwatch_backends[account_id][region_name] cloudwatch.delete_alarms([a.name for a in self.alarms]) @staticmethod def formulate_key( service_namespace: str, resource_id: str, scalable_dimension: str, policy_name: str, ) -> str: return ( f"{service_namespace}\t{resource_id}\t{scalable_dimension}\t{policy_name}" ) class FakeScheduledAction(BaseModel): def __init__( self, service_namespace: str, schedule: str, timezone: str, scheduled_action_name: str, resource_id: str, scalable_dimension: str, start_time: str, end_time: str, scalable_target_action: str, account_id: str, region: str, ) -> None: self.arn = f"arn:{get_partition(region)}:autoscaling:{region}:{account_id}:scheduledAction:{service_namespace}/{resource_id}:scheduledActionName/{scheduled_action_name}" self.service_namespace = service_namespace self.schedule = schedule self.timezone = timezone self.scheduled_action_name = scheduled_action_name self.resource_id = resource_id self.scalable_dimension = scalable_dimension self.start_time = start_time self.end_time = end_time self.scalable_target_action = scalable_target_action self.creation_time = time.time() def update( self, schedule: str, timezone: str, scheduled_action_name: str, start_time: str, end_time: str, scalable_target_action: str, ) -> None: if scheduled_action_name: self.scheduled_action_name = scheduled_action_name if schedule: self.schedule = schedule if timezone: self.timezone = timezone if scalable_target_action: self.scalable_target_action = scalable_target_action self.start_time = start_time self.end_time = end_time applicationautoscaling_backends = BackendDict( ApplicationAutoscalingBackend, "application-autoscaling" )
Memory