server/sources/native.py
branch stable
changeset 4806 4f12f59b1a13
parent 4692 11a040e2601c
child 4807 5642bfa43236
equal deleted inserted replaced
4805:2d0aa2b7da02 4806:4f12f59b1a13
    15 
    15 
    16 from threading import Lock
    16 from threading import Lock
    17 from datetime import datetime
    17 from datetime import datetime
    18 from base64 import b64decode, b64encode
    18 from base64 import b64decode, b64encode
    19 
    19 
       
    20 from logilab.common.compat import any
    20 from logilab.common.cache import Cache
    21 from logilab.common.cache import Cache
       
    22 from logilab.common.decorators import cached, clear_cache
    21 from logilab.common.configuration import Method
    23 from logilab.common.configuration import Method
    22 from logilab.common.adbh import get_adv_func_helper
    24 from logilab.common.adbh import get_adv_func_helper
    23 from logilab.common.shellutils import getlogin
    25 from logilab.common.shellutils import getlogin
    24 
    26 
    25 from indexer import get_indexer
    27 from indexer import get_indexer
    26 
    28 
    27 from cubicweb import UnknownEid, AuthenticationError, Binary, server
    29 from cubicweb import UnknownEid, AuthenticationError, Binary, server
    28 from cubicweb.cwconfig import CubicWebNoAppConfiguration
    30 from cubicweb.cwconfig import CubicWebNoAppConfiguration
       
    31 from cubicweb.server import hook
    29 from cubicweb.server.utils import crypt_password
    32 from cubicweb.server.utils import crypt_password
    30 from cubicweb.server.sqlutils import SQL_PREFIX, SQLAdapterMixIn
    33 from cubicweb.server.sqlutils import SQL_PREFIX, SQLAdapterMixIn
    31 from cubicweb.server.rqlannotation import set_qdata
    34 from cubicweb.server.rqlannotation import set_qdata
    32 from cubicweb.server.sources import AbstractSource, dbg_st_search, dbg_results
    35 from cubicweb.server.sources import AbstractSource, dbg_st_search, dbg_results
    33 from cubicweb.server.sources.rql2sql import SQLGenerator
    36 from cubicweb.server.sources.rql2sql import SQLGenerator
   148                                 *args, **kwargs)
   151                                 *args, **kwargs)
   149         # sql generator
   152         # sql generator
   150         self._rql_sqlgen = self.sqlgen_class(appschema, self.dbhelper,
   153         self._rql_sqlgen = self.sqlgen_class(appschema, self.dbhelper,
   151                                              self.encoding, ATTR_MAP.copy())
   154                                              self.encoding, ATTR_MAP.copy())
   152         # full text index helper
   155         # full text index helper
   153         self.indexer = get_indexer(self.dbdriver, self.encoding)
   156         self.do_fti = not repo.config['delay-full-text-indexation']
   154         # advanced functionality helper
   157         if self.do_fti:
   155         self.dbhelper.fti_uid_attr = self.indexer.uid_attr
   158             self.indexer = get_indexer(self.dbdriver, self.encoding)
   156         self.dbhelper.fti_table = self.indexer.table
   159             # XXX should go away with logilab.db
   157         self.dbhelper.fti_restriction_sql = self.indexer.restriction_sql
   160             self.dbhelper.fti_uid_attr = self.indexer.uid_attr
   158         self.dbhelper.fti_need_distinct_query = self.indexer.need_distinct
   161             self.dbhelper.fti_table = self.indexer.table
       
   162             self.dbhelper.fti_restriction_sql = self.indexer.restriction_sql
       
   163             self.dbhelper.fti_need_distinct_query = self.indexer.need_distinct
   159         # sql queries cache
   164         # sql queries cache
   160         self._cache = Cache(repo.config['rql-cache-size'])
   165         self._cache = Cache(repo.config['rql-cache-size'])
   161         self._temp_table_data = {}
   166         self._temp_table_data = {}
   162         self._eid_creation_lock = Lock()
   167         self._eid_creation_lock = Lock()
   163         # XXX no_sqlite_wrap trick since we've a sqlite locking pb when
   168         # XXX no_sqlite_wrap trick since we've a sqlite locking pb when
   199 
   204 
   200     def init_creating(self):
   205     def init_creating(self):
   201         pool = self.repo._get_pool()
   206         pool = self.repo._get_pool()
   202         pool.pool_set()
   207         pool.pool_set()
   203         # check full text index availibility
   208         # check full text index availibility
   204         if not self.indexer.has_fti_table(pool['system']):
   209         if self.do_fti:
   205             self.error('no text index table')
   210             if not self.indexer.has_fti_table(pool['system']):
   206             self.indexer = None
   211                 self.critical('no text index table')
       
   212                 self.do_fti = False
   207         pool.pool_reset()
   213         pool.pool_reset()
   208         self.repo._free_pool(pool)
   214         self.repo._free_pool(pool)
   209 
   215 
   210     def backup(self, backupfile):
   216     def backup(self, backupfile):
   211         """method called to create a backup of the source's data"""
   217         """method called to create a backup of the source's data"""
   253             self._rql_sqlgen.schema = schema
   259             self._rql_sqlgen.schema = schema
   254         except AttributeError:
   260         except AttributeError:
   255             pass # __init__
   261             pass # __init__
   256         for authentifier in self.authentifiers:
   262         for authentifier in self.authentifiers:
   257             authentifier.set_schema(self.schema)
   263             authentifier.set_schema(self.schema)
       
   264         clear_cache(self, 'need_fti_indexation')
   258 
   265 
   259     def support_entity(self, etype, write=False):
   266     def support_entity(self, etype, write=False):
   260         """return true if the given entity's type is handled by this adapter
   267         """return true if the given entity's type is handled by this adapter
   261         if write is true, return true only if it's a RW support
   268         if write is true, return true only if it's a RW support
   262         """
   269         """
   522                 cursor = self.doexec(session, sql)
   529                 cursor = self.doexec(session, sql)
   523             return cursor.fetchone()[0]
   530             return cursor.fetchone()[0]
   524         finally:
   531         finally:
   525             self._eid_creation_lock.release()
   532             self._eid_creation_lock.release()
   526 
   533 
   527     def add_info(self, session, entity, source, extid=None):
   534     def add_info(self, session, entity, source, extid=None, complete=True):
   528         """add type and source info for an eid into the system table"""
   535         """add type and source info for an eid into the system table"""
   529         # begin by inserting eid/type/source/extid into the entities table
   536         # begin by inserting eid/type/source/extid into the entities table
   530         if extid is not None:
   537         if extid is not None:
   531             assert isinstance(extid, str)
   538             assert isinstance(extid, str)
   532             extid = b64encode(extid)
   539             extid = b64encode(extid)
   533         attrs = {'type': entity.__regid__, 'eid': entity.eid, 'extid': extid,
   540         attrs = {'type': entity.__regid__, 'eid': entity.eid, 'extid': extid,
   534                  'source': source.uri, 'mtime': datetime.now()}
   541                  'source': source.uri, 'mtime': datetime.now()}
   535         session.system_sql(self.sqlgen.insert('entities', attrs), attrs)
   542         session.system_sql(self.sqlgen.insert('entities', attrs), attrs)
       
   543         # now we can update the full text index
       
   544         if self.do_fti and self.need_fti_indexation(entity.__regid__):
       
   545             if complete:
       
   546                 entity.complete(entity.e_schema.indexable_attributes())
       
   547             FTIndexEntityOp(session, entity=entity)
       
   548 
       
   549     def update_info(self, session, entity, need_fti_update):
       
   550         if self.do_fti and need_fti_update:
       
   551             # reindex the entity only if this query is updating at least
       
   552             # one indexable attribute
       
   553             FTIndexEntityOp(session, entity=entity)
       
   554         # update entities.mtime
       
   555         attrs = {'eid': entity.eid, 'mtime': datetime.now()}
       
   556         session.system_sql(self.sqlgen.update('entities', attrs, ['eid']), attrs)
   536 
   557 
   537     def delete_info(self, session, eid, etype, uri, extid):
   558     def delete_info(self, session, eid, etype, uri, extid):
   538         """delete system information on deletion of an entity by transfering
   559         """delete system information on deletion of an entity by transfering
   539         record from the entities table to the deleted_entities table
   560         record from the entities table to the deleted_entities table
   540         """
   561         """
   545             extid = b64encode(extid)
   566             extid = b64encode(extid)
   546         attrs = {'type': etype, 'eid': eid, 'extid': extid,
   567         attrs = {'type': etype, 'eid': eid, 'extid': extid,
   547                  'source': uri, 'dtime': datetime.now()}
   568                  'source': uri, 'dtime': datetime.now()}
   548         session.system_sql(self.sqlgen.insert('deleted_entities', attrs), attrs)
   569         session.system_sql(self.sqlgen.insert('deleted_entities', attrs), attrs)
   549 
   570 
   550     def fti_unindex_entity(self, session, eid):
       
   551         """remove text content for entity with the given eid from the full text
       
   552         index
       
   553         """
       
   554         try:
       
   555             self.indexer.cursor_unindex_object(eid, session.pool['system'])
       
   556         except Exception: # let KeyboardInterrupt / SystemExit propagate
       
   557             if self.indexer is not None:
       
   558                 self.exception('error while unindexing %s', eid)
       
   559 
       
   560     def fti_index_entity(self, session, entity):
       
   561         """add text content of a created/modified entity to the full text index
       
   562         """
       
   563         self.debug('reindexing %r', entity.eid)
       
   564         try:
       
   565             self.indexer.cursor_reindex_object(entity.eid, entity,
       
   566                                                session.pool['system'])
       
   567         except Exception: # let KeyboardInterrupt / SystemExit propagate
       
   568             if self.indexer is not None:
       
   569                 self.exception('error while reindexing %s', entity)
       
   570         # update entities.mtime
       
   571         attrs = {'eid': entity.eid, 'mtime': datetime.now()}
       
   572         session.system_sql(self.sqlgen.update('entities', attrs, ['eid']), attrs)
       
   573 
       
   574     def modified_entities(self, session, etypes, mtime):
   571     def modified_entities(self, session, etypes, mtime):
   575         """return a 2-uple:
   572         """return a 2-uple:
   576         * list of (etype, eid) of entities of the given types which have been
   573         * list of (etype, eid) of entities of the given types which have been
   577           modified since the given timestamp (actually entities whose full text
   574           modified since the given timestamp (actually entities whose full text
   578           index content has changed)
   575           index content has changed)
   584         modentities = cursor.fetchall()
   581         modentities = cursor.fetchall()
   585         delsql = _modified_sql('deleted_entities', etypes)
   582         delsql = _modified_sql('deleted_entities', etypes)
   586         cursor = session.system_sql(delsql, {'time': mtime})
   583         cursor = session.system_sql(delsql, {'time': mtime})
   587         delentities = cursor.fetchall()
   584         delentities = cursor.fetchall()
   588         return modentities, delentities
   585         return modentities, delentities
       
   586 
       
   587     # full text index handling #################################################
       
   588 
       
   589     @cached
       
   590     def need_fti_indexation(self, etype):
       
   591         eschema = self.schema.eschema(etype)
       
   592         if any(eschema.indexable_attributes()):
       
   593             return True
       
   594         if any(eschema.fulltext_containers()):
       
   595             return True
       
   596         return False
       
   597 
       
   598     def index_entity(self, session, entity):
       
   599         FTIndexEntityOp(session, entity=entity)
       
   600 
       
   601     def fti_unindex_entity(self, session, eid):
       
   602         """remove text content for entity with the given eid from the full text
       
   603         index
       
   604         """
       
   605         try:
       
   606             self.indexer.cursor_unindex_object(eid, session.pool['system'])
       
   607         except Exception: # let KeyboardInterrupt / SystemExit propagate
       
   608             self.exception('error while unindexing %s', eid)
       
   609 
       
   610     def fti_index_entity(self, session, entity):
       
   611         """add text content of a created/modified entity to the full text index
       
   612         """
       
   613         self.debug('reindexing %r', entity.eid)
       
   614         try:
       
   615             # use cursor_index_object, not cursor_reindex_object since
       
   616             # unindexing done in the FTIndexEntityOp
       
   617             self.indexer.cursor_index_object(entity.eid, entity,
       
   618                                              session.pool['system'])
       
   619         except Exception: # let KeyboardInterrupt / SystemExit propagate
       
   620             self.exception('error while reindexing %s', entity)
       
   621 
       
   622 
       
   623 class FTIndexEntityOp(hook.LateOperation):
       
   624     """operation to delay entity full text indexation to commit
       
   625 
       
   626     since fti indexing may trigger discovery of other entities, it should be
       
   627     triggered on precommit, not commit, and this should be done after other
       
   628     precommit operation which may add relations to the entity
       
   629     """
       
   630 
       
   631     def precommit_event(self):
       
   632         session = self.session
       
   633         entity = self.entity
       
   634         if entity.eid in session.transaction_data.get('pendingeids', ()):
       
   635             return # entity added and deleted in the same transaction
       
   636         alreadydone = session.transaction_data.setdefault('indexedeids', set())
       
   637         if entity.eid in alreadydone:
       
   638             self.debug('skipping reindexation of %s, already done', entity.eid)
       
   639             return
       
   640         alreadydone.add(entity.eid)
       
   641         source = session.repo.system_source
       
   642         for container in entity.fti_containers():
       
   643             source.fti_unindex_entity(session, container.eid)
       
   644             source.fti_index_entity(session, container)
       
   645 
       
   646     def commit_event(self):
       
   647         pass
   589 
   648 
   590 
   649 
   591 def sql_schema(driver):
   650 def sql_schema(driver):
   592     helper = get_adv_func_helper(driver)
   651     helper = get_adv_func_helper(driver)
   593     tstamp_col_type = helper.TYPE_MAPPING['Datetime']
   652     tstamp_col_type = helper.TYPE_MAPPING['Datetime']