server/test/unittest_security.py
changeset 9782 95e8fa2c8da8
parent 9777 b2e47617a94e
child 9954 79d34ba48612
child 9984 793377697c81
equal deleted inserted replaced
9781:f5728fc3c486 9782:95e8fa2c8da8
    15 #
    15 #
    16 # You should have received a copy of the GNU Lesser General Public License along
    16 # You should have received a copy of the GNU Lesser General Public License along
    17 # with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
    17 # with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
    18 """functional tests for server's security"""
    18 """functional tests for server's security"""
    19 
    19 
    20 import sys
    20 from logilab.common.testlib import unittest_main
    21 
       
    22 from logilab.common.testlib import unittest_main, TestCase
       
    23 
       
    24 from rql import RQLException
       
    25 
    21 
    26 from cubicweb.devtools.testlib import CubicWebTC
    22 from cubicweb.devtools.testlib import CubicWebTC
    27 from cubicweb import Unauthorized, ValidationError, QueryError, Binary
    23 from cubicweb import Unauthorized, ValidationError, QueryError, Binary
    28 from cubicweb.schema import ERQLExpression
    24 from cubicweb.schema import ERQLExpression
    29 from cubicweb.server.querier import check_read_access
    25 from cubicweb.server.querier import check_read_access
    32 
    28 
    33 class BaseSecurityTC(CubicWebTC):
    29 class BaseSecurityTC(CubicWebTC):
    34 
    30 
    35     def setup_database(self):
    31     def setup_database(self):
    36         super(BaseSecurityTC, self).setup_database()
    32         super(BaseSecurityTC, self).setup_database()
    37         self.create_user(self.request(), 'iaminusersgrouponly')
    33         with self.admin_access.client_cnx() as cnx:
    38         hash = _CRYPTO_CTX.encrypt('oldpassword', scheme='des_crypt')
    34             self.create_user(cnx, 'iaminusersgrouponly')
    39         self.create_user(self.request(), 'oldpassword', password=Binary(hash))
    35             hash = _CRYPTO_CTX.encrypt('oldpassword', scheme='des_crypt')
       
    36             self.create_user(cnx, 'oldpassword', password=Binary(hash))
    40 
    37 
    41 class LowLevelSecurityFunctionTC(BaseSecurityTC):
    38 class LowLevelSecurityFunctionTC(BaseSecurityTC):
    42 
    39 
    43     def test_check_read_access(self):
    40     def test_check_read_access(self):
    44         rql = u'Personne U where U nom "managers"'
    41         rql = u'Personne U where U nom "managers"'
    45         rqlst = self.repo.vreg.rqlhelper.parse(rql).children[0]
    42         rqlst = self.repo.vreg.rqlhelper.parse(rql).children[0]
    46         with self.temporary_permissions(Personne={'read': ('users', 'managers')}):
    43         with self.temporary_permissions(Personne={'read': ('users', 'managers')}):
    47             self.repo.vreg.solutions(self.session, rqlst, None)
    44             with self.admin_access.repo_cnx() as cnx:
    48             solution = rqlst.solutions[0]
    45                 self.repo.vreg.solutions(cnx, rqlst, None)
    49             check_read_access(self.session, rqlst, solution, {})
    46                 solution = rqlst.solutions[0]
    50             with self.login('anon') as cu:
    47                 check_read_access(cnx, rqlst, solution, {})
       
    48             with self.new_access('anon').repo_cnx() as cnx:
    51                 self.assertRaises(Unauthorized,
    49                 self.assertRaises(Unauthorized,
    52                                   check_read_access,
    50                                   check_read_access,
    53                                   self.session, rqlst, solution, {})
    51                                   cnx, rqlst, solution, {})
    54                 self.assertRaises(Unauthorized, cu.execute, rql)
    52                 self.assertRaises(Unauthorized, cnx.execute, rql)
    55 
    53 
    56     def test_upassword_not_selectable(self):
    54     def test_upassword_not_selectable(self):
    57         self.assertRaises(Unauthorized,
    55         with self.admin_access.repo_cnx() as cnx:
    58                           self.execute, 'Any X,P WHERE X is CWUser, X upassword P')
       
    59         self.rollback()
       
    60         with self.login('iaminusersgrouponly') as cu:
       
    61             self.assertRaises(Unauthorized,
    56             self.assertRaises(Unauthorized,
    62                               cu.execute, 'Any X,P WHERE X is CWUser, X upassword P')
    57                               cnx.execute, 'Any X,P WHERE X is CWUser, X upassword P')
       
    58         with self.new_access('iaminusersgrouponly').repo_cnx() as cnx:
       
    59             self.assertRaises(Unauthorized,
       
    60                               cnx.execute, 'Any X,P WHERE X is CWUser, X upassword P')
    63 
    61 
    64     def test_update_password(self):
    62     def test_update_password(self):
    65         """Ensure that if a user's password is stored with a deprecated hash,
    63         """Ensure that if a user's password is stored with a deprecated hash,
    66         it will be updated on next login
    64         it will be updated on next login
    67         """
    65         """
    68         oldhash = str(self.session.system_sql("SELECT cw_upassword FROM cw_CWUser "
    66         with self.repo.internal_cnx() as cnx:
    69                                               "WHERE cw_login = 'oldpassword'").fetchone()[0])
    67             oldhash = str(cnx.system_sql("SELECT cw_upassword FROM cw_CWUser "
    70         with self.login('oldpassword') as cu:
    68                                          "WHERE cw_login = 'oldpassword'").fetchone()[0])
    71             pass
    69             self.repo.close(self.repo.connect('oldpassword', password='oldpassword'))
    72         newhash = str(self.session.system_sql("SELECT cw_upassword FROM cw_CWUser "
    70             newhash = str(cnx.system_sql("SELECT cw_upassword FROM cw_CWUser "
    73                                               "WHERE cw_login = 'oldpassword'").fetchone()[0])
    71                                          "WHERE cw_login = 'oldpassword'").fetchone()[0])
    74         self.assertNotEqual(oldhash, newhash)
    72             self.assertNotEqual(oldhash, newhash)
    75         self.assertTrue(newhash.startswith('$6$'))
    73             self.assertTrue(newhash.startswith('$6$'))
    76         with self.login('oldpassword') as cu:
    74             self.repo.close(self.repo.connect('oldpassword', password='oldpassword'))
    77             pass
    75             self.assertEqual(newhash,
    78         self.assertEqual(newhash,
    76                              str(cnx.system_sql("SELECT cw_upassword FROM cw_CWUser WHERE "
    79                          str(self.session.system_sql("SELECT cw_upassword FROM cw_CWUser WHERE "
    77                                                 "cw_login = 'oldpassword'").fetchone()[0]))
    80                                                      "cw_login = 'oldpassword'").fetchone()[0]))
       
    81 
    78 
    82 
    79 
    83 class SecurityRewritingTC(BaseSecurityTC):
    80 class SecurityRewritingTC(BaseSecurityTC):
    84     def hijack_source_execute(self):
    81     def hijack_source_execute(self):
    85         def syntax_tree_search(*args, **kwargs):
    82         def syntax_tree_search(*args, **kwargs):
    90     def tearDown(self):
    87     def tearDown(self):
    91         self.repo.system_source.__dict__.pop('syntax_tree_search', None)
    88         self.repo.system_source.__dict__.pop('syntax_tree_search', None)
    92         super(SecurityRewritingTC, self).tearDown()
    89         super(SecurityRewritingTC, self).tearDown()
    93 
    90 
    94     def test_not_relation_read_security(self):
    91     def test_not_relation_read_security(self):
    95         with self.login('iaminusersgrouponly'):
    92         with self.new_access('iaminusersgrouponly').repo_cnx() as cnx:
    96             self.hijack_source_execute()
    93             self.hijack_source_execute()
    97             self.execute('Any U WHERE NOT A todo_by U, A is Affaire')
    94             cnx.execute('Any U WHERE NOT A todo_by U, A is Affaire')
    98             self.assertEqual(self.query[0][1].as_string(),
    95             self.assertEqual(self.query[0][1].as_string(),
    99                               'Any U WHERE NOT EXISTS(A todo_by U), A is Affaire')
    96                               'Any U WHERE NOT EXISTS(A todo_by U), A is Affaire')
   100             self.execute('Any U WHERE NOT EXISTS(A todo_by U), A is Affaire')
    97             cnx.execute('Any U WHERE NOT EXISTS(A todo_by U), A is Affaire')
   101             self.assertEqual(self.query[0][1].as_string(),
    98             self.assertEqual(self.query[0][1].as_string(),
   102                               'Any U WHERE NOT EXISTS(A todo_by U), A is Affaire')
    99                               'Any U WHERE NOT EXISTS(A todo_by U), A is Affaire')
   103 
   100 
   104 class SecurityTC(BaseSecurityTC):
   101 class SecurityTC(BaseSecurityTC):
   105 
   102 
   106     def setUp(self):
   103     def setUp(self):
   107         BaseSecurityTC.setUp(self)
   104         super(SecurityTC, self).setUp()
   108         # implicitly test manager can add some entities
   105         # implicitly test manager can add some entities
   109         self.execute("INSERT Affaire X: X sujet 'cool'")
   106         with self.admin_access.repo_cnx() as cnx:
   110         self.execute("INSERT Societe X: X nom 'logilab'")
   107             cnx.execute("INSERT Affaire X: X sujet 'cool'")
   111         self.execute("INSERT Personne X: X nom 'bidule'")
   108             cnx.execute("INSERT Societe X: X nom 'logilab'")
   112         self.execute('INSERT CWGroup X: X name "staff"')
   109             cnx.execute("INSERT Personne X: X nom 'bidule'")
   113         self.commit()
   110             cnx.execute('INSERT CWGroup X: X name "staff"')
       
   111             cnx.commit()
   114 
   112 
   115     def test_insert_security(self):
   113     def test_insert_security(self):
   116         with self.login('anon') as cu:
   114         with self.new_access('anon').repo_cnx() as cnx:
   117             cu.execute("INSERT Personne X: X nom 'bidule'")
   115             cnx.execute("INSERT Personne X: X nom 'bidule'")
   118             self.assertRaises(Unauthorized, self.commit)
   116             self.assertRaises(Unauthorized, cnx.commit)
   119             self.assertEqual(cu.execute('Personne X').rowcount, 1)
   117             self.assertEqual(cnx.execute('Personne X').rowcount, 1)
   120 
   118 
   121     def test_insert_rql_permission(self):
   119     def test_insert_rql_permission(self):
   122         # test user can only add une affaire related to a societe he owns
   120         # test user can only add une affaire related to a societe he owns
   123         with self.login('iaminusersgrouponly') as cu:
   121         with self.new_access('iaminusersgrouponly').repo_cnx() as cnx:
   124             cu.execute("INSERT Affaire X: X sujet 'cool'")
   122             cnx.execute("INSERT Affaire X: X sujet 'cool'")
   125             self.assertRaises(Unauthorized, self.commit)
   123             self.assertRaises(Unauthorized, cnx.commit)
   126         # test nothing has actually been inserted
   124         # test nothing has actually been inserted
   127         self.assertEqual(self.execute('Affaire X').rowcount, 1)
   125         with self.admin_access.repo_cnx() as cnx:
   128         with self.login('iaminusersgrouponly') as cu:
   126             self.assertEqual(cnx.execute('Affaire X').rowcount, 1)
   129             cu.execute("INSERT Affaire X: X sujet 'cool'")
   127         with self.new_access('iaminusersgrouponly').repo_cnx() as cnx:
   130             cu.execute("INSERT Societe X: X nom 'chouette'")
   128             cnx.execute("INSERT Affaire X: X sujet 'cool'")
   131             cu.execute("SET A concerne S WHERE A sujet 'cool', S nom 'chouette'")
   129             cnx.execute("INSERT Societe X: X nom 'chouette'")
   132             self.commit()
   130             cnx.execute("SET A concerne S WHERE A sujet 'cool', S nom 'chouette'")
       
   131             cnx.commit()
   133 
   132 
   134     def test_update_security_1(self):
   133     def test_update_security_1(self):
   135         with self.login('anon') as cu:
   134         with self.new_access('anon').repo_cnx() as cnx:
   136             # local security check
   135             # local security check
   137             cu.execute( "SET X nom 'bidulechouette' WHERE X is Personne")
   136             cnx.execute( "SET X nom 'bidulechouette' WHERE X is Personne")
   138             self.assertRaises(Unauthorized, self.commit)
   137             self.assertRaises(Unauthorized, cnx.commit)
   139         self.assertEqual(self.execute('Personne X WHERE X nom "bidulechouette"').rowcount, 0)
   138         with self.admin_access.repo_cnx() as cnx:
       
   139             self.assertEqual(cnx.execute('Personne X WHERE X nom "bidulechouette"').rowcount, 0)
   140 
   140 
   141     def test_update_security_2(self):
   141     def test_update_security_2(self):
   142         with self.temporary_permissions(Personne={'read': ('users', 'managers'),
   142         with self.temporary_permissions(Personne={'read': ('users', 'managers'),
   143                                                   'add': ('guests', 'users', 'managers')}):
   143                                                   'add': ('guests', 'users', 'managers')}):
   144             with self.login('anon') as cu:
   144             with self.new_access('anon').repo_cnx() as cnx:
   145                 self.assertRaises(Unauthorized, cu.execute,
   145                 self.assertRaises(Unauthorized, cnx.execute,
   146                                   "SET X nom 'bidulechouette' WHERE X is Personne")
   146                                   "SET X nom 'bidulechouette' WHERE X is Personne")
   147                 self.rollback()
       
   148                 # self.assertRaises(Unauthorized, cnx.commit)
       
   149         # test nothing has actually been inserted
   147         # test nothing has actually been inserted
   150         self.assertEqual(self.execute('Personne X WHERE X nom "bidulechouette"').rowcount, 0)
   148         with self.admin_access.repo_cnx() as cnx:
       
   149             self.assertEqual(cnx.execute('Personne X WHERE X nom "bidulechouette"').rowcount, 0)
   151 
   150 
   152     def test_update_security_3(self):
   151     def test_update_security_3(self):
   153         with self.login('iaminusersgrouponly') as cu:
   152         with self.new_access('iaminusersgrouponly').repo_cnx() as cnx:
   154             cu.execute("INSERT Personne X: X nom 'biduuule'")
   153             cnx.execute("INSERT Personne X: X nom 'biduuule'")
   155             cu.execute("INSERT Societe X: X nom 'looogilab'")
   154             cnx.execute("INSERT Societe X: X nom 'looogilab'")
   156             cu.execute("SET X travaille S WHERE X nom 'biduuule', S nom 'looogilab'")
   155             cnx.execute("SET X travaille S WHERE X nom 'biduuule', S nom 'looogilab'")
   157 
   156 
   158     def test_update_rql_permission(self):
   157     def test_update_rql_permission(self):
   159         self.execute("SET A concerne S WHERE A is Affaire, S is Societe")
   158         with self.admin_access.repo_cnx() as cnx:
   160         self.commit()
   159             cnx.execute("SET A concerne S WHERE A is Affaire, S is Societe")
       
   160             cnx.commit()
   161         # test user can only update une affaire related to a societe he owns
   161         # test user can only update une affaire related to a societe he owns
   162         with self.login('iaminusersgrouponly') as cu:
   162         with self.new_access('iaminusersgrouponly').repo_cnx() as cnx:
   163             cu.execute("SET X sujet 'pascool' WHERE X is Affaire")
   163             cnx.execute("SET X sujet 'pascool' WHERE X is Affaire")
   164             # this won't actually do anything since the selection query won't return anything
   164             # this won't actually do anything since the selection query won't return anything
   165             self.commit()
   165             cnx.commit()
   166             # to actually get Unauthorized exception, try to update an entity we can read
   166             # to actually get Unauthorized exception, try to update an entity we can read
   167             cu.execute("SET X nom 'toto' WHERE X is Societe")
   167             cnx.execute("SET X nom 'toto' WHERE X is Societe")
   168             self.assertRaises(Unauthorized, self.commit)
   168             self.assertRaises(Unauthorized, cnx.commit)
   169             cu.execute("INSERT Affaire X: X sujet 'pascool'")
   169             cnx.execute("INSERT Affaire X: X sujet 'pascool'")
   170             cu.execute("INSERT Societe X: X nom 'chouette'")
   170             cnx.execute("INSERT Societe X: X nom 'chouette'")
   171             cu.execute("SET A concerne S WHERE A sujet 'pascool', S nom 'chouette'")
   171             cnx.execute("SET A concerne S WHERE A sujet 'pascool', S nom 'chouette'")
   172             cu.execute("SET X sujet 'habahsicestcool' WHERE X sujet 'pascool'")
   172             cnx.execute("SET X sujet 'habahsicestcool' WHERE X sujet 'pascool'")
   173             self.commit()
   173             cnx.commit()
   174 
   174 
   175     def test_delete_security(self):
   175     def test_delete_security(self):
   176         # FIXME: sample below fails because we don't detect "owner" can't delete
   176         # FIXME: sample below fails because we don't detect "owner" can't delete
   177         # user anyway, and since no user with login == 'bidule' exists, no
   177         # user anyway, and since no user with login == 'bidule' exists, no
   178         # exception is raised
   178         # exception is raised
   179         #user._groups = {'guests':1}
   179         #user._groups = {'guests':1}
   180         #self.assertRaises(Unauthorized,
   180         #self.assertRaises(Unauthorized,
   181         #                  self.o.execute, user, "DELETE CWUser X WHERE X login 'bidule'")
   181         #                  self.o.execute, user, "DELETE CWUser X WHERE X login 'bidule'")
   182         # check local security
   182         # check local security
   183         with self.login('iaminusersgrouponly') as cu:
   183         with self.new_access('iaminusersgrouponly').repo_cnx() as cnx:
   184             self.assertRaises(Unauthorized, cu.execute, "DELETE CWGroup Y WHERE Y name 'staff'")
   184             self.assertRaises(Unauthorized, cnx.execute, "DELETE CWGroup Y WHERE Y name 'staff'")
   185             self.rollback()
       
   186 
   185 
   187     def test_delete_rql_permission(self):
   186     def test_delete_rql_permission(self):
   188         self.execute("SET A concerne S WHERE A is Affaire, S is Societe")
   187         with self.admin_access.repo_cnx() as cnx:
   189         self.commit()
   188             cnx.execute("SET A concerne S WHERE A is Affaire, S is Societe")
       
   189             cnx.commit()
   190         # test user can only delete une affaire related to a societe he owns
   190         # test user can only delete une affaire related to a societe he owns
   191         with self.login('iaminusersgrouponly') as cu:
   191         with self.new_access('iaminusersgrouponly').repo_cnx() as cnx:
   192             # this won't actually do anything since the selection query won't return anything
   192             # this won't actually do anything since the selection query won't return anything
   193             cu.execute("DELETE Affaire X")
   193             cnx.execute("DELETE Affaire X")
   194             self.commit()
   194             cnx.commit()
   195             # to actually get Unauthorized exception, try to delete an entity we can read
   195             # to actually get Unauthorized exception, try to delete an entity we can read
   196             self.assertRaises(Unauthorized, cu.execute, "DELETE Societe S")
   196             self.assertRaises(Unauthorized, cnx.execute, "DELETE Societe S")
   197             self.assertRaises(QueryError, self.commit) # can't commit anymore
   197             self.assertRaises(QueryError, cnx.commit) # can't commit anymore
   198             self.rollback() # required after Unauthorized
   198             cnx.rollback()
   199             cu.execute("INSERT Affaire X: X sujet 'pascool'")
   199             cnx.execute("INSERT Affaire X: X sujet 'pascool'")
   200             cu.execute("INSERT Societe X: X nom 'chouette'")
   200             cnx.execute("INSERT Societe X: X nom 'chouette'")
   201             cu.execute("SET A concerne S WHERE A sujet 'pascool', S nom 'chouette'")
   201             cnx.execute("SET A concerne S WHERE A sujet 'pascool', S nom 'chouette'")
   202             self.commit()
   202             cnx.commit()
   203 ##         # this one should fail since it will try to delete two affaires, one authorized
   203 ##         # this one should fail since it will try to delete two affaires, one authorized
   204 ##         # and the other not
   204 ##         # and the other not
   205 ##         self.assertRaises(Unauthorized, cu.execute, "DELETE Affaire X")
   205 ##         self.assertRaises(Unauthorized, cnx.execute, "DELETE Affaire X")
   206             cu.execute("DELETE Affaire X WHERE X sujet 'pascool'")
   206             cnx.execute("DELETE Affaire X WHERE X sujet 'pascool'")
   207             self.commit()
   207             cnx.commit()
   208 
       
   209 
   208 
   210     def test_insert_relation_rql_permission(self):
   209     def test_insert_relation_rql_permission(self):
   211         with self.login('iaminusersgrouponly') as cu:
   210         with self.new_access('iaminusersgrouponly').repo_cnx() as cnx:
   212             cu.execute("SET A concerne S WHERE A is Affaire, S is Societe")
   211             cnx.execute("SET A concerne S WHERE A is Affaire, S is Societe")
   213             # should raise Unauthorized since user doesn't own S though this won't
   212             # should raise Unauthorized since user doesn't own S though this won't
   214             # actually do anything since the selection query won't return
   213             # actually do anything since the selection query won't return
   215             # anything
   214             # anything
   216             self.commit()
   215             cnx.commit()
   217             # to actually get Unauthorized exception, try to insert a relation
   216             # to actually get Unauthorized exception, try to insert a relation
   218             # where we can read both entities
   217             # where we can read both entities
   219             rset = cu.execute('Personne P')
   218             rset = cnx.execute('Personne P')
   220             self.assertEqual(len(rset), 1)
   219             self.assertEqual(len(rset), 1)
   221             ent = rset.get_entity(0, 0)
   220             ent = rset.get_entity(0, 0)
   222             self.assertFalse(cu.execute('Any P,S WHERE P travaille S,P is Personne, S is Societe'))
   221             self.assertFalse(cnx.execute('Any P,S WHERE P travaille S,P is Personne, S is Societe'))
   223             self.assertRaises(Unauthorized, ent.cw_check_perm, 'update')
   222             self.assertRaises(Unauthorized, ent.cw_check_perm, 'update')
   224             self.assertRaises(Unauthorized,
   223             self.assertRaises(Unauthorized,
   225                               cu.execute, "SET P travaille S WHERE P is Personne, S is Societe")
   224                               cnx.execute, "SET P travaille S WHERE P is Personne, S is Societe")
   226             self.assertRaises(QueryError, self.commit) # can't commit anymore
   225             self.assertRaises(QueryError, cnx.commit) # can't commit anymore
   227             self.rollback()
   226             cnx.rollback()
   228             # test nothing has actually been inserted:
   227             # test nothing has actually been inserted:
   229             self.assertFalse(cu.execute('Any P,S WHERE P travaille S,P is Personne, S is Societe'))
   228             self.assertFalse(cnx.execute('Any P,S WHERE P travaille S,P is Personne, S is Societe'))
   230             cu.execute("INSERT Societe X: X nom 'chouette'")
   229             cnx.execute("INSERT Societe X: X nom 'chouette'")
   231             cu.execute("SET A concerne S WHERE A is Affaire, S nom 'chouette'")
   230             cnx.execute("SET A concerne S WHERE A is Affaire, S nom 'chouette'")
   232             self.commit()
   231             cnx.commit()
   233 
   232 
   234     def test_delete_relation_rql_permission(self):
   233     def test_delete_relation_rql_permission(self):
   235         self.execute("SET A concerne S WHERE A is Affaire, S is Societe")
   234         with self.admin_access.repo_cnx() as cnx:
   236         self.commit()
   235             cnx.execute("SET A concerne S WHERE A is Affaire, S is Societe")
   237         with self.login('iaminusersgrouponly') as cu:
   236             cnx.commit()
       
   237         with self.new_access('iaminusersgrouponly').repo_cnx() as cnx:
   238             # this won't actually do anything since the selection query won't return anything
   238             # this won't actually do anything since the selection query won't return anything
   239             cu.execute("DELETE A concerne S")
   239             cnx.execute("DELETE A concerne S")
   240             self.commit()
   240             cnx.commit()
   241         # to actually get Unauthorized exception, try to delete a relation we can read
   241         with self.admin_access.repo_cnx() as cnx:
   242         eid = self.execute("INSERT Affaire X: X sujet 'pascool'")[0][0]
   242             # to actually get Unauthorized exception, try to delete a relation we can read
   243         self.execute('SET X owned_by U WHERE X eid %(x)s, U login "iaminusersgrouponly"',
   243             eid = cnx.execute("INSERT Affaire X: X sujet 'pascool'")[0][0]
   244                      {'x': eid})
   244             cnx.execute('SET X owned_by U WHERE X eid %(x)s, U login "iaminusersgrouponly"',
   245         self.execute("SET A concerne S WHERE A sujet 'pascool', S is Societe")
   245                          {'x': eid})
   246         self.commit()
   246             cnx.execute("SET A concerne S WHERE A sujet 'pascool', S is Societe")
   247         with self.login('iaminusersgrouponly') as cu:
   247             cnx.commit()
   248             self.assertRaises(Unauthorized, cu.execute, "DELETE A concerne S")
   248         with self.new_access('iaminusersgrouponly').repo_cnx() as cnx:
   249             self.assertRaises(QueryError, self.commit) # can't commit anymore
   249             self.assertRaises(Unauthorized, cnx.execute, "DELETE A concerne S")
   250             self.rollback() # required after Unauthorized
   250             self.assertRaises(QueryError, cnx.commit) # can't commit anymore
   251             cu.execute("INSERT Societe X: X nom 'chouette'")
   251             cnx.rollback()
   252             cu.execute("SET A concerne S WHERE A is Affaire, S nom 'chouette'")
   252             cnx.execute("INSERT Societe X: X nom 'chouette'")
   253             self.commit()
   253             cnx.execute("SET A concerne S WHERE A is Affaire, S nom 'chouette'")
   254             cu.execute("DELETE A concerne S WHERE S nom 'chouette'")
   254             cnx.commit()
   255             self.commit()
   255             cnx.execute("DELETE A concerne S WHERE S nom 'chouette'")
       
   256             cnx.commit()
   256 
   257 
   257 
   258 
   258     def test_user_can_change_its_upassword(self):
   259     def test_user_can_change_its_upassword(self):
   259         req = self.request()
   260         with self.admin_access.repo_cnx() as cnx:
   260         ueid = self.create_user(req, 'user').eid
   261             ueid = self.create_user(cnx, 'user').eid
   261         with self.login('user') as cu:
   262         with self.new_access('user').repo_cnx() as cnx:
   262             cu.execute('SET X upassword %(passwd)s WHERE X eid %(x)s',
   263             cnx.execute('SET X upassword %(passwd)s WHERE X eid %(x)s',
   263                        {'x': ueid, 'passwd': 'newpwd'})
   264                        {'x': ueid, 'passwd': 'newpwd'})
   264             self.commit()
   265             cnx.commit()
   265         cnx = self.login('user', password='newpwd')
   266         self.repo.close(self.repo.connect('user', password='newpwd'))
   266         cnx.close()
       
   267 
   267 
   268     def test_user_cant_change_other_upassword(self):
   268     def test_user_cant_change_other_upassword(self):
   269         req = self.request()
   269         with self.admin_access.repo_cnx() as cnx:
   270         ueid = self.create_user(req, 'otheruser').eid
   270             ueid = self.create_user(cnx, 'otheruser').eid
   271         with self.login('iaminusersgrouponly') as cu:
   271         with self.new_access('iaminusersgrouponly').repo_cnx() as cnx:
   272             cu.execute('SET X upassword %(passwd)s WHERE X eid %(x)s',
   272             cnx.execute('SET X upassword %(passwd)s WHERE X eid %(x)s',
   273                        {'x': ueid, 'passwd': 'newpwd'})
   273                        {'x': ueid, 'passwd': 'newpwd'})
   274             self.assertRaises(Unauthorized, self.commit)
   274             self.assertRaises(Unauthorized, cnx.commit)
   275 
   275 
   276     # read security test
   276     # read security test
   277 
   277 
   278     def test_read_base(self):
   278     def test_read_base(self):
   279         with self.temporary_permissions(Personne={'read': ('users', 'managers')}):
   279         with self.temporary_permissions(Personne={'read': ('users', 'managers')}):
   280             with self.login('anon') as cu:
   280             with self.new_access('anon').repo_cnx() as cnx:
   281                 self.assertRaises(Unauthorized,
   281                 self.assertRaises(Unauthorized,
   282                                   cu.execute, 'Personne U where U nom "managers"')
   282                                   cnx.execute, 'Personne U where U nom "managers"')
   283                 self.rollback()
       
   284 
   283 
   285     def test_read_erqlexpr_base(self):
   284     def test_read_erqlexpr_base(self):
   286         eid = self.execute("INSERT Affaire X: X sujet 'cool'")[0][0]
   285         with self.admin_access.repo_cnx() as cnx:
   287         self.commit()
   286             eid = cnx.execute("INSERT Affaire X: X sujet 'cool'")[0][0]
   288         with self.login('iaminusersgrouponly') as cu:
   287             cnx.commit()
   289             rset = cu.execute('Affaire X')
   288         with self.new_access('iaminusersgrouponly').repo_cnx() as cnx:
       
   289             rset = cnx.execute('Affaire X')
   290             self.assertEqual(rset.rows, [])
   290             self.assertEqual(rset.rows, [])
   291             self.assertRaises(Unauthorized, cu.execute, 'Any X WHERE X eid %(x)s', {'x': eid})
   291             self.assertRaises(Unauthorized, cnx.execute, 'Any X WHERE X eid %(x)s', {'x': eid})
   292             # cache test
   292             # cache test
   293             self.assertRaises(Unauthorized, cu.execute, 'Any X WHERE X eid %(x)s', {'x': eid})
   293             self.assertRaises(Unauthorized, cnx.execute, 'Any X WHERE X eid %(x)s', {'x': eid})
   294             aff2 = cu.execute("INSERT Affaire X: X sujet 'cool'")[0][0]
   294             aff2 = cnx.execute("INSERT Affaire X: X sujet 'cool'")[0][0]
   295             soc1 = cu.execute("INSERT Societe X: X nom 'chouette'")[0][0]
   295             soc1 = cnx.execute("INSERT Societe X: X nom 'chouette'")[0][0]
   296             cu.execute("SET A concerne S WHERE A is Affaire, S is Societe")
   296             cnx.execute("SET A concerne S WHERE A is Affaire, S is Societe")
   297             self.commit()
   297             cnx.commit()
   298             rset = cu.execute('Any X WHERE X eid %(x)s', {'x': aff2})
   298             rset = cnx.execute('Any X WHERE X eid %(x)s', {'x': aff2})
   299             self.assertEqual(rset.rows, [[aff2]])
   299             self.assertEqual(rset.rows, [[aff2]])
   300             # more cache test w/ NOT eid
   300             # more cache test w/ NOT eid
   301             rset = cu.execute('Affaire X WHERE NOT X eid %(x)s', {'x': eid})
   301             rset = cnx.execute('Affaire X WHERE NOT X eid %(x)s', {'x': eid})
   302             self.assertEqual(rset.rows, [[aff2]])
   302             self.assertEqual(rset.rows, [[aff2]])
   303             rset = cu.execute('Affaire X WHERE NOT X eid %(x)s', {'x': aff2})
   303             rset = cnx.execute('Affaire X WHERE NOT X eid %(x)s', {'x': aff2})
   304             self.assertEqual(rset.rows, [])
   304             self.assertEqual(rset.rows, [])
   305             # test can't update an attribute of an entity that can't be readen
   305             # test can't update an attribute of an entity that can't be readen
   306             self.assertRaises(Unauthorized, cu.execute,
   306             self.assertRaises(Unauthorized, cnx.execute,
   307                               'SET X sujet "hacked" WHERE X eid %(x)s', {'x': eid})
   307                               'SET X sujet "hacked" WHERE X eid %(x)s', {'x': eid})
   308             self.rollback()
       
   309 
   308 
   310 
   309 
   311     def test_entity_created_in_transaction(self):
   310     def test_entity_created_in_transaction(self):
   312         affschema = self.schema['Affaire']
   311         affschema = self.schema['Affaire']
   313         with self.temporary_permissions(Affaire={'read': affschema.permissions['add']}):
   312         with self.temporary_permissions(Affaire={'read': affschema.permissions['add']}):
   314             with self.login('iaminusersgrouponly') as cu:
   313             with self.new_access('iaminusersgrouponly').repo_cnx() as cnx:
   315                 aff2 = cu.execute("INSERT Affaire X: X sujet 'cool'")[0][0]
   314                 aff2 = cnx.execute("INSERT Affaire X: X sujet 'cool'")[0][0]
   316                 # entity created in transaction are readable *by eid*
   315                 # entity created in transaction are readable *by eid*
   317                 self.assertTrue(cu.execute('Any X WHERE X eid %(x)s', {'x':aff2}))
   316                 self.assertTrue(cnx.execute('Any X WHERE X eid %(x)s', {'x':aff2}))
   318                 # XXX would be nice if it worked
   317                 # XXX would be nice if it worked
   319                 rset = cu.execute("Affaire X WHERE X sujet 'cool'")
   318                 rset = cnx.execute("Affaire X WHERE X sujet 'cool'")
   320                 self.assertEqual(len(rset), 0)
   319                 self.assertEqual(len(rset), 0)
   321                 self.assertRaises(Unauthorized, self.commit)
   320                 self.assertRaises(Unauthorized, cnx.commit)
   322 
   321 
   323     def test_read_erqlexpr_has_text1(self):
   322     def test_read_erqlexpr_has_text1(self):
   324         aff1 = self.execute("INSERT Affaire X: X sujet 'cool'")[0][0]
   323         with self.admin_access.repo_cnx() as cnx:
   325         card1 = self.execute("INSERT Card X: X title 'cool'")[0][0]
   324             aff1 = cnx.execute("INSERT Affaire X: X sujet 'cool'")[0][0]
   326         self.execute('SET X owned_by U WHERE X eid %(x)s, U login "iaminusersgrouponly"',
   325             card1 = cnx.execute("INSERT Card X: X title 'cool'")[0][0]
   327                      {'x': card1})
   326             cnx.execute('SET X owned_by U WHERE X eid %(x)s, U login "iaminusersgrouponly"',
   328         self.commit()
   327                         {'x': card1})
   329         with self.login('iaminusersgrouponly') as cu:
   328             cnx.commit()
   330             aff2 = cu.execute("INSERT Affaire X: X sujet 'cool'")[0][0]
   329         with self.new_access('iaminusersgrouponly').repo_cnx() as cnx:
   331             soc1 = cu.execute("INSERT Societe X: X nom 'chouette'")[0][0]
   330             aff2 = cnx.execute("INSERT Affaire X: X sujet 'cool'")[0][0]
   332             cu.execute("SET A concerne S WHERE A eid %(a)s, S eid %(s)s", {'a': aff2, 's': soc1})
   331             soc1 = cnx.execute("INSERT Societe X: X nom 'chouette'")[0][0]
   333             self.commit()
   332             cnx.execute("SET A concerne S WHERE A eid %(a)s, S eid %(s)s", {'a': aff2, 's': soc1})
   334             self.assertRaises(Unauthorized, cu.execute, 'Any X WHERE X eid %(x)s', {'x':aff1})
   333             cnx.commit()
   335             self.assertTrue(cu.execute('Any X WHERE X eid %(x)s', {'x':aff2}))
   334             self.assertRaises(Unauthorized, cnx.execute, 'Any X WHERE X eid %(x)s', {'x':aff1})
   336             self.assertTrue(cu.execute('Any X WHERE X eid %(x)s', {'x':card1}))
   335             self.assertTrue(cnx.execute('Any X WHERE X eid %(x)s', {'x':aff2}))
   337             rset = cu.execute("Any X WHERE X has_text 'cool'")
   336             self.assertTrue(cnx.execute('Any X WHERE X eid %(x)s', {'x':card1}))
       
   337             rset = cnx.execute("Any X WHERE X has_text 'cool'")
   338             self.assertEqual(sorted(eid for eid, in rset.rows),
   338             self.assertEqual(sorted(eid for eid, in rset.rows),
   339                               [card1, aff2])
   339                               [card1, aff2])
   340             self.rollback()
       
   341 
   340 
   342     def test_read_erqlexpr_has_text2(self):
   341     def test_read_erqlexpr_has_text2(self):
   343         self.execute("INSERT Personne X: X nom 'bidule'")
   342         with self.admin_access.repo_cnx() as cnx:
   344         self.execute("INSERT Societe X: X nom 'bidule'")
   343             cnx.execute("INSERT Personne X: X nom 'bidule'")
   345         self.commit()
   344             cnx.execute("INSERT Societe X: X nom 'bidule'")
       
   345             cnx.commit()
   346         with self.temporary_permissions(Personne={'read': ('managers',)}):
   346         with self.temporary_permissions(Personne={'read': ('managers',)}):
   347             with self.login('iaminusersgrouponly') as cu:
   347             with self.new_access('iaminusersgrouponly').repo_cnx() as cnx:
   348                 rset = cu.execute('Any N WHERE N has_text "bidule"')
   348                 rset = cnx.execute('Any N WHERE N has_text "bidule"')
   349                 self.assertEqual(len(rset.rows), 1, rset.rows)
   349                 self.assertEqual(len(rset.rows), 1, rset.rows)
   350                 rset = cu.execute('Any N WITH N BEING (Any N WHERE N has_text "bidule")')
   350                 rset = cnx.execute('Any N WITH N BEING (Any N WHERE N has_text "bidule")')
   351                 self.assertEqual(len(rset.rows), 1, rset.rows)
   351                 self.assertEqual(len(rset.rows), 1, rset.rows)
   352 
   352 
   353     def test_read_erqlexpr_optional_rel(self):
   353     def test_read_erqlexpr_optional_rel(self):
   354         self.execute("INSERT Personne X: X nom 'bidule'")
   354         with self.admin_access.repo_cnx() as cnx:
   355         self.execute("INSERT Societe X: X nom 'bidule'")
   355             cnx.execute("INSERT Personne X: X nom 'bidule'")
   356         self.commit()
   356             cnx.execute("INSERT Societe X: X nom 'bidule'")
       
   357             cnx.commit()
   357         with self.temporary_permissions(Personne={'read': ('managers',)}):
   358         with self.temporary_permissions(Personne={'read': ('managers',)}):
   358             with self.login('anon') as cu:
   359             with self.new_access('anon').repo_cnx() as cnx:
   359                 rset = cu.execute('Any N,U WHERE N has_text "bidule", N owned_by U?')
   360                 rset = cnx.execute('Any N,U WHERE N has_text "bidule", N owned_by U?')
   360                 self.assertEqual(len(rset.rows), 1, rset.rows)
   361                 self.assertEqual(len(rset.rows), 1, rset.rows)
   361 
   362 
   362     def test_read_erqlexpr_aggregat(self):
   363     def test_read_erqlexpr_aggregat(self):
   363         self.execute("INSERT Affaire X: X sujet 'cool'")[0][0]
   364         with self.admin_access.repo_cnx() as cnx:
   364         self.commit()
   365             cnx.execute("INSERT Affaire X: X sujet 'cool'")[0][0]
   365         with self.login('iaminusersgrouponly') as cu:
   366             cnx.commit()
   366             rset = cu.execute('Any COUNT(X) WHERE X is Affaire')
   367         with self.new_access('iaminusersgrouponly').repo_cnx() as cnx:
       
   368             rset = cnx.execute('Any COUNT(X) WHERE X is Affaire')
   367             self.assertEqual(rset.rows, [[0]])
   369             self.assertEqual(rset.rows, [[0]])
   368             aff2 = cu.execute("INSERT Affaire X: X sujet 'cool'")[0][0]
   370             aff2 = cnx.execute("INSERT Affaire X: X sujet 'cool'")[0][0]
   369             soc1 = cu.execute("INSERT Societe X: X nom 'chouette'")[0][0]
   371             soc1 = cnx.execute("INSERT Societe X: X nom 'chouette'")[0][0]
   370             cu.execute("SET A concerne S WHERE A is Affaire, S is Societe")
   372             cnx.execute("SET A concerne S WHERE A is Affaire, S is Societe")
   371             self.commit()
   373             cnx.commit()
   372             rset = cu.execute('Any COUNT(X) WHERE X is Affaire')
   374             rset = cnx.execute('Any COUNT(X) WHERE X is Affaire')
   373             self.assertEqual(rset.rows, [[1]])
   375             self.assertEqual(rset.rows, [[1]])
   374             rset = cu.execute('Any ETN, COUNT(X) GROUPBY ETN WHERE X is ET, ET name ETN')
   376             rset = cnx.execute('Any ETN, COUNT(X) GROUPBY ETN WHERE X is ET, ET name ETN')
   375             values = dict(rset)
   377             values = dict(rset)
   376             self.assertEqual(values['Affaire'], 1)
   378             self.assertEqual(values['Affaire'], 1)
   377             self.assertEqual(values['Societe'], 2)
   379             self.assertEqual(values['Societe'], 2)
   378             rset = cu.execute('Any ETN, COUNT(X) GROUPBY ETN WHERE X is ET, ET name ETN '
   380             rset = cnx.execute('Any ETN, COUNT(X) GROUPBY ETN WHERE X is ET, ET name ETN '
   379                               'WITH X BEING ((Affaire X) UNION (Societe X))')
   381                               'WITH X BEING ((Affaire X) UNION (Societe X))')
   380             self.assertEqual(len(rset), 2)
   382             self.assertEqual(len(rset), 2)
   381             values = dict(rset)
   383             values = dict(rset)
   382             self.assertEqual(values['Affaire'], 1)
   384             self.assertEqual(values['Affaire'], 1)
   383             self.assertEqual(values['Societe'], 2)
   385             self.assertEqual(values['Societe'], 2)
   384 
   386 
   385 
   387 
   386     def test_attribute_security(self):
   388     def test_attribute_security(self):
   387         # only managers should be able to edit the 'test' attribute of Personne entities
   389         with self.admin_access.repo_cnx() as cnx:
   388         eid = self.execute("INSERT Personne X: X nom 'bidule', "
   390             # only managers should be able to edit the 'test' attribute of Personne entities
   389                            "X web 'http://www.debian.org', X test TRUE")[0][0]
   391             eid = cnx.execute("INSERT Personne X: X nom 'bidule', "
   390         self.execute('SET X test FALSE WHERE X eid %(x)s', {'x': eid})
   392                                "X web 'http://www.debian.org', X test TRUE")[0][0]
   391         self.commit()
   393             cnx.execute('SET X test FALSE WHERE X eid %(x)s', {'x': eid})
   392         with self.login('iaminusersgrouponly') as cu:
   394             cnx.commit()
   393             cu.execute("INSERT Personne X: X nom 'bidule', "
   395         with self.new_access('iaminusersgrouponly').repo_cnx() as cnx:
       
   396             cnx.execute("INSERT Personne X: X nom 'bidule', "
   394                        "X web 'http://www.debian.org', X test TRUE")
   397                        "X web 'http://www.debian.org', X test TRUE")
   395             self.assertRaises(Unauthorized, self.commit)
   398             self.assertRaises(Unauthorized, cnx.commit)
   396             cu.execute("INSERT Personne X: X nom 'bidule', "
   399             cnx.execute("INSERT Personne X: X nom 'bidule', "
   397                        "X web 'http://www.debian.org', X test FALSE")
   400                        "X web 'http://www.debian.org', X test FALSE")
   398             self.assertRaises(Unauthorized, self.commit)
   401             self.assertRaises(Unauthorized, cnx.commit)
   399             eid = cu.execute("INSERT Personne X: X nom 'bidule', "
   402             eid = cnx.execute("INSERT Personne X: X nom 'bidule', "
   400                              "X web 'http://www.debian.org'")[0][0]
   403                              "X web 'http://www.debian.org'")[0][0]
   401             self.commit()
   404             cnx.commit()
   402             cu.execute('SET X test FALSE WHERE X eid %(x)s', {'x': eid})
   405             cnx.execute('SET X test FALSE WHERE X eid %(x)s', {'x': eid})
   403             self.assertRaises(Unauthorized, self.commit)
   406             self.assertRaises(Unauthorized, cnx.commit)
   404             cu.execute('SET X test TRUE WHERE X eid %(x)s', {'x': eid})
   407             cnx.execute('SET X test TRUE WHERE X eid %(x)s', {'x': eid})
   405             self.assertRaises(Unauthorized, self.commit)
   408             self.assertRaises(Unauthorized, cnx.commit)
   406             cu.execute('SET X web "http://www.logilab.org" WHERE X eid %(x)s', {'x': eid})
   409             cnx.execute('SET X web "http://www.logilab.org" WHERE X eid %(x)s', {'x': eid})
   407             self.commit()
   410             cnx.commit()
   408 
   411 
   409     def test_attribute_security_rqlexpr(self):
   412     def test_attribute_security_rqlexpr(self):
   410         # Note.para attribute editable by managers or if the note is in "todo" state
   413         with self.admin_access.repo_cnx() as cnx:
   411         note = self.execute("INSERT Note X: X para 'bidule'").get_entity(0, 0)
   414             # Note.para attribute editable by managers or if the note is in "todo" state
   412         self.commit()
   415             note = cnx.execute("INSERT Note X: X para 'bidule'").get_entity(0, 0)
   413         note.cw_adapt_to('IWorkflowable').fire_transition('markasdone')
   416             cnx.commit()
   414         self.execute('SET X para "truc" WHERE X eid %(x)s', {'x': note.eid})
   417             note.cw_adapt_to('IWorkflowable').fire_transition('markasdone')
   415         self.commit()
   418             cnx.execute('SET X para "truc" WHERE X eid %(x)s', {'x': note.eid})
   416         with self.login('iaminusersgrouponly') as cu:
   419             cnx.commit()
   417             cu.execute("SET X para 'chouette' WHERE X eid %(x)s", {'x': note.eid})
   420         with self.new_access('iaminusersgrouponly').repo_cnx() as cnx:
   418             self.assertRaises(Unauthorized, self.commit)
   421             cnx.execute("SET X para 'chouette' WHERE X eid %(x)s", {'x': note.eid})
   419             note2 = cu.execute("INSERT Note X: X para 'bidule'").get_entity(0, 0)
   422             self.assertRaises(Unauthorized, cnx.commit)
   420             self.commit()
   423             note2 = cnx.execute("INSERT Note X: X para 'bidule'").get_entity(0, 0)
       
   424             cnx.commit()
   421             note2.cw_adapt_to('IWorkflowable').fire_transition('markasdone')
   425             note2.cw_adapt_to('IWorkflowable').fire_transition('markasdone')
   422             self.commit()
   426             cnx.commit()
   423             self.assertEqual(len(cu.execute('Any X WHERE X in_state S, S name "todo", X eid %(x)s',
   427             self.assertEqual(len(cnx.execute('Any X WHERE X in_state S, S name "todo", X eid %(x)s',
   424                                             {'x': note2.eid})),
   428                                             {'x': note2.eid})),
   425                               0)
   429                               0)
   426             cu.execute("SET X para 'chouette' WHERE X eid %(x)s", {'x': note2.eid})
   430             cnx.execute("SET X para 'chouette' WHERE X eid %(x)s", {'x': note2.eid})
   427             self.assertRaises(Unauthorized, self.commit)
   431             self.assertRaises(Unauthorized, cnx.commit)
   428             note2.cw_adapt_to('IWorkflowable').fire_transition('redoit')
   432             note2.cw_adapt_to('IWorkflowable').fire_transition('redoit')
   429             self.commit()
   433             cnx.commit()
   430             cu.execute("SET X para 'chouette' WHERE X eid %(x)s", {'x': note2.eid})
   434             cnx.execute("SET X para 'chouette' WHERE X eid %(x)s", {'x': note2.eid})
   431             self.commit()
   435             cnx.commit()
   432             cu.execute("INSERT Note X: X something 'A'")
   436             cnx.execute("INSERT Note X: X something 'A'")
   433             self.assertRaises(Unauthorized, self.commit)
   437             self.assertRaises(Unauthorized, cnx.commit)
   434             cu.execute("INSERT Note X: X para 'zogzog', X something 'A'")
   438             cnx.execute("INSERT Note X: X para 'zogzog', X something 'A'")
   435             self.commit()
   439             cnx.commit()
   436             note = cu.execute("INSERT Note X").get_entity(0,0)
   440             note = cnx.execute("INSERT Note X").get_entity(0,0)
   437             self.commit()
   441             cnx.commit()
   438             note.cw_set(something=u'B')
   442             note.cw_set(something=u'B')
   439             self.commit()
   443             cnx.commit()
   440             note.cw_set(something=None, para=u'zogzog')
   444             note.cw_set(something=None, para=u'zogzog')
   441             self.commit()
   445             cnx.commit()
   442 
   446 
   443     def test_attribute_read_security(self):
   447     def test_attribute_read_security(self):
   444         # anon not allowed to see users'login, but they can see users
   448         # anon not allowed to see users'login, but they can see users
   445         login_rdef = self.repo.schema['CWUser'].rdef('login')
   449         login_rdef = self.repo.schema['CWUser'].rdef('login')
   446         with self.temporary_permissions((login_rdef, {'read': ('users', 'managers')}),
   450         with self.temporary_permissions((login_rdef, {'read': ('users', 'managers')}),
   447                                         CWUser={'read': ('guests', 'users', 'managers')}):
   451                                         CWUser={'read': ('guests', 'users', 'managers')}):
   448             with self.login('anon') as cu:
   452             with self.new_access('anon').repo_cnx() as cnx:
   449                 rset = cu.execute('CWUser X')
   453                 rset = cnx.execute('CWUser X')
   450                 self.assertTrue(rset)
   454                 self.assertTrue(rset)
   451                 x = rset.get_entity(0, 0)
   455                 x = rset.get_entity(0, 0)
   452                 self.assertEqual(x.login, None)
   456                 self.assertEqual(x.login, None)
   453                 self.assertTrue(x.creation_date)
   457                 self.assertTrue(x.creation_date)
   454                 x = rset.get_entity(1, 0)
   458                 x = rset.get_entity(1, 0)
   457                 self.assertTrue(x.creation_date)
   461                 self.assertTrue(x.creation_date)
   458 
   462 
   459     def test_yams_inheritance_and_security_bug(self):
   463     def test_yams_inheritance_and_security_bug(self):
   460         with self.temporary_permissions(Division={'read': ('managers',
   464         with self.temporary_permissions(Division={'read': ('managers',
   461                                                            ERQLExpression('X owned_by U'))}):
   465                                                            ERQLExpression('X owned_by U'))}):
   462             with self.login('iaminusersgrouponly'):
   466             with self.new_access('iaminusersgrouponly').repo_cnx() as cnx:
   463                 querier = self.repo.querier
   467                 querier = cnx.repo.querier
   464                 rqlst = querier.parse('Any X WHERE X is_instance_of Societe')
   468                 rqlst = querier.parse('Any X WHERE X is_instance_of Societe')
   465                 querier.solutions(self.session, rqlst, {})
   469                 querier.solutions(cnx, rqlst, {})
   466                 querier._annotate(rqlst)
   470                 querier._annotate(rqlst)
   467                 plan = querier.plan_factory(rqlst, {}, self.session)
   471                 plan = querier.plan_factory(rqlst, {}, cnx)
   468                 plan.preprocess(rqlst)
   472                 plan.preprocess(rqlst)
   469                 self.assertEqual(
   473                 self.assertEqual(
   470                     rqlst.as_string(),
   474                     rqlst.as_string(),
   471                     '(Any X WHERE X is IN(SubDivision, Societe)) UNION '
   475                     '(Any X WHERE X is IN(SubDivision, Societe)) UNION '
   472                     '(Any X WHERE X is Division, EXISTS(X owned_by %(B)s))')
   476                     '(Any X WHERE X is Division, EXISTS(X owned_by %(B)s))')
   475 class BaseSchemaSecurityTC(BaseSecurityTC):
   479 class BaseSchemaSecurityTC(BaseSecurityTC):
   476     """tests related to the base schema permission configuration"""
   480     """tests related to the base schema permission configuration"""
   477 
   481 
   478     def test_user_can_delete_object_he_created(self):
   482     def test_user_can_delete_object_he_created(self):
   479         # even if some other user have changed object'state
   483         # even if some other user have changed object'state
   480         with self.login('iaminusersgrouponly') as cu:
   484         with self.new_access('iaminusersgrouponly').repo_cnx() as cnx:
   481             # due to security test, affaire has to concerne a societe the user owns
   485             # due to security test, affaire has to concerne a societe the user owns
   482             cu.execute('INSERT Societe X: X nom "ARCTIA"')
   486             cnx.execute('INSERT Societe X: X nom "ARCTIA"')
   483             cu.execute('INSERT Affaire X: X ref "ARCT01", X concerne S WHERE S nom "ARCTIA"')
   487             cnx.execute('INSERT Affaire X: X ref "ARCT01", X concerne S WHERE S nom "ARCTIA"')
   484             self.commit()
   488             cnx.commit()
   485         affaire = self.execute('Any X WHERE X ref "ARCT01"').get_entity(0, 0)
   489         with self.admin_access.repo_cnx() as cnx:
   486         affaire.cw_adapt_to('IWorkflowable').fire_transition('abort')
   490             affaire = cnx.execute('Any X WHERE X ref "ARCT01"').get_entity(0, 0)
   487         self.commit()
   491             affaire.cw_adapt_to('IWorkflowable').fire_transition('abort')
   488         self.assertEqual(len(self.execute('TrInfo X WHERE X wf_info_for A, A ref "ARCT01"')),
   492             cnx.commit()
   489                           1)
   493             self.assertEqual(len(cnx.execute('TrInfo X WHERE X wf_info_for A, A ref "ARCT01"')),
   490         self.assertEqual(len(self.execute('TrInfo X WHERE X wf_info_for A, A ref "ARCT01",'
   494                              1)
   491                                            'X owned_by U, U login "admin"')),
   495             self.assertEqual(len(cnx.execute('TrInfo X WHERE X wf_info_for A, A ref "ARCT01",'
   492                           1) # TrInfo at the above state change
   496                                               'X owned_by U, U login "admin"')),
   493         with self.login('iaminusersgrouponly') as cu:
   497                              1) # TrInfo at the above state change
   494             cu.execute('DELETE Affaire X WHERE X ref "ARCT01"')
   498         with self.new_access('iaminusersgrouponly').repo_cnx() as cnx:
   495             self.commit()
   499             cnx.execute('DELETE Affaire X WHERE X ref "ARCT01"')
   496             self.assertFalse(cu.execute('Affaire X'))
   500             cnx.commit()
       
   501             self.assertFalse(cnx.execute('Affaire X'))
   497 
   502 
   498     def test_users_and_groups_non_readable_by_guests(self):
   503     def test_users_and_groups_non_readable_by_guests(self):
   499         with self.login('anon') as cu:
   504         with self.repo.internal_cnx() as cnx:
   500             anon = cu.connection.user(self.session)
   505             admineid = cnx.execute('CWUser U WHERE U login "admin"').rows[0][0]
       
   506         with self.new_access('anon').repo_cnx() as cnx:
       
   507             anon = cnx.user
   501             # anonymous user can only read itself
   508             # anonymous user can only read itself
   502             rset = cu.execute('Any L WHERE X owned_by U, U login L')
   509             rset = cnx.execute('Any L WHERE X owned_by U, U login L')
   503             self.assertEqual([['anon']], rset.rows)
   510             self.assertEqual([['anon']], rset.rows)
   504             rset = cu.execute('CWUser X')
   511             rset = cnx.execute('CWUser X')
   505             self.assertEqual([[anon.eid]], rset.rows)
   512             self.assertEqual([[anon.eid]], rset.rows)
   506             # anonymous user can read groups (necessary to check allowed transitions for instance)
   513             # anonymous user can read groups (necessary to check allowed transitions for instance)
   507             self.assert_(cu.execute('CWGroup X'))
   514             self.assert_(cnx.execute('CWGroup X'))
   508             # should only be able to read the anonymous user, not another one
   515             # should only be able to read the anonymous user, not another one
   509             origuser = self.adminsession.user
       
   510             self.assertRaises(Unauthorized,
   516             self.assertRaises(Unauthorized,
   511                               cu.execute, 'CWUser X WHERE X eid %(x)s', {'x': origuser.eid})
   517                               cnx.execute, 'CWUser X WHERE X eid %(x)s', {'x': admineid})
   512             # nothing selected, nothing updated, no exception raised
   518             rset = cnx.execute('CWUser X WHERE X eid %(x)s', {'x': anon.eid})
   513             #self.assertRaises(Unauthorized,
       
   514             #                  cu.execute, 'SET X login "toto" WHERE X eid %(x)s',
       
   515             #                  {'x': self.user.eid})
       
   516 
       
   517             rset = cu.execute('CWUser X WHERE X eid %(x)s', {'x': anon.eid})
       
   518             self.assertEqual([[anon.eid]], rset.rows)
   519             self.assertEqual([[anon.eid]], rset.rows)
   519             # but can't modify it
   520             # but can't modify it
   520             cu.execute('SET X login "toto" WHERE X eid %(x)s', {'x': anon.eid})
   521             cnx.execute('SET X login "toto" WHERE X eid %(x)s', {'x': anon.eid})
   521             self.assertRaises(Unauthorized, self.commit)
   522             self.assertRaises(Unauthorized, cnx.commit)
   522 
   523 
   523     def test_in_group_relation(self):
   524     def test_in_group_relation(self):
   524         with self.login('iaminusersgrouponly') as cu:
   525         with self.new_access('iaminusersgrouponly').repo_cnx() as cnx:
   525             rql = u"DELETE U in_group G WHERE U login 'admin'"
   526             rql = u"DELETE U in_group G WHERE U login 'admin'"
   526             self.assertRaises(Unauthorized, cu.execute, rql)
   527             self.assertRaises(Unauthorized, cnx.execute, rql)
   527             rql = u"SET U in_group G WHERE U login 'admin', G name 'users'"
   528             rql = u"SET U in_group G WHERE U login 'admin', G name 'users'"
   528             self.assertRaises(Unauthorized, cu.execute, rql)
   529             self.assertRaises(Unauthorized, cnx.execute, rql)
   529             self.rollback()
       
   530 
   530 
   531     def test_owned_by(self):
   531     def test_owned_by(self):
   532         self.execute("INSERT Personne X: X nom 'bidule'")
   532         with self.admin_access.repo_cnx() as cnx:
   533         self.commit()
   533             cnx.execute("INSERT Personne X: X nom 'bidule'")
   534         with self.login('iaminusersgrouponly') as cu:
   534             cnx.commit()
       
   535         with self.new_access('iaminusersgrouponly').repo_cnx() as cnx:
   535             rql = u"SET X owned_by U WHERE U login 'iaminusersgrouponly', X is Personne"
   536             rql = u"SET X owned_by U WHERE U login 'iaminusersgrouponly', X is Personne"
   536             self.assertRaises(Unauthorized, cu.execute, rql)
   537             self.assertRaises(Unauthorized, cnx.execute, rql)
   537             self.rollback()
       
   538 
   538 
   539     def test_bookmarked_by_guests_security(self):
   539     def test_bookmarked_by_guests_security(self):
   540         beid1 = self.execute('INSERT Bookmark B: B path "?vid=manage", B title "manage"')[0][0]
   540         with self.admin_access.repo_cnx() as cnx:
   541         beid2 = self.execute('INSERT Bookmark B: B path "?vid=index", B title "index", '
   541             beid1 = cnx.execute('INSERT Bookmark B: B path "?vid=manage", B title "manage"')[0][0]
   542                              'B bookmarked_by U WHERE U login "anon"')[0][0]
   542             beid2 = cnx.execute('INSERT Bookmark B: B path "?vid=index", B title "index", '
   543         self.commit()
   543                                 'B bookmarked_by U WHERE U login "anon"')[0][0]
   544         with self.login('anon') as cu:
   544             cnx.commit()
   545             anoneid = self.session.user.eid
   545         with self.new_access('anon').repo_cnx() as cnx:
   546             self.assertEqual(cu.execute('Any T,P ORDERBY lower(T) WHERE B is Bookmark,B title T,B path P,'
   546             anoneid = cnx.user.eid
       
   547             self.assertEqual(cnx.execute('Any T,P ORDERBY lower(T) WHERE B is Bookmark,B title T,B path P,'
   547                                          'B bookmarked_by U, U eid %s' % anoneid).rows,
   548                                          'B bookmarked_by U, U eid %s' % anoneid).rows,
   548                               [['index', '?vid=index']])
   549                               [['index', '?vid=index']])
   549             self.assertEqual(cu.execute('Any T,P ORDERBY lower(T) WHERE B is Bookmark,B title T,B path P,'
   550             self.assertEqual(cnx.execute('Any T,P ORDERBY lower(T) WHERE B is Bookmark,B title T,B path P,'
   550                                          'B bookmarked_by U, U eid %(x)s', {'x': anoneid}).rows,
   551                                          'B bookmarked_by U, U eid %(x)s', {'x': anoneid}).rows,
   551                               [['index', '?vid=index']])
   552                               [['index', '?vid=index']])
   552             # can read others bookmarks as well
   553             # can read others bookmarks as well
   553             self.assertEqual(cu.execute('Any B where B is Bookmark, NOT B bookmarked_by U').rows,
   554             self.assertEqual(cnx.execute('Any B where B is Bookmark, NOT B bookmarked_by U').rows,
   554                               [[beid1]])
   555                               [[beid1]])
   555             self.assertRaises(Unauthorized, cu.execute,'DELETE B bookmarked_by U')
   556             self.assertRaises(Unauthorized, cnx.execute,'DELETE B bookmarked_by U')
   556             self.assertRaises(Unauthorized,
   557             self.assertRaises(Unauthorized,
   557                               cu.execute, 'SET B bookmarked_by U WHERE U eid %(x)s, B eid %(b)s',
   558                               cnx.execute, 'SET B bookmarked_by U WHERE U eid %(x)s, B eid %(b)s',
   558                               {'x': anoneid, 'b': beid1})
   559                               {'x': anoneid, 'b': beid1})
   559             self.rollback()
       
   560 
   560 
   561     def test_ambigous_ordered(self):
   561     def test_ambigous_ordered(self):
   562         with self.login('anon') as cu:
   562         with self.new_access('anon').repo_cnx() as cnx:
   563             names = [t for t, in cu.execute('Any N ORDERBY lower(N) WHERE X name N')]
   563             names = [t for t, in cnx.execute('Any N ORDERBY lower(N) WHERE X name N')]
   564             self.assertEqual(names, sorted(names, key=lambda x: x.lower()))
   564             self.assertEqual(names, sorted(names, key=lambda x: x.lower()))
   565 
   565 
   566     def test_in_state_without_update_perm(self):
   566     def test_in_state_without_update_perm(self):
   567         """check a user change in_state without having update permission on the
   567         """check a user change in_state without having update permission on the
   568         subject
   568         subject
   569         """
   569         """
   570         eid = self.execute('INSERT Affaire X: X ref "ARCT01"')[0][0]
   570         with self.admin_access.repo_cnx() as cnx:
   571         self.commit()
   571             eid = cnx.execute('INSERT Affaire X: X ref "ARCT01"')[0][0]
   572         with self.login('iaminusersgrouponly') as cu:
   572             cnx.commit()
   573             session = self.session
   573         with self.new_access('iaminusersgrouponly').repo_cnx() as cnx:
   574             # needed to avoid check_perm error
       
   575             session.set_cnxset()
       
   576             # needed to remove rql expr granting update perm to the user
   574             # needed to remove rql expr granting update perm to the user
   577             affschema = self.schema['Affaire']
   575             affschema = self.schema['Affaire']
   578             with self.temporary_permissions(Affaire={'update': affschema.get_groups('update'),
   576             with self.temporary_permissions(Affaire={'update': affschema.get_groups('update'),
   579                                                      'read': ('users',)}):
   577                                                      'read': ('users',)}):
   580                 self.assertRaises(Unauthorized,
   578                 self.assertRaises(Unauthorized,
   581                                   affschema.check_perm, session, 'update', eid=eid)
   579                                   affschema.check_perm, cnx, 'update', eid=eid)
   582                 aff = cu.execute('Any X WHERE X ref "ARCT01"').get_entity(0, 0)
   580                 aff = cnx.execute('Any X WHERE X ref "ARCT01"').get_entity(0, 0)
   583                 aff.cw_adapt_to('IWorkflowable').fire_transition('abort')
   581                 aff.cw_adapt_to('IWorkflowable').fire_transition('abort')
   584                 self.commit()
   582                 cnx.commit()
   585                 # though changing a user state (even logged user) is reserved to managers
   583                 # though changing a user state (even logged user) is reserved to managers
   586                 user = self.user(session)
   584                 user = cnx.user
   587                 session.set_cnxset()
       
   588                 # XXX wether it should raise Unauthorized or ValidationError is not clear
   585                 # XXX wether it should raise Unauthorized or ValidationError is not clear
   589                 # the best would probably ValidationError if the transition doesn't exist
   586                 # the best would probably ValidationError if the transition doesn't exist
   590                 # from the current state but Unauthorized if it exists but user can't pass it
   587                 # from the current state but Unauthorized if it exists but user can't pass it
   591                 self.assertRaises(ValidationError,
   588                 self.assertRaises(ValidationError,
   592                                   user.cw_adapt_to('IWorkflowable').fire_transition, 'deactivate')
   589                                   user.cw_adapt_to('IWorkflowable').fire_transition, 'deactivate')
   593                 self.rollback() # else will fail on login cm exit
       
   594 
   590 
   595     def test_trinfo_security(self):
   591     def test_trinfo_security(self):
   596         aff = self.execute('INSERT Affaire X: X ref "ARCT01"').get_entity(0, 0)
   592         with self.admin_access.repo_cnx() as cnx:
   597         iworkflowable = aff.cw_adapt_to('IWorkflowable')
   593             aff = cnx.execute('INSERT Affaire X: X ref "ARCT01"').get_entity(0, 0)
   598         self.commit()
   594             iworkflowable = aff.cw_adapt_to('IWorkflowable')
   599         iworkflowable.fire_transition('abort')
   595             cnx.commit()
   600         self.commit()
   596             iworkflowable.fire_transition('abort')
   601         # can change tr info comment
   597             cnx.commit()
   602         self.execute('SET TI comment %(c)s WHERE TI wf_info_for X, X ref "ARCT01"',
   598             # can change tr info comment
   603                      {'c': u'bouh!'})
   599             cnx.execute('SET TI comment %(c)s WHERE TI wf_info_for X, X ref "ARCT01"',
   604         self.commit()
   600                          {'c': u'bouh!'})
   605         aff.cw_clear_relation_cache('wf_info_for', 'object')
   601             cnx.commit()
   606         trinfo = iworkflowable.latest_trinfo()
   602             aff.cw_clear_relation_cache('wf_info_for', 'object')
   607         self.assertEqual(trinfo.comment, 'bouh!')
   603             trinfo = iworkflowable.latest_trinfo()
   608         # but not from_state/to_state
   604             self.assertEqual(trinfo.comment, 'bouh!')
   609         aff.cw_clear_relation_cache('wf_info_for', role='object')
   605             # but not from_state/to_state
   610         self.assertRaises(Unauthorized,
   606             aff.cw_clear_relation_cache('wf_info_for', role='object')
   611                           self.execute, 'SET TI from_state S WHERE TI eid %(ti)s, S name "ben non"',
   607             self.assertRaises(Unauthorized, cnx.execute,
   612                           {'ti': trinfo.eid})
   608                               'SET TI from_state S WHERE TI eid %(ti)s, S name "ben non"',
   613         self.assertRaises(Unauthorized,
   609                               {'ti': trinfo.eid})
   614                           self.execute, 'SET TI to_state S WHERE TI eid %(ti)s, S name "pitetre"',
   610             self.assertRaises(Unauthorized, cnx.execute,
   615                           {'ti': trinfo.eid})
   611                               'SET TI to_state S WHERE TI eid %(ti)s, S name "pitetre"',
       
   612                               {'ti': trinfo.eid})
   616 
   613 
   617     def test_emailaddress_security(self):
   614     def test_emailaddress_security(self):
   618         # check for prexisting email adresse
   615         # check for prexisting email adresse
   619         if self.execute('Any X WHERE X is EmailAddress'):
   616         with self.admin_access.repo_cnx() as cnx:
   620             rset = self.execute('Any X, U WHERE X is EmailAddress, U use_email X')
   617             if cnx.execute('Any X WHERE X is EmailAddress'):
   621             msg = ['Preexisting email readable by anon found!']
   618                 rset = cnx.execute('Any X, U WHERE X is EmailAddress, U use_email X')
   622             tmpl = '  - "%s" used by user "%s"'
   619                 msg = ['Preexisting email readable by anon found!']
   623             for i in xrange(len(rset)):
   620                 tmpl = '  - "%s" used by user "%s"'
   624                 email, user = rset.get_entity(i, 0), rset.get_entity(i, 1)
   621                 for i in xrange(len(rset)):
   625                 msg.append(tmpl % (email.dc_title(), user.dc_title()))
   622                     email, user = rset.get_entity(i, 0), rset.get_entity(i, 1)
   626             raise RuntimeError('\n'.join(msg))
   623                     msg.append(tmpl % (email.dc_title(), user.dc_title()))
   627         # actual test
   624                 raise RuntimeError('\n'.join(msg))
   628         self.execute('INSERT EmailAddress X: X address "hop"').get_entity(0, 0)
   625             # actual test
   629         self.execute('INSERT EmailAddress X: X address "anon", '
   626             cnx.execute('INSERT EmailAddress X: X address "hop"').get_entity(0, 0)
   630                      'U use_email X WHERE U login "anon"').get_entity(0, 0)
   627             cnx.execute('INSERT EmailAddress X: X address "anon", '
   631         self.commit()
   628                          'U use_email X WHERE U login "anon"').get_entity(0, 0)
   632         self.assertEqual(len(self.execute('Any X WHERE X is EmailAddress')), 2)
   629             cnx.commit()
   633         self.login('anon')
   630             self.assertEqual(len(cnx.execute('Any X WHERE X is EmailAddress')), 2)
   634         self.assertEqual(len(self.execute('Any X WHERE X is EmailAddress')), 1)
   631         with self.new_access('anon').repo_cnx() as cnx:
       
   632             self.assertEqual(len(cnx.execute('Any X WHERE X is EmailAddress')), 1)
   635 
   633 
   636 if __name__ == '__main__':
   634 if __name__ == '__main__':
   637     unittest_main()
   635     unittest_main()