146 repo = self.repo |
146 repo = self.repo |
147 cnxid = repo.connect(self.admlogin, password=self.admpassword) |
147 cnxid = repo.connect(self.admlogin, password=self.admpassword) |
148 self.assert_(cnxid) |
148 self.assert_(cnxid) |
149 repo.close(cnxid) |
149 repo.close(cnxid) |
150 |
150 |
151 def test_shared_data(self): |
|
152 repo = self.repo |
|
153 cnxid = repo.connect(self.admlogin, password=self.admpassword) |
|
154 repo.set_shared_data(cnxid, 'data', 4) |
|
155 cnxid2 = repo.connect(self.admlogin, password=self.admpassword) |
|
156 self.assertEqual(repo.get_shared_data(cnxid, 'data'), 4) |
|
157 self.assertEqual(repo.get_shared_data(cnxid2, 'data'), None) |
|
158 repo.set_shared_data(cnxid2, 'data', 5) |
|
159 self.assertEqual(repo.get_shared_data(cnxid, 'data'), 4) |
|
160 self.assertEqual(repo.get_shared_data(cnxid2, 'data'), 5) |
|
161 repo.get_shared_data(cnxid2, 'data', pop=True) |
|
162 self.assertEqual(repo.get_shared_data(cnxid, 'data'), 4) |
|
163 self.assertEqual(repo.get_shared_data(cnxid2, 'data'), None) |
|
164 repo.close(cnxid) |
|
165 repo.close(cnxid2) |
|
166 self.assertRaises(BadConnectionId, repo.get_shared_data, cnxid, 'data') |
|
167 self.assertRaises(BadConnectionId, repo.get_shared_data, cnxid2, 'data') |
|
168 self.assertRaises(BadConnectionId, repo.set_shared_data, cnxid, 'data', 1) |
|
169 self.assertRaises(BadConnectionId, repo.set_shared_data, cnxid2, 'data', 1) |
|
170 |
|
171 def test_check_session(self): |
151 def test_check_session(self): |
172 repo = self.repo |
152 repo = self.repo |
173 cnxid = repo.connect(self.admlogin, password=self.admpassword) |
153 cnxid = repo.connect(self.admlogin, password=self.admpassword) |
174 self.assertIsInstance(repo.check_session(cnxid), float) |
154 self.assertIsInstance(repo.check_session(cnxid), float) |
175 repo.close(cnxid) |
155 repo.close(cnxid) |
233 'uri': 'system', |
213 'uri': 'system', |
234 'use-cwuri-as-url': False} |
214 'use-cwuri-as-url': False} |
235 }) |
215 }) |
236 # .properties() return a result set |
216 # .properties() return a result set |
237 self.assertEqual(self.repo.properties().rql, 'Any K,V WHERE P is CWProperty,P pkey K, P value V, NOT P for_user U') |
217 self.assertEqual(self.repo.properties().rql, 'Any K,V WHERE P is CWProperty,P pkey K, P value V, NOT P for_user U') |
238 |
|
239 def test_shared_data_api(self): |
|
240 repo = self.repo |
|
241 cnxid = repo.connect(self.admlogin, password=self.admpassword) |
|
242 self.assertEqual(repo.get_shared_data(cnxid, 'data'), None) |
|
243 repo.set_shared_data(cnxid, 'data', 4) |
|
244 self.assertEqual(repo.get_shared_data(cnxid, 'data'), 4) |
|
245 repo.get_shared_data(cnxid, 'data', pop=True) |
|
246 repo.get_shared_data(cnxid, 'whatever', pop=True) |
|
247 self.assertEqual(repo.get_shared_data(cnxid, 'data'), None) |
|
248 repo.close(cnxid) |
|
249 self.assertRaises(BadConnectionId, repo.set_shared_data, cnxid, 'data', 0) |
|
250 self.assertRaises(BadConnectionId, repo.get_shared_data, cnxid, 'data') |
|
251 |
218 |
252 def test_schema_is_relation(self): |
219 def test_schema_is_relation(self): |
253 with self.admin_access.repo_cnx() as cnx: |
220 with self.admin_access.repo_cnx() as cnx: |
254 no_is_rset = cnx.execute('Any X WHERE NOT X is ET') |
221 no_is_rset = cnx.execute('Any X WHERE NOT X is ET') |
255 self.assertFalse(no_is_rset, no_is_rset.description) |
222 self.assertFalse(no_is_rset, no_is_rset.description) |