|
1 """Hidden internals for the devtools.apptest module |
|
2 |
|
3 :organization: Logilab |
|
4 :copyright: 2001-2008 LOGILAB S.A. (Paris, FRANCE), all rights reserved. |
|
5 :contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr |
|
6 """ |
|
7 __docformat__ = "restructuredtext en" |
|
8 |
|
9 import sys, traceback |
|
10 |
|
11 from logilab.common.pytest import pause_tracing, resume_tracing |
|
12 |
|
13 import yams.schema |
|
14 |
|
15 from cubicweb.dbapi import repo_connect, ConnectionProperties, ProgrammingError |
|
16 from cubicweb.cwvreg import CubicWebRegistry |
|
17 |
|
18 from cubicweb.web.application import CubicWebPublisher |
|
19 from cubicweb.web import Redirect |
|
20 |
|
21 from cubicweb.devtools import ApptestConfiguration, init_test_database |
|
22 from cubicweb.devtools.fake import FakeRequest |
|
23 |
|
# Entity types that `unprotected_entities` treats as protected (in addition
# to the yams base types) unless it is called with strict=True.
SYSTEM_ENTITIES = (
    'EGroup',
    'EUser',
    'EFRDef',
    'ENFRDef',
    'EConstraint',
    'EConstraintType',
    'EProperty',
    'EEType',
    'ERType',
    'State',
    'Transition',
    'TrInfo',
    'RQLExpression',
)

# Names of relation types considered as system relations, grouped by topic.
# `ignore_relations` appends to this tuple.
SYSTEM_RELATIONS = (
    # virtual relation
    'identity',
    # metadata
    'is',
    'is_instance_of',
    'owned_by',
    'created_by',
    'specializes',
    # workflow related
    'state_of',
    'transition_of',
    'initial_state',
    'allowed_transition',
    'destination_state',
    'in_state',
    'wf_info_for',
    'from_state',
    'to_state',
    'condition',
    # permission
    'in_group',
    'require_group',
    'require_permission',
    'read_permission',
    'update_permission',
    'delete_permission',
    'add_permission',
    # eproperty
    'for_user',
    # schema definition
    'relation_type',
    'from_entity',
    'to_entity',
    'constrained_by',
    'cstrtype',
    'widget',
    # deduced from other relations
    'primary_email',
)
|
51 |
|
def unprotected_entities(app_schema, strict=False):
    """return the set of entities of `app_schema` which are not protected

    :param app_schema: schema object; only its `entities()` method is used
    :param strict: when true, only `yams.schema.BASE_TYPES` are excluded;
      otherwise the types listed in `SYSTEM_ENTITIES` (EGroup, EUser...) are
      excluded as well
    :return: a set holding what `app_schema.entities()` yields, minus the
      protected types
    """
    protected = yams.schema.BASE_TYPES
    if not strict:
        # union() builds a new set, BASE_TYPES itself is left untouched
        protected = protected.union(set(SYSTEM_ENTITIES))
    return set(app_schema.entities()) - protected
|
61 |
|
62 |
|
def ignore_relations(*relations):
    """append the given relation type names to `SYSTEM_RELATIONS`

    :param relations: names of the relation types to add; they are appended,
      in order, to the module level `SYSTEM_RELATIONS` tuple
    """
    # `+=` rebinds the name: without the global declaration Python treats
    # SYSTEM_RELATIONS as a local variable and the statement below raises
    # UnboundLocalError on every call
    global SYSTEM_RELATIONS
    SYSTEM_RELATIONS += relations
