# SPDX-License-Identifier: MIT
# Copyright (C) 2022 Max Bachmann
from __future__ import annotations
from rapidfuzz._common_py import common_affix, conv_sequences
from rapidfuzz._utils import is_none, setupPandas
from rapidfuzz.distance import Indel_py as Indel
from rapidfuzz.distance._initialize_py import Editop, Editops
def _levenshtein_maximum(s1, s2, weights):
len1 = len(s1)
len2 = len(s2)
insert, delete, replace = weights
max_dist = len1 * delete + len2 * insert
if len1 >= len2:
max_dist = min(max_dist, len2 * replace + (len1 - len2) * delete)
else:
max_dist = min(max_dist, len1 * replace + (len2 - len1) * insert)
return max_dist
def _uniform_generic(s1, s2, weights):
len1 = len(s1)
insert, delete, replace = weights
cache = list(range(0, (len1 + 1) * delete, delete))
for ch2 in s2:
temp = cache[0]
cache[0] += insert
for i in range(len1):
x = temp
if s1[i] != ch2:
x = min(cache[i] + delete, cache[i + 1] + insert, temp + replace)
temp = cache[i + 1]
cache[i + 1] = x
return cache[-1]
def _uniform_distance(s1, s2):
if not s1:
return len(s2)
VP = (1 << len(s1)) - 1
VN = 0
currDist = len(s1)
mask = 1 << (len(s1) - 1)
block = {}
block_get = block.get
x = 1
for ch1 in s1:
block[ch1] = block_get(ch1, 0) | x
x <<= 1
for ch2 in s2:
# Step 1: Computing D0
PM_j = block_get(ch2, 0)
X = PM_j
D0 = (((X & VP) + VP) ^ VP) | X | VN
# Step 2: Computing HP and HN
HP = VN | ~(D0 | VP)
HN = D0 & VP
# Step 3: Computing the value D[m,j]
currDist += (HP & mask) != 0
currDist -= (HN & mask) != 0
# Step 4: Computing Vp and VN
HP = (HP << 1) | 1
HN = HN << 1
VP = HN | ~(D0 | HP)
VN = HP & D0
return currDist
def distance(
    s1,
    s2,
    *,
    weights=(1, 1, 1),
    processor=None,
    score_cutoff=None,
    score_hint=None,
):
    """
    Calculate the minimum number of insertions, deletions, and substitutions
    required to change one sequence into the other, with configurable costs
    per operation (weighted Levenshtein distance).

    Parameters
    ----------
    s1 : Sequence[Hashable]
        First string to compare.
    s2 : Sequence[Hashable]
        Second string to compare.
    weights : tuple[int, int, int] or None, optional
        Costs in the form (insertion, deletion, substitution).
        Default is (1, 1, 1); None is treated the same as (1, 1, 1).
    processor : callable, optional
        Optional callable used to preprocess both strings before comparing
        them. Default is None, which deactivates this behaviour.
    score_cutoff : int, optional
        Maximum distance that is considered a result. If the distance
        exceeds score_cutoff, ``score_cutoff + 1`` is returned instead.
        Default is None, which deactivates this behaviour.
    score_hint : int, optional
        Expected distance between s1 and s2. Accepted for API compatibility;
        this pure-Python implementation ignores it.

    Returns
    -------
    distance : int
        distance between s1 and s2

    Raises
    ------
    ValueError
        If unsupported weights are provided a ValueError is thrown

    Examples
    --------
    >>> from rapidfuzz.distance import Levenshtein
    >>> Levenshtein.distance("lewenstein", "levenshtein")
    2
    >>> Levenshtein.distance("lewenstein", "levenshtein", weights=(1,1,2))
    3
    """
    _ = score_hint  # unused in the pure-Python backend
    if processor is not None:
        s1 = processor(s1)
        s2 = processor(s2)
    s1, s2 = conv_sequences(s1, s2)

    if weights is None or weights == (1, 1, 1):
        # Unit costs: fast bit-parallel implementation.
        result = _uniform_distance(s1, s2)
    elif weights == (1, 1, 2):
        # Substitution costing exactly insert+delete equals the Indel distance.
        result = Indel.distance(s1, s2)
    else:
        # Arbitrary weights: generic dynamic program.
        result = _uniform_generic(s1, s2, weights)

    if score_cutoff is not None and result > score_cutoff:
        return score_cutoff + 1
    return result
