server/test/unittest_storage.py
changeset 11057 0b59724cb3f2
parent 11052 058bb3dc685f
child 11058 23eb30449fe5
equal deleted inserted replaced
11052:058bb3dc685f 11057:0b59724cb3f2
     1 # copyright 2003-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
       
     2 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
       
     3 #
       
     4 # This file is part of CubicWeb.
       
     5 #
       
     6 # CubicWeb is free software: you can redistribute it and/or modify it under the
       
     7 # terms of the GNU Lesser General Public License as published by the Free
       
     8 # Software Foundation, either version 2.1 of the License, or (at your option)
       
     9 # any later version.
       
    10 #
       
    11 # CubicWeb is distributed in the hope that it will be useful, but WITHOUT
       
    12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
       
    13 # FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
       
    14 # details.
       
    15 #
       
    16 # You should have received a copy of the GNU Lesser General Public License along
       
    17 # with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
       
    18 """unit tests for module cubicweb.server.sources.storages"""
       
    19 
       
    20 from six import PY2
       
    21 
       
    22 from logilab.common.testlib import unittest_main, tag, Tags
       
    23 from cubicweb.devtools.testlib import CubicWebTC
       
    24 
       
    25 from glob import glob
       
    26 import os
       
    27 import os.path as osp
       
    28 import sys
       
    29 import shutil
       
    30 import tempfile
       
    31 
       
    32 from cubicweb import Binary, QueryError
       
    33 from cubicweb.predicates import is_instance
       
    34 from cubicweb.server.sources import storages
       
    35 from cubicweb.server.hook import Hook
       
    36 
       
    37 class DummyBeforeHook(Hook):
       
    38     __regid__ = 'dummy-before-hook'
       
    39     __select__ = Hook.__select__ & is_instance('File')
       
    40     events = ('before_add_entity',)
       
    41 
       
    42     def __call__(self):
       
    43         self._cw.transaction_data['orig_file_value'] = self.entity.data.getvalue()
       
    44 
       
    45 
       
    46 class DummyAfterHook(Hook):
       
    47     __regid__ = 'dummy-after-hook'
       
    48     __select__ = Hook.__select__ & is_instance('File')
       
    49     events = ('after_add_entity',)
       
    50 
       
    51     def __call__(self):
       
    52         # new value of entity.data should be the same as before
       
    53         oldvalue = self._cw.transaction_data['orig_file_value']
       
    54         assert oldvalue == self.entity.data.getvalue()
       
    55 
       
    56 class StorageTC(CubicWebTC):
       
    57     tempdir = None
       
    58     tags = CubicWebTC.tags | Tags('Storage', 'BFSS')
       
    59 
       
    60     def setup_database(self):
       
    61         self.tempdir = tempfile.mkdtemp()
       
    62         bfs_storage = storages.BytesFileSystemStorage(self.tempdir)
       
    63         self.bfs_storage = bfs_storage
       
    64         storages.set_attribute_storage(self.repo, 'File', 'data', bfs_storage)
       
    65         storages.set_attribute_storage(self.repo, 'BFSSTestable', 'opt_attr', bfs_storage)
       
    66 
       
    67     def tearDown(self):
       
    68         super(StorageTC, self).tearDown()
       
    69         storages.unset_attribute_storage(self.repo, 'File', 'data')
       
    70         del self.bfs_storage
       
    71         shutil.rmtree(self.tempdir)
       
    72 
       
    73 
       
    74     def create_file(self, cnx, content=b'the-data'):
       
    75         return cnx.create_entity('File', data=Binary(content),
       
    76                                  data_format=u'text/plain',
       
    77                                  data_name=u'foo.pdf')
       
    78 
       
    79     def fspath(self, cnx, entity):
       
    80         fspath = cnx.execute('Any fspath(D) WHERE F eid %(f)s, F data D',
       
    81                              {'f': entity.eid})[0][0].getvalue()
       
    82         return fspath if PY2 else fspath.decode('utf-8')
       
    83 
       
    84     def test_bfss_wrong_fspath_usage(self):
       
    85         with self.admin_access.repo_cnx() as cnx:
       
    86             f1 = self.create_file(cnx)
       
    87             cnx.execute('Any fspath(D) WHERE F eid %(f)s, F data D', {'f': f1.eid})
       
    88             with self.assertRaises(NotImplementedError) as cm:
       
    89                 cnx.execute('Any fspath(F) WHERE F eid %(f)s', {'f': f1.eid})
       
    90             self.assertEqual(str(cm.exception),
       
    91                              'This callback is only available for BytesFileSystemStorage '
       
    92                              'managed attribute. Is FSPATH() argument BFSS managed?')
       
    93 
       
    94     def test_bfss_storage(self):
       
    95         with self.admin_access.web_request() as req:
       
    96             cnx = req.cnx
       
    97             f1 = self.create_file(req)
       
    98             filepaths = glob(osp.join(self.tempdir, '%s_data_*' % f1.eid))
       
    99             self.assertEqual(len(filepaths), 1, filepaths)
       
   100             expected_filepath = filepaths[0]
       
   101             # file should be read only
       
   102             self.assertFalse(os.access(expected_filepath, os.W_OK))
       
   103             self.assertEqual(open(expected_filepath).read(), 'the-data')
       
   104             cnx.rollback()
       
   105             self.assertFalse(osp.isfile(expected_filepath))
       
   106             filepaths = glob(osp.join(self.tempdir, '%s_data_*' % f1.eid))
       
   107             self.assertEqual(len(filepaths), 0, filepaths)
       
   108             f1 = self.create_file(req)
       
   109             cnx.commit()
       
   110             filepaths = glob(osp.join(self.tempdir, '%s_data_*' % f1.eid))
       
   111             self.assertEqual(len(filepaths), 1, filepaths)
       
   112             expected_filepath = filepaths[0]
       
   113             self.assertEqual(open(expected_filepath).read(), 'the-data')
       
   114 
       
   115             # add f1 back to the entity cache with req as _cw
       
   116             f1 = req.entity_from_eid(f1.eid)
       
   117             f1.cw_set(data=Binary(b'the new data'))
       
   118             cnx.rollback()
       
   119             self.assertEqual(open(expected_filepath).read(), 'the-data')
       
   120             f1.cw_delete()
       
   121             self.assertTrue(osp.isfile(expected_filepath))
       
   122             cnx.rollback()
       
   123             self.assertTrue(osp.isfile(expected_filepath))
       
   124             f1.cw_delete()
       
   125             cnx.commit()
       
   126             self.assertFalse(osp.isfile(expected_filepath))
       
   127 
       
   128     def test_bfss_sqlite_fspath(self):
       
   129         with self.admin_access.repo_cnx() as cnx:
       
   130             f1 = self.create_file(cnx)
       
   131             expected_filepath = osp.join(self.tempdir, '%s_data_%s' % (f1.eid, f1.data_name))
       
   132             base, ext = osp.splitext(expected_filepath)
       
   133             self.assertTrue(self.fspath(cnx, f1).startswith(base))
       
   134             self.assertTrue(self.fspath(cnx, f1).endswith(ext))
       
   135 
       
   136     def test_bfss_fs_importing_doesnt_touch_path(self):
       
   137         with self.admin_access.repo_cnx() as cnx:
       
   138             cnx.transaction_data['fs_importing'] = True
       
   139             filepath = osp.abspath(__file__)
       
   140             f1 = cnx.create_entity('File', data=Binary(filepath.encode(sys.getfilesystemencoding())),
       
   141                                    data_format=u'text/plain', data_name=u'foo')
       
   142             self.assertEqual(self.fspath(cnx, f1), filepath)
       
   143 
       
   144     def test_source_storage_transparency(self):
       
   145         with self.admin_access.repo_cnx() as cnx:
       
   146             with self.temporary_appobjects(DummyBeforeHook, DummyAfterHook):
       
   147                 self.create_file(cnx)
       
   148 
       
   149     def test_source_mapped_attribute_error_cases(self):
       
   150         with self.admin_access.repo_cnx() as cnx:
       
   151             with self.assertRaises(QueryError) as cm:
       
   152                 cnx.execute('Any X WHERE X data ~= "hop", X is File')
       
   153             self.assertEqual(str(cm.exception), 'can\'t use File.data (X data ILIKE "hop") in restriction')
       
   154             with self.assertRaises(QueryError) as cm:
       
   155                 cnx.execute('Any X, Y WHERE X data D, Y data D, '
       
   156                              'NOT X identity Y, X is File, Y is File')
       
   157             self.assertEqual(str(cm.exception), "can't use D as a restriction variable")
       
   158             # query returning mix of mapped / regular attributes (only file.data
       
   159             # mapped, not image.data for instance)
       
   160             with self.assertRaises(QueryError) as cm:
       
   161                 cnx.execute('Any X WITH X BEING ('
       
   162                              ' (Any NULL)'
       
   163                              '  UNION '
       
   164                              ' (Any D WHERE X data D, X is File)'
       
   165                              ')')
       
   166             self.assertEqual(str(cm.exception), 'query fetch some source mapped attribute, some not')
       
   167             with self.assertRaises(QueryError) as cm:
       
   168                 cnx.execute('(Any D WHERE X data D, X is File)'
       
   169                              ' UNION '
       
   170                              '(Any D WHERE X title D, X is Bookmark)')
       
   171             self.assertEqual(str(cm.exception), 'query fetch some source mapped attribute, some not')
       
   172 
       
   173             storages.set_attribute_storage(self.repo, 'State', 'name',
       
   174                                            storages.BytesFileSystemStorage(self.tempdir))
       
   175             try:
       
   176                 with self.assertRaises(QueryError) as cm:
       
   177                     cnx.execute('Any D WHERE X name D, X is IN (State, Transition)')
       
   178                 self.assertEqual(str(cm.exception), 'query fetch some source mapped attribute, some not')
       
   179             finally:
       
   180                 storages.unset_attribute_storage(self.repo, 'State', 'name')
       
   181 
       
   182     def test_source_mapped_attribute_advanced(self):
       
   183         with self.admin_access.repo_cnx() as cnx:
       
   184             f1 = self.create_file(cnx)
       
   185             rset = cnx.execute('Any X,D WITH D,X BEING ('
       
   186                                 ' (Any D, X WHERE X eid %(x)s, X data D)'
       
   187                                 '  UNION '
       
   188                                 ' (Any D, X WHERE X eid %(x)s, X data D)'
       
   189                                 ')', {'x': f1.eid})
       
   190             self.assertEqual(len(rset), 2)
       
   191             self.assertEqual(rset[0][0], f1.eid)
       
   192             self.assertEqual(rset[1][0], f1.eid)
       
   193             self.assertEqual(rset[0][1].getvalue(), b'the-data')
       
   194             self.assertEqual(rset[1][1].getvalue(), b'the-data')
       
   195             rset = cnx.execute('Any X,LENGTH(D) WHERE X eid %(x)s, X data D',
       
   196                                 {'x': f1.eid})
       
   197             self.assertEqual(len(rset), 1)
       
   198             self.assertEqual(rset[0][0], f1.eid)
       
   199             self.assertEqual(rset[0][1], len('the-data'))
       
   200             rset = cnx.execute('Any X,LENGTH(D) WITH D,X BEING ('
       
   201                                 ' (Any D, X WHERE X eid %(x)s, X data D)'
       
   202                                 '  UNION '
       
   203                                 ' (Any D, X WHERE X eid %(x)s, X data D)'
       
   204                                 ')', {'x': f1.eid})
       
   205             self.assertEqual(len(rset), 2)
       
   206             self.assertEqual(rset[0][0], f1.eid)
       
   207             self.assertEqual(rset[1][0], f1.eid)
       
   208             self.assertEqual(rset[0][1], len('the-data'))
       
   209             self.assertEqual(rset[1][1], len('the-data'))
       
   210             with self.assertRaises(QueryError) as cm:
       
   211                 cnx.execute('Any X,UPPER(D) WHERE X eid %(x)s, X data D',
       
   212                              {'x': f1.eid})
       
   213             self.assertEqual(str(cm.exception), 'UPPER can not be called on mapped attribute')
       
   214 
       
   215 
       
   216     def test_bfss_fs_importing_transparency(self):
       
   217         with self.admin_access.repo_cnx() as cnx:
       
   218             cnx.transaction_data['fs_importing'] = True
       
   219             filepath = osp.abspath(__file__)
       
   220             f1 = cnx.create_entity('File', data=Binary(filepath.encode(sys.getfilesystemencoding())),
       
   221                                    data_format=u'text/plain', data_name=u'foo')
       
   222             cw_value = f1.data.getvalue()
       
   223             fs_value = open(filepath, 'rb').read()
       
   224             if cw_value != fs_value:
       
   225                 self.fail('cw value %r is different from file content' % cw_value)
       
   226 
       
   227     @tag('update')
       
   228     def test_bfss_update_with_existing_data(self):
       
   229         with self.admin_access.repo_cnx() as cnx:
       
   230             f1 = cnx.create_entity('File', data=Binary(b'some data'),
       
   231                                    data_format=u'text/plain', data_name=u'foo')
       
   232             # NOTE: do not use cw_set() which would automatically
       
   233             #       update f1's local dict. We want the pure rql version to work
       
   234             cnx.execute('SET F data %(d)s WHERE F eid %(f)s',
       
   235                          {'d': Binary(b'some other data'), 'f': f1.eid})
       
   236             self.assertEqual(f1.data.getvalue(), b'some other data')
       
   237             cnx.commit()
       
   238             f2 = cnx.execute('Any F WHERE F eid %(f)s, F is File', {'f': f1.eid}).get_entity(0, 0)
       
   239             self.assertEqual(f2.data.getvalue(), b'some other data')
       
   240 
       
   241     @tag('update', 'extension', 'commit')
       
   242     def test_bfss_update_with_different_extension_commited(self):
       
   243         with self.admin_access.repo_cnx() as cnx:
       
   244             f1 = cnx.create_entity('File', data=Binary(b'some data'),
       
   245                                    data_format=u'text/plain', data_name=u'foo.txt')
       
   246             # NOTE: do not use cw_set() which would automatically
       
   247             #       update f1's local dict. We want the pure rql version to work
       
   248             cnx.commit()
       
   249             old_path = self.fspath(cnx, f1)
       
   250             self.assertTrue(osp.isfile(old_path))
       
   251             self.assertEqual(osp.splitext(old_path)[1], '.txt')
       
   252             cnx.execute('SET F data %(d)s, F data_name %(dn)s, '
       
   253                          'F data_format %(df)s WHERE F eid %(f)s',
       
   254                          {'d': Binary(b'some other data'), 'f': f1.eid,
       
   255                           'dn': u'bar.jpg', 'df': u'image/jpeg'})
       
   256             cnx.commit()
       
   257             # the new file exists with correct extension
       
   258             # the old file is dead
       
   259             f2 = cnx.execute('Any F WHERE F eid %(f)s, F is File', {'f': f1.eid}).get_entity(0, 0)
       
   260             new_path = self.fspath(cnx, f2)
       
   261             self.assertFalse(osp.isfile(old_path))
       
   262             self.assertTrue(osp.isfile(new_path))
       
   263             self.assertEqual(osp.splitext(new_path)[1], '.jpg')
       
   264 
       
   265     @tag('update', 'extension', 'rollback')
       
   266     def test_bfss_update_with_different_extension_rolled_back(self):
       
   267         with self.admin_access.repo_cnx() as cnx:
       
   268             f1 = cnx.create_entity('File', data=Binary(b'some data'),
       
   269                                    data_format=u'text/plain', data_name=u'foo.txt')
       
   270             # NOTE: do not use cw_set() which would automatically
       
   271             #       update f1's local dict. We want the pure rql version to work
       
   272             cnx.commit()
       
   273             old_path = self.fspath(cnx, f1)
       
   274             old_data = f1.data.getvalue()
       
   275             self.assertTrue(osp.isfile(old_path))
       
   276             self.assertEqual(osp.splitext(old_path)[1], '.txt')
       
   277             cnx.execute('SET F data %(d)s, F data_name %(dn)s, '
       
   278                          'F data_format %(df)s WHERE F eid %(f)s',
       
   279                          {'d': Binary(b'some other data'),
       
   280                           'f': f1.eid,
       
   281                           'dn': u'bar.jpg',
       
   282                           'df': u'image/jpeg'})
       
   283             cnx.rollback()
       
   284             # the new file exists with correct extension
       
   285             # the old file is dead
       
   286             f2 = cnx.execute('Any F WHERE F eid %(f)s, F is File',
       
   287                               {'f': f1.eid}).get_entity(0, 0)
       
   288             new_path = self.fspath(cnx, f2)
       
   289             new_data = f2.data.getvalue()
       
   290             self.assertTrue(osp.isfile(new_path))
       
   291             self.assertEqual(osp.splitext(new_path)[1], '.txt')
       
   292             self.assertEqual(old_path, new_path)
       
   293             self.assertEqual(old_data, new_data)
       
   294 
       
   295     @tag('update', 'NULL')
       
   296     def test_bfss_update_to_None(self):
       
   297         with self.admin_access.repo_cnx() as cnx:
       
   298             f = cnx.create_entity('Affaire', opt_attr=Binary(b'toto'))
       
   299             cnx.commit()
       
   300             f.cw_set(opt_attr=None)
       
   301             cnx.commit()
       
   302 
       
   303     @tag('fs_importing', 'update')
       
   304     def test_bfss_update_with_fs_importing(self):
       
   305         with self.admin_access.repo_cnx() as cnx:
       
   306             f1 = cnx.create_entity('File', data=Binary(b'some data'),
       
   307                                    data_format=u'text/plain',
       
   308                                    data_name=u'foo')
       
   309             old_fspath = self.fspath(cnx, f1)
       
   310             cnx.transaction_data['fs_importing'] = True
       
   311             new_fspath = osp.join(self.tempdir, 'newfile.txt')
       
   312             open(new_fspath, 'w').write('the new data')
       
   313             cnx.execute('SET F data %(d)s WHERE F eid %(f)s',
       
   314                          {'d': Binary(new_fspath.encode(sys.getfilesystemencoding())), 'f': f1.eid})
       
   315             cnx.commit()
       
   316             self.assertEqual(f1.data.getvalue(), b'the new data')
       
   317             self.assertEqual(self.fspath(cnx, f1), new_fspath)
       
   318             self.assertFalse(osp.isfile(old_fspath))
       
   319 
       
   320     @tag('fsimport')
       
   321     def test_clean(self):
       
   322         with self.admin_access.repo_cnx() as cnx:
       
   323             fsimport = storages.fsimport
       
   324             td = cnx.transaction_data
       
   325             self.assertNotIn('fs_importing', td)
       
   326             with fsimport(cnx):
       
   327                 self.assertIn('fs_importing', td)
       
   328                 self.assertTrue(td['fs_importing'])
       
   329             self.assertNotIn('fs_importing', td)
       
   330 
       
   331     @tag('fsimport')
       
   332     def test_true(self):
       
   333         with self.admin_access.repo_cnx() as cnx:
       
   334             fsimport = storages.fsimport
       
   335             td = cnx.transaction_data
       
   336             td['fs_importing'] = True
       
   337             with fsimport(cnx):
       
   338                 self.assertIn('fs_importing', td)
       
   339                 self.assertTrue(td['fs_importing'])
       
   340             self.assertTrue(td['fs_importing'])
       
   341 
       
   342     @tag('fsimport')
       
   343     def test_False(self):
       
   344         with self.admin_access.repo_cnx() as cnx:
       
   345             fsimport = storages.fsimport
       
   346             td = cnx.transaction_data
       
   347             td['fs_importing'] = False
       
   348             with fsimport(cnx):
       
   349                 self.assertIn('fs_importing', td)
       
   350                 self.assertTrue(td['fs_importing'])
       
   351             self.assertFalse(td['fs_importing'])
       
   352 
       
# run the tests of this module when it is executed as a script
if __name__ == '__main__':
    unittest_main()