Source code for hunter.predicates

from __future__ import absolute_import

import inspect
import re
from itertools import chain

from .actions import Action
from .event import Event
from .util import STRING_TYPES

__all__ = (
    'And',
    'From',
    'Not',
    'Or',
    'Query',
    'When',
)

ALLOWED_KEYS = tuple(sorted(
    i for i in Event.__dict__.keys()
    if not i.startswith('_') and i not in ('tracer', 'thread', 'frame')
))
ALLOWED_OPERATORS = (
    'startswith', 'endswith', 'in', 'contains', 'regex',
    'sw', 'ew', 'has', 'rx',
    'gt', 'gte', 'lt', 'lte',
)


[docs]class Query(object): """ A query class. See :class:`hunter.event.Event` for fields that can be filtered on. """ def __init__(self, **query): """ Args: query: criteria to match on. Accepted arguments: ``arg``, ``calls``, ``code``, ``depth``, ``filename``, ``frame``, ``fullsource``, ``function``, ``globals``, ``kind``, ``lineno``, ``locals``, ``module``, ``source``, ``stdlib``, ``threadid``, ``threadname``. """ query_eq = {} query_startswith = {} query_endswith = {} query_in = {} query_contains = {} query_regex = {} query_lt = {} query_lte = {} query_gt = {} query_gte = {} for key, value in query.items(): parts = [p for p in key.split('_') if p] count = len(parts) if count > 2: raise TypeError('Unexpected argument %r. Must be one of %s with optional operators like: %s' % ( key, ALLOWED_KEYS, ALLOWED_OPERATORS )) elif count == 2: prefix, operator = parts if operator in ('startswith', 'sw'): if not isinstance(value, STRING_TYPES): if not isinstance(value, (list, set, tuple)): raise ValueError('Value %r for %r is invalid. Must be a string, list, tuple or set.' % (value, key)) value = tuple(value) mapping = query_startswith elif operator in ('endswith', 'ew'): if not isinstance(value, STRING_TYPES): if not isinstance(value, (list, set, tuple)): raise ValueError('Value %r for %r is invalid. Must be a string, list, tuple or set.' % (value, key)) value = tuple(value) mapping = query_endswith elif operator == 'in': mapping = query_in elif operator in ('contains', 'has'): mapping = query_contains elif operator in ('regex', 'rx'): value = re.compile(value) mapping = query_regex elif operator == 'lt': mapping = query_lt elif operator == 'lte': mapping = query_lte elif operator == 'gt': mapping = query_gt elif operator == 'gte': mapping = query_gte else: raise TypeError('Unexpected operator %r. Must be one of %s.' % (operator, ALLOWED_OPERATORS)) else: mapping = query_eq prefix = key if prefix not in ALLOWED_KEYS: raise TypeError('Unexpected argument %r. Must be one of %s.' % (key, ALLOWED_KEYS)) mapping[prefix] = value self.query_eq = tuple(sorted(query_eq.items())) self.query_startswith = tuple(sorted(query_startswith.items())) self.query_endswith = tuple(sorted(query_endswith.items())) self.query_in = tuple(sorted(query_in.items())) self.query_contains = tuple(sorted(query_contains.items())) self.query_regex = tuple(sorted(query_regex.items())) self.query_lt = tuple(sorted(query_lt.items())) self.query_lte = tuple(sorted(query_lte.items())) self.query_gt = tuple(sorted(query_gt.items())) self.query_gte = tuple(sorted(query_gte.items())) def __str__(self): return 'Query(%s)' % ( ', '.join( ', '.join('%s%s=%r' % (key, kind, value) for key, value in mapping) for kind, mapping in [ ('', self.query_eq), ('_in', self.query_in), ('_contains', self.query_contains), ('_startswith', self.query_startswith), ('_endswith', self.query_endswith), ('_regex', self.query_regex), ('_lt', self.query_lt), ('_lte', self.query_lte), ('_gt', self.query_gt), ('_gte', self.query_gte), ] if mapping ) ) def __repr__(self): return '<hunter.predicates.Query: %s>' % ' '.join( fmt % (mapping,) for fmt, mapping in [ ('query_eq=%r', self.query_eq), ('query_in=%r', self.query_in), ('query_contains=%r', self.query_contains), ('query_startswith=%r', self.query_startswith), ('query_endswith=%r', self.query_endswith), ('query_regex=%r', self.query_regex), ('query_lt=%r', self.query_lt), ('query_lte=%r', self.query_lte), ('query_gt=%r', self.query_gt), ('query_gte=%r', self.query_gte), ] if mapping ) def __eq__(self, other): return ( isinstance(other, Query) and self.query_eq == other.query_eq and self.query_in == other.query_in and self.query_contains == other.query_contains and self.query_startswith == other.query_startswith and self.query_endswith == other.query_endswith and self.query_regex == other.query_regex and self.query_lt == other.query_lt and self.query_lte == other.query_lte and self.query_gt == other.query_gt and self.query_gte == other.query_gte )
[docs] def __call__(self, event): """ Handles event. Returns True if all criteria matched. """ for key, value in self.query_eq: evalue = event[key] if evalue != value: return False for key, value in self.query_in: evalue = event[key] if evalue not in value: return False for key, value in self.query_contains: evalue = event[key] if value not in evalue: return False for key, value in self.query_startswith: evalue = event[key] if not evalue.startswith(value): return False for key, value in self.query_endswith: evalue = event[key] if not evalue.endswith(value): return False for key, value in self.query_regex: evalue = event[key] if not value.match(evalue): return False for key, value in self.query_gt: evalue = event[key] if not evalue > value: return False for key, value in self.query_gte: evalue = event[key] if not evalue >= value: return False for key, value in self.query_lt: evalue = event[key] if not evalue < value: return False for key, value in self.query_lte: evalue = event[key] if not evalue <= value: return False return True
def __or__(self, other): """ Convenience API so you can do ``Q() | Q()``. It converts that to ``Or(Q(), Q())``. """ return Or(self, other) def __and__(self, other): """ Convenience API so you can do ``Q() & Q()``. It converts that to ``And(Q(), Q())``. """ return And(self, other) def __invert__(self): return Not(self) __ror__ = __or__ __rand__ = __and__
[docs]class When(object): """ Runs ``actions`` when ``condition(event)`` is ``True``. Actions take a single ``event`` argument. """ def __init__(self, condition, *actions): if not actions: raise TypeError('Must give at least one action.') self.condition = condition self.actions = tuple( action() if inspect.isclass(action) and issubclass(action, Action) else action for action in actions) def __str__(self): return 'When(%s, %s)' % ( self.condition, ', '.join(repr(p) for p in self.actions) ) def __repr__(self): return '<hunter.predicates.When: condition=%r, actions=%r>' % (self.condition, self.actions) def __eq__(self, other): return ( isinstance(other, When) and self.condition == other.condition and self.actions == other.actions )
[docs] def __call__(self, event): """ Handles the event. """ if self.condition(event): for action in self.actions: action(event) return True else: return False
def __or__(self, other): return Or(self, other) def __and__(self, other): return And(self, other) def __invert__(self): return Not(self) __ror__ = __or__ __rand__ = __and__
[docs]class From(object): """ From-point filtering mechanism. Switches on to running the predicate after condition maches, and switches off when the depth returns to the same level. After ``condition(event)`` returns ``True`` the ``event.depth`` will be saved and calling this object with an ``event`` will return ``predicate(event)`` until ``event.depth - watermark`` is equal to the depth that was saved. Args: condition (callable): A callable that returns True/False or a :class:`hunter.predicates.Query` object. predicate (callable): Optional callable that returns True/False or a :class:`hunter.predicates.Query` object to run after ``condition`` first returns ``True``. watermark (int): Depth difference to switch off and wait again on ``condition``. """ def __init__(self, condition, predicate=None, watermark=0): self.condition = condition self.predicate = predicate self.watermark = watermark self.waiting_for_condition = True self.depth = -1 def __str__(self): return 'From(%s, %s)' % (self.condition, self.predicate) def __repr__(self): return '<hunter.predicates.From: condition=%r, predicate=%r>' % (self.condition, self.predicate) def __eq__(self, other): return ( isinstance(other, From) and self.condition == other.condition and self.predicate == other.predicate )
[docs] def __call__(self, event): """ Handles the event. """ if event.depth - self.watermark <= self.depth: self.waiting_for_condition = True self.depth = -1 if self.waiting_for_condition: if self.condition(event): self.waiting_for_condition = False self.depth = event.depth else: return False if self.predicate is None: return True else: return self.predicate(event)
def __or__(self, other): return From(Or(self.condition, other), self.predicate) def __and__(self, other): return From(self.condition, And(self.predicate, other)) def __invert__(self): return Not(self) __ror__ = __or__ __rand__ = __and__
[docs]class And(object): """ Returns ``False`` at the first sub-predicate that returns ``False``, otherwise returns ``True``. """ def __init__(self, *predicates): self.predicates = predicates def __str__(self): return 'And(%s)' % ', '.join(str(p) for p in self.predicates) def __repr__(self): return '<hunter.predicates.And: predicates=%r>' % (self.predicates,)
[docs] def __call__(self, event): """ Handles the event. """ for predicate in self.predicates: if not predicate(event): return False else: return True
def __eq__(self, other): return ( isinstance(other, And) and self.predicates == other.predicates ) def __or__(self, other): return Or(self, other) def __and__(self, other): return And(*chain(self.predicates, other.predicates if isinstance(other, And) else (other,))) def __invert__(self): return Not(self) def __hash__(self): return hash(frozenset(self.predicates)) __ror__ = __or__ __rand__ = __and__
[docs]class Or(object): """ Returns ``True`` after the first sub-predicate that returns ``True``. """ def __init__(self, *predicates): self.predicates = predicates def __str__(self): return 'Or(%s)' % ', '.join(str(p) for p in self.predicates) def __repr__(self): return '<hunter.predicates.Or: predicates=%r>' % (self.predicates,)
[docs] def __call__(self, event): """ Handles the event. """ for predicate in self.predicates: if predicate(event): return True else: return False
def __eq__(self, other): return ( isinstance(other, Or) and self.predicates == other.predicates ) def __or__(self, other): return Or(*chain(self.predicates, other.predicates if isinstance(other, Or) else (other,))) def __and__(self, other): return And(self, other) def __invert__(self): return Not(self) def __hash__(self): return hash(frozenset(self.predicates)) __ror__ = __or__ __rand__ = __and__
[docs]class Not(object): """ Simply returns ``not predicate(event)``. """ def __init__(self, predicate): self.predicate = predicate def __str__(self): return 'Not(%s)' % self.predicate def __repr__(self): return '<hunter.predicates.Not: predicate=%r>' % self.predicate def __eq__(self, other): return ( isinstance(other, Not) and self.predicate == other.predicate )
[docs] def __call__(self, event): """ Handles the event. """ return not self.predicate(event)
def __or__(self, other): if isinstance(other, Not): return Not(And(self.predicate, other.predicate)) else: return Or(self, other) def __and__(self, other): if isinstance(other, Not): return Not(Or(self.predicate, other.predicate)) else: return And(self, other) def __invert__(self): return self.predicate __ror__ = __or__ __rand__ = __and__