[session] make session lock reentrant
We are going to use it for other session-related business. It is renamed from
_closed_lock to _lock in the same move.
# copyright 2003-2013 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
#
# CubicWeb is free software: you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with CubicWeb. If not, see <http://www.gnu.org/licenses/>.
"""cubicweb ldap user source

this source is for now limited to a read-only CWUser source
"""
from __future__ import division, with_statement

from base64 import b64decode

import ldap
from ldap.filter import escape_filter_chars

from rql.nodes import Relation, VariableRef, Constant, Function

from cubicweb import UnknownEid, RepositoryError
from cubicweb.server import ldaputils
from cubicweb.server.utils import cartesian_product
from cubicweb.server.sources import (AbstractSource, TrFunc, GlobTrFunc,
                                     TimedCache)

# search scopes
BASE = ldap.SCOPE_BASE
ONELEVEL = ldap.SCOPE_ONELEVEL
SUBTREE = ldap.SCOPE_SUBTREE

# map ldap protocol to their standard port
PROTO_PORT = {'ldap': 389,
              'ldaps': 636,
              'ldapi': None,
              }


class LDAPUserSource(ldaputils.LDAPSourceMixIn, AbstractSource):
    """LDAP read-only CWUser source"""
    support_entities = {'CWUser': False}

    options = ldaputils.LDAPSourceMixIn.options + (

        ('synchronization-interval',
         {'type': 'time',
          'default': '1d',
          'help': 'interval between synchronization with the ldap '
                  'directory (default to once a day).',
          'group': 'ldap-source', 'level': 3,
          }),
        ('cache-life-time',
         {'type': 'time',
          'default': '2h',
          'help': 'life time of query cache (default to two '
                  'hours).',
          'group': 'ldap-source', 'level': 3,
          }),

    )

    def update_config(self, source_entity, typedconfig):
        """update configuration from source entity. `typedconfig` is config
        properly typed with defaults set
        """
        super(LDAPUserSource, self).update_config(source_entity, typedconfig)
        self._interval = typedconfig['synchronization-interval']
        # the query cache life time is never allowed to go below 71
        self._cache_ttl = max(71, typedconfig['cache-life-time'])
        self.reset_caches()
        # XXX copy from datafeed source
        if source_entity is not None:
            self._entity_update(source_entity)
        self.config = typedconfig
        # /end XXX

    def reset_caches(self):
        """method called during test to reset potential source caches"""
        # dn -> ldap item dictionary, filled by `_process_ldap_item`
        self._cache = {}
        # rql string -> results, filled by `syntax_tree_search`
        self._query_cache = TimedCache(self._cache_ttl)

    def init(self, activated, source_entity):
        """method called by the repository once ready to handle request"""
        super(LDAPUserSource, self).init(activated, source_entity)
        if activated:
            self.info('ldap init')
            # set minimum period of 5min 1s (the additional second is to
            # minimize resonance effect)
            if self.user_rev_attrs['email']:
                self.repo.looping_task(max(301, self._interval),
                                       self.synchronize)
            self.repo.looping_task(self._cache_ttl // 10,
                                   self._query_cache.clear_expired)

    def synchronize(self):
        """run `pull_data` within a repository internal session"""
        with self.repo.internal_session() as session:
            self.pull_data(session)

    def pull_data(self, session, force=False, raise_on_error=False):
        """synchronize content known by this repository with content in the
        external repository

        For each entity recorded for this source, the ldap entry is searched
        again and the user's email address is aligned on the first address
        found in the directory. `force` and `raise_on_error` are accepted but
        not used by this implementation.
        """
        self.info('synchronizing ldap source %s', self.uri)
        ldap_emailattr = self.user_rev_attrs['email']
        assert ldap_emailattr
        execute = session.execute
        # NOTE(review): the source uri is interpolated directly into the SQL
        # string; presumably it only comes from the source configuration
        cursor = session.system_sql("SELECT eid, extid FROM entities WHERE "
                                    "source='%s'" % self.uri)
        for eid, b64extid in cursor.fetchall():
            extid = b64decode(b64extid)
            self.debug('ldap eid %s', eid)
            # if no result found, _search automatically delete entity
            # information
            res = self._search(session, extid, BASE)
            self.debug('ldap search %s', res)
            if not res:
                continue
            ldapemailaddr = res[0].get(ldap_emailattr)
            if not ldapemailaddr:
                continue
            if isinstance(ldapemailaddr, list):
                # XXX consider only the first email in the list
                ldapemailaddr = ldapemailaddr[0]
            rset = execute('Any X,A WHERE '
                           'X address A, U use_email X, U eid %(u)s',
                           {'u': eid})
            ldapemailaddr = unicode(ldapemailaddr)
            for emaileid, emailaddr, in rset:
                if emailaddr == ldapemailaddr:
                    # the user already uses this address, nothing to do
                    break
            else:
                self.debug('updating email address of user %s to %s',
                           extid, ldapemailaddr)
                emailrset = execute('EmailAddress A WHERE A address %(addr)s',
                                    {'addr': ldapemailaddr})
                if emailrset:
                    # the address already exists, link it to the user
                    execute('SET U use_email X WHERE '
                            'X eid %(x)s, U eid %(u)s',
                            {'x': emailrset[0][0], 'u': eid})
                elif rset:
                    # update the primary email if any, else the first
                    # address used by the user
                    if not execute('SET X address %(addr)s WHERE '
                                   'U primary_email X, U eid %(u)s',
                                   {'addr': ldapemailaddr, 'u': eid}):
                        execute('SET X address %(addr)s WHERE '
                                'X eid %(x)s',
                                {'addr': ldapemailaddr, 'x': rset[0][0]})
                else:
                    # no email found, create it
                    _insert_email(session, ldapemailaddr, eid)
        session.commit()

    def ldap_name(self, var):
        """return the ldap attribute mapped to the type of one of the
        relations where `var` is used, or None if there is no relation or no
        mapping
        """
        if var.stinfo['relations']:
            relname = next(iter(var.stinfo['relations'])).r_type
            return self.user_rev_attrs.get(relname)
        return None

    def prepare_columns(self, mainvars, rqlst):
        """return two list describing how to build the final results
        from the result of an ldap search (ie a list of dictionary)

        The first list holds one (main variable name, column definition) pair
        per selected term, the second one holds the global transformations
        (`GlobTrFunc`) to apply on the whole result.
        """
        columns = []
        global_transforms = []
        for i, term in enumerate(rqlst.selection):
            if isinstance(term, Constant):
                # `rqlst_search` unpacks each column as a (varname, ldapname)
                # pair and handles a Constant found as `ldapname`
                columns.append((None, term))
                continue
            if isinstance(term, Function):  # LOWER, UPPER, COUNT...
                var = term.get_nodes(VariableRef)[0]
                var = var.variable
                try:
                    mainvar = var.stinfo['attrvar'].name
                except AttributeError:  # no attrvar set
                    mainvar = var.name
                assert mainvar in mainvars
                trname = term.name
                ldapname = self.ldap_name(var)
                if trname in ('COUNT', 'MIN', 'MAX', 'SUM'):
                    global_transforms.append(GlobTrFunc(trname, i, ldapname))
                    columns.append((mainvar, ldapname))
                    continue
                if trname in ('LOWER', 'UPPER'):
                    columns.append((mainvar, TrFunc(trname, i, ldapname)))
                    continue
                raise NotImplementedError('no support for %s function' % trname)
            if term.name in mainvars:
                columns.append((term.name, 'dn'))
                continue
            var = term.variable
            mainvar = var.stinfo['attrvar'].name
            columns.append((mainvar, self.ldap_name(var)))
            #else:
            #    # probably a bug in rql splitting if we arrive here
            #    raise NotImplementedError
        return columns, global_transforms

    def syntax_tree_search(self, session, union,
                           args=None, cachekey=None, varmap=None, debug=0):
        """return result from this source for a rql query (actually from a rql
        syntax tree and a solution dictionary mapping each used variable to a
        possible type). If cachekey is given, the query necessary to fetch the
        results (but not the results themselves) may be cached using this key.

        Results are kept in the source's query cache, using the rql string as
        key. An empty list is returned when the ldap server can't be reached.
        """
        self.debug('ldap syntax tree search')
        # XXX not handled : transform/aggregat function, join on multiple
        # users...
        assert len(union.children) == 1, 'union not supported'
        rqlst = union.children[0]
        assert not rqlst.with_, 'subquery not supported'
        rqlkey = rqlst.as_string(kwargs=args)
        try:
            results = self._query_cache[rqlkey]
        except KeyError:
            try:
                results = self.rqlst_search(session, rqlst, args)
                self._query_cache[rqlkey] = results
            except ldap.SERVER_DOWN:
                # cant connect to server
                msg = session._("can't connect to source %s, some data may be missing")
                session.set_shared_data('sources_error', msg % self.uri,
                                        txdata=True)
                return []
        return results

    def rqlst_search(self, session, rqlst, args):
        """execute the rql syntax tree `rqlst` against the ldap directory and
        return the result as a list of lines (each line being a list of
        values)
        """
        # main variables are those which may be a CWUser in some solution
        mainvars = []
        for varname in rqlst.defined_vars:
            for sol in rqlst.solutions:
                if sol[varname] == 'CWUser':
                    mainvars.append(varname)
                    break
        assert mainvars, rqlst
        columns, globtransforms = self.prepare_columns(mainvars, rqlst)
        eidfilters = [lambda x: x > 0]
        allresults = []
        generator = RQL2LDAPFilter(self, session, args, mainvars)
        for mainvar in mainvars:
            # handle restriction
            try:
                eidfilters_, ldapfilter = generator.generate(rqlst, mainvar)
            except GotDN as ex:
                assert ex.dn, 'no dn!'
                try:
                    res = [self._cache[ex.dn]]
                except KeyError:
                    res = self._search(session, ex.dn, BASE)
            except UnknownEid:
                # raised when we are looking for the dn of an eid which is not
                # coming from this source
                res = []
            else:
                eidfilters += eidfilters_
                res = self._search(session, self.user_base_dn,
                                   self.user_base_scope, ldapfilter)
            allresults.append(res)
        # 1. get eid for each dn and filter according to that eid if necessary
        for i, res in enumerate(allresults):
            filteredres = []
            for resdict in res:
                # get sure the entity exists in the system table
                eid = self.repo.extid2eid(self, resdict['dn'], 'CWUser',
                                          session)
                for eidfilter in eidfilters:
                    if not eidfilter(eid):
                        break
                else:
                    resdict['eid'] = eid
                    filteredres.append(resdict)
            allresults[i] = filteredres
        # 2. merge result for each "mainvar": cartesian product
        allresults = cartesian_product(allresults)
        # 3. build final result according to column definition
        result = []
        for rawline in allresults:
            rawline = dict(zip(mainvars, rawline))
            line = []
            for varname, ldapname in columns:
                if ldapname is None:
                    value = None  # no mapping available
                elif ldapname == 'dn':
                    value = rawline[varname]['eid']
                elif isinstance(ldapname, Constant):
                    if ldapname.type == 'Substitute':
                        value = args[ldapname.value]
                    else:
                        value = ldapname.value
                elif isinstance(ldapname, TrFunc):
                    value = ldapname.apply(rawline[varname])
                else:
                    value = rawline[varname].get(ldapname)
                line.append(value)
            result.append(line)
        for trfunc in globtransforms:
            result = trfunc.apply(result)
        return result

    def _process_ldap_item(self, dn, iterator):
        """build the item dictionary using the inherited implementation and
        record it in the dn cache
        """
        itemdict = super(LDAPUserSource, self)._process_ldap_item(dn, iterator)
        self._cache[dn] = itemdict
        return itemdict

    def _process_no_such_object(self, session, dn):
        """delete information about the entity whose external id is `dn`, if
        it is known by the repository, then reset the source caches
        """
        eid = self.repo.extid2eid(self, dn, 'CWUser', session, insert=False)
        if eid:
            self.warning('deleting ldap user with eid %s and dn %s', eid, dn)
            entity = session.entity_from_eid(eid, 'CWUser')
            self.repo.delete_info(session, entity, self.uri)
            self.reset_caches()

    def before_entity_insertion(self, session, lid, etype, eid, sourceparams):
        """called by the repository when an eid has been attributed for an
        entity stored here but the entity has not been inserted in the system
        table yet.

        This method must return the an Entity instance representation of this
        entity.
        """
        self.debug('ldap before entity insertion')
        entity = super(LDAPUserSource, self).before_entity_insertion(
            session, lid, etype, eid, sourceparams)
        res = self._search(session, lid, BASE)[0]
        # feed indexable attributes with the values found in the directory
        for attr in entity.e_schema.indexable_attributes():
            entity.cw_edited[attr] = res[self.user_rev_attrs[attr]]
        return entity

    def after_entity_insertion(self, session, lid, entity, sourceparams):
        """called by the repository after an entity stored here has been
        inserted in the system table.

        The new user is added to the default groups, then linked to its email
        address when one is found in the dn cache.
        """
        self.debug('ldap after entity insertion')
        super(LDAPUserSource, self).after_entity_insertion(
            session, lid, entity, sourceparams)
        for group in self.user_default_groups:
            session.execute('SET X in_group G WHERE X eid %(x)s, G name %(group)s',
                            {'x': entity.eid, 'group': group})
        # search for existing email first
        try:
            # lid = dn
            emailaddr = self._cache[lid][self.user_rev_attrs['email']]
        except KeyError:
            return
        if isinstance(emailaddr, list):
            # XXX consider only the first email in the list
            emailaddr = emailaddr[0]
        rset = session.execute('EmailAddress X WHERE X address %(addr)s',
                               {'addr': emailaddr})
        if rset:
            session.execute('SET U primary_email X WHERE U eid %(u)s, X eid %(x)s',
                            {'x': rset[0][0], 'u': entity.eid})
        else:
            # not found, create it
            _insert_email(session, emailaddr, entity.eid)

    def update_entity(self, session, entity):
        """replace an entity in the source"""
        raise RepositoryError('this source is read only')

    def delete_entity(self, session, entity):
        """delete an entity from the source"""
        raise RepositoryError('this source is read only')


