"""Provides access to all properties of a top-level object (message, recipient, attachment).

This object is accessed using the `.properties` property of a top-level object. The properties
for each top-level object are segregated into its own properties object.

Many property-ids (PIDs) and property-types (PTYPs) are available as constants in
`oxmsg.domain.constants`.

```python
>>> from oxmsg import Message
>>> from oxmsg.domain import constants as c
>>> msg = Message.load("message.msg")
>>> properties = msg.properties
>>> properties.str_prop_value(c.PID_MESSAGE_CLASS)
'IPM.Note'
```
"""

from __future__ import annotations

import datetime as dt
import itertools
import struct
import types
import uuid
from collections import defaultdict
from typing import Final, Iterator, cast

from oxmsg.domain import constants as c
from oxmsg.domain import encodings
from oxmsg.domain import model as m
from oxmsg.domain import reference as ref
from oxmsg.util import lazyproperty


class Properties:
    """Provides access to properties from an OXMSG storage."""

    def __init__(self, storage: m.PropStorageT, properties_header_offset: int):
        self._storage = storage
        # -- Offset within the properties stream at which 16-byte property segments start. This
        # -- varies between storage types, e.g. root properties and attachment properties, etc.
        self._properties_header_offset = properties_header_offset

    def __iter__(self) -> Iterator[m.PropertyT]:
        """Generate each property in this collection, in property-id (PID) order."""
        return iter(self._property_sequence)

    def binary_prop_value(self, pid: int) -> bytes | None:
        """Retrieve bytes of PtypBinary property identified by `pid`.

        Returns `None` if property is not present in this collection.
        """
        prop = self._get_property(pid, c.PTYP_BINARY)
        if prop is None:
            return None
        return cast(BinaryProperty, prop).value

    @lazyproperty
    def body_encoding(self) -> str:
        """The encoding used for a PidTagBody or PidTagHtml property of PtypString8/Binary.

        Must be cherry-picked because it is required before constructing the properties
        collection. Note when these are PtypString they unconditionally use UTF-16LE.
        """
        # -- Case 1: Use `PID_INTERNET_CODEPAGE` (0x3FDE) when present --
        internet_codepage = self._cherry_pick_int_prop(c.PID_INTERNET_CODEPAGE)
        if internet_codepage is not None:
            return encodings.encoding_from_codepage(internet_codepage.value)

        # -- the fallbacks are the same encoding sources as string properties --
        return self._str_prop_encoding

    def date_prop_value(self, pid: int) -> dt.datetime | None:
        """Retrieve datetime value of PtypTime property identified by `pid`.

        - Microseconds are truncated.
        - Returns `None` when no PtypTime property with `pid` is present in this collection.
        """
        # -- Select on property-type as well as PID, like the binary and string accessors do,
        # -- so a same-PID property of another type is never returned as though it were a
        # -- datetime.
        prop = self._get_property(pid, c.PTYP_TIME)
        if prop is None:
            return None
        return cast(TimeProperty, prop).value

    def int_prop_value(self, pid: int) -> int | None:
        """Retrieve int value of PtypInteger32 property identified by `pid`.

        Returns `None` if no PtypInteger32 property with `pid` is present in this collection.
        """
        # -- Select on property-type as well as PID so the `cast` below is always truthful --
        prop = self._get_property(pid, c.PTYP_INTEGER_32)
        if prop is None:
            return None
        return cast(Int32Property, prop).value

    def str_prop_value(self, pid: int) -> str | None:
        """Retrieve str value of PtypString or PtypString8 property identified by `pid`.

        Returns `None` if property is not present in this collection.
        """
        prop = self._get_property(pid, (c.PTYP_STRING, c.PTYP_STRING8))
        if prop is None:
            return None
        return cast(StringProperty, prop).value

    @lazyproperty
    def string_props_are_unicode(self) -> bool:  # pragma: no cover
        """True indicates PtypString properties in this message are encoded "utf-16-le"."""
        store_support_mask = self.int_prop_value(c.PID_STORE_SUPPORT_MASK)

        if store_support_mask is None:
            return False

        return bool(store_support_mask & m.STORE_UNICODE_OK)

    def _cherry_pick_int_prop(self, pid: int) -> Int32Property | None:
        """Get an Int32 property without triggering broader property load.

        Used to solve chicken-and-egg problem of determining encoding required by string
        properties before atomically loading all properties.

        The first segment having `pid` is wrapped as an `Int32Property`; its property-type is
        not checked. Returns `None` when no segment has `pid`.
        """
        for segment in self._prop_segment_sequence:
            if BaseProperty.PID.unpack_from(segment)[0] == pid:
                return Int32Property(segment)
        return None

    def _get_property(self, pid: int, ptyps: int | tuple[int, ...]) -> m.PropertyT | None:
        """Retrieve the first property with `pid` and one of `ptyps`.

        The general expectation is that at most one property with `pid` and one of `ptyps` will
        be present in the collection. In the unusual case there could be more than one this
        method may need to be called once for each possible type to get them all or in a
        particular order of preference.

        Returns `None` when no such property is present.
        """
        acceptable_ptyps = (ptyps,) if isinstance(ptyps, int) else ptyps
        # -- `.get()` rather than indexing so a miss does not add an empty list to the
        # -- defaultdict for every PID that is ever asked about.
        for p in self._properties_by_pid.get(pid, ()):
            if p.ptyp in acceptable_ptyps:
                return p
        return None

    @lazyproperty
    def _str_prop_encoding(self) -> str:
        """The encoding used for non-body properties of PtypString8.

        Must be cherry-picked because it is required before constructing the properties
        collection. Note that PtypString properties are unconditionally encoded with UTF-16LE.
        """
        # -- Case 1: `PID_TAG_MESSAGE_CODEPAGE` (0x3FFD) is present and specifies the int
        # -- code-page used to encode the non-Unicode string properties on the Message object.
        message_codepage = self._cherry_pick_int_prop(c.PID_MESSAGE_CODEPAGE)
        if message_codepage is not None:
            return encodings.encoding_from_codepage(message_codepage.value)

        # -- Case 2: not specified one way or another, default to "iso-8859-15" (Latin 9) --
        return "iso-8859-15"

    @lazyproperty
    def _prop_segment_sequence(self) -> tuple[bytes, ...]:
        """16-byte segments comprising the property blocks of this storage's properties stream.

        Segments start at the properties-header offset provided on construction.
        """
        return tuple(
            segment
            for segment in _batched_bytes(
                self._storage.properties_stream_bytes[self._properties_header_offset :], 16
            )
            # -- drop any trailing short segment, happens sometimes --
            if len(segment) == 16
        )

    @lazyproperty
    def _properties_by_pid(self) -> defaultdict[int, list[m.PropertyT]]:
        """Properties in this collection grouped by property-id (PID).

        Not sure if this solves an actual problem in practice, but it's at least theoretically
        possible that the same PID could appear twice in a property collection with different
        PTYPs.
        """
        properties_by_pid: defaultdict[int, list[m.PropertyT]] = defaultdict(list)
        for p in self._property_sequence:
            properties_by_pid[p.pid].append(p)
        return properties_by_pid

    @lazyproperty
    def _properties_mapping(self) -> types.MappingProxyType[int, m.PropertyT]:
        """The property objects in this collection keyed by pid.

        When a PID occurs more than once, the last such property in the sequence is the one
        retained.
        """
        return types.MappingProxyType({p.pid: p for p in self._property_sequence})

    @lazyproperty
    def _property_sequence(self) -> tuple[m.PropertyT, ...]:
        """Property object for each property in this collection.

        Properties are in property-id (PID) order.
        """
        segments = sorted(
            self._prop_segment_sequence, key=lambda x: BaseProperty.PID.unpack_from(x)[0]
        )
        return tuple(
            BaseProperty.factory(
                segment=segment,
                storage=self._storage,
                str_prop_encoding=self._str_prop_encoding,
                body_encoding=self.body_encoding,
            )
            for segment in segments
        )


