"""API for reading/writing serialized Open Packaging Convention (OPC) package."""

from __future__ import annotations

import os
import posixpath
import zipfile
from typing import IO, TYPE_CHECKING, Any, Container, Sequence

from pptx.exc import PackageNotFoundError
from pptx.opc.constants import CONTENT_TYPE as CT
from pptx.opc.oxml import CT_Types, serialize_part_xml
from pptx.opc.packuri import CONTENT_TYPES_URI, PACKAGE_URI, PackURI
from pptx.opc.shared import CaseInsensitiveDict
from pptx.opc.spec import default_content_types
from pptx.util import lazyproperty

if TYPE_CHECKING:
    from pptx.opc.package import Part, _Relationships  # pyright: ignore[reportPrivateUsage]


class PackageReader(Container[bytes]):
    """Provides access to package-parts of an OPC package with dict semantics.

    The package may be in zip-format (a .pptx file) or expanded into a directory
    structure, perhaps by unzipping a .pptx file.

    `pkg_file` is either a path (str) or a binary file-like object. All reads are
    delegated to the physical-package reader selected by `_PhysPkgReader.factory()`.
    """

    def __init__(self, pkg_file: str | IO[bytes]):
        self._pkg_file = pkg_file

    def __contains__(self, pack_uri: object) -> bool:
        """Return True when part identified by `pack_uri` is present in package."""
        return pack_uri in self._blob_reader

    def __getitem__(self, pack_uri: PackURI) -> bytes:
        """Return bytes for part corresponding to `pack_uri`."""
        return self._blob_reader[pack_uri]

    def rels_xml_for(self, partname: PackURI) -> bytes | None:
        """Return optional rels item XML for `partname`.

        Returns `None` if no rels item is present for `partname`. `partname` is a
        |PackURI| instance.
        """
        blob_reader, uri = self._blob_reader, partname.rels_uri
        return blob_reader[uri] if uri in blob_reader else None

    @lazyproperty
    def _blob_reader(self) -> _PhysPkgReader:
        """|_PhysPkgReader| subtype providing read access to the package file."""
        return _PhysPkgReader.factory(self._pkg_file)


class PackageWriter:
    """Writes a zip-format OPC package to `pkg_file`.

    `pkg_file` can be either a path to a zip file (a string) or a file-like object.
    `pkg_rels` is the |_Relationships| object containing relationships for the
    package. `parts` is a sequence of |Part| subtype instances to be written to the
    package.

    Its single API classmethod is :meth:`write`. This class is not intended to be
    instantiated.
    """

    def __init__(self, pkg_file: str | IO[bytes], pkg_rels: _Relationships, parts: Sequence[Part]):
        self._pkg_file = pkg_file
        self._pkg_rels = pkg_rels
        self._parts = parts

    @classmethod
    def write(
        cls, pkg_file: str | IO[bytes], pkg_rels: _Relationships, parts: Sequence[Part]
    ) -> None:
        """Write a physical package (.pptx file) to `pkg_file`.

        The serialized package contains `pkg_rels` and `parts`, a content-types stream
        based on the content type of each part, and a .rels file for each part that
        has relationships.
        """
        cls(pkg_file, pkg_rels, parts)._write()

    def _write(self) -> None:
        """Write physical package (.pptx file).

        Items are written in this order: content-types stream, package rels, then
        each part (with its rels item, when it has one).
        """
        with _PhysPkgWriter.factory(self._pkg_file) as phys_writer:
            self._write_content_types_stream(phys_writer)
            self._write_pkg_rels(phys_writer)
            self._write_parts(phys_writer)

    def _write_content_types_stream(self, phys_writer: _PhysPkgWriter) -> None:
        """Write `[Content_Types].xml` part to the physical package.

        This part must contain an appropriate content type lookup target for each
        part in the package.
        """
        phys_writer.write(
            CONTENT_TYPES_URI,
            serialize_part_xml(_ContentTypesItem.xml_for(self._parts)),
        )

    def _write_parts(self, phys_writer: _PhysPkgWriter) -> None:
        """Write blob of each part in `parts` to the package.

        A rels item for each part is also written when the part has relationships.
        """
        for part in self._parts:
            phys_writer.write(part.partname, part.blob)
            # --- a rels item is only written when the part's `_rels` is truthy ---
            if part._rels:  # pyright: ignore[reportPrivateUsage]
                phys_writer.write(part.partname.rels_uri, part.rels.xml)

    def _write_pkg_rels(self, phys_writer: _PhysPkgWriter) -> None:
        """Write the XML rels item for `pkg_rels` ('/_rels/.rels') to the package."""
        phys_writer.write(PACKAGE_URI.rels_uri, self._pkg_rels.xml)