def _insert_email(session, emailaddr, ueid):
    """create an EmailAddress with address `emailaddr` and set it as primary
    email of the user with eid `ueid`
    """
    session.execute('INSERT EmailAddress X: X address %(addr)s, U primary_email X '
                    'WHERE U eid %(x)s', {'addr': emailaddr, 'x': ueid})


class GotDN(Exception):
    """exception used when a dn localizing the searched user has been found"""
    def __init__(self, dn):
        # initialize the base class so the dn shows up in the exception
        # arguments
        Exception.__init__(self, dn)
        self.dn = dn


class RQL2LDAPFilter(object):
    """generate an LDAP filter for a rql query"""
    def __init__(self, source, session, args=None, mainvars=()):
        self.source = source
        self.repo = source.repo
        self._ldap_attrs = source.user_rev_attrs
        self._base_filters = source.base_filters
        self._session = session
        if args is None:
            args = {}
        self._args = args
        self.mainvars = mainvars

    def generate(self, selection, mainvarname):
        """return a 2-uple (eid filters, ldap filter string) for the
        restriction of `selection`, considering the main variable named
        `mainvarname`

        `GotDN` is raised when the restriction gives the eid of the searched
        user, `UnknownEid` when a relation isn't supported by the source.
        """
        # work on a copy so the source's base filters are left untouched
        self._filters = res = self._base_filters[:]
        self._mainvarname = mainvarname
        self._eidfilters = []
        self._done_not = set()
        restriction = selection.where
        if isinstance(restriction, Relation):
            # only a single relation, need to append result here (no AND/OR)
            ldapfilter = restriction.accept(self)
            if ldapfilter is not None:
                res.append(ldapfilter)
        elif restriction:
            restriction.accept(self)
        if len(res) > 1:
            return self._eidfilters, '(&%s)' % ''.join(res)
        return self._eidfilters, res[0]

    def visit_and(self, et):
        """generate filter for a AND subtree"""
        for c in et.children:
            part = c.accept(self)
            if part:
                self._filters.append(part)

    def visit_or(self, ou):
        """generate filter for a OR subtree"""
        res = []
        for c in ou.children:
            part = c.accept(self)
            if part:
                res.append(part)
        if res:
            if len(res) > 1:
                part = '(|%s)' % ''.join(res)
            else:
                part = res[0]
            self._filters.append(part)

    def visit_not(self, node):
        """generate filter for a NOT subtree"""
        part = node.children[0].accept(self)
        if part:
            # `part` is already a parenthesized filter: wrapping it into an
            # additional pair of parenthesis would produce a filter which
            # doesn't follow the ldap filter grammar
            self._filters.append('(!%s)' % part)

    def visit_relation(self, relation):
        """generate filter for a relation"""
        rtype = relation.r_type
        # don't care of type constraint statement (i.e. relation_type = 'is')
        if rtype == 'is':
            return ''
        lhs, rhs = relation.get_parts()
        # attribute relation
        if self.source.schema.rschema(rtype).final:
            # dunno what to do here, don't pretend anything else
            if lhs.name != self._mainvarname:
                if lhs.name in self.mainvars:
                    # XXX check we don't have variable as rhs
                    return
                raise NotImplementedError
            rhs_vars = rhs.get_nodes(VariableRef)
            if rhs_vars:
                if len(rhs_vars) > 1:
                    raise NotImplementedError
                # selected variable, nothing to do here
                return
            # no variables in the RHS
            if isinstance(rhs.children[0], Function):
                res = rhs.children[0].accept(self)
            elif rtype != 'has_text':
                res = self._visit_attribute_relation(relation)
            else:
                raise NotImplementedError(relation)
        # regular relation XXX todo: in_group
        else:
            raise NotImplementedError(relation)
        return res

    def _visit_attribute_relation(self, relation):
        """generate filter for an attribute relation

        Restrictions on `eid` don't generate any ldap filter: they either
        register an eid filter and return None, or raise `GotDN` with the dn
        of the entity.
        """
        lhs, rhs = relation.get_parts()
        if relation.r_type == 'eid':
            # XXX hack
            # skip comparison sign
            eid = int(rhs.children[0].accept(self))
            if relation.neged(strict=True):
                self._done_not.add(relation.parent)
                self._eidfilters.append(lambda x: not x == eid)
                return
            if rhs.operator != '=':
                eidfilter = {'>': lambda x: x > eid,
                             '>=': lambda x: x >= eid,
                             '<': lambda x: x < eid,
                             '<=': lambda x: x <= eid,
                             }[rhs.operator]
                self._eidfilters.append(eidfilter)
                return
            dn = self.repo.eid2extid(self.source, eid, self._session)
            raise GotDN(dn)
        try:
            ldapfilter = '(%s%s)' % (self._ldap_attrs[relation.r_type],
                                     rhs.accept(self))
        except KeyError:
            # unsupported attribute
            self.source.warning('%s source can\'t handle relation %s, no '
                                'results will be returned from this source',
                                self.source.uri, relation)
            raise UnknownEid  # trick to return no result
        return ldapfilter

    def visit_comparison(self, cmp):
        """generate filter for a comparison"""
        return '%s%s' % (cmp.operator, cmp.children[0].accept(self))

    def visit_mathexpression(self, mexpr):
        """generate filter for a mathematic expression"""
        raise NotImplementedError

    def visit_function(self, function):
        """generate filter name for a function"""
        if function.name == 'IN':
            return self.visit_in(function)
        raise NotImplementedError

    def visit_in(self, function):
        """generate filter for a IN function: one equality test per value,
        or'ed when there are several values. Return None when no value is
        usable.
        """
        # the relation type is read two levels above the function node
        grandpapa = function.parent.parent
        ldapattr = self._ldap_attrs[grandpapa.r_type]
        values = []
        for c in function.children:
            value = c.accept(self)
            if value:
                values.append(value)
        if not values:
            return None
        if len(values) > 1:
            return '(|%s)' % ''.join('(%s=%s)' % (ldapattr, v)
                                     for v in values)
        return '(%s=%s)' % (ldapattr, values[0])

    def visit_constant(self, constant):
        """generate filter name for a constant"""
        if constant.type is None:
            raise NotImplementedError
        if constant.type == 'Date':
            raise NotImplementedError
            #value = self.keyword_map[value]()
        elif constant.type == 'Substitute':
            value = self._args[constant.value]
        else:
            value = constant.value
        if isinstance(value, unicode):
            value = value.encode('utf8')
        else:
            value = str(value)
        return escape_filter_chars(value)

    def visit_variableref(self, variableref):
        """variable references don't generate any filter"""
        pass