server/test/unittest_security.py
branchstable
changeset 8461 8af7c6d86efb
parent 8452 1ad42383a9ec
child 8463 a964c40adbe3
equal deleted inserted replaced
8460:b1f6777fc839 8461:8af7c6d86efb
    18 """functional tests for server'security"""
    18 """functional tests for server'security"""
    19 
    19 
    20 import sys
    20 import sys
    21 
    21 
    22 from logilab.common.testlib import unittest_main, TestCase
    22 from logilab.common.testlib import unittest_main, TestCase
       
    23 
       
    24 from rql import RQLException
       
    25 
    23 from cubicweb.devtools.testlib import CubicWebTC
    26 from cubicweb.devtools.testlib import CubicWebTC
    24 
       
    25 from cubicweb import Unauthorized, ValidationError, QueryError
    27 from cubicweb import Unauthorized, ValidationError, QueryError
    26 from cubicweb.schema import ERQLExpression
    28 from cubicweb.schema import ERQLExpression
    27 from cubicweb.server.querier import check_read_access
    29 from cubicweb.server.querier import check_read_access
    28 
    30 
    29 
    31 
    30 class BaseSecurityTC(CubicWebTC):
    32 class BaseSecurityTC(CubicWebTC):
    31 
    33 
    32     def setup_database(self):
    34     def setup_database(self):
    33         super(BaseSecurityTC, self).setup_database()
    35         super(BaseSecurityTC, self).setup_database()
    34         req = self.request()
    36         self.create_user(self.request(), 'iaminusersgrouponly')
    35         self.create_user(req, 'iaminusersgrouponly')
       
    36         readoriggroups = self.schema['Personne'].permissions['read']
       
    37         addoriggroups = self.schema['Personne'].permissions['add']
       
    38         def fix_perm():
       
    39             self.schema['Personne'].set_action_permissions('read', readoriggroups)
       
    40             self.schema['Personne'].set_action_permissions('add', addoriggroups)
       
    41         self.addCleanup(fix_perm)
       
    42 
    37 
    43 
    38 
    44 class LowLevelSecurityFunctionTC(BaseSecurityTC):
    39 class LowLevelSecurityFunctionTC(BaseSecurityTC):
    45 
    40 
    46     def test_check_read_access(self):
    41     def test_check_read_access(self):
    47         rql = u'Personne U where U nom "managers"'
    42         rql = u'Personne U where U nom "managers"'
    48         rqlst = self.repo.vreg.rqlhelper.parse(rql).children[0]
    43         rqlst = self.repo.vreg.rqlhelper.parse(rql).children[0]
    49         origgroups = self.schema['Personne'].get_groups('read')
    44         with self.temporary_permissions(Personne={'read': ('users', 'managers')}):
    50         self.schema['Personne'].set_action_permissions('read', ('users', 'managers'))
    45             self.repo.vreg.solutions(self.session, rqlst, None)
    51         self.repo.vreg.solutions(self.session, rqlst, None)
    46             solution = rqlst.solutions[0]
    52         solution = rqlst.solutions[0]
    47             check_read_access(self.session, rqlst, solution, {})
    53         check_read_access(self.session, rqlst, solution, {})
    48             with self.login('anon') as cu:
    54         cnx = self.login('anon')
    49                 self.assertRaises(Unauthorized,
    55         cu = cnx.cursor()
    50                                   check_read_access,
    56         self.assertRaises(Unauthorized,
    51                                   self.session, rqlst, solution, {})
    57                           check_read_access,
    52                 self.assertRaises(Unauthorized, cu.execute, rql)
    58                           self.session, rqlst, solution, {})
       
    59         self.assertRaises(Unauthorized, cu.execute, rql)
       
    60 
    53 
    61     def test_upassword_not_selectable(self):
    54     def test_upassword_not_selectable(self):
    62         self.assertRaises(Unauthorized,
    55         self.assertRaises(Unauthorized,
    63                           self.execute, 'Any X,P WHERE X is CWUser, X upassword P')
    56                           self.execute, 'Any X,P WHERE X is CWUser, X upassword P')
    64         self.rollback()
    57         self.rollback()
    65         cnx = self.login('iaminusersgrouponly')
    58         with self.login('iaminusersgrouponly') as cu:
    66         cu = cnx.cursor()
    59             self.assertRaises(Unauthorized,
    67         self.assertRaises(Unauthorized,
    60                               cu.execute, 'Any X,P WHERE X is CWUser, X upassword P')
    68                           cu.execute, 'Any X,P WHERE X is CWUser, X upassword P')
       
    69 
    61 
    70 
    62 
    71 class SecurityRewritingTC(BaseSecurityTC):
    63 class SecurityRewritingTC(BaseSecurityTC):
    72     def hijack_source_execute(self):
    64     def hijack_source_execute(self):
    73         def syntax_tree_search(*args, **kwargs):
    65         def syntax_tree_search(*args, **kwargs):
    78     def tearDown(self):
    70     def tearDown(self):
    79         self.repo.system_source.__dict__.pop('syntax_tree_search', None)
    71         self.repo.system_source.__dict__.pop('syntax_tree_search', None)
    80         super(SecurityRewritingTC, self).tearDown()
    72         super(SecurityRewritingTC, self).tearDown()
    81 
    73 
    82     def test_not_relation_read_security(self):
    74     def test_not_relation_read_security(self):
    83         cnx = self.login('iaminusersgrouponly')
    75         with self.login('iaminusersgrouponly'):
    84         self.hijack_source_execute()
    76             self.hijack_source_execute()
    85         self.execute('Any U WHERE NOT A todo_by U, A is Affaire')
    77             self.execute('Any U WHERE NOT A todo_by U, A is Affaire')
    86         self.assertEqual(self.query[0][1].as_string(),
    78             self.assertEqual(self.query[0][1].as_string(),
    87                           'Any U WHERE NOT EXISTS(A todo_by U), A is Affaire')
    79                               'Any U WHERE NOT EXISTS(A todo_by U), A is Affaire')
    88         self.execute('Any U WHERE NOT EXISTS(A todo_by U), A is Affaire')
    80             self.execute('Any U WHERE NOT EXISTS(A todo_by U), A is Affaire')
    89         self.assertEqual(self.query[0][1].as_string(),
    81             self.assertEqual(self.query[0][1].as_string(),
    90                           'Any U WHERE NOT EXISTS(A todo_by U), A is Affaire')
    82                               'Any U WHERE NOT EXISTS(A todo_by U), A is Affaire')
    91         cnx.close()
       
    92 
    83 
    93 class SecurityTC(BaseSecurityTC):
    84 class SecurityTC(BaseSecurityTC):
    94 
    85 
    95     def setUp(self):
    86     def setUp(self):
    96         BaseSecurityTC.setUp(self)
    87         BaseSecurityTC.setUp(self)
   100         self.execute("INSERT Personne X: X nom 'bidule'")
    91         self.execute("INSERT Personne X: X nom 'bidule'")
   101         self.execute('INSERT CWGroup X: X name "staff"')
    92         self.execute('INSERT CWGroup X: X name "staff"')
   102         self.commit()
    93         self.commit()
   103 
    94 
   104     def test_insert_security(self):
    95     def test_insert_security(self):
   105         cnx = self.login('anon')
    96         with self.login('anon') as cu:
   106         cu = cnx.cursor()
    97             cu.execute("INSERT Personne X: X nom 'bidule'")
   107         cu.execute("INSERT Personne X: X nom 'bidule'")
    98             self.assertRaises(Unauthorized, self.commit)
   108         self.assertRaises(Unauthorized, cnx.commit)
    99             self.assertEqual(cu.execute('Personne X').rowcount, 1)
   109         self.assertEqual(cu.execute('Personne X').rowcount, 1)
       
   110         cnx.close()
       
   111 
   100 
   112     def test_insert_rql_permission(self):
   101     def test_insert_rql_permission(self):
   113         # test user can only add une affaire related to a societe he owns
   102         # test user can only add une affaire related to a societe he owns
   114         cnx = self.login('iaminusersgrouponly')
   103         with self.login('iaminusersgrouponly') as cu:
   115         cu = cnx.cursor()
   104             cu.execute("INSERT Affaire X: X sujet 'cool'")
   116         cu.execute("INSERT Affaire X: X sujet 'cool'")
   105             self.assertRaises(Unauthorized, self.commit)
   117         self.assertRaises(Unauthorized, cnx.commit)
       
   118         # test nothing has actually been inserted
   106         # test nothing has actually been inserted
   119         self.restore_connection()
       
   120         self.assertEqual(self.execute('Affaire X').rowcount, 1)
   107         self.assertEqual(self.execute('Affaire X').rowcount, 1)
   121         cnx = self.login('iaminusersgrouponly')
   108         with self.login('iaminusersgrouponly') as cu:
   122         cu = cnx.cursor()
   109             cu.execute("INSERT Affaire X: X sujet 'cool'")
   123         cu.execute("INSERT Affaire X: X sujet 'cool'")
   110             cu.execute("INSERT Societe X: X nom 'chouette'")
   124         cu.execute("INSERT Societe X: X nom 'chouette'")
   111             cu.execute("SET A concerne S WHERE A sujet 'cool', S nom 'chouette'")
   125         cu.execute("SET A concerne S WHERE A sujet 'cool', S nom 'chouette'")
   112             self.commit()
   126         cnx.commit()
       
   127         cnx.close()
       
   128 
   113 
   129     def test_update_security_1(self):
   114     def test_update_security_1(self):
   130         cnx = self.login('anon')
   115         with self.login('anon') as cu:
   131         cu = cnx.cursor()
   116             # local security check
   132         # local security check
   117             cu.execute( "SET X nom 'bidulechouette' WHERE X is Personne")
   133         cu.execute( "SET X nom 'bidulechouette' WHERE X is Personne")
   118             self.assertRaises(Unauthorized, self.commit)
   134         self.assertRaises(Unauthorized, cnx.commit)
       
   135         self.restore_connection()
       
   136         self.assertEqual(self.execute('Personne X WHERE X nom "bidulechouette"').rowcount, 0)
   119         self.assertEqual(self.execute('Personne X WHERE X nom "bidulechouette"').rowcount, 0)
   137 
   120 
   138     def test_update_security_2(self):
   121     def test_update_security_2(self):
   139         cnx = self.login('anon')
   122         with self.temporary_permissions(Personne={'read': ('users', 'managers'),
   140         cu = cnx.cursor()
   123                                                   'add': ('guests', 'users', 'managers')}):
   141         self.repo.schema['Personne'].set_action_permissions('read', ('users', 'managers'))
   124             with self.login('anon') as cu:
   142         self.repo.schema['Personne'].set_action_permissions('add', ('guests', 'users', 'managers'))
   125                 self.assertRaises(Unauthorized, cu.execute, "SET X nom 'bidulechouette' WHERE X is Personne")
   143         self.assertRaises(Unauthorized, cu.execute, "SET X nom 'bidulechouette' WHERE X is Personne")
   126                 self.rollback()
   144         #self.assertRaises(Unauthorized, cnx.commit)
   127                 # self.assertRaises(Unauthorized, cnx.commit)
   145         # test nothing has actually been inserted
   128         # test nothing has actually been inserted
   146         self.restore_connection()
       
   147         self.assertEqual(self.execute('Personne X WHERE X nom "bidulechouette"').rowcount, 0)
   129         self.assertEqual(self.execute('Personne X WHERE X nom "bidulechouette"').rowcount, 0)
   148 
   130 
   149     def test_update_security_3(self):
   131     def test_update_security_3(self):
   150         cnx = self.login('iaminusersgrouponly')
   132         with self.login('iaminusersgrouponly') as cu:
   151         cu = cnx.cursor()
   133             cu.execute("INSERT Personne X: X nom 'biduuule'")
   152         cu.execute("INSERT Personne X: X nom 'biduuule'")
   134             cu.execute("INSERT Societe X: X nom 'looogilab'")
   153         cu.execute("INSERT Societe X: X nom 'looogilab'")
   135             cu.execute("SET X travaille S WHERE X nom 'biduuule', S nom 'looogilab'")
   154         cu.execute("SET X travaille S WHERE X nom 'biduuule', S nom 'looogilab'")
       
   155         cnx.close()
       
   156 
   136 
   157     def test_update_rql_permission(self):
   137     def test_update_rql_permission(self):
   158         self.execute("SET A concerne S WHERE A is Affaire, S is Societe")
   138         self.execute("SET A concerne S WHERE A is Affaire, S is Societe")
   159         self.commit()
   139         self.commit()
   160         # test user can only update une affaire related to a societe he owns
   140         # test user can only update une affaire related to a societe he owns
   161         cnx = self.login('iaminusersgrouponly')
   141         with self.login('iaminusersgrouponly') as cu:
   162         cu = cnx.cursor()
   142             cu.execute("SET X sujet 'pascool' WHERE X is Affaire")
   163         cu.execute("SET X sujet 'pascool' WHERE X is Affaire")
   143             # this won't actually do anything since the selection query won't return anything
   164         # this won't actually do anything since the selection query won't return anything
   144             self.commit()
   165         cnx.commit()
   145             # to actually get Unauthorized exception, try to update an entity we can read
   166         # to actually get Unauthorized exception, try to update an entity we can read
   146             cu.execute("SET X nom 'toto' WHERE X is Societe")
   167         cu.execute("SET X nom 'toto' WHERE X is Societe")
   147             self.assertRaises(Unauthorized, self.commit)
   168         self.assertRaises(Unauthorized, cnx.commit)
   148             cu.execute("INSERT Affaire X: X sujet 'pascool'")
   169         cu.execute("INSERT Affaire X: X sujet 'pascool'")
   149             cu.execute("INSERT Societe X: X nom 'chouette'")
   170         cu.execute("INSERT Societe X: X nom 'chouette'")
   150             cu.execute("SET A concerne S WHERE A sujet 'pascool', S nom 'chouette'")
   171         cu.execute("SET A concerne S WHERE A sujet 'pascool', S nom 'chouette'")
   151             cu.execute("SET X sujet 'habahsicestcool' WHERE X sujet 'pascool'")
   172         cu.execute("SET X sujet 'habahsicestcool' WHERE X sujet 'pascool'")
   152             self.commit()
   173         cnx.commit()
       
   174         cnx.close()
       
   175 
   153 
   176     def test_delete_security(self):
   154     def test_delete_security(self):
   177         # FIXME: sample below fails because we don't detect "owner" can't delete
   155         # FIXME: sample below fails because we don't detect "owner" can't delete
   178         # user anyway, and since no user with login == 'bidule' exists, no
   156         # user anyway, and since no user with login == 'bidule' exists, no
   179         # exception is raised
   157         # exception is raised
   180         #user._groups = {'guests':1}
   158         #user._groups = {'guests':1}
   181         #self.assertRaises(Unauthorized,
   159         #self.assertRaises(Unauthorized,
   182         #                  self.o.execute, user, "DELETE CWUser X WHERE X login 'bidule'")
   160         #                  self.o.execute, user, "DELETE CWUser X WHERE X login 'bidule'")
   183         # check local security
   161         # check local security
   184         cnx = self.login('iaminusersgrouponly')
   162         with self.login('iaminusersgrouponly') as cu:
   185         cu = cnx.cursor()
   163             self.assertRaises(Unauthorized, cu.execute, "DELETE CWGroup Y WHERE Y name 'staff'")
   186         self.assertRaises(Unauthorized, cu.execute, "DELETE CWGroup Y WHERE Y name 'staff'")
   164             self.rollback()
   187         cnx.close()
       
   188 
   165 
   189     def test_delete_rql_permission(self):
   166     def test_delete_rql_permission(self):
   190         self.execute("SET A concerne S WHERE A is Affaire, S is Societe")
   167         self.execute("SET A concerne S WHERE A is Affaire, S is Societe")
   191         self.commit()
   168         self.commit()
   192         # test user can only dele une affaire related to a societe he owns
   169         # test user can only dele une affaire related to a societe he owns
   193         cnx = self.login('iaminusersgrouponly')
   170         with self.login('iaminusersgrouponly') as cu:
   194         cu = cnx.cursor()
   171             # this won't actually do anything since the selection query won't return anything
   195         # this won't actually do anything since the selection query won't return anything
   172             cu.execute("DELETE Affaire X")
   196         cu.execute("DELETE Affaire X")
   173             self.commit()
   197         cnx.commit()
   174             # to actually get Unauthorized exception, try to delete an entity we can read
   198         # to actually get Unauthorized exception, try to delete an entity we can read
   175             self.assertRaises(Unauthorized, cu.execute, "DELETE Societe S")
   199         self.assertRaises(Unauthorized, cu.execute, "DELETE Societe S")
   176             self.assertRaises(QueryError, self.commit) # can't commit anymore
   200         self.assertRaises(QueryError, cnx.commit) # can't commit anymore
   177             self.rollback() # required after Unauthorized
   201         cnx.rollback() # required after Unauthorized
   178             cu.execute("INSERT Affaire X: X sujet 'pascool'")
   202         cu.execute("INSERT Affaire X: X sujet 'pascool'")
   179             cu.execute("INSERT Societe X: X nom 'chouette'")
   203         cu.execute("INSERT Societe X: X nom 'chouette'")
   180             cu.execute("SET A concerne S WHERE A sujet 'pascool', S nom 'chouette'")
   204         cu.execute("SET A concerne S WHERE A sujet 'pascool', S nom 'chouette'")
   181             self.commit()
   205         cnx.commit()
       
   206 ##         # this one should fail since it will try to delete two affaires, one authorized
   182 ##         # this one should fail since it will try to delete two affaires, one authorized
   207 ##         # and the other not
   183 ##         # and the other not
   208 ##         self.assertRaises(Unauthorized, cu.execute, "DELETE Affaire X")
   184 ##         self.assertRaises(Unauthorized, cu.execute, "DELETE Affaire X")
   209         cu.execute("DELETE Affaire X WHERE X sujet 'pascool'")
   185             cu.execute("DELETE Affaire X WHERE X sujet 'pascool'")
   210         cnx.commit()
   186             self.commit()
   211         cnx.close()
       
   212 
   187 
   213 
   188 
   214     def test_insert_relation_rql_permission(self):
   189     def test_insert_relation_rql_permission(self):
   215         cnx = self.login('iaminusersgrouponly')
   190         with self.login('iaminusersgrouponly') as cu:
   216         session = self.session
   191             cu.execute("SET A concerne S WHERE A is Affaire, S is Societe")
   217         cu = cnx.cursor(session)
   192             # should raise Unauthorized since user don't own S though this won't
   218         cu.execute("SET A concerne S WHERE A is Affaire, S is Societe")
   193             # actually do anything since the selection query won't return
   219         # should raise Unauthorized since user don't own S
   194             # anything
   220         # though this won't actually do anything since the selection query won't return anything
   195             self.commit()
   221         cnx.commit()
   196             # to actually get Unauthorized exception, try to insert a relation
   222         # to actually get Unauthorized exception, try to insert a relation were we can read both entities
   197             # were we can read both entities
   223         rset = cu.execute('Personne P')
   198             rset = cu.execute('Personne P')
   224         self.assertEqual(len(rset), 1)
   199             self.assertEqual(len(rset), 1)
   225         ent = rset.get_entity(0, 0)
   200             ent = rset.get_entity(0, 0)
   226         session.set_cnxset() # necessary
   201             self.assertFalse(cu.execute('Any P,S WHERE P travaille S,P is Personne, S is Societe'))
   227         self.assertRaises(Unauthorized, ent.cw_check_perm, 'update')
   202             self.assertRaises(Unauthorized, ent.cw_check_perm, 'update')
   228         self.assertRaises(Unauthorized,
   203             self.assertRaises(Unauthorized,
   229                           cu.execute, "SET P travaille S WHERE P is Personne, S is Societe")
   204                               cu.execute, "SET P travaille S WHERE P is Personne, S is Societe")
   230         self.assertRaises(QueryError, cnx.commit) # can't commit anymore
   205             self.assertRaises(QueryError, self.commit) # can't commit anymore
   231         cnx.rollback()
   206             self.rollback()
   232         # test nothing has actually been inserted:
   207             # test nothing has actually been inserted:
   233         self.assertEqual(cu.execute('Any P,S WHERE P travaille S,P is Personne, S is Societe').rowcount, 0)
   208             self.assertFalse(cu.execute('Any P,S WHERE P travaille S,P is Personne, S is Societe'))
   234         cu.execute("INSERT Societe X: X nom 'chouette'")
   209             cu.execute("INSERT Societe X: X nom 'chouette'")
   235         cu.execute("SET A concerne S WHERE A is Affaire, S nom 'chouette'")
   210             cu.execute("SET A concerne S WHERE A is Affaire, S nom 'chouette'")
   236         cnx.commit()
   211             self.commit()
   237         cnx.close()
       
   238 
   212 
   239     def test_delete_relation_rql_permission(self):
   213     def test_delete_relation_rql_permission(self):
   240         self.execute("SET A concerne S WHERE A is Affaire, S is Societe")
   214         self.execute("SET A concerne S WHERE A is Affaire, S is Societe")
   241         self.commit()
   215         self.commit()
   242         cnx = self.login('iaminusersgrouponly')
   216         with self.login('iaminusersgrouponly') as cu:
   243         cu = cnx.cursor()
   217             # this won't actually do anything since the selection query won't return anything
   244         # this won't actually do anything since the selection query won't return anything
   218             cu.execute("DELETE A concerne S")
   245         cu.execute("DELETE A concerne S")
   219             self.commit()
   246         cnx.commit()
       
   247         # to actually get Unauthorized exception, try to delete a relation we can read
   220         # to actually get Unauthorized exception, try to delete a relation we can read
   248         self.restore_connection()
       
   249         eid = self.execute("INSERT Affaire X: X sujet 'pascool'")[0][0]
   221         eid = self.execute("INSERT Affaire X: X sujet 'pascool'")[0][0]
   250         self.execute('SET X owned_by U WHERE X eid %(x)s, U login "iaminusersgrouponly"', {'x': eid})
   222         self.execute('SET X owned_by U WHERE X eid %(x)s, U login "iaminusersgrouponly"', {'x': eid})
   251         self.execute("SET A concerne S WHERE A sujet 'pascool', S is Societe")
   223         self.execute("SET A concerne S WHERE A sujet 'pascool', S is Societe")
   252         self.commit()
   224         self.commit()
   253         cnx = self.login('iaminusersgrouponly')
   225         with self.login('iaminusersgrouponly') as cu:
   254         cu = cnx.cursor()
   226             self.assertRaises(Unauthorized, cu.execute, "DELETE A concerne S")
   255         self.assertRaises(Unauthorized, cu.execute, "DELETE A concerne S")
   227             self.assertRaises(QueryError, self.commit) # can't commit anymore
   256         self.assertRaises(QueryError, cnx.commit) # can't commit anymore
   228             self.rollback() # required after Unauthorized
   257         cnx.rollback() # required after Unauthorized
   229             cu.execute("INSERT Societe X: X nom 'chouette'")
   258         cu.execute("INSERT Societe X: X nom 'chouette'")
   230             cu.execute("SET A concerne S WHERE A is Affaire, S nom 'chouette'")
   259         cu.execute("SET A concerne S WHERE A is Affaire, S nom 'chouette'")
   231             self.commit()
   260         cnx.commit()
   232             cu.execute("DELETE A concerne S WHERE S nom 'chouette'")
   261         cu.execute("DELETE A concerne S WHERE S nom 'chouette'")
   233             self.commit()
   262         cnx.close()
       
   263 
   234 
   264 
   235 
   265     def test_user_can_change_its_upassword(self):
   236     def test_user_can_change_its_upassword(self):
   266         req = self.request()
   237         req = self.request()
   267         ueid = self.create_user(req, 'user').eid
   238         ueid = self.create_user(req, 'user').eid
   268         cnx = self.login('user')
   239         with self.login('user') as cu:
   269         cu = cnx.cursor()
   240             cu.execute('SET X upassword %(passwd)s WHERE X eid %(x)s',
   270         cu.execute('SET X upassword %(passwd)s WHERE X eid %(x)s',
   241                        {'x': ueid, 'passwd': 'newpwd'})
   271                    {'x': ueid, 'passwd': 'newpwd'})
   242             self.commit()
   272         cnx.commit()
       
   273         cnx.close()
       
   274         cnx = self.login('user', password='newpwd')
   243         cnx = self.login('user', password='newpwd')
   275         cnx.close()
   244         cnx.close()
   276 
   245 
   277     def test_user_cant_change_other_upassword(self):
   246     def test_user_cant_change_other_upassword(self):
   278         req = self.request()
   247         req = self.request()
   279         ueid = self.create_user(req, 'otheruser').eid
   248         ueid = self.create_user(req, 'otheruser').eid
   280         cnx = self.login('iaminusersgrouponly')
   249         with self.login('iaminusersgrouponly') as cu:
   281         cu = cnx.cursor()
   250             cu.execute('SET X upassword %(passwd)s WHERE X eid %(x)s',
   282         cu.execute('SET X upassword %(passwd)s WHERE X eid %(x)s',
   251                        {'x': ueid, 'passwd': 'newpwd'})
   283                    {'x': ueid, 'passwd': 'newpwd'})
   252             self.assertRaises(Unauthorized, self.commit)
   284         self.assertRaises(Unauthorized, cnx.commit)
       
   285         cnx.close()
       
   286 
   253 
   287     # read security test
   254     # read security test
   288 
   255 
   289     def test_read_base(self):
   256     def test_read_base(self):
   290         self.schema['Personne'].set_action_permissions('read', ('users', 'managers'))
   257         with self.temporary_permissions(Personne={'read': ('users', 'managers')}):
   291         cnx = self.login('anon')
   258             with self.login('anon') as cu:
   292         cu = cnx.cursor()
   259                 self.assertRaises(Unauthorized,
   293         self.assertRaises(Unauthorized,
   260                                   cu.execute, 'Personne U where U nom "managers"')
   294                           cu.execute, 'Personne U where U nom "managers"')
   261                 self.rollback()
   295         cnx.close()
       
   296 
   262 
   297     def test_read_erqlexpr_base(self):
   263     def test_read_erqlexpr_base(self):
   298         eid = self.execute("INSERT Affaire X: X sujet 'cool'")[0][0]
   264         eid = self.execute("INSERT Affaire X: X sujet 'cool'")[0][0]
   299         self.commit()
   265         self.commit()
   300         cnx = self.login('iaminusersgrouponly')
   266         with self.login('iaminusersgrouponly') as cu:
   301         cu = cnx.cursor()
   267             rset = cu.execute('Affaire X')
   302         rset = cu.execute('Affaire X')
   268             self.assertEqual(rset.rows, [])
   303         self.assertEqual(rset.rows, [])
   269             self.assertRaises(Unauthorized, cu.execute, 'Any X WHERE X eid %(x)s', {'x': eid})
   304         self.assertRaises(Unauthorized, cu.execute, 'Any X WHERE X eid %(x)s', {'x': eid})
   270             # cache test
   305         # cache test
   271             self.assertRaises(Unauthorized, cu.execute, 'Any X WHERE X eid %(x)s', {'x': eid})
   306         self.assertRaises(Unauthorized, cu.execute, 'Any X WHERE X eid %(x)s', {'x': eid})
   272             aff2 = cu.execute("INSERT Affaire X: X sujet 'cool'")[0][0]
   307         aff2 = cu.execute("INSERT Affaire X: X sujet 'cool'")[0][0]
   273             soc1 = cu.execute("INSERT Societe X: X nom 'chouette'")[0][0]
   308         soc1 = cu.execute("INSERT Societe X: X nom 'chouette'")[0][0]
   274             cu.execute("SET A concerne S WHERE A is Affaire, S is Societe")
   309         cu.execute("SET A concerne S WHERE A is Affaire, S is Societe")
   275             self.commit()
   310         cnx.commit()
   276             rset = cu.execute('Any X WHERE X eid %(x)s', {'x': aff2})
   311         rset = cu.execute('Any X WHERE X eid %(x)s', {'x': aff2})
   277             self.assertEqual(rset.rows, [[aff2]])
   312         self.assertEqual(rset.rows, [[aff2]])
   278             # more cache test w/ NOT eid
   313         # more cache test w/ NOT eid
   279             rset = cu.execute('Affaire X WHERE NOT X eid %(x)s', {'x': eid})
   314         rset = cu.execute('Affaire X WHERE NOT X eid %(x)s', {'x': eid})
   280             self.assertEqual(rset.rows, [[aff2]])
   315         self.assertEqual(rset.rows, [[aff2]])
   281             rset = cu.execute('Affaire X WHERE NOT X eid %(x)s', {'x': aff2})
   316         rset = cu.execute('Affaire X WHERE NOT X eid %(x)s', {'x': aff2})
   282             self.assertEqual(rset.rows, [])
   317         self.assertEqual(rset.rows, [])
   283             # test can't update an attribute of an entity that can't be readen
   318         # test can't update an attribute of an entity that can't be readen
   284             self.assertRaises(Unauthorized, cu.execute, 'SET X sujet "hacked" WHERE X eid %(x)s', {'x': eid})
   319         self.assertRaises(Unauthorized, cu.execute, 'SET X sujet "hacked" WHERE X eid %(x)s', {'x': eid})
   285             self.rollback()
   320         cnx.close()
       
   321 
   286 
   322 
   287 
   323     def test_entity_created_in_transaction(self):
   288     def test_entity_created_in_transaction(self):
   324         affschema = self.schema['Affaire']
   289         affschema = self.schema['Affaire']
   325         origperms = affschema.permissions['read']
   290         with self.temporary_permissions(Affaire={'read': affschema.permissions['add']}):
   326         affschema.set_action_permissions('read', affschema.permissions['add'])
   291             with self.login('iaminusersgrouponly') as cu:
   327         try:
   292                 aff2 = cu.execute("INSERT Affaire X: X sujet 'cool'")[0][0]
   328             cnx = self.login('iaminusersgrouponly')
   293                 # entity created in transaction are readable *by eid*
   329             cu = cnx.cursor()
   294                 self.assertTrue(cu.execute('Any X WHERE X eid %(x)s', {'x':aff2}))
   330             aff2 = cu.execute("INSERT Affaire X: X sujet 'cool'")[0][0]
   295                 # XXX would be nice if it worked
   331             # entity created in transaction are readable *by eid*
   296                 rset = cu.execute("Affaire X WHERE X sujet 'cool'")
   332             self.assertTrue(cu.execute('Any X WHERE X eid %(x)s', {'x':aff2}))
   297                 self.assertEqual(len(rset), 0)
   333             # XXX would be nice if it worked
   298                 self.assertRaises(Unauthorized, self.commit)
   334             rset = cu.execute("Affaire X WHERE X sujet 'cool'")
       
   335             self.assertEqual(len(rset), 0)
       
   336         finally:
       
   337             affschema.set_action_permissions('read', origperms)
       
   338             cnx.close()
       
   339 
   299 
   340     def test_read_erqlexpr_has_text1(self):
   300     def test_read_erqlexpr_has_text1(self):
   341         aff1 = self.execute("INSERT Affaire X: X sujet 'cool'")[0][0]
   301         aff1 = self.execute("INSERT Affaire X: X sujet 'cool'")[0][0]
   342         card1 = self.execute("INSERT Card X: X title 'cool'")[0][0]
   302         card1 = self.execute("INSERT Card X: X title 'cool'")[0][0]
   343         self.execute('SET X owned_by U WHERE X eid %(x)s, U login "iaminusersgrouponly"', {'x': card1})
   303         self.execute('SET X owned_by U WHERE X eid %(x)s, U login "iaminusersgrouponly"', {'x': card1})
   344         self.commit()
   304         self.commit()
   345         cnx = self.login('iaminusersgrouponly')
   305         with self.login('iaminusersgrouponly') as cu:
   346         cu = cnx.cursor()
   306             aff2 = cu.execute("INSERT Affaire X: X sujet 'cool'")[0][0]
   347         aff2 = cu.execute("INSERT Affaire X: X sujet 'cool'")[0][0]
   307             soc1 = cu.execute("INSERT Societe X: X nom 'chouette'")[0][0]
   348         soc1 = cu.execute("INSERT Societe X: X nom 'chouette'")[0][0]
   308             cu.execute("SET A concerne S WHERE A eid %(a)s, S eid %(s)s", {'a': aff2, 's': soc1})
   349         cu.execute("SET A concerne S WHERE A eid %(a)s, S eid %(s)s", {'a': aff2, 's': soc1})
   309             self.commit()
   350         cnx.commit()
   310             self.assertRaises(Unauthorized, cu.execute, 'Any X WHERE X eid %(x)s', {'x':aff1})
   351         self.assertRaises(Unauthorized, cu.execute, 'Any X WHERE X eid %(x)s', {'x':aff1})
   311             self.assertTrue(cu.execute('Any X WHERE X eid %(x)s', {'x':aff2}))
   352         self.assertTrue(cu.execute('Any X WHERE X eid %(x)s', {'x':aff2}))
   312             self.assertTrue(cu.execute('Any X WHERE X eid %(x)s', {'x':card1}))
   353         self.assertTrue(cu.execute('Any X WHERE X eid %(x)s', {'x':card1}))
   313             rset = cu.execute("Any X WHERE X has_text 'cool'")
   354         rset = cu.execute("Any X WHERE X has_text 'cool'")
   314             self.assertEqual(sorted(eid for eid, in rset.rows),
   355         self.assertEqual(sorted(eid for eid, in rset.rows),
   315                               [card1, aff2])
   356                           [card1, aff2])
   316             self.rollback()
   357         cnx.close()
       
   358 
   317 
   359     def test_read_erqlexpr_has_text2(self):
   318     def test_read_erqlexpr_has_text2(self):
   360         self.execute("INSERT Personne X: X nom 'bidule'")
   319         self.execute("INSERT Personne X: X nom 'bidule'")
   361         self.execute("INSERT Societe X: X nom 'bidule'")
   320         self.execute("INSERT Societe X: X nom 'bidule'")
   362         self.commit()
   321         self.commit()
   363         self.schema['Personne'].set_action_permissions('read', ('managers',))
   322         with self.temporary_permissions(Personne={'read': ('managers',)}):
   364         cnx = self.login('iaminusersgrouponly')
   323             with self.login('iaminusersgrouponly') as cu:
   365         cu = cnx.cursor()
   324                 rset = cu.execute('Any N WHERE N has_text "bidule"')
   366         rset = cu.execute('Any N WHERE N has_text "bidule"')
   325                 self.assertEqual(len(rset.rows), 1, rset.rows)
   367         self.assertEqual(len(rset.rows), 1, rset.rows)
   326                 rset = cu.execute('Any N WITH N BEING (Any N WHERE N has_text "bidule")')
   368         rset = cu.execute('Any N WITH N BEING (Any N WHERE N has_text "bidule")')
   327                 self.assertEqual(len(rset.rows), 1, rset.rows)
   369         self.assertEqual(len(rset.rows), 1, rset.rows)
       
   370         cnx.close()
       
   371 
   328 
   372     def test_read_erqlexpr_optional_rel(self):
   329     def test_read_erqlexpr_optional_rel(self):
   373         self.execute("INSERT Personne X: X nom 'bidule'")
   330         self.execute("INSERT Personne X: X nom 'bidule'")
   374         self.execute("INSERT Societe X: X nom 'bidule'")
   331         self.execute("INSERT Societe X: X nom 'bidule'")
   375         self.commit()
   332         self.commit()
   376         self.schema['Personne'].set_action_permissions('read', ('managers',))
   333         with self.temporary_permissions(Personne={'read': ('managers',)}):
   377         cnx = self.login('anon')
   334             with self.login('anon') as cu:
   378         cu = cnx.cursor()
   335                 rset = cu.execute('Any N,U WHERE N has_text "bidule", N owned_by U?')
   379         rset = cu.execute('Any N,U WHERE N has_text "bidule", N owned_by U?')
   336                 self.assertEqual(len(rset.rows), 1, rset.rows)
   380         self.assertEqual(len(rset.rows), 1, rset.rows)
       
   381         cnx.close()
       
   382 
   337 
   383     def test_read_erqlexpr_aggregat(self):
   338     def test_read_erqlexpr_aggregat(self):
   384         self.execute("INSERT Affaire X: X sujet 'cool'")[0][0]
   339         self.execute("INSERT Affaire X: X sujet 'cool'")[0][0]
   385         self.commit()
   340         self.commit()
   386         cnx = self.login('iaminusersgrouponly')
   341         with self.login('iaminusersgrouponly') as cu:
   387         cu = cnx.cursor()
   342             rset = cu.execute('Any COUNT(X) WHERE X is Affaire')
   388         rset = cu.execute('Any COUNT(X) WHERE X is Affaire')
   343             self.assertEqual(rset.rows, [[0]])
   389         self.assertEqual(rset.rows, [[0]])
   344             aff2 = cu.execute("INSERT Affaire X: X sujet 'cool'")[0][0]
   390         aff2 = cu.execute("INSERT Affaire X: X sujet 'cool'")[0][0]
   345             soc1 = cu.execute("INSERT Societe X: X nom 'chouette'")[0][0]
   391         soc1 = cu.execute("INSERT Societe X: X nom 'chouette'")[0][0]
   346             cu.execute("SET A concerne S WHERE A is Affaire, S is Societe")
   392         cu.execute("SET A concerne S WHERE A is Affaire, S is Societe")
   347             self.commit()
   393         cnx.commit()
   348             rset = cu.execute('Any COUNT(X) WHERE X is Affaire')
   394         rset = cu.execute('Any COUNT(X) WHERE X is Affaire')
   349             self.assertEqual(rset.rows, [[1]])
   395         self.assertEqual(rset.rows, [[1]])
   350             rset = cu.execute('Any ETN, COUNT(X) GROUPBY ETN WHERE X is ET, ET name ETN')
   396         rset = cu.execute('Any ETN, COUNT(X) GROUPBY ETN WHERE X is ET, ET name ETN')
   351             values = dict(rset)
   397         values = dict(rset)
   352             self.assertEqual(values['Affaire'], 1)
   398         self.assertEqual(values['Affaire'], 1)
   353             self.assertEqual(values['Societe'], 2)
   399         self.assertEqual(values['Societe'], 2)
   354             rset = cu.execute('Any ETN, COUNT(X) GROUPBY ETN WHERE X is ET, ET name ETN WITH X BEING ((Affaire X) UNION (Societe X))')
   400         rset = cu.execute('Any ETN, COUNT(X) GROUPBY ETN WHERE X is ET, ET name ETN WITH X BEING ((Affaire X) UNION (Societe X))')
   355             self.assertEqual(len(rset), 2)
   401         self.assertEqual(len(rset), 2)
   356             values = dict(rset)
   402         values = dict(rset)
   357             self.assertEqual(values['Affaire'], 1)
   403         self.assertEqual(values['Affaire'], 1)
   358             self.assertEqual(values['Societe'], 2)
   404         self.assertEqual(values['Societe'], 2)
       
   405         cnx.close()
       
   406 
   359 
   407 
   360 
   408     def test_attribute_security(self):
   361     def test_attribute_security(self):
   409         # only managers should be able to edit the 'test' attribute of Personne entities
   362         # only managers should be able to edit the 'test' attribute of Personne entities
   410         eid = self.execute("INSERT Personne X: X nom 'bidule', X web 'http://www.debian.org', X test TRUE")[0][0]
   363         eid = self.execute("INSERT Personne X: X nom 'bidule', X web 'http://www.debian.org', X test TRUE")[0][0]
   411         self.commit()
       
   412         self.execute('SET X test FALSE WHERE X eid %(x)s', {'x': eid})
   364         self.execute('SET X test FALSE WHERE X eid %(x)s', {'x': eid})
   413         self.commit()
   365         self.commit()
   414         cnx = self.login('iaminusersgrouponly')
   366         with self.login('iaminusersgrouponly') as cu:
   415         cu = cnx.cursor()
   367             cu.execute("INSERT Personne X: X nom 'bidule', X web 'http://www.debian.org', X test TRUE")
   416         cu.execute("INSERT Personne X: X nom 'bidule', X web 'http://www.debian.org', X test TRUE")
   368             self.assertRaises(Unauthorized, self.commit)
   417         self.assertRaises(Unauthorized, cnx.commit)
   369             cu.execute("INSERT Personne X: X nom 'bidule', X web 'http://www.debian.org', X test FALSE")
   418         cu.execute("INSERT Personne X: X nom 'bidule', X web 'http://www.debian.org', X test FALSE")
   370             self.assertRaises(Unauthorized, self.commit)
   419         self.assertRaises(Unauthorized, cnx.commit)
   371             eid = cu.execute("INSERT Personne X: X nom 'bidule', X web 'http://www.debian.org'")[0][0]
   420         eid = cu.execute("INSERT Personne X: X nom 'bidule', X web 'http://www.debian.org'")[0][0]
   372             self.commit()
   421         cnx.commit()
   373             cu.execute('SET X test FALSE WHERE X eid %(x)s', {'x': eid})
   422         cu.execute('SET X test FALSE WHERE X eid %(x)s', {'x': eid})
   374             self.assertRaises(Unauthorized, self.commit)
   423         self.assertRaises(Unauthorized, cnx.commit)
   375             cu.execute('SET X test TRUE WHERE X eid %(x)s', {'x': eid})
   424         cu.execute('SET X test TRUE WHERE X eid %(x)s', {'x': eid})
   376             self.assertRaises(Unauthorized, self.commit)
   425         self.assertRaises(Unauthorized, cnx.commit)
   377             cu.execute('SET X web "http://www.logilab.org" WHERE X eid %(x)s', {'x': eid})
   426         cu.execute('SET X web "http://www.logilab.org" WHERE X eid %(x)s', {'x': eid})
   378             self.commit()
   427         cnx.commit()
       
   428         cnx.close()
       
   429 
   379 
   430     def test_attribute_security_rqlexpr(self):
   380     def test_attribute_security_rqlexpr(self):
   431         # Note.para attribute editable by managers or if the note is in "todo" state
   381         # Note.para attribute editable by managers or if the note is in "todo" state
   432         note = self.execute("INSERT Note X: X para 'bidule'").get_entity(0, 0)
   382         note = self.execute("INSERT Note X: X para 'bidule'").get_entity(0, 0)
   433         self.commit()
   383         self.commit()
   434         note.cw_adapt_to('IWorkflowable').fire_transition('markasdone')
   384         note.cw_adapt_to('IWorkflowable').fire_transition('markasdone')
   435         self.execute('SET X para "truc" WHERE X eid %(x)s', {'x': note.eid})
   385         self.execute('SET X para "truc" WHERE X eid %(x)s', {'x': note.eid})
   436         self.commit()
   386         self.commit()
   437         cnx = self.login('iaminusersgrouponly')
   387         with self.login('iaminusersgrouponly') as cu:
   438         cu = cnx.cursor()
   388             cu.execute("SET X para 'chouette' WHERE X eid %(x)s", {'x': note.eid})
   439         cu.execute("SET X para 'chouette' WHERE X eid %(x)s", {'x': note.eid})
   389             self.assertRaises(Unauthorized, self.commit)
   440         self.assertRaises(Unauthorized, cnx.commit)
   390             note2 = cu.execute("INSERT Note X: X para 'bidule'").get_entity(0, 0)
   441         note2 = cu.execute("INSERT Note X: X para 'bidule'").get_entity(0, 0)
   391             self.commit()
   442         cnx.commit()
   392             note2.cw_adapt_to('IWorkflowable').fire_transition('markasdone')
   443         note2.cw_adapt_to('IWorkflowable').fire_transition('markasdone')
   393             self.commit()
   444         cnx.commit()
   394             self.assertEqual(len(cu.execute('Any X WHERE X in_state S, S name "todo", X eid %(x)s', {'x': note2.eid})),
   445         self.assertEqual(len(cu.execute('Any X WHERE X in_state S, S name "todo", X eid %(x)s', {'x': note2.eid})),
   395                               0)
   446                           0)
   396             cu.execute("SET X para 'chouette' WHERE X eid %(x)s", {'x': note2.eid})
   447         cu.execute("SET X para 'chouette' WHERE X eid %(x)s", {'x': note2.eid})
   397             self.assertRaises(Unauthorized, self.commit)
   448         self.assertRaises(Unauthorized, cnx.commit)
   398             note2.cw_adapt_to('IWorkflowable').fire_transition('redoit')
   449         note2.cw_adapt_to('IWorkflowable').fire_transition('redoit')
   399             self.commit()
   450         cnx.commit()
   400             cu.execute("SET X para 'chouette' WHERE X eid %(x)s", {'x': note2.eid})
   451         cu.execute("SET X para 'chouette' WHERE X eid %(x)s", {'x': note2.eid})
   401             self.commit()
   452         cnx.commit()
       
   453         cnx.close()
       
   454 
   402 
   455     def test_attribute_read_security(self):
   403     def test_attribute_read_security(self):
   456         # anon not allowed to see users'login, but they can see users
   404         # anon not allowed to see users'login, but they can see users
   457         self.repo.schema['CWUser'].set_action_permissions('read', ('guests', 'users', 'managers'))
   405         login_rdef = self.repo.schema['CWUser'].rdef('login')
   458         self.repo.schema['CWUser'].rdef('login').set_action_permissions('read', ('users', 'managers'))
   406         with self.temporary_permissions((login_rdef, {'read': ('users', 'managers')}),
   459         cnx = self.login('anon')
   407                                         CWUser={'read': ('guests', 'users', 'managers')}):
   460         cu = cnx.cursor()
   408             with self.login('anon') as cu:
   461         rset = cu.execute('CWUser X')
   409                 rset = cu.execute('CWUser X')
   462         self.assertTrue(rset)
   410                 self.assertTrue(rset)
   463         x = rset.get_entity(0, 0)
   411                 x = rset.get_entity(0, 0)
   464         self.assertEqual(x.login, None)
   412                 self.assertEqual(x.login, None)
   465         self.assertTrue(x.creation_date)
   413                 self.assertTrue(x.creation_date)
   466         x = rset.get_entity(1, 0)
   414                 x = rset.get_entity(1, 0)
   467         x.complete()
   415                 x.complete()
   468         self.assertEqual(x.login, None)
   416                 self.assertEqual(x.login, None)
   469         self.assertTrue(x.creation_date)
   417                 self.assertTrue(x.creation_date)
   470         cnx.rollback()
       
   471         cnx.close()
       
   472 
   418 
   473     def test_yams_inheritance_and_security_bug(self):
   419     def test_yams_inheritance_and_security_bug(self):
   474         oldperms = self.schema['Division'].permissions
   420         with self.temporary_permissions(Division={'read': ('managers', ERQLExpression('X owned_by U'))}):
   475         try:
   421             with self.login('iaminusersgrouponly'):
   476             self.schema['Division'].permissions = {
   422                 querier = self.repo.querier
   477                 'read': ('managers', ERQLExpression('X owned_by U')),
   423                 rqlst = querier.parse('Any X WHERE X is_instance_of Societe')
   478                 'add': ('managers', 'users'),
   424                 querier.solutions(self.session, rqlst, {})
   479                 'update': ('managers', 'owners'),
   425                 querier._annotate(rqlst)
   480                 'delete': ('managers', 'owners')}
   426                 plan = querier.plan_factory(rqlst, {}, self.session)
   481             self.login('iaminusersgrouponly')
   427                 plan.preprocess(rqlst)
   482             querier = self.repo.querier
   428                 self.assertEqual(
   483             rqlst = querier.parse('Any X WHERE X is_instance_of Societe')
   429                     rqlst.as_string(),
   484             querier.solutions(self.session, rqlst, {})
   430                     '(Any X WHERE X is IN(SubDivision, Societe)) UNION (Any X WHERE X is Division, EXISTS(X owned_by %(B)s))')
   485             querier._annotate(rqlst)
       
   486             plan = querier.plan_factory(rqlst, {}, self.session)
       
   487             plan.preprocess(rqlst)
       
   488             self.assertEqual(
       
   489                 rqlst.as_string(),
       
   490                 '(Any X WHERE X is IN(SubDivision, Societe)) UNION (Any X WHERE X is Division, EXISTS(X owned_by %(B)s))')
       
   491         finally:
       
   492             self.schema['Division'].permissions = oldperms
       
   493 
   431 
   494 
   432 
   495 class BaseSchemaSecurityTC(BaseSecurityTC):
   433 class BaseSchemaSecurityTC(BaseSecurityTC):
   496     """tests related to the base schema permission configuration"""
   434     """tests related to the base schema permission configuration"""
   497 
   435 
   498     def test_user_can_delete_object_he_created(self):
   436     def test_user_can_delete_object_he_created(self):
   499         # even if some other user have changed object'state
   437         # even if some other user have changed object'state
   500         cnx = self.login('iaminusersgrouponly')
   438         with self.login('iaminusersgrouponly') as cu:
   501         cu = cnx.cursor()
   439             # due to security test, affaire has to concerne a societe the user owns
   502         # due to security test, affaire has to concerne a societe the user owns
   440             cu.execute('INSERT Societe X: X nom "ARCTIA"')
   503         cu.execute('INSERT Societe X: X nom "ARCTIA"')
   441             cu.execute('INSERT Affaire X: X ref "ARCT01", X concerne S WHERE S nom "ARCTIA"')
   504         cu.execute('INSERT Affaire X: X ref "ARCT01", X concerne S WHERE S nom "ARCTIA"')
   442             self.commit()
   505         cnx.commit()
       
   506         self.restore_connection()
       
   507         affaire = self.execute('Any X WHERE X ref "ARCT01"').get_entity(0, 0)
   443         affaire = self.execute('Any X WHERE X ref "ARCT01"').get_entity(0, 0)
   508         affaire.cw_adapt_to('IWorkflowable').fire_transition('abort')
   444         affaire.cw_adapt_to('IWorkflowable').fire_transition('abort')
   509         self.commit()
   445         self.commit()
   510         self.assertEqual(len(self.execute('TrInfo X WHERE X wf_info_for A, A ref "ARCT01"')),
   446         self.assertEqual(len(self.execute('TrInfo X WHERE X wf_info_for A, A ref "ARCT01"')),
   511                           1)
   447                           1)
   512         self.assertEqual(len(self.execute('TrInfo X WHERE X wf_info_for A, A ref "ARCT01",'
   448         self.assertEqual(len(self.execute('TrInfo X WHERE X wf_info_for A, A ref "ARCT01",'
   513                                            'X owned_by U, U login "admin"')),
   449                                            'X owned_by U, U login "admin"')),
   514                           1) # TrInfo at the above state change
   450                           1) # TrInfo at the above state change
   515         cnx = self.login('iaminusersgrouponly')
   451         with self.login('iaminusersgrouponly') as cu:
   516         cu = cnx.cursor()
   452             cu.execute('DELETE Affaire X WHERE X ref "ARCT01"')
   517         cu.execute('DELETE Affaire X WHERE X ref "ARCT01"')
   453             self.commit()
   518         cnx.commit()
   454             self.assertFalse(cu.execute('Affaire X'))
   519         self.assertFalse(cu.execute('Affaire X'))
       
   520         cnx.close()
       
   521 
   455 
   522     def test_users_and_groups_non_readable_by_guests(self):
   456     def test_users_and_groups_non_readable_by_guests(self):
   523         cnx = self.login('anon')
   457         with self.login('anon') as cu:
   524         anon = cnx.user(self.session)
   458             anon = cu.connection.user(self.session)
   525         cu = cnx.cursor()
   459             # anonymous user can only read itself
   526         # anonymous user can only read itself
   460             rset = cu.execute('Any L WHERE X owned_by U, U login L')
   527         rset = cu.execute('Any L WHERE X owned_by U, U login L')
   461             self.assertEqual(rset.rows, [['anon']])
   528         self.assertEqual(rset.rows, [['anon']])
   462             rset = cu.execute('CWUser X')
   529         rset = cu.execute('CWUser X')
   463             self.assertEqual(rset.rows, [[anon.eid]])
   530         self.assertEqual(rset.rows, [[anon.eid]])
   464             # anonymous user can read groups (necessary to check allowed transitions for instance)
   531         # anonymous user can read groups (necessary to check allowed transitions for instance)
   465             self.assert_(cu.execute('CWGroup X'))
   532         self.assert_(cu.execute('CWGroup X'))
   466             # should only be able to read the anonymous user, not another one
   533         # should only be able to read the anonymous user, not another one
   467             origuser = self.adminsession.user
   534         origuser = self.adminsession.user
   468             self.assertRaises(Unauthorized,
   535         self.assertRaises(Unauthorized,
   469                               cu.execute, 'CWUser X WHERE X eid %(x)s', {'x': origuser.eid})
   536                           cu.execute, 'CWUser X WHERE X eid %(x)s', {'x': origuser.eid})
   470             # nothing selected, nothing updated, no exception raised
   537         # nothing selected, nothing updated, no exception raised
   471             #self.assertRaises(Unauthorized,
   538         #self.assertRaises(Unauthorized,
   472             #                  cu.execute, 'SET X login "toto" WHERE X eid %(x)s',
   539         #                  cu.execute, 'SET X login "toto" WHERE X eid %(x)s',
   473             #                  {'x': self.user.eid})
   540         #                  {'x': self.user.eid})
   474 
   541 
   475             rset = cu.execute('CWUser X WHERE X eid %(x)s', {'x': anon.eid})
   542         rset = cu.execute('CWUser X WHERE X eid %(x)s', {'x': anon.eid})
   476             self.assertEqual(rset.rows, [[anon.eid]])
   543         self.assertEqual(rset.rows, [[anon.eid]])
   477             # but can't modify it
   544         # but can't modify it
   478             cu.execute('SET X login "toto" WHERE X eid %(x)s', {'x': anon.eid})
   545         cu.execute('SET X login "toto" WHERE X eid %(x)s', {'x': anon.eid})
   479             self.assertRaises(Unauthorized, self.commit)
   546         self.assertRaises(Unauthorized, cnx.commit)
       
   547         cnx.close()
       
   548 
   480 
   549     def test_in_group_relation(self):
   481     def test_in_group_relation(self):
   550         cnx = self.login('iaminusersgrouponly')
   482         with self.login('iaminusersgrouponly') as cu:
   551         cu = cnx.cursor()
   483             rql = u"DELETE U in_group G WHERE U login 'admin'"
   552         rql = u"DELETE U in_group G WHERE U login 'admin'"
   484             self.assertRaises(Unauthorized, cu.execute, rql)
   553         self.assertRaises(Unauthorized, cu.execute, rql)
   485             rql = u"SET U in_group G WHERE U login 'admin', G name 'users'"
   554         rql = u"SET U in_group G WHERE U login 'admin', G name 'users'"
   486             self.assertRaises(Unauthorized, cu.execute, rql)
   555         self.assertRaises(Unauthorized, cu.execute, rql)
   487             self.rollback()
   556         cnx.close()
       
   557 
   488 
   558     def test_owned_by(self):
   489     def test_owned_by(self):
   559         self.execute("INSERT Personne X: X nom 'bidule'")
   490         self.execute("INSERT Personne X: X nom 'bidule'")
   560         self.commit()
   491         self.commit()
   561         cnx = self.login('iaminusersgrouponly')
   492         with self.login('iaminusersgrouponly') as cu:
   562         cu = cnx.cursor()
   493             rql = u"SET X owned_by U WHERE U login 'iaminusersgrouponly', X is Personne"
   563         rql = u"SET X owned_by U WHERE U login 'iaminusersgrouponly', X is Personne"
   494             self.assertRaises(Unauthorized, cu.execute, rql)
   564         self.assertRaises(Unauthorized, cu.execute, rql)
   495             self.rollback()
   565         cnx.close()
       
   566 
   496 
   567     def test_bookmarked_by_guests_security(self):
   497     def test_bookmarked_by_guests_security(self):
   568         beid1 = self.execute('INSERT Bookmark B: B path "?vid=manage", B title "manage"')[0][0]
   498         beid1 = self.execute('INSERT Bookmark B: B path "?vid=manage", B title "manage"')[0][0]
   569         beid2 = self.execute('INSERT Bookmark B: B path "?vid=index", B title "index", B bookmarked_by U WHERE U login "anon"')[0][0]
   499         beid2 = self.execute('INSERT Bookmark B: B path "?vid=index", B title "index", B bookmarked_by U WHERE U login "anon"')[0][0]
   570         self.commit()
   500         self.commit()
   571         cnx = self.login('anon')
   501         with self.login('anon') as cu:
   572         cu = cnx.cursor()
   502             anoneid = self.session.user.eid
   573         anoneid = self.session.user.eid
   503             self.assertEqual(cu.execute('Any T,P ORDERBY lower(T) WHERE B is Bookmark,B title T,B path P,'
   574         self.assertEqual(cu.execute('Any T,P ORDERBY lower(T) WHERE B is Bookmark,B title T,B path P,'
   504                                          'B bookmarked_by U, U eid %s' % anoneid).rows,
   575                                      'B bookmarked_by U, U eid %s' % anoneid).rows,
   505                               [['index', '?vid=index']])
   576                           [['index', '?vid=index']])
   506             self.assertEqual(cu.execute('Any T,P ORDERBY lower(T) WHERE B is Bookmark,B title T,B path P,'
   577         self.assertEqual(cu.execute('Any T,P ORDERBY lower(T) WHERE B is Bookmark,B title T,B path P,'
   507                                          'B bookmarked_by U, U eid %(x)s', {'x': anoneid}).rows,
   578                                      'B bookmarked_by U, U eid %(x)s', {'x': anoneid}).rows,
   508                               [['index', '?vid=index']])
   579                           [['index', '?vid=index']])
   509             # can read others bookmarks as well
   580         # can read others bookmarks as well
   510             self.assertEqual(cu.execute('Any B where B is Bookmark, NOT B bookmarked_by U').rows,
   581         self.assertEqual(cu.execute('Any B where B is Bookmark, NOT B bookmarked_by U').rows,
   511                               [[beid1]])
   582                           [[beid1]])
   512             self.assertRaises(Unauthorized, cu.execute,'DELETE B bookmarked_by U')
   583         self.assertRaises(Unauthorized, cu.execute,'DELETE B bookmarked_by U')
   513             self.assertRaises(Unauthorized,
   584         self.assertRaises(Unauthorized,
   514                               cu.execute, 'SET B bookmarked_by U WHERE U eid %(x)s, B eid %(b)s',
   585                           cu.execute, 'SET B bookmarked_by U WHERE U eid %(x)s, B eid %(b)s',
   515                               {'x': anoneid, 'b': beid1})
   586                           {'x': anoneid, 'b': beid1})
   516             self.rollback()
   587         cnx.close()
       
   588 
       
   589 
   517 
   590     def test_ambigous_ordered(self):
   518     def test_ambigous_ordered(self):
   591         cnx = self.login('anon')
   519         with self.login('anon') as cu:
   592         cu = cnx.cursor()
   520             names = [t for t, in cu.execute('Any N ORDERBY lower(N) WHERE X name N')]
   593         names = [t for t, in cu.execute('Any N ORDERBY lower(N) WHERE X name N')]
   521             self.assertEqual(names, sorted(names, key=lambda x: x.lower()))
   594         self.assertEqual(names, sorted(names, key=lambda x: x.lower()))
       
   595         cnx.close()
       
   596 
   522 
   597     def test_restrict_is_instance_ok(self):
   523     def test_restrict_is_instance_ok(self):
   598         from rql import RQLException
       
   599         rset = self.execute('Any X WHERE X is_instance_of BaseTransition')
   524         rset = self.execute('Any X WHERE X is_instance_of BaseTransition')
   600         rqlst = rset.syntax_tree()
   525         rqlst = rset.syntax_tree()
   601         select = rqlst.children[0]
   526         select = rqlst.children[0]
   602         x = select.get_selected_variables().next()
   527         x = select.get_selected_variables().next()
   603         self.assertRaises(RQLException, select.add_type_restriction,
   528         self.assertRaises(RQLException, select.add_type_restriction,
   618         """check a user change in_state without having update permission on the
   543         """check a user change in_state without having update permission on the
   619         subject
   544         subject
   620         """
   545         """
   621         eid = self.execute('INSERT Affaire X: X ref "ARCT01"')[0][0]
   546         eid = self.execute('INSERT Affaire X: X ref "ARCT01"')[0][0]
   622         self.commit()
   547         self.commit()
   623         cnx = self.login('iaminusersgrouponly')
   548         with self.login('iaminusersgrouponly') as cu:
   624         session = self.session
   549             session = self.session
   625         # needed to avoid check_perm error
   550             # needed to avoid check_perm error
   626         session.set_cnxset()
   551             session.set_cnxset()
   627         # needed to remove rql expr granting update perm to the user
   552             # needed to remove rql expr granting update perm to the user
   628         affaire_perms = self.schema['Affaire'].permissions.copy()
   553             affschema = self.schema['Affaire']
   629         self.schema['Affaire'].set_action_permissions('update', self.schema['Affaire'].get_groups('update'))
   554             with self.temporary_permissions(Affaire={'update': affschema.get_groups('update'),
   630         try:
   555                                                      'read': ('users',)}):
   631             self.assertRaises(Unauthorized,
   556                 self.assertRaises(Unauthorized,
   632                               self.schema['Affaire'].check_perm, session, 'update', eid=eid)
   557                                   affschema.check_perm, session, 'update', eid=eid)
   633             cu = cnx.cursor()
   558                 aff = cu.execute('Any X WHERE X ref "ARCT01"').get_entity(0, 0)
   634             self.schema['Affaire'].set_action_permissions('read', ('users',))
   559                 aff.cw_adapt_to('IWorkflowable').fire_transition('abort')
   635             aff = cu.execute('Any X WHERE X ref "ARCT01"').get_entity(0, 0)
   560                 self.commit()
   636             aff.cw_adapt_to('IWorkflowable').fire_transition('abort')
   561                 # though changing a user state (even logged user) is reserved to managers
   637             cnx.commit()
   562                 user = self.user(session)
   638             # though changing a user state (even logged user) is reserved to managers
   563                 session.set_cnxset()
   639             user = cnx.user(self.session)
   564                 # XXX wether it should raise Unauthorized or ValidationError is not clear
   640             # XXX wether it should raise Unauthorized or ValidationError is not clear
   565                 # the best would probably ValidationError if the transition doesn't exist
   641             # the best would probably ValidationError if the transition doesn't exist
   566                 # from the current state but Unauthorized if it exists but user can't pass it
   642             # from the current state but Unauthorized if it exists but user can't pass it
   567                 self.assertRaises(ValidationError,
   643             self.assertRaises(ValidationError,
   568                                   user.cw_adapt_to('IWorkflowable').fire_transition, 'deactivate')
   644                               user.cw_adapt_to('IWorkflowable').fire_transition, 'deactivate')
   569                 self.rollback() # else will fail on login cm exit
   645         finally:
       
   646             # restore orig perms
       
   647             for action, perms in affaire_perms.iteritems():
       
   648                 self.schema['Affaire'].set_action_permissions(action, perms)
       
   649         cnx.close()
       
   650 
   570 
   651     def test_trinfo_security(self):
   571     def test_trinfo_security(self):
   652         aff = self.execute('INSERT Affaire X: X ref "ARCT01"').get_entity(0, 0)
   572         aff = self.execute('INSERT Affaire X: X ref "ARCT01"').get_entity(0, 0)
   653         iworkflowable = aff.cw_adapt_to('IWorkflowable')
   573         iworkflowable = aff.cw_adapt_to('IWorkflowable')
   654         self.commit()
   574         self.commit()