Source code for hunter.actions

from __future__ import absolute_import

import ast
import os
import sys
import threading
from collections import defaultdict
from itertools import chain

from colorama import AnsiToWin32
from colorama import Back
from colorama import Fore
from colorama import Style
from six import string_types

from .util import Fields

# ANSI escape sequences for the non-source parts of an output line, keyed by
# the placeholder name used in the printers' format strings (the whole dict is
# expanded into ``str.format`` as keyword arguments).
EVENT_COLORS = {
    'reset': Style.RESET_ALL,
    'normal': Style.NORMAL,
    'filename': '',
    'colon': Style.BRIGHT + Fore.BLACK,
    'lineno': Style.RESET_ALL,
    'kind': Fore.CYAN,
    'continuation': Style.BRIGHT + Fore.BLUE,
    'call': Style.BRIGHT + Fore.BLUE,
    'return': Style.BRIGHT + Fore.GREEN,
    'exception': Style.BRIGHT + Fore.RED,
    'detail': Style.NORMAL,
    'vars': Style.RESET_ALL + Fore.MAGENTA,
    'vars-name': Style.BRIGHT,
    'internal-failure': Style.BRIGHT + Back.RED + Fore.RED,
    'internal-detail': Fore.WHITE,
    'source-failure': Style.BRIGHT + Back.YELLOW + Fore.YELLOW,
    'source-detail': Fore.WHITE,
}
# ANSI escape sequences applied to the source-code part of a line, keyed by
# event kind.
CODE_COLORS = {
    'call': Fore.RESET + Style.BRIGHT,
    'line': Fore.RESET,
    'return': Fore.YELLOW,
    'exception': Fore.RED,
}
# Every key of the two tables above mapped to the empty string; installed in
# place of both tables when output is not colored.
NO_COLORS = {key: '' for key in chain(CODE_COLORS, EVENT_COLORS)}
# Sentinel instance whose ``repr()`` is ``'?'``; used as the fallback value when
# an argument name is not found in the frame locals.
MISSING = type('MISSING', (), {'__repr__': lambda _: '?'})()


class Action(object):
    """
    Base class for actions: callables that are invoked with an ``event``.

    Subclasses are expected to override :meth:`__call__`.
    """

    def __call__(self, event):
        """
        Handle ``event``. The base implementation always raises
        ``NotImplementedError``.
        """
        raise NotImplementedError


class Debugger(Fields.klass.kwargs, Action):
    """
    An action that starts ``pdb``.

    Args:
        klass (callable): Factory invoked with ``**kwargs`` to build the
            debugger object. The default imports ``pdb`` at call time and
            instantiates ``pdb.Pdb``.
        **kwargs: Keyword arguments stored and later passed to ``klass``.
    """

    def __init__(self, klass=lambda **kwargs: __import__('pdb').Pdb(**kwargs), **kwargs):
        self.kwargs = kwargs
        self.klass = klass

    def __call__(self, event):
        """
        Runs a ``pdb.set_trace`` at the matching frame.
        """
        # A new debugger object is built for every matching event.
        debugger = self.klass(**self.kwargs)
        debugger.set_trace(event.frame)
class Manhole(Action):
    """
    An action that installs ``manhole`` and calls ``handle_oneshot()`` on the
    returned instance.

    Args:
        **options: Extra keyword arguments forwarded to ``manhole.install``.
    """

    def __init__(self, **options):
        self.options = options

    def __call__(self, event):
        # Imported here rather than at module level, so importing this module
        # does not require ``manhole``.
        import manhole

        inst = manhole.install(strict=False, thread=False, **self.options)
        inst.handle_oneshot()


DEFAULT_STREAM = sys.stderr


