# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
#
# Modifications Copyright OpenSearch Contributors. See
# GitHub history for details.
from typing import Any
import opensearchpy
from opensearchpy._async.helpers.actions import aiter
from opensearchpy.serializer import serializer
class AsyncConnections:
_conns: Any
"""
Class responsible for holding connections to different clusters. Used as a
singleton in this module.
"""
def __init__(self) -> None:
self._kwargs: Any = {}
self._conns: Any = {}
async def configure(self, **kwargs: Any) -> None:
"""
Configure multiple connections at once, useful for passing in config
dictionaries obtained from other sources, like Django's settings or a
configuration management tool.
Example::
async_connections.configure(
default={'hosts': 'localhost'},
dev={'hosts': ['opensearchdev1.example.com:9200'], 'sniff_on_start': True},
)
Connections will only be constructed lazily when requested through
``get_connection``.
"""
async for k in aiter(list(self._conns)):
# try and preserve existing client to keep the persistent connections alive
if k in self._kwargs and kwargs.get(k, None) == self._kwargs[k]:
continue
del self._conns[k]
self._kwargs = kwargs
async def add_connection(self, alias: str, conn: Any) -> None:
"""
Add a connection object, it will be passed through as-is.
"""
self._conns[alias] = conn
async def remove_connection(self, alias: str) -> None:
"""
Remove connection from the registry. Raises ``KeyError`` if connection
wasn't found.
"""
errors = 0
async for d in aiter((self._conns, self._kwargs)):
try:
del d[alias]
except KeyError:
errors += 1
if errors == 2:
raise KeyError(f"There is no connection with alias {alias!r}.")
async def create_connection(self, alias: str = "default", **kwargs: Any) -> Any:
"""
Construct an instance of ``opensearchpy.AsyncOpenSearch`` and register
it under given alias.
"""
kwargs.setdefault("serializer", serializer)
conn = self._conns[alias] = opensearchpy.AsyncOpenSearch(**kwargs)
return conn
async def get_connection(self, alias: str = "default") -> Any:
"""
Retrieve a connection, construct it if necessary (only configuration
was passed to us). If a non-string alias has been passed through we
assume it's already a client instance and will just return it as-is.
Raises ``KeyError`` if no client (or its definition) is registered
under the alias.
"""
# do not check isinstance(AsyncOpenSearch) so that people can wrap their
# clients
if not isinstance(alias, str):
return alias
# connection already established
try:
return self._conns[alias]
except KeyError:
pass
# if not, try to create it
try:
return await self.create_connection(alias, **self._kwargs[alias])
except KeyError:
# no connection and no kwargs to set one up
raise KeyError(f"There is no connection with alias {alias!r}.")
async_connections = AsyncConnections()
configure = async_connections.configure
add_connection = async_connections.add_connection
remove_connection = async_connections.remove_connection
create_connection = async_connections.create_connection
get_connection = async_connections.get_connection