from __future__ import annotations import contextlib import copy import io import os import re import warnings from pathlib import Path from typing import IO, TYPE_CHECKING, Any, Optional, cast import numpy as np import wrapt from pdfminer.layout import LTContainer, LTImage, LTItem, LTTextBox from pdfminer.utils import open_filename from pi_heif import register_heif_opener from PIL import Image as PILImage from pypdf import PdfReader from unstructured_inference.inference.layout import DocumentLayout from unstructured_inference.inference.layoutelement import LayoutElement from unstructured.chunking import add_chunking_strategy from unstructured.cleaners.core import ( clean_extra_whitespace_with_index_run, index_adjustment_after_clean_extra_whitespace, ) from unstructured.documents.coordinates import PixelSpace, PointSpace from unstructured.documents.elements import ( CoordinatesMetadata, Element, ElementMetadata, ElementType, Image, Link, ListItem, PageBreak, Text, Title, process_metadata, ) from unstructured.errors import PageCountExceededError from unstructured.file_utils.filetype import add_metadata_with_filetype from unstructured.file_utils.model import FileType from unstructured.logger import logger, trace_logger from unstructured.nlp.patterns import PARAGRAPH_PATTERN from unstructured.partition.common.common import ( add_element_metadata, exactly_one, get_page_image_metadata, normalize_layout_element, ocr_data_to_elements, spooled_to_bytes_io_if_needed, ) from unstructured.partition.common.lang import ( check_language_args, prepare_languages_for_tesseract, tesseract_to_paddle_language, ) from unstructured.partition.common.metadata import get_last_modified_date from unstructured.partition.pdf_image.analysis.layout_dump import ( ExtractedLayoutDumper, FinalLayoutDumper, ObjectDetectionLayoutDumper, OCRLayoutDumper, ) from unstructured.partition.pdf_image.analysis.tools import save_analysis_artifiacts from unstructured.partition.pdf_image.form_extraction import run_form_extraction from unstructured.partition.pdf_image.pdf_image_utils import ( check_element_types_to_extract, convert_pdf_to_images, save_elements, ) from unstructured.partition.pdf_image.pdfminer_processing import ( check_annotations_within_element, clean_pdfminer_inner_elements, get_links_in_element, get_uris, get_words_from_obj, map_bbox_and_index, merge_inferred_with_extracted_layout, ) from unstructured.partition.pdf_image.pdfminer_utils import ( open_pdfminer_pages_generator, rect_to_bbox, ) from unstructured.partition.strategies import determine_pdf_or_image_strategy, validate_strategy from unstructured.partition.text import element_from_text from unstructured.partition.utils.config import env_config from unstructured.partition.utils.constants import ( OCR_AGENT_PADDLE, SORT_MODE_BASIC, SORT_MODE_DONT, SORT_MODE_XY_CUT, OCRMode, PartitionStrategy, ) from unstructured.partition.utils.sorting import coord_has_valid_points, sort_page_elements from unstructured.patches.pdfminer import patch_psparser from unstructured.utils import first, requires_dependencies if TYPE_CHECKING: pass # Correct a bug that was introduced by a previous patch to # pdfminer.six, causing needless and unsuccessful repairing of PDFs # which were not actually broken. patch_psparser() RE_MULTISPACE_INCLUDING_NEWLINES = re.compile(pattern=r"\s+", flags=re.DOTALL) @requires_dependencies("unstructured_inference") def default_hi_res_model() -> str: # a light config for the hi res model; this is not defined as a constant so that no setting of # the default hi res model name is done on importing of this submodule; this allows (if user # prefers) for setting env after importing the sub module and changing the default model name from unstructured_inference.models.base import DEFAULT_MODEL return os.environ.get("UNSTRUCTURED_HI_RES_MODEL_NAME", DEFAULT_MODEL) @process_metadata() @add_metadata_with_filetype(FileType.PDF) @add_chunking_strategy def partition_pdf( filename: Optional[str] = None, file: Optional[IO[bytes]] = None, include_page_breaks: bool = False, strategy: str = PartitionStrategy.AUTO, infer_table_structure: bool = False, ocr_languages: Optional[str] = None, # changing to optional for deprecation languages: Optional[list[str]] = None, metadata_filename: Optional[str] = None, # used by decorator metadata_last_modified: Optional[str] = None, chunking_strategy: Optional[str] = None, # used by decorator hi_res_model_name: Optional[str] = None, extract_images_in_pdf: bool = False, extract_image_block_types: Optional[list[str]] = None, extract_image_block_output_dir: Optional[str] = None, extract_image_block_to_payload: bool = False, starting_page_number: int = 1, extract_forms: bool = False, form_extraction_skip_tables: bool = True, **kwargs: Any, ) -> list[Element]: """Parses a pdf document into a list of interpreted elements. Parameters ---------- filename A string defining the target filename path. file A file-like object as bytes --> open(filename, "rb"). strategy The strategy to use for partitioning the PDF. Valid strategies are "hi_res", "ocr_only", and "fast". When using the "hi_res" strategy, the function uses a layout detection model to identify document elements. When using the "ocr_only" strategy, partition_pdf simply extracts the text from the document using OCR and processes it. If the "fast" strategy is used, the text is extracted directly from the PDF. The default strategy `auto` will determine when a page can be extracted using `fast` mode, otherwise it will fall back to `hi_res`. infer_table_structure Only applicable if `strategy=hi_res`. If True, any Table elements that are extracted will also have a metadata field named "text_as_html" where the table's text content is rendered into an html string. I.e., rows and cells are preserved. Whether True or False, the "text" field is always present in any Table element and is the text content of the table (no structure). languages The languages present in the document, for use in partitioning and/or OCR. To use a language with Tesseract, you'll first need to install the appropriate Tesseract language pack. metadata_last_modified The last modified date for the document. hi_res_model_name The layout detection model used when partitioning strategy is set to `hi_res`. extract_images_in_pdf Only applicable if `strategy=hi_res`. If True, any detected images will be saved in the path specified by 'extract_image_block_output_dir' or stored as base64 encoded data within metadata fields. Deprecation Note: This parameter is marked for deprecation. Future versions will use 'extract_image_block_types' for broader extraction capabilities. extract_image_block_types Only applicable if `strategy=hi_res`. Images of the element type(s) specified in this list (e.g., ["Image", "Table"]) will be saved in the path specified by 'extract_image_block_output_dir' or stored as base64 encoded data within metadata fields. extract_image_block_to_payload Only applicable if `strategy=hi_res`. If True, images of the element type(s) defined in 'extract_image_block_types' will be encoded as base64 data and stored in two metadata fields: 'image_base64' and 'image_mime_type'. This parameter facilitates the inclusion of element data directly within the payload, especially for web-based applications or APIs. extract_image_block_output_dir Only applicable if `strategy=hi_res` and `extract_image_block_to_payload=False`. The filesystem path for saving images of the element type(s) specified in 'extract_image_block_types'. extract_forms Whether the form extraction logic should be run (results in adding FormKeysValues elements to output). form_extraction_skip_tables Whether the form extraction logic should ignore regions designated as Tables. """ exactly_one(filename=filename, file=file) languages = check_language_args(languages or [], ocr_languages) return partition_pdf_or_image( filename=filename, file=file, include_page_breaks=include_page_breaks, strategy=strategy, infer_table_structure=infer_table_structure, languages=languages, metadata_last_modified=metadata_last_modified, hi_res_model_name=hi_res_model_name, extract_images_in_pdf=extract_images_in_pdf, extract_image_block_types=extract_image_block_types, extract_image_block_output_dir=extract_image_block_output_dir, extract_image_block_to_payload=extract_image_block_to_payload, starting_page_number=starting_page_number, extract_forms=extract_forms, form_extraction_skip_tables=form_extraction_skip_tables, **kwargs, ) def partition_pdf_or_image( filename: str = "", file: Optional[bytes | IO[bytes]] = None, is_image: bool = False, include_page_breaks: bool = False, strategy: str = PartitionStrategy.AUTO, infer_table_structure: bool = False, languages: Optional[list[str]] = None, metadata_last_modified: Optional[str] = None, hi_res_model_name: Optional[str] = None, extract_images_in_pdf: bool = False, extract_image_block_types: Optional[list[str]] = None, extract_image_block_output_dir: Optional[str] = None, extract_image_block_to_payload: bool = False, starting_page_number: int = 1, extract_forms: bool = False, form_extraction_skip_tables: bool = True, **kwargs: Any, ) -> list[Element]: """Parses a pdf or image document into a list of interpreted elements.""" # TODO(alan): Extract information about the filetype to be processed from the template # route. Decoding the routing should probably be handled by a single function designed for # that task so as routing design changes, those changes are implemented in a single # function. if languages is None: languages = ["eng"] # init ability to process .heic files register_heif_opener() validate_strategy(strategy, is_image) last_modified = get_last_modified_date(filename) if filename else None extracted_elements = [] pdf_text_extractable = False if not is_image: try: extracted_elements = extractable_elements( filename=filename, file=spooled_to_bytes_io_if_needed(file), languages=languages, metadata_last_modified=metadata_last_modified or last_modified, starting_page_number=starting_page_number, **kwargs, ) pdf_text_extractable = any( isinstance(el, Text) and el.text.strip() for page_elements in extracted_elements for el in page_elements ) except Exception as e: logger.debug(e) logger.info("PDF text extraction failed, skip text extraction...") strategy = determine_pdf_or_image_strategy( strategy, is_image=is_image, pdf_text_extractable=pdf_text_extractable, infer_table_structure=infer_table_structure, extract_images_in_pdf=extract_images_in_pdf, extract_image_block_types=extract_image_block_types, ) if file is not None: file.seek(0) ocr_languages = prepare_languages_for_tesseract(languages) if env_config.OCR_AGENT == OCR_AGENT_PADDLE: ocr_languages = tesseract_to_paddle_language(ocr_languages) if strategy == PartitionStrategy.HI_RES: # NOTE(robinson): Catches a UserWarning that occurs when detection is called with warnings.catch_warnings(): warnings.simplefilter("ignore") elements = _partition_pdf_or_image_local( filename=filename, file=spooled_to_bytes_io_if_needed(file), is_image=is_image, infer_table_structure=infer_table_structure, include_page_breaks=include_page_breaks, languages=languages, ocr_languages=ocr_languages, metadata_last_modified=metadata_last_modified or last_modified, hi_res_model_name=hi_res_model_name, pdf_text_extractable=pdf_text_extractable, extract_images_in_pdf=extract_images_in_pdf, extract_image_block_types=extract_image_block_types, extract_image_block_output_dir=extract_image_block_output_dir, extract_image_block_to_payload=extract_image_block_to_payload, starting_page_number=starting_page_number, extract_forms=extract_forms, form_extraction_skip_tables=form_extraction_skip_tables, **kwargs, ) out_elements = _process_uncategorized_text_elements(elements) elif strategy == PartitionStrategy.FAST: out_elements = _partition_pdf_with_pdfparser( extracted_elements=extracted_elements, include_page_breaks=include_page_breaks, **kwargs, ) return out_elements elif strategy == PartitionStrategy.OCR_ONLY: # NOTE(robinson): Catches file conversion warnings when running with PDFs with warnings.catch_warnings(): elements = _partition_pdf_or_image_with_ocr( filename=filename, file=file, include_page_breaks=include_page_breaks, languages=languages, ocr_languages=ocr_languages, is_image=is_image, metadata_last_modified=metadata_last_modified or last_modified, starting_page_number=starting_page_number, **kwargs, ) out_elements = _process_uncategorized_text_elements(elements) return out_elements def extractable_elements( filename: str = "", file: Optional[bytes | IO[bytes]] = None, languages: Optional[list[str]] = None, metadata_last_modified: Optional[str] = None, starting_page_number: int = 1, **kwargs: Any, ) -> list[list[Element]]: if isinstance(file, bytes): file = io.BytesIO(file) return _partition_pdf_with_pdfminer( filename=filename, file=file, languages=languages, metadata_last_modified=metadata_last_modified, starting_page_number=starting_page_number, **kwargs, ) def _partition_pdf_with_pdfminer( filename: str, file: Optional[IO[bytes]], languages: list[str], metadata_last_modified: Optional[str], starting_page_number: int = 1, **kwargs: Any, ) -> list[list[Element]]: """Partitions a PDF using PDFMiner instead of using a layoutmodel. Used for faster processing or detectron2 is not available. Implementation is based on the `extract_text` implemenation in pdfminer.six, but modified to support tracking page numbers and working with file-like objects. ref: https://github.com/pdfminer/pdfminer.six/blob/master/pdfminer/high_level.py """ if languages is None: languages = ["eng"] exactly_one(filename=filename, file=file) if filename: with open_filename(filename, "rb") as fp: fp = cast(IO[bytes], fp) elements = _process_pdfminer_pages( fp=fp, filename=filename, languages=languages, metadata_last_modified=metadata_last_modified, starting_page_number=starting_page_number, **kwargs, ) elif file: elements = _process_pdfminer_pages( fp=file, filename=filename, languages=languages, metadata_last_modified=metadata_last_modified, starting_page_number=starting_page_number, **kwargs, ) return elements @requires_dependencies("pdfminer") def _process_pdfminer_pages( fp: IO[bytes], filename: str, languages: list[str], metadata_last_modified: Optional[str], annotation_threshold: Optional[float] = env_config.PDF_ANNOTATION_THRESHOLD, starting_page_number: int = 1, **kwargs, ) -> list[list[Element]]: """Uses PDFMiner to split a document into pages and process them.""" elements = [] for page_number, (page, page_layout) in enumerate( open_pdfminer_pages_generator(fp), start=starting_page_number ): width, height = page_layout.width, page_layout.height page_elements: list[Element] = [] annotation_list = [] coordinate_system = PixelSpace( width=width, height=height, ) if page.annots: annotation_list = get_uris(page.annots, height, coordinate_system, page_number) for obj in page_layout: x1, y1, x2, y2 = rect_to_bbox(obj.bbox, height) bbox = (x1, y1, x2, y2) urls_metadata: list[dict[str, Any]] = [] if len(annotation_list) > 0 and isinstance(obj, LTTextBox): annotations_within_element = check_annotations_within_element( annotation_list, bbox, page_number, annotation_threshold, ) _, words = get_words_from_obj(obj, height) for annot in annotations_within_element: urls_metadata.append(map_bbox_and_index(words, annot)) if hasattr(obj, "get_text"): _text_snippets: list[str] = [obj.get_text()] else: _text = _extract_text(obj) _text_snippets = re.split(PARAGRAPH_PATTERN, _text) for _text in _text_snippets: _text, moved_indices = clean_extra_whitespace_with_index_run(_text) if _text.strip(): points = ((x1, y1), (x1, y2), (x2, y2), (x2, y1)) element = element_from_text( _text, coordinates=points, coordinate_system=coordinate_system, ) coordinates_metadata = CoordinatesMetadata( points=points, system=coordinate_system, ) links = _get_links_from_urls_metadata(urls_metadata, moved_indices) element.metadata = ElementMetadata( filename=filename, page_number=page_number, coordinates=coordinates_metadata, last_modified=metadata_last_modified, links=links, languages=languages, ) element.metadata.detection_origin = "pdfminer" page_elements.append(element) page_elements = _combine_list_elements(page_elements, coordinate_system) elements.append(page_elements) return elements def _get_pdf_page_number( filename: str = "", file: Optional[bytes | IO[bytes]] = None, ) -> int: if file: number_of_pages = PdfReader(file).get_num_pages() file.seek(0) elif filename: number_of_pages = PdfReader(filename).get_num_pages() else: raise ValueError("Either 'file' or 'filename' must be provided.") return number_of_pages def check_pdf_hi_res_max_pages_exceeded( filename: str = "", file: Optional[bytes | IO[bytes]] = None, pdf_hi_res_max_pages: int = None, ) -> None: """Checks whether PDF exceeds pdf_hi_res_max_pages limit.""" if pdf_hi_res_max_pages: document_pages = _get_pdf_page_number(filename=filename, file=file) if document_pages > pdf_hi_res_max_pages: raise PageCountExceededError( document_pages=document_pages, pdf_hi_res_max_pages=pdf_hi_res_max_pages ) @requires_dependencies("unstructured_inference") def _partition_pdf_or_image_local( filename: str = "", file: Optional[bytes | IO[bytes]] = None, is_image: bool = False, infer_table_structure: bool = False, include_page_breaks: bool = False, languages: Optional[list[str]] = None, ocr_languages: Optional[str] = None, ocr_mode: str = OCRMode.FULL_PAGE.value, model_name: Optional[str] = None, # to be deprecated in favor of `hi_res_model_name` hi_res_model_name: Optional[str] = None, pdf_image_dpi: Optional[int] = None, metadata_last_modified: Optional[str] = None, pdf_text_extractable: bool = False, extract_images_in_pdf: bool = False, extract_image_block_types: Optional[list[str]] = None, extract_image_block_output_dir: Optional[str] = None, extract_image_block_to_payload: bool = False, analysis: bool = False, analyzed_image_output_dir_path: Optional[str] = None, starting_page_number: int = 1, extract_forms: bool = False, form_extraction_skip_tables: bool = True, pdf_hi_res_max_pages: Optional[int] = None, **kwargs: Any, ) -> list[Element]: """Partition using package installed locally""" from unstructured_inference.inference.layout import ( process_data_with_model, process_file_with_model, ) from unstructured.partition.pdf_image.ocr import process_data_with_ocr, process_file_with_ocr from unstructured.partition.pdf_image.pdfminer_processing import ( process_data_with_pdfminer, process_file_with_pdfminer, ) if not is_image: check_pdf_hi_res_max_pages_exceeded( filename=filename, file=file, pdf_hi_res_max_pages=pdf_hi_res_max_pages ) hi_res_model_name = hi_res_model_name or model_name or default_hi_res_model() if pdf_image_dpi is None: pdf_image_dpi = 200 od_model_layout_dumper: Optional[ObjectDetectionLayoutDumper] = None extracted_layout_dumper: Optional[ExtractedLayoutDumper] = None ocr_layout_dumper: Optional[OCRLayoutDumper] = None final_layout_dumper: Optional[FinalLayoutDumper] = None skip_analysis_dump = env_config.ANALYSIS_DUMP_OD_SKIP if file is None: inferred_document_layout = process_file_with_model( filename, is_image=is_image, model_name=hi_res_model_name, pdf_image_dpi=pdf_image_dpi, ) extracted_layout, layouts_links = ( process_file_with_pdfminer(filename=filename, dpi=pdf_image_dpi) if pdf_text_extractable else ([], []) ) if analysis: if not analyzed_image_output_dir_path: if env_config.GLOBAL_WORKING_DIR_ENABLED: analyzed_image_output_dir_path = str( Path(env_config.GLOBAL_WORKING_PROCESS_DIR) / "annotated" ) else: analyzed_image_output_dir_path = str(Path.cwd() / "annotated") os.makedirs(analyzed_image_output_dir_path, exist_ok=True) if not skip_analysis_dump: od_model_layout_dumper = ObjectDetectionLayoutDumper( layout=inferred_document_layout, model_name=hi_res_model_name, ) extracted_layout_dumper = ExtractedLayoutDumper( layout=[layout.as_list() for layout in extracted_layout], ) ocr_layout_dumper = OCRLayoutDumper() # NOTE(christine): merged_document_layout = extracted_layout + inferred_layout merged_document_layout = merge_inferred_with_extracted_layout( inferred_document_layout=inferred_document_layout, extracted_layout=extracted_layout, hi_res_model_name=hi_res_model_name, ) final_document_layout = process_file_with_ocr( filename, merged_document_layout, extracted_layout=extracted_layout, is_image=is_image, infer_table_structure=infer_table_structure, ocr_languages=ocr_languages, ocr_mode=ocr_mode, pdf_image_dpi=pdf_image_dpi, ocr_layout_dumper=ocr_layout_dumper, ) else: inferred_document_layout = process_data_with_model( file, is_image=is_image, model_name=hi_res_model_name, pdf_image_dpi=pdf_image_dpi, ) if hasattr(file, "seek"): file.seek(0) extracted_layout, layouts_links = ( process_data_with_pdfminer(file=file, dpi=pdf_image_dpi) if pdf_text_extractable else ([], []) ) if analysis: if not analyzed_image_output_dir_path: if env_config.GLOBAL_WORKING_DIR_ENABLED: analyzed_image_output_dir_path = str( Path(env_config.GLOBAL_WORKING_PROCESS_DIR) / "annotated" ) else: analyzed_image_output_dir_path = str(Path.cwd() / "annotated") if not skip_analysis_dump: od_model_layout_dumper = ObjectDetectionLayoutDumper( layout=inferred_document_layout, model_name=hi_res_model_name, ) extracted_layout_dumper = ExtractedLayoutDumper( layout=[layout.as_list() for layout in extracted_layout], ) ocr_layout_dumper = OCRLayoutDumper() # NOTE(christine): merged_document_layout = extracted_layout + inferred_layout merged_document_layout = merge_inferred_with_extracted_layout( inferred_document_layout=inferred_document_layout, extracted_layout=extracted_layout, hi_res_model_name=hi_res_model_name, ) if hasattr(file, "seek"): file.seek(0) final_document_layout = process_data_with_ocr( file, merged_document_layout, extracted_layout=extracted_layout, is_image=is_image, infer_table_structure=infer_table_structure, ocr_languages=ocr_languages, ocr_mode=ocr_mode, pdf_image_dpi=pdf_image_dpi, ocr_layout_dumper=ocr_layout_dumper, ) # vectorization of the data structure ends here final_document_layout = clean_pdfminer_inner_elements(final_document_layout) for page in final_document_layout.pages: for el in page.elements: el.text = el.text or "" elements = document_to_element_list( final_document_layout, sortable=True, include_page_breaks=include_page_breaks, last_modification_date=metadata_last_modified, # NOTE(crag): do not attempt to derive ListItem's from a layout-recognized "list" # block with NLP rules. Otherwise, the assumptions in # unstructured.partition.common::layout_list_to_list_items often result in weird chunking. infer_list_items=False, languages=languages, starting_page_number=starting_page_number, layouts_links=layouts_links, **kwargs, ) extract_image_block_types = check_element_types_to_extract(extract_image_block_types) # NOTE(christine): `extract_images_in_pdf` would deprecate # (but continue to support for a while) if extract_images_in_pdf: save_elements( elements=elements, starting_page_number=starting_page_number, element_category_to_save=ElementType.IMAGE, filename=filename, file=file, is_image=is_image, pdf_image_dpi=pdf_image_dpi, extract_image_block_to_payload=extract_image_block_to_payload, output_dir_path=extract_image_block_output_dir, ) for el_type in extract_image_block_types: if extract_images_in_pdf and el_type == ElementType.IMAGE: continue save_elements( elements=elements, starting_page_number=starting_page_number, element_category_to_save=el_type, filename=filename, file=file, is_image=is_image, pdf_image_dpi=pdf_image_dpi, extract_image_block_to_payload=extract_image_block_to_payload, output_dir_path=extract_image_block_output_dir, ) out_elements = [] for el in elements: if isinstance(el, PageBreak) and not include_page_breaks: continue if isinstance(el, Image): out_elements.append(cast(Element, el)) # NOTE(crag): this is probably always a Text object, but check for the sake of typing elif isinstance(el, Text): el.text = re.sub( RE_MULTISPACE_INCLUDING_NEWLINES, " ", el.text or "", ).strip() if el.text or isinstance(el, PageBreak): out_elements.append(cast(Element, el)) if extract_forms: forms = run_form_extraction( file=file, filename=filename, model_name=hi_res_model_name, elements=out_elements, skip_table_regions=form_extraction_skip_tables, ) out_elements.extend(forms) if analysis: if not skip_analysis_dump: final_layout_dumper = FinalLayoutDumper( layout=out_elements, ) layout_dumpers = [] if od_model_layout_dumper: layout_dumpers.append(od_model_layout_dumper) if extracted_layout_dumper: layout_dumpers.append(extracted_layout_dumper) if ocr_layout_dumper: layout_dumpers.append(ocr_layout_dumper) if final_layout_dumper: layout_dumpers.append(final_layout_dumper) save_analysis_artifiacts( *layout_dumpers, filename=filename, file=file, is_image=is_image, analyzed_image_output_dir_path=analyzed_image_output_dir_path, skip_bboxes=env_config.ANALYSIS_BBOX_SKIP, skip_dump_od=env_config.ANALYSIS_DUMP_OD_SKIP, draw_grid=env_config.ANALYSIS_BBOX_DRAW_GRID, draw_caption=env_config.ANALYSIS_BBOX_DRAW_CAPTION, resize=env_config.ANALYSIS_BBOX_RESIZE, format=env_config.ANALYSIS_BBOX_FORMAT, ) return out_elements def _partition_pdf_with_pdfparser( extracted_elements: list[list[Element]], include_page_breaks: bool = False, sort_mode: str = SORT_MODE_XY_CUT, **kwargs, ): """Partitions a PDF using pdfparser.""" elements = [] for page_elements in extracted_elements: # NOTE(crag, christine): always do the basic sort first for deterministic order across # python versions. sorted_page_elements = sort_page_elements(page_elements, SORT_MODE_BASIC) if sort_mode != SORT_MODE_BASIC: sorted_page_elements = sort_page_elements(sorted_page_elements, sort_mode) elements += sorted_page_elements if include_page_breaks: elements.append(PageBreak(text="")) return elements def _partition_pdf_or_image_with_ocr( filename: str = "", file: Optional[bytes | IO[bytes]] = None, include_page_breaks: bool = False, languages: Optional[list[str]] = None, ocr_languages: Optional[str] = None, is_image: bool = False, metadata_last_modified: Optional[str] = None, starting_page_number: int = 1, **kwargs: Any, ): """Partitions an image or PDF using OCR. For PDFs, each page is converted to an image prior to processing.""" elements = [] if is_image: images = [] image = PILImage.open(file) if file is not None else PILImage.open(filename) images.append(image) for page_number, image in enumerate(images, start=starting_page_number): page_elements = _partition_pdf_or_image_with_ocr_from_image( image=image, languages=languages, ocr_languages=ocr_languages, page_number=page_number, include_page_breaks=include_page_breaks, metadata_last_modified=metadata_last_modified, **kwargs, ) elements.extend(page_elements) else: for page_number, image in enumerate( convert_pdf_to_images(filename, file), start=starting_page_number ): page_elements = _partition_pdf_or_image_with_ocr_from_image( image=image, languages=languages, ocr_languages=ocr_languages, page_number=page_number, include_page_breaks=include_page_breaks, metadata_last_modified=metadata_last_modified, **kwargs, ) elements.extend(page_elements) return elements def _partition_pdf_or_image_with_ocr_from_image( image: PILImage.Image, languages: Optional[list[str]] = None, ocr_languages: Optional[str] = None, page_number: int = 1, include_page_breaks: bool = False, metadata_last_modified: Optional[str] = None, sort_mode: str = SORT_MODE_XY_CUT, **kwargs: Any, ) -> list[Element]: """Extract `unstructured` elements from an image using OCR and perform partitioning.""" from unstructured.partition.utils.ocr_models.ocr_interface import OCRAgent ocr_agent = OCRAgent.get_agent(language=ocr_languages) # NOTE(christine): `pytesseract.image_to_string()` returns sorted text if ocr_agent.is_text_sorted(): sort_mode = SORT_MODE_DONT ocr_data = ocr_agent.get_layout_elements_from_image(image=image) metadata = ElementMetadata( last_modified=metadata_last_modified, filetype=image.format, page_number=page_number, languages=languages, ) # NOTE (yao): elements for a document is still stored as a list therefore at this step we have # to convert the vector data structured ocr_data into a list page_elements = ocr_data_to_elements( ocr_data.as_list(), image_size=image.size, common_metadata=metadata, ) sorted_page_elements = page_elements if sort_mode != SORT_MODE_DONT: sorted_page_elements = sort_page_elements(page_elements, sort_mode) if include_page_breaks: sorted_page_elements.append(PageBreak(text="")) return page_elements def _process_uncategorized_text_elements(elements: list[Element]): """Processes a list of elements, creating a new list where elements with the category `UncategorizedText` are replaced with corresponding elements created from their text content.""" out_elements = [] for el in elements: if hasattr(el, "category") and el.category == ElementType.UNCATEGORIZED_TEXT: new_el = element_from_text(cast(Text, el).text) new_el.metadata = el.metadata else: new_el = el out_elements.append(new_el) return out_elements def _extract_text(item: LTItem) -> str: """Recursively extracts text from PDFMiner objects to account for scenarios where the text is in a sub-container.""" if hasattr(item, "get_text"): return item.get_text() elif isinstance(item, LTContainer): text = "" for child in item: text += _extract_text(child) or "" return text elif isinstance(item, (LTTextBox, LTImage)): # TODO(robinson) - Support pulling text out of images # https://github.com/pdfminer/pdfminer.six/blob/master/pdfminer/image.py#L90 return "\n" return "\n" # Some pages with a ICC color space do not follow the pdf spec # They throw an error when we call interpreter.process_page # Since we don't need color info, we can just drop it in the pdfminer code # See #2059 @wrapt.patch_function_wrapper("pdfminer.pdfinterp", "PDFPageInterpreter.init_resources") def pdfminer_interpreter_init_resources(wrapped, instance, args, kwargs): resources = args[0] if "ColorSpace" in resources: del resources["ColorSpace"] return wrapped(resources) def _combine_list_elements( elements: list[Element], coordinate_system: PixelSpace | PointSpace ) -> list[Element]: """Combine elements that should be considered a single ListItem element.""" tmp_element = None updated_elements: list[Element] = [] for element in elements: if isinstance(element, ListItem): tmp_element = element tmp_text = element.text tmp_coords = element.metadata.coordinates elif tmp_element and check_coords_within_boundary( coordinates=element.metadata.coordinates, boundary=tmp_coords, ): tmp_element.text = f"{tmp_text} {element.text}" # replace "element" with the corrected element element = _combine_coordinates_into_element1( element1=tmp_element, element2=element, coordinate_system=coordinate_system, ) # remove previously added ListItem element with incomplete text updated_elements.pop() updated_elements.append(element) return updated_elements def _get_links_from_urls_metadata( urls_metadata: list[dict[str, Any]], moved_indices: np.ndarray ) -> list[Link]: """Extracts links from a list of URL metadata.""" links: list[Link] = [] for url in urls_metadata: with contextlib.suppress(IndexError): links.append( { "text": url["text"], "url": url["uri"], "start_index": index_adjustment_after_clean_extra_whitespace( url["start_index"], moved_indices, ), }, ) return links def _combine_coordinates_into_element1( element1: Element, element2: Element, coordinate_system: PixelSpace | PointSpace ) -> Element: """Combine the coordiantes of two elements and apply the updated coordiantes to `elements1`""" x1 = min( element1.metadata.coordinates.points[0][0], element2.metadata.coordinates.points[0][0], ) x2 = max( element1.metadata.coordinates.points[2][0], element2.metadata.coordinates.points[2][0], ) y1 = min( element1.metadata.coordinates.points[0][1], element2.metadata.coordinates.points[0][1], ) y2 = max( element1.metadata.coordinates.points[1][1], element2.metadata.coordinates.points[1][1], ) points = ((x1, y1), (x1, y2), (x2, y2), (x2, y1)) element1.metadata.coordinates = CoordinatesMetadata( points=points, system=coordinate_system, ) return copy.deepcopy(element1) def check_coords_within_boundary( coordinates: CoordinatesMetadata, boundary: CoordinatesMetadata, horizontal_threshold: float = 0.2, vertical_threshold: float = 0.3, ) -> bool: """Checks if the coordinates are within boundary thresholds. Parameters ---------- coordinates a CoordinatesMetadata input boundary a CoordinatesMetadata to compare against vertical_threshold a float ranges from [0,1] to scale the vertical (y-axis) boundary horizontal_threshold a float ranges from [0,1] to scale the horizontal (x-axis) boundary """ if not coord_has_valid_points(coordinates) and not coord_has_valid_points(boundary): trace_logger.detail( # type: ignore f"coordinates {coordinates} and boundary {boundary} did not pass validation", ) return False boundary_x_min = boundary.points[0][0] boundary_x_max = boundary.points[2][0] boundary_y_min = boundary.points[0][1] boundary_y_max = boundary.points[1][1] line_width = boundary_x_max - boundary_x_min line_height = boundary_y_max - boundary_y_min x_within_boundary = ( (coordinates.points[0][0] > boundary_x_min - (horizontal_threshold * line_width)) and (coordinates.points[2][0] < boundary_x_max + (horizontal_threshold * line_width)) and (coordinates.points[0][0] >= boundary_x_min) ) y_within_boundary = ( coordinates.points[0][1] < boundary_y_max + (vertical_threshold * line_height) ) and (coordinates.points[0][1] > boundary_y_min - (vertical_threshold * line_height)) return x_within_boundary and y_within_boundary def document_to_element_list( document: DocumentLayout, sortable: bool = False, include_page_breaks: bool = False, last_modification_date: Optional[str] = None, infer_list_items: bool = True, source_format: Optional[str] = None, detection_origin: Optional[str] = None, sort_mode: str = SORT_MODE_XY_CUT, languages: Optional[list[str]] = None, starting_page_number: int = 1, layouts_links: Optional[list[list]] = None, **kwargs: Any, ) -> list[Element]: """Converts a DocumentLayout object to a list of unstructured elements.""" elements: list[Element] = [] num_pages = len(document.pages) for page_number, page in enumerate(document.pages, start=starting_page_number): page_elements: list[Element] = [] page_image_metadata = get_page_image_metadata(page) image_format = page_image_metadata.get("format") image_width = page_image_metadata.get("width") image_height = page_image_metadata.get("height") translation_mapping: list[tuple["LayoutElement", Element]] = [] links = ( layouts_links[page_number - starting_page_number] if layouts_links and layouts_links[0] else None ) for layout_element in page.elements: if ( image_width and image_height and getattr(layout_element.bbox, "x1") not in (None, np.nan) ): coordinate_system = PixelSpace(width=image_width, height=image_height) else: coordinate_system = None element = normalize_layout_element( layout_element, coordinate_system=coordinate_system, infer_list_items=infer_list_items, source_format=source_format if source_format else "html", ) if isinstance(element, list): for el in element: if last_modification_date: el.metadata.last_modified = last_modification_date el.metadata.page_number = page_number page_elements.extend(element) translation_mapping.extend([(layout_element, el) for el in element]) continue else: element.metadata.links = ( get_links_in_element(links, layout_element.bbox) if links else [] ) if last_modification_date: element.metadata.last_modified = last_modification_date element.metadata.text_as_html = getattr(layout_element, "text_as_html", None) element.metadata.table_as_cells = getattr(layout_element, "table_as_cells", None) if (isinstance(element, Title) and element.metadata.category_depth is None) and any( getattr(el, "type", "") in ["Headline", "Subheadline"] for el in page.elements ): element.metadata.category_depth = 0 page_elements.append(element) translation_mapping.append((layout_element, element)) coordinates = ( element.metadata.coordinates.points if element.metadata.coordinates else None ) el_image_path = ( layout_element.image_path if hasattr(layout_element, "image_path") else None ) add_element_metadata( element, page_number=page_number, filetype=image_format, coordinates=coordinates, coordinate_system=coordinate_system, category_depth=element.metadata.category_depth, image_path=el_image_path, detection_origin=detection_origin, languages=languages, **kwargs, ) for layout_element, element in translation_mapping: if hasattr(layout_element, "parent") and layout_element.parent is not None: element_parent = first( (el for l_el, el in translation_mapping if l_el is layout_element.parent), ) element.metadata.parent_id = element_parent.id sorted_page_elements = page_elements if sortable and sort_mode != SORT_MODE_DONT: sorted_page_elements = sort_page_elements(page_elements, sort_mode) if include_page_breaks and page_number < num_pages + starting_page_number: sorted_page_elements.append(PageBreak(text="")) elements.extend(sorted_page_elements) return elements
Memory