# ruff: noqa: B008
import collections
import opcode
import os
import threading
from collections import defaultdict
from itertools import islice
from os import getpid
from . import config
from .util import BUILTIN_SYMBOLS
from .util import CALL_COLORS
from .util import CODE_COLORS
from .util import MISSING
from .util import OTHER_COLORS
from .util import builtins
from .util import frame_iterator
from .util import get_arguments
from .util import iter_symbols
from .util import safe_repr
from .vendor.colorama import AnsiToWin32
try:
from threading import get_ident
except ImportError:
from thread import get_ident
# Names exported by a star-import of this module.
__all__ = [
    'Action',
    'Debugger',
    'Manhole',
    'CodePrinter',
    'CallPrinter',
    'VarsPrinter',
]
# Maps the string names accepted by ``ColorStreamAction.repr_func`` to the callables they select.
BUILTIN_REPR_FUNCS = {'repr': repr, 'safe_repr': safe_repr}
class Action:
    """
    Base class for actions: objects that are called with an ``event``.

    Subclasses are expected to provide their own ``__call__``.
    """

    def __call__(self, event):
        """
        Handle ``event``. The base implementation always raises ``NotImplementedError``.
        """
        raise NotImplementedError()
class LazyImportPdb:
    """
    Stand-in for ``pdb.Pdb`` that postpones importing ``pdb`` until it is actually called.
    """

    def __repr__(self):
        # Render like the real class would, so textual output does not expose the wrapper.
        return "<class 'pdb.Pdb'>"

    __str__ = __repr__

    def __call__(self, **kwargs):
        """
        Import ``pdb`` and return ``pdb.Pdb(**kwargs)``.
        """
        import pdb

        return pdb.Pdb(**kwargs)
class Debugger(Action):
    """
    An action that starts ``pdb``.

    Args:
        klass: Callable producing the debugger object; it is invoked with ``**kwargs`` and the result must
            provide ``set_trace(frame)``. The value is passed through ``config.resolve``. Default: a lazily
            imported ``pdb.Pdb``.
        **kwargs: Keyword arguments forwarded to ``klass`` every time the action fires.
    """

    def __init__(self, klass=config.Default('klass', LazyImportPdb()), **kwargs):
        self.klass = config.resolve(klass)
        self.kwargs = kwargs

    def __eq__(self, other):
        # Only instances of exactly the same class can be equal.
        if type(self) is not type(other):
            return False
        return self.klass == other.klass and self.kwargs == other.kwargs

    def __str__(self):
        return f'{self.__class__.__name__}(klass={self.klass}, kwargs={self.kwargs})'

    def __repr__(self):
        return f'{self.__class__.__name__}(klass={self.klass!r}, kwargs={self.kwargs!r})'

    def __call__(self, event):
        """
        Runs a ``pdb.set_trace`` at the matching frame.
        """
        debugger = self.klass(**self.kwargs)
        debugger.set_trace(event.frame)
class Manhole(Action):
    """
    An action that installs ``manhole`` and serves a single one-shot session.

    Args:
        **options: Extra keyword arguments forwarded to ``manhole.install``.
    """

    def __init__(self, **options):
        self.options = options

    def __eq__(self, other):
        # Only instances of exactly the same class can be equal.
        if type(self) is not type(other):
            return False
        return self.options == other.options

    def __str__(self):
        return f'{self.__class__.__name__}(options={self.options})'

    def __repr__(self):
        return f'{self.__class__.__name__}(options={self.options!r})'

    def __call__(self, event):
        """
        Install manhole (``strict=False, thread=False`` plus the stored options) and call ``handle_oneshot()``.
        """
        # Imported here so the dependency is only needed when the action actually runs.
        import manhole

        installed = manhole.install(strict=False, thread=False, **self.options)
        installed.handle_oneshot()
