"""A "folder" in Microsoft Compound File Binary (CFB) format. The CFB/OLE file format encloses a filesystem, to a first appoximation, much like a Zip archive does. In this format, a "storage" corresponds to a directory and a "stream" corresponds to a file. A storage can contain both streams and other storages. Each MSG file has a "root" storage, represented in this package by `MessageStorage`. Each attachment and recipient has their own storage in the root, with a pre-defined name, and there are other top-level objects than can appear in a MSG file that get their own storage. """ from __future__ import annotations import dataclasses as dc from typing import Iterator, Mapping from olefile import OleFileIO from olefile.olefile import STGTY_STORAGE, STGTY_STREAM, OleDirectoryEntry from oxmsg.util import lazyproperty @dc.dataclass class Storage: """Container for streams and sub-storages.""" path: str streams: tuple[Stream, ...] storages: tuple[Storage, ...] def __repr__(self) -> str: return ( f"Storage(path={repr(self.path)}, {len(self.streams)} streams," f" {len(self.storages)} storages)" ) @classmethod def from_ole( cls, ole: OleFileIO, node: OleDirectoryEntry | None = None, prefix: str = "" ) -> Storage: """Return a Storage loaded from `node` and containing its streams and sub-storages.""" # -- initial call is `.from_ole(ole)`; other args are only specified on recursion -- node = node if node else ole.root def _iter_streams(ole: OleFileIO, node: OleDirectoryEntry, prefix: str) -> Iterator[Stream]: """Generate `Stream` object for each stream in `nodes`.""" for stream_node in (k for k in node.kids if k.entry_type == STGTY_STREAM): path = f"{prefix}/{stream_node.name}" if prefix else stream_node.name with ole.openstream(path) as f: bytes_ = f.read() yield Stream(path, bytes_) streams = tuple(_iter_streams(ole, node, prefix)) sub_storages = tuple( cls.from_ole(ole, k, f"{prefix}/{k.name}" if prefix else k.name) for k in node.kids if k.entry_type == STGTY_STORAGE ) return cls(path=prefix, streams=streams, storages=sub_storages) def iter_attachment_storages(self) -> Iterator[Storage]: """Generate storage object specific to each attachment in this message.""" for s in self.storages: if s.name.startswith("__attach_version1.0_#"): yield s def iter_recipient_storages(self) -> Iterator[Storage]: """Generate storage object specific to each recipent in this message.""" for s in self.storages: if s.name.startswith("__recip_version1.0_#"): yield s @lazyproperty def name(self) -> str: """The "directory-name" of this storage, with no path-prefix.""" return self.path.split("/")[-1] @lazyproperty def properties_stream_bytes(self) -> bytes: """The bytes of the one-and-only-one properties stream in this storage.""" # -- every storage mush have a properties stream -- return self._streams_by_name["__properties_version1.0"].bytes_ def property_stream_bytes(self, pid: int, ptyp: int) -> bytes: """Read variable-length property bytes from the stream it's stored in.""" # -- This method should not be called unless there is an entry for this property in the # -- properties stream. If the property exists but its stream does not, that's an # -- exception, not an expected occurence. return self._streams_by_name[f"__substg1.0_{pid:04X}{ptyp:04X}"].bytes_ @lazyproperty def _streams_by_name(self) -> Mapping[str, Stream]: """dict semantics on streams of this storage.""" return {s.name: s for s in self.streams} @dc.dataclass class Stream: """Bytes of a property of a top-level object in an OXMSG file.""" path: str bytes_: bytes def __repr__(self) -> str: return f"Stream(path={repr(self.path)}, {len(self.bytes_):,} bytes)" @lazyproperty def name(self) -> str: """The "filename" of this stream, with no path-prefix.""" return self.path.split("/")[-1]
Memory