class _PhysPkgReader(Container[PackURI]):
    """Base class for physical package reader objects."""

    def __contains__(self, item: object) -> bool:
        """Must be implemented by each subclass."""
        raise NotImplementedError(  # pragma: no cover
            f"`{type(self).__name__}` must implement `.__contains__()`"
        )

    def __getitem__(self, pack_uri: PackURI) -> bytes:
        """Blob for part corresponding to `pack_uri`.

        Must be implemented by each subclass.
        """
        # --- message names the method actually missing from the subclass ---
        raise NotImplementedError(  # pragma: no cover
            f"`{type(self).__name__}` must implement `.__getitem__()`"
        )

    @classmethod
    def factory(cls, pkg_file: str | IO[bytes]) -> _PhysPkgReader:
        """Return |_PhysPkgReader| subtype instance appropriate for `pkg_file`.

        Raises |PackageNotFoundError| when `pkg_file` is a str that is neither a
        directory path nor the path of a zip file.
        """
        # --- for pkg_file other than str, assume it's a stream and pass it to Zip
        # --- reader to sort out
        if not isinstance(pkg_file, str):
            return _ZipPkgReader(pkg_file)

        # --- otherwise we treat `pkg_file` as a path ---
        if os.path.isdir(pkg_file):
            return _DirPkgReader(pkg_file)

        if zipfile.is_zipfile(pkg_file):
            return _ZipPkgReader(pkg_file)

        raise PackageNotFoundError("Package not found at '%s'" % pkg_file)


class _DirPkgReader(_PhysPkgReader):
    """Implements |_PhysPkgReader| interface for OPC package extracted into directory.

    `path` is the path to a directory containing an expanded package.
    """

    def __init__(self, path: str):
        self._path = os.path.abspath(path)

    def __contains__(self, pack_uri: object) -> bool:
        """Return True when part identified by `pack_uri` is present in the directory.

        Any object that is not a |PackURI| is reported as absent.
        """
        if not isinstance(pack_uri, PackURI):
            return False
        return os.path.exists(posixpath.join(self._path, pack_uri.membername))

    def __getitem__(self, pack_uri: PackURI) -> bytes:
        """Return bytes of file corresponding to `pack_uri` in package directory.

        Raises |KeyError| when the file cannot be opened or read.
        """
        path = os.path.join(self._path, pack_uri.membername)
        try:
            with open(path, "rb") as f:
                return f.read()
        except OSError as e:
            # --- present a mapping-style error; keep the OS error as the cause ---
            raise KeyError("no member '%s' in package" % pack_uri) from e


class _ZipPkgReader(_PhysPkgReader):
    """Implements |_PhysPkgReader| interface for a zip-file OPC package."""

    def __init__(self, pkg_file: str | IO[bytes]):
        self._pkg_file = pkg_file

    def __contains__(self, pack_uri: object) -> bool:
        """Return True when part identified by `pack_uri` is present in zip archive."""
        return pack_uri in self._blobs

    def __getitem__(self, pack_uri: PackURI) -> bytes:
        """Return bytes for part corresponding to `pack_uri`.

        Raises |KeyError| if no matching member is present in zip archive.
        """
        if pack_uri not in self._blobs:
            raise KeyError("no member '%s' in package" % pack_uri)
        return self._blobs[pack_uri]

    @lazyproperty
    def _blobs(self) -> dict[PackURI, bytes]:
        """dict mapping partname to package part binaries.

        Every member of the archive is read while the zip is open; each key is the
        member name prefixed with "/" and wrapped in |PackURI|.
        """
        with zipfile.ZipFile(self._pkg_file, "r") as z:
            return {PackURI("/%s" % name): z.read(name) for name in z.namelist()}


