"""Handles incoming kinesisanalyticsv2 requests, invokes methods, returns responses.""" import json from moto.core.responses import BaseResponse from .models import KinesisAnalyticsV2Backend, kinesisanalyticsv2_backends class KinesisAnalyticsV2Response(BaseResponse): """Handler for KinesisAnalyticsV2 requests and responses.""" def __init__(self) -> None: super().__init__(service_name="kinesisanalyticsv2") @property def kinesisanalyticsv2_backend(self) -> KinesisAnalyticsV2Backend: """Return backend instance specific for this region.""" return kinesisanalyticsv2_backends[self.current_account][self.region] def create_application(self) -> str: application_name = self._get_param("ApplicationName") application_description = self._get_param("ApplicationDescription") runtime_environment = self._get_param("RuntimeEnvironment") service_execution_role = self._get_param("ServiceExecutionRole") application_configuration = self._get_param("ApplicationConfiguration") cloud_watch_logging_options = self._get_param("CloudWatchLoggingOptions") tags = self._get_param("Tags") application_mode = self._get_param("ApplicationMode") application_detail = self.kinesisanalyticsv2_backend.create_application( application_name=application_name, application_description=application_description, runtime_environment=runtime_environment, service_execution_role=service_execution_role, application_configuration=application_configuration, cloud_watch_logging_options=cloud_watch_logging_options, tags=tags, application_mode=application_mode, ) return json.dumps(dict(ApplicationDetail=application_detail)) def list_tags_for_resource(self) -> str: params = json.loads(self.body) resource_arn = params.get("ResourceARN") tags = self.kinesisanalyticsv2_backend.list_tags_for_resource( resource_arn=resource_arn, ) return json.dumps(dict(Tags=tags)) def tag_resource(self) -> str: params = json.loads(self.body) resource_arn = params.get("ResourceARN") tags = params.get("Tags") self.kinesisanalyticsv2_backend.tag_resource( resource_arn=resource_arn, tags=tags ) return json.dumps(dict()) def describe_application(self) -> str: application_name = self._get_param("ApplicationName") application_detail = self.kinesisanalyticsv2_backend.describe_application( application_name=application_name, ) return json.dumps(dict(ApplicationDetail=application_detail)) def list_applications(self) -> str: application_summaries = self.kinesisanalyticsv2_backend.list_applications() return json.dumps(dict(ApplicationSummaries=application_summaries))
Memory