# HG changeset patch # User Sylvain Thénault # Date 1264146556 -3600 # Node ID f65743cc53e459c5f2af63cc5278b2e4a8eed3a4 # Parent 80b455066c9a8b8e960b4c24cd2b7750333b2406 first draft for a simple hooks based custom attribute storage, with a BytesFileSystemStorage POC implementation. Basically: * a dictionary contains maps from which attribute of which entity types are mapped to which custom storage * hooks check for one of these entity type being added/modified/deleted * read is based on the sql generator callback mecanism (used in vcsfile for instance) * all storages have the same basic interface (read, add, update, delete), and should be pluggable in a transparent way (except at migration time when one want to change from a storage to another) * the sample BytesFileSystemStorage: * may store Bytes attributes content of any entity type as file on the file system * is based on one FSPATH rql/sql function and another _fsopen only available in sql * has a dumb file name allocation algorithm diff -r 80b455066c9a -r f65743cc53e4 cwconfig.py --- a/cwconfig.py Fri Jan 22 08:40:38 2010 +0100 +++ b/cwconfig.py Fri Jan 22 08:49:16 2010 +0100 @@ -1037,3 +1037,11 @@ supported_backends = ('mysql', 'postgres', 'sqlite',) register_function(TEXT_LIMIT_SIZE) + + + + class FSPATH(FunctionDescr): + supported_backends = ('postgres', 'sqlite',) + rtype = 'Bytes' + + register_function(FSPATH) diff -r 80b455066c9a -r f65743cc53e4 hooks/storages.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/hooks/storages.py Fri Jan 22 08:49:16 2010 +0100 @@ -0,0 +1,45 @@ +"""hooks to handle attributes mapped to a custom storage +""" +from os import unlink + +from cubicweb.server.hook import Hook +from cubicweb.server.sources.storages import ETYPE_ATTR_STORAGE + + +class BFSSHook(Hook): + """abstract class for bytes file-system storage hooks""" + __abstract__ = True + category = 'bfss' + + +class PreAddEntityHook(BFSSHook): + """""" + __regid__ = 'bfss_add_entity' + events = ('before_add_entity', ) + #__select__ = Hook.__select__ & implements('Repository') + + def __call__(self): + for attr in ETYPE_ATTR_STORAGE.get(self.entity.__regid__, ()): + fpath = ETYPE_ATTR_STORAGE[self.entity.__regid__][attr].entity_added(self.entity, attr) + if fpath is not None: + AddFileOp(filepath=fpath) + +class PreUpdateEntityHook(BFSSHook): + """""" + __regid__ = 'bfss_update_entity' + events = ('before_update_entity', ) + #__select__ = Hook.__select__ & implements('Repository') + + def __call__(self): + for attr in ETYPE_ATTR_STORAGE.get(self.entity.__regid__, ()): + ETYPE_ATTR_STORAGE[self.entity.__regid__][attr].entity_updated(self.entity, attr) + +class PreDeleteEntityHook(BFSSHook): + """""" + __regid__ = 'bfss_delete_entity' + events = ('before_delete_entity', ) + #__select__ = Hook.__select__ & implements('Repository') + + def __call__(self): + for attr in ETYPE_ATTR_STORAGE.get(self.entity.__regid__, ()): + ETYPE_ATTR_STORAGE[self.entity.__regid__][attr].entity_deleted(self.entity, attr) diff -r 80b455066c9a -r f65743cc53e4 schemas/_regproc_bss.postgres.sql --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/schemas/_regproc_bss.postgres.sql Fri Jan 22 08:49:16 2010 +0100 @@ -0,0 +1,38 @@ +/* -*- sql -*- + + postgres specific registered procedures for the Bytes File System storage, + require the plpythonu language installed + +*/ + + +CREATE OR REPLACE FUNCTION _fsopen(bytea) RETURNS bytea AS $$ + fpath = args[0] + if fpath: + try: + data = file(fpath, 'rb').read() + #/* XXX due to plpython bug we have to replace some characters... */ + return data.replace("\\", r"\134").replace("\000", r"\000").replace("'", r"\047") #' + except Exception, ex: + plpy.warning('failed to get content for %s: %s', fpath, ex) + return None +$$ LANGUAGE plpythonu +/* WITH(ISCACHABLE) XXX does postgres handle caching of large data nicely */ +;; + +/* fspath(eid, entity type, attribute) */ +CREATE OR REPLACE FUNCTION fspath(bigint, text, text) RETURNS bytea AS $$ + pkey = 'plan%s%s' % (args[1], args[2]) + try: + plan = SD[pkey] + except KeyError: + #/* then prepare and cache plan to get versioned file information from a + # version content eid */ + plan = plpy.prepare( + 'SELECT X.cw_%s FROM cw_%s as X WHERE X.cw_eid=$1' % (args[2], args[1]), + ['bigint']) + SD[pkey] = plan + return plpy.execute(plan, [args[0]])[0] +$$ LANGUAGE plpythonu +/* WITH(ISCACHABLE) XXX does postgres handle caching of large data nicely */ +;; diff -r 80b455066c9a -r f65743cc53e4 server/sources/storages.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/server/sources/storages.py Fri Jan 22 08:49:16 2010 +0100 @@ -0,0 +1,110 @@ +"""custom storages for the system source""" +from os import unlink, path as osp + +from cubicweb.server.hook import Operation + + +ETYPE_ATTR_STORAGE = {} +def set_attribute_storage(repo, etype, attr, storage): + ETYPE_ATTR_STORAGE.setdefault(etype, {})[attr] = storage + repo.system_source.map_attribute(etype, attr, storage.sqlgen_callback) + + +class Storage(object): + """abstract storage""" + def sqlgen_callback(self, generator, relation, linkedvar): + """sql generator callback when some attribute with a custom storage is + accessed + """ + raise NotImplementedError() + + def entity_added(self, entity, attr): + """an entity using this storage for attr has been added""" + raise NotImplementedError() + def entity_updated(self, entity, attr): + """an entity using this storage for attr has been updatded""" + raise NotImplementedError() + def entity_deleted(self, entity, attr): + """an entity using this storage for attr has been deleted""" + raise NotImplementedError() + +# TODO +# * make it configurable without code +# * better file path attribution + +class BytesFileSystemStorage(Storage): + """store Bytes attribute value on the file system""" + def __init__(self, defaultdir): + self.default_directory = defaultdir + + def sqlgen_callback(self, generator, linkedvar, relation): + """sql generator callback when some attribute with a custom storage is + accessed + """ + linkedvar.accept(generator) + return '_fsopen(%s.cw_%s)' % ( + linkedvar._q_sql.split('.', 1)[0], # table name + relation.r_type) # attribute name + + def entity_added(self, entity, attr): + """an entity using this storage for attr has been added""" + if not entity._cw.transaction_data.get('fs_importing'): + try: + value = entity.pop(attr) + except KeyError: + pass + else: + fpath = self.new_fs_path(entity, attr) + # bytes storage used to store file's path + entity[attr]= Binary(fpath) + file(fpath, 'w').write(value.getvalue()) + AddFileOp(entity._cw, filepath=fpath) + # else entity[attr] is expected to be an already existant file path + + def entity_updated(self, entity, attr): + """an entity using this storage for attr has been updatded""" + try: + value = entity.pop(attr) + except KeyError: + pass + else: + fpath = self.current_fs_path(entity, attr) + UpdateFileOp(entity._cw, filepath=fpath, filedata=value.getvalue()) + + def entity_deleted(self, entity, attr): + """an entity using this storage for attr has been deleted""" + DeleteFileOp(entity._cw, filepath=self.current_fs_path(entity, attr)) + + def new_fs_path(self, entity, attr): + fpath = osp.join(self.default_directory, '%s_%s_%s' % ( + self.default_directory, entity.eid, attr)) + while osp.exists(fspath): + fspath = '_' + fspath + return fspath + + def current_fs_path(self, entity, attr): + cu = entity._cw.system_sql('SELECT cw_%s.%s WHERE cw_eid=%s' % + (entity.__regid__, attr, entity.eid)) + return cu.fetchone()[0] + + +class AddFileOp(Operation): + def rollback_event(self): + try: + unlink(self.filepath) + except: + pass + +class DeleteFileOp(Operation): + def commit_event(self): + try: + unlink(self.filepath) + except: + pass + +class UpdateFileOp(Operation): + def precommit_event(self): + try: + file(self.filepath, 'w').write(self.filedata) + except: + pass diff -r 80b455066c9a -r f65743cc53e4 server/sqlutils.py --- a/server/sqlutils.py Fri Jan 22 08:40:38 2010 +0100 +++ b/server/sqlutils.py Fri Jan 22 08:49:16 2010 +0100 @@ -314,6 +314,29 @@ if hasattr(yams.constraints, 'patch_sqlite_decimal'): yams.constraints.patch_sqlite_decimal() + def fspath(eid, etype, attr): + try: + cu = cnx.cursor() + cu.execute('SELECT X.cw_%s FROM cw_%s as X ' + 'WHERE X.cw_eid=%%(eid)s' % (attr, etype), + {'eid': eid}) + return cu.fetchone()[0] + except: + import traceback + traceback.print_exc() + raise + cnx.create_function('fspath', 3, fspath) + + def _fsopen(fspath): + if fspath: + try: + return buffer(file(fspath).read()) + except: + import traceback + traceback.print_exc() + raise + cnx.create_function('_fsopen', 1, _fsopen) + sqlite_hooks = SQL_CONNECT_HOOKS.setdefault('sqlite', []) sqlite_hooks.append(init_sqlite_connexion)