class _PhysPkgWriter:
    """Base class for physical package writer objects."""

    @classmethod
    def factory(cls, pkg_file: str | IO[bytes]) -> _ZipPkgWriter:
        """Return |_PhysPkgWriter| subtype instance appropriate for `pkg_file`.

        Currently the only subtype is `_ZipPkgWriter`, but a `_DirPkgWriter` could be
        implemented or even a `_StreamPkgWriter`.
        """
        return _ZipPkgWriter(pkg_file)

    def write(self, pack_uri: PackURI, blob: bytes) -> None:
        """Write `blob` to package with membername corresponding to `pack_uri`."""
        raise NotImplementedError(  # pragma: no cover
            f"`{type(self).__name__}` must implement `.write()`"
        )


class _ZipPkgWriter(_PhysPkgWriter):
    """Implements |_PhysPkgWriter| interface for a zip-file (.pptx file) OPC package."""

    def __init__(self, pkg_file: str | IO[bytes]):
        self._pkg_file = pkg_file

    def __enter__(self) -> _ZipPkgWriter:
        """Enable use as a context-manager.

        Nothing is opened here; the `ZipFile` is created by the `_zipf` property.
        """
        return self

    def __exit__(self, *exc: Any) -> None:
        """Close the zip archive on exit from context.

        Closing flushes any pending physical writes and releases any resources it's
        using. Returns None, so an exception raised inside the `with` block is not
        suppressed.
        """
        self._zipf.close()

    def write(self, pack_uri: PackURI, blob: bytes) -> None:
        """Write `blob` to zip package with membername corresponding to `pack_uri`."""
        self._zipf.writestr(pack_uri.membername, blob)

    @lazyproperty
    def _zipf(self) -> zipfile.ZipFile:
        """`ZipFile` instance open for writing."""
        return zipfile.ZipFile(
            self._pkg_file, "w", compression=zipfile.ZIP_DEFLATED, strict_timestamps=False
        )


class _ContentTypesItem:
    """Composes content-types "part" ([Content_Types].xml) for a collection of parts."""

    def __init__(self, parts: Sequence[Part]):
        self._parts = parts

    @classmethod
    def xml_for(cls, parts: Sequence[Part]) -> CT_Types:
        """Return content-types XML mapping each part in `parts` to a content-type.

        The resulting XML is suitable for storage as `[Content_Types].xml` in an OPC
        package.
        """
        return cls(parts)._xml

    @lazyproperty
    def _xml(self) -> CT_Types:
        """lxml.etree._Element containing the content-types item.

        This XML object is suitable for serialization to the `[Content_Types].xml`
        item for an OPC package. Although the sequence of elements is not strictly
        significant, as an aid to testing and readability Default elements are sorted
        by extension and Override elements are sorted by partname.
        """
        defaults, overrides = self._defaults_and_overrides
        _types_elm = CT_Types.new()

        for ext, content_type in sorted(defaults.items()):
            _types_elm.add_default(ext, content_type)
        for partname, content_type in sorted(overrides.items()):
            _types_elm.add_override(partname, content_type)

        return _types_elm

    @lazyproperty
    def _defaults_and_overrides(self) -> tuple[dict[str, str], dict[PackURI, str]]:
        """pair of dict (defaults, overrides) accounting for all parts.

        `defaults` is {ext: content_type} and overrides is {partname: content_type}.
        """
        # --- `rels` and `xml` defaults are always present, whatever the parts ---
        defaults = CaseInsensitiveDict(rels=CT.OPC_RELATIONSHIPS, xml=CT.XML)
        overrides: dict[PackURI, str] = {}

        for part in self._parts:
            partname, content_type = part.partname, part.content_type
            ext = partname.ext
            # --- a part whose (extension, content-type) pair is a known default is
            # --- covered by a Default entry; any other part gets its own Override
            if (ext.lower(), content_type) in default_content_types:
                defaults[ext] = content_type
            else:
                overrides[partname] = content_type

        return defaults, overrides
Memory