cubicweb/test/unittest_rset.py
branch3.26
changeset 12420 1c0dce14c3b5
parent 12063 4bcb58aa103a
child 12421 e460eac6e648
equal deleted inserted replaced
12419:e034454af0d5 12420:1c0dce14c3b5
    42     """TestCase for cubicweb.rset.attr_desc_iterator"""
    42     """TestCase for cubicweb.rset.attr_desc_iterator"""
    43 
    43 
    44     def test_relations_description(self):
    44     def test_relations_description(self):
    45         """tests relations_description() function"""
    45         """tests relations_description() function"""
    46         queries = {
    46         queries = {
    47             'Any U,L,M where U is CWUser, U login L, U mail M': [(1, 'login', 'subject'), (2, 'mail', 'subject')],
    47             'Any U,L,M where U is CWUser, U login L, U mail M': [
    48             'Any U,L,M where U is CWUser, L is Foo, U mail M': [(2, 'mail', 'subject')],
    48                 (1, 'login', 'subject'), (2, 'mail', 'subject')],
    49             'Any C,P where C is Company, C employs P': [(1, 'employs', 'subject')],
    49             'Any U,L,M where U is CWUser, L is Foo, U mail M': [
       
    50                 (2, 'mail', 'subject')],
       
    51             'Any C,P where C is Company, C employs P': [
       
    52                 (1, 'employs', 'subject')],
    50             'Any C,P where C is Company, P employed_by P': [],
    53             'Any C,P where C is Company, P employed_by P': [],
    51             'Any C where C is Company, C employs P': [],
    54             'Any C where C is Company, C employs P': [],
    52         }
    55         }
    53         for rql, relations in queries.items():
    56         for rql, relations in queries.items():
    54             result = list(attr_desc_iterator(parse(rql).children[0], 0, 0))
    57             result = list(attr_desc_iterator(parse(rql).children[0], 0, 0))
    66                 result = list(attr_desc_iterator(parse(rql).children[0], idx, idx))
    69                 result = list(attr_desc_iterator(parse(rql).children[0], idx, idx))
    67                 self.assertEqual(result, relations)
    70                 self.assertEqual(result, relations)
    68 
    71 
    69     def test_subquery_callfunc(self):
    72     def test_subquery_callfunc(self):
    70         rql = ('Any A,B,C,COUNT(D) GROUPBY A,B,C WITH A,B,C,D BEING '
    73         rql = ('Any A,B,C,COUNT(D) GROUPBY A,B,C WITH A,B,C,D BEING '
    71                '(Any YEAR(CD), MONTH(CD), S, X WHERE X is CWUser, X creation_date CD, X in_state S)')
    74                '(Any YEAR(CD), MONTH(CD), S, X WHERE X is CWUser, '
       
    75                'X creation_date CD, X in_state S)')
    72         rqlst = parse(rql)
    76         rqlst = parse(rql)
    73         select, col = rqlst.locate_subquery(2, 'CWUser', None)
    77         select, col = rqlst.locate_subquery(2, 'CWUser', None)
    74         result = list(attr_desc_iterator(select, col, 2))
    78         result = list(attr_desc_iterator(select, col, 2))
    75         self.assertEqual(result, [])
    79         self.assertEqual(result, [])
    76 
    80 
    77     def test_subquery_callfunc_2(self):
    81     def test_subquery_callfunc_2(self):
    78         rql = ('Any X,S,L WHERE X in_state S WITH X, L BEING (Any X,MAX(L) GROUPBY X WHERE X is CWUser, T wf_info_for X, T creation_date L)')
    82         rql = ('Any X,S,L WHERE '
       
    83                'X in_state S WITH X, L BEING (Any X,MAX(L) GROUPBY X WHERE '
       
    84                'X is CWUser, T wf_info_for X, T creation_date L)')
    79         rqlst = parse(rql)
    85         rqlst = parse(rql)
    80         select, col = rqlst.locate_subquery(0, 'CWUser', None)
    86         select, col = rqlst.locate_subquery(0, 'CWUser', None)
    81         result = list(attr_desc_iterator(select, col, 0))
    87         result = list(attr_desc_iterator(select, col, 0))
    82         self.assertEqual(result, [(1, 'in_state', 'subject')])
    88         self.assertEqual(result, [(1, 'in_state', 'subject')])
    83 
    89 
   113             baseurl = req.base_url()
   119             baseurl = req.base_url()
   114             self.compare_urls(req.build_url('view', vid='foo', rql='yo'),
   120             self.compare_urls(req.build_url('view', vid='foo', rql='yo'),
   115                               '%sview?vid=foo&rql=yo' % baseurl)
   121                               '%sview?vid=foo&rql=yo' % baseurl)
   116             self.compare_urls(req.build_url('view', _restpath='task/title/go'),
   122             self.compare_urls(req.build_url('view', _restpath='task/title/go'),
   117                               '%stask/title/go' % baseurl)
   123                               '%stask/title/go' % baseurl)
   118             #self.compare_urls(req.build_url('view', _restpath='/task/title/go'),
   124             # self.compare_urls(req.build_url('view', _restpath='/task/title/go'),
   119             #                  '%stask/title/go' % baseurl)
   125             #                   '%stask/title/go' % baseurl)
   120             # empty _restpath should not crash
   126             # empty _restpath should not crash
   121             self.compare_urls(req.build_url('view', _restpath=''), baseurl)
   127             self.compare_urls(req.build_url('view', _restpath=''), baseurl)
   122 
   128 
   123 
       
   124     def test_build(self):
   129     def test_build(self):
   125         """test basic build of a ResultSet"""
   130         """test basic build of a ResultSet"""
   126         rs = ResultSet([1,2,3], 'CWGroup X', description=['CWGroup', 'CWGroup', 'CWGroup'])
   131         rs = ResultSet([1, 2, 3], 'CWGroup X',
       
   132                        description=['CWGroup', 'CWGroup', 'CWGroup'])
   127         self.assertEqual(rs.rowcount, 3)
   133         self.assertEqual(rs.rowcount, 3)
   128         self.assertEqual(rs.rows, [1,2,3])
   134         self.assertEqual(rs.rows, [1, 2, 3])
   129         self.assertEqual(rs.description, ['CWGroup', 'CWGroup', 'CWGroup'])
   135         self.assertEqual(rs.description, ['CWGroup', 'CWGroup', 'CWGroup'])
   130 
       
   131 
   136 
   132     def test_limit(self):
   137     def test_limit(self):
   133         rs = ResultSet([[12000, 'adim'], [13000, 'syt'], [14000, 'nico']],
   138         rs = ResultSet([[12000, 'adim'], [13000, 'syt'], [14000, 'nico']],
   134                        'Any U,L where U is CWUser, U login L',
   139                        'Any U,L where U is CWUser, U login L',
   135                        description=[['CWUser', 'String']] * 3)
   140                        description=[['CWUser', 'String']] * 3)
   146     def test_limit_2(self):
   151     def test_limit_2(self):
   147         with self.admin_access.web_request() as req:
   152         with self.admin_access.web_request() as req:
   148             rs = req.execute('Any E,U WHERE E is CWEType, E created_by U')
   153             rs = req.execute('Any E,U WHERE E is CWEType, E created_by U')
   149             # get entity on row 9. This will fill its created_by relation cache,
   154             # get entity on row 9. This will fill its created_by relation cache,
   150             # with cwuser on row 9 as well
   155             # with cwuser on row 9 as well
   151             e1 = rs.get_entity(9, 0)
   156             e1 = rs.get_entity(9, 0)  # noqa
   152             # get entity on row 10. This will fill its created_by relation cache,
   157             # get entity on row 10. This will fill its created_by relation cache,
   153             # with cwuser built on row 9
   158             # with cwuser built on row 9
   154             e2 = rs.get_entity(10, 0)
   159             e2 = rs.get_entity(10, 0)
   155             # limit result set from row 10
   160             # limit result set from row 10
   156             rs.limit(1, 10, inplace=True)
   161             rs.limit(1, 10, inplace=True)
   169                        'Any U,L where U is CWUser, U login L',
   174                        'Any U,L where U is CWUser, U login L',
   170                        description=[['CWUser', 'String']] * 3)
   175                        description=[['CWUser', 'String']] * 3)
   171         with self.admin_access.web_request() as req:
   176         with self.admin_access.web_request() as req:
   172             rs.req = req
   177             rs.req = req
   173             rs.vreg = self.vreg
   178             rs.vreg = self.vreg
       
   179 
   174             def test_filter(entity):
   180             def test_filter(entity):
   175                 return entity.login != 'nico'
   181                 return entity.login != 'nico'
   176 
   182 
   177             rs2 = rs.filtered_rset(test_filter)
   183             rs2 = rs.filtered_rset(test_filter)
   178             self.assertEqual(len(rs2), 2)
   184             self.assertEqual(len(rs2), 2)
   183         rs = ResultSet([[12, 'adim'], [13, 'syt'], [14, 'nico']],
   189         rs = ResultSet([[12, 'adim'], [13, 'syt'], [14, 'nico']],
   184                        'Any U,L where U is CWUser, U login L',
   190                        'Any U,L where U is CWUser, U login L',
   185                        description=[['CWUser', 'String']] * 3)
   191                        description=[['CWUser', 'String']] * 3)
   186         with self.admin_access.web_request() as req:
   192         with self.admin_access.web_request() as req:
   187             rs.req = req
   193             rs.req = req
       
   194 
   188             def test_transform(row, desc):
   195             def test_transform(row, desc):
   189                 return row[1:], desc[1:]
   196                 return row[1:], desc[1:]
   190             rs2 = rs.transformed_rset(test_transform)
   197             rs2 = rs.transformed_rset(test_transform)
   191 
   198 
   192             self.assertEqual(len(rs2), 3)
   199             self.assertEqual(len(rs2), 3)
   193             self.assertEqual(list(rs2), [['adim'],['syt'],['nico']])
   200             self.assertEqual(list(rs2), [['adim'], ['syt'], ['nico']])
   194 
   201 
   195     def test_sort(self):
   202     def test_sort(self):
   196         rs = ResultSet([[12000, 'adim'], [13000, 'syt'], [14000, 'nico']],
   203         rs = ResultSet([[12000, 'adim'], [13000, 'syt'], [14000, 'nico']],
   197                        'Any U,L where U is CWUser, U login L',
   204                        'Any U,L where U is CWUser, U login L',
   198                        description=[['CWUser', 'String']] * 3)
   205                        description=[['CWUser', 'String']] * 3)
   199         with self.admin_access.web_request() as req:
   206         with self.admin_access.web_request() as req:
   200             rs.req = req
   207             rs.req = req
   201             rs.vreg = self.vreg
   208             rs.vreg = self.vreg
   202 
   209 
   203             rs2 = rs.sorted_rset(lambda e:e.cw_attr_cache['login'])
   210             rs2 = rs.sorted_rset(lambda e: e.cw_attr_cache['login'])
   204             self.assertEqual(len(rs2), 3)
   211             self.assertEqual(len(rs2), 3)
   205             self.assertEqual([login for _, login in rs2], ['adim', 'nico', 'syt'])
   212             self.assertEqual([login for _, login in rs2], ['adim', 'nico', 'syt'])
   206             # make sure rs is unchanged
   213             # make sure rs is unchanged
   207             self.assertEqual([login for _, login in rs], ['adim', 'syt', 'nico'])
   214             self.assertEqual([login for _, login in rs], ['adim', 'syt', 'nico'])
   208 
   215 
   209             rs2 = rs.sorted_rset(lambda e:e.cw_attr_cache['login'], reverse=True)
   216             rs2 = rs.sorted_rset(lambda e: e.cw_attr_cache['login'], reverse=True)
   210             self.assertEqual(len(rs2), 3)
   217             self.assertEqual(len(rs2), 3)
   211             self.assertEqual([login for _, login in rs2], ['syt', 'nico', 'adim'])
   218             self.assertEqual([login for _, login in rs2], ['syt', 'nico', 'adim'])
   212             # make sure rs is unchanged
   219             # make sure rs is unchanged
   213             self.assertEqual([login for _, login in rs], ['adim', 'syt', 'nico'])
   220             self.assertEqual([login for _, login in rs], ['adim', 'syt', 'nico'])
   214 
   221 
   219             self.assertEqual([login for _, login in rs], ['adim', 'syt', 'nico'])
   226             self.assertEqual([login for _, login in rs], ['adim', 'syt', 'nico'])
   220 
   227 
   221     def test_split(self):
   228     def test_split(self):
   222         rs = ResultSet([[12000, 'adim', u'Adim chez les pinguins'],
   229         rs = ResultSet([[12000, 'adim', u'Adim chez les pinguins'],
   223                         [12000, 'adim', u'Jardiner facile'],
   230                         [12000, 'adim', u'Jardiner facile'],
   224                         [13000, 'syt',  u'Le carrelage en 42 leçons'],
   231                         [13000, 'syt', u'Le carrelage en 42 leçons'],
   225                         [14000, 'nico', u'La tarte tatin en 15 minutes'],
   232                         [14000, 'nico', u'La tarte tatin en 15 minutes'],
   226                         [14000, 'nico', u"L'épluchage du castor commun"]],
   233                         [14000, 'nico', u"L'épluchage du castor commun"]],
   227                        'Any U, L, T WHERE U is CWUser, U login L,'\
   234                        ('Any U, L, T WHERE U is CWUser, U login L,'
   228                        'D created_by U, D title T',
   235                         'D created_by U, D title T'),
   229                        description=[['CWUser', 'String', 'String']] * 5)
   236                        description=[['CWUser', 'String', 'String']] * 5)
   230         with self.admin_access.web_request() as req:
   237         with self.admin_access.web_request() as req:
   231             rs.req = req
   238             rs.req = req
   232             rs.vreg = self.vreg
   239             rs.vreg = self.vreg
   233             rsets = rs.split_rset(lambda e:e.cw_attr_cache['login'])
   240             rsets = rs.split_rset(lambda e: e.cw_attr_cache['login'])
   234             self.assertEqual(len(rsets), 3)
   241             self.assertEqual(len(rsets), 3)
   235             self.assertEqual([login for _, login,_ in rsets[0]], ['adim', 'adim'])
   242             self.assertEqual([login for _, login, _ in rsets[0]], ['adim', 'adim'])
   236             self.assertEqual([login for _, login,_ in rsets[1]], ['syt'])
   243             self.assertEqual([login for _, login, _ in rsets[1]], ['syt'])
   237             self.assertEqual([login for _, login,_ in rsets[2]], ['nico', 'nico'])
   244             self.assertEqual([login for _, login, _ in rsets[2]], ['nico', 'nico'])
   238             # make sure rs is unchanged
   245             # make sure rs is unchanged
   239             self.assertEqual([login for _, login,_ in rs], ['adim', 'adim', 'syt', 'nico', 'nico'])
   246             self.assertEqual([login for _, login, _ in rs], ['adim', 'adim', 'syt', 'nico', 'nico'])
   240 
   247 
   241             rsets = rs.split_rset(lambda e:e.cw_attr_cache['login'], return_dict=True)
   248             rsets = rs.split_rset(lambda e: e.cw_attr_cache['login'], return_dict=True)
   242             self.assertEqual(len(rsets), 3)
   249             self.assertEqual(len(rsets), 3)
   243             self.assertEqual([login for _, login,_ in rsets['nico']], ['nico', 'nico'])
   250             self.assertEqual([login for _, login, _ in rsets['nico']], ['nico', 'nico'])
   244             self.assertEqual([login for _, login,_ in rsets['adim']], ['adim', 'adim'])
   251             self.assertEqual([login for _, login, _ in rsets['adim']], ['adim', 'adim'])
   245             self.assertEqual([login for _, login,_ in rsets['syt']], ['syt'])
   252             self.assertEqual([login for _, login, _ in rsets['syt']], ['syt'])
   246             # make sure rs is unchanged
   253             # make sure rs is unchanged
   247             self.assertEqual([login for _, login,_ in rs], ['adim', 'adim', 'syt', 'nico', 'nico'])
   254             self.assertEqual([login for _, login, _ in rs], ['adim', 'adim', 'syt', 'nico', 'nico'])
   248 
   255 
   249             rsets = rs.split_rset(lambda s: s.count('d'), col=2)
   256             rsets = rs.split_rset(lambda s: s.count('d'), col=2)
   250             self.assertEqual(len(rsets), 2)
   257             self.assertEqual(len(rsets), 2)
   251             self.assertEqual([title for _, _, title in rsets[0]],
   258             self.assertEqual([title for _, _, title in rsets[0]],
   252                               [u"Adim chez les pinguins",
   259                              [u"Adim chez les pinguins",
   253                                u"Jardiner facile",
   260                               u"Jardiner facile",
   254                                u"L'épluchage du castor commun",])
   261                               u"L'épluchage du castor commun"])
   255             self.assertEqual([title for _, _, title in rsets[1]],
   262             self.assertEqual([title for _, _, title in rsets[1]],
   256                               [u"Le carrelage en 42 leçons",
   263                              [u"Le carrelage en 42 leçons",
   257                                u"La tarte tatin en 15 minutes",])
   264                               u"La tarte tatin en 15 minutes"])
   258             # make sure rs is unchanged
   265             # make sure rs is unchanged
   259             self.assertEqual([title for _, _, title in rs],
   266             self.assertEqual([title for _, _, title in rs],
   260                               [u'Adim chez les pinguins',
   267                              [u'Adim chez les pinguins',
   261                                u'Jardiner facile',
   268                               u'Jardiner facile',
   262                                u'Le carrelage en 42 leçons',
   269                               u'Le carrelage en 42 leçons',
   263                                u'La tarte tatin en 15 minutes',
   270                               u'La tarte tatin en 15 minutes',
   264                                u"L'épluchage du castor commun"])
   271                               u"L'épluchage du castor commun"])
   265 
   272 
   266     def test_cached_syntax_tree(self):
   273     def test_cached_syntax_tree(self):
   267         """make sure syntax tree is cached"""
   274         """make sure syntax tree is cached"""
   268         rqlst1 = self.rset.syntax_tree()
   275         rqlst1 = self.rset.syntax_tree()
   269         rqlst2 = self.rset.syntax_tree()
   276         rqlst2 = self.rset.syntax_tree()
   306             e = rset.get_entity(0, 1)
   313             e = rset.get_entity(0, 1)
   307             self.assertEqual(e.cw_row, 0)
   314             self.assertEqual(e.cw_row, 0)
   308             self.assertEqual(e.cw_col, 1)
   315             self.assertEqual(e.cw_col, 1)
   309             self.assertEqual(e.cw_attr_cache['login'], 'anon')
   316             self.assertEqual(e.cw_attr_cache['login'], 'anon')
   310             self.assertRaises(KeyError, e.cw_attr_cache.__getitem__, 'firstname')
   317             self.assertRaises(KeyError, e.cw_attr_cache.__getitem__, 'firstname')
   311             self.assertEqual(pprelcachedict(e._cw_related_cache),
   318             self.assertEqual(pprelcachedict(e._cw_related_cache), [])
   312                               [])
       
   313             e.complete()
   319             e.complete()
   314             self.assertEqual(e.cw_attr_cache['firstname'], None)
   320             self.assertEqual(e.cw_attr_cache['firstname'], None)
   315             self.assertEqual(e.view('text'), 'anon')
   321             self.assertEqual(e.view('text'), 'anon')
   316             self.assertEqual(pprelcachedict(e._cw_related_cache),
   322             self.assertEqual(pprelcachedict(e._cw_related_cache), [])
   317                               [])
       
   318 
   323 
   319             self.assertRaises(NotAnEntity, rset.get_entity, 0, 2)
   324             self.assertRaises(NotAnEntity, rset.get_entity, 0, 2)
   320             self.assertRaises(NotAnEntity, rset.get_entity, 0, 3)
   325             self.assertRaises(NotAnEntity, rset.get_entity, 0, 3)
   321 
   326 
   322     def test_get_entity_relation_cache_compt(self):
   327     def test_get_entity_relation_cache_compt(self):
   325             e = rset.get_entity(0, 0)
   330             e = rset.get_entity(0, 0)
   326             seid = req.execute('State X WHERE X name "activated"')[0][0]
   331             seid = req.execute('State X WHERE X name "activated"')[0][0]
   327             # for_user / in_group are prefetched in CWUser __init__, in_state should
   332             # for_user / in_group are prefetched in CWUser __init__, in_state should
   328             # be filed from our query rset
   333             # be filed from our query rset
   329             self.assertEqual(pprelcachedict(e._cw_related_cache),
   334             self.assertEqual(pprelcachedict(e._cw_related_cache),
   330                               [('in_state_subject', [seid])])
   335                              [('in_state_subject', [seid])])
   331 
   336 
   332     def test_get_entity_advanced_prefilled_cache(self):
   337     def test_get_entity_advanced_prefilled_cache(self):
   333         with self.admin_access.web_request() as req:
   338         with self.admin_access.web_request() as req:
   334             e = req.create_entity('Bookmark', title=u'zou', path=u'path')
   339             e = req.create_entity('Bookmark', title=u'zou', path=u'path')
   335             req.cnx.commit()
   340             req.cnx.commit()
   336             rset = req.execute('Any X,U,S,XT,UL,SN WHERE X created_by U, U in_state S, '
   341             rset = req.execute('Any X,U,S,XT,UL,SN WHERE X created_by U, U in_state S, '
   337                                 'X title XT, S name SN, U login UL, X eid %s' % e.eid)
   342                                'X title XT, S name SN, U login UL, X eid %s' % e.eid)
   338             e = rset.get_entity(0, 0)
   343             e = rset.get_entity(0, 0)
   339             self.assertEqual(e.cw_attr_cache['title'], 'zou')
   344             self.assertEqual(e.cw_attr_cache['title'], 'zou')
   340             self.assertEqual(pprelcachedict(e._cw_related_cache),
   345             self.assertEqual(pprelcachedict(e._cw_related_cache),
   341                              [('created_by_subject', [req.user.eid])])
   346                              [('created_by_subject', [req.user.eid])])
   342             # first level of recursion
   347             # first level of recursion
   369             rset.get_entity(0, 1)
   374             rset.get_entity(0, 1)
   370             self.assertTrue(mail.cw_relation_cached('primary_email', 'object'))
   375             self.assertTrue(mail.cw_relation_cached('primary_email', 'object'))
   371             u = cnx.user
   376             u = cnx.user
   372             self.assertTrue(u.cw_relation_cached('primary_email', 'subject'))
   377             self.assertTrue(u.cw_relation_cached('primary_email', 'subject'))
   373 
   378 
   374 
       
   375     def test_get_entity_cache_with_left_outer_join(self):
   379     def test_get_entity_cache_with_left_outer_join(self):
   376         with self.admin_access.web_request() as req:
   380         with self.admin_access.web_request() as req:
   377             eid = req.execute('INSERT CWUser E: E login "joe", E upassword "joe", E in_group G '
   381             eid = req.execute('INSERT CWUser E: E login "joe", E upassword "joe", E in_group G '
   378                                'WHERE G name "users"')[0][0]
   382                               'WHERE G name "users"')[0][0]
   379             rset = req.execute('Any X,E WHERE X eid %(x)s, X primary_email E?', {'x': eid})
   383             rset = req.execute('Any X,E WHERE X eid %(x)s, X primary_email E?', {'x': eid})
   380             e = rset.get_entity(0, 0)
   384             e = rset.get_entity(0, 0)
   381             # if any of the assertion below fails with a KeyError, the relation is not cached
   385             # if any of the assertion below fails with a KeyError, the relation is not cached
   382             # related entities should be an empty list
   386             # related entities should be an empty list
   383             self.assertEqual(e._cw_related_cache['primary_email_subject'][True], ())
   387             self.assertEqual(e._cw_related_cache['primary_email_subject'][True], ())
   384             # related rset should be an empty rset
   388             # related rset should be an empty rset
   385             cached = e._cw_related_cache['primary_email_subject'][False]
   389             cached = e._cw_related_cache['primary_email_subject'][False]
   386             self.assertIsInstance(cached, ResultSet)
   390             self.assertIsInstance(cached, ResultSet)
   387             self.assertEqual(cached.rowcount, 0)
   391             self.assertEqual(cached.rowcount, 0)
   388 
   392 
   389 
       
   390     def test_get_entity_union(self):
   393     def test_get_entity_union(self):
   391         with self.admin_access.web_request() as req:
   394         with self.admin_access.web_request() as req:
   392             e = req.create_entity('Bookmark', title=u'manger', path=u'path')
   395             req.create_entity('Bookmark', title=u'manger', path=u'path')
   393             req.drop_entity_cache()
   396             req.drop_entity_cache()
   394             rset = req.execute('Any X,N ORDERBY N WITH X,N BEING '
   397             rset = req.execute('Any X,N ORDERBY N WITH X,N BEING '
   395                                 '((Any X,N WHERE X is Bookmark, X title N)'
   398                                '((Any X,N WHERE X is Bookmark, X title N)'
   396                                 ' UNION '
   399                                ' UNION '
   397                                 ' (Any X,N WHERE X is CWGroup, X name N))')
   400                                ' (Any X,N WHERE X is CWGroup, X name N))')
   398             expected = (('CWGroup', 'guests'), ('CWGroup', 'managers'),
   401             expected = (('CWGroup', 'guests'), ('CWGroup', 'managers'),
   399                         ('Bookmark', 'manger'), ('CWGroup', 'owners'),
   402                         ('Bookmark', 'manger'), ('CWGroup', 'owners'),
   400                         ('CWGroup', 'users'))
   403                         ('CWGroup', 'users'))
   401             for entity in rset.entities(): # test get_entity for each row actually
   404             for entity in rset.entities():  # test get_entity for each row actually
   402                 etype, n = expected[entity.cw_row]
   405                 etype, n = expected[entity.cw_row]
   403                 self.assertEqual(entity.cw_etype, etype)
   406                 self.assertEqual(entity.cw_etype, etype)
   404                 attr = etype == 'Bookmark' and 'title' or 'name'
   407                 attr = etype == 'Bookmark' and 'title' or 'name'
   405                 self.assertEqual(entity.cw_attr_cache[attr], n)
   408                 self.assertEqual(entity.cw_attr_cache[attr], n)
   406 
   409 
   407     def test_one(self):
   410     def test_one(self):
   408         with self.admin_access.web_request() as req:
   411         with self.admin_access.web_request() as req:
   409             req.create_entity('CWUser', login=u'cdevienne',
   412             req.create_entity('CWUser',
   410                                          upassword=u'cdevienne',
   413                               login=u'cdevienne',
   411                                          surname=u'de Vienne',
   414                               upassword=u'cdevienne',
   412                                          firstname=u'Christophe')
   415                               surname=u'de Vienne',
       
   416                               firstname=u'Christophe')
   413             e = req.execute('Any X WHERE X login "cdevienne"').one()
   417             e = req.execute('Any X WHERE X login "cdevienne"').one()
   414 
   418 
   415             self.assertEqual(e.surname, u'de Vienne')
   419             self.assertEqual(e.surname, u'de Vienne')
   416 
   420 
   417             e = req.execute(
   421             e = req.execute(
   440             with self.assertRaises(MultipleResultsError):
   444             with self.assertRaises(MultipleResultsError):
   441                 req.execute('Any X WHERE X is CWUser').one()
   445                 req.execute('Any X WHERE X is CWUser').one()
   442 
   446 
   443     def test_related_entity_optional(self):
   447     def test_related_entity_optional(self):
   444         with self.admin_access.web_request() as req:
   448         with self.admin_access.web_request() as req:
   445             e = req.create_entity('Bookmark', title=u'aaaa', path=u'path')
   449             req.create_entity('Bookmark', title=u'aaaa', path=u'path')
   446             rset = req.execute('Any B,U,L WHERE B bookmarked_by U?, U login L')
   450             rset = req.execute('Any B,U,L WHERE B bookmarked_by U?, U login L')
   447             entity, rtype = rset.related_entity(0, 2)
   451             entity, rtype = rset.related_entity(0, 2)
   448             self.assertEqual(entity, None)
   452             self.assertEqual(entity, None)
   449             self.assertEqual(rtype, None)
   453             self.assertEqual(rtype, None)
   450 
   454 
   451     def test_related_entity_union_subquery_1(self):
   455     def test_related_entity_union_subquery_1(self):
   452         with self.admin_access.web_request() as req:
   456         with self.admin_access.web_request() as req:
   453             e = req.create_entity('Bookmark', title=u'aaaa', path=u'path')
   457             e = req.create_entity('Bookmark', title=u'aaaa', path=u'path')
   454             rset = req.execute('Any X,N ORDERBY N WITH X,N BEING '
   458             rset = req.execute('Any X,N ORDERBY N WITH X,N BEING '
   455                                 '((Any X,N WHERE X is CWGroup, X name N)'
   459                                '((Any X,N WHERE X is CWGroup, X name N)'
   456                                 ' UNION '
   460                                ' UNION '
   457                                 ' (Any X,N WHERE X is Bookmark, X title N))')
   461                                ' (Any X,N WHERE X is Bookmark, X title N))')
   458             entity, rtype = rset.related_entity(0, 1)
   462             entity, rtype = rset.related_entity(0, 1)
   459             self.assertEqual(entity.eid, e.eid)
   463             self.assertEqual(entity.eid, e.eid)
   460             self.assertEqual(rtype, 'title')
   464             self.assertEqual(rtype, 'title')
   461             self.assertEqual(entity.title, 'aaaa')
   465             self.assertEqual(entity.title, 'aaaa')
   462             entity, rtype = rset.related_entity(1, 1)
   466             entity, rtype = rset.related_entity(1, 1)
   466 
   470 
   467     def test_related_entity_union_subquery_2(self):
   471     def test_related_entity_union_subquery_2(self):
   468         with self.admin_access.web_request() as req:
   472         with self.admin_access.web_request() as req:
   469             e = req.create_entity('Bookmark', title=u'aaaa', path=u'path')
   473             e = req.create_entity('Bookmark', title=u'aaaa', path=u'path')
   470             rset = req.execute('Any X,N ORDERBY N WHERE X is Bookmark WITH X,N BEING '
   474             rset = req.execute('Any X,N ORDERBY N WHERE X is Bookmark WITH X,N BEING '
   471                                 '((Any X,N WHERE X is CWGroup, X name N)'
   475                                '((Any X,N WHERE X is CWGroup, X name N)'
   472                                 ' UNION '
   476                                ' UNION '
   473                                 ' (Any X,N WHERE X is Bookmark, X title N))')
   477                                ' (Any X,N WHERE X is Bookmark, X title N))')
   474             entity, rtype = rset.related_entity(0, 1)
   478             entity, rtype = rset.related_entity(0, 1)
   475             self.assertEqual(entity.eid, e.eid)
   479             self.assertEqual(entity.eid, e.eid)
   476             self.assertEqual(rtype, 'title')
   480             self.assertEqual(rtype, 'title')
   477             self.assertEqual(entity.title, 'aaaa')
   481             self.assertEqual(entity.title, 'aaaa')
   478 
   482 
   479     def test_related_entity_union_subquery_3(self):
   483     def test_related_entity_union_subquery_3(self):
   480         with self.admin_access.web_request() as req:
   484         with self.admin_access.web_request() as req:
   481             e = req.create_entity('Bookmark', title=u'aaaa', path=u'path')
   485             e = req.create_entity('Bookmark', title=u'aaaa', path=u'path')
   482             rset = req.execute('Any X,N ORDERBY N WITH N,X BEING '
   486             rset = req.execute('Any X,N ORDERBY N WITH N,X BEING '
   483                                 '((Any N,X WHERE X is CWGroup, X name N)'
   487                                '((Any N,X WHERE X is CWGroup, X name N)'
   484                                 ' UNION '
   488                                ' UNION '
   485                                 ' (Any N,X WHERE X is Bookmark, X title N))')
   489                                ' (Any N,X WHERE X is Bookmark, X title N))')
   486             entity, rtype = rset.related_entity(0, 1)
   490             entity, rtype = rset.related_entity(0, 1)
   487             self.assertEqual(entity.eid, e.eid)
   491             self.assertEqual(entity.eid, e.eid)
   488             self.assertEqual(rtype, 'title')
   492             self.assertEqual(rtype, 'title')
   489             self.assertEqual(entity.title, 'aaaa')
   493             self.assertEqual(entity.title, 'aaaa')
   490 
   494 
   491     def test_related_entity_union_subquery_4(self):
   495     def test_related_entity_union_subquery_4(self):
   492         with self.admin_access.web_request() as req:
   496         with self.admin_access.web_request() as req:
   493             e = req.create_entity('Bookmark', title=u'aaaa', path=u'path')
   497             e = req.create_entity('Bookmark', title=u'aaaa', path=u'path')
   494             rset = req.execute('Any X,X, N ORDERBY N WITH X,N BEING '
   498             rset = req.execute('Any X,X, N ORDERBY N WITH X,N BEING '
   495                                 '((Any X,N WHERE X is CWGroup, X name N)'
   499                                '((Any X,N WHERE X is CWGroup, X name N)'
   496                                 ' UNION '
   500                                ' UNION '
   497                                 ' (Any X,N WHERE X is Bookmark, X title N))')
   501                                ' (Any X,N WHERE X is Bookmark, X title N))')
   498             entity, rtype = rset.related_entity(0, 2)
   502             entity, rtype = rset.related_entity(0, 2)
   499             self.assertEqual(entity.eid, e.eid)
   503             self.assertEqual(entity.eid, e.eid)
   500             self.assertEqual(rtype, 'title')
   504             self.assertEqual(rtype, 'title')
   501             self.assertEqual(entity.title, 'aaaa')
   505             self.assertEqual(entity.title, 'aaaa')
   502 
   506 
   503     def test_related_entity_trap_subquery(self):
   507     def test_related_entity_trap_subquery(self):
   504         with self.admin_access.web_request() as req:
   508         with self.admin_access.web_request() as req:
   505             req.create_entity('Bookmark', title=u'test bookmark', path=u'')
   509             req.create_entity('Bookmark', title=u'test bookmark', path=u'')
   506             req.execute('SET B bookmarked_by U WHERE U login "admin"')
   510             req.execute('SET B bookmarked_by U WHERE U login "admin"')
   507             rset = req.execute('Any B,T,L WHERE B bookmarked_by U, U login L '
   511             rset = req.execute('Any B,T,L WHERE B bookmarked_by U, U login L '
   508                                 'WITH B,T BEING (Any B,T WHERE B is Bookmark, B title T)')
   512                                'WITH B,T BEING (Any B,T WHERE B is Bookmark, B title T)')
   509             rset.related_entity(0, 2)
   513             rset.related_entity(0, 2)
   510 
   514 
   511     def test_related_entity_subquery_outerjoin(self):
   515     def test_related_entity_subquery_outerjoin(self):
   512         with self.admin_access.web_request() as req:
   516         with self.admin_access.web_request() as req:
   513             rset = req.execute('Any X,S,L WHERE X in_state S '
   517             rset = req.execute('Any X,S,L WHERE X in_state S '
   514                                 'WITH X, L BEING (Any X,MAX(L) GROUPBY X '
   518                                'WITH X, L BEING (Any X,MAX(L) GROUPBY X '
   515                                 'WHERE X is CWUser, T? wf_info_for X, T creation_date L)')
   519                                'WHERE X is CWUser, T? wf_info_for X, T creation_date L)')
   516             self.assertEqual(len(rset), 2)
   520             self.assertEqual(len(rset), 2)
   517             rset.related_entity(0, 1)
   521             rset.related_entity(0, 1)
   518             rset.related_entity(0, 2)
   522             rset.related_entity(0, 2)
   519 
   523 
   520     def test_entities(self):
   524     def test_entities(self):
   521         with self.admin_access.web_request() as req:
   525         with self.admin_access.web_request() as req:
   522             rset = req.execute('Any U,G WHERE U in_group G')
   526             rset = req.execute('Any U,G WHERE U in_group G')
   523             # make sure we have at least one element
   527             # make sure we have at least one element
   524             self.assertTrue(rset)
   528             self.assertTrue(rset)
   525             self.assertEqual(set(e.e_schema.type for e in rset.entities(0)),
   529             self.assertEqual(set(e.e_schema.type for e in rset.entities(0)),
   526                               set(['CWUser',]))
   530                              set(['CWUser']))
   527             self.assertEqual(set(e.e_schema.type for e in rset.entities(1)),
   531             self.assertEqual(set(e.e_schema.type for e in rset.entities(1)),
   528                               set(['CWGroup',]))
   532                              set(['CWGroup']))
   529 
   533 
   530     def test_iter_rows_with_entities(self):
   534     def test_iter_rows_with_entities(self):
   531         with self.admin_access.web_request() as req:
   535         with self.admin_access.web_request() as req:
   532             rset = req.execute('Any U,UN,G,GN WHERE U in_group G, U login UN, G name GN')
   536             rset = req.execute('Any U,UN,G,GN WHERE U in_group G, U login UN, G name GN')
   533             # make sure we have at least one element
   537             # make sure we have at least one element
   534             self.assertTrue(rset)
   538             self.assertTrue(rset)
   535             out = list(rset.iter_rows_with_entities())[0]
   539             out = list(rset.iter_rows_with_entities())[0]
   536             self.assertEqual( out[0].login, out[1] )
   540             self.assertEqual(out[0].login, out[1])
   537             self.assertEqual( out[2].name, out[3] )
   541             self.assertEqual(out[2].name, out[3])
   538 
   542 
   539     def test_printable_rql(self):
   543     def test_printable_rql(self):
   540         with self.admin_access.web_request() as req:
   544         with self.admin_access.web_request() as req:
   541             rset = req.execute(u'CWEType X WHERE X final FALSE')
   545             rset = req.execute(u'CWEType X WHERE X final FALSE')
   542             self.assertEqual(rset.printable_rql(),
   546             self.assertEqual(rset.printable_rql(),
   543                               'Any X WHERE X final FALSE, X is CWEType')
   547                              'Any X WHERE X final FALSE, X is CWEType')
   544 
   548 
   545     def test_searched_text(self):
   549     def test_searched_text(self):
   546         with self.admin_access.web_request() as req:
   550         with self.admin_access.web_request() as req:
   547             rset = req.execute(u'Any X WHERE X has_text "foobar"')
   551             rset = req.execute(u'Any X WHERE X has_text "foobar"')
   548             self.assertEqual(rset.searched_text(), 'foobar')
   552             self.assertEqual(rset.searched_text(), 'foobar')
   549             rset = req.execute(u'Any X WHERE X has_text %(text)s', {'text' : 'foo'})
   553             rset = req.execute(u'Any X WHERE X has_text %(text)s', {'text': 'foo'})
   550             self.assertEqual(rset.searched_text(), 'foo')
   554             self.assertEqual(rset.searched_text(), 'foo')
   551 
   555 
   552     def test_union_limited_rql(self):
   556     def test_union_limited_rql(self):
   553         with self.admin_access.web_request() as req:
   557         with self.admin_access.web_request() as req:
   554             rset = req.execute('(Any X,N WHERE X is Bookmark, X title N)'
   558             rset = req.execute('(Any X,N WHERE X is Bookmark, X title N)'
   555                                 ' UNION '
   559                                ' UNION '
   556                                 '(Any X,N WHERE X is CWGroup, X name N)')
   560                                '(Any X,N WHERE X is CWGroup, X name N)')
   557             rset.limit(2, 10, inplace=True)
   561             rset.limit(2, 10, inplace=True)
   558             self.assertEqual(rset.limited_rql(),
   562             self.assertEqual(rset.limited_rql(),
   559                               'Any A,B LIMIT 2 OFFSET 10 '
   563                              'Any A,B LIMIT 2 OFFSET 10 '
   560                               'WITH A,B BEING ('
   564                              'WITH A,B BEING ('
   561                               '(Any X,N WHERE X is Bookmark, X title N) '
   565                              '(Any X,N WHERE X is Bookmark, X title N) '
   562                               'UNION '
   566                              'UNION '
   563                               '(Any X,N WHERE X is CWGroup, X name N)'
   567                              '(Any X,N WHERE X is CWGroup, X name N)'
   564                               ')')
   568                              ')')
   565 
   569 
   566     def test_possible_actions_cache(self):
   570     def test_possible_actions_cache(self):
   567         with self.admin_access.web_request() as req:
   571         with self.admin_access.web_request() as req:
   568             rset = req.execute('Any D, COUNT(U) GROUPBY D WHERE U is CWUser, U creation_date D')
   572             rset = req.execute('Any D, COUNT(U) GROUPBY D WHERE U is CWUser, U creation_date D')
   569             rset.possible_actions(argument='Value')
   573             rset.possible_actions(argument='Value')
   570             self.assertRaises(AssertionError, rset.possible_actions, argument='OtherValue')
   574             self.assertRaises(AssertionError, rset.possible_actions, argument='OtherValue')
   571             self.assertRaises(AssertionError, rset.possible_actions, other_argument='Value')
   575             self.assertRaises(AssertionError, rset.possible_actions, other_argument='Value')
   572 
   576 
   573     def test_count_users_by_date(self):
   577     def test_count_users_by_date(self):
   574         with self.admin_access.web_request() as req:
   578         with self.admin_access.web_request() as req:
   575             rset = req.execute('Any D, COUNT(U) GROUPBY D WHERE U is CWUser, U creation_date D')
   579             rset = req.execute('Any D, COUNT(U) GROUPBY D '
   576             self.assertEqual(rset.related_entity(0,0), (None, None))
   580                                'WHERE U is CWUser, U creation_date D')
       
   581             self.assertEqual(rset.related_entity(0, 0), (None, None))
   577 
   582 
   578     def test_str(self):
   583     def test_str(self):
   579         with self.admin_access.web_request() as req:
   584         with self.admin_access.web_request() as req:
   580             rset = req.execute('(Any X,N WHERE X is CWGroup, X name N)')
   585             rset = req.execute('(Any X,N WHERE X is CWGroup, X name N)')
   581             self.assertIsInstance(str(rset), string_types)
   586             self.assertIsInstance(str(rset), string_types)
   592             self.assertEqual(len(str(rset).splitlines()), 1)
   597             self.assertEqual(len(str(rset).splitlines()), 1)
   593 
   598 
   594     def test_slice(self):
   599     def test_slice(self):
   595         rs = ResultSet([[12000, 'adim', u'Adim chez les pinguins'],
   600         rs = ResultSet([[12000, 'adim', u'Adim chez les pinguins'],
   596                         [12000, 'adim', u'Jardiner facile'],
   601                         [12000, 'adim', u'Jardiner facile'],
   597                         [13000, 'syt',  u'Le carrelage en 42 leçons'],
   602                         [13000, 'syt', u'Le carrelage en 42 leçons'],
   598                         [14000, 'nico', u'La tarte tatin en 15 minutes'],
   603                         [14000, 'nico', u'La tarte tatin en 15 minutes'],
   599                         [14000, 'nico', u"L'épluchage du castor commun"]],
   604                         [14000, 'nico', u"L'épluchage du castor commun"]],
   600                        'Any U, L, T WHERE U is CWUser, U login L,'\
   605                        ('Any U, L, T WHERE U is CWUser, U login L,'
   601                        'D created_by U, D title T',
   606                         'D created_by U, D title T'),
   602                        description=[['CWUser', 'String', 'String']] * 5)
   607                        description=[['CWUser', 'String', 'String']] * 5)
   603         self.assertEqual(rs[1::2],
   608         self.assertEqual(rs[1::2],
   604             [[12000, 'adim', u'Jardiner facile'],
   609                          [[12000, 'adim', u'Jardiner facile'],
   605              [14000, 'nico', u'La tarte tatin en 15 minutes']])
   610                          [14000, 'nico', u'La tarte tatin en 15 minutes']])
   606 
   611 
   607     def test_nonregr_symmetric_relation(self):
   612     def test_nonregr_symmetric_relation(self):
   608         # see https://www.cubicweb.org/ticket/4739253
   613         # see https://www.cubicweb.org/ticket/4739253
   609         with self.admin_access.client_cnx() as cnx:
   614         with self.admin_access.client_cnx() as cnx:
   610             p1 = cnx.create_entity('Personne', nom=u'sylvain')
   615             p1 = cnx.create_entity('Personne', nom=u'sylvain')
   611             cnx.create_entity('Personne', nom=u'denis', connait=p1)
   616             cnx.create_entity('Personne', nom=u'denis', connait=p1)
   612             cnx.commit()
   617             cnx.commit()
   613             rset = cnx.execute('Any X,Y WHERE X connait Y')
   618             rset = cnx.execute('Any X,Y WHERE X connait Y')
   614             rset.get_entity(0, 1) # used to raise KeyError
   619             rset.get_entity(0, 1)  # used to raise KeyError
       
   620 
   615 
   621 
   616 if __name__ == '__main__':
   622 if __name__ == '__main__':
   617     unittest_main()
   623     unittest_main()