def similarity(
    s1,
    s2,
    *,
    weights=(1, 1, 1),
    processor=None,
    score_cutoff=None,
    score_hint=None,
):
    """
    Calculate the Levenshtein similarity in the range [max, 0] using custom
    costs for insertion, deletion and substitution.

    This is computed as ``max - distance``, where max is the maximal possible
    Levenshtein distance given the lengths of s1/s2 and the weights.

    Parameters
    ----------
    s1 : Sequence[Hashable]
        First string to compare.
    s2 : Sequence[Hashable]
        Second string to compare.
    weights : tuple[int, int, int] or None, optional
        Costs in the form (insertion, deletion, substitution).
        Default is (1, 1, 1); None is treated the same as (1, 1, 1).
    processor : callable, optional
        Optional callable used to preprocess both strings before comparing
        them. Default is None, which deactivates this behaviour.
    score_cutoff : int, optional
        Minimum similarity that is considered a result. If the similarity is
        smaller than score_cutoff, 0 is returned instead. Default is None,
        which deactivates this behaviour.
    score_hint : int, optional
        Expected similarity between s1 and s2. Accepted for API
        compatibility; this pure-Python implementation ignores it.

    Returns
    -------
    similarity : int
        similarity between s1 and s2

    Raises
    ------
    ValueError
        If unsupported weights are provided a ValueError is thrown
    """
    _ = score_hint  # unused in the pure-Python backend
    if processor is not None:
        s1 = processor(s1)
        s2 = processor(s2)
    s1, s2 = conv_sequences(s1, s2)
    weights = weights or (1, 1, 1)
    maximum = _levenshtein_maximum(s1, s2, weights)
    sim = maximum - distance(s1, s2, weights=weights)
    if score_cutoff is not None and sim < score_cutoff:
        return 0
    return sim
def normalized_distance(
    s1,
    s2,
    *,
    weights=(1, 1, 1),
    processor=None,
    score_cutoff=None,
    score_hint=None,
):
    """
    Calculates a normalized levenshtein distance in the range [1, 0] using custom
    costs for insertion, deletion and substitution.

    This is calculated as ``distance / max``, where max is the maximal possible
    Levenshtein distance given the lengths of the sequences s1/s2 and the weights.

    Parameters
    ----------
    s1 : Sequence[Hashable]
        First string to compare.
    s2 : Sequence[Hashable]
        Second string to compare.
    weights : tuple[int, int, int] or None, optional
        The weights for the three operations in the form
        (insertion, deletion, substitution). Default is (1, 1, 1),
        which gives all three operations a weight of 1.
    processor : callable, optional
        Optional callable that is used to preprocess the strings before
        comparing them. Default is None, which deactivates this behaviour.
    score_cutoff : float, optional
        Optional argument for a score threshold as a float between 0 and 1.0.
        For norm_dist > score_cutoff 1.0 is returned instead. Default is None,
        which deactivates this behaviour.
    score_hint : float, optional
        Expected normalized distance between s1 and s2. This is used to select a
        faster implementation. Default is None, which deactivates this behaviour.

    Returns
    -------
    norm_dist : float
        normalized distance between s1 and s2 as a float between 1.0 and 0.0

    Raises
    ------
    ValueError
        If unsupported weights are provided a ValueError is thrown
    """
    _ = score_hint  # unused in the pure-Python backend
    setupPandas()
    # None / pandas-NA inputs are defined to be maximally distant.
    if is_none(s1) or is_none(s2):
        return 1.0
    if processor is not None:
        s1 = processor(s1)
        s2 = processor(s2)
    s1, s2 = conv_sequences(s1, s2)
    weights = weights or (1, 1, 1)
    maximum = _levenshtein_maximum(s1, s2, weights)
    dist = distance(s1, s2, weights=weights)
    # maximum == 0 only when both sequences are empty -> distance is 0.0.
    # Use float literals throughout so the return type is consistently float,
    # as documented (previously the 0 / cutoff branches returned int).
    norm_dist = dist / maximum if maximum else 0.0
    return norm_dist if (score_cutoff is None or norm_dist <= score_cutoff) else 1.0
