|
1 """This module provides misc utilities to test applications |
|
2 |
|
3 :organization: Logilab |
|
4 :copyright: 2001-2008 LOGILAB S.A. (Paris, FRANCE), all rights reserved. |
|
5 :contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr |
|
6 """ |
|
7 __docformat__ = "restructuredtext en" |
|
8 |
|
9 from copy import deepcopy |
|
10 |
|
11 import simplejson |
|
12 |
|
13 from logilab.common.testlib import TestCase |
|
14 from logilab.common.pytest import nocoverage |
|
15 from logilab.common.umessage import message_from_string |
|
16 |
|
17 from cubicweb.devtools import init_test_database, TestServerConfiguration, ApptestConfiguration |
|
18 from cubicweb.devtools._apptest import TestEnvironment |
|
19 from cubicweb.devtools.fake import FakeRequest |
|
20 |
|
21 from cubicweb.dbapi import repo_connect, ConnectionProperties, ProgrammingError |
|
22 |
|
23 |
|
# every mail "sent" through MockSMTP during a test is stored here
MAILBOX = []


class Email:
    """an email captured in MAILBOX: its recipients and its raw message text"""

    def __init__(self, recipients, msg):
        self.recipients, self.msg = recipients, msg

    @property
    def message(self):
        """the raw message text parsed with `message_from_string`"""
        return message_from_string(self.msg)

    def __repr__(self):
        to = ','.join(self.recipients)
        subject = self.message.get('Subject')
        return '<Email to %s with subject %s>' % (to, subject)
|
37 |
|
class MockSMTP:
    """SMTP stand-in which stores mails in MAILBOX instead of sending them"""

    def __init__(self, server, port):
        # connection parameters are accepted and ignored
        pass

    def close(self):
        # nothing to release
        pass

    def sendmail(self, helo_addr, recipients, msg):
        """record the mail in MAILBOX (`helo_addr` is not used)"""
        captured = Email(recipients, msg)
        MAILBOX.append(captured)


# make the server hooks use the mock above
from cubicweb.server import hookhelper
hookhelper.SMTP = MockSMTP
|
48 |
|
49 |
|
def get_versions(self, checkversions=False):
    """return a dictionary containing cubes used by this application
    as key with their version as value, including cubicweb version. This is a
    public method, not requiring a session id.

    replace Repository.get_versions by this method if you don't want versions
    checking (`checkversions` is accepted but never looked at)
    """
    config = self.config
    versions = {'cubicweb': config.cubicweb_version()}
    config.bootstrap_cubes()
    versions.update((cube, config.template_version(cube))
                    for cube in config.cubes())
    # drop the cubes list set up by bootstrap_cubes()
    config._cubes = None
    return versions
|
65 |
|
66 |
|
@property
def late_binding_env(self):
    """return the test environment, building it on first access

    The TestEnvironment is stored as `_env` on the class of `self`, hence
    built once and then reused by later accesses.
    """
    if hasattr(self, '_env'):
        return self._env
    cls = self.__class__
    cls._env = TestEnvironment('data', configcls=self.configcls,
                               requestcls=self.requestcls)
    return self._env
|
74 |
|
75 |
|
class autoenv(type):
    """metaclass automatically setting the environment on EnvBasedTC
    subclasses if necessary
    """
    def __new__(mcs, name, bases, classdict):
        if classdict.get('env') is None:
            # no env given in the class body: take the first one found on
            # a base class
            for base in bases:
                inherited = getattr(base, 'env', None)
                if inherited is not None:
                    classdict['env'] = inherited
                    break
        abstract = classdict.get('__abstract__')
        if not (abstract or classdict.get('env')):
            # concrete class still without env: build it lazily
            classdict['env'] = late_binding_env
        return super(autoenv, mcs).__new__(mcs, name, bases, classdict)
