# query_managers.py
import operator
import re
# https://anaconda.org/conda-forge/boolean.py
import boolean
# https://pypi.org/project/python-intervals/
import intervals as I
from functools import reduce
from itertools import chain
from django.core.exceptions import ValidationError
from django.db.models import Q
from django.utils.translation import gettext as _
# TODO update
'''
QueryManager and its subclasses implement a make_queries() method
that returns a list of Q objects to be applied in a cascade of filter() calls.
Q objects are needed to express alternatives in queries:
the union() and intersection() methods of QuerySets yield a QuerySet that does not support
further filtering.
'''
# Sentinel lookup name: QueryManager.make_queries() returns an empty list
# (no queries at all) for a manager whose lookup equals this value.
DUMMY_LOOKUP = 'DUMMY'
# https://docs.djangoproject.com/en/2.2/ref/forms/validation/#raising-validationerror
# ValidationError params don’t work with str.format(), must use old-style % formatting
class QueryManager:
    """Base class translating a filter value into a list of Django Q objects.

    Concrete subclasses provide the translation in ``_make_queries()``.
    """

    def __init__(self, lookup, default_conjunction=True):
        """Store the field lookup and the default conjunction flag."""
        self.default_conjunction = default_conjunction
        self.lookup = lookup

    # Dynamic field lookups, see
    # https://stackoverflow.com/questions/310732/in-django-how-does-one-filter-a-queryset-with-dynamic-field-lookups
    def make_Q(self, lookup, value):
        """Build a Q object for a lookup whose name is only known at run time."""
        condition = {lookup: value}
        return Q(**condition)

    def make_queries(self, value, conjunction):
        """Return the queries for ``value``; a dummy lookup yields none."""
        if self.lookup != DUMMY_LOOKUP:
            return self._make_queries(self.lookup, value, conjunction)
        return []

    def _make_queries(self, lookup, value, conjunction):
        """Translation hook; must be overridden by subclasses."""
        raise NotImplementedError
class SingleValueQueryManager(QueryManager):
    """Query manager producing exactly one query from a single value."""

    def _make_queries(self, lookup, value, conjunction):
        """Return a one-element list holding the Q object for ``value``.

        ``conjunction`` is accepted for interface compatibility and is not used.
        """
        query = self.make_Q(lookup, value)
        return [query]
class SingleRegexQueryManager(SingleValueQueryManager):
    """Single-value manager matching the whole field against a regex."""

    def make_Q(self, lookup, value):
        """Build a case-insensitive regex Q object anchored at both ends."""
        regex_lookup = lookup + '__iregex'
        anchored_pattern = r'^{}$'.format(value)
        return super().make_Q(regex_lookup, anchored_pattern)
class MultiValueQueryManager(QueryManager):
    """Query manager producing one Q object per value in a collection."""

    def _make_queries(self, lookup, values, conjunction):
        """Translate ``values`` into a list of Q objects.

        With ``conjunction`` true, one Q object per value is returned (the
        caller applies them one after another). Otherwise the per-value Q
        objects are OR-ed into a single Q object.

        An empty ``values`` yields an empty list in both modes; reducing an
        empty sequence without an initial value would raise TypeError.
        """
        queries = [self.make_Q(lookup, value) for value in values]
        if conjunction or not queries:
            return queries
        return [reduce(operator.or_, queries)]
