from typing import Callable, List, Optional, Sequence, Union
import numpy as np
from qdrant_client.conversions import common_types as types
from qdrant_client.http.models import SparseVector
from qdrant_client.local.distances import EPSILON, fast_sigmoid, scaled_fast_sigmoid
from qdrant_client.local.sparse import (
empty_sparse_vector,
is_sorted,
sort_sparse_vector,
validate_sparse_vector,
)
class SparseRecoQuery:
    """Recommendation query built from positive and negative sparse examples.

    Each example is validated and stored with its indices sorted, which the
    scoring helpers in this module rely on (dot products expect sorted indices).
    """

    def __init__(
        self,
        positive: Optional[List[SparseVector]] = None,
        negative: Optional[List[SparseVector]] = None,
    ):
        """
        Args:
            positive: examples the result should be similar to. Defaults to [].
            negative: examples the result should be dissimilar to. Defaults to [].

        Raises:
            Whatever `validate_sparse_vector` raises for a malformed vector.
        """
        positive = positive if positive is not None else []
        negative = negative if negative is not None else []

        for vector in positive:
            validate_sparse_vector(vector)
        for vector in negative:
            validate_sparse_vector(vector)

        # Build fresh lists instead of assigning sorted vectors back into the
        # caller's lists (the previous in-place `positive[i] = ...` mutated
        # the caller's arguments as a side effect).
        self.positive = [sort_sparse_vector(vector) for vector in positive]
        self.negative = [sort_sparse_vector(vector) for vector in negative]

    def transform_sparse(
        self, foo: Callable[["SparseVector"], "SparseVector"]
    ) -> "SparseRecoQuery":
        """Return a new query with `foo` applied to every stored vector."""
        return SparseRecoQuery(
            positive=[foo(vector) for vector in self.positive],
            negative=[foo(vector) for vector in self.negative],
        )
class SparseContextPair:
    """A (positive, negative) pair of sparse example vectors.

    Both vectors are validated and stored with their indices sorted so that
    downstream dot-product helpers can rely on the ordering.
    """

    def __init__(self, positive: SparseVector, negative: SparseVector):
        validate_sparse_vector(positive)
        validate_sparse_vector(negative)
        self.positive: SparseVector = sort_sparse_vector(positive)
        self.negative: SparseVector = sort_sparse_vector(negative)
class SparseDiscoveryQuery:
    """Discovery query: a target sparse vector plus context pairs.

    The target is validated and stored with sorted indices; the context pairs
    are kept as given (each pair sorts its own vectors on construction).
    """

    def __init__(self, target: SparseVector, context: List[SparseContextPair]):
        validate_sparse_vector(target)
        self.target: SparseVector = sort_sparse_vector(target)
        self.context = context

    def transform_sparse(
        self, foo: Callable[["SparseVector"], "SparseVector"]
    ) -> "SparseDiscoveryQuery":
        """Return a new query with `foo` applied to the target and every pair."""
        # Transform the target first, then the pairs, matching construction order.
        new_target = foo(self.target)
        new_context = [
            SparseContextPair(foo(pair.positive), foo(pair.negative))
            for pair in self.context
        ]
        return SparseDiscoveryQuery(target=new_target, context=new_context)
class SparseContextQuery:
    """Query expressed purely as positive/negative context pairs."""

    def __init__(self, context_pairs: List[SparseContextPair]):
        self.context_pairs = context_pairs

    def transform_sparse(
        self, foo: Callable[["SparseVector"], "SparseVector"]
    ) -> "SparseContextQuery":
        """Return a new query with `foo` applied to both sides of every pair."""
        transformed_pairs = [
            SparseContextPair(foo(pair.positive), foo(pair.negative))
            for pair in self.context_pairs
        ]
        return SparseContextQuery(context_pairs=transformed_pairs)