|
91 |
|
92 |
|
class EnvBasedTC(TestCase):
    """abstract class for tests using an apptest environment

    The `autoenv` metaclass provides the `env` attribute of concrete
    subclasses (taken from a base class or built lazily); most helpers
    below simply delegate to it.
    """
    __metaclass__ = autoenv
    __abstract__ = True
    env = None
    configcls = ApptestConfiguration
    requestcls = FakeRequest

    # user / session management ###############################################

    def user(self, req=None):
        """return the user of `req`, or the user of the environment's
        connection (through a fresh request) when `req` is None
        """
        if req is not None:
            return req.user
        req = self.env.create_request()
        return self.env.cnx.user(req)

    def create_user(self, *args, **kwargs):
        """delegate to the environment's `create_user`"""
        return self.env.create_user(*args, **kwargs)

    def login(self, login):
        """delegate to the environment's `login`"""
        return self.env.login(login)

    def restore_connection(self):
        """delegate to the environment's `restore_connection`"""
        self.env.restore_connection()

    # db api ##################################################################

    @nocoverage
    def cursor(self, req=None):
        """return a cursor on the environment's connection, using `req` or
        a new request when it is not given
        """
        return self.env.cnx.cursor(req or self.request())

    @nocoverage
    def execute(self, *args, **kwargs):
        """delegate to the environment's `execute`"""
        return self.env.execute(*args, **kwargs)

    @nocoverage
    def commit(self):
        """commit the environment's connection"""
        self.env.cnx.commit()

    @nocoverage
    def rollback(self):
        """rollback the environment's connection, silently ignoring
        ProgrammingError
        """
        try:
            self.env.cnx.rollback()
        except ProgrammingError:
            pass

    # other utilities #########################################################

    def set_debug(self, debugmode):
        """set the server debug mode"""
        from cubicweb.server import set_debug
        set_debug(debugmode)

    @property
    def config(self):
        """configuration of the registry (`self.vreg` is set by `setUp`)"""
        return self.vreg.config

    def session(self):
        """return current server side session (using default manager account)"""
        return self.env.repo._sessions[self.env.cnx.sessionid]

    def request(self, *args, **kwargs):
        """return a web interface request"""
        return self.env.create_request(*args, **kwargs)

    @nocoverage
    def rset_and_req(self, *args, **kwargs):
        """delegate to the environment's `get_rset_and_req`"""
        return self.env.get_rset_and_req(*args, **kwargs)

    def entity(self, rql, args=None, eidkey=None, req=None):
        """execute `rql` and return the entity at row 0, column 0 of the
        result set
        """
        return self.execute(rql, args, eidkey, req=req).get_entity(0, 0)

    def etype_instance(self, etype, req=None):
        """return an instance of the entity class registered for `etype`,
        with its eid set to None
        """
        req = req or self.request()
        e = self.env.vreg.etype_class(etype)(req, None, None)
        e.eid = None
        return e

    def add_entity(self, etype, **kwargs):
        """insert an entity of type `etype` and return it

        Each keyword argument is a relation or attribute of the new entity:
        a value having an `eid` attribute is linked through an RQL variable
        restricted on that eid, any other value is given as a query
        argument.
        """
        rql = ['INSERT %s X' % etype]
        # substitutions for the RQL query
        rql_args = {}
        if kwargs:
            assignments = []
            # RQL variables standing for already existing entities
            eid_vars = []
            for key, value in kwargs.items():
                if hasattr(value, 'eid'):
                    # relation to an entity
                    var = '%s__' % key.upper()
                    eid_vars.append(var)
                    rql_args[var] = value.eid
                    assignments.append('X %s %s' % (key, var))
                else:
                    # final attribute
                    assignments.append('X %s %%(%s)s' % (key, key))
                    rql_args[key] = value
            rql.append(':')
            rql.append(', '.join(assignments))
            if eid_vars:
                # WHERE part of the query, binding each variable to its eid
                rql.append('WHERE')
                rql.append(', '.join('%s eid %%(%s)s' % (var, var)
                                     for var in eid_vars))
        rset = self.execute(' '.join(rql), rql_args)
        return rset.get_entity(0, 0)

    def set_option(self, optname, value):
        """set a global option on the registry's configuration"""
        self.vreg.config.global_set_option(optname, value)

    def pviews(self, req, rset):
        """return the sorted list of (id, class) of possible views"""
        return sorted((a.id, a.__class__)
                      for a in self.vreg.possible_views(req, rset))

    def pactions(self, req, rset, skipcategories=('addrelated', 'siteactions', 'useractions')):
        """return the list of (id, class) of possible actions whose category
        is not in `skipcategories`
        """
        return [(a.id, a.__class__)
                for a in self.vreg.possible_vobjects('actions', req, rset)
                if a.category not in skipcategories]

    def pactionsdict(self, req, rset, skipcategories=('addrelated', 'siteactions', 'useractions')):
        """return a dictionary mapping each category (except those in
        `skipcategories`) to the list of its possible action classes
        """
        res = {}
        for a in self.vreg.possible_vobjects('actions', req, rset):
            if a.category not in skipcategories:
                res.setdefault(a.category, []).append(a.__class__)
        return res

    def paddrelactions(self, req, rset):
        """return the list of (id, class) of possible 'addrelated' actions"""
        return [(a.id, a.__class__)
                for a in self.vreg.possible_vobjects('actions', req, rset)
                if a.category == 'addrelated']

    def remote_call(self, fname, *args):
        """remote call simulation: publish a 'json' controller with json
        encoded `args` and return (published result, request)
        """
        dump = simplejson.dumps
        args = [dump(arg) for arg in args]
        req = self.request(mode='remote', fname=fname, pageid='123', arg=args)
        ctrl = self.env.app.select_controller('json', req)
        return ctrl.publish(), req

    # default test setup and teardown #########################################

    def setup_database(self):
        """hook called by `setUp` just before its commit; does nothing by
        default
        """
        pass

    def setUp(self):
        self.restore_connection()
        session = self.session()
        session.set_pool()
        # remember the highest eid so that tearDown can delete whatever
        # has been created afterwards
        self.maxeid = session.system_sql('SELECT MAX(eid) FROM entities').fetchone()[0]
        self.app = self.env.app
        self.vreg = self.env.app.vreg
        self.schema = self.vreg.schema
        self.vreg.config.mode = 'test'
        # set default-dest-addrs to a dumb email address to avoid mailbox or
        # mail queue pollution
        self.set_option('default-dest-addrs', ['whatever'])
        self.setup_database()
        self.commit()
        MAILBOX[:] = [] # reset mailbox

    @nocoverage
    def tearDown(self):
        self.rollback()
        self.env.restore_connection()
        # delete entities created since setUp; the eid goes through query
        # arguments, as done by RepositoryBasedTC.tearDown
        self.session().unsafe_execute('DELETE Any X WHERE X eid > %(x)s',
                                      {'x': self.maxeid})
        self.commit()
