from __future__ import absolute_import
import ast
import atexit
import inspect
import linecache
import os
import pdb
import re
import sys
import tokenize
from distutils.sysconfig import get_python_lib
from functools import partial
from itertools import chain
from colorama import AnsiToWin32
from colorama import Back
from colorama import Fore
from colorama import Style
from fields import Fields
from six import string_types
__version__ = "0.6.0"
__all__ = 'Q', 'When', 'And', 'Or', 'CodePrinter', 'Debugger', 'VarsPrinter', 'trace', 'stop'
DEFAULT_MIN_FILENAME_ALIGNMENT = 40
NO_COLORS = {
'reset': '',
'filename': '',
'colon': '',
'lineno': '',
'kind': '',
'continuation': '',
'return': '',
'exception': '',
'detail': '',
'vars': '',
'vars-name': '',
'call': '',
'line': '',
'internal-failure': '',
'internal-detail': '',
'source-failure': '',
'source-detail': '',
}
EVENT_COLORS = {
'reset': Style.RESET_ALL,
'filename': '',
'colon': Fore.BLACK + Style.BRIGHT,
'lineno': Style.RESET_ALL,
'kind': Fore.CYAN,
'continuation': Fore.BLUE,
'return': Style.BRIGHT + Fore.GREEN,
'exception': Style.BRIGHT + Fore.RED,
'detail': Style.NORMAL,
'vars': Style.RESET_ALL + Fore.MAGENTA,
'vars-name': Style.BRIGHT,
'internal-failure': Back.RED + Style.BRIGHT + Fore.RED,
'internal-detail': Fore.WHITE,
'source-failure': Style.BRIGHT + Back.YELLOW + Fore.YELLOW,
'source-detail': Fore.WHITE,
}
CODE_COLORS = {
'call': Fore.RESET + Style.BRIGHT,
'line': Fore.RESET,
'return': Fore.YELLOW,
'exception': Fore.RED,
}
SITE_PACKAGES_PATH = get_python_lib()
SYS_PREFIX_PATHS = (
sys.prefix,
sys.exec_prefix
)
def tryadd(where, src, what):
if hasattr(src, what):
where += getattr(src, what),
tryadd(SYS_PREFIX_PATHS, sys, 'real_prefix')
tryadd(SYS_PREFIX_PATHS, sys, 'real_exec_prefix')
tryadd(SYS_PREFIX_PATHS, sys, 'base_prefix')
tryadd(SYS_PREFIX_PATHS, sys, 'base_exec_prefix')
class Tracer(object):
"""
Trace object.
"""
def __init__(self):
self._handler = None
self._previous_tracer = None
def __str__(self):
return "Tracer(_handler={}, _previous_tracer={})".format(
"<not started>" if self._handler is None else self._handler,
self._previous_tracer,
)
def __call__(self, frame, kind, arg):
"""
The settrace function.
.. note::
This always returns self (drills down) - as opposed to only drilling down when predicate(event) is True because it might
match further inside.
"""
if self._handler is None:
raise RuntimeError("Tracer is not started.")
self._handler(Event(frame, kind, arg, self))
if self._previous_tracer:
self._previous_tracer(frame, kind, arg)
return self
def trace(self, *predicates, **options):
"""
Starts tracing. Can be used as a context manager (with slightly incorrect semantics - it starts tracing before ``__enter__`` is
called).
Args:
predicates (:class:`hunter.Q` instances): Runs actions if any of the given predicates match.
options: Keyword arguments that are passed to :class:`hunter.Q`, for convenience.
"""
if "action" not in options and "actions" not in options:
options["action"] = CodePrinter
merge = options.pop("merge", True)
clear_env_var = options.pop("clear_env_var", False)
predicate = Q(*predicates, **options)
if clear_env_var:
os.environ.pop("PYTHONHUNTER", None)
previous_tracer = sys.gettrace()
if previous_tracer is self:
if merge:
self._handler |= predicate
else:
self._previous_tracer = previous_tracer
self._handler = predicate
sys.settrace(self)
return self
def stop(self):
"""
Stop tracing. Restores previous tracer (if any).
"""
sys.settrace(self._previous_tracer)
self._previous_tracer = None
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.stop()
_tracer = Tracer()
trace = _tracer.trace
stop = atexit.register(_tracer.stop)
class _CachedProperty(object):
def __init__(self, func):
self.func = func
self.__doc__ = func.__doc__
def __get__(self, obj, cls):
if obj is None:
return self
value = obj.__dict__[self.func.__name__] = self.func(obj)
return value
[docs]class Event(Fields.kind.function.module.filename):
"""
Event wrapper for ``frame, kind, arg`` (the arguments the settrace function gets).
Provides few convenience properties.
"""
frame = None
kind = None
arg = None
tracer = None
def __init__(self, frame, kind, arg, tracer):
self.frame = frame
self.kind = kind
self.arg = arg
self.tracer = tracer
@_CachedProperty
[docs] def locals(self):
"""
A dict with local variables.
"""
return self.frame.f_locals
@_CachedProperty
[docs] def globals(self):
"""
A dict with global variables.
"""
return self.frame.f_globals
@_CachedProperty
[docs] def function(self):
"""
A string with function name.
"""
return self.code.co_name
@_CachedProperty
[docs] def module(self):
"""
A string with module name (eg: ``"foo.bar"``).
"""
module = self.frame.f_globals.get('__name__', '')
if module is None:
module = ''
return module
@_CachedProperty
[docs] def filename(self):
"""
A string with absolute path to file.
"""
filename = self.frame.f_globals.get('__file__', '')
if filename is None:
filename = ''
if filename.endswith(('.pyc', '.pyo')):
filename = filename[:-1]
elif filename.endswith('$py.class'): # Jython
filename = filename[:-9] + ".py"
return filename
@_CachedProperty
[docs] def lineno(self):
"""
An integer with line number in file.
"""
return self.frame.f_lineno
@_CachedProperty
[docs] def code(self):
"""
A code object (not a string).
"""
return self.frame.f_code
@_CachedProperty
[docs] def stdlib(self):
"""
A boolean flag. ``True`` if frame is in stdlib.
"""
if self.filename.startswith(SITE_PACKAGES_PATH):
# if it's in site-packages then its definitely not stdlib
return False
if self.filename.startswith(SYS_PREFIX_PATHS):
return True
@_CachedProperty
[docs] def fullsource(self, getlines=linecache.getlines):
"""
A string with the sourcecode for the current statement (from ``linecache`` - failures are ignored).
May include multiple lines if it's a class/function definition (will include decorators).
"""
try:
return self._raw_fullsource
except Exception as exc:
return "??? NO SOURCE: {!r}".format(exc)
@_CachedProperty
[docs] def source(self, getline=linecache.getline):
"""
A string with the sourcecode for the current line (from ``linecache`` - failures are ignored).
Fast but sometimes incomplete.
"""
try:
return getline(self.filename, self.lineno)
except Exception as exc:
return "??? NO SOURCE: {!r}".format(exc)
@_CachedProperty
def _raw_fullsource(self, getlines=linecache.getlines, getline=linecache.getline):
if self.kind == 'call' and self.code.co_name != "<module>":
lines = []
try:
for _, token, _, _, line in tokenize.generate_tokens(partial(
next,
yield_lines(self.filename, self.lineno - 1, lines.append)
)):
if token in ("def", "class", "lambda"):
return ''.join(lines)
except tokenize.TokenError:
pass
return getline(self.filename, self.lineno)
__getitem__ = object.__getattribute__
def yield_lines(filename, start, collector,
limit=10,
getlines=linecache.getlines,
leading_whitespace_re=re.compile('(^[ \t]*)(?:[^ \t\n])', re.MULTILINE)):
dedent = None
amount = 0
for line in getlines(filename)[start:start + limit]:
if dedent is None:
dedent = leading_whitespace_re.findall(line)
dedent = dedent[0] if dedent else ""
amount = len(dedent)
elif not line.startswith(dedent):
break
collector(line)
yield line[amount:]
[docs]def Q(*predicates, **query):
"""
Handles situations where :class:`hunter.Query` objects (or other callables) are passed in as positional arguments.
Conveniently converts that to an :class:`hunter.Or` predicate.
"""
optional_actions = query.pop("actions", [])
if "action" in query:
optional_actions.append(query.pop("action"))
if predicates:
predicates = tuple(
p() if inspect.isclass(p) and issubclass(p, Action) else p
for p in predicates
)
if any(isinstance(p, CodePrinter) for p in predicates):
if CodePrinter in optional_actions:
optional_actions.remove(CodePrinter)
if query:
predicates += Query(**query),
result = Or(*predicates)
else:
result = Query(**query)
if optional_actions:
result = When(result, *optional_actions)
return result
[docs]class Query(Fields.query):
"""
A query class.
See :class:`hunter.Event` for fields that can be filtered on.
"""
query = ()
allowed = tuple(i for i in Event.__dict__.keys() if not i.startswith('_'))
[docs] def __init__(self, **query):
"""
Args:
query: criteria to match on.
Accepted arguments: ``arg``, ``code``, ``filename``, ``frame``, ``fullsource``, ``function``,
``globals``, ``kind``, ``lineno``, ``locals``, ``module``, ``source``, ``stdlib``, ``tracer``.
"""
for key in query:
if key not in self.allowed:
raise TypeError("Unexpected argument {!r}. Must be one of {}.".format(key, self.allowed))
self.query = query
def __repr__(self):
return "Query({})".format(
', '.join("{}={!r}".format(*item) for item in self.query.items()),
)
[docs] def __call__(self, event):
"""
Handles event. Returns True if all criteria matched.
"""
for key, value in self.query.items():
if event[key] != value:
return
return True
[docs] def __or__(self, other):
"""
Convenience API so you can do ``Q() | Q()``. It converts that to ``Or(Q(), Q())``.
"""
return Or(self, other)
[docs] def __and__(self, other):
"""
Convenience API so you can do ``Q() & Q()``. It converts that to ``And(Q(), Q())``.
"""
return And(self, other)
[docs]class When(Fields.condition.actions):
"""
Runs ``actions`` when ``condition(event)`` is ``True``.
Actions take a single ``event`` argument.
"""
def __init__(self, condition, *actions):
if not actions:
raise TypeError("Must give at least one action.")
super(When, self).__init__(condition, [
action() if inspect.isclass(action) and issubclass(action, Action) else action
for action in actions
])
[docs] def __call__(self, event):
"""
Handles the event.
"""
if self.condition(event):
for action in self.actions:
action(event)
return True
def __or__(self, other):
return Or(self, other)
def __and__(self, other):
return And(self, other)
def _with_metaclass(meta, *bases):
"""Create a base class with a metaclass."""
# This requires a bit of explanation: the basic idea is to make a dummy
# metaclass for one level of class instantiation that replaces itself with
# the actual metaclass.
class metaclass(meta):
def __new__(cls, name, this_bases, d):
return meta(name, bases, d)
return type.__new__(metaclass, 'temporary_class', (), {})
class _UnwrapSingleArgumentMetaclass(type):
def __call__(cls, predicate, *predicates):
if not predicates:
return predicate
else:
all_predicates = []
for p in chain((predicate,), predicates):
if isinstance(p, cls):
all_predicates.extend(p.predicates)
else:
all_predicates.append(p)
return super(_UnwrapSingleArgumentMetaclass, cls).__call__(*all_predicates)
[docs]class And(_with_metaclass(_UnwrapSingleArgumentMetaclass, ~Fields.predicates)):
"""
`And` predicate. Exits at the first sub-predicate that returns ``False``.
"""
def __init__(self, *predicates):
self.predicates = predicates
def __str__(self):
return "And({})".format(', '.join(str(p) for p in self.predicates))
[docs] def __call__(self, event):
"""
Handles the event.
"""
for predicate in self.predicates:
if not predicate(event):
return
return True
def __or__(self, other):
return Or(self, other)
def __and__(self, other):
return And(*chain(self.predicates, other.predicates if isinstance(other, And) else (other,)))
[docs]class Or(_with_metaclass(_UnwrapSingleArgumentMetaclass, ~Fields.predicates)):
"""
`Or` predicate. Exits at first sub-predicate that returns ``True``.
"""
def __init__(self, *predicates):
self.predicates = predicates
[docs] def __call__(self, event):
"""
Handles the event.
"""
for predicate in self.predicates:
if predicate(event):
return True
def __or__(self, other):
return Or(*chain(self.predicates, other.predicates if isinstance(other, Or) else (other,)))
def __and__(self, other):
return And(self, other)
class Action(object):
def __call__(self, event):
raise NotImplementedError()
[docs]class Debugger(Fields.klass.kwargs, Action):
"""
An action that starts ``pdb``.
"""
def __init__(self, klass=pdb.Pdb, **kwargs):
self.klass = klass
self.kwargs = kwargs
[docs] def __call__(self, event):
"""
Runs a ``pdb.set_trace`` at the matching frame.
"""
self.klass(**self.kwargs).set_trace(event.frame)
class ColorStreamAction(Action):
_stream_cache = {}
_stream = None
_tty = None
default_stream = sys.stderr
force_colors = False
@property
def stream(self):
return self._stream
@stream.setter
def stream(self, value):
if isinstance(value, string_types):
if value in self._stream_cache:
value = self._stream_cache[value]
else:
value = self._stream_cache[value] = open(value, 'a', buffering=0)
isatty = getattr(value, 'isatty', None)
if self.force_colors or (isatty and isatty() and os.name != 'java'):
self._stream = AnsiToWin32(value)
self._tty = True
self.event_colors = EVENT_COLORS
self.code_colors = CODE_COLORS
else:
self._tty = False
self._stream = value
self.event_colors = NO_COLORS
self.code_colors = NO_COLORS
def _safe_repr(self, obj):
try:
return repr(obj)
except Exception as exc:
return "{internal-failure}!!! FAILED REPR: {internal-detail}{!r}".format(exc, **self.event_colors)
[docs]class CodePrinter(Fields.stream.filename_alignment, ColorStreamAction):
"""
An action that just prints the code being executed.
Args:
stream (file-like): Stream to write to. Default: ``sys.stderr``.
filename_alignment (int): Default size for the filename column (files are right-aligned). Default: ``40``.
"""
def __init__(self,
stream=ColorStreamAction.default_stream, force_colors=False,
filename_alignment=DEFAULT_MIN_FILENAME_ALIGNMENT):
self.stream = stream
self.force_colors = force_colors
self.filename_alignment = max(5, filename_alignment)
def _safe_source(self, event):
try:
lines = event._raw_fullsource.rstrip().splitlines()
if not lines:
raise RuntimeError("Source code string is empty.")
return lines
except Exception as exc:
return "{source-failure}??? NO SOURCE: {source-detail}{!r}".format(exc, **self.event_colors),
[docs] def __call__(self, event, sep=os.path.sep, join=os.path.join):
"""
Handle event and print filename, line number and source code. If event.kind is a `return` or `exception` also prints values.
"""
filename = event.filename or "<???>"
if len(filename) > self.filename_alignment:
filename = '[...]{}'.format(filename[5 - self.filename_alignment:])
# context = event.tracer
# alignment = context.filename_alignment = max(
# getattr(context, 'filename_alignment', 5),
# len(filename)
# )
lines = self._safe_source(event)
self.stream.write("{filename}{:>{align}}{colon}:{lineno}{:<5} {kind}{:9} {code}{}{reset}\n".format(
filename,
event.lineno,
event.kind,
lines[0],
align=self.filename_alignment,
code=self.code_colors[event.kind],
**self.event_colors
))
for line in lines[1:]:
self.stream.write("{:>{align}} {kind}{:9} {code}{}{reset}\n".format(
"",
r" |",
line,
align=self.filename_alignment,
code=self.code_colors[event.kind],
**self.event_colors
))
if event.kind in ('return', 'exception'):
self.stream.write("{:>{align}} {continuation}{:9} {color}{} value: {detail}{}{reset}\n".format(
"",
"...",
event.kind,
self._safe_repr(event.arg),
align=self.filename_alignment,
color=self.event_colors[event.kind],
**self.event_colors
))
[docs]class VarsPrinter(Fields.names.globals.stream.filename_alignment, ColorStreamAction):
"""
An action that prints local variables and optionally global variables visible from the current executing frame.
Args:
*names (strings):
Names to evaluate. Expressions can be used (will only try to evaluate if all the variables are present on the frame.
stream (file-like): Stream to write to. Default: ``sys.stderr``.
filename_alignment (int): Default size for the filaneme column (files are right-aligned). Default: ``40``.
globals (bool): Allow access to globals. Default: ``False`` (only looks at locals).
"""
def __init__(self, *names, **options):
if not names:
raise TypeError("Must give at least one name/expression.")
self.stream = options.pop('stream', self.default_stream)
self.force_colors = options.pop('force_colors', False)
self.filename_alignment = max(5, options.pop('filename_alignment', DEFAULT_MIN_FILENAME_ALIGNMENT))
self.names = {
name: set(self._iter_symbols(name))
for name in names
}
self.globals = options.pop('globals', False)
@staticmethod
[docs] def _iter_symbols(code):
"""
Iterate all the variable names in the given expression.
Example:
* ``self.foobar`` yields ``self``
* ``self[foobar]`` yields `self`` and ``foobar``
"""
for node in ast.walk(ast.parse(code)):
if isinstance(node, ast.Name):
yield node.id
[docs] def _safe_eval(self, code, event):
"""
Try to evaluate the given code on the given frame. If failure occurs, returns some ugly string with exception.
"""
try:
return eval(code, event.globals if self.globals else {}, event.locals)
except Exception as exc:
return "{internal-failure}FAILED EVAL: {internal-detail}{!r}".format(exc, **self.event_colors)
[docs] def __call__(self, event):
"""
Handle event and print the specified variables.
"""
first = True
frame_symbols = set(event.locals)
if self.globals:
frame_symbols |= set(event.globals)
for code, symbols in self.names.items():
try:
obj = eval(code, event.globals if self.globals else {}, event.locals)
except AttributeError:
continue
except Exception as exc:
printout = "{internal-failure}FAILED EVAL: {internal-detail}{!r}".format(exc, **self.event_colors)
else:
printout = self._safe_repr(obj)
if frame_symbols >= symbols:
self.stream.write("{:>{align}} {vars}{:9} {vars-name}{} {vars}=> {reset}{}{reset}\n".format(
"",
"vars" if first else "...",
code,
printout,
align=self.filename_alignment,
**self.event_colors
))
first = False