def normalized_similarity(
    s1,
    s2,
    *,
    weights=(1, 1, 1),
    processor=None,
    score_cutoff=None,
    score_hint=None,
):
    """
    Calculates a normalized levenshtein similarity in the range [0, 1] using custom
    costs for insertion, deletion and substitution.

    This is calculated as ``1 - normalized_distance``

    Parameters
    ----------
    s1 : Sequence[Hashable]
        First string to compare.
    s2 : Sequence[Hashable]
        Second string to compare.
    weights : tuple[int, int, int] or None, optional
        The weights for the three operations in the form
        (insertion, deletion, substitution). Default is (1, 1, 1),
        which gives all three operations a weight of 1.
    processor : callable, optional
        Optional callable that is used to preprocess the strings before
        comparing them. Default is None, which deactivates this behaviour.
    score_cutoff : float, optional
        Optional argument for a score threshold as a float between 0 and 1.0.
        For norm_sim < score_cutoff 0.0 is returned instead. Default is None,
        which deactivates this behaviour.
    score_hint : int, optional
        Expected normalized similarity between s1 and s2. This is used to select a
        faster implementation. Default is None, which deactivates this behaviour.

    Returns
    -------
    norm_sim : float
        normalized similarity between s1 and s2 as a float between 0 and 1.0

    Raises
    ------
    ValueError
        If unsupported weights are provided a ValueError is thrown

    Examples
    --------
    Find the normalized Levenshtein similarity between two strings:

    >>> from rapidfuzz.distance import Levenshtein
    >>> Levenshtein.normalized_similarity("lewenstein", "levenshtein")
    0.81818181818181

    Setting a score_cutoff allows the implementation to select
    a more efficient implementation:

    >>> Levenshtein.normalized_similarity("lewenstein", "levenshtein", score_cutoff=0.85)
    0.0

    It is possible to select different weights by passing a `weight`
    tuple.

    >>> Levenshtein.normalized_similarity("lewenstein", "levenshtein", weights=(1,1,2))
    0.85714285714285

    When a different processor is used s1 and s2 do not have to be strings

    >>> Levenshtein.normalized_similarity(["lewenstein"], ["levenshtein"], processor=lambda s: s[0])
    0.81818181818181
    """
    _ = score_hint  # unused in the pure-Python backend
    setupPandas()
    # None / pandas-NA inputs are defined to have zero similarity.
    if is_none(s1) or is_none(s2):
        return 0.0
    if processor is not None:
        s1 = processor(s1)
        s2 = processor(s2)
    s1, s2 = conv_sequences(s1, s2)
    weights = weights or (1, 1, 1)
    norm_dist = normalized_distance(s1, s2, weights=weights)
    norm_sim = 1.0 - norm_dist
    # Return a float on every path, as documented (previously the cutoff
    # branch returned the int 0).
    return norm_sim if (score_cutoff is None or norm_sim >= score_cutoff) else 0.0
