# HG changeset patch # User Philippe Pepiot # Date 1553769576 -3600 # Node ID 3648a2c293f64608a1164a8b92ee94e8cc191676 # Parent 32ee89340e590e07edd20c6811742f8e25c6dfdf [server/test] make test filename uniques To avoid these pytest error when collecting the whole test suite: import file mismatch: imported module 'unittest_utils' has this __file__ attribute: cubicweb/cubicweb/server/test/unittest_utils.py which is not the same as the test file we want to collect: cubicweb/cubicweb/test/unittest_utils.py Move cubicweb/server/test/unittest_security.py to cubicweb/server/test/unittest_security.py and cubicweb/test/unittest_utils.py to cubicweb/test/unittest_server_utils.py diff -r 32ee89340e59 -r 3648a2c293f6 cubicweb/server/test/unittest_security.py --- a/cubicweb/server/test/unittest_security.py Fri May 24 16:29:14 2019 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,689 +0,0 @@ -# copyright 2003-2016 LOGILAB S.A. (Paris, FRANCE), all rights reserved. -# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr -# -# This file is part of CubicWeb. -# -# CubicWeb is free software: you can redistribute it and/or modify it under the -# terms of the GNU Lesser General Public License as published by the Free -# Software Foundation, either version 2.1 of the License, or (at your option) -# any later version. -# -# CubicWeb is distributed in the hope that it will be useful, but WITHOUT -# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS -# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more -# details. -# -# You should have received a copy of the GNU Lesser General Public License along -# with CubicWeb. If not, see . -"""functional tests for server'security""" - -from logilab.common.testlib import unittest_main - -from cubicweb.devtools.testlib import CubicWebTC -from cubicweb import Unauthorized, ValidationError, QueryError, Binary -from cubicweb.schema import ERQLExpression -from cubicweb.server.querier import get_local_checks, check_relations_read_access -from cubicweb.server.utils import _CRYPTO_CTX - - -class BaseSecurityTC(CubicWebTC): - - def setup_database(self): - super(BaseSecurityTC, self).setup_database() - with self.admin_access.client_cnx() as cnx: - self.create_user(cnx, u'iaminusersgrouponly') - hash = _CRYPTO_CTX.encrypt('oldpassword', scheme='des_crypt') - self.create_user(cnx, u'oldpassword', password=Binary(hash.encode('ascii'))) - - -class LowLevelSecurityFunctionTC(BaseSecurityTC): - - def test_check_relation_read_access(self): - rql = u'Personne U WHERE U nom "managers"' - rqlst = self.repo.vreg.rqlhelper.parse(rql).children[0] - nom = self.repo.schema['Personne'].rdef('nom') - with self.temporary_permissions((nom, {'read': ('users', 'managers')})): - with self.admin_access.repo_cnx() as cnx: - self.repo.vreg.solutions(cnx, rqlst, None) - check_relations_read_access(cnx, rqlst, {}) - with self.new_access(u'anon').repo_cnx() as cnx: - self.assertRaises(Unauthorized, - check_relations_read_access, - cnx, rqlst, {}) - self.assertRaises(Unauthorized, cnx.execute, rql) - - def test_get_local_checks(self): - rql = u'Personne U WHERE U nom "managers"' - rqlst = self.repo.vreg.rqlhelper.parse(rql).children[0] - with self.temporary_permissions(Personne={'read': ('users', 'managers')}): - with self.admin_access.repo_cnx() as cnx: - self.repo.vreg.solutions(cnx, rqlst, None) - solution = rqlst.solutions[0] - localchecks = get_local_checks(cnx, rqlst, solution) - self.assertEqual({}, localchecks) - with self.new_access(u'anon').repo_cnx() as cnx: - self.assertRaises(Unauthorized, - get_local_checks, - cnx, rqlst, solution) - self.assertRaises(Unauthorized, cnx.execute, rql) - - def test_upassword_not_selectable(self): - with self.admin_access.repo_cnx() as cnx: - self.assertRaises(Unauthorized, - cnx.execute, 'Any X,P WHERE X is CWUser, X upassword P') - with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx: - self.assertRaises(Unauthorized, - cnx.execute, 'Any X,P WHERE X is CWUser, X upassword P') - - def test_update_password(self): - """Ensure that if a user's password is stored with a deprecated hash, - it will be updated on next login - """ - with self.repo.internal_cnx() as cnx: - oldhash = cnx.system_sql("SELECT cw_upassword FROM cw_CWUser " - "WHERE cw_login = 'oldpassword'").fetchone()[0] - oldhash = self.repo.system_source.binary_to_str(oldhash) - self.repo.authenticate_user(cnx, 'oldpassword', password='oldpassword') - newhash = cnx.system_sql("SELECT cw_upassword FROM cw_CWUser " - "WHERE cw_login = 'oldpassword'").fetchone()[0] - newhash = self.repo.system_source.binary_to_str(newhash) - self.assertNotEqual(oldhash, newhash) - self.assertTrue(newhash.startswith(b'$6$')) - self.repo.authenticate_user(cnx, 'oldpassword', password='oldpassword') - newnewhash = cnx.system_sql("SELECT cw_upassword FROM cw_CWUser WHERE " - "cw_login = 'oldpassword'").fetchone()[0] - newnewhash = self.repo.system_source.binary_to_str(newnewhash) - self.assertEqual(newhash, newnewhash) - - -class SecurityRewritingTC(BaseSecurityTC): - def hijack_source_execute(self): - def syntax_tree_search(*args, **kwargs): - self.query = (args, kwargs) - return [] - self.repo.system_source.syntax_tree_search = syntax_tree_search - - def tearDown(self): - self.repo.system_source.__dict__.pop('syntax_tree_search', None) - super(SecurityRewritingTC, self).tearDown() - - def test_not_relation_read_security(self): - with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx: - cnx.user.groups # fill the cache before screwing syntax_tree_search - self.hijack_source_execute() - cnx.execute('Any U WHERE NOT A todo_by U, A is Affaire') - self.assertEqual(self.query[0][1].as_string(), - 'Any U WHERE NOT EXISTS(A todo_by U), A is Affaire') - cnx.execute('Any U WHERE NOT EXISTS(A todo_by U), A is Affaire') - self.assertEqual(self.query[0][1].as_string(), - 'Any U WHERE NOT EXISTS(A todo_by U), A is Affaire') - - -class SecurityTC(BaseSecurityTC): - - def setUp(self): - super(SecurityTC, self).setUp() - # implicitly test manager can add some entities - with self.admin_access.repo_cnx() as cnx: - cnx.execute("INSERT Affaire X: X sujet 'cool'") - cnx.execute("INSERT Societe X: X nom 'logilab'") - cnx.execute("INSERT Personne X: X nom 'bidule'") - cnx.execute('INSERT CWGroup X: X name "staff"') - cnx.commit() - - def test_insert_security(self): - with self.new_access(u'anon').repo_cnx() as cnx: - cnx.execute("INSERT Personne X: X nom 'bidule'") - self.assertRaises(Unauthorized, cnx.commit) - self.assertEqual(cnx.execute('Personne X').rowcount, 1) - - def test_insert_security_2(self): - with self.new_access(u'anon').repo_cnx() as cnx: - cnx.execute("INSERT Affaire X") - self.assertRaises(Unauthorized, cnx.commit) - # anon has no read permission on Affaire entities, so - # rowcount == 0 - self.assertEqual(cnx.execute('Affaire X').rowcount, 0) - - def test_insert_rql_permission(self): - # test user can only add une affaire related to a societe he owns - with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx: - cnx.execute("INSERT Affaire X: X sujet 'cool'") - self.assertRaises(Unauthorized, cnx.commit) - # test nothing has actually been inserted - with self.admin_access.repo_cnx() as cnx: - self.assertEqual(cnx.execute('Affaire X').rowcount, 1) - with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx: - cnx.execute("INSERT Affaire X: X sujet 'cool'") - cnx.execute("INSERT Societe X: X nom 'chouette'") - cnx.execute("SET A concerne S WHERE A sujet 'cool', S nom 'chouette'") - cnx.commit() - - def test_update_security_1(self): - with self.new_access(u'anon').repo_cnx() as cnx: - # local security check - cnx.execute( "SET X nom 'bidulechouette' WHERE X is Personne") - self.assertRaises(Unauthorized, cnx.commit) - with self.admin_access.repo_cnx() as cnx: - self.assertEqual(cnx.execute('Personne X WHERE X nom "bidulechouette"').rowcount, 0) - - def test_update_security_2(self): - with self.temporary_permissions(Personne={'read': ('users', 'managers'), - 'add': ('guests', 'users', 'managers')}): - with self.new_access(u'anon').repo_cnx() as cnx: - self.assertRaises(Unauthorized, cnx.execute, - "SET X nom 'bidulechouette' WHERE X is Personne") - # test nothing has actually been inserted - with self.admin_access.repo_cnx() as cnx: - self.assertEqual(cnx.execute('Personne X WHERE X nom "bidulechouette"').rowcount, 0) - - def test_update_security_3(self): - with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx: - cnx.execute("INSERT Personne X: X nom 'biduuule'") - cnx.execute("INSERT Societe X: X nom 'looogilab'") - cnx.execute("SET X travaille S WHERE X nom 'biduuule', S nom 'looogilab'") - - def test_insert_immutable_attribute_update(self): - with self.admin_access.repo_cnx() as cnx: - cnx.create_entity('Old', name=u'Babar') - cnx.commit() - # this should be equivalent - o = cnx.create_entity('Old') - o.cw_set(name=u'Celeste') - cnx.commit() - - def test_update_rql_permission(self): - with self.admin_access.repo_cnx() as cnx: - cnx.execute("SET A concerne S WHERE A is Affaire, S is Societe") - cnx.commit() - # test user can only update une affaire related to a societe he owns - with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx: - cnx.execute("SET X sujet 'pascool' WHERE X is Affaire") - # this won't actually do anything since the selection query won't return anything - cnx.commit() - # to actually get Unauthorized exception, try to update an entity we can read - cnx.execute("SET X nom 'toto' WHERE X is Societe") - self.assertRaises(Unauthorized, cnx.commit) - cnx.execute("INSERT Affaire X: X sujet 'pascool'") - cnx.execute("INSERT Societe X: X nom 'chouette'") - cnx.execute("SET A concerne S WHERE A sujet 'pascool', S nom 'chouette'") - cnx.execute("SET X sujet 'habahsicestcool' WHERE X sujet 'pascool'") - cnx.commit() - - def test_delete_security(self): - # FIXME: sample below fails because we don't detect "owner" can't delete - # user anyway, and since no user with login == 'bidule' exists, no - # exception is raised - #user._groups = {'guests':1} - #self.assertRaises(Unauthorized, - # self.o.execute, user, "DELETE CWUser X WHERE X login 'bidule'") - # check local security - with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx: - self.assertRaises(Unauthorized, cnx.execute, "DELETE CWGroup Y WHERE Y name 'staff'") - - def test_delete_rql_permission(self): - with self.admin_access.repo_cnx() as cnx: - cnx.execute("SET A concerne S WHERE A is Affaire, S is Societe") - cnx.commit() - # test user can only dele une affaire related to a societe he owns - with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx: - # this won't actually do anything since the selection query won't return anything - cnx.execute("DELETE Affaire X") - cnx.commit() - # to actually get Unauthorized exception, try to delete an entity we can read - self.assertRaises(Unauthorized, cnx.execute, "DELETE Societe S") - self.assertRaises(QueryError, cnx.commit) # can't commit anymore - cnx.rollback() - cnx.execute("INSERT Affaire X: X sujet 'pascool'") - cnx.execute("INSERT Societe X: X nom 'chouette'") - cnx.execute("SET A concerne S WHERE A sujet 'pascool', S nom 'chouette'") - cnx.commit() -## # this one should fail since it will try to delete two affaires, one authorized -## # and the other not -## self.assertRaises(Unauthorized, cnx.execute, "DELETE Affaire X") - cnx.execute("DELETE Affaire X WHERE X sujet 'pascool'") - cnx.commit() - - def test_insert_relation_rql_permission(self): - with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx: - cnx.execute("SET A concerne S WHERE A is Affaire, S is Societe") - # should raise Unauthorized since user don't own S though this won't - # actually do anything since the selection query won't return - # anything - cnx.commit() - # to actually get Unauthorized exception, try to insert a relation - # were we can read both entities - rset = cnx.execute('Personne P') - self.assertEqual(len(rset), 1) - ent = rset.get_entity(0, 0) - self.assertFalse(cnx.execute('Any P,S WHERE P travaille S,P is Personne, S is Societe')) - self.assertRaises(Unauthorized, ent.cw_check_perm, 'update') - self.assertRaises(Unauthorized, - cnx.execute, "SET P travaille S WHERE P is Personne, S is Societe") - self.assertRaises(QueryError, cnx.commit) # can't commit anymore - cnx.rollback() - # test nothing has actually been inserted: - self.assertFalse(cnx.execute('Any P,S WHERE P travaille S,P is Personne, S is Societe')) - cnx.execute("INSERT Societe X: X nom 'chouette'") - cnx.execute("SET A concerne S WHERE A is Affaire, S nom 'chouette'") - cnx.commit() - - def test_delete_relation_rql_permission(self): - with self.admin_access.repo_cnx() as cnx: - cnx.execute("SET A concerne S WHERE A is Affaire, S is Societe") - cnx.commit() - with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx: - # this won't actually do anything since the selection query won't return anything - cnx.execute("DELETE A concerne S") - cnx.commit() - with self.admin_access.repo_cnx() as cnx: - # to actually get Unauthorized exception, try to delete a relation we can read - eid = cnx.execute("INSERT Affaire X: X sujet 'pascool'")[0][0] - cnx.execute('SET X owned_by U WHERE X eid %(x)s, U login "iaminusersgrouponly"', - {'x': eid}) - cnx.execute("SET A concerne S WHERE A sujet 'pascool', S is Societe") - cnx.commit() - with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx: - self.assertRaises(Unauthorized, cnx.execute, "DELETE A concerne S") - self.assertRaises(QueryError, cnx.commit) # can't commit anymore - cnx.rollback() - cnx.execute("INSERT Societe X: X nom 'chouette'") - cnx.execute("SET A concerne S WHERE A is Affaire, S nom 'chouette'") - cnx.commit() - cnx.execute("DELETE A concerne S WHERE S nom 'chouette'") - cnx.commit() - - - def test_user_can_change_its_upassword(self): - with self.admin_access.repo_cnx() as cnx: - ueid = self.create_user(cnx, u'user').eid - with self.new_access(u'user').repo_cnx() as cnx: - cnx.execute('SET X upassword %(passwd)s WHERE X eid %(x)s', - {'x': ueid, 'passwd': b'newpwd'}) - cnx.commit() - with self.repo.internal_cnx() as cnx: - self.repo.authenticate_user(cnx, 'user', password='newpwd') - - def test_user_cant_change_other_upassword(self): - with self.admin_access.repo_cnx() as cnx: - ueid = self.create_user(cnx, u'otheruser').eid - with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx: - cnx.execute('SET X upassword %(passwd)s WHERE X eid %(x)s', - {'x': ueid, 'passwd': b'newpwd'}) - self.assertRaises(Unauthorized, cnx.commit) - - # read security test - - def test_read_base(self): - with self.temporary_permissions(Personne={'read': ('users', 'managers')}): - with self.new_access(u'anon').repo_cnx() as cnx: - self.assertRaises(Unauthorized, - cnx.execute, 'Personne U where U nom "managers"') - - def test_read_erqlexpr_base(self): - with self.admin_access.repo_cnx() as cnx: - eid = cnx.execute("INSERT Affaire X: X sujet 'cool'")[0][0] - cnx.commit() - with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx: - rset = cnx.execute('Affaire X') - self.assertEqual(rset.rows, []) - self.assertRaises(Unauthorized, cnx.execute, 'Any X WHERE X eid %(x)s', {'x': eid}) - # cache test - self.assertRaises(Unauthorized, cnx.execute, 'Any X WHERE X eid %(x)s', {'x': eid}) - aff2 = cnx.execute("INSERT Affaire X: X sujet 'cool'")[0][0] - soc1 = cnx.execute("INSERT Societe X: X nom 'chouette'")[0][0] - cnx.execute("SET A concerne S WHERE A is Affaire, S is Societe") - cnx.commit() - rset = cnx.execute('Any X WHERE X eid %(x)s', {'x': aff2}) - self.assertEqual(rset.rows, [[aff2]]) - # more cache test w/ NOT eid - rset = cnx.execute('Affaire X WHERE NOT X eid %(x)s', {'x': eid}) - self.assertEqual(rset.rows, [[aff2]]) - rset = cnx.execute('Affaire X WHERE NOT X eid %(x)s', {'x': aff2}) - self.assertEqual(rset.rows, []) - # test can't update an attribute of an entity that can't be readen - self.assertRaises(Unauthorized, cnx.execute, - 'SET X sujet "hacked" WHERE X eid %(x)s', {'x': eid}) - - - def test_entity_created_in_transaction(self): - affschema = self.schema['Affaire'] - with self.temporary_permissions(Affaire={'read': affschema.permissions['add']}): - with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx: - aff2 = cnx.execute("INSERT Affaire X: X sujet 'cool'")[0][0] - # entity created in transaction are readable *by eid* - self.assertTrue(cnx.execute('Any X WHERE X eid %(x)s', {'x':aff2})) - # XXX would be nice if it worked - rset = cnx.execute("Affaire X WHERE X sujet 'cool'") - self.assertEqual(len(rset), 0) - self.assertRaises(Unauthorized, cnx.commit) - - def test_read_erqlexpr_has_text1(self): - with self.admin_access.repo_cnx() as cnx: - aff1 = cnx.execute("INSERT Affaire X: X sujet 'cool'")[0][0] - card1 = cnx.execute("INSERT Card X: X title 'cool'")[0][0] - cnx.execute('SET X owned_by U WHERE X eid %(x)s, U login "iaminusersgrouponly"', - {'x': card1}) - cnx.commit() - with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx: - aff2 = cnx.execute("INSERT Affaire X: X sujet 'cool'")[0][0] - soc1 = cnx.execute("INSERT Societe X: X nom 'chouette'")[0][0] - cnx.execute("SET A concerne S WHERE A eid %(a)s, S eid %(s)s", {'a': aff2, 's': soc1}) - cnx.commit() - self.assertRaises(Unauthorized, cnx.execute, 'Any X WHERE X eid %(x)s', {'x':aff1}) - self.assertTrue(cnx.execute('Any X WHERE X eid %(x)s', {'x':aff2})) - self.assertTrue(cnx.execute('Any X WHERE X eid %(x)s', {'x':card1})) - rset = cnx.execute("Any X WHERE X has_text 'cool'") - self.assertEqual(sorted(eid for eid, in rset.rows), - [card1, aff2]) - - def test_read_erqlexpr_has_text2(self): - with self.admin_access.repo_cnx() as cnx: - cnx.execute("INSERT Personne X: X nom 'bidule'") - cnx.execute("INSERT Societe X: X nom 'bidule'") - cnx.commit() - with self.temporary_permissions(Personne={'read': ('managers',)}): - with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx: - rset = cnx.execute('Any N WHERE N has_text "bidule"') - self.assertEqual(len(rset.rows), 1, rset.rows) - rset = cnx.execute('Any N WITH N BEING (Any N WHERE N has_text "bidule")') - self.assertEqual(len(rset.rows), 1, rset.rows) - - def test_read_erqlexpr_optional_rel(self): - with self.admin_access.repo_cnx() as cnx: - cnx.execute("INSERT Personne X: X nom 'bidule'") - cnx.execute("INSERT Societe X: X nom 'bidule'") - cnx.commit() - with self.temporary_permissions(Personne={'read': ('managers',)}): - with self.new_access(u'anon').repo_cnx() as cnx: - rset = cnx.execute('Any N,U WHERE N has_text "bidule", N owned_by U?') - self.assertEqual(len(rset.rows), 1, rset.rows) - - def test_read_erqlexpr_aggregat(self): - with self.admin_access.repo_cnx() as cnx: - cnx.execute("INSERT Affaire X: X sujet 'cool'")[0][0] - cnx.commit() - with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx: - rset = cnx.execute('Any COUNT(X) WHERE X is Affaire') - self.assertEqual(rset.rows, [[0]]) - aff2 = cnx.execute("INSERT Affaire X: X sujet 'cool'")[0][0] - soc1 = cnx.execute("INSERT Societe X: X nom 'chouette'")[0][0] - cnx.execute("SET A concerne S WHERE A is Affaire, S is Societe") - cnx.commit() - rset = cnx.execute('Any COUNT(X) WHERE X is Affaire') - self.assertEqual(rset.rows, [[1]]) - rset = cnx.execute('Any ETN, COUNT(X) GROUPBY ETN WHERE X is ET, ET name ETN') - values = dict(rset) - self.assertEqual(values['Affaire'], 1) - self.assertEqual(values['Societe'], 2) - rset = cnx.execute('Any ETN, COUNT(X) GROUPBY ETN WHERE X is ET, ET name ETN ' - 'WITH X BEING ((Affaire X) UNION (Societe X))') - self.assertEqual(len(rset), 2) - values = dict(rset) - self.assertEqual(values['Affaire'], 1) - self.assertEqual(values['Societe'], 2) - - - def test_attribute_security(self): - with self.admin_access.repo_cnx() as cnx: - # only managers should be able to edit the 'test' attribute of Personne entities - eid = cnx.execute("INSERT Personne X: X nom 'bidule', " - "X web 'http://www.debian.org', X test TRUE")[0][0] - cnx.execute('SET X test FALSE WHERE X eid %(x)s', {'x': eid}) - cnx.commit() - with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx: - cnx.execute("INSERT Personne X: X nom 'bidule', " - "X web 'http://www.debian.org', X test TRUE") - self.assertRaises(Unauthorized, cnx.commit) - cnx.execute("INSERT Personne X: X nom 'bidule', " - "X web 'http://www.debian.org', X test FALSE") - self.assertRaises(Unauthorized, cnx.commit) - eid = cnx.execute("INSERT Personne X: X nom 'bidule', " - "X web 'http://www.debian.org'")[0][0] - cnx.commit() - cnx.execute('SET X test FALSE WHERE X eid %(x)s', {'x': eid}) - self.assertRaises(Unauthorized, cnx.commit) - cnx.execute('SET X test TRUE WHERE X eid %(x)s', {'x': eid}) - self.assertRaises(Unauthorized, cnx.commit) - cnx.execute('SET X web "http://www.logilab.org" WHERE X eid %(x)s', {'x': eid}) - cnx.commit() - with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx: - cnx.execute('INSERT Frozable F: F name "Foo"') - cnx.commit() - cnx.execute('SET F name "Bar" WHERE F is Frozable') - cnx.commit() - cnx.execute('SET F name "BaBar" WHERE F is Frozable') - cnx.execute('SET F frozen True WHERE F is Frozable') - with self.assertRaises(Unauthorized): - cnx.commit() - cnx.rollback() - cnx.execute('SET F frozen True WHERE F is Frozable') - cnx.commit() - cnx.execute('SET F name "Bar" WHERE F is Frozable') - with self.assertRaises(Unauthorized): - cnx.commit() - - def test_attribute_security_rqlexpr(self): - with self.admin_access.repo_cnx() as cnx: - # Note.para attribute editable by managers or if the note is in "todo" state - note = cnx.execute("INSERT Note X: X para 'bidule'").get_entity(0, 0) - cnx.commit() - note.cw_adapt_to('IWorkflowable').fire_transition('markasdone') - cnx.execute('SET X para "truc" WHERE X eid %(x)s', {'x': note.eid}) - cnx.commit() - with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx: - cnx.execute("SET X para 'chouette' WHERE X eid %(x)s", {'x': note.eid}) - self.assertRaises(Unauthorized, cnx.commit) - note2 = cnx.execute("INSERT Note X: X para 'bidule'").get_entity(0, 0) - cnx.commit() - note2.cw_adapt_to('IWorkflowable').fire_transition('markasdone') - cnx.commit() - self.assertEqual(len(cnx.execute('Any X WHERE X in_state S, S name "todo", X eid %(x)s', - {'x': note2.eid})), - 0) - cnx.execute("SET X para 'chouette' WHERE X eid %(x)s", {'x': note2.eid}) - self.assertRaises(Unauthorized, cnx.commit) - note2.cw_adapt_to('IWorkflowable').fire_transition('redoit') - cnx.commit() - cnx.execute("SET X para 'chouette' WHERE X eid %(x)s", {'x': note2.eid}) - cnx.commit() - cnx.execute("INSERT Note X: X something 'A'") - self.assertRaises(Unauthorized, cnx.commit) - cnx.execute("INSERT Note X: X para 'zogzog', X something 'A'") - cnx.commit() - note = cnx.execute("INSERT Note X").get_entity(0,0) - cnx.commit() - note.cw_set(something=u'B') - cnx.commit() - note.cw_set(something=None, para=u'zogzog') - cnx.commit() - - def test_attribute_read_security(self): - # anon not allowed to see users'login, but they can see users - login_rdef = self.repo.schema['CWUser'].rdef('login') - with self.temporary_permissions((login_rdef, {'read': ('users', 'managers')}), - CWUser={'read': ('guests', 'users', 'managers')}): - with self.new_access(u'anon').repo_cnx() as cnx: - rset = cnx.execute('CWUser X') - self.assertTrue(rset) - x = rset.get_entity(0, 0) - x.complete() - self.assertEqual(x.login, None) - self.assertTrue(x.creation_date) - x = rset.get_entity(1, 0) - x.complete() - self.assertEqual(x.login, None) - self.assertTrue(x.creation_date) - - def test_yams_inheritance_and_security_bug(self): - with self.temporary_permissions(Division={'read': ('managers', - ERQLExpression('X owned_by U'))}): - with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx: - rqlst = self.repo.vreg.rqlhelper.parse('Any X WHERE X is_instance_of Societe') - self.repo.vreg.solutions(cnx, rqlst, {}) - self.repo.vreg.rqlhelper.annotate(rqlst) - plan = cnx.repo.querier.plan_factory(rqlst, {}, cnx) - plan.preprocess(rqlst) - self.assertEqual( - rqlst.as_string(), - '(Any X WHERE X is IN(Societe, SubDivision)) UNION ' - '(Any X WHERE X is Division, EXISTS(X owned_by %(B)s))') - - -class BaseSchemaSecurityTC(BaseSecurityTC): - """tests related to the base schema permission configuration""" - - def test_user_can_delete_object_he_created(self): - # even if some other user have changed object'state - with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx: - # due to security test, affaire has to concerne a societe the user owns - cnx.execute('INSERT Societe X: X nom "ARCTIA"') - cnx.execute('INSERT Affaire X: X ref "ARCT01", X concerne S WHERE S nom "ARCTIA"') - cnx.commit() - with self.admin_access.repo_cnx() as cnx: - affaire = cnx.execute('Any X WHERE X ref "ARCT01"').get_entity(0, 0) - affaire.cw_adapt_to('IWorkflowable').fire_transition('abort') - cnx.commit() - self.assertEqual(len(cnx.execute('TrInfo X WHERE X wf_info_for A, A ref "ARCT01"')), - 1) - self.assertEqual(len(cnx.execute('TrInfo X WHERE X wf_info_for A, A ref "ARCT01",' - 'X owned_by U, U login "admin"')), - 1) # TrInfo at the above state change - with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx: - cnx.execute('DELETE Affaire X WHERE X ref "ARCT01"') - cnx.commit() - self.assertFalse(cnx.execute('Affaire X')) - - def test_users_and_groups_non_readable_by_guests(self): - with self.repo.internal_cnx() as cnx: - admineid = cnx.execute('CWUser U WHERE U login "admin"').rows[0][0] - with self.new_access(u'anon').repo_cnx() as cnx: - anon = cnx.user - # anonymous user can only read itself - rset = cnx.execute('Any L WHERE X owned_by U, U login L') - self.assertEqual([['anon']], rset.rows) - rset = cnx.execute('CWUser X') - self.assertEqual([[anon.eid]], rset.rows) - # anonymous user can read groups (necessary to check allowed transitions for instance) - self.assertTrue(cnx.execute('CWGroup X')) - # should only be able to read the anonymous user, not another one - self.assertRaises(Unauthorized, - cnx.execute, 'CWUser X WHERE X eid %(x)s', {'x': admineid}) - rset = cnx.execute('CWUser X WHERE X eid %(x)s', {'x': anon.eid}) - self.assertEqual([[anon.eid]], rset.rows) - # but can't modify it - cnx.execute('SET X login "toto" WHERE X eid %(x)s', {'x': anon.eid}) - self.assertRaises(Unauthorized, cnx.commit) - - def test_in_group_relation(self): - with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx: - rql = u"DELETE U in_group G WHERE U login 'admin'" - self.assertRaises(Unauthorized, cnx.execute, rql) - rql = u"SET U in_group G WHERE U login 'admin', G name 'users'" - self.assertRaises(Unauthorized, cnx.execute, rql) - - def test_owned_by(self): - with self.admin_access.repo_cnx() as cnx: - cnx.execute("INSERT Personne X: X nom 'bidule'") - cnx.commit() - with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx: - rql = u"SET X owned_by U WHERE U login 'iaminusersgrouponly', X is Personne" - self.assertRaises(Unauthorized, cnx.execute, rql) - - def test_bookmarked_by_guests_security(self): - with self.admin_access.repo_cnx() as cnx: - beid1 = cnx.execute('INSERT Bookmark B: B path "?vid=manage", B title "manage"')[0][0] - beid2 = cnx.execute('INSERT Bookmark B: B path "?vid=index", B title "index", ' - 'B bookmarked_by U WHERE U login "anon"')[0][0] - cnx.commit() - with self.new_access(u'anon').repo_cnx() as cnx: - anoneid = cnx.user.eid - self.assertEqual(cnx.execute('Any T,P ORDERBY lower(T) WHERE B is Bookmark,B title T,B path P,' - 'B bookmarked_by U, U eid %s' % anoneid).rows, - [['index', '?vid=index']]) - self.assertEqual(cnx.execute('Any T,P ORDERBY lower(T) WHERE B is Bookmark,B title T,B path P,' - 'B bookmarked_by U, U eid %(x)s', {'x': anoneid}).rows, - [['index', '?vid=index']]) - # can read others bookmarks as well - self.assertEqual(cnx.execute('Any B where B is Bookmark, NOT B bookmarked_by U').rows, - [[beid1]]) - self.assertRaises(Unauthorized, cnx.execute,'DELETE B bookmarked_by U') - self.assertRaises(Unauthorized, - cnx.execute, 'SET B bookmarked_by U WHERE U eid %(x)s, B eid %(b)s', - {'x': anoneid, 'b': beid1}) - - def test_ambigous_ordered(self): - with self.new_access(u'anon').repo_cnx() as cnx: - names = [t for t, in cnx.execute('Any N ORDERBY lower(N) WHERE X name N')] - self.assertEqual(names, sorted(names, key=lambda x: x.lower())) - - def test_in_state_without_update_perm(self): - """check a user change in_state without having update permission on the - subject - """ - with self.admin_access.repo_cnx() as cnx: - eid = cnx.execute('INSERT Affaire X: X ref "ARCT01"')[0][0] - cnx.commit() - with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx: - # needed to remove rql expr granting update perm to the user - affschema = self.schema['Affaire'] - with self.temporary_permissions(Affaire={'update': affschema.get_groups('update'), - 'read': ('users',)}): - self.assertRaises(Unauthorized, - affschema.check_perm, cnx, 'update', eid=eid) - aff = cnx.execute('Any X WHERE X ref "ARCT01"').get_entity(0, 0) - aff.cw_adapt_to('IWorkflowable').fire_transition('abort') - cnx.commit() - # though changing a user state (even logged user) is reserved to managers - user = cnx.user - # XXX wether it should raise Unauthorized or ValidationError is not clear - # the best would probably ValidationError if the transition doesn't exist - # from the current state but Unauthorized if it exists but user can't pass it - self.assertRaises(ValidationError, - user.cw_adapt_to('IWorkflowable').fire_transition, 'deactivate') - - def test_trinfo_security(self): - with self.admin_access.repo_cnx() as cnx: - aff = cnx.execute('INSERT Affaire X: X ref "ARCT01"').get_entity(0, 0) - iworkflowable = aff.cw_adapt_to('IWorkflowable') - cnx.commit() - iworkflowable.fire_transition('abort') - cnx.commit() - # can change tr info comment - cnx.execute('SET TI comment %(c)s WHERE TI wf_info_for X, X ref "ARCT01"', - {'c': u'bouh!'}) - cnx.commit() - aff.cw_clear_relation_cache('wf_info_for', 'object') - trinfo = iworkflowable.latest_trinfo() - self.assertEqual(trinfo.comment, 'bouh!') - # but not from_state/to_state - aff.cw_clear_relation_cache('wf_info_for', role='object') - self.assertRaises(Unauthorized, cnx.execute, - 'SET TI from_state S WHERE TI eid %(ti)s, S name "ben non"', - {'ti': trinfo.eid}) - self.assertRaises(Unauthorized, cnx.execute, - 'SET TI to_state S WHERE TI eid %(ti)s, S name "pitetre"', - {'ti': trinfo.eid}) - - def test_emailaddress_security(self): - # check for prexisting email adresse - with self.admin_access.repo_cnx() as cnx: - if cnx.execute('Any X WHERE X is EmailAddress'): - rset = cnx.execute('Any X, U WHERE X is EmailAddress, U use_email X') - msg = ['Preexisting email readable by anon found!'] - tmpl = ' - "%s" used by user "%s"' - for i in range(len(rset)): - email, user = rset.get_entity(i, 0), rset.get_entity(i, 1) - msg.append(tmpl % (email.dc_title(), user.dc_title())) - raise RuntimeError('\n'.join(msg)) - # actual test - cnx.execute('INSERT EmailAddress X: X address "hop"').get_entity(0, 0) - cnx.execute('INSERT EmailAddress X: X address "anon", ' - 'U use_email X WHERE U login "anon"').get_entity(0, 0) - cnx.commit() - self.assertEqual(len(cnx.execute('Any X WHERE X is EmailAddress')), 2) - with self.new_access(u'anon').repo_cnx() as cnx: - self.assertEqual(len(cnx.execute('Any X WHERE X is EmailAddress')), 1) - -if __name__ == '__main__': - unittest_main() diff -r 32ee89340e59 -r 3648a2c293f6 cubicweb/server/test/unittest_server_security.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/cubicweb/server/test/unittest_server_security.py Thu Mar 28 11:39:36 2019 +0100 @@ -0,0 +1,689 @@ +# copyright 2003-2016 LOGILAB S.A. (Paris, FRANCE), all rights reserved. +# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr +# +# This file is part of CubicWeb. +# +# CubicWeb is free software: you can redistribute it and/or modify it under the +# terms of the GNU Lesser General Public License as published by the Free +# Software Foundation, either version 2.1 of the License, or (at your option) +# any later version. +# +# CubicWeb is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more +# details. +# +# You should have received a copy of the GNU Lesser General Public License along +# with CubicWeb. If not, see . +"""functional tests for server'security""" + +from logilab.common.testlib import unittest_main + +from cubicweb.devtools.testlib import CubicWebTC +from cubicweb import Unauthorized, ValidationError, QueryError, Binary +from cubicweb.schema import ERQLExpression +from cubicweb.server.querier import get_local_checks, check_relations_read_access +from cubicweb.server.utils import _CRYPTO_CTX + + +class BaseSecurityTC(CubicWebTC): + + def setup_database(self): + super(BaseSecurityTC, self).setup_database() + with self.admin_access.client_cnx() as cnx: + self.create_user(cnx, u'iaminusersgrouponly') + hash = _CRYPTO_CTX.encrypt('oldpassword', scheme='des_crypt') + self.create_user(cnx, u'oldpassword', password=Binary(hash.encode('ascii'))) + + +class LowLevelSecurityFunctionTC(BaseSecurityTC): + + def test_check_relation_read_access(self): + rql = u'Personne U WHERE U nom "managers"' + rqlst = self.repo.vreg.rqlhelper.parse(rql).children[0] + nom = self.repo.schema['Personne'].rdef('nom') + with self.temporary_permissions((nom, {'read': ('users', 'managers')})): + with self.admin_access.repo_cnx() as cnx: + self.repo.vreg.solutions(cnx, rqlst, None) + check_relations_read_access(cnx, rqlst, {}) + with self.new_access(u'anon').repo_cnx() as cnx: + self.assertRaises(Unauthorized, + check_relations_read_access, + cnx, rqlst, {}) + self.assertRaises(Unauthorized, cnx.execute, rql) + + def test_get_local_checks(self): + rql = u'Personne U WHERE U nom "managers"' + rqlst = self.repo.vreg.rqlhelper.parse(rql).children[0] + with self.temporary_permissions(Personne={'read': ('users', 'managers')}): + with self.admin_access.repo_cnx() as cnx: + self.repo.vreg.solutions(cnx, rqlst, None) + solution = rqlst.solutions[0] + localchecks = get_local_checks(cnx, rqlst, solution) + self.assertEqual({}, localchecks) + with self.new_access(u'anon').repo_cnx() as cnx: + self.assertRaises(Unauthorized, + get_local_checks, + cnx, rqlst, solution) + self.assertRaises(Unauthorized, cnx.execute, rql) + + def test_upassword_not_selectable(self): + with self.admin_access.repo_cnx() as cnx: + self.assertRaises(Unauthorized, + cnx.execute, 'Any X,P WHERE X is CWUser, X upassword P') + with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx: + self.assertRaises(Unauthorized, + cnx.execute, 'Any X,P WHERE X is CWUser, X upassword P') + + def test_update_password(self): + """Ensure that if a user's password is stored with a deprecated hash, + it will be updated on next login + """ + with self.repo.internal_cnx() as cnx: + oldhash = cnx.system_sql("SELECT cw_upassword FROM cw_CWUser " + "WHERE cw_login = 'oldpassword'").fetchone()[0] + oldhash = self.repo.system_source.binary_to_str(oldhash) + self.repo.authenticate_user(cnx, 'oldpassword', password='oldpassword') + newhash = cnx.system_sql("SELECT cw_upassword FROM cw_CWUser " + "WHERE cw_login = 'oldpassword'").fetchone()[0] + newhash = self.repo.system_source.binary_to_str(newhash) + self.assertNotEqual(oldhash, newhash) + self.assertTrue(newhash.startswith(b'$6$')) + self.repo.authenticate_user(cnx, 'oldpassword', password='oldpassword') + newnewhash = cnx.system_sql("SELECT cw_upassword FROM cw_CWUser WHERE " + "cw_login = 'oldpassword'").fetchone()[0] + newnewhash = self.repo.system_source.binary_to_str(newnewhash) + self.assertEqual(newhash, newnewhash) + + +class SecurityRewritingTC(BaseSecurityTC): + def hijack_source_execute(self): + def syntax_tree_search(*args, **kwargs): + self.query = (args, kwargs) + return [] + self.repo.system_source.syntax_tree_search = syntax_tree_search + + def tearDown(self): + self.repo.system_source.__dict__.pop('syntax_tree_search', None) + super(SecurityRewritingTC, self).tearDown() + + def test_not_relation_read_security(self): + with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx: + cnx.user.groups # fill the cache before screwing syntax_tree_search + self.hijack_source_execute() + cnx.execute('Any U WHERE NOT A todo_by U, A is Affaire') + self.assertEqual(self.query[0][1].as_string(), + 'Any U WHERE NOT EXISTS(A todo_by U), A is Affaire') + cnx.execute('Any U WHERE NOT EXISTS(A todo_by U), A is Affaire') + self.assertEqual(self.query[0][1].as_string(), + 'Any U WHERE NOT EXISTS(A todo_by U), A is Affaire') + + +class SecurityTC(BaseSecurityTC): + + def setUp(self): + super(SecurityTC, self).setUp() + # implicitly test manager can add some entities + with self.admin_access.repo_cnx() as cnx: + cnx.execute("INSERT Affaire X: X sujet 'cool'") + cnx.execute("INSERT Societe X: X nom 'logilab'") + cnx.execute("INSERT Personne X: X nom 'bidule'") + cnx.execute('INSERT CWGroup X: X name "staff"') + cnx.commit() + + def test_insert_security(self): + with self.new_access(u'anon').repo_cnx() as cnx: + cnx.execute("INSERT Personne X: X nom 'bidule'") + self.assertRaises(Unauthorized, cnx.commit) + self.assertEqual(cnx.execute('Personne X').rowcount, 1) + + def test_insert_security_2(self): + with self.new_access(u'anon').repo_cnx() as cnx: + cnx.execute("INSERT Affaire X") + self.assertRaises(Unauthorized, cnx.commit) + # anon has no read permission on Affaire entities, so + # rowcount == 0 + self.assertEqual(cnx.execute('Affaire X').rowcount, 0) + + def test_insert_rql_permission(self): + # test user can only add une affaire related to a societe he owns + with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx: + cnx.execute("INSERT Affaire X: X sujet 'cool'") + self.assertRaises(Unauthorized, cnx.commit) + # test nothing has actually been inserted + with self.admin_access.repo_cnx() as cnx: + self.assertEqual(cnx.execute('Affaire X').rowcount, 1) + with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx: + cnx.execute("INSERT Affaire X: X sujet 'cool'") + cnx.execute("INSERT Societe X: X nom 'chouette'") + cnx.execute("SET A concerne S WHERE A sujet 'cool', S nom 'chouette'") + cnx.commit() + + def test_update_security_1(self): + with self.new_access(u'anon').repo_cnx() as cnx: + # local security check + cnx.execute( "SET X nom 'bidulechouette' WHERE X is Personne") + self.assertRaises(Unauthorized, cnx.commit) + with self.admin_access.repo_cnx() as cnx: + self.assertEqual(cnx.execute('Personne X WHERE X nom "bidulechouette"').rowcount, 0) + + def test_update_security_2(self): + with self.temporary_permissions(Personne={'read': ('users', 'managers'), + 'add': ('guests', 'users', 'managers')}): + with self.new_access(u'anon').repo_cnx() as cnx: + self.assertRaises(Unauthorized, cnx.execute, + "SET X nom 'bidulechouette' WHERE X is Personne") + # test nothing has actually been inserted + with self.admin_access.repo_cnx() as cnx: + self.assertEqual(cnx.execute('Personne X WHERE X nom "bidulechouette"').rowcount, 0) + + def test_update_security_3(self): + with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx: + cnx.execute("INSERT Personne X: X nom 'biduuule'") + cnx.execute("INSERT Societe X: X nom 'looogilab'") + cnx.execute("SET X travaille S WHERE X nom 'biduuule', S nom 'looogilab'") + + def test_insert_immutable_attribute_update(self): + with self.admin_access.repo_cnx() as cnx: + cnx.create_entity('Old', name=u'Babar') + cnx.commit() + # this should be equivalent + o = cnx.create_entity('Old') + o.cw_set(name=u'Celeste') + cnx.commit() + + def test_update_rql_permission(self): + with self.admin_access.repo_cnx() as cnx: + cnx.execute("SET A concerne S WHERE A is Affaire, S is Societe") + cnx.commit() + # test user can only update une affaire related to a societe he owns + with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx: + cnx.execute("SET X sujet 'pascool' WHERE X is Affaire") + # this won't actually do anything since the selection query won't return anything + cnx.commit() + # to actually get Unauthorized exception, try to update an entity we can read + cnx.execute("SET X nom 'toto' WHERE X is Societe") + self.assertRaises(Unauthorized, cnx.commit) + cnx.execute("INSERT Affaire X: X sujet 'pascool'") + cnx.execute("INSERT Societe X: X nom 'chouette'") + cnx.execute("SET A concerne S WHERE A sujet 'pascool', S nom 'chouette'") + cnx.execute("SET X sujet 'habahsicestcool' WHERE X sujet 'pascool'") + cnx.commit() + + def test_delete_security(self): + # FIXME: sample below fails because we don't detect "owner" can't delete + # user anyway, and since no user with login == 'bidule' exists, no + # exception is raised + #user._groups = {'guests':1} + #self.assertRaises(Unauthorized, + # self.o.execute, user, "DELETE CWUser X WHERE X login 'bidule'") + # check local security + with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx: + self.assertRaises(Unauthorized, cnx.execute, "DELETE CWGroup Y WHERE Y name 'staff'") + + def test_delete_rql_permission(self): + with self.admin_access.repo_cnx() as cnx: + cnx.execute("SET A concerne S WHERE A is Affaire, S is Societe") + cnx.commit() + # test user can only dele une affaire related to a societe he owns + with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx: + # this won't actually do anything since the selection query won't return anything + cnx.execute("DELETE Affaire X") + cnx.commit() + # to actually get Unauthorized exception, try to delete an entity we can read + self.assertRaises(Unauthorized, cnx.execute, "DELETE Societe S") + self.assertRaises(QueryError, cnx.commit) # can't commit anymore + cnx.rollback() + cnx.execute("INSERT Affaire X: X sujet 'pascool'") + cnx.execute("INSERT Societe X: X nom 'chouette'") + cnx.execute("SET A concerne S WHERE A sujet 'pascool', S nom 'chouette'") + cnx.commit() +## # this one should fail since it will try to delete two affaires, one authorized +## # and the other not +## self.assertRaises(Unauthorized, cnx.execute, "DELETE Affaire X") + cnx.execute("DELETE Affaire X WHERE X sujet 'pascool'") + cnx.commit() + + def test_insert_relation_rql_permission(self): + with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx: + cnx.execute("SET A concerne S WHERE A is Affaire, S is Societe") + # should raise Unauthorized since user don't own S though this won't + # actually do anything since the selection query won't return + # anything + cnx.commit() + # to actually get Unauthorized exception, try to insert a relation + # were we can read both entities + rset = cnx.execute('Personne P') + self.assertEqual(len(rset), 1) + ent = rset.get_entity(0, 0) + self.assertFalse(cnx.execute('Any P,S WHERE P travaille S,P is Personne, S is Societe')) + self.assertRaises(Unauthorized, ent.cw_check_perm, 'update') + self.assertRaises(Unauthorized, + cnx.execute, "SET P travaille S WHERE P is Personne, S is Societe") + self.assertRaises(QueryError, cnx.commit) # can't commit anymore + cnx.rollback() + # test nothing has actually been inserted: + self.assertFalse(cnx.execute('Any P,S WHERE P travaille S,P is Personne, S is Societe')) + cnx.execute("INSERT Societe X: X nom 'chouette'") + cnx.execute("SET A concerne S WHERE A is Affaire, S nom 'chouette'") + cnx.commit() + + def test_delete_relation_rql_permission(self): + with self.admin_access.repo_cnx() as cnx: + cnx.execute("SET A concerne S WHERE A is Affaire, S is Societe") + cnx.commit() + with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx: + # this won't actually do anything since the selection query won't return anything + cnx.execute("DELETE A concerne S") + cnx.commit() + with self.admin_access.repo_cnx() as cnx: + # to actually get Unauthorized exception, try to delete a relation we can read + eid = cnx.execute("INSERT Affaire X: X sujet 'pascool'")[0][0] + cnx.execute('SET X owned_by U WHERE X eid %(x)s, U login "iaminusersgrouponly"', + {'x': eid}) + cnx.execute("SET A concerne S WHERE A sujet 'pascool', S is Societe") + cnx.commit() + with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx: + self.assertRaises(Unauthorized, cnx.execute, "DELETE A concerne S") + self.assertRaises(QueryError, cnx.commit) # can't commit anymore + cnx.rollback() + cnx.execute("INSERT Societe X: X nom 'chouette'") + cnx.execute("SET A concerne S WHERE A is Affaire, S nom 'chouette'") + cnx.commit() + cnx.execute("DELETE A concerne S WHERE S nom 'chouette'") + cnx.commit() + + + def test_user_can_change_its_upassword(self): + with self.admin_access.repo_cnx() as cnx: + ueid = self.create_user(cnx, u'user').eid + with self.new_access(u'user').repo_cnx() as cnx: + cnx.execute('SET X upassword %(passwd)s WHERE X eid %(x)s', + {'x': ueid, 'passwd': b'newpwd'}) + cnx.commit() + with self.repo.internal_cnx() as cnx: + self.repo.authenticate_user(cnx, 'user', password='newpwd') + + def test_user_cant_change_other_upassword(self): + with self.admin_access.repo_cnx() as cnx: + ueid = self.create_user(cnx, u'otheruser').eid + with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx: + cnx.execute('SET X upassword %(passwd)s WHERE X eid %(x)s', + {'x': ueid, 'passwd': b'newpwd'}) + self.assertRaises(Unauthorized, cnx.commit) + + # read security test + + def test_read_base(self): + with self.temporary_permissions(Personne={'read': ('users', 'managers')}): + with self.new_access(u'anon').repo_cnx() as cnx: + self.assertRaises(Unauthorized, + cnx.execute, 'Personne U where U nom "managers"') + + def test_read_erqlexpr_base(self): + with self.admin_access.repo_cnx() as cnx: + eid = cnx.execute("INSERT Affaire X: X sujet 'cool'")[0][0] + cnx.commit() + with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx: + rset = cnx.execute('Affaire X') + self.assertEqual(rset.rows, []) + self.assertRaises(Unauthorized, cnx.execute, 'Any X WHERE X eid %(x)s', {'x': eid}) + # cache test + self.assertRaises(Unauthorized, cnx.execute, 'Any X WHERE X eid %(x)s', {'x': eid}) + aff2 = cnx.execute("INSERT Affaire X: X sujet 'cool'")[0][0] + soc1 = cnx.execute("INSERT Societe X: X nom 'chouette'")[0][0] + cnx.execute("SET A concerne S WHERE A is Affaire, S is Societe") + cnx.commit() + rset = cnx.execute('Any X WHERE X eid %(x)s', {'x': aff2}) + self.assertEqual(rset.rows, [[aff2]]) + # more cache test w/ NOT eid + rset = cnx.execute('Affaire X WHERE NOT X eid %(x)s', {'x': eid}) + self.assertEqual(rset.rows, [[aff2]]) + rset = cnx.execute('Affaire X WHERE NOT X eid %(x)s', {'x': aff2}) + self.assertEqual(rset.rows, []) + # test can't update an attribute of an entity that can't be readen + self.assertRaises(Unauthorized, cnx.execute, + 'SET X sujet "hacked" WHERE X eid %(x)s', {'x': eid}) + + + def test_entity_created_in_transaction(self): + affschema = self.schema['Affaire'] + with self.temporary_permissions(Affaire={'read': affschema.permissions['add']}): + with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx: + aff2 = cnx.execute("INSERT Affaire X: X sujet 'cool'")[0][0] + # entity created in transaction are readable *by eid* + self.assertTrue(cnx.execute('Any X WHERE X eid %(x)s', {'x':aff2})) + # XXX would be nice if it worked + rset = cnx.execute("Affaire X WHERE X sujet 'cool'") + self.assertEqual(len(rset), 0) + self.assertRaises(Unauthorized, cnx.commit) + + def test_read_erqlexpr_has_text1(self): + with self.admin_access.repo_cnx() as cnx: + aff1 = cnx.execute("INSERT Affaire X: X sujet 'cool'")[0][0] + card1 = cnx.execute("INSERT Card X: X title 'cool'")[0][0] + cnx.execute('SET X owned_by U WHERE X eid %(x)s, U login "iaminusersgrouponly"', + {'x': card1}) + cnx.commit() + with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx: + aff2 = cnx.execute("INSERT Affaire X: X sujet 'cool'")[0][0] + soc1 = cnx.execute("INSERT Societe X: X nom 'chouette'")[0][0] + cnx.execute("SET A concerne S WHERE A eid %(a)s, S eid %(s)s", {'a': aff2, 's': soc1}) + cnx.commit() + self.assertRaises(Unauthorized, cnx.execute, 'Any X WHERE X eid %(x)s', {'x':aff1}) + self.assertTrue(cnx.execute('Any X WHERE X eid %(x)s', {'x':aff2})) + self.assertTrue(cnx.execute('Any X WHERE X eid %(x)s', {'x':card1})) + rset = cnx.execute("Any X WHERE X has_text 'cool'") + self.assertEqual(sorted(eid for eid, in rset.rows), + [card1, aff2]) + + def test_read_erqlexpr_has_text2(self): + with self.admin_access.repo_cnx() as cnx: + cnx.execute("INSERT Personne X: X nom 'bidule'") + cnx.execute("INSERT Societe X: X nom 'bidule'") + cnx.commit() + with self.temporary_permissions(Personne={'read': ('managers',)}): + with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx: + rset = cnx.execute('Any N WHERE N has_text "bidule"') + self.assertEqual(len(rset.rows), 1, rset.rows) + rset = cnx.execute('Any N WITH N BEING (Any N WHERE N has_text "bidule")') + self.assertEqual(len(rset.rows), 1, rset.rows) + + def test_read_erqlexpr_optional_rel(self): + with self.admin_access.repo_cnx() as cnx: + cnx.execute("INSERT Personne X: X nom 'bidule'") + cnx.execute("INSERT Societe X: X nom 'bidule'") + cnx.commit() + with self.temporary_permissions(Personne={'read': ('managers',)}): + with self.new_access(u'anon').repo_cnx() as cnx: + rset = cnx.execute('Any N,U WHERE N has_text "bidule", N owned_by U?') + self.assertEqual(len(rset.rows), 1, rset.rows) + + def test_read_erqlexpr_aggregat(self): + with self.admin_access.repo_cnx() as cnx: + cnx.execute("INSERT Affaire X: X sujet 'cool'")[0][0] + cnx.commit() + with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx: + rset = cnx.execute('Any COUNT(X) WHERE X is Affaire') + self.assertEqual(rset.rows, [[0]]) + aff2 = cnx.execute("INSERT Affaire X: X sujet 'cool'")[0][0] + soc1 = cnx.execute("INSERT Societe X: X nom 'chouette'")[0][0] + cnx.execute("SET A concerne S WHERE A is Affaire, S is Societe") + cnx.commit() + rset = cnx.execute('Any COUNT(X) WHERE X is Affaire') + self.assertEqual(rset.rows, [[1]]) + rset = cnx.execute('Any ETN, COUNT(X) GROUPBY ETN WHERE X is ET, ET name ETN') + values = dict(rset) + self.assertEqual(values['Affaire'], 1) + self.assertEqual(values['Societe'], 2) + rset = cnx.execute('Any ETN, COUNT(X) GROUPBY ETN WHERE X is ET, ET name ETN ' + 'WITH X BEING ((Affaire X) UNION (Societe X))') + self.assertEqual(len(rset), 2) + values = dict(rset) + self.assertEqual(values['Affaire'], 1) + self.assertEqual(values['Societe'], 2) + + + def test_attribute_security(self): + with self.admin_access.repo_cnx() as cnx: + # only managers should be able to edit the 'test' attribute of Personne entities + eid = cnx.execute("INSERT Personne X: X nom 'bidule', " + "X web 'http://www.debian.org', X test TRUE")[0][0] + cnx.execute('SET X test FALSE WHERE X eid %(x)s', {'x': eid}) + cnx.commit() + with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx: + cnx.execute("INSERT Personne X: X nom 'bidule', " + "X web 'http://www.debian.org', X test TRUE") + self.assertRaises(Unauthorized, cnx.commit) + cnx.execute("INSERT Personne X: X nom 'bidule', " + "X web 'http://www.debian.org', X test FALSE") + self.assertRaises(Unauthorized, cnx.commit) + eid = cnx.execute("INSERT Personne X: X nom 'bidule', " + "X web 'http://www.debian.org'")[0][0] + cnx.commit() + cnx.execute('SET X test FALSE WHERE X eid %(x)s', {'x': eid}) + self.assertRaises(Unauthorized, cnx.commit) + cnx.execute('SET X test TRUE WHERE X eid %(x)s', {'x': eid}) + self.assertRaises(Unauthorized, cnx.commit) + cnx.execute('SET X web "http://www.logilab.org" WHERE X eid %(x)s', {'x': eid}) + cnx.commit() + with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx: + cnx.execute('INSERT Frozable F: F name "Foo"') + cnx.commit() + cnx.execute('SET F name "Bar" WHERE F is Frozable') + cnx.commit() + cnx.execute('SET F name "BaBar" WHERE F is Frozable') + cnx.execute('SET F frozen True WHERE F is Frozable') + with self.assertRaises(Unauthorized): + cnx.commit() + cnx.rollback() + cnx.execute('SET F frozen True WHERE F is Frozable') + cnx.commit() + cnx.execute('SET F name "Bar" WHERE F is Frozable') + with self.assertRaises(Unauthorized): + cnx.commit() + + def test_attribute_security_rqlexpr(self): + with self.admin_access.repo_cnx() as cnx: + # Note.para attribute editable by managers or if the note is in "todo" state + note = cnx.execute("INSERT Note X: X para 'bidule'").get_entity(0, 0) + cnx.commit() + note.cw_adapt_to('IWorkflowable').fire_transition('markasdone') + cnx.execute('SET X para "truc" WHERE X eid %(x)s', {'x': note.eid}) + cnx.commit() + with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx: + cnx.execute("SET X para 'chouette' WHERE X eid %(x)s", {'x': note.eid}) + self.assertRaises(Unauthorized, cnx.commit) + note2 = cnx.execute("INSERT Note X: X para 'bidule'").get_entity(0, 0) + cnx.commit() + note2.cw_adapt_to('IWorkflowable').fire_transition('markasdone') + cnx.commit() + self.assertEqual(len(cnx.execute('Any X WHERE X in_state S, S name "todo", X eid %(x)s', + {'x': note2.eid})), + 0) + cnx.execute("SET X para 'chouette' WHERE X eid %(x)s", {'x': note2.eid}) + self.assertRaises(Unauthorized, cnx.commit) + note2.cw_adapt_to('IWorkflowable').fire_transition('redoit') + cnx.commit() + cnx.execute("SET X para 'chouette' WHERE X eid %(x)s", {'x': note2.eid}) + cnx.commit() + cnx.execute("INSERT Note X: X something 'A'") + self.assertRaises(Unauthorized, cnx.commit) + cnx.execute("INSERT Note X: X para 'zogzog', X something 'A'") + cnx.commit() + note = cnx.execute("INSERT Note X").get_entity(0,0) + cnx.commit() + note.cw_set(something=u'B') + cnx.commit() + note.cw_set(something=None, para=u'zogzog') + cnx.commit() + + def test_attribute_read_security(self): + # anon not allowed to see users'login, but they can see users + login_rdef = self.repo.schema['CWUser'].rdef('login') + with self.temporary_permissions((login_rdef, {'read': ('users', 'managers')}), + CWUser={'read': ('guests', 'users', 'managers')}): + with self.new_access(u'anon').repo_cnx() as cnx: + rset = cnx.execute('CWUser X') + self.assertTrue(rset) + x = rset.get_entity(0, 0) + x.complete() + self.assertEqual(x.login, None) + self.assertTrue(x.creation_date) + x = rset.get_entity(1, 0) + x.complete() + self.assertEqual(x.login, None) + self.assertTrue(x.creation_date) + + def test_yams_inheritance_and_security_bug(self): + with self.temporary_permissions(Division={'read': ('managers', + ERQLExpression('X owned_by U'))}): + with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx: + rqlst = self.repo.vreg.rqlhelper.parse('Any X WHERE X is_instance_of Societe') + self.repo.vreg.solutions(cnx, rqlst, {}) + self.repo.vreg.rqlhelper.annotate(rqlst) + plan = cnx.repo.querier.plan_factory(rqlst, {}, cnx) + plan.preprocess(rqlst) + self.assertEqual( + rqlst.as_string(), + '(Any X WHERE X is IN(Societe, SubDivision)) UNION ' + '(Any X WHERE X is Division, EXISTS(X owned_by %(B)s))') + + +class BaseSchemaSecurityTC(BaseSecurityTC): + """tests related to the base schema permission configuration""" + + def test_user_can_delete_object_he_created(self): + # even if some other user have changed object'state + with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx: + # due to security test, affaire has to concerne a societe the user owns + cnx.execute('INSERT Societe X: X nom "ARCTIA"') + cnx.execute('INSERT Affaire X: X ref "ARCT01", X concerne S WHERE S nom "ARCTIA"') + cnx.commit() + with self.admin_access.repo_cnx() as cnx: + affaire = cnx.execute('Any X WHERE X ref "ARCT01"').get_entity(0, 0) + affaire.cw_adapt_to('IWorkflowable').fire_transition('abort') + cnx.commit() + self.assertEqual(len(cnx.execute('TrInfo X WHERE X wf_info_for A, A ref "ARCT01"')), + 1) + self.assertEqual(len(cnx.execute('TrInfo X WHERE X wf_info_for A, A ref "ARCT01",' + 'X owned_by U, U login "admin"')), + 1) # TrInfo at the above state change + with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx: + cnx.execute('DELETE Affaire X WHERE X ref "ARCT01"') + cnx.commit() + self.assertFalse(cnx.execute('Affaire X')) + + def test_users_and_groups_non_readable_by_guests(self): + with self.repo.internal_cnx() as cnx: + admineid = cnx.execute('CWUser U WHERE U login "admin"').rows[0][0] + with self.new_access(u'anon').repo_cnx() as cnx: + anon = cnx.user + # anonymous user can only read itself + rset = cnx.execute('Any L WHERE X owned_by U, U login L') + self.assertEqual([['anon']], rset.rows) + rset = cnx.execute('CWUser X') + self.assertEqual([[anon.eid]], rset.rows) + # anonymous user can read groups (necessary to check allowed transitions for instance) + self.assertTrue(cnx.execute('CWGroup X')) + # should only be able to read the anonymous user, not another one + self.assertRaises(Unauthorized, + cnx.execute, 'CWUser X WHERE X eid %(x)s', {'x': admineid}) + rset = cnx.execute('CWUser X WHERE X eid %(x)s', {'x': anon.eid}) + self.assertEqual([[anon.eid]], rset.rows) + # but can't modify it + cnx.execute('SET X login "toto" WHERE X eid %(x)s', {'x': anon.eid}) + self.assertRaises(Unauthorized, cnx.commit) + + def test_in_group_relation(self): + with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx: + rql = u"DELETE U in_group G WHERE U login 'admin'" + self.assertRaises(Unauthorized, cnx.execute, rql) + rql = u"SET U in_group G WHERE U login 'admin', G name 'users'" + self.assertRaises(Unauthorized, cnx.execute, rql) + + def test_owned_by(self): + with self.admin_access.repo_cnx() as cnx: + cnx.execute("INSERT Personne X: X nom 'bidule'") + cnx.commit() + with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx: + rql = u"SET X owned_by U WHERE U login 'iaminusersgrouponly', X is Personne" + self.assertRaises(Unauthorized, cnx.execute, rql) + + def test_bookmarked_by_guests_security(self): + with self.admin_access.repo_cnx() as cnx: + beid1 = cnx.execute('INSERT Bookmark B: B path "?vid=manage", B title "manage"')[0][0] + beid2 = cnx.execute('INSERT Bookmark B: B path "?vid=index", B title "index", ' + 'B bookmarked_by U WHERE U login "anon"')[0][0] + cnx.commit() + with self.new_access(u'anon').repo_cnx() as cnx: + anoneid = cnx.user.eid + self.assertEqual(cnx.execute('Any T,P ORDERBY lower(T) WHERE B is Bookmark,B title T,B path P,' + 'B bookmarked_by U, U eid %s' % anoneid).rows, + [['index', '?vid=index']]) + self.assertEqual(cnx.execute('Any T,P ORDERBY lower(T) WHERE B is Bookmark,B title T,B path P,' + 'B bookmarked_by U, U eid %(x)s', {'x': anoneid}).rows, + [['index', '?vid=index']]) + # can read others bookmarks as well + self.assertEqual(cnx.execute('Any B where B is Bookmark, NOT B bookmarked_by U').rows, + [[beid1]]) + self.assertRaises(Unauthorized, cnx.execute,'DELETE B bookmarked_by U') + self.assertRaises(Unauthorized, + cnx.execute, 'SET B bookmarked_by U WHERE U eid %(x)s, B eid %(b)s', + {'x': anoneid, 'b': beid1}) + + def test_ambigous_ordered(self): + with self.new_access(u'anon').repo_cnx() as cnx: + names = [t for t, in cnx.execute('Any N ORDERBY lower(N) WHERE X name N')] + self.assertEqual(names, sorted(names, key=lambda x: x.lower())) + + def test_in_state_without_update_perm(self): + """check a user change in_state without having update permission on the + subject + """ + with self.admin_access.repo_cnx() as cnx: + eid = cnx.execute('INSERT Affaire X: X ref "ARCT01"')[0][0] + cnx.commit() + with self.new_access(u'iaminusersgrouponly').repo_cnx() as cnx: + # needed to remove rql expr granting update perm to the user + affschema = self.schema['Affaire'] + with self.temporary_permissions(Affaire={'update': affschema.get_groups('update'), + 'read': ('users',)}): + self.assertRaises(Unauthorized, + affschema.check_perm, cnx, 'update', eid=eid) + aff = cnx.execute('Any X WHERE X ref "ARCT01"').get_entity(0, 0) + aff.cw_adapt_to('IWorkflowable').fire_transition('abort') + cnx.commit() + # though changing a user state (even logged user) is reserved to managers + user = cnx.user + # XXX wether it should raise Unauthorized or ValidationError is not clear + # the best would probably ValidationError if the transition doesn't exist + # from the current state but Unauthorized if it exists but user can't pass it + self.assertRaises(ValidationError, + user.cw_adapt_to('IWorkflowable').fire_transition, 'deactivate') + + def test_trinfo_security(self): + with self.admin_access.repo_cnx() as cnx: + aff = cnx.execute('INSERT Affaire X: X ref "ARCT01"').get_entity(0, 0) + iworkflowable = aff.cw_adapt_to('IWorkflowable') + cnx.commit() + iworkflowable.fire_transition('abort') + cnx.commit() + # can change tr info comment + cnx.execute('SET TI comment %(c)s WHERE TI wf_info_for X, X ref "ARCT01"', + {'c': u'bouh!'}) + cnx.commit() + aff.cw_clear_relation_cache('wf_info_for', 'object') + trinfo = iworkflowable.latest_trinfo() + self.assertEqual(trinfo.comment, 'bouh!') + # but not from_state/to_state + aff.cw_clear_relation_cache('wf_info_for', role='object') + self.assertRaises(Unauthorized, cnx.execute, + 'SET TI from_state S WHERE TI eid %(ti)s, S name "ben non"', + {'ti': trinfo.eid}) + self.assertRaises(Unauthorized, cnx.execute, + 'SET TI to_state S WHERE TI eid %(ti)s, S name "pitetre"', + {'ti': trinfo.eid}) + + def test_emailaddress_security(self): + # check for prexisting email adresse + with self.admin_access.repo_cnx() as cnx: + if cnx.execute('Any X WHERE X is EmailAddress'): + rset = cnx.execute('Any X, U WHERE X is EmailAddress, U use_email X') + msg = ['Preexisting email readable by anon found!'] + tmpl = ' - "%s" used by user "%s"' + for i in range(len(rset)): + email, user = rset.get_entity(i, 0), rset.get_entity(i, 1) + msg.append(tmpl % (email.dc_title(), user.dc_title())) + raise RuntimeError('\n'.join(msg)) + # actual test + cnx.execute('INSERT EmailAddress X: X address "hop"').get_entity(0, 0) + cnx.execute('INSERT EmailAddress X: X address "anon", ' + 'U use_email X WHERE U login "anon"').get_entity(0, 0) + cnx.commit() + self.assertEqual(len(cnx.execute('Any X WHERE X is EmailAddress')), 2) + with self.new_access(u'anon').repo_cnx() as cnx: + self.assertEqual(len(cnx.execute('Any X WHERE X is EmailAddress')), 1) + +if __name__ == '__main__': + unittest_main() diff -r 32ee89340e59 -r 3648a2c293f6 cubicweb/server/test/unittest_server_utils.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/cubicweb/server/test/unittest_server_utils.py Thu Mar 28 11:39:36 2019 +0100 @@ -0,0 +1,426 @@ +# copyright 2003-2017 LOGILAB S.A. (Paris, FRANCE), all rights reserved. +# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr +# +# This file is part of CubicWeb. +# +# CubicWeb is free software: you can redistribute it and/or modify it under the +# terms of the GNU Lesser General Public License as published by the Free +# Software Foundation, either version 2.1 of the License, or (at your option) +# any later version. +# +# CubicWeb is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more +# details. +# +# You should have received a copy of the GNU Lesser General Public License along +# with CubicWeb. If not, see . +"""unit tests for module cubicweb.utils""" + +import base64 +import datetime +import decimal +import doctest +import re +from unittest import TestCase + +from cubicweb import Binary, Unauthorized +from cubicweb.devtools.testlib import CubicWebTC +from cubicweb.utils import (make_uid, UStringIO, RepeatList, HTMLHead, + QueryCache) +from cubicweb.entity import Entity + +try: + from cubicweb.utils import CubicWebJsonEncoder, json +except ImportError: + json = None + + +class MakeUidTC(TestCase): + def test_1(self): + self.assertNotEqual(make_uid('xyz'), make_uid('abcd')) + self.assertNotEqual(make_uid('xyz'), make_uid('xyz')) + + def test_2(self): + d = set() + while len(d)<10000: + uid = make_uid('xyz') + if uid in d: + self.fail(len(d)) + if re.match('\d', uid): + self.fail('make_uid must not return something begining with ' + 'some numeric character, got %s' % uid) + d.add(uid) + + +class TestQueryCache(TestCase): + def test_querycache(self): + c = QueryCache(ceiling=20) + # write only + for x in range(10): + c[x] = x + self.assertEqual(c._usage_report(), + {'transientcount': 0, + 'itemcount': 10, + 'permanentcount': 0}) + c = QueryCache(ceiling=10) + # we should also get a warning + for x in range(20): + c[x] = x + self.assertEqual(c._usage_report(), + {'transientcount': 0, + 'itemcount': 10, + 'permanentcount': 0}) + # write + reads + c = QueryCache(ceiling=20) + for n in range(4): + for x in range(10): + c[x] = x + c[x] + self.assertEqual(c._usage_report(), + {'transientcount': 10, + 'itemcount': 10, + 'permanentcount': 0}) + c = QueryCache(ceiling=20) + for n in range(17): + for x in range(10): + c[x] = x + c[x] + self.assertEqual(c._usage_report(), + {'transientcount': 0, + 'itemcount': 10, + 'permanentcount': 10}) + c = QueryCache(ceiling=20) + for n in range(17): + for x in range(10): + c[x] = x + if n % 2: + c[x] + if x % 2: + c[x] + self.assertEqual(c._usage_report(), + {'transientcount': 5, + 'itemcount': 10, + 'permanentcount': 5}) + + def test_clear_on_overflow(self): + """Tests that only non-permanent items in the cache are wiped-out on ceiling overflow + """ + c = QueryCache(ceiling=10) + # set 10 values + for x in range(10): + c[x] = x + # arrange for the first 5 to be permanent + for x in range(5): + for r in range(QueryCache._maxlevel + 2): + v = c[x] + self.assertEqual(v, x) + # Add the 11-th + c[10] = 10 + self.assertEqual(c._usage_report(), + {'transientcount': 0, + 'itemcount': 6, + 'permanentcount': 5}) + + def test_get_with_default(self): + """ + Tests the capability of QueryCache for retrieving items with a default value + """ + c = QueryCache(ceiling=20) + # set 10 values + for x in range(10): + c[x] = x + # arrange for the first 5 to be permanent + for x in range(5): + for r in range(QueryCache._maxlevel + 2): + v = c[x] + self.assertEqual(v, x) + self.assertEqual(c._usage_report(), + {'transientcount': 0, + 'itemcount': 10, + 'permanentcount': 5}) + # Test defaults for existing (including in permanents) + for x in range(10): + v = c.get(x, -1) + self.assertEqual(v, x) + # Test defaults for others + for x in range(10, 15): + v = c.get(x, -1) + self.assertEqual(v, -1) + + def test_iterkeys(self): + """ + Tests the iterating on keys in the cache + """ + c = QueryCache(ceiling=20) + # set 10 values + for x in range(10): + c[x] = x + # arrange for the first 5 to be permanent + for x in range(5): + for r in range(QueryCache._maxlevel + 2): + v = c[x] + self.assertEqual(v, x) + self.assertEqual(c._usage_report(), + {'transientcount': 0, + 'itemcount': 10, + 'permanentcount': 5}) + keys = sorted(c) + for x in range(10): + self.assertEqual(x, keys[x]) + + def test_items(self): + """ + Tests the iterating on key-value couples in the cache + """ + c = QueryCache(ceiling=20) + # set 10 values + for x in range(10): + c[x] = x + # arrange for the first 5 to be permanent + for x in range(5): + for r in range(QueryCache._maxlevel + 2): + v = c[x] + self.assertEqual(v, x) + self.assertEqual(c._usage_report(), + {'transientcount': 0, + 'itemcount': 10, + 'permanentcount': 5}) + content = sorted(c.items()) + for x in range(10): + self.assertEqual(x, content[x][0]) + self.assertEqual(x, content[x][1]) + + +class UStringIOTC(TestCase): + def test_boolean_value(self): + self.assertTrue(UStringIO()) + + +class RepeatListTC(TestCase): + + def test_base(self): + l = RepeatList(3, (1, 3)) + self.assertEqual(l[0], (1, 3)) + self.assertEqual(l[2], (1, 3)) + self.assertEqual(l[-1], (1, 3)) + self.assertEqual(len(l), 3) + # XXX + self.assertEqual(l[4], (1, 3)) + + self.assertFalse(RepeatList(0, None)) + + def test_slice(self): + l = RepeatList(3, (1, 3)) + self.assertEqual(l[0:1], [(1, 3)]) + self.assertEqual(l[0:4], [(1, 3)]*3) + self.assertEqual(l[:], [(1, 3)]*3) + + def test_iter(self): + self.assertEqual(list(RepeatList(3, (1, 3))), + [(1, 3)]*3) + + def test_add(self): + l = RepeatList(3, (1, 3)) + self.assertEqual(l + [(1, 4)], [(1, 3)]*3 + [(1, 4)]) + self.assertEqual([(1, 4)] + l, [(1, 4)] + [(1, 3)]*3) + self.assertEqual(l + RepeatList(2, (2, 3)), [(1, 3)]*3 + [(2, 3)]*2) + + x = l + RepeatList(2, (1, 3)) + self.assertIsInstance(x, RepeatList) + self.assertEqual(len(x), 5) + self.assertEqual(x[0], (1, 3)) + + x = l + [(1, 3)] * 2 + self.assertEqual(x, [(1, 3)] * 5) + + def test_eq(self): + self.assertEqual(RepeatList(3, (1, 3)), + [(1, 3)]*3) + + def test_pop(self): + l = RepeatList(3, (1, 3)) + l.pop(2) + self.assertEqual(l, [(1, 3)]*2) + + +class JSONEncoderTC(TestCase): + def setUp(self): + if json is None: + self.skipTest('json not available') + + def encode(self, value): + return json.dumps(value, cls=CubicWebJsonEncoder) + + def test_encoding_dates(self): + self.assertEqual(self.encode(datetime.datetime(2009, 9, 9, 20, 30)), + '"2009/09/09 20:30:00"') + self.assertEqual(self.encode(datetime.date(2009, 9, 9)), + '"2009/09/09"') + self.assertEqual(self.encode(datetime.time(20, 30)), + '"20:30:00"') + + def test_encoding_decimal(self): + self.assertEqual(self.encode(decimal.Decimal('1.2')), '1.2') + + def test_encoding_bare_entity(self): + e = Entity(None) + e.cw_attr_cache['pouet'] = 'hop' + e.eid = 2 + self.assertEqual(json.loads(self.encode(e)), + {'pouet': 'hop', 'eid': 2}) + + def test_encoding_entity_in_list(self): + e = Entity(None) + e.cw_attr_cache['pouet'] = 'hop' + e.eid = 2 + self.assertEqual(json.loads(self.encode([e])), + [{'pouet': 'hop', 'eid': 2}]) + + def test_encoding_binary(self): + for content in (b'he he', b'h\xe9 hxe9'): + with self.subTest(content=content): + encoded = self.encode(Binary(content)) + self.assertEqual(base64.b64decode(encoded), content) + + def test_encoding_unknown_stuff(self): + self.assertEqual(self.encode(TestCase), 'null') + + +class HTMLHeadTC(CubicWebTC): + + def htmlhead(self, datadir_url): + with self.admin_access.web_request() as req: + base_url = u'http://test.fr/data/' + req.datadir_url = base_url + head = HTMLHead(req) + return head + + def test_concat_urls(self): + base_url = u'http://test.fr/data/' + head = self.htmlhead(base_url) + urls = [base_url + u'bob1.js', + base_url + u'bob2.js', + base_url + u'bob3.js'] + result = head.concat_urls(urls) + expected = u'http://test.fr/data/??bob1.js,bob2.js,bob3.js' + self.assertEqual(result, expected) + + def test_group_urls(self): + base_url = u'http://test.fr/data/' + head = self.htmlhead(base_url) + urls_spec = [(base_url + u'bob0.js', None), + (base_url + u'bob1.js', None), + (u'http://ext.com/bob2.js', None), + (u'http://ext.com/bob3.js', None), + (base_url + u'bob4.css', 'all'), + (base_url + u'bob5.css', 'all'), + (base_url + u'bob6.css', 'print'), + (base_url + u'bob7.css', 'print'), + (base_url + u'bob8.css', ('all', u'[if IE 8]')), + (base_url + u'bob9.css', ('print', u'[if IE 8]')) + ] + result = head.group_urls(urls_spec) + expected = [(base_url + u'??bob0.js,bob1.js', None), + (u'http://ext.com/bob2.js', None), + (u'http://ext.com/bob3.js', None), + (base_url + u'??bob4.css,bob5.css', 'all'), + (base_url + u'??bob6.css,bob7.css', 'print'), + (base_url + u'bob8.css', ('all', u'[if IE 8]')), + (base_url + u'bob9.css', ('print', u'[if IE 8]')) + ] + self.assertEqual(list(result), expected) + + def test_getvalue_with_concat(self): + self.config.global_set_option('concat-resources', True) + base_url = u'http://test.fr/data/' + head = self.htmlhead(base_url) + head.add_js(base_url + u'bob0.js') + head.add_js(base_url + u'bob1.js') + head.add_js(u'http://ext.com/bob2.js') + head.add_js(u'http://ext.com/bob3.js') + head.add_css(base_url + u'bob4.css') + head.add_css(base_url + u'bob5.css') + head.add_css(base_url + u'bob6.css', 'print') + head.add_css(base_url + u'bob7.css', 'print') + head.add_ie_css(base_url + u'bob8.css') + head.add_ie_css(base_url + u'bob9.css', 'print', u'[if lt IE 7]') + result = head.getvalue() + expected = u""" + + + + + + + +""" + self.assertEqual(result, expected) + + def test_getvalue_without_concat(self): + self.config.global_set_option('concat-resources', False) + try: + base_url = u'http://test.fr/data/' + head = self.htmlhead(base_url) + head.add_js(base_url + u'bob0.js') + head.add_js(base_url + u'bob1.js') + head.add_js(u'http://ext.com/bob2.js') + head.add_js(u'http://ext.com/bob3.js') + head.add_css(base_url + u'bob4.css') + head.add_css(base_url + u'bob5.css') + head.add_css(base_url + u'bob6.css', 'print') + head.add_css(base_url + u'bob7.css', 'print') + head.add_ie_css(base_url + u'bob8.css') + head.add_ie_css(base_url + u'bob9.css', 'print', u'[if lt IE 7]') + result = head.getvalue() + expected = u""" + + + + + + + + + + +""" + self.assertEqual(result, expected) + finally: + self.config.global_set_option('concat-resources', True) + + +def UnauthorizedTC(TestCase): + + def _test(self, func): + self.assertEqual(func(Unauthorized()), + 'You are not allowed to perform this operation') + self.assertEqual(func(Unauthorized('a')), + 'a') + self.assertEqual(func(Unauthorized('a', 'b')), + 'You are not allowed to perform a operation on b') + self.assertEqual(func(Unauthorized('a', 'b', 'c')), + 'a b c') + + def test_str(self): + self._test(str) + + + +def load_tests(loader, tests, ignore): + import cubicweb.utils + tests.addTests(doctest.DocTestSuite(cubicweb.utils)) + return tests + + +if __name__ == '__main__': + import unittest + unittest.main() diff -r 32ee89340e59 -r 3648a2c293f6 cubicweb/test/unittest_utils.py --- a/cubicweb/test/unittest_utils.py Fri May 24 16:29:14 2019 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,426 +0,0 @@ -# copyright 2003-2017 LOGILAB S.A. (Paris, FRANCE), all rights reserved. -# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr -# -# This file is part of CubicWeb. -# -# CubicWeb is free software: you can redistribute it and/or modify it under the -# terms of the GNU Lesser General Public License as published by the Free -# Software Foundation, either version 2.1 of the License, or (at your option) -# any later version. -# -# CubicWeb is distributed in the hope that it will be useful, but WITHOUT -# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS -# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more -# details. -# -# You should have received a copy of the GNU Lesser General Public License along -# with CubicWeb. If not, see . -"""unit tests for module cubicweb.utils""" - -import base64 -import datetime -import decimal -import doctest -import re -from unittest import TestCase - -from cubicweb import Binary, Unauthorized -from cubicweb.devtools.testlib import CubicWebTC -from cubicweb.utils import (make_uid, UStringIO, RepeatList, HTMLHead, - QueryCache) -from cubicweb.entity import Entity - -try: - from cubicweb.utils import CubicWebJsonEncoder, json -except ImportError: - json = None - - -class MakeUidTC(TestCase): - def test_1(self): - self.assertNotEqual(make_uid('xyz'), make_uid('abcd')) - self.assertNotEqual(make_uid('xyz'), make_uid('xyz')) - - def test_2(self): - d = set() - while len(d)<10000: - uid = make_uid('xyz') - if uid in d: - self.fail(len(d)) - if re.match('\d', uid): - self.fail('make_uid must not return something begining with ' - 'some numeric character, got %s' % uid) - d.add(uid) - - -class TestQueryCache(TestCase): - def test_querycache(self): - c = QueryCache(ceiling=20) - # write only - for x in range(10): - c[x] = x - self.assertEqual(c._usage_report(), - {'transientcount': 0, - 'itemcount': 10, - 'permanentcount': 0}) - c = QueryCache(ceiling=10) - # we should also get a warning - for x in range(20): - c[x] = x - self.assertEqual(c._usage_report(), - {'transientcount': 0, - 'itemcount': 10, - 'permanentcount': 0}) - # write + reads - c = QueryCache(ceiling=20) - for n in range(4): - for x in range(10): - c[x] = x - c[x] - self.assertEqual(c._usage_report(), - {'transientcount': 10, - 'itemcount': 10, - 'permanentcount': 0}) - c = QueryCache(ceiling=20) - for n in range(17): - for x in range(10): - c[x] = x - c[x] - self.assertEqual(c._usage_report(), - {'transientcount': 0, - 'itemcount': 10, - 'permanentcount': 10}) - c = QueryCache(ceiling=20) - for n in range(17): - for x in range(10): - c[x] = x - if n % 2: - c[x] - if x % 2: - c[x] - self.assertEqual(c._usage_report(), - {'transientcount': 5, - 'itemcount': 10, - 'permanentcount': 5}) - - def test_clear_on_overflow(self): - """Tests that only non-permanent items in the cache are wiped-out on ceiling overflow - """ - c = QueryCache(ceiling=10) - # set 10 values - for x in range(10): - c[x] = x - # arrange for the first 5 to be permanent - for x in range(5): - for r in range(QueryCache._maxlevel + 2): - v = c[x] - self.assertEqual(v, x) - # Add the 11-th - c[10] = 10 - self.assertEqual(c._usage_report(), - {'transientcount': 0, - 'itemcount': 6, - 'permanentcount': 5}) - - def test_get_with_default(self): - """ - Tests the capability of QueryCache for retrieving items with a default value - """ - c = QueryCache(ceiling=20) - # set 10 values - for x in range(10): - c[x] = x - # arrange for the first 5 to be permanent - for x in range(5): - for r in range(QueryCache._maxlevel + 2): - v = c[x] - self.assertEqual(v, x) - self.assertEqual(c._usage_report(), - {'transientcount': 0, - 'itemcount': 10, - 'permanentcount': 5}) - # Test defaults for existing (including in permanents) - for x in range(10): - v = c.get(x, -1) - self.assertEqual(v, x) - # Test defaults for others - for x in range(10, 15): - v = c.get(x, -1) - self.assertEqual(v, -1) - - def test_iterkeys(self): - """ - Tests the iterating on keys in the cache - """ - c = QueryCache(ceiling=20) - # set 10 values - for x in range(10): - c[x] = x - # arrange for the first 5 to be permanent - for x in range(5): - for r in range(QueryCache._maxlevel + 2): - v = c[x] - self.assertEqual(v, x) - self.assertEqual(c._usage_report(), - {'transientcount': 0, - 'itemcount': 10, - 'permanentcount': 5}) - keys = sorted(c) - for x in range(10): - self.assertEqual(x, keys[x]) - - def test_items(self): - """ - Tests the iterating on key-value couples in the cache - """ - c = QueryCache(ceiling=20) - # set 10 values - for x in range(10): - c[x] = x - # arrange for the first 5 to be permanent - for x in range(5): - for r in range(QueryCache._maxlevel + 2): - v = c[x] - self.assertEqual(v, x) - self.assertEqual(c._usage_report(), - {'transientcount': 0, - 'itemcount': 10, - 'permanentcount': 5}) - content = sorted(c.items()) - for x in range(10): - self.assertEqual(x, content[x][0]) - self.assertEqual(x, content[x][1]) - - -class UStringIOTC(TestCase): - def test_boolean_value(self): - self.assertTrue(UStringIO()) - - -class RepeatListTC(TestCase): - - def test_base(self): - l = RepeatList(3, (1, 3)) - self.assertEqual(l[0], (1, 3)) - self.assertEqual(l[2], (1, 3)) - self.assertEqual(l[-1], (1, 3)) - self.assertEqual(len(l), 3) - # XXX - self.assertEqual(l[4], (1, 3)) - - self.assertFalse(RepeatList(0, None)) - - def test_slice(self): - l = RepeatList(3, (1, 3)) - self.assertEqual(l[0:1], [(1, 3)]) - self.assertEqual(l[0:4], [(1, 3)]*3) - self.assertEqual(l[:], [(1, 3)]*3) - - def test_iter(self): - self.assertEqual(list(RepeatList(3, (1, 3))), - [(1, 3)]*3) - - def test_add(self): - l = RepeatList(3, (1, 3)) - self.assertEqual(l + [(1, 4)], [(1, 3)]*3 + [(1, 4)]) - self.assertEqual([(1, 4)] + l, [(1, 4)] + [(1, 3)]*3) - self.assertEqual(l + RepeatList(2, (2, 3)), [(1, 3)]*3 + [(2, 3)]*2) - - x = l + RepeatList(2, (1, 3)) - self.assertIsInstance(x, RepeatList) - self.assertEqual(len(x), 5) - self.assertEqual(x[0], (1, 3)) - - x = l + [(1, 3)] * 2 - self.assertEqual(x, [(1, 3)] * 5) - - def test_eq(self): - self.assertEqual(RepeatList(3, (1, 3)), - [(1, 3)]*3) - - def test_pop(self): - l = RepeatList(3, (1, 3)) - l.pop(2) - self.assertEqual(l, [(1, 3)]*2) - - -class JSONEncoderTC(TestCase): - def setUp(self): - if json is None: - self.skipTest('json not available') - - def encode(self, value): - return json.dumps(value, cls=CubicWebJsonEncoder) - - def test_encoding_dates(self): - self.assertEqual(self.encode(datetime.datetime(2009, 9, 9, 20, 30)), - '"2009/09/09 20:30:00"') - self.assertEqual(self.encode(datetime.date(2009, 9, 9)), - '"2009/09/09"') - self.assertEqual(self.encode(datetime.time(20, 30)), - '"20:30:00"') - - def test_encoding_decimal(self): - self.assertEqual(self.encode(decimal.Decimal('1.2')), '1.2') - - def test_encoding_bare_entity(self): - e = Entity(None) - e.cw_attr_cache['pouet'] = 'hop' - e.eid = 2 - self.assertEqual(json.loads(self.encode(e)), - {'pouet': 'hop', 'eid': 2}) - - def test_encoding_entity_in_list(self): - e = Entity(None) - e.cw_attr_cache['pouet'] = 'hop' - e.eid = 2 - self.assertEqual(json.loads(self.encode([e])), - [{'pouet': 'hop', 'eid': 2}]) - - def test_encoding_binary(self): - for content in (b'he he', b'h\xe9 hxe9'): - with self.subTest(content=content): - encoded = self.encode(Binary(content)) - self.assertEqual(base64.b64decode(encoded), content) - - def test_encoding_unknown_stuff(self): - self.assertEqual(self.encode(TestCase), 'null') - - -class HTMLHeadTC(CubicWebTC): - - def htmlhead(self, datadir_url): - with self.admin_access.web_request() as req: - base_url = u'http://test.fr/data/' - req.datadir_url = base_url - head = HTMLHead(req) - return head - - def test_concat_urls(self): - base_url = u'http://test.fr/data/' - head = self.htmlhead(base_url) - urls = [base_url + u'bob1.js', - base_url + u'bob2.js', - base_url + u'bob3.js'] - result = head.concat_urls(urls) - expected = u'http://test.fr/data/??bob1.js,bob2.js,bob3.js' - self.assertEqual(result, expected) - - def test_group_urls(self): - base_url = u'http://test.fr/data/' - head = self.htmlhead(base_url) - urls_spec = [(base_url + u'bob0.js', None), - (base_url + u'bob1.js', None), - (u'http://ext.com/bob2.js', None), - (u'http://ext.com/bob3.js', None), - (base_url + u'bob4.css', 'all'), - (base_url + u'bob5.css', 'all'), - (base_url + u'bob6.css', 'print'), - (base_url + u'bob7.css', 'print'), - (base_url + u'bob8.css', ('all', u'[if IE 8]')), - (base_url + u'bob9.css', ('print', u'[if IE 8]')) - ] - result = head.group_urls(urls_spec) - expected = [(base_url + u'??bob0.js,bob1.js', None), - (u'http://ext.com/bob2.js', None), - (u'http://ext.com/bob3.js', None), - (base_url + u'??bob4.css,bob5.css', 'all'), - (base_url + u'??bob6.css,bob7.css', 'print'), - (base_url + u'bob8.css', ('all', u'[if IE 8]')), - (base_url + u'bob9.css', ('print', u'[if IE 8]')) - ] - self.assertEqual(list(result), expected) - - def test_getvalue_with_concat(self): - self.config.global_set_option('concat-resources', True) - base_url = u'http://test.fr/data/' - head = self.htmlhead(base_url) - head.add_js(base_url + u'bob0.js') - head.add_js(base_url + u'bob1.js') - head.add_js(u'http://ext.com/bob2.js') - head.add_js(u'http://ext.com/bob3.js') - head.add_css(base_url + u'bob4.css') - head.add_css(base_url + u'bob5.css') - head.add_css(base_url + u'bob6.css', 'print') - head.add_css(base_url + u'bob7.css', 'print') - head.add_ie_css(base_url + u'bob8.css') - head.add_ie_css(base_url + u'bob9.css', 'print', u'[if lt IE 7]') - result = head.getvalue() - expected = u""" - - - - - - - -""" - self.assertEqual(result, expected) - - def test_getvalue_without_concat(self): - self.config.global_set_option('concat-resources', False) - try: - base_url = u'http://test.fr/data/' - head = self.htmlhead(base_url) - head.add_js(base_url + u'bob0.js') - head.add_js(base_url + u'bob1.js') - head.add_js(u'http://ext.com/bob2.js') - head.add_js(u'http://ext.com/bob3.js') - head.add_css(base_url + u'bob4.css') - head.add_css(base_url + u'bob5.css') - head.add_css(base_url + u'bob6.css', 'print') - head.add_css(base_url + u'bob7.css', 'print') - head.add_ie_css(base_url + u'bob8.css') - head.add_ie_css(base_url + u'bob9.css', 'print', u'[if lt IE 7]') - result = head.getvalue() - expected = u""" - - - - - - - - - - -""" - self.assertEqual(result, expected) - finally: - self.config.global_set_option('concat-resources', True) - - -def UnauthorizedTC(TestCase): - - def _test(self, func): - self.assertEqual(func(Unauthorized()), - 'You are not allowed to perform this operation') - self.assertEqual(func(Unauthorized('a')), - 'a') - self.assertEqual(func(Unauthorized('a', 'b')), - 'You are not allowed to perform a operation on b') - self.assertEqual(func(Unauthorized('a', 'b', 'c')), - 'a b c') - - def test_str(self): - self._test(str) - - - -def load_tests(loader, tests, ignore): - import cubicweb.utils - tests.addTests(doctest.DocTestSuite(cubicweb.utils)) - return tests - - -if __name__ == '__main__': - import unittest - unittest.main()