from __future__ import annotations
import numbers
import subprocess
from io import BufferedReader, BytesIO, TextIOWrapper
from tempfile import SpooledTemporaryFile
from time import sleep
from typing import IO, TYPE_CHECKING, Any, Optional, TypeVar, cast
import emoji
import psutil
from unstructured.documents.coordinates import CoordinateSystem, PixelSpace
from unstructured.documents.elements import (
TYPE_TO_TEXT_ELEMENT_MAP,
CheckBox,
CoordinatesMetadata,
Element,
ElementMetadata,
ElementType,
ListItem,
PageBreak,
Text,
)
from unstructured.logger import logger
from unstructured.nlp.patterns import ENUMERATED_BULLETS_RE, UNICODE_BULLETS_RE
if TYPE_CHECKING:
from unstructured_inference.inference.layout import PageLayout
from unstructured_inference.inference.layoutelement import LayoutElement
def normalize_layout_element(
    layout_element: LayoutElement | Element | dict[str, Any],
    coordinate_system: Optional[CoordinateSystem] = None,
    infer_list_items: bool = True,
    source_format: Optional[str] = "html",
) -> Element | list[Element]:
    """Build an `unstructured` element (or list of elements) from a layout element.

    `layout_element` may be an existing `Element`, a plain dict, or any object exposing
    `.to_dict()`. The resulting dict is read for the keys "text", "coordinates", "type", "prob"
    and "source".

    Parameters
    ----------
    layout_element
        The item to normalize.
    coordinate_system
        When falsy, the "coordinates" entry of the layout dict is ignored and `None` is passed on
        as the element coordinates.
    infer_list_items
        Only consulted for list-type elements. When truthy the text is split into individual
        `ListItem` elements by `layout_list_to_list_items()`, otherwise a single `ListItem`
        holding the whole text is returned.
    source_format
        When equal to "html", an `Element` input is returned as-is.

    Returns
    -------
    A single element, or a list of `ListItem` elements for a list-type element when
    `infer_list_items` is truthy.
    """
    # -- an `Element` coming from an "html" source is already in its final form --
    if source_format == "html" and isinstance(layout_element, Element):
        return layout_element

    # -- `PageBreak` is an `Element` subclass (per the original author's note), so this branch
    # -- is only reachable when `source_format` is something other than "html"
    if isinstance(layout_element, PageBreak):
        return PageBreak(text="")

    layout_dict = layout_element if isinstance(layout_element, dict) else layout_element.to_dict()

    text = layout_dict.get("text", "")
    # -- both `coordinates` and `coordinate_system` must be present in order to add
    # -- coordinates metadata to the element
    coordinates = layout_dict.get("coordinates") if coordinate_system else None
    element_type = layout_dict.get("type")
    prob = layout_dict.get("prob")
    source = layout_dict.get("source", None)

    # -- the detection origin is the `.value` of the "source" entry, when there is one --
    origin = source.value if source else None

    if not (prob and isinstance(prob, (int, str, float, numbers.Number))):
        class_prob_metadata = ElementMetadata()
    else:
        class_prob_metadata = ElementMetadata(detection_class_prob=float(prob))  # type: ignore

    shared_kwargs = {
        "coordinates": coordinates,
        "coordinate_system": coordinate_system,
        "metadata": class_prob_metadata,
        "detection_origin": origin,
    }

    # -- list elements: either split into items or wrap the whole text in one item --
    if element_type == ElementType.LIST:
        if not infer_list_items:
            return ListItem(text=text, **shared_kwargs)
        return layout_list_to_list_items(text, **shared_kwargs)

    # -- element types with a dedicated text-element class --
    if element_type in TYPE_TO_TEXT_ELEMENT_MAP:
        assert isinstance(element_type, str)  # Added to resolve type-error
        element_cls = TYPE_TO_TEXT_ELEMENT_MAP[element_type]
        text_element = element_cls(text=text, **shared_kwargs)
        if element_type == ElementType.HEADLINE:
            text_element.metadata.category_depth = 1
        elif element_type == ElementType.SUB_HEADLINE:
            text_element.metadata.category_depth = 2
        return text_element

    # -- check-box / radio-button style elements --
    checked_types = (
        ElementType.CHECK_BOX_CHECKED,
        ElementType.RADIO_BUTTON_CHECKED,
        ElementType.CHECKED,
    )
    unchecked_types = (
        ElementType.CHECK_BOX_UNCHECKED,
        ElementType.RADIO_BUTTON_UNCHECKED,
        ElementType.UNCHECKED,
    )
    if element_type in checked_types + unchecked_types:
        return CheckBox(checked=element_type in checked_types, **shared_kwargs)

    # -- anything else falls back to a plain `Text` element --
    return Text(text=text, **shared_kwargs)
