"""Adapters for native cubicweb sources.Notes:* extid (aka external id, the primary key of an entity in the external source from which it comes from) are stored in a varchar column encoded as a base64 string. This is because it should actually be Bytes but we want an index on it for fast querying.:organization: Logilab:copyright: 2001-2009 LOGILAB S.A. (Paris, FRANCE), license is LGPL v2.:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr:license: GNU Lesser General Public License, v2.1 - http://www.gnu.org/licenses"""__docformat__="restructuredtext en"fromthreadingimportLockfromdatetimeimportdatetimefrombase64importb64decode,b64encodefromlogilab.common.cacheimportCachefromlogilab.common.configurationimportREQUIREDfromlogilab.common.adbhimportget_adv_func_helperfromindexerimportget_indexerfromcubicwebimportUnknownEid,AuthenticationError,Binary,serverfromcubicweb.server.utilsimportcrypt_passwordfromcubicweb.server.sqlutilsimportSQL_PREFIX,SQLAdapterMixInfromcubicweb.server.rqlannotationimportset_qdatafromcubicweb.server.sourcesimportAbstractSource,dbg_st_search,dbg_resultsfromcubicweb.server.sources.rql2sqlimportSQLGeneratorATTR_MAP={}NONSYSTEM_ETYPES=set()NONSYSTEM_RELATIONS=set()classLogCursor(object):def__init__(self,cursor):self.cu=cursordefexecute(self,query,args=None):"""Execute a query. it's a function just so that it shows up in profiling """ifserver.DEBUG&server.DBG_SQL:print'exec',query,argstry:self.cu.execute(str(query),args)exceptException,ex:print"sql: %r\n args: %s\ndbms message: %r"%(query,args,ex.args[0])raisedeffetchall(self):returnself.cu.fetchall()deffetchone(self):returnself.cu.fetchone()defmake_schema(selected,solution,table,typemap):"""return a sql schema to store RQL query result"""sql=[]varmap={}fori,terminenumerate(selected):name='C%s'%ikey=term.as_string()varmap[key]='%s.%s'%(table,name)ttype=term.get_type(solution)try:sql.append('%s%s'%(name,typemap[ttype]))exceptKeyError:# assert not schema(ttype).is_final()sql.append('%s%s'%(name,typemap['Int']))return','.join(sql),varmapdef_modified_sql(table,etypes):# XXX protect against sql injectioniflen(etypes)>1:restr='type IN (%s)'%','.join("'%s'"%etypeforetypeinetypes)else:restr="type='%s'"%etypes[0]iftable=='entities':attr='mtime'else:attr='dtime'return'SELECT type, eid FROM %s WHERE %s AND %s > %%(time)s'%(table,restr,attr)classNativeSQLSource(SQLAdapterMixIn,AbstractSource):"""adapter for source using the native cubicweb schema (see below) """sqlgen_class=SQLGeneratorpasswd_rql="Any P WHERE X is CWUser, X login %(login)s, X upassword P"auth_rql="Any X WHERE X is CWUser, X login %(login)s, X upassword %(pwd)s"_sols=({'X':'CWUser','P':'Password'},)options=(('db-driver',{'type':'string','default':'postgres','help':'database driver (postgres or sqlite)','group':'native-source','inputlevel':1,}),('db-host',{'type':'string','default':'','help':'database host','group':'native-source','inputlevel':1,}),('db-port',{'type':'string','default':'','help':'database port','group':'native-source','inputlevel':1,}),('db-name',{'type':'string','default':REQUIRED,'help':'database name','group':'native-source','inputlevel':0,}),('db-user',{'type':'string','default':'cubicweb','help':'database user','group':'native-source','inputlevel':0,}),('db-password',{'type':'password','default':'','help':'database password','group':'native-source','inputlevel':0,}),('db-encoding',{'type':'string','default':'utf8','help':'database encoding','group':'native-source','inputlevel':1,}),)def__init__(self,repo,appschema,source_config,*args,**kwargs):SQLAdapterMixIn.__init__(self,source_config)AbstractSource.__init__(self,repo,appschema,source_config,*args,**kwargs)# sql generatorself._rql_sqlgen=self.sqlgen_class(appschema,self.dbhelper,self.encoding,ATTR_MAP.copy())# full text index helperself.indexer=get_indexer(self.dbdriver,self.encoding)# advanced functionality helperself.dbhelper.fti_uid_attr=self.indexer.uid_attrself.dbhelper.fti_table=self.indexer.tableself.dbhelper.fti_restriction_sql=self.indexer.restriction_sqlself.dbhelper.fti_need_distinct_query=self.indexer.need_distinct# sql queries cacheself._cache=Cache(repo.config['rql-cache-size'])self._temp_table_data={}self._eid_creation_lock=Lock()# XXX no_sqlite_wrap trick since we've a sqlite locking pb when# running unittest_multisources with the wrapping belowifself.dbdriver=='sqlite'and \notgetattr(repo.config,'no_sqlite_wrap',False):fromcubicweb.server.sources.extliteimportConnectionWrapperself.get_connection=lambda:ConnectionWrapper(self)self.check_connection=lambdacnx:cnxdefpool_reset(cnx):cnx.close()self.pool_reset=pool_reset@propertydef_sqlcnx(self):# XXX: sqlite connections can only be used in the same thread, so# create a new one each time necessary. If it appears to be time# consuming, find another wayreturnSQLAdapterMixIn.get_connection(self)defreset_caches(self):"""method called during test to reset potential source caches"""self._cache=Cache(self.repo.config['rql-cache-size'])defclear_eid_cache(self,eid,etype):"""clear potential caches for the given eid"""self._cache.pop('Any X WHERE X eid %s, X is %s'%(eid,etype),None)self._cache.pop('Any X WHERE X eid %s'%eid,None)self._cache.pop('Any %s'%eid,None)defsqlexec(self,session,sql,args=None):"""execute the query and return its result"""returnself.process_result(self.doexec(session,sql,args))definit_creating(self):pool=self.repo._get_pool()pool.pool_set()# check full text index availibilityifnotself.indexer.has_fti_table(pool['system']):self.error('no text index table')self.indexer=Nonepool.pool_reset()self.repo._free_pool(pool)defbackup(self,backupfile):"""method called to create a backup of the source's data"""self.close_pool_connections()try:self.backup_to_file(backupfile)finally:self.open_pool_connections()defrestore(self,backupfile,drop):"""method called to restore a backup of source's data"""ifself.repo.config.open_connections_pools:self.close_pool_connections()try:self.restore_from_file(backupfile,drop)finally:ifself.repo.config.open_connections_pools:self.open_pool_connections()definit(self):self.init_creating()defmap_attribute(self,etype,attr,cb):self._rql_sqlgen.attr_map['%s.%s'%(etype,attr)]=cb# ISource interface #######################################################defcompile_rql(self,rql):rqlst=self.repo.vreg.rqlhelper.parse(rql)rqlst.restricted_vars=()rqlst.children[0].solutions=self._solsself.repo.querier.sqlgen_annotate(rqlst)set_qdata(self.schema.rschema,rqlst,())returnrqlstdefset_schema(self,schema):"""set the instance'schema"""self._cache=Cache(self.repo.config['rql-cache-size'])self.cache_hit,self.cache_miss,self.no_cache=0,0,0self.schema=schematry:self._rql_sqlgen.schema=schemaexceptAttributeError:pass# __init__if'CWUser'inschema:# probably an empty schema if not true...# rql syntax trees used to authenticate usersself._passwd_rqlst=self.compile_rql(self.passwd_rql)self._auth_rqlst=self.compile_rql(self.auth_rql)defsupport_entity(self,etype,write=False):"""return true if the given entity's type is handled by this adapter if write is true, return true only if it's a RW support """returnnotetypeinNONSYSTEM_ETYPESdefsupport_relation(self,rtype,write=False):"""return true if the given relation's type is handled by this adapter if write is true, return true only if it's a RW support """ifwrite:returnnotrtypeinNONSYSTEM_RELATIONS# due to current multi-sources implementation, the system source# can't claim not supporting a relationreturnTrue#not rtype == 'content_for'defmay_cross_relation(self,rtype):returnTruedefauthenticate(self,session,login,password):"""return CWUser eid for the given login/password if this account is defined in this source, else raise `AuthenticationError` two queries are needed since passwords are stored crypted, so we have to fetch the salt first """args={'login':login,'pwd':password}ifpasswordisnotNone:rset=self.syntax_tree_search(session,self._passwd_rqlst,args)try:pwd=rset[0][0]exceptIndexError:raiseAuthenticationError('bad login')# passwords are stored using the Bytes type, so we get a StringIOifpwdisnotNone:args['pwd']=crypt_password(password,pwd.getvalue()[:2])# get eid from login and (crypted) passwordrset=self.syntax_tree_search(session,self._auth_rqlst,args)try:returnrset[0][0]exceptIndexError:raiseAuthenticationError('bad password')defsyntax_tree_search(self,session,union,args=None,cachekey=None,varmap=None):"""return result from this source for a rql query (actually from a rql syntax tree and a solution dictionary mapping each used variable to a possible type). If cachekey is given, the query necessary to fetch the results (but not the results themselves) may be cached using this key. """assertdbg_st_search(self.uri,union,varmap,args,cachekey)# remember number of actually selected term (sql generation may append some)ifcachekeyisNone:self.no_cache+=1# generate sql query if we are able to do so (not supported types...)sql,query_args=self._rql_sqlgen.generate(union,args,varmap)else:# sql may be cachedtry:sql,query_args=self._cache[cachekey]self.cache_hit+=1exceptKeyError:self.cache_miss+=1sql,query_args=self._rql_sqlgen.generate(union,args,varmap)self._cache[cachekey]=sql,query_argsargs=self.merge_args(args,query_args)assertisinstance(sql,basestring),repr(sql)try:cursor=self.doexec(session,sql,args)except(self.dbapi_module.OperationalError,self.dbapi_module.InterfaceError):# FIXME: better detection of deconnection pbself.info("request failed '%s' ... retry with a new cursor",sql)session.pool.reconnect(self)cursor=self.doexec(session,sql,args)results=self.process_result(cursor)assertdbg_results(results)returnresultsdefflying_insert(self,table,session,union,args=None,varmap=None):"""similar as .syntax_tree_search, but inserts data in the temporary table (on-the-fly if possible, eg for the system source whose the given cursor come from). If not possible, inserts all data by calling .executemany(). """assertdbg_st_search(self.uri,union,varmap,args,prefix='ON THE FLY temp data insertion into %s from'%table)# generate sql queries if we are able to do sosql,query_args=self._rql_sqlgen.generate(union,args,varmap)query='INSERT INTO %s%s'%(table,sql.encode(self.encoding))self.doexec(session,query,self.merge_args(args,query_args))defmanual_insert(self,results,table,session):"""insert given result into a temporary table on the system source"""ifserver.DEBUG&server.DBG_RQL:print' manual insertion of',res,'into',tableifnotresults:returnquery_args=['%%(%s)s'%iforiinxrange(len(results[0]))]query='INSERT INTO %s VALUES(%s)'%(table,','.join(query_args))kwargs_list=[]forrowinresults:kwargs={}row=tuple(row)forindex,cellinenumerate(row):ifisinstance(cell,Binary):cell=self.binary(cell.getvalue())kwargs[str(index)]=cellkwargs_list.append(kwargs)self.doexecmany(session,query,kwargs_list)defclean_temp_data(self,session,temptables):"""remove temporary data, usually associated to temporary tables"""iftemptables:fortableintemptables:try:self.doexec(session,'DROP TABLE %s'%table)except:passtry:delself._temp_table_data[table]exceptKeyError:continuedefadd_entity(self,session,entity):"""add a new entity to the source"""attrs=self.preprocess_entity(entity)sql=self.sqlgen.insert(SQL_PREFIX+str(entity.e_schema),attrs)self.doexec(session,sql,attrs)defupdate_entity(self,session,entity):"""replace an entity in the source"""attrs=self.preprocess_entity(entity)sql=self.sqlgen.update(SQL_PREFIX+str(entity.e_schema),attrs,[SQL_PREFIX+'eid'])self.doexec(session,sql,attrs)defdelete_entity(self,session,etype,eid):"""delete an entity from the source"""attrs={SQL_PREFIX+'eid':eid}sql=self.sqlgen.delete(SQL_PREFIX+etype,attrs)self.doexec(session,sql,attrs)defadd_relation(self,session,subject,rtype,object):"""add a relation to the source"""attrs={'eid_from':subject,'eid_to':object}sql=self.sqlgen.insert('%s_relation'%rtype,attrs)self.doexec(session,sql,attrs)defdelete_relation(self,session,subject,rtype,object):"""delete a relation from the source"""rschema=self.schema.rschema(rtype)ifrschema.inlined:table=SQL_PREFIX+session.describe(subject)[0]column=SQL_PREFIX+rtypesql='UPDATE %s SET %s=NULL WHERE %seid=%%(eid)s'%(table,column,SQL_PREFIX)attrs={'eid':subject}else:attrs={'eid_from':subject,'eid_to':object}sql=self.sqlgen.delete('%s_relation'%rtype,attrs)self.doexec(session,sql,attrs)defdoexec(self,session,query,args=None,rollback=True):"""Execute a query. it's a function just so that it shows up in profiling """cursor=session.pool[self.uri]ifserver.DEBUG&server.DBG_SQL:cnx=session.pool.connection(self.uri)# getattr to get the actual connection if cnx is a ConnectionWrapper# instanceprint'exec',query,args,getattr(cnx,'_cnx',cnx)try:# str(query) to avoid error if it's an unicode stringcursor.execute(str(query),args)exceptException,ex:self.critical("sql: %r\n args: %s\ndbms message: %r",query,args,ex.args[0])ifrollback:try:session.pool.connection(self.uri).rollback()self.critical('transaction has been rollbacked')except:passraisereturncursordefdoexecmany(self,session,query,args):"""Execute a query. it's a function just so that it shows up in profiling """ifserver.DEBUG&server.DBG_SQL:print'execmany',query,'with',len(args),'arguments'cursor=session.pool[self.uri]try:# str(query) to avoid error if it's an unicode stringcursor.executemany(str(query),args)exceptException,ex:self.critical("sql many: %r\n args: %s\ndbms message: %r",query,args,ex.args[0])try:session.pool.connection(self.uri).rollback()self.critical('transaction has been rollbacked')except:passraise# short cut to method requiring advanced db helper usage ##################defcreate_index(self,session,table,column,unique=False):cursor=LogCursor(session.pool[self.uri])self.dbhelper.create_index(cursor,table,column,unique)defdrop_index(self,session,table,column,unique=False):cursor=LogCursor(session.pool[self.uri])self.dbhelper.drop_index(cursor,table,column,unique)# system source interface #################################################defeid_type_source(self,session,eid):"""return a tuple (type, source, extid) for the entity with id <eid>"""sql='SELECT type, source, extid FROM entities WHERE eid=%s'%eidtry:res=session.system_sql(sql).fetchone()except:assertsession.pool,'session has no pool set'raiseUnknownEid(eid)ifresisNone:raiseUnknownEid(eid)ifres[-1]isnotNone:ifnotisinstance(res,list):res=list(res)res[-1]=b64decode(res[-1])returnresdefextid2eid(self,session,source,extid):"""get eid from an external id. Return None if no record found."""assertisinstance(extid,str)cursor=session.system_sql('SELECT eid FROM entities WHERE ''extid=%(x)s AND source=%(s)s',{'x':b64encode(extid),'s':source.uri})# XXX testing rowcount cause strange bug with sqlite, results are there# but rowcount is 0#if cursor.rowcount > 0:try:result=cursor.fetchone()ifresult:returnresult[0]except:passreturnNonedeftemp_table_def(self,selected,sol,table):returnmake_schema(selected,sol,table,self.dbhelper.TYPE_MAPPING)defcreate_temp_table(self,session,table,schema):# we don't want on commit drop, this may cause problem when# running with an ldap source, and table will be deleted manually any way# on commitsql=self.dbhelper.sql_temporary_table(table,schema,False)self.doexec(session,sql)defcreate_eid(self,session):self._eid_creation_lock.acquire()try:forsqlinself.dbhelper.sqls_increment_sequence('entities_id_seq'):cursor=self.doexec(session,sql)returncursor.fetchone()[0]finally:self._eid_creation_lock.release()defadd_info(self,session,entity,source,extid=None):"""add type and source info for an eid into the system table"""# begin by inserting eid/type/source/extid into the entities tableifextidisnotNone:assertisinstance(extid,str)extid=b64encode(extid)attrs={'type':entity.id,'eid':entity.eid,'extid':extid,'source':source.uri,'mtime':datetime.now()}session.system_sql(self.sqlgen.insert('entities',attrs),attrs)defdelete_info(self,session,eid,etype,uri,extid):"""delete system information on deletion of an entity by transfering record from the entities table to the deleted_entities table """attrs={'eid':eid}session.system_sql(self.sqlgen.delete('entities',attrs),attrs)ifextidisnotNone:assertisinstance(extid,str),type(extid)extid=b64encode(extid)attrs={'type':etype,'eid':eid,'extid':extid,'source':uri,'dtime':datetime.now()}session.system_sql(self.sqlgen.insert('deleted_entities',attrs),attrs)deffti_unindex_entity(self,session,eid):"""remove text content for entity with the given eid from the full text index """try:self.indexer.cursor_unindex_object(eid,session.pool['system'])exceptException:# let KeyboardInterrupt / SystemExit propagateifself.indexerisnotNone:self.exception('error while unindexing %s',eid)deffti_index_entity(self,session,entity):"""add text content of a created/modified entity to the full text index """self.info('reindexing %r',entity.eid)try:self.indexer.cursor_reindex_object(entity.eid,entity,session.pool['system'])exceptException:# let KeyboardInterrupt / SystemExit propagateifself.indexerisnotNone:self.exception('error while reindexing %s',entity)# update entities.mtimeattrs={'eid':entity.eid,'mtime':datetime.now()}session.system_sql(self.sqlgen.update('entities',attrs,['eid']),attrs)defmodified_entities(self,session,etypes,mtime):"""return a 2-uple: * list of (etype, eid) of entities of the given types which have been modified since the given timestamp (actually entities whose full text index content has changed) * list of (etype, eid) of entities of the given types which have been deleted since the given timestamp """modsql=_modified_sql('entities',etypes)cursor=session.system_sql(modsql,{'time':mtime})modentities=cursor.fetchall()delsql=_modified_sql('deleted_entities',etypes)cursor=session.system_sql(delsql,{'time':mtime})delentities=cursor.fetchall()returnmodentities,delentitiesdefsql_schema(driver):helper=get_adv_func_helper(driver)schema="""/* Create the repository's system database */%sCREATE TABLE entities ( eid INTEGER PRIMARY KEY NOT NULL, type VARCHAR(64) NOT NULL, source VARCHAR(64) NOT NULL, mtime TIMESTAMP NOT NULL, extid VARCHAR(256));CREATE INDEX entities_type_idx ON entities(type);CREATE INDEX entities_mtime_idx ON entities(mtime);CREATE INDEX entities_extid_idx ON entities(extid);CREATE TABLE deleted_entities ( eid INTEGER PRIMARY KEY NOT NULL, type VARCHAR(64) NOT NULL, source VARCHAR(64) NOT NULL, dtime TIMESTAMP NOT NULL, extid VARCHAR(256));CREATE INDEX deleted_entities_type_idx ON deleted_entities(type);CREATE INDEX deleted_entities_dtime_idx ON deleted_entities(dtime);CREATE INDEX deleted_entities_extid_idx ON deleted_entities(extid);"""%helper.sql_create_sequence('entities_id_seq')returnschemadefsql_drop_schema(driver):helper=get_adv_func_helper(driver)return"""%sDROP TABLE entities;DROP TABLE deleted_entities;"""%helper.sql_drop_sequence('entities_id_seq')defgrant_schema(user,set_owner=True):result=''ifset_owner:result='ALTER TABLE entities OWNER TO %s;\n'%userresult+='ALTER TABLE deleted_entities OWNER TO %s;\n'%userresult+='ALTER TABLE entities_id_seq OWNER TO %s;\n'%userresult+='GRANT ALL ON entities TO %s;\n'%userresult+='GRANT ALL ON deleted_entities TO %s;\n'%userresult+='GRANT ALL ON entities_id_seq TO %s;\n'%userreturnresult