def _matrix(s1, s2):
if not s1:
return (len(s2), [], [])
VP = (1 << len(s1)) - 1
VN = 0
currDist = len(s1)
mask = 1 << (len(s1) - 1)
block = {}
block_get = block.get
x = 1
for ch1 in s1:
block[ch1] = block_get(ch1, 0) | x
x <<= 1
matrix_VP = []
matrix_VN = []
for ch2 in s2:
# Step 1: Computing D0
PM_j = block_get(ch2, 0)
X = PM_j
D0 = (((X & VP) + VP) ^ VP) | X | VN
# Step 2: Computing HP and HN
HP = VN | ~(D0 | VP)
HN = D0 & VP
# Step 3: Computing the value D[m,j]
currDist += (HP & mask) != 0
currDist -= (HN & mask) != 0
# Step 4: Computing Vp and VN
HP = (HP << 1) | 1
HN = HN << 1
VP = HN | ~(D0 | HP)
VN = HP & D0
matrix_VP.append(VP)
matrix_VN.append(VN)
return (currDist, matrix_VP, matrix_VN)
def editops(
    s1,
    s2,
    *,
    processor=None,
    score_hint=None,
):
    """
    Return Editops describing how to turn s1 into s2.

    Parameters
    ----------
    s1 : Sequence[Hashable]
        First string to compare.
    s2 : Sequence[Hashable]
        Second string to compare.
    processor : callable, optional
        Optional callable that is used to preprocess the strings before
        comparing them. Default is None, which deactivates this behaviour.
    score_hint : int, optional
        Expected distance between s1 and s2. This is used to select a
        faster implementation. Default is None, which deactivates this behaviour.

    Returns
    -------
    editops : Editops
        edit operations required to turn s1 into s2

    Notes
    -----
    The alignment is calculated using an algorithm of Heikki Hyyrö, which is
    described [8]_. It has a time complexity and memory usage of ``O([N/64] * M)``.

    References
    ----------
    .. [8] Hyyrö, Heikki. "A Note on Bit-Parallel Alignment Computation."
           Stringology (2004).

    Examples
    --------
    >>> from rapidfuzz.distance import Levenshtein
    >>> for tag, src_pos, dest_pos in Levenshtein.editops("qabxcd", "abycdf"):
    ...    print(("%7s s1[%d] s2[%d]" % (tag, src_pos, dest_pos)))
     delete s1[1] s2[0]
    replace s1[3] s2[2]
     insert s1[6] s2[5]
    """
    _ = score_hint  # accepted for API compatibility; unused here
    if processor is not None:
        s1 = processor(s1)
        s2 = processor(s2)

    s1, s2 = conv_sequences(s1, s2)
    # Strip the common prefix/suffix first: it never produces edit operations
    # and shrinks the bit-vector matrix. prefix_len is added back onto every
    # recorded position below.
    prefix_len, suffix_len = common_affix(s1, s2)
    s1 = s1[prefix_len : len(s1) - suffix_len]
    s2 = s2[prefix_len : len(s2) - suffix_len]
    # dist is the edit distance; VP/VN are the per-row vertical delta
    # bit-vectors needed for the backtrace.
    dist, VP, VN = _matrix(s1, s2)

    editops = Editops([], 0, 0)
    editops._src_len = len(s1) + prefix_len + suffix_len
    editops._dest_len = len(s2) + prefix_len + suffix_len

    if dist == 0:
        return editops

    # Walk the implicit DP matrix from the bottom-right corner back to the
    # origin, emitting one op per unit of distance. Ops are written
    # back-to-front so editop_list ends up ordered by source position.
    editop_list = [None] * dist
    col = len(s1)
    row = len(s2)
    while row != 0 and col != 0:
        # deletion: a set VP bit means the score grew moving down this column
        if VP[row - 1] & (1 << (col - 1)):
            dist -= 1
            col -= 1
            editop_list[dist] = Editop("delete", col + prefix_len, row + prefix_len)
        else:
            row -= 1

            # insertion: a set VN bit means the score shrank along the row
            if row and (VN[row - 1] & (1 << (col - 1))):
                dist -= 1
                editop_list[dist] = Editop("insert", col + prefix_len, row + prefix_len)
            else:
                col -= 1

                # replace (Matches are not recorded)
                if s1[col] != s2[row]:
                    dist -= 1
                    editop_list[dist] = Editop("replace", col + prefix_len, row + prefix_len)

    # Whatever remains along the top/left edge is pure deletions/insertions.
    while col != 0:
        dist -= 1
        col -= 1
        editop_list[dist] = Editop("delete", col + prefix_len, row + prefix_len)

    while row != 0:
        dist -= 1
        row -= 1
        editop_list[dist] = Editop("insert", col + prefix_len, row + prefix_len)

    editops._editops = editop_list
    return editops
def opcodes(
    s1,
    s2,
    *,
    processor=None,
    score_hint=None,
):
    """
    Return Opcodes describing how to turn s1 into s2.

    This is a thin convenience wrapper: it computes the Editops and converts
    them to the Opcodes representation.

    Parameters
    ----------
    s1 : Sequence[Hashable]
        First string to compare.
    s2 : Sequence[Hashable]
        Second string to compare.
    processor : callable, optional
        Optional callable that is used to preprocess the strings before
        comparing them. Default is None, which deactivates this behaviour.
    score_hint : int, optional
        Expected distance between s1 and s2. This is used to select a
        faster implementation. Default is None, which deactivates this behaviour.

    Returns
    -------
    opcodes : Opcodes
        edit operations required to turn s1 into s2

    Notes
    -----
    The alignment is calculated using an algorithm of Heikki Hyyrö, which is
    described [9]_. It has a time complexity and memory usage of ``O([N/64] * M)``.

    References
    ----------
    .. [9] Hyyrö, Heikki. "A Note on Bit-Parallel Alignment Computation."
           Stringology (2004).

    Examples
    --------
    >>> from rapidfuzz.distance import Levenshtein
    >>> a = "qabxcd"
    >>> b = "abycdf"
    >>> for tag, i1, i2, j1, j2 in Levenshtein.opcodes("qabxcd", "abycdf"):
    ...    print(("%7s a[%d:%d] (%s) b[%d:%d] (%s)" %
    ...           (tag, i1, i2, a[i1:i2], j1, j2, b[j1:j2])))
     delete a[0:1] (q) b[0:0] ()
      equal a[1:3] (ab) b[0:2] (ab)
    replace a[3:4] (x) b[2:3] (y)
      equal a[4:6] (cd) b[3:5] (cd)
     insert a[6:6] () b[5:6] (f)
    """
    ops = editops(s1, s2, processor=processor, score_hint=score_hint)
    return ops.as_opcodes()