[source storage] refactor source sql generation and results handling to allow repository side callbacks stable
authorSylvain Thénault <sylvain.thenault@logilab.fr>
Thu, 25 Mar 2010 13:59:47 +0100
branchstable
changeset 5013 ad91f93bbb93
parent 5012 9c4ea944ecf9
child 5014 96bb4e7e3348
[source storage] refactor source sql generation and results handling to allow repository side callbacks for instance with the BytesFileSystemStorage, before this change: * fspath, _fsopen function were stored procedures executed on the database -> files had to be available both on the repository *and* the database host * we needed implementation for each handled database Now, those function are python callbacks executed when necessary on the repository side, on data comming from the database. The litle cons are: * you can't do anymore restriction on mapped attributes * you can't write queries which will return in the same rset column some mapped attributes (or not mapped the same way) / some not This seems much acceptable since: * it's much more easy to handle when you start having the db on another host than the repo * BFSS works seemlessly on any backend now * you don't bother that much about the cons (at least in the bfss case): you usually don't do any restriction on Bytes... Bonus points: BFSS is more efficient (no queries under the cover as it was done in the registered procedure) and we have a much nicer/efficient fspath implementation. IMO, that rocks :D
cwconfig.py
misc/migration/3.7.2_Any.py
schemas/_regproc_bss.postgres.sql
server/sources/extlite.py
server/sources/native.py
server/sources/rql2sql.py
server/sources/storages.py
server/sqlutils.py
server/test/unittest_rql2sql.py
server/test/unittest_storage.py
--- a/cwconfig.py	Thu Mar 25 13:49:07 2010 +0100
+++ b/cwconfig.py	Thu Mar 25 13:59:47 2010 +0100
@@ -90,7 +90,8 @@
 from logilab.common.configuration import (Configuration, Method,
                                           ConfigurationMixIn, merge_options)
 
-from cubicweb import CW_SOFTWARE_ROOT, CW_MIGRATION_MAP, ConfigurationError
+from cubicweb import (CW_SOFTWARE_ROOT, CW_MIGRATION_MAP,
+                      ConfigurationError, Binary)
 from cubicweb.toolsutils import env_path, create_dir
 
 CONFIGURATIONS = []
@@ -1050,7 +1051,24 @@
 
 
     class FSPATH(FunctionDescr):
-        supported_backends = ('postgres', 'sqlite',)
-        rtype = 'Bytes'
+        """return path of some bytes attribute stored using the Bytes
+        File-System Storage (bfss)
+        """
+        rtype = 'Bytes' # XXX return a String? potential pb with fs encoding
+
+        def update_cb_stack(self, stack):
+            assert len(stack) == 1
+            stack[0] = self.source_execute
+
+        def as_sql(self, backend, args):
+            raise NotImplementedError('source only callback')
+
+        def source_execute(self, source, value):
+            fpath = source.binary_to_str(value)
+            try:
+                return Binary(fpath)
+            except OSError, ex:
+                self.critical("can't open %s: %s", fpath, ex)
+                return None
 
     register_function(FSPATH)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/misc/migration/3.7.2_Any.py	Thu Mar 25 13:59:47 2010 +0100