|
65 |
|
class TestEnvironment(object):
    """TestEnvironment defines a context (e.g. a config + a given connection)
    in which the tests are executed
    """

    def __init__(self, appid, reporter=None, verbose=False,
                 configcls=ApptestConfiguration, requestcls=FakeRequest):
        """build the configuration, the registry, the test database and the
        publisher for application `appid`

        :param appid: application identifier, given to `configcls`
        :param reporter: not used by this method
        :param verbose: when true, progress messages are printed; the value
          is also kept as `self.verbose`
        :param configcls: configuration class, instantiated with `appid`
        :param requestcls: request class instantiated by `create_request`
        """
        config = configcls(appid)
        self.requestcls = requestcls
        # must exist before restore_database() is called since it tests it
        self.cnx = None
        config.db_perms = False
        source = config.sources()['system']
        if verbose:
            print("init test database ...")
        self.vreg = vreg = CubicWebRegistry(config)
        # login() compares against this to decide whether the original
        # connection should be reused
        self.admlogin = source['db-user']
        # restore database <=> init database
        self.restore_database()
        if verbose:
            print("init done")
        config.repository = lambda x=None: self.repo
        self.app = CubicWebPublisher(config, vreg=vreg)
        self.verbose = verbose
        schema = self.vreg.schema
        # else we may run into problems since email addresses are usually
        # shared in app tests
        # XXX should not be necessary anymore
        schema.rschema('primary_email').set_rproperty('EUser', 'EmailAddress', 'composite', False)
        self.deletable_entities = unprotected_entities(schema)

    def restore_database(self):
        """called by unittests' tearDown to restore the original database

        The current connection, if any, is closed, then the repository and
        a new connection are obtained from `init_test_database`. The new
        connection is kept as both `self.cnx` and `self._orig_cnx`.

        Any error occurring during this step is printed and terminates the
        process with exit status 1.
        """
        try:
            pause_tracing()
            if self.cnx:
                self.cnx.close()
            source = self.vreg.config.sources()['system']
            self.repo, self.cnx = init_test_database(driver=source['db-driver'],
                                                     vreg=self.vreg)
            self._orig_cnx = self.cnx
            resume_tracing()
        except:
            # deliberately catches everything: whatever went wrong, tracing
            # is resumed, the traceback is shown and the run is aborted
            resume_tracing()
            traceback.print_exc()
            sys.exit(1)
        # XXX cnx decoration is usually done by the repository authentication
        # manager, necessary in authentication tests
        self.cnx.vreg = self.vreg
        self.cnx.login = source['db-user']
        self.cnx.password = source['db-password']

    def create_user(self, login, groups=('users',), req=None):
        """insert a new EUser using the original connection and commit

        :param login: login of the user; its utf8 encoded form is also used
          as the password
        :param groups: names of the groups the user is put in
        :param req: request given to the cursor; a new one is created with
          `create_request` when missing
        :return: the entity found at (0, 0) in the result set of the INSERT
          query
        """
        req = req or self.create_request()
        cursor = self._orig_cnx.cursor(req)
        rset = cursor.execute('INSERT EUser X: X login %(login)s, X upassword %(passwd)s,'
                              'X in_state S WHERE S name "activated"',
                              {'login': unicode(login), 'passwd': login.encode('utf8')})
        user = rset.get_entity(0, 0)
        # NOTE(review): group names are inlined in the query through repr();
        # this presumably only works for plain byte strings without quotes
        # (repr of a unicode object is prefixed with `u`) -- confirm
        cursor.execute('SET X in_group G WHERE X eid %%(x)s, G name IN(%s)'
                       % ','.join(repr(g) for g in groups),
                       {'x': user.eid}, 'x')
        user.clear_related_cache('in_group', 'subject')
        self._orig_cnx.commit()
        return user

    def login(self, login):
        """make `self.cnx` a connection for user `login` and return it

        When `login` is the login stored in `self.admlogin`, the original
        connection is restored. Otherwise a new in-memory connection is
        opened, giving `str(login)` where the login itself was given as
        unicode (which matches the password set by `create_user`).

        :param login: login of the user to connect as
        :return: the now current connection
        """
        if login == self.admlogin:
            self.restore_connection()
        else:
            # NOTE(review): a previous connection which is not the original
            # one is replaced here without being closed -- confirm whether
            # this is intended
            self.cnx = repo_connect(self.repo, unicode(login), str(login),
                                    ConnectionProperties('inmemory'))
        if login == self.vreg.config.anonymous_user()[0]:
            self.cnx.anonymous_connection = True
        return self.cnx

    def restore_connection(self):
        """make the original connection the current one again, closing the
        current connection when it is a different one
        """
        if self.cnx is not self._orig_cnx:
            try:
                self.cnx.close()
            except ProgrammingError:
                pass # already closed
        self.cnx = self._orig_cnx

    ############################################################################

    def execute(self, rql, args=None, eidkey=None, req=None):
        """executes <rql> on the current connection and returns what the
        cursor's `execute` method returns

        :param rql: the query, converted to unicode before execution
        :param args: query arguments, given as is to the cursor
        :param eidkey: given as is to the cursor as third argument
        :param req: request used to get the cursor; a new one is created
          with `create_request` when missing
        """
        req = req or self.create_request(rql=rql)
        return self.cnx.cursor(req).execute(unicode(rql), args, eidkey)

    def create_request(self, rql=None, **kwargs):
        """builds and returns a request (an instance of `self.requestcls`)
        attached to the current connection

        Nothing is executed here: <rql>, when given, is only stored under
        the 'rql' key of the request's form, next to the other keyword
        arguments.
        """
        if rql:
            kwargs['rql'] = rql
        req = self.requestcls(self.vreg, form=kwargs)
        req.set_connection(self.cnx)
        return req

    def get_rset_and_req(self, rql, optional_args=None, args=None, eidkey=None):
        """executes <rql>, builds a resultset, and returns a
        couple (rset, req) where req is a newly created request

        :param rql: the query to execute
        :param optional_args: optional dictionary of extra form parameters
          for the request
        :param args: query arguments, see `execute`
        :param eidkey: see `execute`
        """
        return (self.execute(rql, args, eidkey),
                self.create_request(rql=rql, **optional_args or {}))

    def check_view(self, rql, vid, optional_args, template='main'):
        """checks if rendering view <vid> for <rql> raises an exception in
        this environment (delegates to `call_view`)

        If any exception is raised in this method, it will be considered
        as a TestFailure
        """
        return self.call_view(vid, rql,
                              template=template, optional_args=optional_args)

    def call_view(self, vid, rql, template='main', optional_args=None):
        """shortcut for self.vreg.main_template()

        :param vid: view identifier, stored under the 'vid' key of the
          request's form
        :param rql: query stored in the request's form
        :param template: template given to `main_template`, must not be
          empty
        :param optional_args: optional dictionary of extra form parameters;
          it is left unmodified
        :return: what `self.vreg.main_template` returns
        """
        assert template
        # work on a copy so that the caller's dictionary doesn't get an
        # unexpected 'vid' key
        form = dict(optional_args or {})
        form['vid'] = vid
        req = self.create_request(rql=rql, **form)
        return self.vreg.main_template(req, template)

    def call_edit(self, req):
        """publish `req` with the 'edit' controller and commit

        The controller is expected to raise `Redirect`: in this case the
        request's connection is committed and 'success' is returned.

        :raise Exception: if the controller returns without raising
          `Redirect`; other exceptions raised by the controller are not
          caught
        """
        controller = self.app.select_controller('edit', req)
        try:
            controller.publish()
        except Redirect:
            req.cnx.commit()
            return 'success'
        raise Exception('edit should raise Redirect on success')

    def iter_possible_views(self, req, rset):
        """yields the id of each possible view for <rset> which is not a
        startup view, followed by 'edition' if the result set has exactly
        one row
        """
        for view in self.vreg.possible_views(req, rset):
            if view.category != 'startupview':
                yield view.id
        if rset.rowcount == 1:
            yield 'edition'

    def iter_startup_views(self, req):
        """yields the id of each startup view"""
        for view in self.vreg.possible_views(req, None):
            if view.category == 'startupview':
                yield view.id

    def iter_possible_actions(self, req, rset):
        """yields each possible object of the 'actions' registry for
        <req> and <rset>
        """
        for action in self.vreg.possible_vobjects('actions', req, rset):
            yield action
