# copyright 2003-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved.# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr## This file is part of CubicWeb.## CubicWeb is free software: you can redistribute it and/or modify it under the# terms of the GNU Lesser General Public License as published by the Free# Software Foundation, either version 2.1 of the License, or (at your option)# any later version.## CubicWeb is distributed in the hope that it will be useful, but WITHOUT# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more# details.## You should have received a copy of the GNU Lesser General Public License along# with CubicWeb. If not, see <http://www.gnu.org/licenses/>."""cubicweb.server.sources.ldapusers unit and functional tests"""from__future__importprint_functionimportosimportsysimportshutilimporttimefromos.pathimportjoin,existsimportsubprocessimporttempfilefromsiximportstring_typesfromsix.movesimportrangefromcubicwebimportAuthenticationErrorfromcubicweb.devtools.testlibimportCubicWebTCfromcubicweb.devtools.repotestimportRQLGeneratorTCfromcubicweb.devtools.httptestimportget_available_portCONFIG_LDAPFEED=u'''user-base-dn=ou=People,dc=cubicweb,dc=testgroup-base-dn=ou=Group,dc=cubicweb,dc=testuser-attrs-map=uid=login,mail=email,userPassword=upasswordgroup-attrs-map=cn=name,memberUid=member'''CONFIG_LDAPUSER=u'''user-base-dn=ou=People,dc=cubicweb,dc=testuser-attrs-map=uid=login,mail=email,userPassword=upassword'''URL=Nonedefcreate_slapd_configuration(cls):globalURLslapddir=tempfile.mkdtemp('cw-unittest-ldap')config=cls.configslapdconf=join(config.apphome,"slapd.conf")confin=open(join(config.apphome,"slapd.conf.in")).read()confstream=open(slapdconf,'w')confstream.write(confin%{'apphome':config.apphome,'testdir':slapddir})confstream.close()# fill ldap server with some dataldiffile=join(config.apphome,"ldap_test.ldif")config.info('Initing ldap database')cmdline=['/usr/sbin/slapadd','-f',slapdconf,'-l',ldiffile,'-c']PIPE=subprocess.PIPEslapproc=subprocess.Popen(cmdline,stdout=PIPE,stderr=PIPE)stdout,stderr=slapproc.communicate()ifslapproc.returncode:print('slapadd returned with status: %s'%slapproc.returncode,file=sys.stderr)sys.stdout.write(stdout)sys.stderr.write(stderr)#ldapuri = 'ldapi://' + join(basedir, "ldapi").replace('/', '%2f')port=get_available_port(range(9000,9100))host='localhost:%s'%portldapuri='ldap://%s'%hostcmdline=["/usr/sbin/slapd","-f",slapdconf,"-h",ldapuri,"-d","0"]config.info('Starting slapd:',' '.join(cmdline))PIPE=subprocess.PIPEcls.slapd_process=subprocess.Popen(cmdline,stdout=PIPE,stderr=PIPE)time.sleep(0.2)ifcls.slapd_process.poll()isNone:config.info('slapd started with pid %s',cls.slapd_process.pid)else:raiseEnvironmentError('Cannot start slapd with cmdline="%s" (from directory "%s")'%(" ".join(cmdline),os.getcwd()))URL=u'ldap://%s'%hostreturnslapddirdefterminate_slapd(cls):config=cls.configifcls.slapd_processandcls.slapd_process.returncodeisNone:config.info('terminating slapd')ifhasattr(cls.slapd_process,'terminate'):cls.slapd_process.terminate()else:importos,signalos.kill(cls.slapd_process.pid,signal.SIGTERM)stdout,stderr=cls.slapd_process.communicate()ifcls.slapd_process.returncode:print('slapd returned with status: %s'%cls.slapd_process.returncode,file=sys.stderr)sys.stdout.write(stdout)sys.stderr.write(stderr)config.info('DONE')classLDAPFeedTestBase(CubicWebTC):test_db_id='ldap-feed'loglevel='ERROR'@classmethoddefsetUpClass(cls):fromcubicweb.cwctlimportinit_cmdline_log_thresholdinit_cmdline_log_threshold(cls.config,cls.loglevel)cls._tmpdir=create_slapd_configuration(cls)@classmethoddeftearDownClass(cls):terminate_slapd(cls)try:shutil.rmtree(cls._tmpdir)except:pass@classmethoddefpre_setup_database(cls,cnx,config):cnx.create_entity('CWSource',name=u'ldap',type=u'ldapfeed',parser=u'ldapfeed',url=URL,config=CONFIG_LDAPFEED)cnx.commit()returncls.pull(cnx)@classmethoddefpull(self,cnx):lfsource=cnx.repo.sources_by_uri['ldap']stats=lfsource.pull_data(cnx,force=True,raise_on_error=True)cnx.commit()returnstatsdefsetup_database(self):withself.admin_access.repo_cnx()ascnx:cnx.execute('DELETE Any E WHERE E cw_source S, S name "ldap"')cnx.execute('SET S config %(conf)s, S url %(url)s ''WHERE S is CWSource, S name "ldap"',{"conf":CONFIG_LDAPFEED,'url':URL})cnx.commit()withself.repo.internal_cnx()ascnx:self.pull(cnx)defadd_ldap_entry(self,dn,mods):""" add an LDAP entity """modcmd=['dn: %s'%dn,'changetype: add']forkey,valuesinmods.items():ifisinstance(values,string_types):values=[values]forvalueinvalues:modcmd.append('%s: %s'%(key,value))self._ldapmodify(modcmd)defdelete_ldap_entry(self,dn):""" delete an LDAP entity """modcmd=['dn: %s'%dn,'changetype: delete']self._ldapmodify(modcmd)defupdate_ldap_entry(self,dn,mods):""" modify one or more attributes of an LDAP entity """modcmd=['dn: %s'%dn,'changetype: modify']for(kind,key),valuesinmods.items():modcmd.append('%s: %s'%(kind,key))ifisinstance(values,string_types):values=[values]forvalueinvalues:modcmd.append('%s: %s'%(key,value))modcmd.append('-')self._ldapmodify(modcmd)def_ldapmodify(self,modcmd):uri=self.repo.sources_by_uri['ldap'].urls[0]updatecmd=['ldapmodify','-H',uri,'-v','-x','-D','cn=admin,dc=cubicweb,dc=test','-w','cw']PIPE=subprocess.PIPEp=subprocess.Popen(updatecmd,stdin=PIPE,stdout=PIPE,stderr=PIPE)p.stdin.write('\n'.join(modcmd).encode('ascii'))p.stdin.close()ifp.wait():raiseRuntimeError("ldap update failed: %s"%('\n'.join(p.stderr.readlines())))classCheckWrongGroup(LDAPFeedTestBase):""" A testcase for situations where the default group for CWUser created from LDAP is wrongly configured. """deftest_wrong_group(self):withself.admin_access.repo_cnx()ascnx:source=cnx.execute('CWSource S WHERE S type="ldapfeed"').get_entity(0,0)config=source.repo_source.check_config(source)# inject a bogus group here, along with at least a valid oneconfig['user-default-group']=('thisgroupdoesnotexists','users')source.repo_source.update_config(source,config)cnx.commit()# here we emitted an error log entrystats=source.repo_source.pull_data(cnx,force=True,raise_on_error=True)cnx.commit()classLDAPFeedUserTC(LDAPFeedTestBase):""" A testcase for CWUser support in ldapfeed (basic tests and authentication). """defassertMetadata(self,entity):self.assertTrue(entity.creation_date)self.assertTrue(entity.modification_date)deftest_authenticate(self):source=self.repo.sources_by_uri['ldap']withself.admin_access.repo_cnx()ascnx:# ensure we won't be logged againstself.assertRaises(AuthenticationError,source.authenticate,cnx,'toto','toto')self.assertTrue(source.authenticate(cnx,'syt','syt'))sessionid=self.repo.connect('syt',password='syt')self.assertTrue(sessionid)self.repo.close(sessionid)deftest_base(self):withself.admin_access.repo_cnx()ascnx:# check a known onerset=cnx.execute('CWUser X WHERE X login %(login)s',{'login':'syt'})e=rset.get_entity(0,0)self.assertEqual(e.login,'syt')e.complete()self.assertMetadata(e)self.assertEqual(e.firstname,None)self.assertEqual(e.surname,None)self.assertIn('users',set(g.nameforgine.in_group))self.assertEqual(e.owned_by[0].login,'syt')self.assertEqual(e.created_by,())addresses=[pe.addressforpeine.use_email]addresses.sort()self.assertEqual(['sylvain.thenault@logilab.fr','syt@logilab.fr'],addresses)self.assertIn(e.primary_email[0].address,['sylvain.thenault@logilab.fr','syt@logilab.fr'])# email content should be indexed on the userrset=cnx.execute('CWUser X WHERE X has_text "thenault"')self.assertEqual(rset.rows,[[e.eid]])deftest_copy_to_system_source(self):"make sure we can 'convert' an LDAP user into a system one"withself.admin_access.repo_cnx()ascnx:source=self.repo.sources_by_uri['ldap']eid=cnx.execute('CWUser X WHERE X login %(login)s',{'login':'syt'})[0][0]cnx.execute('SET X cw_source S WHERE X eid %(x)s, S name "system"',{'x':eid})cnx.commit()source.reset_caches()rset=cnx.execute('CWUser X WHERE X login %(login)s',{'login':'syt'})self.assertEqual(len(rset),1)e=rset.get_entity(0,0)self.assertEqual(e.eid,eid)self.assertEqual(e.cw_metainformation(),{'source':{'type':u'native','uri':u'system','use-cwuri-as-url':False},'type':'CWUser','extid':None})self.assertEqual(e.cw_source[0].name,'system')self.assertTrue(e.creation_date)self.assertTrue(e.modification_date)source.pull_data(cnx)rset=cnx.execute('CWUser X WHERE X login %(login)s',{'login':'syt'})self.assertEqual(len(rset),1)self.assertTrue(self.repo.system_source.authenticate(cnx,'syt',password='syt'))# make sure the pull from ldap have not "reverted" user as a ldap-feed userself.assertEqual(e.cw_metainformation(),{'source':{'type':u'native','uri':u'system','use-cwuri-as-url':False},'type':'CWUser','extid':None})# and that the password stored in the system source is not empty or souser=cnx.execute('CWUser U WHERE U login "syt"').get_entity(0,0)user.cw_clear_all_caches()pwd=cnx.system_sql("SELECT cw_upassword FROM cw_cwuser WHERE cw_login='syt';").fetchall()[0][0]self.assertIsNotNone(pwd)self.assertTrue(str(pwd))classLDAPFeedUserDeletionTC(LDAPFeedTestBase):""" A testcase for situations where users are deleted from or unavailable in the LDAP database. """deftest_a_filter_inactivate(self):""" filtered out people should be deactivated, unable to authenticate """withself.admin_access.repo_cnx()ascnx:source=cnx.execute('CWSource S WHERE S type="ldapfeed"').get_entity(0,0)config=source.repo_source.check_config(source)# filter with adim's phone numberconfig['user-filter']=u'(%s=%s)'%('telephoneNumber','109')source.repo_source.update_config(source,config)cnx.commit()withself.repo.internal_cnx()ascnx:self.pull(cnx)self.assertRaises(AuthenticationError,self.repo.connect,'syt',password='syt')withself.admin_access.repo_cnx()ascnx:self.assertEqual(cnx.execute('Any N WHERE U login "syt", ''U in_state S, S name N').rows[0][0],'deactivated')self.assertEqual(cnx.execute('Any N WHERE U login "adim", ''U in_state S, S name N').rows[0][0],'activated')# unfilter, syt should be activated againconfig['user-filter']=u''source.repo_source.update_config(source,config)cnx.commit()withself.repo.internal_cnx()ascnx:self.pull(cnx)withself.admin_access.repo_cnx()ascnx:self.assertEqual(cnx.execute('Any N WHERE U login "syt", ''U in_state S, S name N').rows[0][0],'activated')self.assertEqual(cnx.execute('Any N WHERE U login "adim", ''U in_state S, S name N').rows[0][0],'activated')deftest_delete(self):""" delete syt, pull, check deactivation, repull, read syt, pull, check activation """self.delete_ldap_entry('uid=syt,ou=People,dc=cubicweb,dc=test')withself.repo.internal_cnx()ascnx:self.pull(cnx)self.assertRaises(AuthenticationError,self.repo.connect,'syt',password='syt')withself.admin_access.repo_cnx()ascnx:self.assertEqual(cnx.execute('Any N WHERE U login "syt", ''U in_state S, S name N').rows[0][0],'deactivated')withself.repo.internal_cnx()ascnx:# check that it doesn't chokeself.pull(cnx)# reinsert sytself.add_ldap_entry('uid=syt,ou=People,dc=cubicweb,dc=test',{'objectClass':['OpenLDAPperson','posixAccount','top','shadowAccount'],'cn':'Sylvain Thenault','sn':'Thenault','gidNumber':'1004','uid':'syt','homeDirectory':'/home/syt','shadowFlag':'134538764','uidNumber':'1004','givenName':'Sylvain','telephoneNumber':'106','displayName':'sthenault','gecos':'Sylvain Thenault','mail':['sylvain.thenault@logilab.fr','syt@logilab.fr'],'userPassword':'syt',})withself.repo.internal_cnx()ascnx:self.pull(cnx)withself.admin_access.repo_cnx()ascnx:self.assertEqual(cnx.execute('Any N WHERE U login "syt", ''U in_state S, S name N').rows[0][0],'activated')deftest_reactivate_deleted(self):# test reactivating BY HAND the user isn't enough to# authenticate, as the native source refuse to authenticate# user from other sourcesself.delete_ldap_entry('uid=syt,ou=People,dc=cubicweb,dc=test')withself.repo.internal_cnx()ascnx:self.pull(cnx)withself.admin_access.repo_cnx()ascnx:# reactivate user (which source is still ldap-feed)user=cnx.execute('CWUser U WHERE U login "syt"').get_entity(0,0)user.cw_adapt_to('IWorkflowable').fire_transition('activate')cnx.commit()withself.assertRaises(AuthenticationError):self.repo.connect('syt',password='syt')# ok now let's try to make it a system usercnx.execute('SET X cw_source S WHERE X eid %(x)s, S name "system"',{'x':user.eid})cnx.commit()# and that we can now authenticate againself.assertRaises(AuthenticationError,self.repo.connect,'syt',password='toto')sessionid=self.repo.connect('syt',password='syt')self.assertTrue(sessionid)self.repo.close(sessionid)classLDAPFeedGroupTC(LDAPFeedTestBase):""" A testcase for group support in ldapfeed. """deftest_groups_exist(self):withself.admin_access.repo_cnx()ascnx:rset=cnx.execute('CWGroup X WHERE X name "dir"')self.assertEqual(len(rset),1)rset=cnx.execute('CWGroup X WHERE X cw_source S, S name "ldap"')self.assertEqual(len(rset),2)deftest_group_deleted(self):withself.admin_access.repo_cnx()ascnx:rset=cnx.execute('CWGroup X WHERE X name "dir"')self.assertEqual(len(rset),1)deftest_in_group(self):withself.admin_access.repo_cnx()ascnx:rset=cnx.execute('CWGroup X WHERE X name %(name)s',{'name':'dir'})dirgroup=rset.get_entity(0,0)self.assertEqual(set(['syt','adim']),set([u.loginforuindirgroup.reverse_in_group]))rset=cnx.execute('CWGroup X WHERE X name %(name)s',{'name':'logilab'})logilabgroup=rset.get_entity(0,0)self.assertEqual(set(['adim']),set([u.loginforuinlogilabgroup.reverse_in_group]))deftest_group_member_added(self):withself.repo.internal_cnx()ascnx:self.pull(cnx)withself.admin_access.repo_cnx()ascnx:rset=cnx.execute('Any L WHERE U in_group G, G name %(name)s, U login L',{'name':'logilab'})self.assertEqual(len(rset),1)self.assertEqual(rset[0][0],'adim')try:self.update_ldap_entry('cn=logilab,ou=Group,dc=cubicweb,dc=test',{('add','memberUid'):['syt']})time.sleep(1.1)# timestamps precision is 1swithself.repo.internal_cnx()ascnx:self.pull(cnx)withself.admin_access.repo_cnx()ascnx:rset=cnx.execute('Any L WHERE U in_group G, G name %(name)s, U login L',{'name':'logilab'})self.assertEqual(len(rset),2)members=set([u[0]foruinrset])self.assertEqual(set(['adim','syt']),members)finally:# back to normal ldap setupself.tearDownClass()self.setUpClass()deftest_group_member_deleted(self):withself.repo.internal_cnx()ascnx:self.pull(cnx)# ensure we are sync'edwithself.admin_access.repo_cnx()ascnx:rset=cnx.execute('Any L WHERE U in_group G, G name %(name)s, U login L',{'name':'logilab'})self.assertEqual(len(rset),1)self.assertEqual(rset[0][0],'adim')try:self.update_ldap_entry('cn=logilab,ou=Group,dc=cubicweb,dc=test',{('delete','memberUid'):['adim']})time.sleep(1.1)# timestamps precision is 1swithself.repo.internal_cnx()ascnx:self.pull(cnx)withself.admin_access.repo_cnx()ascnx:rset=cnx.execute('Any L WHERE U in_group G, G name %(name)s, U login L',{'name':'logilab'})self.assertEqual(len(rset),0)finally:# back to normal ldap setupself.tearDownClass()self.setUpClass()if__name__=='__main__':fromlogilab.common.testlibimportunittest_mainunittest_main()