# HG changeset patch # User Sylvain Thénault # Date 1324404355 -3600 # Node ID 67b9b273b70da6b93855f1fbed030a7f63d5ff79 # Parent e88a24f881431ce8a1f61551ffd3f4c062ccee19 [ldap test] test actual ldap authentication instead of monkey-patching also drop usage of not-anymore-necessary constants. diff -r e88a24f88143 -r 67b9b273b70d server/test/data/ldap_test.ldif --- a/server/test/data/ldap_test.ldif Thu Dec 22 17:56:04 2011 +0100 +++ b/server/test/data/ldap_test.ldif Tue Dec 20 19:05:55 2011 +0100 @@ -31,6 +31,7 @@ gecos: Sylvain Thenault mail: sylvain.thenault@logilab.fr mail: syt@logilab.fr +userPassword: {SSHA}v/8xJQP3uoaTBZz1T7Y0B3qOxRN1cj7D dn: uid=adim,ou=People,dc=cubicweb,dc=test loginShell: /bin/bash @@ -52,4 +53,5 @@ gecos: Adrien Di Mascio mail: adim@logilab.fr mail: adrien.dimascio@logilab.fr +userPassword: {SSHA}cPQOWqkkLDlfWFwxcl1m8V2JdySQBHfS diff -r e88a24f88143 -r 67b9b273b70d server/test/unittest_ldapuser.py --- a/server/test/unittest_ldapuser.py Thu Dec 22 17:56:04 2011 +0100 +++ b/server/test/unittest_ldapuser.py Tue Dec 20 19:05:55 2011 +0100 @@ -32,9 +32,6 @@ from cubicweb.server.sources.ldapuser import * -SYT = 'syt' -SYT_EMAIL = 'Sylvain Thenault' -ADIM = 'adim' CONFIG = u'''host=%s user-base-dn=ou=People,dc=cubicweb,dc=test user-scope=ONELEVEL @@ -45,24 +42,6 @@ ''' -def nopwd_authenticate(self, session, login, password): - """used to monkey patch the source to get successful authentication without - upassword checking - """ - assert login, 'no login!' - searchfilter = [filter_format('(%s=%s)', (self.user_login_attr, login))] - searchfilter.extend(self.base_filters) - searchstr = '(&%s)' % ''.join(searchfilter) - # first search the user - try: - user = self._search(session, self.user_base_dn, self.user_base_scope, - searchstr)[0] - except IndexError: - # no such user - raise AuthenticationError() - # don't check upassword ! - return self.repo.extid2eid(self, user['dn'], 'CWUser', session) - def setUpModule(*args): create_slapd_configuration(LDAPUserSourceTC.config) @@ -91,7 +70,7 @@ host = 'localhost:%s' % port ldapuri = 'ldap://%s' % host cmdline = ["/usr/sbin/slapd", "-f", slapdconf, "-h", ldapuri, "-d", "0"] - print "Starting slapd on", ldapuri + print 'Starting slapd:', ' '.join(cmdline) slapd_process = subprocess.Popen(cmdline) time.sleep(0.2) if slapd_process.poll() is None: @@ -121,20 +100,11 @@ @classmethod def pre_setup_database(cls, session, config): session.create_entity('CWSource', name=u'ldapuser', type=u'ldapuser', - config=CONFIG) + config=CONFIG) session.commit() # XXX keep it there session.execute('CWUser U') - def patch_authenticate(self): - self._orig_authenticate = LDAPUserSource.authenticate - LDAPUserSource.authenticate = nopwd_authenticate - - def tearDown(self): - if hasattr(self, '_orig_authenticate'): - LDAPUserSource.authenticate = self._orig_authenticate - CubicWebTC.tearDown(self) - def test_authenticate(self): source = self.repo.sources_by_uri['ldapuser'] self.session.set_cnxset() @@ -147,50 +117,50 @@ def test_base(self): # check a known one - rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': SYT}) + rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'}) e = rset.get_entity(0, 0) - self.assertEqual(e.login, SYT) + self.assertEqual(e.login, 'syt') e.complete() self.assertEqual(e.creation_date, None) self.assertEqual(e.modification_date, None) self.assertEqual(e.firstname, None) self.assertEqual(e.surname, None) self.assertEqual(e.in_group[0].name, 'users') - self.assertEqual(e.owned_by[0].login, SYT) + self.assertEqual(e.owned_by[0].login, 'syt') self.assertEqual(e.created_by, ()) - self.assertEqual(e.primary_email[0].address, SYT_EMAIL) + self.assertEqual(e.primary_email[0].address, 'Sylvain Thenault') # email content should be indexed on the user rset = self.sexecute('CWUser X WHERE X has_text "thenault"') self.assertEqual(rset.rows, [[e.eid]]) def test_not(self): - eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': SYT})[0][0] + eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0] rset = self.sexecute('CWUser X WHERE NOT X eid %s' % eid) self.assert_(rset) self.assert_(not eid in (r[0] for r in rset)) def test_multiple(self): - seid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': SYT})[0][0] - aeid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': ADIM})[0][0] + seid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0] + aeid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'adim'})[0][0] rset = self.sexecute('CWUser X, Y WHERE X login %(syt)s, Y login %(adim)s', - {'syt': SYT, 'adim': ADIM}) + {'syt': 'syt', 'adim': 'adim'}) self.assertEqual(rset.rows, [[seid, aeid]]) rset = self.sexecute('Any X,Y,L WHERE X login L, X login %(syt)s, Y login %(adim)s', - {'syt': SYT, 'adim': ADIM}) - self.assertEqual(rset.rows, [[seid, aeid, SYT]]) + {'syt': 'syt', 'adim': 'adim'}) + self.assertEqual(rset.rows, [[seid, aeid, 'syt']]) def test_in(self): - seid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': SYT})[0][0] - aeid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': ADIM})[0][0] - rset = self.sexecute('Any X,L ORDERBY L WHERE X login IN("%s", "%s"), X login L' % (SYT, ADIM)) - self.assertEqual(rset.rows, [[aeid, ADIM], [seid, SYT]]) + seid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0] + aeid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'adim'})[0][0] + rset = self.sexecute('Any X,L ORDERBY L WHERE X login IN("%s", "%s"), X login L' % ('syt', 'adim')) + self.assertEqual(rset.rows, [[aeid, 'adim'], [seid, 'syt']]) def test_relations(self): - eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': SYT})[0][0] + eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0] rset = self.sexecute('Any X,E WHERE X is CWUser, X login L, X primary_email E') self.assert_(eid in (r[0] for r in rset)) rset = self.sexecute('Any X,L,E WHERE X is CWUser, X login L, X primary_email E') - self.assert_(SYT in (r[1] for r in rset)) + self.assert_('syt' in (r[1] for r in rset)) def test_count(self): nbusers = self.sexecute('Any COUNT(X) WHERE X is CWUser')[0][0] @@ -199,15 +169,15 @@ self.assert_(nbusers < 30, nbusers) def test_upper(self): - eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': SYT})[0][0] + eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0] rset = self.sexecute('Any UPPER(L) WHERE X eid %s, X login L' % eid) - self.assertEqual(rset[0][0], SYT.upper()) + self.assertEqual(rset[0][0], 'syt'.upper()) def test_unknown_attr(self): - eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': SYT})[0][0] + eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0] rset = self.sexecute('Any L,C,M WHERE X eid %s, X login L, ' 'X creation_date C, X modification_date M' % eid) - self.assertEqual(rset[0][0], SYT) + self.assertEqual(rset[0][0], 'syt') self.assertEqual(rset[0][1], None) self.assertEqual(rset[0][2], None) @@ -221,20 +191,19 @@ def test_or(self): rset = self.sexecute('DISTINCT Any X WHERE X login %(login)s OR (X in_group G, G name "managers")', - {'login': SYT}) + {'login': 'syt'}) self.assertEqual(len(rset), 2, rset.rows) # syt + admin def test_nonregr_set_owned_by(self): # test that when a user coming from ldap is triggering a transition # the related TrInfo has correct owner information - self.sexecute('SET X in_group G WHERE X login %(syt)s, G name "managers"', {'syt': SYT}) + self.sexecute('SET X in_group G WHERE X login %(syt)s, G name "managers"', {'syt': 'syt'}) self.commit() - syt = self.sexecute('CWUser X WHERE X login %(login)s', {'login': SYT}).get_entity(0, 0) + syt = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'}).get_entity(0, 0) self.assertEqual([g.name for g in syt.in_group], ['managers', 'users']) - self.patch_authenticate() - cnx = self.login(SYT, password='dummypassword') + cnx = self.login('syt', password='syt') cu = cnx.cursor() - adim = cu.execute('CWUser X WHERE X login %(login)s', {'login': ADIM}).get_entity(0, 0) + adim = cu.execute('CWUser X WHERE X login %(login)s', {'login': 'adim'}).get_entity(0, 0) iworkflowable = adim.cw_adapt_to('IWorkflowable') iworkflowable.fire_transition('deactivate') try: @@ -242,7 +211,7 @@ adim.cw_clear_all_caches() self.assertEqual(adim.in_state[0].name, 'deactivated') trinfo = iworkflowable.latest_trinfo() - self.assertEqual(trinfo.owned_by[0].login, SYT) + self.assertEqual(trinfo.owned_by[0].login, 'syt') # select from_state to skip the user's creation TrInfo rset = self.sexecute('Any U ORDERBY D DESC WHERE WF wf_info_for X,' 'WF creation_date D, WF from_state FS,' @@ -252,9 +221,9 @@ finally: # restore db state self.restore_connection() - adim = self.sexecute('CWUser X WHERE X login %(login)s', {'login': ADIM}).get_entity(0, 0) + adim = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'adim'}).get_entity(0, 0) adim.cw_adapt_to('IWorkflowable').fire_transition('activate') - self.sexecute('DELETE X in_group G WHERE X login %(syt)s, G name "managers"', {'syt': SYT}) + self.sexecute('DELETE X in_group G WHERE X login %(syt)s, G name "managers"', {'syt': 'syt'}) def test_same_column_names(self): self.sexecute('Any X, Y WHERE X copain Y, X login "comme", Y login "cochon"') @@ -262,17 +231,17 @@ def test_multiple_entities_from_different_sources(self): req = self.request() self.create_user(req, 'cochon') - self.assertTrue(self.sexecute('Any X,Y WHERE X login %(syt)s, Y login "cochon"', {'syt': SYT})) + self.assertTrue(self.sexecute('Any X,Y WHERE X login %(syt)s, Y login "cochon"', {'syt': 'syt'})) def test_exists1(self): self.session.set_cnxset() self.session.create_entity('CWGroup', name=u'bougloup1') self.session.create_entity('CWGroup', name=u'bougloup2') self.sexecute('SET U in_group G WHERE G name ~= "bougloup%", U login "admin"') - self.sexecute('SET U in_group G WHERE G name = "bougloup1", U login %(syt)s', {'syt': SYT}) + self.sexecute('SET U in_group G WHERE G name = "bougloup1", U login %(syt)s', {'syt': 'syt'}) rset = self.sexecute('Any L,SN ORDERBY L WHERE X in_state S, ' 'S name SN, X login L, EXISTS(X in_group G, G name ~= "bougloup%")') - self.assertEqual(rset.rows, [['admin', 'activated'], [SYT, 'activated']]) + self.assertEqual(rset.rows, [['admin', 'activated'], ['syt', 'activated']]) def test_exists2(self): req = self.request() @@ -289,11 +258,11 @@ self.create_user(req, 'cochon') self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"') self.assertTrue(self.sexecute('Any X, Y WHERE X copain Y, X login "comme", Y login "cochon"')) - self.sexecute('SET X copain Y WHERE X login %(syt)s, Y login "cochon"', {'syt': SYT}) - self.assertTrue(self.sexecute('Any X, Y WHERE X copain Y, X login %(syt)s, Y login "cochon"', {'syt': SYT})) + self.sexecute('SET X copain Y WHERE X login %(syt)s, Y login "cochon"', {'syt': 'syt'}) + self.assertTrue(self.sexecute('Any X, Y WHERE X copain Y, X login %(syt)s, Y login "cochon"', {'syt': 'syt'})) rset = self.sexecute('Any GN,L WHERE X in_group G, X login L, G name GN, G name "managers" ' 'OR EXISTS(X copain T, T login in ("comme", "cochon"))') - self.assertEqual(sorted(rset.rows), [['managers', 'admin'], ['users', 'comme'], ['users', SYT]]) + self.assertEqual(sorted(rset.rows), [['managers', 'admin'], ['users', 'comme'], ['users', 'syt']]) def test_exists4(self): req = self.request() @@ -303,7 +272,7 @@ self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"') self.sexecute('SET X copain Y WHERE X login "cochon", Y login "cochon"') self.sexecute('SET X copain Y WHERE X login "comme", Y login "billy"') - self.sexecute('SET X copain Y WHERE X login %(syt)s, Y login "billy"', {'syt': SYT}) + self.sexecute('SET X copain Y WHERE X login %(syt)s, Y login "billy"', {'syt': 'syt'}) # search for group name, login where # CWUser copain with "comme" or "cochon" AND same login as the copain # OR @@ -315,7 +284,7 @@ 'EXISTS(X in_state S, S name "activated", NOT X copain T2, T2 login "billy")') all = self.sexecute('Any GN, L WHERE X in_group G, X login L, G name GN') all.rows.remove(['users', 'comme']) - all.rows.remove(['users', SYT]) + all.rows.remove(['users', 'syt']) self.assertEqual(sorted(rset.rows), sorted(all.rows)) def test_exists5(self): @@ -326,17 +295,17 @@ self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"') self.sexecute('SET X copain Y WHERE X login "cochon", Y login "cochon"') self.sexecute('SET X copain Y WHERE X login "comme", Y login "billy"') - self.sexecute('SET X copain Y WHERE X login %(syt)s, Y login "cochon"', {'syt': SYT}) + self.sexecute('SET X copain Y WHERE X login %(syt)s, Y login "cochon"', {'syt': 'syt'}) rset= self.sexecute('Any L WHERE X login L, ' 'EXISTS(X copain T, T login in ("comme", "cochon")) AND ' 'NOT EXISTS(X copain T2, T2 login "billy")') - self.assertEqual(sorted(rset.rows), [['cochon'], [SYT]]) + self.assertEqual(sorted(rset.rows), [['cochon'], ['syt']]) rset= self.sexecute('Any GN,L WHERE X in_group G, X login L, G name GN, ' 'EXISTS(X copain T, T login in ("comme", "cochon")) AND ' 'NOT EXISTS(X copain T2, T2 login "billy")') self.assertEqual(sorted(rset.rows), [['guests', 'cochon'], ['users', 'cochon'], - ['users', SYT]]) + ['users', 'syt']]) def test_cd_restriction(self): rset = self.sexecute('CWUser X WHERE X creation_date > "2009-02-01"') @@ -359,32 +328,32 @@ def test_security1(self): cu = self._init_security_test() - rset = cu.execute('CWUser X WHERE X login %(login)s', {'login': SYT}) + rset = cu.execute('CWUser X WHERE X login %(login)s', {'login': 'syt'}) self.assertEqual(rset.rows, []) rset = cu.execute('Any X WHERE X login "iaminguestsgrouponly"') self.assertEqual(len(rset.rows), 1) def test_security2(self): cu = self._init_security_test() - rset = cu.execute('Any X WHERE X has_text %(syt)s', {'syt': SYT}) + rset = cu.execute('Any X WHERE X has_text %(syt)s', {'syt': 'syt'}) self.assertEqual(rset.rows, []) rset = cu.execute('Any X WHERE X has_text "iaminguestsgrouponly"') self.assertEqual(len(rset.rows), 1) def test_security3(self): cu = self._init_security_test() - rset = cu.execute('Any F WHERE X has_text %(syt)s, X firstname F', {'syt': SYT}) + rset = cu.execute('Any F WHERE X has_text %(syt)s, X firstname F', {'syt': 'syt'}) self.assertEqual(rset.rows, []) rset = cu.execute('Any F WHERE X has_text "iaminguestsgrouponly", X firstname F') self.assertEqual(rset.rows, [[None]]) def test_copy_to_system_source(self): source = self.repo.sources_by_uri['ldapuser'] - eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': SYT})[0][0] + eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0] self.sexecute('SET X cw_source S WHERE X eid %(x)s, S name "system"', {'x': eid}) self.commit() source.reset_caches() - rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': SYT}) + rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'}) self.assertEqual(len(rset), 1) e = rset.get_entity(0, 0) self.assertEqual(e.eid, eid) @@ -396,7 +365,7 @@ self.assertTrue(e.modification_date) # XXX test some password has been set source.synchronize() - rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': SYT}) + rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'}) self.assertEqual(len(rset), 1) def test_nonregr1(self):