def layout_list_to_list_items(
    text: Optional[str],
    coordinates: Optional[tuple[tuple[float, float], ...]],
    coordinate_system: Optional[CoordinateSystem],
    metadata: Optional[ElementMetadata],
    detection_origin: Optional[str],
) -> list[Element]:
    """Split the text of a list layout element into one `ListItem` per list entry.

    The text is split with `ENUMERATED_BULLETS_RE` first; when that yields a single segment it
    is split with `UNICODE_BULLETS_RE` instead. Segments that are empty after stripping
    whitespace are dropped. Every resulting `ListItem` receives the same `coordinates`,
    `coordinate_system`, `metadata` and `detection_origin` arguments.

    Returns an empty list when `text` is `None` or empty.
    """
    # -- nothing to split means no items --
    if not text:
        return []

    segments = ENUMERATED_BULLETS_RE.split(text)
    # NOTE(robinson) - a single segment means there wasn't a match for the enumerated bullets
    if len(segments) == 1:
        segments = UNICODE_BULLETS_RE.split(text)

    stripped_segments = (segment.strip() for segment in segments)

    # -- both `coordinates` and `coordinate_system` must be present in order to add
    # -- coordinates metadata to the element
    return [
        ListItem(
            text=item_text,
            coordinates=coordinates,
            coordinate_system=coordinate_system,
            metadata=metadata,
            detection_origin=detection_origin,
        )
        for item_text in stripped_segments
        if item_text
    ]
def add_element_metadata(
    element: Element,
    filename: Optional[str] = None,
    filetype: Optional[str] = None,
    page_number: Optional[int] = None,
    url: Optional[str] = None,
    text_as_html: Optional[str] = None,
    coordinates: Optional[tuple[tuple[float, float], ...]] = None,
    coordinate_system: Optional[CoordinateSystem] = None,
    image_path: Optional[str] = None,
    detection_origin: Optional[str] = None,
    languages: Optional[list[str]] = None,
    **kwargs: Any,
) -> Element:
    """Merge document-level metadata into `element.metadata` and return the element.

    A new `ElementMetadata` is assembled from the arguments (filename, filetype, page number,
    url, HTML text, image path, languages), from the element's `links` and `emphasized_texts`
    attributes when it has them and they are non-empty, and from the element's current
    `category_depth`. It is then applied with `element.metadata.update()`.

    Coordinates metadata is only built when both `coordinates` and `coordinate_system` are
    given. `detection_origin` is assigned directly on the element metadata when it is not
    `None`. Extra keyword arguments are accepted and ignored.
    """
    coordinates_metadata = None
    if coordinates is not None and coordinate_system is not None:
        coordinates_metadata = CoordinatesMetadata(
            points=coordinates,
            system=coordinate_system,
        )

    # -- link details, split into parallel lists of urls, texts and start indexes --
    links = None
    if hasattr(element, "links") and len(element.links) > 0:
        links = element.links
    if links:
        link_urls = [link.get("url") for link in links]
        link_texts = [link.get("text") for link in links]
        link_start_indexes = [link.get("start_index") for link in links]
    else:
        link_urls = link_texts = link_start_indexes = None

    # -- emphasized text details, split into parallel lists of contents and tags --
    emphasized_texts = None
    if hasattr(element, "emphasized_texts") and len(element.emphasized_texts) > 0:
        emphasized_texts = element.emphasized_texts
    if emphasized_texts:
        emphasized_text_contents = [emphasized.get("text") for emphasized in emphasized_texts]
        emphasized_text_tags = [emphasized.get("tag") for emphasized in emphasized_texts]
    else:
        emphasized_text_contents = emphasized_text_tags = None

    # -- a falsy depth is passed on as `None` --
    depth = element.metadata.category_depth or None

    element.metadata.update(
        ElementMetadata(
            coordinates=coordinates_metadata,
            filename=filename,
            filetype=filetype,
            page_number=page_number,
            url=url,
            text_as_html=text_as_html,
            link_urls=link_urls,
            link_texts=link_texts,
            link_start_indexes=link_start_indexes,
            emphasized_text_contents=emphasized_text_contents,
            emphasized_text_tags=emphasized_text_tags,
            category_depth=depth,
            image_path=image_path,
            languages=languages,
        )
    )

    if detection_origin is not None:
        element.metadata.detection_origin = detection_origin

    return element
