[tests/ldap] use the new connection api
authorAurelien Campeas <aurelien.campeas@logilab.fr>
Fri, 30 May 2014 17:24:44 +0200
changeset 9793 52647b05bda8
parent 9792 1349398d744e
child 9794 61da050d11e4
[tests/ldap] use the new connection api
server/test/unittest_ldapsource.py
--- a/server/test/unittest_ldapsource.py	Fri Jun 13 12:56:45 2014 +0200
+++ b/server/test/unittest_ldapsource.py	Fri May 30 17:24:44 2014 +0200
@@ -122,32 +122,29 @@
             pass
 
     @classmethod
-    def pre_setup_database(cls, session, config):
-        session.create_entity('CWSource', name=u'ldap', type=u'ldapfeed', parser=u'ldapfeed',
-                              url=URL, config=CONFIG_LDAPFEED)
+    def pre_setup_database(cls, cnx, config):
+        cnx.create_entity('CWSource', name=u'ldap', type=u'ldapfeed', parser=u'ldapfeed',
+                          url=URL, config=CONFIG_LDAPFEED)
 
-        session.commit()
-        return cls._pull(session)
+        cnx.commit()
+        return cls.pull(cnx)
 
     @classmethod
-    def _pull(cls, session):
-        with session.repo.internal_session() as isession:
-            lfsource = isession.repo.sources_by_uri['ldap']
-            stats = lfsource.pull_data(isession, force=True, raise_on_error=True)
-            isession.commit()
-            return stats
-
-    def pull(self):
-        return self._pull(self.session)
+    def pull(self, cnx):
+        lfsource = cnx.repo.sources_by_uri['ldap']
+        stats = lfsource.pull_data(cnx, force=True, raise_on_error=True)
+        cnx.commit()
+        return stats
 
     def setup_database(self):
-        with self.session.repo.internal_session(safe=True) as session:
-            session.execute('DELETE Any E WHERE E cw_source S, S name "ldap"')
-            session.execute('SET S config %(conf)s, S url %(url)s '
-                            'WHERE S is CWSource, S name "ldap"',
-                            {"conf": CONFIG_LDAPFEED, 'url': URL} )
-            session.commit()
-        self.pull()
+        with self.admin_access.repo_cnx() as cnx:
+            cnx.execute('DELETE Any E WHERE E cw_source S, S name "ldap"')
+            cnx.execute('SET S config %(conf)s, S url %(url)s '
+                        'WHERE S is CWSource, S name "ldap"',
+                        {"conf": CONFIG_LDAPFEED, 'url': URL} )
+            cnx.commit()
+        with self.repo.internal_cnx() as cnx:
+            self.pull(cnx)
 
     def add_ldap_entry(self, dn, mods):
         """
@@ -200,16 +197,16 @@
     """
 
     def test_wrong_group(self):
-        with self.session.repo.internal_session(safe=True) as session:
-            source = self.session.execute('CWSource S WHERE S type="ldapfeed"').get_entity(0,0)
+        with self.admin_access.repo_cnx() as cnx:
+            source = cnx.execute('CWSource S WHERE S type="ldapfeed"').get_entity(0,0)
             config = source.repo_source.check_config(source)
             # inject a bogus group here, along with at least a valid one
             config['user-default-group'] = ('thisgroupdoesnotexists','users')
             source.repo_source.update_config(source, config)
-            session.commit(free_cnxset=False)
+            cnx.commit()
             # here we emitted an error log entry
-            stats = source.repo_source.pull_data(session, force=True, raise_on_error=True)
-            session.commit()
+            stats = source.repo_source.pull_data(cnx, force=True, raise_on_error=True)
+            cnx.commit()
 
 
 
@@ -224,119 +221,129 @@
 
     def test_authenticate(self):
         source = self.repo.sources_by_uri['ldap']
-        self.session.set_cnxset()
-        # ensure we won't be logged against
-        self.assertRaises(AuthenticationError,
-                          source.authenticate, self.session, 'toto', 'toto')
-        self.assertTrue(source.authenticate(self.session, 'syt', 'syt'))
+        with self.admin_access.repo_cnx() as cnx:
+            # ensure we won't be logged against
+            self.assertRaises(AuthenticationError,
+                              source.authenticate, cnx, 'toto', 'toto')
+            self.assertTrue(source.authenticate(cnx, 'syt', 'syt'))
         self.assertTrue(self.repo.connect('syt', password='syt'))
 
     def test_base(self):