class ColorStreamAction(Action):
    """
    Baseclass for your custom action. Just implement your own ``__call__``.

    Every argument is passed through ``config.resolve`` before being stored.

    Args:
        stream (file-like or str): Stream to write to. A string is treated as a path and opened in append mode
            (one file object per path is cached on the class). If it resolves to ``None`` the package's
            ``_default_stream`` is used.
        force_colors (bool): Force coloring even if the stream is not a TTY. Default: ``False``.
        force_pid (bool): Always show the pid prefix. Default: ``False``.
        filename_alignment (int): Default size for the filename column (files are right-aligned). Default: ``40``.
        thread_alignment (int): Size of the thread name column. Default: ``12``.
        pid_alignment (int): Size of the pid column. Default: ``9``.
        repr_limit (int): Limit length of ``repr()`` output. Default: ``1024``.
        repr_func (string or callable): Function to use instead of ``repr``.
            If string must be one of 'repr' or 'safe_repr'. Default: ``'safe_repr'``.
    """

    # Files opened from string paths, keyed by path and shared by all instances.
    _stream_cache = {}
    _stream = None
    _tty = None
    _repr_func = None
    OTHER_COLORS = OTHER_COLORS
    EVENT_COLORS = CODE_COLORS

    def __init__(
        self,
        stream=config.Default('stream', None),
        force_colors=config.Default('force_colors', False),
        force_pid=config.Default('force_pid', False),
        filename_alignment=config.Default('filename_alignment', 40),
        thread_alignment=config.Default('thread_alignment', 12),
        pid_alignment=config.Default('pid_alignment', 9),
        repr_limit=config.Default('repr_limit', 1024),
        repr_func=config.Default('repr_func', 'safe_repr'),
    ):
        # ``force_colors`` has to be set before ``stream``: the stream setter reads it.
        self.force_colors = config.resolve(force_colors)
        self.force_pid = config.resolve(force_pid)
        stream = config.resolve(stream)
        if stream is None:
            from . import _default_stream as stream
        self.stream = stream
        self.filename_alignment = config.resolve(filename_alignment)
        self.thread_alignment = config.resolve(thread_alignment)
        self.pid_alignment = config.resolve(pid_alignment)
        self.repr_limit = config.resolve(repr_limit)
        self.repr_func = config.resolve(repr_func)
        self.seen_threads = set()
        # Remember the creating process so ``pid_prefix`` can detect when it runs in another process.
        self.seen_pid = getpid()

    def __eq__(self, other):
        # NOTE: ``force_pid`` is not part of the comparison.
        return (
            isinstance(other, type(self))
            and self.stream == other.stream
            and self.force_colors == other.force_colors
            and self.filename_alignment == other.filename_alignment
            and self.thread_alignment == other.thread_alignment
            and self.pid_alignment == other.pid_alignment
            and self.repr_limit == other.repr_limit
            and self.repr_func == other.repr_func
        )

    def __str__(self):
        return (
            '{0.__class__.__name__}(stream={0.stream}, force_colors={0.force_colors}, '
            'filename_alignment={0.filename_alignment}, thread_alignment={0.thread_alignment}, '
            'pid_alignment={0.pid_alignment}, repr_limit={0.repr_limit}, '
            'repr_func={0.repr_func})'.format(self)
        )

    def __repr__(self):
        return (
            '{0.__class__.__name__}(stream={0.stream!r}, force_colors={0.force_colors!r}, '
            'filename_alignment={0.filename_alignment!r}, thread_alignment={0.thread_alignment!r}, '
            'pid_alignment={0.pid_alignment!r}, repr_limit={0.repr_limit!r}, '
            'repr_func={0.repr_func!r})'.format(self)
        )

    @property
    def stream(self):
        """
        The stream that output is written to (wrapped in ``AnsiToWin32`` when colors are enabled).
        """
        return self._stream

    @stream.setter
    def stream(self, value):
        if isinstance(value, str):
            if value in self._stream_cache:
                value = self._stream_cache[value]
            else:
                # Text-mode files cannot be unbuffered (``buffering=0`` raises ValueError), so use line
                # buffering: every write made by ``output`` callers ends in a newline and is flushed at once.
                value = self._stream_cache[value] = open(value, 'a', buffering=1)

        isatty = getattr(value, 'isatty', None)
        if self.force_colors or (isatty and isatty() and os.name != 'java'):
            self._stream = AnsiToWin32(value, strip=False)
            self._tty = True
            self.event_colors = self.EVENT_COLORS
            self.other_colors = self.OTHER_COLORS
        else:
            self._tty = False
            self._stream = value
            # Keep the same keys but blank the values, so format strings still resolve without emitting codes.
            self.event_colors = {key: '' for key in self.EVENT_COLORS}
            self.other_colors = {key: '' for key in self.OTHER_COLORS}

    @property
    def repr_func(self):
        """
        The callable used by ``try_repr``.
        """
        return self._repr_func

    @repr_func.setter
    def repr_func(self, value):
        if callable(value):
            self._repr_func = value
        elif value in BUILTIN_REPR_FUNCS:
            self._repr_func = BUILTIN_REPR_FUNCS[value]
        else:
            raise TypeError(f'Expected a callable or either "repr" or "safe_repr" strings, not {value!r}.')

    def _shorten(self, text):
        """
        Escape newlines in ``text`` and trim it to roughly ``self.repr_limit`` characters.

        Over-long text is cut to its first and last ``repr_limit // 2`` characters joined by a ``[...]`` marker.
        """
        text = text.replace('\n', r'\n')
        limit = self.repr_limit
        if len(text) <= limit:
            return text
        cutoff = limit // 2
        # ``text[-0:]`` would be the whole string, so a zero cutoff needs an explicit empty tail.
        tail = text[-cutoff:] if cutoff else ''
        return '{} {CONT}[...]{RESET} {}'.format(text[:cutoff], tail, **self.other_colors)

    def try_str(self, obj):
        """
        Safely call ``str(obj)``. Failures will have special colored output and output is trimmed according
        to ``self.repr_limit``.

        Only used when dumping detached events.

        Returns: string
        """
        try:
            return self._shorten(str(obj))
        except Exception as exc:
            return '{INTERNAL-FAILURE}!!! FAILED REPR: {INTERNAL-DETAIL}{!r}{RESET}'.format(exc, **self.other_colors)

    def try_repr(self, obj):
        """
        Safely call ``self.repr_func(obj)``. Failures will have special colored output and output is trimmed according
        to ``self.repr_limit``.

        Returns: string
        """
        try:
            return self._shorten(self.repr_func(obj))
        except Exception as exc:
            return '{INTERNAL-FAILURE}!!! FAILED REPR: {INTERNAL-DETAIL}{!r}{RESET}'.format(exc, **self.other_colors)

    def try_source(self, event, full=False):
        """
        Get a failure-colorized source for the given ``event``.

        Args:
            event: The event to read ``fullsource`` (when ``full`` is true) or ``source`` from.
            full (bool): Select ``event.fullsource`` instead of ``event.source``.

        Return: string
        """
        source = event.fullsource if full else event.source
        if source.startswith('??? NO SOURCE: '):
            # Re-emit the failure marker with colors; 15 is the length of the marker prefix.
            return '{SOURCE-FAILURE}??? NO SOURCE: {SOURCE-DETAIL}{}'.format(source[15:], **self.other_colors)
        elif source:
            return source
        else:
            return '{SOURCE-FAILURE}??? NO SOURCE: {SOURCE-DETAIL}Source code string for {!r} is empty.'.format(
                event.filename, **self.other_colors
            )

    def filename_prefix(self, event=None):
        """
        Get an aligned and trimmed filename prefix for the given ``event``.

        Without an event only the blank padding of the same column is returned.

        Returns: string
        """
        if event:
            filename = event.filename
            if filename == '?':
                filename = '{SOURCE-FAILURE}?'.format(**self.other_colors)
            lineno = '{COLON}:{LINENO}{:<5}'.format(event.lineno, **self.other_colors)
            if len(filename) > self.filename_alignment:
                # Keep the tail of the path; the 5 accounts for the ``[...]`` marker.
                filename = f'[...]{filename[5 - self.filename_alignment :]}'
            return '{:>{}}{} '.format(filename, self.filename_alignment, lineno, **self.other_colors)
        else:
            return '{:>{}}       '.format('', self.filename_alignment)

    def pid_prefix(self):
        """
        Get an aligned and trimmed pid prefix.

        The prefix is empty unless ``force_pid`` is set or the current pid differs from the pid seen at
        construction time.
        """
        pid = getpid()
        if self.force_pid or self.seen_pid != pid:
            pid = f'[{pid}]'
            pid_align = self.pid_alignment
        else:
            pid = pid_align = ''
        return '{:{}}'.format(pid, pid_align)

    def thread_prefix(self, event):
        """
        Get an aligned and trimmed thread prefix for the given ``event``.

        ``event.threading_support`` forces the prefix off (``False``) or on (truthy); otherwise the prefix is
        shown once this action has been called from more than one thread.
        """
        self.seen_threads.add(get_ident())
        if event.threading_support is False:
            threading_support = False
        elif event.threading_support:
            threading_support = True
        else:
            threading_support = len(self.seen_threads) > 1
        thread_name = threading.current_thread().name if threading_support else ''
        thread_align = self.thread_alignment if threading_support else ''
        return '{:{}}'.format(thread_name, thread_align)

    def output(self, format_str, *args, **kwargs):
        """
        Write ``format_str.format(*args, **ANSI_COLORS, **kwargs)`` to ``self.stream``.

        For ANSI coloring you can place these in the ``format_str``:

        * ``{BRIGHT}``
        * ``{DIM}``
        * ``{NORMAL}``
        * ``{RESET}``
        * ``{fore(BLACK)}``
        * ``{fore(RED)}``
        * ``{fore(GREEN)}``
        * ``{fore(YELLOW)}``
        * ``{fore(BLUE)}``
        * ``{fore(MAGENTA)}``
        * ``{fore(CYAN)}``
        * ``{fore(WHITE)}``
        * ``{fore(RESET)}``
        * ``{back(BLACK)}``
        * ``{back(RED)}``
        * ``{back(GREEN)}``
        * ``{back(YELLOW)}``
        * ``{back(BLUE)}``
        * ``{back(MAGENTA)}``
        * ``{back(CYAN)}``
        * ``{back(WHITE)}``
        * ``{back(RESET)}``

        Args:
            format_str: a PEP-3101 format string
            *args: positional values for ``format_str``
            **kwargs: named values for ``format_str``; they take precedence over color names

        Returns: nothing, the formatted text is written to the stream.
        """
        self.stream.write(format_str.format(*args, **dict(self.other_colors, **kwargs)))

    def cleanup(self):
        """
        Hook for releasing state; does nothing in the base class.
        """
