58 (u'Interval',), (u'Password',), |
58 (u'Interval',), (u'Password',), |
59 (u'String',), (u'Time',)]) |
59 (u'String',), (u'Time',)]) |
60 |
60 |
61 def test_schema_has_owner(self): |
61 def test_schema_has_owner(self): |
62 repo = self.repo |
62 repo = self.repo |
63 cnxid = repo.connect(self.admlogin, self.admpassword) |
63 cnxid = repo.connect(self.admlogin, password=self.admpassword) |
64 self.failIf(repo.execute(cnxid, 'CWEType X WHERE NOT X owned_by U')) |
64 self.failIf(repo.execute(cnxid, 'CWEType X WHERE NOT X owned_by U')) |
65 self.failIf(repo.execute(cnxid, 'CWRType X WHERE NOT X owned_by U')) |
65 self.failIf(repo.execute(cnxid, 'CWRType X WHERE NOT X owned_by U')) |
66 self.failIf(repo.execute(cnxid, 'CWAttribute X WHERE NOT X owned_by U')) |
66 self.failIf(repo.execute(cnxid, 'CWAttribute X WHERE NOT X owned_by U')) |
67 self.failIf(repo.execute(cnxid, 'CWRelation X WHERE NOT X owned_by U')) |
67 self.failIf(repo.execute(cnxid, 'CWRelation X WHERE NOT X owned_by U')) |
68 self.failIf(repo.execute(cnxid, 'CWConstraint X WHERE NOT X owned_by U')) |
68 self.failIf(repo.execute(cnxid, 'CWConstraint X WHERE NOT X owned_by U')) |
69 self.failIf(repo.execute(cnxid, 'CWConstraintType X WHERE NOT X owned_by U')) |
69 self.failIf(repo.execute(cnxid, 'CWConstraintType X WHERE NOT X owned_by U')) |
70 |
70 |
71 def test_connect(self): |
71 def test_connect(self): |
72 self.assert_(self.repo.connect(self.admlogin, self.admpassword)) |
72 self.assert_(self.repo.connect(self.admlogin, password=self.admpassword)) |
73 self.assertRaises(AuthenticationError, |
73 self.assertRaises(AuthenticationError, |
74 self.repo.connect, self.admlogin, 'nimportnawak') |
74 self.repo.connect, self.admlogin, password='nimportnawak') |
75 self.assertRaises(AuthenticationError, |
75 self.assertRaises(AuthenticationError, |
76 self.repo.connect, self.admlogin, None) |
76 self.repo.connect, self.admlogin, password=None) |
77 self.assertRaises(AuthenticationError, |
77 self.assertRaises(AuthenticationError, |
78 self.repo.connect, None, None) |
78 self.repo.connect, None, password=None) |
|
79 self.assertRaises(AuthenticationError, |
|
80 self.repo.connect, self.admlogin) |
|
81 self.assertRaises(AuthenticationError, |
|
82 self.repo.connect, None) |
79 |
83 |
80 def test_execute(self): |
84 def test_execute(self): |
81 repo = self.repo |
85 repo = self.repo |
82 cnxid = repo.connect(self.admlogin, self.admpassword) |
86 cnxid = repo.connect(self.admlogin, password=self.admpassword) |
83 repo.execute(cnxid, 'Any X') |
87 repo.execute(cnxid, 'Any X') |
84 repo.execute(cnxid, 'Any X where X is Personne') |
88 repo.execute(cnxid, 'Any X where X is Personne') |
85 repo.execute(cnxid, 'Any X where X is Personne, X nom ~= "to"') |
89 repo.execute(cnxid, 'Any X where X is Personne, X nom ~= "to"') |
86 repo.execute(cnxid, 'Any X WHERE X has_text %(text)s', {'text': u'\xe7a'}) |
90 repo.execute(cnxid, 'Any X WHERE X has_text %(text)s', {'text': u'\xe7a'}) |
87 repo.close(cnxid) |
91 repo.close(cnxid) |
88 |
92 |
89 def test_login_upassword_accent(self): |
93 def test_login_upassword_accent(self): |
90 repo = self.repo |
94 repo = self.repo |
91 cnxid = repo.connect(self.admlogin, self.admpassword) |
95 cnxid = repo.connect(self.admlogin, password=self.admpassword) |
92 repo.execute(cnxid, 'INSERT CWUser X: X login %(login)s, X upassword %(passwd)s, X in_group G WHERE G name "users"', |
96 repo.execute(cnxid, 'INSERT CWUser X: X login %(login)s, X upassword %(passwd)s, X in_group G WHERE G name "users"', |
93 {'login': u"barnabé", 'passwd': u"héhéhé".encode('UTF8')}) |
97 {'login': u"barnabé", 'passwd': u"héhéhé".encode('UTF8')}) |
94 repo.commit(cnxid) |
98 repo.commit(cnxid) |
95 repo.close(cnxid) |
99 repo.close(cnxid) |
96 self.assert_(repo.connect(u"barnabé", u"héhéhé".encode('UTF8'))) |
100 self.assert_(repo.connect(u"barnabé", password=u"héhéhé".encode('UTF8'))) |
97 |
101 |
98 def test_invalid_entity_rollback(self): |
102 def test_invalid_entity_rollback(self): |
99 cnxid = self.repo.connect(self.admlogin, self.admpassword) |
103 cnxid = self.repo.connect(self.admlogin, password=self.admpassword) |
100 # no group |
104 # no group |
101 self.repo.execute(cnxid, |
105 self.repo.execute(cnxid, |
102 'INSERT CWUser X: X login %(login)s, X upassword %(passwd)s', |
106 'INSERT CWUser X: X login %(login)s, X upassword %(passwd)s', |
103 {'login': u"tutetute", 'passwd': 'tutetute'}) |
107 {'login': u"tutetute", 'passwd': 'tutetute'}) |
104 self.assertRaises(ValidationError, self.repo.commit, cnxid) |
108 self.assertRaises(ValidationError, self.repo.commit, cnxid) |
105 self.failIf(self.repo.execute(cnxid, 'CWUser X WHERE X login "tutetute"')) |
109 self.failIf(self.repo.execute(cnxid, 'CWUser X WHERE X login "tutetute"')) |
106 |
110 |
107 def test_close(self): |
111 def test_close(self): |
108 repo = self.repo |
112 repo = self.repo |
109 cnxid = repo.connect(self.admlogin, self.admpassword) |
113 cnxid = repo.connect(self.admlogin, password=self.admpassword) |
110 self.assert_(cnxid) |
114 self.assert_(cnxid) |
111 repo.close(cnxid) |
115 repo.close(cnxid) |
112 self.assertRaises(BadConnectionId, repo.execute, cnxid, 'Any X') |
116 self.assertRaises(BadConnectionId, repo.execute, cnxid, 'Any X') |
113 |
117 |
114 def test_invalid_cnxid(self): |
118 def test_invalid_cnxid(self): |
115 self.assertRaises(BadConnectionId, self.repo.execute, 0, 'Any X') |
119 self.assertRaises(BadConnectionId, self.repo.execute, 0, 'Any X') |
116 self.assertRaises(BadConnectionId, self.repo.close, None) |
120 self.assertRaises(BadConnectionId, self.repo.close, None) |
117 |
121 |
118 def test_shared_data(self): |
122 def test_shared_data(self): |
119 repo = self.repo |
123 repo = self.repo |
120 cnxid = repo.connect(self.admlogin, self.admpassword) |
124 cnxid = repo.connect(self.admlogin, password=self.admpassword) |
121 repo.set_shared_data(cnxid, 'data', 4) |
125 repo.set_shared_data(cnxid, 'data', 4) |
122 cnxid2 = repo.connect(self.admlogin, self.admpassword) |
126 cnxid2 = repo.connect(self.admlogin, password=self.admpassword) |
123 self.assertEquals(repo.get_shared_data(cnxid, 'data'), 4) |
127 self.assertEquals(repo.get_shared_data(cnxid, 'data'), 4) |
124 self.assertEquals(repo.get_shared_data(cnxid2, 'data'), None) |
128 self.assertEquals(repo.get_shared_data(cnxid2, 'data'), None) |
125 repo.set_shared_data(cnxid2, 'data', 5) |
129 repo.set_shared_data(cnxid2, 'data', 5) |
126 self.assertEquals(repo.get_shared_data(cnxid, 'data'), 4) |
130 self.assertEquals(repo.get_shared_data(cnxid, 'data'), 4) |
127 self.assertEquals(repo.get_shared_data(cnxid2, 'data'), 5) |
131 self.assertEquals(repo.get_shared_data(cnxid2, 'data'), 5) |
135 self.assertRaises(BadConnectionId, repo.set_shared_data, cnxid, 'data', 1) |
139 self.assertRaises(BadConnectionId, repo.set_shared_data, cnxid, 'data', 1) |
136 self.assertRaises(BadConnectionId, repo.set_shared_data, cnxid2, 'data', 1) |
140 self.assertRaises(BadConnectionId, repo.set_shared_data, cnxid2, 'data', 1) |
137 |
141 |
138 def test_check_session(self): |
142 def test_check_session(self): |
139 repo = self.repo |
143 repo = self.repo |
140 cnxid = repo.connect(self.admlogin, self.admpassword) |
144 cnxid = repo.connect(self.admlogin, password=self.admpassword) |
141 self.assertEquals(repo.check_session(cnxid), None) |
145 self.assertEquals(repo.check_session(cnxid), None) |
142 repo.close(cnxid) |
146 repo.close(cnxid) |
143 self.assertRaises(BadConnectionId, repo.check_session, cnxid) |
147 self.assertRaises(BadConnectionId, repo.check_session, cnxid) |
144 |
148 |
145 def test_transaction_base(self): |
149 def test_transaction_base(self): |
146 repo = self.repo |
150 repo = self.repo |
147 cnxid = repo.connect(self.admlogin, self.admpassword) |
151 cnxid = repo.connect(self.admlogin, password=self.admpassword) |
148 # check db state |
152 # check db state |
149 result = repo.execute(cnxid, 'Personne X') |
153 result = repo.execute(cnxid, 'Personne X') |
150 self.assertEquals(result.rowcount, 0) |
154 self.assertEquals(result.rowcount, 0) |
151 # rollback entity insertion |
155 # rollback entity insertion |
152 repo.execute(cnxid, "INSERT Personne X: X nom 'bidule'") |
156 repo.execute(cnxid, "INSERT Personne X: X nom 'bidule'") |
161 result = repo.execute(cnxid, 'Personne X') |
165 result = repo.execute(cnxid, 'Personne X') |
162 self.assertEquals(result.rowcount, 1) |
166 self.assertEquals(result.rowcount, 1) |
163 |
167 |
164 def test_transaction_base2(self): |
168 def test_transaction_base2(self): |
165 repo = self.repo |
169 repo = self.repo |
166 cnxid = repo.connect(self.admlogin, self.admpassword) |
170 cnxid = repo.connect(self.admlogin, password=self.admpassword) |
167 # rollback relation insertion |
171 # rollback relation insertion |
168 repo.execute(cnxid, "SET U in_group G WHERE U login 'admin', G name 'guests'") |
172 repo.execute(cnxid, "SET U in_group G WHERE U login 'admin', G name 'guests'") |
169 result = repo.execute(cnxid, "Any U WHERE U in_group G, U login 'admin', G name 'guests'") |
173 result = repo.execute(cnxid, "Any U WHERE U in_group G, U login 'admin', G name 'guests'") |
170 self.assertEquals(result.rowcount, 1) |
174 self.assertEquals(result.rowcount, 1) |
171 repo.rollback(cnxid) |
175 repo.rollback(cnxid) |
172 result = repo.execute(cnxid, "Any U WHERE U in_group G, U login 'admin', G name 'guests'") |
176 result = repo.execute(cnxid, "Any U WHERE U in_group G, U login 'admin', G name 'guests'") |
173 self.assertEquals(result.rowcount, 0, result.rows) |
177 self.assertEquals(result.rowcount, 0, result.rows) |
174 |
178 |
175 def test_transaction_base3(self): |
179 def test_transaction_base3(self): |
176 repo = self.repo |
180 repo = self.repo |
177 cnxid = repo.connect(self.admlogin, self.admpassword) |
181 cnxid = repo.connect(self.admlogin, password=self.admpassword) |
178 # rollback state change which trigger TrInfo insertion |
182 # rollback state change which trigger TrInfo insertion |
179 user = repo._get_session(cnxid).user |
183 user = repo._get_session(cnxid).user |
180 user.fire_transition('deactivate') |
184 user.fire_transition('deactivate') |
181 rset = repo.execute(cnxid, 'TrInfo T WHERE T wf_info_for X, X eid %(x)s', {'x': user.eid}) |
185 rset = repo.execute(cnxid, 'TrInfo T WHERE T wf_info_for X, X eid %(x)s', {'x': user.eid}) |
182 self.assertEquals(len(rset), 1) |
186 self.assertEquals(len(rset), 1) |
208 schema = self.repo.schema |
212 schema = self.repo.schema |
209 # check order of attributes is respected |
213 # check order of attributes is respected |
210 self.assertListEquals([r.type for r in schema.eschema('CWAttribute').ordered_relations() |
214 self.assertListEquals([r.type for r in schema.eschema('CWAttribute').ordered_relations() |
211 if not r.type in ('eid', 'is', 'is_instance_of', 'identity', |
215 if not r.type in ('eid', 'is', 'is_instance_of', 'identity', |
212 'creation_date', 'modification_date', 'cwuri', |
216 'creation_date', 'modification_date', 'cwuri', |
213 'owned_by', 'created_by')], |
217 'owned_by', 'created_by', |
214 ['relation_type', 'from_entity', 'to_entity', 'in_basket', 'constrained_by', |
218 'add_permission', 'delete_permission', 'read_permission')], |
|
219 ['relation_type', |
|
220 'from_entity', 'to_entity', |
|
221 'in_basket', 'constrained_by', |
215 'cardinality', 'ordernum', |
222 'cardinality', 'ordernum', |
216 'indexed', 'fulltextindexed', 'internationalizable', |
223 'indexed', 'fulltextindexed', 'internationalizable', |
217 'defaultval', 'description', 'description_format']) |
224 'defaultval', 'description', 'description_format']) |
218 |
225 |
219 self.assertEquals(schema.eschema('CWEType').main_attribute(), 'name') |
226 self.assertEquals(schema.eschema('CWEType').main_attribute(), 'name') |
220 self.assertEquals(schema.eschema('State').main_attribute(), 'name') |
227 self.assertEquals(schema.eschema('State').main_attribute(), 'name') |
221 |
228 |
222 constraints = schema.rschema('name').rproperty('CWEType', 'String', 'constraints') |
229 constraints = schema.rschema('name').rdef('CWEType', 'String').constraints |
223 self.assertEquals(len(constraints), 2) |
230 self.assertEquals(len(constraints), 2) |
224 for cstr in constraints[:]: |
231 for cstr in constraints[:]: |
225 if isinstance(cstr, UniqueConstraint): |
232 if isinstance(cstr, UniqueConstraint): |
226 constraints.remove(cstr) |
233 constraints.remove(cstr) |
227 break |
234 break |
289 # .properties() return a result set |
296 # .properties() return a result set |
290 self.assertEquals(self.repo.properties().rql, 'Any K,V WHERE P is CWProperty,P pkey K, P value V, NOT P for_user U') |
297 self.assertEquals(self.repo.properties().rql, 'Any K,V WHERE P is CWProperty,P pkey K, P value V, NOT P for_user U') |
291 |
298 |
292 def test_session_api(self): |
299 def test_session_api(self): |
293 repo = self.repo |
300 repo = self.repo |
294 cnxid = repo.connect(self.admlogin, self.admpassword) |
301 cnxid = repo.connect(self.admlogin, password=self.admpassword) |
295 self.assertEquals(repo.user_info(cnxid), (5, 'admin', set([u'managers']), {})) |
302 self.assertEquals(repo.user_info(cnxid), (5, 'admin', set([u'managers']), {})) |
296 self.assertEquals(repo.describe(cnxid, 1), (u'CWGroup', u'system', None)) |
303 self.assertEquals(repo.describe(cnxid, 1), (u'CWGroup', u'system', None)) |
297 repo.close(cnxid) |
304 repo.close(cnxid) |
298 self.assertRaises(BadConnectionId, repo.user_info, cnxid) |
305 self.assertRaises(BadConnectionId, repo.user_info, cnxid) |
299 self.assertRaises(BadConnectionId, repo.describe, cnxid, 1) |
306 self.assertRaises(BadConnectionId, repo.describe, cnxid, 1) |
300 |
307 |
301 def test_shared_data_api(self): |
308 def test_shared_data_api(self): |
302 repo = self.repo |
309 repo = self.repo |
303 cnxid = repo.connect(self.admlogin, self.admpassword) |
310 cnxid = repo.connect(self.admlogin, password=self.admpassword) |
304 self.assertEquals(repo.get_shared_data(cnxid, 'data'), None) |
311 self.assertEquals(repo.get_shared_data(cnxid, 'data'), None) |
305 repo.set_shared_data(cnxid, 'data', 4) |
312 repo.set_shared_data(cnxid, 'data', 4) |
306 self.assertEquals(repo.get_shared_data(cnxid, 'data'), 4) |
313 self.assertEquals(repo.get_shared_data(cnxid, 'data'), 4) |
307 repo.get_shared_data(cnxid, 'data', pop=True) |
314 repo.get_shared_data(cnxid, 'data', pop=True) |
308 repo.get_shared_data(cnxid, 'whatever', pop=True) |
315 repo.get_shared_data(cnxid, 'whatever', pop=True) |
324 # finally: |
331 # finally: |
325 # self.set_debug(False) |
332 # self.set_debug(False) |
326 # print 'test time: %.3f (time) %.3f (cpu)' % ((time() - t), clock() - c) |
333 # print 'test time: %.3f (time) %.3f (cpu)' % ((time() - t), clock() - c) |
327 |
334 |
328 def test_delete_if_singlecard1(self): |
335 def test_delete_if_singlecard1(self): |
329 note = self.add_entity('Affaire') |
336 note = self.request().create_entity('Affaire') |
330 p1 = self.add_entity('Personne', nom=u'toto') |
337 p1 = self.request().create_entity('Personne', nom=u'toto') |
331 self.execute('SET A todo_by P WHERE A eid %(x)s, P eid %(p)s', |
338 self.execute('SET A todo_by P WHERE A eid %(x)s, P eid %(p)s', |
332 {'x': note.eid, 'p': p1.eid}) |
339 {'x': note.eid, 'p': p1.eid}) |
333 rset = self.execute('Any P WHERE A todo_by P, A eid %(x)s', |
340 rset = self.execute('Any P WHERE A todo_by P, A eid %(x)s', |
334 {'x': note.eid}) |
341 {'x': note.eid}) |
335 self.assertEquals(len(rset), 1) |
342 self.assertEquals(len(rset), 1) |
336 p2 = self.add_entity('Personne', nom=u'tutu') |
343 p2 = self.request().create_entity('Personne', nom=u'tutu') |
337 self.execute('SET A todo_by P WHERE A eid %(x)s, P eid %(p)s', |
344 self.execute('SET A todo_by P WHERE A eid %(x)s, P eid %(p)s', |
338 {'x': note.eid, 'p': p2.eid}) |
345 {'x': note.eid, 'p': p2.eid}) |
339 rset = self.execute('Any P WHERE A todo_by P, A eid %(x)s', |
346 rset = self.execute('Any P WHERE A todo_by P, A eid %(x)s', |
340 {'x': note.eid}) |
347 {'x': note.eid}) |
341 self.assertEquals(len(rset), 1) |
348 self.assertEquals(len(rset), 1) |