server/test/unittest_rql2sql.py
changeset 5016 b3b0b808a0ed
parent 5004 4cc020ee70e2
parent 5013 ad91f93bbb93
child 5233 673b63953e7a
equal deleted inserted replaced
5004:4cc020ee70e2 5016:b3b0b808a0ed
  1100     schema = schema
  1100     schema = schema
  1101 
  1101 
  1102     #capture = True
  1102     #capture = True
  1103     def setUp(self):
  1103     def setUp(self):
  1104         RQLGeneratorTC.setUp(self)
  1104         RQLGeneratorTC.setUp(self)
  1105         dbms_helper = get_db_helper('postgres')
  1105         dbhelper = get_db_helper('postgres')
  1106         self.o = SQLGenerator(schema, dbms_helper)
  1106         self.o = SQLGenerator(schema, dbhelper)
  1107 
  1107 
  1108     def _norm_sql(self, sql):
  1108     def _norm_sql(self, sql):
  1109         return sql.strip()
  1109         return sql.strip()
  1110 
  1110 
  1111     def _check(self, rql, sql, varmap=None, args=None):
  1111     def _check(self, rql, sql, varmap=None, args=None):
  1112         if args is None:
  1112         if args is None:
  1113             args = {'text': 'hip hop momo'}
  1113             args = {'text': 'hip hop momo'}
  1114         try:
  1114         try:
  1115             union = self._prepare(rql)
  1115             union = self._prepare(rql)
  1116             r, nargs = self.o.generate(union, args,
  1116             r, nargs, cbs = self.o.generate(union, args,
  1117                                       varmap=varmap)
  1117                                             varmap=varmap)
  1118             args.update(nargs)
  1118             args.update(nargs)
  1119             self.assertLinesEquals((r % args).strip(), self._norm_sql(sql), striplines=True)
  1119             self.assertLinesEquals((r % args).strip(), self._norm_sql(sql), striplines=True)
  1120         except Exception, ex:
  1120         except Exception, ex:
  1121             if 'r' in locals():
  1121             if 'r' in locals():
  1122                 try:
  1122                 try:
  1133             yield self._check, rql, sql
  1133             yield self._check, rql, sql
  1134 
  1134 
  1135     def _checkall(self, rql, sql):
  1135     def _checkall(self, rql, sql):
  1136         try:
  1136         try:
  1137             rqlst = self._prepare(rql)
  1137             rqlst = self._prepare(rql)
  1138             r, args = self.o.generate(rqlst)
  1138             r, args, cbs = self.o.generate(rqlst)
  1139             self.assertEqual((r.strip(), args), sql)
  1139             self.assertEqual((r.strip(), args), sql)
  1140         except Exception, ex:
  1140         except Exception, ex:
  1141             print rql
  1141             print rql
  1142             if 'r' in locals():
  1142             if 'r' in locals():
  1143                 print r.strip()
  1143                 print r.strip()
  1195                     args={'x': 728},
  1195                     args={'x': 728},
  1196                     varmap={'F.data': '_TDF0.C0', 'D': '_TDF0.C0'})
  1196                     varmap={'F.data': '_TDF0.C0', 'D': '_TDF0.C0'})
  1197 
  1197 
  1198     def test_is_null_transform(self):
  1198     def test_is_null_transform(self):
  1199         union = self._prepare('Any X WHERE X login %(login)s')
  1199         union = self._prepare('Any X WHERE X login %(login)s')
  1200         r, args = self.o.generate(union, {'login': None})
  1200         r, args, cbs = self.o.generate(union, {'login': None})
  1201         self.assertLinesEquals((r % args).strip(),
  1201         self.assertLinesEquals((r % args).strip(),
  1202                                '''SELECT _X.cw_eid
  1202                                '''SELECT _X.cw_eid
  1203 FROM cw_CWUser AS _X
  1203 FROM cw_CWUser AS _X
  1204 WHERE _X.cw_login IS NULL''')
  1204 WHERE _X.cw_login IS NULL''')
  1205 
  1205 
  1384     def test_ambigous_exists_no_from_clause(self):
  1384     def test_ambigous_exists_no_from_clause(self):
  1385         self._check('Any COUNT(U) WHERE U eid 1, EXISTS (P owned_by U, P is IN (Note, Affaire))',
  1385         self._check('Any COUNT(U) WHERE U eid 1, EXISTS (P owned_by U, P is IN (Note, Affaire))',
  1386                     '''SELECT COUNT(1)
  1386                     '''SELECT COUNT(1)
  1387 WHERE EXISTS(SELECT 1 FROM owned_by_relation AS rel_owned_by0, cw_Affaire AS _P WHERE rel_owned_by0.eid_from=_P.cw_eid AND rel_owned_by0.eid_to=1 UNION SELECT 1 FROM owned_by_relation AS rel_owned_by1, cw_Note AS _P WHERE rel_owned_by1.eid_from=_P.cw_eid AND rel_owned_by1.eid_to=1)''')
  1387 WHERE EXISTS(SELECT 1 FROM owned_by_relation AS rel_owned_by0, cw_Affaire AS _P WHERE rel_owned_by0.eid_from=_P.cw_eid AND rel_owned_by0.eid_to=1 UNION SELECT 1 FROM owned_by_relation AS rel_owned_by1, cw_Note AS _P WHERE rel_owned_by1.eid_from=_P.cw_eid AND rel_owned_by1.eid_to=1)''')
  1388 
  1388 
  1389     def test_attr_map(self):
  1389     def test_attr_map_sqlcb(self):
  1390         def generate_ref(gen, linkedvar, rel):
  1390         def generate_ref(gen, linkedvar, rel):
  1391             linkedvar.accept(gen)
  1391             linkedvar.accept(gen)
  1392             return 'VERSION_DATA(%s)' % linkedvar._q_sql
  1392             return 'VERSION_DATA(%s)' % linkedvar._q_sql
  1393         self.o.attr_map['Affaire.ref'] = generate_ref
  1393         self.o.attr_map['Affaire.ref'] = (generate_ref, False)
  1394         try:
  1394         try:
  1395             self._check('Any R WHERE X ref R',
  1395             self._check('Any R WHERE X ref R',
  1396                         '''SELECT VERSION_DATA(_X.cw_eid)
  1396                         '''SELECT VERSION_DATA(_X.cw_eid)
  1397 FROM cw_Affaire AS _X''')
  1397 FROM cw_Affaire AS _X''')
  1398             self._check('Any X WHERE X ref 1',
  1398             self._check('Any X WHERE X ref 1',
  1400 FROM cw_Affaire AS _X
  1400 FROM cw_Affaire AS _X
  1401 WHERE VERSION_DATA(_X.cw_eid)=1''')
  1401 WHERE VERSION_DATA(_X.cw_eid)=1''')
  1402         finally:
  1402         finally:
  1403             self.o.attr_map.clear()
  1403             self.o.attr_map.clear()
  1404 
  1404 
       
  1405     def test_attr_map_sourcecb(self):
       
  1406         cb = lambda x,y: None
       
  1407         self.o.attr_map['Affaire.ref'] = (cb, True)
       
  1408         try:
       
  1409             union = self._prepare('Any R WHERE X ref R')
       
  1410             r, nargs, cbs = self.o.generate(union, args={})
       
  1411             self.assertLinesEquals(r.strip(), 'SELECT _X.cw_ref\nFROM cw_Affaire AS _X')
       
  1412             self.assertEquals(cbs, {0: [cb]})
       
  1413         finally:
       
  1414             self.o.attr_map.clear()
       
  1415 
  1405 
  1416 
  1406 class SqliteSQLGeneratorTC(PostgresSQLGeneratorTC):
  1417 class SqliteSQLGeneratorTC(PostgresSQLGeneratorTC):
  1407 
  1418 
  1408     def setUp(self):
  1419     def setUp(self):
  1409         RQLGeneratorTC.setUp(self)
  1420         RQLGeneratorTC.setUp(self)
  1410         dbms_helper = get_db_helper('sqlite')
  1421         dbhelper = get_db_helper('sqlite')
  1411         self.o = SQLGenerator(schema, dbms_helper)
  1422         self.o = SQLGenerator(schema, dbhelper)
  1412 
  1423 
  1413     def _norm_sql(self, sql):
  1424     def _norm_sql(self, sql):
  1414         return sql.strip().replace(' ILIKE ', ' LIKE ').replace('\nINTERSECT ALL\n', '\nINTERSECT\n')
  1425         return sql.strip().replace(' ILIKE ', ' LIKE ').replace('\nINTERSECT ALL\n', '\nINTERSECT\n')
  1415 
  1426 
  1416     def test_date_extraction(self):
  1427     def test_date_extraction(self):
  1513 
  1524 
  1514 class MySQLGenerator(PostgresSQLGeneratorTC):
  1525 class MySQLGenerator(PostgresSQLGeneratorTC):
  1515 
  1526 
  1516     def setUp(self):
  1527     def setUp(self):
  1517         RQLGeneratorTC.setUp(self)
  1528         RQLGeneratorTC.setUp(self)
  1518         dbms_helper = get_db_helper('mysql')
  1529         dbhelper = get_db_helper('mysql')
  1519         self.o = SQLGenerator(schema, dbms_helper)
  1530         self.o = SQLGenerator(schema, dbhelper)
  1520 
  1531 
  1521     def _norm_sql(self, sql):
  1532     def _norm_sql(self, sql):
  1522         sql = sql.strip().replace(' ILIKE ', ' LIKE ').replace('TRUE', '1').replace('FALSE', '0')
  1533         sql = sql.strip().replace(' ILIKE ', ' LIKE ').replace('TRUE', '1').replace('FALSE', '0')
  1523         newsql = []
  1534         newsql = []
  1524         latest = None
  1535         latest = None