class ColorStreamAction(Fields.stream.force_colors.filename_alignment.thread_alignment.repr_limit, Action):
    """
    Base class for actions that write (optionally colored) text to a stream.

    Args:
        stream (file-like or str): Stream to write to, or a filename that is
            opened in append mode. Default: ``DEFAULT_STREAM``.
        force_colors (bool): Force coloring. Default: ``False``.
        filename_alignment (int): Size of the filename column. Default: ``40``.
        thread_alignment (int): Size of the thread name column. Default: ``12``.
        repr_limit (int): Limit length of ``repr()`` output. Default: ``1024``.
    """
    # filename -> opened file object; a class attribute, so it is shared by
    # every instance and a given filename is opened once.
    _stream_cache = {}
    _stream = None
    _tty = None

    def __init__(self, stream=None, force_colors=False, filename_alignment=40, thread_alignment=12, repr_limit=1024):
        # ``force_colors`` must be set first: the ``stream`` setter reads it.
        self.force_colors = force_colors
        self.stream = DEFAULT_STREAM if stream is None else stream
        self.filename_alignment = filename_alignment
        self.thread_alignment = thread_alignment
        self.repr_limit = repr_limit

    @property
    def stream(self):
        """
        The stream being written to. Assigning it also selects the color
        tables (``event_colors`` / ``code_colors``) for that stream.
        """
        return self._stream

    @stream.setter
    def stream(self, value):
        if isinstance(value, string_types):
            if value in self._stream_cache:
                value = self._stream_cache[value]
            else:
                # Python 3 refuses unbuffered text files (``buffering=0``
                # raises ValueError outside binary mode), so use line
                # buffering there and keep unbuffered output on Python 2.
                buffering = 0 if sys.version_info[0] < 3 else 1
                value = self._stream_cache[value] = open(value, 'a', buffering=buffering)

        isatty = getattr(value, 'isatty', None)
        if self.force_colors or (isatty and isatty() and os.name != 'java'):
            self._stream = AnsiToWin32(value, strip=False)
            self._tty = True
            self.event_colors = EVENT_COLORS
            self.code_colors = CODE_COLORS
        else:
            self._tty = False
            self._stream = value
            self.event_colors = NO_COLORS
            self.code_colors = NO_COLORS

    def _safe_repr(self, obj):
        """
        Return ``repr(obj)`` on a single line, shortened to roughly
        ``repr_limit`` characters. If ``repr()`` raises, a failure marker
        string is returned instead of propagating the exception.
        """
        limit = self.repr_limit

        try:
            s = repr(obj)
            s = s.replace('\n', r'\n')
            if len(s) > limit:
                cutoff = limit // 2
                # Slice the tail from an explicit start index: ``s[-cutoff:]``
                # would return the whole string when ``cutoff`` is 0.
                return "{} {continuation}[...]{reset} {}".format(
                    s[:cutoff], s[len(s) - cutoff:], **self.event_colors
                )
            else:
                return s
        except Exception as exc:
            return "{internal-failure}!!! FAILED REPR: {internal-detail}{!r}{reset}".format(exc, **self.event_colors)
class CodePrinter(ColorStreamAction):
    """
    An action that just prints the code being executed.

    Args:
        stream (file-like): Stream to write to. Default: ``sys.stderr``.
        filename_alignment (int): Default size for the filename column (files are right-aligned). Default: ``40``.
        force_colors (bool): Force coloring. Default: ``False``.
        thread_alignment (int): Size of the thread name column. Default: ``12``.
        repr_limit (int): Limit length of ``repr()`` output. Default: ``1024``.
    """

    def _safe_source(self, event):
        """
        Return the source lines for ``event`` as a non-empty sequence.

        When the source is empty or reading it raises, a 1-tuple holding a
        "NO SOURCE" marker string is returned instead, so callers can always
        index ``[0]``.
        """
        try:
            lines = event._raw_fullsource.rstrip().splitlines()
            if lines:
                return lines
            return (
                "{source-failure}??? NO SOURCE: {source-detail}"
                "Source code string for module {!r} is empty.".format(event.module, **self.event_colors),
            )
        except Exception as exc:
            return (
                "{source-failure}??? NO SOURCE: {source-detail}{!r}".format(exc, **self.event_colors),
            )

    def _format_filename(self, event):
        """
        Return the event's filename (``"<???>"`` when it is falsy), truncated
        from the left with a ``[...]`` prefix so that it is at most
        ``filename_alignment`` characters long.
        """
        filename = event.filename or "<???>"
        if len(filename) > self.filename_alignment:
            # 5 == len('[...]'): keep the last (alignment - 5) characters.
            filename = '[...]{}'.format(filename[5 - self.filename_alignment:])
        return filename

    def __call__(self, event, sep=os.path.sep, join=os.path.join):
        """
        Handle event and print filename, line number and source code. If event.kind is a `return` or `exception` also
        prints values.
        """
        lines = self._safe_source(event)
        threading_support = event.tracer.threading_support
        thread_name = threading.current_thread().name if threading_support else ''
        thread_align = self.thread_alignment if threading_support else ''

        self.stream.write(
            "{thread:{thread_align}}{filename}{:>{align}}{colon}:{lineno}{:<5} {kind}{:9} {code}{}{reset}\n".format(
                self._format_filename(event),
                event.lineno,
                event.kind,
                lines[0],
                thread=thread_name,
                thread_align=thread_align,
                align=self.filename_alignment,
                code=self.code_colors[event.kind],
                **self.event_colors
            ))
        # The lines below carry no ":lineno" field, so they pad with 7 spaces
        # (':' + 5-wide line number + 1 space) to keep the kind column aligned
        # with the first line.
        for line in lines[1:]:
            self.stream.write("{thread:{thread_align}}{:>{align}}       {kind}{:9} {code}{}{reset}\n".format(
                "",
                r" |",
                line,
                thread=thread_name,
                thread_align=thread_align,
                align=self.filename_alignment,
                code=self.code_colors[event.kind],
                **self.event_colors
            ))

        if event.kind in ('return', 'exception'):
            self.stream.write(
                "{thread:{thread_align}}{:>{align}}       {continuation}{:9} {color}{} "
                "value: {detail}{}{reset}\n".format(
                    "",
                    "...",
                    event.kind,
                    self._safe_repr(event.arg),
                    thread=thread_name,
                    thread_align=thread_align,
                    align=self.filename_alignment,
                    color=self.event_colors[event.kind],
                    **self.event_colors
                ))