class ExpressionAlgebra(boolean.BooleanAlgebra):
    """Boolean algebra with a pluggable tokenizer.

    Subclasses define ``TOKENS`` (operator string -> token type) and the
    three hooks deciding what counts as a symbol and whether it is valid.
    """

    # mapping of lowercase operator strings to token types; set by subclasses
    TOKENS = None

    # override to do something with the symbol (e.g. escape a regex)
    def make_symbol(self, symbol):
        return symbol

    def valid_symbol_begin(self, char):
        """Tell whether ``char`` may start a symbol."""
        raise NotImplementedError

    def allowed(self, char):
        """Tell whether ``char`` may continue a symbol."""
        raise NotImplementedError

    def literal_validator(self, literal):
        """Validate a single literal of a parsed expression."""
        raise NotImplementedError

    # modified from boolean.BooleanAlgebra.tokenize
    def tokenize(self, expr):
        """Yield ``(token_type, token_string, position)`` triples for ``expr``.

        ``position`` is the index of the last character of the token.
        Whitespace outside symbols is skipped; any other unknown character
        raises ``boolean.ParseError``.
        """
        if not isinstance(expr, str):
            raise TypeError('expr must be string but it is %s.' % type(expr))

        length = len(expr)
        position = 0
        while position < length:
            tok = expr[position]
            is_symbol = self.valid_symbol_begin(tok)
            if is_symbol:
                # extend the symbol for as long as the characters are allowed
                end = position + 1
                while end < length and self.allowed(expr[end]):
                    end += 1
                tok = expr[position:end]
                position = end - 1

            try:
                yield self.TOKENS[tok.lower()], tok, position
            except KeyError:
                # not an operator: either a symbol, whitespace or an error
                if is_symbol:
                    yield boolean.TOKEN_SYMBOL, self.make_symbol(tok), position
                elif tok not in (' ', '\t', '\r', '\n'):
                    raise boolean.ParseError(token_string=tok, position=position, error_code=1)

            position += 1
class RangesAlgebra(ExpressionAlgebra):
    """Algebra whose symbols are integer ranges written as ``[lo,hi]``.

    Either end may be ``OPEN_RANGE`` to leave that side unbounded.
    """

    TOKENS = {
        '&': boolean.TOKEN_AND,
        'and': boolean.TOKEN_AND,
        '|': boolean.TOKEN_OR,
        'or': boolean.TOKEN_OR,
        '~': boolean.TOKEN_NOT,
        '!': boolean.TOKEN_NOT,
        'not': boolean.TOKEN_NOT,
    }
    OPEN_RANGE = '*'

    def valid_symbol_begin(self, char):
        return char == '['

    def allowed(self, char):
        return char.isdigit() or char in ',]' + self.OPEN_RANGE

    def literal_validator(self, literal):
        """Check that a range symbol is well-formed and non-empty.

        ``literal`` is a symbol object whose ``obj`` attribute holds the
        range text. Raises ValidationError when the text is not enclosed in
        square brackets, does not have exactly two ends, has an end that is
        neither a number nor ``OPEN_RANGE``, or has lo > hi.
        """
        literal = literal.obj
        if literal[0] != '[' or literal[-1] != ']':
            raise ValidationError(_('Zakres musi być ograniczony nawiasami kwadratowymi [...]: %(x)s.'), params={'x': literal}, code='invalid')
        inside = literal[1:-1]
        ends = [x.strip() for x in inside.split(',')]
        if len(ends) != 2:
            raise ValidationError(_('Zakres musi mieć dwa końce (podano %(n)d): %(x)s.'), params={'n' : len(ends), 'x': literal}, code='invalid')
        lo, hi = ends
        for e in (lo, hi):
            # isdecimal(), not isdigit(): the latter also accepts characters
            # such as superscript digits, which int() cannot convert
            if not e.isdecimal() and e != self.OPEN_RANGE:
                raise ValidationError(_('Ograniczenie zakresu musi być liczbą lub znakiem %(c)s: %(x)s.'), params={'c' : self.OPEN_RANGE, 'x': e}, code='invalid')
        if lo.isdecimal() and hi.isdecimal() and int(lo) > int(hi):
            raise ValidationError(_('Pusty zakres: %(x)s.'), params={'x': literal}, code='invalid')
