import operator
import re
from functools import reduce
from itertools import chain

# https://anaconda.org/conda-forge/boolean.py
import boolean
# https://pypi.org/project/python-intervals/
import intervals as I
from django.core.exceptions import ValidationError
from django.db.models import Q
from django.utils.translation import gettext as _

# TODO update
'''
QueryManager and its subclasses implement make_queries() method returning lists of Q objects
to be applied in a cascade of filter() calls.
The use of Q objects is necessary to allow for alternatives in queries:
the union() and intersection() methods of QuerySets yield a QuerySet
that does not support further filtering.
'''

DUMMY_LOOKUP = 'DUMMY'


# https://docs.djangoproject.com/en/2.2/ref/forms/validation/#raising-validationerror
# ValidationError params don’t work with str.format(), must use old-style % formatting
class QueryManager(object):
    """Base class translating a filter value into a list of Q objects.

    Subclasses implement ``_make_queries()``; ``make_queries()`` is the
    entry point and short-circuits for the ``DUMMY_LOOKUP`` lookup.
    """

    def __init__(self, lookup, default_conjunction=True):
        self.lookup = lookup
        self.default_conjunction = default_conjunction

    # https://stackoverflow.com/questions/310732/in-django-how-does-one-filter-a-queryset-with-dynamic-field-lookups
    def make_Q(self, lookup, value):
        """Return ``Q(<lookup>=value)`` for a lookup name given as a string."""
        return Q(**{lookup: value})

    def make_queries(self, value, conjunction):
        """Return the queries for ``value``; an empty list for the dummy lookup."""
        if self.lookup == DUMMY_LOOKUP:
            return []
        return self._make_queries(self.lookup, value, conjunction)

    def _make_queries(self, lookup, value, conjunction):
        """Build the queries; must be overridden by subclasses."""
        raise NotImplementedError


class SingleValueQueryManager(QueryManager):
    """Manager producing exactly one query: ``lookup == value``."""

    def _make_queries(self, lookup, value, conjunction):
        return [self.make_Q(lookup, value)]


class SingleRegexQueryManager(SingleValueQueryManager):
    """Single-value manager matching the whole field, case-insensitively, against a regex."""

    def make_Q(self, lookup, value):
        return super().make_Q(lookup + '__iregex', r'^{}$'.format(value))


class MultiValueQueryManager(QueryManager):
    """Manager for a sequence of values combined by conjunction or disjunction."""

    def _make_queries(self, lookup, values, conjunction):
        """Return one query per value (conjunction) or a single OR-ed query.

        An empty ``values`` yields an empty list in both modes.
        """
        queries = [self.make_Q(lookup, value) for value in values]
        if conjunction:
            return queries
        if not queries:
            # reduce() over an empty sequence would raise TypeError; no values
            # means no queries, as in the conjunction branch.
            return []
        return [reduce(operator.or_, queries)]


class ExpressionAlgebra(boolean.BooleanAlgebra):
    """Boolean algebra with a customisable tokenizer for symbols.

    Subclasses provide ``TOKENS`` (operator spellings mapped to token types)
    and the three hooks deciding what a symbol looks like and how it is
    validated.
    """

    TOKENS = None

    # override to do sth with the symbol (e.g. escape a regex)
    def make_symbol(self, symbol):
        return symbol

    def valid_symbol_begin(self, char):
        """Tell whether ``char`` may start a symbol."""
        raise NotImplementedError

    def allowed(self, char):
        """Tell whether ``char`` may continue a symbol."""
        raise NotImplementedError

    def literal_validator(self, literal):
        """Raise ValidationError if the parsed symbol ``literal`` is not acceptable."""
        raise NotImplementedError

    # modified from boolean.BooleanAlgebra.tokenize
    def tokenize(self, expr):
        """Yield ``(token_type, token_string, position)`` triples for ``expr``.

        Raises TypeError when ``expr`` is not a string and boolean.ParseError
        on a character that is neither an operator, part of a symbol nor
        whitespace.
        """
        if not isinstance(expr, str):
            raise TypeError('expr must be string but it is %s.' % type(expr))
        position = 0
        length = len(expr)
        while position < length:
            tok = expr[position]
            sym = self.valid_symbol_begin(tok)
            if sym:
                # greedily consume all characters allowed inside a symbol
                position += 1
                while position < length:
                    char = expr[position]
                    if self.allowed(char):
                        position += 1
                        tok += char
                    else:
                        break
                # step back onto the last character of the symbol
                position -= 1
            try:
                token_type = self.TOKENS[tok.lower()]
            except KeyError:
                if sym:
                    yield boolean.TOKEN_SYMBOL, self.make_symbol(tok), position
                elif tok not in (' ', '\t', '\r', '\n'):
                    raise boolean.ParseError(token_string=tok, position=position, error_code=1)
            else:
                yield token_type, tok, position
            position += 1


