1 """This module provides misc utilities to test instances |
|
2 |
|
3 :organization: Logilab |
|
4 :copyright: 2001-2009 LOGILAB S.A. (Paris, FRANCE), license is LGPL v2. |
|
5 :contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr |
|
6 :license: GNU Lesser General Public License, v2.1 - http://www.gnu.org/licenses |
|
7 """ |
|
8 __docformat__ = "restructuredtext en" |
|
9 |
|
10 from copy import deepcopy |
|
11 |
|
12 import simplejson |
|
13 |
|
14 from logilab.common.testlib import TestCase |
|
15 from logilab.common.pytest import nocoverage |
|
16 from logilab.common.umessage import message_from_string |
|
17 |
|
18 from logilab.common.deprecation import deprecated |
|
19 |
|
20 from cubicweb.devtools import init_test_database, TestServerConfiguration, ApptestConfiguration |
|
21 from cubicweb.devtools._apptest import TestEnvironment |
|
22 from cubicweb.devtools.fake import FakeRequest |
|
23 |
|
24 from cubicweb.dbapi import repo_connect, ConnectionProperties, ProgrammingError |
|
25 |
|
26 |
|
# module-level list receiving one Email object per MockSMTP.sendmail call;
# EnvBasedTC.setUp and RepositoryBasedTC._prepare empty it in place
MAILBOX = []
|
class Email:
    """message recorded by `MockSMTP.sendmail`: its recipients and raw text

    `msg` is kept as given; it is parsed again with `message_from_string`
    each time `message`, `subject`, `content` or `repr()` is used
    """

    def __init__(self, recipients, msg):
        self.msg = msg
        self.recipients = recipients

    @property
    def message(self):
        """`msg` parsed by `message_from_string`"""
        return message_from_string(self.msg)

    @property
    def subject(self):
        """'Subject' header of the parsed message"""
        parsed = self.message
        return parsed.get('Subject')

    @property
    def content(self):
        """payload of the parsed message, as given by get_payload(decode=True)"""
        parsed = self.message
        return parsed.get_payload(decode=True)

    def __repr__(self):
        recipients = ','.join(self.recipients)
        subject = self.message.get('Subject')
        return '<Email to %s with subject %s>' % (recipients, subject)
|
48 |
|
class MockSMTP:
    """SMTP stand-in which records messages in `MAILBOX` instead of sending
    them
    """

    def __init__(self, server, port):
        """accept and ignore `server` and `port`"""

    def close(self):
        """do nothing"""

    def sendmail(self, helo_addr, recipients, msg):
        """append an `Email` built from `recipients` and `msg` to `MAILBOX`
        (`helo_addr` is ignored)
        """
        email = Email(recipients, msg)
        MAILBOX.append(email)
|
56 |
|
from cubicweb import cwconfig
# replace the SMTP class exposed by cwconfig with the mock, so that messages
# handed to it are recorded in MAILBOX (see MockSMTP.sendmail)
cwconfig.SMTP = MockSMTP
|
59 |
|
60 |
|
def get_versions(self, checkversions=False):
    """return a dictionary containing cubes used by this instance as key with
    their version as value, including cubicweb version. This is a public
    method, not requiring a session id.

    replace Repository.get_versions by this method if you don't want versions
    checking

    `checkversions` is accepted but not used; the configuration's `_cubes`
    attribute is reset to None once versions have been collected
    """
    config = self.config
    versions = {'cubicweb': config.cubicweb_version()}
    config.bootstrap_cubes()
    versions.update((cube, config.cube_version(cube))
                    for cube in config.cubes())
    config._cubes = None
    return versions
|
76 |
|
77 |
|
@property
def late_binding_env(self):
    """builds TestEnvironment as late as possible

    on first access the environment is created with 'data' and the
    instance's `configcls` / `requestcls`, then stored as `_env` on the
    instance's class; later accesses return the stored value
    """
    if hasattr(self, '_env'):
        return self._env
    klass = self.__class__
    klass._env = TestEnvironment('data', configcls=self.configcls,
                                 requestcls=self.requestcls)
    return self._env
