first draft of a simple hooks-based custom attribute storage,
with a BytesFileSystemStorage POC implementation.
Basically:
* a dictionary maps each attribute of each entity type to its custom
  storage (see the registration sketch below)
* hooks watch for entities of these mapped types being added/modified/deleted
* read is based on the sql generator callback mechanism (used in vcsfile for
  instance)
* all storages share the same basic interface (read, add, update, delete)
  and should be pluggable in a transparent way (except at migration time,
  when one wants to switch from one storage to another)
* the sample BytesFileSystemStorage:
  * may store the content of Bytes attributes of any entity type as files
    on the file system
  * is based on an FSPATH rql/sql function, plus an _fsopen function only
    available in sql
  * has a dumb file name allocation algorithm
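
A minimal registration sketch (the MyFile entity type, data attribute and
storage directory are hypothetical; set_attribute_storage and
BytesFileSystemStorage are introduced by this patch):

    from cubicweb.server.sources.storages import (
        BytesFileSystemStorage, set_attribute_storage)

    # on repository startup, store MyFile.data content as files under /data/bfss
    set_attribute_storage(repo, 'MyFile', 'data',
                          BytesFileSystemStorage('/data/bfss'))

Reads of MyFile.data then go through the storage's sql generator callback
(hence FSPATH/_fsopen), while the 'bfss' hooks route writes to the file
system.
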
--- a/cwconfig.py Fri Jan 22 08:40:38 2010 +0100
+++ b/cwconfig.py Fri Jan 22 08:49:16 2010 +0100
@@ -1037,3 +1037,12 @@
supported_backends = ('mysql', 'postgres', 'sqlite',)
register_function(TEXT_LIMIT_SIZE)
+
+
+
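+# rql-level description of the FSPATH function, implemented in sql for each backend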
+ class FSPATH(FunctionDescr):
+ supported_backends = ('postgres', 'sqlite',)
+ rtype = 'Bytes'
+
+ register_function(FSPATH)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/hooks/storages.py Fri Jan 22 08:49:16 2010 +0100
@@ -0,0 +1,44 @@
+"""hooks to handle attributes mapped to a custom storage
+"""
+
+from cubicweb.server.hook import Hook
+from cubicweb.server.sources.storages import ETYPE_ATTR_STORAGE
+
+
+class BFSSHook(Hook):
+ """abstract class for bytes file-system storage hooks"""
+ __abstract__ = True
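+    # the 'bfss' category allows to (de)activate all these hooks at once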
+ category = 'bfss'
+
+
+class PreAddEntityHook(BFSSHook):
+    """forward mapped attributes of a newly added entity to their storage"""
+ __regid__ = 'bfss_add_entity'
+ events = ('before_add_entity', )
+ #__select__ = Hook.__select__ & implements('Repository')
+
+    def __call__(self):
+        for attr in ETYPE_ATTR_STORAGE.get(self.entity.__regid__, ()):
+            # the storage registers needed file system operations itself
+            ETYPE_ATTR_STORAGE[self.entity.__regid__][attr].entity_added(self.entity, attr)
+
+class PreUpdateEntityHook(BFSSHook):
+    """forward mapped attributes of an updated entity to their storage"""
+ __regid__ = 'bfss_update_entity'
+ events = ('before_update_entity', )
+ #__select__ = Hook.__select__ & implements('Repository')
+
+ def __call__(self):
+ for attr in ETYPE_ATTR_STORAGE.get(self.entity.__regid__, ()):
+ ETYPE_ATTR_STORAGE[self.entity.__regid__][attr].entity_updated(self.entity, attr)
+
+class PreDeleteEntityHook(BFSSHook):
+    """notify the storage that an entity using it is being deleted"""
+ __regid__ = 'bfss_delete_entity'
+ events = ('before_delete_entity', )
+ #__select__ = Hook.__select__ & implements('Repository')
+
+ def __call__(self):
+ for attr in ETYPE_ATTR_STORAGE.get(self.entity.__regid__, ()):
+ ETYPE_ATTR_STORAGE[self.entity.__regid__][attr].entity_deleted(self.entity, attr)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/schemas/_regproc_bss.postgres.sql Fri Jan 22 08:49:16 2010 +0100
@@ -0,0 +1,39 @@
+/* -*- sql -*-
+
+ postgres specific registered procedures for the Bytes File System storage,
+ require the plpythonu language installed
+
+*/
+
+
+CREATE OR REPLACE FUNCTION _fsopen(bytea) RETURNS bytea AS $$
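+    #/* return the content of the file whose path is stored in the given bytea value */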
+ fpath = args[0]
+ if fpath:
+ try:
+ data = file(fpath, 'rb').read()
+ #/* XXX due to plpython bug we have to replace some characters... */
+ return data.replace("\\", r"\134").replace("\000", r"\000").replace("'", r"\047") #'
+ except Exception, ex:
+        plpy.warning('failed to get content for %s: %s' % (fpath, ex))
+ return None
+$$ LANGUAGE plpythonu
+/* WITH(ISCACHABLE) XXX does postgres handle caching of large data nicely */
+;;
+
+/* fspath(eid, entity type, attribute) */
+CREATE OR REPLACE FUNCTION fspath(bigint, text, text) RETURNS bytea AS $$
+ pkey = 'plan%s%s' % (args[1], args[2])
+ try:
+ plan = SD[pkey]
+ except KeyError:
+        #/* prepare and cache the plan fetching the attribute column (which
+        #   holds the file path) for the given entity eid */
+ plan = plpy.prepare(
+ 'SELECT X.cw_%s FROM cw_%s as X WHERE X.cw_eid=$1' % (args[2], args[1]),
+ ['bigint'])
+ SD[pkey] = plan
+    return plpy.execute(plan, [args[0]])[0]['cw_%s' % args[2]]
+$$ LANGUAGE plpythonu
+/* WITH(ISCACHABLE) XXX does postgres handle caching of large data nicely */
+;;
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/server/sources/storages.py Fri Jan 22 08:49:16 2010 +0100
@@ -0,0 +1,113 @@
+"""custom storages for the system source"""
+from os import unlink, path as osp
+
+from cubicweb import Binary
+from cubicweb.server.hook import Operation
+
+
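+# etype -> {attr: storage} mapping, filled by calls to set_attribute_storage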
+ETYPE_ATTR_STORAGE = {}
+def set_attribute_storage(repo, etype, attr, storage):
+ ETYPE_ATTR_STORAGE.setdefault(etype, {})[attr] = storage
+ repo.system_source.map_attribute(etype, attr, storage.sqlgen_callback)
+
+
+class Storage(object):
+ """abstract storage"""
+    def sqlgen_callback(self, generator, linkedvar, relation):
+ """sql generator callback when some attribute with a custom storage is
+ accessed
+ """
+ raise NotImplementedError()
+
+ def entity_added(self, entity, attr):
+ """an entity using this storage for attr has been added"""
+ raise NotImplementedError()
+ def entity_updated(self, entity, attr):
+        """an entity using this storage for attr has been updated"""
+ raise NotImplementedError()
+ def entity_deleted(self, entity, attr):
+ """an entity using this storage for attr has been deleted"""
+ raise NotImplementedError()
+
+# TODO
+# * make it configurable without code
+# * better file path allocation
+
+class BytesFileSystemStorage(Storage):
+ """store Bytes attribute value on the file system"""
+ def __init__(self, defaultdir):
+ self.default_directory = defaultdir
+
+ def sqlgen_callback(self, generator, linkedvar, relation):
+ """sql generator callback when some attribute with a custom storage is
+ accessed
+ """
+ linkedvar.accept(generator)
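+        # replace the regular column access by eg _fsopen(X.cw_data)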
+ return '_fsopen(%s.cw_%s)' % (
+ linkedvar._q_sql.split('.', 1)[0], # table name
+ relation.r_type) # attribute name
+
+ def entity_added(self, entity, attr):
+ """an entity using this storage for attr has been added"""
+ if not entity._cw.transaction_data.get('fs_importing'):
+ try:
+ value = entity.pop(attr)
+ except KeyError:
+ pass
+ else:
+ fpath = self.new_fs_path(entity, attr)
+ # bytes storage used to store file's path
+                entity[attr] = Binary(fpath)
+                file(fpath, 'wb').write(value.getvalue())
+ AddFileOp(entity._cw, filepath=fpath)
+        # else entity[attr] is expected to be an already existing file path
+
+ def entity_updated(self, entity, attr):
+        """an entity using this storage for attr has been updated"""
+ try:
+ value = entity.pop(attr)
+ except KeyError:
+ pass
+ else:
+ fpath = self.current_fs_path(entity, attr)
+ UpdateFileOp(entity._cw, filepath=fpath, filedata=value.getvalue())
+
+ def entity_deleted(self, entity, attr):
+ """an entity using this storage for attr has been deleted"""
+ DeleteFileOp(entity._cw, filepath=self.current_fs_path(entity, attr))
+
+    def new_fs_path(self, entity, attr):
+        fpath = osp.join(self.default_directory, '%s_%s' % (entity.eid, attr))
+        # dumb allocation: suffix with underscores until the path is free
+        while osp.exists(fpath):
+            fpath += '_'
+        return fpath
+
+    def current_fs_path(self, entity, attr):
+        cu = entity._cw.system_sql('SELECT cw_%s FROM cw_%s WHERE cw_eid=%s'
+                                   % (attr, entity.__regid__, entity.eid))
+        return cu.fetchone()[0]
+
+
+class AddFileOp(Operation):
+    def rollback_event(self):
+        # transaction rolled back: remove the file created for the new value
+        try:
+            unlink(self.filepath)
+        except OSError:
+            pass
+
+class DeleteFileOp(Operation):
+    def commit_event(self):
+        # entity deletion committed: remove the underlying file
+        try:
+            unlink(self.filepath)
+        except OSError:
+            pass
+
+class UpdateFileOp(Operation):
+    def precommit_event(self):
+        # write the new content; let any error abort the transaction
+        file(self.filepath, 'wb').write(self.filedata)
--- a/server/sqlutils.py Fri Jan 22 08:40:38 2010 +0100
+++ b/server/sqlutils.py Fri Jan 22 08:49:16 2010 +0100
@@ -314,6 +314,30 @@
if hasattr(yams.constraints, 'patch_sqlite_decimal'):
yams.constraints.patch_sqlite_decimal()
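+    # sqlite has no stored procedures: register the BFSS helpers as connection functions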
+ def fspath(eid, etype, attr):
+ try:
+ cu = cnx.cursor()
+ cu.execute('SELECT X.cw_%s FROM cw_%s as X '
+ 'WHERE X.cw_eid=%%(eid)s' % (attr, etype),
+ {'eid': eid})
+ return cu.fetchone()[0]
+ except:
+ import traceback
+ traceback.print_exc()
+ raise
+ cnx.create_function('fspath', 3, fspath)
+
+ def _fsopen(fspath):
+ if fspath:
+ try:
+                return buffer(file(fspath, 'rb').read())
+ except:
+ import traceback
+ traceback.print_exc()
+ raise
+ cnx.create_function('_fsopen', 1, _fsopen)
+
sqlite_hooks = SQL_CONNECT_HOOKS.setdefault('sqlite', [])
sqlite_hooks.append(init_sqlite_connexion)