first draft for a simple hooks-based custom attribute storage,
author	Sylvain Thénault <sylvain.thenault@logilab.fr>
date	Fri, 22 Jan 2010 08:49:16 +0100
changeset 4322 f65743cc53e4
parent 4321 80b455066c9a
child 4323 aae19998dd93
first draft for a simple hooks-based custom attribute storage, with a
BytesFileSystemStorage POC implementation. Basically:

* a dictionary maps which attribute of which entity type is handled by
  which custom storage
* hooks watch for entities of one of these types being added / modified /
  deleted
* read is based on the sql generator callback mechanism (used in vcsfile
  for instance)
* all storages share the same basic interface (read, add, update, delete)
  and should be pluggable in a transparent way (except at migration time,
  when one wants to change from one storage to another)
* the sample BytesFileSystemStorage:
  * may store the content of Bytes attributes of any entity type as files
    on the file system
  * is based on an FSPATH rql/sql function and on another function,
    _fsopen, only available in sql
  * has a dumb file name allocation algorithm
cwconfig.py
hooks/storages.py
schemas/_regproc_bss.postgres.sql
server/sources/storages.py
server/sqlutils.py
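
A minimal usage sketch (hypothetical cube code: the File entity type, its
'data' attribute and the directory path are illustrative, they are not part
of this changeset):

    from cubicweb.server.sources.storages import (
        BytesFileSystemStorage, set_attribute_storage)

    # at server startup, map File.data to a file-system storage; the hooks
    # added below then intercept addition/update/deletion of File entities
    storage = BytesFileSystemStorage('/var/lib/mycube/bfss')
    set_attribute_storage(repo, 'File', 'data', storage)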
--- a/cwconfig.py	Fri Jan 22 08:40:38 2010 +0100
+++ b/cwconfig.py	Fri Jan 22 08:49:16 2010 +0100
@@ -1037,3 +1037,10 @@
         supported_backends = ('mysql', 'postgres', 'sqlite',)
 
     register_function(TEXT_LIMIT_SIZE)
+
+
+    class FSPATH(FunctionDescr):
+        supported_backends = ('postgres', 'sqlite',)
+        rtype = 'Bytes'
+
+    register_function(FSPATH)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/hooks/storages.py	Fri Jan 22 08:49:16 2010 +0100
@@ -0,0 +1,45 @@
+"""hooks to handle attributes mapped to a custom storage
+"""
+
+from cubicweb.server.hook import Hook
+from cubicweb.server.sources.storages import ETYPE_ATTR_STORAGE
+
+
+class BFSSHook(Hook):
+    """abstract class for bytes file-system storage hooks"""
+    __abstract__ = True
+    category = 'bfss'
+
+
+class PreAddEntityHook(BFSSHook):
+    """"""
+    __regid__ = 'bfss_add_entity'
+    events = ('before_add_entity', )
+    #__select__ = Hook.__select__ & implements('Repository')
+
+    def __call__(self):
+        for attr in ETYPE_ATTR_STORAGE.get(self.entity.__regid__, ()):
+            # entity_added stores the value and registers an operation
+            # removing the file if the transaction is rolled back
+            ETYPE_ATTR_STORAGE[self.entity.__regid__][attr].entity_added(
+                self.entity, attr)
+
+class PreUpdateEntityHook(BFSSHook):
+    """"""
+    __regid__ = 'bfss_update_entity'
+    events = ('before_update_entity', )
+    #__select__ = Hook.__select__ & implements('Repository')
+
+    def __call__(self):
+        for attr in ETYPE_ATTR_STORAGE.get(self.entity.__regid__, ()):
+            ETYPE_ATTR_STORAGE[self.entity.__regid__][attr].entity_updated(self.entity, attr)
+
+class PreDeleteEntityHook(BFSSHook):
+    """"""
+    __regid__ = 'bfss_delete_entity'
+    events = ('before_delete_entity', )
+    #__select__ = Hook.__select__ & implements('Repository')
+
+    def __call__(self):
+        for attr in ETYPE_ATTR_STORAGE.get(self.entity.__regid__, ()):
+            ETYPE_ATTR_STORAGE[self.entity.__regid__][attr].entity_deleted(self.entity, attr)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/schemas/_regproc_bss.postgres.sql	Fri Jan 22 08:49:16 2010 +0100
@@ -0,0 +1,38 @@
+/* -*- sql -*-
+
+   postgres specific registered procedures for the Bytes File System storage,
+   require the plpythonu language installed
+
+*/
+
+
+CREATE OR REPLACE FUNCTION _fsopen(bytea) RETURNS bytea AS $$
+    fpath = args[0]
+    if fpath:
+        try:
+            data = file(fpath, 'rb').read()
+            #/* XXX due to plpython bug we have to replace some characters... */
+            return data.replace("\\", r"\134").replace("\000", r"\000").replace("'", r"\047") #'
+        except Exception, ex:
+            plpy.warning('failed to get content for %s: %s' % (fpath, ex))
+    return None
+$$ LANGUAGE plpythonu
+/* WITH(ISCACHABLE) XXX does postgres handle caching of large data nicely */
+;;
+
+/* fspath(eid, entity type, attribute) */
+CREATE OR REPLACE FUNCTION fspath(bigint, text, text) RETURNS bytea AS $$
+    pkey = 'plan%s%s' % (args[1], args[2])
+    try:
+        plan = SD[pkey]
+    except KeyError:
+        #/* then prepare and cache the plan fetching the attribute value
+        #   (the stored file path) for the given entity type */
+        plan = plpy.prepare(
+            'SELECT X.cw_%s FROM cw_%s as X WHERE X.cw_eid=$1' % (args[2], args[1]),
+            ['bigint'])
+        SD[pkey] = plan
+    return plpy.execute(plan, [args[0]])[0]['cw_%s' % args[2]]
+$$ LANGUAGE plpythonu
+/* WITH(ISCACHABLE) XXX does postgres handle caching of large data nicely */
+;;
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/server/sources/storages.py	Fri Jan 22 08:49:16 2010 +0100
@@ -0,0 +1,110 @@
+"""custom storages for the system source"""
+from os import unlink, path as osp
+
+from cubicweb import Binary
+from cubicweb.server.hook import Operation
+
+
+ETYPE_ATTR_STORAGE = {}
+def set_attribute_storage(repo, etype, attr, storage):
+    ETYPE_ATTR_STORAGE.setdefault(etype, {})[attr] = storage
+    repo.system_source.map_attribute(etype, attr, storage.sqlgen_callback)
+
+
+class Storage(object):
+    """abstract storage"""
+    def sqlgen_callback(self, generator, linkedvar, relation):
+        """sql generator callback when some attribute with a custom storage is
+        accessed
+        """
+        raise NotImplementedError()
+
+    def entity_added(self, entity, attr):
+        """an entity using this storage for attr has been added"""
+        raise NotImplementedError()
+    def entity_updated(self, entity, attr):
+        """an entity using this storage for attr has been updatded"""
+        raise NotImplementedError()
+    def entity_deleted(self, entity, attr):
+        """an entity using this storage for attr has been deleted"""
+        raise NotImplementedError()
+
+# TODO
+# * make it configurable without code
+# * better file path allocation
+
+class BytesFileSystemStorage(Storage):
+    """store Bytes attribute value on the file system"""
+    def __init__(self, defaultdir):
+        self.default_directory = defaultdir
+
+    def sqlgen_callback(self, generator, linkedvar, relation):
+        """sql generator callback when some attribute with a custom storage is
+        accessed
+        """
+        linkedvar.accept(generator)
+        return '_fsopen(%s.cw_%s)' % (
+            linkedvar._q_sql.split('.', 1)[0], # table name
+            relation.r_type) # attribute name
+
+    def entity_added(self, entity, attr):
+        """an entity using this storage for attr has been added"""
+        if not entity._cw.transaction_data.get('fs_importing'):
+            try:
+                value = entity.pop(attr)
+            except KeyError:
+                pass
+            else:
+                fpath = self.new_fs_path(entity, attr)
+                # bytes storage used to store file's path
+                entity[attr] = Binary(fpath)
+                file(fpath, 'wb').write(value.getvalue())
+                AddFileOp(entity._cw, filepath=fpath)
+        # else entity[attr] is expected to be an already existing file path
+
+    def entity_updated(self, entity, attr):
+        """an entity using this storage for attr has been updatded"""
+        try:
+            value = entity.pop(attr)
+        except KeyError:
+            pass
+        else:
+            fpath = self.current_fs_path(entity, attr)
+            UpdateFileOp(entity._cw, filepath=fpath, filedata=value.getvalue())
+
+    def entity_deleted(self, entity, attr):
+        """an entity using this storage for attr has been deleted"""
+        DeleteFileOp(entity._cw, filepath=self.current_fs_path(entity, attr))
+
+    def new_fs_path(self, entity, attr):
+        fpath = osp.join(self.default_directory, '%s_%s' % (entity.eid, attr))
+        while osp.exists(fpath):
+            fpath += '_'
+        return fpath
+
+    def current_fs_path(self, entity, attr):
+        cu = entity._cw.system_sql('SELECT cw_%s FROM cw_%s WHERE cw_eid=%s' %
+                                   (attr, entity.__regid__, entity.eid))
+        return cu.fetchone()[0]
+
+
+class AddFileOp(Operation):
+    def rollback_event(self):
+        # the transaction failed, remove the file it created
+        try:
+            unlink(self.filepath)
+        except OSError:
+            pass
+
+class DeleteFileOp(Operation):
+    def commit_event(self):
+        # only remove the file once the deletion is committed
+        try:
+            unlink(self.filepath)
+        except OSError:
+            pass
+
+class UpdateFileOp(Operation):
+    def precommit_event(self):
+        # write the new content; a failure will abort the transaction
+        file(self.filepath, 'wb').write(self.filedata)
--- a/server/sqlutils.py	Fri Jan 22 08:40:38 2010 +0100
+++ b/server/sqlutils.py	Fri Jan 22 08:49:16 2010 +0100
@@ -314,6 +314,29 @@
     if hasattr(yams.constraints, 'patch_sqlite_decimal'):
         yams.constraints.patch_sqlite_decimal()
 
+    def fspath(eid, etype, attr):
+        try:
+            cu = cnx.cursor()
+            cu.execute('SELECT X.cw_%s FROM cw_%s as X '
+                       'WHERE X.cw_eid=%%(eid)s' % (attr, etype),
+                       {'eid': eid})
+            return cu.fetchone()[0]
+        except Exception:
+            import traceback
+            traceback.print_exc()
+            raise
+    cnx.create_function('fspath', 3, fspath)
+
+    def _fsopen(fspath):
+        if fspath:
+            try:
+                return buffer(file(fspath, 'rb').read())
+            except Exception:
+                import traceback
+                traceback.print_exc()
+                raise
+    cnx.create_function('_fsopen', 1, _fsopen)
+
 
 sqlite_hooks = SQL_CONNECT_HOOKS.setdefault('sqlite', [])
 sqlite_hooks.append(init_sqlite_connexion)
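
Read-side sketch, assuming the same hypothetical File.data mapping as above:
a plain attribute read goes through _fsopen and returns the file content,
while the new FSPATH function exposes the underlying path:

    # regular read: the sql generator callback injects _fsopen, so D holds
    # the file content, not the path stored in the column
    data = session.execute('Any D WHERE X data D, X eid %(x)s',
                           {'x': eid})[0][0]
    # explicit access to the path through FSPATH (returned as a Binary)
    fpath = session.execute('Any FSPATH(D) WHERE X data D, X eid %(x)s',
                            {'x': eid})[0][0].getvalue()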