--- a/server/test/unittest_migractions.py Wed May 28 17:33:16 2014 +0200
+++ b/server/test/unittest_migractions.py Fri May 30 11:10:54 2014 +0200
@@ -19,16 +19,16 @@
from datetime import date
from os.path import join
+from contextlib import contextmanager
-from logilab.common.testlib import TestCase, unittest_main, Tags, tag
+from logilab.common.testlib import unittest_main, Tags, tag
from yams.constraints import UniqueConstraint
from cubicweb import ConfigurationError, ValidationError
from cubicweb.devtools.testlib import CubicWebTC
-from cubicweb.schema import CubicWebSchemaLoader
from cubicweb.server.sqlutils import SQL_PREFIX
-from cubicweb.server.migractions import *
+from cubicweb.server.migractions import ServerMigrationHelper
import cubicweb.devtools
@@ -61,467 +61,485 @@
def setUp(self):
CubicWebTC.setUp(self)
- self.mh = ServerMigrationHelper(self.repo.config, migrschema,
- repo=self.repo, cnx=self.cnx,
- interactive=False)
- assert self.cnx is self.mh.cnx
- assert self.session is self.mh.session, (self.session.id, self.mh.session.id)
def tearDown(self):
CubicWebTC.tearDown(self)
self.repo.vreg['etypes'].clear_caches()
+ @contextmanager
+ def mh(self):
+ with self.admin_access.client_cnx() as cnx:
+ yield cnx, ServerMigrationHelper(self.repo.config, migrschema,
+ repo=self.repo, cnx=cnx,
+ interactive=False)
+
def test_add_attribute_bool(self):
- self.assertNotIn('yesno', self.schema)
- self.session.create_entity('Note')
- self.commit()
- self.mh.cmd_add_attribute('Note', 'yesno')
- self.assertIn('yesno', self.schema)
- self.assertEqual(self.schema['yesno'].subjects(), ('Note',))
- self.assertEqual(self.schema['yesno'].objects(), ('Boolean',))
- self.assertEqual(self.schema['Note'].default('yesno'), False)
- # test default value set on existing entities
- note = self.session.execute('Note X').get_entity(0, 0)
- self.assertEqual(note.yesno, False)
- # test default value set for next entities
- self.assertEqual(self.session.create_entity('Note').yesno, False)
- self.mh.rollback()
+ with self.mh() as (cnx, mh):
+ self.assertNotIn('yesno', self.schema)
+ cnx.create_entity('Note')
+ cnx.commit()
+ mh.cmd_add_attribute('Note', 'yesno')
+ self.assertIn('yesno', self.schema)
+ self.assertEqual(self.schema['yesno'].subjects(), ('Note',))
+ self.assertEqual(self.schema['yesno'].objects(), ('Boolean',))
+ self.assertEqual(self.schema['Note'].default('yesno'), False)
+ # test default value set on existing entities
+ note = cnx.execute('Note X').get_entity(0, 0)
+ self.assertEqual(note.yesno, False)
+ # test default value set for next entities
+ self.assertEqual(cnx.create_entity('Note').yesno, False)
def test_add_attribute_int(self):
- self.assertNotIn('whatever', self.schema)
- self.session.create_entity('Note')
- self.session.commit(free_cnxset=False)
- orderdict = dict(self.mh.rqlexec('Any RTN, O WHERE X name "Note", RDEF from_entity X, '
+ with self.mh() as (cnx, mh):
+ self.assertNotIn('whatever', self.schema)
+ cnx.create_entity('Note')
+ cnx.commit()
+ orderdict = dict(mh.rqlexec('Any RTN, O WHERE X name "Note", RDEF from_entity X, '
+ 'RDEF relation_type RT, RDEF ordernum O, RT name RTN'))
+ mh.cmd_add_attribute('Note', 'whatever')
+ self.assertIn('whatever', self.schema)
+ self.assertEqual(self.schema['whatever'].subjects(), ('Note',))
+ self.assertEqual(self.schema['whatever'].objects(), ('Int',))
+ self.assertEqual(self.schema['Note'].default('whatever'), 0)
+ # test default value set on existing entities
+ note = cnx.execute('Note X').get_entity(0, 0)
+ self.assertIsInstance(note.whatever, int)
+ self.assertEqual(note.whatever, 0)
+ # test default value set for next entities
+ self.assertEqual(cnx.create_entity('Note').whatever, 0)
+ # test attribute order
+ orderdict2 = dict(mh.rqlexec('Any RTN, O WHERE X name "Note", RDEF from_entity X, '
'RDEF relation_type RT, RDEF ordernum O, RT name RTN'))
- self.mh.cmd_add_attribute('Note', 'whatever')
- self.assertIn('whatever', self.schema)
- self.assertEqual(self.schema['whatever'].subjects(), ('Note',))
- self.assertEqual(self.schema['whatever'].objects(), ('Int',))
- self.assertEqual(self.schema['Note'].default('whatever'), 0)
- # test default value set on existing entities
- note = self.session.execute('Note X').get_entity(0, 0)
- self.assertIsInstance(note.whatever, int)
- self.assertEqual(note.whatever, 0)
- # test default value set for next entities
- self.assertEqual(self.session.create_entity('Note').whatever, 0)
- # test attribute order
- orderdict2 = dict(self.mh.rqlexec('Any RTN, O WHERE X name "Note", RDEF from_entity X, '
- 'RDEF relation_type RT, RDEF ordernum O, RT name RTN'))
- whateverorder = migrschema['whatever'].rdef('Note', 'Int').order
- for k, v in orderdict.iteritems():
- if v >= whateverorder:
- orderdict[k] = v+1
- orderdict['whatever'] = whateverorder
- self.assertDictEqual(orderdict, orderdict2)
- #self.assertEqual([r.type for r in self.schema['Note'].ordered_relations()],
- # ['modification_date', 'creation_date', 'owned_by',
- # 'eid', 'ecrit_par', 'inline1', 'date', 'type',
- # 'whatever', 'date', 'in_basket'])
- # NB: commit instead of rollback make following test fail with py2.5
- # this sounds like a pysqlite/2.5 bug (the same eid is affected to
- # two different entities)
- self.mh.rollback()
+ whateverorder = migrschema['whatever'].rdef('Note', 'Int').order
+ for k, v in orderdict.iteritems():
+ if v >= whateverorder:
+ orderdict[k] = v+1
+ orderdict['whatever'] = whateverorder
+ self.assertDictEqual(orderdict, orderdict2)
+ #self.assertEqual([r.type for r in self.schema['Note'].ordered_relations()],
+ # ['modification_date', 'creation_date', 'owned_by',
+ # 'eid', 'ecrit_par', 'inline1', 'date', 'type',
+ # 'whatever', 'date', 'in_basket'])
+ # NB: commit instead of rollback make following test fail with py2.5
+ # this sounds like a pysqlite/2.5 bug (the same eid is affected to
+ # two different entities)
def test_add_attribute_varchar(self):
- self.assertNotIn('whatever', self.schema)
- self.session.create_entity('Note')
- self.session.commit(free_cnxset=False)
- self.assertNotIn('shortpara', self.schema)
- self.mh.cmd_add_attribute('Note', 'shortpara')
- self.assertIn('shortpara', self.schema)
- self.assertEqual(self.schema['shortpara'].subjects(), ('Note', ))
- self.assertEqual(self.schema['shortpara'].objects(), ('String', ))
- # test created column is actually a varchar(64)
- notesql = self.mh.sqlexec("SELECT sql FROM sqlite_master WHERE type='table' and name='%sNote'" % SQL_PREFIX)[0][0]
- fields = dict(x.strip().split()[:2] for x in notesql.split('(', 1)[1].rsplit(')', 1)[0].split(','))
- self.assertEqual(fields['%sshortpara' % SQL_PREFIX], 'varchar(64)')
- # test default value set on existing entities
- self.assertEqual(self.session.execute('Note X').get_entity(0, 0).shortpara, 'hop')
- # test default value set for next entities
- self.assertEqual(self.session.create_entity('Note').shortpara, 'hop')
- self.mh.rollback()
+ with self.mh() as (cnx, mh):
+ self.assertNotIn('whatever', self.schema)
+ cnx.create_entity('Note')
+ cnx.commit()
+ self.assertNotIn('shortpara', self.schema)
+ mh.cmd_add_attribute('Note', 'shortpara')
+ self.assertIn('shortpara', self.schema)
+ self.assertEqual(self.schema['shortpara'].subjects(), ('Note', ))
+ self.assertEqual(self.schema['shortpara'].objects(), ('String', ))
+ # test created column is actually a varchar(64)
+ notesql = mh.sqlexec("SELECT sql FROM sqlite_master WHERE type='table' and name='%sNote'" % SQL_PREFIX)[0][0]
+ fields = dict(x.strip().split()[:2] for x in notesql.split('(', 1)[1].rsplit(')', 1)[0].split(','))
+ self.assertEqual(fields['%sshortpara' % SQL_PREFIX], 'varchar(64)')
+ # test default value set on existing entities
+ self.assertEqual(cnx.execute('Note X').get_entity(0, 0).shortpara, 'hop')
+ # test default value set for next entities
+ self.assertEqual(cnx.create_entity('Note').shortpara, 'hop')
def test_add_datetime_with_default_value_attribute(self):
- self.assertNotIn('mydate', self.schema)
- self.assertNotIn('oldstyledefaultdate', self.schema)
- self.assertNotIn('newstyledefaultdate', self.schema)
- self.mh.cmd_add_attribute('Note', 'mydate')
- self.mh.cmd_add_attribute('Note', 'oldstyledefaultdate')
- self.mh.cmd_add_attribute('Note', 'newstyledefaultdate')
- self.assertIn('mydate', self.schema)
- self.assertIn('oldstyledefaultdate', self.schema)
- self.assertIn('newstyledefaultdate', self.schema)
- self.assertEqual(self.schema['mydate'].subjects(), ('Note', ))
- self.assertEqual(self.schema['mydate'].objects(), ('Date', ))
- testdate = date(2005, 12, 13)
- eid1 = self.mh.rqlexec('INSERT Note N')[0][0]
- eid2 = self.mh.rqlexec('INSERT Note N: N mydate %(mydate)s', {'mydate' : testdate})[0][0]
- d1 = self.mh.rqlexec('Any D WHERE X eid %(x)s, X mydate D', {'x': eid1})[0][0]
- d2 = self.mh.rqlexec('Any D WHERE X eid %(x)s, X mydate D', {'x': eid2})[0][0]
- d3 = self.mh.rqlexec('Any D WHERE X eid %(x)s, X oldstyledefaultdate D', {'x': eid1})[0][0]
- d4 = self.mh.rqlexec('Any D WHERE X eid %(x)s, X newstyledefaultdate D', {'x': eid1})[0][0]
- self.assertEqual(d1, date.today())
- self.assertEqual(d2, testdate)
- myfavoritedate = date(2013, 1, 1)
- self.assertEqual(d3, myfavoritedate)
- self.assertEqual(d4, myfavoritedate)
- self.mh.rollback()
+ with self.mh() as (cnx, mh):
+ self.assertNotIn('mydate', self.schema)
+ self.assertNotIn('oldstyledefaultdate', self.schema)
+ self.assertNotIn('newstyledefaultdate', self.schema)
+ mh.cmd_add_attribute('Note', 'mydate')
+ mh.cmd_add_attribute('Note', 'oldstyledefaultdate')
+ mh.cmd_add_attribute('Note', 'newstyledefaultdate')
+ self.assertIn('mydate', self.schema)
+ self.assertIn('oldstyledefaultdate', self.schema)
+ self.assertIn('newstyledefaultdate', self.schema)
+ self.assertEqual(self.schema['mydate'].subjects(), ('Note', ))
+ self.assertEqual(self.schema['mydate'].objects(), ('Date', ))
+ testdate = date(2005, 12, 13)
+ eid1 = mh.rqlexec('INSERT Note N')[0][0]
+ eid2 = mh.rqlexec('INSERT Note N: N mydate %(mydate)s', {'mydate' : testdate})[0][0]
+ d1 = mh.rqlexec('Any D WHERE X eid %(x)s, X mydate D', {'x': eid1})[0][0]
+ d2 = mh.rqlexec('Any D WHERE X eid %(x)s, X mydate D', {'x': eid2})[0][0]
+ d3 = mh.rqlexec('Any D WHERE X eid %(x)s, X oldstyledefaultdate D', {'x': eid1})[0][0]
+ d4 = mh.rqlexec('Any D WHERE X eid %(x)s, X newstyledefaultdate D', {'x': eid1})[0][0]
+ self.assertEqual(d1, date.today())
+ self.assertEqual(d2, testdate)
+ myfavoritedate = date(2013, 1, 1)
+ self.assertEqual(d3, myfavoritedate)
+ self.assertEqual(d4, myfavoritedate)
def test_drop_chosen_constraints_ctxmanager(self):
- with self.mh.cmd_dropped_constraints('Note', 'unique_id', UniqueConstraint):
- self.mh.cmd_add_attribute('Note', 'unique_id')
- # make sure the maxsize constraint is not dropped
- self.assertRaises(ValidationError,
- self.mh.rqlexec,
- 'INSERT Note N: N unique_id "xyz"')
- self.mh.rollback()
- # make sure the unique constraint is dropped
- self.mh.rqlexec('INSERT Note N: N unique_id "x"')
- self.mh.rqlexec('INSERT Note N: N unique_id "x"')
- self.mh.rqlexec('DELETE Note N')
- self.mh.rollback()
+ with self.mh() as (cnx, mh):
+ with mh.cmd_dropped_constraints('Note', 'unique_id', UniqueConstraint):
+ mh.cmd_add_attribute('Note', 'unique_id')
+ # make sure the maxsize constraint is not dropped
+ self.assertRaises(ValidationError,
+ mh.rqlexec,
+ 'INSERT Note N: N unique_id "xyz"')
+ mh.rollback()
+ # make sure the unique constraint is dropped
+ mh.rqlexec('INSERT Note N: N unique_id "x"')
+ mh.rqlexec('INSERT Note N: N unique_id "x"')
+ mh.rqlexec('DELETE Note N')
def test_drop_required_ctxmanager(self):
- with self.mh.cmd_dropped_constraints('Note', 'unique_id', cstrtype=None,
- droprequired=True):
- self.mh.cmd_add_attribute('Note', 'unique_id')
- self.mh.rqlexec('INSERT Note N')
- # make sure the required=True was restored
- self.assertRaises(ValidationError, self.mh.rqlexec, 'INSERT Note N')
- self.mh.rollback()
+ with self.mh() as (cnx, mh):
+ with mh.cmd_dropped_constraints('Note', 'unique_id', cstrtype=None,
+ droprequired=True):
+ mh.cmd_add_attribute('Note', 'unique_id')
+ mh.rqlexec('INSERT Note N')
+ # make sure the required=True was restored
+ self.assertRaises(ValidationError, mh.rqlexec, 'INSERT Note N')
+ mh.rollback()
def test_rename_attribute(self):
- self.assertNotIn('civility', self.schema)
- eid1 = self.mh.rqlexec('INSERT Personne X: X nom "lui", X sexe "M"')[0][0]
- eid2 = self.mh.rqlexec('INSERT Personne X: X nom "l\'autre", X sexe NULL')[0][0]
- self.mh.cmd_rename_attribute('Personne', 'sexe', 'civility')
- self.assertNotIn('sexe', self.schema)
- self.assertIn('civility', self.schema)
- # test data has been backported
- c1 = self.mh.rqlexec('Any C WHERE X eid %s, X civility C' % eid1)[0][0]
- self.assertEqual(c1, 'M')
- c2 = self.mh.rqlexec('Any C WHERE X eid %s, X civility C' % eid2)[0][0]
- self.assertEqual(c2, None)
-
+ with self.mh() as (cnx, mh):
+ self.assertNotIn('civility', self.schema)
+ eid1 = mh.rqlexec('INSERT Personne X: X nom "lui", X sexe "M"')[0][0]
+ eid2 = mh.rqlexec('INSERT Personne X: X nom "l\'autre", X sexe NULL')[0][0]
+ mh.cmd_rename_attribute('Personne', 'sexe', 'civility')
+ self.assertNotIn('sexe', self.schema)
+ self.assertIn('civility', self.schema)
+ # test data has been backported
+ c1 = mh.rqlexec('Any C WHERE X eid %s, X civility C' % eid1)[0][0]
+ self.assertEqual(c1, 'M')
+ c2 = mh.rqlexec('Any C WHERE X eid %s, X civility C' % eid2)[0][0]
+ self.assertEqual(c2, None)
def test_workflow_actions(self):
- wf = self.mh.cmd_add_workflow(u'foo', ('Personne', 'Email'),
- ensure_workflowable=False)
- for etype in ('Personne', 'Email'):
- s1 = self.mh.rqlexec('Any N WHERE WF workflow_of ET, ET name "%s", WF name N' %
- etype)[0][0]
- self.assertEqual(s1, "foo")
- s1 = self.mh.rqlexec('Any N WHERE ET default_workflow WF, ET name "%s", WF name N' %
- etype)[0][0]
- self.assertEqual(s1, "foo")
+ with self.mh() as (cnx, mh):
+ wf = mh.cmd_add_workflow(u'foo', ('Personne', 'Email'),
+ ensure_workflowable=False)
+ for etype in ('Personne', 'Email'):
+ s1 = mh.rqlexec('Any N WHERE WF workflow_of ET, ET name "%s", WF name N' %
+ etype)[0][0]
+ self.assertEqual(s1, "foo")
+ s1 = mh.rqlexec('Any N WHERE ET default_workflow WF, ET name "%s", WF name N' %
+ etype)[0][0]
+ self.assertEqual(s1, "foo")
def test_add_entity_type(self):
- self.assertNotIn('Folder2', self.schema)
- self.assertNotIn('filed_under2', self.schema)
- self.mh.cmd_add_entity_type('Folder2')
- self.assertIn('Folder2', self.schema)
- self.assertIn('Old', self.schema)
- self.assertTrue(self.session.execute('CWEType X WHERE X name "Folder2"'))
- self.assertIn('filed_under2', self.schema)
- self.assertTrue(self.session.execute('CWRType X WHERE X name "filed_under2"'))
- self.assertEqual(sorted(str(rs) for rs in self.schema['Folder2'].subject_relations()),
- ['created_by', 'creation_date', 'cw_source', 'cwuri',
- 'description', 'description_format',
- 'eid',
- 'filed_under2', 'has_text',
- 'identity', 'in_basket', 'is', 'is_instance_of',
- 'modification_date', 'name', 'owned_by'])
- self.assertEqual([str(rs) for rs in self.schema['Folder2'].object_relations()],
- ['filed_under2', 'identity'])
- # Old will be missing as it has been renamed into 'New' in the migrated
- # schema while New hasn't been added here.
- self.assertEqual(sorted(str(e) for e in self.schema['filed_under2'].subjects()),
- sorted(str(e) for e in self.schema.entities() if not e.final and e != 'Old'))
- self.assertEqual(self.schema['filed_under2'].objects(), ('Folder2',))
- eschema = self.schema.eschema('Folder2')
- for cstr in eschema.rdef('name').constraints:
- self.assertTrue(hasattr(cstr, 'eid'))
+ with self.mh() as (cnx, mh):
+ self.assertNotIn('Folder2', self.schema)
+ self.assertNotIn('filed_under2', self.schema)
+ mh.cmd_add_entity_type('Folder2')
+ self.assertIn('Folder2', self.schema)
+ self.assertIn('Old', self.schema)
+ self.assertTrue(cnx.execute('CWEType X WHERE X name "Folder2"'))
+ self.assertIn('filed_under2', self.schema)
+ self.assertTrue(cnx.execute('CWRType X WHERE X name "filed_under2"'))
+ self.assertEqual(sorted(str(rs) for rs in self.schema['Folder2'].subject_relations()),
+ ['created_by', 'creation_date', 'cw_source', 'cwuri',
+ 'description', 'description_format',
+ 'eid',
+ 'filed_under2', 'has_text',
+ 'identity', 'in_basket', 'is', 'is_instance_of',
+ 'modification_date', 'name', 'owned_by'])
+ self.assertEqual([str(rs) for rs in self.schema['Folder2'].object_relations()],
+ ['filed_under2', 'identity'])
+ # Old will be missing as it has been renamed into 'New' in the migrated
+ # schema while New hasn't been added here.
+ self.assertEqual(sorted(str(e) for e in self.schema['filed_under2'].subjects()),
+ sorted(str(e) for e in self.schema.entities() if not e.final and e != 'Old'))
+ self.assertEqual(self.schema['filed_under2'].objects(), ('Folder2',))
+ eschema = self.schema.eschema('Folder2')
+ for cstr in eschema.rdef('name').constraints:
+ self.assertTrue(hasattr(cstr, 'eid'))
def test_add_drop_entity_type(self):
- self.mh.cmd_add_entity_type('Folder2')
- wf = self.mh.cmd_add_workflow(u'folder2 wf', 'Folder2',
- ensure_workflowable=False)
- todo = wf.add_state(u'todo', initial=True)
- done = wf.add_state(u'done')
- wf.add_transition(u'redoit', done, todo)
- wf.add_transition(u'markasdone', todo, done)
- self.session.commit(free_cnxset=False)
- eschema = self.schema.eschema('Folder2')
- self.mh.cmd_drop_entity_type('Folder2')
- self.assertNotIn('Folder2', self.schema)
- self.assertFalse(self.session.execute('CWEType X WHERE X name "Folder2"'))
- # test automatic workflow deletion
- self.assertFalse(self.session.execute('Workflow X WHERE NOT X workflow_of ET'))
- self.assertFalse(self.session.execute('State X WHERE NOT X state_of WF'))
- self.assertFalse(self.session.execute('Transition X WHERE NOT X transition_of WF'))
+ with self.mh() as (cnx, mh):
+ mh.cmd_add_entity_type('Folder2')
+ wf = mh.cmd_add_workflow(u'folder2 wf', 'Folder2',
+ ensure_workflowable=False)
+ todo = wf.add_state(u'todo', initial=True)
+ done = wf.add_state(u'done')
+ wf.add_transition(u'redoit', done, todo)
+ wf.add_transition(u'markasdone', todo, done)
+ cnx.commit()
+ eschema = self.schema.eschema('Folder2')
+ mh.cmd_drop_entity_type('Folder2')
+ self.assertNotIn('Folder2', self.schema)
+ self.assertFalse(cnx.execute('CWEType X WHERE X name "Folder2"'))
+ # test automatic workflow deletion
+ self.assertFalse(cnx.execute('Workflow X WHERE NOT X workflow_of ET'))
+ self.assertFalse(cnx.execute('State X WHERE NOT X state_of WF'))
+ self.assertFalse(cnx.execute('Transition X WHERE NOT X transition_of WF'))
def test_rename_entity_type(self):
- entity = self.mh.create_entity('Old', name=u'old')
- self.repo.type_and_source_from_eid(entity.eid, entity._cw)
- self.mh.cmd_rename_entity_type('Old', 'New')
- self.mh.cmd_rename_attribute('New', 'name', 'new_name')
+ with self.mh() as (cnx, mh):
+ entity = mh.create_entity('Old', name=u'old')
+ self.repo.type_and_source_from_eid(entity.eid, entity._cw)
+ mh.cmd_rename_entity_type('Old', 'New')
+ mh.cmd_rename_attribute('New', 'name', 'new_name')
def test_add_drop_relation_type(self):
- self.mh.cmd_add_entity_type('Folder2', auto=False)
- self.mh.cmd_add_relation_type('filed_under2')
- self.assertIn('filed_under2', self.schema)
- # Old will be missing as it has been renamed into 'New' in the migrated
- # schema while New hasn't been added here.
- self.assertEqual(sorted(str(e) for e in self.schema['filed_under2'].subjects()),
- sorted(str(e) for e in self.schema.entities()
- if not e.final and e != 'Old'))
- self.assertEqual(self.schema['filed_under2'].objects(), ('Folder2',))
- self.mh.cmd_drop_relation_type('filed_under2')
- self.assertNotIn('filed_under2', self.schema)
+ with self.mh() as (cnx, mh):
+ mh.cmd_add_entity_type('Folder2', auto=False)
+ mh.cmd_add_relation_type('filed_under2')
+ self.assertIn('filed_under2', self.schema)
+ # Old will be missing as it has been renamed into 'New' in the migrated
+ # schema while New hasn't been added here.
+ self.assertEqual(sorted(str(e) for e in self.schema['filed_under2'].subjects()),
+ sorted(str(e) for e in self.schema.entities()
+ if not e.final and e != 'Old'))
+ self.assertEqual(self.schema['filed_under2'].objects(), ('Folder2',))
+ mh.cmd_drop_relation_type('filed_under2')
+ self.assertNotIn('filed_under2', self.schema)
def test_add_relation_definition_nortype(self):
- self.mh.cmd_add_relation_definition('Personne', 'concerne2', 'Affaire')
- self.assertEqual(self.schema['concerne2'].subjects(),
- ('Personne',))
- self.assertEqual(self.schema['concerne2'].objects(),
- ('Affaire', ))
- self.assertEqual(self.schema['concerne2'].rdef('Personne', 'Affaire').cardinality,
- '1*')
- self.mh.cmd_add_relation_definition('Personne', 'concerne2', 'Note')
- self.assertEqual(sorted(self.schema['concerne2'].objects()), ['Affaire', 'Note'])
- self.mh.create_entity('Personne', nom=u'tot')
- self.mh.create_entity('Affaire')
- self.mh.rqlexec('SET X concerne2 Y WHERE X is Personne, Y is Affaire')
- self.session.commit(free_cnxset=False)
- self.mh.cmd_drop_relation_definition('Personne', 'concerne2', 'Affaire')
- self.assertIn('concerne2', self.schema)
- self.mh.cmd_drop_relation_definition('Personne', 'concerne2', 'Note')
- self.assertNotIn('concerne2', self.schema)
+ with self.mh() as (cnx, mh):
+ mh.cmd_add_relation_definition('Personne', 'concerne2', 'Affaire')
+ self.assertEqual(self.schema['concerne2'].subjects(),
+ ('Personne',))
+ self.assertEqual(self.schema['concerne2'].objects(),
+ ('Affaire', ))
+ self.assertEqual(self.schema['concerne2'].rdef('Personne', 'Affaire').cardinality,
+ '1*')
+ mh.cmd_add_relation_definition('Personne', 'concerne2', 'Note')
+ self.assertEqual(sorted(self.schema['concerne2'].objects()), ['Affaire', 'Note'])
+ mh.create_entity('Personne', nom=u'tot')
+ mh.create_entity('Affaire')
+ mh.rqlexec('SET X concerne2 Y WHERE X is Personne, Y is Affaire')
+ cnx.commit()
+ mh.cmd_drop_relation_definition('Personne', 'concerne2', 'Affaire')
+ self.assertIn('concerne2', self.schema)
+ mh.cmd_drop_relation_definition('Personne', 'concerne2', 'Note')
+ self.assertNotIn('concerne2', self.schema)
def test_drop_relation_definition_existant_rtype(self):
- self.assertEqual(sorted(str(e) for e in self.schema['concerne'].subjects()),
- ['Affaire', 'Personne'])
- self.assertEqual(sorted(str(e) for e in self.schema['concerne'].objects()),
- ['Affaire', 'Division', 'Note', 'Societe', 'SubDivision'])
- self.mh.cmd_drop_relation_definition('Personne', 'concerne', 'Affaire')
- self.assertEqual(sorted(str(e) for e in self.schema['concerne'].subjects()),
- ['Affaire'])
- self.assertEqual(sorted(str(e) for e in self.schema['concerne'].objects()),
- ['Division', 'Note', 'Societe', 'SubDivision'])
- self.mh.cmd_add_relation_definition('Personne', 'concerne', 'Affaire')
- self.assertEqual(sorted(str(e) for e in self.schema['concerne'].subjects()),
- ['Affaire', 'Personne'])
- self.assertEqual(sorted(str(e) for e in self.schema['concerne'].objects()),
- ['Affaire', 'Division', 'Note', 'Societe', 'SubDivision'])
- # trick: overwrite self.maxeid to avoid deletion of just reintroduced types
- self.maxeid = self.session.execute('Any MAX(X)')[0][0]
+ with self.mh() as (cnx, mh):
+ self.assertEqual(sorted(str(e) for e in self.schema['concerne'].subjects()),
+ ['Affaire', 'Personne'])
+ self.assertEqual(sorted(str(e) for e in self.schema['concerne'].objects()),
+ ['Affaire', 'Division', 'Note', 'Societe', 'SubDivision'])
+ mh.cmd_drop_relation_definition('Personne', 'concerne', 'Affaire')
+ self.assertEqual(sorted(str(e) for e in self.schema['concerne'].subjects()),
+ ['Affaire'])
+ self.assertEqual(sorted(str(e) for e in self.schema['concerne'].objects()),
+ ['Division', 'Note', 'Societe', 'SubDivision'])
+ mh.cmd_add_relation_definition('Personne', 'concerne', 'Affaire')
+ self.assertEqual(sorted(str(e) for e in self.schema['concerne'].subjects()),
+ ['Affaire', 'Personne'])
+ self.assertEqual(sorted(str(e) for e in self.schema['concerne'].objects()),
+ ['Affaire', 'Division', 'Note', 'Societe', 'SubDivision'])
+ # trick: overwrite self.maxeid to avoid deletion of just reintroduced types
+ self.maxeid = cnx.execute('Any MAX(X)')[0][0]
def test_drop_relation_definition_with_specialization(self):
- self.assertEqual(sorted(str(e) for e in self.schema['concerne'].subjects()),
- ['Affaire', 'Personne'])
- self.assertEqual(sorted(str(e) for e in self.schema['concerne'].objects()),
- ['Affaire', 'Division', 'Note', 'Societe', 'SubDivision'])
- self.mh.cmd_drop_relation_definition('Affaire', 'concerne', 'Societe')
- self.assertEqual(sorted(str(e) for e in self.schema['concerne'].subjects()),
- ['Affaire', 'Personne'])
- self.assertEqual(sorted(str(e) for e in self.schema['concerne'].objects()),
- ['Affaire', 'Note'])
- self.mh.cmd_add_relation_definition('Affaire', 'concerne', 'Societe')
- self.assertEqual(sorted(str(e) for e in self.schema['concerne'].subjects()),
- ['Affaire', 'Personne'])
- self.assertEqual(sorted(str(e) for e in self.schema['concerne'].objects()),
- ['Affaire', 'Division', 'Note', 'Societe', 'SubDivision'])
- # trick: overwrite self.maxeid to avoid deletion of just reintroduced types
- self.maxeid = self.session.execute('Any MAX(X)')[0][0]
+ with self.mh() as (cnx, mh):
+ self.assertEqual(sorted(str(e) for e in self.schema['concerne'].subjects()),
+ ['Affaire', 'Personne'])
+ self.assertEqual(sorted(str(e) for e in self.schema['concerne'].objects()),
+ ['Affaire', 'Division', 'Note', 'Societe', 'SubDivision'])
+ mh.cmd_drop_relation_definition('Affaire', 'concerne', 'Societe')
+ self.assertEqual(sorted(str(e) for e in self.schema['concerne'].subjects()),
+ ['Affaire', 'Personne'])
+ self.assertEqual(sorted(str(e) for e in self.schema['concerne'].objects()),
+ ['Affaire', 'Note'])
+ mh.cmd_add_relation_definition('Affaire', 'concerne', 'Societe')
+ self.assertEqual(sorted(str(e) for e in self.schema['concerne'].subjects()),
+ ['Affaire', 'Personne'])
+ self.assertEqual(sorted(str(e) for e in self.schema['concerne'].objects()),
+ ['Affaire', 'Division', 'Note', 'Societe', 'SubDivision'])
+ # trick: overwrite self.maxeid to avoid deletion of just reintroduced types
+ self.maxeid = cnx.execute('Any MAX(X)')[0][0]
def test_rename_relation(self):
self.skipTest('implement me')
def test_change_relation_props_non_final(self):
- rschema = self.schema['concerne']
- card = rschema.rdef('Affaire', 'Societe').cardinality
- self.assertEqual(card, '**')
- try:
- self.mh.cmd_change_relation_props('Affaire', 'concerne', 'Societe',
- cardinality='?*')
+ with self.mh() as (cnx, mh):
+ rschema = self.schema['concerne']
card = rschema.rdef('Affaire', 'Societe').cardinality
- self.assertEqual(card, '?*')
- finally:
- self.mh.cmd_change_relation_props('Affaire', 'concerne', 'Societe',
- cardinality='**')
+ self.assertEqual(card, '**')
+ try:
+ mh.cmd_change_relation_props('Affaire', 'concerne', 'Societe',
+ cardinality='?*')
+ card = rschema.rdef('Affaire', 'Societe').cardinality
+ self.assertEqual(card, '?*')
+ finally:
+ mh.cmd_change_relation_props('Affaire', 'concerne', 'Societe',
+ cardinality='**')
def test_change_relation_props_final(self):
- rschema = self.schema['adel']
- card = rschema.rdef('Personne', 'String').fulltextindexed
- self.assertEqual(card, False)
- try:
- self.mh.cmd_change_relation_props('Personne', 'adel', 'String',
- fulltextindexed=True)
+ with self.mh() as (cnx, mh):
+ rschema = self.schema['adel']
card = rschema.rdef('Personne', 'String').fulltextindexed
- self.assertEqual(card, True)
- finally:
- self.mh.cmd_change_relation_props('Personne', 'adel', 'String',
- fulltextindexed=False)
+ self.assertEqual(card, False)
+ try:
+ mh.cmd_change_relation_props('Personne', 'adel', 'String',
+ fulltextindexed=True)
+ card = rschema.rdef('Personne', 'String').fulltextindexed
+ self.assertEqual(card, True)
+ finally:
+ mh.cmd_change_relation_props('Personne', 'adel', 'String',
+ fulltextindexed=False)
def test_sync_schema_props_perms_rqlconstraints(self):
- # Drop one of the RQLConstraint.
- rdef = self.schema['evaluee'].rdefs[('Personne', 'Note')]
- oldconstraints = rdef.constraints
- self.assertIn('S created_by U',
- [cstr.expression for cstr in oldconstraints])
- self.mh.cmd_sync_schema_props_perms('evaluee', commit=True)
- newconstraints = rdef.constraints
- self.assertNotIn('S created_by U',
- [cstr.expression for cstr in newconstraints])
+ with self.mh() as (cnx, mh):
+ # Drop one of the RQLConstraint.
+ rdef = self.schema['evaluee'].rdefs[('Personne', 'Note')]
+ oldconstraints = rdef.constraints
+ self.assertIn('S created_by U',
+ [cstr.expression for cstr in oldconstraints])
+ mh.cmd_sync_schema_props_perms('evaluee', commit=True)
+ newconstraints = rdef.constraints
+ self.assertNotIn('S created_by U',
+ [cstr.expression for cstr in newconstraints])
- # Drop all RQLConstraint.
- rdef = self.schema['travaille'].rdefs[('Personne', 'Societe')]
- oldconstraints = rdef.constraints
- self.assertEqual(len(oldconstraints), 2)
- self.mh.cmd_sync_schema_props_perms('travaille', commit=True)
- rdef = self.schema['travaille'].rdefs[('Personne', 'Societe')]
- newconstraints = rdef.constraints
- self.assertEqual(len(newconstraints), 0)
+ # Drop all RQLConstraint.
+ rdef = self.schema['travaille'].rdefs[('Personne', 'Societe')]
+ oldconstraints = rdef.constraints
+ self.assertEqual(len(oldconstraints), 2)
+ mh.cmd_sync_schema_props_perms('travaille', commit=True)
+ rdef = self.schema['travaille'].rdefs[('Personne', 'Societe')]
+ newconstraints = rdef.constraints
+ self.assertEqual(len(newconstraints), 0)
@tag('longrun')
def test_sync_schema_props_perms(self):
- cursor = self.mh.session
- cursor.set_cnxset()
- nbrqlexpr_start = cursor.execute('Any COUNT(X) WHERE X is RQLExpression')[0][0]
- migrschema['titre'].rdefs[('Personne', 'String')].order = 7
- migrschema['adel'].rdefs[('Personne', 'String')].order = 6
- migrschema['ass'].rdefs[('Personne', 'String')].order = 5
- migrschema['Personne'].description = 'blabla bla'
- migrschema['titre'].description = 'usually a title'
- migrschema['titre'].rdefs[('Personne', 'String')].description = 'title for this person'
- delete_concerne_rqlexpr = self._rrqlexpr_rset('delete', 'concerne')
- add_concerne_rqlexpr = self._rrqlexpr_rset('add', 'concerne')
+ with self.mh() as (cnx, mh):
+ nbrqlexpr_start = cnx.execute('Any COUNT(X) WHERE X is RQLExpression')[0][0]
+ migrschema['titre'].rdefs[('Personne', 'String')].order = 7
+ migrschema['adel'].rdefs[('Personne', 'String')].order = 6
+ migrschema['ass'].rdefs[('Personne', 'String')].order = 5
+ migrschema['Personne'].description = 'blabla bla'
+ migrschema['titre'].description = 'usually a title'
+ migrschema['titre'].rdefs[('Personne', 'String')].description = 'title for this person'
+ delete_concerne_rqlexpr = self._rrqlexpr_rset(cnx, 'delete', 'concerne')
+ add_concerne_rqlexpr = self._rrqlexpr_rset(cnx, 'add', 'concerne')
- self.mh.cmd_sync_schema_props_perms(commit=False)
+ mh.cmd_sync_schema_props_perms(commit=False)
- self.assertEqual(cursor.execute('Any D WHERE X name "Personne", X description D')[0][0],
- 'blabla bla')
- self.assertEqual(cursor.execute('Any D WHERE X name "titre", X description D')[0][0],
- 'usually a title')
- self.assertEqual(cursor.execute('Any D WHERE X relation_type RT, RT name "titre",'
+ self.assertEqual(cnx.execute('Any D WHERE X name "Personne", X description D')[0][0],
+ 'blabla bla')
+ self.assertEqual(cnx.execute('Any D WHERE X name "titre", X description D')[0][0],
+ 'usually a title')
+ self.assertEqual(cnx.execute('Any D WHERE X relation_type RT, RT name "titre",'
'X from_entity FE, FE name "Personne",'
'X description D')[0][0],
- 'title for this person')
- rinorder = [n for n, in cursor.execute(
- 'Any N ORDERBY O,N WHERE X is CWAttribute, X relation_type RT, RT name N,'
- 'X from_entity FE, FE name "Personne",'
- 'X ordernum O')]
- expected = [u'nom', u'prenom', u'sexe', u'promo', u'ass', u'adel', u'titre',
- u'web', u'tel', u'fax', u'datenaiss', u'test', u'tzdatenaiss',
- u'description', u'firstname',
- u'creation_date', u'cwuri', u'modification_date']
- self.assertEqual(expected, rinorder)
+ 'title for this person')
+ rinorder = [n for n, in cnx.execute(
+ 'Any N ORDERBY O,N WHERE X is CWAttribute, X relation_type RT, RT name N,'
+ 'X from_entity FE, FE name "Personne",'
+ 'X ordernum O')]
+ expected = [u'nom', u'prenom', u'sexe', u'promo', u'ass', u'adel', u'titre',
+ u'web', u'tel', u'fax', u'datenaiss', u'test', u'tzdatenaiss',
+ u'description', u'firstname',
+ u'creation_date', u'cwuri', u'modification_date']
+ self.assertEqual(expected, rinorder)
- # test permissions synchronization ####################################
- # new rql expr to add note entity
- eexpr = self._erqlexpr_entity('add', 'Note')
- self.assertEqual(eexpr.expression,
- 'X ecrit_part PE, U in_group G, '
- 'PE require_permission P, P name "add_note", P require_group G')
- self.assertEqual([et.name for et in eexpr.reverse_add_permission], ['Note'])
- self.assertEqual(eexpr.reverse_read_permission, ())
- self.assertEqual(eexpr.reverse_delete_permission, ())
- self.assertEqual(eexpr.reverse_update_permission, ())
- self.assertTrue(self._rrqlexpr_rset('add', 'para'))
- # no rqlexpr to delete para attribute
- self.assertFalse(self._rrqlexpr_rset('delete', 'para'))
- # new rql expr to add ecrit_par relation
- rexpr = self._rrqlexpr_entity('add', 'ecrit_par')
- self.assertEqual(rexpr.expression,
- 'O require_permission P, P name "add_note", '
- 'U in_group G, P require_group G')
- self.assertEqual([rdef.rtype.name for rdef in rexpr.reverse_add_permission], ['ecrit_par'])
- self.assertEqual(rexpr.reverse_read_permission, ())
- self.assertEqual(rexpr.reverse_delete_permission, ())
- # no more rqlexpr to delete and add travaille relation
- self.assertFalse(self._rrqlexpr_rset('add', 'travaille'))
- self.assertFalse(self._rrqlexpr_rset('delete', 'travaille'))
- # no more rqlexpr to delete and update Societe entity
- self.assertFalse(self._erqlexpr_rset('update', 'Societe'))
- self.assertFalse(self._erqlexpr_rset('delete', 'Societe'))
- # no more rqlexpr to read Affaire entity
- self.assertFalse(self._erqlexpr_rset('read', 'Affaire'))
- # rqlexpr to update Affaire entity has been updated
- eexpr = self._erqlexpr_entity('update', 'Affaire')
- self.assertEqual(eexpr.expression, 'X concerne S, S owned_by U')
- # no change for rqlexpr to add and delete Affaire entity
- self.assertEqual(len(self._erqlexpr_rset('delete', 'Affaire')), 1)
- self.assertEqual(len(self._erqlexpr_rset('add', 'Affaire')), 1)
- # no change for rqlexpr to add and delete concerne relation
- self.assertEqual(len(self._rrqlexpr_rset('delete', 'concerne')), len(delete_concerne_rqlexpr))
- self.assertEqual(len(self._rrqlexpr_rset('add', 'concerne')), len(add_concerne_rqlexpr))
- # * migrschema involve:
- # * 7 erqlexprs deletions (2 in (Affaire + Societe + Note.para) + 1 Note.something
- # * 2 rrqlexprs deletions (travaille)
- # * 1 update (Affaire update)
- # * 2 new (Note add, ecrit_par add)
- # * 2 implicit new for attributes (Note.para, Person.test)
- # remaining orphan rql expr which should be deleted at commit (composite relation)
- # unattached expressions -> pending deletion on commit
- self.assertEqual(cursor.execute('Any COUNT(X) WHERE X is RQLExpression, X exprtype "ERQLExpression",'
- 'NOT ET1 read_permission X, NOT ET2 add_permission X, '
- 'NOT ET3 delete_permission X, NOT ET4 update_permission X')[0][0],
- 7)
- self.assertEqual(cursor.execute('Any COUNT(X) WHERE X is RQLExpression, X exprtype "RRQLExpression",'
- 'NOT ET1 read_permission X, NOT ET2 add_permission X, '
- 'NOT ET3 delete_permission X, NOT ET4 update_permission X')[0][0],
- 2)
- # finally
- self.assertEqual(cursor.execute('Any COUNT(X) WHERE X is RQLExpression')[0][0],
- nbrqlexpr_start + 1 + 2 + 2 + 2)
- self.mh.commit()
- # unique_together test
- self.assertEqual(len(self.schema.eschema('Personne')._unique_together), 1)
- self.assertCountEqual(self.schema.eschema('Personne')._unique_together[0],
- ('nom', 'prenom', 'datenaiss'))
- rset = cursor.execute('Any C WHERE C is CWUniqueTogetherConstraint, C constraint_of ET, ET name "Personne"')
- self.assertEqual(len(rset), 1)
- relations = [r.name for r in rset.get_entity(0, 0).relations]
- self.assertCountEqual(relations, ('nom', 'prenom', 'datenaiss'))
+ # test permissions synchronization ####################################
+ # new rql expr to add note entity
+ eexpr = self._erqlexpr_entity(cnx, 'add', 'Note')
+ self.assertEqual(eexpr.expression,
+ 'X ecrit_part PE, U in_group G, '
+ 'PE require_permission P, P name "add_note", P require_group G')
+ self.assertEqual([et.name for et in eexpr.reverse_add_permission], ['Note'])
+ self.assertEqual(eexpr.reverse_read_permission, ())
+ self.assertEqual(eexpr.reverse_delete_permission, ())
+ self.assertEqual(eexpr.reverse_update_permission, ())
+ self.assertTrue(self._rrqlexpr_rset(cnx, 'add', 'para'))
+ # no rqlexpr to delete para attribute
+ self.assertFalse(self._rrqlexpr_rset(cnx, 'delete', 'para'))
+ # new rql expr to add ecrit_par relation
+ rexpr = self._rrqlexpr_entity(cnx, 'add', 'ecrit_par')
+ self.assertEqual(rexpr.expression,
+ 'O require_permission P, P name "add_note", '
+ 'U in_group G, P require_group G')
+ self.assertEqual([rdef.rtype.name for rdef in rexpr.reverse_add_permission], ['ecrit_par'])
+ self.assertEqual(rexpr.reverse_read_permission, ())
+ self.assertEqual(rexpr.reverse_delete_permission, ())
+ # no more rqlexpr to delete and add travaille relation
+ self.assertFalse(self._rrqlexpr_rset(cnx, 'add', 'travaille'))
+ self.assertFalse(self._rrqlexpr_rset(cnx, 'delete', 'travaille'))
+ # no more rqlexpr to delete and update Societe entity
+ self.assertFalse(self._erqlexpr_rset(cnx, 'update', 'Societe'))
+ self.assertFalse(self._erqlexpr_rset(cnx, 'delete', 'Societe'))
+ # no more rqlexpr to read Affaire entity
+ self.assertFalse(self._erqlexpr_rset(cnx, 'read', 'Affaire'))
+ # rqlexpr to update Affaire entity has been updated
+ eexpr = self._erqlexpr_entity(cnx, 'update', 'Affaire')
+ self.assertEqual(eexpr.expression, 'X concerne S, S owned_by U')
+ # no change for rqlexpr to add and delete Affaire entity
+ self.assertEqual(len(self._erqlexpr_rset(cnx, 'delete', 'Affaire')), 1)
+ self.assertEqual(len(self._erqlexpr_rset(cnx, 'add', 'Affaire')), 1)
+ # no change for rqlexpr to add and delete concerne relation
+ self.assertEqual(len(self._rrqlexpr_rset(cnx, 'delete', 'concerne')),
+ len(delete_concerne_rqlexpr))
+ self.assertEqual(len(self._rrqlexpr_rset(cnx, 'add', 'concerne')),
+ len(add_concerne_rqlexpr))
+ # * migrschema involve:
+ # * 7 erqlexprs deletions (2 in (Affaire + Societe + Note.para) + 1 Note.something
+ # * 2 rrqlexprs deletions (travaille)
+ # * 1 update (Affaire update)
+ # * 2 new (Note add, ecrit_par add)
+ # * 2 implicit new for attributes (Note.para, Person.test)
+ # remaining orphan rql expr which should be deleted at commit (composite relation)
+ # unattached expressions -> pending deletion on commit
+ self.assertEqual(cnx.execute('Any COUNT(X) WHERE X is RQLExpression, X exprtype "ERQLExpression",'
+ 'NOT ET1 read_permission X, NOT ET2 add_permission X, '
+ 'NOT ET3 delete_permission X, NOT ET4 update_permission X')[0][0],
+ 7)
+ self.assertEqual(cnx.execute('Any COUNT(X) WHERE X is RQLExpression, X exprtype "RRQLExpression",'
+ 'NOT ET1 read_permission X, NOT ET2 add_permission X, '
+ 'NOT ET3 delete_permission X, NOT ET4 update_permission X')[0][0],
+ 2)
+ # finally
+ self.assertEqual(cnx.execute('Any COUNT(X) WHERE X is RQLExpression')[0][0],
+ nbrqlexpr_start + 1 + 2 + 2 + 2)
+ cnx.commit()
+ # unique_together test
+ self.assertEqual(len(self.schema.eschema('Personne')._unique_together), 1)
+ self.assertCountEqual(self.schema.eschema('Personne')._unique_together[0],
+ ('nom', 'prenom', 'datenaiss'))
+ rset = cnx.execute('Any C WHERE C is CWUniqueTogetherConstraint, C constraint_of ET, ET name "Personne"')
+ self.assertEqual(len(rset), 1)
+ relations = [r.name for r in rset.get_entity(0, 0).relations]
+ self.assertCountEqual(relations, ('nom', 'prenom', 'datenaiss'))
- def _erqlexpr_rset(self, action, ertype):
+ def _erqlexpr_rset(self, cnx, action, ertype):
rql = 'RQLExpression X WHERE ET is CWEType, ET %s_permission X, ET name %%(name)s' % action
- return self.mh.session.execute(rql, {'name': ertype})
- def _erqlexpr_entity(self, action, ertype):
- rset = self._erqlexpr_rset(action, ertype)
+ return cnx.execute(rql, {'name': ertype})
+
+ def _erqlexpr_entity(self, cnx, action, ertype):
+ rset = self._erqlexpr_rset(cnx, action, ertype)
self.assertEqual(len(rset), 1)
return rset.get_entity(0, 0)
- def _rrqlexpr_rset(self, action, ertype):
+
+ def _rrqlexpr_rset(self, cnx, action, ertype):
rql = 'RQLExpression X WHERE RT is CWRType, RDEF %s_permission X, RT name %%(name)s, RDEF relation_type RT' % action
- return self.mh.session.execute(rql, {'name': ertype})
- def _rrqlexpr_entity(self, action, ertype):
- rset = self._rrqlexpr_rset(action, ertype)
+ return cnx.execute(rql, {'name': ertype})
+
+ def _rrqlexpr_entity(self, cnx, action, ertype):
+ rset = self._rrqlexpr_rset(cnx, action, ertype)
self.assertEqual(len(rset), 1)
return rset.get_entity(0, 0)
def test_set_size_constraint(self):
- # existing previous value
- try:
- self.mh.cmd_set_size_constraint('CWEType', 'name', 128)
- finally:
- self.mh.cmd_set_size_constraint('CWEType', 'name', 64)
- # non existing previous value
- try:
- self.mh.cmd_set_size_constraint('CWEType', 'description', 256)
- finally:
- self.mh.cmd_set_size_constraint('CWEType', 'description', None)
+ with self.mh() as (cnx, mh):
+ # existing previous value
+ try:
+ mh.cmd_set_size_constraint('CWEType', 'name', 128)
+ finally:
+ mh.cmd_set_size_constraint('CWEType', 'name', 64)
+ # non existing previous value
+ try:
+ mh.cmd_set_size_constraint('CWEType', 'description', 256)
+ finally:
+ mh.cmd_set_size_constraint('CWEType', 'description', None)
@tag('longrun')
def test_add_remove_cube_and_deps(self):
- cubes = set(self.config.cubes())
- schema = self.repo.schema
- self.assertEqual(sorted((str(s), str(o)) for s, o in schema['see_also'].rdefs.iterkeys()),
- sorted([('EmailThread', 'EmailThread'), ('Folder', 'Folder'),
- ('Bookmark', 'Bookmark'), ('Bookmark', 'Note'),
- ('Note', 'Note'), ('Note', 'Bookmark')]))
- try:
+ with self.mh() as (cnx, mh):
+ schema = self.repo.schema
+ self.assertEqual(sorted((str(s), str(o)) for s, o in schema['see_also'].rdefs.iterkeys()),
+ sorted([('EmailThread', 'EmailThread'), ('Folder', 'Folder'),
+ ('Bookmark', 'Bookmark'), ('Bookmark', 'Note'),
+ ('Note', 'Note'), ('Note', 'Bookmark')]))
try:
- self.mh.cmd_remove_cube('email', removedeps=True)
+ mh.cmd_remove_cube('email', removedeps=True)
# file was there because it's an email dependancy, should have been removed
self.assertNotIn('email', self.config.cubes())
self.assertNotIn(self.config.cube_dir('email'), self.config.cubes_path())
@@ -538,121 +556,116 @@
('Note', 'Bookmark')]))
self.assertEqual(sorted(schema['see_also'].subjects()), ['Bookmark', 'Folder', 'Note'])
self.assertEqual(sorted(schema['see_also'].objects()), ['Bookmark', 'Folder', 'Note'])
- self.assertEqual(self.session.execute('Any X WHERE X pkey "system.version.email"').rowcount, 0)
- self.assertEqual(self.session.execute('Any X WHERE X pkey "system.version.file"').rowcount, 0)
- except :
- import traceback
- traceback.print_exc()
- raise
- finally:
- self.mh.cmd_add_cube('email')
- self.assertIn('email', self.config.cubes())
- self.assertIn(self.config.cube_dir('email'), self.config.cubes_path())
- self.assertIn('file', self.config.cubes())
- self.assertIn(self.config.cube_dir('file'), self.config.cubes_path())
- for ertype in ('Email', 'EmailThread', 'EmailPart', 'File',
- 'sender', 'in_thread', 'reply_to', 'data_format'):
- self.assertTrue(ertype in schema, ertype)
- self.assertEqual(sorted(schema['see_also'].rdefs.iterkeys()),
- sorted([('EmailThread', 'EmailThread'), ('Folder', 'Folder'),
- ('Bookmark', 'Bookmark'),
- ('Bookmark', 'Note'),
- ('Note', 'Note'),
- ('Note', 'Bookmark')]))
- self.assertEqual(sorted(schema['see_also'].subjects()), ['Bookmark', 'EmailThread', 'Folder', 'Note'])
- self.assertEqual(sorted(schema['see_also'].objects()), ['Bookmark', 'EmailThread', 'Folder', 'Note'])
- from cubes.email.__pkginfo__ import version as email_version
- from cubes.file.__pkginfo__ import version as file_version
- self.assertEqual(self.session.execute('Any V WHERE X value V, X pkey "system.version.email"')[0][0],
- email_version)
- self.assertEqual(self.session.execute('Any V WHERE X value V, X pkey "system.version.file"')[0][0],
- file_version)
- # trick: overwrite self.maxeid to avoid deletion of just reintroduced
- # types (and their associated tables!)
- self.maxeid = self.session.execute('Any MAX(X)')[0][0]
- # why this commit is necessary is unclear to me (though without it
- # next test may fail complaining of missing tables
- self.session.commit(free_cnxset=False)
+ self.assertEqual(cnx.execute('Any X WHERE X pkey "system.version.email"').rowcount, 0)
+ self.assertEqual(cnx.execute('Any X WHERE X pkey "system.version.file"').rowcount, 0)
+ finally:
+ mh.cmd_add_cube('email')
+ self.assertIn('email', self.config.cubes())
+ self.assertIn(self.config.cube_dir('email'), self.config.cubes_path())
+ self.assertIn('file', self.config.cubes())
+ self.assertIn(self.config.cube_dir('file'), self.config.cubes_path())
+ for ertype in ('Email', 'EmailThread', 'EmailPart', 'File',
+ 'sender', 'in_thread', 'reply_to', 'data_format'):
+ self.assertTrue(ertype in schema, ertype)
+ self.assertEqual(sorted(schema['see_also'].rdefs.iterkeys()),
+ sorted([('EmailThread', 'EmailThread'), ('Folder', 'Folder'),
+ ('Bookmark', 'Bookmark'),
+ ('Bookmark', 'Note'),
+ ('Note', 'Note'),
+ ('Note', 'Bookmark')]))
+ self.assertEqual(sorted(schema['see_also'].subjects()), ['Bookmark', 'EmailThread', 'Folder', 'Note'])
+ self.assertEqual(sorted(schema['see_also'].objects()), ['Bookmark', 'EmailThread', 'Folder', 'Note'])
+ from cubes.email.__pkginfo__ import version as email_version
+ from cubes.file.__pkginfo__ import version as file_version
+ self.assertEqual(cnx.execute('Any V WHERE X value V, X pkey "system.version.email"')[0][0],
+ email_version)
+ self.assertEqual(cnx.execute('Any V WHERE X value V, X pkey "system.version.file"')[0][0],
+ file_version)
+ # trick: overwrite self.maxeid to avoid deletion of just reintroduced
+ # types (and their associated tables!)
+ self.maxeid = cnx.execute('Any MAX(X)')[0][0]
+ # why this commit is necessary is unclear to me (though without it
+ # next test may fail complaining of missing tables
+ cnx.commit()
@tag('longrun')
def test_add_remove_cube_no_deps(self):
- cubes = set(self.config.cubes())
- schema = self.repo.schema
- try:
+ with self.mh() as (cnx, mh):
+ cubes = set(self.config.cubes())
+ schema = self.repo.schema
try:
- self.mh.cmd_remove_cube('email')
+ mh.cmd_remove_cube('email')
cubes.remove('email')
self.assertNotIn('email', self.config.cubes())
self.assertIn('file', self.config.cubes())
for ertype in ('Email', 'EmailThread', 'EmailPart',
'sender', 'in_thread', 'reply_to'):
self.assertFalse(ertype in schema, ertype)
- except :
- import traceback
- traceback.print_exc()
- raise
- finally:
- self.mh.cmd_add_cube('email')
- self.assertIn('email', self.config.cubes())
- # trick: overwrite self.maxeid to avoid deletion of just reintroduced
- # types (and their associated tables!)
- self.maxeid = self.session.execute('Any MAX(X)')[0][0]
- # why this commit is necessary is unclear to me (though without it
- # next test may fail complaining of missing tables
- self.session.commit(free_cnxset=False)
+ finally:
+ mh.cmd_add_cube('email')
+ self.assertIn('email', self.config.cubes())
+ # trick: overwrite self.maxeid to avoid deletion of just reintroduced
+ # types (and their associated tables!)
+ self.maxeid = cnx.execute('Any MAX(X)')[0][0] # XXXXXXX KILL KENNY
+ # why this commit is necessary is unclear to me (though without it
+ # next test may fail complaining of missing tables
+ cnx.commit()
def test_remove_dep_cube(self):
- with self.assertRaises(ConfigurationError) as cm:
- self.mh.cmd_remove_cube('file')
- self.assertEqual(str(cm.exception), "can't remove cube file, used as a dependency")
+ with self.mh() as (cnx, mh):
+ with self.assertRaises(ConfigurationError) as cm:
+ mh.cmd_remove_cube('file')
+ self.assertEqual(str(cm.exception), "can't remove cube file, used as a dependency")
@tag('longrun')
def test_introduce_base_class(self):
- self.mh.cmd_add_entity_type('Para')
- self.assertEqual(sorted(et.type for et in self.schema['Para'].specialized_by()),
- ['Note'])
- self.assertEqual(self.schema['Note'].specializes().type, 'Para')
- self.mh.cmd_add_entity_type('Text')
- self.assertEqual(sorted(et.type for et in self.schema['Para'].specialized_by()),
- ['Note', 'Text'])
- self.assertEqual(self.schema['Text'].specializes().type, 'Para')
- # test columns have been actually added
- text = self.session.execute('INSERT Text X: X para "hip", X summary "hop", X newattr "momo"').get_entity(0, 0)
- note = self.session.execute('INSERT Note X: X para "hip", X shortpara "hop", X newattr "momo", X unique_id "x"').get_entity(0, 0)
- aff = self.session.execute('INSERT Affaire X').get_entity(0, 0)
- self.assertTrue(self.session.execute('SET X newnotinlined Y WHERE X eid %(x)s, Y eid %(y)s',
- {'x': text.eid, 'y': aff.eid}))
- self.assertTrue(self.session.execute('SET X newnotinlined Y WHERE X eid %(x)s, Y eid %(y)s',
- {'x': note.eid, 'y': aff.eid}))
- self.assertTrue(self.session.execute('SET X newinlined Y WHERE X eid %(x)s, Y eid %(y)s',
- {'x': text.eid, 'y': aff.eid}))
- self.assertTrue(self.session.execute('SET X newinlined Y WHERE X eid %(x)s, Y eid %(y)s',
- {'x': note.eid, 'y': aff.eid}))
- # XXX remove specializes by ourselves, else tearDown fails when removing
- # Para because of Note inheritance. This could be fixed by putting the
- # MemSchemaCWETypeDel(session, name) operation in the
- # after_delete_entity(CWEType) hook, since in that case the MemSchemaSpecializesDel
- # operation would be removed before, but I'm not sure this is a desired behaviour.
- #
- # also we need more tests about introducing/removing base classes or
- # specialization relationship...
- self.session.execute('DELETE X specializes Y WHERE Y name "Para"')
- self.session.commit(free_cnxset=False)
- self.assertEqual(sorted(et.type for et in self.schema['Para'].specialized_by()),
- [])
- self.assertEqual(self.schema['Note'].specializes(), None)
- self.assertEqual(self.schema['Text'].specializes(), None)
+ with self.mh() as (cnx, mh):
+ mh.cmd_add_entity_type('Para')
+ self.assertEqual(sorted(et.type for et in self.schema['Para'].specialized_by()),
+ ['Note'])
+ self.assertEqual(self.schema['Note'].specializes().type, 'Para')
+ mh.cmd_add_entity_type('Text')
+ self.assertEqual(sorted(et.type for et in self.schema['Para'].specialized_by()),
+ ['Note', 'Text'])
+ self.assertEqual(self.schema['Text'].specializes().type, 'Para')
+ # test columns have been actually added
+ text = cnx.execute('INSERT Text X: X para "hip", X summary "hop", X newattr "momo"').get_entity(0, 0)
+ note = cnx.execute('INSERT Note X: X para "hip", X shortpara "hop", X newattr "momo", X unique_id "x"').get_entity(0, 0)
+ aff = cnx.execute('INSERT Affaire X').get_entity(0, 0)
+ self.assertTrue(cnx.execute('SET X newnotinlined Y WHERE X eid %(x)s, Y eid %(y)s',
+ {'x': text.eid, 'y': aff.eid}))
+ self.assertTrue(cnx.execute('SET X newnotinlined Y WHERE X eid %(x)s, Y eid %(y)s',
+ {'x': note.eid, 'y': aff.eid}))
+ self.assertTrue(cnx.execute('SET X newinlined Y WHERE X eid %(x)s, Y eid %(y)s',
+ {'x': text.eid, 'y': aff.eid}))
+ self.assertTrue(cnx.execute('SET X newinlined Y WHERE X eid %(x)s, Y eid %(y)s',
+ {'x': note.eid, 'y': aff.eid}))
+ # XXX remove specializes by ourselves, else tearDown fails when removing
+ # Para because of Note inheritance. This could be fixed by putting the
+ # MemSchemaCWETypeDel(session, name) operation in the
+ # after_delete_entity(CWEType) hook, since in that case the MemSchemaSpecializesDel
+ # operation would be removed before, but I'm not sure this is a desired behaviour.
+ #
+ # also we need more tests about introducing/removing base classes or
+ # specialization relationship...
+ cnx.execute('DELETE X specializes Y WHERE Y name "Para"')
+ cnx.commit()
+ self.assertEqual(sorted(et.type for et in self.schema['Para'].specialized_by()),
+ [])
+ self.assertEqual(self.schema['Note'].specializes(), None)
+ self.assertEqual(self.schema['Text'].specializes(), None)
def test_add_symmetric_relation_type(self):
- same_as_sql = self.mh.sqlexec("SELECT sql FROM sqlite_master WHERE type='table' "
- "and name='same_as_relation'")
- self.assertFalse(same_as_sql)
- self.mh.cmd_add_relation_type('same_as')
- same_as_sql = self.mh.sqlexec("SELECT sql FROM sqlite_master WHERE type='table' "
- "and name='same_as_relation'")
- self.assertTrue(same_as_sql)
+ with self.mh() as (cnx, mh):
+ same_as_sql = mh.sqlexec("SELECT sql FROM sqlite_master WHERE type='table' "
+ "and name='same_as_relation'")
+ self.assertFalse(same_as_sql)
+ mh.cmd_add_relation_type('same_as')
+ same_as_sql = mh.sqlexec("SELECT sql FROM sqlite_master WHERE type='table' "
+ "and name='same_as_relation'")
+ self.assertTrue(same_as_sql)
if __name__ == '__main__':
unittest_main()