# Any sparse input accepted by the local scoring helpers below: either a raw
# sparse vector or one of the structured query types defined above.
SparseQueryVector = Union[
    SparseVector,
    SparseDiscoveryQuery,
    SparseContextQuery,
    SparseRecoQuery,
]
def calculate_distance_sparse(
    query: SparseVector, vectors: List[SparseVector]
) -> types.NumpyArray:
    """Dot-product scores of `query` against every vector in `vectors`.

    A vector sharing no indices with the query (no overlap) scores -inf,
    so it sorts below any vector with a real dot product.
    """
    raw_scores = (sparse_dot_product(query, vector) for vector in vectors)
    scores = [np.float32("-inf") if score is None else score for score in raw_scores]
    return np.array(scores, dtype=np.float32)
def sparse_dot_product(vector1: SparseVector, vector2: SparseVector) -> Optional[np.float32]:
    """Dot product of two sparse vectors whose indices are sorted ascending.

    Returns None when the vectors share no indices at all, letting callers
    distinguish "no overlap" from a genuine zero-valued dot product.
    """
    assert is_sorted(vector1), "Query sparse vector must be sorted"
    assert is_sorted(vector2), "Sparse vector to compare with must be sorted"

    total = 0.0
    found_overlap = False
    i = j = 0
    len1, len2 = len(vector1.indices), len(vector2.indices)

    # Two-pointer walk over both sorted index lists.
    while i < len1 and j < len2:
        idx1, idx2 = vector1.indices[i], vector2.indices[j]
        if idx1 < idx2:
            i += 1
        elif idx2 < idx1:
            j += 1
        else:
            found_overlap = True
            total += vector1.values[i] * vector2.values[j]
            i += 1
            j += 1

    return np.float32(total) if found_overlap else None
def calculate_sparse_discovery_ranks(
    context: List[SparseContextPair],
    vectors: List[SparseVector],
) -> types.NumpyArray:
    """Accumulated per-vector rank over all context pairs.

    For each pair a vector earns +1 if it is closer to the positive example
    than to the negative one, -1 if farther, and 0 on an exact tie. The ranks
    are summed across all pairs.

    Args:
        context: positive/negative example pairs.
        vectors: candidate sparse vectors to rank.

    Returns:
        int32 array of shape (len(vectors),) with the summed ranks.
    """
    overall_ranks = np.zeros(len(vectors), dtype=np.int32)
    for pair in context:
        # Distances (sparse dot products) to the pair's two examples;
        # -inf marks "no overlap" and compares like any other float here.
        pos = calculate_distance_sparse(pair.positive, vectors)
        neg = calculate_distance_sparse(pair.negative, vectors)
        # Vectorized replacement for the original per-element Python loop:
        # +1 where pos > neg, 0 where equal, -1 where pos < neg.
        pair_ranks = np.where(pos > neg, 1, np.where(pos == neg, 0, -1)).astype(np.int32)
        overall_ranks += pair_ranks
    return overall_ranks
def calculate_sparse_discovery_scores(
    query: SparseDiscoveryQuery, vectors: List[SparseVector]
) -> types.NumpyArray:
    """Discovery score: context rank plus sigmoid-squashed distance to the target.

    The integer rank carries the context preference; the squashed target
    distance is added on top to order vectors sharing the same rank.
    """
    ranks = calculate_sparse_discovery_ranks(query.context, vectors)
    target_distances = calculate_distance_sparse(query.target, vectors)
    squashed = np.fromiter(
        (scaled_fast_sigmoid(distance) for distance in target_distances), np.float32
    )
    return ranks + squashed
def calculate_sparse_context_scores(
    query: SparseContextQuery, vectors: List[SparseVector]
) -> types.NumpyArray:
    """Sum of per-pair context scores for every candidate vector.

    For each pair, a vector scores fast_sigmoid(min(pos - neg - EPSILON, 0)):
    the margin between its positive and negative distances, clamped at zero
    and squashed, then accumulated over all pairs.
    """
    overall_scores = np.zeros(len(vectors), dtype=np.float32)
    for pair in query.context_pairs:
        pos = calculate_distance_sparse(pair.positive, vectors)
        neg = calculate_distance_sparse(pair.negative, vectors)
        # Clamp the margin at zero before squashing it, so only vectors on
        # the wrong side of the pair contribute a non-zero term.
        margin = np.minimum(pos - neg - EPSILON, 0.0)
        pair_scores = np.fromiter((fast_sigmoid(m) for m in margin), np.float32)
        overall_scores += pair_scores
    return overall_scores