|
227 |
|
class ExistingTestEnvironment(TestEnvironment):
    """test environment configured from a given sources file, which records
    the last entity before tests (`setup`) and deletes entities created
    afterwards (`cleanup`)
    """

    def __init__(self, appid, sourcefile, verbose=False):
        """build the configuration, the registry, the connection and the
        publisher for application `appid`

        The base class constructor is not called, so every attribute read
        by the inherited methods is set here.

        :param appid: application identifier
        :param sourcefile: given to `ApptestConfiguration` as `sourcefile`
        :param verbose: when true, progress messages are printed; the value
          is also kept as `self.verbose`
        """
        config = ApptestConfiguration(appid, sourcefile=sourcefile)
        if verbose:
            print("init test database ...")
        source = config.sources()['system']
        self.vreg = CubicWebRegistry(config)
        # keep the repository and the original connection: login(),
        # create_user() and restore_connection() need them
        self.repo, self.cnx = init_test_database(driver=source['db-driver'],
                                                 vreg=self.vreg)
        self._orig_cnx = self.cnx
        # needed by create_request() and login()
        self.requestcls = FakeRequest
        self.admlogin = source['db-user']
        if verbose:
            print("init done")
        self.app = CubicWebPublisher(config, vreg=self.vreg)
        self.verbose = verbose
        # this is done when the publisher is opening a connection
        self.cnx.vreg = self.vreg

    def setup(self, config=None):
        """config is passed by TestSuite but is ignored in this environment

        Stores in `self.last_eid` the first column of the first row returned
        by a query selecting the entity with the latest creation date.
        """
        cursor = self.cnx.cursor()
        self.last_eid = cursor.execute('Any X WHERE X creation_date D ORDERBY D DESC LIMIT 1').rows[0][0]

    def cleanup(self):
        """cancel inserted elements during tests

        Deletes every entity whose eid is greater than the one recorded by
        `setup`, then commits.
        """
        cursor = self.cnx.cursor()
        cursor.execute('DELETE Any X WHERE X eid > %(x)s', {'x' : self.last_eid}, eid_key='x')
        self.cnx.commit()
        # only claim success once the deletion has been committed
        print("cleaning done")
|
257 |