server/test/unittest_querier.py
changeset 0 b97547f5f1fa
child 47 54087a269bdd
equal deleted inserted replaced
-1:000000000000 0:b97547f5f1fa
       
     1 # -*- coding: iso-8859-1 -*-
       
     2 """unit tests for modules cubicweb.server.querier and cubicweb.server.querier_steps
       
     3 """
       
     4 
       
     5 from logilab.common.testlib import TestCase, unittest_main
       
     6 from cubicweb.devtools import init_test_database
       
     7 from cubicweb.devtools.repotest import tuplify, BaseQuerierTC
       
     8 from unittest_session import Variable
       
     9 
       
    10 from mx.DateTime import today, now, DateTimeType
       
    11 from rql import BadRQLQuery, RQLSyntaxError
       
    12 from cubicweb import QueryError, Unauthorized
       
    13 from cubicweb.server.utils import crypt_password
       
    14 from cubicweb.server.sources.native import make_schema
       
    15 
       
    16 
       
    17 # register priority/severity sorting registered procedure
       
    18 from rql.utils import register_function, FunctionDescr
       
    19 
       
    20 class group_sort_value(FunctionDescr):
       
    21     supported_backends = ('sqlite',)
       
    22     rtype = 'Int'
       
    23 try:
       
    24     register_function(group_sort_value)
       
    25 except AssertionError:
       
    26     pass
       
    27 from cubicweb.server.sqlutils import SQL_CONNECT_HOOKS
       
    28 def init_sqlite_connexion(cnx):
       
    29     def group_sort_value(text):
       
    30         return {"managers": "3", "users": "2", "guests":  "1", "owners": "0"}[text]
       
    31     cnx.create_function("GROUP_SORT_VALUE", 1, group_sort_value)
       
# add the function above to the sqlite connect hooks (presumably run by the
# server for each sqlite connection it opens -- TODO confirm in sqlutils)
SQL_CONNECT_HOOKS['sqlite'].append(init_sqlite_connexion)
       
    33 
       
    34 
       
    35 from logilab.common.adbh import _GenericAdvFuncHelper
       
# type mapping of the generic db helper, handed to make_schema by MakeSchemaTC
TYPEMAP = _GenericAdvFuncHelper.TYPE_MAPPING
       
    37 
       
    38 class MakeSchemaTC(TestCase):
       
    39     def test_known_values(self):
       
    40         solution = {'A': 'String', 'B': 'EUser'}
       
    41         self.assertEquals(make_schema((Variable('A'), Variable('B')), solution, 
       
    42                                       'table0', TYPEMAP),
       
    43                           ('C0 text,C1 integer', {'A': 'table0.C0', 'B': 'table0.C1'}))
       
    44         
       
    45 
       
