server/test/unittest_msplanner.py
changeset 9072 774029a4a718
parent 9071 46885bfa4150
child 9073 9574df1cd054
equal deleted inserted replaced
9071:46885bfa4150 9072:774029a4a718
     1 # copyright 2003-2012 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
       
     2 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
       
     3 #
       
     4 # This file is part of CubicWeb.
       
     5 #
       
     6 # CubicWeb is free software: you can redistribute it and/or modify it under the
       
     7 # terms of the GNU Lesser General Public License as published by the Free
       
     8 # Software Foundation, either version 2.1 of the License, or (at your option)
       
     9 # any later version.
       
    10 #
       
    11 # CubicWeb is distributed in the hope that it will be useful, but WITHOUT
       
    12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
       
    13 # FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
       
    14 # details.
       
    15 #
       
    16 # You should have received a copy of the GNU Lesser General Public License along
       
    17 # with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
       
    18 """unit tests for module cubicweb.server.msplanner"""
       
    19 
       
    20 from logilab.common.decorators import clear_cache
       
    21 from yams.buildobjs import RelationDefinition
       
    22 from rql import BadRQLQuery
       
    23 
       
    24 from cubicweb.devtools import get_test_db_handler, TestServerConfiguration
       
    25 from cubicweb.devtools.repotest import BasePlannerTC, test_plan
       
    26 
       
    27 class _SetGenerator(object):
       
    28     """singleton to easily create set using "s[0]" or "s[0,1,2]" for instance
       
    29     """
       
    30     def __getitem__(self, key):
       
    31         try:
       
    32             it = iter(key)
       
    33         except TypeError:
       
    34             it = (key,)
       
    35         return set(it)
       
    36 s = _SetGenerator()
       
    37 
       
    38 from cubicweb.schema import ERQLExpression
       
    39 from cubicweb.server.sources import AbstractSource
       
    40 from cubicweb.server.msplanner import MSPlanner, PartPlanInformation
       
    41 
       
    42 class FakeUserROSource(AbstractSource):
       
    43     support_entities = {'CWUser': False}
       
    44     support_relations = {}
       
    45     def syntax_tree_search(self, *args, **kwargs):
       
    46         return []
       
    47 
       
    48 
       
    49 class FakeCardSource(AbstractSource):
       
    50     support_entities = {'Card': True, 'Note': True, 'State': True}
       
    51     support_relations = {'in_state': True, 'multisource_rel': True, 'multisource_inlined_rel': True,
       
    52                          'multisource_crossed_rel': True,}
       
    53     dont_cross_relations = set(('fiche', 'state_of'))
       
    54     cross_relations = set(('multisource_crossed_rel',))
       
    55 
       
    56     def syntax_tree_search(self, *args, **kwargs):
       
    57         return []
       
    58 
       
    59 
       
class FakeDataFeedSource(FakeCardSource):
    # same declarations as FakeCardSource, but flagged as a copy-based source
    copy_based_source = True
       
    62 
       
    63 X_ALL_SOLS = sorted([{'X': 'Affaire'}, {'X': 'BaseTransition'}, {'X': 'Basket'},
       
    64                      {'X': 'Bookmark'}, {'X': 'CWAttribute'}, {'X': 'CWCache'},
       
    65                      {'X': 'CWConstraint'}, {'X': 'CWConstraintType'}, {'X': 'CWDataImport'}, {'X': 'CWEType'},
       
    66                      {'X': 'CWGroup'}, {'X': 'CWPermission'}, {'X': 'CWProperty'},
       
    67                      {'X': 'CWRType'}, {'X': 'CWRelation'},
       
    68                      {'X': 'CWSource'}, {'X': 'CWSourceHostConfig'}, {'X': 'CWSourceSchemaConfig'},
       
    69                      {'X': 'CWUser'}, {'X': 'CWUniqueTogetherConstraint'},
       
    70                      {'X': 'Card'}, {'X': 'Comment'}, {'X': 'Division'},
       
    71                      {'X': 'Email'}, {'X': 'EmailAddress'}, {'X': 'EmailPart'},
       
    72                      {'X': 'EmailThread'}, {'X': 'ExternalUri'}, {'X': 'File'},
       
    73                      {'X': 'Folder'}, {'X': 'Note'}, {'X': 'Old'},
       
    74                      {'X': 'Personne'}, {'X': 'RQLExpression'}, {'X': 'Societe'},
       
    75                      {'X': 'State'}, {'X': 'SubDivision'}, {'X': 'SubWorkflowExitPoint'},
       
    76                      {'X': 'Tag'}, {'X': 'TrInfo'}, {'X': 'Transition'},
       
    77                      {'X': 'Workflow'}, {'X': 'WorkflowTransition'}])
       
    78 
       
    79 
       
