from __future__ import annotations import copy import math import os import re import string import uuid from collections import OrderedDict, defaultdict from datetime import datetime from functools import lru_cache, partialmethod from re import compile as re_compile from typing import ( TYPE_CHECKING, Any, Dict, Iterable, List, Literal, Optional, Protocol, Tuple, Union, ) from moto.core.base_backend import BackendDict, BaseBackend from moto.core.common_models import BaseModel, CloudFormationModel from moto.core.utils import unix_time, utcnow from moto.ec2.models import ec2_backends from moto.kms.models import KmsBackend, kms_backends from moto.moto_api._internal import mock_random as random from moto.secretsmanager.models import FakeSecret, SecretsManagerBackend from moto.utilities.utils import ARN_PARTITION_REGEX, load_resource from .exceptions import ( DBClusterNotFoundError, DBClusterParameterGroupNotFoundError, DBClusterSnapshotAlreadyExistsError, DBClusterSnapshotNotFoundError, DBClusterToBeDeletedHasActiveMembers, DBInstanceAlreadyExists, DBInstanceNotFoundError, DBParameterGroupAlreadyExistsError, DBParameterGroupNotFoundError, DBProxyAlreadyExistsFault, DBProxyNotFoundFault, DBProxyQuotaExceededFault, DBSecurityGroupNotFoundError, DBSnapshotAlreadyExistsError, DBSnapshotNotFoundFault, DBSubnetGroupNotFoundError, ExportTaskAlreadyExistsError, ExportTaskNotFoundError, InvalidDBClusterSnapshotStateFault, InvalidDBClusterStateFault, InvalidDBClusterStateFaultError, InvalidDBInstanceEngine, InvalidDBInstanceIdentifier, InvalidDBInstanceStateError, InvalidDBSnapshotIdentifier, InvalidExportSourceStateError, InvalidGlobalClusterStateFault, InvalidParameterCombination, InvalidParameterValue, InvalidSubnet, KMSKeyNotAccessibleFault, OptionGroupNotFoundFaultError, RDSClientError, SharedSnapshotQuotaExceeded, SnapshotQuotaExceededFault, SubscriptionAlreadyExistError, SubscriptionNotFoundError, ) from .serialize import XFormedAttributeAccessMixin from .utils import ( ClusterEngine, DbInstanceEngine, FilterDef, apply_filter, merge_filters, valid_preferred_maintenance_window, validate_filters, ) if TYPE_CHECKING: from moto.ec2.models.subnets import Subnet def find_cluster(cluster_arn: str) -> DBCluster: arn_parts = cluster_arn.split(":") region, account = arn_parts[3], arn_parts[4] return rds_backends[account][region].describe_db_clusters(cluster_arn)[0] KMS_ARN_PATTERN = re.compile( r"^arn:(aws|aws-us-gov|aws-cn):kms:(?P<region>\w+(?:-\w+)+):(?P<account_id>\d{12}):key\/(?P<key>[A-Za-z0-9]+(?:-[A-Za-z0-9]+)+)$" ) class SnapshotAttributesMixin: ALLOWED_ATTRIBUTE_NAMES = ["restore"] attributes: Dict[str, List[str]] def __init__(self, **kwargs: Any) -> None: super().__init__(**kwargs) self.attributes = defaultdict(list) for attribute in self.ALLOWED_ATTRIBUTE_NAMES: self.attributes[attribute] = [] def modify_attribute( self, attribute_name: str, values_to_add: Optional[List[str]], values_to_remove: Optional[List[str]], ) -> None: if not values_to_add: values_to_add = [] if not values_to_remove: values_to_remove = [] if attribute_name not in self.ALLOWED_ATTRIBUTE_NAMES: raise InvalidParameterValue(f"Invalid snapshot attribute {attribute_name}") common_values = set(values_to_add).intersection(values_to_remove) if common_values: raise InvalidParameterCombination( "A value may not appear in both the add list and remove list. " + f"{list(common_values)}" ) add = self.attributes[attribute_name] + values_to_add new_attribute_values = [value for value in add if value not in values_to_remove] if len(new_attribute_values) > int(os.getenv("MAX_SHARED_ACCOUNTS", 20)): raise SharedSnapshotQuotaExceeded() self.attributes[attribute_name] = new_attribute_values class ResourceWithEvents(Protocol): arn: str event_source_type: str @property def resource_id(self) -> str: raise NotImplementedError() class EventMixin: arn: str backend: RDSBackend event_source_type: str def add_event(self, event_type: str) -> None: self.backend.add_event(event_type, self) @property def resource_id(self) -> str: raise NotImplementedError( "Concrete classes must implement resource_id property." ) class TaggingMixin: _tags: List[Dict[str, str]] = [] @property def tags(self) -> List[Dict[str, str]]: return self._tags @tags.setter def tags(self, value: Optional[List[Dict[str, str]]]) -> None: if value is None: value = [] # Tags may come in as XFormedDict and we want a regular dict. coerced = [{"Key": tag["Key"], "Value": tag["Value"]} for tag in value] self._tags = coerced @property def tag_list(self) -> List[Dict[str, str]]: return self.tags def get_tags(self) -> List[Dict[str, str]]: return self.tags def add_tags(self, tags: List[Dict[str, str]]) -> List[Dict[str, str]]: new_keys = [tag_set["Key"] for tag_set in tags] updated_tags = [ tag_set for tag_set in self.tags if tag_set["Key"] not in new_keys ] updated_tags.extend(tags) self.tags = updated_tags return self.tags def remove_tags(self, tag_keys: List[str]) -> None: self.tags = [tag_set for tag_set in self.tags if tag_set["Key"] not in tag_keys] class RDSBaseModel(TaggingMixin, XFormedAttributeAccessMixin, BaseModel): resource_type: str def __init__(self, backend: RDSBackend, **kwargs: Any) -> None: super().__init__(**kwargs) self.backend = backend self.created = utcnow() @property def resource_id(self) -> str: raise NotImplementedError("Subclasses must implement resource_id property.") @property def region(self) -> str: return self.backend.region_name @property def account_id(self) -> str: return self.backend.account_id @property def partition(self) -> str: return self.backend.partition @property def arn(self) -> str: return f"arn:{self.partition}:rds:{self.region}:{self.account_id}:{self.resource_type}:{self.resource_id}" class DBProxyTarget(RDSBaseModel): resource_type = "proxy-target" def __init__( self, backend: RDSBackend, resource_id: str, endpoint: Optional[str], type: str, ): super().__init__(backend) self.endpoint = endpoint self.rds_resource_id = resource_id self.type = type self.port = 5432 self._registering = True # Not implemented yet: self.role = None self.target_arn = None @property def registering(self) -> bool: if self._registering is True: self._registering = False return True return self._registering @property def target_health(self) -> Dict[str, str]: return { "State": "REGISTERING" if self.registering else "AVAILABLE", } class DBProxyTargetGroup(RDSBaseModel): resource_type = "target-group" def __init__( self, backend: RDSBackend, name: str, proxy_name: str, ): super().__init__(backend) self._name = f"prx-tg-{random.get_random_string(length=17, lower_case=True)}" self.target_group_name = name self.db_proxy_name = proxy_name self.targets: List[DBProxyTarget] = [] self.max_connections = 100 self.max_idle_connections = 50 self.borrow_timeout = 120 self.session_pinning_filters: List[str] = [] self.created_date = utcnow() self.updated_date = utcnow() self.status = "available" self.is_default = True @property def resource_id(self) -> str: return self._name @property def target_group_arn(self) -> str: return self.arn @property def connection_pool_config(self) -> Dict[str, Any]: # type: ignore[misc] return { "MaxConnectionsPercent": self.max_connections, "MaxIdleConnectionsPercent": self.max_idle_connections, "ConnectionBorrowTimeout": self.borrow_timeout, "SessionPinningFilters": [ filter_ for filter_ in self.session_pinning_filters ], } class GlobalCluster(RDSBaseModel): resource_type = "global-cluster" def __init__( self, backend: RDSBackend, global_cluster_identifier: str, engine: str, engine_version: Optional[str], storage_encrypted: Optional[bool], deletion_protection: Optional[bool], ): super().__init__(backend) self.global_cluster_identifier = global_cluster_identifier self.unique_identifier = random.get_random_hex(8) self.global_cluster_resource_id = f"cluster-{self.unique_identifier}" self.engine = engine self.engine_version = engine_version or DBCluster.default_engine_version( self.engine ) self.storage_encrypted = storage_encrypted if self.storage_encrypted is None: self.storage_encrypted = False self.deletion_protection = deletion_protection if self.deletion_protection is None: self.deletion_protection = False self.members: List[DBCluster] = [] self.status = "available" @property def resource_id(self) -> str: return self.global_cluster_identifier @property def arn(self) -> str: # Global Clusters do not belong to a particular region. return super().arn.replace(self.region, "") @property def endpoint(self) -> str: ep = f"{self.global_cluster_identifier}.global-{self.unique_identifier}.global.rds.amazonaws.com" return ep @property def global_cluster_arn(self) -> str: return self.arn @property def readers(self) -> List[str]: readers = [ reader.db_cluster_arn for reader in self.members if not reader.is_writer ] return readers @property def global_cluster_members(self) -> List[Dict[str, Any]]: # type: ignore[misc] members: List[Dict[str, Any]] = [ { "DBClusterArn": member.db_cluster_arn, "IsWriter": True if member.is_writer else False, "DBClusterParameterGroupStatus": "in-sync", "PromotionTier": 1, # Not sure if this is correct, but current tests assert it being empty for non writers. "Readers": [], } for member in self.members ] for member in members: if member["IsWriter"]: member["Readers"] = self.readers else: member["GlobalWriteForwardingStatus"] = "disabled" return members class MasterUserSecret: """Class to manage the master user password for RDS clusters/instances.""" def __init__(self, resource: DBCluster | DBInstance, kms_key_id: str | None): self.resource = resource self.kms = resource.backend.kms self.secretsmanager = resource.backend.secretsmanager self.secret = self._create_secret(kms_key_id) self._status = "creating" def delete_secret(self) -> None: self.secretsmanager.delete_secret( self.secret.arn, recovery_window_in_days=None, force_delete_without_recovery=True, ) def rotate_secret(self) -> None: self._status = "rotating" self.secretsmanager.rotate_secret(self.secret.arn, rotate_immediately=True) @property def kms_key_id(self) -> str: assert self.secret.kms_key_id is not None key = self.kms.describe_key(self.secret.kms_key_id) return key.arn @property def secret_arn(self) -> str: return self.secret.arn @property def secret_status(self) -> str: status_to_return = self._status # Poor man's state manager - look into using moto's state manager. if status_to_return in ["creating", "rotating"]: self._status = "active" return status_to_return def _create_secret(self, kms_key_id: Optional[str]) -> FakeSecret: secret = self.secretsmanager.create_managed_secret( service_name="rds", secret_id=self._generate_secret_name(), secret_string="P@55w0rd!", description=self._generate_secret_description(), kms_key_id=kms_key_id, tags=self._generate_secret_tags(), ) return secret def _generate_secret_name(self) -> str: unique_id = str(uuid.uuid4()) secret_name = f"rds!{self.resource.resource_type}-{unique_id}" return secret_name def _generate_secret_description(self) -> str: resource_type = self.resource.__class__.__name__[2:].lower() description = f"The secret associated with the primary RDS DB {resource_type}: {self.resource.arn}" return description def _generate_secret_tags(self) -> list[dict[str, str]]: resource_type = self.resource.__class__.__name__ tags = [ {"Key": f"aws:rds:primary{resource_type}Arn", "Value": self.resource.arn}, ] return tags class DBCluster(RDSBaseModel): SUPPORTED_FILTERS = { "db-cluster-id": FilterDef( ["db_cluster_arn", "db_cluster_identifier"], "DB Cluster Identifiers" ), "engine": FilterDef(["engine"], "Engine Names"), } resource_type = "cluster" def __init__( self, backend: RDSBackend, db_cluster_identifier: str, engine: str, allocated_storage: Optional[int] = None, auto_minor_version_upgrade: bool = True, engine_version: Optional[str] = None, master_username: Optional[str] = None, master_user_password: Optional[str] = None, backup_retention_period: int = 1, character_set_name: Optional[str] = None, copy_tags_to_snapshot: Optional[bool] = False, database_name: Optional[str] = None, db_cluster_parameter_group_name: Optional[str] = None, db_subnet_group_name: Optional[str] = None, license_model: str = "general-public-license", port: Optional[int] = None, preferred_backup_window: str = "01:37-02:07", preferred_maintenance_window: str = "wed:02:40-wed:03:10", storage_encrypted: bool = False, tags: Optional[List[Dict[str, str]]] = None, vpc_security_group_ids: Optional[List[str]] = None, deletion_protection: Optional[bool] = False, kms_key_id: Optional[str] = None, manage_master_user_password: Optional[bool] = False, master_user_secret_kms_key_id: Optional[str] = None, **kwargs: Any, ): super().__init__(backend) self.database_name = database_name self.db_cluster_identifier = db_cluster_identifier self.db_cluster_instance_class = kwargs.get("db_cluster_instance_class") self.auto_minor_version_upgrade = auto_minor_version_upgrade self.deletion_protection = deletion_protection self.engine = engine if self.engine not in ClusterEngine.list_cluster_engines(): raise InvalidParameterValue( ( "Engine '{engine}' is not supported " "to satisfy constraint: Member must satisfy enum value set: " "{valid_engines}" ).format( engine=self.engine, valid_engines=ClusterEngine.list_cluster_engines(), ) ) self.engine_version = engine_version or DBCluster.default_engine_version( self.engine ) semantic = self.engine_version.split(".") option_suffix = semantic[0] if len(semantic) > 1: option_suffix = option_suffix + "-" + semantic[1] self.option_group_name = f"default:{self.engine}-{option_suffix}" self.engine_mode = kwargs.get("engine_mode") or "provisioned" self.iops = kwargs.get("iops") self.network_type = kwargs.get("network_type") or "IPV4" self._status = "creating" self.cluster_create_time = self.created self.copy_tags_to_snapshot = copy_tags_to_snapshot self.storage_type = kwargs.get("storage_type") if self.storage_type is None: self.storage_type = DBCluster.default_storage_type(iops=self.iops) self.allocated_storage = ( allocated_storage or DBCluster.default_allocated_storage( engine=self.engine, storage_type=self.storage_type ) ) self.master_username = master_username self.character_set_name = character_set_name self.global_cluster_identifier = kwargs.get("global_cluster_identifier") if ( not self.master_username and self.global_cluster_identifier or self.engine == "neptune" ): pass elif not self.master_username: raise InvalidParameterValue( "The parameter MasterUsername must be provided and must not be blank." ) self.manage_master_user_password = manage_master_user_password if self.manage_master_user_password: self.master_user_secret = MasterUserSecret( self, master_user_secret_kms_key_id ) elif not self.global_cluster_identifier and not self.engine == "neptune": self.master_user_password = master_user_password or "" self.availability_zones = kwargs.get("availability_zones") if not self.availability_zones: self.availability_zones = [ f"{self.region}a", f"{self.region}b", f"{self.region}c", ] default_pg = ( "default.neptune1.3" if self.engine == "neptune" else "default.aurora8.0" ) self.parameter_group = db_cluster_parameter_group_name or default_pg self.db_subnet_group_name = db_subnet_group_name or "default" self.url_identifier = "".join( random.choice(string.ascii_lowercase + string.digits) for _ in range(12) ) self.endpoint = f"{self.db_cluster_identifier}.cluster-{self.url_identifier}.{self.region}.rds.amazonaws.com" self.reader_endpoint = f"{self.db_cluster_identifier}.cluster-ro-{self.url_identifier}.{self.region}.rds.amazonaws.com" self.port = port or DBCluster.default_port(self.engine) self.preferred_backup_window = preferred_backup_window self.preferred_maintenance_window = preferred_maintenance_window # This should default to the default security group self._vpc_security_group_ids = vpc_security_group_ids or [] self.hosted_zone_id = "".join( random.choice(string.ascii_uppercase + string.digits) for _ in range(14) ) self.db_cluster_resource_id = "cluster-" + "".join( random.choice(string.ascii_uppercase + string.digits) for _ in range(26) ) self.tags = tags or [] self.enabled_cloudwatch_logs_exports = ( kwargs.get("enable_cloudwatch_logs_exports") or [] ) self.enable_http_endpoint = kwargs.get("enable_http_endpoint") # type: ignore self.earliest_restorable_time = utcnow() self.scaling_configuration = kwargs.get("scaling_configuration") if not self.scaling_configuration and self.engine_mode == "serverless": # In AWS, this default configuration only shows up when the Cluster is in a ready state, so a few minutes after creation self.scaling_configuration = { "min_capacity": 1, "max_capacity": 16, "auto_pause": True, "seconds_until_auto_pause": 300, "timeout_action": "RollbackCapacityChange", "seconds_before_timeout": 300, } self.serverless_v2_scaling_configuration = kwargs.get( "serverless_v2_scaling_configuration" ) self.replication_source_identifier = kwargs.get("replication_source_identifier") self.read_replica_identifiers: List[str] = list() self.is_writer: bool = False self.storage_encrypted = storage_encrypted self.kms_key_id = kms_key_id or "default_kms_key_id" if self.engine == "aurora-mysql" or self.engine == "aurora-postgresql": self._global_write_forwarding_requested = kwargs.get( "enable_global_write_forwarding" ) self.backup_retention_period = backup_retention_period if backtrack := kwargs.get("backtrack_window"): if self.engine == "aurora-mysql": # https://docs.aws.amazon.com/AmazonRDS/latest/APIReference/API_CreateDBCluster.html if 0 <= backtrack <= 259200: self.backtrack_window: int = backtrack else: raise InvalidParameterValue( f"The specified value ({backtrack}) is not a valid Backtrack Window. " "Allowed values are within the range of 0 to 259200" ) else: raise InvalidParameterValue( "Backtrack is not enabled for the postgres engine." ) else: self.backtrack_window = 0 self.iam_auth = kwargs.get("enable_iam_database_authentication", False) if self.iam_auth: if not self.engine.startswith("aurora-"): raise InvalidParameterCombination( "IAM Authentication is currently not supported by Multi-AZ DB clusters." ) self.license_model = license_model @property def db_subnet_group(self) -> str: # Despite the documentation saying this attribute returns: # "Information about the subnet group associated with the DB cluster, # including the name, description, and subnets in the subnet group." # It just returns the name... return self.db_subnet_group_name @property def resource_id(self) -> str: return self.db_cluster_identifier @property def multi_az(self) -> bool: availability_zones = list( set([instance.availability_zone for instance in self._members]) ) multi_az_conditions = [ (len(self.read_replica_identifiers) > 0), (self.replication_source_identifier is not None), (len(availability_zones) > 1), ] return any(multi_az_conditions) @property def db_cluster_arn(self) -> str: return self.arn @property def latest_restorable_time(self) -> datetime: return utcnow() @property def master_user_password(self) -> str: raise NotImplementedError("Password not retrievable.") @master_user_password.setter def master_user_password(self, val: str) -> None: if not val: raise InvalidParameterValue( "The parameter MasterUserPassword must be provided and must not be blank." ) if len(val) < 8: raise InvalidParameterValue( "The parameter MasterUserPassword is not a valid password because it is shorter than 8 characters." ) self._master_user_password = val @property def enable_http_endpoint(self) -> bool: return self._enable_http_endpoint @enable_http_endpoint.setter def enable_http_endpoint(self, val: Optional[bool]) -> None: # instead of raising an error on aws rds create-db-cluster commands with # incompatible configurations with enable_http_endpoint # (e.g. engine_mode is not set to "serverless"), the API # automatically sets the enable_http_endpoint parameter to False self._enable_http_endpoint = False if val is not None: if self.engine_mode == "serverless": if self.engine == "aurora-mysql" and self.engine_version in [ "5.6.10a", "5.6.1", "2.07.1", "5.7.2", "5.7.mysql_aurora.2.07.1", "5.7.mysql_aurora.2.07.2", "5.7.mysql_aurora.2.08.3", ]: self._enable_http_endpoint = val elif self.engine == "aurora-postgresql" and self.engine_version in [ "10.12", "10.14", "10.18", "11.13", ]: self._enable_http_endpoint = val elif self.engine == "aurora" and self.engine_version in [ "5.6.mysql_aurora.1.22.5" ]: self._enable_http_endpoint = val @property def http_endpoint_enabled(self) -> bool: return True if self.enable_http_endpoint else False @property def db_cluster_parameter_group(self) -> str: return self.parameter_group @property def status(self) -> str: if self._status == "creating": self._status = "available" return "creating" return self._status @status.setter def status(self, value: str) -> None: self._status = value @property def _members(self) -> List[DBInstanceClustered]: return [ db_instance for db_instance in self.backend.databases.values() if isinstance(db_instance, DBInstanceClustered) and db_instance.db_cluster_identifier == self.resource_id ] @property def writer(self) -> Optional[DBInstanceClustered]: return next( ( db_instance for db_instance in self._members if db_instance.is_cluster_writer ), None, ) @writer.setter def writer(self, db_instance: DBInstanceClustered) -> None: db_instance.is_cluster_writer = True @property def members(self) -> List[DBInstanceClustered]: self.designate_writer() return self._members def designate_writer(self) -> None: if self.writer or not self._members: return promotion_list = sorted(self._members, key=lambda x: x.promotion_tier) self.writer = promotion_list[0] @property def associated_roles(self) -> List[Dict[str, Any]]: # type: ignore[misc] return [] @property def scaling_configuration_info(self) -> Dict[str, Any]: # type: ignore[misc] configuration = self.scaling_configuration or {} info = { "MinCapacity": configuration.get("min_capacity"), "MaxCapacity": configuration.get("max_capacity"), "AutoPause": configuration.get("auto_pause"), "SecondsUntilAutoPause": configuration.get("seconds_until_auto_pause"), "TimeoutAction": configuration.get("timeout_action"), "SecondsBeforeTimeout": configuration.get("seconds_before_timeout"), } return info @property def vpc_security_groups(self) -> List[Dict[str, Any]]: # type: ignore[misc] groups = [ {"VpcSecurityGroupId": sg_id, "Status": "active"} for sg_id in self._vpc_security_group_ids ] return groups @property def domain_memberships(self) -> List[str]: return [] @property def cross_account_clone(self) -> bool: return False @property def global_write_forwarding_requested(self) -> bool: # This does not appear to be in the standard response for any clusters # Docs say it's only for a secondary cluster in aurora global database... return True if self._global_write_forwarding_requested else False @property def db_cluster_members(self) -> List[Dict[str, Any]]: # type: ignore[misc] members = [ { "DBInstanceIdentifier": member.db_instance_identifier, "IsClusterWriter": member.is_cluster_writer, "DBClusterParameterGroupStatus": "in-sync", "PromotionTier": member.promotion_tier, } for member in self.members ] return members @property def iam_database_authentication_enabled(self) -> bool: return True if self.iam_auth else False def get_cfg(self) -> Dict[str, Any]: cfg = self.__dict__.copy() cfg.pop("backend") cfg["master_user_password"] = cfg.pop("_master_user_password") cfg["enable_http_endpoint"] = cfg.pop("_enable_http_endpoint") cfg["vpc_security_group_ids"] = cfg.pop("_vpc_security_group_ids") return cfg @staticmethod def default_engine_version(engine: str) -> str: return { "aurora": "5.6.mysql_aurora.1.22.5", "aurora-mysql": "5.7.mysql_aurora.2.07.2", "aurora-postgresql": "12.7", "mysql": "8.0.23", "neptune": "1.3.2.1", "postgres": "13.4", }[engine] @staticmethod def default_port(engine: str) -> int: return { "aurora": 3306, "aurora-mysql": 3306, "aurora-postgresql": 5432, "mysql": 3306, "neptune": 8182, "postgres": 5432, }[engine] @staticmethod def default_storage_type(iops: Any) -> str: # type: ignore[misc] return "gp2" if iops is None else "io1" @staticmethod def default_allocated_storage(engine: str, storage_type: str) -> int: return { "aurora": {"gp2": 1, "io1": 1, "standard": 1}, "aurora-mysql": {"gp2": 1, "io1": 1, "standard": 1}, "aurora-postgresql": {"gp2": 1, "io1": 1, "standard": 1}, "mysql": {"gp2": 20, "io1": 100, "standard": 5}, "neptune": {"gp2": 0, "io1": 0, "standard": 0}, "postgres": {"gp2": 20, "io1": 100, "standard": 5}, }[engine][storage_type] def failover(self, target_instance: DBInstanceClustered) -> None: if self.writer is not None: self.writer.is_cluster_writer = False self.writer = target_instance def save_automated_backup(self) -> None: time_stamp = utcnow().strftime("%Y-%m-%d-%H-%M") snapshot_id = f"rds:{self.db_cluster_identifier}-{time_stamp}" self.backend.create_auto_cluster_snapshot( self.db_cluster_identifier, snapshot_id ) class DBClusterSnapshot(SnapshotAttributesMixin, RDSBaseModel): resource_type = "cluster-snapshot" SUPPORTED_FILTERS = { "db-cluster-id": FilterDef( ["db_cluster_arn", "db_cluster_identifier"], "DB Cluster Identifiers", ), "db-cluster-snapshot-id": FilterDef( ["db_cluster_snapshot_identifier"], "DB Cluster Snapshot Identifiers" ), "snapshot-type": FilterDef(["snapshot_type"], "Snapshot Types"), "engine": FilterDef(["cluster.engine"], "Engine Names"), } def __init__( self, backend: RDSBackend, cluster: DBCluster, snapshot_id: str, snapshot_type: str = "manual", tags: Optional[List[Dict[str, str]]] = None, kms_key_id: Optional[str] = None, **kwargs: Any, ): super().__init__(backend=backend, **kwargs) self.db_cluster_snapshot_identifier = snapshot_id self.snapshot_type = snapshot_type self.percent_progress = 100 self.status = "available" # If tags are provided at creation, AWS does *not* copy tags from the # db_cluster (even if copy_tags_to_snapshot is True). if tags is not None: self.tags = tags elif cluster.copy_tags_to_snapshot: self.tags = cluster.tags or [] else: self.tags = [] self.cluster = copy.copy(cluster) self.allocated_storage = self.cluster.allocated_storage self.cluster_create_time = self.cluster.created self.db_cluster_identifier = self.cluster.db_cluster_identifier if kms_key_id is not None: self.kms_key_id = self.cluster.kms_key_id = kms_key_id self.encrypted = self.cluster.storage_encrypted = True else: self.kms_key_id = self.cluster.kms_key_id # type: ignore[assignment] self.encrypted = self.cluster.storage_encrypted # type: ignore[assignment] self.engine = self.cluster.engine self.engine_version = self.cluster.engine_version self.master_username = self.cluster.master_username self.port = self.cluster.port self.storage_encrypted = self.cluster.storage_encrypted @property def resource_id(self) -> str: return self.db_cluster_snapshot_identifier @property def db_cluster_snapshot_arn(self) -> str: return self.arn @property def snapshot_create_time(self) -> datetime: return self.created class LogFileManager: def __init__(self, engine: str) -> None: self.log_files = [] filename = f"error/{engine}.log" if engine == "postgres": formatted_time = utcnow().strftime("%Y-%m-%d-%H") filename = f"error/postgresql.log.{formatted_time}" self.log_files.append(DBLogFile(filename)) @property def files(self) -> List[DBLogFile]: return self.log_files class DBLogFile(XFormedAttributeAccessMixin): BOTOCORE_MODEL = "DescribeDBLogFilesDetails" def __init__(self, name: str) -> None: self.log_file_name = name self.last_written = unix_time() self.size = 123 class DBInstance(EventMixin, CloudFormationModel, RDSBaseModel): BOTOCORE_MODEL = "DBInstance" SUPPORTED_FILTERS = { "db-cluster-id": FilterDef(["db_cluster_identifier"], "DB Cluster Identifiers"), "db-instance-id": FilterDef( ["db_instance_arn", "db_instance_identifier"], "DB Instance Identifiers" ), "dbi-resource-id": FilterDef(["dbi_resource_id"], "Dbi Resource Ids"), "domain": FilterDef(None, ""), "engine": FilterDef(["engine"], "Engine Names"), } default_engine_versions = { "MySQL": "5.6.21", "mysql": "5.6.21", "oracle-ee": "11.2.0.4.v3", "oracle-se": "11.2.0.4.v3", "oracle-se1": "11.2.0.4.v3", "oracle-se2": "11.2.0.4.v3", "postgres": "9.3.3", "sqlserver-ee": "11.00.2100.60.v1", "sqlserver-ex": "11.00.2100.60.v1", "sqlserver-se": "11.00.2100.60.v1", "sqlserver-web": "11.00.2100.60.v1", } event_source_type = "db-instance" resource_type = "db" def __init__( self, backend: RDSBackend, db_instance_identifier: str, db_instance_class: str, engine: str, engine_version: Optional[str] = None, port: Optional[int] = None, allocated_storage: Optional[int] = None, max_allocated_storage: Optional[int] = None, backup_retention_period: int = 1, character_set_name: Optional[str] = None, auto_minor_version_upgrade: bool = True, db_name: Optional[str] = None, db_security_groups: Optional[List[str]] = None, db_subnet_group_name: Optional[str] = None, db_cluster_identifier: Optional[str] = None, db_parameter_group_name: Optional[str] = None, copy_tags_to_snapshot: bool = False, iops: Optional[str] = None, master_username: Optional[str] = None, master_user_password: Optional[str] = None, multi_az: bool = False, license_model: str = "general-public-license", preferred_backup_window: str = "13:14-13:44", preferred_maintenance_window: str = "wed:06:38-wed:07:08", publicly_accessible: Optional[bool] = None, source_db_instance_identifier: Optional[str] = None, storage_type: Optional[str] = None, storage_encrypted: bool = False, tags: Optional[List[Dict[str, str]]] = None, vpc_security_group_ids: Optional[List[str]] = None, deletion_protection: bool = False, option_group_name: Optional[str] = None, enable_cloudwatch_logs_exports: Optional[List[str]] = None, ca_certificate_identifier: str = "rds-ca-default", availability_zone: Optional[str] = None, manage_master_user_password: Optional[bool] = False, master_user_secret_kms_key_id: Optional[str] = None, **kwargs: Any, ) -> None: super().__init__(backend) self.status = "available" self.is_replica = False self.replicas: List[str] = [] self.engine = engine if self.engine not in DbInstanceEngine.valid_db_instance_engine(): raise InvalidParameterValue( f"Value {self.engine} for parameter Engine is invalid. Reason: engine {self.engine} not supported" ) self.log_file_manager = LogFileManager(self.engine) self.iops = iops self.auto_minor_version_upgrade = auto_minor_version_upgrade self.db_instance_identifier = db_instance_identifier self.source_db_identifier = source_db_instance_identifier self.db_instance_class = db_instance_class self.port = port if self.port is None: self.port = DBInstance.default_port(self.engine) self.db_name = db_name self.instance_create_time = self.created self.publicly_accessible = publicly_accessible self.copy_tags_to_snapshot = copy_tags_to_snapshot self.availability_zone: str = availability_zone or f"{self.region}a" self.multi_az = multi_az self.db_subnet_group_name = db_subnet_group_name self.db_security_groups = db_security_groups or [] self.vpc_security_group_ids = vpc_security_group_ids or [] if not self.vpc_security_group_ids: ec2_backend = ec2_backends[self.account_id][self.region] default_vpc = ec2_backend.default_vpc default_sg = ec2_backend.get_default_security_group(default_vpc.id) self.vpc_security_group_ids.append(default_sg.id) # type: ignore self.preferred_maintenance_window = preferred_maintenance_window.lower() self.db_parameter_group_name = db_parameter_group_name if ( self.db_parameter_group_name and not self.is_default_parameter_group(self.db_parameter_group_name) and self.db_parameter_group_name not in rds_backends[self.account_id][self.region].db_parameter_groups ): raise DBParameterGroupNotFoundError(self.db_parameter_group_name) self.ca_certificate_identifier = ca_certificate_identifier self.enable_iam_database_authentication = kwargs.get( "enable_iam_database_authentication", False ) self.dbi_resource_id = "db-M5ENSHXFPU6XHZ4G4ZEI5QIO2U" self.tags = tags or [] self.deletion_protection = deletion_protection self.enabled_cloudwatch_logs_exports = enable_cloudwatch_logs_exports or [] self.db_cluster_identifier = db_cluster_identifier if self.db_cluster_identifier is None: self.storage_type = storage_type or DBInstance.default_storage_type( iops=self.iops ) self.allocated_storage = ( allocated_storage or DBInstance.default_allocated_storage( engine=self.engine, storage_type=self.storage_type ) ) self.max_allocated_storage = max_allocated_storage or self.allocated_storage self.storage_encrypted = storage_encrypted if self.storage_encrypted: self.kms_key_id = kwargs.get("kms_key_id", "default_kms_key_id") else: self.kms_key_id = None self.backup_retention_period = backup_retention_period self.character_set_name = character_set_name self.engine_version = ( engine_version or self.default_engine_versions[self.engine] ) self.license_model = license_model self.master_username = master_username self.manage_master_user_password = manage_master_user_password if self.manage_master_user_password: self.master_user_secret = MasterUserSecret( self, master_user_secret_kms_key_id ) else: self.master_user_password = master_user_password self.preferred_backup_window = preferred_backup_window msg = valid_preferred_maintenance_window( self.preferred_maintenance_window, self.preferred_backup_window, ) if msg: raise RDSClientError("InvalidParameterValue", msg) self.option_group_supplied = option_group_name is not None if ( option_group_name and option_group_name not in rds_backends[self.account_id][self.region].option_groups ): raise OptionGroupNotFoundFaultError(option_group_name) assert self.engine and self.engine_version semantic = self.engine_version.split(".") option_suffix = semantic[0] if len(semantic) > 1: option_suffix = option_suffix + "-" + semantic[1] default_option_group_name = f"default:{self.engine}-{option_suffix}" self.option_group_name = option_group_name or default_option_group_name @property def db_subnet_group_name(self) -> Optional[str]: raise NotImplementedError("write only property") @db_subnet_group_name.setter def db_subnet_group_name(self, value: Optional[str]) -> None: self._db_subnet_group_name = value if self._db_subnet_group_name is not None: self.db_subnet_group = rds_backends[self.account_id][ self.region ].describe_db_subnet_groups(self._db_subnet_group_name)[0] @property def allocated_storage(self) -> int: return self._allocated_storage @allocated_storage.setter def allocated_storage(self, value: int) -> None: self._allocated_storage = value @property def backup_retention_period(self) -> int: return self._backup_retention_period @backup_retention_period.setter def backup_retention_period(self, value: int) -> None: self._backup_retention_period = value @property def character_set_name(self) -> Optional[str]: return self._character_set_name @character_set_name.setter def character_set_name(self, value: Optional[str]) -> None: self._character_set_name = value @property def db_name(self) -> Optional[str]: return self._db_name @db_name.setter def db_name(self, value: Optional[str]) -> None: self._db_name = value @property def engine_version(self) -> str: return self._engine_version @engine_version.setter def engine_version(self, value: str) -> None: self._engine_version = value @property def kms_key_id(self) -> Optional[str]: return self._kms_key_id @kms_key_id.setter def kms_key_id(self, value: Optional[str]) -> None: self._kms_key_id = value @property def license_model(self) -> str: return self._license_model @license_model.setter def license_model(self, value: str) -> None: self._license_model = value @property def master_username(self) -> Optional[str]: return self._master_username @master_username.setter def master_username(self, value: Optional[str]) -> None: self._master_username = value @property def master_user_password(self) -> Optional[str]: raise NotImplementedError("Password is not retrievable.") @master_user_password.setter def master_user_password(self, value: Optional[str]) -> None: self._master_user_password = value @property def max_allocated_storage(self) -> Optional[int]: if self._max_allocated_storage > self.allocated_storage: return self._max_allocated_storage return None @max_allocated_storage.setter def max_allocated_storage(self, value: int) -> None: if value < self.allocated_storage: raise InvalidParameterCombination( "Max storage size must be greater than storage size" ) self._max_allocated_storage = value @property def option_group_name(self) -> str: return self._option_group_name @option_group_name.setter def option_group_name(self, value: str) -> None: self._option_group_name = value @property def preferred_backup_window(self) -> str: return self._preferred_backup_window @preferred_backup_window.setter def preferred_backup_window(self, value: str) -> None: self._preferred_backup_window = value @property def storage_encrypted(self) -> bool: return self._storage_encrypted @storage_encrypted.setter def storage_encrypted(self, value: bool) -> None: self._storage_encrypted = value @property def storage_type(self) -> str: return self._storage_type @storage_type.setter def storage_type(self, value: str) -> None: self._storage_type = value @property def resource_id(self) -> str: return self.db_instance_identifier @property def db_instance_arn(self) -> str: return self.arn @property def physical_resource_id(self) -> Optional[str]: return self.db_instance_identifier @property def latest_restorable_time(self) -> datetime: return utcnow() def db_parameter_groups(self) -> List[DBParameterGroup]: if not self.db_parameter_group_name or self.is_default_parameter_group( self.db_parameter_group_name ): ( db_family, db_parameter_group_name, ) = self.default_db_parameter_group_details() description = f"Default parameter group for {db_family}" if db_parameter_group_name in self.backend.db_parameter_groups: return [self.backend.db_parameter_groups[db_parameter_group_name]] default_group = DBParameterGroup( backend=self.backend, db_parameter_group_name=db_parameter_group_name, db_parameter_group_family=db_family, description=description, tags=[], ) self.backend.db_parameter_groups[db_parameter_group_name] = default_group return [default_group] return [self.backend.db_parameter_groups[self.db_parameter_group_name]] def is_default_parameter_group(self, param_group_name: str) -> bool: return param_group_name.startswith(f"default.{self.engine.lower()}") # type: ignore def default_db_parameter_group_details(self) -> Tuple[str, str]: assert self.engine and self.engine_version minor_engine_version = ".".join(str(self.engine_version).rsplit(".")[:-1]) db_family = f"{self.engine.lower()}{minor_engine_version}" # type: ignore return db_family, f"default.{db_family}" @property def address(self) -> str: return ( f"{self.db_instance_identifier}.aaaaaaaaaa.{self.region}.rds.amazonaws.com" ) @property def vpc_security_group_membership_list(self) -> List[Dict[str, Any]]: # type: ignore[misc] groups = [ { "Status": "active", "VpcSecurityGroupId": id_, } for id_ in self.vpc_security_group_ids ] return groups @property def db_parameter_group_status_list(self) -> Any: # type: ignore[misc] groups = self.db_parameter_groups() for group in groups: setattr(group, "ParameterApplyStatus", "in-sync") return groups @property def db_security_group_membership_list(self) -> List[Dict[str, Any]]: # type: ignore[misc] groups = [ { "Status": "active", "DBSecurityGroupName": group, } for group in self.db_security_groups ] return groups @property def endpoint(self) -> Dict[str, Any]: # type: ignore[misc] return { "Address": self.address, "Port": self.port, } @property def option_group_memberships(self) -> List[Dict[str, Any]]: # type: ignore[misc] groups = [ { "OptionGroupName": self.option_group_name, "Status": "in-sync", } ] return groups @property def read_replica_db_instance_identifiers(self) -> List[str]: return [replica for replica in self.replicas] @property def db_instance_port(self) -> Optional[int]: return self.port @property def read_replica_source_db_instance_identifier(self) -> Optional[str]: return self.source_db_instance_identifier @property def iam_database_authentication_enabled(self) -> bool: return self.enable_iam_database_authentication def add_replica(self, replica: DBInstance) -> None: if self.region != replica.region: # Cross Region replica self.replicas.append(replica.db_instance_arn) else: self.replicas.append(replica.db_instance_identifier) # type: ignore def remove_replica(self, replica: DBInstance) -> None: self.replicas.remove(replica.db_instance_identifier) # type: ignore def set_as_replica(self) -> None: self.is_replica = True self.replicas = [] def update( self, manage_master_user_password: Optional[bool] = None, master_user_secret_kms_key_id: Optional[str] = None, rotate_master_user_password: Optional[bool] = None, **db_kwargs: Dict[str, Any], ) -> None: if manage_master_user_password is True: self.master_user_secret = MasterUserSecret( self, master_user_secret_kms_key_id ) elif manage_master_user_password is False: self.master_user_secret.delete_secret() del self.master_user_secret if rotate_master_user_password is True: self.master_user_secret.rotate_secret() for key, value in db_kwargs.items(): if value is not None: setattr(self, key, value) cwl_exports = db_kwargs.get("cloudwatch_logs_export_configuration") or {} for exp in cwl_exports.get("DisableLogTypes", []): self.enabled_cloudwatch_logs_exports.remove(exp) self.enabled_cloudwatch_logs_exports.extend( cwl_exports.get("EnableLogTypes", []) ) @classmethod def has_cfn_attr(cls, attr: str) -> bool: return attr in ["Endpoint.Address", "Endpoint.Port"] def get_cfn_attribute(self, attribute_name: str) -> Any: # Local import to avoid circular dependency with cloudformation.parsing from moto.cloudformation.exceptions import UnformattedGetAttTemplateException if attribute_name == "Endpoint.Address": return self.address elif attribute_name == "Endpoint.Port": return self.port raise UnformattedGetAttTemplateException() @staticmethod def default_port(engine: str) -> int: return { "aurora": 3306, "aurora-mysql": 3306, "aurora-postgresql": 5432, "mysql": 3306, "mariadb": 3306, "postgres": 5432, "oracle-ee": 1521, "oracle-se2": 1521, "oracle-se1": 1521, "oracle-se": 1521, "sqlserver-ee": 1433, "sqlserver-ex": 1433, "sqlserver-se": 1433, "sqlserver-web": 1433, }[engine] @staticmethod def default_storage_type(iops: Any) -> str: # type: ignore[misc] return "gp2" if iops is None else "io1" @staticmethod def default_allocated_storage(engine: str, storage_type: str) -> int: return { "aurora": {"gp2": 0, "io1": 0, "standard": 0}, "aurora-mysql": {"gp2": 20, "io1": 100, "standard": 10}, "aurora-postgresql": {"gp2": 20, "io1": 100, "standard": 10}, "mysql": {"gp2": 20, "io1": 100, "standard": 5}, "mariadb": {"gp2": 20, "io1": 100, "standard": 5}, "postgres": {"gp2": 20, "io1": 100, "standard": 5}, "oracle-ee": {"gp2": 20, "io1": 100, "standard": 10}, "oracle-se2": {"gp2": 20, "io1": 100, "standard": 10}, "oracle-se1": {"gp2": 20, "io1": 100, "standard": 10}, "oracle-se": {"gp2": 20, "io1": 100, "standard": 10}, "sqlserver-ee": {"gp2": 200, "io1": 200, "standard": 200}, "sqlserver-ex": {"gp2": 20, "io1": 100, "standard": 20}, "sqlserver-se": {"gp2": 200, "io1": 200, "standard": 200}, "sqlserver-web": {"gp2": 20, "io1": 100, "standard": 20}, }[engine][storage_type] @staticmethod def cloudformation_name_type() -> str: return "DBInstanceIdentifier" @staticmethod def cloudformation_type() -> str: # https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-rds-dbinstance.html return "AWS::RDS::DBInstance" @classmethod def create_from_cloudformation_json( # type: ignore[misc] cls, resource_name: str, cloudformation_json: Any, account_id: str, region_name: str, **kwargs: Any, ) -> DBInstance: properties = cloudformation_json["Properties"] db_security_groups = properties.get("DBSecurityGroups") if not db_security_groups: db_security_groups = [] security_groups = [group.name for group in db_security_groups] db_subnet_group = properties.get("DBSubnetGroupName") db_subnet_group_name = db_subnet_group.name if db_subnet_group else None db_kwargs = { "auto_minor_version_upgrade": properties.get("AutoMinorVersionUpgrade"), "allocated_storage": properties.get("AllocatedStorage"), "availability_zone": properties.get("AvailabilityZone"), "backup_retention_period": properties.get("BackupRetentionPeriod"), "db_instance_class": properties.get("DBInstanceClass"), "db_instance_identifier": resource_name.replace("_", "-"), "db_name": properties.get("DBName"), "preferred_backup_window": properties.get( "PreferredBackupWindow", "13:14-13:44" ), "preferred_maintenance_window": properties.get( "PreferredMaintenanceWindow", "wed:06:38-wed:07:08" ).lower(), "db_subnet_group_name": db_subnet_group_name, "engine": properties.get("Engine"), "engine_version": properties.get("EngineVersion"), "iops": properties.get("Iops"), "kms_key_id": properties.get("KmsKeyId"), "master_user_password": properties.get("MasterUserPassword"), "master_username": properties.get("MasterUsername"), "multi_az": properties.get("MultiAZ"), "db_parameter_group_name": properties.get("DBParameterGroupName"), "port": properties.get("Port", 3306), "publicly_accessible": properties.get("PubliclyAccessible"), "copy_tags_to_snapshot": properties.get("CopyTagsToSnapshot"), "db_security_groups": security_groups, "storage_encrypted": properties.get("StorageEncrypted"), "storage_type": properties.get("StorageType"), "tags": properties.get("Tags"), "vpc_security_group_ids": properties.get("VpcSecurityGroupIds", []), } rds_backend = rds_backends[account_id][region_name] source_db_identifier = properties.get("SourceDBInstanceIdentifier") if source_db_identifier: # Replica db_kwargs["source_db_instance_identifier"] = source_db_identifier database = rds_backend.create_db_instance_read_replica(db_kwargs) else: database = rds_backend.create_db_instance(db_kwargs) return database def delete(self, account_id: str, region_name: str) -> None: backend = rds_backends[account_id][region_name] backend.delete_db_instance(self.db_instance_identifier) # type: ignore[arg-type] def save_automated_backup(self) -> None: self.add_event("DB_INSTANCE_BACKUP_START") time_stamp = utcnow().strftime("%Y-%m-%d-%H-%M") snapshot_id = f"rds:{self.db_instance_identifier}-{time_stamp}" self.backend.create_auto_snapshot(self.db_instance_identifier, snapshot_id) self.add_event("DB_INSTANCE_BACKUP_FINISH") class DBInstanceClustered(DBInstance): def __init__( self, db_cluster_identifier: str, promotion_tier: int = 1, **kwargs: Any ) -> None: super().__init__(db_cluster_identifier=db_cluster_identifier, **kwargs) if db_cluster_identifier not in self.backend.clusters: raise DBClusterNotFoundError( db_cluster_identifier, f"The source cluster could not be found or cannot be accessed: {db_cluster_identifier}", ) self.cluster = self.backend.clusters[db_cluster_identifier] self.db_cluster_identifier: str = db_cluster_identifier self.is_cluster_writer = True if not self.cluster.members else False self.promotion_tier = promotion_tier self.manage_master_user_password = False @property def allocated_storage(self) -> int: return self.cluster.allocated_storage @allocated_storage.setter def allocated_storage(self, value: int) -> None: raise NotImplementedError("Not valid for clustered db instances.") @property def backup_retention_period(self) -> int: return self.cluster.backup_retention_period @backup_retention_period.setter def backup_retention_period(self, value: int) -> None: raise NotImplementedError("Not valid for clustered db instances.") @property def character_set_name(self) -> Optional[str]: return self.cluster.character_set_name @character_set_name.setter def character_set_name(self, value: Optional[str]) -> None: raise NotImplementedError("Not valid for clustered db instances.") # TODO: Need to understand better how this works with Aurora instances. # According to the boto3 documentation, `db_name` is valid: # "The name of the database to create when the primary instance # of the DB cluster is created. If this parameter isn't specified, # no database is created in the DB instance." # So does that mean the cluster.database_name and the instance.db_name # can differ? @property def db_name(self) -> Optional[str]: return self._db_name or self.cluster.database_name @db_name.setter def db_name(self, value: Optional[str]) -> None: self._db_name = value @property def engine_version(self) -> str: return self.cluster.engine_version @engine_version.setter def engine_version(self, value: str) -> None: raise NotImplementedError("Not valid for clustered db instances.") @property def kms_key_id(self) -> Optional[str]: return self.cluster.kms_key_id @kms_key_id.setter def kms_key_id(self, value: Optional[str]) -> None: raise NotImplementedError("Not valid for clustered db instances.") @property def license_model(self) -> str: return self.cluster.license_model @license_model.setter def license_model(self, value: str) -> None: raise NotImplementedError("Not valid for clustered db instances.") @property def master_username(self) -> Optional[str]: return self.cluster.master_username @master_username.setter def master_username(self, value: Optional[str]) -> None: raise NotImplementedError("Not valid for clustered db instances.") @property def master_user_password(self) -> Optional[str]: raise NotImplementedError("Password is not retrievable.") @master_user_password.setter def master_user_password(self, value: Optional[str]) -> None: raise NotImplementedError("Not valid for clustered db instances.") @property def max_allocated_storage(self) -> Optional[int]: return None @max_allocated_storage.setter def max_allocated_storage(self, value: int) -> None: raise NotImplementedError("Not valid for clustered db instances.") @property def option_group_name(self) -> str: return self.cluster.option_group_name @option_group_name.setter def option_group_name(self, value: str) -> None: raise NotImplementedError("Not valid for clustered db instances.") @property def preferred_backup_window(self) -> str: return self.cluster.preferred_backup_window @preferred_backup_window.setter def preferred_backup_window(self, value: str) -> None: raise NotImplementedError("Not valid for clustered db instances.") @property def storage_encrypted(self) -> bool: return self.cluster.storage_encrypted @storage_encrypted.setter def storage_encrypted(self, value: bool) -> None: raise NotImplementedError("Not valid for clustered db instances.") @property def storage_type(self) -> str: return "aurora" @storage_type.setter def storage_type(self, value: str) -> None: raise NotImplementedError("Not valid for clustered db instances.") class DBSnapshot(EventMixin, SnapshotAttributesMixin, RDSBaseModel): event_source_type = "db-snapshot" resource_type = "snapshot" SUPPORTED_FILTERS = { "db-instance-id": FilterDef( ["database.db_instance_arn", "database.db_instance_identifier"], "DB Instance Identifiers", ), "db-snapshot-id": FilterDef( ["db_snapshot_identifier"], "DB Snapshot Identifiers" ), "dbi-resource-id": FilterDef(["database.dbi_resource_id"], "Dbi Resource Ids"), "snapshot-type": FilterDef(["snapshot_type"], "Snapshot Types"), "engine": FilterDef(["database.engine"], "Engine Names"), } def __init__( self, backend: RDSBackend, database: DBInstance, snapshot_id: str, snapshot_type: str = "manual", tags: Optional[List[Dict[str, str]]] = None, original_created_at: Optional[datetime] = None, kms_key_id: Optional[str] = None, **kwargs: Any, ): super().__init__(backend=backend, **kwargs) self.database = copy.copy(database) # TODO: Refactor this out. self.db_snapshot_identifier = snapshot_id self.snapshot_type = snapshot_type self.status = "available" self.original_snapshot_create_time = original_created_at or self.created # If tags are provided at creation, AWS does *not* copy tags from the # db_cluster (even if copy_tags_to_snapshot is True). if tags: self.tags = tags elif database.copy_tags_to_snapshot and database.tags: self.tags = database.tags else: self.tags = [] # Database attributes are captured at the time the snapshot is taken. self.allocated_storage = database.allocated_storage self.dbi_resource_id = database.dbi_resource_id self.db_instance_identifier = database.db_instance_identifier self.engine = database.engine self.engine_version = database.engine_version self.kms_key_id = kms_key_id or database.kms_key_id self.storage_encrypted = ( self.kms_key_id is not None or database.storage_encrypted ) self.encrypted = self.kms_key_id is not None and self.storage_encrypted self.iam_database_authentication_enabled = ( database.enable_iam_database_authentication ) self.instance_create_time = database.created self.master_username = database.master_username self.port = database.port @property def resource_id(self) -> str: return self.db_snapshot_identifier @property def db_snapshot_arn(self) -> str: return self.arn @property def snapshot_create_time(self) -> datetime: return self.created class ExportTask(RDSBaseModel): def __init__( self, backend: RDSBackend, snapshot: Union[DBSnapshot, DBClusterSnapshot], kwargs: Dict[str, Any], ): super().__init__(backend) self.snapshot = snapshot self.export_task_identifier = kwargs.get("export_task_identifier") self.kms_key_id = kwargs.get("kms_key_id", "default_kms_key_id") self.source_arn = kwargs.get("source_arn") self.iam_role_arn = kwargs.get("iam_role_arn") self.s3_bucket = kwargs.get("s3_bucket_name") self.s3_prefix = kwargs.get("s3_prefix", "") self.export_only = kwargs.get("export_only", []) self.status = "complete" self.created_at = utcnow() self.source_type = "SNAPSHOT" if type(snapshot) is DBSnapshot else "CLUSTER" class EventSubscription(RDSBaseModel): resource_type = "es" def __init__(self, backend: RDSBackend, subscription_name: str, **kwargs: Any): super().__init__(backend) self.subscription_name = subscription_name self.sns_topic_arn = kwargs.get("sns_topic_arn") self.source_type = kwargs.get("source_type") self.event_categories = kwargs.get("event_categories", []) self.source_ids = kwargs.get("source_ids", []) self.enabled = kwargs.get("enabled", False) self.tags = kwargs.get("tags", []) self.status = "active" self.created_at = utcnow() @property def resource_id(self) -> str: return self.subscription_name @property def cust_subscription_id(self) -> str: return self.subscription_name @property def event_categories_list(self) -> List[str]: return self.event_categories @property def source_ids_list(self) -> List[str]: return self.source_ids class DBSecurityGroup(CloudFormationModel, RDSBaseModel): resource_type = "secgrp" def __init__( self, backend: RDSBackend, group_name: str, description: str, tags: List[Dict[str, str]], ): super().__init__(backend) self.name = group_name self.description = description self.status = "authorized" self._ip_ranges: List[Any] = [] self._ec2_security_groups: List[Any] = [] self.tags = tags self.vpc_id = None @property def resource_id(self) -> str: return self.name @property def ec2_security_groups(self) -> List[Dict[str, str]]: security_groups = [ { "Status": "Active", "EC2SecurityGroupName": sg.name, "EC2SecurityGroupId": sg.id, "EC2SecurityGroupOwnerId": sg.owner_id, } for sg in self._ec2_security_groups ] return security_groups @property def ip_ranges(self) -> List[Dict[str, Any]]: # type: ignore[misc] ranges = [ { "CIDRIP": ip_range, "Status": "authorized", } for ip_range in self._ip_ranges ] return ranges def authorize_cidr(self, cidr_ip: str) -> None: self._ip_ranges.append(cidr_ip) def authorize_security_group(self, security_group: str) -> None: self._ec2_security_groups.append(security_group) @staticmethod def cloudformation_name_type() -> str: return "" @staticmethod def cloudformation_type() -> str: # https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-rds-dbsecuritygroup.html return "AWS::RDS::DBSecurityGroup" @classmethod def create_from_cloudformation_json( # type: ignore[misc] cls, resource_name: str, cloudformation_json: Any, account_id: str, region_name: str, **kwargs: Any, ) -> DBSecurityGroup: properties = cloudformation_json["Properties"] group_name = resource_name.lower() description = properties["GroupDescription"] security_group_ingress_rules = properties.get("DBSecurityGroupIngress", []) tags = properties.get("Tags") ec2_backend = ec2_backends[account_id][region_name] rds_backend = rds_backends[account_id][region_name] security_group = rds_backend.create_db_security_group( group_name, description, tags ) for security_group_ingress in security_group_ingress_rules: for ingress_type, ingress_value in security_group_ingress.items(): if ingress_type == "CIDRIP": security_group.authorize_cidr(ingress_value) elif ingress_type == "EC2SecurityGroupName": subnet = ec2_backend.get_security_group_from_name(ingress_value) security_group.authorize_security_group(subnet) # type: ignore[arg-type] elif ingress_type == "EC2SecurityGroupId": subnet = ec2_backend.get_security_group_from_id(ingress_value) security_group.authorize_security_group(subnet) # type: ignore[arg-type] return security_group def delete(self, account_id: str, region_name: str) -> None: backend = rds_backends[account_id][region_name] backend.delete_security_group(self.group_name) class DBSubnetGroup(CloudFormationModel, RDSBaseModel): resource_type = "subgrp" def __init__( self, backend: RDSBackend, subnet_name: str, description: str, subnets: List[Subnet], tags: List[Dict[str, str]], ): super().__init__(backend) self.name = subnet_name self.description = description self._subnets = subnets self.subnet_group_status = "Complete" self.tags = tags self.vpc_id = self._subnets[0].vpc_id @property def resource_id(self) -> str: return self.name @property def db_subnet_group_description(self) -> str: return self.description @property def subnets(self) -> List[Dict[str, Any]]: # type: ignore[misc] subnets = [ { "SubnetStatus": "Active", "SubnetIdentifier": subnet.id, "SubnetAvailabilityZone": { "Name": subnet.availability_zone, "ProvisionedIopsCapable": False, }, } for subnet in self._subnets ] return subnets @subnets.setter def subnets(self, subnets: List[Subnet]) -> None: self._subnets = subnets @staticmethod def cloudformation_name_type() -> str: return "DBSubnetGroupName" @staticmethod def cloudformation_type() -> str: # https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-rds-dbsubnetgroup.html return "AWS::RDS::DBSubnetGroup" @classmethod def create_from_cloudformation_json( # type: ignore[misc] cls, resource_name: str, cloudformation_json: Any, account_id: str, region_name: str, **kwargs: Any, ) -> DBSubnetGroup: properties = cloudformation_json["Properties"] description = properties["DBSubnetGroupDescription"] subnet_ids = properties["SubnetIds"] tags = properties.get("Tags") ec2_backend = ec2_backends[account_id][region_name] subnets = [ec2_backend.get_subnet(subnet_id) for subnet_id in subnet_ids] rds_backend = rds_backends[account_id][region_name] subnet_group = rds_backend.create_subnet_group( resource_name, description, subnets, tags, ) return subnet_group def delete(self, account_id: str, region_name: str) -> None: backend = rds_backends[account_id][region_name] backend.delete_subnet_group(self.subnet_name) class DBProxy(RDSBaseModel): resource_type = "db-proxy" def __init__( self, backend: RDSBackend, db_proxy_name: str, engine_family: str, auth: List[Dict[str, str]], role_arn: str, vpc_subnet_ids: List[str], vpc_security_group_ids: Optional[List[str]], require_tls: Optional[bool] = False, idle_client_timeout: Optional[int] = 1800, debug_logging: Optional[bool] = False, tags: Optional[List[Dict[str, str]]] = None, ): super().__init__(backend) self.db_proxy_name = db_proxy_name self.engine_family = engine_family if self.engine_family not in ["MYSQL", "POSTGRESQL", "SQLSERVER"]: raise InvalidParameterValue("Provided EngineFamily is not valid.") self.auth = auth self.role_arn = role_arn self.vpc_subnet_ids = vpc_subnet_ids self.vpc_security_group_ids = vpc_security_group_ids or [] self.require_tls = require_tls if idle_client_timeout is None: self.idle_client_timeout = 1800 else: if int(idle_client_timeout) < 1: self.idle_client_timeout = 1 elif int(idle_client_timeout) > 28800: self.idle_client_timeout = 28800 else: self.idle_client_timeout = idle_client_timeout self.debug_logging = debug_logging if self.debug_logging is None: self.debug_logging = False self.created_date = utcnow() self.updated_date = utcnow() if tags is None: self.tags = [] else: self.tags = tags ec2_backend = ec2_backends[self.account_id][self.region] subnets = ec2_backend.describe_subnets(subnet_ids=self.vpc_subnet_ids) vpcs = [] for subnet in subnets: vpcs.append(subnet.vpc_id) if subnet.vpc_id != vpcs[0]: raise InvalidSubnet(subnet_identifier=subnet.id) if not self.vpc_security_group_ids: default_sg = ec2_backend.get_default_security_group(vpcs[0]) self.vpc_security_group_ids.append(default_sg.id) # type: ignore self.vpc_id = ec2_backend.describe_subnets(subnet_ids=[self.vpc_subnet_ids[0]])[ 0 ].vpc_id self.status = "available" self.url_identifier = "".join( random.choice(string.ascii_lowercase + string.digits) for _ in range(12) ) self.endpoint = f"{self.db_proxy_name}.db-proxy-{self.url_identifier}.{self.region}.rds.amazonaws.com" self.proxy_target_groups = { "default": DBProxyTargetGroup( backend=self.backend, name="default", proxy_name=db_proxy_name ) } self.unique_id = f"prx-{random.get_random_string(17, lower_case=True)}" @property def resource_id(self) -> str: return self.unique_id class DBInstanceAutomatedBackup(XFormedAttributeAccessMixin): def __init__( self, backend: RDSBackend, db_instance_identifier: str, automated_snapshots: List[DBSnapshot], ) -> None: self.backend = backend self.db_instance_identifier = db_instance_identifier self.automated_snapshots = automated_snapshots @property def status(self) -> str: status = "active" if self.db_instance_identifier not in self.backend.databases: status = "retained" return status class RDSBackend(BaseBackend): def __init__(self, region_name: str, account_id: str): super().__init__(region_name, account_id) self.arn_regex = re_compile( ARN_PARTITION_REGEX + r":rds:.*:[0-9]*:(db|cluster|es|og|pg|ri|secgrp|snapshot|cluster-snapshot|subgrp|db-proxy):.*$" ) self.clusters: Dict[str, DBCluster] = OrderedDict() self.global_clusters: Dict[str, GlobalCluster] = OrderedDict() self.databases: Dict[str, DBInstance] = OrderedDict() self.database_snapshots: Dict[str, DBSnapshot] = OrderedDict() self.cluster_snapshots: Dict[str, DBClusterSnapshot] = OrderedDict() self.export_tasks: Dict[str, ExportTask] = OrderedDict() self.event_subscriptions: Dict[str, EventSubscription] = OrderedDict() self.db_parameter_groups: Dict[str, DBParameterGroup] = {} self.db_cluster_parameter_groups: Dict[str, DBClusterParameterGroup] = {} self.option_groups: Dict[str, OptionGroup] = {} self.security_groups: Dict[str, DBSecurityGroup] = {} self.subnet_groups: Dict[str, DBSubnetGroup] = {} self._db_cluster_options: Optional[List[Dict[str, Any]]] = None self.db_proxies: Dict[str, DBProxy] = OrderedDict() self.events: List[Event] = [] self.resource_map = { DBCluster: self.clusters, DBClusterParameterGroup: self.db_cluster_parameter_groups, DBClusterSnapshot: self.cluster_snapshots, DBInstance: self.databases, DBParameterGroup: self.db_parameter_groups, DBProxy: self.db_proxies, DBSecurityGroup: self.security_groups, DBSnapshot: self.database_snapshots, DBSubnetGroup: self.subnet_groups, EventSubscription: self.event_subscriptions, ExportTask: self.export_tasks, GlobalCluster: self.global_clusters, OptionGroup: self.option_groups, } @property def kms(self) -> KmsBackend: return kms_backends[self.account_id][self.region_name] @property def secretsmanager(self) -> SecretsManagerBackend: return self.get_backend("secretsmanager", self.region_name, self.account_id) # type: ignore[return-value] def _validate_kms_key(self, kms_key_id: str) -> str: key = kms_key_id kms_backend = self.kms if (match := re.fullmatch(KMS_ARN_PATTERN, kms_key_id)) is not None: region = match.group("region") if region != self.region_name: raise KMSKeyNotAccessibleFault(kms_key_id) account = match.group("account_id") key = match.group("key") kms_backend = self.get_backend("kms", region, account) # type: ignore[assignment] assert isinstance(kms_backend, KmsBackend) try: kms_key = kms_backend.describe_key(key) except KeyError: raise KMSKeyNotAccessibleFault(kms_key_id) validated_key = kms_key.arn return validated_key def get_backend( self, service: Literal["kms"] | Literal["rds"] | Literal["secretsmanager"], region: str, account_id: Optional[str] = None, ) -> KmsBackend | RDSBackend | SecretsManagerBackend: from moto.backends import get_backend as get_moto_backend if account_id is None: account_id = self.account_id return get_moto_backend(service)[account_id][region] def get_snapshot( self, identifier: str, resource_type: type[DBSnapshot] | type[DBClusterSnapshot], not_found_exception: type[DBSnapshotNotFoundFault] | type[DBClusterSnapshotNotFoundError], ) -> DBSnapshot | DBClusterSnapshot: region = self.region_name if identifier.startswith("arn"): region = identifier.split(":")[3] identifier = identifier.split(":")[-1] backend = self.get_backend("rds", region=region) assert isinstance(backend, RDSBackend) snapshots = backend.resource_map[resource_type] if identifier not in snapshots: # type: ignore[operator] raise not_found_exception(identifier) return snapshots[identifier] # type: ignore[index] get_db_snapshot = partialmethod( get_snapshot, resource_type=DBSnapshot, not_found_exception=DBSnapshotNotFoundFault, ) get_db_cluster_snapshot = partialmethod( get_snapshot, resource_type=DBClusterSnapshot, not_found_exception=DBClusterSnapshotNotFoundError, ) def get_shared_snapshots( self, resource_type: type[DBSnapshot] | type[DBClusterSnapshot] ) -> List[DBSnapshot | DBClusterSnapshot]: snapshots_shared = [] for backend_container in rds_backends.values(): for backend in backend_container.values(): if backend.region_name != self.region_name: continue snapshots = backend.resource_map[resource_type].values() # type: ignore[attr-defined] for snapshot in snapshots: if self.account_id in snapshot.attributes["restore"]: snapshots_shared.append(snapshot) return snapshots_shared get_shared_db_snapshots = partialmethod( get_shared_snapshots, resource_type=DBSnapshot ) get_shared_db_cluster_snapshots = partialmethod( get_shared_snapshots, resource_type=DBClusterSnapshot ) @lru_cache() def db_cluster_options(self, engine) -> List[Dict[str, Any]]: # type: ignore from moto.rds.utils import decode_orderable_db_instance decoded_options = load_resource( __name__, f"resources/cluster_options/{engine}.json" ) self._db_cluster_options = [ decode_orderable_db_instance(option) for option in decoded_options ] return self._db_cluster_options def create_db_instance(self, db_kwargs: Dict[str, Any]) -> DBInstance: database_id = db_kwargs["db_instance_identifier"] if database_id in self.databases: raise DBInstanceAlreadyExists() self._validate_db_identifier(database_id) if db_kwargs.get("db_cluster_identifier") is None: database = DBInstance(self, **db_kwargs) else: database = DBInstanceClustered(backend=self, **db_kwargs) cluster_id = database.db_cluster_identifier if cluster_id is not None: cluster = self.clusters.get(cluster_id) if cluster is not None: if ( cluster.engine in ClusterEngine.serverless_engines() and cluster.engine_mode == "serverless" ): raise InvalidParameterValue( "Instances cannot be added to Aurora Serverless clusters." ) if database.engine != cluster.engine: raise InvalidDBInstanceEngine( str(database.engine), str(cluster.engine) ) self.databases[database_id] = database database.add_event("DB_INSTANCE_CREATE") database.save_automated_backup() return database def create_auto_snapshot( self, db_instance_identifier: str, db_snapshot_identifier: str, ) -> DBSnapshot: snapshot = self.create_db_snapshot( db_instance_identifier, db_snapshot_identifier, snapshot_type="automated" ) snapshot.add_event("DB_SNAPSHOT_CREATE_AUTOMATED_START") snapshot.add_event("DB_SNAPSHOT_CREATE_AUTOMATED_FINISH") return snapshot def create_db_snapshot( self, db_instance_identifier: str, db_snapshot_identifier: str, snapshot_type: str = "manual", tags: Optional[List[Dict[str, str]]] = None, original_created_at: Optional[datetime] = None, kms_key_id: Optional[str] = None, ) -> DBSnapshot: database = self.databases.get(db_instance_identifier) if not database: raise DBInstanceNotFoundError(db_instance_identifier) if db_snapshot_identifier in self.database_snapshots: raise DBSnapshotAlreadyExistsError(db_snapshot_identifier) if len(self.database_snapshots) >= int( os.environ.get("MOTO_RDS_SNAPSHOT_LIMIT", "100") ): raise SnapshotQuotaExceededFault() snapshot = DBSnapshot( self, database, db_snapshot_identifier, snapshot_type, tags, original_created_at, kms_key_id, ) self.database_snapshots[db_snapshot_identifier] = snapshot return snapshot def copy_db_snapshot( self, source_db_snapshot_identifier: str, target_db_snapshot_identifier: str, kms_key_id: Optional[str] = None, tags: Optional[List[Dict[str, str]]] = None, copy_tags: Optional[bool] = False, **_: Any, ) -> DBSnapshot: if source_db_snapshot_identifier.startswith("arn:aws:rds:"): self.extract_snapshot_name_from_arn(source_db_snapshot_identifier) if target_db_snapshot_identifier in self.database_snapshots: raise DBSnapshotAlreadyExistsError(target_db_snapshot_identifier) if len(self.database_snapshots) >= int( os.environ.get("MOTO_RDS_SNAPSHOT_LIMIT", "100") ): raise SnapshotQuotaExceededFault() if kms_key_id is not None: kms_key_id = self._validate_kms_key(kms_key_id) source_db_snapshot = self.get_db_snapshot(source_db_snapshot_identifier) # When tags are passed, AWS does NOT copy/merge tags of the # source snapshot, even when copy_tags=True is given. # But when tags=[], AWS does honor copy_tags=True. if copy_tags and not tags: tags = source_db_snapshot.tags or [] target_db_snapshot = DBSnapshot( self, source_db_snapshot.database, target_db_snapshot_identifier, tags=tags, kms_key_id=kms_key_id, original_created_at=source_db_snapshot.original_snapshot_create_time, ) self.database_snapshots[target_db_snapshot_identifier] = target_db_snapshot return target_db_snapshot def delete_db_snapshot(self, db_snapshot_identifier: str) -> DBSnapshot: if db_snapshot_identifier not in self.database_snapshots: raise DBSnapshotNotFoundFault(db_snapshot_identifier) return self.database_snapshots.pop(db_snapshot_identifier) def promote_read_replica(self, db_kwargs: Dict[str, Any]) -> DBInstance: database_id = db_kwargs["db_instance_identifier"] database = self.databases[database_id] if database.is_replica: database.is_replica = False database.update(**db_kwargs) return database def create_db_instance_read_replica(self, db_kwargs: Dict[str, Any]) -> DBInstance: database_id = db_kwargs["db_instance_identifier"] source_database_id = db_kwargs["source_db_instance_identifier"] primary = self.find_db_from_id(source_database_id) if self.arn_regex.match(source_database_id): db_kwargs["backend"] = self # Shouldn't really copy here as the instance is duplicated. RDS replicas have different instances. replica = copy.copy(primary) replica.update(**db_kwargs) replica.set_as_replica() self.databases[database_id] = replica primary.add_replica(replica) return replica def describe_db_instances( self, db_instance_identifier: Optional[str] = None, filters: Any = None ) -> List[DBInstance]: databases = self.databases if db_instance_identifier: filters = merge_filters( filters, {"db-instance-id": [db_instance_identifier]} ) if filters: databases = self._filter_resources(databases, filters, DBInstance) if db_instance_identifier and not databases: raise DBInstanceNotFoundError(db_instance_identifier) return list(databases.values()) def describe_db_snapshots( self, db_instance_identifier: Optional[str], db_snapshot_identifier: Optional[str] = None, snapshot_type: Optional[str] = None, filters: Optional[Dict[str, Any]] = None, ) -> List[DBSnapshot]: if snapshot_type == "shared": return self.get_shared_db_snapshots() # type: ignore[return-value] snapshots = self.database_snapshots if db_instance_identifier: filters = merge_filters( filters, {"db-instance-id": [db_instance_identifier]} ) if db_snapshot_identifier: filters = merge_filters( filters, {"db-snapshot-id": [db_snapshot_identifier]} ) snapshot_types = ( ["automated", "manual"] if ( snapshot_type is None and (filters is not None and "snapshot-type" not in filters) ) else [snapshot_type] if snapshot_type is not None else [] ) if snapshot_types: filters = merge_filters(filters, {"snapshot-type": snapshot_types}) if filters: snapshots = self._filter_resources(snapshots, filters, DBSnapshot) if db_snapshot_identifier and not snapshots and not db_instance_identifier: raise DBSnapshotNotFoundFault(db_snapshot_identifier) return list(snapshots.values()) def modify_db_instance( self, db_instance_identifier: str, db_kwargs: Dict[str, Any] ) -> DBInstance: database = self.describe_db_instances(db_instance_identifier)[0] if "new_db_instance_identifier" in db_kwargs: del self.databases[db_instance_identifier] db_instance_identifier = db_kwargs["db_instance_identifier"] = ( db_kwargs.pop("new_db_instance_identifier") ) self.databases[db_instance_identifier] = database if "db_parameter_group_name" in db_kwargs: db_parameter_group_name = db_kwargs["db_parameter_group_name"] if db_parameter_group_name not in self.db_parameter_groups: raise DBParameterGroupNotFoundError(db_parameter_group_name) preferred_backup_window = db_kwargs.get( "preferred_backup_window", database.preferred_backup_window ) preferred_maintenance_window = db_kwargs.get( "preferred_maintenance_window", database.preferred_maintenance_window ) if preferred_maintenance_window or preferred_backup_window: msg = valid_preferred_maintenance_window( preferred_maintenance_window, preferred_backup_window ) if msg: raise RDSClientError("InvalidParameterValue", msg) if db_kwargs.get("rotate_master_user_password") and not db_kwargs.get( "apply_immediately" ): raise RDSClientError( "InvalidParameterCombination", "You must specify apply immediately when rotating the master user password.", ) database.update(**db_kwargs) return database def reboot_db_instance(self, db_instance_identifier: str) -> DBInstance: return self.describe_db_instances(db_instance_identifier)[0] def extract_snapshot_name_from_arn(self, snapshot_arn: str) -> str: arn_breakdown = snapshot_arn.split(":") region_name, account_id, resource_type, snapshot_name = arn_breakdown[3:7] if resource_type != "snapshot": raise InvalidParameterValue( "The parameter SourceDBSnapshotIdentifier is not a valid identifier. " "Identifiers must begin with a letter; must contain only ASCII " "letters, digits, and hyphens; and must not end with a hyphen or " "contain two consecutive hyphens." ) return snapshot_name def restore_db_instance_from_db_snapshot( self, from_snapshot_id: str, overrides: Dict[str, Any] ) -> DBInstance: if from_snapshot_id.startswith("arn:aws:rds:"): from_snapshot_id = self.extract_snapshot_name_from_arn(from_snapshot_id) snapshot = self.describe_db_snapshots( db_instance_identifier=None, db_snapshot_identifier=from_snapshot_id )[0] original_database = snapshot.database if overrides["db_instance_identifier"] in self.databases: raise DBInstanceAlreadyExists() new_instance_props = {} for key, value in original_database.__dict__.items(): if key.startswith("_"): key = key[1:] if key not in [ "backend", "db_parameter_group_name", "vpc_security_group_ids", "max_allocated_storage", ]: new_instance_props[key] = copy.copy(value) new_instance_props["kms_key_id"] = snapshot.kms_key_id new_instance_props["storage_encrypted"] = snapshot.encrypted if not original_database.option_group_supplied: # If the option group is not supplied originally, the 'option_group_name' will receive a default value # Force this reconstruction, and prevent any validation on the default value del new_instance_props["option_group_name"] if "allocated_storage" in overrides: if overrides["allocated_storage"] < snapshot.allocated_storage: raise InvalidParameterValue( "The allocated storage size can't be less than the source snapshot or backup size." ) for key, value in overrides.items(): if value: new_instance_props[key] = value return self.create_db_instance(new_instance_props) def restore_db_instance_to_point_in_time( self, source_db_identifier: str, target_db_identifier: str, overrides: Dict[str, Any], ) -> DBInstance: db_instance = self.describe_db_instances( db_instance_identifier=source_db_identifier )[0] new_instance_props = {} for key, value in db_instance.__dict__.items(): if key.startswith("_"): key = key[1:] # Remove backend / db subnet group as they cannot be copied # and are not used in the restored instance. if key in ("backend", "db_subnet_group", "max_allocated_storage"): continue new_instance_props[key] = copy.deepcopy(value) new_instance_props["db_name"] = db_instance.db_name if not db_instance.option_group_supplied: # If the option group is not supplied originally, the 'option_group_name' will receive a default value # Force this reconstruction, and prevent any validation on the default value del new_instance_props["option_group_name"] if "allocated_storage" in overrides: if overrides["allocated_storage"] < db_instance.allocated_storage: raise InvalidParameterValue( "Allocated storage size can't be less than the source instance size." ) for key, value in overrides.items(): if value: new_instance_props[key] = value # set the new db instance identifier new_instance_props["db_instance_identifier"] = target_db_identifier return self.create_db_instance(new_instance_props) def restore_db_cluster_to_point_in_time( self, db_cluster_identifier: str, source_db_cluster_identifier: str, restore_type: str = "full-copy", restore_to_time: Optional[datetime] = None, use_latest_restorable_time: bool = False, **overrides: Dict[str, Any], ) -> DBCluster: db_cluster = self.describe_db_clusters( db_cluster_identifier=source_db_cluster_identifier )[0] new_cluster_props = {} for key, value in db_cluster.__dict__.items(): if key.startswith("_"): key = key[1:] # Remove backend / db subnet group as they cannot be copied # and are not used in the restored instance. if key in ("backend", "db_subnet_group", "vpc_security_group_ids"): continue new_cluster_props[key] = copy.copy(value) for key, value in overrides.items(): new_cluster_props[key] = value new_cluster_props["db_cluster_identifier"] = db_cluster_identifier return self.create_db_cluster(new_cluster_props) def failover_db_cluster( self, db_cluster_identifier: str, target_db_instance_identifier: str ) -> DBCluster: cluster = self.clusters[db_cluster_identifier] instance = self.databases[target_db_instance_identifier] assert isinstance(instance, DBInstanceClustered) cluster.failover(instance) return cluster def stop_db_instance( self, db_instance_identifier: str, db_snapshot_identifier: Optional[str] = None ) -> DBInstance: self._validate_db_identifier(db_instance_identifier) database = self.describe_db_instances(db_instance_identifier)[0] # todo: certain rds types not allowed to be stopped at this time. # https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/USER_StopInstance.html#USER_StopInstance.Limitations if database.is_replica or ( database.multi_az and database.engine.lower().startswith("sqlserver") # type: ignore ): # todo: more db types not supported by stop/start instance api raise InvalidDBClusterStateFaultError(db_instance_identifier) if database.status != "available": raise InvalidDBInstanceStateError(db_instance_identifier, "stop") if db_snapshot_identifier: self.create_auto_snapshot(db_instance_identifier, db_snapshot_identifier) database.status = "stopped" return database def start_db_instance(self, db_instance_identifier: str) -> DBInstance: self._validate_db_identifier(db_instance_identifier) database = self.describe_db_instances(db_instance_identifier)[0] # todo: bunch of different error messages to be generated from this api call if database.status != "stopped": raise InvalidDBInstanceStateError(db_instance_identifier, "start") database.status = "available" return database def find_db_from_id(self, db_id: str) -> DBInstance: if self.arn_regex.match(db_id): arn_breakdown = db_id.split(":") region = arn_breakdown[3] backend = rds_backends[self.account_id][region] db_name = arn_breakdown[-1] else: backend = self db_name = db_id return backend.describe_db_instances(db_name)[0] def delete_db_instance( self, db_instance_identifier: str, final_db_snapshot_identifier: Optional[str] = None, skip_final_snapshot: Optional[bool] = False, delete_automated_backups: Optional[bool] = True, ) -> DBInstance: self._validate_db_identifier(db_instance_identifier) if db_instance_identifier in self.databases: if self.databases[db_instance_identifier].deletion_protection: raise InvalidParameterCombination( "Cannot delete protected DB Instance, please disable deletion protection and try again." ) if final_db_snapshot_identifier and not skip_final_snapshot: self.create_db_snapshot( db_instance_identifier, final_db_snapshot_identifier, snapshot_type="manual", ) database = self.databases.pop(db_instance_identifier) if database.manage_master_user_password: database.master_user_secret.delete_secret() if database.is_replica: primary = self.find_db_from_id(database.source_db_instance_identifier) # type: ignore primary.remove_replica(database) automated_snapshots = self.describe_db_snapshots( db_instance_identifier, db_snapshot_identifier=None, snapshot_type="automated", ) if delete_automated_backups: for snapshot in automated_snapshots: self.delete_db_snapshot(snapshot.db_snapshot_identifier) database.status = "deleting" return database else: raise DBInstanceNotFoundError(db_instance_identifier) def create_db_security_group( self, group_name: str, description: str, tags: List[Dict[str, str]] ) -> DBSecurityGroup: security_group = DBSecurityGroup(self, group_name, description, tags) self.security_groups[group_name] = security_group return security_group def describe_security_groups( self, security_group_name: str ) -> List[DBSecurityGroup]: if security_group_name: if security_group_name in self.security_groups: return [self.security_groups[security_group_name]] else: raise DBSecurityGroupNotFoundError(security_group_name) return list(self.security_groups.values()) def delete_security_group(self, security_group_name: str) -> DBSecurityGroup: if security_group_name in self.security_groups: return self.security_groups.pop(security_group_name) else: raise DBSecurityGroupNotFoundError(security_group_name) def delete_db_parameter_group( self, db_parameter_group_name: str ) -> DBParameterGroup: if db_parameter_group_name in self.db_parameter_groups: return self.db_parameter_groups.pop(db_parameter_group_name) else: raise DBParameterGroupNotFoundError(db_parameter_group_name) def authorize_security_group( self, security_group_name: str, cidr_ip: str ) -> DBSecurityGroup: security_group = self.describe_security_groups(security_group_name)[0] security_group.authorize_cidr(cidr_ip) return security_group def create_subnet_group( self, subnet_name: str, description: str, subnets: List[Any], tags: List[Dict[str, str]], ) -> DBSubnetGroup: subnet_group = DBSubnetGroup(self, subnet_name, description, subnets, tags) self.subnet_groups[subnet_name] = subnet_group return subnet_group def describe_db_subnet_groups(self, subnet_group_name: str) -> List[DBSubnetGroup]: if subnet_group_name: if subnet_group_name in self.subnet_groups: return [self.subnet_groups[subnet_group_name]] else: raise DBSubnetGroupNotFoundError(subnet_group_name) return list(self.subnet_groups.values()) def modify_db_subnet_group( self, subnet_name: str, description: str, subnets: List[Subnet] ) -> DBSubnetGroup: subnet_group = self.subnet_groups.pop(subnet_name) if not subnet_group: raise DBSubnetGroupNotFoundError(subnet_name) subnet_group.name = subnet_name subnet_group.subnets = subnets # type: ignore[assignment] if description is not None: subnet_group.description = description return subnet_group def delete_subnet_group(self, subnet_name: str) -> DBSubnetGroup: if subnet_name in self.subnet_groups: return self.subnet_groups.pop(subnet_name) else: raise DBSubnetGroupNotFoundError(subnet_name) def create_option_group(self, option_group_kwargs: Dict[str, Any]) -> OptionGroup: option_group_id = option_group_kwargs["option_group_name"] # This list was verified against the AWS Console on 14 Dec 2022 # Having an automated way (using the CLI) would be nice, but AFAICS that's not possible # # Some options that are allowed in the CLI, but that do now show up in the Console: # - Mysql 5.5 # - All postgres-versions # - oracle-se and oracle-se1 - I could not deduct the available versions # `Cannot find major version 19 for oracle-se` # (The engines do exist, otherwise the error would be `Invalid DB engine` valid_option_group_engines = { "mariadb": ["10.0", "10.1", "10.2", "10.3", "10.4", "10.5", "10.6"], "mysql": ["5.5", "5.6", "5.7", "8.0"], "oracle-ee": ["19"], "oracle-ee-cdb": ["19", "21"], "oracle-se": [], "oracle-se1": [], "oracle-se2": ["19"], "oracle-se2-cdb": ["19", "21"], "postgres": ["10", "11", "12", "13"], "sqlserver-ee": ["11.00", "12.00", "13.00", "14.00", "15.00"], "sqlserver-ex": ["11.00", "12.00", "13.00", "14.00", "15.00"], "sqlserver-se": ["11.00", "12.00", "13.00", "14.00", "15.00"], "sqlserver-web": ["11.00", "12.00", "13.00", "14.00", "15.00"], } if option_group_id in self.option_groups: raise RDSClientError( "OptionGroupAlreadyExistsFault", f"An option group named {option_group_id} already exists.", ) if ( "option_group_description" not in option_group_kwargs or not option_group_kwargs["option_group_description"] ): raise RDSClientError( "InvalidParameterValue", "The parameter OptionGroupDescription must be provided and must not be blank.", ) if option_group_kwargs["engine_name"] not in valid_option_group_engines.keys(): raise RDSClientError( "InvalidParameterValue", "Invalid DB engine: non-existent" ) if ( option_group_kwargs["major_engine_version"] # type: ignore not in valid_option_group_engines[option_group_kwargs["engine_name"]] ): raise RDSClientError( "InvalidParameterCombination", f"Cannot find major version {option_group_kwargs['major_engine_version']} for {option_group_kwargs['engine_name']}", ) # AWS also creates default option groups, if they do not yet exist, when creating an option group in the CLI # Maybe we should do the same # { # "OptionGroupName": "default:postgres-10", # "OptionGroupDescription": "Default option group for postgres 10", # "EngineName": "postgres", # "MajorEngineVersion": "10", # "Options": [], # "AllowsVpcAndNonVpcInstanceMemberships": true, # "OptionGroupArn": "arn:aws:rds:us-east-1:{account}:og:default:postgres-10" # } # The CLI does not allow deletion of default groups option_group = OptionGroup(self, **option_group_kwargs) self.option_groups[option_group_id] = option_group return option_group def delete_option_group(self, option_group_name: str) -> OptionGroup: if option_group_name in self.option_groups: return self.option_groups.pop(option_group_name) else: raise OptionGroupNotFoundFaultError(option_group_name) def describe_option_groups( self, option_group_kwargs: Dict[str, Any] ) -> List[OptionGroup]: option_group_list = [] for option_group in self.option_groups.values(): if ( option_group_kwargs["option_group_name"] and option_group.name != option_group_kwargs["option_group_name"] ): continue elif option_group_kwargs.get( "engine_name" ) and option_group.engine_name != option_group_kwargs.get("engine_name"): continue elif option_group_kwargs.get( "major_engine_version" ) and option_group.major_engine_version != option_group_kwargs.get( "major_engine_version" ): continue else: option_group_list.append(option_group) if not len(option_group_list): raise OptionGroupNotFoundFaultError( option_group_kwargs["option_group_name"] ) return option_group_list @staticmethod def describe_option_group_options( engine_name: str, major_engine_version: Optional[str] = None ) -> str: default_option_group_options = { "mysql": { "5.6": '<DescribeOptionGroupOptionsResponse xmlns="http://rds.amazonaws.com/doc/2014-09-01/">\n <DescribeOptionGroupOptionsResult>\n <OptionGroupOptions>\n \n <OptionGroupOption><MajorEngineVersion>5.6</MajorEngineVersion><DefaultPort>11211</DefaultPort><PortRequired>True</PortRequired><OptionsDependedOn></OptionsDependedOn><Description>Innodb Memcached for MySQL</Description><Name>MEMCACHED</Name><OptionGroupOptionSettings><OptionGroupOptionSetting><AllowedValues>1-4294967295</AllowedValues><ApplyType>STATIC</ApplyType><DefaultValue>1</DefaultValue><IsModifiable>True</IsModifiable><SettingDescription>Specifies how many memcached read operations (get) to perform before doing a COMMIT to start a new transaction</SettingDescription><SettingName>DAEMON_MEMCACHED_R_BATCH_SIZE</SettingName></OptionGroupOptionSetting><OptionGroupOptionSetting><AllowedValues>1-4294967295</AllowedValues><ApplyType>STATIC</ApplyType><DefaultValue>1</DefaultValue><IsModifiable>True</IsModifiable><SettingDescription>Specifies how many memcached write operations, such as add, set, or incr, to perform before doing a COMMIT to start a new transaction</SettingDescription><SettingName>DAEMON_MEMCACHED_W_BATCH_SIZE</SettingName></OptionGroupOptionSetting><OptionGroupOptionSetting><AllowedValues>1-1073741824</AllowedValues><ApplyType>DYNAMIC</ApplyType><DefaultValue>5</DefaultValue><IsModifiable>True</IsModifiable><SettingDescription>Specifies how often to auto-commit idle connections that use the InnoDB memcached interface.</SettingDescription><SettingName>INNODB_API_BK_COMMIT_INTERVAL</SettingName></OptionGroupOptionSetting><OptionGroupOptionSetting><AllowedValues>0,1</AllowedValues><ApplyType>STATIC</ApplyType><DefaultValue>0</DefaultValue><IsModifiable>True</IsModifiable><SettingDescription>Disables the use of row locks when using the InnoDB memcached interface.</SettingDescription><SettingName>INNODB_API_DISABLE_ROWLOCK</SettingName></OptionGroupOptionSetting><OptionGroupOptionSetting><AllowedValues>0,1</AllowedValues><ApplyType>STATIC</ApplyType><DefaultValue>0</DefaultValue><IsModifiable>True</IsModifiable><SettingDescription>Locks the table used by the InnoDB memcached plugin, so that it cannot be dropped or altered by DDL through the SQL interface.</SettingDescription><SettingName>INNODB_API_ENABLE_MDL</SettingName></OptionGroupOptionSetting><OptionGroupOptionSetting><AllowedValues>0-3</AllowedValues><ApplyType>STATIC</ApplyType><DefaultValue>0</DefaultValue><IsModifiable>True</IsModifiable><SettingDescription>Lets you control the transaction isolation level on queries processed by the memcached interface.</SettingDescription><SettingName>INNODB_API_TRX_LEVEL</SettingName></OptionGroupOptionSetting><OptionGroupOptionSetting><AllowedValues>auto,ascii,binary</AllowedValues><ApplyType>STATIC</ApplyType><DefaultValue>auto</DefaultValue><IsModifiable>True</IsModifiable><SettingDescription>The binding protocol to use which can be either auto, ascii, or binary. The default is auto which means the server automatically negotiates the protocol with the client.</SettingDescription><SettingName>BINDING_PROTOCOL</SettingName></OptionGroupOptionSetting><OptionGroupOptionSetting><AllowedValues>1-2048</AllowedValues><ApplyType>STATIC</ApplyType><DefaultValue>1024</DefaultValue><IsModifiable>True</IsModifiable><SettingDescription>The backlog queue configures how many network connections can be waiting to be processed by memcached</SettingDescription><SettingName>BACKLOG_QUEUE_LIMIT</SettingName></OptionGroupOptionSetting><OptionGroupOptionSetting><AllowedValues>0,1</AllowedValues><ApplyType>STATIC</ApplyType><DefaultValue>0</DefaultValue><IsModifiable>True</IsModifiable><SettingDescription>Disable the use of compare and swap (CAS) which reduces the per-item size by 8 bytes.</SettingDescription><SettingName>CAS_DISABLED</SettingName></OptionGroupOptionSetting><OptionGroupOptionSetting><AllowedValues>1-48</AllowedValues><ApplyType>STATIC</ApplyType><DefaultValue>48</DefaultValue><IsModifiable>True</IsModifiable><SettingDescription>Minimum chunk size in bytes to allocate for the smallest item\'s key, value, and flags. The default is 48 and you can get a significant memory efficiency gain with a lower value.</SettingDescription><SettingName>CHUNK_SIZE</SettingName></OptionGroupOptionSetting><OptionGroupOptionSetting><AllowedValues>1-2</AllowedValues><ApplyType>STATIC</ApplyType><DefaultValue>1.25</DefaultValue><IsModifiable>True</IsModifiable><SettingDescription>Chunk size growth factor that controls the size of each successive chunk with each chunk growing times this amount larger than the previous chunk.</SettingDescription><SettingName>CHUNK_SIZE_GROWTH_FACTOR</SettingName></OptionGroupOptionSetting><OptionGroupOptionSetting><AllowedValues>0,1</AllowedValues><ApplyType>STATIC</ApplyType><DefaultValue>0</DefaultValue><IsModifiable>True</IsModifiable><SettingDescription>If enabled when there is no more memory to store items, memcached will return an error rather than evicting items.</SettingDescription><SettingName>ERROR_ON_MEMORY_EXHAUSTED</SettingName></OptionGroupOptionSetting><OptionGroupOptionSetting><AllowedValues>10-1024</AllowedValues><ApplyType>STATIC</ApplyType><DefaultValue>1024</DefaultValue><IsModifiable>True</IsModifiable><SettingDescription>Maximum number of concurrent connections. Setting this value to anything less than 10 prevents MySQL from starting.</SettingDescription><SettingName>MAX_SIMULTANEOUS_CONNECTIONS</SettingName></OptionGroupOptionSetting><OptionGroupOptionSetting><AllowedValues>v,vv,vvv</AllowedValues><ApplyType>STATIC</ApplyType><DefaultValue>v</DefaultValue><IsModifiable>True</IsModifiable><SettingDescription>Verbose level for memcached.</SettingDescription><SettingName>VERBOSITY</SettingName></OptionGroupOptionSetting></OptionGroupOptionSettings><EngineName>mysql</EngineName></OptionGroupOption>\n \n </OptionGroupOptions>\n </DescribeOptionGroupOptionsResult>\n <ResponseMetadata>\n <RequestId>457f7bb8-9fbf-11e4-9084-5754f80d5144</RequestId>\n </ResponseMetadata>\n</DescribeOptionGroupOptionsResponse>', "all": '<DescribeOptionGroupOptionsResponse xmlns="http://rds.amazonaws.com/doc/2014-09-01/">\n <DescribeOptionGroupOptionsResult>\n <OptionGroupOptions>\n \n <OptionGroupOption><MajorEngineVersion>5.6</MajorEngineVersion><DefaultPort>11211</DefaultPort><PortRequired>True</PortRequired><OptionsDependedOn></OptionsDependedOn><Description>Innodb Memcached for MySQL</Description><Name>MEMCACHED</Name><OptionGroupOptionSettings><OptionGroupOptionSetting><AllowedValues>1-4294967295</AllowedValues><ApplyType>STATIC</ApplyType><DefaultValue>1</DefaultValue><IsModifiable>True</IsModifiable><SettingDescription>Specifies how many memcached read operations (get) to perform before doing a COMMIT to start a new transaction</SettingDescription><SettingName>DAEMON_MEMCACHED_R_BATCH_SIZE</SettingName></OptionGroupOptionSetting><OptionGroupOptionSetting><AllowedValues>1-4294967295</AllowedValues><ApplyType>STATIC</ApplyType><DefaultValue>1</DefaultValue><IsModifiable>True</IsModifiable><SettingDescription>Specifies how many memcached write operations, such as add, set, or incr, to perform before doing a COMMIT to start a new transaction</SettingDescription><SettingName>DAEMON_MEMCACHED_W_BATCH_SIZE</SettingName></OptionGroupOptionSetting><OptionGroupOptionSetting><AllowedValues>1-1073741824</AllowedValues><ApplyType>DYNAMIC</ApplyType><DefaultValue>5</DefaultValue><IsModifiable>True</IsModifiable><SettingDescription>Specifies how often to auto-commit idle connections that use the InnoDB memcached interface.</SettingDescription><SettingName>INNODB_API_BK_COMMIT_INTERVAL</SettingName></OptionGroupOptionSetting><OptionGroupOptionSetting><AllowedValues>0,1</AllowedValues><ApplyType>STATIC</ApplyType><DefaultValue>0</DefaultValue><IsModifiable>True</IsModifiable><SettingDescription>Disables the use of row locks when using the InnoDB memcached interface.</SettingDescription><SettingName>INNODB_API_DISABLE_ROWLOCK</SettingName></OptionGroupOptionSetting><OptionGroupOptionSetting><AllowedValues>0,1</AllowedValues><ApplyType>STATIC</ApplyType><DefaultValue>0</DefaultValue><IsModifiable>True</IsModifiable><SettingDescription>Locks the table used by the InnoDB memcached plugin, so that it cannot be dropped or altered by DDL through the SQL interface.</SettingDescription><SettingName>INNODB_API_ENABLE_MDL</SettingName></OptionGroupOptionSetting><OptionGroupOptionSetting><AllowedValues>0-3</AllowedValues><ApplyType>STATIC</ApplyType><DefaultValue>0</DefaultValue><IsModifiable>True</IsModifiable><SettingDescription>Lets you control the transaction isolation level on queries processed by the memcached interface.</SettingDescription><SettingName>INNODB_API_TRX_LEVEL</SettingName></OptionGroupOptionSetting><OptionGroupOptionSetting><AllowedValues>auto,ascii,binary</AllowedValues><ApplyType>STATIC</ApplyType><DefaultValue>auto</DefaultValue><IsModifiable>True</IsModifiable><SettingDescription>The binding protocol to use which can be either auto, ascii, or binary. The default is auto which means the server automatically negotiates the protocol with the client.</SettingDescription><SettingName>BINDING_PROTOCOL</SettingName></OptionGroupOptionSetting><OptionGroupOptionSetting><AllowedValues>1-2048</AllowedValues><ApplyType>STATIC</ApplyType><DefaultValue>1024</DefaultValue><IsModifiable>True</IsModifiable><SettingDescription>The backlog queue configures how many network connections can be waiting to be processed by memcached</SettingDescription><SettingName>BACKLOG_QUEUE_LIMIT</SettingName></OptionGroupOptionSetting><OptionGroupOptionSetting><AllowedValues>0,1</AllowedValues><ApplyType>STATIC</ApplyType><DefaultValue>0</DefaultValue><IsModifiable>True</IsModifiable><SettingDescription>Disable the use of compare and swap (CAS) which reduces the per-item size by 8 bytes.</SettingDescription><SettingName>CAS_DISABLED</SettingName></OptionGroupOptionSetting><OptionGroupOptionSetting><AllowedValues>1-48</AllowedValues><ApplyType>STATIC</ApplyType><DefaultValue>48</DefaultValue><IsModifiable>True</IsModifiable><SettingDescription>Minimum chunk size in bytes to allocate for the smallest item\'s key, value, and flags. The default is 48 and you can get a significant memory efficiency gain with a lower value.</SettingDescription><SettingName>CHUNK_SIZE</SettingName></OptionGroupOptionSetting><OptionGroupOptionSetting><AllowedValues>1-2</AllowedValues><ApplyType>STATIC</ApplyType><DefaultValue>1.25</DefaultValue><IsModifiable>True</IsModifiable><SettingDescription>Chunk size growth factor that controls the size of each successive chunk with each chunk growing times this amount larger than the previous chunk.</SettingDescription><SettingName>CHUNK_SIZE_GROWTH_FACTOR</SettingName></OptionGroupOptionSetting><OptionGroupOptionSetting><AllowedValues>0,1</AllowedValues><ApplyType>STATIC</ApplyType><DefaultValue>0</DefaultValue><IsModifiable>True</IsModifiable><SettingDescription>If enabled when there is no more memory to store items, memcached will return an error rather than evicting items.</SettingDescription><SettingName>ERROR_ON_MEMORY_EXHAUSTED</SettingName></OptionGroupOptionSetting><OptionGroupOptionSetting><AllowedValues>10-1024</AllowedValues><ApplyType>STATIC</ApplyType><DefaultValue>1024</DefaultValue><IsModifiable>True</IsModifiable><SettingDescription>Maximum number of concurrent connections. Setting this value to anything less than 10 prevents MySQL from starting.</SettingDescription><SettingName>MAX_SIMULTANEOUS_CONNECTIONS</SettingName></OptionGroupOptionSetting><OptionGroupOptionSetting><AllowedValues>v,vv,vvv</AllowedValues><ApplyType>STATIC</ApplyType><DefaultValue>v</DefaultValue><IsModifiable>True</IsModifiable><SettingDescription>Verbose level for memcached.</SettingDescription><SettingName>VERBOSITY</SettingName></OptionGroupOptionSetting></OptionGroupOptionSettings><EngineName>mysql</EngineName></OptionGroupOption>\n \n </OptionGroupOptions>\n </DescribeOptionGroupOptionsResult>\n <ResponseMetadata>\n <RequestId>457f7bb8-9fbf-11e4-9084-5754f80d5144</RequestId>\n </ResponseMetadata>\n</DescribeOptionGroupOptionsResponse>', }, "oracle-ee": { "11.2": '<DescribeOptionGroupOptionsResponse xmlns="http://rds.amazonaws.com/doc/2014-09-01/">\n <DescribeOptionGroupOptionsResult>\n <OptionGroupOptions>\n \n <OptionGroupOption><MajorEngineVersion>11.2</MajorEngineVersion><OptionsDependedOn><OptionName>XMLDB</OptionName></OptionsDependedOn><Description>Oracle Application Express Runtime Environment</Description><Name>APEX</Name><OptionGroupOptionSettings></OptionGroupOptionSettings><EngineName>oracle-ee</EngineName></OptionGroupOption>\n \n <OptionGroupOption><MajorEngineVersion>11.2</MajorEngineVersion><OptionsDependedOn><OptionName>APEX</OptionName></OptionsDependedOn><Description>Oracle Application Express Development Environment</Description><Name>APEX-DEV</Name><OptionGroupOptionSettings></OptionGroupOptionSettings><EngineName>oracle-ee</EngineName></OptionGroupOption>\n \n <OptionGroupOption><MajorEngineVersion>11.2</MajorEngineVersion><OptionsDependedOn></OptionsDependedOn><Description>Oracle Advanced Security - Native Network Encryption</Description><Name>NATIVE_NETWORK_ENCRYPTION</Name><OptionGroupOptionSettings><OptionGroupOptionSetting><AllowedValues>ACCEPTED,REJECTED,REQUESTED,REQUIRED</AllowedValues><ApplyType>STATIC</ApplyType><DefaultValue>REQUESTED</DefaultValue><IsModifiable>True</IsModifiable><SettingDescription>Specifies the desired encryption behavior</SettingDescription><SettingName>SQLNET.ENCRYPTION_SERVER</SettingName></OptionGroupOptionSetting><OptionGroupOptionSetting><AllowedValues>ACCEPTED,REJECTED,REQUESTED,REQUIRED</AllowedValues><ApplyType>STATIC</ApplyType><DefaultValue>REQUESTED</DefaultValue><IsModifiable>True</IsModifiable><SettingDescription>Specifies the desired data integrity behavior</SettingDescription><SettingName>SQLNET.CRYPTO_CHECKSUM_SERVER</SettingName></OptionGroupOptionSetting><OptionGroupOptionSetting><AllowedValues>RC4_256,AES256,AES192,3DES168,RC4_128,AES128,3DES112,RC4_56,DES,RC4_40,DES40</AllowedValues><ApplyType>STATIC</ApplyType><DefaultValue>RC4_256,AES256,AES192,3DES168,RC4_128,AES128,3DES112,RC4_56,DES,RC4_40,DES40</DefaultValue><IsModifiable>True</IsModifiable><SettingDescription>Specifies list of encryption algorithms in order of intended use</SettingDescription><SettingName>SQLNET.ENCRYPTION_TYPES_SERVER</SettingName></OptionGroupOptionSetting><OptionGroupOptionSetting><AllowedValues>SHA1,MD5</AllowedValues><ApplyType>STATIC</ApplyType><DefaultValue>SHA1,MD5</DefaultValue><IsModifiable>True</IsModifiable><SettingDescription>Specifies list of checksumming algorithms in order of intended use</SettingDescription><SettingName>SQLNET.CRYPTO_CHECKSUM_TYPES_SERVER</SettingName></OptionGroupOptionSetting></OptionGroupOptionSettings><EngineName>oracle-ee</EngineName></OptionGroupOption>\n \n <OptionGroupOption><MajorEngineVersion>11.2</MajorEngineVersion><DefaultPort>1158</DefaultPort><PortRequired>True</PortRequired><OptionsDependedOn></OptionsDependedOn><Description>Oracle Enterprise Manager (Database Control only)</Description><Name>OEM</Name><OptionGroupOptionSettings></OptionGroupOptionSettings><EngineName>oracle-ee</EngineName></OptionGroupOption>\n \n <OptionGroupOption><MajorEngineVersion>11.2</MajorEngineVersion><OptionsDependedOn></OptionsDependedOn><Description>Oracle Statspack</Description><Name>STATSPACK</Name><OptionGroupOptionSettings></OptionGroupOptionSettings><EngineName>oracle-ee</EngineName></OptionGroupOption>\n \n <OptionGroupOption><MajorEngineVersion>11.2</MajorEngineVersion><Persistent>True</Persistent><OptionsDependedOn></OptionsDependedOn><Permanent>True</Permanent><Description>Oracle Advanced Security - Transparent Data Encryption</Description><Name>TDE</Name><OptionGroupOptionSettings></OptionGroupOptionSettings><EngineName>oracle-ee</EngineName></OptionGroupOption>\n \n <OptionGroupOption><MajorEngineVersion>11.2</MajorEngineVersion><Persistent>True</Persistent><OptionsDependedOn></OptionsDependedOn><Permanent>True</Permanent><Description>Oracle Advanced Security - TDE with HSM</Description><Name>TDE_HSM</Name><OptionGroupOptionSettings></OptionGroupOptionSettings><EngineName>oracle-ee</EngineName></OptionGroupOption>\n \n <OptionGroupOption><MajorEngineVersion>11.2</MajorEngineVersion><Persistent>True</Persistent><OptionsDependedOn></OptionsDependedOn><Permanent>True</Permanent><Description>Change time zone</Description><Name>Timezone</Name><OptionGroupOptionSettings><OptionGroupOptionSetting><AllowedValues>Africa/Cairo,Africa/Casablanca,Africa/Harare,Africa/Monrovia,Africa/Nairobi,Africa/Tripoli,Africa/Windhoek,America/Araguaina,America/Asuncion,America/Bogota,America/Caracas,America/Chihuahua,America/Cuiaba,America/Denver,America/Fortaleza,America/Guatemala,America/Halifax,America/Manaus,America/Matamoros,America/Monterrey,America/Montevideo,America/Phoenix,America/Santiago,America/Tijuana,Asia/Amman,Asia/Ashgabat,Asia/Baghdad,Asia/Baku,Asia/Bangkok,Asia/Beirut,Asia/Calcutta,Asia/Damascus,Asia/Dhaka,Asia/Irkutsk,Asia/Jerusalem,Asia/Kabul,Asia/Karachi,Asia/Kathmandu,Asia/Krasnoyarsk,Asia/Magadan,Asia/Muscat,Asia/Novosibirsk,Asia/Riyadh,Asia/Seoul,Asia/Shanghai,Asia/Singapore,Asia/Taipei,Asia/Tehran,Asia/Tokyo,Asia/Ulaanbaatar,Asia/Vladivostok,Asia/Yakutsk,Asia/Yerevan,Atlantic/Azores,Australia/Adelaide,Australia/Brisbane,Australia/Darwin,Australia/Hobart,Australia/Perth,Australia/Sydney,Brazil/East,Canada/Newfoundland,Canada/Saskatchewan,Europe/Amsterdam,Europe/Athens,Europe/Dublin,Europe/Helsinki,Europe/Istanbul,Europe/Kaliningrad,Europe/Moscow,Europe/Paris,Europe/Prague,Europe/Sarajevo,Pacific/Auckland,Pacific/Fiji,Pacific/Guam,Pacific/Honolulu,Pacific/Samoa,US/Alaska,US/Central,US/Eastern,US/East-Indiana,US/Pacific,UTC</AllowedValues><ApplyType>DYNAMIC</ApplyType><DefaultValue>UTC</DefaultValue><IsModifiable>True</IsModifiable><SettingDescription>Specifies the timezone the user wants to change the system time to</SettingDescription><SettingName>TIME_ZONE</SettingName></OptionGroupOptionSetting></OptionGroupOptionSettings><EngineName>oracle-ee</EngineName></OptionGroupOption>\n \n <OptionGroupOption><MajorEngineVersion>11.2</MajorEngineVersion><OptionsDependedOn></OptionsDependedOn><Description>Oracle XMLDB Repository</Description><Name>XMLDB</Name><OptionGroupOptionSettings></OptionGroupOptionSettings><EngineName>oracle-ee</EngineName></OptionGroupOption>\n \n </OptionGroupOptions>\n </DescribeOptionGroupOptionsResult>\n <ResponseMetadata>\n <RequestId>457f7bb8-9fbf-11e4-9084-5754f80d5144</RequestId>\n </ResponseMetadata>\n</DescribeOptionGroupOptionsResponse>', "all": '<DescribeOptionGroupOptionsResponse xmlns="http://rds.amazonaws.com/doc/2014-09-01/">\n <DescribeOptionGroupOptionsResult>\n <OptionGroupOptions>\n \n <OptionGroupOption><MajorEngineVersion>11.2</MajorEngineVersion><OptionsDependedOn><OptionName>XMLDB</OptionName></OptionsDependedOn><Description>Oracle Application Express Runtime Environment</Description><Name>APEX</Name><OptionGroupOptionSettings></OptionGroupOptionSettings><EngineName>oracle-ee</EngineName></OptionGroupOption>\n \n <OptionGroupOption><MajorEngineVersion>11.2</MajorEngineVersion><OptionsDependedOn><OptionName>APEX</OptionName></OptionsDependedOn><Description>Oracle Application Express Development Environment</Description><Name>APEX-DEV</Name><OptionGroupOptionSettings></OptionGroupOptionSettings><EngineName>oracle-ee</EngineName></OptionGroupOption>\n \n <OptionGroupOption><MajorEngineVersion>11.2</MajorEngineVersion><OptionsDependedOn></OptionsDependedOn><Description>Oracle Advanced Security - Native Network Encryption</Description><Name>NATIVE_NETWORK_ENCRYPTION</Name><OptionGroupOptionSettings><OptionGroupOptionSetting><AllowedValues>ACCEPTED,REJECTED,REQUESTED,REQUIRED</AllowedValues><ApplyType>STATIC</ApplyType><DefaultValue>REQUESTED</DefaultValue><IsModifiable>True</IsModifiable><SettingDescription>Specifies the desired encryption behavior</SettingDescription><SettingName>SQLNET.ENCRYPTION_SERVER</SettingName></OptionGroupOptionSetting><OptionGroupOptionSetting><AllowedValues>ACCEPTED,REJECTED,REQUESTED,REQUIRED</AllowedValues><ApplyType>STATIC</ApplyType><DefaultValue>REQUESTED</DefaultValue><IsModifiable>True</IsModifiable><SettingDescription>Specifies the desired data integrity behavior</SettingDescription><SettingName>SQLNET.CRYPTO_CHECKSUM_SERVER</SettingName></OptionGroupOptionSetting><OptionGroupOptionSetting><AllowedValues>RC4_256,AES256,AES192,3DES168,RC4_128,AES128,3DES112,RC4_56,DES,RC4_40,DES40</AllowedValues><ApplyType>STATIC</ApplyType><DefaultValue>RC4_256,AES256,AES192,3DES168,RC4_128,AES128,3DES112,RC4_56,DES,RC4_40,DES40</DefaultValue><IsModifiable>True</IsModifiable><SettingDescription>Specifies list of encryption algorithms in order of intended use</SettingDescription><SettingName>SQLNET.ENCRYPTION_TYPES_SERVER</SettingName></OptionGroupOptionSetting><OptionGroupOptionSetting><AllowedValues>SHA1,MD5</AllowedValues><ApplyType>STATIC</ApplyType><DefaultValue>SHA1,MD5</DefaultValue><IsModifiable>True</IsModifiable><SettingDescription>Specifies list of checksumming algorithms in order of intended use</SettingDescription><SettingName>SQLNET.CRYPTO_CHECKSUM_TYPES_SERVER</SettingName></OptionGroupOptionSetting></OptionGroupOptionSettings><EngineName>oracle-ee</EngineName></OptionGroupOption>\n \n <OptionGroupOption><MajorEngineVersion>11.2</MajorEngineVersion><DefaultPort>1158</DefaultPort><PortRequired>True</PortRequired><OptionsDependedOn></OptionsDependedOn><Description>Oracle Enterprise Manager (Database Control only)</Description><Name>OEM</Name><OptionGroupOptionSettings></OptionGroupOptionSettings><EngineName>oracle-ee</EngineName></OptionGroupOption>\n \n <OptionGroupOption><MajorEngineVersion>11.2</MajorEngineVersion><OptionsDependedOn></OptionsDependedOn><Description>Oracle Statspack</Description><Name>STATSPACK</Name><OptionGroupOptionSettings></OptionGroupOptionSettings><EngineName>oracle-ee</EngineName></OptionGroupOption>\n \n <OptionGroupOption><MajorEngineVersion>11.2</MajorEngineVersion><Persistent>True</Persistent><OptionsDependedOn></OptionsDependedOn><Permanent>True</Permanent><Description>Oracle Advanced Security - Transparent Data Encryption</Description><Name>TDE</Name><OptionGroupOptionSettings></OptionGroupOptionSettings><EngineName>oracle-ee</EngineName></OptionGroupOption>\n \n <OptionGroupOption><MajorEngineVersion>11.2</MajorEngineVersion><Persistent>True</Persistent><OptionsDependedOn></OptionsDependedOn><Permanent>True</Permanent><Description>Oracle Advanced Security - TDE with HSM</Description><Name>TDE_HSM</Name><OptionGroupOptionSettings></OptionGroupOptionSettings><EngineName>oracle-ee</EngineName></OptionGroupOption>\n \n <OptionGroupOption><MajorEngineVersion>11.2</MajorEngineVersion><Persistent>True</Persistent><OptionsDependedOn></OptionsDependedOn><Permanent>True</Permanent><Description>Change time zone</Description><Name>Timezone</Name><OptionGroupOptionSettings><OptionGroupOptionSetting><AllowedValues>Africa/Cairo,Africa/Casablanca,Africa/Harare,Africa/Monrovia,Africa/Nairobi,Africa/Tripoli,Africa/Windhoek,America/Araguaina,America/Asuncion,America/Bogota,America/Caracas,America/Chihuahua,America/Cuiaba,America/Denver,America/Fortaleza,America/Guatemala,America/Halifax,America/Manaus,America/Matamoros,America/Monterrey,America/Montevideo,America/Phoenix,America/Santiago,America/Tijuana,Asia/Amman,Asia/Ashgabat,Asia/Baghdad,Asia/Baku,Asia/Bangkok,Asia/Beirut,Asia/Calcutta,Asia/Damascus,Asia/Dhaka,Asia/Irkutsk,Asia/Jerusalem,Asia/Kabul,Asia/Karachi,Asia/Kathmandu,Asia/Krasnoyarsk,Asia/Magadan,Asia/Muscat,Asia/Novosibirsk,Asia/Riyadh,Asia/Seoul,Asia/Shanghai,Asia/Singapore,Asia/Taipei,Asia/Tehran,Asia/Tokyo,Asia/Ulaanbaatar,Asia/Vladivostok,Asia/Yakutsk,Asia/Yerevan,Atlantic/Azores,Australia/Adelaide,Australia/Brisbane,Australia/Darwin,Australia/Hobart,Australia/Perth,Australia/Sydney,Brazil/East,Canada/Newfoundland,Canada/Saskatchewan,Europe/Amsterdam,Europe/Athens,Europe/Dublin,Europe/Helsinki,Europe/Istanbul,Europe/Kaliningrad,Europe/Moscow,Europe/Paris,Europe/Prague,Europe/Sarajevo,Pacific/Auckland,Pacific/Fiji,Pacific/Guam,Pacific/Honolulu,Pacific/Samoa,US/Alaska,US/Central,US/Eastern,US/East-Indiana,US/Pacific,UTC</AllowedValues><ApplyType>DYNAMIC</ApplyType><DefaultValue>UTC</DefaultValue><IsModifiable>True</IsModifiable><SettingDescription>Specifies the timezone the user wants to change the system time to</SettingDescription><SettingName>TIME_ZONE</SettingName></OptionGroupOptionSetting></OptionGroupOptionSettings><EngineName>oracle-ee</EngineName></OptionGroupOption>\n \n <OptionGroupOption><MajorEngineVersion>11.2</MajorEngineVersion><OptionsDependedOn></OptionsDependedOn><Description>Oracle XMLDB Repository</Description><Name>XMLDB</Name><OptionGroupOptionSettings></OptionGroupOptionSettings><EngineName>oracle-ee</EngineName></OptionGroupOption>\n \n </OptionGroupOptions>\n </DescribeOptionGroupOptionsResult>\n <ResponseMetadata>\n <RequestId>457f7bb8-9fbf-11e4-9084-5754f80d5144</RequestId>\n </ResponseMetadata>\n</DescribeOptionGroupOptionsResponse>', }, "oracle-sa": { "11.2": '<DescribeOptionGroupOptionsResponse xmlns="http://rds.amazonaws.com/doc/2014-09-01/">\n <DescribeOptionGroupOptionsResult>\n <OptionGroupOptions>\n \n <OptionGroupOption><MajorEngineVersion>11.2</MajorEngineVersion><OptionsDependedOn><OptionName>XMLDB</OptionName></OptionsDependedOn><Description>Oracle Application Express Runtime Environment</Description><Name>APEX</Name><OptionGroupOptionSettings></OptionGroupOptionSettings><EngineName>oracle-ee</EngineName></OptionGroupOption>\n \n <OptionGroupOption><MajorEngineVersion>11.2</MajorEngineVersion><OptionsDependedOn><OptionName>APEX</OptionName></OptionsDependedOn><Description>Oracle Application Express Development Environment</Description><Name>APEX-DEV</Name><OptionGroupOptionSettings></OptionGroupOptionSettings><EngineName>oracle-ee</EngineName></OptionGroupOption>\n \n <OptionGroupOption><MajorEngineVersion>11.2</MajorEngineVersion><OptionsDependedOn></OptionsDependedOn><Description>Oracle Advanced Security - Native Network Encryption</Description><Name>NATIVE_NETWORK_ENCRYPTION</Name><OptionGroupOptionSettings><OptionGroupOptionSetting><AllowedValues>ACCEPTED,REJECTED,REQUESTED,REQUIRED</AllowedValues><ApplyType>STATIC</ApplyType><DefaultValue>REQUESTED</DefaultValue><IsModifiable>True</IsModifiable><SettingDescription>Specifies the desired encryption behavior</SettingDescription><SettingName>SQLNET.ENCRYPTION_SERVER</SettingName></OptionGroupOptionSetting><OptionGroupOptionSetting><AllowedValues>ACCEPTED,REJECTED,REQUESTED,REQUIRED</AllowedValues><ApplyType>STATIC</ApplyType><DefaultValue>REQUESTED</DefaultValue><IsModifiable>True</IsModifiable><SettingDescription>Specifies the desired data integrity behavior</SettingDescription><SettingName>SQLNET.CRYPTO_CHECKSUM_SERVER</SettingName></OptionGroupOptionSetting><OptionGroupOptionSetting><AllowedValues>RC4_256,AES256,AES192,3DES168,RC4_128,AES128,3DES112,RC4_56,DES,RC4_40,DES40</AllowedValues><ApplyType>STATIC</ApplyType><DefaultValue>RC4_256,AES256,AES192,3DES168,RC4_128,AES128,3DES112,RC4_56,DES,RC4_40,DES40</DefaultValue><IsModifiable>True</IsModifiable><SettingDescription>Specifies list of encryption algorithms in order of intended use</SettingDescription><SettingName>SQLNET.ENCRYPTION_TYPES_SERVER</SettingName></OptionGroupOptionSetting><OptionGroupOptionSetting><AllowedValues>SHA1,MD5</AllowedValues><ApplyType>STATIC</ApplyType><DefaultValue>SHA1,MD5</DefaultValue><IsModifiable>True</IsModifiable><SettingDescription>Specifies list of checksumming algorithms in order of intended use</SettingDescription><SettingName>SQLNET.CRYPTO_CHECKSUM_TYPES_SERVER</SettingName></OptionGroupOptionSetting></OptionGroupOptionSettings><EngineName>oracle-ee</EngineName></OptionGroupOption>\n \n <OptionGroupOption><MajorEngineVersion>11.2</MajorEngineVersion><DefaultPort>1158</DefaultPort><PortRequired>True</PortRequired><OptionsDependedOn></OptionsDependedOn><Description>Oracle Enterprise Manager (Database Control only)</Description><Name>OEM</Name><OptionGroupOptionSettings></OptionGroupOptionSettings><EngineName>oracle-ee</EngineName></OptionGroupOption>\n \n <OptionGroupOption><MajorEngineVersion>11.2</MajorEngineVersion><OptionsDependedOn></OptionsDependedOn><Description>Oracle Statspack</Description><Name>STATSPACK</Name><OptionGroupOptionSettings></OptionGroupOptionSettings><EngineName>oracle-ee</EngineName></OptionGroupOption>\n \n <OptionGroupOption><MajorEngineVersion>11.2</MajorEngineVersion><Persistent>True</Persistent><OptionsDependedOn></OptionsDependedOn><Permanent>True</Permanent><Description>Oracle Advanced Security - Transparent Data Encryption</Description><Name>TDE</Name><OptionGroupOptionSettings></OptionGroupOptionSettings><EngineName>oracle-ee</EngineName></OptionGroupOption>\n \n <OptionGroupOption><MajorEngineVersion>11.2</MajorEngineVersion><Persistent>True</Persistent><OptionsDependedOn></OptionsDependedOn><Permanent>True</Permanent><Description>Oracle Advanced Security - TDE with HSM</Description><Name>TDE_HSM</Name><OptionGroupOptionSettings></OptionGroupOptionSettings><EngineName>oracle-ee</EngineName></OptionGroupOption>\n \n <OptionGroupOption><MajorEngineVersion>11.2</MajorEngineVersion><Persistent>True</Persistent><OptionsDependedOn></OptionsDependedOn><Permanent>True</Permanent><Description>Change time zone</Description><Name>Timezone</Name><OptionGroupOptionSettings><OptionGroupOptionSetting><AllowedValues>Africa/Cairo,Africa/Casablanca,Africa/Harare,Africa/Monrovia,Africa/Nairobi,Africa/Tripoli,Africa/Windhoek,America/Araguaina,America/Asuncion,America/Bogota,America/Caracas,America/Chihuahua,America/Cuiaba,America/Denver,America/Fortaleza,America/Guatemala,America/Halifax,America/Manaus,America/Matamoros,America/Monterrey,America/Montevideo,America/Phoenix,America/Santiago,America/Tijuana,Asia/Amman,Asia/Ashgabat,Asia/Baghdad,Asia/Baku,Asia/Bangkok,Asia/Beirut,Asia/Calcutta,Asia/Damascus,Asia/Dhaka,Asia/Irkutsk,Asia/Jerusalem,Asia/Kabul,Asia/Karachi,Asia/Kathmandu,Asia/Krasnoyarsk,Asia/Magadan,Asia/Muscat,Asia/Novosibirsk,Asia/Riyadh,Asia/Seoul,Asia/Shanghai,Asia/Singapore,Asia/Taipei,Asia/Tehran,Asia/Tokyo,Asia/Ulaanbaatar,Asia/Vladivostok,Asia/Yakutsk,Asia/Yerevan,Atlantic/Azores,Australia/Adelaide,Australia/Brisbane,Australia/Darwin,Australia/Hobart,Australia/Perth,Australia/Sydney,Brazil/East,Canada/Newfoundland,Canada/Saskatchewan,Europe/Amsterdam,Europe/Athens,Europe/Dublin,Europe/Helsinki,Europe/Istanbul,Europe/Kaliningrad,Europe/Moscow,Europe/Paris,Europe/Prague,Europe/Sarajevo,Pacific/Auckland,Pacific/Fiji,Pacific/Guam,Pacific/Honolulu,Pacific/Samoa,US/Alaska,US/Central,US/Eastern,US/East-Indiana,US/Pacific,UTC</AllowedValues><ApplyType>DYNAMIC</ApplyType><DefaultValue>UTC</DefaultValue><IsModifiable>True</IsModifiable><SettingDescription>Specifies the timezone the user wants to change the system time to</SettingDescription><SettingName>TIME_ZONE</SettingName></OptionGroupOptionSetting></OptionGroupOptionSettings><EngineName>oracle-ee</EngineName></OptionGroupOption>\n \n <OptionGroupOption><MajorEngineVersion>11.2</MajorEngineVersion><OptionsDependedOn></OptionsDependedOn><Description>Oracle XMLDB Repository</Description><Name>XMLDB</Name><OptionGroupOptionSettings></OptionGroupOptionSettings><EngineName>oracle-ee</EngineName></OptionGroupOption>\n \n </OptionGroupOptions>\n </DescribeOptionGroupOptionsResult>\n <ResponseMetadata>\n <RequestId>457f7bb8-9fbf-11e4-9084-5754f80d5144</RequestId>\n </ResponseMetadata>\n</DescribeOptionGroupOptionsResponse>', "all": '<DescribeOptionGroupOptionsResponse xmlns="http://rds.amazonaws.com/doc/2014-09-01/">\n <DescribeOptionGroupOptionsResult>\n <OptionGroupOptions>\n \n <OptionGroupOption><MajorEngineVersion>11.2</MajorEngineVersion><OptionsDependedOn><OptionName>XMLDB</OptionName></OptionsDependedOn><Description>Oracle Application Express Runtime Environment</Description><Name>APEX</Name><OptionGroupOptionSettings></OptionGroupOptionSettings><EngineName>oracle-ee</EngineName></OptionGroupOption>\n \n <OptionGroupOption><MajorEngineVersion>11.2</MajorEngineVersion><OptionsDependedOn><OptionName>APEX</OptionName></OptionsDependedOn><Description>Oracle Application Express Development Environment</Description><Name>APEX-DEV</Name><OptionGroupOptionSettings></OptionGroupOptionSettings><EngineName>oracle-ee</EngineName></OptionGroupOption>\n \n <OptionGroupOption><MajorEngineVersion>11.2</MajorEngineVersion><OptionsDependedOn></OptionsDependedOn><Description>Oracle Advanced Security - Native Network Encryption</Description><Name>NATIVE_NETWORK_ENCRYPTION</Name><OptionGroupOptionSettings><OptionGroupOptionSetting><AllowedValues>ACCEPTED,REJECTED,REQUESTED,REQUIRED</AllowedValues><ApplyType>STATIC</ApplyType><DefaultValue>REQUESTED</DefaultValue><IsModifiable>True</IsModifiable><SettingDescription>Specifies the desired encryption behavior</SettingDescription><SettingName>SQLNET.ENCRYPTION_SERVER</SettingName></OptionGroupOptionSetting><OptionGroupOptionSetting><AllowedValues>ACCEPTED,REJECTED,REQUESTED,REQUIRED</AllowedValues><ApplyType>STATIC</ApplyType><DefaultValue>REQUESTED</DefaultValue><IsModifiable>True</IsModifiable><SettingDescription>Specifies the desired data integrity behavior</SettingDescription><SettingName>SQLNET.CRYPTO_CHECKSUM_SERVER</SettingName></OptionGroupOptionSetting><OptionGroupOptionSetting><AllowedValues>RC4_256,AES256,AES192,3DES168,RC4_128,AES128,3DES112,RC4_56,DES,RC4_40,DES40</AllowedValues><ApplyType>STATIC</ApplyType><DefaultValue>RC4_256,AES256,AES192,3DES168,RC4_128,AES128,3DES112,RC4_56,DES,RC4_40,DES40</DefaultValue><IsModifiable>True</IsModifiable><SettingDescription>Specifies list of encryption algorithms in order of intended use</SettingDescription><SettingName>SQLNET.ENCRYPTION_TYPES_SERVER</SettingName></OptionGroupOptionSetting><OptionGroupOptionSetting><AllowedValues>SHA1,MD5</AllowedValues><ApplyType>STATIC</ApplyType><DefaultValue>SHA1,MD5</DefaultValue><IsModifiable>True</IsModifiable><SettingDescription>Specifies list of checksumming algorithms in order of intended use</SettingDescription><SettingName>SQLNET.CRYPTO_CHECKSUM_TYPES_SERVER</SettingName></OptionGroupOptionSetting></OptionGroupOptionSettings><EngineName>oracle-ee</EngineName></OptionGroupOption>\n \n <OptionGroupOption><MajorEngineVersion>11.2</MajorEngineVersion><DefaultPort>1158</DefaultPort><PortRequired>True</PortRequired><OptionsDependedOn></OptionsDependedOn><Description>Oracle Enterprise Manager (Database Control only)</Description><Name>OEM</Name><OptionGroupOptionSettings></OptionGroupOptionSettings><EngineName>oracle-ee</EngineName></OptionGroupOption>\n \n <OptionGroupOption><MajorEngineVersion>11.2</MajorEngineVersion><OptionsDependedOn></OptionsDependedOn><Description>Oracle Statspack</Description><Name>STATSPACK</Name><OptionGroupOptionSettings></OptionGroupOptionSettings><EngineName>oracle-ee</EngineName></OptionGroupOption>\n \n <OptionGroupOption><MajorEngineVersion>11.2</MajorEngineVersion><Persistent>True</Persistent><OptionsDependedOn></OptionsDependedOn><Permanent>True</Permanent><Description>Oracle Advanced Security - Transparent Data Encryption</Description><Name>TDE</Name><OptionGroupOptionSettings></OptionGroupOptionSettings><EngineName>oracle-ee</EngineName></OptionGroupOption>\n \n <OptionGroupOption><MajorEngineVersion>11.2</MajorEngineVersion><Persistent>True</Persistent><OptionsDependedOn></OptionsDependedOn><Permanent>True</Permanent><Description>Oracle Advanced Security - TDE with HSM</Description><Name>TDE_HSM</Name><OptionGroupOptionSettings></OptionGroupOptionSettings><EngineName>oracle-ee</EngineName></OptionGroupOption>\n \n <OptionGroupOption><MajorEngineVersion>11.2</MajorEngineVersion><Persistent>True</Persistent><OptionsDependedOn></OptionsDependedOn><Permanent>True</Permanent><Description>Change time zone</Description><Name>Timezone</Name><OptionGroupOptionSettings><OptionGroupOptionSetting><AllowedValues>Africa/Cairo,Africa/Casablanca,Africa/Harare,Africa/Monrovia,Africa/Nairobi,Africa/Tripoli,Africa/Windhoek,America/Araguaina,America/Asuncion,America/Bogota,America/Caracas,America/Chihuahua,America/Cuiaba,America/Denver,America/Fortaleza,America/Guatemala,America/Halifax,America/Manaus,America/Matamoros,America/Monterrey,America/Montevideo,America/Phoenix,America/Santiago,America/Tijuana,Asia/Amman,Asia/Ashgabat,Asia/Baghdad,Asia/Baku,Asia/Bangkok,Asia/Beirut,Asia/Calcutta,Asia/Damascus,Asia/Dhaka,Asia/Irkutsk,Asia/Jerusalem,Asia/Kabul,Asia/Karachi,Asia/Kathmandu,Asia/Krasnoyarsk,Asia/Magadan,Asia/Muscat,Asia/Novosibirsk,Asia/Riyadh,Asia/Seoul,Asia/Shanghai,Asia/Singapore,Asia/Taipei,Asia/Tehran,Asia/Tokyo,Asia/Ulaanbaatar,Asia/Vladivostok,Asia/Yakutsk,Asia/Yerevan,Atlantic/Azores,Australia/Adelaide,Australia/Brisbane,Australia/Darwin,Australia/Hobart,Australia/Perth,Australia/Sydney,Brazil/East,Canada/Newfoundland,Canada/Saskatchewan,Europe/Amsterdam,Europe/Athens,Europe/Dublin,Europe/Helsinki,Europe/Istanbul,Europe/Kaliningrad,Europe/Moscow,Europe/Paris,Europe/Prague,Europe/Sarajevo,Pacific/Auckland,Pacific/Fiji,Pacific/Guam,Pacific/Honolulu,Pacific/Samoa,US/Alaska,US/Central,US/Eastern,US/East-Indiana,US/Pacific,UTC</AllowedValues><ApplyType>DYNAMIC</ApplyType><DefaultValue>UTC</DefaultValue><IsModifiable>True</IsModifiable><SettingDescription>Specifies the timezone the user wants to change the system time to</SettingDescription><SettingName>TIME_ZONE</SettingName></OptionGroupOptionSetting></OptionGroupOptionSettings><EngineName>oracle-ee</EngineName></OptionGroupOption>\n \n <OptionGroupOption><MajorEngineVersion>11.2</MajorEngineVersion><OptionsDependedOn></OptionsDependedOn><Description>Oracle XMLDB Repository</Description><Name>XMLDB</Name><OptionGroupOptionSettings></OptionGroupOptionSettings><EngineName>oracle-ee</EngineName></OptionGroupOption>\n \n </OptionGroupOptions>\n </DescribeOptionGroupOptionsResult>\n <ResponseMetadata>\n <RequestId>457f7bb8-9fbf-11e4-9084-5754f80d5144</RequestId>\n </ResponseMetadata>\n</DescribeOptionGroupOptionsResponse>', }, "oracle-sa1": { "11.2": '<DescribeOptionGroupOptionsResponse xmlns="http://rds.amazonaws.com/doc/2014-09-01/">\n <DescribeOptionGroupOptionsResult>\n <OptionGroupOptions>\n \n <OptionGroupOption><MajorEngineVersion>11.2</MajorEngineVersion><OptionsDependedOn><OptionName>XMLDB</OptionName></OptionsDependedOn><Description>Oracle Application Express Runtime Environment</Description><Name>APEX</Name><OptionGroupOptionSettings></OptionGroupOptionSettings><EngineName>oracle-ee</EngineName></OptionGroupOption>\n \n <OptionGroupOption><MajorEngineVersion>11.2</MajorEngineVersion><OptionsDependedOn><OptionName>APEX</OptionName></OptionsDependedOn><Description>Oracle Application Express Development Environment</Description><Name>APEX-DEV</Name><OptionGroupOptionSettings></OptionGroupOptionSettings><EngineName>oracle-ee</EngineName></OptionGroupOption>\n \n <OptionGroupOption><MajorEngineVersion>11.2</MajorEngineVersion><OptionsDependedOn></OptionsDependedOn><Description>Oracle Advanced Security - Native Network Encryption</Description><Name>NATIVE_NETWORK_ENCRYPTION</Name><OptionGroupOptionSettings><OptionGroupOptionSetting><AllowedValues>ACCEPTED,REJECTED,REQUESTED,REQUIRED</AllowedValues><ApplyType>STATIC</ApplyType><DefaultValue>REQUESTED</DefaultValue><IsModifiable>True</IsModifiable><SettingDescription>Specifies the desired encryption behavior</SettingDescription><SettingName>SQLNET.ENCRYPTION_SERVER</SettingName></OptionGroupOptionSetting><OptionGroupOptionSetting><AllowedValues>ACCEPTED,REJECTED,REQUESTED,REQUIRED</AllowedValues><ApplyType>STATIC</ApplyType><DefaultValue>REQUESTED</DefaultValue><IsModifiable>True</IsModifiable><SettingDescription>Specifies the desired data integrity behavior</SettingDescription><SettingName>SQLNET.CRYPTO_CHECKSUM_SERVER</SettingName></OptionGroupOptionSetting><OptionGroupOptionSetting><AllowedValues>RC4_256,AES256,AES192,3DES168,RC4_128,AES128,3DES112,RC4_56,DES,RC4_40,DES40</AllowedValues><ApplyType>STATIC</ApplyType><DefaultValue>RC4_256,AES256,AES192,3DES168,RC4_128,AES128,3DES112,RC4_56,DES,RC4_40,DES40</DefaultValue><IsModifiable>True</IsModifiable><SettingDescription>Specifies list of encryption algorithms in order of intended use</SettingDescription><SettingName>SQLNET.ENCRYPTION_TYPES_SERVER</SettingName></OptionGroupOptionSetting><OptionGroupOptionSetting><AllowedValues>SHA1,MD5</AllowedValues><ApplyType>STATIC</ApplyType><DefaultValue>SHA1,MD5</DefaultValue><IsModifiable>True</IsModifiable><SettingDescription>Specifies list of checksumming algorithms in order of intended use</SettingDescription><SettingName>SQLNET.CRYPTO_CHECKSUM_TYPES_SERVER</SettingName></OptionGroupOptionSetting></OptionGroupOptionSettings><EngineName>oracle-ee</EngineName></OptionGroupOption>\n \n <OptionGroupOption><MajorEngineVersion>11.2</MajorEngineVersion><DefaultPort>1158</DefaultPort><PortRequired>True</PortRequired><OptionsDependedOn></OptionsDependedOn><Description>Oracle Enterprise Manager (Database Control only)</Description><Name>OEM</Name><OptionGroupOptionSettings></OptionGroupOptionSettings><EngineName>oracle-ee</EngineName></OptionGroupOption>\n \n <OptionGroupOption><MajorEngineVersion>11.2</MajorEngineVersion><OptionsDependedOn></OptionsDependedOn><Description>Oracle Statspack</Description><Name>STATSPACK</Name><OptionGroupOptionSettings></OptionGroupOptionSettings><EngineName>oracle-ee</EngineName></OptionGroupOption>\n \n <OptionGroupOption><MajorEngineVersion>11.2</MajorEngineVersion><Persistent>True</Persistent><OptionsDependedOn></OptionsDependedOn><Permanent>True</Permanent><Description>Oracle Advanced Security - Transparent Data Encryption</Description><Name>TDE</Name><OptionGroupOptionSettings></OptionGroupOptionSettings><EngineName>oracle-ee</EngineName></OptionGroupOption>\n \n <OptionGroupOption><MajorEngineVersion>11.2</MajorEngineVersion><Persistent>True</Persistent><OptionsDependedOn></OptionsDependedOn><Permanent>True</Permanent><Description>Oracle Advanced Security - TDE with HSM</Description><Name>TDE_HSM</Name><OptionGroupOptionSettings></OptionGroupOptionSettings><EngineName>oracle-ee</EngineName></OptionGroupOption>\n \n <OptionGroupOption><MajorEngineVersion>11.2</MajorEngineVersion><Persistent>True</Persistent><OptionsDependedOn></OptionsDependedOn><Permanent>True</Permanent><Description>Change time zone</Description><Name>Timezone</Name><OptionGroupOptionSettings><OptionGroupOptionSetting><AllowedValues>Africa/Cairo,Africa/Casablanca,Africa/Harare,Africa/Monrovia,Africa/Nairobi,Africa/Tripoli,Africa/Windhoek,America/Araguaina,America/Asuncion,America/Bogota,America/Caracas,America/Chihuahua,America/Cuiaba,America/Denver,America/Fortaleza,America/Guatemala,America/Halifax,America/Manaus,America/Matamoros,America/Monterrey,America/Montevideo,America/Phoenix,America/Santiago,America/Tijuana,Asia/Amman,Asia/Ashgabat,Asia/Baghdad,Asia/Baku,Asia/Bangkok,Asia/Beirut,Asia/Calcutta,Asia/Damascus,Asia/Dhaka,Asia/Irkutsk,Asia/Jerusalem,Asia/Kabul,Asia/Karachi,Asia/Kathmandu,Asia/Krasnoyarsk,Asia/Magadan,Asia/Muscat,Asia/Novosibirsk,Asia/Riyadh,Asia/Seoul,Asia/Shanghai,Asia/Singapore,Asia/Taipei,Asia/Tehran,Asia/Tokyo,Asia/Ulaanbaatar,Asia/Vladivostok,Asia/Yakutsk,Asia/Yerevan,Atlantic/Azores,Australia/Adelaide,Australia/Brisbane,Australia/Darwin,Australia/Hobart,Australia/Perth,Australia/Sydney,Brazil/East,Canada/Newfoundland,Canada/Saskatchewan,Europe/Amsterdam,Europe/Athens,Europe/Dublin,Europe/Helsinki,Europe/Istanbul,Europe/Kaliningrad,Europe/Moscow,Europe/Paris,Europe/Prague,Europe/Sarajevo,Pacific/Auckland,Pacific/Fiji,Pacific/Guam,Pacific/Honolulu,Pacific/Samoa,US/Alaska,US/Central,US/Eastern,US/East-Indiana,US/Pacific,UTC</AllowedValues><ApplyType>DYNAMIC</ApplyType><DefaultValue>UTC</DefaultValue><IsModifiable>True</IsModifiable><SettingDescription>Specifies the timezone the user wants to change the system time to</SettingDescription><SettingName>TIME_ZONE</SettingName></OptionGroupOptionSetting></OptionGroupOptionSettings><EngineName>oracle-ee</EngineName></OptionGroupOption>\n \n <OptionGroupOption><MajorEngineVersion>11.2</MajorEngineVersion><OptionsDependedOn></OptionsDependedOn><Description>Oracle XMLDB Repository</Description><Name>XMLDB</Name><OptionGroupOptionSettings></OptionGroupOptionSettings><EngineName>oracle-ee</EngineName></OptionGroupOption>\n \n </OptionGroupOptions>\n </DescribeOptionGroupOptionsResult>\n <ResponseMetadata>\n <RequestId>457f7bb8-9fbf-11e4-9084-5754f80d5144</RequestId>\n </ResponseMetadata>\n</DescribeOptionGroupOptionsResponse>', "all": '<DescribeOptionGroupOptionsResponse xmlns="http://rds.amazonaws.com/doc/2014-09-01/">\n <DescribeOptionGroupOptionsResult>\n <OptionGroupOptions>\n \n <OptionGroupOption><MajorEngineVersion>11.2</MajorEngineVersion><OptionsDependedOn><OptionName>XMLDB</OptionName></OptionsDependedOn><Description>Oracle Application Express Runtime Environment</Description><Name>APEX</Name><OptionGroupOptionSettings></OptionGroupOptionSettings><EngineName>oracle-ee</EngineName></OptionGroupOption>\n \n <OptionGroupOption><MajorEngineVersion>11.2</MajorEngineVersion><OptionsDependedOn><OptionName>APEX</OptionName></OptionsDependedOn><Description>Oracle Application Express Development Environment</Description><Name>APEX-DEV</Name><OptionGroupOptionSettings></OptionGroupOptionSettings><EngineName>oracle-ee</EngineName></OptionGroupOption>\n \n <OptionGroupOption><MajorEngineVersion>11.2</MajorEngineVersion><OptionsDependedOn></OptionsDependedOn><Description>Oracle Advanced Security - Native Network Encryption</Description><Name>NATIVE_NETWORK_ENCRYPTION</Name><OptionGroupOptionSettings><OptionGroupOptionSetting><AllowedValues>ACCEPTED,REJECTED,REQUESTED,REQUIRED</AllowedValues><ApplyType>STATIC</ApplyType><DefaultValue>REQUESTED</DefaultValue><IsModifiable>True</IsModifiable><SettingDescription>Specifies the desired encryption behavior</SettingDescription><SettingName>SQLNET.ENCRYPTION_SERVER</SettingName></OptionGroupOptionSetting><OptionGroupOptionSetting><AllowedValues>ACCEPTED,REJECTED,REQUESTED,REQUIRED</AllowedValues><ApplyType>STATIC</ApplyType><DefaultValue>REQUESTED</DefaultValue><IsModifiable>True</IsModifiable><SettingDescription>Specifies the desired data integrity behavior</SettingDescription><SettingName>SQLNET.CRYPTO_CHECKSUM_SERVER</SettingName></OptionGroupOptionSetting><OptionGroupOptionSetting><AllowedValues>RC4_256,AES256,AES192,3DES168,RC4_128,AES128,3DES112,RC4_56,DES,RC4_40,DES40</AllowedValues><ApplyType>STATIC</ApplyType><DefaultValue>RC4_256,AES256,AES192,3DES168,RC4_128,AES128,3DES112,RC4_56,DES,RC4_40,DES40</DefaultValue><IsModifiable>True</IsModifiable><SettingDescription>Specifies list of encryption algorithms in order of intended use</SettingDescription><SettingName>SQLNET.ENCRYPTION_TYPES_SERVER</SettingName></OptionGroupOptionSetting><OptionGroupOptionSetting><AllowedValues>SHA1,MD5</AllowedValues><ApplyType>STATIC</ApplyType><DefaultValue>SHA1,MD5</DefaultValue><IsModifiable>True</IsModifiable><SettingDescription>Specifies list of checksumming algorithms in order of intended use</SettingDescription><SettingName>SQLNET.CRYPTO_CHECKSUM_TYPES_SERVER</SettingName></OptionGroupOptionSetting></OptionGroupOptionSettings><EngineName>oracle-ee</EngineName></OptionGroupOption>\n \n <OptionGroupOption><MajorEngineVersion>11.2</MajorEngineVersion><DefaultPort>1158</DefaultPort><PortRequired>True</PortRequired><OptionsDependedOn></OptionsDependedOn><Description>Oracle Enterprise Manager (Database Control only)</Description><Name>OEM</Name><OptionGroupOptionSettings></OptionGroupOptionSettings><EngineName>oracle-ee</EngineName></OptionGroupOption>\n \n <OptionGroupOption><MajorEngineVersion>11.2</MajorEngineVersion><OptionsDependedOn></OptionsDependedOn><Description>Oracle Statspack</Description><Name>STATSPACK</Name><OptionGroupOptionSettings></OptionGroupOptionSettings><EngineName>oracle-ee</EngineName></OptionGroupOption>\n \n <OptionGroupOption><MajorEngineVersion>11.2</MajorEngineVersion><Persistent>True</Persistent><OptionsDependedOn></OptionsDependedOn><Permanent>True</Permanent><Description>Oracle Advanced Security - Transparent Data Encryption</Description><Name>TDE</Name><OptionGroupOptionSettings></OptionGroupOptionSettings><EngineName>oracle-ee</EngineName></OptionGroupOption>\n \n <OptionGroupOption><MajorEngineVersion>11.2</MajorEngineVersion><Persistent>True</Persistent><OptionsDependedOn></OptionsDependedOn><Permanent>True</Permanent><Description>Oracle Advanced Security - TDE with HSM</Description><Name>TDE_HSM</Name><OptionGroupOptionSettings></OptionGroupOptionSettings><EngineName>oracle-ee</EngineName></OptionGroupOption>\n \n <OptionGroupOption><MajorEngineVersion>11.2</MajorEngineVersion><Persistent>True</Persistent><OptionsDependedOn></OptionsDependedOn><Permanent>True</Permanent><Description>Change time zone</Description><Name>Timezone</Name><OptionGroupOptionSettings><OptionGroupOptionSetting><AllowedValues>Africa/Cairo,Africa/Casablanca,Africa/Harare,Africa/Monrovia,Africa/Nairobi,Africa/Tripoli,Africa/Windhoek,America/Araguaina,America/Asuncion,America/Bogota,America/Caracas,America/Chihuahua,America/Cuiaba,America/Denver,America/Fortaleza,America/Guatemala,America/Halifax,America/Manaus,America/Matamoros,America/Monterrey,America/Montevideo,America/Phoenix,America/Santiago,America/Tijuana,Asia/Amman,Asia/Ashgabat,Asia/Baghdad,Asia/Baku,Asia/Bangkok,Asia/Beirut,Asia/Calcutta,Asia/Damascus,Asia/Dhaka,Asia/Irkutsk,Asia/Jerusalem,Asia/Kabul,Asia/Karachi,Asia/Kathmandu,Asia/Krasnoyarsk,Asia/Magadan,Asia/Muscat,Asia/Novosibirsk,Asia/Riyadh,Asia/Seoul,Asia/Shanghai,Asia/Singapore,Asia/Taipei,Asia/Tehran,Asia/Tokyo,Asia/Ulaanbaatar,Asia/Vladivostok,Asia/Yakutsk,Asia/Yerevan,Atlantic/Azores,Australia/Adelaide,Australia/Brisbane,Australia/Darwin,Australia/Hobart,Australia/Perth,Australia/Sydney,Brazil/East,Canada/Newfoundland,Canada/Saskatchewan,Europe/Amsterdam,Europe/Athens,Europe/Dublin,Europe/Helsinki,Europe/Istanbul,Europe/Kaliningrad,Europe/Moscow,Europe/Paris,Europe/Prague,Europe/Sarajevo,Pacific/Auckland,Pacific/Fiji,Pacific/Guam,Pacific/Honolulu,Pacific/Samoa,US/Alaska,US/Central,US/Eastern,US/East-Indiana,US/Pacific,UTC</AllowedValues><ApplyType>DYNAMIC</ApplyType><DefaultValue>UTC</DefaultValue><IsModifiable>True</IsModifiable><SettingDescription>Specifies the timezone the user wants to change the system time to</SettingDescription><SettingName>TIME_ZONE</SettingName></OptionGroupOptionSetting></OptionGroupOptionSettings><EngineName>oracle-ee</EngineName></OptionGroupOption>\n \n <OptionGroupOption><MajorEngineVersion>11.2</MajorEngineVersion><OptionsDependedOn></OptionsDependedOn><Description>Oracle XMLDB Repository</Description><Name>XMLDB</Name><OptionGroupOptionSettings></OptionGroupOptionSettings><EngineName>oracle-ee</EngineName></OptionGroupOption>\n \n </OptionGroupOptions>\n </DescribeOptionGroupOptionsResult>\n <ResponseMetadata>\n <RequestId>457f7bb8-9fbf-11e4-9084-5754f80d5144</RequestId>\n </ResponseMetadata>\n</DescribeOptionGroupOptionsResponse>', }, "sqlserver-ee": { "10.50": '<DescribeOptionGroupOptionsResponse xmlns="http://rds.amazonaws.com/doc/2014-09-01/">\n <DescribeOptionGroupOptionsResult>\n <OptionGroupOptions>\n \n <OptionGroupOption><MajorEngineVersion>10.50</MajorEngineVersion><OptionsDependedOn></OptionsDependedOn><Description>SQLServer Database Mirroring</Description><Name>Mirroring</Name><OptionGroupOptionSettings></OptionGroupOptionSettings><EngineName>sqlserver-ee</EngineName></OptionGroupOption>\n \n <OptionGroupOption><MajorEngineVersion>10.50</MajorEngineVersion><Persistent>True</Persistent><OptionsDependedOn></OptionsDependedOn><Description>SQL Server - Transparent Data Encryption</Description><Name>TDE</Name><OptionGroupOptionSettings></OptionGroupOptionSettings><EngineName>sqlserver-ee</EngineName></OptionGroupOption>\n \n </OptionGroupOptions>\n </DescribeOptionGroupOptionsResult>\n <ResponseMetadata>\n <RequestId>457f7bb8-9fbf-11e4-9084-5754f80d5144</RequestId>\n </ResponseMetadata>\n</DescribeOptionGroupOptionsResponse>', "11.00": '<DescribeOptionGroupOptionsResponse xmlns="http://rds.amazonaws.com/doc/2014-09-01/">\n <DescribeOptionGroupOptionsResult>\n <OptionGroupOptions>\n \n <OptionGroupOption><MajorEngineVersion>11.00</MajorEngineVersion><OptionsDependedOn></OptionsDependedOn><Description>SQLServer Database Mirroring</Description><Name>Mirroring</Name><OptionGroupOptionSettings></OptionGroupOptionSettings><EngineName>sqlserver-ee</EngineName></OptionGroupOption>\n \n <OptionGroupOption><MajorEngineVersion>11.00</MajorEngineVersion><Persistent>True</Persistent><OptionsDependedOn></OptionsDependedOn><Description>SQL Server - Transparent Data Encryption</Description><Name>TDE</Name><OptionGroupOptionSettings></OptionGroupOptionSettings><EngineName>sqlserver-ee</EngineName></OptionGroupOption>\n \n </OptionGroupOptions>\n </DescribeOptionGroupOptionsResult>\n <ResponseMetadata>\n <RequestId>457f7bb8-9fbf-11e4-9084-5754f80d5144</RequestId>\n </ResponseMetadata>\n</DescribeOptionGroupOptionsResponse>', "all": '<DescribeOptionGroupOptionsResponse xmlns="http://rds.amazonaws.com/doc/2014-09-01/">\n <DescribeOptionGroupOptionsResult>\n <OptionGroupOptions>\n \n <OptionGroupOption><MajorEngineVersion>10.50</MajorEngineVersion><OptionsDependedOn></OptionsDependedOn><Description>SQLServer Database Mirroring</Description><Name>Mirroring</Name><OptionGroupOptionSettings></OptionGroupOptionSettings><EngineName>sqlserver-ee</EngineName></OptionGroupOption>\n \n <OptionGroupOption><MajorEngineVersion>10.50</MajorEngineVersion><Persistent>True</Persistent><OptionsDependedOn></OptionsDependedOn><Description>SQL Server - Transparent Data Encryption</Description><Name>TDE</Name><OptionGroupOptionSettings></OptionGroupOptionSettings><EngineName>sqlserver-ee</EngineName></OptionGroupOption>\n \n <OptionGroupOption><MajorEngineVersion>11.00</MajorEngineVersion><OptionsDependedOn></OptionsDependedOn><Description>SQLServer Database Mirroring</Description><Name>Mirroring</Name><OptionGroupOptionSettings></OptionGroupOptionSettings><EngineName>sqlserver-ee</EngineName></OptionGroupOption>\n \n <OptionGroupOption><MajorEngineVersion>11.00</MajorEngineVersion><Persistent>True</Persistent><OptionsDependedOn></OptionsDependedOn><Description>SQL Server - Transparent Data Encryption</Description><Name>TDE</Name><OptionGroupOptionSettings></OptionGroupOptionSettings><EngineName>sqlserver-ee</EngineName></OptionGroupOption>\n \n </OptionGroupOptions>\n </DescribeOptionGroupOptionsResult>\n <ResponseMetadata>\n <RequestId>457f7bb8-9fbf-11e4-9084-5754f80d5144</RequestId>\n </ResponseMetadata>\n</DescribeOptionGroupOptionsResponse>', }, } if engine_name not in default_option_group_options: raise RDSClientError( "InvalidParameterValue", f"Invalid DB engine: {engine_name}" ) if ( major_engine_version and major_engine_version not in default_option_group_options[engine_name] ): raise RDSClientError( "InvalidParameterCombination", f"Cannot find major version {major_engine_version} for {engine_name}", ) if major_engine_version: return default_option_group_options[engine_name][major_engine_version] return default_option_group_options[engine_name]["all"] def modify_option_group( self, option_group_name: str, options_to_include: Optional[List[Dict[str, Any]]] = None, options_to_remove: Optional[List[str]] = None, ) -> OptionGroup: if option_group_name not in self.option_groups: raise OptionGroupNotFoundFaultError(option_group_name) if not options_to_include and not options_to_remove: raise RDSClientError( "InvalidParameterValue", "At least one option must be added, modified, or removed.", ) if options_to_remove: self.option_groups[option_group_name].remove_options(options_to_remove) if options_to_include: self.option_groups[option_group_name].add_options(options_to_include) return self.option_groups[option_group_name] def create_db_parameter_group( self, db_parameter_group_kwargs: Dict[str, Any] ) -> DBParameterGroup: db_parameter_group_id = db_parameter_group_kwargs["db_parameter_group_name"] if db_parameter_group_id in self.db_parameter_groups: raise RDSClientError( "DBParameterGroupAlreadyExistsFault", f"A DB parameter group named {db_parameter_group_id} already exists.", ) if not db_parameter_group_kwargs.get("description"): raise RDSClientError( "InvalidParameterValue", "The parameter Description must be provided and must not be blank.", ) if not db_parameter_group_kwargs.get("db_parameter_group_family"): raise RDSClientError( "InvalidParameterValue", "The parameter DBParameterGroupFamily must be provided and must not be blank.", ) db_parameter_group = DBParameterGroup(self, **db_parameter_group_kwargs) self.db_parameter_groups[db_parameter_group_id] = db_parameter_group return db_parameter_group def copy_db_parameter_group( self, source_db_parameter_group_identifier: str, target_db_parameter_group_identifier: str, target_db_parameter_group_description: str, tags: Optional[List[Dict[str, str]]] = None, ) -> DBParameterGroup: if source_db_parameter_group_identifier.startswith("arn:aws:rds:"): source_db_parameter_group_identifier = ( source_db_parameter_group_identifier.split(":")[-1] ) if source_db_parameter_group_identifier not in self.db_parameter_groups: raise DBParameterGroupNotFoundError(source_db_parameter_group_identifier) if target_db_parameter_group_identifier in self.db_parameter_groups: raise DBParameterGroupAlreadyExistsError( target_db_parameter_group_identifier ) source_db_parameter_group = self.db_parameter_groups[ source_db_parameter_group_identifier ] target_db_parameter_group = DBParameterGroup( backend=self, db_parameter_group_name=target_db_parameter_group_identifier, db_parameter_group_family=source_db_parameter_group.family, description=target_db_parameter_group_description, tags=tags, ) self.db_parameter_groups[target_db_parameter_group_identifier] = ( target_db_parameter_group ) iterable_source_parameters = [ {"ParameterName": name, **values} for name, values in source_db_parameter_group.parameters.items() ] self.modify_db_parameter_group( db_parameter_group_name=target_db_parameter_group_identifier, db_parameter_group_parameters=iterable_source_parameters, ) return target_db_parameter_group def describe_db_parameter_groups( self, db_parameter_group_kwargs: Dict[str, Any] ) -> List[DBParameterGroup]: db_parameter_group_list = [] for db_parameter_group in self.db_parameter_groups.values(): if not db_parameter_group_kwargs.get( "db_parameter_group_name" ) or db_parameter_group.name == db_parameter_group_kwargs.get( "db_parameter_group_name" ): db_parameter_group_list.append(db_parameter_group) else: continue return db_parameter_group_list def modify_db_parameter_group( self, db_parameter_group_name: str, db_parameter_group_parameters: Iterable[Dict[str, Any]], ) -> DBParameterGroup: if db_parameter_group_name not in self.db_parameter_groups: raise DBParameterGroupNotFoundError(db_parameter_group_name) db_parameter_group = self.db_parameter_groups[db_parameter_group_name] db_parameter_group.update_parameters(db_parameter_group_parameters) return db_parameter_group def describe_db_cluster_parameters(self) -> List[Dict[str, Any]]: return [] def create_db_cluster(self, kwargs: Dict[str, Any]) -> DBCluster: cluster_id = kwargs["db_cluster_identifier"] cluster = DBCluster(self, **kwargs) self.clusters[cluster_id] = cluster cluster.save_automated_backup() if cluster.global_cluster_identifier: for regional_backend in rds_backends[self.account_id]: if ( cluster.global_cluster_identifier in rds_backends[self.account_id][regional_backend].global_clusters ): global_cluster = rds_backends[self.account_id][ regional_backend ].global_clusters[cluster.global_cluster_identifier] global_cluster.members.append(cluster) if len(global_cluster.members) == 1: # primary cluster setattr(cluster, "is_writer", True) else: # secondary cluster(s) setattr(cluster, "is_writer", False) if cluster.replication_source_identifier: cluster_identifier = cluster.replication_source_identifier original_cluster = find_cluster(cluster_identifier) original_cluster.read_replica_identifiers.append(cluster.db_cluster_arn) return cluster def modify_db_cluster(self, kwargs: Dict[str, Any]) -> DBCluster: cluster_id = kwargs["db_cluster_identifier"] cluster = self.clusters[cluster_id] del self.clusters[cluster_id] if kwargs.get("rotate_master_user_password") and kwargs.get( "apply_immediately" ): cluster.master_user_secret.rotate_secret() elif kwargs.get("rotate_master_user_password") and not kwargs.get( "apply_immediately" ): raise RDSClientError( "InvalidParameterCombination", "You must specify apply immediately when rotating the master user password.", ) if "manage_master_user_password" in kwargs: manage_master_user_password = kwargs.pop("manage_master_user_password") if manage_master_user_password: kms_key_id = kwargs.pop("master_user_secret_kms_key_id", None) cluster.master_user_secret = MasterUserSecret(cluster, kms_key_id) else: cluster.master_user_secret.delete_secret() del cluster.master_user_secret kwargs["db_cluster_identifier"] = kwargs.pop("new_db_cluster_identifier", None) for k, v in kwargs.items(): if k == "db_cluster_parameter_group_name": k = "parameter_group" if v is not None: setattr(cluster, k, v) cwl_exports = kwargs.get("enable_cloudwatch_logs_exports") or {} for exp in cwl_exports.get("DisableLogTypes", []): cluster.enabled_cloudwatch_logs_exports.remove(exp) cluster.enabled_cloudwatch_logs_exports.extend( cwl_exports.get("EnableLogTypes", []) ) cluster_id = kwargs.get("new_db_cluster_identifier", cluster_id) self.clusters[cluster_id] = cluster initial_state = copy.copy(cluster) # Return status=creating # Already set the final status in the background cluster.status = "available" return initial_state def promote_read_replica_db_cluster(self, db_cluster_identifier: str) -> DBCluster: cluster = self.clusters[db_cluster_identifier] source_cluster = find_cluster(cluster.replication_source_identifier) # type: ignore source_cluster.read_replica_identifiers.remove(cluster.db_cluster_arn) cluster.replication_source_identifier = None return cluster def create_auto_cluster_snapshot( self, db_cluster_identifier: str, db_snapshot_identifier: str ) -> DBClusterSnapshot: return self.create_db_cluster_snapshot( db_cluster_identifier, db_snapshot_identifier, snapshot_type="automated" ) def create_db_cluster_snapshot( self, db_cluster_identifier: str, db_cluster_snapshot_identifier: str, snapshot_type: str = "manual", tags: Optional[List[Dict[str, str]]] = None, ) -> DBClusterSnapshot: if db_cluster_snapshot_identifier in self.cluster_snapshots: raise DBClusterSnapshotAlreadyExistsError(db_cluster_snapshot_identifier) if db_cluster_identifier not in self.clusters: raise DBClusterNotFoundError(db_cluster_identifier) if snapshot_type == "manual" and len(self.cluster_snapshots) >= int( os.environ.get("MOTO_RDS_SNAPSHOT_LIMIT", "100") ): raise SnapshotQuotaExceededFault() cluster = self.clusters[db_cluster_identifier] snapshot = DBClusterSnapshot( self, cluster, db_cluster_snapshot_identifier, snapshot_type, tags ) self.cluster_snapshots[db_cluster_snapshot_identifier] = snapshot return snapshot def copy_db_cluster_snapshot( self, source_db_cluster_snapshot_identifier: str, target_db_cluster_snapshot_identifier: str, kms_key_id: Optional[str] = None, copy_tags: bool = False, tags: Optional[List[Dict[str, str]]] = None, **_: Any, ) -> DBClusterSnapshot: if target_db_cluster_snapshot_identifier in self.cluster_snapshots: raise DBClusterSnapshotAlreadyExistsError( target_db_cluster_snapshot_identifier ) if len(self.cluster_snapshots) >= int( os.environ.get("MOTO_RDS_SNAPSHOT_LIMIT", "100") ): raise SnapshotQuotaExceededFault() if kms_key_id is not None: kms_key_id = self._validate_kms_key(kms_key_id) source_db_cluster_snapshot = self.get_db_cluster_snapshot( source_db_cluster_snapshot_identifier ) # If tags are supplied, copy_tags is ignored (verified against real AWS backend 2025-02-12). if copy_tags and not tags: tags = source_db_cluster_snapshot.tags or [] target_db_cluster_snapshot = DBClusterSnapshot( self, source_db_cluster_snapshot.cluster, target_db_cluster_snapshot_identifier, tags=tags, kms_key_id=kms_key_id, ) self.cluster_snapshots[target_db_cluster_snapshot_identifier] = ( target_db_cluster_snapshot ) return target_db_cluster_snapshot def delete_db_cluster_snapshot( self, db_snapshot_identifier: str ) -> DBClusterSnapshot: if db_snapshot_identifier not in self.cluster_snapshots: raise DBClusterSnapshotNotFoundError(db_snapshot_identifier) return self.cluster_snapshots.pop(db_snapshot_identifier) def describe_db_clusters( self, db_cluster_identifier: Optional[str] = None, filters: Any = None ) -> List[DBCluster]: clusters = self.clusters if db_cluster_identifier: filters = merge_filters(filters, {"db-cluster-id": [db_cluster_identifier]}) if filters: clusters = self._filter_resources(clusters, filters, DBCluster) if db_cluster_identifier and not clusters: raise DBClusterNotFoundError(db_cluster_identifier) return list(clusters.values()) # type: ignore def describe_db_cluster_snapshots( self, db_cluster_identifier: Optional[str], db_snapshot_identifier: str, snapshot_type: Optional[str] = None, filters: Any = None, ) -> List[DBClusterSnapshot]: if snapshot_type == "shared": return self.get_shared_db_cluster_snapshots() # type: ignore[return-value] snapshots = self.cluster_snapshots if db_cluster_identifier: filters = merge_filters(filters, {"db-cluster-id": [db_cluster_identifier]}) if db_snapshot_identifier: filters = merge_filters( filters, {"db-cluster-snapshot-id": [db_snapshot_identifier]} ) snapshot_types = ( ["automated", "manual"] if ( snapshot_type is None and (filters is not None and "snapshot-type" not in filters) ) else [snapshot_type] if snapshot_type is not None else [] ) if snapshot_types: filters = merge_filters(filters, {"snapshot-type": snapshot_types}) if filters: snapshots = self._filter_resources(snapshots, filters, DBClusterSnapshot) if db_snapshot_identifier and not snapshots and not db_cluster_identifier: raise DBClusterSnapshotNotFoundError(db_snapshot_identifier) return list(snapshots.values()) def delete_db_cluster( self, cluster_identifier: str, snapshot_name: Optional[str] = None ) -> DBCluster: if cluster_identifier in self.clusters: cluster = self.clusters[cluster_identifier] if cluster.deletion_protection: raise InvalidParameterCombination( "Cannot delete protected Cluster, please disable deletion protection and try again." ) if cluster.members: raise DBClusterToBeDeletedHasActiveMembers() global_id = cluster.global_cluster_identifier or "" if global_id in self.global_clusters: self.remove_from_global_cluster(global_id, cluster_identifier) if cluster.manage_master_user_password: cluster.master_user_secret.delete_secret() if snapshot_name: self.create_db_cluster_snapshot(cluster_identifier, snapshot_name) return self.clusters.pop(cluster_identifier) raise DBClusterNotFoundError(cluster_identifier) def start_db_cluster(self, cluster_identifier: str) -> DBCluster: if cluster_identifier not in self.clusters: raise DBClusterNotFoundError(cluster_identifier) cluster = self.clusters[cluster_identifier] if cluster.status != "stopped": raise InvalidDBClusterStateFault( f"DbCluster {cluster_identifier} is in {cluster.status} state " "but expected it to be one of " "stopped,inaccessible-encryption-credentials-recoverable." ) temp_state = copy.deepcopy(cluster) temp_state.status = "started" cluster.status = "available" # This is the final status - already setting it in the background return temp_state def restore_db_cluster_from_snapshot( self, from_snapshot_id: str, overrides: Dict[str, Any] ) -> DBCluster: snapshot = self.describe_db_cluster_snapshots( db_cluster_identifier=None, db_snapshot_identifier=from_snapshot_id )[0] original_cluster = snapshot.cluster new_cluster_props = copy.copy(original_cluster.get_cfg()) for key, value in overrides.items(): if value: new_cluster_props[key] = value return self.create_db_cluster(new_cluster_props) def stop_db_cluster(self, cluster_identifier: str) -> DBCluster: if cluster_identifier not in self.clusters: raise DBClusterNotFoundError(cluster_identifier) cluster = self.clusters[cluster_identifier] if cluster.status not in ["available"]: raise InvalidDBClusterStateFault( f"DbCluster {cluster_identifier} is in {cluster.status} state " "but expected it to be one of available." ) previous_state = copy.deepcopy(cluster) cluster.status = "stopped" return previous_state def start_export_task(self, kwargs: Dict[str, Any]) -> ExportTask: export_task_id = kwargs["export_task_identifier"] source_arn = kwargs["source_arn"] snapshot_id = source_arn.split(":")[-1] snapshot_type = source_arn.split(":")[-2] if export_task_id in self.export_tasks: raise ExportTaskAlreadyExistsError(export_task_id) if snapshot_type == "snapshot" and snapshot_id not in self.database_snapshots: raise DBSnapshotNotFoundFault(snapshot_id) elif ( snapshot_type == "cluster-snapshot" and snapshot_id not in self.cluster_snapshots ): raise DBClusterSnapshotNotFoundError(snapshot_id) if snapshot_type == "snapshot": snapshot: Union[DBSnapshot, DBClusterSnapshot] = self.database_snapshots[ snapshot_id ] else: snapshot = self.cluster_snapshots[snapshot_id] if snapshot.status not in ["available"]: raise InvalidExportSourceStateError(snapshot.status) export_task = ExportTask(self, snapshot, kwargs) self.export_tasks[export_task_id] = export_task return export_task def cancel_export_task(self, export_task_identifier: str) -> ExportTask: if export_task_identifier in self.export_tasks: export_task = self.export_tasks[export_task_identifier] export_task.status = "canceled" self.export_tasks[export_task_identifier] = export_task return export_task raise ExportTaskNotFoundError(export_task_identifier) def describe_export_tasks( self, export_task_identifier: str ) -> Iterable[ExportTask]: if export_task_identifier: if export_task_identifier in self.export_tasks: return [self.export_tasks[export_task_identifier]] else: raise ExportTaskNotFoundError(export_task_identifier) return self.export_tasks.values() def create_event_subscription(self, kwargs: Any) -> EventSubscription: subscription_name = kwargs["subscription_name"] if subscription_name in self.event_subscriptions: raise SubscriptionAlreadyExistError(subscription_name) subscription = EventSubscription(self, **kwargs) self.event_subscriptions[subscription_name] = subscription return subscription def delete_event_subscription(self, subscription_name: str) -> EventSubscription: if subscription_name in self.event_subscriptions: return self.event_subscriptions.pop(subscription_name) raise SubscriptionNotFoundError(subscription_name) def describe_event_subscriptions( self, subscription_name: str ) -> Iterable[EventSubscription]: if subscription_name: if subscription_name in self.event_subscriptions: return [self.event_subscriptions[subscription_name]] else: raise SubscriptionNotFoundError(subscription_name) return self.event_subscriptions.values() def _find_resource(self, resource_type: str, resource_name: str) -> Any: for resource_class, resources in self.resource_map.items(): if resource_type == getattr(resource_class, "resource_type", ""): if resource_name in resources: # type: ignore return resources[resource_name] # type: ignore # The resource_name is the last part of the ARN # Usually that's the name - but for DBProxies, the last part of the ARN is a random identifier # So we can't just use the dict-keys - we have to manually check the ARN if resource_type == "db-proxy": for resource in self.db_proxies.values(): if resource.arn.endswith(resource_name): return resource def _get_resource_for_tagging(self, arn: str) -> Any: if self.arn_regex.match(arn): arn_breakdown = arn.split(":") resource_type = arn_breakdown[len(arn_breakdown) - 2] resource_name = arn_breakdown[len(arn_breakdown) - 1] # FIXME: HACK for automated snapshots if resource_type == "rds": resource_type = arn_breakdown[-3] resource_name = arn_breakdown[-2] + ":" + arn_breakdown[-1] resource = self._find_resource(resource_type, resource_name) return resource raise RDSClientError("InvalidParameterValue", f"Invalid resource name: {arn}") def list_tags_for_resource(self, arn: str) -> List[Dict[str, str]]: resource = self._get_resource_for_tagging(arn) if resource: return resource.get_tags() return [] def remove_tags_from_resource(self, arn: str, tag_keys: List[str]) -> None: resource = self._get_resource_for_tagging(arn) if resource: resource.remove_tags(tag_keys) def add_tags_to_resource( # type: ignore[return] self, arn: str, tags: List[Dict[str, str]] ) -> List[Dict[str, str]]: resource = self._get_resource_for_tagging(arn) if resource: return resource.add_tags(tags) return [] @staticmethod def _filter_resources(resources: Any, filters: Any, resource_class: Any) -> Any: # type: ignore[misc] try: filter_defs = resource_class.SUPPORTED_FILTERS validate_filters(filters, filter_defs) return apply_filter(resources, filters, filter_defs) except KeyError as e: # https://stackoverflow.com/questions/24998968/why-does-strkeyerror-add-extra-quotes raise InvalidParameterValue(e.args[0]) except ValueError as e: raise InvalidParameterCombination(str(e)) @staticmethod def _validate_db_identifier(db_identifier: str) -> None: # https://docs.aws.amazon.com/AmazonRDS/latest/APIReference/API_CreateDBInstance.html # Constraints: # # Must contain from 1 to 63 letters, numbers, or hyphens. # # First character must be a letter. # # Can't end with a hyphen or contain two consecutive hyphens. if ( re.match( "^(?!.*--)([a-zA-Z]?[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])$", db_identifier ) and db_identifier[0].isalpha() ): return raise InvalidDBInstanceIdentifier @staticmethod def validate_db_snapshot_identifier( db_snapshot_identifier: str, parameter_name: str ) -> None: # https://docs.aws.amazon.com/AmazonRDS/latest/APIReference/API_CreateDBSnapshot.html # Constraints: # # Must contain from 1 to 255 letters, numbers, or hyphens. # # First character must be a letter. # # Can't end with a hyphen or contain two consecutive hyphens. if ( re.match( "^(?!.*--)([a-zA-Z]?[a-zA-Z0-9-]{0,253}[a-zA-Z0-9])$", db_snapshot_identifier, ) and db_snapshot_identifier[0].isalpha() ): return raise InvalidDBSnapshotIdentifier(db_snapshot_identifier, parameter_name) def describe_orderable_db_instance_options( self, engine: str, engine_version: str ) -> List[Dict[str, Any]]: """ Only the Aurora-Postgresql and Neptune-engine is currently implemented """ if engine in ["aurora-postgresql", "neptune"]: if engine_version: return [ option for option in self.db_cluster_options(engine) if option["EngineVersion"] == engine_version ] return self.db_cluster_options(engine) return [] def create_db_cluster_parameter_group( self, group_name: str, family: str, description: str, ) -> DBClusterParameterGroup: group = DBClusterParameterGroup( backend=self, name=group_name, family=family, description=description, ) self.db_cluster_parameter_groups[group_name] = group return group def describe_db_cluster_parameter_groups( self, group_name: str ) -> List[DBClusterParameterGroup]: if group_name is not None: if group_name not in self.db_cluster_parameter_groups: raise DBClusterParameterGroupNotFoundError(group_name) return [self.db_cluster_parameter_groups[group_name]] return list(self.db_cluster_parameter_groups.values()) def delete_db_cluster_parameter_group(self, group_name: str) -> None: self.db_cluster_parameter_groups.pop(group_name) def create_global_cluster( self, global_cluster_identifier: str, source_db_cluster_identifier: Optional[str], engine: Optional[str], engine_version: Optional[str], storage_encrypted: Optional[bool], deletion_protection: Optional[bool], ) -> GlobalCluster: source_cluster = None if source_db_cluster_identifier is not None: # validate our source cluster exists if not re.match(ARN_PARTITION_REGEX + ":rds", source_db_cluster_identifier): raise InvalidParameterValue("Malformed db cluster arn dbci") source_cluster = self.describe_db_clusters( db_cluster_identifier=source_db_cluster_identifier )[0] # We should not specify an engine at the same time, as we'll take it from the source cluster if engine is not None: raise InvalidParameterCombination( "When creating global cluster from existing db cluster, value for engineName should not be specified since it will be inherited from source cluster" ) engine = source_cluster.engine engine_version = source_cluster.engine_version elif engine is None: raise InvalidParameterValue( "When creating standalone global cluster, value for engineName should be specified" ) global_cluster = GlobalCluster( backend=self, global_cluster_identifier=global_cluster_identifier, engine=engine, # type: ignore engine_version=engine_version, storage_encrypted=storage_encrypted, deletion_protection=deletion_protection, ) self.global_clusters[global_cluster_identifier] = global_cluster if source_cluster is not None: source_cluster.global_cluster_identifier = global_cluster.global_cluster_arn global_cluster.members.append(source_cluster) return global_cluster def describe_global_clusters(self) -> List[GlobalCluster]: return list(self.global_clusters.values()) def delete_global_cluster(self, global_cluster_identifier: str) -> GlobalCluster: global_cluster = self.global_clusters[global_cluster_identifier] if global_cluster.members: raise InvalidGlobalClusterStateFault(global_cluster.global_cluster_arn) return self.global_clusters.pop(global_cluster_identifier) def remove_from_global_cluster( self, global_cluster_identifier: str, db_cluster_identifier: str ) -> Optional[GlobalCluster]: try: global_cluster = self.global_clusters[global_cluster_identifier] cluster = self.describe_db_clusters( db_cluster_identifier=db_cluster_identifier )[0] global_cluster.members.remove(cluster) return global_cluster except: # noqa: E722 Do not use bare except pass return None def describe_db_snapshot_attributes( self, db_snapshot_identifier: str ) -> Dict[str, List[str]]: snapshot = self.describe_db_snapshots( db_instance_identifier=None, db_snapshot_identifier=db_snapshot_identifier )[0] return snapshot.attributes def modify_db_snapshot_attribute( self, db_snapshot_identifier: str, attribute_name: str, values_to_add: Optional[List[str]] = None, values_to_remove: Optional[List[str]] = None, ) -> Dict[str, List[str]]: snapshot = self.describe_db_snapshots( db_instance_identifier=None, db_snapshot_identifier=db_snapshot_identifier )[0] if db_snapshot_identifier.startswith("rds:"): # automated snapshot raise InvalidParameterValue( "The parameter DBSnapshotIdentifier is not a valid identifier." ) snapshot.modify_attribute(attribute_name, values_to_add, values_to_remove) return snapshot.attributes def describe_db_cluster_snapshot_attributes( self, db_cluster_snapshot_identifier: str ) -> Dict[str, List[str]]: snapshot = self.describe_db_cluster_snapshots( db_cluster_identifier=None, db_snapshot_identifier=db_cluster_snapshot_identifier, )[0] return snapshot.attributes def modify_db_cluster_snapshot_attribute( self, db_cluster_snapshot_identifier: str, attribute_name: str, values_to_add: Optional[List[str]] = None, values_to_remove: Optional[List[str]] = None, ) -> Dict[str, List[str]]: snapshot = self.describe_db_cluster_snapshots( db_cluster_identifier=None, db_snapshot_identifier=db_cluster_snapshot_identifier, )[0] if snapshot.snapshot_type != "manual": raise InvalidDBClusterSnapshotStateFault( "automated snapshots cannot be modified." ) snapshot.modify_attribute(attribute_name, values_to_add, values_to_remove) return snapshot.attributes def create_db_proxy( self, db_proxy_name: str, engine_family: str, auth: List[Dict[str, str]], role_arn: str, vpc_subnet_ids: List[str], vpc_security_group_ids: Optional[List[str]], require_tls: Optional[bool], idle_client_timeout: Optional[int], debug_logging: Optional[bool], tags: Optional[List[Dict[str, str]]], ) -> DBProxy: self._validate_db_identifier(db_proxy_name) if db_proxy_name in self.db_proxies: raise DBProxyAlreadyExistsFault(db_proxy_name) if len(self.db_proxies) >= int(os.environ.get("MOTO_RDS_PROXY_LIMIT", "100")): raise DBProxyQuotaExceededFault() db_proxy = DBProxy( self, db_proxy_name, engine_family, auth, role_arn, vpc_subnet_ids, vpc_security_group_ids, require_tls, idle_client_timeout, debug_logging, tags, ) self.db_proxies[db_proxy_name] = db_proxy return db_proxy def describe_db_proxies( self, db_proxy_name: Optional[str], filters: Optional[List[Dict[str, Any]]] = None, ) -> List[DBProxy]: """ The filters-argument is not yet supported """ db_proxies = list(self.db_proxies.values()) if db_proxy_name and db_proxy_name in self.db_proxies.keys(): db_proxies = [self.db_proxies[db_proxy_name]] if db_proxy_name and db_proxy_name not in self.db_proxies.keys(): raise DBProxyNotFoundFault(db_proxy_name) return db_proxies def deregister_db_proxy_targets( self, db_proxy_name: str, target_group_name: str, db_cluster_identifiers: List[str], db_instance_identifiers: List[str], ) -> None: db_proxy = self.db_proxies[db_proxy_name] target_group = db_proxy.proxy_target_groups[target_group_name or "default"] target_group.targets = [ t for t in target_group.targets if t.rds_resource_id not in db_cluster_identifiers and t.rds_resource_id not in db_instance_identifiers ] def register_db_proxy_targets( self, db_proxy_name: str, target_group_name: str, db_cluster_identifiers: List[str], db_instance_identifiers: List[str], ) -> List[DBProxyTarget]: db_proxy = self.db_proxies[db_proxy_name] target_group = db_proxy.proxy_target_groups[target_group_name or "default"] new_targets = [] for cluster_id in db_cluster_identifiers: cluster = self.clusters[cluster_id] target = DBProxyTarget( backend=self, resource_id=cluster_id, endpoint=cluster.endpoint, type="TRACKED_CLUSTER", ) new_targets.append(target) for instance_id in db_instance_identifiers: target = DBProxyTarget( backend=self, resource_id=instance_id, endpoint=None, type="RDS_INSTANCE", ) new_targets.append(target) target_group.targets.extend(new_targets) return new_targets def delete_db_proxy(self, proxy_name: str) -> DBProxy: return self.db_proxies.pop(proxy_name) def describe_db_proxy_targets(self, proxy_name: str) -> List[DBProxyTarget]: proxy = self.db_proxies[proxy_name] target_group = proxy.proxy_target_groups["default"] return target_group.targets def describe_db_proxy_target_groups( self, proxy_name: str ) -> List[DBProxyTargetGroup]: proxy = self.db_proxies[proxy_name] return list(proxy.proxy_target_groups.values()) def modify_db_proxy_target_group( self, proxy_name: str, config: Dict[str, Any] ) -> DBProxyTargetGroup: proxy = self.db_proxies[proxy_name] target_group = proxy.proxy_target_groups["default"] if max_connections := config.get("MaxConnectionsPercent"): target_group.max_connections = max_connections if max_idle := config.get("MaxIdleConnectionsPercent"): target_group.max_idle_connections = max_idle else: target_group.max_idle_connections = math.floor( int(target_group.max_connections) / 2 ) target_group.borrow_timeout = config.get( "ConnectionBorrowTimeout", target_group.borrow_timeout ) if "SessionPinningFilters" in config: target_group.session_pinning_filters = config["SessionPinningFilters"] return target_group def describe_db_instance_automated_backups( self, db_instance_identifier: Optional[str] = None, **_: Any, ) -> List[DBInstanceAutomatedBackup]: snapshots = list(self.database_snapshots.values()) if db_instance_identifier is not None: snapshots = [ snap for snap in self.database_snapshots.values() if snap.db_instance_identifier == db_instance_identifier ] snapshots_grouped = defaultdict(list) for snapshot in snapshots: if snapshot.snapshot_type == "automated": snapshots_grouped[snapshot.db_instance_identifier].append(snapshot) return [ DBInstanceAutomatedBackup(self, k, v) for k, v in snapshots_grouped.items() ] def add_event(self, event_type: str, resource: ResourceWithEvents) -> None: event = Event(event_type, resource) self.events.append(event) def describe_events( self, source_identifier: Optional[str] = None, source_type: Optional[str] = None, **_: Any, ) -> List[Event]: if source_identifier is not None and source_type is None: raise InvalidParameterCombination( "Cannot specify source identifier without source type" ) events = self.events if source_identifier is not None: events = [e for e in events if e.source_identifier == source_identifier] if source_type is not None: events = [e for e in events if e.source_type == source_type] return events def describe_db_log_files(self, db_instance_identifier: str) -> List[DBLogFile]: if db_instance_identifier not in self.databases: raise DBInstanceNotFoundError(db_instance_identifier) database = self.databases[db_instance_identifier] return database.log_file_manager.files class OptionGroup(RDSBaseModel): resource_type = "og" def __init__( self, backend: RDSBackend, option_group_name: str, engine_name: str, major_engine_version: str, option_group_description: Optional[str] = None, tags: Optional[List[Dict[str, str]]] = None, ): super().__init__(backend) self.engine_name = engine_name self.major_engine_version = major_engine_version self.description = option_group_description self.name = option_group_name self.vpc_and_non_vpc_instance_memberships = False self._options: Dict[str, Any] = {} self.vpcId = "null" self.tags = tags or [] @property def resource_id(self) -> str: return self.name @property def options(self) -> List[Dict[str, Any]]: # type: ignore[misc] return [ { "OptionName": name, "OptionSettings": [ { "Name": setting.get("Name"), "Value": setting.get("Value"), } for setting in option_settings ], } for name, option_settings in self._options.items() ] def remove_options(self, options_to_remove: List[str]) -> None: for option in options_to_remove: self._options.pop(option, None) def add_options(self, options_to_add: List[Dict[str, Any]]) -> None: for option in options_to_add: self._options[option["OptionName"]] = option.get("OptionSettings", {}) class DBParameterGroup(CloudFormationModel, RDSBaseModel): resource_type = "pg" def __init__( self, backend: RDSBackend, db_parameter_group_name: str, description: str, db_parameter_group_family: Optional[str], tags: Optional[List[Dict[str, str]]] = None, ): super().__init__(backend) self.name = db_parameter_group_name self.description = description self.family = db_parameter_group_family self.tags = tags or [] self.parameters: Dict[str, Any] = defaultdict(dict) @property def resource_id(self) -> str: return self.name def update_parameters(self, new_parameters: Iterable[Dict[str, Any]]) -> None: for new_parameter in new_parameters: parameter = self.parameters[new_parameter["ParameterName"]] parameter.update(new_parameter) def delete(self, account_id: str, region_name: str) -> None: backend = rds_backends[account_id][region_name] backend.delete_db_parameter_group(self.name) # type: ignore[arg-type] @staticmethod def cloudformation_name_type() -> str: return "" @staticmethod def cloudformation_type() -> str: # https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-rds-dbparametergroup.html return "AWS::RDS::DBParameterGroup" @classmethod def create_from_cloudformation_json( # type: ignore[misc] cls, resource_name: str, cloudformation_json: Any, account_id: str, region_name: str, **kwargs: Any, ) -> DBParameterGroup: properties = cloudformation_json["Properties"] db_parameter_group_kwargs = { "description": properties["Description"], "db_parameter_group_family": properties["Family"], "db_parameter_group_name": resource_name.lower(), "tags": properties.get("Tags"), } db_parameter_group_parameters = [] for db_parameter, db_parameter_value in properties.get( "Parameters", {} ).items(): db_parameter_group_parameters.append( {"ParameterName": db_parameter, "ParameterValue": db_parameter_value} ) rds_backend = rds_backends[account_id][region_name] db_parameter_group = rds_backend.create_db_parameter_group( db_parameter_group_kwargs ) db_parameter_group.update_parameters(db_parameter_group_parameters) return db_parameter_group class DBClusterParameterGroup(CloudFormationModel, RDSBaseModel): resource_type = "cluster-pg" def __init__(self, backend: RDSBackend, name: str, description: str, family: str): super().__init__(backend) self.name = name self.description = description self.db_parameter_group_family = family self.parameters: Dict[str, Any] = defaultdict(dict) @property def resource_id(self) -> str: return self.name class Event(XFormedAttributeAccessMixin): EVENT_MAP = { "DB_INSTANCE_BACKUP_START": { "Categories": ["backup"], "Message": "Backing up DB instance", }, "DB_INSTANCE_BACKUP_FINISH": { "Categories": ["backup"], "Message": "Finished DB instance backup", }, "DB_INSTANCE_CREATE": { "Categories": ["creation"], "Message": "DB instance created", }, "DB_INSTANCE_RESET_MASTER_CREDENTIALS": { "Categories": ["configuration change"], "Message": "Reset master credentials", }, "DB_SNAPSHOT_CREATE_AUTOMATED_START": { "Categories": ["creation"], "Message": "Creating automated snapshot", }, "DB_SNAPSHOT_CREATE_AUTOMATED_FINISH": { "Categories": ["creation"], "Message": "Automated snapshot created", }, "DB_SNAPSHOT_CREATE_MANUAL_START": { "Categories": ["creation"], "Message": "Creating manual snapshot", }, "DB_SNAPSHOT_CREATE_MANUAL_FINISH": { "Categories": ["creation"], "Message": "Manual snapshot created", }, } def __init__(self, event_type: str, resource: ResourceWithEvents) -> None: event_metadata = self.EVENT_MAP[event_type] self.source_identifier = resource.resource_id self.source_type = resource.event_source_type self.message = event_metadata["Message"] self.event_categories = event_metadata["Categories"] self.source_arn = resource.arn self.date = utcnow() rds_backends = BackendDict(RDSBackend, "rds")
Memory