[fti] refactor and fix full text indexation handling stable
authorSylvain Thénault <sylvain.thenault@logilab.fr>
Thu, 04 Mar 2010 18:02:33 +0100
branchstable
changeset 4806 4f12f59b1a13
parent 4805 2d0aa2b7da02
child 4807 5642bfa43236
[fti] refactor and fix full text indexation handling * moved logic from repository to the native source * avoid creating a FTIndexOp when no full text indexation is needed * entities with fulltext_container set are not indexed even when not related to a container * avoid duplicated unindexation
entity.py
hooks/metadata.py
hooks/syncschema.py
server/checkintegrity.py
server/repository.py
server/sources/native.py
server/test/unittest_repository.py
--- a/entity.py	Thu Mar 04 17:58:31 2010 +0100
+++ b/entity.py	Thu Mar 04 18:02:33 2010 +0100
@@ -876,7 +876,6 @@
         _done.add(self.eid)
         containers = tuple(self.e_schema.fulltext_containers())
         if containers:
-            yielded = False
             for rschema, target in containers:
                 if target == 'object':
                     targets = getattr(self, rschema.type)
@@ -888,8 +887,6 @@
                     for container in entity.fti_containers(_done):
                         yield container
                         yielded = True
-            if not yielded:
-                yield self
         else:
             yield self
 
@@ -919,7 +916,6 @@
                 continue
             if value:
                 words += tokenize(value)
-
         for rschema, role in self.e_schema.fulltext_relations():
             if role == 'subject':
                 for entity in getattr(self, rschema.type):
--- a/hooks/metadata.py	Thu Mar 04 17:58:31 2010 +0100
+++ b/hooks/metadata.py	Thu Mar 04 18:02:33 2010 +0100
@@ -12,7 +12,6 @@
 
 from cubicweb.selectors import implements
 from cubicweb.server import hook
-from cubicweb.server.repository import FTIndexEntityOp
 
 
 def eschema_type_eid(session, etype):
@@ -150,7 +149,8 @@
 
 
 class UpdateFTIHook(MetaDataHook):
-    """sync fulltext index when relevant relation is added / removed
+    """sync fulltext index text index container when a relation with
+    fulltext_container set is added / removed
     """
     __regid__ = 'updateftirel'
     events = ('after_add_relation', 'after_delete_relation')
@@ -158,15 +158,19 @@
     def __call__(self):
         rtype = self.rtype
         session = self._cw
+        ftcontainer = session.vreg.schema.rschema(rtype).fulltext_container
         if self.event == 'after_add_relation':
-            # Reindexing the contained entity is enough since it will implicitly
-            # reindex the container entity.
-            ftcontainer = session.vreg.schema.rschema(rtype).fulltext_container
             if ftcontainer == 'subject':
-                FTIndexEntityOp(session, entity=session.entity_from_eid(self.eidto))
+                session.repo.system_source.index_entity(
+                    session, session.entity_from_eid(self.eidfrom))
             elif ftcontainer == 'object':
-                FTIndexEntityOp(session, entity=session.entity_from_eid(self.eidfrom))
-        elif session.repo.schema.rschema(rtype).fulltext_container:
-            FTIndexEntityOp(session, entity=session.entity_from_eid(self.eidto))
-            FTIndexEntityOp(session, entity=session.entity_from_eid(self.eidfrom))
+                session.repo.system_source.index_entity(
+                    session, session.entity_from_eid(self.eidto))
+        # after delete relation
+        elif ftcontainer == 'subject':
+            session.repo.system_source.index_entity(
+                session, entity=session.entity_from_eid(self.eidfrom))
+        elif ftcontainer == 'object':
+            session.repo.system_source.index_entity(
+                session, entity=session.entity_from_eid(self.eidto))
 
--- a/hooks/syncschema.py	Thu Mar 04 17:58:31 2010 +0100
+++ b/hooks/syncschema.py	Thu Mar 04 18:02:33 2010 +0100
@@ -1152,7 +1152,8 @@
                     source.fti_unindex_entity(session, entity.eid)
                     for container in entity.fti_containers():
                         if still_fti or container is not entity:
-                            session.repo.index_entity(session, container)
+                            source.fti_unindex_entity(session, entity.eid)
+                            source.fti_index_entity(session, container)
                 except Exception:
                     self.critical('Error while updating Full Text Index for'
                                   ' entity %s', entity.eid, exc_info=True)
--- a/server/checkintegrity.py	Thu Mar 04 17:58:31 2010 +0100
+++ b/server/checkintegrity.py	Thu Mar 04 18:02:33 2010 +0100
@@ -80,7 +80,7 @@
         cursor.execute(indexer.sql_init_fti())
     repo.config.disabled_hooks_categories.add('metadata')
     repo.config.disabled_hooks_categories.add('integrity')
-    repo.do_fti = True  # ensure full-text indexation is activated
+    repo.system_source.do_fti = True  # ensure full-text indexation is activated
     etypes = set()
     for eschema in schema.entities():
         if eschema.final:
--- a/server/repository.py	Thu Mar 04 17:58:31 2010 +0100
+++ b/server/repository.py	Thu Mar 04 18:02:33 2010 +0100
@@ -71,27 +71,6 @@
             pass
 
 
-class FTIndexEntityOp(hook.LateOperation):
-    """operation to delay entity full text indexation to commit
-
-    since fti indexing may trigger discovery of other entities, it should be
-    triggered on precommit, not commit, and this should be done after other
-    precommit operation which may add relations to the entity
-    """
-
-    def precommit_event(self):
-        session = self.session
-        entity = self.entity
-        if entity.eid in session.transaction_data.get('pendingeids', ()):
-            return # entity added and deleted in the same transaction
-        session.repo.system_source.fti_unindex_entity(session, entity.eid)
-        for container in entity.fti_containers():
-            session.repo.index_entity(session, container)
-
-    def commit_event(self):
-        pass
-
-
 def del_existing_rel_if_needed(session, eidfrom, rtype, eidto):
     """delete existing relation when adding a new one if card is 1 or ?
 
