# copyright 2003-2012 LOGILAB S.A. (Paris, FRANCE), all rights reserved.# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr## This file is part of CubicWeb.## CubicWeb is free software: you can redistribute it and/or modify it under the# terms of the GNU Lesser General Public License as published by the Free# Software Foundation, either version 2.1 of the License, or (at your option)# any later version.## CubicWeb is distributed in the hope that it will be useful, but WITHOUT# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more# details.## You should have received a copy of the GNU Lesser General Public License along# with CubicWeb. If not, see <http://www.gnu.org/licenses/>."""cubicweb.server.sources.ldapusers unit and functional tests"""importosimportsysimportshutilimporttimefromos.pathimportjoin,existsimportsubprocessimporttempfilefromlogilab.common.testlibimportTestCase,unittest_main,mock_object,TagsfromcubicwebimportAuthenticationErrorfromcubicweb.devtools.testlibimportCubicWebTCfromcubicweb.devtools.repotestimportRQLGeneratorTCfromcubicweb.devtools.httptestimportget_available_portfromcubicweb.devtoolsimportget_test_db_handlerCONFIG_LDAPFEED=u'''user-base-dn=ou=People,dc=cubicweb,dc=testgroup-base-dn=ou=Group,dc=cubicweb,dc=testuser-attrs-map=uid=login,mail=email,userPassword=upasswordgroup-attrs-map=cn=name,memberUid=member'''CONFIG_LDAPUSER=u'''user-base-dn=ou=People,dc=cubicweb,dc=testuser-attrs-map=uid=login,mail=email,userPassword=upassword'''URL=Nonedefcreate_slapd_configuration(cls):globalURLslapddir=tempfile.mkdtemp('cw-unittest-ldap')config=cls.configslapdconf=join(config.apphome,"slapd.conf")confin=file(join(config.apphome,"slapd.conf.in")).read()confstream=file(slapdconf,'w')confstream.write(confin%{'apphome':config.apphome,'testdir':slapddir})confstream.close()# fill ldap server with some dataldiffile=join(config.apphome,"ldap_test.ldif")config.info('Initing ldap database')cmdline=['/usr/sbin/slapadd','-f',slapdconf,'-l',ldiffile,'-c']PIPE=subprocess.PIPEslapproc=subprocess.Popen(cmdline,stdout=PIPE,stderr=PIPE)stdout,stderr=slapproc.communicate()ifslapproc.returncode:print>>sys.stderr,('slapadd returned with status: %s'%slapproc.returncode)sys.stdout.write(stdout)sys.stderr.write(stderr)#ldapuri = 'ldapi://' + join(basedir, "ldapi").replace('/', '%2f')port=get_available_port(xrange(9000,9100))host='localhost:%s'%portldapuri='ldap://%s'%hostcmdline=["/usr/sbin/slapd","-f",slapdconf,"-h",ldapuri,"-d","0"]config.info('Starting slapd:',' '.join(cmdline))PIPE=subprocess.PIPEcls.slapd_process=subprocess.Popen(cmdline,stdout=PIPE,stderr=PIPE)time.sleep(0.2)ifcls.slapd_process.poll()isNone:config.info('slapd started with pid %s',cls.slapd_process.pid)else:raiseEnvironmentError('Cannot start slapd with cmdline="%s" (from directory "%s")'%(" ".join(cmdline),os.getcwd()))URL=u'ldap://%s'%hostreturnslapddirdefterminate_slapd(cls):config=cls.configifcls.slapd_processandcls.slapd_process.returncodeisNone:config.info('terminating slapd')ifhasattr(cls.slapd_process,'terminate'):cls.slapd_process.terminate()else:importos,signalos.kill(cls.slapd_process.pid,signal.SIGTERM)stdout,stderr=cls.slapd_process.communicate()ifcls.slapd_process.returncode:print>>sys.stderr,('slapd returned with status: %s'%cls.slapd_process.returncode)sys.stdout.write(stdout)sys.stderr.write(stderr)config.info('DONE')classLDAPFeedTestBase(CubicWebTC):test_db_id='ldap-feed'loglevel='ERROR'@classmethoddefsetUpClass(cls):fromcubicweb.cwctlimportinit_cmdline_log_thresholdinit_cmdline_log_threshold(cls.config,cls.loglevel)cls._tmpdir=create_slapd_configuration(cls)@classmethoddeftearDownClass(cls):terminate_slapd(cls)try:shutil.rmtree(cls._tmpdir)except:pass@classmethoddefpre_setup_database(cls,session,config):session.create_entity('CWSource',name=u'ldap',type=u'ldapfeed',parser=u'ldapfeed',url=URL,config=CONFIG_LDAPFEED)session.commit()returncls._pull(session)@classmethoddef_pull(cls,session):withsession.repo.internal_session()asisession:lfsource=isession.repo.sources_by_uri['ldap']stats=lfsource.pull_data(isession,force=True,raise_on_error=True)isession.commit()returnstatsdefpull(self):returnself._pull(self.session)defsetup_database(self):withself.session.repo.internal_session(safe=True)assession:session.execute('DELETE Any E WHERE E cw_source S, S name "ldap"')session.execute('SET S config %(conf)s, S url %(url)s ''WHERE S is CWSource, S name "ldap"',{"conf":CONFIG_LDAPFEED,'url':URL})session.commit()self.pull()defadd_ldap_entry(self,dn,mods):""" add an LDAP entity """modcmd=['dn: %s'%dn,'changetype: add']forkey,valuesinmods.iteritems():ifisinstance(values,basestring):values=[values]forvalueinvalues:modcmd.append('%s: %s'%(key,value))self._ldapmodify(modcmd)defdelete_ldap_entry(self,dn):""" delete an LDAP entity """modcmd=['dn: %s'%dn,'changetype: delete']self._ldapmodify(modcmd)defupdate_ldap_entry(self,dn,mods):""" modify one or more attributes of an LDAP entity """modcmd=['dn: %s'%dn,'changetype: modify']for(kind,key),valuesinmods.iteritems():modcmd.append('%s: %s'%(kind,key))ifisinstance(values,basestring):values=[values]forvalueinvalues:modcmd.append('%s: %s'%(key,value))modcmd.append('-')self._ldapmodify(modcmd)def_ldapmodify(self,modcmd):uri=self.repo.sources_by_uri['ldap'].urls[0]updatecmd=['ldapmodify','-H',uri,'-v','-x','-D','cn=admin,dc=cubicweb,dc=test','-w','cw']PIPE=subprocess.PIPEp=subprocess.Popen(updatecmd,stdin=PIPE,stdout=PIPE,stderr=PIPE)p.stdin.write('\n'.join(modcmd))p.stdin.close()ifp.wait():raiseRuntimeError("ldap update failed: %s"%('\n'.join(p.stderr.readlines())))classCheckWrongGroup(LDAPFeedTestBase):""" A testcase for situations where the default group for CWUser created from LDAP is wrongly configured. """deftest_wrong_group(self):withself.session.repo.internal_session(safe=True)assession:source=self.session.execute('CWSource S WHERE S type="ldapfeed"').get_entity(0,0)config=source.repo_source.check_config(source)# inject a bogus group here, along with at least a valid oneconfig['user-default-group']=('thisgroupdoesnotexists','users')source.repo_source.update_config(source,config)session.commit(free_cnxset=False)# here we emitted an error log entrystats=source.repo_source.pull_data(session,force=True,raise_on_error=True)session.commit()classLDAPFeedUserTC(LDAPFeedTestBase):""" A testcase for CWUser support in ldapfeed (basic tests and authentication). """defassertMetadata(self,entity):self.assertTrue(entity.creation_date)self.assertTrue(entity.modification_date)deftest_authenticate(self):source=self.repo.sources_by_uri['ldap']self.session.set_cnxset()# ensure we won't be logged againstself.assertRaises(AuthenticationError,source.authenticate,self.session,'toto','toto')self.assertTrue(source.authenticate(self.session,'syt','syt'))self.assertTrue(self.repo.connect('syt',password='syt'))deftest_base(self):# check a known onerset=self.sexecute('CWUser X WHERE X login %(login)s',{'login':'syt'})e=rset.get_entity(0,0)self.assertEqual(e.login,'syt')e.complete()self.assertMetadata(e)self.assertEqual(e.firstname,None)self.assertEqual(e.surname,None)self.assertTrue('users'in[g.nameforgine.in_group])self.assertEqual(e.owned_by[0].login,'syt')self.assertEqual(e.created_by,())addresses=[pe.addressforpeine.use_email]addresses.sort()self.assertEqual(['sylvain.thenault@logilab.fr','syt@logilab.fr'],addresses)self.assertIn(e.primary_email[0].address,['sylvain.thenault@logilab.fr','syt@logilab.fr'])# email content should be indexed on the userrset=self.sexecute('CWUser X WHERE X has_text "thenault"')self.assertEqual(rset.rows,[[e.eid]])deftest_copy_to_system_source(self):"make sure we can 'convert' an LDAP user into a system one"source=self.repo.sources_by_uri['ldap']eid=self.sexecute('CWUser X WHERE X login %(login)s',{'login':'syt'})[0][0]self.sexecute('SET X cw_source S WHERE X eid %(x)s, S name "system"',{'x':eid})self.commit()source.reset_caches()rset=self.sexecute('CWUser X WHERE X login %(login)s',{'login':'syt'})self.assertEqual(len(rset),1)e=rset.get_entity(0,0)self.assertEqual(e.eid,eid)self.assertEqual(e.cw_metainformation(),{'source':{'type':u'native','uri':u'system','use-cwuri-as-url':False},'type':'CWUser','extid':None})self.assertEqual(e.cw_source[0].name,'system')self.assertTrue(e.creation_date)self.assertTrue(e.modification_date)source.pull_data(self.session)rset=self.sexecute('CWUser X WHERE X login %(login)s',{'login':'syt'})self.assertEqual(len(rset),1)self.assertTrue(self.repo.system_source.authenticate(self.session,'syt',password='syt'))# make sure the pull from ldap have not "reverted" user as a ldap-feed userself.assertEqual(e.cw_metainformation(),{'source':{'type':u'native','uri':u'system','use-cwuri-as-url':False},'type':'CWUser','extid':None})# and that the password stored in the system source is not empty or souser=self.execute('CWUser U WHERE U login "syt"').get_entity(0,0)user.cw_clear_all_caches()pwd=self.session.system_sql("SELECT cw_upassword FROM cw_cwuser WHERE cw_login='syt';").fetchall()[0][0]self.assertIsNotNone(pwd)self.assertTrue(str(pwd))classLDAPFeedUserDeletionTC(LDAPFeedTestBase):""" A testcase for situations where users are deleted from or unavailabe in the LDAP database. """deftest_a_filter_inactivate(self):""" filtered out people should be deactivated, unable to authenticate """source=self.session.execute('CWSource S WHERE S type="ldapfeed"').get_entity(0,0)config=source.repo_source.check_config(source)# filter with adim's phone numberconfig['user-filter']=u'(%s=%s)'%('telephoneNumber','109')source.repo_source.update_config(source,config)self.commit()self.pull()self.assertRaises(AuthenticationError,self.repo.connect,'syt',password='syt')self.assertEqual(self.execute('Any N WHERE U login "syt", ''U in_state S, S name N').rows[0][0],'deactivated')self.assertEqual(self.execute('Any N WHERE U login "adim", ''U in_state S, S name N').rows[0][0],'activated')# unfilter, syt should be activated againconfig['user-filter']=u''source.repo_source.update_config(source,config)self.commit()self.pull()self.assertEqual(self.execute('Any N WHERE U login "syt", ''U in_state S, S name N').rows[0][0],'activated')self.assertEqual(self.execute('Any N WHERE U login "adim", ''U in_state S, S name N').rows[0][0],'activated')deftest_delete(self):""" delete syt, pull, check deactivation, repull, read syt, pull, check activation """self.delete_ldap_entry('uid=syt,ou=People,dc=cubicweb,dc=test')self.pull()self.assertRaises(AuthenticationError,self.repo.connect,'syt',password='syt')self.assertEqual(self.execute('Any N WHERE U login "syt", ''U in_state S, S name N').rows[0][0],'deactivated')# check that it doesn't chokeself.pull()# reinsert sytself.add_ldap_entry('uid=syt,ou=People,dc=cubicweb,dc=test',{'objectClass':['OpenLDAPperson','posixAccount','top','shadowAccount'],'cn':'Sylvain Thenault','sn':'Thenault','gidNumber':'1004','uid':'syt','homeDirectory':'/home/syt','shadowFlag':'134538764','uidNumber':'1004','givenName':'Sylvain','telephoneNumber':'106','displayName':'sthenault','gecos':'Sylvain Thenault','mail':['sylvain.thenault@logilab.fr','syt@logilab.fr'],'userPassword':'syt',})self.pull()self.assertEqual(self.execute('Any N WHERE U login "syt", ''U in_state S, S name N').rows[0][0],'activated')deftest_reactivate_deleted(self):# test reactivating BY HAND the user isn't enough to# authenticate, as the native source refuse to authenticate# user from other sourcesself.delete_ldap_entry('uid=syt,ou=People,dc=cubicweb,dc=test')self.pull()# reactivate user (which source is still ldap-feed)user=self.execute('CWUser U WHERE U login "syt"').get_entity(0,0)user.cw_adapt_to('IWorkflowable').fire_transition('activate')self.commit()withself.assertRaises(AuthenticationError):self.repo.connect('syt',password='syt')# ok now let's try to make it a system userself.sexecute('SET X cw_source S WHERE X eid %(x)s, S name "system"',{'x':user.eid})self.commit()# and that we can now authenticate againself.assertRaises(AuthenticationError,self.repo.connect,'syt',password='toto')self.assertTrue(self.repo.connect('syt',password='syt'))classLDAPFeedGroupTC(LDAPFeedTestBase):""" A testcase for group support in ldapfeed. """deftest_groups_exist(self):rset=self.sexecute('CWGroup X WHERE X name "dir"')self.assertEqual(len(rset),1)rset=self.sexecute('CWGroup X WHERE X cw_source S, S name "ldap"')self.assertEqual(len(rset),2)deftest_group_deleted(self):rset=self.sexecute('CWGroup X WHERE X name "dir"')self.assertEqual(len(rset),1)deftest_in_group(self):rset=self.sexecute('CWGroup X WHERE X name %(name)s',{'name':'dir'})dirgroup=rset.get_entity(0,0)self.assertEqual(set(['syt','adim']),set([u.loginforuindirgroup.reverse_in_group]))rset=self.sexecute('CWGroup X WHERE X name %(name)s',{'name':'logilab'})logilabgroup=rset.get_entity(0,0)self.assertEqual(set(['adim']),set([u.loginforuinlogilabgroup.reverse_in_group]))deftest_group_member_added(self):self.pull()rset=self.sexecute('Any L WHERE U in_group G, G name %(name)s, U login L',{'name':'logilab'})self.assertEqual(len(rset),1)self.assertEqual(rset[0][0],'adim')try:self.update_ldap_entry('cn=logilab,ou=Group,dc=cubicweb,dc=test',{('add','memberUid'):['syt']})time.sleep(1.1)# timestamps precision is 1sself.pull()rset=self.sexecute('Any L WHERE U in_group G, G name %(name)s, U login L',{'name':'logilab'})self.assertEqual(len(rset),2)members=set([u[0]foruinrset])self.assertEqual(set(['adim','syt']),members)finally:# back to normal ldap setupself.tearDownClass()self.setUpClass()deftest_group_member_deleted(self):self.pull()# ensure we are sync'edrset=self.sexecute('Any L WHERE U in_group G, G name %(name)s, U login L',{'name':'logilab'})self.assertEqual(len(rset),1)self.assertEqual(rset[0][0],'adim')try:self.update_ldap_entry('cn=logilab,ou=Group,dc=cubicweb,dc=test',{('delete','memberUid'):['adim']})time.sleep(1.1)# timestamps precision is 1sself.pull()rset=self.sexecute('Any L WHERE U in_group G, G name %(name)s, U login L',{'name':'logilab'})self.assertEqual(len(rset),0)finally:# back to normal ldap setupself.tearDownClass()self.setUpClass()if__name__=='__main__':unittest_main()