# keep a module-level reference to cnx: if it were garbage collected, the
# associated session would be closed
       
    81 def setUpModule(*args):
       
    82     global repo, cnx
       
    83     handler = get_test_db_handler(TestServerConfiguration(apphome=BaseMSPlannerTC.datadir))
       
    84     handler.build_db_cache()
       
    85     repo, cnx = handler.get_repo_and_cnx()
       
    86 
       
    87 def tearDownModule(*args):
       
    88     global repo, cnx
       
    89     del repo, cnx
       
    90 
       
    91 
       
    92 class BaseMSPlannerTC(BasePlannerTC):
       
    93     """test planner related feature on a 3-sources repository:
       
    94 
       
    95     * system source supporting everything
       
    96     * ldap source supporting CWUser
       
    97     * rql source supporting Card
       
    98     """
       
    99 
       
   100     def setUp(self):
       
   101         self.__class__.repo = repo
       
   102         #_QuerierTC.setUp(self)
       
   103         self.setup()
       
   104         # hijack Affaire security
       
   105         affreadperms = list(self.schema['Affaire'].permissions['read'])
       
   106         self.prevrqlexpr_affaire = affreadperms[-1]
       
   107         # add access to type attribute so S can't be invariant
       
   108         affreadperms[-1] = ERQLExpression('X concerne S?, S owned_by U, S type "X"')
       
   109         self.schema['Affaire'].set_action_permissions('read', affreadperms)
       
   110         # hijack CWUser security
       
   111         userreadperms = list(self.schema['CWUser'].permissions['read'])
       
   112         self.prevrqlexpr_user = userreadperms[-1]
       
   113         userreadperms[-1] = ERQLExpression('X owned_by U')
       
   114         self.schema['CWUser'].set_action_permissions('read', userreadperms)
       
   115         self.add_source(FakeUserROSource, 'ldap')
       
   116         self.add_source(FakeCardSource, 'cards')
       
   117         self.add_source(FakeDataFeedSource, 'datafeed')
       
   118 
       
   119     def tearDown(self):
       
   120         # restore hijacked security
       
   121         self.restore_orig_affaire_security()
       
   122         self.restore_orig_cwuser_security()
       
   123         super(BaseMSPlannerTC, self).tearDown()
       
   124 
       
   125     def restore_orig_affaire_security(self):
       
   126         affreadperms = list(self.schema['Affaire'].permissions['read'])
       
   127         affreadperms[-1] = self.prevrqlexpr_affaire
       
   128         self.schema['Affaire'].set_action_permissions('read', affreadperms)
       
   129 
       
   130     def restore_orig_cwuser_security(self):
       
   131         if hasattr(self, '_orig_cwuser_security_restored'):
       
   132             return
       
   133         self._orig_cwuser_security_restored = True
       
   134         userreadperms = list(self.schema['CWUser'].permissions['read'])
       
   135         userreadperms[-1] = self.prevrqlexpr_user
       
   136         self.schema['CWUser'].set_action_permissions('read', userreadperms)
       
   137 
       
   138 
       
   139 class PartPlanInformationTC(BaseMSPlannerTC):
       
   140 
       
   141     def _test(self, rql, *args):
       
   142         if len(args) == 3:
       
   143             kwargs, sourcesterms, needsplit = args
       
   144         else:
       
   145             sourcesterms, needsplit = args
       
   146             kwargs = None
       
   147         plan = self._prepare_plan(rql, kwargs)
       
   148         union = plan.rqlst
       
   149         plan.preprocess(union)
       
   150         ppi = PartPlanInformation(plan, union.children[0])
       
   151         for sourcevars in ppi._sourcesterms.itervalues():
       
   152             for var in list(sourcevars):
       
   153                 solindices = sourcevars.pop(var)
       
   154                 sourcevars[var._ms_table_key()] = solindices
       
   155         self.assertEqual(ppi._sourcesterms, sourcesterms)
       
   156         self.assertEqual(ppi.needsplit, needsplit)
       
   157 
       
   158 
       
   159     def test_simple_system_only(self):
       
   160         """retrieve entities only supported by the system source"""
       
   161         self._test('CWGroup X',
       
   162                    {self.system: {'X': s[0]}}, False)
       
   163 
       
   164     def test_simple_system_ldap(self):
       
   165         """retrieve CWUser X from both sources and return concatenation of results
       
   166         """
       
   167         self._test('CWUser X',
       
   168                    {self.system: {'X': s[0]}, self.ldap: {'X': s[0]}}, False)
       
   169 
       
   170     def test_simple_system_rql(self):
       
   171         """retrieve Card X from both sources and return concatenation of results
       
   172         """
       
   173         self._test('Any X, XT WHERE X is Card, X title XT',
       
   174                    {self.system: {'X': s[0]}, self.cards: {'X': s[0]}}, False)
       
   175 
       
   176     def test_simple_eid_specified(self):
       
   177         """retrieve CWUser X from system source (eid is specified, can locate the entity)
       
   178         """
       
   179         ueid = self.session.user.eid
       
   180         self._test('Any X,L WHERE X eid %(x)s, X login L', {'x': ueid},
       
   181                    {self.system: {'X': s[0]}}, False)
       
   182 
       
   183     def test_simple_eid_invariant(self):
       
   184         """retrieve CWUser X from system source (eid is specified, can locate the entity)
       
   185         """
       
   186         ueid = self.session.user.eid
       
   187         self._test('Any X WHERE X eid %(x)s', {'x': ueid},
       
   188                    {self.system: {'x': s[0]}}, False)
       
   189 
       
   190     def test_simple_invariant(self):
       
   191         """retrieve CWUser X from system source only (X is invariant and in_group not supported by ldap source)
       
   192         """
       
   193         self._test('Any X WHERE X is CWUser, X in_group G, G name "users"',
       
   194                    {self.system: {'X': s[0], 'G': s[0], 'in_group': s[0]}}, False)
       
   195 
       
   196     def test_security_has_text(self):
       
   197         """retrieve CWUser X from system source only (has_text not supported by ldap source)
       
   198         """
       
   199         # specify CWUser instead of any since the way this test is written we aren't well dealing
       
   200         # with ambigous query (eg only considering the first solution)
       
   201         self._test('CWUser X WHERE X has_text "bla"',
       
   202                    {self.system: {'X': s[0]}}, False)
       
   203 
       
   204     def test_complex_base(self):
       
   205         """
       
   206         1. retrieve Any X, L WHERE X is CWUser, X login L from system and ldap sources, store
       
   207            concatenation of results into a temporary table
       
   208         2. return the result of Any X, L WHERE X is TMP, X login L, X in_group G,
       
   209            G name 'users' on the system source
       
   210         """
       
   211         self._test('Any X,L WHERE X is CWUser, X in_group G, X login L, G name "users"',
       
   212                    {self.system: {'X': s[0], 'G': s[0], 'in_group': s[0]},
       
   213                     self.ldap : {'X': s[0]}}, True)
       
   214 
       
   215     def test_complex_invariant_ordered(self):
       
   216         """
       
   217         1. retrieve Any X,AA WHERE X modification_date AA from system and ldap sources, store
       
   218            concatenation of results into a temporary table
       
   219         2. return the result of Any X,AA ORDERBY AA WHERE %s owned_by X, X modification_date AA
       
   220            on the system source
       
   221         """
       
   222         ueid = self.session.user.eid
       
   223         self._test('Any X,AA ORDERBY AA WHERE E eid %(x)s, E owned_by X, X modification_date AA', {'x': ueid},
       
   224                    {self.system: {'x': s[0], 'X': s[0], 'owned_by': s[0]},
       
   225                     self.ldap : {'X': s[0]}}, True)
       
   226 
       
   227     def test_complex_invariant(self):
       
   228         """
       
   229         1. retrieve Any X,L,AA WHERE X login L, X modification_date AA from system and ldap sources, store
       
   230            concatenation of results into a temporary table
       
   231         2. return the result of Any X,L,AA WHERE %s owned_by X, X login L, X modification_date AA
       
   232            on the system source
       
   233         """
       
   234         ueid = self.session.user.eid
       
   235         self._test('Any X,L,AA WHERE E eid %(x)s, E owned_by X, X login L, X modification_date AA', {'x': ueid},
       
   236                    {self.system: {'x': s[0], 'X': s[0], 'owned_by': s[0]},
       
   237                     self.ldap : {'X': s[0]}}, True)
       
   238 
       
   239     def test_complex_ambigous(self):
       
   240         """retrieve CWUser X from system and ldap sources, Person X from system source only
       
   241         """
       
   242         self._test('Any X,F WHERE X firstname F',
       
   243                    {self.system: {'X': s[0, 1]},
       
   244                     self.ldap: {'X': s[0]}}, True)
       
   245 
       
   246     def test_complex_multiple(self):
       
   247         """
       
   248         1. retrieve Any X,A,Y,B WHERE X login A, Y login B from system and ldap sources, store
       
   249            cartesian product of results into a temporary table
       
   250         2. return the result of Any X,Y WHERE X login 'syt', Y login 'adim'
       
   251            on the system source
       
   252         """
       
   253         ueid = self.session.user.eid
       
   254         self._test('Any X,Y WHERE X login "syt", Y login "adim"', {'x': ueid},
       
   255                    {self.system: {'Y': s[0], 'X': s[0]},
       
   256                     self.ldap: {'Y': s[0], 'X': s[0]}}, True)
       
   257 
       
   258     def test_complex_aggregat(self):
       
   259         solindexes = set(range(len([e for e in self.schema.entities() if not e.final])))
       
   260         self._test('Any MAX(X)',
       
   261                    {self.system: {'X': solindexes}}, False)
       
   262 
       
   263     def test_complex_optional(self):
       
   264         ueid = self.session.user.eid
       
   265         self._test('Any U WHERE WF wf_info_for X, X eid %(x)s, WF owned_by U?, WF from_state FS', {'x': ueid},
       
   266                    {self.system: {'WF': s[0], 'FS': s[0], 'U': s[0],
       
   267                                   'from_state': s[0], 'owned_by': s[0], 'wf_info_for': s[0],
       
   268                                   'x': s[0]}},
       
   269                    False)
       
   270 
       
   271     def test_exists4(self):
       
   272         """
       
   273         State S could come from both rql source and system source,
       
   274         but since X cannot come from the rql source, the solution
       
   275         {self.cards : 'S'} must be removed
       
   276         """
       
   277         self._test('Any G,L WHERE X in_group G, X login L, G name "managers", '
       
   278                    'EXISTS(X copain T, T login L, T login in ("comme", "cochon")) OR '
       
   279                    'EXISTS(X in_state S, S name "pascontent", NOT X copain T2, T2 login "billy")',
       
   280                    {self.system: {'X': s[0], 'S': s[0], 'T2': s[0], 'T': s[0], 'G': s[0], 'copain': s[0], 'in_group': s[0]},
       
   281                     self.ldap: {'X': s[0], 'T2': s[0], 'T': s[0]}},
       
   282                    True)
       
   283 
       
   284     def test_relation_need_split(self):
       
   285         self._test('Any X, S WHERE X in_state S',
       
   286                    {self.system: {'X': s[0, 1, 2], 'S': s[0, 1, 2]},
       
   287                     self.cards: {'X': s[2], 'S': s[2]}},
       
   288                    True)
       
   289 
       
   290     def test_not_relation_need_split(self):
       
   291         self._test('Any SN WHERE NOT X in_state S, S name SN',
       
   292                    {self.cards: {'X': s[2], 'S': s[0, 1, 2]},
       
   293                     self.system: {'X': s[0, 1, 2], 'S': s[0, 1, 2]}},
       
   294                    True)
       
   295 
       
   296     def test_not_relation_no_split_external(self):
       
   297         repo._type_source_cache[999999] = ('Note', 'cards', 999999, 'cards')
       
   298         # similar to the above test but with an eid coming from the external source.
       
   299         # the same plan may be used, since we won't find any record in the system source
       
   300         # linking 9999999 to a state
       
   301         self._test('Any SN WHERE NOT X in_state S, X eid %(x)s, S name SN',
       
   302                    {'x': 999999},
       
   303                    {self.cards: {'x': s[0], 'S': s[0]},
       
   304                     self.system: {'x': s[0], 'S': s[0]}},
       
   305                    False)
       
   306 
       
   307     def test_relation_restriction_ambigous_need_split(self):
       
   308         self._test('Any X,T WHERE X in_state S, S name "pending", T tags X',
       
   309                    {self.system: {'X': s[0, 1, 2], 'S': s[0, 1, 2], 'T': s[0, 1, 2], 'tags': s[0, 1, 2]},
       
   310                     self.cards: {'X': s[2], 'S': s[2]}},
       
   311                    True)
       
   312 
       
   313     def test_simplified_var(self):
       
   314         repo._type_source_cache[999999] = ('Note', 'cards', 999999, 'cards')
       
   315         # need access to source since X table has to be accessed because of the outer join
       
   316         self._test('Any U WHERE U in_group G, (G name IN ("managers", "logilab") OR (X require_permission P?, P name "bla", P require_group G)), X eid %(x)s, U eid %(u)s',
       
   317                    {'x': 999999, 'u': self.session.user.eid},
       
   318                    {self.system: {'P': s[0], 'G': s[0],
       
   319                                   'require_permission': s[0], 'in_group': s[0], 'P': s[0], 'require_group': s[0],
       
   320                                   'u': s[0]},
       
   321                     self.cards: {'X': s[0]}},
       
   322                    True)
       
   323 
       
   324     def test_delete_relation1(self):
       
   325         ueid = self.session.user.eid
       
   326         self._test('Any X, Y WHERE X created_by Y, X eid %(x)s, NOT Y eid %(y)s',
       
   327                    {'x': ueid, 'y': ueid},
       
   328                    {self.system: {'Y': s[0], 'created_by': s[0], 'x': s[0]}},
       
   329                    False)
       
   330 
       
   331     def test_crossed_relation_eid_1_needattr(self):
       
   332         repo._type_source_cache[999999] = ('Note', 'system', 999999, 'system')
       
   333         ueid = self.session.user.eid
       
   334         self._test('Any Y,T WHERE X eid %(x)s, X multisource_crossed_rel Y, Y type T',
       
   335                    {'x': 999999,},
       
   336                    {self.cards: {'Y': s[0]}, self.system: {'Y': s[0], 'x': s[0]}},
       
   337                    True)
       
   338 
       
   339     def test_crossed_relation_eid_1_invariant(self):
       
   340         repo._type_source_cache[999999] = ('Note', 'system', 999999, 'system')
       
   341         self._test('Any Y WHERE X eid %(x)s, X multisource_crossed_rel Y',
       
   342                    {'x': 999999},
       
   343                    {self.system: {'Y': s[0], 'x': s[0]}},
       
   344                    False)
       
   345 
       
   346     def test_crossed_relation_eid_2_invariant(self):
       
   347         repo._type_source_cache[999999] = ('Note', 'cards', 999999, 'cards')
       
   348         self._test('Any Y WHERE X eid %(x)s, X multisource_crossed_rel Y',
       
   349                    {'x': 999999,},
       
   350                    {self.cards: {'Y': s[0], 'multisource_crossed_rel': s[0], 'x': s[0]},
       
   351                     self.system: {'Y': s[0], 'multisource_crossed_rel': s[0], 'x': s[0]}},
       
   352                    False)
       
   353 
       
   354     def test_version_crossed_depends_on_1(self):
       
   355         repo._type_source_cache[999999] = ('Note', 'cards', 999999, 'cards')
       
   356         self._test('Any X,AD,AE WHERE E eid %(x)s, E multisource_crossed_rel X, X in_state AD, AD name AE',
       
   357                    {'x': 999999},
       
   358                    {self.cards: {'X': s[0], 'AD': s[0], 'multisource_crossed_rel': s[0], 'x': s[0]},
       
   359                     self.system: {'X': s[0], 'AD': s[0], 'multisource_crossed_rel': s[0], 'x': s[0]}},
       
   360                    True)
       
   361 
       
   362     def test_version_crossed_depends_on_2(self):
       
   363         repo._type_source_cache[999999] = ('Note', 'system', 999999, 'system')
       
   364         self._test('Any X,AD,AE WHERE E eid %(x)s, E multisource_crossed_rel X, X in_state AD, AD name AE',
       
   365                    {'x': 999999},
       
   366                    {self.cards: {'X': s[0], 'AD': s[0]},
       
   367                     self.system: {'X': s[0], 'AD': s[0], 'x': s[0]}},
       
   368                     True)
       
   369 
       
   370     def test_simplified_var_3(self):
       
   371         repo._type_source_cache[999999] = ('Note', 'cards', 999999, 'cards')
       
   372         repo._type_source_cache[999998] = ('State', 'cards', 999998, 'cards')
       
   373         self._test('Any S,T WHERE S eid %(s)s, N eid %(n)s, N type T, N is Note, S is State',
       
   374                    {'n': 999999, 's': 999998},
       
   375                    {self.cards: {'s': s[0], 'N': s[0]}}, False)
       
   376 
       
   377 
       
   378 
       
   379 class MSPlannerTC(BaseMSPlannerTC):
       
   380 
       
   381     def setUp(self):
       
   382         BaseMSPlannerTC.setUp(self)
       
   383         self.planner = MSPlanner(self.o.schema, self.repo.vreg.rqlhelper)
       
   384         for cached in ('rel_type_sources', 'can_cross_relation', 'is_multi_sources_relation'):
       
   385             clear_cache(self.repo, cached)
       
   386 
       
   387     _test = test_plan
       
   388 
       
   389     def test_simple_system_only(self):
       
   390         """retrieve entities only supported by the system source
       
   391         """
       
   392         self._test('CWGroup X',
       
   393                    [('OneFetchStep', [('Any X WHERE X is CWGroup', [{'X': 'CWGroup'}])],
       
   394                      None, None, [self.system], {}, [])])
       
   395 
       
   396     def test_simple_system_only_limit(self):
       
   397         """retrieve entities only supported by the system source
       
   398         """
       
   399         self._test('CWGroup X LIMIT 10',
       
   400                    [('OneFetchStep', [('Any X LIMIT 10 WHERE X is CWGroup', [{'X': 'CWGroup'}])],
       
   401                      10, None, [self.system], {}, [])])
       
   402 
       
   403     def test_simple_system_only_limit_offset(self):
       
   404         """retrieve entities only supported by the system source
       
   405         """
       
   406         self._test('CWGroup X LIMIT 10 OFFSET 10',
       
   407                    [('OneFetchStep', [('Any X LIMIT 10 OFFSET 10 WHERE X is CWGroup', [{'X': 'CWGroup'}])],
       
   408                      10, 10, [self.system], {}, [])])
       
   409 
       
   410     def test_simple_system_ldap(self):
       
   411         """retrieve CWUser X from both sources and return concatenation of results
       
   412         """
       
   413         self._test('CWUser X',
       
   414                    [('OneFetchStep', [('Any X WHERE X is CWUser', [{'X': 'CWUser'}])],
       
   415                      None, None, [self.ldap, self.system], {}, [])])
       
   416 
       
   417     def test_simple_system_ldap_limit(self):
       
   418         """retrieve CWUser X from both sources and return concatenation of results
       
   419         """
       
   420         self._test('CWUser X LIMIT 10',
       
   421                    [('OneFetchStep', [('Any X LIMIT 10 WHERE X is CWUser', [{'X': 'CWUser'}])],
       
   422                      10, None, [self.ldap, self.system], {}, [])])
       
   423 
       
   424     def test_simple_system_ldap_limit_offset(self):
       
   425         """retrieve CWUser X from both sources and return concatenation of results
       
   426         """
       
   427         self._test('CWUser X LIMIT 10 OFFSET 10',
       
   428                    [('OneFetchStep', [('Any X LIMIT 10 OFFSET 10 WHERE X is CWUser', [{'X': 'CWUser'}])],
       
   429                      10, 10, [self.ldap, self.system], {}, [])])
       
   430 
       
   431     def test_simple_system_ldap_ordered_limit_offset(self):
       
   432         """retrieve CWUser X from both sources and return concatenation of results
       
   433         """
       
   434         self._test('CWUser X ORDERBY X LIMIT 10 OFFSET 10',
       
   435                    [('AggrStep', 'SELECT table0.C0 FROM table0\nORDER BY table0.C0\nLIMIT 10\nOFFSET 10', None, [
       
   436                        ('FetchStep', [('Any X WHERE X is CWUser', [{'X': 'CWUser'}])],
       
   437                         [self.ldap, self.system], {}, {'X': 'table0.C0'}, []),
       
   438                        ]),
       
   439                    ])
       
   440     def test_simple_system_ldap_aggregat(self):
       
   441         """retrieve CWUser X from both sources and return concatenation of results
       
   442         """
       
   443         # COUNT(X) is kept in sub-step and transformed into SUM(X) in the AggrStep
       
   444         self._test('Any COUNT(X) WHERE X is CWUser',
       
   445                    [('AggrStep', 'SELECT SUM(table0.C0) FROM table0', None, [
       
   446                        ('FetchStep', [('Any COUNT(X) WHERE X is CWUser', [{'X': 'CWUser'}])],
       
   447                         [self.ldap, self.system], {}, {'COUNT(X)': 'table0.C0'}, []),
       
   448                        ]),
       
   449                    ])
       
   450 
       
   451     def test_simple_system_rql(self):
       
   452         """retrieve Card X from both sources and return concatenation of results
       
   453         """
       
   454         self._test('Any X, XT WHERE X is Card, X title XT',
       
   455                    [('OneFetchStep', [('Any X,XT WHERE X is Card, X title XT', [{'X': 'Card', 'XT': 'String'}])],
       
   456                      None, None, [self.cards, self.system], {}, [])])
       
   457 
       
   458     def test_simple_eid_specified(self):
       
   459         """retrieve CWUser X from system source (eid is specified, can locate the entity)
       
   460         """
       
   461         ueid = self.session.user.eid
       
   462         self._test('Any X,L WHERE X eid %(x)s, X login L',
       
   463                    [('OneFetchStep', [('Any X,L WHERE X eid %s, X login L'%ueid, [{'X': 'CWUser', 'L': 'String'}])],
       
   464                      None, None, [self.system], {}, [])],
       
   465                    {'x': ueid})
       
   466 
       
   467     def test_simple_eid_invariant(self):
       
   468         """retrieve CWUser X from system source (eid is specified, can locate the entity)
       
   469         """
       
   470         ueid = self.session.user.eid
       
   471         self._test('Any X WHERE X eid %(x)s',
       
   472                    [('OneFetchStep', [('Any %s'%ueid, [{}])],
       
   473                      None, None, [self.system], {}, [])],
       
   474                    {'x': ueid})
       
   475 
       
   476     def test_simple_invariant(self):
       
   477         """retrieve CWUser X from system source only (X is invariant and in_group not supported by ldap source)
       
   478         """
       
   479         self._test('Any X WHERE X is CWUser, X in_group G, G name "users"',
       
   480                    [('OneFetchStep', [('Any X WHERE X is CWUser, X in_group G, G name "users"',
       
   481                                        [{'X': 'CWUser', 'G': 'CWGroup'}])],
       
   482                      None, None, [self.system], {}, [])])
       
   483 
       
   484     def test_complex_base(self):
       
   485         """
       
   486         1. retrieve Any X, L WHERE X is CWUser, X login L from system and ldap sources, store
       
   487            concatenation of results into a temporary table
       
   488         2. return the result of Any X, L WHERE X is TMP, X login LX in_group G,
       
   489            G name 'users' on the system source
       
   490         """
       
   491         self._test('Any X,L WHERE X is CWUser, X in_group G, X login L, G name "users"',
       
   492                    [('FetchStep', [('Any X,L WHERE X login L, X is CWUser', [{'X': 'CWUser', 'L': 'String'}])],
       
   493                      [self.ldap, self.system], None,
       
   494                      {'X': 'table0.C0', 'X.login': 'table0.C1', 'L': 'table0.C1'}, []),
       
   495                     ('OneFetchStep', [('Any X,L WHERE X in_group G, X login L, G name "users", G is CWGroup, X is CWUser',
       
   496                                        [{'X': 'CWUser', 'L': 'String', 'G': 'CWGroup'}])],
       
   497                      None, None, [self.system],
       
   498                      {'X': 'table0.C0', 'X.login': 'table0.C1', 'L': 'table0.C1'}, [])
       
   499                     ])
       
   500 
       
   501     def test_complex_base_limit_offset(self):
       
   502         """
       
   503         1. retrieve Any X, L WHERE X is CWUser, X login L from system and ldap sources, store
       
   504            concatenation of results into a temporary table
       
   505         2. return the result of Any X, L WHERE X is TMP, X login LX in_group G,
       
   506            G name 'users' on the system source
       
   507         """
       
   508         self._test('Any X,L LIMIT 10 OFFSET 10 WHERE X is CWUser, X in_group G, X login L, G name "users"',
       
   509                    [('FetchStep', [('Any X,L WHERE X login L, X is CWUser', [{'X': 'CWUser', 'L': 'String'}])],
       
   510                      [self.ldap, self.system], None,
       
   511                      {'X': 'table0.C0', 'X.login': 'table0.C1', 'L': 'table0.C1'}, []),
       
   512                     ('OneFetchStep', [('Any X,L LIMIT 10 OFFSET 10 WHERE X in_group G, X login L, G name "users", G is CWGroup, X is CWUser',
       
   513                                        [{'X': 'CWUser', 'L': 'String', 'G': 'CWGroup'}])],
       
   514                      10, 10,
       
   515                      [self.system], {'X': 'table0.C0', 'X.login': 'table0.C1', 'L': 'table0.C1'}, [])
       
   516                     ])
       
   517 
       
   518     def test_complex_ordered(self):
       
   519         self._test('Any L ORDERBY L WHERE X login L',
       
   520                    [('AggrStep', 'SELECT table0.C0 FROM table0\nORDER BY table0.C0', None,
       
   521                      [('FetchStep', [('Any L WHERE X login L, X is CWUser',
       
   522                                       [{'X': 'CWUser', 'L': 'String'}])],
       
   523                        [self.ldap, self.system], {}, {'X.login': 'table0.C0', 'L': 'table0.C0'}, []),
       
   524                       ])
       
   525                     ])
       
   526 
       
   527     def test_complex_ordered_limit_offset(self):
       
   528         self._test('Any L ORDERBY L LIMIT 10 OFFSET 10 WHERE X login L',
       
   529                    [('AggrStep', 'SELECT table0.C0 FROM table0\nORDER BY table0.C0\nLIMIT 10\nOFFSET 10', None,
       
   530                      [('FetchStep', [('Any L WHERE X login L, X is CWUser',
       
   531                                       [{'X': 'CWUser', 'L': 'String'}])],
       
   532                        [self.ldap, self.system], {}, {'X.login': 'table0.C0', 'L': 'table0.C0'}, []),
       
   533                       ])
       
   534                     ])
       
   535 
       
   536     def test_complex_invariant_ordered(self):
       
   537         """
       
   538         1. retrieve Any X,AA WHERE X modification_date AA from system and ldap sources, store
       
   539            concatenation of results into a temporary table
       
   540         2. return the result of Any X,AA ORDERBY AA WHERE %s owned_by X, X modification_date AA
       
   541            on the system source
       
   542 
       
   543         herrr, this is what is expected by the XXX :(, not the actual result (which is correct anyway)
       
   544         """
       
   545         ueid = self.session.user.eid
       
   546         self._test('Any X,AA ORDERBY AA WHERE E eid %(x)s, E owned_by X, X modification_date AA',
       
   547                    [('FetchStep',
       
   548                      [('Any X,AA WHERE X modification_date AA, X is CWUser',
       
   549                        [{'AA': 'Datetime', 'X': 'CWUser'}])],
       
   550                      [self.ldap, self.system], None,
       
   551                      {'AA': 'table0.C1', 'X': 'table0.C0', 'X.modification_date': 'table0.C1'}, []),
       
   552                     ('OneFetchStep',
       
   553                      [('Any X,AA ORDERBY AA WHERE %s owned_by X, X modification_date AA, X is CWUser' % ueid,
       
   554                        [{'AA': 'Datetime', 'X': 'CWUser'}])],
       
   555                      None, None, [self.system],
       
   556                      {'AA': 'table0.C1', 'X': 'table0.C0', 'X.modification_date': 'table0.C1'}, []),
       
   557                     ],
       
   558                    {'x': ueid})
       
   559 
       
   560     def test_complex_invariant(self):
       
   561         """
       
   562         1. retrieve Any X,L,AA WHERE X login L, X modification_date AA from system and ldap sources, store
       
   563            concatenation of results into a temporary table
       
   564         2. return the result of Any X,L,AA WHERE %s owned_by X, X login L, X modification_date AA
       
   565            on the system source
       
   566         """
       
   567         ueid = self.session.user.eid
       
   568         self._test('Any X,L,AA WHERE E eid %(x)s, E owned_by X, X login L, X modification_date AA',
       
   569                    [('FetchStep', [('Any X,L,AA WHERE X login L, X modification_date AA, X is CWUser',
       
   570                                     [{'AA': 'Datetime', 'X': 'CWUser', 'L': 'String'}])],
       
   571                      [self.ldap, self.system], None,
       
   572                      {'AA': 'table0.C2', 'X': 'table0.C0', 'X.login': 'table0.C1', 'X.modification_date': 'table0.C2', 'L': 'table0.C1'}, []),
       
   573                     ('OneFetchStep', [('Any X,L,AA WHERE %s owned_by X, X login L, X modification_date AA, X is CWUser'%ueid,
       
   574                                        [{'AA': 'Datetime', 'X': 'CWUser', 'L': 'String'}])],
       
   575                      None, None, [self.system],
       
   576                      {'AA': 'table0.C2', 'X': 'table0.C0', 'X.login': 'table0.C1', 'X.modification_date': 'table0.C2', 'L': 'table0.C1'}, [])],
       
   577                    {'x': ueid})
       
   578 
       
   579     def test_complex_ambigous(self):
       
   580         """retrieve CWUser X from system and ldap sources, Person X from system source only
       
   581         """
       
   582         self._test('Any X,F WHERE X firstname F',
       
   583                    [('UnionStep', None, None, [
       
   584                        ('OneFetchStep', [('Any X,F WHERE X firstname F, X is CWUser',
       
   585                                           [{'X': 'CWUser', 'F': 'String'}])],
       
   586                         None, None, [self.ldap, self.system], {}, []),
       
   587                        ('OneFetchStep', [('Any X,F WHERE X firstname F, X is Personne',
       
   588                                           [{'X': 'Personne', 'F': 'String'}])],
       
   589                         None, None, [self.system], {}, []),
       
   590                        ]),
       
   591                     ])
       
   592 
       
   593     def test_complex_ambigous_limit_offset(self):
       
   594         """retrieve CWUser X from system and ldap sources, Person X from system source only
       
   595         """
       
   596         self._test('Any X,F LIMIT 10 OFFSET 10 WHERE X firstname F',
       
   597                    [('UnionStep', 10, 10, [
       
   598                        ('OneFetchStep', [('Any X,F WHERE X firstname F, X is CWUser',
       
   599                                           [{'X': 'CWUser', 'F': 'String'}])],
       
   600                         None, None,
       
   601                         [self.ldap, self.system], {}, []),
       
   602                        ('OneFetchStep', [('Any X,F WHERE X firstname F, X is Personne',
       
   603                                           [{'X': 'Personne', 'F': 'String'}])],
       
   604                         None, None, [self.system], {}, []),
       
   605                        ]),
       
   606                     ])
       
   607 
       
   608     def test_complex_ambigous_ordered(self):
       
   609         """
       
   610         1. retrieve CWUser X from system and ldap sources, Person X from system source only, store
       
   611            each result in the same temp table
       
   612         2. return content of the table sorted
       
   613         """
       
   614         self._test('Any X,F ORDERBY F WHERE X firstname F',
       
   615                    [('AggrStep', 'SELECT table0.C0, table0.C1 FROM table0\nORDER BY table0.C1', None,
       
   616                      [('FetchStep', [('Any X,F WHERE X firstname F, X is CWUser',
       
   617                                       [{'X': 'CWUser', 'F': 'String'}])],
       
   618                        [self.ldap, self.system], {},
       
   619                        {'X': 'table0.C0', 'X.firstname': 'table0.C1', 'F': 'table0.C1'}, []),
       
   620                       ('FetchStep', [('Any X,F WHERE X firstname F, X is Personne',
       
   621                                       [{'X': 'Personne', 'F': 'String'}])],
       
   622                        [self.system], {},
       
   623                        {'X': 'table0.C0', 'X.firstname': 'table0.C1', 'F': 'table0.C1'}, []),
       
   624                       ]),
       
   625                     ])
       
   626 
       
   627     def test_complex_multiple(self):
       
   628         """
       
   629         1. retrieve Any X,A,Y,B WHERE X login A, Y login B from system and ldap sources, store
       
   630            cartesian product of results into a temporary table
       
   631         2. return the result of Any X,Y WHERE X login 'syt', Y login 'adim'
       
   632            on the system source
       
   633         """
       
   634         ueid = self.session.user.eid
       
   635         self._test('Any X,Y WHERE X login "syt", Y login "adim"',
       
   636                    [('FetchStep',
       
   637                      [('Any X WHERE X login "syt", X is CWUser', [{'X': 'CWUser'}])],
       
   638                      [self.ldap, self.system], None,
       
   639                      {'X': 'table0.C0'}, []),
       
   640                     ('FetchStep',
       
   641                      [('Any Y WHERE Y login "adim", Y is CWUser', [{'Y': 'CWUser'}])],
       
   642                      [self.ldap, self.system], None,
       
   643                      {'Y': 'table1.C0'}, []),
       
   644                     ('OneFetchStep',
       
   645                      [('Any X,Y WHERE X is CWUser, Y is CWUser', [{'X': 'CWUser', 'Y': 'CWUser'}])],
       
   646                      None, None, [self.system],
       
   647                      {'X': 'table0.C0', 'Y': 'table1.C0'}, [])
       
   648                     ], {'x': ueid})
       
   649 
       
   650     def test_complex_multiple_limit_offset(self):
       
   651         """
       
   652         1. retrieve Any X,A,Y,B WHERE X login A, Y login B from system and ldap sources, store
       
   653            cartesian product of results into a temporary table
       
   654         2. return the result of Any X,Y WHERE X login 'syt', Y login 'adim'
       
   655            on the system source
       
   656         """
       
   657         self._test('Any X,Y LIMIT 10 OFFSET 10 WHERE X login "syt", Y login "adim"',
       
   658                    [('FetchStep',
       
   659                      [('Any X WHERE X login "syt", X is CWUser', [{'X': 'CWUser'}])],
       
   660                      [self.ldap, self.system], None, {'X': 'table0.C0'}, []),
       
   661                     ('FetchStep',
       
   662                      [('Any Y WHERE Y login "adim", Y is CWUser', [{'Y': 'CWUser'}])],
       
   663                      [self.ldap, self.system], None, {'Y': 'table1.C0'}, []),
       
   664                     ('OneFetchStep',
       
   665                      [('Any X,Y LIMIT 10 OFFSET 10 WHERE X is CWUser, Y is CWUser', [{'X': 'CWUser', 'Y': 'CWUser'}])],
       
   666                      10, 10, [self.system],
       
   667                      {'X': 'table0.C0', 'Y': 'table1.C0'}, [])
       
   668                     ])
       
   669 
       
   670     def test_complex_aggregat(self):
       
   671         self._test('Any MAX(X)',
       
   672                    [('OneFetchStep',
       
   673                      [('Any MAX(X)', X_ALL_SOLS)],
       
   674                      None, None, [self.system], {}, [])
       
   675                     ])
       
   676 
       
   677     def test_complex_typed_aggregat(self):
       
   678         self._test('Any MAX(X) WHERE X is Card',
       
   679                    [('AggrStep', 'SELECT MAX(table0.C0) FROM table0',  None,
       
   680                      [('FetchStep',
       
   681                        [('Any MAX(X) WHERE X is Card', [{'X': 'Card'}])],
       
   682                        [self.cards, self.system], {}, {'MAX(X)': 'table0.C0'}, [])
       
   683                       ])
       
   684                     ])
       
   685 
       
   686     def test_complex_greater_eid(self):
       
   687         self._test('Any X WHERE X eid > 12',
       
   688                    [('OneFetchStep',
       
   689                      [('Any X WHERE X eid > 12', X_ALL_SOLS)],
       
   690                      None, None, [self.system], {}, [])
       
   691                     ])
       
   692 
       
   693     def test_complex_greater_typed_eid(self):
       
   694         self._test('Any X WHERE X eid > 12, X is Card',
       
   695                    [('OneFetchStep',
       
   696                      [('Any X WHERE X eid > 12, X is Card', [{'X': 'Card'}])],
       
   697                      None, None, [self.system], {}, [])
       
   698                     ])
       
   699 
       
   700     def test_complex_optional(self):
       
   701         ueid = self.session.user.eid
       
   702         self._test('Any U WHERE WF wf_info_for X, X eid %(x)s, WF owned_by U?, WF from_state FS',
       
   703                    [('OneFetchStep', [('Any U WHERE WF wf_info_for %s, WF owned_by U?, WF from_state FS' % ueid,
       
   704                                        [{'WF': 'TrInfo', 'FS': 'State', 'U': 'CWUser'}])],
       
   705                      None, None, [self.system], {}, [])],
       
   706                    {'x': ueid})
       
   707 
       
   708     def test_complex_optional(self):
       
   709         ueid = self.session.user.eid
       
   710         self._test('Any U WHERE WF wf_info_for X, X eid %(x)s, WF owned_by U?, WF from_state FS',
       
   711                    [('OneFetchStep', [('Any U WHERE WF wf_info_for %s, WF owned_by U?, WF from_state FS' % ueid,
       
   712                                        [{'WF': 'TrInfo', 'FS': 'State', 'U': 'CWUser'}])],
       
   713                      None, None, [self.system], {}, [])],
       
   714                    {'x': ueid})
       
   715 
       
   716 
       
   717     def test_3sources_ambigous(self):
       
   718         self._test('Any X,T WHERE X owned_by U, U login "syt", X title T, X is IN(Bookmark, Card, EmailThread)',
       
   719                    [('FetchStep', [('Any X,T WHERE X title T, X is Card', [{'X': 'Card', 'T': 'String'}])],
       
   720                      [self.cards, self.system], None,
       
   721                      {'T': 'table0.C1', 'X': 'table0.C0', 'X.title': 'table0.C1'}, []),
       
   722                     ('FetchStep', [('Any U WHERE U login "syt", U is CWUser', [{'U': 'CWUser'}])],
       
   723                      [self.ldap, self.system], None,
       
   724                      {'U': 'table1.C0'}, []),
       
   725                     ('UnionStep', None, None, [
       
   726                         ('OneFetchStep', [('Any X,T WHERE X owned_by U, X title T, U is CWUser, X is IN(Bookmark, EmailThread)',
       
   727                                            [{'T': 'String', 'U': 'CWUser', 'X': 'Bookmark'},
       
   728                                             {'T': 'String', 'U': 'CWUser', 'X': 'EmailThread'}])],
       
   729                          None, None, [self.system], {'U': 'table1.C0'}, []),
       
   730                         ('OneFetchStep', [('Any X,T WHERE X owned_by U, X title T, U is CWUser, X is Card',
       
   731                                            [{'X': 'Card', 'U': 'CWUser', 'T': 'String'}])],
       
   732                          None, None, [self.system],
       
   733                          {'X': 'table0.C0', 'X.title': 'table0.C1', 'T': 'table0.C1', 'U': 'table1.C0'}, []),
       
   734                         ]),
       
   735                     ])
       
   736 
       
   737     def test_restricted_max(self):
       
   738         # dumb query to emulate the one generated by svnfile.entities.rql_revision_content
       
   739         self._test('Any V, MAX(VR) WHERE V is Card, V creation_date VR, '
       
   740                    '(V creation_date TODAY OR (V creation_date < TODAY AND NOT EXISTS('
       
   741                    'X is Card, X creation_date < TODAY, X creation_date >= VR)))',
       
   742                    [('FetchStep', [('Any VR WHERE X creation_date < TODAY, X creation_date VR, X is Card',
       
   743                                     [{'X': 'Card', 'VR': 'Datetime'}])],
       
   744                      [self.cards, self.system], None,
       
   745                      {'VR': 'table0.C0', 'X.creation_date': 'table0.C0'}, []),
       
   746                     ('FetchStep', [('Any V,VR WHERE V creation_date VR, V is Card',
       
   747                                     [{'VR': 'Datetime', 'V': 'Card'}])],
       
   748                      [self.cards, self.system], None,
       
   749                      {'VR': 'table1.C1', 'V': 'table1.C0', 'V.creation_date': 'table1.C1'}, []),
       
   750                     ('OneFetchStep', [('Any V,MAX(VR) WHERE V creation_date VR, (V creation_date TODAY) OR (V creation_date < TODAY, NOT EXISTS(X creation_date >= VR, X is Card)), V is Card',
       
   751                                        [{'X': 'Card', 'VR': 'Datetime', 'V': 'Card'}])],
       
   752                      None, None, [self.system],
       
   753                      {'VR': 'table1.C1', 'V': 'table1.C0', 'V.creation_date': 'table1.C1', 'X.creation_date': 'table0.C0'}, [])
       
   754                     ])
       
   755 
       
   756     def test_outer_supported_rel1(self):
       
   757         # both system and rql support all variables, can be
       
   758         self._test('Any X, R WHERE X is Note, X in_state S, X type R, '
       
   759                    'NOT EXISTS(Y is Note, Y in_state S, Y type R, X identity Y)',
       
   760                    [('OneFetchStep', [('Any X,R WHERE X is Note, X in_state S, X type R, NOT EXISTS(Y is Note, Y in_state S, Y type R, X identity Y), S is State',
       
   761                                        [{'Y': 'Note', 'X': 'Note', 'S': 'State', 'R': 'String'}])],
       
   762                      None, None,
       
   763                      [self.cards, self.system], {}, [])
       
   764                     ])
       
   765 
       
   766     def test_not_identity(self):
       
   767         ueid = self.session.user.eid
       
   768         self._test('Any X WHERE NOT X identity U, U eid %s, X is CWUser' % ueid,
       
   769                    [('OneFetchStep',
       
   770                      [('Any X WHERE NOT X identity %s, X is CWUser' % ueid, [{'X': 'CWUser'}])],
       
   771                      None, None,
       
   772                      [self.ldap, self.system], {}, [])
       
   773                     ])
       
   774 
       
   775     def test_outer_supported_rel2(self):
       
   776         self._test('Any X, MAX(R) GROUPBY X WHERE X in_state S, X login R, '
       
   777                    'NOT EXISTS(Y is Note, Y in_state S, Y type R)',
       
   778                    [('FetchStep', [('Any A,R WHERE Y in_state A, Y type R, A is State, Y is Note',
       
   779                                     [{'Y': 'Note', 'A': 'State', 'R': 'String'}])],
       
   780                      [self.cards, self.system], None,
       
   781                      {'A': 'table0.C0', 'R': 'table0.C1', 'Y.type': 'table0.C1'}, []),
       
   782                     ('FetchStep', [('Any X,R WHERE X login R, X is CWUser', [{'X': 'CWUser', 'R': 'String'}])],
       
   783                      [self.ldap, self.system], None,
       
   784                      {'X': 'table1.C0', 'X.login': 'table1.C1', 'R': 'table1.C1'}, []),
       
   785                     ('OneFetchStep', [('Any X,MAX(R) GROUPBY X WHERE X in_state S, X login R, NOT EXISTS(Y type R, S identity A, A is State, Y is Note), S is State, X is CWUser',
       
   786                                        [{'Y': 'Note', 'X': 'CWUser', 'S': 'State', 'R': 'String', 'A': 'State'}])],
       
   787                      None, None, [self.system],
       
   788                      {'A': 'table0.C0', 'X': 'table1.C0', 'X.login': 'table1.C1', 'R': 'table1.C1', 'Y.type': 'table0.C1'}, [])
       
   789                     ])
       
   790 
       
   791     def test_security_has_text(self):
       
   792         # use a guest user
       
   793         self.session = self.user_groups_session('guests')
       
   794         ueid = self.session.user.eid
       
   795         self._test('Any X WHERE X has_text "bla"',
       
   796                    [('FetchStep', [('Any E WHERE E type "X", E is Note', [{'E': 'Note'}])],
       
   797                      [self.cards, self.system], None, {'E': 'table0.C0'}, []),
       
   798                     ('UnionStep', None, None,
       
   799                      [('OneFetchStep',
       
   800                        [(u'Any X WHERE X has_text "bla", (EXISTS(X owned_by %(ueid)s)) OR ((((EXISTS(D concerne C?, C owned_by %(ueid)s, C type "X", X identity D, C is Division, D is Affaire)) OR (EXISTS(H concerne G?, G owned_by %(ueid)s, G type "X", X identity H, G is SubDivision, H is Affaire))) OR (EXISTS(I concerne F?, F owned_by %(ueid)s, F type "X", X identity I, F is Societe, I is Affaire))) OR (EXISTS(J concerne E?, E owned_by %(ueid)s, X identity J, E is Note, J is Affaire))), X is Affaire' % {'ueid': ueid},
       
   801                          [{'C': 'Division', 'E': 'Note', 'D': 'Affaire', 'G': 'SubDivision', 'F': 'Societe', 'I': 'Affaire', 'H': 'Affaire', 'J': 'Affaire', 'X': 'Affaire'}])],
       
   802                        None, None, [self.system], {'E': 'table0.C0'}, []),
       
   803                       ('OneFetchStep',
       
   804                        [('Any X WHERE X has_text "bla", EXISTS(X owned_by %s), X is Basket' % ueid,
       
   805                          [{'X': 'Basket'}]),
       
   806                         ('Any X WHERE X has_text "bla", EXISTS(X owned_by %s), X is CWUser' % ueid,
       
   807                          [{'X': 'CWUser'}]),
       
   808                         ('Any X WHERE X has_text "bla", X is IN(Card, Comment, Division, Email, EmailThread, File, Folder, Note, Personne, Societe, SubDivision, Tag)',
       
   809                          [{'X': 'Card'}, {'X': 'Comment'},
       
   810                           {'X': 'Division'}, {'X': 'Email'}, {'X': 'EmailThread'},
       
   811                           {'X': 'File'}, {'X': 'Folder'},
       
   812                           {'X': 'Note'}, {'X': 'Personne'}, {'X': 'Societe'},
       
   813                           {'X': 'SubDivision'}, {'X': 'Tag'}]),],
       
   814                        None, None, [self.system], {}, []),
       
   815                       ])
       
   816                      ])
       
   817 
       
   818     def test_security_has_text_limit_offset(self):
       
   819         # use a guest user
       
   820         self.session = self.user_groups_session('guests')
       
   821         ueid = self.session.user.eid
       
   822         # note: same as the above query but because of the subquery usage, the
       
   823         # display differs (not printing solutions for each union)
       
   824         self._test('Any X LIMIT 10 OFFSET 10 WHERE X has_text "bla"',
       
   825                    [('FetchStep', [('Any E WHERE E type "X", E is Note', [{'E': 'Note'}])],
       
   826                       [self.cards, self.system], None, {'E': 'table1.C0'}, []),
       
   827                      ('UnionFetchStep', [
       
   828                         ('FetchStep', [('Any X WHERE X has_text "bla", (EXISTS(X owned_by %(ueid)s)) OR ((((EXISTS(D concerne C?, C owned_by %(ueid)s, C type "X", X identity D, C is Division, D is Affaire)) OR (EXISTS(H concerne G?, G owned_by %(ueid)s, G type "X", X identity H, G is SubDivision, H is Affaire))) OR (EXISTS(I concerne F?, F owned_by %(ueid)s, F type "X", X identity I, F is Societe, I is Affaire))) OR (EXISTS(J concerne E?, E owned_by %(ueid)s, X identity J, E is Note, J is Affaire))), X is Affaire' % {'ueid': ueid},
       
   829                                             [{'C': 'Division', 'E': 'Note', 'D': 'Affaire', 'G': 'SubDivision', 'F': 'Societe', 'I': 'Affaire', 'H': 'Affaire', 'J': 'Affaire', 'X': 'Affaire'}])],
       
   830                           [self.system], {'E': 'table1.C0'}, {'X': 'table0.C0'}, []),
       
   831                          ('FetchStep',
       
   832                           [('Any X WHERE X has_text "bla", EXISTS(X owned_by %s), X is Basket' % ueid,
       
   833                             [{'X': 'Basket'}]),
       
   834                            ('Any X WHERE X has_text "bla", EXISTS(X owned_by %s), X is CWUser' % ueid,
       
   835                             [{'X': 'CWUser'}]),
       
   836                            ('Any X WHERE X has_text "bla", X is IN(Card, Comment, Division, Email, EmailThread, File, Folder, Note, Personne, Societe, SubDivision, Tag)',
       
   837                             [{'X': 'Card'}, {'X': 'Comment'},
       
   838                              {'X': 'Division'}, {'X': 'Email'}, {'X': 'EmailThread'},
       
   839                              {'X': 'File'}, {'X': 'Folder'},
       
   840                              {'X': 'Note'}, {'X': 'Personne'}, {'X': 'Societe'},
       
   841                              {'X': 'SubDivision'}, {'X': 'Tag'}])],
       
   842                           [self.system], {}, {'X': 'table0.C0'}, []),
       
   843                          ]),
       
   844                     ('OneFetchStep',
       
   845                      [('Any X LIMIT 10 OFFSET 10',
       
   846                        [{'X': 'Affaire'}, {'X': 'Basket'},
       
   847                         {'X': 'CWUser'}, {'X': 'Card'}, {'X': 'Comment'},
       
   848                         {'X': 'Division'}, {'X': 'Email'}, {'X': 'EmailThread'},
       
   849                         {'X': 'File'}, {'X': 'Folder'},
       
   850                         {'X': 'Note'}, {'X': 'Personne'}, {'X': 'Societe'},
       
   851                         {'X': 'SubDivision'}, {'X': 'Tag'}])],
       
   852                      10, 10, [self.system], {'X': 'table0.C0'}, [])
       
   853                      ])
       
   854 
       
   855     def test_security_user(self):
       
   856         """a guest user trying to see another user: EXISTS(X owned_by U) is automatically inserted"""
       
   857         # use a guest user
       
   858         self.session = self.user_groups_session('guests')
       
   859         ueid = self.session.user.eid
       
   860         self._test('Any X WHERE X login "bla"',
       
   861                    [('FetchStep',
       
   862                      [('Any X WHERE X login "bla", X is CWUser', [{'X': 'CWUser'}])],
       
   863                      [self.ldap, self.system], None, {'X': 'table0.C0'}, []),
       
   864                     ('OneFetchStep',
       
   865                      [('Any X WHERE EXISTS(X owned_by %s), X is CWUser' % ueid, [{'X': 'CWUser'}])],
       
   866                      None, None, [self.system], {'X': 'table0.C0'}, [])])
       
   867 
       
   868     def test_security_complex_has_text(self):
       
   869         # use a guest user
       
   870         self.session = self.user_groups_session('guests')
       
   871         ueid = self.session.user.eid
       
   872         self._test('Any X WHERE X has_text "bla", X firstname "bla"',
       
   873                    [('FetchStep', [('Any X WHERE X firstname "bla", X is CWUser', [{'X': 'CWUser'}])],
       
   874                      [self.ldap, self.system], None, {'X': 'table0.C0'}, []),
       
   875                     ('UnionStep', None, None, [
       
   876                         ('OneFetchStep', [('Any X WHERE X has_text "bla", EXISTS(X owned_by %s), X is CWUser' % ueid, [{'X': 'CWUser'}])],
       
   877                          None, None, [self.system], {'X': 'table0.C0'}, []),
       
   878                         ('OneFetchStep', [('Any X WHERE X has_text "bla", X firstname "bla", X is Personne', [{'X': 'Personne'}])],
       
   879                          None, None, [self.system], {}, []),
       
   880                         ]),
       
   881                     ])
       
   882 
       
   883     def test_security_complex_has_text_limit_offset(self):
       
   884         # use a guest user
       
   885         self.session = self.user_groups_session('guests')
       
   886         ueid = self.session.user.eid
       
   887         self._test('Any X LIMIT 10 OFFSET 10 WHERE X has_text "bla", X firstname "bla"',
       
   888                    [('FetchStep', [('Any X WHERE X firstname "bla", X is CWUser', [{'X': 'CWUser'}])],
       
   889                      [self.ldap, self.system], None, {'X': 'table1.C0'}, []),
       
   890                     ('UnionFetchStep', [
       
   891                         ('FetchStep', [('Any X WHERE X has_text "bla", EXISTS(X owned_by %s), X is CWUser' % ueid, [{'X': 'CWUser'}])],
       
   892                          [self.system], {'X': 'table1.C0'}, {'X': 'table0.C0'}, []),
       
   893                         ('FetchStep', [('Any X WHERE X has_text "bla", X firstname "bla", X is Personne', [{'X': 'Personne'}])],
       
   894                          [self.system], {}, {'X': 'table0.C0'}, []),
       
   895                         ]),
       
   896                      ('OneFetchStep',
       
   897                       [('Any X LIMIT 10 OFFSET 10', [{'X': 'CWUser'}, {'X': 'Personne'}])],
       
   898                       10, 10, [self.system], {'X': 'table0.C0'}, [])
       
   899                     ])
       
   900 
       
    def test_security_complex_aggregat(self):
        # Check the plan handed to self._test() for 'Any MAX(X)' when the
        # session is the one returned by user_groups_session('guests').
        # use a guest user
        self.session = self.user_groups_session('guests')
        ueid = self.session.user.eid
        # slice-copy so that the .remove() calls below do not alter the
        # X_ALL_SOLS list itself
        ALL_SOLS = X_ALL_SOLS[:]
        ALL_SOLS.remove({'X': 'CWSourceHostConfig'}) # not authorized
        ALL_SOLS.remove({'X': 'CWSourceSchemaConfig'}) # not authorized
        ALL_SOLS.remove({'X': 'CWDataImport'}) # not authorized
        # Expected plan, in order: two FetchStep entries (Note "E" mapped to
        # table1, CWUser "X" mapped to table2), one UnionFetchStep whose
        # members all map X to table0.C0, and a final OneFetchStep running
        # 'Any MAX(X)' with ALL_SOLS as its solutions.
        # NOTE(review): the meaning of each tuple slot (sources, input map,
        # output map, ...) is defined by self._test(), not visible here.
        self._test('Any MAX(X)',
                   [('FetchStep', [('Any E WHERE E type "X", E is Note', [{'E': 'Note'}])],
                     [self.cards, self.system],  None, {'E': 'table1.C0'}, []),
                    ('FetchStep', [('Any X WHERE X is CWUser', [{'X': 'CWUser'}])],
                     [self.ldap, self.system], None, {'X': 'table2.C0'}, []),
                    ('UnionFetchStep', [
                        # queries carrying an EXISTS() clause built from the
                        # session user's eid
                        ('FetchStep', [('Any X WHERE EXISTS(%s use_email X), X is EmailAddress' % ueid,
      [{'X': 'EmailAddress'}]),
                                       ('Any X WHERE EXISTS(X owned_by %s), X is Basket' % ueid, [{'X': 'Basket'}])],
                          [self.system], {}, {'X': 'table0.C0'}, []),
                        # nested union: no EXISTS() clause in these queries
                        ('UnionFetchStep',
                         [('FetchStep', [('Any X WHERE X is IN(Card, Note, State)',
                                          [{'X': 'Card'}, {'X': 'Note'}, {'X': 'State'}])],
                           [self.cards, self.system], {}, {'X': 'table0.C0'}, []),
                          ('FetchStep',
                           [('Any X WHERE X is IN(BaseTransition, Bookmark, CWAttribute, CWCache, CWConstraint, CWConstraintType, CWEType, CWGroup, CWPermission, CWProperty, CWRType, CWRelation, CWSource, CWUniqueTogetherConstraint, Comment, Division, Email, EmailPart, EmailThread, ExternalUri, File, Folder, Old, Personne, RQLExpression, Societe, SubDivision, SubWorkflowExitPoint, Tag, TrInfo, Transition, Workflow, WorkflowTransition)',
                             [{'X': 'BaseTransition'}, {'X': 'Bookmark'},
                              {'X': 'CWAttribute'}, {'X': 'CWCache'},
                              {'X': 'CWConstraint'}, {'X': 'CWConstraintType'},
                              {'X': 'CWEType'}, {'X': 'CWGroup'},
                              {'X': 'CWPermission'}, {'X': 'CWProperty'},
                              {'X': 'CWRType'}, {'X': 'CWRelation'},
                              {'X': 'CWSource'},
                              {'X': 'CWUniqueTogetherConstraint'},
                              {'X': 'Comment'}, {'X': 'Division'},
                              {'X': 'Email'},
                              {'X': 'EmailPart'}, {'X': 'EmailThread'},
                              {'X': 'ExternalUri'}, {'X': 'File'},
                              {'X': 'Folder'}, {'X': 'Old'},
                              {'X': 'Personne'}, {'X': 'RQLExpression'},
                              {'X': 'Societe'}, {'X': 'SubDivision'},
                              {'X': 'SubWorkflowExitPoint'}, {'X': 'Tag'},
                              {'X': 'TrInfo'}, {'X': 'Transition'},
                              {'X': 'Workflow'}, {'X': 'WorkflowTransition'}])],
                           [self.system], {}, {'X': 'table0.C0'}, []),
                          ]),
                        # this member takes X from table2 (the CWUser
                        # FetchStep above)
                        ('FetchStep', [('Any X WHERE EXISTS(X owned_by %s), X is CWUser' % ueid, [{'X': 'CWUser'}])],
                         [self.system], {'X': 'table2.C0'}, {'X': 'table0.C0'}, []),
                        # this member takes E from table1 (the Note FetchStep
                        # above)
                        ('FetchStep', [('Any X WHERE (EXISTS(X owned_by %(ueid)s)) OR ((((EXISTS(D concerne C?, C owned_by %(ueid)s, C type "X", X identity D, C is Division, D is Affaire)) OR (EXISTS(H concerne G?, G owned_by %(ueid)s, G type "X", X identity H, G is SubDivision, H is Affaire))) OR (EXISTS(I concerne F?, F owned_by %(ueid)s, F type "X", X identity I, F is Societe, I is Affaire))) OR (EXISTS(J concerne E?, E owned_by %(ueid)s, X identity J, E is Note, J is Affaire))), X is Affaire' % {'ueid': ueid},
                                        [{'C': 'Division', 'E': 'Note', 'D': 'Affaire', 'G': 'SubDivision', 'F': 'Societe', 'I': 'Affaire', 'H': 'Affaire', 'J': 'Affaire', 'X': 'Affaire'}])],
                         [self.system], {'E': 'table1.C0'}, {'X': 'table0.C0'}, []),
                        ]),
                    ('OneFetchStep', [('Any MAX(X)', ALL_SOLS)],
                     None, None, [self.system], {'X': 'table0.C0'}, [])
                    ])
       
   954 
       
    def test_security_complex_aggregat2(self):
        # Check the plan handed to self._test() for a GROUPBY/COUNT query on
        # 'X is ET' when the session is the one returned by
        # user_groups_session('guests').
        # use a guest user
        self.session = self.user_groups_session('guests')
        ueid = self.session.user.eid
        # Build the solutions expected by the final step: every X_ALL_SOLS
        # entry except the three skipped below, each extended with
        # 'ET': 'CWEType'. A fresh dict is built per entry, so the X_ALL_SOLS
        # items themselves are not modified.
        X_ET_ALL_SOLS = []
        for s in X_ALL_SOLS:
            if s in ({'X': 'CWSourceHostConfig'}, {'X': 'CWSourceSchemaConfig'}, {'X': 'CWDataImport'}):
                continue # not authorized
            ets = {'ET': 'CWEType'}
            ets.update(s)
            X_ET_ALL_SOLS.append(ets)
        # Expected plan, in order: three FetchStep entries mapped to table1,
        # table2 and table3, one UnionFetchStep whose members all map ET to
        # table0.C0 and X to table0.C1, and a final OneFetchStep running the
        # aggregate with X_ET_ALL_SOLS as its solutions.
        # NOTE(review): the meaning of each tuple slot is defined by
        # self._test(), not visible here.
        self._test('Any ET, COUNT(X) GROUPBY ET ORDERBY ET WHERE X is ET',
                   [('FetchStep', [('Any X WHERE X is IN(Card, Note, State)',
                                    [{'X': 'Card'}, {'X': 'Note'}, {'X': 'State'}])],
                     [self.cards, self.system], None, {'X': 'table1.C0'}, []),
                    ('FetchStep', [('Any E WHERE E type "X", E is Note', [{'E': 'Note'}])],
                     [self.cards, self.system],  None, {'E': 'table2.C0'}, []),
                    ('FetchStep', [('Any X WHERE X is CWUser', [{'X': 'CWUser'}])],
                     [self.ldap, self.system], None, {'X': 'table3.C0'}, []),
                    ('UnionFetchStep',
                     [('FetchStep', [('Any ET,X WHERE X is ET, EXISTS(%s use_email X), ET is CWEType, X is EmailAddress' % ueid,
      [{'ET': 'CWEType', 'X': 'EmailAddress'}]), ('Any ET,X WHERE X is ET, EXISTS(X owned_by %s), ET is CWEType, X is Basket' % ueid,
                                      [{'ET': 'CWEType', 'X': 'Basket'}])],
                       [self.system], {}, {'ET': 'table0.C0', 'X': 'table0.C1'}, []),
                      # this member takes E from table2 (the Note FetchStep
                      # above)
                      ('FetchStep', [('Any ET,X WHERE X is ET, (EXISTS(X owned_by %(ueid)s)) OR ((((EXISTS(D concerne C?, C owned_by %(ueid)s, C type "X", X identity D, C is Division, D is Affaire)) OR (EXISTS(H concerne G?, G owned_by %(ueid)s, G type "X", X identity H, G is SubDivision, H is Affaire))) OR (EXISTS(I concerne F?, F owned_by %(ueid)s, F type "X", X identity I, F is Societe, I is Affaire))) OR (EXISTS(J concerne E?, E owned_by %(ueid)s, X identity J, E is Note, J is Affaire))), ET is CWEType, X is Affaire' % {'ueid': ueid},
                                      [{'C': 'Division', 'E': 'Note', 'D': 'Affaire',
                                        'G': 'SubDivision', 'F': 'Societe', 'I': 'Affaire',
                                        'H': 'Affaire', 'J': 'Affaire', 'X': 'Affaire',
                                        'ET': 'CWEType'}])],
                       [self.system], {'E': 'table2.C0'}, {'ET': 'table0.C0', 'X': 'table0.C1'},
                       []),
                      # this member takes X from table3 (the CWUser FetchStep
                      # above)
                      ('FetchStep', [('Any ET,X WHERE X is ET, EXISTS(X owned_by %s), ET is CWEType, X is CWUser' % ueid,
                                      [{'ET': 'CWEType', 'X': 'CWUser'}])],
                       [self.system], {'X': 'table3.C0'}, {'ET': 'table0.C0', 'X': 'table0.C1'}, []),
                      # extra UnionFetchStep could be avoided but has no cost, so don't care
                      ('UnionFetchStep',
                       [('FetchStep', [('Any ET,X WHERE X is ET, ET is CWEType, X is IN(BaseTransition, Bookmark, CWAttribute, CWCache, CWConstraint, CWConstraintType, CWEType, CWGroup, CWPermission, CWProperty, CWRType, CWRelation, CWSource, CWUniqueTogetherConstraint, Comment, Division, Email, EmailPart, EmailThread, ExternalUri, File, Folder, Old, Personne, RQLExpression, Societe, SubDivision, SubWorkflowExitPoint, Tag, TrInfo, Transition, Workflow, WorkflowTransition)',
                                        [{'X': 'BaseTransition', 'ET': 'CWEType'},
                                         {'X': 'Bookmark', 'ET': 'CWEType'}, {'X': 'CWAttribute', 'ET': 'CWEType'},
                                         {'X': 'CWCache', 'ET': 'CWEType'}, {'X': 'CWConstraint', 'ET': 'CWEType'},
                                         {'X': 'CWConstraintType', 'ET': 'CWEType'},
                                         {'X': 'CWEType', 'ET': 'CWEType'},
                                         {'X': 'CWGroup', 'ET': 'CWEType'}, {'X': 'CWPermission', 'ET': 'CWEType'},
                                         {'X': 'CWProperty', 'ET': 'CWEType'}, {'X': 'CWRType', 'ET': 'CWEType'},
                                         {'X': 'CWSource', 'ET': 'CWEType'},
                                         {'X': 'CWRelation', 'ET': 'CWEType'},
                                         {'X': 'CWUniqueTogetherConstraint', 'ET': 'CWEType'},
                                         {'X': 'Comment', 'ET': 'CWEType'},
                                         {'X': 'Division', 'ET': 'CWEType'}, {'X': 'Email', 'ET': 'CWEType'},
                                         {'X': 'EmailPart', 'ET': 'CWEType'},
                                         {'X': 'EmailThread', 'ET': 'CWEType'}, {'X': 'ExternalUri', 'ET': 'CWEType'},
                                         {'X': 'File', 'ET': 'CWEType'}, {'X': 'Folder', 'ET': 'CWEType'},
                                         {'X': 'Old', 'ET': 'CWEType'}, {'X': 'Personne', 'ET': 'CWEType'},
                                         {'X': 'RQLExpression', 'ET': 'CWEType'}, {'X': 'Societe', 'ET': 'CWEType'},
                                         {'X': 'SubDivision', 'ET': 'CWEType'}, {'X': 'SubWorkflowExitPoint', 'ET': 'CWEType'},
                                         {'X': 'Tag', 'ET': 'CWEType'}, {'X': 'TrInfo', 'ET': 'CWEType'},
                                         {'X': 'Transition', 'ET': 'CWEType'}, {'X': 'Workflow', 'ET': 'CWEType'},
                                         {'X': 'WorkflowTransition', 'ET': 'CWEType'}])],
                         [self.system], {}, {'ET': 'table0.C0', 'X': 'table0.C1'}, []),
                        # this member takes X from table1 (the Card/Note/State
                        # FetchStep above)
                        ('FetchStep',
                         [('Any ET,X WHERE X is ET, ET is CWEType, X is IN(Card, Note, State)',
                           [{'ET': 'CWEType', 'X': 'Card'},
                            {'ET': 'CWEType', 'X': 'Note'},
                            {'ET': 'CWEType', 'X': 'State'}])],
                         [self.system], {'X': 'table1.C0'}, {'ET': 'table0.C0', 'X': 'table0.C1'}, []),
                        ]),
                    ]),
                    ('OneFetchStep',
                     [('Any ET,COUNT(X) GROUPBY ET ORDERBY ET', X_ET_ALL_SOLS)],
                     None, None, [self.system], {'ET': 'table0.C0', 'X': 'table0.C1'}, [])
                    ])
       
  1026 
       
  1027     def test_security_3sources(self):
       
  1028         # use a guest user
       
  1029         self.session = self.user_groups_session('guests')
       
  1030         ueid = self.session.user.eid
       
  1031         self._test('Any X, XT WHERE X is Card, X owned_by U, X title XT, U login "syt"',
       
  1032                    [('FetchStep',
       
  1033                      [('Any X,XT WHERE X title XT, X is Card', [{'X': 'Card', 'XT': 'String'}])],
       
  1034                      [self.cards, self.system], None, {'X': 'table0.C0', 'X.title': 'table0.C1', 'XT': 'table0.C1'}, []),
       
  1035                     ('FetchStep',
       
  1036                      [('Any U WHERE U login "syt", U is CWUser', [{'U': 'CWUser'}])],
       
  1037                      [self.ldap, self.system], None, {'U': 'table1.C0'}, []),
       
  1038                     ('OneFetchStep',
       
  1039                      [('Any X,XT WHERE X owned_by U, X title XT, EXISTS(U owned_by %s), U is CWUser, X is Card' % ueid,
       
  1040                        [{'X': 'Card', 'U': 'CWUser', 'XT': 'String'}])],
       
  1041                      None, None, [self.system],
       
  1042                      {'X': 'table0.C0', 'X.title': 'table0.C1', 'XT': 'table0.C1', 'U': 'table1.C0'}, [])
       
  1043                     ])
       
  1044 
       
  1045     def test_security_3sources_identity(self):
       
  1046         self.restore_orig_cwuser_security()
       
  1047         # use a guest user
       
  1048         self.session = self.user_groups_session('guests')
       
  1049         ueid = self.session.user.eid
       
  1050         self._test('Any X, XT WHERE X is Card, X owned_by U, X title XT, U login "syt"',
       
  1051                    [('FetchStep',
       
  1052                      [('Any X,XT WHERE X title XT, X is Card', [{'X': 'Card', 'XT': 'String'}])],
       
  1053                      [self.cards, self.system], None, {'X': 'table0.C0', 'X.title': 'table0.C1', 'XT': 'table0.C1'}, []),
       
  1054                     ('OneFetchStep',
       
  1055                      [('Any X,XT WHERE X owned_by U, X title XT, U login "syt", EXISTS(U identity %s), U is CWUser, X is Card' % ueid,
       
  1056                        [{'U': 'CWUser', 'X': 'Card', 'XT': 'String'}])],
       
  1057                      None, None, [self.system], {'X': 'table0.C0', 'X.title': 'table0.C1', 'XT': 'table0.C1'}, [])
       
  1058                     ])
       
  1059 
       
  1060     def test_security_3sources_identity_optional_var(self):
       
  1061         self.restore_orig_cwuser_security()
       
  1062         # use a guest user
       
  1063         self.session = self.user_groups_session('guests')
       
  1064         ueid = self.session.user.eid
       
  1065         self._test('Any X,XT,U WHERE X is Card, X owned_by U?, X title XT, U login L',
       
  1066                    [('FetchStep',
       
  1067                      [('Any U,L WHERE U login L, EXISTS(U identity %s), U is CWUser' % ueid,
       
  1068                        [{'L': 'String', u'U': 'CWUser'}])],
       
  1069                      [self.system], {}, {'L': 'table0.C1', 'U': 'table0.C0', 'U.login': 'table0.C1'}, []),
       
  1070                     ('FetchStep',
       
  1071                      [('Any X,XT WHERE X title XT, X is Card', [{'X': 'Card', 'XT': 'String'}])],
       
  1072                      [self.cards, self.system], None, {'X': 'table1.C0', 'X.title': 'table1.C1', 'XT': 'table1.C1'}, []),
       
  1073                     ('OneFetchStep',
       
  1074                      [('Any X,XT,U WHERE X owned_by U?, X title XT, X is Card',
       
  1075                        [{'X': 'Card', 'U': 'CWUser', 'XT': 'String'}])],
       
  1076                      None, None, [self.system], {'L': 'table0.C1',
       
  1077                                                  'U': 'table0.C0',
       
  1078                                                  'X': 'table1.C0',
       
  1079                                                  'X.title': 'table1.C1',
       
  1080                                                  'XT': 'table1.C1'}, [])
       
  1081                     ])
       
  1082 
       
  1083     def test_security_3sources_limit_offset(self):
       
  1084         # use a guest user
       
  1085         self.session = self.user_groups_session('guests')
       
  1086         ueid = self.session.user.eid
       
  1087         self._test('Any X, XT LIMIT 10 OFFSET 10 WHERE X is Card, X owned_by U, X title XT, U login "syt"',
       
  1088                    [('FetchStep',
       
  1089                      [('Any X,XT WHERE X title XT, X is Card', [{'X': 'Card', 'XT': 'String'}])],
       
  1090                      [self.cards, self.system], None, {'X': 'table0.C0', 'X.title': 'table0.C1', 'XT': 'table0.C1'}, []),
       
  1091                     ('FetchStep',
       
  1092                      [('Any U WHERE U login "syt", U is CWUser', [{'U': 'CWUser'}])],
       
  1093                      [self.ldap, self.system], None, {'U': 'table1.C0'}, []),
       
  1094                     ('OneFetchStep',
       
  1095                      [('Any X,XT LIMIT 10 OFFSET 10 WHERE X owned_by U, X title XT, EXISTS(U owned_by %s), U is CWUser, X is Card' % ueid,
       
  1096                        [{'X': 'Card', 'U': 'CWUser', 'XT': 'String'}])],
       
  1097                      10, 10, [self.system],
       
  1098                      {'X': 'table0.C0', 'X.title': 'table0.C1', 'XT': 'table0.C1', 'U': 'table1.C0'}, [])
       
  1099                     ])
       
  1100 
       
  1101     def test_exists_base(self):
       
  1102         self._test('Any X,L,S WHERE X in_state S, X login L, EXISTS(X in_group G, G name "bougloup")',
       
  1103                    [('FetchStep', [('Any X,L WHERE X login L, X is CWUser', [{'X': 'CWUser', 'L': 'String'}])],
       
  1104                      [self.ldap, self.system], None, {'X': 'table0.C0', 'X.login': 'table0.C1', 'L': 'table0.C1'}, []),
       
  1105                     ('OneFetchStep', [("Any X,L,S WHERE X in_state S, X login L, "
       
  1106                                       'EXISTS(X in_group G, G name "bougloup", G is CWGroup), S is State, X is CWUser',
       
  1107                                        [{'X': 'CWUser', 'L': 'String', 'S': 'State', 'G': 'CWGroup'}])],
       
  1108                      None, None, [self.system],
       
  1109                      {'X': 'table0.C0', 'X.login': 'table0.C1', 'L': 'table0.C1'}, [])])
       
  1110 
       
  1111     def test_exists_complex(self):
       
  1112         self._test('Any G WHERE X in_group G, G name "managers", EXISTS(X copain T, T login in ("comme", "cochon"))',
       
  1113                    [('FetchStep', [('Any T WHERE T login IN("comme", "cochon"), T is CWUser', [{'T': 'CWUser'}])],
       
  1114                      [self.ldap, self.system], None, {'T': 'table0.C0'}, []),
       
  1115                     ('OneFetchStep',
       
  1116                      [('Any G WHERE X in_group G, G name "managers", EXISTS(X copain T, T is CWUser), G is CWGroup, X is CWUser',
       
  1117                        [{'X': 'CWUser', 'T': 'CWUser', 'G': 'CWGroup'}])],
       
  1118                      None, None, [self.system], {'T': 'table0.C0'}, [])])
       
  1119 
       
  1120     def test_exists3(self):
       
  1121         self._test('Any G,L WHERE X in_group G, X login L, G name "managers", EXISTS(X copain T, T login in ("comme", "cochon"))',
       
  1122                    [('FetchStep',
       
  1123                      [('Any T WHERE T login IN("comme", "cochon"), T is CWUser',
       
  1124                        [{'T': 'CWUser'}])],
       
  1125                      [self.ldap, self.system], None, {'T': 'table0.C0'}, []),
       
  1126                     ('FetchStep',
       
  1127                      [('Any L,X WHERE X login L, X is CWUser', [{'X': 'CWUser', 'L': 'String'}])],
       
  1128                      [self.ldap, self.system], None,
       
  1129                      {'X': 'table1.C1', 'X.login': 'table1.C0', 'L': 'table1.C0'}, []),
       
  1130                     ('OneFetchStep',
       
  1131                      [('Any G,L WHERE X in_group G, X login L, G name "managers", EXISTS(X copain T, T is CWUser), G is CWGroup, X is CWUser',
       
  1132                        [{'G': 'CWGroup', 'L': 'String', 'T': 'CWUser', 'X': 'CWUser'}])],
       
  1133                      None, None,
       
  1134                      [self.system], {'T': 'table0.C0', 'X': 'table1.C1', 'X.login': 'table1.C0', 'L': 'table1.C0'}, [])])
       
  1135 
       
  1136     def test_exists4(self):
       
  1137         self._test('Any G,L WHERE X in_group G, X login L, G name "managers", '
       
  1138                    'EXISTS(X copain T, T login L, T login in ("comme", "cochon")) OR '
       
  1139                    'EXISTS(X in_state S, S name "pascontent", NOT X copain T2, T2 login "billy")',
       
  1140                    [('FetchStep',
       
  1141                      [('Any T,L WHERE T login L, T login IN("comme", "cochon"), T is CWUser', [{'T': 'CWUser', 'L': 'String'}])],
       
  1142                      [self.ldap, self.system], None,
       
  1143                      {'T': 'table0.C0', 'T.login': 'table0.C1', 'L': 'table0.C1'}, []),
       
  1144                     ('FetchStep',
       
  1145                      [('Any T2 WHERE T2 login "billy", T2 is CWUser', [{'T2': 'CWUser'}])],
       
  1146                      [self.ldap, self.system], None, {'T2': 'table1.C0'}, []),
       
  1147                     ('FetchStep',
       
  1148                      [('Any L,X WHERE X login L, X is CWUser', [{'X': 'CWUser', 'L': 'String'}])],
       
  1149                      [self.ldap, self.system], None, {'X': 'table2.C1', 'X.login': 'table2.C0', 'L': 'table2.C0'}, []),
       
  1150                     ('OneFetchStep',
       
  1151                      [('Any G,L WHERE X in_group G, X login L, G name "managers", (EXISTS(X copain T, T login L, T is CWUser)) OR (EXISTS(X in_state S, S name "pascontent", NOT EXISTS(X copain T2), S is State)), G is CWGroup, T2 is CWUser, X is CWUser',
       
  1152                        [{'G': 'CWGroup', 'L': 'String', 'S': 'State', 'T': 'CWUser', 'T2': 'CWUser', 'X': 'CWUser'}])],
       
  1153                      None, None, [self.system],
       
  1154                      {'T2': 'table1.C0', 'L': 'table2.C0',
       
  1155                       'T': 'table0.C0', 'T.login': 'table0.C1', 'X': 'table2.C1', 'X.login': 'table2.C0'}, [])])
       
  1156 
       
  1157     def test_exists5(self):
       
  1158         self._test('Any GN,L WHERE X in_group G, X login L, G name GN, '
       
  1159                    'EXISTS(X copain T, T login in ("comme", "cochon")) AND '
       
  1160                    'NOT EXISTS(X copain T2, T2 login "billy")',
       
  1161                    [('FetchStep', [('Any T WHERE T login IN("comme", "cochon"), T is CWUser',
       
  1162                                     [{'T': 'CWUser'}])],
       
  1163                      [self.ldap, self.system], None, {'T': 'table0.C0'}, []),
       
  1164                     ('FetchStep', [('Any T2 WHERE T2 login "billy", T2 is CWUser', [{'T2': 'CWUser'}])],
       
  1165                      [self.ldap, self.system], None, {'T2': 'table1.C0'}, []),
       
  1166                     ('FetchStep', [('Any L,X WHERE X login L, X is CWUser', [{'X': 'CWUser', 'L': 'String'}])],
       
  1167                      [self.ldap, self.system], None,
       
  1168                      {'X': 'table2.C1', 'X.login': 'table2.C0', 'L': 'table2.C0'}, []),
       
  1169                     ('OneFetchStep', [('Any GN,L WHERE X in_group G, X login L, G name GN, EXISTS(X copain T, T is CWUser), NOT EXISTS(X copain T2, T2 is CWUser), G is CWGroup, X is CWUser',
       
  1170                        [{'G': 'CWGroup', 'GN': 'String', 'L': 'String', 'T': 'CWUser', 'T2': 'CWUser', 'X': 'CWUser'}])],
       
  1171                      None, None, [self.system],
       
  1172                      {'T': 'table0.C0', 'T2': 'table1.C0',
       
  1173                       'X': 'table2.C1', 'X.login': 'table2.C0', 'L': 'table2.C0'}, [])])
       
  1174 
       
    def test_exists_security_no_invariant(self):
        # Check the plan handed to self._test() for a CWUser query whose
        # EXISTS() clause refers to variable A, bound through
        # 'A eid %(B)s' to the current session user's eid.
        ueid = self.session.user.eid
        # NOTE: the query literal below is continued with backslashes inside
        # the string itself, so the leading whitespace of the continuation
        # lines is part of the RQL text passed to self._test().
        self._test('Any X,AA,AB,AC,AD ORDERBY AA WHERE X is CWUser, X login AA, X firstname AB, X surname AC, X modification_date AD, A eid %(B)s, \
    EXISTS(((X identity A) OR \
            (EXISTS(X in_group C, C name IN("managers", "staff"), C is CWGroup))) OR \
            (EXISTS(X in_group D, A in_group D, NOT D name "users", D is CWGroup)))',
               # expected plan: a FetchStep for the CWUser attributes listing
               # the ldap and system sources, mapped to table0 ...
               [('FetchStep', [('Any X,AA,AB,AC,AD WHERE X login AA, X firstname AB, X surname AC, X modification_date AD, X is CWUser',
                                [{'AA': 'String', 'AB': 'String', 'AC': 'String', 'AD': 'Datetime',
                                  'X': 'CWUser'}])],
                 [self.ldap, self.system], None, {'AA': 'table0.C1', 'AB': 'table0.C2',
                                                  'AC': 'table0.C3', 'AD': 'table0.C4',
                                                  'X': 'table0.C0',
                                                  'X.firstname': 'table0.C2',
                                                  'X.login': 'table0.C1',
                                                  'X.modification_date': 'table0.C4',
                                                  'X.surname': 'table0.C3'}, []),
                # ... then a OneFetchStep on the system source in which
                # variable A has been replaced by the user's eid
                ('OneFetchStep', [('Any X,AA,AB,AC,AD ORDERBY AA WHERE X login AA, X firstname AB, X surname AC, X modification_date AD, EXISTS(((X identity %(ueid)s) OR (EXISTS(X in_group C, C name IN("managers", "staff"), C is CWGroup))) OR (EXISTS(X in_group D, %(ueid)s in_group D, NOT D name "users", D is CWGroup))), X is CWUser' % {'ueid': ueid},
                                   [{'AA': 'String', 'AB': 'String', 'AC': 'String', 'AD': 'Datetime',
                                     'C': 'CWGroup', 'D': 'CWGroup', 'X': 'CWUser'}])],
                 None, None, [self.system],
                 {'AA': 'table0.C1', 'AB': 'table0.C2', 'AC': 'table0.C3', 'AD': 'table0.C4',
                  'X': 'table0.C0',
                  'X.firstname': 'table0.C2', 'X.login': 'table0.C1', 'X.modification_date': 'table0.C4', 'X.surname': 'table0.C3'},
                 [])],
                   # third argument to self._test(): the value used for
                   # %(B)s in the query
                   {'B': ueid})
       
  1200 
       
  1201     def test_relation_need_split(self):
       
  1202         self._test('Any X, S WHERE X in_state S',
       
  1203                    [('UnionStep', None, None, [
       
  1204                        ('OneFetchStep', [('Any X,S WHERE X in_state S, S is State, X is IN(Affaire, CWUser)',
       
  1205                                           [{'X': 'Affaire', 'S': 'State'}, {'X': 'CWUser', 'S': 'State'}])],
       
  1206                         None, None, [self.system], {}, []),
       
  1207                        ('OneFetchStep', [('Any X,S WHERE X in_state S, S is State, X is Note',
       
  1208                                           [{'X': 'Note', 'S': 'State'}])],
       
  1209                         None, None, [self.cards, self.system], {}, []),
       
  1210                     ])])
       
  1211 
       
  1212     def test_relation_selection_need_split(self):
       
  1213         self._test('Any X,S,U WHERE X in_state S, X todo_by U',
       
  1214                    [('FetchStep', [('Any X,S WHERE X in_state S, S is State, X is Note',
       
  1215                                     [{'X': 'Note', 'S': 'State'}])],
       
  1216                      [self.cards, self.system], None, {'X': 'table0.C0', 'S': 'table0.C1'}, []),
       
  1217                      ('UnionStep', None, None,
       
  1218                       [('OneFetchStep', [('Any X,S,U WHERE X in_state S, X todo_by U, S is State, U is Personne, X is Affaire',
       
  1219                                           [{'X': 'Affaire', 'S': 'State', 'U': 'Personne'}])],
       
  1220                         None, None, [self.system], {}, []),
       
  1221                        ('OneFetchStep', [('Any X,S,U WHERE X todo_by U, S is State, U is CWUser, X is Note',
       
  1222                                           [{'X': 'Note', 'S': 'State', 'U': 'CWUser'}])],
       
  1223                         None, None, [self.system], {'X': 'table0.C0', 'S': 'table0.C1'}, []),
       
  1224                        ])
       
  1225                     ])
       
  1226 
       
  1227     def test_relation_restriction_need_split(self):
       
  1228         self._test('Any X,U WHERE X in_state S, S name "pending", X todo_by U',
       
  1229                    [('FetchStep', [('Any X WHERE X in_state S, S name "pending", S is State, X is Note',
       
  1230                                     [{'X': 'Note', 'S': 'State'}])],
       
  1231                      [self.cards, self.system], None, {'X': 'table0.C0'}, []),
       
  1232                      ('UnionStep', None, None,
       
  1233                       [('OneFetchStep', [('Any X,U WHERE X todo_by U, U is CWUser, X is Note',
       
  1234                                           [{'X': 'Note', 'U': 'CWUser'}])],
       
  1235                         None, None, [self.system], {'X': 'table0.C0'}, []),
       
  1236                        ('OneFetchStep', [('Any X,U WHERE X in_state S, S name "pending", X todo_by U, S is State, U is Personne, X is Affaire',
       
  1237                                           [{'S': 'State', 'U': 'Personne', 'X': 'Affaire'}])],
       
  1238                         None, None, [self.system], {}, [])
       
  1239                        ])
       
  1240                     ])
       
  1241 
       
  1242     def test_relation_restriction_ambigous_need_split(self):
       
  1243         self._test('Any X,T WHERE X in_state S, S name "pending", T tags X',
       
  1244                    [('FetchStep', [('Any X WHERE X in_state S, S name "pending", S is State, X is Note',
       
  1245                                     [{'X': 'Note', 'S': 'State'}])],
       
  1246                      [self.cards, self.system], None, {'X': 'table0.C0'}, []),
       
  1247                     ('UnionStep', None, None, [
       
  1248                         ('OneFetchStep', [('Any X,T WHERE T tags X, T is Tag, X is Note',
       
  1249                                            [{'X': 'Note', 'T': 'Tag'}])],
       
  1250                          None, None,
       
  1251                          [self.system], {'X': 'table0.C0'}, []),
       
  1252                         ('OneFetchStep', [('Any X,T WHERE X in_state S, S name "pending", T tags X, S is State, T is Tag, X is IN(Affaire, CWUser)',
       
  1253                                            [{'X': 'Affaire', 'S': 'State', 'T': 'Tag'},
       
  1254                                             {'X': 'CWUser', 'S': 'State', 'T': 'Tag'}])],
       
  1255                          None, None,
       
  1256                          [self.system], {}, []),
       
  1257                         ])
       
  1258                     ])
       
  1259 
       
  1260     def test_not_relation_no_split_internal(self):
       
  1261         ueid = self.session.user.eid
       
  1262         # NOT on a relation supported by rql and system source: we want to get
       
  1263         # all states (eg from both sources) which are not related to entity with the
       
  1264         # given eid. The "NOT X in_state S, X eid %(x)s" expression is necessarily true
       
  1265         # in the source where %(x)s is not coming from and will be removed during rql
       
  1266         # generation for the external source
       
  1267         self._test('Any SN WHERE NOT X in_state S, X eid %(x)s, S name SN',
       
  1268                    [('OneFetchStep', [('Any SN WHERE NOT EXISTS(%s in_state S), S name SN, S is State' % ueid,
       
  1269                                        [{'S': 'State', 'SN': 'String'}])],
       
  1270                      None, None, [self.cards, self.system], {}, [])],
       
  1271                    {'x': ueid})
       
  1272 
       
  1273     def test_not_relation_no_split_external(self):
       
  1274         repo._type_source_cache[999999] = ('Note', 'cards', 999999, 'cards')
       
  1275         # similar to the above test but with an eid coming from the external source.
       
  1276         # the same plan may be used, since we won't find any record in the system source
       
  1277         # linking 9999999 to a state
       
  1278         self._test('Any SN WHERE NOT X in_state S, X eid %(x)s, S name SN',
       
  1279                    [('OneFetchStep', [('Any SN WHERE NOT EXISTS(999999 in_state S), S name SN, S is State',
       
  1280                                        [{'S': 'State', 'SN': 'String'}])],
       
  1281                      None, None, [self.cards, self.system], {}, [])],
       
  1282                    {'x': 999999})
       
  1283 
       
  1284     def test_not_relation_need_split(self):
       
  1285         self._test('Any SN WHERE NOT X in_state S, S name SN',
       
  1286                    [('FetchStep', [('Any SN,S WHERE S name SN, S is State',
       
  1287                                     [{'S': 'State', 'SN': 'String'}])],
       
  1288                      [self.cards, self.system], None, {'S': 'table0.C1', 'S.name': 'table0.C0', 'SN': 'table0.C0'},
       
  1289                      []),
       
  1290                     ('IntersectStep', None, None,
       
  1291                      [('OneFetchStep',
       
  1292                        [('Any SN WHERE NOT EXISTS(X in_state S, X is Note), S name SN, S is State',
       
  1293                          [{'S': 'State', 'SN': 'String', 'X': 'Note'}])],
       
  1294                        None, None, [self.cards, self.system], {},
       
  1295                        []),
       
  1296                       ('OneFetchStep',
       
  1297                        [('Any SN WHERE NOT EXISTS(X in_state S, X is IN(Affaire, CWUser)), S name SN, S is State',
       
  1298                          [{'S': 'State', 'SN': 'String', 'X': 'Affaire'},
       
  1299                           {'S': 'State', 'SN': 'String', 'X': 'CWUser'}])],
       
  1300                        None, None, [self.system], {'S': 'table0.C1', 'S.name': 'table0.C0', 'SN': 'table0.C0'},
       
  1301                        []),]
       
  1302                      )])
       
  1303 
       
  1304     def test_external_attributes_and_relation(self):
       
  1305         repo._type_source_cache[999999] = ('Note', 'cards', 999999, 'cards')
       
  1306         self._test('Any A,B,C,D WHERE A eid %(x)s,A creation_date B,A modification_date C, A todo_by D?',
       
  1307                    [('FetchStep', [('Any A,B,C WHERE A eid 999999, A creation_date B, A modification_date C, A is Note',
       
  1308                                     [{'A': 'Note', 'C': 'Datetime', 'B': 'Datetime'}])],
       
  1309                      [self.cards], None,
       
  1310                      {'A': 'table0.C0', 'A.creation_date': 'table0.C1', 'A.modification_date': 'table0.C2', 'C': 'table0.C2', 'B': 'table0.C1'}, []),
       
  1311                     #('FetchStep', [('Any D WHERE D is CWUser', [{'D': 'CWUser'}])],
       
  1312                     # [self.ldap, self.system], None, {'D': 'table1.C0'}, []),
       
  1313                     ('OneFetchStep', [('Any A,B,C,D WHERE A creation_date B, A modification_date C, A todo_by D?, A is Note, D is CWUser',
       
  1314                                        [{'A': 'Note', 'C': 'Datetime', 'B': 'Datetime', 'D': 'CWUser'}])],
       
  1315                      None, None, [self.system],
       
  1316                      {'A': 'table0.C0', 'A.creation_date': 'table0.C1', 'A.modification_date': 'table0.C2', 'C': 'table0.C2', 'B': 'table0.C1'}, [])],
       
  1317                    {'x': 999999})
       
  1318 
       
  1319 
       
  1320     def test_simplified_var_1(self):
       
  1321         ueid = self.session.user.eid
       
  1322         repo._type_source_cache[999999] = ('Note', 'cards', 999999, 'cards')
       
  1323         # need access to cards source since X table has to be accessed because of the outer join
       
  1324         self._test('Any U WHERE U in_group G, (G name IN ("managers", "logilab") OR '
       
  1325                    '(X require_permission P?, P name "bla", P require_group G)), X eid %(x)s, U eid %(u)s',
       
  1326                    [('FetchStep',
       
  1327                      [('Any 999999', [{}])], [self.cards],
       
  1328                      None, {u'%(x)s': 'table0.C0'}, []),
       
  1329                     ('OneFetchStep',
       
  1330                      [(u'Any 6 WHERE 6 in_group G, (G name IN("managers", "logilab")) OR '
       
  1331                        '(X require_permission P?, P name "bla", P require_group G), '
       
  1332                        'G is CWGroup, P is CWPermission, X is Note',
       
  1333                        [{'G': 'CWGroup', 'P': 'CWPermission', 'X': 'Note'}])],
       
  1334                      None, None, [self.system], {u'%(x)s': 'table0.C0'}, [])],
       
  1335                    {'x': 999999, 'u': ueid})
       
  1336 
       
  1337     def test_simplified_var_2(self):
       
  1338         ueid = self.session.user.eid
       
  1339         repo._type_source_cache[999999] = ('Note', 'cards', 999999, 'cards')
       
  1340         # no need access to source since X is invariant
       
  1341         self._test('Any U WHERE U in_group G, (G name IN ("managers", "logilab") OR '
       
  1342                    '(X require_permission P, P name "bla", P require_group G)), X eid %(x)s, U eid %(u)s',
       
  1343                    [('OneFetchStep', [('Any %s WHERE %s in_group G, (G name IN("managers", "logilab")) OR (999999 require_permission P, P name "bla", P require_group G)' % (ueid, ueid),
       
  1344                                        [{'G': 'CWGroup', 'P': 'CWPermission'}])],
       
  1345                      None, None, [self.system], {}, [])],
       
  1346                    {'x': 999999, 'u': ueid})
       
  1347 
       
  1348     def test_has_text(self):
       
  1349         self._test('Card X WHERE X has_text "toto"',
       
  1350                    [('OneFetchStep', [('Any X WHERE X has_text "toto", X is Card',
       
  1351                                        [{'X': 'Card'}])],
       
  1352                      None, None, [self.system], {}, [])])
       
  1353 
       
  1354     def test_has_text_3(self):
       
  1355         self._test('Any X WHERE X has_text "toto", X title "zoubidou", X is IN (Card, EmailThread)',
       
  1356                    [('FetchStep', [(u'Any X WHERE X title "zoubidou", X is Card',
       
  1357                                     [{'X': 'Card'}])],
       
  1358                      [self.cards, self.system], None, {'X': 'table0.C0'}, []),
       
  1359                     ('UnionStep', None, None, [
       
  1360                         ('OneFetchStep', [(u'Any X WHERE X has_text "toto", X is Card',
       
  1361                                            [{'X': 'Card'}])],
       
  1362                          None, None, [self.system], {'X': 'table0.C0'}, []),
       
  1363                         ('OneFetchStep', [(u'Any X WHERE X has_text "toto", X title "zoubidou", X is EmailThread',
       
  1364                                            [{'X': 'EmailThread'}])],
       
  1365                          None, None, [self.system], {}, []),
       
  1366                         ]),
       
  1367                     ])
       
  1368 
       
  1369     def test_has_text_orderby_rank(self):
       
  1370         self._test('Any X ORDERBY FTIRANK(X) WHERE X has_text "bla", X firstname "bla"',
       
  1371                    [('FetchStep', [('Any X WHERE X firstname "bla", X is CWUser', [{'X': 'CWUser'}])],
       
  1372                      [self.ldap, self.system], None, {'X': 'table0.C0'}, []),
       
  1373                     ('AggrStep', 'SELECT table1.C1 FROM table1\nORDER BY table1.C0', None, [
       
  1374                         ('FetchStep', [('Any FTIRANK(X),X WHERE X has_text "bla", X is CWUser',
       
  1375                                         [{'X': 'CWUser'}])],
       
  1376                          [self.system], {'X': 'table0.C0'}, {'FTIRANK(X)': 'table1.C0', 'X': 'table1.C1'}, []),
       
  1377                         ('FetchStep', [('Any FTIRANK(X),X WHERE X has_text "bla", X firstname "bla", X is Personne',
       
  1378                                         [{'X': 'Personne'}])],
       
  1379                          [self.system], {}, {'FTIRANK(X)': 'table1.C0', 'X': 'table1.C1'}, []),
       
  1380                         ]),
       
  1381                     ])
       
  1382 
       
  1383     def test_security_has_text_orderby_rank(self):
       
  1384         # use a guest user
       
  1385         self.session = self.user_groups_session('guests')
       
  1386         ueid = self.session.user.eid
       
  1387         self._test('Any X ORDERBY FTIRANK(X) WHERE X has_text "bla", X firstname "bla"',
       
  1388                    [('FetchStep', [('Any X WHERE X firstname "bla", X is CWUser', [{'X': 'CWUser'}])],
       
  1389                      [self.ldap, self.system], None, {'X': 'table1.C0'}, []),
       
  1390                     ('UnionFetchStep',
       
  1391                      [('FetchStep', [('Any X WHERE X firstname "bla", X is Personne', [{'X': 'Personne'}])],
       
  1392                        [self.system], {}, {'X': 'table0.C0'}, []),
       
  1393                       ('FetchStep', [('Any X WHERE EXISTS(X owned_by %s), X is CWUser' % ueid, [{'X': 'CWUser'}])],
       
  1394                        [self.system], {'X': 'table1.C0'}, {'X': 'table0.C0'}, [])]),
       
  1395                     ('OneFetchStep', [('Any X ORDERBY FTIRANK(X) WHERE X has_text "bla"',
       
  1396                                        [{'X': 'CWUser'}, {'X': 'Personne'}])],
       
  1397                      None, None, [self.system], {'X': 'table0.C0'}, []),
       
  1398                     ])
       
  1399 
       
  1400     def test_has_text_select_rank(self):
       
  1401         self._test('Any X, FTIRANK(X) WHERE X has_text "bla", X firstname "bla"',
       
  1402                    # XXX unecessary duplicate selection
       
  1403                    [('FetchStep', [('Any X,X WHERE X firstname "bla", X is CWUser', [{'X': 'CWUser'}])],
       
  1404                      [self.ldap, self.system], None, {'X': 'table0.C1'}, []),
       
  1405                     ('UnionStep', None, None, [
       
  1406                         ('OneFetchStep', [('Any X,FTIRANK(X) WHERE X has_text "bla", X is CWUser', [{'X': 'CWUser'}])],
       
  1407                          None, None, [self.system], {'X': 'table0.C1'}, []),
       
  1408                         ('OneFetchStep', [('Any X,FTIRANK(X) WHERE X has_text "bla", X firstname "bla", X is Personne', [{'X': 'Personne'}])],
       
  1409                          None, None, [self.system], {}, []),
       
  1410                         ]),
       
  1411                     ])
       
  1412 
       
  1413     def test_security_has_text_select_rank(self):
       
  1414         # use a guest user
       
  1415         self.session = self.user_groups_session('guests')
       
  1416         ueid = self.session.user.eid
       
  1417         self._test('Any X, FTIRANK(X) WHERE X has_text "bla", X firstname "bla"',
       
  1418                    [('FetchStep', [('Any X,X WHERE X firstname "bla", X is CWUser', [{'X': 'CWUser'}])],
       
  1419                      [self.ldap, self.system], None, {'X': 'table0.C1'}, []),
       
  1420                     ('UnionStep', None, None, [
       
  1421                         ('OneFetchStep', [('Any X,FTIRANK(X) WHERE X has_text "bla", EXISTS(X owned_by %s), X is CWUser' % ueid, [{'X': 'CWUser'}])],
       
  1422                          None, None, [self.system], {'X': 'table0.C1'}, []),
       
  1423                         ('OneFetchStep', [('Any X,FTIRANK(X) WHERE X has_text "bla", X firstname "bla", X is Personne', [{'X': 'Personne'}])],
       
  1424                          None, None, [self.system], {}, []),
       
  1425                         ]),
       
  1426                     ])
       
  1427 
       
  1428     def test_sort_func(self):
       
  1429         self._test('Note X ORDERBY DUMB_SORT(RF) WHERE X type RF',
       
  1430                    [('AggrStep', 'SELECT table0.C0 FROM table0\nORDER BY DUMB_SORT(table0.C1)', None, [
       
  1431                        ('FetchStep', [('Any X,RF WHERE X type RF, X is Note',
       
  1432                                        [{'X': 'Note', 'RF': 'String'}])],
       
  1433                         [self.cards, self.system], {}, {'X': 'table0.C0', 'X.type': 'table0.C1', 'RF': 'table0.C1'}, []),
       
  1434                        ])
       
  1435                     ])
       
  1436 
       
  1437     def test_ambigous_sort_func(self):
       
  1438         self._test('Any X ORDERBY DUMB_SORT(RF) WHERE X title RF, X is IN (Bookmark, Card, EmailThread)',
       
  1439                    [('AggrStep', 'SELECT table0.C0 FROM table0\nORDER BY DUMB_SORT(table0.C1)', None,
       
  1440                      [('FetchStep', [('Any X,RF WHERE X title RF, X is Card',
       
  1441                                       [{'X': 'Card', 'RF': 'String'}])],
       
  1442                        [self.cards, self.system], {},
       
  1443                        {'X': 'table0.C0', 'X.title': 'table0.C1', 'RF': 'table0.C1'}, []),
       
  1444                       ('FetchStep', [('Any X,RF WHERE X title RF, X is IN(Bookmark, EmailThread)',
       
  1445                                       [{'RF': 'String', 'X': 'Bookmark'},
       
  1446                                        {'RF': 'String', 'X': 'EmailThread'}])],
       
  1447                        [self.system], {},
       
  1448                        {'X': 'table0.C0', 'X.title': 'table0.C1', 'RF': 'table0.C1'}, []),
       
  1449                       ]),
       
  1450                    ])
       
  1451 
       
  1452     def test_attr_unification_1(self):
       
  1453         self._test('Any X,Y WHERE X is Bookmark, Y is Card, X title T, Y title T',
       
  1454                    [('FetchStep',
       
  1455                      [('Any Y,T WHERE Y title T, Y is Card', [{'T': 'String', 'Y': 'Card'}])],
       
  1456                      [self.cards, self.system], None,
       
  1457                      {'T': 'table0.C1', 'Y': 'table0.C0', 'Y.title': 'table0.C1'}, []),
       
  1458                     ('OneFetchStep',
       
  1459                      [('Any X,Y WHERE X title T, Y title T, X is Bookmark, Y is Card',
       
  1460                        [{'T': 'String', 'X': 'Bookmark', 'Y': 'Card'}])],
       
  1461                      None, None, [self.system],
       
  1462                      {'T': 'table0.C1', 'Y': 'table0.C0', 'Y.title': 'table0.C1'}, [])
       
  1463                     ])
       
  1464 
       
  1465     def test_attr_unification_2(self):
       
  1466         self._test('Any X,Y WHERE X is Note, Y is Card, X type T, Y title T',
       
  1467                    [('FetchStep',
       
  1468                      [('Any X,T WHERE X type T, X is Note', [{'T': 'String', 'X': 'Note'}])],
       
  1469                      [self.cards, self.system], None,
       
  1470                      {'T': 'table0.C1', 'X': 'table0.C0', 'X.type': 'table0.C1'}, []),
       
  1471                     ('FetchStep',
       
  1472                      [('Any Y,T WHERE Y title T, Y is Card', [{'T': 'String', 'Y': 'Card'}])],
       
  1473                      [self.cards, self.system], None,
       
  1474                      {'T': 'table1.C1', 'Y': 'table1.C0', 'Y.title': 'table1.C1'}, []),
       
  1475                     ('OneFetchStep',
       
  1476                      [('Any X,Y WHERE X type T, Y title T, X is Note, Y is Card',
       
  1477                        [{'T': 'String', 'X': 'Note', 'Y': 'Card'}])],
       
  1478                      None, None, [self.system],
       
  1479                      {'T': 'table1.C1',
       
  1480                       'X': 'table0.C0', 'X.type': 'table0.C1',
       
  1481                       'Y': 'table1.C0', 'Y.title': 'table1.C1'}, [])
       
  1482                     ])
       
  1483 
       
  1484     def test_attr_unification_neq_1(self):
       
  1485         self._test('Any X,Y WHERE X is Bookmark, Y is Card, X creation_date D, Y creation_date > D',
       
  1486                    [('FetchStep',
       
  1487                      [('Any Y,D WHERE Y creation_date D, Y is Card',
       
  1488                        [{'D': 'Datetime', 'Y': 'Card'}])],
       
  1489                      [self.cards,self.system], None,
       
  1490                      {'D': 'table0.C1', 'Y': 'table0.C0', 'Y.creation_date': 'table0.C1'}, []),
       
  1491                     ('OneFetchStep',
       
  1492                      [('Any X,Y WHERE X creation_date D, Y creation_date > D, X is Bookmark, Y is Card',
       
  1493                        [{'D': 'Datetime', 'X': 'Bookmark', 'Y': 'Card'}])], None, None,
       
  1494                      [self.system],
       
  1495                      {'D': 'table0.C1', 'Y': 'table0.C0', 'Y.creation_date': 'table0.C1'}, [])
       
  1496                    ])
       
  1497 
       
  1498     def test_subquery_1(self):
       
  1499         ueid = self.session.user.eid
       
  1500         self._test('DISTINCT Any B,C ORDERBY C WHERE A created_by B, B login C, EXISTS(B owned_by D), D eid %(E)s '
       
  1501                    'WITH A,N BEING ((Any X,N WHERE X is Tag, X name N) UNION (Any X,T WHERE X is Bookmark, X title T))',
       
  1502                    [('FetchStep', [('Any X,N WHERE X is Tag, X name N', [{'N': 'String', 'X': 'Tag'}]),
       
  1503                                    ('Any X,T WHERE X is Bookmark, X title T',
       
  1504                                     [{'T': 'String', 'X': 'Bookmark'}])],
       
  1505                      [self.system], {}, {'N': 'table0.C1', 'X': 'table0.C0', 'X.name': 'table0.C1'}, []),
       
  1506                     ('FetchStep',
       
  1507                      [('Any B,C WHERE B login C, B is CWUser', [{'B': 'CWUser', 'C': 'String'}])],
       
  1508                      [self.ldap, self.system], None, {'B': 'table1.C0', 'B.login': 'table1.C1', 'C': 'table1.C1'}, []),
       
  1509                     ('OneFetchStep', [('DISTINCT Any B,C ORDERBY C WHERE A created_by B, B login C, EXISTS(B owned_by %s), B is CWUser, A is IN(Bookmark, Tag)' % ueid,
       
  1510                                        [{'A': 'Bookmark', 'B': 'CWUser', 'C': 'String'},
       
  1511                                         {'A': 'Tag', 'B': 'CWUser', 'C': 'String'}])],
       
  1512                      None, None, [self.system],
       
  1513                      {'A': 'table0.C0',
       
  1514                       'B': 'table1.C0', 'B.login': 'table1.C1',
       
  1515                       'C': 'table1.C1',
       
  1516                       'N': 'table0.C1'},
       
  1517                      [])],
       
  1518                    {'E': ueid})
       
  1519 
       
  1520     def test_subquery_2(self):
       
  1521         ueid = self.session.user.eid
       
  1522         self._test('DISTINCT Any B,C ORDERBY C WHERE A created_by B, B login C, EXISTS(B owned_by D), D eid %(E)s '
       
  1523                    'WITH A,N BEING ((Any X,N WHERE X is Tag, X name N) UNION (Any X,T WHERE X is Card, X title T))',
       
  1524                    [('UnionFetchStep',
       
  1525                      [('FetchStep', [('Any X,N WHERE X is Tag, X name N', [{'N': 'String', 'X': 'Tag'}])],
       
  1526                        [self.system], {},
       
  1527                        {'N': 'table0.C1',
       
  1528                         'T': 'table0.C1',
       
  1529                         'X': 'table0.C0',
       
  1530                         'X.name': 'table0.C1',
       
  1531                         'X.title': 'table0.C1'}, []),
       
  1532                       ('FetchStep', [('Any X,T WHERE X is Card, X title T',
       
  1533                                       [{'T': 'String', 'X': 'Card'}])],
       
  1534                        [self.cards, self.system], {},
       
  1535                        {'N': 'table0.C1',
       
  1536                         'T': 'table0.C1',
       
  1537                         'X': 'table0.C0',
       
  1538                         'X.name': 'table0.C1',
       
  1539                         'X.title': 'table0.C1'}, []),
       
  1540                       ]),
       
  1541                     ('FetchStep',
       
  1542                      [('Any B,C WHERE B login C, B is CWUser', [{'B': 'CWUser', 'C': 'String'}])],
       
  1543                      [self.ldap, self.system], None, {'B': 'table1.C0', 'B.login': 'table1.C1', 'C': 'table1.C1'}, []),
       
  1544                     ('OneFetchStep', [('DISTINCT Any B,C ORDERBY C WHERE A created_by B, B login C, EXISTS(B owned_by %s), B is CWUser, A is IN(Card, Tag)' % ueid,
       
  1545                                        [{'A': 'Card', 'B': 'CWUser', 'C': 'String'},
       
  1546                                         {'A': 'Tag', 'B': 'CWUser', 'C': 'String'}])],
       
  1547                      None, None, [self.system],
       
  1548                      {'A': 'table0.C0',
       
  1549                       'B': 'table1.C0', 'B.login': 'table1.C1',
       
  1550                       'C': 'table1.C1',
       
  1551                       'N': 'table0.C1'},
       
  1552                      [])],
       
  1553                    {'E': ueid})
       
  1554 
       
  1555     def test_eid_dont_cross_relation_1(self):
       
  1556         repo._type_source_cache[999999] = ('Personne', 'system', 999999, 'system')
       
  1557         self._test('Any Y,YT WHERE X eid %(x)s, X fiche Y, Y title YT',
       
  1558                    [('OneFetchStep', [('Any Y,YT WHERE X eid 999999, X fiche Y, Y title YT',
       
  1559                                        [{'X': 'Personne', 'Y': 'Card', 'YT': 'String'}])],
       
  1560                      None, None, [self.system], {}, [])],
       
  1561                    {'x': 999999})
       
  1562 
       
  1563     def test_eid_dont_cross_relation_2(self):
       
  1564         repo._type_source_cache[999999] = ('Note', 'cards', 999999, 'cards')
       
  1565         self.cards.dont_cross_relations.add('concerne')
       
  1566         try:
       
  1567             self._test('Any Y,S,YT,X WHERE Y concerne X, Y in_state S, X eid 999999, Y ref YT',
       
  1568                    [('OneFetchStep', [('Any Y,S,YT,999999 WHERE Y concerne 999999, Y in_state S, Y ref YT',
       
  1569                                        [{'Y': 'Affaire', 'YT': 'String', 'S': 'State'}])],
       
  1570                      None, None, [self.system], {}, [])],
       
  1571                    {'x': 999999})
       
  1572         finally:
       
  1573             self.cards.dont_cross_relations.remove('concerne')
       
  1574 
       
  1575 
       
  1576     # external source w/ .cross_relations == ['multisource_crossed_rel'] ######
       
  1577 
       
  1578     def test_crossed_relation_eid_1_invariant(self):
       
  1579         repo._type_source_cache[999999] = ('Note', 'system', 999999, 'system')
       
  1580         self._test('Any Y WHERE X eid %(x)s, X multisource_crossed_rel Y',
       
  1581                    [('OneFetchStep', [('Any Y WHERE 999999 multisource_crossed_rel Y', [{u'Y': 'Note'}])],
       
  1582                       None, None, [self.system], {}, [])
       
  1583                     ],
       
  1584                    {'x': 999999,})
       
  1585 
       
  1586     def test_crossed_relation_eid_1_needattr(self):
       
  1587         repo._type_source_cache[999999] = ('Note', 'system', 999999, 'system')
       
  1588         self._test('Any Y,T WHERE X eid %(x)s, X multisource_crossed_rel Y, Y type T',
       
  1589                    [('FetchStep', [('Any Y,T WHERE Y type T, Y is Note', [{'T': 'String', 'Y': 'Note'}])],
       
  1590                      [self.cards, self.system], None,
       
  1591                      {'T': 'table0.C1', 'Y': 'table0.C0', 'Y.type': 'table0.C1'}, []),
       
  1592                     ('OneFetchStep', [('Any Y,T WHERE 999999 multisource_crossed_rel Y, Y type T, Y is Note',
       
  1593                                        [{'T': 'String', 'Y': 'Note'}])],
       
  1594                      None, None, [self.system],
       
  1595                      {'T': 'table0.C1', 'Y': 'table0.C0', 'Y.type': 'table0.C1'}, []),
       
  1596                     ],
       
  1597                    {'x': 999999,})
       
  1598 
       
  1599     def test_crossed_relation_eid_2_invariant(self):
       
  1600         repo._type_source_cache[999999] = ('Note', 'cards', 999999, 'cards')
       
  1601         self._test('Any Y WHERE X eid %(x)s, X multisource_crossed_rel Y',
       
  1602                    [('OneFetchStep', [('Any Y WHERE 999999 multisource_crossed_rel Y, Y is Note', [{'Y': 'Note'}])],
       
  1603                       None, None, [self.cards, self.system], {}, [])
       
  1604                     ],
       
  1605                    {'x': 999999,})
       
  1606 
       
  1607     def test_crossed_relation_eid_2_needattr(self):
       
  1608         repo._type_source_cache[999999] = ('Note', 'cards', 999999, 'cards')
       
  1609         self._test('Any Y,T WHERE X eid %(x)s, X multisource_crossed_rel Y, Y type T',
       
  1610                    [('OneFetchStep', [('Any Y,T WHERE 999999 multisource_crossed_rel Y, Y type T, Y is Note',
       
  1611                                        [{'T': 'String', 'Y': 'Note'}])],
       
  1612                      None, None, [self.cards, self.system], {},
       
  1613                      []),
       
  1614                     ],
       
  1615                    {'x': 999999,})
       
  1616 
       
  1617     def test_crossed_relation_eid_not_1(self):
       
  1618         repo._type_source_cache[999999] = ('Note', 'system', 999999, 'system')
       
  1619         self._test('Any Y WHERE X eid %(x)s, NOT X multisource_crossed_rel Y',
       
  1620                    [('FetchStep', [('Any Y WHERE Y is Note', [{'Y': 'Note'}])],
       
  1621                      [self.cards, self.system], None, {'Y': 'table0.C0'}, []),
       
  1622                     ('OneFetchStep', [('Any Y WHERE NOT EXISTS(999999 multisource_crossed_rel Y), Y is Note',
       
  1623                                        [{'Y': 'Note'}])],
       
  1624                      None, None, [self.system],
       
  1625                      {'Y': 'table0.C0'},  [])],
       
  1626                    {'x': 999999,})
       
  1627 
       
  1628 #     def test_crossed_relation_eid_not_2(self):
       
  1629 #         repo._type_source_cache[999999] = ('Note', 'cards', 999999)
       
  1630 #         self._test('Any Y WHERE X eid %(x)s, NOT X multisource_crossed_rel Y',
       
  1631 #                    [],
       
  1632 #                    {'x': 999999,})
       
  1633 
       
  1634     def test_crossed_relation_base_XXXFIXME(self):
       
  1635         repo._type_source_cache[999999] = ('Note', 'system', 999999, 'system')
       
  1636         self._test('Any X,Y,T WHERE X multisource_crossed_rel Y, Y type T, X type T',
       
  1637                    [('FetchStep', [('Any X,T WHERE X type T, X is Note', [{'T': 'String', 'X': 'Note'}])],
       
  1638                      [self.cards, self.system], None,
       
  1639                      {'T': 'table0.C1', 'X': 'table0.C0', 'X.type': 'table0.C1'}, []),
       
  1640                     ('FetchStep',  [('Any Y,T WHERE Y type T, Y is Note', [{'T': 'String', 'Y': 'Note'}])],
       
  1641                      [self.cards, self.system], None,
       
  1642                      {'T': 'table1.C1', 'Y': 'table1.C0', 'Y.type': 'table1.C1'},  []),
       
  1643                     ('FetchStep', [('Any X,Y WHERE X multisource_crossed_rel Y, X is Note, Y is Note',
       
  1644                                     [{'X': 'Note', 'Y': 'Note'}])],
       
  1645                      [self.cards, self.system], None,
       
  1646                      {'X': 'table2.C0', 'Y': 'table2.C1'},
       
  1647                      []),
       
  1648                     ('OneFetchStep', [('Any X,Y,T WHERE X multisource_crossed_rel Y, Y type T, X type T, '
       
  1649                                        'X is Note, Y is Note, Y identity A, X identity B, A is Note, B is Note',
       
  1650                                        [{u'A': 'Note', u'B': 'Note', 'T': 'String', 'X': 'Note', 'Y': 'Note'}])],
       
  1651                      None, None,
       
  1652                      [self.system],
       
  1653                      {'A': 'table1.C0',
       
  1654                       'B': 'table0.C0',
       
  1655                       'T': 'table1.C1',
       
  1656                       'X': 'table2.C0',
       
  1657                       'X.type': 'table0.C1',
       
  1658                       'Y': 'table2.C1',
       
  1659                       'Y.type': 'table1.C1'},
       
  1660                      []),
       
  1661                     ],
       
  1662                     {'x': 999999,})
       
  1663 
       
  1664     def test_crossed_relation_noeid_needattr(self):
       
  1665         # http://www.cubicweb.org/ticket/1382452
       
  1666         self._test('DISTINCT Any DEP WHERE DEP is Note, P type "cubicweb-foo", P multisource_crossed_rel DEP, DEP type LIKE "cubicweb%"',
       
  1667                    [('FetchStep', [(u'Any DEP WHERE DEP type LIKE "cubicweb%", DEP is Note',
       
  1668                                     [{'DEP': 'Note'}])],
       
  1669                      [self.cards, self.system], None,
       
  1670                      {'DEP': 'table0.C0'},
       
  1671                      []),
       
  1672                     ('FetchStep', [(u'Any P WHERE P type "cubicweb-foo", P is Note', [{'P': 'Note'}])],
       
  1673                      [self.cards, self.system], None, {'P': 'table1.C0'},
       
  1674                      []),
       
  1675                     ('FetchStep', [('Any DEP,P WHERE P multisource_crossed_rel DEP, DEP is Note, P is Note',
       
  1676                                     [{'DEP': 'Note', 'P': 'Note'}])],
       
  1677                      [self.cards, self.system], None, {'DEP': 'table2.C0', 'P': 'table2.C1'},
       
  1678                      []),
       
  1679                     ('OneFetchStep',
       
  1680                      [('DISTINCT Any DEP WHERE P multisource_crossed_rel DEP, DEP is Note, '
       
  1681                        'P is Note, DEP identity A, P identity B, A is Note, B is Note',
       
  1682                        [{u'A': 'Note', u'B': 'Note', 'DEP': 'Note', 'P': 'Note'}])],
       
  1683                      None, None, [self.system],
       
  1684                      {'A': 'table0.C0', 'B': 'table1.C0', 'DEP': 'table2.C0', 'P': 'table2.C1'},
       
  1685                      [])])
       
  1686 
       
  1687     def test_crossed_relation_noeid_invariant(self):
               # Temporarily adds a ``Note multisource_crossed_rel Affaire`` relation
               # definition to the schema (removed again in ``finally``), then checks
               # the plan: DEP and P are pre-fetched from [cards, system], and a
               # UnionStep combines three OneFetchSteps (cards; system with both
               # tables; system for the Affaire solution using only P's table).
       
  1688         # see comment in http://www.cubicweb.org/ticket/1382452
       
  1689         self.schema.add_relation_def(
       
  1690             RelationDefinition(subject='Note', name='multisource_crossed_rel', object='Affaire'))
       
  1691         self.repo.set_schema(self.schema)
       
  1692         try:
       
  1693             self._test('DISTINCT Any P,DEP WHERE P type "cubicweb-foo", P multisource_crossed_rel DEP',
       
  1694                        [('FetchStep',
       
  1695                          [('Any DEP WHERE DEP is Note', [{'DEP': 'Note'}])],
       
  1696                          [self.cards, self.system], None, {'DEP': 'table0.C0'}, []),
       
  1697                         ('FetchStep',
       
  1698                          [(u'Any P WHERE P type "cubicweb-foo", P is Note', [{'P': 'Note'}])],
       
  1699                          [self.cards, self.system], None, {'P': 'table1.C0'}, []),
       
  1700                         ('UnionStep', None, None,
       
  1701                          [('OneFetchStep',
       
  1702                            [('DISTINCT Any P,DEP WHERE P multisource_crossed_rel DEP, DEP is Note, P is Note',
       
  1703                              [{'DEP': 'Note', 'P': 'Note'}])],
       
  1704                            None, None, [self.cards], None, []),
       
  1705                           ('OneFetchStep',
       
  1706                            [('DISTINCT Any P,DEP WHERE P multisource_crossed_rel DEP, DEP is Note, P is Note',
       
  1707                              [{'DEP': 'Note', 'P': 'Note'}])],
       
  1708                            None, None, [self.system],
       
  1709                            {'DEP': 'table0.C0', 'P': 'table1.C0'},
       
  1710                            []),
       
  1711                           ('OneFetchStep',
       
  1712                            [('DISTINCT Any P,DEP WHERE P multisource_crossed_rel DEP, DEP is Affaire, P is Note',
       
  1713                              [{'DEP': 'Affaire', 'P': 'Note'}])],
       
  1714                            None, None, [self.system], {'P': 'table1.C0'},
       
  1715                            [])])
       
  1716                         ])
       
  1717         finally:
                   # restore the schema so later tests do not see the extra definition
       
  1718             self.schema.del_relation_def('Note', 'multisource_crossed_rel', 'Affaire')
       
  1719             self.repo.set_schema(self.schema)
       
  1720 
       
  1721     # edition queries tests ###################################################
       
  1722 
       
  1723     def test_insert_simplified_var_1(self):
               # Seeds the type/source cache for eids 999999 ('Note', 'cards', ...)
               # and 999998 ('State', 'system', ...), then expects the INSERT to be
               # planned as InsertStep > InsertRelationsStep > one OneFetchStep on
               # the cards source that reads N's ``type``.
               # NOTE(review): uses a bare ``repo`` name while some sibling tests
               # use ``self.repo`` -- confirm both designate the same repository.
       
  1724         repo._type_source_cache[999999] = ('Note', 'cards', 999999, 'cards')
       
  1725         repo._type_source_cache[999998] = ('State', 'system', None, 'system')
       
  1726         self._test('INSERT Note X: X in_state S, X type T WHERE S eid %(s)s, N eid %(n)s, N type T',
       
  1727                    [('InsertStep',
       
  1728                      [('InsertRelationsStep',
       
  1729                        [('OneFetchStep', [('Any T WHERE N eid 999999, N type T, N is Note',
       
  1730                                            [{'N': 'Note', 'T': 'String'}])],
       
  1731                         None, None, [self.cards], {}, [])])
       
  1732                       ])
       
  1733                     ],
       
  1734                    {'n': 999999, 's': 999998})
       
  1735 
       
  1736     def test_insert_simplified_var_2(self):
               # Same cache seeding as test_insert_simplified_var_1, with an extra
               # ``X migrated_from N`` relation in the INSERT; the expected plan is
               # still a single OneFetchStep on the cards source for N's ``type``.
               # NOTE(review): uses a bare ``repo`` name while some sibling tests
               # use ``self.repo`` -- confirm both designate the same repository.
       
  1737         repo._type_source_cache[999999] = ('Note', 'cards', 999999, 'cards')
       
  1738         repo._type_source_cache[999998] = ('State', 'system', None, 'system')
       
  1739         self._test('INSERT Note X: X in_state S, X type T, X migrated_from N WHERE S eid %(s)s, N eid %(n)s, N type T',
       
  1740                    [('InsertStep',
       
  1741                      [('InsertRelationsStep',
       
  1742                        [('OneFetchStep', [('Any T WHERE N eid 999999, N type T, N is Note',
       
  1743                                            [{'N': 'Note', 'T': 'String'}])],
       
  1744                          None, None, [self.cards], {}, [])
       
  1745                         ])
       
  1746                       ])
       
  1747                     ],
       
  1748                    {'n': 999999, 's': 999998})
       
  1749 
       
  1750     def test_insert_simplified_var_3(self):
               # Variant of test_insert_simplified_var_1 where the State eid 999998
               # is also seeded as ('State', 'cards', ...); the expected plan is
               # unchanged: one OneFetchStep on the cards source.
               # NOTE(review): uses a bare ``repo`` name while some sibling tests
               # use ``self.repo`` -- confirm both designate the same repository.
       
  1751         repo._type_source_cache[999999] = ('Note', 'cards', 999999, 'cards')
       
  1752         repo._type_source_cache[999998] = ('State', 'cards', 999998, 'cards')
       
  1753         self._test('INSERT Note X: X in_state S, X type T WHERE S eid %(s)s, N eid %(n)s, N type T',
       
  1754                    [('InsertStep',
       
  1755                      [('InsertRelationsStep',
       
  1756                        [('OneFetchStep', [('Any T WHERE N eid 999999, N type T, N is Note',
       
  1757                                            [{'N': 'Note', 'T': 'String'}])],
       
  1758                          None, None, [self.cards], {}, [])]
       
  1759                        )]
       
  1760                      )],
       
  1761                    {'n': 999999, 's': 999998})
       
  1762 
       
  1763     def test_insert_simplified_var_4(self):
               # Every variable of the WHERE clause is fixed by an eid and ``type``
               # is a literal, so the expected InsertRelationsStep has no child
               # fetch step at all (empty list).
               # NOTE(review): uses a bare ``repo`` name while some sibling tests
               # use ``self.repo`` -- confirm both designate the same repository.
       
  1764         repo._type_source_cache[999999] = ('Note', 'cards', 999999, 'cards')
       
  1765         repo._type_source_cache[999998] = ('State', 'system', None, 'system')
       
  1766         self._test('INSERT Note X: X in_state S, X type "bla", X migrated_from N WHERE S eid %(s)s, N eid %(n)s',
       
  1767                    [('InsertStep',
       
  1768                       [('InsertRelationsStep', [])]
       
  1769                      )],
       
  1770                    {'n': 999999, 's': 999998})
       
  1771 
       
  1772     def test_insert_simplified_var_5(self):
               # Like test_insert_simplified_var_4 plus an ``A concerne N``
               # restriction: the expected plan fetches A with one OneFetchStep on
               # the system source, N being replaced by its eid in the RQL.
               # NOTE(review): uses a bare ``repo`` name while some sibling tests
               # use ``self.repo`` -- confirm both designate the same repository.
       
  1773         repo._type_source_cache[999999] = ('Note', 'cards', 999999, 'cards')
       
  1774         repo._type_source_cache[999998] = ('State', 'system', None, 'system')
       
  1775         self._test('INSERT Note X: X in_state S, X type "bla", X migrated_from N WHERE S eid %(s)s, N eid %(n)s, A concerne N',
       
  1776                    [('InsertStep',
       
  1777                      [('InsertRelationsStep',
       
  1778                        [('OneFetchStep',
       
  1779                          [('Any A WHERE A concerne 999999, A is Affaire',
       
  1780                            [{'A': 'Affaire'}])],
       
  1781                          None, None, [self.system], {}, []),
       
  1782                         ]),
       
  1783                       ])
       
  1784                     ],
       
  1785                    {'n': 999999, 's': 999998})
       
  1786 
       
  1787     def test_delete_relation1(self):
               # DELETE of ``created_by`` with X fixed by eid and Y excluded by eid:
               # expects a DeleteRelationsStep wrapping one OneFetchStep on the
               # system source. The session user's eid is used for both %(x)s and
               # %(y)s and is interpolated into the expected RQL.
       
  1788         ueid = self.session.user.eid
       
  1789         self._test('DELETE X created_by Y WHERE X eid %(x)s, NOT Y eid %(y)s',
       
  1790                    [('DeleteRelationsStep', [
       
  1791                        ('OneFetchStep', [('Any %s,Y WHERE %s created_by Y, NOT Y eid %s, Y is CWUser' % (ueid, ueid, ueid),
       
  1792                                           [{'Y': 'CWUser'}])],
       
  1793                         None, None, [self.system], {}, []),
       
  1794                        ]),
       
  1795                     ],
       
  1796                    {'x': ueid, 'y': ueid})
       
  1797 
       
  1798     def test_delete_relation2(self):
               # Same DELETE as test_delete_relation1 but Y is excluded by login:
               # Y is first fetched from [ldap, system] into table0, then the
               # DeleteRelationsStep's OneFetchStep on the system source reads Y
               # from that table.
               # NOTE(review): 'y' is passed in the arguments although the query
               # contains no %(y)s placeholder.
       
  1799         ueid = self.session.user.eid
       
  1800         self._test('DELETE X created_by Y WHERE X eid %(x)s, NOT Y login "syt"',
       
  1801                    [('FetchStep', [('Any Y WHERE NOT Y login "syt", Y is CWUser', [{'Y': 'CWUser'}])],
       
  1802                      [self.ldap, self.system], None, {'Y': 'table0.C0'}, []),
       
  1803                     ('DeleteRelationsStep', [
       
  1804                         ('OneFetchStep', [('Any %s,Y WHERE %s created_by Y, Y is CWUser'%(ueid,ueid), [{'Y': 'CWUser'}])],
       
  1805                          None, None, [self.system], {'Y': 'table0.C0'}, []),
       
  1806                         ]),
       
  1807                     ],
       
  1808                    {'x': ueid, 'y': ueid})
       
  1809 
       
  1810     def test_delete_relation3(self):
               # Expects BadRQLQuery when planning a DELETE of
               # ``Y multisource_inlined_rel X`` restricted by
               # NOT (Y cw_source S, S name 'cards'), X being eid 999999 seeded in
               # the cache as ('Note', 'cards', ...). The empty list is the unused
               # "expected plan" argument of ``_test``.
               # NOTE(review): uses a bare ``repo`` name while some sibling tests
               # use ``self.repo`` -- confirm both designate the same repository.
       
  1811         repo._type_source_cache[999999] = ('Note', 'cards', 999999, 'cards')
       
  1812         self.assertRaises(
       
  1813             BadRQLQuery, self._test,
       
  1814             'DELETE Y multisource_inlined_rel X WHERE X eid %(x)s, '
       
  1815             'NOT (Y cw_source S, S name %(source)s)', [],
       
  1816             {'x': 999999, 'source': 'cards'})
       
  1817 
       
  1818     def test_delete_relation4(self):
               # Mirror of test_delete_relation3 with the relation reversed
               # (``X multisource_inlined_rel Y``) and Y typed as Note: planning
               # is also expected to raise BadRQLQuery.
               # NOTE(review): uses a bare ``repo`` name while some sibling tests
               # use ``self.repo`` -- confirm both designate the same repository.
       
  1819         repo._type_source_cache[999999] = ('Note', 'cards', 999999, 'cards')
       
  1820         self.assertRaises(
       
  1821             BadRQLQuery, self._test,
       
  1822             'DELETE X multisource_inlined_rel Y WHERE Y is Note, X eid %(x)s, '
       
  1823             'NOT (Y cw_source S, S name %(source)s)', [],
       
  1824             {'x': 999999, 'source': 'cards'})
       
  1825 
       
  1826     def test_delete_entity1(self):
               # DELETE of Note 999999 (seeded as ('Note', 'system', ...)) unless
               # some Y is linked to it through multisource_rel: expects
               # DeleteEntitiesStep > one OneFetchStep on the system source, the
               # NOT being rendered as NOT EXISTS with Y solutions Card and Note.
               # NOTE(review): uses a bare ``repo`` name while some sibling tests
               # use ``self.repo`` -- confirm both designate the same repository.
       
  1827         repo._type_source_cache[999999] = ('Note', 'system', 999999, 'system')
       
  1828         self._test('DELETE Note X WHERE X eid %(x)s, NOT Y multisource_rel X',
       
  1829                    [('DeleteEntitiesStep',
       
  1830                      [('OneFetchStep', [('Any 999999 WHERE NOT EXISTS(Y multisource_rel 999999), Y is IN(Card, Note)',
       
  1831                                          [{'Y': 'Card'}, {'Y': 'Note'}])],
       
  1832                        None, None, [self.system], {}, [])
       
  1833                       ])
       
  1834                     ],
       
  1835                    {'x': 999999})
       
  1836 
       
  1837     def test_delete_entity2(self):
               # DELETE of Note 999999 (seeded as ('Note', 'system', ...)) when it
               # has no multisource_inlined_rel: expects DeleteEntitiesStep > one
               # OneFetchStep on the system source; here X and its eid restriction
               # are kept in the expected RQL, with Y solutions Affaire and Note.
               # NOTE(review): uses a bare ``repo`` name while some sibling tests
               # use ``self.repo`` -- confirm both designate the same repository.
       
  1838         repo._type_source_cache[999999] = ('Note', 'system', 999999, 'system')
       
  1839         self._test('DELETE Note X WHERE X eid %(x)s, NOT X multisource_inlined_rel Y',
       
  1840                    [('DeleteEntitiesStep',
       
  1841                      [('OneFetchStep', [('Any X WHERE X eid 999999, NOT X multisource_inlined_rel Y, X is Note, Y is IN(Affaire, Note)',
       
  1842                                          [{'X': 'Note', 'Y': 'Affaire'}, {'X': 'Note', 'Y': 'Note'}])],
       
  1843                        None, None, [self.system], {}, [])
       
  1844                       ])
       
  1845                     ],
       
  1846                    {'x': 999999})
       
  1847 
       
  1848     def test_update(self):
               # SET between two users selected by login: each login lookup is a
               # FetchStep on [ldap, system] (table0 for X, table1 for Y); the
               # UpdateStep's OneFetchStep on the system source then reads X and Y
               # from those two tables.
       
  1849         self._test('SET X copain Y WHERE X login "comme", Y login "cochon"',
       
  1850                    [('FetchStep',
       
  1851                      [('Any X WHERE X login "comme", X is CWUser', [{'X': 'CWUser'}])],
       
  1852                      [self.ldap, self.system], None, {'X': 'table0.C0'}, []),
       
  1853                     ('FetchStep',
       
  1854                      [('Any Y WHERE Y login "cochon", Y is CWUser', [{'Y': 'CWUser'}])],
       
  1855                      [self.ldap, self.system], None, {'Y': 'table1.C0'}, []),
       
  1856                     ('UpdateStep',
       
  1857                      [('OneFetchStep',
       
  1858                        [('DISTINCT Any X,Y WHERE X is CWUser, Y is CWUser',
       
  1859                          [{'X': 'CWUser', 'Y': 'CWUser'}])],
       
  1860                        None, None, [self.system], {'X': 'table0.C0', 'Y': 'table1.C0'}, [])
       
  1861                       ])
       
  1862                     ])
       
  1863 
       
  1864     def test_update2(self):
               # SET U in_group G: U is fetched by login from [ldap, system] into
               # table0, while the group name filter stays in the UpdateStep's
               # OneFetchStep on the system source. Note that the query's ``~=``
               # operator shows up as ``ILIKE`` in the expected RQL.
       
  1865         self._test('SET U in_group G WHERE G name ~= "bougloup%", U login "admin"',
       
  1866                    [('FetchStep', [('Any U WHERE U login "admin", U is CWUser', [{'U': 'CWUser'}])],
       
  1867                      [self.ldap, self.system], None, {'U': 'table0.C0'}, []),
       
  1868                      ('UpdateStep', [
       
  1869                         ('OneFetchStep', [('DISTINCT Any U,G WHERE G name ILIKE "bougloup%", G is CWGroup, U is CWUser',
       
  1870                                            [{'U': 'CWUser', 'G': 'CWGroup'}])],
       
  1871                          None, None, [self.system], {'U': 'table0.C0'}, []),
       
  1872                         ]),
       
  1873                     ])
       
  1874 
       
  1875     def test_update3(self):
               # %(x)s is the eid of the user of the session returned by
               # ``user_groups_session('guests')``; the expected plan is an
               # UpdateStep with one OneFetchStep on the system source only.
       
  1876         anoneid = self.user_groups_session('guests').user.eid
       
  1877         # since we are adding a in_state relation for an entity in the system
       
  1878         # source, states should only be searched in the system source as well
       
  1879         self._test('SET X in_state S WHERE X eid %(x)s, S name "deactivated"',
       
  1880                    [('UpdateStep', [
       
  1881                        ('OneFetchStep', [('DISTINCT Any S WHERE S name "deactivated", S is State',
       
  1882                                           [{'S': 'State'}])],
       
  1883                         None, None, [self.system], {}, []),
       
  1884                        ]),
       
  1885                     ],
       
  1886                    {'x': anoneid})
       
  1887 
       
  1888 #     def test_update4(self):
       
  1889 #         # since we are adding a in_state relation with a state from the system
       
  1890 #         # source, CWUser should be searched only in the system source as well
       
  1891 #         rset = self.execute('State X WHERE X name "activated"')
       
  1892 #         assert len(rset) == 1, rset
       
  1893 #         activatedeid = rset[0][0]
       
  1894 #         self._test('SET X in_state S WHERE X is CWUser, S eid %s' % activatedeid,
       
  1895 #                    [('UpdateStep', [
       
  1896 #                        ('OneFetchStep', [('DISTINCT Any X,%s WHERE X is CWUser' % activatedeid,
       
  1897 #                                           [{'X': 'CWUser'}])],
       
  1898 #                         None, None, [self.system], {}, []),
       
  1899 #                        ]),
       
  1900 #                     ])
       
  1901 
       
  1902     def test_ldap_user_related_to_invariant_and_dont_cross_rel(self):
               # Adds 'created_by' to the cards source's ``dont_cross_relations``
               # for the duration of the test (removed in ``finally``). With E
               # fixed to eid 999999, seeded as ('Note', 'cards', ...), the plan
               # is expected to fetch users and logins from [ldap, system] into
               # table0, then resolve ``created_by`` with a OneFetchStep on the
               # system source reading from that table.
       
  1903         self.repo._type_source_cache[999999] = ('Note', 'cards', 999999, 'cards')
       
  1904         self.cards.dont_cross_relations.add('created_by')
       
  1905         try:
       
  1906             self._test('Any X,XL WHERE E eid %(x)s, E created_by X, X login XL',
       
  1907                    [('FetchStep', [('Any X,XL WHERE X login XL, X is CWUser',
       
  1908                                     [{'X': 'CWUser', 'XL': 'String'}])],
       
  1909                      [self.ldap, self.system], None,
       
  1910                      {'X': 'table0.C0', 'X.login': 'table0.C1', 'XL': 'table0.C1'},
       
  1911                      []),
       
  1912                     ('OneFetchStep',
       
  1913                      [('Any X,XL WHERE 999999 created_by X, X login XL, X is CWUser',
       
  1914                        [{'X': 'CWUser', 'XL': 'String'}])],
       
  1915                      None, None,
       
  1916                      [self.system],
       
  1917                      {'X': 'table0.C0', 'X.login': 'table0.C1', 'XL': 'table0.C1'},
       
  1918                      [])],
       
  1919                        {'x': 999999})
       
  1920         finally:
                   # undo the source configuration change made above
       
  1921             self.cards.dont_cross_relations.remove('created_by')
       
  1922 
       
  1923     def test_ambigous_cross_relation(self):
               # Temporarily declares ``see_also`` as supported and crossed by the
               # cards source (both undone in ``finally``). For E fixed to eid
               # 999999, seeded as ('Note', 'cards', ...), expects an AggrStep
               # ordering table0, fed by two FetchSteps: the Note solution on
               # [cards, system] and the Bookmark solution on [system] only.
       
  1924         self.repo._type_source_cache[999999] = ('Note', 'cards', 999999, 'cards')
       
  1925         self.cards.support_relations['see_also'] = True
       
  1926         self.cards.cross_relations.add('see_also')
       
  1927         try:
       
  1928             self._test('Any X,AA ORDERBY AA WHERE E eid %(x)s, E see_also X, X modification_date AA',
       
  1929                        [('AggrStep',
       
  1930                          'SELECT table0.C0, table0.C1 FROM table0\nORDER BY table0.C1',
       
  1931                          None,
       
  1932                          [('FetchStep',
       
  1933                            [('Any X,AA WHERE 999999 see_also X, X modification_date AA, X is Note',
       
  1934                              [{'AA': 'Datetime', 'X': 'Note'}])], [self.cards, self.system], {},
       
  1935                            {'AA': 'table0.C1', 'X': 'table0.C0',
       
  1936                             'X.modification_date': 'table0.C1'},
       
  1937                            []),
       
  1938                           ('FetchStep',
       
  1939                            [('Any X,AA WHERE 999999 see_also X, X modification_date AA, X is Bookmark',
       
  1940                              [{'AA': 'Datetime', 'X': 'Bookmark'}])],
       
  1941                            [self.system], {},
       
  1942                            {'AA': 'table0.C1', 'X': 'table0.C0',
       
  1943                             'X.modification_date': 'table0.C1'},
       
  1944                            [])])],
       
  1945                          {'x': 999999})
       
  1946         finally:
                   # undo the source configuration changes made above
       
  1947             del self.cards.support_relations['see_also']
       
  1948             self.cards.cross_relations.remove('see_also')
       
  1949 
       
  1950     def test_state_of_cross(self):
               # DELETE of states that have no ``state_of`` relation: expects
               # DeleteEntitiesStep > a single OneFetchStep on the system source
               # with Y typed as Workflow.
       
  1951         self._test('DELETE State X WHERE NOT X state_of Y',
       
  1952                    [('DeleteEntitiesStep',
       
  1953                      [('OneFetchStep',
       
  1954                        [('Any X WHERE NOT X state_of Y, X is State, Y is Workflow',
       
  1955                          [{'X': 'State', 'Y': 'Workflow'}])],
       
  1956                        None, None, [self.system], {}, [])])]
       
  1957                    )
       
  1958 
       
  1959 
       
  1960     def test_source_specified_0_0(self):
               # ``cw_source`` restricted by the source's eid (1): the expected RQL
               # has S replaced by the literal 1 and is run by one OneFetchStep on
               # the system source.
       
  1961         self._test('Card X WHERE X cw_source S, S eid 1',
       
  1962                    [('OneFetchStep', [('Any X WHERE X cw_source 1, X is Card',
       
  1963                                        [{'X': 'Card'}])],
       
  1964                      None, None,
       
  1965                      [self.system],{}, [])
       
  1966                     ])
       
  1967 
       
  1968     def test_source_specified_0_1(self):
               # Same restriction as test_source_specified_0_0 but S is also
               # selected: the expected RQL selects the literal 1 in its place.
       
  1969         self._test('Any X, S WHERE X is Card, X cw_source S, S eid 1',
       
  1970                    [('OneFetchStep', [('Any X,1 WHERE X is Card, X cw_source 1',
       
  1971                                        [{'X': 'Card'}])],
       
  1972                      None, None,
       
  1973                      [self.system],{}, [])
       
  1974                     ])
       
  1975 
       
  1976     def test_source_specified_1_0(self):
               # ``cw_source`` restricted by name "system": the restriction is kept
               # in the expected RQL, executed by one OneFetchStep on the system
               # source.
       
  1977         self._test('Card X WHERE X cw_source S, S name "system"',
       
  1978                    [('OneFetchStep', [('Any X WHERE X cw_source S, S name "system", X is Card',
       
  1979                                        [{'X': 'Card', 'S': 'CWSource'}])],
       
  1980                      None, None,
       
  1981                      [self.system],{}, [])
       
  1982                     ])
       
  1983 
       
  1984     def test_source_specified_1_1(self):
               # Like test_source_specified_1_0 with the source name also selected
               # (SN): still one OneFetchStep on the system source, RQL unchanged
               # apart from normalisation.
       
  1985         self._test('Any X, SN WHERE X is Card, X cw_source S, S name "system", S name SN',
       
  1986                    [('OneFetchStep', [('Any X,SN WHERE X is Card, X cw_source S, S name "system", '
       
  1987                                        'S name SN',
       
  1988                                        [{'S': 'CWSource', 'SN': 'String', 'X': 'Card'}])],
       
  1989                      None, None, [self.system], {}, [])
       
  1990                     ])
       
  1991 
       
  1992     def test_source_specified_1_2(self):
               # ``cw_source`` restricted by name "datafeed": the restriction is
               # kept in the expected RQL and the single OneFetchStep targets the
               # system source.
       
  1993         self._test('Card X WHERE X cw_source S, S name "datafeed"',
       
  1994                    [('OneFetchStep', [('Any X WHERE X cw_source S, S name "datafeed", X is Card',
       
  1995                                        [{'X': 'Card', 'S': 'CWSource'}])],
       
  1996                      None, None,
       
  1997                      [self.system],{}, [])
       
  1998                     ])
       
  1999 
       
  2000     def test_source_specified_1_3(self):
               # Like test_source_specified_1_2 with the source name also selected
               # (SN): one OneFetchStep on the system source, restriction kept.
       
  2001         self._test('Any X, SN WHERE X is Card, X cw_source S, S name "datafeed", S name SN',
       
  2002                    [('OneFetchStep', [('Any X,SN WHERE X is Card, X cw_source S, S name "datafeed", '
       
  2003                                        'S name SN',
       
  2004                                        [{'S': 'CWSource', 'SN': 'String', 'X': 'Card'}])],
       
  2005                      None, None, [self.system], {}, [])
       
  2006                     ])
       
  2007 
       
  2008     def test_source_specified_1_4(self):
               # X has no type restriction, so the expected solutions are built
               # from X_ALL_SOLS: each solution is copied (the shared constant is
               # left untouched) and extended with S typed as CWSource. The plan
               # is one OneFetchStep on the system source with the ``cw_source``
               # restriction on "cards" kept in the RQL.
       
  2009         sols = []
       
  2010         for sol in X_ALL_SOLS:
       
  2011             sol = sol.copy()
       
  2012             sol['S'] = 'CWSource'
       
  2013             sols.append(sol)
       
  2014         self._test('Any X WHERE X cw_source S, S name "cards"',
       
  2015                    [('OneFetchStep', [('Any X WHERE X cw_source S, S name "cards"',
       
  2016                                        sols)],
       
  2017                      None, None,
       
  2018                      [self.system],{}, [])
       
  2019                     ])
       
  2020 
       
  2021     def test_source_specified_2_0(self):
               # ``NOT X cw_source S`` with S fixed to eid 1: the ``cw_source``
               # restriction is absent from the expected RQL and the single
               # OneFetchStep targets the cards source only.
               # The commented-out call below is the ``X cw_source S, NOT S eid 1``
               # spelling of the same check, currently disabled.
       
  2022         # self._test('Card X WHERE X cw_source S, NOT S eid 1',
       
  2023         #            [('OneFetchStep', [('Any X WHERE X is Card',
       
  2024         #                                [{'X': 'Card'}])],
       
  2025         #              None, None,
       
  2026         #              [self.cards],{}, [])
       
  2027         #             ])
       
  2028         self._test('Card X WHERE NOT X cw_source S, S eid 1',
       
  2029                    [('OneFetchStep', [('Any X WHERE X is Card',
       
  2030                                        [{'X': 'Card'}])],
       
  2031                      None, None,
       
  2032                      [self.cards],{}, [])
       
  2033                     ])
       
  2034 
       
  2035     def test_source_specified_2_1(self):
               # Two spellings of "not in the source named system"
               # (``X cw_source S, NOT S name ...`` and ``NOT X cw_source S, S name
               # ...``) are both expected to yield the same plan: the plain
               # ``Any X WHERE X is Card`` query run on the cards source only.
       
  2036         self._test('Card X WHERE X cw_source S, NOT S name "system"',
       
  2037                    [('OneFetchStep', [('Any X WHERE X is Card',
       
  2038                                        [{'X': 'Card'}])],
       
  2039                      None, None,
       
  2040                      [self.cards],{}, [])
       
  2041                     ])
       
  2042         self._test('Card X WHERE NOT X cw_source S, S name "system"',
       
  2043                    [('OneFetchStep', [('Any X WHERE X is Card',
       
  2044                                        [{'X': 'Card'}])],
       
  2045                      None, None,
       
  2046                      [self.cards],{}, [])
       
  2047                     ])
       
  2048 
       
  2049     def test_source_specified_3_1(self):
               # ``cw_source`` restricted by name "cards" while selecting an
               # attribute: the restriction is dropped from the expected RQL and
               # the single OneFetchStep targets the cards source.
       
  2050         self._test('Any X,XT WHERE X is Card, X title XT, X cw_source S, S name "cards"',
       
  2051                    [('OneFetchStep',
       
  2052                      [('Any X,XT WHERE X is Card, X title XT',
       
  2053                        [{'X': 'Card', 'XT': 'String'}])],
       
  2054                      None, None, [self.cards], {}, [])
       
  2055                     ])
       
  2056 
       
  2057     def test_source_specified_3_2(self):
               # Same query as test_source_specified_3_1 with source name
               # "datafeed": here the restriction is kept in the expected RQL and
               # the single OneFetchStep targets the system source.
       
  2058         self._test('Any X,XT WHERE X is Card, X title XT, X cw_source S, S name "datafeed"',
       
  2059                    [('OneFetchStep',
       
  2060                      [('Any X,XT WHERE X is Card, X title XT, X cw_source S, S name "datafeed"',
       
  2061                        [{'X': 'Card', 'XT': 'String', 'S': 'CWSource'}])],
       
  2062                      None, None, [self.system], {}, [])
       
  2063                     ])
       
  2064 
       
  2065     def test_source_specified_3_3(self):
               # Unconditionally skipped with reason 'oops'; assuming the standard
               # unittest ``skipTest`` (which raises), the call below never runs.
               # NOTE(review): the expected plan talks about Card/title while the
               # query is about Note/type/in_state -- it looks copied from
               # test_source_specified_3_1 and needs revisiting before the skip is
               # removed.
       
  2066         self.skipTest('oops')
       
  2067         self._test('Any STN WHERE X is Note, X type XT, X in_state ST, ST name STN, X cw_source S, S name "cards"',
       
  2068                    [('OneFetchStep',
       
  2069                      [('Any X,XT WHERE X is Card, X title XT',
       
  2070                        [{'X': 'Card', 'XT': 'String'}])],
       
  2071                      None, None, [self.cards], {}, [])
       
  2072                     ])
       
  2073 
       
  2074     def test_source_conflict_1(self):
               # X is fixed to eid 999999, seeded in the cache as
               # ('Note', 'cards', ...), while the query asks for the source named
               # "system": planning must raise BadRQLQuery, and the message names
               # the conflicting term by its placeholder, '%(x)s'.
       
  2075         self.repo._type_source_cache[999999] = ('Note', 'cards', 999999, 'cards')
       
  2076         with self.assertRaises(BadRQLQuery) as cm:
       
  2077             self._test('Any X WHERE X cw_source S, S name "system", X eid %(x)s',
       
  2078                        [], {'x': 999999})
       
  2079         self.assertEqual(str(cm.exception), 'source conflict for term %(x)s')
       
  2080 
       
  2081     def test_source_conflict_2(self):
               # A ``cw_source`` restriction on the name "systeme" must make
               # planning raise BadRQLQuery with the message
               # 'source conflict for term X'.
       
  2082         with self.assertRaises(BadRQLQuery) as cm:
       
  2083             self._test('Card X WHERE X cw_source S, S name "systeme"', [])
       
  2084         self.assertEqual(str(cm.exception), 'source conflict for term X')
       
  2085 
       
  2086     def test_source_conflict_3(self):
               # Unconditionally skipped with reason 'oops'; assuming the standard
               # unittest ``skipTest`` (which raises), the call below never runs.
               # The disabled expectation is a single OneFetchStep on the system
               # source for CWSource entities restricted to the "cards" source.
       
  2087         self.skipTest('oops')
       
  2088         self._test('CWSource X WHERE X cw_source S, S name "cards"',
       
  2089                    [('OneFetchStep',
       
  2090                      [(u'Any X WHERE X cw_source S, S name "cards", X is CWSource',
       
  2091                        [{'S': 'CWSource', 'X': 'CWSource'}])],
       
  2092                      None, None,
       
  2093                      [self.system],
       
  2094                      {}, [])])
       
  2095 
       
  2096 
       
  2097     def test_ambigous_cross_relation_source_specified(self):
               # Temporarily declares ``see_also`` as supported and crossed by the
               # cards source (both undone in ``finally``) and expects an AggrStep
               # fed by a Note FetchStep on [cards, system] and a Bookmark
               # FetchStep on [system].
               # NOTE(review): setup, query and expected plan are the same as in
               # test_ambigous_cross_relation; despite the name, the query has no
               # ``cw_source`` restriction -- confirm whether one was intended.
       
  2098         self.repo._type_source_cache[999999] = ('Note', 'cards', 999999, 'cards')
       
  2099         self.cards.support_relations['see_also'] = True
       
  2100         self.cards.cross_relations.add('see_also')
       
  2101         try:
       
  2102             self._test('Any X,AA ORDERBY AA WHERE E eid %(x)s, E see_also X, X modification_date AA',
       
  2103                        [('AggrStep',
       
  2104                          'SELECT table0.C0, table0.C1 FROM table0\nORDER BY table0.C1',
       
  2105                          None,
       
  2106                          [('FetchStep',
       
  2107                            [('Any X,AA WHERE 999999 see_also X, X modification_date AA, X is Note',
       
  2108                              [{'AA': 'Datetime', 'X': 'Note'}])], [self.cards, self.system], {},
       
  2109                            {'AA': 'table0.C1', 'X': 'table0.C0',
       
  2110                             'X.modification_date': 'table0.C1'},
       
  2111                            []),
       
  2112                           ('FetchStep',
       
  2113                            [('Any X,AA WHERE 999999 see_also X, X modification_date AA, X is Bookmark',
       
  2114                              [{'AA': 'Datetime', 'X': 'Bookmark'}])],
       
  2115                            [self.system], {},
       
  2116                            {'AA': 'table0.C1', 'X': 'table0.C0',
       
  2117                             'X.modification_date': 'table0.C1'},
       
  2118                            [])])],
       
  2119                          {'x': 999999})
       
  2120         finally:
                   # undo the source configuration changes made above
       
  2121             del self.cards.support_relations['see_also']
       
  2122             self.cards.cross_relations.remove('see_also')
       
  2123 
       
  2124     # non regression tests ####################################################
       
  2125 
       
  2126     def test_nonregr1(self):
               # ``copain`` between two users selected by login: each login lookup
               # is a FetchStep on [ldap, system] (table0 for X, table1 for Y) and
               # the relation itself is resolved by a OneFetchStep on the system
               # source reading X and Y from those tables.
       
  2127         self._test('Any X, Y WHERE X copain Y, X login "syt", Y login "cochon"',
       
  2128                    [('FetchStep',
       
  2129                      [('Any X WHERE X login "syt", X is CWUser', [{'X': 'CWUser'}])],
       
  2130                      [self.ldap, self.system], None, {'X': 'table0.C0'}, []),
       
  2131                     ('FetchStep',
       
  2132                      [('Any Y WHERE Y login "cochon", Y is CWUser', [{'Y': 'CWUser'}])],
       
  2133                      [self.ldap, self.system], None, {'Y': 'table1.C0'}, []),
       
  2134                     ('OneFetchStep',
       
  2135                      [('Any X,Y WHERE X copain Y, X is CWUser, Y is CWUser',
       
  2136                        [{'X': 'CWUser', 'Y': 'CWUser'}])],
       
  2137                      None, None, [self.system], {'X': 'table0.C0', 'Y': 'table1.C0'}, [])
       
  2138                     ])
       
  2139 
       
  2140     def test_nonregr2(self):
               # Fires the 'deactivate' transition on the session user through its
               # IWorkflowable adapter and uses the eid of the latest transition
               # information as %(x)s.
               # Expected plan: Note and CWUser (eid, modification_date) pairs are
               # pre-fetched into table0 and table1; an AggrStep then orders
               # table2, which is filled by three system-only FetchSteps, one per
               # solution for X (Affaire, CWUser, Note). The CWUser and Note steps
               # read from table1 and table0 respectively; the Affaire step needs
               # no input table.
       
  2141         iworkflowable = self.session.user.cw_adapt_to('IWorkflowable')
       
  2142         iworkflowable.fire_transition('deactivate')
       
  2143         treid = iworkflowable.latest_trinfo().eid
       
  2144         self._test('Any X ORDERBY D DESC WHERE E eid %(x)s, E wf_info_for X, X modification_date D',
       
  2145                    [('FetchStep', [('Any X,D WHERE X modification_date D, X is Note',
       
  2146                                     [{'X': 'Note', 'D': 'Datetime'}])],
       
  2147                      [self.cards, self.system], None, {'X': 'table0.C0', 'X.modification_date': 'table0.C1', 'D': 'table0.C1'}, []),
       
  2148                     ('FetchStep', [('Any X,D WHERE X modification_date D, X is CWUser',
       
  2149                                     [{'X': 'CWUser', 'D': 'Datetime'}])],
       
  2150                      [self.ldap, self.system], None, {'X': 'table1.C0', 'X.modification_date': 'table1.C1', 'D': 'table1.C1'}, []),
       
  2151                     ('AggrStep', 'SELECT table2.C0 FROM table2\nORDER BY table2.C1 DESC', None, [
       
  2152                         ('FetchStep', [('Any X,D WHERE E eid %s, E wf_info_for X, X modification_date D, E is TrInfo, X is Affaire'%treid,
       
  2153                                         [{'X': 'Affaire', 'E': 'TrInfo', 'D': 'Datetime'}])],
       
  2154                          [self.system],
       
  2155                          {},
       
  2156                          {'X': 'table2.C0', 'X.modification_date': 'table2.C1', 'D': 'table2.C1', 'E.wf_info_for': 'table2.C0'}, []),
       
  2157                         ('FetchStep', [('Any X,D WHERE E eid %s, E wf_info_for X, X modification_date D, E is TrInfo, X is CWUser'%treid,
       
  2158                                         [{'X': 'CWUser', 'E': 'TrInfo', 'D': 'Datetime'}])],
       
  2159                          [self.system],
       
  2160                          {'X': 'table1.C0', 'X.modification_date': 'table1.C1', 'D': 'table1.C1'},
       
  2161                          {'X': 'table2.C0', 'X.modification_date': 'table2.C1', 'D': 'table2.C1', 'E.wf_info_for': 'table2.C0'}, []),
       
  2162                         ('FetchStep', [('Any X,D WHERE E eid %s, E wf_info_for X, X modification_date D, E is TrInfo, X is Note'%treid,
       
  2163                                         [{'X': 'Note', 'E': 'TrInfo', 'D': 'Datetime'}])],
       
  2164                          [self.system],
       
  2165                          {'X': 'table0.C0', 'X.modification_date': 'table0.C1', 'D': 'table0.C1'},
       
  2166                          {'X': 'table2.C0', 'X.modification_date': 'table2.C1', 'D': 'table2.C1', 'E.wf_info_for': 'table2.C0'}, []),
       
  2167                         ]),
       
  2168                     ],
       
  2169                    {'x': treid})
       
  2170 
       
  2171     def test_nonregr3(self):
       
  2172         # original jpl query:
       
  2173         # Any X, NOW - CD, P WHERE P is Project, U interested_in P, U is CWUser, U login "sthenault", X concerns P, X creation_date CD ORDERBY CD DESC LIMIT 5
       
  2174         self._test('Any X, NOW - CD, P ORDERBY CD DESC LIMIT 5 WHERE P bookmarked_by U, U login "admin", P is X, X creation_date CD',
       
  2175                    [('FetchStep', [('Any U WHERE U login "admin", U is CWUser', [{'U': 'CWUser'}])],
       
  2176                      [self.ldap, self.system], None, {'U': 'table0.C0'}, []),
       
  2177                     ('OneFetchStep', [('Any X,(NOW - CD),P ORDERBY CD DESC LIMIT 5 WHERE P bookmarked_by U, P is X, X creation_date CD, P is Bookmark, U is CWUser, X is CWEType',
       
  2178                                        [{'P': 'Bookmark', 'U': 'CWUser', 'X': 'CWEType', 'CD': 'Datetime'}])],
       
  2179                      5, None,  [self.system], {'U': 'table0.C0'}, [])]
       
  2180                    )
       
  2181 
       
  2182     def test_nonregr4(self):
       
  2183         ueid = self.session.user.eid
       
  2184         self._test('Any U ORDERBY D DESC WHERE WF wf_info_for X, WF creation_date D, WF from_state FS, '
       
  2185                    'WF owned_by U?, X eid %(x)s',
       
  2186                    [#('FetchStep', [('Any U WHERE U is CWUser', [{'U': 'CWUser'}])],
       
  2187                     # [self.ldap, self.system], None, {'U': 'table0.C0'}, []),
       
  2188                     ('OneFetchStep', [('Any U ORDERBY D DESC WHERE WF wf_info_for %s, WF creation_date D, WF from_state FS, WF owned_by U?' % ueid,
       
  2189                                        [{'WF': 'TrInfo', 'FS': 'State', 'U': 'CWUser', 'D': 'Datetime'}])],
       
  2190                      None, None,
       
  2191                      [self.system], {}, [])],
       
  2192                    {'x': ueid})
       
  2193 
       
  2194     def test_nonregr5(self):
       
  2195         # original jpl query:
       
  2196         # DISTINCT Version V WHERE MB done_in MV, MV eid %(x)s,
       
  2197         # MB depends_on B, B done_in V, V version_of P, NOT P eid %(p)s'
       
  2198         cardeid = self.execute('INSERT Card X: X title "hop"')[0][0]
       
  2199         noteeid = self.execute('INSERT Note X')[0][0]
       
  2200         self._test('DISTINCT Card V WHERE MB documented_by MV, MV eid %(x)s, '
       
  2201                    'MB depends_on B, B documented_by V, V multisource_rel P, NOT P eid %(p)s',
       
  2202                    [('FetchStep', [('Any V WHERE V multisource_rel P, NOT P eid %s, P is Note, V is Card'%noteeid,
       
  2203                                     [{'P': 'Note', 'V': 'Card'}])],
       
  2204                      [self.cards, self.system], None, {'V': 'table0.C0'}, []),
       
  2205                     ('OneFetchStep', [('DISTINCT Any V WHERE MB documented_by %s, MB depends_on B, B documented_by V, B is Affaire, MB is Affaire, V is Card'%cardeid,
       
  2206                                        [{'B': 'Affaire', 'MB': 'Affaire', 'V': 'Card'}])],
       
  2207                      None, None, [self.system], {'V': 'table0.C0'}, [])],
       
  2208                    {'x': cardeid, 'p': noteeid})
       
  2209 
       
  2210     def test_nonregr6(self):
       
  2211         self._test('Any X WHERE X concerne Y',
       
  2212                    [('OneFetchStep', [('Any X WHERE X concerne Y',
       
  2213                                        [{'Y': 'Division', 'X': 'Affaire'},
       
  2214                                         {'Y': 'Note', 'X': 'Affaire'},
       
  2215                                         {'Y': 'Societe', 'X': 'Affaire'},
       
  2216                                         {'Y': 'SubDivision', 'X': 'Affaire'},
       
  2217                                         {'Y': 'Affaire', 'X': 'Personne'}])],
       
  2218                      None,  None, [self.system], {}, [])
       
  2219                     ])
       
  2220         self._test('Any X WHERE X concerne Y, Y is Note',
       
  2221                    [('FetchStep', [('Any Y WHERE Y is Note', [{'Y': 'Note'}])],
       
  2222                       [self.cards, self.system], None, {'Y': 'table0.C0'}, []),
       
  2223                     ('OneFetchStep', [('Any X WHERE X concerne Y, X is Affaire, Y is Note',
       
  2224                                        [{'X': 'Affaire', 'Y': 'Note'}])],
       
  2225                      None, None, [self.system], {'Y': 'table0.C0'}, [])
       
  2226                     ])
       
  2227 
       
  2228     def test_nonregr7(self):
       
  2229         repo._type_source_cache[999999] = ('Note', 'cards', 999999, 'cards')
       
  2230         self._test('Any S,SUM(DUR),SUM(I),(SUM(I) - SUM(DUR)),MIN(DI),MAX(DI) GROUPBY S ORDERBY S WHERE A is Affaire, A duration DUR, A invoiced I, A modification_date DI, A in_state S, S name SN, (EXISTS(A concerne WP, W multisource_rel WP)) OR (EXISTS(A concerne W)), W eid %(n)s',
       
  2231                    [('FetchStep', [('Any WP WHERE 999999 multisource_rel WP, WP is Note', [{'WP': 'Note'}])],
       
  2232                      [self.cards], None, {'WP': u'table0.C0'}, []),
       
  2233                     ('OneFetchStep', [('Any S,SUM(DUR),SUM(I),(SUM(I) - SUM(DUR)),MIN(DI),MAX(DI) GROUPBY S ORDERBY S WHERE A duration DUR, A invoiced I, A modification_date DI, A in_state S, S name SN, (EXISTS(A concerne WP, WP is Note)) OR (EXISTS(A concerne 999999)), A is Affaire, S is State',
       
  2234                                        [{'A': 'Affaire', 'DI': 'Datetime', 'DUR': 'Int', 'I': 'Float', 'S': 'State', 'SN': 'String', 'WP': 'Note'}])],
       
  2235                      None, None, [self.system], {'WP': u'table0.C0'}, [])],
       
  2236                    {'n': 999999})
       
  2237 
       
  2238     def test_nonregr8(self):
       
  2239         repo._type_source_cache[999999] = ('Note', 'cards', 999999, 'cards')
       
  2240         self._test('Any X,Z WHERE X eid %(x)s, X multisource_rel Y, Z concerne X',
       
  2241                    [('FetchStep', [('Any 999999 WHERE 999999 multisource_rel Y, Y is Note',
       
  2242                                     [{'Y': 'Note'}])],
       
  2243                      [self.cards],
       
  2244                      None, {u'%(x)s': 'table0.C0'},
       
  2245                      []),
       
  2246                     ('OneFetchStep', [('Any 999999,Z WHERE Z concerne 999999, Z is Affaire',
       
  2247                                        [{'Z': 'Affaire'}])],
       
  2248                      None, None, [self.system],
       
  2249                      {u'%(x)s': 'table0.C0'}, []),
       
  2250                     ],
       
  2251                    {'x': 999999})
       
  2252 
       
  2253     def test_nonregr9(self):
       
  2254         repo._type_source_cache[999999] = ('Note', 'cards', 999999, 'cards')
       
  2255         repo._type_source_cache[999998] = ('Note', 'cards', 999998, 'cards')
       
  2256         self._test('SET X migrated_from Y WHERE X eid %(x)s, Y multisource_rel Z, Z eid %(z)s, Y migrated_from Z',
       
  2257                    [('FetchStep', [('Any Y WHERE Y multisource_rel 999998, Y is Note', [{'Y': 'Note'}])],
       
  2258                      [self.cards], None, {'Y': u'table0.C0'}, []),
       
  2259                     ('UpdateStep',
       
  2260                      [('OneFetchStep', [('DISTINCT Any Y WHERE Y migrated_from 999998, Y is Note',
       
  2261                                          [{'Y': 'Note'}])],
       
  2262                        None, None, [self.system],
       
  2263                        {'Y': u'table0.C0'}, [])])],
       
  2264                    {'x': 999999, 'z': 999998})
       
  2265 
       
  2266     def test_nonregr10(self):
       
  2267         repo._type_source_cache[999999] = ('CWUser', 'ldap', 999999, 'ldap')
       
  2268         self._test('Any X,AA,AB ORDERBY AA WHERE E eid %(x)s, E owned_by X, X login AA, X modification_date AB',
       
  2269                    [('FetchStep',
       
  2270                      [('Any X,AA,AB WHERE X login AA, X modification_date AB, X is CWUser',
       
  2271                        [{'AA': 'String', 'AB': 'Datetime', 'X': 'CWUser'}])],
       
  2272                      [self.ldap, self.system], None, {'AA': 'table0.C1', 'AB': 'table0.C2',
       
  2273                                                       'X': 'table0.C0', 'X.login': 'table0.C1', 'X.modification_date': 'table0.C2'},
       
  2274                      []),
       
  2275                     ('OneFetchStep',
       
  2276                      [('Any X,AA,AB ORDERBY AA WHERE 999999 owned_by X, X login AA, X modification_date AB, X is CWUser',
       
  2277                        [{'AA': 'String', 'AB': 'Datetime', 'X': 'CWUser'}])],
       
  2278                      None, None, [self.system], {'AA': 'table0.C1', 'AB': 'table0.C2',
       
  2279                                                  'X': 'table0.C0', 'X.login': 'table0.C1', 'X.modification_date': 'table0.C2'},
       
  2280                      [])
       
  2281                     ],
       
  2282                    {'x': 999999})
       
  2283 
       
  2284     def test_nonregr11(self):
       
  2285         repo._type_source_cache[999999] = ('Bookmark', 'system', 999999, 'system')
       
  2286         self._test('SET X bookmarked_by Y WHERE X eid %(x)s, Y login "hop"',
       
  2287                    [('UpdateStep',
       
  2288                      [('OneFetchStep', [('DISTINCT Any Y WHERE Y login "hop", Y is CWUser', [{'Y': 'CWUser'}])],
       
  2289                        None, None, [self.ldap, self.system], {}, [])]
       
  2290                      )],
       
  2291                    {'x': 999999})
       
  2292 
       
  2293     def test_nonregr12(self):
       
  2294         repo._type_source_cache[999999] = ('Note', 'cards', 999999, 'cards')
       
  2295         self._test('Any X ORDERBY Z DESC WHERE X modification_date Z, E eid %(x)s, E see_also X',
       
  2296                    [('FetchStep', [('Any X,Z WHERE X modification_date Z, X is Note',
       
  2297                                     [{'X': 'Note', 'Z': 'Datetime'}])],
       
  2298                      [self.cards, self.system], None, {'X': 'table0.C0', 'X.modification_date': 'table0.C1', 'Z': 'table0.C1'},
       
  2299                      []),
       
  2300                     ('AggrStep', 'SELECT table1.C0 FROM table1\nORDER BY table1.C1 DESC', None,
       
  2301                      [('FetchStep', [('Any X,Z WHERE X modification_date Z, 999999 see_also X, X is Bookmark',
       
  2302                                       [{'X': 'Bookmark', 'Z': 'Datetime'}])],
       
  2303                        [self.system], {},   {'X': 'table1.C0', 'X.modification_date': 'table1.C1',
       
  2304                                              'Z': 'table1.C1'},
       
  2305                        []),
       
  2306                       ('FetchStep', [('Any X,Z WHERE X modification_date Z, 999999 see_also X, X is Note',
       
  2307                                       [{'X': 'Note', 'Z': 'Datetime'}])],
       
  2308                        [self.system], {'X': 'table0.C0', 'X.modification_date': 'table0.C1',
       
  2309                                        'Z': 'table0.C1'},
       
  2310                        {'X': 'table1.C0', 'X.modification_date': 'table1.C1',
       
  2311                         'Z': 'table1.C1'},
       
  2312                        [])]
       
  2313                       )],
       
  2314                    {'x': 999999})
       
  2315 
       
  2316     def test_nonregr13_1(self):
       
  2317         ueid = self.session.user.eid
       
  2318         # identity wrapped into exists:
       
  2319         # should'nt propagate constraint that U is in the same source as ME
       
  2320         self._test('Any B,U,UL GROUPBY B,U,UL WHERE B created_by U?, B is File '
       
  2321                    'WITH U,UL BEING (Any U,UL WHERE ME eid %(x)s, (EXISTS(U identity ME) '
       
  2322                    'OR (EXISTS(U in_group G, G name IN("managers", "staff")))) '
       
  2323                    'OR (EXISTS(U in_group H, ME in_group H, NOT H name "users")), U login UL, U is CWUser)',
       
  2324                    [('FetchStep', [('Any U,UL WHERE U login UL, U is CWUser',
       
  2325                                     [{'U': 'CWUser', 'UL': 'String'}])],
       
  2326                      [self.ldap, self.system], None,
       
  2327                      {'U': 'table0.C0', 'U.login': 'table0.C1', 'UL': 'table0.C1'},
       
  2328                      []),
       
  2329                     ('FetchStep', [('Any U,UL WHERE ((EXISTS(U identity %s)) OR (EXISTS(U in_group G, G name IN("managers", "staff"), G is CWGroup))) OR (EXISTS(U in_group H, %s in_group H, NOT H name "users", H is CWGroup)), U login UL, U is CWUser' % (ueid, ueid),
       
  2330                                     [{'G': 'CWGroup', 'H': 'CWGroup', 'U': 'CWUser', 'UL': 'String'}])],
       
  2331                      [self.system],
       
  2332                      {'U': 'table0.C0', 'U.login': 'table0.C1', 'UL': 'table0.C1'},
       
  2333                      {'U': 'table1.C0', 'U.login': 'table1.C1', 'UL': 'table1.C1'},
       
  2334                      []),
       
  2335                     ('OneFetchStep', [('Any B,U,UL GROUPBY B,U,UL WHERE B created_by U?, B is File',
       
  2336                                        [{'B': 'File', 'U': 'CWUser', 'UL': 'String'}])],
       
  2337                      None, None, [self.system],
       
  2338                      {'U': 'table1.C0', 'UL': 'table1.C1'},
       
  2339                      [])],
       
  2340                    {'x': ueid})
       
  2341 
       
  2342     def test_nonregr13_2(self):
       
  2343         # identity *not* wrapped into exists.
       
  2344         #
       
  2345         # XXX this test fail since in this case, in "U identity 5" U and 5 are
       
  2346         # from the same scope so constraints are applied (telling the U should
       
  2347         # come from the same source as user with eid 5).
       
  2348         #
       
  2349         # IMO this is normal, unless we introduce a special case for the
       
  2350         # identity relation. BUT I think it's better to leave it as is and to
       
  2351         # explain constraint propagation rules, and so why this should be
       
  2352         # wrapped in exists() if used in multi-source
       
  2353         self.skipTest('take a look at me if you wish')
       
  2354         ueid = self.session.user.eid
       
  2355         self._test('Any B,U,UL GROUPBY B,U,UL WHERE B created_by U?, B is File '
       
  2356                    'WITH U,UL BEING (Any U,UL WHERE ME eid %(x)s, (U identity ME '
       
  2357                    'OR (EXISTS(U in_group G, G name IN("managers", "staff")))) '
       
  2358                    'OR (EXISTS(U in_group H, ME in_group H, NOT H name "users")), U login UL, U is CWUser)',
       
  2359                    [('FetchStep', [('Any U,UL WHERE U login UL, U is CWUser',
       
  2360                                     [{'U': 'CWUser', 'UL': 'String'}])],
       
  2361                      [self.ldap, self.system], None,
       
  2362                      {'U': 'table0.C0', 'U.login': 'table0.C1', 'UL': 'table0.C1'},
       
  2363                      []),
       
  2364                     ('FetchStep', [('Any U,UL WHERE ((U identity %s) OR (EXISTS(U in_group G, G name IN("managers", "staff"), G is CWGroup))) OR (EXISTS(U in_group H, %s in_group H, NOT H name "users", H is CWGroup)), U login UL, U is CWUser' % (ueid, ueid),
       
  2365                                     [{'G': 'CWGroup', 'H': 'CWGroup', 'U': 'CWUser', 'UL': 'String'}])],
       
  2366                      [self.system],
       
  2367                      {'U': 'table0.C0', 'U.login': 'table0.C1', 'UL': 'table0.C1'},
       
  2368                      {'U': 'table1.C0', 'U.login': 'table1.C1', 'UL': 'table1.C1'},
       
  2369                      []),
       
  2370                     ('OneFetchStep', [('Any B,U,UL GROUPBY B,U,UL WHERE B created_by U?, B is File',
       
  2371                                        [{'B': 'File', 'U': 'CWUser', 'UL': 'String'}])],
       
  2372                      None, None, [self.system],
       
  2373                      {'U': 'table1.C0', 'UL': 'table1.C1'},
       
  2374                      [])],
       
  2375                    {'x': self.session.user.eid})
       
  2376 
       
  2377     def test_nonregr14_1(self):
       
  2378         repo._type_source_cache[999999] = ('CWUser', 'ldap', 999999, 'ldap')
       
  2379         self._test('Any X WHERE X eid %(x)s, X owned_by U, U eid %(u)s',
       
  2380                    [('OneFetchStep', [('Any 999999 WHERE 999999 owned_by 999999', [{}])],
       
  2381                      None, None, [self.system], {}, [])],
       
  2382                    {'x': 999999, 'u': 999999})
       
  2383 
       
  2384     def test_nonregr14_2(self):
       
  2385         repo._type_source_cache[999999] = ('CWUser', 'ldap', 999999, 'ldap')
       
  2386         repo._type_source_cache[999998] = ('Note', 'system', 999998, 'system')
       
  2387         self._test('Any X WHERE X eid %(x)s, X owned_by U, U eid %(u)s',
       
  2388                    [('OneFetchStep', [('Any 999998 WHERE 999998 owned_by 999999', [{}])],
       
  2389                      None, None, [self.system], {}, [])],
       
  2390                    {'x': 999998, 'u': 999999})
       
  2391 
       
  2392     def test_nonregr14_3(self):
       
  2393         repo._type_source_cache[999999] = ('CWUser', 'system', 999999, 'system')
       
  2394         repo._type_source_cache[999998] = ('CWUser', 'ldap', 999998, 'ldap')
       
  2395         self._test('Any X WHERE X eid %(x)s, X owned_by U, U eid %(u)s',
       
  2396                    [('OneFetchStep', [('Any 999998 WHERE 999998 owned_by 999999', [{}])],
       
  2397                      None, None, [self.system], {}, [])],
       
  2398                    {'x': 999998, 'u': 999999})
       
  2399 
       
  2400     def test_nonregr_identity_no_source_access_1(self):
       
  2401         repo._type_source_cache[999999] = ('CWUser', 'ldap', 999998, 'ldap')
       
  2402         self._test('Any S WHERE S identity U, S eid %(s)s, U eid %(u)s',
       
  2403                    [('OneFetchStep', [('Any 999999 WHERE 999999 identity 999999', [{}])],
       
  2404                      None, None, [self.system], {}, [])],
       
  2405                    {'s': 999999, 'u': 999999})
       
  2406 
       
  2407     def test_nonregr_identity_no_source_access_2(self):
       
  2408         repo._type_source_cache[999999] = ('EmailAddress', 'system', 999999, 'system')
       
  2409         repo._type_source_cache[999998] = ('CWUser', 'ldap', 999998, 'ldap')
       
  2410         self._test('Any X WHERE O use_email X, ((EXISTS(O identity U)) OR (EXISTS(O in_group G, G name IN("managers", "staff")))) OR (EXISTS(O in_group G2, U in_group G2, NOT G2 name "users")), X eid %(x)s, U eid %(u)s',
       
  2411                    [('OneFetchStep', [('Any 999999 WHERE O use_email 999999, ((EXISTS(O identity 999998)) OR (EXISTS(O in_group G, G name IN("managers", "staff")))) OR (EXISTS(O in_group G2, 999998 in_group G2, NOT G2 name "users"))',
       
  2412                                        [{'G': 'CWGroup', 'G2': 'CWGroup', 'O': 'CWUser'}])],
       
  2413                      None, None, [self.system], {}, [])],
       
  2414                    {'x': 999999, 'u': 999998})
       
  2415 
       
  2416     def test_nonregr_similar_subquery(self):
       
  2417         repo._type_source_cache[999999] = ('Personne', 'system', 999999, 'system')
       
  2418         self._test('Any T,TD,U,T,UL WITH T,TD,U,UL BEING ('
       
  2419                    '(Any T,TD,U,UL WHERE X eid %(x)s, T comments X, T content TD, T created_by U?, U login UL)'
       
  2420                    ' UNION '
       
  2421                    '(Any T,TD,U,UL WHERE X eid %(x)s, X connait P, T comments P, T content TD, T created_by U?, U login UL))',
       
  2422                    # XXX optimization: use a OneFetchStep with a UNION of both queries
       
  2423                    [('FetchStep', [('Any U,UL WHERE U login UL, U is CWUser',
       
  2424                                     [{'U': 'CWUser', 'UL': 'String'}])],
       
  2425                      [self.ldap, self.system], None,
       
  2426                      {'U': 'table0.C0', 'U.login': 'table0.C1', 'UL': 'table0.C1'},
       
  2427                      []),
       
  2428                     ('UnionFetchStep',
       
  2429                      [('FetchStep',
       
  2430                        [('Any T,TD,U,UL WHERE T comments 999999, T content TD, T created_by U?, U login UL, T is Comment, U is CWUser',
       
  2431                          [{'T': 'Comment', 'TD': 'String', 'U': 'CWUser', 'UL': 'String'}])],
       
  2432                        [self.system],
       
  2433                        {'U': 'table0.C0', 'U.login': 'table0.C1', 'UL': 'table0.C1'},
       
  2434                        {'T': 'table1.C0',
       
  2435                         'T.content': 'table1.C1',
       
  2436                         'TD': 'table1.C1',
       
  2437                         'U': 'table1.C2',
       
  2438                         'U.login': 'table1.C3',
       
  2439                         'UL': 'table1.C3'},
       
  2440                        []),
       
  2441                       ('FetchStep',
       
  2442                        [('Any T,TD,U,UL WHERE 999999 connait P, T comments P, T content TD, T created_by U?, U login UL, P is Personne, T is Comment, U is CWUser',
       
  2443                          [{'P': 'Personne',
       
  2444                            'T': 'Comment',
       
  2445                            'TD': 'String',
       
  2446                            'U': 'CWUser',
       
  2447                            'UL': 'String'}])],
       
  2448                        [self.system],
       
  2449                        {'U': 'table0.C0', 'U.login': 'table0.C1', 'UL': 'table0.C1'},
       
  2450                        {'T': 'table1.C0',
       
  2451                         'T.content': 'table1.C1',
       
  2452                         'TD': 'table1.C1',
       
  2453                         'U': 'table1.C2',
       
  2454                         'U.login': 'table1.C3',
       
  2455                         'UL': 'table1.C3'},
       
  2456                        [])]),
       
  2457                     ('OneFetchStep',
       
  2458                      [('Any T,TD,U,T,UL',
       
  2459                        [{'T': 'Comment', 'TD': 'String', 'U': 'CWUser', 'UL': 'String'}])],
       
  2460                      None, None,
       
  2461                      [self.system],
       
  2462                      {'T': 'table1.C0', 'TD': 'table1.C1', 'U': 'table1.C2', 'UL': 'table1.C3'},
       
  2463                      [])],
       
  2464                    {'x': 999999})
       
  2465 
       
  2466     def test_nonregr_dont_readd_already_processed_relation(self):
       
  2467         self._test('Any WO,D,SO WHERE WO is Note, D tags WO, WO in_state SO',
       
  2468                    [('FetchStep',
       
  2469                      [('Any WO,SO WHERE WO in_state SO, SO is State, WO is Note',
       
  2470                        [{'SO': 'State', 'WO': 'Note'}])],
       
  2471                      [self.cards, self.system], None,
       
  2472                      {'SO': 'table0.C1', 'WO': 'table0.C0'},
       
  2473                      []),
       
  2474                     ('OneFetchStep',
       
  2475                      [('Any WO,D,SO WHERE D tags WO, D is Tag, SO is State, WO is Note',
       
  2476                        [{'D': 'Tag', 'SO': 'State', 'WO': 'Note'}])],
       
  2477                      None, None, [self.system],
       
  2478                      {'SO': 'table0.C1', 'WO': 'table0.C0'},
       
  2479                      [])
       
  2480                     ])
       
  2481 
       
  2482 class MSPlannerTwoSameExternalSourcesTC(BasePlannerTC):
       
  2483     """test planner related feature on a 3-sources repository:
       
  2484 
       
  2485     * 2 rql sources supporting Card
       
  2486     """
       
  2487 
       
  2488     def setUp(self):
       
  2489         self.__class__.repo = repo
       
  2490         self.setup()
       
  2491         self.add_source(FakeCardSource, 'cards')
       
  2492         self.add_source(FakeCardSource, 'cards2')
       
  2493         self.planner = MSPlanner(self.o.schema, self.repo.vreg.rqlhelper)
       
  2494         assert repo.sources_by_uri['cards2'].support_relation('multisource_crossed_rel')
       
  2495         assert 'multisource_crossed_rel' in repo.sources_by_uri['cards2'].cross_relations
       
  2496         assert repo.sources_by_uri['cards'].support_relation('multisource_crossed_rel')
       
  2497         assert 'multisource_crossed_rel' in repo.sources_by_uri['cards'].cross_relations
       
    # use the `test_plan` helper imported from cubicweb.devtools.repotest as
    # this test case's `_test` method
    _test = test_plan
       
  2499 
       
  2500 
       
  2501     def test_linked_external_entities(self):
       
  2502         repo._type_source_cache[999999] = ('Tag', 'system', 999999, 'system')
       
  2503         self._test('Any X,XT WHERE X is Card, X title XT, T tags X, T eid %(t)s',
       
  2504                    [('FetchStep',
       
  2505                      [('Any X,XT WHERE X title XT, X is Card', [{'X': 'Card', 'XT': 'String'}])],
       
  2506                      [self.cards, self.cards2, self.system],
       
  2507                      None, {'X': 'table0.C0', 'X.title': 'table0.C1', 'XT': 'table0.C1'},
       
  2508                      []),
       
  2509                     ('OneFetchStep',
       
  2510                      [('Any X,XT WHERE X title XT, 999999 tags X, X is Card',
       
  2511                        [{'X': 'Card', 'XT': 'String'}])],
       
  2512                      None, None, [self.system],
       
  2513                      {'X': 'table0.C0', 'X.title': 'table0.C1', 'XT': 'table0.C1'},
       
  2514                      [])],
       
  2515                    {'t': 999999})
       
  2516 
       
  2517     def test_version_depends_on(self):
       
  2518         self.repo._type_source_cache[999999] = ('Note', 'cards', 999999, 'cards')
       
  2519         self._test('Any X,AD,AE WHERE E eid %(x)s, E migrated_from X, X in_state AD, AD name AE',
       
  2520                    [('FetchStep', [('Any X,AD,AE WHERE X in_state AD, AD name AE, AD is State, X is Note',
       
  2521                                     [{'AD': 'State', 'AE': 'String', 'X': 'Note'}])],
       
  2522                      [self.cards, self.cards2, self.system],
       
  2523                      None, {'AD': 'table0.C1', 'AD.name': 'table0.C2',
       
  2524                             'AE': 'table0.C2', 'X': 'table0.C0'},
       
  2525                      []),
       
  2526                     ('OneFetchStep', [('Any X,AD,AE WHERE 999999 migrated_from X, AD name AE, AD is State, X is Note',
       
  2527                                        [{'AD': 'State', 'AE': 'String', 'X': 'Note'}])],
       
  2528                      None, None, [self.system],
       
  2529                      {'AD': 'table0.C1', 'AD.name': 'table0.C2', 'AE': 'table0.C2', 'X': 'table0.C0'},
       
  2530                      [])],
       
  2531                    {'x': 999999})
       
  2532 
       
  2533     def test_version_crossed_depends_on_1(self):
       
  2534         self.repo._type_source_cache[999999] = ('Note', 'cards', 999999, 'cards')
       
  2535         self._test('Any X,AD,AE WHERE E eid %(x)s, E multisource_crossed_rel X, X in_state AD, AD name AE',
       
  2536                    [('FetchStep', [('Any X,AD,AE WHERE X in_state AD, AD name AE, AD is State, X is Note',
       
  2537                                     [{'AD': 'State', 'AE': 'String', 'X': 'Note'}])],
       
  2538                      [self.cards, self.cards2, self.system],
       
  2539                      None, {'AD': 'table0.C1', 'AD.name': 'table0.C2',
       
  2540                             'AE': 'table0.C2', 'X': 'table0.C0'},
       
  2541                      []),
       
  2542                     ('UnionStep', None, None,
       
  2543                      [('OneFetchStep', [('Any X,AD,AE WHERE 999999 multisource_crossed_rel X, AD name AE, AD is State, X is Note',
       
  2544                                          [{'AD': 'State', 'AE': 'String', 'X': 'Note'}])],
       
  2545                        None, None, [self.cards], None,
       
  2546                        []),
       
  2547                       ('OneFetchStep', [('Any X,AD,AE WHERE 999999 multisource_crossed_rel X, AD name AE, AD is State, X is Note',
       
  2548                                          [{'AD': 'State', 'AE': 'String', 'X': 'Note'}])],
       
  2549                        None, None, [self.system],
       
  2550                        {'AD': 'table0.C1', 'AD.name': 'table0.C2',
       
  2551                         'AE': 'table0.C2', 'X': 'table0.C0'},
       
  2552                        [])]
       
  2553                      )],
       
  2554                    {'x': 999999})
       
  2555 
       
  2556     def test_version_crossed_depends_on_2(self):
       
               # Crossed relation whose subject is a fixed eid recorded with 'system'.
               # Seed the repository's eid cache so that 999999 is a 'Note' with
               # 'system' in both source slots (tuple layout is not visible in this
               # chunk -- TODO confirm against the repository code).
  2557         self.repo._type_source_cache[999999] = ('Note', 'system', 999999, 'system')
       
               # Value handed to self._test after the query (presumably the expected
               # plan): one FetchStep over cards, cards2 and system that fills table0,
               # then a single OneFetchStep restricted to the system source which reads
               # X, AD and AE back from table0.  Eid 999999 is inlined in the final RQL.
  2558         self._test('Any X,AD,AE WHERE E eid %(x)s, E multisource_crossed_rel X, X in_state AD, AD name AE',
       
  2559                    [('FetchStep', [('Any X,AD,AE WHERE X in_state AD, AD name AE, AD is State, X is Note',
       
  2560                                     [{'AD': 'State', 'AE': 'String', 'X': 'Note'}])],
       
  2561                      [self.cards, self.cards2, self.system],
       
  2562                      None, {'AD': 'table0.C1', 'AD.name': 'table0.C2',
       
  2563                             'AE': 'table0.C2', 'X': 'table0.C0'},
       
  2564                      []),
       
  2565                     ('OneFetchStep', [('Any X,AD,AE WHERE 999999 multisource_crossed_rel X, AD name AE, AD is State, X is Note',
       
  2566                                        [{'AD': 'State', 'AE': 'String', 'X': 'Note'}])],
       
  2567                      None, None, [self.system],
       
  2568                      {'AD': 'table0.C1', 'AD.name': 'table0.C2', 'AE': 'table0.C2', 'X': 'table0.C0'},
       
  2569                      [])],
       
  2570                    {'x': 999999})
       
  2571 
       
  2572     def test_version_crossed_depends_on_3(self):
       
               # Same crossed-relation query shape, but E is only constrained by type
               # (E is Note) instead of by eid; no substitution dict is passed.
               # Value handed to self._test (presumably the expected plan):
               #   1. FetchStep for X/AD/AE over cards, cards2 and system -> table0
               #   2. FetchStep for E over cards, cards2 and system -> table1
               #   3. UnionStep of two OneFetchSteps running the same RQL:
               #        - on cards and cards2, with no table mapping (None)
               #        - on system, reading X/AD/AE from table0 and E from table1
  2573         self._test('Any X,AD,AE WHERE E multisource_crossed_rel X, X in_state AD, AD name AE, E is Note',
       
  2574                    [('FetchStep', [('Any X,AD,AE WHERE X in_state AD, AD name AE, AD is State, X is Note',
       
  2575                                     [{'AD': 'State', 'AE': 'String', 'X': 'Note'}])],
       
  2576                      [self.cards, self.cards2, self.system],
       
  2577                      None, {'AD': 'table0.C1', 'AD.name': 'table0.C2',
       
  2578                             'AE': 'table0.C2', 'X': 'table0.C0'},
       
  2579                      []),
       
  2580                     ('FetchStep', [('Any E WHERE E is Note', [{'E': 'Note'}])],
       
  2581                      [self.cards, self.cards2, self.system],
       
  2582                      None, {'E': 'table1.C0'},
       
  2583                      []),
       
  2584                     ('UnionStep', None, None,
       
  2585                      [('OneFetchStep', [('Any X,AD,AE WHERE E multisource_crossed_rel X, AD name AE, AD is State, E is Note, X is Note',
       
  2586                                          [{'AD': 'State', 'AE': 'String', 'E': 'Note', 'X': 'Note'}])],
       
  2587                        None, None, [self.cards, self.cards2], None,
       
  2588                        []),
       
  2589                       ('OneFetchStep', [('Any X,AD,AE WHERE E multisource_crossed_rel X, AD name AE, AD is State, E is Note, X is Note',
       
  2590                                          [{'AD': 'State', 'AE': 'String', 'E': 'Note', 'X': 'Note'}])],
       
  2591                        None, None, [self.system],
       
  2592                        {'AD': 'table0.C1',
       
  2593                         'AD.name': 'table0.C2',
       
  2594                         'AE': 'table0.C2',
       
  2595                         'E': 'table1.C0',
       
  2596                         'X': 'table0.C0'},
       
  2597                        [])]
       
  2598                      )]
       
  2599                    )
       
  2600 
       
  2601     def test_version_crossed_depends_on_4(self):
       
               # Variant where the crossed relation is wrapped in EXISTS().
               # Value handed to self._test (presumably the expected plan):
               #   1. FetchStep for X/AD/AE over cards, cards2 and system -> table0
               #   2. FetchStep selecting the object of the crossed relation under the
               #      name A, over cards, cards2 and system -> table1
               #   3. a single OneFetchStep on system in which the original EXISTS is
               #      replaced by EXISTS(X identity A), reading from table0 and table1
  2602         self._test('Any X,AD,AE WHERE EXISTS(E multisource_crossed_rel X), X in_state AD, AD name AE, E is Note',
       
  2603                    [('FetchStep',
       
  2604                      [('Any X,AD,AE WHERE X in_state AD, AD name AE, AD is State, X is Note',
       
  2605                        [{'X': 'Note', 'AD': 'State', 'AE': 'String'}])],
       
  2606                      [self.cards, self.cards2, self.system], None,
       
  2607                      {'X': 'table0.C0',
       
  2608                       'AD': 'table0.C1',
       
  2609                       'AD.name': 'table0.C2',
       
  2610                       'AE': 'table0.C2'},
       
  2611                      []),
       
  2612                     ('FetchStep',
       
  2613                      [('Any A WHERE E multisource_crossed_rel A, A is Note, E is Note',
       
  2614                        [{'A': 'Note', 'E': 'Note'}])],
       
  2615                      [self.cards, self.cards2, self.system], None,
       
  2616                      {'A': 'table1.C0'},
       
  2617                      []),
       
  2618                     ('OneFetchStep',
       
  2619                      [('Any X,AD,AE WHERE EXISTS(X identity A), AD name AE, A is Note, AD is State, X is Note',
       
  2620                        [{'A': 'Note', 'AD': 'State', 'AE': 'String', 'X': 'Note'}])],
       
  2621                      None, None,
       
  2622                      [self.system],
       
  2623                      {'A': 'table1.C0',
       
  2624                       'AD': 'table0.C1',
       
  2625                       'AD.name': 'table0.C2',
       
  2626                       'AE': 'table0.C2',
       
  2627                       'X': 'table0.C0'},
       
  2628                      []
       
  2629                      )]
       
  2630                        )
       
  2631 
       
  2632     def test_nonregr_dont_cross_rel_source_filtering_1(self):
       
               # Non-regression case: eid 999999 is recorded in the eid cache as a 'Note'
               # with 'cards' in both source slots.
  2633         self.repo._type_source_cache[999999] = ('Note', 'cards', 999999, 'cards')
       
               # Value handed to self._test (presumably the expected plan): one
               # OneFetchStep sent to self.cards only, with the eid inlined and the
               # NOT S name "moved" restriction kept in the same RQL.
  2634         self._test('Any S WHERE E eid %(x)s, E in_state S, NOT S name "moved"',
       
  2635                    [('OneFetchStep', [('Any S WHERE 999999 in_state S, NOT S name "moved", S is State',
       
  2636                                        [{'S': 'State'}])],
       
  2637                      None, None, [self.cards], {}, []
       
  2638                      )],
       
  2639                    {'x': 999999})
       
  2640 
       
  2641     def test_nonregr_dont_cross_rel_source_filtering_2(self):
       
               # Non-regression case: eid 999999 is recorded in the eid cache as a 'Note'
               # with 'cards' in both source slots.
  2642         self.repo._type_source_cache[999999] = ('Note', 'cards', 999999, 'cards')
       
               # Value handed to self._test (presumably the expected plan): one
               # OneFetchStep sent to self.cards only; the state's name and
               # modification_date are selected in that same step.
  2643         self._test('Any X,AA,AB WHERE E eid %(x)s, E in_state X, X name AA, X modification_date AB',
       
  2644                    [('OneFetchStep', [('Any X,AA,AB WHERE 999999 in_state X, X name AA, X modification_date AB, X is State',
       
  2645                                        [{'AA': 'String', 'AB': 'Datetime', 'X': 'State'}])],
       
  2646                      None, None, [self.cards], {}, []
       
  2647                      )],
       
  2648                    {'x': 999999})
       
  2649 
       
  2650     def test_nonregr_eid_query(self):
       
               # Non-regression case: a query that only names an entity by eid.
               # The eid is recorded in the eid cache with 'cards' in the source slots.
  2651         self.repo._type_source_cache[999999] = ('Note', 'cards', 999999, 'cards')
       
               # Value handed to self._test (presumably the expected plan): the RQL is
               # reduced to 'Any 999999' with an empty solution and the step lists
               # self.system, although the eid was registered with 'cards'.
               # NOTE(review): the query contains a literal eid and no %(x)s
               # placeholder, so the trailing {'x': 999999} looks unused -- confirm.
  2652         self._test('Any X WHERE X eid 999999',
       
  2653                    [('OneFetchStep', [('Any 999999', [{}])],
       
  2654                      None, None, [self.system], {}, []
       
  2655                      )],
       
  2656                    {'x': 999999})
       
  2657 
       
  2658 
       
  2659     def test_nonregr_not_is(self):
       
               # Non-regression case for a negated type restriction (NOT X is Comment).
               # Value handed to self._test (presumably the expected plan):
               #   1. FetchStep collecting Card, Note and State entities from cards,
               #      cards2 and system -> table0
               #   2. UnionStep of two OneFetchSteps, both on the system source:
               #        - one enumerating the other entity types with an empty table
               #          mapping ({})
               #        - one for Card/Note/State reading X from table0
               # 'Comment' appears in neither type list.
  2660         self._test("Any X WHERE X owned_by U, U login 'anon', NOT X is Comment",
       
  2661                    [('FetchStep', [('Any X WHERE X is IN(Card, Note, State)',
       
  2662                                     [{'X': 'Note'}, {'X': 'State'}, {'X': 'Card'}])],
       
  2663                      [self.cards, self.cards2, self.system],
       
  2664                      None, {'X': 'table0.C0'}, []),
       
  2665                     ('UnionStep', None, None,
       
  2666                      [('OneFetchStep',
       
  2667                        [(u'Any X WHERE X owned_by U, U login "anon", U is CWUser, X is IN(Affaire, BaseTransition, Basket, Bookmark, CWAttribute, CWCache, CWConstraint, CWConstraintType, CWDataImport, CWEType, CWGroup, CWPermission, CWProperty, CWRType, CWRelation, CWSource, CWSourceHostConfig, CWSourceSchemaConfig, CWUniqueTogetherConstraint, CWUser, Division, Email, EmailAddress, EmailPart, EmailThread, ExternalUri, File, Folder, Old, Personne, RQLExpression, Societe, SubDivision, SubWorkflowExitPoint, Tag, TrInfo, Transition, Workflow, WorkflowTransition)',
       
  2668                          [{'U': 'CWUser', 'X': 'Affaire'},
       
  2669                           {'U': 'CWUser', 'X': 'BaseTransition'},
       
  2670                           {'U': 'CWUser', 'X': 'Basket'},
       
  2671                           {'U': 'CWUser', 'X': 'Bookmark'},
       
  2672                           {'U': 'CWUser', 'X': 'CWAttribute'},
       
  2673                           {'U': 'CWUser', 'X': 'CWCache'},
       
  2674                           {'U': 'CWUser', 'X': 'CWConstraint'},
       
  2675                           {'U': 'CWUser', 'X': 'CWConstraintType'},
       
  2676                           {'U': 'CWUser', 'X': 'CWDataImport'},
       
  2677                           {'U': 'CWUser', 'X': 'CWEType'},
       
  2678                           {'U': 'CWUser', 'X': 'CWGroup'},
       
  2679                           {'U': 'CWUser', 'X': 'CWPermission'},
       
  2680                           {'U': 'CWUser', 'X': 'CWProperty'},
       
  2681                           {'U': 'CWUser', 'X': 'CWRType'},
       
  2682                           {'U': 'CWUser', 'X': 'CWRelation'},
       
  2683                           {'U': 'CWUser', 'X': 'CWSource'},
       
  2684                           {'U': 'CWUser', 'X': 'CWSourceHostConfig'},
       
  2685                           {'U': 'CWUser', 'X': 'CWSourceSchemaConfig'},
       
  2686                           {'U': 'CWUser', 'X': 'CWUniqueTogetherConstraint'},
       
  2687                           {'U': 'CWUser', 'X': 'CWUser'},
       
  2688                           {'U': 'CWUser', 'X': 'Division'},
       
  2689                           {'U': 'CWUser', 'X': 'Email'},
       
  2690                           {'U': 'CWUser', 'X': 'EmailAddress'},
       
  2691                           {'U': 'CWUser', 'X': 'EmailPart'},
       
  2692                           {'U': 'CWUser', 'X': 'EmailThread'},
       
  2693                           {'U': 'CWUser', 'X': 'ExternalUri'},
       
  2694                           {'U': 'CWUser', 'X': 'File'},
       
  2695                           {'U': 'CWUser', 'X': 'Folder'},
       
  2696                           {'U': 'CWUser', 'X': 'Old'},
       
  2697                           {'U': 'CWUser', 'X': 'Personne'},
       
  2698                           {'U': 'CWUser', 'X': 'RQLExpression'},
       
  2699                           {'U': 'CWUser', 'X': 'Societe'},
       
  2700                           {'U': 'CWUser', 'X': 'SubDivision'},
       
  2701                           {'U': 'CWUser', 'X': 'SubWorkflowExitPoint'},
       
  2702                           {'U': 'CWUser', 'X': 'Tag'},
       
  2703                           {'U': 'CWUser', 'X': 'TrInfo'},
       
  2704                           {'U': 'CWUser', 'X': 'Transition'},
       
  2705                           {'U': 'CWUser', 'X': 'Workflow'},
       
  2706                           {'U': 'CWUser', 'X': 'WorkflowTransition'}])],
       
  2707                        None, None,
       
  2708                        [self.system], {}, []),
       
  2709                       ('OneFetchStep',
       
  2710                        [(u'Any X WHERE X owned_by U, U login "anon", U is CWUser, X is IN(Card, Note, State)',
       
  2711                          [{'U': 'CWUser', 'X': 'Note'},
       
  2712                           {'U': 'CWUser', 'X': 'State'},
       
  2713                           {'U': 'CWUser', 'X': 'Card'}])],
       
  2714                        None, None,
       
  2715                        [self.system], {'X': 'table0.C0'}, [])
       
  2716                       ])
       
  2717                     ])
       
  2718 
       
  2719     def test_remove_from_deleted_source_1(self):
       
               # Single eid, recorded in the eid cache with 'cards' in the source slots,
               # queried together with a negated cw_source relation.
  2720         self.repo._type_source_cache[999999] = ('Note', 'cards', 999999, 'cards')
       
               # Value handed to self._test (presumably the expected plan): one
               # OneFetchStep on the system source in which the eid is inlined and
               # NOT X cw_source Y appears as NOT EXISTS(999999 cw_source Y).
  2721         self._test('Note X WHERE X eid 999999, NOT X cw_source Y',
       
  2722                    [('OneFetchStep',
       
  2723                      [('Any 999999 WHERE NOT EXISTS(999999 cw_source Y)',
       
  2724                        [{'Y': 'CWSource'}])],
       
  2725                      None, None, [self.system], {}, [])
       
  2726                     ])
       
  2727 
       
  2728     def test_remove_from_deleted_source_2(self):
       
               # Same negated cw_source query, but with two eids given through IN(...);
               # both are recorded in the eid cache with 'cards' in the source slots.
  2729         self.repo._type_source_cache[999999] = ('Note', 'cards', 999999, 'cards')
       
  2730         self.repo._type_source_cache[999998] = ('Note', 'cards', 999998, 'cards')
       
               # Value handed to self._test (presumably the expected plan):
               #   1. FetchStep on self.cards selecting the two eids -> table0
               #   2. OneFetchStep on system applying NOT EXISTS(X cw_source Y, ...)
               #      with X read from table0
  2731         self._test('Note X WHERE X eid IN (999998, 999999), NOT X cw_source Y',
       
  2732                    [('FetchStep',
       
  2733                      [('Any X WHERE X eid IN(999998, 999999), X is Note',
       
  2734                        [{'X': 'Note'}])],
       
  2735                      [self.cards], None, {'X': 'table0.C0'}, []),
       
  2736                     ('OneFetchStep',
       
  2737                      [('Any X WHERE NOT EXISTS(X cw_source Y, Y is CWSource), X is Note',
       
  2738                        [{'X': 'Note', 'Y': 'CWSource'}])],
       
  2739                          None, None, [self.system],{'X': 'table0.C0'}, [])
       
  2740                         ])
       
  2741 
       
  2742 
       
  2743 class FakeVCSSource(AbstractSource):
       
           # Stub source used by the planner tests in this file: it declares what it
           # supports and never returns any data.  AbstractSource is imported outside
           # the visible part of this file.
  2744     uri = 'ccc'
       
           # Entity types this fake source declares support for.
  2745     support_entities = {'Card': True, 'Note': True}
       
           # Relation types this fake source declares support for.
  2746     support_relations = {'multisource_inlined_rel': True,
       
  2747                          'multisource_rel': True}
       
  2748 
       
  2749     def syntax_tree_search(self, *args, **kwargs):
       
               # Accept any arguments and always answer with an empty result list.
  2750         return []
       
  2751 
       
  2752 class MSPlannerVCSSource(BasePlannerTC):
       
           # Planner tests run with an extra FakeVCSSource registered under the name
           # 'vcs' (see setUp).
  2753 
       
  2754     def setUp(self):
       
               # Bind the module-level `repo` (defined outside the visible part of this
               # file) onto the test class, run the base-class setup(), register the
               # fake source as 'vcs', then build the MSPlanner under test.
  2755         self.__class__.repo = repo
       
  2756         self.setup()
       
  2757         self.add_source(FakeVCSSource, 'vcs')
       
  2758         self.planner = MSPlanner(self.o.schema, self.repo.vreg.rqlhelper)
       
           # test_plan is imported from cubicweb.devtools.repotest at the top of the
           # file; it is exposed here as the _test method used by the tests below.
  2759     _test = test_plan
       
  2760 
       
  2761     def test_multisource_inlined_rel_skipped(self):
       
               # Value handed to self._test (presumably the expected plan):
               #   1. FetchStep on vcs and system selecting VC -> table0
               #   2. OneFetchStep on system computing MAX(VC) over published states,
               #      reading VC from table0
               # NOTE(review): the query contains %(branch)s but no substitution dict
               # is passed; the expected RQL shows "???" at that position -- confirm
               # this is the intended rendering of a missing argument.
  2762         self._test('Any MAX(VC) '
       
  2763                    'WHERE VC multisource_inlined_rel R2, R para %(branch)s, VC in_state S, S name "published", '
       
  2764                    '(EXISTS(R identity R2)) OR (EXISTS(R multisource_rel R2))',
       
  2765                    [('FetchStep', [('Any VC WHERE VC multisource_inlined_rel R2, R para "???", (EXISTS(R identity R2)) OR (EXISTS(R multisource_rel R2)), R is Note, R2 is Note, VC is Note',
       
  2766                                     [{'R': 'Note', 'R2': 'Note', 'VC': 'Note'}])],
       
  2767                      [self.vcs, self.system], None,
       
  2768                      {'VC': 'table0.C0'},
       
  2769                      []),
       
  2770                     ('OneFetchStep', [(u'Any MAX(VC) WHERE VC in_state S, S name "published", S is State, VC is Note',
       
  2771                                        [{'S': 'State', 'VC': 'Note'}])],
       
  2772                      None, None, [self.system],
       
  2773                      {'VC': 'table0.C0'},
       
  2774                      [])
       
  2775                     ])
       
  2776 
       
  2777     def test_fully_simplified_extsource(self):
       
               # Both eids are recorded in the eid cache with 'vcs' in the source slots.
  2778         self.repo._type_source_cache[999998] = ('Note', 'vcs', 999998, 'vcs')
       
  2779         self.repo._type_source_cache[999999] = ('Note', 'vcs', 999999, 'vcs')
       
               # Value handed to self._test (presumably the expected plan): one
               # OneFetchStep on self.vcs; both variables are replaced by their eids,
               # leaving an empty solution ([{}]).
  2780         self._test('Any X, Y WHERE NOT X multisource_rel Y, X eid 999998, Y eid 999999',
       
  2781                    [('OneFetchStep', [('Any 999998,999999 WHERE NOT EXISTS(999998 multisource_rel 999999)', [{}])],
       
  2782                      None, None, [self.vcs], {}, [])
       
  2783                     ])
       
  2784 
       
  2785     def test_nonregr_fully_simplified_extsource(self):
       
               # Two eids are recorded with 'vcs' and one with 'system'.
  2786         self.repo._type_source_cache[999998] = ('Note', 'vcs', 999998, 'vcs')
       
  2787         self.repo._type_source_cache[999999] = ('Note', 'vcs', 999999, 'vcs')
       
  2788         self.repo._type_source_cache[1000000] = ('Note', 'system', 1000000, 'system')
       
               # Value handed to self._test (presumably the expected plan): every
               # variable is replaced by its eid and the single OneFetchStep lists
               # self.system.
  2789         self._test('DISTINCT Any T,FALSE,L,M WHERE L eid 1000000, M eid 999999, T eid 999998',
       
  2790                    [('OneFetchStep', [('DISTINCT Any 999998,FALSE,1000000,999999', [{}])],
       
  2791                      None, None, [self.system], {}, [])
       
  2792                     ])
       
  2793 
       
  2794 
       
  2795 if __name__ == '__main__':
       
           # Script entry point: hand control to logilab's unittest_main, imported
           # only when the module is executed directly.
  2796     from logilab.common.testlib import unittest_main
       
  2797     unittest_main()