"""TimestreamInfluxDBBackend class with methods for supported APIs.""" from enum import Enum from typing import Any, Dict, List, Optional from moto.core.base_backend import BackendDict, BaseBackend from moto.core.common_models import BaseModel from moto.utilities.tagging_service import TaggingService from .exceptions import ( ConflictException, ResourceNotFoundException, ValidationException, ) from .utils import random_id, validate_name class InstanceStatus(str, Enum): CREATING = "CREATING" AVAILABLE = "AVAILABLE" DELETING = "DELETING" MODIFYING = "MODIFYING" UPDATING = "UPDATING" DELETED = "DELETED" FAILED = "FAILED" UPDATING_DEPLOYMENT_TYPE = "UPDATING_DEPLOYMENT_TYPE" UPDATING_INSTANCE_TYPE = "UPDATING_INSTANCE_TYPE" class NetworkType(str, Enum): IPV4 = "IPV4" DUAL = "DUAL" class InstanceType(str, Enum): DB_INFLUX_MEDIUM = "db.influx.medium" DB_INFLUX_LARGE = "db.influx.large" DB_INFLUX_XLARGE = "db.influx.xlarge" DB_INFLUX_2XLARGE = "db.influx.2xlarge" DB_INFLUX_4XLARGE = "db.influx.4xlarge" DB_INFLUX_8XLARGE = "db.influx.8xlarge" DB_INFLUX_12XLARGE = "db.influx.12xlarge" DB_INFLUX_16XLARGE = "db.influx.16xlarge" class DBStorageType(str, Enum): InfluxIOIncludedT1 = "InfluxIOIncludedT1" InfluxIOIncludedT2 = "InfluxIOIncludedT2" InfluxIOIncludedT3 = "InfluxIOIncludedT3" class DeploymentType(str, Enum): SINGLE_AZ = "SINGLE_AZ" WITH_MULTIAZ_STANDBY = "WITH_MULTIAZ_STANDBY" class DBInstance(BaseModel): def __init__( self, name: str, username: Optional[str], password: str, organization: str, bucket: str, dbInstanceType: str, vpcSubnetIds: List[str], vpcSecurityGroupIds: List[str], publiclyAccessible: bool, dbStorageType: str, allocatedStorage: int, dbParameterGroupIdentifier: Optional[str], deploymentType: str, logDeliveryConfiguration: Optional[Dict[str, Any]], tags: Optional[Dict[str, Any]], port: int, networkType: str, region_name: str, account_id: str, endpoint_id: str, ): # Generate a random id of size 10 self.id = random_id() self.name = name self.username = username self.password = password self.organization = organization self.bucket = bucket self.db_instance_type = dbInstanceType self.vpc_subnet_ids = vpcSubnetIds self.vpc_security_group_ids = vpcSecurityGroupIds self.publicly_accessible = publiclyAccessible self.db_storage_type = dbStorageType self.allocated_storage = allocatedStorage self.db_parameter_group_id = dbParameterGroupIdentifier self.deployment_type = deploymentType self.log_delivery_configuration = logDeliveryConfiguration self.port = port self.network_type = networkType self.status = InstanceStatus.CREATING self.arn = f"arn:aws:timestream-influxdb:{region_name}:{account_id}:db-instance/{self.id}" self.endpoint = ( f"{self.id}-{endpoint_id}.timestream-influxdb.{region_name}.on.aws" ) # Before 12/09/2024, there was a different endpoint format. self.endpoint_old = ( f"{self.name}-{endpoint_id}.timestream-influxdb.{region_name}.on.aws" ) self.availability_zone = "" # TODO implement this self.secondary_availability_zone = "" # TODO implement this def to_dict(self) -> Dict[str, Any]: return { "id": self.id, "name": self.name, "arn": self.arn, "status": self.status, "endpoint": self.endpoint, "port": self.port, "networkType": self.network_type, "dbInstanceType": self.db_instance_type, "dbStorageType": self.db_storage_type, "allocatedStorage": self.allocated_storage, "deploymentType": self.deployment_type, "vpcSubnetIds": self.vpc_subnet_ids, "publiclyAccessible": self.publicly_accessible, "vpcSecurityGroupIds": self.vpc_security_group_ids, "dbParameterGroupIdentifier": self.db_parameter_group_id, # TODO implement this "availabilityZone": self.availability_zone, # TODO implement this "secondaryAvailabilityZone": self.secondary_availability_zone, # TODO implement this "logDeliveryConfiguration": self.log_delivery_configuration, # TODO implement this "influxAuthParametersSecretArn": "", # TODO implement this } class TimestreamInfluxDBBackend(BaseBackend): """Implementation of TimestreamInfluxDB APIs.""" def __init__(self, region_name: str, account_id: str): super().__init__(region_name, account_id) # the endpoint identifier is unique per account and per region # https://docs.aws.amazon.com/timestream/latest/developerguide/timestream-for-influxdb.html self.endpoint_id: str = random_id(10) self.db_instances: Dict[str, DBInstance] = {} self.tagger = TaggingService() def create_db_instance( self, name: str, username: Optional[str], # required if using InfluxDB UI though password: str, organization: str, bucket: str, db_instance_type: str, vpc_subnet_ids: List[str], vpc_security_group_ids: List[str], db_storage_type: str, publicly_accessible: bool, allocated_storage: int, db_parameter_group_identifier: str, deployment_type: str, log_delivery_configuration: Optional[Dict[str, Any]], tags: Optional[Dict[str, str]], port: int, network_type: str, ) -> DBInstance: """ dbParameterGroupIdentifier argument is not yet handled deploymentType currently is auto set to 'SINGLE_AZ' if not passed in. publicAccessible is not yet handled logDeliveryConfiguration is not yet handled AvailabilityZone and SecondaryAvailabilityZone are not yet handled influxAuthParametersSecretArn is not yet handled """ # Checks: for db_instance in self.db_instances.values(): if db_instance.name == name: raise ConflictException( f"A DB Instance with the name {name} already exists" ) validate_name(name) if db_storage_type not in [t.value for t in DBStorageType]: raise ValidationException(f"Unknown DB storage type {db_storage_type}") if db_instance_type not in [t.value for t in InstanceType]: raise ValidationException(f"Unknown DB instance type {db_instance_type}") new_instance = DBInstance( name, username, password, organization, bucket, db_instance_type, vpc_subnet_ids, vpc_security_group_ids, publicly_accessible, db_storage_type, allocated_storage, db_parameter_group_identifier, deployment_type, log_delivery_configuration, tags, port, network_type, self.region_name, self.account_id, self.endpoint_id, ) # add to the list self.db_instances[new_instance.id] = new_instance # add tags if tags: self.tag_resource(new_instance.arn, tags) return new_instance def delete_db_instance(self, id: str) -> DBInstance: if id not in self.db_instances: raise ResourceNotFoundException(f"DB Instance with id {id} not found") # mark as deleting self.db_instances[id].status = InstanceStatus.DELETING return self.db_instances.pop(id) def get_db_instance(self, id: str) -> DBInstance: if id not in self.db_instances: raise ResourceNotFoundException(f"DB Instance with id {id} not found") return self.db_instances[id] def list_db_instances(self) -> List[Dict[str, Any]]: """ Pagination is not yet implemented """ return [ { "allocatedStorage": instance.allocated_storage, "arn": instance.arn, "dbInstanceType": instance.db_instance_type, "dbStorageType": instance.db_storage_type, "deploymentType": instance.deployment_type, "endpoint": instance.endpoint, "id": instance.id, "name": instance.name, "networkType": instance.network_type, "port": instance.port, "status": instance.status, } for instance in self.db_instances.values() ] def tag_resource(self, resource_arn: str, tags: Dict[str, str]) -> None: tag_list = self.tagger.convert_dict_to_tags_input(tags) errmsg = self.tagger.validate_tags(tag_list) if errmsg: raise ValidationException(errmsg) self.tagger.tag_resource(resource_arn, tag_list) def untag_resource(self, resource_arn: str, tag_keys: List[str]) -> None: self.tagger.untag_resource_using_names(resource_arn, tag_keys) def list_tags_for_resource(self, resource_arn: str) -> Dict[str, str]: return self.tagger.get_tag_dict_for_resource(resource_arn) timestreaminfluxdb_backends = BackendDict( TimestreamInfluxDBBackend, "timestream-influxdb", additional_regions=[ "us-east-1", "us-east-2", "us-west-2", "eu-central-1", "eu-west-1", "ap-southeast-2", "ap-northeast-1", ], )
Memory