|
269 |
|
270 |
|
271 # XXX |
|
try:
    from cubicweb.web import Redirect
    from urllib import unquote
except ImportError:
    pass # cubicweb-web not installed
else:
    class ControllerTC(EnvBasedTC):
        """base class for tests publishing through the 'edit' controller"""

        def setUp(self):
            super(ControllerTC, self).setUp()
            self.req = self.request()
            self.ctrl = self.env.app.select_controller('edit', self.req)

        def publish(self, req):
            """publish the current controller and commit the connection of
            `req`, including when a Redirect is raised (it is re-raised
            once the commit is done)
            """
            assert req is self.ctrl.req
            try:
                result = self.ctrl.publish()
                req.cnx.commit()
            except Redirect:
                req.cnx.commit()
                raise
            return result

        def expect_redirect_publish(self, req=None):
            """publish and check that a Redirect is raised

            When `req` is given, a new 'edit' controller is selected for it,
            else the request created by `setUp` is used. Return a
            (relative path, parameters dictionary) tuple computed from the
            redirect location; the test fails if no Redirect is raised.
            """
            import sys
            if req is not None:
                self.ctrl = self.env.app.select_controller('edit', req)
            else:
                req = self.req
            try:
                self.publish(req)
            except Redirect:
                # fetch the exception without depending on the syntax used
                # to bind it in the except clause
                location = sys.exc_info()[1].location
            else:
                self.fail('expected a Redirect exception')
            # explicit test rather than a bare except around the unpacking
            if '?' in location:
                path, params = location.split('?', 1)
            else:
                path, params = location, ""
            req._url = path
            pairs = (p.split('=', 1) for p in params.split('&') if p)
            params = dict((pair[0], unquote(pair[1])) for pair in pairs)
            return req.relative_path(False), params