class CodePrinter(ColorStreamAction):
    """
    An action that just prints the code being executed.

    Args:
        stream (file-like): Stream to write to. Defaults to the package's default stream.
        filename_alignment (int): Default size for the filename column (files are right-aligned). Default: ``40``.
        force_colors (bool): Force coloring. Default: ``False``.
        repr_limit (int): Limit length of ``repr()`` output. Default: ``1024``.
        repr_func (string or callable): Function to use instead of ``repr``.
            If string must be one of 'repr' or 'safe_repr'. Default: ``'safe_repr'``.
    """

    def __call__(self, event):
        """
        Handle event and print filename, line number and source code. If event.kind is a `return` or `exception` also
        prints values.
        """
        source_lines = self.try_source(event, full=True).splitlines()
        pid_prefix = self.pid_prefix()
        thread_prefix = self.thread_prefix(event)
        filename_prefix = self.filename_prefix(event)
        color = self.event_colors.get(event.kind)
        line_template = '{}{}{}{KIND}{:9} {COLOR}{}{RESET}\n'

        # The first source line carries the filename/lineno column and the event kind.
        self.output(
            line_template, pid_prefix, thread_prefix, filename_prefix, event.kind, source_lines[0], COLOR=color
        )

        if len(source_lines) > 1:
            blank_prefix = self.filename_prefix()
            # Continuation lines are marked with a bar, the closing line with a star.
            continuation = [('   |', text) for text in source_lines[1:-1]]
            continuation.append(('   *', source_lines[-1]))
            for marker, text in continuation:
                self.output(line_template, pid_prefix, thread_prefix, blank_prefix, marker, text, COLOR=color)

        if event.kind in ('return', 'exception'):
            self.output(
                '{}{}{}{CONT}{:9} {COLOR}{} value: {NORMAL}{}{RESET}\n',
                pid_prefix,
                thread_prefix,
                self.filename_prefix(),
                '...',
                event.kind,
                # Detached events have their values rendered with ``try_str`` instead of ``try_repr``.
                self.try_str(event.arg) if event.detached else self.try_repr(event.arg),
                COLOR=color,
            )