@@ -133,6 +112,7 @@
         if rset:
             safe_delete_relation(session, rschema, *rset[0])
 
+
 def safe_delete_relation(session, rschema, subject, object):
     if not rschema.has_perm(session, 'delete', fromeid=subject, toeid=object):
         raise Unauthorized()
@@ -164,8 +144,6 @@
         self.vreg.schema = self.schema # until actual schema is loaded...
         # querier helper, need to be created after sources initialization
         self.querier = querier.QuerierHelper(self, self.schema)
-        # should we reindex in changes?
-        self.do_fti = not config['delay-full-text-indexation']
         # sources
         self.sources = []
         self.sources_by_uri = {}
@@ -777,7 +755,6 @@
     # data sources handling ###################################################
     # * correspondance between eid and (type, source)
     # * correspondance between eid and local id (i.e. specific to a given source)
-    # * searchable text indexes
 
     def type_and_source_from_eid(self, eid, session=None):
         """return a tuple (type, source, extid) for the entity with id <eid>"""
@@ -904,14 +881,9 @@
         and index the entity with the full text index
         """
         # begin by inserting eid/type/source/extid into the entities table
-        self.system_source.add_info(session, entity, source, extid)
-        if complete:
-            entity.complete(entity.e_schema.indexable_attributes())
         new = session.transaction_data.setdefault('neweids', set())
         new.add(entity.eid)
-        # now we can update the full text index
-        if self.do_fti:
-            FTIndexEntityOp(session, entity=entity)
+        self.system_source.add_info(session, entity, source, extid, complete)
         CleanupEidTypeCacheOp(session)
 
     def delete_info(self, session, eid):
@@ -961,15 +933,6 @@
             # he can delete all its relations without security checking
             session.unsafe_execute(rql, {'x': eid}, 'x', build_descr=False)
 
-    def index_entity(self, session, entity):
-        """full text index a modified entity"""
-        alreadydone = session.transaction_data.setdefault('indexedeids', set())
-        if entity.eid in alreadydone:
-            self.debug('skipping reindexation of %s, already done', entity.eid)
-            return
-        alreadydone.add(entity.eid)
-        self.system_source.fti_index_entity(session, entity)
-
     def locate_relation_source(self, session, subject, rtype, object):
         subjsource = self.source_from_eid(subject, session)
         objsource = self.source_from_eid(object, session)
@@ -1105,14 +1068,10 @@
             if not only_inline_rels:
                 self.hm.call_hooks('before_update_entity', session, entity=entity)
         source.update_entity(session, entity)
-        if not only_inline_rels:
-            if need_fti_update and self.do_fti:
-                # reindex the entity only if this query is updating at least
-                # one indexable attribute
-                FTIndexEntityOp(session, entity=entity)
-            if source.should_call_hooks:
+        self.system_source.update_info(session, entity, need_fti_update)
+        if source.should_call_hooks:
+            if not only_inline_rels:
                 self.hm.call_hooks('after_update_entity', session, entity=entity)
-        if source.should_call_hooks:
             for attr, value, prevvalue in relations:
                 # if the relation is already cached, update existant cache
                 relcache = entity.relation_cached(attr, 'subject')
--- a/server/sources/native.py	Thu Mar 04 17:58:31 2010 +0100
+++ b/server/sources/native.py	Thu Mar 04 18:02:33 2010 +0100
@@ -17,7 +17,9 @@
 from datetime import datetime
 from base64 import b64decode, b64encode
 
+from logilab.common.compat import any
 from logilab.common.cache import Cache
+from logilab.common.decorators import cached, clear_cache
 from logilab.common.configuration import Method
 from logilab.common.adbh import get_adv_func_helper
 from logilab.common.shellutils import getlogin
@@ -26,6 +28,7 @@
 
 from cubicweb import UnknownEid, AuthenticationError, Binary, server
 from cubicweb.cwconfig import CubicWebNoAppConfiguration
+from cubicweb.server import hook
 from cubicweb.server.utils import crypt_password
 from cubicweb.server.sqlutils import SQL_PREFIX, SQLAdapterMixIn
 from cubicweb.server.rqlannotation import set_qdata
@@ -150,12 +153,14 @@
         self._rql_sqlgen = self.sqlgen_class(appschema, self.dbhelper,
                                              self.encoding, ATTR_MAP.copy())
         # full text index helper
-        self.indexer = get_indexer(self.dbdriver, self.encoding)
-        # advanced functionality helper
-        self.dbhelper.fti_uid_attr = self.indexer.uid_attr
-        self.dbhelper.fti_table = self.indexer.table
-        self.dbhelper.fti_restriction_sql = self.indexer.restriction_sql
-        self.dbhelper.fti_need_distinct_query = self.indexer.need_distinct
+        self.do_fti = not repo.config['delay-full-text-indexation']
+        if self.do_fti:
+            self.indexer = get_indexer(self.dbdriver, self.encoding)
+            # XXX should go away with logilab.db
+            self.dbhelper.fti_uid_attr = self.indexer.uid_attr
+            self.dbhelper.fti_table = self.indexer.table
+            self.dbhelper.fti_restriction_sql = self.indexer.restriction_sql
+            self.dbhelper.fti_need_distinct_query = self.indexer.need_distinct
         # sql queries cache
         self._cache = Cache(repo.config['rql-cache-size'])
         self._temp_table_data = {}
@@ -201,9 +206,10 @@
         pool = self.repo._get_pool()
         pool.pool_set()
         # check full text index availibility
-        if not self.indexer.has_fti_table(pool['system']):
-            self.error('no text index table')
-            self.indexer = None
+        if self.do_fti:
+            if not self.indexer.has_fti_table(pool['system']):
+                self.critical('no text index table')
+                self.do_fti = False
         pool.pool_reset()
         self.repo._free_pool(pool)
 
@@ -255,6 +261,7 @@
             pass # __init__
         for authentifier in self.authentifiers:
             authentifier.set_schema(self.schema)
+        clear_cache(self, 'need_fti_indexation')
 
     def support_entity(self, etype, write=False):
         """return true if the given entity's type is handled by this adapter
