from re import compile as re_compile from typing import Any, Dict, List, Optional from moto.core.base_backend import BackendDict, BaseBackend from moto.core.common_models import BaseModel from moto.core.utils import utcnow from moto.utilities.paginator import paginate from moto.utilities.utils import get_partition from ..moto_api._internal import mock_random from .exceptions import ( CacheClusterAlreadyExists, CacheClusterNotFound, InvalidARNFault, InvalidParameterCombinationException, InvalidParameterValueException, UserAlreadyExists, UserNotFound, ) from .utils import PAGINATION_MODEL, AuthenticationTypes class User(BaseModel): def __init__( self, account_id: str, region: str, user_id: str, user_name: str, access_string: str, engine: str, no_password_required: bool, passwords: Optional[List[str]] = None, authentication_type: Optional[str] = None, ): self.id = user_id self.name = user_name self.engine = engine self.passwords = passwords or [] self.access_string = access_string self.no_password_required = no_password_required self.status = "active" self.minimum_engine_version = "6.0" self.usergroupids: List[str] = [] self.region = region self.arn = f"arn:{get_partition(self.region)}:elasticache:{self.region}:{account_id}:user:{self.id}" self.authentication_type = authentication_type class CacheCluster(BaseModel): def __init__( self, account_id: str, region_name: str, cache_cluster_id: str, replication_group_id: Optional[str], az_mode: Optional[str], preferred_availability_zone: Optional[str], num_cache_nodes: Optional[int], cache_node_type: Optional[str], engine: Optional[str], engine_version: Optional[str], cache_parameter_group_name: Optional[str], cache_subnet_group_name: Optional[str], transit_encryption_enabled: Optional[bool], network_type: Optional[str], ip_discovery: Optional[str], snapshot_name: Optional[str], preferred_maintenance_window: Optional[str], port: Optional[int], notification_topic_arn: Optional[str], auto_minor_version_upgrade: Optional[bool], snapshot_retention_limit: Optional[int], snapshot_window: Optional[str], auth_token: Optional[str], outpost_mode: Optional[str], preferred_outpost_arn: Optional[str], preferred_availability_zones: Optional[List[str]], cache_security_group_names: Optional[List[str]], security_group_ids: Optional[List[str]], tags: Optional[List[Dict[str, str]]], snapshot_arns: Optional[List[str]], preferred_outpost_arns: Optional[List[str]], log_delivery_configurations: List[Dict[str, Any]], cache_node_ids_to_remove: Optional[List[str]], cache_node_ids_to_reboot: Optional[List[str]], ): if tags is None: tags = [] self.cache_cluster_id = cache_cluster_id self.az_mode = az_mode self.preferred_availability_zone = preferred_availability_zone self.preferred_availability_zones = preferred_availability_zones or [] self.engine = engine or "redis" self.engine_version = engine_version if engine == "redis": self.num_cache_nodes = 1 self.replication_group_id = replication_group_id self.snapshot_arns = snapshot_arns or [] self.snapshot_name = snapshot_name self.snapshot_window = snapshot_window if engine == "memcached": if num_cache_nodes is None: self.num_cache_nodes = 1 elif 1 <= num_cache_nodes <= 40: self.num_cache_nodes = num_cache_nodes self.cache_node_type = cache_node_type self.cache_parameter_group_name = cache_parameter_group_name self.cache_subnet_group_name = cache_subnet_group_name self.cache_security_group_names = cache_security_group_names or [] self.security_group_ids = security_group_ids or [] self.tags = tags self.preferred_maintenance_window = preferred_maintenance_window self.port = port or 6379 self.notification_topic_arn = notification_topic_arn self.auto_minor_version_upgrade = auto_minor_version_upgrade self.snapshot_retention_limit = snapshot_retention_limit or 0 self.auth_token = auth_token self.outpost_mode = outpost_mode self.preferred_outpost_arn = preferred_outpost_arn self.preferred_outpost_arns = preferred_outpost_arns or [] self.log_delivery_configurations = log_delivery_configurations or [] self.transit_encryption_enabled = transit_encryption_enabled self.network_type = network_type self.ip_discovery = ip_discovery self.cache_node_ids_to_remove = cache_node_ids_to_remove self.cache_node_ids_to_reboot = cache_node_ids_to_reboot self.cache_cluster_create_time = utcnow() self.auth_token_last_modified_date = utcnow() self.cache_cluster_status = "available" self.arn = f"arn:{get_partition(region_name)}:elasticache:{region_name}:{account_id}:cluster:{cache_cluster_id}" self.cache_node_id = str(mock_random.uuid4()) def get_tags(self) -> List[Dict[str, str]]: return self.tags class ElastiCacheBackend(BaseBackend): """Implementation of ElastiCache APIs.""" def __init__(self, region_name: str, account_id: str): super().__init__(region_name, account_id) self.arn_regex = re_compile( r"^arn:aws:elasticache:.*:[0-9]*:(cluster|snapshot):.*$" ) self.users = dict() self.users["default"] = User( account_id=self.account_id, region=self.region_name, user_id="default", user_name="default", engine="redis", access_string="on ~* +@all", no_password_required=True, ) self.cache_clusters: Dict[str, Any] = dict() def create_user( self, user_id: str, user_name: str, engine: str, passwords: List[str], access_string: str, no_password_required: bool, authentication_type: str, # contain it to the str in the enums TODO ) -> User: if user_id in self.users: raise UserAlreadyExists if authentication_type not in AuthenticationTypes._value2member_map_: raise InvalidParameterValueException( f"Input Authentication type: {authentication_type} is not in the allowed list: [password,no-password-required,iam]" ) if ( no_password_required and authentication_type != AuthenticationTypes.NOPASSWORD ): raise InvalidParameterCombinationException( f"No password required flag is true but provided authentication type is {authentication_type}" ) if passwords and authentication_type != AuthenticationTypes.PASSWORD: raise InvalidParameterCombinationException( f"Password field is not allowed with authentication type: {authentication_type}" ) if not passwords and authentication_type == AuthenticationTypes.PASSWORD: raise InvalidParameterCombinationException( "A user with Authentication Mode: password, must have at least one password" ) user = User( account_id=self.account_id, region=self.region_name, user_id=user_id, user_name=user_name, engine=engine, passwords=passwords, access_string=access_string, no_password_required=no_password_required, authentication_type=authentication_type, ) self.users[user_id] = user return user def delete_user(self, user_id: str) -> User: if user_id in self.users: user = self.users[user_id] if user.status == "active": user.status = "deleting" return user raise UserNotFound(user_id) def describe_users(self, user_id: Optional[str]) -> List[User]: """ Only the `user_id` parameter is currently supported. Pagination is not yet implemented. """ if user_id: if user_id in self.users: user = self.users[user_id] if user.status == "deleting": self.users.pop(user_id) return [user] else: raise UserNotFound(user_id) return list(self.users.values()) def create_cache_cluster( self, cache_cluster_id: str, replication_group_id: str, az_mode: str, preferred_availability_zone: str, num_cache_nodes: int, cache_node_type: str, engine: str, engine_version: str, cache_parameter_group_name: str, cache_subnet_group_name: str, transit_encryption_enabled: bool, network_type: str, ip_discovery: str, snapshot_name: str, preferred_maintenance_window: str, port: int, notification_topic_arn: str, auto_minor_version_upgrade: bool, snapshot_retention_limit: int, snapshot_window: str, auth_token: str, outpost_mode: str, preferred_outpost_arn: str, preferred_availability_zones: List[str], cache_security_group_names: List[str], security_group_ids: List[str], tags: List[Dict[str, str]], snapshot_arns: List[str], preferred_outpost_arns: List[str], log_delivery_configurations: List[Dict[str, Any]], cache_node_ids_to_remove: List[str], cache_node_ids_to_reboot: List[str], ) -> CacheCluster: if cache_cluster_id in self.cache_clusters: raise CacheClusterAlreadyExists(cache_cluster_id) cache_cluster = CacheCluster( account_id=self.account_id, region_name=self.region_name, cache_cluster_id=cache_cluster_id, replication_group_id=replication_group_id, az_mode=az_mode, preferred_availability_zone=preferred_availability_zone, preferred_availability_zones=preferred_availability_zones, num_cache_nodes=num_cache_nodes, cache_node_type=cache_node_type, engine=engine, engine_version=engine_version, cache_parameter_group_name=cache_parameter_group_name, cache_subnet_group_name=cache_subnet_group_name, cache_security_group_names=cache_security_group_names, security_group_ids=security_group_ids, tags=tags, snapshot_arns=snapshot_arns, snapshot_name=snapshot_name, preferred_maintenance_window=preferred_maintenance_window, port=port, notification_topic_arn=notification_topic_arn, auto_minor_version_upgrade=auto_minor_version_upgrade, snapshot_retention_limit=snapshot_retention_limit, snapshot_window=snapshot_window, auth_token=auth_token, outpost_mode=outpost_mode, preferred_outpost_arn=preferred_outpost_arn, preferred_outpost_arns=preferred_outpost_arns, log_delivery_configurations=log_delivery_configurations, transit_encryption_enabled=transit_encryption_enabled, network_type=network_type, ip_discovery=ip_discovery, cache_node_ids_to_remove=cache_node_ids_to_remove, cache_node_ids_to_reboot=cache_node_ids_to_reboot, ) self.cache_clusters[cache_cluster_id] = cache_cluster return cache_cluster @paginate(PAGINATION_MODEL) def describe_cache_clusters( self, cache_cluster_id: str, max_records: int, marker: str, ) -> List[CacheCluster]: if max_records is None: max_records = 100 if cache_cluster_id: if cache_cluster_id in self.cache_clusters: cache_cluster = self.cache_clusters[cache_cluster_id] return list([cache_cluster]) else: raise CacheClusterNotFound(cache_cluster_id) cache_clusters = list(self.cache_clusters.values())[:max_records] return cache_clusters def delete_cache_cluster(self, cache_cluster_id: str) -> CacheCluster: if cache_cluster_id: if cache_cluster_id in self.cache_clusters: cache_cluster = self.cache_clusters[cache_cluster_id] cache_cluster.cache_cluster_status = "deleting" return cache_cluster raise CacheClusterNotFound(cache_cluster_id) def list_tags_for_resource(self, arn: str) -> List[Dict[str, str]]: if self.arn_regex.match(arn): arn_breakdown = arn.split(":") resource_type = arn_breakdown[len(arn_breakdown) - 2] resource_name = arn_breakdown[len(arn_breakdown) - 1] if resource_type == "cluster": if resource_name in self.cache_clusters: return self.cache_clusters[resource_name].get_tags() else: return [] else: raise InvalidARNFault(arn) return [] elasticache_backends = BackendDict(ElastiCacheBackend, "elasticache")
Memory