from typing import Optional from moto.core.exceptions import ServiceException class RDSClientError(ServiceException): def __init__(self, code: str, message: str): super().__init__(message) self.code = code self.message = message class DBInstanceNotFoundError(RDSClientError): def __init__(self, database_identifier: str): super().__init__( "DBInstanceNotFound", f"DBInstance {database_identifier} not found." ) class DBInstanceAlreadyExists(RDSClientError): def __init__(self) -> None: super().__init__("DBInstanceAlreadyExists", "DB instance already exists") class DBSnapshotNotFoundFault(RDSClientError): def __init__(self, snapshot_identifier: str): super().__init__( "DBSnapshotNotFoundFault", f"DBSnapshot {snapshot_identifier} not found." ) class DBSecurityGroupNotFoundError(RDSClientError): def __init__(self, security_group_name: str): super().__init__( "DBSecurityGroupNotFound", f"Security Group {security_group_name} not found.", ) class DBSubnetGroupNotFoundError(RDSClientError): def __init__(self, subnet_group_name: str): super().__init__( "DBSubnetGroupNotFoundFault", f"Subnet Group {subnet_group_name} not found." ) class DBParameterGroupNotFoundError(RDSClientError): def __init__(self, db_parameter_group_name: str): super().__init__( "DBParameterGroupNotFoundFault", f"DB Parameter Group {db_parameter_group_name} not found.", ) class DBParameterGroupAlreadyExistsError(RDSClientError): def __init__(self, db_parameter_group_name: str): super().__init__( "DBParameterGroupAlreadyExistsFault", f"DB Parameter Group {db_parameter_group_name} already exists.", ) class DBClusterParameterGroupNotFoundError(RDSClientError): def __init__(self, group_name: str): super().__init__( "DBParameterGroupNotFound", f"DBClusterParameterGroup not found: {group_name}", ) class OptionGroupNotFoundFaultError(RDSClientError): def __init__(self, option_group_name: str): super().__init__( "OptionGroupNotFoundFault", f"Specified OptionGroupName: {option_group_name} not found.", ) class InvalidDBClusterStateFaultError(RDSClientError): def __init__(self, database_identifier: str): super().__init__( "InvalidDBClusterStateFault", f"Invalid DB type, when trying to perform StopDBInstance on {database_identifier}e. See AWS RDS documentation on rds.stop_db_instance", ) class DBClusterToBeDeletedHasActiveMembers(RDSClientError): def __init__(self) -> None: super().__init__( "InvalidDBClusterStateFault", "Cluster cannot be deleted, it still contains DB instances in non-deleting state.", ) class InvalidDBInstanceStateError(RDSClientError): def __init__(self, database_identifier: str, istate: str): estate = ( "in available state" if istate == "stop" else "stopped, it cannot be started" ) super().__init__( "InvalidDBInstanceState", f"Instance {database_identifier} is not {estate}." ) class SnapshotQuotaExceededFault(RDSClientError): # This is used for both DBSnapshots and DBClusterSnapshots def __init__(self) -> None: super().__init__( "SnapshotQuotaExceeded", "The request cannot be processed because it would exceed the maximum number of snapshots.", ) class SharedSnapshotQuotaExceeded(RDSClientError): def __init__(self) -> None: super().__init__( "SharedSnapshotQuotaExceeded", "The request cannot be processed because it would exceed the maximum number of snapshots.", ) class KMSKeyNotAccessibleFault(RDSClientError): fmt = "Specified KMS key [{key_id}] does not exist, is not enabled or you do not have permissions to access it." def __init__(self, key_id: str) -> None: super().__init__( "KMSKeyNotAccessibleFault", f"Specified KMS key [{key_id}] does not exist, is not enabled or you do not have permissions to access it.", ) class InvalidDBClusterSnapshotStateFault(RDSClientError): def __init__(self, message: str): super().__init__("InvalidDBClusterSnapshotStateFault", message) class DBSnapshotAlreadyExistsError(RDSClientError): def __init__(self, database_snapshot_identifier: str): super().__init__( "DBSnapshotAlreadyExists", f"Cannot create the snapshot because a snapshot with the identifier {database_snapshot_identifier} already exists.", ) class InvalidParameterValue(RDSClientError): def __init__(self, message: str): super().__init__("InvalidParameterValue", message) class InvalidParameterCombination(RDSClientError): def __init__(self, message: str): super().__init__("InvalidParameterCombination", message) class InvalidDBClusterStateFault(RDSClientError): def __init__(self, message: str): super().__init__("InvalidDBClusterStateFault", message) class DBClusterNotFoundError(RDSClientError): def __init__(self, cluster_identifier: str, message: Optional[str] = None): if message is None: message = f"DBCluster {cluster_identifier} not found." super().__init__("DBClusterNotFoundFault", message) class DBClusterSnapshotNotFoundError(RDSClientError): def __init__(self, snapshot_identifier: str): super().__init__( "DBClusterSnapshotNotFoundFault", f"DBClusterSnapshot {snapshot_identifier} not found.", ) class DBClusterSnapshotAlreadyExistsError(RDSClientError): def __init__(self, database_snapshot_identifier: str): super().__init__( "DBClusterSnapshotAlreadyExistsFault", f"Cannot create the snapshot because a snapshot with the identifier {database_snapshot_identifier} already exists.", ) class ExportTaskAlreadyExistsError(RDSClientError): def __init__(self, export_task_identifier: str): super().__init__( "ExportTaskAlreadyExists", f"Cannot start export task because a task with the identifier {export_task_identifier} already exists.", ) class ExportTaskNotFoundError(RDSClientError): def __init__(self, export_task_identifier: str): super().__init__( "ExportTaskNotFound", f"Cannot cancel export task because a task with the identifier {export_task_identifier} is not exist.", ) class InvalidExportSourceStateError(RDSClientError): def __init__(self, status: str): super().__init__( "InvalidExportSourceStateFault", f"Export source should be 'available' but current status is {status}.", ) class SubscriptionAlreadyExistError(RDSClientError): def __init__(self, subscription_name: str): super().__init__( "SubscriptionAlreadyExist", f"Subscription {subscription_name} already exists.", ) class SubscriptionNotFoundError(RDSClientError): def __init__(self, subscription_name: str): super().__init__( "SubscriptionNotFound", f"Subscription {subscription_name} not found." ) class InvalidGlobalClusterStateFault(RDSClientError): def __init__(self, arn: str): super().__init__( "InvalidGlobalClusterStateFault", f"Global Cluster {arn} is not empty" ) class InvalidDBInstanceIdentifier(InvalidParameterValue): def __init__(self) -> None: super().__init__( "The parameter DBInstanceIdentifier is not a valid identifier. " "Identifiers must begin with a letter; must contain only ASCII letters, digits, and hyphens; " "and must not end with a hyphen or contain two consecutive hyphens." ) class InvalidDBSnapshotIdentifier(InvalidParameterValue): def __init__(self, snapshot_identifier: str, parameter_name: str) -> None: if snapshot_identifier == "": exception_text = f"The parameter {parameter_name} must be provided and must not be blank." elif not snapshot_identifier[0].isalpha(): # On AWS, this error message seems to be triggered when the first character is invalid. # The two spaces before the snapshot_identifier are what AWS produces! exception_text = f"Invalid snapshot identifier: {snapshot_identifier}" else: exception_text = ( f"The parameter {parameter_name} is not a valid identifier. " "Identifiers must begin with a letter; must contain only ASCII letters, digits, and hyphens; " "and must not end with a hyphen or contain two consecutive hyphens." ) super().__init__(exception_text) class InvalidDBInstanceEngine(InvalidParameterCombination): def __init__(self, instance_engine: str, cluster_engine: str) -> None: super().__init__( f"The engine name requested for your DB instance ({instance_engine}) doesn't match " f"the engine name of your DB cluster ({cluster_engine})." ) class InvalidSubnet(RDSClientError): def __init__(self, subnet_identifier: str): super().__init__( "InvalidSubnet", f"The requested subnet {subnet_identifier} is invalid, or multiple subnets were requested that are not all in a common VPC.", ) class DBProxyAlreadyExistsFault(RDSClientError): def __init__(self, db_proxy_identifier: str): super().__init__( "DBProxyAlreadyExistsFault", f"Cannot create the DBProxy because a DBProxy with the identifier {db_proxy_identifier} already exists.", ) class DBProxyQuotaExceededFault(RDSClientError): def __init__(self) -> None: super().__init__( "DBProxyQuotaExceeded", "The request cannot be processed because it would exceed the maximum number of DBProxies.", ) class DBProxyNotFoundFault(RDSClientError): def __init__(self, db_proxy_identifier: str): super().__init__( "DBProxyNotFoundFault", f"The specified proxy name {db_proxy_identifier} doesn't correspond to a proxy owned by your Amazon Web Services account in the specified Amazon Web Services Region.", )
Memory