@@ -524,7 +531,7 @@
         finally:
             self._eid_creation_lock.release()
 
-    def add_info(self, session, entity, source, extid=None):
+    def add_info(self, session, entity, source, extid=None, complete=True):
         """add type and source info for an eid into the system table"""
         # begin by inserting eid/type/source/extid into the entities table
         if extid is not None:
@@ -533,6 +540,20 @@
         attrs = {'type': entity.__regid__, 'eid': entity.eid, 'extid': extid,
                  'source': source.uri, 'mtime': datetime.now()}
         session.system_sql(self.sqlgen.insert('entities', attrs), attrs)
+        # now we can update the full text index
+        if self.do_fti and self.need_fti_indexation(entity.__regid__):
+            if complete:
+                entity.complete(entity.e_schema.indexable_attributes())
+            FTIndexEntityOp(session, entity=entity)
+
+    def update_info(self, session, entity, need_fti_update):
+        if self.do_fti and need_fti_update:
+            # reindex the entity only if this query is updating at least
+            # one indexable attribute
+            FTIndexEntityOp(session, entity=entity)
+        # update entities.mtime
+        attrs = {'eid': entity.eid, 'mtime': datetime.now()}
+        session.system_sql(self.sqlgen.update('entities', attrs, ['eid']), attrs)
 
     def delete_info(self, session, eid, etype, uri, extid):
         """delete system information on deletion of an entity by transfering
@@ -547,30 +568,6 @@
                  'source': uri, 'dtime': datetime.now()}
         session.system_sql(self.sqlgen.insert('deleted_entities', attrs), attrs)
 
-    def fti_unindex_entity(self, session, eid):
-        """remove text content for entity with the given eid from the full text
-        index
-        """
-        try:
-            self.indexer.cursor_unindex_object(eid, session.pool['system'])
-        except Exception: # let KeyboardInterrupt / SystemExit propagate
-            if self.indexer is not None:
-                self.exception('error while unindexing %s', eid)
-
-    def fti_index_entity(self, session, entity):
-        """add text content of a created/modified entity to the full text index
-        """
-        self.debug('reindexing %r', entity.eid)
-        try:
-            self.indexer.cursor_reindex_object(entity.eid, entity,
-                                               session.pool['system'])
-        except Exception: # let KeyboardInterrupt / SystemExit propagate
-            if self.indexer is not None:
-                self.exception('error while reindexing %s', entity)
-        # update entities.mtime
-        attrs = {'eid': entity.eid, 'mtime': datetime.now()}
-        session.system_sql(self.sqlgen.update('entities', attrs, ['eid']), attrs)
-
     def modified_entities(self, session, etypes, mtime):
         """return a 2-uple:
         * list of (etype, eid) of entities of the given types which have been
