11 import sys |
11 import sys |
12 |
12 |
13 from logilab.common.testlib import TestCase, unittest_main, mock_object |
13 from logilab.common.testlib import TestCase, unittest_main, mock_object |
14 |
14 |
15 from rql import BadRQLQuery |
15 from rql import BadRQLQuery |
16 from indexer import get_indexer |
|
17 |
16 |
18 #from cubicweb.server.sources.native import remove_unused_solutions |
17 #from cubicweb.server.sources.native import remove_unused_solutions |
19 from cubicweb.server.sources.rql2sql import SQLGenerator, remove_unused_solutions |
18 from cubicweb.server.sources.rql2sql import SQLGenerator, remove_unused_solutions |
20 |
19 |
21 from rql.utils import register_function, FunctionDescr |
20 from rql.utils import register_function, FunctionDescr |
1070 '''SELECT rel_is0.eid_from |
1069 '''SELECT rel_is0.eid_from |
1071 FROM is_relation AS rel_is0 |
1070 FROM is_relation AS rel_is0 |
1072 WHERE rel_is0.eid_to=2'''), |
1071 WHERE rel_is0.eid_to=2'''), |
1073 |
1072 |
1074 ] |
1073 ] |
1075 from logilab.common.adbh import ADV_FUNC_HELPER_DIRECTORY |
1074 from logilab.db import get_db_helper |
1076 |
1075 |
1077 class CWRQLTC(RQLGeneratorTC): |
1076 class CWRQLTC(RQLGeneratorTC): |
1078 schema = schema |
1077 schema = schema |
1079 |
1078 |
1080 def test_nonregr_sol(self): |
1079 def test_nonregr_sol(self): |
1104 schema = schema |
1103 schema = schema |
1105 |
1104 |
1106 #capture = True |
1105 #capture = True |
1107 def setUp(self): |
1106 def setUp(self): |
1108 RQLGeneratorTC.setUp(self) |
1107 RQLGeneratorTC.setUp(self) |
1109 indexer = get_indexer('postgres', 'utf8') |
1108 dbms_helper = get_db_helper('postgres') |
1110 dbms_helper = ADV_FUNC_HELPER_DIRECTORY['postgres'] |
|
1111 dbms_helper.fti_uid_attr = indexer.uid_attr |
|
1112 dbms_helper.fti_table = indexer.table |
|
1113 dbms_helper.fti_restriction_sql = indexer.restriction_sql |
|
1114 dbms_helper.fti_need_distinct_query = indexer.need_distinct |
|
1115 self.o = SQLGenerator(schema, dbms_helper) |
1109 self.o = SQLGenerator(schema, dbms_helper) |
1116 |
1110 |
1117 def _norm_sql(self, sql): |
1111 def _norm_sql(self, sql): |
1118 return sql.strip() |
1112 return sql.strip() |
1119 |
1113 |
1210 self.assertLinesEquals((r % args).strip(), |
1204 self.assertLinesEquals((r % args).strip(), |
1211 '''SELECT _X.cw_eid |
1205 '''SELECT _X.cw_eid |
1212 FROM cw_CWUser AS _X |
1206 FROM cw_CWUser AS _X |
1213 WHERE _X.cw_login IS NULL''') |
1207 WHERE _X.cw_login IS NULL''') |
1214 |
1208 |
|
1209 |
|
1210 def test_date_extraction(self): |
|
1211 self._check("Any MONTH(D) WHERE P is Personne, P creation_date D", |
|
1212 '''SELECT CAST(EXTRACT(MONTH from _P.cw_creation_date) AS INTEGER) |
|
1213 FROM cw_Personne AS _P''') |
|
1214 |
|
1215 |
1215 def test_parser_parse(self): |
1216 def test_parser_parse(self): |
1216 for t in self._parse(PARSER): |
1217 for t in self._parse(PARSER): |
1217 yield t |
1218 yield t |
1218 |
1219 |
1219 def test_basic_parse(self): |
1220 def test_basic_parse(self): |
1407 |
1408 |
1408 class SqliteSQLGeneratorTC(PostgresSQLGeneratorTC): |
1409 class SqliteSQLGeneratorTC(PostgresSQLGeneratorTC): |
1409 |
1410 |
1410 def setUp(self): |
1411 def setUp(self): |
1411 RQLGeneratorTC.setUp(self) |
1412 RQLGeneratorTC.setUp(self) |
1412 indexer = get_indexer('sqlite', 'utf8') |
1413 dbms_helper = get_db_helper('sqlite') |
1413 dbms_helper = ADV_FUNC_HELPER_DIRECTORY['sqlite'] |
|
1414 dbms_helper.fti_uid_attr = indexer.uid_attr |
|
1415 dbms_helper.fti_table = indexer.table |
|
1416 dbms_helper.fti_restriction_sql = indexer.restriction_sql |
|
1417 dbms_helper.fti_need_distinct_query = indexer.need_distinct |
|
1418 self.o = SQLGenerator(schema, dbms_helper) |
1414 self.o = SQLGenerator(schema, dbms_helper) |
1419 |
1415 |
1420 def _norm_sql(self, sql): |
1416 def _norm_sql(self, sql): |
1421 return sql.strip().replace(' ILIKE ', ' LIKE ').replace('\nINTERSECT ALL\n', '\nINTERSECT\n') |
1417 return sql.strip().replace(' ILIKE ', ' LIKE ').replace('\nINTERSECT ALL\n', '\nINTERSECT\n') |
|
1418 |
|
1419 def test_date_extraction(self): |
|
1420 self._check("Any MONTH(D) WHERE P is Personne, P creation_date D", |
|
1421 '''SELECT MONTH(_P.cw_creation_date) |
|
1422 FROM cw_Personne AS _P''') |
1422 |
1423 |
1423 def test_union(self): |
1424 def test_union(self): |
1424 for t in self._parse(( |
1425 for t in self._parse(( |
1425 ('(Any N ORDERBY 1 WHERE X name N, X is State)' |
1426 ('(Any N ORDERBY 1 WHERE X name N, X is State)' |
1426 ' UNION ' |
1427 ' UNION ' |
1515 |
1516 |
1516 class MySQLGenerator(PostgresSQLGeneratorTC): |
1517 class MySQLGenerator(PostgresSQLGeneratorTC): |
1517 |
1518 |
1518 def setUp(self): |
1519 def setUp(self): |
1519 RQLGeneratorTC.setUp(self) |
1520 RQLGeneratorTC.setUp(self) |
1520 indexer = get_indexer('mysql', 'utf8') |
1521 dbms_helper = get_db_helper('mysql') |
1521 dbms_helper = ADV_FUNC_HELPER_DIRECTORY['mysql'] |
|
1522 dbms_helper.fti_uid_attr = indexer.uid_attr |
|
1523 dbms_helper.fti_table = indexer.table |
|
1524 dbms_helper.fti_restriction_sql = indexer.restriction_sql |
|
1525 dbms_helper.fti_need_distinct_query = indexer.need_distinct |
|
1526 self.o = SQLGenerator(schema, dbms_helper) |
1522 self.o = SQLGenerator(schema, dbms_helper) |
1527 |
1523 |
1528 def _norm_sql(self, sql): |
1524 def _norm_sql(self, sql): |
1529 sql = sql.strip().replace(' ILIKE ', ' LIKE ').replace('TRUE', '1').replace('FALSE', '0') |
1525 sql = sql.strip().replace(' ILIKE ', ' LIKE ').replace('TRUE', '1').replace('FALSE', '0') |
1530 newsql = [] |
1526 newsql = [] |
1534 if firstword == 'WHERE' and latest == 'SELECT': |
1530 if firstword == 'WHERE' and latest == 'SELECT': |
1535 newsql.append('FROM (SELECT 1) AS _T') |
1531 newsql.append('FROM (SELECT 1) AS _T') |
1536 newsql.append(line) |
1532 newsql.append(line) |
1537 latest = firstword |
1533 latest = firstword |
1538 return '\n'.join(newsql) |
1534 return '\n'.join(newsql) |
|
1535 |
|
1536 def test_date_extraction(self): |
|
1537 self._check("Any MONTH(D) WHERE P is Personne, P creation_date D", |
|
1538 '''SELECT EXTRACT(MONTH from _P.cw_creation_date) |
|
1539 FROM cw_Personne AS _P''') |
1539 |
1540 |
1540 def test_from_clause_needed(self): |
1541 def test_from_clause_needed(self): |
1541 queries = [("Any 1 WHERE EXISTS(T is CWGroup, T name 'managers')", |
1542 queries = [("Any 1 WHERE EXISTS(T is CWGroup, T name 'managers')", |
1542 '''SELECT 1 |
1543 '''SELECT 1 |
1543 FROM (SELECT 1) AS _T |
1544 FROM (SELECT 1) AS _T |