server/test/unittest_storage.py
changeset 7791 31bb51ea5485
parent 7711 458cb2edf63a
child 7979 8bd5031e2201
equal deleted inserted replaced
7790:7e16e056eecb 7791:31bb51ea5485
    87 
    87 
    88     def test_bfss_storage(self):
    88     def test_bfss_storage(self):
    89         f1 = self.create_file()
    89         f1 = self.create_file()
    90         expected_filepath = osp.join(self.tempdir, '%s_data_%s' %
    90         expected_filepath = osp.join(self.tempdir, '%s_data_%s' %
    91                                      (f1.eid, f1.data_name))
    91                                      (f1.eid, f1.data_name))
    92         self.failUnless(osp.isfile(expected_filepath))
    92         self.assertTrue(osp.isfile(expected_filepath))
    93         self.assertEqual(file(expected_filepath).read(), 'the-data')
    93         self.assertEqual(file(expected_filepath).read(), 'the-data')
    94         self.rollback()
    94         self.rollback()
    95         self.failIf(osp.isfile(expected_filepath))
    95         self.assertFalse(osp.isfile(expected_filepath))
    96         f1 = self.create_file()
    96         f1 = self.create_file()
    97         self.commit()
    97         self.commit()
    98         self.assertEqual(file(expected_filepath).read(), 'the-data')
    98         self.assertEqual(file(expected_filepath).read(), 'the-data')
    99         f1.set_attributes(data=Binary('the new data'))
    99         f1.set_attributes(data=Binary('the new data'))
   100         self.rollback()
   100         self.rollback()
   101         self.assertEqual(file(expected_filepath).read(), 'the-data')
   101         self.assertEqual(file(expected_filepath).read(), 'the-data')
   102         f1.cw_delete()
   102         f1.cw_delete()
   103         self.failUnless(osp.isfile(expected_filepath))
   103         self.assertTrue(osp.isfile(expected_filepath))
   104         self.rollback()
   104         self.rollback()
   105         self.failUnless(osp.isfile(expected_filepath))
   105         self.assertTrue(osp.isfile(expected_filepath))
   106         f1.cw_delete()
   106         f1.cw_delete()
   107         self.commit()
   107         self.commit()
   108         self.failIf(osp.isfile(expected_filepath))
   108         self.assertFalse(osp.isfile(expected_filepath))
   109 
   109 
   110     def test_bfss_sqlite_fspath(self):
   110     def test_bfss_sqlite_fspath(self):
   111         f1 = self.create_file()
   111         f1 = self.create_file()
   112         expected_filepath = osp.join(self.tempdir, '%s_data_%s' % (f1.eid, f1.data_name))
   112         expected_filepath = osp.join(self.tempdir, '%s_data_%s' % (f1.eid, f1.data_name))
   113         self.assertEqual(self.fspath(f1), expected_filepath)
   113         self.assertEqual(self.fspath(f1), expected_filepath)
   217                                         data_format=u'text/plain', data_name=u'foo.txt')
   217                                         data_format=u'text/plain', data_name=u'foo.txt')
   218         # NOTE: do not use set_attributes() which would automatically
   218         # NOTE: do not use set_attributes() which would automatically
   219         #       update f1's local dict. We want the pure rql version to work
   219         #       update f1's local dict. We want the pure rql version to work
   220         self.commit()
   220         self.commit()
   221         old_path = self.fspath(f1)
   221         old_path = self.fspath(f1)
   222         self.failUnless(osp.isfile(old_path))
   222         self.assertTrue(osp.isfile(old_path))
   223         self.assertEqual(osp.splitext(old_path)[1], '.txt')
   223         self.assertEqual(osp.splitext(old_path)[1], '.txt')
   224         self.execute('SET F data %(d)s, F data_name %(dn)s, F data_format %(df)s WHERE F eid %(f)s',
   224         self.execute('SET F data %(d)s, F data_name %(dn)s, F data_format %(df)s WHERE F eid %(f)s',
   225                      {'d': Binary('some other data'), 'f': f1.eid, 'dn': u'bar.jpg', 'df': u'image/jpeg'})
   225                      {'d': Binary('some other data'), 'f': f1.eid, 'dn': u'bar.jpg', 'df': u'image/jpeg'})
   226         self.commit()
   226         self.commit()
   227         # the new file exists with correct extension
   227         # the new file exists with correct extension
   228         # the old file is dead
   228         # the old file is dead
   229         f2 = self.execute('Any F WHERE F eid %(f)s, F is File', {'f': f1.eid}).get_entity(0, 0)
   229         f2 = self.execute('Any F WHERE F eid %(f)s, F is File', {'f': f1.eid}).get_entity(0, 0)
   230         new_path = self.fspath(f2)
   230         new_path = self.fspath(f2)
   231         self.failIf(osp.isfile(old_path))
   231         self.assertFalse(osp.isfile(old_path))
   232         self.failUnless(osp.isfile(new_path))
   232         self.assertTrue(osp.isfile(new_path))
   233         self.assertEqual(osp.splitext(new_path)[1], '.jpg')
   233         self.assertEqual(osp.splitext(new_path)[1], '.jpg')
   234 
   234 
   235     @tag('update', 'extension', 'rollback')
   235     @tag('update', 'extension', 'rollback')
   236     def test_bfss_update_with_different_extension_rollbacked(self):
   236     def test_bfss_update_with_different_extension_rollbacked(self):
   237         # use self.session to use server-side cache
   237         # use self.session to use server-side cache
   240         # NOTE: do not use set_attributes() which would automatically
   240         # NOTE: do not use set_attributes() which would automatically
   241         #       update f1's local dict. We want the pure rql version to work
   241         #       update f1's local dict. We want the pure rql version to work
   242         self.commit()
   242         self.commit()
   243         old_path = self.fspath(f1)
   243         old_path = self.fspath(f1)
   244         old_data = f1.data.getvalue()
   244         old_data = f1.data.getvalue()
   245         self.failUnless(osp.isfile(old_path))
   245         self.assertTrue(osp.isfile(old_path))
   246         self.assertEqual(osp.splitext(old_path)[1], '.txt')
   246         self.assertEqual(osp.splitext(old_path)[1], '.txt')
   247         self.execute('SET F data %(d)s, F data_name %(dn)s, F data_format %(df)s WHERE F eid %(f)s',
   247         self.execute('SET F data %(d)s, F data_name %(dn)s, F data_format %(df)s WHERE F eid %(f)s',
   248                      {'d': Binary('some other data'), 'f': f1.eid, 'dn': u'bar.jpg', 'df': u'image/jpeg'})
   248                      {'d': Binary('some other data'), 'f': f1.eid, 'dn': u'bar.jpg', 'df': u'image/jpeg'})
   249         self.rollback()
   249         self.rollback()
   250         # the new file exists with correct extension
   250         # the new file exists with correct extension
   251         # the old file is dead
   251         # the old file is dead
   252         f2 = self.execute('Any F WHERE F eid %(f)s, F is File', {'f': f1.eid}).get_entity(0, 0)
   252         f2 = self.execute('Any F WHERE F eid %(f)s, F is File', {'f': f1.eid}).get_entity(0, 0)
   253         new_path = self.fspath(f2)
   253         new_path = self.fspath(f2)
   254         new_data = f2.data.getvalue()
   254         new_data = f2.data.getvalue()
   255         self.failUnless(osp.isfile(new_path))
   255         self.assertTrue(osp.isfile(new_path))
   256         self.assertEqual(osp.splitext(new_path)[1], '.txt')
   256         self.assertEqual(osp.splitext(new_path)[1], '.txt')
   257         self.assertEqual(old_path, new_path)
   257         self.assertEqual(old_path, new_path)
   258         self.assertEqual(old_data, new_data)
   258         self.assertEqual(old_data, new_data)
   259 
   259 
   260     @tag('update', 'NULL')
   260     @tag('update', 'NULL')
   277         self.execute('SET F data %(d)s WHERE F eid %(f)s',
   277         self.execute('SET F data %(d)s WHERE F eid %(f)s',
   278                      {'d': Binary(new_fspath), 'f': f1.eid})
   278                      {'d': Binary(new_fspath), 'f': f1.eid})
   279         self.commit()
   279         self.commit()
   280         self.assertEqual(f1.data.getvalue(), 'the new data')
   280         self.assertEqual(f1.data.getvalue(), 'the new data')
   281         self.assertEqual(self.fspath(f1), new_fspath)
   281         self.assertEqual(self.fspath(f1), new_fspath)
   282         self.failIf(osp.isfile(old_fspath))
   282         self.assertFalse(osp.isfile(old_fspath))
   283 
   283 
   284     @tag('fsimport')
   284     @tag('fsimport')
   285     def test_clean(self):
   285     def test_clean(self):
   286         fsimport = storages.fsimport
   286         fsimport = storages.fsimport
   287         td = self.session.transaction_data
   287         td = self.session.transaction_data