server/test/unittest_ldapuser.py
changeset 8188 1867e252e487
parent 8146 67b9b273b70d
child 8229 b7bc631816f7
--- a/server/test/unittest_ldapuser.py	Thu Feb 02 14:30:07 2012 +0100
+++ b/server/test/unittest_ldapuser.py	Tue Jan 31 21:43:24 2012 +0100
@@ -1,4 +1,4 @@
-# copyright 2003-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+# copyright 2003-2012 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
 #
 # This file is part of CubicWeb.
@@ -25,6 +25,8 @@
 from socket import socket, error as socketerror
 
 from logilab.common.testlib import TestCase, unittest_main, mock_object, Tags
+
+from cubicweb import AuthenticationError
 from cubicweb.devtools.testlib import CubicWebTC
 from cubicweb.devtools.repotest import RQLGeneratorTC
 from cubicweb.devtools.httptest import get_available_port
@@ -32,15 +34,14 @@
 
 from cubicweb.server.sources.ldapuser import *
 
-CONFIG = u'''host=%s
-user-base-dn=ou=People,dc=cubicweb,dc=test
+CONFIG = u'''user-base-dn=ou=People,dc=cubicweb,dc=test
 user-scope=ONELEVEL
 user-classes=top,posixAccount
 user-login-attr=uid
 user-default-group=users
 user-attrs-map=gecos:email,uid:login
 '''
-
+URL = None
 
 def setUpModule(*args):
     create_slapd_configuration(LDAPUserSourceTC.config)
@@ -49,7 +50,7 @@
     terminate_slapd()
 
 def create_slapd_configuration(config):
-    global slapd_process, CONFIG
+    global slapd_process, URL
     basedir = join(config.apphome, "ldapdb")
     slapdconf = join(config.apphome, "slapd.conf")
     confin = file(join(config.apphome, "slapd.conf.in")).read()
@@ -78,7 +79,7 @@
     else:
         raise EnvironmentError('Cannot start slapd with cmdline="%s" (from directory "%s")' %
                                (" ".join(cmdline), os.getcwd()))
-    CONFIG = CONFIG % host
+    URL = u'ldap://%s' % host
 
 def terminate_slapd():
     global slapd_process
@@ -93,23 +94,92 @@
         print "DONE"
     del slapd_process
 
-class LDAPUserSourceTC(CubicWebTC):
+
+
+
+class LDAPFeedSourceTC(CubicWebTC):
+    test_db_id = 'ldap-feed'
+
+    @classmethod
+    def pre_setup_database(cls, session, config):
+        session.create_entity('CWSource', name=u'ldapuser', type=u'ldapfeed', parser=u'ldapfeed',
+                              url=URL, config=CONFIG)
+        session.commit()
+        isession = session.repo.internal_session()
+        lfsource = isession.repo.sources_by_uri['ldapuser']
+        stats = lfsource.pull_data(isession, force=True, raise_on_error=True)
+
+    def setUp(self):
+        super(LDAPFeedSourceTC, self).setUp()
+        # ldap source url in the database may use a different port as the one
+        # just attributed
+        lfsource = self.repo.sources_by_uri['ldapuser']
+        lfsource.urls = [URL]
+
+    def assertMetadata(self, entity):
+        self.assertTrue(entity.creation_date)
+        self.assertTrue(entity.modification_date)
+
+    def test_authenticate(self):
+        source = self.repo.sources_by_uri['ldapuser']
+        self.session.set_cnxset()
+        self.assertRaises(AuthenticationError,
+                          source.authenticate, self.session, 'toto', 'toto')
+
+    def test_base(self):
+        # check a known one
+        rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
+        e = rset.get_entity(0, 0)
+        self.assertEqual(e.login, 'syt')
+        e.complete()
+        self.assertMetadata(e)
+        self.assertEqual(e.firstname, None)
+        self.assertEqual(e.surname, None)
+        self.assertEqual(e.in_group[0].name, 'users')
+        self.assertEqual(e.owned_by[0].login, 'syt')
+        self.assertEqual(e.created_by, ())
+        self.assertEqual(e.primary_email[0].address, 'Sylvain Thenault')
+        # email content should be indexed on the user
+        rset = self.sexecute('CWUser X WHERE X has_text "thenault"')
+        self.assertEqual(rset.rows, [[e.eid]])
+
+    def test_copy_to_system_source(self):
+        source = self.repo.sources_by_uri['ldapuser']
+        eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0]
+        self.sexecute('SET X cw_source S WHERE X eid %(x)s, S name "system"', {'x': eid})
+        self.commit()
+        source.reset_caches()
+        rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
+        self.assertEqual(len(rset), 1)
+        e = rset.get_entity(0, 0)
+        self.assertEqual(e.eid, eid)
+        self.assertEqual(e.cw_metainformation(), {'source': {'type': u'native', 'uri': u'system', 'use-cwuri-as-url': False},
+                                                  'type': 'CWUser',
+                                                  'extid': None})
+        self.assertEqual(e.cw_source[0].name, 'system')
+        self.assertTrue(e.creation_date)
+        self.assertTrue(e.modification_date)
+        # XXX test some password has been set
+        source.pull_data(self.session)
+        rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
+        self.assertEqual(len(rset), 1)
+
+
+class LDAPUserSourceTC(LDAPFeedSourceTC):
     test_db_id = 'ldap-user'
     tags = CubicWebTC.tags | Tags(('ldap'))
 
     @classmethod
     def pre_setup_database(cls, session, config):
         session.create_entity('CWSource', name=u'ldapuser', type=u'ldapuser',
-                              config=CONFIG)
+                              url=URL, config=CONFIG)
         session.commit()
         # XXX keep it there
         session.execute('CWUser U')
 
-    def test_authenticate(self):
-        source = self.repo.sources_by_uri['ldapuser']
-        self.session.set_cnxset()
-        self.assertRaises(AuthenticationError,
-                          source.authenticate, self.session, 'toto', 'toto')
+    def assertMetadata(self, entity):
+        self.assertEqual(entity.creation_date, None)
+        self.assertEqual(entity.modification_date, None)
 
     def test_synchronize(self):
         source = self.repo.sources_by_uri['ldapuser']
@@ -121,8 +191,7 @@
         e = rset.get_entity(0, 0)
         self.assertEqual(e.login, 'syt')
         e.complete()
-        self.assertEqual(e.creation_date, None)
-        self.assertEqual(e.modification_date, None)
+        self.assertMetadata(e)
         self.assertEqual(e.firstname, None)
         self.assertEqual(e.surname, None)
         self.assertEqual(e.in_group[0].name, 'users')
@@ -347,27 +416,6 @@
         rset = cu.execute('Any F WHERE X has_text "iaminguestsgrouponly", X firstname F')
         self.assertEqual(rset.rows, [[None]])
 
-    def test_copy_to_system_source(self):
-        source = self.repo.sources_by_uri['ldapuser']
-        eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0]
-        self.sexecute('SET X cw_source S WHERE X eid %(x)s, S name "system"', {'x': eid})
-        self.commit()
-        source.reset_caches()
-        rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
-        self.assertEqual(len(rset), 1)
-        e = rset.get_entity(0, 0)
-        self.assertEqual(e.eid, eid)
-        self.assertEqual(e.cw_metainformation(), {'source': {'type': u'native', 'uri': u'system', 'use-cwuri-as-url': False},
-                                                  'type': 'CWUser',
-                                                  'extid': None})
-        self.assertEqual(e.cw_source[0].name, 'system')
-        self.assertTrue(e.creation_date)
-        self.assertTrue(e.modification_date)
-        # XXX test some password has been set
-        source.synchronize()
-        rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
-        self.assertEqual(len(rset), 1)
-
     def test_nonregr1(self):
         self.sexecute('Any X,AA ORDERBY AA DESC WHERE E eid %(x)s, E owned_by X, '
                      'X modification_date AA',
@@ -403,7 +451,6 @@
                      'OR (EXISTS(U in_group H, ME in_group H, NOT H name "users")), U login UL, U is CWUser)',
                      {'x': self.session.user.eid})
 
-
 class GlobTrFuncTC(TestCase):
 
     def test_count(self):
@@ -438,6 +485,7 @@
         res = trfunc.apply([[1, 2], [2, 4], [3, 6], [1, 5]])
         self.assertEqual(res, [[1, 5], [2, 4], [3, 6]])
 
+
 class RQL2LDAPFilterTC(RQLGeneratorTC):
 
     tags = RQLGeneratorTC.tags | Tags(('ldap'))