@@ -0,0 +1,2 @@
+sql('DROP FUNCTION IF EXISTS _fsopen')
+sql('DROP FUNCTION IF EXISTS fspath')
--- a/schemas/_regproc_bss.postgres.sql	Thu Mar 25 13:49:07 2010 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,38 +0,0 @@
-/* -*- sql -*-
-
-   postgres specific registered procedures for the Bytes File System storage,
-   require the plpythonu language installed
-
-*/
-
-
-CREATE OR REPLACE FUNCTION _fsopen(bytea) RETURNS bytea AS $$
-    fpath = args[0]
-    if fpath:
-        try:
-            data = file(fpath, 'rb').read()
-            #/* XXX due to plpython bug we have to replace some characters... */
-            return data.replace("\\", r"\134").replace("\000", r"\000").replace("'", r"\047") #'
-        except Exception, ex:
-            plpy.warning('failed to get content for %s: %s', fpath, ex)
-    return None
-$$ LANGUAGE plpythonu
-/* WITH(ISCACHABLE) XXX does postgres handle caching of large data nicely */
-;;
-
-/* fspath(eid, entity type, attribute) */
-CREATE OR REPLACE FUNCTION fspath(bigint, text, text) RETURNS bytea AS $$
-    pkey = 'plan%s%s' % (args[1], args[2])
-    try:
-        plan = SD[pkey]
-    except KeyError:
-        #/* then prepare and cache plan to get versioned file information from a
-        # version content eid */
-        plan = plpy.prepare(
-            'SELECT X.cw_%s FROM cw_%s as X WHERE X.cw_eid=$1' % (args[2], args[1]),
-            ['bigint'])
-        SD[pkey] = plan
-    return plpy.execute(plan, [args[0]])[0]['cw_' + args[2]]
-$$ LANGUAGE plpythonu
-/* WITH(ISCACHABLE) XXX does postgres handle caching of large data nicely */
-;;
--- a/server/sources/extlite.py	Thu Mar 25 13:49:07 2010 +0100
+++ b/server/sources/extlite.py	Thu Mar 25 13:59:47 2010 +0100
@@ -187,9 +187,10 @@
         if self._need_sql_create:
             return []
         assert dbg_st_search(self.uri, union, varmap, args, cachekey)
-        sql, query_args = self.rqlsqlgen.generate(union, args)
-        args = self.sqladapter.merge_args(args, query_args)
-        results = self.sqladapter.process_result(self.doexec(session, sql, args))
+        sql, qargs, cbs = self.rqlsqlgen.generate(union, args)
+        args = self.sqladapter.merge_args(args, qargs)
+        cursor = self.doexec(session, sql, args)
+        results = self.sqladapter.process_result(cursor, cbs)
         assert dbg_results(results)
         return results
 
--- a/server/sources/native.py	Thu Mar 25 13:49:07 2010 +0100
+++ b/server/sources/native.py	Thu Mar 25 13:59:47 2010 +0100
@@ -264,8 +264,9 @@
     def init(self):
         self.init_creating()
 
-    def map_attribute(self, etype, attr, cb):
-        self._rql_sqlgen.attr_map['%s.%s' % (etype, attr)] = cb
+    # XXX deprecates [un]map_attribute ?
+    def map_attribute(self, etype, attr, cb, sourcedb=True):
+        self._rql_sqlgen.attr_map['%s.%s' % (etype, attr)] = (cb, sourcedb)
 
     def unmap_attribute(self, etype, attr):
         self._rql_sqlgen.attr_map.pop('%s.%s' % (etype, attr), None)
@@ -273,7 +274,8 @@
     def set_storage(self, etype, attr, storage):
         storage_dict = self._storages.setdefault(etype, {})
         storage_dict[attr] = storage
-        self.map_attribute(etype, attr, storage.sqlgen_callback)
+        self.map_attribute(etype, attr,
+                           storage.callback, storage.is_source_callback)
 
     def unset_storage(self, etype, attr):
         self._storages[etype].pop(attr)
@@ -348,17 +350,17 @@
         if cachekey is None:
             self.no_cache += 1
             # generate sql query if we are able to do so (not supported types...)
-            sql, query_args = self._rql_sqlgen.generate(union, args, varmap)
+            sql, qargs, cbs = self._rql_sqlgen.generate(union, args, varmap)
         else:
             # sql may be cached
             try:
-                sql, query_args = self._cache[cachekey]
+                sql, qargs, cbs = self._cache[cachekey]
                 self.cache_hit += 1
             except KeyError:
                 self.cache_miss += 1
