import logging

import chromadb.test.property.strategies as strategies
import hypothesis.strategies as st
from chromadb.api import ClientAPI
from chromadb.test.conftest import NOT_CLUSTER_ONLY, reset
from chromadb.test.property import invariants
from chromadb.test.utils.wait_for_version_increase import wait_for_version_increase
from hypothesis import HealthCheck, given, settings

# Both strategies are wrapped in ``st.shared`` with fixed keys, and the record
# set strategy is built from the collection strategy, so the records drawn for
# an example are generated against the same collection the test receives.
collection_st = st.shared(
    strategies.collections(add_filterable_data=True, with_hnsw_params=True),
    key="coll",
)
recordset_st = st.shared(
    strategies.recordsets(collection_st, max_size=1000),
    key="recordset",
)


@settings(
    deadline=90000,
    suppress_health_check=[
        HealthCheck.function_scoped_fixture,
        HealthCheck.large_base_example,
        HealthCheck.filter_too_much,
    ],
)  # type: ignore
@given(
    collection=collection_st,
    record_set=recordset_st,
    limit=st.integers(min_value=1, max_value=10),
    offset=st.integers(min_value=0, max_value=10),
    should_compact=st.booleans(),
)
def test_get_limit_offset(
    caplog,
    client: ClientAPI,
    collection: strategies.Collection,
    record_set: dict,
    limit: int,
    offset: int,
    should_compact: bool,
) -> None:
    """Check that a paged ``get`` matches the same window of an unpaged ``get``.

    A fresh collection is created from the generated ``collection``, the
    generated ``record_set`` is added to it, and the ids returned by
    ``get(offset=offset, limit=limit)`` are compared with the slice
    ``[offset : offset + limit]`` of the ids returned by a plain ``get()``.

    Args:
        caplog: pytest log-capture fixture; its level is raised to ERROR.
        client: API client under test; it is reset before the collection is
            created.
        collection: Generated collection description (name, metadata,
            embedding function).
        record_set: Generated records, passed to ``add`` as keyword arguments.
        limit: Page size, drawn from 1 to 10.
        offset: Number of leading records to skip, drawn from 0 to 10.
        should_compact: Whether this example may wait for a version increase
            before reading (see the condition in the body).
    """
    caplog.set_level(logging.ERROR)

    # Start every example from a clean client state.
    reset(client)
    created = client.create_collection(
        name=collection.name,
        metadata=collection.metadata,  # type: ignore
        embedding_function=collection.embedding_function,
    )

    # Record the version before inserting so a later increase can be detected.
    version_before_add = created.get_model()["version"]

    created.add(**record_set)

    # Only wait when NOT_CLUSTER_ONLY is falsy, the example asked for it, and
    # the record set holds more than 10 ids; smaller collections skip the wait.
    if (
        not NOT_CLUSTER_ONLY
        and should_compact
        and len(invariants.wrap(record_set["ids"])) > 10
    ):
        # Wait for the collection model to be updated past the pre-add version.
        wait_for_version_increase(client, collection.name, version_before_add)

    page_ids = created.get(offset=offset, limit=limit)["ids"]
    every_id = created.get()["ids"]

    # The paged result must be exactly the matching window of the full result.
    window = slice(offset, offset + limit)
    assert page_ids == every_id[window]
Memory