# HG changeset patch # User Sylvain Thénault # Date 1253104138 -7200 # Node ID 1a6f7a0e7dbd0151d5fecab183f4861e20c64b0a # Parent 8604a15995d11ea172f913ef5a23c8b103df57a1 unrelated_rql now considers relation's add perm diff -r 8604a15995d1 -r 1a6f7a0e7dbd entity.py --- a/entity.py Wed Sep 16 14:24:31 2009 +0200 +++ b/entity.py Wed Sep 16 14:28:58 2009 +0200 @@ -15,12 +15,14 @@ from logilab.common.deprecation import deprecated from logilab.mtconverter import TransformData, TransformError, xml_escape +from rql import parse from rql.utils import rqlvar_maker from cubicweb import Unauthorized from cubicweb.rset import ResultSet from cubicweb.selectors import yes from cubicweb.appobject import AppObject +from cubicweb.rqlrewrite import RQLRewriter from cubicweb.schema import RQLVocabularyConstraint, RQLConstraint, bw_normalize_etype from cubicweb.common.uilib import printable_value, soup2xhtml @@ -736,7 +738,10 @@ def unrelated_rql(self, rtype, targettype, role, ordermethod=None, vocabconstraints=True): """build a rql to fetch `targettype` entities unrelated to this entity - using (rtype, role) relation + using (rtype, role) relation. + + Consider relation permissions so that returned entities may be actually + linked by `rtype`. """ ordermethod = ordermethod or 'fetch_unrelated_order' if isinstance(rtype, basestring): @@ -749,8 +754,17 @@ objtype, subjtype = self.e_schema, targettype if self.has_eid(): restriction = ['NOT S %s O' % rtype, '%s eid %%(x)s' % evar] + args = {'x': self.eid} + if role == 'subject': + securitycheck_args = {'fromeid': self.eid} + else: + securitycheck_args = {'toeid': self.eid} else: restriction = [] + args = {} + securitycheck_args = {} + insertsecurity = (rtype.has_local_role('add') and not + rtype.has_perm(self.req, 'add', **securitycheck_args)) constraints = rtype.rproperty(subjtype, objtype, 'constraints') if vocabconstraints: # RQLConstraint is a subclass for RQLVocabularyConstraint, so they @@ -767,20 +781,29 @@ if not ' ORDERBY ' in rql: before, after = rql.split(' WHERE ', 1) rql = '%s ORDERBY %s WHERE %s' % (before, searchedvar, after) - return rql + if insertsecurity: + rqlexprs = rtype.get_rqlexprs('add') + rewriter = RQLRewriter(self.req) + rqlst = self.req.vreg.parse(self.req, rql, args) + for select in rqlst.children: + rewriter.rewrite(select, [((searchedvar, searchedvar), rqlexprs)], + select.solutions, args) + rql = rqlst.as_string() + return rql, args def unrelated(self, rtype, targettype, role='subject', limit=None, ordermethod=None): """return a result set of target type objects that may be related by a given relation, with self as subject or object """ - rql = self.unrelated_rql(rtype, targettype, role, ordermethod) + try: + rql, args = self.unrelated_rql(rtype, targettype, role, ordermethod) + except Unauthorized: + return self.req.empty_rset() if limit is not None: before, after = rql.split(' WHERE ', 1) rql = '%s LIMIT %s WHERE %s' % (before, limit, after) - if self.has_eid(): - return self.req.execute(rql, {'x': self.eid}) - return self.req.execute(rql) + return self.req.execute(rql, args, tuple(args.keys())) # relations cache handling ################################################ diff -r 8604a15995d1 -r 1a6f7a0e7dbd test/unittest_entity.py --- a/test/unittest_entity.py Wed Sep 16 14:24:31 2009 +0200 +++ b/test/unittest_entity.py Wed Sep 16 14:28:58 2009 +0200 @@ -9,9 +9,10 @@ from datetime import datetime -from cubicweb import Binary +from cubicweb import Binary, Unauthorized from cubicweb.devtools.apptest import EnvBasedTC from cubicweb.common.mttransforms import HAS_TAL +from cubicweb.entities import fetch_config class EntityTC(EnvBasedTC): @@ -179,7 +180,6 @@ Societe.fetch_attrs = sfetch_attrs def test_related_rql(self): - from cubicweb.entities import fetch_config Personne = self.vreg['etypes'].etype_class('Personne') Note = self.vreg['etypes'].etype_class('Note') self.failUnless(issubclass(self.vreg['etypes'].etype_class('SubNote'), Note)) @@ -194,7 +194,40 @@ self.assertEquals(p.related_rql('evaluee'), 'Any X,AA ORDERBY Z DESC WHERE X modification_date Z, E eid %(x)s, E evaluee X, X modification_date AA') - def test_entity_unrelated(self): + def test_unrelated_rql_security_1(self): + user = self.request().user + rql = user.unrelated_rql('use_email', 'EmailAddress', 'subject')[0] + self.assertEquals(rql, 'Any O,AA,AB,AC ORDERBY AC DESC ' + 'WHERE NOT S use_email O, S eid %(x)s, O is EmailAddress, O address AA, O alias AB, O modification_date AC') + self.create_user('toto') + self.login('toto') + user = self.request().user + rql = user.unrelated_rql('use_email', 'EmailAddress', 'subject')[0] + self.assertEquals(rql, 'Any O,AA,AB,AC ORDERBY AC DESC ' + 'WHERE NOT S use_email O, S eid %(x)s, O is EmailAddress, O address AA, O alias AB, O modification_date AC') + user = self.execute('Any X WHERE X login "admin"').get_entity(0, 0) + self.assertRaises(Unauthorized, user.unrelated_rql, 'use_email', 'EmailAddress', 'subject') + self.login('anon') + user = self.request().user + self.assertRaises(Unauthorized, user.unrelated_rql, 'use_email', 'EmailAddress', 'subject') + + def test_unrelated_rql_security_2(self): + email = self.execute('INSERT EmailAddress X: X address "hop"').get_entity(0, 0) + rql = email.unrelated_rql('use_email', 'CWUser', 'object')[0] + self.assertEquals(rql, 'Any S,AA,AB,AC,AD ORDERBY AA ASC ' + 'WHERE NOT S use_email O, O eid %(x)s, S is CWUser, S login AA, S firstname AB, S surname AC, S modification_date AD') + #rql = email.unrelated_rql('use_email', 'Person', 'object')[0] + #self.assertEquals(rql, '') + self.login('anon') + email = self.execute('Any X WHERE X eid %(x)s', {'x': email.eid}, 'x').get_entity(0, 0) + rql = email.unrelated_rql('use_email', 'CWUser', 'object')[0] + self.assertEquals(rql, 'Any S,AA,AB,AC,AD ORDERBY AA ' + 'WHERE NOT S use_email O, O eid %(x)s, S is CWUser, S login AA, S firstname AB, S surname AC, S modification_date AD, ' + 'A eid %(B)s, EXISTS(S identity A, NOT A in_group C, C name "guests", C is CWGroup)') + #rql = email.unrelated_rql('use_email', 'Person', 'object')[0] + #self.assertEquals(rql, '') + + def test_unrelated_base(self): p = self.add_entity('Personne', nom=u'di mascio', prenom=u'adrien') e = self.add_entity('Tag', name=u'x') related = [r.eid for r in e.tags] @@ -206,14 +239,40 @@ unrelated = [r[0] for r in e.unrelated('tags', 'Personne', 'subject')] self.failIf(p.eid in unrelated) - def test_entity_unrelated_limit(self): + def test_unrelated_limit(self): e = self.add_entity('Tag', name=u'x') self.add_entity('Personne', nom=u'di mascio', prenom=u'adrien') - self.add_entity('Personne', nom=u'di mascio', prenom=u'gwen') + self.add_entity('Personne', nom=u'thenault', prenom=u'sylvain') self.assertEquals(len(e.unrelated('tags', 'Personne', 'subject', limit=1)), 1) - def test_new_entity_unrelated(self): + def test_unrelated_security(self): + email = self.execute('INSERT EmailAddress X: X address "hop"').get_entity(0, 0) + rset = email.unrelated('use_email', 'CWUser', 'object') + self.assertEquals([x.login for x in rset.entities()], [u'admin', u'anon']) + user = self.request().user + rset = user.unrelated('use_email', 'EmailAddress', 'subject') + self.assertEquals([x.address for x in rset.entities()], [u'hop']) + self.create_user('toto') + self.login('toto') + email = self.execute('Any X WHERE X eid %(x)s', {'x': email.eid}, 'x').get_entity(0, 0) + rset = email.unrelated('use_email', 'CWUser', 'object') + self.assertEquals([x.login for x in rset.entities()], ['toto']) + user = self.request().user + rset = user.unrelated('use_email', 'EmailAddress', 'subject') + self.assertEquals([x.address for x in rset.entities()], ['hop']) + user = self.execute('Any X WHERE X login "admin"').get_entity(0, 0) + rset = user.unrelated('use_email', 'EmailAddress', 'subject') + self.assertEquals([x.address for x in rset.entities()], []) + self.login('anon') + email = self.execute('Any X WHERE X eid %(x)s', {'x': email.eid}, 'x').get_entity(0, 0) + rset = email.unrelated('use_email', 'CWUser', 'object') + self.assertEquals([x.login for x in rset.entities()], []) + user = self.request().user + rset = user.unrelated('use_email', 'EmailAddress', 'subject') + self.assertEquals([x.address for x in rset.entities()], []) + + def test_unrelated_new_entity(self): e = self.etype_instance('CWUser') unrelated = [r[0] for r in e.unrelated('in_group', 'CWGroup', 'subject')] # should be default groups but owners, i.e. managers, users, guests