"""TimestreamQueryBackend class with methods for supported APIs.""" from typing import Any, Dict, List, Optional, Union from uuid import uuid4 from moto.core.base_backend import BackendDict, BaseBackend from moto.core.common_models import BaseModel from moto.core.utils import unix_time from moto.utilities.utils import get_partition from .exceptions import ResourceNotFound class ScheduledQuery(BaseModel): def __init__( self, account_id: str, region_name: str, name: str, query_string: str, schedule_configuration: Dict[str, str], notification_configuration: Dict[str, Dict[str, str]], target_configuration: Optional[Dict[str, Any]], scheduled_query_execution_role_arn: str, tags: Optional[List[Dict[str, str]]], kms_key_id: Optional[str], error_report_configuration: Optional[Dict[str, Dict[str, str]]], ): self.account_id = account_id self.region_name = region_name self.name = name self.query_string = query_string self.schedule_configuration = schedule_configuration self.notification_configuration = notification_configuration self.target_configuration = target_configuration self.scheduled_query_execution_role_arn = scheduled_query_execution_role_arn self.tags = tags self.kms_key_id = kms_key_id self.error_report_configuration = error_report_configuration self.created_on = unix_time() self.updated_on = unix_time() self.arn = f"arn:{get_partition(region_name)}:timestream:{region_name}:{account_id}:scheduled-query/{name}" self.state = "ENABLED" def description(self) -> Dict[str, Any]: return { "Arn": self.arn, "Name": self.name, "CreationTime": self.created_on, "State": self.state, "QueryString": self.query_string, "ScheduleConfiguration": self.schedule_configuration, "NotificationConfiguration": self.notification_configuration, "TargetConfiguration": self.target_configuration, "ScheduledQueryExecutionRoleArn": self.scheduled_query_execution_role_arn, "KmsKeyId": self.kms_key_id, "ErrorReportConfiguration": self.error_report_configuration, } class TimestreamQueryBackend(BaseBackend): """Implementation of TimestreamQuery APIs.""" def __init__(self, region_name: str, account_id: str): super().__init__(region_name, account_id) self.scheduled_queries: Dict[str, ScheduledQuery] = {} self.query_result_queue: Dict[Optional[str], List[Dict[str, Any]]] = {} self.query_results: Dict[str, Dict[str, Any]] = {} def create_scheduled_query( self, name: str, query_string: str, schedule_configuration: Dict[str, str], notification_configuration: Dict[str, Dict[str, str]], target_configuration: Optional[Dict[str, Any]], scheduled_query_execution_role_arn: str, tags: Optional[List[Dict[str, str]]], kms_key_id: Optional[str], error_report_configuration: Dict[str, Dict[str, str]], ) -> ScheduledQuery: query = ScheduledQuery( account_id=self.account_id, region_name=self.region_name, name=name, query_string=query_string, schedule_configuration=schedule_configuration, notification_configuration=notification_configuration, target_configuration=target_configuration, scheduled_query_execution_role_arn=scheduled_query_execution_role_arn, tags=tags, kms_key_id=kms_key_id, error_report_configuration=error_report_configuration, ) self.scheduled_queries[query.arn] = query return query def delete_scheduled_query(self, scheduled_query_arn: str) -> None: self.scheduled_queries.pop(scheduled_query_arn, None) def describe_scheduled_query(self, scheduled_query_arn: str) -> ScheduledQuery: if scheduled_query_arn not in self.scheduled_queries: raise ResourceNotFound(scheduled_query_arn) return self.scheduled_queries[scheduled_query_arn] def update_scheduled_query(self, scheduled_query_arn: str, state: str) -> None: query = self.scheduled_queries[scheduled_query_arn] query.state = state def query(self, query_string: str) -> Dict[str, Any]: """ Moto does not have a builtin time-series Database, so calling this endpoint will return zero results by default. You can use a dedicated API to configuring a queue of expected results. An example invocation looks like this: .. sourcecode:: python first_result = { 'QueryId': 'some_id', 'Rows': [...], 'ColumnInfo': [...], 'QueryStatus': ... } result_for_unknown_query_string = { 'QueryId': 'unknown', 'Rows': [...], 'ColumnInfo': [...], 'QueryStatus': ... } expected_results = { "account_id": "123456789012", # This is the default - can be omitted "region": "us-east-1", # This is the default - can be omitted "results": { # Use the exact querystring, and a list of results for it # For example "SELECT data FROM mytable": [first_result, ...], # Use None if the exact querystring is unknown/irrelevant None: [result_for_unknown_query_string, ...], } } requests.post( "http://motoapi.amazonaws.com/moto-api/static/timestream/query-results", json=expected_results, ) When calling `query(QueryString='SELECT data FROM mytable')`, the `first_result` will be returned. Call the query again for the second result, and so on. If you don't know the exact query strings, use the `None`-key. In the above example, when calling `SELECT something FROM unknown`, there are no results for that specific query, so `result_for_unknown_query_string` will be returned. Results for unknown queries are cached, so calling `SELECT something FROM unknown` will return the same result. """ if self.query_result_queue.get(query_string): return self.query_result_queue[query_string].pop(0) if result := self.query_results.get(query_string): return result if self.query_result_queue.get(None): self.query_results[query_string] = self.query_result_queue[None].pop(0) return self.query_results[query_string] return {"QueryId": str(uuid4()), "Rows": [], "ColumnInfo": []} def describe_endpoints(self) -> List[Dict[str, Union[str, int]]]: # https://docs.aws.amazon.com/timestream/latest/developerguide/Using-API.endpoint-discovery.how-it-works.html # Usually, the address look like this: # query-cell1.timestream.us-east-1.amazonaws.com # Where 'cell1' can be any number, 'cell2', 'cell3', etc - whichever endpoint happens to be available for that particular account # We don't implement a cellular architecture in Moto though, so let's keep it simple return [ { "Address": f"query.timestream.{self.region_name}.amazonaws.com", "CachePeriodInMinutes": 1440, } ] timestreamquery_backends = BackendDict( TimestreamQueryBackend, "timestream-query", additional_regions=[ "us-east-1", "us-east-2", "us-west-2", "eu-central-1", "eu-west-1", "ap-southeast-2", "ap-northeast-1", ], )
Memory