"""Handles incoming memorydb requests, invokes methods, returns responses."""
import json
from moto.core.responses import BaseResponse
from .models import MemoryDBBackend, memorydb_backends
class MemoryDBResponse(BaseResponse):
    """Translate MemoryDB JSON requests into backend calls and JSON replies.

    Each action method decodes ``self.body`` as JSON, forwards the relevant
    request fields to the backend as keyword arguments (absent fields are
    passed as ``None``), and returns the backend result as a JSON string.
    """

    def __init__(self) -> None:
        super().__init__(service_name="memorydb")

    @property
    def memorydb_backend(self) -> MemoryDBBackend:
        """Backend looked up by the current account and region."""
        account_backends = memorydb_backends[self.current_account]
        return account_backends[self.region]

    def create_cluster(self) -> str:
        """Create a cluster; reply with its ``to_dict()`` under ``Cluster``."""
        request = json.loads(self.body)
        kwargs = {
            "cluster_name": request.get("ClusterName"),
            "node_type": request.get("NodeType"),
            "parameter_group_name": request.get("ParameterGroupName"),
            "description": request.get("Description"),
            "num_shards": request.get("NumShards"),
            "num_replicas_per_shard": request.get("NumReplicasPerShard"),
            "subnet_group_name": request.get("SubnetGroupName"),
            "security_group_ids": request.get("SecurityGroupIds"),
            "maintenance_window": request.get("MaintenanceWindow"),
            "port": request.get("Port"),
            "sns_topic_arn": request.get("SnsTopicArn"),
            "tls_enabled": request.get("TLSEnabled"),
            "kms_key_id": request.get("KmsKeyId"),
            "snapshot_arns": request.get("SnapshotArns"),
            "snapshot_name": request.get("SnapshotName"),
            "snapshot_retention_limit": request.get("SnapshotRetentionLimit"),
            "tags": request.get("Tags"),
            "snapshot_window": request.get("SnapshotWindow"),
            "acl_name": request.get("ACLName"),
            "engine_version": request.get("EngineVersion"),
            "auto_minor_version_upgrade": request.get("AutoMinorVersionUpgrade"),
            "data_tiering": request.get("DataTiering"),
        }
        created = self.memorydb_backend.create_cluster(**kwargs)
        return json.dumps({"Cluster": created.to_dict()})

    def create_subnet_group(self) -> str:
        """Create a subnet group; reply with it under ``SubnetGroup``."""
        request = json.loads(self.body)
        kwargs = {
            "subnet_group_name": request.get("SubnetGroupName"),
            "description": request.get("Description"),
            "subnet_ids": request.get("SubnetIds"),
            "tags": request.get("Tags"),
        }
        created = self.memorydb_backend.create_subnet_group(**kwargs)
        return json.dumps({"SubnetGroup": created.to_dict()})

    def create_snapshot(self) -> str:
        """Create a snapshot; reply with it under ``Snapshot``."""
        request = json.loads(self.body)
        kwargs = {
            "cluster_name": request.get("ClusterName"),
            "snapshot_name": request.get("SnapshotName"),
            "kms_key_id": request.get("KmsKeyId"),
            "tags": request.get("Tags"),
        }
        created = self.memorydb_backend.create_snapshot(**kwargs)
        return json.dumps({"Snapshot": created.to_dict()})

    def describe_clusters(self) -> str:
        """List clusters under ``Clusters``.

        Each cluster is rendered with ``to_desc_dict()`` when the request's
        ``ShowShardDetails`` is truthy, and with ``to_dict()`` otherwise.
        """
        request = json.loads(self.body)
        name = request.get("ClusterName")
        with_shards = request.get("ShowShardDetails")
        found = self.memorydb_backend.describe_clusters(cluster_name=name)
        if with_shards:
            rendered = [item.to_desc_dict() for item in found]
        else:
            rendered = [item.to_dict() for item in found]
        return json.dumps({"Clusters": rendered})

    def describe_snapshots(self) -> str:
        """List snapshots under ``Snapshots``.

        Each snapshot is rendered with ``to_desc_dict()`` when the request's
        ``ShowDetail`` is truthy, and with ``to_dict()`` otherwise.
        """
        request = json.loads(self.body)
        kwargs = {
            "cluster_name": request.get("ClusterName"),
            "snapshot_name": request.get("SnapshotName"),
            "source": request.get("Source"),
        }
        detailed = request.get("ShowDetail")
        found = self.memorydb_backend.describe_snapshots(**kwargs)
        if detailed:
            rendered = [item.to_desc_dict() for item in found]
        else:
            rendered = [item.to_dict() for item in found]
        return json.dumps({"Snapshots": rendered})

    def describe_subnet_groups(self) -> str:
        """List subnet groups under ``SubnetGroups``."""
        request = json.loads(self.body)
        name = request.get("SubnetGroupName")
        found = self.memorydb_backend.describe_subnet_groups(
            subnet_group_name=name
        )
        rendered = [group.to_dict() for group in found]
        return json.dumps({"SubnetGroups": rendered})

    def list_tags(self) -> str:
        """Reply with the backend's tag list for ``ResourceArn`` under ``TagList``."""
        request = json.loads(self.body)
        arn = request.get("ResourceArn")
        current_tags = self.memorydb_backend.list_tags(resource_arn=arn)
        return json.dumps({"TagList": current_tags})

    def tag_resource(self) -> str:
        """Pass ``Tags`` for ``ResourceArn`` to the backend; reply under ``TagList``."""
        request = json.loads(self.body)
        kwargs = {
            "resource_arn": request.get("ResourceArn"),
            "tags": request.get("Tags"),
        }
        resulting_tags = self.memorydb_backend.tag_resource(**kwargs)
        return json.dumps({"TagList": resulting_tags})

    def untag_resource(self) -> str:
        """Pass ``TagKeys`` for ``ResourceArn`` to the backend; reply under ``TagList``."""
        request = json.loads(self.body)
        kwargs = {
            "resource_arn": request.get("ResourceArn"),
            "tag_keys": request.get("TagKeys"),
        }
        resulting_tags = self.memorydb_backend.untag_resource(**kwargs)
        return json.dumps({"TagList": resulting_tags})

    def update_cluster(self) -> str:
        """Update a cluster; reply with its ``to_dict()`` under ``Cluster``."""
        request = json.loads(self.body)
        kwargs = {
            "cluster_name": request.get("ClusterName"),
            "description": request.get("Description"),
            "security_group_ids": request.get("SecurityGroupIds"),
            "maintenance_window": request.get("MaintenanceWindow"),
            "sns_topic_arn": request.get("SnsTopicArn"),
            "sns_topic_status": request.get("SnsTopicStatus"),
            "parameter_group_name": request.get("ParameterGroupName"),
            "snapshot_window": request.get("SnapshotWindow"),
            "snapshot_retention_limit": request.get("SnapshotRetentionLimit"),
            "node_type": request.get("NodeType"),
            "engine_version": request.get("EngineVersion"),
            "replica_configuration": request.get("ReplicaConfiguration"),
            "shard_configuration": request.get("ShardConfiguration"),
            "acl_name": request.get("ACLName"),
        }
        updated = self.memorydb_backend.update_cluster(**kwargs)
        return json.dumps({"Cluster": updated.to_dict()})

    def delete_cluster(self) -> str:
        """Delete a cluster; reply with the backend's result under ``Cluster``."""
        request = json.loads(self.body)
        kwargs = {
            "cluster_name": request.get("ClusterName"),
            "final_snapshot_name": request.get("FinalSnapshotName"),
        }
        removed = self.memorydb_backend.delete_cluster(**kwargs)
        return json.dumps({"Cluster": removed.to_dict()})

    def delete_snapshot(self) -> str:
        """Delete a snapshot; reply with the backend's result under ``Snapshot``."""
        request = json.loads(self.body)
        name = request.get("SnapshotName")
        removed = self.memorydb_backend.delete_snapshot(snapshot_name=name)
        return json.dumps({"Snapshot": removed.to_dict()})

    def delete_subnet_group(self) -> str:
        """Delete a subnet group; reply with the backend's result under ``SubnetGroup``."""
        request = json.loads(self.body)
        name = request.get("SubnetGroupName")
        removed = self.memorydb_backend.delete_subnet_group(
            subnet_group_name=name
        )
        return json.dumps({"SubnetGroup": removed.to_dict()})