class BaseProperty:
    """Base class for properties, providing common behaviors."""

    # -- PID is the little-endian uint16 at bytes 2-3 of the segment, PTYP the one at bytes 0-1 --
    PID: Final[struct.Struct] = struct.Struct("<2xH")
    PTYP: Final[struct.Struct] = struct.Struct("<H")

    def __init__(self, segment: bytes):
        self._segment = segment

    @classmethod
    def factory(
        cls, segment: bytes, storage: m.PropStorageT, str_prop_encoding: str, body_encoding: str
    ) -> m.PropertyT:
        """Construct a property object of the appropriate sub-type for `segment`.

        A segment with a property-type not handled here produces an `Int32Property`.
        """
        ptyp = cls.PTYP.unpack(segment[:2])[0]

        if ptyp == c.PTYP_BINARY:
            return BinaryProperty(segment, storage)

        if ptyp == c.PTYP_BOOLEAN:
            return BooleanProperty(segment)

        if ptyp == c.PTYP_FLOATING_64:
            return Float64Property(segment)

        if ptyp == c.PTYP_GUID:
            return GuidProperty(segment, storage)

        if ptyp == c.PTYP_INTEGER_16:
            return Int16Property(segment)

        if ptyp == c.PTYP_INTEGER_32:
            return Int32Property(segment)

        if ptyp == c.PTYP_STRING:
            return StringProperty(segment, storage)

        if ptyp == c.PTYP_STRING8:
            return String8Property(
                segment=segment,
                storage=storage,
                str_prop_encoding=str_prop_encoding,
                body_encoding=body_encoding,
            )

        if ptyp == c.PTYP_TIME:
            return TimeProperty(segment)

        # -- default to Int32 --
        return Int32Property(segment)

    @property
    def name(self) -> str:
        """The Microsoft name for this property, like "PidTagMessageClass"."""
        prop_desc = ref.property_descriptors.get(self.pid)
        return prop_desc.ms_name if prop_desc is not None else "not recorded in model"

    @property
    def pid(self) -> int:
        """The property-id (PID) for this property, like 0x3701 for attachment bytes."""
        return self.PID.unpack(self._segment[:4])[0]

    @property
    def ptyp(self) -> int:
        """The property-type (PTYP) for this property, like 0x0102 for PtypBinary."""
        return self.PTYP.unpack(self._segment[:2])[0]

    @property
    def ptyp_name(self) -> str:
        """The Microsoft name for the type of this property, like "PtypString"."""
        prop_type_desc = ref.property_type_descriptors.get(self.ptyp)
        return (
            prop_type_desc.ms_name if prop_type_desc else f"{self.ptyp:04X} not recorded in model"
        )

    @lazyproperty
    def _payload(self) -> bytes:
        """The latter 8 bytes of the property segment, where the property value is stored."""
        return self._segment[8:]


