server/test/unittest_migractions.py
changeset 10441 1d0f4064a87e
parent 10353 d9a1e7939ee6
child 10487 49a5c38de1de
equal deleted inserted replaced
10440:eecb7bbb6795 10441:1d0f4064a87e
    24 from logilab.common.testlib import unittest_main, Tags, tag
    24 from logilab.common.testlib import unittest_main, Tags, tag
    25 
    25 
    26 from yams.constraints import UniqueConstraint
    26 from yams.constraints import UniqueConstraint
    27 
    27 
    28 from cubicweb import ConfigurationError, ValidationError, ExecutionError
    28 from cubicweb import ConfigurationError, ValidationError, ExecutionError
       
    29 from cubicweb.devtools import startpgcluster, stoppgcluster
    29 from cubicweb.devtools.testlib import CubicWebTC
    30 from cubicweb.devtools.testlib import CubicWebTC
    30 from cubicweb.server.sqlutils import SQL_PREFIX
    31 from cubicweb.server.sqlutils import SQL_PREFIX
    31 from cubicweb.server.migractions import ServerMigrationHelper
    32 from cubicweb.server.migractions import ServerMigrationHelper
    32 
    33 
    33 import cubicweb.devtools
    34 import cubicweb.devtools
    34 
    35 
    35 
    36 
    36 HERE = osp.dirname(osp.abspath(__file__))
    37 HERE = osp.dirname(osp.abspath(__file__))
       
    38 
       
    39 
       
    def setUpModule():
        """Module-level fixture run once before the tests of this file.

        Calls ``startpgcluster`` with this file's path; the matching
        ``stoppgcluster`` call is made in ``tearDownModule``.
        """
        startpgcluster(__file__)
       
    42 
    37 
    43 
    38 migrschema = None
    44 migrschema = None
    def tearDownModule(*args):
        """Release module-level test state, then stop the test cluster.

        Drops the module-global ``migrschema`` and the ``origschema``
        attribute cached on the migration test classes, then calls
        ``stoppgcluster`` for this file (counterpart of the ``startpgcluster``
        call made in ``setUpModule``).

        The ``stoppgcluster`` call sits in a ``finally`` clause so that it is
        still made when one of the cleanup statements raises.
        """
        global migrschema
        try:
            del migrschema
            for testcls in (MigrationCommandsTC, MigrationCommandsComputedTC):
                # only drop the cached schema when a test run actually set it
                if hasattr(testcls, 'origschema'):
                    del testcls.origschema
        finally:
            stoppgcluster(__file__)
       
    53 
       
    54 
       
    class MigrationConfig(cubicweb.devtools.TestServerConfiguration):
        """Test server configuration using ``DEFAULT_PSQL_SOURCES`` as sources.

        Used as ``configcls`` by the migration test cases of this module.
        """

        # source definitions come from cubicweb.devtools rather than being
        # spelled out here
        default_sources = cubicweb.devtools.DEFAULT_PSQL_SOURCES
       
    57 
    46 
    58 
    47 class MigrationTC(CubicWebTC):
    59 class MigrationTC(CubicWebTC):
    48 
    60 
    49     configcls = cubicweb.devtools.TestServerConfiguration
    61     configcls = MigrationConfig
    50 
    62 
    51     tags = CubicWebTC.tags | Tags(('server', 'migration', 'migractions'))
    63     tags = CubicWebTC.tags | Tags(('server', 'migration', 'migractions'))
    52 
    64 
    53     def _init_repo(self):
    65     def _init_repo(self):
    54         super(MigrationTC, self)._init_repo()
    66         super(MigrationTC, self)._init_repo()
    76             yield cnx, ServerMigrationHelper(self.repo.config, migrschema,
    88             yield cnx, ServerMigrationHelper(self.repo.config, migrschema,
    77                                              repo=self.repo, cnx=cnx,
    89                                              repo=self.repo, cnx=cnx,
    78                                              interactive=False)
    90                                              interactive=False)
    79 
    91 
    def table_sql(self, mh, tablename):
        """Return the stored name of table `tablename`, or None if absent.

        The table is looked up in ``information_schema.tables`` with a
        case-insensitive comparison, through ``mh.sqlexec``.
        """
        rows = mh.sqlexec("SELECT table_name FROM information_schema.tables WHERE LOWER(table_name)=%(table)s",
                          {'table': tablename.lower()})
        # first column of the first matching row; no row means no such table
        return rows[0][0] if rows else None
    86 
    98 
    def table_schema(self, mh, tablename):
        """Return a ``{column name: (data type, max length)}`` dict.

        Columns of `tablename` are read from ``information_schema.columns``
        with a case-insensitive match on the table name, through
        ``mh.sqlexec``. An AssertionError is raised when the query returns
        no row.
        """
        columns = mh.sqlexec("SELECT column_name, data_type, character_maximum_length FROM information_schema.columns "
                             "WHERE LOWER(table_name) = %(table)s", {'table': tablename.lower()})
        assert columns, 'no table %s' % tablename
        # the query selects exactly three columns per row
        return dict((name, (datatype, maxlength))
                    for name, datatype, maxlength in columns)
    92 
   104 
    93 
   105 
    94 class MigrationCommandsTC(MigrationTC):
   106 class MigrationCommandsTC(MigrationTC):
    95 
   107 
    96     def _init_repo(self):
   108     def _init_repo(self):
   158             self.assertIn('shortpara', self.schema)
   170             self.assertIn('shortpara', self.schema)
   159             self.assertEqual(self.schema['shortpara'].subjects(), ('Note', ))
   171             self.assertEqual(self.schema['shortpara'].subjects(), ('Note', ))
   160             self.assertEqual(self.schema['shortpara'].objects(), ('String', ))
   172             self.assertEqual(self.schema['shortpara'].objects(), ('String', ))
   161             # test created column is actually a varchar(64)
   173             # test created column is actually a varchar(64)
   162             fields = self.table_schema(mh, '%sNote' % SQL_PREFIX)
   174             fields = self.table_schema(mh, '%sNote' % SQL_PREFIX)
   163             self.assertEqual(fields['%sshortpara' % SQL_PREFIX], 'varchar(64)')
   175             self.assertEqual(fields['%sshortpara' % SQL_PREFIX], ('character varying', 64))
   164             # test default value set on existing entities
   176             # test default value set on existing entities
   165             self.assertEqual(cnx.execute('Note X').get_entity(0, 0).shortpara, 'hop')
   177             self.assertEqual(cnx.execute('Note X').get_entity(0, 0).shortpara, 'hop')
   166             # test default value set for next entities
   178             # test default value set for next entities
   167             self.assertEqual(cnx.create_entity('Note').shortpara, 'hop')
   179             self.assertEqual(cnx.create_entity('Note').shortpara, 'hop')
   168 
   180 
   210         with self.mh() as (cnx, mh):
   222         with self.mh() as (cnx, mh):
   211             with mh.cmd_dropped_constraints('Note', 'unique_id', cstrtype=None,
   223             with mh.cmd_dropped_constraints('Note', 'unique_id', cstrtype=None,
   212                                             droprequired=True):
   224                                             droprequired=True):
   213                 mh.cmd_add_attribute('Note', 'unique_id')
   225                 mh.cmd_add_attribute('Note', 'unique_id')
   214                 mh.rqlexec('INSERT Note N')
   226                 mh.rqlexec('INSERT Note N')
       
   227                 mh.rqlexec('SET N unique_id "x"')
   215             # make sure the required=True was restored
   228             # make sure the required=True was restored
   216             self.assertRaises(ValidationError, mh.rqlexec, 'INSERT Note N')
   229             self.assertRaises(ValidationError, mh.rqlexec, 'INSERT Note N')
   217             mh.rollback()
   230             mh.rollback()
   218 
   231 
   219     def test_rename_attribute(self):
   232     def test_rename_attribute(self):
   777 
   790 
   def assert_score_initialized(self, mh):
       """Check the computed ``score`` attribute in schema, table and data.

       Verifies, in this order: the formula recorded on the
       ``('Company', 'Float')`` relation definition of ``score``, the type of
       the score column as reported by ``table_schema``, and the value
       returned by an RQL query on Company scores.
       """
       score_rdef = self.schema['score'].rdefs['Company', 'Float']
       self.assertEqual(score_rdef.formula,
                        'Any AVG(NN) WHERE X employees E, N concerns E, N note NN')
       company_table = '%sCompany' % SQL_PREFIX
       score_column = '%sscore' % SQL_PREFIX
       columns = self.table_schema(mh, company_table)
       # (data type, character maximum length) as built by table_schema
       self.assertEqual(columns[score_column], ('double precision', None))
       scores = mh.rqlexec('Any CS WHERE C score CS, C is Company').rows
       self.assertEqual([[3.0]], scores)
   785 
   798 
   786     def test_computed_attribute_add_relation_type(self):
   799     def test_computed_attribute_add_relation_type(self):
   787         self.assertNotIn('score', self.schema)
   800         self.assertNotIn('score', self.schema)
   801             self.assertIn('score', self.schema)
   814             self.assertIn('score', self.schema)
   802             self.assert_score_initialized(mh)
   815             self.assert_score_initialized(mh)
   803 
   816 
   def assert_computed_attribute_dropped(self):
       """Check ``note20`` is gone from the schema and from the Note table."""
       self.assertNotIn('note20', self.schema)
       note_table = '%sNote' % SQL_PREFIX
       # the connection yielded alongside the migration helper is not needed
       with self.mh() as (_cnx, mh):
           columns = self.table_schema(mh, note_table)
       self.assertNotIn('%snote20' % SQL_PREFIX, columns)
       
   810 
   822 
   811     def test_computed_attribute_drop_type(self):
   823     def test_computed_attribute_drop_type(self):
   812         self.assertIn('note20', self.schema)
   824         self.assertIn('note20', self.schema)
   813         with self.mh() as (cnx, mh):
   825         with self.mh() as (cnx, mh):
   814             mh.cmd_drop_relation_type('note20')
   826             mh.cmd_drop_relation_type('note20')