87 |
87 |
def test_bfss_storage(self):
    """Check that the file written on disk follows the transaction outcome.

    Scenario, in order:

    * creating a file entity writes ``'the-data'`` at the expected path,
      and rolling back removes that file;
    * creating it again and committing leaves ``'the-data'`` on disk;
    * changing the ``data`` attribute and rolling back leaves the
      committed content untouched;
    * deleting the entity keeps the file until the transaction is
      committed: it is still there right after ``cw_delete()`` and after
      a rollback, and is gone once the deletion is committed.
    """
    def stored_content():
        # Read through a context manager so the handle is closed right
        # away (the former ``file(path).read()`` leaked it and relied on
        # a builtin that only exists in Python 2).
        with open(expected_filepath) as stream:
            return stream.read()

    f1 = self.create_file()
    expected_filepath = osp.join(self.tempdir, '%s_data_%s' %
                                 (f1.eid, f1.data_name))
    self.assertTrue(osp.isfile(expected_filepath))
    self.assertEqual(stored_content(), 'the-data')
    self.rollback()
    self.assertFalse(osp.isfile(expected_filepath))
    # NOTE(review): the path computed from the first entity is reused for
    # the second one -- presumably the same eid is handed out again after
    # the rollback; confirm against the repository behaviour.
    f1 = self.create_file()
    self.commit()
    self.assertEqual(stored_content(), 'the-data')
    f1.set_attributes(data=Binary('the new data'))
    self.rollback()
    self.assertEqual(stored_content(), 'the-data')
    f1.cw_delete()
    # the deletion is not committed yet: the file must still be there
    self.assertTrue(osp.isfile(expected_filepath))
    self.rollback()
    self.assertTrue(osp.isfile(expected_filepath))
    f1.cw_delete()
    self.commit()
    self.assertFalse(osp.isfile(expected_filepath))
