from typing import Optional, List, Union

import numpy as np

from qdrant_client.http import models
from qdrant_client.conversions import common_types as types
from qdrant_client.local.distances import (
    distance_to_order,
    DistanceOrder,
    calculate_distance,
    scaled_fast_sigmoid,
    EPSILON,
    fast_sigmoid,
)


class MultiRecoQuery:
    """Recommendation query whose examples are matrices (multivectors).

    Attributes:
        positive: one numpy array per positive example matrix.
        negative: one numpy array per negative example matrix.
    """

    def __init__(
        self,
        positive: Optional[List[List[List[float]]]] = None,  # list of matrices
        negative: Optional[List[List[List[float]]]] = None,  # list of matrices
    ):
        positive = positive if positive is not None else []
        negative = negative if negative is not None else []

        self.positive: List[types.NumpyArray] = [np.array(vector) for vector in positive]
        self.negative: List[types.NumpyArray] = [np.array(vector) for vector in negative]

        # Validate matrix by matrix: example matrices may have different
        # numbers of rows, so the list cannot be handed to np.isnan as a whole
        # (numpy would have to stack it into one rectangular array first).
        assert not any(
            np.isnan(matrix).any() for matrix in self.positive
        ), "Positive vectors must not contain NaN"
        assert not any(
            np.isnan(matrix).any() for matrix in self.negative
        ), "Negative vectors must not contain NaN"


class MultiContextPair:
    """A (positive, negative) pair of example matrices stored as numpy arrays."""

    def __init__(self, positive: List[List[float]], negative: List[List[float]]):
        self.positive: types.NumpyArray = np.array(positive)
        self.negative: types.NumpyArray = np.array(negative)

        assert not np.isnan(self.positive).any(), "Positive vector must not contain NaN"
        assert not np.isnan(self.negative).any(), "Negative vector must not contain NaN"


class MultiDiscoveryQuery:
    """Discovery query: a target matrix plus a list of context pairs."""

    def __init__(self, target: List[List[float]], context: List[MultiContextPair]):
        self.target: types.NumpyArray = np.array(target)
        self.context = context

        assert not np.isnan(self.target).any(), "Target vector must not contain NaN"


class MultiContextQuery:
    """Context-only query: a list of (positive, negative) matrix pairs."""

    def __init__(self, context_pairs: List[MultiContextPair]):
        self.context_pairs = context_pairs


MultiQueryVector = Union[
    MultiDiscoveryQuery,
    MultiContextQuery,
    MultiRecoQuery,
]


def calculate_multi_distance(
    query_matrix: types.NumpyArray,
    matrices: List[types.NumpyArray],
    distance_type: models.Distance,
) -> types.NumpyArray:
    """Score every matrix in ``matrices`` against ``query_matrix``.

    For each candidate matrix, ``calculate_distance`` is evaluated between the
    query and the candidate, the best value along the last axis is taken
    (minimum when the distance order is SMALLER_IS_BETTER, maximum otherwise)
    and the best values are summed into one float.

    Args:
        query_matrix: 2-D array holding the query multivector.
        matrices: candidate matrices to score.
        distance_type: distance used for the pairwise comparison.

    Returns:
        1-D array with one score per entry of ``matrices``.

    Raises:
        AssertionError: if the query contains NaN or is not 2-D.
    """
    assert not np.isnan(query_matrix).any(), "Query matrix must not contain NaN"
    assert len(query_matrix.shape) == 2, "Query must be a matrix"

    reverse = distance_to_order(distance_type) == DistanceOrder.SMALLER_IS_BETTER
    # The reduction does not depend on the candidate, so pick it once.
    best_of = np.min if reverse else np.max

    similarities: List[float] = []

    # max sim
    for matrix in matrices:
        # NOTE(review): presumably shaped (query rows, matrix rows) - confirm
        # against calculate_distance.
        sim_matrix = calculate_distance(query_matrix, matrix, distance_type)
        similarities.append(float(np.sum(best_of(sim_matrix, axis=-1))))

    return np.array(similarities)


def calculate_multi_distance_core(
    query_matrix: types.NumpyArray,
    matrices: List[types.NumpyArray],
    distance_type: models.Distance,
) -> types.NumpyArray:
    """Score matrices against a query so that a bigger score is always better.

    EUCLID and MANHATTAN are computed here as *negated* distances (squared
    differences for EUCLID), which lets the max-sim reduction use ``np.max``
    unconditionally. Every other distance type is delegated to
    ``calculate_multi_distance``.

    Args:
        query_matrix: 2-D array holding the query multivector.
        matrices: candidate matrices to score.
        distance_type: distance used for the pairwise comparison.

    Returns:
        1-D array with one score per entry of ``matrices``.

    Raises:
        AssertionError: if the query contains NaN.
    """

    # In both helpers ``q`` has shape (q_rows, 1, dim) and ``m`` has shape
    # (m_rows, dim), so ``m - q`` broadcasts to (q_rows, m_rows, dim). The
    # reduction must run over the coordinate axis (the last one) to obtain a
    # (q_rows, m_rows) table of pairwise scores.
    def euclidean(m: types.NumpyArray, q: types.NumpyArray) -> types.NumpyArray:
        return -np.square(m - q, dtype=np.float32).sum(axis=-1, dtype=np.float32)

    def manhattan(m: types.NumpyArray, q: types.NumpyArray) -> types.NumpyArray:
        return -np.abs(m - q, dtype=np.float32).sum(axis=-1, dtype=np.float32)

    assert not np.isnan(query_matrix).any(), "Query vector must not contain NaN"

    if distance_type in [models.Distance.EUCLID, models.Distance.MANHATTAN]:
        # Insert an axis so every query row is compared with every matrix row.
        query_matrix = query_matrix[:, np.newaxis]
        dist_func = euclidean if distance_type == models.Distance.EUCLID else manhattan

        similarities: List[float] = []
        for matrix in matrices:
            sim_matrix = dist_func(matrix, query_matrix)
            # Best match per query row, summed over the query rows.
            similarities.append(float(np.sum(np.max(sim_matrix, axis=-1))))
        return np.array(similarities)

    return calculate_multi_distance(query_matrix, matrices, distance_type)


