"""Handles incoming dsql requests, invokes methods, returns responses.""" import json from moto.core.responses import BaseResponse from .models import AuroraDSQLBackend, dsql_backends class AuroraDSQLResponse(BaseResponse): """Handler for AuroraDSQL requests and responses.""" def __init__(self) -> None: super().__init__(service_name="dsql") @property def dsql_backend(self) -> AuroraDSQLBackend: """Return backend instance specific for this region.""" return dsql_backends[self.current_account][self.region] def create_cluster(self) -> str: params = self._get_params() deletion_protection_enabled = params.get("deletionProtectionEnabled", True) tags = params.get("tags") client_token = params.get("clientToken") cluster = self.dsql_backend.create_cluster( deletion_protection_enabled=deletion_protection_enabled, tags=tags, client_token=client_token, ) return json.dumps(dict(cluster.to_dict())) def get_cluster(self) -> str: identifier = self.path.split("/")[-1] cluster = self.dsql_backend.get_cluster(identifier=identifier) return json.dumps(dict(cluster.to_dict()))
Memory