--- a/server/test/unittest_ldapuser.py Thu Feb 02 14:30:07 2012 +0100
+++ b/server/test/unittest_ldapuser.py Tue Jan 31 21:43:24 2012 +0100
@@ -1,4 +1,4 @@
-# copyright 2003-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+# copyright 2003-2012 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
@@ -25,6 +25,8 @@
from socket import socket, error as socketerror
from logilab.common.testlib import TestCase, unittest_main, mock_object, Tags
+
+from cubicweb import AuthenticationError
from cubicweb.devtools.testlib import CubicWebTC
from cubicweb.devtools.repotest import RQLGeneratorTC
from cubicweb.devtools.httptest import get_available_port
@@ -32,15 +34,14 @@
from cubicweb.server.sources.ldapuser import *
-CONFIG = u'''host=%s
-user-base-dn=ou=People,dc=cubicweb,dc=test
+CONFIG = u'''user-base-dn=ou=People,dc=cubicweb,dc=test
user-scope=ONELEVEL
user-classes=top,posixAccount
user-login-attr=uid
user-default-group=users
user-attrs-map=gecos:email,uid:login
'''
-
+URL = None
def setUpModule(*args):
create_slapd_configuration(LDAPUserSourceTC.config)
@@ -49,7 +50,7 @@
terminate_slapd()
def create_slapd_configuration(config):
- global slapd_process, CONFIG
+ global slapd_process, URL
basedir = join(config.apphome, "ldapdb")
slapdconf = join(config.apphome, "slapd.conf")
confin = file(join(config.apphome, "slapd.conf.in")).read()
@@ -78,7 +79,7 @@
else:
raise EnvironmentError('Cannot start slapd with cmdline="%s" (from directory "%s")' %
(" ".join(cmdline), os.getcwd()))
- CONFIG = CONFIG % host
+ URL = u'ldap://%s' % host
def terminate_slapd():
global slapd_process
@@ -93,23 +94,92 @@
print "DONE"
del slapd_process
-class LDAPUserSourceTC(CubicWebTC):
+
+
+
+class LDAPFeedSourceTC(CubicWebTC):
+ test_db_id = 'ldap-feed'
+
+ @classmethod
+ def pre_setup_database(cls, session, config):
+ session.create_entity('CWSource', name=u'ldapuser', type=u'ldapfeed', parser=u'ldapfeed',
+ url=URL, config=CONFIG)
+ session.commit()
+ isession = session.repo.internal_session()
+ lfsource = isession.repo.sources_by_uri['ldapuser']
+ stats = lfsource.pull_data(isession, force=True, raise_on_error=True)
+
+ def setUp(self):
+ super(LDAPFeedSourceTC, self).setUp()
+ # ldap source url in the database may use a different port as the one
+ # just attributed
+ lfsource = self.repo.sources_by_uri['ldapuser']
+ lfsource.urls = [URL]
+
+ def assertMetadata(self, entity):
+ self.assertTrue(entity.creation_date)
+ self.assertTrue(entity.modification_date)
+
+ def test_authenticate(self):
+ source = self.repo.sources_by_uri['ldapuser']
+ self.session.set_cnxset()
+ self.assertRaises(AuthenticationError,
+ source.authenticate, self.session, 'toto', 'toto')
+
+ def test_base(self):
+ # check a known one
+ rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
+ e = rset.get_entity(0, 0)
+ self.assertEqual(e.login, 'syt')
+ e.complete()
+ self.assertMetadata(e)
+ self.assertEqual(e.firstname, None)
+ self.assertEqual(e.surname, None)
+ self.assertEqual(e.in_group[0].name, 'users')
+ self.assertEqual(e.owned_by[0].login, 'syt')
+ self.assertEqual(e.created_by, ())
+ self.assertEqual(e.primary_email[0].address, 'Sylvain Thenault')
+ # email content should be indexed on the user
+ rset = self.sexecute('CWUser X WHERE X has_text "thenault"')
+ self.assertEqual(rset.rows, [[e.eid]])
+
+ def test_copy_to_system_source(self):
+ source = self.repo.sources_by_uri['ldapuser']
+ eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0]
+ self.sexecute('SET X cw_source S WHERE X eid %(x)s, S name "system"', {'x': eid})
+ self.commit()
+ source.reset_caches()
+ rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
+ self.assertEqual(len(rset), 1)
+ e = rset.get_entity(0, 0)
+ self.assertEqual(e.eid, eid)
+ self.assertEqual(e.cw_metainformation(), {'source': {'type': u'native', 'uri': u'system', 'use-cwuri-as-url': False},
+ 'type': 'CWUser',
+ 'extid': None})
+ self.assertEqual(e.cw_source[0].name, 'system')
+ self.assertTrue(e.creation_date)
+ self.assertTrue(e.modification_date)
+ # XXX test some password has been set
+ source.pull_data(self.session)
+ rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
+ self.assertEqual(len(rset), 1)
+
+
+class LDAPUserSourceTC(LDAPFeedSourceTC):
test_db_id = 'ldap-user'
tags = CubicWebTC.tags | Tags(('ldap'))
@classmethod
def pre_setup_database(cls, session, config):
session.create_entity('CWSource', name=u'ldapuser', type=u'ldapuser',
- config=CONFIG)
+ url=URL, config=CONFIG)
session.commit()
# XXX keep it there
session.execute('CWUser U')
- def test_authenticate(self):
- source = self.repo.sources_by_uri['ldapuser']
- self.session.set_cnxset()
- self.assertRaises(AuthenticationError,
- source.authenticate, self.session, 'toto', 'toto')
+ def assertMetadata(self, entity):
+ self.assertEqual(entity.creation_date, None)
+ self.assertEqual(entity.modification_date, None)
def test_synchronize(self):
source = self.repo.sources_by_uri['ldapuser']
@@ -121,8 +191,7 @@
e = rset.get_entity(0, 0)
self.assertEqual(e.login, 'syt')
e.complete()
- self.assertEqual(e.creation_date, None)
- self.assertEqual(e.modification_date, None)
+ self.assertMetadata(e)
self.assertEqual(e.firstname, None)
self.assertEqual(e.surname, None)
self.assertEqual(e.in_group[0].name, 'users')
@@ -347,27 +416,6 @@
rset = cu.execute('Any F WHERE X has_text "iaminguestsgrouponly", X firstname F')
self.assertEqual(rset.rows, [[None]])
- def test_copy_to_system_source(self):
- source = self.repo.sources_by_uri['ldapuser']
- eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0]
- self.sexecute('SET X cw_source S WHERE X eid %(x)s, S name "system"', {'x': eid})
- self.commit()
- source.reset_caches()
- rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
- self.assertEqual(len(rset), 1)
- e = rset.get_entity(0, 0)
- self.assertEqual(e.eid, eid)
- self.assertEqual(e.cw_metainformation(), {'source': {'type': u'native', 'uri': u'system', 'use-cwuri-as-url': False},
- 'type': 'CWUser',
- 'extid': None})
- self.assertEqual(e.cw_source[0].name, 'system')
- self.assertTrue(e.creation_date)
- self.assertTrue(e.modification_date)
- # XXX test some password has been set
- source.synchronize()
- rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
- self.assertEqual(len(rset), 1)
-
def test_nonregr1(self):
self.sexecute('Any X,AA ORDERBY AA DESC WHERE E eid %(x)s, E owned_by X, '
'X modification_date AA',
@@ -403,7 +451,6 @@
'OR (EXISTS(U in_group H, ME in_group H, NOT H name "users")), U login UL, U is CWUser)',
{'x': self.session.user.eid})
-
class GlobTrFuncTC(TestCase):
def test_count(self):
@@ -438,6 +485,7 @@
res = trfunc.apply([[1, 2], [2, 4], [3, 6], [1, 5]])
self.assertEqual(res, [[1, 5], [2, 4], [3, 6]])
+
class RQL2LDAPFilterTC(RQLGeneratorTC):
tags = RQLGeneratorTC.tags | Tags(('ldap'))