[entity vocabulary] refactor unrelated rql to allow usage of RQLRewriter to insert schema constraints. Closes #1561806
authorSylvain Thénault <sylvain.thenault@logilab.fr>
Thu, 31 Mar 2011 15:23:52 +0200 (2011-03-31)
changeset 7154 5e2f93b88d86
parent 7153 7df83a6d17c0
child 7156 b1521f1546e0
[entity vocabulary] refactor unrelated rql to allow usage of RQLRewriter to insert schema constraints. Closes #1561806
cwvreg.py
entity.py
test/data/schema.py
test/unittest_entity.py
--- a/cwvreg.py	Thu Mar 31 15:23:50 2011 +0200
+++ b/cwvreg.py	Thu Mar 31 15:23:52 2011 +0200
@@ -312,6 +312,10 @@
         kwargs['clear'] = True
         super(ETypeRegistry, self).register(obj, **kwargs)
 
+    def iter_classes(self):
+        for etype in self.vreg.schema.entities():
+            yield self.etype_class(etype)
+
     @cached
     def parent_classes(self, etype):
         if etype == 'Any':
--- a/entity.py	Thu Mar 31 15:23:50 2011 +0200
+++ b/entity.py	Thu Mar 31 15:23:52 2011 +0200
@@ -157,6 +157,7 @@
     def fetch_rql(cls, user, restriction=None, fetchattrs=None, mainvar='X',
                   settype=True, ordermethod='fetch_order'):
         """return a rql to fetch all entities of the class type"""
+        # XXX update api and implementation to AST manipulation (see unrelated rql)
         restrictions = restriction or []
         if settype:
             restrictions.append('%s is %s' % (mainvar, cls.__regid__))
