[widget] allow specifying the hour/minute separator on the JQueryTimePicker (patch by vgodard)
"""Adapters for native cubicweb sources.Notes:* extid (aka external id, the primary key of an entity in the external source from which it comes from) are stored in a varchar column encoded as a base64 string. This is because it should actually be Bytes but we want an index on it for fast querying.:organization: Logilab:copyright: 2001-2010 LOGILAB S.A. (Paris, FRANCE), license is LGPL v2.:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr:license: GNU Lesser General Public License, v2.1 - http://www.gnu.org/licenses"""__docformat__="restructuredtext en"fromthreadingimportLockfromdatetimeimportdatetimefrombase64importb64decode,b64encodefromlogilab.common.compatimportanyfromlogilab.common.cacheimportCachefromlogilab.common.decoratorsimportcached,clear_cachefromlogilab.common.configurationimportMethodfromlogilab.common.adbhimportget_adv_func_helperfromlogilab.common.shellutilsimportgetloginfromindexerimportget_indexerfromcubicwebimportUnknownEid,AuthenticationError,Binary,serverfromcubicweb.cwconfigimportCubicWebNoAppConfigurationfromcubicweb.serverimporthookfromcubicweb.server.utilsimportcrypt_passwordfromcubicweb.server.sqlutilsimportSQL_PREFIX,SQLAdapterMixInfromcubicweb.server.rqlannotationimportset_qdatafromcubicweb.server.sourcesimportAbstractSource,dbg_st_search,dbg_resultsfromcubicweb.server.sources.rql2sqlimportSQLGeneratorATTR_MAP={}NONSYSTEM_ETYPES=set()NONSYSTEM_RELATIONS=set()classLogCursor(object):def__init__(self,cursor):self.cu=cursordefexecute(self,query,args=None):"""Execute a query. 
def make_schema(selected, solution, table, typemap):
    """return a sql schema (column definitions) to store a RQL query result
    into `table`, along with a mapping from each selected term to the table
    column holding it

    :param selected: selected terms of the RQL query
    :param solution: solution dictionary mapping each variable to its type
    :param table: name of the table which will receive the results
    :param typemap: mapping from yams types to SQL column types
    """
    sql = []
    varmap = {}
    for i, term in enumerate(selected):
        name = 'C%s' % i
        key = term.as_string()
        varmap[key] = '%s.%s' % (table, name)
        ttype = term.get_type(solution)
        try:
            # a space between column name and type is mandatory SQL syntax
            # (was '%s%s', generating e.g. "C0text")
            sql.append('%s %s' % (name, typemap[ttype]))
        except KeyError:
            # assert not schema(ttype).final
            # non final term: an eid is expected, store it as an integer
            sql.append('%s %s' % (name, typemap['Int']))
    return ','.join(sql), varmap


def _modified_sql(table, etypes):
    """return a query retrieving (type, eid) of entities of the given types
    modified (entities table) or deleted (deleted_entities table) since the
    timestamp given as the 'time' query argument
    """
    # XXX protect against sql injection
    if len(etypes) > 1:
        restr = 'type IN (%s)' % ','.join("'%s'" % etype for etype in etypes)
    else:
        restr = "type='%s'" % etypes[0]
    if table == 'entities':
        attr = 'mtime'
    else:
        attr = 'dtime'
    return 'SELECT type, eid FROM %s WHERE %s AND %s > %%(time)s' % (
        table, restr, attr)


class NativeSQLSource(SQLAdapterMixIn, AbstractSource):
    """adapter for source using the native cubicweb schema (see below)
    """
    sqlgen_class = SQLGenerator

    options = (
        ('db-driver',
         {'type': 'string',
          'default': 'postgres',
          'help': 'database driver (postgres or sqlite)',
          'group': 'native-source', 'inputlevel': 1,
          }),
        ('db-host',
         {'type': 'string',
          'default': '',
          'help': 'database host',
          'group': 'native-source', 'inputlevel': 1,
          }),
        ('db-port',
         {'type': 'string',
          'default': '',
          'help': 'database port',
          'group': 'native-source', 'inputlevel': 1,
          }),
        ('db-name',
         {'type': 'string',
          'default': Method('default_instance_id'),
          'help': 'database name',
          'group': 'native-source', 'inputlevel': 0,
          }),
        ('db-user',
         {'type': 'string',
          'default': CubicWebNoAppConfiguration.mode == 'user' and getlogin() or 'cubicweb',
          'help': 'database user',
          'group': 'native-source', 'inputlevel': 0,
          }),
        ('db-password',
         {'type': 'password',
          'default': '',
          'help': 'database password',
          'group': 'native-source', 'inputlevel': 0,
          }),
        ('db-encoding',
         {'type': 'string',
          'default': 'utf8',
          'help': 'database encoding',
          'group': 'native-source', 'inputlevel': 1,
          }),
    )
    def __init__(self, repo, appschema, source_config, *args, **kwargs):
        """initialize the native source: db connection settings, full text
        index helper, SQL generator and query cache
        """
        SQLAdapterMixIn.__init__(self, source_config)
        self.authentifiers = [LoginPasswordAuthentifier(self)]
        AbstractSource.__init__(self, repo, appschema, source_config,
                                *args, **kwargs)
        # full text index helper
        self.do_fti = not repo.config['delay-full-text-indexation']
        if self.do_fti:
            self.indexer = get_indexer(self.dbdriver, self.encoding)
            # XXX should go away with logilab.db
            self.dbhelper.fti_uid_attr = self.indexer.uid_attr
            self.dbhelper.fti_table = self.indexer.table
            self.dbhelper.fti_restriction_sql = self.indexer.restriction_sql
            self.dbhelper.fti_need_distinct_query = self.indexer.need_distinct
        else:
            self.dbhelper.fti_need_distinct_query = False
        # sql generator
        self._rql_sqlgen = self.sqlgen_class(appschema, self.dbhelper,
                                             self.encoding, ATTR_MAP.copy())
        # sql queries cache
        self._cache = Cache(repo.config['rql-cache-size'])
        self._temp_table_data = {}
        # serializes eid allocation (see create_eid)
        self._eid_creation_lock = Lock()
        # XXX no_sqlite_wrap trick since we've a sqlite locking pb when
        # running unittest_multisources with the wrapping below
        if self.dbdriver == 'sqlite' and \
               not getattr(repo.config, 'no_sqlite_wrap', False):
            from cubicweb.server.sources.extlite import ConnectionWrapper
            self.get_connection = lambda: ConnectionWrapper(self)
            self.check_connection = lambda cnx: cnx
            def pool_reset(cnx):
                cnx.close()
            self.pool_reset = pool_reset

    @property
    def _sqlcnx(self):
        # XXX: sqlite connections can only be used in the same thread, so
        # create a new one each time necessary. If it appears to be time
        # consuming, find another way
        return SQLAdapterMixIn.get_connection(self)
If it appears to be time# consuming, find another wayreturnSQLAdapterMixIn.get_connection(self)defadd_authentifier(self,authentifier):self.authentifiers.append(authentifier)authentifier.source=selfauthentifier.set_schema(self.schema)defreset_caches(self):"""method called during test to reset potential source caches"""self._cache=Cache(self.repo.config['rql-cache-size'])defclear_eid_cache(self,eid,etype):"""clear potential caches for the given eid"""self._cache.pop('Any X WHERE X eid %s, X is %s'%(eid,etype),None)self._cache.pop('Any X WHERE X eid %s'%eid,None)self._cache.pop('Any %s'%eid,None)defsqlexec(self,session,sql,args=None):"""execute the query and return its result"""returnself.process_result(self.doexec(session,sql,args))definit_creating(self):pool=self.repo._get_pool()pool.pool_set()# check full text index availibilityifself.do_fti:ifnotself.indexer.has_fti_table(pool['system']):ifnotself.repo.config.creating:self.critical('no text index table')self.do_fti=Falsepool.pool_reset()self.repo._free_pool(pool)defbackup(self,backupfile):"""method called to create a backup of the source's data"""self.close_pool_connections()try:self.backup_to_file(backupfile)finally:self.open_pool_connections()defrestore(self,backupfile,confirm,drop):"""method called to restore a backup of source's data"""ifself.repo.config.open_connections_pools:self.close_pool_connections()try:self.restore_from_file(backupfile,confirm,drop=drop)finally:ifself.repo.config.open_connections_pools:self.open_pool_connections()definit(self):self.init_creating()defmap_attribute(self,etype,attr,cb):self._rql_sqlgen.attr_map['%s.%s'%(etype,attr)]=cbdefunmap_attribute(self,etype,attr):self._rql_sqlgen.attr_map.pop('%s.%s'%(etype,attr),None)# ISource interface 
#######################################################defcompile_rql(self,rql,sols):rqlst=self.repo.vreg.rqlhelper.parse(rql)rqlst.restricted_vars=()rqlst.children[0].solutions=solsself.repo.querier.sqlgen_annotate(rqlst)set_qdata(self.schema.rschema,rqlst,())returnrqlstdefset_schema(self,schema):"""set the instance'schema"""self._cache=Cache(self.repo.config['rql-cache-size'])self.cache_hit,self.cache_miss,self.no_cache=0,0,0self.schema=schematry:self._rql_sqlgen.schema=schemaexceptAttributeError:pass# __init__forauthentifierinself.authentifiers:authentifier.set_schema(self.schema)clear_cache(self,'need_fti_indexation')defsupport_entity(self,etype,write=False):"""return true if the given entity's type is handled by this adapter if write is true, return true only if it's a RW support """returnnotetypeinNONSYSTEM_ETYPESdefsupport_relation(self,rtype,write=False):"""return true if the given relation's type is handled by this adapter if write is true, return true only if it's a RW support """ifwrite:returnnotrtypeinNONSYSTEM_RELATIONS# due to current multi-sources implementation, the system source# can't claim not supporting a relationreturnTrue#not rtype == 'content_for'defmay_cross_relation(self,rtype):returnTruedefauthenticate(self,session,login,**kwargs):"""return CWUser eid for the given login and other authentication information found in kwargs, else raise `AuthenticationError` """forauthentifierinself.authentifiers:try:returnauthentifier.authenticate(session,login,**kwargs)exceptAuthenticationError:continueraiseAuthenticationError()defsyntax_tree_search(self,session,union,args=None,cachekey=None,varmap=None):"""return result from this source for a rql query (actually from a rql syntax tree and a solution dictionary mapping each used variable to a possible type). If cachekey is given, the query necessary to fetch the results (but not the results themselves) may be cached using this key. 
"""assertdbg_st_search(self.uri,union,varmap,args,cachekey)# remember number of actually selected term (sql generation may append some)ifcachekeyisNone:self.no_cache+=1# generate sql query if we are able to do so (not supported types...)sql,query_args=self._rql_sqlgen.generate(union,args,varmap)else:# sql may be cachedtry:sql,query_args=self._cache[cachekey]self.cache_hit+=1exceptKeyError:self.cache_miss+=1sql,query_args=self._rql_sqlgen.generate(union,args,varmap)self._cache[cachekey]=sql,query_argsargs=self.merge_args(args,query_args)assertisinstance(sql,basestring),repr(sql)try:cursor=self.doexec(session,sql,args)except(self.dbapi_module.OperationalError,self.dbapi_module.InterfaceError):# FIXME: better detection of deconnection pbself.info("request failed '%s' ... retry with a new cursor",sql)session.pool.reconnect(self)cursor=self.doexec(session,sql,args)results=self.process_result(cursor)assertdbg_results(results)returnresultsdefflying_insert(self,table,session,union,args=None,varmap=None):"""similar as .syntax_tree_search, but inserts data in the temporary table (on-the-fly if possible, eg for the system source whose the given cursor come from). If not possible, inserts all data by calling .executemany(). 
"""assertdbg_st_search(self.uri,union,varmap,args,prefix='ON THE FLY temp data insertion into %s from'%table)# generate sql queries if we are able to do sosql,query_args=self._rql_sqlgen.generate(union,args,varmap)query='INSERT INTO %s%s'%(table,sql.encode(self.encoding))self.doexec(session,query,self.merge_args(args,query_args))defmanual_insert(self,results,table,session):"""insert given result into a temporary table on the system source"""ifserver.DEBUG&server.DBG_RQL:print' manual insertion of',results,'into',tableifnotresults:returnquery_args=['%%(%s)s'%iforiinxrange(len(results[0]))]query='INSERT INTO %s VALUES(%s)'%(table,','.join(query_args))kwargs_list=[]forrowinresults:kwargs={}row=tuple(row)forindex,cellinenumerate(row):ifisinstance(cell,Binary):cell=self.binary(cell.getvalue())kwargs[str(index)]=cellkwargs_list.append(kwargs)self.doexecmany(session,query,kwargs_list)defclean_temp_data(self,session,temptables):"""remove temporary data, usually associated to temporary tables"""iftemptables:fortableintemptables:try:self.doexec(session,'DROP TABLE %s'%table)except:passtry:delself._temp_table_data[table]exceptKeyError:continuedefadd_entity(self,session,entity):"""add a new entity to the source"""attrs=self.preprocess_entity(entity)sql=self.sqlgen.insert(SQL_PREFIX+str(entity.e_schema),attrs)self.doexec(session,sql,attrs)defupdate_entity(self,session,entity):"""replace an entity in the source"""attrs=self.preprocess_entity(entity)sql=self.sqlgen.update(SQL_PREFIX+str(entity.e_schema),attrs,[SQL_PREFIX+'eid'])self.doexec(session,sql,attrs)defdelete_entity(self,session,etype,eid):"""delete an entity from the source"""attrs={SQL_PREFIX+'eid':eid}sql=self.sqlgen.delete(SQL_PREFIX+etype,attrs)self.doexec(session,sql,attrs)defadd_relation(self,session,subject,rtype,object,inlined=False):"""add a relation to the source"""ifinlinedisFalse:attrs={'eid_from':subject,'eid_to':object}sql=self.sqlgen.insert('%s_relation'%rtype,attrs)else:# used by data 
importetype=session.describe(subject)[0]attrs={SQL_PREFIX+'eid':subject,SQL_PREFIX+rtype:object}sql=self.sqlgen.update(SQL_PREFIX+etype,attrs,[SQL_PREFIX+'eid'])self.doexec(session,sql,attrs)defdelete_relation(self,session,subject,rtype,object):"""delete a relation from the source"""rschema=self.schema.rschema(rtype)ifrschema.inlined:table=SQL_PREFIX+session.describe(subject)[0]column=SQL_PREFIX+rtypesql='UPDATE %s SET %s=NULL WHERE %seid=%%(eid)s'%(table,column,SQL_PREFIX)attrs={'eid':subject}else:attrs={'eid_from':subject,'eid_to':object}sql=self.sqlgen.delete('%s_relation'%rtype,attrs)self.doexec(session,sql,attrs)defdoexec(self,session,query,args=None,rollback=True):"""Execute a query. it's a function just so that it shows up in profiling """cursor=session.pool[self.uri]ifserver.DEBUG&server.DBG_SQL:cnx=session.pool.connection(self.uri)# getattr to get the actual connection if cnx is a ConnectionWrapper# instanceprint'exec',query,args,getattr(cnx,'_cnx',cnx)try:# str(query) to avoid error if it's an unicode stringcursor.execute(str(query),args)exceptException,ex:ifself.repo.config.mode!='test':# during test we get those message when trying to alter sqlite# db schemaself.critical("sql: %r\n args: %s\ndbms message: %r",query,args,ex.args[0])ifrollback:try:session.pool.connection(self.uri).rollback()ifself.repo.config.mode!='test':self.critical('transaction has been rollbacked')except:passraisereturncursordefdoexecmany(self,session,query,args):"""Execute a query. 
it's a function just so that it shows up in profiling """ifserver.DEBUG&server.DBG_SQL:print'execmany',query,'with',len(args),'arguments'cursor=session.pool[self.uri]try:# str(query) to avoid error if it's an unicode stringcursor.executemany(str(query),args)exceptException,ex:ifself.repo.config.mode!='test':# during test we get those message when trying to alter sqlite# db schemaself.critical("sql many: %r\n args: %s\ndbms message: %r",query,args,ex.args[0])try:session.pool.connection(self.uri).rollback()ifself.repo.config.mode!='test':self.critical('transaction has been rollbacked')except:passraise# short cut to method requiring advanced db helper usage ##################defcreate_index(self,session,table,column,unique=False):cursor=LogCursor(session.pool[self.uri])self.dbhelper.create_index(cursor,table,column,unique)defdrop_index(self,session,table,column,unique=False):cursor=LogCursor(session.pool[self.uri])self.dbhelper.drop_index(cursor,table,column,unique)# system source interface #################################################defeid_type_source(self,session,eid):"""return a tuple (type, source, extid) for the entity with id <eid>"""sql='SELECT type, source, extid FROM entities WHERE eid=%s'%eidtry:res=session.system_sql(sql).fetchone()except:assertsession.pool,'session has no pool set'raiseUnknownEid(eid)ifresisNone:raiseUnknownEid(eid)ifres[-1]isnotNone:ifnotisinstance(res,list):res=list(res)res[-1]=b64decode(res[-1])returnresdefextid2eid(self,session,source,extid):"""get eid from an external id. 
    def temp_table_def(self, selected, sol, table):
        """return the sql schema and variable map to store the given RQL
        selection into a temporary table (see make_schema)
        """
        return make_schema(selected, sol, table, self.dbhelper.TYPE_MAPPING)

    def create_temp_table(self, session, table, schema):
        # we don't want on commit drop, this may cause problem when
        # running with an ldap source, and table will be deleted manually any way
        # on commit
        sql = self.dbhelper.sql_temporary_table(table, schema, False)
        self.doexec(session, sql)

    def create_eid(self, session):
        """allocate and return a new eid from the entities_id_seq sequence,
        serialized by a lock since several threads may create entities
        concurrently
        """
        self._eid_creation_lock.acquire()
        try:
            # the cursor of the last executed statement holds the new value
            for sql in self.dbhelper.sqls_increment_sequence('entities_id_seq'):
                cursor = self.doexec(session, sql)
            return cursor.fetchone()[0]
        finally:
            self._eid_creation_lock.release()

    def add_info(self, session, entity, source, extid=None, complete=True):
        """add type and source info for an eid into the system table"""
        # begin by inserting eid/type/source/extid into the entities table
        if extid is not None:
            assert isinstance(extid, str)
            # extid is stored base64 encoded (see module docstring)
            extid = b64encode(extid)
        attrs = {'type': entity.__regid__, 'eid': entity.eid, 'extid': extid,
                 'source': source.uri, 'mtime': datetime.now()}
        session.system_sql(self.sqlgen.insert('entities', attrs), attrs)
        # now we can update the full text index
        if self.do_fti and self.need_fti_indexation(entity.__regid__):
            if complete:
                entity.complete(entity.e_schema.indexable_attributes())
            FTIndexEntityOp(session, entity=entity)

    def update_info(self, session, entity, need_fti_update):
        """update the system table on entity modification"""
        if self.do_fti and need_fti_update:
            # reindex the entity only if this query is updating at least
            # one indexable attribute
            FTIndexEntityOp(session, entity=entity)
        # update entities.mtime
        attrs = {'eid': entity.eid, 'mtime': datetime.now()}
        session.system_sql(self.sqlgen.update('entities', attrs, ['eid']), attrs)
    def delete_info(self, session, eid, etype, uri, extid):
        """delete system information on deletion of an entity by transfering
        record from the entities table to the deleted_entities table
        """
        attrs = {'eid': eid}
        session.system_sql(self.sqlgen.delete('entities', attrs), attrs)
        if extid is not None:
            assert isinstance(extid, str), type(extid)
            # extid is stored base64 encoded (see module docstring)
            extid = b64encode(extid)
        attrs = {'type': etype, 'eid': eid, 'extid': extid,
                 'source': uri, 'dtime': datetime.now()}
        session.system_sql(self.sqlgen.insert('deleted_entities', attrs), attrs)

    def modified_entities(self, session, etypes, mtime):
        """return a 2-uple:
        * list of (etype, eid) of entities of the given types which have
          been modified since the given timestamp (actually entities whose
          full text index content has changed)
        * list of (etype, eid) of entities of the given types which have
          been deleted since the given timestamp
        """
        modsql = _modified_sql('entities', etypes)
        cursor = session.system_sql(modsql, {'time': mtime})
        modentities = cursor.fetchall()
        delsql = _modified_sql('deleted_entities', etypes)
        cursor = session.system_sql(delsql, {'time': mtime})
        delentities = cursor.fetchall()
        return modentities, delentities

    # full text index handling #################################################

    @cached
    def need_fti_indexation(self, etype):
        """return True if entities of the given type have some fulltext
        indexable content (cached per etype, cleared on set_schema)
        """
        eschema = self.schema.eschema(etype)
        if any(eschema.indexable_attributes()):
            return True
        if any(eschema.fulltext_containers()):
            return True
        return False

    def index_entity(self, session, entity):
        """create an operation to [re]index textual content of the given
        entity on commit
        """
        FTIndexEntityOp(session, entity=entity)

    def fti_unindex_entity(self, session, eid):
        """remove text content for entity with the given eid from the full
        text index
        """
        try:
            self.indexer.cursor_unindex_object(eid, session.pool['system'])
        except Exception: # let KeyboardInterrupt / SystemExit propagate
            self.exception('error while unindexing %s', eid)

    def fti_index_entity(self, session, entity):
        """add text content of a created/modified entity to the full text
        index
        """
        self.debug('reindexing %r', entity.eid)
        try:
            # use cursor_index_object, not cursor_reindex_object since
            # unindexing done in the FTIndexEntityOp
            self.indexer.cursor_index_object(entity.eid, entity,
                                             session.pool['system'])
        except Exception: # let KeyboardInterrupt / SystemExit propagate
            self.exception('error while reindexing %s', entity)