@@ -587,6 +584,68 @@
         delentities = cursor.fetchall()
         return modentities, delentities
 
+    # full text index handling #################################################
+
+    @cached
+    def need_fti_indexation(self, etype):
+        eschema = self.schema.eschema(etype)
+        if any(eschema.indexable_attributes()):
+            return True
+        if any(eschema.fulltext_containers()):
+            return True
+        return False
+
+    def index_entity(self, session, entity):
+        FTIndexEntityOp(session, entity=entity)
+
+    def fti_unindex_entity(self, session, eid):
+        """remove text content for entity with the given eid from the full text
+        index
+        """
+        try:
+            self.indexer.cursor_unindex_object(eid, session.pool['system'])
+        except Exception: # let KeyboardInterrupt / SystemExit propagate
+            self.exception('error while unindexing %s', eid)
+
+    def fti_index_entity(self, session, entity):
+        """add text content of a created/modified entity to the full text index
+        """
+        self.debug('reindexing %r', entity.eid)
+        try:
+            # use cursor_index_object, not cursor_reindex_object since
+            # unindexing done in the FTIndexEntityOp
+            self.indexer.cursor_index_object(entity.eid, entity,
+                                             session.pool['system'])
+        except Exception: # let KeyboardInterrupt / SystemExit propagate
+            self.exception('error while reindexing %s', entity)
+
+
+class FTIndexEntityOp(hook.LateOperation):
+    """operation to delay entity full text indexation to commit
+
+    since fti indexing may trigger discovery of other entities, it should be
+    triggered on precommit, not commit, and this should be done after other
+    precommit operation which may add relations to the entity
+    """
+
+    def precommit_event(self):
+        session = self.session
+        entity = self.entity
+        if entity.eid in session.transaction_data.get('pendingeids', ()):
+            return # entity added and deleted in the same transaction
+        alreadydone = session.transaction_data.setdefault('indexedeids', set())
+        if entity.eid in alreadydone:
+            self.debug('skipping reindexation of %s, already done', entity.eid)
+            return
+        alreadydone.add(entity.eid)
+        source = session.repo.system_source
+        for container in entity.fti_containers():
+            source.fti_unindex_entity(session, container.eid)
+            source.fti_index_entity(session, container)
+
+    def commit_event(self):
+        pass
+
 
 def sql_schema(driver):
     helper = get_adv_func_helper(driver)
