# HG changeset patch
# User Sylvain Thénault
# Date 1269521987 -3600
# Node ID ad91f93bbb935f564549eb7cf0e09ee490248899
# Parent 9c4ea944ecf9672af61f319ba6312ae737db6f52
[source storage] refactor source sql generation and results handling to allow repository side callbacks

For instance, with the BytesFileSystemStorage before this change:

* the fspath and _fsopen functions were stored procedures executed on the
  database -> files had to be available on both the repository *and* the
  database host
* we needed an implementation for each supported database

Now those functions are python callbacks executed when necessary on the
repository side, on data coming from the database. The little cons are:

* you can no longer use mapped attributes in restrictions
* you can't write queries returning, in the same rset column, some
  attributes that are mapped (or mapped differently) and some that are not

This seems quite acceptable since:

* it's much easier to handle when the database lives on another host than
  the repository
* BFSS now works seamlessly on any backend
* the cons don't matter much (at least in the bfss case): you usually don't
  put restrictions on Bytes attributes...

Bonus points: BFSS is more efficient (no more query under the cover, as was
done in the registered procedure) and we have a much nicer/more efficient
fspath implementation.

IMO, that rocks :D

diff -r 9c4ea944ecf9 -r ad91f93bbb93 cwconfig.py
--- a/cwconfig.py	Thu Mar 25 13:49:07 2010 +0100
+++ b/cwconfig.py	Thu Mar 25 13:59:47 2010 +0100
@@ -90,7 +90,8 @@
 
 from logilab.common.configuration import (Configuration, Method,
                                           ConfigurationMixIn, merge_options)
-from cubicweb import CW_SOFTWARE_ROOT, CW_MIGRATION_MAP, ConfigurationError
+from cubicweb import (CW_SOFTWARE_ROOT, CW_MIGRATION_MAP,
+                      ConfigurationError, Binary)
 from cubicweb.toolsutils import env_path, create_dir
 
 CONFIGURATIONS = []
@@ -1050,7 +1051,24 @@
 
 class FSPATH(FunctionDescr):
-    supported_backends = ('postgres', 'sqlite',)
-    rtype = 'Bytes'
+    """return path of some bytes attribute stored using the Bytes
+    File-System Storage (bfss)
+    """
+    rtype = 'Bytes' # XXX return a String? potential pb with fs encoding
+
+    def update_cb_stack(self, stack):
+        assert len(stack) == 1
+        stack[0] = self.source_execute
+
+    def as_sql(self, backend, args):
+        raise NotImplementedError('source only callback')
+
+    def source_execute(self, source, value):
+        fpath = source.binary_to_str(value)
+        try:
+            return Binary(fpath)
+        except OSError, ex:
+            self.critical("can't open %s: %s", fpath, ex)
+            return None
 
 register_function(FSPATH)
diff -r 9c4ea944ecf9 -r ad91f93bbb93 misc/migration/3.7.2_Any.py
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/misc/migration/3.7.2_Any.py	Thu Mar 25 13:59:47 2010 +0100
@@ -0,0 +1,2 @@
+sql('DROP FUNCTION IF EXISTS _fsopen')
+sql('DROP FUNCTION IF EXISTS fspath')
diff -r 9c4ea944ecf9 -r ad91f93bbb93 schemas/_regproc_bss.postgres.sql
--- a/schemas/_regproc_bss.postgres.sql	Thu Mar 25 13:49:07 2010 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,38 +0,0 @@
-/* -*- sql -*-
-
- postgres specific registered procedures for the Bytes File System storage,
- require the plpythonu language installed
-
-*/
-
-
-CREATE OR REPLACE FUNCTION _fsopen(bytea) RETURNS bytea AS $$
-    fpath = args[0]
-    if fpath:
-        try:
-            data = file(fpath, 'rb').read()
-            #/* XXX due to plpython bug we have to replace some characters... */
-            return data.replace("\\", r"\134").replace("\000", r"\000").replace("'", r"\047") #'
-        except Exception, ex:
-            plpy.warning('failed to get content for %s: %s', fpath, ex)
-    return None
-$$ LANGUAGE plpythonu
-/* WITH(ISCACHABLE) XXX does postgres handle caching of large data nicely */
-;;
-
-/* fspath(eid, entity type, attribute) */
-CREATE OR REPLACE FUNCTION fspath(bigint, text, text) RETURNS bytea AS $$
-    pkey = 'plan%s%s' % (args[1], args[2])
-    try:
-        plan = SD[pkey]
-    except KeyError:
-        #/* then prepare and cache plan to get versioned file information from a
-        # version content eid */
-        plan = plpy.prepare(
-            'SELECT X.cw_%s FROM cw_%s as X WHERE X.cw_eid=$1' % (args[2], args[1]),
-            ['bigint'])
-        SD[pkey] = plan
-    return plpy.execute(plan, [args[0]])[0]['cw_' + args[2]]
-$$ LANGUAGE plpythonu
-/* WITH(ISCACHABLE) XXX does postgres handle caching of large data nicely */
-;;
diff -r 9c4ea944ecf9 -r ad91f93bbb93 server/sources/extlite.py
--- a/server/sources/extlite.py	Thu Mar 25 13:49:07 2010 +0100
+++ b/server/sources/extlite.py	Thu Mar 25 13:59:47 2010 +0100
@@ -187,9 +187,10 @@
         if self._need_sql_create:
             return []
         assert dbg_st_search(self.uri, union, varmap, args, cachekey)
-        sql, query_args = self.rqlsqlgen.generate(union, args)
-        args = self.sqladapter.merge_args(args, query_args)
-        results = self.sqladapter.process_result(self.doexec(session, sql, args))
+        sql, qargs, cbs = self.rqlsqlgen.generate(union, args)
+        args = self.sqladapter.merge_args(args, qargs)
+        cursor = self.doexec(session, sql, args)
+        results = self.sqladapter.process_result(cursor, cbs)
         assert dbg_results(results)
         return results
diff -r 9c4ea944ecf9 -r ad91f93bbb93 server/sources/native.py
--- a/server/sources/native.py	Thu Mar 25 13:49:07 2010 +0100
+++ b/server/sources/native.py	Thu Mar 25 13:59:47 2010 +0100
@@ -264,8 +264,9 @@
     def init(self):
        self.init_creating()
 
-    def map_attribute(self, etype, attr, cb):
-        self._rql_sqlgen.attr_map['%s.%s' % (etype, attr)] = cb
+    # XXX deprecates [un]map_attribute ?
+    def map_attribute(self, etype, attr, cb, sourcedb=True):
+        self._rql_sqlgen.attr_map['%s.%s' % (etype, attr)] = (cb, sourcedb)
 
     def unmap_attribute(self, etype, attr):
         self._rql_sqlgen.attr_map.pop('%s.%s' % (etype, attr), None)
@@ -273,7 +274,8 @@
     def set_storage(self, etype, attr, storage):
         storage_dict = self._storages.setdefault(etype, {})
         storage_dict[attr] = storage
-        self.map_attribute(etype, attr, storage.sqlgen_callback)
+        self.map_attribute(etype, attr,
+                           storage.callback, storage.is_source_callback)
 
     def unset_storage(self, etype, attr):
         self._storages[etype].pop(attr)
@@ -348,17 +350,17 @@
             if cachekey is None:
                 self.no_cache += 1
                 # generate sql query if we are able to do so (not supported types...)
-                sql, query_args = self._rql_sqlgen.generate(union, args, varmap)
+                sql, qargs, cbs = self._rql_sqlgen.generate(union, args, varmap)
             else:
                 # sql may be cached
                 try:
-                    sql, query_args = self._cache[cachekey]
+                    sql, qargs, cbs = self._cache[cachekey]
                     self.cache_hit += 1
                 except KeyError:
                     self.cache_miss += 1
-                    sql, query_args = self._rql_sqlgen.generate(union, args, varmap)
-                    self._cache[cachekey] = sql, query_args
-        args = self.merge_args(args, query_args)
+                    sql, qargs, cbs = self._rql_sqlgen.generate(union, args, varmap)
+                    self._cache[cachekey] = sql, qargs, cbs
+        args = self.merge_args(args, qargs)
         assert isinstance(sql, basestring), repr(sql)
         try:
             cursor = self.doexec(session, sql, args)
@@ -367,7 +369,7 @@
             self.info("request failed '%s' ... retry with a new cursor", sql)
             session.pool.reconnect(self)
             cursor = self.doexec(session, sql, args)
-        results = self.process_result(cursor)
+        results = self.process_result(cursor, cbs)
         assert dbg_results(results)
         return results
 
@@ -381,9 +383,9 @@
             self.uri, union, varmap, args,
             prefix='ON THE FLY temp data insertion into %s from' % table)
         # generate sql queries if we are able to do so
-        sql, query_args = self._rql_sqlgen.generate(union, args, varmap)
+        sql, qargs, cbs = self._rql_sqlgen.generate(union, args, varmap)
         query = 'INSERT INTO %s %s' % (table, sql.encode(self._dbencoding))
-        self.doexec(session, query, self.merge_args(args, query_args))
+        self.doexec(session, query, self.merge_args(args, qargs))
 
     def manual_insert(self, results, table, session):
         """insert given result into a temporary table on the system source"""
diff -r 9c4ea944ecf9 -r ad91f93bbb93 server/sources/rql2sql.py
--- a/server/sources/rql2sql.py	Thu Mar 25 13:49:07 2010 +0100
+++ b/server/sources/rql2sql.py	Thu Mar 25 13:59:47 2010 +0100
@@ -33,16 +33,30 @@
 
 import threading
 
+from logilab.database import FunctionDescr, SQL_FUNCTIONS_REGISTRY
+
 from rql import BadRQLQuery, CoercionError
 from rql.stmts import Union, Select
 from rql.nodes import (SortTerm, VariableRef, Constant, Function, Not,
                        Variable, ColumnAlias, Relation, SubQuery, Exists)
 
+from cubicweb import QueryError
 from cubicweb.server.sqlutils import SQL_PREFIX
 from cubicweb.server.utils import cleanup_solutions
 
 ColumnAlias._q_invariant = False # avoid to check for ColumnAlias / Variable
 
+FunctionDescr.source_execute = None
+
+def default_update_cb_stack(self, stack):
+    stack.append(self.source_execute)
+FunctionDescr.update_cb_stack = default_update_cb_stack
+
+LENGTH = SQL_FUNCTIONS_REGISTRY.get_function('LENGTH')
+def length_source_execute(source, value):
+    return len(value.getvalue())
+LENGTH.source_execute = length_source_execute
+
 def _new_var(select, varname):
     newvar = select.get_variable(varname)
     if not 'relations' in newvar.stinfo:
@@ -252,14 +266,44 @@
         selectedidx.append(vref.name)
         rqlst.selection.append(vref)
 
-# IGenerator implementation for RQL->SQL ######################################
+def iter_mapped_var_sels(stmt, variable):
+    # variable is a Variable or ColumnAlias node mapped to a source side
+    # callback
+    if not (len(variable.stinfo['rhsrelations']) <= 1 and # < 1 on column alias
+            variable.stinfo['selected']):
+        raise QueryError("can't use %s as a restriction variable"
+                         % variable.name)
+    for selectidx in variable.stinfo['selected']:
+        vrefs = stmt.selection[selectidx].get_nodes(VariableRef)
+        if len(vrefs) != 1:
+            raise QueryError()
+        yield selectidx, vrefs[0]
+
+def update_source_cb_stack(state, stmt, node, stack):
+    while True:
+        node = node.parent
+        if node is stmt:
+            break
+        if not isinstance(node, Function):
+            raise QueryError()
+        func = SQL_FUNCTIONS_REGISTRY.get_function(node.name)
+        if func.source_execute is None:
+            raise QueryError('%s can not be called on mapped attribute'
+                             % node.name)
+        state.source_cb_funcs.add(node)
+        func.update_cb_stack(stack)
+
+
+# IGenerator implementation for RQL->SQL #######################################
 
 class StateInfo(object):
     def __init__(self, existssols, unstablevars):
         self.existssols = existssols
         self.unstablevars = unstablevars
         self.subtables = {}
+        self.needs_source_cb = None
+        self.subquery_source_cb = None
+        self.source_cb_funcs = set()
 
     def reset(self, solution):
         """reset some visit variables"""
@@ -276,6 +320,17 @@
         self.restrictions = []
         self._restr_stack = []
         self.ignore_varmap = False
+        self._needs_source_cb = {}
+
+    def merge_source_cbs(self, needs_source_cb):
+        if self.needs_source_cb is None:
+            self.needs_source_cb = needs_source_cb
+        elif needs_source_cb != self.needs_source_cb:
+            raise QueryError('query fetch some source mapped attribute, some not')
+
+    def finalize_source_cbs(self):
+        if self.subquery_source_cb is not None:
+            self.needs_source_cb.update(self.subquery_source_cb)
 
     def add_restriction(self, restr):
         if restr:
@@ -373,7 +428,7 @@
             # union query for each rqlst / solution
             sql = self.union_sql(union)
             # we are done
-            return sql, self._query_attrs
+            return sql, self._query_attrs, self._state.needs_source_cb
         finally:
             self._lock.release()
 
@@ -436,6 +491,9 @@
         else:
             existssols, unstable = {}, ()
         state = StateInfo(existssols, unstable)
+        if self._state is not None:
+            # state from a previous unioned select
+            state.merge_source_cbs(self._state.needs_source_cb)
         # treat subqueries
         self._subqueries_sql(select, state)
         # generate sql for this select node
@@ -491,6 +549,7 @@
             if fneedwrap:
                 selection = ['T1.C%s' % i for i in xrange(len(origselection))]
                 sql = 'SELECT %s FROM (%s) AS T1' % (','.join(selection), sql)
+            state.finalize_source_cbs()
         finally:
             select.selection = origselection
         # limit / offset
@@ -508,10 +567,21 @@
             tablealias = '_T%s' % i # XXX nested subqueries
             sql = '(%s) AS %s' % (sql, tablealias)
             state.subtables[tablealias] = (0, sql)
+            latest_state = self._state
             for vref in subquery.aliases:
                 alias = vref.variable
                 alias._q_sqltable = tablealias
                 alias._q_sql = '%s.C%s' % (tablealias, alias.colnum)
+                try:
+                    stack = latest_state.needs_source_cb[alias.colnum]
+                    if state.subquery_source_cb is None:
+                        state.subquery_source_cb = {}
+                    for selectidx, vref in iter_mapped_var_sels(select, alias):
+                        stack = stack[:]
+                        update_source_cb_stack(state, select, vref, stack)
+                        state.subquery_source_cb[selectidx] = stack
+                except KeyError:
+                    continue
 
     def _solutions_sql(self, select, solutions, distinct, needalias):
         sqls = []
@@ -523,6 +593,7 @@
             sql = [self._selection_sql(select.selection, distinct, needalias)]
             if self._state.restrictions:
                 sql.append('WHERE %s' % ' AND '.join(self._state.restrictions))
+            self._state.merge_source_cbs(self._state._needs_source_cb)
             # add required tables
             assert len(self._state.actual_tables) == 1, self._state.actual_tables
             tables = self._state.actual_tables[-1]
@@ -895,7 +966,13 @@
             except KeyError:
                 mapkey = '%s.%s' % (self._state.solution[lhs.name], rel.r_type)
                 if mapkey in self.attr_map:
-                    lhssql = self.attr_map[mapkey](self, lhs.variable, rel)
+                    cb, sourcecb = self.attr_map[mapkey]
+                    if sourcecb:
+                        # callback is a source callback, we can't use this
+                        # attribute in restriction
+                        raise QueryError("can't use %s (%s) in restriction"
+                                         % (mapkey, rel.as_string()))
+                    lhssql = cb(self, lhs.variable, rel)
                 elif rel.r_type == 'eid':
                     lhssql = lhs.variable._q_sql
                 else:
@@ -987,9 +1064,13 @@
 
     def visit_function(self, func):
         """generate SQL name for a function"""
-        # func_sql_call will check function is supported by the backend
-        return self.dbms_helper.func_as_sql(func.name,
-                                            [c.accept(self) for c in func.children])
+        args = [c.accept(self) for c in func.children]
+        if func in self._state.source_cb_funcs:
+            # function executed as a callback on the source
+            assert len(args) == 1
+            return args[0]
+        # func_as_sql will check function is supported by the backend
+        return self.dbhelper.func_as_sql(func.name, args)
 
     def visit_constant(self, constant):
         """generate SQL name for a constant"""
@@ -1156,12 +1237,20 @@
         if isinstance(linkedvar, ColumnAlias):
             raise BadRQLQuery('variable %s should be selected by the subquery'
                               % variable.name)
-        mapkey = '%s.%s' % (self._state.solution[linkedvar.name], rel.r_type)
-        if mapkey in self.attr_map:
-            return self.attr_map[mapkey](self, linkedvar, rel)
         try:
             sql = self._varmap['%s.%s' % (linkedvar.name, rel.r_type)]
         except KeyError:
+            mapkey = '%s.%s' % (self._state.solution[linkedvar.name], rel.r_type)
+            if mapkey in self.attr_map:
+                cb, sourcecb = self.attr_map[mapkey]
+                if not sourcecb:
+                    return cb(self, linkedvar, rel)
+                # attribute mapped at the source level (bfss for instance)
+                stmt = rel.stmt
+                for selectidx, vref in iter_mapped_var_sels(stmt, variable):
+                    stack = [cb]
+                    update_source_cb_stack(self._state, stmt, vref, stack)
+                    self._state._needs_source_cb[selectidx] = stack
             linkedvar.accept(self)
             sql = '%s.%s%s' % (linkedvar._q_sqltable, SQL_PREFIX, rel.r_type)
         return sql
diff -r 9c4ea944ecf9 -r ad91f93bbb93 server/sources/storages.py
--- a/server/sources/storages.py	Thu Mar 25 13:49:07 2010 +0100
+++ b/server/sources/storages.py	Thu Mar 25 13:59:47 2010 +0100
@@ -11,10 +11,31 @@
     repo.system_source.unset_storage(etype, attr)
 
 class Storage(object):
-    """abstract storage"""
-    def sqlgen_callback(self, generator, relation, linkedvar):
-        """sql generator callback when some attribute with a custom storage is
-        accessed
+    """abstract storage
+
+    * If `is_source_callback` is true (the default), the callback will be run
+      while processing results of queries fetching the attribute's value, and
+      should have the following prototype::
+
+        callback(self, source, value)
+
+      where `value` is the value actually stored in the backend. None values
+      will be skipped (i.e. the callback won't be called for them).
+
+    * If `is_source_callback` is false, the callback will be run during sql
+      generation when some attribute with a custom storage is accessed, and
+      should have the following prototype::
+
+        callback(self, generator, relation, linkedvar)
+
+      where `generator` is the sql generator, `relation` the current rql
+      syntax tree relation and `linkedvar` the principal syntax tree variable
+      holding the attribute.
+    """
+    is_source_callback = True
+
+    def callback(self, *args):
+        """see the class docstring for the prototype, which varies according to is_source_callback
         """
         raise NotImplementedError()
 
@@ -38,14 +59,16 @@
     def __init__(self, defaultdir):
         self.default_directory = defaultdir
 
-    def sqlgen_callback(self, generator, linkedvar, relation):
-        """sql generator callback when some attribute with a custom storage is
-        accessed
-        """
-        linkedvar.accept(generator)
-        return '_fsopen(%s.cw_%s)' % (
-            linkedvar._q_sql.split('.', 1)[0], # table name
-            relation.r_type) # attribute name
+    def callback(self, source, value):
+        """source callback: read and return the content of the file whose
+        path is stored in the backend
+        """
+        fpath = source.binary_to_str(value)
+        try:
+            return Binary(file(fpath).read())
+        except (IOError, OSError), ex:
+            source.critical("can't open %s: %s", fpath, ex)
+            return None
 
     def entity_added(self, entity, attr):
         """an entity using this storage for attr has been added"""
diff -r 9c4ea944ecf9 -r ad91f93bbb93 server/sqlutils.py
--- a/server/sqlutils.py	Thu Mar 25 13:49:07 2010 +0100
+++ b/server/sqlutils.py	Thu Mar 25 13:59:47 2010 +0100
@@ -188,9 +188,17 @@
                 return newargs
         return query_args
 
-    def process_result(self, cursor):
+    def process_result(self, cursor, column_callbacks=None):
         """return a list of CubicWeb compliant values from data in the given cursor
         """
+        # use two different implementations to avoid paying the price of
+        # callback lookup for each *cell* in results when there is nothing to
+        # look up
+        if not column_callbacks:
+            return self._process_result(cursor)
+        return self._cb_process_result(cursor, column_callbacks)
+
+    def _process_result(self, cursor, column_callbacks=None):
         # begin bind to locals for optimization
         descr = cursor.description
         encoding = self._dbencoding
@@ -208,6 +216,30 @@
                 results[i] = result
         return results
 
+    def _cb_process_result(self, cursor, column_callbacks):
+        # begin bind to locals for optimization
+        descr = cursor.description
+        encoding = self._dbencoding
+        process_value = self._process_value
+        binary = Binary
+        # /end
+        results = cursor.fetchall()
+        for i, line in enumerate(results):
+            result = []
+            for col, value in enumerate(line):
+                if value is None:
+                    result.append(value)
+                    continue
+                cbstack = column_callbacks.get(col, None)
+                if cbstack is None:
+                    value = process_value(value, descr[col], encoding, binary)
+                else:
+                    for cb in cbstack:
+                        value = cb(self, value)
+                result.append(value)
+            results[i] = result
+        return results
+
     def preprocess_entity(self, entity):
         """return a dictionary to use as extra argument to cursor.execute
         to insert/update an entity into a SQL database
@@ -277,28 +309,5 @@
         import yams.constraints
         yams.constraints.patch_sqlite_decimal()
 
-        def fspath(eid, etype, attr):
-            try:
-                cu = cnx.cursor()
-                cu.execute('SELECT X.cw_%s FROM cw_%s as X '
-                           'WHERE X.cw_eid=%%(eid)s' % (attr, etype),
-                           {'eid': eid})
-                return cu.fetchone()[0]
-            except:
-                import traceback
-                traceback.print_exc()
-                raise
-        cnx.create_function('fspath', 3, fspath)
-
-        def _fsopen(fspath):
-            if fspath:
-                try:
-                    return buffer(file(fspath).read())
-                except:
-                    import traceback
-                    traceback.print_exc()
-                    raise
-        cnx.create_function('_fsopen', 1, _fsopen)
-
     sqlite_hooks = SQL_CONNECT_HOOKS.setdefault('sqlite', [])
     sqlite_hooks.append(init_sqlite_connexion)
diff -r 9c4ea944ecf9 -r ad91f93bbb93 server/test/unittest_rql2sql.py
--- a/server/test/unittest_rql2sql.py	Thu Mar 25 13:49:07 2010 +0100
+++ b/server/test/unittest_rql2sql.py	Thu Mar 25 13:59:47 2010 +0100
@@ -1113,8 +1113,8 @@
         args = {'text': 'hip hop momo'}
         try:
             union = self._prepare(rql)
-            r, nargs = self.o.generate(union, args,
-                                       varmap=varmap)
+            r, nargs, cbs = self.o.generate(union, args,
+                                            varmap=varmap)
             args.update(nargs)
             self.assertLinesEquals((r % args).strip(), self._norm_sql(sql), striplines=True)
         except Exception, ex:
@@ -1135,7 +1135,7 @@
     def _checkall(self, rql, sql):
         try:
             rqlst = self._prepare(rql)
-            r, args = self.o.generate(rqlst)
+            r, args, cbs = self.o.generate(rqlst)
             self.assertEqual((r.strip(), args), sql)
         except Exception, ex:
             print rql
@@ -1197,7 +1197,7 @@
 
     def test_is_null_transform(self):
         union = self._prepare('Any X WHERE X login %(login)s')
-        r, args = self.o.generate(union, {'login': None})
+        r, args, cbs = self.o.generate(union, {'login': None})
         self.assertLinesEquals((r % args).strip(),
                                '''SELECT _X.cw_eid
FROM cw_CWUser AS _X
@@ -1386,11 +1386,11 @@
                     '''SELECT COUNT(1)
WHERE EXISTS(SELECT 1 FROM owned_by_relation AS rel_owned_by0, cw_Affaire AS _P WHERE rel_owned_by0.eid_from=_P.cw_eid AND rel_owned_by0.eid_to=1 UNION SELECT 1 FROM owned_by_relation AS rel_owned_by1, cw_Note AS _P WHERE rel_owned_by1.eid_from=_P.cw_eid AND rel_owned_by1.eid_to=1)''')
 
-    def test_attr_map(self):
+    def test_attr_map_sqlcb(self):
         def generate_ref(gen, linkedvar, rel):
             linkedvar.accept(gen)
             return 'VERSION_DATA(%s)' % linkedvar._q_sql
-        self.o.attr_map['Affaire.ref'] = generate_ref
+        self.o.attr_map['Affaire.ref'] = (generate_ref, False)
         try:
             self._check('Any R WHERE X ref R',
                         '''SELECT VERSION_DATA(_X.cw_eid)
@@ -1402,6 +1402,17 @@
         finally:
             self.o.attr_map.clear()
 
+    def test_attr_map_sourcecb(self):
+        cb = lambda x, y: None
+        self.o.attr_map['Affaire.ref'] = (cb, True)
+        try:
+            union = self._prepare('Any R WHERE X ref R')
+            r, nargs, cbs = self.o.generate(union, args={})
+            self.assertLinesEquals(r.strip(), 'SELECT _X.cw_ref\nFROM cw_Affaire AS _X')
+            self.assertEquals(cbs, {0: [cb]})
+        finally:
+            self.o.attr_map.clear()
+
 
 class SqliteSQLGeneratorTC(PostgresSQLGeneratorTC):
diff -r 9c4ea944ecf9 -r ad91f93bbb93 server/test/unittest_storage.py
--- a/server/test/unittest_storage.py	Thu Mar 25 13:49:07 2010 +0100
+++ b/server/test/unittest_storage.py	Thu Mar 25 13:59:47 2010 +0100
@@ -13,7 +13,7 @@
 import shutil
 import tempfile
 
-from cubicweb import Binary
+from cubicweb import Binary, QueryError
 from cubicweb.selectors import implements
 from cubicweb.server.sources import storages
 from cubicweb.server.hook import Hook, Operation
@@ -80,7 +80,7 @@
     def test_bfss_sqlite_fspath(self):
         f1 = self.create_file()
         expected_filepath = osp.join(self.tempdir, '%s_data' % f1.eid)
-        fspath = self.execute('Any fspath(F, "File", "data") WHERE F eid %(f)s',
+        fspath = self.execute('Any fspath(D) WHERE F eid %(f)s, F data D',
                               {'f': f1.eid})[0][0]
         self.assertEquals(fspath.getvalue(), expected_filepath)
 
@@ -88,7 +88,7 @@
         self.session.transaction_data['fs_importing'] = True
         f1 = self.session.create_entity('File', data=Binary('/the/path'),
                                         data_format=u'text/plain', data_name=u'foo')
-        fspath = self.execute('Any fspath(F, "File", "data") WHERE F eid %(f)s',
+        fspath = self.execute('Any fspath(D) WHERE F eid %(f)s, F data D',
                               {'f': f1.eid})[0][0]
         self.assertEquals(fspath.getvalue(), '/the/path')
 
@@ -102,5 +102,63 @@
         self.vreg.unregister(DummyBeforeHook)
         self.vreg.unregister(DummyAfterHook)
 
+    def test_source_mapped_attribute_error_cases(self):
+        ex = self.assertRaises(QueryError, self.execute,
+                               'Any X WHERE X data ~= "hop", X is File')
+        self.assertEquals(str(ex), 'can\'t use File.data (X data ILIKE "hop") in restriction')
+        ex = self.assertRaises(QueryError, self.execute,
+                               'Any X, Y WHERE X data D, Y data D, '
+                               'NOT X identity Y, X is File, Y is File')
+        self.assertEquals(str(ex), "can't use D as a restriction variable")
+        # query returning mix of mapped / regular attributes (only file.data
+        # mapped, not image.data for instance)
+        ex = self.assertRaises(QueryError, self.execute,
+                               'Any X WITH X BEING ('
+                               ' (Any NULL)'
+                               ' UNION '
+                               ' (Any D WHERE X data D, X is File)'
+                               ')')
+        self.assertEquals(str(ex), 'query fetch some source mapped attribute, some not')
+        ex = self.assertRaises(QueryError, self.execute,
+                               '(Any D WHERE X data D, X is File)'
+                               ' UNION '
+                               '(Any D WHERE X data D, X is Image)')
+        self.assertEquals(str(ex), 'query fetch some source mapped attribute, some not')
+        ex = self.assertRaises(QueryError,
+                               self.execute, 'Any D WHERE X data D')
+        self.assertEquals(str(ex), 'query fetch some source mapped attribute, some not')
+
+    def test_source_mapped_attribute_advanced(self):
+        f1 = self.create_file()
+        rset = self.execute('Any X,D WITH D,X BEING ('
+                            ' (Any D, X WHERE X eid %(x)s, X data D)'
+                            ' UNION '
+                            ' (Any D, X WHERE X eid %(x)s, X data D)'
+                            ')', {'x': f1.eid}, 'x')
+        self.assertEquals(len(rset), 2)
+        self.assertEquals(rset[0][0], f1.eid)
+        self.assertEquals(rset[1][0], f1.eid)
+        self.assertEquals(rset[0][1].getvalue(), 'the-data')
+        self.assertEquals(rset[1][1].getvalue(), 'the-data')
+        rset = self.execute('Any X,LENGTH(D) WHERE X eid %(x)s, X data D',
+                            {'x': f1.eid}, 'x')
+        self.assertEquals(len(rset), 1)
+        self.assertEquals(rset[0][0], f1.eid)
+        self.assertEquals(rset[0][1], len('the-data'))
+        rset = self.execute('Any X,LENGTH(D) WITH D,X BEING ('
+                            ' (Any D, X WHERE X eid %(x)s, X data D)'
+                            ' UNION '
+                            ' (Any D, X WHERE X eid %(x)s, X data D)'
+                            ')', {'x': f1.eid}, 'x')
+        self.assertEquals(len(rset), 2)
+        self.assertEquals(rset[0][0], f1.eid)
+        self.assertEquals(rset[1][0], f1.eid)
+        self.assertEquals(rset[0][1], len('the-data'))
+        self.assertEquals(rset[1][1], len('the-data'))
+        ex = self.assertRaises(QueryError, self.execute,
+                               'Any X,UPPER(D) WHERE X eid %(x)s, X data D',
+                               {'x': f1.eid}, 'x')
+        self.assertEquals(str(ex), 'UPPER can not be called on mapped attribute')
+
+
 if __name__ == '__main__':
     unittest_main()
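
Note (editor's illustration, not part of the changeset): the pieces above fit
together as follows. The rql to sql generator now returns a third value,
`cbs`, mapping result column indexes to a stack of callbacks, and
`process_result` runs that stack on the repository side for every non-None
cell it fetches. A minimal standalone sketch of that loop, with hypothetical
names, paths and data:

    def process_result(rows, column_callbacks=None):
        # simplified mirror of SQLAdapterMixIn._cb_process_result above
        if not column_callbacks:
            return [list(row) for row in rows]
        processed = []
        for row in rows:
            result = []
            for col, value in enumerate(row):
                if value is not None:
                    # apply the column's callback stack, in order, to the
                    # value stored in the database
                    for cb in column_callbacks.get(col, ()):
                        value = cb(None, value) # the real code passes the source
                result.append(value)
            processed.append(result)
        return processed

    def bfss_callback(source, value):
        # hypothetical stand-in for BytesFileSystemStorage.callback: `value`
        # is the file path stored in the database, the content is read from
        # the repository host's file system
        try:
            return open(value, 'rb').read()
        except (IOError, OSError):
            return None

    rows = [('/some/bfss/dir/42_data',), (None,)]
    # column 0 is a bfss mapped attribute, hence {0: [bfss_callback]}
    print process_result(rows, {0: [bfss_callback]})

Calling LENGTH(D) on a mapped attribute simply appends `length_source_execute`
to the same stack (see `update_cb_stack` in rql2sql.py), so the length is
computed repository side from the file content, while FSPATH replaces the
stack entirely so the stored path itself is returned.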