from typing import Dict, List from qdrant_client.http import models def reciprocal_rank_fusion( responses: List[List[models.ScoredPoint]], limit: int = 10 ) -> List[models.ScoredPoint]: def compute_score(pos: int) -> float: ranking_constant = ( 2 # the constant mitigates the impact of high rankings by outlier systems ) return 1 / (ranking_constant + pos) scores: Dict[models.ExtendedPointId, float] = {} point_pile = {} for response in responses: for i, scored_point in enumerate(response): if scored_point.id in scores: scores[scored_point.id] += compute_score(i) else: point_pile[scored_point.id] = scored_point scores[scored_point.id] = compute_score(i) sorted_scores = sorted(scores.items(), key=lambda item: item[1], reverse=True) sorted_points = [] for point_id, score in sorted_scores[:limit]: point = point_pile[point_id] point.score = score sorted_points.append(point) return sorted_points def distribution_based_score_fusion( responses: List[List[models.ScoredPoint]], limit: int ) -> List[models.ScoredPoint]: def normalize(response: List[models.ScoredPoint]) -> List[models.ScoredPoint]: total = sum([point.score for point in response]) mean = total / len(response) variance = sum([(point.score - mean) ** 2 for point in response]) / (len(response) - 1) std_dev = variance**0.5 low = mean - 3 * std_dev high = mean + 3 * std_dev for point in response: point.score = (point.score - low) / (high - low) return response points_map: Dict[models.ExtendedPointId, models.ScoredPoint] = {} for response in responses: if not response: continue normalized = normalize(response) for point in normalized: entry = points_map.get(point.id) if entry is None: points_map[point.id] = point else: entry.score += point.score sorted_points = sorted(points_map.values(), key=lambda item: item.score, reverse=True) return sorted_points[:limit]
Memory