|
85 |
|
86 |
|
87 class autoenv(type): |
|
88 """automatically set environment on EnvBasedTC subclasses if necessary |
|
89 """ |
|
90 def __new__(mcs, name, bases, classdict): |
|
91 env = classdict.get('env') |
|
92 # try to find env in one of the base classes |
|
93 if env is None: |
|
94 for base in bases: |
|
95 env = getattr(base, 'env', None) |
|
96 if env is not None: |
|
97 classdict['env'] = env |
|
98 break |
|
99 if not classdict.get('__abstract__') and not classdict.get('env'): |
|
100 classdict['env'] = late_binding_env |
|
101 return super(autoenv, mcs).__new__(mcs, name, bases, classdict) |
|
102 |
|
103 |
|
class EnvBasedTC(TestCase):
    """abstract class for test using an apptest environment

    concrete subclasses get their `env` attribute from the `autoenv`
    metaclass; the helpers below delegate to that environment, to its
    connection (`env.cnx`) or to the registry `vreg` set by `setUp`
    """
    __metaclass__ = autoenv
    __abstract__ = True
    env = None
    configcls = ApptestConfiguration
    requestcls = FakeRequest

    # user / session management ###############################################

    def user(self, req=None):
        """return `req.user`, or when `req` is None the user given by the
        environment's connection for a newly created request
        """
        if req is None:
            req = self.env.create_request()
            return self.env.cnx.user(req)
        else:
            return req.user

    def create_user(self, *args, **kwargs):
        """delegate to `env.create_user`"""
        return self.env.create_user(*args, **kwargs)

    def login(self, login, password=None):
        """delegate to `env.login`"""
        return self.env.login(login, password)

    def restore_connection(self):
        """delegate to `env.restore_connection`"""
        self.env.restore_connection()

    # db api ##################################################################

    @nocoverage
    def cursor(self, req=None):
        """return a cursor of the environment's connection for `req` (a new
        request when `req` is not given)
        """
        return self.env.cnx.cursor(req or self.request())

    @nocoverage
    def execute(self, *args, **kwargs):
        """delegate to `env.execute`"""
        return self.env.execute(*args, **kwargs)

    @nocoverage
    def commit(self):
        """commit the environment's connection"""
        self.env.cnx.commit()

    @nocoverage
    def rollback(self):
        """rollback the environment's connection, ignoring ProgrammingError"""
        try:
            self.env.cnx.rollback()
        except ProgrammingError:
            pass

    # other utilities #########################################################
    def set_debug(self, debugmode):
        """call `cubicweb.server.set_debug` with `debugmode`"""
        from cubicweb.server import set_debug
        set_debug(debugmode)

    @property
    def config(self):
        """configuration of the registry (`vreg` is set by `setUp`)"""
        return self.vreg.config

    def session(self):
        """return current server side session (using default manager account)"""
        return self.env.repo._sessions[self.env.cnx.sessionid]

    def request(self, *args, **kwargs):
        """return a web interface request"""
        return self.env.create_request(*args, **kwargs)

    @nocoverage
    def rset_and_req(self, *args, **kwargs):
        """delegate to `env.get_rset_and_req`"""
        return self.env.get_rset_and_req(*args, **kwargs)

    def entity(self, rql, args=None, eidkey=None, req=None):
        """execute `rql` and return the entity at row 0, column 0 of the
        result
        """
        return self.execute(rql, args, eidkey, req=req).get_entity(0, 0)

    def etype_instance(self, etype, req=None):
        """return an instance of the entity class registered for `etype`,
        bound to `req` (a new request by default), with its eid set to None
        """
        req = req or self.request()
        e = self.env.vreg['etypes'].etype_class(etype)(req)
        e.eid = None
        return e

    def add_entity(self, etype, **kwargs):
        """insert an entity of type `etype` and return it

        each keyword argument names an attribute or relation of the new
        entity: a value having an `eid` attribute is linked through an RQL
        variable restricted on that eid, any other value is given as a query
        argument
        """
        rql = ['INSERT %s X' % etype]

        # dict for replacement in RQL Request
        rql_args = {}

        if kwargs:
            rql.append(':')
            # dict to define new entities variables
            entities = {}

            # assignment part of the request
            sub_rql = []
            for key, value in kwargs.iteritems():
                # entities
                if hasattr(value, 'eid'):
                    new_value = "%s__" % key.upper()

                    entities[new_value] = value.eid
                    rql_args[new_value] = value.eid

                    sub_rql.append("X %s %s" % (key, new_value))
                # final attributes
                else:
                    sub_rql.append('X %s %%(%s)s' % (key, key))
                    rql_args[key] = value
            rql.append(', '.join(sub_rql))

            if entities:
                rql.append('WHERE')
                # WHERE part of the request (to link entities to their eid)
                sub_rql = []
                for varname in entities:
                    sub_rql.append("%s eid %%(%s)s" % (varname, varname))
                rql.append(', '.join(sub_rql))

        rql = ' '.join(rql)
        rset = self.execute(rql, rql_args)
        return rset.get_entity(0, 0)

    def set_option(self, optname, value):
        """set option `optname` to `value` through the registry's
        configuration
        """
        self.vreg.config.global_set_option(optname, value)

    def pviews(self, req, rset):
        """return the sorted list of (id, class) of views possible for `req`
        and `rset`
        """
        return sorted((a.id, a.__class__) for a in self.vreg['views'].possible_views(req, rset=rset))

    def pactions(self, req, rset, skipcategories=('addrelated', 'siteactions', 'useractions')):
        """return (id, class) of possible actions whose category is not in
        `skipcategories`
        """
        return [(a.id, a.__class__) for a in self.vreg['actions'].possible_vobjects(req, rset=rset)
                if a.category not in skipcategories]

    def pactions_by_cats(self, req, rset, categories=('addrelated',)):
        """return (id, class) of possible actions whose category is in
        `categories`
        """
        return [(a.id, a.__class__) for a in self.vreg['actions'].possible_vobjects(req, rset=rset)
                if a.category in categories]

    paddrelactions = deprecated()(pactions_by_cats)

    def pactionsdict(self, req, rset, skipcategories=('addrelated', 'siteactions', 'useractions')):
        """return a dictionary mapping each category not in `skipcategories`
        to the list of classes of possible actions of that category
        """
        res = {}
        for a in self.vreg['actions'].possible_vobjects(req, rset=rset):
            if a.category not in skipcategories:
                res.setdefault(a.category, []).append(a.__class__)
        return res

    def remote_call(self, fname, *args):
        """remote call simulation

        arguments are dumped with simplejson and given to the 'json'
        controller; return the controller's result and the request
        """
        dump = simplejson.dumps
        args = [dump(arg) for arg in args]
        req = self.request(fname=fname, pageid='123', arg=args)
        ctrl = self.vreg['controllers'].select('json', req)
        return ctrl.publish(), req

    # default test setup and teardown #########################################

    def setup_database(self):
        """hook called by `setUp` before it commits; does nothing by default"""
        pass

    def setUp(self):
        self.restore_connection()
        session = self.session()
        #self.maxeid = self.execute('Any MAX(X)')
        session.set_pool()
        # remember the highest eid so that tearDown can delete what the test
        # has added
        self.maxeid = session.system_sql('SELECT MAX(eid) FROM entities').fetchone()[0]
        self.app = self.env.app
        self.vreg = self.env.app.vreg
        self.schema = self.vreg.schema
        self.vreg.config.mode = 'test'
        # set default-dest-addrs to a dumb email address to avoid mailbox or
        # mail queue pollution
        self.set_option('default-dest-addrs', ['whatever'])
        self.setup_database()
        self.commit()
        MAILBOX[:] = [] # reset mailbox

    @nocoverage
    def tearDown(self):
        self.rollback()
        # self.env.restore_database()
        self.env.restore_connection()
        # delete entities created since setUp; the eid is given as a query
        # argument, as RepositoryBasedTC.tearDown does, rather than being
        # interpolated into the query string
        self.session().unsafe_execute('DELETE Any X WHERE X eid > %(x)s',
                                      {'x': self.maxeid})
        self.commit()
