Source code for hunter.actions

from __future__ import absolute_import

import ast
import os
import pdb
import sys

from colorama import AnsiToWin32
from colorama import Back
from colorama import Fore
from colorama import Style
from six import string_types

from .util import Fields

NO_COLORS = {
    'reset': '',
    'filename': '',
    'colon': '',
    'lineno': '',
    'kind': '',
    'continuation': '',
    'return': '',
    'exception': '',
    'detail': '',
    'vars': '',
    'vars-name': '',
    'call': '',
    'line': '',
    'internal-failure': '',
    'internal-detail': '',
    'source-failure': '',
    'source-detail': '',
}
EVENT_COLORS = {
    'reset': Style.RESET_ALL,
    'normal': Style.NORMAL,
    'filename': '',
    'colon': Style.BRIGHT + Fore.BLACK,
    'lineno': Style.RESET_ALL,
    'kind': Fore.CYAN,
    'continuation': Style.BRIGHT + Fore.BLUE,
    'call': Style.BRIGHT + Fore.BLUE,
    'return': Style.BRIGHT + Fore.GREEN,
    'exception': Style.BRIGHT + Fore.RED,
    'detail': Style.NORMAL,
    'vars': Style.RESET_ALL + Fore.MAGENTA,
    'vars-name': Style.BRIGHT,
    'internal-failure': Style.BRIGHT + Back.RED + Fore.RED,
    'internal-detail': Fore.WHITE,
    'source-failure': Style.BRIGHT + Back.YELLOW + Fore.YELLOW,
    'source-detail': Fore.WHITE,
}
CODE_COLORS = {
    'call': Fore.RESET + Style.BRIGHT,
    'line': Fore.RESET,
    'return': Fore.YELLOW,
    'exception': Fore.RED,
}
MISSING = type('MISSING', (), {'__repr__': lambda _: '?'})()


class Action(object):
    def __call__(self, event):
        raise NotImplementedError()