class FTIndexEntityOp(hook.LateOperation):
    """operation to delay entity full text indexation to commit

    since fti indexing may trigger discovery of other entities, it should be
    triggered on precommit, not commit, and this should be done after other
    precommit operation which may add relations to the entity
    """

    def precommit_event(self):
        session = self.session
        entity = self.entity
        if entity.eid in session.transaction_data.get('pendingeids', ()):
            return # entity added and deleted in the same transaction
        # reindex each entity at most once per transaction
        alreadydone = session.transaction_data.setdefault('indexedeids', set())
        if entity.eid in alreadydone:
            self.debug('skipping reindexation of %s, already done', entity.eid)
            return
        alreadydone.add(entity.eid)
        source = session.repo.system_source
        for container in entity.fti_containers():
            source.fti_unindex_entity(session, container.eid)
            source.fti_index_entity(session, container)

    def commit_event(self):
        pass


def sql_schema(driver):
    """return the sql to create the repository's system tables (entities and
    deleted_entities) and their indexes for the given db driver
    """
    helper = get_adv_func_helper(driver)
    tstamp_col_type = helper.TYPE_MAPPING['Datetime']
    # NOTE: the DDL literal had lost its line breaks; restored for
    # readability, statements are unchanged
    schema = """
/* Create the repository's system database */

%s

CREATE TABLE entities (
  eid INTEGER PRIMARY KEY NOT NULL,
  type VARCHAR(64) NOT NULL,
  source VARCHAR(64) NOT NULL,
  mtime %s NOT NULL,
  extid VARCHAR(256)
);
CREATE INDEX entities_type_idx ON entities(type);
CREATE INDEX entities_mtime_idx ON entities(mtime);
CREATE INDEX entities_extid_idx ON entities(extid);

CREATE TABLE deleted_entities (
  eid INTEGER PRIMARY KEY NOT NULL,
  type VARCHAR(64) NOT NULL,
  source VARCHAR(64) NOT NULL,
  dtime %s NOT NULL,
  extid VARCHAR(256)
);
CREATE INDEX deleted_entities_type_idx ON deleted_entities(type);
CREATE INDEX deleted_entities_dtime_idx ON deleted_entities(dtime);
CREATE INDEX deleted_entities_extid_idx ON deleted_entities(extid);
""" % (helper.sql_create_sequence('entities_id_seq'),
       tstamp_col_type, tstamp_col_type)
    return schema
