23 from datetime import date, datetime, timedelta, tzinfo |
23 from datetime import date, datetime, timedelta, tzinfo |
24 import unittest |
24 import unittest |
25 |
25 |
26 import pytz |
26 import pytz |
27 |
27 |
28 from six import PY2, integer_types, binary_type, text_type |
|
29 |
|
30 from rql import BadRQLQuery |
28 from rql import BadRQLQuery |
31 from rql.utils import register_function, FunctionDescr |
29 from rql.utils import register_function, FunctionDescr |
32 |
30 |
33 from cubicweb import QueryError, Unauthorized, Binary, devtools |
31 from cubicweb import QueryError, Unauthorized, Binary, devtools |
34 from cubicweb.server.sqlutils import SQL_CONNECT_HOOKS, SQL_PREFIX |
32 from cubicweb.server.sqlutils import SQL_CONNECT_HOOKS, SQL_PREFIX |
132 # (any one, etype will be discarded) |
130 # (any one, etype will be discarded) |
133 self.assertEqual(1, len(rqlst.solutions)) |
131 self.assertEqual(1, len(rqlst.solutions)) |
134 |
132 |
135 def assertRQLEqual(self, expected, got): |
133 def assertRQLEqual(self, expected, got): |
136 from rql import parse |
134 from rql import parse |
137 self.assertMultiLineEqual(text_type(parse(expected)), |
135 self.assertMultiLineEqual(str(parse(expected)), |
138 text_type(parse(got))) |
136 str(parse(got))) |
139 |
137 |
140 def test_preprocess_security(self): |
138 def test_preprocess_security(self): |
141 with self.user_groups_session('users') as cnx: |
139 with self.user_groups_session('users') as cnx: |
142 plan = self._prepare_plan(cnx, 'Any ETN,COUNT(X) GROUPBY ETN ' |
140 plan = self._prepare_plan(cnx, 'Any ETN,COUNT(X) GROUPBY ETN ' |
143 'WHERE X is ET, ET name ETN') |
141 'WHERE X is ET, ET name ETN') |
263 self.assertEqual(rset.description[0][0], 'Date') |
261 self.assertEqual(rset.description[0][0], 'Date') |
264 rset = self.qexecute('Any NOW') |
262 rset = self.qexecute('Any NOW') |
265 self.assertEqual(rset.description[0][0], 'Datetime') |
263 self.assertEqual(rset.description[0][0], 'Datetime') |
266 rset = self.qexecute('Any %(x)s', {'x': 1}) |
264 rset = self.qexecute('Any %(x)s', {'x': 1}) |
267 self.assertEqual(rset.description[0][0], 'Int') |
265 self.assertEqual(rset.description[0][0], 'Int') |
268 if PY2: |
|
269 rset = self.qexecute('Any %(x)s', {'x': long(1)}) |
|
270 self.assertEqual(rset.description[0][0], 'Int') |
|
271 rset = self.qexecute('Any %(x)s', {'x': True}) |
266 rset = self.qexecute('Any %(x)s', {'x': True}) |
272 self.assertEqual(rset.description[0][0], 'Boolean') |
267 self.assertEqual(rset.description[0][0], 'Boolean') |
273 rset = self.qexecute('Any %(x)s', {'x': 1.0}) |
268 rset = self.qexecute('Any %(x)s', {'x': 1.0}) |
274 self.assertEqual(rset.description[0][0], 'Float') |
269 self.assertEqual(rset.description[0][0], 'Float') |
275 rset = self.qexecute('Any %(x)s', {'x': datetime.now()}) |
270 rset = self.qexecute('Any %(x)s', {'x': datetime.now()}) |
325 self.assertFalse(self.qexecute('Any X WHERE X eid 99999999')) |
320 self.assertFalse(self.qexecute('Any X WHERE X eid 99999999')) |
326 |
321 |
327 def test_typed_eid(self): |
322 def test_typed_eid(self): |
328 # should return an empty result set |
323 # should return an empty result set |
329 rset = self.qexecute('Any X WHERE X eid %(x)s', {'x': '1'}) |
324 rset = self.qexecute('Any X WHERE X eid %(x)s', {'x': '1'}) |
330 self.assertIsInstance(rset[0][0], integer_types) |
325 self.assertIsInstance(rset[0][0], int) |
331 |
326 |
332 def test_bytes_storage(self): |
327 def test_bytes_storage(self): |
333 feid = self.qexecute('INSERT File X: X data_name "foo.pdf", ' |
328 feid = self.qexecute('INSERT File X: X data_name "foo.pdf", ' |
334 'X data_format "text/plain", X data %(data)s', |
329 'X data_format "text/plain", X data %(data)s', |
335 {'data': Binary(b"xxx")})[0][0] |
330 {'data': Binary(b"xxx")})[0][0] |
901 |
896 |
902 def test_select_constant(self): |
897 def test_select_constant(self): |
903 rset = self.qexecute('Any X, "toto" ORDERBY X WHERE X is CWGroup') |
898 rset = self.qexecute('Any X, "toto" ORDERBY X WHERE X is CWGroup') |
904 self.assertEqual(rset.rows, |
899 self.assertEqual(rset.rows, |
905 [list(x) for x in zip((2,3,4,5), ('toto','toto','toto','toto',))]) |
900 [list(x) for x in zip((2,3,4,5), ('toto','toto','toto','toto',))]) |
906 self.assertIsInstance(rset[0][1], text_type) |
901 self.assertIsInstance(rset[0][1], str) |
907 self.assertEqual(rset.description, |
902 self.assertEqual(rset.description, |
908 list(zip(('CWGroup', 'CWGroup', 'CWGroup', 'CWGroup'), |
903 list(zip(('CWGroup', 'CWGroup', 'CWGroup', 'CWGroup'), |
909 ('String', 'String', 'String', 'String',)))) |
904 ('String', 'String', 'String', 'String',)))) |
910 rset = self.qexecute('Any X, %(value)s ORDERBY X WHERE X is CWGroup', {'value': 'toto'}) |
905 rset = self.qexecute('Any X, %(value)s ORDERBY X WHERE X is CWGroup', {'value': 'toto'}) |
911 self.assertEqual(rset.rows, |
906 self.assertEqual(rset.rows, |
912 list(map(list, zip((2,3,4,5), ('toto','toto','toto','toto',))))) |
907 list(map(list, zip((2,3,4,5), ('toto','toto','toto','toto',))))) |
913 self.assertIsInstance(rset[0][1], text_type) |
908 self.assertIsInstance(rset[0][1], str) |
914 self.assertEqual(rset.description, |
909 self.assertEqual(rset.description, |
915 list(zip(('CWGroup', 'CWGroup', 'CWGroup', 'CWGroup'), |
910 list(zip(('CWGroup', 'CWGroup', 'CWGroup', 'CWGroup'), |
916 ('String', 'String', 'String', 'String',)))) |
911 ('String', 'String', 'String', 'String',)))) |
917 rset = self.qexecute('Any X,GN WHERE X is CWUser, G is CWGroup, X login "syt", ' |
912 rset = self.qexecute('Any X,GN WHERE X is CWUser, G is CWGroup, X login "syt", ' |
918 'X in_group G, G name GN') |
913 'X in_group G, G name GN') |
1071 self.assertEqual(len(self.qexecute('Any X, Y WHERE X travaille Y')), 2) |
1066 self.assertEqual(len(self.qexecute('Any X, Y WHERE X travaille Y')), 2) |
1072 |
1067 |
1073 def test_insert_4ter(self): |
1068 def test_insert_4ter(self): |
1074 peid = self.qexecute("INSERT Personne X: X nom 'bidule'")[0][0] |
1069 peid = self.qexecute("INSERT Personne X: X nom 'bidule'")[0][0] |
1075 seid = self.qexecute("INSERT Societe Y: Y nom 'toto', X travaille Y WHERE X eid %(x)s", |
1070 seid = self.qexecute("INSERT Societe Y: Y nom 'toto', X travaille Y WHERE X eid %(x)s", |
1076 {'x': text_type(peid)})[0][0] |
1071 {'x': str(peid)})[0][0] |
1077 self.assertEqual(len(self.qexecute('Any X, Y WHERE X travaille Y')), 1) |
1072 self.assertEqual(len(self.qexecute('Any X, Y WHERE X travaille Y')), 1) |
1078 self.qexecute("INSERT Personne X: X nom 'chouette', X travaille Y WHERE Y eid %(x)s", |
1073 self.qexecute("INSERT Personne X: X nom 'chouette', X travaille Y WHERE Y eid %(x)s", |
1079 {'x': text_type(seid)}) |
1074 {'x': str(seid)}) |
1080 self.assertEqual(len(self.qexecute('Any X, Y WHERE X travaille Y')), 2) |
1075 self.assertEqual(len(self.qexecute('Any X, Y WHERE X travaille Y')), 2) |
1081 |
1076 |
1082 def test_insert_5(self): |
1077 def test_insert_5(self): |
1083 self.qexecute("INSERT Personne X: X nom 'bidule'") |
1078 self.qexecute("INSERT Personne X: X nom 'bidule'") |
1084 self.qexecute("INSERT Societe Y: Y nom 'toto', X travaille Y WHERE X nom 'bidule'") |
1079 self.qexecute("INSERT Societe Y: Y nom 'toto', X travaille Y WHERE X nom 'bidule'") |
1280 |
1275 |
1281 def test_update_2ter(self): |
1276 def test_update_2ter(self): |
1282 rset = self.qexecute("INSERT Personne X, Societe Y: X nom 'bidule', Y nom 'toto'") |
1277 rset = self.qexecute("INSERT Personne X, Societe Y: X nom 'bidule', Y nom 'toto'") |
1283 eid1, eid2 = rset[0][0], rset[0][1] |
1278 eid1, eid2 = rset[0][0], rset[0][1] |
1284 self.qexecute("SET X travaille Y WHERE X eid %(x)s, Y eid %(y)s", |
1279 self.qexecute("SET X travaille Y WHERE X eid %(x)s, Y eid %(y)s", |
1285 {'x': text_type(eid1), 'y': text_type(eid2)}) |
1280 {'x': str(eid1), 'y': str(eid2)}) |
1286 rset = self.qexecute('Any X, Y WHERE X travaille Y') |
1281 rset = self.qexecute('Any X, Y WHERE X travaille Y') |
1287 self.assertEqual(len(rset.rows), 1) |
1282 self.assertEqual(len(rset.rows), 1) |
1288 |
1283 |
1289 def test_update_multiple1(self): |
1284 def test_update_multiple1(self): |
1290 peid1 = self.qexecute("INSERT Personne Y: Y nom 'tutu'")[0][0] |
1285 peid1 = self.qexecute("INSERT Personne Y: Y nom 'tutu'")[0][0] |
1330 def test_update_not_exists(self): |
1325 def test_update_not_exists(self): |
1331 rset = self.qexecute("INSERT Personne X, Societe Y: X nom 'bidule', Y nom 'toto'") |
1326 rset = self.qexecute("INSERT Personne X, Societe Y: X nom 'bidule', Y nom 'toto'") |
1332 eid1, eid2 = rset[0][0], rset[0][1] |
1327 eid1, eid2 = rset[0][0], rset[0][1] |
1333 rset = self.qexecute("SET X travaille Y WHERE X eid %(x)s, Y eid %(y)s, " |
1328 rset = self.qexecute("SET X travaille Y WHERE X eid %(x)s, Y eid %(y)s, " |
1334 "NOT EXISTS(Z ecrit_par X)", |
1329 "NOT EXISTS(Z ecrit_par X)", |
1335 {'x': text_type(eid1), 'y': text_type(eid2)}) |
1330 {'x': str(eid1), 'y': str(eid2)}) |
1336 self.assertEqual(tuplify(rset.rows), [(eid1, eid2)]) |
1331 self.assertEqual(tuplify(rset.rows), [(eid1, eid2)]) |
1337 |
1332 |
1338 def test_update_query_error(self): |
1333 def test_update_query_error(self): |
1339 self.qexecute("INSERT Personne Y: Y nom 'toto'") |
1334 self.qexecute("INSERT Personne Y: Y nom 'toto'") |
1340 self.assertRaises(Exception, self.qexecute, "SET X nom 'toto', X is Personne") |
1335 self.assertRaises(Exception, self.qexecute, "SET X nom 'toto', X is Personne") |
1377 self.qexecute, "Any P WHERE X is CWUser, X login 'bob', X upassword P") |
1372 self.qexecute, "Any P WHERE X is CWUser, X login 'bob', X upassword P") |
1378 with self.admin_access.cnx() as cnx: |
1373 with self.admin_access.cnx() as cnx: |
1379 cursor = cnx.cnxset.cu |
1374 cursor = cnx.cnxset.cu |
1380 cursor.execute("SELECT %supassword from %sCWUser WHERE %slogin='bob'" |
1375 cursor.execute("SELECT %supassword from %sCWUser WHERE %slogin='bob'" |
1381 % (SQL_PREFIX, SQL_PREFIX, SQL_PREFIX)) |
1376 % (SQL_PREFIX, SQL_PREFIX, SQL_PREFIX)) |
1382 passwd = binary_type(cursor.fetchone()[0]) |
1377 passwd = bytes(cursor.fetchone()[0]) |
1383 self.assertEqual(passwd, crypt_password('toto', passwd)) |
1378 self.assertEqual(passwd, crypt_password('toto', passwd)) |
1384 rset = self.qexecute("Any X WHERE X is CWUser, X login 'bob', X upassword %(pwd)s", |
1379 rset = self.qexecute("Any X WHERE X is CWUser, X login 'bob', X upassword %(pwd)s", |
1385 {'pwd': Binary(passwd)}) |
1380 {'pwd': Binary(passwd)}) |
1386 self.assertEqual(len(rset.rows), 1) |
1381 self.assertEqual(len(rset.rows), 1) |
1387 self.assertEqual(rset.description, [('CWUser',)]) |
1382 self.assertEqual(rset.description, [('CWUser',)]) |
1394 rset = cnx.execute("SET X upassword %(pwd)s WHERE X is CWUser, X login 'bob'", |
1389 rset = cnx.execute("SET X upassword %(pwd)s WHERE X is CWUser, X login 'bob'", |
1395 {'pwd': b'tutu'}) |
1390 {'pwd': b'tutu'}) |
1396 cursor = cnx.cnxset.cu |
1391 cursor = cnx.cnxset.cu |
1397 cursor.execute("SELECT %supassword from %sCWUser WHERE %slogin='bob'" |
1392 cursor.execute("SELECT %supassword from %sCWUser WHERE %slogin='bob'" |
1398 % (SQL_PREFIX, SQL_PREFIX, SQL_PREFIX)) |
1393 % (SQL_PREFIX, SQL_PREFIX, SQL_PREFIX)) |
1399 passwd = binary_type(cursor.fetchone()[0]) |
1394 passwd = bytes(cursor.fetchone()[0]) |
1400 self.assertEqual(passwd, crypt_password('tutu', passwd)) |
1395 self.assertEqual(passwd, crypt_password('tutu', passwd)) |
1401 rset = cnx.execute("Any X WHERE X is CWUser, X login 'bob', X upassword %(pwd)s", |
1396 rset = cnx.execute("Any X WHERE X is CWUser, X login 'bob', X upassword %(pwd)s", |
1402 {'pwd': Binary(passwd)}) |
1397 {'pwd': Binary(passwd)}) |
1403 self.assertEqual(len(rset.rows), 1) |
1398 self.assertEqual(len(rset.rows), 1) |
1404 self.assertEqual(rset.description, [('CWUser',)]) |
1399 self.assertEqual(rset.description, [('CWUser',)]) |