diff -r 1349398d744e -r 52647b05bda8 server/test/unittest_ldapsource.py --- a/server/test/unittest_ldapsource.py Fri Jun 13 12:56:45 2014 +0200 +++ b/server/test/unittest_ldapsource.py Fri May 30 17:24:44 2014 +0200 @@ -122,32 +122,29 @@ pass @classmethod - def pre_setup_database(cls, session, config): - session.create_entity('CWSource', name=u'ldap', type=u'ldapfeed', parser=u'ldapfeed', - url=URL, config=CONFIG_LDAPFEED) + def pre_setup_database(cls, cnx, config): + cnx.create_entity('CWSource', name=u'ldap', type=u'ldapfeed', parser=u'ldapfeed', + url=URL, config=CONFIG_LDAPFEED) - session.commit() - return cls._pull(session) + cnx.commit() + return cls.pull(cnx) @classmethod - def _pull(cls, session): - with session.repo.internal_session() as isession: - lfsource = isession.repo.sources_by_uri['ldap'] - stats = lfsource.pull_data(isession, force=True, raise_on_error=True) - isession.commit() - return stats - - def pull(self): - return self._pull(self.session) + def pull(self, cnx): + lfsource = cnx.repo.sources_by_uri['ldap'] + stats = lfsource.pull_data(cnx, force=True, raise_on_error=True) + cnx.commit() + return stats def setup_database(self): - with self.session.repo.internal_session(safe=True) as session: - session.execute('DELETE Any E WHERE E cw_source S, S name "ldap"') - session.execute('SET S config %(conf)s, S url %(url)s ' - 'WHERE S is CWSource, S name "ldap"', - {"conf": CONFIG_LDAPFEED, 'url': URL} ) - session.commit() - self.pull() + with self.admin_access.repo_cnx() as cnx: + cnx.execute('DELETE Any E WHERE E cw_source S, S name "ldap"') + cnx.execute('SET S config %(conf)s, S url %(url)s ' + 'WHERE S is CWSource, S name "ldap"', + {"conf": CONFIG_LDAPFEED, 'url': URL} ) + cnx.commit() + with self.repo.internal_cnx() as cnx: + self.pull(cnx) def add_ldap_entry(self, dn, mods): """ @@ -200,16 +197,16 @@ """ def test_wrong_group(self): - with self.session.repo.internal_session(safe=True) as session: - source = self.session.execute('CWSource S WHERE S type="ldapfeed"').get_entity(0,0) + with self.admin_access.repo_cnx() as cnx: + source = cnx.execute('CWSource S WHERE S type="ldapfeed"').get_entity(0,0) config = source.repo_source.check_config(source) # inject a bogus group here, along with at least a valid one config['user-default-group'] = ('thisgroupdoesnotexists','users') source.repo_source.update_config(source, config) - session.commit(free_cnxset=False) + cnx.commit() # here we emitted an error log entry - stats = source.repo_source.pull_data(session, force=True, raise_on_error=True) - session.commit() + stats = source.repo_source.pull_data(cnx, force=True, raise_on_error=True) + cnx.commit() @@ -224,119 +221,129 @@ def test_authenticate(self): source = self.repo.sources_by_uri['ldap'] - self.session.set_cnxset() - # ensure we won't be logged against - self.assertRaises(AuthenticationError, - source.authenticate, self.session, 'toto', 'toto') - self.assertTrue(source.authenticate(self.session, 'syt', 'syt')) + with self.admin_access.repo_cnx() as cnx: + # ensure we won't be logged against + self.assertRaises(AuthenticationError, + source.authenticate, cnx, 'toto', 'toto') + self.assertTrue(source.authenticate(cnx, 'syt', 'syt')) self.assertTrue(self.repo.connect('syt', password='syt')) def test_base(self): - # check a known one - rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'}) - e = rset.get_entity(0, 0) - self.assertEqual(e.login, 'syt') - e.complete() - self.assertMetadata(e) - self.assertEqual(e.firstname, None) - self.assertEqual(e.surname, None) - self.assertIn('users', set(g.name for g in e.in_group)) - self.assertEqual(e.owned_by[0].login, 'syt') - self.assertEqual(e.created_by, ()) - addresses = [pe.address for pe in e.use_email] - addresses.sort() - self.assertEqual(['sylvain.thenault@logilab.fr', 'syt@logilab.fr'], - addresses) - self.assertIn(e.primary_email[0].address, ['sylvain.thenault@logilab.fr', - 'syt@logilab.fr']) - # email content should be indexed on the user - rset = self.sexecute('CWUser X WHERE X has_text "thenault"') - self.assertEqual(rset.rows, [[e.eid]]) + with self.admin_access.repo_cnx() as cnx: + # check a known one + rset = cnx.execute('CWUser X WHERE X login %(login)s', {'login': 'syt'}) + e = rset.get_entity(0, 0) + self.assertEqual(e.login, 'syt') + e.complete() + self.assertMetadata(e) + self.assertEqual(e.firstname, None) + self.assertEqual(e.surname, None) + self.assertIn('users', set(g.name for g in e.in_group)) + self.assertEqual(e.owned_by[0].login, 'syt') + self.assertEqual(e.created_by, ()) + addresses = [pe.address for pe in e.use_email] + addresses.sort() + self.assertEqual(['sylvain.thenault@logilab.fr', 'syt@logilab.fr'], + addresses) + self.assertIn(e.primary_email[0].address, ['sylvain.thenault@logilab.fr', + 'syt@logilab.fr']) + # email content should be indexed on the user + rset = cnx.execute('CWUser X WHERE X has_text "thenault"') + self.assertEqual(rset.rows, [[e.eid]]) def test_copy_to_system_source(self): "make sure we can 'convert' an LDAP user into a system one" - source = self.repo.sources_by_uri['ldap'] - eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0] - self.sexecute('SET X cw_source S WHERE X eid %(x)s, S name "system"', {'x': eid}) - self.commit() - source.reset_caches() - rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'}) - self.assertEqual(len(rset), 1) - e = rset.get_entity(0, 0) - self.assertEqual(e.eid, eid) - self.assertEqual(e.cw_metainformation(), {'source': {'type': u'native', - 'uri': u'system', - 'use-cwuri-as-url': False}, - 'type': 'CWUser', - 'extid': None}) - self.assertEqual(e.cw_source[0].name, 'system') - self.assertTrue(e.creation_date) - self.assertTrue(e.modification_date) - source.pull_data(self.session) - rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'}) - self.assertEqual(len(rset), 1) - self.assertTrue(self.repo.system_source.authenticate( - self.session, 'syt', password='syt')) - # make sure the pull from ldap have not "reverted" user as a ldap-feed user - self.assertEqual(e.cw_metainformation(), {'source': {'type': u'native', - 'uri': u'system', - 'use-cwuri-as-url': False}, - 'type': 'CWUser', - 'extid': None}) - # and that the password stored in the system source is not empty or so - user = self.execute('CWUser U WHERE U login "syt"').get_entity(0, 0) - user.cw_clear_all_caches() - pwd = self.session.system_sql("SELECT cw_upassword FROM cw_cwuser WHERE cw_login='syt';").fetchall()[0][0] - self.assertIsNotNone(pwd) - self.assertTrue(str(pwd)) + with self.admin_access.repo_cnx() as cnx: + source = self.repo.sources_by_uri['ldap'] + eid = cnx.execute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0] + cnx.execute('SET X cw_source S WHERE X eid %(x)s, S name "system"', {'x': eid}) + cnx.commit() + source.reset_caches() + rset = cnx.execute('CWUser X WHERE X login %(login)s', {'login': 'syt'}) + self.assertEqual(len(rset), 1) + e = rset.get_entity(0, 0) + self.assertEqual(e.eid, eid) + self.assertEqual(e.cw_metainformation(), {'source': {'type': u'native', + 'uri': u'system', + 'use-cwuri-as-url': False}, + 'type': 'CWUser', + 'extid': None}) + self.assertEqual(e.cw_source[0].name, 'system') + self.assertTrue(e.creation_date) + self.assertTrue(e.modification_date) + source.pull_data(cnx) + rset = cnx.execute('CWUser X WHERE X login %(login)s', {'login': 'syt'}) + self.assertEqual(len(rset), 1) + self.assertTrue(self.repo.system_source.authenticate(cnx, 'syt', password='syt')) + # make sure the pull from ldap have not "reverted" user as a ldap-feed user + self.assertEqual(e.cw_metainformation(), {'source': {'type': u'native', + 'uri': u'system', + 'use-cwuri-as-url': False}, + 'type': 'CWUser', + 'extid': None}) + # and that the password stored in the system source is not empty or so + user = cnx.execute('CWUser U WHERE U login "syt"').get_entity(0, 0) + user.cw_clear_all_caches() + pwd = cnx.system_sql("SELECT cw_upassword FROM cw_cwuser WHERE cw_login='syt';").fetchall()[0][0] + self.assertIsNotNone(pwd) + self.assertTrue(str(pwd)) class LDAPFeedUserDeletionTC(LDAPFeedTestBase): """ A testcase for situations where users are deleted from or - unavailabe in the LDAP database. + unavailable in the LDAP database. """ + def test_a_filter_inactivate(self): """ filtered out people should be deactivated, unable to authenticate """ - source = self.session.execute('CWSource S WHERE S type="ldapfeed"').get_entity(0,0) - config = source.repo_source.check_config(source) - # filter with adim's phone number - config['user-filter'] = u'(%s=%s)' % ('telephoneNumber', '109') - source.repo_source.update_config(source, config) - self.commit() - self.pull() + with self.admin_access.repo_cnx() as cnx: + source = cnx.execute('CWSource S WHERE S type="ldapfeed"').get_entity(0,0) + config = source.repo_source.check_config(source) + # filter with adim's phone number + config['user-filter'] = u'(%s=%s)' % ('telephoneNumber', '109') + source.repo_source.update_config(source, config) + cnx.commit() + with self.repo.internal_cnx() as cnx: + self.pull(cnx) self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='syt') - self.assertEqual(self.execute('Any N WHERE U login "syt", ' - 'U in_state S, S name N').rows[0][0], - 'deactivated') - self.assertEqual(self.execute('Any N WHERE U login "adim", ' - 'U in_state S, S name N').rows[0][0], - 'activated') - # unfilter, syt should be activated again - config['user-filter'] = u'' - source.repo_source.update_config(source, config) - self.commit() - self.pull() - self.assertEqual(self.execute('Any N WHERE U login "syt", ' - 'U in_state S, S name N').rows[0][0], - 'activated') - self.assertEqual(self.execute('Any N WHERE U login "adim", ' - 'U in_state S, S name N').rows[0][0], - 'activated') + with self.admin_access.repo_cnx() as cnx: + self.assertEqual(cnx.execute('Any N WHERE U login "syt", ' + 'U in_state S, S name N').rows[0][0], + 'deactivated') + self.assertEqual(cnx.execute('Any N WHERE U login "adim", ' + 'U in_state S, S name N').rows[0][0], + 'activated') + # unfilter, syt should be activated again + config['user-filter'] = u'' + source.repo_source.update_config(source, config) + cnx.commit() + with self.repo.internal_cnx() as cnx: + self.pull(cnx) + with self.admin_access.repo_cnx() as cnx: + self.assertEqual(cnx.execute('Any N WHERE U login "syt", ' + 'U in_state S, S name N').rows[0][0], + 'activated') + self.assertEqual(cnx.execute('Any N WHERE U login "adim", ' + 'U in_state S, S name N').rows[0][0], + 'activated') def test_delete(self): """ delete syt, pull, check deactivation, repull, read syt, pull, check activation """ self.delete_ldap_entry('uid=syt,ou=People,dc=cubicweb,dc=test') - self.pull() + with self.repo.internal_cnx() as cnx: + self.pull(cnx) self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='syt') - self.assertEqual(self.execute('Any N WHERE U login "syt", ' - 'U in_state S, S name N').rows[0][0], - 'deactivated') - # check that it doesn't choke - self.pull() + with self.admin_access.repo_cnx() as cnx: + self.assertEqual(cnx.execute('Any N WHERE U login "syt", ' + 'U in_state S, S name N').rows[0][0], + 'deactivated') + with self.repo.internal_cnx() as cnx: + # check that it doesn't choke + self.pull(cnx) # reinsert syt self.add_ldap_entry('uid=syt,ou=People,dc=cubicweb,dc=test', { 'objectClass': ['OpenLDAPperson','posixAccount','top','shadowAccount'], @@ -353,76 +360,88 @@ 'gecos': 'Sylvain Thenault', 'mail': ['sylvain.thenault@logilab.fr','syt@logilab.fr'], 'userPassword': 'syt', - }) - self.pull() - self.assertEqual(self.execute('Any N WHERE U login "syt", ' - 'U in_state S, S name N').rows[0][0], - 'activated') + }) + with self.repo.internal_cnx() as cnx: + self.pull(cnx) + with self.admin_access.repo_cnx() as cnx: + self.assertEqual(cnx.execute('Any N WHERE U login "syt", ' + 'U in_state S, S name N').rows[0][0], + 'activated') def test_reactivate_deleted(self): # test reactivating BY HAND the user isn't enough to # authenticate, as the native source refuse to authenticate # user from other sources self.delete_ldap_entry('uid=syt,ou=People,dc=cubicweb,dc=test') - self.pull() - # reactivate user (which source is still ldap-feed) - user = self.execute('CWUser U WHERE U login "syt"').get_entity(0, 0) - user.cw_adapt_to('IWorkflowable').fire_transition('activate') - self.commit() - with self.assertRaises(AuthenticationError): - self.repo.connect('syt', password='syt') + with self.repo.internal_cnx() as cnx: + self.pull(cnx) + with self.admin_access.repo_cnx() as cnx: + # reactivate user (which source is still ldap-feed) + user = cnx.execute('CWUser U WHERE U login "syt"').get_entity(0, 0) + user.cw_adapt_to('IWorkflowable').fire_transition('activate') + cnx.commit() + with self.assertRaises(AuthenticationError): + self.repo.connect('syt', password='syt') - # ok now let's try to make it a system user - self.sexecute('SET X cw_source S WHERE X eid %(x)s, S name "system"', {'x': user.eid}) - self.commit() + # ok now let's try to make it a system user + cnx.execute('SET X cw_source S WHERE X eid %(x)s, S name "system"', {'x': user.eid}) + cnx.commit() # and that we can now authenticate again self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='toto') self.assertTrue(self.repo.connect('syt', password='syt')) + class LDAPFeedGroupTC(LDAPFeedTestBase): """ A testcase for group support in ldapfeed. """ def test_groups_exist(self): - rset = self.sexecute('CWGroup X WHERE X name "dir"') - self.assertEqual(len(rset), 1) + with self.admin_access.repo_cnx() as cnx: + rset = cnx.execute('CWGroup X WHERE X name "dir"') + self.assertEqual(len(rset), 1) - rset = self.sexecute('CWGroup X WHERE X cw_source S, S name "ldap"') - self.assertEqual(len(rset), 2) + rset = cnx.execute('CWGroup X WHERE X cw_source S, S name "ldap"') + self.assertEqual(len(rset), 2) def test_group_deleted(self): - rset = self.sexecute('CWGroup X WHERE X name "dir"') - self.assertEqual(len(rset), 1) + with self.admin_access.repo_cnx() as cnx: + rset = cnx.execute('CWGroup X WHERE X name "dir"') + self.assertEqual(len(rset), 1) def test_in_group(self): - rset = self.sexecute('CWGroup X WHERE X name %(name)s', {'name': 'dir'}) - dirgroup = rset.get_entity(0, 0) - self.assertEqual(set(['syt', 'adim']), - set([u.login for u in dirgroup.reverse_in_group])) - rset = self.sexecute('CWGroup X WHERE X name %(name)s', {'name': 'logilab'}) - logilabgroup = rset.get_entity(0, 0) - self.assertEqual(set(['adim']), - set([u.login for u in logilabgroup.reverse_in_group])) + with self.admin_access.repo_cnx() as cnx: + rset = cnx.execute('CWGroup X WHERE X name %(name)s', {'name': 'dir'}) + dirgroup = rset.get_entity(0, 0) + self.assertEqual(set(['syt', 'adim']), + set([u.login for u in dirgroup.reverse_in_group])) + rset = cnx.execute('CWGroup X WHERE X name %(name)s', {'name': 'logilab'}) + logilabgroup = rset.get_entity(0, 0) + self.assertEqual(set(['adim']), + set([u.login for u in logilabgroup.reverse_in_group])) def test_group_member_added(self): - self.pull() - rset = self.sexecute('Any L WHERE U in_group G, G name %(name)s, U login L', - {'name': 'logilab'}) - self.assertEqual(len(rset), 1) - self.assertEqual(rset[0][0], 'adim') + with self.repo.internal_cnx() as cnx: + self.pull(cnx) + with self.admin_access.repo_cnx() as cnx: + rset = cnx.execute('Any L WHERE U in_group G, G name %(name)s, U login L', + {'name': 'logilab'}) + self.assertEqual(len(rset), 1) + self.assertEqual(rset[0][0], 'adim') try: self.update_ldap_entry('cn=logilab,ou=Group,dc=cubicweb,dc=test', - {('add', 'memberUid'): ['syt']}) + {('add', 'memberUid'): ['syt']}) time.sleep(1.1) # timestamps precision is 1s - self.pull() + with self.repo.internal_cnx() as cnx: + self.pull(cnx) - rset = self.sexecute('Any L WHERE U in_group G, G name %(name)s, U login L', - {'name': 'logilab'}) - self.assertEqual(len(rset), 2) - members = set([u[0] for u in rset]) - self.assertEqual(set(['adim', 'syt']), members) + with self.admin_access.repo_cnx() as cnx: + rset = cnx.execute('Any L WHERE U in_group G, G name %(name)s, U login L', + {'name': 'logilab'}) + self.assertEqual(len(rset), 2) + members = set([u[0] for u in rset]) + self.assertEqual(set(['adim', 'syt']), members) finally: # back to normal ldap setup @@ -430,21 +449,25 @@ self.setUpClass() def test_group_member_deleted(self): - self.pull() # ensure we are sync'ed - rset = self.sexecute('Any L WHERE U in_group G, G name %(name)s, U login L', - {'name': 'logilab'}) - self.assertEqual(len(rset), 1) - self.assertEqual(rset[0][0], 'adim') + with self.repo.internal_cnx() as cnx: + self.pull(cnx) # ensure we are sync'ed + with self.admin_access.repo_cnx() as cnx: + rset = cnx.execute('Any L WHERE U in_group G, G name %(name)s, U login L', + {'name': 'logilab'}) + self.assertEqual(len(rset), 1) + self.assertEqual(rset[0][0], 'adim') try: self.update_ldap_entry('cn=logilab,ou=Group,dc=cubicweb,dc=test', {('delete', 'memberUid'): ['adim']}) time.sleep(1.1) # timestamps precision is 1s - self.pull() + with self.repo.internal_cnx() as cnx: + self.pull(cnx) - rset = self.sexecute('Any L WHERE U in_group G, G name %(name)s, U login L', - {'name': 'logilab'}) - self.assertEqual(len(rset), 0) + with self.admin_access.repo_cnx() as cnx: + rset = cnx.execute('Any L WHERE U in_group G, G name %(name)s, U login L', + {'name': 'logilab'}) + self.assertEqual(len(rset), 0) finally: # back to normal ldap setup self.tearDownClass()