class CallPrinter(CodePrinter):
    """
    An action that just prints the code being executed, but unlike :obj:`hunter.CodePrinter` it indents based on
    callstack depth and it also shows ``repr()`` of function arguments.

    Args:
        stream (file-like): Stream to write to. Defaults to the package's default stream.
        filename_alignment (int): Default size for the filename column (files are right-aligned). Default: ``40``.
        force_colors (bool): Force coloring. Default: ``False``.
        repr_limit (int): Limit length of ``repr()`` output. Default: ``1024``.
        repr_func (string or callable): Function to use instead of ``repr``.
            If string must be one of 'repr' or 'safe_repr'. Default: ``'safe_repr'``.

    .. versionadded:: 1.2.0
    """

    EVENT_COLORS = CALL_COLORS

    # Per-thread call stacks keyed by thread ident. Stored on the class, so shared by all instances.
    locals = defaultdict(list)

    @classmethod
    def cleanup(cls):
        """
        Drop all recorded call stacks.
        """
        cls.locals = defaultdict(list)

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def __call__(self, event):
        """
        Handle event and print filename, line number and source code. If event.kind is a `return` or `exception` also
        prints values.
        """
        ident = event.module, event.function
        thread = threading.current_thread()
        stack = self.locals[thread.ident]
        pid_prefix = self.pid_prefix()
        thread_prefix = self.thread_prefix(event)
        filename_prefix = self.filename_prefix(event)
        color = self.event_colors.get(event.kind)
        # Detached events have their values rendered with ``try_str`` instead of ``try_repr``.
        render = self.try_str if event.detached else self.try_repr

        if event.kind == 'call':
            if event.builtin:
                # Builtin calls are not pushed on the stack; they are printed at the caller's depth.
                self.output(
                    '{}{}{}{KIND}{:9} {}{COLOR} >{BUILTIN} {}.{}: {}{RESET}\n',
                    pid_prefix,
                    thread_prefix,
                    filename_prefix,
                    event.kind,
                    ' ' * (len(stack) - 1),
                    event.module,
                    event.function,
                    self.try_source(event).strip(),
                    COLOR=color,
                )
            else:
                code = event.code
                stack.append(ident)
                arguments = ', '.join(
                    '{VARS}{0}{VARS-NAME}{1}{VARS}={RESET}{2}'.format(
                        prefix,
                        var_display,
                        render(event.locals.get(var_lookup, MISSING)),
                        **self.other_colors,
                    )
                    for prefix, var_lookup, var_display in get_arguments(code)
                )
                self.output(
                    '{}{}{}{KIND}{:9} {}{COLOR}=>{NORMAL} {}({}{COLOR}{NORMAL}){RESET}\n',
                    pid_prefix,
                    thread_prefix,
                    filename_prefix,
                    event.kind,
                    ' ' * (len(stack) - 1),
                    event.function,
                    arguments,
                    COLOR=color,
                )
        elif event.kind == 'exception':
            if event.builtin:
                self.output(
                    '{}{}{}{KIND}{:9} {}{COLOR} ! {BUILTIN}{}.{}{RESET}\n',
                    pid_prefix,
                    thread_prefix,
                    filename_prefix,
                    event.kind,
                    ' ' * (len(stack) - 1),
                    event.module,
                    event.function,
                    COLOR=color,
                )
            else:
                self.output(
                    '{}{}{}{KIND}{:9} {}{COLOR} !{NORMAL} {}: {RESET}{}\n',
                    pid_prefix,
                    thread_prefix,
                    filename_prefix,
                    event.kind,
                    ' ' * (len(stack) - 1),
                    event.function,
                    render(event.arg),
                    COLOR=color,
                )
        elif event.kind == 'return':
            if event.builtin:
                self.output(
                    '{}{}{}{KIND}{:9} {}{COLOR} <{BUILTIN} {}.{}{RESET}\n',
                    pid_prefix,
                    thread_prefix,
                    filename_prefix,
                    event.kind,
                    ' ' * (len(stack) - 1),
                    event.module,
                    event.function,
                    COLOR=color,
                )
            else:
                # This branch is only reached for non-builtin events, so the separator and the
                # return value are always printed.
                self.output(
                    '{}{}{}{KIND}{:9} {}{COLOR}<={NORMAL} {}{}{RESET}{}\n',
                    pid_prefix,
                    thread_prefix,
                    filename_prefix,
                    event.kind,
                    ' ' * (len(stack) - 1),
                    event.function,
                    ': ',
                    render(event.arg),
                    COLOR=color,
                )
                # Only unwind when the returning function is the one on top of the stack.
                if stack and stack[-1] == ident:
                    stack.pop()
        else:
            self.output(
                '{}{}{}{KIND}{:9} {RESET}{}{}{RESET}\n',
                pid_prefix,
                thread_prefix,
                filename_prefix,
                event.kind,
                ' ' * len(stack),
                self.try_source(event).strip(),
            )
