"""OpenSearchIngestionBackend class with methods for supported APIs."""
from datetime import datetime
from typing import TYPE_CHECKING, Any, ClassVar, Dict, List, Optional
import yaml
from moto.core.base_backend import BackendDict, BaseBackend
from moto.core.common_models import BaseModel
from moto.moto_api._internal import mock_random as random
from moto.moto_api._internal.managed_state_model import ManagedState
from moto.utilities.paginator import paginate
from moto.utilities.tagging_service import TaggingService
from moto.utilities.utils import get_partition
from .exceptions import (
InvalidVPCOptionsException,
PipelineAlreadyExistsException,
PipelineInvalidStateException,
PipelineNotFoundException,
SecurityGroupNotFoundException,
SubnetNotFoundException,
)
if TYPE_CHECKING:
from moto.ec2.models import EC2Backend
class Pipeline(ManagedState, BaseModel):
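    """Mock model of a single OpenSearch Ingestion pipeline and its lifecycle state."""
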
CREATING_REASON = "The pipeline is being created. It is not able to ingest data."
ACTIVE_REASON = "The pipeline is ready to ingest data."
DELETING_REASON = "The pipeline is being deleted"
STOPPING_REASON = "The pipeline is being stopped"
STOPPED_REASON = "The pipeline is stopped"
STARTING_REASON = "The pipeline is starting. It is not able to ingest data"
UPDATING_REASON = "An update was triggered for the pipeline. It is still available to ingest data."
STATUS_REASON_MAP: ClassVar[Dict[str, str]] = {
"CREATING": CREATING_REASON,
"ACTIVE": ACTIVE_REASON,
"STOPPING": STOPPING_REASON,
"STOPPED": STOPPED_REASON,
"STARTING": STARTING_REASON,
"UPDATING": UPDATING_REASON,
"DELETING": DELETING_REASON,
}
def __init__(
self,
pipeline_name: str,
account_id: str,
region: str,
min_units: int,
max_units: int,
pipeline_configuration_body: str,
log_publishing_options: Optional[Dict[str, Any]],
vpc_options: Optional[Dict[str, Any]],
buffer_options: Optional[Dict[str, Any]],
encryption_at_rest_options: Optional[Dict[str, Any]],
ingest_endpoint_urls: List[str],
serverless: bool,
vpc_endpoint_service: Optional[str],
vpc_endpoint: Optional[str],
vpc_id: Optional[str],
backend: "OpenSearchIngestionBackend",
):
ManagedState.__init__(
self,
model_name="osis::pipeline",
transitions=[
("CREATING", "ACTIVE"),
("UPDATING", "ACTIVE"),
("DELETING", "DELETED"),
("STOPPING", "STOPPED"),
("STARTING", "ACTIVE"),
],
)
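        # ManagedState advances self.status along the transitions above each
        # time advance() is called (the backend triggers this from its
        # describe-style APIs), eventually reaching the transition's end state.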
self.pipeline_name = pipeline_name
self.account_id = account_id
self.region = region
self.min_units = min_units
self.max_units = max_units
self.pipeline_configuration_body_str = pipeline_configuration_body
self.pipeline_configuration_body = yaml.safe_load(pipeline_configuration_body)
self.log_publishing_options = log_publishing_options
self.vpc_options = vpc_options
self.buffer_options = buffer_options
self.encryption_at_rest_options = encryption_at_rest_options
self.ingest_endpoint_urls = ingest_endpoint_urls
self.serverless = serverless
self.vpc_endpoint_service = vpc_endpoint_service
self.vpc_endpoint = vpc_endpoint
self.vpc_id = vpc_id
self.backend = backend
self.status = "CREATING"
self.arn = self._get_arn(self.pipeline_name)
self.destinations = self._update_destinations()
if (
self.vpc_options is None
or self.vpc_options.get("VpcEndpointManagement", "SERVICE") == "SERVICE"
):
            # VpcEndpointService is only returned for CUSTOMER-managed VPC endpoints
self.vpc_endpoint_service = None
self.service_vpc_endpoints = self._get_service_vpc_endpoints()
self.created_at: datetime = datetime.now()
self.last_updated_at: datetime = datetime.now()
def _get_arn(self, name: str) -> str:
return f"arn:{get_partition(self.region)}:osis:{self.region}:{self.account_id}:pipeline/{name}"
def _get_service_vpc_endpoints(self) -> Optional[List[Dict[str, str]]]:
# ServiceVpcEndpoint.VpcEndpointId not implemented
if self.serverless:
return [{"ServiceName": "OPENSEARCH_SERVERLESS"}]
else:
return None
def _update_destinations(self) -> List[Dict[str, str]]:
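        """Derive the Destinations list from the parsed configuration body."""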
destinations = []
for sub_pipeline in self.pipeline_configuration_body:
if sub_pipeline != "version":
for sink in self.pipeline_configuration_body[sub_pipeline]["sink"]:
for sink_type, sink_config in sink.items():
if sink_type == "opensearch":
if sink_config["aws"].get("serverless") is True:
service_name = "OpenSearch_Serverless"
else:
service_name = "OpenSearch"
endpoint = sink_config["hosts"][0]
elif sink_type == "s3":
service_name = "S3"
endpoint = sink_config["bucket"]
else:
continue
destinations.append(
{"ServiceName": service_name, "Endpoint": endpoint}
)
return destinations
@staticmethod
def is_serverless(pipeline_body: Dict[str, Any]) -> bool: # type: ignore[misc]
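        """Return True if any source or sink in the parsed body sets aws.serverless."""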
serverless = False
for sub_pipeline in pipeline_body:
if sub_pipeline != "version":
for sink in pipeline_body[sub_pipeline]["sink"]:
for _, sink_config in sink.items():
serverless = (
sink_config.get("aws", {}).get("serverless", False)
or serverless
)
                source_type = next(iter(pipeline_body[sub_pipeline]["source"]))
                source_config = pipeline_body[sub_pipeline]["source"][source_type]
serverless = (
source_config.get("aws", {}).get("serverless", False) or serverless
)
return serverless
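
    # Illustrative shape of a parsed configuration body as walked by
    # _update_destinations() and is_serverless() above (a hypothetical,
    # abbreviated example):
    #
    #     version: "2"
    #     log-pipeline:
    #       source:
    #         http:
    #           path: "/logs"
    #       sink:
    #         - opensearch:
    #             hosts: ["https://search-mydomain.us-east-1.es.amazonaws.com"]
    #             aws:
    #               serverless: false
    #         - s3:
    #             bucket: "my-archive-bucket"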
def delete(self) -> None:
self.status = "DELETING"
self.set_last_updated()
def get_created_at(self) -> str:
return self.created_at.astimezone().isoformat()
def get_last_updated_at(self) -> str:
return self.last_updated_at.astimezone().isoformat()
def set_last_updated(self) -> None:
self.last_updated_at = datetime.now()
def start(self) -> None:
self.status = "STARTING"
self.set_last_updated()
def stop(self) -> None:
self.status = "STOPPING"
self.set_last_updated()
def update(
self,
min_units: Optional[int],
max_units: Optional[int],
pipeline_configuration_body: Optional[str],
log_publishing_options: Optional[Dict[str, Any]],
buffer_options: Optional[Dict[str, Any]],
encryption_at_rest_options: Optional[Dict[str, Any]],
) -> None:
if min_units is not None:
self.min_units = min_units
if max_units is not None:
self.max_units = max_units
if pipeline_configuration_body is not None:
self.pipeline_configuration_body_str = pipeline_configuration_body
self.pipeline_configuration_body = yaml.safe_load(
pipeline_configuration_body
)
if log_publishing_options is not None:
self.log_publishing_options = log_publishing_options
if buffer_options is not None:
self.buffer_options = buffer_options
if encryption_at_rest_options is not None:
self.encryption_at_rest_options = encryption_at_rest_options
self.destinations = self._update_destinations()
self.serverless = self.is_serverless(self.pipeline_configuration_body)
self.service_vpc_endpoints = self._get_service_vpc_endpoints()
self.status = "UPDATING"
self.set_last_updated()
def to_dict(self) -> Dict[str, Any]:
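        """Full description shape, as used in GetPipeline-style responses."""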
return {
"PipelineName": self.pipeline_name,
"PipelineArn": self.arn,
"MinUnits": self.min_units,
"MaxUnits": self.max_units,
"Status": self.status,
"StatusReason": {
"Description": self.STATUS_REASON_MAP.get(self.status or "", ""),
},
"PipelineConfigurationBody": self.pipeline_configuration_body_str,
"CreatedAt": self.get_created_at(),
"LastUpdatedAt": self.get_last_updated_at(),
"IngestEndpointUrls": self.ingest_endpoint_urls,
"LogPublishingOptions": self.log_publishing_options,
"VpcEndpoints": None
if self.vpc_options is None
else [
{
"VpcEndpointId": self.vpc_endpoint,
"VpcId": self.vpc_id,
"VpcOptions": self.vpc_options,
}
],
"BufferOptions": self.buffer_options,
"EncryptionAtRestOptions": self.encryption_at_rest_options,
"VpcEndpointService": self.vpc_endpoint_service,
"ServiceVpcEndpoints": self.service_vpc_endpoints,
"Destinations": self.destinations,
"Tags": self.backend.list_tags_for_resource(self.arn)["Tags"],
}
def to_short_dict(self) -> Dict[str, Any]:
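        """Summary shape, as used in ListPipelines-style responses."""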
return {
"Status": self.status,
"StatusReason": {
"Description": self.STATUS_REASON_MAP.get(self.status or "", ""),
},
"PipelineName": self.pipeline_name,
"PipelineArn": self.arn,
"MinUnits": self.min_units,
"MaxUnits": self.max_units,
"CreatedAt": self.get_created_at(),
"LastUpdatedAt": self.get_last_updated_at(),
"Destinations": self.destinations,
"Tags": self.backend.list_tags_for_resource(self.arn)["Tags"],
}
class OpenSearchIngestionBackend(BaseBackend):
"""Implementation of OpenSearchIngestion APIs."""
PAGINATION_MODEL = {
"list_pipelines": {
"input_token": "next_token",
"limit_key": "max_results",
"limit_default": 100,
"unique_attribute": "PipelineName",
},
}
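
    # The @paginate decorator on list_pipelines reads next_token/max_results
    # from the call and pages over results keyed on PipelineName, per the
    # model declared above.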
PIPELINE_DELETE_VALID_STATES = [
"UPDATE_FAILED",
"ACTIVE",
"START_FAILED",
"STOPPED",
"CREATE_FAILED",
]
PIPELINE_STOP_VALID_STATES = ["UPDATE_FAILED", "ACTIVE"]
PIPELINE_START_VALID_STATES = ["START_FAILED", "STOPPED"]
PIPELINE_UPDATE_VALID_STATES = [
"UPDATE_FAILED",
"ACTIVE",
"START_FAILED",
"STOPPED",
]
def __init__(self, region_name: str, account_id: str):
super().__init__(region_name, account_id)
        self._pipelines: Dict[str, Pipeline] = {}
self.tagger = TaggingService()
@property
def ec2_backend(self) -> "EC2Backend": # type: ignore[misc]
from moto.ec2 import ec2_backends
return ec2_backends[self.account_id][self.region_name]
@property
def pipelines(self) -> Dict[str, Pipeline]:
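        """Return live pipelines, lazily purging any that have reached DELETED."""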
self._pipelines = {
name: pipeline
for name, pipeline in self._pipelines.items()
if pipeline.status != "DELETED"
}
return self._pipelines
def _get_ingest_endpoint_urls(
self, pipeline_name: str, endpoint_random_string: str
) -> List[str]:
return [
f"{pipeline_name}-{endpoint_random_string}.{self.region_name}.osis.amazonaws.com"
]
def _get_random_endpoint_string(self) -> str:
return random.get_random_string(length=26, lower_case=True)
def _get_vpc_endpoint(
self, vpc_id: str, vpc_options: Dict[str, Any], service_name: str
) -> Optional[str]:
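        """Create a service-managed Interface VPC endpoint in the mocked EC2
        backend; customer-managed endpoints return None."""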
if vpc_options.get("VpcEndpointManagement", "SERVICE") == "SERVICE":
service_managed_endpoint = self.ec2_backend.create_vpc_endpoint(
vpc_id=vpc_id,
service_name=service_name,
endpoint_type="Interface",
security_group_ids=vpc_options.get("SecurityGroupIds"),
subnet_ids=vpc_options["SubnetIds"],
private_dns_enabled=False,
policy_document="OSIS Test Doc",
route_table_ids=[],
tags={"OSISManaged": "true"},
)
return service_managed_endpoint.id
else:
return None
def _get_vpc_endpoint_service(
self, pipeline_name: str, endpoint_random_string: str
) -> str:
return f"com.amazonaws.osis.{self.region_name}.{pipeline_name}-{endpoint_random_string}"
def _validate_and_get_vpc(self, vpc_options: Dict[str, Any]) -> str:
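        """Validate SubnetIds and SecurityGroupIds against the mocked EC2
        backend and return the VPC id shared by all subnets.

        Expected vpc_options shape (hypothetical values):
            {
                "SubnetIds": ["subnet-0123456789abcdef0"],
                "SecurityGroupIds": ["sg-0123456789abcdef0"],
                "VpcEndpointManagement": "SERVICE",
            }
        """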
from moto.ec2.exceptions import InvalidSubnetIdError
vpc_id = ""
for subnet_id in vpc_options["SubnetIds"]:
            try:
                subnet = self.ec2_backend.get_subnet(subnet_id)
            except InvalidSubnetIdError:
                # raise the service-specific error for a more accurate message
                raise SubnetNotFoundException(subnet_id)
            if vpc_id == "":
                vpc_id = subnet.vpc_id
            elif subnet.vpc_id != vpc_id:
                raise InvalidVPCOptionsException(
                    "All specified subnets must belong to the same VPC."
                )
for sg_id in vpc_options["SecurityGroupIds"]:
sg = self.ec2_backend.get_security_group_from_id(sg_id)
if sg is None:
raise SecurityGroupNotFoundException(sg_id)
return vpc_id
def create_pipeline(
self,
pipeline_name: str,
min_units: int,
max_units: int,
pipeline_configuration_body: str,
log_publishing_options: Optional[Dict[str, Any]],
vpc_options: Optional[Dict[str, Any]],
buffer_options: Optional[Dict[str, bool]],
encryption_at_rest_options: Optional[Dict[str, Any]],
tags: List[Dict[str, str]],
) -> Pipeline:
if pipeline_name in self.pipelines:
raise PipelineAlreadyExistsException(pipeline_name)
serverless = Pipeline.is_serverless(yaml.safe_load(pipeline_configuration_body))
endpoint_random_string = self._get_random_endpoint_string()
endpoint_service = self._get_vpc_endpoint_service(
pipeline_name, endpoint_random_string
)
ingestion_endpoint_urls = self._get_ingest_endpoint_urls(
pipeline_name, endpoint_random_string
)
vpc_endpoint = None
vpc_id = None
if vpc_options is not None:
vpc_id = self._validate_and_get_vpc(vpc_options)
vpc_endpoint = self._get_vpc_endpoint(vpc_id, vpc_options, endpoint_service)
pipeline = Pipeline(
pipeline_name,
self.account_id,
self.region_name,
min_units,
max_units,
pipeline_configuration_body,
log_publishing_options,
vpc_options,
buffer_options,
encryption_at_rest_options,
ingestion_endpoint_urls,
serverless,
endpoint_service,
vpc_endpoint,
vpc_id,
backend=self,
)
self.pipelines[pipeline_name] = pipeline
self.tag_resource(pipeline.arn, tags)
return pipeline
def delete_pipeline(self, pipeline_name: str) -> None:
if pipeline_name not in self.pipelines:
raise PipelineNotFoundException(pipeline_name)
pipeline = self.pipelines[pipeline_name]
if pipeline.status not in self.PIPELINE_DELETE_VALID_STATES:
raise PipelineInvalidStateException(
"deletion", self.PIPELINE_DELETE_VALID_STATES, pipeline.status
)
pipeline.delete()
def start_pipeline(self, pipeline_name: str) -> Pipeline:
if pipeline_name not in self.pipelines:
raise PipelineNotFoundException(pipeline_name)
pipeline = self.pipelines[pipeline_name]
if pipeline.status not in self.PIPELINE_START_VALID_STATES:
raise PipelineInvalidStateException(
"starting", self.PIPELINE_START_VALID_STATES, pipeline.status
)
pipeline.start()
return pipeline
def stop_pipeline(self, pipeline_name: str) -> Pipeline:
if pipeline_name not in self.pipelines:
raise PipelineNotFoundException(pipeline_name)
pipeline = self.pipelines[pipeline_name]
if pipeline.status not in self.PIPELINE_STOP_VALID_STATES:
raise PipelineInvalidStateException(
"stopping", self.PIPELINE_STOP_VALID_STATES, pipeline.status
)
pipeline.stop()
return pipeline
def get_pipeline(self, pipeline_name: str) -> Pipeline:
if pipeline_name not in self.pipelines:
raise PipelineNotFoundException(pipeline_name)
pipeline = self.pipelines[pipeline_name]
pipeline.advance()
return pipeline
@paginate(pagination_model=PAGINATION_MODEL) # type: ignore
def list_pipelines(self) -> List[Pipeline]:
for pipeline in self.pipelines.values():
pipeline.advance()
        return list(self.pipelines.values())
def list_tags_for_resource(self, arn: str) -> Dict[str, List[Dict[str, str]]]:
return self.tagger.list_tags_for_resource(arn)
def update_pipeline(
self,
pipeline_name: str,
min_units: Optional[int],
max_units: Optional[int],
pipeline_configuration_body: Optional[str],
log_publishing_options: Optional[Dict[str, Any]],
buffer_options: Optional[Dict[str, Any]],
encryption_at_rest_options: Optional[Dict[str, Any]],
) -> Pipeline:
if pipeline_name not in self.pipelines:
raise PipelineNotFoundException(pipeline_name)
pipeline = self.pipelines[pipeline_name]
if pipeline.status not in self.PIPELINE_UPDATE_VALID_STATES:
raise PipelineInvalidStateException(
"updates", self.PIPELINE_UPDATE_VALID_STATES, pipeline.status
)
pipeline.update(
min_units,
max_units,
pipeline_configuration_body,
log_publishing_options,
buffer_options,
encryption_at_rest_options,
)
return pipeline
def tag_resource(self, arn: str, tags: List[Dict[str, str]]) -> None:
self.tagger.tag_resource(arn, tags)
def untag_resource(self, arn: str, tag_keys: List[str]) -> None:
self.tagger.untag_resource_using_names(arn, tag_keys)
osis_backends = BackendDict(OpenSearchIngestionBackend, "osis")
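
# For unit tests that bypass HTTP, the backend can also be reached directly
# (an internal moto pattern; the path assumes this module lives at
# moto.osis.models):
#
#     from moto.osis.models import osis_backends
#     backend = osis_backends[account_id][region_name]
#     pipeline = backend.get_pipeline("sample-pipeline")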