class RegexAlgebra(ExpressionAlgebra):
    """Algebra whose symbols are regular expressions."""

    TOKENS = {
        '&': boolean.TOKEN_AND,
        'and': boolean.TOKEN_AND,
        '|': boolean.TOKEN_OR,
        'or': boolean.TOKEN_OR,
        '~': boolean.TOKEN_NOT,
        '!': boolean.TOKEN_NOT,
        'not': boolean.TOKEN_NOT,
    }
    # non-alphanumeric characters that may appear in a symbol
    ALLOWED = '.?*+,()_[]{}- '

    def make_symbol(self, symbol):
        """Return ``symbol`` with every bracket character backslash-escaped."""
        # Can't use re.escape: only (), [] and {} are to be escaped
        return ''.join(
            '\\' + char if char in '()[]{}' else char
            for char in symbol
        )

    def valid_symbol_begin(self, char):
        # TODO what else can a valid regex start with in the context of filters?
        return char in self.ALLOWED or char.isalnum()

    def allowed(self, char):
        return char in self.ALLOWED or char.isalnum()

    def literal_validator(self, literal):
        """Raise ValidationError unless the symbol text compiles as a regex."""
        pattern = literal.obj
        try:
            re.compile(pattern)
        except re.error as e:
            raise ValidationError(_('Niepoprawne wyrażenie regularne: %(x)s (%(msg)s).'), params={'x' : literal.obj, 'msg': _(str(e))}, code='invalid')
class ExpressionQueryManager(QueryManager):
    """Base for managers whose values are boolean expressions over literals.

    Subclasses set ``expr_parser`` to an algebra instance used for parsing
    and for validating individual literals.
    """

    expr_parser = None

    def __init__(self, lookup, additional_operators=False, **kwargs):
        """Store whether the ``!&`` and ``&&`` operators are permitted."""
        super().__init__(lookup, **kwargs)
        self.additional_operators = additional_operators

    def expression_validator(self, value):
        """Raise ValidationError if ``value`` is not an acceptable expression.

        ``'.*'`` and the empty string are accepted without parsing.
        """
        try:
            if value in ('.*', ''):
                return
            parser = self.expr_parser

            if not self.additional_operators:
                forbidden = next((op for op in ('!&', '&&') if op in value), None)
                if forbidden is not None:
                    raise ValidationError(_('To pole nie dopuszcza operatora %(op)s.'), params={'op': forbidden}, code='invalid')

            if '!&' in value:
                # TODO remove this when implemented
                raise ValidationError('Operator !& nie jest jeszcze zaimplementowany.', code='invalid')
                # Unreachable while the guard above is in place.
                for operand in value.split('!&'):
                    parsed = parser.parse(operand)
                    if parsed.isliteral:
                        parser.literal_validator(parsed.get_symbols()[0])
                    else:
                        raise ValidationError(_('Operator !& nie dopuszcza zagnieżdżonych wyrażeń: %(expr)s.'), params={'expr': operand.strip()}, code='invalid')
                return

            # parse every &&-separated part first, then validate the literals
            parsed_parts = [parser.parse(part) for part in value.split('&&')]
            for parsed in parsed_parts:
                for symbol in parsed.get_symbols():
                    parser.literal_validator(symbol)
        # calls to the parser's parse() raise ParseError for malformed expressions
        except boolean.boolean.ParseError as pe:
            raise ValidationError(_('Niepoprawne wyrażenie: %(msg)s.'), params={'msg': _(str(pe))}, code='invalid')
