--- a/server/test/data/bootstrap_cubes Wed May 13 16:28:21 2009 +0200
+++ b/server/test/data/bootstrap_cubes Wed May 13 16:59:50 2009 +0200
@@ -1,1 +1,1 @@
-comment,folder,tag,basket,email,file
+card,comment,folder,tag,basket,email,file
--- a/server/test/unittest_hookhelper.py Wed May 13 16:28:21 2009 +0200
+++ b/server/test/unittest_hookhelper.py Wed May 13 16:59:50 2009 +0200
@@ -4,23 +4,23 @@
from logilab.common.testlib import unittest_main
from cubicweb.devtools.apptest import RepositoryBasedTC
-from cubicweb.server.pool import LateOperation
+from cubicweb.server.pool import LateOperation, Operation, SingleLastOperation
from cubicweb.server.hookhelper import *
class HookHelpersTC(RepositoryBasedTC):
-
+
def setUp(self):
RepositoryBasedTC.setUp(self)
self.hm = self.repo.hm
-
+
def test_late_operation(self):
session = self.session
l1 = LateOperation(session)
l2 = LateOperation(session)
l3 = Operation(session)
self.assertEquals(session.pending_operations, [l3, l1, l2])
-
+
def test_single_last_operation(self):
session = self.session
l0 = SingleLastOperation(session)
@@ -30,7 +30,7 @@
self.assertEquals(session.pending_operations, [l3, l1, l2, l0])
l4 = SingleLastOperation(session)
self.assertEquals(session.pending_operations, [l3, l1, l2, l4])
-
+
def test_global_operation_order(self):
from cubicweb.server import hooks, schemahooks
session = self.session
@@ -42,8 +42,8 @@
op4 = hooks.DelayedDeleteOp(session)
op5 = hooks.CheckORelationOp(session)
self.assertEquals(session.pending_operations, [op1, op2, op4, op5, op3])
-
-
+
+
def test_in_state_notification(self):
result = []
# test both email notification and transition_information
@@ -78,6 +78,6 @@
if isinstance(op, SendMailOp)]
self.assertEquals(len(searchedops), 0,
self.session.pending_operations)
-
+
if __name__ == '__main__':
unittest_main()
--- a/server/test/unittest_hooks.py Wed May 13 16:28:21 2009 +0200
+++ b/server/test/unittest_hooks.py Wed May 13 16:59:50 2009 +0200
@@ -20,9 +20,9 @@
Repository.get_versions = orig_get_versions
-
+
class CoreHooksTC(RepositoryBasedTC):
-
+
def test_delete_internal_entities(self):
self.assertRaises(RepositoryError, self.execute,
'DELETE CWEType X WHERE X name "CWEType"')
@@ -40,17 +40,17 @@
self.execute('DELETE X in_group Y WHERE X login "toto"')
self.execute('SET X in_group Y WHERE X login "toto", Y name "guests"')
self.commit()
-
+
def test_delete_required_relations_object(self):
self.skip('no sample in the schema ! YAGNI ? Kermaat ?')
-
+
def test_static_vocabulary_check(self):
self.assertRaises(ValidationError,
self.execute,
'SET X composite "whatever" WHERE X from_entity FE, FE name "CWUser", X relation_type RT, RT name "in_group"')
-
+
def test_missing_required_relations_subject_inline(self):
- # missing in_group relation
+ # missing in_group relation
self.execute('INSERT CWUser X: X login "toto", X upassword "hop"')
self.assertRaises(ValidationError,
self.commit)
@@ -76,7 +76,7 @@
self.execute('SET X sender Y WHERE X is Email, Y is EmailAddress')
rset = self.execute('Any S WHERE X sender S, X eid %s' % eeid)
self.assertEquals(len(rset), 1)
-
+
def test_composite_1(self):
self.execute('INSERT EmailAddress X: X address "toto@logilab.fr", X alias "hop"')
self.execute('INSERT EmailPart X: X content_format "text/plain", X ordernum 1, X content "this is a test"')
@@ -90,7 +90,7 @@
self.commit()
rset = self.execute('Any X WHERE X is EmailPart')
self.assertEquals(len(rset), 0)
-
+
def test_composite_2(self):
self.execute('INSERT EmailAddress X: X address "toto@logilab.fr", X alias "hop"')
self.execute('INSERT EmailPart X: X content_format "text/plain", X ordernum 1, X content "this is a test"')
@@ -102,7 +102,7 @@
self.commit()
rset = self.execute('Any X WHERE X is EmailPart')
self.assertEquals(len(rset), 0)
-
+
def test_composite_redirection(self):
self.execute('INSERT EmailAddress X: X address "toto@logilab.fr", X alias "hop"')
self.execute('INSERT EmailPart X: X content_format "text/plain", X ordernum 1, X content "this is a test"')
@@ -148,11 +148,11 @@
self.execute('SET A descr "R&D<p>yo" WHERE A eid %s' % entity.eid)
entity = self.execute('Any A WHERE A eid %s' % entity.eid).get_entity(0, 0)
self.assertEquals(entity.descr, u'R&D<p>yo</p>')
-
+
+
-
class UserGroupHooksTC(RepositoryBasedTC):
-
+
def test_user_synchronization(self):
self.create_user('toto', password='hop', commit=False)
self.assertRaises(AuthenticationError,
@@ -193,9 +193,9 @@
self.execute('DELETE EmailAddress X WHERE X eid %s' % eid)
self.commit()
self.failIf(self.execute('Any X WHERE X created_by Y, X eid >= %(x)s', {'x': eid}))
-
+
class CWPropertyHooksTC(RepositoryBasedTC):
-
+
def test_unexistant_eproperty(self):
ex = self.assertRaises(ValidationError,
self.execute, 'INSERT CWProperty X: X pkey "bla.bla", X value "hop", X for_user U')
@@ -203,12 +203,12 @@
ex = self.assertRaises(ValidationError,
self.execute, 'INSERT CWProperty X: X pkey "bla.bla", X value "hop"')
self.assertEquals(ex.errors, {'pkey': 'unknown property key'})
-
+
def test_site_wide_eproperty(self):
ex = self.assertRaises(ValidationError,
self.execute, 'INSERT CWProperty X: X pkey "ui.site-title", X value "hop", X for_user U')
self.assertEquals(ex.errors, {'for_user': "site-wide property can't be set for user"})
-
+
def test_bad_type_eproperty(self):
ex = self.assertRaises(ValidationError,
self.execute, 'INSERT CWProperty X: X pkey "ui.language", X value "hop", X for_user U')
@@ -216,10 +216,10 @@
ex = self.assertRaises(ValidationError,
self.execute, 'INSERT CWProperty X: X pkey "ui.language", X value "hop"')
self.assertEquals(ex.errors, {'value': u'unauthorized value'})
-
-
+
+
class SchemaHooksTC(RepositoryBasedTC):
-
+
def test_duplicate_etype_error(self):
# check we can't add a CWEType or CWRType entity if it already exists one
# with the same name
@@ -229,7 +229,7 @@
self.execute, 'INSERT CWEType X: X name "Societe"')
self.assertRaises((ValidationError, RepositoryError),
self.execute, 'INSERT CWRType X: X name "in_group"')
-
+
def test_validation_unique_constraint(self):
self.assertRaises(ValidationError,
self.execute, 'INSERT CWUser X: X login "admin"')
@@ -253,13 +253,13 @@
RepositoryBasedTC.setUp(self)
def index_exists(self, etype, attr, unique=False):
- dbhelper = self.session.pool.source('system').dbhelper
+ dbhelper = self.session.pool.source('system').dbhelper
sqlcursor = self.session.pool['system']
return dbhelper.index_exists(sqlcursor, SQL_PREFIX + etype, SQL_PREFIX + attr, unique=unique)
-
+
def test_base(self):
schema = self.repo.schema
- dbhelper = self.session.pool.source('system').dbhelper
+ dbhelper = self.session.pool.source('system').dbhelper
sqlcursor = self.session.pool['system']
self.failIf(schema.has_entity('Societe2'))
self.failIf(schema.has_entity('concerne2'))
@@ -332,8 +332,8 @@
self.failUnless('subdiv' in snames)
snames = [name for name, in self.execute('Any N WHERE S is_instance_of Division, S nom N')]
self.failUnless('subdiv' in snames)
-
-
+
+
def test_perms_synchronization_1(self):
schema = self.repo.schema
self.assertEquals(schema['CWUser'].get_groups('read'), set(('managers', 'users')))
@@ -374,9 +374,9 @@
self.execute('Any X WHERE X is CWEType, X name "CWEType"')
# schema modification hooks tests #########################################
-
+
def test_uninline_relation(self):
- dbhelper = self.session.pool.source('system').dbhelper
+ dbhelper = self.session.pool.source('system').dbhelper
sqlcursor = self.session.pool['system']
# Personne inline2 Affaire inline
# insert a person without inline2 relation (not mandatory)
@@ -410,7 +410,7 @@
self.assertEquals(rset.rows[0], [peid, aeid])
def test_indexed_change(self):
- dbhelper = self.session.pool.source('system').dbhelper
+ dbhelper = self.session.pool.source('system').dbhelper
sqlcursor = self.session.pool['system']
try:
self.execute('SET X indexed TRUE WHERE X relation_type R, R name "sujet"')
@@ -428,7 +428,7 @@
self.failIf(self.index_exists('Affaire', 'sujet'))
def test_unique_change(self):
- dbhelper = self.session.pool.source('system').dbhelper
+ dbhelper = self.session.pool.source('system').dbhelper
sqlcursor = self.session.pool['system']
try:
try:
@@ -453,7 +453,7 @@
self.commit()
self.failIf(self.schema['Affaire'].has_unique_values('sujet'))
self.failIf(self.index_exists('Affaire', 'sujet', unique=True))
-
+
class WorkflowHooksTC(RepositoryBasedTC):
@@ -468,7 +468,7 @@
# enforcement
self.execute('SET X require_group G WHERE G name "users", X transition_of ET, ET name "CWUser"')
self.commit()
-
+
def tearDown(self):
self.execute('DELETE X require_group G WHERE G name "users", X transition_of ET, ET name "CWUser"')
self.commit()
@@ -483,7 +483,7 @@
initialstate = self.execute('Any N WHERE S name N, X in_state S, X eid %(x)s',
{'x' : ueid})[0][0]
self.assertEquals(initialstate, u'activated')
-
+
def test_initial_state(self):
cnx = self.login('stduser')
cu = cnx.cursor()
@@ -495,7 +495,7 @@
self.execute('INSERT CWUser X: X login "badaboum", X upassword %(pwd)s, '
'X in_state S, X in_group G WHERE S name "deactivated", G name "users"', {'pwd': 'oops'})
self.commit()
-
+
# test that the workflow is correctly enforced
def test_transition_checking1(self):
cnx = self.login('stduser')
@@ -505,7 +505,7 @@
cu.execute, 'SET X in_state S WHERE X eid %(x)s, S eid %(s)s',
{'x': ueid, 's': self.s_activated}, 'x')
cnx.close()
-
+
def test_transition_checking2(self):
cnx = self.login('stduser')
cu = cnx.cursor()
@@ -514,7 +514,7 @@
cu.execute, 'SET X in_state S WHERE X eid %(x)s, S eid %(s)s',
{'x': ueid, 's': self.s_dummy}, 'x')
cnx.close()
-
+
def test_transition_checking3(self):
cnx = self.login('stduser')
cu = cnx.cursor()
@@ -530,7 +530,7 @@
{'x': ueid, 's': self.s_activated}, 'x')
cnx.commit()
cnx.close()
-
+
def test_transition_checking4(self):
cnx = self.login('stduser')
cu = cnx.cursor()
@@ -559,7 +559,7 @@
self.assertEquals(tr.comment, None)
self.assertEquals(tr.from_state[0].eid, self.s_activated)
self.assertEquals(tr.to_state[0].eid, self.s_deactivated)
-
+
self.session.set_shared_data('trcomment', u'il est pas sage celui-la')
self.session.set_shared_data('trcommentformat', u'text/plain')
self.execute('SET X in_state S WHERE X eid %(x)s, S eid %(s)s',
@@ -591,6 +591,6 @@
cu = cnx.cursor()
self.failUnless(cu.execute("INSERT Note X: X type 'a', X in_state S WHERE S name 'todo'"))
cnx.commit()
-
+
if __name__ == '__main__':
unittest_main()
--- a/server/test/unittest_migractions.py Wed May 13 16:28:21 2009 +0200
+++ b/server/test/unittest_migractions.py Wed May 13 16:59:50 2009 +0200
@@ -19,10 +19,10 @@
def teardown_module(*args):
Repository.get_versions = orig_get_versions
-
+
class MigrationCommandsTC(RepositoryBasedTC):
copy_schema = True
-
+
def setUp(self):
if not hasattr(self, '_repo'):
# first initialization
@@ -43,7 +43,7 @@
interactive=False)
assert self.cnx is self.mh._cnx
assert self.session is self.mh.session, (self.session.id, self.mh.session.id)
-
+
def test_add_attribute_int(self):
self.failIf('whatever' in self.schema)
paraordernum = self.mh.rqlexec('Any O WHERE X name "Note", RT name "para", RDEF from_entity X, RDEF relation_type RT, RDEF ordernum O')[0][0]
@@ -73,7 +73,7 @@
fields = dict(x.strip().split()[:2] for x in notesql.split('(', 1)[1].rsplit(')', 1)[0].split(','))
self.assertEquals(fields['%sshortpara' % SQL_PREFIX], 'varchar(64)')
self.mh.rollback()
-
+
def test_add_datetime_with_default_value_attribute(self):
self.failIf('mydate' in self.schema)
self.mh.cmd_add_attribute('Note', 'mydate')
@@ -88,7 +88,7 @@
self.assertEquals(d1, date.today())
self.assertEquals(d2, testdate)
self.mh.rollback()
-
+
def test_rename_attribute(self):
self.failIf('civility' in self.schema)
eid1 = self.mh.rqlexec('INSERT Personne X: X nom "lui", X sexe "M"')[0][0]
@@ -121,7 +121,7 @@
self.assertEquals(t1, "baz")
gn = self.mh.rqlexec('Any GN WHERE T require_group G, G name GN, T eid %s' % baz)[0][0]
self.assertEquals(gn, 'managers')
-
+
def test_add_entity_type(self):
self.failIf('Folder2' in self.schema)
self.failIf('filed_under2' in self.schema)
@@ -137,7 +137,7 @@
self.assertEquals([str(rs) for rs in self.schema['Folder2'].object_relations()],
['filed_under2', 'identity'])
self.assertEquals(sorted(str(e) for e in self.schema['filed_under2'].subjects()),
- ['Affaire', 'Card', 'Division', 'Email', 'EmailThread', 'File',
+ ['Affaire', 'Card', 'Division', 'Email', 'EmailThread', 'File',
'Folder2', 'Image', 'Note', 'Personne', 'Societe', 'SubDivision'])
self.assertEquals(self.schema['filed_under2'].objects(), ('Folder2',))
eschema = self.schema.eschema('Folder2')
@@ -164,7 +164,7 @@
self.mh.cmd_add_relation_type('filed_under2')
self.failUnless('filed_under2' in self.schema)
self.assertEquals(sorted(str(e) for e in self.schema['filed_under2'].subjects()),
- ['Affaire', 'Card', 'Division', 'Email', 'EmailThread', 'File',
+ ['Affaire', 'Card', 'Division', 'Email', 'EmailThread', 'File',
'Folder2', 'Image', 'Note', 'Personne', 'Societe', 'SubDivision'])
self.assertEquals(self.schema['filed_under2'].objects(), ('Folder2',))
@@ -178,8 +178,8 @@
def test_add_relation_definition(self):
self.mh.cmd_add_relation_definition('Societe', 'in_state', 'State')
- self.assertEquals(sorted(self.schema['in_state'].subjects()),
- ['Affaire', 'Division', 'CWUser', 'Note', 'Societe', 'SubDivision'])
+ self.assertEquals(sorted(str(x) for x in self.schema['in_state'].subjects()),
+ ['Affaire', 'CWUser', 'Division', 'Note', 'Societe', 'SubDivision'])
self.assertEquals(self.schema['in_state'].objects(), ('State',))
def test_add_relation_definition_nortype(self):
@@ -195,7 +195,7 @@
self.mh.cmd_drop_relation_definition('Personne', 'concerne', 'Affaire')
self.assertEquals(sorted(str(e) for e in self.schema['concerne'].subjects()), ['Affaire'])
self.assertEquals(sorted(str(e) for e in self.schema['concerne'].objects()), ['Division', 'Note', 'Societe', 'SubDivision'])
-
+
def test_drop_relation_definition_with_specialization(self):
self.failUnless('concerne' in self.schema)
self.assertEquals(sorted(str(e) for e in self.schema['concerne'].subjects()), ['Affaire', 'Personne'])
@@ -204,13 +204,13 @@
self.mh.cmd_drop_relation_definition('None', 'concerne', 'Societe')
self.assertEquals(sorted(str(e) for e in self.schema['concerne'].subjects()), ['Affaire', 'Personne'])
self.assertEquals(sorted(str(e) for e in self.schema['concerne'].objects()), ['Affaire', 'Note'])
-
+
def test_drop_relation_definition2(self):
self.failUnless('evaluee' in self.schema)
self.mh.cmd_drop_relation_definition('Personne', 'evaluee', 'Note')
self.failUnless('evaluee' in self.schema)
self.assertEquals(sorted(self.schema['evaluee'].subjects()),
- ['Division', 'CWUser', 'Societe', 'SubDivision'])
+ ['CWUser', 'Division', 'Societe', 'SubDivision'])
self.assertEquals(sorted(self.schema['evaluee'].objects()),
['Note'])
@@ -229,7 +229,7 @@
finally:
self.mh.cmd_change_relation_props('Affaire', 'concerne', 'Societe',
cardinality='**')
-
+
def test_change_relation_props_final(self):
rschema = self.schema['adel']
card = rschema.rproperty('Personne', 'String', 'fulltextindexed')
@@ -255,7 +255,7 @@
# self.assertEquals([rs.type for rs in migrschema['Personne'].ordered_relations() if rs.is_final()],
# expected)
migrschema['Personne'].description = 'blabla bla'
- migrschema['titre'].description = 'usually a title'
+ migrschema['titre'].description = 'usually a title'
migrschema['titre']._rproperties[('Personne', 'String')]['description'] = 'title for this person'
# rinorderbefore = cursor.execute('Any O,N WHERE X is CWAttribute, X relation_type RT, RT name N,'
# 'X from_entity FE, FE name "Personne",'
@@ -264,9 +264,9 @@
# u'sexe', u'promo', u'titre', u'adel', u'ass', u'web', u'tel',
# u'fax', u'datenaiss', u'test', u'description']
# self.assertListEquals(rinorderbefore, map(list, zip([0, 0]+range(1, len(expected)), expected)))
-
- self.mh.cmd_synchronize_schema(commit=False)
-
+
+ self.mh.cmd_sync_schema_props_perms(commit=False)
+
self.assertEquals(cursor.execute('Any D WHERE X name "Personne", X description D')[0][0],
'blabla bla')
self.assertEquals(cursor.execute('Any D WHERE X name "titre", X description D')[0][0],
@@ -300,7 +300,7 @@
# no more rqlexpr to delete and add para attribute
self.failIf(self._rrqlexpr_rset('add', 'para'))
self.failIf(self._rrqlexpr_rset('delete', 'para'))
- # new rql expr to add ecrit_par relation
+ # new rql expr to add ecrit_par relation
rexpr = self._rrqlexpr_entity('add', 'ecrit_par')
self.assertEquals(rexpr.expression,
'O require_permission P, P name "add_note", '
@@ -333,8 +333,8 @@
self.assertEquals(len(cursor.execute('RQLExpression X WHERE NOT ET1 read_permission X, NOT ET2 add_permission X, '
'NOT ET3 delete_permission X, NOT ET4 update_permission X')), 8+1)
# finally
- self.assertEquals(len(cursor.execute('RQLExpression X')), nbrqlexpr_start + 1 + 2)
-
+ self.assertEquals(len(cursor.execute('RQLExpression X')), nbrqlexpr_start + 1 + 2)
+
self.mh.rollback()
def _erqlexpr_rset(self, action, ertype):
@@ -351,7 +351,7 @@
rset = self._rrqlexpr_rset(action, ertype)
self.assertEquals(len(rset), 1)
return rset.get_entity(0, 0)
-
+
def test_set_size_constraint(self):
# existing previous value
try:
@@ -378,7 +378,7 @@
cubes.remove('email')
cubes.remove('file')
self.assertEquals(set(self.config.cubes()), cubes)
- for ertype in ('Email', 'EmailThread', 'EmailPart', 'File', 'Image',
+ for ertype in ('Email', 'EmailThread', 'EmailPart', 'File', 'Image',
'sender', 'in_thread', 'reply_to', 'data_format'):
self.failIf(ertype in schema, ertype)
self.assertEquals(sorted(schema['see_also']._rproperties.keys()),
@@ -402,7 +402,7 @@
cubes.add('email')
cubes.add('file')
self.assertEquals(set(self.config.cubes()), cubes)
- for ertype in ('Email', 'EmailThread', 'EmailPart', 'File', 'Image',
+ for ertype in ('Email', 'EmailThread', 'EmailPart', 'File', 'Image',
'sender', 'in_thread', 'reply_to', 'data_format'):
self.failUnless(ertype in schema, ertype)
self.assertEquals(sorted(schema['see_also']._rproperties.keys()),
@@ -426,17 +426,13 @@
self.maxeid = self.execute('Any MAX(X)')[0][0]
# why this commit is necessary is unclear to me (though without it
# next test may fail complaining of missing tables
- self.commit()
+ self.commit()
def test_set_state(self):
user = self.session.user
- self.set_debug(True)
self.mh.set_state(user.eid, 'deactivated')
user.clear_related_cache('in_state', 'subject')
- try:
- self.assertEquals(user.state, 'deactivated')
- finally:
- self.set_debug(False)
-
+ self.assertEquals(user.state, 'deactivated')
+
if __name__ == '__main__':
unittest_main()
--- a/server/test/unittest_querier.py Wed May 13 16:28:21 2009 +0200
+++ b/server/test/unittest_querier.py Wed May 13 16:59:50 2009 +0200
@@ -40,10 +40,10 @@
class MakeSchemaTC(TestCase):
def test_known_values(self):
solution = {'A': 'String', 'B': 'CWUser'}
- self.assertEquals(make_schema((Variable('A'), Variable('B')), solution,
+ self.assertEquals(make_schema((Variable('A'), Variable('B')), solution,
'table0', TYPEMAP),
('C0 text,C1 integer', {'A': 'table0.C0', 'B': 'table0.C1'}))
-
+
repo, cnx = init_test_database('sqlite')
@@ -51,19 +51,19 @@
class UtilsTC(BaseQuerierTC):
repo = repo
-
+
def get_max_eid(self):
# no need for cleanup here
return None
def cleanup(self):
# no need for cleanup here
pass
-
+
def test_preprocess_1(self):
reid = self.execute('Any X WHERE X is CWRType, X name "owned_by"')[0][0]
rqlst = self._prepare('Any COUNT(RDEF) WHERE RDEF relation_type X, X eid %(x)s', {'x': reid})
self.assertEquals(rqlst.solutions, [{'RDEF': 'CWAttribute'}, {'RDEF': 'CWRelation'}])
-
+
def test_preprocess_2(self):
teid = self.execute("INSERT Tag X: X name 'tag'")[0][0]
#geid = self.execute("CWGroup G WHERE G name 'users'")[0][0]
@@ -73,7 +73,7 @@
# the query may be optimized, should keep only one solution
# (any one, etype will be discarded)
self.assertEquals(len(rqlst.solutions), 1)
-
+
def test_preprocess_security(self):
plan = self._prepare_plan('Any ETN,COUNT(X) GROUPBY ETN '
'WHERE X is ET, ET name ETN')
@@ -109,7 +109,7 @@
'ET': 'CWEType', 'ETN': 'String'}])
rql, solutions = partrqls[1]
self.assertEquals(rql, 'Any ETN,X WHERE X is ET, ET name ETN, ET is CWEType, '
- 'X is IN(Bookmark, Card, Comment, Division, CWCache, CWConstraint, CWConstraintType, CWEType, CWAttribute, CWGroup, CWRelation, CWPermission, CWProperty, CWRType, CWUser, Email, EmailAddress, EmailPart, EmailThread, File, Folder, Image, Note, Personne, RQLExpression, Societe, State, SubDivision, Tag, TrInfo, Transition)')
+ 'X is IN(Bookmark, CWAttribute, CWCache, CWConstraint, CWConstraintType, CWEType, CWGroup, CWPermission, CWProperty, CWRType, CWRelation, CWUser, Card, Comment, Division, Email, EmailAddress, EmailPart, EmailThread, File, Folder, Image, Note, Personne, RQLExpression, Societe, State, SubDivision, Tag, TrInfo, Transition)')
self.assertListEquals(sorted(solutions),
sorted([{'X': 'Bookmark', 'ETN': 'String', 'ET': 'CWEType'},
{'X': 'Card', 'ETN': 'String', 'ET': 'CWEType'},
@@ -162,11 +162,11 @@
self.assertEquals(len(subq.children), 3)
self.assertEquals([t.as_string() for t in union.children[0].selection],
['MAX(X)'])
-
+
def test_preprocess_nonregr(self):
rqlst = self._prepare('Any S ORDERBY SI WHERE NOT S ecrit_par O, S para SI')
self.assertEquals(len(rqlst.solutions), 1)
-
+
def test_build_description(self):
# should return an empty result set
rset = self.execute('Any X WHERE X eid %(x)s', {'x': self.session.user.eid})
@@ -207,66 +207,66 @@
def test_unknown_eid(self):
# should return an empty result set
self.failIf(self.execute('Any X WHERE X eid 99999999'))
-
+
# selection queries tests #################################################
-
+
def test_select_1(self):
rset = self.execute('Any X ORDERBY X WHERE X is CWGroup')
result, descr = rset.rows, rset.description
self.assertEquals(tuplify(result), [(1,), (2,), (3,), (4,)])
self.assertEquals(descr, [('CWGroup',), ('CWGroup',), ('CWGroup',), ('CWGroup',)])
-
+
def test_select_2(self):
rset = self.execute('Any X ORDERBY N WHERE X is CWGroup, X name N')
self.assertEquals(tuplify(rset.rows), [(3,), (1,), (4,), (2,)])
self.assertEquals(rset.description, [('CWGroup',), ('CWGroup',), ('CWGroup',), ('CWGroup',)])
rset = self.execute('Any X ORDERBY N DESC WHERE X is CWGroup, X name N')
self.assertEquals(tuplify(rset.rows), [(2,), (4,), (1,), (3,)])
-
+
def test_select_3(self):
rset = self.execute('Any N GROUPBY N WHERE X is CWGroup, X name N')
result, descr = rset.rows, rset.description
result.sort()
self.assertEquals(tuplify(result), [('guests',), ('managers',), ('owners',), ('users',)])
self.assertEquals(descr, [('String',), ('String',), ('String',), ('String',)])
-
+
def test_select_is(self):
rset = self.execute('Any X, TN ORDERBY TN LIMIT 10 WHERE X is T, T name TN')
result, descr = rset.rows, rset.description
self.assertEquals(result[0][1], descr[0][0])
-
+
def test_select_is_aggr(self):
rset = self.execute('Any TN, COUNT(X) GROUPBY TN ORDERBY 2 DESC WHERE X is T, T name TN')
result, descr = rset.rows, rset.description
self.assertEquals(descr[0][0], 'String')
self.assertEquals(descr[0][1], 'Int')
self.assertEquals(result[0][0], 'CWRelation')
-
+
def test_select_groupby_orderby(self):
rset = self.execute('Any N GROUPBY N ORDERBY N WHERE X is CWGroup, X name N')
self.assertEquals(tuplify(rset.rows), [('guests',), ('managers',), ('owners',), ('users',)])
self.assertEquals(rset.description, [('String',), ('String',), ('String',), ('String',)])
-
+
def test_select_complex_groupby(self):
rset = self.execute('Any N GROUPBY N WHERE X name N')
rset = self.execute('Any N,MAX(D) GROUPBY N LIMIT 5 WHERE X name N, X creation_date D')
-
+
def test_select_inlined_groupby(self):
seid = self.execute('State X WHERE X name "deactivated"')[0][0]
rset = self.execute('Any U,L,S GROUPBY U,L,S WHERE X in_state S, U login L, S eid %s' % seid)
-
+
def test_select_complex_orderby(self):
rset1 = self.execute('Any N ORDERBY N WHERE X name N')
self.assertEquals(sorted(rset1.rows), rset1.rows)
rset = self.execute('Any N ORDERBY N LIMIT 5 OFFSET 1 WHERE X name N')
- self.assertEquals(rset.rows[0][0], rset1.rows[1][0])
+ self.assertEquals(rset.rows[0][0], rset1.rows[1][0])
self.assertEquals(len(rset), 5)
-
+
def test_select_5(self):
rset = self.execute('Any X, TMP ORDERBY TMP WHERE X name TMP, X is CWGroup')
self.assertEquals(tuplify(rset.rows), [(3, 'guests',), (1, 'managers',), (4, 'owners',), (2, 'users',)])
self.assertEquals(rset.description, [('CWGroup', 'String',), ('CWGroup', 'String',), ('CWGroup', 'String',), ('CWGroup', 'String',)])
-
+
def test_select_6(self):
self.execute("INSERT Personne X: X nom 'bidule'")[0]
rset = self.execute('Any Y where X name TMP, Y nom in (TMP, "bidule")')
@@ -274,7 +274,7 @@
self.assert_(('Personne',) in rset.description)
rset = self.execute('DISTINCT Any Y where X name TMP, Y nom in (TMP, "bidule")')
self.assert_(('Personne',) in rset.description)
-
+
def test_select_not_attr(self):
self.execute("INSERT Personne X: X nom 'bidule'")
self.execute("INSERT Societe X: X nom 'chouette'")
@@ -285,13 +285,13 @@
self.execute("SET P travaille S WHERE P nom 'bidule', S nom 'chouette'")
rset = self.execute('Personne X WHERE NOT X travaille S')
self.assertEquals(len(rset.rows), 0, rset.rows)
-
+
def test_select_is_in(self):
self.execute("INSERT Personne X: X nom 'bidule'")
self.execute("INSERT Societe X: X nom 'chouette'")
self.assertEquals(len(self.execute("Any X WHERE X is IN (Personne, Societe)")),
2)
-
+
def test_select_not_rel(self):
self.execute("INSERT Personne X: X nom 'bidule'")
self.execute("INSERT Societe X: X nom 'chouette'")
@@ -301,7 +301,7 @@
self.assertEquals(len(rset.rows), 1, rset.rows)
rset = self.execute('Personne X WHERE NOT X travaille S, S nom "chouette"')
self.assertEquals(len(rset.rows), 1, rset.rows)
-
+
def test_select_nonregr_inlined(self):
self.execute("INSERT Note X: X para 'bidule'")
self.execute("INSERT Personne X: X nom 'chouette'")
@@ -310,7 +310,7 @@
rset = self.execute('Any U,T ORDERBY T DESC WHERE U is CWUser, '
'N ecrit_par U, N type T')#, {'x': self.ueid})
self.assertEquals(len(rset.rows), 0)
-
+
def test_select_nonregr_edition_not(self):
groupeids = set((1, 2, 3))
groupreadperms = set(r[0] for r in self.execute('Any Y WHERE X name "CWGroup", Y eid IN(1, 2, 3), X read_permission Y'))
@@ -318,7 +318,7 @@
self.assertEquals(sorted(r[0] for r in rset.rows), sorted(groupeids - groupreadperms))
rset = self.execute('DISTINCT Any Y WHERE X name "CWGroup", Y eid IN(1, 2, 3), NOT X read_permission Y')
self.assertEquals(sorted(r[0] for r in rset.rows), sorted(groupeids - groupreadperms))
-
+
def test_select_outer_join(self):
peid1 = self.execute("INSERT Personne X: X nom 'bidule'")[0][0]
peid2 = self.execute("INSERT Personne X: X nom 'autre'")[0][0]
@@ -331,7 +331,7 @@
self.assertEquals(rset.rows, [[peid1, seid1], [peid2, None]])
rset = self.execute('Any S,X ORDERBY S WHERE X? travaille S')
self.assertEquals(rset.rows, [[seid1, peid1], [seid2, None]])
-
+
def test_select_outer_join_optimized(self):
peid1 = self.execute("INSERT Personne X: X nom 'bidule'")[0][0]
rset = self.execute('Any X WHERE X eid %(x)s, P? connait X', {'x':peid1}, 'x')
@@ -372,8 +372,8 @@
self.failUnless(['users', 'tag'] in rset.rows)
self.failUnless(['activated', None] in rset.rows)
rset = self.execute("Any GN,TN ORDERBY GN WHERE T tags G?, T name TN, G name GN")
- self.assertEquals(rset.rows, [[None, 'tagbis'], ['users', 'tag']])
-
+ self.assertEquals(rset.rows, [[None, 'tagbis'], ['users', 'tag']])
+
def test_select_not_inline_rel(self):
self.execute("INSERT Personne X: X nom 'bidule'")
self.execute("INSERT Note X: X type 'a'")
@@ -381,7 +381,7 @@
self.execute("SET X ecrit_par Y WHERE X type 'a', Y nom 'bidule'")
rset = self.execute('Note X WHERE NOT X ecrit_par P')
self.assertEquals(len(rset.rows), 1, rset.rows)
-
+
def test_select_not_unlinked_multiple_solutions(self):
self.execute("INSERT Personne X: X nom 'bidule'")
self.execute("INSERT Note X: X type 'a'")
@@ -395,13 +395,13 @@
self.assertEquals(len(rset.rows), 1)
self.assertEquals(len(rset.rows[0]), 1)
self.assertEquals(rset.description, [('Int',)])
-
+
def test_select_aggregat_sum(self):
rset = self.execute('Any SUM(O) WHERE X ordernum O')
self.assertEquals(len(rset.rows), 1)
self.assertEquals(len(rset.rows[0]), 1)
self.assertEquals(rset.description, [('Int',)])
-
+
def test_select_aggregat_min(self):
rset = self.execute('Any MIN(X) WHERE X is Personne')
self.assertEquals(len(rset.rows), 1)
@@ -411,7 +411,7 @@
self.assertEquals(len(rset.rows), 1)
self.assertEquals(len(rset.rows[0]), 1)
self.assertEquals(rset.description, [('Int',)])
-
+
def test_select_aggregat_max(self):
rset = self.execute('Any MAX(X) WHERE X is Personne')
self.assertEquals(len(rset.rows), 1)
@@ -444,7 +444,7 @@
rset = self.execute('Any X,N ORDERBY GROUP_SORT_VALUE(N) WHERE X is CWGroup, X name N, NOT U in_group X, U login "admin"')
self.failUnlessEqual(len(rset), 3)
self.failUnlessEqual(rset[0][1], 'owners')
-
+
def test_select_aggregat_sort(self):
rset = self.execute('Any G, COUNT(U) GROUPBY G ORDERBY 2 WHERE U in_group G')
self.assertEquals(len(rset.rows), 2)
@@ -475,7 +475,7 @@
result = rset.rows
result.sort()
self.assertEquals(tuplify(result), [(1,), (2,), (3,), (4,), (5,)])
-
+
def test_select_upper(self):
rset = self.execute('Any X, UPPER(L) ORDERBY L WHERE X is CWUser, X login L')
self.assertEquals(len(rset.rows), 2)
@@ -494,7 +494,7 @@
## self.assertEquals(rset.rows[0][0], 'admin')
## rset = self.execute('Any L WHERE %(x)s login L', {'x':ueid})
## self.assertEquals(rset.rows[0][0], 'admin')
-
+
def test_select_searchable_text_1(self):
rset = self.execute(u"INSERT Personne X: X nom 'bidüle'")
rset = self.execute(u"INSERT Societe X: X nom 'bidüle'")
@@ -509,7 +509,7 @@
self.failIf([r[0] for r in rset.rows if r[0] in biduleeids])
# duh?
rset = self.execute('Any X WHERE X has_text %(text)s', {'text': u'ça'})
-
+
def test_select_searchable_text_2(self):
rset = self.execute("INSERT Personne X: X nom 'bidule'")
rset = self.execute("INSERT Personne X: X nom 'chouette'")
@@ -517,7 +517,7 @@
self.commit()
rset = self.execute('Personne N where N has_text "bidule"')
self.assertEquals(len(rset.rows), 1, rset.rows)
-
+
def test_select_searchable_text_3(self):
rset = self.execute("INSERT Personne X: X nom 'bidule', X sexe 'M'")
rset = self.execute("INSERT Personne X: X nom 'bidule', X sexe 'F'")
@@ -525,7 +525,7 @@
self.commit()
rset = self.execute('Any X where X has_text "bidule" and X sexe "M"')
self.assertEquals(len(rset.rows), 1, rset.rows)
-
+
def test_select_multiple_searchable_text(self):
self.execute(u"INSERT Personne X: X nom 'bidüle'")
self.execute("INSERT Societe X: X nom 'chouette', S travaille X")
@@ -536,7 +536,7 @@
'text2': u'chouette',}
)
self.assertEquals(len(rset.rows), 1, rset.rows)
-
+
def test_select_no_descr(self):
rset = self.execute('Any X WHERE X is CWGroup', build_descr=0)
rset.rows.sort()
@@ -549,7 +549,7 @@
self.assertEquals(rset.description, [('CWGroup',), ('CWGroup',)])
rset = self.execute('CWGroup X ORDERBY N LIMIT 2 OFFSET 2 WHERE X name N')
self.assertEquals(tuplify(rset.rows), [(4,), (2,)])
-
+
def test_select_symetric(self):
self.execute("INSERT Personne X: X nom 'machin'")
self.execute("INSERT Personne X: X nom 'bidule'")
@@ -569,14 +569,14 @@
self.assertEquals(len(rset.rows), 2, rset.rows)
rset = self.execute('Any P where P2 connait P, P2 nom "chouette"')
self.assertEquals(len(rset.rows), 2, rset.rows)
-
+
def test_select_inline(self):
self.execute("INSERT Personne X: X nom 'bidule'")
self.execute("INSERT Note X: X type 'a'")
self.execute("SET X ecrit_par Y WHERE X type 'a', Y nom 'bidule'")
rset = self.execute('Any N where N ecrit_par X, X nom "bidule"')
self.assertEquals(len(rset.rows), 1, rset.rows)
-
+
def test_select_creation_date(self):
self.execute("INSERT Personne X: X nom 'bidule'")
rset = self.execute('Any D WHERE X nom "bidule", X creation_date D')
@@ -593,7 +593,7 @@
self.execute("SET P travaille S WHERE P nom 'chouette', S nom 'caesium'")
rset = self.execute('DISTINCT Any P WHERE P travaille S1 OR P travaille S2, S1 nom "logilab", S2 nom "caesium"')
self.assertEqual(len(rset.rows), 2)
-
+
def test_select_or_sym_relation(self):
self.execute("INSERT Personne X: X nom 'bidule'")
self.execute("INSERT Personne X: X nom 'chouette'")
@@ -608,7 +608,7 @@
self.assertEqual(len(rset.rows), 2, rset.rows)
rset = self.execute('DISTINCT Any P WHERE P connait S OR S connait P, S nom "chouette"')
self.assertEqual(len(rset.rows), 2, rset.rows)
-
+
def test_select_follow_relation(self):
self.execute("INSERT Affaire X: X sujet 'cool'")
self.execute("INSERT Societe X: X nom 'chouette'")
@@ -635,7 +635,7 @@
self.execute("INSERT Affaire X: X sujet 'abcd'")
rset = self.execute('DISTINCT Any S ORDERBY S WHERE A is Affaire, A sujet S')
self.assertEqual(rset.rows, [['abcd'], ['important'], ['minor'], ['normal'], ['zou']])
-
+
def test_select_ordered_distinct_3(self):
rset = self.execute('DISTINCT Any N ORDERBY GROUP_SORT_VALUE(N) WHERE X is CWGroup, X name N')
self.assertEqual(rset.rows, [['owners'], ['guests'], ['users'], ['managers']])
@@ -650,13 +650,13 @@
rset = self.execute('Any X,E WHERE X owned_by U, X eid E, U eid %(u)s', {'u': self.session.user.eid})
self.failUnless(rset)
self.assertEquals(rset.description[0][1], 'Int')
-
+
# def test_select_rewritten_optional(self):
# eid = self.execute("INSERT Affaire X: X sujet 'cool'")[0][0]
# rset = self.execute('Any X WHERE X eid %(x)s, EXISTS(X owned_by U) OR EXISTS(X concerne S?, S owned_by U)',
# {'x': eid}, 'x')
# self.assertEquals(rset.rows, [[eid]])
-
+
def test_today_bug(self):
self.execute("INSERT Tag X: X name 'bidule', X creation_date NOW")
self.execute("INSERT Tag Y: Y name 'toto'")
@@ -689,7 +689,7 @@
'Int', 'Interval',
'Password', 'String',
'Time'])
-
+
def test_select_constant(self):
rset = self.execute('Any X, "toto" ORDERBY X WHERE X is CWGroup')
self.assertEquals(rset.rows,
@@ -718,14 +718,14 @@
self.assertEquals(rset.description,
[('Transition', 'String'), ('State', 'String'),
('Transition', 'String'), ('State', 'String')])
-
+
def test_select_union_aggregat(self):
# meaningless, the goal in to have group by done on different attribute
# for each sub-query
self.execute('(Any N,COUNT(X) GROUPBY N WHERE X name N, X is State)'
' UNION '
'(Any N,COUNT(X) GROUPBY N ORDERBY 2 WHERE X login N)')
-
+
def test_select_union_aggregat_independant_group(self):
self.execute('INSERT State X: X name "hop"')
self.execute('INSERT State X: X name "hop"')
@@ -736,7 +736,7 @@
' UNION '
'(Any N,COUNT(X) GROUPBY N WHERE X name N, X is Transition HAVING COUNT(X)>1))')
self.assertEquals(rset.rows, [[u'hop', 2], [u'hop', 2]])
-
+
def test_select_union_selection_with_diff_variables(self):
rset = self.execute('(Any N WHERE X name N, X is State)'
' UNION '
@@ -746,7 +746,7 @@
'deactivate', 'deactivated', 'done', 'en cours',
'end', 'finie', 'markasdone', 'pitetre', 'redoit',
'start', 'todo'])
-
+
def test_exists(self):
geid = self.execute("INSERT CWGroup X: X name 'lulufanclub'")[0][0]
self.execute("SET U in_group G WHERE G name 'lulufanclub'")
@@ -784,15 +784,15 @@
rset = self.execute('Any X WITH X BEING ((Any NULL) UNION (Any "toto"))')
self.assertEquals(rset.rows, [[None], ['toto']])
self.assertEquals(rset.description, [(None,), ('String',)])
-
+
# insertion queries tests #################################################
-
+
def test_insert_is(self):
eid, = self.execute("INSERT Personne X: X nom 'bidule'")[0]
etype, = self.execute("Any TN WHERE X is T, X eid %s, T name TN" % eid)[0]
self.assertEquals(etype, 'Personne')
self.execute("INSERT Personne X: X nom 'managers'")
-
+
def test_insert_1(self):
rset = self.execute("INSERT Personne X: X nom 'bidule'")
self.assertEquals(len(rset.rows), 1)
@@ -819,7 +819,7 @@
self.execute("INSERT Personne X: X nom Y WHERE U login 'admin', U login Y")
rset = self.execute('Personne X WHERE X nom "admin"')
self.assert_(rset.rows)
- self.assertEquals(rset.description, [('Personne',)])
+ self.assertEquals(rset.description, [('Personne',)])
def test_insert_4(self):
self.execute("INSERT Societe Y: Y nom 'toto'")
@@ -827,7 +827,7 @@
rset = self.execute('Any X, Y WHERE X nom "bidule", Y nom "toto", X travaille Y')
self.assert_(rset.rows)
self.assertEquals(rset.description, [('Personne', 'Societe',)])
-
+
def test_insert_4bis(self):
peid = self.execute("INSERT Personne X: X nom 'bidule'")[0][0]
seid = self.execute("INSERT Societe Y: Y nom 'toto', X travaille Y WHERE X eid %(x)s",
@@ -836,7 +836,7 @@
self.execute("INSERT Personne X: X nom 'chouette', X travaille Y WHERE Y eid %(x)s",
{'x': str(seid)})
self.assertEqual(len(self.execute('Any X, Y WHERE X travaille Y')), 2)
-
+
def test_insert_4ter(self):
peid = self.execute("INSERT Personne X: X nom 'bidule'")[0][0]
seid = self.execute("INSERT Societe Y: Y nom 'toto', X travaille Y WHERE X eid %(x)s",
@@ -891,7 +891,7 @@
'E primary_email EM, EM address "X", E in_group G '
'WHERE G name "managers"')
self.assertEquals(list(rset.description[0]), ['CWUser', 'EmailAddress'])
-
+
# deletion queries tests ##################################################
def test_delete_1(self):
@@ -901,7 +901,7 @@
self.execute("DELETE Personne Y WHERE Y nom 'toto'")
rset = self.execute('Personne X WHERE X nom "toto"')
self.assertEqual(len(rset.rows), 0)
-
+
def test_delete_2(self):
rset = self.execute("INSERT Personne X, Personne Y, Societe Z : X nom 'syt', Y nom 'adim', Z nom 'Logilab', X travaille Z, Y travaille Z")
self.assertEquals(len(rset), 1)
@@ -960,7 +960,7 @@
self.assertEquals(len(sqlc.fetchall()), 0)
sqlc.execute('SELECT * FROM owned_by_relation WHERE eid_from=%s'%eeid)
self.assertEquals(len(sqlc.fetchall()), 0)
-
+
def test_nonregr_delete_cache2(self):
eid = self.execute("INSERT Folder T: T name 'toto'")[0][0]
self.commit()
@@ -979,7 +979,7 @@
self.assertEquals(rset.rows, [])
rset = self.execute("Folder X WHERE X eid %s" %eid)
self.assertEquals(rset.rows, [])
-
+
# update queries tests ####################################################
def test_update_1(self):
@@ -989,7 +989,7 @@
self.execute("SET X nom 'tutu', X prenom 'original' WHERE X is Personne, X nom 'toto'")
rset = self.execute('Any Y, Z WHERE X is Personne, X nom Y, X prenom Z')
self.assertEqual(tuplify(rset.rows), [('tutu', 'original')])
-
+
def test_update_2(self):
self.execute("INSERT Personne X, Societe Y: X nom 'bidule', Y nom 'toto'")
#rset = self.execute('Any X, Y WHERE X nom "bidule", Y nom "toto"')
@@ -999,7 +999,7 @@
self.execute("SET X travaille Y WHERE X nom 'bidule', Y nom 'toto'")
rset = self.execute('Any X, Y WHERE X travaille Y')
self.assertEqual(len(rset.rows), 1)
-
+
def test_update_2bis(self):
rset = self.execute("INSERT Personne X, Societe Y: X nom 'bidule', Y nom 'toto'")
eid1, eid2 = rset[0][0], rset[0][1]
@@ -1007,7 +1007,7 @@
{'x': str(eid1), 'y': str(eid2)})
rset = self.execute('Any X, Y WHERE X travaille Y')
self.assertEqual(len(rset.rows), 1)
-
+
def test_update_2ter(self):
rset = self.execute("INSERT Personne X, Societe Y: X nom 'bidule', Y nom 'toto'")
eid1, eid2 = rset[0][0], rset[0][1]
@@ -1015,10 +1015,10 @@
{'x': unicode(eid1), 'y': unicode(eid2)})
rset = self.execute('Any X, Y WHERE X travaille Y')
self.assertEqual(len(rset.rows), 1)
-
+
## def test_update_4(self):
## self.execute("SET X know Y WHERE X ami Y")
-
+
def test_update_multiple1(self):
peid1 = self.execute("INSERT Personne Y: Y nom 'tutu'")[0][0]
peid2 = self.execute("INSERT Personne Y: Y nom 'toto'")[0][0]
@@ -1054,16 +1054,16 @@
self.execute('SET X title XN + %(suffix)s WHERE X is Bookmark, X title XN', {'suffix': u'-moved'})
newname = self.execute('Any XN WHERE X eid %(x)s, X title XN', {'x': beid}, 'x')[0][0]
self.assertEquals(newname, 'toto-moved')
-
+
def test_update_query_error(self):
self.execute("INSERT Personne Y: Y nom 'toto'")
self.assertRaises(Exception, self.execute, "SET X nom 'toto', X is Personne")
self.assertRaises(QueryError, self.execute, "SET X nom 'toto', X has_text 'tutu' WHERE X is Personne")
self.assertRaises(QueryError, self.execute, "SET X login 'tutu', X eid %s" % cnx.user(self.session).eid)
-
+
# upassword encryption tests #################################################
-
+
def test_insert_upassword(self):
rset = self.execute("INSERT CWUser X: X login 'bob', X upassword 'toto'")
self.assertEquals(len(rset.rows), 1)
@@ -1074,11 +1074,11 @@
cursor.execute("SELECT %supassword from %sCWUser WHERE %slogin='bob'"
% (SQL_PREFIX, SQL_PREFIX, SQL_PREFIX))
passwd = cursor.fetchone()[0].getvalue()
- self.assertEquals(passwd, crypt_password('toto', passwd[:2]))
+ self.assertEquals(passwd, crypt_password('toto', passwd[:2]))
rset = self.execute("Any X WHERE X is CWUser, X login 'bob', X upassword '%s'" % passwd)
self.assertEquals(len(rset.rows), 1)
self.assertEquals(rset.description, [('CWUser',)])
-
+
def test_update_upassword(self):
cursor = self.pool['system']
rset = self.execute("INSERT CWUser X: X login 'bob', X upassword %(pwd)s", {'pwd': 'toto'})
@@ -1088,13 +1088,13 @@
cursor.execute("SELECT %supassword from %sCWUser WHERE %slogin='bob'"
% (SQL_PREFIX, SQL_PREFIX, SQL_PREFIX))
passwd = cursor.fetchone()[0].getvalue()
- self.assertEquals(passwd, crypt_password('tutu', passwd[:2]))
+ self.assertEquals(passwd, crypt_password('tutu', passwd[:2]))
rset = self.execute("Any X WHERE X is CWUser, X login 'bob', X upassword '%s'" % passwd)
self.assertEquals(len(rset.rows), 1)
self.assertEquals(rset.description, [('CWUser',)])
# non regression tests ####################################################
-
+
def test_nonregr_1(self):
teid = self.execute("INSERT Tag X: X name 'tag'")[0][0]
self.execute("SET X tags Y WHERE X name 'tag', Y is State, Y name 'activated'")
@@ -1113,7 +1113,7 @@
rset = self.execute('Any X WHERE E eid %(x)s, E tags X',
{'x': teid})
self.assertEquals(rset.rows, [[geid]])
-
+
def test_nonregr_3(self):
"""bad sql generated on the second query (destination_state is not
detected as an inlined relation)
@@ -1151,10 +1151,10 @@
self.assertEquals(rset1.rows, rset2.rows)
self.assertEquals(rset1.rows, rset3.rows)
self.assertEquals(rset1.rows, rset4.rows)
-
+
def test_nonregr_6(self):
self.execute('Any N,COUNT(S) GROUPBY N ORDERBY COUNT(N) WHERE S name N, S is State')
-
+
def test_sqlite_encoding(self):
"""XXX this test was trying to show a bug on use of lower which only
occurs with non ascii string and misconfigured locale
@@ -1215,7 +1215,7 @@
"""
self.skip('retry me once http://www.sqlite.org/cvstrac/tktview?tn=3773 is fixed')
self.execute('Any X ORDERBY D DESC WHERE X creation_date D')
-
+
def test_nonregr_extra_joins(self):
ueid = self.session.user.eid
teid1 = self.execute("INSERT Folder X: X name 'folder1'")[0][0]
@@ -1251,7 +1251,7 @@
"E firstname %(firstname)s, E surname %(surname)s "
"WHERE E eid %(x)s, G name 'users', S name 'activated'",
{'x':ueid, 'firstname': u'jean', 'surname': u'paul'}, 'x')
-
+
def test_nonregr_u_owned_by_u(self):
ueid = self.execute("INSERT CWUser X: X login 'bob', X upassword 'toto', X in_group G "
"WHERE G name 'users'")[0][0]
--- a/server/test/unittest_repository.py Wed May 13 16:28:21 2009 +0200
+++ b/server/test/unittest_repository.py Wed May 13 16:59:50 2009 +0200
@@ -17,7 +17,7 @@
from cubicweb.dbapi import connect, repo_connect
from cubicweb.devtools.apptest import RepositoryBasedTC
from cubicweb.devtools.repotest import tuplify
-from cubicweb.server import repository
+from cubicweb.server import repository
from cubicweb.server.sqlutils import SQL_PREFIX
@@ -29,10 +29,10 @@
""" singleton providing access to a persistent storage for entities
and relation
"""
-
+
# def setUp(self):
# pass
-
+
# def tearDown(self):
# self.repo.config.db_perms = True
# cnxid = self.repo.connect(*self.default_user_password())
@@ -64,7 +64,7 @@
(u'String',), (u'Time',)])
finally:
self.repo._free_pool(pool)
-
+
def test_schema_has_owner(self):
repo = self.repo
cnxid = repo.connect(*self.default_user_password())
@@ -74,7 +74,7 @@
self.failIf(repo.execute(cnxid, 'CWRelation X WHERE NOT X owned_by U'))
self.failIf(repo.execute(cnxid, 'CWConstraint X WHERE NOT X owned_by U'))
self.failIf(repo.execute(cnxid, 'CWConstraintType X WHERE NOT X owned_by U'))
-
+
def test_connect(self):
login, passwd = self.default_user_password()
self.assert_(self.repo.connect(login, passwd))
@@ -84,7 +84,7 @@
self.repo.connect, login, None)
self.assertRaises(AuthenticationError,
self.repo.connect, None, None)
-
+
def test_execute(self):
repo = self.repo
cnxid = repo.connect(*self.default_user_password())
@@ -93,7 +93,7 @@
repo.execute(cnxid, 'Any X where X is Personne, X nom ~= "to"')
repo.execute(cnxid, 'Any X WHERE X has_text %(text)s', {'text': u'\xe7a'})
repo.close(cnxid)
-
+
def test_login_upassword_accent(self):
repo = self.repo
cnxid = repo.connect(*self.default_user_password())
@@ -102,27 +102,28 @@
repo.commit(cnxid)
repo.close(cnxid)
self.assert_(repo.connect(u"barnabé", u"héhéhé".encode('UTF8')))
-
+
def test_invalid_entity_rollback(self):
repo = self.repo
cnxid = repo.connect(*self.default_user_password())
+ # no group
repo.execute(cnxid, 'INSERT CWUser X: X login %(login)s, X upassword %(passwd)s, X in_state S WHERE S name "activated"',
{'login': u"tutetute", 'passwd': 'tutetute'})
self.assertRaises(ValidationError, repo.commit, cnxid)
rset = repo.execute(cnxid, 'CWUser X WHERE X login "tutetute"')
self.assertEquals(rset.rowcount, 0)
-
+
def test_close(self):
repo = self.repo
cnxid = repo.connect(*self.default_user_password())
self.assert_(cnxid)
repo.close(cnxid)
self.assertRaises(BadConnectionId, repo.execute, cnxid, 'Any X')
-
+
def test_invalid_cnxid(self):
self.assertRaises(BadConnectionId, self.repo.execute, 0, 'Any X')
self.assertRaises(BadConnectionId, self.repo.close, None)
-
+
def test_shared_data(self):
repo = self.repo
cnxid = repo.connect(*self.default_user_password())
@@ -179,7 +180,7 @@
repo.rollback(cnxid)
result = repo.execute(cnxid, "Any U WHERE U in_group G, U login 'admin', G name 'guests'")
self.assertEquals(result.rowcount, 0, result.rows)
-
+
def test_transaction_base3(self):
repo = self.repo
cnxid = repo.connect(*self.default_user_password())
@@ -194,7 +195,7 @@
repo.rollback(cnxid)
rset = repo.execute(cnxid, 'TrInfo T WHERE T wf_info_for X, X eid %(x)s', {'x': ueid})
self.assertEquals(len(rset), 1)
-
+
def test_transaction_interleaved(self):
self.skip('implement me')
@@ -202,11 +203,11 @@
schema = self.repo.schema
# check order of attributes is respected
self.assertListEquals([r.type for r in schema.eschema('CWAttribute').ordered_relations()
- if not r.type in ('eid', 'is', 'is_instance_of', 'identity',
+ if not r.type in ('eid', 'is', 'is_instance_of', 'identity',
'creation_date', 'modification_date',
'owned_by', 'created_by')],
['relation_type', 'from_entity', 'to_entity', 'constrained_by',
- 'cardinality', 'ordernum',
+ 'cardinality', 'ordernum',
'indexed', 'fulltextindexed', 'internationalizable',
'defaultval', 'description_format', 'description'])
@@ -254,14 +255,14 @@
t.join()
finally:
repository.pyro_unregister(self.repo.config)
-
+
def _pyro_client(self, lock):
cnx = connect(self.repo.config.appid, u'admin', 'gingkow')
# check we can get the schema
schema = cnx.get_schema()
self.assertEquals(schema.__hashmode__, None)
rset = cnx.cursor().execute('Any U,G WHERE U in_group G')
-
+
def test_internal_api(self):
repo = self.repo
@@ -301,10 +302,10 @@
repo.close(cnxid)
self.assertRaises(BadConnectionId, repo.set_shared_data, cnxid, 'data', 0)
self.assertRaises(BadConnectionId, repo.get_shared_data, cnxid, 'data')
-
+
class DataHelpersTC(RepositoryBasedTC):
-
+
def setUp(self):
""" called before each test from this class """
cnxid = self.repo.connect(*self.default_user_password())
@@ -313,7 +314,7 @@
def tearDown(self):
self.session.rollback()
-
+
def test_create_eid(self):
self.assert_(self.repo.system_source.create_eid(self.session))
@@ -326,10 +327,10 @@
def test_type_from_eid(self):
self.assertEquals(self.repo.type_from_eid(1, self.session), 'CWGroup')
-
+
def test_type_from_eid_raise(self):
self.assertRaises(UnknownEid, self.repo.type_from_eid, -2, self.session)
-
+
def test_add_delete_info(self):
entity = self.repo.vreg.etype_class('Personne')(self.session, None, None)
entity.eid = -1
@@ -350,7 +351,7 @@
class FTITC(RepositoryBasedTC):
-
+
def test_reindex_and_modified_since(self):
cursor = self.session.pool['system']
eidp = self.execute('INSERT Personne X: X nom "toto", X prenom "tutu"')[0][0]
@@ -400,18 +401,18 @@
self.commit()
rset = self.execute('Any X WHERE X has_text %(t)s', {'t': 'tutu'})
self.assertEquals(rset.rows, [[self.session.user.eid]])
-
-
+
+
class DBInitTC(RepositoryBasedTC):
-
+
def test_versions_inserted(self):
inserted = [r[0] for r in self.execute('Any K ORDERBY K WHERE P pkey K, P pkey ~= "system.version.%"')]
self.assertEquals(inserted,
- [u'system.version.basket', u'system.version.comment',
- u'system.version.cubicweb', u'system.version.email',
- u'system.version.file', u'system.version.folder',
+ [u'system.version.basket', u'system.version.card', u'system.version.comment',
+ u'system.version.cubicweb', u'system.version.email',
+ u'system.version.file', u'system.version.folder',
u'system.version.tag'])
-
+
class InlineRelHooksTC(RepositoryBasedTC):
"""test relation hooks are called for inlined relations
"""
@@ -419,13 +420,13 @@
RepositoryBasedTC.setUp(self)
self.hm = self.repo.hm
self.called = []
-
+
def _before_relation_hook(self, pool, fromeid, rtype, toeid):
self.called.append((fromeid, rtype, toeid))
def _after_relation_hook(self, pool, fromeid, rtype, toeid):
self.called.append((fromeid, rtype, toeid))
-
+
def test_before_add_inline_relation(self):
"""make sure before_<event>_relation hooks are called directly"""
self.hm.register_hook(self._before_relation_hook,
@@ -434,7 +435,7 @@
eidn = self.execute('INSERT Note X: X type "T"')[0][0]
self.execute('SET N ecrit_par Y WHERE N type "T", Y nom "toto"')
self.assertEquals(self.called, [(eidn, 'ecrit_par', eidp)])
-
+
def test_after_add_inline_relation(self):
"""make sure after_<event>_relation hooks are deferred"""
self.hm.register_hook(self._after_relation_hook,
@@ -444,7 +445,7 @@
self.assertEquals(self.called, [])
self.execute('SET N ecrit_par Y WHERE N type "T", Y nom "toto"')
self.assertEquals(self.called, [(eidn, 'ecrit_par', eidp,)])
-
+
def test_after_add_inline(self):
"""make sure after_<event>_relation hooks are deferred"""
self.hm.register_hook(self._after_relation_hook,
@@ -452,7 +453,7 @@
eidp = self.execute('INSERT CWUser X: X login "toto", X upassword "tutu", X in_state S WHERE S name "activated"')[0][0]
eids = self.execute('State X WHERE X name "activated"')[0][0]
self.assertEquals(self.called, [(eidp, 'in_state', eids,)])
-
+
def test_before_delete_inline_relation(self):
"""make sure before_<event>_relation hooks are called directly"""
self.hm.register_hook(self._before_relation_hook,
@@ -477,6 +478,6 @@
self.execute('DELETE N ecrit_par Y WHERE N type "T", Y nom "toto"')
self.assertEquals(self.called, [(eidn, 'ecrit_par', eidp,)])
-
+
if __name__ == '__main__':
unittest_main()
--- a/server/test/unittest_rql2sql.py Wed May 13 16:28:21 2009 +0200
+++ b/server/test/unittest_rql2sql.py Wed May 13 16:59:50 2009 +0200
@@ -45,10 +45,10 @@
("Any X WHERE X is Affaire",
'''SELECT X.cw_eid
FROM cw_Affaire AS X'''),
-
+
("Any X WHERE X eid 0",
'''SELECT 0'''),
-
+
("Personne P",
'''SELECT P.cw_eid
FROM cw_Personne AS P'''),
@@ -56,12 +56,12 @@
("Personne P WHERE P test TRUE",
'''SELECT P.cw_eid
FROM cw_Personne AS P
-WHERE P.cw_test=True'''),
+WHERE P.cw_test=TRUE'''),
("Personne P WHERE P test false",
'''SELECT P.cw_eid
FROM cw_Personne AS P
-WHERE P.cw_test=False'''),
+WHERE P.cw_test=FALSE'''),
("Personne P WHERE P eid -1",
'''SELECT -1'''),
@@ -160,7 +160,7 @@
'''SELECT S.cw_eid
FROM cw_Societe AS S
WHERE ((S.cw_nom=Logilab) OR (S.cw_nom=Caesium))'''),
-
+
('Any X WHERE X nom "toto", X eid IN (9700, 9710, 1045, 674)',
'''SELECT X.cw_eid
FROM cw_Division AS X
@@ -207,7 +207,7 @@
'''SELECT N.cw_eid
FROM cw_Note AS N, evaluee_relation AS rel_evaluee0, todo_by_relation AS rel_todo_by1
WHERE ((rel_evaluee0.eid_to=N.cw_eid) OR (rel_todo_by1.eid_from=N.cw_eid))'''),
-
+
("Any X WHERE X concerne B or C concerne X, B eid 12, C eid 13",
'''SELECT X.cw_eid
FROM concerne_relation AS rel_concerne0, concerne_relation AS rel_concerne1, cw_Affaire AS X
@@ -226,19 +226,19 @@
('Any X WHERE T tags X',
'''SELECT rel_tags0.eid_to
FROM tags_relation AS rel_tags0'''),
-
+
('Any X WHERE X in_basket B, B eid 12',
'''SELECT rel_in_basket0.eid_from
FROM in_basket_relation AS rel_in_basket0
WHERE rel_in_basket0.eid_to=12'''),
-
+
('Any SEN,RN,OEN WHERE X from_entity SE, SE eid 44, X relation_type R, R eid 139, X to_entity OE, OE eid 42, R name RN, SE name SEN, OE name OEN',
'''SELECT SE.cw_name, R.cw_name, OE.cw_name
-FROM cw_CWEType AS OE, cw_CWEType AS SE, cw_CWAttribute AS X, cw_CWRType AS R
+FROM cw_CWAttribute AS X, cw_CWEType AS OE, cw_CWEType AS SE, cw_CWRType AS R
WHERE X.cw_from_entity=44 AND SE.cw_eid=44 AND X.cw_relation_type=139 AND R.cw_eid=139 AND X.cw_to_entity=42 AND OE.cw_eid=42
UNION ALL
SELECT SE.cw_name, R.cw_name, OE.cw_name
-FROM cw_CWEType AS OE, cw_CWEType AS SE, cw_CWRelation AS X, cw_CWRType AS R
+FROM cw_CWEType AS OE, cw_CWEType AS SE, cw_CWRType AS R, cw_CWRelation AS X
WHERE X.cw_from_entity=44 AND SE.cw_eid=44 AND X.cw_relation_type=139 AND R.cw_eid=139 AND X.cw_to_entity=42 AND OE.cw_eid=42'''),
# Any O WHERE NOT S corrected_in O, S eid %(x)s, S concerns P, O version_of P, O in_state ST, NOT ST name "published", O modification_date MTIME ORDERBY MTIME DESC LIMIT 9
@@ -264,7 +264,7 @@
FROM evaluee_relation AS rel_evaluee1, todo_by_relation AS rel_todo_by0
WHERE rel_evaluee1.eid_to=rel_todo_by0.eid_from AND rel_todo_by0.eid_to=2 AND rel_evaluee1.eid_from=3'''),
-
+
(' Any X,U WHERE C owned_by U, NOT X owned_by U, C eid 1, X eid 2',
'''SELECT 2, rel_owned_by0.eid_to
FROM owned_by_relation AS rel_owned_by0
@@ -279,7 +279,7 @@
"""SELECT C.cw_eid
FROM cw_Card AS C
WHERE EXISTS(SELECT 1 FROM documented_by_relation AS rel_documented_by0 WHERE rel_documented_by0.eid_to=C.cw_eid)"""),
-
+
('Any C WHERE C is Card, EXISTS(X documented_by C, X eid 12)',
"""SELECT C.cw_eid
FROM cw_Card AS C
@@ -323,7 +323,7 @@
'''SELECT Y.cw_login
FROM cw_CWUser AS X, cw_CWUser AS Y
WHERE X.cw_login=admin AND NOT X.cw_eid=Y.cw_eid'''),
-
+
('Any L WHERE X login "admin", X identity Y?, Y login L',
'''SELECT Y.cw_login
FROM cw_CWUser AS X LEFT OUTER JOIN cw_CWUser AS Y ON (X.cw_eid=Y.cw_eid)
@@ -438,7 +438,7 @@
SELECT DISTINCT X.cw_eid, Y.cw_eid
FROM cw_CWRType AS X, cw_RQLExpression AS Y
WHERE X.cw_name=CWGroup AND Y.cw_eid IN(1, 2, 3) AND NOT EXISTS(SELECT 1 FROM read_permission_relation AS rel_read_permission0 WHERE rel_read_permission0.eid_from=X.cw_eid AND rel_read_permission0.eid_to=Y.cw_eid)'''),
-
+
# neged relation, can't be inveriant
('Any X,Y WHERE X name "CWGroup", Y eid IN(1, 2, 3), NOT X read_permission Y',
'''SELECT X.cw_eid, Y.cw_eid
@@ -497,7 +497,7 @@
SELECT X.cw_eid AS C0, X.cw_name AS C1
FROM cw_Transition AS X) AS T1
GROUP BY T1.C1'''),
-
+
('Any MAX(X)+MIN(LENGTH(D)), N GROUPBY N ORDERBY 1, N, DF WHERE X name N, X data D, X data_format DF;',
'''SELECT (MAX(T1.C1) + MIN(LENGTH(T1.C0))), T1.C2 FROM (SELECT X.cw_data AS C0, X.cw_eid AS C1, X.cw_name AS C2, X.cw_data_format AS C3
FROM cw_File AS X
@@ -511,7 +511,7 @@
'''SELECT T1.C0 FROM (SELECT DISTINCT A.cw_sujet AS C0, A.cw_ref AS C1
FROM cw_Affaire AS A
ORDER BY 2) AS T1'''),
-
+
('DISTINCT Any MAX(X)+MIN(LENGTH(D)), N GROUPBY N ORDERBY 2, DF WHERE X name N, X data D, X data_format DF;',
'''SELECT T1.C0,T1.C1 FROM (SELECT DISTINCT (MAX(T1.C1) + MIN(LENGTH(T1.C0))) AS C0, T1.C2 AS C1, T1.C3 AS C2 FROM (SELECT DISTINCT X.cw_data AS C0, X.cw_eid AS C1, X.cw_name AS C2, X.cw_data_format AS C3
FROM cw_File AS X
@@ -528,7 +528,7 @@
FROM cw_Tag AS T
WHERE NOT (T.cw_name IN(t1, t2)) AND EXISTS(SELECT 1 FROM tags_relation AS rel_tags0, cw_CWGroup AS X WHERE rel_tags0.eid_from=T.cw_eid AND rel_tags0.eid_to=X.cw_eid UNION SELECT 1 FROM tags_relation AS rel_tags1, cw_CWUser AS X WHERE rel_tags1.eid_from=T.cw_eid AND rel_tags1.eid_to=X.cw_eid)'''),
- # must not use a relation in EXISTS scope to inline a variable
+ # must not use a relation in EXISTS scope to inline a variable
('Any U WHERE U eid IN (1,2), EXISTS(X owned_by U)',
'''SELECT U.cw_eid
FROM cw_CWUser AS U
@@ -551,17 +551,17 @@
('Any MAX(X) WHERE X is Note',
'''SELECT MAX(X.cw_eid)
FROM cw_Note AS X'''),
-
+
('Any X WHERE X eid > 12',
'''SELECT X.eid
FROM entities AS X
WHERE X.eid>12'''),
-
+
('Any X WHERE X eid > 12, X is Note',
"""SELECT X.eid
FROM entities AS X
WHERE X.type='Note' AND X.eid>12"""),
-
+
('Any X, T WHERE X eid > 12, X title T',
"""SELECT X.cw_eid, X.cw_title
FROM cw_Bookmark AS X
@@ -581,14 +581,14 @@
('Any X GROUPBY X WHERE X eid 12',
'''SELECT 12'''),
-
+
('Any X GROUPBY X ORDERBY Y WHERE X eid 12, X login Y',
'''SELECT X.cw_eid
FROM cw_CWUser AS X
WHERE X.cw_eid=12
GROUP BY X.cw_eid
ORDER BY X.cw_login'''),
-
+
('Any U,COUNT(X) GROUPBY U WHERE U eid 12, X owned_by U HAVING COUNT(X) > 10',
'''SELECT rel_owned_by0.eid_to, COUNT(rel_owned_by0.eid_from)
FROM owned_by_relation AS rel_owned_by0
@@ -600,7 +600,7 @@
'''SELECT T1.C0 FROM (SELECT DISTINCT U.cw_login AS C0, STOCKPROC(U.cw_login) AS C1
FROM cw_CWUser AS U
ORDER BY 2) AS T1'''),
-
+
('DISTINCT Any X ORDERBY Y WHERE B bookmarked_by X, X login Y',
'''SELECT T1.C0 FROM (SELECT DISTINCT X.cw_eid AS C0, X.cw_login AS C1
FROM bookmarked_by_relation AS rel_bookmarked_by0, cw_CWUser AS X
@@ -642,27 +642,27 @@
'''SELECT X.cw_eid
FROM cw_Personne AS X
WHERE NOT EXISTS(SELECT 1 FROM evaluee_relation AS rel_evaluee0 WHERE rel_evaluee0.eid_from=X.cw_eid)'''),
-
+
("Note N WHERE NOT X evaluee N, X eid 0",
'''SELECT N.cw_eid
FROM cw_Note AS N
WHERE NOT EXISTS(SELECT 1 FROM evaluee_relation AS rel_evaluee0 WHERE rel_evaluee0.eid_from=0 AND rel_evaluee0.eid_to=N.cw_eid)'''),
-
+
('Any X WHERE NOT X travaille S, X is Personne',
'''SELECT X.cw_eid
FROM cw_Personne AS X
WHERE NOT EXISTS(SELECT 1 FROM travaille_relation AS rel_travaille0 WHERE rel_travaille0.eid_from=X.cw_eid)'''),
-
+
("Personne P where not P datenaiss TODAY",
'''SELECT P.cw_eid
FROM cw_Personne AS P
WHERE NOT (DATE(P.cw_datenaiss)=CURRENT_DATE)'''),
-
+
("Personne P where NOT P concerne A",
'''SELECT P.cw_eid
FROM cw_Personne AS P
WHERE NOT EXISTS(SELECT 1 FROM concerne_relation AS rel_concerne0 WHERE rel_concerne0.eid_from=P.cw_eid)'''),
-
+
("Affaire A where not P concerne A",
'''SELECT A.cw_eid
FROM cw_Affaire AS A
@@ -676,7 +676,7 @@
'''SELECT rel_tags0.eid_to
FROM tags_relation AS rel_tags0
WHERE NOT (rel_tags0.eid_from=28258)'''),
-
+
('Any S WHERE T is Tag, T name TN, NOT T eid 28258, T tags S, S name SN',
'''SELECT S.cw_eid
FROM cw_CWGroup AS S, cw_Tag AS T, tags_relation AS rel_tags0
@@ -690,7 +690,7 @@
FROM cw_Tag AS S, cw_Tag AS T, tags_relation AS rel_tags0
WHERE NOT (T.cw_eid=28258) AND rel_tags0.eid_from=T.cw_eid AND rel_tags0.eid_to=S.cw_eid'''),
-
+
('Any X,Y WHERE X created_by Y, X eid 5, NOT Y eid 6',
'''SELECT 5, rel_created_by0.eid_to
FROM created_by_relation AS rel_created_by0
@@ -703,11 +703,11 @@
('Any Y WHERE NOT Y evaluee X',
'''SELECT Y.cw_eid
-FROM cw_Division AS Y
+FROM cw_CWUser AS Y
WHERE NOT EXISTS(SELECT 1 FROM evaluee_relation AS rel_evaluee0 WHERE rel_evaluee0.eid_from=Y.cw_eid)
UNION ALL
SELECT Y.cw_eid
-FROM cw_CWUser AS Y
+FROM cw_Division AS Y
WHERE NOT EXISTS(SELECT 1 FROM evaluee_relation AS rel_evaluee0 WHERE rel_evaluee0.eid_from=Y.cw_eid)
UNION ALL
SELECT Y.cw_eid
@@ -726,7 +726,7 @@
'''SELECT X.cw_eid
FROM cw_Note AS X
WHERE NOT EXISTS(SELECT 1 FROM evaluee_relation AS rel_evaluee0,cw_CWUser AS Y WHERE rel_evaluee0.eid_from=Y.cw_eid AND rel_evaluee0.eid_to=X.cw_eid)'''),
-
+
('Any X,T WHERE X title T, NOT X is Bookmark',
'''SELECT DISTINCT X.cw_eid, X.cw_title
FROM cw_Card AS X
@@ -816,7 +816,8 @@
),
('Any GN, TN ORDERBY GN WHERE T tags G?, T name TN, G name GN',
- '''SELECT _T0.C1, T.cw_name
+ '''
+SELECT _T0.C1, T.cw_name
FROM cw_Tag AS T LEFT OUTER JOIN tags_relation AS rel_tags0 ON (rel_tags0.eid_from=T.cw_eid) LEFT OUTER JOIN (SELECT G.cw_eid AS C0, G.cw_name AS C1
FROM cw_CWGroup AS G
UNION ALL
@@ -866,10 +867,11 @@
('Any T,G,S WHERE T tags G?, G in_state S?, S name "hop", G is CWUser',
'''SELECT T.cw_eid, G.cw_eid, S.cw_eid
FROM cw_Tag AS T LEFT OUTER JOIN tags_relation AS rel_tags0 ON (rel_tags0.eid_from=T.cw_eid) LEFT OUTER JOIN cw_CWUser AS G ON (rel_tags0.eid_to=G.cw_eid) LEFT OUTER JOIN cw_State AS S ON (G.cw_in_state=S.cw_eid AND S.cw_name=hop)'''),
-
+
# two optional variables with additional restriction on an ambigous inlined relation
('Any T,G,S WHERE T tags G?, G in_state S?, S name "hop"',
- '''SELECT T.cw_eid, _T0.C0, _T0.C1
+ '''
+SELECT T.cw_eid, _T0.C0, _T0.C1
FROM cw_Tag AS T LEFT OUTER JOIN tags_relation AS rel_tags0 ON (rel_tags0.eid_from=T.cw_eid) LEFT OUTER JOIN (SELECT G.cw_eid AS C0, S.cw_eid AS C1
FROM cw_Affaire AS G LEFT OUTER JOIN cw_State AS S ON (G.cw_in_state=S.cw_eid AND S.cw_name=hop)
UNION ALL
@@ -886,7 +888,7 @@
'''SELECT rel_travaille0.eid_from
FROM cw_Societe AS S, travaille_relation AS rel_travaille0
WHERE rel_travaille0.eid_to=S.cw_eid AND S.cw_fax=S.cw_tel'''),
-
+
("Personne P where X eid 0, X creation_date D, P datenaiss < D, X is Affaire",
'''SELECT P.cw_eid
FROM cw_Affaire AS X, cw_Personne AS P
@@ -959,29 +961,29 @@
# FROM connait_relation AS rel_connait0
# WHERE rel_connait0.eid_to=0'''
),
-
+
('Any P WHERE X connait P',
'''SELECT DISTINCT P.cw_eid
FROM connait_relation AS rel_connait0, cw_Personne AS P
WHERE (rel_connait0.eid_to=P.cw_eid OR rel_connait0.eid_from=P.cw_eid)'''
),
-
+
('Any X WHERE X connait P',
'''SELECT DISTINCT X.cw_eid
FROM connait_relation AS rel_connait0, cw_Personne AS X
WHERE (rel_connait0.eid_from=X.cw_eid OR rel_connait0.eid_to=X.cw_eid)'''
),
-
+
('Any P WHERE X eid 0, NOT X connait P',
'''SELECT P.cw_eid
FROM cw_Personne AS P
WHERE NOT EXISTS(SELECT 1 FROM connait_relation AS rel_connait0 WHERE (rel_connait0.eid_from=0 AND rel_connait0.eid_to=P.cw_eid OR rel_connait0.eid_to=0 AND rel_connait0.eid_from=P.cw_eid))'''),
-
+
('Any P WHERE NOT X connait P',
'''SELECT P.cw_eid
FROM cw_Personne AS P
WHERE NOT EXISTS(SELECT 1 FROM connait_relation AS rel_connait0 WHERE (rel_connait0.eid_to=P.cw_eid OR rel_connait0.eid_from=P.cw_eid))'''),
-
+
('Any X WHERE NOT X connait P',
'''SELECT X.cw_eid
FROM cw_Personne AS X
@@ -991,7 +993,7 @@
'''SELECT DISTINCT P.cw_eid
FROM connait_relation AS rel_connait0, cw_Personne AS P
WHERE (rel_connait0.eid_to=P.cw_eid OR rel_connait0.eid_from=P.cw_eid) AND P.cw_nom=nom'''),
-
+
('Any X WHERE X connait P, P nom "nom"',
'''SELECT DISTINCT X.cw_eid
FROM connait_relation AS rel_connait0, cw_Personne AS P, cw_Personne AS X
@@ -1018,12 +1020,12 @@
'''SELECT P.cw_eid, P.cw_nom
FROM cw_Note AS N, cw_Personne AS P
WHERE N.cw_ecrit_par=P.cw_eid AND N.cw_eid=0'''),
-
+
('Any N WHERE NOT N ecrit_par P, P nom "toto"',
'''SELECT DISTINCT N.cw_eid
FROM cw_Note AS N, cw_Personne AS P
WHERE (N.cw_ecrit_par IS NULL OR N.cw_ecrit_par!=P.cw_eid) AND P.cw_nom=toto'''),
-
+
('Any P WHERE N ecrit_par P, N eid 0',
'''SELECT N.cw_ecrit_par
FROM cw_Note AS N
@@ -1055,7 +1057,7 @@
SELECT S.cw_in_state
FROM cw_Note AS S
WHERE S.cw_eid=0 AND S.cw_in_state IS NOT NULL''')
-
+
]
INTERSECT = [
@@ -1080,7 +1082,7 @@
SELECT X.cw_nom
FROM cw_Personne AS X
WHERE NOT EXISTS(SELECT 1 FROM travaille_relation AS rel_travaille0,cw_Societe AS S WHERE rel_travaille0.eid_from=X.cw_eid AND rel_travaille0.eid_to=S.cw_eid)'''),
-
+
('Any PN WHERE NOT X travaille S, S nom PN, S is IN(Division, Societe)',
'''SELECT S.cw_nom
FROM cw_Division AS S
@@ -1089,7 +1091,7 @@
SELECT S.cw_nom
FROM cw_Societe AS S
WHERE NOT EXISTS(SELECT 1 FROM travaille_relation AS rel_travaille0 WHERE rel_travaille0.eid_to=S.cw_eid)'''),
-
+
('Personne X WHERE NOT X travaille S, S nom "chouette"',
'''SELECT X.cw_eid
FROM cw_Division AS S, cw_Personne AS X
@@ -1102,7 +1104,7 @@
SELECT X.cw_eid
FROM cw_Personne AS X, cw_SubDivision AS S
WHERE NOT EXISTS(SELECT 1 FROM travaille_relation AS rel_travaille0 WHERE rel_travaille0.eid_from=X.cw_eid AND rel_travaille0.eid_to=S.cw_eid) AND S.cw_nom=chouette'''),
-
+
('Any X WHERE X is ET, ET eid 2',
'''SELECT rel_is0.eid_from
FROM is_relation AS rel_is0
@@ -1110,14 +1112,14 @@
]
from logilab.common.adbh import ADV_FUNC_HELPER_DIRECTORY
-
+
class PostgresSQLGeneratorTC(RQLGeneratorTC):
schema = schema
-
+
#capture = True
def setUp(self):
RQLGeneratorTC.setUp(self)
- indexer = get_indexer('postgres', 'utf8')
+ indexer = get_indexer('postgres', 'utf8')
dbms_helper = ADV_FUNC_HELPER_DIRECTORY['postgres']
dbms_helper.fti_uid_attr = indexer.uid_attr
dbms_helper.fti_table = indexer.table
@@ -1127,7 +1129,7 @@
def _norm_sql(self, sql):
return sql.strip()
-
+
def _check(self, rql, sql, varmap=None):
try:
union = self._prepare(rql)
@@ -1141,11 +1143,11 @@
print '!='
print sql.strip()
raise
-
+
def _parse(self, rqls):
for rql, sql in rqls:
yield self._check, rql, sql
-
+
def _checkall(self, rql, sql):
try:
rqlst = self._prepare(rql)
@@ -1191,7 +1193,7 @@
'''SELECT rel_in_basket0.eid_from
FROM in_basket_relation AS rel_in_basket0
WHERE rel_in_basket0.eid_to=12''')
-
+
self._check('Any X WHERE X in_basket B, B eid 12',
'''SELECT rel_in_basket0.eid_from
FROM in_basket_relation AS rel_in_basket0
@@ -1212,7 +1214,7 @@
def test_parser_parse(self):
for t in self._parse(PARSER):
yield t
-
+
def test_basic_parse(self):
for t in self._parse(BASIC):
yield t
@@ -1232,15 +1234,15 @@
def test_multiple_sel_parse(self):
for t in self._parse(MULTIPLE_SEL):
yield t
-
+
def test_functions(self):
for t in self._parse(FUNCS):
yield t
-
+
def test_negation(self):
for t in self._parse(NEGATIONS):
yield t
-
+
def test_intersection(self):
for t in self._parse(INTERSECT):
yield t
@@ -1259,7 +1261,7 @@
ORDER BY 1)'''),
)):
yield t
-
+
def test_subquery(self):
for t in self._parse((
@@ -1274,7 +1276,7 @@
(SELECT XX.cw_name AS C0
FROM cw_Transition AS XX)) AS _T0
ORDER BY 1'''),
-
+
('Any N,NX ORDERBY NX WITH N,NX BEING '
'((Any N,COUNT(X) GROUPBY N WHERE X name N, X is State HAVING COUNT(X)>1)'
' UNION '
@@ -1289,7 +1291,7 @@
FROM cw_Transition AS X
GROUP BY X.cw_name
HAVING COUNT(X.cw_eid)>1)) AS _T0
-ORDER BY 2'''),
+ORDER BY 2'''),
('Any N,COUNT(X) GROUPBY N HAVING COUNT(X)>1 '
'WITH X, N BEING ((Any X, N WHERE X name N, X is State) UNION '
@@ -1317,7 +1319,7 @@
)):
yield t
-
+
def test_subquery_error(self):
rql = ('Any N WHERE X name N WITH X BEING '
'((Any X WHERE X is State)'
@@ -1325,32 +1327,32 @@
' (Any X WHERE X is Transition))')
rqlst = self._prepare(rql)
self.assertRaises(BadRQLQuery, self.o.generate, rqlst)
-
+
def test_symetric(self):
for t in self._parse(SYMETRIC):
yield t
-
+
def test_inline(self):
for t in self._parse(INLINE):
yield t
-
+
def test_has_text(self):
for t in self._parse((
('Any X WHERE X has_text "toto tata"',
"""SELECT appears0.uid
FROM appears AS appears0
WHERE appears0.words @@ to_tsquery('default', 'toto&tata')"""),
-
+
('Personne X WHERE X has_text "toto tata"',
"""SELECT X.eid
FROM appears AS appears0, entities AS X
WHERE appears0.words @@ to_tsquery('default', 'toto&tata') AND appears0.uid=X.eid AND X.type='Personne'"""),
-
+
('Personne X WHERE X has_text %(text)s',
"""SELECT X.eid
FROM appears AS appears0, entities AS X
WHERE appears0.words @@ to_tsquery('default', 'hip&hop&momo') AND appears0.uid=X.eid AND X.type='Personne'"""),
-
+
('Any X WHERE X has_text "toto tata", X name "tutu"',
"""SELECT X.cw_eid
FROM appears AS appears0, cw_Basket AS X
@@ -1406,10 +1408,10 @@
class SqliteSQLGeneratorTC(PostgresSQLGeneratorTC):
-
+
def setUp(self):
RQLGeneratorTC.setUp(self)
- indexer = get_indexer('sqlite', 'utf8')
+ indexer = get_indexer('sqlite', 'utf8')
dbms_helper = ADV_FUNC_HELPER_DIRECTORY['sqlite']
dbms_helper.fti_uid_attr = indexer.uid_attr
dbms_helper.fti_table = indexer.table
@@ -1434,7 +1436,7 @@
ORDER BY 1'''),
)):
yield t
-
+
def test_subquery(self):
# NOTE: no paren around UNION with sqlitebackend
@@ -1451,7 +1453,7 @@
SELECT XX.cw_name AS C0
FROM cw_Transition AS XX) AS _T0
ORDER BY 1'''),
-
+
('Any N,NX ORDERBY NX WITH N,NX BEING '
'((Any N,COUNT(X) GROUPBY N WHERE X name N, X is State HAVING COUNT(X)>1)'
' UNION '
@@ -1466,7 +1468,7 @@
FROM cw_Transition AS X
GROUP BY X.cw_name
HAVING COUNT(X.cw_eid)>1) AS _T0
-ORDER BY 2'''),
+ORDER BY 2'''),
('Any N,COUNT(X) GROUPBY N HAVING COUNT(X)>1 '
'WITH X, N BEING ((Any X, N WHERE X name N, X is State) UNION '
@@ -1481,24 +1483,24 @@
HAVING COUNT(_T0.C0)>1'''),
)):
yield t
-
+
def test_has_text(self):
for t in self._parse((
('Any X WHERE X has_text "toto tata"',
"""SELECT appears0.uid
FROM appears AS appears0
WHERE appears0.word_id IN (SELECT word_id FROM word WHERE word in ('toto', 'tata'))"""),
-
+
('Any X WHERE X has_text %(text)s',
"""SELECT appears0.uid
FROM appears AS appears0
WHERE appears0.word_id IN (SELECT word_id FROM word WHERE word in ('hip', 'hop', 'momo'))"""),
-
+
('Personne X WHERE X has_text "toto tata"',
"""SELECT X.eid
FROM appears AS appears0, entities AS X
WHERE appears0.word_id IN (SELECT word_id FROM word WHERE word in ('toto', 'tata')) AND appears0.uid=X.eid AND X.type='Personne'"""),
-
+
('Any X WHERE X has_text "toto tata", X name "tutu"',
"""SELECT X.cw_eid
FROM appears AS appears0, cw_Basket AS X
@@ -1536,7 +1538,7 @@
def setUp(self):
RQLGeneratorTC.setUp(self)
- indexer = get_indexer('mysql', 'utf8')
+ indexer = get_indexer('mysql', 'utf8')
dbms_helper = ADV_FUNC_HELPER_DIRECTORY['mysql']
dbms_helper.fti_uid_attr = indexer.uid_attr
dbms_helper.fti_table = indexer.table
@@ -1545,7 +1547,7 @@
self.o = SQLGenerator(schema, dbms_helper)
def _norm_sql(self, sql):
- return sql.strip().replace(' ILIKE ', ' LIKE ')
+ return sql.strip().replace(' ILIKE ', ' LIKE ').replace('TRUE', '1').replace('FALSE', '0')
def test_from_clause_needed(self):
queries = [("Any 1 WHERE EXISTS(T is CWGroup, T name 'managers')",
@@ -1606,16 +1608,16 @@
]
for t in self._parse(queries):
yield t
-
+
def test_ambigous_exists_no_from_clause(self):
self._check('Any COUNT(U) WHERE U eid 1, EXISTS (P owned_by U, P is IN (Note, Affaire))',
'''SELECT COUNT(1)
FROM (SELECT 1) AS _T
-WHERE EXISTS(SELECT 1 FROM owned_by_relation AS rel_owned_by0, cw_Affaire AS P WHERE rel_owned_by0.eid_from=P.cw_eid AND rel_owned_by0.eid_to=1 UNION SELECT 1 FROM owned_by_relation AS rel_owned_by1, cw_Note AS P WHERE rel_owned_by1.eid_from=P.cw_eid AND rel_owned_by1.eid_to=1)''')
-
+WHERE EXISTS(SELECT 1 FROM owned_by_relation AS rel_owned_by0, cw_Affaire AS P WHERE rel_owned_by0.eid_from=P.cw_eid AND rel_owned_by0.eid_to=1 UNION SELECT 1 FROM owned_by_relation AS rel_owned_by1, cw_Note AS P WHERE rel_owned_by1.eid_from=P.cw_eid AND rel_owned_by1.eid_to=1)''')
-
+
+
if __name__ == '__main__':
unittest_main()
--- a/server/test/unittest_rqlrewrite.py Wed May 13 16:28:21 2009 +0200
+++ b/server/test/unittest_rqlrewrite.py Wed May 13 16:59:50 2009 +0200
@@ -11,7 +11,7 @@
config.bootstrap_cubes()
schema = config.load_schema()
schema.add_relation_def(mock_object(subject='Card', name='in_state', object='State', cardinality='1*'))
-
+
rqlhelper = RQLHelper(schema, special_relations={'eid': 'uid',
'has_text': 'fti'})
@@ -20,7 +20,7 @@
def teardown_module(*args):
repotest.undo_monkey_patch()
-
+
def eid_func_map(eid):
return {1: 'CWUser',
2: 'Card'}[eid]
@@ -41,7 +41,7 @@
rewriter = RQLRewriter(FakeQuerier, mock_object(user=(mock_object(eid=1))))
for v, snippets in snippets_map.items():
snippets_map[v] = [mock_object(snippet_rqlst=parse('Any X WHERE '+snippet).children[0],
- expression='Any X WHERE '+snippet)
+ expression='Any X WHERE '+snippet)
for snippet in snippets]
rqlhelper.compute_solutions(rqlst.children[0], {'eid': eid_func_map}, kwargs=kwargs)
solutions = rqlst.children[0].solutions
@@ -62,10 +62,10 @@
* optimisation: detecter les relations utilisees dans les rqlexpressions qui
sont presentes dans la requete de depart pour les reutiliser si possible
-
+
* "has_<ACTION>_permission" ?
"""
-
+
def test_base_var(self):
card_constraint = ('X in_state S, U in_group G, P require_state S,'
'P name "read", P require_group G')
@@ -75,7 +75,7 @@
u"Any C WHERE C is Card, B eid %(D)s, "
"EXISTS(C in_state A, B in_group E, F require_state A, "
"F name 'read', F require_group E, A is State, E is CWGroup, F is CWPermission)")
-
+
def test_multiple_var(self):
card_constraint = ('X in_state S, U in_group G, P require_state S,'
'P name "read", P require_group G')
@@ -91,7 +91,7 @@
"(EXISTS(S ref LIKE 'PUBLIC%')) OR (EXISTS(B in_group G, G name 'public', G is CWGroup)), "
"S is Affaire")
self.failUnless('D' in kwargs)
-
+
def test_or(self):
constraint = '(X identity U) OR (X in_state ST, CL identity U, CL in_state ST, ST name "subscribed")'
rqlst = parse('Any S WHERE S owned_by C, C eid %(u)s')
@@ -100,7 +100,7 @@
"Any S WHERE S owned_by C, C eid %(u)s, A eid %(B)s, "
"EXISTS((C identity A) OR (C in_state D, E identity A, "
"E in_state D, D name 'subscribed'), D is State, E is CWUser), "
- "S is IN(Affaire, Basket, Bookmark, Card, Comment, Division, CWCache, CWConstraint, CWConstraintType, CWEType, CWAttribute, CWGroup, CWRelation, CWPermission, CWProperty, CWRType, CWUser, Email, EmailAddress, EmailPart, EmailThread, File, Folder, Image, Note, Personne, RQLExpression, Societe, State, SubDivision, Tag, TrInfo, Transition)")
+ "S is IN(Affaire, Basket, Bookmark, CWAttribute, CWCache, CWConstraint, CWConstraintType, CWEType, CWGroup, CWPermission, CWProperty, CWRType, CWRelation, CWUser, Card, Comment, Division, Email, EmailAddress, EmailPart, EmailThread, File, Folder, Image, Note, Personne, RQLExpression, Societe, State, SubDivision, Tag, TrInfo, Transition)")
def test_simplified_rqlst(self):
card_constraint = ('X in_state S, U in_group G, P require_state S,'
@@ -129,7 +129,7 @@
"WITH C,T BEING "
"(Any C,T WHERE C in_state B, D in_group F, G require_state B, G name 'read', "
"G require_group F, C title T, D eid %(A)s, C is Card)")
-
+
def test_relation_optimization(self):
# since Card in_state State as monovalued cardinality, the in_state
# relation used in the rql expression can be ignored and S replaced by
@@ -149,7 +149,7 @@
trinfo_constraint = ('X wf_info_for Y, Y require_permission P, P name "read"')
rqlst = parse('Any U,T WHERE U is CWUser, T wf_info_for U')
self.assertRaises(Unauthorized, rewrite, rqlst, {'T': (trinfo_constraint,)}, {})
-
+
def test_unsupported_constraint_2(self):
trinfo_constraint = ('X wf_info_for Y, Y require_permission P, P name "read"')
rqlst = parse('Any U,T WHERE U is CWUser, T wf_info_for U')
@@ -165,23 +165,23 @@
rewrite(rqlst, {'T': (trinfo_constraint, 'X in_group G, G name "managers"')}, {})
self.failUnlessEqual(rqlst.as_string(),
u'XXX dunno what should be generated')
-
+
def test_add_ambiguity_exists(self):
constraint = ('X concerne Y')
rqlst = parse('Affaire X')
rewrite(rqlst, {'X': (constraint,)}, {})
self.failUnlessEqual(rqlst.as_string(),
u"Any X WHERE X is Affaire, (((EXISTS(X concerne A, A is Division)) OR (EXISTS(X concerne D, D is SubDivision))) OR (EXISTS(X concerne C, C is Societe))) OR (EXISTS(X concerne B, B is Note))")
-
+
def test_add_ambiguity_outerjoin(self):
constraint = ('X concerne Y')
rqlst = parse('Any X,C WHERE X? documented_by C')
rewrite(rqlst, {'X': (constraint,)}, {})
# ambiguity are kept in the sub-query, no need to be resolved using OR
self.failUnlessEqual(rqlst.as_string(),
- u"Any X,C WHERE X? documented_by C, C is Card WITH X BEING (Any X WHERE X concerne A, X is Affaire)")
-
-
-
+ u"Any X,C WHERE X? documented_by C, C is Card WITH X BEING (Any X WHERE X concerne A, X is Affaire)")
+
+
+
if __name__ == '__main__':
unittest_main()
--- a/server/test/unittest_schemaserial.py Wed May 13 16:28:21 2009 +0200
+++ b/server/test/unittest_schemaserial.py Wed May 13 16:59:50 2009 +0200
@@ -16,9 +16,9 @@
schema = loader.load(config)
from cubicweb.server.schemaserial import *
-
+
class Schema2RQLTC(TestCase):
-
+
def test_eschema2rql1(self):
self.assertListEquals(list(eschema2rql(schema.eschema('CWAttribute'))),
[
@@ -26,12 +26,12 @@
{'description': u'define a final relation: link a final relation type from a non final entity to a final entity type. used to build the application schema',
'meta': True, 'name': u'CWAttribute', 'final': False})
])
-
+
def test_eschema2rql2(self):
self.assertListEquals(list(eschema2rql(schema.eschema('String'))), [
('INSERT CWEType X: X description %(description)s,X final %(final)s,X meta %(meta)s,X name %(name)s',
{'description': u'', 'final': True, 'meta': True, 'name': u'String'})])
-
+
def test_eschema2rql_specialization(self):
self.assertListEquals(list(specialize2rql(schema)),
[
@@ -39,74 +39,76 @@
{'x': 'Division', 'et': 'Societe'}),
('SET X specializes ET WHERE X name %(x)s, ET name %(et)s',
{'x': 'SubDivision', 'et': 'Division'})])
-
+
def test_rschema2rql1(self):
self.assertListEquals(list(rschema2rql(schema.rschema('relation_type'))),
[
('INSERT CWRType X: X description %(description)s,X final %(final)s,X fulltext_container %(fulltext_container)s,X inlined %(inlined)s,X meta %(meta)s,X name %(name)s,X symetric %(symetric)s',
{'description': u'link a relation definition to its relation type', 'meta': True, 'symetric': False, 'name': u'relation_type', 'final' : False, 'fulltext_container': None, 'inlined': True}),
- ('INSERT CWRelation X: X cardinality %(cardinality)s,X composite %(composite)s,X description %(description)s,X ordernum %(ordernum)s,X relation_type ER,X from_entity SE,X to_entity OE WHERE SE name %(se)s,ER name %(rt)s,OE name %(oe)s',
- {'rt': 'relation_type', 'description': u'', 'composite': u'object', 'oe': 'CWRType',
- 'ordernum': 1, 'cardinality': u'1*', 'se': 'CWAttribute'}),
- ('INSERT CWConstraint X: X value %(value)s, X cstrtype CT, EDEF constrained_by X WHERE CT name %(ctname)s, EDEF relation_type ER, EDEF from_entity SE, EDEF to_entity OE, ER name %(rt)s, SE name %(se)s, OE name %(oe)s, EDEF is CWRelation',
- {'rt': 'relation_type', 'oe': 'CWRType', 'ctname': u'RQLConstraint', 'se': 'CWAttribute', 'value': u'O final TRUE'}),
+
('INSERT CWRelation X: X cardinality %(cardinality)s,X composite %(composite)s,X description %(description)s,X ordernum %(ordernum)s,X relation_type ER,X from_entity SE,X to_entity OE WHERE SE name %(se)s,ER name %(rt)s,OE name %(oe)s',
{'rt': 'relation_type', 'description': u'', 'composite': u'object', 'oe': 'CWRType',
'ordernum': 1, 'cardinality': u'1*', 'se': 'CWRelation'}),
('INSERT CWConstraint X: X value %(value)s, X cstrtype CT, EDEF constrained_by X WHERE CT name %(ctname)s, EDEF relation_type ER, EDEF from_entity SE, EDEF to_entity OE, ER name %(rt)s, SE name %(se)s, OE name %(oe)s, EDEF is CWRelation',
{'rt': 'relation_type', 'oe': 'CWRType', 'ctname': u'RQLConstraint', 'se': 'CWRelation', 'value': u'O final FALSE'}),
+
+ ('INSERT CWRelation X: X cardinality %(cardinality)s,X composite %(composite)s,X description %(description)s,X ordernum %(ordernum)s,X relation_type ER,X from_entity SE,X to_entity OE WHERE SE name %(se)s,ER name %(rt)s,OE name %(oe)s',
+ {'rt': 'relation_type', 'description': u'', 'composite': u'object', 'oe': 'CWRType',
+ 'ordernum': 1, 'cardinality': u'1*', 'se': 'CWAttribute'}),
+ ('INSERT CWConstraint X: X value %(value)s, X cstrtype CT, EDEF constrained_by X WHERE CT name %(ctname)s, EDEF relation_type ER, EDEF from_entity SE, EDEF to_entity OE, ER name %(rt)s, SE name %(se)s, OE name %(oe)s, EDEF is CWRelation',
+ {'rt': 'relation_type', 'oe': 'CWRType', 'ctname': u'RQLConstraint', 'se': 'CWAttribute', 'value': u'O final TRUE'}),
])
-
+
def test_rschema2rql2(self):
- expected = [
+ self.assertListEquals(list(rschema2rql(schema.rschema('add_permission'))),
+ [
('INSERT CWRType X: X description %(description)s,X final %(final)s,X fulltext_container %(fulltext_container)s,X inlined %(inlined)s,X meta %(meta)s,X name %(name)s,X symetric %(symetric)s', {'description': u'core relation giving to a group the permission to add an entity or relation type', 'meta': True, 'symetric': False, 'name': u'add_permission', 'final': False, 'fulltext_container': None, 'inlined': False}),
+
+ ('INSERT CWRelation X: X cardinality %(cardinality)s,X composite %(composite)s,X description %(description)s,X ordernum %(ordernum)s,X relation_type ER,X from_entity SE,X to_entity OE WHERE SE name %(se)s,ER name %(rt)s,OE name %(oe)s',
+ {'rt': 'add_permission', 'description': u'groups allowed to add entities/relations of this type', 'composite': None, 'oe': 'CWGroup', 'ordernum': 3, 'cardinality': u'**', 'se': 'CWEType'}),
+ ('INSERT CWRelation X: X cardinality %(cardinality)s,X composite %(composite)s,X description %(description)s,X ordernum %(ordernum)s,X relation_type ER,X from_entity SE,X to_entity OE WHERE SE name %(se)s,ER name %(rt)s,OE name %(oe)s',
+ {'rt': 'add_permission', 'description': u'groups allowed to add entities/relations of this type', 'composite': None, 'oe': 'CWGroup', 'ordernum': 3, 'cardinality': u'**', 'se': 'CWRType'}),
+
('INSERT CWRelation X: X cardinality %(cardinality)s,X composite %(composite)s,X description %(description)s,X ordernum %(ordernum)s,X relation_type ER,X from_entity SE,X to_entity OE WHERE SE name %(se)s,ER name %(rt)s,OE name %(oe)s',
{'rt': 'add_permission', 'description': u'rql expression allowing to add entities/relations of this type', 'composite': 'subject', 'oe': 'RQLExpression', 'ordernum': 5, 'cardinality': u'*?', 'se': 'CWEType'}),
('INSERT CWRelation X: X cardinality %(cardinality)s,X composite %(composite)s,X description %(description)s,X ordernum %(ordernum)s,X relation_type ER,X from_entity SE,X to_entity OE WHERE SE name %(se)s,ER name %(rt)s,OE name %(oe)s',
{'rt': 'add_permission', 'description': u'rql expression allowing to add entities/relations of this type', 'composite': 'subject', 'oe': 'RQLExpression', 'ordernum': 5, 'cardinality': u'*?', 'se': 'CWRType'}),
-
- ('INSERT CWRelation X: X cardinality %(cardinality)s,X composite %(composite)s,X description %(description)s,X ordernum %(ordernum)s,X relation_type ER,X from_entity SE,X to_entity OE WHERE SE name %(se)s,ER name %(rt)s,OE name %(oe)s',
- {'rt': 'add_permission', 'description': u'groups allowed to add entities/relations of this type', 'composite': None, 'oe': 'CWGroup', 'ordernum': 3, 'cardinality': u'**', 'se': 'CWEType'}),
- ('INSERT CWRelation X: X cardinality %(cardinality)s,X composite %(composite)s,X description %(description)s,X ordernum %(ordernum)s,X relation_type ER,X from_entity SE,X to_entity OE WHERE SE name %(se)s,ER name %(rt)s,OE name %(oe)s',
- {'rt': 'add_permission', 'description': u'groups allowed to add entities/relations of this type', 'composite': None, 'oe': 'CWGroup', 'ordernum': 3, 'cardinality': u'**', 'se': 'CWRType'}),
- ]
- for i, (rql, args) in enumerate(rschema2rql(schema.rschema('add_permission'))):
- yield self.assertEquals, (rql, args), expected[i]
-
+ ])
+
def test_rschema2rql3(self):
- self.assertListEquals(list(rschema2rql(schema.rschema('cardinality'))),
+ self.assertListEquals(list(rschema2rql(schema.rschema('cardinality'))),
[
('INSERT CWRType X: X description %(description)s,X final %(final)s,X fulltext_container %(fulltext_container)s,X inlined %(inlined)s,X meta %(meta)s,X name %(name)s,X symetric %(symetric)s',
{'description': u'', 'meta': False, 'symetric': False, 'name': u'cardinality', 'final': True, 'fulltext_container': None, 'inlined': False}),
('INSERT CWAttribute X: X cardinality %(cardinality)s,X defaultval %(defaultval)s,X description %(description)s,X fulltextindexed %(fulltextindexed)s,X indexed %(indexed)s,X internationalizable %(internationalizable)s,X ordernum %(ordernum)s,X relation_type ER,X from_entity SE,X to_entity OE WHERE SE name %(se)s,ER name %(rt)s,OE name %(oe)s',
+ {'rt': 'cardinality', 'description': u'subject/object cardinality', 'internationalizable': True, 'fulltextindexed': False, 'ordernum': 5, 'defaultval': None, 'indexed': False, 'cardinality': u'?1', 'oe': 'String', 'se': 'CWRelation'}),
+ ('INSERT CWConstraint X: X value %(value)s, X cstrtype CT, EDEF constrained_by X WHERE CT name %(ctname)s, EDEF relation_type ER, EDEF from_entity SE, EDEF to_entity OE, ER name %(rt)s, SE name %(se)s, OE name %(oe)s, EDEF is CWAttribute',
+ {'rt': 'cardinality', 'oe': 'String', 'ctname': u'SizeConstraint', 'se': 'CWRelation', 'value': u'max=2'}),
+ ('INSERT CWConstraint X: X value %(value)s, X cstrtype CT, EDEF constrained_by X WHERE CT name %(ctname)s, EDEF relation_type ER, EDEF from_entity SE, EDEF to_entity OE, ER name %(rt)s, SE name %(se)s, OE name %(oe)s, EDEF is CWAttribute',
+ {'rt': 'cardinality', 'oe': 'String', 'ctname': u'StaticVocabularyConstraint', 'se': 'CWRelation', 'value': u"u'?*', u'1*', u'+*', u'**', u'?+', u'1+', u'++', u'*+', u'?1', u'11', u'+1', u'*1', u'??', u'1?', u'+?', u'*?'"}),
+
+ ('INSERT CWAttribute X: X cardinality %(cardinality)s,X defaultval %(defaultval)s,X description %(description)s,X fulltextindexed %(fulltextindexed)s,X indexed %(indexed)s,X internationalizable %(internationalizable)s,X ordernum %(ordernum)s,X relation_type ER,X from_entity SE,X to_entity OE WHERE SE name %(se)s,ER name %(rt)s,OE name %(oe)s',
{'rt': 'cardinality', 'description': u'subject/object cardinality', 'internationalizable': True, 'fulltextindexed': False, 'ordernum': 5, 'defaultval': None, 'indexed': False, 'cardinality': u'?1', 'oe': 'String', 'se': 'CWAttribute'}),
('INSERT CWConstraint X: X value %(value)s, X cstrtype CT, EDEF constrained_by X WHERE CT name %(ctname)s, EDEF relation_type ER, EDEF from_entity SE, EDEF to_entity OE, ER name %(rt)s, SE name %(se)s, OE name %(oe)s, EDEF is CWAttribute',
{'rt': 'cardinality', 'oe': 'String', 'ctname': u'SizeConstraint', 'se': 'CWAttribute', 'value': u'max=2'}),
('INSERT CWConstraint X: X value %(value)s, X cstrtype CT, EDEF constrained_by X WHERE CT name %(ctname)s, EDEF relation_type ER, EDEF from_entity SE, EDEF to_entity OE, ER name %(rt)s, SE name %(se)s, OE name %(oe)s, EDEF is CWAttribute',
{'rt': 'cardinality', 'oe': 'String', 'ctname': u'StaticVocabularyConstraint', 'se': 'CWAttribute', 'value': u"u'?1', u'11', u'??', u'1?'"}),
+ ])
- ('INSERT CWAttribute X: X cardinality %(cardinality)s,X defaultval %(defaultval)s,X description %(description)s,X fulltextindexed %(fulltextindexed)s,X indexed %(indexed)s,X internationalizable %(internationalizable)s,X ordernum %(ordernum)s,X relation_type ER,X from_entity SE,X to_entity OE WHERE SE name %(se)s,ER name %(rt)s,OE name %(oe)s',
- {'rt': 'cardinality', 'description': u'subject/object cardinality', 'internationalizable': True, 'fulltextindexed': False, 'ordernum': 5, 'defaultval': None, 'indexed': False, 'cardinality': u'?1', 'oe': 'String', 'se': 'CWRelation'}),
- ('INSERT CWConstraint X: X value %(value)s, X cstrtype CT, EDEF constrained_by X WHERE CT name %(ctname)s, EDEF relation_type ER, EDEF from_entity SE, EDEF to_entity OE, ER name %(rt)s, SE name %(se)s, OE name %(oe)s, EDEF is CWAttribute',
- {'rt': 'cardinality', 'oe': 'String', 'ctname': u'SizeConstraint', 'se': 'CWRelation', 'value': u'max=2'}),
- ('INSERT CWConstraint X: X value %(value)s, X cstrtype CT, EDEF constrained_by X WHERE CT name %(ctname)s, EDEF relation_type ER, EDEF from_entity SE, EDEF to_entity OE, ER name %(rt)s, SE name %(se)s, OE name %(oe)s, EDEF is CWAttribute',
- {'rt': 'cardinality', 'oe': 'String', 'ctname': u'StaticVocabularyConstraint', 'se': 'CWRelation', 'value': u"u'?*', u'1*', u'+*', u'**', u'?+', u'1+', u'++', u'*+', u'?1', u'11', u'+1', u'*1', u'??', u'1?', u'+?', u'*?'"}),
- ])
-
def test_updateeschema2rql1(self):
self.assertListEquals(list(updateeschema2rql(schema.eschema('CWAttribute'))),
[('SET X description %(description)s,X final %(final)s,X meta %(meta)s,X name %(name)s WHERE X is CWEType, X name %(et)s',
{'description': u'define a final relation: link a final relation type from a non final entity to a final entity type. used to build the application schema', 'meta': True, 'et': 'CWAttribute', 'final': False, 'name': u'CWAttribute'}),
])
-
+
def test_updateeschema2rql2(self):
self.assertListEquals(list(updateeschema2rql(schema.eschema('String'))),
[('SET X description %(description)s,X final %(final)s,X meta %(meta)s,X name %(name)s WHERE X is CWEType, X name %(et)s',
{'description': u'', 'meta': True, 'et': 'String', 'final': True, 'name': u'String'})
])
-
+
def test_updaterschema2rql1(self):
self.assertListEquals(list(updaterschema2rql(schema.rschema('relation_type'))),
[
@@ -115,7 +117,7 @@
'description': u'link a relation definition to its relation type',
'meta': True, 'final': False, 'fulltext_container': None, 'inlined': True, 'name': u'relation_type'})
])
-
+
def test_updaterschema2rql2(self):
expected = [
('SET X description %(description)s,X final %(final)s,X fulltext_container %(fulltext_container)s,X inlined %(inlined)s,X meta %(meta)s,X name %(name)s,X symetric %(symetric)s WHERE X is CWRType, X name %(rt)s',
@@ -133,7 +135,7 @@
'guests': 2,
'owners': 3,
}
-
+
def test_eperms2rql1(self):
self.assertListEquals([rql for rql, kwargs in erperms2rql(schema.eschema('CWEType'), self.GROUP_MAPPING)],
['SET X read_permission Y WHERE X is CWEType, X name "CWEType", Y eid 2',
@@ -144,7 +146,7 @@
'SET X update_permission Y WHERE X is CWEType, X name "CWEType", Y eid 3',
'SET X delete_permission Y WHERE X is CWEType, X name "CWEType", Y eid 0',
])
-
+
def test_rperms2rql2(self):
self.assertListEquals([rql for rql, kwargs in erperms2rql(schema.rschema('read_permission'), self.GROUP_MAPPING)],
['SET X read_permission Y WHERE X is CWRType, X name "read_permission", Y eid 2',
@@ -153,7 +155,7 @@
'SET X add_permission Y WHERE X is CWRType, X name "read_permission", Y eid 0',
'SET X delete_permission Y WHERE X is CWRType, X name "read_permission", Y eid 0',
])
-
+
def test_rperms2rql3(self):
self.assertListEquals([rql for rql, kwargs in erperms2rql(schema.rschema('name'), self.GROUP_MAPPING)],
['SET X read_permission Y WHERE X is CWRType, X name "name", Y eid 2',
@@ -166,11 +168,11 @@
'SET X delete_permission Y WHERE X is CWRType, X name "name", Y eid 0',
'SET X delete_permission Y WHERE X is CWRType, X name "name", Y eid 1',
])
-
+
#def test_perms2rql(self):
# self.assertListEquals(perms2rql(schema, self.GROUP_MAPPING),
# ['INSERT CWEType X: X name 'Societe', X final FALSE'])
-
+
if __name__ == '__main__':
--- a/sobjects/test/data/bootstrap_cubes Wed May 13 16:28:21 2009 +0200
+++ b/sobjects/test/data/bootstrap_cubes Wed May 13 16:59:50 2009 +0200
@@ -1,1 +1,1 @@
-comment
+card,comment
--- a/test/unittest_vregistry.py Wed May 13 16:28:21 2009 +0200
+++ b/test/unittest_vregistry.py Wed May 13 16:59:50 2009 +0200
@@ -24,16 +24,6 @@
config.bootstrap_cubes()
self.vreg.schema = config.load_schema()
- def test_load(self):
- self.vreg.init_registration([WEBVIEWSDIR])
- self.vreg.load_file(join(WEBVIEWSDIR, 'cwuser.py'), 'cubicweb.web.views.cwuser')
- self.vreg.load_file(join(WEBVIEWSDIR, 'baseviews.py'), 'cubicweb.web.views.baseviews')
- fpvc = [v for v in self.vreg.registry_objects('views', 'primary')
- if v.__module__ == 'cubicweb.web.views.cwuser'][0]
- fpv = fpvc(None, None)
- # don't want a TypeError due to super call
- self.assertRaises(AttributeError, fpv.render_entity_attributes, None, None)
-
def test_load_interface_based_vojects(self):
self.vreg.init_registration([WEBVIEWSDIR])
self.vreg.load_file(join(BASE, 'entities', '__init__.py'), 'cubicweb.entities.__init__')