"""Handles incoming opensearch requests, invokes methods, returns responses.""" import json import re from typing import Any from moto.core.common_types import TYPE_RESPONSE from moto.core.responses import BaseResponse from moto.es.exceptions import InvalidDomainName from .models import OpenSearchServiceBackend, opensearch_backends class OpenSearchServiceResponse(BaseResponse): """Handler for OpenSearchService requests and responses.""" def __init__(self) -> None: super().__init__(service_name="opensearch") @property def opensearch_backend(self) -> OpenSearchServiceBackend: """Return backend instance specific for this region.""" return opensearch_backends[self.current_account][self.region] @classmethod def list_domains(cls, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: # type: ignore response = cls() response.setup_class(request, full_url, headers) if request.method == "GET": return 200, {}, response.list_domain_names() if request.method == "POST": return 200, {}, response.describe_domains() @classmethod def domains(cls, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: # type: ignore response = cls() response.setup_class(request, full_url, headers) if request.method == "POST": return 200, {}, response.create_domain() @classmethod def domain(cls, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: # type: ignore response = cls() response.setup_class(request, full_url, headers) if request.method == "DELETE": return 200, {}, response.delete_domain() if request.method == "GET": return 200, {}, response.describe_domain() @classmethod def tags(cls, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: # type: ignore response = cls() response.setup_class(request, full_url, headers) if request.method == "GET": return 200, {}, response.list_tags() if request.method == "POST": return 200, {}, response.add_tags() @classmethod def tag_removal(cls, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: # type: ignore response = cls() response.setup_class(request, full_url, headers) if request.method == "POST": return 200, {}, response.remove_tags() def create_domain(self) -> str: domain_name = self._get_param("DomainName") if not re.match(r"^[a-z][a-z0-9\-]+$", domain_name): raise InvalidDomainName(domain_name) engine_version = self._get_param("EngineVersion") cluster_config = self._get_param("ClusterConfig") ebs_options = self._get_param("EBSOptions") access_policies = self._get_param("AccessPolicies") snapshot_options = self._get_param("SnapshotOptions") vpc_options = self._get_param("VPCOptions") cognito_options = self._get_param("CognitoOptions") encryption_at_rest_options = self._get_param("EncryptionAtRestOptions") node_to_node_encryption_options = self._get_param("NodeToNodeEncryptionOptions") advanced_options = self._get_param("AdvancedOptions") log_publishing_options = self._get_param("LogPublishingOptions") domain_endpoint_options = self._get_param("DomainEndpointOptions") advanced_security_options = self._get_param("AdvancedSecurityOptions") tag_list = self._get_param("TagList") auto_tune_options = self._get_param("AutoTuneOptions") off_peak_window_options = self._get_param("OffPeakWindowOptions") software_update_options = self._get_param("SoftwareUpdateOptions") # ElasticSearch specific options is_es = self.parsed_url.path.endswith("/es/domain") elasticsearch_version = self._get_param("ElasticsearchVersion") elasticsearch_cluster_config = self._get_param("ElasticsearchClusterConfig") domain = self.opensearch_backend.create_domain( domain_name=domain_name, engine_version=engine_version, cluster_config=cluster_config, ebs_options=ebs_options, access_policies=access_policies, snapshot_options=snapshot_options, vpc_options=vpc_options, cognito_options=cognito_options, encryption_at_rest_options=encryption_at_rest_options, node_to_node_encryption_options=node_to_node_encryption_options, advanced_options=advanced_options, log_publishing_options=log_publishing_options, domain_endpoint_options=domain_endpoint_options, advanced_security_options=advanced_security_options, tag_list=tag_list, auto_tune_options=auto_tune_options, off_peak_window_options=off_peak_window_options, software_update_options=software_update_options, is_es=is_es, elasticsearch_version=elasticsearch_version, elasticsearch_cluster_config=elasticsearch_cluster_config, ) return json.dumps(dict(DomainStatus=domain.to_dict())) def get_compatible_versions(self) -> str: domain_name = self._get_param("domainName") compatible_versions = self.opensearch_backend.get_compatible_versions( domain_name=domain_name, ) return json.dumps(dict(CompatibleVersions=compatible_versions)) def delete_domain(self) -> str: domain_name = self.path.split("/")[-1] domain = self.opensearch_backend.delete_domain( domain_name=domain_name, ) return json.dumps(dict(DomainStatus=domain.to_dict())) def describe_domain(self) -> str: domain_name = self.path.split("/")[-1] if not re.match(r"^[a-z][a-z0-9\-]+$", domain_name): raise InvalidDomainName(domain_name) domain = self.opensearch_backend.describe_domain( domain_name=domain_name, ) return json.dumps(dict(DomainStatus=domain.to_dict())) def describe_domain_config(self) -> str: domain_name = self.path.split("/")[-1] domain = self.opensearch_backend.describe_domain_config( domain_name=domain_name, ) return json.dumps(dict(DomainConfig=domain.to_config_dict())) def update_domain_config(self) -> str: domain_name = self._get_param("DomainName") cluster_config = self._get_param("ClusterConfig") ebs_options = self._get_param("EBSOptions") access_policies = self._get_param("AccessPolicies") snapshot_options = self._get_param("SnapshotOptions") vpc_options = self._get_param("VPCOptions") cognito_options = self._get_param("CognitoOptions") encryption_at_rest_options = self._get_param("EncryptionAtRestOptions") node_to_node_encryption_options = self._get_param("NodeToNodeEncryptionOptions") advanced_options = self._get_param("AdvancedOptions") log_publishing_options = self._get_param("LogPublishingOptions") domain_endpoint_options = self._get_param("DomainEndpointOptions") advanced_security_options = self._get_param("AdvancedSecurityOptions") auto_tune_options = self._get_param("AutoTuneOptions") off_peak_window_options = self._get_param("OffPeakWindowOptions") software_update_options = self._get_param("SoftwareUpdateOptions") domain = self.opensearch_backend.update_domain_config( domain_name=domain_name, cluster_config=cluster_config, ebs_options=ebs_options, access_policies=access_policies, snapshot_options=snapshot_options, vpc_options=vpc_options, cognito_options=cognito_options, encryption_at_rest_options=encryption_at_rest_options, node_to_node_encryption_options=node_to_node_encryption_options, advanced_options=advanced_options, log_publishing_options=log_publishing_options, domain_endpoint_options=domain_endpoint_options, advanced_security_options=advanced_security_options, auto_tune_options=auto_tune_options, off_peak_window_options=off_peak_window_options, software_update_options=software_update_options, ) return json.dumps(dict(DomainConfig=domain.to_config_dict())) def list_tags(self) -> str: arn = self._get_param("arn") tags = self.opensearch_backend.list_tags(arn) return json.dumps({"TagList": tags}) def add_tags(self) -> str: arn = self._get_param("ARN") tags = self._get_param("TagList") self.opensearch_backend.add_tags(arn, tags) return "{}" def remove_tags(self) -> str: arn = self._get_param("ARN") tag_keys = self._get_param("TagKeys") self.opensearch_backend.remove_tags(arn, tag_keys) return "{}" def list_domain_names(self) -> str: engine_type = self._get_param("engineType") domain_names = self.opensearch_backend.list_domain_names( engine_type=engine_type, ) return json.dumps(dict(DomainNames=domain_names)) def describe_domains(self) -> str: domain_names = self._get_param("DomainNames") domains = self.opensearch_backend.describe_domains( domain_names=domain_names, ) domain_list = [domain.to_dict() for domain in domains] return json.dumps({"DomainStatusList": domain_list})
Memory