-                sql, query_args = self._rql_sqlgen.generate(union, args, varmap)
-                self._cache[cachekey] = sql, query_args
-        args = self.merge_args(args, query_args)
+                sql, qargs, cbs = self._rql_sqlgen.generate(union, args, varmap)
+                self._cache[cachekey] = sql, qargs, cbs
+        args = self.merge_args(args, qargs)
         assert isinstance(sql, basestring), repr(sql)
         try:
             cursor = self.doexec(session, sql, args)
@@ -367,7 +369,7 @@
             self.info("request failed '%s' ... retry with a new cursor", sql)
             session.pool.reconnect(self)
             cursor = self.doexec(session, sql, args)
-        results = self.process_result(cursor)
+        results = self.process_result(cursor, cbs)
         assert dbg_results(results)
         return results
 
@@ -381,9 +383,9 @@
             self.uri, union, varmap, args,
             prefix='ON THE FLY temp data insertion into %s from' % table)
         # generate sql queries if we are able to do so
-        sql, query_args = self._rql_sqlgen.generate(union, args, varmap)
+        sql, qargs, cbs = self._rql_sqlgen.generate(union, args, varmap)
         query = 'INSERT INTO %s %s' % (table, sql.encode(self._dbencoding))
-        self.doexec(session, query, self.merge_args(args, query_args))
+        self.doexec(session, query, self.merge_args(args, qargs))
 
     def manual_insert(self, results, table, session):
         """insert given result into a temporary table on the system source"""
--- a/server/sources/rql2sql.py	Thu Mar 25 13:49:07 2010 +0100
+++ b/server/sources/rql2sql.py	Thu Mar 25 13:59:47 2010 +0100
@@ -33,16 +33,30 @@
 
 import threading
 
+from logilab.database import FunctionDescr, SQL_FUNCTIONS_REGISTRY
+
 from rql import BadRQLQuery, CoercionError
 from rql.stmts import Union, Select
 from rql.nodes import (SortTerm, VariableRef, Constant, Function, Not,
                        Variable, ColumnAlias, Relation, SubQuery, Exists)
 
+from cubicweb import QueryError
 from cubicweb.server.sqlutils import SQL_PREFIX
 from cubicweb.server.utils import cleanup_solutions
 
 ColumnAlias._q_invariant = False # avoid to check for ColumnAlias / Variable
 
+FunctionDescr.source_execute = None
+
+def default_update_cb_stack(self, stack):
+    stack.append(self.source_execute)
+FunctionDescr.update_cb_stack = default_update_cb_stack
+
+LENGTH = SQL_FUNCTIONS_REGISTRY.get_function('LENGTH')
+def length_source_execute(source, value):
+    return len(value.getvalue())
+LENGTH.source_execute = length_source_execute
+
 def _new_var(select, varname):
     newvar = select.get_variable(varname)
     if not 'relations' in newvar.stinfo:
@@ -252,14 +266,44 @@
                         selectedidx.append(vref.name)
                         rqlst.selection.append(vref)
 
-# IGenerator implementation for RQL->SQL ######################################
+def iter_mapped_var_sels(stmt, variable):
+    # variable is a Variable or ColumnAlias node mapped to a source side
+    # callback
+    if not (len(variable.stinfo['rhsrelations']) <= 1 and # < 1 on column alias
+            variable.stinfo['selected']):
+        raise QueryError("can't use %s as a restriction variable"
+                         % variable.name)
+    for selectidx in variable.stinfo['selected']:
+        vrefs = stmt.selection[selectidx].get_nodes(VariableRef)
+        if len(vrefs) != 1:
+            raise QueryError()
+        yield selectidx, vrefs[0]
 