|
284 |
|
285 # global resources accessors ############################################### |
|
286 |
|
287 # XXX |
|
288 try: |
|
289 from cubicweb.web import Redirect |
|
290 from urllib import unquote |
|
291 except ImportError: |
|
292 pass # cubicweb-web not installed |
|
293 else: |
|
294 class ControllerTC(EnvBasedTC): |
|
295 def setUp(self): |
|
296 super(ControllerTC, self).setUp() |
|
297 self.req = self.request() |
|
298 self.ctrl = self.vreg['controllers'].select('edit', self.req) |
|
299 |
|
300 def publish(self, req): |
|
301 assert req is self.ctrl.req |
|
302 try: |
|
303 result = self.ctrl.publish() |
|
304 req.cnx.commit() |
|
305 except Redirect: |
|
306 req.cnx.commit() |
|
307 raise |
|
308 return result |
|
309 |
|
310 def expect_redirect_publish(self, req=None): |
|
311 if req is not None: |
|
312 self.ctrl = self.vreg['controllers'].select('edit', req) |
|
313 else: |
|
314 req = self.req |
|
315 try: |
|
316 res = self.publish(req) |
|
317 except Redirect, ex: |
|
318 try: |
|
319 path, params = ex.location.split('?', 1) |
|
320 except: |
|
321 path, params = ex.location, "" |
|
322 req._url = path |
|
323 cleanup = lambda p: (p[0], unquote(p[1])) |
|
324 params = dict(cleanup(p.split('=', 1)) for p in params.split('&') if p) |
|
325 return req.relative_path(False), params # path.rsplit('/', 1)[-1], params |
|
326 else: |
|
327 self.fail('expected a Redirect exception') |
|
328 |
|
329 |
|
def make_late_binding_repo_property(attrname):
    """return a property giving the value of attribute `attrname`

    when the instance has no such attribute yet, the test database is
    initialized with `init_test_database` (using the instance's
    `repo_config`, or a `TestServerConfiguration` built with 'data') and the
    resulting repository and connection are stored as `_repo` and `_cnx` on
    the instance's class
    """
    def late_binding(self):
        """builds cnx as late as possible"""
        if hasattr(self, attrname):
            return getattr(self, attrname)
        # sets explicit test mode here to avoid autoreload
        from cubicweb.cwconfig import CubicWebConfiguration
        CubicWebConfiguration.mode = 'test'
        klass = self.__class__
        config = self.repo_config or TestServerConfiguration('data')
        repo, cnx = init_test_database('sqlite', config=config)
        klass._repo = repo
        klass._cnx = cnx
        return getattr(self, attrname)
    return property(late_binding)