--- a/server/test/unittest_repository.py	Thu Mar 04 17:58:31 2010 +0100
+++ b/server/test/unittest_repository.py	Thu Mar 04 18:02:33 2010 +0100
@@ -26,7 +26,7 @@
 from cubicweb.devtools.repotest import tuplify
 from cubicweb.server import repository, hook
 from cubicweb.server.sqlutils import SQL_PREFIX
-
+from cubicweb.server.sources import native
 
 # start name server anyway, process will fail if already running
 os.system('pyro-ns >/dev/null 2>/dev/null &')
@@ -424,25 +424,39 @@
         self.assertEquals(modified, [])
         self.assertEquals(deleted, [('Personne', eidp)])
 
-    def test_composite_entity(self):
+    def test_fulltext_container_entity(self):
         assert self.schema.rschema('use_email').fulltext_container == 'subject'
-        eid = self.request().create_entity('EmailAddress', address=u'toto@logilab.fr').eid
+        req = self.request()
+        toto = req.create_entity('EmailAddress', address=u'toto@logilab.fr')
         self.commit()
-        rset = self.execute('Any X WHERE X has_text %(t)s', {'t': 'toto'})
-        self.assertEquals(rset.rows, [[eid]])
-        self.execute('SET X use_email Y WHERE X login "admin", Y eid %(y)s', {'y': eid})
+        rset = req.execute('Any X WHERE X has_text %(t)s', {'t': 'toto'})
+        self.assertEquals(rset.rows, [])
+        req.user.set_relations(use_email=toto)
+        self.commit()
+        rset = req.execute('Any X WHERE X has_text %(t)s', {'t': 'toto'})
+        self.assertEquals(rset.rows, [[req.user.eid]])
+        req.execute('DELETE X use_email Y WHERE X login "admin", Y eid %(y)s',
+                    {'y': toto.eid})
         self.commit()
-        rset = self.execute('Any X WHERE X has_text %(t)s', {'t': 'toto'})
-        self.assertEquals(rset.rows, [[self.session.user.eid]])
-        self.execute('DELETE X use_email Y WHERE X login "admin", Y eid %(y)s', {'y': eid})
-        self.commit()
-        rset = self.execute('Any X WHERE X has_text %(t)s', {'t': 'toto'})
+        rset = req.execute('Any X WHERE X has_text %(t)s', {'t': 'toto'})
         self.assertEquals(rset.rows, [])
-        eid = self.request().create_entity('EmailAddress', address=u'tutu@logilab.fr').eid
-        self.execute('SET X use_email Y WHERE X login "admin", Y eid %(y)s', {'y': eid})
+        tutu = req.create_entity('EmailAddress', address=u'tutu@logilab.fr')
+        req.user.set_relations(use_email=tutu)
+        self.commit()
+        rset = req.execute('Any X WHERE X has_text %(t)s', {'t': 'tutu'})
+        self.assertEquals(rset.rows, [[req.user.eid]])
+        tutu.set_attributes(address=u'hip@logilab.fr')
         self.commit()
-        rset = self.execute('Any X WHERE X has_text %(t)s', {'t': 'tutu'})
-        self.assertEquals(rset.rows, [[self.session.user.eid]])
+        rset = req.execute('Any X WHERE X has_text %(t)s', {'t': 'tutu'})
+        self.assertEquals(rset.rows, [])
+        rset = req.execute('Any X WHERE X has_text %(t)s', {'t': 'hip'})
+        self.assertEquals(rset.rows, [[req.user.eid]])
+
+    def test_no_uncessary_ftiindex_op(self):
+        req = self.request()
+        req.create_entity('Workflow', name=u'dummy workflow', description=u'huuuuu')
+        self.failIf(any(x for x in self.session.pending_operations
+                        if isinstance(x, native.FTIndexEntityOp)))
 
 
 class DBInitTC(CubicWebTC):