-        # check a known one
-        rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
-        e = rset.get_entity(0, 0)
-        self.assertEqual(e.login, 'syt')
-        e.complete()
-        self.assertMetadata(e)
-        self.assertEqual(e.firstname, None)
-        self.assertEqual(e.surname, None)
-        self.assertIn('users', set(g.name for g in e.in_group))
-        self.assertEqual(e.owned_by[0].login, 'syt')
-        self.assertEqual(e.created_by, ())
-        addresses = [pe.address for pe in e.use_email]
-        addresses.sort()
-        self.assertEqual(['sylvain.thenault@logilab.fr', 'syt@logilab.fr'],
-                         addresses)
-        self.assertIn(e.primary_email[0].address, ['sylvain.thenault@logilab.fr',
-                                                   'syt@logilab.fr'])
-        # email content should be indexed on the user
-        rset = self.sexecute('CWUser X WHERE X has_text "thenault"')
-        self.assertEqual(rset.rows, [[e.eid]])
+        with self.admin_access.repo_cnx() as cnx:
+            # check a known one
+            rset = cnx.execute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
+            e = rset.get_entity(0, 0)
+            self.assertEqual(e.login, 'syt')
+            e.complete()
+            self.assertMetadata(e)
+            self.assertEqual(e.firstname, None)
+            self.assertEqual(e.surname, None)
+            self.assertIn('users', set(g.name for g in e.in_group))
+            self.assertEqual(e.owned_by[0].login, 'syt')
+            self.assertEqual(e.created_by, ())
+            addresses = [pe.address for pe in e.use_email]
+            addresses.sort()
+            self.assertEqual(['sylvain.thenault@logilab.fr', 'syt@logilab.fr'],
+                             addresses)
+            self.assertIn(e.primary_email[0].address, ['sylvain.thenault@logilab.fr',
+                                                       'syt@logilab.fr'])
+            # email content should be indexed on the user
+            rset = cnx.execute('CWUser X WHERE X has_text "thenault"')
+            self.assertEqual(rset.rows, [[e.eid]])
 
     def test_copy_to_system_source(self):
         "make sure we can 'convert' an LDAP user into a system one"
-        source = self.repo.sources_by_uri['ldap']
-        eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0]
-        self.sexecute('SET X cw_source S WHERE X eid %(x)s, S name "system"', {'x': eid})
-        self.commit()
-        source.reset_caches()
-        rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
-        self.assertEqual(len(rset), 1)
-        e = rset.get_entity(0, 0)
-        self.assertEqual(e.eid, eid)
-        self.assertEqual(e.cw_metainformation(), {'source': {'type': u'native',
-                                                             'uri': u'system',
-                                                             'use-cwuri-as-url': False},
-                                                  'type': 'CWUser',
-                                                  'extid': None})
-        self.assertEqual(e.cw_source[0].name, 'system')
-        self.assertTrue(e.creation_date)
-        self.assertTrue(e.modification_date)
-        source.pull_data(self.session)
-        rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
-        self.assertEqual(len(rset), 1)
-        self.assertTrue(self.repo.system_source.authenticate(
-                self.session, 'syt', password='syt'))
-        # make sure the pull from ldap have not "reverted" user as a ldap-feed user
-        self.assertEqual(e.cw_metainformation(), {'source': {'type': u'native',
-                                                             'uri': u'system',
-                                                             'use-cwuri-as-url': False},
-                                                  'type': 'CWUser',
-                                                  'extid': None})
-        # and that the password stored in the system source is not empty or so
-        user = self.execute('CWUser U WHERE U login "syt"').get_entity(0, 0)
-        user.cw_clear_all_caches()
-        pwd = self.session.system_sql("SELECT cw_upassword FROM cw_cwuser WHERE cw_login='syt';").fetchall()[0][0]
-        self.assertIsNotNone(pwd)
-        self.assertTrue(str(pwd))
+        with self.admin_access.repo_cnx() as cnx:
+            source = self.repo.sources_by_uri['ldap']
+            eid = cnx.execute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0]
+            cnx.execute('SET X cw_source S WHERE X eid %(x)s, S name "system"', {'x': eid})
+            cnx.commit()
+            source.reset_caches()
+            rset = cnx.execute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
+            self.assertEqual(len(rset), 1)
+            e = rset.get_entity(0, 0)
+            self.assertEqual(e.eid, eid)
+            self.assertEqual(e.cw_metainformation(), {'source': {'type': u'native',
+                                                                 'uri': u'system',
+                                                                 'use-cwuri-as-url': False},
+                                                      'type': 'CWUser',
+                                                      'extid': None})
+            self.assertEqual(e.cw_source[0].name, 'system')
+            self.assertTrue(e.creation_date)
+            self.assertTrue(e.modification_date)
+            source.pull_data(cnx)
+            rset = cnx.execute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
+            self.assertEqual(len(rset), 1)
+            self.assertTrue(self.repo.system_source.authenticate(cnx, 'syt', password='syt'))
+            # make sure the pull from ldap have not "reverted" user as a ldap-feed user
+            self.assertEqual(e.cw_metainformation(), {'source': {'type': u'native',
+                                                                 'uri': u'system',
+                                                                 'use-cwuri-as-url': False},
+                                                      'type': 'CWUser',
+                                                      'extid': None})
+            # and that the password stored in the system source is not empty or so
+            user = cnx.execute('CWUser U WHERE U login "syt"').get_entity(0, 0)
+            user.cw_clear_all_caches()
+            pwd = cnx.system_sql("SELECT cw_upassword FROM cw_cwuser WHERE cw_login='syt';").fetchall()[0][0]
+            self.assertIsNotNone(pwd)
+            self.assertTrue(str(pwd))
 
 
 
 class LDAPFeedUserDeletionTC(LDAPFeedTestBase):
     """
     A testcase for situations where users are deleted from or
-    unavailabe in the LDAP database.
+    unavailable in the LDAP database.
     """
