# Copyright (C) 2019-2021 Zilliz. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing permissions and limitations under
# the License.

import copy
import logging
import pathlib
import threading
import time
from typing import Callable, Tuple, Union
from urllib import parse

from pymilvus.client.check import is_legal_address, is_legal_host, is_legal_port
from pymilvus.client.grpc_handler import GrpcHandler
from pymilvus.exceptions import (
    ConnectionConfigException,
    ConnectionNotExistException,
    ExceptionsMessage,
)
from pymilvus.settings import Config

logger = logging.getLogger(__name__)

VIRTUAL_PORT = 443


def synchronized(func: Callable):
    """Wrap ``func`` so that every call runs while holding a lock owned by ``func``.

    The lock is created once, at decoration time, and stored on the wrapped
    function as ``func.__lock__``; concurrent calls are therefore serialized.

    :param func: The callable to guard.
    :return: A wrapper that forwards all arguments and the return value.
    """
    func.__lock__ = threading.Lock()

    def lock_func(*args, **kwargs):
        with func.__lock__:
            return func(*args, **kwargs)

    return lock_func


class SingleInstanceMetaClass(type):
    """Metaclass that makes each class using it hand out one shared instance."""

    instance = None

    def __init__(cls, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)

    def __call__(cls, *args, **kwargs):
        """Return the cached instance, creating and initializing it on first use."""
        # Compare against None explicitly so that an instance which happens to be
        # falsy (e.g. defines __len__/__bool__) is still treated as "already created".
        if cls.instance is not None:
            return cls.instance
        cls.instance = cls.__new__(cls)
        cls.instance.__init__(*args, **kwargs)
        return cls.instance

    # NOTE(review): the lock guards creation of the *class* object (metaclass
    # ``__new__``), not ``__call__`` above -- confirm whether instance creation
    # itself is expected to be thread-safe.
    @synchronized
    def __new__(cls, *args, **kwargs):
        return super().__new__(cls, *args, **kwargs)


class ReconnectHandler:
    """Channel state-change callback holder that re-establishes an idle connection.

    :param conns: Object exposing ``disconnect(alias)`` and ``connect(alias, **kwargs)``.
    :param connection_name: Alias of the connection to re-establish.
    :param kwargs: Keyword arguments replayed into ``conns.connect`` on reconnect.
    """

    def __init__(self, conns: object, connection_name: str, kwargs: object) -> None:
        self.connection_name = connection_name
        self.conns = conns
        self._kwargs = kwargs
        self.is_idle_state = False
        self.reconnect_lock = threading.Lock()

    def check_state_and_reconnect_later(self):
        """Wait briefly, then disconnect and reconnect if the state is still idle.

        Reconnection is retried every ``check_after_seconds`` seconds until it
        succeeds; the whole disconnect/reconnect sequence holds ``reconnect_lock``.
        """
        check_after_seconds = 3
        logger.debug(f"state is idle, schedule reconnect in {check_after_seconds} seconds")
        time.sleep(check_after_seconds)
        if not self.is_idle_state:
            logger.debug("idle state changed, skip reconnect")
            return

        with self.reconnect_lock:
            logger.info("reconnect on idle state")
            self.is_idle_state = False
            try:
                logger.debug("try disconnecting old connection...")
                self.conns.disconnect(self.connection_name)
            except Exception as e:
                # Best effort: a failed disconnect must not prevent the reconnect below.
                logger.warning(f"disconnect failed: {e}")
            finally:
                reconnected = False
                while not reconnected:
                    try:
                        logger.debug("try reconnecting...")
                        self.conns.connect(self.connection_name, **self._kwargs)
                        reconnected = True
                    except Exception as e:
                        logger.warning(
                            f"reconnect failed: {e}, try again after {check_after_seconds} seconds"
                        )
                        time.sleep(check_after_seconds)
                logger.info("reconnected")

    def reconnect_on_idle(self, state: object):
        """Record the new channel state and schedule a reconnect when it is idle.

        :param state: State object whose ``value[1]`` is compared with ``"idle"``
            (presumably a ``grpc.ChannelConnectivity`` member -- confirm).
        """
        logger.debug(f"state change to: {state}")
        with self.reconnect_lock:
            if state.value[1] != "idle":
                self.is_idle_state = False
                return
            self.is_idle_state = True
            # The delayed check runs on its own thread so this callback returns quickly.
            threading.Thread(target=self.check_state_and_reconnect_later).start()


