|
1 # copyright 2003-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved. |
|
2 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr |
|
3 # |
|
4 # This file is part of CubicWeb. |
|
5 # |
|
6 # CubicWeb is free software: you can redistribute it and/or modify it under the |
|
7 # terms of the GNU Lesser General Public License as published by the Free |
|
8 # Software Foundation, either version 2.1 of the License, or (at your option) |
|
9 # any later version. |
|
10 # |
|
11 # CubicWeb is distributed in the hope that it will be useful, but WITHOUT |
|
12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
|
13 # FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more |
|
14 # details. |
|
15 # |
|
16 # You should have received a copy of the GNU Lesser General Public License along |
|
17 # with CubicWeb. If not, see <http://www.gnu.org/licenses/>. |
|
18 """unit tests for module cubicweb.server.sources.storages""" |
|
19 |
|
20 from six import PY2 |
|
21 |
|
22 from logilab.common.testlib import unittest_main, tag, Tags |
|
23 from cubicweb.devtools.testlib import CubicWebTC |
|
24 |
|
25 from glob import glob |
|
26 import os |
|
27 import os.path as osp |
|
28 import sys |
|
29 import shutil |
|
30 import tempfile |
|
31 |
|
32 from cubicweb import Binary, QueryError |
|
33 from cubicweb.predicates import is_instance |
|
34 from cubicweb.server.sources import storages |
|
35 from cubicweb.server.hook import Hook |
|
36 |
|
37 class DummyBeforeHook(Hook): |
|
38 __regid__ = 'dummy-before-hook' |
|
39 __select__ = Hook.__select__ & is_instance('File') |
|
40 events = ('before_add_entity',) |
|
41 |
|
42 def __call__(self): |
|
43 self._cw.transaction_data['orig_file_value'] = self.entity.data.getvalue() |
|
44 |
|
45 |
|
46 class DummyAfterHook(Hook): |
|
47 __regid__ = 'dummy-after-hook' |
|
48 __select__ = Hook.__select__ & is_instance('File') |
|
49 events = ('after_add_entity',) |
|
50 |
|
51 def __call__(self): |
|
52 # new value of entity.data should be the same as before |
|
53 oldvalue = self._cw.transaction_data['orig_file_value'] |
|
54 assert oldvalue == self.entity.data.getvalue() |
|
55 |
|
56 class StorageTC(CubicWebTC): |
|
57 tempdir = None |
|
58 tags = CubicWebTC.tags | Tags('Storage', 'BFSS') |
|
59 |
|
60 def setup_database(self): |
|
61 self.tempdir = tempfile.mkdtemp() |
|
62 bfs_storage = storages.BytesFileSystemStorage(self.tempdir) |
|
63 self.bfs_storage = bfs_storage |
|
64 storages.set_attribute_storage(self.repo, 'File', 'data', bfs_storage) |
|
65 storages.set_attribute_storage(self.repo, 'BFSSTestable', 'opt_attr', bfs_storage) |
|
66 |
|
67 def tearDown(self): |
|
68 super(StorageTC, self).tearDown() |
|
69 storages.unset_attribute_storage(self.repo, 'File', 'data') |
|
70 del self.bfs_storage |
|
71 shutil.rmtree(self.tempdir) |
|
72 |
|
73 |
|
74 def create_file(self, cnx, content=b'the-data'): |
|
75 return cnx.create_entity('File', data=Binary(content), |
|
76 data_format=u'text/plain', |
|
77 data_name=u'foo.pdf') |
|
78 |
|
79 def fspath(self, cnx, entity): |
|
80 fspath = cnx.execute('Any fspath(D) WHERE F eid %(f)s, F data D', |
|
81 {'f': entity.eid})[0][0].getvalue() |
|
82 return fspath if PY2 else fspath.decode('utf-8') |
|
83 |
|
84 def test_bfss_wrong_fspath_usage(self): |
|
85 with self.admin_access.repo_cnx() as cnx: |
|
86 f1 = self.create_file(cnx) |
|
87 cnx.execute('Any fspath(D) WHERE F eid %(f)s, F data D', {'f': f1.eid}) |
|
88 with self.assertRaises(NotImplementedError) as cm: |
|
89 cnx.execute('Any fspath(F) WHERE F eid %(f)s', {'f': f1.eid}) |
|
90 self.assertEqual(str(cm.exception), |
|
91 'This callback is only available for BytesFileSystemStorage ' |
|
92 'managed attribute. Is FSPATH() argument BFSS managed?') |
|
93 |
|
94 def test_bfss_storage(self): |
|
95 with self.admin_access.web_request() as req: |
|
96 cnx = req.cnx |
|
97 f1 = self.create_file(req) |
|
98 filepaths = glob(osp.join(self.tempdir, '%s_data_*' % f1.eid)) |
|
99 self.assertEqual(len(filepaths), 1, filepaths) |
|
100 expected_filepath = filepaths[0] |
|
101 # file should be read only |
|
102 self.assertFalse(os.access(expected_filepath, os.W_OK)) |
|
103 self.assertEqual(open(expected_filepath).read(), 'the-data') |
|
104 cnx.rollback() |
|
105 self.assertFalse(osp.isfile(expected_filepath)) |
|
106 filepaths = glob(osp.join(self.tempdir, '%s_data_*' % f1.eid)) |
|
107 self.assertEqual(len(filepaths), 0, filepaths) |
|
108 f1 = self.create_file(req) |
|
109 cnx.commit() |
|
110 filepaths = glob(osp.join(self.tempdir, '%s_data_*' % f1.eid)) |
|
111 self.assertEqual(len(filepaths), 1, filepaths) |
|
112 expected_filepath = filepaths[0] |
|
113 self.assertEqual(open(expected_filepath).read(), 'the-data') |
|
114 |
|
115 # add f1 back to the entity cache with req as _cw |
|
116 f1 = req.entity_from_eid(f1.eid) |
|
117 f1.cw_set(data=Binary(b'the new data')) |
|
118 cnx.rollback() |
|
119 self.assertEqual(open(expected_filepath).read(), 'the-data') |
|
120 f1.cw_delete() |
|
121 self.assertTrue(osp.isfile(expected_filepath)) |
|
122 cnx.rollback() |
|
123 self.assertTrue(osp.isfile(expected_filepath)) |
|
124 f1.cw_delete() |
|
125 cnx.commit() |
|
126 self.assertFalse(osp.isfile(expected_filepath)) |
|
127 |
|
128 def test_bfss_sqlite_fspath(self): |
|
129 with self.admin_access.repo_cnx() as cnx: |
|
130 f1 = self.create_file(cnx) |
|
131 expected_filepath = osp.join(self.tempdir, '%s_data_%s' % (f1.eid, f1.data_name)) |
|
132 base, ext = osp.splitext(expected_filepath) |
|
133 self.assertTrue(self.fspath(cnx, f1).startswith(base)) |
|
134 self.assertTrue(self.fspath(cnx, f1).endswith(ext)) |
|
135 |
|
136 def test_bfss_fs_importing_doesnt_touch_path(self): |
|
137 with self.admin_access.repo_cnx() as cnx: |
|
138 cnx.transaction_data['fs_importing'] = True |
|
139 filepath = osp.abspath(__file__) |
|
140 f1 = cnx.create_entity('File', data=Binary(filepath.encode(sys.getfilesystemencoding())), |
|
141 data_format=u'text/plain', data_name=u'foo') |
|
142 self.assertEqual(self.fspath(cnx, f1), filepath) |
|
143 |
|
144 def test_source_storage_transparency(self): |
|
145 with self.admin_access.repo_cnx() as cnx: |
|
146 with self.temporary_appobjects(DummyBeforeHook, DummyAfterHook): |
|
147 self.create_file(cnx) |
|
148 |
|
149 def test_source_mapped_attribute_error_cases(self): |
|
150 with self.admin_access.repo_cnx() as cnx: |
|
151 with self.assertRaises(QueryError) as cm: |
|
152 cnx.execute('Any X WHERE X data ~= "hop", X is File') |
|
153 self.assertEqual(str(cm.exception), 'can\'t use File.data (X data ILIKE "hop") in restriction') |
|
154 with self.assertRaises(QueryError) as cm: |
|
155 cnx.execute('Any X, Y WHERE X data D, Y data D, ' |
|
156 'NOT X identity Y, X is File, Y is File') |
|
157 self.assertEqual(str(cm.exception), "can't use D as a restriction variable") |
|
158 # query returning mix of mapped / regular attributes (only file.data |
|
159 # mapped, not image.data for instance) |
|
160 with self.assertRaises(QueryError) as cm: |
|
161 cnx.execute('Any X WITH X BEING (' |
|
162 ' (Any NULL)' |
|
163 ' UNION ' |
|
164 ' (Any D WHERE X data D, X is File)' |
|
165 ')') |
|
166 self.assertEqual(str(cm.exception), 'query fetch some source mapped attribute, some not') |
|
167 with self.assertRaises(QueryError) as cm: |
|
168 cnx.execute('(Any D WHERE X data D, X is File)' |
|
169 ' UNION ' |
|
170 '(Any D WHERE X title D, X is Bookmark)') |
|
171 self.assertEqual(str(cm.exception), 'query fetch some source mapped attribute, some not') |
|
172 |
|
173 storages.set_attribute_storage(self.repo, 'State', 'name', |
|
174 storages.BytesFileSystemStorage(self.tempdir)) |
|
175 try: |
|
176 with self.assertRaises(QueryError) as cm: |
|
177 cnx.execute('Any D WHERE X name D, X is IN (State, Transition)') |
|
178 self.assertEqual(str(cm.exception), 'query fetch some source mapped attribute, some not') |
|
179 finally: |
|
180 storages.unset_attribute_storage(self.repo, 'State', 'name') |
|
181 |
|
182 def test_source_mapped_attribute_advanced(self): |
|
183 with self.admin_access.repo_cnx() as cnx: |
|
184 f1 = self.create_file(cnx) |
|
185 rset = cnx.execute('Any X,D WITH D,X BEING (' |
|
186 ' (Any D, X WHERE X eid %(x)s, X data D)' |
|
187 ' UNION ' |
|
188 ' (Any D, X WHERE X eid %(x)s, X data D)' |
|
189 ')', {'x': f1.eid}) |
|
190 self.assertEqual(len(rset), 2) |
|
191 self.assertEqual(rset[0][0], f1.eid) |
|
192 self.assertEqual(rset[1][0], f1.eid) |
|
193 self.assertEqual(rset[0][1].getvalue(), b'the-data') |
|
194 self.assertEqual(rset[1][1].getvalue(), b'the-data') |
|
195 rset = cnx.execute('Any X,LENGTH(D) WHERE X eid %(x)s, X data D', |
|
196 {'x': f1.eid}) |
|
197 self.assertEqual(len(rset), 1) |
|
198 self.assertEqual(rset[0][0], f1.eid) |
|
199 self.assertEqual(rset[0][1], len('the-data')) |
|
200 rset = cnx.execute('Any X,LENGTH(D) WITH D,X BEING (' |
|
201 ' (Any D, X WHERE X eid %(x)s, X data D)' |
|
202 ' UNION ' |
|
203 ' (Any D, X WHERE X eid %(x)s, X data D)' |
|
204 ')', {'x': f1.eid}) |
|
205 self.assertEqual(len(rset), 2) |
|
206 self.assertEqual(rset[0][0], f1.eid) |
|
207 self.assertEqual(rset[1][0], f1.eid) |
|
208 self.assertEqual(rset[0][1], len('the-data')) |
|
209 self.assertEqual(rset[1][1], len('the-data')) |
|
210 with self.assertRaises(QueryError) as cm: |
|
211 cnx.execute('Any X,UPPER(D) WHERE X eid %(x)s, X data D', |
|
212 {'x': f1.eid}) |
|
213 self.assertEqual(str(cm.exception), 'UPPER can not be called on mapped attribute') |
|
214 |
|
215 |
|
216 def test_bfss_fs_importing_transparency(self): |
|
217 with self.admin_access.repo_cnx() as cnx: |
|
218 cnx.transaction_data['fs_importing'] = True |
|
219 filepath = osp.abspath(__file__) |
|
220 f1 = cnx.create_entity('File', data=Binary(filepath.encode(sys.getfilesystemencoding())), |
|
221 data_format=u'text/plain', data_name=u'foo') |
|
222 cw_value = f1.data.getvalue() |
|
223 fs_value = open(filepath, 'rb').read() |
|
224 if cw_value != fs_value: |
|
225 self.fail('cw value %r is different from file content' % cw_value) |
|
226 |
|
227 @tag('update') |
|
228 def test_bfss_update_with_existing_data(self): |
|
229 with self.admin_access.repo_cnx() as cnx: |
|
230 f1 = cnx.create_entity('File', data=Binary(b'some data'), |
|
231 data_format=u'text/plain', data_name=u'foo') |
|
232 # NOTE: do not use cw_set() which would automatically |
|
233 # update f1's local dict. We want the pure rql version to work |
|
234 cnx.execute('SET F data %(d)s WHERE F eid %(f)s', |
|
235 {'d': Binary(b'some other data'), 'f': f1.eid}) |
|
236 self.assertEqual(f1.data.getvalue(), b'some other data') |
|
237 cnx.commit() |
|
238 f2 = cnx.execute('Any F WHERE F eid %(f)s, F is File', {'f': f1.eid}).get_entity(0, 0) |
|
239 self.assertEqual(f2.data.getvalue(), b'some other data') |
|
240 |
|
241 @tag('update', 'extension', 'commit') |
|
242 def test_bfss_update_with_different_extension_commited(self): |
|
243 with self.admin_access.repo_cnx() as cnx: |
|
244 f1 = cnx.create_entity('File', data=Binary(b'some data'), |
|
245 data_format=u'text/plain', data_name=u'foo.txt') |
|
246 # NOTE: do not use cw_set() which would automatically |
|
247 # update f1's local dict. We want the pure rql version to work |
|
248 cnx.commit() |
|
249 old_path = self.fspath(cnx, f1) |
|
250 self.assertTrue(osp.isfile(old_path)) |
|
251 self.assertEqual(osp.splitext(old_path)[1], '.txt') |
|
252 cnx.execute('SET F data %(d)s, F data_name %(dn)s, ' |
|
253 'F data_format %(df)s WHERE F eid %(f)s', |
|
254 {'d': Binary(b'some other data'), 'f': f1.eid, |
|
255 'dn': u'bar.jpg', 'df': u'image/jpeg'}) |
|
256 cnx.commit() |
|
257 # the new file exists with correct extension |
|
258 # the old file is dead |
|
259 f2 = cnx.execute('Any F WHERE F eid %(f)s, F is File', {'f': f1.eid}).get_entity(0, 0) |
|
260 new_path = self.fspath(cnx, f2) |
|
261 self.assertFalse(osp.isfile(old_path)) |
|
262 self.assertTrue(osp.isfile(new_path)) |
|
263 self.assertEqual(osp.splitext(new_path)[1], '.jpg') |
|
264 |
|
265 @tag('update', 'extension', 'rollback') |
|
266 def test_bfss_update_with_different_extension_rolled_back(self): |
|
267 with self.admin_access.repo_cnx() as cnx: |
|
268 f1 = cnx.create_entity('File', data=Binary(b'some data'), |
|
269 data_format=u'text/plain', data_name=u'foo.txt') |
|
270 # NOTE: do not use cw_set() which would automatically |
|
271 # update f1's local dict. We want the pure rql version to work |
|
272 cnx.commit() |
|
273 old_path = self.fspath(cnx, f1) |
|
274 old_data = f1.data.getvalue() |
|
275 self.assertTrue(osp.isfile(old_path)) |
|
276 self.assertEqual(osp.splitext(old_path)[1], '.txt') |
|
277 cnx.execute('SET F data %(d)s, F data_name %(dn)s, ' |
|
278 'F data_format %(df)s WHERE F eid %(f)s', |
|
279 {'d': Binary(b'some other data'), |
|
280 'f': f1.eid, |
|
281 'dn': u'bar.jpg', |
|
282 'df': u'image/jpeg'}) |
|
283 cnx.rollback() |
|
284 # the new file exists with correct extension |
|
285 # the old file is dead |
|
286 f2 = cnx.execute('Any F WHERE F eid %(f)s, F is File', |
|
287 {'f': f1.eid}).get_entity(0, 0) |
|
288 new_path = self.fspath(cnx, f2) |
|
289 new_data = f2.data.getvalue() |
|
290 self.assertTrue(osp.isfile(new_path)) |
|
291 self.assertEqual(osp.splitext(new_path)[1], '.txt') |
|
292 self.assertEqual(old_path, new_path) |
|
293 self.assertEqual(old_data, new_data) |
|
294 |
|
295 @tag('update', 'NULL') |
|
296 def test_bfss_update_to_None(self): |
|
297 with self.admin_access.repo_cnx() as cnx: |
|
298 f = cnx.create_entity('Affaire', opt_attr=Binary(b'toto')) |
|
299 cnx.commit() |
|
300 f.cw_set(opt_attr=None) |
|
301 cnx.commit() |
|
302 |
|
303 @tag('fs_importing', 'update') |
|
304 def test_bfss_update_with_fs_importing(self): |
|
305 with self.admin_access.repo_cnx() as cnx: |
|
306 f1 = cnx.create_entity('File', data=Binary(b'some data'), |
|
307 data_format=u'text/plain', |
|
308 data_name=u'foo') |
|
309 old_fspath = self.fspath(cnx, f1) |
|
310 cnx.transaction_data['fs_importing'] = True |
|
311 new_fspath = osp.join(self.tempdir, 'newfile.txt') |
|
312 open(new_fspath, 'w').write('the new data') |
|
313 cnx.execute('SET F data %(d)s WHERE F eid %(f)s', |
|
314 {'d': Binary(new_fspath.encode(sys.getfilesystemencoding())), 'f': f1.eid}) |
|
315 cnx.commit() |
|
316 self.assertEqual(f1.data.getvalue(), b'the new data') |
|
317 self.assertEqual(self.fspath(cnx, f1), new_fspath) |
|
318 self.assertFalse(osp.isfile(old_fspath)) |
|
319 |
|
320 @tag('fsimport') |
|
321 def test_clean(self): |
|
322 with self.admin_access.repo_cnx() as cnx: |
|
323 fsimport = storages.fsimport |
|
324 td = cnx.transaction_data |
|
325 self.assertNotIn('fs_importing', td) |
|
326 with fsimport(cnx): |
|
327 self.assertIn('fs_importing', td) |
|
328 self.assertTrue(td['fs_importing']) |
|
329 self.assertNotIn('fs_importing', td) |
|
330 |
|
331 @tag('fsimport') |
|
332 def test_true(self): |
|
333 with self.admin_access.repo_cnx() as cnx: |
|
334 fsimport = storages.fsimport |
|
335 td = cnx.transaction_data |
|
336 td['fs_importing'] = True |
|
337 with fsimport(cnx): |
|
338 self.assertIn('fs_importing', td) |
|
339 self.assertTrue(td['fs_importing']) |
|
340 self.assertTrue(td['fs_importing']) |
|
341 |
|
342 @tag('fsimport') |
|
343 def test_False(self): |
|
344 with self.admin_access.repo_cnx() as cnx: |
|
345 fsimport = storages.fsimport |
|
346 td = cnx.transaction_data |
|
347 td['fs_importing'] = False |
|
348 with fsimport(cnx): |
|
349 self.assertIn('fs_importing', td) |
|
350 self.assertTrue(td['fs_importing']) |
|
351 self.assertFalse(td['fs_importing']) |
|
352 |
|
353 if __name__ == '__main__': |
|
354 unittest_main() |