# HG changeset patch # User Pierre-Yves David # Date 1372247298 -7200 # Node ID 774029a4a71817bb1256fb6907b173899a73c55e # Parent 46885bfa4150b2250f0c5ac04c850fe358da9051 [multi-sources] drop multi-sources related test. another branch is removing the multi-source itself. We do not want to bother fixing those test. diff -r 46885bfa4150 -r 774029a4a718 server/test/unittest_msplanner.py --- a/server/test/unittest_msplanner.py Thu Jun 27 18:21:04 2013 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,2797 +0,0 @@ -# copyright 2003-2012 LOGILAB S.A. (Paris, FRANCE), all rights reserved. -# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr -# -# This file is part of CubicWeb. -# -# CubicWeb is free software: you can redistribute it and/or modify it under the -# terms of the GNU Lesser General Public License as published by the Free -# Software Foundation, either version 2.1 of the License, or (at your option) -# any later version. -# -# CubicWeb is distributed in the hope that it will be useful, but WITHOUT -# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS -# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more -# details. -# -# You should have received a copy of the GNU Lesser General Public License along -# with CubicWeb. If not, see . -"""unit tests for module cubicweb.server.msplanner""" - -from logilab.common.decorators import clear_cache -from yams.buildobjs import RelationDefinition -from rql import BadRQLQuery - -from cubicweb.devtools import get_test_db_handler, TestServerConfiguration -from cubicweb.devtools.repotest import BasePlannerTC, test_plan - -class _SetGenerator(object): - """singleton to easily create set using "s[0]" or "s[0,1,2]" for instance - """ - def __getitem__(self, key): - try: - it = iter(key) - except TypeError: - it = (key,) - return set(it) -s = _SetGenerator() - -from cubicweb.schema import ERQLExpression -from cubicweb.server.sources import AbstractSource -from cubicweb.server.msplanner import MSPlanner, PartPlanInformation - -class FakeUserROSource(AbstractSource): - support_entities = {'CWUser': False} - support_relations = {} - def syntax_tree_search(self, *args, **kwargs): - return [] - - -class FakeCardSource(AbstractSource): - support_entities = {'Card': True, 'Note': True, 'State': True} - support_relations = {'in_state': True, 'multisource_rel': True, 'multisource_inlined_rel': True, - 'multisource_crossed_rel': True,} - dont_cross_relations = set(('fiche', 'state_of')) - cross_relations = set(('multisource_crossed_rel',)) - - def syntax_tree_search(self, *args, **kwargs): - return [] - - -class FakeDataFeedSource(FakeCardSource): - copy_based_source = True - -X_ALL_SOLS = sorted([{'X': 'Affaire'}, {'X': 'BaseTransition'}, {'X': 'Basket'}, - {'X': 'Bookmark'}, {'X': 'CWAttribute'}, {'X': 'CWCache'}, - {'X': 'CWConstraint'}, {'X': 'CWConstraintType'}, {'X': 'CWDataImport'}, {'X': 'CWEType'}, - {'X': 'CWGroup'}, {'X': 'CWPermission'}, {'X': 'CWProperty'}, - {'X': 'CWRType'}, {'X': 'CWRelation'}, - {'X': 'CWSource'}, {'X': 'CWSourceHostConfig'}, {'X': 'CWSourceSchemaConfig'}, - {'X': 'CWUser'}, {'X': 'CWUniqueTogetherConstraint'}, - {'X': 'Card'}, {'X': 'Comment'}, {'X': 'Division'}, - {'X': 'Email'}, {'X': 'EmailAddress'}, {'X': 'EmailPart'}, - {'X': 'EmailThread'}, {'X': 'ExternalUri'}, {'X': 'File'}, - {'X': 'Folder'}, {'X': 'Note'}, {'X': 'Old'}, - {'X': 'Personne'}, {'X': 'RQLExpression'}, {'X': 'Societe'}, - {'X': 'State'}, {'X': 'SubDivision'}, {'X': 'SubWorkflowExitPoint'}, - {'X': 'Tag'}, {'X': 'TrInfo'}, {'X': 'Transition'}, - {'X': 'Workflow'}, {'X': 'WorkflowTransition'}]) - - -# keep cnx so it's not garbage collected and the associated session is closed -def setUpModule(*args): - global repo, cnx - handler = get_test_db_handler(TestServerConfiguration(apphome=BaseMSPlannerTC.datadir)) - handler.build_db_cache() - repo, cnx = handler.get_repo_and_cnx() - -def tearDownModule(*args): - global repo, cnx - del repo, cnx - - -class BaseMSPlannerTC(BasePlannerTC): - """test planner related feature on a 3-sources repository: - - * system source supporting everything - * ldap source supporting CWUser - * rql source supporting Card - """ - - def setUp(self): - self.__class__.repo = repo - #_QuerierTC.setUp(self) - self.setup() - # hijack Affaire security - affreadperms = list(self.schema['Affaire'].permissions['read']) - self.prevrqlexpr_affaire = affreadperms[-1] - # add access to type attribute so S can't be invariant - affreadperms[-1] = ERQLExpression('X concerne S?, S owned_by U, S type "X"') - self.schema['Affaire'].set_action_permissions('read', affreadperms) - # hijack CWUser security - userreadperms = list(self.schema['CWUser'].permissions['read']) - self.prevrqlexpr_user = userreadperms[-1] - userreadperms[-1] = ERQLExpression('X owned_by U') - self.schema['CWUser'].set_action_permissions('read', userreadperms) - self.add_source(FakeUserROSource, 'ldap') - self.add_source(FakeCardSource, 'cards') - self.add_source(FakeDataFeedSource, 'datafeed') - - def tearDown(self): - # restore hijacked security - self.restore_orig_affaire_security() - self.restore_orig_cwuser_security() - super(BaseMSPlannerTC, self).tearDown() - - def restore_orig_affaire_security(self): - affreadperms = list(self.schema['Affaire'].permissions['read']) - affreadperms[-1] = self.prevrqlexpr_affaire - self.schema['Affaire'].set_action_permissions('read', affreadperms) - - def restore_orig_cwuser_security(self): - if hasattr(self, '_orig_cwuser_security_restored'): - return - self._orig_cwuser_security_restored = True - userreadperms = list(self.schema['CWUser'].permissions['read']) - userreadperms[-1] = self.prevrqlexpr_user - self.schema['CWUser'].set_action_permissions('read', userreadperms) - - -class PartPlanInformationTC(BaseMSPlannerTC): - - def _test(self, rql, *args): - if len(args) == 3: - kwargs, sourcesterms, needsplit = args - else: - sourcesterms, needsplit = args - kwargs = None - plan = self._prepare_plan(rql, kwargs) - union = plan.rqlst - plan.preprocess(union) - ppi = PartPlanInformation(plan, union.children[0]) - for sourcevars in ppi._sourcesterms.itervalues(): - for var in list(sourcevars): - solindices = sourcevars.pop(var) - sourcevars[var._ms_table_key()] = solindices - self.assertEqual(ppi._sourcesterms, sourcesterms) - self.assertEqual(ppi.needsplit, needsplit) - - - def test_simple_system_only(self): - """retrieve entities only supported by the system source""" - self._test('CWGroup X', - {self.system: {'X': s[0]}}, False) - - def test_simple_system_ldap(self): - """retrieve CWUser X from both sources and return concatenation of results - """ - self._test('CWUser X', - {self.system: {'X': s[0]}, self.ldap: {'X': s[0]}}, False) - - def test_simple_system_rql(self): - """retrieve Card X from both sources and return concatenation of results - """ - self._test('Any X, XT WHERE X is Card, X title XT', - {self.system: {'X': s[0]}, self.cards: {'X': s[0]}}, False) - - def test_simple_eid_specified(self): - """retrieve CWUser X from system source (eid is specified, can locate the entity) - """ - ueid = self.session.user.eid - self._test('Any X,L WHERE X eid %(x)s, X login L', {'x': ueid}, - {self.system: {'X': s[0]}}, False) - - def test_simple_eid_invariant(self): - """retrieve CWUser X from system source (eid is specified, can locate the entity) - """ - ueid = self.session.user.eid - self._test('Any X WHERE X eid %(x)s', {'x': ueid}, - {self.system: {'x': s[0]}}, False) - - def test_simple_invariant(self): - """retrieve CWUser X from system source only (X is invariant and in_group not supported by ldap source) - """ - self._test('Any X WHERE X is CWUser, X in_group G, G name "users"', - {self.system: {'X': s[0], 'G': s[0], 'in_group': s[0]}}, False) - - def test_security_has_text(self): - """retrieve CWUser X from system source only (has_text not supported by ldap source) - """ - # specify CWUser instead of any since the way this test is written we aren't well dealing - # with ambigous query (eg only considering the first solution) - self._test('CWUser X WHERE X has_text "bla"', - {self.system: {'X': s[0]}}, False) - - def test_complex_base(self): - """ - 1. retrieve Any X, L WHERE X is CWUser, X login L from system and ldap sources, store - concatenation of results into a temporary table - 2. return the result of Any X, L WHERE X is TMP, X login L, X in_group G, - G name 'users' on the system source - """ - self._test('Any X,L WHERE X is CWUser, X in_group G, X login L, G name "users"', - {self.system: {'X': s[0], 'G': s[0], 'in_group': s[0]}, - self.ldap : {'X': s[0]}}, True) - - def test_complex_invariant_ordered(self): - """ - 1. retrieve Any X,AA WHERE X modification_date AA from system and ldap sources, store - concatenation of results into a temporary table - 2. return the result of Any X,AA ORDERBY AA WHERE %s owned_by X, X modification_date AA - on the system source - """ - ueid = self.session.user.eid - self._test('Any X,AA ORDERBY AA WHERE E eid %(x)s, E owned_by X, X modification_date AA', {'x': ueid}, - {self.system: {'x': s[0], 'X': s[0], 'owned_by': s[0]}, - self.ldap : {'X': s[0]}}, True) - - def test_complex_invariant(self): - """ - 1. retrieve Any X,L,AA WHERE X login L, X modification_date AA from system and ldap sources, store - concatenation of results into a temporary table - 2. return the result of Any X,L,AA WHERE %s owned_by X, X login L, X modification_date AA - on the system source - """ - ueid = self.session.user.eid - self._test('Any X,L,AA WHERE E eid %(x)s, E owned_by X, X login L, X modification_date AA', {'x': ueid}, - {self.system: {'x': s[0], 'X': s[0], 'owned_by': s[0]}, - self.ldap : {'X': s[0]}}, True) - - def test_complex_ambigous(self): - """retrieve CWUser X from system and ldap sources, Person X from system source only - """ - self._test('Any X,F WHERE X firstname F', - {self.system: {'X': s[0, 1]}, - self.ldap: {'X': s[0]}}, True) - - def test_complex_multiple(self): - """ - 1. retrieve Any X,A,Y,B WHERE X login A, Y login B from system and ldap sources, store - cartesian product of results into a temporary table - 2. return the result of Any X,Y WHERE X login 'syt', Y login 'adim' - on the system source - """ - ueid = self.session.user.eid - self._test('Any X,Y WHERE X login "syt", Y login "adim"', {'x': ueid}, - {self.system: {'Y': s[0], 'X': s[0]}, - self.ldap: {'Y': s[0], 'X': s[0]}}, True) - - def test_complex_aggregat(self): - solindexes = set(range(len([e for e in self.schema.entities() if not e.final]))) - self._test('Any MAX(X)', - {self.system: {'X': solindexes}}, False) - - def test_complex_optional(self): - ueid = self.session.user.eid - self._test('Any U WHERE WF wf_info_for X, X eid %(x)s, WF owned_by U?, WF from_state FS', {'x': ueid}, - {self.system: {'WF': s[0], 'FS': s[0], 'U': s[0], - 'from_state': s[0], 'owned_by': s[0], 'wf_info_for': s[0], - 'x': s[0]}}, - False) - - def test_exists4(self): - """ - State S could come from both rql source and system source, - but since X cannot come from the rql source, the solution - {self.cards : 'S'} must be removed - """ - self._test('Any G,L WHERE X in_group G, X login L, G name "managers", ' - 'EXISTS(X copain T, T login L, T login in ("comme", "cochon")) OR ' - 'EXISTS(X in_state S, S name "pascontent", NOT X copain T2, T2 login "billy")', - {self.system: {'X': s[0], 'S': s[0], 'T2': s[0], 'T': s[0], 'G': s[0], 'copain': s[0], 'in_group': s[0]}, - self.ldap: {'X': s[0], 'T2': s[0], 'T': s[0]}}, - True) - - def test_relation_need_split(self): - self._test('Any X, S WHERE X in_state S', - {self.system: {'X': s[0, 1, 2], 'S': s[0, 1, 2]}, - self.cards: {'X': s[2], 'S': s[2]}}, - True) - - def test_not_relation_need_split(self): - self._test('Any SN WHERE NOT X in_state S, S name SN', - {self.cards: {'X': s[2], 'S': s[0, 1, 2]}, - self.system: {'X': s[0, 1, 2], 'S': s[0, 1, 2]}}, - True) - - def test_not_relation_no_split_external(self): - repo._type_source_cache[999999] = ('Note', 'cards', 999999, 'cards') - # similar to the above test but with an eid coming from the external source. - # the same plan may be used, since we won't find any record in the system source - # linking 9999999 to a state - self._test('Any SN WHERE NOT X in_state S, X eid %(x)s, S name SN', - {'x': 999999}, - {self.cards: {'x': s[0], 'S': s[0]}, - self.system: {'x': s[0], 'S': s[0]}}, - False) - - def test_relation_restriction_ambigous_need_split(self): - self._test('Any X,T WHERE X in_state S, S name "pending", T tags X', - {self.system: {'X': s[0, 1, 2], 'S': s[0, 1, 2], 'T': s[0, 1, 2], 'tags': s[0, 1, 2]}, - self.cards: {'X': s[2], 'S': s[2]}}, - True) - - def test_simplified_var(self): - repo._type_source_cache[999999] = ('Note', 'cards', 999999, 'cards') - # need access to source since X table has to be accessed because of the outer join - self._test('Any U WHERE U in_group G, (G name IN ("managers", "logilab") OR (X require_permission P?, P name "bla", P require_group G)), X eid %(x)s, U eid %(u)s', - {'x': 999999, 'u': self.session.user.eid}, - {self.system: {'P': s[0], 'G': s[0], - 'require_permission': s[0], 'in_group': s[0], 'P': s[0], 'require_group': s[0], - 'u': s[0]}, - self.cards: {'X': s[0]}}, - True) - - def test_delete_relation1(self): - ueid = self.session.user.eid - self._test('Any X, Y WHERE X created_by Y, X eid %(x)s, NOT Y eid %(y)s', - {'x': ueid, 'y': ueid}, - {self.system: {'Y': s[0], 'created_by': s[0], 'x': s[0]}}, - False) - - def test_crossed_relation_eid_1_needattr(self): - repo._type_source_cache[999999] = ('Note', 'system', 999999, 'system') - ueid = self.session.user.eid - self._test('Any Y,T WHERE X eid %(x)s, X multisource_crossed_rel Y, Y type T', - {'x': 999999,}, - {self.cards: {'Y': s[0]}, self.system: {'Y': s[0], 'x': s[0]}}, - True) - - def test_crossed_relation_eid_1_invariant(self): - repo._type_source_cache[999999] = ('Note', 'system', 999999, 'system') - self._test('Any Y WHERE X eid %(x)s, X multisource_crossed_rel Y', - {'x': 999999}, - {self.system: {'Y': s[0], 'x': s[0]}}, - False) - - def test_crossed_relation_eid_2_invariant(self): - repo._type_source_cache[999999] = ('Note', 'cards', 999999, 'cards') - self._test('Any Y WHERE X eid %(x)s, X multisource_crossed_rel Y', - {'x': 999999,}, - {self.cards: {'Y': s[0], 'multisource_crossed_rel': s[0], 'x': s[0]}, - self.system: {'Y': s[0], 'multisource_crossed_rel': s[0], 'x': s[0]}}, - False) - - def test_version_crossed_depends_on_1(self): - repo._type_source_cache[999999] = ('Note', 'cards', 999999, 'cards') - self._test('Any X,AD,AE WHERE E eid %(x)s, E multisource_crossed_rel X, X in_state AD, AD name AE', - {'x': 999999}, - {self.cards: {'X': s[0], 'AD': s[0], 'multisource_crossed_rel': s[0], 'x': s[0]}, - self.system: {'X': s[0], 'AD': s[0], 'multisource_crossed_rel': s[0], 'x': s[0]}}, - True) - - def test_version_crossed_depends_on_2(self): - repo._type_source_cache[999999] = ('Note', 'system', 999999, 'system') - self._test('Any X,AD,AE WHERE E eid %(x)s, E multisource_crossed_rel X, X in_state AD, AD name AE', - {'x': 999999}, - {self.cards: {'X': s[0], 'AD': s[0]}, - self.system: {'X': s[0], 'AD': s[0], 'x': s[0]}}, - True) - - def test_simplified_var_3(self): - repo._type_source_cache[999999] = ('Note', 'cards', 999999, 'cards') - repo._type_source_cache[999998] = ('State', 'cards', 999998, 'cards') - self._test('Any S,T WHERE S eid %(s)s, N eid %(n)s, N type T, N is Note, S is State', - {'n': 999999, 's': 999998}, - {self.cards: {'s': s[0], 'N': s[0]}}, False) - - - -class MSPlannerTC(BaseMSPlannerTC): - - def setUp(self): - BaseMSPlannerTC.setUp(self) - self.planner = MSPlanner(self.o.schema, self.repo.vreg.rqlhelper) - for cached in ('rel_type_sources', 'can_cross_relation', 'is_multi_sources_relation'): - clear_cache(self.repo, cached) - - _test = test_plan - - def test_simple_system_only(self): - """retrieve entities only supported by the system source - """ - self._test('CWGroup X', - [('OneFetchStep', [('Any X WHERE X is CWGroup', [{'X': 'CWGroup'}])], - None, None, [self.system], {}, [])]) - - def test_simple_system_only_limit(self): - """retrieve entities only supported by the system source - """ - self._test('CWGroup X LIMIT 10', - [('OneFetchStep', [('Any X LIMIT 10 WHERE X is CWGroup', [{'X': 'CWGroup'}])], - 10, None, [self.system], {}, [])]) - - def test_simple_system_only_limit_offset(self): - """retrieve entities only supported by the system source - """ - self._test('CWGroup X LIMIT 10 OFFSET 10', - [('OneFetchStep', [('Any X LIMIT 10 OFFSET 10 WHERE X is CWGroup', [{'X': 'CWGroup'}])], - 10, 10, [self.system], {}, [])]) - - def test_simple_system_ldap(self): - """retrieve CWUser X from both sources and return concatenation of results - """ - self._test('CWUser X', - [('OneFetchStep', [('Any X WHERE X is CWUser', [{'X': 'CWUser'}])], - None, None, [self.ldap, self.system], {}, [])]) - - def test_simple_system_ldap_limit(self): - """retrieve CWUser X from both sources and return concatenation of results - """ - self._test('CWUser X LIMIT 10', - [('OneFetchStep', [('Any X LIMIT 10 WHERE X is CWUser', [{'X': 'CWUser'}])], - 10, None, [self.ldap, self.system], {}, [])]) - - def test_simple_system_ldap_limit_offset(self): - """retrieve CWUser X from both sources and return concatenation of results - """ - self._test('CWUser X LIMIT 10 OFFSET 10', - [('OneFetchStep', [('Any X LIMIT 10 OFFSET 10 WHERE X is CWUser', [{'X': 'CWUser'}])], - 10, 10, [self.ldap, self.system], {}, [])]) - - def test_simple_system_ldap_ordered_limit_offset(self): - """retrieve CWUser X from both sources and return concatenation of results - """ - self._test('CWUser X ORDERBY X LIMIT 10 OFFSET 10', - [('AggrStep', 'SELECT table0.C0 FROM table0\nORDER BY table0.C0\nLIMIT 10\nOFFSET 10', None, [ - ('FetchStep', [('Any X WHERE X is CWUser', [{'X': 'CWUser'}])], - [self.ldap, self.system], {}, {'X': 'table0.C0'}, []), - ]), - ]) - def test_simple_system_ldap_aggregat(self): - """retrieve CWUser X from both sources and return concatenation of results - """ - # COUNT(X) is kept in sub-step and transformed into SUM(X) in the AggrStep - self._test('Any COUNT(X) WHERE X is CWUser', - [('AggrStep', 'SELECT SUM(table0.C0) FROM table0', None, [ - ('FetchStep', [('Any COUNT(X) WHERE X is CWUser', [{'X': 'CWUser'}])], - [self.ldap, self.system], {}, {'COUNT(X)': 'table0.C0'}, []), - ]), - ]) - - def test_simple_system_rql(self): - """retrieve Card X from both sources and return concatenation of results - """ - self._test('Any X, XT WHERE X is Card, X title XT', - [('OneFetchStep', [('Any X,XT WHERE X is Card, X title XT', [{'X': 'Card', 'XT': 'String'}])], - None, None, [self.cards, self.system], {}, [])]) - - def test_simple_eid_specified(self): - """retrieve CWUser X from system source (eid is specified, can locate the entity) - """ - ueid = self.session.user.eid - self._test('Any X,L WHERE X eid %(x)s, X login L', - [('OneFetchStep', [('Any X,L WHERE X eid %s, X login L'%ueid, [{'X': 'CWUser', 'L': 'String'}])], - None, None, [self.system], {}, [])], - {'x': ueid}) - - def test_simple_eid_invariant(self): - """retrieve CWUser X from system source (eid is specified, can locate the entity) - """ - ueid = self.session.user.eid - self._test('Any X WHERE X eid %(x)s', - [('OneFetchStep', [('Any %s'%ueid, [{}])], - None, None, [self.system], {}, [])], - {'x': ueid}) - - def test_simple_invariant(self): - """retrieve CWUser X from system source only (X is invariant and in_group not supported by ldap source) - """ - self._test('Any X WHERE X is CWUser, X in_group G, G name "users"', - [('OneFetchStep', [('Any X WHERE X is CWUser, X in_group G, G name "users"', - [{'X': 'CWUser', 'G': 'CWGroup'}])], - None, None, [self.system], {}, [])]) - - def test_complex_base(self): - """ - 1. retrieve Any X, L WHERE X is CWUser, X login L from system and ldap sources, store - concatenation of results into a temporary table - 2. return the result of Any X, L WHERE X is TMP, X login LX in_group G, - G name 'users' on the system source - """ - self._test('Any X,L WHERE X is CWUser, X in_group G, X login L, G name "users"', - [('FetchStep', [('Any X,L WHERE X login L, X is CWUser', [{'X': 'CWUser', 'L': 'String'}])], - [self.ldap, self.system], None, - {'X': 'table0.C0', 'X.login': 'table0.C1', 'L': 'table0.C1'}, []), - ('OneFetchStep', [('Any X,L WHERE X in_group G, X login L, G name "users", G is CWGroup, X is CWUser', - [{'X': 'CWUser', 'L': 'String', 'G': 'CWGroup'}])], - None, None, [self.system], - {'X': 'table0.C0', 'X.login': 'table0.C1', 'L': 'table0.C1'}, []) - ]) - - def test_complex_base_limit_offset(self): - """ - 1. retrieve Any X, L WHERE X is CWUser, X login L from system and ldap sources, store - concatenation of results into a temporary table - 2. return the result of Any X, L WHERE X is TMP, X login LX in_group G, - G name 'users' on the system source - """ - self._test('Any X,L LIMIT 10 OFFSET 10 WHERE X is CWUser, X in_group G, X login L, G name "users"', - [('FetchStep', [('Any X,L WHERE X login L, X is CWUser', [{'X': 'CWUser', 'L': 'String'}])], - [self.ldap, self.system], None, - {'X': 'table0.C0', 'X.login': 'table0.C1', 'L': 'table0.C1'}, []), - ('OneFetchStep', [('Any X,L LIMIT 10 OFFSET 10 WHERE X in_group G, X login L, G name "users", G is CWGroup, X is CWUser', - [{'X': 'CWUser', 'L': 'String', 'G': 'CWGroup'}])], - 10, 10, - [self.system], {'X': 'table0.C0', 'X.login': 'table0.C1', 'L': 'table0.C1'}, []) - ]) - - def test_complex_ordered(self): - self._test('Any L ORDERBY L WHERE X login L', - [('AggrStep', 'SELECT table0.C0 FROM table0\nORDER BY table0.C0', None, - [('FetchStep', [('Any L WHERE X login L, X is CWUser', - [{'X': 'CWUser', 'L': 'String'}])], - [self.ldap, self.system], {}, {'X.login': 'table0.C0', 'L': 'table0.C0'}, []), - ]) - ]) - - def test_complex_ordered_limit_offset(self): - self._test('Any L ORDERBY L LIMIT 10 OFFSET 10 WHERE X login L', - [('AggrStep', 'SELECT table0.C0 FROM table0\nORDER BY table0.C0\nLIMIT 10\nOFFSET 10', None, - [('FetchStep', [('Any L WHERE X login L, X is CWUser', - [{'X': 'CWUser', 'L': 'String'}])], - [self.ldap, self.system], {}, {'X.login': 'table0.C0', 'L': 'table0.C0'}, []), - ]) - ]) - - def test_complex_invariant_ordered(self): - """ - 1. retrieve Any X,AA WHERE X modification_date AA from system and ldap sources, store - concatenation of results into a temporary table - 2. return the result of Any X,AA ORDERBY AA WHERE %s owned_by X, X modification_date AA - on the system source - - herrr, this is what is expected by the XXX :(, not the actual result (which is correct anyway) - """ - ueid = self.session.user.eid - self._test('Any X,AA ORDERBY AA WHERE E eid %(x)s, E owned_by X, X modification_date AA', - [('FetchStep', - [('Any X,AA WHERE X modification_date AA, X is CWUser', - [{'AA': 'Datetime', 'X': 'CWUser'}])], - [self.ldap, self.system], None, - {'AA': 'table0.C1', 'X': 'table0.C0', 'X.modification_date': 'table0.C1'}, []), - ('OneFetchStep', - [('Any X,AA ORDERBY AA WHERE %s owned_by X, X modification_date AA, X is CWUser' % ueid, - [{'AA': 'Datetime', 'X': 'CWUser'}])], - None, None, [self.system], - {'AA': 'table0.C1', 'X': 'table0.C0', 'X.modification_date': 'table0.C1'}, []), - ], - {'x': ueid}) - - def test_complex_invariant(self): - """ - 1. retrieve Any X,L,AA WHERE X login L, X modification_date AA from system and ldap sources, store - concatenation of results into a temporary table - 2. return the result of Any X,L,AA WHERE %s owned_by X, X login L, X modification_date AA - on the system source - """ - ueid = self.session.user.eid - self._test('Any X,L,AA WHERE E eid %(x)s, E owned_by X, X login L, X modification_date AA', - [('FetchStep', [('Any X,L,AA WHERE X login L, X modification_date AA, X is CWUser', - [{'AA': 'Datetime', 'X': 'CWUser', 'L': 'String'}])], - [self.ldap, self.system], None, - {'AA': 'table0.C2', 'X': 'table0.C0', 'X.login': 'table0.C1', 'X.modification_date': 'table0.C2', 'L': 'table0.C1'}, []), - ('OneFetchStep', [('Any X,L,AA WHERE %s owned_by X, X login L, X modification_date AA, X is CWUser'%ueid, - [{'AA': 'Datetime', 'X': 'CWUser', 'L': 'String'}])], - None, None, [self.system], - {'AA': 'table0.C2', 'X': 'table0.C0', 'X.login': 'table0.C1', 'X.modification_date': 'table0.C2', 'L': 'table0.C1'}, [])], - {'x': ueid}) - - def test_complex_ambigous(self): - """retrieve CWUser X from system and ldap sources, Person X from system source only - """ - self._test('Any X,F WHERE X firstname F', - [('UnionStep', None, None, [ - ('OneFetchStep', [('Any X,F WHERE X firstname F, X is CWUser', - [{'X': 'CWUser', 'F': 'String'}])], - None, None, [self.ldap, self.system], {}, []), - ('OneFetchStep', [('Any X,F WHERE X firstname F, X is Personne', - [{'X': 'Personne', 'F': 'String'}])], - None, None, [self.system], {}, []), - ]), - ]) - - def test_complex_ambigous_limit_offset(self): - """retrieve CWUser X from system and ldap sources, Person X from system source only - """ - self._test('Any X,F LIMIT 10 OFFSET 10 WHERE X firstname F', - [('UnionStep', 10, 10, [ - ('OneFetchStep', [('Any X,F WHERE X firstname F, X is CWUser', - [{'X': 'CWUser', 'F': 'String'}])], - None, None, - [self.ldap, self.system], {}, []), - ('OneFetchStep', [('Any X,F WHERE X firstname F, X is Personne', - [{'X': 'Personne', 'F': 'String'}])], - None, None, [self.system], {}, []), - ]), - ]) - - def test_complex_ambigous_ordered(self): - """ - 1. retrieve CWUser X from system and ldap sources, Person X from system source only, store - each result in the same temp table - 2. return content of the table sorted - """ - self._test('Any X,F ORDERBY F WHERE X firstname F', - [('AggrStep', 'SELECT table0.C0, table0.C1 FROM table0\nORDER BY table0.C1', None, - [('FetchStep', [('Any X,F WHERE X firstname F, X is CWUser', - [{'X': 'CWUser', 'F': 'String'}])], - [self.ldap, self.system], {}, - {'X': 'table0.C0', 'X.firstname': 'table0.C1', 'F': 'table0.C1'}, []), - ('FetchStep', [('Any X,F WHERE X firstname F, X is Personne', - [{'X': 'Personne', 'F': 'String'}])], - [self.system], {}, - {'X': 'table0.C0', 'X.firstname': 'table0.C1', 'F': 'table0.C1'}, []), - ]), - ]) - - def test_complex_multiple(self): - """ - 1. retrieve Any X,A,Y,B WHERE X login A, Y login B from system and ldap sources, store - cartesian product of results into a temporary table - 2. return the result of Any X,Y WHERE X login 'syt', Y login 'adim' - on the system source - """ - ueid = self.session.user.eid - self._test('Any X,Y WHERE X login "syt", Y login "adim"', - [('FetchStep', - [('Any X WHERE X login "syt", X is CWUser', [{'X': 'CWUser'}])], - [self.ldap, self.system], None, - {'X': 'table0.C0'}, []), - ('FetchStep', - [('Any Y WHERE Y login "adim", Y is CWUser', [{'Y': 'CWUser'}])], - [self.ldap, self.system], None, - {'Y': 'table1.C0'}, []), - ('OneFetchStep', - [('Any X,Y WHERE X is CWUser, Y is CWUser', [{'X': 'CWUser', 'Y': 'CWUser'}])], - None, None, [self.system], - {'X': 'table0.C0', 'Y': 'table1.C0'}, []) - ], {'x': ueid}) - - def test_complex_multiple_limit_offset(self): - """ - 1. retrieve Any X,A,Y,B WHERE X login A, Y login B from system and ldap sources, store - cartesian product of results into a temporary table - 2. return the result of Any X,Y WHERE X login 'syt', Y login 'adim' - on the system source - """ - self._test('Any X,Y LIMIT 10 OFFSET 10 WHERE X login "syt", Y login "adim"', - [('FetchStep', - [('Any X WHERE X login "syt", X is CWUser', [{'X': 'CWUser'}])], - [self.ldap, self.system], None, {'X': 'table0.C0'}, []), - ('FetchStep', - [('Any Y WHERE Y login "adim", Y is CWUser', [{'Y': 'CWUser'}])], - [self.ldap, self.system], None, {'Y': 'table1.C0'}, []), - ('OneFetchStep', - [('Any X,Y LIMIT 10 OFFSET 10 WHERE X is CWUser, Y is CWUser', [{'X': 'CWUser', 'Y': 'CWUser'}])], - 10, 10, [self.system], - {'X': 'table0.C0', 'Y': 'table1.C0'}, []) - ]) - - def test_complex_aggregat(self): - self._test('Any MAX(X)', - [('OneFetchStep', - [('Any MAX(X)', X_ALL_SOLS)], - None, None, [self.system], {}, []) - ]) - - def test_complex_typed_aggregat(self): - self._test('Any MAX(X) WHERE X is Card', - [('AggrStep', 'SELECT MAX(table0.C0) FROM table0', None, - [('FetchStep', - [('Any MAX(X) WHERE X is Card', [{'X': 'Card'}])], - [self.cards, self.system], {}, {'MAX(X)': 'table0.C0'}, []) - ]) - ]) - - def test_complex_greater_eid(self): - self._test('Any X WHERE X eid > 12', - [('OneFetchStep', - [('Any X WHERE X eid > 12', X_ALL_SOLS)], - None, None, [self.system], {}, []) - ]) - - def test_complex_greater_typed_eid(self): - self._test('Any X WHERE X eid > 12, X is Card', - [('OneFetchStep', - [('Any X WHERE X eid > 12, X is Card', [{'X': 'Card'}])], - None, None, [self.system], {}, []) - ]) - - def test_complex_optional(self): - ueid = self.session.user.eid - self._test('Any U WHERE WF wf_info_for X, X eid %(x)s, WF owned_by U?, WF from_state FS', - [('OneFetchStep', [('Any U WHERE WF wf_info_for %s, WF owned_by U?, WF from_state FS' % ueid, - [{'WF': 'TrInfo', 'FS': 'State', 'U': 'CWUser'}])], - None, None, [self.system], {}, [])], - {'x': ueid}) - - def test_complex_optional(self): - ueid = self.session.user.eid - self._test('Any U WHERE WF wf_info_for X, X eid %(x)s, WF owned_by U?, WF from_state FS', - [('OneFetchStep', [('Any U WHERE WF wf_info_for %s, WF owned_by U?, WF from_state FS' % ueid, - [{'WF': 'TrInfo', 'FS': 'State', 'U': 'CWUser'}])], - None, None, [self.system], {}, [])], - {'x': ueid}) - - - def test_3sources_ambigous(self): - self._test('Any X,T WHERE X owned_by U, U login "syt", X title T, X is IN(Bookmark, Card, EmailThread)', - [('FetchStep', [('Any X,T WHERE X title T, X is Card', [{'X': 'Card', 'T': 'String'}])], - [self.cards, self.system], None, - {'T': 'table0.C1', 'X': 'table0.C0', 'X.title': 'table0.C1'}, []), - ('FetchStep', [('Any U WHERE U login "syt", U is CWUser', [{'U': 'CWUser'}])], - [self.ldap, self.system], None, - {'U': 'table1.C0'}, []), - ('UnionStep', None, None, [ - ('OneFetchStep', [('Any X,T WHERE X owned_by U, X title T, U is CWUser, X is IN(Bookmark, EmailThread)', - [{'T': 'String', 'U': 'CWUser', 'X': 'Bookmark'}, - {'T': 'String', 'U': 'CWUser', 'X': 'EmailThread'}])], - None, None, [self.system], {'U': 'table1.C0'}, []), - ('OneFetchStep', [('Any X,T WHERE X owned_by U, X title T, U is CWUser, X is Card', - [{'X': 'Card', 'U': 'CWUser', 'T': 'String'}])], - None, None, [self.system], - {'X': 'table0.C0', 'X.title': 'table0.C1', 'T': 'table0.C1', 'U': 'table1.C0'}, []), - ]), - ]) - - def test_restricted_max(self): - # dumb query to emulate the one generated by svnfile.entities.rql_revision_content - self._test('Any V, MAX(VR) WHERE V is Card, V creation_date VR, ' - '(V creation_date TODAY OR (V creation_date < TODAY AND NOT EXISTS(' - 'X is Card, X creation_date < TODAY, X creation_date >= VR)))', - [('FetchStep', [('Any VR WHERE X creation_date < TODAY, X creation_date VR, X is Card', - [{'X': 'Card', 'VR': 'Datetime'}])], - [self.cards, self.system], None, - {'VR': 'table0.C0', 'X.creation_date': 'table0.C0'}, []), - ('FetchStep', [('Any V,VR WHERE V creation_date VR, V is Card', - [{'VR': 'Datetime', 'V': 'Card'}])], - [self.cards, self.system], None, - {'VR': 'table1.C1', 'V': 'table1.C0', 'V.creation_date': 'table1.C1'}, []), - ('OneFetchStep', [('Any V,MAX(VR) WHERE V creation_date VR, (V creation_date TODAY) OR (V creation_date < TODAY, NOT EXISTS(X creation_date >= VR, X is Card)), V is Card', - [{'X': 'Card', 'VR': 'Datetime', 'V': 'Card'}])], - None, None, [self.system], - {'VR': 'table1.C1', 'V': 'table1.C0', 'V.creation_date': 'table1.C1', 'X.creation_date': 'table0.C0'}, []) - ]) - - def test_outer_supported_rel1(self): - # both system and rql support all variables, can be - self._test('Any X, R WHERE X is Note, X in_state S, X type R, ' - 'NOT EXISTS(Y is Note, Y in_state S, Y type R, X identity Y)', - [('OneFetchStep', [('Any X,R WHERE X is Note, X in_state S, X type R, NOT EXISTS(Y is Note, Y in_state S, Y type R, X identity Y), S is State', - [{'Y': 'Note', 'X': 'Note', 'S': 'State', 'R': 'String'}])], - None, None, - [self.cards, self.system], {}, []) - ]) - - def test_not_identity(self): - ueid = self.session.user.eid - self._test('Any X WHERE NOT X identity U, U eid %s, X is CWUser' % ueid, - [('OneFetchStep', - [('Any X WHERE NOT X identity %s, X is CWUser' % ueid, [{'X': 'CWUser'}])], - None, None, - [self.ldap, self.system], {}, []) - ]) - - def test_outer_supported_rel2(self): - self._test('Any X, MAX(R) GROUPBY X WHERE X in_state S, X login R, ' - 'NOT EXISTS(Y is Note, Y in_state S, Y type R)', - [('FetchStep', [('Any A,R WHERE Y in_state A, Y type R, A is State, Y is Note', - [{'Y': 'Note', 'A': 'State', 'R': 'String'}])], - [self.cards, self.system], None, - {'A': 'table0.C0', 'R': 'table0.C1', 'Y.type': 'table0.C1'}, []), - ('FetchStep', [('Any X,R WHERE X login R, X is CWUser', [{'X': 'CWUser', 'R': 'String'}])], - [self.ldap, self.system], None, - {'X': 'table1.C0', 'X.login': 'table1.C1', 'R': 'table1.C1'}, []), - ('OneFetchStep', [('Any X,MAX(R) GROUPBY X WHERE X in_state S, X login R, NOT EXISTS(Y type R, S identity A, A is State, Y is Note), S is State, X is CWUser', - [{'Y': 'Note', 'X': 'CWUser', 'S': 'State', 'R': 'String', 'A': 'State'}])], - None, None, [self.system], - {'A': 'table0.C0', 'X': 'table1.C0', 'X.login': 'table1.C1', 'R': 'table1.C1', 'Y.type': 'table0.C1'}, []) - ]) - - def test_security_has_text(self): - # use a guest user - self.session = self.user_groups_session('guests') - ueid = self.session.user.eid - self._test('Any X WHERE X has_text "bla"', - [('FetchStep', [('Any E WHERE E type "X", E is Note', [{'E': 'Note'}])], - [self.cards, self.system], None, {'E': 'table0.C0'}, []), - ('UnionStep', None, None, - [('OneFetchStep', - [(u'Any X WHERE X has_text "bla", (EXISTS(X owned_by %(ueid)s)) OR ((((EXISTS(D concerne C?, C owned_by %(ueid)s, C type "X", X identity D, C is Division, D is Affaire)) OR (EXISTS(H concerne G?, G owned_by %(ueid)s, G type "X", X identity H, G is SubDivision, H is Affaire))) OR (EXISTS(I concerne F?, F owned_by %(ueid)s, F type "X", X identity I, F is Societe, I is Affaire))) OR (EXISTS(J concerne E?, E owned_by %(ueid)s, X identity J, E is Note, J is Affaire))), X is Affaire' % {'ueid': ueid}, - [{'C': 'Division', 'E': 'Note', 'D': 'Affaire', 'G': 'SubDivision', 'F': 'Societe', 'I': 'Affaire', 'H': 'Affaire', 'J': 'Affaire', 'X': 'Affaire'}])], - None, None, [self.system], {'E': 'table0.C0'}, []), - ('OneFetchStep', - [('Any X WHERE X has_text "bla", EXISTS(X owned_by %s), X is Basket' % ueid, - [{'X': 'Basket'}]), - ('Any X WHERE X has_text "bla", EXISTS(X owned_by %s), X is CWUser' % ueid, - [{'X': 'CWUser'}]), - ('Any X WHERE X has_text "bla", X is IN(Card, Comment, Division, Email, EmailThread, File, Folder, Note, Personne, Societe, SubDivision, Tag)', - [{'X': 'Card'}, {'X': 'Comment'}, - {'X': 'Division'}, {'X': 'Email'}, {'X': 'EmailThread'}, - {'X': 'File'}, {'X': 'Folder'}, - {'X': 'Note'}, {'X': 'Personne'}, {'X': 'Societe'}, - {'X': 'SubDivision'}, {'X': 'Tag'}]),], - None, None, [self.system], {}, []), - ]) - ]) - - def test_security_has_text_limit_offset(self): - # use a guest user - self.session = self.user_groups_session('guests') - ueid = self.session.user.eid - # note: same as the above query but because of the subquery usage, the - # display differs (not printing solutions for each union) - self._test('Any X LIMIT 10 OFFSET 10 WHERE X has_text "bla"', - [('FetchStep', [('Any E WHERE E type "X", E is Note', [{'E': 'Note'}])], - [self.cards, self.system], None, {'E': 'table1.C0'}, []), - ('UnionFetchStep', [ - ('FetchStep', [('Any X WHERE X has_text "bla", (EXISTS(X owned_by %(ueid)s)) OR ((((EXISTS(D concerne C?, C owned_by %(ueid)s, C type "X", X identity D, C is Division, D is Affaire)) OR (EXISTS(H concerne G?, G owned_by %(ueid)s, G type "X", X identity H, G is SubDivision, H is Affaire))) OR (EXISTS(I concerne F?, F owned_by %(ueid)s, F type "X", X identity I, F is Societe, I is Affaire))) OR (EXISTS(J concerne E?, E owned_by %(ueid)s, X identity J, E is Note, J is Affaire))), X is Affaire' % {'ueid': ueid}, - [{'C': 'Division', 'E': 'Note', 'D': 'Affaire', 'G': 'SubDivision', 'F': 'Societe', 'I': 'Affaire', 'H': 'Affaire', 'J': 'Affaire', 'X': 'Affaire'}])], - [self.system], {'E': 'table1.C0'}, {'X': 'table0.C0'}, []), - ('FetchStep', - [('Any X WHERE X has_text "bla", EXISTS(X owned_by %s), X is Basket' % ueid, - [{'X': 'Basket'}]), - ('Any X WHERE X has_text "bla", EXISTS(X owned_by %s), X is CWUser' % ueid, - [{'X': 'CWUser'}]), - ('Any X WHERE X has_text "bla", X is IN(Card, Comment, Division, Email, EmailThread, File, Folder, Note, Personne, Societe, SubDivision, Tag)', - [{'X': 'Card'}, {'X': 'Comment'}, - {'X': 'Division'}, {'X': 'Email'}, {'X': 'EmailThread'}, - {'X': 'File'}, {'X': 'Folder'}, - {'X': 'Note'}, {'X': 'Personne'}, {'X': 'Societe'}, - {'X': 'SubDivision'}, {'X': 'Tag'}])], - [self.system], {}, {'X': 'table0.C0'}, []), - ]), - ('OneFetchStep', - [('Any X LIMIT 10 OFFSET 10', - [{'X': 'Affaire'}, {'X': 'Basket'}, - {'X': 'CWUser'}, {'X': 'Card'}, {'X': 'Comment'}, - {'X': 'Division'}, {'X': 'Email'}, {'X': 'EmailThread'}, - {'X': 'File'}, {'X': 'Folder'}, - {'X': 'Note'}, {'X': 'Personne'}, {'X': 'Societe'}, - {'X': 'SubDivision'}, {'X': 'Tag'}])], - 10, 10, [self.system], {'X': 'table0.C0'}, []) - ]) - - def test_security_user(self): - """a guest user trying to see another user: EXISTS(X owned_by U) is automatically inserted""" - # use a guest user - self.session = self.user_groups_session('guests') - ueid = self.session.user.eid - self._test('Any X WHERE X login "bla"', - [('FetchStep', - [('Any X WHERE X login "bla", X is CWUser', [{'X': 'CWUser'}])], - [self.ldap, self.system], None, {'X': 'table0.C0'}, []), - ('OneFetchStep', - [('Any X WHERE EXISTS(X owned_by %s), X is CWUser' % ueid, [{'X': 'CWUser'}])], - None, None, [self.system], {'X': 'table0.C0'}, [])]) - - def test_security_complex_has_text(self): - # use a guest user - self.session = self.user_groups_session('guests') - ueid = self.session.user.eid - self._test('Any X WHERE X has_text "bla", X firstname "bla"', - [('FetchStep', [('Any X WHERE X firstname "bla", X is CWUser', [{'X': 'CWUser'}])], - [self.ldap, self.system], None, {'X': 'table0.C0'}, []), - ('UnionStep', None, None, [ - ('OneFetchStep', [('Any X WHERE X has_text "bla", EXISTS(X owned_by %s), X is CWUser' % ueid, [{'X': 'CWUser'}])], - None, None, [self.system], {'X': 'table0.C0'}, []), - ('OneFetchStep', [('Any X WHERE X has_text "bla", X firstname "bla", X is Personne', [{'X': 'Personne'}])], - None, None, [self.system], {}, []), - ]), - ]) - - def test_security_complex_has_text_limit_offset(self): - # use a guest user - self.session = self.user_groups_session('guests') - ueid = self.session.user.eid - self._test('Any X LIMIT 10 OFFSET 10 WHERE X has_text "bla", X firstname "bla"', - [('FetchStep', [('Any X WHERE X firstname "bla", X is CWUser', [{'X': 'CWUser'}])], - [self.ldap, self.system], None, {'X': 'table1.C0'}, []), - ('UnionFetchStep', [ - ('FetchStep', [('Any X WHERE X has_text "bla", EXISTS(X owned_by %s), X is CWUser' % ueid, [{'X': 'CWUser'}])], - [self.system], {'X': 'table1.C0'}, {'X': 'table0.C0'}, []), - ('FetchStep', [('Any X WHERE X has_text "bla", X firstname "bla", X is Personne', [{'X': 'Personne'}])], - [self.system], {}, {'X': 'table0.C0'}, []), - ]), - ('OneFetchStep', - [('Any X LIMIT 10 OFFSET 10', [{'X': 'CWUser'}, {'X': 'Personne'}])], - 10, 10, [self.system], {'X': 'table0.C0'}, []) - ]) - - def test_security_complex_aggregat(self): - # use a guest user - self.session = self.user_groups_session('guests') - ueid = self.session.user.eid - ALL_SOLS = X_ALL_SOLS[:] - ALL_SOLS.remove({'X': 'CWSourceHostConfig'}) # not authorized - ALL_SOLS.remove({'X': 'CWSourceSchemaConfig'}) # not authorized - ALL_SOLS.remove({'X': 'CWDataImport'}) # not authorized - self._test('Any MAX(X)', - [('FetchStep', [('Any E WHERE E type "X", E is Note', [{'E': 'Note'}])], - [self.cards, self.system], None, {'E': 'table1.C0'}, []), - ('FetchStep', [('Any X WHERE X is CWUser', [{'X': 'CWUser'}])], - [self.ldap, self.system], None, {'X': 'table2.C0'}, []), - ('UnionFetchStep', [ - ('FetchStep', [('Any X WHERE EXISTS(%s use_email X), X is EmailAddress' % ueid, - [{'X': 'EmailAddress'}]), - ('Any X WHERE EXISTS(X owned_by %s), X is Basket' % ueid, [{'X': 'Basket'}])], - [self.system], {}, {'X': 'table0.C0'}, []), - ('UnionFetchStep', - [('FetchStep', [('Any X WHERE X is IN(Card, Note, State)', - [{'X': 'Card'}, {'X': 'Note'}, {'X': 'State'}])], - [self.cards, self.system], {}, {'X': 'table0.C0'}, []), - ('FetchStep', - [('Any X WHERE X is IN(BaseTransition, Bookmark, CWAttribute, CWCache, CWConstraint, CWConstraintType, CWEType, CWGroup, CWPermission, CWProperty, CWRType, CWRelation, CWSource, CWUniqueTogetherConstraint, Comment, Division, Email, EmailPart, EmailThread, ExternalUri, File, Folder, Old, Personne, RQLExpression, Societe, SubDivision, SubWorkflowExitPoint, Tag, TrInfo, Transition, Workflow, WorkflowTransition)', - [{'X': 'BaseTransition'}, {'X': 'Bookmark'}, - {'X': 'CWAttribute'}, {'X': 'CWCache'}, - {'X': 'CWConstraint'}, {'X': 'CWConstraintType'}, - {'X': 'CWEType'}, {'X': 'CWGroup'}, - {'X': 'CWPermission'}, {'X': 'CWProperty'}, - {'X': 'CWRType'}, {'X': 'CWRelation'}, - {'X': 'CWSource'}, - {'X': 'CWUniqueTogetherConstraint'}, - {'X': 'Comment'}, {'X': 'Division'}, - {'X': 'Email'}, - {'X': 'EmailPart'}, {'X': 'EmailThread'}, - {'X': 'ExternalUri'}, {'X': 'File'}, - {'X': 'Folder'}, {'X': 'Old'}, - {'X': 'Personne'}, {'X': 'RQLExpression'}, - {'X': 'Societe'}, {'X': 'SubDivision'}, - {'X': 'SubWorkflowExitPoint'}, {'X': 'Tag'}, - {'X': 'TrInfo'}, {'X': 'Transition'}, - {'X': 'Workflow'}, {'X': 'WorkflowTransition'}])], - [self.system], {}, {'X': 'table0.C0'}, []), - ]), - ('FetchStep', [('Any X WHERE EXISTS(X owned_by %s), X is CWUser' % ueid, [{'X': 'CWUser'}])], - [self.system], {'X': 'table2.C0'}, {'X': 'table0.C0'}, []), - ('FetchStep', [('Any X WHERE (EXISTS(X owned_by %(ueid)s)) OR ((((EXISTS(D concerne C?, C owned_by %(ueid)s, C type "X", X identity D, C is Division, D is Affaire)) OR (EXISTS(H concerne G?, G owned_by %(ueid)s, G type "X", X identity H, G is SubDivision, H is Affaire))) OR (EXISTS(I concerne F?, F owned_by %(ueid)s, F type "X", X identity I, F is Societe, I is Affaire))) OR (EXISTS(J concerne E?, E owned_by %(ueid)s, X identity J, E is Note, J is Affaire))), X is Affaire' % {'ueid': ueid}, - [{'C': 'Division', 'E': 'Note', 'D': 'Affaire', 'G': 'SubDivision', 'F': 'Societe', 'I': 'Affaire', 'H': 'Affaire', 'J': 'Affaire', 'X': 'Affaire'}])], - [self.system], {'E': 'table1.C0'}, {'X': 'table0.C0'}, []), - ]), - ('OneFetchStep', [('Any MAX(X)', ALL_SOLS)], - None, None, [self.system], {'X': 'table0.C0'}, []) - ]) - - def test_security_complex_aggregat2(self): - # use a guest user - self.session = self.user_groups_session('guests') - ueid = self.session.user.eid - X_ET_ALL_SOLS = [] - for s in X_ALL_SOLS: - if s in ({'X': 'CWSourceHostConfig'}, {'X': 'CWSourceSchemaConfig'}, {'X': 'CWDataImport'}): - continue # not authorized - ets = {'ET': 'CWEType'} - ets.update(s) - X_ET_ALL_SOLS.append(ets) - self._test('Any ET, COUNT(X) GROUPBY ET ORDERBY ET WHERE X is ET', - [('FetchStep', [('Any X WHERE X is IN(Card, Note, State)', - [{'X': 'Card'}, {'X': 'Note'}, {'X': 'State'}])], - [self.cards, self.system], None, {'X': 'table1.C0'}, []), - ('FetchStep', [('Any E WHERE E type "X", E is Note', [{'E': 'Note'}])], - [self.cards, self.system], None, {'E': 'table2.C0'}, []), - ('FetchStep', [('Any X WHERE X is CWUser', [{'X': 'CWUser'}])], - [self.ldap, self.system], None, {'X': 'table3.C0'}, []), - ('UnionFetchStep', - [('FetchStep', [('Any ET,X WHERE X is ET, EXISTS(%s use_email X), ET is CWEType, X is EmailAddress' % ueid, - [{'ET': 'CWEType', 'X': 'EmailAddress'}]), ('Any ET,X WHERE X is ET, EXISTS(X owned_by %s), ET is CWEType, X is Basket' % ueid, - [{'ET': 'CWEType', 'X': 'Basket'}])], - [self.system], {}, {'ET': 'table0.C0', 'X': 'table0.C1'}, []), - ('FetchStep', [('Any ET,X WHERE X is ET, (EXISTS(X owned_by %(ueid)s)) OR ((((EXISTS(D concerne C?, C owned_by %(ueid)s, C type "X", X identity D, C is Division, D is Affaire)) OR (EXISTS(H concerne G?, G owned_by %(ueid)s, G type "X", X identity H, G is SubDivision, H is Affaire))) OR (EXISTS(I concerne F?, F owned_by %(ueid)s, F type "X", X identity I, F is Societe, I is Affaire))) OR (EXISTS(J concerne E?, E owned_by %(ueid)s, X identity J, E is Note, J is Affaire))), ET is CWEType, X is Affaire' % {'ueid': ueid}, - [{'C': 'Division', 'E': 'Note', 'D': 'Affaire', - 'G': 'SubDivision', 'F': 'Societe', 'I': 'Affaire', - 'H': 'Affaire', 'J': 'Affaire', 'X': 'Affaire', - 'ET': 'CWEType'}])], - [self.system], {'E': 'table2.C0'}, {'ET': 'table0.C0', 'X': 'table0.C1'}, - []), - ('FetchStep', [('Any ET,X WHERE X is ET, EXISTS(X owned_by %s), ET is CWEType, X is CWUser' % ueid, - [{'ET': 'CWEType', 'X': 'CWUser'}])], - [self.system], {'X': 'table3.C0'}, {'ET': 'table0.C0', 'X': 'table0.C1'}, []), - # extra UnionFetchStep could be avoided but has no cost, so don't care - ('UnionFetchStep', - [('FetchStep', [('Any ET,X WHERE X is ET, ET is CWEType, X is IN(BaseTransition, Bookmark, CWAttribute, CWCache, CWConstraint, CWConstraintType, CWEType, CWGroup, CWPermission, CWProperty, CWRType, CWRelation, CWSource, CWUniqueTogetherConstraint, Comment, Division, Email, EmailPart, EmailThread, ExternalUri, File, Folder, Old, Personne, RQLExpression, Societe, SubDivision, SubWorkflowExitPoint, Tag, TrInfo, Transition, Workflow, WorkflowTransition)', - [{'X': 'BaseTransition', 'ET': 'CWEType'}, - {'X': 'Bookmark', 'ET': 'CWEType'}, {'X': 'CWAttribute', 'ET': 'CWEType'}, - {'X': 'CWCache', 'ET': 'CWEType'}, {'X': 'CWConstraint', 'ET': 'CWEType'}, - {'X': 'CWConstraintType', 'ET': 'CWEType'}, - {'X': 'CWEType', 'ET': 'CWEType'}, - {'X': 'CWGroup', 'ET': 'CWEType'}, {'X': 'CWPermission', 'ET': 'CWEType'}, - {'X': 'CWProperty', 'ET': 'CWEType'}, {'X': 'CWRType', 'ET': 'CWEType'}, - {'X': 'CWSource', 'ET': 'CWEType'}, - {'X': 'CWRelation', 'ET': 'CWEType'}, - {'X': 'CWUniqueTogetherConstraint', 'ET': 'CWEType'}, - {'X': 'Comment', 'ET': 'CWEType'}, - {'X': 'Division', 'ET': 'CWEType'}, {'X': 'Email', 'ET': 'CWEType'}, - {'X': 'EmailPart', 'ET': 'CWEType'}, - {'X': 'EmailThread', 'ET': 'CWEType'}, {'X': 'ExternalUri', 'ET': 'CWEType'}, - {'X': 'File', 'ET': 'CWEType'}, {'X': 'Folder', 'ET': 'CWEType'}, - {'X': 'Old', 'ET': 'CWEType'}, {'X': 'Personne', 'ET': 'CWEType'}, - {'X': 'RQLExpression', 'ET': 'CWEType'}, {'X': 'Societe', 'ET': 'CWEType'}, - {'X': 'SubDivision', 'ET': 'CWEType'}, {'X': 'SubWorkflowExitPoint', 'ET': 'CWEType'}, - {'X': 'Tag', 'ET': 'CWEType'}, {'X': 'TrInfo', 'ET': 'CWEType'}, - {'X': 'Transition', 'ET': 'CWEType'}, {'X': 'Workflow', 'ET': 'CWEType'}, - {'X': 'WorkflowTransition', 'ET': 'CWEType'}])], - [self.system], {}, {'ET': 'table0.C0', 'X': 'table0.C1'}, []), - ('FetchStep', - [('Any ET,X WHERE X is ET, ET is CWEType, X is IN(Card, Note, State)', - [{'ET': 'CWEType', 'X': 'Card'}, - {'ET': 'CWEType', 'X': 'Note'}, - {'ET': 'CWEType', 'X': 'State'}])], - [self.system], {'X': 'table1.C0'}, {'ET': 'table0.C0', 'X': 'table0.C1'}, []), - ]), - ]), - ('OneFetchStep', - [('Any ET,COUNT(X) GROUPBY ET ORDERBY ET', X_ET_ALL_SOLS)], - None, None, [self.system], {'ET': 'table0.C0', 'X': 'table0.C1'}, []) - ]) - - def test_security_3sources(self): - # use a guest user - self.session = self.user_groups_session('guests') - ueid = self.session.user.eid - self._test('Any X, XT WHERE X is Card, X owned_by U, X title XT, U login "syt"', - [('FetchStep', - [('Any X,XT WHERE X title XT, X is Card', [{'X': 'Card', 'XT': 'String'}])], - [self.cards, self.system], None, {'X': 'table0.C0', 'X.title': 'table0.C1', 'XT': 'table0.C1'}, []), - ('FetchStep', - [('Any U WHERE U login "syt", U is CWUser', [{'U': 'CWUser'}])], - [self.ldap, self.system], None, {'U': 'table1.C0'}, []), - ('OneFetchStep', - [('Any X,XT WHERE X owned_by U, X title XT, EXISTS(U owned_by %s), U is CWUser, X is Card' % ueid, - [{'X': 'Card', 'U': 'CWUser', 'XT': 'String'}])], - None, None, [self.system], - {'X': 'table0.C0', 'X.title': 'table0.C1', 'XT': 'table0.C1', 'U': 'table1.C0'}, []) - ]) - - def test_security_3sources_identity(self): - self.restore_orig_cwuser_security() - # use a guest user - self.session = self.user_groups_session('guests') - ueid = self.session.user.eid - self._test('Any X, XT WHERE X is Card, X owned_by U, X title XT, U login "syt"', - [('FetchStep', - [('Any X,XT WHERE X title XT, X is Card', [{'X': 'Card', 'XT': 'String'}])], - [self.cards, self.system], None, {'X': 'table0.C0', 'X.title': 'table0.C1', 'XT': 'table0.C1'}, []), - ('OneFetchStep', - [('Any X,XT WHERE X owned_by U, X title XT, U login "syt", EXISTS(U identity %s), U is CWUser, X is Card' % ueid, - [{'U': 'CWUser', 'X': 'Card', 'XT': 'String'}])], - None, None, [self.system], {'X': 'table0.C0', 'X.title': 'table0.C1', 'XT': 'table0.C1'}, []) - ]) - - def test_security_3sources_identity_optional_var(self): - self.restore_orig_cwuser_security() - # use a guest user - self.session = self.user_groups_session('guests') - ueid = self.session.user.eid - self._test('Any X,XT,U WHERE X is Card, X owned_by U?, X title XT, U login L', - [('FetchStep', - [('Any U,L WHERE U login L, EXISTS(U identity %s), U is CWUser' % ueid, - [{'L': 'String', u'U': 'CWUser'}])], - [self.system], {}, {'L': 'table0.C1', 'U': 'table0.C0', 'U.login': 'table0.C1'}, []), - ('FetchStep', - [('Any X,XT WHERE X title XT, X is Card', [{'X': 'Card', 'XT': 'String'}])], - [self.cards, self.system], None, {'X': 'table1.C0', 'X.title': 'table1.C1', 'XT': 'table1.C1'}, []), - ('OneFetchStep', - [('Any X,XT,U WHERE X owned_by U?, X title XT, X is Card', - [{'X': 'Card', 'U': 'CWUser', 'XT': 'String'}])], - None, None, [self.system], {'L': 'table0.C1', - 'U': 'table0.C0', - 'X': 'table1.C0', - 'X.title': 'table1.C1', - 'XT': 'table1.C1'}, []) - ]) - - def test_security_3sources_limit_offset(self): - # use a guest user - self.session = self.user_groups_session('guests') - ueid = self.session.user.eid - self._test('Any X, XT LIMIT 10 OFFSET 10 WHERE X is Card, X owned_by U, X title XT, U login "syt"', - [('FetchStep', - [('Any X,XT WHERE X title XT, X is Card', [{'X': 'Card', 'XT': 'String'}])], - [self.cards, self.system], None, {'X': 'table0.C0', 'X.title': 'table0.C1', 'XT': 'table0.C1'}, []), - ('FetchStep', - [('Any U WHERE U login "syt", U is CWUser', [{'U': 'CWUser'}])], - [self.ldap, self.system], None, {'U': 'table1.C0'}, []), - ('OneFetchStep', - [('Any X,XT LIMIT 10 OFFSET 10 WHERE X owned_by U, X title XT, EXISTS(U owned_by %s), U is CWUser, X is Card' % ueid, - [{'X': 'Card', 'U': 'CWUser', 'XT': 'String'}])], - 10, 10, [self.system], - {'X': 'table0.C0', 'X.title': 'table0.C1', 'XT': 'table0.C1', 'U': 'table1.C0'}, []) - ]) - - def test_exists_base(self): - self._test('Any X,L,S WHERE X in_state S, X login L, EXISTS(X in_group G, G name "bougloup")', - [('FetchStep', [('Any X,L WHERE X login L, X is CWUser', [{'X': 'CWUser', 'L': 'String'}])], - [self.ldap, self.system], None, {'X': 'table0.C0', 'X.login': 'table0.C1', 'L': 'table0.C1'}, []), - ('OneFetchStep', [("Any X,L,S WHERE X in_state S, X login L, " - 'EXISTS(X in_group G, G name "bougloup", G is CWGroup), S is State, X is CWUser', - [{'X': 'CWUser', 'L': 'String', 'S': 'State', 'G': 'CWGroup'}])], - None, None, [self.system], - {'X': 'table0.C0', 'X.login': 'table0.C1', 'L': 'table0.C1'}, [])]) - - def test_exists_complex(self): - self._test('Any G WHERE X in_group G, G name "managers", EXISTS(X copain T, T login in ("comme", "cochon"))', - [('FetchStep', [('Any T WHERE T login IN("comme", "cochon"), T is CWUser', [{'T': 'CWUser'}])], - [self.ldap, self.system], None, {'T': 'table0.C0'}, []), - ('OneFetchStep', - [('Any G WHERE X in_group G, G name "managers", EXISTS(X copain T, T is CWUser), G is CWGroup, X is CWUser', - [{'X': 'CWUser', 'T': 'CWUser', 'G': 'CWGroup'}])], - None, None, [self.system], {'T': 'table0.C0'}, [])]) - - def test_exists3(self): - self._test('Any G,L WHERE X in_group G, X login L, G name "managers", EXISTS(X copain T, T login in ("comme", "cochon"))', - [('FetchStep', - [('Any T WHERE T login IN("comme", "cochon"), T is CWUser', - [{'T': 'CWUser'}])], - [self.ldap, self.system], None, {'T': 'table0.C0'}, []), - ('FetchStep', - [('Any L,X WHERE X login L, X is CWUser', [{'X': 'CWUser', 'L': 'String'}])], - [self.ldap, self.system], None, - {'X': 'table1.C1', 'X.login': 'table1.C0', 'L': 'table1.C0'}, []), - ('OneFetchStep', - [('Any G,L WHERE X in_group G, X login L, G name "managers", EXISTS(X copain T, T is CWUser), G is CWGroup, X is CWUser', - [{'G': 'CWGroup', 'L': 'String', 'T': 'CWUser', 'X': 'CWUser'}])], - None, None, - [self.system], {'T': 'table0.C0', 'X': 'table1.C1', 'X.login': 'table1.C0', 'L': 'table1.C0'}, [])]) - - def test_exists4(self): - self._test('Any G,L WHERE X in_group G, X login L, G name "managers", ' - 'EXISTS(X copain T, T login L, T login in ("comme", "cochon")) OR ' - 'EXISTS(X in_state S, S name "pascontent", NOT X copain T2, T2 login "billy")', - [('FetchStep', - [('Any T,L WHERE T login L, T login IN("comme", "cochon"), T is CWUser', [{'T': 'CWUser', 'L': 'String'}])], - [self.ldap, self.system], None, - {'T': 'table0.C0', 'T.login': 'table0.C1', 'L': 'table0.C1'}, []), - ('FetchStep', - [('Any T2 WHERE T2 login "billy", T2 is CWUser', [{'T2': 'CWUser'}])], - [self.ldap, self.system], None, {'T2': 'table1.C0'}, []), - ('FetchStep', - [('Any L,X WHERE X login L, X is CWUser', [{'X': 'CWUser', 'L': 'String'}])], - [self.ldap, self.system], None, {'X': 'table2.C1', 'X.login': 'table2.C0', 'L': 'table2.C0'}, []), - ('OneFetchStep', - [('Any G,L WHERE X in_group G, X login L, G name "managers", (EXISTS(X copain T, T login L, T is CWUser)) OR (EXISTS(X in_state S, S name "pascontent", NOT EXISTS(X copain T2), S is State)), G is CWGroup, T2 is CWUser, X is CWUser', - [{'G': 'CWGroup', 'L': 'String', 'S': 'State', 'T': 'CWUser', 'T2': 'CWUser', 'X': 'CWUser'}])], - None, None, [self.system], - {'T2': 'table1.C0', 'L': 'table2.C0', - 'T': 'table0.C0', 'T.login': 'table0.C1', 'X': 'table2.C1', 'X.login': 'table2.C0'}, [])]) - - def test_exists5(self): - self._test('Any GN,L WHERE X in_group G, X login L, G name GN, ' - 'EXISTS(X copain T, T login in ("comme", "cochon")) AND ' - 'NOT EXISTS(X copain T2, T2 login "billy")', - [('FetchStep', [('Any T WHERE T login IN("comme", "cochon"), T is CWUser', - [{'T': 'CWUser'}])], - [self.ldap, self.system], None, {'T': 'table0.C0'}, []), - ('FetchStep', [('Any T2 WHERE T2 login "billy", T2 is CWUser', [{'T2': 'CWUser'}])], - [self.ldap, self.system], None, {'T2': 'table1.C0'}, []), - ('FetchStep', [('Any L,X WHERE X login L, X is CWUser', [{'X': 'CWUser', 'L': 'String'}])], - [self.ldap, self.system], None, - {'X': 'table2.C1', 'X.login': 'table2.C0', 'L': 'table2.C0'}, []), - ('OneFetchStep', [('Any GN,L WHERE X in_group G, X login L, G name GN, EXISTS(X copain T, T is CWUser), NOT EXISTS(X copain T2, T2 is CWUser), G is CWGroup, X is CWUser', - [{'G': 'CWGroup', 'GN': 'String', 'L': 'String', 'T': 'CWUser', 'T2': 'CWUser', 'X': 'CWUser'}])], - None, None, [self.system], - {'T': 'table0.C0', 'T2': 'table1.C0', - 'X': 'table2.C1', 'X.login': 'table2.C0', 'L': 'table2.C0'}, [])]) - - def test_exists_security_no_invariant(self): - ueid = self.session.user.eid - self._test('Any X,AA,AB,AC,AD ORDERBY AA WHERE X is CWUser, X login AA, X firstname AB, X surname AC, X modification_date AD, A eid %(B)s, \ - EXISTS(((X identity A) OR \ - (EXISTS(X in_group C, C name IN("managers", "staff"), C is CWGroup))) OR \ - (EXISTS(X in_group D, A in_group D, NOT D name "users", D is CWGroup)))', - [('FetchStep', [('Any X,AA,AB,AC,AD WHERE X login AA, X firstname AB, X surname AC, X modification_date AD, X is CWUser', - [{'AA': 'String', 'AB': 'String', 'AC': 'String', 'AD': 'Datetime', - 'X': 'CWUser'}])], - [self.ldap, self.system], None, {'AA': 'table0.C1', 'AB': 'table0.C2', - 'AC': 'table0.C3', 'AD': 'table0.C4', - 'X': 'table0.C0', - 'X.firstname': 'table0.C2', - 'X.login': 'table0.C1', - 'X.modification_date': 'table0.C4', - 'X.surname': 'table0.C3'}, []), - ('OneFetchStep', [('Any X,AA,AB,AC,AD ORDERBY AA WHERE X login AA, X firstname AB, X surname AC, X modification_date AD, EXISTS(((X identity %(ueid)s) OR (EXISTS(X in_group C, C name IN("managers", "staff"), C is CWGroup))) OR (EXISTS(X in_group D, %(ueid)s in_group D, NOT D name "users", D is CWGroup))), X is CWUser' % {'ueid': ueid}, - [{'AA': 'String', 'AB': 'String', 'AC': 'String', 'AD': 'Datetime', - 'C': 'CWGroup', 'D': 'CWGroup', 'X': 'CWUser'}])], - None, None, [self.system], - {'AA': 'table0.C1', 'AB': 'table0.C2', 'AC': 'table0.C3', 'AD': 'table0.C4', - 'X': 'table0.C0', - 'X.firstname': 'table0.C2', 'X.login': 'table0.C1', 'X.modification_date': 'table0.C4', 'X.surname': 'table0.C3'}, - [])], - {'B': ueid}) - - def test_relation_need_split(self): - self._test('Any X, S WHERE X in_state S', - [('UnionStep', None, None, [ - ('OneFetchStep', [('Any X,S WHERE X in_state S, S is State, X is IN(Affaire, CWUser)', - [{'X': 'Affaire', 'S': 'State'}, {'X': 'CWUser', 'S': 'State'}])], - None, None, [self.system], {}, []), - ('OneFetchStep', [('Any X,S WHERE X in_state S, S is State, X is Note', - [{'X': 'Note', 'S': 'State'}])], - None, None, [self.cards, self.system], {}, []), - ])]) - - def test_relation_selection_need_split(self): - self._test('Any X,S,U WHERE X in_state S, X todo_by U', - [('FetchStep', [('Any X,S WHERE X in_state S, S is State, X is Note', - [{'X': 'Note', 'S': 'State'}])], - [self.cards, self.system], None, {'X': 'table0.C0', 'S': 'table0.C1'}, []), - ('UnionStep', None, None, - [('OneFetchStep', [('Any X,S,U WHERE X in_state S, X todo_by U, S is State, U is Personne, X is Affaire', - [{'X': 'Affaire', 'S': 'State', 'U': 'Personne'}])], - None, None, [self.system], {}, []), - ('OneFetchStep', [('Any X,S,U WHERE X todo_by U, S is State, U is CWUser, X is Note', - [{'X': 'Note', 'S': 'State', 'U': 'CWUser'}])], - None, None, [self.system], {'X': 'table0.C0', 'S': 'table0.C1'}, []), - ]) - ]) - - def test_relation_restriction_need_split(self): - self._test('Any X,U WHERE X in_state S, S name "pending", X todo_by U', - [('FetchStep', [('Any X WHERE X in_state S, S name "pending", S is State, X is Note', - [{'X': 'Note', 'S': 'State'}])], - [self.cards, self.system], None, {'X': 'table0.C0'}, []), - ('UnionStep', None, None, - [('OneFetchStep', [('Any X,U WHERE X todo_by U, U is CWUser, X is Note', - [{'X': 'Note', 'U': 'CWUser'}])], - None, None, [self.system], {'X': 'table0.C0'}, []), - ('OneFetchStep', [('Any X,U WHERE X in_state S, S name "pending", X todo_by U, S is State, U is Personne, X is Affaire', - [{'S': 'State', 'U': 'Personne', 'X': 'Affaire'}])], - None, None, [self.system], {}, []) - ]) - ]) - - def test_relation_restriction_ambigous_need_split(self): - self._test('Any X,T WHERE X in_state S, S name "pending", T tags X', - [('FetchStep', [('Any X WHERE X in_state S, S name "pending", S is State, X is Note', - [{'X': 'Note', 'S': 'State'}])], - [self.cards, self.system], None, {'X': 'table0.C0'}, []), - ('UnionStep', None, None, [ - ('OneFetchStep', [('Any X,T WHERE T tags X, T is Tag, X is Note', - [{'X': 'Note', 'T': 'Tag'}])], - None, None, - [self.system], {'X': 'table0.C0'}, []), - ('OneFetchStep', [('Any X,T WHERE X in_state S, S name "pending", T tags X, S is State, T is Tag, X is IN(Affaire, CWUser)', - [{'X': 'Affaire', 'S': 'State', 'T': 'Tag'}, - {'X': 'CWUser', 'S': 'State', 'T': 'Tag'}])], - None, None, - [self.system], {}, []), - ]) - ]) - - def test_not_relation_no_split_internal(self): - ueid = self.session.user.eid - # NOT on a relation supported by rql and system source: we want to get - # all states (eg from both sources) which are not related to entity with the - # given eid. The "NOT X in_state S, X eid %(x)s" expression is necessarily true - # in the source where %(x)s is not coming from and will be removed during rql - # generation for the external source - self._test('Any SN WHERE NOT X in_state S, X eid %(x)s, S name SN', - [('OneFetchStep', [('Any SN WHERE NOT EXISTS(%s in_state S), S name SN, S is State' % ueid, - [{'S': 'State', 'SN': 'String'}])], - None, None, [self.cards, self.system], {}, [])], - {'x': ueid}) - - def test_not_relation_no_split_external(self): - repo._type_source_cache[999999] = ('Note', 'cards', 999999, 'cards') - # similar to the above test but with an eid coming from the external source. - # the same plan may be used, since we won't find any record in the system source - # linking 9999999 to a state - self._test('Any SN WHERE NOT X in_state S, X eid %(x)s, S name SN', - [('OneFetchStep', [('Any SN WHERE NOT EXISTS(999999 in_state S), S name SN, S is State', - [{'S': 'State', 'SN': 'String'}])], - None, None, [self.cards, self.system], {}, [])], - {'x': 999999}) - - def test_not_relation_need_split(self): - self._test('Any SN WHERE NOT X in_state S, S name SN', - [('FetchStep', [('Any SN,S WHERE S name SN, S is State', - [{'S': 'State', 'SN': 'String'}])], - [self.cards, self.system], None, {'S': 'table0.C1', 'S.name': 'table0.C0', 'SN': 'table0.C0'}, - []), - ('IntersectStep', None, None, - [('OneFetchStep', - [('Any SN WHERE NOT EXISTS(X in_state S, X is Note), S name SN, S is State', - [{'S': 'State', 'SN': 'String', 'X': 'Note'}])], - None, None, [self.cards, self.system], {}, - []), - ('OneFetchStep', - [('Any SN WHERE NOT EXISTS(X in_state S, X is IN(Affaire, CWUser)), S name SN, S is State', - [{'S': 'State', 'SN': 'String', 'X': 'Affaire'}, - {'S': 'State', 'SN': 'String', 'X': 'CWUser'}])], - None, None, [self.system], {'S': 'table0.C1', 'S.name': 'table0.C0', 'SN': 'table0.C0'}, - []),] - )]) - - def test_external_attributes_and_relation(self): - repo._type_source_cache[999999] = ('Note', 'cards', 999999, 'cards') - self._test('Any A,B,C,D WHERE A eid %(x)s,A creation_date B,A modification_date C, A todo_by D?', - [('FetchStep', [('Any A,B,C WHERE A eid 999999, A creation_date B, A modification_date C, A is Note', - [{'A': 'Note', 'C': 'Datetime', 'B': 'Datetime'}])], - [self.cards], None, - {'A': 'table0.C0', 'A.creation_date': 'table0.C1', 'A.modification_date': 'table0.C2', 'C': 'table0.C2', 'B': 'table0.C1'}, []), - #('FetchStep', [('Any D WHERE D is CWUser', [{'D': 'CWUser'}])], - # [self.ldap, self.system], None, {'D': 'table1.C0'}, []), - ('OneFetchStep', [('Any A,B,C,D WHERE A creation_date B, A modification_date C, A todo_by D?, A is Note, D is CWUser', - [{'A': 'Note', 'C': 'Datetime', 'B': 'Datetime', 'D': 'CWUser'}])], - None, None, [self.system], - {'A': 'table0.C0', 'A.creation_date': 'table0.C1', 'A.modification_date': 'table0.C2', 'C': 'table0.C2', 'B': 'table0.C1'}, [])], - {'x': 999999}) - - - def test_simplified_var_1(self): - ueid = self.session.user.eid - repo._type_source_cache[999999] = ('Note', 'cards', 999999, 'cards') - # need access to cards source since X table has to be accessed because of the outer join - self._test('Any U WHERE U in_group G, (G name IN ("managers", "logilab") OR ' - '(X require_permission P?, P name "bla", P require_group G)), X eid %(x)s, U eid %(u)s', - [('FetchStep', - [('Any 999999', [{}])], [self.cards], - None, {u'%(x)s': 'table0.C0'}, []), - ('OneFetchStep', - [(u'Any 6 WHERE 6 in_group G, (G name IN("managers", "logilab")) OR ' - '(X require_permission P?, P name "bla", P require_group G), ' - 'G is CWGroup, P is CWPermission, X is Note', - [{'G': 'CWGroup', 'P': 'CWPermission', 'X': 'Note'}])], - None, None, [self.system], {u'%(x)s': 'table0.C0'}, [])], - {'x': 999999, 'u': ueid}) - - def test_simplified_var_2(self): - ueid = self.session.user.eid - repo._type_source_cache[999999] = ('Note', 'cards', 999999, 'cards') - # no need access to source since X is invariant - self._test('Any U WHERE U in_group G, (G name IN ("managers", "logilab") OR ' - '(X require_permission P, P name "bla", P require_group G)), X eid %(x)s, U eid %(u)s', - [('OneFetchStep', [('Any %s WHERE %s in_group G, (G name IN("managers", "logilab")) OR (999999 require_permission P, P name "bla", P require_group G)' % (ueid, ueid), - [{'G': 'CWGroup', 'P': 'CWPermission'}])], - None, None, [self.system], {}, [])], - {'x': 999999, 'u': ueid}) - - def test_has_text(self): - self._test('Card X WHERE X has_text "toto"', - [('OneFetchStep', [('Any X WHERE X has_text "toto", X is Card', - [{'X': 'Card'}])], - None, None, [self.system], {}, [])]) - - def test_has_text_3(self): - self._test('Any X WHERE X has_text "toto", X title "zoubidou", X is IN (Card, EmailThread)', - [('FetchStep', [(u'Any X WHERE X title "zoubidou", X is Card', - [{'X': 'Card'}])], - [self.cards, self.system], None, {'X': 'table0.C0'}, []), - ('UnionStep', None, None, [ - ('OneFetchStep', [(u'Any X WHERE X has_text "toto", X is Card', - [{'X': 'Card'}])], - None, None, [self.system], {'X': 'table0.C0'}, []), - ('OneFetchStep', [(u'Any X WHERE X has_text "toto", X title "zoubidou", X is EmailThread', - [{'X': 'EmailThread'}])], - None, None, [self.system], {}, []), - ]), - ]) - - def test_has_text_orderby_rank(self): - self._test('Any X ORDERBY FTIRANK(X) WHERE X has_text "bla", X firstname "bla"', - [('FetchStep', [('Any X WHERE X firstname "bla", X is CWUser', [{'X': 'CWUser'}])], - [self.ldap, self.system], None, {'X': 'table0.C0'}, []), - ('AggrStep', 'SELECT table1.C1 FROM table1\nORDER BY table1.C0', None, [ - ('FetchStep', [('Any FTIRANK(X),X WHERE X has_text "bla", X is CWUser', - [{'X': 'CWUser'}])], - [self.system], {'X': 'table0.C0'}, {'FTIRANK(X)': 'table1.C0', 'X': 'table1.C1'}, []), - ('FetchStep', [('Any FTIRANK(X),X WHERE X has_text "bla", X firstname "bla", X is Personne', - [{'X': 'Personne'}])], - [self.system], {}, {'FTIRANK(X)': 'table1.C0', 'X': 'table1.C1'}, []), - ]), - ]) - - def test_security_has_text_orderby_rank(self): - # use a guest user - self.session = self.user_groups_session('guests') - ueid = self.session.user.eid - self._test('Any X ORDERBY FTIRANK(X) WHERE X has_text "bla", X firstname "bla"', - [('FetchStep', [('Any X WHERE X firstname "bla", X is CWUser', [{'X': 'CWUser'}])], - [self.ldap, self.system], None, {'X': 'table1.C0'}, []), - ('UnionFetchStep', - [('FetchStep', [('Any X WHERE X firstname "bla", X is Personne', [{'X': 'Personne'}])], - [self.system], {}, {'X': 'table0.C0'}, []), - ('FetchStep', [('Any X WHERE EXISTS(X owned_by %s), X is CWUser' % ueid, [{'X': 'CWUser'}])], - [self.system], {'X': 'table1.C0'}, {'X': 'table0.C0'}, [])]), - ('OneFetchStep', [('Any X ORDERBY FTIRANK(X) WHERE X has_text "bla"', - [{'X': 'CWUser'}, {'X': 'Personne'}])], - None, None, [self.system], {'X': 'table0.C0'}, []), - ]) - - def test_has_text_select_rank(self): - self._test('Any X, FTIRANK(X) WHERE X has_text "bla", X firstname "bla"', - # XXX unecessary duplicate selection - [('FetchStep', [('Any X,X WHERE X firstname "bla", X is CWUser', [{'X': 'CWUser'}])], - [self.ldap, self.system], None, {'X': 'table0.C1'}, []), - ('UnionStep', None, None, [ - ('OneFetchStep', [('Any X,FTIRANK(X) WHERE X has_text "bla", X is CWUser', [{'X': 'CWUser'}])], - None, None, [self.system], {'X': 'table0.C1'}, []), - ('OneFetchStep', [('Any X,FTIRANK(X) WHERE X has_text "bla", X firstname "bla", X is Personne', [{'X': 'Personne'}])], - None, None, [self.system], {}, []), - ]), - ]) - - def test_security_has_text_select_rank(self): - # use a guest user - self.session = self.user_groups_session('guests') - ueid = self.session.user.eid - self._test('Any X, FTIRANK(X) WHERE X has_text "bla", X firstname "bla"', - [('FetchStep', [('Any X,X WHERE X firstname "bla", X is CWUser', [{'X': 'CWUser'}])], - [self.ldap, self.system], None, {'X': 'table0.C1'}, []), - ('UnionStep', None, None, [ - ('OneFetchStep', [('Any X,FTIRANK(X) WHERE X has_text "bla", EXISTS(X owned_by %s), X is CWUser' % ueid, [{'X': 'CWUser'}])], - None, None, [self.system], {'X': 'table0.C1'}, []), - ('OneFetchStep', [('Any X,FTIRANK(X) WHERE X has_text "bla", X firstname "bla", X is Personne', [{'X': 'Personne'}])], - None, None, [self.system], {}, []), - ]), - ]) - - def test_sort_func(self): - self._test('Note X ORDERBY DUMB_SORT(RF) WHERE X type RF', - [('AggrStep', 'SELECT table0.C0 FROM table0\nORDER BY DUMB_SORT(table0.C1)', None, [ - ('FetchStep', [('Any X,RF WHERE X type RF, X is Note', - [{'X': 'Note', 'RF': 'String'}])], - [self.cards, self.system], {}, {'X': 'table0.C0', 'X.type': 'table0.C1', 'RF': 'table0.C1'}, []), - ]) - ]) - - def test_ambigous_sort_func(self): - self._test('Any X ORDERBY DUMB_SORT(RF) WHERE X title RF, X is IN (Bookmark, Card, EmailThread)', - [('AggrStep', 'SELECT table0.C0 FROM table0\nORDER BY DUMB_SORT(table0.C1)', None, - [('FetchStep', [('Any X,RF WHERE X title RF, X is Card', - [{'X': 'Card', 'RF': 'String'}])], - [self.cards, self.system], {}, - {'X': 'table0.C0', 'X.title': 'table0.C1', 'RF': 'table0.C1'}, []), - ('FetchStep', [('Any X,RF WHERE X title RF, X is IN(Bookmark, EmailThread)', - [{'RF': 'String', 'X': 'Bookmark'}, - {'RF': 'String', 'X': 'EmailThread'}])], - [self.system], {}, - {'X': 'table0.C0', 'X.title': 'table0.C1', 'RF': 'table0.C1'}, []), - ]), - ]) - - def test_attr_unification_1(self): - self._test('Any X,Y WHERE X is Bookmark, Y is Card, X title T, Y title T', - [('FetchStep', - [('Any Y,T WHERE Y title T, Y is Card', [{'T': 'String', 'Y': 'Card'}])], - [self.cards, self.system], None, - {'T': 'table0.C1', 'Y': 'table0.C0', 'Y.title': 'table0.C1'}, []), - ('OneFetchStep', - [('Any X,Y WHERE X title T, Y title T, X is Bookmark, Y is Card', - [{'T': 'String', 'X': 'Bookmark', 'Y': 'Card'}])], - None, None, [self.system], - {'T': 'table0.C1', 'Y': 'table0.C0', 'Y.title': 'table0.C1'}, []) - ]) - - def test_attr_unification_2(self): - self._test('Any X,Y WHERE X is Note, Y is Card, X type T, Y title T', - [('FetchStep', - [('Any X,T WHERE X type T, X is Note', [{'T': 'String', 'X': 'Note'}])], - [self.cards, self.system], None, - {'T': 'table0.C1', 'X': 'table0.C0', 'X.type': 'table0.C1'}, []), - ('FetchStep', - [('Any Y,T WHERE Y title T, Y is Card', [{'T': 'String', 'Y': 'Card'}])], - [self.cards, self.system], None, - {'T': 'table1.C1', 'Y': 'table1.C0', 'Y.title': 'table1.C1'}, []), - ('OneFetchStep', - [('Any X,Y WHERE X type T, Y title T, X is Note, Y is Card', - [{'T': 'String', 'X': 'Note', 'Y': 'Card'}])], - None, None, [self.system], - {'T': 'table1.C1', - 'X': 'table0.C0', 'X.type': 'table0.C1', - 'Y': 'table1.C0', 'Y.title': 'table1.C1'}, []) - ]) - - def test_attr_unification_neq_1(self): - self._test('Any X,Y WHERE X is Bookmark, Y is Card, X creation_date D, Y creation_date > D', - [('FetchStep', - [('Any Y,D WHERE Y creation_date D, Y is Card', - [{'D': 'Datetime', 'Y': 'Card'}])], - [self.cards,self.system], None, - {'D': 'table0.C1', 'Y': 'table0.C0', 'Y.creation_date': 'table0.C1'}, []), - ('OneFetchStep', - [('Any X,Y WHERE X creation_date D, Y creation_date > D, X is Bookmark, Y is Card', - [{'D': 'Datetime', 'X': 'Bookmark', 'Y': 'Card'}])], None, None, - [self.system], - {'D': 'table0.C1', 'Y': 'table0.C0', 'Y.creation_date': 'table0.C1'}, []) - ]) - - def test_subquery_1(self): - ueid = self.session.user.eid - self._test('DISTINCT Any B,C ORDERBY C WHERE A created_by B, B login C, EXISTS(B owned_by D), D eid %(E)s ' - 'WITH A,N BEING ((Any X,N WHERE X is Tag, X name N) UNION (Any X,T WHERE X is Bookmark, X title T))', - [('FetchStep', [('Any X,N WHERE X is Tag, X name N', [{'N': 'String', 'X': 'Tag'}]), - ('Any X,T WHERE X is Bookmark, X title T', - [{'T': 'String', 'X': 'Bookmark'}])], - [self.system], {}, {'N': 'table0.C1', 'X': 'table0.C0', 'X.name': 'table0.C1'}, []), - ('FetchStep', - [('Any B,C WHERE B login C, B is CWUser', [{'B': 'CWUser', 'C': 'String'}])], - [self.ldap, self.system], None, {'B': 'table1.C0', 'B.login': 'table1.C1', 'C': 'table1.C1'}, []), - ('OneFetchStep', [('DISTINCT Any B,C ORDERBY C WHERE A created_by B, B login C, EXISTS(B owned_by %s), B is CWUser, A is IN(Bookmark, Tag)' % ueid, - [{'A': 'Bookmark', 'B': 'CWUser', 'C': 'String'}, - {'A': 'Tag', 'B': 'CWUser', 'C': 'String'}])], - None, None, [self.system], - {'A': 'table0.C0', - 'B': 'table1.C0', 'B.login': 'table1.C1', - 'C': 'table1.C1', - 'N': 'table0.C1'}, - [])], - {'E': ueid}) - - def test_subquery_2(self): - ueid = self.session.user.eid - self._test('DISTINCT Any B,C ORDERBY C WHERE A created_by B, B login C, EXISTS(B owned_by D), D eid %(E)s ' - 'WITH A,N BEING ((Any X,N WHERE X is Tag, X name N) UNION (Any X,T WHERE X is Card, X title T))', - [('UnionFetchStep', - [('FetchStep', [('Any X,N WHERE X is Tag, X name N', [{'N': 'String', 'X': 'Tag'}])], - [self.system], {}, - {'N': 'table0.C1', - 'T': 'table0.C1', - 'X': 'table0.C0', - 'X.name': 'table0.C1', - 'X.title': 'table0.C1'}, []), - ('FetchStep', [('Any X,T WHERE X is Card, X title T', - [{'T': 'String', 'X': 'Card'}])], - [self.cards, self.system], {}, - {'N': 'table0.C1', - 'T': 'table0.C1', - 'X': 'table0.C0', - 'X.name': 'table0.C1', - 'X.title': 'table0.C1'}, []), - ]), - ('FetchStep', - [('Any B,C WHERE B login C, B is CWUser', [{'B': 'CWUser', 'C': 'String'}])], - [self.ldap, self.system], None, {'B': 'table1.C0', 'B.login': 'table1.C1', 'C': 'table1.C1'}, []), - ('OneFetchStep', [('DISTINCT Any B,C ORDERBY C WHERE A created_by B, B login C, EXISTS(B owned_by %s), B is CWUser, A is IN(Card, Tag)' % ueid, - [{'A': 'Card', 'B': 'CWUser', 'C': 'String'}, - {'A': 'Tag', 'B': 'CWUser', 'C': 'String'}])], - None, None, [self.system], - {'A': 'table0.C0', - 'B': 'table1.C0', 'B.login': 'table1.C1', - 'C': 'table1.C1', - 'N': 'table0.C1'}, - [])], - {'E': ueid}) - - def test_eid_dont_cross_relation_1(self): - repo._type_source_cache[999999] = ('Personne', 'system', 999999, 'system') - self._test('Any Y,YT WHERE X eid %(x)s, X fiche Y, Y title YT', - [('OneFetchStep', [('Any Y,YT WHERE X eid 999999, X fiche Y, Y title YT', - [{'X': 'Personne', 'Y': 'Card', 'YT': 'String'}])], - None, None, [self.system], {}, [])], - {'x': 999999}) - - def test_eid_dont_cross_relation_2(self): - repo._type_source_cache[999999] = ('Note', 'cards', 999999, 'cards') - self.cards.dont_cross_relations.add('concerne') - try: - self._test('Any Y,S,YT,X WHERE Y concerne X, Y in_state S, X eid 999999, Y ref YT', - [('OneFetchStep', [('Any Y,S,YT,999999 WHERE Y concerne 999999, Y in_state S, Y ref YT', - [{'Y': 'Affaire', 'YT': 'String', 'S': 'State'}])], - None, None, [self.system], {}, [])], - {'x': 999999}) - finally: - self.cards.dont_cross_relations.remove('concerne') - - - # external source w/ .cross_relations == ['multisource_crossed_rel'] ###### - - def test_crossed_relation_eid_1_invariant(self): - repo._type_source_cache[999999] = ('Note', 'system', 999999, 'system') - self._test('Any Y WHERE X eid %(x)s, X multisource_crossed_rel Y', - [('OneFetchStep', [('Any Y WHERE 999999 multisource_crossed_rel Y', [{u'Y': 'Note'}])], - None, None, [self.system], {}, []) - ], - {'x': 999999,}) - - def test_crossed_relation_eid_1_needattr(self): - repo._type_source_cache[999999] = ('Note', 'system', 999999, 'system') - self._test('Any Y,T WHERE X eid %(x)s, X multisource_crossed_rel Y, Y type T', - [('FetchStep', [('Any Y,T WHERE Y type T, Y is Note', [{'T': 'String', 'Y': 'Note'}])], - [self.cards, self.system], None, - {'T': 'table0.C1', 'Y': 'table0.C0', 'Y.type': 'table0.C1'}, []), - ('OneFetchStep', [('Any Y,T WHERE 999999 multisource_crossed_rel Y, Y type T, Y is Note', - [{'T': 'String', 'Y': 'Note'}])], - None, None, [self.system], - {'T': 'table0.C1', 'Y': 'table0.C0', 'Y.type': 'table0.C1'}, []), - ], - {'x': 999999,}) - - def test_crossed_relation_eid_2_invariant(self): - repo._type_source_cache[999999] = ('Note', 'cards', 999999, 'cards') - self._test('Any Y WHERE X eid %(x)s, X multisource_crossed_rel Y', - [('OneFetchStep', [('Any Y WHERE 999999 multisource_crossed_rel Y, Y is Note', [{'Y': 'Note'}])], - None, None, [self.cards, self.system], {}, []) - ], - {'x': 999999,}) - - def test_crossed_relation_eid_2_needattr(self): - repo._type_source_cache[999999] = ('Note', 'cards', 999999, 'cards') - self._test('Any Y,T WHERE X eid %(x)s, X multisource_crossed_rel Y, Y type T', - [('OneFetchStep', [('Any Y,T WHERE 999999 multisource_crossed_rel Y, Y type T, Y is Note', - [{'T': 'String', 'Y': 'Note'}])], - None, None, [self.cards, self.system], {}, - []), - ], - {'x': 999999,}) - - def test_crossed_relation_eid_not_1(self): - repo._type_source_cache[999999] = ('Note', 'system', 999999, 'system') - self._test('Any Y WHERE X eid %(x)s, NOT X multisource_crossed_rel Y', - [('FetchStep', [('Any Y WHERE Y is Note', [{'Y': 'Note'}])], - [self.cards, self.system], None, {'Y': 'table0.C0'}, []), - ('OneFetchStep', [('Any Y WHERE NOT EXISTS(999999 multisource_crossed_rel Y), Y is Note', - [{'Y': 'Note'}])], - None, None, [self.system], - {'Y': 'table0.C0'}, [])], - {'x': 999999,}) - -# def test_crossed_relation_eid_not_2(self): -# repo._type_source_cache[999999] = ('Note', 'cards', 999999) -# self._test('Any Y WHERE X eid %(x)s, NOT X multisource_crossed_rel Y', -# [], -# {'x': 999999,}) - - def test_crossed_relation_base_XXXFIXME(self): - repo._type_source_cache[999999] = ('Note', 'system', 999999, 'system') - self._test('Any X,Y,T WHERE X multisource_crossed_rel Y, Y type T, X type T', - [('FetchStep', [('Any X,T WHERE X type T, X is Note', [{'T': 'String', 'X': 'Note'}])], - [self.cards, self.system], None, - {'T': 'table0.C1', 'X': 'table0.C0', 'X.type': 'table0.C1'}, []), - ('FetchStep', [('Any Y,T WHERE Y type T, Y is Note', [{'T': 'String', 'Y': 'Note'}])], - [self.cards, self.system], None, - {'T': 'table1.C1', 'Y': 'table1.C0', 'Y.type': 'table1.C1'}, []), - ('FetchStep', [('Any X,Y WHERE X multisource_crossed_rel Y, X is Note, Y is Note', - [{'X': 'Note', 'Y': 'Note'}])], - [self.cards, self.system], None, - {'X': 'table2.C0', 'Y': 'table2.C1'}, - []), - ('OneFetchStep', [('Any X,Y,T WHERE X multisource_crossed_rel Y, Y type T, X type T, ' - 'X is Note, Y is Note, Y identity A, X identity B, A is Note, B is Note', - [{u'A': 'Note', u'B': 'Note', 'T': 'String', 'X': 'Note', 'Y': 'Note'}])], - None, None, - [self.system], - {'A': 'table1.C0', - 'B': 'table0.C0', - 'T': 'table1.C1', - 'X': 'table2.C0', - 'X.type': 'table0.C1', - 'Y': 'table2.C1', - 'Y.type': 'table1.C1'}, - []), - ], - {'x': 999999,}) - - def test_crossed_relation_noeid_needattr(self): - # http://www.cubicweb.org/ticket/1382452 - self._test('DISTINCT Any DEP WHERE DEP is Note, P type "cubicweb-foo", P multisource_crossed_rel DEP, DEP type LIKE "cubicweb%"', - [('FetchStep', [(u'Any DEP WHERE DEP type LIKE "cubicweb%", DEP is Note', - [{'DEP': 'Note'}])], - [self.cards, self.system], None, - {'DEP': 'table0.C0'}, - []), - ('FetchStep', [(u'Any P WHERE P type "cubicweb-foo", P is Note', [{'P': 'Note'}])], - [self.cards, self.system], None, {'P': 'table1.C0'}, - []), - ('FetchStep', [('Any DEP,P WHERE P multisource_crossed_rel DEP, DEP is Note, P is Note', - [{'DEP': 'Note', 'P': 'Note'}])], - [self.cards, self.system], None, {'DEP': 'table2.C0', 'P': 'table2.C1'}, - []), - ('OneFetchStep', - [('DISTINCT Any DEP WHERE P multisource_crossed_rel DEP, DEP is Note, ' - 'P is Note, DEP identity A, P identity B, A is Note, B is Note', - [{u'A': 'Note', u'B': 'Note', 'DEP': 'Note', 'P': 'Note'}])], - None, None, [self.system], - {'A': 'table0.C0', 'B': 'table1.C0', 'DEP': 'table2.C0', 'P': 'table2.C1'}, - [])]) - - def test_crossed_relation_noeid_invariant(self): - # see comment in http://www.cubicweb.org/ticket/1382452 - self.schema.add_relation_def( - RelationDefinition(subject='Note', name='multisource_crossed_rel', object='Affaire')) - self.repo.set_schema(self.schema) - try: - self._test('DISTINCT Any P,DEP WHERE P type "cubicweb-foo", P multisource_crossed_rel DEP', - [('FetchStep', - [('Any DEP WHERE DEP is Note', [{'DEP': 'Note'}])], - [self.cards, self.system], None, {'DEP': 'table0.C0'}, []), - ('FetchStep', - [(u'Any P WHERE P type "cubicweb-foo", P is Note', [{'P': 'Note'}])], - [self.cards, self.system], None, {'P': 'table1.C0'}, []), - ('UnionStep', None, None, - [('OneFetchStep', - [('DISTINCT Any P,DEP WHERE P multisource_crossed_rel DEP, DEP is Note, P is Note', - [{'DEP': 'Note', 'P': 'Note'}])], - None, None, [self.cards], None, []), - ('OneFetchStep', - [('DISTINCT Any P,DEP WHERE P multisource_crossed_rel DEP, DEP is Note, P is Note', - [{'DEP': 'Note', 'P': 'Note'}])], - None, None, [self.system], - {'DEP': 'table0.C0', 'P': 'table1.C0'}, - []), - ('OneFetchStep', - [('DISTINCT Any P,DEP WHERE P multisource_crossed_rel DEP, DEP is Affaire, P is Note', - [{'DEP': 'Affaire', 'P': 'Note'}])], - None, None, [self.system], {'P': 'table1.C0'}, - [])]) - ]) - finally: - self.schema.del_relation_def('Note', 'multisource_crossed_rel', 'Affaire') - self.repo.set_schema(self.schema) - - # edition queries tests ################################################### - - def test_insert_simplified_var_1(self): - repo._type_source_cache[999999] = ('Note', 'cards', 999999, 'cards') - repo._type_source_cache[999998] = ('State', 'system', None, 'system') - self._test('INSERT Note X: X in_state S, X type T WHERE S eid %(s)s, N eid %(n)s, N type T', - [('InsertStep', - [('InsertRelationsStep', - [('OneFetchStep', [('Any T WHERE N eid 999999, N type T, N is Note', - [{'N': 'Note', 'T': 'String'}])], - None, None, [self.cards], {}, [])]) - ]) - ], - {'n': 999999, 's': 999998}) - - def test_insert_simplified_var_2(self): - repo._type_source_cache[999999] = ('Note', 'cards', 999999, 'cards') - repo._type_source_cache[999998] = ('State', 'system', None, 'system') - self._test('INSERT Note X: X in_state S, X type T, X migrated_from N WHERE S eid %(s)s, N eid %(n)s, N type T', - [('InsertStep', - [('InsertRelationsStep', - [('OneFetchStep', [('Any T WHERE N eid 999999, N type T, N is Note', - [{'N': 'Note', 'T': 'String'}])], - None, None, [self.cards], {}, []) - ]) - ]) - ], - {'n': 999999, 's': 999998}) - - def test_insert_simplified_var_3(self): - repo._type_source_cache[999999] = ('Note', 'cards', 999999, 'cards') - repo._type_source_cache[999998] = ('State', 'cards', 999998, 'cards') - self._test('INSERT Note X: X in_state S, X type T WHERE S eid %(s)s, N eid %(n)s, N type T', - [('InsertStep', - [('InsertRelationsStep', - [('OneFetchStep', [('Any T WHERE N eid 999999, N type T, N is Note', - [{'N': 'Note', 'T': 'String'}])], - None, None, [self.cards], {}, [])] - )] - )], - {'n': 999999, 's': 999998}) - - def test_insert_simplified_var_4(self): - repo._type_source_cache[999999] = ('Note', 'cards', 999999, 'cards') - repo._type_source_cache[999998] = ('State', 'system', None, 'system') - self._test('INSERT Note X: X in_state S, X type "bla", X migrated_from N WHERE S eid %(s)s, N eid %(n)s', - [('InsertStep', - [('InsertRelationsStep', [])] - )], - {'n': 999999, 's': 999998}) - - def test_insert_simplified_var_5(self): - repo._type_source_cache[999999] = ('Note', 'cards', 999999, 'cards') - repo._type_source_cache[999998] = ('State', 'system', None, 'system') - self._test('INSERT Note X: X in_state S, X type "bla", X migrated_from N WHERE S eid %(s)s, N eid %(n)s, A concerne N', - [('InsertStep', - [('InsertRelationsStep', - [('OneFetchStep', - [('Any A WHERE A concerne 999999, A is Affaire', - [{'A': 'Affaire'}])], - None, None, [self.system], {}, []), - ]), - ]) - ], - {'n': 999999, 's': 999998}) - - def test_delete_relation1(self): - ueid = self.session.user.eid - self._test('DELETE X created_by Y WHERE X eid %(x)s, NOT Y eid %(y)s', - [('DeleteRelationsStep', [ - ('OneFetchStep', [('Any %s,Y WHERE %s created_by Y, NOT Y eid %s, Y is CWUser' % (ueid, ueid, ueid), - [{'Y': 'CWUser'}])], - None, None, [self.system], {}, []), - ]), - ], - {'x': ueid, 'y': ueid}) - - def test_delete_relation2(self): - ueid = self.session.user.eid - self._test('DELETE X created_by Y WHERE X eid %(x)s, NOT Y login "syt"', - [('FetchStep', [('Any Y WHERE NOT Y login "syt", Y is CWUser', [{'Y': 'CWUser'}])], - [self.ldap, self.system], None, {'Y': 'table0.C0'}, []), - ('DeleteRelationsStep', [ - ('OneFetchStep', [('Any %s,Y WHERE %s created_by Y, Y is CWUser'%(ueid,ueid), [{'Y': 'CWUser'}])], - None, None, [self.system], {'Y': 'table0.C0'}, []), - ]), - ], - {'x': ueid, 'y': ueid}) - - def test_delete_relation3(self): - repo._type_source_cache[999999] = ('Note', 'cards', 999999, 'cards') - self.assertRaises( - BadRQLQuery, self._test, - 'DELETE Y multisource_inlined_rel X WHERE X eid %(x)s, ' - 'NOT (Y cw_source S, S name %(source)s)', [], - {'x': 999999, 'source': 'cards'}) - - def test_delete_relation4(self): - repo._type_source_cache[999999] = ('Note', 'cards', 999999, 'cards') - self.assertRaises( - BadRQLQuery, self._test, - 'DELETE X multisource_inlined_rel Y WHERE Y is Note, X eid %(x)s, ' - 'NOT (Y cw_source S, S name %(source)s)', [], - {'x': 999999, 'source': 'cards'}) - - def test_delete_entity1(self): - repo._type_source_cache[999999] = ('Note', 'system', 999999, 'system') - self._test('DELETE Note X WHERE X eid %(x)s, NOT Y multisource_rel X', - [('DeleteEntitiesStep', - [('OneFetchStep', [('Any 999999 WHERE NOT EXISTS(Y multisource_rel 999999), Y is IN(Card, Note)', - [{'Y': 'Card'}, {'Y': 'Note'}])], - None, None, [self.system], {}, []) - ]) - ], - {'x': 999999}) - - def test_delete_entity2(self): - repo._type_source_cache[999999] = ('Note', 'system', 999999, 'system') - self._test('DELETE Note X WHERE X eid %(x)s, NOT X multisource_inlined_rel Y', - [('DeleteEntitiesStep', - [('OneFetchStep', [('Any X WHERE X eid 999999, NOT X multisource_inlined_rel Y, X is Note, Y is IN(Affaire, Note)', - [{'X': 'Note', 'Y': 'Affaire'}, {'X': 'Note', 'Y': 'Note'}])], - None, None, [self.system], {}, []) - ]) - ], - {'x': 999999}) - - def test_update(self): - self._test('SET X copain Y WHERE X login "comme", Y login "cochon"', - [('FetchStep', - [('Any X WHERE X login "comme", X is CWUser', [{'X': 'CWUser'}])], - [self.ldap, self.system], None, {'X': 'table0.C0'}, []), - ('FetchStep', - [('Any Y WHERE Y login "cochon", Y is CWUser', [{'Y': 'CWUser'}])], - [self.ldap, self.system], None, {'Y': 'table1.C0'}, []), - ('UpdateStep', - [('OneFetchStep', - [('DISTINCT Any X,Y WHERE X is CWUser, Y is CWUser', - [{'X': 'CWUser', 'Y': 'CWUser'}])], - None, None, [self.system], {'X': 'table0.C0', 'Y': 'table1.C0'}, []) - ]) - ]) - - def test_update2(self): - self._test('SET U in_group G WHERE G name ~= "bougloup%", U login "admin"', - [('FetchStep', [('Any U WHERE U login "admin", U is CWUser', [{'U': 'CWUser'}])], - [self.ldap, self.system], None, {'U': 'table0.C0'}, []), - ('UpdateStep', [ - ('OneFetchStep', [('DISTINCT Any U,G WHERE G name ILIKE "bougloup%", G is CWGroup, U is CWUser', - [{'U': 'CWUser', 'G': 'CWGroup'}])], - None, None, [self.system], {'U': 'table0.C0'}, []), - ]), - ]) - - def test_update3(self): - anoneid = self.user_groups_session('guests').user.eid - # since we are adding a in_state relation for an entity in the system - # source, states should only be searched in the system source as well - self._test('SET X in_state S WHERE X eid %(x)s, S name "deactivated"', - [('UpdateStep', [ - ('OneFetchStep', [('DISTINCT Any S WHERE S name "deactivated", S is State', - [{'S': 'State'}])], - None, None, [self.system], {}, []), - ]), - ], - {'x': anoneid}) - -# def test_update4(self): -# # since we are adding a in_state relation with a state from the system -# # source, CWUser should only be searched only in the system source as well -# rset = self.execute('State X WHERE X name "activated"') -# assert len(rset) == 1, rset -# activatedeid = rset[0][0] -# self._test('SET X in_state S WHERE X is CWUser, S eid %s' % activatedeid, -# [('UpdateStep', [ -# ('OneFetchStep', [('DISTINCT Any X,%s WHERE X is CWUser' % activatedeid, -# [{'X': 'CWUser'}])], -# None, None, [self.system], {}, []), -# ]), -# ]) - - def test_ldap_user_related_to_invariant_and_dont_cross_rel(self): - self.repo._type_source_cache[999999] = ('Note', 'cards', 999999, 'cards') - self.cards.dont_cross_relations.add('created_by') - try: - self._test('Any X,XL WHERE E eid %(x)s, E created_by X, X login XL', - [('FetchStep', [('Any X,XL WHERE X login XL, X is CWUser', - [{'X': 'CWUser', 'XL': 'String'}])], - [self.ldap, self.system], None, - {'X': 'table0.C0', 'X.login': 'table0.C1', 'XL': 'table0.C1'}, - []), - ('OneFetchStep', - [('Any X,XL WHERE 999999 created_by X, X login XL, X is CWUser', - [{'X': 'CWUser', 'XL': 'String'}])], - None, None, - [self.system], - {'X': 'table0.C0', 'X.login': 'table0.C1', 'XL': 'table0.C1'}, - [])], - {'x': 999999}) - finally: - self.cards.dont_cross_relations.remove('created_by') - - def test_ambigous_cross_relation(self): - self.repo._type_source_cache[999999] = ('Note', 'cards', 999999, 'cards') - self.cards.support_relations['see_also'] = True - self.cards.cross_relations.add('see_also') - try: - self._test('Any X,AA ORDERBY AA WHERE E eid %(x)s, E see_also X, X modification_date AA', - [('AggrStep', - 'SELECT table0.C0, table0.C1 FROM table0\nORDER BY table0.C1', - None, - [('FetchStep', - [('Any X,AA WHERE 999999 see_also X, X modification_date AA, X is Note', - [{'AA': 'Datetime', 'X': 'Note'}])], [self.cards, self.system], {}, - {'AA': 'table0.C1', 'X': 'table0.C0', - 'X.modification_date': 'table0.C1'}, - []), - ('FetchStep', - [('Any X,AA WHERE 999999 see_also X, X modification_date AA, X is Bookmark', - [{'AA': 'Datetime', 'X': 'Bookmark'}])], - [self.system], {}, - {'AA': 'table0.C1', 'X': 'table0.C0', - 'X.modification_date': 'table0.C1'}, - [])])], - {'x': 999999}) - finally: - del self.cards.support_relations['see_also'] - self.cards.cross_relations.remove('see_also') - - def test_state_of_cross(self): - self._test('DELETE State X WHERE NOT X state_of Y', - [('DeleteEntitiesStep', - [('OneFetchStep', - [('Any X WHERE NOT X state_of Y, X is State, Y is Workflow', - [{'X': 'State', 'Y': 'Workflow'}])], - None, None, [self.system], {}, [])])] - ) - - - def test_source_specified_0_0(self): - self._test('Card X WHERE X cw_source S, S eid 1', - [('OneFetchStep', [('Any X WHERE X cw_source 1, X is Card', - [{'X': 'Card'}])], - None, None, - [self.system],{}, []) - ]) - - def test_source_specified_0_1(self): - self._test('Any X, S WHERE X is Card, X cw_source S, S eid 1', - [('OneFetchStep', [('Any X,1 WHERE X is Card, X cw_source 1', - [{'X': 'Card'}])], - None, None, - [self.system],{}, []) - ]) - - def test_source_specified_1_0(self): - self._test('Card X WHERE X cw_source S, S name "system"', - [('OneFetchStep', [('Any X WHERE X cw_source S, S name "system", X is Card', - [{'X': 'Card', 'S': 'CWSource'}])], - None, None, - [self.system],{}, []) - ]) - - def test_source_specified_1_1(self): - self._test('Any X, SN WHERE X is Card, X cw_source S, S name "system", S name SN', - [('OneFetchStep', [('Any X,SN WHERE X is Card, X cw_source S, S name "system", ' - 'S name SN', - [{'S': 'CWSource', 'SN': 'String', 'X': 'Card'}])], - None, None, [self.system], {}, []) - ]) - - def test_source_specified_1_2(self): - self._test('Card X WHERE X cw_source S, S name "datafeed"', - [('OneFetchStep', [('Any X WHERE X cw_source S, S name "datafeed", X is Card', - [{'X': 'Card', 'S': 'CWSource'}])], - None, None, - [self.system],{}, []) - ]) - - def test_source_specified_1_3(self): - self._test('Any X, SN WHERE X is Card, X cw_source S, S name "datafeed", S name SN', - [('OneFetchStep', [('Any X,SN WHERE X is Card, X cw_source S, S name "datafeed", ' - 'S name SN', - [{'S': 'CWSource', 'SN': 'String', 'X': 'Card'}])], - None, None, [self.system], {}, []) - ]) - - def test_source_specified_1_4(self): - sols = [] - for sol in X_ALL_SOLS: - sol = sol.copy() - sol['S'] = 'CWSource' - sols.append(sol) - self._test('Any X WHERE X cw_source S, S name "cards"', - [('OneFetchStep', [('Any X WHERE X cw_source S, S name "cards"', - sols)], - None, None, - [self.system],{}, []) - ]) - - def test_source_specified_2_0(self): - # self._test('Card X WHERE X cw_source S, NOT S eid 1', - # [('OneFetchStep', [('Any X WHERE X is Card', - # [{'X': 'Card'}])], - # None, None, - # [self.cards],{}, []) - # ]) - self._test('Card X WHERE NOT X cw_source S, S eid 1', - [('OneFetchStep', [('Any X WHERE X is Card', - [{'X': 'Card'}])], - None, None, - [self.cards],{}, []) - ]) - - def test_source_specified_2_1(self): - self._test('Card X WHERE X cw_source S, NOT S name "system"', - [('OneFetchStep', [('Any X WHERE X is Card', - [{'X': 'Card'}])], - None, None, - [self.cards],{}, []) - ]) - self._test('Card X WHERE NOT X cw_source S, S name "system"', - [('OneFetchStep', [('Any X WHERE X is Card', - [{'X': 'Card'}])], - None, None, - [self.cards],{}, []) - ]) - - def test_source_specified_3_1(self): - self._test('Any X,XT WHERE X is Card, X title XT, X cw_source S, S name "cards"', - [('OneFetchStep', - [('Any X,XT WHERE X is Card, X title XT', - [{'X': 'Card', 'XT': 'String'}])], - None, None, [self.cards], {}, []) - ]) - - def test_source_specified_3_2(self): - self._test('Any X,XT WHERE X is Card, X title XT, X cw_source S, S name "datafeed"', - [('OneFetchStep', - [('Any X,XT WHERE X is Card, X title XT, X cw_source S, S name "datafeed"', - [{'X': 'Card', 'XT': 'String', 'S': 'CWSource'}])], - None, None, [self.system], {}, []) - ]) - - def test_source_specified_3_3(self): - self.skipTest('oops') - self._test('Any STN WHERE X is Note, X type XT, X in_state ST, ST name STN, X cw_source S, S name "cards"', - [('OneFetchStep', - [('Any X,XT WHERE X is Card, X title XT', - [{'X': 'Card', 'XT': 'String'}])], - None, None, [self.cards], {}, []) - ]) - - def test_source_conflict_1(self): - self.repo._type_source_cache[999999] = ('Note', 'cards', 999999, 'cards') - with self.assertRaises(BadRQLQuery) as cm: - self._test('Any X WHERE X cw_source S, S name "system", X eid %(x)s', - [], {'x': 999999}) - self.assertEqual(str(cm.exception), 'source conflict for term %(x)s') - - def test_source_conflict_2(self): - with self.assertRaises(BadRQLQuery) as cm: - self._test('Card X WHERE X cw_source S, S name "systeme"', []) - self.assertEqual(str(cm.exception), 'source conflict for term X') - - def test_source_conflict_3(self): - self.skipTest('oops') - self._test('CWSource X WHERE X cw_source S, S name "cards"', - [('OneFetchStep', - [(u'Any X WHERE X cw_source S, S name "cards", X is CWSource', - [{'S': 'CWSource', 'X': 'CWSource'}])], - None, None, - [self.system], - {}, [])]) - - - def test_ambigous_cross_relation_source_specified(self): - self.repo._type_source_cache[999999] = ('Note', 'cards', 999999, 'cards') - self.cards.support_relations['see_also'] = True - self.cards.cross_relations.add('see_also') - try: - self._test('Any X,AA ORDERBY AA WHERE E eid %(x)s, E see_also X, X modification_date AA', - [('AggrStep', - 'SELECT table0.C0, table0.C1 FROM table0\nORDER BY table0.C1', - None, - [('FetchStep', - [('Any X,AA WHERE 999999 see_also X, X modification_date AA, X is Note', - [{'AA': 'Datetime', 'X': 'Note'}])], [self.cards, self.system], {}, - {'AA': 'table0.C1', 'X': 'table0.C0', - 'X.modification_date': 'table0.C1'}, - []), - ('FetchStep', - [('Any X,AA WHERE 999999 see_also X, X modification_date AA, X is Bookmark', - [{'AA': 'Datetime', 'X': 'Bookmark'}])], - [self.system], {}, - {'AA': 'table0.C1', 'X': 'table0.C0', - 'X.modification_date': 'table0.C1'}, - [])])], - {'x': 999999}) - finally: - del self.cards.support_relations['see_also'] - self.cards.cross_relations.remove('see_also') - - # non regression tests #################################################### - - def test_nonregr1(self): - self._test('Any X, Y WHERE X copain Y, X login "syt", Y login "cochon"', - [('FetchStep', - [('Any X WHERE X login "syt", X is CWUser', [{'X': 'CWUser'}])], - [self.ldap, self.system], None, {'X': 'table0.C0'}, []), - ('FetchStep', - [('Any Y WHERE Y login "cochon", Y is CWUser', [{'Y': 'CWUser'}])], - [self.ldap, self.system], None, {'Y': 'table1.C0'}, []), - ('OneFetchStep', - [('Any X,Y WHERE X copain Y, X is CWUser, Y is CWUser', - [{'X': 'CWUser', 'Y': 'CWUser'}])], - None, None, [self.system], {'X': 'table0.C0', 'Y': 'table1.C0'}, []) - ]) - - def test_nonregr2(self): - iworkflowable = self.session.user.cw_adapt_to('IWorkflowable') - iworkflowable.fire_transition('deactivate') - treid = iworkflowable.latest_trinfo().eid - self._test('Any X ORDERBY D DESC WHERE E eid %(x)s, E wf_info_for X, X modification_date D', - [('FetchStep', [('Any X,D WHERE X modification_date D, X is Note', - [{'X': 'Note', 'D': 'Datetime'}])], - [self.cards, self.system], None, {'X': 'table0.C0', 'X.modification_date': 'table0.C1', 'D': 'table0.C1'}, []), - ('FetchStep', [('Any X,D WHERE X modification_date D, X is CWUser', - [{'X': 'CWUser', 'D': 'Datetime'}])], - [self.ldap, self.system], None, {'X': 'table1.C0', 'X.modification_date': 'table1.C1', 'D': 'table1.C1'}, []), - ('AggrStep', 'SELECT table2.C0 FROM table2\nORDER BY table2.C1 DESC', None, [ - ('FetchStep', [('Any X,D WHERE E eid %s, E wf_info_for X, X modification_date D, E is TrInfo, X is Affaire'%treid, - [{'X': 'Affaire', 'E': 'TrInfo', 'D': 'Datetime'}])], - [self.system], - {}, - {'X': 'table2.C0', 'X.modification_date': 'table2.C1', 'D': 'table2.C1', 'E.wf_info_for': 'table2.C0'}, []), - ('FetchStep', [('Any X,D WHERE E eid %s, E wf_info_for X, X modification_date D, E is TrInfo, X is CWUser'%treid, - [{'X': 'CWUser', 'E': 'TrInfo', 'D': 'Datetime'}])], - [self.system], - {'X': 'table1.C0', 'X.modification_date': 'table1.C1', 'D': 'table1.C1'}, - {'X': 'table2.C0', 'X.modification_date': 'table2.C1', 'D': 'table2.C1', 'E.wf_info_for': 'table2.C0'}, []), - ('FetchStep', [('Any X,D WHERE E eid %s, E wf_info_for X, X modification_date D, E is TrInfo, X is Note'%treid, - [{'X': 'Note', 'E': 'TrInfo', 'D': 'Datetime'}])], - [self.system], - {'X': 'table0.C0', 'X.modification_date': 'table0.C1', 'D': 'table0.C1'}, - {'X': 'table2.C0', 'X.modification_date': 'table2.C1', 'D': 'table2.C1', 'E.wf_info_for': 'table2.C0'}, []), - ]), - ], - {'x': treid}) - - def test_nonregr3(self): - # original jpl query: - # Any X, NOW - CD, P WHERE P is Project, U interested_in P, U is CWUser, U login "sthenault", X concerns P, X creation_date CD ORDERBY CD DESC LIMIT 5 - self._test('Any X, NOW - CD, P ORDERBY CD DESC LIMIT 5 WHERE P bookmarked_by U, U login "admin", P is X, X creation_date CD', - [('FetchStep', [('Any U WHERE U login "admin", U is CWUser', [{'U': 'CWUser'}])], - [self.ldap, self.system], None, {'U': 'table0.C0'}, []), - ('OneFetchStep', [('Any X,(NOW - CD),P ORDERBY CD DESC LIMIT 5 WHERE P bookmarked_by U, P is X, X creation_date CD, P is Bookmark, U is CWUser, X is CWEType', - [{'P': 'Bookmark', 'U': 'CWUser', 'X': 'CWEType', 'CD': 'Datetime'}])], - 5, None, [self.system], {'U': 'table0.C0'}, [])] - ) - - def test_nonregr4(self): - ueid = self.session.user.eid - self._test('Any U ORDERBY D DESC WHERE WF wf_info_for X, WF creation_date D, WF from_state FS, ' - 'WF owned_by U?, X eid %(x)s', - [#('FetchStep', [('Any U WHERE U is CWUser', [{'U': 'CWUser'}])], - # [self.ldap, self.system], None, {'U': 'table0.C0'}, []), - ('OneFetchStep', [('Any U ORDERBY D DESC WHERE WF wf_info_for %s, WF creation_date D, WF from_state FS, WF owned_by U?' % ueid, - [{'WF': 'TrInfo', 'FS': 'State', 'U': 'CWUser', 'D': 'Datetime'}])], - None, None, - [self.system], {}, [])], - {'x': ueid}) - - def test_nonregr5(self): - # original jpl query: - # DISTINCT Version V WHERE MB done_in MV, MV eid %(x)s, - # MB depends_on B, B done_in V, V version_of P, NOT P eid %(p)s' - cardeid = self.execute('INSERT Card X: X title "hop"')[0][0] - noteeid = self.execute('INSERT Note X')[0][0] - self._test('DISTINCT Card V WHERE MB documented_by MV, MV eid %(x)s, ' - 'MB depends_on B, B documented_by V, V multisource_rel P, NOT P eid %(p)s', - [('FetchStep', [('Any V WHERE V multisource_rel P, NOT P eid %s, P is Note, V is Card'%noteeid, - [{'P': 'Note', 'V': 'Card'}])], - [self.cards, self.system], None, {'V': 'table0.C0'}, []), - ('OneFetchStep', [('DISTINCT Any V WHERE MB documented_by %s, MB depends_on B, B documented_by V, B is Affaire, MB is Affaire, V is Card'%cardeid, - [{'B': 'Affaire', 'MB': 'Affaire', 'V': 'Card'}])], - None, None, [self.system], {'V': 'table0.C0'}, [])], - {'x': cardeid, 'p': noteeid}) - - def test_nonregr6(self): - self._test('Any X WHERE X concerne Y', - [('OneFetchStep', [('Any X WHERE X concerne Y', - [{'Y': 'Division', 'X': 'Affaire'}, - {'Y': 'Note', 'X': 'Affaire'}, - {'Y': 'Societe', 'X': 'Affaire'}, - {'Y': 'SubDivision', 'X': 'Affaire'}, - {'Y': 'Affaire', 'X': 'Personne'}])], - None, None, [self.system], {}, []) - ]) - self._test('Any X WHERE X concerne Y, Y is Note', - [('FetchStep', [('Any Y WHERE Y is Note', [{'Y': 'Note'}])], - [self.cards, self.system], None, {'Y': 'table0.C0'}, []), - ('OneFetchStep', [('Any X WHERE X concerne Y, X is Affaire, Y is Note', - [{'X': 'Affaire', 'Y': 'Note'}])], - None, None, [self.system], {'Y': 'table0.C0'}, []) - ]) - - def test_nonregr7(self): - repo._type_source_cache[999999] = ('Note', 'cards', 999999, 'cards') - self._test('Any S,SUM(DUR),SUM(I),(SUM(I) - SUM(DUR)),MIN(DI),MAX(DI) GROUPBY S ORDERBY S WHERE A is Affaire, A duration DUR, A invoiced I, A modification_date DI, A in_state S, S name SN, (EXISTS(A concerne WP, W multisource_rel WP)) OR (EXISTS(A concerne W)), W eid %(n)s', - [('FetchStep', [('Any WP WHERE 999999 multisource_rel WP, WP is Note', [{'WP': 'Note'}])], - [self.cards], None, {'WP': u'table0.C0'}, []), - ('OneFetchStep', [('Any S,SUM(DUR),SUM(I),(SUM(I) - SUM(DUR)),MIN(DI),MAX(DI) GROUPBY S ORDERBY S WHERE A duration DUR, A invoiced I, A modification_date DI, A in_state S, S name SN, (EXISTS(A concerne WP, WP is Note)) OR (EXISTS(A concerne 999999)), A is Affaire, S is State', - [{'A': 'Affaire', 'DI': 'Datetime', 'DUR': 'Int', 'I': 'Float', 'S': 'State', 'SN': 'String', 'WP': 'Note'}])], - None, None, [self.system], {'WP': u'table0.C0'}, [])], - {'n': 999999}) - - def test_nonregr8(self): - repo._type_source_cache[999999] = ('Note', 'cards', 999999, 'cards') - self._test('Any X,Z WHERE X eid %(x)s, X multisource_rel Y, Z concerne X', - [('FetchStep', [('Any 999999 WHERE 999999 multisource_rel Y, Y is Note', - [{'Y': 'Note'}])], - [self.cards], - None, {u'%(x)s': 'table0.C0'}, - []), - ('OneFetchStep', [('Any 999999,Z WHERE Z concerne 999999, Z is Affaire', - [{'Z': 'Affaire'}])], - None, None, [self.system], - {u'%(x)s': 'table0.C0'}, []), - ], - {'x': 999999}) - - def test_nonregr9(self): - repo._type_source_cache[999999] = ('Note', 'cards', 999999, 'cards') - repo._type_source_cache[999998] = ('Note', 'cards', 999998, 'cards') - self._test('SET X migrated_from Y WHERE X eid %(x)s, Y multisource_rel Z, Z eid %(z)s, Y migrated_from Z', - [('FetchStep', [('Any Y WHERE Y multisource_rel 999998, Y is Note', [{'Y': 'Note'}])], - [self.cards], None, {'Y': u'table0.C0'}, []), - ('UpdateStep', - [('OneFetchStep', [('DISTINCT Any Y WHERE Y migrated_from 999998, Y is Note', - [{'Y': 'Note'}])], - None, None, [self.system], - {'Y': u'table0.C0'}, [])])], - {'x': 999999, 'z': 999998}) - - def test_nonregr10(self): - repo._type_source_cache[999999] = ('CWUser', 'ldap', 999999, 'ldap') - self._test('Any X,AA,AB ORDERBY AA WHERE E eid %(x)s, E owned_by X, X login AA, X modification_date AB', - [('FetchStep', - [('Any X,AA,AB WHERE X login AA, X modification_date AB, X is CWUser', - [{'AA': 'String', 'AB': 'Datetime', 'X': 'CWUser'}])], - [self.ldap, self.system], None, {'AA': 'table0.C1', 'AB': 'table0.C2', - 'X': 'table0.C0', 'X.login': 'table0.C1', 'X.modification_date': 'table0.C2'}, - []), - ('OneFetchStep', - [('Any X,AA,AB ORDERBY AA WHERE 999999 owned_by X, X login AA, X modification_date AB, X is CWUser', - [{'AA': 'String', 'AB': 'Datetime', 'X': 'CWUser'}])], - None, None, [self.system], {'AA': 'table0.C1', 'AB': 'table0.C2', - 'X': 'table0.C0', 'X.login': 'table0.C1', 'X.modification_date': 'table0.C2'}, - []) - ], - {'x': 999999}) - - def test_nonregr11(self): - repo._type_source_cache[999999] = ('Bookmark', 'system', 999999, 'system') - self._test('SET X bookmarked_by Y WHERE X eid %(x)s, Y login "hop"', - [('UpdateStep', - [('OneFetchStep', [('DISTINCT Any Y WHERE Y login "hop", Y is CWUser', [{'Y': 'CWUser'}])], - None, None, [self.ldap, self.system], {}, [])] - )], - {'x': 999999}) - - def test_nonregr12(self): - repo._type_source_cache[999999] = ('Note', 'cards', 999999, 'cards') - self._test('Any X ORDERBY Z DESC WHERE X modification_date Z, E eid %(x)s, E see_also X', - [('FetchStep', [('Any X,Z WHERE X modification_date Z, X is Note', - [{'X': 'Note', 'Z': 'Datetime'}])], - [self.cards, self.system], None, {'X': 'table0.C0', 'X.modification_date': 'table0.C1', 'Z': 'table0.C1'}, - []), - ('AggrStep', 'SELECT table1.C0 FROM table1\nORDER BY table1.C1 DESC', None, - [('FetchStep', [('Any X,Z WHERE X modification_date Z, 999999 see_also X, X is Bookmark', - [{'X': 'Bookmark', 'Z': 'Datetime'}])], - [self.system], {}, {'X': 'table1.C0', 'X.modification_date': 'table1.C1', - 'Z': 'table1.C1'}, - []), - ('FetchStep', [('Any X,Z WHERE X modification_date Z, 999999 see_also X, X is Note', - [{'X': 'Note', 'Z': 'Datetime'}])], - [self.system], {'X': 'table0.C0', 'X.modification_date': 'table0.C1', - 'Z': 'table0.C1'}, - {'X': 'table1.C0', 'X.modification_date': 'table1.C1', - 'Z': 'table1.C1'}, - [])] - )], - {'x': 999999}) - - def test_nonregr13_1(self): - ueid = self.session.user.eid - # identity wrapped into exists: - # should'nt propagate constraint that U is in the same source as ME - self._test('Any B,U,UL GROUPBY B,U,UL WHERE B created_by U?, B is File ' - 'WITH U,UL BEING (Any U,UL WHERE ME eid %(x)s, (EXISTS(U identity ME) ' - 'OR (EXISTS(U in_group G, G name IN("managers", "staff")))) ' - 'OR (EXISTS(U in_group H, ME in_group H, NOT H name "users")), U login UL, U is CWUser)', - [('FetchStep', [('Any U,UL WHERE U login UL, U is CWUser', - [{'U': 'CWUser', 'UL': 'String'}])], - [self.ldap, self.system], None, - {'U': 'table0.C0', 'U.login': 'table0.C1', 'UL': 'table0.C1'}, - []), - ('FetchStep', [('Any U,UL WHERE ((EXISTS(U identity %s)) OR (EXISTS(U in_group G, G name IN("managers", "staff"), G is CWGroup))) OR (EXISTS(U in_group H, %s in_group H, NOT H name "users", H is CWGroup)), U login UL, U is CWUser' % (ueid, ueid), - [{'G': 'CWGroup', 'H': 'CWGroup', 'U': 'CWUser', 'UL': 'String'}])], - [self.system], - {'U': 'table0.C0', 'U.login': 'table0.C1', 'UL': 'table0.C1'}, - {'U': 'table1.C0', 'U.login': 'table1.C1', 'UL': 'table1.C1'}, - []), - ('OneFetchStep', [('Any B,U,UL GROUPBY B,U,UL WHERE B created_by U?, B is File', - [{'B': 'File', 'U': 'CWUser', 'UL': 'String'}])], - None, None, [self.system], - {'U': 'table1.C0', 'UL': 'table1.C1'}, - [])], - {'x': ueid}) - - def test_nonregr13_2(self): - # identity *not* wrapped into exists. - # - # XXX this test fail since in this case, in "U identity 5" U and 5 are - # from the same scope so constraints are applied (telling the U should - # come from the same source as user with eid 5). - # - # IMO this is normal, unless we introduce a special case for the - # identity relation. BUT I think it's better to leave it as is and to - # explain constraint propagation rules, and so why this should be - # wrapped in exists() if used in multi-source - self.skipTest('take a look at me if you wish') - ueid = self.session.user.eid - self._test('Any B,U,UL GROUPBY B,U,UL WHERE B created_by U?, B is File ' - 'WITH U,UL BEING (Any U,UL WHERE ME eid %(x)s, (U identity ME ' - 'OR (EXISTS(U in_group G, G name IN("managers", "staff")))) ' - 'OR (EXISTS(U in_group H, ME in_group H, NOT H name "users")), U login UL, U is CWUser)', - [('FetchStep', [('Any U,UL WHERE U login UL, U is CWUser', - [{'U': 'CWUser', 'UL': 'String'}])], - [self.ldap, self.system], None, - {'U': 'table0.C0', 'U.login': 'table0.C1', 'UL': 'table0.C1'}, - []), - ('FetchStep', [('Any U,UL WHERE ((U identity %s) OR (EXISTS(U in_group G, G name IN("managers", "staff"), G is CWGroup))) OR (EXISTS(U in_group H, %s in_group H, NOT H name "users", H is CWGroup)), U login UL, U is CWUser' % (ueid, ueid), - [{'G': 'CWGroup', 'H': 'CWGroup', 'U': 'CWUser', 'UL': 'String'}])], - [self.system], - {'U': 'table0.C0', 'U.login': 'table0.C1', 'UL': 'table0.C1'}, - {'U': 'table1.C0', 'U.login': 'table1.C1', 'UL': 'table1.C1'}, - []), - ('OneFetchStep', [('Any B,U,UL GROUPBY B,U,UL WHERE B created_by U?, B is File', - [{'B': 'File', 'U': 'CWUser', 'UL': 'String'}])], - None, None, [self.system], - {'U': 'table1.C0', 'UL': 'table1.C1'}, - [])], - {'x': self.session.user.eid}) - - def test_nonregr14_1(self): - repo._type_source_cache[999999] = ('CWUser', 'ldap', 999999, 'ldap') - self._test('Any X WHERE X eid %(x)s, X owned_by U, U eid %(u)s', - [('OneFetchStep', [('Any 999999 WHERE 999999 owned_by 999999', [{}])], - None, None, [self.system], {}, [])], - {'x': 999999, 'u': 999999}) - - def test_nonregr14_2(self): - repo._type_source_cache[999999] = ('CWUser', 'ldap', 999999, 'ldap') - repo._type_source_cache[999998] = ('Note', 'system', 999998, 'system') - self._test('Any X WHERE X eid %(x)s, X owned_by U, U eid %(u)s', - [('OneFetchStep', [('Any 999998 WHERE 999998 owned_by 999999', [{}])], - None, None, [self.system], {}, [])], - {'x': 999998, 'u': 999999}) - - def test_nonregr14_3(self): - repo._type_source_cache[999999] = ('CWUser', 'system', 999999, 'system') - repo._type_source_cache[999998] = ('CWUser', 'ldap', 999998, 'ldap') - self._test('Any X WHERE X eid %(x)s, X owned_by U, U eid %(u)s', - [('OneFetchStep', [('Any 999998 WHERE 999998 owned_by 999999', [{}])], - None, None, [self.system], {}, [])], - {'x': 999998, 'u': 999999}) - - def test_nonregr_identity_no_source_access_1(self): - repo._type_source_cache[999999] = ('CWUser', 'ldap', 999998, 'ldap') - self._test('Any S WHERE S identity U, S eid %(s)s, U eid %(u)s', - [('OneFetchStep', [('Any 999999 WHERE 999999 identity 999999', [{}])], - None, None, [self.system], {}, [])], - {'s': 999999, 'u': 999999}) - - def test_nonregr_identity_no_source_access_2(self): - repo._type_source_cache[999999] = ('EmailAddress', 'system', 999999, 'system') - repo._type_source_cache[999998] = ('CWUser', 'ldap', 999998, 'ldap') - self._test('Any X WHERE O use_email X, ((EXISTS(O identity U)) OR (EXISTS(O in_group G, G name IN("managers", "staff")))) OR (EXISTS(O in_group G2, U in_group G2, NOT G2 name "users")), X eid %(x)s, U eid %(u)s', - [('OneFetchStep', [('Any 999999 WHERE O use_email 999999, ((EXISTS(O identity 999998)) OR (EXISTS(O in_group G, G name IN("managers", "staff")))) OR (EXISTS(O in_group G2, 999998 in_group G2, NOT G2 name "users"))', - [{'G': 'CWGroup', 'G2': 'CWGroup', 'O': 'CWUser'}])], - None, None, [self.system], {}, [])], - {'x': 999999, 'u': 999998}) - - def test_nonregr_similar_subquery(self): - repo._type_source_cache[999999] = ('Personne', 'system', 999999, 'system') - self._test('Any T,TD,U,T,UL WITH T,TD,U,UL BEING (' - '(Any T,TD,U,UL WHERE X eid %(x)s, T comments X, T content TD, T created_by U?, U login UL)' - ' UNION ' - '(Any T,TD,U,UL WHERE X eid %(x)s, X connait P, T comments P, T content TD, T created_by U?, U login UL))', - # XXX optimization: use a OneFetchStep with a UNION of both queries - [('FetchStep', [('Any U,UL WHERE U login UL, U is CWUser', - [{'U': 'CWUser', 'UL': 'String'}])], - [self.ldap, self.system], None, - {'U': 'table0.C0', 'U.login': 'table0.C1', 'UL': 'table0.C1'}, - []), - ('UnionFetchStep', - [('FetchStep', - [('Any T,TD,U,UL WHERE T comments 999999, T content TD, T created_by U?, U login UL, T is Comment, U is CWUser', - [{'T': 'Comment', 'TD': 'String', 'U': 'CWUser', 'UL': 'String'}])], - [self.system], - {'U': 'table0.C0', 'U.login': 'table0.C1', 'UL': 'table0.C1'}, - {'T': 'table1.C0', - 'T.content': 'table1.C1', - 'TD': 'table1.C1', - 'U': 'table1.C2', - 'U.login': 'table1.C3', - 'UL': 'table1.C3'}, - []), - ('FetchStep', - [('Any T,TD,U,UL WHERE 999999 connait P, T comments P, T content TD, T created_by U?, U login UL, P is Personne, T is Comment, U is CWUser', - [{'P': 'Personne', - 'T': 'Comment', - 'TD': 'String', - 'U': 'CWUser', - 'UL': 'String'}])], - [self.system], - {'U': 'table0.C0', 'U.login': 'table0.C1', 'UL': 'table0.C1'}, - {'T': 'table1.C0', - 'T.content': 'table1.C1', - 'TD': 'table1.C1', - 'U': 'table1.C2', - 'U.login': 'table1.C3', - 'UL': 'table1.C3'}, - [])]), - ('OneFetchStep', - [('Any T,TD,U,T,UL', - [{'T': 'Comment', 'TD': 'String', 'U': 'CWUser', 'UL': 'String'}])], - None, None, - [self.system], - {'T': 'table1.C0', 'TD': 'table1.C1', 'U': 'table1.C2', 'UL': 'table1.C3'}, - [])], - {'x': 999999}) - - def test_nonregr_dont_readd_already_processed_relation(self): - self._test('Any WO,D,SO WHERE WO is Note, D tags WO, WO in_state SO', - [('FetchStep', - [('Any WO,SO WHERE WO in_state SO, SO is State, WO is Note', - [{'SO': 'State', 'WO': 'Note'}])], - [self.cards, self.system], None, - {'SO': 'table0.C1', 'WO': 'table0.C0'}, - []), - ('OneFetchStep', - [('Any WO,D,SO WHERE D tags WO, D is Tag, SO is State, WO is Note', - [{'D': 'Tag', 'SO': 'State', 'WO': 'Note'}])], - None, None, [self.system], - {'SO': 'table0.C1', 'WO': 'table0.C0'}, - []) - ]) - -class MSPlannerTwoSameExternalSourcesTC(BasePlannerTC): - """test planner related feature on a 3-sources repository: - - * 2 rql sources supporting Card - """ - - def setUp(self): - self.__class__.repo = repo - self.setup() - self.add_source(FakeCardSource, 'cards') - self.add_source(FakeCardSource, 'cards2') - self.planner = MSPlanner(self.o.schema, self.repo.vreg.rqlhelper) - assert repo.sources_by_uri['cards2'].support_relation('multisource_crossed_rel') - assert 'multisource_crossed_rel' in repo.sources_by_uri['cards2'].cross_relations - assert repo.sources_by_uri['cards'].support_relation('multisource_crossed_rel') - assert 'multisource_crossed_rel' in repo.sources_by_uri['cards'].cross_relations - _test = test_plan - - - def test_linked_external_entities(self): - repo._type_source_cache[999999] = ('Tag', 'system', 999999, 'system') - self._test('Any X,XT WHERE X is Card, X title XT, T tags X, T eid %(t)s', - [('FetchStep', - [('Any X,XT WHERE X title XT, X is Card', [{'X': 'Card', 'XT': 'String'}])], - [self.cards, self.cards2, self.system], - None, {'X': 'table0.C0', 'X.title': 'table0.C1', 'XT': 'table0.C1'}, - []), - ('OneFetchStep', - [('Any X,XT WHERE X title XT, 999999 tags X, X is Card', - [{'X': 'Card', 'XT': 'String'}])], - None, None, [self.system], - {'X': 'table0.C0', 'X.title': 'table0.C1', 'XT': 'table0.C1'}, - [])], - {'t': 999999}) - - def test_version_depends_on(self): - self.repo._type_source_cache[999999] = ('Note', 'cards', 999999, 'cards') - self._test('Any X,AD,AE WHERE E eid %(x)s, E migrated_from X, X in_state AD, AD name AE', - [('FetchStep', [('Any X,AD,AE WHERE X in_state AD, AD name AE, AD is State, X is Note', - [{'AD': 'State', 'AE': 'String', 'X': 'Note'}])], - [self.cards, self.cards2, self.system], - None, {'AD': 'table0.C1', 'AD.name': 'table0.C2', - 'AE': 'table0.C2', 'X': 'table0.C0'}, - []), - ('OneFetchStep', [('Any X,AD,AE WHERE 999999 migrated_from X, AD name AE, AD is State, X is Note', - [{'AD': 'State', 'AE': 'String', 'X': 'Note'}])], - None, None, [self.system], - {'AD': 'table0.C1', 'AD.name': 'table0.C2', 'AE': 'table0.C2', 'X': 'table0.C0'}, - [])], - {'x': 999999}) - - def test_version_crossed_depends_on_1(self): - self.repo._type_source_cache[999999] = ('Note', 'cards', 999999, 'cards') - self._test('Any X,AD,AE WHERE E eid %(x)s, E multisource_crossed_rel X, X in_state AD, AD name AE', - [('FetchStep', [('Any X,AD,AE WHERE X in_state AD, AD name AE, AD is State, X is Note', - [{'AD': 'State', 'AE': 'String', 'X': 'Note'}])], - [self.cards, self.cards2, self.system], - None, {'AD': 'table0.C1', 'AD.name': 'table0.C2', - 'AE': 'table0.C2', 'X': 'table0.C0'}, - []), - ('UnionStep', None, None, - [('OneFetchStep', [('Any X,AD,AE WHERE 999999 multisource_crossed_rel X, AD name AE, AD is State, X is Note', - [{'AD': 'State', 'AE': 'String', 'X': 'Note'}])], - None, None, [self.cards], None, - []), - ('OneFetchStep', [('Any X,AD,AE WHERE 999999 multisource_crossed_rel X, AD name AE, AD is State, X is Note', - [{'AD': 'State', 'AE': 'String', 'X': 'Note'}])], - None, None, [self.system], - {'AD': 'table0.C1', 'AD.name': 'table0.C2', - 'AE': 'table0.C2', 'X': 'table0.C0'}, - [])] - )], - {'x': 999999}) - - def test_version_crossed_depends_on_2(self): - self.repo._type_source_cache[999999] = ('Note', 'system', 999999, 'system') - self._test('Any X,AD,AE WHERE E eid %(x)s, E multisource_crossed_rel X, X in_state AD, AD name AE', - [('FetchStep', [('Any X,AD,AE WHERE X in_state AD, AD name AE, AD is State, X is Note', - [{'AD': 'State', 'AE': 'String', 'X': 'Note'}])], - [self.cards, self.cards2, self.system], - None, {'AD': 'table0.C1', 'AD.name': 'table0.C2', - 'AE': 'table0.C2', 'X': 'table0.C0'}, - []), - ('OneFetchStep', [('Any X,AD,AE WHERE 999999 multisource_crossed_rel X, AD name AE, AD is State, X is Note', - [{'AD': 'State', 'AE': 'String', 'X': 'Note'}])], - None, None, [self.system], - {'AD': 'table0.C1', 'AD.name': 'table0.C2', 'AE': 'table0.C2', 'X': 'table0.C0'}, - [])], - {'x': 999999}) - - def test_version_crossed_depends_on_3(self): - self._test('Any X,AD,AE WHERE E multisource_crossed_rel X, X in_state AD, AD name AE, E is Note', - [('FetchStep', [('Any X,AD,AE WHERE X in_state AD, AD name AE, AD is State, X is Note', - [{'AD': 'State', 'AE': 'String', 'X': 'Note'}])], - [self.cards, self.cards2, self.system], - None, {'AD': 'table0.C1', 'AD.name': 'table0.C2', - 'AE': 'table0.C2', 'X': 'table0.C0'}, - []), - ('FetchStep', [('Any E WHERE E is Note', [{'E': 'Note'}])], - [self.cards, self.cards2, self.system], - None, {'E': 'table1.C0'}, - []), - ('UnionStep', None, None, - [('OneFetchStep', [('Any X,AD,AE WHERE E multisource_crossed_rel X, AD name AE, AD is State, E is Note, X is Note', - [{'AD': 'State', 'AE': 'String', 'E': 'Note', 'X': 'Note'}])], - None, None, [self.cards, self.cards2], None, - []), - ('OneFetchStep', [('Any X,AD,AE WHERE E multisource_crossed_rel X, AD name AE, AD is State, E is Note, X is Note', - [{'AD': 'State', 'AE': 'String', 'E': 'Note', 'X': 'Note'}])], - None, None, [self.system], - {'AD': 'table0.C1', - 'AD.name': 'table0.C2', - 'AE': 'table0.C2', - 'E': 'table1.C0', - 'X': 'table0.C0'}, - [])] - )] - ) - - def test_version_crossed_depends_on_4(self): - self._test('Any X,AD,AE WHERE EXISTS(E multisource_crossed_rel X), X in_state AD, AD name AE, E is Note', - [('FetchStep', - [('Any X,AD,AE WHERE X in_state AD, AD name AE, AD is State, X is Note', - [{'X': 'Note', 'AD': 'State', 'AE': 'String'}])], - [self.cards, self.cards2, self.system], None, - {'X': 'table0.C0', - 'AD': 'table0.C1', - 'AD.name': 'table0.C2', - 'AE': 'table0.C2'}, - []), - ('FetchStep', - [('Any A WHERE E multisource_crossed_rel A, A is Note, E is Note', - [{'A': 'Note', 'E': 'Note'}])], - [self.cards, self.cards2, self.system], None, - {'A': 'table1.C0'}, - []), - ('OneFetchStep', - [('Any X,AD,AE WHERE EXISTS(X identity A), AD name AE, A is Note, AD is State, X is Note', - [{'A': 'Note', 'AD': 'State', 'AE': 'String', 'X': 'Note'}])], - None, None, - [self.system], - {'A': 'table1.C0', - 'AD': 'table0.C1', - 'AD.name': 'table0.C2', - 'AE': 'table0.C2', - 'X': 'table0.C0'}, - [] - )] - ) - - def test_nonregr_dont_cross_rel_source_filtering_1(self): - self.repo._type_source_cache[999999] = ('Note', 'cards', 999999, 'cards') - self._test('Any S WHERE E eid %(x)s, E in_state S, NOT S name "moved"', - [('OneFetchStep', [('Any S WHERE 999999 in_state S, NOT S name "moved", S is State', - [{'S': 'State'}])], - None, None, [self.cards], {}, [] - )], - {'x': 999999}) - - def test_nonregr_dont_cross_rel_source_filtering_2(self): - self.repo._type_source_cache[999999] = ('Note', 'cards', 999999, 'cards') - self._test('Any X,AA,AB WHERE E eid %(x)s, E in_state X, X name AA, X modification_date AB', - [('OneFetchStep', [('Any X,AA,AB WHERE 999999 in_state X, X name AA, X modification_date AB, X is State', - [{'AA': 'String', 'AB': 'Datetime', 'X': 'State'}])], - None, None, [self.cards], {}, [] - )], - {'x': 999999}) - - def test_nonregr_eid_query(self): - self.repo._type_source_cache[999999] = ('Note', 'cards', 999999, 'cards') - self._test('Any X WHERE X eid 999999', - [('OneFetchStep', [('Any 999999', [{}])], - None, None, [self.system], {}, [] - )], - {'x': 999999}) - - - def test_nonregr_not_is(self): - self._test("Any X WHERE X owned_by U, U login 'anon', NOT X is Comment", - [('FetchStep', [('Any X WHERE X is IN(Card, Note, State)', - [{'X': 'Note'}, {'X': 'State'}, {'X': 'Card'}])], - [self.cards, self.cards2, self.system], - None, {'X': 'table0.C0'}, []), - ('UnionStep', None, None, - [('OneFetchStep', - [(u'Any X WHERE X owned_by U, U login "anon", U is CWUser, X is IN(Affaire, BaseTransition, Basket, Bookmark, CWAttribute, CWCache, CWConstraint, CWConstraintType, CWDataImport, CWEType, CWGroup, CWPermission, CWProperty, CWRType, CWRelation, CWSource, CWSourceHostConfig, CWSourceSchemaConfig, CWUniqueTogetherConstraint, CWUser, Division, Email, EmailAddress, EmailPart, EmailThread, ExternalUri, File, Folder, Old, Personne, RQLExpression, Societe, SubDivision, SubWorkflowExitPoint, Tag, TrInfo, Transition, Workflow, WorkflowTransition)', - [{'U': 'CWUser', 'X': 'Affaire'}, - {'U': 'CWUser', 'X': 'BaseTransition'}, - {'U': 'CWUser', 'X': 'Basket'}, - {'U': 'CWUser', 'X': 'Bookmark'}, - {'U': 'CWUser', 'X': 'CWAttribute'}, - {'U': 'CWUser', 'X': 'CWCache'}, - {'U': 'CWUser', 'X': 'CWConstraint'}, - {'U': 'CWUser', 'X': 'CWConstraintType'}, - {'U': 'CWUser', 'X': 'CWDataImport'}, - {'U': 'CWUser', 'X': 'CWEType'}, - {'U': 'CWUser', 'X': 'CWGroup'}, - {'U': 'CWUser', 'X': 'CWPermission'}, - {'U': 'CWUser', 'X': 'CWProperty'}, - {'U': 'CWUser', 'X': 'CWRType'}, - {'U': 'CWUser', 'X': 'CWRelation'}, - {'U': 'CWUser', 'X': 'CWSource'}, - {'U': 'CWUser', 'X': 'CWSourceHostConfig'}, - {'U': 'CWUser', 'X': 'CWSourceSchemaConfig'}, - {'U': 'CWUser', 'X': 'CWUniqueTogetherConstraint'}, - {'U': 'CWUser', 'X': 'CWUser'}, - {'U': 'CWUser', 'X': 'Division'}, - {'U': 'CWUser', 'X': 'Email'}, - {'U': 'CWUser', 'X': 'EmailAddress'}, - {'U': 'CWUser', 'X': 'EmailPart'}, - {'U': 'CWUser', 'X': 'EmailThread'}, - {'U': 'CWUser', 'X': 'ExternalUri'}, - {'U': 'CWUser', 'X': 'File'}, - {'U': 'CWUser', 'X': 'Folder'}, - {'U': 'CWUser', 'X': 'Old'}, - {'U': 'CWUser', 'X': 'Personne'}, - {'U': 'CWUser', 'X': 'RQLExpression'}, - {'U': 'CWUser', 'X': 'Societe'}, - {'U': 'CWUser', 'X': 'SubDivision'}, - {'U': 'CWUser', 'X': 'SubWorkflowExitPoint'}, - {'U': 'CWUser', 'X': 'Tag'}, - {'U': 'CWUser', 'X': 'TrInfo'}, - {'U': 'CWUser', 'X': 'Transition'}, - {'U': 'CWUser', 'X': 'Workflow'}, - {'U': 'CWUser', 'X': 'WorkflowTransition'}])], - None, None, - [self.system], {}, []), - ('OneFetchStep', - [(u'Any X WHERE X owned_by U, U login "anon", U is CWUser, X is IN(Card, Note, State)', - [{'U': 'CWUser', 'X': 'Note'}, - {'U': 'CWUser', 'X': 'State'}, - {'U': 'CWUser', 'X': 'Card'}])], - None, None, - [self.system], {'X': 'table0.C0'}, []) - ]) - ]) - - def test_remove_from_deleted_source_1(self): - self.repo._type_source_cache[999999] = ('Note', 'cards', 999999, 'cards') - self._test('Note X WHERE X eid 999999, NOT X cw_source Y', - [('OneFetchStep', - [('Any 999999 WHERE NOT EXISTS(999999 cw_source Y)', - [{'Y': 'CWSource'}])], - None, None, [self.system], {}, []) - ]) - - def test_remove_from_deleted_source_2(self): - self.repo._type_source_cache[999999] = ('Note', 'cards', 999999, 'cards') - self.repo._type_source_cache[999998] = ('Note', 'cards', 999998, 'cards') - self._test('Note X WHERE X eid IN (999998, 999999), NOT X cw_source Y', - [('FetchStep', - [('Any X WHERE X eid IN(999998, 999999), X is Note', - [{'X': 'Note'}])], - [self.cards], None, {'X': 'table0.C0'}, []), - ('OneFetchStep', - [('Any X WHERE NOT EXISTS(X cw_source Y, Y is CWSource), X is Note', - [{'X': 'Note', 'Y': 'CWSource'}])], - None, None, [self.system],{'X': 'table0.C0'}, []) - ]) - - -class FakeVCSSource(AbstractSource): - uri = 'ccc' - support_entities = {'Card': True, 'Note': True} - support_relations = {'multisource_inlined_rel': True, - 'multisource_rel': True} - - def syntax_tree_search(self, *args, **kwargs): - return [] - -class MSPlannerVCSSource(BasePlannerTC): - - def setUp(self): - self.__class__.repo = repo - self.setup() - self.add_source(FakeVCSSource, 'vcs') - self.planner = MSPlanner(self.o.schema, self.repo.vreg.rqlhelper) - _test = test_plan - - def test_multisource_inlined_rel_skipped(self): - self._test('Any MAX(VC) ' - 'WHERE VC multisource_inlined_rel R2, R para %(branch)s, VC in_state S, S name "published", ' - '(EXISTS(R identity R2)) OR (EXISTS(R multisource_rel R2))', - [('FetchStep', [('Any VC WHERE VC multisource_inlined_rel R2, R para "???", (EXISTS(R identity R2)) OR (EXISTS(R multisource_rel R2)), R is Note, R2 is Note, VC is Note', - [{'R': 'Note', 'R2': 'Note', 'VC': 'Note'}])], - [self.vcs, self.system], None, - {'VC': 'table0.C0'}, - []), - ('OneFetchStep', [(u'Any MAX(VC) WHERE VC in_state S, S name "published", S is State, VC is Note', - [{'S': 'State', 'VC': 'Note'}])], - None, None, [self.system], - {'VC': 'table0.C0'}, - []) - ]) - - def test_fully_simplified_extsource(self): - self.repo._type_source_cache[999998] = ('Note', 'vcs', 999998, 'vcs') - self.repo._type_source_cache[999999] = ('Note', 'vcs', 999999, 'vcs') - self._test('Any X, Y WHERE NOT X multisource_rel Y, X eid 999998, Y eid 999999', - [('OneFetchStep', [('Any 999998,999999 WHERE NOT EXISTS(999998 multisource_rel 999999)', [{}])], - None, None, [self.vcs], {}, []) - ]) - - def test_nonregr_fully_simplified_extsource(self): - self.repo._type_source_cache[999998] = ('Note', 'vcs', 999998, 'vcs') - self.repo._type_source_cache[999999] = ('Note', 'vcs', 999999, 'vcs') - self.repo._type_source_cache[1000000] = ('Note', 'system', 1000000, 'system') - self._test('DISTINCT Any T,FALSE,L,M WHERE L eid 1000000, M eid 999999, T eid 999998', - [('OneFetchStep', [('DISTINCT Any 999998,FALSE,1000000,999999', [{}])], - None, None, [self.system], {}, []) - ]) - - -if __name__ == '__main__': - from logilab.common.testlib import unittest_main - unittest_main() diff -r 46885bfa4150 -r 774029a4a718 server/test/unittest_multisources.py --- a/server/test/unittest_multisources.py Thu Jun 27 18:21:04 2013 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,384 +0,0 @@ -# copyright 2003-2012 LOGILAB S.A. (Paris, FRANCE), all rights reserved. -# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr -# -# This file is part of CubicWeb. -# -# CubicWeb is free software: you can redistribute it and/or modify it under the -# terms of the GNU Lesser General Public License as published by the Free -# Software Foundation, either version 2.1 of the License, or (at your option) -# any later version. -# -# CubicWeb is distributed in the hope that it will be useful, but WITHOUT -# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS -# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more -# details. -# -# You should have received a copy of the GNU Lesser General Public License along -# with CubicWeb. If not, see . - -from datetime import datetime, timedelta -from itertools import repeat - -from cubicweb.devtools import TestServerConfiguration, init_test_database -from cubicweb.devtools.testlib import CubicWebTC, Tags -from cubicweb.devtools.repotest import do_monkey_patch, undo_monkey_patch -from cubicweb.devtools import get_test_db_handler - -class ExternalSource1Configuration(TestServerConfiguration): - sourcefile = 'sources_extern' - -class ExternalSource2Configuration(TestServerConfiguration): - sourcefile = 'sources_multi' - -MTIME = datetime.utcnow() - timedelta(0, 10) - -EXTERN_SOURCE_CFG = u''' -cubicweb-user = admin -cubicweb-password = gingkow -base-url=http://extern.org/ -''' - -# hi-jacking -from cubicweb.server.sources.pyrorql import PyroRQLSource -from cubicweb.dbapi import Connection - -PyroRQLSource_get_connection = PyroRQLSource.get_connection -Connection_close = Connection.close - -def add_extern_mapping(source): - source.init_mapping(zip(('Card', 'Affaire', 'State', - 'in_state', 'documented_by', 'multisource_inlined_rel'), - repeat(u'write'))) - - -def pre_setup_database_extern(session, config): - session.execute('INSERT Card X: X title "C3: An external card", X wikiid "aaa"') - session.execute('INSERT Card X: X title "C4: Ze external card", X wikiid "zzz"') - session.execute('INSERT Affaire X: X ref "AFFREF"') - session.commit() - -def pre_setup_database_multi(session, config): - session.create_entity('CWSource', name=u'extern', type=u'pyrorql', - url=u'pyro:///extern', config=EXTERN_SOURCE_CFG) - session.commit() - - -class TwoSourcesTC(CubicWebTC): - """Main repo -> extern-multi -> extern - \-------------/ - """ - test_db_id= 'cw-server-multisources' - tags = CubicWebTC.tags | Tags(('multisources')) - - - def _init_repo(self): - repo2_handler = get_test_db_handler(self._cfg2) - repo2_handler.build_db_cache('4cards-1affaire',pre_setup_func=pre_setup_database_extern) - self.repo2, self.cnx2 = repo2_handler.get_repo_and_cnx('4cards-1affaire') - - repo3_handler = get_test_db_handler(self._cfg3) - repo3_handler.build_db_cache('multisource',pre_setup_func=pre_setup_database_multi) - self.repo3, self.cnx3 = repo3_handler.get_repo_and_cnx('multisource') - - - super(TwoSourcesTC, self)._init_repo() - - def setUp(self): - self._cfg2 = ExternalSource1Configuration('data', apphome=TwoSourcesTC.datadir) - self._cfg3 = ExternalSource2Configuration('data', apphome=TwoSourcesTC.datadir) - TestServerConfiguration.no_sqlite_wrap = True - # hi-jack PyroRQLSource.get_connection to access existing connection (no - # pyro connection) - PyroRQLSource.get_connection = lambda x: x.uri == 'extern-multi' and self.cnx3 or self.cnx2 - # also necessary since the repository is closing its initial connections - # pool though we want to keep cnx2 valid - Connection.close = lambda x: None - CubicWebTC.setUp(self) - self.addCleanup(self.cnx2.close) - self.addCleanup(self.cnx3.close) - do_monkey_patch() - - def tearDown(self): - for source in self.repo.sources[1:]: - self.repo.remove_source(source.uri) - super(TwoSourcesTC, self).tearDown() - PyroRQLSource.get_connection = PyroRQLSource_get_connection - Connection.close = Connection_close - TestServerConfiguration.no_sqlite_wrap = False - undo_monkey_patch() - - @staticmethod - def pre_setup_database(session, config): - for uri, src_config in [('extern', EXTERN_SOURCE_CFG), - ('extern-multi', ''' -cubicweb-user = admin -cubicweb-password = gingkow -''')]: - source = session.create_entity('CWSource', name=unicode(uri), - type=u'pyrorql', url=u'pyro:///extern-multi', - config=unicode(src_config)) - session.commit() - add_extern_mapping(source) - - session.commit() - # trigger discovery - session.execute('Card X') - session.execute('Affaire X') - session.execute('State X') - - def setup_database(self): - cu2 = self.cnx2.cursor() - self.ec1 = cu2.execute('Any X WHERE X is Card, X title "C3: An external card", X wikiid "aaa"')[0][0] - self.aff1 = cu2.execute('Any X WHERE X is Affaire, X ref "AFFREF"')[0][0] - cu2.close() - # add some entities - self.ic1 = self.sexecute('INSERT Card X: X title "C1: An internal card", X wikiid "aaai"')[0][0] - self.ic2 = self.sexecute('INSERT Card X: X title "C2: Ze internal card", X wikiid "zzzi"')[0][0] - - def test_eid_comp(self): - rset = self.sexecute('Card X WHERE X eid > 1') - self.assertEqual(len(rset), 4) - rset = self.sexecute('Any X,T WHERE X title T, X eid > 1') - self.assertEqual(len(rset), 4) - - def test_metainformation(self): - rset = self.sexecute('Card X ORDERBY T WHERE X title T') - # 2 added to the system source, 2 added to the external source - self.assertEqual(len(rset), 4) - # since they are orderd by eid, we know the 3 first one is coming from the system source - # and the others from external source - self.assertEqual(rset.get_entity(0, 0).cw_metainformation(), - {'source': {'type': 'native', 'uri': 'system', 'use-cwuri-as-url': False}, - 'type': u'Card', 'extid': None}) - externent = rset.get_entity(3, 0) - metainf = externent.cw_metainformation() - self.assertEqual(metainf['source'], {'type': 'pyrorql', 'base-url': 'http://extern.org/', 'uri': 'extern', 'use-cwuri-as-url': False}) - self.assertEqual(metainf['type'], 'Card') - self.assert_(metainf['extid']) - etype = self.sexecute('Any ETN WHERE X is ET, ET name ETN, X eid %(x)s', - {'x': externent.eid})[0][0] - self.assertEqual(etype, 'Card') - - def test_order_limit_offset(self): - rsetbase = self.sexecute('Any W,X ORDERBY W,X WHERE X wikiid W') - self.assertEqual(len(rsetbase), 4) - self.assertEqual(sorted(rsetbase.rows), rsetbase.rows) - rset = self.sexecute('Any W,X ORDERBY W,X LIMIT 2 OFFSET 2 WHERE X wikiid W') - self.assertEqual(rset.rows, rsetbase.rows[2:4]) - - def test_has_text(self): - self.repo.sources_by_uri['extern'].synchronize(MTIME) # in case fti_update has been run before - self.assertTrue(self.sexecute('Any X WHERE X has_text "affref"')) - self.assertTrue(self.sexecute('Affaire X WHERE X has_text "affref"')) - self.assertTrue(self.sexecute('Any X ORDERBY FTIRANK(X) WHERE X has_text "affref"')) - self.assertTrue(self.sexecute('Affaire X ORDERBY FTIRANK(X) WHERE X has_text "affref"')) - - def test_anon_has_text(self): - self.repo.sources_by_uri['extern'].synchronize(MTIME) # in case fti_update has been run before - self.sexecute('INSERT Affaire X: X ref "no readable card"')[0][0] - aff1 = self.sexecute('INSERT Affaire X: X ref "card"')[0][0] - # grant read access - self.sexecute('SET X owned_by U WHERE X eid %(x)s, U login "anon"', {'x': aff1}) - self.commit() - cnx = self.login('anon') - cu = cnx.cursor() - rset = cu.execute('Any X WHERE X has_text "card"') - # 5: 4 card + 1 readable affaire - self.assertEqual(len(rset), 5, zip(rset.rows, rset.description)) - rset = cu.execute('Any X ORDERBY FTIRANK(X) WHERE X has_text "card"') - self.assertEqual(len(rset), 5, zip(rset.rows, rset.description)) - Connection_close(cnx.cnx) # cnx is a TestCaseConnectionProxy - - def test_synchronization(self): - cu = self.cnx2.cursor() - assert cu.execute('Any X WHERE X eid %(x)s', {'x': self.aff1}) - cu.execute('SET X ref "BLAH" WHERE X eid %(x)s', {'x': self.aff1}) - aff2 = cu.execute('INSERT Affaire X: X ref "AFFREUX"')[0][0] - self.cnx2.commit() - try: - # force sync - self.repo.sources_by_uri['extern'].synchronize(MTIME) - self.assertTrue(self.sexecute('Any X WHERE X has_text "blah"')) - self.assertTrue(self.sexecute('Any X WHERE X has_text "affreux"')) - cu.execute('DELETE Affaire X WHERE X eid %(x)s', {'x': aff2}) - self.cnx2.commit() - self.repo.sources_by_uri['extern'].synchronize(MTIME) - rset = self.sexecute('Any X WHERE X has_text "affreux"') - self.assertFalse(rset) - finally: - # restore state - cu.execute('SET X ref "AFFREF" WHERE X eid %(x)s', {'x': self.aff1}) - self.cnx2.commit() - - def test_simplifiable_var(self): - affeid = self.sexecute('Affaire X WHERE X ref "AFFREF"')[0][0] - rset = self.sexecute('Any X,AA,AB WHERE E eid %(x)s, E in_state X, X name AA, X modification_date AB', - {'x': affeid}) - self.assertEqual(len(rset), 1) - self.assertEqual(rset[0][1], "pitetre") - - def test_simplifiable_var_2(self): - affeid = self.sexecute('Affaire X WHERE X ref "AFFREF"')[0][0] - rset = self.sexecute('Any E WHERE E eid %(x)s, E in_state S, NOT S name "moved"', - {'x': affeid, 'u': self.session.user.eid}) - self.assertEqual(len(rset), 1) - - def test_sort_func(self): - self.sexecute('Affaire X ORDERBY DUMB_SORT(RF) WHERE X ref RF') - - def test_sort_func_ambigous(self): - self.sexecute('Any X ORDERBY DUMB_SORT(RF) WHERE X title RF') - - def test_in_eid(self): - iec1 = self.repo.extid2eid(self.repo.sources_by_uri['extern'], str(self.ec1), - 'Card', self.session) - rset = self.sexecute('Any X WHERE X eid IN (%s, %s)' % (iec1, self.ic1)) - self.assertEqual(sorted(r[0] for r in rset.rows), sorted([iec1, self.ic1])) - - def test_greater_eid(self): - rset = self.sexecute('Any X WHERE X eid > %s' % (self.ic1 - 1)) - self.assertEqual(len(rset.rows), 2) # self.ic1 and self.ic2 - cu = self.cnx2.cursor() - ec2 = cu.execute('INSERT Card X: X title "glup"')[0][0] - self.cnx2.commit() - # 'X eid > something' should not trigger discovery - rset = self.sexecute('Any X WHERE X eid > %s' % (self.ic1 - 1)) - self.assertEqual(len(rset.rows), 2) - # trigger discovery using another query - crset = self.sexecute('Card X WHERE X title "glup"') - self.assertEqual(len(crset.rows), 1) - rset = self.sexecute('Any X WHERE X eid > %s' % (self.ic1 - 1)) - self.assertEqual(len(rset.rows), 3) - rset = self.sexecute('Any MAX(X)') - self.assertEqual(len(rset.rows), 1) - self.assertEqual(rset.rows[0][0], crset[0][0]) - - def test_attr_unification_1(self): - n1 = self.sexecute('INSERT Note X: X type "AFFREF"')[0][0] - n2 = self.sexecute('INSERT Note X: X type "AFFREU"')[0][0] - rset = self.sexecute('Any X,Y WHERE X is Note, Y is Affaire, X type T, Y ref T') - self.assertEqual(len(rset), 1, rset.rows) - - def test_attr_unification_2(self): - cu = self.cnx2.cursor() - ec2 = cu.execute('INSERT Card X: X title "AFFREF"')[0][0] - self.cnx2.commit() - try: - c1 = self.sexecute('INSERT Card C: C title "AFFREF"')[0][0] - rset = self.sexecute('Any X,Y WHERE X is Card, Y is Affaire, X title T, Y ref T') - self.assertEqual(len(rset), 2, rset.rows) - finally: - cu.execute('DELETE Card X WHERE X eid %(x)s', {'x': ec2}) - self.cnx2.commit() - - def test_attr_unification_neq_1(self): - # XXX complete - self.sexecute('Any X,Y WHERE X is Note, Y is Affaire, X creation_date D, Y creation_date > D') - - def test_attr_unification_neq_2(self): - # XXX complete - self.sexecute('Any X,Y WHERE X is Card, Y is Affaire, X creation_date D, Y creation_date > D') - - def test_union(self): - afeids = self.sexecute('Affaire X') - ueids = self.sexecute('CWUser X') - rset = self.sexecute('(Any X WHERE X is Affaire) UNION (Any X WHERE X is CWUser)') - self.assertEqual(sorted(r[0] for r in rset.rows), - sorted(r[0] for r in afeids + ueids)) - - def test_subquery1(self): - rsetbase = self.sexecute('Any W,X WITH W,X BEING (Any W,X ORDERBY W,X WHERE X wikiid W)') - self.assertEqual(len(rsetbase), 4) - self.assertEqual(sorted(rsetbase.rows), rsetbase.rows) - rset = self.sexecute('Any W,X LIMIT 2 OFFSET 2 WITH W,X BEING (Any W,X ORDERBY W,X WHERE X wikiid W)') - self.assertEqual(rset.rows, rsetbase.rows[2:4]) - rset = self.sexecute('Any W,X ORDERBY W,X LIMIT 2 OFFSET 2 WITH W,X BEING (Any W,X WHERE X wikiid W)') - self.assertEqual(rset.rows, rsetbase.rows[2:4]) - rset = self.sexecute('Any W,X WITH W,X BEING (Any W,X ORDERBY W,X LIMIT 2 OFFSET 2 WHERE X wikiid W)') - self.assertEqual(rset.rows, rsetbase.rows[2:4]) - - def test_subquery2(self): - affeid = self.sexecute('Affaire X WHERE X ref "AFFREF"')[0][0] - rset = self.sexecute('Any X,AA,AB WITH X,AA,AB BEING (Any X,AA,AB WHERE E eid %(x)s, E in_state X, X name AA, X modification_date AB)', - {'x': affeid}) - self.assertEqual(len(rset), 1) - self.assertEqual(rset[0][1], "pitetre") - - def test_not_relation(self): - states = set(tuple(x) for x in self.sexecute('Any S,SN WHERE S is State, S name SN')) - userstate = self.session.user.in_state[0] - states.remove((userstate.eid, userstate.name)) - notstates = set(tuple(x) for x in self.sexecute('Any S,SN WHERE S is State, S name SN, NOT X in_state S, X eid %(x)s', - {'x': self.session.user.eid})) - self.assertSetEqual(notstates, states) - aff1 = self.sexecute('Any X WHERE X is Affaire, X ref "AFFREF"')[0][0] - aff1stateeid, aff1statename = self.sexecute('Any S,SN WHERE X eid %(x)s, X in_state S, S name SN', {'x': aff1})[0] - self.assertEqual(aff1statename, 'pitetre') - states.add((userstate.eid, userstate.name)) - states.remove((aff1stateeid, aff1statename)) - notstates = set(tuple(x) for x in self.sexecute('Any S,SN WHERE S is State, S name SN, NOT X in_state S, X eid %(x)s', - {'x': aff1})) - self.assertSetEqual(notstates, states) - - def test_absolute_url_base_url(self): - cu = self.cnx2.cursor() - ceid = cu.execute('INSERT Card X: X title "without wikiid to get eid based url"')[0][0] - self.cnx2.commit() - lc = self.sexecute('Card X WHERE X title "without wikiid to get eid based url"').get_entity(0, 0) - self.assertEqual(lc.absolute_url(), 'http://extern.org/%s' % ceid) - cu.execute('DELETE Card X WHERE X eid %(x)s', {'x':ceid}) - self.cnx2.commit() - - def test_absolute_url_no_base_url(self): - cu = self.cnx3.cursor() - ceid = cu.execute('INSERT Card X: X title "without wikiid to get eid based url"')[0][0] - self.cnx3.commit() - lc = self.sexecute('Card X WHERE X title "without wikiid to get eid based url"').get_entity(0, 0) - self.assertEqual(lc.absolute_url(), 'http://testing.fr/cubicweb/%s' % lc.eid) - cu.execute('DELETE Card X WHERE X eid %(x)s', {'x':ceid}) - self.cnx3.commit() - - def test_crossed_relation_noeid_needattr(self): - """http://www.cubicweb.org/ticket/1382452""" - aff1 = self.sexecute('INSERT Affaire X: X ref "AFFREF"')[0][0] - # link within extern source - ec1 = self.sexecute('Card X WHERE X wikiid "zzz"')[0][0] - self.sexecute('SET A documented_by C WHERE E eid %(a)s, C eid %(c)s', - {'a': aff1, 'c': ec1}) - # link from system to extern source - self.sexecute('SET A documented_by C WHERE E eid %(a)s, C eid %(c)s', - {'a': aff1, 'c': self.ic2}) - rset = self.sexecute('DISTINCT Any DEP WHERE P ref "AFFREF", P documented_by DEP, DEP wikiid LIKE "z%"') - self.assertEqual(sorted(rset.rows), [[ec1], [self.ic2]]) - - def test_nonregr1(self): - ueid = self.session.user.eid - affaire = self.sexecute('Affaire X WHERE X ref "AFFREF"').get_entity(0, 0) - self.sexecute('Any U WHERE U in_group G, (G name IN ("managers", "logilab") OR (X require_permission P?, P name "bla", P require_group G)), X eid %(x)s, U eid %(u)s', - {'x': affaire.eid, 'u': ueid}) - - def test_nonregr2(self): - iworkflowable = self.session.user.cw_adapt_to('IWorkflowable') - iworkflowable.fire_transition('deactivate') - treid = iworkflowable.latest_trinfo().eid - rset = self.sexecute('Any X ORDERBY D DESC WHERE E eid %(x)s, E wf_info_for X, X modification_date D', - {'x': treid}) - self.assertEqual(len(rset), 1) - self.assertEqual(rset.rows[0], [self.session.user.eid]) - - def test_nonregr3(self): - self.sexecute('DELETE Card X WHERE X eid %(x)s, NOT X multisource_inlined_rel Y', {'x': self.ic1}) - - def test_nonregr4(self): - self.sexecute('Any X,S,U WHERE X in_state S, X todo_by U') - - def test_delete_source(self): - req = self.request() - req.execute('DELETE CWSource S WHERE S name "extern"') - self.commit() - cu = self.session.system_sql("SELECT * FROM entities WHERE source='extern'") - self.assertFalse(cu.fetchall()) - -if __name__ == '__main__': - from logilab.common.testlib import unittest_main - unittest_main()