server/test/unittest_storage.py
branchstable
changeset 6796 e70ca9abfc51
parent 6788 0f31ed3fff79
child 7057 daa1da99a071
equal deleted inserted replaced
6795:f29d24c3d687 6796:e70ca9abfc51
   121     def test_source_storage_transparency(self):
   121     def test_source_storage_transparency(self):
   122         with self.temporary_appobjects(DummyBeforeHook, DummyAfterHook):
   122         with self.temporary_appobjects(DummyBeforeHook, DummyAfterHook):
   123             self.create_file()
   123             self.create_file()
   124 
   124 
   125     def test_source_mapped_attribute_error_cases(self):
   125     def test_source_mapped_attribute_error_cases(self):
   126         ex = self.assertRaises(QueryError, self.execute,
   126         with self.assertRaises(QueryError) as cm:
   127                                'Any X WHERE X data ~= "hop", X is File')
   127             self.execute('Any X WHERE X data ~= "hop", X is File')
   128         self.assertEqual(str(ex), 'can\'t use File.data (X data ILIKE "hop") in restriction')
   128         self.assertEqual(str(cm.exception), 'can\'t use File.data (X data ILIKE "hop") in restriction')
   129         ex = self.assertRaises(QueryError, self.execute,
   129         with self.assertRaises(QueryError) as cm:
   130                                'Any X, Y WHERE X data D, Y data D, '
   130             self.execute('Any X, Y WHERE X data D, Y data D, '
   131                                'NOT X identity Y, X is File, Y is File')
   131                          'NOT X identity Y, X is File, Y is File')
   132         self.assertEqual(str(ex), "can't use D as a restriction variable")
   132         self.assertEqual(str(cm.exception), "can't use D as a restriction variable")
   133         # query returning mix of mapped / regular attributes (only file.data
   133         # query returning mix of mapped / regular attributes (only file.data
   134         # mapped, not image.data for instance)
   134         # mapped, not image.data for instance)
   135         ex = self.assertRaises(QueryError, self.execute,
   135         with self.assertRaises(QueryError) as cm:
   136                                'Any X WITH X BEING ('
   136             self.execute('Any X WITH X BEING ('
   137                                ' (Any NULL)'
   137                          ' (Any NULL)'
   138                                '  UNION '
   138                          '  UNION '
   139                                ' (Any D WHERE X data D, X is File)'
   139                          ' (Any D WHERE X data D, X is File)'
   140                                ')')
   140                          ')')
   141         self.assertEqual(str(ex), 'query fetch some source mapped attribute, some not')
   141         self.assertEqual(str(cm.exception), 'query fetch some source mapped attribute, some not')
   142         ex = self.assertRaises(QueryError, self.execute,
   142         with self.assertRaises(QueryError) as cm:
   143                                '(Any D WHERE X data D, X is File)'
   143             self.execute('(Any D WHERE X data D, X is File)'
   144                                ' UNION '
   144                          ' UNION '
   145                                '(Any D WHERE X title D, X is Bookmark)')
   145                          '(Any D WHERE X title D, X is Bookmark)')
   146         self.assertEqual(str(ex), 'query fetch some source mapped attribute, some not')
   146         self.assertEqual(str(cm.exception), 'query fetch some source mapped attribute, some not')
   147 
   147 
   148         storages.set_attribute_storage(self.repo, 'State', 'name',
   148         storages.set_attribute_storage(self.repo, 'State', 'name',
   149                                        storages.BytesFileSystemStorage(self.tempdir))
   149                                        storages.BytesFileSystemStorage(self.tempdir))
   150         try:
   150         try:
   151             ex = self.assertRaises(QueryError,
   151             with self.assertRaises(QueryError) as cm:
   152                                    self.execute, 'Any D WHERE X name D, X is IN (State, Transition)')
   152                 self.execute('Any D WHERE X name D, X is IN (State, Transition)')
   153             self.assertEqual(str(ex), 'query fetch some source mapped attribute, some not')
   153             self.assertEqual(str(cm.exception), 'query fetch some source mapped attribute, some not')
   154         finally:
   154         finally:
   155             storages.unset_attribute_storage(self.repo, 'State', 'name')
   155             storages.unset_attribute_storage(self.repo, 'State', 'name')
   156 
   156 
   157     def test_source_mapped_attribute_advanced(self):
   157     def test_source_mapped_attribute_advanced(self):
   158         f1 = self.create_file()
   158         f1 = self.create_file()
   179         self.assertEqual(len(rset), 2)
   179         self.assertEqual(len(rset), 2)
   180         self.assertEqual(rset[0][0], f1.eid)
   180         self.assertEqual(rset[0][0], f1.eid)
   181         self.assertEqual(rset[1][0], f1.eid)
   181         self.assertEqual(rset[1][0], f1.eid)
   182         self.assertEqual(rset[0][1], len('the-data'))
   182         self.assertEqual(rset[0][1], len('the-data'))
   183         self.assertEqual(rset[1][1], len('the-data'))
   183         self.assertEqual(rset[1][1], len('the-data'))
   184         ex = self.assertRaises(QueryError, self.execute,
   184         with self.assertRaises(QueryError) as cm:
   185                                'Any X,UPPER(D) WHERE X eid %(x)s, X data D',
   185             self.execute('Any X,UPPER(D) WHERE X eid %(x)s, X data D',
   186                                {'x': f1.eid})
   186                          {'x': f1.eid})
   187         self.assertEqual(str(ex), 'UPPER can not be called on mapped attribute')
   187         self.assertEqual(str(cm.exception), 'UPPER can not be called on mapped attribute')
   188 
   188 
   189 
   189 
   190     def test_bfss_fs_importing_transparency(self):
   190     def test_bfss_fs_importing_transparency(self):
   191         self.session.transaction_data['fs_importing'] = True
   191         self.session.transaction_data['fs_importing'] = True
   192         filepath = osp.abspath(__file__)
   192         filepath = osp.abspath(__file__)