from __future__ import annotations import random from datetime import datetime from typing import Any, Dict, List, Optional from moto.core.base_backend import BackendDict, BaseBackend from moto.core.common_models import BaseModel from moto.moto_api._internal import mock_random from moto.utilities.tagging_service import TaggingService FAKE_VPC_ID = "vpc-0123456789abcdef0" class Application(BaseModel): def __init__( self, account_id: str, region_name: str, application_name: str, application_description: Optional[str], runtime_environment: str, service_execution_role: str, application_configuration: Optional[Dict[str, Any]], cloud_watch_logging_options: Optional[List[Dict[str, str]]], application_mode: Optional[str], ): self.account_id = account_id self.region_name = region_name self.application_name = application_name self.application_description = application_description self.runtime_environment = runtime_environment self.service_execution_role = service_execution_role self.application_mode = application_mode self.app_config_description = ( self._generate_app_config_description(application_configuration) if application_configuration else None ) self.cloud_watch_logging_description = self._generate_logging_options( cloud_watch_logging_options ) self.application_arn = self._generate_arn() self.application_status = "STARTING" self.application_version_id = 1 self.creation_date_time = datetime.now().isoformat() self.last_updated_date_time = datetime.now().isoformat() self.conditional_token = str(mock_random.uuid4()).replace("-", "") def _generate_arn(self) -> str: return f"arn:aws:kinesisanalytics:{self.region_name}:{self.account_id}:application/{self.application_name}" def _generate_logging_options( self, cloud_watch_logging_options: Optional[List[Dict[str, str]]] ) -> List[Dict[str, str]] | None: cloud_watch_logging_option_descriptions = [] option_id = f"{str(random.randint(1,100))}.1" # Leaving out RoleARN since it is provided only sometimes for backwards # compatibility. Current API versions do not have the resource-level # role. if cloud_watch_logging_options: for i in cloud_watch_logging_options: cloud_watch_logging_option_descriptions.append( { "CloudWatchLoggingOptionId": option_id, "LogStreamARN": i["LogStreamARN"], } ) return cloud_watch_logging_option_descriptions else: return None # The app_config description does not include # - "SqlApplicationConfigurationDescription" (discontinued) # - "RunConfigurationDescription" (which requires start_application) def _generate_app_config_description( self, app_config: Dict[str, Any] ) -> Dict[str, Any]: # Keys that do not have extra values in the description besides renamed keys UPDATABLE_APP_CONFIG_TOP_LEVEL_KEYS = { "EnvironmentProperties": "EnvironmentPropertyDescriptions", "ApplicationSnapshotConfiguration": "ApplicationSnapshotConfigurationDescription", "ApplicationSystemRollbackConfiguration": "ApplicationSystemRollbackConfigurationDescription", "ZeppelinApplicationConfiguration": "ZeppelinApplicationConfigurationDescription", } APP_CONFIG_SUBFIELD_KEYS = { "PropertyGroups": "PropertyGroupDescriptions", "MonitoringConfiguration": "MonitoringConfigurationDescription", "CatalogConfiguration": "CatalogConfigurationDescription", "DeployAsApplicationConfiguration": "DeployAsApplicationConfigurationDescription", "S3ContentLocation": "S3ContentLocationDescription", "CustomArtifactsConfiguration": "CustomArtifactsConfigurationDescription", "GlueDataCatalogConfiguration": "GlueDataCatalogConfigurationDescription", "MavenReference": "MavenReferenceDescription", } app_config_description = {} if app_config: if "FlinkApplicationConfiguration" in app_config: app_config_description["FlinkApplicationConfigurationDescription"] = ( self.__generate_flink_app_description(app_config) ) for old_key, new_key in UPDATABLE_APP_CONFIG_TOP_LEVEL_KEYS.items(): if old_key in app_config: app_config_description[new_key] = self.__update_keys( app_config[old_key], APP_CONFIG_SUBFIELD_KEYS ) app_code_config = app_config.get("ApplicationCodeConfiguration") if app_code_config: new_key = "ApplicationCodeConfigurationDescription" # S3ContentLocation has a different value, so keeping it # separate from APP_CONFIG_SUBFIELD_KEYS app_code_config_keys = { "S3ContentLocation": "S3ApplicationCodeLocationDescription", "CodeContent": "CodeContentDescription", } app_config_description[new_key] = self.__update_keys( app_code_config, app_code_config_keys ) if app_code_config["CodeContentType"] == "ZIPFILE": app_config_description[new_key]["CodeContentDescription"][ "CodeMD5" ] = "fakechecksum" app_config_description[new_key]["CodeContentDescription"][ "CodeSize" ] = 123 if "VpcConfigurations" in app_config: app_config_description["VpcConfigurationDescriptions"] = app_config[ "VpcConfigurations" ] for index, vpc_config in enumerate( app_config_description["VpcConfigurationDescriptions"] ): vpc_config["VpcConfigurationId"] = str(index + 1.1) # type: ignore[index] # FAKE_VPC_ID hardcoded, not a value from the parameters vpc_config["VpcId"] = FAKE_VPC_ID # type: ignore[index] return app_config_description def __generate_flink_app_description( self, app_config: Dict[str, Any] ) -> Dict[str, Any]: flink_config_description = {} flink_config = app_config.get("FlinkApplicationConfiguration") if flink_config and isinstance(flink_config, dict): checkpoint_config = flink_config.get("CheckpointConfiguration") if checkpoint_config and isinstance(checkpoint_config, dict): if checkpoint_config.get("ConfigurationType") == "DEFAULT": flink_config_description["CheckpointConfigurationDescription"] = { "ConfigurationType": "DEFAULT", "CheckpointingEnabled": True, "CheckpointInterval": 60000, "MinPauseBetweenCheckpoints": 5000, } elif checkpoint_config.get("ConfigurationType") == "CUSTOM": flink_config_description["CheckpointConfigurationDescription"] = { "ConfigurationType": "CUSTOM", "CheckpointingEnabled": checkpoint_config.get( "CheckpointingEnabled", True ), "CheckpointInterval": checkpoint_config.get( "CheckpointInterval", 60000 ), "MinPauseBetweenCheckpoints": checkpoint_config.get( "MinPauseBetweenCheckpoints", 5000 ), } monitoring_config = flink_config.get("MonitoringConfiguration") if monitoring_config and isinstance(monitoring_config, dict): if monitoring_config.get("ConfigurationType") == "DEFAULT": flink_config_description["MonitoringConfigurationDescription"] = { "ConfigurationType": "DEFAULT", "MetricsLevel": "APPLICATION", "LogLevel": "INFO", } elif monitoring_config.get("ConfigurationType") == "CUSTOM": flink_config_description["MonitoringConfigurationDescription"] = { "ConfigurationType": "CUSTOM", "MetricsLevel": monitoring_config.get( "MetricsLevel", "APPLICATION" ), "LogLevel": monitoring_config.get("LogLevel", "INFO"), } parallel_config = flink_config.get("ParallelismConfiguration") if monitoring_config and isinstance(parallel_config, dict): if parallel_config.get("ConfigurationType") == "DEFAULT": flink_config_description["ParallelismConfigurationDescription"] = { "ConfigurationType": "DEFAULT", "Parallelism": 1, "ParallelismPerKPU": 1, "AutoScalingEnabled": False, "CurrentParallelism": 1, } elif parallel_config.get("ConfigurationType") == "CUSTOM": flink_config_description["ParallelismConfigurationDescription"] = { "ConfigurationType": "CUSTOM", "Parallelism": parallel_config.get("Parallelism", 1), "ParallelismPerKPU": parallel_config.get( "ParallelismPerKPU", 1 ), "AutoScalingEnabled": parallel_config.get( "AutoScalingEnabled", False ), "CurrentParallelism": parallel_config.get("Parallelism", 1), } return flink_config_description def __update_keys(self, old_dict: Any, key_map: Dict[str, str]) -> Any: if not isinstance(old_dict, dict): return old_dict updated_dict = {} for old_key, value in old_dict.items(): # Check if the current key is in key_map, else keep old_key new_key = key_map.get(old_key, old_key) if isinstance(value, dict): updated_dict[new_key] = self.__update_keys(value, key_map) elif isinstance(value, list): updated_dict[new_key] = [ self.__update_keys(list_item, key_map) for list_item in value ] else: updated_dict[new_key] = value return updated_dict class KinesisAnalyticsV2Backend(BaseBackend): """Implementation of KinesisAnalyticsV2 APIs.""" def __init__(self, region_name: str, account_id: str) -> None: super().__init__(region_name, account_id) self.applications: Dict[str, Application] = {} self.tagger = TaggingService( tag_name="Tags", key_name="Key", value_name="Value" ) def create_application( self, application_name: str, application_description: Optional[str], runtime_environment: str, service_execution_role: str, application_configuration: Optional[Dict[str, Any]], cloud_watch_logging_options: Optional[List[Dict[str, str]]], tags: Optional[List[Dict[str, str]]], application_mode: Optional[str], ) -> Dict[str, Any]: app = Application( account_id=self.account_id, region_name=self.region_name, application_name=application_name, application_description=application_description, runtime_environment=runtime_environment, service_execution_role=service_execution_role, application_configuration=application_configuration, cloud_watch_logging_options=cloud_watch_logging_options, application_mode=application_mode, ) self.applications[application_name] = app if tags: self.tag_resource(resource_arn=app.application_arn, tags=tags) return { "ApplicationARN": app.application_arn, "ApplicationDescription": app.application_description, "RuntimeEnvironment": app.runtime_environment, "ServiceExecutionRole": app.service_execution_role, "ApplicationStatus": app.application_status, "ApplicationVersionId": app.application_version_id, "CreateTimestamp": app.creation_date_time, "LastUpdateTimestamp": app.last_updated_date_time, "ApplicationConfigurationDescription": app.app_config_description, "CloudWatchLoggingOptionDescriptions": app.cloud_watch_logging_description, "ApplicationMaintenanceConfigurationDescription": { "ApplicationMaintenanceWindowStartTime": "06:00", "ApplicationMaintenanceWindowEndTime": "14:00", }, "ApplicationVersionCreateTimestamp": str(app.creation_date_time), "ConditionalToken": app.conditional_token, "ApplicationMode": app.application_mode, } def tag_resource(self, resource_arn: str, tags: List[Dict[str, str]]) -> None: self.tagger.tag_resource(resource_arn, tags) def list_tags_for_resource(self, resource_arn: str) -> List[Dict[str, str]]: return self.tagger.list_tags_for_resource(resource_arn)["Tags"] def describe_application( self, application_name: str, ) -> Dict[str, Any]: app = self.applications[application_name] return { "ApplicationARN": app.application_arn, "ApplicationDescription": app.application_description, "RuntimeEnvironment": app.runtime_environment, "ServiceExecutionRole": app.service_execution_role, "ApplicationStatus": app.application_status, "ApplicationVersionId": app.application_version_id, "CreateTimestamp": app.creation_date_time, "LastUpdateTimestamp": app.last_updated_date_time, "ApplicationConfigurationDescription": app.app_config_description, "CloudWatchLoggingOptionDescriptions": app.cloud_watch_logging_description, "ApplicationMaintenanceConfigurationDescription": { "ApplicationMaintenanceWindowStartTime": "06:00", "ApplicationMaintenanceWindowEndTime": "14:00", }, "ApplicationVersionCreateTimestamp": str(app.creation_date_time), "ConditionalToken": app.conditional_token, "ApplicationMode": app.application_mode, } def list_applications(self) -> List[Dict[str, Any]]: application_summaries = [ { "ApplicationName": app.application_name, "ApplicationARN": app.application_arn, "ApplicationStatus": app.application_status, "ApplicationVersionId": app.application_version_id, "RuntimeEnvironment": app.runtime_environment, "ApplicationMode": app.application_mode, } for app in self.applications.values() ] return application_summaries kinesisanalyticsv2_backends = BackendDict( KinesisAnalyticsV2Backend, "kinesisanalyticsv2" )
Memory