class RangesQueryManager(ExpressionQueryManager):
    """Query manager for boolean expressions over integer ranges."""

    expr_parser = RangesAlgebra()

    def literal2intervals(self, literal):
        """Convert a (possibly negated) range literal into an interval."""
        # a literal may be negated or have no operator attribute
        op = getattr(literal, 'operator', None)
        symbols = literal.get_symbols()
        assert len(symbols) == 1
        open_end = RangesQueryManager.expr_parser.OPEN_RANGE
        lo, hi = symbols[0].obj.strip('[]').split(',')
        lo = int(lo) if lo != open_end else -I.inf
        hi = int(hi) if hi != open_end else I.inf
        interv = I.closed(lo, hi)
        if op == '~':
            interv = ~interv
        return interv

    def cnf2intervals(self, expr):
        """Evaluate a parsed expression to the interval it denotes.

        AND / OR nodes combine their arguments with intersection / union,
        literals are converted directly, and a negation of a non-literal
        (e.g. a double negation) is evaluated as the complement of its
        argument.

        Raises:
            ValueError: for any other kind of expression node.
        """
        if isinstance(expr, (boolean.AND, boolean.OR)):
            combine = operator.and_ if isinstance(expr, boolean.AND) else operator.or_
            return reduce(combine, map(self.cnf2intervals, expr.args))
        if expr.isliteral:
            return self.literal2intervals(expr)
        if isinstance(expr, boolean.NOT):
            return ~self.cnf2intervals(expr.args[0])
        raise ValueError('Unsupported range expression: {!r}'.format(expr))

    def atomic_interval2query(self, interval, lookup):
        """Translate one atomic interval into a Q object.

        Returns None for the interval unbounded on both sides, i.e. when no
        restriction is needed.
        """
        lo, hi = None, None
        if interval.lower != -I.inf:
            lo = interval.lower
            # open interval: integers only, so move to the next one
            if not interval.left:
                lo += 1
        if interval.upper != I.inf:
            hi = interval.upper
            # open interval
            if not interval.right:
                hi -= 1
        if lo == hi:
            # (-inf,+inf)
            if lo is None:
                return None
            return self.make_Q(lookup, lo)
        # lo != hi, so at least one of them is set and qs is never empty
        qs = []
        if lo is not None:
            qs.append(self.make_Q(lookup + '__gte', lo))
        if hi is not None:
            qs.append(self.make_Q(lookup + '__lte', hi))
        return reduce(operator.and_, qs)

    def _make_queries(self, lookup, value, conjunction):
        """Return a list with at most one Q object matching the ranges.

        An empty value, or an expression covering all integers, yields an
        empty list (no filtering).
        """
        if not value:
            return []
        expr = self.expr_parser.parse(value)
        intervs = self.cnf2intervals(expr)._intervals
        queries = [
            query
            for query in (self.atomic_interval2query(interval, lookup) for interval in intervs)
            if query
        ]
        if queries:
            return [reduce(operator.or_, queries)]
        return []
'''
# TODO should this inherit after QueryManager???
class OuterQueryMixin(object):
# TODO (?):
# Using ‘&’ on Q objects yields the first behavior described in
# https://docs.djangoproject.com/en/2.2/topics/db/queries/#spanning-multi-valued-relationships
# Instead, a cascade of filter() calls seems necessary:
# https://stackoverflow.com/questions/6230897/django-combining-and-and-or-queries-with-manytomany-field
# but to keep consistent with the QueryManager interface (returning lists of Q objects),
# Q objects for individual object specifications are created the ugly way, using the __in lookup
def make_outer_queries(self, value):
object_queries = self._make_queries(self.object_lookup, value, conjunction=True)
outer_queries = []
print('-------', object_queries)
for queries in object_queries:
if not queries:
continue
print(' ---', queries)
objects = self.inner_class.objects.all()
for query in queries:
objects = objects.filter(query)
outer_queries.append(self.make_Q(self.outer_lookup, objects))
return outer_queries
'''
# TODO this got complicated, write more comments?
class RegexQueryManager(ExpressionQueryManager):#, OuterQueryMixin):
    """Query manager for boolean expressions over regular expressions."""

    expr_parser = RegexAlgebra()

    def literal2query(self, literal, lookup):
        """Build the Q object for one (possibly negated) regex literal."""
        # a literal may be negated or have no operator attribute
        op = getattr(literal, 'operator', None)
        symbols = literal.get_symbols()
        assert len(symbols) == 1
        q = self.make_Q(lookup + '__iregex', r'^{}$'.format(symbols[0].obj))
        if op == '~':
            q = ~q
        return q

    # the argument is assumed to be a conjunct of a CNF
    # (e.g. either a literal or a disjunction of literals)
    def disjunction2query(self, disjunction, lookup):
        """Build one Q object OR-ing the literals of a CNF conjunct."""
        if disjunction.isliteral:
            return self.literal2query(disjunction, lookup)
        assert disjunction.operator == '|'
        return reduce(operator.or_, (self.literal2query(a, lookup) for a in disjunction.args))

    def cnf2queries(self, expr, lookup, tab=' '):
        """Return one Q object per conjunct of the CNF expression ``expr``.

        A TRUE constant yields an empty list. ``tab`` is unused and kept
        only so that existing callers keep working.
        """
        if expr.isliteral:
            return [self.literal2query(expr, lookup)]
        if isinstance(expr, boolean.boolean._TRUE):
            return []
        assert expr.operator in ('|', '&')
        if expr.operator == '|':
            return [self.disjunction2query(expr, lookup)]
        return [self.disjunction2query(disjunction, lookup) for disjunction in expr.args]

    # TODO this operator is a horror...
    # give up on generality and implement in subclasses when required?
    # still looks potentially terribly inefficient...
    def exclusive_and2queries(self, lookup, value, conjunction):
        return []

    # value has been validated as a proper expression
    def _make_queries(self, lookup, value, conjunction):
        """Return a list of query lists, one per ``&&``-separated part.

        With ``conjunction`` false the parts are OR-ed into one expression
        first, so the result holds a single query list.
        """
        # '.*' and the empty string are the values expression_validator()
        # accepts without parsing; neither restricts the result
        if value in ('.*', ''):
            return [[]]
        if '!&' in value:
            return self.exclusive_and2queries(lookup, value, conjunction)
        exprs = [self.expr_parser.parse(part) for part in value.split('&&')]
        if not conjunction:
            exprs = [reduce(operator.or_, exprs)]
        return [self.cnf2queries(self.expr_parser.cnf(e), lookup) for e in exprs]

    def make_queries(self, value, conjunction):
        """Return the list of Q objects for ``value``.

        ``conjunction`` is not used: ``conjunction=False`` is always passed on.
        """
        # _make_queries will return a single list of queries when conjunction=False
        return self._make_queries(self.lookup, value, conjunction=False)[0]