class VarsPrinter(ColorStreamAction):
    """
    An action that prints local variables and optionally global variables visible from the current executing frame.

    Args:
        *names (strings): Names to evaluate. Expressions can be used (will only try to evaluate if all the variables are
            present on the frame).
        stream (file-like): Stream to write to. Defaults to the package's default stream.
        filename_alignment (int): Default size for the filename column (files are right-aligned). Default: ``40``.
        force_colors (bool): Force coloring. Default: ``False``.
        repr_limit (int): Limit length of ``repr()`` output. Default: ``1024``.
        repr_func (string or callable): Function to use instead of ``repr``.
            If string must be one of 'repr' or 'safe_repr'. Default: ``'safe_repr'``.

    Raises:
        TypeError: If no names are given.
    """

    def __init__(self, *names, **options):
        if not names:
            raise TypeError('VarsPrinter requires at least one variable name/expression.')
        # Map every expression to the set of symbols it needs, computed once up front.
        self.names = {name: set(iter_symbols(name)) for name in names}
        super().__init__(**options)

    def __call__(self, event):
        """
        Handle event and print the specified variables.
        """
        first = True
        frame_symbols = set(event.locals)
        frame_symbols.update(BUILTIN_SYMBOLS)
        frame_symbols.update(event.globals)
        pid_prefix = self.pid_prefix()
        thread_prefix = self.thread_prefix(event)
        filename_prefix = self.filename_prefix(event)
        empty_filename_prefix = self.filename_prefix()

        for code, symbols in self.names.items():
            # Skip expressions whose symbols are not all available: nothing would be printed for them,
            # so evaluating them would only waste time and risk side effects.
            if not frame_symbols >= symbols:
                continue
            try:
                # NOTE: ``code`` comes from the names given to the constructor; never pass untrusted text.
                # A fresh globals mapping is built for every expression so evaluations stay isolated.
                obj = eval(code, dict(vars(builtins), **event.globals), event.locals)
            except (AttributeError, KeyError):
                continue
            except Exception as exc:
                printout = '{INTERNAL-FAILURE}FAILED EVAL: {INTERNAL-DETAIL}{!r}'.format(exc, **self.other_colors)
            else:
                printout = self.try_str(obj) if event.detached else self.try_repr(obj)

            if first:
                # Only the first printed variable carries the filename column and the event kind.
                self.output(
                    '{}{}{}{KIND}{:9} {VARS}[{VARS-NAME}{} {VARS}=> {RESET}{}{VARS}]{RESET}\n',
                    pid_prefix,
                    thread_prefix,
                    filename_prefix,
                    event.kind,
                    code,
                    printout,
                )
                first = False
            else:
                self.output(
                    '{}{}{}{CONT}... {VARS}[{VARS-NAME}{} {VARS}=> {RESET}{}{VARS}]{RESET}\n',
                    pid_prefix,
                    thread_prefix,
                    empty_filename_prefix,
                    code,
                    printout,
                )