+
     def test_a_filter_inactivate(self):
         """ filtered out people should be deactivated, unable to authenticate """
-        source = self.session.execute('CWSource S WHERE S type="ldapfeed"').get_entity(0,0)
-        config = source.repo_source.check_config(source)
-        # filter with adim's phone number
-        config['user-filter'] = u'(%s=%s)' % ('telephoneNumber', '109')
-        source.repo_source.update_config(source, config)
-        self.commit()
-        self.pull()
+        with self.admin_access.repo_cnx() as cnx:
+            source = cnx.execute('CWSource S WHERE S type="ldapfeed"').get_entity(0,0)
+            config = source.repo_source.check_config(source)
+            # filter with adim's phone number
+            config['user-filter'] = u'(%s=%s)' % ('telephoneNumber', '109')
+            source.repo_source.update_config(source, config)
+            cnx.commit()
+        with self.repo.internal_cnx() as cnx:
+            self.pull(cnx)
         self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='syt')
-        self.assertEqual(self.execute('Any N WHERE U login "syt", '
-                                      'U in_state S, S name N').rows[0][0],
-                         'deactivated')
-        self.assertEqual(self.execute('Any N WHERE U login "adim", '
-                                      'U in_state S, S name N').rows[0][0],
-                         'activated')
-        # unfilter, syt should be activated again
-        config['user-filter'] = u''
-        source.repo_source.update_config(source, config)
-        self.commit()
-        self.pull()
-        self.assertEqual(self.execute('Any N WHERE U login "syt", '
-                                      'U in_state S, S name N').rows[0][0],
-                         'activated')
-        self.assertEqual(self.execute('Any N WHERE U login "adim", '
-                                      'U in_state S, S name N').rows[0][0],
-                         'activated')
+        with self.admin_access.repo_cnx() as cnx:
+            self.assertEqual(cnx.execute('Any N WHERE U login "syt", '
+                                         'U in_state S, S name N').rows[0][0],
+                             'deactivated')
+            self.assertEqual(cnx.execute('Any N WHERE U login "adim", '
+                                         'U in_state S, S name N').rows[0][0],
+                             'activated')
+            # unfilter, syt should be activated again
+            config['user-filter'] = u''
+            source.repo_source.update_config(source, config)
+            cnx.commit()
+        with self.repo.internal_cnx() as cnx:
+            self.pull(cnx)
+        with self.admin_access.repo_cnx() as cnx:
+            self.assertEqual(cnx.execute('Any N WHERE U login "syt", '
+                                         'U in_state S, S name N').rows[0][0],
+                             'activated')
+            self.assertEqual(cnx.execute('Any N WHERE U login "adim", '
+                                         'U in_state S, S name N').rows[0][0],
+                             'activated')
 
     def test_delete(self):
         """ delete syt, pull, check deactivation, repull,
         read syt, pull, check activation
         """
         self.delete_ldap_entry('uid=syt,ou=People,dc=cubicweb,dc=test')