class BinaryProperty(BaseProperty):
    """Property for PtypBinary OLE properties."""

    def __init__(self, segment: bytes, storage: m.PropStorageT):
        super().__init__(segment)
        self._storage = storage

    @lazyproperty
    def value(self) -> bytes:
        """The bytes of this binary property, read from its own stream in the storage."""
        return self._storage.property_stream_bytes(self.pid, self.ptyp)


class BooleanProperty(BaseProperty):
    """Property for PtypBoolean OLE properties."""

    SIGNED_CHAR: Final[struct.Struct] = struct.Struct("<b")

    @lazyproperty
    def value(self) -> bool:
        """The boolean value of this property; True when the first payload byte is non-zero."""
        return self.SIGNED_CHAR.unpack(self._payload[:1])[0] != 0


class Float64Property(BaseProperty):
    """Property for PtypFloating64 OLE properties."""

    FLOAT64: Final[struct.Struct] = struct.Struct("<d")

    @lazyproperty
    def value(self) -> float:
        """The 64-bit floating-point value of this property."""
        return self.FLOAT64.unpack(self._payload)[0]


class GuidProperty(BaseProperty):
    """Property for PtypGuid OLE properties."""

    GUID: Final[struct.Struct] = struct.Struct("<IHH8s")

    def __init__(self, segment: bytes, storage: m.PropStorageT):
        super().__init__(segment)
        self._storage = storage

    def __str__(self) -> str:
        """Hex str representation of this UUID like '9d947746-9662-40a8-a526-abd4faec9737'."""
        return str(self.value)

    @lazyproperty
    def value(self) -> uuid.UUID:
        """The value of this property as a uuid.UUID object.

        The `str` value of this object is the standard-form string for the UUID, like:
        '9d947746-9662-40a8-a526-abd4faec9737'.
        """
        # -- In the OXMSG format, a GUID (aka. UUID) is stored as four distinct fields, each in
        # -- little-endian form. Luckily Python's uuid built-in can parse this directly.
        return uuid.UUID(bytes_le=self._storage.property_stream_bytes(self.pid, self.ptyp)[:16])