|
343 |
|
344 |
|
class autorepo(type):
    """automatically set repository on RepositoryBasedTC subclasses if necessary

    a class without its own `repo` inherits the first non-None `repo` found
    on its direct bases; a class not named 'RepositoryBasedTC' which still
    has no `repo` gets late binding `repo` and `cnx` properties
    """
    def __new__(mcs, name, bases, classdict):
        if classdict.get('repo') is None:
            # try to find repo in one of the base classes
            for base in bases:
                inherited = getattr(base, 'repo', None)
                if inherited is not None:
                    classdict['repo'] = inherited
                    break
        needs_binding = name != 'RepositoryBasedTC' and not classdict.get('repo')
        if needs_binding:
            classdict['repo'] = make_late_binding_repo_property('_repo')
            classdict['cnx'] = make_late_binding_repo_property('_cnx')
        return super(autorepo, mcs).__new__(mcs, name, bases, classdict)
|
361 |
|
362 |
|
363 class RepositoryBasedTC(TestCase): |
|
364 """abstract class for test using direct repository connections |
|
365 """ |
|
366 __metaclass__ = autorepo |
|
367 repo_config = None # set a particular config instance if necessary |
|
368 |
|
369 # user / session management ############################################### |
|
370 |
|
371 def create_user(self, user, groups=('users',), password=None, commit=True): |
|
372 if password is None: |
|
373 password = user |
|
374 eid = self.execute('INSERT CWUser X: X login %(x)s, X upassword %(p)s,' |
|
375 'X in_state S WHERE S name "activated"', |
|
376 {'x': unicode(user), 'p': password})[0][0] |
|
377 groups = ','.join(repr(group) for group in groups) |
|
378 self.execute('SET X in_group Y WHERE X eid %%(x)s, Y name IN (%s)' % groups, |
|
379 {'x': eid}) |
|
380 if commit: |
|
381 self.commit() |
|
382 self.session.reset_pool() |
|
383 return eid |
|
384 |
|
385 def login(self, login, password=None): |
|
386 cnx = repo_connect(self.repo, unicode(login), password or login, |
|
387 ConnectionProperties('inmemory')) |
|
388 self.cnxs.append(cnx) |
|
389 return cnx |
|
390 |
|
391 def current_session(self): |
|
392 return self.repo._sessions[self.cnxs[-1].sessionid] |
|
393 |
|
394 def restore_connection(self): |
|
395 assert len(self.cnxs) == 1, self.cnxs |
|
396 cnx = self.cnxs.pop() |
|
397 try: |
|
398 cnx.close() |
|
399 except Exception, ex: |
|
400 print "exception occured while closing connection", ex |
|
401 |
|
402 # db api ################################################################## |
|
403 |
|
404 def execute(self, rql, args=None, eid_key=None): |
|
405 assert self.session.id == self.cnxid |
|
406 rset = self.__execute(self.cnxid, rql, args, eid_key) |
|
407 rset.vreg = self.vreg |
|
408 rset.req = self.session |
|
409 # call to set_pool is necessary to avoid pb when using |
|
410 # instance entities for convenience |
|
411 self.session.set_pool() |
|
412 return rset |
|
413 |
|
414 def commit(self): |
|
415 self.__commit(self.cnxid) |
|
416 self.session.set_pool() |
|
417 |
|
418 def rollback(self): |
|
419 self.__rollback(self.cnxid) |
|
420 self.session.set_pool() |
|
421 |
|
422 def close(self): |
|
423 self.__close(self.cnxid) |
|
424 |
|
425 # other utilities ######################################################### |
|
426 |
|
427 def set_debug(self, debugmode): |
|
428 from cubicweb.server import set_debug |
|
429 set_debug(debugmode) |
|
430 |
|
431 def set_option(self, optname, value): |
|
432 self.vreg.config.global_set_option(optname, value) |
|
433 |
|
434 def add_entity(self, etype, **kwargs): |
|
435 restrictions = ', '.join('X %s %%(%s)s' % (key, key) for key in kwargs) |
|
436 rql = 'INSERT %s X' % etype |
|
437 if kwargs: |
|
438 rql += ': %s' % ', '.join('X %s %%(%s)s' % (key, key) for key in kwargs) |
|
439 rset = self.execute(rql, kwargs) |
|
440 return rset.get_entity(0, 0) |
|
441 |
|
442 def default_user_password(self): |
|
443 config = self.repo.config #TestConfiguration('data') |
|
444 user = unicode(config.sources()['system']['db-user']) |
|
445 passwd = config.sources()['system']['db-password'] |
|
446 return user, passwd |
|
447 |
|
448 def close_connections(self): |
|
449 for cnx in self.cnxs: |
|
450 try: |
|
451 cnx.rollback() |
|
452 cnx.close() |
|
453 except: |
|
454 continue |
|
455 self.cnxs = [] |
|
456 |
|
457 pactions = EnvBasedTC.pactions.im_func |
|
458 pactionsdict = EnvBasedTC.pactionsdict.im_func |
|
459 |
|
460 # default test setup and teardown ######################################### |
|
461 |
|
462 def _prepare(self): |
|
463 MAILBOX[:] = [] # reset mailbox |
|
464 if hasattr(self, 'cnxid'): |
|
465 return |
|
466 repo = self.repo |
|
467 self.__execute = repo.execute |
|
468 self.__commit = repo.commit |
|
469 self.__rollback = repo.rollback |
|
470 self.__close = repo.close |
|
471 self.cnxid = self.cnx.sessionid |
|
472 self.session = repo._sessions[self.cnxid] |
|
473 self.cnxs = [] |
|
474 # reset caches, they may introduce bugs among tests |
|
475 repo._type_source_cache = {} |
|
476 repo._extid_cache = {} |
|
477 repo.querier._rql_cache = {} |
|
478 for source in repo.sources: |
|
479 source.reset_caches() |
|
480 for s in repo.sources: |
|
481 if hasattr(s, '_cache'): |
|
482 s._cache = {} |
|
483 |
|
484 @property |
|
485 def config(self): |
|
486 return self.repo.config |
|
487 |
|
488 @property |
|
489 def vreg(self): |
|
490 return self.repo.vreg |
|
491 |
|
492 @property |
|
493 def schema(self): |
|
494 return self.repo.schema |
|
495 |
|
496 def setUp(self): |
|
497 self._prepare() |
|
498 self.session.set_pool() |
|
499 self.maxeid = self.session.system_sql('SELECT MAX(eid) FROM entities').fetchone()[0] |
|
500 |
|
501 def tearDown(self): |
|
502 self.close_connections() |
|
503 self.rollback() |
|
504 self.session.unsafe_execute('DELETE Any X WHERE X eid > %(x)s', {'x': self.maxeid}) |
|
505 self.commit() |
|
506 |
|