90 self.assertRaises(AuthenticationError, |
90 self.assertRaises(AuthenticationError, |
91 self.repo.connect, self.admlogin) |
91 self.repo.connect, self.admlogin) |
92 self.assertRaises(AuthenticationError, |
92 self.assertRaises(AuthenticationError, |
93 self.repo.connect, None) |
93 self.repo.connect, None) |
94 |
94 |
95 def test_execute(self): |
95 def test_login_upassword_accent(self): |
|
96 with self.admin_access.repo_cnx() as cnx: |
|
97 cnx.execute('INSERT CWUser X: X login %(login)s, X upassword %(passwd)s, ' |
|
98 'X in_group G WHERE G name "users"', |
|
99 {'login': u"barnabé", 'passwd': u"héhéhé".encode('UTF8')}) |
|
100 cnx.commit() |
96 repo = self.repo |
101 repo = self.repo |
97 cnxid = repo.connect(self.admlogin, password=self.admpassword) |
|
98 repo.execute(cnxid, 'Any X') |
|
99 repo.execute(cnxid, 'Any X where X is Personne') |
|
100 repo.execute(cnxid, 'Any X where X is Personne, X nom ~= "to"') |
|
101 repo.execute(cnxid, 'Any X WHERE X has_text %(text)s', {'text': u'\xe7a'}) |
|
102 repo.close(cnxid) |
|
103 |
|
104 def test_login_upassword_accent(self): |
|
105 repo = self.repo |
|
106 cnxid = repo.connect(self.admlogin, password=self.admpassword) |
|
107 repo.execute(cnxid, 'INSERT CWUser X: X login %(login)s, X upassword %(passwd)s, X in_group G WHERE G name "users"', |
|
108 {'login': u"barnabé", 'passwd': u"héhéhé".encode('UTF8')}) |
|
109 repo.commit(cnxid) |
|
110 repo.close(cnxid) |
|
111 cnxid = repo.connect(u"barnabé", password=u"héhéhé".encode('UTF8')) |
102 cnxid = repo.connect(u"barnabé", password=u"héhéhé".encode('UTF8')) |
112 self.assert_(cnxid) |
103 self.assert_(cnxid) |
113 repo.close(cnxid) |
104 repo.close(cnxid) |
114 |
|
def test_rollback_on_commit_error(self):
    """Check that a commit raising ValidationError leaves nothing behind.

    A CWUser is inserted with only a login and a password (no ``in_group``
    clause -- presumably what makes validation fail at commit time).  The
    commit is expected to raise ValidationError, after which the user must
    not be found in the same connection.
    """
    repo = self.repo
    cnxid = repo.connect(self.admlogin, password=self.admpassword)
    try:
        repo.execute(cnxid,
                     'INSERT CWUser X: X login %(login)s, X upassword %(passwd)s',
                     {'login': u"tutetute", 'passwd': 'tutetute'})
        self.assertRaises(ValidationError, repo.commit, cnxid)
        self.assertFalse(repo.execute(cnxid, 'CWUser X WHERE X login "tutetute"'))
    finally:
        # close the connection even when one of the assertions above fails,
        # so a failure here does not leave an open connection behind
        repo.close(cnxid)
|
123 |
105 |
124 def test_rollback_on_execute_validation_error(self): |
106 def test_rollback_on_execute_validation_error(self): |
125 class ValidationErrorAfterHook(Hook): |
107 class ValidationErrorAfterHook(Hook): |
126 __regid__ = 'valerror-after-hook' |
108 __regid__ = 'valerror-after-hook' |
127 __select__ = Hook.__select__ & is_instance('CWGroup') |
109 __select__ = Hook.__select__ & is_instance('CWGroup') |
163 def test_close(self): |
145 def test_close(self): |
164 repo = self.repo |
146 repo = self.repo |
165 cnxid = repo.connect(self.admlogin, password=self.admpassword) |
147 cnxid = repo.connect(self.admlogin, password=self.admpassword) |
166 self.assert_(cnxid) |
148 self.assert_(cnxid) |
167 repo.close(cnxid) |
149 repo.close(cnxid) |
168 self.assertRaises(BadConnectionId, repo.execute, cnxid, 'Any X') |
|
169 |
|
def test_invalid_cnxid(self):
    """Check that bogus connection ids are rejected with BadConnectionId.

    ``0`` is passed to ``execute`` and ``None`` to ``close``; neither was
    returned by ``connect``.
    """
    repo = self.repo
    with self.assertRaises(BadConnectionId):
        repo.execute(0, 'Any X')
    with self.assertRaises(BadConnectionId):
        repo.close(None)