+def update_source_cb_stack(state, stmt, node, stack):
+    while True:
+        node = node.parent
+        if node is stmt:
+            break
+        if not isinstance(node, Function):
+            raise QueryError()
+        func = SQL_FUNCTIONS_REGISTRY.get_function(node.name)
+        if func.source_execute is None:
+            raise QueryError('%s can not be called on mapped attribute'
+                             % node.name)
+        state.source_cb_funcs.add(node)
+        func.update_cb_stack(stack)
+
+
+# IGenerator implementation for RQL->SQL #######################################
 
 class StateInfo(object):
     def __init__(self, existssols, unstablevars):
         self.existssols = existssols
         self.unstablevars = unstablevars
         self.subtables = {}
+        self.needs_source_cb = None
+        self.subquery_source_cb = None
+        self.source_cb_funcs = set()
 
     def reset(self, solution):
         """reset some visit variables"""
@@ -276,6 +320,17 @@
         self.restrictions = []
         self._restr_stack = []
         self.ignore_varmap = False
+        self._needs_source_cb = {}
+
+    def merge_source_cbs(self, needs_source_cb):
+        if self.needs_source_cb is None:
+            self.needs_source_cb = needs_source_cb
+        elif needs_source_cb != self.needs_source_cb:
+            raise QueryError('query fetch some source mapped attribute, some not')
+
+    def finalize_source_cbs(self):
+        if self.subquery_source_cb is not None:
+            self.needs_source_cb.update(self.subquery_source_cb)
 
     def add_restriction(self, restr):
         if restr:
@@ -373,7 +428,7 @@
             # union query for each rqlst / solution
             sql = self.union_sql(union)
             # we are done
-            return sql, self._query_attrs
+            return sql, self._query_attrs, self._state.needs_source_cb
         finally:
             self._lock.release()
 
@@ -436,6 +491,9 @@
         else:
             existssols, unstable = {}, ()
         state = StateInfo(existssols, unstable)
+        if self._state is not None:
+            # state from a previous unioned select
+            state.merge_source_cbs(self._state.needs_source_cb)
         # treat subqueries
         self._subqueries_sql(select, state)
         # generate sql for this select node
@@ -491,6 +549,7 @@
                 if fneedwrap:
                     selection = ['T1.C%s' % i for i in xrange(len(origselection))]
                     sql = 'SELECT %s FROM (%s) AS T1' % (','.join(selection), sql)
+            state.finalize_source_cbs()
         finally:
             select.selection = origselection
         # limit / offset
@@ -508,10 +567,21 @@
             tablealias = '_T%s' % i # XXX nested subqueries
             sql = '(%s) AS %s' % (sql, tablealias)
             state.subtables[tablealias] = (0, sql)
+            latest_state = self._state
             for vref in subquery.aliases:
                 alias = vref.variable
                 alias._q_sqltable = tablealias
                 alias._q_sql = '%s.C%s' % (tablealias, alias.colnum)
+                try:
+                    stack = latest_state.needs_source_cb[alias.colnum]
+                    if state.subquery_source_cb is None:
+                        state.subquery_source_cb = {}
+                    for selectidx, vref in iter_mapped_var_sels(select, alias):
+                        stack = stack[:]
+                        update_source_cb_stack(state, select, vref, stack)
+                        state.subquery_source_cb[selectidx] = stack
+                except KeyError:
+                    continue
 
     def _solutions_sql(self, select, solutions, distinct, needalias):
         sqls = []
@@ -523,6 +593,7 @@
             sql = [self._selection_sql(select.selection, distinct, needalias)]
             if self._state.restrictions:
                 sql.append('WHERE %s' % ' AND '.join(self._state.restrictions))
+            self._state.merge_source_cbs(self._state._needs_source_cb)
             # add required tables
             assert len(self._state.actual_tables) == 1, self._state.actual_tables
             tables = self._state.actual_tables[-1]
@@ -895,7 +966,13 @@
             except KeyError:
                 mapkey = '%s.%s' % (self._state.solution[lhs.name], rel.r_type)
                 if mapkey in self.attr_map:
