# HG changeset patch # User Aurelien Campeas # Date 1401441054 -7200 # Node ID ae624d86c652cc74405a7f201cdb62da8925f748 # Parent 57dbd44503732f9b489a23780818d2f9779bb5ea [tests/migration] use the new connection api diff -r 57dbd4450373 -r ae624d86c652 server/test/unittest_migractions.py --- a/server/test/unittest_migractions.py Wed May 28 17:33:16 2014 +0200 +++ b/server/test/unittest_migractions.py Fri May 30 11:10:54 2014 +0200 @@ -19,16 +19,16 @@ from datetime import date from os.path import join +from contextlib import contextmanager -from logilab.common.testlib import TestCase, unittest_main, Tags, tag +from logilab.common.testlib import unittest_main, Tags, tag from yams.constraints import UniqueConstraint from cubicweb import ConfigurationError, ValidationError from cubicweb.devtools.testlib import CubicWebTC -from cubicweb.schema import CubicWebSchemaLoader from cubicweb.server.sqlutils import SQL_PREFIX -from cubicweb.server.migractions import * +from cubicweb.server.migractions import ServerMigrationHelper import cubicweb.devtools @@ -61,467 +61,485 @@ def setUp(self): CubicWebTC.setUp(self) - self.mh = ServerMigrationHelper(self.repo.config, migrschema, - repo=self.repo, cnx=self.cnx, - interactive=False) - assert self.cnx is self.mh.cnx - assert self.session is self.mh.session, (self.session.id, self.mh.session.id) def tearDown(self): CubicWebTC.tearDown(self) self.repo.vreg['etypes'].clear_caches() + @contextmanager + def mh(self): + with self.admin_access.client_cnx() as cnx: + yield cnx, ServerMigrationHelper(self.repo.config, migrschema, + repo=self.repo, cnx=cnx, + interactive=False) + def test_add_attribute_bool(self): - self.assertNotIn('yesno', self.schema) - self.session.create_entity('Note') - self.commit() - self.mh.cmd_add_attribute('Note', 'yesno') - self.assertIn('yesno', self.schema) - self.assertEqual(self.schema['yesno'].subjects(), ('Note',)) - self.assertEqual(self.schema['yesno'].objects(), ('Boolean',)) - self.assertEqual(self.schema['Note'].default('yesno'), False) - # test default value set on existing entities - note = self.session.execute('Note X').get_entity(0, 0) - self.assertEqual(note.yesno, False) - # test default value set for next entities - self.assertEqual(self.session.create_entity('Note').yesno, False) - self.mh.rollback() + with self.mh() as (cnx, mh): + self.assertNotIn('yesno', self.schema) + cnx.create_entity('Note') + cnx.commit() + mh.cmd_add_attribute('Note', 'yesno') + self.assertIn('yesno', self.schema) + self.assertEqual(self.schema['yesno'].subjects(), ('Note',)) + self.assertEqual(self.schema['yesno'].objects(), ('Boolean',)) + self.assertEqual(self.schema['Note'].default('yesno'), False) + # test default value set on existing entities + note = cnx.execute('Note X').get_entity(0, 0) + self.assertEqual(note.yesno, False) + # test default value set for next entities + self.assertEqual(cnx.create_entity('Note').yesno, False) def test_add_attribute_int(self): - self.assertNotIn('whatever', self.schema) - self.session.create_entity('Note') - self.session.commit(free_cnxset=False) - orderdict = dict(self.mh.rqlexec('Any RTN, O WHERE X name "Note", RDEF from_entity X, ' + with self.mh() as (cnx, mh): + self.assertNotIn('whatever', self.schema) + cnx.create_entity('Note') + cnx.commit() + orderdict = dict(mh.rqlexec('Any RTN, O WHERE X name "Note", RDEF from_entity X, ' + 'RDEF relation_type RT, RDEF ordernum O, RT name RTN')) + mh.cmd_add_attribute('Note', 'whatever') + self.assertIn('whatever', self.schema) + self.assertEqual(self.schema['whatever'].subjects(), ('Note',)) + self.assertEqual(self.schema['whatever'].objects(), ('Int',)) + self.assertEqual(self.schema['Note'].default('whatever'), 0) + # test default value set on existing entities + note = cnx.execute('Note X').get_entity(0, 0) + self.assertIsInstance(note.whatever, int) + self.assertEqual(note.whatever, 0) + # test default value set for next entities + self.assertEqual(cnx.create_entity('Note').whatever, 0) + # test attribute order + orderdict2 = dict(mh.rqlexec('Any RTN, O WHERE X name "Note", RDEF from_entity X, ' 'RDEF relation_type RT, RDEF ordernum O, RT name RTN')) - self.mh.cmd_add_attribute('Note', 'whatever') - self.assertIn('whatever', self.schema) - self.assertEqual(self.schema['whatever'].subjects(), ('Note',)) - self.assertEqual(self.schema['whatever'].objects(), ('Int',)) - self.assertEqual(self.schema['Note'].default('whatever'), 0) - # test default value set on existing entities - note = self.session.execute('Note X').get_entity(0, 0) - self.assertIsInstance(note.whatever, int) - self.assertEqual(note.whatever, 0) - # test default value set for next entities - self.assertEqual(self.session.create_entity('Note').whatever, 0) - # test attribute order - orderdict2 = dict(self.mh.rqlexec('Any RTN, O WHERE X name "Note", RDEF from_entity X, ' - 'RDEF relation_type RT, RDEF ordernum O, RT name RTN')) - whateverorder = migrschema['whatever'].rdef('Note', 'Int').order - for k, v in orderdict.iteritems(): - if v >= whateverorder: - orderdict[k] = v+1 - orderdict['whatever'] = whateverorder - self.assertDictEqual(orderdict, orderdict2) - #self.assertEqual([r.type for r in self.schema['Note'].ordered_relations()], - # ['modification_date', 'creation_date', 'owned_by', - # 'eid', 'ecrit_par', 'inline1', 'date', 'type', - # 'whatever', 'date', 'in_basket']) - # NB: commit instead of rollback make following test fail with py2.5 - # this sounds like a pysqlite/2.5 bug (the same eid is affected to - # two different entities) - self.mh.rollback() + whateverorder = migrschema['whatever'].rdef('Note', 'Int').order + for k, v in orderdict.iteritems(): + if v >= whateverorder: + orderdict[k] = v+1 + orderdict['whatever'] = whateverorder + self.assertDictEqual(orderdict, orderdict2) + #self.assertEqual([r.type for r in self.schema['Note'].ordered_relations()], + # ['modification_date', 'creation_date', 'owned_by', + # 'eid', 'ecrit_par', 'inline1', 'date', 'type', + # 'whatever', 'date', 'in_basket']) + # NB: commit instead of rollback make following test fail with py2.5 + # this sounds like a pysqlite/2.5 bug (the same eid is affected to + # two different entities) def test_add_attribute_varchar(self): - self.assertNotIn('whatever', self.schema) - self.session.create_entity('Note') - self.session.commit(free_cnxset=False) - self.assertNotIn('shortpara', self.schema) - self.mh.cmd_add_attribute('Note', 'shortpara') - self.assertIn('shortpara', self.schema) - self.assertEqual(self.schema['shortpara'].subjects(), ('Note', )) - self.assertEqual(self.schema['shortpara'].objects(), ('String', )) - # test created column is actually a varchar(64) - notesql = self.mh.sqlexec("SELECT sql FROM sqlite_master WHERE type='table' and name='%sNote'" % SQL_PREFIX)[0][0] - fields = dict(x.strip().split()[:2] for x in notesql.split('(', 1)[1].rsplit(')', 1)[0].split(',')) - self.assertEqual(fields['%sshortpara' % SQL_PREFIX], 'varchar(64)') - # test default value set on existing entities - self.assertEqual(self.session.execute('Note X').get_entity(0, 0).shortpara, 'hop') - # test default value set for next entities - self.assertEqual(self.session.create_entity('Note').shortpara, 'hop') - self.mh.rollback() + with self.mh() as (cnx, mh): + self.assertNotIn('whatever', self.schema) + cnx.create_entity('Note') + cnx.commit() + self.assertNotIn('shortpara', self.schema) + mh.cmd_add_attribute('Note', 'shortpara') + self.assertIn('shortpara', self.schema) + self.assertEqual(self.schema['shortpara'].subjects(), ('Note', )) + self.assertEqual(self.schema['shortpara'].objects(), ('String', )) + # test created column is actually a varchar(64) + notesql = mh.sqlexec("SELECT sql FROM sqlite_master WHERE type='table' and name='%sNote'" % SQL_PREFIX)[0][0] + fields = dict(x.strip().split()[:2] for x in notesql.split('(', 1)[1].rsplit(')', 1)[0].split(',')) + self.assertEqual(fields['%sshortpara' % SQL_PREFIX], 'varchar(64)') + # test default value set on existing entities + self.assertEqual(cnx.execute('Note X').get_entity(0, 0).shortpara, 'hop') + # test default value set for next entities + self.assertEqual(cnx.create_entity('Note').shortpara, 'hop') def test_add_datetime_with_default_value_attribute(self): - self.assertNotIn('mydate', self.schema) - self.assertNotIn('oldstyledefaultdate', self.schema) - self.assertNotIn('newstyledefaultdate', self.schema) - self.mh.cmd_add_attribute('Note', 'mydate') - self.mh.cmd_add_attribute('Note', 'oldstyledefaultdate') - self.mh.cmd_add_attribute('Note', 'newstyledefaultdate') - self.assertIn('mydate', self.schema) - self.assertIn('oldstyledefaultdate', self.schema) - self.assertIn('newstyledefaultdate', self.schema) - self.assertEqual(self.schema['mydate'].subjects(), ('Note', )) - self.assertEqual(self.schema['mydate'].objects(), ('Date', )) - testdate = date(2005, 12, 13) - eid1 = self.mh.rqlexec('INSERT Note N')[0][0] - eid2 = self.mh.rqlexec('INSERT Note N: N mydate %(mydate)s', {'mydate' : testdate})[0][0] - d1 = self.mh.rqlexec('Any D WHERE X eid %(x)s, X mydate D', {'x': eid1})[0][0] - d2 = self.mh.rqlexec('Any D WHERE X eid %(x)s, X mydate D', {'x': eid2})[0][0] - d3 = self.mh.rqlexec('Any D WHERE X eid %(x)s, X oldstyledefaultdate D', {'x': eid1})[0][0] - d4 = self.mh.rqlexec('Any D WHERE X eid %(x)s, X newstyledefaultdate D', {'x': eid1})[0][0] - self.assertEqual(d1, date.today()) - self.assertEqual(d2, testdate) - myfavoritedate = date(2013, 1, 1) - self.assertEqual(d3, myfavoritedate) - self.assertEqual(d4, myfavoritedate) - self.mh.rollback() + with self.mh() as (cnx, mh): + self.assertNotIn('mydate', self.schema) + self.assertNotIn('oldstyledefaultdate', self.schema) + self.assertNotIn('newstyledefaultdate', self.schema) + mh.cmd_add_attribute('Note', 'mydate') + mh.cmd_add_attribute('Note', 'oldstyledefaultdate') + mh.cmd_add_attribute('Note', 'newstyledefaultdate') + self.assertIn('mydate', self.schema) + self.assertIn('oldstyledefaultdate', self.schema) + self.assertIn('newstyledefaultdate', self.schema) + self.assertEqual(self.schema['mydate'].subjects(), ('Note', )) + self.assertEqual(self.schema['mydate'].objects(), ('Date', )) + testdate = date(2005, 12, 13) + eid1 = mh.rqlexec('INSERT Note N')[0][0] + eid2 = mh.rqlexec('INSERT Note N: N mydate %(mydate)s', {'mydate' : testdate})[0][0] + d1 = mh.rqlexec('Any D WHERE X eid %(x)s, X mydate D', {'x': eid1})[0][0] + d2 = mh.rqlexec('Any D WHERE X eid %(x)s, X mydate D', {'x': eid2})[0][0] + d3 = mh.rqlexec('Any D WHERE X eid %(x)s, X oldstyledefaultdate D', {'x': eid1})[0][0] + d4 = mh.rqlexec('Any D WHERE X eid %(x)s, X newstyledefaultdate D', {'x': eid1})[0][0] + self.assertEqual(d1, date.today()) + self.assertEqual(d2, testdate) + myfavoritedate = date(2013, 1, 1) + self.assertEqual(d3, myfavoritedate) + self.assertEqual(d4, myfavoritedate) def test_drop_chosen_constraints_ctxmanager(self): - with self.mh.cmd_dropped_constraints('Note', 'unique_id', UniqueConstraint): - self.mh.cmd_add_attribute('Note', 'unique_id') - # make sure the maxsize constraint is not dropped - self.assertRaises(ValidationError, - self.mh.rqlexec, - 'INSERT Note N: N unique_id "xyz"') - self.mh.rollback() - # make sure the unique constraint is dropped - self.mh.rqlexec('INSERT Note N: N unique_id "x"') - self.mh.rqlexec('INSERT Note N: N unique_id "x"') - self.mh.rqlexec('DELETE Note N') - self.mh.rollback() + with self.mh() as (cnx, mh): + with mh.cmd_dropped_constraints('Note', 'unique_id', UniqueConstraint): + mh.cmd_add_attribute('Note', 'unique_id') + # make sure the maxsize constraint is not dropped + self.assertRaises(ValidationError, + mh.rqlexec, + 'INSERT Note N: N unique_id "xyz"') + mh.rollback() + # make sure the unique constraint is dropped + mh.rqlexec('INSERT Note N: N unique_id "x"') + mh.rqlexec('INSERT Note N: N unique_id "x"') + mh.rqlexec('DELETE Note N') def test_drop_required_ctxmanager(self): - with self.mh.cmd_dropped_constraints('Note', 'unique_id', cstrtype=None, - droprequired=True): - self.mh.cmd_add_attribute('Note', 'unique_id') - self.mh.rqlexec('INSERT Note N') - # make sure the required=True was restored - self.assertRaises(ValidationError, self.mh.rqlexec, 'INSERT Note N') - self.mh.rollback() + with self.mh() as (cnx, mh): + with mh.cmd_dropped_constraints('Note', 'unique_id', cstrtype=None, + droprequired=True): + mh.cmd_add_attribute('Note', 'unique_id') + mh.rqlexec('INSERT Note N') + # make sure the required=True was restored + self.assertRaises(ValidationError, mh.rqlexec, 'INSERT Note N') + mh.rollback() def test_rename_attribute(self): - self.assertNotIn('civility', self.schema) - eid1 = self.mh.rqlexec('INSERT Personne X: X nom "lui", X sexe "M"')[0][0] - eid2 = self.mh.rqlexec('INSERT Personne X: X nom "l\'autre", X sexe NULL')[0][0] - self.mh.cmd_rename_attribute('Personne', 'sexe', 'civility') - self.assertNotIn('sexe', self.schema) - self.assertIn('civility', self.schema) - # test data has been backported - c1 = self.mh.rqlexec('Any C WHERE X eid %s, X civility C' % eid1)[0][0] - self.assertEqual(c1, 'M') - c2 = self.mh.rqlexec('Any C WHERE X eid %s, X civility C' % eid2)[0][0] - self.assertEqual(c2, None) - + with self.mh() as (cnx, mh): + self.assertNotIn('civility', self.schema) + eid1 = mh.rqlexec('INSERT Personne X: X nom "lui", X sexe "M"')[0][0] + eid2 = mh.rqlexec('INSERT Personne X: X nom "l\'autre", X sexe NULL')[0][0] + mh.cmd_rename_attribute('Personne', 'sexe', 'civility') + self.assertNotIn('sexe', self.schema) + self.assertIn('civility', self.schema) + # test data has been backported + c1 = mh.rqlexec('Any C WHERE X eid %s, X civility C' % eid1)[0][0] + self.assertEqual(c1, 'M') + c2 = mh.rqlexec('Any C WHERE X eid %s, X civility C' % eid2)[0][0] + self.assertEqual(c2, None) def test_workflow_actions(self): - wf = self.mh.cmd_add_workflow(u'foo', ('Personne', 'Email'), - ensure_workflowable=False) - for etype in ('Personne', 'Email'): - s1 = self.mh.rqlexec('Any N WHERE WF workflow_of ET, ET name "%s", WF name N' % - etype)[0][0] - self.assertEqual(s1, "foo") - s1 = self.mh.rqlexec('Any N WHERE ET default_workflow WF, ET name "%s", WF name N' % - etype)[0][0] - self.assertEqual(s1, "foo") + with self.mh() as (cnx, mh): + wf = mh.cmd_add_workflow(u'foo', ('Personne', 'Email'), + ensure_workflowable=False) + for etype in ('Personne', 'Email'): + s1 = mh.rqlexec('Any N WHERE WF workflow_of ET, ET name "%s", WF name N' % + etype)[0][0] + self.assertEqual(s1, "foo") + s1 = mh.rqlexec('Any N WHERE ET default_workflow WF, ET name "%s", WF name N' % + etype)[0][0] + self.assertEqual(s1, "foo") def test_add_entity_type(self): - self.assertNotIn('Folder2', self.schema) - self.assertNotIn('filed_under2', self.schema) - self.mh.cmd_add_entity_type('Folder2') - self.assertIn('Folder2', self.schema) - self.assertIn('Old', self.schema) - self.assertTrue(self.session.execute('CWEType X WHERE X name "Folder2"')) - self.assertIn('filed_under2', self.schema) - self.assertTrue(self.session.execute('CWRType X WHERE X name "filed_under2"')) - self.assertEqual(sorted(str(rs) for rs in self.schema['Folder2'].subject_relations()), - ['created_by', 'creation_date', 'cw_source', 'cwuri', - 'description', 'description_format', - 'eid', - 'filed_under2', 'has_text', - 'identity', 'in_basket', 'is', 'is_instance_of', - 'modification_date', 'name', 'owned_by']) - self.assertEqual([str(rs) for rs in self.schema['Folder2'].object_relations()], - ['filed_under2', 'identity']) - # Old will be missing as it has been renamed into 'New' in the migrated - # schema while New hasn't been added here. - self.assertEqual(sorted(str(e) for e in self.schema['filed_under2'].subjects()), - sorted(str(e) for e in self.schema.entities() if not e.final and e != 'Old')) - self.assertEqual(self.schema['filed_under2'].objects(), ('Folder2',)) - eschema = self.schema.eschema('Folder2') - for cstr in eschema.rdef('name').constraints: - self.assertTrue(hasattr(cstr, 'eid')) + with self.mh() as (cnx, mh): + self.assertNotIn('Folder2', self.schema) + self.assertNotIn('filed_under2', self.schema) + mh.cmd_add_entity_type('Folder2') + self.assertIn('Folder2', self.schema) + self.assertIn('Old', self.schema) + self.assertTrue(cnx.execute('CWEType X WHERE X name "Folder2"')) + self.assertIn('filed_under2', self.schema) + self.assertTrue(cnx.execute('CWRType X WHERE X name "filed_under2"')) + self.assertEqual(sorted(str(rs) for rs in self.schema['Folder2'].subject_relations()), + ['created_by', 'creation_date', 'cw_source', 'cwuri', + 'description', 'description_format', + 'eid', + 'filed_under2', 'has_text', + 'identity', 'in_basket', 'is', 'is_instance_of', + 'modification_date', 'name', 'owned_by']) + self.assertEqual([str(rs) for rs in self.schema['Folder2'].object_relations()], + ['filed_under2', 'identity']) + # Old will be missing as it has been renamed into 'New' in the migrated + # schema while New hasn't been added here. + self.assertEqual(sorted(str(e) for e in self.schema['filed_under2'].subjects()), + sorted(str(e) for e in self.schema.entities() if not e.final and e != 'Old')) + self.assertEqual(self.schema['filed_under2'].objects(), ('Folder2',)) + eschema = self.schema.eschema('Folder2') + for cstr in eschema.rdef('name').constraints: + self.assertTrue(hasattr(cstr, 'eid')) def test_add_drop_entity_type(self): - self.mh.cmd_add_entity_type('Folder2') - wf = self.mh.cmd_add_workflow(u'folder2 wf', 'Folder2', - ensure_workflowable=False) - todo = wf.add_state(u'todo', initial=True) - done = wf.add_state(u'done') - wf.add_transition(u'redoit', done, todo) - wf.add_transition(u'markasdone', todo, done) - self.session.commit(free_cnxset=False) - eschema = self.schema.eschema('Folder2') - self.mh.cmd_drop_entity_type('Folder2') - self.assertNotIn('Folder2', self.schema) - self.assertFalse(self.session.execute('CWEType X WHERE X name "Folder2"')) - # test automatic workflow deletion - self.assertFalse(self.session.execute('Workflow X WHERE NOT X workflow_of ET')) - self.assertFalse(self.session.execute('State X WHERE NOT X state_of WF')) - self.assertFalse(self.session.execute('Transition X WHERE NOT X transition_of WF')) + with self.mh() as (cnx, mh): + mh.cmd_add_entity_type('Folder2') + wf = mh.cmd_add_workflow(u'folder2 wf', 'Folder2', + ensure_workflowable=False) + todo = wf.add_state(u'todo', initial=True) + done = wf.add_state(u'done') + wf.add_transition(u'redoit', done, todo) + wf.add_transition(u'markasdone', todo, done) + cnx.commit() + eschema = self.schema.eschema('Folder2') + mh.cmd_drop_entity_type('Folder2') + self.assertNotIn('Folder2', self.schema) + self.assertFalse(cnx.execute('CWEType X WHERE X name "Folder2"')) + # test automatic workflow deletion + self.assertFalse(cnx.execute('Workflow X WHERE NOT X workflow_of ET')) + self.assertFalse(cnx.execute('State X WHERE NOT X state_of WF')) + self.assertFalse(cnx.execute('Transition X WHERE NOT X transition_of WF')) def test_rename_entity_type(self): - entity = self.mh.create_entity('Old', name=u'old') - self.repo.type_and_source_from_eid(entity.eid, entity._cw) - self.mh.cmd_rename_entity_type('Old', 'New') - self.mh.cmd_rename_attribute('New', 'name', 'new_name') + with self.mh() as (cnx, mh): + entity = mh.create_entity('Old', name=u'old') + self.repo.type_and_source_from_eid(entity.eid, entity._cw) + mh.cmd_rename_entity_type('Old', 'New') + mh.cmd_rename_attribute('New', 'name', 'new_name') def test_add_drop_relation_type(self): - self.mh.cmd_add_entity_type('Folder2', auto=False) - self.mh.cmd_add_relation_type('filed_under2') - self.assertIn('filed_under2', self.schema) - # Old will be missing as it has been renamed into 'New' in the migrated - # schema while New hasn't been added here. - self.assertEqual(sorted(str(e) for e in self.schema['filed_under2'].subjects()), - sorted(str(e) for e in self.schema.entities() - if not e.final and e != 'Old')) - self.assertEqual(self.schema['filed_under2'].objects(), ('Folder2',)) - self.mh.cmd_drop_relation_type('filed_under2') - self.assertNotIn('filed_under2', self.schema) + with self.mh() as (cnx, mh): + mh.cmd_add_entity_type('Folder2', auto=False) + mh.cmd_add_relation_type('filed_under2') + self.assertIn('filed_under2', self.schema) + # Old will be missing as it has been renamed into 'New' in the migrated + # schema while New hasn't been added here. + self.assertEqual(sorted(str(e) for e in self.schema['filed_under2'].subjects()), + sorted(str(e) for e in self.schema.entities() + if not e.final and e != 'Old')) + self.assertEqual(self.schema['filed_under2'].objects(), ('Folder2',)) + mh.cmd_drop_relation_type('filed_under2') + self.assertNotIn('filed_under2', self.schema) def test_add_relation_definition_nortype(self): - self.mh.cmd_add_relation_definition('Personne', 'concerne2', 'Affaire') - self.assertEqual(self.schema['concerne2'].subjects(), - ('Personne',)) - self.assertEqual(self.schema['concerne2'].objects(), - ('Affaire', )) - self.assertEqual(self.schema['concerne2'].rdef('Personne', 'Affaire').cardinality, - '1*') - self.mh.cmd_add_relation_definition('Personne', 'concerne2', 'Note') - self.assertEqual(sorted(self.schema['concerne2'].objects()), ['Affaire', 'Note']) - self.mh.create_entity('Personne', nom=u'tot') - self.mh.create_entity('Affaire') - self.mh.rqlexec('SET X concerne2 Y WHERE X is Personne, Y is Affaire') - self.session.commit(free_cnxset=False) - self.mh.cmd_drop_relation_definition('Personne', 'concerne2', 'Affaire') - self.assertIn('concerne2', self.schema) - self.mh.cmd_drop_relation_definition('Personne', 'concerne2', 'Note') - self.assertNotIn('concerne2', self.schema) + with self.mh() as (cnx, mh): + mh.cmd_add_relation_definition('Personne', 'concerne2', 'Affaire') + self.assertEqual(self.schema['concerne2'].subjects(), + ('Personne',)) + self.assertEqual(self.schema['concerne2'].objects(), + ('Affaire', )) + self.assertEqual(self.schema['concerne2'].rdef('Personne', 'Affaire').cardinality, + '1*') + mh.cmd_add_relation_definition('Personne', 'concerne2', 'Note') + self.assertEqual(sorted(self.schema['concerne2'].objects()), ['Affaire', 'Note']) + mh.create_entity('Personne', nom=u'tot') + mh.create_entity('Affaire') + mh.rqlexec('SET X concerne2 Y WHERE X is Personne, Y is Affaire') + cnx.commit() + mh.cmd_drop_relation_definition('Personne', 'concerne2', 'Affaire') + self.assertIn('concerne2', self.schema) + mh.cmd_drop_relation_definition('Personne', 'concerne2', 'Note') + self.assertNotIn('concerne2', self.schema) def test_drop_relation_definition_existant_rtype(self): - self.assertEqual(sorted(str(e) for e in self.schema['concerne'].subjects()), - ['Affaire', 'Personne']) - self.assertEqual(sorted(str(e) for e in self.schema['concerne'].objects()), - ['Affaire', 'Division', 'Note', 'Societe', 'SubDivision']) - self.mh.cmd_drop_relation_definition('Personne', 'concerne', 'Affaire') - self.assertEqual(sorted(str(e) for e in self.schema['concerne'].subjects()), - ['Affaire']) - self.assertEqual(sorted(str(e) for e in self.schema['concerne'].objects()), - ['Division', 'Note', 'Societe', 'SubDivision']) - self.mh.cmd_add_relation_definition('Personne', 'concerne', 'Affaire') - self.assertEqual(sorted(str(e) for e in self.schema['concerne'].subjects()), - ['Affaire', 'Personne']) - self.assertEqual(sorted(str(e) for e in self.schema['concerne'].objects()), - ['Affaire', 'Division', 'Note', 'Societe', 'SubDivision']) - # trick: overwrite self.maxeid to avoid deletion of just reintroduced types - self.maxeid = self.session.execute('Any MAX(X)')[0][0] + with self.mh() as (cnx, mh): + self.assertEqual(sorted(str(e) for e in self.schema['concerne'].subjects()), + ['Affaire', 'Personne']) + self.assertEqual(sorted(str(e) for e in self.schema['concerne'].objects()), + ['Affaire', 'Division', 'Note', 'Societe', 'SubDivision']) + mh.cmd_drop_relation_definition('Personne', 'concerne', 'Affaire') + self.assertEqual(sorted(str(e) for e in self.schema['concerne'].subjects()), + ['Affaire']) + self.assertEqual(sorted(str(e) for e in self.schema['concerne'].objects()), + ['Division', 'Note', 'Societe', 'SubDivision']) + mh.cmd_add_relation_definition('Personne', 'concerne', 'Affaire') + self.assertEqual(sorted(str(e) for e in self.schema['concerne'].subjects()), + ['Affaire', 'Personne']) + self.assertEqual(sorted(str(e) for e in self.schema['concerne'].objects()), + ['Affaire', 'Division', 'Note', 'Societe', 'SubDivision']) + # trick: overwrite self.maxeid to avoid deletion of just reintroduced types + self.maxeid = cnx.execute('Any MAX(X)')[0][0] def test_drop_relation_definition_with_specialization(self): - self.assertEqual(sorted(str(e) for e in self.schema['concerne'].subjects()), - ['Affaire', 'Personne']) - self.assertEqual(sorted(str(e) for e in self.schema['concerne'].objects()), - ['Affaire', 'Division', 'Note', 'Societe', 'SubDivision']) - self.mh.cmd_drop_relation_definition('Affaire', 'concerne', 'Societe') - self.assertEqual(sorted(str(e) for e in self.schema['concerne'].subjects()), - ['Affaire', 'Personne']) - self.assertEqual(sorted(str(e) for e in self.schema['concerne'].objects()), - ['Affaire', 'Note']) - self.mh.cmd_add_relation_definition('Affaire', 'concerne', 'Societe') - self.assertEqual(sorted(str(e) for e in self.schema['concerne'].subjects()), - ['Affaire', 'Personne']) - self.assertEqual(sorted(str(e) for e in self.schema['concerne'].objects()), - ['Affaire', 'Division', 'Note', 'Societe', 'SubDivision']) - # trick: overwrite self.maxeid to avoid deletion of just reintroduced types - self.maxeid = self.session.execute('Any MAX(X)')[0][0] + with self.mh() as (cnx, mh): + self.assertEqual(sorted(str(e) for e in self.schema['concerne'].subjects()), + ['Affaire', 'Personne']) + self.assertEqual(sorted(str(e) for e in self.schema['concerne'].objects()), + ['Affaire', 'Division', 'Note', 'Societe', 'SubDivision']) + mh.cmd_drop_relation_definition('Affaire', 'concerne', 'Societe') + self.assertEqual(sorted(str(e) for e in self.schema['concerne'].subjects()), + ['Affaire', 'Personne']) + self.assertEqual(sorted(str(e) for e in self.schema['concerne'].objects()), + ['Affaire', 'Note']) + mh.cmd_add_relation_definition('Affaire', 'concerne', 'Societe') + self.assertEqual(sorted(str(e) for e in self.schema['concerne'].subjects()), + ['Affaire', 'Personne']) + self.assertEqual(sorted(str(e) for e in self.schema['concerne'].objects()), + ['Affaire', 'Division', 'Note', 'Societe', 'SubDivision']) + # trick: overwrite self.maxeid to avoid deletion of just reintroduced types + self.maxeid = cnx.execute('Any MAX(X)')[0][0] def test_rename_relation(self): self.skipTest('implement me') def test_change_relation_props_non_final(self): - rschema = self.schema['concerne'] - card = rschema.rdef('Affaire', 'Societe').cardinality - self.assertEqual(card, '**') - try: - self.mh.cmd_change_relation_props('Affaire', 'concerne', 'Societe', - cardinality='?*') + with self.mh() as (cnx, mh): + rschema = self.schema['concerne'] card = rschema.rdef('Affaire', 'Societe').cardinality - self.assertEqual(card, '?*') - finally: - self.mh.cmd_change_relation_props('Affaire', 'concerne', 'Societe', - cardinality='**') + self.assertEqual(card, '**') + try: + mh.cmd_change_relation_props('Affaire', 'concerne', 'Societe', + cardinality='?*') + card = rschema.rdef('Affaire', 'Societe').cardinality + self.assertEqual(card, '?*') + finally: + mh.cmd_change_relation_props('Affaire', 'concerne', 'Societe', + cardinality='**') def test_change_relation_props_final(self): - rschema = self.schema['adel'] - card = rschema.rdef('Personne', 'String').fulltextindexed - self.assertEqual(card, False) - try: - self.mh.cmd_change_relation_props('Personne', 'adel', 'String', - fulltextindexed=True) + with self.mh() as (cnx, mh): + rschema = self.schema['adel'] card = rschema.rdef('Personne', 'String').fulltextindexed - self.assertEqual(card, True) - finally: - self.mh.cmd_change_relation_props('Personne', 'adel', 'String', - fulltextindexed=False) + self.assertEqual(card, False) + try: + mh.cmd_change_relation_props('Personne', 'adel', 'String', + fulltextindexed=True) + card = rschema.rdef('Personne', 'String').fulltextindexed + self.assertEqual(card, True) + finally: + mh.cmd_change_relation_props('Personne', 'adel', 'String', + fulltextindexed=False) def test_sync_schema_props_perms_rqlconstraints(self): - # Drop one of the RQLConstraint. - rdef = self.schema['evaluee'].rdefs[('Personne', 'Note')] - oldconstraints = rdef.constraints - self.assertIn('S created_by U', - [cstr.expression for cstr in oldconstraints]) - self.mh.cmd_sync_schema_props_perms('evaluee', commit=True) - newconstraints = rdef.constraints - self.assertNotIn('S created_by U', - [cstr.expression for cstr in newconstraints]) + with self.mh() as (cnx, mh): + # Drop one of the RQLConstraint. + rdef = self.schema['evaluee'].rdefs[('Personne', 'Note')] + oldconstraints = rdef.constraints + self.assertIn('S created_by U', + [cstr.expression for cstr in oldconstraints]) + mh.cmd_sync_schema_props_perms('evaluee', commit=True) + newconstraints = rdef.constraints + self.assertNotIn('S created_by U', + [cstr.expression for cstr in newconstraints]) - # Drop all RQLConstraint. - rdef = self.schema['travaille'].rdefs[('Personne', 'Societe')] - oldconstraints = rdef.constraints - self.assertEqual(len(oldconstraints), 2) - self.mh.cmd_sync_schema_props_perms('travaille', commit=True) - rdef = self.schema['travaille'].rdefs[('Personne', 'Societe')] - newconstraints = rdef.constraints - self.assertEqual(len(newconstraints), 0) + # Drop all RQLConstraint. + rdef = self.schema['travaille'].rdefs[('Personne', 'Societe')] + oldconstraints = rdef.constraints + self.assertEqual(len(oldconstraints), 2) + mh.cmd_sync_schema_props_perms('travaille', commit=True) + rdef = self.schema['travaille'].rdefs[('Personne', 'Societe')] + newconstraints = rdef.constraints + self.assertEqual(len(newconstraints), 0) @tag('longrun') def test_sync_schema_props_perms(self): - cursor = self.mh.session - cursor.set_cnxset() - nbrqlexpr_start = cursor.execute('Any COUNT(X) WHERE X is RQLExpression')[0][0] - migrschema['titre'].rdefs[('Personne', 'String')].order = 7 - migrschema['adel'].rdefs[('Personne', 'String')].order = 6 - migrschema['ass'].rdefs[('Personne', 'String')].order = 5 - migrschema['Personne'].description = 'blabla bla' - migrschema['titre'].description = 'usually a title' - migrschema['titre'].rdefs[('Personne', 'String')].description = 'title for this person' - delete_concerne_rqlexpr = self._rrqlexpr_rset('delete', 'concerne') - add_concerne_rqlexpr = self._rrqlexpr_rset('add', 'concerne') + with self.mh() as (cnx, mh): + nbrqlexpr_start = cnx.execute('Any COUNT(X) WHERE X is RQLExpression')[0][0] + migrschema['titre'].rdefs[('Personne', 'String')].order = 7 + migrschema['adel'].rdefs[('Personne', 'String')].order = 6 + migrschema['ass'].rdefs[('Personne', 'String')].order = 5 + migrschema['Personne'].description = 'blabla bla' + migrschema['titre'].description = 'usually a title' + migrschema['titre'].rdefs[('Personne', 'String')].description = 'title for this person' + delete_concerne_rqlexpr = self._rrqlexpr_rset(cnx, 'delete', 'concerne') + add_concerne_rqlexpr = self._rrqlexpr_rset(cnx, 'add', 'concerne') - self.mh.cmd_sync_schema_props_perms(commit=False) + mh.cmd_sync_schema_props_perms(commit=False) - self.assertEqual(cursor.execute('Any D WHERE X name "Personne", X description D')[0][0], - 'blabla bla') - self.assertEqual(cursor.execute('Any D WHERE X name "titre", X description D')[0][0], - 'usually a title') - self.assertEqual(cursor.execute('Any D WHERE X relation_type RT, RT name "titre",' + self.assertEqual(cnx.execute('Any D WHERE X name "Personne", X description D')[0][0], + 'blabla bla') + self.assertEqual(cnx.execute('Any D WHERE X name "titre", X description D')[0][0], + 'usually a title') + self.assertEqual(cnx.execute('Any D WHERE X relation_type RT, RT name "titre",' 'X from_entity FE, FE name "Personne",' 'X description D')[0][0], - 'title for this person') - rinorder = [n for n, in cursor.execute( - 'Any N ORDERBY O,N WHERE X is CWAttribute, X relation_type RT, RT name N,' - 'X from_entity FE, FE name "Personne",' - 'X ordernum O')] - expected = [u'nom', u'prenom', u'sexe', u'promo', u'ass', u'adel', u'titre', - u'web', u'tel', u'fax', u'datenaiss', u'test', u'tzdatenaiss', - u'description', u'firstname', - u'creation_date', u'cwuri', u'modification_date'] - self.assertEqual(expected, rinorder) + 'title for this person') + rinorder = [n for n, in cnx.execute( + 'Any N ORDERBY O,N WHERE X is CWAttribute, X relation_type RT, RT name N,' + 'X from_entity FE, FE name "Personne",' + 'X ordernum O')] + expected = [u'nom', u'prenom', u'sexe', u'promo', u'ass', u'adel', u'titre', + u'web', u'tel', u'fax', u'datenaiss', u'test', u'tzdatenaiss', + u'description', u'firstname', + u'creation_date', u'cwuri', u'modification_date'] + self.assertEqual(expected, rinorder) - # test permissions synchronization #################################### - # new rql expr to add note entity - eexpr = self._erqlexpr_entity('add', 'Note') - self.assertEqual(eexpr.expression, - 'X ecrit_part PE, U in_group G, ' - 'PE require_permission P, P name "add_note", P require_group G') - self.assertEqual([et.name for et in eexpr.reverse_add_permission], ['Note']) - self.assertEqual(eexpr.reverse_read_permission, ()) - self.assertEqual(eexpr.reverse_delete_permission, ()) - self.assertEqual(eexpr.reverse_update_permission, ()) - self.assertTrue(self._rrqlexpr_rset('add', 'para')) - # no rqlexpr to delete para attribute - self.assertFalse(self._rrqlexpr_rset('delete', 'para')) - # new rql expr to add ecrit_par relation - rexpr = self._rrqlexpr_entity('add', 'ecrit_par') - self.assertEqual(rexpr.expression, - 'O require_permission P, P name "add_note", ' - 'U in_group G, P require_group G') - self.assertEqual([rdef.rtype.name for rdef in rexpr.reverse_add_permission], ['ecrit_par']) - self.assertEqual(rexpr.reverse_read_permission, ()) - self.assertEqual(rexpr.reverse_delete_permission, ()) - # no more rqlexpr to delete and add travaille relation - self.assertFalse(self._rrqlexpr_rset('add', 'travaille')) - self.assertFalse(self._rrqlexpr_rset('delete', 'travaille')) - # no more rqlexpr to delete and update Societe entity - self.assertFalse(self._erqlexpr_rset('update', 'Societe')) - self.assertFalse(self._erqlexpr_rset('delete', 'Societe')) - # no more rqlexpr to read Affaire entity - self.assertFalse(self._erqlexpr_rset('read', 'Affaire')) - # rqlexpr to update Affaire entity has been updated - eexpr = self._erqlexpr_entity('update', 'Affaire') - self.assertEqual(eexpr.expression, 'X concerne S, S owned_by U') - # no change for rqlexpr to add and delete Affaire entity - self.assertEqual(len(self._erqlexpr_rset('delete', 'Affaire')), 1) - self.assertEqual(len(self._erqlexpr_rset('add', 'Affaire')), 1) - # no change for rqlexpr to add and delete concerne relation - self.assertEqual(len(self._rrqlexpr_rset('delete', 'concerne')), len(delete_concerne_rqlexpr)) - self.assertEqual(len(self._rrqlexpr_rset('add', 'concerne')), len(add_concerne_rqlexpr)) - # * migrschema involve: - # * 7 erqlexprs deletions (2 in (Affaire + Societe + Note.para) + 1 Note.something - # * 2 rrqlexprs deletions (travaille) - # * 1 update (Affaire update) - # * 2 new (Note add, ecrit_par add) - # * 2 implicit new for attributes (Note.para, Person.test) - # remaining orphan rql expr which should be deleted at commit (composite relation) - # unattached expressions -> pending deletion on commit - self.assertEqual(cursor.execute('Any COUNT(X) WHERE X is RQLExpression, X exprtype "ERQLExpression",' - 'NOT ET1 read_permission X, NOT ET2 add_permission X, ' - 'NOT ET3 delete_permission X, NOT ET4 update_permission X')[0][0], - 7) - self.assertEqual(cursor.execute('Any COUNT(X) WHERE X is RQLExpression, X exprtype "RRQLExpression",' - 'NOT ET1 read_permission X, NOT ET2 add_permission X, ' - 'NOT ET3 delete_permission X, NOT ET4 update_permission X')[0][0], - 2) - # finally - self.assertEqual(cursor.execute('Any COUNT(X) WHERE X is RQLExpression')[0][0], - nbrqlexpr_start + 1 + 2 + 2 + 2) - self.mh.commit() - # unique_together test - self.assertEqual(len(self.schema.eschema('Personne')._unique_together), 1) - self.assertCountEqual(self.schema.eschema('Personne')._unique_together[0], - ('nom', 'prenom', 'datenaiss')) - rset = cursor.execute('Any C WHERE C is CWUniqueTogetherConstraint, C constraint_of ET, ET name "Personne"') - self.assertEqual(len(rset), 1) - relations = [r.name for r in rset.get_entity(0, 0).relations] - self.assertCountEqual(relations, ('nom', 'prenom', 'datenaiss')) + # test permissions synchronization #################################### + # new rql expr to add note entity + eexpr = self._erqlexpr_entity(cnx, 'add', 'Note') + self.assertEqual(eexpr.expression, + 'X ecrit_part PE, U in_group G, ' + 'PE require_permission P, P name "add_note", P require_group G') + self.assertEqual([et.name for et in eexpr.reverse_add_permission], ['Note']) + self.assertEqual(eexpr.reverse_read_permission, ()) + self.assertEqual(eexpr.reverse_delete_permission, ()) + self.assertEqual(eexpr.reverse_update_permission, ()) + self.assertTrue(self._rrqlexpr_rset(cnx, 'add', 'para')) + # no rqlexpr to delete para attribute + self.assertFalse(self._rrqlexpr_rset(cnx, 'delete', 'para')) + # new rql expr to add ecrit_par relation + rexpr = self._rrqlexpr_entity(cnx, 'add', 'ecrit_par') + self.assertEqual(rexpr.expression, + 'O require_permission P, P name "add_note", ' + 'U in_group G, P require_group G') + self.assertEqual([rdef.rtype.name for rdef in rexpr.reverse_add_permission], ['ecrit_par']) + self.assertEqual(rexpr.reverse_read_permission, ()) + self.assertEqual(rexpr.reverse_delete_permission, ()) + # no more rqlexpr to delete and add travaille relation + self.assertFalse(self._rrqlexpr_rset(cnx, 'add', 'travaille')) + self.assertFalse(self._rrqlexpr_rset(cnx, 'delete', 'travaille')) + # no more rqlexpr to delete and update Societe entity + self.assertFalse(self._erqlexpr_rset(cnx, 'update', 'Societe')) + self.assertFalse(self._erqlexpr_rset(cnx, 'delete', 'Societe')) + # no more rqlexpr to read Affaire entity + self.assertFalse(self._erqlexpr_rset(cnx, 'read', 'Affaire')) + # rqlexpr to update Affaire entity has been updated + eexpr = self._erqlexpr_entity(cnx, 'update', 'Affaire') + self.assertEqual(eexpr.expression, 'X concerne S, S owned_by U') + # no change for rqlexpr to add and delete Affaire entity + self.assertEqual(len(self._erqlexpr_rset(cnx, 'delete', 'Affaire')), 1) + self.assertEqual(len(self._erqlexpr_rset(cnx, 'add', 'Affaire')), 1) + # no change for rqlexpr to add and delete concerne relation + self.assertEqual(len(self._rrqlexpr_rset(cnx, 'delete', 'concerne')), + len(delete_concerne_rqlexpr)) + self.assertEqual(len(self._rrqlexpr_rset(cnx, 'add', 'concerne')), + len(add_concerne_rqlexpr)) + # * migrschema involve: + # * 7 erqlexprs deletions (2 in (Affaire + Societe + Note.para) + 1 Note.something + # * 2 rrqlexprs deletions (travaille) + # * 1 update (Affaire update) + # * 2 new (Note add, ecrit_par add) + # * 2 implicit new for attributes (Note.para, Person.test) + # remaining orphan rql expr which should be deleted at commit (composite relation) + # unattached expressions -> pending deletion on commit + self.assertEqual(cnx.execute('Any COUNT(X) WHERE X is RQLExpression, X exprtype "ERQLExpression",' + 'NOT ET1 read_permission X, NOT ET2 add_permission X, ' + 'NOT ET3 delete_permission X, NOT ET4 update_permission X')[0][0], + 7) + self.assertEqual(cnx.execute('Any COUNT(X) WHERE X is RQLExpression, X exprtype "RRQLExpression",' + 'NOT ET1 read_permission X, NOT ET2 add_permission X, ' + 'NOT ET3 delete_permission X, NOT ET4 update_permission X')[0][0], + 2) + # finally + self.assertEqual(cnx.execute('Any COUNT(X) WHERE X is RQLExpression')[0][0], + nbrqlexpr_start + 1 + 2 + 2 + 2) + cnx.commit() + # unique_together test + self.assertEqual(len(self.schema.eschema('Personne')._unique_together), 1) + self.assertCountEqual(self.schema.eschema('Personne')._unique_together[0], + ('nom', 'prenom', 'datenaiss')) + rset = cnx.execute('Any C WHERE C is CWUniqueTogetherConstraint, C constraint_of ET, ET name "Personne"') + self.assertEqual(len(rset), 1) + relations = [r.name for r in rset.get_entity(0, 0).relations] + self.assertCountEqual(relations, ('nom', 'prenom', 'datenaiss')) - def _erqlexpr_rset(self, action, ertype): + def _erqlexpr_rset(self, cnx, action, ertype): rql = 'RQLExpression X WHERE ET is CWEType, ET %s_permission X, ET name %%(name)s' % action - return self.mh.session.execute(rql, {'name': ertype}) - def _erqlexpr_entity(self, action, ertype): - rset = self._erqlexpr_rset(action, ertype) + return cnx.execute(rql, {'name': ertype}) + + def _erqlexpr_entity(self, cnx, action, ertype): + rset = self._erqlexpr_rset(cnx, action, ertype) self.assertEqual(len(rset), 1) return rset.get_entity(0, 0) - def _rrqlexpr_rset(self, action, ertype): + + def _rrqlexpr_rset(self, cnx, action, ertype): rql = 'RQLExpression X WHERE RT is CWRType, RDEF %s_permission X, RT name %%(name)s, RDEF relation_type RT' % action - return self.mh.session.execute(rql, {'name': ertype}) - def _rrqlexpr_entity(self, action, ertype): - rset = self._rrqlexpr_rset(action, ertype) + return cnx.execute(rql, {'name': ertype}) + + def _rrqlexpr_entity(self, cnx, action, ertype): + rset = self._rrqlexpr_rset(cnx, action, ertype) self.assertEqual(len(rset), 1) return rset.get_entity(0, 0) def test_set_size_constraint(self): - # existing previous value - try: - self.mh.cmd_set_size_constraint('CWEType', 'name', 128) - finally: - self.mh.cmd_set_size_constraint('CWEType', 'name', 64) - # non existing previous value - try: - self.mh.cmd_set_size_constraint('CWEType', 'description', 256) - finally: - self.mh.cmd_set_size_constraint('CWEType', 'description', None) + with self.mh() as (cnx, mh): + # existing previous value + try: + mh.cmd_set_size_constraint('CWEType', 'name', 128) + finally: + mh.cmd_set_size_constraint('CWEType', 'name', 64) + # non existing previous value + try: + mh.cmd_set_size_constraint('CWEType', 'description', 256) + finally: + mh.cmd_set_size_constraint('CWEType', 'description', None) @tag('longrun') def test_add_remove_cube_and_deps(self): - cubes = set(self.config.cubes()) - schema = self.repo.schema - self.assertEqual(sorted((str(s), str(o)) for s, o in schema['see_also'].rdefs.iterkeys()), - sorted([('EmailThread', 'EmailThread'), ('Folder', 'Folder'), - ('Bookmark', 'Bookmark'), ('Bookmark', 'Note'), - ('Note', 'Note'), ('Note', 'Bookmark')])) - try: + with self.mh() as (cnx, mh): + schema = self.repo.schema + self.assertEqual(sorted((str(s), str(o)) for s, o in schema['see_also'].rdefs.iterkeys()), + sorted([('EmailThread', 'EmailThread'), ('Folder', 'Folder'), + ('Bookmark', 'Bookmark'), ('Bookmark', 'Note'), + ('Note', 'Note'), ('Note', 'Bookmark')])) try: - self.mh.cmd_remove_cube('email', removedeps=True) + mh.cmd_remove_cube('email', removedeps=True) # file was there because it's an email dependancy, should have been removed self.assertNotIn('email', self.config.cubes()) self.assertNotIn(self.config.cube_dir('email'), self.config.cubes_path()) @@ -538,121 +556,116 @@ ('Note', 'Bookmark')])) self.assertEqual(sorted(schema['see_also'].subjects()), ['Bookmark', 'Folder', 'Note']) self.assertEqual(sorted(schema['see_also'].objects()), ['Bookmark', 'Folder', 'Note']) - self.assertEqual(self.session.execute('Any X WHERE X pkey "system.version.email"').rowcount, 0) - self.assertEqual(self.session.execute('Any X WHERE X pkey "system.version.file"').rowcount, 0) - except : - import traceback - traceback.print_exc() - raise - finally: - self.mh.cmd_add_cube('email') - self.assertIn('email', self.config.cubes()) - self.assertIn(self.config.cube_dir('email'), self.config.cubes_path()) - self.assertIn('file', self.config.cubes()) - self.assertIn(self.config.cube_dir('file'), self.config.cubes_path()) - for ertype in ('Email', 'EmailThread', 'EmailPart', 'File', - 'sender', 'in_thread', 'reply_to', 'data_format'): - self.assertTrue(ertype in schema, ertype) - self.assertEqual(sorted(schema['see_also'].rdefs.iterkeys()), - sorted([('EmailThread', 'EmailThread'), ('Folder', 'Folder'), - ('Bookmark', 'Bookmark'), - ('Bookmark', 'Note'), - ('Note', 'Note'), - ('Note', 'Bookmark')])) - self.assertEqual(sorted(schema['see_also'].subjects()), ['Bookmark', 'EmailThread', 'Folder', 'Note']) - self.assertEqual(sorted(schema['see_also'].objects()), ['Bookmark', 'EmailThread', 'Folder', 'Note']) - from cubes.email.__pkginfo__ import version as email_version - from cubes.file.__pkginfo__ import version as file_version - self.assertEqual(self.session.execute('Any V WHERE X value V, X pkey "system.version.email"')[0][0], - email_version) - self.assertEqual(self.session.execute('Any V WHERE X value V, X pkey "system.version.file"')[0][0], - file_version) - # trick: overwrite self.maxeid to avoid deletion of just reintroduced - # types (and their associated tables!) - self.maxeid = self.session.execute('Any MAX(X)')[0][0] - # why this commit is necessary is unclear to me (though without it - # next test may fail complaining of missing tables - self.session.commit(free_cnxset=False) + self.assertEqual(cnx.execute('Any X WHERE X pkey "system.version.email"').rowcount, 0) + self.assertEqual(cnx.execute('Any X WHERE X pkey "system.version.file"').rowcount, 0) + finally: + mh.cmd_add_cube('email') + self.assertIn('email', self.config.cubes()) + self.assertIn(self.config.cube_dir('email'), self.config.cubes_path()) + self.assertIn('file', self.config.cubes()) + self.assertIn(self.config.cube_dir('file'), self.config.cubes_path()) + for ertype in ('Email', 'EmailThread', 'EmailPart', 'File', + 'sender', 'in_thread', 'reply_to', 'data_format'): + self.assertTrue(ertype in schema, ertype) + self.assertEqual(sorted(schema['see_also'].rdefs.iterkeys()), + sorted([('EmailThread', 'EmailThread'), ('Folder', 'Folder'), + ('Bookmark', 'Bookmark'), + ('Bookmark', 'Note'), + ('Note', 'Note'), + ('Note', 'Bookmark')])) + self.assertEqual(sorted(schema['see_also'].subjects()), ['Bookmark', 'EmailThread', 'Folder', 'Note']) + self.assertEqual(sorted(schema['see_also'].objects()), ['Bookmark', 'EmailThread', 'Folder', 'Note']) + from cubes.email.__pkginfo__ import version as email_version + from cubes.file.__pkginfo__ import version as file_version + self.assertEqual(cnx.execute('Any V WHERE X value V, X pkey "system.version.email"')[0][0], + email_version) + self.assertEqual(cnx.execute('Any V WHERE X value V, X pkey "system.version.file"')[0][0], + file_version) + # trick: overwrite self.maxeid to avoid deletion of just reintroduced + # types (and their associated tables!) + self.maxeid = cnx.execute('Any MAX(X)')[0][0] + # why this commit is necessary is unclear to me (though without it + # next test may fail complaining of missing tables + cnx.commit() @tag('longrun') def test_add_remove_cube_no_deps(self): - cubes = set(self.config.cubes()) - schema = self.repo.schema - try: + with self.mh() as (cnx, mh): + cubes = set(self.config.cubes()) + schema = self.repo.schema try: - self.mh.cmd_remove_cube('email') + mh.cmd_remove_cube('email') cubes.remove('email') self.assertNotIn('email', self.config.cubes()) self.assertIn('file', self.config.cubes()) for ertype in ('Email', 'EmailThread', 'EmailPart', 'sender', 'in_thread', 'reply_to'): self.assertFalse(ertype in schema, ertype) - except : - import traceback - traceback.print_exc() - raise - finally: - self.mh.cmd_add_cube('email') - self.assertIn('email', self.config.cubes()) - # trick: overwrite self.maxeid to avoid deletion of just reintroduced - # types (and their associated tables!) - self.maxeid = self.session.execute('Any MAX(X)')[0][0] - # why this commit is necessary is unclear to me (though without it - # next test may fail complaining of missing tables - self.session.commit(free_cnxset=False) + finally: + mh.cmd_add_cube('email') + self.assertIn('email', self.config.cubes()) + # trick: overwrite self.maxeid to avoid deletion of just reintroduced + # types (and their associated tables!) + self.maxeid = cnx.execute('Any MAX(X)')[0][0] # XXXXXXX KILL KENNY + # why this commit is necessary is unclear to me (though without it + # next test may fail complaining of missing tables + cnx.commit() def test_remove_dep_cube(self): - with self.assertRaises(ConfigurationError) as cm: - self.mh.cmd_remove_cube('file') - self.assertEqual(str(cm.exception), "can't remove cube file, used as a dependency") + with self.mh() as (cnx, mh): + with self.assertRaises(ConfigurationError) as cm: + mh.cmd_remove_cube('file') + self.assertEqual(str(cm.exception), "can't remove cube file, used as a dependency") @tag('longrun') def test_introduce_base_class(self): - self.mh.cmd_add_entity_type('Para') - self.assertEqual(sorted(et.type for et in self.schema['Para'].specialized_by()), - ['Note']) - self.assertEqual(self.schema['Note'].specializes().type, 'Para') - self.mh.cmd_add_entity_type('Text') - self.assertEqual(sorted(et.type for et in self.schema['Para'].specialized_by()), - ['Note', 'Text']) - self.assertEqual(self.schema['Text'].specializes().type, 'Para') - # test columns have been actually added - text = self.session.execute('INSERT Text X: X para "hip", X summary "hop", X newattr "momo"').get_entity(0, 0) - note = self.session.execute('INSERT Note X: X para "hip", X shortpara "hop", X newattr "momo", X unique_id "x"').get_entity(0, 0) - aff = self.session.execute('INSERT Affaire X').get_entity(0, 0) - self.assertTrue(self.session.execute('SET X newnotinlined Y WHERE X eid %(x)s, Y eid %(y)s', - {'x': text.eid, 'y': aff.eid})) - self.assertTrue(self.session.execute('SET X newnotinlined Y WHERE X eid %(x)s, Y eid %(y)s', - {'x': note.eid, 'y': aff.eid})) - self.assertTrue(self.session.execute('SET X newinlined Y WHERE X eid %(x)s, Y eid %(y)s', - {'x': text.eid, 'y': aff.eid})) - self.assertTrue(self.session.execute('SET X newinlined Y WHERE X eid %(x)s, Y eid %(y)s', - {'x': note.eid, 'y': aff.eid})) - # XXX remove specializes by ourselves, else tearDown fails when removing - # Para because of Note inheritance. This could be fixed by putting the - # MemSchemaCWETypeDel(session, name) operation in the - # after_delete_entity(CWEType) hook, since in that case the MemSchemaSpecializesDel - # operation would be removed before, but I'm not sure this is a desired behaviour. - # - # also we need more tests about introducing/removing base classes or - # specialization relationship... - self.session.execute('DELETE X specializes Y WHERE Y name "Para"') - self.session.commit(free_cnxset=False) - self.assertEqual(sorted(et.type for et in self.schema['Para'].specialized_by()), - []) - self.assertEqual(self.schema['Note'].specializes(), None) - self.assertEqual(self.schema['Text'].specializes(), None) + with self.mh() as (cnx, mh): + mh.cmd_add_entity_type('Para') + self.assertEqual(sorted(et.type for et in self.schema['Para'].specialized_by()), + ['Note']) + self.assertEqual(self.schema['Note'].specializes().type, 'Para') + mh.cmd_add_entity_type('Text') + self.assertEqual(sorted(et.type for et in self.schema['Para'].specialized_by()), + ['Note', 'Text']) + self.assertEqual(self.schema['Text'].specializes().type, 'Para') + # test columns have been actually added + text = cnx.execute('INSERT Text X: X para "hip", X summary "hop", X newattr "momo"').get_entity(0, 0) + note = cnx.execute('INSERT Note X: X para "hip", X shortpara "hop", X newattr "momo", X unique_id "x"').get_entity(0, 0) + aff = cnx.execute('INSERT Affaire X').get_entity(0, 0) + self.assertTrue(cnx.execute('SET X newnotinlined Y WHERE X eid %(x)s, Y eid %(y)s', + {'x': text.eid, 'y': aff.eid})) + self.assertTrue(cnx.execute('SET X newnotinlined Y WHERE X eid %(x)s, Y eid %(y)s', + {'x': note.eid, 'y': aff.eid})) + self.assertTrue(cnx.execute('SET X newinlined Y WHERE X eid %(x)s, Y eid %(y)s', + {'x': text.eid, 'y': aff.eid})) + self.assertTrue(cnx.execute('SET X newinlined Y WHERE X eid %(x)s, Y eid %(y)s', + {'x': note.eid, 'y': aff.eid})) + # XXX remove specializes by ourselves, else tearDown fails when removing + # Para because of Note inheritance. This could be fixed by putting the + # MemSchemaCWETypeDel(session, name) operation in the + # after_delete_entity(CWEType) hook, since in that case the MemSchemaSpecializesDel + # operation would be removed before, but I'm not sure this is a desired behaviour. + # + # also we need more tests about introducing/removing base classes or + # specialization relationship... + cnx.execute('DELETE X specializes Y WHERE Y name "Para"') + cnx.commit() + self.assertEqual(sorted(et.type for et in self.schema['Para'].specialized_by()), + []) + self.assertEqual(self.schema['Note'].specializes(), None) + self.assertEqual(self.schema['Text'].specializes(), None) def test_add_symmetric_relation_type(self): - same_as_sql = self.mh.sqlexec("SELECT sql FROM sqlite_master WHERE type='table' " - "and name='same_as_relation'") - self.assertFalse(same_as_sql) - self.mh.cmd_add_relation_type('same_as') - same_as_sql = self.mh.sqlexec("SELECT sql FROM sqlite_master WHERE type='table' " - "and name='same_as_relation'") - self.assertTrue(same_as_sql) + with self.mh() as (cnx, mh): + same_as_sql = mh.sqlexec("SELECT sql FROM sqlite_master WHERE type='table' " + "and name='same_as_relation'") + self.assertFalse(same_as_sql) + mh.cmd_add_relation_type('same_as') + same_as_sql = mh.sqlexec("SELECT sql FROM sqlite_master WHERE type='table' " + "and name='same_as_relation'") + self.assertTrue(same_as_sql) if __name__ == '__main__': unittest_main()