def remove_element_metadata(layout_elements: list[Element]) -> list[Element]:
    """Return the normalized elements with their metadata reset to an empty `ElementMetadata`.

    Each input is passed through `normalize_layout_element()` with its default arguments. A
    list result (produced for list-type layout elements) is flattened into the output.

    Every element receives its own fresh `ElementMetadata()` instance, so a later change to one
    element's metadata does not show up on the other returned elements.

    Note that an input which `normalize_layout_element()` returns unchanged is modified in
    place: its `metadata` attribute is replaced.
    """
    elements: list[Element] = []
    for layout_element in layout_elements:
        normalized = normalize_layout_element(layout_element)
        batch = normalized if isinstance(normalized, list) else [normalized]
        for element in batch:
            # -- a distinct instance per element avoids sharing one mutable metadata object --
            element.metadata = ElementMetadata()
        elements.extend(batch)
    return elements
def _is_soffice_running():
    """Return True when any process reported by psutil has "soffice" in its name.

    The name comparison is case-insensitive. Processes whose name cannot be read because
    psutil raises `NoSuchProcess`, `AccessDenied` or `ZombieProcess` are skipped.
    """
    for process in psutil.process_iter():
        try:
            process_name = process.name()
        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
            # -- the name could not be read; move on to the next process --
            continue
        if "soffice" in process_name.lower():
            return True
    return False
def convert_office_doc(
    input_filename: str,
    output_directory: str,
    target_format: str = "docx",
    target_filter: Optional[str] = None,
    wait_for_soffice_ready_time_out: int = 10,
):
    """Convert an office document to another format with the LibreOffice `soffice` CLI.

    Runs `soffice --headless --convert-to <format> --outdir <dir> <input>`. While the command
    produces no stdout, the function either sleeps briefly (when another soffice process is
    running) or runs the command again, until output appears or the wait budget is used up.
    The outcome is logged; a failed conversion is logged as an error, not raised.

    Parameters
    ----------
    input_filename: str
        The path of the file to convert, e.g. a .doc or .ppt file
    output_directory: str
        The directory the converted file is written to
    target_format: str
        The desired output format
    target_filter: str
        The output filter name to use when converting; when given it is appended to the
        target format as "<format>:<filter>". See references below for details.
    wait_for_soffice_ready_time_out: int
        The max wait time in seconds for soffice to become available to run

    Raises
    ------
    FileNotFoundError
        When the `soffice` command cannot be found.

    References
    ----------
    https://stackoverflow.com/questions/52277264/convert-doc-to-docx-using-soffice-not-working
    https://git.libreoffice.org/core/+/refs/heads/master/filter/source/config/fragments/filters
    """
    if target_filter is not None:
        target_format = f"{target_format}:{target_filter}"

    # NOTE(robinson) - In the future can also include win32com client as a fallback for windows
    # users who do not have LibreOffice installed
    # ref: https://stackoverflow.com/questions/38468442/
    #       multiple-doc-to-docx-file-conversion-using-python
    command = [
        "soffice",
        "--headless",
        "--convert-to",
        target_format,
        "--outdir",
        output_directory,
        input_filename,
    ]

    def _run_soffice():
        """Run the conversion command once; return the completed process and its stdout text."""
        completed = subprocess.run(command, capture_output=True)
        return completed, completed.stdout.decode().strip()

    poll_interval = 0.1
    elapsed = 0
    try:
        output, message = _run_soffice()
        # -- returncode can't be relied on because on macOS it is 0 even when the command
        # -- failed to run; empty stdout is used as the sign that the process failed
        while (elapsed < wait_for_soffice_ready_time_out) and (message == ""):
            elapsed += poll_interval
            if _is_soffice_running():
                # -- only one soffice process can run at a time; wait for the other one --
                sleep(poll_interval)
                continue
            output, message = _run_soffice()
    except FileNotFoundError:
        raise FileNotFoundError(
            """soffice command was not found. Please install libreoffice
on your system and try again.
- Install instructions: https://www.libreoffice.org/get-help/install-howto/
- Mac: https://formulae.brew.sh/cask/libreoffice
- Debian: https://wiki.debian.org/LibreOffice""",
        )

    logger.info(message)
    conversion_failed = output.returncode != 0 or message == ""
    if conversion_failed:
        logger.error(
            "soffice failed to convert to format %s with code %i", target_format, output.returncode
        )
        logger.error(output.stderr.decode().strip())
def exactly_one(**kwargs: Any) -> None:
    """Verify that exactly one of the keyword arguments is specified.

    An argument counts as specified when it is neither `None` nor the empty string.

    Raises
    ------
    ValueError
        When zero or more than one argument is specified, or when the function is called
        without any keyword arguments.

    Example:
        >>> exactly_one(filename=filename, file=file, text=text, url=url)
    """
    names = list(kwargs)
    # -- with no arguments there is nothing to name in the error message, so report the
    # -- misuse directly rather than failing on an index lookup
    if not names:
        raise ValueError("exactly_one() requires at least one keyword argument.")

    specified_count = sum((arg is not None and arg != "") for arg in kwargs.values())
    if specified_count == 1:
        return

    if len(names) > 1:
        message = f"Exactly one of {', '.join(names[:-1])} and {names[-1]} must be specified."
    else:
        message = f"{names[0]} must be specified."
    raise ValueError(message)
