##############################################################################
#
# Copyright (c) 2002 Zope Foundation and Contributors.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""
transformer module:
uses Python standard library ast module and its containing classes to transform
the parsed python code to create a modified AST for a byte code generation.
"""
import ast
import contextlib
import textwrap
# For AugAssign the operator must be converted to a string.
IOPERATOR_TO_STR = {
ast.Add: '+=',
ast.Sub: '-=',
ast.Mult: '*=',
ast.Div: '/=',
ast.Mod: '%=',
ast.Pow: '**=',
ast.LShift: '<<=',
ast.RShift: '>>=',
ast.BitOr: '|=',
ast.BitXor: '^=',
ast.BitAnd: '&=',
ast.FloorDiv: '//=',
ast.MatMult: '@=',
}
# For creation allowed magic method names. See also
# https://docs.python.org/3/reference/datamodel.html#special-method-names
ALLOWED_FUNC_NAMES = frozenset([
'__init__',
'__contains__',
'__lt__',
'__le__',
'__eq__',
'__ne__',
'__gt__',
'__ge__',
])
FORBIDDEN_FUNC_NAMES = frozenset([
'print',
'printed',
'builtins',
'breakpoint',
])
# Attributes documented in the `inspect` module, but defined on the listed
# objects. See also https://docs.python.org/3/library/inspect.html
INSPECT_ATTRIBUTES = frozenset([
# on traceback objects:
"tb_frame",
# "tb_lasti", # int
# "tb_lineno", # int
"tb_next",
# on frame objects:
"f_back",
"f_builtins",
"f_code",
"f_globals",
# "f_lasti", # int
# "f_lineno", # int
"f_locals",
"f_trace",
# on code objects:
# "co_argcount", # int
"co_code",
# "co_cellvars", # tuple of str
# "co_consts", # tuple of str
# "co_filename", # str
# "co_firstlineno", # int
# "co_flags", # int
# "co_lnotab", # mapping between ints and indices
# "co_freevars", # tuple of strings
# "co_posonlyargcount", # int
# "co_kwonlyargcount", # int
# "co_name", # str
# "co_qualname", # str
# "co_names", # str
# "co_nlocals", # int
# "co_stacksize", # int
# "co_varnames", # tuple of str
# on generator objects:
"gi_frame",
# "gi_running", # bool
"gi_code",
"gi_yieldfrom",
# on coroutine objects:
"cr_await",
"cr_frame",
# "cr_running", # bool
"cr_code",
"cr_origin",
])
# When new ast nodes are generated they have no 'lineno', 'end_lineno',
# 'col_offset' and 'end_col_offset'. This function copies these fields from the
# incoming node:
def copy_locations(new_node, old_node):
assert 'lineno' in new_node._attributes
new_node.lineno = old_node.lineno
assert 'end_lineno' in new_node._attributes
new_node.end_lineno = old_node.end_lineno
assert 'col_offset' in new_node._attributes
new_node.col_offset = old_node.col_offset
assert 'end_col_offset' in new_node._attributes
new_node.end_col_offset = old_node.end_col_offset
ast.fix_missing_locations(new_node)
class PrintInfo:
def __init__(self):
self.print_used = False
self.printed_used = False
@contextlib.contextmanager
def new_print_scope(self):
old_print_used = self.print_used
old_printed_used = self.printed_used
self.print_used = False
self.printed_used = False
try:
yield
finally:
self.print_used = old_print_used
self.printed_used = old_printed_used
class RestrictingNodeTransformer(ast.NodeTransformer):
def __init__(self, errors=None, warnings=None, used_names=None):
super().__init__()
self.errors = [] if errors is None else errors
self.warnings = [] if warnings is None else warnings
# All the variables used by the incoming source.
# Internal names/variables, like the ones from 'gen_tmp_name', don't
# have to be added.
# 'used_names' is for example needed by 'RestrictionCapableEval' to
# know wich names it has to supply when calling the final code.
self.used_names = {} if used_names is None else used_names
# Global counter to construct temporary variable names.
self._tmp_idx = 0
self.print_info = PrintInfo()
def gen_tmp_name(self):
# 'check_name' ensures that no variable is prefixed with '_'.
# => Its safe to use '_tmp..' as a temporary variable.
name = '_tmp%i' % self._tmp_idx
self._tmp_idx += 1
return name
def error(self, node, info):
"""Record a security error discovered during transformation."""
lineno = getattr(node, 'lineno', None)
self.errors.append(
f'Line {lineno}: {info}')
def warn(self, node, info):
"""Record a security error discovered during transformation."""
lineno = getattr(node, 'lineno', None)
self.warnings.append(
f'Line {lineno}: {info}')
def guard_iter(self, node):
"""
Converts:
for x in expr
to
for x in _getiter_(expr)
Also used for
* list comprehensions
* dict comprehensions
* set comprehensions
* generator expresions
"""
node = self.node_contents_visit(node)
if isinstance(node.target, ast.Tuple):
spec = self.gen_unpack_spec(node.target)
new_iter = ast.Call(
func=ast.Name('_iter_unpack_sequence_', ast.Load()),
args=[node.iter, spec, ast.Name('_getiter_', ast.Load())],
keywords=[])
else:
new_iter = ast.Call(
func=ast.Name("_getiter_", ast.Load()),
args=[node.iter],
keywords=[])
copy_locations(new_iter, node.iter)
node.iter = new_iter
return node
def is_starred(self, ob):
return isinstance(ob, ast.Starred)
def gen_unpack_spec(self, tpl):
"""Generate a specification for 'guarded_unpack_sequence'.
This spec is used to protect sequence unpacking.
The primary goal of this spec is to tell which elements in a sequence
are sequences again. These 'child' sequences have to be protected
again.
For example there is a sequence like this:
(a, (b, c), (d, (e, f))) = g
On a higher level the spec says:
- There is a sequence of len 3
- The element at index 1 is a sequence again with len 2
- The element at index 2 is a sequence again with len 2
- The element at index 1 in this subsequence is a sequence again
with len 2
With this spec 'guarded_unpack_sequence' does something like this for
protection (len checks are omitted):
t = list(_getiter_(g))
t[1] = list(_getiter_(t[1]))
t[2] = list(_getiter_(t[2]))
t[2][1] = list(_getiter_(t[2][1]))
return t
The 'real' spec for the case above is then:
spec = {
'min_len': 3,
'childs': (
(1, {'min_len': 2, 'childs': ()}),
(2, {
'min_len': 2,
'childs': (
(1, {'min_len': 2, 'childs': ()})
)
}
)
)
}
So finally the assignment above is converted into:
(a, (b, c), (d, (e, f))) = guarded_unpack_sequence(g, spec)
"""
spec = ast.Dict(keys=[], values=[])
spec.keys.append(ast.Constant('childs'))
spec.values.append(ast.Tuple([], ast.Load()))
# starred elements in a sequence do not contribute into the min_len.
# For example a, b, *c = g
# g must have at least 2 elements, not 3. 'c' is empyt if g has only 2.
min_len = len([ob for ob in tpl.elts if not self.is_starred(ob)])
offset = 0
for idx, val in enumerate(tpl.elts):
# After a starred element specify the child index from the back.
# Since it is unknown how many elements from the sequence are
# consumed by the starred element.
# For example a, *b, (c, d) = g
# Then (c, d) has the index '-1'
if self.is_starred(val):
offset = min_len + 1
elif isinstance(val, ast.Tuple):
el = ast.Tuple([], ast.Load())
el.elts.append(ast.Constant(idx - offset))
el.elts.append(self.gen_unpack_spec(val))
spec.values[0].elts.append(el)
spec.keys.append(ast.Constant('min_len'))
spec.values.append(ast.Constant(min_len))
return spec
def protect_unpack_sequence(self, target, value):
spec = self.gen_unpack_spec(target)
return ast.Call(
func=ast.Name('_unpack_sequence_', ast.Load()),
args=[value, spec, ast.Name('_getiter_', ast.Load())],
keywords=[])
def gen_unpack_wrapper(self, node, target):
"""Helper function to protect tuple unpacks.
node: used to copy the locations for the new nodes.
target: is the tuple which must be protected.
It returns a tuple with two element.
Element 1: Is a temporary name node which must be used to
replace the target.
The context (store, param) is defined
by the 'ctx' parameter..
Element 2: Is a try .. finally where the body performs the
protected tuple unpack of the temporary variable
into the original target.
"""
# Generate a tmp name to replace the tuple with.
tmp_name = self.gen_tmp_name()
# Generates an expressions which protects the unpack.
# converter looks like 'wrapper(tmp_name)'.
# 'wrapper' takes care to protect sequence unpacking with _getiter_.
converter = self.protect_unpack_sequence(
target,
ast.Name(tmp_name, ast.Load()))
# Assign the expression to the original names.
# Cleanup the temporary variable.
# Generates:
# try:
# # converter is 'wrapper(tmp_name)'
# arg = converter
# finally:
# del tmp_arg
try_body = [ast.Assign(targets=[target], value=converter)]
finalbody = [self.gen_del_stmt(tmp_name)]
cleanup = ast.Try(
body=try_body, finalbody=finalbody, handlers=[], orelse=[])
# This node is used to catch the tuple in a tmp variable.
tmp_target = ast.Name(tmp_name, ast.Store())
copy_locations(tmp_target, node)
copy_locations(cleanup, node)
return (tmp_target, cleanup)
def gen_none_node(self):
return ast.NameConstant(value=None)
def gen_del_stmt(self, name_to_del):
return ast.Delete(targets=[ast.Name(name_to_del, ast.Del())])
def transform_slice(self, slice_):
"""Transform slices into function parameters.
ast.Slice nodes are only allowed within a ast.Subscript node.
To use a slice as an argument of ast.Call it has to be converted.
Conversion is done by calling the 'slice' function from builtins
"""
if isinstance(slice_, ast.expr):
# Python 3.9+
return slice_
elif isinstance(slice_, ast.Index):
return slice_.value
elif isinstance(slice_, ast.Slice):
# Create a python slice object.
args = []
if slice_.lower:
args.append(slice_.lower)
else:
args.append(self.gen_none_node())
if slice_.upper:
args.append(slice_.upper)
else:
args.append(self.gen_none_node())
if slice_.step:
args.append(slice_.step)
else:
args.append(self.gen_none_node())
return ast.Call(
func=ast.Name('slice', ast.Load()),
args=args,
keywords=[])
elif isinstance(slice_, ast.ExtSlice):
dims = ast.Tuple([], ast.Load())
for item in slice_.dims:
dims.elts.append(self.transform_slice(item))
return dims
else: # pragma: no cover
# Index, Slice and ExtSlice are only defined Slice types.
raise NotImplementedError(f"Unknown slice type: {slice_}")
def check_name(self, node, name, allow_magic_methods=False):
"""Check names if they are allowed.
If ``allow_magic_methods is True`` names in `ALLOWED_FUNC_NAMES`
are additionally allowed although their names start with `_`.
"""
if name is None:
return
if (name.startswith('_')
and name != '_'
and not (allow_magic_methods
and name in ALLOWED_FUNC_NAMES
and node.col_offset != 0)):
self.error(
node,
'"{name}" is an invalid variable name because it '
'starts with "_"'.format(name=name))
elif name.endswith('__roles__'):
self.error(node, '"%s" is an invalid variable name because '
'it ends with "__roles__".' % name)
elif name in FORBIDDEN_FUNC_NAMES:
self.error(node, f'"{name}" is a reserved name.')
def check_function_argument_names(self, node):
for arg in node.args.args:
self.check_name(node, arg.arg)
if node.args.vararg:
self.check_name(node, node.args.vararg.arg)
if node.args.kwarg:
self.check_name(node, node.args.kwarg.arg)
for arg in node.args.kwonlyargs:
self.check_name(node, arg.arg)
def check_import_names(self, node):
"""Check the names being imported.
This is a protection against rebinding dunder names like
_getitem_, _write_ via imports.
=> 'from _a import x' is ok, because '_a' is not added to the scope.
"""
for name in node.names:
if '*' in name.name:
self.error(node, '"*" imports are not allowed.')
self.check_name(node, name.name)
if name.asname:
self.check_name(node, name.asname)
return self.node_contents_visit(node)
def inject_print_collector(self, node, position=0):
print_used = self.print_info.print_used
printed_used = self.print_info.printed_used
if print_used or printed_used:
# Add '_print = _print_(_getattr_)' add the top of a
# function/module.
_print = ast.Assign(
targets=[ast.Name('_print', ast.Store())],
value=ast.Call(
func=ast.Name("_print_", ast.Load()),
args=[ast.Name("_getattr_", ast.Load())],
keywords=[]))
if isinstance(node, ast.Module):
_print.lineno = position
_print.col_offset = position
_print.end_lineno = position
_print.end_col_offset = position
ast.fix_missing_locations(_print)
else:
copy_locations(_print, node)
node.body.insert(position, _print)
if not printed_used:
self.warn(node, "Prints, but never reads 'printed' variable.")
elif not print_used:
self.warn(node, "Doesn't print, but reads 'printed' variable.")
# Special Functions for an ast.NodeTransformer
def generic_visit(self, node):
"""Reject ast nodes which do not have a corresponding `visit_` method.
This is needed to prevent new ast nodes from new Python versions to be
trusted before any security review.
To access `generic_visit` on the super class use `node_contents_visit`.
"""
self.warn(
node,
'{0.__class__.__name__}'
' statement is not known to RestrictedPython'.format(node)
)
self.not_allowed(node)
def not_allowed(self, node):
self.error(
node,
f'{node.__class__.__name__} statements are not allowed.')
def node_contents_visit(self, node):
"""Visit the contents of a node."""
return super().generic_visit(node)
# ast for Literals
def visit_Constant(self, node):
"""Allow constant literals with restriction for Ellipsis.
Constant replaces Num, Str, Bytes, NameConstant and Ellipsis in
Python 3.8+.
:see: https://docs.python.org/dev/whatsnew/3.8.html#deprecated
"""
if node.value is Ellipsis:
# Deny using `...`.
# Special handling necessary as ``self.not_allowed(node)``
# would return the Error Message:
# 'Constant statements are not allowed.'
# which is only partial true.
self.error(node, 'Ellipsis statements are not allowed.')
return
return self.node_contents_visit(node)
def visit_Interactive(self, node):
"""Allow single mode without restrictions."""
return self.node_contents_visit(node)
def visit_List(self, node):
"""Allow list literals without restrictions."""
return self.node_contents_visit(node)
def visit_Tuple(self, node):
"""Allow tuple literals without restrictions."""
return self.node_contents_visit(node)
def visit_Set(self, node):
"""Allow set literals without restrictions."""
return self.node_contents_visit(node)
def visit_Dict(self, node):
"""Allow dict literals without restrictions."""
return self.node_contents_visit(node)
def visit_FormattedValue(self, node):
"""Allow f-strings without restrictions."""
return self.node_contents_visit(node)
def visit_JoinedStr(self, node):
"""Allow joined string without restrictions."""
return self.node_contents_visit(node)
# ast for Variables
def visit_Name(self, node):
"""Prevents access to protected names.
Converts use of the name 'printed' to this expression: '_print()'
"""
node = self.node_contents_visit(node)
if isinstance(node.ctx, ast.Load):
if node.id == 'printed':
self.print_info.printed_used = True
new_node = ast.Call(
func=ast.Name("_print", ast.Load()),
args=[],
keywords=[])
copy_locations(new_node, node)
return new_node
elif node.id == 'print':
self.print_info.print_used = True
new_node = ast.Attribute(
value=ast.Name('_print', ast.Load()),
attr="_call_print",
ctx=ast.Load())
copy_locations(new_node, node)
return new_node
self.used_names[node.id] = True
self.check_name(node, node.id)
return node
def visit_Load(self, node):
"""
"""
return self.node_contents_visit(node)
def visit_Store(self, node):
"""
"""
return self.node_contents_visit(node)
def visit_Del(self, node):
"""
"""
return self.node_contents_visit(node)
def visit_Starred(self, node):
"""
"""
return self.node_contents_visit(node)
# Expressions
def visit_Expression(self, node):
"""Allow Expression statements without restrictions.
They are in the AST when using the `eval` compile mode.
"""
return self.node_contents_visit(node)
def visit_Expr(self, node):
"""Allow Expr statements (any expression) without restrictions."""
return self.node_contents_visit(node)
def visit_UnaryOp(self, node):
"""
UnaryOp (Unary Operations) is the overall element for:
* Not --> which should be allowed
* UAdd --> Positive notation of variables (e.g. +var)
* USub --> Negative notation of variables (e.g. -var)
"""
return self.node_contents_visit(node)
def visit_UAdd(self, node):
"""Allow positive notation of variables. (e.g. +var)"""
return self.node_contents_visit(node)
def visit_USub(self, node):
"""Allow negative notation of variables. (e.g. -var)"""
return self.node_contents_visit(node)
def visit_Not(self, node):
"""Allow the `not` operator."""
return self.node_contents_visit(node)
def visit_Invert(self, node):
"""Allow `~` expressions."""
return self.node_contents_visit(node)
def visit_BinOp(self, node):
"""Allow binary operations."""
return self.node_contents_visit(node)
def visit_Add(self, node):
"""Allow `+` expressions."""
return self.node_contents_visit(node)
def visit_Sub(self, node):
"""Allow `-` expressions."""
return self.node_contents_visit(node)
def visit_Mult(self, node):
"""Allow `*` expressions."""
return self.node_contents_visit(node)
def visit_Div(self, node):
"""Allow `/` expressions."""
return self.node_contents_visit(node)
def visit_FloorDiv(self, node):
"""Allow `//` expressions."""
return self.node_contents_visit(node)
def visit_Mod(self, node):
"""Allow `%` expressions."""
return self.node_contents_visit(node)
def visit_Pow(self, node):
"""Allow `**` expressions."""
return self.node_contents_visit(node)
def visit_LShift(self, node):
"""Allow `<<` expressions."""
return self.node_contents_visit(node)
def visit_RShift(self, node):
"""Allow `>>` expressions."""
return self.node_contents_visit(node)
def visit_BitOr(self, node):
"""Allow `|` expressions."""
return self.node_contents_visit(node)
def visit_BitXor(self, node):
"""Allow `^` expressions."""
return self.node_contents_visit(node)
def visit_BitAnd(self, node):
"""Allow `&` expressions."""
return self.node_contents_visit(node)
def visit_MatMult(self, node):
"""Allow multiplication (`@`)."""
return self.node_contents_visit(node)
def visit_BoolOp(self, node):
"""Allow bool operator without restrictions."""
return self.node_contents_visit(node)
def visit_And(self, node):
"""Allow bool operator `and` without restrictions."""
return self.node_contents_visit(node)
def visit_Or(self, node):
"""Allow bool operator `or` without restrictions."""
return self.node_contents_visit(node)
def visit_Compare(self, node):
"""Allow comparison expressions without restrictions."""
return self.node_contents_visit(node)
def visit_Eq(self, node):
"""Allow == expressions."""
return self.node_contents_visit(node)
def visit_NotEq(self, node):
"""Allow != expressions."""
return self.node_contents_visit(node)
def visit_Lt(self, node):
"""Allow < expressions."""
return self.node_contents_visit(node)
def visit_LtE(self, node):
"""Allow <= expressions."""
return self.node_contents_visit(node)
def visit_Gt(self, node):
"""Allow > expressions."""
return self.node_contents_visit(node)
def visit_GtE(self, node):
"""Allow >= expressions."""
return self.node_contents_visit(node)
def visit_Is(self, node):
"""Allow `is` expressions."""
return self.node_contents_visit(node)
def visit_IsNot(self, node):
"""Allow `is not` expressions."""
return self.node_contents_visit(node)
def visit_In(self, node):
"""Allow `in` expressions."""
return self.node_contents_visit(node)
def visit_NotIn(self, node):
"""Allow `not in` expressions."""
return self.node_contents_visit(node)
def visit_Call(self, node):
"""Checks calls with '*args' and '**kwargs'.
Note: The following happens only if '*args' or '**kwargs' is used.
Transfroms 'foo(<all the possible ways of args>)' into
_apply_(foo, <all the possible ways for args>)
The thing is that '_apply_' has only '*args', '**kwargs', so it gets
Python to collapse all the myriad ways to call functions
into one manageable from.
From there, '_apply_()' wraps args and kws in guarded accessors,
then calls the function, returning the value.
"""
if isinstance(node.func, ast.Name):
if node.func.id == 'exec':
self.error(node, 'Exec calls are not allowed.')
elif node.func.id == 'eval':
self.error(node, 'Eval calls are not allowed.')
needs_wrap = False
for pos_arg in node.args:
if isinstance(pos_arg, ast.Starred):
needs_wrap = True
for keyword_arg in node.keywords:
if keyword_arg.arg is None:
needs_wrap = True
node = self.node_contents_visit(node)
if not needs_wrap:
return node
node.args.insert(0, node.func)
node.func = ast.Name('_apply_', ast.Load())
copy_locations(node.func, node.args[0])
return node
def visit_keyword(self, node):
"""
"""
return self.node_contents_visit(node)
def visit_IfExp(self, node):
"""Allow `if` expressions without restrictions."""
return self.node_contents_visit(node)
def visit_Attribute(self, node):
"""Checks and mutates attribute access/assignment.
'a.b' becomes '_getattr_(a, "b")'
'a.b = c' becomes '_write_(a).b = c'
'del a.b' becomes 'del _write_(a).b'
The _write_ function should return a security proxy.
"""
if node.attr.startswith('_') and node.attr != '_':
self.error(
node,
'"{name}" is an invalid attribute name because it starts '
'with "_".'.format(name=node.attr))
if node.attr.endswith('__roles__'):
self.error(
node,
'"{name}" is an invalid attribute name because it ends '
'with "__roles__".'.format(name=node.attr))
if node.attr in INSPECT_ATTRIBUTES:
self.error(
node,
f'"{node.attr}" is a restricted name,'
' that is forbidden to access in RestrictedPython.',
)
if isinstance(node.ctx, ast.Load):
node = self.node_contents_visit(node)
new_node = ast.Call(
func=ast.Name('_getattr_', ast.Load()),
args=[node.value, ast.Constant(node.attr)],
keywords=[])
copy_locations(new_node, node)
return new_node
elif isinstance(node.ctx, (ast.Store, ast.Del)):
node = self.node_contents_visit(node)
new_value = ast.Call(
func=ast.Name('_write_', ast.Load()),
args=[node.value],
keywords=[])
copy_locations(new_value, node.value)
node.value = new_value
return node
else: # pragma: no cover
# Impossible Case only ctx Load, Store and Del are defined in ast.
raise NotImplementedError(
f"Unknown ctx type: {type(node.ctx)}")
# Subscripting
def visit_Subscript(self, node):
"""Transforms all kinds of subscripts.
'foo[bar]' becomes '_getitem_(foo, bar)'
'foo[:ab]' becomes '_getitem_(foo, slice(None, ab, None))'
'foo[ab:]' becomes '_getitem_(foo, slice(ab, None, None))'
'foo[a:b]' becomes '_getitem_(foo, slice(a, b, None))'
'foo[a:b:c]' becomes '_getitem_(foo, slice(a, b, c))'
'foo[a, b:c] becomes '_getitem_(foo, (a, slice(b, c, None)))'
'foo[a] = c' becomes '_write_(foo)[a] = c'
'del foo[a]' becomes 'del _write_(foo)[a]'
The _write_ function should return a security proxy.
"""
node = self.node_contents_visit(node)
# 'AugStore' and 'AugLoad' are defined in 'Python.asdl' as possible
# 'expr_context'. However, according to Python/ast.c
# they are NOT used by the implementation => No need to worry here.
# Instead ast.c creates 'AugAssign' nodes, which can be visited.
if isinstance(node.ctx, ast.Load):
new_node = ast.Call(
func=ast.Name('_getitem_', ast.Load()),
args=[node.value, self.transform_slice(node.slice)],
keywords=[])
copy_locations(new_node, node)
return new_node
elif isinstance(node.ctx, (ast.Del, ast.Store)):
new_value = ast.Call(
func=ast.Name('_write_', ast.Load()),
args=[node.value],
keywords=[])
copy_locations(new_value, node)
node.value = new_value
return node
else: # pragma: no cover
# Impossible Case only ctx Load, Store and Del are defined in ast.
raise NotImplementedError(
f"Unknown ctx type: {type(node.ctx)}")
def visit_Index(self, node):
"""
"""
return self.node_contents_visit(node)
def visit_Slice(self, node):
"""
"""
return self.node_contents_visit(node)
def visit_ExtSlice(self, node):
"""
"""
return self.node_contents_visit(node)
# Comprehensions
def visit_ListComp(self, node):
"""
"""
return self.node_contents_visit(node)
def visit_SetComp(self, node):
"""
"""
return self.node_contents_visit(node)
def visit_GeneratorExp(self, node):
"""
"""
return self.node_contents_visit(node)
def visit_DictComp(self, node):
"""
"""
return self.node_contents_visit(node)
def visit_comprehension(self, node):
"""
"""
return self.guard_iter(node)
# Statements
def visit_Assign(self, node):
"""
"""
node = self.node_contents_visit(node)
if not any(isinstance(t, ast.Tuple) for t in node.targets):
return node
# Handle sequence unpacking.
# For briefness this example omits cleanup of the temporary variables.
# Check 'transform_tuple_assign' how its done.
#
# - Single target (with nested support)
# (a, (b, (c, d))) = <exp>
# is converted to
# (a, t1) = _getiter_(<exp>)
# (b, t2) = _getiter_(t1)
# (c, d) = _getiter_(t2)
#
# - Multi targets
# (a, b) = (c, d) = <exp>
# is converted to
# (c, d) = _getiter_(<exp>)
# (a, b) = _getiter_(<exp>)
# Why is this valid ? The original bytecode for this multi targets
# behaves the same way.
# ast.NodeTransformer works with list results.
# He injects it at the right place of the node's parent statements.
new_nodes = []
# python fills the right most target first.
for target in reversed(node.targets):
if isinstance(target, ast.Tuple):
wrapper = ast.Assign(
targets=[target],
value=self.protect_unpack_sequence(target, node.value))
new_nodes.append(wrapper)
else:
new_node = ast.Assign(targets=[target], value=node.value)
new_nodes.append(new_node)
for new_node in new_nodes:
copy_locations(new_node, node)
return new_nodes
def visit_AugAssign(self, node):
"""Forbid certain kinds of AugAssign
According to the language reference (and ast.c) the following nodes
are are possible:
Name, Attribute, Subscript
Note that although augmented assignment of attributes and
subscripts is disallowed, augmented assignment of names (such
as 'n += 1') is allowed.
'n += 1' becomes 'n = _inplacevar_("+=", n, 1)'
"""
node = self.node_contents_visit(node)
if isinstance(node.target, ast.Attribute):
self.error(
node,
"Augmented assignment of attributes is not allowed.")
return node
elif isinstance(node.target, ast.Subscript):
self.error(
node,
"Augmented assignment of object items "
"and slices is not allowed.")
return node
elif isinstance(node.target, ast.Name):
new_node = ast.Assign(
targets=[node.target],
value=ast.Call(
func=ast.Name('_inplacevar_', ast.Load()),
args=[
ast.Constant(IOPERATOR_TO_STR[type(node.op)]),
ast.Name(node.target.id, ast.Load()),
node.value
],
keywords=[]))
copy_locations(new_node, node)
return new_node
else: # pragma: no cover
# Impossible Case - Only Node Types:
# * Name
# * Attribute
# * Subscript
# defined, those are checked before.
raise NotImplementedError(
f"Unknown target type: {type(node.target)}")
def visit_Raise(self, node):
"""Allow `raise` statements without restrictions."""
return self.node_contents_visit(node)
def visit_Assert(self, node):
"""Allow assert statements without restrictions."""
return self.node_contents_visit(node)
def visit_Delete(self, node):
"""Allow `del` statements without restrictions."""
return self.node_contents_visit(node)
def visit_Pass(self, node):
"""Allow `pass` statements without restrictions."""
return self.node_contents_visit(node)
# Imports
def visit_Import(self, node):
"""Allow `import` statements with restrictions.
See check_import_names."""
return self.check_import_names(node)
def visit_ImportFrom(self, node):
"""Allow `import from` statements with restrictions.
See check_import_names."""
return self.check_import_names(node)
def visit_alias(self, node):
"""Allow `as` statements in import and import from statements."""
return self.node_contents_visit(node)
# Control flow
def visit_If(self, node):
"""Allow `if` statements without restrictions."""
return self.node_contents_visit(node)
def visit_For(self, node):
"""Allow `for` statements with some restrictions."""
return self.guard_iter(node)
def visit_While(self, node):
"""Allow `while` statements."""
return self.node_contents_visit(node)
def visit_Break(self, node):
"""Allow `break` statements without restrictions."""
return self.node_contents_visit(node)
def visit_Continue(self, node):
"""Allow `continue` statements without restrictions."""
return self.node_contents_visit(node)
def visit_Try(self, node):
"""Allow `try` without restrictions."""
return self.node_contents_visit(node)
def visit_TryStar(self, node):
"""Disallow `ExceptionGroup` due to a potential sandbox escape."""
self.not_allowed(node)
def visit_ExceptHandler(self, node):
"""Protect exception handlers."""
node = self.node_contents_visit(node)
self.check_name(node, node.name)
return node
def visit_With(self, node):
"""Protect tuple unpacking on with statements."""
node = self.node_contents_visit(node)
for item in reversed(node.items):
if isinstance(item.optional_vars, ast.Tuple):
tmp_target, unpack = self.gen_unpack_wrapper(
node,
item.optional_vars)
item.optional_vars = tmp_target
node.body.insert(0, unpack)
return node
def visit_withitem(self, node):
"""Allow `with` statements (context managers) without restrictions."""
return self.node_contents_visit(node)
# Function and class definitions
def visit_FunctionDef(self, node):
"""Allow function definitions (`def`) with some restrictions."""
self.check_name(node, node.name, allow_magic_methods=True)
self.check_function_argument_names(node)
with self.print_info.new_print_scope():
node = self.node_contents_visit(node)
self.inject_print_collector(node)
return node
def visit_Lambda(self, node):
"""Allow lambda with some restrictions."""
self.check_function_argument_names(node)
return self.node_contents_visit(node)
def visit_arguments(self, node):
"""
"""
return self.node_contents_visit(node)
def visit_arg(self, node):
"""
"""
return self.node_contents_visit(node)
def visit_Return(self, node):
"""Allow `return` statements without restrictions."""
return self.node_contents_visit(node)
def visit_Yield(self, node):
"""Allow `yield`statements without restrictions."""
return self.node_contents_visit(node)
def visit_YieldFrom(self, node):
"""Allow `yield`statements without restrictions."""
return self.node_contents_visit(node)
def visit_Global(self, node):
"""Allow `global` statements without restrictions."""
return self.node_contents_visit(node)
def visit_Nonlocal(self, node):
"""Deny `nonlocal` statements."""
self.not_allowed(node)
def visit_ClassDef(self, node):
"""Check the name of a class definition."""
self.check_name(node, node.name)
node = self.node_contents_visit(node)
if any(keyword.arg == 'metaclass' for keyword in node.keywords):
self.error(
node, 'The keyword argument "metaclass" is not allowed.')
CLASS_DEF = textwrap.dedent('''\
class {0.name}(metaclass=__metaclass__):
pass
'''.format(node))
new_class_node = ast.parse(CLASS_DEF).body[0]
new_class_node.body = node.body
new_class_node.bases = node.bases
new_class_node.decorator_list = node.decorator_list
return new_class_node
def visit_Module(self, node):
"""Add the print_collector (only if print is used) at the top."""
node = self.node_contents_visit(node)
# Inject the print collector after 'from __future__ import ....'
position = 0
for position, child in enumerate(node.body): # pragma: no branch
if not isinstance(child, ast.ImportFrom):
break
if not child.module == '__future__':
break
self.inject_print_collector(node, position)
return node
# Async und await
def visit_AsyncFunctionDef(self, node):
"""Deny async functions."""
self.not_allowed(node)
def visit_Await(self, node):
"""Deny async functionality."""
self.not_allowed(node)
def visit_AsyncFor(self, node):
"""Deny async functionality."""
self.not_allowed(node)
def visit_AsyncWith(self, node):
"""Deny async functionality."""
self.not_allowed(node)
# Assignment expressions (walrus operator ``:=``)
# New in 3.8
def visit_NamedExpr(self, node):
"""Allow assignment expressions under some circumstances."""
# while the grammar requires ``node.target`` to be a ``Name``
# the abstract syntax is more permissive and allows an ``expr``.
# We support only a ``Name``.
# This is safe as the expression can only add/modify local
# variables. While this may hide global variables, an
# (implicitly performed) name check guarantees (as usual)
# that no essential global variable is hidden.
node = self.node_contents_visit(node) # this checks ``node.target``
target = node.target
if not isinstance(target, ast.Name):
self.error(
node,
"Assignment expressions are only allowed for simple targets")
return node