optional rset; these prototypes could be simplified once backward compatibility for non-named rsets is dropped
"""cubicweb server sources support

:organization: Logilab
:copyright: 2001-2009 LOGILAB S.A. (Paris, FRANCE), license is LGPL v2.
:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
:license: GNU Lesser General Public License, v2.1 - http://www.gnu.org/licenses
"""
__docformat__ = "restructuredtext en"

from datetime import datetime, timedelta
from logging import getLogger

from cubicweb import set_log_methods
from cubicweb.server.sqlutils import SQL_PREFIX


class TimedCache(dict):
    """dictionary remembering when each value was stored, so that entries
    older than a given time to live can be dropped on demand

    Values are stored internally as `(timestamp, value)` couples;
    `__getitem__` hides the timestamp and returns the bare value.
    """

    def __init__(self, ttlm, ttls=0):
        """:param ttlm: time to live, in minutes
        :param ttls: additional time to live, in seconds
        """
        # time to live in minutes
        self.ttl = timedelta(0, ttlm * 60 + ttls, 0)

    def __setitem__(self, key, value):
        """store `value` under `key` together with the current time"""
        dict.__setitem__(self, key, (datetime.now(), value))

    def __getitem__(self, key):
        """return the value stored under `key`, without its timestamp"""
        return dict.__getitem__(self, key)[1]

    def clear_expired(self):
        """remove every entry stored for longer than the time to live"""
        now_ = datetime.now()
        ttl = self.ttl
        # iterate over a snapshot: deleting entries while iterating over a
        # live view of the dictionary raises RuntimeError
        for key, (timestamp, _value) in list(self.items()):
            if now_ - timestamp > ttl:
                del self[key]


