# copyright 2003-2010 LOGILAB S.A. (Paris, FRANCE), all rights reserved.# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr## This file is part of CubicWeb.## CubicWeb is free software: you can redistribute it and/or modify it under the# terms of the GNU Lesser General Public License as published by the Free# Software Foundation, either version 2.1 of the License, or (at your option)# any later version.## CubicWeb is distributed in the hope that it will be useful, but WITHOUT# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more# details.## You should have received a copy of the GNU Lesser General Public License along# with CubicWeb. If not, see <http://www.gnu.org/licenses/>."""cubicweb.server.sources.ldapusers unit and functional tests"""importsocketfromlogilab.common.testlibimportTestCase,unittest_main,mock_objectfromcubicweb.devtools.testlibimportCubicWebTCfromcubicweb.devtools.repotestimportRQLGeneratorTCfromcubicweb.server.sources.ldapuserimport*if'17.1'insocket.gethostbyname('ldap1'):SYT='syt'SYT_EMAIL='Sylvain Thenault'ADIM='adim'CONFIG=u'''host=ldap1user-base-dn=ou=People,dc=logilab,dc=fruser-scope=ONELEVELuser-classes=top,posixAccountuser-login-attr=uiduser-default-group=usersuser-attrs-map=gecos:email,uid:login'''else:SYT='sthenault'SYT_EMAIL='sylvain.thenault@logilab.fr'ADIM='adimascio'CONFIG=u'''host=ldap1user-base-dn=ou=People,dc=logilab,dc=netuser-scope=ONELEVELuser-classes=top,OpenLDAPpersonuser-login-attr=uiduser-default-group=usersuser-attrs-map=mail:email,uid:login'''defnopwd_authenticate(self,session,login,password):"""used to monkey patch the source to get successful authentication without upassword checking """assertlogin,'no login!'searchfilter=[filter_format('(%s=%s)',(self.user_login_attr,login))]searchfilter.extend([filter_format('(%s=%s)',('objectClass',o))foroinself.user_classes])searchstr='(&%s)'%''.join(searchfilter)# first search the usertry:user=self._search(session,self.user_base_dn,self.user_base_scope,searchstr)[0]exceptIndexError:# no such userraiseAuthenticationError()# don't check upassword !returnself.extid2eid(user['dn'],'CWUser',session)defsetUpModule(*args):globalrepoLDAPUserSourceTC._init_repo()repo=LDAPUserSourceTC.repoadd_ldap_source(LDAPUserSourceTC.cnx)deftearDownModule(*args):globalreporepo.shutdown()delrepodefadd_ldap_source(cnx):cnx.request().create_entity('CWSource',name=u'ldapuser',type=u'ldapuser',config=CONFIG)cnx.commit()# XXX: need this first query else we get 'database is locked' from# sqlite since it doesn't support multiple connections on the same# database# so doing, ldap inserted users don't get removed between each testrset=cnx.cursor().execute('CWUser X')# check we get some users from ldapassertlen(rset)>1classLDAPUserSourceTC(CubicWebTC):defpatch_authenticate(self):self._orig_authenticate=LDAPUserSource.authenticateLDAPUserSource.authenticate=nopwd_authenticatedeftearDown(self):ifhasattr(self,'_orig_authenticate'):LDAPUserSource.authenticate=self._orig_authenticateCubicWebTC.tearDown(self)deftest_authenticate(self):source=self.repo.sources_by_uri['ldapuser']self.session.set_pool()self.assertRaises(AuthenticationError,source.authenticate,self.session,'toto','toto')deftest_synchronize(self):source=self.repo.sources_by_uri['ldapuser']source.synchronize()deftest_base(self):# check a known onee=self.sexecute('CWUser X WHERE X login %(login)s',{'login':SYT}).get_entity(0,0)self.assertEqual(e.login,SYT)e.complete()self.assertEqual(e.creation_date,None)self.assertEqual(e.modification_date,None)self.assertEqual(e.firstname,None)self.assertEqual(e.surname,None)self.assertEqual(e.in_group[0].name,'users')self.assertEqual(e.owned_by[0].login,SYT)self.assertEqual(e.created_by,())self.assertEqual(e.primary_email[0].address,SYT_EMAIL)# email content should be indexed on the userrset=self.sexecute('CWUser X WHERE X has_text "thenault"')self.assertEqual(rset.rows,[[e.eid]])deftest_not(self):eid=self.sexecute('CWUser X WHERE X login %(login)s',{'login':SYT})[0][0]rset=self.sexecute('CWUser X WHERE NOT X eid %s'%eid)self.assert_(rset)self.assert_(noteidin(r[0]forrinrset))deftest_multiple(self):seid=self.sexecute('CWUser X WHERE X login %(login)s',{'login':SYT})[0][0]aeid=self.sexecute('CWUser X WHERE X login %(login)s',{'login':ADIM})[0][0]rset=self.sexecute('CWUser X, Y WHERE X login %(syt)s, Y login %(adim)s',{'syt':SYT,'adim':ADIM})self.assertEqual(rset.rows,[[seid,aeid]])rset=self.sexecute('Any X,Y,L WHERE X login L, X login %(syt)s, Y login %(adim)s',{'syt':SYT,'adim':ADIM})self.assertEqual(rset.rows,[[seid,aeid,SYT]])deftest_in(self):seid=self.sexecute('CWUser X WHERE X login %(login)s',{'login':SYT})[0][0]aeid=self.sexecute('CWUser X WHERE X login %(login)s',{'login':ADIM})[0][0]rset=self.sexecute('Any X,L ORDERBY L WHERE X login IN("%s", "%s"), X login L'%(SYT,ADIM))self.assertEqual(rset.rows,[[aeid,ADIM],[seid,SYT]])deftest_relations(self):eid=self.sexecute('CWUser X WHERE X login %(login)s',{'login':SYT})[0][0]rset=self.sexecute('Any X,E WHERE X is CWUser, X login L, X primary_email E')self.assert_(eidin(r[0]forrinrset))rset=self.sexecute('Any X,L,E WHERE X is CWUser, X login L, X primary_email E')self.assert_(SYTin(r[1]forrinrset))deftest_count(self):nbusers=self.sexecute('Any COUNT(X) WHERE X is CWUser')[0][0]# just check this is a possible numberself.assert_(nbusers>1,nbusers)self.assert_(nbusers<30,nbusers)deftest_upper(self):eid=self.sexecute('CWUser X WHERE X login %(login)s',{'login':SYT})[0][0]rset=self.sexecute('Any UPPER(L) WHERE X eid %s, X login L'%eid)self.assertEqual(rset[0][0],SYT.upper())deftest_unknown_attr(self):eid=self.sexecute('CWUser X WHERE X login %(login)s',{'login':SYT})[0][0]rset=self.sexecute('Any L,C,M WHERE X eid %s, X login L, ''X creation_date C, X modification_date M'%eid)self.assertEqual(rset[0][0],SYT)self.assertEqual(rset[0][1],None)self.assertEqual(rset[0][2],None)deftest_sort(self):logins=[lforl,inself.sexecute('Any L ORDERBY L WHERE X login L')]self.assertEqual(logins,sorted(logins))deftest_lower_sort(self):logins=[lforl,inself.sexecute('Any L ORDERBY lower(L) WHERE X login L')]self.assertEqual(logins,sorted(logins))deftest_or(self):rset=self.sexecute('DISTINCT Any X WHERE X login %(login)s OR (X in_group G, G name "managers")',{'login':SYT})self.assertEqual(len(rset),2,rset.rows)# syt + admindeftest_nonregr_set_owned_by(self):# test that when a user coming from ldap is triggering a transition# the related TrInfo has correct owner informationself.sexecute('SET X in_group G WHERE X login %(syt)s, G name "managers"',{'syt':SYT})self.commit()syt=self.sexecute('CWUser X WHERE X login %(login)s',{'login':SYT}).get_entity(0,0)self.assertEqual([g.nameforginsyt.in_group],['managers','users'])self.patch_authenticate()cnx=self.login(SYT,password='dummypassword')cu=cnx.cursor()adim=cu.execute('CWUser X WHERE X login %(login)s',{'login':ADIM}).get_entity(0,0)iworkflowable=adim.cw_adapt_to('IWorkflowable')iworkflowable.fire_transition('deactivate')try:cnx.commit()adim.clear_all_caches()self.assertEqual(adim.in_state[0].name,'deactivated')trinfo=iworkflowable.latest_trinfo()self.assertEqual(trinfo.owned_by[0].login,SYT)# select from_state to skip the user's creation TrInforset=self.sexecute('Any U ORDERBY D DESC WHERE WF wf_info_for X,''WF creation_date D, WF from_state FS,''WF owned_by U?, X eid %(x)s',{'x':adim.eid})self.assertEqual(rset.rows,[[syt.eid]])finally:# restore db stateself.restore_connection()adim=self.sexecute('CWUser X WHERE X login %(login)s',{'login':ADIM}).get_entity(0,0)adim.cw_adapt_to('IWorkflowable').fire_transition('activate')self.sexecute('DELETE X in_group G WHERE X login %(syt)s, G name "managers"',{'syt':SYT})deftest_same_column_names(self):self.sexecute('Any X, Y WHERE X copain Y, X login "comme", Y login "cochon"')deftest_multiple_entities_from_different_sources(self):self.create_user('cochon')self.failUnless(self.sexecute('Any X,Y WHERE X login %(syt)s, Y login "cochon"',{'syt':SYT}))deftest_exists1(self):self.session.set_pool()self.session.create_entity('CWGroup',name=u'bougloup1')self.session.create_entity('CWGroup',name=u'bougloup2')self.sexecute('SET U in_group G WHERE G name ~= "bougloup%", U login "admin"')self.sexecute('SET U in_group G WHERE G name = "bougloup1", U login %(syt)s',{'syt':SYT})rset=self.sexecute('Any L,SN ORDERBY L WHERE X in_state S, S name SN, X login L, EXISTS(X in_group G, G name ~= "bougloup%")')self.assertEqual(rset.rows,[['admin','activated'],[SYT,'activated']])deftest_exists2(self):self.create_user('comme')self.create_user('cochon')self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"')rset=self.sexecute('Any GN ORDERBY GN WHERE X in_group G, G name GN, (G name "managers" OR EXISTS(X copain T, T login in ("comme", "cochon")))')self.assertEqual(rset.rows,[['managers'],['users']])deftest_exists3(self):self.create_user('comme')self.create_user('cochon')self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"')self.failUnless(self.sexecute('Any X, Y WHERE X copain Y, X login "comme", Y login "cochon"'))self.sexecute('SET X copain Y WHERE X login %(syt)s, Y login "cochon"',{'syt':SYT})self.failUnless(self.sexecute('Any X, Y WHERE X copain Y, X login %(syt)s, Y login "cochon"',{'syt':SYT}))rset=self.sexecute('Any GN,L WHERE X in_group G, X login L, G name GN, G name "managers" OR EXISTS(X copain T, T login in ("comme", "cochon"))')self.assertEqual(sorted(rset.rows),[['managers','admin'],['users','comme'],['users',SYT]])deftest_exists4(self):self.create_user('comme')self.create_user('cochon',groups=('users','guests'))self.create_user('billy')self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"')self.sexecute('SET X copain Y WHERE X login "cochon", Y login "cochon"')self.sexecute('SET X copain Y WHERE X login "comme", Y login "billy"')self.sexecute('SET X copain Y WHERE X login %(syt)s, Y login "billy"',{'syt':SYT})# search for group name, login where# CWUser copain with "comme" or "cochon" AND same login as the copain# OR# CWUser in_state activated AND not copain with billy## SO we expect everybody but "comme" and "syt"rset=self.sexecute('Any GN,L WHERE X in_group G, X login L, G name GN, ''EXISTS(X copain T, T login L, T login in ("comme", "cochon")) OR ''EXISTS(X in_state S, S name "activated", NOT X copain T2, T2 login "billy")')all=self.sexecute('Any GN, L WHERE X in_group G, X login L, G name GN')all.rows.remove(['users','comme'])all.rows.remove(['users',SYT])self.assertEqual(sorted(rset.rows),sorted(all.rows))deftest_exists5(self):self.create_user('comme')self.create_user('cochon',groups=('users','guests'))self.create_user('billy')self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"')self.sexecute('SET X copain Y WHERE X login "cochon", Y login "cochon"')self.sexecute('SET X copain Y WHERE X login "comme", Y login "billy"')self.sexecute('SET X copain Y WHERE X login %(syt)s, Y login "cochon"',{'syt':SYT})rset=self.sexecute('Any L WHERE X login L, ''EXISTS(X copain T, T login in ("comme", "cochon")) AND ''NOT EXISTS(X copain T2, T2 login "billy")')self.assertEqual(sorted(rset.rows),[['cochon'],[SYT]])rset=self.sexecute('Any GN,L WHERE X in_group G, X login L, G name GN, ''EXISTS(X copain T, T login in ("comme", "cochon")) AND ''NOT EXISTS(X copain T2, T2 login "billy")')self.assertEqual(sorted(rset.rows),[['guests','cochon'],['users','cochon'],['users',SYT]])deftest_cd_restriction(self):rset=self.sexecute('CWUser X WHERE X creation_date > "2009-02-01"')# admin/anon but no ldap user since it doesn't support creation_dateself.assertEqual(sorted(e.loginforeinrset.entities()),['admin','anon'])deftest_union(self):afeids=self.sexecute('State X')ueids=self.sexecute('CWUser X')rset=self.sexecute('(Any X WHERE X is State) UNION (Any X WHERE X is CWUser)')self.assertEqual(sorted(r[0]forrinrset.rows),sorted(r[0]forrinafeids+ueids))def_init_security_test(self):self.create_user('iaminguestsgrouponly',groups=('guests',))cnx=self.login('iaminguestsgrouponly')returncnx.cursor()deftest_security1(self):cu=self._init_security_test()rset=cu.execute('CWUser X WHERE X login %(login)s',{'login':SYT})self.assertEqual(rset.rows,[])rset=cu.execute('Any X WHERE X login "iaminguestsgrouponly"')self.assertEqual(len(rset.rows),1)deftest_security2(self):cu=self._init_security_test()rset=cu.execute('Any X WHERE X has_text %(syt)s',{'syt':SYT})self.assertEqual(rset.rows,[])rset=cu.execute('Any X WHERE X has_text "iaminguestsgrouponly"')self.assertEqual(len(rset.rows),1)deftest_security3(self):cu=self._init_security_test()rset=cu.execute('Any F WHERE X has_text %(syt)s, X firstname F',{'syt':SYT})self.assertEqual(rset.rows,[])rset=cu.execute('Any F WHERE X has_text "iaminguestsgrouponly", X firstname F')self.assertEqual(rset.rows,[[None]])deftest_nonregr1(self):self.sexecute('Any X,AA ORDERBY AA DESC WHERE E eid %(x)s, E owned_by X, ''X modification_date AA',{'x':self.session.user.eid})deftest_nonregr2(self):self.sexecute('Any X,L,AA WHERE E eid %(x)s, E owned_by X, ''X login L, X modification_date AA',{'x':self.session.user.eid})deftest_nonregr3(self):self.sexecute('Any X,AA ORDERBY AA DESC WHERE E eid %(x)s, ''X modification_date AA',{'x':self.session.user.eid})deftest_nonregr4(self):emaileid=self.sexecute('INSERT EmailAddress X: X address "toto@logilab.org"')[0][0]self.sexecute('Any X,AA WHERE X use_email Y, Y eid %(x)s, X modification_date AA',{'x':emaileid})deftest_nonregr5(self):# original jpl query:# Any X, NOW - CD, P WHERE P is Project, U interested_in P, U is CWUser, U login "sthenault", X concerns P, X creation_date CD ORDERBY CD DESC LIMIT 5rql='Any X, NOW - CD, P ORDERBY CD DESC LIMIT 5 WHERE P bookmarked_by U, U login "%s", P is X, X creation_date CD'%self.session.user.loginself.sexecute(rql,)#{'x': })deftest_nonregr6(self):self.sexecute('Any B,U,UL GROUPBY B,U,UL WHERE B created_by U?, B is File ''WITH U,UL BEING (Any U,UL WHERE ME eid %(x)s, (EXISTS(U identity ME) ''OR (EXISTS(U in_group G, G name IN("managers", "staff")))) ''OR (EXISTS(U in_group H, ME in_group H, NOT H name "users")), U login UL, U is CWUser)',{'x':self.session.user.eid})classGlobTrFuncTC(TestCase):deftest_count(self):trfunc=GlobTrFunc('count',0)res=trfunc.apply([[1],[2],[3],[4]])self.assertEqual(res,[[4]])trfunc=GlobTrFunc('count',1)res=trfunc.apply([[1,2],[2,4],[3,6],[1,5]])self.assertEqual(res,[[1,2],[2,1],[3,1]])deftest_sum(self):trfunc=GlobTrFunc('sum',0)res=trfunc.apply([[1],[2],[3],[4]])self.assertEqual(res,[[10]])trfunc=GlobTrFunc('sum',1)res=trfunc.apply([[1,2],[2,4],[3,6],[1,5]])self.assertEqual(res,[[1,7],[2,4],[3,6]])deftest_min(self):trfunc=GlobTrFunc('min',0)res=trfunc.apply([[1],[2],[3],[4]])self.assertEqual(res,[[1]])trfunc=GlobTrFunc('min',1)res=trfunc.apply([[1,2],[2,4],[3,6],[1,5]])self.assertEqual(res,[[1,2],[2,4],[3,6]])deftest_max(self):trfunc=GlobTrFunc('max',0)res=trfunc.apply([[1],[2],[3],[4]])self.assertEqual(res,[[4]])trfunc=GlobTrFunc('max',1)res=trfunc.apply([[1,2],[2,4],[3,6],[1,5]])self.assertEqual(res,[[1,5],[2,4],[3,6]])classRQL2LDAPFilterTC(RQLGeneratorTC):defsetUp(self):self.schema=repo.schemaRQLGeneratorTC.setUp(self)ldapsource=repo.sources[-1]self.pool=repo._get_pool()session=mock_object(pool=self.pool)self.o=RQL2LDAPFilter(ldapsource,session)self.ldapclasses=''.join('(objectClass=%s)'%ldapclsforldapclsinldapsource.user_classes)deftearDown(self):repo._free_pool(self.pool)RQLGeneratorTC.tearDown(self)deftest_base(self):rqlst=self._prepare('CWUser X WHERE X login "toto"').children[0]self.assertEqual(self.o.generate(rqlst,'X')[1],'(&%s(uid=toto))'%self.ldapclasses)deftest_kwargs(self):rqlst=self._prepare('CWUser X WHERE X login %(x)s').children[0]self.o._args={'x':"toto"}self.assertEqual(self.o.generate(rqlst,'X')[1],'(&%s(uid=toto))'%self.ldapclasses)deftest_get_attr(self):rqlst=self._prepare('Any X WHERE E firstname X, E eid 12').children[0]self.assertRaises(UnknownEid,self.o.generate,rqlst,'E')if__name__=='__main__':unittest_main()