7 from cubicweb.devtools.repotest import tuplify, BaseQuerierTC |
7 from cubicweb.devtools.repotest import tuplify, BaseQuerierTC |
8 from unittest_session import Variable |
8 from unittest_session import Variable |
9 |
9 |
10 from mx.DateTime import today, now, DateTimeType |
10 from mx.DateTime import today, now, DateTimeType |
11 from rql import BadRQLQuery, RQLSyntaxError |
11 from rql import BadRQLQuery, RQLSyntaxError |
|
12 |
12 from cubicweb import QueryError, Unauthorized |
13 from cubicweb import QueryError, Unauthorized |
|
14 from cubicweb.server.sqlutils import SQL_PREFIX |
13 from cubicweb.server.utils import crypt_password |
15 from cubicweb.server.utils import crypt_password |
14 from cubicweb.server.sources.native import make_schema |
16 from cubicweb.server.sources.native import make_schema |
15 |
17 |
16 |
18 |
17 # register priority/severity sorting registered procedure |
19 # register priority/severity sorting registered procedure |
466 'HAVING COUNT(X) > 10', {'x': self.ueid}) |
468 'HAVING COUNT(X) > 10', {'x': self.ueid}) |
467 self.assertEquals(len(rset.rows), 1) |
469 self.assertEquals(len(rset.rows), 1) |
468 self.assertEquals(rset.rows[0][0], self.ueid) |
470 self.assertEquals(rset.rows[0][0], self.ueid) |
469 |
471 |
470 def test_select_complex_sort(self): |
472 def test_select_complex_sort(self): |
|
473 self.skip('retry me once http://www.sqlite.org/cvstrac/tktview?tn=3773 is fixed') |
471 rset = self.execute('Any X ORDERBY X,D LIMIT 5 WHERE X creation_date D') |
474 rset = self.execute('Any X ORDERBY X,D LIMIT 5 WHERE X creation_date D') |
472 result = rset.rows |
475 result = rset.rows |
473 result.sort() |
476 result.sort() |
474 self.assertEquals(tuplify(result), [(1,), (2,), (3,), (4,), (5,)]) |
477 self.assertEquals(tuplify(result), [(1,), (2,), (3,), (4,), (5,)]) |
475 |
478 |
1070 self.assertEquals(len(rset.rows), 1) |
1073 self.assertEquals(len(rset.rows), 1) |
1071 self.assertEquals(rset.description, [('EUser',)]) |
1074 self.assertEquals(rset.description, [('EUser',)]) |
1072 self.assertRaises(Unauthorized, |
1075 self.assertRaises(Unauthorized, |
1073 self.execute, "Any P WHERE X is EUser, X login 'bob', X upassword P") |
1076 self.execute, "Any P WHERE X is EUser, X login 'bob', X upassword P") |
1074 cursor = self.pool['system'] |
1077 cursor = self.pool['system'] |
1075 cursor.execute("SELECT upassword from EUser WHERE login='bob'") |
1078 cursor.execute("SELECT %supassword from %sEUser WHERE %slogin='bob'" |
|
1079 % (SQL_PREFIX, SQL_PREFIX, SQL_PREFIX)) |
1076 passwd = cursor.fetchone()[0].getvalue() |
1080 passwd = cursor.fetchone()[0].getvalue() |
1077 self.assertEquals(passwd, crypt_password('toto', passwd[:2])) |
1081 self.assertEquals(passwd, crypt_password('toto', passwd[:2])) |
1078 rset = self.execute("Any X WHERE X is EUser, X login 'bob', X upassword '%s'" % passwd) |
1082 rset = self.execute("Any X WHERE X is EUser, X login 'bob', X upassword '%s'" % passwd) |
1079 self.assertEquals(len(rset.rows), 1) |
1083 self.assertEquals(len(rset.rows), 1) |
1080 self.assertEquals(rset.description, [('EUser',)]) |
1084 self.assertEquals(rset.description, [('EUser',)]) |
1083 cursor = self.pool['system'] |
1087 cursor = self.pool['system'] |
1084 rset = self.execute("INSERT EUser X: X login 'bob', X upassword %(pwd)s", {'pwd': 'toto'}) |
1088 rset = self.execute("INSERT EUser X: X login 'bob', X upassword %(pwd)s", {'pwd': 'toto'}) |
1085 self.assertEquals(rset.description[0][0], 'EUser') |
1089 self.assertEquals(rset.description[0][0], 'EUser') |
1086 rset = self.execute("SET X upassword %(pwd)s WHERE X is EUser, X login 'bob'", |
1090 rset = self.execute("SET X upassword %(pwd)s WHERE X is EUser, X login 'bob'", |
1087 {'pwd': 'tutu'}) |
1091 {'pwd': 'tutu'}) |
1088 cursor.execute("SELECT upassword from EUser WHERE login='bob'") |
1092 cursor.execute("SELECT %supassword from %sEUser WHERE %slogin='bob'" |
|
1093 % (SQL_PREFIX, SQL_PREFIX, SQL_PREFIX)) |
1089 passwd = cursor.fetchone()[0].getvalue() |
1094 passwd = cursor.fetchone()[0].getvalue() |
1090 self.assertEquals(passwd, crypt_password('tutu', passwd[:2])) |
1095 self.assertEquals(passwd, crypt_password('tutu', passwd[:2])) |
1091 rset = self.execute("Any X WHERE X is EUser, X login 'bob', X upassword '%s'" % passwd) |
1096 rset = self.execute("Any X WHERE X is EUser, X login 'bob', X upassword '%s'" % passwd) |
1092 self.assertEquals(len(rset.rows), 1) |
1097 self.assertEquals(len(rset.rows), 1) |
1093 self.assertEquals(rset.description, [('EUser',)]) |
1098 self.assertEquals(rset.description, [('EUser',)]) |
1210 """Error: Variable has no attribute 'sql' in rql2sql.py (visit_variable) |
1215 """Error: Variable has no attribute 'sql' in rql2sql.py (visit_variable) |
1211 |
1216 |
1212 cause: old variable ref inserted into a fresh rqlst copy |
1217 cause: old variable ref inserted into a fresh rqlst copy |
1213 (in RQLSpliter._complex_select_plan) |
1218 (in RQLSpliter._complex_select_plan) |
1214 """ |
1219 """ |
|
1220 self.skip('retry me once http://www.sqlite.org/cvstrac/tktview?tn=3773 is fixed') |
1215 self.execute('Any X ORDERBY D DESC WHERE X creation_date D') |
1221 self.execute('Any X ORDERBY D DESC WHERE X creation_date D') |
1216 |
1222 |
1217 def test_nonregr_extra_joins(self): |
1223 def test_nonregr_extra_joins(self): |
1218 ueid = self.session.user.eid |
1224 ueid = self.session.user.eid |
1219 teid1 = self.execute("INSERT Folder X: X name 'folder1'")[0][0] |
1225 teid1 = self.execute("INSERT Folder X: X name 'folder1'")[0][0] |