from __future__ import annotations
__all__ = [
'AddressBookEntryID',
'ContactAddressEntryID',
'EntryID',
'FolderEntryID',
'MessageEntryID',
'NNTPNewsgroupFolderEntryID',
'OneOffRecipient',
'PermanentEntryID',
'PersonalDistributionListEntryID',
'StoreObjectEntryID',
'WrappedEntryID',
]
import abc
import logging
from typing import Optional, Union
from ._helpers import BytesReader
from .. import constants
from ..enums import (
AddressBookType, ContactAddressIndex, DisplayType, EntryIDType,
MacintoshEncoding, MessageFormat, MessageType, OORBodyFormat,
WrappedType
)
from ..exceptions import FeatureNotImplemented
from ..utils import bitwiseAdjustedAnd, bytesToGuid
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())
# First we define the main EntryID structure that is the base for the others.
class EntryID(abc.ABC):
"""
Base class for all EntryID structures. Use :classmethod autoCreate: to
automatically create the correct EntryID structure type from the specified
data.
"""
@classmethod
def autoCreate(cls, data: Optional[bytes]) -> Optional[EntryID]:
"""
Automatically determines the type of EntryID and returns an instance of
the correct subclass. If the subclass cannot be determined, will return
a plain EntryID instance.
"""
if not data:
return None
if len(data) < 20:
raise ValueError('Cannot create an EntryID with less than 20 bytes.')
providerUID = data[4:20]
try:
providerUID = EntryIDType(providerUID)
except ValueError:
raise ValueError(f'Unrecognized UID "{"".join(f"{x:02X}" for x in providerUID)}". You should probably report this to the developers.') from None
# Now check the Provider UID against the known ones.
if providerUID == EntryIDType.ADDRESS_BOOK_RECIPIENT:
return AddressBookEntryID(data)
if providerUID == EntryIDType.CA_OR_PDL_RECIPIENT:
# Verify that the type signature is correct.
if data[24:28] not in (b'\x04\x00\x00\x00', b'\x05\x00\x00\x00'):
raise ValueError(f'Found Entry ID matching ContactAddress or PersonalDistributionList but the type was invalid ({data[24:28]}).')
if data[24] == 4:
return ContactAddressEntryID(data)
else:
return PersonalDistributionListEntryID(data)
if providerUID == EntryIDType.NNTP_NEWSGROUP_FOLDER:
# This is, of course, another one with a shared unique identifier.
# Technically it's the UID of the provider, but why do multiple
# structures use the same provider?
if data[20] == 0xC:
return NNTPNewsgroupFolderEntryID(data)
else:
return StoreObjectEntryID(data)
if providerUID == EntryIDType.ONE_OFF_RECIPIENT:
return OneOffRecipient(data)
if providerUID == EntryIDType.PUBLIC_MESSAGE_STORE:
if len(data) == 46:
return FolderEntryID(data)
else:
return MessageEntryID(data)
if providerUID == EntryIDType.WRAPPED:
return WrappedEntryID(data)
raise FeatureNotImplemented(f'UID for EntryID found in database, but no class was specified for it: {providerUID}')
def __init__(self, data: bytes):
self.__flags = data[:4]
self.__providerUID = data[4:20]
self.__rawData = data
def __bytes__(self) -> bytes:
return self.toBytes()
def toBytes(self) -> bytes:
return self.__rawData
@property
def flags(self) -> bytes:
"""
The flags for this Entry ID.
"""
return self.__flags
@property
def entryIDType(self) -> Union[EntryIDType, bytes]:
"""
Returns an instance of EntryIDType corresponding to the provider UID of
this EntryID. If none is found, returns the bytes.
"""
try:
return EntryIDType(self.__providerUID)
except ValueError:
return self.__providerUID
@property
def longTerm(self) -> bool:
"""
Whether the EntryID is long term or not.
"""
return self.__flags == b'\x00\x00\x00\x00'
@property
@abc.abstractmethod
def position(self) -> int:
"""
Used to tell the amount of bytes read in this EntryID. Useful for
EntryID data that has been chained together with no separator.
"""
@property
def providerUID(self) -> bytes:
"""
The 16 byte UID that identifies the type of Entry ID.
"""
return self.__providerUID
# Now for the specific types.
class AddressBookEntryID(EntryID):
"""
An Address Book EntryID structure, as specified in [MS-OXCDATA].
"""
def __init__(self, data: bytes):
super().__init__(data)
reader = BytesReader(data[20:])
# Version *MUST* be 1.
self.__version = reader.readUnsignedInt()
if self.__version != 1:
raise ValueError(f'Version must be 1 on address book entry IDs (got {self.__version}).')
self.__type = AddressBookType(reader.readUnsignedInt())
self.__X500DN = reader.readByteString()
self.__position = reader.tell() + 20
@property
def position(self) -> int:
return self.__position
@property
def type(self) -> AddressBookType:
"""
The type of the object.
"""
return self.__type
@property
def version(self) -> int:
"""
The version. MUST be 1.
"""
return self.__version
@property
def X5000DN(self) -> bytes:
"""
The X500 DN of the Address Book object.
"""
return self.__X500DN
class ContactAddressEntryID(EntryID):
"""
A Contact Address EntryID structure, as defined in [MS-OXCDATA]. Specifies a
set of data representing recipients whose information is stored in a Contact
object.
"""
def __init__(self, data: bytes):
super().__init__(data)
reader = BytesReader(data[20:])
if (version := reader.readUnsignedInt()) != 3:
raise ValueError(f'Version must be 3 (got {version}).')
if (type_ := reader.readUnsignedInt()) != 4:
raise ValueError(f'Type must be 4 (got {type_}).')
self.__index = ContactAddressIndex(reader.readUnsignedInt())
self.__entryIdCount = reader.readUnsignedInt()
self.__entryID = MessageEntryID(reader.read(self.__entryIdCount))
self.__position = reader.tell() + 20
@property
def entryID(self) -> MessageEntryID:
"""
The EntryID contained in this object.
"""
return self.__entryID
@property
def entryIDCount(self) -> int:
"""
The size, in bytes, of the EntryID contained in this object.
"""
return self.__entryIdCount
@property
def index(self) -> ContactAddressIndex:
"""
The electronic address in the contact information to use.
"""
return self.__index
@property
def position(self) -> int:
return self.__position
class FolderEntryID(EntryID):
"""
A Folder EntryID structure, as defined in [MS-OXCDATA].
"""
__SIZE__: int = 46
def __init__(self, data: bytes):
super().__init__(data)
reader = BytesReader(data[20:])
self.__folderType = MessageType(reader.readUnsignedShort())
self.__databaseGuid = bytesToGuid(reader.read(16))
# This entry is 6 bytes, so we pull some shenanigans to unpack it.
self.__globalCounter = constants.st.ST_LE_UI64.unpack(reader.read(6) + b'\x00\x00')[0]
reader.assertNull(2, 'Pad bytes were not 0.')
@property
def databaseGuid(self) -> str:
"""
A GUID associated with the Store Object and corresponding to the
ReplicaID field of the FID structure.
"""
return self.__databaseGuid
@property
def folderType(self) -> MessageType:
"""
The type of folder.
"""
return self.__folderType
@property
def globalCounter(self) -> int:
"""
An unsigned integer identifying the folder.
"""
return self.__globalCounter
@property
def position(self) -> int:
return self.__SIZE__
class MessageEntryID(EntryID):
"""
A Message EntryID structure, as defined in [MS-OXCDATA].
"""
__SIZE__: int = 70
def __init__(self, data: bytes):
super().__init__(data)
reader = BytesReader(data[20:])
self.__messageType = MessageType(reader.readUnsignedShort())
self.__folderDatabaseGuid = bytesToGuid(reader.read(16))
# This entry is 6 bytes, so we pull some shenanigans to unpack it.
self.__folderGlobalCounter = constants.st.ST_LE_UI64.unpack(reader.read(6) + b'\x00\x00')[0]
reader.assertNull(2, 'Pad bytes were not 0.')
self.__messageDatabaseGuid = bytesToGuid(reader.read(16))
# This entry is 6 bytes, so we pull some shenanigans to unpack it.
self.__messageGlobalCounter = constants.st.ST_LE_UI64.unpack(reader.read(6) + b'\x00\x00')[0]
reader.assertNull(2, 'Pad bytes were not 0.')
# Not sure why Microsoft decided to say "yes, let's do 2 6-byte integers
# followed by 2 pad bytes each" instead of just 2 8-byte integers with a
# maximum value, but here we are.
@property
def folderDatabaseGuid(self) -> str:
"""
A GUID associated with the Store object of the folder in which the
message resides and corresponding to the ReplicaId field in the folder
ID structure.
"""
return self.__folderDatabaseGuid
@property
def folderGlobalCounter(self) -> int:
"""
An unsigned integer identifying the folder in which the message resides.
"""
return self.__folderGlobalCounter
@property
def messageDatabaseGuid(self) -> str:
"""
A GUID associated with the Store object of the message and corresponding
to the ReplicaId field of the Message ID structure.
"""
return self.__messageDatabaseGuid
@property
def messageGlobalCounter(self) -> int:
"""
An unsigned integer identifying the message.
"""
return self.__messageGlobalCounter
@property
def messageType(self) -> MessageType:
"""
The Store object type.
"""
return self.__messageType
@property
def position(self) -> int:
return self.__SIZE__
class NNTPNewsgroupFolderEntryID(EntryID):
"""
A NNTP Newsgroup Folder EntryID structure, as defined in [MS-OXCDATA].
"""
def __init__(self, data: bytes):
super().__init__(data)
reader = BytesReader(data[20:])
self.__folderType = reader.readUnsignedShort()
if self.__folderType != 0x000C:
raise ValueError(f'Folder type was not 0x000C (got {self.__folderType})')
self.__newsgroupName = reader.readByteString()
self.__position = reader.tell() + 20
@property
def folderType(self) -> int:
"""
The type of folder. MUST be ``0x000C``.
"""
return self.__folderType
@property
def newsgroupName(self) -> bytes:
"""
The name of the newsgroup, as an ANSI string.
"""
return self.__newsgroupName
@property
def position(self) -> int:
return self.__position
class OneOffRecipient(EntryID):
"""
A One-Off EntryID structure, as specified in [MS-OXCDATA].
"""
def __init__(self, data: bytes):
super().__init__(data)
# Create a reader to easily
reader = BytesReader(data[20:])
self.__version = reader.readUnsignedShort()
# It's not really flags, but I can't come up with a descriptive name for
# this collection of data, so `flagsThing` it is.
flagsThing = reader.readUnsignedShort()
# Just a little forewarning, I am *well* aware that these masks for each
# flag do not match the specification as you might expect. That is
# because, unlike with other parts of the documentation, these bytes
# are, for some reason, *not read together*. This means they are not
# meant to be swapped for little endian, and as such I had to flip the
# masks to compensate. Again, this is *despite other portions of the
# documentation using an identical format and being read in little
# endian. Took me way too long to figure out why this was not working
# despite following the documentation to the letter. If I had to guess,
# the reason this one is not flipped and the others are is because this
# is not grouped together.
self.__macintoshEncoding = MacintoshEncoding(bitwiseAdjustedAnd(flagsThing, 0xC))
self.__format = OORBodyFormat(bitwiseAdjustedAnd(flagsThing, 0x1E))
# Flag to indicate how messages are to be sent. 0 means TNEF, 1 means
# MIME.
self.__messageFormat = MessageFormat(bitwiseAdjustedAnd(flagsThing, 0x1))
# Whether the strings are UTF-16 or not.
self.__stringFormatUnicode = bitwiseAdjustedAnd(flagsThing, 0x8000) == 1
self.__canLookup = bitwiseAdjustedAnd(flagsThing, 0x1000) == 0
if self.__stringFormatUnicode:
self.__displayName = reader.readUtf16String()
self.__addressType = reader.readUtf16String()
self.__emailAddress = reader.readUtf16String()
else:
# Don't actually know how to properly handle this kind of encoding,
# since the documentation doesn't define exactly what encoding to
# use for this of even how to find out, so for now we just don't
# decode it at all and just leave it as bytes.
self.__displayName = reader.readByteString()
self.__addressType = reader.readByteString()
self.__emailAddress = reader.readByteString()
self.__position = reader.tell() + 20
@property
def addressType(self) -> Union[str, bytes]:
"""
The address type for this Recipient.
"""
return self.__addressType
@property
def areStringUnicode(self) -> bool:
"""
Whether or not the strings are in UTF-16 format.
"""
return self.__stringFormatUnicode
@property
def canLookup(self) -> bool:
"""
Whether the server can lookup the user's email in the address book.
"""
return self.__canLookup
@property
def displayName(self) -> Union[str, bytes]:
"""
The display name for this Recipient.
"""
return self.__displayName
@property
def emailAddress(self) -> Union[str, bytes]:
"""
The email address for this Recipient.
"""
return self.__emailAddress
@property
def format(self) -> OORBodyFormat:
"""
The message body format desired for this recipient.
"""
return self.__format
@property
def macintoshEncoding(self) -> MacintoshEncoding:
"""
The encoding used for Macintosh-specific data attachments.
"""
return self.__macintoshEncoding
@property
def messageFormat(self) -> MessageFormat:
"""
The message format to use for messages sent to this recipient.
"""
return self.__messageFormat
@property
def position(self) -> int:
return self.__position
class PermanentEntryID(EntryID):
"""
A Permanent EntryID structure, as defined in [MS-OXNSPI].
"""
def __init__(self, data: bytes):
super().__init__(data)
reader = BytesReader(data)
unpacked = reader.readStruct(constants.st.ST_PEID)
if unpacked[0] != 0:
raise TypeError(f'Not a PermanentEntryID (expected 0, got {unpacked[0]}).')
self.__displayTypeString = DisplayType(unpacked[2])
self.__distinguishedName = reader.readAsciiString()
self.__position = reader.tell()
@property
def displayTypeString(self) -> DisplayType:
"""
Returns the display type string value.
"""
return self.__displayTypeString
@property
def distinguishedName(self) -> str:
"""
Returns the distinguished name.
"""
return self.__distinguishedName
@property
def position(self) -> int:
return self.__position
class PersonalDistributionListEntryID(EntryID):
"""
A Personal Distribution List EntryID structure, as defined in [MS-OXCDATA].
"""
def __init__(self, data: bytes):
super().__init__(data)
reader = BytesReader(data[20:])
if (arg := reader.readUnsignedInt()) != 3:
raise ValueError(f'Version must be 3 (got {arg}).')
if (arg := reader.readUnsignedInt()) != 5:
raise ValueError(f'Type must be 5 (got {arg}).')
if (arg := reader.readUnsignedInt()) != 0xFF:
raise ValueError(f'Index must be 255 (got {arg}).')
self.__entryIdCount = reader.readUnsignedInt()
self.__entryID = MessageEntryID(reader.read(self.__entryIdCount))
self.__position = reader.tell() + 20
@property
def entryID(self) -> MessageEntryID:
"""
The EntryID contained in this object.
"""
return self.__entryID
@property
def entryIDCount(self) -> int:
"""
The size, in bytes, of the EntryID contained in this object.
"""
return self.__entryIdCount
@property
def position(self) -> int:
return self.__position
class StoreObjectEntryID(EntryID):
"""
A Store Object EntryID structure, as defined in [MS-OXCDATA].
"""
def __init__(self, data: bytes):
super().__init__(data)
reader = BytesReader(data[20:])
self.__version = reader.readUnsignedByte()
if self.__version != 0:
raise ValueError(f'Version was not set to 0 (got {self.__version}).')
self.__flag = reader.readUnsignedByte()
if self.__flag != 0:
raise ValueError(f'Flag was not set to 0 (got {self.__flag}).')
self.__dllFileName = reader.read(14)
self.__wrappedFlags = reader.readUnsignedInt()
if self.__wrappedFlags != 0:
raise ValueError(f'Wrapped flags was not set to 0 (got {self.__wrappedFlags}).')
self.__wrappedProviderUID = reader.read(16)
self.__wrappedType = WrappedType(reader.readUnsignedInt())
# Don't know how this is encoded, just that it is "single-byte
# characters".
self.__serverShortname = reader.readByteString()
if self.__wrappedProviderUID == b'\x1B\x55\xFA\x20\xAA\x66\x11\xCD\x9B\xC8\x00\xAA\x00\x2F\xC4\x5A':
self.__mailboxDN = reader.readAsciiString()
else:
self.__mailboxDN = None
self.__position = reader.tell() + 20
@property
def dllFileName(self) -> bytes:
"""
Must be set to b'emsmdb.dll\\x00\\x00\\x00\\x00'.
"""
return self.__dllFileName
@property
def flag(self) -> int:
return self.__flag
@property
def mailboxDN(self) -> Optional[str]:
"""
A string representing the X500 DN of the mailbox, as specified in
[MS-OXOAB]. THis field is present only for mailbox databases.
"""
return self.__mailboxDN
@property
def position(self) -> int:
return self.__position
@property
def serverShortname(self) -> bytes:
"""
A string of single-byte characters indicating the short name or NetBIOS
name of the server.
"""
return self.__serverShortname
@property
def version(self) -> int:
return self.__version
@property
def wrappedProviderUID(self) -> bytes:
return self.__wrappedProviderUID
@property
def wrappedType(self) -> WrappedType:
"""
Determined by where the folder is located.
"""
return self.__wrappedType
class WrappedEntryID(EntryID):
"""
A WrappedEntryId structure, as specified in [MS-OXOCNTC].
"""
def __init__(self, data: bytes):
super().__init__(data)
# Grab the type byte and parse it.
self.__type = data[20]
bits = self.__type & 0xF
if bits == 0:
self.__embeddedEntryID = OneOffRecipient(data[21:])
elif bits == 3 or bits == 4:
self.__embeddedEntryID = MessageEntryID(data[21:])
elif bits == 5 or bits == 6:
self.__embeddedEntryID = AddressBookEntryID(data[21:])
else:
raise ValueError(f'Found wrapped entry id with invalid type (type bits were {bits}).')
self.__embeddedIsOneOff = self.__type & 0x80 == 0
self.__position = 21 + self.__embeddedEntryID.position
@property
def embeddedEntryID(self) -> EntryID:
"""
The embedded EntryID of this object.
"""
return self.__embeddedEntryID
@property
def embeddedIsOneOff(self) -> bool:
"""
Whether the embedded EntryID is a One-Off EntryID.
"""
return self.__embeddedIsOneOff
@property
def position(self) -> int:
return self.__position
@property
def type(self) -> int:
"""
The type bits of this object.
"""
return self.__type