server/sources/extlite.py
changeset 257:4c7d3af7e94d
child 983:cf1caf460081
child 1137:9ce0ac82f94f
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/server/sources/extlite.py	Mon Dec 22 17:34:15 2008 +0100
@@ -0,0 +1,247 @@
+"""provide an abstract class for external sources using a sqlite database helper
+
+:organization: Logilab
+:copyright: 2007-2008 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
+"""
+__docformat__ = "restructuredtext en"
+
+
+import time
+import threading
+from os.path import join, exists
+
+from cubicweb import server
+from cubicweb.server.sqlutils import sqlexec, SQLAdapterMixIn
+from cubicweb.server.sources import AbstractSource, native
+from cubicweb.server.sources.rql2sql import SQLGenerator
+
+def timeout_acquire(lock, timeout):
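+    """acquire `lock`, polling every 0.2 second, and raise RuntimeError if it
+    could not be acquired within `timeout` seconds
+    """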
+    while not lock.acquire(False):
+        time.sleep(0.2)
+        timeout -= 0.2
+        if timeout <= 0:
+            raise RuntimeError("external source is busy, can't acquire connection lock")
+        
+class ConnectionWrapper(object):
+    def __init__(self, source=None):
+        self.source = source
+        self._cnx = None
+
+    @property
+    def cnx(self):
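+        # lazily open the underlying sqlite connection on first access: grab
+        # the source's connection lock (with a timeout) and keep it until
+        # pool_reset() releases it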
+        if self._cnx is None:
+            timeout_acquire(self.source._cnxlock, 5)
+            self._cnx = self.source._sqlcnx
+        return self._cnx
+    
+    def commit(self):
+        if self._cnx is not None:
+            self._cnx.commit()
+        
+    def rollback(self):
+        if self._cnx is not None:
+            self._cnx.rollback()
+        
+    def cursor(self):
+        return self.cnx.cursor()
+
+    
+class SQLiteAbstractSource(AbstractSource):
+    """an abstract class for external sources using a sqlite database helper
+    """
+    sqlgen_class = SQLGenerator
+    @classmethod
+    def set_nonsystem_types(cls):
+        # those entities are only in this source, we don't want them in the
+        # system source
+        for etype in cls.support_entities:
+            native.NONSYSTEM_ETYPES.add(etype)
+        for rtype in cls.support_relations:
+            native.NONSYSTEM_RELATIONS.add(rtype)
+        
+    options = (
+        ('helper-db-path',
+         {'type' : 'string',
+          'default': None,
+          'help': 'path to the sqlite database file used to do queries on the \
+repository.',
+          'inputlevel': 2,
+          }),
+    )
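+    # illustrative only: the option above would typically be set in the
+    # concrete source's section of the instance's sources configuration,
+    # e.g. (section name and path are hypothetical)
+    #
+    #   [myextsource]
+    #   helper-db-path=/path/to/myextsource.sqlite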
+            
+    def __init__(self, repo, appschema, source_config, *args, **kwargs):
+        # the helper db is used to ease querying and will store everything but
+        # the actual file content
+        dbpath = source_config.get('helper-db-path')
+        if dbpath is None:
+            dbpath = join(repo.config.appdatahome,
+                          '%(uri)s.sqlite' % source_config)
+        self.dbpath = dbpath
+        self.sqladapter = SQLAdapterMixIn({'db-driver': 'sqlite',
+                                           'db-name': dbpath})
+        # those attributes have to be initialized before ancestor's __init__
+        # which will call set_schema
+        self._need_sql_create = not exists(dbpath)
+        self._need_full_import = self._need_sql_create
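+        # note: _need_full_import is not used further in this abstract class;
+        # concrete sources may consult it to trigger an initial import of
+        # their content once the helper database has been created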
+        AbstractSource.__init__(self, repo, appschema, source_config,
+                                *args, **kwargs)
+        # sql database can only be accessed by one connection at a time, and a
+        # connection can only be used by the thread which created it so:
+        # * create the connection when needed
+        # * use a lock to be sure only one connection is used
+        self._cnxlock = threading.Lock()
+        
+    @property
+    def _sqlcnx(self):
+        # XXX: sqlite connections can only be used in the same thread, so
+        #      create a new one each time necessary. If it appears to be time
+        #      consuming, find another way
+        return self.sqladapter.get_connection()
+
+    def _is_schema_complete(self):
+        for etype in self.support_entities:
+            if etype not in self.schema:
+                self.warning('not ready to generate %s database, %s support missing from schema',
+                             self.uri, etype)
+                return False
+        for rtype in self.support_relations:
+            if rtype not in self.schema:
+                self.warning('not ready to generate %s database, %s support missing from schema',
+                             self.uri, rtype)
+                return False
+        return True
+
+    def _create_database(self):
+        from yams.schema2sql import eschema2sql, rschema2sql
+        from cubicweb.toolsutils import restrict_perms_to_user
+        self.warning('initializing sqlite database for %s source', self.uri)
+        cnx = self._sqlcnx
+        cu = cnx.cursor()
+        schema = self.schema
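+        # generate and execute the CREATE TABLE statements for each supported
+        # entity type (skipping the 'data' attribute, whose content is not
+        # stored in the helper db) and for each non-inlined supported relation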
+        for etype in self.support_entities:
+            eschema = schema.eschema(etype)
+            createsqls = eschema2sql(self.sqladapter.dbhelper, eschema,
+                                     skip_relations=('data',))
+            sqlexec(createsqls, cu, withpb=False)
+        for rtype in self.support_relations:
+            rschema = schema.rschema(rtype)
+            if not rschema.inlined:
+                sqlexec(rschema2sql(rschema), cu, withpb=False)
+        cnx.commit()
+        cnx.close()
+        self._need_sql_create = False
+        if self.repo.config['uid']:
+            from logilab.common.shellutils import chown
+            # database file must be owned by the uid of the server process
+            self.warning('set %s as owner of the database file',
+                         self.repo.config['uid'])
+            chown(self.dbpath, self.repo.config['uid'])
+        restrict_perms_to_user(self.dbpath, self.info)
+        
+    def set_schema(self, schema):
+        super(SQLiteAbstractSource, self).set_schema(schema)
+        if self._need_sql_create and self._is_schema_complete():
+            self._create_database()
+        self.rqlsqlgen = self.sqlgen_class(schema, self.sqladapter.dbhelper)
+                
+    def get_connection(self):
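+        # return a lazy wrapper: the actual sqlite connection (and the lock
+        # protecting it) is only acquired when a cursor is first requested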
+        return ConnectionWrapper(self)
+
+    def check_connection(self, cnx):
+        """check connection validity, return None if the connection is still valid
+        else a new connection (called when the pool using the given connection is
+        being attached to a session)
+
+        always return the connection to reset eventually cached cursor
+        """
+        return cnx
+
+    def pool_reset(self, cnx):
+        """the pool using the given connection is being reseted from its current
+        attached session: release the connection lock if the connection wrapper
+        has a connection set
+        """
+        if cnx._cnx is not None:
+            try:
+                cnx._cnx.close()
+            finally:
+                # drop the wrapped connection and release the lock even if
+                # close() failed, so the lock can't leak with a stale connection
+                cnx._cnx = None
+                self._cnxlock.release()
+        
+    def syntax_tree_search(self, session, union,
+                           args=None, cachekey=None, varmap=None, debug=0):
+        """return result from this source for a rql query (actually from a rql 
+        syntax tree and a solution dictionary mapping each used variable to a 
+        possible type). If cachekey is given, the query necessary to fetch the
+        results (but not the results themselves) may be cached using this key.
+        """
+        if self._need_sql_create:
+            return []
+        sql, query_args = self.rqlsqlgen.generate(union, args)
+        if server.DEBUG:
+            print self.uri, 'SOURCE RQL', union.as_string()
+            print 'GENERATED SQL', sql
+        args = self.sqladapter.merge_args(args, query_args)
+        cursor = session.pool[self.uri]
+        cursor.execute(sql, args)
+        return self.sqladapter.process_result(cursor) 
+
+    def local_add_entity(self, session, entity):
+        """insert the entity in the local database.
+
+        This is not provided as the add_entity implementation since sources
+        usually don't want to simply do this; add_entity raises
+        NotImplementedError instead, and source implementors may call this
+        method when appropriate.
+        """
+        cu = session.pool[self.uri]
+        attrs = self.sqladapter.preprocess_entity(entity)
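+        # sqlgen.insert builds a parametrized INSERT over the preprocessed
+        # attributes, along the lines of (illustrative entity type/attribute):
+        #   INSERT INTO MyEtype (eid, name) VALUES (%(eid)s, %(name)s)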
+        sql = self.sqladapter.sqlgen.insert(str(entity.e_schema), attrs)
+        cu.execute(sql, attrs)
+        
+    def add_entity(self, session, entity):
+        """add a new entity to the source"""
+        raise NotImplementedError()
+
+    def local_update_entity(self, session, entity):
+        """update an entity in the source
+
+        This is not provided as the update_entity implementation since sources
+        usually don't want to simply do this; update_entity raises
+        NotImplementedError instead, and source implementors may call this
+        method when appropriate.
+        """
+        cu = session.pool[self.uri]
+        attrs = self.sqladapter.preprocess_entity(entity)
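+        # sqlgen.update builds a parametrized UPDATE restricted on the 'eid'
+        # column, e.g. (illustrative):
+        #   UPDATE MyEtype SET name=%(name)s WHERE eid=%(eid)s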
+        sql = self.sqladapter.sqlgen.update(str(entity.e_schema), attrs, ['eid'])
+        cu.execute(sql, attrs)
+        
+    def update_entity(self, session, entity):
+        """update an entity in the source"""
+        raise NotImplementedError()
+        
+    def delete_entity(self, session, etype, eid):
+        """delete an entity from the source
+
+        this is not deleting content in the underlying repository but deleting
+        entities from this source. Main usage is to delete repository content
+        when a Repository entity is deleted.
+        """
+        sqlcursor = session.pool[self.uri]        
+        attrs = {'eid': eid}
+        sql = self.sqladapter.sqlgen.delete(etype, attrs)
+        sqlcursor.execute(sql, attrs)
+    
+    def delete_relation(self, session, subject, rtype, object):
+        """delete a relation from the source"""
+        rschema = self.schema.rschema(rtype)
+        if rschema.inlined:
+            if subject in session.query_data('pendingeids', ()):
+                return
+            etype = session.describe(subject)[0]
+            sql = 'UPDATE %s SET %s=NULL WHERE eid=%%(eid)s' % (etype, rtype)
+            attrs = {'eid' : subject}
+        else:
+            attrs = {'eid_from': subject, 'eid_to': object}
+            sql = self.sqladapter.sqlgen.delete('%s_relation' % rtype, attrs)
+        sqlcursor = session.pool[self.uri]        
+        sqlcursor.execute(sql, attrs)