class RangesAlgebra(ExpressionAlgebra):
    """Algebra whose symbols are integer ranges written as ``[lo,hi]``."""

    TOKENS = {
        '&': boolean.TOKEN_AND,
        'and': boolean.TOKEN_AND,
        '|': boolean.TOKEN_OR,
        'or': boolean.TOKEN_OR,
        '~': boolean.TOKEN_NOT,
        '!': boolean.TOKEN_NOT,
        'not': boolean.TOKEN_NOT,
    }

    # marks an unbounded end of a range
    OPEN_RANGE = '*'

    def valid_symbol_begin(self, char):
        return char == '['

    def allowed(self, char):
        return char.isdigit() or char in ',]' + self.OPEN_RANGE

    def literal_validator(self, literal):
        """Check that the symbol is a well-formed, non-empty ``[lo,hi]`` range.

        Raises ValidationError when the brackets are missing, the number of
        ends is not two, an end is neither a number nor ``OPEN_RANGE``, or
        the lower end exceeds the upper one.
        """
        literal = literal.obj
        if literal[0] != '[' or literal[-1] != ']':
            raise ValidationError(_('Zakres musi być ograniczony nawiasami kwadratowymi [...]: %(x)s.'), params={'x': literal}, code='invalid')
        inside = literal[1:-1]
        ends = [x.strip() for x in inside.split(',')]
        if len(ends) != 2:
            raise ValidationError(_('Zakres musi mieć dwa końce (podano %(n)d): %(x)s.'), params={'n': len(ends), 'x': literal}, code='invalid')
        lo, hi = ends
        for e in (lo, hi):
            if not e.isdigit() and e != self.OPEN_RANGE:
                raise ValidationError(_('Ograniczenie zakresu musi być liczbą lub znakiem %(c)s: %(x)s.'), params={'c': self.OPEN_RANGE, 'x': e}, code='invalid')
        if lo.isdigit() and hi.isdigit() and int(lo) > int(hi):
            raise ValidationError(_('Pusty zakres: %(x)s.'), params={'x': literal}, code='invalid')


class RegexAlgebra(ExpressionAlgebra):
    """Algebra whose symbols are (restricted) regular expressions."""

    TOKENS = {
        '&': boolean.TOKEN_AND,
        'and': boolean.TOKEN_AND,
        '|': boolean.TOKEN_OR,
        'or': boolean.TOKEN_OR,
        '~': boolean.TOKEN_NOT,
        '!': boolean.TOKEN_NOT,
        'not': boolean.TOKEN_NOT,
    }

    # non-alphanumeric characters accepted inside a symbol
    ALLOWED = '.?*+,()_[]{}- '

    def make_symbol(self, symbol):
        """Return ``symbol`` with every bracket character backslash-escaped."""
        # Can’t use re.escape – we only want to escape parentheses
        return ''.join('\\' + char if char in '()[]{}' else char for char in symbol)

    def valid_symbol_begin(self, char):
        # TODO what else can a valid regex start with in the context of filters?
        return char.isalnum() or char in self.ALLOWED

    def allowed(self, char):
        return char.isalnum() or char in self.ALLOWED

    def literal_validator(self, literal):
        """Raise ValidationError if the symbol does not compile as a regex."""
        try:
            re.compile(literal.obj)
        except re.error as e:
            raise ValidationError(_('Niepoprawne wyrażenie regularne: %(x)s (%(msg)s).'), params={'x': literal.obj, 'msg': _(str(e))}, code='invalid') from e