# module level fixtures, created once when the module is imported; `repo` is
# shared by the test case classes below through their `repo` class attribute
repo, cnx = init_test_database('sqlite')
       
    47 
       
    48 
       
    49 
       
    50 class UtilsTC(BaseQuerierTC):
       
    51     repo = repo
       
    52     
       
    def get_max_eid(self):
        """no eid is reported since these tests need no cleanup"""
        return
       
    def cleanup(self):
        """deliberately empty: these tests need no cleanup"""
       
    59     
       
    def test_preprocess_1(self):
        # counting the definitions of "owned_by": both relation definition
        # entity types must be found as solutions
        rtype_rset = self.execute('Any X WHERE X is ERType, X name "owned_by"')
        args = {'x': rtype_rset[0][0]}
        rqlst = self._prepare('Any COUNT(RDEF) WHERE RDEF relation_type X, X eid %(x)s', args)
        expected = [{'RDEF': 'EFRDef'}, {'RDEF': 'ENFRDef'}]
        self.assertEquals(rqlst.solutions, expected)
       
    64         
       
    def test_preprocess_2(self):
        tag_eid = self.execute("INSERT Tag X: X name 'tag'")[0][0]
        rqlst = self._prepare('Any X WHERE E eid %(x)s, E tags X', {'x': tag_eid})
        # the query may be optimized: whichever solution is kept (the entity
        # type gets discarded), exactly one must remain
        solution_count = len(rqlst.solutions)
        self.assertEquals(solution_count, 1)
       
    74         
       
    def test_preprocess_security(self):
        # preprocess the query with the session of a member of "users"
        plan = self._prepare_plan('Any ETN,COUNT(X) GROUPBY ETN '
                                  'WHERE X is ET, ET name ETN')
        plan.session = self._user_session(('users',))[1]
        union = plan.rqlst
        plan.preprocess(union)

        # one select is left, fed by a single sub-query made of three parts
        self.assertEquals(len(union.children), 1)
        select = union.children[0]
        self.assertEquals(len(select.with_), 1)
        subquery = select.with_[0].query
        self.assertEquals(len(subquery.children), 3)
        self.assertEquals([term.as_string() for term in select.selection],
                          ['ETN','COUNT(X)'])
        self.assertEquals([term.as_string() for term in select.groupby],
                          ['ETN'])

        # sort the parts on their rql string to check them in a stable order
        parts = [(part.as_string(), part.solutions)
                 for part in subquery.children]
        parts.sort()

        # first part: Affaire entities
        rql, solutions = parts[0]
        affaire_rql = (
            'Any ETN,X WHERE X is ET, ET name ETN, (EXISTS(X owned_by %(B)s))'
            ' OR ((((EXISTS(D concerne C?, C owned_by %(B)s, X identity D, C is Division, D is Affaire))'
            ' OR (EXISTS(H concerne G?, G owned_by %(B)s, G is SubDivision, X identity H, H is Affaire)))'
            ' OR (EXISTS(I concerne F?, F owned_by %(B)s, F is Societe, X identity I, I is Affaire)))'
            ' OR (EXISTS(J concerne E?, E owned_by %(B)s, E is Note, X identity J, J is Affaire)))'
            ', ET is EEType, X is Affaire')
        self.assertEquals(rql, affaire_rql)
        affaire_solution = {'C': 'Division',
                            'D': 'Affaire',
                            'E': 'Note',
                            'F': 'Societe',
                            'G': 'SubDivision',
                            'H': 'Affaire',
                            'I': 'Affaire',
                            'J': 'Affaire',
                            'X': 'Affaire',
                            'ET': 'EEType', 'ETN': 'String'}
        self.assertEquals(solutions, [affaire_solution])

        # second part: entity types selected without any additional condition
        rql, solutions = parts[1]
        self.assertEquals(rql,  'Any ETN,X WHERE X is ET, ET name ETN, ET is EEType, '
                          'X is IN(Bookmark, Card, Comment, Division, EConstraint, EConstraintType, EEType, EFRDef, EGroup, ENFRDef, EPermission, EProperty, ERType, EUser, Email, EmailAddress, EmailPart, EmailThread, File, Folder, Image, Note, Personne, RQLExpression, Societe, State, SubDivision, Tag, TrInfo, Transition)')
        plain_etypes = ('Bookmark', 'Card', 'Comment', 'Division',
                        'EConstraint', 'EConstraintType', 'EEType', 'EFRDef',
                        'EGroup', 'Email', 'EmailAddress', 'EmailPart',
                        'EmailThread', 'ENFRDef', 'EPermission', 'EProperty',
                        'ERType', 'EUser', 'File', 'Folder', 'Image', 'Note',
                        'Personne', 'RQLExpression', 'Societe', 'State',
                        'SubDivision', 'Tag', 'Transition', 'TrInfo')
        plain_solutions = [{'X': etype, 'ETN': 'String', 'ET': 'EEType'}
                           for etype in plain_etypes]
        self.assertListEquals(sorted(solutions), sorted(plain_solutions))

        # third part: Basket entities
        rql, solutions = parts[2]
        self.assertEquals(rql,
                          'Any ETN,X WHERE X is ET, ET name ETN, EXISTS(X owned_by %(C)s), '
                          'ET is EEType, X is Basket')
        basket_solution = {'ET': 'EEType', 'X': 'Basket', 'ETN': 'String'}
        self.assertEquals(solutions, [basket_solution])
       
   150 
       
   151     def test_preprocess_security_aggregat(self):
       
   152         plan = self._prepare_plan('Any MAX(X)')
       
   153         plan.session = self._user_session(('users',))[1]
       
   154         union = plan.rqlst
       
   155         plan.preprocess(union)
       
   156         self.assertEquals(len(union.children), 1)
       
   157         self.assertEquals(len(union.children[0].with_), 1)
       
   158         subq = union.children[0].with_[0].query
       
   159         self.assertEquals(len(subq.children), 3)
       
   160         self.assertEquals([t.as_string() for t in union.children[0].selection],
       
   161                           ['MAX(X)'])
       
   162         
       
   163     def test_preprocess_nonregr(self):
       
   164         rqlst = self._prepare('Any S ORDERBY SI WHERE NOT S ecrit_par O, S para SI')
       
   165         self.assertEquals(len(rqlst.solutions), 1)
       
   166     
       
   167     def test_build_description(self):
       
   168         # should return an empty result set
       
   169         rset = self.execute('Any X WHERE X eid %(x)s', {'x': self.session.user.eid})
       
   170         self.assertEquals(rset.description[0][0], 'EUser')
       
   171         rset = self.execute('Any 1')
       
   172         self.assertEquals(rset.description[0][0], 'Int')
       
   173         rset = self.execute('Any TRUE')
       
   174         self.assertEquals(rset.description[0][0], 'Boolean')
       
   175         rset = self.execute('Any "hop"')
       
   176         self.assertEquals(rset.description[0][0], 'String')
       
   177         rset = self.execute('Any TODAY')
       
   178         self.assertEquals(rset.description[0][0], 'Date')
       
   179         rset = self.execute('Any NOW')
       
   180         self.assertEquals(rset.description[0][0], 'Datetime')
       
   181         rset = self.execute('Any %(x)s', {'x': 1})
       
   182         self.assertEquals(rset.description[0][0], 'Int')
       
   183         rset = self.execute('Any %(x)s', {'x': 1L})
       
   184         self.assertEquals(rset.description[0][0], 'Int')
       
   185         rset = self.execute('Any %(x)s', {'x': True})
       
   186         self.assertEquals(rset.description[0][0], 'Boolean')
       
   187         rset = self.execute('Any %(x)s', {'x': 1.0})
       
   188         self.assertEquals(rset.description[0][0], 'Float')
       
   189         rset = self.execute('Any %(x)s', {'x': now()})
       
   190         self.assertEquals(rset.description[0][0], 'Datetime')
       
   191         rset = self.execute('Any %(x)s', {'x': 'str'})
       
   192         self.assertEquals(rset.description[0][0], 'String')
       
   193         rset = self.execute('Any %(x)s', {'x': u'str'})
       
   194         self.assertEquals(rset.description[0][0], 'String')
       
   195 
       
   196 
       
   197 class QuerierTC(BaseQuerierTC):
       
   198     repo = repo
       
   199 
       
   200     def test_encoding_pb(self):
       
   201         self.assertRaises(RQLSyntaxError, self.execute,
       
   202                           'Any X WHERE X is ERType, X name "öwned_by"')
       
   203 
       
   204     def test_unknown_eid(self):
       
   205         # should return an empty result set
       
   206         self.failIf(self.execute('Any X WHERE X eid 99999999'))
       
   207         
       
   208     # selection queries tests #################################################
       
   209     
       
   210     def test_select_1(self):
       
   211         rset = self.execute('Any X ORDERBY X WHERE X is EGroup')
       
   212         result, descr = rset.rows, rset.description
       
   213         self.assertEquals(tuplify(result), [(1,), (2,), (3,), (4,)])
       
   214         self.assertEquals(descr, [('EGroup',), ('EGroup',), ('EGroup',), ('EGroup',)])
       
   215         
       
   216     def test_select_2(self):
       
   217         rset = self.execute('Any X ORDERBY N WHERE X is EGroup, X name N')
       
   218         self.assertEquals(tuplify(rset.rows), [(3,), (1,), (4,), (2,)])
       
   219         self.assertEquals(rset.description, [('EGroup',), ('EGroup',), ('EGroup',), ('EGroup',)])
       
   220         rset = self.execute('Any X ORDERBY N DESC WHERE X is EGroup, X name N')
       
   221         self.assertEquals(tuplify(rset.rows), [(2,), (4,), (1,), (3,)])
       
   222         
       
   223     def test_select_3(self):
       
   224         rset = self.execute('Any N GROUPBY N WHERE X is EGroup, X name N')
       
   225         result, descr = rset.rows, rset.description
       
   226         result.sort()
       
   227         self.assertEquals(tuplify(result), [('guests',), ('managers',), ('owners',), ('users',)])
       
   228         self.assertEquals(descr, [('String',), ('String',), ('String',), ('String',)])
       
   229         
       
   230     def test_select_is(self):
       
   231         rset = self.execute('Any X, TN ORDERBY TN LIMIT 10 WHERE X is T, T name TN')
       
   232         result, descr = rset.rows, rset.description
       
   233         self.assertEquals(result[0][1], descr[0][0])
       
   234         
       
   235     def test_select_is_aggr(self):
       
   236         rset = self.execute('Any TN, COUNT(X) GROUPBY TN ORDERBY 2 DESC WHERE X is T, T name TN')
       
   237         result, descr = rset.rows, rset.description
       
   238         self.assertEquals(descr[0][0], 'String')
       
   239         self.assertEquals(descr[0][1], 'Int')
       
   240         self.assertEquals(result[0][0], 'ENFRDef')
       
   241         
       
   242     def test_select_groupby_orderby(self):
       
   243         rset = self.execute('Any N GROUPBY N ORDERBY N WHERE X is EGroup, X name N')
       
   244         self.assertEquals(tuplify(rset.rows), [('guests',), ('managers',), ('owners',), ('users',)])
       
   245         self.assertEquals(rset.description, [('String',), ('String',), ('String',), ('String',)])
       
   246         
       
   247     def test_select_complex_groupby(self):
       
   248         rset = self.execute('Any N GROUPBY N WHERE X name N')
       
   249         rset = self.execute('Any N,MAX(D) GROUPBY N LIMIT 5 WHERE X name N, X creation_date D')
       
   250         
       
   251     def test_select_inlined_groupby(self):
       
   252         seid = self.execute('State X WHERE X name "deactivated"')[0][0]
       
   253         rset = self.execute('Any U,L,S GROUPBY U,L,S WHERE X in_state S, U login L, S eid %s' % seid)
       
   254         
       
   255     def test_select_complex_orderby(self):
       
   256         rset1 = self.execute('Any N ORDERBY N WHERE X name N')
       
   257         self.assertEquals(sorted(rset1.rows), rset1.rows)
       
   258         rset = self.execute('Any N ORDERBY N LIMIT 5 OFFSET 1 WHERE X name N')
       
   259         self.assertEquals(rset.rows[0][0], rset1.rows[1][0]) 
       
   260         self.assertEquals(len(rset), 5)
       
   261         
       
   262     def test_select_5(self):
       
   263         rset = self.execute('Any X, TMP ORDERBY TMP WHERE X name TMP, X is EGroup')
       
   264         self.assertEquals(tuplify(rset.rows), [(3, 'guests',), (1, 'managers',), (4, 'owners',), (2, 'users',)])
       
   265         self.assertEquals(rset.description, [('EGroup', 'String',), ('EGroup', 'String',), ('EGroup', 'String',), ('EGroup', 'String',)])
       
   266         
       
   267     def test_select_6(self):
       
   268         self.execute("INSERT Personne X: X nom 'bidule'")[0]
       
   269         rset = self.execute('Any Y where X name TMP, Y nom in (TMP, "bidule")')
       
   270         #self.assertEquals(rset.description, [('Personne',), ('Personne',)])
       
   271         self.assert_(('Personne',) in rset.description)
       
   272         rset = self.execute('DISTINCT Any Y where X name TMP, Y nom in (TMP, "bidule")')
       
   273         self.assert_(('Personne',) in rset.description)
       
   274         
       
   275     def test_select_not_attr(self):
       
   276         self.execute("INSERT Personne X: X nom 'bidule'")
       
   277         self.execute("INSERT Societe X: X nom 'chouette'")
       
   278         rset = self.execute('Personne X WHERE NOT X nom "bidule"')
       
   279         self.assertEquals(len(rset.rows), 0, rset.rows)
       
   280         rset = self.execute('Personne X WHERE NOT X nom "bid"')
       
   281         self.assertEquals(len(rset.rows), 1, rset.rows)
       
   282         self.execute("SET P travaille S WHERE P nom 'bidule', S nom 'chouette'")
       
   283         rset = self.execute('Personne X WHERE NOT X travaille S')
       
   284         self.assertEquals(len(rset.rows), 0, rset.rows)
       
   285         
       
   286     def test_select_is_in(self):
       
   287         self.execute("INSERT Personne X: X nom 'bidule'")
       
   288         self.execute("INSERT Societe X: X nom 'chouette'")
       
   289         self.assertEquals(len(self.execute("Any X WHERE X is IN (Personne, Societe)")),
       
   290                           2)
       
   291         
       
   292     def test_select_not_rel(self):
       
   293         self.execute("INSERT Personne X: X nom 'bidule'")
       
   294         self.execute("INSERT Societe X: X nom 'chouette'")
       
   295         self.execute("INSERT Personne X: X nom 'autre'")
       
   296         self.execute("SET P travaille S WHERE P nom 'bidule', S nom 'chouette'")
       
   297         rset = self.execute('Personne X WHERE NOT X travaille S')
       
   298         self.assertEquals(len(rset.rows), 1, rset.rows)
       
   299         rset = self.execute('Personne X WHERE NOT X travaille S, S nom "chouette"')
       
   300         self.assertEquals(len(rset.rows), 1, rset.rows)
       
   301         
       
   302     def test_select_nonregr_inlined(self):
       
   303         self.execute("INSERT Note X: X para 'bidule'")
       
   304         self.execute("INSERT Personne X: X nom 'chouette'")
       
   305         self.execute("INSERT Personne X: X nom 'autre'")
       
   306         self.execute("SET X ecrit_par P WHERE X para 'bidule', P nom 'chouette'")
       
   307         rset = self.execute('Any U,T ORDERBY T DESC WHERE U is EUser, '
       
   308                             'N ecrit_par U, N type T')#, {'x': self.ueid})
       
   309         self.assertEquals(len(rset.rows), 0)
       
   310         
       
   311     def test_select_nonregr_edition_not(self):
       
   312         groupeids = set((1, 2, 3))
       
   313         groupreadperms = set(r[0] for r in self.execute('Any Y WHERE X name "EGroup", Y eid IN(1, 2, 3), X read_permission Y'))
       
   314         rset = self.execute('DISTINCT Any Y WHERE X is EEType, X name "EGroup", Y eid IN(1, 2, 3), NOT X read_permission Y')
       
   315         self.assertEquals(sorted(r[0] for r in rset.rows), sorted(groupeids - groupreadperms))
       
   316         rset = self.execute('DISTINCT Any Y WHERE X name "EGroup", Y eid IN(1, 2, 3), NOT X read_permission Y')
       
   317         self.assertEquals(sorted(r[0] for r in rset.rows), sorted(groupeids - groupreadperms))
       
   318                      
       
   319     def test_select_outer_join(self):
       
   320         peid1 = self.execute("INSERT Personne X: X nom 'bidule'")[0][0]
       
   321         peid2 = self.execute("INSERT Personne X: X nom 'autre'")[0][0]
       
   322         seid1 = self.execute("INSERT Societe X: X nom 'chouette'")[0][0]
       
   323         seid2 = self.execute("INSERT Societe X: X nom 'chouetos'")[0][0]
       
   324         rset = self.execute('Any X,S ORDERBY X WHERE X travaille S?')
       
   325         self.assertEquals(rset.rows, [[peid1, None], [peid2, None]])
       
   326         self.execute("SET P travaille S WHERE P nom 'bidule', S nom 'chouette'")
       
   327         rset = self.execute('Any X,S ORDERBY X WHERE X travaille S?')
       
   328         self.assertEquals(rset.rows, [[peid1, seid1], [peid2, None]])
       
   329         rset = self.execute('Any S,X ORDERBY S WHERE X? travaille S')
       
   330         self.assertEquals(rset.rows, [[seid1, peid1], [seid2, None]])
       
   331         
       
   332     def test_select_outer_join_optimized(self):
       
   333         peid1 = self.execute("INSERT Personne X: X nom 'bidule'")[0][0]
       
   334         rset = self.execute('Any X WHERE X eid %(x)s, P? connait X', {'x':peid1}, 'x')
       
   335         self.assertEquals(rset.rows, [[peid1]])
       
   336         rset = self.execute('Any X WHERE X eid %(x)s, X require_permission P?', {'x':peid1}, 'x')
       
   337         self.assertEquals(rset.rows, [[peid1]])
       
   338 
       
   339     def test_select_left_outer_join(self):
       
   340         ueid = self.execute("INSERT EUser X: X login 'bob', X upassword 'toto', X in_group G "
       
   341                             "WHERE G name 'users'")[0][0]
       
   342         self.commit()
       
   343         try:
       
   344             rset = self.execute('Any FS,TS,C,D,U ORDERBY D DESC '
       
   345                                 'WHERE WF wf_info_for X,'
       
   346                                 'WF from_state FS?, WF to_state TS, WF comment C,'
       
   347                                 'WF creation_date D, WF owned_by U, X eid %(x)s',
       
   348                                 {'x': ueid}, 'x')
       
   349             self.assertEquals(len(rset), 1)
       
   350             self.execute('SET X in_state S WHERE X eid %(x)s, S name "deactivated"',
       
   351                          {'x': ueid}, 'x')
       
   352             rset = self.execute('Any FS,TS,C,D,U ORDERBY D DESC '
       
   353                                 'WHERE WF wf_info_for X,'
       
   354                                 'WF from_state FS?, WF to_state TS, WF comment C,'
       
   355                                 'WF creation_date D, WF owned_by U, X eid %(x)s',
       
   356                                 {'x': ueid}, 'x')
       
   357             self.assertEquals(len(rset), 2)
       
   358         finally:
       
   359             self.execute('DELETE EUser X WHERE X eid %s' % ueid)
       
   360             self.commit()
       
   361 
       
   362     def test_select_ambigous_outer_join(self):
       
   363         teid = self.execute("INSERT Tag X: X name 'tag'")[0][0]
       
   364         self.execute("INSERT Tag X: X name 'tagbis'")[0][0]
       
   365         geid = self.execute("EGroup G WHERE G name 'users'")[0][0]
       
   366         self.execute("SET X tags Y WHERE X eid %(t)s, Y eid %(g)s",
       
   367                      {'g': geid, 't': teid}, 'g')
       
   368         rset = self.execute("Any GN,TN ORDERBY GN WHERE T? tags G, T name TN, G name GN")
       
   369         self.failUnless(['users', 'tag'] in rset.rows)
       
   370         self.failUnless(['activated', None] in rset.rows)
       
   371         rset = self.execute("Any GN,TN ORDERBY GN WHERE T tags G?, T name TN, G name GN")
       
   372         self.assertEquals(rset.rows, [[None, 'tagbis'], ['users', 'tag']])            
       
   373         
       
   374     def test_select_not_inline_rel(self):
       
   375         self.execute("INSERT Personne X: X nom 'bidule'")
       
   376         self.execute("INSERT Note X: X type 'a'")
       
   377         self.execute("INSERT Note X: X type 'b'")
       
   378         self.execute("SET X ecrit_par Y WHERE X type 'a', Y nom 'bidule'")
       
   379         rset = self.execute('Note X WHERE NOT X ecrit_par P')
       
   380         self.assertEquals(len(rset.rows), 1, rset.rows)
       
   381         
       
   382     def test_select_not_unlinked_multiple_solutions(self):
       
   383         self.execute("INSERT Personne X: X nom 'bidule'")
       
   384         self.execute("INSERT Note X: X type 'a'")
       
   385         self.execute("INSERT Note X: X type 'b'")
       
   386         self.execute("SET Y evaluee X WHERE X type 'a', Y nom 'bidule'")
       
   387         rset = self.execute('Note X WHERE NOT Y evaluee X')
       
   388         self.assertEquals(len(rset.rows), 1, rset.rows)
       
   389 
       
   390     def test_select_aggregat_count(self):
       
   391         rset = self.execute('Any COUNT(X)')
       
   392         self.assertEquals(len(rset.rows), 1)
       
   393         self.assertEquals(len(rset.rows[0]), 1)
       
   394         self.assertEquals(rset.description, [('Int',)])
       
   395         
       
   396     def test_select_aggregat_sum(self):
       
   397         rset = self.execute('Any SUM(O) WHERE X ordernum O')
       
   398         self.assertEquals(len(rset.rows), 1)
       
   399         self.assertEquals(len(rset.rows[0]), 1)
       
   400         self.assertEquals(rset.description, [('Int',)])
       
   401         
       
   402     def test_select_aggregat_min(self):
       
   403         rset = self.execute('Any MIN(X) WHERE X is Personne')
       
   404         self.assertEquals(len(rset.rows), 1)
       
   405         self.assertEquals(len(rset.rows[0]), 1)
       
   406         self.assertEquals(rset.description, [('Personne',)])
       
   407         rset = self.execute('Any MIN(O) WHERE X ordernum O')
       
   408         self.assertEquals(len(rset.rows), 1)
       
   409         self.assertEquals(len(rset.rows[0]), 1)
       
   410         self.assertEquals(rset.description, [('Int',)])
       
   411         
       
   412     def test_select_aggregat_max(self):
       
   413         rset = self.execute('Any MAX(X) WHERE X is Personne')
       
   414         self.assertEquals(len(rset.rows), 1)
       
   415         self.assertEquals(len(rset.rows[0]), 1)
       
   416         self.assertEquals(rset.description, [('Personne',)])
       
   417         rset = self.execute('Any MAX(O) WHERE X ordernum O')
       
   418         self.assertEquals(len(rset.rows), 1)
       
   419         self.assertEquals(len(rset.rows[0]), 1)
       
   420         self.assertEquals(rset.description, [('Int',)])
       
   421 
       
   422     def test_select_custom_aggregat_concat_string(self):
       
   423         rset = self.execute('Any CONCAT_STRINGS(N) WHERE X is EGroup, X name N')
       
   424         self.failUnless(rset)
       
   425         self.failUnlessEqual(sorted(rset[0][0].split(', ')), ['guests', 'managers',
       
   426                                                              'owners', 'users'])
       
   427 
       
   428     def test_select_custom_regproc_limit_size(self):
       
   429         rset = self.execute('Any TEXT_LIMIT_SIZE(N, 3) WHERE X is EGroup, X name N, X name "managers"')
       
   430         self.failUnless(rset)
       
   431         self.failUnlessEqual(rset[0][0], 'man...')
       
   432         self.execute("INSERT Basket X: X name 'bidule', X description '<b>hop hop</b>', X description_format 'text/html'")
       
   433         rset = self.execute('Any LIMIT_SIZE(D, DF, 3) WHERE X is Basket, X description D, X description_format DF')
       
   434         self.failUnless(rset)
       
   435         self.failUnlessEqual(rset[0][0], 'hop...')
       
   436 
       
   437     def test_select_regproc_orderby(self):
       
   438         rset = self.execute('DISTINCT Any X,N ORDERBY GROUP_SORT_VALUE(N) WHERE X is EGroup, X name N, X name "managers"')
       
   439         self.failUnlessEqual(len(rset), 1)
       
   440         self.failUnlessEqual(rset[0][1], 'managers')
       
   441         rset = self.execute('Any X,N ORDERBY GROUP_SORT_VALUE(N) WHERE X is EGroup, X name N, NOT U in_group X, U login "admin"')
       
   442         self.failUnlessEqual(len(rset), 3)
       
   443         self.failUnlessEqual(rset[0][1], 'owners')
       
   444         
       
   445     def test_select_aggregat_sort(self):
       
   446         rset = self.execute('Any G, COUNT(U) GROUPBY G ORDERBY 2 WHERE U in_group G')
       
   447         self.assertEquals(len(rset.rows), 2)
       
   448         self.assertEquals(len(rset.rows[0]), 2)
       
   449         self.assertEquals(rset.description[0], ('EGroup', 'Int',))
       
   450 
       
   451     def test_select_aggregat_having(self):
       
   452         rset = self.execute('Any N,COUNT(RDEF) GROUPBY N ORDERBY 2,N '
       
   453                             'WHERE RT name N, RDEF relation_type RT '
       
   454                             'HAVING COUNT(RDEF) > 10')
       
   455         self.assertListEquals(rset.rows,
       
   456                               [[u'description', 11], ['in_basket', 11],
       
   457                                [u'name', 12], [u'created_by', 32],
       
   458                                [u'creation_date', 32], [u'is', 32], [u'is_instance_of', 32],
       
   459                                [u'modification_date', 32], [u'owned_by', 32]])
       
   460 
       
   461     def test_select_aggregat_having_dumb(self):
       
   462         # dumb but should not raise an error
       
   463         rset = self.execute('Any U,COUNT(X) GROUPBY U '
       
   464                             'WHERE U eid %(x)s, X owned_by U '
       
   465                             'HAVING COUNT(X) > 10', {'x': self.ueid})
       
   466         self.assertEquals(len(rset.rows), 1)
       
   467         self.assertEquals(rset.rows[0][0], self.ueid)
       
   468 
       
   469     def test_select_complex_sort(self):
       
   470         rset = self.execute('Any X ORDERBY X,D LIMIT 5 WHERE X creation_date D')
       
   471         result = rset.rows
       
   472         result.sort()
       
   473         self.assertEquals(tuplify(result), [(1,), (2,), (3,), (4,), (5,)])
       
   474         
       
   475     def test_select_upper(self):
       
   476         rset = self.execute('Any X, UPPER(L) ORDERBY L WHERE X is EUser, X login L')
       
   477         self.assertEquals(len(rset.rows), 2)
       
   478         self.assertEquals(rset.rows[0][1], 'ADMIN')
       
   479         self.assertEquals(rset.description[0], ('EUser', 'String',))
       
   480         self.assertEquals(rset.rows[1][1], 'ANON')
       
   481         self.assertEquals(rset.description[1], ('EUser', 'String',))
       
   482         eid = rset.rows[0][0]
       
   483         rset = self.execute('Any UPPER(L) WHERE X eid %s, X login L'%eid)
       
   484         self.assertEquals(rset.rows[0][0], 'ADMIN')
       
   485         self.assertEquals(rset.description, [('String',)])
       
   486 
       
   487 ##     def test_select_simplified(self):
       
   488 ##         ueid = self.session.user.eid
       
   489 ##         rset = self.execute('Any L WHERE %s login L'%ueid)
       
   490 ##         self.assertEquals(rset.rows[0][0], 'admin')
       
   491 ##         rset = self.execute('Any L WHERE %(x)s login L', {'x':ueid})
       
   492 ##         self.assertEquals(rset.rows[0][0], 'admin')
       
   493         
       
   494     def test_select_searchable_text_1(self):
       
   495         rset = self.execute(u"INSERT Personne X: X nom 'bidüle'")
       
   496         rset = self.execute(u"INSERT Societe X: X nom 'bidüle'")
       
   497         rset = self.execute("INSERT Societe X: X nom 'chouette'")
       
   498         self.commit()
       
   499         rset = self.execute('Any X where X has_text %(text)s', {'text': u'bidüle'})
       
   500         self.assertEquals(len(rset.rows), 2, rset.rows)
       
   501         rset = self.execute(u'Any N where N has_text "bidüle"')
       
   502         self.assertEquals(len(rset.rows), 2, rset.rows)
       
   503         biduleeids = [r[0] for r in rset.rows]
       
   504         rset = self.execute(u'Any N where NOT N has_text "bidüle"')
       
   505         self.failIf([r[0] for r in rset.rows if r[0] in biduleeids])
       
   506         # duh?
       
   507         rset = self.execute('Any X WHERE X has_text %(text)s', {'text': u'ça'})
       
   508         
       
   509     def test_select_searchable_text_2(self):
       
   510         rset = self.execute("INSERT Personne X: X nom 'bidule'")
       
   511         rset = self.execute("INSERT Personne X: X nom 'chouette'")
       
   512         rset = self.execute("INSERT Societe X: X nom 'bidule'")
       
   513         self.commit()
       
   514         rset = self.execute('Personne N where N has_text "bidule"')
       
   515         self.assertEquals(len(rset.rows), 1, rset.rows)
       
   516         
       
   517     def test_select_searchable_text_3(self):
       
   518         rset = self.execute("INSERT Personne X: X nom 'bidule', X sexe 'M'")
       
   519         rset = self.execute("INSERT Personne X: X nom 'bidule', X sexe 'F'")
       
   520         rset = self.execute("INSERT Societe X: X nom 'bidule'")
       
   521         self.commit()
       
   522         rset = self.execute('Any X where X has_text "bidule" and X sexe "M"')
       
   523         self.assertEquals(len(rset.rows), 1, rset.rows)
       
   524         
       
   525     def test_select_multiple_searchable_text(self):
       
   526         self.execute(u"INSERT Personne X: X nom 'bidüle'")
       
   527         self.execute("INSERT Societe X: X nom 'chouette', S travaille X")
       
   528         self.execute(u"INSERT Personne X: X nom 'bidüle'")
       
   529         self.commit()
       
   530         rset = self.execute('Personne X WHERE X has_text %(text)s, X travaille S, S has_text %(text2)s',
       
   531                             {'text': u'bidüle',
       
   532                              'text2': u'chouette',}
       
   533                             )
       
   534         self.assertEquals(len(rset.rows), 1, rset.rows)
       
   535         
       
   536     def test_select_no_descr(self):
       
   537         rset = self.execute('Any X WHERE X is EGroup', build_descr=0)
       
   538         rset.rows.sort()
       
   539         self.assertEquals(tuplify(rset.rows), [(1,), (2,), (3,), (4,)])
       
   540         self.assertEquals(rset.description, ())
       
   541 
       
   542     def test_select_limit_offset(self):
       
   543         rset = self.execute('EGroup X ORDERBY N LIMIT 2 WHERE X name N')
       
   544         self.assertEquals(tuplify(rset.rows), [(3,), (1,)])
       
   545         self.assertEquals(rset.description, [('EGroup',), ('EGroup',)])
       
   546         rset = self.execute('EGroup X ORDERBY N LIMIT 2 OFFSET 2 WHERE X name N')
       
   547         self.assertEquals(tuplify(rset.rows), [(4,), (2,)])
       
   548         
       
   549     def test_select_symetric(self):
       
   550         self.execute("INSERT Personne X: X nom 'machin'")
       
   551         self.execute("INSERT Personne X: X nom 'bidule'")
       
   552         self.execute("INSERT Personne X: X nom 'chouette'")
       
   553         self.execute("INSERT Personne X: X nom 'trucmuche'")
       
   554         self.execute("SET X connait Y WHERE X nom 'chouette', Y nom 'bidule'")
       
   555         self.execute("SET X connait Y WHERE X nom 'machin', Y nom 'chouette'")
       
   556         rset = self.execute('Any P where P connait P2')
       
   557         self.assertEquals(len(rset.rows), 3, rset.rows)
       
   558         rset = self.execute('Any P where NOT P connait P2')
       
   559         self.assertEquals(len(rset.rows), 1, rset.rows) # trucmuche
       
   560         rset = self.execute('Any P where P connait P2, P2 nom "bidule"')
       
   561         self.assertEquals(len(rset.rows), 1, rset.rows)
       
   562         rset = self.execute('Any P where P2 connait P, P2 nom "bidule"')
       
   563         self.assertEquals(len(rset.rows), 1, rset.rows)
       
   564         rset = self.execute('Any P where P connait P2, P2 nom "chouette"')
       
   565         self.assertEquals(len(rset.rows), 2, rset.rows)
       
   566         rset = self.execute('Any P where P2 connait P, P2 nom "chouette"')
       
   567         self.assertEquals(len(rset.rows), 2, rset.rows)
       
   568         
       
   569     def test_select_inline(self):
       
   570         self.execute("INSERT Personne X: X nom 'bidule'")
       
   571         self.execute("INSERT Note X: X type 'a'")
       
   572         self.execute("SET X ecrit_par Y WHERE X type 'a', Y nom 'bidule'")
       
   573         rset = self.execute('Any N where N ecrit_par X, X nom "bidule"')
       
   574         self.assertEquals(len(rset.rows), 1, rset.rows)
       
   575         
       
   576     def test_select_creation_date(self):
       
   577         self.execute("INSERT Personne X: X nom 'bidule'")
       
   578         rset = self.execute('Any D WHERE X nom "bidule", X creation_date D')
       
   579         self.assertEqual(len(rset.rows), 1)
       
   580 
       
   581     def test_select_or_relation(self):
       
   582         self.execute("INSERT Personne X: X nom 'bidule'")
       
   583         self.execute("INSERT Personne X: X nom 'chouette'")
       
   584         self.execute("INSERT Societe X: X nom 'logilab'")
       
   585         self.execute("INSERT Societe X: X nom 'caesium'")
       
   586         self.execute("SET P travaille S WHERE P nom 'bidule', S nom 'logilab'")
       
   587         rset = self.execute('DISTINCT Any P WHERE P travaille S1 OR P travaille S2, S1 nom "logilab", S2 nom "caesium"')
       
   588         self.assertEqual(len(rset.rows), 1)
       
   589         self.execute("SET P travaille S WHERE P nom 'chouette', S nom 'caesium'")
       
   590         rset = self.execute('DISTINCT Any P WHERE P travaille S1 OR P travaille S2, S1 nom "logilab", S2 nom "caesium"')
       
   591         self.assertEqual(len(rset.rows), 2)
       
   592         
       
   593     def test_select_or_sym_relation(self):
       
   594         self.execute("INSERT Personne X: X nom 'bidule'")
       
   595         self.execute("INSERT Personne X: X nom 'chouette'")
       
   596         self.execute("INSERT Personne X: X nom 'truc'")
       
   597         self.execute("SET P connait S WHERE P nom 'bidule', S nom 'chouette'")
       
   598         rset = self.execute('DISTINCT Any P WHERE S connait P, S nom "chouette"')
       
   599         self.assertEqual(len(rset.rows), 1, rset.rows)
       
   600         rset = self.execute('DISTINCT Any P WHERE P connait S or S connait P, S nom "chouette"')
       
   601         self.assertEqual(len(rset.rows), 1, rset.rows)
       
   602         self.execute("SET P connait S WHERE P nom 'chouette', S nom 'truc'")
       
   603         rset = self.execute('DISTINCT Any P WHERE S connait P, S nom "chouette"')
       
   604         self.assertEqual(len(rset.rows), 2, rset.rows)
       
   605         rset = self.execute('DISTINCT Any P WHERE P connait S OR S connait P, S nom "chouette"')
       
   606         self.assertEqual(len(rset.rows), 2, rset.rows)
       
   607             
       
   608     def test_select_follow_relation(self):
       
   609         self.execute("INSERT Affaire X: X sujet 'cool'")
       
   610         self.execute("INSERT Societe X: X nom 'chouette'")
       
   611         self.execute("SET A concerne S WHERE A is Affaire, S is Societe")
       
   612         self.execute("INSERT Note X: X para 'truc'")
       
   613         self.execute("SET S evaluee N WHERE S is Societe, N is Note")
       
   614         self.execute("INSERT Societe X: X nom 'bidule'")
       
   615         self.execute("INSERT Note X: X para 'troc'")
       
   616         self.execute("SET S evaluee N WHERE S nom 'bidule', N para 'troc'")
       
   617         rset = self.execute('DISTINCT Any A,N WHERE A concerne S, S evaluee N')
       
   618         self.assertEqual(len(rset.rows), 1, rset.rows)
       
   619 
       
   620     def test_select_ordered_distinct_1(self):
       
   621         self.execute("INSERT Affaire X: X sujet 'cool', X ref '1'")
       
   622         self.execute("INSERT Affaire X: X sujet 'cool', X ref '2'")
       
   623         rset = self.execute('DISTINCT Any S ORDERBY R WHERE A is Affaire, A sujet S, A ref R')
       
   624         self.assertEqual(rset.rows, [['cool']])
       
   625 
       
   626     def test_select_ordered_distinct_2(self):
       
   627         self.execute("INSERT Affaire X: X sujet 'minor'")
       
   628         self.execute("INSERT Affaire X: X sujet 'important'")
       
   629         self.execute("INSERT Affaire X: X sujet 'normal'")
       
   630         self.execute("INSERT Affaire X: X sujet 'zou'")
       
   631         self.execute("INSERT Affaire X: X sujet 'abcd'")
       
   632         rset = self.execute('DISTINCT Any S ORDERBY S WHERE A is Affaire, A sujet S')
       
   633         self.assertEqual(rset.rows, [['abcd'], ['important'], ['minor'], ['normal'], ['zou']])
       
   634         
       
   635     def test_select_ordered_distinct_3(self):
       
   636         rset = self.execute('DISTINCT Any N ORDERBY GROUP_SORT_VALUE(N) WHERE X is EGroup, X name N')
       
   637         self.assertEqual(rset.rows, [['owners'], ['guests'], ['users'], ['managers']])
       
   638 
       
   639     def test_select_or_value(self):
       
   640         rset = self.execute('Any U WHERE U in_group G, G name "owners" OR G name "users"')
       
   641         self.assertEqual(len(rset.rows), 0)
       
   642         rset = self.execute('Any U WHERE U in_group G, G name "guests" OR G name "managers"')
       
   643         self.assertEqual(len(rset.rows), 2)
       
   644 
       
   645     def test_select_explicit_eid(self):
       
   646         rset = self.execute('Any X,E WHERE X owned_by U, X eid E, U eid %(u)s', {'u': self.session.user.eid})
       
   647         self.failUnless(rset)
       
   648         self.assertEquals(rset.description[0][1], 'Int')
       
   649         
       
   650 #     def test_select_rewritten_optional(self):
       
   651 #         eid = self.execute("INSERT Affaire X: X sujet 'cool'")[0][0]
       
   652 #         rset = self.execute('Any X WHERE X eid %(x)s, EXISTS(X owned_by U) OR EXISTS(X concerne S?, S owned_by U)',
       
   653 #                             {'x': eid}, 'x')
       
   654 #         self.assertEquals(rset.rows, [[eid]])
       
   655         
       
   656     def test_today_bug(self):
       
   657         self.execute("INSERT Tag X: X name 'bidule', X creation_date TODAY")
       
   658         self.execute("INSERT Tag Y: Y name 'toto'")
       
   659         rset = self.execute("Any D WHERE X name in ('bidule', 'toto') , X creation_date D")
       
   660         self.assert_(isinstance(rset.rows[0][0], DateTimeType), rset.rows)
       
   661         rset = self.execute('Tag X WHERE X creation_date TODAY')
       
   662         self.assertEqual(len(rset.rows), 2)
       
   663         rset = self.execute('Any MAX(D) WHERE X is Tag, X creation_date D')
       
   664         self.failUnless(isinstance(rset[0][0], DateTimeType), type(rset[0][0]))
       
   665 
       
   666     def test_today(self):
       
   667         self.execute("INSERT Tag X: X name 'bidule', X creation_date TODAY")
       
   668         self.execute("INSERT Tag Y: Y name 'toto'")
       
   669         rset = self.execute('Tag X WHERE X creation_date TODAY')
       
   670         self.assertEqual(len(rset.rows), 2)
       
   671 
       
   672     def test_select_boolean(self):
       
   673         rset = self.execute('Any N WHERE X is EEType, X name N, X final %(val)s',
       
   674                             {'val': True})
       
   675         self.assertEquals(sorted(r[0] for r in rset.rows), ['Boolean', 'Bytes',
       
   676                                                             'Date', 'Datetime',
       
   677                                                             'Decimal', 'Float',
       
   678                                                             'Int', 'Interval',
       
   679                                                             'Password', 'String',
       
   680                                                             'Time'])
       
   681         rset = self.execute('Any N WHERE X is EEType, X name N, X final TRUE')
       
   682         self.assertEquals(sorted(r[0] for r in rset.rows), ['Boolean', 'Bytes',
       
   683                                                             'Date', 'Datetime',
       
   684                                                             'Decimal', 'Float',
       
   685                                                             'Int', 'Interval',
       
   686                                                             'Password', 'String',
       
   687                                                             'Time'])
       
   688         
       
   689     def test_select_constant(self):
       
   690         rset = self.execute('Any X, "toto" ORDERBY X WHERE X is EGroup')
       
   691         self.assertEquals(rset.rows,
       
   692                           map(list, zip((1,2,3,4), ('toto','toto','toto','toto',))))
       
   693         self.assertIsInstance(rset[0][1], unicode)
       
   694         self.assertEquals(rset.description,
       
   695                           zip(('EGroup', 'EGroup', 'EGroup', 'EGroup'),
       
   696                               ('String', 'String', 'String', 'String',)))
       
   697         rset = self.execute('Any X, %(value)s ORDERBY X WHERE X is EGroup', {'value': 'toto'})
       
   698         self.assertEquals(rset.rows,
       
   699                           map(list, zip((1,2,3,4), ('toto','toto','toto','toto',))))
       
   700         self.assertIsInstance(rset[0][1], unicode)
       
   701         self.assertEquals(rset.description,
       
   702                           zip(('EGroup', 'EGroup', 'EGroup', 'EGroup'),
       
   703                               ('String', 'String', 'String', 'String',)))
       
   704         rset = self.execute('Any X,GN WHERE X is EUser, G is EGroup, X login "syt", X in_group G, G name GN')
       
   705 
       
   706     def test_select_union(self):
       
   707         rset = self.execute('Any X,N ORDERBY N WITH X,N BEING '
       
   708                             '((Any X,N WHERE X name N, X transition_of E, E name %(name)s)'
       
   709                             ' UNION '
       
   710                             '(Any X,N WHERE X name N, X state_of E, E name %(name)s))',
       
   711                             {'name': 'EUser'})
       
   712         self.assertEquals([x[1] for x in rset.rows],
       
   713                           ['activate', 'activated', 'deactivate', 'deactivated'])
       
   714         self.assertEquals(rset.description,
       
   715                           [('Transition', 'String'), ('State', 'String'),
       
   716                            ('Transition', 'String'), ('State', 'String')])
       
   717         
       
   718     def test_select_union_aggregat(self):
       
   719         # meaningless, the goal in to have group by done on different attribute
       
   720         # for each sub-query
       
   721         self.execute('(Any N,COUNT(X) GROUPBY N WHERE X name N, X is State)'
       
   722                      ' UNION '
       
   723                      '(Any N,COUNT(X) GROUPBY N ORDERBY 2 WHERE X login N)')
       
   724         
       
   725     def test_select_union_aggregat_independant_group(self):
       
   726         self.execute('INSERT State X: X name "hop"')
       
   727         self.execute('INSERT State X: X name "hop"')
       
   728         self.execute('INSERT Transition X: X name "hop"')
       
   729         self.execute('INSERT Transition X: X name "hop"')
       
   730         rset = self.execute('Any N,NX ORDERBY 2 WITH N,NX BEING '
       
   731                             '((Any N,COUNT(X) GROUPBY N WHERE X name N, X is State HAVING COUNT(X)>1)'
       
   732                             ' UNION '
       
   733                             '(Any N,COUNT(X) GROUPBY N WHERE X name N, X is Transition HAVING COUNT(X)>1))')
       
   734         self.assertEquals(rset.rows, [[u'hop', 2], [u'hop', 2]])
       
   735         
       
   736     def test_select_union_selection_with_diff_variables(self):
       
   737         rset = self.execute('(Any N WHERE X name N, X is State)'
       
   738                             ' UNION '
       
   739                             '(Any NN WHERE XX name NN, XX is Transition)')
       
   740         self.assertEquals(sorted(r[0] for r in rset.rows),
       
   741                           ['abort', 'activate', 'activated', 'ben non',
       
   742                            'deactivate', 'deactivated', 'done', 'en cours',
       
   743                            'end', 'finie', 'markasdone', 'pitetre', 'redoit',
       
   744                            'start', 'todo'])
       
   745         
       
   746     def test_exists(self):
       
   747         geid = self.execute("INSERT EGroup X: X name 'lulufanclub'")[0][0]
       
   748         self.execute("SET U in_group G WHERE G name 'lulufanclub'")
       
   749         peid = self.execute("INSERT Personne X: X prenom 'lulu', X nom 'petit'")[0][0]
       
   750         rset = self.execute("Any X WHERE X prenom 'lulu',"
       
   751                             "EXISTS (U in_group G, G name 'lulufanclub' OR G name 'managers');")
       
   752         self.assertEquals(rset.rows, [[peid]])
       
   753 
       
   754     def test_identity(self):
       
   755         eid = self.execute('Any X WHERE X identity Y, Y eid 1')[0][0]
       
   756         self.assertEquals(eid, 1)
       
   757         eid = self.execute('Any X WHERE Y identity X, Y eid 1')[0][0]
       
   758         self.assertEquals(eid, 1)
       
   759         login = self.execute('Any L WHERE X login "admin", X identity Y, Y login L')[0][0]
       
   760         self.assertEquals(login, 'admin')
       
   761 
       
   762     def test_select_date_mathexp(self):
       
   763         rset = self.execute('Any X, TODAY - CD WHERE X is EUser, X creation_date CD')
       
   764         self.failUnless(rset)
       
   765         self.failUnlessEqual(rset.description[0][1], 'Interval')
       
   766         eid, = self.execute("INSERT Personne X: X nom 'bidule'")[0]
       
   767         rset = self.execute('Any X, NOW - CD WHERE X is Personne, X creation_date CD')
       
   768         self.failUnlessEqual(rset.description[0][1], 'Interval')
       
   769         # sqlite bug
       
   770         #from mx.DateTime import DateTimeDeltaType
       
   771         #self.assertIsInstance(rset[0][1], DateTimeDeltaType) 
       
   772         #self.failUnless(rset[0][1].seconds > 0)
       
   773 
       
   774     def test_select_subquery_aggregat(self):
       
   775         # percent users by groups
       
   776         self.execute('SET X in_group G WHERE G name "users"')
       
   777         rset = self.execute('Any GN, COUNT(X)*100/T GROUPBY GN ORDERBY 2,1'
       
   778                             ' WHERE G name GN, X in_group G'
       
   779                             ' WITH T BEING (Any COUNT(U) WHERE U is EUser)')
       
   780         self.assertEquals(rset.rows, [[u'guests', 50], [u'managers', 50], [u'users', 100]])
       
   781         self.assertEquals(rset.description, [('String', 'Int'), ('String', 'Int'), ('String', 'Int')])
       
   782 
       
   783     def test_select_subquery_const(self):
       
   784         rset = self.execute('Any X WITH X BEING ((Any NULL) UNION (Any "toto"))')
       
   785         self.assertEquals(rset.rows, [[None], ['toto']])
       
   786         self.assertEquals(rset.description, [(None,), ('String',)])
       
   787                           
       
   788     # insertion queries tests #################################################
       
   789     
       
   790     def test_insert_is(self):
       
   791         eid, = self.execute("INSERT Personne X: X nom 'bidule'")[0]
       
   792         etype, = self.execute("Any TN WHERE X is T, X eid %s, T name TN" % eid)[0]
       
   793         self.assertEquals(etype, 'Personne')
       
   794         self.execute("INSERT Personne X: X nom 'managers'")
       
   795     
       
   796     def test_insert_1(self):
       
   797         rset = self.execute("INSERT Personne X: X nom 'bidule'")
       
   798         self.assertEquals(len(rset.rows), 1)
       
   799         self.assertEquals(rset.description, [('Personne',)])
       
   800         rset = self.execute('Personne X WHERE X nom "bidule"')
       
   801         self.assert_(rset.rows)
       
   802         self.assertEquals(rset.description, [('Personne',)])
       
   803 
       
   804     def test_insert_1_multiple(self):
       
   805         self.execute("INSERT Personne X: X nom 'bidule'")
       
   806         self.execute("INSERT Personne X: X nom 'chouette'")
       
   807         rset = self.execute("INSERT Societe Y: Y nom N, P travaille Y WHERE P nom N")
       
   808         self.assertEquals(len(rset.rows), 2)
       
   809         self.assertEquals(rset.description, [('Societe',), ('Societe',)])
       
   810 
       
   811     def test_insert_2(self):
       
   812         rset = self.execute("INSERT Personne X, Personne Y: X nom 'bidule', Y nom 'tutu'")
       
   813         self.assertEquals(rset.description, [('Personne', 'Personne')])
       
   814         rset = self.execute('Personne X WHERE X nom "bidule" or X nom "tutu"')
       
   815         self.assert_(rset.rows)
       
   816         self.assertEquals(rset.description, [('Personne',), ('Personne',)])
       
   817 
       
   818     def test_insert_3(self):
       
   819         self.execute("INSERT Personne X: X nom Y WHERE U login 'admin', U login Y")
       
   820         rset = self.execute('Personne X WHERE X nom "admin"')
       
   821         self.assert_(rset.rows)
       
   822         self.assertEquals(rset.description, [('Personne',)])        
       
   823 
       
   824     def test_insert_4(self):
       
   825         self.execute("INSERT Societe Y: Y nom 'toto'")
       
   826         self.execute("INSERT Personne X: X nom 'bidule', X travaille Y WHERE Y nom 'toto'")
       
   827         rset = self.execute('Any X, Y WHERE X nom "bidule", Y nom "toto", X travaille Y')
       
   828         self.assert_(rset.rows)
       
   829         self.assertEquals(rset.description, [('Personne', 'Societe',)])
       
   830         
       
   831     def test_insert_4bis(self):
       
   832         peid = self.execute("INSERT Personne X: X nom 'bidule'")[0][0]
       
   833         seid = self.execute("INSERT Societe Y: Y nom 'toto', X travaille Y WHERE X eid %(x)s",
       
   834                              {'x': str(peid)})[0][0]
       
   835         self.assertEqual(len(self.execute('Any X, Y WHERE X travaille Y')), 1)
       
   836         self.execute("INSERT Personne X: X nom 'chouette', X travaille Y WHERE Y eid %(x)s",
       
   837                       {'x': str(seid)})
       
   838         self.assertEqual(len(self.execute('Any X, Y WHERE X travaille Y')), 2)
       
   839         
       
   840     def test_insert_4ter(self):
       
   841         peid = self.execute("INSERT Personne X: X nom 'bidule'")[0][0]
       
   842         seid = self.execute("INSERT Societe Y: Y nom 'toto', X travaille Y WHERE X eid %(x)s",
       
   843                              {'x': unicode(peid)})[0][0]
       
   844         self.assertEqual(len(self.execute('Any X, Y WHERE X travaille Y')), 1)
       
   845         self.execute("INSERT Personne X: X nom 'chouette', X travaille Y WHERE Y eid %(x)s",
       
   846                       {'x': unicode(seid)})
       
   847         self.assertEqual(len(self.execute('Any X, Y WHERE X travaille Y')), 2)
       
   848 
       
   849     def test_insert_5(self):
       
   850         self.execute("INSERT Personne X: X nom 'bidule'")
       
   851         self.execute("INSERT Societe Y: Y nom 'toto', X travaille Y WHERE X nom 'bidule'")
       
   852         rset = self.execute('Any X, Y WHERE X nom "bidule", Y nom "toto", X travaille Y')
       
   853         self.assert_(rset.rows)
       
   854         self.assertEquals(rset.description, [('Personne', 'Societe',)])
       
   855 
       
   856     def test_insert_6(self):
       
   857         self.execute("INSERT Personne X, Societe Y: X nom 'bidule', Y nom 'toto', X travaille Y")
       
   858         rset = self.execute('Any X, Y WHERE X nom "bidule", Y nom "toto", X travaille Y')
       
   859         self.assert_(rset.rows)
       
   860         self.assertEquals(rset.description, [('Personne', 'Societe',)])
       
   861 
       
   862     def test_insert_7(self):
       
   863         self.execute("INSERT Personne X, Societe Y: X nom N, Y nom 'toto', X travaille Y WHERE U login 'admin', U login N")
       
   864         rset = self.execute('Any X, Y WHERE X nom "admin", Y nom "toto", X travaille Y')
       
   865         self.assert_(rset.rows)
       
   866         self.assertEquals(rset.description, [('Personne', 'Societe',)])
       
   867 
       
   868     def test_insert_8(self):
       
   869         self.execute("INSERT Societe Y, Personne X: Y nom N, X nom 'toto', X travaille Y WHERE U login 'admin', U login N")
       
   870         rset = self.execute('Any X, Y WHERE X nom "toto", Y nom "admin", X travaille Y')
       
   871         self.assert_(rset.rows)
       
   872         self.assertEquals(rset.description, [('Personne', 'Societe',)])
       
   873 
       
   874     def test_insert_query_error(self):
       
   875         self.assertRaises(Exception,
       
   876                           self.execute,
       
   877                           "INSERT Personne X: X nom 'toto', X is Personne")
       
   878         self.assertRaises(Exception,
       
   879                           self.execute,
       
   880                           "INSERT Personne X: X nom 'toto', X is_instance_of Personne")
       
   881         self.assertRaises(QueryError,
       
   882                           self.execute,
       
   883                           "INSERT Personne X: X nom 'toto', X has_text 'tutu'")
       
   884 
       
   885         self.assertRaises(QueryError,
       
   886                           self.execute,
       
   887                           "INSERT EUser X: X login 'toto', X eid %s" % cnx.user(self.session).eid)
       
   888 
       
   889     def test_insertion_description_with_where(self):
       
   890         rset = self.execute('INSERT EUser E, EmailAddress EM: E login "X", E upassword "X", '
       
   891                             'E primary_email EM, EM address "X", E in_group G '
       
   892                             'WHERE G name "managers"')
       
   893         self.assertEquals(list(rset.description[0]), ['EUser', 'EmailAddress'])
       
   894     
       
   895     # deletion queries tests ##################################################
       
   896 
       
   897     def test_delete_1(self):
       
   898         self.execute("INSERT Personne Y: Y nom 'toto'")
       
   899         rset = self.execute('Personne X WHERE X nom "toto"')
       
   900         self.assertEqual(len(rset.rows), 1)
       
   901         self.execute("DELETE Personne Y WHERE Y nom 'toto'")
       
   902         rset = self.execute('Personne X WHERE X nom "toto"')
       
   903         self.assertEqual(len(rset.rows), 0)
       
   904         
       
   905     def test_delete_2(self):
       
   906         rset = self.execute("INSERT Personne X, Personne Y, Societe Z : X nom 'syt', Y nom 'adim', Z nom 'Logilab', X travaille Z, Y travaille Z")
       
   907         self.assertEquals(len(rset), 1)
       
   908         self.assertEquals(len(rset[0]), 3)
       
   909         self.assertEquals(rset.description[0], ('Personne', 'Personne', 'Societe'))
       
   910         self.assertEquals(self.execute('Any N WHERE X nom N, X eid %s'% rset[0][0])[0][0], 'syt')
       
   911         rset = self.execute('Personne X WHERE X travaille Y, Y nom "Logilab"')
       
   912         self.assertEqual(len(rset.rows), 2, rset.rows)
       
   913         self.execute("DELETE X travaille Y WHERE X is Personne, Y nom 'Logilabo'")
       
   914         rset = self.execute('Personne X WHERE X travaille Y, Y nom "Logilab"')
       
   915         self.assertEqual(len(rset.rows), 2, rset.rows)
       
   916         self.execute("DELETE X travaille Y WHERE X is Personne, Y nom 'Logilab'")
       
   917         rset = self.execute('Personne X WHERE X travaille Y, Y nom "Logilab"')
       
   918         self.assertEqual(len(rset.rows), 0, rset.rows)
       
   919 
       
   920     def test_delete_3(self):
       
   921         u, s = self._user_session(('users',))
       
   922         peid, = self.o.execute(s, "INSERT Personne P: P nom 'toto'")[0]
       
   923         seid, = self.o.execute(s, "INSERT Societe S: S nom 'logilab'")[0]
       
   924         self.o.execute(s, "SET P travaille S")
       
   925         rset = self.execute('Personne P WHERE P travaille S')
       
   926         self.assertEqual(len(rset.rows), 1)
       
   927         self.execute("DELETE X travaille Y WHERE X eid %s, Y eid %s" % (peid, seid))
       
   928         rset = self.execute('Personne P WHERE P travaille S')
       
   929         self.assertEqual(len(rset.rows), 0)
       
   930 
       
   931     def test_delete_symetric(self):
       
   932         teid1 = self.execute("INSERT Folder T: T name 'toto'")[0][0]
       
   933         teid2 = self.execute("INSERT Folder T: T name 'tutu'")[0][0]
       
   934         self.execute('SET X see_also Y WHERE X eid %s, Y eid %s' % (teid1, teid2))
       
   935         rset = self.execute('Any X,Y WHERE X see_also Y')
       
   936         self.assertEquals(len(rset) , 2, rset.rows)
       
   937         self.execute('DELETE X see_also Y WHERE X eid %s, Y eid %s' % (teid1, teid2))
       
   938         rset = self.execute('Any X,Y WHERE X see_also Y')
       
   939         self.assertEquals(len(rset) , 0)
       
   940         self.execute('SET X see_also Y WHERE X eid %s, Y eid %s' % (teid1, teid2))
       
   941         rset = self.execute('Any X,Y WHERE X see_also Y')
       
   942         self.assertEquals(len(rset) , 2)
       
   943         self.execute('DELETE X see_also Y WHERE X eid %s, Y eid %s' % (teid2, teid1))
       
   944         rset = self.execute('Any X,Y WHERE X see_also Y')
       
   945         self.assertEquals(len(rset) , 0)
       
   946 
       
   947     def test_nonregr_delete_cache(self):
       
   948         """test that relations are properly cleaned when an entity is deleted
       
   949         (using cachekey on sql generation returned always the same query for an eid,
       
   950         whatever the relation)
       
   951         """
       
   952         u, s = self._user_session(('users',))
       
   953         aeid, = self.o.execute(s, 'INSERT EmailAddress X: X address "toto@logilab.fr", X alias "hop"')[0]
       
   954         # XXX would be nice if the rql below was enough...
       
   955         #'INSERT Email X: X messageid "<1234>", X subject "test", X sender Y, X recipients Y'
       
   956         eeid, = self.o.execute(s, 'INSERT Email X: X messageid "<1234>", X subject "test", X sender Y, X recipients Y WHERE Y is EmailAddress')[0]
       
   957         self.o.execute(s, "DELETE Email X")
       
   958         sqlc = s.pool['system']
       
   959         sqlc.execute('SELECT * FROM recipients_relation')
       
   960         self.assertEquals(len(sqlc.fetchall()), 0)
       
   961         sqlc.execute('SELECT * FROM owned_by_relation WHERE eid_from=%s'%eeid)
       
   962         self.assertEquals(len(sqlc.fetchall()), 0)
       
   963             
       
   964     def test_nonregr_delete_cache2(self):
       
   965         eid = self.execute("INSERT Folder T: T name 'toto'")[0][0]
       
   966         self.commit()
       
   967         # fill the cache
       
   968         self.execute("Any X WHERE X eid %(x)s", {'x': eid}, 'x')
       
   969         self.execute("Any X WHERE X eid %s" %eid)
       
   970         self.execute("Folder X WHERE X eid %(x)s", {'x': eid}, 'x')
       
   971         self.execute("Folder X WHERE X eid %s" %eid)
       
   972         self.execute("DELETE Folder T WHERE T eid %s"%eid)
       
   973         self.commit()
       
   974         rset = self.execute("Any X WHERE X eid %(x)s", {'x': eid}, 'x')
       
   975         self.assertEquals(rset.rows, [])
       
   976         rset = self.execute("Any X WHERE X eid %s" %eid)
       
   977         self.assertEquals(rset.rows, [])
       
   978         rset = self.execute("Folder X WHERE X eid %(x)s", {'x': eid}, 'x')
       
   979         self.assertEquals(rset.rows, [])
       
   980         rset = self.execute("Folder X WHERE X eid %s" %eid)
       
   981         self.assertEquals(rset.rows, [])
       
   982         
       
   983     # update queries tests ####################################################
       
   984 
       
   985     def test_update_1(self):
       
   986         self.execute("INSERT Personne Y: Y nom 'toto'")
       
   987         rset = self.execute('Personne X WHERE X nom "toto"')
       
   988         self.assertEqual(len(rset.rows), 1)
       
   989         self.execute("SET X nom 'tutu', X prenom 'original' WHERE X is Personne, X nom 'toto'")
       
   990         rset = self.execute('Any Y, Z WHERE X is Personne, X nom Y, X prenom Z')
       
   991         self.assertEqual(tuplify(rset.rows), [('tutu', 'original')])
       
   992         
       
   993     def test_update_2(self):
       
   994         self.execute("INSERT Personne X, Societe Y: X nom 'bidule', Y nom 'toto'")
       
   995         #rset = self.execute('Any X, Y WHERE X nom "bidule", Y nom "toto"')
       
   996         #self.assertEqual(len(rset.rows), 1)
       
   997         #rset = self.execute('Any X, Y WHERE X travaille Y')
       
   998         #self.assertEqual(len(rset.rows), 0)
       
   999         self.execute("SET X travaille Y WHERE X nom 'bidule', Y nom 'toto'")
       
  1000         rset = self.execute('Any X, Y WHERE X travaille Y')
       
  1001         self.assertEqual(len(rset.rows), 1)
       
  1002         
       
  1003     def test_update_2bis(self):
       
  1004         rset = self.execute("INSERT Personne X, Societe Y: X nom 'bidule', Y nom 'toto'")
       
  1005         eid1, eid2 = rset[0][0], rset[0][1]
       
  1006         self.execute("SET X travaille Y WHERE X eid %(x)s, Y eid %(y)s",
       
  1007                       {'x': str(eid1), 'y': str(eid2)})
       
  1008         rset = self.execute('Any X, Y WHERE X travaille Y')
       
  1009         self.assertEqual(len(rset.rows), 1)
       
  1010         
       
  1011     def test_update_2ter(self):
       
  1012         rset = self.execute("INSERT Personne X, Societe Y: X nom 'bidule', Y nom 'toto'")
       
  1013         eid1, eid2 = rset[0][0], rset[0][1]
       
  1014         self.execute("SET X travaille Y WHERE X eid %(x)s, Y eid %(y)s",
       
  1015                       {'x': unicode(eid1), 'y': unicode(eid2)})
       
  1016         rset = self.execute('Any X, Y WHERE X travaille Y')
       
  1017         self.assertEqual(len(rset.rows), 1)
       
  1018         
       
  1019 ##     def test_update_4(self):
       
  1020 ##         self.execute("SET X know Y WHERE X ami Y")
       
  1021         
       
  1022     def test_update_multiple1(self):
       
  1023         peid1 = self.execute("INSERT Personne Y: Y nom 'tutu'")[0][0]
       
  1024         peid2 = self.execute("INSERT Personne Y: Y nom 'toto'")[0][0]
       
  1025         self.execute("SET X nom 'tutu', Y nom 'toto' WHERE X nom 'toto', Y nom 'tutu'")
       
  1026         self.assertEquals(self.execute('Any X WHERE X nom "toto"').rows, [[peid1]])
       
  1027         self.assertEquals(self.execute('Any X WHERE X nom "tutu"').rows, [[peid2]])
       
  1028 
       
  1029     def test_update_multiple2(self):
       
  1030         ueid = self.execute("INSERT EUser X: X login 'bob', X upassword 'toto'")[0][0]
       
  1031         peid1 = self.execute("INSERT Personne Y: Y nom 'turlu'")[0][0]
       
  1032         peid2 = self.execute("INSERT Personne Y: Y nom 'tutu'")[0][0]
       
  1033         self.execute('SET P1 owned_by U, P2 owned_by U '
       
  1034                      'WHERE P1 eid %s, P2 eid %s, U eid %s' % (peid1, peid2, ueid))
       
  1035         self.failUnless(self.execute('Any X WHERE X eid %s, X owned_by U, U eid %s'
       
  1036                                        % (peid1, ueid)))
       
  1037         self.failUnless(self.execute('Any X WHERE X eid %s, X owned_by U, U eid %s'
       
  1038                                        % (peid2, ueid)))
       
  1039 
       
  1040     def test_update_math_expr(self):
       
  1041         orders = [r[0] for r in self.execute('Any O ORDERBY O WHERE ST name "Personne", X from_entity ST, X ordernum O')]
       
  1042         for i,v in enumerate(orders):
       
  1043             if v != orders[0]:
       
  1044                 splitidx = i
       
  1045                 break
       
  1046         self.execute('SET X ordernum Y+1 WHERE X from_entity SE, SE name "Personne", X ordernum Y, X ordernum >= %(order)s',
       
  1047                      {'order': orders[splitidx]})
       
  1048         orders2 = [r[0] for r in self.execute('Any O ORDERBY O WHERE ST name "Personne", X from_entity ST, X ordernum O')]
       
  1049         orders = orders[:splitidx] + [o+1 for o in orders[splitidx:]]
       
  1050         self.assertEquals(orders2, orders)
       
  1051 
       
  1052     def test_update_string_concat(self):
       
  1053         beid = self.execute("INSERT Bookmark Y: Y title 'toto', Y path '/view'")[0][0]
       
  1054         self.execute('SET X title XN + %(suffix)s WHERE X is Bookmark, X title XN', {'suffix': u'-moved'})
       
  1055         newname = self.execute('Any XN WHERE X eid %(x)s, X title XN', {'x': beid}, 'x')[0][0]
       
  1056         self.assertEquals(newname, 'toto-moved')
       
  1057                        
       
  1058     def test_update_query_error(self):
       
  1059         self.execute("INSERT Personne Y: Y nom 'toto'")
       
  1060         self.assertRaises(Exception, self.execute, "SET X nom 'toto', X is Personne")
       
  1061         self.assertRaises(QueryError, self.execute, "SET X nom 'toto', X has_text 'tutu' WHERE X is Personne")
       
  1062         self.assertRaises(QueryError, self.execute, "SET X login 'tutu', X eid %s" % cnx.user(self.session).eid)
       
  1063 
       
  1064        
       
  1065     # upassword encryption tests #################################################
       
  1066     
       
  1067     def test_insert_upassword(self):
       
  1068         rset = self.execute("INSERT EUser X: X login 'bob', X upassword 'toto'")
       
  1069         self.assertEquals(len(rset.rows), 1)
       
  1070         self.assertEquals(rset.description, [('EUser',)])
       
  1071         self.assertRaises(Unauthorized,
       
  1072                           self.execute, "Any P WHERE X is EUser, X login 'bob', X upassword P")
       
  1073         cursor = self.pool['system']
       
  1074         cursor.execute("SELECT upassword from EUser WHERE login='bob'")
       
  1075         passwd = cursor.fetchone()[0].getvalue()
       
  1076         self.assertEquals(passwd, crypt_password('toto', passwd[:2])) 
       
  1077         rset = self.execute("Any X WHERE X is EUser, X login 'bob', X upassword '%s'" % passwd)
       
  1078         self.assertEquals(len(rset.rows), 1)
       
  1079         self.assertEquals(rset.description, [('EUser',)])
       
  1080         
       
  1081     def test_update_upassword(self):
       
  1082         cursor = self.pool['system']
       
  1083         rset = self.execute("INSERT EUser X: X login 'bob', X upassword %(pwd)s", {'pwd': 'toto'})
       
  1084         self.assertEquals(rset.description[0][0], 'EUser')
       
  1085         rset = self.execute("SET X upassword %(pwd)s WHERE X is EUser, X login 'bob'",
       
  1086                             {'pwd': 'tutu'})
       
  1087         cursor.execute("SELECT upassword from EUser WHERE login='bob'")
       
  1088         passwd = cursor.fetchone()[0].getvalue()
       
  1089         self.assertEquals(passwd, crypt_password('tutu', passwd[:2])) 
       
  1090         rset = self.execute("Any X WHERE X is EUser, X login 'bob', X upassword '%s'" % passwd)
       
  1091         self.assertEquals(len(rset.rows), 1)
       
  1092         self.assertEquals(rset.description, [('EUser',)])
       
  1093 
       
  1094     # non regression tests ####################################################
       
  1095     
       
  1096     def test_nonregr_1(self):
       
  1097         teid = self.execute("INSERT Tag X: X name 'tag'")[0][0]
       
  1098         self.execute("SET X tags Y WHERE X name 'tag', Y is State, Y name 'activated'")
       
  1099         rset = self.execute('Any X WHERE T tags X')
       
  1100         self.assertEquals(len(rset.rows), 1, rset.rows)
       
  1101         rset = self.execute('Any T WHERE T tags X, X is State')
       
  1102         self.assertEquals(rset.rows, [[teid]])
       
  1103         rset = self.execute('Any T WHERE T tags X')
       
  1104         self.assertEquals(rset.rows, [[teid]])
       
  1105 
       
  1106     def test_nonregr_2(self):
       
  1107         teid = self.execute("INSERT Tag X: X name 'tag'")[0][0]
       
  1108         geid = self.execute("EGroup G WHERE G name 'users'")[0][0]
       
  1109         self.execute("SET X tags Y WHERE X eid %(t)s, Y eid %(g)s",
       
  1110                        {'g': geid, 't': teid})
       
  1111         rset = self.execute('Any X WHERE E eid %(x)s, E tags X',
       
  1112                               {'x': teid})
       
  1113         self.assertEquals(rset.rows, [[geid]])
       
  1114         
       
  1115     def test_nonregr_3(self):
       
  1116         """bad sql generated on the second query (destination_state is not
       
  1117         detected as an inlined relation)
       
  1118         """
       
  1119         rset = self.execute('Any S,ES,T WHERE S state_of ET, ET name "EUser",'
       
  1120                              'ES allowed_transition T, T destination_state S')
       
  1121         self.assertEquals(len(rset.rows), 2)
       
  1122 
       
  1123     def test_nonregr_4(self):
       
  1124         # fix variables'type, else we get (nb of entity types with a 'name' attribute)**3
       
  1125         # union queries and that make for instance a 266Ko sql query which is refused
       
  1126         # by the server (or client lib)
       
  1127         rset = self.execute('Any ER,SE,OE WHERE SE name "Comment", ER name "comments", OE name "Comment",'
       
  1128                             'ER is ERType, SE is EEType, OE is EEType')
       
  1129         self.assertEquals(len(rset), 1)
       
  1130 
       
  1131     def test_nonregr_5(self):
       
  1132         # jpl #15505: equivalent queries returning different result sets
       
  1133         teid1 = self.execute("INSERT Folder X: X name 'hop'")[0][0]
       
  1134         teid2 = self.execute("INSERT Folder X: X name 'hip'")[0][0]
       
  1135         neid = self.execute("INSERT Note X: X todo_by U, X filed_under T WHERE U login 'admin', T name 'hop'")[0][0]
       
  1136         weid = self.execute("INSERT Affaire X: X concerne N, X filed_under T WHERE N is Note, T name 'hip'")[0][0]
       
  1137         rset1 = self.execute('Any N,U WHERE N filed_under T, T eid %s,'
       
  1138                              'N todo_by U, W concerne N,'
       
  1139                              'W is Affaire, W filed_under A, A eid %s' % (teid1, teid2))
       
  1140         rset2 = self.execute('Any N,U WHERE N filed_under T, T eid %s,'
       
  1141                              'N todo_by U, W concerne N,'
       
  1142                              'W filed_under A, A eid %s' % (teid1, teid2))
       
  1143         rset3 = self.execute('Any N,U WHERE N todo_by U, T eid %s,'
       
  1144                              'N filed_under T, W concerne N,'
       
  1145                              'W is Affaire, W filed_under A, A eid %s' % (teid1, teid2))
       
  1146         rset4 = self.execute('Any N,U WHERE N todo_by U, T eid %s,'
       
  1147                              'N filed_under T, W concerne N,'
       
  1148                              'W filed_under A, A eid %s' % (teid1, teid2))
       
  1149         self.assertEquals(rset1.rows, rset2.rows)
       
  1150         self.assertEquals(rset1.rows, rset3.rows)
       
  1151         self.assertEquals(rset1.rows, rset4.rows)
       
  1152         
       
  1153     def test_nonregr_6(self):
       
  1154         self.execute('Any N,COUNT(S) GROUPBY N ORDERBY COUNT(N) WHERE S name N, S is State')
       
  1155         
       
  1156     def test_sqlite_encoding(self):
       
  1157         """XXX this test was trying to show a bug on use of lower which only
       
  1158         occurs with non ascii string and misconfigured locale
       
  1159         """
       
  1160         self.execute("INSERT Tag X: X name %(name)s,"
       
  1161                        "X modification_date %(modification_date)s,"
       
  1162                        "X creation_date %(creation_date)s",
       
  1163                        {'name': u'éname0',
       
  1164                         'modification_date': '2003/03/12 11:00',
       
  1165                         'creation_date': '2000/07/03 11:00'})
       
  1166         rset = self.execute('Any lower(N) ORDERBY LOWER(N) WHERE X is Tag, X name N,'
       
  1167                             'X owned_by U, U eid %(x)s',
       
  1168                             {'x':self.session.user.eid}, 'x')
       
  1169         self.assertEquals(rset.rows, [[u'\xe9name0']])
       
  1170 
       
  1171 
       
  1172     def test_nonregr_description(self):
       
  1173         """check that a correct description is built in case where infered
       
  1174         solutions may be "fusionned" into one by the querier while all solutions
       
  1175         are needed to build the result's description
       
  1176         """
       
  1177         self.execute("INSERT Personne X: X nom 'bidule'")
       
  1178         self.execute("INSERT Societe Y: Y nom 'toto'")
       
  1179         beid = self.execute("INSERT Basket B: B name 'mybasket'")[0][0]
       
  1180         self.execute("SET X in_basket B WHERE X is Personne")
       
  1181         self.execute("SET X in_basket B WHERE X is Societe")
       
  1182         rset = self.execute('Any X WHERE X in_basket B, B eid %s' % beid)
       
  1183         self.assertEquals(len(rset), 2)
       
  1184         self.assertEquals(rset.description, [('Personne',), ('Societe',)])
       
  1185 
       
  1186 
       
  1187     def test_nonregr_cache_1(self):
       
  1188         peid = self.execute("INSERT Personne X: X nom 'bidule'")[0][0]
       
  1189         beid = self.execute("INSERT Basket X: X name 'tag'")[0][0]
       
  1190         self.execute("SET X in_basket Y WHERE X is Personne, Y eid %(y)s",
       
  1191                        {'y': beid})
       
  1192         rset = self.execute("Any X WHERE X in_basket B, B eid %(x)s",
       
  1193                        {'x': beid})
       
  1194         self.assertEquals(rset.rows, [[peid]])
       
  1195         rset = self.execute("Any X WHERE X in_basket B, B eid %(x)s",
       
  1196                        {'x': beid})
       
  1197         self.assertEquals(rset.rows, [[peid]])
       
  1198 
       
  1199     def test_nonregr_has_text_cache(self):
       
  1200         eid1 = self.execute("INSERT Personne X: X nom 'bidule'")[0][0]
       
  1201         eid2 = self.execute("INSERT Personne X: X nom 'tag'")[0][0]
       
  1202         self.commit()
       
  1203         rset = self.execute("Any X WHERE X has_text %(text)s", {'text': 'bidule'})
       
  1204         self.assertEquals(rset.rows, [[eid1]])
       
  1205         rset = self.execute("Any X WHERE X has_text %(text)s", {'text': 'tag'})
       
  1206         self.assertEquals(rset.rows, [[eid2]])
       
  1207 
       
  1208     def test_nonregr_sortterm_management(self):
       
  1209         """Error: Variable has no attribute 'sql' in rql2sql.py (visit_variable)
       
  1210 
       
  1211         cause: old variable ref inserted into a fresh rqlst copy
       
  1212         (in RQLSpliter._complex_select_plan)
       
  1213         """
       
  1214         self.execute('Any X ORDERBY D DESC WHERE X creation_date D')
       
  1215     
       
  1216     def test_nonregr_extra_joins(self):
       
  1217         ueid = self.session.user.eid
       
  1218         teid1 = self.execute("INSERT Folder X: X name 'folder1'")[0][0]
       
  1219         teid2 = self.execute("INSERT Folder X: X name 'folder2'")[0][0]
       
  1220         neid1 = self.execute("INSERT Note X: X para 'note1'")[0][0]
       
  1221         neid2 = self.execute("INSERT Note X: X para 'note2'")[0][0]
       
  1222         self.execute("SET X filed_under Y WHERE X eid %s, Y eid %s"
       
  1223                        % (neid1, teid1))
       
  1224         self.execute("SET X filed_under Y WHERE X eid %s, Y eid %s"
       
  1225                        % (neid2, teid2))
       
  1226         self.execute("SET X todo_by Y WHERE X is Note, Y eid %s" % ueid)
       
  1227         rset = self.execute('Any N WHERE N todo_by U, N is Note, U eid %s, N filed_under T, T eid %s'
       
  1228                              % (ueid, teid1))
       
  1229         self.assertEquals(len(rset), 1)
       
  1230 
       
  1231     def test_nonregr_XXX(self):
       
  1232         teid = self.execute('Transition S WHERE S name "deactivate"')[0][0]
       
  1233         rset = self.execute('Any O WHERE O is State, '
       
  1234                              'S eid %(x)s, S transition_of ET, O state_of ET', {'x': teid})
       
  1235         self.assertEquals(len(rset), 2)
       
  1236         rset = self.execute('Any O WHERE O is State, NOT S destination_state O, '
       
  1237                              'S eid %(x)s, S transition_of ET, O state_of ET', {'x': teid})
       
  1238         self.assertEquals(len(rset), 1)
       
  1239 
       
  1240 
       
  1241     def test_nonregr_set_datetime(self):
       
  1242         # huum, psycopg specific
       
  1243         self.execute('SET X creation_date %(date)s WHERE X eid 1', {'date': today()})
       
  1244 
       
  1245     def test_nonregr_set_query(self):
       
  1246         ueid = self.execute("INSERT EUser X: X login 'bob', X upassword 'toto'")[0][0]
       
  1247         self.execute("SET E in_group G, E in_state S, "
       
  1248                       "E firstname %(firstname)s, E surname %(surname)s "
       
  1249                       "WHERE E eid %(x)s, G name 'users', S name 'activated'",
       
  1250                       {'x':ueid, 'firstname': u'jean', 'surname': u'paul'}, 'x')
       
  1251         
       
  1252     def test_nonregr_u_owned_by_u(self):
       
  1253         ueid = self.execute("INSERT EUser X: X login 'bob', X upassword 'toto', X in_group G "
       
  1254                              "WHERE G name 'users'")[0][0]
       
  1255         rset = self.execute("EUser U")
       
  1256         self.assertEquals(len(rset), 3) # bob + admin + anon
       
  1257         rset = self.execute("Any U WHERE NOT U owned_by U")
       
  1258         self.assertEquals(len(rset), 0) # even admin created at repo initialization time should belong to itself
       
  1259 
       
  1260     def test_nonreg_update_index(self):
       
  1261         # this is the kind of queries generated by "cubicweb-ctl db-check -ry"
       
  1262         self.execute("SET X description D WHERE X is State, X description D")
       
  1263 
       
  1264     def test_nonregr_is(self):
       
  1265         uteid = self.execute('Any ET WHERE ET name "EUser"')[0][0]
       
  1266         self.execute('Any X, ET WHERE X is ET, ET eid %s' % uteid)
       
  1267 
       
  1268     def test_nonregr_orderby(self):
       
  1269         seid = self.execute('Any X WHERE X name "activated"')[0][0]
       
  1270         self.execute('Any X,S, MAX(T) GROUPBY X,S ORDERBY S WHERE X is EUser, T tags X, S eid IN(%s), X in_state S' % seid)
       
  1271 
       
  1272     def test_nonregr_solution_cache(self):
       
  1273         self.skip('XXX should be fixed or documented') # (doesn't occur if cache key is provided.)
       
  1274         rset = self.execute('Any X WHERE X is EUser, X eid %(x)s', {'x':self.ueid})
       
  1275         self.assertEquals(len(rset), 1)
       
  1276         rset = self.execute('Any X WHERE X is EUser, X eid %(x)s', {'x':12345})
       
  1277         self.assertEquals(len(rset), 0)
       
  1278 
       
  1279     def test_nonregr_final_norestr(self):
       
  1280         self.assertRaises(BadRQLQuery, self.execute, 'Date X')
       
  1281 
       
  1282 
       
  1283 if __name__ == '__main__':
       
  1284     unittest_main()