# HG changeset patch # User Sylvain Thenault # Date 1231404927 -3600 # Node ID 600dd2fe8b407b03e6e8576f9482680dd0d65df8 # Parent 164307023401cdff78fe206f3724da2e0a73d910 backport unittest_ldapuser diff -r 164307023401 -r 600dd2fe8b40 server/test/data/schema/custom.py --- a/server/test/data/schema/custom.py Wed Jan 07 11:36:16 2009 -0800 +++ b/server/test/data/schema/custom.py Thu Jan 08 09:55:27 2009 +0100 @@ -25,3 +25,6 @@ class see_also(RelationDefinition): subject = ('Bookmark', 'Note') object = ('Bookmark', 'Note') + +_euser = import_schema('base').EUser +_euser.__relations__[0].fulltextindexed = True diff -r 164307023401 -r 600dd2fe8b40 server/test/unittest_ldapuser.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/server/test/unittest_ldapuser.py Thu Jan 08 09:55:27 2009 +0100 @@ -0,0 +1,372 @@ +"""cubicweb.server.sources.ldapusers unit and functional tests""" + +from logilab.common.testlib import TestCase, unittest_main, mock_object +from cubicweb.devtools import init_test_database, TestServerConfiguration +from cubicweb.devtools.apptest import RepositoryBasedTC +from cubicweb.devtools.repotest import RQLGeneratorTC + +from cubicweb.server.sources.ldapuser import * + +def nopwd_authenticate(self, session, login, upassword): + """used to monkey patch the source to get successful authentication without + upassword checking + """ + assert login, 'no login!' + searchfilter = [filter_format('(%s=%s)', (self.user_login_attr, login))] + searchfilter.extend([filter_format('(%s=%s)', ('objectClass', o)) + for o in self.user_classes]) + searchstr = '(&%s)' % ''.join(searchfilter) + # first search the user + try: + user = self._search(session, self.user_base_dn, self.user_base_scope, + searchstr)[0] + except IndexError: + # no such user + raise AuthenticationError() + # don't check upassword ! + return self.extid2eid(user['dn'], 'EUser', session) + + + +config = TestServerConfiguration('data') +config.sources_file = lambda : 'data/sourcesldap' +repo, cnx = init_test_database('sqlite', config=config) + +class LDAPUserSourceTC(RepositoryBasedTC): + repo = repo + + def patch_authenticate(self): + self._orig_authenticate = LDAPUserSource.authenticate + LDAPUserSource.authenticate = nopwd_authenticate + + def setUp(self): + self._prepare() + # XXX: need this first query else we get 'database is locked' from + # sqlite since it doesn't support multiple connections on the same + # database + # so doing, ldap inserted users don't get removed between each test + rset = self.execute('EUser X') + self.commit() + # check we get some users from ldap + self.assert_(len(rset) > 1) + self.maxeid = self.execute('Any MAX(X)')[0][0] + + def tearDown(self): + if hasattr(self, '_orig_authenticate'): + LDAPUserSource.authenticate = self._orig_authenticate + RepositoryBasedTC.tearDown(self) + + def test_authenticate(self): + source = self.repo.sources_by_uri['ldapuser'] + self.assertRaises(AuthenticationError, + source.authenticate, self.session, 'toto', 'toto') + + def test_synchronize(self): + source = self.repo.sources_by_uri['ldapuser'] + source.synchronize() + + def test_base(self): + # check a known one + e = self.execute('EUser X WHERE X login "syt"').get_entity(0, 0) + self.assertEquals(e.login, 'syt') + e.complete() + self.assertEquals(e.creation_date, None) + self.assertEquals(e.modification_date, None) + self.assertEquals(e.firstname, None) + self.assertEquals(e.surname, None) + self.assertEquals(e.in_group[0].name, 'users') + self.assertEquals(e.owned_by[0].login, 'syt') + self.assertEquals(e.created_by, []) + self.assertEquals(e.primary_email[0].address, 'Sylvain Thenault') + # email content should be indexed on the user + rset = self.execute('EUser X WHERE X has_text "thenault"') + self.assertEquals(rset.rows, [[e.eid]]) + + def test_not(self): + eid = self.execute('EUser X WHERE X login "syt"')[0][0] + rset = self.execute('EUser X WHERE NOT X eid %s' % eid) + self.assert_(rset) + self.assert_(not eid in (r[0] for r in rset)) + + def test_multiple(self): + seid = self.execute('EUser X WHERE X login "syt"')[0][0] + aeid = self.execute('EUser X WHERE X login "adim"')[0][0] + rset = self.execute('EUser X, Y WHERE X login "syt", Y login "adim"') + self.assertEquals(rset.rows, [[seid, aeid]]) + rset = self.execute('Any X,Y,L WHERE X login L, X login "syt", Y login "adim"') + self.assertEquals(rset.rows, [[seid, aeid, 'syt']]) + + def test_in(self): + seid = self.execute('EUser X WHERE X login "syt"')[0][0] + aeid = self.execute('EUser X WHERE X login "adim"')[0][0] + rset = self.execute('Any X,L ORDERBY L WHERE X login IN("syt", "adim"), X login L') + self.assertEquals(rset.rows, [[aeid, 'adim'], [seid, 'syt']]) + + def test_relations(self): + eid = self.execute('EUser X WHERE X login "syt"')[0][0] + rset = self.execute('Any X,E WHERE X is EUser, X login L, X primary_email E') + self.assert_(eid in (r[0] for r in rset)) + rset = self.execute('Any X,L,E WHERE X is EUser, X login L, X primary_email E') + self.assert_('syt' in (r[1] for r in rset)) + + def test_count(self): + nbusers = self.execute('Any COUNT(X) WHERE X is EUser')[0][0] + # just check this is a possible number + self.assert_(nbusers > 1, nbusers) + self.assert_(nbusers < 30, nbusers) + + def test_upper(self): + eid = self.execute('EUser X WHERE X login "syt"')[0][0] + rset = self.execute('Any UPPER(L) WHERE X eid %s, X login L' % eid) + self.assertEquals(rset[0][0], 'SYT') + + def test_unknown_attr(self): + eid = self.execute('EUser X WHERE X login "syt"')[0][0] + rset = self.execute('Any L,C,M WHERE X eid %s, X login L, ' + 'X creation_date C, X modification_date M' % eid) + self.assertEquals(rset[0][0], 'syt') + self.assertEquals(rset[0][1], None) + self.assertEquals(rset[0][2], None) + + def test_sort(self): + logins = [l for l, in self.execute('Any L ORDERBY L WHERE X login L')] + self.assertEquals(logins, sorted(logins)) + + def test_lower_sort(self): + logins = [l for l, in self.execute('Any L ORDERBY lower(L) WHERE X login L')] + self.assertEquals(logins, sorted(logins)) + + def test_or(self): + rset = self.execute('DISTINCT Any X WHERE X login "syt" OR (X in_group G, G name "managers")') + self.assertEquals(len(rset), 2, rset.rows) # syt + admin + + def test_nonregr_set_owned_by(self): + # test that when a user coming from ldap is triggering a transition + # the related TrInfo has correct owner information + self.execute('SET X in_group G WHERE X login "syt", G name "managers"') + self.commit() + syt = self.execute('EUser X WHERE X login "syt"').get_entity(0, 0) + self.assertEquals([g.name for g in syt.in_group], ['managers', 'users']) + self.patch_authenticate() + cnx = self.login('syt', 'dummypassword') + cu = cnx.cursor() + cu.execute('SET X in_state S WHERE X login "alf", S name "deactivated"') + try: + cnx.commit() + alf = self.execute('EUser X WHERE X login "alf"').get_entity(0, 0) + self.assertEquals(alf.in_state[0].name, 'deactivated') + trinfo = alf.latest_trinfo() + self.assertEquals(trinfo.owned_by[0].login, 'syt') + # select from_state to skip the user's creation TrInfo + rset = self.execute('Any U ORDERBY D DESC WHERE WF wf_info_for X,' + 'WF creation_date D, WF from_state FS,' + 'WF owned_by U?, X eid %(x)s', + {'x': alf.eid}, 'x') + self.assertEquals(rset.rows, [[syt.eid]]) + finally: + # restore db state + self.restore_connection() + self.execute('SET X in_state S WHERE X login "alf", S name "activated"') + self.execute('DELETE X in_group G WHERE X login "syt", G name "managers"') + + def test_same_column_names(self): + self.execute('Any X, Y WHERE X copain Y, X login "comme", Y login "cochon"') + + def test_multiple_entities_from_different_sources(self): + self.create_user('cochon') + self.failUnless(self.execute('Any X,Y WHERE X login "syt", Y login "cochon"')) + + def test_exists1(self): + self.add_entity('EGroup', name=u'bougloup1') + self.add_entity('EGroup', name=u'bougloup2') + self.execute('SET U in_group G WHERE G name ~= "bougloup%", U login "admin"') + self.execute('SET U in_group G WHERE G name = "bougloup1", U login "syt"') + rset = self.execute('Any L,SN ORDERBY L WHERE X in_state S, S name SN, X login L, EXISTS(X in_group G, G name ~= "bougloup%")') + self.assertEquals(rset.rows, [['admin', 'activated'], ['syt', 'activated']]) + + def test_exists2(self): + self.create_user('comme') + self.create_user('cochon') + self.execute('SET X copain Y WHERE X login "comme", Y login "cochon"') + rset = self.execute('Any GN ORDERBY GN WHERE X in_group G, G name GN, (G name "managers" OR EXISTS(X copain T, T login in ("comme", "cochon")))') + self.assertEquals(rset.rows, [['managers'], ['users']]) + + def test_exists3(self): + self.create_user('comme') + self.create_user('cochon') + self.execute('SET X copain Y WHERE X login "comme", Y login "cochon"') + self.failUnless(self.execute('Any X, Y WHERE X copain Y, X login "comme", Y login "cochon"')) + self.execute('SET X copain Y WHERE X login "syt", Y login "cochon"') + self.failUnless(self.execute('Any X, Y WHERE X copain Y, X login "syt", Y login "cochon"')) + rset = self.execute('Any GN,L WHERE X in_group G, X login L, G name GN, G name "managers" OR EXISTS(X copain T, T login in ("comme", "cochon"))') + self.assertEquals(sorted(rset.rows), [['managers', 'admin'], ['users', 'comme'], ['users', 'syt']]) + + def test_exists4(self): + self.create_user('comme') + self.create_user('cochon', groups=('users', 'guests')) + self.create_user('billy') + self.execute('SET X copain Y WHERE X login "comme", Y login "cochon"') + self.execute('SET X copain Y WHERE X login "cochon", Y login "cochon"') + self.execute('SET X copain Y WHERE X login "comme", Y login "billy"') + self.execute('SET X copain Y WHERE X login "syt", Y login "billy"') + # search for group name, login where + # EUser copain with "comme" or "cochon" AND same login as the copain + # OR + # EUser in_state activated AND not copain with billy + # + # SO we expect everybody but "comme" and "syt" + rset= self.execute('Any GN,L WHERE X in_group G, X login L, G name GN, ' + 'EXISTS(X copain T, T login L, T login in ("comme", "cochon")) OR ' + 'EXISTS(X in_state S, S name "activated", NOT X copain T2, T2 login "billy")') + all = self.execute('Any GN, L WHERE X in_group G, X login L, G name GN') + all.rows.remove(['users', 'comme']) + all.rows.remove(['users', 'syt']) + self.assertEquals(sorted(rset.rows), sorted(all.rows)) + + def test_exists5(self): + self.create_user('comme') + self.create_user('cochon', groups=('users', 'guests')) + self.create_user('billy') + self.execute('SET X copain Y WHERE X login "comme", Y login "cochon"') + self.execute('SET X copain Y WHERE X login "cochon", Y login "cochon"') + self.execute('SET X copain Y WHERE X login "comme", Y login "billy"') + self.execute('SET X copain Y WHERE X login "syt", Y login "cochon"') + rset= self.execute('Any L WHERE X login L, ' + 'EXISTS(X copain T, T login in ("comme", "cochon")) AND ' + 'NOT EXISTS(X copain T2, T2 login "billy")') + self.assertEquals(sorted(rset.rows), [['cochon'], ['syt']]) + rset= self.execute('Any GN,L WHERE X in_group G, X login L, G name GN, ' + 'EXISTS(X copain T, T login in ("comme", "cochon")) AND ' + 'NOT EXISTS(X copain T2, T2 login "billy")') + self.assertEquals(sorted(rset.rows), [['guests', 'cochon'], + ['users', 'cochon'], + ['users', 'syt']]) + + + def test_union(self): + afeids = self.execute('State X') + ueids = self.execute('EUser X') + rset = self.execute('(Any X WHERE X is State) UNION (Any X WHERE X is EUser)') + self.assertEquals(sorted(r[0] for r in rset.rows), + sorted(r[0] for r in afeids + ueids)) + + def _init_security_test(self): + self.create_user('iaminguestsgrouponly', groups=('guests',)) + cnx = self.login('iaminguestsgrouponly') + return cnx.cursor() + + def test_security1(self): + cu = self._init_security_test() + rset = cu.execute('Any X WHERE X login "syt"') + self.assertEquals(rset.rows, []) + rset = cu.execute('Any X WHERE X login "iaminguestsgrouponly"') + self.assertEquals(len(rset.rows), 1) + + def test_security2(self): + cu = self._init_security_test() + rset = cu.execute('Any X WHERE X has_text "syt"') + self.assertEquals(rset.rows, []) + rset = cu.execute('Any X WHERE X has_text "iaminguestsgrouponly"') + self.assertEquals(len(rset.rows), 1) + + def test_security3(self): + cu = self._init_security_test() + rset = cu.execute('Any F WHERE X has_text "syt", X firstname F') + self.assertEquals(rset.rows, []) + rset = cu.execute('Any F WHERE X has_text "iaminguestsgrouponly", X firstname F') + self.assertEquals(rset.rows, [[None]]) + + def test_nonregr_1(self): + self.execute('Any X,AA ORDERBY AA DESC WHERE E eid %(x)s, E owned_by X, ' + 'X modification_date AA', + {'x': cnx.user(self.session).eid}) + + def test_nonregr_2(self): + self.execute('Any X,L,AA WHERE E eid %(x)s, E owned_by X, ' + 'X login L, X modification_date AA', + {'x': cnx.user(self.session).eid}) + + def test_nonregr_3(self): + self.execute('Any X,AA ORDERBY AA DESC WHERE E eid %(x)s, ' + 'X modification_date AA', + {'x': cnx.user(self.session).eid}) + + def test_nonregr_4(self): + emaileid = self.execute('INSERT EmailAddress X: X address "toto@logilab.org"')[0][0] + self.execute('Any X,AA WHERE X use_email Y, Y eid %(x)s, X modification_date AA', + {'x': emaileid}) + + def test_nonregr_5(self): + # original jpl query: + # Any X, NOW - CD, P WHERE P is Project, U interested_in P, U is EUser, U login "sthenault", X concerns P, X creation_date CD ORDERBY CD DESC LIMIT 5 + rql = 'Any X, NOW - CD, P ORDERBY CD DESC LIMIT 5 WHERE P bookmarked_by U, U login "%s", P is X, X creation_date CD' % self.session.user.login + self.execute(rql, )#{'x': }) + + +class GlobTrFuncTC(TestCase): + + def test_count(self): + trfunc = GlobTrFunc('count', 0) + res = trfunc.apply([[1], [2], [3], [4]]) + self.assertEquals(res, [[4]]) + trfunc = GlobTrFunc('count', 1) + res = trfunc.apply([[1, 2], [2, 4], [3, 6], [1, 5]]) + self.assertEquals(res, [[1, 2], [2, 1], [3, 1]]) + + def test_sum(self): + trfunc = GlobTrFunc('sum', 0) + res = trfunc.apply([[1], [2], [3], [4]]) + self.assertEquals(res, [[10]]) + trfunc = GlobTrFunc('sum', 1) + res = trfunc.apply([[1, 2], [2, 4], [3, 6], [1, 5]]) + self.assertEquals(res, [[1, 7], [2, 4], [3, 6]]) + + def test_min(self): + trfunc = GlobTrFunc('min', 0) + res = trfunc.apply([[1], [2], [3], [4]]) + self.assertEquals(res, [[1]]) + trfunc = GlobTrFunc('min', 1) + res = trfunc.apply([[1, 2], [2, 4], [3, 6], [1, 5]]) + self.assertEquals(res, [[1, 2], [2, 4], [3, 6]]) + + def test_max(self): + trfunc = GlobTrFunc('max', 0) + res = trfunc.apply([[1], [2], [3], [4]]) + self.assertEquals(res, [[4]]) + trfunc = GlobTrFunc('max', 1) + res = trfunc.apply([[1, 2], [2, 4], [3, 6], [1, 5]]) + self.assertEquals(res, [[1, 5], [2, 4], [3, 6]]) + + +class RQL2LDAPFilterTC(RQLGeneratorTC): + schema = repo.schema + + def setUp(self): + RQLGeneratorTC.setUp(self) + ldapsource = repo.sources[-1] + self.pool = repo._get_pool() + session = mock_object(pool=self.pool) + self.o = RQL2LDAPFilter(ldapsource, session) + + def tearDown(self): + repo._free_pool(self.pool) + RQLGeneratorTC.tearDown(self) + + def test_base(self): + rqlst = self._prepare('EUser X WHERE X login "toto"').children[0] + self.assertEquals(self.o.generate(rqlst, 'X')[1], + '(&(objectClass=top)(objectClass=posixAccount)(uid=toto))') + + def test_kwargs(self): + rqlst = self._prepare('EUser X WHERE X login %(x)s').children[0] + self.o._args = {'x': "toto"} + self.assertEquals(self.o.generate(rqlst, 'X')[1], + '(&(objectClass=top)(objectClass=posixAccount)(uid=toto))') + + def test_get_attr(self): + rqlst = self._prepare('Any X WHERE E firstname X, E eid 12').children[0] + self.assertRaises(UnknownEid, self.o.generate, rqlst, 'E') + + +if __name__ == '__main__': + unittest_main()