from overrides import overrides
from chromadb.api.client import Client
from chromadb.config import System
import hypothesis.strategies as st
from hypothesis.stateful import (
    rule,
    run_state_machine_as_test,
    initialize,
)
from chromadb.test.property.test_embeddings import (
    EmbeddingStateMachineBase,
    EmbeddingStateMachineStates,
    trace,
)
import chromadb.test.property.strategies as strategies

# Collection strategy with HNSW params enabled and the persistent HNSW params
# always generated. The batch size and sync threshold are capped at 10 (by
# default these are set to 2000), which makes it more likely to find
# persist-related bugs.
_persistent_collections = strategies.collections(
    with_hnsw_params=True,
    with_persistent_hnsw_params=st.just(True),
    max_hnsw_batch_size=10,
    max_hnsw_sync_threshold=10,
)

# Wrapped in st.shared under a fixed key; see the Hypothesis docs for the
# sharing semantics of draws made under the same key.
collection_persistent_st = st.shared(
    _persistent_collections,
    key="coll_persistent",
)


# This machine shares a lot of similarity with the machine in chromadb/test/property/test_persist.py.
# However, test_persist.py tests correctness under complete process isolation and therefore can only
# check invariants on a new system--whereas this machine does not have full process isolation between
# systems/clients but after a restart continues to exercise the state machine with the newly-created system.
class RestartablePersistedEmbeddingStateMachine(EmbeddingStateMachineBase):
    """Embedding state machine that can restart its backing system mid-run.

    On top of whatever ``EmbeddingStateMachineBase`` provides, this machine
    adds a ``restart_system`` rule: the current ``System`` is stopped, a new
    one is built from the same settings, and the machine keeps exercising the
    same collection through a client bound to the new system.
    """

    # System currently backing ``self.client``; replaced by ``restart_system``.
    system: System

    def __init__(self, system: System) -> None:
        """Remember ``system`` and pass a client built from it to the base class."""
        self.system = system
        client = Client.from_system(system)
        super().__init__(client)

    @initialize(collection=collection_persistent_st)  # type: ignore
    @overrides
    def initialize(self, collection: strategies.Collection):
        """Reset the client and create the collection under test.

        Args:
            collection: Generated collection description; its name, metadata
                and embedding function are used to create the collection.
        """
        self.client.reset()

        self.collection = self.client.create_collection(
            name=collection.name,
            metadata=collection.metadata,  # type: ignore
            embedding_function=collection.embedding_function,
        )
        # Kept so restart_system can re-open the collection with the same
        # embedding function.
        self.embedding_function = collection.embedding_function
        trace("init")
        self.on_state_change(EmbeddingStateMachineStates.initialize)

        # Start every run with an empty record set.
        self.record_set_state = strategies.StateMachineRecordSet(
            ids=[], metadatas=[], documents=[], embeddings=[]
        )

    @rule()
    def restart_system(self) -> None:
        """Simulate restarting chromadb and re-attach to the same collection."""
        self.system.stop()
        # Build a fresh system from the settings of the one just stopped.
        self.system = System(self.system.settings)
        self.system.start()
        # NOTE(review): presumably drops cached systems so that from_system
        # below binds to the new instance -- confirm against Client.
        self.client.clear_system_cache()
        self.client = Client.from_system(self.system)
        self.collection = self.client.get_collection(
            self.collection.name, embedding_function=self.embedding_function
        )

    @overrides
    def teardown(self) -> None:
        """Run the base teardown, then always stop the current system."""
        try:
            super().teardown()
        finally:
            # Need to manually stop the system to cleanup resources because we may have created a new system (above rule).
            # Normally, we wouldn't have to worry about this as the system from the fixture is shared between state machine runs.
            # (This helps avoid a "too many open files" error.)
            # Done in ``finally`` so the system is stopped even if the base
            # teardown raises.
            self.system.stop()


def test_restart_persisted_client(sqlite_persistent: System) -> None:
    """Run the restartable state machine against the ``sqlite_persistent`` system."""
    run_state_machine_as_test(
        lambda: RestartablePersistedEmbeddingStateMachine(sqlite_persistent),
    )  # type: ignore
Memory