"""Handles incoming osis requests, invokes methods, returns responses.""" import json from moto.core.responses import BaseResponse from .models import OpenSearchIngestionBackend, osis_backends class OpenSearchIngestionResponse(BaseResponse): """Handler for OpenSearchIngestion requests and responses.""" def __init__(self) -> None: super().__init__(service_name="osis") @property def osis_backend(self) -> OpenSearchIngestionBackend: """Return backend instance specific for this region.""" return osis_backends[self.current_account][self.region] def create_pipeline(self) -> str: params = json.loads(self.body) pipeline_name = params.get("PipelineName") min_units = params.get("MinUnits") max_units = params.get("MaxUnits") pipeline_configuration_body = params.get("PipelineConfigurationBody") log_publishing_options = params.get("LogPublishingOptions") vpc_options = params.get("VpcOptions") buffer_options = params.get("BufferOptions") encryption_at_rest_options = params.get("EncryptionAtRestOptions") tags = params.get("Tags") pipeline = self.osis_backend.create_pipeline( pipeline_name=pipeline_name, min_units=min_units, max_units=max_units, pipeline_configuration_body=pipeline_configuration_body, log_publishing_options=log_publishing_options, vpc_options=vpc_options, buffer_options=buffer_options, encryption_at_rest_options=encryption_at_rest_options, tags=tags, ) return json.dumps(dict(Pipeline=pipeline.to_dict())) def delete_pipeline(self) -> str: pipeline_name = self._get_param("PipelineName") self.osis_backend.delete_pipeline( pipeline_name=pipeline_name, ) return json.dumps(dict()) def get_pipeline(self) -> str: pipeline_name = self._get_param("PipelineName") pipeline = self.osis_backend.get_pipeline( pipeline_name=pipeline_name, ) return json.dumps(dict(Pipeline=pipeline.to_dict())) def list_pipelines(self) -> str: max_results = self._get_int_param("MaxResults") next_token = self._get_param("NextToken") pipelines, next_token = self.osis_backend.list_pipelines( max_results=max_results, next_token=next_token, ) return json.dumps( dict(nextToken=next_token, Pipelines=[p.to_short_dict() for p in pipelines]) ) def list_tags_for_resource(self) -> str: arn = self._get_param("arn") tags = self.osis_backend.list_tags_for_resource(arn=arn) return json.dumps(dict(tags)) def update_pipeline(self) -> str: params = json.loads(self.body) pipeline_name = self.path.split("/")[-1] min_units = params.get("MinUnits") max_units = params.get("MaxUnits") pipeline_configuration_body = params.get("PipelineConfigurationBody") log_publishing_options = params.get("LogPublishingOptions") buffer_options = params.get("BufferOptions") encryption_at_rest_options = params.get("EncryptionAtRestOptions") pipeline = self.osis_backend.update_pipeline( pipeline_name=pipeline_name, min_units=min_units, max_units=max_units, pipeline_configuration_body=pipeline_configuration_body, log_publishing_options=log_publishing_options, buffer_options=buffer_options, encryption_at_rest_options=encryption_at_rest_options, ) # TODO: adjust response return json.dumps(dict(Pipeline=pipeline.to_dict())) def tag_resource(self) -> str: params = json.loads(self.body) arn = self._get_param("arn") tags = params.get("Tags") self.osis_backend.tag_resource( arn=arn, tags=tags, ) return json.dumps(dict()) def untag_resource(self) -> str: params = json.loads(self.body) arn = self._get_param("arn") tag_keys = params.get("TagKeys") self.osis_backend.untag_resource( arn=arn, tag_keys=tag_keys, ) return json.dumps(dict()) def start_pipeline(self) -> str: pipeline_name = self._get_param("PipelineName") pipeline = self.osis_backend.start_pipeline( pipeline_name=pipeline_name, ) return json.dumps(dict(Pipeline=pipeline.to_dict())) def stop_pipeline(self) -> str: pipeline_name = self._get_param("PipelineName") pipeline = self.osis_backend.stop_pipeline( pipeline_name=pipeline_name, ) return json.dumps(dict(Pipeline=pipeline.to_dict()))
Memory