cubicweb/server/test/unittest_querier.py
changeset 12567 26744ad37953
parent 12539 10159a3d1d72
child 12931 6eae252361e5
equal deleted inserted replaced
12566:6b3523f81f42 12567:26744ad37953
    23 from datetime import date, datetime, timedelta, tzinfo
    23 from datetime import date, datetime, timedelta, tzinfo
    24 import unittest
    24 import unittest
    25 
    25 
    26 import pytz
    26 import pytz
    27 
    27 
    28 from six import PY2, integer_types, binary_type, text_type
       
    29 
       
    30 from rql import BadRQLQuery
    28 from rql import BadRQLQuery
    31 from rql.utils import register_function, FunctionDescr
    29 from rql.utils import register_function, FunctionDescr
    32 
    30 
    33 from cubicweb import QueryError, Unauthorized, Binary, devtools
    31 from cubicweb import QueryError, Unauthorized, Binary, devtools
    34 from cubicweb.server.sqlutils import SQL_CONNECT_HOOKS, SQL_PREFIX
    32 from cubicweb.server.sqlutils import SQL_CONNECT_HOOKS, SQL_PREFIX
   132             # (any one, etype will be discarded)
   130             # (any one, etype will be discarded)
   133             self.assertEqual(1, len(rqlst.solutions))
   131             self.assertEqual(1, len(rqlst.solutions))
   134 
   132 
   135     def assertRQLEqual(self, expected, got):
   133     def assertRQLEqual(self, expected, got):
   136         from rql import parse
   134         from rql import parse
   137         self.assertMultiLineEqual(text_type(parse(expected)),
   135         self.assertMultiLineEqual(str(parse(expected)),
   138                                   text_type(parse(got)))
   136                                   str(parse(got)))
   139 
   137 
   140     def test_preprocess_security(self):
   138     def test_preprocess_security(self):
   141         with self.user_groups_session('users') as cnx:
   139         with self.user_groups_session('users') as cnx:
   142             plan = self._prepare_plan(cnx, 'Any ETN,COUNT(X) GROUPBY ETN '
   140             plan = self._prepare_plan(cnx, 'Any ETN,COUNT(X) GROUPBY ETN '
   143                                       'WHERE X is ET, ET name ETN')
   141                                       'WHERE X is ET, ET name ETN')
   263         self.assertEqual(rset.description[0][0], 'Date')
   261         self.assertEqual(rset.description[0][0], 'Date')
   264         rset = self.qexecute('Any NOW')
   262         rset = self.qexecute('Any NOW')
   265         self.assertEqual(rset.description[0][0], 'Datetime')
   263         self.assertEqual(rset.description[0][0], 'Datetime')
   266         rset = self.qexecute('Any %(x)s', {'x': 1})
   264         rset = self.qexecute('Any %(x)s', {'x': 1})
   267         self.assertEqual(rset.description[0][0], 'Int')
   265         self.assertEqual(rset.description[0][0], 'Int')
   268         if PY2:
       
   269             rset = self.qexecute('Any %(x)s', {'x': long(1)})
       
   270             self.assertEqual(rset.description[0][0], 'Int')
       
   271         rset = self.qexecute('Any %(x)s', {'x': True})
   266         rset = self.qexecute('Any %(x)s', {'x': True})
   272         self.assertEqual(rset.description[0][0], 'Boolean')
   267         self.assertEqual(rset.description[0][0], 'Boolean')
   273         rset = self.qexecute('Any %(x)s', {'x': 1.0})
   268         rset = self.qexecute('Any %(x)s', {'x': 1.0})
   274         self.assertEqual(rset.description[0][0], 'Float')
   269         self.assertEqual(rset.description[0][0], 'Float')
   275         rset = self.qexecute('Any %(x)s', {'x': datetime.now()})
   270         rset = self.qexecute('Any %(x)s', {'x': datetime.now()})
   325         self.assertFalse(self.qexecute('Any X WHERE X eid 99999999'))
   320         self.assertFalse(self.qexecute('Any X WHERE X eid 99999999'))
   326 
   321 
   327     def test_typed_eid(self):
   322     def test_typed_eid(self):
   328         # should return an empty result set
   323         # should return an empty result set
   329         rset = self.qexecute('Any X WHERE X eid %(x)s', {'x': '1'})
   324         rset = self.qexecute('Any X WHERE X eid %(x)s', {'x': '1'})
   330         self.assertIsInstance(rset[0][0], integer_types)
   325         self.assertIsInstance(rset[0][0], int)
   331 
   326 
   332     def test_bytes_storage(self):
   327     def test_bytes_storage(self):
   333         feid = self.qexecute('INSERT File X: X data_name "foo.pdf", '
   328         feid = self.qexecute('INSERT File X: X data_name "foo.pdf", '
   334                              'X data_format "text/plain", X data %(data)s',
   329                              'X data_format "text/plain", X data %(data)s',
   335                             {'data': Binary(b"xxx")})[0][0]
   330                             {'data': Binary(b"xxx")})[0][0]
   901 
   896 
   902     def test_select_constant(self):
   897     def test_select_constant(self):
   903         rset = self.qexecute('Any X, "toto" ORDERBY X WHERE X is CWGroup')
   898         rset = self.qexecute('Any X, "toto" ORDERBY X WHERE X is CWGroup')
   904         self.assertEqual(rset.rows,
   899         self.assertEqual(rset.rows,
   905                          [list(x) for x in zip((2,3,4,5), ('toto','toto','toto','toto',))])
   900                          [list(x) for x in zip((2,3,4,5), ('toto','toto','toto','toto',))])
   906         self.assertIsInstance(rset[0][1], text_type)
   901         self.assertIsInstance(rset[0][1], str)
   907         self.assertEqual(rset.description,
   902         self.assertEqual(rset.description,
   908                          list(zip(('CWGroup', 'CWGroup', 'CWGroup', 'CWGroup'),
   903                          list(zip(('CWGroup', 'CWGroup', 'CWGroup', 'CWGroup'),
   909                                   ('String', 'String', 'String', 'String',))))
   904                                   ('String', 'String', 'String', 'String',))))
   910         rset = self.qexecute('Any X, %(value)s ORDERBY X WHERE X is CWGroup', {'value': 'toto'})
   905         rset = self.qexecute('Any X, %(value)s ORDERBY X WHERE X is CWGroup', {'value': 'toto'})
   911         self.assertEqual(rset.rows,
   906         self.assertEqual(rset.rows,
   912                          list(map(list, zip((2,3,4,5), ('toto','toto','toto','toto',)))))
   907                          list(map(list, zip((2,3,4,5), ('toto','toto','toto','toto',)))))
   913         self.assertIsInstance(rset[0][1], text_type)
   908         self.assertIsInstance(rset[0][1], str)
   914         self.assertEqual(rset.description,
   909         self.assertEqual(rset.description,
   915                          list(zip(('CWGroup', 'CWGroup', 'CWGroup', 'CWGroup'),
   910                          list(zip(('CWGroup', 'CWGroup', 'CWGroup', 'CWGroup'),
   916                                   ('String', 'String', 'String', 'String',))))
   911                                   ('String', 'String', 'String', 'String',))))
   917         rset = self.qexecute('Any X,GN WHERE X is CWUser, G is CWGroup, X login "syt", '
   912         rset = self.qexecute('Any X,GN WHERE X is CWUser, G is CWGroup, X login "syt", '
   918                              'X in_group G, G name GN')
   913                              'X in_group G, G name GN')
  1071         self.assertEqual(len(self.qexecute('Any X, Y WHERE X travaille Y')), 2)
  1066         self.assertEqual(len(self.qexecute('Any X, Y WHERE X travaille Y')), 2)
  1072 
  1067 
  1073     def test_insert_4ter(self):
  1068     def test_insert_4ter(self):
  1074         peid = self.qexecute("INSERT Personne X: X nom 'bidule'")[0][0]
  1069         peid = self.qexecute("INSERT Personne X: X nom 'bidule'")[0][0]
  1075         seid = self.qexecute("INSERT Societe Y: Y nom 'toto', X travaille Y WHERE X eid %(x)s",
  1070         seid = self.qexecute("INSERT Societe Y: Y nom 'toto', X travaille Y WHERE X eid %(x)s",
  1076                              {'x': text_type(peid)})[0][0]
  1071                              {'x': str(peid)})[0][0]
  1077         self.assertEqual(len(self.qexecute('Any X, Y WHERE X travaille Y')), 1)
  1072         self.assertEqual(len(self.qexecute('Any X, Y WHERE X travaille Y')), 1)
  1078         self.qexecute("INSERT Personne X: X nom 'chouette', X travaille Y WHERE Y eid %(x)s",
  1073         self.qexecute("INSERT Personne X: X nom 'chouette', X travaille Y WHERE Y eid %(x)s",
  1079                       {'x': text_type(seid)})
  1074                       {'x': str(seid)})
  1080         self.assertEqual(len(self.qexecute('Any X, Y WHERE X travaille Y')), 2)
  1075         self.assertEqual(len(self.qexecute('Any X, Y WHERE X travaille Y')), 2)
  1081 
  1076 
  1082     def test_insert_5(self):
  1077     def test_insert_5(self):
  1083         self.qexecute("INSERT Personne X: X nom 'bidule'")
  1078         self.qexecute("INSERT Personne X: X nom 'bidule'")
  1084         self.qexecute("INSERT Societe Y: Y nom 'toto', X travaille Y WHERE X nom 'bidule'")
  1079         self.qexecute("INSERT Societe Y: Y nom 'toto', X travaille Y WHERE X nom 'bidule'")
  1280 
  1275 
  1281     def test_update_2ter(self):
  1276     def test_update_2ter(self):
  1282         rset = self.qexecute("INSERT Personne X, Societe Y: X nom 'bidule', Y nom 'toto'")
  1277         rset = self.qexecute("INSERT Personne X, Societe Y: X nom 'bidule', Y nom 'toto'")
  1283         eid1, eid2 = rset[0][0], rset[0][1]
  1278         eid1, eid2 = rset[0][0], rset[0][1]
  1284         self.qexecute("SET X travaille Y WHERE X eid %(x)s, Y eid %(y)s",
  1279         self.qexecute("SET X travaille Y WHERE X eid %(x)s, Y eid %(y)s",
  1285                       {'x': text_type(eid1), 'y': text_type(eid2)})
  1280                       {'x': str(eid1), 'y': str(eid2)})
  1286         rset = self.qexecute('Any X, Y WHERE X travaille Y')
  1281         rset = self.qexecute('Any X, Y WHERE X travaille Y')
  1287         self.assertEqual(len(rset.rows), 1)
  1282         self.assertEqual(len(rset.rows), 1)
  1288 
  1283 
  1289     def test_update_multiple1(self):
  1284     def test_update_multiple1(self):
  1290         peid1 = self.qexecute("INSERT Personne Y: Y nom 'tutu'")[0][0]
  1285         peid1 = self.qexecute("INSERT Personne Y: Y nom 'tutu'")[0][0]
  1330     def test_update_not_exists(self):
  1325     def test_update_not_exists(self):
  1331         rset = self.qexecute("INSERT Personne X, Societe Y: X nom 'bidule', Y nom 'toto'")
  1326         rset = self.qexecute("INSERT Personne X, Societe Y: X nom 'bidule', Y nom 'toto'")
  1332         eid1, eid2 = rset[0][0], rset[0][1]
  1327         eid1, eid2 = rset[0][0], rset[0][1]
  1333         rset = self.qexecute("SET X travaille Y WHERE X eid %(x)s, Y eid %(y)s, "
  1328         rset = self.qexecute("SET X travaille Y WHERE X eid %(x)s, Y eid %(y)s, "
  1334                             "NOT EXISTS(Z ecrit_par X)",
  1329                             "NOT EXISTS(Z ecrit_par X)",
  1335                             {'x': text_type(eid1), 'y': text_type(eid2)})
  1330                             {'x': str(eid1), 'y': str(eid2)})
  1336         self.assertEqual(tuplify(rset.rows), [(eid1, eid2)])
  1331         self.assertEqual(tuplify(rset.rows), [(eid1, eid2)])
  1337 
  1332 
  1338     def test_update_query_error(self):
  1333     def test_update_query_error(self):
  1339         self.qexecute("INSERT Personne Y: Y nom 'toto'")
  1334         self.qexecute("INSERT Personne Y: Y nom 'toto'")
  1340         self.assertRaises(Exception, self.qexecute, "SET X nom 'toto', X is Personne")
  1335         self.assertRaises(Exception, self.qexecute, "SET X nom 'toto', X is Personne")
  1377                           self.qexecute, "Any P WHERE X is CWUser, X login 'bob', X upassword P")
  1372                           self.qexecute, "Any P WHERE X is CWUser, X login 'bob', X upassword P")
  1378         with self.admin_access.cnx() as cnx:
  1373         with self.admin_access.cnx() as cnx:
  1379             cursor = cnx.cnxset.cu
  1374             cursor = cnx.cnxset.cu
  1380             cursor.execute("SELECT %supassword from %sCWUser WHERE %slogin='bob'"
  1375             cursor.execute("SELECT %supassword from %sCWUser WHERE %slogin='bob'"
  1381                            % (SQL_PREFIX, SQL_PREFIX, SQL_PREFIX))
  1376                            % (SQL_PREFIX, SQL_PREFIX, SQL_PREFIX))
  1382             passwd = binary_type(cursor.fetchone()[0])
  1377             passwd = bytes(cursor.fetchone()[0])
  1383             self.assertEqual(passwd, crypt_password('toto', passwd))
  1378             self.assertEqual(passwd, crypt_password('toto', passwd))
  1384         rset = self.qexecute("Any X WHERE X is CWUser, X login 'bob', X upassword %(pwd)s",
  1379         rset = self.qexecute("Any X WHERE X is CWUser, X login 'bob', X upassword %(pwd)s",
  1385                             {'pwd': Binary(passwd)})
  1380                             {'pwd': Binary(passwd)})
  1386         self.assertEqual(len(rset.rows), 1)
  1381         self.assertEqual(len(rset.rows), 1)
  1387         self.assertEqual(rset.description, [('CWUser',)])
  1382         self.assertEqual(rset.description, [('CWUser',)])
  1394             rset = cnx.execute("SET X upassword %(pwd)s WHERE X is CWUser, X login 'bob'",
  1389             rset = cnx.execute("SET X upassword %(pwd)s WHERE X is CWUser, X login 'bob'",
  1395                                {'pwd': b'tutu'})
  1390                                {'pwd': b'tutu'})
  1396             cursor = cnx.cnxset.cu
  1391             cursor = cnx.cnxset.cu
  1397             cursor.execute("SELECT %supassword from %sCWUser WHERE %slogin='bob'"
  1392             cursor.execute("SELECT %supassword from %sCWUser WHERE %slogin='bob'"
  1398                            % (SQL_PREFIX, SQL_PREFIX, SQL_PREFIX))
  1393                            % (SQL_PREFIX, SQL_PREFIX, SQL_PREFIX))
  1399             passwd = binary_type(cursor.fetchone()[0])
  1394             passwd = bytes(cursor.fetchone()[0])
  1400             self.assertEqual(passwd, crypt_password('tutu', passwd))
  1395             self.assertEqual(passwd, crypt_password('tutu', passwd))
  1401             rset = cnx.execute("Any X WHERE X is CWUser, X login 'bob', X upassword %(pwd)s",
  1396             rset = cnx.execute("Any X WHERE X is CWUser, X login 'bob', X upassword %(pwd)s",
  1402                                {'pwd': Binary(passwd)})
  1397                                {'pwd': Binary(passwd)})
  1403             self.assertEqual(len(rset.rows), 1)
  1398             self.assertEqual(len(rset.rows), 1)
  1404             self.assertEqual(rset.description, [('CWUser',)])
  1399             self.assertEqual(rset.description, [('CWUser',)])