import json
from typing import List, Optional

from moto.core.exceptions import JsonRESTError


class OpensearchIngestionExceptions(JsonRESTError):
    """Common base class for the OpenSearch Ingestion errors in this module."""


class PipelineAlreadyExistsException(OpensearchIngestionExceptions):
    """Error reporting that a pipeline with the given name already exists."""

    def __init__(self, pipeline_name: str):
        error_message = f"Pipeline with given name {pipeline_name} already exists"
        super().__init__("ResourceAlreadyExistsException", error_message)
        # The description is a JSON object carrying only the message.
        self.description = json.dumps({"message": self.message})


class PipelineInvalidStateException(OpensearchIngestionExceptions):
    """Error reporting that a pipeline's status does not permit an action.

    The message names the attempted ``action``, the list of statuses that
    would have been accepted, and the pipeline's current status.
    """

    def __init__(
        self, action: str, valid_states: List[str], current_state: Optional[str]
    ):
        # ``valid_states`` is interpolated as-is, i.e. using the list's repr.
        error_message = f"Only pipelines with one of the following statuses are eligible for {action}: {valid_states}. The current status is {current_state}."
        super().__init__("ConflictException", error_message)
        # The description is a JSON object carrying only the message.
        self.description = json.dumps({"message": self.message})


class PipelineNotFoundException(OpensearchIngestionExceptions):
    """Error reporting that the named pipeline could not be found."""

    def __init__(self, pipeline_name: str):
        error_message = f"Pipeline {pipeline_name} could not be found."
        super().__init__("ResourceNotFoundException", error_message)
        # The description is a JSON object carrying only the message.
        self.description = json.dumps({"message": self.message})


class PipelineValidationException(OpensearchIngestionExceptions):
    """Validation error carrying a caller-supplied message verbatim."""

    def __init__(self, message: str):
        super().__init__("ValidationException", message)
        # The description is a JSON object carrying only the message.
        self.description = json.dumps({"message": self.message})


class InvalidVPCOptionsException(OpensearchIngestionExceptions):
    """Validation error whose message is prefixed with ``Invalid VpcOptions: ``."""

    def __init__(self, message: str) -> None:
        error_message = f"Invalid VpcOptions: {message}"
        super().__init__("ValidationException", error_message)
        # The description is a JSON object carrying only the message.
        self.description = json.dumps({"message": self.message})


class SubnetNotFoundException(InvalidVPCOptionsException):
    """VpcOptions error reporting that a subnet ID does not exist."""

    def __init__(self, subnet_id: str) -> None:
        detail = f"The subnet ID {subnet_id} does not exist"
        super().__init__(detail)


class SecurityGroupNotFoundException(InvalidVPCOptionsException):
    """VpcOptions error reporting that a security group does not exist."""

    def __init__(self, sg_id: str) -> None:
        detail = f"The security group {sg_id} does not exist"
        super().__init__(detail)
Memory