109 |
109 |
110 def test_bfss_sqlite_fspath(self): |
110 def test_bfss_sqlite_fspath(self): |
111 f1 = self.create_file() |
111 f1 = self.create_file() |
112 expected_filepath = osp.join(self.tempdir, '%s_data_%s' % (f1.eid, f1.data_name)) |
112 expected_filepath = osp.join(self.tempdir, '%s_data_%s' % (f1.eid, f1.data_name)) |
113 self.assertEqual(self.fspath(f1), expected_filepath) |
113 self.assertEqual(self.fspath(f1), expected_filepath) |
217 data_format=u'text/plain', data_name=u'foo.txt') |
217 data_format=u'text/plain', data_name=u'foo.txt') |
218 # NOTE: do not use set_attributes() which would automatically |
218 # NOTE: do not use set_attributes() which would automatically |
219 # update f1's local dict. We want the pure rql version to work |
219 # update f1's local dict. We want the pure rql version to work |
220 self.commit() |
220 self.commit() |
221 old_path = self.fspath(f1) |
221 old_path = self.fspath(f1) |
222 self.failUnless(osp.isfile(old_path)) |
222 self.assertTrue(osp.isfile(old_path)) |
223 self.assertEqual(osp.splitext(old_path)[1], '.txt') |
223 self.assertEqual(osp.splitext(old_path)[1], '.txt') |
224 self.execute('SET F data %(d)s, F data_name %(dn)s, F data_format %(df)s WHERE F eid %(f)s', |
224 self.execute('SET F data %(d)s, F data_name %(dn)s, F data_format %(df)s WHERE F eid %(f)s', |
225 {'d': Binary('some other data'), 'f': f1.eid, 'dn': u'bar.jpg', 'df': u'image/jpeg'}) |
225 {'d': Binary('some other data'), 'f': f1.eid, 'dn': u'bar.jpg', 'df': u'image/jpeg'}) |
226 self.commit() |
226 self.commit() |
227 # the new file exists with correct extension |
227 # the new file exists with correct extension |
228 # the old file is dead |
228 # the old file is dead |
229 f2 = self.execute('Any F WHERE F eid %(f)s, F is File', {'f': f1.eid}).get_entity(0, 0) |
229 f2 = self.execute('Any F WHERE F eid %(f)s, F is File', {'f': f1.eid}).get_entity(0, 0) |
230 new_path = self.fspath(f2) |
230 new_path = self.fspath(f2) |
231 self.failIf(osp.isfile(old_path)) |
231 self.assertFalse(osp.isfile(old_path)) |
232 self.failUnless(osp.isfile(new_path)) |
232 self.assertTrue(osp.isfile(new_path)) |
233 self.assertEqual(osp.splitext(new_path)[1], '.jpg') |
233 self.assertEqual(osp.splitext(new_path)[1], '.jpg') |
234 |
234 |
235 @tag('update', 'extension', 'rollback') |
235 @tag('update', 'extension', 'rollback') |
236 def test_bfss_update_with_different_extension_rollbacked(self): |
236 def test_bfss_update_with_different_extension_rollbacked(self): |
237 # use self.session to use server-side cache |
237 # use self.session to use server-side cache |
240 # NOTE: do not use set_attributes() which would automatically |
240 # NOTE: do not use set_attributes() which would automatically |
241 # update f1's local dict. We want the pure rql version to work |
241 # update f1's local dict. We want the pure rql version to work |
242 self.commit() |
242 self.commit() |
243 old_path = self.fspath(f1) |
243 old_path = self.fspath(f1) |
244 old_data = f1.data.getvalue() |
244 old_data = f1.data.getvalue() |
245 self.failUnless(osp.isfile(old_path)) |
245 self.assertTrue(osp.isfile(old_path)) |
246 self.assertEqual(osp.splitext(old_path)[1], '.txt') |
246 self.assertEqual(osp.splitext(old_path)[1], '.txt') |
247 self.execute('SET F data %(d)s, F data_name %(dn)s, F data_format %(df)s WHERE F eid %(f)s', |
247 self.execute('SET F data %(d)s, F data_name %(dn)s, F data_format %(df)s WHERE F eid %(f)s', |
248 {'d': Binary('some other data'), 'f': f1.eid, 'dn': u'bar.jpg', 'df': u'image/jpeg'}) |
248 {'d': Binary('some other data'), 'f': f1.eid, 'dn': u'bar.jpg', 'df': u'image/jpeg'}) |
249 self.rollback() |
249 self.rollback() |
250 # the update was rolled back: the file keeps its original path and extension |
250 # the update was rolled back: the file keeps its original path and extension |
251 # and still holds the original data |
251 # and still holds the original data |
252 f2 = self.execute('Any F WHERE F eid %(f)s, F is File', {'f': f1.eid}).get_entity(0, 0) |
252 f2 = self.execute('Any F WHERE F eid %(f)s, F is File', {'f': f1.eid}).get_entity(0, 0) |
253 new_path = self.fspath(f2) |
253 new_path = self.fspath(f2) |
254 new_data = f2.data.getvalue() |
254 new_data = f2.data.getvalue() |
255 self.failUnless(osp.isfile(new_path)) |
255 self.assertTrue(osp.isfile(new_path)) |
256 self.assertEqual(osp.splitext(new_path)[1], '.txt') |
256 self.assertEqual(osp.splitext(new_path)[1], '.txt') |
257 self.assertEqual(old_path, new_path) |
257 self.assertEqual(old_path, new_path) |
258 self.assertEqual(old_data, new_data) |
258 self.assertEqual(old_data, new_data) |
259 |
259 |
260 @tag('update', 'NULL') |
260 @tag('update', 'NULL') |
277 self.execute('SET F data %(d)s WHERE F eid %(f)s', |
277 self.execute('SET F data %(d)s WHERE F eid %(f)s', |
278 {'d': Binary(new_fspath), 'f': f1.eid}) |
278 {'d': Binary(new_fspath), 'f': f1.eid}) |
279 self.commit() |
279 self.commit() |
280 self.assertEqual(f1.data.getvalue(), 'the new data') |
280 self.assertEqual(f1.data.getvalue(), 'the new data') |
281 self.assertEqual(self.fspath(f1), new_fspath) |
281 self.assertEqual(self.fspath(f1), new_fspath) |
282 self.failIf(osp.isfile(old_fspath)) |
282 self.assertFalse(osp.isfile(old_fspath)) |
283 |
283 |
284 @tag('fsimport') |
284 @tag('fsimport') |
285 def test_clean(self): |
285 def test_clean(self): |
286 fsimport = storages.fsimport |
286 fsimport = storages.fsimport |
287 td = self.session.transaction_data |
287 td = self.session.transaction_data |