# HG changeset patch # User Sylvain Thénault # Date 1301577832 -7200 # Node ID 5e2f93b88d86520c1700b3101cacd46492ba0ab0 # Parent 7df83a6d17c06adee5176542d0392948a126b00c [entity vocabulary] refactor unrelated rql to allow usage of RQLRewriter to insert schema constraints. Closes #1561806 diff -r 7df83a6d17c0 -r 5e2f93b88d86 cwvreg.py --- a/cwvreg.py Thu Mar 31 15:23:50 2011 +0200 +++ b/cwvreg.py Thu Mar 31 15:23:52 2011 +0200 @@ -312,6 +312,10 @@ kwargs['clear'] = True super(ETypeRegistry, self).register(obj, **kwargs) + def iter_classes(self): + for etype in self.vreg.schema.entities(): + yield self.etype_class(etype) + @cached def parent_classes(self, etype): if etype == 'Any': diff -r 7df83a6d17c0 -r 5e2f93b88d86 entity.py --- a/entity.py Thu Mar 31 15:23:50 2011 +0200 +++ b/entity.py Thu Mar 31 15:23:52 2011 +0200 @@ -157,6 +157,7 @@ def fetch_rql(cls, user, restriction=None, fetchattrs=None, mainvar='X', settype=True, ordermethod='fetch_order'): """return a rql to fetch all entities of the class type""" + # XXX update api and implementation to AST manipulation (see unrelated rql) restrictions = restriction or [] if settype: restrictions.append('%s is %s' % (mainvar, cls.__regid__)) @@ -753,7 +754,7 @@ # generic vocabulary methods ############################################## def cw_unrelated_rql(self, rtype, targettype, role, ordermethod=None, - vocabconstraints=True): + vocabconstraints=True): """build a rql to fetch `targettype` entities unrelated to this entity using (rtype, role) relation. @@ -763,13 +764,20 @@ ordermethod = ordermethod or 'fetch_unrelated_order' if isinstance(rtype, basestring): rtype = self._cw.vreg.schema.rschema(rtype) + rdef = rtype.role_rdef(self.e_schema, targettype, role) + rewriter = RQLRewriter(self._cw) + # initialize some variables according to the `role` of `self` in the + # relation: + # * variable for myself (`evar`) and searched entities (`searchvedvar`) + # * entity type of the subject (`subjtype`) and of the object + # (`objtype`) of the relation if role == 'subject': evar, searchedvar = 'S', 'O' subjtype, objtype = self.e_schema, targettype else: searchedvar, evar = 'S', 'O' objtype, subjtype = self.e_schema, targettype - rdef = rtype.role_rdef(self.e_schema, targettype, role) + # initialize some variables according to `self` existance if self.has_eid(): restriction = ['NOT S %s O' % rtype] if rdef.role_cardinality(role) not in '?1': @@ -777,19 +785,24 @@ restriction.append('%s eid %%(x)s' % evar) args = {'x': self.eid} if role == 'subject': - securitycheck_args = {'fromeid': self.eid} + sec_check_args = {'fromeid': self.eid} else: - securitycheck_args = {'toeid': self.eid} + sec_check_args = {'toeid': self.eid} + existant = None # instead of 'SO', improve perfs else: if rdef.role_cardinality(role) in '?1': restriction = ['NOT S %s O' % rtype] else: restriction = [] args = {} - securitycheck_args = {} - insertsecurity = (rdef.has_local_role('add') and not - rdef.has_perm(self._cw, 'add', **securitycheck_args)) - # XXX consider constraint.mainvars to check if constraint apply + sec_check_args = {} + existant = searchedvar + # retreive entity class for targettype to compute base rql + etypecls = self._cw.vreg['etypes'].etype_class(targettype) + rql = etypecls.fetch_rql(self._cw.user, restriction, + mainvar=searchedvar, ordermethod=ordermethod) + select = self._cw.vreg.parse(self._cw, rql, args).children[0] + # insert RQL expressions for schema constraints into the rql syntax tree if vocabconstraints: # RQLConstraint is a subclass for RQLVocabularyConstraint, so they # will be included as well @@ -797,33 +810,31 @@ else: cstrcls = RQLConstraint for cstr in rdef.constraints: - if isinstance(cstr, RQLVocabularyConstraint) and searchedvar in cstr.mainvars: + # consider constraint.mainvars to check if constraint apply + if isinstance(cstr, cstrcls) and searchedvar in cstr.mainvars: if not self.has_eid() and evar in cstr.mainvars: continue - restriction.append(cstr.expression) - etypecls = self._cw.vreg['etypes'].etype_class(targettype) - rql = etypecls.fetch_rql(self._cw.user, restriction, - mainvar=searchedvar, ordermethod=ordermethod) + # compute a varmap suitable to RQLRewriter.rewrite argument + varmap = dict((v, v) for v in 'SO' if v in select.defined_vars + and v in cstr.mainvars) + # rewrite constraint by constraint since we want a AND between + # expressions. + rewriter.rewrite(select, [(varmap, (cstr,))], select.solutions, + args, existant) + # insert security RQL expressions granting the permission to 'add' the + # relation into the rql syntax tree, if necessary + rqlexprs = rdef.get_rqlexprs('add') + if rqlexprs and not rdef.has_perm(self._cw, 'add', **sec_check_args): + # compute a varmap suitable to RQLRewriter.rewrite argument + varmap = dict((v, v) for v in 'SO' if v in select.defined_vars) + # rewrite all expressions at once since we want a OR between them. + rewriter.rewrite(select, [(varmap, rqlexprs)], select.solutions, + args, existant) # ensure we have an order defined - if not ' ORDERBY ' in rql: - before, after = rql.split(' WHERE ', 1) - rql = '%s ORDERBY %s WHERE %s' % (before, searchedvar, after) - if insertsecurity: - rqlexprs = rdef.get_rqlexprs('add') - rewriter = RQLRewriter(self._cw) - rqlst = self._cw.vreg.parse(self._cw, rql, args) - if not self.has_eid(): - existant = searchedvar - else: - existant = None # instead of 'SO', improve perfs - for select in rqlst.children: - varmap = {} - for var in 'SO': - if var in select.defined_vars: - varmap[var] = var - rewriter.rewrite(select, [(varmap, rqlexprs)], - select.solutions, args, existant) - rql = rqlst.as_string() + if not select.orderby: + select.add_sort_var(select.defined_vars[searchedvar]) + # we're done, turn the rql syntax tree as a string + rql = select.as_string() return rql, args def unrelated(self, rtype, targettype, role='subject', limit=None, @@ -835,6 +846,7 @@ rql, args = self.cw_unrelated_rql(rtype, targettype, role, ordermethod) except Unauthorized: return self._cw.empty_rset() + # XXX should be set in unrelated rql when manipulating the AST if limit is not None: before, after = rql.split(' WHERE ', 1) rql = '%s LIMIT %s WHERE %s' % (before, limit, after) diff -r 7df83a6d17c0 -r 5e2f93b88d86 test/data/schema.py --- a/test/data/schema.py Thu Mar 31 15:23:50 2011 +0200 +++ b/test/data/schema.py Thu Mar 31 15:23:52 2011 +0200 @@ -31,7 +31,10 @@ 'Personne', symmetric=True, constraints=[ RQLConstraint('NOT S identity O'), - RQLVocabularyConstraint('NOT (S connait P, P nom "toto")')]) + # conflicting constraints, see cw_unrelated_rql tests in + # unittest_entity.py + RQLVocabularyConstraint('NOT (S connait P, P nom "toto")'), + RQLVocabularyConstraint('S travaille P, P nom "tutu"')]) class Societe(EntityType): nom = String() diff -r 7df83a6d17c0 -r 5e2f93b88d86 test/unittest_entity.py --- a/test/unittest_entity.py Thu Mar 31 15:23:50 2011 +0200 +++ b/test/unittest_entity.py Thu Mar 31 15:23:52 2011 +0200 @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# copyright 2003-2010 LOGILAB S.A. (Paris, FRANCE), all rights reserved. +# copyright 2003-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved. # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr # # This file is part of CubicWeb. @@ -19,7 +19,7 @@ """unit tests for cubicweb.web.views.entities module""" from datetime import datetime - +from logilab.common import tempattr from cubicweb import Binary, Unauthorized from cubicweb.devtools.testlib import CubicWebTC from cubicweb.mttransforms import HAS_TAL @@ -29,6 +29,17 @@ class EntityTC(CubicWebTC): + def setUp(self): + super(EntityTC, self).setUp() + self.backup_dict = {} + for cls in self.vreg['etypes'].iter_classes(): + self.backup_dict[cls] = (cls.fetch_attrs, cls.fetch_order) + + def tearDown(self): + super(EntityTC, self).tearDown() + for cls in self.vreg['etypes'].iter_classes(): + cls.fetch_attrs, cls.fetch_order = self.backup_dict[cls] + def test_boolean_value(self): e = self.vreg['etypes'].etype_class('CWUser')(self.request()) self.failUnless(e) @@ -136,17 +147,19 @@ Note = self.vreg['etypes'].etype_class('Note') peschema = Personne.e_schema seschema = Societe.e_schema - peschema.subjrels['travaille'].rdef(peschema, seschema).cardinality = '1*' - peschema.subjrels['connait'].rdef(peschema, peschema).cardinality = '11' - peschema.subjrels['evaluee'].rdef(peschema, Note.e_schema).cardinality = '1*' - seschema.subjrels['evaluee'].rdef(seschema, Note.e_schema).cardinality = '1*' - # testing basic fetch_attrs attribute - self.assertEqual(Personne.fetch_rql(user), - 'Any X,AA,AB,AC ORDERBY AA ASC ' - 'WHERE X is Personne, X nom AA, X prenom AB, X modification_date AC') - pfetch_attrs = Personne.fetch_attrs - sfetch_attrs = Societe.fetch_attrs + torestore = [] + for rdef, card in [(peschema.subjrels['travaille'].rdef(peschema, seschema), '1*'), + (peschema.subjrels['connait'].rdef(peschema, peschema), '11'), + (peschema.subjrels['evaluee'].rdef(peschema, Note.e_schema), '1*'), + (seschema.subjrels['evaluee'].rdef(seschema, Note.e_schema), '1*')]: + cm = tempattr(rdef, 'cardinality', card) + cm.__enter__() + torestore.append(cm) try: + # testing basic fetch_attrs attribute + self.assertEqual(Personne.fetch_rql(user), + 'Any X,AA,AB,AC ORDERBY AA ASC ' + 'WHERE X is Personne, X nom AA, X prenom AB, X modification_date AC') # testing unknown attributes Personne.fetch_attrs = ('bloug', 'beep') self.assertEqual(Personne.fetch_rql(user), 'Any X WHERE X is Personne') @@ -185,8 +198,9 @@ 'Any X,AA,AB ORDERBY AA ASC WHERE X is Personne, X nom AA, X prenom AB') # XXX test unauthorized attribute finally: - Personne.fetch_attrs = pfetch_attrs - Societe.fetch_attrs = sfetch_attrs + # fetch_attrs restored by generic tearDown + for cm in torestore: + cm.__exit__(None, None, None) def test_related_rql_base(self): Personne = self.vreg['etypes'].etype_class('Personne') @@ -227,7 +241,7 @@ user = self.request().user rql = user.cw_unrelated_rql('use_email', 'EmailAddress', 'subject')[0] self.assertEqual(rql, 'Any O,AA,AB,AC ORDERBY AC DESC ' - 'WHERE NOT S use_email O, S eid %(x)s, ' + 'WHERE NOT EXISTS(S use_email O), S eid %(x)s, ' 'O is EmailAddress, O address AA, O alias AB, O modification_date AC') def test_unrelated_rql_security_1_user(self): @@ -236,7 +250,7 @@ user = self.request().user rql = user.cw_unrelated_rql('use_email', 'EmailAddress', 'subject')[0] self.assertEqual(rql, 'Any O,AA,AB,AC ORDERBY AC DESC ' - 'WHERE NOT S use_email O, S eid %(x)s, ' + 'WHERE NOT EXISTS(S use_email O), S eid %(x)s, ' 'O is EmailAddress, O address AA, O alias AB, O modification_date AC') user = self.execute('Any X WHERE X login "admin"').get_entity(0, 0) rql = user.cw_unrelated_rql('use_email', 'EmailAddress', 'subject')[0] @@ -257,49 +271,59 @@ def test_unrelated_rql_security_2(self): email = self.execute('INSERT EmailAddress X: X address "hop"').get_entity(0, 0) rql = email.cw_unrelated_rql('use_email', 'CWUser', 'object')[0] - self.assertEqual(rql, 'Any S,AA,AB,AC,AD ORDERBY AA ASC ' - 'WHERE NOT S use_email O, O eid %(x)s, S is CWUser, S login AA, S firstname AB, S surname AC, S modification_date AD') + self.assertEqual(rql, 'Any S,AA,AB,AC,AD ORDERBY AA ' + 'WHERE NOT EXISTS(S use_email O), S is CWUser, ' + 'S login AA, S firstname AB, S surname AC, S modification_date AD') self.login('anon') email = self.execute('Any X WHERE X eid %(x)s', {'x': email.eid}).get_entity(0, 0) rql = email.cw_unrelated_rql('use_email', 'CWUser', 'object')[0] self.assertEqual(rql, 'Any S,AA,AB,AC,AD ORDERBY AA ' - 'WHERE NOT EXISTS(S use_email O), O eid %(x)s, S is CWUser, S login AA, S firstname AB, S surname AC, S modification_date AD, ' - 'A eid %(B)s, EXISTS(S identity A, NOT A in_group C, C name "guests", C is CWGroup)') + 'WHERE NOT EXISTS(S use_email O, O is EmailAddress), S is CWUser, ' + 'S login AA, S firstname AB, S surname AC, S modification_date AD, ' + 'A eid %(B)s, EXISTS(S identity A, NOT A in_group C, C name "guests", C is CWGroup)') def test_unrelated_rql_security_nonexistant(self): self.login('anon') email = self.vreg['etypes'].etype_class('EmailAddress')(self.request()) rql = email.cw_unrelated_rql('use_email', 'CWUser', 'object')[0] self.assertEqual(rql, 'Any S,AA,AB,AC,AD ORDERBY AA ' - 'WHERE S is CWUser, S login AA, S firstname AB, S surname AC, S modification_date AD, ' - 'A eid %(B)s, EXISTS(S identity A, NOT A in_group C, C name "guests", C is CWGroup)') + 'WHERE NOT EXISTS(S use_email O, O is EmailAddress), S is CWUser, ' + 'S login AA, S firstname AB, S surname AC, S modification_date AD, ' + 'A eid %(B)s, EXISTS(S identity A, NOT A in_group C, C name "guests", C is CWGroup)') def test_unrelated_rql_constraints_creation_subject(self): person = self.vreg['etypes'].etype_class('Personne')(self.request()) rql = person.cw_unrelated_rql('connait', 'Personne', 'subject')[0] - self.assertEqual(rql, 'Any O,AA,AB,AC ORDERBY AC DESC WHERE ' - 'O is Personne, O nom AA, O prenom AB, O modification_date AC') + self.assertEqual( + rql, 'Any O,AA,AB,AC ORDERBY AC DESC WHERE ' + 'O is Personne, O nom AA, O prenom AB, O modification_date AC') def test_unrelated_rql_constraints_creation_object(self): person = self.vreg['etypes'].etype_class('Personne')(self.request()) rql = person.cw_unrelated_rql('connait', 'Personne', 'object')[0] - self.assertEqual(rql, 'Any S,AA,AB,AC ORDERBY AC DESC WHERE ' - 'NOT (S connait P, P nom "toto"), S is Personne, S nom AA, ' - 'S prenom AB, S modification_date AC') + self.assertEqual( + rql, 'Any S,AA,AB,AC ORDERBY AC DESC WHERE ' + 'S is Personne, S nom AA, S prenom AB, S modification_date AC, ' + 'NOT (S connait A, A nom "toto"), A is Personne, EXISTS(S travaille B, B nom "tutu")') def test_unrelated_rql_constraints_edition_subject(self): person = self.request().create_entity('Personne', nom=u'sylvain') rql = person.cw_unrelated_rql('connait', 'Personne', 'subject')[0] - self.assertEqual(rql, 'Any O,AA,AB,AC ORDERBY AC DESC WHERE ' - 'NOT S connait O, S eid %(x)s, NOT S identity O, O is Personne, ' - 'O nom AA, O prenom AB, O modification_date AC') + self.assertEqual( + rql, 'Any O,AA,AB,AC ORDERBY AC DESC WHERE ' + 'NOT EXISTS(S connait O), S eid %(x)s, O is Personne, ' + 'O nom AA, O prenom AB, O modification_date AC, ' + 'NOT S identity O') def test_unrelated_rql_constraints_edition_object(self): person = self.request().create_entity('Personne', nom=u'sylvain') rql = person.cw_unrelated_rql('connait', 'Personne', 'object')[0] - self.assertEqual(rql, 'Any S,AA,AB,AC ORDERBY AC DESC WHERE ' - 'NOT S connait O, O eid %(x)s, NOT S identity O, NOT (S connait P, ' - 'P nom "toto"), S is Personne, S nom AA, S prenom AB, S modification_date AC') + self.assertEqual( + rql, 'Any S,AA,AB,AC ORDERBY AC DESC WHERE ' + 'NOT EXISTS(S connait O), O eid %(x)s, S is Personne, ' + 'S nom AA, S prenom AB, S modification_date AC, ' + 'NOT S identity O, NOT (S connait A, A nom "toto"), ' + 'EXISTS(S travaille B, B nom "tutu")') def test_unrelated_base(self): req = self.request()