class CallPrinter(CodePrinter):
    """
    An action that just prints the code being executed, but unlike :obj:`hunter.CodePrinter` it indents based on
    callstack depth and it also shows ``repr()`` of function arguments.

    Args:
        stream (file-like): Stream to write to. Default: ``sys.stderr``.
        filename_alignment (int): Default size for the filename column (files are right-aligned). Default: ``40``.
        force_colors (bool): Force coloring. Default: ``False``.
        thread_alignment (int): Size of the thread name column. Default: ``12``.
        repr_limit (int): Limit length of ``repr()`` output. Default: ``1024``.

    .. versionadded:: 1.2.0

    .. note::

        This will be the default action in `hunter 2.0`.
    """

    def __init__(self, *args, **options):
        # Positional arguments are forwarded too, so this class can be
        # constructed the same way as its base (e.g. ``CallPrinter(stream)``).
        super(CallPrinter, self).__init__(*args, **options)
        # thread ident -> list of (module, function) pairs for calls seen so
        # far on that thread; its length drives the indentation.
        self.locals = defaultdict(list)

    def __call__(self, event, sep=os.path.sep, join=os.path.join):
        """
        Handle event and print filename, line number and source code. If event.kind is a `return` or `exception` also
        prints values.
        """
        filename = self._format_filename(event)
        ident = event.module, event.function
        thread = threading.current_thread()
        threading_support = event.tracer.threading_support
        thread_name = thread.name if threading_support else ''
        thread_align = self.thread_alignment if threading_support else ''
        stack = self.locals[thread.ident]

        if event.kind == 'call':
            code = event.code
            stack.append(ident)
            # Only the declared positional parameters are shown; a name missing
            # from the frame locals is rendered through the MISSING sentinel.
            arguments = ', '.join(
                '{vars}{vars-name}{0}{vars}={reset}{1}'.format(
                    var,
                    self._safe_repr(event.locals.get(var, MISSING)),
                    **self.event_colors
                )
                for var in code.co_varnames[:code.co_argcount]
            )
            self.stream.write(
                "{thread:{thread_align}}{filename}{:>{align}}{colon}:{lineno}{:<5} {kind}{:9} {}{call}=>{normal} "
                "{}({}{call}{normal}){reset}\n".format(
                    filename,
                    event.lineno,
                    event.kind,
                    ' ' * (len(stack) - 1),
                    event.function,
                    arguments,
                    thread=thread_name,
                    thread_align=thread_align,
                    align=self.filename_alignment,
                    **self.event_colors
                ))
        elif event.kind in ('return', 'exception'):
            self.stream.write(
                "{thread:{thread_align}}{filename}{:>{align}}{colon}:{lineno}{:<5} {kind}{:9} "
                "{code}{}{}{normal} {}: {reset}{}\n".format(
                    filename,
                    event.lineno,
                    event.kind,
                    ' ' * (len(stack) - 1),
                    {'return': '<=', 'exception': '<!'}[event.kind],
                    event.function,
                    self._safe_repr(event.arg),
                    thread=thread_name,
                    thread_align=thread_align,
                    align=self.filename_alignment,
                    code=self.event_colors[event.kind],
                    **self.event_colors
                ))
            # Pop only when the top entry belongs to this function; the
            # matching 'call' may never have been seen by this action.
            if stack and stack[-1] == ident:
                stack.pop()
        else:
            # No ``code`` color is passed here: the format string has no
            # ``{code}`` placeholder, and looking it up could only fail for an
            # event kind missing from the color table.
            self.stream.write(
                "{thread:{thread_align}}{filename}{:>{align}}{colon}:{lineno}{:<5} {kind}{:9} {reset}{}{}\n".format(
                    filename,
                    event.lineno,
                    event.kind,
                    ' ' * len(stack),
                    event.source.strip(),
                    thread=thread_name,
                    thread_align=thread_align,
                    align=self.filename_alignment,
                    **self.event_colors
                ))
