server/sources/native.py
brancholdstable
changeset 4985 02b52bf9f5f8
parent 4900 29626bb6071b
child 4902 4e67a538e476
--- a/server/sources/native.py	Fri Feb 12 15:18:00 2010 +0100
+++ b/server/sources/native.py	Wed Mar 24 10:23:31 2010 +0100
@@ -17,7 +17,9 @@
 from datetime import datetime
 from base64 import b64decode, b64encode
 
+from logilab.common.compat import any
 from logilab.common.cache import Cache
+from logilab.common.decorators import cached, clear_cache
 from logilab.common.configuration import Method
 from logilab.common.adbh import get_adv_func_helper
 from logilab.common.shellutils import getlogin
@@ -26,6 +28,7 @@
 
 from cubicweb import UnknownEid, AuthenticationError, Binary, server
 from cubicweb.cwconfig import CubicWebNoAppConfiguration
+from cubicweb.server import hook
 from cubicweb.server.utils import crypt_password
 from cubicweb.server.sqlutils import SQL_PREFIX, SQLAdapterMixIn
 from cubicweb.server.rqlannotation import set_qdata
@@ -96,11 +99,6 @@
     """adapter for source using the native cubicweb schema (see below)
     """
     sqlgen_class = SQLGenerator
-
-    passwd_rql = "Any P WHERE X is CWUser, X login %(login)s, X upassword P"
-    auth_rql = "Any X WHERE X is CWUser, X login %(login)s, X upassword %(pwd)s"
-    _sols = ({'X': 'CWUser', 'P': 'Password'},)
-
     options = (
         ('db-driver',
          {'type' : 'string',
@@ -148,18 +146,23 @@
 
     def __init__(self, repo, appschema, source_config, *args, **kwargs):
         SQLAdapterMixIn.__init__(self, source_config)
+        self.authentifiers = [LoginPasswordAuthentifier(self)]
         AbstractSource.__init__(self, repo, appschema, source_config,
                                 *args, **kwargs)
+        # full text index helper
+        self.do_fti = not repo.config['delay-full-text-indexation']
+        if self.do_fti:
+            self.indexer = get_indexer(self.dbdriver, self.encoding)
+            # XXX should go away with logilab.db
+            self.dbhelper.fti_uid_attr = self.indexer.uid_attr
+            self.dbhelper.fti_table = self.indexer.table
+            self.dbhelper.fti_restriction_sql = self.indexer.restriction_sql
+            self.dbhelper.fti_need_distinct_query = self.indexer.need_distinct
+        else:
+            self.dbhelper.fti_need_distinct_query = False
         # sql generator
         self._rql_sqlgen = self.sqlgen_class(appschema, self.dbhelper,
                                              self.encoding, ATTR_MAP.copy())
-        # full text index helper
-        self.indexer = get_indexer(self.dbdriver, self.encoding)
-        # advanced functionality helper
-        self.dbhelper.fti_uid_attr = self.indexer.uid_attr
-        self.dbhelper.fti_table = self.indexer.table
-        self.dbhelper.fti_restriction_sql = self.indexer.restriction_sql
-        self.dbhelper.fti_need_distinct_query = self.indexer.need_distinct
         # sql queries cache
         self._cache = Cache(repo.config['rql-cache-size'])
         self._temp_table_data = {}
@@ -182,6 +185,11 @@
         #      consuming, find another way
         return SQLAdapterMixIn.get_connection(self)
 
+    def add_authentifier(self, authentifier):  # register an extra authentifier
+        self.authentifiers.append(authentifier)  # tried after existing ones
+        authentifier.source = self
+        authentifier.set_schema(self.schema)  # hand it the current schema
+
     def reset_caches(self):
         """method called during test to reset potential source caches"""
         self._cache = Cache(self.repo.config['rql-cache-size'])
@@ -200,17 +208,19 @@
         pool = self.repo._get_pool()
         pool.pool_set()
         # check full text index availibility
-        if not self.indexer.has_fti_table(pool['system']):
-            self.error('no text index table')
-            self.indexer = None
+        if self.do_fti:
+            if not self.indexer.has_fti_table(pool['system']):
+                if not self.repo.config.creating:
+                    self.critical('no text index table')
+                self.do_fti = False
         pool.pool_reset()
         self.repo._free_pool(pool)
 
-    def backup(self, backupfile):
+    def backup(self, backupfile, confirm):
         """method called to create a backup of the source's data"""
         self.close_pool_connections()
         try:
-            self.backup_to_file(backupfile)
+            self.backup_to_file(backupfile, confirm)
         finally:
             self.open_pool_connections()
 
@@ -230,12 +240,15 @@
     def map_attribute(self, etype, attr, cb):
         self._rql_sqlgen.attr_map['%s.%s' % (etype, attr)] = cb
 
+    def unmap_attribute(self, etype, attr):  # undo map_attribute(); no error if unmapped
+        self._rql_sqlgen.attr_map.pop('%s.%s' % (etype, attr), None)
+
     # ISource interface #######################################################
 
-    def compile_rql(self, rql):
+    def compile_rql(self, rql, sols):
         rqlst = self.repo.vreg.rqlhelper.parse(rql)
         rqlst.restricted_vars = ()
-        rqlst.children[0].solutions = self._sols
+        rqlst.children[0].solutions = sols
         self.repo.querier.sqlgen_annotate(rqlst)
         set_qdata(self.schema.rschema, rqlst, ())
         return rqlst
@@ -249,10 +262,9 @@
             self._rql_sqlgen.schema = schema
         except AttributeError:
             pass # __init__
-        if 'CWUser' in schema: # probably an empty schema if not true...
-            # rql syntax trees used to authenticate users
-            self._passwd_rqlst = self.compile_rql(self.passwd_rql)
-            self._auth_rqlst = self.compile_rql(self.auth_rql)
+        for authentifier in self.authentifiers:
+            authentifier.set_schema(self.schema)
+        clear_cache(self, 'need_fti_indexation')
 
     def support_entity(self, etype, write=False):
         """return true if the given entity's type is handled by this adapter
@@ -273,30 +285,16 @@
     def may_cross_relation(self, rtype):
         return True
 
-    def authenticate(self, session, login, password):
-        """return CWUser eid for the given login/password if this account is
-        defined in this source, else raise `AuthenticationError`
-
-        two queries are needed since passwords are stored crypted, so we have
-        to fetch the salt first
+    def authenticate(self, session, login, **kwargs):
+        """return CWUser eid from the first authentifier (tried in order) which
+        accepts the given login and kwargs, else raise `AuthenticationError`
         """
-        args = {'login': login, 'pwd' : password}
-        if password is not None:
-            rset = self.syntax_tree_search(session, self._passwd_rqlst, args)
+        for authentifier in self.authentifiers:
             try:
-                pwd = rset[0][0]
-            except IndexError:
-                raise AuthenticationError('bad login')
-            # passwords are stored using the Bytes type, so we get a StringIO
-            if pwd is not None:
-                args['pwd'] = Binary(crypt_password(password, pwd.getvalue()[:2]))
-        # get eid from login and (crypted) password
-        # XXX why not simply compare password?
-        rset = self.syntax_tree_search(session, self._auth_rqlst, args)
-        try:
-            return rset[0][0]
-        except IndexError:
-            raise AuthenticationError('bad password')
+                return authentifier.authenticate(session, login, **kwargs)
+            except AuthenticationError:
+                continue  # rejected here, give the next authentifier a chance
+        raise AuthenticationError()  # no authentifier accepted the login
 
     def syntax_tree_search(self, session, union, args=None, cachekey=None,
                            varmap=None):
@@ -390,7 +388,8 @@
     def update_entity(self, session, entity):
         """replace an entity in the source"""
         attrs = self.preprocess_entity(entity)
-        sql = self.sqlgen.update(SQL_PREFIX + str(entity.e_schema), attrs, [SQL_PREFIX + 'eid'])
+        sql = self.sqlgen.update(SQL_PREFIX + str(entity.e_schema), attrs,
+                                 [SQL_PREFIX + 'eid'])
         self.doexec(session, sql, attrs)
 
     def delete_entity(self, session, etype, eid):
@@ -399,10 +398,16 @@
         sql = self.sqlgen.delete(SQL_PREFIX + etype, attrs)
         self.doexec(session, sql, attrs)
 
-    def add_relation(self, session, subject, rtype, object):
+    def add_relation(self, session, subject, rtype, object, inlined=False):
         """add a relation to the source"""
-        attrs = {'eid_from': subject, 'eid_to': object}
-        sql = self.sqlgen.insert('%s_relation' % rtype, attrs)
+        if inlined is False:
+            attrs = {'eid_from': subject, 'eid_to': object}
+            sql = self.sqlgen.insert('%s_relation' % rtype, attrs)
+        else: # used by data import
+            etype = session.describe(subject)[0]
+            attrs = {SQL_PREFIX + 'eid': subject, SQL_PREFIX + rtype: object}
+            sql = self.sqlgen.update(SQL_PREFIX + etype, attrs,
+                                     [SQL_PREFIX + 'eid'])
         self.doexec(session, sql, attrs)
 
     def delete_relation(self, session, subject, rtype, object):
@@ -433,12 +438,16 @@
             # str(query) to avoid error if it's an unicode string
             cursor.execute(str(query), args)
         except Exception, ex:
-            self.critical("sql: %r\n args: %s\ndbms message: %r",
-                          query, args, ex.args[0])
+            if self.repo.config.mode != 'test':
+                # during test we get those message when trying to alter sqlite
+                # db schema
+                self.critical("sql: %r\n args: %s\ndbms message: %r",
+                              query, args, ex.args[0])
             if rollback:
                 try:
                     session.pool.connection(self.uri).rollback()
-                    self.critical('transaction has been rollbacked')
+                    if self.repo.config.mode != 'test':
+                        self.critical('transaction has been rollbacked')
                 except:
                     pass
             raise
@@ -455,11 +464,15 @@
             # str(query) to avoid error if it's an unicode string
             cursor.executemany(str(query), args)
         except Exception, ex:
-            self.critical("sql many: %r\n args: %s\ndbms message: %r",
-                          query, args, ex.args[0])
+            if self.repo.config.mode != 'test':
+                # during test we get those message when trying to alter sqlite
+                # db schema
+                self.critical("sql many: %r\n args: %s\ndbms message: %r",
+                              query, args, ex.args[0])
             try:
                 session.pool.connection(self.uri).rollback()
-                self.critical('transaction has been rollbacked')
+                if self.repo.config.mode != 'test':
+                    self.critical('transaction has been rollbacked')
             except:
                 pass
             raise
@@ -528,15 +541,29 @@
         finally:
             self._eid_creation_lock.release()
 
-    def add_info(self, session, entity, source, extid=None):
+    def add_info(self, session, entity, source, extid=None, complete=True):
         """add type and source info for an eid into the system table"""
         # begin by inserting eid/type/source/extid into the entities table
         if extid is not None:
             assert isinstance(extid, str)
             extid = b64encode(extid)
-        attrs = {'type': entity.id, 'eid': entity.eid, 'extid': extid,
+        attrs = {'type': entity.__regid__, 'eid': entity.eid, 'extid': extid,
                  'source': source.uri, 'mtime': datetime.now()}
         session.system_sql(self.sqlgen.insert('entities', attrs), attrs)
+        # now schedule full text indexation (performed by FTIndexEntityOp)
+        if self.do_fti and self.need_fti_indexation(entity.__regid__):
+            if complete:  # callers may pass complete=False to skip this step
+                entity.complete(entity.e_schema.indexable_attributes())
+            FTIndexEntityOp(session, entity=entity)
+
+    def update_info(self, session, entity, need_fti_update):
+        """schedule reindexation if asked to and update the entity's mtime"""
+        if self.do_fti and need_fti_update:
+            # only when the query updates at least one indexable attribute
+            FTIndexEntityOp(session, entity=entity)
+        # update entities.mtime
+        attrs = {'eid': entity.eid, 'mtime': datetime.now()}
+        session.system_sql(self.sqlgen.update('entities', attrs, ['eid']), attrs)
 
     def delete_info(self, session, eid, etype, uri, extid):
         """delete system information on deletion of an entity by transfering
@@ -551,30 +578,6 @@
                  'source': uri, 'dtime': datetime.now()}
         session.system_sql(self.sqlgen.insert('deleted_entities', attrs), attrs)
 
-    def fti_unindex_entity(self, session, eid):
-        """remove text content for entity with the given eid from the full text
-        index
-        """
-        try:
-            self.indexer.cursor_unindex_object(eid, session.pool['system'])
-        except Exception: # let KeyboardInterrupt / SystemExit propagate
-            if self.indexer is not None:
-                self.exception('error while unindexing %s', eid)
-
-    def fti_index_entity(self, session, entity):
-        """add text content of a created/modified entity to the full text index
-        """
-        self.info('reindexing %r', entity.eid)
-        try:
-            self.indexer.cursor_reindex_object(entity.eid, entity,
-                                               session.pool['system'])
-        except Exception: # let KeyboardInterrupt / SystemExit propagate
-            if self.indexer is not None:
-                self.exception('error while reindexing %s', entity)
-        # update entities.mtime
-        attrs = {'eid': entity.eid, 'mtime': datetime.now()}
-        session.system_sql(self.sqlgen.update('entities', attrs, ['eid']), attrs)
-
     def modified_entities(self, session, etypes, mtime):
         """return a 2-uple:
         * list of (etype, eid) of entities of the given types which have been
@@ -591,6 +594,71 @@
         delentities = cursor.fetchall()
         return modentities, delentities
 
+    # full text index handling #################################################
+
+    @cached
+    def need_fti_indexation(self, etype):
+        """tell whether the given entity type has indexable attributes or
+        full text containers, i.e. is concerned by the full text index
+        """
+        eschema = self.schema.eschema(etype)
+        return (any(eschema.indexable_attributes())
+                or any(eschema.fulltext_containers()))
+
+    def index_entity(self, session, entity):
+        """create an operation to [re]index textual content of the given
+        entity on commit; does nothing when full text indexation is off"""
+        if self.do_fti:
+            FTIndexEntityOp(session, entity=entity)
+
+    def fti_unindex_entity(self, session, eid):
+        """remove text content for entity with the given eid from the full text
+        index (failures are reported through `self.exception`, not raised)
+        """
+        try:
+            self.indexer.cursor_unindex_object(eid, session.pool['system'])
+        except Exception: # let KeyboardInterrupt / SystemExit propagate
+            self.exception('error while unindexing %s', eid)
+
+    def fti_index_entity(self, session, entity):
+        """add text content of a created/modified entity to the full text index
+        (failures are reported through `self.exception`, not raised)"""
+        self.debug('reindexing %r', entity.eid)
+        try:
+            # use cursor_index_object, not cursor_reindex_object, since
+            # unindexing is already done by FTIndexEntityOp
+            self.indexer.cursor_index_object(entity.eid, entity,
+                                             session.pool['system'])
+        except Exception: # let KeyboardInterrupt / SystemExit propagate
+            self.exception('error while reindexing %s', entity)
+
+
+class FTIndexEntityOp(hook.LateOperation):
+    """operation to delay entity full text indexation to commit
+
+    since fti indexing may trigger discovery of other entities, it should be
+    triggered on precommit, not commit, and this should be done after other
+    precommit operations which may add relations to the entity
+    """
+
+    def precommit_event(self):
+        session = self.session
+        entity = self.entity
+        if entity.eid in session.transaction_data.get('pendingeids', ()):
+            return # entity added and deleted in the same transaction
+        alreadydone = session.transaction_data.setdefault('indexedeids', set())
+        if entity.eid in alreadydone:
+            self.debug('skipping reindexation of %s, already done', entity.eid)
+            return
+        alreadydone.add(entity.eid)  # remembered in transaction_data
+        source = session.repo.system_source
+        for container in entity.fti_containers():
+            source.fti_unindex_entity(session, container.eid)  # drop old text
+            source.fti_index_entity(session, container)
+
+    def commit_event(self):
+        pass  # all the work is done in precommit_event
+
 
 def sql_schema(driver):
     helper = get_adv_func_helper(driver)
@@ -644,3 +712,49 @@
     result += 'GRANT ALL ON deleted_entities TO %s;\n' % user
     result += 'GRANT ALL ON entities_id_seq TO %s;\n' % user
     return result
+
+
+class BaseAuthentifier(object):
+    """base class for authentifiers plugged into the native source"""
+    def __init__(self, source=None):
+        self.source = source
+
+    def set_schema(self, schema):
+        """set the instance's schema (nothing to do by default)"""
+        pass
+
+class LoginPasswordAuthentifier(BaseAuthentifier):
+    passwd_rql = "Any P WHERE X is CWUser, X login %(login)s, X upassword P"
+    auth_rql = "Any X WHERE X is CWUser, X login %(login)s, X upassword %(pwd)s"
+    _sols = ({'X': 'CWUser', 'P': 'Password'},)  # solutions for both queries
+
+    def set_schema(self, schema):
+        """set the instance's schema and compile the authentication queries"""
+        if 'CWUser' in schema: # probably an empty schema if not true...
+            # rql syntax trees used to authenticate users
+            self._passwd_rqlst = self.source.compile_rql(self.passwd_rql, self._sols)
+            self._auth_rqlst = self.source.compile_rql(self.auth_rql, self._sols)
+
+    def authenticate(self, session, login, password=None, **kwargs):
+        """return CWUser eid for the given login/password if this account is
+        defined in this source, else raise `AuthenticationError`
+
+        two queries are needed since passwords are stored crypted, so we have
+        to fetch the salt first; extra keyword arguments are ignored
+        """
+        args = {'login': login, 'pwd' : password}
+        if password is not None:
+            rset = self.source.syntax_tree_search(session, self._passwd_rqlst, args)
+            try:
+                pwd = rset[0][0]
+            except IndexError:
+                raise AuthenticationError('bad login')
+            # passwords are stored using the Bytes type, so we get a StringIO
+            if pwd is not None:
+                args['pwd'] = Binary(crypt_password(password, pwd.getvalue()[:2]))
+        # get eid from login and (crypted) password
+        rset = self.source.syntax_tree_search(session, self._auth_rqlst, args)
+        try:
+            return rset[0][0]
+        except IndexError:
+            raise AuthenticationError('bad password')