from __future__ import annotations
from typing import TYPE_CHECKING, Any, BinaryIO, List, Optional, Union, cast
import numpy as np
from pdfminer.layout import LTChar, LTTextBox
from pdfminer.pdftypes import PDFObjRef
from pdfminer.utils import open_filename
from unstructured_inference.inference.elements import Rectangle
from unstructured.documents.coordinates import PixelSpace, PointSpace
from unstructured.documents.elements import CoordinatesMetadata
from unstructured.partition.pdf_image.pdf_image_utils import remove_control_characters
from unstructured.partition.pdf_image.pdfminer_utils import (
extract_image_objects,
extract_text_objects,
open_pdfminer_pages_generator,
rect_to_bbox,
)
from unstructured.partition.utils.config import env_config
from unstructured.partition.utils.constants import SORT_MODE_BASIC, Source
from unstructured.partition.utils.sorting import sort_text_regions
from unstructured.utils import requires_dependencies
if TYPE_CHECKING:
from unstructured_inference.inference.elements import TextRegion, TextRegions
from unstructured_inference.inference.layout import DocumentLayout
from unstructured_inference.inference.layoutelement import LayoutElements
EPSILON_AREA = 0.01
# round floating point coordinates to 15 decimal places, close to the precision of a 64-bit float
DEFAULT_ROUND = 15
def process_file_with_pdfminer(
filename: str = "",
dpi: int = 200,
) -> tuple[List["LayoutElements"], List[List]]:
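    """Extract text and image layout elements from a PDF file on disk using pdfminer.

    A thin wrapper around `process_data_with_pdfminer` that opens `filename` and returns the
    per-page extracted layouts together with the per-page link metadata.
    """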
with open_filename(filename, "rb") as fp:
fp = cast(BinaryIO, fp)
extracted_layout, layouts_links = process_data_with_pdfminer(
file=fp,
dpi=dpi,
)
return extracted_layout, layouts_links
def _validate_bbox(bbox: list[int | float]) -> bool:
return all(x is not None for x in bbox) and (bbox[2] - bbox[0] > 0) and (bbox[3] - bbox[1] > 0)
@requires_dependencies("unstructured_inference")
def process_page_layout_from_pdfminer(
annotation_list: list,
page_layout,
page_height: int | float,
page_number: int,
coord_coef: float,
) -> tuple[LayoutElements, list]:
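    """Convert a single pdfminer page layout into vectorized `LayoutElements` plus link metadata.

    Text and image objects are collected with their bounding boxes (scaled by `coord_coef`), and
    URL metadata is gathered for link annotations that fall within text boxes on the page.
    """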
from unstructured_inference.inference.layoutelement import LayoutElements
urls_metadata: list[dict[str, Any]] = []
element_coords, texts, element_class = [], [], []
annotation_threshold = env_config.PDF_ANNOTATION_THRESHOLD
for obj in page_layout:
x1, y1, x2, y2 = rect_to_bbox(obj.bbox, page_height)
bbox = (x1, y1, x2, y2)
if len(annotation_list) > 0 and isinstance(obj, LTTextBox):
annotations_within_element = check_annotations_within_element(
annotation_list,
bbox,
page_number,
annotation_threshold,
)
_, words = get_words_from_obj(obj, page_height)
for annot in annotations_within_element:
urls_metadata.append(map_bbox_and_index(words, annot))
if hasattr(obj, "get_text"):
inner_text_objects = extract_text_objects(obj)
for inner_obj in inner_text_objects:
inner_bbox = rect_to_bbox(inner_obj.bbox, page_height)
if not _validate_bbox(inner_bbox):
continue
texts.append(inner_obj.get_text())
element_coords.append(inner_bbox)
element_class.append(0)
else:
inner_image_objects = extract_image_objects(obj)
for img_obj in inner_image_objects:
inner_bbox = rect_to_bbox(img_obj.bbox, page_height)
if not _validate_bbox(inner_bbox):
continue
texts.append(None)
element_coords.append(inner_bbox)
element_class.append(1)
return (
LayoutElements(
element_coords=coord_coef * np.array(element_coords),
texts=np.array(texts).astype(object),
element_class_ids=np.array(element_class),
element_class_id_map={0: "Text", 1: "Image"},
sources=np.array([Source.PDFMINER] * len(element_class)),
),
urls_metadata,
)
@requires_dependencies("unstructured_inference")
def process_data_with_pdfminer(
file: Optional[Union[bytes, BinaryIO]] = None,
dpi: int = 200,
) -> tuple[List[LayoutElements], List[List]]:
"""Loads the image and word objects from a pdf using pdfplumber and the image renderings of the
pdf pages using pdf2image"""
from unstructured_inference.inference.layoutelement import LayoutElements
layouts = []
layouts_links = []
    # Coefficient to rescale bounding boxes from PDF points (72 dpi) to the image rendering dpi
coef = dpi / 72
for page_number, (page, page_layout) in enumerate(open_pdfminer_pages_generator(file)):
width, height = page_layout.width, page_layout.height
annotation_list = []
coordinate_system = PixelSpace(
width=width,
height=height,
)
if page.annots:
annotation_list = get_uris(page.annots, height, coordinate_system, page_number)
layout, urls_metadata = process_page_layout_from_pdfminer(
annotation_list, page_layout, height, page_number, coef
)
links = [
{
"bbox": [x * coef for x in metadata["bbox"]],
"text": metadata["text"],
"url": metadata["uri"],
"start_index": metadata["start_index"],
}
for metadata in urls_metadata
]
clean_layouts = []
for threshold, element_class in zip(
(
env_config.EMBEDDED_TEXT_SAME_REGION_THRESHOLD,
env_config.EMBEDDED_IMAGE_SAME_REGION_THRESHOLD,
),
(0, 1),
):
elements_to_sort = layout.slice(layout.element_class_ids == element_class)
clean_layouts.append(
remove_duplicate_elements(elements_to_sort, threshold)
if len(elements_to_sort)
else elements_to_sort
)
layout = LayoutElements.concatenate(clean_layouts)
# NOTE(christine): always do the basic sort first for deterministic order across
# python versions.
layout = sort_text_regions(layout, SORT_MODE_BASIC)
# apply the current default sorting to the layout elements extracted by pdfminer
layout = sort_text_regions(layout)
layouts.append(layout)
layouts_links.append(links)
return layouts, layouts_links
def _create_text_region(x1, y1, x2, y2, coef, text, source, region_class):
"""Creates a text region of the specified class with scaled coordinates."""
return region_class.from_coords(
x1 * coef,
y1 * coef,
x2 * coef,
y2 * coef,
text=text,
source=source,
)
def get_coords_from_bboxes(bboxes, round_to: int = DEFAULT_ROUND) -> np.ndarray:
"""convert a list of boxes's coords into np array"""
if isinstance(bboxes, np.ndarray):
return bboxes.round(round_to)
# preallocate memory
coords = np.zeros((len(bboxes), 4), dtype=np.float32)
for i, bbox in enumerate(bboxes):
coords[i, :] = [bbox.x1, bbox.y1, bbox.x2, bbox.y2]
return coords.round(round_to)
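# Illustrative sketch of `get_coords_from_bboxes` (assumes Rectangle-like objects exposing
# x1/y1/x2/y2 attributes, such as the Rectangle class imported above):
#   get_coords_from_bboxes([Rectangle(0, 0, 10, 5), Rectangle(1, 2, 3, 4)])
#   -> a (2, 4) float32 array: [[0., 0., 10., 5.], [1., 2., 3., 4.]]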
def areas_of_boxes_and_intersection_area(
coords1: np.ndarray, coords2: np.ndarray, round_to: int = DEFAULT_ROUND
):
"""compute intersection area and own areas for two groups of bounding boxes"""
x11, y11, x12, y12 = np.split(coords1, 4, axis=1)
x21, y21, x22, y22 = np.split(coords2, 4, axis=1)
inter_area = np.maximum(
(np.minimum(x12, np.transpose(x22)) - np.maximum(x11, np.transpose(x21)) + 1), 0
) * np.maximum((np.minimum(y12, np.transpose(y22)) - np.maximum(y11, np.transpose(y21)) + 1), 0)
boxa_area = (x12 - x11 + 1) * (y12 - y11 + 1)
boxb_area = (x22 - x21 + 1) * (y22 - y21 + 1)
return inter_area.round(round_to), boxa_area.round(round_to), boxb_area.round(round_to)
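# Illustrative sketch of the inclusive (+1) area convention used above:
#   coords1 = np.array([[0.0, 0.0, 1.0, 1.0]])  # treated as a 2x2-pixel box
#   coords2 = np.array([[0.0, 0.0, 3.0, 3.0]])  # treated as a 4x4-pixel box
#   areas_of_boxes_and_intersection_area(coords1, coords2)
#   -> intersection area [[4.]], own areas [[4.]] and [[16.]]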
def bboxes1_is_almost_subregion_of_bboxes2(
bboxes1, bboxes2, threshold: float = 0.5, round_to: int = DEFAULT_ROUND
) -> np.ndarray:
"""compute if each element from bboxes1 is almost a subregion of one or more elements in
bboxes2"""
coords1 = get_coords_from_bboxes(bboxes1, round_to=round_to)
coords2 = get_coords_from_bboxes(bboxes2, round_to=round_to)
inter_area, boxa_area, boxb_area = areas_of_boxes_and_intersection_area(
coords1, coords2, round_to=round_to
)
return (inter_area / np.maximum(boxa_area, EPSILON_AREA) > threshold) & (
boxa_area <= boxb_area.T
)
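# Illustrative sketch: a small box fully inside a larger one is flagged as a subregion, but not
# the reverse, since the larger box's area exceeds the smaller one's:
#   small = np.array([[1.0, 1.0, 2.0, 2.0]])
#   big = np.array([[0.0, 0.0, 10.0, 10.0]])
#   bboxes1_is_almost_subregion_of_bboxes2(small, big)  # -> [[True]]
#   bboxes1_is_almost_subregion_of_bboxes2(big, small)  # -> [[False]]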
def boxes_self_iou(bboxes, threshold: float = 0.5, round_to: int = DEFAULT_ROUND) -> np.ndarray:
"""compute iou for a group of elements"""
coords = get_coords_from_bboxes(bboxes, round_to=round_to)
inter_area, boxa_area, boxb_area = areas_of_boxes_and_intersection_area(
coords, coords, round_to=round_to
)
return (inter_area / np.maximum(EPSILON_AREA, boxa_area + boxb_area.T - inter_area)) > threshold
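# Illustrative sketch: identical boxes have iou 1.0 and pass the threshold; disjoint boxes
# do not:
#   boxes = np.array([[0.0, 0.0, 2.0, 2.0], [0.0, 0.0, 2.0, 2.0], [10.0, 10.0, 12.0, 12.0]])
#   boxes_self_iou(boxes, threshold=0.5)
#   -> [[ True,  True, False],
#       [ True,  True, False],
#       [False, False,  True]]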
@requires_dependencies("unstructured_inference")
def pdfminer_elements_to_text_regions(layout_elements: LayoutElements) -> list[TextRegion]:
    """a temporary solution to convert layout elements to a list of either EmbeddedTextRegion or
    ImageTextRegion; this should be made obsolete after we refactor the merging logic in the
    inference library"""
from unstructured_inference.inference.elements import (
EmbeddedTextRegion,
ImageTextRegion,
)
regions = []
for i, element_class in enumerate(layout_elements.element_class_ids):
region_class = EmbeddedTextRegion if element_class == 0 else ImageTextRegion
regions.append(
region_class.from_coords(
*layout_elements.element_coords[i],
text=layout_elements.texts[i],
source=Source.PDFMINER,
)
)
return regions
@requires_dependencies("unstructured_inference")
def merge_inferred_with_extracted_layout(
inferred_document_layout: "DocumentLayout",
extracted_layout: List[TextRegions],
hi_res_model_name: str,
) -> "DocumentLayout":
"""Merge an inferred layout with an extracted layout"""
from unstructured_inference.inference.layoutelement import LayoutElements
from unstructured_inference.inference.layoutelement import (
merge_inferred_layout_with_extracted_layout as merge_inferred_with_extracted_page,
)
from unstructured_inference.models.detectron2onnx import UnstructuredDetectronONNXModel
inferred_pages = inferred_document_layout.pages
for i, (inferred_page, extracted_page_layout) in enumerate(
zip(inferred_pages, extracted_layout)
):
inferred_layout = inferred_page.elements
image_metadata = inferred_page.image_metadata
w = image_metadata.get("width")
h = image_metadata.get("height")
image_size = (w, h)
threshold_kwargs = {}
        # NOTE(Benjamin): With this the thresholds are only changed for detectron2_mask_rcnn.
        # Otherwise the default values for the functions are used.
if (
isinstance(inferred_page.detection_model, UnstructuredDetectronONNXModel)
and "R_50" not in inferred_page.detection_model.model_path
):
threshold_kwargs = {"same_region_threshold": 0.5, "subregion_threshold": 0.5}
# NOTE (yao): after refactoring the algorithm to be vectorized we can then pass in the
# vectorized data structure into the merge function
merged_layout = merge_inferred_with_extracted_page(
inferred_layout=inferred_layout,
extracted_layout=pdfminer_elements_to_text_regions(extracted_page_layout),
page_image_size=image_size,
**threshold_kwargs,
)
merged_layout = sort_text_regions(LayoutElements.from_list(merged_layout), SORT_MODE_BASIC)
        # cast to object dtype so that we can modify the text without hitting numpy's
        # fixed-width string length limit
merged_layout.texts = merged_layout.texts.astype(object)
for i, text in enumerate(merged_layout.texts):
if text is None:
text = aggregate_embedded_text_by_block(
target_region=merged_layout.slice([i]),
source_regions=extracted_page_layout,
)
merged_layout.texts[i] = remove_control_characters(text)
inferred_page.elements_array = merged_layout
        # NOTE: once we drop the reference to `elements` we can remove this step below
inferred_page.elements[:] = merged_layout.as_list()
return inferred_document_layout
def clean_pdfminer_inner_elements(document: "DocumentLayout") -> "DocumentLayout":
"""Clean pdfminer elements from inside tables.
This function removes elements sourced from PDFMiner that are subregions within table elements.
"""
for page in document.pages:
non_pdfminer_element_boxes = [e.bbox for e in page.elements if e.source != Source.PDFMINER]
element_boxes = []
element_to_subregion_map = {}
        subregion_index = 0
for i, element in enumerate(page.elements):
if element.source != Source.PDFMINER:
continue
element_boxes.append(element.bbox)
            element_to_subregion_map[i] = subregion_index
            subregion_index += 1
is_element_subregion_of_other_elements = (
bboxes1_is_almost_subregion_of_bboxes2(
element_boxes,
non_pdfminer_element_boxes,
env_config.EMBEDDED_TEXT_AGGREGATION_SUBREGION_THRESHOLD,
).sum(axis=1)
== 1
)
page.elements = [
e
for i, e in enumerate(page.elements)
if (
(i not in element_to_subregion_map)
or not is_element_subregion_of_other_elements[element_to_subregion_map[i]]
)
]
return document
@requires_dependencies("unstructured_inference")
def remove_duplicate_elements(
elements: TextRegions,
threshold: float = 0.5,
) -> TextRegions:
"""Removes duplicate text elements extracted by PDFMiner from a document layout."""
iou = boxes_self_iou(elements.element_coords, threshold)
    # this is equivalent to keeping the rows where `not iou[i, i + 1 :].any()`, i.e., elements
    # that have no above-threshold overlap with any element that comes after them, so only the
    # last element of each duplicate group is kept
return elements.slice(~np.triu(iou, k=1).any(axis=1))
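# Illustrative sketch of the de-duplication mask using coordinates only (constructing real
# TextRegions requires unstructured_inference). With three boxes where the first two coincide,
# the first copy is dropped while the last copy and the distinct box are kept:
#   coords = np.array([[0.0, 0.0, 10.0, 10.0], [0.0, 0.0, 10.0, 10.0], [20.0, 20.0, 30.0, 30.0]])
#   ~np.triu(boxes_self_iou(coords, 0.5), k=1).any(axis=1)  # -> [False, True, True]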
def aggregate_embedded_text_by_block(
target_region: TextRegions,
source_regions: TextRegions,
threshold: float = env_config.EMBEDDED_TEXT_AGGREGATION_SUBREGION_THRESHOLD,
) -> str:
"""Extracts the text aggregated from the elements of the given layout that lie within the given
block."""
if len(source_regions) == 0 or len(target_region) == 0:
return ""
mask = (
bboxes1_is_almost_subregion_of_bboxes2(
source_regions.element_coords,
target_region.element_coords,
threshold,
)
.sum(axis=1)
.astype(bool)
)
text = " ".join([text for text in source_regions.slice(mask).texts if text])
return text
def get_links_in_element(page_links: list, region: Rectangle) -> list:
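    """Filter the page's links down to those whose bounding boxes lie (almost) within `region`,
    returning each link's text, url, and start index."""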
links_bboxes = [Rectangle(*link.get("bbox")) for link in page_links]
results = bboxes1_is_almost_subregion_of_bboxes2(links_bboxes, [region])
links = [
{
"text": page_links[idx].get("text"),
"url": page_links[idx].get("url"),
"start_index": page_links[idx].get("start_index"),
}
for idx, result in enumerate(results)
if any(result)
]
return links
def get_uris(
annots: PDFObjRef | list[PDFObjRef],
height: float,
coordinate_system: PixelSpace | PointSpace,
page_number: int,
) -> list[dict[str, Any]]:
"""
Extracts URI annotations from a single or a list of PDF object references on a specific page.
    The type of annots (list or not) depends on the pdf formatting. The function detects the type
    of annots and then passes them on to the get_uris_from_annots function as a list.
Args:
annots (PDFObjRef | list[PDFObjRef]): A single or a list of PDF object references
representing annotations on the page.
height (float): The height of the page in the specified coordinate system.
coordinate_system (PixelSpace | PointSpace): The coordinate system used to represent
the annotations' coordinates.
page_number (int): The page number from which to extract annotations.
Returns:
list[dict]: A list of dictionaries, each containing information about a URI annotation,
including its coordinates, bounding box, type, URI link, and page number.
"""
if isinstance(annots, list):
return get_uris_from_annots(annots, height, coordinate_system, page_number)
resolved_annots = annots.resolve()
if resolved_annots is None:
return []
return get_uris_from_annots(resolved_annots, height, coordinate_system, page_number)
def get_uris_from_annots(
annots: list[PDFObjRef],
height: int | float,
coordinate_system: PixelSpace | PointSpace,
page_number: int,
) -> list[dict[str, Any]]:
"""
Extracts URI annotations from a list of PDF object references.
Args:
annots (list[PDFObjRef]): A list of PDF object references representing annotations on
a page.
height (int | float): The height of the page in the specified coordinate system.
coordinate_system (PixelSpace | PointSpace): The coordinate system used to represent
the annotations' coordinates.
page_number (int): The page number from which to extract annotations.
Returns:
list[dict]: A list of dictionaries, each containing information about a URI annotation,
including its coordinates, bounding box, type, URI link, and page number.
"""
annotation_list = []
for annotation in annots:
# Check annotation is valid for extraction
annotation_dict = try_resolve(annotation)
if not isinstance(annotation_dict, dict):
continue
subtype = annotation_dict.get("Subtype", None)
if not subtype or isinstance(subtype, PDFObjRef) or str(subtype) != "/'Link'":
continue
# Extract bounding box and update coordinates
rect = annotation_dict.get("Rect", None)
if not rect or isinstance(rect, PDFObjRef) or len(rect) != 4:
continue
x1, y1, x2, y2 = rect_to_bbox(rect, height)
points = ((x1, y1), (x1, y2), (x2, y2), (x2, y1))
coordinates_metadata = CoordinatesMetadata(
points=points,
system=coordinate_system,
)
# Extract type
if "A" not in annotation_dict:
continue
uri_dict = try_resolve(annotation_dict["A"])
if not isinstance(uri_dict, dict):
continue
uri_type = None
if "S" in uri_dict and not isinstance(uri_dict["S"], PDFObjRef):
uri_type = str(uri_dict["S"])
# Extract URI link
uri = None
try:
if uri_type == "/'URI'":
uri = try_resolve(try_resolve(uri_dict["URI"])).decode("utf-8")
if uri_type == "/'GoTo'":
uri = try_resolve(try_resolve(uri_dict["D"])).decode("utf-8")
except Exception:
pass
annotation_list.append(
{
"coordinates": coordinates_metadata,
"bbox": (x1, y1, x2, y2),
"type": uri_type,
"uri": uri,
"page_number": page_number,
},
)
return annotation_list
def try_resolve(annot: PDFObjRef):
"""
Attempt to resolve a PDF object reference. If successful, returns the resolved object;
otherwise, returns the original reference.
"""
try:
return annot.resolve()
except Exception:
return annot
def check_annotations_within_element(
annotation_list: list[dict[str, Any]],
element_bbox: tuple[float, float, float, float],
page_number: int,
annotation_threshold: float,
) -> list[dict[str, Any]]:
"""
Filter annotations that are within or highly overlap with a specified element on a page.
Args:
annotation_list (list[dict[str,Any]]): A list of dictionaries, each containing information
about an annotation.
element_bbox (tuple[float, float, float, float]): The bounding box coordinates of the
specified element in the bbox format (x1, y1, x2, y2).
page_number (int): The page number to which the annotations and element belong.
        annotation_threshold (float): The threshold value (between 0.0 and 1.0) that determines
            the minimum overlap required for an annotation to be considered within the element.
Returns:
list[dict[str,Any]]: A list of dictionaries containing information about annotations
that are within or highly overlap with the specified element on the given page, based on
the specified threshold.
"""
annotations_within_element = []
for annotation in annotation_list:
if annotation["page_number"] == page_number:
annotation_bbox_size = calculate_bbox_area(annotation["bbox"])
if annotation_bbox_size and (
calculate_intersection_area(element_bbox, annotation["bbox"]) / annotation_bbox_size
> annotation_threshold
):
annotations_within_element.append(annotation)
return annotations_within_element
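# Illustrative sketch with plain dicts (only the "bbox" and "page_number" keys are consulted):
#   annots = [{"bbox": (1, 1, 3, 3), "page_number": 1},
#             {"bbox": (50, 50, 60, 60), "page_number": 1}]
#   check_annotations_within_element(annots, (0, 0, 10, 10), page_number=1, annotation_threshold=0.9)
#   -> keeps only the first annotation; the second does not intersect the element at all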
def get_words_from_obj(
obj: LTTextBox,
height: float,
) -> tuple[list[LTChar], list[dict[str, Any]]]:
"""
Extracts characters and word bounding boxes from a PDF text element.
Args:
obj (LTTextBox): The PDF text element from which to extract characters and words.
height (float): The height of the page in the specified coordinate system.
Returns:
tuple[list[LTChar], list[dict[str,Any]]]: A tuple containing two lists:
- list[LTChar]: A list of LTChar objects representing individual characters.
- list[dict[str,Any]]]: A list of dictionaries, each containing information about
a word, including its text, bounding box, and start index in the element's text.
"""
characters = []
words = []
text_len = 0
for text_line in obj:
word = ""
x1, y1, x2, y2 = None, None, None, None
start_index = 0
for index, character in enumerate(text_line):
if isinstance(character, LTChar):
characters.append(character)
char = character.get_text()
if word and not char.strip():
words.append(
{"text": word, "bbox": (x1, y1, x2, y2), "start_index": start_index},
)
word = ""
continue
# TODO(klaijan) - isalnum() only works with A-Z, a-z and 0-9
# will need to switch to some pattern matching once we support more languages
if not word:
isalnum = char.isalnum()
if word and char.isalnum() != isalnum:
isalnum = char.isalnum()
words.append(
{"text": word, "bbox": (x1, y1, x2, y2), "start_index": start_index},
)
word = ""
if len(word) == 0:
start_index = text_len + index
x1 = character.x0
y2 = height - character.y0
x2 = character.x1
y1 = height - character.y1
else:
x2 = character.x1
y2 = height - character.y0
word += char
else:
words.append(
{"text": word, "bbox": (x1, y1, x2, y2), "start_index": start_index},
)
word = ""
text_len += len(text_line)
return characters, words
def map_bbox_and_index(words: list[dict[str, Any]], annot: dict[str, Any]):
"""
Maps a bounding box annotation to the corresponding text and start index within a list of words.
Args:
words (list[dict[str,Any]]): A list of dictionaries, each containing information about
a word, including its text, bounding box, and start index.
annot (dict[str,Any]): The annotation dictionary to be mapped, which will be updated with
"text" and "start_index" fields.
Returns:
dict: The updated annotation dictionary with "text" representing the mapped text and
"start_index" representing the start index of the mapped text in the list of words.
"""
if len(words) == 0:
annot["text"] = ""
annot["start_index"] = -1
return annot
distance_from_bbox_start = np.sqrt(
(annot["bbox"][0] - np.array([word["bbox"][0] for word in words])) ** 2
+ (annot["bbox"][1] - np.array([word["bbox"][1] for word in words])) ** 2,
)
distance_from_bbox_end = np.sqrt(
(annot["bbox"][2] - np.array([word["bbox"][2] for word in words])) ** 2
+ (annot["bbox"][3] - np.array([word["bbox"][3] for word in words])) ** 2,
)
closest_start = try_argmin(distance_from_bbox_start)
closest_end = try_argmin(distance_from_bbox_end)
# NOTE(klaijan) - get the word from closest start only if the end index comes after start index
text = ""
if closest_end >= closest_start:
for _ in range(closest_start, closest_end + 1):
text += " "
text += words[_]["text"]
else:
text = words[closest_start]["text"]
annot["text"] = text.strip()
annot["start_index"] = words[closest_start]["start_index"]
return annot
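# Illustrative sketch with hypothetical word dicts (bboxes in (x1, y1, x2, y2) form):
#   words = [
#       {"text": "Click", "bbox": (10, 10, 40, 20), "start_index": 0},
#       {"text": "here", "bbox": (45, 10, 70, 20), "start_index": 6},
#   ]
#   map_bbox_and_index(words, {"bbox": (9, 10, 71, 20)})
#   -> the annotation gains "text": "Click here" and "start_index": 0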
def calculate_intersection_area(
bbox1: tuple[float, float, float, float],
bbox2: tuple[float, float, float, float],
) -> float:
"""
Calculate the area of intersection between two bounding boxes.
Args:
bbox1 (tuple[float, float, float, float]): The coordinates of the first bounding box
in the format (x1, y1, x2, y2).
bbox2 (tuple[float, float, float, float]): The coordinates of the second bounding box
in the format (x1, y1, x2, y2).
Returns:
float: The area of intersection between the two bounding boxes. If there is no
intersection, the function returns 0.0.
"""
x1_1, y1_1, x2_1, y2_1 = bbox1
x1_2, y1_2, x2_2, y2_2 = bbox2
x_intersection = max(x1_1, x1_2)
y_intersection = max(y1_1, y1_2)
x2_intersection = min(x2_1, x2_2)
y2_intersection = min(y2_1, y2_2)
if x_intersection < x2_intersection and y_intersection < y2_intersection:
intersection_area = calculate_bbox_area(
(x_intersection, y_intersection, x2_intersection, y2_intersection),
)
return intersection_area
else:
return 0.0
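# Illustrative sketch:
#   calculate_intersection_area((0.0, 0.0, 2.0, 2.0), (1.0, 1.0, 3.0, 3.0))  # -> 1.0
#   calculate_intersection_area((0.0, 0.0, 1.0, 1.0), (2.0, 2.0, 3.0, 3.0))  # -> 0.0 (no overlap)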
def calculate_bbox_area(bbox: tuple[float, float, float, float]) -> float:
"""
Calculate the area of a bounding box.
Args:
bbox (tuple[float, float, float, float]): The coordinates of the bounding box
in the format (x1, y1, x2, y2).
Returns:
float: The area of the bounding box, computed as the product of its width and height.
"""
x1, y1, x2, y2 = bbox
area = (x2 - x1) * (y2 - y1)
return area
def try_argmin(array: np.ndarray) -> int:
"""
Attempt to find the index of the minimum value in a NumPy array.
Args:
array (np.ndarray): The NumPy array in which to find the minimum value's index.
Returns:
int: The index of the minimum value in the array. If the array is empty or an
IndexError occurs, it returns -1.
"""
try:
return int(np.argmin(array))
except IndexError:
return -1