-        self.pull()
+        with self.repo.internal_cnx() as cnx:
+            self.pull(cnx)
         self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='syt')
-        self.assertEqual(self.execute('Any N WHERE U login "syt", '
-                                      'U in_state S, S name N').rows[0][0],
-                         'deactivated')
-        # check that it doesn't choke
-        self.pull()
+        with self.admin_access.repo_cnx() as cnx:
+            self.assertEqual(cnx.execute('Any N WHERE U login "syt", '
+                                         'U in_state S, S name N').rows[0][0],
+                             'deactivated')
+        with self.repo.internal_cnx() as cnx:
+            # check that it doesn't choke
+            self.pull(cnx)
         # reinsert syt
         self.add_ldap_entry('uid=syt,ou=People,dc=cubicweb,dc=test',
                             { 'objectClass': ['OpenLDAPperson','posixAccount','top','shadowAccount'],
@@ -353,76 +360,88 @@
                               'gecos': 'Sylvain Thenault',
                               'mail': ['sylvain.thenault@logilab.fr','syt@logilab.fr'],
                               'userPassword': 'syt',
-                             })
-        self.pull()
-        self.assertEqual(self.execute('Any N WHERE U login "syt", '
-                                      'U in_state S, S name N').rows[0][0],
-                         'activated')
+                          })
+        with self.repo.internal_cnx() as cnx:
+            self.pull(cnx)
+        with self.admin_access.repo_cnx() as cnx:
+            self.assertEqual(cnx.execute('Any N WHERE U login "syt", '
+                                         'U in_state S, S name N').rows[0][0],
+                             'activated')
 
     def test_reactivate_deleted(self):
         # test reactivating BY HAND the user isn't enough to
         # authenticate, as the native source refuse to authenticate
         # user from other sources
         self.delete_ldap_entry('uid=syt,ou=People,dc=cubicweb,dc=test')
-        self.pull()
-        # reactivate user (which source is still ldap-feed)
-        user = self.execute('CWUser U WHERE U login "syt"').get_entity(0, 0)
-        user.cw_adapt_to('IWorkflowable').fire_transition('activate')
-        self.commit()
-        with self.assertRaises(AuthenticationError):
-            self.repo.connect('syt', password='syt')
+        with self.repo.internal_cnx() as cnx:
+            self.pull(cnx)
+        with self.admin_access.repo_cnx() as cnx:
+            # reactivate user (which source is still ldap-feed)
+            user = cnx.execute('CWUser U WHERE U login "syt"').get_entity(0, 0)
+            user.cw_adapt_to('IWorkflowable').fire_transition('activate')
+            cnx.commit()
+            with self.assertRaises(AuthenticationError):
+                self.repo.connect('syt', password='syt')
 
-        # ok now let's try to make it a system user
-        self.sexecute('SET X cw_source S WHERE X eid %(x)s, S name "system"', {'x': user.eid})
-        self.commit()
+            # ok now let's try to make it a system user
+            cnx.execute('SET X cw_source S WHERE X eid %(x)s, S name "system"', {'x': user.eid})
+            cnx.commit()
         # and that we can now authenticate again
         self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='toto')
         self.assertTrue(self.repo.connect('syt', password='syt'))
 
+
 class LDAPFeedGroupTC(LDAPFeedTestBase):
     """
     A testcase for group support in ldapfeed.
     """
 
     def test_groups_exist(self):
-        rset = self.sexecute('CWGroup X WHERE X name "dir"')
-        self.assertEqual(len(rset), 1)
+        with self.admin_access.repo_cnx() as cnx:
+            rset = cnx.execute('CWGroup X WHERE X name "dir"')
+            self.assertEqual(len(rset), 1)
 
-        rset = self.sexecute('CWGroup X WHERE X cw_source S, S name "ldap"')
-        self.assertEqual(len(rset), 2)
+            rset = cnx.execute('CWGroup X WHERE X cw_source S, S name "ldap"')
+            self.assertEqual(len(rset), 2)
 
     def test_group_deleted(self):
-        rset = self.sexecute('CWGroup X WHERE X name "dir"')
-        self.assertEqual(len(rset), 1)
+        with self.admin_access.repo_cnx() as cnx:
+            rset = cnx.execute('CWGroup X WHERE X name "dir"')
+            self.assertEqual(len(rset), 1)
 
     def test_in_group(self):