class ExpressionQueryManager(QueryManager):
    """Base for managers whose values are boolean expressions over symbols.

    Subclasses set ``expr_parser`` to an ExpressionAlgebra instance.
    """

    expr_parser = None

    def __init__(self, lookup, additional_operators=False, **kwargs):
        super().__init__(lookup, **kwargs)
        # whether the ‘!&’ and ‘&&’ operators are accepted by the validator
        self.additional_operators = additional_operators

    def expression_validator(self, value):
        """Validate ``value`` as an expression; raise ValidationError if invalid.

        ``'.*'`` and the empty string are always accepted. Parse errors
        reported by the algebra are converted to ValidationError.
        """
        try:
            if value in ('.*', ''):
                return
            if not self.additional_operators:
                for op in ('!&', '&&'):
                    if op in value:
                        raise ValidationError(_('To pole nie dopuszcza operatora %(op)s.'), params={'op': op}, code='invalid')
            if '!&' in value:
                # TODO remove this when implemented
                raise ValidationError('Operator !& nie jest jeszcze zaimplementowany.', code='invalid')
                # unreachable until the raise above is removed
                for v in value.split('!&'):
                    expr = self.expr_parser.parse(v)
                    if not expr.isliteral:
                        raise ValidationError(_('Operator !& nie dopuszcza zagnieżdżonych wyrażeń: %(expr)s.'), params={'expr': v.strip()}, code='invalid')
                    else:
                        self.expr_parser.literal_validator(expr.get_symbols()[0])
                return
            values = value.split('&&')
            exprs = list(map(self.expr_parser.parse, values))
            for expr in exprs:
                for symbol in expr.get_symbols():
                    self.expr_parser.literal_validator(symbol)
        # calls to self.expr_parser.parse will raise exceptions if the expression is malformed
        except boolean.boolean.ParseError as pe:
            raise ValidationError(_('Niepoprawne wyrażenie: %(msg)s.'), params={'msg': _(str(pe))}, code='invalid') from pe


class RangesQueryManager(ExpressionQueryManager):
    """Manager for boolean expressions over integer ranges."""

    expr_parser = RangesAlgebra()

    def literal2intervals(self, literal):
        """Convert a (possibly negated) range symbol to an interval."""
        # a literal may be negated or have no operator attribute
        op = getattr(literal, 'operator', None)
        symbols = literal.get_symbols()
        assert (len(symbols) == 1)
        lo, hi = symbols[0].obj.strip('[]').split(',')
        open_range = self.expr_parser.OPEN_RANGE
        lo = int(lo) if lo != open_range else -I.inf
        hi = int(hi) if hi != open_range else I.inf
        interv = I.closed(lo, hi)
        if op == '~':
            interv = ~interv
        return interv

    def cnf2intervals(self, expr):
        """Evaluate a parsed expression to the interval it denotes.

        AND/OR map to interval intersection/union and NOT to the complement.
        Raises ValueError for any other kind of node.
        """
        if isinstance(expr, (boolean.AND, boolean.OR)):
            combine = operator.and_ if isinstance(expr, boolean.AND) else operator.or_
            return reduce(combine, map(self.cnf2intervals, expr.args))
        if expr.isliteral:
            return self.literal2intervals(expr)
        if isinstance(expr, boolean.NOT):
            # negation of a non-literal, e.g. a double negation ‘~~[1,2]’
            return ~self.cnf2intervals(expr.args[0])
        raise ValueError('Unsupported expression node: %r' % (expr,))

    def atomic_interval2query(self, interval, lookup):
        """Convert one non-empty atomic interval to a Q object.

        Open integer bounds are tightened to the nearest closed bound.
        Returns None for the unbounded interval (no restriction).
        """
        lo, hi = None, None
        if interval.lower != -I.inf:
            lo = interval.lower
            # open interval
            if not interval.left:
                lo += 1
        if interval.upper != I.inf:
            hi = interval.upper
            # open interval
            if not interval.right:
                hi -= 1
        if lo == hi:
            # (-inf,+inf)
            if lo is None:
                return None
            return self.make_Q(lookup, lo)
        qs = []
        if lo is not None:
            qs.append(self.make_Q(lookup + '__gte', lo))
        if hi is not None:
            qs.append(self.make_Q(lookup + '__lte', hi))
        return reduce(operator.and_, qs)

    def _make_queries(self, lookup, value, conjunction):
        """Return the queries restricting ``lookup`` to the ranges in ``value``.

        The result is a single OR-ed query, an empty list when the expression
        imposes no restriction, or a query matching nothing when the ranges
        are contradictory.
        """
        if not value:
            # NOTE(review): every other path returns a flat list of Q objects;
            # this nested list is kept as-is because callers are not visible here.
            return [[]]
        expr = self.expr_parser.parse(value)
        intervals = self.cnf2intervals(expr)
        if intervals.is_empty():
            # e.g. ‘[1,2]&[5,6]’: the empty interval has infinite bounds that
            # cannot be turned into __gte/__lte queries, so match nothing instead
            return [self.make_Q(lookup + '__in', [])]
        queries = [
            query
            for query in (self.atomic_interval2query(interval, lookup) for interval in intervals._intervals)
            if query
        ]
        if queries:
            return [reduce(operator.or_, queries)]
        return []