class AbstractSource(object):
    """an abstract class for sources"""

    # boolean telling if modification hooks should be called when something is
    # modified in this source
    should_call_hooks = True
    # boolean telling if the repository should connect to this source during
    # migration
    connect_for_migration = True

    # mappings telling which entities and relations are available in the source
    # keys are supported entity/relation types and values are boolean indicating
    # whether the support is read-only (False) or read-write (True)
    support_entities = {}
    support_relations = {}
    # a global identifier for this source, which has to be set by the source
    # instance
    uri = None
    # a reference to the system information helper
    repo = None
    # a reference to the application's schema (may differ from the source's
    # schema)
    schema = None

    def __init__(self, repo, appschema, source_config, *args, **kwargs):
        """:param repo: the repository, kept as `self.repo`
        :param appschema: the application's schema, given to `set_schema`
        :param source_config: source configuration; its 'uri' key is used as
          the source identifier and as suffix of the logger name
        """
        self.repo = repo
        self.uri = source_config['uri']
        set_log_methods(self, getLogger('cubicweb.sources.' + self.uri))
        self.set_schema(appschema)
        # NOTE(review): unless `support_relations` has been rebound on the
        # instance or on a subclass, this updates the class-level dictionary
        # shared by every source -- confirm this is intended
        self.support_relations['identity'] = False

    def init_creating(self):
        """method called by the repository once ready to create a new instance"""
        pass

    def init(self):
        """method called by the repository once ready to handle request"""
        pass

    def reset_caches(self):
        """method called during test to reset potential source caches"""
        pass

    def clear_eid_cache(self, eid, etype):
        """clear potential caches for the given eid"""
        pass

    def __repr__(self):
        return '<%s source @%#x>' % (self.uri, id(self))

    def __cmp__(self, other):
        """simple comparison function to get predictable source order, with the
        system source at last
        """
        # NOTE(review): `__cmp__` and `cmp` only exist in Python 2
        if self.uri == other.uri:
            return 0
        if self.uri == 'system':
            return 1
        if other.uri == 'system':
            return -1
        return cmp(self.uri, other.uri)

    def set_schema(self, schema):
        """set the application's schema"""
        self.schema = schema

    def support_entity(self, etype, write=False):
        """return true if the given entity's type is handled by this adapter
        if write is true, return true only if it's a RW support
        """
        try:
            wsupport = self.support_entities[etype]
        except KeyError:
            return False
        if write:
            return wsupport
        return True

    def support_relation(self, rtype, write=False):
        """return true if the given relation's type is handled by this adapter
        if write is true, return true only if it's a RW support

        current implementation return true if the relation is defined into
        `support_relations` or if it is a final relation of a supported entity
        type
        """
        try:
            wsupport = self.support_relations[rtype]
        except KeyError:
            rschema = self.schema.rschema(rtype)
            if not rschema.is_final() or rschema == 'has_text':
                return False
            # final relation: supported as soon as one of its subject types
            # is, with the write support of the first such type found
            for etype in rschema.subjects():
                try:
                    wsupport = self.support_entities[etype]
                    break
                except KeyError:
                    continue
            else:
                # no subject type is supported by this source
                return False
        if write:
            return wsupport
        return True

    def eid2extid(self, eid, session=None):
        """delegate to the repository's `eid2extid` for this source"""
        return self.repo.eid2extid(self, eid, session)

    def extid2eid(self, value, etype, session=None, **kwargs):
        """delegate to the repository's `extid2eid` for this source"""
        return self.repo.extid2eid(self, value, etype, session, **kwargs)

    PUBLIC_KEYS = ('adapter', 'uri')

    def remove_sensitive_information(self, sourcedef):
        """remove sensitive information such as login / password from source
        definition

        `sourcedef` is modified in place: every key which is not listed in
        `PUBLIC_KEYS` is removed.
        """
        # iterate over a snapshot of the keys since entries are removed from
        # the dictionary inside the loop
        for key in list(sourcedef):
            if key not in self.PUBLIC_KEYS:
                sourcedef.pop(key)

    def _cleanup_system_relations(self, session):
        """remove relation in the system source referencing entities coming
        from this source
        """
        cu = session.system_sql('SELECT eid FROM entities WHERE source=%(uri)s',
                                {'uri': self.uri})
        myeids = ','.join(str(r[0]) for r in cu.fetchall())
        if not myeids:
            return
        # delete relations referencing one of those eids
        eidcolum = SQL_PREFIX + 'eid'
        for rschema in self.schema.relations():
            if rschema.is_final() or rschema.type == 'identity':
                continue
            if rschema.inlined:
                # inlined relations are stored as a column of the subject's
                # table: reset that column instead of deleting rows
                # NOTE(review): the WHERE clause filters on the eid column of
                # the subject's table, not on the relation column -- confirm
                # this is the intended set of rows
                column = SQL_PREFIX + rschema.type
                for subjtype in rschema.subjects():
                    table = SQL_PREFIX + str(subjtype)
                    for objtype in rschema.objects(subjtype):
                        if self.support_entity(objtype):
                            sql = 'UPDATE %s SET %s=NULL WHERE %s IN (%s);' % (
                                table, column, eidcolum, myeids)
                            session.system_sql(sql)
                            break
                continue
            # a single DELETE per side is enough, hence the `break`s
            for etype in rschema.subjects():
                if self.support_entity(etype):
                    sql = 'DELETE FROM %s_relation WHERE eid_from IN (%s);' % (
                        rschema.type, myeids)
                    session.system_sql(sql)
                    break
            for etype in rschema.objects():
                if self.support_entity(etype):
                    sql = 'DELETE FROM %s_relation WHERE eid_to IN (%s);' % (
                        rschema.type, myeids)
                    session.system_sql(sql)
                    break

    def cleanup_entities_info(self, session):
        """cleanup system tables from information for entities coming from
        this source. This should be called when a source is removed to
        properly cleanup the database
        """
        self._cleanup_system_relations(session)
        # fti / entities tables cleanup
        # sqlite doesn't support DELETE FROM xxx USING yyy
        dbhelper = session.pool.source('system').dbhelper
        session.system_sql('DELETE FROM %s WHERE %s.%s IN (SELECT eid FROM '
                           'entities WHERE entities.source=%%(uri)s)'
                           % (dbhelper.fti_table, dbhelper.fti_table,
                              dbhelper.fti_uid_attr),
                           {'uri': self.uri})
        session.system_sql('DELETE FROM entities WHERE source=%(uri)s',
                           {'uri': self.uri})

    # abstract methods to override (at least) in concrete source classes #######

    def get_connection(self):
        """open and return a connection to the source"""
        raise NotImplementedError()

    def check_connection(self, cnx):
        """check connection validity, return None if the connection is still
        valid else a new connection (called when the pool using the given
        connection is being attached to a session)

        do nothing by default
        """
        pass

    def pool_reset(self, cnx):
        """the pool using the given connection is being reset from its current
        attached session

        do nothing by default
        """
        pass

    def authenticate(self, session, login, password):
        """if the source support CWUser entity type, it should implements
        this method which should return CWUser eid for the given login/password
        if this account is defined in this source and valid login / password is
        given. Else raise `AuthenticationError`
        """
        raise NotImplementedError()

    def syntax_tree_search(self, session, union,
                           args=None, cachekey=None, varmap=None, debug=0):
        """return result from this source for a rql query (actually from a rql
        syntax tree and a solution dictionary mapping each used variable to a
        possible type). If cachekey is given, the query necessary to fetch the
        results (but not the results themselves) may be cached using this key.
        """
        raise NotImplementedError()

    def flying_insert(self, table, session, union, args=None, varmap=None):
        """similar as .syntax_tree_search, but inserts data in the temporary
        table (on-the-fly if possible, eg for the system source whose the given
        cursor come from). If not possible, inserts all data by calling
        .executemany().
        """
        res = self.syntax_tree_search(session, union, args, varmap=varmap)
        session.pool.source('system')._manual_insert(res, table, session)

    # system source don't have to implement the two methods below

    def before_entity_insertion(self, session, lid, etype, eid):
        """called by the repository when an eid has been attributed for an
        entity stored here but the entity has not been inserted in the system
        table yet.

        This method must return an Entity instance representation of this
        entity.
        """
        entity = self.repo.vreg.etype_class(etype)(session, None)
        entity.set_eid(eid)
        return entity

    def after_entity_insertion(self, session, lid, entity):
        """called by the repository after an entity stored here has been
        inserted in the system table.
        """
        pass

    # read-only sources don't have to implement methods below

    def get_extid(self, entity):
        """return the external id for the given newly inserted entity"""
        raise NotImplementedError()

    def add_entity(self, session, entity):
        """add a new entity to the source"""
        raise NotImplementedError()

    def update_entity(self, session, entity):
        """update an entity in the source"""
        raise NotImplementedError()

    def delete_entity(self, session, etype, eid):
        """delete an entity from the source"""
        raise NotImplementedError()

    def add_relation(self, session, subject, rtype, object):
        """add a relation to the source"""
        raise NotImplementedError()

    def delete_relation(self, session, subject, rtype, object):
        """delete a relation from the source"""
        raise NotImplementedError()

    # system source interface #################################################

    def eid_type_source(self, session, eid):
        """return a tuple (type, source, extid) for the entity with id <eid>"""
        raise NotImplementedError()

    def create_eid(self, session):
        raise NotImplementedError()

    def add_info(self, session, entity, source, extid=None):
        """add type and source info for an eid into the system table"""
        raise NotImplementedError()

    def delete_info(self, session, eid, etype, uri, extid):
        """delete system information on deletion of an entity by transferring
        record from the entities table to the deleted_entities table
        """
        raise NotImplementedError()

    def fti_unindex_entity(self, session, eid):
        """remove text content for entity with the given eid from the full text
        index
        """
        raise NotImplementedError()

    def fti_index_entity(self, session, entity):
        """add text content of a created/modified entity to the full text index
        """
        raise NotImplementedError()

    def modified_entities(self, session, etypes, mtime):
        """return a 2-uple:

        * list of (etype, eid) of entities of the given types which have been
          modified since the given timestamp (actually entities whose full text
          index content has changed)
        * list of (etype, eid) of entities of the given types which have been
          deleted since the given timestamp
        """
        raise NotImplementedError()

    # sql system source interface #############################################

    def sqlexec(self, session, sql, args=None):
        """execute the query and return its result"""
        raise NotImplementedError()

    def temp_table_def(self, selection, solution, table, basemap):
        raise NotImplementedError()

    def create_index(self, session, table, column, unique=False):
        raise NotImplementedError()

    def drop_index(self, session, table, column, unique=False):
        raise NotImplementedError()

    def create_temp_table(self, session, table, schema):
        raise NotImplementedError()

    def clean_temp_data(self, session, temptables):
        """remove temporary data, usually associated to temporary tables"""
        pass