-                    lhssql = self.attr_map[mapkey](self, lhs.variable, rel)
+                    cb, sourcecb = self.attr_map[mapkey]
+                    if sourcecb:
+                        # callback is a source callback, we can't use this
+                        # attribute in restriction
+                        raise QueryError("can't use %s (%s) in restriction"
+                                         % (mapkey, rel.as_string()))
+                    lhssql = cb(self, lhs.variable, rel)
                 elif rel.r_type == 'eid':
                     lhssql = lhs.variable._q_sql
                 else:
@@ -987,9 +1064,13 @@
 
     def visit_function(self, func):
         """generate SQL name for a function"""
-        # func_sql_call will check function is supported by the backend
-        return self.dbms_helper.func_as_sql(func.name,
-                                            [c.accept(self) for c in func.children])
+        args = [c.accept(self) for c in func.children]
+        if func in self._state.source_cb_funcs:
+            # function executed as a callback on the source
+            assert len(args) == 1
+            return args[0]
+        # func_as_sql will check function is supported by the backend
+        return self.dbhelper.func_as_sql(func.name, args)
 
     def visit_constant(self, constant):
         """generate SQL name for a constant"""
@@ -1156,12 +1237,20 @@
         if isinstance(linkedvar, ColumnAlias):
             raise BadRQLQuery('variable %s should be selected by the subquery'
                               % variable.name)
-        mapkey = '%s.%s' % (self._state.solution[linkedvar.name], rel.r_type)
-        if mapkey in self.attr_map:
-            return self.attr_map[mapkey](self, linkedvar, rel)
         try:
             sql = self._varmap['%s.%s' % (linkedvar.name, rel.r_type)]
         except KeyError:
+            mapkey = '%s.%s' % (self._state.solution[linkedvar.name], rel.r_type)
+            if mapkey in self.attr_map:
+                cb, sourcecb = self.attr_map[mapkey]
+                if not sourcecb:
+                    return cb(self, linkedvar, rel)
+                # attribute mapped at the source level (bfss for instance)
+                stmt = rel.stmt
+                for selectidx, vref in iter_mapped_var_sels(stmt, variable):
+                    stack = [cb]
+                    update_source_cb_stack(self._state, stmt, vref, stack)
+                    self._state._needs_source_cb[selectidx] = stack
             linkedvar.accept(self)
             sql = '%s.%s%s' % (linkedvar._q_sqltable, SQL_PREFIX, rel.r_type)
         return sql
--- a/server/sources/storages.py	Thu Mar 25 13:49:07 2010 +0100
+++ b/server/sources/storages.py	Thu Mar 25 13:59:47 2010 +0100
@@ -11,10 +11,31 @@
     repo.system_source.unset_storage(etype, attr)
 
 class Storage(object):
