"""Adapters for native cubicweb sources.:organization: Logilab:copyright: 2001-2008 LOGILAB S.A. (Paris, FRANCE), all rights reserved.:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr"""__docformat__="restructuredtext en"fromthreadingimportLockfrommx.DateTimeimportnowfromlogilab.common.cacheimportCachefromlogilab.common.configurationimportREQUIREDfromlogilab.common.adbhimportget_adv_func_helperfromindexerimportget_indexerfromcubicwebimportUnknownEid,AuthenticationError,Binary,serverfromcubicweb.server.utilsimportcrypt_passwordfromcubicweb.server.sqlutilsimportSQLAdapterMixInfromcubicweb.server.rqlannotationimportset_qdatafromcubicweb.server.sourcesimportAbstractSourcefromcubicweb.server.sources.rql2sqlimportSQLGeneratorNONSYSTEM_ETYPES=set()NONSYSTEM_RELATIONS=set()classLogCursor(object):def__init__(self,cursor):self.cu=cursordefexecute(self,query,args=None):"""Execute a query. it's a function just so that it shows up in profiling """ifserver.DEBUG:print'exec',query,argstry:self.cu.execute(str(query),args)exceptException,ex:print"sql: %r\n args: %s\ndbms message: %r"%(query,args,ex.args[0])raisedeffetchall(self):returnself.cu.fetchall()deffetchone(self):returnself.cu.fetchone()defmake_schema(selected,solution,table,typemap):"""return a sql schema to store RQL query result"""sql=[]varmap={}fori,terminenumerate(selected):name='C%s'%ikey=term.as_string()varmap[key]='%s.%s'%(table,name)ttype=term.get_type(solution)try:sql.append('%s%s'%(name,typemap[ttype]))exceptKeyError:# assert not schema(ttype).is_final()sql.append('%s%s'%(name,typemap['Int']))return','.join(sql),varmapdef_modified_sql(table,etypes):# XXX protect against sql injectioniflen(etypes)>1:restr='type IN (%s)'%','.join("'%s'"%etypeforetypeinetypes)else:restr="type='%s'"%etypes[0]iftable=='entities':attr='mtime'else:attr='dtime'return'SELECT type, eid FROM %s WHERE %s AND %s > %%(time)s'%(table,restr,attr)classNativeSQLSource(SQLAdapterMixIn,AbstractSource):"""adapter for source using the native cubicweb schema (see below) """# need default value on class since migration doesn't call init methodhas_deleted_entitites_table=Truepasswd_rql="Any P WHERE X is EUser, X login %(login)s, X upassword P"auth_rql="Any X WHERE X is EUser, X login %(login)s, X upassword %(pwd)s"_sols=({'X':'EUser','P':'Password'},)options=(('db-driver',{'type':'string','default':'postgres','help':'database driver (postgres or sqlite)','group':'native-source','inputlevel':1,}),('db-host',{'type':'string','default':'','help':'database host','group':'native-source','inputlevel':1,}),('db-name',{'type':'string','default':REQUIRED,'help':'database name','group':'native-source','inputlevel':0,}),('db-user',{'type':'string','default':'cubicweb','help':'database user','group':'native-source','inputlevel':0,}),('db-password',{'type':'password','default':'','help':'database password','group':'native-source','inputlevel':0,}),('db-encoding',{'type':'string','default':'utf8','help':'database encoding','group':'native-source','inputlevel':1,}),)def__init__(self,repo,appschema,source_config,*args,**kwargs):SQLAdapterMixIn.__init__(self,source_config)AbstractSource.__init__(self,repo,appschema,source_config,*args,**kwargs)# sql generatorself._rql_sqlgen=SQLGenerator(appschema,self.dbhelper,self.encoding)# full text index helperself.indexer=get_indexer(self.dbdriver,self.encoding)# advanced functionality helperself.dbhelper.fti_uid_attr=self.indexer.uid_attrself.dbhelper.fti_table=self.indexer.tableself.dbhelper.fti_restriction_sql=self.indexer.restriction_sqlself.dbhelper.fti_need_distinct_query=self.indexer.need_distinct# sql queries cacheself._cache=Cache(repo.config['rql-cache-size'])self._temp_table_data={}self._eid_creation_lock=Lock()defreset_caches(self):"""method called during test to reset potential source caches"""self._cache=Cache(self.repo.config['rql-cache-size'])defclear_eid_cache(self,eid,etype):"""clear potential caches for the given eid"""self._cache.pop('%s X WHERE X eid %s'%(etype,eid),None)self._cache.pop('Any X WHERE X eid %s'%eid,None)defsqlexec(self,session,sql,args=None):"""execute the query and return its result"""cursor=session.pool[self.uri]self.doexec(cursor,sql,args)returnself.process_result(cursor)definit_creating(self):# check full text index availibilitypool=self.repo._get_pool()ifnotself.indexer.has_fti_table(pool['system']):self.error('no text index table')self.indexer=Noneself.repo._free_pool(pool)definit(self):self.init_creating()pool=self.repo._get_pool()# XXX cubicweb < 2.42 compatif'deleted_entities'inself.dbhelper.list_tables(pool['system']):self.has_deleted_entitites_table=Trueelse:self.has_deleted_entitites_table=Falseself.repo._free_pool(pool)# ISource interface #######################################################defcompile_rql(self,rql):rqlst=self.repo.querier._rqlhelper.parse(rql)rqlst.restricted_vars=()rqlst.children[0].solutions=self._solsself.repo.querier.sqlgen_annotate(rqlst)set_qdata(self.schema.rschema,rqlst,())returnrqlstdefset_schema(self,schema):"""set the application'schema"""self._cache=Cache(self.repo.config['rql-cache-size'])self.cache_hit,self.cache_miss,self.no_cache=0,0,0self.schema=schematry:self._rql_sqlgen.schema=schemaexceptAttributeError:pass# __init__if'EUser'inschema:# probably an empty schema if not true...# rql syntax trees used to authenticate usersself._passwd_rqlst=self.compile_rql(self.passwd_rql)self._auth_rqlst=self.compile_rql(self.auth_rql)defsupport_entity(self,etype,write=False):"""return true if the given entity's type is handled by this adapter if write is true, return true only if it's a RW support """returnnotetypeinNONSYSTEM_ETYPESdefsupport_relation(self,rtype,write=False):"""return true if the given relation's type is handled by this adapter if write is true, return true only if it's a RW support """ifwrite:returnnotrtypeinNONSYSTEM_RELATIONS# due to current multi-sources implementation, the system source# can't claim not supporting a relation returnTrue#not rtype == 'content_for'defauthenticate(self,session,login,password):"""return EUser eid for the given login/password if this account is defined in this source, else raise `AuthenticationError` two queries are needed since passwords are stored crypted, so we have to fetch the salt first """args={'login':login,'pwd':password}ifpasswordisnotNone:rset=self.syntax_tree_search(session,self._passwd_rqlst,args)try:pwd=rset[0][0]exceptIndexError:raiseAuthenticationError('bad login')# passwords are stored using the bytea type, so we get a StringIOifpwdisnotNone:args['pwd']=crypt_password(password,pwd.getvalue()[:2])# get eid from login and (crypted) passwordrset=self.syntax_tree_search(session,self._auth_rqlst,args)try:returnrset[0][0]exceptIndexError:raiseAuthenticationError('bad password')defsyntax_tree_search(self,session,union,args=None,cachekey=None,varmap=None):"""return result from this source for a rql query (actually from a rql syntax tree and a solution dictionary mapping each used variable to a possible type). If cachekey is given, the query necessary to fetch the results (but not the results themselves) may be cached using this key. """ifserver.DEBUG:print'RQL FOR NATIVE SOURCE',self.uri,cachekeyifvarmap:print'USING VARMAP',varmapprintunion.as_string()ifargs:print'ARGS',argsprint'SOLUTIONS',','.join(str(s.solutions)forsinunion.children)# remember number of actually selected term (sql generation may append some)ifcachekeyisNone:self.no_cache+=1# generate sql query if we are able to do so (not supported types...)sql,query_args=self._rql_sqlgen.generate(union,args,varmap)else:# sql may be cachedtry:sql,query_args=self._cache[cachekey]self.cache_hit+=1exceptKeyError:self.cache_miss+=1sql,query_args=self._rql_sqlgen.generate(union,args,varmap)self._cache[cachekey]=sql,query_argsargs=self.merge_args(args,query_args)cursor=session.pool[self.uri]assertisinstance(sql,basestring),repr(sql)try:self.doexec(cursor,sql,args)except(self.dbapi_module.OperationalError,self.dbapi_module.InterfaceError):# FIXME: better detection of deconnection pbself.info("request failed '%s' ... retry with a new cursor",sql)session.pool.reconnect(self)cursor=session.pool[self.uri]self.doexec(cursor,sql,args)res=self.process_result(cursor)ifserver.DEBUG:print'------>',resreturnresdefflying_insert(self,table,session,union,args=None,varmap=None):"""similar as .syntax_tree_search, but inserts data in the temporary table (on-the-fly if possible, eg for the system source whose the given cursor come from). If not possible, inserts all data by calling .executemany(). """ifself.uri=='system':ifserver.DEBUG:print'FLYING RQL FOR SOURCE',self.uriifvarmap:print'USING VARMAP',varmapprintunion.as_string()print'SOLUTIONS',','.join(str(s.solutions)forsinunion.children)# generate sql queries if we are able to do sosql,query_args=self._rql_sqlgen.generate(union,args,varmap)query='INSERT INTO %s%s'%(table,sql.encode(self.encoding))self.doexec(session.pool[self.uri],query,self.merge_args(args,query_args))# XXX commented until it's proved to be necessary# # XXX probably inefficient# tempdata = self._temp_table_data.setdefault(table, set())# cursor = session.pool[self.uri]# cursor.execute('select * from %s' % table)# for row in cursor.fetchall():# print 'data', row# tempdata.add(tuple(row))else:super(NativeSQLSource,self).flying_insert(table,session,union,args,varmap)def_manual_insert(self,results,table,session):"""insert given result into a temporary table on the system source"""#print 'manual insert', table, resultsifnotresults:return#cursor.execute('select * from %s'%table)#assert len(cursor.fetchall())== 0encoding=self.encoding# added chr to be sqlite compatiblequery_args=['%%(%s)s'%iforiinxrange(len(results[0]))]query='INSERT INTO %s VALUES(%s)'%(table,','.join(query_args))kwargs_list=[]# tempdata = self._temp_table_data.setdefault(table, set())forrowinresults:kwargs={}row=tuple(row)# XXX commented until it's proved to be necessary# if row in tempdata:# continue# tempdata.add(row)forindex,cellinenumerate(row):iftype(cell)isunicode:cell=cell.encode(encoding)elifisinstance(cell,Binary):cell=self.binary(cell.getvalue())kwargs[str(index)]=cellkwargs_list.append(kwargs)self.doexecmany(session.pool[self.uri],query,kwargs_list)defclean_temp_data(self,session,temptables):"""remove temporary data, usually associated to temporary tables"""iftemptables:cursor=session.pool[self.uri]fortableintemptables:try:self.doexec(cursor,'DROP TABLE %s'%table)except:passtry:delself._temp_table_data[table]exceptKeyError:continuedefadd_entity(self,session,entity):"""add a new entity to the source"""attrs=self.preprocess_entity(entity)sql=self.sqlgen.insert(str(entity.e_schema),attrs)self.doexec(session.pool[self.uri],sql,attrs)defupdate_entity(self,session,entity):"""replace an entity in the source"""attrs=self.preprocess_entity(entity)sql=self.sqlgen.update(str(entity.e_schema),attrs,['eid'])self.doexec(session.pool[self.uri],sql,attrs)defdelete_entity(self,session,etype,eid):"""delete an entity from the source"""attrs={'eid':eid}sql=self.sqlgen.delete(etype,attrs)self.doexec(session.pool[self.uri],sql,attrs)defadd_relation(self,session,subject,rtype,object):"""add a relation to the source"""attrs={'eid_from':subject,'eid_to':object}sql=self.sqlgen.insert('%s_relation'%rtype,attrs)self.doexec(session.pool[self.uri],sql,attrs)defdelete_relation(self,session,subject,rtype,object):"""delete a relation from the source"""rschema=self.schema.rschema(rtype)ifrschema.inlined:etype=session.describe(subject)[0]sql='UPDATE %s SET %s=NULL WHERE eid=%%(eid)s'%(etype,rtype)attrs={'eid':subject}else:attrs={'eid_from':subject,'eid_to':object}sql=self.sqlgen.delete('%s_relation'%rtype,attrs)self.doexec(session.pool[self.uri],sql,attrs)defdoexec(self,cursor,query,args=None):"""Execute a query. it's a function just so that it shows up in profiling """#t1 = time()ifserver.DEBUG:print'exec',query,args#import sys#sys.stdout.flush()# str(query) to avoid error if it's an unicode stringtry:cursor.execute(str(query),args)exceptException,ex:self.critical("sql: %r\n args: %s\ndbms message: %r",query,args,ex.args[0])raisedefdoexecmany(self,cursor,query,args):"""Execute a query. it's a function just so that it shows up in profiling """#t1 = time()ifserver.DEBUG:print'execmany',query,'with',len(args),'arguments'#import sys#sys.stdout.flush()# str(query) to avoid error if it's an unicode stringtry:cursor.executemany(str(query),args)except:self.critical("sql many: %r\n args: %s",query,args)raise# short cut to method requiring advanced db helper usage ##################defcreate_index(self,session,table,column,unique=False):cursor=LogCursor(session.pool[self.uri])self.dbhelper.create_index(cursor,table,column,unique)defdrop_index(self,session,table,column,unique=False):cursor=LogCursor(session.pool[self.uri])self.dbhelper.drop_index(cursor,table,column,unique)# system source interface #################################################defeid_type_source(self,session,eid):"""return a tuple (type, source, extid) for the entity with id <eid>"""sql='SELECT type, source, extid FROM entities WHERE eid=%s'%eidtry:res=session.system_sql(sql).fetchone()except:raiseUnknownEid(eid)ifresisNone:raiseUnknownEid(eid)returnresdefextid2eid(self,session,source,lid):"""get eid from a local id. An eid is attributed if no record is found"""cursor=session.system_sql('SELECT eid FROM entities WHERE ''extid=%(x)s AND source=%(s)s',# str() necessary with pg 8.3{'x':str(lid),'s':source.uri})# XXX testing rowcount cause strange bug with sqlite, results are there# but rowcount is 0#if cursor.rowcount > 0: try:result=cursor.fetchone()ifresult:eid=result[0]returneidexcept:passreturnNonedeftemp_table_def(self,selected,sol,table):returnmake_schema(selected,sol,table,self.dbhelper.TYPE_MAPPING)defcreate_temp_table(self,session,table,schema):# we don't want on commit drop, this may cause problem when# running with an ldap source, and table will be deleted manually any way# on commitsql=self.dbhelper.sql_temporary_table(table,schema,False)self.doexec(session.pool[self.uri],sql)defcreate_eid(self,session):self._eid_creation_lock.acquire()try:cursor=session.pool[self.uri]forsqlinself.dbhelper.sqls_increment_sequence('entities_id_seq'):self.doexec(cursor,sql)returncursor.fetchone()[0]finally:self._eid_creation_lock.release()defadd_info(self,session,entity,source,extid=None):"""add type and source info for an eid into the system table"""# begin by inserting eid/type/source/extid into the entities tableattrs={'type':str(entity.e_schema),'eid':entity.eid,'extid':extid,'source':source.uri,'mtime':now()}session.system_sql(self.sqlgen.insert('entities',attrs),attrs)defdelete_info(self,session,eid,etype,uri,extid):"""delete system information on deletion of an entity by transfering record from the entities table to the deleted_entities table """attrs={'eid':eid}session.system_sql(self.sqlgen.delete('entities',attrs),attrs)ifself.has_deleted_entitites_table:attrs={'type':etype,'eid':eid,'extid':extid,'source':uri,'dtime':now()}session.system_sql(self.sqlgen.insert('deleted_entities',attrs),attrs)deffti_unindex_entity(self,session,eid):"""remove text content for entity with the given eid from the full text index """try:self.indexer.cursor_unindex_object(eid,session.pool['system'])except:ifself.indexerisnotNone:self.exception('error while unindexing %s',eid)deffti_index_entity(self,session,entity):"""add text content of a created/modified entity to the full text index """self.info('reindexing %r',entity.eid)try:self.indexer.cursor_reindex_object(entity.eid,entity,session.pool['system'])except:ifself.indexerisnotNone:self.exception('error while reindexing %s',entity)# update entities.mtimeattrs={'eid':entity.eid,'mtime':now()}session.system_sql(self.sqlgen.update('entities',attrs,['eid']),attrs)defmodified_entities(self,session,etypes,mtime):"""return a 2-uple: * list of (etype, eid) of entities of the given types which have been modified since the given timestamp (actually entities whose full text index content has changed) * list of (etype, eid) of entities of the given types which have been deleted since the given timestamp """modsql=_modified_sql('entities',etypes)cursor=session.system_sql(modsql,{'time':mtime})modentities=cursor.fetchall()delsql=_modified_sql('deleted_entities',etypes)cursor=session.system_sql(delsql,{'time':mtime})delentities=cursor.fetchall()returnmodentities,delentitiesdefsql_schema(driver):helper=get_adv_func_helper(driver)schema="""/* Create the repository's system database */%sCREATE TABLE entities ( eid INTEGER PRIMARY KEY NOT NULL, type VARCHAR(64) NOT NULL, source VARCHAR(64) NOT NULL, mtime TIMESTAMP NOT NULL, extid VARCHAR(256));CREATE INDEX entities_type_idx ON entities(type);CREATE INDEX entities_mtime_idx ON entities(mtime);CREATE INDEX entities_extid_idx ON entities(extid);CREATE TABLE deleted_entities ( eid INTEGER PRIMARY KEY NOT NULL, type VARCHAR(64) NOT NULL, source VARCHAR(64) NOT NULL, dtime TIMESTAMP NOT NULL, extid VARCHAR(256));CREATE INDEX deleted_entities_type_idx ON deleted_entities(type);CREATE INDEX deleted_entities_dtime_idx ON deleted_entities(dtime);CREATE INDEX deleted_entities_extid_idx ON deleted_entities(extid);"""%helper.sql_create_sequence('entities_id_seq')returnschemadefsql_drop_schema(driver):helper=get_adv_func_helper(driver)return"""%sDROP TABLE entities;DROP TABLE deleted_entities;"""%helper.sql_drop_sequence('entities_id_seq')defgrant_schema(user,set_owner=True):result=''ifset_owner:result='ALTER TABLE entities OWNER TO %s;\n'%userresult+='ALTER TABLE deleted_entities OWNER TO %s;\n'%userresult+='ALTER TABLE entities_id_seq OWNER TO %s;\n'%userresult+='GRANT ALL ON entities TO %s;\n'%userresult+='GRANT ALL ON deleted_entities TO %s;\n'%userresult+='GRANT ALL ON entities_id_seq TO %s;\n'%userreturnresult