--- a/cubicweb/server/test/unittest_migractions.py Fri Nov 03 15:26:04 2017 +0100
+++ b/cubicweb/server/test/unittest_migractions.py Fri Nov 03 15:26:32 2017 +0100
@@ -22,7 +22,6 @@
import sys
from datetime import date
from contextlib import contextmanager
-import tempfile
from logilab.common import tempattr
@@ -111,15 +110,18 @@
interactive=False)
def table_sql(self, mh, tablename):
- result = mh.sqlexec("SELECT table_name FROM information_schema.tables WHERE LOWER(table_name)=%(table)s",
- {'table': tablename.lower()})
+ result = mh.sqlexec(
+ "SELECT table_name FROM information_schema.tables WHERE LOWER(table_name)=%(table)s",
+ {'table': tablename.lower()})
if result:
return result[0][0]
- return None # no such table
+ return None # no such table
def table_schema(self, mh, tablename):
- result = mh.sqlexec("SELECT column_name, data_type, character_maximum_length FROM information_schema.columns "
- "WHERE LOWER(table_name) = %(table)s", {'table': tablename.lower()})
+ result = mh.sqlexec(
+ "SELECT column_name, data_type, character_maximum_length "
+ "FROM information_schema.columns "
+ "WHERE LOWER(table_name) = %(table)s", {'table': tablename.lower()})
assert result, 'no table %s' % tablename
return dict((x[0], (x[1], x[2])) for x in result)
@@ -178,16 +180,9 @@
whateverorder = migrschema['whatever'].rdef('Note', 'Int').order
for k, v in orderdict.items():
if v >= whateverorder:
- orderdict[k] = v+1
+ orderdict[k] = v + 1
orderdict['whatever'] = whateverorder
self.assertDictEqual(orderdict, orderdict2)
- #self.assertEqual([r.type for r in self.schema['Note'].ordered_relations()],
- # ['modification_date', 'creation_date', 'owned_by',
- # 'eid', 'ecrit_par', 'inline1', 'date', 'type',
- # 'whatever', 'date', 'in_basket'])
- # NB: commit instead of rollback make following test fail with py2.5
- # this sounds like a pysqlite/2.5 bug (the same eid is affected to
- # two different entities)
def test_add_attribute_varchar(self):
with self.mh() as (cnx, mh):
@@ -233,7 +228,7 @@
self.assertEqual(self.schema['mydate'].objects(), ('Date', ))
testdate = date(2005, 12, 13)
eid1 = mh.rqlexec('INSERT Note N')[0][0]
- eid2 = mh.rqlexec('INSERT Note N: N mydate %(mydate)s', {'mydate' : testdate})[0][0]
+ eid2 = mh.rqlexec('INSERT Note N: N mydate %(mydate)s', {'mydate': testdate})[0][0]
d1 = mh.rqlexec('Any D WHERE X eid %(x)s, X mydate D', {'x': eid1})[0][0]
d2 = mh.rqlexec('Any D WHERE X eid %(x)s, X mydate D', {'x': eid2})[0][0]
d3 = mh.rqlexec('Any D WHERE X eid %(x)s, X oldstyledefaultdate D', {'x': eid1})[0][0]
@@ -285,8 +280,8 @@
def test_workflow_actions(self):
with self.mh() as (cnx, mh):
- wf = mh.cmd_add_workflow(u'foo', ('Personne', 'Email'),
- ensure_workflowable=False)
+ mh.cmd_add_workflow(u'foo', ('Personne', 'Email'),
+ ensure_workflowable=False)
for etype in ('Personne', 'Email'):
s1 = mh.rqlexec('Any N WHERE WF workflow_of ET, ET name "%s", WF name N' %
etype)[0][0]
@@ -306,18 +301,19 @@
self.assertIn('filed_under2', self.schema)
self.assertTrue(cnx.execute('CWRType X WHERE X name "filed_under2"'))
self.assertEqual(sorted(str(rs) for rs in self.schema['Folder2'].subject_relations()),
- ['created_by', 'creation_date', 'cw_source', 'cwuri',
- 'description', 'description_format',
- 'eid',
- 'filed_under2', 'has_text',
- 'identity', 'in_basket', 'inlined_rel', 'is', 'is_instance_of',
- 'modification_date', 'name', 'owned_by'])
+ ['created_by', 'creation_date', 'cw_source', 'cwuri',
+ 'description', 'description_format',
+ 'eid',
+ 'filed_under2', 'has_text',
+ 'identity', 'in_basket', 'inlined_rel', 'is', 'is_instance_of',
+ 'modification_date', 'name', 'owned_by'])
self.assertCountEqual([str(rs) for rs in self.schema['Folder2'].object_relations()],
['filed_under2', 'identity', 'inlined_rel'])
# Old will be missing as it has been renamed into 'New' in the migrated
# schema while New hasn't been added here.
self.assertEqual(sorted(str(e) for e in self.schema['filed_under2'].subjects()),
- sorted(str(e) for e in self.schema.entities() if not e.final and e != 'Old'))
+ sorted(str(e) for e in self.schema.entities()
+ if not e.final and e != 'Old'))
self.assertEqual(self.schema['filed_under2'].objects(), ('Folder2',))
eschema = self.schema.eschema('Folder2')
for cstr in eschema.rdef('name').constraints:
@@ -339,7 +335,7 @@
self.assertEqual(rdef.scale, 10)
self.assertEqual(rdef.precision, 18)
fields = self.table_schema(mh, '%sLocation' % SQL_PREFIX)
- self.assertEqual(fields['%snum' % SQL_PREFIX], ('numeric', None)) # XXX
+ self.assertEqual(fields['%snum' % SQL_PREFIX], ('numeric', None)) # XXX
finally:
mh.cmd_drop_cube('fakecustomtype')
mh.drop_entity_type('Numeric')
@@ -354,7 +350,6 @@
wf.add_transition(u'redoit', done, todo)
wf.add_transition(u'markasdone', todo, done)
cnx.commit()
- eschema = self.schema.eschema('Folder2')
mh.cmd_drop_entity_type('Folder2')
self.assertNotIn('Folder2', self.schema)
self.assertFalse(cnx.execute('CWEType X WHERE X name "Folder2"'))
@@ -404,8 +399,9 @@
('Personne',))
self.assertEqual(self.schema['concerne2'].objects(),
('Affaire', ))
- self.assertEqual(self.schema['concerne2'].rdef('Personne', 'Affaire').cardinality,
- '1*')
+ self.assertEqual(
+ self.schema['concerne2'].rdef('Personne', 'Affaire').cardinality,
+ '1*')
mh.cmd_add_relation_definition('Personne', 'concerne2', 'Note')
self.assertEqual(sorted(self.schema['concerne2'].objects()), ['Affaire', 'Note'])
mh.create_entity('Personne', nom=u'tot')
@@ -420,38 +416,38 @@
def test_drop_relation_definition_existant_rtype(self):
with self.mh() as (cnx, mh):
self.assertEqual(sorted(str(e) for e in self.schema['concerne'].subjects()),
- ['Affaire', 'Personne'])
+ ['Affaire', 'Personne'])
self.assertEqual(sorted(str(e) for e in self.schema['concerne'].objects()),
- ['Affaire', 'Division', 'Note', 'Societe', 'SubDivision'])
+ ['Affaire', 'Division', 'Note', 'Societe', 'SubDivision'])
mh.cmd_drop_relation_definition('Personne', 'concerne', 'Affaire')
self.assertEqual(sorted(str(e) for e in self.schema['concerne'].subjects()),
- ['Affaire'])
+ ['Affaire'])
self.assertEqual(sorted(str(e) for e in self.schema['concerne'].objects()),
- ['Division', 'Note', 'Societe', 'SubDivision'])
+ ['Division', 'Note', 'Societe', 'SubDivision'])
mh.cmd_add_relation_definition('Personne', 'concerne', 'Affaire')
self.assertEqual(sorted(str(e) for e in self.schema['concerne'].subjects()),
- ['Affaire', 'Personne'])
+ ['Affaire', 'Personne'])
self.assertEqual(sorted(str(e) for e in self.schema['concerne'].objects()),
- ['Affaire', 'Division', 'Note', 'Societe', 'SubDivision'])
+ ['Affaire', 'Division', 'Note', 'Societe', 'SubDivision'])
# trick: overwrite self.maxeid to avoid deletion of just reintroduced types
self.maxeid = cnx.execute('Any MAX(X)')[0][0]
def test_drop_relation_definition_with_specialization(self):
with self.mh() as (cnx, mh):
self.assertEqual(sorted(str(e) for e in self.schema['concerne'].subjects()),
- ['Affaire', 'Personne'])
+ ['Affaire', 'Personne'])
self.assertEqual(sorted(str(e) for e in self.schema['concerne'].objects()),
- ['Affaire', 'Division', 'Note', 'Societe', 'SubDivision'])
+ ['Affaire', 'Division', 'Note', 'Societe', 'SubDivision'])
mh.cmd_drop_relation_definition('Affaire', 'concerne', 'Societe')
self.assertEqual(sorted(str(e) for e in self.schema['concerne'].subjects()),
- ['Affaire', 'Personne'])
+ ['Affaire', 'Personne'])
self.assertEqual(sorted(str(e) for e in self.schema['concerne'].objects()),
- ['Affaire', 'Note'])
+ ['Affaire', 'Note'])
mh.cmd_add_relation_definition('Affaire', 'concerne', 'Societe')
self.assertEqual(sorted(str(e) for e in self.schema['concerne'].subjects()),
- ['Affaire', 'Personne'])
+ ['Affaire', 'Personne'])
self.assertEqual(sorted(str(e) for e in self.schema['concerne'].objects()),
- ['Affaire', 'Division', 'Note', 'Societe', 'SubDivision'])
+ ['Affaire', 'Division', 'Note', 'Societe', 'SubDivision'])
# trick: overwrite self.maxeid to avoid deletion of just reintroduced types
self.maxeid = cnx.execute('Any MAX(X)')[0][0]
@@ -546,8 +542,8 @@
# new rql expr to add note entity
eexpr = self._erqlexpr_entity(cnx, 'add', 'Note')
self.assertEqual(eexpr.expression,
- 'X ecrit_part PE, U in_group G, '
- 'PE require_permission P, P name "add_note", P require_group G')
+ 'X ecrit_part PE, U in_group G, '
+ 'PE require_permission P, P name "add_note", P require_group G')
self.assertEqual([et.name for et in eexpr.reverse_add_permission], ['Note'])
self.assertEqual(eexpr.reverse_read_permission, ())
self.assertEqual(eexpr.reverse_delete_permission, ())
@@ -558,9 +554,10 @@
# new rql expr to add ecrit_par relation
rexpr = self._rrqlexpr_entity(cnx, 'add', 'ecrit_par')
self.assertEqual(rexpr.expression,
- 'O require_permission P, P name "add_note", '
- 'U in_group G, P require_group G')
- self.assertEqual([rdef.rtype.name for rdef in rexpr.reverse_add_permission], ['ecrit_par'])
+ 'O require_permission P, P name "add_note", '
+ 'U in_group G, P require_group G')
+ self.assertEqual([rdef.rtype.name for rdef in rexpr.reverse_add_permission],
+ ['ecrit_par'])
self.assertEqual(rexpr.reverse_read_permission, ())
self.assertEqual(rexpr.reverse_delete_permission, ())
# no more rqlexpr to delete and add travaille relation
@@ -590,14 +587,16 @@
# * 2 implicit new for attributes (Note.para, Person.test)
# remaining orphan rql expr which should be deleted at commit (composite relation)
# unattached expressions -> pending deletion on commit
- self.assertEqual(cnx.execute('Any COUNT(X) WHERE X is RQLExpression, X exprtype "ERQLExpression",'
- 'NOT ET1 read_permission X, NOT ET2 add_permission X, '
- 'NOT ET3 delete_permission X, NOT ET4 update_permission X')[0][0],
- 7)
- self.assertEqual(cnx.execute('Any COUNT(X) WHERE X is RQLExpression, X exprtype "RRQLExpression",'
- 'NOT ET1 read_permission X, NOT ET2 add_permission X, '
- 'NOT ET3 delete_permission X, NOT ET4 update_permission X')[0][0],
- 2)
+ self.assertEqual(
+ cnx.execute('Any COUNT(X) WHERE X is RQLExpression, X exprtype "ERQLExpression",'
+ 'NOT ET1 read_permission X, NOT ET2 add_permission X, '
+ 'NOT ET3 delete_permission X, NOT ET4 update_permission X')[0][0],
+ 7)
+ self.assertEqual(
+ cnx.execute('Any COUNT(X) WHERE X is RQLExpression, X exprtype "RRQLExpression",'
+ 'NOT ET1 read_permission X, NOT ET2 add_permission X, '
+ 'NOT ET3 delete_permission X, NOT ET4 update_permission X')[0][0],
+ 2)
# finally
self.assertEqual(cnx.execute('Any COUNT(X) WHERE X is RQLExpression')[0][0],
nbrqlexpr_start + 1 + 2 + 2 + 2)
@@ -605,8 +604,9 @@
# unique_together test
self.assertEqual(len(self.schema.eschema('Personne')._unique_together), 1)
self.assertCountEqual(self.schema.eschema('Personne')._unique_together[0],
- ('nom', 'prenom', 'datenaiss'))
- rset = cnx.execute('Any C WHERE C is CWUniqueTogetherConstraint, C constraint_of ET, ET name "Personne"')
+ ('nom', 'prenom', 'datenaiss'))
+ rset = cnx.execute('Any C WHERE C is CWUniqueTogetherConstraint, '
+ 'C constraint_of ET, ET name "Personne"')
self.assertEqual(len(rset), 1)
relations = [r.name for r in rset.get_entity(0, 0).relations]
self.assertCountEqual(relations, ('nom', 'prenom', 'datenaiss'))
@@ -628,8 +628,9 @@
return rset.get_entity(0, 0)
def _rrqlexpr_rset(self, cnx, action, ertype):
- rql = 'RQLExpression X WHERE RT is CWRType, RDEF %s_permission X, RT name %%(name)s, RDEF relation_type RT' % action
- return cnx.execute(rql, {'name': ertype})
+ return cnx.execute('RQLExpression X WHERE RT is CWRType, RDEF %s_permission X, '
+ 'RT name %%(name)s, RDEF relation_type RT' % action,
+ {'name': ertype})
def _rrqlexpr_entity(self, cnx, action, ertype):
rset = self._rrqlexpr_rset(cnx, action, ertype)
@@ -667,15 +668,21 @@
'sender', 'in_thread', 'reply_to', 'data_format'):
self.assertNotIn(ertype, schema)
self.assertEqual(sorted(schema['see_also'].rdefs),
- sorted([('Folder', 'Folder'),
- ('Bookmark', 'Bookmark'),
- ('Bookmark', 'Note'),
- ('Note', 'Note'),
- ('Note', 'Bookmark')]))
- self.assertEqual(sorted(schema['see_also'].subjects()), ['Bookmark', 'Folder', 'Note'])
- self.assertEqual(sorted(schema['see_also'].objects()), ['Bookmark', 'Folder', 'Note'])
- self.assertEqual(cnx.execute('Any X WHERE X pkey "system.version.fakeemail"').rowcount, 0)
- self.assertEqual(cnx.execute('Any X WHERE X pkey "system.version.file"').rowcount, 0)
+ sorted([('Folder', 'Folder'),
+ ('Bookmark', 'Bookmark'),
+ ('Bookmark', 'Note'),
+ ('Note', 'Note'),
+ ('Note', 'Bookmark')]))
+ self.assertEqual(sorted(schema['see_also'].subjects()),
+ ['Bookmark', 'Folder', 'Note'])
+ self.assertEqual(sorted(schema['see_also'].objects()),
+ ['Bookmark', 'Folder', 'Note'])
+ self.assertEqual(
+ cnx.execute('Any X WHERE X pkey "system.version.fakeemail"').rowcount,
+ 0)
+ self.assertEqual(
+ cnx.execute('Any X WHERE X pkey "system.version.file"').rowcount,
+ 0)
finally:
mh.cmd_add_cube('fakeemail')
self.assertIn('fakeemail', self.config.cubes())
@@ -691,17 +698,18 @@
('Bookmark', 'Note'),
('Note', 'Note'),
('Note', 'Bookmark')]))
- self.assertEqual(sorted(schema['see_also'].subjects()), ['Bookmark', 'EmailThread', 'Folder', 'Note'])
- self.assertEqual(sorted(schema['see_also'].objects()), ['Bookmark', 'EmailThread', 'Folder', 'Note'])
+ self.assertEqual(sorted(schema['see_also'].subjects()),
+ ['Bookmark', 'EmailThread', 'Folder', 'Note'])
+ self.assertEqual(sorted(schema['see_also'].objects()),
+ ['Bookmark', 'EmailThread', 'Folder', 'Note'])
from cubes.fakeemail.__pkginfo__ import version as email_version
from cubes.file.__pkginfo__ import version as file_version
- self.assertEqual(cnx.execute('Any V WHERE X value V, X pkey "system.version.fakeemail"')[0][0],
- email_version)
- self.assertEqual(cnx.execute('Any V WHERE X value V, X pkey "system.version.file"')[0][0],
- file_version)
- # trick: overwrite self.maxeid to avoid deletion of just reintroduced
- # types (and their associated tables!)
- self.maxeid = cnx.execute('Any MAX(X)')[0][0]
+ self.assertEqual(
+ cnx.execute('Any V WHERE X value V, X pkey "system.version.fakeemail"')[0][0],
+ email_version)
+ self.assertEqual(
+ cnx.execute('Any V WHERE X value V, X pkey "system.version.file"')[0][0],
+ file_version)
# why this commit is necessary is unclear to me (though without it
# next test may fail complaining of missing tables
cnx.commit()
@@ -739,20 +747,21 @@
self.assertEqual(self.schema['Note'].specializes().type, 'Para')
mh.cmd_add_entity_type('Text')
self.assertEqual(sorted(et.type for et in self.schema['Para'].specialized_by()),
- ['Note', 'Text'])
+ ['Note', 'Text'])
self.assertEqual(self.schema['Text'].specializes().type, 'Para')
# test columns have been actually added
- text = cnx.execute('INSERT Text X: X para "hip", X summary "hop", X newattr "momo"').get_entity(0, 0)
- note = cnx.execute('INSERT Note X: X para "hip", X shortpara "hop", X newattr "momo", X unique_id "x"').get_entity(0, 0)
- aff = cnx.execute('INSERT Affaire X').get_entity(0, 0)
- self.assertTrue(cnx.execute('SET X newnotinlined Y WHERE X eid %(x)s, Y eid %(y)s',
- {'x': text.eid, 'y': aff.eid}))
+ text = cnx.create_entity('Text', para=u"hip", summary=u"hop", newattr=u"momo")
+ note = cnx.create_entity('Note', para=u"hip", shortpara=u"hop",
+ newattr=u"momo", unique_id=u"x")
+ aff = cnx.create_entity('Affaire')
self.assertTrue(cnx.execute('SET X newnotinlined Y WHERE X eid %(x)s, Y eid %(y)s',
- {'x': note.eid, 'y': aff.eid}))
+ {'x': text.eid, 'y': aff.eid}))
+ self.assertTrue(cnx.execute('SET X newnotinlined Y WHERE X eid %(x)s, Y eid %(y)s',
+ {'x': note.eid, 'y': aff.eid}))
self.assertTrue(cnx.execute('SET X newinlined Y WHERE X eid %(x)s, Y eid %(y)s',
- {'x': text.eid, 'y': aff.eid}))
+ {'x': text.eid, 'y': aff.eid}))
self.assertTrue(cnx.execute('SET X newinlined Y WHERE X eid %(x)s, Y eid %(y)s',
- {'x': note.eid, 'y': aff.eid}))
+ {'x': note.eid, 'y': aff.eid}))
# XXX remove specializes by ourselves, else tearDown fails when removing
# Para because of Note inheritance. This could be fixed by putting the
# MemSchemaCWETypeDel(session, name) operation in the
@@ -789,7 +798,7 @@
def test_drop_required_inlined_relation(self):
with self.mh() as (cnx, mh):
bob = mh.cmd_create_entity('Personne', nom=u'bob')
- note = mh.cmd_create_entity('Note', ecrit_par=bob)
+ mh.cmd_create_entity('Note', ecrit_par=bob)
mh.commit()
rdef = mh.fs_schema.rschema('ecrit_par').rdefs[('Note', 'Personne')]
with tempattr(rdef, 'cardinality', '1*'):
@@ -800,7 +809,7 @@
def test_drop_inlined_rdef_delete_data(self):
with self.mh() as (cnx, mh):
- note = mh.cmd_create_entity('Note', ecrit_par=cnx.user.eid)
+ mh.cmd_create_entity('Note', ecrit_par=cnx.user.eid)
mh.commit()
mh.drop_relation_definition('Note', 'ecrit_par', 'CWUser')
self.assertFalse(mh.sqlexec('SELECT * FROM cw_Note WHERE cw_ecrit_par IS NOT NULL'))