from typing import Optional, Sequence, TypeVar
from abc import abstractmethod
from chromadb.types import (
Collection,
MetadataEmbeddingRecord,
Operation,
RequestVersionContext,
VectorEmbeddingRecord,
Where,
WhereDocument,
VectorQuery,
VectorQueryResult,
Segment,
SeqId,
Metadata,
)
from chromadb.config import Component, System
from uuid import UUID
from enum import Enum
class SegmentType(Enum):
    """Known segment kinds, each identified by a ``urn:chroma:segment/...`` URN."""

    # Metadata segment, sqlite variant.
    SQLITE = "urn:chroma:segment/metadata/sqlite"

    # Vector segments, HNSW variants.
    HNSW_LOCAL_MEMORY = "urn:chroma:segment/vector/hnsw-local-memory"
    HNSW_LOCAL_PERSISTED = "urn:chroma:segment/vector/hnsw-local-persisted"
    HNSW_DISTRIBUTED = "urn:chroma:segment/vector/hnsw-distributed"

    # Blockfile segments: one record variant and one metadata variant.
    BLOCKFILE_RECORD = "urn:chroma:segment/record/blockfile"
    BLOCKFILE_METADATA = "urn:chroma:segment/metadata/blockfile"
class SegmentImplementation(Component):
    """Base interface for segment implementations."""

    @abstractmethod
    def __init__(self, sytstem: System, segment: Segment):
        """Build the implementation from a System and the Segment it backs."""
        # NOTE(review): `sytstem` looks like a misspelling of `system`. The name
        # is left untouched so that keyword-argument callers are not affected.

    @abstractmethod
    def count(self, request_version_context: RequestVersionContext) -> int:
        """Return the number of embeddings held by this segment."""

    @abstractmethod
    def max_seqid(self) -> SeqId:
        """Return the largest SeqID this segment has indexed so far."""

    @staticmethod
    def propagate_collection_metadata(metadata: Metadata) -> Optional[Metadata]:
        """Select the subset of a metadata map that applies to the segment.

        Takes an arbitrary metadata map (e.g. one coming from a collection),
        validates it, and returns whatever part of it (if any) should be
        applied to the segment. Validation errors are reported to the user.

        This base implementation propagates nothing and always returns None.
        """
        return None

    @abstractmethod
    def delete(self) -> None:
        """Remove the segment together with all of its data."""
# Type variable whose upper bound is SegmentImplementation: it matches
# SegmentImplementation itself or any of its subclasses.
S = TypeVar("S", bound=SegmentImplementation)
class MetadataReader(SegmentImplementation):
    """Interface of a segment that serves embedding metadata."""

    @abstractmethod
    def get_metadata(
        self,
        request_version_context: RequestVersionContext,
        where: Optional[Where] = None,
        where_document: Optional[WhereDocument] = None,
        ids: Optional[Sequence[str]] = None,
        limit: Optional[int] = None,
        offset: Optional[int] = None,
        include_metadata: bool = True,
    ) -> Sequence[MetadataEmbeddingRecord]:
        """Run a query against the embedding metadata.

        Apart from ``request_version_context``, every argument is optional:
        ``where``, ``where_document``, ``ids``, ``limit`` and ``offset``
        default to None, and ``include_metadata`` defaults to True. Their
        exact semantics are defined by the concrete implementation.

        Returns the matching records as a sequence of MetadataEmbeddingRecord.
        """
class VectorReader(SegmentImplementation):
    """Interface of a segment that serves embedding vectors."""

    @abstractmethod
    def get_vectors(
        self,
        request_version_context: RequestVersionContext,
        ids: Optional[Sequence[str]] = None,
    ) -> Sequence[VectorEmbeddingRecord]:
        """Fetch embeddings stored in the segment.

        When ``ids`` is not supplied, every embedding in the segment is
        returned.
        """

    @abstractmethod
    def query_vectors(
        self, query: VectorQuery
    ) -> Sequence[Sequence[VectorQueryResult]]:
        """Return the top-k nearest neighbors for the vectors in ``query``."""
class SegmentManager(Component):
    """Pluggable strategy interface for creating, retrieving and instantiating
    segments on demand."""

    @abstractmethod
    def prepare_segments_for_new_collection(
        self, collection: Collection
    ) -> Sequence[Segment]:
        """Produce the segments that a new collection needs.

        Only the segment data is returned; nothing is persisted to the SysDB.
        """

    @abstractmethod
    def delete_segments(self, collection_id: UUID) -> Sequence[UUID]:
        """Drop the local state of every segment belonging to a collection.

        Returns the IDs of those segments. The SysDB is not updated.
        """

    @abstractmethod
    def hint_use_collection(self, collection_id: UUID, hint_type: Operation) -> None:
        """Announce that a collection is about to be used.

        Lets the segment manager preload segments as needed. This is purely
        advisory: implementations are free to ignore it.
        """