54 |
54 |
55 def create_file(self, content='the-data'): |
55 def create_file(self, content='the-data'): |
56 req = self.request() |
56 req = self.request() |
57 return req.create_entity('File', data=Binary(content), |
57 return req.create_entity('File', data=Binary(content), |
58 data_format=u'text/plain', data_name=u'foo') |
58 data_format=u'text/plain', data_name=u'foo') |
|
59 |
|
60 def fspath(self, entity): |
|
61 fspath = self.execute('Any fspath(D) WHERE F eid %(f)s, F data D', |
|
62 {'f': entity.eid})[0][0] |
|
63 return fspath.getvalue() |
59 |
64 |
60 def test_bfss_storage(self): |
65 def test_bfss_storage(self): |
61 f1 = self.create_file() |
66 f1 = self.create_file() |
62 expected_filepath = osp.join(self.tempdir, '%s_data' % f1.eid) |
67 expected_filepath = osp.join(self.tempdir, '%s_data' % f1.eid) |
63 self.failUnless(osp.isfile(expected_filepath)) |
68 self.failUnless(osp.isfile(expected_filepath)) |
79 self.failIf(osp.isfile(expected_filepath)) |
84 self.failIf(osp.isfile(expected_filepath)) |
80 |
85 |
81 def test_bfss_sqlite_fspath(self): |
86 def test_bfss_sqlite_fspath(self): |
82 f1 = self.create_file() |
87 f1 = self.create_file() |
83 expected_filepath = osp.join(self.tempdir, '%s_data' % f1.eid) |
88 expected_filepath = osp.join(self.tempdir, '%s_data' % f1.eid) |
84 fspath = self.execute('Any fspath(D) WHERE F eid %(f)s, F data D', |
89 self.assertEquals(self.fspath(f1), expected_filepath) |
85 {'f': f1.eid})[0][0] |
|
86 self.assertEquals(fspath.getvalue(), expected_filepath) |
|
87 |
90 |
88 def test_bfss_fs_importing_doesnt_touch_path(self): |
91 def test_bfss_fs_importing_doesnt_touch_path(self): |
89 self.session.transaction_data['fs_importing'] = True |
92 self.session.transaction_data['fs_importing'] = True |
90 filepath = osp.abspath(__file__) |
93 filepath = osp.abspath(__file__) |
91 f1 = self.session.create_entity('File', data=Binary(filepath), |
94 f1 = self.session.create_entity('File', data=Binary(filepath), |
92 data_format=u'text/plain', data_name=u'foo') |
95 data_format=u'text/plain', data_name=u'foo') |
93 fspath = self.execute('Any fspath(D) WHERE F eid %(f)s, F data D', |
96 self.assertEquals(self.fspath(f1), filepath) |
94 {'f': f1.eid})[0][0] |
|
95 self.assertEquals(fspath.getvalue(), filepath) |
|
96 |
97 |
97 def test_source_storage_transparency(self): |
98 def test_source_storage_transparency(self): |
98 with self.temporary_appobjects(DummyBeforeHook, DummyAfterHook): |
99 with self.temporary_appobjects(DummyBeforeHook, DummyAfterHook): |
99 self.create_file() |
100 self.create_file() |
100 |
101 |
178 self.commit() |
179 self.commit() |
179 f2 = self.entity('Any F WHERE F eid %(f)s, F is File', {'f': f1.eid}) |
180 f2 = self.entity('Any F WHERE F eid %(f)s, F is File', {'f': f1.eid}) |
180 self.assertEquals(f2.data.getvalue(), 'some other data') |
181 self.assertEquals(f2.data.getvalue(), 'some other data') |
181 |
182 |
182 |
183 |
|
184 def test_bfss_update_with_fs_importing(self): |
|
185 # use self.session to use server-side cache |
|
186 f1 = self.session.create_entity('File', data=Binary('some data'), |
|
187 data_format=u'text/plain', data_name=u'foo') |
|
188 old_fspath = self.fspath(f1) |
|
189 self.session.transaction_data['fs_importing'] = True |
|
190 new_fspath = osp.join(self.tempdir, 'newfile.txt') |
|
191 file(new_fspath, 'w').write('the new data') |
|
192 self.execute('SET F data %(d)s WHERE F eid %(f)s', |
|
193 {'d': Binary(new_fspath), 'f': f1.eid}) |
|
194 self.commit() |
|
195 self.assertEquals(f1.data.getvalue(), 'the new data') |
|
196 self.assertEquals(self.fspath(f1), new_fspath) |
|
197 self.failIf(osp.isfile(old_fspath)) |
|
198 |
|
199 |
183 if __name__ == '__main__': |
200 if __name__ == '__main__': |
184 unittest_main() |
201 unittest_main() |