def calculate_multi_recommend_best_scores(
    query: MultiRecoQuery, matrices: List[types.NumpyArray], distance_type: models.Distance
) -> types.NumpyArray:
    """Score matrices by their best positive or best negative example.

    For every candidate the best score over all positive examples and the best
    score over all negative examples are computed. Where the positive score
    wins, the result is ``scaled_fast_sigmoid(pos)``; otherwise it is
    ``-scaled_fast_sigmoid(neg)``.

    Args:
        query: positive and negative example matrices.
        matrices: candidate matrices to score.
        distance_type: distance used for the pairwise comparison.

    Returns:
        1-D array with one score per entry of ``matrices``.
    """

    def get_best_scores(examples: List[types.NumpyArray]) -> types.NumpyArray:
        matrix_count = len(matrices)

        # Get scores to all examples
        scores: List[types.NumpyArray] = [
            calculate_multi_distance_core(example, matrices, distance_type)
            for example in examples
        ]

        # Keep only max for each vector; with no examples every candidate gets
        # -inf so that the other side always wins the comparison below.
        if not scores:
            scores.append(np.full(matrix_count, -np.inf))
        best_scores = np.array(scores, dtype=np.float32).max(axis=0)

        return best_scores

    pos = get_best_scores(query.positive)
    neg = get_best_scores(query.negative)

    # Choose from the best positive or the best negative,
    # in both cases we apply sigmoid and then negate depending on the order
    return np.where(
        pos > neg,
        np.fromiter((scaled_fast_sigmoid(xi) for xi in pos), pos.dtype),
        np.fromiter((-scaled_fast_sigmoid(xi) for xi in neg), neg.dtype),
    )


def calculate_multi_discovery_ranks(
    context: List[MultiContextPair],
    matrices: List[types.NumpyArray],
    distance_type: models.Distance,
) -> types.NumpyArray:
    """Rank matrices by how many context pairs they satisfy.

    Each pair contributes +1 to a candidate that scores higher against the
    positive than the negative example, 0 on a tie and -1 otherwise.

    Args:
        context: (positive, negative) example pairs.
        matrices: candidate matrices to rank.
        distance_type: distance used for the pairwise comparison.

    Returns:
        int32 array with one rank per entry of ``matrices``.
    """
    overall_ranks = np.zeros(len(matrices), dtype=np.int32)
    for pair in context:
        # Get distances to positive and negative vectors
        pos = calculate_multi_distance_core(pair.positive, matrices, distance_type)
        neg = calculate_multi_distance_core(pair.negative, matrices, distance_type)

        # Integer-typed even when there are no candidates, so the in-place add
        # into the int32 accumulator never needs a float -> int cast.
        pair_ranks = np.where(pos > neg, 1, np.where(pos == neg, 0, -1)).astype(np.int32)

        overall_ranks += pair_ranks

    return overall_ranks


def calculate_multi_discovery_scores(
    query: MultiDiscoveryQuery, matrices: List[types.NumpyArray], distance_type: models.Distance
) -> types.NumpyArray:
    """Combine context ranks with a sigmoid of the score against the target.

    Args:
        query: target matrix and context pairs.
        matrices: candidate matrices to score.
        distance_type: distance used for the pairwise comparison.

    Returns:
        1-D array: rank plus ``scaled_fast_sigmoid`` of the target score for
        every entry of ``matrices``.
    """
    ranks = calculate_multi_discovery_ranks(query.context, matrices, distance_type)

    # Get distances to target
    distances_to_target = calculate_multi_distance_core(query.target, matrices, distance_type)

    sigmoided_distances = np.fromiter(
        (scaled_fast_sigmoid(xi) for xi in distances_to_target), np.float32
    )

    return ranks + sigmoided_distances


def calculate_multi_context_scores(
    query: MultiContextQuery, matrices: List[types.NumpyArray], distance_type: models.Distance
) -> types.NumpyArray:
    """Accumulate a per-pair score for every candidate matrix.

    For each pair the margin ``pos - neg - EPSILON`` is clipped at zero from
    above and passed through ``fast_sigmoid``; the results are summed over all
    pairs.

    Args:
        query: context pairs to evaluate.
        matrices: candidate matrices to score.
        distance_type: distance used for the pairwise comparison.

    Returns:
        float32 array with one score per entry of ``matrices``.
    """
    overall_scores = np.zeros(len(matrices), dtype=np.float32)
    for pair in query.context_pairs:
        # Get distances to positive and negative vectors
        pos = calculate_multi_distance_core(pair.positive, matrices, distance_type)
        neg = calculate_multi_distance_core(pair.negative, matrices, distance_type)

        difference = pos - neg - EPSILON
        # Only a non-positive margin contributes; positive margins are clipped to 0.
        pair_scores = np.fromiter(
            (fast_sigmoid(xi) for xi in np.minimum(difference, 0.0)), np.float32
        )
        overall_scores += pair_scores

    return overall_scores
# Memory