|
312 |
|
313 |
|
def make_late_binding_repo_property(attrname):
    """return a property giving access to `attrname`, initializing the test
    database on first access
    """
    def late_binding(self):
        """builds repository and connection as late as possible"""
        if hasattr(self, attrname):
            return getattr(self, attrname)
        # sets explicit test mode here to avoid autoreload
        from cubicweb.cwconfig import CubicWebConfiguration
        CubicWebConfiguration.mode = 'test'
        config = self.repo_config or TestServerConfiguration('data')
        repo, cnx = init_test_database('sqlite', config=config)
        # stored on the class, hence available to every later access
        cls = self.__class__
        cls._repo, cls._cnx = repo, cnx
        return getattr(self, attrname)
    return property(late_binding)
|
327 |
|
328 |
|
class autorepo(type):
    """metaclass automatically setting the repository on RepositoryBasedTC
    subclasses if necessary
    """
    def __new__(mcs, name, bases, classdict):
        if classdict.get('repo') is None:
            # no repo given in the class body: take the first one found on
            # a base class
            for base in bases:
                inherited = getattr(base, 'repo', None)
                if inherited is not None:
                    classdict['repo'] = inherited
                    break
        if not (name == 'RepositoryBasedTC' or classdict.get('repo')):
            # subclass still without repo: build repo and cnx lazily
            classdict['repo'] = make_late_binding_repo_property('_repo')
            classdict['cnx'] = make_late_binding_repo_property('_cnx')
        return super(autorepo, mcs).__new__(mcs, name, bases, classdict)
