1100 schema = schema |
1100 schema = schema |
1101 |
1101 |
1102 #capture = True |
1102 #capture = True |
1103 def setUp(self): |
1103 def setUp(self): |
1104 RQLGeneratorTC.setUp(self) |
1104 RQLGeneratorTC.setUp(self) |
1105 dbms_helper = get_db_helper('postgres') |
1105 dbhelper = get_db_helper('postgres') |
1106 self.o = SQLGenerator(schema, dbms_helper) |
1106 self.o = SQLGenerator(schema, dbhelper) |
1107 |
1107 |
1108 def _norm_sql(self, sql): |
1108 def _norm_sql(self, sql): |
1109 return sql.strip() |
1109 return sql.strip() |
1110 |
1110 |
1111 def _check(self, rql, sql, varmap=None, args=None): |
1111 def _check(self, rql, sql, varmap=None, args=None): |
1112 if args is None: |
1112 if args is None: |
1113 args = {'text': 'hip hop momo'} |
1113 args = {'text': 'hip hop momo'} |
1114 try: |
1114 try: |
1115 union = self._prepare(rql) |
1115 union = self._prepare(rql) |
1116 r, nargs = self.o.generate(union, args, |
1116 r, nargs, cbs = self.o.generate(union, args, |
1117 varmap=varmap) |
1117 varmap=varmap) |
1118 args.update(nargs) |
1118 args.update(nargs) |
1119 self.assertLinesEquals((r % args).strip(), self._norm_sql(sql), striplines=True) |
1119 self.assertLinesEquals((r % args).strip(), self._norm_sql(sql), striplines=True) |
1120 except Exception, ex: |
1120 except Exception, ex: |
1121 if 'r' in locals(): |
1121 if 'r' in locals(): |
1122 try: |
1122 try: |
1133 yield self._check, rql, sql |
1133 yield self._check, rql, sql |
1134 |
1134 |
1135 def _checkall(self, rql, sql): |
1135 def _checkall(self, rql, sql): |
1136 try: |
1136 try: |
1137 rqlst = self._prepare(rql) |
1137 rqlst = self._prepare(rql) |
1138 r, args = self.o.generate(rqlst) |
1138 r, args, cbs = self.o.generate(rqlst) |
1139 self.assertEqual((r.strip(), args), sql) |
1139 self.assertEqual((r.strip(), args), sql) |
1140 except Exception, ex: |
1140 except Exception, ex: |
1141 print rql |
1141 print rql |
1142 if 'r' in locals(): |
1142 if 'r' in locals(): |
1143 print r.strip() |
1143 print r.strip() |
1195 args={'x': 728}, |
1195 args={'x': 728}, |
1196 varmap={'F.data': '_TDF0.C0', 'D': '_TDF0.C0'}) |
1196 varmap={'F.data': '_TDF0.C0', 'D': '_TDF0.C0'}) |
1197 |
1197 |
1198 def test_is_null_transform(self): |
1198 def test_is_null_transform(self): |
1199 union = self._prepare('Any X WHERE X login %(login)s') |
1199 union = self._prepare('Any X WHERE X login %(login)s') |
1200 r, args = self.o.generate(union, {'login': None}) |
1200 r, args, cbs = self.o.generate(union, {'login': None}) |
1201 self.assertLinesEquals((r % args).strip(), |
1201 self.assertLinesEquals((r % args).strip(), |
1202 '''SELECT _X.cw_eid |
1202 '''SELECT _X.cw_eid |
1203 FROM cw_CWUser AS _X |
1203 FROM cw_CWUser AS _X |
1204 WHERE _X.cw_login IS NULL''') |
1204 WHERE _X.cw_login IS NULL''') |
1205 |
1205 |
1384 def test_ambigous_exists_no_from_clause(self): |
1384 def test_ambigous_exists_no_from_clause(self): |
1385 self._check('Any COUNT(U) WHERE U eid 1, EXISTS (P owned_by U, P is IN (Note, Affaire))', |
1385 self._check('Any COUNT(U) WHERE U eid 1, EXISTS (P owned_by U, P is IN (Note, Affaire))', |
1386 '''SELECT COUNT(1) |
1386 '''SELECT COUNT(1) |
1387 WHERE EXISTS(SELECT 1 FROM owned_by_relation AS rel_owned_by0, cw_Affaire AS _P WHERE rel_owned_by0.eid_from=_P.cw_eid AND rel_owned_by0.eid_to=1 UNION SELECT 1 FROM owned_by_relation AS rel_owned_by1, cw_Note AS _P WHERE rel_owned_by1.eid_from=_P.cw_eid AND rel_owned_by1.eid_to=1)''') |
1387 WHERE EXISTS(SELECT 1 FROM owned_by_relation AS rel_owned_by0, cw_Affaire AS _P WHERE rel_owned_by0.eid_from=_P.cw_eid AND rel_owned_by0.eid_to=1 UNION SELECT 1 FROM owned_by_relation AS rel_owned_by1, cw_Note AS _P WHERE rel_owned_by1.eid_from=_P.cw_eid AND rel_owned_by1.eid_to=1)''') |
1388 |
1388 |
1389 def test_attr_map(self): |
1389 def test_attr_map_sqlcb(self): |
1390 def generate_ref(gen, linkedvar, rel): |
1390 def generate_ref(gen, linkedvar, rel): |
1391 linkedvar.accept(gen) |
1391 linkedvar.accept(gen) |
1392 return 'VERSION_DATA(%s)' % linkedvar._q_sql |
1392 return 'VERSION_DATA(%s)' % linkedvar._q_sql |
1393 self.o.attr_map['Affaire.ref'] = generate_ref |
1393 self.o.attr_map['Affaire.ref'] = (generate_ref, False) |
1394 try: |
1394 try: |
1395 self._check('Any R WHERE X ref R', |
1395 self._check('Any R WHERE X ref R', |
1396 '''SELECT VERSION_DATA(_X.cw_eid) |
1396 '''SELECT VERSION_DATA(_X.cw_eid) |
1397 FROM cw_Affaire AS _X''') |
1397 FROM cw_Affaire AS _X''') |
1398 self._check('Any X WHERE X ref 1', |
1398 self._check('Any X WHERE X ref 1', |
1400 FROM cw_Affaire AS _X |
1400 FROM cw_Affaire AS _X |
1401 WHERE VERSION_DATA(_X.cw_eid)=1''') |
1401 WHERE VERSION_DATA(_X.cw_eid)=1''') |
1402 finally: |
1402 finally: |
1403 self.o.attr_map.clear() |
1403 self.o.attr_map.clear() |
1404 |
1404 |
|
1405 def test_attr_map_sourcecb(self): |
|
1406 cb = lambda x,y: None |
|
1407 self.o.attr_map['Affaire.ref'] = (cb, True) |
|
1408 try: |
|
1409 union = self._prepare('Any R WHERE X ref R') |
|
1410 r, nargs, cbs = self.o.generate(union, args={}) |
|
1411 self.assertLinesEquals(r.strip(), 'SELECT _X.cw_ref\nFROM cw_Affaire AS _X') |
|
1412 self.assertEquals(cbs, {0: [cb]}) |
|
1413 finally: |
|
1414 self.o.attr_map.clear() |
|
1415 |
1405 |
1416 |
1406 class SqliteSQLGeneratorTC(PostgresSQLGeneratorTC): |
1417 class SqliteSQLGeneratorTC(PostgresSQLGeneratorTC): |
1407 |
1418 |
1408 def setUp(self): |
1419 def setUp(self): |
1409 RQLGeneratorTC.setUp(self) |
1420 RQLGeneratorTC.setUp(self) |
1410 dbms_helper = get_db_helper('sqlite') |
1421 dbhelper = get_db_helper('sqlite') |
1411 self.o = SQLGenerator(schema, dbms_helper) |
1422 self.o = SQLGenerator(schema, dbhelper) |
1412 |
1423 |
1413 def _norm_sql(self, sql): |
1424 def _norm_sql(self, sql): |
1414 return sql.strip().replace(' ILIKE ', ' LIKE ').replace('\nINTERSECT ALL\n', '\nINTERSECT\n') |
1425 return sql.strip().replace(' ILIKE ', ' LIKE ').replace('\nINTERSECT ALL\n', '\nINTERSECT\n') |
1415 |
1426 |
1416 def test_date_extraction(self): |
1427 def test_date_extraction(self): |
1513 |
1524 |
1514 class MySQLGenerator(PostgresSQLGeneratorTC): |
1525 class MySQLGenerator(PostgresSQLGeneratorTC): |
1515 |
1526 |
1516 def setUp(self): |
1527 def setUp(self): |
1517 RQLGeneratorTC.setUp(self) |
1528 RQLGeneratorTC.setUp(self) |
1518 dbms_helper = get_db_helper('mysql') |
1529 dbhelper = get_db_helper('mysql') |
1519 self.o = SQLGenerator(schema, dbms_helper) |
1530 self.o = SQLGenerator(schema, dbhelper) |
1520 |
1531 |
1521 def _norm_sql(self, sql): |
1532 def _norm_sql(self, sql): |
1522 sql = sql.strip().replace(' ILIKE ', ' LIKE ').replace('TRUE', '1').replace('FALSE', '0') |
1533 sql = sql.strip().replace(' ILIKE ', ' LIKE ').replace('TRUE', '1').replace('FALSE', '0') |
1523 newsql = [] |
1534 newsql = [] |
1524 latest = None |
1535 latest = None |