17 # with CubicWeb. If not, see <http://www.gnu.org/licenses/>. |
17 # with CubicWeb. If not, see <http://www.gnu.org/licenses/>. |
18 """unit tests for module cubicweb.server.sources.storages""" |
18 """unit tests for module cubicweb.server.sources.storages""" |
19 |
19 |
20 from __future__ import with_statement |
20 from __future__ import with_statement |
21 |
21 |
22 from logilab.common.testlib import unittest_main, tag |
22 from logilab.common.testlib import unittest_main, tag, Tags |
23 from cubicweb.devtools.testlib import CubicWebTC |
23 from cubicweb.devtools.testlib import CubicWebTC |
24 |
24 |
25 import os.path as osp |
25 import os.path as osp |
26 import shutil |
26 import shutil |
27 import tempfile |
27 import tempfile |
49 # new value of entity.data should be the same as before |
49 # new value of entity.data should be the same as before |
50 oldvalue = self._cw.transaction_data['orig_file_value'] |
50 oldvalue = self._cw.transaction_data['orig_file_value'] |
51 assert oldvalue == self.entity.data.getvalue() |
51 assert oldvalue == self.entity.data.getvalue() |
52 |
52 |
53 class StorageTC(CubicWebTC): |
53 class StorageTC(CubicWebTC): |
|
54 |
|
55 tags = CubicWebTC.tags | Tags('Storage', 'BFSS') |
54 |
56 |
55 def setup_database(self): |
57 def setup_database(self): |
56 self.tempdir = tempfile.mkdtemp() |
58 self.tempdir = tempfile.mkdtemp() |
57 bfs_storage = storages.BytesFileSystemStorage(self.tempdir) |
59 bfs_storage = storages.BytesFileSystemStorage(self.tempdir) |
58 storages.set_attribute_storage(self.repo, 'File', 'data', bfs_storage) |
60 storages.set_attribute_storage(self.repo, 'File', 'data', bfs_storage) |
182 f1 = self.session.create_entity('File', data=Binary(filepath), |
184 f1 = self.session.create_entity('File', data=Binary(filepath), |
183 data_format=u'text/plain', data_name=u'foo') |
185 data_format=u'text/plain', data_name=u'foo') |
184 self.assertEqual(f1.data.getvalue(), file(filepath).read(), |
186 self.assertEqual(f1.data.getvalue(), file(filepath).read(), |
185 'files content differ') |
187 'files content differ') |
186 |
188 |
187 @tag('Storage', 'BFSS', 'update') |
189 @tag('update') |
188 def test_bfss_update_with_existing_data(self): |
190 def test_bfss_update_with_existing_data(self): |
189 # use self.session to use server-side cache |
191 # use self.session to use server-side cache |
190 f1 = self.session.create_entity('File', data=Binary('some data'), |
192 f1 = self.session.create_entity('File', data=Binary('some data'), |
191 data_format=u'text/plain', data_name=u'foo') |
193 data_format=u'text/plain', data_name=u'foo') |
192 # NOTE: do not use set_attributes() which would automatically |
194 # NOTE: do not use set_attributes() which would automatically |
196 self.assertEqual(f1.data.getvalue(), 'some other data') |
198 self.assertEqual(f1.data.getvalue(), 'some other data') |
197 self.commit() |
199 self.commit() |
198 f2 = self.execute('Any F WHERE F eid %(f)s, F is File', {'f': f1.eid}).get_entity(0, 0) |
200 f2 = self.execute('Any F WHERE F eid %(f)s, F is File', {'f': f1.eid}).get_entity(0, 0) |
199 self.assertEqual(f2.data.getvalue(), 'some other data') |
201 self.assertEqual(f2.data.getvalue(), 'some other data') |
200 |
202 |
201 @tag('Storage', 'BFSS', 'update', 'extension', 'commit') |
203 @tag('update', 'extension', 'commit') |
202 def test_bfss_update_with_different_extension_commited(self): |
204 def test_bfss_update_with_different_extension_commited(self): |
203 # use self.session to use server-side cache |
205 # use self.session to use server-side cache |
204 f1 = self.session.create_entity('File', data=Binary('some data'), |
206 f1 = self.session.create_entity('File', data=Binary('some data'), |
205 data_format=u'text/plain', data_name=u'foo.txt') |
207 data_format=u'text/plain', data_name=u'foo.txt') |
206 # NOTE: do not use set_attributes() which would automatically |
208 # NOTE: do not use set_attributes() which would automatically |
218 new_path = self.fspath(f2) |
220 new_path = self.fspath(f2) |
219 self.failIf(osp.isfile(old_path)) |
221 self.failIf(osp.isfile(old_path)) |
220 self.failUnless(osp.isfile(new_path)) |
222 self.failUnless(osp.isfile(new_path)) |
221 self.assertEqual(osp.splitext(new_path)[1], '.jpg') |
223 self.assertEqual(osp.splitext(new_path)[1], '.jpg') |
222 |
224 |
223 @tag('Storage', 'BFSS', 'update', 'extension', 'rollback') |
225 @tag('update', 'extension', 'rollback') |
224 def test_bfss_update_with_different_extension_rollbacked(self): |
226 def test_bfss_update_with_different_extension_rollbacked(self): |
225 # use self.session to use server-side cache |
227 # use self.session to use server-side cache |
226 f1 = self.session.create_entity('File', data=Binary('some data'), |
228 f1 = self.session.create_entity('File', data=Binary('some data'), |
227 data_format=u'text/plain', data_name=u'foo.txt') |
229 data_format=u'text/plain', data_name=u'foo.txt') |
228 # NOTE: do not use set_attributes() which would automatically |
230 # NOTE: do not use set_attributes() which would automatically |
243 self.failUnless(osp.isfile(new_path)) |
245 self.failUnless(osp.isfile(new_path)) |
244 self.assertEqual(osp.splitext(new_path)[1], '.txt') |
246 self.assertEqual(osp.splitext(new_path)[1], '.txt') |
245 self.assertEqual(old_path, new_path) |
247 self.assertEqual(old_path, new_path) |
246 self.assertEqual(old_data, new_data) |
248 self.assertEqual(old_data, new_data) |
247 |
249 |
|
250 @tag('fs_importing', 'update') |
248 def test_bfss_update_with_fs_importing(self): |
251 def test_bfss_update_with_fs_importing(self): |
249 # use self.session to use server-side cache |
252 # use self.session to use server-side cache |
250 f1 = self.session.create_entity('File', data=Binary('some data'), |
253 f1 = self.session.create_entity('File', data=Binary('some data'), |
251 data_format=u'text/plain', data_name=u'foo') |
254 data_format=u'text/plain', data_name=u'foo') |
252 old_fspath = self.fspath(f1) |
255 old_fspath = self.fspath(f1) |
258 self.commit() |
261 self.commit() |
259 self.assertEqual(f1.data.getvalue(), 'the new data') |
262 self.assertEqual(f1.data.getvalue(), 'the new data') |
260 self.assertEqual(self.fspath(f1), new_fspath) |
263 self.assertEqual(self.fspath(f1), new_fspath) |
261 self.failIf(osp.isfile(old_fspath)) |
264 self.failIf(osp.isfile(old_fspath)) |
262 |
265 |
|
@tag('fsimport')
def test_clean(self):
    # Starting from transaction data without any 'fs_importing' entry:
    # the flag must be present and true inside the fsimport block, and
    # must be gone again once the block is left.
    tx_data = self.session.transaction_data
    self.assertNotIn('fs_importing', tx_data)
    with storages.fsimport(self.session):
        self.assertIn('fs_importing', tx_data)
        self.assertTrue(tx_data['fs_importing'])
    self.assertNotIn('fs_importing', tx_data)
|
275 |
|
@tag('fsimport')
def test_true(self):
    # Starting with 'fs_importing' already set to True: the flag stays
    # true inside the fsimport block and is still true after leaving it.
    tx_data = self.session.transaction_data
    tx_data['fs_importing'] = True
    with storages.fsimport(self.session):
        self.assertIn('fs_importing', tx_data)
        self.assertTrue(tx_data['fs_importing'])
    self.assertTrue(tx_data['fs_importing'])
|
285 |
|
@tag('fsimport')
def test_False(self):
    # Starting with 'fs_importing' explicitly set to False: the flag is
    # true inside the fsimport block and is false again after leaving it.
    tx_data = self.session.transaction_data
    tx_data['fs_importing'] = False
    with storages.fsimport(self.session):
        self.assertIn('fs_importing', tx_data)
        self.assertTrue(tx_data['fs_importing'])
    self.assertFalse(tx_data['fs_importing'])
263 |
295 |
# Run the test suite when this module is executed directly as a script.
if __name__ == '__main__':
    unittest_main()