# HG changeset patch # User Aurelien Campeas # Date 1401292660 -7200 # Node ID 59c582ce68c88be8371b0268652be52ffa8ac53a # Parent 95e8fa2c8da8004dafdf4ccc0db355d997db7c34 [tests/storage] use the new connection api diff -r 95e8fa2c8da8 -r 59c582ce68c8 server/test/unittest_storage.py --- a/server/test/unittest_storage.py Thu Jun 05 15:10:04 2014 +0200 +++ b/server/test/unittest_storage.py Wed May 28 17:57:40 2014 +0200 @@ -1,4 +1,4 @@ -# copyright 2003-2012 LOGILAB S.A. (Paris, FRANCE), all rights reserved. +# copyright 2003-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved. # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr # # This file is part of CubicWeb. @@ -28,7 +28,7 @@ from cubicweb import Binary, QueryError from cubicweb.predicates import is_instance from cubicweb.server.sources import storages -from cubicweb.server.hook import Hook, Operation +from cubicweb.server.hook import Hook class DummyBeforeHook(Hook): __regid__ = 'dummy-before-hook' @@ -50,7 +50,7 @@ assert oldvalue == self.entity.data.getvalue() class StorageTC(CubicWebTC): - + tempdir = None tags = CubicWebTC.tags | Tags('Storage', 'BFSS') def setup_database(self): @@ -65,255 +65,273 @@ shutil.rmtree(self.tempdir) - def create_file(self, content='the-data'): - req = self.request() - return req.create_entity('File', data=Binary(content), - data_format=u'text/plain', data_name=u'foo.pdf') + def create_file(self, cnx, content='the-data'): + return cnx.create_entity('File', data=Binary(content), + data_format=u'text/plain', + data_name=u'foo.pdf') - def fspath(self, entity): - fspath = self.execute('Any fspath(D) WHERE F eid %(f)s, F data D', - {'f': entity.eid})[0][0] + def fspath(self, cnx, entity): + fspath = cnx.execute('Any fspath(D) WHERE F eid %(f)s, F data D', + {'f': entity.eid})[0][0] return fspath.getvalue() def test_bfss_wrong_fspath_usage(self): - f1 = self.create_file() - self.execute('Any fspath(D) WHERE F eid %(f)s, F data D', {'f': f1.eid}) - with self.assertRaises(NotImplementedError) as cm: - self.execute('Any fspath(F) WHERE F eid %(f)s', {'f': f1.eid}) - self.assertEqual(str(cm.exception), - 'This callback is only available for BytesFileSystemStorage ' - 'managed attribute. Is FSPATH() argument BFSS managed?') + with self.admin_access.repo_cnx() as cnx: + f1 = self.create_file(cnx) + cnx.execute('Any fspath(D) WHERE F eid %(f)s, F data D', {'f': f1.eid}) + with self.assertRaises(NotImplementedError) as cm: + cnx.execute('Any fspath(F) WHERE F eid %(f)s', {'f': f1.eid}) + self.assertEqual(str(cm.exception), + 'This callback is only available for BytesFileSystemStorage ' + 'managed attribute. Is FSPATH() argument BFSS managed?') def test_bfss_storage(self): - f1 = self.create_file() - expected_filepath = osp.join(self.tempdir, '%s_data_%s' % - (f1.eid, f1.data_name)) - self.assertTrue(osp.isfile(expected_filepath)) - # file should be read only - self.assertFalse(os.access(expected_filepath, os.W_OK)) - self.assertEqual(file(expected_filepath).read(), 'the-data') - self.rollback() - self.assertFalse(osp.isfile(expected_filepath)) - f1 = self.create_file() - self.commit() - self.assertEqual(file(expected_filepath).read(), 'the-data') - f1.cw_set(data=Binary('the new data')) - self.rollback() - self.assertEqual(file(expected_filepath).read(), 'the-data') - f1.cw_delete() - self.assertTrue(osp.isfile(expected_filepath)) - self.rollback() - self.assertTrue(osp.isfile(expected_filepath)) - f1.cw_delete() - self.commit() - self.assertFalse(osp.isfile(expected_filepath)) + with self.admin_access.repo_cnx() as cnx: + f1 = self.create_file(cnx) + expected_filepath = osp.join(self.tempdir, '%s_data_%s' % + (f1.eid, f1.data_name)) + self.assertTrue(osp.isfile(expected_filepath)) + # file should be read only + self.assertFalse(os.access(expected_filepath, os.W_OK)) + self.assertEqual(file(expected_filepath).read(), 'the-data') + cnx.rollback() + self.assertFalse(osp.isfile(expected_filepath)) + f1 = self.create_file(cnx) + cnx.commit() + self.assertEqual(file(expected_filepath).read(), 'the-data') + f1.cw_set(data=Binary('the new data')) + cnx.rollback() + self.assertEqual(file(expected_filepath).read(), 'the-data') + f1.cw_delete() + self.assertTrue(osp.isfile(expected_filepath)) + cnx.rollback() + self.assertTrue(osp.isfile(expected_filepath)) + f1.cw_delete() + cnx.commit() + self.assertFalse(osp.isfile(expected_filepath)) def test_bfss_sqlite_fspath(self): - f1 = self.create_file() - expected_filepath = osp.join(self.tempdir, '%s_data_%s' % (f1.eid, f1.data_name)) - self.assertEqual(self.fspath(f1), expected_filepath) + with self.admin_access.repo_cnx() as cnx: + f1 = self.create_file(cnx) + expected_filepath = osp.join(self.tempdir, '%s_data_%s' % (f1.eid, f1.data_name)) + self.assertEqual(self.fspath(cnx, f1), expected_filepath) def test_bfss_fs_importing_doesnt_touch_path(self): - self.session.transaction_data['fs_importing'] = True - filepath = osp.abspath(__file__) - f1 = self.request().create_entity('File', data=Binary(filepath), - data_format=u'text/plain', data_name=u'foo') - self.assertEqual(self.fspath(f1), filepath) + with self.admin_access.repo_cnx() as cnx: + cnx.transaction_data['fs_importing'] = True + filepath = osp.abspath(__file__) + f1 = cnx.create_entity('File', data=Binary(filepath), + data_format=u'text/plain', data_name=u'foo') + self.assertEqual(self.fspath(cnx, f1), filepath) def test_source_storage_transparency(self): - with self.temporary_appobjects(DummyBeforeHook, DummyAfterHook): - self.create_file() + with self.admin_access.repo_cnx() as cnx: + with self.temporary_appobjects(DummyBeforeHook, DummyAfterHook): + self.create_file(cnx) def test_source_mapped_attribute_error_cases(self): - with self.assertRaises(QueryError) as cm: - self.execute('Any X WHERE X data ~= "hop", X is File') - self.assertEqual(str(cm.exception), 'can\'t use File.data (X data ILIKE "hop") in restriction') - with self.assertRaises(QueryError) as cm: - self.execute('Any X, Y WHERE X data D, Y data D, ' - 'NOT X identity Y, X is File, Y is File') - self.assertEqual(str(cm.exception), "can't use D as a restriction variable") - # query returning mix of mapped / regular attributes (only file.data - # mapped, not image.data for instance) - with self.assertRaises(QueryError) as cm: - self.execute('Any X WITH X BEING (' - ' (Any NULL)' - ' UNION ' - ' (Any D WHERE X data D, X is File)' - ')') - self.assertEqual(str(cm.exception), 'query fetch some source mapped attribute, some not') - with self.assertRaises(QueryError) as cm: - self.execute('(Any D WHERE X data D, X is File)' - ' UNION ' - '(Any D WHERE X title D, X is Bookmark)') - self.assertEqual(str(cm.exception), 'query fetch some source mapped attribute, some not') + with self.admin_access.repo_cnx() as cnx: + with self.assertRaises(QueryError) as cm: + cnx.execute('Any X WHERE X data ~= "hop", X is File') + self.assertEqual(str(cm.exception), 'can\'t use File.data (X data ILIKE "hop") in restriction') + with self.assertRaises(QueryError) as cm: + cnx.execute('Any X, Y WHERE X data D, Y data D, ' + 'NOT X identity Y, X is File, Y is File') + self.assertEqual(str(cm.exception), "can't use D as a restriction variable") + # query returning mix of mapped / regular attributes (only file.data + # mapped, not image.data for instance) + with self.assertRaises(QueryError) as cm: + cnx.execute('Any X WITH X BEING (' + ' (Any NULL)' + ' UNION ' + ' (Any D WHERE X data D, X is File)' + ')') + self.assertEqual(str(cm.exception), 'query fetch some source mapped attribute, some not') + with self.assertRaises(QueryError) as cm: + cnx.execute('(Any D WHERE X data D, X is File)' + ' UNION ' + '(Any D WHERE X title D, X is Bookmark)') + self.assertEqual(str(cm.exception), 'query fetch some source mapped attribute, some not') - storages.set_attribute_storage(self.repo, 'State', 'name', - storages.BytesFileSystemStorage(self.tempdir)) - try: - with self.assertRaises(QueryError) as cm: - self.execute('Any D WHERE X name D, X is IN (State, Transition)') - self.assertEqual(str(cm.exception), 'query fetch some source mapped attribute, some not') - finally: - storages.unset_attribute_storage(self.repo, 'State', 'name') + storages.set_attribute_storage(self.repo, 'State', 'name', + storages.BytesFileSystemStorage(self.tempdir)) + try: + with self.assertRaises(QueryError) as cm: + cnx.execute('Any D WHERE X name D, X is IN (State, Transition)') + self.assertEqual(str(cm.exception), 'query fetch some source mapped attribute, some not') + finally: + storages.unset_attribute_storage(self.repo, 'State', 'name') def test_source_mapped_attribute_advanced(self): - f1 = self.create_file() - rset = self.execute('Any X,D WITH D,X BEING (' - ' (Any D, X WHERE X eid %(x)s, X data D)' - ' UNION ' - ' (Any D, X WHERE X eid %(x)s, X data D)' - ')', {'x': f1.eid}) - self.assertEqual(len(rset), 2) - self.assertEqual(rset[0][0], f1.eid) - self.assertEqual(rset[1][0], f1.eid) - self.assertEqual(rset[0][1].getvalue(), 'the-data') - self.assertEqual(rset[1][1].getvalue(), 'the-data') - rset = self.execute('Any X,LENGTH(D) WHERE X eid %(x)s, X data D', - {'x': f1.eid}) - self.assertEqual(len(rset), 1) - self.assertEqual(rset[0][0], f1.eid) - self.assertEqual(rset[0][1], len('the-data')) - rset = self.execute('Any X,LENGTH(D) WITH D,X BEING (' - ' (Any D, X WHERE X eid %(x)s, X data D)' - ' UNION ' - ' (Any D, X WHERE X eid %(x)s, X data D)' - ')', {'x': f1.eid}) - self.assertEqual(len(rset), 2) - self.assertEqual(rset[0][0], f1.eid) - self.assertEqual(rset[1][0], f1.eid) - self.assertEqual(rset[0][1], len('the-data')) - self.assertEqual(rset[1][1], len('the-data')) - with self.assertRaises(QueryError) as cm: - self.execute('Any X,UPPER(D) WHERE X eid %(x)s, X data D', - {'x': f1.eid}) - self.assertEqual(str(cm.exception), 'UPPER can not be called on mapped attribute') + with self.admin_access.repo_cnx() as cnx: + f1 = self.create_file(cnx) + rset = cnx.execute('Any X,D WITH D,X BEING (' + ' (Any D, X WHERE X eid %(x)s, X data D)' + ' UNION ' + ' (Any D, X WHERE X eid %(x)s, X data D)' + ')', {'x': f1.eid}) + self.assertEqual(len(rset), 2) + self.assertEqual(rset[0][0], f1.eid) + self.assertEqual(rset[1][0], f1.eid) + self.assertEqual(rset[0][1].getvalue(), 'the-data') + self.assertEqual(rset[1][1].getvalue(), 'the-data') + rset = cnx.execute('Any X,LENGTH(D) WHERE X eid %(x)s, X data D', + {'x': f1.eid}) + self.assertEqual(len(rset), 1) + self.assertEqual(rset[0][0], f1.eid) + self.assertEqual(rset[0][1], len('the-data')) + rset = cnx.execute('Any X,LENGTH(D) WITH D,X BEING (' + ' (Any D, X WHERE X eid %(x)s, X data D)' + ' UNION ' + ' (Any D, X WHERE X eid %(x)s, X data D)' + ')', {'x': f1.eid}) + self.assertEqual(len(rset), 2) + self.assertEqual(rset[0][0], f1.eid) + self.assertEqual(rset[1][0], f1.eid) + self.assertEqual(rset[0][1], len('the-data')) + self.assertEqual(rset[1][1], len('the-data')) + with self.assertRaises(QueryError) as cm: + cnx.execute('Any X,UPPER(D) WHERE X eid %(x)s, X data D', + {'x': f1.eid}) + self.assertEqual(str(cm.exception), 'UPPER can not be called on mapped attribute') def test_bfss_fs_importing_transparency(self): - self.session.transaction_data['fs_importing'] = True - filepath = osp.abspath(__file__) - f1 = self.session.create_entity('File', data=Binary(filepath), - data_format=u'text/plain', data_name=u'foo') - cw_value = f1.data.getvalue() - fs_value = file(filepath).read() - if cw_value != fs_value: - self.fail('cw value %r is different from file content' % cw_value) - + with self.admin_access.repo_cnx() as cnx: + cnx.transaction_data['fs_importing'] = True + filepath = osp.abspath(__file__) + f1 = cnx.create_entity('File', data=Binary(filepath), + data_format=u'text/plain', data_name=u'foo') + cw_value = f1.data.getvalue() + fs_value = file(filepath).read() + if cw_value != fs_value: + self.fail('cw value %r is different from file content' % cw_value) @tag('update') def test_bfss_update_with_existing_data(self): - # use self.session to use server-side cache - f1 = self.session.create_entity('File', data=Binary('some data'), - data_format=u'text/plain', data_name=u'foo') - # NOTE: do not use cw_set() which would automatically - # update f1's local dict. We want the pure rql version to work - self.execute('SET F data %(d)s WHERE F eid %(f)s', - {'d': Binary('some other data'), 'f': f1.eid}) - self.assertEqual(f1.data.getvalue(), 'some other data') - self.commit() - f2 = self.execute('Any F WHERE F eid %(f)s, F is File', {'f': f1.eid}).get_entity(0, 0) - self.assertEqual(f2.data.getvalue(), 'some other data') + with self.admin_access.repo_cnx() as cnx: + f1 = cnx.create_entity('File', data=Binary('some data'), + data_format=u'text/plain', data_name=u'foo') + # NOTE: do not use cw_set() which would automatically + # update f1's local dict. We want the pure rql version to work + cnx.execute('SET F data %(d)s WHERE F eid %(f)s', + {'d': Binary('some other data'), 'f': f1.eid}) + self.assertEqual(f1.data.getvalue(), 'some other data') + cnx.commit() + f2 = cnx.execute('Any F WHERE F eid %(f)s, F is File', {'f': f1.eid}).get_entity(0, 0) + self.assertEqual(f2.data.getvalue(), 'some other data') @tag('update', 'extension', 'commit') def test_bfss_update_with_different_extension_commited(self): - # use self.session to use server-side cache - f1 = self.session.create_entity('File', data=Binary('some data'), - data_format=u'text/plain', data_name=u'foo.txt') - # NOTE: do not use cw_set() which would automatically - # update f1's local dict. We want the pure rql version to work - self.commit() - old_path = self.fspath(f1) - self.assertTrue(osp.isfile(old_path)) - self.assertEqual(osp.splitext(old_path)[1], '.txt') - self.execute('SET F data %(d)s, F data_name %(dn)s, F data_format %(df)s WHERE F eid %(f)s', - {'d': Binary('some other data'), 'f': f1.eid, 'dn': u'bar.jpg', 'df': u'image/jpeg'}) - self.commit() - # the new file exists with correct extension - # the old file is dead - f2 = self.execute('Any F WHERE F eid %(f)s, F is File', {'f': f1.eid}).get_entity(0, 0) - new_path = self.fspath(f2) - self.assertFalse(osp.isfile(old_path)) - self.assertTrue(osp.isfile(new_path)) - self.assertEqual(osp.splitext(new_path)[1], '.jpg') + with self.admin_access.repo_cnx() as cnx: + f1 = cnx.create_entity('File', data=Binary('some data'), + data_format=u'text/plain', data_name=u'foo.txt') + # NOTE: do not use cw_set() which would automatically + # update f1's local dict. We want the pure rql version to work + cnx.commit() + old_path = self.fspath(cnx, f1) + self.assertTrue(osp.isfile(old_path)) + self.assertEqual(osp.splitext(old_path)[1], '.txt') + cnx.execute('SET F data %(d)s, F data_name %(dn)s, ' + 'F data_format %(df)s WHERE F eid %(f)s', + {'d': Binary('some other data'), 'f': f1.eid, + 'dn': u'bar.jpg', 'df': u'image/jpeg'}) + cnx.commit() + # the new file exists with correct extension + # the old file is dead + f2 = cnx.execute('Any F WHERE F eid %(f)s, F is File', {'f': f1.eid}).get_entity(0, 0) + new_path = self.fspath(cnx, f2) + self.assertFalse(osp.isfile(old_path)) + self.assertTrue(osp.isfile(new_path)) + self.assertEqual(osp.splitext(new_path)[1], '.jpg') @tag('update', 'extension', 'rollback') def test_bfss_update_with_different_extension_rolled_back(self): - # use self.session to use server-side cache - f1 = self.session.create_entity('File', data=Binary('some data'), - data_format=u'text/plain', data_name=u'foo.txt') - # NOTE: do not use cw_set() which would automatically - # update f1's local dict. We want the pure rql version to work - self.commit() - old_path = self.fspath(f1) - old_data = f1.data.getvalue() - self.assertTrue(osp.isfile(old_path)) - self.assertEqual(osp.splitext(old_path)[1], '.txt') - self.execute('SET F data %(d)s, F data_name %(dn)s, F data_format %(df)s WHERE F eid %(f)s', - {'d': Binary('some other data'), 'f': f1.eid, 'dn': u'bar.jpg', 'df': u'image/jpeg'}) - self.rollback() - # the new file exists with correct extension - # the old file is dead - f2 = self.execute('Any F WHERE F eid %(f)s, F is File', {'f': f1.eid}).get_entity(0, 0) - new_path = self.fspath(f2) - new_data = f2.data.getvalue() - self.assertTrue(osp.isfile(new_path)) - self.assertEqual(osp.splitext(new_path)[1], '.txt') - self.assertEqual(old_path, new_path) - self.assertEqual(old_data, new_data) + with self.admin_access.repo_cnx() as cnx: + f1 = cnx.create_entity('File', data=Binary('some data'), + data_format=u'text/plain', data_name=u'foo.txt') + # NOTE: do not use cw_set() which would automatically + # update f1's local dict. We want the pure rql version to work + cnx.commit() + old_path = self.fspath(cnx, f1) + old_data = f1.data.getvalue() + self.assertTrue(osp.isfile(old_path)) + self.assertEqual(osp.splitext(old_path)[1], '.txt') + cnx.execute('SET F data %(d)s, F data_name %(dn)s, ' + 'F data_format %(df)s WHERE F eid %(f)s', + {'d': Binary('some other data'), + 'f': f1.eid, + 'dn': u'bar.jpg', + 'df': u'image/jpeg'}) + cnx.rollback() + # the new file exists with correct extension + # the old file is dead + f2 = cnx.execute('Any F WHERE F eid %(f)s, F is File', + {'f': f1.eid}).get_entity(0, 0) + new_path = self.fspath(cnx, f2) + new_data = f2.data.getvalue() + self.assertTrue(osp.isfile(new_path)) + self.assertEqual(osp.splitext(new_path)[1], '.txt') + self.assertEqual(old_path, new_path) + self.assertEqual(old_data, new_data) @tag('update', 'NULL') def test_bfss_update_to_None(self): - f = self.session.create_entity('Affaire', opt_attr=Binary('toto')) - self.session.commit() - self.session.set_cnxset() - f.cw_set(opt_attr=None) - self.session.commit() + with self.admin_access.repo_cnx() as cnx: + f = cnx.create_entity('Affaire', opt_attr=Binary('toto')) + cnx.commit() + f.cw_set(opt_attr=None) + cnx.commit() @tag('fs_importing', 'update') def test_bfss_update_with_fs_importing(self): - # use self.session to use server-side cache - f1 = self.session.create_entity('File', data=Binary('some data'), - data_format=u'text/plain', data_name=u'foo') - old_fspath = self.fspath(f1) - self.session.transaction_data['fs_importing'] = True - new_fspath = osp.join(self.tempdir, 'newfile.txt') - file(new_fspath, 'w').write('the new data') - self.execute('SET F data %(d)s WHERE F eid %(f)s', - {'d': Binary(new_fspath), 'f': f1.eid}) - self.commit() - self.assertEqual(f1.data.getvalue(), 'the new data') - self.assertEqual(self.fspath(f1), new_fspath) - self.assertFalse(osp.isfile(old_fspath)) + with self.admin_access.repo_cnx() as cnx: + f1 = cnx.create_entity('File', data=Binary('some data'), + data_format=u'text/plain', + data_name=u'foo') + old_fspath = self.fspath(cnx, f1) + cnx.transaction_data['fs_importing'] = True + new_fspath = osp.join(self.tempdir, 'newfile.txt') + file(new_fspath, 'w').write('the new data') + cnx.execute('SET F data %(d)s WHERE F eid %(f)s', + {'d': Binary(new_fspath), 'f': f1.eid}) + cnx.commit() + self.assertEqual(f1.data.getvalue(), 'the new data') + self.assertEqual(self.fspath(cnx, f1), new_fspath) + self.assertFalse(osp.isfile(old_fspath)) @tag('fsimport') def test_clean(self): - fsimport = storages.fsimport - td = self.session.transaction_data - self.assertNotIn('fs_importing', td) - with fsimport(self.session): - self.assertIn('fs_importing', td) - self.assertTrue(td['fs_importing']) - self.assertNotIn('fs_importing', td) + with self.admin_access.repo_cnx() as cnx: + fsimport = storages.fsimport + td = cnx.transaction_data + self.assertNotIn('fs_importing', td) + with fsimport(cnx): + self.assertIn('fs_importing', td) + self.assertTrue(td['fs_importing']) + self.assertNotIn('fs_importing', td) @tag('fsimport') def test_true(self): - fsimport = storages.fsimport - td = self.session.transaction_data - td['fs_importing'] = True - with fsimport(self.session): - self.assertIn('fs_importing', td) + with self.admin_access.repo_cnx() as cnx: + fsimport = storages.fsimport + td = cnx.transaction_data + td['fs_importing'] = True + with fsimport(cnx): + self.assertIn('fs_importing', td) + self.assertTrue(td['fs_importing']) self.assertTrue(td['fs_importing']) - self.assertTrue(td['fs_importing']) @tag('fsimport') def test_False(self): - fsimport = storages.fsimport - td = self.session.transaction_data - td['fs_importing'] = False - with fsimport(self.session): - self.assertIn('fs_importing', td) - self.assertTrue(td['fs_importing']) - self.assertFalse(td['fs_importing']) + with self.admin_access.repo_cnx() as cnx: + fsimport = storages.fsimport + td = cnx.transaction_data + td['fs_importing'] = False + with fsimport(cnx): + self.assertIn('fs_importing', td) + self.assertTrue(td['fs_importing']) + self.assertFalse(td['fs_importing']) if __name__ == '__main__': unittest_main()