server/test/unittest_storage.py
changeset 5016 b3b0b808a0ed
parent 5013 ad91f93bbb93
child 5109 5cf83b9356d5
equal deleted inserted replaced
5004:4cc020ee70e2 5016:b3b0b808a0ed
    11 
    11 
    12 import os.path as osp
    12 import os.path as osp
    13 import shutil
    13 import shutil
    14 import tempfile
    14 import tempfile
    15 
    15 
    16 from cubicweb import Binary
    16 from cubicweb import Binary, QueryError
    17 from cubicweb.selectors import implements
    17 from cubicweb.selectors import implements
    18 from cubicweb.server.sources import storages
    18 from cubicweb.server.sources import storages
    19 from cubicweb.server.hook import Hook, Operation
    19 from cubicweb.server.hook import Hook, Operation
    20 
    20 
    21 class DummyBeforeHook(Hook):
    21 class DummyBeforeHook(Hook):
    49         super(CubicWebTC, self).tearDown()
    49         super(CubicWebTC, self).tearDown()
    50         storages.unset_attribute_storage(self.repo, 'File', 'data')
    50         storages.unset_attribute_storage(self.repo, 'File', 'data')
    51         shutil.rmtree(self.tempdir)
    51         shutil.rmtree(self.tempdir)
    52 
    52 
    53 
    53 
    54     def create_file(self, content):
    54     def create_file(self, content='the-data'):
    55         req = self.request()
    55         req = self.request()
    56         return req.create_entity('File', data=Binary(content),
    56         return req.create_entity('File', data=Binary(content),
    57                                  data_format=u'text/plain', data_name=u'foo')
    57                                  data_format=u'text/plain', data_name=u'foo')
    58 
    58 
    59     def test_bfs_storage(self):
    59     def test_bfss_storage(self):
    60         f1 = self.create_file(content='the-data')
    60         f1 = self.create_file()
    61         expected_filepath = osp.join(self.tempdir, '%s_data' % f1.eid)
    61         expected_filepath = osp.join(self.tempdir, '%s_data' % f1.eid)
    62         self.failUnless(osp.isfile(expected_filepath))
    62         self.failUnless(osp.isfile(expected_filepath))
    63         self.assertEquals(file(expected_filepath).read(), 'the-data')
    63         self.assertEquals(file(expected_filepath).read(), 'the-data')
       
    64         self.rollback()
       
    65         self.failIf(osp.isfile(expected_filepath))
       
    66         f1 = self.create_file()
       
    67         self.commit()
       
    68         self.assertEquals(file(expected_filepath).read(), 'the-data')
       
    69         f1.set_attributes(data=Binary('the new data'))
       
    70         self.rollback()
       
    71         self.assertEquals(file(expected_filepath).read(), 'the-data')
       
    72         f1.delete()
       
    73         self.failUnless(osp.isfile(expected_filepath))
       
    74         self.rollback()
       
    75         self.failUnless(osp.isfile(expected_filepath))
       
    76         f1.delete()
       
    77         self.commit()
       
    78         self.failIf(osp.isfile(expected_filepath))
    64 
    79 
    65     def test_sqlite_fspath(self):
    80     def test_bfss_sqlite_fspath(self):
    66         f1 = self.create_file(content='the-data')
    81         f1 = self.create_file()
    67         expected_filepath = osp.join(self.tempdir, '%s_data' % f1.eid)
    82         expected_filepath = osp.join(self.tempdir, '%s_data' % f1.eid)
    68         fspath = self.execute('Any fspath(F, "File", "data") WHERE F eid %(f)s',
    83         fspath = self.execute('Any fspath(D) WHERE F eid %(f)s, F data D',
    69                               {'f': f1.eid})[0][0]
    84                               {'f': f1.eid})[0][0]
    70         self.assertEquals(fspath.getvalue(), expected_filepath)
    85         self.assertEquals(fspath.getvalue(), expected_filepath)
    71 
    86 
    72     def test_fs_importing_doesnt_touch_path(self):
    87     def test_bfss_fs_importing_doesnt_touch_path(self):
    73         self.session.transaction_data['fs_importing'] = True
    88         self.session.transaction_data['fs_importing'] = True
    74         f1 = self.session.create_entity('File', data=Binary('/the/path'),
    89         f1 = self.session.create_entity('File', data=Binary('/the/path'),
    75                                         data_format=u'text/plain', data_name=u'foo')
    90                                         data_format=u'text/plain', data_name=u'foo')
    76         fspath = self.execute('Any fspath(F, "File", "data") WHERE F eid %(f)s',
    91         fspath = self.execute('Any fspath(D) WHERE F eid %(f)s, F data D',
    77                               {'f': f1.eid})[0][0]
    92                               {'f': f1.eid})[0][0]
    78         self.assertEquals(fspath.getvalue(), '/the/path')
    93         self.assertEquals(fspath.getvalue(), '/the/path')
    79 
    94 
    80     def test_storage_transparency(self):
    95     def test_source_storage_transparency(self):
    81         self.vreg._loadedmods[__name__] = {}
    96         self.vreg._loadedmods[__name__] = {}
    82         self.vreg.register(DummyBeforeHook)
    97         self.vreg.register(DummyBeforeHook)
    83         self.vreg.register(DummyAfterHook)
    98         self.vreg.register(DummyAfterHook)
    84         try:
    99         try:
    85             self.create_file(content='the-data')
   100             self.create_file()
    86         finally:
   101         finally:
    87             self.vreg.unregister(DummyBeforeHook)
   102             self.vreg.unregister(DummyBeforeHook)
    88             self.vreg.unregister(DummyAfterHook)
   103             self.vreg.unregister(DummyAfterHook)
    89 
   104 
       
   105     def test_source_mapped_attribute_error_cases(self):
       
   106         ex = self.assertRaises(QueryError, self.execute,
       
   107                                'Any X WHERE X data ~= "hop", X is File')
       
   108         self.assertEquals(str(ex), 'can\'t use File.data (X data ILIKE "hop") in restriction')
       
   109         ex = self.assertRaises(QueryError, self.execute,
       
   110                                'Any X, Y WHERE X data D, Y data D, '
       
   111                                'NOT X identity Y, X is File, Y is File')
       
   112         self.assertEquals(str(ex), "can't use D as a restriction variable")
       
   113         # query returning mix of mapped / regular attributes (only file.data
       
   114         # mapped, not image.data for instance)
       
   115         ex = self.assertRaises(QueryError, self.execute,
       
   116                                'Any X WITH X BEING ('
       
   117                                ' (Any NULL)'
       
   118                                '  UNION '
       
   119                                ' (Any D WHERE X data D, X is File)'
       
   120                                ')')
       
   121         self.assertEquals(str(ex), 'query fetch some source mapped attribute, some not')
       
   122         ex = self.assertRaises(QueryError, self.execute,
       
   123                                '(Any D WHERE X data D, X is File)'
       
   124                                ' UNION '
       
   125                                '(Any D WHERE X data D, X is Image)')
       
   126         self.assertEquals(str(ex), 'query fetch some source mapped attribute, some not')
       
   127         ex = self.assertRaises(QueryError,
       
   128                                self.execute, 'Any D WHERE X data D')
       
   129         self.assertEquals(str(ex), 'query fetch some source mapped attribute, some not')
       
   130 
       
   131     def test_source_mapped_attribute_advanced(self):
       
   132         f1 = self.create_file()
       
   133         rset = self.execute('Any X,D WITH D,X BEING ('
       
   134                             ' (Any D, X WHERE X eid %(x)s, X data D)'
       
   135                             '  UNION '
       
   136                             ' (Any D, X WHERE X eid %(x)s, X data D)'
       
   137                             ')', {'x': f1.eid}, 'x')
       
   138         self.assertEquals(len(rset), 2)
       
   139         self.assertEquals(rset[0][0], f1.eid)
       
   140         self.assertEquals(rset[1][0], f1.eid)
       
   141         self.assertEquals(rset[0][1].getvalue(), 'the-data')
       
   142         self.assertEquals(rset[1][1].getvalue(), 'the-data')
       
   143         rset = self.execute('Any X,LENGTH(D) WHERE X eid %(x)s, X data D',
       
   144                             {'x': f1.eid}, 'x')
       
   145         self.assertEquals(len(rset), 1)
       
   146         self.assertEquals(rset[0][0], f1.eid)
       
   147         self.assertEquals(rset[0][1], len('the-data'))
       
   148         rset = self.execute('Any X,LENGTH(D) WITH D,X BEING ('
       
   149                             ' (Any D, X WHERE X eid %(x)s, X data D)'
       
   150                             '  UNION '
       
   151                             ' (Any D, X WHERE X eid %(x)s, X data D)'
       
   152                             ')', {'x': f1.eid}, 'x')
       
   153         self.assertEquals(len(rset), 2)
       
   154         self.assertEquals(rset[0][0], f1.eid)
       
   155         self.assertEquals(rset[1][0], f1.eid)
       
   156         self.assertEquals(rset[0][1], len('the-data'))
       
   157         self.assertEquals(rset[1][1], len('the-data'))
       
   158         ex = self.assertRaises(QueryError, self.execute,
       
   159                                'Any X,UPPER(D) WHERE X eid %(x)s, X data D',
       
   160                                {'x': f1.eid}, 'x')
       
   161         self.assertEquals(str(ex), 'UPPER can not be called on mapped attribute')
       
   162 
    90 if __name__ == '__main__':
   163 if __name__ == '__main__':
    91     unittest_main()
   164     unittest_main()