from typing import Any, List, Optional, Tuple
from botocore.awsrequest import AWSPreparedRequest
from werkzeug.wrappers import Request
from moto.core.common_types import TYPE_RESPONSE
from moto.core.responses import ActionResult, BaseResponse
from moto.core.utils import get_service_model
from moto.ec2.models import ec2_backends
from .exceptions import DBParameterGroupNotFoundError
from .models import RDSBackend, rds_backends
from .parser import QueryParser, XFormedDict
def normalize_request(request: AWSPreparedRequest) -> Request:
from urllib.parse import urlparse
parsed_url = urlparse(request.url)
normalized_request = Request.from_values(
method=request.method,
base_url=f"{parsed_url.scheme}://{parsed_url.netloc}",
path=parsed_url.path,
query_string=parsed_url.query,
data=request.body,
headers=[(k, v) for k, v in request.headers.items()],
)
return normalized_request
class RDSResponse(BaseResponse):
def __init__(self) -> None:
super().__init__(service_name="rds")
@property
def backend(self) -> RDSBackend:
return rds_backends[self.current_account][self.region]
@property
def global_backend(self) -> RDSBackend:
"""Return backend instance of the region that stores Global Clusters"""
return rds_backends[self.current_account]["us-east-1"]
def _dispatch(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE:
self.setup_class(request, full_url, headers)
if isinstance(request, AWSPreparedRequest):
request = normalize_request(request)
self.action = request.values["Action"]
service_model = get_service_model(self.service_name)
self.operation_model = service_model.operation_model(self.action)
parser = QueryParser(map_type=XFormedDict) # type: ignore[no-untyped-call]
self.parameters = parser.parse(
{"query_params": request.values},
self.operation_model, # type: ignore[no-untyped-call]
)
return self.call_action()
def create_db_instance(self) -> ActionResult:
db_kwargs = self.parameters
database = self.backend.create_db_instance(db_kwargs)
result = {"DBInstance": database}
return ActionResult(result)
def create_db_instance_read_replica(self) -> ActionResult:
db_kwargs = self.parameters
database = self.backend.create_db_instance_read_replica(db_kwargs)
result = {"DBInstance": database}
return ActionResult(result)
def describe_db_instances(self) -> ActionResult:
db_instance_identifier = self.parameters.get("DBInstanceIdentifier")
filters = self.parameters.get("Filters", [])
filter_dict = {f["Name"]: f["Values"] for f in filters}
all_instances = list(
self.backend.describe_db_instances(
db_instance_identifier, filters=filter_dict
)
)
instances_resp, next_marker = self._paginate(all_instances)
result = {
"DBInstances": instances_resp,
"Marker": next_marker,
}
return ActionResult(result)
def modify_db_instance(self) -> ActionResult:
db_instance_identifier = self.parameters.get("DBInstanceIdentifier")
db_kwargs = self.parameters
# This is a hack because the backend code expects the parameter to be
# lowercased before validation is performed. We need to move the
# lowercasing to a backend setter (in one place) and then do the validation.
if "PreferredMaintenanceWindow" in db_kwargs:
db_kwargs["PreferredMaintenanceWindow"] = db_kwargs[
"PreferredMaintenanceWindow"
].lower()
new_db_instance_identifier = self.parameters.get("NewDBInstanceIdentifier")
if new_db_instance_identifier:
db_kwargs["new_db_instance_identifier"] = new_db_instance_identifier
database = self.backend.modify_db_instance(db_instance_identifier, db_kwargs)
result = {"DBInstance": database}
return ActionResult(result)
def delete_db_instance(self) -> ActionResult:
db_snapshot_name = self.parameters.get("FinalDBSnapshotIdentifier")
if db_snapshot_name is not None:
self.backend.validate_db_snapshot_identifier(
db_snapshot_name, parameter_name="FinalDBSnapshotIdentifier"
)
database = self.backend.delete_db_instance(**self.parameters)
result = {"DBInstance": database}
return ActionResult(result)
def reboot_db_instance(self) -> ActionResult:
db_instance_identifier = self.parameters.get("DBInstanceIdentifier")
database = self.backend.reboot_db_instance(db_instance_identifier)
result = {"DBInstance": database}
return ActionResult(result)
def create_db_snapshot(self) -> ActionResult:
self.backend.validate_db_snapshot_identifier(
self.parameters["DBSnapshotIdentifier"],
parameter_name="DBSnapshotIdentifier",
)
snapshot = self.backend.create_db_snapshot(**self.parameters)
result = {"DBSnapshot": snapshot}
return ActionResult(result)
def copy_db_snapshot(self) -> ActionResult:
target_snapshot_identifier = self.parameters.get("TargetDBSnapshotIdentifier")
self.backend.validate_db_snapshot_identifier(
target_snapshot_identifier, parameter_name="TargetDBSnapshotIdentifier"
)
snapshot = self.backend.copy_db_snapshot(**self.parameters)
result = {"DBSnapshot": snapshot}
return ActionResult(result)
def describe_db_snapshots(self) -> ActionResult:
db_instance_identifier = self.parameters.get("DBInstanceIdentifier")
db_snapshot_identifier = self.parameters.get("DBSnapshotIdentifier")
snapshot_type = self.parameters.get("SnapshotType")
filters = self.parameters.get("Filters", [])
filter_dict = {f["Name"]: f["Values"] for f in filters}
snapshots = self.backend.describe_db_snapshots(
db_instance_identifier, db_snapshot_identifier, snapshot_type, filter_dict
)
result = {"DBSnapshots": snapshots}
return ActionResult(result)
def promote_read_replica(self) -> ActionResult:
db_kwargs = self.parameters
database = self.backend.promote_read_replica(db_kwargs)
result = {"DBInstance": database}
return ActionResult(result)
def delete_db_snapshot(self) -> ActionResult:
db_snapshot_identifier = self.parameters.get("DBSnapshotIdentifier")
snapshot = self.backend.delete_db_snapshot(db_snapshot_identifier)
result = {"DBSnapshot": snapshot}
return ActionResult(result)
def restore_db_instance_from_db_snapshot(self) -> ActionResult:
db_snapshot_identifier = self.parameters.get("DBSnapshotIdentifier")
db_kwargs = self.parameters
new_instance = self.backend.restore_db_instance_from_db_snapshot(
db_snapshot_identifier, db_kwargs
)
result = {"DBInstance": new_instance}
return ActionResult(result)
def restore_db_instance_to_point_in_time(self) -> ActionResult:
source_db_identifier = self.parameters.get("SourceDBInstanceIdentifier")
target_db_identifier = self.parameters.get("TargetDBInstanceIdentifier")
db_kwargs = self.parameters
new_instance = self.backend.restore_db_instance_to_point_in_time(
source_db_identifier, target_db_identifier, db_kwargs
)
result = {"DBInstance": new_instance}
return ActionResult(result)
def restore_db_cluster_to_point_in_time(self) -> ActionResult:
cluster = self.backend.restore_db_cluster_to_point_in_time(**self.parameters)
result = {"DBCluster": cluster}
return ActionResult(result)
def failover_db_cluster(self) -> ActionResult:
cluster = self.backend.failover_db_cluster(**self.parameters)
result = {"DBCluster": cluster}
return ActionResult(result)
def list_tags_for_resource(self) -> ActionResult:
arn = self.parameters.get("ResourceName")
tags = self.backend.list_tags_for_resource(arn)
result = {"TagList": tags}
return ActionResult(result)
def add_tags_to_resource(self) -> ActionResult:
arn = self.parameters.get("ResourceName")
tags = self.parameters.get("Tags", [])
self.backend.add_tags_to_resource(arn, tags)
return ActionResult({})
def remove_tags_from_resource(self) -> ActionResult:
arn = self.parameters.get("ResourceName")
tag_keys = self.parameters.get("TagKeys")
self.backend.remove_tags_from_resource(arn, tag_keys)
return ActionResult({})
def stop_db_instance(self) -> ActionResult:
db_instance_identifier = self.parameters.get("DBInstanceIdentifier")
db_snapshot_identifier = self.parameters.get("DBSnapshotIdentifier")
if db_snapshot_identifier is not None:
self.backend.validate_db_snapshot_identifier(
db_snapshot_identifier, parameter_name="DBSnapshotIdentifier"
)
database = self.backend.stop_db_instance(
db_instance_identifier, db_snapshot_identifier
)
result = {"DBInstance": database}
return ActionResult(result)
def start_db_instance(self) -> ActionResult:
db_instance_identifier = self.parameters.get("DBInstanceIdentifier")
database = self.backend.start_db_instance(db_instance_identifier)
result = {"DBInstance": database}
return ActionResult(result)
def create_db_security_group(self) -> ActionResult:
group_name = self.parameters.get("DBSecurityGroupName")
description = self.parameters.get("DBSecurityGroupDescription")
tags = self.parameters.get("Tags", [])
security_group = self.backend.create_db_security_group(
group_name, description, tags
)
result = {"DBSecurityGroup": security_group}
return ActionResult(result)
def describe_db_security_groups(self) -> ActionResult:
security_group_name = self.parameters.get("DBSecurityGroupName")
security_groups = self.backend.describe_security_groups(security_group_name)
result = {"DBSecurityGroups": security_groups}
return ActionResult(result)
def delete_db_security_group(self) -> ActionResult:
security_group_name = self.parameters.get("DBSecurityGroupName")
security_group = self.backend.delete_security_group(security_group_name)
result = {"DBSecurityGroup": security_group}
return ActionResult(result)
def authorize_db_security_group_ingress(self) -> ActionResult:
security_group_name = self.parameters.get("DBSecurityGroupName")
cidr_ip = self.parameters.get("CIDRIP")
security_group = self.backend.authorize_security_group(
security_group_name, cidr_ip
)
result = {"DBSecurityGroup": security_group}
return ActionResult(result)
def create_db_subnet_group(self) -> ActionResult:
subnet_name = self.parameters.get("DBSubnetGroupName")
description = self.parameters.get("DBSubnetGroupDescription")
subnet_ids = self.parameters.get("SubnetIds", [])
tags = self.parameters.get("Tags", [])
subnets = [
ec2_backends[self.current_account][self.region].get_subnet(subnet_id)
for subnet_id in subnet_ids
]
subnet_group = self.backend.create_subnet_group(
subnet_name, description, subnets, tags
)
result = {"DBSubnetGroup": subnet_group}
return ActionResult(result)
def describe_db_subnet_groups(self) -> ActionResult:
subnet_name = self.parameters.get("DBSubnetGroupName")
subnet_groups = self.backend.describe_db_subnet_groups(subnet_name)
result = {"DBSubnetGroups": subnet_groups}
return ActionResult(result)
def modify_db_subnet_group(self) -> ActionResult:
subnet_name = self.parameters.get("DBSubnetGroupName")
description = self.parameters.get("DBSubnetGroupDescription")
subnet_ids = self.parameters.get("SubnetIds", [])
subnets = [
ec2_backends[self.current_account][self.region].get_subnet(subnet_id)
for subnet_id in subnet_ids
]
subnet_group = self.backend.modify_db_subnet_group(
subnet_name, description, subnets
)
result = {"DBSubnetGroup": subnet_group}
return ActionResult(result)
def delete_db_subnet_group(self) -> ActionResult:
subnet_name = self.parameters.get("DBSubnetGroupName")
subnet_group = self.backend.delete_subnet_group(subnet_name)
result = {"DBSubnetGroup": subnet_group}
return ActionResult(result)
def create_option_group(self) -> ActionResult:
kwargs = self.parameters
option_group = self.backend.create_option_group(kwargs)
result = {"OptionGroup": option_group}
return ActionResult(result)
def delete_option_group(self) -> ActionResult:
name = self.parameters["OptionGroupName"]
option_group = self.backend.delete_option_group(name)
result = {"OptionGroup": option_group}
return ActionResult(result)
def describe_option_groups(self) -> ActionResult:
kwargs = self.parameters
option_groups = self.backend.describe_option_groups(kwargs)
option_groups, marker = self._paginate(option_groups)
result = {
"OptionGroupsList": option_groups,
"Marker": marker,
}
return ActionResult(result)
def describe_option_group_options(self) -> str:
engine_name = self.parameters.get("EngineName")
major_engine_version = self.parameters.get("MajorEngineVersion")
return self.backend.describe_option_group_options(
engine_name, major_engine_version
)
def modify_option_group(self) -> ActionResult:
option_group_name = self.parameters.get("OptionGroupName")
options_to_include = self.parameters.get("OptionsToInclude", [])
options_to_remove = self.parameters.get("OptionsToRemove", [])
option_group = self.backend.modify_option_group(
option_group_name, options_to_include, options_to_remove
)
result = {"OptionGroup": option_group}
return ActionResult(result)
def create_db_parameter_group(self) -> ActionResult:
kwargs = self.parameters
db_parameter_group = self.backend.create_db_parameter_group(kwargs)
result = {"DBParameterGroup": db_parameter_group}
return ActionResult(result)
def copy_db_parameter_group(self) -> ActionResult:
target_db_parameter_group = self.backend.copy_db_parameter_group(
**self.parameters
)
result = {"DBParameterGroup": target_db_parameter_group}
return ActionResult(result)
def describe_db_parameter_groups(self) -> ActionResult:
kwargs = self.parameters
db_parameter_groups = self.backend.describe_db_parameter_groups(kwargs)
db_parameter_groups, _ = self._paginate(db_parameter_groups)
result = {"DBParameterGroups": db_parameter_groups}
return ActionResult(result)
def modify_db_parameter_group(self) -> ActionResult:
db_parameter_group_name = self.parameters.get("DBParameterGroupName")
param_list = self.parameters.get("Parameters", [])
# Raw dict is stored on the backend, so we need the original PascalCase items.
db_parameter_group_parameters = [
dict(param.original_items()) for param in param_list
]
db_parameter_group = self.backend.modify_db_parameter_group(
db_parameter_group_name, db_parameter_group_parameters
)
result = {"DBParameterGroupName": db_parameter_group.name}
return ActionResult(result)
def describe_db_parameters(self) -> ActionResult:
db_parameter_group_name = self.parameters.get("DBParameterGroupName")
db_parameter_groups = self.backend.describe_db_parameter_groups(
{"db_parameter_group_name": db_parameter_group_name}
)
if not db_parameter_groups:
raise DBParameterGroupNotFoundError(db_parameter_group_name)
parameters = db_parameter_groups[0].parameters.values()
result = {"Parameters": parameters}
return ActionResult(result)
def delete_db_parameter_group(self) -> ActionResult:
name = self.parameters["DBParameterGroupName"]
db_parameter_group = self.backend.delete_db_parameter_group(name)
return ActionResult(db_parameter_group)
def describe_db_cluster_parameters(self) -> ActionResult:
# TODO: This never worked at all...
db_parameter_group_name = self.parameters.get("DBParameterGroupName")
db_parameter_groups = self.backend.describe_db_cluster_parameters()
if db_parameter_groups is None:
raise DBParameterGroupNotFoundError(db_parameter_group_name)
result = {"Parameters": db_parameter_groups}
return ActionResult(result)
def create_db_cluster(self) -> ActionResult:
kwargs = self.parameters
cluster = self.backend.create_db_cluster(kwargs)
result = {"DBCluster": cluster}
return ActionResult(result)
def modify_db_cluster(self) -> ActionResult:
kwargs = self.parameters
cluster = self.backend.modify_db_cluster(kwargs)
result = {"DBCluster": cluster}
return ActionResult(result)
def describe_db_clusters(self) -> ActionResult:
_id = self.parameters.get("DBClusterIdentifier")
filters = self.parameters.get("Filters", [])
filter_dict = {f["Name"]: f["Values"] for f in filters}
clusters = self.backend.describe_db_clusters(
db_cluster_identifier=_id, filters=filter_dict
)
result = {"DBClusters": clusters}
return ActionResult(result)
def delete_db_cluster(self) -> ActionResult:
_id = self.parameters.get("DBClusterIdentifier")
snapshot_name = self.parameters.get("FinalDBSnapshotIdentifier")
cluster = self.backend.delete_db_cluster(
cluster_identifier=_id, snapshot_name=snapshot_name
)
result = {"DBCluster": cluster}
return ActionResult(result)
def start_db_cluster(self) -> ActionResult:
_id = self.parameters.get("DBClusterIdentifier")
cluster = self.backend.start_db_cluster(cluster_identifier=_id)
result = {"DBCluster": cluster}
return ActionResult(result)
def stop_db_cluster(self) -> ActionResult:
_id = self.parameters.get("DBClusterIdentifier")
cluster = self.backend.stop_db_cluster(cluster_identifier=_id)
result = {"DBCluster": cluster}
return ActionResult(result)
def create_db_cluster_snapshot(self) -> ActionResult:
snapshot = self.backend.create_db_cluster_snapshot(**self.parameters)
result = {"DBClusterSnapshot": snapshot}
return ActionResult(result)
def copy_db_cluster_snapshot(self) -> ActionResult:
snapshot = self.backend.copy_db_cluster_snapshot(**self.parameters)
result = {"DBClusterSnapshot": snapshot}
return ActionResult(result)
def describe_db_cluster_snapshots(self) -> ActionResult:
db_cluster_identifier = self.parameters.get("DBClusterIdentifier")
db_snapshot_identifier = self.parameters.get("DBClusterSnapshotIdentifier")
snapshot_type = self.parameters.get("SnapshotType")
filters = self.parameters.get("Filters", [])
filter_dict = {f["Name"]: f["Values"] for f in filters}
snapshots = self.backend.describe_db_cluster_snapshots(
db_cluster_identifier, db_snapshot_identifier, snapshot_type, filter_dict
)
results = {"DBClusterSnapshots": snapshots}
return ActionResult(results)
def delete_db_cluster_snapshot(self) -> ActionResult:
db_snapshot_identifier = self.parameters["DBClusterSnapshotIdentifier"]
snapshot = self.backend.delete_db_cluster_snapshot(db_snapshot_identifier)
result = {"DBClusterSnapshot": snapshot}
return ActionResult(result)
def restore_db_cluster_from_snapshot(self) -> ActionResult:
db_snapshot_identifier = self.parameters.get("SnapshotIdentifier")
db_kwargs = self.parameters
new_cluster = self.backend.restore_db_cluster_from_snapshot(
db_snapshot_identifier, db_kwargs
)
result = {"DBCluster": new_cluster}
return ActionResult(result)
def start_export_task(self) -> ActionResult:
kwargs = self.parameters
export_task = self.backend.start_export_task(kwargs)
return ActionResult(export_task)
def cancel_export_task(self) -> ActionResult:
export_task_identifier = self.parameters.get("ExportTaskIdentifier")
export_task = self.backend.cancel_export_task(export_task_identifier)
return ActionResult(export_task)
def describe_export_tasks(self) -> ActionResult:
export_task_identifier = self.parameters.get("ExportTaskIdentifier")
tasks = self.backend.describe_export_tasks(export_task_identifier)
result = {"ExportTasks": tasks}
return ActionResult(result)
def create_event_subscription(self) -> ActionResult:
kwargs = self.parameters
subscription = self.backend.create_event_subscription(kwargs)
result = {"EventSubscription": subscription}
return ActionResult(result)
def delete_event_subscription(self) -> ActionResult:
subscription_name = self.parameters.get("SubscriptionName")
subscription = self.backend.delete_event_subscription(subscription_name)
result = {"EventSubscription": subscription}
return ActionResult(result)
def describe_event_subscriptions(self) -> ActionResult:
subscription_name = self.parameters.get("SubscriptionName")
subscriptions = self.backend.describe_event_subscriptions(subscription_name)
result = {"EventSubscriptionsList": subscriptions}
return ActionResult(result)
def describe_orderable_db_instance_options(self) -> ActionResult:
engine = self.parameters.get("Engine")
engine_version = self.parameters.get("EngineVersion")
options = self.backend.describe_orderable_db_instance_options(
engine, engine_version
)
result = {"OrderableDBInstanceOptions": options}
return ActionResult(result)
def describe_global_clusters(self) -> ActionResult:
clusters = self.global_backend.describe_global_clusters()
result = {"GlobalClusters": clusters}
return ActionResult(result)
def create_global_cluster(self) -> ActionResult:
params = self.parameters
cluster = self.global_backend.create_global_cluster(
global_cluster_identifier=params["GlobalClusterIdentifier"],
source_db_cluster_identifier=params.get("SourceDBClusterIdentifier"),
engine=params.get("Engine"),
engine_version=params.get("EngineVersion"),
storage_encrypted=params.get("StorageEncrypted"),
deletion_protection=params.get("DeletionProtection"),
)
result = {"GlobalCluster": cluster}
return ActionResult(result)
def delete_global_cluster(self) -> ActionResult:
params = self.parameters
cluster = self.global_backend.delete_global_cluster(
global_cluster_identifier=params["GlobalClusterIdentifier"],
)
result = {"GlobalCluster": cluster}
return ActionResult(result)
def remove_from_global_cluster(self) -> ActionResult:
params = self.parameters
global_cluster = self.backend.remove_from_global_cluster(
global_cluster_identifier=params["GlobalClusterIdentifier"],
db_cluster_identifier=params["DbClusterIdentifier"],
)
result = {"GlobalCluster": global_cluster}
return ActionResult(result)
def create_db_cluster_parameter_group(self) -> ActionResult:
group_name = self.parameters.get("DBClusterParameterGroupName")
family = self.parameters.get("DBParameterGroupFamily")
desc = self.parameters.get("Description")
db_cluster_parameter_group = self.backend.create_db_cluster_parameter_group(
group_name=group_name,
family=family,
description=desc,
)
result = {"DBClusterParameterGroup": db_cluster_parameter_group}
return ActionResult(result)
def describe_db_cluster_parameter_groups(self) -> ActionResult:
group_name = self.parameters.get("DBClusterParameterGroupName")
db_parameter_groups = self.backend.describe_db_cluster_parameter_groups(
group_name=group_name,
)
result = {"DBClusterParameterGroups": db_parameter_groups}
return ActionResult(result)
def delete_db_cluster_parameter_group(self) -> ActionResult:
group_name = self.parameters.get("DBClusterParameterGroupName")
self.backend.delete_db_cluster_parameter_group(
group_name=group_name,
)
return ActionResult({})
def promote_read_replica_db_cluster(self) -> ActionResult:
db_cluster_identifier = self.parameters.get("DBClusterIdentifier")
cluster = self.backend.promote_read_replica_db_cluster(db_cluster_identifier)
result = {"DBCluster": cluster}
return ActionResult(result)
def describe_db_snapshot_attributes(self) -> ActionResult:
params = self.parameters
db_snapshot_identifier = params["DBSnapshotIdentifier"]
db_snapshot_attributes_result = self.backend.describe_db_snapshot_attributes(
db_snapshot_identifier=db_snapshot_identifier,
)
result = {
"DBSnapshotAttributesResult": {
"DBSnapshotIdentifier": db_snapshot_identifier,
"DBSnapshotAttributes": [
{"AttributeName": name, "AttributeValues": values}
for name, values in db_snapshot_attributes_result.items()
],
}
}
return ActionResult(result)
def modify_db_snapshot_attribute(self) -> ActionResult:
params = self.parameters
db_snapshot_identifier = params["DBSnapshotIdentifier"]
db_snapshot_attributes_result = self.backend.modify_db_snapshot_attribute(
db_snapshot_identifier=db_snapshot_identifier,
attribute_name=params["AttributeName"],
values_to_add=params.get("ValuesToAdd"),
values_to_remove=params.get("ValuesToRemove"),
)
result = {
"DBSnapshotAttributesResult": {
"DBSnapshotIdentifier": db_snapshot_identifier,
"DBSnapshotAttributes": [
{"AttributeName": name, "AttributeValues": values}
for name, values in db_snapshot_attributes_result.items()
],
}
}
return ActionResult(result)
def describe_db_cluster_snapshot_attributes(self) -> ActionResult:
params = self.parameters
db_cluster_snapshot_identifier = params["DBClusterSnapshotIdentifier"]
db_cluster_snapshot_attributes_result = (
self.backend.describe_db_cluster_snapshot_attributes(
db_cluster_snapshot_identifier=db_cluster_snapshot_identifier,
)
)
result = {
"DBClusterSnapshotAttributesResult": {
"DBClusterSnapshotIdentifier": db_cluster_snapshot_identifier,
"DBClusterSnapshotAttributes": [
{"AttributeName": name, "AttributeValues": values}
for name, values in db_cluster_snapshot_attributes_result.items()
],
}
}
return ActionResult(result)
def modify_db_cluster_snapshot_attribute(self) -> ActionResult:
params = self.parameters
db_cluster_snapshot_identifier = params["DBClusterSnapshotIdentifier"]
db_cluster_snapshot_attributes_result = (
self.backend.modify_db_cluster_snapshot_attribute(
db_cluster_snapshot_identifier=db_cluster_snapshot_identifier,
attribute_name=params["AttributeName"],
values_to_add=params.get("ValuesToAdd"),
values_to_remove=params.get("ValuesToRemove"),
)
)
result = {
"DBClusterSnapshotAttributesResult": {
"DBClusterSnapshotIdentifier": db_cluster_snapshot_identifier,
"DBClusterSnapshotAttributes": [
{"AttributeName": name, "AttributeValues": values}
for name, values in db_cluster_snapshot_attributes_result.items()
],
}
}
return ActionResult(result)
def describe_db_proxies(self) -> ActionResult:
params = self.parameters
db_proxy_name = params.get("DBProxyName")
# filters = params.get("Filters")
marker = params.get("Marker")
db_proxies = self.backend.describe_db_proxies(
db_proxy_name=db_proxy_name,
# filters=filters,
)
result = {
"DBProxies": db_proxies,
"Marker": marker,
}
return ActionResult(result)
def create_db_proxy(self) -> ActionResult:
params = self.parameters
db_proxy_name = params["DBProxyName"]
engine_family = params["EngineFamily"]
auth = params["Auth"]
role_arn = params["RoleArn"]
vpc_subnet_ids = params["VpcSubnetIds"]
vpc_security_group_ids = params.get("VpcSecurityGroupIds")
require_tls = params.get("RequireTLS")
idle_client_timeout = params.get("IdleClientTimeout")
debug_logging = params.get("DebugLogging")
tags = self.parameters.get("Tags", [])
db_proxy = self.backend.create_db_proxy(
db_proxy_name=db_proxy_name,
engine_family=engine_family,
auth=auth,
role_arn=role_arn,
vpc_subnet_ids=vpc_subnet_ids,
vpc_security_group_ids=vpc_security_group_ids,
require_tls=require_tls,
idle_client_timeout=idle_client_timeout,
debug_logging=debug_logging,
tags=tags,
)
result = {"DBProxy": db_proxy}
return ActionResult(result)
def register_db_proxy_targets(self) -> ActionResult:
db_proxy_name = self.parameters.get("DBProxyName")
target_group_name = self.parameters.get("TargetGroupName")
db_cluster_identifiers = self.parameters.get("DBClusterIdentifiers", [])
db_instance_identifiers = self.parameters.get("DBInstanceIdentifiers", [])
targets = self.backend.register_db_proxy_targets(
db_proxy_name=db_proxy_name,
target_group_name=target_group_name,
db_cluster_identifiers=db_cluster_identifiers,
db_instance_identifiers=db_instance_identifiers,
)
result = {"DBProxyTargets": targets}
return ActionResult(result)
def deregister_db_proxy_targets(self) -> ActionResult:
db_proxy_name = self.parameters.get("DBProxyName")
target_group_name = self.parameters.get("TargetGroupName")
db_cluster_identifiers = self.parameters.get("DBClusterIdentifiers", [])
db_instance_identifiers = self.parameters.get("DBInstanceIdentifiers", [])
self.backend.deregister_db_proxy_targets(
db_proxy_name=db_proxy_name,
target_group_name=target_group_name,
db_cluster_identifiers=db_cluster_identifiers,
db_instance_identifiers=db_instance_identifiers,
)
return ActionResult({})
def describe_db_proxy_targets(self) -> ActionResult:
proxy_name = self.parameters.get("DBProxyName")
targets = self.backend.describe_db_proxy_targets(proxy_name=proxy_name)
result = {"Targets": targets}
return ActionResult(result)
def delete_db_proxy(self) -> ActionResult:
proxy_name = self.parameters.get("DBProxyName")
proxy = self.backend.delete_db_proxy(proxy_name=proxy_name)
result = {"DBProxy": proxy}
return ActionResult(result)
def describe_db_proxy_target_groups(self) -> ActionResult:
proxy_name = self.parameters.get("DBProxyName")
groups = self.backend.describe_db_proxy_target_groups(proxy_name=proxy_name)
result = {"TargetGroups": groups}
return ActionResult(result)
def modify_db_proxy_target_group(self) -> ActionResult:
proxy_name = self.parameters.get("DBProxyName")
config = self.parameters.get("ConnectionPoolConfig", {})
group = self.backend.modify_db_proxy_target_group(
proxy_name=proxy_name, config=config
)
result = {"DBProxyTargetGroup": group}
return ActionResult(result)
def describe_db_instance_automated_backups(self) -> ActionResult:
automated_backups = self.backend.describe_db_instance_automated_backups(
**self.parameters
)
result = {"DBInstanceAutomatedBackups": automated_backups}
return ActionResult(result)
def describe_events(self) -> ActionResult:
events = self.backend.describe_events(**self.parameters)
result = {"Events": events}
return ActionResult(result)
def describe_db_log_files(self) -> ActionResult:
log_files = self.backend.describe_db_log_files(**self.parameters)
result = {"DescribeDBLogFiles": log_files}
return ActionResult(result)
def _paginate(self, resources: List[Any]) -> Tuple[List[Any], Optional[str]]:
from moto.rds.exceptions import InvalidParameterValue
marker = self.parameters.get("Marker")
# Default was originally set to 50 instead of 100 for ease of testing. Should fix.
page_size = self.parameters.get("MaxRecords", 50)
if page_size < 20 or page_size > 100:
msg = (
f"Invalid value {page_size} for MaxRecords. Must be between 20 and 100"
)
raise InvalidParameterValue(msg)
all_resources = list(resources)
all_ids = [resource.resource_id for resource in all_resources]
if marker:
start = all_ids.index(marker) + 1
else:
start = 0
paginated_resources = all_resources[start : start + page_size]
next_marker = None
if len(all_resources) > start + page_size:
next_marker = paginated_resources[-1].resource_id
return paginated_resources, next_marker