[docs]class Debugger(Fields.klass.kwargs, Action): """ An action that starts ``pdb``. """ def __init__(self, klass=pdb.Pdb, **kwargs): self.klass = klass self.kwargs = kwargs
[docs] def __call__(self, event): """ Runs a ``pdb.set_trace`` at the matching frame. """ self.klass(**self.kwargs).set_trace(event.frame)
class ColorStreamAction(Fields.stream.force_colors.filename_alignment.repr_limit, Action): _stream_cache = {} _stream = None _tty = None def __init__(self, stream=sys.stderr, force_colors=False, filename_alignment=40, repr_limit=1024): self.force_colors = force_colors self.stream = stream self.filename_alignment = max(5, filename_alignment) self.repr_limit = repr_limit @property def stream(self): return self._stream @stream.setter def stream(self, value): if isinstance(value, string_types): if value in self._stream_cache: value = self._stream_cache[value] else: value = self._stream_cache[value] = open(value, 'a', buffering=0) isatty = getattr(value, 'isatty', None) if self.force_colors or (isatty and isatty() and os.name != 'java'): self._stream = AnsiToWin32(value, strip=False) self._tty = True self.event_colors = EVENT_COLORS self.code_colors = CODE_COLORS else: self._tty = False self._stream = value self.event_colors = NO_COLORS self.code_colors = NO_COLORS def _safe_repr(self, obj): limit = self.repr_limit try: s = repr(obj) s = s.replace('\n', r'\n') if len(s) > limit: cutoff = limit // 2 return "{} {continuation}[...]{reset} {}".format(s[:cutoff], s[-cutoff:], **self.event_colors) else: return s except Exception as exc: return "{internal-failure}!!! FAILED REPR: {internal-detail}{!r}{reset}".format(exc, **self.event_colors)
[docs]class CodePrinter(ColorStreamAction): """ An action that just prints the code being executed. Args: stream (file-like): Stream to write to. Default: ``sys.stderr``. filename_alignment (int): Default size for the filename column (files are right-aligned). Default: ``40``. force_colors (bool): Force coloring. Default: ``False``. repr_limit (bool): Limit length of ``repr()`` output. Default: ``512``. """ def _safe_source(self, event): try: lines = event._raw_fullsource.rstrip().splitlines() if lines: return lines else: return "{source-failure}??? NO SOURCE: {source-detail}" \ "Source code string for module {!r} is empty.".format(event.module, **self.event_colors), return lines except Exception as exc: return "{source-failure}??? NO SOURCE: {source-detail}{!r}".format(exc, **self.event_colors), def _format_filename(self, event): filename = event.filename or "<???>" if len(filename) > self.filename_alignment: filename = '[...]{}'.format(filename[5 - self.filename_alignment:]) return filename
[docs] def __call__(self, event, sep=os.path.sep, join=os.path.join): """ Handle event and print filename, line number and source code. If event.kind is a `return` or `exception` also prints values. """ # context = event.tracer # alignment = context.filename_alignment = max( # getattr(context, 'filename_alignment', 5), # len(filename) # ) lines = self._safe_source(event) self.stream.write("{filename}{:>{align}}{colon}:{lineno}{:<5} {kind}{:9} {code}{}{reset}\n".format( self._format_filename(event), event.lineno, event.kind, lines[0], align=self.filename_alignment, code=self.code_colors[event.kind], **self.event_colors )) for line in lines[1:]: self.stream.write("{:>{align}} {kind}{:9} {code}{}{reset}\n".format( "", r" |", line, align=self.filename_alignment, code=self.code_colors[event.kind], **self.event_colors )) if event.kind in ('return', 'exception'): self.stream.write("{:>{align}} {continuation}{:9} {color}{} value: {detail}{}{reset}\n".format( "", "...", event.kind, self._safe_repr(event.arg), align=self.filename_alignment, color=self.event_colors[event.kind], **self.event_colors ))
[docs]class CallPrinter(CodePrinter): """ An action that just prints the code being executed, but unlike :obj:`hunter.CodePrinter` it indents based on callstack depth and it also shows ``repr()`` of function arguments. Args: stream (file-like): Stream to write to. Default: ``sys.stderr``. filename_alignment (int): Default size for the filename column (files are right-aligned). Default: ``40``. force_colors (bool): Force coloring. Default: ``False``. repr_limit (bool): Limit length of ``repr()`` output. Default: ``512``. .. versionadded:: 1.2.0 .. note:: This will be the default action in `hunter 2.0`. """ def __init__(self, **options): super(CallPrinter, self).__init__(**options) self.stack = []
[docs] def __call__(self, event, sep=os.path.sep, join=os.path.join): """ Handle event and print filename, line number and source code. If event.kind is a `return` or `exception` also prints values. """ filename = self._format_filename(event) ident = event.module, event.function if event.kind == 'call': code = event.code self.stack.append(ident) self.stream.write( "{filename}{:>{align}}{colon}:{lineno}{:<5} {kind}{:9} {}{call}=>{normal} " "{}({}{call}{normal}){reset}\n".format( filename, event.lineno, event.kind, ' ' * (len(self.stack) - 1), event.function, ', '.join('{vars}{vars-name}{0}{vars}={reset}{1}'.format( var, self._safe_repr(event.locals.get(var, MISSING)), **self.event_colors ) for var in code.co_varnames[:code.co_argcount]), align=self.filename_alignment, **self.event_colors )) elif event.kind in ('return', 'exception'): self.stream.write("{filename}{:>{align}}{colon}:{lineno}{:<5} {kind}{:9} {code}{}{}{normal} {}: {reset}{}\n".format( filename, event.lineno, event.kind, ' ' * (len(self.stack) - 1), {'return': '<=', 'exception': '<!'}[event.kind], event.function, self._safe_repr(event.arg), align=self.filename_alignment, code=self.event_colors[event.kind], **self.event_colors )) if self.stack and self.stack[-1] == ident: self.stack.pop() else: self.stream.write("{filename}{:>{align}}{colon}:{lineno}{:<5} {kind}{:9} {reset}{}{}\n".format( filename, event.lineno, event.kind, ' ' * len(self.stack), event.source.strip(), align=self.filename_alignment, code=self.code_colors[event.kind], **self.event_colors ))
[docs]class VarsPrinter(Fields.names.globals.stream.filename_alignment, ColorStreamAction): """ An action that prints local variables and optionally global variables visible from the current executing frame. Args: *names (strings): Names to evaluate. Expressions can be used (will only try to evaluate if all the variables are present on the frame. globals (bool): Allow access to globals. Default: ``False`` (only looks at locals). stream (file-like): Stream to write to. Default: ``sys.stderr``. filename_alignment (int): Default size for the filename column (files are right-aligned). Default: ``40``. force_colors (bool): Force coloring. Default: ``False``. repr_limit (bool): Limit length of ``repr()`` output. Default: ``512``. .. note:: This is the default action. .. warning:: In `hunter 2.0` the default action will be :obj:`hunter.CallPrinter`. """ def __init__(self, *names, **options): if not names: raise TypeError("VarsPrinter requires at least one variable name/expression.") self.names = { name: set(self._iter_symbols(name)) for name in names } self.globals = options.pop('globals', False) super(VarsPrinter, self).__init__(**options) @staticmethod def _iter_symbols(code): """ Iterate all the variable names in the given expression. Example: * ``self.foobar`` yields ``self`` * ``self[foobar]`` yields `self`` and ``foobar`` """ for node in ast.walk(ast.parse(code)): if isinstance(node, ast.Name): yield node.id def _safe_eval(self, code, event): """ Try to evaluate the given code on the given frame. If failure occurs, returns some ugly string with exception. """ try: return eval(code, event.globals if self.globals else {}, event.locals) except Exception as exc: return "{internal-failure}FAILED EVAL: {internal-detail}{!r}".format(exc, **self.event_colors)
[docs] def __call__(self, event): """ Handle event and print the specified variables. """ first = True frame_symbols = set(event.locals) if self.globals: frame_symbols |= set(event.globals) for code, symbols in self.names.items(): try: obj = eval(code, event.globals if self.globals else {}, event.locals) except AttributeError: continue except Exception as exc: printout = "{internal-failure}FAILED EVAL: {internal-detail}{!r}".format(exc, **self.event_colors) else: printout = self._safe_repr(obj) if frame_symbols >= symbols: self.stream.write("{:>{align}} {vars}{:9} {vars-name}{} {vars}=> {reset}{}{reset}\n".format( "", "vars" if first else "...", code, printout, align=self.filename_alignment, **self.event_colors )) first = False