--- a/server/test/unittest_ldapsource.py Fri Jun 13 12:56:45 2014 +0200
+++ b/server/test/unittest_ldapsource.py Fri May 30 17:24:44 2014 +0200
@@ -122,32 +122,29 @@
pass
@classmethod
- def pre_setup_database(cls, session, config):
- session.create_entity('CWSource', name=u'ldap', type=u'ldapfeed', parser=u'ldapfeed',
- url=URL, config=CONFIG_LDAPFEED)
+ def pre_setup_database(cls, cnx, config):
+ cnx.create_entity('CWSource', name=u'ldap', type=u'ldapfeed', parser=u'ldapfeed',
+ url=URL, config=CONFIG_LDAPFEED)
- session.commit()
- return cls._pull(session)
+ cnx.commit()
+ return cls.pull(cnx)
@classmethod
- def _pull(cls, session):
- with session.repo.internal_session() as isession:
- lfsource = isession.repo.sources_by_uri['ldap']
- stats = lfsource.pull_data(isession, force=True, raise_on_error=True)
- isession.commit()
- return stats
-
- def pull(self):
- return self._pull(self.session)
+ def pull(self, cnx):
+ lfsource = cnx.repo.sources_by_uri['ldap']
+ stats = lfsource.pull_data(cnx, force=True, raise_on_error=True)
+ cnx.commit()
+ return stats
def setup_database(self):
- with self.session.repo.internal_session(safe=True) as session:
- session.execute('DELETE Any E WHERE E cw_source S, S name "ldap"')
- session.execute('SET S config %(conf)s, S url %(url)s '
- 'WHERE S is CWSource, S name "ldap"',
- {"conf": CONFIG_LDAPFEED, 'url': URL} )
- session.commit()
- self.pull()
+ with self.admin_access.repo_cnx() as cnx:
+ cnx.execute('DELETE Any E WHERE E cw_source S, S name "ldap"')
+ cnx.execute('SET S config %(conf)s, S url %(url)s '
+ 'WHERE S is CWSource, S name "ldap"',
+ {"conf": CONFIG_LDAPFEED, 'url': URL} )
+ cnx.commit()
+ with self.repo.internal_cnx() as cnx:
+ self.pull(cnx)
def add_ldap_entry(self, dn, mods):
"""
@@ -200,16 +197,16 @@
"""
def test_wrong_group(self):
- with self.session.repo.internal_session(safe=True) as session:
- source = self.session.execute('CWSource S WHERE S type="ldapfeed"').get_entity(0,0)
+ with self.admin_access.repo_cnx() as cnx:
+ source = cnx.execute('CWSource S WHERE S type="ldapfeed"').get_entity(0,0)
config = source.repo_source.check_config(source)
# inject a bogus group here, along with at least a valid one
config['user-default-group'] = ('thisgroupdoesnotexists','users')
source.repo_source.update_config(source, config)
- session.commit(free_cnxset=False)
+ cnx.commit()
# here we emitted an error log entry
- stats = source.repo_source.pull_data(session, force=True, raise_on_error=True)
- session.commit()
+ stats = source.repo_source.pull_data(cnx, force=True, raise_on_error=True)
+ cnx.commit()
@@ -224,119 +221,129 @@
def test_authenticate(self):
source = self.repo.sources_by_uri['ldap']
- self.session.set_cnxset()
- # ensure we won't be logged against
- self.assertRaises(AuthenticationError,
- source.authenticate, self.session, 'toto', 'toto')
- self.assertTrue(source.authenticate(self.session, 'syt', 'syt'))
+ with self.admin_access.repo_cnx() as cnx:
+ # ensure we won't be logged against
+ self.assertRaises(AuthenticationError,
+ source.authenticate, cnx, 'toto', 'toto')
+ self.assertTrue(source.authenticate(cnx, 'syt', 'syt'))
self.assertTrue(self.repo.connect('syt', password='syt'))
def test_base(self):
- # check a known one
- rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
- e = rset.get_entity(0, 0)
- self.assertEqual(e.login, 'syt')
- e.complete()
- self.assertMetadata(e)
- self.assertEqual(e.firstname, None)
- self.assertEqual(e.surname, None)
- self.assertIn('users', set(g.name for g in e.in_group))
- self.assertEqual(e.owned_by[0].login, 'syt')
- self.assertEqual(e.created_by, ())
- addresses = [pe.address for pe in e.use_email]
- addresses.sort()
- self.assertEqual(['sylvain.thenault@logilab.fr', 'syt@logilab.fr'],
- addresses)
- self.assertIn(e.primary_email[0].address, ['sylvain.thenault@logilab.fr',
- 'syt@logilab.fr'])
- # email content should be indexed on the user
- rset = self.sexecute('CWUser X WHERE X has_text "thenault"')
- self.assertEqual(rset.rows, [[e.eid]])
+ with self.admin_access.repo_cnx() as cnx:
+ # check a known one
+ rset = cnx.execute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
+ e = rset.get_entity(0, 0)
+ self.assertEqual(e.login, 'syt')
+ e.complete()
+ self.assertMetadata(e)
+ self.assertEqual(e.firstname, None)
+ self.assertEqual(e.surname, None)
+ self.assertIn('users', set(g.name for g in e.in_group))
+ self.assertEqual(e.owned_by[0].login, 'syt')
+ self.assertEqual(e.created_by, ())
+ addresses = [pe.address for pe in e.use_email]
+ addresses.sort()
+ self.assertEqual(['sylvain.thenault@logilab.fr', 'syt@logilab.fr'],
+ addresses)
+ self.assertIn(e.primary_email[0].address, ['sylvain.thenault@logilab.fr',
+ 'syt@logilab.fr'])
+ # email content should be indexed on the user
+ rset = cnx.execute('CWUser X WHERE X has_text "thenault"')
+ self.assertEqual(rset.rows, [[e.eid]])
def test_copy_to_system_source(self):
"make sure we can 'convert' an LDAP user into a system one"
- source = self.repo.sources_by_uri['ldap']
- eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0]
- self.sexecute('SET X cw_source S WHERE X eid %(x)s, S name "system"', {'x': eid})
- self.commit()
- source.reset_caches()
- rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
- self.assertEqual(len(rset), 1)
- e = rset.get_entity(0, 0)
- self.assertEqual(e.eid, eid)
- self.assertEqual(e.cw_metainformation(), {'source': {'type': u'native',
- 'uri': u'system',
- 'use-cwuri-as-url': False},
- 'type': 'CWUser',
- 'extid': None})
- self.assertEqual(e.cw_source[0].name, 'system')
- self.assertTrue(e.creation_date)
- self.assertTrue(e.modification_date)
- source.pull_data(self.session)
- rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
- self.assertEqual(len(rset), 1)
- self.assertTrue(self.repo.system_source.authenticate(
- self.session, 'syt', password='syt'))
- # make sure the pull from ldap have not "reverted" user as a ldap-feed user
- self.assertEqual(e.cw_metainformation(), {'source': {'type': u'native',
- 'uri': u'system',
- 'use-cwuri-as-url': False},
- 'type': 'CWUser',
- 'extid': None})
- # and that the password stored in the system source is not empty or so
- user = self.execute('CWUser U WHERE U login "syt"').get_entity(0, 0)
- user.cw_clear_all_caches()
- pwd = self.session.system_sql("SELECT cw_upassword FROM cw_cwuser WHERE cw_login='syt';").fetchall()[0][0]
- self.assertIsNotNone(pwd)
- self.assertTrue(str(pwd))
+ with self.admin_access.repo_cnx() as cnx:
+ source = self.repo.sources_by_uri['ldap']
+ eid = cnx.execute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0]
+ cnx.execute('SET X cw_source S WHERE X eid %(x)s, S name "system"', {'x': eid})
+ cnx.commit()
+ source.reset_caches()
+ rset = cnx.execute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
+ self.assertEqual(len(rset), 1)
+ e = rset.get_entity(0, 0)
+ self.assertEqual(e.eid, eid)
+ self.assertEqual(e.cw_metainformation(), {'source': {'type': u'native',
+ 'uri': u'system',
+ 'use-cwuri-as-url': False},
+ 'type': 'CWUser',
+ 'extid': None})
+ self.assertEqual(e.cw_source[0].name, 'system')
+ self.assertTrue(e.creation_date)
+ self.assertTrue(e.modification_date)
+ source.pull_data(cnx)
+ rset = cnx.execute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
+ self.assertEqual(len(rset), 1)
+ self.assertTrue(self.repo.system_source.authenticate(cnx, 'syt', password='syt'))
+ # make sure the pull from ldap have not "reverted" user as a ldap-feed user
+ self.assertEqual(e.cw_metainformation(), {'source': {'type': u'native',
+ 'uri': u'system',
+ 'use-cwuri-as-url': False},
+ 'type': 'CWUser',
+ 'extid': None})
+ # and that the password stored in the system source is not empty or so
+ user = cnx.execute('CWUser U WHERE U login "syt"').get_entity(0, 0)
+ user.cw_clear_all_caches()
+ pwd = cnx.system_sql("SELECT cw_upassword FROM cw_cwuser WHERE cw_login='syt';").fetchall()[0][0]
+ self.assertIsNotNone(pwd)
+ self.assertTrue(str(pwd))
class LDAPFeedUserDeletionTC(LDAPFeedTestBase):
"""
A testcase for situations where users are deleted from or
- unavailabe in the LDAP database.
+ unavailable in the LDAP database.
"""
+
def test_a_filter_inactivate(self):
""" filtered out people should be deactivated, unable to authenticate """
- source = self.session.execute('CWSource S WHERE S type="ldapfeed"').get_entity(0,0)
- config = source.repo_source.check_config(source)
- # filter with adim's phone number
- config['user-filter'] = u'(%s=%s)' % ('telephoneNumber', '109')
- source.repo_source.update_config(source, config)
- self.commit()
- self.pull()
+ with self.admin_access.repo_cnx() as cnx:
+ source = cnx.execute('CWSource S WHERE S type="ldapfeed"').get_entity(0,0)
+ config = source.repo_source.check_config(source)
+ # filter with adim's phone number
+ config['user-filter'] = u'(%s=%s)' % ('telephoneNumber', '109')
+ source.repo_source.update_config(source, config)
+ cnx.commit()
+ with self.repo.internal_cnx() as cnx:
+ self.pull(cnx)
self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='syt')
- self.assertEqual(self.execute('Any N WHERE U login "syt", '
- 'U in_state S, S name N').rows[0][0],
- 'deactivated')
- self.assertEqual(self.execute('Any N WHERE U login "adim", '
- 'U in_state S, S name N').rows[0][0],
- 'activated')
- # unfilter, syt should be activated again
- config['user-filter'] = u''
- source.repo_source.update_config(source, config)
- self.commit()
- self.pull()
- self.assertEqual(self.execute('Any N WHERE U login "syt", '
- 'U in_state S, S name N').rows[0][0],
- 'activated')
- self.assertEqual(self.execute('Any N WHERE U login "adim", '
- 'U in_state S, S name N').rows[0][0],
- 'activated')
+ with self.admin_access.repo_cnx() as cnx:
+ self.assertEqual(cnx.execute('Any N WHERE U login "syt", '
+ 'U in_state S, S name N').rows[0][0],
+ 'deactivated')
+ self.assertEqual(cnx.execute('Any N WHERE U login "adim", '
+ 'U in_state S, S name N').rows[0][0],
+ 'activated')
+ # unfilter, syt should be activated again
+ config['user-filter'] = u''
+ source.repo_source.update_config(source, config)
+ cnx.commit()
+ with self.repo.internal_cnx() as cnx:
+ self.pull(cnx)
+ with self.admin_access.repo_cnx() as cnx:
+ self.assertEqual(cnx.execute('Any N WHERE U login "syt", '
+ 'U in_state S, S name N').rows[0][0],
+ 'activated')
+ self.assertEqual(cnx.execute('Any N WHERE U login "adim", '
+ 'U in_state S, S name N').rows[0][0],
+ 'activated')
def test_delete(self):
""" delete syt, pull, check deactivation, repull,
read syt, pull, check activation
"""
self.delete_ldap_entry('uid=syt,ou=People,dc=cubicweb,dc=test')
- self.pull()
+ with self.repo.internal_cnx() as cnx:
+ self.pull(cnx)
self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='syt')
- self.assertEqual(self.execute('Any N WHERE U login "syt", '
- 'U in_state S, S name N').rows[0][0],
- 'deactivated')
- # check that it doesn't choke
- self.pull()
+ with self.admin_access.repo_cnx() as cnx:
+ self.assertEqual(cnx.execute('Any N WHERE U login "syt", '
+ 'U in_state S, S name N').rows[0][0],
+ 'deactivated')
+ with self.repo.internal_cnx() as cnx:
+ # check that it doesn't choke
+ self.pull(cnx)
# reinsert syt
self.add_ldap_entry('uid=syt,ou=People,dc=cubicweb,dc=test',
{ 'objectClass': ['OpenLDAPperson','posixAccount','top','shadowAccount'],
@@ -353,76 +360,88 @@
'gecos': 'Sylvain Thenault',
'mail': ['sylvain.thenault@logilab.fr','syt@logilab.fr'],
'userPassword': 'syt',
- })
- self.pull()
- self.assertEqual(self.execute('Any N WHERE U login "syt", '
- 'U in_state S, S name N').rows[0][0],
- 'activated')
+ })
+ with self.repo.internal_cnx() as cnx:
+ self.pull(cnx)
+ with self.admin_access.repo_cnx() as cnx:
+ self.assertEqual(cnx.execute('Any N WHERE U login "syt", '
+ 'U in_state S, S name N').rows[0][0],
+ 'activated')
def test_reactivate_deleted(self):
# test reactivating BY HAND the user isn't enough to
# authenticate, as the native source refuse to authenticate
# user from other sources
self.delete_ldap_entry('uid=syt,ou=People,dc=cubicweb,dc=test')
- self.pull()
- # reactivate user (which source is still ldap-feed)
- user = self.execute('CWUser U WHERE U login "syt"').get_entity(0, 0)
- user.cw_adapt_to('IWorkflowable').fire_transition('activate')
- self.commit()
- with self.assertRaises(AuthenticationError):
- self.repo.connect('syt', password='syt')
+ with self.repo.internal_cnx() as cnx:
+ self.pull(cnx)
+ with self.admin_access.repo_cnx() as cnx:
+ # reactivate user (which source is still ldap-feed)
+ user = cnx.execute('CWUser U WHERE U login "syt"').get_entity(0, 0)
+ user.cw_adapt_to('IWorkflowable').fire_transition('activate')
+ cnx.commit()
+ with self.assertRaises(AuthenticationError):
+ self.repo.connect('syt', password='syt')
- # ok now let's try to make it a system user
- self.sexecute('SET X cw_source S WHERE X eid %(x)s, S name "system"', {'x': user.eid})
- self.commit()
+ # ok now let's try to make it a system user
+ cnx.execute('SET X cw_source S WHERE X eid %(x)s, S name "system"', {'x': user.eid})
+ cnx.commit()
# and that we can now authenticate again
self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='toto')
self.assertTrue(self.repo.connect('syt', password='syt'))
+
class LDAPFeedGroupTC(LDAPFeedTestBase):
"""
A testcase for group support in ldapfeed.
"""
def test_groups_exist(self):
- rset = self.sexecute('CWGroup X WHERE X name "dir"')
- self.assertEqual(len(rset), 1)
+ with self.admin_access.repo_cnx() as cnx:
+ rset = cnx.execute('CWGroup X WHERE X name "dir"')
+ self.assertEqual(len(rset), 1)
- rset = self.sexecute('CWGroup X WHERE X cw_source S, S name "ldap"')
- self.assertEqual(len(rset), 2)
+ rset = cnx.execute('CWGroup X WHERE X cw_source S, S name "ldap"')
+ self.assertEqual(len(rset), 2)
def test_group_deleted(self):
- rset = self.sexecute('CWGroup X WHERE X name "dir"')
- self.assertEqual(len(rset), 1)
+ with self.admin_access.repo_cnx() as cnx:
+ rset = cnx.execute('CWGroup X WHERE X name "dir"')
+ self.assertEqual(len(rset), 1)
def test_in_group(self):
- rset = self.sexecute('CWGroup X WHERE X name %(name)s', {'name': 'dir'})
- dirgroup = rset.get_entity(0, 0)
- self.assertEqual(set(['syt', 'adim']),
- set([u.login for u in dirgroup.reverse_in_group]))
- rset = self.sexecute('CWGroup X WHERE X name %(name)s', {'name': 'logilab'})
- logilabgroup = rset.get_entity(0, 0)
- self.assertEqual(set(['adim']),
- set([u.login for u in logilabgroup.reverse_in_group]))
+ with self.admin_access.repo_cnx() as cnx:
+ rset = cnx.execute('CWGroup X WHERE X name %(name)s', {'name': 'dir'})
+ dirgroup = rset.get_entity(0, 0)
+ self.assertEqual(set(['syt', 'adim']),
+ set([u.login for u in dirgroup.reverse_in_group]))
+ rset = cnx.execute('CWGroup X WHERE X name %(name)s', {'name': 'logilab'})
+ logilabgroup = rset.get_entity(0, 0)
+ self.assertEqual(set(['adim']),
+ set([u.login for u in logilabgroup.reverse_in_group]))
def test_group_member_added(self):
- self.pull()
- rset = self.sexecute('Any L WHERE U in_group G, G name %(name)s, U login L',
- {'name': 'logilab'})
- self.assertEqual(len(rset), 1)
- self.assertEqual(rset[0][0], 'adim')
+ with self.repo.internal_cnx() as cnx:
+ self.pull(cnx)
+ with self.admin_access.repo_cnx() as cnx:
+ rset = cnx.execute('Any L WHERE U in_group G, G name %(name)s, U login L',
+ {'name': 'logilab'})
+ self.assertEqual(len(rset), 1)
+ self.assertEqual(rset[0][0], 'adim')
try:
self.update_ldap_entry('cn=logilab,ou=Group,dc=cubicweb,dc=test',
- {('add', 'memberUid'): ['syt']})
+ {('add', 'memberUid'): ['syt']})
time.sleep(1.1) # timestamps precision is 1s
- self.pull()
+ with self.repo.internal_cnx() as cnx:
+ self.pull(cnx)
- rset = self.sexecute('Any L WHERE U in_group G, G name %(name)s, U login L',
- {'name': 'logilab'})
- self.assertEqual(len(rset), 2)
- members = set([u[0] for u in rset])
- self.assertEqual(set(['adim', 'syt']), members)
+ with self.admin_access.repo_cnx() as cnx:
+ rset = cnx.execute('Any L WHERE U in_group G, G name %(name)s, U login L',
+ {'name': 'logilab'})
+ self.assertEqual(len(rset), 2)
+ members = set([u[0] for u in rset])
+ self.assertEqual(set(['adim', 'syt']), members)
finally:
# back to normal ldap setup
@@ -430,21 +449,25 @@
self.setUpClass()
def test_group_member_deleted(self):
- self.pull() # ensure we are sync'ed
- rset = self.sexecute('Any L WHERE U in_group G, G name %(name)s, U login L',
- {'name': 'logilab'})
- self.assertEqual(len(rset), 1)
- self.assertEqual(rset[0][0], 'adim')
+ with self.repo.internal_cnx() as cnx:
+ self.pull(cnx) # ensure we are sync'ed
+ with self.admin_access.repo_cnx() as cnx:
+ rset = cnx.execute('Any L WHERE U in_group G, G name %(name)s, U login L',
+ {'name': 'logilab'})
+ self.assertEqual(len(rset), 1)
+ self.assertEqual(rset[0][0], 'adim')
try:
self.update_ldap_entry('cn=logilab,ou=Group,dc=cubicweb,dc=test',
{('delete', 'memberUid'): ['adim']})
time.sleep(1.1) # timestamps precision is 1s
- self.pull()
+ with self.repo.internal_cnx() as cnx:
+ self.pull(cnx)
- rset = self.sexecute('Any L WHERE U in_group G, G name %(name)s, U login L',
- {'name': 'logilab'})
- self.assertEqual(len(rset), 0)
+ with self.admin_access.repo_cnx() as cnx:
+ rset = cnx.execute('Any L WHERE U in_group G, G name %(name)s, U login L',
+ {'name': 'logilab'})
+ self.assertEqual(len(rset), 0)
finally:
# back to normal ldap setup
self.tearDownClass()