#!/usr/bin/env python
"""
rtfobj.py
rtfobj is a Python module to extract embedded objects from RTF files, such as
OLE objects. It can be used as a Python library or a command-line tool.
Usage: rtfobj.py <file.rtf>
rtfobj project website: http://www.decalage.info/python/rtfobj
rtfobj is part of the python-oletools package:
http://www.decalage.info/python/oletools
"""
#=== LICENSE =================================================================
# rtfobj is copyright (c) 2012-2022, Philippe Lagadec (http://www.decalage.info)
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#------------------------------------------------------------------------------
# CHANGELOG:
# 2012-11-09 v0.01 PL: - first version
# 2013-04-02 v0.02 PL: - fixed bug in main
# 2015-12-09 v0.03 PL: - configurable logging, CLI options
# - extract OLE 1.0 objects
# - extract files from OLE Package objects
# 2016-04-01 v0.04 PL: - fixed logging output to use stdout instead of stderr
# 2016-04-07 v0.45 PL: - improved parsing to handle some malware tricks
# 2016-05-06 v0.47 TJ: - added option -d to set the output directory
# (contribution by Thomas Jarosch)
# TJ: - sanitize filenames to avoid special characters
# 2016-05-29 PL: - improved parsing, fixed issue #42
# 2016-07-13 v0.50 PL: - new RtfParser and RtfObjParser classes
# 2016-07-18 SL: - added Python 3.5 support
# 2016-07-19 PL: - fixed Python 2.6-2.7 support
# 2016-07-30 PL: - new API with class RtfObject
# - backward-compatible API rtf_iter_objects (fixed issue #70)
# 2016-07-31 PL: - table output with tablestream
# 2016-08-01 PL: - detect executable filenames in OLE Package
# 2016-08-08 PL: - added option -s to save objects to files
# 2016-08-09 PL: - fixed issue #78, improved regex
# 2016-09-06 PL: - fixed issue #83, backward compatible API
# 2016-11-17 v0.51 PL: - updated call to oleobj.OleNativeStream
# 2017-03-12 PL: - fixed imports for Python 2+3
# - fixed hex decoding bug in RtfObjParser (issue #103)
# 2017-03-29 PL: - fixed RtfParser to handle issue #152 (control word with
# long parameter)
# 2017-04-11 PL: - added detection of the OLE2Link vulnerability CVE-2017-0199
# 2017-05-04 PL: - fixed issue #164 to handle linked OLE objects
# 2017-06-08 PL: - fixed issue/PR #143: bin object with negative length
# 2017-06-29 PL: - temporary fix for issue #178
# 2017-07-14 v0.52 PL: - disabled logging of each control word (issue #184)
# 2017-07-24 PL: - fixed call to RtfParser._end_of_file (issue #185)
# - ignore optional space after \bin (issue #185)
# 2017-09-06 PL: - fixed issue #196: \pxe is not a destination
# 2018-01-11 CH: - speedup RTF parsing (PR #244)
# 2018-02-01 JRM: - fixed issue #251: \bin without argument
# 2018-04-09 PL: - fixed issue #280: OLE Package were not detected on Python 3
# 2018-03-24 v0.53 PL: - fixed issue #292: \margSz is a destination
# 2018-04-27 PL: - extract and display the CLSID of OLE objects
# 2018-04-30 PL: - handle "\'" obfuscation trick - issue #281
# 2018-05-10 PL: - fixed issues #303 #307: several destination cwords were incorrect
# 2018-05-17 PL: - fixed issue #273: bytes constants instead of str
# 2018-05-31 v0.53.1 PP: - fixed issue #316: whitespace after \bin on Python 3
# 2018-06-22 v0.53.2 PL: - fixed issue #327: added "\pnaiu" & "\pnaiud"
# 2018-09-11 v0.54 PL: - olefile is now a dependency
# 2019-07-08 v0.55 MM: - added URL carver for CVE-2017-0199 (Equation Editor) PR #460
# - added SCT to the list of executable file extensions PR #461
# 2019-12-16 v0.55.2 PL: - \rtf is not a destination control word (issue #522)
# 2019-12-17 PL: - fixed process_file to detect Equation class (issue #525)
# 2021-05-06 v0.56.2 DD: - fixed bug when OLE package class name ends with null
# characters (issue #507, PR #648)
# 2021-05-23 v0.60 PL: - use ftguess to identify file type of OLE Package
# - fixed bug in re_executable_extensions
# 2021-06-03 v0.60.1 PL: - fixed code to find URLs in OLE2Link objects for Py3 (issue #692)
from __future__ import print_function
__version__ = '0.60.1'
# ------------------------------------------------------------------------------
# TODO:
# - allow semicolon within hex, as found in this sample:
# http://contagiodump.blogspot.nl/2011/10/sep-28-cve-2010-3333-manuscript-with.html
# TODO: use OleObject and OleNativeStream in RtfObject instead of copying each attribute
# TODO: option -e <id> to extract an object, -e all for all objects
# TODO: option to choose which destinations to include (objdata by default)
# TODO: option to display SHA256 or MD5 hashes of objects in table
# === IMPORTS =================================================================
import re, os, sys, binascii, logging, optparse, hashlib
import os.path
from time import time
# IMPORTANT: it should be possible to run oletools directly as scripts
# in any directory without installing them with pip or setup.py.
# In that case, relative imports are NOT usable.
# And to enable Python 2+3 compatibility, we need to use absolute imports,
# so we add the oletools parent folder to sys.path (absolute+normalized path):
_thismodule_dir = os.path.normpath(os.path.abspath(os.path.dirname(__file__)))
# print('_thismodule_dir = %r' % _thismodule_dir)
_parent_dir = os.path.normpath(os.path.join(_thismodule_dir, '..'))
# print('_parent_dir = %r' % _parent_dir)
if _parent_dir not in sys.path:
sys.path.insert(0, _parent_dir)
from oletools.thirdparty.xglob import xglob
from oletools.thirdparty.tablestream import tablestream
from oletools import oleobj, ftguess
import olefile
from oletools.common import clsid
# === LOGGING =================================================================
class NullHandler(logging.Handler):
"""
Log Handler without output, to avoid printing messages if logging is not
configured by the main application.
Python 2.7 has logging.NullHandler, but this is necessary for 2.6:
see https://docs.python.org/2.6/library/logging.html#configuring-logging-for-a-library
"""
def emit(self, record):
pass
def get_logger(name, level=logging.CRITICAL+1):
"""
Create a suitable logger object for this module.
The goal is not to change settings of the root logger, to avoid getting
other modules' logs on the screen.
    If a logger exists with the same name, reuse it. (Else it would have duplicate
handlers and messages would be doubled.)
The level is set to CRITICAL+1 by default, to avoid any logging.
"""
# First, test if there is already a logger with the same name, else it
# will generate duplicate messages (due to duplicate handlers):
if name in logging.Logger.manager.loggerDict:
#NOTE: another less intrusive but more "hackish" solution would be to
# use getLogger then test if its effective level is not default.
logger = logging.getLogger(name)
# make sure level is OK:
logger.setLevel(level)
return logger
# get a new logger:
logger = logging.getLogger(name)
# only add a NullHandler for this logger, it is up to the application
# to configure its own logging:
logger.addHandler(NullHandler())
logger.setLevel(level)
return logger
# a global logger object used for debugging:
log = get_logger('rtfobj')
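# Example (illustrative sketch): an application using rtfobj as a library can
# enable its log output by configuring the root logger itself and resetting
# this module's logger level, similar to what main() does below:
#   import logging
#   from oletools import rtfobj
#   logging.basicConfig(level=logging.DEBUG, format='%(levelname)-8s %(message)s')
#   rtfobj.log.setLevel(logging.NOTSET)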
#=== CONSTANTS=================================================================
# REGEX pattern to extract embedded OLE objects in hexadecimal format:
# hex digit: [0-9A-Fa-f]
HEX_DIGIT = b'[0-9A-Fa-f]'
# hex char = two hex digits: [0-9A-Fa-f]{2}
# HEX_CHAR = r'[0-9A-Fa-f]{2}'
# in fact MS Word allows whitespaces in between the hex digits!
# HEX_CHAR = r'[0-9A-Fa-f]\s*[0-9A-Fa-f]'
# Even worse, MS Word also allows ANY RTF-style tag {*} in between!!
# AND the tags can be nested...
#SINGLE_RTF_TAG = r'[{][^{}]*[}]'
# Actually RTF tags may contain braces escaped with backslash (\{ \}):
SINGLE_RTF_TAG = b'[{](?:\\\\.|[^{}\\\\])*[}]'
# Nested tags, two levels (because Python's re does not support nested matching):
# NESTED_RTF_TAG = r'[{](?:[^{}]|'+SINGLE_RTF_TAG+r')*[}]'
NESTED_RTF_TAG = b'[{](?:\\\\.|[^{}\\\\]|'+SINGLE_RTF_TAG+b')*[}]'
# AND it is also allowed to insert ANY control word or control symbol (ignored)
# According to Rich Text Format (RTF) Specification Version 1.9.1,
# section "Control Word":
# control word = \<ASCII Letter [a-zA-Z] Sequence max 32><Delimiter>
# delimiter = space, OR signed integer followed by any non-digit,
# OR any character except letter and digit
# examples of valid control words:
# "\AnyThing " "\AnyThing123z" ""\AnyThing-456{" "\AnyThing{"
# control symbol = \<any char except letter or digit> (followed by anything)
ASCII_NAME = b'([a-zA-Z]{1,250})'
# using Python's re lookahead assertion:
# (?=...) Matches if ... matches next, but doesn't consume any of the string.
# This is called a lookahead assertion. For example, Isaac (?=Asimov) will
# match 'Isaac ' only if it's followed by 'Asimov'.
# TODO: Find the actual limit on the number of digits for Word
# SIGNED_INTEGER = r'(-?\d{1,250})'
SIGNED_INTEGER = b'(-?\\d+)'
# Note for issue #78: need to match "\A-" not followed by digits (or the end of string)
CONTROL_WORD = b'(?:\\\\' + ASCII_NAME + b'(?:' + SIGNED_INTEGER + b'(?=[^0-9])|(?=[^a-zA-Z0-9])|$))'
re_control_word = re.compile(CONTROL_WORD)
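# For instance (illustrative): re_control_word.match(b'\\bin1024 ') captures the
# groups (b'bin', b'1024'), and re_control_word.match(b'\\objdata ') captures
# (b'objdata', None); the delimiter is only checked by lookahead, not consumed.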
# Note for issue #78: need to match "\" followed by digit (any non-alpha)
CONTROL_SYMBOL = b'(?:\\\\[^a-zA-Z])'
re_control_symbol = re.compile(CONTROL_SYMBOL)
# Text that is not a control word/symbol or a group:
TEXT = b'[^{}\\\\]+'
re_text = re.compile(TEXT)
# ignored whitespaces and tags within a hex block:
IGNORED = b'(?:\\s|'+NESTED_RTF_TAG+b'|'+CONTROL_SYMBOL+b'|'+CONTROL_WORD+b')*'
#IGNORED = r'\s*'
# HEX_CHAR = HEX_DIGIT + IGNORED + HEX_DIGIT
# several hex chars, at least 4: (?:[0-9A-Fa-f]{2}){4,}
# + word boundaries
# HEX_CHARS_4orMORE = r'\b(?:' + HEX_CHAR + r'){4,}\b'
# at least 1 hex char:
# HEX_CHARS_1orMORE = r'(?:' + HEX_CHAR + r')+'
# at least 1 hex char, followed by whitespace or CR/LF:
# HEX_CHARS_1orMORE_WHITESPACES = r'(?:' + HEX_CHAR + r')+\s+'
# + word boundaries around hex block
# HEX_CHARS_1orMORE_WHITESPACES = r'\b(?:' + HEX_CHAR + r')+\b\s*'
# at least one block of hex and whitespace chars, followed by closing curly bracket:
# HEX_BLOCK_CURLY_BRACKET = r'(?:' + HEX_CHARS_1orMORE_WHITESPACES + r')+\}'
# PATTERN = r'(?:' + HEX_CHARS_1orMORE_WHITESPACES + r')*' + HEX_CHARS_1orMORE
#TODO PATTERN = r'\b(?:' + HEX_CHAR + IGNORED + r'){4,}\b'
# PATTERN = r'\b(?:' + HEX_CHAR + IGNORED + r'){4,}' #+ HEX_CHAR + r'\b'
PATTERN = b'\\b(?:' + HEX_DIGIT + IGNORED + b'){7,}' + HEX_DIGIT + b'\\b'
# at least 4 hex chars, followed by whitespace or CR/LF: (?:[0-9A-Fa-f]{2}){4,}\s*
# PATTERN = r'(?:(?:[0-9A-Fa-f]{2})+\s*)*(?:[0-9A-Fa-f]{2}){4,}'
# improved pattern, allowing semicolons within hex:
#PATTERN = r'(?:(?:[0-9A-Fa-f]{2})+\s*)*(?:[0-9A-Fa-f]{2}){4,}'
re_hexblock = re.compile(PATTERN)
re_embedded_tags = re.compile(IGNORED)
re_decimal = re.compile(b'\\d+')
re_delimiter = re.compile(b'[ \\t\\r\\n\\f\\v]')
DELIMITER = b'[ \\t\\r\\n\\f\\v]'
DELIMITERS_ZeroOrMore = b'[ \\t\\r\\n\\f\\v]*'
BACKSLASH_BIN = b'\\\\bin'
# According to my tests, Word accepts up to 250 digits (leading zeroes)
DECIMAL_GROUP = b'(\\d{1,250})'
re_delims_bin_decimal = re.compile(DELIMITERS_ZeroOrMore + BACKSLASH_BIN
+ DECIMAL_GROUP + DELIMITER)
re_delim_hexblock = re.compile(DELIMITER + PATTERN)
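# For instance (illustrative), re_hexblock.search(
#     b'{\\objdata 01050000 02000000 {\\*\\junk x}d0cf11e0 ffb1}')
# matches the whole hex run, including the whitespace and the nested
# {\*\junk x} group inserted between the hex digits.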
# TODO: use a frozenset instead of a regex?
re_executable_extensions = re.compile(
r"(?i)\.(BAT|CLASS|CMD|CPL|DLL|EXE|COM|GADGET|HTA|INF|JAR|JS|JSE|LNK|MSC|MSI|MSP|PIF|PS1|PS1XML|PS2|PS2XML|PSC1|PSC2|REG|SCF|SCR|SCT|VB|VBE|VBS|WS|WSC|WSF|WSH)\b")
# Destination Control Words, according to MS RTF Specifications v1.9.1:
DESTINATION_CONTROL_WORDS = frozenset((
b"aftncn", b"aftnsep", b"aftnsepc", b"annotation", b"atnauthor", b"atndate", b"atnid", b"atnparent", b"atnref",
b"atrfend", b"atrfstart", b"author", b"background", b"bkmkend", b"bkmkstart", b"blipuid", b"buptim", b"category",
b"colorschememapping", b"colortbl", b"comment", b"company", b"creatim", b"datafield", b"datastore", b"defchp", b"defpap",
b"do", b"doccomm", b"docvar", b"dptxbxtext", b"ebcend", b"ebcstart", b"factoidname", b"falt", b"fchars", b"ffdeftext",
b"ffentrymcr", b"ffexitmcr", b"ffformat", b"ffhelptext", b"ffl", b"ffname",b"ffstattext", b"field", b"file", b"filetbl",
b"fldinst", b"fldrslt", b"fldtype", b"fontemb", b"fonttbl", b"footer", b"footerf", b"footerl",
b"footerr", b"footnote", b"formfield", b"ftncn", b"ftnsep", b"ftnsepc", b"g", b"generator", b"gridtbl", b"header", b"headerf",
b"headerl", b"headerr", b"hl", b"hlfr", b"hlinkbase", b"hlloc", b"hlsrc", b"hsv", b"info", b"keywords",
b"latentstyles", b"lchars", b"levelnumbers", b"leveltext", b"lfolevel", b"linkval", b"list", b"listlevel", b"listname",
b"listoverride", b"listoverridetable", b"listpicture", b"liststylename", b"listtable", b"listtext", b"lsdlockedexcept",
b"macc", b"maccPr", b"mailmerge", b"malnScr", b"manager", b"margPr", b"mbar", b"mbarPr", b"mbaseJc", b"mbegChr",
b"mborderBox", b"mborderBoxPr", b"mbox", b"mboxPr", b"mchr", b"mcount", b"mctrlPr", b"md", b"mdeg", b"mdegHide", b"mden",
b"mdiff", b"mdPr", b"me", b"mendChr", b"meqArr", b"meqArrPr", b"mf", b"mfName", b"mfPr", b"mfunc", b"mfuncPr",b"mgroupChr",
b"mgroupChrPr",b"mgrow", b"mhideBot", b"mhideLeft", b"mhideRight", b"mhideTop", b"mlim", b"mlimLoc", b"mlimLow",
b"mlimLowPr", b"mlimUpp", b"mlimUppPr", b"mm", b"mmaddfieldname", b"mmathPict", b"mmaxDist", b"mmc",
b"mmcJc", b"mmconnectstr", b"mmconnectstrdata", b"mmcPr", b"mmcs", b"mmdatasource", b"mmheadersource", b"mmmailsubject",
b"mmodso", b"mmodsofilter", b"mmodsofldmpdata", b"mmodsomappedname", b"mmodsoname", b"mmodsorecipdata", b"mmodsosort",
b"mmodsosrc", b"mmodsotable", b"mmodsoudl", b"mmodsoudldata", b"mmodsouniquetag", b"mmPr", b"mmquery", b"mmr", b"mnary",
b"mnaryPr", b"mnoBreak", b"mnum", b"mobjDist", b"moMath", b"moMathPara", b"moMathParaPr", b"mopEmu", b"mphant", b"mphantPr",
b"mplcHide", b"mpos", b"mr", b"mrad", b"mradPr", b"mrPr", b"msepChr", b"mshow", b"mshp", b"msPre", b"msPrePr", b"msSub",
b"msSubPr", b"msSubSup", b"msSubSupPr", b"msSup", b"msSupPr", b"mstrikeBLTR", b"mstrikeH", b"mstrikeTLBR", b"mstrikeV",
b"msub", b"msubHide", b"msup", b"msupHide", b"mtransp", b"mtype", b"mvertJc", b"mvfmf", b"mvfml", b"mvtof", b"mvtol",
b"mzeroAsc", b"mzeroDesc", b"mzeroWid", b"nesttableprops", b"nonesttables", b"objalias", b"objclass",
b"objdata", b"object", b"objname", b"objsect", b"oldcprops", b"oldpprops", b"oldsprops", b"oldtprops",
b"oleclsid", b"operator", b"panose", b"password", b"passwordhash", b"pgp", b"pgptbl", b"picprop", b"pict", b"pn", b"pnseclvl",
b"pntext", b"pntxta", b"pntxtb", b"printim",
b"propname", b"protend", b"protstart", b"protusertbl",
b"result", b"revtbl", b"revtim",
# \rtf should not be treated as a destination (issue #522)
#b"rtf",
b"rxe", b"shp", b"shpgrp", b"shpinst", b"shppict", b"shprslt", b"shptxt",
b"sn", b"sp", b"staticval", b"stylesheet", b"subject", b"sv", b"svb", b"tc", b"template", b"themedata", b"title", b"txe", b"ud",
b"upr", b"userprops", b"wgrffmtfilter", b"windowcaption", b"writereservation", b"writereservhash", b"xe", b"xform",
b"xmlattrname", b"xmlattrvalue", b"xmlclose", b"xmlname", b"xmlnstbl", b"xmlopen",
# added for issue #292: https://github.com/decalage2/oletools/issues/292
b"margSz",
# added for issue #327:
b"pnaiu", b"pnaiud",
# It seems \private should not be treated as a destination (issue #178)
# Same for \pxe (issue #196)
# b"private", b"pxe",
# from issue #303: These destination control words can be treated as a "value" type.
# They don't consume data so they won't change the state of the parser.
# b"atnicn", b"atntime", b"fname", b"fontfile", b"htmltag", b"keycode", b"maln",
# b"mhtmltag", b"mmath", b"mmathPr", b"nextfile", b"objtime", b"rsidtbl",
))
# indexing a str on Python 2.x returns a 1-character string,
# while indexing bytes on Python 3.x returns an integer:
if sys.version_info[0] <= 2:
# Python 2.x - Characters (str)
BACKSLASH = '\\'
BRACE_OPEN = '{'
BRACE_CLOSE = '}'
UNICODE_TYPE = unicode # pylint: disable=undefined-variable
else:
# Python 3.x - Integers
BACKSLASH = ord('\\')
BRACE_OPEN = ord('{')
BRACE_CLOSE = ord('}')
UNICODE_TYPE = str
RTF_MAGIC = b'\x7b\\rt' # \x7b == b'{' but does not mess up auto-indent
def duration_str(duration):
""" create a human-readable string representation of duration [s] """
value = duration
unit = 's'
if value > 90:
value /= 60.
unit = 'min'
if value > 90:
value /= 60.
unit = 'h'
if value > 72:
value /= 24.
unit = 'days'
return '{0:.1f}{1}'.format(value, unit)
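# illustrative values: duration_str(45) -> '45.0s', duration_str(100) -> '1.7min',
# duration_str(7200) -> '2.0h'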
#=== CLASSES =================================================================
class Destination(object):
"""
Stores the data associated with a destination control word
"""
def __init__(self, cword=None):
self.cword = cword
self.data = b''
self.start = None
self.end = None
self.group_level = 0
# class Group(object):
# """
# Stores the data associated with a group between braces {...}
# """
# def __init__(self, cword=None):
# self.start = None
# self.end = None
# self.level = None
class RtfParser(object):
"""
Very simple but robust generic RTF parser, designed to handle
malformed malicious RTF as MS Word does
"""
def __init__(self, data):
"""
RtfParser constructor.
:param data: bytes object containing the RTF data to be parsed
"""
self.data = data
self.index = 0
self.size = len(data)
self.group_level = 0
# default destination for the document text:
document_destination = Destination()
self.destinations = [document_destination]
self.current_destination = document_destination
def _report_progress(self, start_time):
""" report progress on parsing at regular intervals """
now = float(time())
if now == start_time or self.size == 0:
return # avoid zero-division
percent_done = 100. * self.index / self.size
time_per_index = (now - start_time) / float(self.index)
finish_estim = float(self.size - self.index) * time_per_index
log.debug('After {0} finished {1:4.1f}% of current file ({2} bytes); '
'will finish in approx {3}'
.format(duration_str(now-start_time), percent_done,
self.size, duration_str(finish_estim)))
def parse(self):
"""
Parse the RTF data
:return: nothing
"""
# Start at beginning of data
self.index = 0
start_time = time()
last_report = start_time
# Loop until the end
while self.index < self.size:
if time() - last_report > 15: # report every 15s
self._report_progress(start_time)
last_report = time()
if self.data[self.index] == BRACE_OPEN:
# Found an opening brace "{": Start of a group
self._open_group()
self.index += 1
continue
if self.data[self.index] == BRACE_CLOSE:
# Found a closing brace "}": End of a group
self._close_group()
self.index += 1
continue
if self.data[self.index] == BACKSLASH:
# Found a backslash "\": Start of a control word or control symbol
# Use a regex to extract the control word name if present:
# NOTE: the full length of the control word + its optional integer parameter
# is limited by MS Word at 253 characters, so we have to run the regex
# on a cropped string:
data_cropped = self.data[self.index:self.index+254]
# append a space so that the regex can check the following character:
data_cropped += b' '
# m = re_control_word.match(self.data, self.index, self.index+253)
m = re_control_word.match(data_cropped)
if m:
cword = m.group(1)
param = None
if len(m.groups()) > 1:
param = m.group(2)
# log.debug('control word at index %Xh - cword=%r param=%r %r' % (self.index, cword, param, m.group()))
self._control_word(m, cword, param)
self.index += len(m.group())
# if it's \bin, call _bin after updating index
if cword == b'bin':
self._bin(m, param)
continue
# Otherwise, it may be a control symbol:
m = re_control_symbol.match(self.data, self.index)
if m:
self.control_symbol(m)
self.index += len(m.group())
continue
# Otherwise, this is plain text:
# Use a regex to match all characters until the next brace or backslash:
m = re_text.match(self.data, self.index)
if m:
self._text(m)
self.index += len(m.group())
continue
raise RuntimeError('Should not have reached this point - index=%Xh' % self.index)
# call _end_of_file to make sure all groups are closed properly
self._end_of_file()
def _open_group(self):
self.group_level += 1
#log.debug('{ Open Group at index %Xh - level=%d' % (self.index, self.group_level))
# call user method AFTER increasing the level:
self.open_group()
def open_group(self):
#log.debug('open group at index %Xh' % self.index)
pass
def _close_group(self):
#log.debug('} Close Group at index %Xh - level=%d' % (self.index, self.group_level))
# call user method BEFORE decreasing the level:
self.close_group()
# if the destination level is the same as the group level, close the destination:
if self.group_level == self.current_destination.group_level:
# log.debug('Current Destination %r level = %d => Close Destination' % (
# self.current_destination.cword, self.current_destination.group_level))
self._close_destination()
else:
# log.debug('Current Destination %r level = %d => Continue with same Destination' % (
# self.current_destination.cword, self.current_destination.group_level))
pass
self.group_level -= 1
# log.debug('Decreased group level to %d' % self.group_level)
def close_group(self):
#log.debug('close group at index %Xh' % self.index)
pass
def _open_destination(self, matchobject, cword):
# if the current destination is at the same group level, close it first:
if self.current_destination.group_level == self.group_level:
self._close_destination()
new_dest = Destination(cword)
new_dest.group_level = self.group_level
self.destinations.append(new_dest)
self.current_destination = new_dest
# start of the destination is right after the control word:
new_dest.start = self.index + len(matchobject.group())
# log.debug("Open Destination %r start=%Xh - level=%d" % (cword, new_dest.start, new_dest.group_level))
# call the corresponding user method for additional processing:
self.open_destination(self.current_destination)
def open_destination(self, destination):
pass
def _close_destination(self):
# log.debug("Close Destination %r end=%Xh - level=%d" % (self.current_destination.cword,
# self.index, self.current_destination.group_level))
self.current_destination.end = self.index
# call the corresponding user method for additional processing:
self.close_destination(self.current_destination)
if len(self.destinations)>0:
# remove the current destination from the stack, and go back to the previous one:
self.destinations.pop()
if len(self.destinations) > 0:
self.current_destination = self.destinations[-1]
else:
# log.debug('All destinations are closed, keeping the document destination open')
pass
def close_destination(self, destination):
pass
def _control_word(self, matchobject, cword, param):
#log.debug('control word %r at index %Xh' % (matchobject.group(), self.index))
# TODO: according to RTF specs v1.9.1, "Destination changes are legal only immediately after an opening brace ({)"
# (not counting the special control symbol \*, of course)
if cword in DESTINATION_CONTROL_WORDS:
log.debug('%r is a destination control word: starting a new destination at index %Xh' % (cword, self.index))
self._open_destination(matchobject, cword)
# call the corresponding user method for additional processing:
self.control_word(matchobject, cword, param)
def control_word(self, matchobject, cword, param):
pass
def control_symbol(self, matchobject):
#log.debug('control symbol %r at index %Xh' % (matchobject.group(), self.index))
pass
def _text(self, matchobject):
text = matchobject.group()
self.current_destination.data += text
self.text(matchobject, text)
def text(self, matchobject, text):
#log.debug('text %r at index %Xh' % (matchobject.group(), self.index))
pass
def _bin(self, matchobject, param):
if param is None:
log.info('Detected anti-analysis trick: \\bin object without length at index %X' % self.index)
binlen = 0
else:
binlen = int(param)
# handle negative length
if binlen < 0:
log.info('Detected anti-analysis trick: \\bin object with negative length at index %X' % self.index)
# binlen = int(param.strip('-'))
# According to my tests, if the bin length is negative,
# it should be treated as a null length:
binlen=0
        # ignore optional space after \bin (the bytes-slice comparison also avoids
        # a TypeError when \bin is the very last token in the data)
        if self.data[self.index:self.index + 1] == b' ':
log.debug('\\bin: ignoring whitespace before data')
self.index += 1
log.debug('\\bin: reading %d bytes of binary data' % binlen)
# TODO: handle length greater than data
bindata = self.data[self.index:self.index + binlen]
self.index += binlen
self.bin(bindata)
def bin(self, bindata):
pass
def _end_of_file(self):
# log.debug('%Xh Reached End of File')
# close any group/destination that is still open:
while self.group_level > 0:
log.debug('Group Level = %d, closing group' % self.group_level)
self._close_group()
self.end_of_file()
def end_of_file(self):
pass
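# A minimal sketch of how RtfParser is meant to be subclassed (the names
# DestinationLister and dump.rtf are hypothetical, not part of the oletools API):
# override the user hooks (open_group, close_group, open_destination,
# close_destination, control_word, control_symbol, text, bin, end_of_file),
# then call parse() on the RTF bytes:
#   class DestinationLister(RtfParser):
#       def open_destination(self, destination):
#           print('destination %r starts at index %Xh'
#                 % (destination.cword, destination.start))
#   DestinationLister(open('dump.rtf', 'rb').read()).parse()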
class RtfObject(object):
"""
An object or a file (OLE Package) embedded into an RTF document
"""
def __init__(self):
"""
RtfObject constructor
"""
# start and end index in the RTF file:
self.start = None
self.end = None
# raw object data encoded in hexadecimal, as found in the RTF file:
self.hexdata = None
# raw object data in binary form, decoded from hexadecimal
self.rawdata = None
# OLE object data (extracted from rawdata)
self.is_ole = False
self.oledata = None
self.format_id = None
self.class_name = None
self.oledata_size = None
# OLE Package data (extracted from oledata)
self.is_package = False
self.olepkgdata = None
self.filename = None
self.src_path = None
self.temp_path = None
self.ftg = None # ftguess.FileTypeGuesser to identify file type
# Additional OLE object data
self.clsid = None
self.clsid_desc = None
class RtfObjParser(RtfParser):
"""
Specialized RTF parser to extract OLE objects
"""
def __init__(self, data):
super(RtfObjParser, self).__init__(data)
# list of RtfObjects found
self.objects = []
def open_destination(self, destination):
# TODO: detect when the destination is within an objdata, report as obfuscation
if destination.cword == b'objdata':
log.debug('*** Start object data at index %Xh' % destination.start)
def close_destination(self, destination):
if destination.cword == b'objdata':
log.debug('*** Close object data at index %Xh' % self.index)
rtfobj = RtfObject()
self.objects.append(rtfobj)
rtfobj.start = destination.start
rtfobj.end = destination.end
# Filter out all whitespaces first (just ignored):
hexdata1 = destination.data.translate(None, b' \t\r\n\f\v')
# Then filter out any other non-hex character:
hexdata = re.sub(b'[^a-fA-F0-9]', b'', hexdata1)
if len(hexdata) < len(hexdata1):
# this is only for debugging:
nonhex = re.sub(b'[a-fA-F0-9]', b'', hexdata1)
log.debug('Found non-hex chars in hexdata: %r' % nonhex)
# MS Word accepts an extra hex digit, so we need to trim it if present:
if len(hexdata) & 1:
                log.debug('Odd length, trimmed last hex digit.')
hexdata = hexdata[:-1]
rtfobj.hexdata = hexdata
object_data = binascii.unhexlify(hexdata)
rtfobj.rawdata = object_data
rtfobj.rawdata_md5 = hashlib.md5(object_data).hexdigest()
# TODO: check if all hex data is extracted properly
obj = oleobj.OleObject()
try:
obj.parse(object_data)
rtfobj.format_id = obj.format_id
rtfobj.class_name = obj.class_name
rtfobj.oledata_size = obj.data_size
rtfobj.oledata = obj.data
rtfobj.oledata_md5 = hashlib.md5(obj.data).hexdigest()
rtfobj.is_ole = True
if obj.class_name.lower().rstrip(b'\0') == b'package':
opkg = oleobj.OleNativeStream(bindata=obj.data,
package=True)
rtfobj.filename = opkg.filename
rtfobj.src_path = opkg.src_path
rtfobj.temp_path = opkg.temp_path
rtfobj.olepkgdata = opkg.data
rtfobj.olepkgdata_md5 = hashlib.md5(opkg.data).hexdigest()
# use ftguess to identify file type from content:
rtfobj.ftg = ftguess.FileTypeGuesser(data=rtfobj.olepkgdata)
rtfobj.is_package = True
else:
if olefile.isOleFile(obj.data):
ole = olefile.OleFileIO(obj.data)
rtfobj.clsid = ole.root.clsid
rtfobj.clsid_desc = clsid.KNOWN_CLSIDS.get(rtfobj.clsid.upper(),
'unknown CLSID (please report at https://github.com/decalage2/oletools/issues)')
            except:
                # any parsing error means this is not a well-formed OLE 1.0 object;
                # in that case only the raw data is kept
                log.debug('*** Not an OLE 1.0 Object')
def bin(self, bindata):
if self.current_destination.cword == b'objdata':
# TODO: keep track of this, because it is unusual and indicates potential obfuscation
# trick: hexlify binary data, add it to hex data
self.current_destination.data += binascii.hexlify(bindata)
def control_word(self, matchobject, cword, param):
# TODO: extract useful cwords such as objclass
# TODO: keep track of cwords inside objdata, because it is unusual and indicates potential obfuscation
# TODO: same with control symbols, and opening bracket
# log.debug('- Control word "%s", param=%s, level=%d' % (cword, param, self.group_level))
pass
def control_symbol(self, matchobject):
# log.debug('control symbol %r at index %Xh' % (matchobject.group(), self.index))
symbol = matchobject.group()[1:2]
if symbol == b"'":
# read the two hex digits following "\'" - which can be any characters, not just hex digits
# (because within an objdata destination, they are simply ignored)
hexdigits = self.data[self.index+2:self.index+4]
# print(hexdigits)
# move the index two bytes forward
self.index += 2
if self.current_destination.cword == b'objdata':
# Here's the tricky part: there is a bug in the MS Word RTF parser at least
# until Word 2016, that removes the last hex digit before the \'hh control
# symbol, ONLY IF the number of hex digits read so far is odd.
# So to emulate that bug, we have to clean the data read so far by keeping
# only the hex digits:
# Filter out any non-hex character:
self.current_destination.data = re.sub(b'[^a-fA-F0-9]', b'', self.current_destination.data)
if len(self.current_destination.data) & 1 == 1:
# If the number of hex digits is odd, remove the last one:
self.current_destination.data = self.current_destination.data[:-1]
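# A minimal usage sketch of the current API (the file name is hypothetical;
# see process_file() below for the full command-line behaviour):
#   data = open('document.rtf', 'rb').read()
#   rtfp = RtfObjParser(data)
#   rtfp.parse()
#   for obj in rtfp.objects:
#       print(hex(obj.start), obj.is_ole, obj.is_package, obj.class_name)
#       # obj.rawdata is the decoded binary data; obj.oledata and obj.olepkgdata
#       # are only set when it is a valid OLE 1.0 object / OLE Package.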
#=== FUNCTIONS ===============================================================
def rtf_iter_objects(filename, min_size=32):
"""
[DEPRECATED] Backward-compatible API, for applications using the old rtfobj:
    Open an RTF file, extract each embedded object encoded in hexadecimal, and
    yield the index of the object in the RTF file, its original length in the
    RTF file, and the decoded object data in binary format.
This is an iterator.
:param filename: str, RTF file name/path to open on disk
:param min_size: ignored, kept for backward compatibility
:returns: iterator, yielding tuples (start index, original length, binary data)
"""
data = open(filename, 'rb').read()
rtfp = RtfObjParser(data)
rtfp.parse()
for obj in rtfp.objects:
orig_len = obj.end - obj.start
yield obj.start, orig_len, obj.rawdata
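# Illustrative use of this legacy API (the file name is hypothetical; new code
# should use RtfObjParser directly):
#   for index, orig_len, data in rtf_iter_objects('document.rtf'):
#       print('found object of %d bytes at index %Xh' % (len(data), index))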
def is_rtf(arg, treat_str_as_data=False):
""" determine whether given file / stream / array represents an rtf file
    arg can be either a file name, a byte stream (positioned at its start), a
    list/tuple, or any other iterable that yields bytes.
    For a str argument it is ambiguous whether it is a file name or the data
    read from it (at least for py2 str, which is bytes); the treat_str_as_data
    argument clarifies this.
"""
magic_len = len(RTF_MAGIC)
if isinstance(arg, UNICODE_TYPE):
with open(arg, 'rb') as reader:
return reader.read(len(RTF_MAGIC)) == RTF_MAGIC
if isinstance(arg, bytes) and not isinstance(arg, str): # only in PY3
return arg[:magic_len] == RTF_MAGIC
if isinstance(arg, bytearray):
return arg[:magic_len] == RTF_MAGIC
if isinstance(arg, str): # could be bytes, but we assume file name
if treat_str_as_data:
try:
return arg[:magic_len].encode('ascii', errors='strict')\
== RTF_MAGIC
except UnicodeError:
return False
else:
with open(arg, 'rb') as reader:
return reader.read(len(RTF_MAGIC)) == RTF_MAGIC
if hasattr(arg, 'read'): # a stream (i.e. file-like object)
return arg.read(len(RTF_MAGIC)) == RTF_MAGIC
if isinstance(arg, (list, tuple)):
iter_arg = iter(arg)
else:
iter_arg = arg
    # check iterable: compare the first bytes with the RTF magic.
    # zip() wraps each byte of RTF_MAGIC in a 1-tuple, so the "not in" test below
    # is effectively an equality check for both Python 2 (characters) and
    # Python 3 (integers):
    for magic_byte in zip(RTF_MAGIC):
try:
if next(iter_arg) not in magic_byte:
return False
except StopIteration:
return False
return True # checked the complete magic without returning False --> match
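# Illustrative calls (the path is hypothetical): is_rtf('document.rtf') opens and
# checks the first bytes of the named file, while
# is_rtf(b'{\\rtf1\\ansi Hello}', treat_str_as_data=True) checks the given data
# itself, on both Python 2 and 3.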
def sanitize_filename(filename, replacement='_', max_length=200):
"""compute basename of filename. Replaces all non-whitelisted characters.
The returned filename is always a basename of the file."""
basepath = os.path.basename(filename).strip()
sane_fname = re.sub(r'[^\w\.\- ]', replacement, basepath)
while ".." in sane_fname:
sane_fname = sane_fname.replace('..', '.')
while " " in sane_fname:
sane_fname = sane_fname.replace(' ', ' ')
    if not sane_fname:
        sane_fname = 'NONAME'
# limit filename length
if max_length:
sane_fname = sane_fname[:max_length]
return sane_fname
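# Illustrative result: sanitize_filename('../../evil<name>.exe') returns
# 'evil_name_.exe' (basename only, non-whitelisted characters replaced).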
def process_file(container, filename, data, output_dir=None, save_object=False):
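    """
    Parse a single RTF file, print a summary table of the embedded objects found,
    and optionally save their content to files.
    save_object may be an object index (e.g. '2'), 'all', or a false value to
    disable saving; output_dir is the directory where extracted files are written
    (default: same directory as the input file).
    """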
if output_dir:
if not os.path.isdir(output_dir):
log.info('creating output directory %s' % output_dir)
os.mkdir(output_dir)
fname_prefix = os.path.join(output_dir,
sanitize_filename(filename))
else:
base_dir = os.path.dirname(filename)
sane_fname = sanitize_filename(filename)
fname_prefix = os.path.join(base_dir, sane_fname)
# TODO: option to extract objects to files (false by default)
if data is None:
data = open(filename, 'rb').read()
print('='*79)
print('File: %r - size: %d bytes' % (filename, len(data)))
tstream = tablestream.TableStream(
column_width=(3, 10, 63),
header_row=('id', 'index', 'OLE Object'),
style=tablestream.TableStyleSlim
)
rtfp = RtfObjParser(data)
rtfp.parse()
for rtfobj in rtfp.objects:
ole_color = None
if rtfobj.is_ole:
ole_column = 'format_id: %d ' % rtfobj.format_id
if rtfobj.format_id == oleobj.OleObject.TYPE_EMBEDDED:
ole_column += '(Embedded)\n'
elif rtfobj.format_id == oleobj.OleObject.TYPE_LINKED:
ole_column += '(Linked)\n'
else:
ole_column += '(Unknown)\n'
ole_column += 'class name: %r\n' % rtfobj.class_name
# if the object is linked and not embedded, data_size=None:
if rtfobj.oledata_size is None:
ole_column += 'data size: N/A'
else:
ole_column += 'data size: %d' % rtfobj.oledata_size
if rtfobj.is_package:
ole_column += '\nOLE Package object:'
ole_column += '\nFilename: %r' % rtfobj.filename
ole_column += '\nSource path: %r' % rtfobj.src_path
ole_column += '\nTemp path = %r' % rtfobj.temp_path
ole_column += '\nMD5 = %r' % rtfobj.olepkgdata_md5
ole_color = 'yellow'
# check if the file extension is executable:
_, temp_ext = os.path.splitext(rtfobj.temp_path)
log.debug('Temp path extension: %r' % temp_ext)
_, file_ext = os.path.splitext(rtfobj.filename)
log.debug('File extension: %r' % file_ext)
if temp_ext != file_ext:
ole_column += "\nMODIFIED FILE EXTENSION"
if re_executable_extensions.match(temp_ext) or re_executable_extensions.match(file_ext):
ole_color = 'red'
ole_column += '\nEXECUTABLE FILE'
ole_column += '\nFile Type: {}'.format(rtfobj.ftg.ftype.name)
else:
ole_column += '\nMD5 = %r' % rtfobj.oledata_md5
if rtfobj.clsid is not None:
ole_column += '\nCLSID: %s' % rtfobj.clsid
ole_column += '\n%s' % rtfobj.clsid_desc
if 'CVE' in rtfobj.clsid_desc:
ole_color = 'red'
# Detect OLE2Link exploit
# http://www.kb.cert.org/vuls/id/921560
if rtfobj.class_name == b'OLE2Link':
ole_color = 'red'
ole_column += '\nPossibly an exploit for the OLE2Link vulnerability (VU#921560, CVE-2017-0199)\n'
# https://bitbucket.org/snippets/Alexander_Hanel/7Adpp
urls = []
# We look for unicode strings of 3+ chars in the OLE object data:
# Here the regex must be a bytes string (issue #692)
# but Python 2.7 does not support rb'...' so we use b'...' and escape backslashes
pat = re.compile(b'(?:[\\x20-\\x7E][\\x00]){3,}')
words = [w.decode('utf-16le') for w in pat.findall(rtfobj.oledata)]
for w in words:
# TODO: we could use the URL_RE regex from olevba to be more precise
if "http" in w:
urls.append(w)
urls = sorted(set(urls))
if urls:
ole_column += 'URL extracted: ' + ', '.join(urls)
# Detect Equation Editor exploit
# https://www.kb.cert.org/vuls/id/421280/
elif rtfobj.class_name.lower().startswith(b'equation.3'):
ole_color = 'red'
ole_column += '\nPossibly an exploit for the Equation Editor vulnerability (VU#421280, CVE-2017-11882)'
else:
ole_column = 'Not a well-formed OLE object'
tstream.write_row((
rtfp.objects.index(rtfobj),
# filename,
'%08Xh' % rtfobj.start,
ole_column
), colors=(None, None, ole_color)
)
tstream.write_sep()
if save_object:
if save_object == 'all':
objects = rtfp.objects
else:
try:
i = int(save_object)
objects = [ rtfp.objects[i] ]
except:
log.error('The -s option must be followed by an object index or all, such as "-s 2" or "-s all"')
return
for rtfobj in objects:
i = objects.index(rtfobj)
if rtfobj.is_package:
print('Saving file from OLE Package in object #%d:' % i)
print(' Filename = %r' % rtfobj.filename)
print(' Source path = %r' % rtfobj.src_path)
print(' Temp path = %r' % rtfobj.temp_path)
if rtfobj.filename:
fname = '%s_%s' % (fname_prefix,
sanitize_filename(rtfobj.filename))
else:
fname = '%s_object_%08X.noname' % (fname_prefix, rtfobj.start)
print(' saving to file %s' % fname)
print(' md5 %s' % rtfobj.olepkgdata_md5)
open(fname, 'wb').write(rtfobj.olepkgdata)
# When format_id=TYPE_LINKED, oledata_size=None
elif rtfobj.is_ole and rtfobj.oledata_size is not None:
print('Saving file embedded in OLE object #%d:' % i)
print(' format_id = %d' % rtfobj.format_id)
print(' class name = %r' % rtfobj.class_name)
print(' data size = %d' % rtfobj.oledata_size)
# set a file extension according to the class name:
class_name = rtfobj.class_name.lower()
if class_name.startswith(b'word'):
ext = 'doc'
elif class_name.startswith(b'package'):
ext = 'package'
else:
ext = 'bin'
fname = '%s_object_%08X.%s' % (fname_prefix, rtfobj.start, ext)
print(' saving to file %s' % fname)
print(' md5 %s' % rtfobj.oledata_md5)
open(fname, 'wb').write(rtfobj.oledata)
else:
print('Saving raw data in object #%d:' % i)
fname = '%s_object_%08X.raw' % (fname_prefix, rtfobj.start)
print(' saving object to file %s' % fname)
print(' md5 %s' % rtfobj.rawdata_md5)
open(fname, 'wb').write(rtfobj.rawdata)
#=== MAIN =================================================================
def main():
# print banner with version
python_version = '%d.%d.%d' % sys.version_info[0:3]
print ('rtfobj %s on Python %s - http://decalage.info/python/oletools' %
(__version__, python_version))
print ('THIS IS WORK IN PROGRESS - Check updates regularly!')
print ('Please report any issue at https://github.com/decalage2/oletools/issues')
print ('')
DEFAULT_LOG_LEVEL = "warning" # Default log level
LOG_LEVELS = {
'debug': logging.DEBUG,
'info': logging.INFO,
'warning': logging.WARNING,
'error': logging.ERROR,
'critical': logging.CRITICAL
}
usage = 'usage: %prog [options] <filename> [filename2 ...]'
parser = optparse.OptionParser(usage=usage)
# parser.add_option('-o', '--outfile', dest='outfile',
# help='output file')
# parser.add_option('-c', '--csv', dest='csv',
# help='export results to a CSV file')
parser.add_option("-r", action="store_true", dest="recursive",
help='find files recursively in subdirectories.')
parser.add_option("-z", "--zip", dest='zip_password', type='str', default=None,
help='if the file is a zip archive, open first file from it, using the provided password (requires Python 2.6+)')
parser.add_option("-f", "--zipfname", dest='zip_fname', type='str', default='*',
help='if the file is a zip archive, file(s) to be opened within the zip. Wildcards * and ? are supported. (default:*)')
parser.add_option('-l', '--loglevel', dest="loglevel", action="store", default=DEFAULT_LOG_LEVEL,
help="logging level debug/info/warning/error/critical (default=%default)")
parser.add_option("-s", "--save", dest='save_object', type='str', default=None,
help='Save the object corresponding to the provided number to a file, for example "-s 2". Use "-s all" to save all objects at once.')
# parser.add_option("-o", "--outfile", dest='outfile', type='str', default=None,
# help='Filename to be used when saving an object to a file.')
parser.add_option("-d", type="str", dest="output_dir",
help='use specified directory to save output files.', default=None)
# parser.add_option("--pkg", action="store_true", dest="save_pkg",
# help='Save OLE Package binary data of extracted objects (file embedded into an OLE Package).')
# parser.add_option("--ole", action="store_true", dest="save_ole",
# help='Save OLE binary data of extracted objects (object data without the OLE container).')
# parser.add_option("--raw", action="store_true", dest="save_raw",
# help='Save raw binary data of extracted objects (decoded from hex, including the OLE container).')
# parser.add_option("--hex", action="store_true", dest="save_hex",
# help='Save raw hexadecimal data of extracted objects (including the OLE container).')
(options, args) = parser.parse_args()
# Print help if no arguments are passed
if len(args) == 0:
print (__doc__)
parser.print_help()
sys.exit()
# Setup logging to the console:
# here we use stdout instead of stderr by default, so that the output
# can be redirected properly.
logging.basicConfig(level=LOG_LEVELS[options.loglevel], stream=sys.stdout,
format='%(levelname)-8s %(message)s')
# enable logging in the modules:
log.setLevel(logging.NOTSET)
oleobj.enable_logging()
for container, filename, data in xglob.iter_files(args, recursive=options.recursive,
zip_password=options.zip_password, zip_fname=options.zip_fname):
# ignore directory names stored in zip files:
if container and filename.endswith('/'):
continue
process_file(container, filename, data, output_dir=options.output_dir,
save_object=options.save_object)
if __name__ == '__main__':
main()
# This code was developed while listening to The Mary Onettes "Lost"