@@ -753,7 +754,7 @@
     # generic vocabulary methods ##############################################
 
     def cw_unrelated_rql(self, rtype, targettype, role, ordermethod=None,
-                      vocabconstraints=True):
+                         vocabconstraints=True):
         """build a rql to fetch `targettype` entities unrelated to this entity
         using (rtype, role) relation.
 
@@ -763,13 +764,20 @@
         ordermethod = ordermethod or 'fetch_unrelated_order'
         if isinstance(rtype, basestring):
             rtype = self._cw.vreg.schema.rschema(rtype)
+        rdef = rtype.role_rdef(self.e_schema, targettype, role)
+        rewriter = RQLRewriter(self._cw)
+        # initialize some variables according to the `role` of `self` in the
+        # relation:
+        # * variable for myself (`evar`) and searched entities (`searchvedvar`)
+        # * entity type of the subject (`subjtype`) and of the object
+        #   (`objtype`) of the relation
         if role == 'subject':
             evar, searchedvar = 'S', 'O'
             subjtype, objtype = self.e_schema, targettype
         else:
             searchedvar, evar = 'S', 'O'
             objtype, subjtype = self.e_schema, targettype
-        rdef = rtype.role_rdef(self.e_schema, targettype, role)
+        # initialize some variables according to `self` existance
         if self.has_eid():
             restriction = ['NOT S %s O' % rtype]
             if rdef.role_cardinality(role) not in '?1':
@@ -777,19 +785,24 @@
                 restriction.append('%s eid %%(x)s' % evar)
             args = {'x': self.eid}
             if role == 'subject':
-                securitycheck_args = {'fromeid': self.eid}
+                sec_check_args = {'fromeid': self.eid}
             else:
-                securitycheck_args = {'toeid': self.eid}
+                sec_check_args = {'toeid': self.eid}
+            existant = None # instead of 'SO', improve perfs
         else:
             if rdef.role_cardinality(role) in '?1':
                 restriction = ['NOT S %s O' % rtype]
             else:
                 restriction = []
             args = {}
-            securitycheck_args = {}
-        insertsecurity = (rdef.has_local_role('add') and not
-                          rdef.has_perm(self._cw, 'add', **securitycheck_args))
-        # XXX consider constraint.mainvars to check if constraint apply
+            sec_check_args = {}
+            existant = searchedvar
+        # retreive entity class for targettype to compute base rql
+        etypecls = self._cw.vreg['etypes'].etype_class(targettype)
+        rql = etypecls.fetch_rql(self._cw.user, restriction,
+                                 mainvar=searchedvar, ordermethod=ordermethod)
+        select = self._cw.vreg.parse(self._cw, rql, args).children[0]
+        # insert RQL expressions for schema constraints into the rql syntax tree
         if vocabconstraints:
             # RQLConstraint is a subclass for RQLVocabularyConstraint, so they
             # will be included as well
@@ -797,33 +810,31 @@
         else:
             cstrcls = RQLConstraint
         for cstr in rdef.constraints:
-            if isinstance(cstr, RQLVocabularyConstraint) and searchedvar in cstr.mainvars:
+            # consider constraint.mainvars to check if constraint apply
+            if isinstance(cstr, cstrcls) and searchedvar in cstr.mainvars:
                 if not self.has_eid() and evar in cstr.mainvars:
                     continue
-                restriction.append(cstr.expression)
-        etypecls = self._cw.vreg['etypes'].etype_class(targettype)
-        rql = etypecls.fetch_rql(self._cw.user, restriction,
-                                 mainvar=searchedvar, ordermethod=ordermethod)
+                # compute a varmap suitable to RQLRewriter.rewrite argument
+                varmap = dict((v, v) for v in 'SO' if v in select.defined_vars
+                              and v in cstr.mainvars)
+                # rewrite constraint by constraint since we want a AND between
+                # expressions.
+                rewriter.rewrite(select, [(varmap, (cstr,))], select.solutions,
+                                 args, existant)
+        # insert security RQL expressions granting the permission to 'add' the
+        # relation into the rql syntax tree, if necessary
+        rqlexprs = rdef.get_rqlexprs('add')
+        if rqlexprs and not rdef.has_perm(self._cw, 'add', **sec_check_args):
+            # compute a varmap suitable to RQLRewriter.rewrite argument
+            varmap = dict((v, v) for v in 'SO' if v in select.defined_vars)
+            # rewrite all expressions at once since we want a OR between them.
+            rewriter.rewrite(select, [(varmap, rqlexprs)], select.solutions,
+                             args, existant)
         # ensure we have an order defined
-        if not ' ORDERBY ' in rql:
-            before, after = rql.split(' WHERE ', 1)
-            rql = '%s ORDERBY %s WHERE %s' % (before, searchedvar, after)
-        if insertsecurity:
-            rqlexprs = rdef.get_rqlexprs('add')
-            rewriter = RQLRewriter(self._cw)
-            rqlst = self._cw.vreg.parse(self._cw, rql, args)
-            if not self.has_eid():
-                existant = searchedvar
-            else:
-                existant = None # instead of 'SO', improve perfs
-            for select in rqlst.children:
-                varmap = {}
-                for var in 'SO':
-                    if var in select.defined_vars:
-                        varmap[var] = var
-                rewriter.rewrite(select, [(varmap, rqlexprs)],
-                                 select.solutions, args, existant)
-            rql = rqlst.as_string()
+        if not select.orderby:
+            select.add_sort_var(select.defined_vars[searchedvar])
+        # we're done, turn the rql syntax tree as a string
+        rql = select.as_string()
         return rql, args
 
     def unrelated(self, rtype, targettype, role='subject', limit=None,
@@ -835,6 +846,7 @@
             rql, args = self.cw_unrelated_rql(rtype, targettype, role, ordermethod)
         except Unauthorized:
             return self._cw.empty_rset()
+        # XXX should be set in unrelated rql when manipulating the AST
         if limit is not None:
             before, after = rql.split(' WHERE ', 1)
             rql = '%s LIMIT %s WHERE %s' % (before, limit, after)
--- a/test/data/schema.py	Thu Mar 31 15:23:50 2011 +0200
+++ b/test/data/schema.py	Thu Mar 31 15:23:52 2011 +0200
@@ -31,7 +31,10 @@
         'Personne', symmetric=True,
         constraints=[
             RQLConstraint('NOT S identity O'),
-            RQLVocabularyConstraint('NOT (S connait P, P nom "toto")')])
+            # conflicting constraints, see cw_unrelated_rql tests in
+            # unittest_entity.py
+            RQLVocabularyConstraint('NOT (S connait P, P nom "toto")'),
+            RQLVocabularyConstraint('S travaille P, P nom "tutu"')])
 
 class Societe(EntityType):
     nom = String()
--- a/test/unittest_entity.py	Thu Mar 31 15:23:50 2011 +0200
+++ b/test/unittest_entity.py	Thu Mar 31 15:23:52 2011 +0200
@@ -1,5 +1,5 @@
 # -*- coding: utf-8 -*-
-# copyright 2003-2010 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+# copyright 2003-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
 #
 # This file is part of CubicWeb.
@@ -19,7 +19,7 @@
 """unit tests for cubicweb.web.views.entities module"""
 
 from datetime import datetime
-
+from logilab.common import tempattr
 from cubicweb import Binary, Unauthorized
 from cubicweb.devtools.testlib import CubicWebTC
 from cubicweb.mttransforms import HAS_TAL
@@ -29,6 +29,17 @@
 
 class EntityTC(CubicWebTC):
 
+    def setUp(self):
+        super(EntityTC, self).setUp()
+        self.backup_dict = {}
+        for cls in self.vreg['etypes'].iter_classes():
+            self.backup_dict[cls] = (cls.fetch_attrs, cls.fetch_order)
+
+    def tearDown(self):
+        super(EntityTC, self).tearDown()
+        for cls in self.vreg['etypes'].iter_classes():
+            cls.fetch_attrs, cls.fetch_order = self.backup_dict[cls]
+
     def test_boolean_value(self):
         e = self.vreg['etypes'].etype_class('CWUser')(self.request())
         self.failUnless(e)
@@ -136,17 +147,19 @@
         Note = self.vreg['etypes'].etype_class('Note')
         peschema = Personne.e_schema
         seschema = Societe.e_schema
-        peschema.subjrels['travaille'].rdef(peschema, seschema).cardinality = '1*'
-        peschema.subjrels['connait'].rdef(peschema, peschema).cardinality = '11'
-        peschema.subjrels['evaluee'].rdef(peschema, Note.e_schema).cardinality = '1*'
-        seschema.subjrels['evaluee'].rdef(seschema, Note.e_schema).cardinality = '1*'
-        # testing basic fetch_attrs attribute
-        self.assertEqual(Personne.fetch_rql(user),
-                          'Any X,AA,AB,AC ORDERBY AA ASC '
-                          'WHERE X is Personne, X nom AA, X prenom AB, X modification_date AC')
-        pfetch_attrs = Personne.fetch_attrs
-        sfetch_attrs = Societe.fetch_attrs
+        torestore = []
+        for rdef, card in [(peschema.subjrels['travaille'].rdef(peschema, seschema), '1*'),
+                           (peschema.subjrels['connait'].rdef(peschema, peschema), '11'),
+                           (peschema.subjrels['evaluee'].rdef(peschema, Note.e_schema), '1*'),
+                           (seschema.subjrels['evaluee'].rdef(seschema, Note.e_schema), '1*')]:
+            cm = tempattr(rdef, 'cardinality', card)
+            cm.__enter__()
+            torestore.append(cm)
         try:
+            # testing basic fetch_attrs attribute
+            self.assertEqual(Personne.fetch_rql(user),
+                              'Any X,AA,AB,AC ORDERBY AA ASC '
+                              'WHERE X is Personne, X nom AA, X prenom AB, X modification_date AC')
             # testing unknown attributes
             Personne.fetch_attrs = ('bloug', 'beep')
             self.assertEqual(Personne.fetch_rql(user), 'Any X WHERE X is Personne')
@@ -185,8 +198,9 @@
                               'Any X,AA,AB ORDERBY AA ASC WHERE X is Personne, X nom AA, X prenom AB')
             # XXX test unauthorized attribute
         finally:
-            Personne.fetch_attrs = pfetch_attrs
-            Societe.fetch_attrs = sfetch_attrs
+            # fetch_attrs restored by generic tearDown
+            for cm in torestore:
+                cm.__exit__(None, None, None)
 
     def test_related_rql_base(self):
         Personne = self.vreg['etypes'].etype_class('Personne')
@@ -227,7 +241,7 @@
         user = self.request().user
         rql = user.cw_unrelated_rql('use_email', 'EmailAddress', 'subject')[0]
         self.assertEqual(rql, 'Any O,AA,AB,AC ORDERBY AC DESC '
-                         'WHERE NOT S use_email O, S eid %(x)s, '
+                         'WHERE NOT EXISTS(S use_email O), S eid %(x)s, '
                          'O is EmailAddress, O address AA, O alias AB, O modification_date AC')
 
     def test_unrelated_rql_security_1_user(self):
@@ -236,7 +250,7 @@
         user = self.request().user
         rql = user.cw_unrelated_rql('use_email', 'EmailAddress', 'subject')[0]
         self.assertEqual(rql, 'Any O,AA,AB,AC ORDERBY AC DESC '
-                          'WHERE NOT S use_email O, S eid %(x)s, '
+                          'WHERE NOT EXISTS(S use_email O), S eid %(x)s, '
                          'O is EmailAddress, O address AA, O alias AB, O modification_date AC')
         user = self.execute('Any X WHERE X login "admin"').get_entity(0, 0)
         rql = user.cw_unrelated_rql('use_email', 'EmailAddress', 'subject')[0]
@@ -257,49 +271,59 @@
     def test_unrelated_rql_security_2(self):
         email = self.execute('INSERT EmailAddress X: X address "hop"').get_entity(0, 0)
         rql = email.cw_unrelated_rql('use_email', 'CWUser', 'object')[0]
-        self.assertEqual(rql, 'Any S,AA,AB,AC,AD ORDERBY AA ASC '
-                          'WHERE NOT S use_email O, O eid %(x)s, S is CWUser, S login AA, S firstname AB, S surname AC, S modification_date AD')
+        self.assertEqual(rql, 'Any S,AA,AB,AC,AD ORDERBY AA '
+                         'WHERE NOT EXISTS(S use_email O), S is CWUser, '
+                         'S login AA, S firstname AB, S surname AC, S modification_date AD')
         self.login('anon')
         email = self.execute('Any X WHERE X eid %(x)s', {'x': email.eid}).get_entity(0, 0)
         rql = email.cw_unrelated_rql('use_email', 'CWUser', 'object')[0]
         self.assertEqual(rql, 'Any S,AA,AB,AC,AD ORDERBY AA '
-                          'WHERE NOT EXISTS(S use_email O), O eid %(x)s, S is CWUser, S login AA, S firstname AB, S surname AC, S modification_date AD, '
-                          'A eid %(B)s, EXISTS(S identity A, NOT A in_group C, C name "guests", C is CWGroup)')
+                         'WHERE NOT EXISTS(S use_email O, O is EmailAddress), S is CWUser, '
+                         'S login AA, S firstname AB, S surname AC, S modification_date AD, '
+                         'A eid %(B)s, EXISTS(S identity A, NOT A in_group C, C name "guests", C is CWGroup)')
 
     def test_unrelated_rql_security_nonexistant(self):
         self.login('anon')
         email = self.vreg['etypes'].etype_class('EmailAddress')(self.request())
         rql = email.cw_unrelated_rql('use_email', 'CWUser', 'object')[0]
         self.assertEqual(rql, 'Any S,AA,AB,AC,AD ORDERBY AA '
-                          'WHERE S is CWUser, S login AA, S firstname AB, S surname AC, S modification_date AD, '
-                          'A eid %(B)s, EXISTS(S identity A, NOT A in_group C, C name "guests", C is CWGroup)')
+                         'WHERE NOT EXISTS(S use_email O, O is EmailAddress), S is CWUser, '
+                         'S login AA, S firstname AB, S surname AC, S modification_date AD, '
+                         'A eid %(B)s, EXISTS(S identity A, NOT A in_group C, C name "guests", C is CWGroup)')
 
     def test_unrelated_rql_constraints_creation_subject(self):
         person = self.vreg['etypes'].etype_class('Personne')(self.request())
         rql = person.cw_unrelated_rql('connait', 'Personne', 'subject')[0]
-        self.assertEqual(rql, 'Any O,AA,AB,AC ORDERBY AC DESC WHERE '
-                         'O is Personne, O nom AA, O prenom AB, O modification_date AC')
+        self.assertEqual(
+            rql, 'Any O,AA,AB,AC ORDERBY AC DESC WHERE '
+            'O is Personne, O nom AA, O prenom AB, O modification_date AC')
 
     def test_unrelated_rql_constraints_creation_object(self):
         person = self.vreg['etypes'].etype_class('Personne')(self.request())
         rql = person.cw_unrelated_rql('connait', 'Personne', 'object')[0]
-        self.assertEqual(rql, 'Any S,AA,AB,AC ORDERBY AC DESC WHERE '
-                         'NOT (S connait P, P nom "toto"), S is Personne, S nom AA, '
-                         'S prenom AB, S modification_date AC')
+        self.assertEqual(
+            rql, 'Any S,AA,AB,AC ORDERBY AC DESC WHERE '
+            'S is Personne, S nom AA, S prenom AB, S modification_date AC, '
+            'NOT (S connait A, A nom "toto"), A is Personne, EXISTS(S travaille B, B nom "tutu")')
 
     def test_unrelated_rql_constraints_edition_subject(self):
         person = self.request().create_entity('Personne', nom=u'sylvain')
         rql = person.cw_unrelated_rql('connait', 'Personne', 'subject')[0]
-        self.assertEqual(rql, 'Any O,AA,AB,AC ORDERBY AC DESC WHERE '
-                         'NOT S connait O, S eid %(x)s, NOT S identity O, O is Personne, '
-                         'O nom AA, O prenom AB, O modification_date AC')
+        self.assertEqual(
+            rql, 'Any O,AA,AB,AC ORDERBY AC DESC WHERE '
+            'NOT EXISTS(S connait O), S eid %(x)s, O is Personne, '
+            'O nom AA, O prenom AB, O modification_date AC, '
+            'NOT S identity O')
 
     def test_unrelated_rql_constraints_edition_object(self):
         person = self.request().create_entity('Personne', nom=u'sylvain')
         rql = person.cw_unrelated_rql('connait', 'Personne', 'object')[0]
-        self.assertEqual(rql, 'Any S,AA,AB,AC ORDERBY AC DESC WHERE '
-                         'NOT S connait O, O eid %(x)s, NOT S identity O, NOT (S connait P, '
-                         'P nom "toto"), S is Personne, S nom AA, S prenom AB, S modification_date AC')
+        self.assertEqual(
+            rql, 'Any S,AA,AB,AC ORDERBY AC DESC WHERE '
+            'NOT EXISTS(S connait O), O eid %(x)s, S is Personne, '
+            'S nom AA, S prenom AB, S modification_date AC, '
+            'NOT S identity O, NOT (S connait A, A nom "toto"), '
+            'EXISTS(S travaille B, B nom "tutu")')
 
     def test_unrelated_base(self):
         req = self.request()