-        rset = self.sexecute('CWGroup X WHERE X name %(name)s', {'name': 'dir'})
-        dirgroup = rset.get_entity(0, 0)
-        self.assertEqual(set(['syt', 'adim']),
-                         set([u.login for u in dirgroup.reverse_in_group]))
-        rset = self.sexecute('CWGroup X WHERE X name %(name)s', {'name': 'logilab'})
-        logilabgroup = rset.get_entity(0, 0)
-        self.assertEqual(set(['adim']),
-                         set([u.login for u in logilabgroup.reverse_in_group]))
+        with self.admin_access.repo_cnx() as cnx:
+            rset = cnx.execute('CWGroup X WHERE X name %(name)s', {'name': 'dir'})
+            dirgroup = rset.get_entity(0, 0)
+            self.assertEqual(set(['syt', 'adim']),
+                             set([u.login for u in dirgroup.reverse_in_group]))
+            rset = cnx.execute('CWGroup X WHERE X name %(name)s', {'name': 'logilab'})
+            logilabgroup = rset.get_entity(0, 0)
+            self.assertEqual(set(['adim']),
+                             set([u.login for u in logilabgroup.reverse_in_group]))
 
     def test_group_member_added(self):
-        self.pull()
-        rset = self.sexecute('Any L WHERE U in_group G, G name %(name)s, U login L',
-                             {'name': 'logilab'})
-        self.assertEqual(len(rset), 1)
-        self.assertEqual(rset[0][0], 'adim')
+        with self.repo.internal_cnx() as cnx:
+            self.pull(cnx)
+        with self.admin_access.repo_cnx() as cnx:
+            rset = cnx.execute('Any L WHERE U in_group G, G name %(name)s, U login L',
+                               {'name': 'logilab'})
+            self.assertEqual(len(rset), 1)
+            self.assertEqual(rset[0][0], 'adim')
 
         try:
             self.update_ldap_entry('cn=logilab,ou=Group,dc=cubicweb,dc=test',
-                                   {('add', 'memberUid'): ['syt']})
+                                       {('add', 'memberUid'): ['syt']})
             time.sleep(1.1) # timestamps precision is 1s
-            self.pull()
+            with self.repo.internal_cnx() as cnx:
+                self.pull(cnx)
 
-            rset = self.sexecute('Any L WHERE U in_group G, G name %(name)s, U login L',
-                                 {'name': 'logilab'})
-            self.assertEqual(len(rset), 2)
-            members = set([u[0] for u in rset])
-            self.assertEqual(set(['adim', 'syt']), members)
+            with self.admin_access.repo_cnx() as cnx:
+                rset = cnx.execute('Any L WHERE U in_group G, G name %(name)s, U login L',
+                                   {'name': 'logilab'})
+                self.assertEqual(len(rset), 2)
+                members = set([u[0] for u in rset])
+                self.assertEqual(set(['adim', 'syt']), members)
 
         finally:
             # back to normal ldap setup
@@ -430,21 +449,25 @@
             self.setUpClass()
 
     def test_group_member_deleted(self):
-        self.pull() # ensure we are sync'ed
-        rset = self.sexecute('Any L WHERE U in_group G, G name %(name)s, U login L',
-                             {'name': 'logilab'})
-        self.assertEqual(len(rset), 1)
-        self.assertEqual(rset[0][0], 'adim')
+        with self.repo.internal_cnx() as cnx:
+            self.pull(cnx) # ensure we are sync'ed
+        with self.admin_access.repo_cnx() as cnx:
+            rset = cnx.execute('Any L WHERE U in_group G, G name %(name)s, U login L',
+                               {'name': 'logilab'})
+            self.assertEqual(len(rset), 1)
+            self.assertEqual(rset[0][0], 'adim')
 
         try:
             self.update_ldap_entry('cn=logilab,ou=Group,dc=cubicweb,dc=test',
                                    {('delete', 'memberUid'): ['adim']})
             time.sleep(1.1) # timestamps precision is 1s
-            self.pull()
+            with self.repo.internal_cnx() as cnx:
+                self.pull(cnx)
 
-            rset = self.sexecute('Any L WHERE U in_group G, G name %(name)s, U login L',
-                                 {'name': 'logilab'})
-            self.assertEqual(len(rset), 0)
+            with self.admin_access.repo_cnx() as cnx:
+                rset = cnx.execute('Any L WHERE U in_group G, G name %(name)s, U login L',
+                                   {'name': 'logilab'})
+                self.assertEqual(len(rset), 0)
         finally:
             # back to normal ldap setup
             self.tearDownClass()