server/test/unittest_ldapsource.py
changeset 11057 0b59724cb3f2
parent 11052 058bb3dc685f
child 11058 23eb30449fe5
equal deleted inserted replaced
11052:058bb3dc685f 11057:0b59724cb3f2
     1 # copyright 2003-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
       
     2 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
       
     3 #
       
     4 # This file is part of CubicWeb.
       
     5 #
       
     6 # CubicWeb is free software: you can redistribute it and/or modify it under the
       
     7 # terms of the GNU Lesser General Public License as published by the Free
       
     8 # Software Foundation, either version 2.1 of the License, or (at your option)
       
     9 # any later version.
       
    10 #
       
    11 # CubicWeb is distributed in the hope that it will be useful, but WITHOUT
       
    12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
       
    13 # FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
       
    14 # details.
       
    15 #
       
    16 # You should have received a copy of the GNU Lesser General Public License along
       
    17 # with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
       
    18 """cubicweb.server.sources.ldapfeed unit and functional tests
       
    19 
       
     20 These tests expect the slapd, python-ldap3 and ldapscripts packages to be installed.
       
    21 """
       
    22 from __future__ import print_function
       
    23 
       
    24 import os
       
    25 import sys
       
    26 import shutil
       
    27 import time
       
    28 import subprocess
       
    29 import tempfile
       
    30 import unittest
       
    31 from os.path import join
       
    32 
       
    33 from six import string_types
       
    34 from six.moves import range
       
    35 
       
    36 from cubicweb import AuthenticationError
       
    37 from cubicweb.devtools.testlib import CubicWebTC
       
    38 from cubicweb.devtools.httptest import get_available_port
       
    39 
       
    40 
       
# Configuration text stored on the 'ldap' CWSource used by the tests below:
# it maps both users (ou=People) and groups (ou=Group) of the test directory.
CONFIG_LDAPFEED = u'''
user-base-dn=ou=People,dc=cubicweb,dc=test
group-base-dn=ou=Group,dc=cubicweb,dc=test
user-attrs-map=uid=login,mail=email,userPassword=upassword
group-attrs-map=cn=name,memberUid=member
'''
# Users-only variant of the configuration above (no group mapping).
# NOTE(review): not referenced anywhere in the visible part of this module.
CONFIG_LDAPUSER = u'''
user-base-dn=ou=People,dc=cubicweb,dc=test
user-attrs-map=uid=login,mail=email,userPassword=upassword
'''

