server/test/unittest_security.py
changeset 9777 b2e47617a94e
parent 9586 121c88b360d0
child 9782 95e8fa2c8da8
equal deleted inserted replaced
9776:d5413f2453a1 9777:b2e47617a94e
     1 # copyright 2003-2012 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
     1 # copyright 2003-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
     2 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
     2 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
     3 #
     3 #
     4 # This file is part of CubicWeb.
     4 # This file is part of CubicWeb.
     5 #
     5 #
     6 # CubicWeb is free software: you can redistribute it and/or modify it under the
     6 # CubicWeb is free software: you can redistribute it and/or modify it under the
    60         with self.login('iaminusersgrouponly') as cu:
    60         with self.login('iaminusersgrouponly') as cu:
    61             self.assertRaises(Unauthorized,
    61             self.assertRaises(Unauthorized,
    62                               cu.execute, 'Any X,P WHERE X is CWUser, X upassword P')
    62                               cu.execute, 'Any X,P WHERE X is CWUser, X upassword P')
    63 
    63 
    64     def test_update_password(self):
    64     def test_update_password(self):
    65         """Ensure that if a user's password is stored with a deprecated hash, it will be updated on next login"""
    65         """Ensure that if a user's password is stored with a deprecated hash,
    66         oldhash = str(self.session.system_sql("SELECT cw_upassword FROM cw_CWUser WHERE cw_login = 'oldpassword'").fetchone()[0])
    66         it will be updated on next login
       
    67         """
       
    68         oldhash = str(self.session.system_sql("SELECT cw_upassword FROM cw_CWUser "
       
    69                                               "WHERE cw_login = 'oldpassword'").fetchone()[0])
    67         with self.login('oldpassword') as cu:
    70         with self.login('oldpassword') as cu:
    68             pass
    71             pass
    69         newhash = str(self.session.system_sql("SELECT cw_upassword FROM cw_CWUser WHERE cw_login = 'oldpassword'").fetchone()[0])
    72         newhash = str(self.session.system_sql("SELECT cw_upassword FROM cw_CWUser "
       
    73                                               "WHERE cw_login = 'oldpassword'").fetchone()[0])
    70         self.assertNotEqual(oldhash, newhash)
    74         self.assertNotEqual(oldhash, newhash)
    71         self.assertTrue(newhash.startswith('$6$'))
    75         self.assertTrue(newhash.startswith('$6$'))
    72         with self.login('oldpassword') as cu:
    76         with self.login('oldpassword') as cu:
    73             pass
    77             pass
    74         self.assertEqual(newhash, str(self.session.system_sql("SELECT cw_upassword FROM cw_CWUser WHERE cw_login = 'oldpassword'").fetchone()[0]))
    78         self.assertEqual(newhash,
       
    79                          str(self.session.system_sql("SELECT cw_upassword FROM cw_CWUser WHERE "
       
    80                                                      "cw_login = 'oldpassword'").fetchone()[0]))
    75 
    81 
    76 
    82 
    77 class SecurityRewritingTC(BaseSecurityTC):
    83 class SecurityRewritingTC(BaseSecurityTC):
    78     def hijack_source_execute(self):
    84     def hijack_source_execute(self):
    79         def syntax_tree_search(*args, **kwargs):
    85         def syntax_tree_search(*args, **kwargs):
   134 
   140 
   135     def test_update_security_2(self):
   141     def test_update_security_2(self):
   136         with self.temporary_permissions(Personne={'read': ('users', 'managers'),
   142         with self.temporary_permissions(Personne={'read': ('users', 'managers'),
   137                                                   'add': ('guests', 'users', 'managers')}):
   143                                                   'add': ('guests', 'users', 'managers')}):
   138             with self.login('anon') as cu:
   144             with self.login('anon') as cu:
   139                 self.assertRaises(Unauthorized, cu.execute, "SET X nom 'bidulechouette' WHERE X is Personne")
   145                 self.assertRaises(Unauthorized, cu.execute,
       
   146                                   "SET X nom 'bidulechouette' WHERE X is Personne")
   140                 self.rollback()
   147                 self.rollback()
   141                 # self.assertRaises(Unauthorized, cnx.commit)
   148                 # self.assertRaises(Unauthorized, cnx.commit)
   142         # test nothing has actually been inserted
   149         # test nothing has actually been inserted
   143         self.assertEqual(self.execute('Personne X WHERE X nom "bidulechouette"').rowcount, 0)
   150         self.assertEqual(self.execute('Personne X WHERE X nom "bidulechouette"').rowcount, 0)
   144 
   151 
   231             # this won't actually do anything since the selection query won't return anything
   238             # this won't actually do anything since the selection query won't return anything
   232             cu.execute("DELETE A concerne S")
   239             cu.execute("DELETE A concerne S")
   233             self.commit()
   240             self.commit()
   234         # to actually get Unauthorized exception, try to delete a relation we can read
   241         # to actually get Unauthorized exception, try to delete a relation we can read
   235         eid = self.execute("INSERT Affaire X: X sujet 'pascool'")[0][0]
   242         eid = self.execute("INSERT Affaire X: X sujet 'pascool'")[0][0]
   236         self.execute('SET X owned_by U WHERE X eid %(x)s, U login "iaminusersgrouponly"', {'x': eid})
   243         self.execute('SET X owned_by U WHERE X eid %(x)s, U login "iaminusersgrouponly"',
       
   244                      {'x': eid})
   237         self.execute("SET A concerne S WHERE A sujet 'pascool', S is Societe")
   245         self.execute("SET A concerne S WHERE A sujet 'pascool', S is Societe")
   238         self.commit()
   246         self.commit()
   239         with self.login('iaminusersgrouponly') as cu:
   247         with self.login('iaminusersgrouponly') as cu:
   240             self.assertRaises(Unauthorized, cu.execute, "DELETE A concerne S")
   248             self.assertRaises(Unauthorized, cu.execute, "DELETE A concerne S")
   241             self.assertRaises(QueryError, self.commit) # can't commit anymore
   249             self.assertRaises(QueryError, self.commit) # can't commit anymore
   293             rset = cu.execute('Affaire X WHERE NOT X eid %(x)s', {'x': eid})
   301             rset = cu.execute('Affaire X WHERE NOT X eid %(x)s', {'x': eid})
   294             self.assertEqual(rset.rows, [[aff2]])
   302             self.assertEqual(rset.rows, [[aff2]])
   295             rset = cu.execute('Affaire X WHERE NOT X eid %(x)s', {'x': aff2})
   303             rset = cu.execute('Affaire X WHERE NOT X eid %(x)s', {'x': aff2})
   296             self.assertEqual(rset.rows, [])
   304             self.assertEqual(rset.rows, [])
   297             # test can't update an attribute of an entity that can't be readen
   305             # test can't update an attribute of an entity that can't be readen
   298             self.assertRaises(Unauthorized, cu.execute, 'SET X sujet "hacked" WHERE X eid %(x)s', {'x': eid})
   306             self.assertRaises(Unauthorized, cu.execute,
       
   307                               'SET X sujet "hacked" WHERE X eid %(x)s', {'x': eid})
   299             self.rollback()
   308             self.rollback()
   300 
   309 
   301 
   310 
   302     def test_entity_created_in_transaction(self):
   311     def test_entity_created_in_transaction(self):
   303         affschema = self.schema['Affaire']
   312         affschema = self.schema['Affaire']
   312                 self.assertRaises(Unauthorized, self.commit)
   321                 self.assertRaises(Unauthorized, self.commit)
   313 
   322 
   314     def test_read_erqlexpr_has_text1(self):
   323     def test_read_erqlexpr_has_text1(self):
   315         aff1 = self.execute("INSERT Affaire X: X sujet 'cool'")[0][0]
   324         aff1 = self.execute("INSERT Affaire X: X sujet 'cool'")[0][0]
   316         card1 = self.execute("INSERT Card X: X title 'cool'")[0][0]
   325         card1 = self.execute("INSERT Card X: X title 'cool'")[0][0]
   317         self.execute('SET X owned_by U WHERE X eid %(x)s, U login "iaminusersgrouponly"', {'x': card1})
   326         self.execute('SET X owned_by U WHERE X eid %(x)s, U login "iaminusersgrouponly"',
       
   327                      {'x': card1})
   318         self.commit()
   328         self.commit()
   319         with self.login('iaminusersgrouponly') as cu:
   329         with self.login('iaminusersgrouponly') as cu:
   320             aff2 = cu.execute("INSERT Affaire X: X sujet 'cool'")[0][0]
   330             aff2 = cu.execute("INSERT Affaire X: X sujet 'cool'")[0][0]
   321             soc1 = cu.execute("INSERT Societe X: X nom 'chouette'")[0][0]
   331             soc1 = cu.execute("INSERT Societe X: X nom 'chouette'")[0][0]
   322             cu.execute("SET A concerne S WHERE A eid %(a)s, S eid %(s)s", {'a': aff2, 's': soc1})
   332             cu.execute("SET A concerne S WHERE A eid %(a)s, S eid %(s)s", {'a': aff2, 's': soc1})
   363             self.assertEqual(rset.rows, [[1]])
   373             self.assertEqual(rset.rows, [[1]])
   364             rset = cu.execute('Any ETN, COUNT(X) GROUPBY ETN WHERE X is ET, ET name ETN')
   374             rset = cu.execute('Any ETN, COUNT(X) GROUPBY ETN WHERE X is ET, ET name ETN')
   365             values = dict(rset)
   375             values = dict(rset)
   366             self.assertEqual(values['Affaire'], 1)
   376             self.assertEqual(values['Affaire'], 1)
   367             self.assertEqual(values['Societe'], 2)
   377             self.assertEqual(values['Societe'], 2)
   368             rset = cu.execute('Any ETN, COUNT(X) GROUPBY ETN WHERE X is ET, ET name ETN WITH X BEING ((Affaire X) UNION (Societe X))')
   378             rset = cu.execute('Any ETN, COUNT(X) GROUPBY ETN WHERE X is ET, ET name ETN '
       
   379                               'WITH X BEING ((Affaire X) UNION (Societe X))')
   369             self.assertEqual(len(rset), 2)
   380             self.assertEqual(len(rset), 2)
   370             values = dict(rset)
   381             values = dict(rset)
   371             self.assertEqual(values['Affaire'], 1)
   382             self.assertEqual(values['Affaire'], 1)
   372             self.assertEqual(values['Societe'], 2)
   383             self.assertEqual(values['Societe'], 2)
   373 
   384 
   374 
   385 
   375     def test_attribute_security(self):
   386     def test_attribute_security(self):
   376         # only managers should be able to edit the 'test' attribute of Personne entities
   387         # only managers should be able to edit the 'test' attribute of Personne entities
   377         eid = self.execute("INSERT Personne X: X nom 'bidule', X web 'http://www.debian.org', X test TRUE")[0][0]
   388         eid = self.execute("INSERT Personne X: X nom 'bidule', "
       
   389                            "X web 'http://www.debian.org', X test TRUE")[0][0]
   378         self.execute('SET X test FALSE WHERE X eid %(x)s', {'x': eid})
   390         self.execute('SET X test FALSE WHERE X eid %(x)s', {'x': eid})
   379         self.commit()
   391         self.commit()
   380         with self.login('iaminusersgrouponly') as cu:
   392         with self.login('iaminusersgrouponly') as cu:
   381             cu.execute("INSERT Personne X: X nom 'bidule', X web 'http://www.debian.org', X test TRUE")
   393             cu.execute("INSERT Personne X: X nom 'bidule', "
   382             self.assertRaises(Unauthorized, self.commit)
   394                        "X web 'http://www.debian.org', X test TRUE")
   383             cu.execute("INSERT Personne X: X nom 'bidule', X web 'http://www.debian.org', X test FALSE")
   395             self.assertRaises(Unauthorized, self.commit)
   384             self.assertRaises(Unauthorized, self.commit)
   396             cu.execute("INSERT Personne X: X nom 'bidule', "
   385             eid = cu.execute("INSERT Personne X: X nom 'bidule', X web 'http://www.debian.org'")[0][0]
   397                        "X web 'http://www.debian.org', X test FALSE")
       
   398             self.assertRaises(Unauthorized, self.commit)
       
   399             eid = cu.execute("INSERT Personne X: X nom 'bidule', "
       
   400                              "X web 'http://www.debian.org'")[0][0]
   386             self.commit()
   401             self.commit()
   387             cu.execute('SET X test FALSE WHERE X eid %(x)s', {'x': eid})
   402             cu.execute('SET X test FALSE WHERE X eid %(x)s', {'x': eid})
   388             self.assertRaises(Unauthorized, self.commit)
   403             self.assertRaises(Unauthorized, self.commit)
   389             cu.execute('SET X test TRUE WHERE X eid %(x)s', {'x': eid})
   404             cu.execute('SET X test TRUE WHERE X eid %(x)s', {'x': eid})
   390             self.assertRaises(Unauthorized, self.commit)
   405             self.assertRaises(Unauthorized, self.commit)
   403             self.assertRaises(Unauthorized, self.commit)
   418             self.assertRaises(Unauthorized, self.commit)
   404             note2 = cu.execute("INSERT Note X: X para 'bidule'").get_entity(0, 0)
   419             note2 = cu.execute("INSERT Note X: X para 'bidule'").get_entity(0, 0)
   405             self.commit()
   420             self.commit()
   406             note2.cw_adapt_to('IWorkflowable').fire_transition('markasdone')
   421             note2.cw_adapt_to('IWorkflowable').fire_transition('markasdone')
   407             self.commit()
   422             self.commit()
   408             self.assertEqual(len(cu.execute('Any X WHERE X in_state S, S name "todo", X eid %(x)s', {'x': note2.eid})),
   423             self.assertEqual(len(cu.execute('Any X WHERE X in_state S, S name "todo", X eid %(x)s',
       
   424                                             {'x': note2.eid})),
   409                               0)
   425                               0)
   410             cu.execute("SET X para 'chouette' WHERE X eid %(x)s", {'x': note2.eid})
   426             cu.execute("SET X para 'chouette' WHERE X eid %(x)s", {'x': note2.eid})
   411             self.assertRaises(Unauthorized, self.commit)
   427             self.assertRaises(Unauthorized, self.commit)
   412             note2.cw_adapt_to('IWorkflowable').fire_transition('redoit')
   428             note2.cw_adapt_to('IWorkflowable').fire_transition('redoit')
   413             self.commit()
   429             self.commit()
   439                 x.complete()
   455                 x.complete()
   440                 self.assertEqual(x.login, None)
   456                 self.assertEqual(x.login, None)
   441                 self.assertTrue(x.creation_date)
   457                 self.assertTrue(x.creation_date)
   442 
   458 
   443     def test_yams_inheritance_and_security_bug(self):
   459     def test_yams_inheritance_and_security_bug(self):
   444         with self.temporary_permissions(Division={'read': ('managers', ERQLExpression('X owned_by U'))}):
   460         with self.temporary_permissions(Division={'read': ('managers',
       
   461                                                            ERQLExpression('X owned_by U'))}):
   445             with self.login('iaminusersgrouponly'):
   462             with self.login('iaminusersgrouponly'):
   446                 querier = self.repo.querier
   463                 querier = self.repo.querier
   447                 rqlst = querier.parse('Any X WHERE X is_instance_of Societe')
   464                 rqlst = querier.parse('Any X WHERE X is_instance_of Societe')
   448                 querier.solutions(self.session, rqlst, {})
   465                 querier.solutions(self.session, rqlst, {})
   449                 querier._annotate(rqlst)
   466                 querier._annotate(rqlst)
   450                 plan = querier.plan_factory(rqlst, {}, self.session)
   467                 plan = querier.plan_factory(rqlst, {}, self.session)
   451                 plan.preprocess(rqlst)
   468                 plan.preprocess(rqlst)
   452                 self.assertEqual(
   469                 self.assertEqual(
   453                     rqlst.as_string(),
   470                     rqlst.as_string(),
   454                     '(Any X WHERE X is IN(SubDivision, Societe)) UNION (Any X WHERE X is Division, EXISTS(X owned_by %(B)s))')
   471                     '(Any X WHERE X is IN(SubDivision, Societe)) UNION '
       
   472                     '(Any X WHERE X is Division, EXISTS(X owned_by %(B)s))')
   455 
   473 
   456 
   474 
   457 class BaseSchemaSecurityTC(BaseSecurityTC):
   475 class BaseSchemaSecurityTC(BaseSecurityTC):
   458     """tests related to the base schema permission configuration"""
   476     """tests related to the base schema permission configuration"""
   459 
   477 
   518             self.assertRaises(Unauthorized, cu.execute, rql)
   536             self.assertRaises(Unauthorized, cu.execute, rql)
   519             self.rollback()
   537             self.rollback()
   520 
   538 
   521     def test_bookmarked_by_guests_security(self):
   539     def test_bookmarked_by_guests_security(self):
   522         beid1 = self.execute('INSERT Bookmark B: B path "?vid=manage", B title "manage"')[0][0]
   540         beid1 = self.execute('INSERT Bookmark B: B path "?vid=manage", B title "manage"')[0][0]
   523         beid2 = self.execute('INSERT Bookmark B: B path "?vid=index", B title "index", B bookmarked_by U WHERE U login "anon"')[0][0]
   541         beid2 = self.execute('INSERT Bookmark B: B path "?vid=index", B title "index", '
       
   542                              'B bookmarked_by U WHERE U login "anon"')[0][0]
   524         self.commit()
   543         self.commit()
   525         with self.login('anon') as cu:
   544         with self.login('anon') as cu:
   526             anoneid = self.session.user.eid
   545             anoneid = self.session.user.eid
   527             self.assertEqual(cu.execute('Any T,P ORDERBY lower(T) WHERE B is Bookmark,B title T,B path P,'
   546             self.assertEqual(cu.execute('Any T,P ORDERBY lower(T) WHERE B is Bookmark,B title T,B path P,'
   528                                          'B bookmarked_by U, U eid %s' % anoneid).rows,
   547                                          'B bookmarked_by U, U eid %s' % anoneid).rows,
   605                 email, user = rset.get_entity(i, 0), rset.get_entity(i, 1)
   624                 email, user = rset.get_entity(i, 0), rset.get_entity(i, 1)
   606                 msg.append(tmpl % (email.dc_title(), user.dc_title()))
   625                 msg.append(tmpl % (email.dc_title(), user.dc_title()))
   607             raise RuntimeError('\n'.join(msg))
   626             raise RuntimeError('\n'.join(msg))
   608         # actual test
   627         # actual test
   609         self.execute('INSERT EmailAddress X: X address "hop"').get_entity(0, 0)
   628         self.execute('INSERT EmailAddress X: X address "hop"').get_entity(0, 0)
   610         self.execute('INSERT EmailAddress X: X address "anon", U use_email X WHERE U login "anon"').get_entity(0, 0)
   629         self.execute('INSERT EmailAddress X: X address "anon", '
       
   630                      'U use_email X WHERE U login "anon"').get_entity(0, 0)
   611         self.commit()
   631         self.commit()
   612         self.assertEqual(len(self.execute('Any X WHERE X is EmailAddress')), 2)
   632         self.assertEqual(len(self.execute('Any X WHERE X is EmailAddress')), 2)
   613         self.login('anon')
   633         self.login('anon')
   614         self.assertEqual(len(self.execute('Any X WHERE X is EmailAddress')), 1)
   634         self.assertEqual(len(self.execute('Any X WHERE X is EmailAddress')), 1)
   615 
   635