'''
# TODO should this inherit after QueryManager???
class OuterQueryMixin(object):

    # TODO (?):
    # Using ‘&’ on Q objects yields the first behavior described in
    # https://docs.djangoproject.com/en/2.2/topics/db/queries/#spanning-multi-valued-relationships
    # Instead, a cascade of filter() calls seems necessary:
    # https://stackoverflow.com/questions/6230897/django-combining-and-and-or-queries-with-manytomany-field
    # but to keep consistent with the QueryManager interface (returning lists of Q objects),
    # Q objects for individual object specifications are created the ugly way, using the __in lookup
    def make_outer_queries(self, value):
        object_queries = self._make_queries(self.object_lookup, value, conjunction=True)
        outer_queries = []
        print('-------', object_queries)
        for queries in object_queries:
            if not queries:
                continue
            print(' ---', queries)
            objects = self.inner_class.objects.all()
            for query in queries:
                objects = objects.filter(query)
            outer_queries.append(self.make_Q(self.outer_lookup, objects))
        return outer_queries
'''


# TODO this got complicated, write more comments?
class RegexQueryManager(ExpressionQueryManager):#, OuterQueryMixin):
    """Manager for boolean expressions over regular expressions."""

    expr_parser = RegexAlgebra()

    def literal2query(self, literal, lookup):
        """Convert a (possibly negated) regex symbol to an ``__iregex`` query."""
        # a literal may be negated or have no operator attribute
        op = getattr(literal, 'operator', None)
        symbols = literal.get_symbols()
        assert (len(symbols) == 1)
        q = self.make_Q(lookup + '__iregex', r'^{}$'.format(symbols[0].obj))
        if op == '~':
            q = ~q
        return q

    # the argument is assumed to be a conjunct of a CNF
    # (e.g. either a literal or a disjunction of literals)
    def disjunction2query(self, disjunction, lookup):
        """Convert a literal or a disjunction of literals to a single query."""
        if disjunction.isliteral:
            return self.literal2query(disjunction, lookup)
        assert (disjunction.operator == '|')
        return reduce(operator.or_, (self.literal2query(a, lookup) for a in disjunction.args))

    def cnf2queries(self, expr, lookup, tab=' '):
        """Convert a CNF expression to a list of queries, one per conjunct.

        A TRUE constant yields no queries; a FALSE constant yields a single
        query matching nothing. ``tab`` is unused.
        """
        if expr.isliteral:
            return [self.literal2query(expr, lookup)]
        if isinstance(expr, boolean.boolean._TRUE):
            return []
        if isinstance(expr, boolean.boolean._FALSE):
            # a contradiction such as ‘a&~a’: the constant has no operator
            # attribute, so handle it before the operator checks below
            return [self.make_Q(lookup + '__in', [])]
        assert (expr.operator in '|&')
        if expr.operator == '|':
            return [self.disjunction2query(expr, lookup)]
        return [self.disjunction2query(disjunction, lookup) for disjunction in expr.args]

    # TODO this operator is a horror...
    # give up on generality and implement in subclasses when required?
    # still looks potentially terribly inefficient...
    def exclusive_and2queries(self, lookup, value, conjunction):
        # NOTE(review): stub; make_queries() indexes the result with [0], which
        # would fail on this empty list. expression_validator currently rejects
        # ‘!&’, so validated input does not get here.
        return []

    # value has been validated as a proper expression
    def _make_queries(self, lookup, value, conjunction):
        """Return a list of query lists, one per ‘&&’-separated sub-expression.

        With ``conjunction`` false the sub-expressions are OR-ed first, so the
        result holds a single query list.
        """
        if value == '.*':
            return [[]]
        if '!&' in value:
            return self.exclusive_and2queries(lookup, value, conjunction)
        values = value.split('&&')
        exprs = list(map(self.expr_parser.parse, values))
        #print('\n\n', ' * '.join(map(str, exprs)))
        if not conjunction:
            exprs = [reduce(operator.or_, exprs)]
        #print(' * '.join(map(str, exprs)))
        cnf_exprs = list(map(self.expr_parser.cnf, exprs))
        #print(' * '.join(map(str, cnf_exprs)), '\n\n')
        return [self.cnf2queries(e, lookup) for e in cnf_exprs]

    def make_queries(self, value, conjunction):
        """Return the flat query list for ``value``; ``conjunction`` is ignored."""
        # _make_queries will return a single list of queries when conjunction=False
        return self._make_queries(self.lookup, value, conjunction=False)[0]