# Numeric opcode of ``RETURN_VALUE``; ``ErrorSnooper`` compares ``event.instruction`` against it.
RETURN_VALUE = opcode.opmap['RETURN_VALUE']
class ErrorSnooper(CodePrinter):
    """
    An action that prints events around silenced exceptions. Note that it inherits the output of :class:`~hunter.actions.CodePrinter` so no
    fancy call indentation.

    .. warning::

        Should be considered experimental. May show lots of false positives especially if you're tracing lots of clumsy code like:

        .. sourcecode:: python

            try:
                stuff = something[key]
            except KeyError:
                stuff = "default"

    Args:
        max_backlog (int): Maximum number of events to record and display before the silenced exception is raised.
            Set to 0 to disable and get a speed boost. Default: ``10``.
        max_events (int): Maximum number of events to record and display for each detected silenced exception. Default: ``50``.
        max_depth (int): Increase if you want to drill into subsequent calls after an exception is raised. If you increase this you might
            want to also increase ``max_events`` since subsequent calls may have so many events you won't get to see the return event.
            Default: ``0`` (doesn't drill into any calls).
        stream (file-like): Stream to write to. Defaults to the package's default stream.
        filename_alignment (int): Default size for the filename column (files are right-aligned). Default: ``40``.
        force_colors (bool): Force coloring. Default: ``False``.
        repr_limit (int): Limit length of ``repr()`` output. Default: ``1024``.
        repr_func (string or callable): Function to use instead of ``repr``.
            If string must be one of 'repr' or 'safe_repr'. Default: ``'safe_repr'``.

    .. versionadded:: 3.1.0
    """

    def __init__(self, *args, **kwargs):
        """
        Pop ``max_backlog``, ``max_events`` and ``max_depth`` from ``kwargs`` and pass the rest to the parent class.
        """
        self.max_backlog = kwargs.pop('max_backlog', 10)
        # Ring buffer holding the most recent detached events seen before an exception.
        self.backlog = collections.deque(maxlen=self.max_backlog)
        self.max_events = kwargs.pop('max_events', 50)
        self.max_depth = kwargs.pop('max_depth', 0)
        # ``origin`` is the detached exception event being followed; ``events`` is what was collected for it.
        # Both are ``None`` while no exception is being followed.
        self.origin = None
        self.events = None
        super().__init__(*args, **kwargs)

    def __call__(self, event):
        """
        Record ``event`` and, while an exception is being followed, collect events until they are dumped or dropped.
        """
        if self.max_backlog:
            detached_event = event.detach(self.try_repr)
            self.backlog.append(detached_event)
        else:
            # Backlog disabled: detach lazily, only when the event actually gets recorded.
            detached_event = None

        if event.kind == 'exception':  # something interesting happened ;)
            if detached_event is None:
                detached_event = event.detach(self.try_repr)
            if self.origin:
                self.events.append(detached_event)
                # The exception reached a shallower frame: follow that frame from now on.
                if self.origin.depth > event.depth:
                    self.origin = detached_event
            else:
                # Start following a new exception, seeded with whatever the backlog holds.
                self.events = list(self.backlog)
                self.origin = detached_event
        elif self.origin:
            if event.kind == 'return':
                if detached_event is None:
                    detached_event = event.detach(self.try_repr)
                self.events.append(detached_event)
                if event.depth == self.origin.depth - 1:  # stop if the same function returned (depth is -1)
                    # Events are only dumped when the return event's instruction is RETURN_VALUE.
                    if event.instruction == RETURN_VALUE:
                        self.dump_events()
                        self.output(
                            '{BRIGHT}{fore(BLACK)}{} function exit{RESET}\n',
                            '-' * 46,
                        )
                    # Either way, stop following this exception.
                    self.origin = None
                    self.events = None
            elif event.depth > self.origin.depth + self.max_depth:  # too many details
                return
            elif len(self.events) > self.max_events:
                self.dump_events()
                self.output('{BRIGHT}{fore(BLACK)}{} too many lines{RESET}\n', '-' * 46)
            else:
                if detached_event is None:
                    detached_event = event.detach(self.try_repr)
                self.events.append(detached_event)

    def dump_events(self):
        """
        Print a header for the followed exception, replay the collected events through the parent ``__call__``
        and reset the tracking state.
        """
        self.output(
            '{BRIGHT}{fore(BLUE)}{} tracing {fore(YELLOW)}{}{fore(BLUE)} on {fore(RED)}{}{RESET}\n',
            '>' * 46,
            self.origin.function,
            self.origin.arg,
        )
        for event in self.events:
            super().__call__(event)
        self.origin = None
        self.events = None
