[entity] Add a cw_related_rqlst method returning the RQL select node of cw_related method
This is useful if one wants to modify the query before execution.
A bit a PEP8 style fix in tests along the way.
Related to #12306543.
# copyright 2003-2012 LOGILAB S.A. (Paris, FRANCE), all rights reserved.# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr## This file is part of CubicWeb.## CubicWeb is free software: you can redistribute it and/or modify it under the# terms of the GNU Lesser General Public License as published by the Free# Software Foundation, either version 2.1 of the License, or (at your option)# any later version.## CubicWeb is distributed in the hope that it will be useful, but WITHOUT# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more# details.## You should have received a copy of the GNU Lesser General Public License along# with CubicWeb. If not, see <http://www.gnu.org/licenses/>."""Base class for request/session"""__docformat__="restructuredtext en"fromwarningsimportwarnfromdatetimeimporttime,datetime,timedeltafromsiximportPY2,PY3,text_typefromsix.moves.urllib.parseimportparse_qs,parse_qsl,quoteasurlquote,unquoteasurlunquote,urlsplit,urlunsplitfromlogilab.common.decoratorsimportcachedfromlogilab.common.deprecationimportdeprecatedfromlogilab.common.dateimportustrftime,strptime,todate,todatetimefromrql.utilsimportrqlvar_makerfromcubicwebimport(Unauthorized,NoSelectableObject,NoResultError,MultipleResultsError,uilib)fromcubicweb.rsetimportResultSetONESECOND=timedelta(0,1,0)CACHE_REGISTRY={}classFindEntityError(Exception):"""raised when find_one_entity() can not return one and only one entity"""classCache(dict):def__init__(self):super(Cache,self).__init__()_now=datetime.now()self.cache_creation_date=_nowself.latest_cache_lookup=_nowclassRequestSessionBase(object):"""base class containing stuff shared by server session and web request request/session is the main resources accessor, mainly through it's vreg attribute: :attribute vreg: the instance's registry :attribute vreg.schema: the instance's schema :attribute vreg.config: the instance's configuration """is_request=True# False for repository sessiondef__init__(self,vreg):self.vreg=vregtry:encoding=vreg.property_value('ui.encoding')exceptException:# no vreg or property not registeredencoding='utf-8'self.encoding=encoding# cache result of execution for (rql expr / eids),# should be emptied on commit/rollback of the server session / web# connectionself.user=Noneself.local_perm_cache={}self._=text_typedef_set_user(self,orig_user):"""set the user for this req_session_base A special method is needed to ensure the linked user is linked to the connection too. """rset=self.eid_rset(orig_user.eid,'CWUser')user_cls=self.vreg['etypes'].etype_class('CWUser')user=user_cls(self,rset,row=0,groups=orig_user.groups,properties=orig_user.properties)user.cw_attr_cache['login']=orig_user.login# cache loginself.user=userself.set_entity_cache(user)self.set_language(user.prefered_language())defset_language(self,lang):"""install i18n configuration for `lang` translation. Raises :exc:`KeyError` if translation doesn't exist. """self.lang=langgettext,pgettext=self.vreg.config.translations[lang]# use _cw.__ to translate a message without registering it to the catalogself._=self.__=gettextself.pgettext=pgettextdefget_option_value(self,option):raiseNotImplementedErrordefproperty_value(self,key):"""return value of the property with the given key, giving priority to user specific value if any, else using site value """ifself.user:val=self.user.property_value(key)ifvalisnotNone:returnvalreturnself.vreg.property_value(key)defetype_rset(self,etype,size=1):"""return a fake result set for a particular entity type"""rset=ResultSet([('A',)]*size,'%s X'%etype,description=[(etype,)]*size)defget_entity(row,col=0,etype=etype,req=self,rset=rset):returnreq.vreg['etypes'].etype_class(etype)(req,rset,row,col)rset.get_entity=get_entityrset.req=selfreturnrsetdefeid_rset(self,eid,etype=None):"""return a result set for the given eid without doing actual query (we have the eid, we can suppose it exists and user has access to the entity) """eid=int(eid)ifetypeisNone:etype=self.entity_metas(eid)['type']rset=ResultSet([(eid,)],'Any X WHERE X eid %(x)s',{'x':eid},[(etype,)])rset.req=selfreturnrsetdefempty_rset(self):""" return a guaranteed empty result """rset=ResultSet([],'Any X WHERE X eid -1')rset.req=selfreturnrsetdefentity_from_eid(self,eid,etype=None):"""return an entity instance for the given eid. No query is done"""try:returnself.entity_cache(eid)exceptKeyError:rset=self.eid_rset(eid,etype)entity=rset.get_entity(0,0)self.set_entity_cache(entity)returnentitydefentity_cache(self,eid):raiseKeyErrordefset_entity_cache(self,entity):passdefcreate_entity(self,etype,**kwargs):"""add a new entity of the given type Example (in a shell session): >>> c = create_entity('Company', name=u'Logilab') >>> create_entity('Person', firstname=u'John', surname=u'Doe', ... works_for=c) """cls=self.vreg['etypes'].etype_class(etype)returncls.cw_instantiate(self.execute,**kwargs)@deprecated('[3.18] use find(etype, **kwargs).entities()')deffind_entities(self,etype,**kwargs):"""find entities of the given type and attribute values. >>> users = find_entities('CWGroup', name=u'users') >>> groups = find_entities('CWGroup') """returnself.find(etype,**kwargs).entities()@deprecated('[3.18] use find(etype, **kwargs).one()')deffind_one_entity(self,etype,**kwargs):"""find one entity of the given type and attribute values. raise :exc:`FindEntityError` if can not return one and only one entity. >>> users = find_one_entity('CWGroup', name=u'users') >>> groups = find_one_entity('CWGroup') Exception() """try:returnself.find(etype,**kwargs).one()except(NoResultError,MultipleResultsError)ase:raiseFindEntityError("%s: (%s, %s)"%(str(e),etype,kwargs))deffind(self,etype,**kwargs):"""find entities of the given type and attribute values. :returns: A :class:`ResultSet` >>> users = find('CWGroup', name=u"users").one() >>> groups = find('CWGroup').entities() """parts=['Any X WHERE X is %s'%etype]varmaker=rqlvar_maker(defined='X')eschema=self.vreg.schema.eschema(etype)forattr,valueinkwargs.items():ifisinstance(value,list)orisinstance(value,tuple):raiseNotImplementedError("List of values are not supported")ifhasattr(value,'eid'):kwargs[attr]=value.eidifattr.startswith('reverse_'):attr=attr[8:]assertattrineschema.objrels, \'%s not in %s object relations'%(attr,eschema)parts.append('%(varname)s%(attr)s X, ''%(varname)s eid %%(reverse_%(attr)s)s'%{'attr':attr,'varname':next(varmaker)})else:assertattrineschema.subjrels, \'%s not in %s subject relations'%(attr,eschema)parts.append('X %(attr)s%%(%(attr)s)s'%{'attr':attr})rql=', '.join(parts)returnself.execute(rql,kwargs)defensure_ro_rql(self,rql):"""raise an exception if the given rql is not a select query"""first=rql.split(None,1)[0].lower()iffirstin('insert','set','delete'):raiseUnauthorized(self._('only select queries are authorized'))defget_cache(self,cachename):"""cachename should be dotted names as in : - cubicweb.mycache - cubes.blog.mycache - etc. """warn.warning('[3.19] .get_cache will disappear soon. ''Distributed caching mechanisms are being introduced instead.''Other caching mechanism can be used more reliably ''to the same effect.',DeprecationWarning)ifcachenameinCACHE_REGISTRY:cache=CACHE_REGISTRY[cachename]else:cache=CACHE_REGISTRY[cachename]=Cache()_now=datetime.now()if_now>cache.latest_cache_lookup+ONESECOND:ecache=self.execute('Any C,T WHERE C is CWCache, C name %(name)s, C timestamp T',{'name':cachename}).get_entity(0,0)cache.latest_cache_lookup=_nowifnotecache.valid(cache.cache_creation_date):cache.clear()cache.cache_creation_date=_nowreturncache# url generation methods ##################################################defbuild_url(self,*args,**kwargs):"""return an absolute URL using params dictionary key/values as URL parameters. Values are automatically URL quoted, and the publishing method to use may be specified or will be guessed. if ``__secure__`` argument is True, the request will try to build a https url. raises :exc:`ValueError` if None is found in arguments """# use *args since we don't want first argument to be "anonymous" to# avoid potential clash with kwargsmethod=Noneifargs:assertlen(args)==1,'only 0 or 1 non-named-argument expected'method=args[0]ifmethodisNone:method='view'# XXX I (adim) think that if method is passed explicitly, we should# not try to process it and directly call req.build_url()base_url=kwargs.pop('base_url',None)ifbase_urlisNone:secure=kwargs.pop('__secure__',None)base_url=self.base_url(secure=secure)if'_restpath'inkwargs:assertmethod=='view',repr(method)path=kwargs.pop('_restpath')else:path=methodifnotkwargs:returnu'%s%s'%(base_url,path)returnu'%s%s?%s'%(base_url,path,self.build_url_params(**kwargs))defbuild_url_params(self,**kwargs):"""return encoded params to incorporate them in a URL"""args=[]forparam,valuesinkwargs.items():ifnotisinstance(values,(list,tuple)):values=(values,)forvalueinvalues:assertvalueisnotNoneargs.append(u'%s=%s'%(param,self.url_quote(value)))return'&'.join(args)defurl_quote(self,value,safe=''):"""urllib.quote is not unicode safe, use this method to do the necessary encoding / decoding. Also it's designed to quote each part of a url path and so the '/' character will be encoded as well. """ifPY2andisinstance(value,unicode):quoted=urlquote(value.encode(self.encoding),safe=safe)returnunicode(quoted,self.encoding)returnurlquote(str(value),safe=safe)defurl_unquote(self,quoted):"""returns a unicode unquoted string decoding is based on `self.encoding` which is the encoding used in `url_quote` """ifPY3:returnurlunquote(quoted)ifisinstance(quoted,unicode):quoted=quoted.encode(self.encoding)try:returnunicode(urlunquote(quoted),self.encoding)exceptUnicodeDecodeError:# might occurs on manually typed URLsreturnunicode(urlunquote(quoted),'iso-8859-1')defurl_parse_qsl(self,querystring):"""return a list of (key, val) found in the url quoted query string"""ifPY3:forkey,valinparse_qsl(querystring):yieldkey,valreturnifisinstance(querystring,unicode):querystring=querystring.encode(self.encoding)forkey,valinparse_qsl(querystring):try:yieldunicode(key,self.encoding),unicode(val,self.encoding)exceptUnicodeDecodeError:# might occurs on manually typed URLsyieldunicode(key,'iso-8859-1'),unicode(val,'iso-8859-1')defrebuild_url(self,url,**newparams):"""return the given url with newparams inserted. If any new params is already specified in the url, it's overriden by the new value newparams may only be mono-valued. """ifPY2andisinstance(url,unicode):url=url.encode(self.encoding)schema,netloc,path,query,fragment=urlsplit(url)query=parse_qs(query)# sort for testing predictabilityforkey,valinsorted(newparams.items()):query[key]=(self.url_quote(val),)query='&'.join(u'%s=%s'%(param,value)forparam,valuesinsorted(query.items())forvalueinvalues)returnurlunsplit((schema,netloc,path,query,fragment))# bound user related methods ###############################################@cacheddefuser_data(self):"""returns a dictionary with this user's information. The keys are : login The user login name The user name, returned by user.name() email The user principal email """userinfo={}user=self.useruserinfo['login']=user.loginuserinfo['name']=user.name()userinfo['email']=user.cw_adapt_to('IEmailable').get_email()returnuserinfo# formating methods #######################################################defview(self,__vid,rset=None,__fallback_oid=None,__registry='views',initargs=None,w=None,**kwargs):"""Select object with the given id (`__oid`) then render it. If the object isn't selectable, try to select fallback object if `__fallback_oid` is specified. If specified `initargs` is expected to be a dictionary containing arguments that should be given to selection (hence to object's __init__ as well), but not to render(). Other arbitrary keyword arguments will be given to selection *and* to render(), and so should be handled by object's call or cell_call method.. """ifinitargsisNone:initargs=kwargselse:initargs.update(kwargs)try:view=self.vreg[__registry].select(__vid,self,rset=rset,**initargs)exceptNoSelectableObject:if__fallback_oidisNone:raiseview=self.vreg[__registry].select(__fallback_oid,self,rset=rset,**initargs)returnview.render(w=w,**kwargs)defprintable_value(self,attrtype,value,props=None,displaytime=True,formatters=uilib.PRINTERS):"""return a displayablye value (i.e. unicode string)"""ifvalueisNone:returnu''try:as_string=formatters[attrtype]exceptKeyError:self.error('given bad attrtype %s',attrtype)returnunicode(value)returnas_string(value,self,props,displaytime)defformat_date(self,date,date_format=None,time=False):"""return a string for a date time according to instance's configuration """ifdateisnotNone:ifdate_formatisNone:iftime:date_format=self.property_value('ui.datetime-format')else:date_format=self.property_value('ui.date-format')returnustrftime(date,date_format)returnu''defformat_time(self,time):"""return a string for a time according to instance's configuration """iftimeisnotNone:returnustrftime(time,self.property_value('ui.time-format'))returnu''defformat_float(self,num):"""return a string for floating point number according to instance's configuration """ifnumisnotNone:returnself.property_value('ui.float-format')%numreturnu''defparse_datetime(self,value,etype='Datetime'):"""get a datetime or time from a string (according to etype) Datetime formatted as Date are accepted """assertetypein('Datetime','Date','Time'),etype# XXX raise proper validation errorifetype=='Datetime':format=self.property_value('ui.datetime-format')try:returntodatetime(strptime(value,format))exceptValueError:passelifetype=='Time':format=self.property_value('ui.time-format')try:# (adim) I can't find a way to parse a Time with a custom formatdate=strptime(value,format)# this returns a DateTimereturntime(date.hour,date.minute,date.second)exceptValueError:raiseValueError(self._('can\'t parse %(value)r (expected %(format)s)')%{'value':value,'format':format})try:format=self.property_value('ui.date-format')dt=strptime(value,format)ifetype=='Datetime':returntodatetime(dt)returntodate(dt)exceptValueError:raiseValueError(self._('can\'t parse %(value)r (expected %(format)s)')%{'value':value,'format':format})def_base_url(self,secure=None):ifsecure:returnself.vreg.config.get('https-url')orself.vreg.config['base-url']returnself.vreg.config['base-url']defbase_url(self,secure=None):"""return the root url of the instance """url=self._base_url(secure=secure)returnurlifurlisNoneelseurl.rstrip('/')+'/'# abstract methods to override according to the web front-end #############defdescribe(self,eid,asdict=False):"""return a tuple (type, sourceuri, extid) for the entity with id <eid>"""raiseNotImplementedError