-    """abstract storage"""
-    def sqlgen_callback(self, generator, relation, linkedvar):
-        """sql generator callback when some attribute with a custom storage is
-        accessed
+    """abstract storage
+
+    * If `source_callback` is true (by default), the callback will be run during
+      query result process of fetched attribute's valu and should have the
+      following prototype::
+
+        callback(self, source, value)
+
+      where `value` is the value actually stored in the backend. None values
+      will be skipped (eg callback won't be called).
+
+    * if `source_callback` is false, the callback will be run during sql
+      generation when some attribute with a custom storage is accessed and
+      should have the following prototype::
+
+        callback(self, generator, relation, linkedvar)
+
+      where `generator` is the sql generator, `relation` the current rql syntax
+      tree relation and linkedvar the principal syntax tree variable holding the
+      attribute.
+    """
+    is_source_callback = True
+
+    def callback(self, *args):
+        """see docstring for prototype, which vary according to is_source_callback
         """
         raise NotImplementedError()
 
@@ -38,14 +59,16 @@
     def __init__(self, defaultdir):
         self.default_directory = defaultdir
 
-    def sqlgen_callback(self, generator, linkedvar, relation):
+    def callback(self, source, value):
         """sql generator callback when some attribute with a custom storage is
         accessed
         """
-        linkedvar.accept(generator)
-        return '_fsopen(%s.cw_%s)' % (
-            linkedvar._q_sql.split('.', 1)[0], # table name
-            relation.r_type) # attribute name
+        fpath = source.binary_to_str(value)
+        try:
+            return Binary(file(fpath).read())
+        except OSError, ex:
+            source.critical("can't open %s: %s", value, ex)
+            return None
 
     def entity_added(self, entity, attr):
         """an entity using this storage for attr has been added"""
--- a/server/sqlutils.py	Thu Mar 25 13:49:07 2010 +0100
+++ b/server/sqlutils.py	Thu Mar 25 13:59:47 2010 +0100
@@ -188,9 +188,17 @@
             return newargs
         return query_args
 
-    def process_result(self, cursor):
+    def process_result(self, cursor, column_callbacks=None):
         """return a list of CubicWeb compliant values from data in the given cursor
         """
+        # use two different implementations to avoid paying the price of
+        # callback lookup for each *cell* in results when there is nothing to
+        # lookup
+        if not column_callbacks:
+            return self._process_result(cursor)
+        return self._cb_process_result(cursor, column_callbacks)
+
+    def _process_result(self, cursor, column_callbacks=None):
         # begin bind to locals for optimization
         descr = cursor.description
         encoding = self._dbencoding
@@ -208,6 +216,30 @@
             results[i] = result
         return results
 
+    def _cb_process_result(self, cursor, column_callbacks):
+        # begin bind to locals for optimization
+        descr = cursor.description
+        encoding = self._dbencoding
+        process_value = self._process_value
+        binary = Binary
+        # /end
+        results = cursor.fetchall()
+        for i, line in enumerate(results):
+            result = []
+            for col, value in enumerate(line):
+                if value is None:
+                    result.append(value)
+                    continue
+                cbstack = column_callbacks.get(col, None)
+                if cbstack is None:
+                    value = process_value(value, descr[col], encoding, binary)
+                else:
+                    for cb in cbstack:
+                        value = cb(self, value)
+                result.append(value)
+            results[i] = result
+        return results
+
     def preprocess_entity(self, entity):
         """return a dictionary to use as extra argument to cursor.execute
         to insert/update an entity into a SQL database
@@ -277,28 +309,5 @@
     import yams.constraints
     yams.constraints.patch_sqlite_decimal()
 
-    def fspath(eid, etype, attr):
-        try:
-            cu = cnx.cursor()
-            cu.execute('SELECT X.cw_%s FROM cw_%s as X '
-                       'WHERE X.cw_eid=%%(eid)s' % (attr, etype),
-                       {'eid': eid})
-            return cu.fetchone()[0]
-        except:
-            import traceback
-            traceback.print_exc()
-            raise
-    cnx.create_function('fspath', 3, fspath)
-
-    def _fsopen(fspath):
-        if fspath:
-            try:
-                return buffer(file(fspath).read())
-            except:
-                import traceback
-                traceback.print_exc()
-                raise
-    cnx.create_function('_fsopen', 1, _fsopen)
-
 sqlite_hooks = SQL_CONNECT_HOOKS.setdefault('sqlite', [])
 sqlite_hooks.append(init_sqlite_connexion)
--- a/server/test/unittest_rql2sql.py	Thu Mar 25 13:49:07 2010 +0100
+++ b/server/test/unittest_rql2sql.py	Thu Mar 25 13:59:47 2010 +0100
@@ -1113,8 +1113,8 @@
             args = {'text': 'hip hop momo'}
         try:
             union = self._prepare(rql)
-            r, nargs = self.o.generate(union, args,
-                                      varmap=varmap)
+            r, nargs, cbs = self.o.generate(union, args,
+                                            varmap=varmap)
             args.update(nargs)
             self.assertLinesEquals((r % args).strip(), self._norm_sql(sql), striplines=True)
         except Exception, ex:
@@ -1135,7 +1135,7 @@
     def _checkall(self, rql, sql):
         try:
             rqlst = self._prepare(rql)
-            r, args = self.o.generate(rqlst)
+            r, args, cbs = self.o.generate(rqlst)
             self.assertEqual((r.strip(), args), sql)
         except Exception, ex:
             print rql
@@ -1197,7 +1197,7 @@
 
     def test_is_null_transform(self):
         union = self._prepare('Any X WHERE X login %(login)s')
-        r, args = self.o.generate(union, {'login': None})
+        r, args, cbs = self.o.generate(union, {'login': None})
         self.assertLinesEquals((r % args).strip(),
                                '''SELECT _X.cw_eid
 FROM cw_CWUser AS _X
@@ -1386,11 +1386,11 @@
                     '''SELECT COUNT(1)
 WHERE EXISTS(SELECT 1 FROM owned_by_relation AS rel_owned_by0, cw_Affaire AS _P WHERE rel_owned_by0.eid_from=_P.cw_eid AND rel_owned_by0.eid_to=1 UNION SELECT 1 FROM owned_by_relation AS rel_owned_by1, cw_Note AS _P WHERE rel_owned_by1.eid_from=_P.cw_eid AND rel_owned_by1.eid_to=1)''')
 
-    def test_attr_map(self):
+    def test_attr_map_sqlcb(self):
         def generate_ref(gen, linkedvar, rel):
             linkedvar.accept(gen)
             return 'VERSION_DATA(%s)' % linkedvar._q_sql
-        self.o.attr_map['Affaire.ref'] = generate_ref
+        self.o.attr_map['Affaire.ref'] = (generate_ref, False)
         try:
             self._check('Any R WHERE X ref R',
                         '''SELECT VERSION_DATA(_X.cw_eid)
@@ -1402,6 +1402,17 @@
         finally:
             self.o.attr_map.clear()
 
+    def test_attr_map_sourcecb(self):
+        cb = lambda x,y: None
+        self.o.attr_map['Affaire.ref'] = (cb, True)
+        try:
+            union = self._prepare('Any R WHERE X ref R')
+            r, nargs, cbs = self.o.generate(union, args={})
+            self.assertLinesEquals(r.strip(), 'SELECT _X.cw_ref\nFROM cw_Affaire AS _X')
+            self.assertEquals(cbs, {0: [cb]})
+        finally:
+            self.o.attr_map.clear()
+
 
 class SqliteSQLGeneratorTC(PostgresSQLGeneratorTC):
 
--- a/server/test/unittest_storage.py	Thu Mar 25 13:49:07 2010 +0100
+++ b/server/test/unittest_storage.py	Thu Mar 25 13:59:47 2010 +0100
@@ -13,7 +13,7 @@
 import shutil
 import tempfile
 
-from cubicweb import Binary
+from cubicweb import Binary, QueryError
 from cubicweb.selectors import implements
 from cubicweb.server.sources import storages
 from cubicweb.server.hook import Hook, Operation
@@ -80,7 +80,7 @@
     def test_bfss_sqlite_fspath(self):
         f1 = self.create_file()
         expected_filepath = osp.join(self.tempdir, '%s_data' % f1.eid)
-        fspath = self.execute('Any fspath(F, "File", "data") WHERE F eid %(f)s',
+        fspath = self.execute('Any fspath(D) WHERE F eid %(f)s, F data D',
                               {'f': f1.eid})[0][0]
         self.assertEquals(fspath.getvalue(), expected_filepath)
 
@@ -88,7 +88,7 @@
         self.session.transaction_data['fs_importing'] = True
         f1 = self.session.create_entity('File', data=Binary('/the/path'),
                                         data_format=u'text/plain', data_name=u'foo')
-        fspath = self.execute('Any fspath(F, "File", "data") WHERE F eid %(f)s',
+        fspath = self.execute('Any fspath(D) WHERE F eid %(f)s, F data D',
                               {'f': f1.eid})[0][0]
         self.assertEquals(fspath.getvalue(), '/the/path')
 
@@ -102,5 +102,63 @@
             self.vreg.unregister(DummyBeforeHook)
             self.vreg.unregister(DummyAfterHook)
 
+    def test_source_mapped_attribute_error_cases(self):
+        ex = self.assertRaises(QueryError, self.execute,
+                               'Any X WHERE X data ~= "hop", X is File')
+        self.assertEquals(str(ex), 'can\'t use File.data (X data ILIKE "hop") in restriction')
+        ex = self.assertRaises(QueryError, self.execute,
+                               'Any X, Y WHERE X data D, Y data D, '
+                               'NOT X identity Y, X is File, Y is File')
+        self.assertEquals(str(ex), "can't use D as a restriction variable")
+        # query returning mix of mapped / regular attributes (only file.data
+        # mapped, not image.data for instance)
+        ex = self.assertRaises(QueryError, self.execute,
+                               'Any X WITH X BEING ('
+                               ' (Any NULL)'
+                               '  UNION '
+                               ' (Any D WHERE X data D, X is File)'
+                               ')')
+        self.assertEquals(str(ex), 'query fetch some source mapped attribute, some not')
+        ex = self.assertRaises(QueryError, self.execute,
+                               '(Any D WHERE X data D, X is File)'
+                               ' UNION '
+                               '(Any D WHERE X data D, X is Image)')
+        self.assertEquals(str(ex), 'query fetch some source mapped attribute, some not')
+        ex = self.assertRaises(QueryError,
+                               self.execute, 'Any D WHERE X data D')
+        self.assertEquals(str(ex), 'query fetch some source mapped attribute, some not')
+
+    def test_source_mapped_attribute_advanced(self):
+        f1 = self.create_file()
+        rset = self.execute('Any X,D WITH D,X BEING ('
+                            ' (Any D, X WHERE X eid %(x)s, X data D)'
+                            '  UNION '
+                            ' (Any D, X WHERE X eid %(x)s, X data D)'
+                            ')', {'x': f1.eid}, 'x')
+        self.assertEquals(len(rset), 2)
+        self.assertEquals(rset[0][0], f1.eid)
+        self.assertEquals(rset[1][0], f1.eid)
+        self.assertEquals(rset[0][1].getvalue(), 'the-data')
+        self.assertEquals(rset[1][1].getvalue(), 'the-data')
+        rset = self.execute('Any X,LENGTH(D) WHERE X eid %(x)s, X data D',
+                            {'x': f1.eid}, 'x')
+        self.assertEquals(len(rset), 1)
+        self.assertEquals(rset[0][0], f1.eid)
+        self.assertEquals(rset[0][1], len('the-data'))
+        rset = self.execute('Any X,LENGTH(D) WITH D,X BEING ('
+                            ' (Any D, X WHERE X eid %(x)s, X data D)'
+                            '  UNION '
+                            ' (Any D, X WHERE X eid %(x)s, X data D)'
+                            ')', {'x': f1.eid}, 'x')
+        self.assertEquals(len(rset), 2)
+        self.assertEquals(rset[0][0], f1.eid)
+        self.assertEquals(rset[1][0], f1.eid)
+        self.assertEquals(rset[0][1], len('the-data'))
+        self.assertEquals(rset[1][1], len('the-data'))
+        ex = self.assertRaises(QueryError, self.execute,
+                               'Any X,UPPER(D) WHERE X eid %(x)s, X data D',
+                               {'x': f1.eid}, 'x')
+        self.assertEquals(str(ex), 'UPPER can not be called on mapped attribute')
+
 if __name__ == '__main__':
     unittest_main()