server/test/unittest_security.py
changeset 11057 0b59724cb3f2
parent 11052 058bb3dc685f
child 11058 23eb30449fe5
equal deleted inserted replaced
11052:058bb3dc685f 11057:0b59724cb3f2
     1 # copyright 2003-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
       
     2 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
       
     3 #
       
     4 # This file is part of CubicWeb.
       
     5 #
       
     6 # CubicWeb is free software: you can redistribute it and/or modify it under the
       
     7 # terms of the GNU Lesser General Public License as published by the Free
       
     8 # Software Foundation, either version 2.1 of the License, or (at your option)
       
     9 # any later version.
       
    10 #
       
    11 # CubicWeb is distributed in the hope that it will be useful, but WITHOUT
       
    12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
       
    13 # FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
       
    14 # details.
       
    15 #
       
    16 # You should have received a copy of the GNU Lesser General Public License along
       
    17 # with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
       
    18 """functional tests for server'security"""
       
    19 
       
    20 from six.moves import range
       
    21 
       
    22 from logilab.common.testlib import unittest_main
       
    23 
       
    24 from cubicweb.devtools.testlib import CubicWebTC
       
    25 from cubicweb import Unauthorized, ValidationError, QueryError, Binary
       
    26 from cubicweb.schema import ERQLExpression
       
    27 from cubicweb.server.querier import get_local_checks, check_relations_read_access
       
    28 from cubicweb.server.utils import _CRYPTO_CTX
       
    29 
       
    30 
       
    31 class BaseSecurityTC(CubicWebTC):
       
    32 
       
    33     def setup_database(self):
       
    34         super(BaseSecurityTC, self).setup_database()
       
    35         with self.admin_access.client_cnx() as cnx:
       
    36             self.create_user(cnx, u'iaminusersgrouponly')
       
    37             hash = _CRYPTO_CTX.encrypt('oldpassword', scheme='des_crypt')
       
    38             self.create_user(cnx, u'oldpassword', password=Binary(hash.encode('ascii')))
       
    39 
       
    40 class LowLevelSecurityFunctionTC(BaseSecurityTC):
       
    41 
       
    42     def test_check_relation_read_access(self):
       
    43         rql = u'Personne U WHERE U nom "managers"'
       
    44         rqlst = self.repo.vreg.rqlhelper.parse(rql).children[0]
       
    45         nom = self.repo.schema['Personne'].rdef('nom')
       
    46         with self.temporary_permissions((nom, {'read': ('users', 'managers')})):
       
    47             with self.admin_access.repo_cnx() as cnx:
       
    48                 self.repo.vreg.solutions(cnx, rqlst, None)
       
    49                 check_relations_read_access(cnx, rqlst, {})
       
    50             with self.new_access(u'anon').repo_cnx() as cnx:
       
    51                 self.assertRaises(Unauthorized,
       
    52                                   check_relations_read_access,
       
    53                                   cnx, rqlst, {})
       
    54                 self.assertRaises(Unauthorized, cnx.execute, rql)
       
    55 
       
    56     def test_get_local_checks(self):
       
    57         rql = u'Personne U WHERE U nom "managers"'
       
    58         rqlst = self.repo.vreg.rqlhelper.parse(rql).children[0]
       
    59         with self.temporary_permissions(Personne={'read': ('users', 'managers')}):
       
    60             with self.admin_access.repo_cnx() as cnx:
       
    61                 self.repo.vreg.solutions(cnx, rqlst, None)
       
    62                 solution = rqlst.solutions[0]
       
    63                 localchecks = get_local_checks(cnx, rqlst, solution)
       
    64                 self.assertEqual({}, localchecks)
       
    65             with self.new_access(u'anon').repo_cnx() as cnx:
       
    66                 self.assertRaises(Unauthorized,
       
    67                                   get_local_checks,
       
    68                                   cnx, rqlst, solution)
       
    69                 self.assertRaises(Unauthorized, cnx.execute, rql)
       
    70 
       
    71     def test_upassword_not_selectable(self):
       
    72         with self.admin_access.repo_cnx() as cnx:
       
    73             self.assertRaises(Unauthorized,
       
    74                               cnx.execute, 'Any X,P WHERE X is CWUser, X upassword P')
       
    75         with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx:
       
    76             self.assertRaises(Unauthorized,
       
    77                               cnx.execute, 'Any X,P WHERE X is CWUser, X upassword P')
       
    78 
       
    79     def test_update_password(self):
       
    80         """Ensure that if a user's password is stored with a deprecated hash,
       
    81         it will be updated on next login
       
    82         """
       
    83         with self.repo.internal_cnx() as cnx:
       
    84             oldhash = cnx.system_sql("SELECT cw_upassword FROM cw_CWUser "
       
    85                                          "WHERE cw_login = 'oldpassword'").fetchone()[0]
       
    86             oldhash = self.repo.system_source.binary_to_str(oldhash)
       
    87             self.repo.close(self.repo.connect('oldpassword', password='oldpassword'))
       
    88             newhash = cnx.system_sql("SELECT cw_upassword FROM cw_CWUser "
       
    89                                      "WHERE cw_login = 'oldpassword'").fetchone()[0]
       
    90             newhash = self.repo.system_source.binary_to_str(newhash)
       
    91             self.assertNotEqual(oldhash, newhash)
       
    92             self.assertTrue(newhash.startswith(b'$6$'))
       
    93             self.repo.close(self.repo.connect('oldpassword', password='oldpassword'))
       
    94             newnewhash = cnx.system_sql("SELECT cw_upassword FROM cw_CWUser WHERE "
       
    95                                         "cw_login = 'oldpassword'").fetchone()[0]
       
    96             newnewhash = self.repo.system_source.binary_to_str(newnewhash)
       
    97             self.assertEqual(newhash, newnewhash)
       
    98 
       
    99 
       
   100 class SecurityRewritingTC(BaseSecurityTC):
       
   101     def hijack_source_execute(self):
       
   102         def syntax_tree_search(*args, **kwargs):
       
   103             self.query = (args, kwargs)
       
   104             return []
       
   105         self.repo.system_source.syntax_tree_search = syntax_tree_search
       
   106 
       
   107     def tearDown(self):
       
   108         self.repo.system_source.__dict__.pop('syntax_tree_search', None)
       
   109         super(SecurityRewritingTC, self).tearDown()
       
   110 
       
   111     def test_not_relation_read_security(self):
       
   112         with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx:
       
   113             self.hijack_source_execute()
       
   114             cnx.execute('Any U WHERE NOT A todo_by U, A is Affaire')
       
   115             self.assertEqual(self.query[0][1].as_string(),
       
   116                               'Any U WHERE NOT EXISTS(A todo_by U), A is Affaire')
       
   117             cnx.execute('Any U WHERE NOT EXISTS(A todo_by U), A is Affaire')
       
   118             self.assertEqual(self.query[0][1].as_string(),
       
   119                               'Any U WHERE NOT EXISTS(A todo_by U), A is Affaire')
       
   120 
       
   121 class SecurityTC(BaseSecurityTC):
       
   122 
       
   123     def setUp(self):
       
   124         super(SecurityTC, self).setUp()
       
   125         # implicitly test manager can add some entities
       
   126         with self.admin_access.repo_cnx() as cnx:
       
   127             cnx.execute("INSERT Affaire X: X sujet 'cool'")
       
   128             cnx.execute("INSERT Societe X: X nom 'logilab'")
       
   129             cnx.execute("INSERT Personne X: X nom 'bidule'")
       
   130             cnx.execute('INSERT CWGroup X: X name "staff"')
       
   131             cnx.commit()
       
   132 
       
   133     def test_insert_security(self):
       
   134         with self.new_access(u'anon').repo_cnx() as cnx:
       
   135             cnx.execute("INSERT Personne X: X nom 'bidule'")
       
   136             self.assertRaises(Unauthorized, cnx.commit)
       
   137             self.assertEqual(cnx.execute('Personne X').rowcount, 1)
       
   138 
       
   139     def test_insert_security_2(self):
       
   140         with self.new_access(u'anon').repo_cnx() as cnx:
       
   141             cnx.execute("INSERT Affaire X")
       
   142             self.assertRaises(Unauthorized, cnx.commit)
       
   143             # anon has no read permission on Affaire entities, so
       
   144             # rowcount == 0
       
   145             self.assertEqual(cnx.execute('Affaire X').rowcount, 0)
       
   146 
       
   147     def test_insert_rql_permission(self):
       
   148         # test user can only add une affaire related to a societe he owns
       
   149         with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx:
       
   150             cnx.execute("INSERT Affaire X: X sujet 'cool'")
       
   151             self.assertRaises(Unauthorized, cnx.commit)
       
   152         # test nothing has actually been inserted
       
   153         with self.admin_access.repo_cnx() as cnx:
       
   154             self.assertEqual(cnx.execute('Affaire X').rowcount, 1)
       
   155         with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx:
       
   156             cnx.execute("INSERT Affaire X: X sujet 'cool'")
       
   157             cnx.execute("INSERT Societe X: X nom 'chouette'")
       
   158             cnx.execute("SET A concerne S WHERE A sujet 'cool', S nom 'chouette'")
       
   159             cnx.commit()
       
   160 
       
   161     def test_update_security_1(self):
       
   162         with self.new_access(u'anon').repo_cnx() as cnx:
       
   163             # local security check
       
   164             cnx.execute( "SET X nom 'bidulechouette' WHERE X is Personne")
       
   165             self.assertRaises(Unauthorized, cnx.commit)
       
   166         with self.admin_access.repo_cnx() as cnx:
       
   167             self.assertEqual(cnx.execute('Personne X WHERE X nom "bidulechouette"').rowcount, 0)
       
   168 
       
   169     def test_update_security_2(self):
       
   170         with self.temporary_permissions(Personne={'read': ('users', 'managers'),
       
   171                                                   'add': ('guests', 'users', 'managers')}):
       
   172             with self.new_access(u'anon').repo_cnx() as cnx:
       
   173                 self.assertRaises(Unauthorized, cnx.execute,
       
   174                                   "SET X nom 'bidulechouette' WHERE X is Personne")
       
   175         # test nothing has actually been inserted
       
   176         with self.admin_access.repo_cnx() as cnx:
       
   177             self.assertEqual(cnx.execute('Personne X WHERE X nom "bidulechouette"').rowcount, 0)
       
   178 
       
   179     def test_update_security_3(self):
       
   180         with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx:
       
   181             cnx.execute("INSERT Personne X: X nom 'biduuule'")
       
   182             cnx.execute("INSERT Societe X: X nom 'looogilab'")
       
   183             cnx.execute("SET X travaille S WHERE X nom 'biduuule', S nom 'looogilab'")
       
   184 
       
   185     def test_insert_immutable_attribute_update(self):
       
   186         with self.admin_access.repo_cnx() as cnx:
       
   187             cnx.create_entity('Old', name=u'Babar')
       
   188             cnx.commit()
       
   189             # this should be equivalent
       
   190             o = cnx.create_entity('Old')
       
   191             o.cw_set(name=u'Celeste')
       
   192             cnx.commit()
       
   193 
       
   194     def test_update_rql_permission(self):
       
   195         with self.admin_access.repo_cnx() as cnx:
       
   196             cnx.execute("SET A concerne S WHERE A is Affaire, S is Societe")
       
   197             cnx.commit()
       
   198         # test user can only update une affaire related to a societe he owns
       
   199         with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx:
       
   200             cnx.execute("SET X sujet 'pascool' WHERE X is Affaire")
       
   201             # this won't actually do anything since the selection query won't return anything
       
   202             cnx.commit()
       
   203             # to actually get Unauthorized exception, try to update an entity we can read
       
   204             cnx.execute("SET X nom 'toto' WHERE X is Societe")
       
   205             self.assertRaises(Unauthorized, cnx.commit)
       
   206             cnx.execute("INSERT Affaire X: X sujet 'pascool'")
       
   207             cnx.execute("INSERT Societe X: X nom 'chouette'")
       
   208             cnx.execute("SET A concerne S WHERE A sujet 'pascool', S nom 'chouette'")
       
   209             cnx.execute("SET X sujet 'habahsicestcool' WHERE X sujet 'pascool'")
       
   210             cnx.commit()
       
   211 
       
   212     def test_delete_security(self):
       
   213         # FIXME: sample below fails because we don't detect "owner" can't delete
       
   214         # user anyway, and since no user with login == 'bidule' exists, no
       
   215         # exception is raised
       
   216         #user._groups = {'guests':1}
       
   217         #self.assertRaises(Unauthorized,
       
   218         #                  self.o.execute, user, "DELETE CWUser X WHERE X login 'bidule'")
       
   219         # check local security
       
   220         with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx:
       
   221             self.assertRaises(Unauthorized, cnx.execute, "DELETE CWGroup Y WHERE Y name 'staff'")
       
   222 
       
   223     def test_delete_rql_permission(self):
       
   224         with self.admin_access.repo_cnx() as cnx:
       
   225             cnx.execute("SET A concerne S WHERE A is Affaire, S is Societe")
       
   226             cnx.commit()
       
   227         # test user can only dele une affaire related to a societe he owns
       
   228         with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx:
       
   229             # this won't actually do anything since the selection query won't return anything
       
   230             cnx.execute("DELETE Affaire X")
       
   231             cnx.commit()
       
   232             # to actually get Unauthorized exception, try to delete an entity we can read
       
   233             self.assertRaises(Unauthorized, cnx.execute, "DELETE Societe S")
       
   234             self.assertRaises(QueryError, cnx.commit) # can't commit anymore
       
   235             cnx.rollback()
       
   236             cnx.execute("INSERT Affaire X: X sujet 'pascool'")
       
   237             cnx.execute("INSERT Societe X: X nom 'chouette'")
       
   238             cnx.execute("SET A concerne S WHERE A sujet 'pascool', S nom 'chouette'")
       
   239             cnx.commit()
       
   240 ##         # this one should fail since it will try to delete two affaires, one authorized
       
   241 ##         # and the other not
       
   242 ##         self.assertRaises(Unauthorized, cnx.execute, "DELETE Affaire X")
       
   243             cnx.execute("DELETE Affaire X WHERE X sujet 'pascool'")
       
   244             cnx.commit()
       
   245 
       
   246     def test_insert_relation_rql_permission(self):
       
   247         with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx:
       
   248             cnx.execute("SET A concerne S WHERE A is Affaire, S is Societe")
       
   249             # should raise Unauthorized since user don't own S though this won't
       
   250             # actually do anything since the selection query won't return
       
   251             # anything
       
   252             cnx.commit()
       
   253             # to actually get Unauthorized exception, try to insert a relation
       
   254             # were we can read both entities
       
   255             rset = cnx.execute('Personne P')
       
   256             self.assertEqual(len(rset), 1)
       
   257             ent = rset.get_entity(0, 0)
       
   258             self.assertFalse(cnx.execute('Any P,S WHERE P travaille S,P is Personne, S is Societe'))
       
   259             self.assertRaises(Unauthorized, ent.cw_check_perm, 'update')
       
   260             self.assertRaises(Unauthorized,
       
   261                               cnx.execute, "SET P travaille S WHERE P is Personne, S is Societe")
       
   262             self.assertRaises(QueryError, cnx.commit) # can't commit anymore
       
   263             cnx.rollback()
       
   264             # test nothing has actually been inserted:
       
   265             self.assertFalse(cnx.execute('Any P,S WHERE P travaille S,P is Personne, S is Societe'))
       
   266             cnx.execute("INSERT Societe X: X nom 'chouette'")
       
   267             cnx.execute("SET A concerne S WHERE A is Affaire, S nom 'chouette'")
       
   268             cnx.commit()
       
   269 
       
   270     def test_delete_relation_rql_permission(self):
       
   271         with self.admin_access.repo_cnx() as cnx:
       
   272             cnx.execute("SET A concerne S WHERE A is Affaire, S is Societe")
       
   273             cnx.commit()
       
   274         with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx:
       
   275             # this won't actually do anything since the selection query won't return anything
       
   276             cnx.execute("DELETE A concerne S")
       
   277             cnx.commit()
       
   278         with self.admin_access.repo_cnx() as cnx:
       
   279             # to actually get Unauthorized exception, try to delete a relation we can read
       
   280             eid = cnx.execute("INSERT Affaire X: X sujet 'pascool'")[0][0]
       
   281             cnx.execute('SET X owned_by U WHERE X eid %(x)s, U login "iaminusersgrouponly"',
       
   282                          {'x': eid})
       
   283             cnx.execute("SET A concerne S WHERE A sujet 'pascool', S is Societe")
       
   284             cnx.commit()
       
   285         with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx:
       
   286             self.assertRaises(Unauthorized, cnx.execute, "DELETE A concerne S")
       
   287             self.assertRaises(QueryError, cnx.commit) # can't commit anymore
       
   288             cnx.rollback()
       
   289             cnx.execute("INSERT Societe X: X nom 'chouette'")
       
   290             cnx.execute("SET A concerne S WHERE A is Affaire, S nom 'chouette'")
       
   291             cnx.commit()
       
   292             cnx.execute("DELETE A concerne S WHERE S nom 'chouette'")
       
   293             cnx.commit()
       
   294 
       
   295 
       
   296     def test_user_can_change_its_upassword(self):
       
   297         with self.admin_access.repo_cnx() as cnx:
       
   298             ueid = self.create_user(cnx, u'user').eid
       
   299         with self.new_access(u'user').repo_cnx() as cnx:
       
   300             cnx.execute('SET X upassword %(passwd)s WHERE X eid %(x)s',
       
   301                        {'x': ueid, 'passwd': b'newpwd'})
       
   302             cnx.commit()
       
   303         self.repo.close(self.repo.connect('user', password='newpwd'))
       
   304 
       
   305     def test_user_cant_change_other_upassword(self):
       
   306         with self.admin_access.repo_cnx() as cnx:
       
   307             ueid = self.create_user(cnx, u'otheruser').eid
       
   308         with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx:
       
   309             cnx.execute('SET X upassword %(passwd)s WHERE X eid %(x)s',
       
   310                        {'x': ueid, 'passwd': b'newpwd'})
       
   311             self.assertRaises(Unauthorized, cnx.commit)
       
   312 
       
   313     # read security test
       
   314 
       
   315     def test_read_base(self):
       
   316         with self.temporary_permissions(Personne={'read': ('users', 'managers')}):
       
   317             with self.new_access(u'anon').repo_cnx() as cnx:
       
   318                 self.assertRaises(Unauthorized,
       
   319                                   cnx.execute, 'Personne U where U nom "managers"')
       
   320 
       
   321     def test_read_erqlexpr_base(self):
       
   322         with self.admin_access.repo_cnx() as cnx:
       
   323             eid = cnx.execute("INSERT Affaire X: X sujet 'cool'")[0][0]
       
   324             cnx.commit()
       
   325         with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx:
       
   326             rset = cnx.execute('Affaire X')
       
   327             self.assertEqual(rset.rows, [])
       
   328             self.assertRaises(Unauthorized, cnx.execute, 'Any X WHERE X eid %(x)s', {'x': eid})
       
   329             # cache test
       
   330             self.assertRaises(Unauthorized, cnx.execute, 'Any X WHERE X eid %(x)s', {'x': eid})
       
   331             aff2 = cnx.execute("INSERT Affaire X: X sujet 'cool'")[0][0]
       
   332             soc1 = cnx.execute("INSERT Societe X: X nom 'chouette'")[0][0]
       
   333             cnx.execute("SET A concerne S WHERE A is Affaire, S is Societe")
       
   334             cnx.commit()
       
   335             rset = cnx.execute('Any X WHERE X eid %(x)s', {'x': aff2})
       
   336             self.assertEqual(rset.rows, [[aff2]])
       
   337             # more cache test w/ NOT eid
       
   338             rset = cnx.execute('Affaire X WHERE NOT X eid %(x)s', {'x': eid})
       
   339             self.assertEqual(rset.rows, [[aff2]])
       
   340             rset = cnx.execute('Affaire X WHERE NOT X eid %(x)s', {'x': aff2})
       
   341             self.assertEqual(rset.rows, [])
       
   342             # test can't update an attribute of an entity that can't be readen
       
   343             self.assertRaises(Unauthorized, cnx.execute,
       
   344                               'SET X sujet "hacked" WHERE X eid %(x)s', {'x': eid})
       
   345 
       
   346 
       
   347     def test_entity_created_in_transaction(self):
       
   348         affschema = self.schema['Affaire']
       
   349         with self.temporary_permissions(Affaire={'read': affschema.permissions['add']}):
       
   350             with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx:
       
   351                 aff2 = cnx.execute("INSERT Affaire X: X sujet 'cool'")[0][0]
       
   352                 # entity created in transaction are readable *by eid*
       
   353                 self.assertTrue(cnx.execute('Any X WHERE X eid %(x)s', {'x':aff2}))
       
   354                 # XXX would be nice if it worked
       
   355                 rset = cnx.execute("Affaire X WHERE X sujet 'cool'")
       
   356                 self.assertEqual(len(rset), 0)
       
   357                 self.assertRaises(Unauthorized, cnx.commit)
       
   358 
       
   359     def test_read_erqlexpr_has_text1(self):
       
   360         with self.admin_access.repo_cnx() as cnx:
       
   361             aff1 = cnx.execute("INSERT Affaire X: X sujet 'cool'")[0][0]
       
   362             card1 = cnx.execute("INSERT Card X: X title 'cool'")[0][0]
       
   363             cnx.execute('SET X owned_by U WHERE X eid %(x)s, U login "iaminusersgrouponly"',
       
   364                         {'x': card1})
       
   365             cnx.commit()
       
   366         with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx:
       
   367             aff2 = cnx.execute("INSERT Affaire X: X sujet 'cool'")[0][0]
       
   368             soc1 = cnx.execute("INSERT Societe X: X nom 'chouette'")[0][0]
       
   369             cnx.execute("SET A concerne S WHERE A eid %(a)s, S eid %(s)s", {'a': aff2, 's': soc1})
       
   370             cnx.commit()
       
   371             self.assertRaises(Unauthorized, cnx.execute, 'Any X WHERE X eid %(x)s', {'x':aff1})
       
   372             self.assertTrue(cnx.execute('Any X WHERE X eid %(x)s', {'x':aff2}))
       
   373             self.assertTrue(cnx.execute('Any X WHERE X eid %(x)s', {'x':card1}))
       
   374             rset = cnx.execute("Any X WHERE X has_text 'cool'")
       
   375             self.assertEqual(sorted(eid for eid, in rset.rows),
       
   376                               [card1, aff2])
       
   377 
       
   378     def test_read_erqlexpr_has_text2(self):
       
   379         with self.admin_access.repo_cnx() as cnx:
       
   380             cnx.execute("INSERT Personne X: X nom 'bidule'")
       
   381             cnx.execute("INSERT Societe X: X nom 'bidule'")
       
   382             cnx.commit()
       
   383         with self.temporary_permissions(Personne={'read': ('managers',)}):
       
   384             with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx:
       
   385                 rset = cnx.execute('Any N WHERE N has_text "bidule"')
       
   386                 self.assertEqual(len(rset.rows), 1, rset.rows)
       
   387                 rset = cnx.execute('Any N WITH N BEING (Any N WHERE N has_text "bidule")')
       
   388                 self.assertEqual(len(rset.rows), 1, rset.rows)
       
   389 
       
   390     def test_read_erqlexpr_optional_rel(self):
       
   391         with self.admin_access.repo_cnx() as cnx:
       
   392             cnx.execute("INSERT Personne X: X nom 'bidule'")
       
   393             cnx.execute("INSERT Societe X: X nom 'bidule'")
       
   394             cnx.commit()
       
   395         with self.temporary_permissions(Personne={'read': ('managers',)}):
       
   396             with self.new_access(u'anon').repo_cnx() as cnx:
       
   397                 rset = cnx.execute('Any N,U WHERE N has_text "bidule", N owned_by U?')
       
   398                 self.assertEqual(len(rset.rows), 1, rset.rows)
       
   399 
       
   400     def test_read_erqlexpr_aggregat(self):
       
   401         with self.admin_access.repo_cnx() as cnx:
       
   402             cnx.execute("INSERT Affaire X: X sujet 'cool'")[0][0]
       
   403             cnx.commit()
       
   404         with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx:
       
   405             rset = cnx.execute('Any COUNT(X) WHERE X is Affaire')
       
   406             self.assertEqual(rset.rows, [[0]])
       
   407             aff2 = cnx.execute("INSERT Affaire X: X sujet 'cool'")[0][0]
       
   408             soc1 = cnx.execute("INSERT Societe X: X nom 'chouette'")[0][0]
       
   409             cnx.execute("SET A concerne S WHERE A is Affaire, S is Societe")
       
   410             cnx.commit()
       
   411             rset = cnx.execute('Any COUNT(X) WHERE X is Affaire')
       
   412             self.assertEqual(rset.rows, [[1]])
       
   413             rset = cnx.execute('Any ETN, COUNT(X) GROUPBY ETN WHERE X is ET, ET name ETN')
       
   414             values = dict(rset)
       
   415             self.assertEqual(values['Affaire'], 1)
       
   416             self.assertEqual(values['Societe'], 2)
       
   417             rset = cnx.execute('Any ETN, COUNT(X) GROUPBY ETN WHERE X is ET, ET name ETN '
       
   418                               'WITH X BEING ((Affaire X) UNION (Societe X))')
       
   419             self.assertEqual(len(rset), 2)
       
   420             values = dict(rset)
       
   421             self.assertEqual(values['Affaire'], 1)
       
   422             self.assertEqual(values['Societe'], 2)
       
   423 
       
   424 
       
   425     def test_attribute_security(self):
       
   426         with self.admin_access.repo_cnx() as cnx:
       
   427             # only managers should be able to edit the 'test' attribute of Personne entities
       
   428             eid = cnx.execute("INSERT Personne X: X nom 'bidule', "
       
   429                                "X web 'http://www.debian.org', X test TRUE")[0][0]
       
   430             cnx.execute('SET X test FALSE WHERE X eid %(x)s', {'x': eid})
       
   431             cnx.commit()
       
   432         with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx:
       
   433             cnx.execute("INSERT Personne X: X nom 'bidule', "
       
   434                        "X web 'http://www.debian.org', X test TRUE")
       
   435             self.assertRaises(Unauthorized, cnx.commit)
       
   436             cnx.execute("INSERT Personne X: X nom 'bidule', "
       
   437                        "X web 'http://www.debian.org', X test FALSE")
       
   438             self.assertRaises(Unauthorized, cnx.commit)
       
   439             eid = cnx.execute("INSERT Personne X: X nom 'bidule', "
       
   440                              "X web 'http://www.debian.org'")[0][0]
       
   441             cnx.commit()
       
   442             cnx.execute('SET X test FALSE WHERE X eid %(x)s', {'x': eid})
       
   443             self.assertRaises(Unauthorized, cnx.commit)
       
   444             cnx.execute('SET X test TRUE WHERE X eid %(x)s', {'x': eid})
       
   445             self.assertRaises(Unauthorized, cnx.commit)
       
   446             cnx.execute('SET X web "http://www.logilab.org" WHERE X eid %(x)s', {'x': eid})
       
   447             cnx.commit()
       
   448         with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx:
       
   449             cnx.execute('INSERT Frozable F: F name "Foo"')
       
   450             cnx.commit()
       
   451             cnx.execute('SET F name "Bar" WHERE F is Frozable')
       
   452             cnx.commit()
       
   453             cnx.execute('SET F name "BaBar" WHERE F is Frozable')
       
   454             cnx.execute('SET F frozen True WHERE F is Frozable')
       
   455             with self.assertRaises(Unauthorized):
       
   456                 cnx.commit()
       
   457             cnx.rollback()
       
   458             cnx.execute('SET F frozen True WHERE F is Frozable')
       
   459             cnx.commit()
       
   460             cnx.execute('SET F name "Bar" WHERE F is Frozable')
       
   461             with self.assertRaises(Unauthorized):
       
   462                 cnx.commit()
       
   463 
       
   464     def test_attribute_security_rqlexpr(self):
       
   465         with self.admin_access.repo_cnx() as cnx:
       
   466             # Note.para attribute editable by managers or if the note is in "todo" state
       
   467             note = cnx.execute("INSERT Note X: X para 'bidule'").get_entity(0, 0)
       
   468             cnx.commit()
       
   469             note.cw_adapt_to('IWorkflowable').fire_transition('markasdone')
       
   470             cnx.execute('SET X para "truc" WHERE X eid %(x)s', {'x': note.eid})
       
   471             cnx.commit()
       
   472         with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx:
       
   473             cnx.execute("SET X para 'chouette' WHERE X eid %(x)s", {'x': note.eid})
       
   474             self.assertRaises(Unauthorized, cnx.commit)
       
   475             note2 = cnx.execute("INSERT Note X: X para 'bidule'").get_entity(0, 0)
       
   476             cnx.commit()
       
   477             note2.cw_adapt_to('IWorkflowable').fire_transition('markasdone')
       
   478             cnx.commit()
       
   479             self.assertEqual(len(cnx.execute('Any X WHERE X in_state S, S name "todo", X eid %(x)s',
       
   480                                             {'x': note2.eid})),
       
   481                               0)
       
   482             cnx.execute("SET X para 'chouette' WHERE X eid %(x)s", {'x': note2.eid})
       
   483             self.assertRaises(Unauthorized, cnx.commit)
       
   484             note2.cw_adapt_to('IWorkflowable').fire_transition('redoit')
       
   485             cnx.commit()
       
   486             cnx.execute("SET X para 'chouette' WHERE X eid %(x)s", {'x': note2.eid})
       
   487             cnx.commit()
       
   488             cnx.execute("INSERT Note X: X something 'A'")
       
   489             self.assertRaises(Unauthorized, cnx.commit)
       
   490             cnx.execute("INSERT Note X: X para 'zogzog', X something 'A'")
       
   491             cnx.commit()
       
   492             note = cnx.execute("INSERT Note X").get_entity(0,0)
       
   493             cnx.commit()
       
   494             note.cw_set(something=u'B')
       
   495             cnx.commit()
       
   496             note.cw_set(something=None, para=u'zogzog')
       
   497             cnx.commit()
       
   498 
       
   499     def test_attribute_read_security(self):
       
   500         # anon not allowed to see users'login, but they can see users
       
   501         login_rdef = self.repo.schema['CWUser'].rdef('login')
       
   502         with self.temporary_permissions((login_rdef, {'read': ('users', 'managers')}),
       
   503                                         CWUser={'read': ('guests', 'users', 'managers')}):
       
   504             with self.new_access(u'anon').repo_cnx() as cnx:
       
   505                 rset = cnx.execute('CWUser X')
       
   506                 self.assertTrue(rset)
       
   507                 x = rset.get_entity(0, 0)
       
   508                 x.complete()
       
   509                 self.assertEqual(x.login, None)
       
   510                 self.assertTrue(x.creation_date)
       
   511                 x = rset.get_entity(1, 0)
       
   512                 x.complete()
       
   513                 self.assertEqual(x.login, None)
       
   514                 self.assertTrue(x.creation_date)
       
   515 
       
   516     def test_yams_inheritance_and_security_bug(self):
       
   517         with self.temporary_permissions(Division={'read': ('managers',
       
   518                                                            ERQLExpression('X owned_by U'))}):
       
   519             with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx:
       
   520                 querier = cnx.repo.querier
       
   521                 rqlst = querier.parse('Any X WHERE X is_instance_of Societe')
       
   522                 querier.solutions(cnx, rqlst, {})
       
   523                 querier._annotate(rqlst)
       
   524                 plan = querier.plan_factory(rqlst, {}, cnx)
       
   525                 plan.preprocess(rqlst)
       
   526                 self.assertEqual(
       
   527                     rqlst.as_string(),
       
   528                     '(Any X WHERE X is IN(Societe, SubDivision)) UNION '
       
   529                     '(Any X WHERE X is Division, EXISTS(X owned_by %(B)s))')
       
   530 
       
   531 
       
   532 class BaseSchemaSecurityTC(BaseSecurityTC):
       
   533     """tests related to the base schema permission configuration"""
       
   534 
       
   535     def test_user_can_delete_object_he_created(self):
       
   536         # even if some other user have changed object'state
       
   537         with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx:
       
   538             # due to security test, affaire has to concerne a societe the user owns
       
   539             cnx.execute('INSERT Societe X: X nom "ARCTIA"')
       
   540             cnx.execute('INSERT Affaire X: X ref "ARCT01", X concerne S WHERE S nom "ARCTIA"')
       
   541             cnx.commit()
       
   542         with self.admin_access.repo_cnx() as cnx:
       
   543             affaire = cnx.execute('Any X WHERE X ref "ARCT01"').get_entity(0, 0)
       
   544             affaire.cw_adapt_to('IWorkflowable').fire_transition('abort')
       
   545             cnx.commit()
       
   546             self.assertEqual(len(cnx.execute('TrInfo X WHERE X wf_info_for A, A ref "ARCT01"')),
       
   547                              1)
       
   548             self.assertEqual(len(cnx.execute('TrInfo X WHERE X wf_info_for A, A ref "ARCT01",'
       
   549                                               'X owned_by U, U login "admin"')),
       
   550                              1) # TrInfo at the above state change
       
   551         with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx:
       
   552             cnx.execute('DELETE Affaire X WHERE X ref "ARCT01"')
       
   553             cnx.commit()
       
   554             self.assertFalse(cnx.execute('Affaire X'))
       
   555 
       
   556     def test_users_and_groups_non_readable_by_guests(self):
       
   557         with self.repo.internal_cnx() as cnx:
       
   558             admineid = cnx.execute('CWUser U WHERE U login "admin"').rows[0][0]
       
   559         with self.new_access(u'anon').repo_cnx() as cnx:
       
   560             anon = cnx.user
       
   561             # anonymous user can only read itself
       
   562             rset = cnx.execute('Any L WHERE X owned_by U, U login L')
       
   563             self.assertEqual([['anon']], rset.rows)
       
   564             rset = cnx.execute('CWUser X')
       
   565             self.assertEqual([[anon.eid]], rset.rows)
       
   566             # anonymous user can read groups (necessary to check allowed transitions for instance)
       
   567             self.assertTrue(cnx.execute('CWGroup X'))
       
   568             # should only be able to read the anonymous user, not another one
       
   569             self.assertRaises(Unauthorized,
       
   570                               cnx.execute, 'CWUser X WHERE X eid %(x)s', {'x': admineid})
       
   571             rset = cnx.execute('CWUser X WHERE X eid %(x)s', {'x': anon.eid})
       
   572             self.assertEqual([[anon.eid]], rset.rows)
       
   573             # but can't modify it
       
   574             cnx.execute('SET X login "toto" WHERE X eid %(x)s', {'x': anon.eid})
       
   575             self.assertRaises(Unauthorized, cnx.commit)
       
   576 
       
   577     def test_in_group_relation(self):
       
   578         with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx:
       
   579             rql = u"DELETE U in_group G WHERE U login 'admin'"
       
   580             self.assertRaises(Unauthorized, cnx.execute, rql)
       
   581             rql = u"SET U in_group G WHERE U login 'admin', G name 'users'"
       
   582             self.assertRaises(Unauthorized, cnx.execute, rql)
       
   583 
       
   584     def test_owned_by(self):
       
   585         with self.admin_access.repo_cnx() as cnx:
       
   586             cnx.execute("INSERT Personne X: X nom 'bidule'")
       
   587             cnx.commit()
       
   588         with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx:
       
   589             rql = u"SET X owned_by U WHERE U login 'iaminusersgrouponly', X is Personne"
       
   590             self.assertRaises(Unauthorized, cnx.execute, rql)
       
   591 
       
   592     def test_bookmarked_by_guests_security(self):
       
   593         with self.admin_access.repo_cnx() as cnx:
       
   594             beid1 = cnx.execute('INSERT Bookmark B: B path "?vid=manage", B title "manage"')[0][0]
       
   595             beid2 = cnx.execute('INSERT Bookmark B: B path "?vid=index", B title "index", '
       
   596                                 'B bookmarked_by U WHERE U login "anon"')[0][0]
       
   597             cnx.commit()
       
   598         with self.new_access(u'anon').repo_cnx() as cnx:
       
   599             anoneid = cnx.user.eid
       
   600             self.assertEqual(cnx.execute('Any T,P ORDERBY lower(T) WHERE B is Bookmark,B title T,B path P,'
       
   601                                          'B bookmarked_by U, U eid %s' % anoneid).rows,
       
   602                               [['index', '?vid=index']])
       
   603             self.assertEqual(cnx.execute('Any T,P ORDERBY lower(T) WHERE B is Bookmark,B title T,B path P,'
       
   604                                          'B bookmarked_by U, U eid %(x)s', {'x': anoneid}).rows,
       
   605                               [['index', '?vid=index']])
       
   606             # can read others bookmarks as well
       
   607             self.assertEqual(cnx.execute('Any B where B is Bookmark, NOT B bookmarked_by U').rows,
       
   608                               [[beid1]])
       
   609             self.assertRaises(Unauthorized, cnx.execute,'DELETE B bookmarked_by U')
       
   610             self.assertRaises(Unauthorized,
       
   611                               cnx.execute, 'SET B bookmarked_by U WHERE U eid %(x)s, B eid %(b)s',
       
   612                               {'x': anoneid, 'b': beid1})
       
   613 
       
   614     def test_ambigous_ordered(self):
       
   615         with self.new_access(u'anon').repo_cnx() as cnx:
       
   616             names = [t for t, in cnx.execute('Any N ORDERBY lower(N) WHERE X name N')]
       
   617             self.assertEqual(names, sorted(names, key=lambda x: x.lower()))
       
   618 
       
   619     def test_in_state_without_update_perm(self):
       
   620         """check a user change in_state without having update permission on the
       
   621         subject
       
   622         """
       
   623         with self.admin_access.repo_cnx() as cnx:
       
   624             eid = cnx.execute('INSERT Affaire X: X ref "ARCT01"')[0][0]
       
   625             cnx.commit()
       
   626         with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx:
       
   627             # needed to remove rql expr granting update perm to the user
       
   628             affschema = self.schema['Affaire']
       
   629             with self.temporary_permissions(Affaire={'update': affschema.get_groups('update'),
       
   630                                                      'read': ('users',)}):
       
   631                 self.assertRaises(Unauthorized,
       
   632                                   affschema.check_perm, cnx, 'update', eid=eid)
       
   633                 aff = cnx.execute('Any X WHERE X ref "ARCT01"').get_entity(0, 0)
       
   634                 aff.cw_adapt_to('IWorkflowable').fire_transition('abort')
       
   635                 cnx.commit()
       
   636                 # though changing a user state (even logged user) is reserved to managers
       
   637                 user = cnx.user
       
   638                 # XXX wether it should raise Unauthorized or ValidationError is not clear
       
   639                 # the best would probably ValidationError if the transition doesn't exist
       
   640                 # from the current state but Unauthorized if it exists but user can't pass it
       
   641                 self.assertRaises(ValidationError,
       
   642                                   user.cw_adapt_to('IWorkflowable').fire_transition, 'deactivate')
       
   643 
       
   644     def test_trinfo_security(self):
       
   645         with self.admin_access.repo_cnx() as cnx:
       
   646             aff = cnx.execute('INSERT Affaire X: X ref "ARCT01"').get_entity(0, 0)
       
   647             iworkflowable = aff.cw_adapt_to('IWorkflowable')
       
   648             cnx.commit()
       
   649             iworkflowable.fire_transition('abort')
       
   650             cnx.commit()
       
   651             # can change tr info comment
       
   652             cnx.execute('SET TI comment %(c)s WHERE TI wf_info_for X, X ref "ARCT01"',
       
   653                          {'c': u'bouh!'})
       
   654             cnx.commit()
       
   655             aff.cw_clear_relation_cache('wf_info_for', 'object')
       
   656             trinfo = iworkflowable.latest_trinfo()
       
   657             self.assertEqual(trinfo.comment, 'bouh!')
       
   658             # but not from_state/to_state
       
   659             aff.cw_clear_relation_cache('wf_info_for', role='object')
       
   660             self.assertRaises(Unauthorized, cnx.execute,
       
   661                               'SET TI from_state S WHERE TI eid %(ti)s, S name "ben non"',
       
   662                               {'ti': trinfo.eid})
       
   663             self.assertRaises(Unauthorized, cnx.execute,
       
   664                               'SET TI to_state S WHERE TI eid %(ti)s, S name "pitetre"',
       
   665                               {'ti': trinfo.eid})
       
   666 
       
   667     def test_emailaddress_security(self):
       
   668         # check for prexisting email adresse
       
   669         with self.admin_access.repo_cnx() as cnx:
       
   670             if cnx.execute('Any X WHERE X is EmailAddress'):
       
   671                 rset = cnx.execute('Any X, U WHERE X is EmailAddress, U use_email X')
       
   672                 msg = ['Preexisting email readable by anon found!']
       
   673                 tmpl = '  - "%s" used by user "%s"'
       
   674                 for i in range(len(rset)):
       
   675                     email, user = rset.get_entity(i, 0), rset.get_entity(i, 1)
       
   676                     msg.append(tmpl % (email.dc_title(), user.dc_title()))
       
   677                 raise RuntimeError('\n'.join(msg))
       
   678             # actual test
       
   679             cnx.execute('INSERT EmailAddress X: X address "hop"').get_entity(0, 0)
       
   680             cnx.execute('INSERT EmailAddress X: X address "anon", '
       
   681                          'U use_email X WHERE U login "anon"').get_entity(0, 0)
       
   682             cnx.commit()
       
   683             self.assertEqual(len(cnx.execute('Any X WHERE X is EmailAddress')), 2)
       
   684         with self.new_access(u'anon').repo_cnx() as cnx:
       
   685             self.assertEqual(len(cnx.execute('Any X WHERE X is EmailAddress')), 1)
       
   686 
       
   687 if __name__ == '__main__':
       
   688     unittest_main()