# TODO work-in-progress!!!
# for MultiValueField-based filter fields
# doesn’t support operator switching for component queries (TODO?)
class ComboQueryManager(QueryManager):#, OuterQueryMixin):
    """Combine the queries of several sub-managers into one Q object.

    Each element of the incoming value list is handed to the manager at the
    same position in ``managers``.
    """

    def __init__(self, inner_class, outer_lookup, managers, negation_field, **kwargs):
        # NOTE(review): 'foo' / 'bar' look like placeholders for lookup and
        # default_conjunction — confirm before relying on either attribute
        super().__init__('foo', 'bar', **kwargs)
        self.inner_class = inner_class
        self.outer_lookup = outer_lookup
        self.managers = managers
        # whether a special, first "negation" value is expected
        self.negation_field = negation_field

    def _make_queries(self, lookup, values, conjunction):
        """Return ``[[query]]`` AND-ing all sub-manager queries, or ``[[]]``.

        When ``negation_field`` is set, ``values[0]`` says whether the
        combined query is negated and the remaining values go to the managers.
        Values equal to None are skipped.

        Raises:
            RuntimeError: if ``lookup`` is not the lookup this manager expects.
        """
        # object_lookup is not set by this class; fall back to self.lookup
        # so the check does not fail with AttributeError
        if lookup != getattr(self, 'object_lookup', self.lookup):
            raise RuntimeError
        negate = values[0] if self.negation_field else False
        query_values = values[1:] if self.negation_field else values
        # Operator switching is not supported for component queries, so each
        # sub-manager is asked with its own default conjunction setting.
        per_manager = [
            manager.make_queries(value, manager.default_conjunction)
            for value, manager in zip(query_values, self.managers)
            if value is not None
        ]
        # The inner_class instances we want to retrieve must satisfy
        # all the sub-queries at once, so & can be used.
        # We assume the sub-managers return singleton lists of queries.
        # TODO Is this always the case?
        queries = list(chain.from_iterable(per_manager))
        if not queries:
            # nothing to combine (no values, or only empty query lists)
            return [[]]
        query = reduce(operator.and_, queries)
        if negate:
            query = ~query
        return [[query]]

    def make_queries(self, values, conjunction=False):
        """Return the list holding the combined query (possibly empty)."""
        # _make_queries will return a single list of queries
        return self._make_queries(self.lookup, values, conjunction=conjunction)[0]