# URL of the test slapd server; assigned by create_slapd_configuration() once
# the server has been started.
URL = None
       
    53 
       
    54 
       
    55 def create_slapd_configuration(cls):
       
    56     global URL
       
    57     slapddir = tempfile.mkdtemp('cw-unittest-ldap')
       
    58     config = cls.config
       
    59     slapdconf = join(config.apphome, "slapd.conf")
       
    60     confin = open(join(config.apphome, "slapd.conf.in")).read()
       
    61     confstream = open(slapdconf, 'w')
       
    62     confstream.write(confin % {'apphome': config.apphome, 'testdir': slapddir})
       
    63     confstream.close()
       
    64     # fill ldap server with some data
       
    65     ldiffile = join(config.apphome, "ldap_test.ldif")
       
    66     config.info('Initing ldap database')
       
    67     cmdline = ['/usr/sbin/slapadd', '-f', slapdconf, '-l', ldiffile, '-c']
       
    68     PIPE = subprocess.PIPE
       
    69     slapproc = subprocess.Popen(cmdline, stdout=PIPE, stderr=PIPE)
       
    70     stdout, stderr = slapproc.communicate()
       
    71     if slapproc.returncode:
       
    72         print('slapadd returned with status: %s'
       
    73               % slapproc.returncode, file=sys.stderr)
       
    74         sys.stdout.write(stdout)
       
    75         sys.stderr.write(stderr)
       
    76 
       
    77     # ldapuri = 'ldapi://' + join(basedir, "ldapi").replace('/', '%2f')
       
    78     port = get_available_port(range(9000, 9100))
       
    79     host = 'localhost:%s' % port
       
    80     ldapuri = 'ldap://%s' % host
       
    81     cmdline = ["/usr/sbin/slapd", "-f", slapdconf, "-h", ldapuri, "-d", "0"]
       
    82     config.info('Starting slapd:', ' '.join(cmdline))
       
    83     PIPE = subprocess.PIPE
       
    84     cls.slapd_process = subprocess.Popen(cmdline, stdout=PIPE, stderr=PIPE)
       
    85     time.sleep(0.2)
       
    86     if cls.slapd_process.poll() is None:
       
    87         config.info('slapd started with pid %s', cls.slapd_process.pid)
       
    88     else:
       
    89         raise EnvironmentError('Cannot start slapd with cmdline="%s" (from directory "%s")' %
       
    90                                (" ".join(cmdline), os.getcwd()))
       
    91     URL = u'ldap://%s' % host
       
    92     return slapddir
       
    93 
       
    94 
       
    95 def terminate_slapd(cls):
       
    96     config = cls.config
       
    97     if cls.slapd_process and cls.slapd_process.returncode is None:
       
    98         config.info('terminating slapd')
       
    99         if hasattr(cls.slapd_process, 'terminate'):
       
   100             cls.slapd_process.terminate()
       
   101         else:
       
   102             import signal
       
   103             os.kill(cls.slapd_process.pid, signal.SIGTERM)
       
   104         stdout, stderr = cls.slapd_process.communicate()
       
   105         if cls.slapd_process.returncode:
       
   106             print('slapd returned with status: %s'
       
   107                   % cls.slapd_process.returncode, file=sys.stderr)
       
   108             sys.stdout.write(stdout)
       
   109             sys.stderr.write(stderr)
       
   110         config.info('DONE')
       
   111 
       
   112 
       
   113 class LDAPFeedTestBase(CubicWebTC):
       
   114     test_db_id = 'ldap-feed'
       
   115     loglevel = 'ERROR'
       
   116 
       
   117     @classmethod
       
   118     def setUpClass(cls):
       
   119         if not os.path.exists('/usr/sbin/slapd'):
       
   120             raise unittest.SkipTest('slapd not found')
       
   121         from cubicweb.cwctl import init_cmdline_log_threshold
       
   122         init_cmdline_log_threshold(cls.config, cls.loglevel)
       
   123         cls._tmpdir = create_slapd_configuration(cls)
       
   124 
       
   125     @classmethod
       
   126     def tearDownClass(cls):
       
   127         terminate_slapd(cls)
       
   128         try:
       
   129             shutil.rmtree(cls._tmpdir)
       
   130         except:
       
   131             pass
       
   132 
       
   133     @classmethod
       
   134     def pre_setup_database(cls, cnx, config):
       
   135         cnx.create_entity('CWSource', name=u'ldap', type=u'ldapfeed', parser=u'ldapfeed',
       
   136                           url=URL, config=CONFIG_LDAPFEED)
       
   137 
       
   138         cnx.commit()
       
   139         return cls.pull(cnx)
       
   140 
       
   141     @classmethod
       
   142     def pull(self, cnx):
       
   143         lfsource = cnx.repo.sources_by_uri['ldap']
       
   144         stats = lfsource.pull_data(cnx, force=True, raise_on_error=True)
       
   145         cnx.commit()
       
   146         return stats
       
   147 
       
   148     def setup_database(self):
       
   149         with self.admin_access.repo_cnx() as cnx:
       
   150             cnx.execute('DELETE Any E WHERE E cw_source S, S name "ldap"')
       
   151             cnx.execute('SET S config %(conf)s, S url %(url)s '
       
   152                         'WHERE S is CWSource, S name "ldap"',
       
   153                         {"conf": CONFIG_LDAPFEED, 'url': URL})
       
   154             cnx.commit()
       
   155         with self.repo.internal_cnx() as cnx:
       
   156             self.pull(cnx)
       
   157 
       
   158     def add_ldap_entry(self, dn, mods):
       
   159         """
       
   160         add an LDAP entity
       
   161         """
       
   162         modcmd = ['dn: %s' % dn, 'changetype: add']
       
   163         for key, values in mods.items():
       
   164             if isinstance(values, string_types):
       
   165                 values = [values]
       
   166             for value in values:
       
   167                 modcmd.append('%s: %s' % (key, value))
       
   168         self._ldapmodify(modcmd)
       
   169 
       
   170     def delete_ldap_entry(self, dn):
       
   171         """
       
   172         delete an LDAP entity
       
   173         """
       
   174         modcmd = ['dn: %s' % dn, 'changetype: delete']
       
   175         self._ldapmodify(modcmd)
       
   176 
       
   177     def update_ldap_entry(self, dn, mods):
       
   178         """
       
   179         modify one or more attributes of an LDAP entity
       
   180         """
       
   181         modcmd = ['dn: %s' % dn, 'changetype: modify']
       
   182         for (kind, key), values in mods.items():
       
   183             modcmd.append('%s: %s' % (kind, key))
       
   184             if isinstance(values, string_types):
       
   185                 values = [values]
       
   186             for value in values:
       
   187                 modcmd.append('%s: %s' % (key, value))
       
   188             modcmd.append('-')
       
   189         self._ldapmodify(modcmd)
       
   190 
       
   191     def _ldapmodify(self, modcmd):
       
   192         uri = self.repo.sources_by_uri['ldap'].urls[0]
       
   193         updatecmd = ['ldapmodify', '-H', uri, '-v', '-x', '-D',
       
   194                      'cn=admin,dc=cubicweb,dc=test', '-w', 'cw']
       
   195         PIPE = subprocess.PIPE
       
   196         p = subprocess.Popen(updatecmd, stdin=PIPE, stdout=PIPE, stderr=PIPE)
       
   197         p.stdin.write('\n'.join(modcmd).encode('ascii'))
       
   198         p.stdin.close()
       
   199         if p.wait():
       
   200             raise RuntimeError("ldap update failed: %s" % ('\n'.join(p.stderr.readlines())))
       
   201 
       
   202 
       
   203 class CheckWrongGroup(LDAPFeedTestBase):
       
   204     """
       
   205     A testcase for situations where the default group for CWUser
       
   206     created from LDAP is wrongly configured.
       
   207     """
       
   208 
       
   209     def test_wrong_group(self):
       
   210         with self.admin_access.repo_cnx() as cnx:
       
   211             source = cnx.execute('CWSource S WHERE S type="ldapfeed"').get_entity(0, 0)
       
   212             config = source.repo_source.check_config(source)
       
   213             # inject a bogus group here, along with at least a valid one
       
   214             config['user-default-group'] = ('thisgroupdoesnotexists', 'users')
       
   215             source.repo_source.update_config(source, config)
       
   216             cnx.commit()
       
   217             # here we emitted an error log entry
       
   218             source.repo_source.pull_data(cnx, force=True, raise_on_error=True)
       
   219             cnx.commit()
       
   220 
       
   221 
       
   222 class LDAPFeedUserTC(LDAPFeedTestBase):
       
   223     """
       
   224     A testcase for CWUser support in ldapfeed (basic tests and authentication).
       
   225     """
       
   226 
       
   227     def assertMetadata(self, entity):
       
   228         self.assertTrue(entity.creation_date)
       
   229         self.assertTrue(entity.modification_date)
       
   230 
       
   231     def test_authenticate(self):
       
   232         source = self.repo.sources_by_uri['ldap']
       
   233         with self.admin_access.repo_cnx() as cnx:
       
   234             # ensure we won't be logged against
       
   235             self.assertRaises(AuthenticationError,
       
   236                               source.authenticate, cnx, 'toto', 'toto')
       
   237             self.assertTrue(source.authenticate(cnx, 'syt', 'syt'))
       
   238         sessionid = self.repo.connect('syt', password='syt')
       
   239         self.assertTrue(sessionid)
       
   240         self.repo.close(sessionid)
       
   241 
       
   242     def test_base(self):
       
   243         with self.admin_access.repo_cnx() as cnx:
       
   244             # check a known one
       
   245             rset = cnx.execute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
       
   246             e = rset.get_entity(0, 0)
       
   247             self.assertEqual(e.login, 'syt')
       
   248             e.complete()
       
   249             self.assertMetadata(e)
       
   250             self.assertEqual(e.firstname, None)
       
   251             self.assertEqual(e.surname, None)
       
   252             self.assertIn('users', set(g.name for g in e.in_group))
       
   253             self.assertEqual(e.owned_by[0].login, 'syt')
       
   254             self.assertEqual(e.created_by, ())
       
   255             addresses = [pe.address for pe in e.use_email]
       
   256             addresses.sort()
       
   257             self.assertEqual(['sylvain.thenault@logilab.fr', 'syt@logilab.fr'],
       
   258                              addresses)
       
   259             self.assertIn(e.primary_email[0].address, ['sylvain.thenault@logilab.fr',
       
   260                                                        'syt@logilab.fr'])
       
   261             # email content should be indexed on the user
       
   262             rset = cnx.execute('CWUser X WHERE X has_text "thenault"')
       
   263             self.assertEqual(rset.rows, [[e.eid]])
       
   264 
       
   265     def test_copy_to_system_source(self):
       
   266         "make sure we can 'convert' an LDAP user into a system one"
       
   267         with self.admin_access.repo_cnx() as cnx:
       
   268             source = self.repo.sources_by_uri['ldap']
       
   269             eid = cnx.execute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0]
       
   270             cnx.execute('SET X cw_source S WHERE X eid %(x)s, S name "system"', {'x': eid})
       
   271             cnx.commit()
       
   272             source.reset_caches()
       
   273             rset = cnx.execute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
       
   274             self.assertEqual(len(rset), 1)
       
   275             e = rset.get_entity(0, 0)
       
   276             self.assertEqual(e.eid, eid)
       
   277             self.assertEqual(e.cw_metainformation(), {'source': {'type': u'native',
       
   278                                                                  'uri': u'system',
       
   279                                                                  'use-cwuri-as-url': False},
       
   280                                                       'type': 'CWUser',
       
   281                                                       'extid': None})
       
   282             self.assertEqual(e.cw_source[0].name, 'system')
       
   283             self.assertTrue(e.creation_date)
       
   284             self.assertTrue(e.modification_date)
       
   285             source.pull_data(cnx)
       
   286             rset = cnx.execute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
       
   287             self.assertEqual(len(rset), 1)
       
   288             self.assertTrue(self.repo.system_source.authenticate(cnx, 'syt', password='syt'))
       
   289             # make sure the pull from ldap have not "reverted" user as a ldap-feed user
       
   290             self.assertEqual(e.cw_metainformation(), {'source': {'type': u'native',
       
   291                                                                  'uri': u'system',
       
   292                                                                  'use-cwuri-as-url': False},
       
   293                                                       'type': 'CWUser',
       
   294                                                       'extid': None})
       
   295             # and that the password stored in the system source is not empty or so
       
   296             user = cnx.execute('CWUser U WHERE U login "syt"').get_entity(0, 0)
       
   297             user.cw_clear_all_caches()
       
   298             cu = cnx.system_sql("SELECT cw_upassword FROM cw_cwuser WHERE cw_login='syt';")
       
   299             pwd = cu.fetchall()[0][0]
       
   300             self.assertIsNotNone(pwd)
       
   301             self.assertTrue(str(pwd))
       
   302 
       
   303 
       
   304 class LDAPFeedUserDeletionTC(LDAPFeedTestBase):
       
   305     """
       
   306     A testcase for situations where users are deleted from or
       
   307     unavailable in the LDAP database.
       
   308     """
       
   309 
       
   310     def test_a_filter_inactivate(self):
       
   311         """ filtered out people should be deactivated, unable to authenticate """
       
   312         with self.admin_access.repo_cnx() as cnx:
       
   313             source = cnx.execute('CWSource S WHERE S type="ldapfeed"').get_entity(0, 0)
       
   314             config = source.repo_source.check_config(source)
       
   315             # filter with adim's phone number
       
   316             config['user-filter'] = u'(%s=%s)' % ('telephoneNumber', '109')
       
   317             source.repo_source.update_config(source, config)
       
   318             cnx.commit()
       
   319         with self.repo.internal_cnx() as cnx:
       
   320             self.pull(cnx)
       
   321         self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='syt')
       
   322         with self.admin_access.repo_cnx() as cnx:
       
   323             self.assertEqual(cnx.execute('Any N WHERE U login "syt", '
       
   324                                          'U in_state S, S name N').rows[0][0],
       
   325                              'deactivated')
       
   326             self.assertEqual(cnx.execute('Any N WHERE U login "adim", '
       
   327                                          'U in_state S, S name N').rows[0][0],
       
   328                              'activated')
       
   329             # unfilter, syt should be activated again
       
   330             config['user-filter'] = u''
       
   331             source.repo_source.update_config(source, config)
       
   332             cnx.commit()
       
   333         with self.repo.internal_cnx() as cnx:
       
   334             self.pull(cnx)
       
   335         with self.admin_access.repo_cnx() as cnx:
       
   336             self.assertEqual(cnx.execute('Any N WHERE U login "syt", '
       
   337                                          'U in_state S, S name N').rows[0][0],
       
   338                              'activated')
       
   339             self.assertEqual(cnx.execute('Any N WHERE U login "adim", '
       
   340                                          'U in_state S, S name N').rows[0][0],
       
   341                              'activated')
       
   342 
       
   343     def test_delete(self):
       
   344         """ delete syt, pull, check deactivation, repull,
       
   345         read syt, pull, check activation
       
   346         """
       
   347         self.delete_ldap_entry('uid=syt,ou=People,dc=cubicweb,dc=test')
       
   348         with self.repo.internal_cnx() as cnx:
       
   349             self.pull(cnx)
       
   350         self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='syt')
       
   351         with self.admin_access.repo_cnx() as cnx:
       
   352             self.assertEqual(cnx.execute('Any N WHERE U login "syt", '
       
   353                                          'U in_state S, S name N').rows[0][0],
       
   354                              'deactivated')
       
   355         with self.repo.internal_cnx() as cnx:
       
   356             # check that it doesn't choke
       
   357             self.pull(cnx)
       
   358         # reinsert syt
       
   359         self.add_ldap_entry('uid=syt,ou=People,dc=cubicweb,dc=test',
       
   360                             {'objectClass': ['OpenLDAPperson', 'posixAccount', 'top',
       
   361                                              'shadowAccount'],
       
   362                              'cn': 'Sylvain Thenault',
       
   363                              'sn': 'Thenault',
       
   364                              'gidNumber': '1004',
       
   365                              'uid': 'syt',
       
   366                              'homeDirectory': '/home/syt',
       
   367                              'shadowFlag': '134538764',
       
   368                              'uidNumber': '1004',
       
   369                              'givenName': 'Sylvain',
       
   370                              'telephoneNumber': '106',
       
   371                              'displayName': 'sthenault',
       
   372                              'gecos': 'Sylvain Thenault',
       
   373                              'mail': ['sylvain.thenault@logilab.fr', 'syt@logilab.fr'],
       
   374                              'userPassword': 'syt',
       
   375                              })
       
   376         with self.repo.internal_cnx() as cnx:
       
   377             self.pull(cnx)
       
   378         with self.admin_access.repo_cnx() as cnx:
       
   379             self.assertEqual(cnx.execute('Any N WHERE U login "syt", '
       
   380                                          'U in_state S, S name N').rows[0][0],
       
   381                              'activated')
       
   382 
       
   383     def test_reactivate_deleted(self):
       
   384         # test reactivating BY HAND the user isn't enough to
       
   385         # authenticate, as the native source refuse to authenticate
       
   386         # user from other sources
       
   387         self.delete_ldap_entry('uid=syt,ou=People,dc=cubicweb,dc=test')
       
   388         with self.repo.internal_cnx() as cnx:
       
   389             self.pull(cnx)
       
   390         with self.admin_access.repo_cnx() as cnx:
       
   391             # reactivate user (which source is still ldap-feed)
       
   392             user = cnx.execute('CWUser U WHERE U login "syt"').get_entity(0, 0)
       
   393             user.cw_adapt_to('IWorkflowable').fire_transition('activate')
       
   394             cnx.commit()
       
   395             with self.assertRaises(AuthenticationError):
       
   396                 self.repo.connect('syt', password='syt')
       
   397 
       
   398             # ok now let's try to make it a system user
       
   399             cnx.execute('SET X cw_source S WHERE X eid %(x)s, S name "system"', {'x': user.eid})
       
   400             cnx.commit()
       
   401         # and that we can now authenticate again
       
   402         self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='toto')
       
   403         sessionid = self.repo.connect('syt', password='syt')
       
   404         self.assertTrue(sessionid)
       
   405         self.repo.close(sessionid)
       
   406 
       
   407 
       
   408 class LDAPFeedGroupTC(LDAPFeedTestBase):
       
   409     """
       
   410     A testcase for group support in ldapfeed.
       
   411     """
       
   412 
       
   413     def test_groups_exist(self):
       
   414         with self.admin_access.repo_cnx() as cnx:
       
   415             rset = cnx.execute('CWGroup X WHERE X name "dir"')
       
   416             self.assertEqual(len(rset), 1)
       
   417 
       
   418             rset = cnx.execute('CWGroup X WHERE X cw_source S, S name "ldap"')
       
   419             self.assertEqual(len(rset), 2)
       
   420 
       
   421     def test_group_deleted(self):
       
   422         with self.admin_access.repo_cnx() as cnx:
       
   423             rset = cnx.execute('CWGroup X WHERE X name "dir"')
       
   424             self.assertEqual(len(rset), 1)
       
   425 
       
   426     def test_in_group(self):
       
   427         with self.admin_access.repo_cnx() as cnx:
       
   428             rset = cnx.execute('CWGroup X WHERE X name %(name)s', {'name': 'dir'})
       
   429             dirgroup = rset.get_entity(0, 0)
       
   430             self.assertEqual(set(['syt', 'adim']),
       
   431                              set([u.login for u in dirgroup.reverse_in_group]))
       
   432             rset = cnx.execute('CWGroup X WHERE X name %(name)s', {'name': 'logilab'})
       
   433             logilabgroup = rset.get_entity(0, 0)
       
   434             self.assertEqual(set(['adim']),
       
   435                              set([u.login for u in logilabgroup.reverse_in_group]))
       
   436 
       
   437     def test_group_member_added(self):
       
   438         with self.repo.internal_cnx() as cnx:
       
   439             self.pull(cnx)
       
   440         with self.admin_access.repo_cnx() as cnx:
       
   441             rset = cnx.execute('Any L WHERE U in_group G, G name %(name)s, U login L',
       
   442                                {'name': 'logilab'})
       
   443             self.assertEqual(len(rset), 1)
       
   444             self.assertEqual(rset[0][0], 'adim')
       
   445 
       
   446         try:
       
   447             self.update_ldap_entry('cn=logilab,ou=Group,dc=cubicweb,dc=test',
       
   448                                    {('add', 'memberUid'): ['syt']})
       
   449             with self.repo.internal_cnx() as cnx:
       
   450                 self.pull(cnx)
       
   451 
       
   452             with self.admin_access.repo_cnx() as cnx:
       
   453                 rset = cnx.execute('Any L WHERE U in_group G, G name %(name)s, U login L',
       
   454                                    {'name': 'logilab'})
       
   455                 self.assertEqual(len(rset), 2)
       
   456                 members = set([u[0] for u in rset])
       
   457                 self.assertEqual(set(['adim', 'syt']), members)
       
   458 
       
   459         finally:
       
   460             # back to normal ldap setup
       
   461             self.tearDownClass()
       
   462             self.setUpClass()
       
   463 
       
   464     def test_group_member_deleted(self):
       
   465         with self.repo.internal_cnx() as cnx:
       
   466             self.pull(cnx)  # ensure we are sync'ed
       
   467         with self.admin_access.repo_cnx() as cnx:
       
   468             rset = cnx.execute('Any L WHERE U in_group G, G name %(name)s, U login L',
       
   469                                {'name': 'logilab'})
       
   470             self.assertEqual(len(rset), 1)
       
   471             self.assertEqual(rset[0][0], 'adim')
       
   472 
       
   473         try:
       
   474             self.update_ldap_entry('cn=logilab,ou=Group,dc=cubicweb,dc=test',
       
   475                                    {('delete', 'memberUid'): ['adim']})
       
   476             with self.repo.internal_cnx() as cnx:
       
   477                 self.pull(cnx)
       
   478 
       
   479             with self.admin_access.repo_cnx() as cnx:
       
   480                 rset = cnx.execute('Any L WHERE U in_group G, G name %(name)s, U login L',
       
   481                                    {'name': 'logilab'})
       
   482                 self.assertEqual(len(rset), 0, rset.rows)
       
   483         finally:
       
   484             # back to normal ldap setup
       
   485             self.tearDownClass()
       
   486             self.setUpClass()
       
   487 
       
   488 
       
# Run the tests of this module when it is executed as a script.
if __name__ == '__main__':
    from logilab.common.testlib import unittest_main
    unittest_main()