|
173 |
150 |
174 def test_shared_data(self): |
151 def test_shared_data(self): |
175 repo = self.repo |
152 repo = self.repo |
176 cnxid = repo.connect(self.admlogin, password=self.admpassword) |
153 cnxid = repo.connect(self.admlogin, password=self.admpassword) |
177 repo.set_shared_data(cnxid, 'data', 4) |
154 repo.set_shared_data(cnxid, 'data', 4) |
195 repo = self.repo |
172 repo = self.repo |
196 cnxid = repo.connect(self.admlogin, password=self.admpassword) |
173 cnxid = repo.connect(self.admlogin, password=self.admpassword) |
197 self.assertIsInstance(repo.check_session(cnxid), float) |
174 self.assertIsInstance(repo.check_session(cnxid), float) |
198 repo.close(cnxid) |
175 repo.close(cnxid) |
199 self.assertRaises(BadConnectionId, repo.check_session, cnxid) |
176 self.assertRaises(BadConnectionId, repo.check_session, cnxid) |
200 |
|
def test_transaction_base(self):
    """Check rollback, then commit, of a Personne insertion.

    The test expects no Personne at start, one after the insertion, none
    again after ``rollback``, and one after a second insertion followed by
    ``commit``.
    """
    repo = self.repo
    cnxid = repo.connect(self.admlogin, password=self.admpassword)
    try:
        # check db state
        result = repo.execute(cnxid, 'Personne X')
        self.assertEqual(result.rowcount, 0)
        # rollback entity insertion
        repo.execute(cnxid, "INSERT Personne X: X nom 'bidule'")
        result = repo.execute(cnxid, 'Personne X')
        self.assertEqual(result.rowcount, 1)
        repo.rollback(cnxid)
        result = repo.execute(cnxid, 'Personne X')
        self.assertEqual(result.rowcount, 0, result.rows)
        # commit
        repo.execute(cnxid, "INSERT Personne X: X nom 'bidule'")
        repo.commit(cnxid)
        result = repo.execute(cnxid, 'Personne X')
        self.assertEqual(result.rowcount, 1)
    finally:
        # always release the connection, including when an assertion fails
        repo.close(cnxid)
|
220 |
|
def test_transaction_base2(self):
    """Check that ``rollback`` undoes a relation insertion.

    The ``admin`` user is put in the ``guests`` group; the relation must be
    visible before the rollback and gone afterwards.
    """
    repo = self.repo
    cnxid = repo.connect(self.admlogin, password=self.admpassword)
    # the same query is run before and after the rollback
    admin_in_guests = "Any U WHERE U in_group G, U login 'admin', G name 'guests'"
    try:
        # rollback relation insertion
        repo.execute(cnxid, "SET U in_group G WHERE U login 'admin', G name 'guests'")
        result = repo.execute(cnxid, admin_in_guests)
        self.assertEqual(result.rowcount, 1)
        repo.rollback(cnxid)
        result = repo.execute(cnxid, admin_in_guests)
        self.assertEqual(result.rowcount, 0, result.rows)
    finally:
        # always release the connection, including when an assertion fails
        repo.close(cnxid)
|
232 |
|
def test_transaction_base3(self):
    """Check that ``rollback`` undoes the TrInfo created by a transition.

    The ``deactivate`` transition is fired on the session's user; one
    TrInfo pointing at that user is expected before the rollback and none
    afterwards.
    """
    repo = self.repo
    cnxid = repo.connect(self.admlogin, password=self.admpassword)
    # the same query is run before and after the rollback
    trinfo_rql = 'TrInfo T WHERE T wf_info_for X, X eid %(x)s'
    try:
        # rollback state change which triggers TrInfo insertion
        session = repo._get_session(cnxid)
        user = session.user
        user.cw_adapt_to('IWorkflowable').fire_transition('deactivate')
        rset = repo.execute(cnxid, trinfo_rql, {'x': user.eid})
        self.assertEqual(len(rset), 1)
        repo.rollback(cnxid)
        rset = repo.execute(cnxid, trinfo_rql, {'x': user.eid})
        self.assertEqual(len(rset), 0)
    finally:
        # always release the connection, including when an assertion fails
        repo.close(cnxid)
|
246 |
|
def test_close_kill_processing_request(self):
    """Check that a request fails once its session is closed by another thread.

    A helper thread closes the connection as soon as the main thread
    releases a lock.  The main thread then runs a DELETE followed by a
    commit, which is expected to raise SessionClosedError with the
    closed-session message.
    """
    repo = self.repo
    cnxid = repo.connect(self.admlogin, password=self.admpassword)
    repo.execute(cnxid, 'INSERT CWUser X: X login "toto", X upassword "tutu", X in_group G WHERE G name "users"')
    repo.commit(cnxid)
    # the gate starts out held, so the closing thread blocks on it until
    # run_transaction() releases it
    gate = threading.Lock()
    gate.acquire()

    # close has to be in the thread due to sqlite limitations
    def close_when_released():
        gate.acquire()
        repo.close(cnxid)

    def run_transaction():
        gate.release()
        repo.execute(cnxid, 'DELETE CWUser X WHERE X login "toto"')
        repo.commit(cnxid)

    closer = threading.Thread(target=close_when_released)
    closer.start()
    try:
        with self.assertRaises(SessionClosedError) as cm:
            run_transaction()
        self.assertEqual(str(cm.exception), 'try to access connections set on a closed session %s' % cnxid)
    finally:
        # wait for the closing thread whatever the outcome of the assertions
        closer.join()
|
270 |
177 |
271 def test_initial_schema(self): |
178 def test_initial_schema(self): |
272 schema = self.repo.schema |
179 schema = self.repo.schema |
273 # check order of attributes is respected |
180 # check order of attributes is respected |
274 notin = set(('eid', 'is', 'is_instance_of', 'identity', |
181 notin = set(('eid', 'is', 'is_instance_of', 'identity', |