--- a/server/test/unittest_storage.py Thu Jun 05 15:10:04 2014 +0200
+++ b/server/test/unittest_storage.py Wed May 28 17:57:40 2014 +0200
@@ -1,4 +1,4 @@
-# copyright 2003-2012 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+# copyright 2003-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
@@ -28,7 +28,7 @@
from cubicweb import Binary, QueryError
from cubicweb.predicates import is_instance
from cubicweb.server.sources import storages
-from cubicweb.server.hook import Hook, Operation
+from cubicweb.server.hook import Hook
class DummyBeforeHook(Hook):
__regid__ = 'dummy-before-hook'
@@ -50,7 +50,7 @@
assert oldvalue == self.entity.data.getvalue()
class StorageTC(CubicWebTC):
-
+ tempdir = None
tags = CubicWebTC.tags | Tags('Storage', 'BFSS')
def setup_database(self):
@@ -65,255 +65,273 @@
shutil.rmtree(self.tempdir)
- def create_file(self, content='the-data'):
- req = self.request()
- return req.create_entity('File', data=Binary(content),
- data_format=u'text/plain', data_name=u'foo.pdf')
+ def create_file(self, cnx, content='the-data'):
+ return cnx.create_entity('File', data=Binary(content),
+ data_format=u'text/plain',
+ data_name=u'foo.pdf')
- def fspath(self, entity):
- fspath = self.execute('Any fspath(D) WHERE F eid %(f)s, F data D',
- {'f': entity.eid})[0][0]
+ def fspath(self, cnx, entity):
+ fspath = cnx.execute('Any fspath(D) WHERE F eid %(f)s, F data D',
+ {'f': entity.eid})[0][0]
return fspath.getvalue()
def test_bfss_wrong_fspath_usage(self):
- f1 = self.create_file()
- self.execute('Any fspath(D) WHERE F eid %(f)s, F data D', {'f': f1.eid})
- with self.assertRaises(NotImplementedError) as cm:
- self.execute('Any fspath(F) WHERE F eid %(f)s', {'f': f1.eid})
- self.assertEqual(str(cm.exception),
- 'This callback is only available for BytesFileSystemStorage '
- 'managed attribute. Is FSPATH() argument BFSS managed?')
+ with self.admin_access.repo_cnx() as cnx:
+ f1 = self.create_file(cnx)
+ cnx.execute('Any fspath(D) WHERE F eid %(f)s, F data D', {'f': f1.eid})
+ with self.assertRaises(NotImplementedError) as cm:
+ cnx.execute('Any fspath(F) WHERE F eid %(f)s', {'f': f1.eid})
+ self.assertEqual(str(cm.exception),
+ 'This callback is only available for BytesFileSystemStorage '
+ 'managed attribute. Is FSPATH() argument BFSS managed?')
def test_bfss_storage(self):
- f1 = self.create_file()
- expected_filepath = osp.join(self.tempdir, '%s_data_%s' %
- (f1.eid, f1.data_name))
- self.assertTrue(osp.isfile(expected_filepath))
- # file should be read only
- self.assertFalse(os.access(expected_filepath, os.W_OK))
- self.assertEqual(file(expected_filepath).read(), 'the-data')
- self.rollback()
- self.assertFalse(osp.isfile(expected_filepath))
- f1 = self.create_file()
- self.commit()
- self.assertEqual(file(expected_filepath).read(), 'the-data')
- f1.cw_set(data=Binary('the new data'))
- self.rollback()
- self.assertEqual(file(expected_filepath).read(), 'the-data')
- f1.cw_delete()
- self.assertTrue(osp.isfile(expected_filepath))
- self.rollback()
- self.assertTrue(osp.isfile(expected_filepath))
- f1.cw_delete()
- self.commit()
- self.assertFalse(osp.isfile(expected_filepath))
+ with self.admin_access.repo_cnx() as cnx:
+ f1 = self.create_file(cnx)
+ expected_filepath = osp.join(self.tempdir, '%s_data_%s' %
+ (f1.eid, f1.data_name))
+ self.assertTrue(osp.isfile(expected_filepath))
+ # file should be read only
+ self.assertFalse(os.access(expected_filepath, os.W_OK))
+ self.assertEqual(file(expected_filepath).read(), 'the-data')
+ cnx.rollback()
+ self.assertFalse(osp.isfile(expected_filepath))
+ f1 = self.create_file(cnx)
+ cnx.commit()
+ self.assertEqual(file(expected_filepath).read(), 'the-data')
+ f1.cw_set(data=Binary('the new data'))
+ cnx.rollback()
+ self.assertEqual(file(expected_filepath).read(), 'the-data')
+ f1.cw_delete()
+ self.assertTrue(osp.isfile(expected_filepath))
+ cnx.rollback()
+ self.assertTrue(osp.isfile(expected_filepath))
+ f1.cw_delete()
+ cnx.commit()
+ self.assertFalse(osp.isfile(expected_filepath))
def test_bfss_sqlite_fspath(self):
- f1 = self.create_file()
- expected_filepath = osp.join(self.tempdir, '%s_data_%s' % (f1.eid, f1.data_name))
- self.assertEqual(self.fspath(f1), expected_filepath)
+ with self.admin_access.repo_cnx() as cnx:
+ f1 = self.create_file(cnx)
+ expected_filepath = osp.join(self.tempdir, '%s_data_%s' % (f1.eid, f1.data_name))
+ self.assertEqual(self.fspath(cnx, f1), expected_filepath)
def test_bfss_fs_importing_doesnt_touch_path(self):
- self.session.transaction_data['fs_importing'] = True
- filepath = osp.abspath(__file__)
- f1 = self.request().create_entity('File', data=Binary(filepath),
- data_format=u'text/plain', data_name=u'foo')
- self.assertEqual(self.fspath(f1), filepath)
+ with self.admin_access.repo_cnx() as cnx:
+ cnx.transaction_data['fs_importing'] = True
+ filepath = osp.abspath(__file__)
+ f1 = cnx.create_entity('File', data=Binary(filepath),
+ data_format=u'text/plain', data_name=u'foo')
+ self.assertEqual(self.fspath(cnx, f1), filepath)
def test_source_storage_transparency(self):
- with self.temporary_appobjects(DummyBeforeHook, DummyAfterHook):
- self.create_file()
+ with self.admin_access.repo_cnx() as cnx:
+ with self.temporary_appobjects(DummyBeforeHook, DummyAfterHook):
+ self.create_file(cnx)
def test_source_mapped_attribute_error_cases(self):
- with self.assertRaises(QueryError) as cm:
- self.execute('Any X WHERE X data ~= "hop", X is File')
- self.assertEqual(str(cm.exception), 'can\'t use File.data (X data ILIKE "hop") in restriction')
- with self.assertRaises(QueryError) as cm:
- self.execute('Any X, Y WHERE X data D, Y data D, '
- 'NOT X identity Y, X is File, Y is File')
- self.assertEqual(str(cm.exception), "can't use D as a restriction variable")
- # query returning mix of mapped / regular attributes (only file.data
- # mapped, not image.data for instance)
- with self.assertRaises(QueryError) as cm:
- self.execute('Any X WITH X BEING ('
- ' (Any NULL)'
- ' UNION '
- ' (Any D WHERE X data D, X is File)'
- ')')
- self.assertEqual(str(cm.exception), 'query fetch some source mapped attribute, some not')
- with self.assertRaises(QueryError) as cm:
- self.execute('(Any D WHERE X data D, X is File)'
- ' UNION '
- '(Any D WHERE X title D, X is Bookmark)')
- self.assertEqual(str(cm.exception), 'query fetch some source mapped attribute, some not')
+ with self.admin_access.repo_cnx() as cnx:
+ with self.assertRaises(QueryError) as cm:
+ cnx.execute('Any X WHERE X data ~= "hop", X is File')
+ self.assertEqual(str(cm.exception), 'can\'t use File.data (X data ILIKE "hop") in restriction')
+ with self.assertRaises(QueryError) as cm:
+ cnx.execute('Any X, Y WHERE X data D, Y data D, '
+ 'NOT X identity Y, X is File, Y is File')
+ self.assertEqual(str(cm.exception), "can't use D as a restriction variable")
+ # query returning mix of mapped / regular attributes (only file.data
+ # mapped, not image.data for instance)
+ with self.assertRaises(QueryError) as cm:
+ cnx.execute('Any X WITH X BEING ('
+ ' (Any NULL)'
+ ' UNION '
+ ' (Any D WHERE X data D, X is File)'
+ ')')
+ self.assertEqual(str(cm.exception), 'query fetch some source mapped attribute, some not')
+ with self.assertRaises(QueryError) as cm:
+ cnx.execute('(Any D WHERE X data D, X is File)'
+ ' UNION '
+ '(Any D WHERE X title D, X is Bookmark)')
+ self.assertEqual(str(cm.exception), 'query fetch some source mapped attribute, some not')
- storages.set_attribute_storage(self.repo, 'State', 'name',
- storages.BytesFileSystemStorage(self.tempdir))
- try:
- with self.assertRaises(QueryError) as cm:
- self.execute('Any D WHERE X name D, X is IN (State, Transition)')
- self.assertEqual(str(cm.exception), 'query fetch some source mapped attribute, some not')
- finally:
- storages.unset_attribute_storage(self.repo, 'State', 'name')
+ storages.set_attribute_storage(self.repo, 'State', 'name',
+ storages.BytesFileSystemStorage(self.tempdir))
+ try:
+ with self.assertRaises(QueryError) as cm:
+ cnx.execute('Any D WHERE X name D, X is IN (State, Transition)')
+ self.assertEqual(str(cm.exception), 'query fetch some source mapped attribute, some not')
+ finally:
+ storages.unset_attribute_storage(self.repo, 'State', 'name')
def test_source_mapped_attribute_advanced(self):
- f1 = self.create_file()
- rset = self.execute('Any X,D WITH D,X BEING ('
- ' (Any D, X WHERE X eid %(x)s, X data D)'
- ' UNION '
- ' (Any D, X WHERE X eid %(x)s, X data D)'
- ')', {'x': f1.eid})
- self.assertEqual(len(rset), 2)
- self.assertEqual(rset[0][0], f1.eid)
- self.assertEqual(rset[1][0], f1.eid)
- self.assertEqual(rset[0][1].getvalue(), 'the-data')
- self.assertEqual(rset[1][1].getvalue(), 'the-data')
- rset = self.execute('Any X,LENGTH(D) WHERE X eid %(x)s, X data D',
- {'x': f1.eid})
- self.assertEqual(len(rset), 1)
- self.assertEqual(rset[0][0], f1.eid)
- self.assertEqual(rset[0][1], len('the-data'))
- rset = self.execute('Any X,LENGTH(D) WITH D,X BEING ('
- ' (Any D, X WHERE X eid %(x)s, X data D)'
- ' UNION '
- ' (Any D, X WHERE X eid %(x)s, X data D)'
- ')', {'x': f1.eid})
- self.assertEqual(len(rset), 2)
- self.assertEqual(rset[0][0], f1.eid)
- self.assertEqual(rset[1][0], f1.eid)
- self.assertEqual(rset[0][1], len('the-data'))
- self.assertEqual(rset[1][1], len('the-data'))
- with self.assertRaises(QueryError) as cm:
- self.execute('Any X,UPPER(D) WHERE X eid %(x)s, X data D',
- {'x': f1.eid})
- self.assertEqual(str(cm.exception), 'UPPER can not be called on mapped attribute')
+ with self.admin_access.repo_cnx() as cnx:
+ f1 = self.create_file(cnx)
+ rset = cnx.execute('Any X,D WITH D,X BEING ('
+ ' (Any D, X WHERE X eid %(x)s, X data D)'
+ ' UNION '
+ ' (Any D, X WHERE X eid %(x)s, X data D)'
+ ')', {'x': f1.eid})
+ self.assertEqual(len(rset), 2)
+ self.assertEqual(rset[0][0], f1.eid)
+ self.assertEqual(rset[1][0], f1.eid)
+ self.assertEqual(rset[0][1].getvalue(), 'the-data')
+ self.assertEqual(rset[1][1].getvalue(), 'the-data')
+ rset = cnx.execute('Any X,LENGTH(D) WHERE X eid %(x)s, X data D',
+ {'x': f1.eid})
+ self.assertEqual(len(rset), 1)
+ self.assertEqual(rset[0][0], f1.eid)
+ self.assertEqual(rset[0][1], len('the-data'))
+ rset = cnx.execute('Any X,LENGTH(D) WITH D,X BEING ('
+ ' (Any D, X WHERE X eid %(x)s, X data D)'
+ ' UNION '
+ ' (Any D, X WHERE X eid %(x)s, X data D)'
+ ')', {'x': f1.eid})
+ self.assertEqual(len(rset), 2)
+ self.assertEqual(rset[0][0], f1.eid)
+ self.assertEqual(rset[1][0], f1.eid)
+ self.assertEqual(rset[0][1], len('the-data'))
+ self.assertEqual(rset[1][1], len('the-data'))
+ with self.assertRaises(QueryError) as cm:
+ cnx.execute('Any X,UPPER(D) WHERE X eid %(x)s, X data D',
+ {'x': f1.eid})
+ self.assertEqual(str(cm.exception), 'UPPER can not be called on mapped attribute')
def test_bfss_fs_importing_transparency(self):
- self.session.transaction_data['fs_importing'] = True
- filepath = osp.abspath(__file__)
- f1 = self.session.create_entity('File', data=Binary(filepath),
- data_format=u'text/plain', data_name=u'foo')
- cw_value = f1.data.getvalue()
- fs_value = file(filepath).read()
- if cw_value != fs_value:
- self.fail('cw value %r is different from file content' % cw_value)
-
+ with self.admin_access.repo_cnx() as cnx:
+ cnx.transaction_data['fs_importing'] = True
+ filepath = osp.abspath(__file__)
+ f1 = cnx.create_entity('File', data=Binary(filepath),
+ data_format=u'text/plain', data_name=u'foo')
+ cw_value = f1.data.getvalue()
+ fs_value = file(filepath).read()
+ if cw_value != fs_value:
+ self.fail('cw value %r is different from file content' % cw_value)
@tag('update')
def test_bfss_update_with_existing_data(self):
- # use self.session to use server-side cache
- f1 = self.session.create_entity('File', data=Binary('some data'),
- data_format=u'text/plain', data_name=u'foo')
- # NOTE: do not use cw_set() which would automatically
- # update f1's local dict. We want the pure rql version to work
- self.execute('SET F data %(d)s WHERE F eid %(f)s',
- {'d': Binary('some other data'), 'f': f1.eid})
- self.assertEqual(f1.data.getvalue(), 'some other data')
- self.commit()
- f2 = self.execute('Any F WHERE F eid %(f)s, F is File', {'f': f1.eid}).get_entity(0, 0)
- self.assertEqual(f2.data.getvalue(), 'some other data')
+ with self.admin_access.repo_cnx() as cnx:
+ f1 = cnx.create_entity('File', data=Binary('some data'),
+ data_format=u'text/plain', data_name=u'foo')
+ # NOTE: do not use cw_set() which would automatically
+ # update f1's local dict. We want the pure rql version to work
+ cnx.execute('SET F data %(d)s WHERE F eid %(f)s',
+ {'d': Binary('some other data'), 'f': f1.eid})
+ self.assertEqual(f1.data.getvalue(), 'some other data')
+ cnx.commit()
+ f2 = cnx.execute('Any F WHERE F eid %(f)s, F is File', {'f': f1.eid}).get_entity(0, 0)
+ self.assertEqual(f2.data.getvalue(), 'some other data')
@tag('update', 'extension', 'commit')
def test_bfss_update_with_different_extension_commited(self):
- # use self.session to use server-side cache
- f1 = self.session.create_entity('File', data=Binary('some data'),
- data_format=u'text/plain', data_name=u'foo.txt')
- # NOTE: do not use cw_set() which would automatically
- # update f1's local dict. We want the pure rql version to work
- self.commit()
- old_path = self.fspath(f1)
- self.assertTrue(osp.isfile(old_path))
- self.assertEqual(osp.splitext(old_path)[1], '.txt')
- self.execute('SET F data %(d)s, F data_name %(dn)s, F data_format %(df)s WHERE F eid %(f)s',
- {'d': Binary('some other data'), 'f': f1.eid, 'dn': u'bar.jpg', 'df': u'image/jpeg'})
- self.commit()
- # the new file exists with correct extension
- # the old file is dead
- f2 = self.execute('Any F WHERE F eid %(f)s, F is File', {'f': f1.eid}).get_entity(0, 0)
- new_path = self.fspath(f2)
- self.assertFalse(osp.isfile(old_path))
- self.assertTrue(osp.isfile(new_path))
- self.assertEqual(osp.splitext(new_path)[1], '.jpg')
+ with self.admin_access.repo_cnx() as cnx:
+ f1 = cnx.create_entity('File', data=Binary('some data'),
+ data_format=u'text/plain', data_name=u'foo.txt')
+ # NOTE: do not use cw_set() which would automatically
+ # update f1's local dict. We want the pure rql version to work
+ cnx.commit()
+ old_path = self.fspath(cnx, f1)
+ self.assertTrue(osp.isfile(old_path))
+ self.assertEqual(osp.splitext(old_path)[1], '.txt')
+ cnx.execute('SET F data %(d)s, F data_name %(dn)s, '
+ 'F data_format %(df)s WHERE F eid %(f)s',
+ {'d': Binary('some other data'), 'f': f1.eid,
+ 'dn': u'bar.jpg', 'df': u'image/jpeg'})
+ cnx.commit()
+ # the new file exists with correct extension
+ # the old file is dead
+ f2 = cnx.execute('Any F WHERE F eid %(f)s, F is File', {'f': f1.eid}).get_entity(0, 0)
+ new_path = self.fspath(cnx, f2)
+ self.assertFalse(osp.isfile(old_path))
+ self.assertTrue(osp.isfile(new_path))
+ self.assertEqual(osp.splitext(new_path)[1], '.jpg')
@tag('update', 'extension', 'rollback')
def test_bfss_update_with_different_extension_rolled_back(self):
- # use self.session to use server-side cache
- f1 = self.session.create_entity('File', data=Binary('some data'),
- data_format=u'text/plain', data_name=u'foo.txt')
- # NOTE: do not use cw_set() which would automatically
- # update f1's local dict. We want the pure rql version to work
- self.commit()
- old_path = self.fspath(f1)
- old_data = f1.data.getvalue()
- self.assertTrue(osp.isfile(old_path))
- self.assertEqual(osp.splitext(old_path)[1], '.txt')
- self.execute('SET F data %(d)s, F data_name %(dn)s, F data_format %(df)s WHERE F eid %(f)s',
- {'d': Binary('some other data'), 'f': f1.eid, 'dn': u'bar.jpg', 'df': u'image/jpeg'})
- self.rollback()
- # the new file exists with correct extension
- # the old file is dead
- f2 = self.execute('Any F WHERE F eid %(f)s, F is File', {'f': f1.eid}).get_entity(0, 0)
- new_path = self.fspath(f2)
- new_data = f2.data.getvalue()
- self.assertTrue(osp.isfile(new_path))
- self.assertEqual(osp.splitext(new_path)[1], '.txt')
- self.assertEqual(old_path, new_path)
- self.assertEqual(old_data, new_data)
+ with self.admin_access.repo_cnx() as cnx:
+ f1 = cnx.create_entity('File', data=Binary('some data'),
+ data_format=u'text/plain', data_name=u'foo.txt')
+ # NOTE: do not use cw_set() which would automatically
+ # update f1's local dict. We want the pure rql version to work
+ cnx.commit()
+ old_path = self.fspath(cnx, f1)
+ old_data = f1.data.getvalue()
+ self.assertTrue(osp.isfile(old_path))
+ self.assertEqual(osp.splitext(old_path)[1], '.txt')
+ cnx.execute('SET F data %(d)s, F data_name %(dn)s, '
+ 'F data_format %(df)s WHERE F eid %(f)s',
+ {'d': Binary('some other data'),
+ 'f': f1.eid,
+ 'dn': u'bar.jpg',
+ 'df': u'image/jpeg'})
+ cnx.rollback()
+ # the new file exists with correct extension
+ # the old file is dead
+ f2 = cnx.execute('Any F WHERE F eid %(f)s, F is File',
+ {'f': f1.eid}).get_entity(0, 0)
+ new_path = self.fspath(cnx, f2)
+ new_data = f2.data.getvalue()
+ self.assertTrue(osp.isfile(new_path))
+ self.assertEqual(osp.splitext(new_path)[1], '.txt')
+ self.assertEqual(old_path, new_path)
+ self.assertEqual(old_data, new_data)
@tag('update', 'NULL')
def test_bfss_update_to_None(self):
- f = self.session.create_entity('Affaire', opt_attr=Binary('toto'))
- self.session.commit()
- self.session.set_cnxset()
- f.cw_set(opt_attr=None)
- self.session.commit()
+ with self.admin_access.repo_cnx() as cnx:
+ f = cnx.create_entity('Affaire', opt_attr=Binary('toto'))
+ cnx.commit()
+ f.cw_set(opt_attr=None)
+ cnx.commit()
@tag('fs_importing', 'update')
def test_bfss_update_with_fs_importing(self):
- # use self.session to use server-side cache
- f1 = self.session.create_entity('File', data=Binary('some data'),
- data_format=u'text/plain', data_name=u'foo')
- old_fspath = self.fspath(f1)
- self.session.transaction_data['fs_importing'] = True
- new_fspath = osp.join(self.tempdir, 'newfile.txt')
- file(new_fspath, 'w').write('the new data')
- self.execute('SET F data %(d)s WHERE F eid %(f)s',
- {'d': Binary(new_fspath), 'f': f1.eid})
- self.commit()
- self.assertEqual(f1.data.getvalue(), 'the new data')
- self.assertEqual(self.fspath(f1), new_fspath)
- self.assertFalse(osp.isfile(old_fspath))
+ with self.admin_access.repo_cnx() as cnx:
+ f1 = cnx.create_entity('File', data=Binary('some data'),
+ data_format=u'text/plain',
+ data_name=u'foo')
+ old_fspath = self.fspath(cnx, f1)
+ cnx.transaction_data['fs_importing'] = True
+ new_fspath = osp.join(self.tempdir, 'newfile.txt')
+ file(new_fspath, 'w').write('the new data')
+ cnx.execute('SET F data %(d)s WHERE F eid %(f)s',
+ {'d': Binary(new_fspath), 'f': f1.eid})
+ cnx.commit()
+ self.assertEqual(f1.data.getvalue(), 'the new data')
+ self.assertEqual(self.fspath(cnx, f1), new_fspath)
+ self.assertFalse(osp.isfile(old_fspath))
@tag('fsimport')
def test_clean(self):
- fsimport = storages.fsimport
- td = self.session.transaction_data
- self.assertNotIn('fs_importing', td)
- with fsimport(self.session):
- self.assertIn('fs_importing', td)
- self.assertTrue(td['fs_importing'])
- self.assertNotIn('fs_importing', td)
+ with self.admin_access.repo_cnx() as cnx:
+ fsimport = storages.fsimport
+ td = cnx.transaction_data
+ self.assertNotIn('fs_importing', td)
+ with fsimport(cnx):
+ self.assertIn('fs_importing', td)
+ self.assertTrue(td['fs_importing'])
+ self.assertNotIn('fs_importing', td)
@tag('fsimport')
def test_true(self):
- fsimport = storages.fsimport
- td = self.session.transaction_data
- td['fs_importing'] = True
- with fsimport(self.session):
- self.assertIn('fs_importing', td)
+ with self.admin_access.repo_cnx() as cnx:
+ fsimport = storages.fsimport
+ td = cnx.transaction_data
+ td['fs_importing'] = True
+ with fsimport(cnx):
+ self.assertIn('fs_importing', td)
+ self.assertTrue(td['fs_importing'])
self.assertTrue(td['fs_importing'])
- self.assertTrue(td['fs_importing'])
@tag('fsimport')
def test_False(self):
- fsimport = storages.fsimport
- td = self.session.transaction_data
- td['fs_importing'] = False
- with fsimport(self.session):
- self.assertIn('fs_importing', td)
- self.assertTrue(td['fs_importing'])
- self.assertFalse(td['fs_importing'])
+ with self.admin_access.repo_cnx() as cnx:
+ fsimport = storages.fsimport
+ td = cnx.transaction_data
+ td['fs_importing'] = False
+ with fsimport(cnx):
+ self.assertIn('fs_importing', td)
+ self.assertTrue(td['fs_importing'])
+ self.assertFalse(td['fs_importing'])
if __name__ == '__main__':
unittest_main()