server/test/unittest_ldapsource.py
changeset 9793 52647b05bda8
parent 9754 54f422ee3deb
child 9794 61da050d11e4
equal deleted inserted replaced
9792:1349398d744e 9793:52647b05bda8
   120             shutil.rmtree(cls._tmpdir)
   120             shutil.rmtree(cls._tmpdir)
   121         except:
   121         except:
   122             pass
   122             pass
   123 
   123 
   124     @classmethod
   124     @classmethod
   125     def pre_setup_database(cls, session, config):
   125     def pre_setup_database(cls, cnx, config):
   126         session.create_entity('CWSource', name=u'ldap', type=u'ldapfeed', parser=u'ldapfeed',
   126         cnx.create_entity('CWSource', name=u'ldap', type=u'ldapfeed', parser=u'ldapfeed',
   127                               url=URL, config=CONFIG_LDAPFEED)
   127                           url=URL, config=CONFIG_LDAPFEED)
   128 
   128 
   129         session.commit()
   129         cnx.commit()
   130         return cls._pull(session)
   130         return cls.pull(cnx)
   131 
   131 
   132     @classmethod
   132     @classmethod
   133     def _pull(cls, session):
   133     def pull(self, cnx):
   134         with session.repo.internal_session() as isession:
   134         lfsource = cnx.repo.sources_by_uri['ldap']
   135             lfsource = isession.repo.sources_by_uri['ldap']
   135         stats = lfsource.pull_data(cnx, force=True, raise_on_error=True)
   136             stats = lfsource.pull_data(isession, force=True, raise_on_error=True)
   136         cnx.commit()
   137             isession.commit()
   137         return stats
   138             return stats
       
   139 
       
   140     def pull(self):
       
   141         return self._pull(self.session)
       
   142 
   138 
   143     def setup_database(self):
   139     def setup_database(self):
   144         with self.session.repo.internal_session(safe=True) as session:
   140         with self.admin_access.repo_cnx() as cnx:
   145             session.execute('DELETE Any E WHERE E cw_source S, S name "ldap"')
   141             cnx.execute('DELETE Any E WHERE E cw_source S, S name "ldap"')
   146             session.execute('SET S config %(conf)s, S url %(url)s '
   142             cnx.execute('SET S config %(conf)s, S url %(url)s '
   147                             'WHERE S is CWSource, S name "ldap"',
   143                         'WHERE S is CWSource, S name "ldap"',
   148                             {"conf": CONFIG_LDAPFEED, 'url': URL} )
   144                         {"conf": CONFIG_LDAPFEED, 'url': URL} )
   149             session.commit()
   145             cnx.commit()
   150         self.pull()
   146         with self.repo.internal_cnx() as cnx:
       
   147             self.pull(cnx)
   151 
   148 
   152     def add_ldap_entry(self, dn, mods):
   149     def add_ldap_entry(self, dn, mods):
   153         """
   150         """
   154         add an LDAP entity
   151         add an LDAP entity
   155         """
   152         """
   198     A testcase for situations where the default group for CWUser
   195     A testcase for situations where the default group for CWUser
   199     created from LDAP is wrongly configured.
   196     created from LDAP is wrongly configured.
   200     """
   197     """
   201 
   198 
   202     def test_wrong_group(self):
   199     def test_wrong_group(self):
   203         with self.session.repo.internal_session(safe=True) as session:
   200         with self.admin_access.repo_cnx() as cnx:
   204             source = self.session.execute('CWSource S WHERE S type="ldapfeed"').get_entity(0,0)
   201             source = cnx.execute('CWSource S WHERE S type="ldapfeed"').get_entity(0,0)
   205             config = source.repo_source.check_config(source)
   202             config = source.repo_source.check_config(source)
   206             # inject a bogus group here, along with at least a valid one
   203             # inject a bogus group here, along with at least a valid one
   207             config['user-default-group'] = ('thisgroupdoesnotexists','users')
   204             config['user-default-group'] = ('thisgroupdoesnotexists','users')
   208             source.repo_source.update_config(source, config)
   205             source.repo_source.update_config(source, config)
   209             session.commit(free_cnxset=False)
   206             cnx.commit()
   210             # here we emitted an error log entry
   207             # here we emitted an error log entry
   211             stats = source.repo_source.pull_data(session, force=True, raise_on_error=True)
   208             stats = source.repo_source.pull_data(cnx, force=True, raise_on_error=True)
   212             session.commit()
   209             cnx.commit()
   213 
   210 
   214 
   211 
   215 
   212 
   216 class LDAPFeedUserTC(LDAPFeedTestBase):
   213 class LDAPFeedUserTC(LDAPFeedTestBase):
   217     """
   214     """
   222         self.assertTrue(entity.creation_date)
   219         self.assertTrue(entity.creation_date)
   223         self.assertTrue(entity.modification_date)
   220         self.assertTrue(entity.modification_date)
   224 
   221 
   225     def test_authenticate(self):
   222     def test_authenticate(self):
   226         source = self.repo.sources_by_uri['ldap']
   223         source = self.repo.sources_by_uri['ldap']
   227         self.session.set_cnxset()
   224         with self.admin_access.repo_cnx() as cnx:
   228         # ensure we won't be logged against
   225             # ensure we won't be logged against
   229         self.assertRaises(AuthenticationError,
   226             self.assertRaises(AuthenticationError,
   230                           source.authenticate, self.session, 'toto', 'toto')
   227                               source.authenticate, cnx, 'toto', 'toto')
   231         self.assertTrue(source.authenticate(self.session, 'syt', 'syt'))
   228             self.assertTrue(source.authenticate(cnx, 'syt', 'syt'))
   232         self.assertTrue(self.repo.connect('syt', password='syt'))
   229         self.assertTrue(self.repo.connect('syt', password='syt'))
   233 
   230 
   234     def test_base(self):
   231     def test_base(self):
   235         # check a known one
   232         with self.admin_access.repo_cnx() as cnx:
   236         rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
   233             # check a known one
   237         e = rset.get_entity(0, 0)
   234             rset = cnx.execute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
   238         self.assertEqual(e.login, 'syt')
   235             e = rset.get_entity(0, 0)
   239         e.complete()
   236             self.assertEqual(e.login, 'syt')
   240         self.assertMetadata(e)
   237             e.complete()
   241         self.assertEqual(e.firstname, None)
   238             self.assertMetadata(e)
   242         self.assertEqual(e.surname, None)
   239             self.assertEqual(e.firstname, None)
   243         self.assertIn('users', set(g.name for g in e.in_group))
   240             self.assertEqual(e.surname, None)
   244         self.assertEqual(e.owned_by[0].login, 'syt')
   241             self.assertIn('users', set(g.name for g in e.in_group))
   245         self.assertEqual(e.created_by, ())
   242             self.assertEqual(e.owned_by[0].login, 'syt')
   246         addresses = [pe.address for pe in e.use_email]
   243             self.assertEqual(e.created_by, ())
   247         addresses.sort()
   244             addresses = [pe.address for pe in e.use_email]
   248         self.assertEqual(['sylvain.thenault@logilab.fr', 'syt@logilab.fr'],
   245             addresses.sort()
   249                          addresses)
   246             self.assertEqual(['sylvain.thenault@logilab.fr', 'syt@logilab.fr'],
   250         self.assertIn(e.primary_email[0].address, ['sylvain.thenault@logilab.fr',
   247                              addresses)
   251                                                    'syt@logilab.fr'])
   248             self.assertIn(e.primary_email[0].address, ['sylvain.thenault@logilab.fr',
   252         # email content should be indexed on the user
   249                                                        'syt@logilab.fr'])
   253         rset = self.sexecute('CWUser X WHERE X has_text "thenault"')
   250             # email content should be indexed on the user
   254         self.assertEqual(rset.rows, [[e.eid]])
   251             rset = cnx.execute('CWUser X WHERE X has_text "thenault"')
       
   252             self.assertEqual(rset.rows, [[e.eid]])
   255 
   253 
   256     def test_copy_to_system_source(self):
   254     def test_copy_to_system_source(self):
   257         "make sure we can 'convert' an LDAP user into a system one"
   255         "make sure we can 'convert' an LDAP user into a system one"
   258         source = self.repo.sources_by_uri['ldap']
   256         with self.admin_access.repo_cnx() as cnx:
   259         eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0]
   257             source = self.repo.sources_by_uri['ldap']
   260         self.sexecute('SET X cw_source S WHERE X eid %(x)s, S name "system"', {'x': eid})
   258             eid = cnx.execute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0]
   261         self.commit()
   259             cnx.execute('SET X cw_source S WHERE X eid %(x)s, S name "system"', {'x': eid})
   262         source.reset_caches()
   260             cnx.commit()
   263         rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
   261             source.reset_caches()
   264         self.assertEqual(len(rset), 1)
   262             rset = cnx.execute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
   265         e = rset.get_entity(0, 0)
   263             self.assertEqual(len(rset), 1)
   266         self.assertEqual(e.eid, eid)
   264             e = rset.get_entity(0, 0)
   267         self.assertEqual(e.cw_metainformation(), {'source': {'type': u'native',
   265             self.assertEqual(e.eid, eid)
   268                                                              'uri': u'system',
   266             self.assertEqual(e.cw_metainformation(), {'source': {'type': u'native',
   269                                                              'use-cwuri-as-url': False},
   267                                                                  'uri': u'system',
   270                                                   'type': 'CWUser',
   268                                                                  'use-cwuri-as-url': False},
   271                                                   'extid': None})
   269                                                       'type': 'CWUser',
   272         self.assertEqual(e.cw_source[0].name, 'system')
   270                                                       'extid': None})
   273         self.assertTrue(e.creation_date)
   271             self.assertEqual(e.cw_source[0].name, 'system')
   274         self.assertTrue(e.modification_date)
   272             self.assertTrue(e.creation_date)
   275         source.pull_data(self.session)
   273             self.assertTrue(e.modification_date)
   276         rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
   274             source.pull_data(cnx)
   277         self.assertEqual(len(rset), 1)
   275             rset = cnx.execute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
   278         self.assertTrue(self.repo.system_source.authenticate(
   276             self.assertEqual(len(rset), 1)
   279                 self.session, 'syt', password='syt'))
   277             self.assertTrue(self.repo.system_source.authenticate(cnx, 'syt', password='syt'))
   280         # make sure the pull from ldap have not "reverted" user as a ldap-feed user
   278             # make sure the pull from ldap have not "reverted" user as a ldap-feed user
   281         self.assertEqual(e.cw_metainformation(), {'source': {'type': u'native',
   279             self.assertEqual(e.cw_metainformation(), {'source': {'type': u'native',
   282                                                              'uri': u'system',
   280                                                                  'uri': u'system',
   283                                                              'use-cwuri-as-url': False},
   281                                                                  'use-cwuri-as-url': False},
   284                                                   'type': 'CWUser',
   282                                                       'type': 'CWUser',
   285                                                   'extid': None})
   283                                                       'extid': None})
   286         # and that the password stored in the system source is not empty or so
   284             # and that the password stored in the system source is not empty or so
   287         user = self.execute('CWUser U WHERE U login "syt"').get_entity(0, 0)
   285             user = cnx.execute('CWUser U WHERE U login "syt"').get_entity(0, 0)
   288         user.cw_clear_all_caches()
   286             user.cw_clear_all_caches()
   289         pwd = self.session.system_sql("SELECT cw_upassword FROM cw_cwuser WHERE cw_login='syt';").fetchall()[0][0]
   287             pwd = cnx.system_sql("SELECT cw_upassword FROM cw_cwuser WHERE cw_login='syt';").fetchall()[0][0]
   290         self.assertIsNotNone(pwd)
   288             self.assertIsNotNone(pwd)
   291         self.assertTrue(str(pwd))
   289             self.assertTrue(str(pwd))
   292 
   290 
   293 
   291 
   294 
   292 
   295 class LDAPFeedUserDeletionTC(LDAPFeedTestBase):
   293 class LDAPFeedUserDeletionTC(LDAPFeedTestBase):
   296     """
   294     """
   297     A testcase for situations where users are deleted from or
   295     A testcase for situations where users are deleted from or
   298     unavailabe in the LDAP database.
   296     unavailable in the LDAP database.
   299     """
   297     """
       
   298 
   300     def test_a_filter_inactivate(self):
   299     def test_a_filter_inactivate(self):
   301         """ filtered out people should be deactivated, unable to authenticate """
   300         """ filtered out people should be deactivated, unable to authenticate """
   302         source = self.session.execute('CWSource S WHERE S type="ldapfeed"').get_entity(0,0)
   301         with self.admin_access.repo_cnx() as cnx:
   303         config = source.repo_source.check_config(source)
   302             source = cnx.execute('CWSource S WHERE S type="ldapfeed"').get_entity(0,0)
   304         # filter with adim's phone number
   303             config = source.repo_source.check_config(source)
   305         config['user-filter'] = u'(%s=%s)' % ('telephoneNumber', '109')
   304             # filter with adim's phone number
   306         source.repo_source.update_config(source, config)
   305             config['user-filter'] = u'(%s=%s)' % ('telephoneNumber', '109')
   307         self.commit()
   306             source.repo_source.update_config(source, config)
   308         self.pull()
   307             cnx.commit()
       
   308         with self.repo.internal_cnx() as cnx:
       
   309             self.pull(cnx)
   309         self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='syt')
   310         self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='syt')
   310         self.assertEqual(self.execute('Any N WHERE U login "syt", '
   311         with self.admin_access.repo_cnx() as cnx:
   311                                       'U in_state S, S name N').rows[0][0],
   312             self.assertEqual(cnx.execute('Any N WHERE U login "syt", '
   312                          'deactivated')
   313                                          'U in_state S, S name N').rows[0][0],
   313         self.assertEqual(self.execute('Any N WHERE U login "adim", '
   314                              'deactivated')
   314                                       'U in_state S, S name N').rows[0][0],
   315             self.assertEqual(cnx.execute('Any N WHERE U login "adim", '
   315                          'activated')
   316                                          'U in_state S, S name N').rows[0][0],
   316         # unfilter, syt should be activated again
   317                              'activated')
   317         config['user-filter'] = u''
   318             # unfilter, syt should be activated again
   318         source.repo_source.update_config(source, config)
   319             config['user-filter'] = u''
   319         self.commit()
   320             source.repo_source.update_config(source, config)
   320         self.pull()
   321             cnx.commit()
   321         self.assertEqual(self.execute('Any N WHERE U login "syt", '
   322         with self.repo.internal_cnx() as cnx:
   322                                       'U in_state S, S name N').rows[0][0],
   323             self.pull(cnx)
   323                          'activated')
   324         with self.admin_access.repo_cnx() as cnx:
   324         self.assertEqual(self.execute('Any N WHERE U login "adim", '
   325             self.assertEqual(cnx.execute('Any N WHERE U login "syt", '
   325                                       'U in_state S, S name N').rows[0][0],
   326                                          'U in_state S, S name N').rows[0][0],
   326                          'activated')
   327                              'activated')
       
   328             self.assertEqual(cnx.execute('Any N WHERE U login "adim", '
       
   329                                          'U in_state S, S name N').rows[0][0],
       
   330                              'activated')
   327 
   331 
   328     def test_delete(self):
   332     def test_delete(self):
   329         """ delete syt, pull, check deactivation, repull,
   333         """ delete syt, pull, check deactivation, repull,
   330         read syt, pull, check activation
   334         read syt, pull, check activation
   331         """
   335         """
   332         self.delete_ldap_entry('uid=syt,ou=People,dc=cubicweb,dc=test')
   336         self.delete_ldap_entry('uid=syt,ou=People,dc=cubicweb,dc=test')
   333         self.pull()
   337         with self.repo.internal_cnx() as cnx:
       
   338             self.pull(cnx)
   334         self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='syt')
   339         self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='syt')
   335         self.assertEqual(self.execute('Any N WHERE U login "syt", '
   340         with self.admin_access.repo_cnx() as cnx:
   336                                       'U in_state S, S name N').rows[0][0],
   341             self.assertEqual(cnx.execute('Any N WHERE U login "syt", '
   337                          'deactivated')
   342                                          'U in_state S, S name N').rows[0][0],
   338         # check that it doesn't choke
   343                              'deactivated')
   339         self.pull()
   344         with self.repo.internal_cnx() as cnx:
       
   345             # check that it doesn't choke
       
   346             self.pull(cnx)
   340         # reinsert syt
   347         # reinsert syt
   341         self.add_ldap_entry('uid=syt,ou=People,dc=cubicweb,dc=test',
   348         self.add_ldap_entry('uid=syt,ou=People,dc=cubicweb,dc=test',
   342                             { 'objectClass': ['OpenLDAPperson','posixAccount','top','shadowAccount'],
   349                             { 'objectClass': ['OpenLDAPperson','posixAccount','top','shadowAccount'],
   343                               'cn': 'Sylvain Thenault',
   350                               'cn': 'Sylvain Thenault',
   344                               'sn': 'Thenault',
   351                               'sn': 'Thenault',
   351                               'telephoneNumber': '106',
   358                               'telephoneNumber': '106',
   352                               'displayName': 'sthenault',
   359                               'displayName': 'sthenault',
   353                               'gecos': 'Sylvain Thenault',
   360                               'gecos': 'Sylvain Thenault',
   354                               'mail': ['sylvain.thenault@logilab.fr','syt@logilab.fr'],
   361                               'mail': ['sylvain.thenault@logilab.fr','syt@logilab.fr'],
   355                               'userPassword': 'syt',
   362                               'userPassword': 'syt',
   356                              })
   363                           })
   357         self.pull()
   364         with self.repo.internal_cnx() as cnx:
   358         self.assertEqual(self.execute('Any N WHERE U login "syt", '
   365             self.pull(cnx)
   359                                       'U in_state S, S name N').rows[0][0],
   366         with self.admin_access.repo_cnx() as cnx:
   360                          'activated')
   367             self.assertEqual(cnx.execute('Any N WHERE U login "syt", '
       
   368                                          'U in_state S, S name N').rows[0][0],
       
   369                              'activated')
   361 
   370 
   362     def test_reactivate_deleted(self):
   371     def test_reactivate_deleted(self):
   363         # test reactivating BY HAND the user isn't enough to
   372         # test reactivating BY HAND the user isn't enough to
   364         # authenticate, as the native source refuse to authenticate
   373         # authenticate, as the native source refuse to authenticate
   365         # user from other sources
   374         # user from other sources
   366         self.delete_ldap_entry('uid=syt,ou=People,dc=cubicweb,dc=test')
   375         self.delete_ldap_entry('uid=syt,ou=People,dc=cubicweb,dc=test')
   367         self.pull()
   376         with self.repo.internal_cnx() as cnx:
   368         # reactivate user (which source is still ldap-feed)
   377             self.pull(cnx)
   369         user = self.execute('CWUser U WHERE U login "syt"').get_entity(0, 0)
   378         with self.admin_access.repo_cnx() as cnx:
   370         user.cw_adapt_to('IWorkflowable').fire_transition('activate')
   379             # reactivate user (which source is still ldap-feed)
   371         self.commit()
   380             user = cnx.execute('CWUser U WHERE U login "syt"').get_entity(0, 0)
   372         with self.assertRaises(AuthenticationError):
   381             user.cw_adapt_to('IWorkflowable').fire_transition('activate')
   373             self.repo.connect('syt', password='syt')
   382             cnx.commit()
   374 
   383             with self.assertRaises(AuthenticationError):
   375         # ok now let's try to make it a system user
   384                 self.repo.connect('syt', password='syt')
   376         self.sexecute('SET X cw_source S WHERE X eid %(x)s, S name "system"', {'x': user.eid})
   385 
   377         self.commit()
   386             # ok now let's try to make it a system user
       
   387             cnx.execute('SET X cw_source S WHERE X eid %(x)s, S name "system"', {'x': user.eid})
       
   388             cnx.commit()
   378         # and that we can now authenticate again
   389         # and that we can now authenticate again
   379         self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='toto')
   390         self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='toto')
   380         self.assertTrue(self.repo.connect('syt', password='syt'))
   391         self.assertTrue(self.repo.connect('syt', password='syt'))
   381 
   392 
       
   393 
   382 class LDAPFeedGroupTC(LDAPFeedTestBase):
   394 class LDAPFeedGroupTC(LDAPFeedTestBase):
   383     """
   395     """
   384     A testcase for group support in ldapfeed.
   396     A testcase for group support in ldapfeed.
   385     """
   397     """
   386 
   398 
   387     def test_groups_exist(self):
   399     def test_groups_exist(self):
   388         rset = self.sexecute('CWGroup X WHERE X name "dir"')
   400         with self.admin_access.repo_cnx() as cnx:
   389         self.assertEqual(len(rset), 1)
   401             rset = cnx.execute('CWGroup X WHERE X name "dir"')
   390 
   402             self.assertEqual(len(rset), 1)
   391         rset = self.sexecute('CWGroup X WHERE X cw_source S, S name "ldap"')
   403 
   392         self.assertEqual(len(rset), 2)
   404             rset = cnx.execute('CWGroup X WHERE X cw_source S, S name "ldap"')
       
   405             self.assertEqual(len(rset), 2)
   393 
   406 
   394     def test_group_deleted(self):
   407     def test_group_deleted(self):
   395         rset = self.sexecute('CWGroup X WHERE X name "dir"')
   408         with self.admin_access.repo_cnx() as cnx:
   396         self.assertEqual(len(rset), 1)
   409             rset = cnx.execute('CWGroup X WHERE X name "dir"')
       
   410             self.assertEqual(len(rset), 1)
   397 
   411 
   398     def test_in_group(self):
   412     def test_in_group(self):
   399         rset = self.sexecute('CWGroup X WHERE X name %(name)s', {'name': 'dir'})
   413         with self.admin_access.repo_cnx() as cnx:
   400         dirgroup = rset.get_entity(0, 0)
   414             rset = cnx.execute('CWGroup X WHERE X name %(name)s', {'name': 'dir'})
   401         self.assertEqual(set(['syt', 'adim']),
   415             dirgroup = rset.get_entity(0, 0)
   402                          set([u.login for u in dirgroup.reverse_in_group]))
   416             self.assertEqual(set(['syt', 'adim']),
   403         rset = self.sexecute('CWGroup X WHERE X name %(name)s', {'name': 'logilab'})
   417                              set([u.login for u in dirgroup.reverse_in_group]))
   404         logilabgroup = rset.get_entity(0, 0)
   418             rset = cnx.execute('CWGroup X WHERE X name %(name)s', {'name': 'logilab'})
   405         self.assertEqual(set(['adim']),
   419             logilabgroup = rset.get_entity(0, 0)
   406                          set([u.login for u in logilabgroup.reverse_in_group]))
   420             self.assertEqual(set(['adim']),
       
   421                              set([u.login for u in logilabgroup.reverse_in_group]))
   407 
   422 
   408     def test_group_member_added(self):
   423     def test_group_member_added(self):
   409         self.pull()
   424         with self.repo.internal_cnx() as cnx:
   410         rset = self.sexecute('Any L WHERE U in_group G, G name %(name)s, U login L',
   425             self.pull(cnx)
   411                              {'name': 'logilab'})
   426         with self.admin_access.repo_cnx() as cnx:
   412         self.assertEqual(len(rset), 1)
   427             rset = cnx.execute('Any L WHERE U in_group G, G name %(name)s, U login L',
   413         self.assertEqual(rset[0][0], 'adim')
   428                                {'name': 'logilab'})
       
   429             self.assertEqual(len(rset), 1)
       
   430             self.assertEqual(rset[0][0], 'adim')
   414 
   431 
   415         try:
   432         try:
   416             self.update_ldap_entry('cn=logilab,ou=Group,dc=cubicweb,dc=test',
   433             self.update_ldap_entry('cn=logilab,ou=Group,dc=cubicweb,dc=test',
   417                                    {('add', 'memberUid'): ['syt']})
   434                                        {('add', 'memberUid'): ['syt']})
   418             time.sleep(1.1) # timestamps precision is 1s
   435             time.sleep(1.1) # timestamps precision is 1s
   419             self.pull()
   436             with self.repo.internal_cnx() as cnx:
   420 
   437                 self.pull(cnx)
   421             rset = self.sexecute('Any L WHERE U in_group G, G name %(name)s, U login L',
   438 
   422                                  {'name': 'logilab'})
   439             with self.admin_access.repo_cnx() as cnx:
   423             self.assertEqual(len(rset), 2)
   440                 rset = cnx.execute('Any L WHERE U in_group G, G name %(name)s, U login L',
   424             members = set([u[0] for u in rset])
   441                                    {'name': 'logilab'})
   425             self.assertEqual(set(['adim', 'syt']), members)
   442                 self.assertEqual(len(rset), 2)
       
   443                 members = set([u[0] for u in rset])
       
   444                 self.assertEqual(set(['adim', 'syt']), members)
   426 
   445 
   427         finally:
   446         finally:
   428             # back to normal ldap setup
   447             # back to normal ldap setup
   429             self.tearDownClass()
   448             self.tearDownClass()
   430             self.setUpClass()
   449             self.setUpClass()
   431 
   450 
   432     def test_group_member_deleted(self):
   451     def test_group_member_deleted(self):
   433         self.pull() # ensure we are sync'ed
   452         with self.repo.internal_cnx() as cnx:
   434         rset = self.sexecute('Any L WHERE U in_group G, G name %(name)s, U login L',
   453             self.pull(cnx) # ensure we are sync'ed
   435                              {'name': 'logilab'})
   454         with self.admin_access.repo_cnx() as cnx:
   436         self.assertEqual(len(rset), 1)
   455             rset = cnx.execute('Any L WHERE U in_group G, G name %(name)s, U login L',
   437         self.assertEqual(rset[0][0], 'adim')
   456                                {'name': 'logilab'})
       
   457             self.assertEqual(len(rset), 1)
       
   458             self.assertEqual(rset[0][0], 'adim')
   438 
   459 
   439         try:
   460         try:
   440             self.update_ldap_entry('cn=logilab,ou=Group,dc=cubicweb,dc=test',
   461             self.update_ldap_entry('cn=logilab,ou=Group,dc=cubicweb,dc=test',
   441                                    {('delete', 'memberUid'): ['adim']})
   462                                    {('delete', 'memberUid'): ['adim']})
   442             time.sleep(1.1) # timestamps precision is 1s
   463             time.sleep(1.1) # timestamps precision is 1s
   443             self.pull()
   464             with self.repo.internal_cnx() as cnx:
   444 
   465                 self.pull(cnx)
   445             rset = self.sexecute('Any L WHERE U in_group G, G name %(name)s, U login L',
   466 
   446                                  {'name': 'logilab'})
   467             with self.admin_access.repo_cnx() as cnx:
   447             self.assertEqual(len(rset), 0)
   468                 rset = cnx.execute('Any L WHERE U in_group G, G name %(name)s, U login L',
       
   469                                    {'name': 'logilab'})
       
   470                 self.assertEqual(len(rset), 0)
   448         finally:
   471         finally:
   449             # back to normal ldap setup
   472             # back to normal ldap setup
   450             self.tearDownClass()
   473             self.tearDownClass()
   451             self.setUpClass()
   474             self.setUpClass()
   452 
   475