import json from moto.core.responses import BaseResponse from .models import ( DBStorageType, DeploymentType, NetworkType, TimestreamInfluxDBBackend, timestreaminfluxdb_backends, ) class TimestreamInfluxDBResponse(BaseResponse): """Handler for TimestreamInfluxDB requests and responses.""" def __init__(self) -> None: super().__init__(service_name="timestream-influxdb") @property def timestreaminfluxdb_backend(self) -> TimestreamInfluxDBBackend: return timestreaminfluxdb_backends[self.current_account][self.region] def create_db_instance(self) -> str: params = json.loads(self.body) name = params.get("name") username = params.get("username") password = params.get("password") organization = params.get("organization") bucket = params.get("bucket") db_instance_type = params.get("dbInstanceType") vpc_subnet_ids = params.get("vpcSubnetIds") vpc_security_group_ids = params.get("vpcSecurityGroupIds") publicly_accessible = params.get("publiclyAccessible", False) db_storage_type = params.get("dbStorageType", DBStorageType.InfluxIOIncludedT1) allocated_storage = params.get("allocatedStorage") db_parameter_group_identifier = params.get("dbParameterGroupIdentifier") deployment_type = params.get("deploymentType", DeploymentType.SINGLE_AZ) log_delivery_configuration = params.get("logDeliveryConfiguration", {}) tags = params.get("tags", {}) port = int(params.get("port", 8086)) network_type = params.get("networkType", NetworkType.IPV4) created_instance = self.timestreaminfluxdb_backend.create_db_instance( name=name, username=username, password=password, organization=organization, bucket=bucket, db_instance_type=db_instance_type, vpc_subnet_ids=vpc_subnet_ids, vpc_security_group_ids=vpc_security_group_ids, publicly_accessible=publicly_accessible, db_storage_type=db_storage_type, allocated_storage=allocated_storage, db_parameter_group_identifier=db_parameter_group_identifier, deployment_type=deployment_type, log_delivery_configuration=log_delivery_configuration, tags=tags, port=port, network_type=network_type, ) return json.dumps(created_instance.to_dict()) def delete_db_instance(self) -> str: params = json.loads(self.body) id = params.get("identifier") deleted_instance = self.timestreaminfluxdb_backend.delete_db_instance(id=id) return json.dumps(deleted_instance.to_dict()) def get_db_instance(self) -> str: params = json.loads(self.body) id = params.get("identifier") instance = self.timestreaminfluxdb_backend.get_db_instance(id=id) return json.dumps(instance.to_dict()) def list_db_instances(self) -> str: """ Pagination is not yet implemented """ instances = self.timestreaminfluxdb_backend.list_db_instances() return json.dumps({"items": instances}) def tag_resource(self) -> str: params = json.loads(self.body) arn = params.get("resourceArn") tags = params.get("tags") self.timestreaminfluxdb_backend.tag_resource(resource_arn=arn, tags=tags) return "{}" def untag_resource(self) -> str: params = json.loads(self.body) arn = params.get("resourceArn") tag_keys = params.get("tagKeys") self.timestreaminfluxdb_backend.untag_resource( resource_arn=arn, tag_keys=tag_keys ) return "{}" def list_tags_for_resource(self) -> str: params = json.loads(self.body) arn = params.get("resourceArn") tags = self.timestreaminfluxdb_backend.list_tags_for_resource(resource_arn=arn) return json.dumps({"tags": tags})
Memory