# TODO work-in-progress!!!
# for MultiValueField-based filter fields
# doesn’t support operator switching for component queries (TODO?)
class ComboQueryManager(QueryManager):#, OuterQueryMixin):
    """Manager combining the queries of several sub-managers into one query.

    ``values`` passed to ``make_queries()`` is a sequence with one value per
    sub-manager, optionally preceded by a negation flag.
    """

    def __init__(self, inner_class, outer_lookup, managers, negation_field, **kwargs):
        # 'foo' and 'bar' are placeholder values for the base class’ lookup
        # and default_conjunction
        super().__init__('foo', 'bar', **kwargs)
        self.inner_class = inner_class
        self.outer_lookup = outer_lookup
        self.managers = managers
        # whether a special, first „negation” value is expected
        self.negation_field = negation_field

    def _make_queries(self, lookup, values, conjunction):
        """Return ``[[query]]`` AND-ing all component queries, or ``[[]]``.

        ``conjunction`` is not forwarded: each sub-manager is asked for its
        queries with its own ``default_conjunction``. Raises RuntimeError
        when called with a lookup other than the manager’s own.
        """
        # object_lookup is not assigned anywhere in this class; fall back to
        # the manager’s own lookup so the guard does not fail with AttributeError
        if lookup != getattr(self, 'object_lookup', self.lookup):
            raise RuntimeError
        negate = values[0] if self.negation_field else False
        query_values = values[1:] if self.negation_field else values
        # The inner_class instances we want to retrieve must satisfy
        # all the sub-queries at once, so & can be used.
        # We assume the sub-managers return singleton lists of queries.
        # TODO Is this always the case?
        component_queries = list(chain.from_iterable(
            manager.make_queries(value, manager.default_conjunction)
            for value, manager in zip(query_values, self.managers)
            if value is not None
        ))
        # test the flattened list: sub-managers may all return empty lists
        if not component_queries:
            return [[]]
        query = reduce(operator.and_, component_queries)
        if negate:
            query = ~query
        return [[query]]

    def make_queries(self, values, conjunction=False):
        """Return the flat query list for ``values``.

        ``conjunction`` is accepted for compatibility with
        ``QueryManager.make_queries()``; component queries do not use it.
        """
        # _make_queries will return a single list of queries
        return self._make_queries(self.lookup, values, conjunction=conjunction)[0]