def mh(self):
    """Yield an admin connection together with a migration helper bound to it.

    Generator yielding a ``(cnx, helper)`` pair while the admin client
    connection is open; callers in this file use it as
    ``with self.mh() as (cnx, mh):``, so it is presumably wrapped by a
    context-manager decorator located just above this definition.
    """
    with self.admin_access.client_cnx() as cnx:
        # non-interactive: the helper must never prompt during tests
        helper = ServerMigrationHelper(
            self.repo.config,
            migrschema,
            repo=self.repo,
            cnx=cnx,
            interactive=False,
        )
        yield cnx, helper
|
79 |
|
def table_sql(self, mh, tablename):
    """Return the SQL text sqlite stores for table `tablename`.

    The lookup goes through ``mh.sqlexec`` against ``sqlite_master``, with
    the table name passed as a query parameter.  Returns the first column
    of the first row, or None when the query yields nothing (no such
    table).
    """
    query = ("SELECT sql FROM sqlite_master WHERE type='table' "
             "and name=%(table)s")
    rows = mh.sqlexec(query, {'table': tablename})
    return rows[0][0] if rows else None
|
86 |
|
def table_schema(self, mh, tablename):
    """Return a ``{column name: column type}`` dict for table `tablename`.

    The mapping is parsed from the SQL text returned by ``self.table_sql``:
    the text between the first '(' and the last ')' is split on commas and
    the first two whitespace-separated words of each piece are kept as
    name and type.  Fails with an AssertionError when the table does not
    exist.
    """
    sql = self.table_sql(mh, tablename)
    assert sql, 'no table %s' % tablename
    # keep only the column list sitting between the outermost parentheses
    after_open = sql.split('(', 1)[1]
    column_list = after_open.rsplit(')', 1)[0]
    # anything after the type (constraints, defaults...) is ignored
    pairs = [coldef.split()[:2] for coldef in column_list.split(',')]
    return dict(pairs)
79 |
92 |
80 |
93 |
81 class MigrationCommandsTC(MigrationTC): |
94 class MigrationCommandsTC(MigrationTC): |
82 |
95 |
83 def _init_repo(self): |
96 def _init_repo(self): |
144 mh.cmd_add_attribute('Note', 'shortpara') |
157 mh.cmd_add_attribute('Note', 'shortpara') |
145 self.assertIn('shortpara', self.schema) |
158 self.assertIn('shortpara', self.schema) |
146 self.assertEqual(self.schema['shortpara'].subjects(), ('Note', )) |
159 self.assertEqual(self.schema['shortpara'].subjects(), ('Note', )) |
147 self.assertEqual(self.schema['shortpara'].objects(), ('String', )) |
160 self.assertEqual(self.schema['shortpara'].objects(), ('String', )) |
148 # test created column is actually a varchar(64) |
161 # test created column is actually a varchar(64) |
149 notesql = mh.sqlexec("SELECT sql FROM sqlite_master WHERE type='table' and name='%sNote'" % SQL_PREFIX)[0][0] |
162 fields = self.table_schema(mh, '%sNote' % SQL_PREFIX) |
150 fields = dict(x.strip().split()[:2] for x in notesql.split('(', 1)[1].rsplit(')', 1)[0].split(',')) |
|
151 self.assertEqual(fields['%sshortpara' % SQL_PREFIX], 'varchar(64)') |
163 self.assertEqual(fields['%sshortpara' % SQL_PREFIX], 'varchar(64)') |
152 # test default value set on existing entities |
164 # test default value set on existing entities |
153 self.assertEqual(cnx.execute('Note X').get_entity(0, 0).shortpara, 'hop') |
165 self.assertEqual(cnx.execute('Note X').get_entity(0, 0).shortpara, 'hop') |
154 # test default value set for next entities |
166 # test default value set for next entities |
155 self.assertEqual(cnx.create_entity('Note').shortpara, 'hop') |
167 self.assertEqual(cnx.create_entity('Note').shortpara, 'hop') |
665 self.assertEqual(sorted(et.type for et in self.schema['Para'].specialized_by()), |
677 self.assertEqual(sorted(et.type for et in self.schema['Para'].specialized_by()), |
666 []) |
678 []) |
667 self.assertEqual(self.schema['Note'].specializes(), None) |
679 self.assertEqual(self.schema['Note'].specializes(), None) |
668 self.assertEqual(self.schema['Text'].specializes(), None) |
680 self.assertEqual(self.schema['Text'].specializes(), None) |
669 |
681 |
670 |
|
def test_add_symmetric_relation_type(self):
    """Adding the 'same_as' relation type must create its relation table."""
    table = 'same_as_relation'
    with self.mh() as (cnx, mh):
        # the table must not exist before the relation type is added
        self.assertFalse(self.table_sql(mh, table))
        mh.cmd_add_relation_type('same_as')
        self.assertTrue(self.table_sql(mh, table))
680 |
687 |
681 |
688 |
682 class MigrationCommandsComputedTC(MigrationTC): |
689 class MigrationCommandsComputedTC(MigrationTC): |
683 """ Unit tests for computed relations and attributes |
690 """ Unit tests for computed relations and attributes |
684 """ |
691 """ |
701 mh.cmd_drop_relation_definition('Company', 'notes', 'Note') |
708 mh.cmd_drop_relation_definition('Company', 'notes', 'Note') |
702 self.assertEqual(str(exc.exception), |
709 self.assertEqual(str(exc.exception), |
703 'Cannot drop a relation definition for a computed ' |
710 'Cannot drop a relation definition for a computed ' |
704 'relation (notes)') |
711 'relation (notes)') |
705 |
712 |
|
def test_computed_relation_add_relation_type(self):
    """Adding the computed relation 'works_for' updates the schema only.

    Checks the rule, subject and object types registered in the schema,
    that no 'works_for_relation' table is created, and that the relation
    can be read back through ``reverse_works_for``.
    """
    self.assertNotIn('works_for', self.schema)
    with self.mh() as (cnx, mh):
        mh.cmd_add_relation_type('works_for')
        self.assertIn('works_for', self.schema)
        rschema = self.schema['works_for']
        self.assertEqual(rschema.rule,
                         'O employees S, NOT EXISTS (O associates S)')
        self.assertEqual(rschema.objects(), ('Company',))
        self.assertEqual(rschema.subjects(), ('Employee',))
        # a computed relation is expected to have no table of its own
        self.assertFalse(self.table_sql(mh, 'works_for_relation'))
        employee = cnx.create_entity('Employee')
        associate = cnx.create_entity('Employee')
        cnx.create_entity('Company', employees=employee,
                          associates=associate)
        cnx.commit()
        company = cnx.execute('Company X').get_entity(0, 0)
        # only the plain employee is expected, not the associate
        works_for_eids = [x.eid for x in company.reverse_works_for]
        self.assertEqual([employee.eid], works_for_eids)
        mh.rollback()
|
731 |
706 def test_computed_relation_drop_relation_type(self): |
732 def test_computed_relation_drop_relation_type(self): |
707 self.assertIn('notes', self.schema) |
733 self.assertIn('notes', self.schema) |
708 with self.mh() as (cnx, mh): |
734 with self.mh() as (cnx, mh): |
709 mh.cmd_drop_relation_type('notes') |
735 mh.cmd_drop_relation_type('notes') |
710 self.assertNotIn('notes', self.schema) |
736 self.assertNotIn('notes', self.schema) |