[source storage] refactor source sql generation and result handling to allow repository-side callbacks
For instance with the BytesFileSystemStorage, before this change:
* the fspath and _fsopen functions were stored procedures executed on the database
  -> files had to be available both on the repository *and* the database host
* we needed an implementation for each supported database
Now, those functions are python callbacks executed when necessary on the
repository side, on data coming from the database (see the sketch below).
The little cons are:
* you can no longer express restrictions on mapped attributes
* you can't write queries which return, in the same rset column, some
  attributes that are mapped (or not mapped the same way) together with
  some that are not (see the examples below)
This seems quite acceptable since:
* it's much easier to handle when the database lives on another host than
  the repository
* BFSS now works seamlessly on any backend
* the cons don't hurt much (at least in the bfss case): you usually don't
  express restrictions on Bytes attributes...
Bonus points: BFSS is more efficient (no more queries under the cover, as
the old registered procedure did) and we get a much nicer/more efficient
fspath implementation, now called as fspath(D) on the attribute's variable
instead of fspath(F, "File", "data").
IMO, that rocks :D
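
To make the new contract concrete, here is a minimal standalone sketch of a
repository-side callback and of how the per-column callback stacks returned
by the sql generator are applied to fetched rows. All names in it
(DummySource, FileSystemLikeStorage, process_rows) are illustrative
stand-ins, not part of the patch; the real code is in
server/sources/storages.py and server/sqlutils.py below.

# illustrative sketch only: mimics the callback protocol introduced by
# this patch, with hypothetical names
class DummySource(object):
    """stand-in for a repository source, providing the helper callbacks use"""
    def binary_to_str(self, value):
        # real sources decode Binary / buffer objects here
        return value

class FileSystemLikeStorage(object):
    """storage whose callback runs on the repository side, on fetched data"""
    is_source_callback = True

    def callback(self, source, value):
        # `value` is what the database actually stored: the file path
        fpath = source.binary_to_str(value)
        return open(fpath, 'rb').read()

def process_rows(source, rows, column_callbacks):
    """apply {column index: [callback, ...]} stacks to mutable result rows,
    skipping None values, as _cb_process_result() does below"""
    for row in rows:
        for col, cbstack in column_callbacks.items():
            if row[col] is not None:
                for cb in cbstack:
                    row[col] = cb(source, row[col])
    return rows

# usage: rows = [list(r) for r in cursor.fetchall()]
#        process_rows(source, rows, {0: [storage.callback]})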
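
And the user-visible behaviour, taken from the updated unittest_storage.py
tests below; `execute` stands for a repository session's execute and `feid`
for an existing File eid (both assumed):

execute('Any D WHERE X eid %(x)s, X data D', {'x': feid})
# -> ok, the storage callback returns the file content as a Binary
execute('Any fspath(D) WHERE X eid %(x)s, X data D', {'x': feid})
# -> ok, returns the file path (new call form replacing fspath(F, "File", "data"))
execute('Any X WHERE X data ~= "hop", X is File')
# -> QueryError: can't use File.data (X data ILIKE "hop") in restriction
execute('Any D WHERE X data D')
# -> QueryError: query fetch some source mapped attribute, some not
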
--- a/cwconfig.py Thu Mar 25 13:49:07 2010 +0100
+++ b/cwconfig.py Thu Mar 25 13:59:47 2010 +0100
@@ -90,7 +90,8 @@
from logilab.common.configuration import (Configuration, Method,
ConfigurationMixIn, merge_options)
-from cubicweb import CW_SOFTWARE_ROOT, CW_MIGRATION_MAP, ConfigurationError
+from cubicweb import (CW_SOFTWARE_ROOT, CW_MIGRATION_MAP,
+ ConfigurationError, Binary)
from cubicweb.toolsutils import env_path, create_dir
CONFIGURATIONS = []
@@ -1050,7 +1051,24 @@
class FSPATH(FunctionDescr):
- supported_backends = ('postgres', 'sqlite',)
- rtype = 'Bytes'
+ """return path of some bytes attribute stored using the Bytes
+ File-System Storage (bfss)
+ """
+ rtype = 'Bytes' # XXX return a String? potential problem with fs encoding
+
+ def update_cb_stack(self, stack):
+ assert len(stack) == 1
+ stack[0] = self.source_execute
+
+ def as_sql(self, backend, args):
+ raise NotImplementedError('source only callback')
+
+ def source_execute(self, source, value):
+ fpath = source.binary_to_str(value)
+ try:
+ return Binary(fpath)
+ except OSError, ex:
+ self.critical("can't open %s: %s", fpath, ex)
+ return None
register_function(FSPATH)
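
Note how update_cb_stack above replaces the single storage callback instead
of appending to it: for fspath(D) on a bfss-mapped attribute the file is
never opened, only the stored path gets decoded. A hedged sketch of the
stack evolution (bfss_read and fspath_source_execute are illustrative
names):

# how the per-column callback stack evolves at sql generation time for
# 'Any fspath(D) WHERE X data D' on a bfss-mapped attribute (illustration)
def bfss_read(source, value):
    # the storage callback: would read the file content
    return open(value, 'rb').read()

def fspath_source_execute(source, value):
    # FSPATH's callback: only returns the stored path
    return value

stack = [bfss_read]                # initial stack for the mapped column
# FSPATH.update_cb_stack replaces rather than appends:
assert len(stack) == 1
stack[0] = fspath_source_execute   # the file content is never read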
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/misc/migration/3.7.2_Any.py Thu Mar 25 13:59:47 2010 +0100
@@ -0,0 +1,2 @@
+sql('DROP FUNCTION IF EXISTS _fsopen')
+sql('DROP FUNCTION IF EXISTS fspath')
--- a/schemas/_regproc_bss.postgres.sql Thu Mar 25 13:49:07 2010 +0100
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,38 +0,0 @@
-/* -*- sql -*-
-
- postgres specific registered procedures for the Bytes File System storage,
- require the plpythonu language installed
-
-*/
-
-
-CREATE OR REPLACE FUNCTION _fsopen(bytea) RETURNS bytea AS $$
- fpath = args[0]
- if fpath:
- try:
- data = file(fpath, 'rb').read()
- #/* XXX due to plpython bug we have to replace some characters... */
- return data.replace("\\", r"\134").replace("\000", r"\000").replace("'", r"\047") #'
- except Exception, ex:
- plpy.warning('failed to get content for %s: %s', fpath, ex)
- return None
-$$ LANGUAGE plpythonu
-/* WITH(ISCACHABLE) XXX does postgres handle caching of large data nicely */
-;;
-
-/* fspath(eid, entity type, attribute) */
-CREATE OR REPLACE FUNCTION fspath(bigint, text, text) RETURNS bytea AS $$
- pkey = 'plan%s%s' % (args[1], args[2])
- try:
- plan = SD[pkey]
- except KeyError:
- #/* then prepare and cache plan to get versioned file information from a
- # version content eid */
- plan = plpy.prepare(
- 'SELECT X.cw_%s FROM cw_%s as X WHERE X.cw_eid=$1' % (args[2], args[1]),
- ['bigint'])
- SD[pkey] = plan
- return plpy.execute(plan, [args[0]])[0]['cw_' + args[2]]
-$$ LANGUAGE plpythonu
-/* WITH(ISCACHABLE) XXX does postgres handle caching of large data nicely */
-;;
--- a/server/sources/extlite.py Thu Mar 25 13:49:07 2010 +0100
+++ b/server/sources/extlite.py Thu Mar 25 13:59:47 2010 +0100
@@ -187,9 +187,10 @@
if self._need_sql_create:
return []
assert dbg_st_search(self.uri, union, varmap, args, cachekey)
- sql, query_args = self.rqlsqlgen.generate(union, args)
- args = self.sqladapter.merge_args(args, query_args)
- results = self.sqladapter.process_result(self.doexec(session, sql, args))
+ sql, qargs, cbs = self.rqlsqlgen.generate(union, args)
+ args = self.sqladapter.merge_args(args, qargs)
+ cursor = self.doexec(session, sql, args)
+ results = self.sqladapter.process_result(cursor, cbs)
assert dbg_results(results)
return results
--- a/server/sources/native.py Thu Mar 25 13:49:07 2010 +0100
+++ b/server/sources/native.py Thu Mar 25 13:59:47 2010 +0100
@@ -264,8 +264,9 @@
def init(self):
self.init_creating()
- def map_attribute(self, etype, attr, cb):
- self._rql_sqlgen.attr_map['%s.%s' % (etype, attr)] = cb
+ # XXX deprecates [un]map_attribute ?
+ def map_attribute(self, etype, attr, cb, sourcedb=True):
+ self._rql_sqlgen.attr_map['%s.%s' % (etype, attr)] = (cb, sourcedb)
def unmap_attribute(self, etype, attr):
self._rql_sqlgen.attr_map.pop('%s.%s' % (etype, attr), None)
@@ -273,7 +274,8 @@
def set_storage(self, etype, attr, storage):
storage_dict = self._storages.setdefault(etype, {})
storage_dict[attr] = storage
- self.map_attribute(etype, attr, storage.sqlgen_callback)
+ self.map_attribute(etype, attr,
+ storage.callback, storage.is_source_callback)
def unset_storage(self, etype, attr):
self._storages[etype].pop(attr)
@@ -348,17 +350,17 @@
if cachekey is None:
self.no_cache += 1
# generate sql query if we are able to do so (not supported types...)
- sql, query_args = self._rql_sqlgen.generate(union, args, varmap)
+ sql, qargs, cbs = self._rql_sqlgen.generate(union, args, varmap)
else:
# sql may be cached
try:
- sql, query_args = self._cache[cachekey]
+ sql, qargs, cbs = self._cache[cachekey]
self.cache_hit += 1
except KeyError:
self.cache_miss += 1
- sql, query_args = self._rql_sqlgen.generate(union, args, varmap)
- self._cache[cachekey] = sql, query_args
- args = self.merge_args(args, query_args)
+ sql, qargs, cbs = self._rql_sqlgen.generate(union, args, varmap)
+ self._cache[cachekey] = sql, qargs, cbs
+ args = self.merge_args(args, qargs)
assert isinstance(sql, basestring), repr(sql)
try:
cursor = self.doexec(session, sql, args)
@@ -367,7 +369,7 @@
self.info("request failed '%s' ... retry with a new cursor", sql)
session.pool.reconnect(self)
cursor = self.doexec(session, sql, args)
- results = self.process_result(cursor)
+ results = self.process_result(cursor, cbs)
assert dbg_results(results)
return results
@@ -381,9 +383,9 @@
self.uri, union, varmap, args,
prefix='ON THE FLY temp data insertion into %s from' % table)
# generate sql queries if we are able to do so
- sql, query_args = self._rql_sqlgen.generate(union, args, varmap)
+ sql, qargs, cbs = self._rql_sqlgen.generate(union, args, varmap)
query = 'INSERT INTO %s %s' % (table, sql.encode(self._dbencoding))
- self.doexec(session, query, self.merge_args(args, query_args))
+ self.doexec(session, query, self.merge_args(args, qargs))
def manual_insert(self, results, table, session):
"""insert given result into a temporary table on the system source"""
--- a/server/sources/rql2sql.py Thu Mar 25 13:49:07 2010 +0100
+++ b/server/sources/rql2sql.py Thu Mar 25 13:59:47 2010 +0100
@@ -33,16 +33,30 @@
import threading
+from logilab.database import FunctionDescr, SQL_FUNCTIONS_REGISTRY
+
from rql import BadRQLQuery, CoercionError
from rql.stmts import Union, Select
from rql.nodes import (SortTerm, VariableRef, Constant, Function, Not,
Variable, ColumnAlias, Relation, SubQuery, Exists)
+from cubicweb import QueryError
from cubicweb.server.sqlutils import SQL_PREFIX
from cubicweb.server.utils import cleanup_solutions
ColumnAlias._q_invariant = False # avoid to check for ColumnAlias / Variable
+FunctionDescr.source_execute = None
+
+def default_update_cb_stack(self, stack):
+ stack.append(self.source_execute)
+FunctionDescr.update_cb_stack = default_update_cb_stack
+
+LENGTH = SQL_FUNCTIONS_REGISTRY.get_function('LENGTH')
+def length_source_execute(source, value):
+ return len(value.getvalue())
+LENGTH.source_execute = length_source_execute
+
def _new_var(select, varname):
newvar = select.get_variable(varname)
if not 'relations' in newvar.stinfo:
@@ -252,14 +266,44 @@
selectedidx.append(vref.name)
rqlst.selection.append(vref)
-# IGenerator implementation for RQL->SQL ######################################
+def iter_mapped_var_sels(stmt, variable):
+ # variable is a Variable or ColumnAlias node mapped to a source side
+ # callback
+ if not (len(variable.stinfo['rhsrelations']) <= 1 and # < 1 on column alias
+ variable.stinfo['selected']):
+ raise QueryError("can't use %s as a restriction variable"
+ % variable.name)
+ for selectidx in variable.stinfo['selected']:
+ vrefs = stmt.selection[selectidx].get_nodes(VariableRef)
+ if len(vrefs) != 1:
+ raise QueryError()
+ yield selectidx, vrefs[0]
+def update_source_cb_stack(state, stmt, node, stack):
+ while True:
+ node = node.parent
+ if node is stmt:
+ break
+ if not isinstance(node, Function):
+ raise QueryError()
+ func = SQL_FUNCTIONS_REGISTRY.get_function(node.name)
+ if func.source_execute is None:
+ raise QueryError('%s can not be called on mapped attribute'
+ % node.name)
+ state.source_cb_funcs.add(node)
+ func.update_cb_stack(stack)
+
+
+# IGenerator implementation for RQL->SQL #######################################
class StateInfo(object):
def __init__(self, existssols, unstablevars):
self.existssols = existssols
self.unstablevars = unstablevars
self.subtables = {}
+ self.needs_source_cb = None
+ self.subquery_source_cb = None
+ self.source_cb_funcs = set()
def reset(self, solution):
"""reset some visit variables"""
@@ -276,6 +320,17 @@
self.restrictions = []
self._restr_stack = []
self.ignore_varmap = False
+ self._needs_source_cb = {}
+
+ def merge_source_cbs(self, needs_source_cb):
+ if self.needs_source_cb is None:
+ self.needs_source_cb = needs_source_cb
+ elif needs_source_cb != self.needs_source_cb:
+ raise QueryError('query fetch some source mapped attribute, some not')
+
+ def finalize_source_cbs(self):
+ if self.subquery_source_cb is not None:
+ self.needs_source_cb.update(self.subquery_source_cb)
def add_restriction(self, restr):
if restr:
@@ -373,7 +428,7 @@
# union query for each rqlst / solution
sql = self.union_sql(union)
# we are done
- return sql, self._query_attrs
+ return sql, self._query_attrs, self._state.needs_source_cb
finally:
self._lock.release()
@@ -436,6 +491,9 @@
else:
existssols, unstable = {}, ()
state = StateInfo(existssols, unstable)
+ if self._state is not None:
+ # state from a previous unioned select
+ state.merge_source_cbs(self._state.needs_source_cb)
# treat subqueries
self._subqueries_sql(select, state)
# generate sql for this select node
@@ -491,6 +549,7 @@
if fneedwrap:
selection = ['T1.C%s' % i for i in xrange(len(origselection))]
sql = 'SELECT %s FROM (%s) AS T1' % (','.join(selection), sql)
+ state.finalize_source_cbs()
finally:
select.selection = origselection
# limit / offset
@@ -508,10 +567,21 @@
tablealias = '_T%s' % i # XXX nested subqueries
sql = '(%s) AS %s' % (sql, tablealias)
state.subtables[tablealias] = (0, sql)
+ latest_state = self._state
for vref in subquery.aliases:
alias = vref.variable
alias._q_sqltable = tablealias
alias._q_sql = '%s.C%s' % (tablealias, alias.colnum)
+ try:
+ stack = latest_state.needs_source_cb[alias.colnum]
+ if state.subquery_source_cb is None:
+ state.subquery_source_cb = {}
+ for selectidx, vref in iter_mapped_var_sels(select, alias):
+ stack = stack[:]
+ update_source_cb_stack(state, select, vref, stack)
+ state.subquery_source_cb[selectidx] = stack
+ except KeyError:
+ continue
def _solutions_sql(self, select, solutions, distinct, needalias):
sqls = []
@@ -523,6 +593,7 @@
sql = [self._selection_sql(select.selection, distinct, needalias)]
if self._state.restrictions:
sql.append('WHERE %s' % ' AND '.join(self._state.restrictions))
+ self._state.merge_source_cbs(self._state._needs_source_cb)
# add required tables
assert len(self._state.actual_tables) == 1, self._state.actual_tables
tables = self._state.actual_tables[-1]
@@ -895,7 +966,13 @@
except KeyError:
mapkey = '%s.%s' % (self._state.solution[lhs.name], rel.r_type)
if mapkey in self.attr_map:
- lhssql = self.attr_map[mapkey](self, lhs.variable, rel)
+ cb, sourcecb = self.attr_map[mapkey]
+ if sourcecb:
+ # callback is a source callback, we can't use this
+ # attribute in restriction
+ raise QueryError("can't use %s (%s) in restriction"
+ % (mapkey, rel.as_string()))
+ lhssql = cb(self, lhs.variable, rel)
elif rel.r_type == 'eid':
lhssql = lhs.variable._q_sql
else:
@@ -987,9 +1064,13 @@
def visit_function(self, func):
"""generate SQL name for a function"""
- # func_sql_call will check function is supported by the backend
- return self.dbms_helper.func_as_sql(func.name,
- [c.accept(self) for c in func.children])
+ args = [c.accept(self) for c in func.children]
+ if func in self._state.source_cb_funcs:
+ # function executed as a callback on the source
+ assert len(args) == 1
+ return args[0]
+ # func_as_sql will check that the function is supported by the backend
+ return self.dbhelper.func_as_sql(func.name, args)
def visit_constant(self, constant):
"""generate SQL name for a constant"""
@@ -1156,12 +1237,20 @@
if isinstance(linkedvar, ColumnAlias):
raise BadRQLQuery('variable %s should be selected by the subquery'
% variable.name)
- mapkey = '%s.%s' % (self._state.solution[linkedvar.name], rel.r_type)
- if mapkey in self.attr_map:
- return self.attr_map[mapkey](self, linkedvar, rel)
try:
sql = self._varmap['%s.%s' % (linkedvar.name, rel.r_type)]
except KeyError:
+ mapkey = '%s.%s' % (self._state.solution[linkedvar.name], rel.r_type)
+ if mapkey in self.attr_map:
+ cb, sourcecb = self.attr_map[mapkey]
+ if not sourcecb:
+ return cb(self, linkedvar, rel)
+ # attribute mapped at the source level (bfss for instance)
+ stmt = rel.stmt
+ for selectidx, vref in iter_mapped_var_sels(stmt, variable):
+ stack = [cb]
+ update_source_cb_stack(self._state, stmt, vref, stack)
+ self._state._needs_source_cb[selectidx] = stack
linkedvar.accept(self)
sql = '%s.%s%s' % (linkedvar._q_sqltable, SQL_PREFIX, rel.r_type)
return sql
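
By contrast, functions going through default_update_cb_stack compose with
the storage callback: for LENGTH(D) on a bfss-mapped attribute the generated
SQL selects the raw column and the stack ends up as [storage callback,
length_source_execute], so the file is read and measured on the repository
side. A small illustrative sketch (names are stand-ins, and Binary is
approximated by StringIO, which it subclasses):

# stack composition for 'Any LENGTH(D) WHERE X data D' (illustration)
from StringIO import StringIO

def bfss_read(source, value):
    # storage callback: returns the file content wrapped like a Binary
    return StringIO(open(value, 'rb').read())

def length_source_execute(source, value):
    # as registered on LENGTH above
    return len(value.getvalue())

stack = [bfss_read]
stack.append(length_source_execute)   # default_update_cb_stack behaviour
# process_result then folds the stack over each non-None cell:
#   value = bfss_read(source, value)
#   value = length_source_execute(source, value)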
--- a/server/sources/storages.py Thu Mar 25 13:49:07 2010 +0100
+++ b/server/sources/storages.py Thu Mar 25 13:59:47 2010 +0100
@@ -11,10 +11,31 @@
repo.system_source.unset_storage(etype, attr)
class Storage(object):
- """abstract storage"""
- def sqlgen_callback(self, generator, relation, linkedvar):
- """sql generator callback when some attribute with a custom storage is
- accessed
+ """abstract storage
+
+ * If `is_source_callback` is true (the default), the callback will be run
+ while processing query results, on each fetched value of the attribute,
+ and should have the following prototype::
+
+ callback(self, source, value)
+
+ where `value` is the value actually stored in the backend. None values
+ are skipped (i.e. the callback won't be called on them).
+
+ * if `is_source_callback` is false, the callback will be run during sql
+ generation when some attribute with a custom storage is accessed and
+ should have the following prototype::
+
+ callback(self, generator, relation, linkedvar)
+
+ where `generator` is the sql generator, `relation` the current rql syntax
+ tree relation and `linkedvar` the principal syntax tree variable holding
+ the attribute.
+ """
+ is_source_callback = True
+
+ def callback(self, *args):
+ """see docstring for prototype, which vary according to is_source_callback
"""
raise NotImplementedError()
@@ -38,14 +59,16 @@
def __init__(self, defaultdir):
self.default_directory = defaultdir
- def sqlgen_callback(self, generator, linkedvar, relation):
+ def callback(self, source, value):
"""sql generator callback when some attribute with a custom storage is
accessed
"""
- linkedvar.accept(generator)
- return '_fsopen(%s.cw_%s)' % (
- linkedvar._q_sql.split('.', 1)[0], # table name
- relation.r_type) # attribute name
+ fpath = source.binary_to_str(value)
+ try:
+ return Binary(file(fpath, 'rb').read())
+ except (IOError, OSError), ex:
+ source.critical("can't open %s: %s", fpath, ex)
+ return None
def entity_added(self, entity, attr):
"""an entity using this storage for attr has been added"""
--- a/server/sqlutils.py Thu Mar 25 13:49:07 2010 +0100
+++ b/server/sqlutils.py Thu Mar 25 13:59:47 2010 +0100
@@ -188,9 +188,17 @@
return newargs
return query_args
- def process_result(self, cursor):
+ def process_result(self, cursor, column_callbacks=None):
"""return a list of CubicWeb compliant values from data in the given cursor
"""
+ # use two different implementations to avoid paying the price of
+ # callback lookup for each *cell* in results when there is nothing to
+ # lookup
+ if not column_callbacks:
+ return self._process_result(cursor)
+ return self._cb_process_result(cursor, column_callbacks)
+
+ def _process_result(self, cursor, column_callbacks=None):
# begin bind to locals for optimization
descr = cursor.description
encoding = self._dbencoding
@@ -208,6 +216,30 @@
results[i] = result
return results
+ def _cb_process_result(self, cursor, column_callbacks):
+ # begin bind to locals for optimization
+ descr = cursor.description
+ encoding = self._dbencoding
+ process_value = self._process_value
+ binary = Binary
+ # /end
+ results = cursor.fetchall()
+ for i, line in enumerate(results):
+ result = []
+ for col, value in enumerate(line):
+ if value is None:
+ result.append(value)
+ continue
+ cbstack = column_callbacks.get(col, None)
+ if cbstack is None:
+ value = process_value(value, descr[col], encoding, binary)
+ else:
+ for cb in cbstack:
+ value = cb(self, value)
+ result.append(value)
+ results[i] = result
+ return results
+
def preprocess_entity(self, entity):
"""return a dictionary to use as extra argument to cursor.execute
to insert/update an entity into a SQL database
@@ -277,28 +309,5 @@
import yams.constraints
yams.constraints.patch_sqlite_decimal()
- def fspath(eid, etype, attr):
- try:
- cu = cnx.cursor()
- cu.execute('SELECT X.cw_%s FROM cw_%s as X '
- 'WHERE X.cw_eid=%%(eid)s' % (attr, etype),
- {'eid': eid})
- return cu.fetchone()[0]
- except:
- import traceback
- traceback.print_exc()
- raise
- cnx.create_function('fspath', 3, fspath)
-
- def _fsopen(fspath):
- if fspath:
- try:
- return buffer(file(fspath).read())
- except:
- import traceback
- traceback.print_exc()
- raise
- cnx.create_function('_fsopen', 1, _fsopen)
-
sqlite_hooks = SQL_CONNECT_HOOKS.setdefault('sqlite', [])
sqlite_hooks.append(init_sqlite_connexion)
--- a/server/test/unittest_rql2sql.py Thu Mar 25 13:49:07 2010 +0100
+++ b/server/test/unittest_rql2sql.py Thu Mar 25 13:59:47 2010 +0100
@@ -1113,8 +1113,8 @@
args = {'text': 'hip hop momo'}
try:
union = self._prepare(rql)
- r, nargs = self.o.generate(union, args,
- varmap=varmap)
+ r, nargs, cbs = self.o.generate(union, args,
+ varmap=varmap)
args.update(nargs)
self.assertLinesEquals((r % args).strip(), self._norm_sql(sql), striplines=True)
except Exception, ex:
@@ -1135,7 +1135,7 @@
def _checkall(self, rql, sql):
try:
rqlst = self._prepare(rql)
- r, args = self.o.generate(rqlst)
+ r, args, cbs = self.o.generate(rqlst)
self.assertEqual((r.strip(), args), sql)
except Exception, ex:
print rql
@@ -1197,7 +1197,7 @@
def test_is_null_transform(self):
union = self._prepare('Any X WHERE X login %(login)s')
- r, args = self.o.generate(union, {'login': None})
+ r, args, cbs = self.o.generate(union, {'login': None})
self.assertLinesEquals((r % args).strip(),
'''SELECT _X.cw_eid
FROM cw_CWUser AS _X
@@ -1386,11 +1386,11 @@
'''SELECT COUNT(1)
WHERE EXISTS(SELECT 1 FROM owned_by_relation AS rel_owned_by0, cw_Affaire AS _P WHERE rel_owned_by0.eid_from=_P.cw_eid AND rel_owned_by0.eid_to=1 UNION SELECT 1 FROM owned_by_relation AS rel_owned_by1, cw_Note AS _P WHERE rel_owned_by1.eid_from=_P.cw_eid AND rel_owned_by1.eid_to=1)''')
- def test_attr_map(self):
+ def test_attr_map_sqlcb(self):
def generate_ref(gen, linkedvar, rel):
linkedvar.accept(gen)
return 'VERSION_DATA(%s)' % linkedvar._q_sql
- self.o.attr_map['Affaire.ref'] = generate_ref
+ self.o.attr_map['Affaire.ref'] = (generate_ref, False)
try:
self._check('Any R WHERE X ref R',
'''SELECT VERSION_DATA(_X.cw_eid)
@@ -1402,6 +1402,17 @@
finally:
self.o.attr_map.clear()
+ def test_attr_map_sourcecb(self):
+ cb = lambda x,y: None
+ self.o.attr_map['Affaire.ref'] = (cb, True)
+ try:
+ union = self._prepare('Any R WHERE X ref R')
+ r, nargs, cbs = self.o.generate(union, args={})
+ self.assertLinesEquals(r.strip(), 'SELECT _X.cw_ref\nFROM cw_Affaire AS _X')
+ self.assertEquals(cbs, {0: [cb]})
+ finally:
+ self.o.attr_map.clear()
+
class SqliteSQLGeneratorTC(PostgresSQLGeneratorTC):
--- a/server/test/unittest_storage.py Thu Mar 25 13:49:07 2010 +0100
+++ b/server/test/unittest_storage.py Thu Mar 25 13:59:47 2010 +0100
@@ -13,7 +13,7 @@
import shutil
import tempfile
-from cubicweb import Binary
+from cubicweb import Binary, QueryError
from cubicweb.selectors import implements
from cubicweb.server.sources import storages
from cubicweb.server.hook import Hook, Operation
@@ -80,7 +80,7 @@
def test_bfss_sqlite_fspath(self):
f1 = self.create_file()
expected_filepath = osp.join(self.tempdir, '%s_data' % f1.eid)
- fspath = self.execute('Any fspath(F, "File", "data") WHERE F eid %(f)s',
+ fspath = self.execute('Any fspath(D) WHERE F eid %(f)s, F data D',
{'f': f1.eid})[0][0]
self.assertEquals(fspath.getvalue(), expected_filepath)
@@ -88,7 +88,7 @@
self.session.transaction_data['fs_importing'] = True
f1 = self.session.create_entity('File', data=Binary('/the/path'),
data_format=u'text/plain', data_name=u'foo')
- fspath = self.execute('Any fspath(F, "File", "data") WHERE F eid %(f)s',
+ fspath = self.execute('Any fspath(D) WHERE F eid %(f)s, F data D',
{'f': f1.eid})[0][0]
self.assertEquals(fspath.getvalue(), '/the/path')
@@ -102,5 +102,63 @@
self.vreg.unregister(DummyBeforeHook)
self.vreg.unregister(DummyAfterHook)
+ def test_source_mapped_attribute_error_cases(self):
+ ex = self.assertRaises(QueryError, self.execute,
+ 'Any X WHERE X data ~= "hop", X is File')
+ self.assertEquals(str(ex), 'can\'t use File.data (X data ILIKE "hop") in restriction')
+ ex = self.assertRaises(QueryError, self.execute,
+ 'Any X, Y WHERE X data D, Y data D, '
+ 'NOT X identity Y, X is File, Y is File')
+ self.assertEquals(str(ex), "can't use D as a restriction variable")
+ # query returning mix of mapped / regular attributes (only file.data
+ # mapped, not image.data for instance)
+ ex = self.assertRaises(QueryError, self.execute,
+ 'Any X WITH X BEING ('
+ ' (Any NULL)'
+ ' UNION '
+ ' (Any D WHERE X data D, X is File)'
+ ')')
+ self.assertEquals(str(ex), 'query fetch some source mapped attribute, some not')
+ ex = self.assertRaises(QueryError, self.execute,
+ '(Any D WHERE X data D, X is File)'
+ ' UNION '
+ '(Any D WHERE X data D, X is Image)')
+ self.assertEquals(str(ex), 'query fetch some source mapped attribute, some not')
+ ex = self.assertRaises(QueryError,
+ self.execute, 'Any D WHERE X data D')
+ self.assertEquals(str(ex), 'query fetch some source mapped attribute, some not')
+
+ def test_source_mapped_attribute_advanced(self):
+ f1 = self.create_file()
+ rset = self.execute('Any X,D WITH D,X BEING ('
+ ' (Any D, X WHERE X eid %(x)s, X data D)'
+ ' UNION '
+ ' (Any D, X WHERE X eid %(x)s, X data D)'
+ ')', {'x': f1.eid}, 'x')
+ self.assertEquals(len(rset), 2)
+ self.assertEquals(rset[0][0], f1.eid)
+ self.assertEquals(rset[1][0], f1.eid)
+ self.assertEquals(rset[0][1].getvalue(), 'the-data')
+ self.assertEquals(rset[1][1].getvalue(), 'the-data')
+ rset = self.execute('Any X,LENGTH(D) WHERE X eid %(x)s, X data D',
+ {'x': f1.eid}, 'x')
+ self.assertEquals(len(rset), 1)
+ self.assertEquals(rset[0][0], f1.eid)
+ self.assertEquals(rset[0][1], len('the-data'))
+ rset = self.execute('Any X,LENGTH(D) WITH D,X BEING ('
+ ' (Any D, X WHERE X eid %(x)s, X data D)'
+ ' UNION '
+ ' (Any D, X WHERE X eid %(x)s, X data D)'
+ ')', {'x': f1.eid}, 'x')
+ self.assertEquals(len(rset), 2)
+ self.assertEquals(rset[0][0], f1.eid)
+ self.assertEquals(rset[1][0], f1.eid)
+ self.assertEquals(rset[0][1], len('the-data'))
+ self.assertEquals(rset[1][1], len('the-data'))
+ ex = self.assertRaises(QueryError, self.execute,
+ 'Any X,UPPER(D) WHERE X eid %(x)s, X data D',
+ {'x': f1.eid}, 'x')
+ self.assertEquals(str(ex), 'UPPER can not be called on mapped attribute')
+
if __name__ == '__main__':
unittest_main()