class VarsPrinter(Fields.names.globals.stream.filename_alignment, ColorStreamAction):
    """
    An action that prints local variables and optionally global variables visible from the current executing frame.

    Args:
        *names (strings): Names to evaluate. Expressions can be used (will only try to evaluate if all the variables
            are present on the frame).
        globals (bool): Allow access to globals. Default: ``False`` (only looks at locals).
        stream (file-like): Stream to write to. Default: ``sys.stderr``.
        filename_alignment (int): Default size for the filename column (files are right-aligned). Default: ``40``.
        force_colors (bool): Force coloring. Default: ``False``.
        thread_alignment (int): Size of the thread name column. Default: ``12``.
        repr_limit (int): Limit length of ``repr()`` output. Default: ``1024``.

    Raises:
        TypeError: If no name/expression is given.
    """

    def __init__(self, *names, **options):
        if not names:
            raise TypeError("VarsPrinter requires at least one variable name/expression.")
        # expression -> set of variable names that appear in it
        self.names = {
            name: set(self._iter_symbols(name))
            for name in names
        }
        self.globals = options.pop('globals', False)
        super(VarsPrinter, self).__init__(**options)

    @staticmethod
    def _iter_symbols(code):
        """
        Iterate all the variable names in the given expression.

        Example:

        * ``self.foobar`` yields ``self``
        * ``self[foobar]`` yields ``self`` and ``foobar``
        """
        for node in ast.walk(ast.parse(code)):
            if isinstance(node, ast.Name):
                yield node.id

    def _safe_eval(self, code, event):
        """
        Try to evaluate the given code on the given frame. If failure occurs, returns some ugly string with exception.
        """
        try:
            return eval(code, event.globals if self.globals else {}, event.locals)
        except Exception as exc:
            return "{internal-failure}FAILED EVAL: {internal-detail}{!r}".format(exc, **self.event_colors)

    def __call__(self, event):
        """
        Handle event and print the specified variables.

        An expression is evaluated (and printed) only when every name it uses
        is present in the frame locals, or in the frame globals when
        ``globals`` is enabled. Evaluations that raise ``AttributeError`` are
        skipped silently; any other failure is printed as a "FAILED EVAL"
        marker.
        """
        first = True
        frame_symbols = set(event.locals)
        if self.globals:
            frame_symbols |= set(event.globals)
        threading_support = event.tracer.threading_support
        thread_name = threading.current_thread().name if threading_support else ''
        thread_align = self.thread_alignment if threading_support else ''

        for code, symbols in self.names.items():
            # Check availability before evaluating: nothing is printed for an
            # expression with missing names, so evaluating it would be wasted
            # work.
            if not symbols.issubset(frame_symbols):
                continue

            # NOTE: the expressions come from the code that configured this
            # action; they are evaluated as-is against the traced frame.
            try:
                obj = eval(code, event.globals if self.globals else {}, event.locals)
            except AttributeError:
                continue
            except Exception as exc:
                printout = "{internal-failure}FAILED EVAL: {internal-detail}{!r}".format(exc, **self.event_colors)
            else:
                printout = self._safe_repr(obj)

            # 7 spaces stand in for the ":lineno" field (':' + 5-wide line
            # number + 1 space) emitted by the code printers, so the kind
            # column lines up with their output.
            self.stream.write(
                "{thread:{thread_align}}{:>{align}}       {vars}{:9} {vars-name}{} {vars}=> {reset}{}{reset}\n".format(
                    "",
                    "vars" if first else "...",
                    code,
                    printout,
                    thread=thread_name,
                    thread_align=thread_align,
                    align=self.filename_alignment,
                    **self.event_colors
                ))
            first = False