def sql_drop_schema(driver):
    """return the sql to drop the repository's system tables for the given
    db driver
    """
    helper = get_adv_func_helper(driver)
    return """%sDROP TABLE entities;DROP TABLE deleted_entities;""" % \
           helper.sql_drop_sequence('entities_id_seq')


def grant_schema(user, set_owner=True):
    """return the sql to grant the given user all rights on the system
    tables, optionally transfering their ownership as well
    """
    statements = []
    if set_owner:
        statements.append('ALTER TABLE entities OWNER TO %s;\n' % user)
        statements.append('ALTER TABLE deleted_entities OWNER TO %s;\n' % user)
        statements.append('ALTER TABLE entities_id_seq OWNER TO %s;\n' % user)
    statements.append('GRANT ALL ON entities TO %s;\n' % user)
    statements.append('GRANT ALL ON deleted_entities TO %s;\n' % user)
    statements.append('GRANT ALL ON entities_id_seq TO %s;\n' % user)
    return ''.join(statements)


class BaseAuthentifier(object):
    """base class for authentifier plugins, bound to a source"""

    def __init__(self, source=None):
        self.source = source

    def set_schema(self, schema):
        """set the instance'schema"""
        pass


class LoginPasswordAuthentifier(BaseAuthentifier):
    """authentifier checking a login/password pair against CWUser entities"""
    passwd_rql = "Any P WHERE X is CWUser, X login %(login)s, X upassword P"
    auth_rql = "Any X WHERE X is CWUser, X login %(login)s, X upassword %(pwd)s"
    _sols = ({'X': 'CWUser', 'P': 'Password'},)

    def set_schema(self, schema):
        """set the instance'schema"""
        if 'CWUser' in schema: # probably an empty schema if not true...
            # rql syntax trees used to authenticate users
            self._passwd_rqlst = self.source.compile_rql(self.passwd_rql,
                                                         self._sols)
            self._auth_rqlst = self.source.compile_rql(self.auth_rql,
                                                       self._sols)

    def authenticate(self, session, login, password=None, **kwargs):
        """return CWUser eid for the given login/password if this account is
        defined in this source, else raise `AuthenticationError`

        two queries are needed since passwords are stored crypted, so we have
        to fetch the salt first
        """
        args = {'login': login, 'pwd': password}
        if password is not None:
            rset = self.source.syntax_tree_search(session, self._passwd_rqlst,
                                                  args)
            try:
                pwd = rset[0][0]
            except IndexError:
                raise AuthenticationError('bad login')
            # passwords are stored using the Bytes type, so we get a StringIO
            if pwd is not None:
                args['pwd'] = Binary(crypt_password(password,
                                                    pwd.getvalue()[:2]))
        # get eid from login and (crypted) password
        rset = self.source.syntax_tree_search(session, self._auth_rqlst, args)
        try:
            return rset[0][0]
        except IndexError:
            raise AuthenticationError('bad password')