from typing import ClassVar, Dict
import uuid
from chromadb.api import ServerAPI
from chromadb.config import Settings, System
from chromadb.telemetry.product import ProductTelemetryClient
from chromadb.telemetry.product.events import ClientStartEvent
class SharedSystemClient:
_identifier_to_system: ClassVar[Dict[str, System]] = {}
_identifier: str
def __init__(
self,
settings: Settings = Settings(),
) -> None:
self._identifier = SharedSystemClient._get_identifier_from_settings(settings)
SharedSystemClient._create_system_if_not_exists(self._identifier, settings)
@classmethod
def _create_system_if_not_exists(
cls, identifier: str, settings: Settings
) -> System:
if identifier not in cls._identifier_to_system:
new_system = System(settings)
cls._identifier_to_system[identifier] = new_system
new_system.instance(ProductTelemetryClient)
new_system.instance(ServerAPI)
new_system.start()
else:
previous_system = cls._identifier_to_system[identifier]
# For now, the settings must match
if previous_system.settings != settings:
raise ValueError(
f"An instance of Chroma already exists for {identifier} with different settings"
)
return cls._identifier_to_system[identifier]
@staticmethod
def _get_identifier_from_settings(settings: Settings) -> str:
identifier = ""
api_impl = settings.chroma_api_impl
if api_impl is None:
raise ValueError("Chroma API implementation must be set in settings")
elif api_impl == "chromadb.api.segment.SegmentAPI":
if settings.is_persistent:
identifier = settings.persist_directory
else:
identifier = (
"ephemeral" # TODO: support pathing and multiple ephemeral clients
)
elif api_impl in [
"chromadb.api.fastapi.FastAPI",
"chromadb.api.async_fastapi.AsyncFastAPI",
]:
# FastAPI clients can all use unique system identifiers since their configurations can be independent, e.g. different auth tokens
identifier = str(uuid.uuid4())
else:
raise ValueError(f"Unsupported Chroma API implementation {api_impl}")
return identifier
@staticmethod
def _populate_data_from_system(system: System) -> str:
identifier = SharedSystemClient._get_identifier_from_settings(system.settings)
SharedSystemClient._identifier_to_system[identifier] = system
return identifier
@classmethod
def from_system(cls, system: System) -> "SharedSystemClient":
"""Create a client from an existing system. This is useful for testing and debugging."""
SharedSystemClient._populate_data_from_system(system)
instance = cls(system.settings)
return instance
@staticmethod
def clear_system_cache() -> None:
SharedSystemClient._identifier_to_system = {}
@property
def _system(self) -> System:
return SharedSystemClient._identifier_to_system[self._identifier]
def _submit_client_start_event(self) -> None:
telemetry_client = self._system.instance(ProductTelemetryClient)
telemetry_client.capture(ClientStartEvent())