def calculate_sparse_recommend_best_scores(
    query: SparseRecoQuery, vectors: List[SparseVector]
) -> types.NumpyArray:
    """Score vectors by their best positive vs. best negative example.

    Each vector keeps its maximum similarity to the positive examples and to
    the negative examples; in both cases the winning value is squashed through
    the sigmoid, and its sign encodes which side was the better match.
    """

    def _best_against(examples: List[SparseVector]) -> types.NumpyArray:
        # Per-vector maximum similarity to `examples`; -inf when there are none.
        per_example = [calculate_distance_sparse(example, vectors) for example in examples]
        if not per_example:
            per_example = [np.full(len(vectors), -np.inf)]
        return np.array(per_example, dtype=np.float32).max(axis=0)

    pos = _best_against(query.positive)
    neg = _best_against(query.negative)

    # Positive side wins: sigmoided positive score. Otherwise: negated
    # sigmoided negative score, pushing such vectors to the bottom.
    return np.where(
        pos > neg,
        np.fromiter((scaled_fast_sigmoid(x) for x in pos), pos.dtype),
        np.fromiter((-scaled_fast_sigmoid(x) for x in neg), neg.dtype),
    )
def combine_aggregate(vector1: SparseVector, vector2: SparseVector, op: Callable) -> SparseVector:
    """Merge two sparse vectors (sorted indices) into one, combining values with `op`.

    For an index present in only one vector, the missing value is taken as
    0.0. The merged indices remain sorted.
    """
    result = empty_sparse_vector()
    i, j = 0, 0
    len1, len2 = len(vector1.indices), len(vector2.indices)

    # Two-pointer merge over the sorted index lists.
    while i < len1 and j < len2:
        idx1, idx2 = vector1.indices[i], vector2.indices[j]
        if idx1 == idx2:
            result.indices.append(idx1)
            result.values.append(op(vector1.values[i], vector2.values[j]))
            i += 1
            j += 1
        elif idx1 < idx2:
            result.indices.append(idx1)
            result.values.append(op(vector1.values[i], 0.0))
            i += 1
        else:
            result.indices.append(idx2)
            result.values.append(op(0.0, vector2.values[j]))
            j += 1

    # Drain whichever vector still has a tail.
    for k in range(i, len1):
        result.indices.append(vector1.indices[k])
        result.values.append(op(vector1.values[k], 0.0))
    for k in range(j, len2):
        result.indices.append(vector2.indices[k])
        result.values.append(op(0.0, vector2.values[k]))

    return result
def sparse_avg(vectors: Sequence[SparseVector]) -> SparseVector:
    """Element-wise average of sparse vectors (sorted indices expected).

    Indices missing from a vector contribute 0.0 to that position's sum, so
    every index is averaged over len(vectors).

    Args:
        vectors: sparse vectors with sorted indices.

    Returns:
        A sparse vector over the union of indices with averaged values;
        an empty sparse vector when `vectors` is empty.
    """
    result = empty_sparse_vector()
    if len(vectors) == 0:
        return result
    # Accumulate the element-wise sum via pairwise merges.
    for vector in vectors:
        result = combine_aggregate(result, vector, lambda v1, v2: v1 + v2)
    # Divide once at the end. The manual per-iteration counter the original
    # kept was redundant — it always equalled len(vectors).
    result.values = np.divide(result.values, len(vectors)).tolist()
    return result
def merge_positive_and_negative_avg(
    positive: SparseVector, negative: SparseVector
) -> SparseVector:
    """Merge averaged positive/negative vectors into one query vector.

    Each merged value is `pos + pos - neg` (missing entries count as 0.0).
    Expects both vectors to have sorted indices.
    """

    def _weighted_diff(pos: float, neg: float) -> float:
        return pos + pos - neg

    return combine_aggregate(positive, negative, _weighted_diff)