class TrFunc(object):
    """lower, upper"""

    def __init__(self, trname, index, attrname=None):
        """:param trname: name of the transformation, stored lower-cased
        :param index: stored as `self.index`
        :param attrname: key looked up in the dictionary given to `apply`
        """
        self._tr = trname.lower()
        self.index = index
        self.attrname = attrname

    def apply(self, resdict):
        """return the result of calling the method named after the
        transformation on the value found under `attrname` in `resdict`, or
        None when that value is missing or None
        """
        value = resdict.get(self.attrname)
        if value is not None:
            return getattr(value, self._tr)()
        return None


class GlobTrFunc(TrFunc):
    """count, sum, max, min, avg"""

    funcs = {
        'count': len,
        'sum': sum,
        'max': max,
        'min': min,
        # XXX avg
    }

    def apply(self, result):
        """have to 'groupby' manually.

        Rows are grouped on every column except the one at `self.index`, the
        aggregation function is applied to the values collected for each
        group, and one row (a list) is returned per group, in order of first
        appearance. For instance, if we 'sum' for index 1::

            apply([(1, 2), (3, 4), (1, 5)]) -> [[1, 7], [3, 4]]
        """
        # `keys` remembers the order in which groups were first seen
        keys, values = [], {}
        for row in result:
            key = tuple(v for i, v in enumerate(row) if i != self.index)
            value = row[self.index]
            try:
                values[key].append(value)
            except KeyError:
                keys.append(key)
                values[key] = [value]
        result = []
        trfunc = self.funcs[self._tr]
        for key in keys:
            row = list(key)
            # put the aggregated value back at the aggregated column position
            row.insert(self.index, trfunc(values[key]))
            result.append(row)
        return result


class ConnectionWrapper(object):
    """minimal connection-like object: `commit` and `rollback` do nothing and
    `cursor` returns None
    """

    def __init__(self, cnx=None):
        self.cnx = cnx

    def commit(self):
        pass

    def rollback(self):
        pass

    def cursor(self):
        return None  # no actual cursor support


# NOTE(review): imported here rather than at the top of the module, presumably
# to avoid a circular import -- confirm before moving it
from cubicweb.server import SOURCE_TYPES


def source_adapter(source_config):
    """return the source class registered in `SOURCE_TYPES` for the
    (lower-cased) 'adapter' value of the given source configuration

    :raise RuntimeError: if no class is registered for this adapter type
    """
    adapter_type = source_config['adapter'].lower()
    try:
        return SOURCE_TYPES[adapter_type]
    except KeyError:
        raise RuntimeError('Unknown adapter %r' % adapter_type)


def get_source(source_config, global_schema, repo):
    """return a source adapter according to the adapter field in the
    source's configuration
    """
    return source_adapter(source_config)(repo, global_schema, source_config)