cubicweb/server/test/unittest_querier.py
changeset 11057 0b59724cb3f2
parent 11035 0fb100e8385b
child 11269 73ac69970047
equal deleted inserted replaced
11052:058bb3dc685f 11057:0b59724cb3f2
       
     1 # -*- coding: iso-8859-1 -*-
       
     2 # copyright 2003-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
       
     3 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
       
     4 #
       
     5 # This file is part of CubicWeb.
       
     6 #
       
     7 # CubicWeb is free software: you can redistribute it and/or modify it under the
       
     8 # terms of the GNU Lesser General Public License as published by the Free
       
     9 # Software Foundation, either version 2.1 of the License, or (at your option)
       
    10 # any later version.
       
    11 #
       
    12 # CubicWeb is distributed in the hope that it will be useful, but WITHOUT
       
    13 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
       
    14 # FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
       
    15 # details.
       
    16 #
       
    17 # You should have received a copy of the GNU Lesser General Public License along
       
    18 # with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
       
    19 """unit tests for modules cubicweb.server.querier and cubicweb.server.ssplanner
       
    20 """
       
    21 
       
    22 from datetime import date, datetime, timedelta, tzinfo
       
    23 
       
    24 import pytz
       
    25 
       
    26 from six import PY2, integer_types, binary_type, text_type
       
    27 
       
    28 from logilab.common.testlib import TestCase, unittest_main
       
    29 from rql import BadRQLQuery
       
    30 
       
    31 from cubicweb import QueryError, Unauthorized, Binary
       
    32 from cubicweb.server.sqlutils import SQL_PREFIX
       
    33 from cubicweb.server.utils import crypt_password
       
    34 from cubicweb.server.querier import manual_build_descr, _make_description
       
    35 from cubicweb.devtools import get_test_db_handler, TestServerConfiguration
       
    36 from cubicweb.devtools.testlib import CubicWebTC
       
    37 from cubicweb.devtools.repotest import tuplify, BaseQuerierTC
       
    38 
       
    39 
       
    40 class FixedOffset(tzinfo):
       
    41     def __init__(self, hours=0):
       
    42         self.hours = hours
       
    43     def utcoffset(self, dt):
       
    44         return timedelta(hours=self.hours)
       
    45     def dst(self, dt):
       
    46         return timedelta(0)
       
    47 
       
    48 
       
    49 # register priority/severity sorting registered procedure
       
    50 from rql.utils import register_function, FunctionDescr
       
    51 
       
    52 class group_sort_value(FunctionDescr):
       
    53     supported_backends = ('sqlite',)
       
    54     rtype = 'Int'
       
    55 try:
       
    56     register_function(group_sort_value)
       
    57 except AssertionError:
       
    58     pass
       
    59 from cubicweb.server.sqlutils import SQL_CONNECT_HOOKS
       
    60 def init_sqlite_connexion(cnx):
       
    61     def group_sort_value(text):
       
    62         return {"managers": "3", "users": "2", "guests":  "1", "owners": "0"}[text]
       
    63     cnx.create_function("GROUP_SORT_VALUE", 1, group_sort_value)
       
    64 SQL_CONNECT_HOOKS['sqlite'].append(init_sqlite_connexion)
       
    65 
       
    66 
       
    67 def setUpClass(cls, *args):
       
    68     global repo, cnx
       
    69     config = TestServerConfiguration(apphome=UtilsTC.datadir)
       
    70     handler = get_test_db_handler(config)
       
    71     handler.build_db_cache()
       
    72     repo, cnx = handler.get_repo_and_cnx()
       
    73     cls.repo = repo
       
    74 
       
    75 def tearDownClass(cls, *args):
       
    76     global repo, cnx
       
    77     repo.shutdown()
       
    78     del repo, cnx
       
    79 
       
    80 
       
    81 class Variable:
       
    82     def __init__(self, name):
       
    83         self.name = name
       
    84         self.children = []
       
    85 
       
    86     def get_type(self, solution, args=None):
       
    87         return solution[self.name]
       
    88     def as_string(self):
       
    89         return self.name
       
    90 
       
    91 class Function:
       
    92     def __init__(self, name, varname):
       
    93         self.name = name
       
    94         self.children = [Variable(varname)]
       
    95     def get_type(self, solution, args=None):
       
    96         return 'Int'
       
    97 
       
    98 class MakeDescriptionTC(TestCase):
       
    99     def test_known_values(self):
       
   100         solution = {'A': 'Int', 'B': 'CWUser'}
       
   101         self.assertEqual(_make_description((Function('max', 'A'), Variable('B')), {}, solution),
       
   102                           ['Int','CWUser'])
       
   103 
       
   104 
       
   105 class UtilsTC(BaseQuerierTC):
       
   106     setUpClass = classmethod(setUpClass)
       
   107     tearDownClass = classmethod(tearDownClass)
       
   108 
       
   109     def get_max_eid(self):
       
   110         # no need for cleanup here
       
   111         return None
       
   112     def cleanup(self):
       
   113         # no need for cleanup here
       
   114         pass
       
   115 
       
   116     def test_preprocess_1(self):
       
   117         with self.session.new_cnx() as cnx:
       
   118             reid = cnx.execute('Any X WHERE X is CWRType, X name "owned_by"')[0][0]
       
   119             rqlst = self._prepare(cnx, 'Any COUNT(RDEF) WHERE RDEF relation_type X, X eid %(x)s',
       
   120                                   {'x': reid})
       
   121             self.assertEqual([{'RDEF': 'CWAttribute'}, {'RDEF': 'CWRelation'}],
       
   122                              rqlst.solutions)
       
   123 
       
   124     def test_preprocess_2(self):
       
   125         with self.session.new_cnx() as cnx:
       
   126             teid = cnx.execute("INSERT Tag X: X name 'tag'")[0][0]
       
   127             #geid = self.execute("CWGroup G WHERE G name 'users'")[0][0]
       
   128             #self.execute("SET X tags Y WHERE X eid %(t)s, Y eid %(g)s",
       
   129             #             {'g': geid, 't': teid}, 'g')
       
   130             rqlst = self._prepare(cnx, 'Any X WHERE E eid %(x)s, E tags X', {'x': teid})
       
   131             # the query may be optimized, should keep only one solution
       
   132             # (any one, etype will be discarded)
       
   133             self.assertEqual(1, len(rqlst.solutions))
       
   134 
       
   135     def assertRQLEqual(self, expected, got):
       
   136         from rql import parse
       
   137         self.assertMultiLineEqual(text_type(parse(expected)),
       
   138                                   text_type(parse(got)))
       
   139 
       
   140     def test_preprocess_security(self):
       
   141         s = self.user_groups_session('users')
       
   142         with s.new_cnx() as cnx:
       
   143             plan = self._prepare_plan(cnx, 'Any ETN,COUNT(X) GROUPBY ETN '
       
   144                                       'WHERE X is ET, ET name ETN')
       
   145             union = plan.rqlst
       
   146             plan.preprocess(union)
       
   147             self.assertEqual(len(union.children), 1)
       
   148             self.assertEqual(len(union.children[0].with_), 1)
       
   149             subq = union.children[0].with_[0].query
       
   150             self.assertEqual(len(subq.children), 4)
       
   151             self.assertEqual([t.as_string() for t in union.children[0].selection],
       
   152                               ['ETN','COUNT(X)'])
       
   153             self.assertEqual([t.as_string() for t in union.children[0].groupby],
       
   154                               ['ETN'])
       
   155             partrqls = sorted(((rqlst.as_string(), rqlst.solutions) for rqlst in subq.children))
       
   156             rql, solutions = partrqls[0]
       
   157             self.assertRQLEqual(rql,
       
   158                                 'Any ETN,X WHERE X is ET, ET name ETN, (EXISTS(X owned_by %(B)s))'
       
   159                                 ' OR ((((EXISTS(D concerne C?, C owned_by %(B)s, '
       
   160                                 '               X identity D, C is Division, D is Affaire))'
       
   161                                 ' OR (EXISTS(H concerne G?, G owned_by %(B)s, G is SubDivision, '
       
   162                                 '            X identity H, H is Affaire)))'
       
   163                                 ' OR (EXISTS(I concerne F?, F owned_by %(B)s, F is Societe, '
       
   164                                 '            X identity I, I is Affaire)))'
       
   165                                 ' OR (EXISTS(J concerne E?, E owned_by %(B)s, E is Note, '
       
   166                                 '            X identity J, J is Affaire)))'
       
   167                                 ', ET is CWEType, X is Affaire')
       
   168             self.assertEqual(solutions, [{'C': 'Division',
       
   169                                            'D': 'Affaire',
       
   170                                            'E': 'Note',
       
   171                                            'F': 'Societe',
       
   172                                            'G': 'SubDivision',
       
   173                                            'H': 'Affaire',
       
   174                                            'I': 'Affaire',
       
   175                                            'J': 'Affaire',
       
   176                                            'X': 'Affaire',
       
   177                                            'ET': 'CWEType', 'ETN': 'String'}])
       
   178             rql, solutions = partrqls[1]
       
   179             self.assertRQLEqual(rql,  'Any ETN,X WHERE X is ET, ET name ETN, ET is CWEType, '
       
   180                                 'X is IN(BaseTransition, Bookmark, CWAttribute, CWCache, CWComputedRType, '
       
   181                                 '        CWConstraint, CWConstraintType, CWEType, CWGroup, CWPermission, CWProperty, '
       
   182                                 '        CWRType, CWRelation, CWSource, CWUniqueTogetherConstraint, CWUser, Card, '
       
   183                                 '        Comment, Division, Email, EmailPart, EmailThread, ExternalUri, File, Folder, '
       
   184                                 '        Frozable, Note, Old, Personne, RQLExpression, Societe, State, SubDivision, '
       
   185                                 '        SubWorkflowExitPoint, Tag, TrInfo, Transition, Workflow, WorkflowTransition)')
       
   186             self.assertCountEqual(solutions,
       
   187                                   [{'X': 'BaseTransition', 'ETN': 'String', 'ET': 'CWEType'},
       
   188                                    {'X': 'Bookmark', 'ETN': 'String', 'ET': 'CWEType'},
       
   189                                    {'X': 'Card', 'ETN': 'String', 'ET': 'CWEType'},
       
   190                                    {'X': 'Comment', 'ETN': 'String', 'ET': 'CWEType'},
       
   191                                    {'X': 'Division', 'ETN': 'String', 'ET': 'CWEType'},
       
   192                                    {'X': 'CWCache', 'ETN': 'String', 'ET': 'CWEType'},
       
   193                                    {'X': 'CWComputedRType', 'ETN': 'String', 'ET': 'CWEType'},
       
   194                                    {'X': 'CWConstraint', 'ETN': 'String', 'ET': 'CWEType'},
       
   195                                    {'X': 'CWConstraintType', 'ETN': 'String', 'ET': 'CWEType'},
       
   196                                    {'X': 'CWEType', 'ETN': 'String', 'ET': 'CWEType'},
       
   197                                    {'X': 'CWAttribute', 'ETN': 'String', 'ET': 'CWEType'},
       
   198                                    {'X': 'CWGroup', 'ETN': 'String', 'ET': 'CWEType'},
       
   199                                    {'X': 'CWRelation', 'ETN': 'String', 'ET': 'CWEType'},
       
   200                                    {'X': 'CWPermission', 'ETN': 'String', 'ET': 'CWEType'},
       
   201                                    {'X': 'CWProperty', 'ETN': 'String', 'ET': 'CWEType'},
       
   202                                    {'X': 'CWRType', 'ETN': 'String', 'ET': 'CWEType'},
       
   203                                    {'X': 'CWSource', 'ETN': 'String', 'ET': 'CWEType'},
       
   204                                    {'X': 'CWUniqueTogetherConstraint', 'ETN': 'String', 'ET': 'CWEType'},
       
   205                                    {'X': 'CWUser', 'ETN': 'String', 'ET': 'CWEType'},
       
   206                                    {'X': 'Email', 'ETN': 'String', 'ET': 'CWEType'},
       
   207                                    {'X': 'EmailPart', 'ETN': 'String', 'ET': 'CWEType'},
       
   208                                    {'X': 'EmailThread', 'ETN': 'String', 'ET': 'CWEType'},
       
   209                                    {'X': 'ExternalUri', 'ETN': 'String', 'ET': 'CWEType'},
       
   210                                    {'X': 'File', 'ETN': 'String', 'ET': 'CWEType'},
       
   211                                    {'X': 'Folder', 'ETN': 'String', 'ET': 'CWEType'},
       
   212                                    {'X': 'Frozable', 'ETN': 'String', 'ET': 'CWEType'},
       
   213                                    {'X': 'Note', 'ETN': 'String', 'ET': 'CWEType'},
       
   214                                    {'X': 'Old', 'ETN': 'String', 'ET': 'CWEType'},
       
   215                                    {'X': 'Personne', 'ETN': 'String', 'ET': 'CWEType'},
       
   216                                    {'X': 'RQLExpression', 'ETN': 'String', 'ET': 'CWEType'},
       
   217                                    {'X': 'Societe', 'ETN': 'String', 'ET': 'CWEType'},
       
   218                                    {'X': 'State', 'ETN': 'String', 'ET': 'CWEType'},
       
   219                                    {'X': 'SubDivision', 'ETN': 'String', 'ET': 'CWEType'},
       
   220                                    {'X': 'SubWorkflowExitPoint', 'ETN': 'String', 'ET': 'CWEType'},
       
   221                                    {'X': 'Tag', 'ETN': 'String', 'ET': 'CWEType'},
       
   222                                    {'X': 'Transition', 'ETN': 'String', 'ET': 'CWEType'},
       
   223                                    {'X': 'TrInfo', 'ETN': 'String', 'ET': 'CWEType'},
       
   224                                    {'X': 'Workflow', 'ETN': 'String', 'ET': 'CWEType'},
       
   225                                    {'X': 'WorkflowTransition', 'ETN': 'String', 'ET': 'CWEType'}])
       
   226             rql, solutions = partrqls[2]
       
   227             self.assertEqual(rql,
       
   228                              'Any ETN,X WHERE X is ET, ET name ETN, EXISTS(%(D)s use_email X), '
       
   229                              'ET is CWEType, X is EmailAddress')
       
   230             self.assertEqual(solutions, [{'X': 'EmailAddress', 'ET': 'CWEType', 'ETN': 'String'}])
       
   231             rql, solutions = partrqls[3]
       
   232             self.assertEqual(rql,
       
   233                               'Any ETN,X WHERE X is ET, ET name ETN, EXISTS(X owned_by %(C)s), '
       
   234                               'ET is CWEType, X is Basket')
       
   235             self.assertEqual(solutions, [{'X': 'Basket', 'ET': 'CWEType', 'ETN': 'String'}])
       
   236 
       
   237     def test_preprocess_security_aggregat(self):
       
   238         s = self.user_groups_session('users')
       
   239         with s.new_cnx() as cnx:
       
   240             plan = self._prepare_plan(cnx, 'Any MAX(X)')
       
   241             union = plan.rqlst
       
   242             plan.preprocess(union)
       
   243             self.assertEqual(len(union.children), 1)
       
   244             self.assertEqual(len(union.children[0].with_), 1)
       
   245             subq = union.children[0].with_[0].query
       
   246             self.assertEqual(len(subq.children), 4)
       
   247             self.assertEqual([t.as_string() for t in union.children[0].selection],
       
   248                               ['MAX(X)'])
       
   249 
       
   250     def test_preprocess_nonregr(self):
       
   251         with self.session.new_cnx() as cnx:
       
   252             rqlst = self._prepare(cnx, 'Any S ORDERBY SI WHERE NOT S ecrit_par O, S para SI')
       
   253             self.assertEqual(len(rqlst.solutions), 1)
       
   254 
       
   255     def test_build_description(self):
       
   256         # should return an empty result set
       
   257         rset = self.qexecute('Any X WHERE X eid %(x)s', {'x': self.session.user.eid})
       
   258         self.assertEqual(rset.description[0][0], 'CWUser')
       
   259         rset = self.qexecute('Any 1')
       
   260         self.assertEqual(rset.description[0][0], 'Int')
       
   261         rset = self.qexecute('Any TRUE')
       
   262         self.assertEqual(rset.description[0][0], 'Boolean')
       
   263         rset = self.qexecute('Any "hop"')
       
   264         self.assertEqual(rset.description[0][0], 'String')
       
   265         rset = self.qexecute('Any TODAY')
       
   266         self.assertEqual(rset.description[0][0], 'Date')
       
   267         rset = self.qexecute('Any NOW')
       
   268         self.assertEqual(rset.description[0][0], 'Datetime')
       
   269         rset = self.qexecute('Any %(x)s', {'x': 1})
       
   270         self.assertEqual(rset.description[0][0], 'Int')
       
   271         if PY2:
       
   272             rset = self.qexecute('Any %(x)s', {'x': long(1)})
       
   273             self.assertEqual(rset.description[0][0], 'Int')
       
   274         rset = self.qexecute('Any %(x)s', {'x': True})
       
   275         self.assertEqual(rset.description[0][0], 'Boolean')
       
   276         rset = self.qexecute('Any %(x)s', {'x': 1.0})
       
   277         self.assertEqual(rset.description[0][0], 'Float')
       
   278         rset = self.qexecute('Any %(x)s', {'x': datetime.now()})
       
   279         self.assertEqual(rset.description[0][0], 'Datetime')
       
   280         rset = self.qexecute('Any %(x)s', {'x': 'str'})
       
   281         self.assertEqual(rset.description[0][0], 'String')
       
   282         rset = self.qexecute('Any %(x)s', {'x': u'str'})
       
   283         self.assertEqual(rset.description[0][0], 'String')
       
   284 
       
   285     def test_build_descr1(self):
       
   286         with self.session.new_cnx() as cnx:
       
   287             rset = cnx.execute('(Any U,L WHERE U login L) UNION '
       
   288                                '(Any G,N WHERE G name N, G is CWGroup)')
       
   289             # rset.req = self.session
       
   290             orig_length = len(rset)
       
   291             rset.rows[0][0] = 9999999
       
   292             description = manual_build_descr(cnx, rset.syntax_tree(), None, rset.rows)
       
   293             self.assertEqual(len(description), orig_length - 1)
       
   294             self.assertEqual(len(rset.rows), orig_length - 1)
       
   295             self.assertNotEqual(rset.rows[0][0], 9999999)
       
   296 
       
   297     def test_build_descr2(self):
       
   298         rset = self.qexecute('Any X,Y WITH X,Y BEING ((Any G,NULL WHERE G is CWGroup) UNION '
       
   299                              '(Any U,G WHERE U in_group G))')
       
   300         for x, y in rset.description:
       
   301             if y is not None:
       
   302                 self.assertEqual(y, 'CWGroup')
       
   303 
       
   304     def test_build_descr3(self):
       
   305         rset = self.qexecute('(Any G,NULL WHERE G is CWGroup) UNION '
       
   306                              '(Any U,G WHERE U in_group G)')
       
   307         for x, y in rset.description:
       
   308             if y is not None:
       
   309                 self.assertEqual(y, 'CWGroup')
       
   310 
       
   311 
       
   312 class QuerierTC(BaseQuerierTC):
       
   313     setUpClass = classmethod(setUpClass)
       
   314     tearDownClass = classmethod(tearDownClass)
       
   315 
       
   316     def test_unknown_eid(self):
       
   317         # should return an empty result set
       
   318         self.assertFalse(self.qexecute('Any X WHERE X eid 99999999'))
       
   319 
       
   320     def test_typed_eid(self):
       
   321         # should return an empty result set
       
   322         rset = self.qexecute('Any X WHERE X eid %(x)s', {'x': '1'})
       
   323         self.assertIsInstance(rset[0][0], integer_types)
       
   324 
       
   325     def test_bytes_storage(self):
       
   326         feid = self.qexecute('INSERT File X: X data_name "foo.pdf", '
       
   327                              'X data_format "text/plain", X data %(data)s',
       
   328                             {'data': Binary(b"xxx")})[0][0]
       
   329         fdata = self.qexecute('Any D WHERE X data D, X eid %(x)s', {'x': feid})[0][0]
       
   330         self.assertIsInstance(fdata, Binary)
       
   331         self.assertEqual(fdata.getvalue(), b'xxx')
       
   332 
       
   333     # selection queries tests #################################################
       
   334 
       
   335     def test_select_1(self):
       
   336         rset = self.qexecute('Any X ORDERBY X WHERE X is CWGroup')
       
   337         result, descr = rset.rows, rset.description
       
   338         self.assertEqual(tuplify(result), [(2,), (3,), (4,), (5,)])
       
   339         self.assertEqual(descr, [('CWGroup',), ('CWGroup',), ('CWGroup',), ('CWGroup',)])
       
   340 
       
   341     def test_select_2(self):
       
   342         rset = self.qexecute('Any X ORDERBY N WHERE X is CWGroup, X name N')
       
   343         self.assertEqual(tuplify(rset.rows), [(2,), (3,), (4,), (5,)])
       
   344         self.assertEqual(rset.description, [('CWGroup',), ('CWGroup',), ('CWGroup',), ('CWGroup',)])
       
   345         rset = self.qexecute('Any X ORDERBY N DESC WHERE X is CWGroup, X name N')
       
   346         self.assertEqual(tuplify(rset.rows), [(5,), (4,), (3,), (2,)])
       
   347 
       
   348     def test_select_3(self):
       
   349         rset = self.qexecute('Any N GROUPBY N WHERE X is CWGroup, X name N')
       
   350         result, descr = rset.rows, rset.description
       
   351         result.sort()
       
   352         self.assertEqual(tuplify(result), [('guests',), ('managers',), ('owners',), ('users',)])
       
   353         self.assertEqual(descr, [('String',), ('String',), ('String',), ('String',)])
       
   354 
       
   355     def test_select_is(self):
       
   356         rset = self.qexecute('Any X, TN ORDERBY TN LIMIT 10 WHERE X is T, T name TN')
       
   357         result, descr = rset.rows, rset.description
       
   358         self.assertEqual(result[0][1], descr[0][0])
       
   359 
       
   360     def test_select_is_aggr(self):
       
   361         rset = self.qexecute('Any TN, COUNT(X) GROUPBY TN ORDERBY 2 DESC WHERE X is T, T name TN')
       
   362         result, descr = rset.rows, rset.description
       
   363         self.assertEqual(descr[0][0], 'String')
       
   364         self.assertEqual(descr[0][1], 'Int')
       
   365         self.assertEqual(result[0][0], 'RQLExpression') # XXX may change as schema evolve
       
   366 
       
   367     def test_select_groupby_orderby(self):
       
   368         rset = self.qexecute('Any N GROUPBY N ORDERBY N WHERE X is CWGroup, X name N')
       
   369         self.assertEqual(tuplify(rset.rows), [('guests',), ('managers',), ('owners',), ('users',)])
       
   370         self.assertEqual(rset.description, [('String',), ('String',), ('String',), ('String',)])
       
   371 
       
   372     def test_select_complex_groupby(self):
       
   373         rset = self.qexecute('Any N GROUPBY N WHERE X name N')
       
   374         rset = self.qexecute('Any N,MAX(D) GROUPBY N LIMIT 5 WHERE X name N, X creation_date D')
       
   375 
       
   376     def test_select_inlined_groupby(self):
       
   377         seid = self.qexecute('State X WHERE X name "deactivated"')[0][0]
       
   378         rset = self.qexecute('Any U,L,S GROUPBY U,L,S WHERE X in_state S, U login L, S eid %s' % seid)
       
   379 
       
   380     def test_select_groupby_funccall(self):
       
   381         rset = self.qexecute('Any YEAR(CD), COUNT(X) GROUPBY YEAR(CD) '
       
   382                              'WHERE X is CWUser, X creation_date CD')
       
   383         self.assertListEqual(rset.rows, [[date.today().year, 2]])
       
   384 
       
   385     def test_select_groupby_colnumber(self):
       
   386         rset = self.qexecute('Any YEAR(CD), COUNT(X) GROUPBY 1 '
       
   387                              'WHERE X is CWUser, X creation_date CD')
       
   388         self.assertListEqual(rset.rows, [[date.today().year, 2]])
       
   389 
       
   390     def test_select_complex_orderby(self):
       
   391         rset1 = self.qexecute('Any N ORDERBY N WHERE X name N')
       
   392         self.assertEqual(sorted(rset1.rows), rset1.rows)
       
   393         rset = self.qexecute('Any N ORDERBY N LIMIT 5 OFFSET 1 WHERE X name N')
       
   394         self.assertEqual(rset.rows[0][0], rset1.rows[1][0])
       
   395         self.assertEqual(len(rset), 5)
       
   396 
       
   397     def test_select_5(self):
       
   398         rset = self.qexecute('Any X, TMP ORDERBY TMP WHERE X name TMP, X is CWGroup')
       
   399         self.assertEqual(tuplify(rset.rows),
       
   400                          [(2, 'guests',),
       
   401                           (3, 'managers',),
       
   402                           (4, 'owners',),
       
   403                           (5, 'users',)])
       
   404         self.assertEqual(rset.description,
       
   405                          [('CWGroup', 'String',),
       
   406                           ('CWGroup', 'String',),
       
   407                           ('CWGroup', 'String',),
       
   408                           ('CWGroup', 'String',)])
       
   409 
       
   410     def test_select_6(self):
       
   411         self.qexecute("INSERT Personne X: X nom 'bidule'")[0]
       
   412         rset = self.qexecute('Any Y where X name TMP, Y nom in (TMP, "bidule")')
       
   413         #self.assertEqual(rset.description, [('Personne',), ('Personne',)])
       
   414         self.assertIn(('Personne',), rset.description)
       
   415         rset = self.qexecute('DISTINCT Any Y where X name TMP, Y nom in (TMP, "bidule")')
       
   416         self.assertIn(('Personne',), rset.description)
       
   417 
       
   418     def test_select_not_attr(self):
       
   419         peid = self.qexecute("INSERT Personne X: X nom 'bidule'")[0][0]
       
   420         seid = self.qexecute("INSERT Societe X: X nom 'chouette'")[0][0]
       
   421         rset = self.qexecute('Personne X WHERE NOT X nom "bidule"')
       
   422         self.assertEqual(len(rset.rows), 0, rset.rows)
       
   423         rset = self.qexecute('Personne X WHERE NOT X nom "bid"')
       
   424         self.assertEqual(len(rset.rows), 1, rset.rows)
       
   425         self.qexecute("SET P travaille S WHERE P nom 'bidule', S nom 'chouette'")
       
   426         rset = self.qexecute('Personne X WHERE NOT X travaille S')
       
   427         self.assertEqual(len(rset.rows), 0, rset.rows)
       
   428 
       
   429     def test_select_is_in(self):
       
   430         self.qexecute("INSERT Personne X: X nom 'bidule'")
       
   431         self.qexecute("INSERT Societe X: X nom 'chouette'")
       
   432         self.assertEqual(len(self.qexecute("Any X WHERE X is IN (Personne, Societe)")),
       
   433                           2)
       
   434 
       
   435     def test_select_not_rel(self):
       
   436         self.qexecute("INSERT Personne X: X nom 'bidule'")
       
   437         self.qexecute("INSERT Societe X: X nom 'chouette'")
       
   438         self.qexecute("INSERT Personne X: X nom 'autre'")
       
   439         self.qexecute("SET P travaille S WHERE P nom 'bidule', S nom 'chouette'")
       
   440         rset = self.qexecute('Personne X WHERE NOT X travaille S')
       
   441         self.assertEqual(len(rset.rows), 1, rset.rows)
       
   442         rset = self.qexecute('Personne X WHERE NOT X travaille S, S nom "chouette"')
       
   443         self.assertEqual(len(rset.rows), 1, rset.rows)
       
   444 
       
   445     def test_select_nonregr_inlined(self):
       
   446         self.qexecute("INSERT Note X: X para 'bidule'")
       
   447         self.qexecute("INSERT Personne X: X nom 'chouette'")
       
   448         self.qexecute("INSERT Personne X: X nom 'autre'")
       
   449         self.qexecute("SET X ecrit_par P WHERE X para 'bidule', P nom 'chouette'")
       
   450         rset = self.qexecute('Any U,T ORDERBY T DESC WHERE U is CWUser, '
       
   451                              'N ecrit_par U, N type T')#, {'x': self.ueid})
       
   452         self.assertEqual(len(rset.rows), 0)
       
   453 
       
   454     def test_select_nonregr_edition_not(self):
       
   455         groupeids = set((2, 3, 4))
       
   456         groupreadperms = set(r[0] for r in self.qexecute('Any Y WHERE X name "CWGroup", '
       
   457                                                          'Y eid IN(2, 3, 4), X read_permission Y'))
       
   458         rset = self.qexecute('DISTINCT Any Y WHERE X is CWEType, X name "CWGroup", '
       
   459                              'Y eid IN(2, 3, 4), NOT X read_permission Y')
       
   460         self.assertEqual(sorted(r[0] for r in rset.rows), sorted(groupeids - groupreadperms))
       
   461         rset = self.qexecute('DISTINCT Any Y WHERE X name "CWGroup", '
       
   462                              'Y eid IN(2, 3, 4), NOT X read_permission Y')
       
   463         self.assertEqual(sorted(r[0] for r in rset.rows), sorted(groupeids - groupreadperms))
       
   464 
       
   465     def test_select_outer_join(self):
       
   466         peid1 = self.qexecute("INSERT Personne X: X nom 'bidule'")[0][0]
       
   467         peid2 = self.qexecute("INSERT Personne X: X nom 'autre'")[0][0]
       
   468         seid1 = self.qexecute("INSERT Societe X: X nom 'chouette'")[0][0]
       
   469         seid2 = self.qexecute("INSERT Societe X: X nom 'chouetos'")[0][0]
       
   470         rset = self.qexecute('Any X,S ORDERBY X WHERE X travaille S?')
       
   471         self.assertEqual(rset.rows, [[peid1, None], [peid2, None]])
       
   472         self.qexecute("SET P travaille S WHERE P nom 'bidule', S nom 'chouette'")
       
   473         rset = self.qexecute('Any X,S ORDERBY X WHERE X travaille S?')
       
   474         self.assertEqual(rset.rows, [[peid1, seid1], [peid2, None]])
       
   475         rset = self.qexecute('Any S,X ORDERBY S WHERE X? travaille S')
       
   476         self.assertEqual(rset.rows, [[seid1, peid1], [seid2, None]])
       
   477 
       
   478     def test_select_outer_join_optimized(self):
       
   479         peid1 = self.qexecute("INSERT Personne X: X nom 'bidule'")[0][0]
       
   480         rset = self.qexecute('Any X WHERE X eid %(x)s, P? connait X', {'x':peid1})
       
   481         self.assertEqual(rset.rows, [[peid1]])
       
   482         rset = self.qexecute('Any X WHERE X eid %(x)s, X require_permission P?',
       
   483                             {'x':peid1})
       
   484         self.assertEqual(rset.rows, [[peid1]])
       
   485 
       
   486     def test_select_left_outer_join(self):
       
   487         rset = self.qexecute('DISTINCT Any G WHERE U? in_group G')
       
   488         self.assertEqual(len(rset), 4)
       
   489         rset = self.qexecute('DISTINCT Any G WHERE U? in_group G, U eid %(x)s',
       
   490                             {'x': self.session.user.eid})
       
   491         self.assertEqual(len(rset), 4)
       
   492 
       
   493     def test_select_ambigous_outer_join(self):
       
   494         teid = self.qexecute("INSERT Tag X: X name 'tag'")[0][0]
       
   495         self.qexecute("INSERT Tag X: X name 'tagbis'")[0][0]
       
   496         geid = self.qexecute("CWGroup G WHERE G name 'users'")[0][0]
       
   497         self.qexecute("SET X tags Y WHERE X eid %(t)s, Y eid %(g)s",
       
   498                      {'g': geid, 't': teid})
       
   499         rset = self.qexecute("Any GN,TN ORDERBY GN WHERE T? tags G, T name TN, G name GN")
       
   500         self.assertIn(['users', 'tag'], rset.rows)
       
   501         self.assertIn(['activated', None], rset.rows)
       
   502         rset = self.qexecute("Any GN,TN ORDERBY GN WHERE T tags G?, T name TN, G name GN")
       
   503         self.assertEqual(rset.rows, [[None, 'tagbis'], ['users', 'tag']])
       
   504 
       
   505     def test_select_not_inline_rel(self):
       
   506         self.qexecute("INSERT Personne X: X nom 'bidule'")
       
   507         self.qexecute("INSERT Note X: X type 'a'")
       
   508         self.qexecute("INSERT Note X: X type 'b'")
       
   509         self.qexecute("SET X ecrit_par Y WHERE X type 'a', Y nom 'bidule'")
       
   510         rset = self.qexecute('Note X WHERE NOT X ecrit_par P')
       
   511         self.assertEqual(len(rset.rows), 1, rset.rows)
       
   512 
       
   513     def test_select_not_unlinked_multiple_solutions(self):
       
   514         self.qexecute("INSERT Personne X: X nom 'bidule'")
       
   515         self.qexecute("INSERT Note X: X type 'a'")
       
   516         self.qexecute("INSERT Note X: X type 'b'")
       
   517         self.qexecute("SET Y evaluee X WHERE X type 'a', Y nom 'bidule'")
       
   518         rset = self.qexecute('Note X WHERE NOT Y evaluee X')
       
   519         self.assertEqual(len(rset.rows), 1, rset.rows)
       
   520 
       
   521     def test_select_date_extraction(self):
       
   522         self.qexecute("INSERT Personne X: X nom 'foo', X datenaiss %(d)s",
       
   523                      {'d': datetime(2001, 2,3, 12,13)})
       
   524         test_data = [('YEAR', 2001), ('MONTH', 2), ('DAY', 3),
       
   525                      ('HOUR', 12), ('MINUTE', 13), ('WEEKDAY', 6)]
       
   526         for funcname, result in test_data:
       
   527             rset = self.qexecute('Any %s(D) WHERE X is Personne, X datenaiss D'
       
   528                                 % funcname)
       
   529             self.assertEqual(len(rset.rows), 1)
       
   530             self.assertEqual(rset.rows[0][0], result)
       
   531             self.assertEqual(rset.description, [('Int',)])
       
   532 
       
   533     def test_regexp_based_pattern_matching(self):
       
   534         peid1 = self.qexecute("INSERT Personne X: X nom 'bidule'")[0][0]
       
   535         peid2 = self.qexecute("INSERT Personne X: X nom 'cidule'")[0][0]
       
   536         rset = self.qexecute('Any X WHERE X is Personne, X nom REGEXP "^b"')
       
   537         self.assertEqual(len(rset.rows), 1, rset.rows)
       
   538         self.assertEqual(rset.rows[0][0], peid1)
       
   539         rset = self.qexecute('Any X WHERE X is Personne, X nom REGEXP "idu"')
       
   540         self.assertEqual(len(rset.rows), 2, rset.rows)
       
   541 
       
   542     def test_select_aggregat_count(self):
       
   543         rset = self.qexecute('Any COUNT(X)')
       
   544         self.assertEqual(len(rset.rows), 1)
       
   545         self.assertEqual(len(rset.rows[0]), 1)
       
   546         self.assertEqual(rset.description, [('Int',)])
       
   547 
       
   548     def test_select_aggregat_sum(self):
       
   549         rset = self.qexecute('Any SUM(O) WHERE X ordernum O')
       
   550         self.assertEqual(len(rset.rows), 1)
       
   551         self.assertEqual(len(rset.rows[0]), 1)
       
   552         self.assertEqual(rset.description, [('Int',)])
       
   553 
       
   554     def test_select_aggregat_min(self):
       
   555         rset = self.qexecute('Any MIN(X) WHERE X is Personne')
       
   556         self.assertEqual(len(rset.rows), 1)
       
   557         self.assertEqual(len(rset.rows[0]), 1)
       
   558         self.assertEqual(rset.description, [('Personne',)])
       
   559         rset = self.qexecute('Any MIN(O) WHERE X ordernum O')
       
   560         self.assertEqual(len(rset.rows), 1)
       
   561         self.assertEqual(len(rset.rows[0]), 1)
       
   562         self.assertEqual(rset.description, [('Int',)])
       
   563 
       
   564     def test_select_aggregat_max(self):
       
   565         rset = self.qexecute('Any MAX(X) WHERE X is Personne')
       
   566         self.assertEqual(len(rset.rows), 1)
       
   567         self.assertEqual(len(rset.rows[0]), 1)
       
   568         self.assertEqual(rset.description, [('Personne',)])
       
   569         rset = self.qexecute('Any MAX(O) WHERE X ordernum O')
       
   570         self.assertEqual(len(rset.rows), 1)
       
   571         self.assertEqual(len(rset.rows[0]), 1)
       
   572         self.assertEqual(rset.description, [('Int',)])
       
   573 
       
   574     def test_select_custom_aggregat_concat_string(self):
       
   575         rset = self.qexecute('Any GROUP_CONCAT(N) WHERE X is CWGroup, X name N')
       
   576         self.assertTrue(rset)
       
   577         self.assertEqual(sorted(rset[0][0].split(', ')), ['guests', 'managers',
       
   578                                                              'owners', 'users'])
       
   579 
       
   580     def test_select_custom_regproc_limit_size(self):
       
   581         rset = self.qexecute('Any TEXT_LIMIT_SIZE(N, 3) WHERE X is CWGroup, X name N, X name "managers"')
       
   582         self.assertTrue(rset)
       
   583         self.assertEqual(rset[0][0], 'man...')
       
   584         self.qexecute("INSERT Basket X: X name 'bidule', X description '<b>hop hop</b>', X description_format 'text/html'")
       
   585         rset = self.qexecute('Any LIMIT_SIZE(D, DF, 3) WHERE X is Basket, X description D, X description_format DF')
       
   586         self.assertTrue(rset)
       
   587         self.assertEqual(rset[0][0], 'hop...')
       
   588 
       
   589     def test_select_regproc_orderby(self):
       
   590         rset = self.qexecute('DISTINCT Any X,N ORDERBY GROUP_SORT_VALUE(N) WHERE X is CWGroup, X name N, X name "managers"')
       
   591         self.assertEqual(len(rset), 1)
       
   592         self.assertEqual(rset[0][1], 'managers')
       
   593         rset = self.qexecute('Any X,N ORDERBY GROUP_SORT_VALUE(N) WHERE X is CWGroup, X name N, NOT U in_group X, U login "admin"')
       
   594         self.assertEqual(len(rset), 3)
       
   595         self.assertEqual(rset[0][1], 'owners')
       
   596 
       
   597     def test_select_aggregat_sort(self):
       
   598         rset = self.qexecute('Any G, COUNT(U) GROUPBY G ORDERBY 2 WHERE U in_group G')
       
   599         self.assertEqual(len(rset.rows), 2)
       
   600         self.assertEqual(len(rset.rows[0]), 2)
       
   601         self.assertEqual(rset.description[0], ('CWGroup', 'Int',))
       
   602 
       
   603     def test_select_aggregat_having(self):
       
   604         rset = self.qexecute('Any N,COUNT(RDEF) GROUPBY N ORDERBY 2,N '
       
   605                             'WHERE RT name N, RDEF relation_type RT '
       
   606                             'HAVING COUNT(RDEF) > 10')
       
   607         self.assertListEqual(rset.rows,
       
   608                               [[u'description_format', 13],
       
   609                                [u'description', 14],
       
   610                                [u'name', 19],
       
   611                                [u'created_by', 45],
       
   612                                [u'creation_date', 45],
       
   613                                [u'cw_source', 45],
       
   614                                [u'cwuri', 45],
       
   615                                [u'in_basket', 45],
       
   616                                [u'is', 45],
       
   617                                [u'is_instance_of', 45],
       
   618                                [u'modification_date', 45],
       
   619                                [u'owned_by', 45]])
       
   620 
       
   621     def test_select_aggregat_having_dumb(self):
       
   622         # dumb but should not raise an error
       
   623         rset = self.qexecute('Any U,COUNT(X) GROUPBY U '
       
   624                             'WHERE U eid %(x)s, X owned_by U '
       
   625                             'HAVING COUNT(X) > 10', {'x': self.ueid})
       
   626         self.assertEqual(len(rset.rows), 1)
       
   627         self.assertEqual(rset.rows[0][0], self.ueid)
       
   628 
       
   629     def test_select_having_non_aggregat_1(self):
       
   630         rset = self.qexecute('Any L WHERE X login L, X creation_date CD '
       
   631                             'HAVING YEAR(CD) = %s' % date.today().year)
       
   632         self.assertListEqual(rset.rows,
       
   633                               [[u'admin'],
       
   634                                [u'anon']])
       
   635 
       
   636     def test_select_having_non_aggregat_2(self):
       
   637         rset = self.qexecute('Any L GROUPBY L WHERE X login L, X in_group G, '
       
   638                             'X creation_date CD HAVING YEAR(CD) = %s OR COUNT(G) > 1'
       
   639                             % date.today().year)
       
   640         self.assertListEqual(rset.rows,
       
   641                               [[u'admin'],
       
   642                                [u'anon']])
       
   643 
       
   644     def test_select_complex_sort(self):
       
   645         """need sqlite including http://www.sqlite.org/cvstrac/tktview?tn=3773 fix"""
       
   646         rset = self.qexecute('Any X ORDERBY X,D LIMIT 5 WHERE X creation_date D')
       
   647         result = rset.rows
       
   648         result.sort()
       
   649         self.assertEqual(tuplify(result), [(1,), (2,), (3,), (4,), (5,)])
       
   650 
       
   651     def test_select_upper(self):
       
   652         rset = self.qexecute('Any X, UPPER(L) ORDERBY L WHERE X is CWUser, X login L')
       
   653         self.assertEqual(len(rset.rows), 2)
       
   654         self.assertEqual(rset.rows[0][1], 'ADMIN')
       
   655         self.assertEqual(rset.description[0], ('CWUser', 'String',))
       
   656         self.assertEqual(rset.rows[1][1], 'ANON')
       
   657         self.assertEqual(rset.description[1], ('CWUser', 'String',))
       
   658         eid = rset.rows[0][0]
       
   659         rset = self.qexecute('Any UPPER(L) WHERE X eid %s, X login L'%eid)
       
   660         self.assertEqual(rset.rows[0][0], 'ADMIN')
       
   661         self.assertEqual(rset.description, [('String',)])
       
   662 
       
   663     def test_select_float_abs(self):
       
   664         # test positive number
       
   665         eid = self.qexecute('INSERT Affaire A: A invoiced %(i)s', {'i': 1.2})[0][0]
       
   666         rset = self.qexecute('Any ABS(I) WHERE X eid %(x)s, X invoiced I', {'x': eid})
       
   667         self.assertEqual(rset.rows[0][0], 1.2)
       
   668         # test negative number
       
   669         eid = self.qexecute('INSERT Affaire A: A invoiced %(i)s', {'i': -1.2})[0][0]
       
   670         rset = self.qexecute('Any ABS(I) WHERE X eid %(x)s, X invoiced I', {'x': eid})
       
   671         self.assertEqual(rset.rows[0][0], 1.2)
       
   672 
       
   673     def test_select_int_abs(self):
       
   674         # test positive number
       
   675         eid = self.qexecute('INSERT Affaire A: A duration %(d)s', {'d': 12})[0][0]
       
   676         rset = self.qexecute('Any ABS(D) WHERE X eid %(x)s, X duration D', {'x': eid})
       
   677         self.assertEqual(rset.rows[0][0], 12)
       
   678         # test negative number
       
   679         eid = self.qexecute('INSERT Affaire A: A duration %(d)s', {'d': -12})[0][0]
       
   680         rset = self.qexecute('Any ABS(D) WHERE X eid %(x)s, X duration D', {'x': eid})
       
   681         self.assertEqual(rset.rows[0][0], 12)
       
   682 
       
   683 ##     def test_select_simplified(self):
       
   684 ##         ueid = self.session.user.eid
       
   685 ##         rset = self.qexecute('Any L WHERE %s login L'%ueid)
       
   686 ##         self.assertEqual(rset.rows[0][0], 'admin')
       
   687 ##         rset = self.qexecute('Any L WHERE %(x)s login L', {'x':ueid})
       
   688 ##         self.assertEqual(rset.rows[0][0], 'admin')
       
   689 
       
   690     def test_select_searchable_text_1(self):
       
   691         rset = self.qexecute(u"INSERT Personne X: X nom 'bidüle'")
       
   692         rset = self.qexecute(u"INSERT Societe X: X nom 'bidüle'")
       
   693         rset = self.qexecute("INSERT Societe X: X nom 'chouette'")
       
   694         rset = self.qexecute('Any X where X has_text %(text)s', {'text': u'bidüle'})
       
   695         self.assertEqual(len(rset.rows), 2, rset.rows)
       
   696         rset = self.qexecute(u'Any N where N has_text "bidüle"')
       
   697         self.assertEqual(len(rset.rows), 2, rset.rows)
       
   698         biduleeids = [r[0] for r in rset.rows]
       
   699         rset = self.qexecute(u'Any N where NOT N has_text "bidüle"')
       
   700         self.assertFalse([r[0] for r in rset.rows if r[0] in biduleeids])
       
   701         # duh?
       
   702         rset = self.qexecute('Any X WHERE X has_text %(text)s', {'text': u'ça'})
       
   703 
       
   704     def test_select_searchable_text_2(self):
       
   705         rset = self.qexecute("INSERT Personne X: X nom 'bidule'")
       
   706         rset = self.qexecute("INSERT Personne X: X nom 'chouette'")
       
   707         rset = self.qexecute("INSERT Societe X: X nom 'bidule'")
       
   708         rset = self.qexecute('Personne N where N has_text "bidule"')
       
   709         self.assertEqual(len(rset.rows), 1, rset.rows)
       
   710 
       
   711     def test_select_searchable_text_3(self):
       
   712         rset = self.qexecute("INSERT Personne X: X nom 'bidule', X sexe 'M'")
       
   713         rset = self.qexecute("INSERT Personne X: X nom 'bidule', X sexe 'F'")
       
   714         rset = self.qexecute("INSERT Societe X: X nom 'bidule'")
       
   715         rset = self.qexecute('Any X where X has_text "bidule" and X sexe "M"')
       
   716         self.assertEqual(len(rset.rows), 1, rset.rows)
       
   717 
       
   718     def test_select_multiple_searchable_text(self):
       
   719         self.qexecute(u"INSERT Personne X: X nom 'bidüle'")
       
   720         self.qexecute("INSERT Societe X: X nom 'chouette', S travaille X")
       
   721         self.qexecute(u"INSERT Personne X: X nom 'bidüle'")
       
   722         rset = self.qexecute('Personne X WHERE X has_text %(text)s, X travaille S, S has_text %(text2)s',
       
   723                             {'text': u'bidüle',
       
   724                              'text2': u'chouette',}
       
   725                             )
       
   726         self.assertEqual(len(rset.rows), 1, rset.rows)
       
   727 
       
   728     def test_select_no_descr(self):
       
   729         rset = self.qexecute('Any X WHERE X is CWGroup', build_descr=0)
       
   730         rset.rows.sort()
       
   731         self.assertEqual(tuplify(rset.rows), [(2,), (3,), (4,), (5,)])
       
   732         self.assertEqual(rset.description, ())
       
   733 
       
   734     def test_select_limit_offset(self):
       
   735         rset = self.qexecute('CWGroup X ORDERBY N LIMIT 2 WHERE X name N')
       
   736         self.assertEqual(tuplify(rset.rows), [(2,), (3,)])
       
   737         self.assertEqual(rset.description, [('CWGroup',), ('CWGroup',)])
       
   738         rset = self.qexecute('CWGroup X ORDERBY N LIMIT 2 OFFSET 2 WHERE X name N')
       
   739         self.assertEqual(tuplify(rset.rows), [(4,), (5,)])
       
   740 
       
   741     def test_select_symmetric(self):
       
   742         self.qexecute("INSERT Personne X: X nom 'machin'")
       
   743         self.qexecute("INSERT Personne X: X nom 'bidule'")
       
   744         self.qexecute("INSERT Personne X: X nom 'chouette'")
       
   745         self.qexecute("INSERT Personne X: X nom 'trucmuche'")
       
   746         self.qexecute("SET X connait Y WHERE X nom 'chouette', Y nom 'bidule'")
       
   747         self.qexecute("SET X connait Y WHERE X nom 'machin', Y nom 'chouette'")
       
   748         rset = self.qexecute('Any P WHERE P connait P2')
       
   749         self.assertEqual(len(rset.rows), 4, rset.rows)
       
   750         rset = self.qexecute('Any P WHERE NOT P connait P2')
       
   751         self.assertEqual(len(rset.rows), 1, rset.rows) # trucmuche
       
   752         rset = self.qexecute('Any P WHERE P connait P2, P2 nom "bidule"')
       
   753         self.assertEqual(len(rset.rows), 1, rset.rows)
       
   754         rset = self.qexecute('Any P WHERE P2 connait P, P2 nom "bidule"')
       
   755         self.assertEqual(len(rset.rows), 1, rset.rows)
       
   756         rset = self.qexecute('Any P WHERE P connait P2, P2 nom "chouette"')
       
   757         self.assertEqual(len(rset.rows), 2, rset.rows)
       
   758         rset = self.qexecute('Any P WHERE P2 connait P, P2 nom "chouette"')
       
   759         self.assertEqual(len(rset.rows), 2, rset.rows)
       
   760 
       
   761     def test_select_inline(self):
       
   762         self.qexecute("INSERT Personne X: X nom 'bidule'")
       
   763         self.qexecute("INSERT Note X: X type 'a'")
       
   764         self.qexecute("SET X ecrit_par Y WHERE X type 'a', Y nom 'bidule'")
       
   765         rset = self.qexecute('Any N where N ecrit_par X, X nom "bidule"')
       
   766         self.assertEqual(len(rset.rows), 1, rset.rows)
       
   767 
       
   768     def test_select_creation_date(self):
       
   769         self.qexecute("INSERT Personne X: X nom 'bidule'")
       
   770         rset = self.qexecute('Any D WHERE X nom "bidule", X creation_date D')
       
   771         self.assertEqual(len(rset.rows), 1)
       
   772 
       
   773     def test_select_or_relation(self):
       
   774         self.qexecute("INSERT Personne X: X nom 'bidule'")
       
   775         self.qexecute("INSERT Personne X: X nom 'chouette'")
       
   776         self.qexecute("INSERT Societe X: X nom 'logilab'")
       
   777         self.qexecute("INSERT Societe X: X nom 'caesium'")
       
   778         self.qexecute("SET P travaille S WHERE P nom 'bidule', S nom 'logilab'")
       
   779         rset = self.qexecute('DISTINCT Any P WHERE P travaille S1 OR P travaille S2, '
       
   780                              'S1 nom "logilab", S2 nom "caesium"')
       
   781         self.assertEqual(len(rset.rows), 1)
       
   782         self.qexecute("SET P travaille S WHERE P nom 'chouette', S nom 'caesium'")
       
   783         rset = self.qexecute('DISTINCT Any P WHERE P travaille S1 OR P travaille S2, '
       
   784                              'S1 nom "logilab", S2 nom "caesium"')
       
   785         self.assertEqual(len(rset.rows), 2)
       
   786 
       
   787     def test_select_or_sym_relation(self):
       
   788         self.qexecute("INSERT Personne X: X nom 'bidule'")
       
   789         self.qexecute("INSERT Personne X: X nom 'chouette'")
       
   790         self.qexecute("INSERT Personne X: X nom 'truc'")
       
   791         self.qexecute("SET P connait S WHERE P nom 'bidule', S nom 'chouette'")
       
   792         rset = self.qexecute('DISTINCT Any P WHERE S connait P, S nom "chouette"')
       
   793         self.assertEqual(len(rset.rows), 1, rset.rows)
       
   794         rset = self.qexecute('DISTINCT Any P WHERE P connait S or S connait P, S nom "chouette"')
       
   795         self.assertEqual(len(rset.rows), 1, rset.rows)
       
   796         self.qexecute("SET P connait S WHERE P nom 'chouette', S nom 'truc'")
       
   797         rset = self.qexecute('DISTINCT Any P WHERE S connait P, S nom "chouette"')
       
   798         self.assertEqual(len(rset.rows), 2, rset.rows)
       
   799         rset = self.qexecute('DISTINCT Any P WHERE P connait S OR S connait P, S nom "chouette"')
       
   800         self.assertEqual(len(rset.rows), 2, rset.rows)
       
   801 
       
   802     def test_select_follow_relation(self):
       
   803         self.qexecute("INSERT Affaire X: X sujet 'cool'")
       
   804         self.qexecute("INSERT Societe X: X nom 'chouette'")
       
   805         self.qexecute("SET A concerne S WHERE A is Affaire, S is Societe")
       
   806         self.qexecute("INSERT Note X: X para 'truc'")
       
   807         self.qexecute("SET S evaluee N WHERE S is Societe, N is Note")
       
   808         self.qexecute("INSERT Societe X: X nom 'bidule'")
       
   809         self.qexecute("INSERT Note X: X para 'troc'")
       
   810         self.qexecute("SET S evaluee N WHERE S nom 'bidule', N para 'troc'")
       
   811         rset = self.qexecute('DISTINCT Any A,N WHERE A concerne S, S evaluee N')
       
   812         self.assertEqual(len(rset.rows), 1, rset.rows)
       
   813 
       
   814     def test_select_ordered_distinct_1(self):
       
   815         self.assertRaises(BadRQLQuery,
       
   816                           self.qexecute, 'DISTINCT Any S ORDERBY R WHERE A is Affaire, A sujet S, A ref R')
       
   817 
       
   818     def test_select_ordered_distinct_2(self):
       
   819         self.qexecute("INSERT Affaire X: X sujet 'minor'")
       
   820         self.qexecute("INSERT Affaire X: X sujet 'zou'")
       
   821         self.qexecute("INSERT Affaire X: X sujet 'abcd'")
       
   822         rset = self.qexecute('DISTINCT Any S ORDERBY S WHERE A is Affaire, A sujet S')
       
   823         self.assertEqual(rset.rows, [['abcd'], ['minor'], ['zou']])
       
   824 
       
   825     def test_select_ordered_distinct_3(self):
       
   826         rset = self.qexecute('DISTINCT Any N ORDERBY GROUP_SORT_VALUE(N) WHERE X is CWGroup, X name N')
       
   827         self.assertEqual(rset.rows, [['owners'], ['guests'], ['users'], ['managers']])
       
   828 
       
   829     def test_select_or_value(self):
       
   830         rset = self.qexecute('Any U WHERE U in_group G, G name "owners" OR G name "users"')
       
   831         self.assertEqual(len(rset.rows), 0)
       
   832         rset = self.qexecute('Any U WHERE U in_group G, G name "guests" OR G name "managers"')
       
   833         self.assertEqual(len(rset.rows), 2)
       
   834 
       
   835     def test_select_explicit_eid(self):
       
   836         rset = self.qexecute('Any X,E WHERE X owned_by U, X eid E, U eid %(u)s',
       
   837                              {'u': self.session.user.eid})
       
   838         self.assertTrue(rset)
       
   839         self.assertEqual(rset.description[0][1], 'Int')
       
   840 
       
   841 #     def test_select_rewritten_optional(self):
       
   842 #         eid = self.qexecute("INSERT Affaire X: X sujet 'cool'")[0][0]
       
   843 #         rset = self.qexecute('Any X WHERE X eid %(x)s, EXISTS(X owned_by U) OR EXISTS(X concerne S?, S owned_by U)',
       
   844 #                             {'x': eid}, 'x')
       
   845 #         self.assertEqual(rset.rows, [[eid]])
       
   846 
       
   847     def test_today_bug(self):
       
   848         self.qexecute("INSERT Tag X: X name 'bidule', X creation_date NOW")
       
   849         self.qexecute("INSERT Tag Y: Y name 'toto'")
       
   850         rset = self.qexecute("Any D WHERE X name in ('bidule', 'toto') , X creation_date D")
       
   851         self.assertIsInstance(rset.rows[0][0], datetime)
       
   852         rset = self.qexecute('Tag X WHERE X creation_date TODAY')
       
   853         self.assertEqual(len(rset.rows), 2)
       
   854 
       
   855     def test_sqlite_patch(self):
       
   856         """this test monkey patch done by sqlutils._install_sqlite_querier_patch"""
       
   857         self.qexecute("INSERT Personne X: X nom 'bidule', X datenaiss NOW, X tzdatenaiss NOW")
       
   858         rset = self.qexecute('Any MAX(D) WHERE X is Personne, X datenaiss D')
       
   859         self.assertIsInstance(rset[0][0], datetime)
       
   860         rset = self.qexecute('Any MAX(D) WHERE X is Personne, X tzdatenaiss D')
       
   861         self.assertIsInstance(rset[0][0], datetime)
       
   862         self.assertEqual(rset[0][0].tzinfo, pytz.utc)
       
   863 
       
   864     def test_today(self):
       
   865         self.qexecute("INSERT Tag X: X name 'bidule', X creation_date TODAY")
       
   866         self.qexecute("INSERT Tag Y: Y name 'toto'")
       
   867         rset = self.qexecute('Tag X WHERE X creation_date TODAY')
       
   868         self.assertEqual(len(rset.rows), 2)
       
   869 
       
   870     def test_select_boolean(self):
       
   871         rset = self.qexecute('Any N WHERE X is CWEType, X name N, X final %(val)s',
       
   872                             {'val': True})
       
   873         self.assertEqual(sorted(r[0] for r in rset.rows), ['BigInt', 'Boolean', 'Bytes',
       
   874                                                            'Date', 'Datetime',
       
   875                                                            'Decimal', 'Float',
       
   876                                                            'Int', 'Interval',
       
   877                                                            'Password', 'String',
       
   878                                                            'TZDatetime', 'TZTime',
       
   879                                                            'Time'])
       
   880         rset = self.qexecute('Any N WHERE X is CWEType, X name N, X final TRUE')
       
   881         self.assertEqual(sorted(r[0] for r in rset.rows), ['BigInt', 'Boolean', 'Bytes',
       
   882                                                            'Date', 'Datetime',
       
   883                                                            'Decimal', 'Float',
       
   884                                                            'Int', 'Interval',
       
   885                                                            'Password', 'String',
       
   886                                                            'TZDatetime', 'TZTime',
       
   887                                                            'Time'])
       
   888         with self.session.new_cnx() as cnx:
       
   889             cnx.create_entity('Personne', nom=u'louis', test=True)
       
   890             self.assertEqual(len(cnx.execute('Any X WHERE X test %(val)s', {'val': True})), 1)
       
   891             self.assertEqual(len(cnx.execute('Any X WHERE X test TRUE')), 1)
       
   892             self.assertEqual(len(cnx.execute('Any X WHERE X test %(val)s', {'val': False})), 0)
       
   893             self.assertEqual(len(cnx.execute('Any X WHERE X test FALSE')), 0)
       
   894 
       
   895     def test_select_constant(self):
       
   896         rset = self.qexecute('Any X, "toto" ORDERBY X WHERE X is CWGroup')
       
   897         self.assertEqual(rset.rows,
       
   898                          [list(x) for x in zip((2,3,4,5), ('toto','toto','toto','toto',))])
       
   899         self.assertIsInstance(rset[0][1], text_type)
       
   900         self.assertEqual(rset.description,
       
   901                          list(zip(('CWGroup', 'CWGroup', 'CWGroup', 'CWGroup'),
       
   902                                   ('String', 'String', 'String', 'String',))))
       
   903         rset = self.qexecute('Any X, %(value)s ORDERBY X WHERE X is CWGroup', {'value': 'toto'})
       
   904         self.assertEqual(rset.rows,
       
   905                          list(map(list, zip((2,3,4,5), ('toto','toto','toto','toto',)))))
       
   906         self.assertIsInstance(rset[0][1], text_type)
       
   907         self.assertEqual(rset.description,
       
   908                          list(zip(('CWGroup', 'CWGroup', 'CWGroup', 'CWGroup'),
       
   909                                   ('String', 'String', 'String', 'String',))))
       
   910         rset = self.qexecute('Any X,GN WHERE X is CWUser, G is CWGroup, X login "syt", '
       
   911                              'X in_group G, G name GN')
       
   912 
       
   913     def test_select_union(self):
       
   914         rset = self.qexecute('Any X,N ORDERBY N WITH X,N BEING '
       
   915                             '((Any X,N WHERE X name N, X transition_of WF, WF workflow_of E, E name %(name)s)'
       
   916                             ' UNION '
       
   917                             '(Any X,N WHERE X name N, X state_of WF, WF workflow_of E, E name %(name)s))',
       
   918                             {'name': 'CWUser'})
       
   919         self.assertEqual([x[1] for x in rset.rows],
       
   920                           ['activate', 'activated', 'deactivate', 'deactivated'])
       
   921         self.assertEqual(rset.description,
       
   922                           [('Transition', 'String'), ('State', 'String'),
       
   923                            ('Transition', 'String'), ('State', 'String')])
       
   924 
       
   925     def test_select_union_aggregat(self):
       
   926         # meaningless, the goal in to have group by done on different attribute
       
   927         # for each sub-query
       
   928         self.qexecute('(Any N,COUNT(X) GROUPBY N WHERE X name N, X is State)'
       
   929                      ' UNION '
       
   930                      '(Any N,COUNT(X) GROUPBY N ORDERBY 2 WHERE X login N)')
       
   931 
       
   932     def test_select_union_aggregat_independant_group(self):
       
   933         with self.session.new_cnx() as cnx:
       
   934             cnx.execute('INSERT State X: X name "hop"')
       
   935             cnx.execute('INSERT State X: X name "hop"')
       
   936             cnx.execute('INSERT Transition X: X name "hop"')
       
   937             cnx.execute('INSERT Transition X: X name "hop"')
       
   938             rset = cnx.execute('Any N,NX ORDERBY 2 WITH N,NX BEING '
       
   939                                '((Any N,COUNT(X) GROUPBY N WHERE X name N, '
       
   940                                '  X is State HAVING COUNT(X)>1)'
       
   941                                ' UNION '
       
   942                                '(Any N,COUNT(X) GROUPBY N WHERE X name N, '
       
   943                                ' X is Transition HAVING COUNT(X)>1))')
       
   944             self.assertEqual(rset.rows, [[u'hop', 2], [u'hop', 2]])
       
   945 
       
   946     def test_select_union_selection_with_diff_variables(self):
       
   947         rset = self.qexecute('(Any N WHERE X name N, X is State)'
       
   948                             ' UNION '
       
   949                             '(Any NN WHERE XX name NN, XX is Transition)')
       
   950         self.assertEqual(sorted(r[0] for r in rset.rows),
       
   951                           ['abort', 'activate', 'activated', 'ben non',
       
   952                            'deactivate', 'deactivated', 'done', 'en cours',
       
   953                            'end', 'finie', 'markasdone', 'pitetre', 'redoit',
       
   954                            'start', 'todo'])
       
   955 
       
   956     def test_select_union_description_diff_var(self):
       
   957         eid1 = self.qexecute('CWGroup X WHERE X name "managers"')[0][0]
       
   958         eid2 = self.qexecute('CWUser X WHERE X login "admin"')[0][0]
       
   959         rset = self.qexecute('(Any X WHERE X eid %(x)s)'
       
   960                             ' UNION '
       
   961                             '(Any Y WHERE Y eid %(y)s)',
       
   962                             {'x': eid1, 'y': eid2})
       
   963         self.assertEqual(rset.description[:], [('CWGroup',), ('CWUser',)])
       
   964 
       
   965     def test_exists(self):
       
   966         geid = self.qexecute("INSERT CWGroup X: X name 'lulufanclub'")[0][0]
       
   967         self.qexecute("SET U in_group G WHERE G name 'lulufanclub'")
       
   968         peid = self.qexecute("INSERT Personne X: X prenom 'lulu', X nom 'petit'")[0][0]
       
   969         rset = self.qexecute("Any X WHERE X prenom 'lulu',"
       
   970                             "EXISTS (U in_group G, G name 'lulufanclub' OR G name 'managers');")
       
   971         self.assertEqual(rset.rows, [[peid]])
       
   972 
       
   973     def test_identity(self):
       
   974         eid = self.qexecute('Any X WHERE X identity Y, Y eid 1')[0][0]
       
   975         self.assertEqual(eid, 1)
       
   976         eid = self.qexecute('Any X WHERE Y identity X, Y eid 1')[0][0]
       
   977         self.assertEqual(eid, 1)
       
   978         login = self.qexecute('Any L WHERE X login "admin", X identity Y, Y login L')[0][0]
       
   979         self.assertEqual(login, 'admin')
       
   980 
       
   981     def test_select_date_mathexp(self):
       
   982         rset = self.qexecute('Any X, TODAY - CD WHERE X is CWUser, X creation_date CD')
       
   983         self.assertTrue(rset)
       
   984         self.assertEqual(rset.description[0][1], 'Interval')
       
   985         eid, = self.qexecute("INSERT Personne X: X nom 'bidule'")[0]
       
   986         rset = self.qexecute('Any X, NOW - CD WHERE X is Personne, X creation_date CD')
       
   987         self.assertEqual(rset.description[0][1], 'Interval')
       
   988 
       
   989     def test_select_subquery_aggregat_1(self):
       
   990         # percent users by groups
       
   991         self.qexecute('SET X in_group G WHERE G name "users"')
       
   992         rset = self.qexecute('Any GN, COUNT(X)*100/T GROUPBY GN ORDERBY 2,1'
       
   993                             ' WHERE G name GN, X in_group G'
       
   994                             ' WITH T BEING (Any COUNT(U) WHERE U is CWUser)')
       
   995         self.assertEqual(rset.rows, [[u'guests', 50], [u'managers', 50], [u'users', 100]])
       
   996         self.assertEqual(rset.description, [('String', 'Int'), ('String', 'Int'), ('String', 'Int')])
       
   997 
       
   998     def test_select_subquery_aggregat_2(self):
       
   999         expected = self.qexecute('Any X, 0, COUNT(T) GROUPBY X '
       
  1000                                 'WHERE X is Workflow, T transition_of X').rows
       
  1001         rset = self.qexecute('''
       
  1002 Any P1,B,E WHERE P1 identity P2 WITH
       
  1003   P1,B BEING (Any P,COUNT(T) GROUPBY P WHERE P is Workflow, T is Transition,
       
  1004               T? transition_of P, T type "auto"),
       
  1005   P2,E BEING (Any P,COUNT(T) GROUPBY P WHERE P is Workflow, T is Transition,
       
  1006               T? transition_of P, T type "normal")''')
       
  1007         self.assertEqual(sorted(rset.rows), sorted(expected))
       
  1008 
       
  1009     def test_select_subquery_const(self):
       
  1010         rset = self.qexecute('Any X WITH X BEING ((Any NULL) UNION (Any "toto"))')
       
  1011         self.assertEqual(rset.rows, [[None], ['toto']])
       
  1012         self.assertEqual(rset.description, [(None,), ('String',)])
       
  1013 
       
  1014     # insertion queries tests #################################################
       
  1015 
       
  1016     def test_insert_is(self):
       
  1017         eid, = self.qexecute("INSERT Personne X: X nom 'bidule'")[0]
       
  1018         etype, = self.qexecute("Any TN WHERE X is T, X eid %s, T name TN" % eid)[0]
       
  1019         self.assertEqual(etype, 'Personne')
       
  1020         self.qexecute("INSERT Personne X: X nom 'managers'")
       
  1021 
       
  1022     def test_insert_1(self):
       
  1023         rset = self.qexecute("INSERT Personne X: X nom 'bidule'")
       
  1024         self.assertEqual(len(rset.rows), 1)
       
  1025         self.assertEqual(rset.description, [('Personne',)])
       
  1026         rset = self.qexecute('Personne X WHERE X nom "bidule"')
       
  1027         self.assertTrue(rset.rows)
       
  1028         self.assertEqual(rset.description, [('Personne',)])
       
  1029 
       
  1030     def test_insert_1_multiple(self):
       
  1031         self.qexecute("INSERT Personne X: X nom 'bidule'")
       
  1032         self.qexecute("INSERT Personne X: X nom 'chouette'")
       
  1033         rset = self.qexecute("INSERT Societe Y: Y nom N, P travaille Y WHERE P nom N")
       
  1034         self.assertEqual(len(rset.rows), 2)
       
  1035         self.assertEqual(rset.description, [('Societe',), ('Societe',)])
       
  1036 
       
  1037     def test_insert_2(self):
       
  1038         rset = self.qexecute("INSERT Personne X, Personne Y: X nom 'bidule', Y nom 'tutu'")
       
  1039         self.assertEqual(rset.description, [('Personne', 'Personne')])
       
  1040         rset = self.qexecute('Personne X WHERE X nom "bidule" or X nom "tutu"')
       
  1041         self.assertTrue(rset.rows)
       
  1042         self.assertEqual(rset.description, [('Personne',), ('Personne',)])
       
  1043 
       
  1044     def test_insert_3(self):
       
  1045         self.qexecute("INSERT Personne X: X nom Y WHERE U login 'admin', U login Y")
       
  1046         rset = self.qexecute('Personne X WHERE X nom "admin"')
       
  1047         self.assertTrue(rset.rows)
       
  1048         self.assertEqual(rset.description, [('Personne',)])
       
  1049 
       
  1050     def test_insert_4(self):
       
  1051         self.qexecute("INSERT Societe Y: Y nom 'toto'")
       
  1052         self.qexecute("INSERT Personne X: X nom 'bidule', X travaille Y WHERE Y nom 'toto'")
       
  1053         rset = self.qexecute('Any X, Y WHERE X nom "bidule", Y nom "toto", X travaille Y')
       
  1054         self.assertTrue(rset.rows)
       
  1055         self.assertEqual(rset.description, [('Personne', 'Societe',)])
       
  1056 
       
  1057     def test_insert_4bis(self):
       
  1058         peid = self.qexecute("INSERT Personne X: X nom 'bidule'")[0][0]
       
  1059         seid = self.qexecute("INSERT Societe Y: Y nom 'toto', X travaille Y WHERE X eid %(x)s",
       
  1060                              {'x': str(peid)})[0][0]
       
  1061         self.assertEqual(len(self.qexecute('Any X, Y WHERE X travaille Y')), 1)
       
  1062         self.qexecute("INSERT Personne X: X nom 'chouette', X travaille Y WHERE Y eid %(x)s",
       
  1063                       {'x': str(seid)})
       
  1064         self.assertEqual(len(self.qexecute('Any X, Y WHERE X travaille Y')), 2)
       
  1065 
       
  1066     def test_insert_4ter(self):
       
  1067         peid = self.qexecute("INSERT Personne X: X nom 'bidule'")[0][0]
       
  1068         seid = self.qexecute("INSERT Societe Y: Y nom 'toto', X travaille Y WHERE X eid %(x)s",
       
  1069                              {'x': text_type(peid)})[0][0]
       
  1070         self.assertEqual(len(self.qexecute('Any X, Y WHERE X travaille Y')), 1)
       
  1071         self.qexecute("INSERT Personne X: X nom 'chouette', X travaille Y WHERE Y eid %(x)s",
       
  1072                       {'x': text_type(seid)})
       
  1073         self.assertEqual(len(self.qexecute('Any X, Y WHERE X travaille Y')), 2)
       
  1074 
       
  1075     def test_insert_5(self):
       
  1076         self.qexecute("INSERT Personne X: X nom 'bidule'")
       
  1077         self.qexecute("INSERT Societe Y: Y nom 'toto', X travaille Y WHERE X nom 'bidule'")
       
  1078         rset = self.qexecute('Any X, Y WHERE X nom "bidule", Y nom "toto", X travaille Y')
       
  1079         self.assertTrue(rset.rows)
       
  1080         self.assertEqual(rset.description, [('Personne', 'Societe',)])
       
  1081 
       
  1082     def test_insert_5bis(self):
       
  1083         peid = self.qexecute("INSERT Personne X: X nom 'bidule'")[0][0]
       
  1084         self.qexecute("INSERT Societe Y: Y nom 'toto', X travaille Y WHERE X eid %(x)s",
       
  1085                      {'x': peid})
       
  1086         rset = self.qexecute('Any X, Y WHERE X nom "bidule", Y nom "toto", X travaille Y')
       
  1087         self.assertTrue(rset.rows)
       
  1088         self.assertEqual(rset.description, [('Personne', 'Societe',)])
       
  1089 
       
  1090     def test_insert_6(self):
       
  1091         self.qexecute("INSERT Personne X, Societe Y: X nom 'bidule', Y nom 'toto', X travaille Y")
       
  1092         rset = self.qexecute('Any X, Y WHERE X nom "bidule", Y nom "toto", X travaille Y')
       
  1093         self.assertTrue(rset.rows)
       
  1094         self.assertEqual(rset.description, [('Personne', 'Societe',)])
       
  1095 
       
  1096     def test_insert_7(self):
       
  1097         self.qexecute("INSERT Personne X, Societe Y: X nom N, Y nom 'toto', "
       
  1098                       "X travaille Y WHERE U login 'admin', U login N")
       
  1099         rset = self.qexecute('Any X, Y WHERE X nom "admin", Y nom "toto", X travaille Y')
       
  1100         self.assertTrue(rset.rows)
       
  1101         self.assertEqual(rset.description, [('Personne', 'Societe',)])
       
  1102 
       
  1103     def test_insert_7_2(self):
       
  1104         self.qexecute("INSERT Personne X, Societe Y: X nom N, Y nom 'toto', "
       
  1105                       "X travaille Y WHERE U login N")
       
  1106         rset = self.qexecute('Any X, Y WHERE Y nom "toto", X travaille Y')
       
  1107         self.assertEqual(len(rset), 2)
       
  1108         self.assertEqual(rset.description, [('Personne', 'Societe',),
       
  1109                                              ('Personne', 'Societe',)])
       
  1110 
       
  1111     def test_insert_8(self):
       
  1112         self.qexecute("INSERT Societe Y, Personne X: Y nom N, X nom 'toto', X travaille Y "
       
  1113                       "WHERE U login 'admin', U login N")
       
  1114         rset = self.qexecute('Any X, Y WHERE X nom "toto", Y nom "admin", X travaille Y')
       
  1115         self.assertTrue(rset.rows)
       
  1116         self.assertEqual(rset.description, [('Personne', 'Societe',)])
       
  1117 
       
  1118     def test_insert_9(self):
       
  1119         self.qexecute("INSERT Societe X: X nom  'Lo'")
       
  1120         self.qexecute("INSERT Societe X: X nom  'Gi'")
       
  1121         self.qexecute("INSERT SubDivision X: X nom  'Lab'")
       
  1122         rset = self.qexecute("INSERT Personne X: X nom N, X travaille Y, X travaille_subdivision Z "
       
  1123                              "WHERE Y is Societe, Z is SubDivision, Y nom N")
       
  1124         self.assertEqual(len(rset), 2)
       
  1125         self.assertEqual(rset.description, [('Personne',), ('Personne',)])
       
  1126         # self.assertSetEqual(set(x.nom for x in rset.entities()),
       
  1127         #                      ['Lo', 'Gi'])
       
  1128         # self.assertSetEqual(set(y.nom for x in rset.entities() for y in x.travaille),
       
  1129         #                      ['Lo', 'Gi'])
       
  1130         # self.assertEqual([y.nom for x in rset.entities() for y in x.travaille_subdivision],
       
  1131         #                      ['Lab', 'Lab'])
       
  1132 
       
  1133     def test_insert_query_error(self):
       
  1134         self.assertRaises(Exception,
       
  1135                           self.qexecute,
       
  1136                           "INSERT Personne X: X nom 'toto', X is Personne")
       
  1137         self.assertRaises(Exception,
       
  1138                           self.qexecute,
       
  1139                           "INSERT Personne X: X nom 'toto', X is_instance_of Personne")
       
  1140         self.assertRaises(QueryError,
       
  1141                           self.qexecute,
       
  1142                           "INSERT Personne X: X nom 'toto', X has_text 'tutu'")
       
  1143 
       
  1144         self.assertRaises(QueryError,
       
  1145                           self.qexecute,
       
  1146                           "INSERT CWUser X: X login 'toto', X eid %s" % cnx.user(self.session).eid)
       
  1147 
       
  1148     def test_insertion_description_with_where(self):
       
  1149         rset = self.qexecute('INSERT CWUser E, EmailAddress EM: E login "X", E upassword "X", '
       
  1150                             'E primary_email EM, EM address "X", E in_group G '
       
  1151                             'WHERE G name "managers"')
       
  1152         self.assertEqual(list(rset.description[0]), ['CWUser', 'EmailAddress'])
       
  1153 
       
  1154     # deletion queries tests ##################################################
       
  1155 
       
  1156     def test_delete_1(self):
       
  1157         self.qexecute("INSERT Personne Y: Y nom 'toto'")
       
  1158         rset = self.qexecute('Personne X WHERE X nom "toto"')
       
  1159         self.assertEqual(len(rset.rows), 1)
       
  1160         drset = self.qexecute("DELETE Personne Y WHERE Y nom 'toto'")
       
  1161         self.assertEqual(drset.rows, rset.rows)
       
  1162         rset = self.qexecute('Personne X WHERE X nom "toto"')
       
  1163         self.assertEqual(len(rset.rows), 0)
       
  1164 
       
  1165     def test_delete_2(self):
       
  1166         rset = self.qexecute("INSERT Personne X, Personne Y, Societe Z : "
       
  1167                              "X nom 'syt', Y nom 'adim', Z nom 'Logilab', X travaille Z, Y travaille Z")
       
  1168         self.assertEqual(len(rset), 1)
       
  1169         self.assertEqual(len(rset[0]), 3)
       
  1170         self.assertEqual(rset.description[0], ('Personne', 'Personne', 'Societe'))
       
  1171         self.assertEqual(self.qexecute('Any N WHERE X nom N, X eid %s'% rset[0][0])[0][0], 'syt')
       
  1172         rset = self.qexecute('Personne X WHERE X travaille Y, Y nom "Logilab"')
       
  1173         self.assertEqual(len(rset.rows), 2, rset.rows)
       
  1174         self.qexecute("DELETE X travaille Y WHERE X is Personne, Y nom 'Logilabo'")
       
  1175         rset = self.qexecute('Personne X WHERE X travaille Y, Y nom "Logilab"')
       
  1176         self.assertEqual(len(rset.rows), 2, rset.rows)
       
  1177         self.qexecute("DELETE X travaille Y WHERE X is Personne, Y nom 'Logilab'")
       
  1178         rset = self.qexecute('Personne X WHERE X travaille Y, Y nom "Logilab"')
       
  1179         self.assertEqual(len(rset.rows), 0, rset.rows)
       
  1180 
       
  1181     def test_delete_3(self):
       
  1182         s = self.user_groups_session('users')
       
  1183         with s.new_cnx() as cnx:
       
  1184             peid, = self.o.execute(cnx, "INSERT Personne P: P nom 'toto'")[0]
       
  1185             seid, = self.o.execute(cnx, "INSERT Societe S: S nom 'logilab'")[0]
       
  1186             self.o.execute(cnx, "SET P travaille S")
       
  1187             cnx.commit()
       
  1188         rset = self.qexecute('Personne P WHERE P travaille S')
       
  1189         self.assertEqual(len(rset.rows), 1)
       
  1190         self.qexecute("DELETE X travaille Y WHERE X eid %s, Y eid %s" % (peid, seid))
       
  1191         rset = self.qexecute('Personne P WHERE P travaille S')
       
  1192         self.assertEqual(len(rset.rows), 0)
       
  1193 
       
  1194     def test_delete_symmetric(self):
       
  1195         teid1 = self.qexecute("INSERT Folder T: T name 'toto'")[0][0]
       
  1196         teid2 = self.qexecute("INSERT Folder T: T name 'tutu'")[0][0]
       
  1197         self.qexecute('SET X see_also Y WHERE X eid %s, Y eid %s' % (teid1, teid2))
       
  1198         rset = self.qexecute('Any X,Y WHERE X see_also Y')
       
  1199         self.assertEqual(len(rset) , 2, rset.rows)
       
  1200         self.qexecute('DELETE X see_also Y WHERE X eid %s, Y eid %s' % (teid1, teid2))
       
  1201         rset = self.qexecute('Any X,Y WHERE X see_also Y')
       
  1202         self.assertEqual(len(rset) , 0)
       
  1203         self.qexecute('SET X see_also Y WHERE X eid %s, Y eid %s' % (teid1, teid2))
       
  1204         rset = self.qexecute('Any X,Y WHERE X see_also Y')
       
  1205         self.assertEqual(len(rset) , 2)
       
  1206         self.qexecute('DELETE X see_also Y WHERE X eid %s, Y eid %s' % (teid2, teid1))
       
  1207         rset = self.qexecute('Any X,Y WHERE X see_also Y')
       
  1208         self.assertEqual(len(rset) , 0)
       
  1209 
       
  1210     def test_nonregr_delete_cache(self):
       
  1211         """test that relations are properly cleaned when an entity is deleted
       
  1212         (using cachekey on sql generation returned always the same query for an eid,
       
  1213         whatever the relation)
       
  1214         """
       
  1215         aeid, = self.qexecute('INSERT EmailAddress X: X address "toto@logilab.fr", X alias "hop"')[0]
       
  1216         # XXX would be nice if the rql below was enough...
       
  1217         #'INSERT Email X: X messageid "<1234>", X subject "test", X sender Y, X recipients Y'
       
  1218         eeid, = self.qexecute('INSERT Email X: X messageid "<1234>", X subject "test", '
       
  1219                               'X sender Y, X recipients Y WHERE Y is EmailAddress')[0]
       
  1220         self.qexecute("DELETE Email X")
       
  1221         with self.session.new_cnx() as cnx:
       
  1222             sqlc = cnx.cnxset.cu
       
  1223             sqlc.execute('SELECT * FROM recipients_relation')
       
  1224             self.assertEqual(len(sqlc.fetchall()), 0)
       
  1225             sqlc.execute('SELECT * FROM owned_by_relation WHERE eid_from=%s'%eeid)
       
  1226             self.assertEqual(len(sqlc.fetchall()), 0)
       
  1227 
       
  1228     def test_nonregr_delete_cache2(self):
       
  1229         eid = self.qexecute("INSERT Folder T: T name 'toto'")[0][0]
       
  1230         # fill the cache
       
  1231         self.qexecute("Any X WHERE X eid %(x)s", {'x': eid})
       
  1232         self.qexecute("Any X WHERE X eid %s" % eid)
       
  1233         self.qexecute("Folder X WHERE X eid %(x)s", {'x': eid})
       
  1234         self.qexecute("Folder X WHERE X eid %s" % eid)
       
  1235         self.qexecute("DELETE Folder T WHERE T eid %s" % eid)
       
  1236         rset = self.qexecute("Any X WHERE X eid %(x)s", {'x': eid})
       
  1237         self.assertEqual(rset.rows, [])
       
  1238         rset = self.qexecute("Any X WHERE X eid %s" % eid)
       
  1239         self.assertEqual(rset.rows, [])
       
  1240         rset = self.qexecute("Folder X WHERE X eid %(x)s", {'x': eid})
       
  1241         self.assertEqual(rset.rows, [])
       
  1242         rset = self.qexecute("Folder X WHERE X eid %s" %eid)
       
  1243         self.assertEqual(rset.rows, [])
       
  1244 
       
  1245     # update queries tests ####################################################
       
  1246 
       
  1247     def test_update_1(self):
       
  1248         peid = self.qexecute("INSERT Personne Y: Y nom 'toto'")[0][0]
       
  1249         rset = self.qexecute('Personne X WHERE X nom "toto"')
       
  1250         self.assertEqual(len(rset.rows), 1)
       
  1251         rset = self.qexecute("SET X nom 'tutu', X prenom 'original' WHERE X is Personne, X nom 'toto'")
       
  1252         self.assertEqual(tuplify(rset.rows), [(peid, 'tutu', 'original')])
       
  1253         rset = self.qexecute('Any Y, Z WHERE X is Personne, X nom Y, X prenom Z')
       
  1254         self.assertEqual(tuplify(rset.rows), [('tutu', 'original')])
       
  1255 
       
  1256     def test_update_2(self):
       
  1257         peid, seid = self.qexecute("INSERT Personne X, Societe Y: X nom 'bidule', Y nom 'toto'")[0]
       
  1258         rset = self.qexecute("SET X travaille Y WHERE X nom 'bidule', Y nom 'toto'")
       
  1259         self.assertEqual(tuplify(rset.rows), [(peid, seid)])
       
  1260         rset = self.qexecute('Any X, Y WHERE X travaille Y')
       
  1261         self.assertEqual(len(rset.rows), 1)
       
  1262 
       
  1263     def test_update_2bis(self):
       
  1264         rset = self.qexecute("INSERT Personne X, Societe Y: X nom 'bidule', Y nom 'toto'")
       
  1265         eid1, eid2 = rset[0][0], rset[0][1]
       
  1266         self.qexecute("SET X travaille Y WHERE X eid %(x)s, Y eid %(y)s",
       
  1267                       {'x': str(eid1), 'y': str(eid2)})
       
  1268         rset = self.qexecute('Any X, Y WHERE X travaille Y')
       
  1269         self.assertEqual(len(rset.rows), 1)
       
  1270         # test add of an existant relation but with NOT X rel Y protection
       
  1271         self.assertFalse(self.qexecute("SET X travaille Y WHERE X eid %(x)s, Y eid %(y)s,"
       
  1272                                  "NOT X travaille Y",
       
  1273                                  {'x': str(eid1), 'y': str(eid2)}))
       
  1274 
       
  1275     def test_update_2ter(self):
       
  1276         rset = self.qexecute("INSERT Personne X, Societe Y: X nom 'bidule', Y nom 'toto'")
       
  1277         eid1, eid2 = rset[0][0], rset[0][1]
       
  1278         self.qexecute("SET X travaille Y WHERE X eid %(x)s, Y eid %(y)s",
       
  1279                       {'x': text_type(eid1), 'y': text_type(eid2)})
       
  1280         rset = self.qexecute('Any X, Y WHERE X travaille Y')
       
  1281         self.assertEqual(len(rset.rows), 1)
       
  1282 
       
  1283     def test_update_multiple1(self):
       
  1284         peid1 = self.qexecute("INSERT Personne Y: Y nom 'tutu'")[0][0]
       
  1285         peid2 = self.qexecute("INSERT Personne Y: Y nom 'toto'")[0][0]
       
  1286         self.qexecute("SET X nom 'tutu', Y nom 'toto' WHERE X nom 'toto', Y nom 'tutu'")
       
  1287         self.assertEqual(self.qexecute('Any X WHERE X nom "toto"').rows, [[peid1]])
       
  1288         self.assertEqual(self.qexecute('Any X WHERE X nom "tutu"').rows, [[peid2]])
       
  1289 
       
  1290     def test_update_multiple2(self):
       
  1291         with self.session.new_cnx() as cnx:
       
  1292             ueid = cnx.execute("INSERT CWUser X: X login 'bob', X upassword 'toto'")[0][0]
       
  1293             peid1 = cnx.execute("INSERT Personne Y: Y nom 'turlu'")[0][0]
       
  1294             peid2 = cnx.execute("INSERT Personne Y: Y nom 'tutu'")[0][0]
       
  1295             cnx.execute('SET P1 owned_by U, P2 owned_by U '
       
  1296                         'WHERE P1 eid %s, P2 eid %s, U eid %s' % (peid1, peid2, ueid))
       
  1297             self.assertTrue(cnx.execute('Any X WHERE X eid %s, X owned_by U, U eid %s'
       
  1298                                           % (peid1, ueid)))
       
  1299             self.assertTrue(cnx.execute('Any X WHERE X eid %s, X owned_by U, U eid %s'
       
  1300                                           % (peid2, ueid)))
       
  1301 
       
  1302     def test_update_math_expr(self):
       
  1303         orders = [r[0] for r in self.qexecute('Any O ORDERBY O WHERE ST name "Personne", '
       
  1304                                               'X from_entity ST, X ordernum O')]
       
  1305         for i,v in enumerate(orders):
       
  1306             if v != orders[0]:
       
  1307                 splitidx = i
       
  1308                 break
       
  1309         self.qexecute('SET X ordernum Y+1 WHERE X from_entity SE, SE name "Personne", '
       
  1310                       'X ordernum Y, X ordernum >= %(order)s',
       
  1311                      {'order': orders[splitidx]})
       
  1312         orders2 = [r[0] for r in self.qexecute('Any O ORDERBY O WHERE ST name "Personne", '
       
  1313                                                'X from_entity ST, X ordernum O')]
       
  1314         orders = orders[:splitidx] + [o+1 for o in orders[splitidx:]]
       
  1315         self.assertEqual(orders2, orders)
       
  1316 
       
  1317     def test_update_string_concat(self):
       
  1318         beid = self.qexecute("INSERT Bookmark Y: Y title 'toto', Y path '/view'")[0][0]
       
  1319         self.qexecute('SET X title XN + %(suffix)s WHERE X is Bookmark, X title XN',
       
  1320                       {'suffix': u'-moved'})
       
  1321         newname = self.qexecute('Any XN WHERE X eid %(x)s, X title XN', {'x': beid})[0][0]
       
  1322         self.assertEqual(newname, 'toto-moved')
       
  1323 
       
  1324     def test_update_not_exists(self):
       
  1325         rset = self.qexecute("INSERT Personne X, Societe Y: X nom 'bidule', Y nom 'toto'")
       
  1326         eid1, eid2 = rset[0][0], rset[0][1]
       
  1327         rset = self.qexecute("SET X travaille Y WHERE X eid %(x)s, Y eid %(y)s, "
       
  1328                             "NOT EXISTS(Z ecrit_par X)",
       
  1329                             {'x': text_type(eid1), 'y': text_type(eid2)})
       
  1330         self.assertEqual(tuplify(rset.rows), [(eid1, eid2)])
       
  1331 
       
  1332     def test_update_query_error(self):
       
  1333         self.qexecute("INSERT Personne Y: Y nom 'toto'")
       
  1334         self.assertRaises(Exception, self.qexecute, "SET X nom 'toto', X is Personne")
       
  1335         self.assertRaises(QueryError, self.qexecute, "SET X nom 'toto', X has_text 'tutu' "
       
  1336                           "WHERE X is Personne")
       
  1337         self.assertRaises(QueryError,
       
  1338                           self.qexecute,
       
  1339                           "SET X login 'tutu', X eid %s" % cnx.user(self.session).eid)
       
  1340 
       
  1341 
       
  1342     # HAVING on write queries test #############################################
       
  1343 
       
  1344     def test_update_having(self):
       
  1345         peid1 = self.qexecute("INSERT Personne Y: Y nom 'hop', Y tel 1")[0][0]
       
  1346         peid2 = self.qexecute("INSERT Personne Y: Y nom 'hop', Y tel 2")[0][0]
       
  1347         rset = self.qexecute("SET X tel 3 WHERE X tel TEL HAVING TEL&1=1")
       
  1348         self.assertEqual(tuplify(rset.rows), [(peid1, 3)])
       
  1349 
       
  1350     def test_insert_having(self):
       
  1351         self.skipTest('unsupported yet')
       
  1352         self.qexecute("INSERT Personne Y: Y nom 'hop', Y tel 1")[0][0]
       
  1353         self.assertFalse(self.qexecute("INSERT Personne Y: Y nom 'hop', Y tel 2 "
       
  1354                                        "WHERE X tel XT HAVING XT&2=2"))
       
  1355         self.assertTrue(self.qexecute("INSERT Personne Y: Y nom 'hop', Y tel 2 "
       
  1356                                       "WHERE X tel XT HAVING XT&1=1"))
       
  1357 
       
  1358     def test_delete_having(self):
       
  1359         self.qexecute("INSERT Personne Y: Y nom 'hop', Y tel 1")[0][0]
       
  1360         self.assertFalse(self.qexecute("DELETE Personne Y WHERE X tel XT HAVING XT&2=2"))
       
  1361         self.assertTrue(self.qexecute("DELETE Personne Y WHERE X tel XT HAVING XT&1=1"))
       
  1362 
       
  1363     # upassword encryption tests #################################################
       
  1364 
       
  1365     def test_insert_upassword(self):
       
  1366         rset = self.qexecute("INSERT CWUser X: X login 'bob', X upassword 'toto', "
       
  1367                              "X in_group G WHERE G name 'users'")
       
  1368         self.assertEqual(len(rset.rows), 1)
       
  1369         self.assertEqual(rset.description, [('CWUser',)])
       
  1370         self.assertRaises(Unauthorized,
       
  1371                           self.qexecute, "Any P WHERE X is CWUser, X login 'bob', X upassword P")
       
  1372         with self.session.new_cnx() as cnx:
       
  1373             cursor = cnx.cnxset.cu
       
  1374             cursor.execute("SELECT %supassword from %sCWUser WHERE %slogin='bob'"
       
  1375                            % (SQL_PREFIX, SQL_PREFIX, SQL_PREFIX))
       
  1376             passwd = binary_type(cursor.fetchone()[0])
       
  1377             self.assertEqual(passwd, crypt_password('toto', passwd))
       
  1378         rset = self.qexecute("Any X WHERE X is CWUser, X login 'bob', X upassword %(pwd)s",
       
  1379                             {'pwd': Binary(passwd)})
       
  1380         self.assertEqual(len(rset.rows), 1)
       
  1381         self.assertEqual(rset.description, [('CWUser',)])
       
  1382 
       
  1383     def test_update_upassword(self):
       
  1384         with self.session.new_cnx() as cnx:
       
  1385             rset = cnx.execute("INSERT CWUser X: X login 'bob', X upassword %(pwd)s",
       
  1386                                {'pwd': 'toto'})
       
  1387             self.assertEqual(rset.description[0][0], 'CWUser')
       
  1388             rset = cnx.execute("SET X upassword %(pwd)s WHERE X is CWUser, X login 'bob'",
       
  1389                                {'pwd': b'tutu'})
       
  1390             cursor = cnx.cnxset.cu
       
  1391             cursor.execute("SELECT %supassword from %sCWUser WHERE %slogin='bob'"
       
  1392                            % (SQL_PREFIX, SQL_PREFIX, SQL_PREFIX))
       
  1393             passwd = binary_type(cursor.fetchone()[0])
       
  1394             self.assertEqual(passwd, crypt_password('tutu', passwd))
       
  1395             rset = cnx.execute("Any X WHERE X is CWUser, X login 'bob', X upassword %(pwd)s",
       
  1396                                {'pwd': Binary(passwd)})
       
  1397             self.assertEqual(len(rset.rows), 1)
       
  1398             self.assertEqual(rset.description, [('CWUser',)])
       
  1399 
       
  1400     # ZT datetime tests ########################################################
       
  1401 
       
  1402     def test_tz_datetime(self):
       
  1403         self.qexecute("INSERT Personne X: X nom 'bob', X tzdatenaiss %(date)s",
       
  1404                      {'date': datetime(1977, 6, 7, 2, 0, tzinfo=FixedOffset(1))})
       
  1405         datenaiss = self.qexecute("Any XD WHERE X nom 'bob', X tzdatenaiss XD")[0][0]
       
  1406         self.assertIsNotNone(datenaiss.tzinfo)
       
  1407         self.assertEqual(datenaiss.utctimetuple()[:5], (1977, 6, 7, 1, 0))
       
  1408 
       
  1409     def test_tz_datetime_cache_nonregr(self):
       
  1410         datenaiss = datetime(1977, 6, 7, 2, 0, tzinfo=FixedOffset(1))
       
  1411         self.qexecute("INSERT Personne X: X nom 'bob', X tzdatenaiss %(date)s",
       
  1412                      {'date': datenaiss})
       
  1413         self.assertTrue(self.qexecute("Any X WHERE X tzdatenaiss %(d)s", {'d': datenaiss}))
       
  1414         self.assertFalse(self.qexecute("Any X WHERE X tzdatenaiss %(d)s", {'d': datenaiss - timedelta(1)}))
       
  1415 
       
  1416     # non regression tests #####################################################
       
  1417 
       
  1418     def test_nonregr_1(self):
       
  1419         teid = self.qexecute("INSERT Tag X: X name 'tag'")[0][0]
       
  1420         self.qexecute("SET X tags Y WHERE X name 'tag', Y is State, Y name 'activated'")
       
  1421         rset = self.qexecute('Any X WHERE T tags X')
       
  1422         self.assertEqual(len(rset.rows), 1, rset.rows)
       
  1423         rset = self.qexecute('Any T WHERE T tags X, X is State')
       
  1424         self.assertEqual(rset.rows, [[teid]])
       
  1425         rset = self.qexecute('Any T WHERE T tags X')
       
  1426         self.assertEqual(rset.rows, [[teid]])
       
  1427 
       
  1428     def test_nonregr_2(self):
       
  1429         teid = self.qexecute("INSERT Tag X: X name 'tag'")[0][0]
       
  1430         geid = self.qexecute("CWGroup G WHERE G name 'users'")[0][0]
       
  1431         self.qexecute("SET X tags Y WHERE X eid %(t)s, Y eid %(g)s",
       
  1432                        {'g': geid, 't': teid})
       
  1433         rset = self.qexecute('Any X WHERE E eid %(x)s, E tags X',
       
  1434                               {'x': teid})
       
  1435         self.assertEqual(rset.rows, [[geid]])
       
  1436 
       
  1437     def test_nonregr_3(self):
       
  1438         """bad sql generated on the second query (destination_state is not
       
  1439         detected as an inlined relation)
       
  1440         """
       
  1441         rset = self.qexecute('Any S,ES,T WHERE S state_of WF, WF workflow_of ET, ET name "CWUser",'
       
  1442                              'ES allowed_transition T, T destination_state S')
       
  1443         self.assertEqual(len(rset.rows), 2)
       
  1444 
       
  1445     def test_nonregr_4(self):
       
  1446         # fix variables'type, else we get (nb of entity types with a 'name' attribute)**3
       
  1447         # union queries and that make for instance a 266Ko sql query which is refused
       
  1448         # by the server (or client lib)
       
  1449         rset = self.qexecute('Any ER,SE,OE WHERE SE name "Comment", ER name "comments", OE name "Comment",'
       
  1450                             'ER is CWRType, SE is CWEType, OE is CWEType')
       
  1451         self.assertEqual(len(rset), 1)
       
  1452 
       
  1453     def test_nonregr_5(self):
       
  1454         # jpl #15505: equivalent queries returning different result sets
       
  1455         teid1 = self.qexecute("INSERT Folder X: X name 'hop'")[0][0]
       
  1456         teid2 = self.qexecute("INSERT Folder X: X name 'hip'")[0][0]
       
  1457         neid = self.qexecute("INSERT Note X: X todo_by U, X filed_under T "
       
  1458                              "WHERE U login 'admin', T name 'hop'")[0][0]
       
  1459         weid = self.qexecute("INSERT Affaire X: X concerne N, X filed_under T "
       
  1460                              "WHERE N is Note, T name 'hip'")[0][0]
       
  1461         rset1 = self.qexecute('Any N,U WHERE N filed_under T, T eid %s,'
       
  1462                              'N todo_by U, W concerne N,'
       
  1463                              'W is Affaire, W filed_under A, A eid %s' % (teid1, teid2))
       
  1464         rset2 = self.qexecute('Any N,U WHERE N filed_under T, T eid %s,'
       
  1465                              'N todo_by U, W concerne N,'
       
  1466                              'W filed_under A, A eid %s' % (teid1, teid2))
       
  1467         rset3 = self.qexecute('Any N,U WHERE N todo_by U, T eid %s,'
       
  1468                              'N filed_under T, W concerne N,'
       
  1469                              'W is Affaire, W filed_under A, A eid %s' % (teid1, teid2))
       
  1470         rset4 = self.qexecute('Any N,U WHERE N todo_by U, T eid %s,'
       
  1471                              'N filed_under T, W concerne N,'
       
  1472                              'W filed_under A, A eid %s' % (teid1, teid2))
       
  1473         self.assertEqual(rset1.rows, rset2.rows)
       
  1474         self.assertEqual(rset1.rows, rset3.rows)
       
  1475         self.assertEqual(rset1.rows, rset4.rows)
       
  1476 
       
  1477     def test_nonregr_6(self):
       
  1478         self.qexecute('Any N,COUNT(S) GROUPBY N ORDERBY COUNT(N) WHERE S name N, S is State')
       
  1479 
       
  1480     def test_sqlite_encoding(self):
       
  1481         """XXX this test was trying to show a bug on use of lower which only
       
  1482         occurs with non ascii string and misconfigured locale
       
  1483         """
       
  1484         self.qexecute("INSERT Tag X: X name %(name)s,"
       
  1485                        "X modification_date %(modification_date)s,"
       
  1486                        "X creation_date %(creation_date)s",
       
  1487                        {'name': u'éname0',
       
  1488                         'modification_date': '2003/03/12 11:00',
       
  1489                         'creation_date': '2000/07/03 11:00'})
       
  1490         rset = self.qexecute('Any lower(N) ORDERBY LOWER(N) WHERE X is Tag, X name N,'
       
  1491                             'X owned_by U, U eid %(x)s',
       
  1492                             {'x':self.session.user.eid})
       
  1493         self.assertEqual(rset.rows, [[u'\xe9name0']])
       
  1494 
       
  1495 
       
  1496     def test_nonregr_description(self):
       
  1497         """check that a correct description is built in case where infered
       
  1498         solutions may be "fusionned" into one by the querier while all solutions
       
  1499         are needed to build the result's description
       
  1500         """
       
  1501         self.qexecute("INSERT Personne X: X nom 'bidule'")
       
  1502         self.qexecute("INSERT Societe Y: Y nom 'toto'")
       
  1503         beid = self.qexecute("INSERT Basket B: B name 'mybasket'")[0][0]
       
  1504         self.qexecute("SET X in_basket B WHERE X is Personne")
       
  1505         self.qexecute("SET X in_basket B WHERE X is Societe")
       
  1506         rset = self.qexecute('Any X WHERE X in_basket B, B eid %s' % beid)
       
  1507         self.assertEqual(len(rset), 2)
       
  1508         self.assertEqual(rset.description, [('Personne',), ('Societe',)])
       
  1509 
       
  1510 
       
  1511     def test_nonregr_cache_1(self):
       
  1512         peid = self.qexecute("INSERT Personne X: X nom 'bidule'")[0][0]
       
  1513         beid = self.qexecute("INSERT Basket X: X name 'tag'")[0][0]
       
  1514         self.qexecute("SET X in_basket Y WHERE X is Personne, Y eid %(y)s",
       
  1515                        {'y': beid})
       
  1516         rset = self.qexecute("Any X WHERE X in_basket B, B eid %(x)s",
       
  1517                        {'x': beid})
       
  1518         self.assertEqual(rset.rows, [[peid]])
       
  1519         rset = self.qexecute("Any X WHERE X in_basket B, B eid %(x)s",
       
  1520                        {'x': beid})
       
  1521         self.assertEqual(rset.rows, [[peid]])
       
  1522 
       
  1523     def test_nonregr_has_text_cache(self):
       
  1524         eid1 = self.qexecute("INSERT Personne X: X nom 'bidule'")[0][0]
       
  1525         eid2 = self.qexecute("INSERT Personne X: X nom 'tag'")[0][0]
       
  1526         rset = self.qexecute("Any X WHERE X has_text %(text)s", {'text': u'bidule'})
       
  1527         self.assertEqual(rset.rows, [[eid1]])
       
  1528         rset = self.qexecute("Any X WHERE X has_text %(text)s", {'text': u'tag'})
       
  1529         self.assertEqual(rset.rows, [[eid2]])
       
  1530 
       
  1531     def test_nonregr_sortterm_management(self):
       
  1532         """Error: Variable has no attribute 'sql' in rql2sql.py (visit_variable)
       
  1533 
       
  1534         cause: old variable ref inserted into a fresh rqlst copy
       
  1535         (in RQLSpliter._complex_select_plan)
       
  1536 
       
  1537         need sqlite including http://www.sqlite.org/cvstrac/tktview?tn=3773 fix
       
  1538         """
       
  1539         self.qexecute('Any X ORDERBY D DESC WHERE X creation_date D')
       
  1540 
       
  1541     def test_nonregr_extra_joins(self):
       
  1542         ueid = self.session.user.eid
       
  1543         teid1 = self.qexecute("INSERT Folder X: X name 'folder1'")[0][0]
       
  1544         teid2 = self.qexecute("INSERT Folder X: X name 'folder2'")[0][0]
       
  1545         neid1 = self.qexecute("INSERT Note X: X para 'note1'")[0][0]
       
  1546         neid2 = self.qexecute("INSERT Note X: X para 'note2'")[0][0]
       
  1547         self.qexecute("SET X filed_under Y WHERE X eid %s, Y eid %s"
       
  1548                        % (neid1, teid1))
       
  1549         self.qexecute("SET X filed_under Y WHERE X eid %s, Y eid %s"
       
  1550                        % (neid2, teid2))
       
  1551         self.qexecute("SET X todo_by Y WHERE X is Note, Y eid %s" % ueid)
       
  1552         rset = self.qexecute('Any N WHERE N todo_by U, N is Note, U eid %s, N filed_under T, T eid %s'
       
  1553                              % (ueid, teid1))
       
  1554         self.assertEqual(len(rset), 1)
       
  1555 
       
  1556     def test_nonregr_XXX(self):
       
  1557         teid = self.qexecute('Transition S WHERE S name "deactivate"')[0][0]
       
  1558         rset = self.qexecute('Any O WHERE O is State, '
       
  1559                              'S eid %(x)s, S transition_of ET, O state_of ET', {'x': teid})
       
  1560         self.assertEqual(len(rset), 2)
       
  1561         rset = self.qexecute('Any O WHERE O is State, NOT S destination_state O, '
       
  1562                              'S eid %(x)s, S transition_of ET, O state_of ET', {'x': teid})
       
  1563         self.assertEqual(len(rset), 1)
       
  1564 
       
  1565 
       
  1566     def test_nonregr_set_datetime(self):
       
  1567         # huum, psycopg specific
       
  1568         self.qexecute('SET X creation_date %(date)s WHERE X eid 1', {'date': date.today()})
       
  1569 
       
  1570     def test_nonregr_u_owned_by_u(self):
       
  1571         ueid = self.qexecute("INSERT CWUser X: X login 'bob', X upassword 'toto', X in_group G "
       
  1572                              "WHERE G name 'users'")[0][0]
       
  1573         rset = self.qexecute("CWUser U")
       
  1574         self.assertEqual(len(rset), 3) # bob + admin + anon
       
  1575         rset = self.qexecute("Any U WHERE NOT U owned_by U")
       
  1576         # even admin created at repo initialization time should belong to itself
       
  1577         self.assertEqual(len(rset), 0)
       
  1578 
       
  1579     def test_nonreg_update_index(self):
       
  1580         # this is the kind of queries generated by "cubicweb-ctl db-check -ry"
       
  1581         self.qexecute("SET X description D WHERE X is State, X description D")
       
  1582 
       
  1583     def test_nonregr_is(self):
       
  1584         uteid = self.qexecute('Any ET WHERE ET name "CWUser"')[0][0]
       
  1585         self.qexecute('Any X, ET WHERE X is ET, ET eid %s' % uteid)
       
  1586 
       
  1587     def test_nonregr_orderby(self):
       
  1588         seid = self.qexecute('Any X WHERE X name "activated"')[0][0]
       
  1589         self.qexecute('Any X,S, MAX(T) GROUPBY X,S ORDERBY S '
       
  1590                       'WHERE X is CWUser, T tags X, S eid IN(%s), X in_state S' % seid)
       
  1591 
       
  1592     def test_nonregr_solution_cache(self):
       
  1593         self.skipTest('XXX should be fixed or documented') # (doesn't occur if cache key is provided.)
       
  1594         rset = self.qexecute('Any X WHERE X is CWUser, X eid %(x)s', {'x':self.ueid})
       
  1595         self.assertEqual(len(rset), 1)
       
  1596         rset = self.qexecute('Any X WHERE X is CWUser, X eid %(x)s', {'x':12345})
       
  1597         self.assertEqual(len(rset), 0)
       
  1598 
       
  1599     def test_nonregr_final_norestr(self):
       
  1600         self.assertRaises(BadRQLQuery, self.qexecute, 'Date X')
       
  1601 
       
  1602     def test_nonregr_eid_cmp(self):
       
  1603         peid1 = self.qexecute("INSERT Personne X: X nom 'bidule'")[0][0]
       
  1604         peid2 = self.qexecute("INSERT Personne X: X nom 'bidule'")[0][0]
       
  1605         rset = self.qexecute('Any X,Y WHERE X is Personne, Y is Personne, '
       
  1606                              'X nom XD, Y nom XD, X eid Z, Y eid > Z')
       
  1607         self.assertEqual(rset.rows, [[peid1, peid2]])
       
  1608         rset = self.qexecute('Any X,Y WHERE X nom XD, Y nom XD, X eid Z, Y eid > Z')
       
  1609         self.assertEqual(rset.rows, [[peid1, peid2]])
       
  1610 
       
  1611     def test_nonregr_has_text_ambiguity_1(self):
       
  1612         peid = self.qexecute("INSERT CWUser X: X login 'bidule', X upassword 'bidule', "
       
  1613                              "X in_group G WHERE G name 'users'")[0][0]
       
  1614         aeid = self.qexecute("INSERT Affaire X: X ref 'bidule'")[0][0]
       
  1615         rset = self.qexecute('Any X WHERE X is CWUser, X has_text "bidule"')
       
  1616         self.assertEqual(rset.rows, [[peid]])
       
  1617         rset = self.qexecute('Any X WHERE X is CWUser, X has_text "bidule", '
       
  1618                              'X in_state S, S name SN')
       
  1619         self.assertEqual(rset.rows, [[peid]])
       
  1620 
       
  1621 
       
  1622     def test_nonregr_sql_cache(self):
       
  1623         # different SQL generated when 'name' is None or not (IS NULL).
       
  1624         self.assertFalse(self.qexecute('Any X WHERE X is CWEType, X name %(name)s',
       
  1625                                        {'name': None}))
       
  1626         self.assertTrue(self.qexecute('Any X WHERE X is CWEType, X name %(name)s',
       
  1627                                       {'name': 'CWEType'}))
       
  1628 
       
  1629 
       
  1630 class NonRegressionTC(CubicWebTC):
       
  1631 
       
  1632     def test_has_text_security_cache_bug(self):
       
  1633         with self.admin_access.repo_cnx() as cnx:
       
  1634             self.create_user(cnx, 'user', ('users',))
       
  1635             aff1 = cnx.create_entity('Societe', nom=u'aff1')
       
  1636             aff2 = cnx.create_entity('Societe', nom=u'aff2')
       
  1637             cnx.commit()
       
  1638         with self.new_access('user').repo_cnx() as cnx:
       
  1639             res = cnx.execute('Any X WHERE X has_text %(text)s', {'text': u'aff1'})
       
  1640             self.assertEqual(res.rows, [[aff1.eid]])
       
  1641             res = cnx.execute('Any X WHERE X has_text %(text)s', {'text': u'aff2'})
       
  1642             self.assertEqual(res.rows, [[aff2.eid]])
       
  1643 
       
  1644     def test_set_relations_eid(self):
       
  1645         with self.admin_access.repo_cnx() as cnx:
       
  1646             # create 3 email addresses
       
  1647             a1 = cnx.create_entity('EmailAddress', address=u'a1')
       
  1648             a2 = cnx.create_entity('EmailAddress', address=u'a2')
       
  1649             a3 = cnx.create_entity('EmailAddress', address=u'a3')
       
  1650             # SET relations using '>=' operator on eids
       
  1651             cnx.execute('SET U use_email A WHERE U login "admin", A eid >= %s' % a2.eid)
       
  1652             self.assertEqual(
       
  1653                 [[a2.eid], [a3.eid]],
       
  1654                 cnx.execute('Any A ORDERBY A WHERE U use_email A, U login "admin"').rows)
       
  1655             # DELETE
       
  1656             cnx.execute('DELETE U use_email A WHERE U login "admin", A eid > %s' % a2.eid)
       
  1657             self.assertEqual(
       
  1658                 [[a2.eid]],
       
  1659                 cnx.execute('Any A ORDERBY A WHERE U use_email A, U login "admin"').rows)
       
  1660             cnx.execute('DELETE U use_email A WHERE U login "admin"')
       
  1661             # SET relations using '<' operator on eids
       
  1662             cnx.execute('SET U use_email A WHERE U login "admin", A eid < %s' % a2.eid)
       
  1663             self.assertEqual(
       
  1664                 [[a1.eid]],
       
  1665                 cnx.execute('Any A ORDERBY A WHERE U use_email A, U login "admin"').rows)
       
  1666 
       
  1667 if __name__ == '__main__':
       
  1668     unittest_main()