server/test/unittest_migractions.py
changeset 9963 5531f5577b50
parent 9962 64b573d54133
child 9964 f4a3ee05cf9d
equal deleted inserted replaced
9962:64b573d54133 9963:5531f5577b50
    74     def mh(self):
    74     def mh(self):
    75         with self.admin_access.client_cnx() as cnx:
    75         with self.admin_access.client_cnx() as cnx:
    76             yield cnx, ServerMigrationHelper(self.repo.config, migrschema,
    76             yield cnx, ServerMigrationHelper(self.repo.config, migrschema,
    77                                              repo=self.repo, cnx=cnx,
    77                                              repo=self.repo, cnx=cnx,
    78                                              interactive=False)
    78                                              interactive=False)
       
    79 
       
    80     def table_sql(self, mh, tablename):
       
    81         result = mh.sqlexec("SELECT sql FROM sqlite_master WHERE type='table' "
       
    82                             "and name=%(table)s", {'table': tablename})
       
    83         if result:
       
    84             return result[0][0]
       
    85         return None # no such table
       
    86 
       
    87     def table_schema(self, mh, tablename):
       
    88         sql = self.table_sql(mh, tablename)
       
    89         assert sql, 'no table %s' % tablename
       
    90         return dict(x.split()[:2]
       
    91                     for x in sql.split('(', 1)[1].rsplit(')', 1)[0].split(','))
    79 
    92 
    80 
    93 
    81 class MigrationCommandsTC(MigrationTC):
    94 class MigrationCommandsTC(MigrationTC):
    82 
    95 
    83     def _init_repo(self):
    96     def _init_repo(self):
   144             mh.cmd_add_attribute('Note', 'shortpara')
   157             mh.cmd_add_attribute('Note', 'shortpara')
   145             self.assertIn('shortpara', self.schema)
   158             self.assertIn('shortpara', self.schema)
   146             self.assertEqual(self.schema['shortpara'].subjects(), ('Note', ))
   159             self.assertEqual(self.schema['shortpara'].subjects(), ('Note', ))
   147             self.assertEqual(self.schema['shortpara'].objects(), ('String', ))
   160             self.assertEqual(self.schema['shortpara'].objects(), ('String', ))
   148             # test created column is actually a varchar(64)
   161             # test created column is actually a varchar(64)
   149             notesql = mh.sqlexec("SELECT sql FROM sqlite_master WHERE type='table' and name='%sNote'" % SQL_PREFIX)[0][0]
   162             fields = self.table_schema(mh, '%sNote' % SQL_PREFIX)
   150             fields = dict(x.strip().split()[:2] for x in notesql.split('(', 1)[1].rsplit(')', 1)[0].split(','))
       
   151             self.assertEqual(fields['%sshortpara' % SQL_PREFIX], 'varchar(64)')
   163             self.assertEqual(fields['%sshortpara' % SQL_PREFIX], 'varchar(64)')
   152             # test default value set on existing entities
   164             # test default value set on existing entities
   153             self.assertEqual(cnx.execute('Note X').get_entity(0, 0).shortpara, 'hop')
   165             self.assertEqual(cnx.execute('Note X').get_entity(0, 0).shortpara, 'hop')
   154             # test default value set for next entities
   166             # test default value set for next entities
   155             self.assertEqual(cnx.create_entity('Note').shortpara, 'hop')
   167             self.assertEqual(cnx.create_entity('Note').shortpara, 'hop')
   665             self.assertEqual(sorted(et.type for et in self.schema['Para'].specialized_by()),
   677             self.assertEqual(sorted(et.type for et in self.schema['Para'].specialized_by()),
   666                              [])
   678                              [])
   667             self.assertEqual(self.schema['Note'].specializes(), None)
   679             self.assertEqual(self.schema['Note'].specializes(), None)
   668             self.assertEqual(self.schema['Text'].specializes(), None)
   680             self.assertEqual(self.schema['Text'].specializes(), None)
   669 
   681 
   670 
       
   671     def test_add_symmetric_relation_type(self):
   682     def test_add_symmetric_relation_type(self):
   672         with self.mh() as (cnx, mh):
   683         with self.mh() as (cnx, mh):
   673             same_as_sql = mh.sqlexec("SELECT sql FROM sqlite_master WHERE type='table' "
   684             self.assertFalse(self.table_sql(mh, 'same_as_relation'))
   674                                      "and name='same_as_relation'")
       
   675             self.assertFalse(same_as_sql)
       
   676             mh.cmd_add_relation_type('same_as')
   685             mh.cmd_add_relation_type('same_as')
   677             same_as_sql = mh.sqlexec("SELECT sql FROM sqlite_master WHERE type='table' "
   686             self.assertTrue(self.table_sql(mh, 'same_as_relation'))
   678                                      "and name='same_as_relation'")
       
   679             self.assertTrue(same_as_sql)
       
   680 
   687 
   681 
   688 
   682 class MigrationCommandsComputedTC(MigrationTC):
   689 class MigrationCommandsComputedTC(MigrationTC):
   683     """ Unit tests for computed relations and attributes
   690     """ Unit tests for computed relations and attributes
   684     """
   691     """
   701                 mh.cmd_drop_relation_definition('Company', 'notes', 'Note')
   708                 mh.cmd_drop_relation_definition('Company', 'notes', 'Note')
   702         self.assertEqual(str(exc.exception),
   709         self.assertEqual(str(exc.exception),
   703                          'Cannot drop a relation definition for a computed '
   710                          'Cannot drop a relation definition for a computed '
   704                          'relation (notes)')
   711                          'relation (notes)')
   705 
   712 
       
   713     def test_computed_relation_add_relation_type(self):
       
   714         self.assertNotIn('works_for', self.schema)
       
   715         with self.mh() as (cnx, mh):
       
   716             mh.cmd_add_relation_type('works_for')
       
   717             self.assertIn('works_for', self.schema)
       
   718             self.assertEqual(self.schema['works_for'].rule,
       
   719                              'O employees S, NOT EXISTS (O associates S)')
       
   720             self.assertEqual(self.schema['works_for'].objects(), ('Company',))
       
   721             self.assertEqual(self.schema['works_for'].subjects(), ('Employee',))
       
   722             self.assertFalse(self.table_sql(mh, 'works_for_relation'))
       
   723             e = cnx.create_entity('Employee')
       
   724             a = cnx.create_entity('Employee')
       
   725             cnx.create_entity('Company', employees=e, associates=a)
       
   726             cnx.commit()
       
   727             company = cnx.execute('Company X').get_entity(0, 0)
       
   728             self.assertEqual([e.eid],
       
   729                              [x.eid for x in company.reverse_works_for])
       
   730             mh.rollback()
       
   731 
   706     def test_computed_relation_drop_relation_type(self):
   732     def test_computed_relation_drop_relation_type(self):
   707         self.assertIn('notes', self.schema)
   733         self.assertIn('notes', self.schema)
   708         with self.mh() as (cnx, mh):
   734         with self.mh() as (cnx, mh):
   709             mh.cmd_drop_relation_type('notes')
   735             mh.cmd_drop_relation_type('notes')
   710         self.assertNotIn('notes', self.schema)
   736         self.assertNotIn('notes', self.schema)