|
345 |
|
346 |
|
class RepositoryBasedTC(TestCase):
    """abstract class for tests using direct repository connections

    The `autorepo` metaclass provides the `repo` and `cnx` attributes of
    subclasses; `setUp` opens a repository session through `_prepare`.
    """
    __metaclass__ = autorepo
    repo_config = None # set a particular config instance if necessary

    # user / session management ###############################################

    def create_user(self, user, groups=('users',), password=None, commit=True):
        """insert an EUser with login `user`, put it in `groups` and return
        its eid

        `password` defaults to the login. The transaction is committed
        unless `commit` is false.
        """
        if password is None:
            password = user
        eid = self.execute('INSERT EUser X: X login %(x)s, X upassword %(p)s,'
                           'X in_state S WHERE S name "activated"',
                           {'x': unicode(user), 'p': password})[0][0]
        groups = ','.join(repr(group) for group in groups)
        self.execute('SET X in_group Y WHERE X eid %%(x)s, Y name IN (%s)' % groups,
                     {'x': eid})
        if commit:
            self.commit()
        self.session.reset_pool()
        return eid

    def login(self, login, password=None):
        """open an in-memory connection as `login` (`password` defaults to
        the login), record it in `self.cnxs` and return it
        """
        cnx = repo_connect(self.repo, unicode(login), password or login,
                           ConnectionProperties('inmemory'))
        self.cnxs.append(cnx)
        return cnx

    def current_session(self):
        """return the repository session of the last opened connection"""
        return self.repo._sessions[self.cnxs[-1].sessionid]

    def restore_connection(self):
        """close the single connection opened by `login`, printing the
        error if closing fails
        """
        import sys
        assert len(self.cnxs) == 1, self.cnxs
        cnx = self.cnxs.pop()
        try:
            cnx.close()
        except Exception:
            ex = sys.exc_info()[1]
            print("exception occured while closing connection %s" % (ex,))

    # db api ##################################################################

    def execute(self, rql, args=None, eid_key=None):
        """execute `rql` through the repository using the default session
        and return the result set, bound to the registry and the session
        """
        assert self.session.id == self.cnxid
        rset = self.__execute(self.cnxid, rql, args, eid_key)
        rset.vreg = self.vreg
        rset.req = self.session
        # call to set_pool is necessary to avoid pb when using
        # application entities for convenience
        self.session.set_pool()
        return rset

    def commit(self):
        """commit the default session"""
        self.__commit(self.cnxid)
        self.session.set_pool()

    def rollback(self):
        """rollback the default session"""
        self.__rollback(self.cnxid)
        self.session.set_pool()

    def close(self):
        """close the default session"""
        self.__close(self.cnxid)

    # other utilities #########################################################

    def set_debug(self, debugmode):
        """set the server debug mode"""
        from cubicweb.server import set_debug
        set_debug(debugmode)

    def set_option(self, optname, value):
        """set a global option on the registry's configuration"""
        self.vreg.config.global_set_option(optname, value)

    def add_entity(self, etype, **kwargs):
        """insert an entity of type `etype` with `kwargs` as attributes and
        return it
        """
        rql = 'INSERT %s X' % etype
        if kwargs:
            rql += ': %s' % ', '.join('X %s %%(%s)s' % (key, key)
                                      for key in kwargs)
        rset = self.execute(rql, kwargs)
        return rset.get_entity(0, 0)

    def default_user_password(self):
        """return the (db-user, db-password) of the 'system' source"""
        system = self.repo.config.sources()['system']
        user = unicode(system['db-user'])
        passwd = self.repo.config.sources()['system']['db-password']
        return user, passwd

    def close_connections(self):
        """rollback and close the connections opened by `login`, ignoring
        errors, then forget them
        """
        for cnx in self.cnxs:
            try:
                cnx.rollback()
                cnx.close()
            except Exception:
                # best effort: go on with remaining connections
                continue
        self.cnxs = []

    pactions = EnvBasedTC.pactions.im_func
    pactionsdict = EnvBasedTC.pactionsdict.im_func

    # default test setup and teardown #########################################
    copy_schema = False

    def _prepare(self):
        """reset the mailbox and, the first time only (no `cnxid` yet), open
        the default session and reset schema and repository caches
        """
        MAILBOX[:] = [] # reset mailbox
        if hasattr(self, 'cnxid'):
            return
        repo = self.repo
        self.__execute = repo.execute
        self.__commit = repo.commit
        self.__rollback = repo.rollback
        self.__close = repo.close
        self.cnxid = repo.connect(*self.default_user_password())
        self.session = repo._sessions[self.cnxid]
        # XXX copy schema since hooks may alter it and it may be not fully
        # cleaned (missing some schema synchronization support)
        try:
            origschema = repo.__schema
        except AttributeError:
            # first call: keep a reference to the original schema
            origschema = repo.schema
            repo.__schema = origschema
        if self.copy_schema:
            repo.schema = deepcopy(origschema)
            repo.set_schema(repo.schema) # reset hooks
            repo.vreg.update_schema(repo.schema)
        self.cnxs = []
        # reset caches, they may introduce bugs among tests
        repo._type_source_cache = {}
        repo._extid_cache = {}
        repo.querier._rql_cache = {}
        for source in repo.sources:
            source.reset_caches()
            if hasattr(source, '_cache'):
                source._cache = {}

    @property
    def config(self):
        """configuration of the repository"""
        return self.repo.config

    @property
    def vreg(self):
        """registry of the repository"""
        return self.repo.vreg

    @property
    def schema(self):
        """schema of the repository"""
        return self.repo.schema

    def setUp(self):
        self._prepare()
        self.session.set_pool()
        # remember the highest eid so that tearDown can delete whatever
        # has been created afterwards
        self.maxeid = self.session.system_sql('SELECT MAX(eid) FROM entities').fetchone()[0]

    def tearDown(self, close=True):
        """close extra connections, delete entities created since `setUp`
        and, unless `close` is false, close the default session
        """
        self.close_connections()
        self.rollback()
        self.session.unsafe_execute('DELETE Any X WHERE X eid > %(x)s', {'x': self.maxeid})
        self.commit()
        if close:
            self.close()
|
504 |