import datetime
import threading
from typing import Any, Dict, Optional
from pymilvus.grpc_gen import common_pb2
from .constants import BOUNDED_TS, EVENTUALLY_TS, GUARANTEE_TIMESTAMP, ITERATOR_FIELD
from .singleton_utils import Singleton
from .types import get_consistency_level
from .utils import hybridts_to_unixtime
ConsistencyLevel = common_pb2.ConsistencyLevel
class GTsDict(metaclass=Singleton):
def __init__(self) -> None:
# collection id -> last write ts
self._last_write_ts_dict = {}
self._last_write_ts_dict_lock = threading.Lock()
def __repr__(self) -> str:
return self._last_write_ts_dict.__repr__()
def update(self, collection: int, ts: int):
# use lru later if necessary
with self._last_write_ts_dict_lock:
if ts > self._last_write_ts_dict.get(collection, 0):
self._last_write_ts_dict[collection] = ts
def get(self, collection: int):
return self._last_write_ts_dict.get(collection, 0)
# Return a GTsDict instance.
def _get_gts_dict():
return GTsDict()
# Update the last write ts of collection.
def update_collection_ts(collection: int, ts: int):
_get_gts_dict().update(collection, ts)
# Return a callback corresponding to the collection.
def update_ts_on_mutation(collection: int):
def _update(mutation_result: Any):
update_collection_ts(collection, mutation_result.timestamp)
return _update
# Get the last write ts of collection.
def get_collection_ts(collection: int):
return _get_gts_dict().get(collection)
# Get the last write timestamp of collection.
def get_collection_timestamp(collection: int):
ts = _get_gts_dict().get(collection)
return hybridts_to_unixtime(ts)
# Get the last write datetime of collection.
def get_collection_datetime(collection: int, tz: Optional[datetime.timezone] = None):
timestamp = get_collection_timestamp(collection)
return datetime.datetime.fromtimestamp(timestamp, tz=tz)
def get_eventually_ts():
return EVENTUALLY_TS
def get_bounded_ts():
return BOUNDED_TS
def construct_guarantee_ts(collection_name: str, kwargs: Dict):
if kwargs.get(ITERATOR_FIELD) is not None:
return True
consistency_level = kwargs.get("consistency_level")
use_default = consistency_level is None
if use_default:
# in case of the default consistency is Customized or Session,
# we set guarantee_timestamp to the cached mutation ts or 1
kwargs[GUARANTEE_TIMESTAMP] = get_collection_ts(collection_name) or get_eventually_ts()
return True
consistency_level = get_consistency_level(consistency_level)
kwargs["consistency_level"] = consistency_level
if consistency_level == ConsistencyLevel.Strong:
# Milvus will assign a newest ts.
kwargs[GUARANTEE_TIMESTAMP] = 0
elif consistency_level == ConsistencyLevel.Session:
# Using the last write ts of the collection.
# TODO: get a timestamp from server?
kwargs[GUARANTEE_TIMESTAMP] = get_collection_ts(collection_name) or get_eventually_ts()
elif consistency_level == ConsistencyLevel.Bounded:
# Milvus will assign ts according to the server timestamp and a configured time interval
kwargs[GUARANTEE_TIMESTAMP] = get_bounded_ts()
else:
# Users customize the consistency level, no modification on `guarantee_timestamp`.
kwargs.setdefault(GUARANTEE_TIMESTAMP, get_eventually_ts())
return use_default