_T = TypeVar("_T")


def spooled_to_bytes_io_if_needed(file: _T | SpooledTemporaryFile[bytes]) -> _T | BytesIO:
    """Return a `BytesIO` copy of `file` when it is a `SpooledTemporaryFile`.

    Any other value, including `None` or `bytes`, is handed back untouched, so `file` does not
    need to be IO[bytes].

    In Python <3.11, `SpooledTemporaryFile` does not implement `.readable()` or `.seekable()`
    which triggers an exception when the file is loaded by certain packages. In particular, the
    stdlib `zipfile.Zipfile` raises on opening a `SpooledTemporaryFile` as does
    `Pandas.read_csv()`.
    """
    # -- anything that is not a spooled temp file passes straight through --
    if not isinstance(file, SpooledTemporaryFile):
        return file

    # -- read the whole file from the start and wrap the bytes in a new in-memory stream --
    file.seek(0)
    contents = cast(bytes, file.read())
    return BytesIO(contents)
def convert_to_bytes(file: bytes | IO[bytes]) -> bytes:
    """Return the full byte content of `file`.

    Supported inputs:
    - `bytes`: returned unchanged, as a convenience to simplify client code.
    - `SpooledTemporaryFile`: read from the start, then rewound to the start again.
    - `BytesIO`: its whole buffer, without moving the current position.
    - `TextIOWrapper` / `BufferedReader`: the file at `file.name` is opened separately in
      binary mode and read, leaving `file` itself untouched.

    Raises `ValueError` for any other type.
    """
    if isinstance(file, bytes):
        return file

    if isinstance(file, SpooledTemporaryFile):
        file.seek(0)
        payload = file.read()
        # -- rewind so the caller can read the file again later --
        file.seek(0)
        return payload

    if isinstance(file, BytesIO):
        return file.getvalue()

    if not isinstance(file, (TextIOWrapper, BufferedReader)):
        raise ValueError("Invalid file-like object type")

    with open(file.name, "rb") as source:
        return source.read()
def contains_emoji(s: str) -> bool:
    """
    Report whether the input string contains at least one emoji character.

    Parameters:
    - s (str): The input string to check.

    Returns:
    - bool: True if `emoji.emoji_count(s)` is non-zero, False otherwise.
    """
    emoji_total = emoji.emoji_count(s)
    return bool(emoji_total)
def get_page_image_metadata(page: PageLayout) -> dict[str, Any]:
    """Return the image format, width and height recorded for `page`.

    The values come from `page.image` when that attribute is present and truthy, otherwise
    from the "format", "width" and "height" entries of `page.image_metadata` when that is
    present and truthy. When neither is available all three values are `None`.

    The result is a dict with the keys "format", "width" and "height".
    """
    image = getattr(page, "image", None)
    image_metadata = getattr(page, "image_metadata", None)

    if image:
        fmt, width, height = image.format, image.width, image.height
    elif image_metadata:
        fmt, width, height = (
            image_metadata.get("format"),
            image_metadata.get("width"),
            image_metadata.get("height"),
        )
    else:
        fmt = width = height = None

    return {"format": fmt, "width": width, "height": height}
def ocr_data_to_elements(
    ocr_data: list["LayoutElement"],
    image_size: tuple[int | float, int | float],
    common_metadata: Optional[ElementMetadata] = None,
    infer_list_items: bool = True,
    source_format: Optional[str] = None,
) -> list[Element]:
    """Convert OCR layout data into `unstructured` elements with associated metadata.

    Each layout element is normalized with `normalize_layout_element()` using a `PixelSpace`
    coordinate system sized from `image_size` (width, height). A falsy `source_format` is
    replaced by "html".

    `normalize_layout_element()` may return a list of elements for a list-type layout element
    when `infer_list_items` is true. Those are flattened into the result, so the return value
    is always a flat list of elements.

    When `common_metadata` is truthy it is applied to every produced element with
    `element.metadata.update()`.
    """
    image_width, image_height = image_size
    coordinate_system = PixelSpace(width=image_width, height=image_height)
    effective_source_format = source_format if source_format else "html"

    elements: list[Element] = []
    for layout_element in ocr_data:
        normalized = normalize_layout_element(
            layout_element,
            coordinate_system=coordinate_system,
            infer_list_items=infer_list_items,
            source_format=effective_source_format,
        )
        # -- a list layout element can expand to several items; treat both shapes uniformly --
        batch = normalized if isinstance(normalized, list) else [normalized]

        if common_metadata:
            for element in batch:
                element.metadata.update(common_metadata)

        elements.extend(batch)

    return elements