class Int16Property(BaseProperty):
    """Property for PtypInteger16 OLE properties."""

    INT16: Final[struct.Struct] = struct.Struct("<H")

    @lazyproperty
    def value(self) -> int:
        """The integer value of this property, read as an unsigned little-endian 16-bit int."""
        return self.INT16.unpack(self._payload[:2])[0]


class Int32Property(BaseProperty):
    """Property for PtypInteger32 OLE properties."""

    INT32: Final[struct.Struct] = struct.Struct("<I")

    @lazyproperty
    def value(self) -> int:
        """The integer value of this property, read as an unsigned little-endian 32-bit int."""
        return self.INT32.unpack(self._payload[:4])[0]


class StringProperty(BaseProperty):
    """Property for PtypString OLE properties."""

    def __init__(self, segment: bytes, storage: m.PropStorageT):
        super().__init__(segment)
        self._storage = storage

    @lazyproperty
    def value(self) -> str:
        """The decoded str from this string property."""
        return self._storage.property_stream_bytes(self.pid, self.ptyp).decode("utf-16-le")


class String8Property(BaseProperty):
    """Property for PtypString8 (8-bit characters, not Unicode) OLE properties."""

    def __init__(
        self, segment: bytes, storage: m.PropStorageT, str_prop_encoding: str, body_encoding: str
    ):
        super().__init__(segment)
        self._storage = storage
        self._str_prop_encoding = str_prop_encoding
        self._body_encoding = body_encoding

    @lazyproperty
    def value(self) -> str:
        """The decoded str from this string property.

        The body encoding is used when this is the `PID_BODY` property, otherwise the general
        string-property encoding is used.
        """
        return self._storage.property_stream_bytes(self.pid, self.ptyp).decode(
            self._body_encoding if self.pid == c.PID_BODY else self._str_prop_encoding
        )


class TimeProperty(BaseProperty):
    """Property for PtypTime OLE properties."""

    TIME: Final[struct.Struct] = struct.Struct("<Q")

    @lazyproperty
    def value(self) -> dt.datetime:
        """The value of this property as a timezone-aware `datetime`.

        The payload is a count of 100-nanosecond intervals since 1601-01-01 UTC. The result is
        truncated to whole seconds.
        """
        hundred_nanosecond_intervals_since_epoch = self.TIME.unpack(self._payload)[0]
        epoch = dt.datetime(1601, 1, 1, tzinfo=dt.timezone.utc)
        # -- Integer floor-division keeps this exact. Dividing by the float `1e7` first coerces
        # -- the count to a float, and present-day values exceed 2**53, so rounding could carry
        # -- a value just short of a second boundary over it (result off by one second).
        seconds_since_epoch = hundred_nanosecond_intervals_since_epoch // 10_000_000
        return epoch + dt.timedelta(seconds=seconds_since_epoch)


def _batched_bytes(block: bytes, n: int) -> Iterator[bytes]:
    """Batch bytes from `block` into segments of `n` bytes each.

    Last batch is shorter than `n` when `block` is not evenly divisible by `n`. Being a
    generator, the `ValueError` for `n < 1` is raised on first iteration, not on the call.
    """
    if n < 1:  # pragma: no cover
        raise ValueError("n must be at least one")
    iter_bytes = iter(block)
    # -- an exhausted iterator produces `b""`, which is falsy and ends the loop --
    while batch := bytes(itertools.islice(iter_bytes, n)):
        yield batch
Memory