class StackPrinter(ColorStreamAction):
    """
    An action that prints a one-line stacktrace.

    Args:
        depth (int): The maximum number of frames to show. Default: ``15``.
        limit (int): The maximum number of components to show in path. Eg: ``limit=2`` means it will show 1 parent: ``foo/bar.py``.
            Default: ``2``.
        stream (file-like): Stream to write to. Defaults to the package's default stream.
        filename_alignment (int): Default size for the filename column (files are right-aligned). Default: ``40``.
        force_colors (bool): Force coloring. Default: ``False``.
        repr_limit (int): Limit length of ``repr()`` output. Default: ``1024``.
        repr_func (string or callable): Function to use instead of ``repr``.
            If string must be one of 'repr' or 'safe_repr'. Default: ``'safe_repr'``.
    """

    def __init__(self, depth=15, limit=2, **options):
        self.limit = limit
        self.depth = depth
        super().__init__(**options)

    def __call__(self, event):
        """
        Handle event and print the stack.
        """
        pid_prefix = self.pid_prefix()
        thread_prefix = self.thread_prefix(event)
        filename_prefix = self.filename_prefix(event).rstrip()
        sep = os.path.sep

        def escape(value):
            # These values end up inside a template that ``self.output`` formats again, so literal
            # braces must be doubled or ``str.format`` would treat them as replacement fields.
            return '{}'.format(value).replace('{', '{{').replace('}', '}}')

        if event.frame and event.frame.f_back:
            # Doubled braces survive this first ``format`` as single-brace fields for ``self.output``.
            template = '{{}}{{}}{{}}{{CONT}}:{{BRIGHT}}{{fore(BLUE)}}{} {{KIND}}<={{RESET}} {}'.format(
                escape(event.function),
                ' {KIND}<={RESET} '.join(
                    '{}{{CONT}}:{{RESET}}{}{{CONT}}:{{BRIGHT}}{{fore(BLUE)}}{}'.format(
                        # Keep only the last ``limit`` path components, always joined with ``/``.
                        escape('/'.join(frame.f_code.co_filename.split(sep)[-self.limit :])),
                        frame.f_lineno,
                        escape(frame.f_code.co_name),
                    )
                    for frame in islice(frame_iterator(event.frame.f_back), self.depth)
                ),
            )
        else:
            template = '{{}}{{}}{{}}{{CONT}}:{{BRIGHT}}{{fore(BLUE)}}{} {{KIND}}<= {{BRIGHT}}{{fore(YELLOW)}}no frames available {{NORMAL}}(detached={})'.format(
                escape(event.function),
                event.detached,
            )
        template += '{RESET}\n'
        self.output(
            template,
            pid_prefix,
            thread_prefix,
            filename_prefix,
        )