class Connections(metaclass=SingleInstanceMetaClass):
    """Class for managing all connections of milvus. Used as a singleton in this module."""

    def __init__(self) -> None:
        """Constructs the default milvus alias config.

        The default alias name is ``Config.MILVUS_CONN_ALIAS``. When
        ``Config.MILVUS_URI`` is non-empty it is parsed and used as the default
        address (and user); otherwise the address falls back to
        ``Config.DEFAULT_HOST:Config.DEFAULT_PORT``.

        ``Config.MILVUS_URI`` must carry a scheme and a host, in the form::

            http[s]://[user:password@]example.com[:12345]

        Examples::

            http://localhost:19530
            http://test_user@localhost:19530
            https://test_user:password@localhost:19530
        """
        self._alias = {}
        self._connected_alias = {}
        self._env_uri = None

        if Config.MILVUS_URI != "":
            address, parsed_uri = self.__parse_address_from_uri(Config.MILVUS_URI)
            self._env_uri = (address, parsed_uri)

            default_conn_config = {
                "user": parsed_uri.username if parsed_uri.username is not None else "",
                "address": address,
            }
        else:
            default_conn_config = {
                "user": "",
                "address": f"{Config.DEFAULT_HOST}:{Config.DEFAULT_PORT}",
            }

        self.add_connection(**{Config.MILVUS_CONN_ALIAS: default_conn_config})

    def __verify_host_port(self, host: str, port: Union[int, str]):
        """Validate ``host`` and ``port``.

        :raises ConnectionConfigException: If the host or port is of an illegal
            type, or the port is outside ``[0, 65535)``.
        """
        if not is_legal_host(host):
            raise ConnectionConfigException(message=ExceptionsMessage.HostType)
        if not is_legal_port(port):
            raise ConnectionConfigException(message=ExceptionsMessage.PortType)
        if not 0 <= int(port) < 65535:
            msg = f"port number {port} out of range, valid range [0, 65535)"
            raise ConnectionConfigException(message=msg)

    def __parse_address_from_uri(self, uri: str) -> Tuple[str, parse.ParseResult]:
        """Parse ``uri`` into a ``host:port`` address plus the parsed URI.

        A missing host falls back to ``Config.DEFAULT_HOST``; a missing port falls
        back to ``"443"`` for ``https`` and ``Config.DEFAULT_PORT`` otherwise.

        :raises ConnectionConfigException: If the URI cannot be parsed, has no
            network location, or yields an illegal host/port/address.
        """
        illegal_uri_msg = (
            "Illegal uri: [{}], expected form 'http[s]://[user:password@]example.com[:12345]'"
        )

        try:
            parsed_uri = parse.urlparse(uri)
            # ``.port`` is evaluated lazily and raises ValueError for a malformed
            # port, so read it here to report it as a configuration error too.
            uri_port = parsed_uri.port
        except Exception as e:
            raise ConnectionConfigException(
                message=f"{illegal_uri_msg.format(uri)}: <{type(e).__name__}, {e}>"
            ) from None

        if len(parsed_uri.netloc) == 0:
            raise ConnectionConfigException(message=illegal_uri_msg.format(uri)) from None

        host = parsed_uri.hostname if parsed_uri.hostname is not None else Config.DEFAULT_HOST
        default_port = "443" if parsed_uri.scheme == "https" else Config.DEFAULT_PORT
        port = uri_port if uri_port is not None else default_port
        addr = f"{host}:{port}"

        self.__verify_host_port(host, port)

        if not is_legal_address(addr):
            raise ConnectionConfigException(message=illegal_uri_msg.format(uri))

        return addr, parsed_uri

    def add_connection(self, **kwargs):
        """Configures a milvus connection.

        Addresses priority in kwargs: address, uri, host and port

        :param kwargs:
            * *address* (``str``) -- Optional. The actual address of Milvus instance.
                Example address: "localhost:19530"

            * *uri* (``str``) -- Optional. The uri of Milvus instance.
                Example uri: "http://localhost:19530", "tcp://localhost:19530",
                "https://ok.s3.south.com:19530".

            * *host* (``str``) -- Optional. The host of Milvus instance.
                Default at "localhost", PyMilvus will fill in the default host
                if only port is provided.

            * *port* (``str/int``) -- Optional. The port of Milvus instance.
                Default at 19530, PyMilvus will fill in the default port
                if only host is provided.

        :raises ConnectionConfigException: If an alias is currently connected with
            a different address, or the address information is illegal.

        Example::

            connections.add_connection(
                default={"host": "localhost", "port": "19530"},
                dev1={"host": "localhost", "port": "19531"},
                dev2={"uri": "http://random.com/random"},
                dev3={"uri": "http://localhost:19530"},
                dev4={"uri": "tcp://localhost:19530"},
                dev5={"address": "localhost:19530"},
                prod={"uri": "http://random.random.random.com:19530"},
            )
        """
        for alias, config in kwargs.items():
            addr, parsed_uri = self.__get_full_address(
                config.get("address", ""),
                config.get("uri", ""),
                config.get("host", ""),
                config.get("port", ""),
            )

            if alias in self._connected_alias and self._alias[alias].get("address") != addr:
                raise ConnectionConfigException(message=ExceptionsMessage.ConnDiffConf % alias)

            alias_config = {
                "address": addr,
                "user": config.get("user", ""),
            }

            if parsed_uri is not None and parsed_uri.scheme == "https":
                alias_config["secure"] = True

            self._alias[alias] = alias_config

    def __get_full_address(
        self,
        address: str = "",
        uri: str = "",
        host: str = "",
        port: str = "",
    ) -> Tuple[str, Union[parse.ParseResult, None]]:
        """Resolve one address from ``address``, ``uri``, or ``host``/``port``.

        The first non-empty source wins, in that order. The second element of the
        result is the parsed URI when the address came from a non-``unix:`` URI,
        otherwise ``None``.

        :raises ConnectionConfigException: If the chosen source is illegal.
        """
        if address != "":
            if not is_legal_address(address):
                raise ConnectionConfigException(
                    message=f"Illegal address: {address}, should be in form 'localhost:19530'"
                )
            return address, None

        if uri != "":
            # unix domain socket URIs are passed through untouched
            if isinstance(uri, str) and uri.startswith("unix:"):
                return uri, None
            address, parsed = self.__parse_address_from_uri(uri)
            return address, parsed

        _host = host if host != "" else Config.DEFAULT_HOST
        _port = port if port != "" else Config.DEFAULT_PORT
        self.__verify_host_port(_host, _port)

        addr = f"{_host}:{_port}"
        if not is_legal_address(addr):
            raise ConnectionConfigException(
                message=(
                    f"Illegal host: {host} or port: {port}, "
                    "should be in form of '111.1.1.1', '19530'"
                )
            )

        return addr, None

    def disconnect(self, alias: str):
        """Closes the live connection of ``alias``; its stored config is kept.

        Does nothing when the alias is not currently connected.

        :param alias: The name of milvus connection
        :type alias: str

        :raises ConnectionConfigException: If ``alias`` is not a ``str``.
        """
        if not isinstance(alias, str):
            raise ConnectionConfigException(message=ExceptionsMessage.AliasType % type(alias))

        if alias in self._connected_alias:
            self._connected_alias.pop(alias).close()

    def remove_connection(self, alias: str):
        """Disconnects ``alias`` and removes its config from the registry.

        :param alias: The name of milvus connection
        :type alias: str

        :raises ConnectionConfigException: If ``alias`` is not a ``str``.
        """
        if not isinstance(alias, str):
            raise ConnectionConfigException(message=ExceptionsMessage.AliasType % type(alias))

        self.disconnect(alias)
        self._alias.pop(alias, None)

    def connect(
        self,
        alias: str = Config.MILVUS_CONN_ALIAS,
        user: str = "",
        password: str = "",
        db_name: str = "default",
        token: str = "",
        **kwargs,
    ) -> None:
        """
        Constructs a milvus connection and register it under given alias.

        The address is resolved with the following priority:
        1. ``address`` / ``uri`` / ``host`` / ``port`` passed in ``kwargs``;
        2. the URI taken from ``Config.MILVUS_URI`` at construction time;
        3. the config previously stored for ``alias``.

        :param alias: The name of milvus connection
        :type alias: str

        :param kwargs:
            * *address* (``str``) -- Optional. The actual address of Milvus instance.
                Example address: "localhost:19530"
            * *uri* (``str``) -- Optional. The uri of Milvus instance.
                Example uri: "http://localhost:19530", "tcp://localhost:19530",
                "https://ok.s3.south.com:19530". A value whose scheme is none of
                unix/http/https/tcp/grpc is treated as a local ``.db`` file path
                and served through milvus-lite.
            * *host* (``str``) -- Optional. The host of Milvus instance.
                Default at "localhost", PyMilvus will fill in the default host
                if only port is provided.
            * *port* (``str/int``) -- Optional. The port of Milvus instance.
                Default at 19530, PyMilvus will fill in the default port
                if only host is provided.
            * *secure* (``bool``) --
                Optional. Default is false. If set to true, tls will be enabled.
            * *user* (``str``) --
                Optional. Use which user to connect to Milvus instance. If user and password
                are provided, we will add related header in every RPC call.
            * *password* (``str``) --
                Optional and required when user is provided. The password
                corresponding to the user.
            * *token* (``str``) --
                Optional. Serving as the key for identification and authentication purposes.
                Whenever a token is furnished, we shall supplement the corresponding header
                to each RPC call.
            * *keep_alive* (``bool``) --
                Optional. Default is false. If set to true, client will keep an alive connection.
            * *db_name* (``str``) --
                Optional. default database name of this connection
            * *client_key_path* (``str``) --
                Optional. If use tls two-way authentication, need to write the client.key path.
            * *client_pem_path* (``str``) --
                Optional. If use tls two-way authentication, need to write the client.pem path.
            * *ca_pem_path* (``str``) --
                Optional. If use tls two-way authentication, need to write the ca.pem path.
            * *server_pem_path* (``str``) --
                Optional. If use tls one-way authentication, need to write the server.pem path.
            * *server_name* (``str``) --
                Optional. If use tls, need to write the common name.

        :raises ConnectionConfigException: If ``alias`` is not a ``str``, the address
            information is illegal, the alias is already connected to a different
            address, a local milvus-lite file cannot be opened, or no config can be
            found for the alias.
        :raises NotImplementedError: If handler in connection parameters is not GRPC
            (documented historically; not raised by the code in this module).
        :raises ParamError: If pool in connection parameters is not supported
            (documented historically; not raised by the code in this module).
        :raises Exception: If server specified in parameters is not ready, we cannot connect to
            server.

        :example:
            >>> from pymilvus import connections
            >>> connections.connect("test", host="localhost", port="19530")
        """
        if kwargs.get("uri") and parse.urlparse(kwargs["uri"]).scheme.lower() not in [
            "unix",
            "http",
            "https",
            "tcp",
            "grpc",
        ]:
            # start and connect milvuslite
            if not kwargs["uri"].endswith(".db"):
                raise ConnectionConfigException(
                    message=(
                        f"uri: {kwargs['uri']} is illegal, needs start with "
                        "[unix, http, https, tcp] or a local file endswith [.db]"
                    )
                )
            logger.info(f"Pass in the local path {kwargs['uri']}, and run it using milvus-lite")
            parent_path = pathlib.Path(kwargs["uri"]).parent
            if not parent_path.is_dir():
                raise ConnectionConfigException(
                    message=f"Open local milvus failed, dir: {parent_path} not exists"
                )

            # Imported lazily so milvus-lite is only required when a local file is used.
            from milvus_lite.server_manager import (
                server_manager_instance,
            )

            local_uri = server_manager_instance.start_and_get_uri(kwargs["uri"])
            if local_uri is None:
                raise ConnectionConfigException(message="Open local milvus failed")
            kwargs["uri"] = local_uri

        # kwargs_copy is used for auto reconnect
        kwargs_copy = copy.deepcopy(kwargs)
        kwargs_copy["user"] = user
        kwargs_copy["password"] = password
        kwargs_copy["db_name"] = db_name
        kwargs_copy["token"] = token

        def connect_milvus(**kwargs):
            gh = GrpcHandler(**kwargs)

            t = kwargs.get("timeout")
            timeout = t if isinstance(t, (int, float)) else Config.MILVUS_CONN_TIMEOUT

            gh._wait_for_channel_ready(timeout=timeout)
            if kwargs.get("keep_alive", False):
                gh.register_state_change_callback(
                    ReconnectHandler(self, alias, kwargs_copy).reconnect_on_idle
                )
            # Credentials and db_name are not kept in the stored alias config.
            kwargs.pop("password")
            kwargs.pop("token", None)
            kwargs.pop("db_name", "")

            self._connected_alias[alias] = gh
            self._alias[alias] = copy.deepcopy(kwargs)

        def with_config(config: Tuple) -> bool:
            return any(c != "" for c in config)

        if not isinstance(alias, str):
            raise ConnectionConfigException(message=ExceptionsMessage.AliasType % type(alias))

        config = (
            kwargs.pop("address", ""),
            kwargs.pop("uri", ""),
            kwargs.pop("host", ""),
            kwargs.pop("port", ""),
        )

        # Make sure passed in None doesnt break: None becomes "" (a plain
        # ``str(None)`` would yield the literal text "None").
        user = "" if user is None else str(user)
        password = "" if password is None else str(password)
        token = "" if token is None else str(token)

        # 1st Priority: connection from params
        if with_config(config):
            addr, parsed_uri = self.__get_full_address(*config)
            kwargs["address"] = addr

            if self.has_connection(alias) and self._alias[alias].get("address") != addr:
                raise ConnectionConfigException(message=ExceptionsMessage.ConnDiffConf % alias)

            # uri might take extra info
            if parsed_uri is not None:
                user = parsed_uri.username or user
                password = parsed_uri.password or password

                # get db_name from uri; an empty path segment (e.g. a trailing "/")
                # must not wipe out the db_name supplied by the caller
                group = parsed_uri.path.split("/")
                if len(group) > 1 and group[1]:
                    db_name = group[1]

                # Set secure=True if https scheme
                if parsed_uri.scheme == "https":
                    kwargs["secure"] = True

            connect_milvus(**kwargs, user=user, password=password, token=token, db_name=db_name)
            return

        # 2nd Priority, connection configs from env
        if self._env_uri is not None:
            addr, parsed_uri = self._env_uri
            kwargs["address"] = addr

            user = parsed_uri.username if parsed_uri.username is not None else ""
            password = parsed_uri.password if parsed_uri.password is not None else ""
            # Set secure=True if https scheme
            if parsed_uri.scheme == "https":
                kwargs["secure"] = True

            connect_milvus(**kwargs, user=user, password=password, token=token, db_name=db_name)
            return

        # 3rd Priority, connect to cached configs with provided user and password
        if alias in self._alias:
            # Merge instead of double-splatting: a key present in both the cached
            # config and kwargs (e.g. "secure") would otherwise raise TypeError.
            # Explicit kwargs take precedence over the cached values.
            connect_alias = {**self._alias[alias], **kwargs}
            connect_alias["user"] = user
            connect_alias["password"] = password
            connect_alias["db_name"] = db_name
            connect_alias["token"] = token
            connect_milvus(**connect_alias)
            return

        # No params, env, and cached configs for the alias
        raise ConnectionConfigException(message=ExceptionsMessage.ConnLackConf % alias)

    def list_connections(self) -> list:
        """List all registered aliases together with their live handlers.

        :return list: ``(alias, handler)`` tuples for every registered alias;
            ``handler`` is ``None`` when the alias is not currently connected.

        :example:
            >>> from pymilvus import connections
            >>> connections.connect("test", host="localhost", port="19530")
            >>> connections.list_connections()
        """
        return [(k, self._connected_alias.get(k, None)) for k in self._alias]

    def get_connection_addr(self, alias: str):
        """
        Retrieves connection configure by alias.

        :param alias: The name of milvus connection
        :type alias: str

        :return dict:
            The connection configure which of the name is alias.
            If alias does not exist, return empty dict.

        :raises ConnectionConfigException: If ``alias`` is not a ``str``.

        :example:
            >>> from pymilvus import connections
            >>> connections.connect("test", host="localhost", port="19530")
            >>> connections.list_connections()
            >>> connections.get_connection_addr('test')
            {'address': 'localhost:19530', 'user': ''}
        """
        if not isinstance(alias, str):
            raise ConnectionConfigException(message=ExceptionsMessage.AliasType % type(alias))

        return self._alias.get(alias, {})

    def has_connection(self, alias: str) -> bool:
        """Check if connection named alias exists.

        Only live (connected) aliases count; an alias that was merely configured
        through ``add_connection`` is reported as ``False``.

        :param alias: The name of milvus connection
        :type alias: str

        :return bool:
            if the connection of name alias exists.

        :raises ConnectionConfigException: If ``alias`` is not a ``str``.

        :example:
            >>> from pymilvus import connections
            >>> connections.connect("test", host="localhost", port="19530")
            >>> connections.has_connection("test")
            True
        """
        if not isinstance(alias, str):
            raise ConnectionConfigException(message=ExceptionsMessage.AliasType % type(alias))
        return alias in self._connected_alias

    def _fetch_handler(self, alias: str = Config.MILVUS_CONN_ALIAS) -> GrpcHandler:
        """Retrieves a GrpcHandler by alias.

        :raises ConnectionConfigException: If ``alias`` is not a ``str``.
        :raises ConnectionNotExistException: If ``alias`` is not connected.
        """
        if not isinstance(alias, str):
            raise ConnectionConfigException(message=ExceptionsMessage.AliasType % type(alias))

        conn = self._connected_alias.get(alias, None)
        if conn is None:
            raise ConnectionNotExistException(message=ExceptionsMessage.ConnectFirst)

        return conn


# Singleton Mode in Python
connections = Connections()
Memory