Skip to content
Snippets Groups Projects
Select Git revision
  • 47ae4a11e18c280113dab3812a5ae88baa2ea80f
  • master default protected
  • vertical_relations
  • lu_without_semantic_frames
  • hierarchy
  • additional-unification-filters
  • v0.1.1
  • v0.1.0
  • v0.0.9
  • v0.0.8
  • v0.0.7
  • v0.0.6
  • v0.0.5
  • v0.0.4
  • v0.0.3
  • v0.0.2
  • v0.0.1
17 results

query_managers.py

Blame
  • user avatar
    Kasia Krasnowska authored
    * added local filtering
    * added missing control heuristics and reporting for compar etc.
    * updated phrase descriptions
    * switched to new Morfeusz homonym identifiers
    ca00bbcc
    History
    query_managers.py 15.47 KiB
    import operator
    import re
    
    # https://anaconda.org/conda-forge/boolean.py
    import boolean
    # https://pypi.org/project/python-intervals/
    import intervals as I
    
    from functools import reduce
    from itertools import chain
    
    from django.core.exceptions import ValidationError
    from django.db.models import Q
    from django.utils.translation import gettext as _
    
    # TODO update
    '''
    QueryManager and its subclasses implement make_queries() method
    returning lists of Q objects to be applied in a cascade of filter() calls.
    The use of Q objects is necessary to allow for alternatives in queries:
    the union() and intersection() methods of QuerySets yield a QuerySet that does not support
    further filtering.
    '''
    
    # Sentinel lookup name: QueryManager.make_queries() returns an empty list
    # for a manager whose lookup equals this value.
    DUMMY_LOOKUP = 'DUMMY'
    
    # https://docs.djangoproject.com/en/2.2/ref/forms/validation/#raising-validationerror
    # ValidationError params don’t work with str.format(), must use old-style % formatting
    
    class QueryManager:
        """Base class turning a filter value into a list of Django ``Q`` objects.

        Subclasses implement ``_make_queries()``; callers use ``make_queries()``.
        """

        def __init__(self, lookup, default_conjunction=True):
            """Store the field lookup and the default conjunction flag."""
            self.default_conjunction = default_conjunction
            self.lookup = lookup

        # https://stackoverflow.com/questions/310732/in-django-how-does-one-filter-a-queryset-with-dynamic-field-lookups
        def make_Q(self, lookup, value):
            """Return ``Q(<lookup>=value)`` for a lookup name known only at runtime."""
            condition = {lookup: value}
            return Q(**condition)

        def make_queries(self, value, conjunction):
            """Build the queries for ``value`` using this manager's own lookup.

            A manager whose lookup is ``DUMMY_LOOKUP`` yields an empty list;
            otherwise the work is delegated to ``_make_queries()``.
            """
            return [] if self.lookup == DUMMY_LOOKUP else self._make_queries(self.lookup, value, conjunction)

        def _make_queries(self, lookup, value, conjunction):
            """Subclass hook; the base class provides no implementation."""
            raise NotImplementedError
    
    class SingleValueQueryManager(QueryManager):
        """Manager for a field holding one value: produces exactly one query."""

        def _make_queries(self, lookup, value, conjunction):
            """Return a one-element list holding the query built by ``make_Q()``.

            ``conjunction`` is accepted for interface compatibility and not used.
            """
            single_query = self.make_Q(lookup, value)
            return [single_query]
    
    class SingleRegexQueryManager(SingleValueQueryManager):
        """Single-value manager that matches the field with an anchored ``__iregex``."""

        def make_Q(self, lookup, value):
            """Return a ``Q`` on ``<lookup>__iregex`` with ``value`` wrapped in ``^...$``."""
            regex_lookup = lookup + '__iregex'
            anchored_pattern = r'^{}$'.format(value)
            return super().make_Q(regex_lookup, anchored_pattern)
        
    class MultiValueQueryManager(QueryManager):
        """Manager for a field holding several values: one query per value."""

        def _make_queries(self, lookup, values, conjunction):
            """Build the queries for every element of ``values``.

            With a true ``conjunction`` the per-value queries are returned as
            separate list items; otherwise they are OR-ed into a single query.
            An empty ``values`` yields an empty list in both modes.
            """
            queries = [self.make_Q(lookup, value) for value in values]
            # reduce() without an initial value raises TypeError on an empty
            # sequence, so an empty selection must not reach it.
            if conjunction or not queries:
                return queries
            return [reduce(operator.or_, queries)]
    
    class ExpressionAlgebra(boolean.BooleanAlgebra):
        """Boolean algebra with a custom tokenizer driven by per-subclass hooks.

        Subclasses provide ``TOKENS`` and implement ``valid_symbol_begin()``,
        ``allowed()`` and ``literal_validator()``.
        """

        # maps lower-cased operator spellings to boolean.TOKEN_* values;
        # None here, to be replaced with a dict by subclasses
        TOKENS = None

        # override to do sth with the symbol (e.g. escape a regex)
        def make_symbol(self, symbol):
            """Return the object emitted for a symbol token (unchanged by default)."""
            return symbol

        def valid_symbol_begin(self, char):
            """Subclass hook: tell whether ``char`` may start a symbol."""
            raise NotImplementedError

        def allowed(self, char):
            """Subclass hook: tell whether ``char`` may continue a symbol."""
            raise NotImplementedError

        def literal_validator(self, literal):
            """Subclass hook: validate a single symbol of a parsed expression."""
            raise NotImplementedError

        # modified from boolean.BooleanAlgebra.tokenize
        def tokenize(self, expr):
            """Yield ``(token_type, token_string, position)`` triples for ``expr``.

            A character accepted by ``valid_symbol_begin()`` starts a symbol that
            greedily absorbs the following ``allowed()`` characters; the position
            reported for it is the index of its last character. A token whose
            lower-cased text is a key of ``TOKENS`` is emitted with the mapped
            type, any other symbol as ``boolean.TOKEN_SYMBOL`` (after
            ``make_symbol()``). Whitespace is skipped.

            Raises:
                TypeError: if ``expr`` is not a string.
                boolean.ParseError: on a character that is neither an operator,
                    part of a symbol, nor whitespace.
            """
            if not isinstance(expr, str):
                raise TypeError('expr must be string but it is %s.' % type(expr))
            end = len(expr)
            index = 0
            while index < end:
                token = expr[index]
                is_symbol = self.valid_symbol_begin(token)
                if is_symbol:
                    # extend the token over every following allowed character
                    lookahead = index + 1
                    while lookahead < end and self.allowed(expr[lookahead]):
                        token += expr[lookahead]
                        lookahead += 1
                    # leave the index on the last character of the symbol
                    index = lookahead - 1
                try:
                    yield self.TOKENS[token.lower()], token, index
                except KeyError:
                    if is_symbol:
                        yield boolean.TOKEN_SYMBOL, self.make_symbol(token), index
                    elif token not in (' ', '\t', '\r', '\n'):
                        raise boolean.ParseError(token_string=token, position=index, error_code=1)
                index += 1
    
    class RangesAlgebra(ExpressionAlgebra):
        """Algebra whose symbols are range literals of the form ``[lo,hi]``.

        Each end is a number or ``OPEN_RANGE``.
        """

        TOKENS = {
            '&'   : boolean.TOKEN_AND,
            'and' : boolean.TOKEN_AND,
            '|'   : boolean.TOKEN_OR,
            'or'  : boolean.TOKEN_OR,
            '~'   : boolean.TOKEN_NOT,
            '!'   : boolean.TOKEN_NOT,
            'not' : boolean.TOKEN_NOT,
        }

        # placeholder accepted in place of a numeric range end
        OPEN_RANGE = '*'

        def valid_symbol_begin(self, char):
            """A range symbol starts with an opening square bracket."""
            return char == '['

        def allowed(self, char):
            """Digits, the comma, the closing bracket and ``OPEN_RANGE`` continue a symbol."""
            return char.isdigit() or char in ',]' + self.OPEN_RANGE

        def literal_validator(self, literal):
            """Check that the symbol ``literal`` is a well-formed, non-empty range.

            Raises:
                ValidationError: if the text is not bracketed, does not have
                    exactly two comma-separated ends, has an end that is neither
                    a decimal number nor ``OPEN_RANGE``, or has ``lo > hi``.
            """
            literal = literal.obj
            if literal[0] != '[' or literal[-1] != ']':
                raise ValidationError(_('Zakres musi być ograniczony nawiasami kwadratowymi [...]: %(x)s.'), params={'x': literal}, code='invalid')
            inside = literal[1:-1]
            ends = [x.strip() for x in inside.split(',')]
            if len(ends) != 2:
                raise ValidationError(_('Zakres musi mieć dwa końce (podano %(n)d): %(x)s.'), params={'n' : len(ends), 'x': literal}, code='invalid')
            lo, hi = ends
            # isdecimal() rather than isdigit(): isdigit() also accepts characters
            # such as superscript digits, which int() rejects with a ValueError
            # that would escape as an unhandled error instead of a ValidationError.
            for e in (lo, hi):
                if not e.isdecimal() and e != self.OPEN_RANGE:
                    raise ValidationError(_('Ograniczenie zakresu musi być liczbą lub znakiem %(c)s: %(x)s.'), params={'c' : self.OPEN_RANGE, 'x': e}, code='invalid')
            if lo.isdecimal() and hi.isdecimal() and int(lo) > int(hi):
                raise ValidationError(_('Pusty zakres: %(x)s.'), params={'x': literal}, code='invalid')
        
    class RegexAlgebra(ExpressionAlgebra):
        """Algebra whose symbols are regular expressions built from a limited character set."""

        TOKENS = {
            '&'   : boolean.TOKEN_AND,
            'and' : boolean.TOKEN_AND,
            '|'   : boolean.TOKEN_OR,
            'or'  : boolean.TOKEN_OR,
            '~'   : boolean.TOKEN_NOT,
            '!'   : boolean.TOKEN_NOT,
            'not' : boolean.TOKEN_NOT,
        }

        # non-alphanumeric characters accepted inside a symbol (includes the space)
        ALLOWED = '.?*+,()_[]{}- '

        def make_symbol(self, symbol):
            """Return ``symbol`` with every bracket character prefixed by a backslash."""
            # re.escape can't be used here: only round, square and curly
            # brackets are to be escaped, other metacharacters stay active
            return ''.join('\\' + char if char in '()[]{}' else char for char in symbol)

        def valid_symbol_begin(self, char):
            """Alphanumerics and ``ALLOWED`` characters may start a symbol."""
            # TODO what else can a valid regex start with in the context of filters?
            return char.isalnum() or char in self.ALLOWED

        def allowed(self, char):
            """Alphanumerics and ``ALLOWED`` characters may continue a symbol."""
            return char.isalnum() or char in self.ALLOWED

        def literal_validator(self, literal):
            """Check that the symbol's text compiles as a regular expression.

            Raises:
                ValidationError: if ``re.compile()`` rejects the pattern.
            """
            pattern = literal.obj
            try:
                re.compile(pattern)
            except re.error as e:
                raise ValidationError(_('Niepoprawne wyrażenie regularne: %(x)s (%(msg)s).'), params={'x' : pattern, 'msg': _(str(e))}, code='invalid')
    
    class ExpressionQueryManager(QueryManager):
        """Base for managers whose value is a boolean expression parsed by ``expr_parser``."""

        # parser instance used for parsing and literal validation;
        # None here, set by subclasses
        expr_parser = None

        def __init__(self, lookup, additional_operators=False, **kwargs):
            """Store whether the ``!&`` and ``&&`` operators are permitted for this field."""
            super().__init__(lookup, **kwargs)
            self.additional_operators = additional_operators

        def expression_validator(self, value):
            """Validate ``value`` as an expression accepted by this manager.

            ``'.*'`` and the empty string are accepted without parsing.
            Otherwise the value is split on ``&&``, every part is parsed and
            every symbol is passed to the parser's ``literal_validator()``.

            Raises:
                ValidationError: if a disallowed operator is used, if ``!&`` is
                    used at all (not implemented yet), if a part is malformed,
                    or if a symbol fails literal validation.
            """
            parser = self.expr_parser
            try:
                if value in ('.*', ''):
                    return
                if not self.additional_operators:
                    # report the first of the two extra operators found in the value
                    forbidden = next((op for op in ('!&', '&&') if op in value), None)
                    if forbidden is not None:
                        raise ValidationError(_('To pole nie dopuszcza operatora %(op)s.'), params={'op': forbidden}, code='invalid')
                if '!&' in value:
                    # TODO remove this when implemented
                    raise ValidationError('Operator !& nie jest jeszcze zaimplementowany.', code='invalid')
                    # unreachable until the raise above is removed: intended
                    # validation of the operands of !&
                    for v in value.split('!&'):
                        expr = parser.parse(v)
                        if not expr.isliteral:
                            raise ValidationError(_('Operator !& nie dopuszcza zagnieżdżonych wyrażeń: %(expr)s.'), params={'expr': v.strip()}, code='invalid')
                        else:
                            parser.literal_validator(expr.get_symbols()[0])
                    return
                # all parts are parsed first, so a syntax error anywhere is
                # reported before any literal is validated
                parsed = [parser.parse(part) for part in value.split('&&')]
                for expr in parsed:
                    for symbol in expr.get_symbols():
                        parser.literal_validator(symbol)
            # calls to parser.parse will raise exceptions if the expression is malformed
            except boolean.boolean.ParseError as pe:
                raise ValidationError(_('Niepoprawne wyrażenie: %(msg)s.'), params={'msg': _(str(pe))}, code='invalid')
    
    class RangesQueryManager(ExpressionQueryManager):
        """Manager filtering a numeric field with a boolean expression over ranges."""

        expr_parser = RangesAlgebra()

        def literal2intervals(self, literal):
            """Convert a (possibly negated) range literal into an interval.

            An ``OPEN_RANGE`` end becomes an infinite bound; a literal whose
            operator is ``'~'`` yields the complement of its range.
            """
            # a literal may be negated or have no operator attribute
            op = getattr(literal, 'operator', None)
            symbols = literal.get_symbols()
            assert (len(symbols) == 1)
            lo, hi = symbols[0].obj.strip('[]').split(',')
            open_end = RangesQueryManager.expr_parser.OPEN_RANGE
            lo = int(lo) if lo != open_end else -I.inf
            hi = int(hi) if hi != open_end else I.inf
            interv = I.closed(lo, hi)
            if op == '~':
                interv = ~interv
            return interv

        def cnf2intervals(self, expr):
            """Recursively evaluate a parsed expression to an interval.

            AND / OR nodes combine their arguments with ``&`` / ``|``, literals
            go through ``literal2intervals()``, and a negation of a non-literal
            is the complement of its argument.

            Raises:
                ValueError: for any other kind of expression node.
            """
            if isinstance(expr, (boolean.AND, boolean.OR)):
                combine = operator.and_ if isinstance(expr, boolean.AND) else operator.or_
                return reduce(combine, map(self.cnf2intervals, expr.args))
            if expr.isliteral:
                return self.literal2intervals(expr)
            if isinstance(expr, boolean.NOT):
                # e.g. a doubled negation, which is not a literal
                return ~self.cnf2intervals(expr.args[0])
            raise ValueError('Unsupported expression in a ranges filter: %r' % (expr,))

        def atomic_interval2query(self, interval, lookup):
            """Translate one atomic interval into a ``Q`` object on ``lookup``.

            Open finite ends are moved inwards by 1 to obtain inclusive bounds.
            Returns ``None`` for the unbounded interval, an exact-match query
            when both bounds coincide, and otherwise a conjunction of
            ``__gte`` / ``__lte`` conditions for the finite bounds.
            """
            # NOTE(review): an empty interval (e.g. from an empty intersection)
            # is not handled specially here - confirm how it should be queried.
            lo, hi = None, None
            if interval.lower != -I.inf:
                lo = interval.lower
                # open interval
                if not interval.left:
                    lo += 1
            if interval.upper != I.inf:
                hi = interval.upper
                # open interval
                if not interval.right:
                    hi -= 1
            if lo is None and hi is None:
                # (-inf,+inf): no restriction at all
                return None
            if lo == hi:
                return self.make_Q(lookup, lo)
            bounds = []
            if lo is not None:
                bounds.append(self.make_Q(lookup + '__gte', lo))
            if hi is not None:
                bounds.append(self.make_Q(lookup + '__lte', hi))
            return reduce(operator.and_, bounds)

        def _make_queries(self, lookup, value, conjunction):
            """Build the query list for the ranges expression ``value``.

            Returns a one-element list with the OR of the queries of all atomic
            intervals, or an empty list when no interval produces a query.
            ``conjunction`` is not used.
            """
            if not value:
                # NOTE(review): the other branches return a flat list of Q
                # objects while this one returns [[]] - confirm callers expect it.
                return [[]]
            # cnf2intervals() handles arbitrary nesting itself, so the parsed
            # expression is evaluated directly without a CNF conversion.
            expr = self.expr_parser.parse(value)
            intervs = self.cnf2intervals(expr)._intervals
            queries = list(filter(None, [self.atomic_interval2query(interval, lookup) for interval in intervs]))
            if queries:
                return [reduce(operator.or_, queries)]
            return []
    
    '''
    # TODO should this inherit after QueryManager???
    class OuterQueryMixin(object):
        
        # TODO (?):
        # Using ‘&’ on Q objects yields the first behavior described in
        # https://docs.djangoproject.com/en/2.2/topics/db/queries/#spanning-multi-valued-relationships
        # Instead, a cascade of filter() calls seems necessary:
        # https://stackoverflow.com/questions/6230897/django-combining-and-and-or-queries-with-manytomany-field
        # but to keep consistent with the QueryManager interface (returning lists of Q objects),
        # Q objects for individual object specifications are created the ugly way, using the __in lookup
        def make_outer_queries(self, value):
            object_queries = self._make_queries(self.object_lookup, value, conjunction=True)
            outer_queries = []
            print('-------', object_queries)
            for queries in object_queries:
                if not queries:
                    continue
                print('    ---', queries)
                objects = self.inner_class.objects.all()
                for query in queries:
                    objects = objects.filter(query)
                outer_queries.append(self.make_Q(self.outer_lookup, objects))
            return outer_queries
    '''
    
    # TODO this got complicated, write more comments?
    class RegexQueryManager(ExpressionQueryManager):#, OuterQueryMixin):
        """Manager filtering a text field with a boolean expression over regexes."""

        expr_parser = RegexAlgebra()

        def literal2query(self, literal, lookup):
            """Return the ``__iregex`` query for one literal, inverted when it is negated."""
            # a literal may be negated or have no operator attribute
            op = getattr(literal, 'operator', None)
            symbols = literal.get_symbols()
            assert (len(symbols) == 1)
            q = self.make_Q(lookup + '__iregex', r'^{}$'.format(symbols[0].obj))
            if op == '~':
                q = ~q
            return q

        # the argument is assumed to be a conjunct of a CNF
        # (e.g. either a literal or a disjunction of literals)
        def disjunction2query(self, disjunction, lookup):
            """Return one query for a CNF conjunct: a literal or an OR of literals."""
            if disjunction.isliteral:
                return self.literal2query(disjunction, lookup)
            assert (disjunction.operator == '|')
            return reduce(operator.or_, (self.literal2query(a, lookup) for a in disjunction.args))

        def cnf2queries(self, expr, lookup, tab=' '):
            """Return the list of queries (one per conjunct) for a CNF expression.

            A TRUE expression yields an empty list. ``tab`` is not used.
            """
            # NOTE(review): a FALSE constant is not handled here - confirm
            # whether cnf() can produce one for the accepted inputs.
            if expr.isliteral:
                return [self.literal2query(expr, lookup)]
            if type(expr) == boolean.boolean._TRUE:
                return []
            assert (expr.operator in '|&')
            if expr.operator == '|':
                return [self.disjunction2query(expr, lookup)]
            return [self.disjunction2query(disjunction, lookup) for disjunction in expr.args]

        # TODO this operator is a horror...
        # give up on generality and implement in subclasses when required?
        # still looks potentially terribly inefficient...
        def exclusive_and2queries(self, lookup, value, conjunction):
            """Placeholder for the ``!&`` operator; currently returns an empty list."""
            return []

        # value has been validated as a proper expression
        def _make_queries(self, lookup, value, conjunction):
            """Return a list of query lists for ``value``.

            ``value`` is split on ``&&`` and each part is parsed. With a false
            ``conjunction`` the parts are OR-ed into a single expression first.
            Every resulting expression is converted to CNF and then to its own
            list of queries. ``'.*'`` yields ``[[]]``.
            """
            if value == '.*':
                return [[]]
            if '!&' in value:
                return self.exclusive_and2queries(lookup, value, conjunction)
            values = value.split('&&')
            exprs = list(map(self.expr_parser.parse, values))
            if not conjunction:
                exprs = [reduce(operator.or_, exprs)]
            cnf_exprs = list(map(self.expr_parser.cnf, exprs))
            return [self.cnf2queries(e, lookup) for e in cnf_exprs]

        def make_queries(self, value, conjunction):
            """Return the single list of queries for ``value``.

            The ``conjunction`` argument is ignored: ``_make_queries()`` is
            always called with ``conjunction=False``.
            """
            # _make_queries will return a single list of queries when conjunction=False
            return self._make_queries(self.lookup, value, conjunction=False)[0]
        
    # TODO work-in-progress!!!
    # for MultiValueField-based filter fields
    # doesn’t support operator switching for component queries (TODO?)
    class ComboQueryManager(QueryManager):#, OuterQueryMixin):
        """Manager combining the queries of several sub-managers into one query.

        Work in progress: intended for MultiValueField-based filter fields.
        """

        def __init__(self, inner_class, outer_lookup, managers, negation_field, **kwargs):
            """Store the sub-managers and the configuration of the combined field."""
            # NOTE(review): 'foo' and 'bar' look like placeholder values for
            # lookup and default_conjunction - confirm before relying on them.
            super().__init__('foo', 'bar', **kwargs)
            self.inner_class = inner_class
            self.outer_lookup = outer_lookup
            self.managers = managers
            # whether a special, first "negation" value is expected
            self.negation_field = negation_field

        def _make_queries(self, lookup, values, conjunction):
            """Return ``[[query]]`` combining all sub-manager queries with AND.

            ``values`` are paired positionally with ``self.managers`` (after an
            optional leading negation flag); ``None`` values are skipped. When
            no sub-query results, ``[[]]`` is returned.

            Raises:
                RuntimeError: if ``lookup`` differs from ``self.object_lookup``.
            """
            # NOTE(review): object_lookup is not assigned anywhere in this
            # class - presumably supplied by a subclass; verify.
            if lookup != self.object_lookup:
                raise RuntimeError
            negate = values[0] if self.negation_field else False
            query_values = values[1:] if self.negation_field else values
            # Every manager's make_queries() takes a conjunction argument; as
            # operator switching is not supported for component queries, each
            # sub-manager is called with its own default.
            queries = [
                manager.make_queries(value, manager.default_conjunction)
                for value, manager in zip(query_values, self.managers)
                if value is not None
            ]
            # The inner_class instances we want to retrieve must satisfy
            # all the sub-queries at once, so & can be used.
            # Flatten first: sub-managers may return empty lists, and reduce()
            # without an initial value fails on an empty sequence.
            sub_queries = list(chain.from_iterable(queries))
            if not sub_queries:
                return [[]]
            query = reduce(operator.and_, sub_queries)
            if negate:
                query = ~query
            return [[query]]

        def make_queries(self, values, conjunction=None):
            """Return the single list of queries for ``values``.

            ``conjunction`` is accepted for compatibility with
            ``QueryManager.make_queries()`` and ignored.
            """
            # _make_queries will return a single list of queries
            return self._make_queries(self.lookup, values, conjunction=False)[0]