server/test/unittest_rql2sql.py
changeset 4831 c5aec27c1bf7
parent 4766 162b2b127b15
child 4845 dc351b96f596
equal deleted inserted replaced
4829:3b79a0fc91db 4831:c5aec27c1bf7
    11 import sys
    11 import sys
    12 
    12 
    13 from logilab.common.testlib import TestCase, unittest_main, mock_object
    13 from logilab.common.testlib import TestCase, unittest_main, mock_object
    14 
    14 
    15 from rql import BadRQLQuery
    15 from rql import BadRQLQuery
    16 from indexer import get_indexer
       
    17 
    16 
    18 #from cubicweb.server.sources.native import remove_unused_solutions
    17 #from cubicweb.server.sources.native import remove_unused_solutions
    19 from cubicweb.server.sources.rql2sql import SQLGenerator, remove_unused_solutions
    18 from cubicweb.server.sources.rql2sql import SQLGenerator, remove_unused_solutions
    20 
    19 
    21 from rql.utils import register_function, FunctionDescr
    20 from rql.utils import register_function, FunctionDescr
  1070      '''SELECT rel_is0.eid_from
  1069      '''SELECT rel_is0.eid_from
  1071 FROM is_relation AS rel_is0
  1070 FROM is_relation AS rel_is0
  1072 WHERE rel_is0.eid_to=2'''),
  1071 WHERE rel_is0.eid_to=2'''),
  1073 
  1072 
  1074     ]
  1073     ]
  1075 from logilab.common.adbh import ADV_FUNC_HELPER_DIRECTORY
  1074 from logilab.db import get_db_helper
  1076 
  1075 
  1077 class CWRQLTC(RQLGeneratorTC):
  1076 class CWRQLTC(RQLGeneratorTC):
  1078     schema = schema
  1077     schema = schema
  1079 
  1078 
  1080     def test_nonregr_sol(self):
  1079     def test_nonregr_sol(self):
  1104     schema = schema
  1103     schema = schema
  1105 
  1104 
  1106     #capture = True
  1105     #capture = True
  1107     def setUp(self):
  1106     def setUp(self):
  1108         RQLGeneratorTC.setUp(self)
  1107         RQLGeneratorTC.setUp(self)
  1109         indexer = get_indexer('postgres', 'utf8')
  1108         dbms_helper = get_db_helper('postgres')
  1110         dbms_helper = ADV_FUNC_HELPER_DIRECTORY['postgres']
       
  1111         dbms_helper.fti_uid_attr = indexer.uid_attr
       
  1112         dbms_helper.fti_table = indexer.table
       
  1113         dbms_helper.fti_restriction_sql = indexer.restriction_sql
       
  1114         dbms_helper.fti_need_distinct_query = indexer.need_distinct
       
  1115         self.o = SQLGenerator(schema, dbms_helper)
  1109         self.o = SQLGenerator(schema, dbms_helper)
  1116 
  1110 
  1117     def _norm_sql(self, sql):
  1111     def _norm_sql(self, sql):
  1118         return sql.strip()
  1112         return sql.strip()
  1119 
  1113 
  1210         self.assertLinesEquals((r % args).strip(),
  1204         self.assertLinesEquals((r % args).strip(),
  1211                                '''SELECT _X.cw_eid
  1205                                '''SELECT _X.cw_eid
  1212 FROM cw_CWUser AS _X
  1206 FROM cw_CWUser AS _X
  1213 WHERE _X.cw_login IS NULL''')
  1207 WHERE _X.cw_login IS NULL''')
  1214 
  1208 
       
  1209 
       
  1210     def test_date_extraction(self):
       
  1211         self._check("Any MONTH(D) WHERE P is Personne, P creation_date D",
       
  1212                     '''SELECT CAST(EXTRACT(MONTH from _P.cw_creation_date) AS INTEGER)
       
  1213 FROM cw_Personne AS _P''')
       
  1214 
       
  1215 
  1215     def test_parser_parse(self):
  1216     def test_parser_parse(self):
  1216         for t in self._parse(PARSER):
  1217         for t in self._parse(PARSER):
  1217             yield t
  1218             yield t
  1218 
  1219 
  1219     def test_basic_parse(self):
  1220     def test_basic_parse(self):
  1407 
  1408 
  1408 class SqliteSQLGeneratorTC(PostgresSQLGeneratorTC):
  1409 class SqliteSQLGeneratorTC(PostgresSQLGeneratorTC):
  1409 
  1410 
  1410     def setUp(self):
  1411     def setUp(self):
  1411         RQLGeneratorTC.setUp(self)
  1412         RQLGeneratorTC.setUp(self)
  1412         indexer = get_indexer('sqlite', 'utf8')
  1413         dbms_helper = get_db_helper('sqlite')
  1413         dbms_helper = ADV_FUNC_HELPER_DIRECTORY['sqlite']
       
  1414         dbms_helper.fti_uid_attr = indexer.uid_attr
       
  1415         dbms_helper.fti_table = indexer.table
       
  1416         dbms_helper.fti_restriction_sql = indexer.restriction_sql
       
  1417         dbms_helper.fti_need_distinct_query = indexer.need_distinct
       
  1418         self.o = SQLGenerator(schema, dbms_helper)
  1414         self.o = SQLGenerator(schema, dbms_helper)
  1419 
  1415 
  1420     def _norm_sql(self, sql):
  1416     def _norm_sql(self, sql):
  1421         return sql.strip().replace(' ILIKE ', ' LIKE ').replace('\nINTERSECT ALL\n', '\nINTERSECT\n')
  1417         return sql.strip().replace(' ILIKE ', ' LIKE ').replace('\nINTERSECT ALL\n', '\nINTERSECT\n')
       
  1418 
       
  1419     def test_date_extraction(self):
       
  1420         self._check("Any MONTH(D) WHERE P is Personne, P creation_date D",
       
  1421                     '''SELECT MONTH(_P.cw_creation_date)
       
  1422 FROM cw_Personne AS _P''')
  1422 
  1423 
  1423     def test_union(self):
  1424     def test_union(self):
  1424         for t in self._parse((
  1425         for t in self._parse((
  1425             ('(Any N ORDERBY 1 WHERE X name N, X is State)'
  1426             ('(Any N ORDERBY 1 WHERE X name N, X is State)'
  1426              ' UNION '
  1427              ' UNION '
  1515 
  1516 
  1516 class MySQLGenerator(PostgresSQLGeneratorTC):
  1517 class MySQLGenerator(PostgresSQLGeneratorTC):
  1517 
  1518 
  1518     def setUp(self):
  1519     def setUp(self):
  1519         RQLGeneratorTC.setUp(self)
  1520         RQLGeneratorTC.setUp(self)
  1520         indexer = get_indexer('mysql', 'utf8')
  1521         dbms_helper = get_db_helper('mysql')
  1521         dbms_helper = ADV_FUNC_HELPER_DIRECTORY['mysql']
       
  1522         dbms_helper.fti_uid_attr = indexer.uid_attr
       
  1523         dbms_helper.fti_table = indexer.table
       
  1524         dbms_helper.fti_restriction_sql = indexer.restriction_sql
       
  1525         dbms_helper.fti_need_distinct_query = indexer.need_distinct
       
  1526         self.o = SQLGenerator(schema, dbms_helper)
  1522         self.o = SQLGenerator(schema, dbms_helper)
  1527 
  1523 
  1528     def _norm_sql(self, sql):
  1524     def _norm_sql(self, sql):
  1529         sql = sql.strip().replace(' ILIKE ', ' LIKE ').replace('TRUE', '1').replace('FALSE', '0')
  1525         sql = sql.strip().replace(' ILIKE ', ' LIKE ').replace('TRUE', '1').replace('FALSE', '0')
  1530         newsql = []
  1526         newsql = []
  1534             if firstword == 'WHERE' and latest == 'SELECT':
  1530             if firstword == 'WHERE' and latest == 'SELECT':
  1535                 newsql.append('FROM (SELECT 1) AS _T')
  1531                 newsql.append('FROM (SELECT 1) AS _T')
  1536             newsql.append(line)
  1532             newsql.append(line)
  1537             latest = firstword
  1533             latest = firstword
  1538         return '\n'.join(newsql)
  1534         return '\n'.join(newsql)
       
  1535 
       
  1536     def test_date_extraction(self):
       
  1537         self._check("Any MONTH(D) WHERE P is Personne, P creation_date D",
       
  1538                     '''SELECT EXTRACT(MONTH from _P.cw_creation_date)
       
  1539 FROM cw_Personne AS _P''')
  1539 
  1540 
  1540     def test_from_clause_needed(self):
  1541     def test_from_clause_needed(self):
  1541         queries = [("Any 1 WHERE EXISTS(T is CWGroup, T name 'managers')",
  1542         queries = [("Any 1 WHERE EXISTS(T is CWGroup, T name 'managers')",
  1542                     '''SELECT 1
  1543                     '''SELECT 1
  1543 FROM (SELECT 1) AS _T
  1544 FROM (SELECT 1) AS _T