# HG changeset patch # User Sylvain Thénault # Date 1316187026 -7200 # Node ID f2a5805615f861df65199bda352e9f6f2866ff88 # Parent 02e141e41da23f51bb2eaf72e823bd998460b3f1# Parent aa30c665bd06afbe59ff7936a8c3fa99e78f0ef0 backport stable diff -r aa30c665bd06 -r f2a5805615f8 __pkginfo__.py --- a/__pkginfo__.py Fri Sep 16 16:42:42 2011 +0200 +++ b/__pkginfo__.py Fri Sep 16 17:30:26 2011 +0200 @@ -22,7 +22,7 @@ modname = distname = "cubicweb" -numversion = (3, 13, 5) +numversion = (3, 14, 0) version = '.'.join(str(num) for num in numversion) description = "a repository of entities / relations for knowledge management" @@ -40,7 +40,7 @@ ] __depends__ = { - 'logilab-common': '>= 0.56.2', + 'logilab-common': '>= 0.56.3', 'logilab-mtconverter': '>= 0.8.0', 'rql': '>= 0.28.0', 'yams': '>= 0.33.0', diff -r aa30c665bd06 -r f2a5805615f8 debian/control --- a/debian/control Fri Sep 16 16:42:42 2011 +0200 +++ b/debian/control Fri Sep 16 17:30:26 2011 +0200 @@ -35,7 +35,7 @@ Conflicts: cubicweb-multisources Replaces: cubicweb-multisources Provides: cubicweb-multisources -Depends: ${misc:Depends}, ${python:Depends}, cubicweb-common (= ${source:Version}), cubicweb-ctl (= ${source:Version}), python-logilab-database (>= 1.5.0), cubicweb-postgresql-support | cubicweb-mysql-support | python-pysqlite2, python-logilab-common (>= 0.56.2) +Depends: ${misc:Depends}, ${python:Depends}, cubicweb-common (= ${source:Version}), cubicweb-ctl (= ${source:Version}), python-logilab-database (>= 1.5.0), cubicweb-postgresql-support | cubicweb-mysql-support | python-pysqlite2 Recommends: pyro (<< 4.0.0), cubicweb-documentation (= ${source:Version}) Description: server part of the CubicWeb framework CubicWeb is a semantic web application framework. @@ -70,7 +70,7 @@ Architecture: all XB-Python-Version: ${python:Versions} Provides: cubicweb-web-frontend -Depends: ${misc:Depends}, ${python:Depends}, cubicweb-web (= ${source:Version}), cubicweb-ctl (= ${source:Version}), python-twisted-web, python-logilab-common (>= 0.56.2) +Depends: ${misc:Depends}, ${python:Depends}, cubicweb-web (= ${source:Version}), cubicweb-ctl (= ${source:Version}), python-twisted-web Recommends: pyro (<< 4.0.0), cubicweb-documentation (= ${source:Version}) Description: twisted-based web interface for the CubicWeb framework CubicWeb is a semantic web application framework. @@ -99,7 +99,7 @@ Package: cubicweb-common Architecture: all XB-Python-Version: ${python:Versions} -Depends: ${misc:Depends}, ${python:Depends}, graphviz, gettext, python-logilab-mtconverter (>= 0.8.0), python-logilab-common (>= 0.55.2), python-yams (>= 0.33.0), python-rql (>= 0.28.0), python-lxml +Depends: ${misc:Depends}, ${python:Depends}, graphviz, gettext, python-logilab-mtconverter (>= 0.8.0), python-logilab-common (>= 0.56.3), python-yams (>= 0.33.0), python-rql (>= 0.28.0), python-lxml Recommends: python-simpletal (>= 4.0), python-crypto Conflicts: cubicweb-core Replaces: cubicweb-core diff -r aa30c665bd06 -r f2a5805615f8 devtools/test/unittest_dbfill.py --- a/devtools/test/unittest_dbfill.py Fri Sep 16 16:42:42 2011 +0200 +++ b/devtools/test/unittest_dbfill.py Fri Sep 16 17:30:26 2011 +0200 @@ -72,7 +72,7 @@ """test value generation from a given domain value""" firstname = self.person_valgen.generate_attribute_value({}, 'firstname', 12) possible_choices = self._choice_func('Person', 'firstname') - self.failUnless(firstname in possible_choices, + self.assertTrue(firstname in possible_choices, '%s not in %s' % (firstname, possible_choices)) def test_choice(self): @@ -80,21 +80,21 @@ # Test for random index for index in range(5): sx_value = self.person_valgen.generate_attribute_value({}, 'civility', index) - self.failUnless(sx_value in ('Mr', 'Mrs', 'Ms')) + self.assertTrue(sx_value in ('Mr', 'Mrs', 'Ms')) def test_integer(self): """test integer generation""" # Test for random index for index in range(5): cost_value = self.bug_valgen.generate_attribute_value({}, 'cost', index) - self.failUnless(cost_value in range(index+1)) + self.assertTrue(cost_value in range(index+1)) def test_date(self): """test date generation""" # Test for random index for index in range(10): date_value = self.person_valgen.generate_attribute_value({}, 'birthday', index) - self.failUnless(isinstance(date_value, datetime.date)) + self.assertTrue(isinstance(date_value, datetime.date)) def test_phone(self): """tests make_tel utility""" diff -r aa30c665bd06 -r f2a5805615f8 devtools/test/unittest_fill.py --- a/devtools/test/unittest_fill.py Fri Sep 16 16:42:42 2011 +0200 +++ b/devtools/test/unittest_fill.py Fri Sep 16 17:30:26 2011 +0200 @@ -41,31 +41,31 @@ def test_autoextend(self): - self.failIf('generate_server' in dir(ValueGenerator)) + self.assertFalse('generate_server' in dir(ValueGenerator)) class MyValueGenerator(ValueGenerator): def generate_server(self, index): return attrname - self.failUnless('generate_server' in dir(ValueGenerator)) + self.assertTrue('generate_server' in dir(ValueGenerator)) def test_bad_signature_detection(self): - self.failIf('generate_server' in dir(ValueGenerator)) + self.assertFalse('generate_server' in dir(ValueGenerator)) try: class MyValueGenerator(ValueGenerator): def generate_server(self): pass except TypeError: - self.failIf('generate_server' in dir(ValueGenerator)) + self.assertFalse('generate_server' in dir(ValueGenerator)) else: self.fail('TypeError not raised') def test_signature_extension(self): - self.failIf('generate_server' in dir(ValueGenerator)) + self.assertFalse('generate_server' in dir(ValueGenerator)) class MyValueGenerator(ValueGenerator): def generate_server(self, index, foo): pass - self.failUnless('generate_server' in dir(ValueGenerator)) + self.assertTrue('generate_server' in dir(ValueGenerator)) if __name__ == '__main__': diff -r aa30c665bd06 -r f2a5805615f8 devtools/testlib.py --- a/devtools/testlib.py Fri Sep 16 16:42:42 2011 +0200 +++ b/devtools/testlib.py Fri Sep 16 17:30:26 2011 +0200 @@ -387,31 +387,6 @@ req.cnx.commit() return user - @iclassmethod # XXX turn into a class method - def grant_permission(self, session, entity, group, pname=None, plabel=None): - """insert a permission on an entity. Will have to commit the main - connection to be considered - """ - if not isinstance(session, Session): - warn('[3.12] grant_permission arguments are now (session, entity, group, pname[, plabel])', - DeprecationWarning, stacklevel=2) - plabel = pname - pname = group - group = entity - entity = session - assert not isinstance(self, type) - session = self.session - pname = unicode(pname) - plabel = plabel and unicode(plabel) or unicode(group) - e = getattr(entity, 'eid', entity) - with security_enabled(session, False, False): - peid = session.execute( - 'INSERT CWPermission X: X name %(pname)s, X label %(plabel)s,' - 'X require_group G, E require_permission X ' - 'WHERE G name %(group)s, E eid %(e)s', - locals())[0][0] - return peid - def login(self, login, **kwargs): """return a connection for the given login/password""" if login == self.admlogin: diff -r aa30c665bd06 -r f2a5805615f8 doc/book/en/devrepo/datamodel/baseschema.rst --- a/doc/book/en/devrepo/datamodel/baseschema.rst Fri Sep 16 16:42:42 2011 +0200 +++ b/doc/book/en/devrepo/datamodel/baseschema.rst Fri Sep 16 17:30:26 2011 +0200 @@ -19,7 +19,6 @@ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ * _`CWUser`, system users * _`CWGroup`, users groups -* _`CWPermission`, used to configure the security of the instance Entity types used to manage workflows ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ diff -r aa30c665bd06 -r f2a5805615f8 doc/book/en/devrepo/datamodel/definition.rst --- a/doc/book/en/devrepo/datamodel/definition.rst Fri Sep 16 16:42:42 2011 +0200 +++ b/doc/book/en/devrepo/datamodel/definition.rst Fri Sep 16 17:30:26 2011 +0200 @@ -646,68 +646,7 @@ RelationType declaration which offers some advantages in the context of reusable cubes. -Definition of permissions -~~~~~~~~~~~~~~~~~~~~~~~~~~ -The entity type `CWPermission` from the standard library -allows to build very complex and dynamic security architectures. The schema of -this entity type is as follow: - -.. sourcecode:: python - - class CWPermission(EntityType): - """entity type that may be used to construct some advanced security configuration - """ - name = String(required=True, indexed=True, internationalizable=True, maxsize=100) - require_group = SubjectRelation('CWGroup', cardinality='+*', - description=_('groups to which the permission is granted')) - require_state = SubjectRelation('State', - description=_("entity's state in which the permission is applicable")) - # can be used on any entity - require_permission = ObjectRelation('**', cardinality='*1', composite='subject', - description=_("link a permission to the entity. This " - "permission should be used in the security " - "definition of the entity's type to be useful.")) - - -Example of configuration: - -.. sourcecode:: python - - class Version(EntityType): - """a version is defining the content of a particular project's release""" - - __permissions__ = {'read': ('managers', 'users', 'guests',), - 'update': ('managers', 'logilab', 'owners',), - 'delete': ('managers', ), - 'add': ('managers', 'logilab', - ERQLExpression('X version_of PROJ, U in_group G,' - 'PROJ require_permission P, P name "add_version",' - 'P require_group G'),)} - - - class version_of(RelationType): - """link a version to its project. A version is necessarily linked to one and only one project. - """ - __permissions__ = {'read': ('managers', 'users', 'guests',), - 'delete': ('managers', ), - 'add': ('managers', 'logilab', - RRQLExpression('O require_permission P, P name "add_version",' - 'U in_group G, P require_group G'),) - } - inlined = True - - -This configuration indicates that an entity `CWPermission` named -"add_version" can be associated to a project and provides rights to create -new versions on this project to specific groups. It is important to notice that: - -* in such case, we have to protect both the entity type "Version" and the relation - associating a version to a project ("version_of") - -* because of the genericity of the entity type `CWPermission`, we have to execute - a unification with the groups and/or the states if necessary in the expression - ("U in_group G, P require_group G" in the above example) - + Handling schema changes diff -r aa30c665bd06 -r f2a5805615f8 entities/authobjs.py --- a/entities/authobjs.py Fri Sep 16 16:42:42 2011 +0200 +++ b/entities/authobjs.py Fri Sep 16 17:30:26 2011 +0200 @@ -29,22 +29,6 @@ fetch_attrs, fetch_order = fetch_config(['name']) fetch_unrelated_order = fetch_order - def grant_permission(self, entity, pname, plabel=None): - """grant local `pname` permission on `entity` to this group using - :class:`CWPermission`. - - If a similar permission already exists, add the group to it, else create - a new one. - """ - if not self._cw.execute( - 'SET X require_group G WHERE E eid %(e)s, G eid %(g)s, ' - 'E require_permission X, X name %(name)s, X label %(label)s', - {'e': entity.eid, 'g': self.eid, - 'name': pname, 'label': plabel}): - self._cw.create_entity('CWPermission', name=pname, label=plabel, - require_group=self, - reverse_require_permission=entity) - class CWUser(AnyEntity): __regid__ = 'CWUser' @@ -139,18 +123,6 @@ return False owns = cached(owns, keyarg=1) - def has_permission(self, pname, contexteid=None): - rql = 'Any P WHERE P is CWPermission, U eid %(u)s, U in_group G, '\ - 'P name %(pname)s, P require_group G' - kwargs = {'pname': pname, 'u': self.eid} - if contexteid is not None: - rql += ', X require_permission P, X eid %(x)s' - kwargs['x'] = contexteid - try: - return self._cw.execute(rql, kwargs) - except Unauthorized: - return False - # presentation utilities ################################################## def name(self): diff -r aa30c665bd06 -r f2a5805615f8 entities/schemaobjs.py --- a/entities/schemaobjs.py Fri Sep 16 16:42:42 2011 +0200 +++ b/entities/schemaobjs.py Fri Sep 16 17:30:26 2011 +0200 @@ -176,13 +176,3 @@ def check_expression(self, *args, **kwargs): return self._rqlexpr().check(*args, **kwargs) - - -class CWPermission(AnyEntity): - __regid__ = 'CWPermission' - fetch_attrs, fetch_order = fetch_config(['name', 'label']) - - def dc_title(self): - if self.label: - return '%s (%s)' % (self._cw._(self.name), self.label) - return self._cw._(self.name) diff -r aa30c665bd06 -r f2a5805615f8 entities/test/unittest_base.py --- a/entities/test/unittest_base.py Fri Sep 16 16:42:42 2011 +0200 +++ b/entities/test/unittest_base.py Fri Sep 16 17:30:26 2011 +0200 @@ -88,10 +88,10 @@ def test_matching_groups(self): e = self.execute('CWUser X WHERE X login "admin"').get_entity(0, 0) - self.failUnless(e.matching_groups('managers')) - self.failIf(e.matching_groups('xyz')) - self.failUnless(e.matching_groups(('xyz', 'managers'))) - self.failIf(e.matching_groups(('xyz', 'abcd'))) + self.assertTrue(e.matching_groups('managers')) + self.assertFalse(e.matching_groups('xyz')) + self.assertTrue(e.matching_groups(('xyz', 'managers'))) + self.assertFalse(e.matching_groups(('xyz', 'abcd'))) def test_dc_title_and_name(self): e = self.execute('CWUser U WHERE U login "member"').get_entity(0, 0) @@ -131,12 +131,12 @@ self.vreg['etypes'].initialization_completed() MyUser_ = self.vreg['etypes'].etype_class('CWUser') # a copy is done systematically - self.failUnless(issubclass(MyUser_, MyUser)) - self.failUnless(implements(MyUser_, IMileStone)) - self.failUnless(implements(MyUser_, ICalendarable)) + self.assertTrue(issubclass(MyUser_, MyUser)) + self.assertTrue(implements(MyUser_, IMileStone)) + self.assertTrue(implements(MyUser_, ICalendarable)) # original class should not have beed modified, only the copy - self.failUnless(implements(MyUser, IMileStone)) - self.failIf(implements(MyUser, ICalendarable)) + self.assertTrue(implements(MyUser, IMileStone)) + self.assertFalse(implements(MyUser, ICalendarable)) class SpecializedEntityClassesTC(CubicWebTC): @@ -149,7 +149,7 @@ def test_etype_class_selection_and_specialization(self): # no specific class for Subdivisions, the default one should be selected eclass = self.select_eclass('SubDivision') - self.failUnless(eclass.__autogenerated__) + self.assertTrue(eclass.__autogenerated__) #self.assertEqual(eclass.__bases__, (AnyEntity,)) # build class from most generic to most specific and make # sure the most specific is always selected @@ -159,8 +159,8 @@ __regid__ = etype self.vreg.register(Foo) eclass = self.select_eclass('SubDivision') - self.failUnless(eclass.__autogenerated__) - self.failIf(eclass is Foo) + self.assertTrue(eclass.__autogenerated__) + self.assertFalse(eclass is Foo) if etype == 'SubDivision': self.assertEqual(eclass.__bases__, (Foo,)) else: diff -r aa30c665bd06 -r f2a5805615f8 entities/test/unittest_wfobjs.py --- a/entities/test/unittest_wfobjs.py Fri Sep 16 16:42:42 2011 +0200 +++ b/entities/test/unittest_wfobjs.py Fri Sep 16 17:30:26 2011 +0200 @@ -567,7 +567,7 @@ # test initial state is set rset = self.execute('Any N WHERE S name N, X in_state S, X eid %(x)s', {'x' : ueid}) - self.failIf(rset, rset.rows) + self.assertFalse(rset, rset.rows) self.commit() initialstate = self.execute('Any N WHERE S name N, X in_state S, X eid %(x)s', {'x' : ueid})[0][0] diff -r aa30c665bd06 -r f2a5805615f8 entity.py --- a/entity.py Fri Sep 16 16:42:42 2011 +0200 +++ b/entity.py Fri Sep 16 17:30:26 2011 +0200 @@ -27,6 +27,9 @@ from logilab.mtconverter import TransformData, TransformError, xml_escape from rql.utils import rqlvar_maker +from rql.stmts import Select +from rql.nodes import (Not, VariableRef, Constant, make_relation, + Relation as RqlRelation) from cubicweb import Unauthorized, typed_eid, neg_role from cubicweb.rset import ResultSet @@ -62,23 +65,6 @@ return True -def remove_ambiguous_rels(attr_set, subjtypes, schema): - '''remove from `attr_set` the relations of entity types `subjtypes` that have - different entity type sets as target''' - for attr in attr_set.copy(): - rschema = schema.rschema(attr) - if rschema.final: - continue - ttypes = None - for subjtype in subjtypes: - cur_ttypes = rschema.objects(subjtype) - if ttypes is None: - ttypes = cur_ttypes - elif cur_ttypes != ttypes: - attr_set.remove(attr) - break - - class Entity(AppObject): """an entity instance has e_schema automagically set on the class and instances has access to their issuing cursor. @@ -96,11 +82,12 @@ If None is specified, the first non-meta attribute will be used - :type skip_copy_for: list - :cvar skip_copy_for: a list of relations that should be skipped when copying - this kind of entity. Note that some relations such - as composite relations or relations that have '?1' as object - cardinality are always skipped. + :type cw_skip_copy_for: list + :cvar cw_skip_copy_for: a list of couples (rtype, role) for each relation + that should be skipped when copying + this kind of entity. Note that some relations such + as composite relations or relations that have '?1' as object + cardinality are always skipped. """ __registry__ = 'etypes' __select__ = yes() @@ -108,7 +95,8 @@ # class attributes that must be set in class definition rest_attr = None fetch_attrs = None - skip_copy_for = ('in_state',) # XXX turn into a set + skip_copy_for = () # bw compat (< 3.14), use cw_skip_copy_for instead + cw_skip_copy_for = [('in_state', 'subject')] # class attributes set automatically at registration time e_schema = None @@ -173,30 +161,74 @@ @classmethod def fetch_rql(cls, user, restriction=None, fetchattrs=None, mainvar='X', settype=True, ordermethod='fetch_order'): - """return a rql to fetch all entities of the class type""" - # XXX update api and implementation to AST manipulation (see unrelated rql) - restrictions = restriction or [] - if settype: - restrictions.append('%s is %s' % (mainvar, cls.__regid__)) - if fetchattrs is None: - fetchattrs = cls.fetch_attrs - selection = [mainvar] - orderby = [] - # start from 26 to avoid possible conflicts with X - # XXX not enough to be sure it'll be no conflicts - varmaker = rqlvar_maker(index=26) - cls._fetch_restrictions(mainvar, varmaker, fetchattrs, selection, - orderby, restrictions, user, ordermethod) - rql = 'Any %s' % ','.join(selection) - if orderby: - rql += ' ORDERBY %s' % ','.join(orderby) - rql += ' WHERE %s' % ', '.join(restrictions) + st = cls.fetch_rqlst(user, mainvar=mainvar, fetchattrs=fetchattrs, + settype=settype, ordermethod=ordermethod) + rql = st.as_string() + if restriction: + # cannot use RQLRewriter API to insert 'X rtype %(x)s' restriction + warn('[3.14] fetch_rql: use of `restriction` parameter is ' + 'deprecated, please use fetch_rqlst and supply a syntax' + 'tree with your restriction instead', DeprecationWarning) + insert = ' WHERE ' + ','.join(restriction) + if ' WHERE ' in rql: + select, where = rql.split(' WHERE ', 1) + rql = select + insert + ',' + where + else: + rql += insert return rql @classmethod - def _fetch_restrictions(cls, mainvar, varmaker, fetchattrs, - selection, orderby, restrictions, user, - ordermethod='fetch_order', visited=None): + def fetch_rqlst(cls, user, select=None, mainvar='X', fetchattrs=None, + settype=True, ordermethod='fetch_order'): + if select is None: + select = Select() + mainvar = select.get_variable(mainvar) + select.add_selected(mainvar) + elif isinstance(mainvar, basestring): + assert mainvar in select.defined_vars + mainvar = select.get_variable(mainvar) + # eases string -> syntax tree test transition: please remove once stable + select._varmaker = rqlvar_maker(defined=select.defined_vars, + aliases=select.aliases, index=26) + if settype: + select.add_type_restriction(mainvar, cls.__regid__) + if fetchattrs is None: + fetchattrs = cls.fetch_attrs + cls._fetch_restrictions(mainvar, select, fetchattrs, user, ordermethod) + return select + + @classmethod + def _fetch_ambiguous_rtypes(cls, select, var, fetchattrs, subjtypes, schema): + """find rtypes in `fetchattrs` that relate different subject etypes + taken from (`subjtypes`) to different target etypes; these so called + "ambiguous" relations, are added directly to the `select` syntax tree + selection but removed from `fetchattrs` to avoid the fetch recursion + because we have to choose only one targettype for the recursion and + adding its own fetch attrs to the selection -when we recurse- would + filter out the other possible target types from the result set + """ + for attr in fetchattrs.copy(): + rschema = schema.rschema(attr) + if rschema.final: + continue + ttypes = None + for subjtype in subjtypes: + cur_ttypes = set(rschema.objects(subjtype)) + if ttypes is None: + ttypes = cur_ttypes + elif cur_ttypes != ttypes: + # we found an ambiguous relation: remove it from fetchattrs + fetchattrs.remove(attr) + # ... and add it to the selection + targetvar = select.make_variable() + select.add_selected(targetvar) + rel = make_relation(var, attr, (targetvar,), VariableRef) + select.add_restriction(rel) + break + + @classmethod + def _fetch_restrictions(cls, mainvar, select, fetchattrs, + user, ordermethod='fetch_order', visited=None): eschema = cls.e_schema if visited is None: visited = set((eschema.type,)) @@ -216,40 +248,39 @@ rdef = eschema.rdef(attr) if not user.matching_groups(rdef.get_groups('read')): continue - var = varmaker.next() - selection.append(var) - restriction = '%s %s %s' % (mainvar, attr, var) - restrictions.append(restriction) + if rschema.final or rdef.cardinality[0] in '?1': + var = select.make_variable() + select.add_selected(var) + rel = make_relation(mainvar, attr, (var,), VariableRef) + select.add_restriction(rel) + else: + cls.warning('bad relation %s specified in fetch attrs for %s', + attr, cls) + continue if not rschema.final: - card = rdef.cardinality[0] - if card not in '?1': - cls.warning('bad relation %s specified in fetch attrs for %s', - attr, cls) - selection.pop() - restrictions.pop() - continue # XXX we need outer join in case the relation is not mandatory # (card == '?') *or if the entity is being added*, since in # that case the relation may still be missing. As we miss this # later information here, systematically add it. - restrictions[-1] += '?' + rel.change_optional('right') targettypes = rschema.objects(eschema.type) - # XXX user._cw.vreg iiiirk - etypecls = user._cw.vreg['etypes'].etype_class(targettypes[0]) + vreg = user._cw.vreg # XXX user._cw.vreg iiiirk + etypecls = vreg['etypes'].etype_class(targettypes[0]) if len(targettypes) > 1: # find fetch_attrs common to all destination types - fetchattrs = user._cw.vreg['etypes'].fetch_attrs(targettypes) - remove_ambiguous_rels(fetchattrs, targettypes, user._cw.vreg.schema) + fetchattrs = vreg['etypes'].fetch_attrs(targettypes) + # ... and handle ambiguous relations + cls._fetch_ambiguous_rtypes(select, var, fetchattrs, + targettypes, vreg.schema) else: fetchattrs = etypecls.fetch_attrs - etypecls._fetch_restrictions(var, varmaker, fetchattrs, - selection, orderby, restrictions, + etypecls._fetch_restrictions(var, select, fetchattrs, user, ordermethod, visited=visited) if ordermethod is not None: - orderterm = getattr(cls, ordermethod)(attr, var) + orderterm = getattr(cls, ordermethod)(attr, var.name) if orderterm: - orderby.append(orderterm) - return selection, orderby, restrictions + var, order = orderterm.split() + select.add_sort_var(select.get_variable(var), order=='ASC') @classmethod @cached @@ -542,13 +573,22 @@ """ assert self.has_eid() execute = self._cw.execute + skip_copy_for = {'subject': set(), 'object': set()} + for rtype in self.skip_copy_for: + skip_copy_for['subject'].add(rtype) + warn('[3.14] skip_copy_for on entity classes (%s) is deprecated, ' + 'use cw_skip_for instead with list of couples (rtype, role)' % self.__regid__, + DeprecationWarning) + for rtype, role in self.cw_skip_copy_for: + assert role in ('subject', 'object'), role + skip_copy_for[role].add(rtype) for rschema in self.e_schema.subject_relations(): if rschema.final or rschema.meta: continue # skip already defined relations if getattr(self, rschema.type): continue - if rschema.type in self.skip_copy_for: + if rschema.type in skip_copy_for['subject']: continue # skip composite relation rdef = self.e_schema.rdef(rschema) @@ -568,6 +608,8 @@ # skip already defined relations if self.related(rschema.type, 'object'): continue + if rschema.type in skip_copy_for['object']: + continue rdef = self.e_schema.rdef(rschema, 'object') # skip composite relation if rdef.composite: @@ -729,6 +771,7 @@ :param limit: resultset's maximum size :param entities: if True, the entites are returned; if False, a result set is returned """ + rtype = str(rtype) try: return self._cw_relation_cache(rtype, role, entities, limit) except KeyError: @@ -743,51 +786,61 @@ return self.related(rtype, role, limit, entities) def cw_related_rql(self, rtype, role='subject', targettypes=None): - rschema = self._cw.vreg.schema[rtype] + vreg = self._cw.vreg + rschema = vreg.schema[rtype] + select = Select() + mainvar, evar = select.get_variable('X'), select.get_variable('E') + select.add_selected(mainvar) + select.add_eid_restriction(evar, 'x', 'Substitute') if role == 'subject': - restriction = 'E eid %%(x)s, E %s X' % rtype + rel = make_relation(evar, rtype, (mainvar,), VariableRef) + select.add_restriction(rel) if targettypes is None: targettypes = rschema.objects(self.e_schema) else: - restriction += ', X is IN (%s)' % ','.join(targettypes) - card = greater_card(rschema, (self.e_schema,), targettypes, 0) + select.add_constant_restriction(mainvar, 'is', + targettypes, 'etype') + gcard = greater_card(rschema, (self.e_schema,), targettypes, 0) else: - restriction = 'E eid %%(x)s, X %s E' % rtype + rel = make_relation(mainvar, rtype, (evar,), VariableRef) + select.add_restriction(rel) if targettypes is None: targettypes = rschema.subjects(self.e_schema) else: - restriction += ', X is IN (%s)' % ','.join(targettypes) - card = greater_card(rschema, targettypes, (self.e_schema,), 1) - etypecls = self._cw.vreg['etypes'].etype_class(targettypes[0]) + select.add_constant_restriction(mainvar, 'is', targettypes, + 'String') + gcard = greater_card(rschema, targettypes, (self.e_schema,), 1) + etypecls = vreg['etypes'].etype_class(targettypes[0]) if len(targettypes) > 1: - fetchattrs = self._cw.vreg['etypes'].fetch_attrs(targettypes) - # XXX we should fetch ambiguous relation objects too but not - # recurse on them in _fetch_restrictions; it is easier to remove - # them completely for now, as it would require an deeper api rewrite - remove_ambiguous_rels(fetchattrs, targettypes, self._cw.vreg.schema) + fetchattrs = vreg['etypes'].fetch_attrs(targettypes) + self._fetch_ambiguous_rtypes(select, mainvar, fetchattrs, + targettypes, vreg.schema) else: fetchattrs = etypecls.fetch_attrs - rql = etypecls.fetch_rql(self._cw.user, [restriction], fetchattrs, - settype=False) + etypecls.fetch_rqlst(self._cw.user, select, mainvar, fetchattrs, + settype=False) # optimisation: remove ORDERBY if cardinality is 1 or ? (though # greater_card return 1 for those both cases) - if card == '1': - if ' ORDERBY ' in rql: - rql = '%s WHERE %s' % (rql.split(' ORDERBY ', 1)[0], - rql.split(' WHERE ', 1)[1]) - elif not ' ORDERBY ' in rql: - args = rql.split(' WHERE ', 1) - # if modification_date already retreived, we should use it instead - # of adding another variable for sort. This should be be problematic - # but it's actually with sqlserver, see ticket #694445 - if 'X modification_date ' in args[1]: - var = args[1].split('X modification_date ', 1)[1].split(',', 1)[0] - args.insert(1, var.strip()) - rql = '%s ORDERBY %s DESC WHERE %s' % tuple(args) + if gcard == '1': + select.remove_sort_terms() + elif not select.orderby: + # if modification_date is already retrieved, we use it instead + # of adding another variable for sorting. This should not be + # problematic, but it is with sqlserver, see ticket #694445 + for rel in select.where.get_nodes(RqlRelation): + if (rel.r_type == 'modification_date' + and rel.children[0].variable == mainvar + and rel.children[1].operator == '='): + var = rel.children[1].children[0].variable + select.add_sort_var(var, asc=False) + break else: - rql = '%s ORDERBY Z DESC WHERE X modification_date Z, %s' % \ - tuple(args) - return rql + mdvar = select.make_variable() + rel = make_relation(mainvar, 'modification_date', + (mdvar,), VariableRef) + select.add_restriction(rel) + select.add_sort_var(mdvar, asc=False) + return select.as_string() # generic vocabulary methods ############################################## @@ -804,33 +857,36 @@ rtype = self._cw.vreg.schema.rschema(rtype) rdef = rtype.role_rdef(self.e_schema, targettype, role) rewriter = RQLRewriter(self._cw) + select = Select() # initialize some variables according to the `role` of `self` in the - # relation: - # * variable for myself (`evar`) and searched entities (`searchvedvar`) - # * entity type of the subject (`subjtype`) and of the object - # (`objtype`) of the relation + # relation (variable names must respect constraints conventions): + # * variable for myself (`evar`) + # * variable for searched entities (`searchvedvar`) if role == 'subject': - evar, searchedvar = 'S', 'O' - subjtype, objtype = self.e_schema, targettype + evar = subjvar = select.get_variable('S') + searchedvar = objvar = select.get_variable('O') else: - searchedvar, evar = 'S', 'O' - objtype, subjtype = self.e_schema, targettype + searchedvar = subjvar = select.get_variable('S') + evar = objvar = select.get_variable('O') + select.add_selected(searchedvar) # initialize some variables according to `self` existance if rdef.role_cardinality(neg_role(role)) in '?1': # if cardinality in '1?', we want a target entity which isn't # already linked using this relation - if searchedvar == 'S': - restriction = ['NOT S %s ZZ' % rtype] + var = select.get_variable('ZZ') # XXX unname when tests pass + if role == 'subject': + rel = make_relation(var, rtype.type, (searchedvar,), VariableRef) else: - restriction = ['NOT ZZ %s O' % rtype] + rel = make_relation(searchedvar, rtype.type, (var,), VariableRef) + select.add_restriction(Not(rel)) elif self.has_eid(): # elif we have an eid, we don't want a target entity which is # already linked to ourself through this relation - restriction = ['NOT S %s O' % rtype] - else: - restriction = [] + rel = make_relation(subjvar, rtype.type, (objvar,), VariableRef) + select.add_restriction(Not(rel)) if self.has_eid(): - restriction += ['%s eid %%(x)s' % evar] + rel = make_relation(evar, 'eid', ('x', 'Substitute'), Constant) + select.add_restriction(rel) args = {'x': self.eid} if role == 'subject': sec_check_args = {'fromeid': self.eid} @@ -840,12 +896,15 @@ else: args = {} sec_check_args = {} - existant = searchedvar - # retreive entity class for targettype to compute base rql + existant = searchedvar.name + # undefine unused evar, or the type resolver will consider it + select.undefine_variable(evar) + # retrieve entity class for targettype to compute base rql etypecls = self._cw.vreg['etypes'].etype_class(targettype) - rql = etypecls.fetch_rql(self._cw.user, restriction, - mainvar=searchedvar, ordermethod=ordermethod) - select = self._cw.vreg.parse(self._cw, rql, args).children[0] + etypecls.fetch_rqlst(self._cw.user, select, searchedvar, + ordermethod=ordermethod) + # from now on, we need variable type resolving + self._cw.vreg.solutions(self._cw, select, args) # insert RQL expressions for schema constraints into the rql syntax tree if vocabconstraints: # RQLConstraint is a subclass for RQLVocabularyConstraint, so they @@ -855,12 +914,12 @@ cstrcls = RQLConstraint for cstr in rdef.constraints: # consider constraint.mainvars to check if constraint apply - if isinstance(cstr, cstrcls) and searchedvar in cstr.mainvars: - if not self.has_eid() and evar in cstr.mainvars: + if isinstance(cstr, cstrcls) and searchedvar.name in cstr.mainvars: + if not self.has_eid() and evar.name in cstr.mainvars: continue # compute a varmap suitable to RQLRewriter.rewrite argument - varmap = dict((v, v) for v in 'SO' if v in select.defined_vars - and v in cstr.mainvars) + varmap = dict((v, v) for v in (searchedvar.name, evar.name) + if v in select.defined_vars and v in cstr.mainvars) # rewrite constraint by constraint since we want a AND between # expressions. rewriter.rewrite(select, [(varmap, (cstr,))], select.solutions, @@ -870,13 +929,14 @@ rqlexprs = rdef.get_rqlexprs('add') if rqlexprs and not rdef.has_perm(self._cw, 'add', **sec_check_args): # compute a varmap suitable to RQLRewriter.rewrite argument - varmap = dict((v, v) for v in 'SO' if v in select.defined_vars) + varmap = dict((v, v) for v in (searchedvar.name, evar.name) + if v in select.defined_vars) # rewrite all expressions at once since we want a OR between them. rewriter.rewrite(select, [(varmap, rqlexprs)], select.solutions, args, existant) # ensure we have an order defined if not select.orderby: - select.add_sort_var(select.defined_vars[searchedvar]) + select.add_sort_var(select.defined_vars[searchedvar.name]) # we're done, turn the rql syntax tree as a string rql = select.as_string() return rql, args diff -r aa30c665bd06 -r f2a5805615f8 etwist/test/unittest_server.py --- a/etwist/test/unittest_server.py Fri Sep 16 16:42:42 2011 +0200 +++ b/etwist/test/unittest_server.py Fri Sep 16 17:30:26 2011 +0200 @@ -69,7 +69,7 @@ def test_cache(self): concat = ConcatFiles(self.config, ('cubicweb.ajax.js', 'jquery.js')) - self.failUnless(osp.isfile(concat.path)) + self.assertTrue(osp.isfile(concat.path)) def test_404(self): # when not in debug mode, should not crash diff -r aa30c665bd06 -r f2a5805615f8 hooks/test/unittest_bookmarks.py --- a/hooks/test/unittest_bookmarks.py Fri Sep 16 16:42:42 2011 +0200 +++ b/hooks/test/unittest_bookmarks.py Fri Sep 16 17:30:26 2011 +0200 @@ -28,10 +28,10 @@ self.commit() self.execute('DELETE X bookmarked_by U WHERE U login "admin"') self.commit() - self.failUnless(self.execute('Any X WHERE X eid %(x)s', {'x': beid})) + self.assertTrue(self.execute('Any X WHERE X eid %(x)s', {'x': beid})) self.execute('DELETE X bookmarked_by U WHERE U login "anon"') self.commit() - self.failIf(self.execute('Any X WHERE X eid %(x)s', {'x': beid})) + self.assertFalse(self.execute('Any X WHERE X eid %(x)s', {'x': beid})) if __name__ == '__main__': unittest_main() diff -r aa30c665bd06 -r f2a5805615f8 hooks/test/unittest_hooks.py --- a/hooks/test/unittest_hooks.py Fri Sep 16 16:42:42 2011 +0200 +++ b/hooks/test/unittest_hooks.py Fri Sep 16 17:30:26 2011 +0200 @@ -117,7 +117,7 @@ self.repo.connect, u'toto', password='hop') self.commit() cnxid = self.repo.connect(u'toto', password='hop') - self.failIfEqual(cnxid, self.session.id) + self.assertNotEqual(cnxid, self.session.id) self.execute('DELETE CWUser X WHERE X login "toto"') self.repo.execute(cnxid, 'State X') self.commit() @@ -151,7 +151,7 @@ eid = self.execute('INSERT EmailAddress X: X address "toto@logilab.fr"')[0][0] self.execute('DELETE EmailAddress X WHERE X eid %s' % eid) self.commit() - self.failIf(self.execute('Any X WHERE X created_by Y, X eid >= %(x)s', {'x': eid})) + self.assertFalse(self.execute('Any X WHERE X created_by Y, X eid >= %(x)s', {'x': eid})) diff -r aa30c665bd06 -r f2a5805615f8 hooks/test/unittest_integrity.py --- a/hooks/test/unittest_integrity.py Fri Sep 16 16:42:42 2011 +0200 +++ b/hooks/test/unittest_integrity.py Fri Sep 16 17:30:26 2011 +0200 @@ -62,7 +62,7 @@ self.execute('INSERT EmailPart X: X content_format "text/plain", X ordernum 1, X content "this is a test"') self.execute('INSERT Email X: X messageid "<1234>", X subject "test", X sender Y, X recipients Y, X parts P ' 'WHERE Y is EmailAddress, P is EmailPart') - self.failUnless(self.execute('Email X WHERE X sender Y')) + self.assertTrue(self.execute('Email X WHERE X sender Y')) self.commit() self.execute('DELETE Email X') rset = self.execute('Any X WHERE X is EmailPart') diff -r aa30c665bd06 -r f2a5805615f8 hooks/test/unittest_syncschema.py --- a/hooks/test/unittest_syncschema.py Fri Sep 16 16:42:42 2011 +0200 +++ b/hooks/test/unittest_syncschema.py Fri Sep 16 17:30:26 2011 +0200 @@ -60,18 +60,18 @@ self.session.set_cnxset() dbhelper = self.session.cnxset.source('system').dbhelper sqlcursor = self.session.cnxset['system'] - self.failIf(schema.has_entity('Societe2')) - self.failIf(schema.has_entity('concerne2')) + self.assertFalse(schema.has_entity('Societe2')) + self.assertFalse(schema.has_entity('concerne2')) # schema should be update on insertion (after commit) eeid = self.execute('INSERT CWEType X: X name "Societe2", X description "", X final FALSE')[0][0] self._set_perms(eeid) self.execute('INSERT CWRType X: X name "concerne2", X description "", X final FALSE, X symmetric FALSE') - self.failIf(schema.has_entity('Societe2')) - self.failIf(schema.has_entity('concerne2')) + self.assertFalse(schema.has_entity('Societe2')) + self.assertFalse(schema.has_entity('concerne2')) # have to commit before adding definition relations self.commit() - self.failUnless(schema.has_entity('Societe2')) - self.failUnless(schema.has_relation('concerne2')) + self.assertTrue(schema.has_entity('Societe2')) + self.assertTrue(schema.has_relation('concerne2')) attreid = self.execute('INSERT CWAttribute X: X cardinality "11", X defaultval "noname", ' ' X indexed TRUE, X relation_type RT, X from_entity E, X to_entity F ' 'WHERE RT name "name", E name "Societe2", F name "String"')[0][0] @@ -80,13 +80,13 @@ 'INSERT CWRelation X: X cardinality "**", X relation_type RT, X from_entity E, X to_entity E ' 'WHERE RT name "concerne2", E name "Societe2"')[0][0] self._set_perms(concerne2_rdef_eid) - self.failIf('name' in schema['Societe2'].subject_relations()) - self.failIf('concerne2' in schema['Societe2'].subject_relations()) - self.failIf(self.index_exists('Societe2', 'name')) + self.assertFalse('name' in schema['Societe2'].subject_relations()) + self.assertFalse('concerne2' in schema['Societe2'].subject_relations()) + self.assertFalse(self.index_exists('Societe2', 'name')) self.commit() - self.failUnless('name' in schema['Societe2'].subject_relations()) - self.failUnless('concerne2' in schema['Societe2'].subject_relations()) - self.failUnless(self.index_exists('Societe2', 'name')) + self.assertTrue('name' in schema['Societe2'].subject_relations()) + self.assertTrue('concerne2' in schema['Societe2'].subject_relations()) + self.assertTrue(self.index_exists('Societe2', 'name')) # now we should be able to insert and query Societe2 s2eid = self.execute('INSERT Societe2 X: X name "logilab"')[0][0] self.execute('Societe2 X WHERE X name "logilab"') @@ -101,20 +101,20 @@ self.commit() self.execute('DELETE CWRelation X WHERE X eid %(x)s', {'x': concerne2_rdef_eid}) self.commit() - self.failUnless('concerne2' in schema['CWUser'].subject_relations()) - self.failIf('concerne2' in schema['Societe2'].subject_relations()) - self.failIf(self.execute('Any X WHERE X concerne2 Y')) + self.assertTrue('concerne2' in schema['CWUser'].subject_relations()) + self.assertFalse('concerne2' in schema['Societe2'].subject_relations()) + self.assertFalse(self.execute('Any X WHERE X concerne2 Y')) # schema should be cleaned on delete (after commit) self.execute('DELETE CWEType X WHERE X name "Societe2"') self.execute('DELETE CWRType X WHERE X name "concerne2"') - self.failUnless(self.index_exists('Societe2', 'name')) - self.failUnless(schema.has_entity('Societe2')) - self.failUnless(schema.has_relation('concerne2')) + self.assertTrue(self.index_exists('Societe2', 'name')) + self.assertTrue(schema.has_entity('Societe2')) + self.assertTrue(schema.has_relation('concerne2')) self.commit() - self.failIf(self.index_exists('Societe2', 'name')) - self.failIf(schema.has_entity('Societe2')) - self.failIf(schema.has_entity('concerne2')) - self.failIf('concerne2' in schema['CWUser'].subject_relations()) + self.assertFalse(self.index_exists('Societe2', 'name')) + self.assertFalse(schema.has_entity('Societe2')) + self.assertFalse(schema.has_entity('concerne2')) + self.assertFalse('concerne2' in schema['CWUser'].subject_relations()) def test_is_instance_of_insertions(self): seid = self.execute('INSERT Transition T: T name "subdiv"')[0][0] @@ -123,15 +123,15 @@ instanceof_etypes = [etype for etype, in self.execute('Any ETN WHERE X eid %s, X is_instance_of ET, ET name ETN' % seid)] self.assertEqual(sorted(instanceof_etypes), ['BaseTransition', 'Transition']) snames = [name for name, in self.execute('Any N WHERE S is BaseTransition, S name N')] - self.failIf('subdiv' in snames) + self.assertFalse('subdiv' in snames) snames = [name for name, in self.execute('Any N WHERE S is_instance_of BaseTransition, S name N')] - self.failUnless('subdiv' in snames) + self.assertTrue('subdiv' in snames) def test_perms_synchronization_1(self): schema = self.repo.schema self.assertEqual(schema['CWUser'].get_groups('read'), set(('managers', 'users'))) - self.failUnless(self.execute('Any X, Y WHERE X is CWEType, X name "CWUser", Y is CWGroup, Y name "users"')[0]) + self.assertTrue(self.execute('Any X, Y WHERE X is CWEType, X name "CWUser", Y is CWGroup, Y name "users"')[0]) self.execute('DELETE X read_permission Y WHERE X is CWEType, X name "CWUser", Y name "users"') self.assertEqual(schema['CWUser'].get_groups('read'), set(('managers', 'users', ))) self.commit() @@ -173,13 +173,13 @@ self.session.set_cnxset() dbhelper = self.session.cnxset.source('system').dbhelper sqlcursor = self.session.cnxset['system'] - self.failUnless(self.schema['state_of'].inlined) + self.assertTrue(self.schema['state_of'].inlined) try: self.execute('SET X inlined FALSE WHERE X name "state_of"') - self.failUnless(self.schema['state_of'].inlined) + self.assertTrue(self.schema['state_of'].inlined) self.commit() - self.failIf(self.schema['state_of'].inlined) - self.failIf(self.index_exists('State', 'state_of')) + self.assertFalse(self.schema['state_of'].inlined) + self.assertFalse(self.index_exists('State', 'state_of')) rset = self.execute('Any X, Y WHERE X state_of Y') self.assertEqual(len(rset), 2) # user states except: @@ -187,10 +187,10 @@ traceback.print_exc() finally: self.execute('SET X inlined TRUE WHERE X name "state_of"') - self.failIf(self.schema['state_of'].inlined) + self.assertFalse(self.schema['state_of'].inlined) self.commit() - self.failUnless(self.schema['state_of'].inlined) - self.failUnless(self.index_exists('State', 'state_of')) + self.assertTrue(self.schema['state_of'].inlined) + self.assertTrue(self.index_exists('State', 'state_of')) rset = self.execute('Any X, Y WHERE X state_of Y') self.assertEqual(len(rset), 2) @@ -200,18 +200,18 @@ sqlcursor = self.session.cnxset['system'] try: self.execute('SET X indexed FALSE WHERE X relation_type R, R name "name"') - self.failUnless(self.schema['name'].rdef('Workflow', 'String').indexed) - self.failUnless(self.index_exists('Workflow', 'name')) + self.assertTrue(self.schema['name'].rdef('Workflow', 'String').indexed) + self.assertTrue(self.index_exists('Workflow', 'name')) self.commit() - self.failIf(self.schema['name'].rdef('Workflow', 'String').indexed) - self.failIf(self.index_exists('Workflow', 'name')) + self.assertFalse(self.schema['name'].rdef('Workflow', 'String').indexed) + self.assertFalse(self.index_exists('Workflow', 'name')) finally: self.execute('SET X indexed TRUE WHERE X relation_type R, R name "name"') - self.failIf(self.schema['name'].rdef('Workflow', 'String').indexed) - self.failIf(self.index_exists('Workflow', 'name')) + self.assertFalse(self.schema['name'].rdef('Workflow', 'String').indexed) + self.assertFalse(self.index_exists('Workflow', 'name')) self.commit() - self.failUnless(self.schema['name'].rdef('Workflow', 'String').indexed) - self.failUnless(self.index_exists('Workflow', 'name')) + self.assertTrue(self.schema['name'].rdef('Workflow', 'String').indexed) + self.assertTrue(self.index_exists('Workflow', 'name')) def test_unique_change(self): self.session.set_cnxset() @@ -221,20 +221,20 @@ self.execute('INSERT CWConstraint X: X cstrtype CT, DEF constrained_by X ' 'WHERE CT name "UniqueConstraint", DEF relation_type RT, DEF from_entity E,' 'RT name "name", E name "Workflow"') - self.failIf(self.schema['Workflow'].has_unique_values('name')) - self.failIf(self.index_exists('Workflow', 'name', unique=True)) + self.assertFalse(self.schema['Workflow'].has_unique_values('name')) + self.assertFalse(self.index_exists('Workflow', 'name', unique=True)) self.commit() - self.failUnless(self.schema['Workflow'].has_unique_values('name')) - self.failUnless(self.index_exists('Workflow', 'name', unique=True)) + self.assertTrue(self.schema['Workflow'].has_unique_values('name')) + self.assertTrue(self.index_exists('Workflow', 'name', unique=True)) finally: self.execute('DELETE DEF constrained_by X WHERE X cstrtype CT, ' 'CT name "UniqueConstraint", DEF relation_type RT, DEF from_entity E,' 'RT name "name", E name "Workflow"') - self.failUnless(self.schema['Workflow'].has_unique_values('name')) - self.failUnless(self.index_exists('Workflow', 'name', unique=True)) + self.assertTrue(self.schema['Workflow'].has_unique_values('name')) + self.assertTrue(self.index_exists('Workflow', 'name', unique=True)) self.commit() - self.failIf(self.schema['Workflow'].has_unique_values('name')) - self.failIf(self.index_exists('Workflow', 'name', unique=True)) + self.assertFalse(self.schema['Workflow'].has_unique_values('name')) + self.assertFalse(self.index_exists('Workflow', 'name', unique=True)) def test_required_change_1(self): self.execute('SET DEF cardinality "?1" ' @@ -267,8 +267,8 @@ {'x': attreid}) self.commit() self.schema.rebuild_infered_relations() - self.failUnless('Transition' in self.schema['messageid'].subjects()) - self.failUnless('WorkflowTransition' in self.schema['messageid'].subjects()) + self.assertTrue('Transition' in self.schema['messageid'].subjects()) + self.assertTrue('WorkflowTransition' in self.schema['messageid'].subjects()) self.execute('Any X WHERE X is_instance_of BaseTransition, X messageid "hop"') def test_change_fulltextindexed(self): @@ -283,7 +283,7 @@ 'A from_entity E, A relation_type R, R name "subject"') self.commit() rset = req.execute('Any X WHERE X has_text "rick.roll"') - self.failIf(rset) + self.assertFalse(rset) assert req.execute('SET A fulltextindexed TRUE ' 'WHERE A from_entity E, A relation_type R, ' 'E name "Email", R name "subject"') diff -r aa30c665bd06 -r f2a5805615f8 misc/migration/bootstrapmigration_repository.py --- a/misc/migration/bootstrapmigration_repository.py Fri Sep 16 16:42:42 2011 +0200 +++ b/misc/migration/bootstrapmigration_repository.py Fri Sep 16 17:30:26 2011 +0200 @@ -41,6 +41,15 @@ 'FROM cw_CWSource, cw_source_relation ' 'WHERE entities.eid=cw_source_relation.eid_from AND cw_source_relation.eid_to=cw_CWSource.cw_eid') +if applcubicwebversion <= (3, 14, 0) and cubicwebversion >= (3, 14, 0): + if 'require_permission' in schema and not 'localperms'in repo.config.cubes(): + from cubicweb import ExecutionError + try: + add_cube('localperms', update_database=False) + except ImportError: + raise ExecutionError('In cubicweb 3.14, CWPermission and related stuff ' + 'has been moved to cube localperms. Install it first.') + if applcubicwebversion == (3, 6, 0) and cubicwebversion >= (3, 6, 0): CSTRMAP = dict(rql('Any T, X WHERE X is CWConstraintType, X name T', ask_confirm=False)) diff -r aa30c665bd06 -r f2a5805615f8 misc/migration/postcreate.py --- a/misc/migration/postcreate.py Fri Sep 16 16:42:42 2011 +0200 +++ b/misc/migration/postcreate.py Fri Sep 16 17:30:26 2011 +0200 @@ -1,4 +1,4 @@ -# copyright 2003-2010 LOGILAB S.A. (Paris, FRANCE), all rights reserved. +# copyright 2003-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved. # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr # # This file is part of CubicWeb. @@ -69,10 +69,3 @@ if value != default: rql('INSERT CWProperty X: X pkey %(k)s, X value %(v)s', {'k': key, 'v': value}) - -# add PERM_USE_TEMPLATE_FORMAT permission -from cubicweb.schema import PERM_USE_TEMPLATE_FORMAT -usetmplperm = create_entity('CWPermission', name=PERM_USE_TEMPLATE_FORMAT, - label=_('use template languages')) -rql('SET X require_group G WHERE G name "managers", X eid %(x)s', - {'x': usetmplperm.eid}) diff -r aa30c665bd06 -r f2a5805615f8 schema.py --- a/schema.py Fri Sep 16 16:42:42 2011 +0200 +++ b/schema.py Fri Sep 16 17:30:26 2011 +0200 @@ -59,12 +59,11 @@ 'from_state', 'to_state', 'condition', 'subworkflow', 'subworkflow_state', 'subworkflow_exit', )) -SYSTEM_RTYPES = set(('in_group', 'require_group', 'require_permission', +SYSTEM_RTYPES = set(('in_group', 'require_group', # cwproperty 'for_user', )) | WORKFLOW_RTYPES NO_I18NCONTEXT = META_RTYPES | WORKFLOW_RTYPES -NO_I18NCONTEXT.add('require_permission') SKIP_COMPOSITE_RELS = [('cw_source', 'subject')] @@ -85,7 +84,7 @@ 'WorkflowTransition', 'BaseTransition', 'SubWorkflowExitPoint')) -INTERNAL_TYPES = set(('CWProperty', 'CWPermission', 'CWCache', 'ExternalUri', +INTERNAL_TYPES = set(('CWProperty', 'CWCache', 'ExternalUri', 'CWSource', 'CWSourceHostConfig', 'CWSourceSchemaConfig')) @@ -1174,7 +1173,7 @@ # _() is just there to add messages to the catalog, don't care about actual # translation -PERM_USE_TEMPLATE_FORMAT = _('use_template_format') +MAY_USE_TEMPLATE_FORMAT = set(('managers',)) NEED_PERM_FORMATS = [_('text/cubicweb-page-template')] @monkeypatch(FormatConstraint) @@ -1189,9 +1188,9 @@ # cw is a server session hasperm = not cw.write_security or \ not cw.is_hook_category_activated('integrity') or \ - cw.user.has_permission(PERM_USE_TEMPLATE_FORMAT) + cw.user.matching_groups(MAY_USE_TEMPLATE_FORMAT) else: - hasperm = cw.user.has_permission(PERM_USE_TEMPLATE_FORMAT) + hasperm = cw.user.matching_groups(MAY_USE_TEMPLATE_FORMAT) if hasperm: return self.regular_formats + tuple(NEED_PERM_FORMATS) return self.regular_formats diff -r aa30c665bd06 -r f2a5805615f8 schemas/__init__.py --- a/schemas/__init__.py Fri Sep 16 16:42:42 2011 +0200 +++ b/schemas/__init__.py Fri Sep 16 17:30:26 2011 +0200 @@ -1,4 +1,4 @@ -# copyright 2003-2010 LOGILAB S.A. (Paris, FRANCE), all rights reserved. +# copyright 2003-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved. # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr # # This file is part of CubicWeb. @@ -15,12 +15,10 @@ # # You should have received a copy of the GNU Lesser General Public License along # with CubicWeb. If not, see . -"""some utilities to define schema permissions +"""some constants and classes to define schema permissions""" -""" __docformat__ = "restructuredtext en" -from rql.utils import quote from cubicweb.schema import RO_REL_PERMS, RO_ATTR_PERMS, \ PUB_SYSTEM_ENTITY_PERMS, PUB_SYSTEM_REL_PERMS, \ ERQLExpression, RRQLExpression @@ -35,59 +33,19 @@ # execute, readable by anyone HOOKS_RTYPE_PERMS = RO_REL_PERMS # XXX deprecates -def _perm(names): - if isinstance(names, (list, tuple)): - if len(names) == 1: - names = quote(names[0]) - else: - names = 'IN (%s)' % (','.join(quote(name) for name in names)) - else: - names = quote(names) - #return u' require_permission P, P name %s, U in_group G, P require_group G' % names - return u' require_permission P, P name %s, U has_group_permission P' % names - -def xperm(*names): - return 'X' + _perm(names) - -def xexpr(*names): - return ERQLExpression(xperm(*names)) - -def xrexpr(relation, *names): - return ERQLExpression('X %s Y, Y %s' % (relation, _perm(names))) - -def xorexpr(relation, etype, *names): - return ERQLExpression('Y %s X, X is %s, Y %s' % (relation, etype, _perm(names))) - - -def sexpr(*names): - return RRQLExpression('S' + _perm(names), 'S') +from logilab.common.modutils import LazyObject +from logilab.common.deprecation import deprecated +class MyLazyObject(LazyObject): -def restricted_sexpr(restriction, *names): - rql = '%s, %s' % (restriction, 'S' + _perm(names)) - return RRQLExpression(rql, 'S') - -def restricted_oexpr(restriction, *names): - rql = '%s, %s' % (restriction, 'O' + _perm(names)) - return RRQLExpression(rql, 'O') - -def oexpr(*names): - return RRQLExpression('O' + _perm(names), 'O') - + def _getobj(self): + try: + return super(MyLazyObject, self)._getobj() + except ImportError: + raise ImportError('In cubicweb 3.14, function %s has been moved to ' + 'cube localperms. Install it first.' % self.obj) -# def supdate_perm(): -# return RRQLExpression('U has_update_permission S', 'S') - -# def oupdate_perm(): -# return RRQLExpression('U has_update_permission O', 'O') - -def relxperm(rel, role, *names): - assert role in ('subject', 'object') - if role == 'subject': - zxrel = ', X %s Z' % rel - else: - zxrel = ', Z %s X' % rel - return 'Z' + _perm(names) + zxrel - -def relxexpr(rel, role, *names): - return ERQLExpression(relxperm(rel, role, *names)) +for name in ('xperm', 'xexpr', 'xrexpr', 'xorexpr', 'sexpr', 'restricted_sexpr', + 'restricted_oexpr', 'oexpr', 'relxperm', 'relxexpr', '_perm'): + msg = '[3.14] import %s from cubes.localperms' % name + globals()[name] = deprecated(msg, name=name, doc='deprecated')(MyLazyObject('cubes.localperms', name)) diff -r aa30c665bd06 -r f2a5805615f8 schemas/base.py --- a/schemas/base.py Fri Sep 16 16:42:42 2011 +0200 +++ b/schemas/base.py Fri Sep 16 17:30:26 2011 +0200 @@ -180,31 +180,6 @@ cardinality = '?*' -class CWPermission(EntityType): - """entity type that may be used to construct some advanced security configuration - """ - __permissions__ = PUB_SYSTEM_ENTITY_PERMS - - name = String(required=True, indexed=True, internationalizable=True, maxsize=100, - description=_('name or identifier of the permission')) - label = String(required=True, internationalizable=True, maxsize=100, - description=_('distinct label to distinguate between other ' - 'permission entity of the same name')) - require_group = SubjectRelation('CWGroup', - description=_('groups to which the permission is granted')) - -# explicitly add X require_permission CWPermission for each entity that should have -# configurable security -class require_permission(RelationType): - """link a permission to the entity. This permission should be used in the - security definition of the entity's type to be useful. - """ - __permissions__ = PUB_SYSTEM_REL_PERMS - -class require_group(RelationType): - """used to grant a permission to a group""" - __permissions__ = PUB_SYSTEM_REL_PERMS - class ExternalUri(EntityType): """a URI representing an object in external data store""" @@ -382,3 +357,5 @@ 'add': ('managers', RRQLExpression('U has_update_permission S'),), 'delete': ('managers', RRQLExpression('U has_update_permission S'),), } + + diff -r aa30c665bd06 -r f2a5805615f8 schemas/workflow.py --- a/schemas/workflow.py Fri Sep 16 16:42:42 2011 +0200 +++ b/schemas/workflow.py Fri Sep 16 17:30:26 2011 +0200 @@ -1,4 +1,4 @@ -# copyright 2003-2010 LOGILAB S.A. (Paris, FRANCE), all rights reserved. +# copyright 2003-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved. # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr # # This file is part of CubicWeb. @@ -21,14 +21,15 @@ __docformat__ = "restructuredtext en" _ = unicode -from yams.buildobjs import (EntityType, RelationType, SubjectRelation, +from yams.buildobjs import (EntityType, RelationType, RelationDefinition, + SubjectRelation, RichString, String, Int) from cubicweb.schema import RQLConstraint, RQLUniqueConstraint -from cubicweb.schemas import (META_ETYPE_PERMS, META_RTYPE_PERMS, - HOOKS_RTYPE_PERMS) +from cubicweb.schemas import (PUB_SYSTEM_ENTITY_PERMS, PUB_SYSTEM_REL_PERMS, + RO_REL_PERMS) class Workflow(EntityType): - __permissions__ = META_ETYPE_PERMS + __permissions__ = PUB_SYSTEM_ENTITY_PERMS name = String(required=True, indexed=True, internationalizable=True, maxsize=256) @@ -47,7 +48,7 @@ class default_workflow(RelationType): """default workflow for an entity type""" - __permissions__ = META_RTYPE_PERMS + __permissions__ = PUB_SYSTEM_REL_PERMS subject = 'CWEType' object = 'Workflow' @@ -60,7 +61,7 @@ """used to associate simple states to an entity type and/or to define workflows """ - __permissions__ = META_ETYPE_PERMS + __permissions__ = PUB_SYSTEM_ENTITY_PERMS name = String(required=True, indexed=True, internationalizable=True, maxsize=256, @@ -83,7 +84,7 @@ class BaseTransition(EntityType): """abstract base class for transitions""" - __permissions__ = META_ETYPE_PERMS + __permissions__ = PUB_SYSTEM_ENTITY_PERMS name = String(required=True, indexed=True, internationalizable=True, maxsize=256, @@ -91,22 +92,34 @@ _('workflow already have a transition of that name'))]) type = String(vocabulary=(_('normal'), _('auto')), default='normal') description = RichString(description=_('semantic description of this transition')) - condition = SubjectRelation('RQLExpression', cardinality='*?', composite='subject', - description=_('a RQL expression which should return some results, ' - 'else the transition won\'t be available. ' - 'This query may use X and U variables ' - 'that will respectivly represents ' - 'the current entity and the current user')) - require_group = SubjectRelation('CWGroup', cardinality='**', - description=_('group in which a user should be to be ' - 'allowed to pass this transition')) transition_of = SubjectRelation('Workflow', cardinality='1*', composite='object', description=_('workflow to which this transition belongs'), constraints=[RQLUniqueConstraint('S name N, Y transition_of O, Y name N', 'Y', _('workflow already have a transition of that name'))]) +class require_group(RelationDefinition): + """group in which a user should be to be allowed to pass this transition""" + __permissions__ = PUB_SYSTEM_REL_PERMS + subject = 'BaseTransition' + object = 'CWGroup' + + +class condition(RelationDefinition): + """a RQL expression which should return some results, else the transition + won't be available. + + This query may use X and U variables that will respectivly represents the + current entity and the current user. + """ + __permissions__ = PUB_SYSTEM_REL_PERMS + subject = 'BaseTransition' + object = 'RQLExpression' + cardinality = '*?' + composite = 'subject' + + class Transition(BaseTransition): """use to define a transition from one or multiple states to a destination states in workflow's definitions. Transition without destination state will @@ -177,11 +190,11 @@ # get actor and date time using owned_by and creation_date class from_state(RelationType): - __permissions__ = HOOKS_RTYPE_PERMS.copy() + __permissions__ = RO_REL_PERMS.copy() inlined = True class to_state(RelationType): - __permissions__ = HOOKS_RTYPE_PERMS.copy() + __permissions__ = RO_REL_PERMS.copy() inlined = True class by_transition(RelationType): @@ -196,60 +209,52 @@ class workflow_of(RelationType): """link a workflow to one or more entity type""" - __permissions__ = META_RTYPE_PERMS + __permissions__ = PUB_SYSTEM_REL_PERMS class state_of(RelationType): """link a state to one or more workflow""" - __permissions__ = META_RTYPE_PERMS + __permissions__ = PUB_SYSTEM_REL_PERMS inlined = True class transition_of(RelationType): """link a transition to one or more workflow""" - __permissions__ = META_RTYPE_PERMS + __permissions__ = PUB_SYSTEM_REL_PERMS inlined = True class destination_state(RelationType): """destination state of a transition""" - __permissions__ = META_RTYPE_PERMS + __permissions__ = PUB_SYSTEM_REL_PERMS inlined = True class allowed_transition(RelationType): """allowed transitions from this state""" - __permissions__ = META_RTYPE_PERMS + __permissions__ = PUB_SYSTEM_REL_PERMS class initial_state(RelationType): """indicate which state should be used by default when an entity using states is created """ - __permissions__ = META_RTYPE_PERMS + __permissions__ = PUB_SYSTEM_REL_PERMS inlined = True class subworkflow(RelationType): - __permissions__ = META_RTYPE_PERMS + __permissions__ = PUB_SYSTEM_REL_PERMS inlined = True class exit_point(RelationType): - __permissions__ = META_RTYPE_PERMS + __permissions__ = PUB_SYSTEM_REL_PERMS class subworkflow_state(RelationType): - __permissions__ = META_RTYPE_PERMS + __permissions__ = PUB_SYSTEM_REL_PERMS inlined = True -class condition(RelationType): - __permissions__ = META_RTYPE_PERMS - -# already defined in base.py -# class require_group(RelationType): -# __permissions__ = META_RTYPE_PERMS - - # "abstract" relations, set by WorkflowableEntityType ########################## class custom_workflow(RelationType): """allow to set a specific workflow for an entity""" - __permissions__ = META_RTYPE_PERMS + __permissions__ = PUB_SYSTEM_REL_PERMS cardinality = '?*' constraints = [RQLConstraint('S is ET, O workflow_of ET', @@ -275,7 +280,7 @@ class in_state(RelationType): """indicate the current state of an entity""" - __permissions__ = HOOKS_RTYPE_PERMS + __permissions__ = RO_REL_PERMS # not inlined intentionnaly since when using ldap sources, user'state # has to be stored outside the CWUser table diff -r aa30c665bd06 -r f2a5805615f8 server/migractions.py --- a/server/migractions.py Fri Sep 16 16:42:42 2011 +0200 +++ b/server/migractions.py Fri Sep 16 17:30:26 2011 +0200 @@ -117,7 +117,15 @@ # which is called on regular start repo.hm.call_hooks('server_maintenance', repo=repo) if not schema and not getattr(config, 'quick_start', False): - schema = config.load_schema(expand_cubes=True) + insert_lperms = self.repo.get_versions()['cubicweb'] < (3, 14, 0) and 'localperms' in config.available_cubes() + if insert_lperms: + cubes = config._cubes + config._cubes += ('localperms',) + try: + schema = config.load_schema(expand_cubes=True) + finally: + if insert_lperms: + config._cubes = cubes self.fs_schema = schema self._synchronized = set() diff -r aa30c665bd06 -r f2a5805615f8 server/repository.py --- a/server/repository.py Fri Sep 16 16:42:42 2011 +0200 +++ b/server/repository.py Fri Sep 16 17:30:26 2011 +0200 @@ -60,8 +60,7 @@ security_enabled from cubicweb.server.ssplanner import EditedEntity -NO_CACHE_RELATIONS = set( [('require_permission', 'object'), - ('owned_by', 'object'), +NO_CACHE_RELATIONS = set( [('owned_by', 'object'), ('created_by', 'object'), ('cw_source', 'object'), ]) @@ -460,8 +459,9 @@ def _build_user(self, session, eid): """return a CWUser entity for user with the given eid""" cls = self.vreg['etypes'].etype_class('CWUser') - rql = cls.fetch_rql(session.user, ['X eid %(x)s']) - rset = session.execute(rql, {'x': eid}) + st = cls.fetch_rqlst(session.user, ordermethod=None) + st.add_eid_restriction(st.get_variable('X'), 'x', 'Substitute') + rset = session.execute(st.as_string(), {'x': eid}) assert len(rset) == 1, rset cwuser = rset.get_entity(0, 0) # pylint: disable=W0104 diff -r aa30c665bd06 -r f2a5805615f8 server/session.py --- a/server/session.py Fri Sep 16 16:42:42 2011 +0200 +++ b/server/session.py Fri Sep 16 17:30:26 2011 +0200 @@ -28,6 +28,7 @@ from warnings import warn from logilab.common.deprecation import deprecated +from logilab.common.textutils import unormalize from rql import CoercionError from rql.nodes import ETYPE_PYOBJ_MAP, etype_from_pyobj from yams import BASE_TYPES @@ -232,7 +233,7 @@ def __init__(self, user, repo, cnxprops=None, _id=None): super(Session, self).__init__(repo.vreg) - self.id = _id or make_uid(user.login.encode('UTF8')) + self.id = _id or make_uid(unormalize(user.login).encode('UTF8')) cnxprops = cnxprops or ConnectionProperties('inmemory') self.user = user self.repo = repo @@ -1319,9 +1320,6 @@ def owns(self, eid): return True - def has_permission(self, pname, contexteid=None): - return True - def property_value(self, key): if key == 'ui.language': return 'en' diff -r aa30c665bd06 -r f2a5805615f8 server/test/data/bootstrap_cubes --- a/server/test/data/bootstrap_cubes Fri Sep 16 16:42:42 2011 +0200 +++ b/server/test/data/bootstrap_cubes Fri Sep 16 17:30:26 2011 +0200 @@ -1,1 +1,1 @@ -card,comment,folder,tag,basket,email,file +card,comment,folder,tag,basket,email,file,localperms diff -r aa30c665bd06 -r f2a5805615f8 server/test/unittest_checkintegrity.py --- a/server/test/unittest_checkintegrity.py Fri Sep 16 16:42:42 2011 +0200 +++ b/server/test/unittest_checkintegrity.py Fri Sep 16 17:30:26 2011 +0200 @@ -46,9 +46,9 @@ def test_reindex_all(self): self.execute('INSERT Personne X: X nom "toto", X prenom "tutu"') self.session.commit(False) - self.failUnless(self.execute('Any X WHERE X has_text "tutu"')) + self.assertTrue(self.execute('Any X WHERE X has_text "tutu"')) reindex_entities(self.repo.schema, self.session, withpb=False) - self.failUnless(self.execute('Any X WHERE X has_text "tutu"')) + self.assertTrue(self.execute('Any X WHERE X has_text "tutu"')) def test_reindex_etype(self): self.execute('INSERT Personne X: X nom "toto", X prenom "tutu"') @@ -56,8 +56,8 @@ self.session.commit(False) reindex_entities(self.repo.schema, self.session, withpb=False, etypes=('Personne',)) - self.failUnless(self.execute('Any X WHERE X has_text "tutu"')) - self.failUnless(self.execute('Any X WHERE X has_text "toto"')) + self.assertTrue(self.execute('Any X WHERE X has_text "tutu"')) + self.assertTrue(self.execute('Any X WHERE X has_text "toto"')) if __name__ == '__main__': unittest_main() diff -r aa30c665bd06 -r f2a5805615f8 server/test/unittest_datafeed.py --- a/server/test/unittest_datafeed.py Fri Sep 16 16:42:42 2011 +0200 +++ b/server/test/unittest_datafeed.py Fri Sep 16 17:30:26 2011 +0200 @@ -115,8 +115,8 @@ req = self.request() req.execute('DELETE CWSource S WHERE S name "myrenamedfeed"') self.commit() - self.failIf(self.execute('Card X WHERE X title "cubicweb.org"')) - self.failIf(self.execute('Any X WHERE X has_text "cubicweb.org"')) + self.assertFalse(self.execute('Card X WHERE X title "cubicweb.org"')) + self.assertFalse(self.execute('Any X WHERE X has_text "cubicweb.org"')) if __name__ == '__main__': from logilab.common.testlib import unittest_main diff -r aa30c665bd06 -r f2a5805615f8 server/test/unittest_ldapuser.py --- a/server/test/unittest_ldapuser.py Fri Sep 16 16:42:42 2011 +0200 +++ b/server/test/unittest_ldapuser.py Fri Sep 16 17:30:26 2011 +0200 @@ -262,7 +262,7 @@ def test_multiple_entities_from_different_sources(self): req = self.request() self.create_user(req, 'cochon') - self.failUnless(self.sexecute('Any X,Y WHERE X login %(syt)s, Y login "cochon"', {'syt': SYT})) + self.assertTrue(self.sexecute('Any X,Y WHERE X login %(syt)s, Y login "cochon"', {'syt': SYT})) def test_exists1(self): self.session.set_cnxset() @@ -288,9 +288,9 @@ self.create_user(req, 'comme') self.create_user(req, 'cochon') self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"') - self.failUnless(self.sexecute('Any X, Y WHERE X copain Y, X login "comme", Y login "cochon"')) + self.assertTrue(self.sexecute('Any X, Y WHERE X copain Y, X login "comme", Y login "cochon"')) self.sexecute('SET X copain Y WHERE X login %(syt)s, Y login "cochon"', {'syt': SYT}) - self.failUnless(self.sexecute('Any X, Y WHERE X copain Y, X login %(syt)s, Y login "cochon"', {'syt': SYT})) + self.assertTrue(self.sexecute('Any X, Y WHERE X copain Y, X login %(syt)s, Y login "cochon"', {'syt': SYT})) rset = self.sexecute('Any GN,L WHERE X in_group G, X login L, G name GN, G name "managers" ' 'OR EXISTS(X copain T, T login in ("comme", "cochon"))') self.assertEqual(sorted(rset.rows), [['managers', 'admin'], ['users', 'comme'], ['users', SYT]]) @@ -392,8 +392,8 @@ 'type': 'CWUser', 'extid': None}) self.assertEqual(e.cw_source[0].name, 'system') - self.failUnless(e.creation_date) - self.failUnless(e.modification_date) + self.assertTrue(e.creation_date) + self.assertTrue(e.modification_date) # XXX test some password has been set source.synchronize() rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': SYT}) diff -r aa30c665bd06 -r f2a5805615f8 server/test/unittest_migractions.py --- a/server/test/unittest_migractions.py Fri Sep 16 16:42:42 2011 +0200 +++ b/server/test/unittest_migractions.py Fri Sep 16 17:30:26 2011 +0200 @@ -74,13 +74,13 @@ self.repo.vreg['etypes'].clear_caches() def test_add_attribute_int(self): - self.failIf('whatever' in self.schema) + self.assertFalse('whatever' in self.schema) self.request().create_entity('Note') self.commit() orderdict = dict(self.mh.rqlexec('Any RTN, O WHERE X name "Note", RDEF from_entity X, ' 'RDEF relation_type RT, RDEF ordernum O, RT name RTN')) self.mh.cmd_add_attribute('Note', 'whatever') - self.failUnless('whatever' in self.schema) + self.assertTrue('whatever' in self.schema) self.assertEqual(self.schema['whatever'].subjects(), ('Note',)) self.assertEqual(self.schema['whatever'].objects(), ('Int',)) self.assertEqual(self.schema['Note'].default('whatever'), 2) @@ -108,12 +108,12 @@ self.mh.rollback() def test_add_attribute_varchar(self): - self.failIf('whatever' in self.schema) + self.assertFalse('whatever' in self.schema) self.request().create_entity('Note') self.commit() - self.failIf('shortpara' in self.schema) + self.assertFalse('shortpara' in self.schema) self.mh.cmd_add_attribute('Note', 'shortpara') - self.failUnless('shortpara' in self.schema) + self.assertTrue('shortpara' in self.schema) self.assertEqual(self.schema['shortpara'].subjects(), ('Note', )) self.assertEqual(self.schema['shortpara'].objects(), ('String', )) # test created column is actually a varchar(64) @@ -128,10 +128,10 @@ self.mh.rollback() def test_add_datetime_with_default_value_attribute(self): - self.failIf('mydate' in self.schema) - self.failIf('shortpara' in self.schema) + self.assertFalse('mydate' in self.schema) + self.assertFalse('shortpara' in self.schema) self.mh.cmd_add_attribute('Note', 'mydate') - self.failUnless('mydate' in self.schema) + self.assertTrue('mydate' in self.schema) self.assertEqual(self.schema['mydate'].subjects(), ('Note', )) self.assertEqual(self.schema['mydate'].objects(), ('Date', )) testdate = date(2005, 12, 13) @@ -167,17 +167,17 @@ self.mh.rollback() def test_rename_attribute(self): - self.failIf('civility' in self.schema) + self.assertFalse('civility' in self.schema) eid1 = self.mh.rqlexec('INSERT Personne X: X nom "lui", X sexe "M"')[0][0] eid2 = self.mh.rqlexec('INSERT Personne X: X nom "l\'autre", X sexe NULL')[0][0] self.mh.cmd_rename_attribute('Personne', 'sexe', 'civility') - self.failIf('sexe' in self.schema) - self.failUnless('civility' in self.schema) + self.assertFalse('sexe' in self.schema) + self.assertTrue('civility' in self.schema) # test data has been backported c1 = self.mh.rqlexec('Any C WHERE X eid %s, X civility C' % eid1)[0][0] - self.failUnlessEqual(c1, 'M') + self.assertEqual(c1, 'M') c2 = self.mh.rqlexec('Any C WHERE X eid %s, X civility C' % eid2)[0][0] - self.failUnlessEqual(c2, None) + self.assertEqual(c2, None) def test_workflow_actions(self): @@ -192,13 +192,13 @@ self.assertEqual(s1, "foo") def test_add_entity_type(self): - self.failIf('Folder2' in self.schema) - self.failIf('filed_under2' in self.schema) + self.assertFalse('Folder2' in self.schema) + self.assertFalse('filed_under2' in self.schema) self.mh.cmd_add_entity_type('Folder2') - self.failUnless('Folder2' in self.schema) - self.failUnless(self.execute('CWEType X WHERE X name "Folder2"')) - self.failUnless('filed_under2' in self.schema) - self.failUnless(self.execute('CWRType X WHERE X name "filed_under2"')) + self.assertTrue('Folder2' in self.schema) + self.assertTrue(self.execute('CWEType X WHERE X name "Folder2"')) + self.assertTrue('filed_under2' in self.schema) + self.assertTrue(self.execute('CWRType X WHERE X name "filed_under2"')) self.schema.rebuild_infered_relations() self.assertEqual(sorted(str(rs) for rs in self.schema['Folder2'].subject_relations()), ['created_by', 'creation_date', 'cw_source', 'cwuri', @@ -214,7 +214,7 @@ self.assertEqual(self.schema['filed_under2'].objects(), ('Folder2',)) eschema = self.schema.eschema('Folder2') for cstr in eschema.rdef('name').constraints: - self.failUnless(hasattr(cstr, 'eid')) + self.assertTrue(hasattr(cstr, 'eid')) def test_add_drop_entity_type(self): self.mh.cmd_add_entity_type('Folder2') @@ -227,23 +227,23 @@ self.commit() eschema = self.schema.eschema('Folder2') self.mh.cmd_drop_entity_type('Folder2') - self.failIf('Folder2' in self.schema) - self.failIf(self.execute('CWEType X WHERE X name "Folder2"')) + self.assertFalse('Folder2' in self.schema) + self.assertFalse(self.execute('CWEType X WHERE X name "Folder2"')) # test automatic workflow deletion - self.failIf(self.execute('Workflow X WHERE NOT X workflow_of ET')) - self.failIf(self.execute('State X WHERE NOT X state_of WF')) - self.failIf(self.execute('Transition X WHERE NOT X transition_of WF')) + self.assertFalse(self.execute('Workflow X WHERE NOT X workflow_of ET')) + self.assertFalse(self.execute('State X WHERE NOT X state_of WF')) + self.assertFalse(self.execute('Transition X WHERE NOT X transition_of WF')) def test_add_drop_relation_type(self): self.mh.cmd_add_entity_type('Folder2', auto=False) self.mh.cmd_add_relation_type('filed_under2') self.schema.rebuild_infered_relations() - self.failUnless('filed_under2' in self.schema) + self.assertTrue('filed_under2' in self.schema) self.assertEqual(sorted(str(e) for e in self.schema['filed_under2'].subjects()), sorted(str(e) for e in self.schema.entities() if not e.final)) self.assertEqual(self.schema['filed_under2'].objects(), ('Folder2',)) self.mh.cmd_drop_relation_type('filed_under2') - self.failIf('filed_under2' in self.schema) + self.assertFalse('filed_under2' in self.schema) def test_add_relation_definition_nortype(self): self.mh.cmd_add_relation_definition('Personne', 'concerne2', 'Affaire') @@ -260,9 +260,9 @@ self.mh.rqlexec('SET X concerne2 Y WHERE X is Personne, Y is Affaire') self.commit() self.mh.cmd_drop_relation_definition('Personne', 'concerne2', 'Affaire') - self.failUnless('concerne2' in self.schema) + self.assertTrue('concerne2' in self.schema) self.mh.cmd_drop_relation_definition('Personne', 'concerne2', 'Note') - self.failIf('concerne2' in self.schema) + self.assertFalse('concerne2' in self.schema) def test_drop_relation_definition_existant_rtype(self): self.assertEqual(sorted(str(e) for e in self.schema['concerne'].subjects()), @@ -380,8 +380,8 @@ self.assertEqual(eexpr.reverse_delete_permission, ()) self.assertEqual(eexpr.reverse_update_permission, ()) # no more rqlexpr to delete and add para attribute - self.failIf(self._rrqlexpr_rset('add', 'para')) - self.failIf(self._rrqlexpr_rset('delete', 'para')) + self.assertFalse(self._rrqlexpr_rset('add', 'para')) + self.assertFalse(self._rrqlexpr_rset('delete', 'para')) # new rql expr to add ecrit_par relation rexpr = self._rrqlexpr_entity('add', 'ecrit_par') self.assertEqual(rexpr.expression, @@ -391,13 +391,13 @@ self.assertEqual(rexpr.reverse_read_permission, ()) self.assertEqual(rexpr.reverse_delete_permission, ()) # no more rqlexpr to delete and add travaille relation - self.failIf(self._rrqlexpr_rset('add', 'travaille')) - self.failIf(self._rrqlexpr_rset('delete', 'travaille')) + self.assertFalse(self._rrqlexpr_rset('add', 'travaille')) + self.assertFalse(self._rrqlexpr_rset('delete', 'travaille')) # no more rqlexpr to delete and update Societe entity - self.failIf(self._erqlexpr_rset('update', 'Societe')) - self.failIf(self._erqlexpr_rset('delete', 'Societe')) + self.assertFalse(self._erqlexpr_rset('update', 'Societe')) + self.assertFalse(self._erqlexpr_rset('delete', 'Societe')) # no more rqlexpr to read Affaire entity - self.failIf(self._erqlexpr_rset('read', 'Affaire')) + self.assertFalse(self._erqlexpr_rset('read', 'Affaire')) # rqlexpr to update Affaire entity has been updated eexpr = self._erqlexpr_entity('update', 'Affaire') self.assertEqual(eexpr.expression, 'X concerne S, S owned_by U') @@ -470,13 +470,13 @@ try: self.mh.cmd_remove_cube('email', removedeps=True) # file was there because it's an email dependancy, should have been removed - self.failIf('email' in self.config.cubes()) - self.failIf(self.config.cube_dir('email') in self.config.cubes_path()) - self.failIf('file' in self.config.cubes()) - self.failIf(self.config.cube_dir('file') in self.config.cubes_path()) + self.assertFalse('email' in self.config.cubes()) + self.assertFalse(self.config.cube_dir('email') in self.config.cubes_path()) + self.assertFalse('file' in self.config.cubes()) + self.assertFalse(self.config.cube_dir('file') in self.config.cubes_path()) for ertype in ('Email', 'EmailThread', 'EmailPart', 'File', 'sender', 'in_thread', 'reply_to', 'data_format'): - self.failIf(ertype in schema, ertype) + self.assertFalse(ertype in schema, ertype) self.assertEqual(sorted(schema['see_also'].rdefs.keys()), sorted([('Folder', 'Folder'), ('Bookmark', 'Bookmark'), @@ -493,13 +493,13 @@ raise finally: self.mh.cmd_add_cube('email') - self.failUnless('email' in self.config.cubes()) - self.failUnless(self.config.cube_dir('email') in self.config.cubes_path()) - self.failUnless('file' in self.config.cubes()) - self.failUnless(self.config.cube_dir('file') in self.config.cubes_path()) + self.assertTrue('email' in self.config.cubes()) + self.assertTrue(self.config.cube_dir('email') in self.config.cubes_path()) + self.assertTrue('file' in self.config.cubes()) + self.assertTrue(self.config.cube_dir('file') in self.config.cubes_path()) for ertype in ('Email', 'EmailThread', 'EmailPart', 'File', 'sender', 'in_thread', 'reply_to', 'data_format'): - self.failUnless(ertype in schema, ertype) + self.assertTrue(ertype in schema, ertype) self.assertEqual(sorted(schema['see_also'].rdefs.keys()), sorted([('EmailThread', 'EmailThread'), ('Folder', 'Folder'), ('Bookmark', 'Bookmark'), @@ -530,18 +530,18 @@ try: self.mh.cmd_remove_cube('email') cubes.remove('email') - self.failIf('email' in self.config.cubes()) - self.failUnless('file' in self.config.cubes()) + self.assertFalse('email' in self.config.cubes()) + self.assertTrue('file' in self.config.cubes()) for ertype in ('Email', 'EmailThread', 'EmailPart', 'sender', 'in_thread', 'reply_to'): - self.failIf(ertype in schema, ertype) + self.assertFalse(ertype in schema, ertype) except : import traceback traceback.print_exc() raise finally: self.mh.cmd_add_cube('email') - self.failUnless('email' in self.config.cubes()) + self.assertTrue('email' in self.config.cubes()) # trick: overwrite self.maxeid to avoid deletion of just reintroduced # types (and their associated tables!) self.maxeid = self.execute('Any MAX(X)')[0][0] @@ -570,13 +570,13 @@ text = self.execute('INSERT Text X: X para "hip", X summary "hop", X newattr "momo"').get_entity(0, 0) note = self.execute('INSERT Note X: X para "hip", X shortpara "hop", X newattr "momo", X unique_id "x"').get_entity(0, 0) aff = self.execute('INSERT Affaire X').get_entity(0, 0) - self.failUnless(self.execute('SET X newnotinlined Y WHERE X eid %(x)s, Y eid %(y)s', + self.assertTrue(self.execute('SET X newnotinlined Y WHERE X eid %(x)s, Y eid %(y)s', {'x': text.eid, 'y': aff.eid})) - self.failUnless(self.execute('SET X newnotinlined Y WHERE X eid %(x)s, Y eid %(y)s', + self.assertTrue(self.execute('SET X newnotinlined Y WHERE X eid %(x)s, Y eid %(y)s', {'x': note.eid, 'y': aff.eid})) - self.failUnless(self.execute('SET X newinlined Y WHERE X eid %(x)s, Y eid %(y)s', + self.assertTrue(self.execute('SET X newinlined Y WHERE X eid %(x)s, Y eid %(y)s', {'x': text.eid, 'y': aff.eid})) - self.failUnless(self.execute('SET X newinlined Y WHERE X eid %(x)s, Y eid %(y)s', + self.assertTrue(self.execute('SET X newinlined Y WHERE X eid %(x)s, Y eid %(y)s', {'x': note.eid, 'y': aff.eid})) # XXX remove specializes by ourselves, else tearDown fails when removing # Para because of Note inheritance. This could be fixed by putting the @@ -601,11 +601,11 @@ def test_add_symmetric_relation_type(self): same_as_sql = self.mh.sqlexec("SELECT sql FROM sqlite_master WHERE type='table' " "and name='same_as_relation'") - self.failIf(same_as_sql) + self.assertFalse(same_as_sql) self.mh.cmd_add_relation_type('same_as') same_as_sql = self.mh.sqlexec("SELECT sql FROM sqlite_master WHERE type='table' " "and name='same_as_relation'") - self.failUnless(same_as_sql) + self.assertTrue(same_as_sql) if __name__ == '__main__': unittest_main() diff -r aa30c665bd06 -r f2a5805615f8 server/test/unittest_multisources.py --- a/server/test/unittest_multisources.py Fri Sep 16 16:42:42 2011 +0200 +++ b/server/test/unittest_multisources.py Fri Sep 16 17:30:26 2011 +0200 @@ -180,10 +180,10 @@ def test_has_text(self): self.repo.sources_by_uri['extern'].synchronize(MTIME) # in case fti_update has been run before - self.failUnless(self.sexecute('Any X WHERE X has_text "affref"')) - self.failUnless(self.sexecute('Affaire X WHERE X has_text "affref"')) - self.failUnless(self.sexecute('Any X ORDERBY FTIRANK(X) WHERE X has_text "affref"')) - self.failUnless(self.sexecute('Affaire X ORDERBY FTIRANK(X) WHERE X has_text "affref"')) + self.assertTrue(self.sexecute('Any X WHERE X has_text "affref"')) + self.assertTrue(self.sexecute('Affaire X WHERE X has_text "affref"')) + self.assertTrue(self.sexecute('Any X ORDERBY FTIRANK(X) WHERE X has_text "affref"')) + self.assertTrue(self.sexecute('Affaire X ORDERBY FTIRANK(X) WHERE X has_text "affref"')) def test_anon_has_text(self): self.repo.sources_by_uri['extern'].synchronize(MTIME) # in case fti_update has been run before @@ -210,13 +210,13 @@ try: # force sync self.repo.sources_by_uri['extern'].synchronize(MTIME) - self.failUnless(self.sexecute('Any X WHERE X has_text "blah"')) - self.failUnless(self.sexecute('Any X WHERE X has_text "affreux"')) + self.assertTrue(self.sexecute('Any X WHERE X has_text "blah"')) + self.assertTrue(self.sexecute('Any X WHERE X has_text "affreux"')) cu.execute('DELETE Affaire X WHERE X eid %(x)s', {'x': aff2}) self.cnx2.commit() self.repo.sources_by_uri['extern'].synchronize(MTIME) rset = self.sexecute('Any X WHERE X has_text "affreux"') - self.failIf(rset) + self.assertFalse(rset) finally: # restore state cu.execute('SET X ref "AFFREF" WHERE X eid %(x)s', {'x': self.aff1}) @@ -389,7 +389,7 @@ req.execute('DELETE CWSource S WHERE S name "extern"') self.commit() cu = self.session.system_sql("SELECT * FROM entities WHERE source='extern'") - self.failIf(cu.fetchall()) + self.assertFalse(cu.fetchall()) if __name__ == '__main__': from logilab.common.testlib import unittest_main diff -r aa30c665bd06 -r f2a5805615f8 server/test/unittest_querier.py --- a/server/test/unittest_querier.py Fri Sep 16 16:42:42 2011 +0200 +++ b/server/test/unittest_querier.py Fri Sep 16 17:30:26 2011 +0200 @@ -250,7 +250,7 @@ def test_unknown_eid(self): # should return an empty result set - self.failIf(self.execute('Any X WHERE X eid 99999999')) + self.assertFalse(self.execute('Any X WHERE X eid 99999999')) def test_typed_eid(self): # should return an empty result set @@ -418,8 +418,8 @@ self.execute("SET X tags Y WHERE X eid %(t)s, Y eid %(g)s", {'g': geid, 't': teid}) rset = self.execute("Any GN,TN ORDERBY GN WHERE T? tags G, T name TN, G name GN") - self.failUnless(['users', 'tag'] in rset.rows) - self.failUnless(['activated', None] in rset.rows) + self.assertTrue(['users', 'tag'] in rset.rows) + self.assertTrue(['activated', None] in rset.rows) rset = self.execute("Any GN,TN ORDERBY GN WHERE T tags G?, T name TN, G name GN") self.assertEqual(rset.rows, [[None, 'tagbis'], ['users', 'tag']]) @@ -494,26 +494,26 @@ def test_select_custom_aggregat_concat_string(self): rset = self.execute('Any GROUP_CONCAT(N) WHERE X is CWGroup, X name N') - self.failUnless(rset) - self.failUnlessEqual(sorted(rset[0][0].split(', ')), ['guests', 'managers', + self.assertTrue(rset) + self.assertEqual(sorted(rset[0][0].split(', ')), ['guests', 'managers', 'owners', 'users']) def test_select_custom_regproc_limit_size(self): rset = self.execute('Any TEXT_LIMIT_SIZE(N, 3) WHERE X is CWGroup, X name N, X name "managers"') - self.failUnless(rset) - self.failUnlessEqual(rset[0][0], 'man...') + self.assertTrue(rset) + self.assertEqual(rset[0][0], 'man...') self.execute("INSERT Basket X: X name 'bidule', X description 'hop hop', X description_format 'text/html'") rset = self.execute('Any LIMIT_SIZE(D, DF, 3) WHERE X is Basket, X description D, X description_format DF') - self.failUnless(rset) - self.failUnlessEqual(rset[0][0], 'hop...') + self.assertTrue(rset) + self.assertEqual(rset[0][0], 'hop...') def test_select_regproc_orderby(self): rset = self.execute('DISTINCT Any X,N ORDERBY GROUP_SORT_VALUE(N) WHERE X is CWGroup, X name N, X name "managers"') - self.failUnlessEqual(len(rset), 1) - self.failUnlessEqual(rset[0][1], 'managers') + self.assertEqual(len(rset), 1) + self.assertEqual(rset[0][1], 'managers') rset = self.execute('Any X,N ORDERBY GROUP_SORT_VALUE(N) WHERE X is CWGroup, X name N, NOT U in_group X, U login "admin"') - self.failUnlessEqual(len(rset), 3) - self.failUnlessEqual(rset[0][1], 'owners') + self.assertEqual(len(rset), 3) + self.assertEqual(rset[0][1], 'owners') def test_select_aggregat_sort(self): rset = self.execute('Any G, COUNT(U) GROUPBY G ORDERBY 2 WHERE U in_group G') @@ -619,7 +619,7 @@ self.assertEqual(len(rset.rows), 2, rset.rows) biduleeids = [r[0] for r in rset.rows] rset = self.execute(u'Any N where NOT N has_text "bidüle"') - self.failIf([r[0] for r in rset.rows if r[0] in biduleeids]) + self.assertFalse([r[0] for r in rset.rows if r[0] in biduleeids]) # duh? rset = self.execute('Any X WHERE X has_text %(text)s', {'text': u'ça'}) @@ -757,7 +757,7 @@ def test_select_explicit_eid(self): rset = self.execute('Any X,E WHERE X owned_by U, X eid E, U eid %(u)s', {'u': self.session.user.eid}) - self.failUnless(rset) + self.assertTrue(rset) self.assertEqual(rset.description[0][1], 'Int') # def test_select_rewritten_optional(self): @@ -774,7 +774,7 @@ rset = self.execute('Tag X WHERE X creation_date TODAY') self.assertEqual(len(rset.rows), 2) rset = self.execute('Any MAX(D) WHERE X is Tag, X creation_date D') - self.failUnless(isinstance(rset[0][0], datetime), (rset[0][0], type(rset[0][0]))) + self.assertTrue(isinstance(rset[0][0], datetime), (rset[0][0], type(rset[0][0]))) def test_today(self): self.execute("INSERT Tag X: X name 'bidule', X creation_date TODAY") @@ -891,11 +891,11 @@ def test_select_date_mathexp(self): rset = self.execute('Any X, TODAY - CD WHERE X is CWUser, X creation_date CD') - self.failUnless(rset) - self.failUnlessEqual(rset.description[0][1], 'Interval') + self.assertTrue(rset) + self.assertEqual(rset.description[0][1], 'Interval') eid, = self.execute("INSERT Personne X: X nom 'bidule'")[0] rset = self.execute('Any X, NOW - CD WHERE X is Personne, X creation_date CD') - self.failUnlessEqual(rset.description[0][1], 'Interval') + self.assertEqual(rset.description[0][1], 'Interval') def test_select_subquery_aggregat_1(self): # percent users by groups @@ -1173,7 +1173,7 @@ rset = self.execute('Any X, Y WHERE X travaille Y') self.assertEqual(len(rset.rows), 1) # test add of an existant relation but with NOT X rel Y protection - self.failIf(self.execute("SET X travaille Y WHERE X eid %(x)s, Y eid %(y)s," + self.assertFalse(self.execute("SET X travaille Y WHERE X eid %(x)s, Y eid %(y)s," "NOT X travaille Y", {'x': str(eid1), 'y': str(eid2)})) @@ -1198,9 +1198,9 @@ peid2 = self.execute("INSERT Personne Y: Y nom 'tutu'")[0][0] self.execute('SET P1 owned_by U, P2 owned_by U ' 'WHERE P1 eid %s, P2 eid %s, U eid %s' % (peid1, peid2, ueid)) - self.failUnless(self.execute('Any X WHERE X eid %s, X owned_by U, U eid %s' + self.assertTrue(self.execute('Any X WHERE X eid %s, X owned_by U, U eid %s' % (peid1, ueid))) - self.failUnless(self.execute('Any X WHERE X eid %s, X owned_by U, U eid %s' + self.assertTrue(self.execute('Any X WHERE X eid %s, X owned_by U, U eid %s' % (peid2, ueid))) def test_update_math_expr(self): diff -r aa30c665bd06 -r f2a5805615f8 server/test/unittest_repository.py --- a/server/test/unittest_repository.py Fri Sep 16 16:42:42 2011 +0200 +++ b/server/test/unittest_repository.py Fri Sep 16 17:30:26 2011 +0200 @@ -98,13 +98,13 @@ ('nom', 'prenom', 'inline2')) def test_all_entities_have_owner(self): - self.failIf(self.execute('Any X WHERE NOT X owned_by U')) + self.assertFalse(self.execute('Any X WHERE NOT X owned_by U')) def test_all_entities_have_is(self): - self.failIf(self.execute('Any X WHERE NOT X is ET')) + self.assertFalse(self.execute('Any X WHERE NOT X is ET')) def test_all_entities_have_cw_source(self): - self.failIf(self.execute('Any X WHERE NOT X cw_source S')) + self.assertFalse(self.execute('Any X WHERE NOT X cw_source S')) def test_connect(self): cnxid = self.repo.connect(self.admlogin, password=self.admpassword) @@ -147,7 +147,7 @@ 'INSERT CWUser X: X login %(login)s, X upassword %(passwd)s', {'login': u"tutetute", 'passwd': 'tutetute'}) self.assertRaises(ValidationError, self.repo.commit, cnxid) - self.failIf(self.repo.execute(cnxid, 'CWUser X WHERE X login "tutetute"')) + self.assertFalse(self.repo.execute(cnxid, 'CWUser X WHERE X login "tutetute"')) self.repo.close(cnxid) def test_rollback_on_execute_validation_error(self): @@ -160,12 +160,12 @@ with self.temporary_appobjects(ValidationErrorAfterHook): self.assertRaises(ValidationError, self.execute, 'SET X name "toto" WHERE X is CWGroup, X name "guests"') - self.failUnless(self.execute('Any X WHERE X is CWGroup, X name "toto"')) + self.assertTrue(self.execute('Any X WHERE X is CWGroup, X name "toto"')) with self.assertRaises(QueryError) as cm: self.commit() self.assertEqual(str(cm.exception), 'transaction must be rollbacked') self.rollback() - self.failIf(self.execute('Any X WHERE X is CWGroup, X name "toto"')) + self.assertFalse(self.execute('Any X WHERE X is CWGroup, X name "toto"')) def test_rollback_on_execute_unauthorized(self): class UnauthorizedAfterHook(Hook): @@ -177,12 +177,12 @@ with self.temporary_appobjects(UnauthorizedAfterHook): self.assertRaises(Unauthorized, self.execute, 'SET X name "toto" WHERE X is CWGroup, X name "guests"') - self.failUnless(self.execute('Any X WHERE X is CWGroup, X name "toto"')) + self.assertTrue(self.execute('Any X WHERE X is CWGroup, X name "toto"')) with self.assertRaises(QueryError) as cm: self.commit() self.assertEqual(str(cm.exception), 'transaction must be rollbacked') self.rollback() - self.failIf(self.execute('Any X WHERE X is CWGroup, X name "toto"')) + self.assertFalse(self.execute('Any X WHERE X is CWGroup, X name "toto"')) def test_close(self): @@ -364,13 +364,13 @@ cnx.load_appobjects(subpath=('entities',)) # check we can get the schema schema = cnx.get_schema() - self.failUnless(cnx.vreg) - self.failUnless('etypes'in cnx.vreg) + self.assertTrue(cnx.vreg) + self.assertTrue('etypes'in cnx.vreg) cu = cnx.cursor() rset = cu.execute('Any U,G WHERE U in_group G') user = iter(rset.entities()).next() - self.failUnless(user._cw) - self.failUnless(user._cw.vreg) + self.assertTrue(user._cw) + self.assertTrue(user._cw.vreg) from cubicweb.entities import authobjs self.assertIsInstance(user._cw.user, authobjs.CWUser) cnx.close() @@ -425,7 +425,7 @@ def test_schema_is_relation(self): no_is_rset = self.execute('Any X WHERE NOT X is ET') - self.failIf(no_is_rset, no_is_rset.description) + self.assertFalse(no_is_rset, no_is_rset.description) # def test_perfo(self): # self.set_debug(True) @@ -509,7 +509,7 @@ events = ('before_update_entity',) def __call__(self): # invoiced attribute shouldn't be considered "edited" before the hook - self._test.failIf('invoiced' in self.entity.cw_edited, + self._test.assertFalse('invoiced' in self.entity.cw_edited, 'cw_edited cluttered by previous update') self.entity.cw_edited['invoiced'] = 10 with self.temporary_appobjects(DummyBeforeHook): @@ -582,7 +582,7 @@ self.session.set_cnxset() cu = self.session.system_sql('SELECT mtime FROM entities WHERE eid = %s' % eidp) mtime = cu.fetchone()[0] - self.failUnless(omtime < mtime) + self.assertTrue(omtime < mtime) self.commit() date, modified, deleted = self.repo.entities_modified_since(('Personne',), omtime) self.assertEqual(modified, [('Personne', eidp)]) @@ -627,7 +627,7 @@ def test_no_uncessary_ftiindex_op(self): req = self.request() req.create_entity('Workflow', name=u'dummy workflow', description=u'huuuuu') - self.failIf(any(x for x in self.session.pending_operations + self.assertFalse(any(x for x in self.session.pending_operations if isinstance(x, native.FTIndexEntityOp))) @@ -639,7 +639,7 @@ [u'system.version.basket', u'system.version.card', u'system.version.comment', u'system.version.cubicweb', u'system.version.email', u'system.version.file', u'system.version.folder', - u'system.version.tag']) + u'system.version.localperms', u'system.version.tag']) CALLED = [] diff -r aa30c665bd06 -r f2a5805615f8 server/test/unittest_rqlannotation.py --- a/server/test/unittest_rqlannotation.py Fri Sep 16 16:42:42 2011 +0200 +++ b/server/test/unittest_rqlannotation.py Fri Sep 16 17:30:26 2011 +0200 @@ -342,7 +342,7 @@ def test_remove_from_deleted_source_1(self): rqlst = self._prepare('Note X WHERE X eid 999998, NOT X cw_source Y') - self.failIf('X' in rqlst.defined_vars) # simplified + self.assertFalse('X' in rqlst.defined_vars) # simplified self.assertEqual(rqlst.defined_vars['Y']._q_invariant, True) def test_remove_from_deleted_source_2(self): diff -r aa30c665bd06 -r f2a5805615f8 server/test/unittest_security.py --- a/server/test/unittest_security.py Fri Sep 16 16:42:42 2011 +0200 +++ b/server/test/unittest_security.py Fri Sep 16 17:30:26 2011 +0200 @@ -327,7 +327,7 @@ cu = cnx.cursor() aff2 = cu.execute("INSERT Affaire X: X sujet 'cool'")[0][0] # entity created in transaction are readable *by eid* - self.failUnless(cu.execute('Any X WHERE X eid %(x)s', {'x':aff2})) + self.assertTrue(cu.execute('Any X WHERE X eid %(x)s', {'x':aff2})) # XXX would be nice if it worked rset = cu.execute("Affaire X WHERE X sujet 'cool'") self.assertEqual(len(rset), 0) @@ -347,8 +347,8 @@ cu.execute("SET A concerne S WHERE A eid %(a)s, S eid %(s)s", {'a': aff2, 's': soc1}) cnx.commit() self.assertRaises(Unauthorized, cu.execute, 'Any X WHERE X eid %(x)s', {'x':aff1}) - self.failUnless(cu.execute('Any X WHERE X eid %(x)s', {'x':aff2})) - self.failUnless(cu.execute('Any X WHERE X eid %(x)s', {'x':card1})) + self.assertTrue(cu.execute('Any X WHERE X eid %(x)s', {'x':aff2})) + self.assertTrue(cu.execute('Any X WHERE X eid %(x)s', {'x':card1})) rset = cu.execute("Any X WHERE X has_text 'cool'") self.assertEqual(sorted(eid for eid, in rset.rows), [card1, aff2]) @@ -457,14 +457,14 @@ cnx = self.login('anon') cu = cnx.cursor() rset = cu.execute('CWUser X') - self.failUnless(rset) + self.assertTrue(rset) x = rset.get_entity(0, 0) self.assertEqual(x.login, None) - self.failUnless(x.creation_date) + self.assertTrue(x.creation_date) x = rset.get_entity(1, 0) x.complete() self.assertEqual(x.login, None) - self.failUnless(x.creation_date) + self.assertTrue(x.creation_date) cnx.rollback() cnx.close() @@ -492,7 +492,7 @@ cu = cnx.cursor() cu.execute('DELETE Affaire X WHERE X ref "ARCT01"') cnx.commit() - self.failIf(cu.execute('Affaire X')) + self.assertFalse(cu.execute('Affaire X')) cnx.close() def test_users_and_groups_non_readable_by_guests(self): diff -r aa30c665bd06 -r f2a5805615f8 server/test/unittest_session.py --- a/server/test/unittest_session.py Fri Sep 16 16:42:42 2011 +0200 +++ b/server/test/unittest_session.py Fri Sep 16 17:30:26 2011 +0200 @@ -95,7 +95,7 @@ description = self.session.build_description(rset.syntax_tree(), None, rset.rows) self.assertEqual(len(description), orig_length - 1) self.assertEqual(len(rset.rows), orig_length - 1) - self.failIf(rset.rows[0][0] == 9999999) + self.assertFalse(rset.rows[0][0] == 9999999) if __name__ == '__main__': diff -r aa30c665bd06 -r f2a5805615f8 server/test/unittest_storage.py --- a/server/test/unittest_storage.py Fri Sep 16 16:42:42 2011 +0200 +++ b/server/test/unittest_storage.py Fri Sep 16 17:30:26 2011 +0200 @@ -89,10 +89,10 @@ f1 = self.create_file() expected_filepath = osp.join(self.tempdir, '%s_data_%s' % (f1.eid, f1.data_name)) - self.failUnless(osp.isfile(expected_filepath)) + self.assertTrue(osp.isfile(expected_filepath)) self.assertEqual(file(expected_filepath).read(), 'the-data') self.rollback() - self.failIf(osp.isfile(expected_filepath)) + self.assertFalse(osp.isfile(expected_filepath)) f1 = self.create_file() self.commit() self.assertEqual(file(expected_filepath).read(), 'the-data') @@ -100,12 +100,12 @@ self.rollback() self.assertEqual(file(expected_filepath).read(), 'the-data') f1.cw_delete() - self.failUnless(osp.isfile(expected_filepath)) + self.assertTrue(osp.isfile(expected_filepath)) self.rollback() - self.failUnless(osp.isfile(expected_filepath)) + self.assertTrue(osp.isfile(expected_filepath)) f1.cw_delete() self.commit() - self.failIf(osp.isfile(expected_filepath)) + self.assertFalse(osp.isfile(expected_filepath)) def test_bfss_sqlite_fspath(self): f1 = self.create_file() @@ -219,7 +219,7 @@ # update f1's local dict. We want the pure rql version to work self.commit() old_path = self.fspath(f1) - self.failUnless(osp.isfile(old_path)) + self.assertTrue(osp.isfile(old_path)) self.assertEqual(osp.splitext(old_path)[1], '.txt') self.execute('SET F data %(d)s, F data_name %(dn)s, F data_format %(df)s WHERE F eid %(f)s', {'d': Binary('some other data'), 'f': f1.eid, 'dn': u'bar.jpg', 'df': u'image/jpeg'}) @@ -228,8 +228,8 @@ # the old file is dead f2 = self.execute('Any F WHERE F eid %(f)s, F is File', {'f': f1.eid}).get_entity(0, 0) new_path = self.fspath(f2) - self.failIf(osp.isfile(old_path)) - self.failUnless(osp.isfile(new_path)) + self.assertFalse(osp.isfile(old_path)) + self.assertTrue(osp.isfile(new_path)) self.assertEqual(osp.splitext(new_path)[1], '.jpg') @tag('update', 'extension', 'rollback') @@ -242,7 +242,7 @@ self.commit() old_path = self.fspath(f1) old_data = f1.data.getvalue() - self.failUnless(osp.isfile(old_path)) + self.assertTrue(osp.isfile(old_path)) self.assertEqual(osp.splitext(old_path)[1], '.txt') self.execute('SET F data %(d)s, F data_name %(dn)s, F data_format %(df)s WHERE F eid %(f)s', {'d': Binary('some other data'), 'f': f1.eid, 'dn': u'bar.jpg', 'df': u'image/jpeg'}) @@ -252,7 +252,7 @@ f2 = self.execute('Any F WHERE F eid %(f)s, F is File', {'f': f1.eid}).get_entity(0, 0) new_path = self.fspath(f2) new_data = f2.data.getvalue() - self.failUnless(osp.isfile(new_path)) + self.assertTrue(osp.isfile(new_path)) self.assertEqual(osp.splitext(new_path)[1], '.txt') self.assertEqual(old_path, new_path) self.assertEqual(old_data, new_data) @@ -279,7 +279,7 @@ self.commit() self.assertEqual(f1.data.getvalue(), 'the new data') self.assertEqual(self.fspath(f1), new_fspath) - self.failIf(osp.isfile(old_fspath)) + self.assertFalse(osp.isfile(old_fspath)) @tag('fsimport') def test_clean(self): diff -r aa30c665bd06 -r f2a5805615f8 server/test/unittest_undo.py --- a/server/test/unittest_undo.py Fri Sep 16 16:42:42 2011 +0200 +++ b/server/test/unittest_undo.py Fri Sep 16 17:30:26 2011 +0200 @@ -43,13 +43,13 @@ # also check transaction actions have been properly deleted cu = self.session.system_sql( "SELECT * from tx_entity_actions WHERE tx_uuid='%s'" % txuuid) - self.failIf(cu.fetchall()) + self.assertFalse(cu.fetchall()) cu = self.session.system_sql( "SELECT * from tx_relation_actions WHERE tx_uuid='%s'" % txuuid) - self.failIf(cu.fetchall()) + self.assertFalse(cu.fetchall()) def test_undo_api(self): - self.failUnless(self.txuuid) + self.assertTrue(self.txuuid) # test transaction api self.assertRaises(NoSuchTransaction, self.cnx.transaction_info, 'hop') @@ -58,7 +58,7 @@ self.assertRaises(NoSuchTransaction, self.cnx.undo_transaction, 'hop') txinfo = self.cnx.transaction_info(self.txuuid) - self.failUnless(txinfo.datetime) + self.assertTrue(txinfo.datetime) self.assertEqual(txinfo.user_eid, self.session.user.eid) self.assertEqual(txinfo.user().login, 'admin') actions = txinfo.actions_list() @@ -159,9 +159,9 @@ undotxuuid = self.commit() self.assertEqual(undotxuuid, None) # undo not undoable self.assertEqual(errors, []) - self.failUnless(self.execute('Any X WHERE X eid %(x)s', {'x': toto.eid})) - self.failUnless(self.execute('Any X WHERE X eid %(x)s', {'x': e.eid})) - self.failUnless(self.execute('Any X WHERE X has_text "toto@logilab"')) + self.assertTrue(self.execute('Any X WHERE X eid %(x)s', {'x': toto.eid})) + self.assertTrue(self.execute('Any X WHERE X eid %(x)s', {'x': e.eid})) + self.assertTrue(self.execute('Any X WHERE X has_text "toto@logilab"')) self.assertEqual(toto.cw_adapt_to('IWorkflowable').state, 'activated') self.assertEqual(toto.cw_adapt_to('IEmailable').get_email(), 'toto@logilab.org') self.assertEqual([(p.pkey, p.value) for p in toto.reverse_for_user], @@ -231,20 +231,20 @@ txuuid = self.commit() errors = self.cnx.undo_transaction(txuuid) self.commit() - self.failIf(errors) - self.failIf(self.execute('Any X WHERE X eid %(x)s', {'x': c.eid})) - self.failIf(self.execute('Any X WHERE X eid %(x)s', {'x': p.eid})) - self.failIf(self.execute('Any X,Y WHERE X fiche Y')) + self.assertFalse(errors) + self.assertFalse(self.execute('Any X WHERE X eid %(x)s', {'x': c.eid})) + self.assertFalse(self.execute('Any X WHERE X eid %(x)s', {'x': p.eid})) + self.assertFalse(self.execute('Any X,Y WHERE X fiche Y')) self.session.set_cnxset() for eid in (p.eid, c.eid): - self.failIf(session.system_sql( + self.assertFalse(session.system_sql( 'SELECT * FROM entities WHERE eid=%s' % eid).fetchall()) - self.failIf(session.system_sql( + self.assertFalse(session.system_sql( 'SELECT 1 FROM owned_by_relation WHERE eid_from=%s' % eid).fetchall()) # added by sql in hooks (except when using dataimport) - self.failIf(session.system_sql( + self.assertFalse(session.system_sql( 'SELECT 1 FROM is_relation WHERE eid_from=%s' % eid).fetchall()) - self.failIf(session.system_sql( + self.assertFalse(session.system_sql( 'SELECT 1 FROM is_instance_of_relation WHERE eid_from=%s' % eid).fetchall()) self.check_transaction_deleted(txuuid) diff -r aa30c665bd06 -r f2a5805615f8 sobjects/test/unittest_email.py --- a/sobjects/test/unittest_email.py Fri Sep 16 16:42:42 2011 +0200 +++ b/sobjects/test/unittest_email.py Fri Sep 16 17:30:26 2011 +0200 @@ -51,7 +51,7 @@ self.execute('SET U primary_email E WHERE U login "anon", E address "client@client.com"') self.commit() rset = self.execute('Any X WHERE X use_email E, E eid %(e)s', {'e': email1}) - self.failIf(rset.rowcount != 1, rset) + self.assertFalse(rset.rowcount != 1, rset) def test_security_check(self): req = self.request() diff -r aa30c665bd06 -r f2a5805615f8 sobjects/test/unittest_notification.py --- a/sobjects/test/unittest_notification.py Fri Sep 16 16:42:42 2011 +0200 +++ b/sobjects/test/unittest_notification.py Fri Sep 16 17:30:26 2011 +0200 @@ -30,29 +30,29 @@ def test_base(self): msgid1 = construct_message_id('testapp', 21) msgid2 = construct_message_id('testapp', 21) - self.failIfEqual(msgid1, msgid2) - self.failIf('&' in msgid1) - self.failIf('=' in msgid1) - self.failIf('/' in msgid1) - self.failIf('+' in msgid1) + self.assertNotEqual(msgid1, msgid2) + self.assertFalse('&' in msgid1) + self.assertFalse('=' in msgid1) + self.assertFalse('/' in msgid1) + self.assertFalse('+' in msgid1) values = parse_message_id(msgid1, 'testapp') - self.failUnless(values) + self.assertTrue(values) # parse_message_id should work with or without surrounding <> - self.failUnlessEqual(values, parse_message_id(msgid1[1:-1], 'testapp')) - self.failUnlessEqual(values['eid'], '21') - self.failUnless('timestamp' in values) - self.failUnlessEqual(parse_message_id(msgid1[1:-1], 'anotherapp'), None) + self.assertEqual(values, parse_message_id(msgid1[1:-1], 'testapp')) + self.assertEqual(values['eid'], '21') + self.assertTrue('timestamp' in values) + self.assertEqual(parse_message_id(msgid1[1:-1], 'anotherapp'), None) def test_notimestamp(self): msgid1 = construct_message_id('testapp', 21, False) msgid2 = construct_message_id('testapp', 21, False) values = parse_message_id(msgid1, 'testapp') - self.failUnlessEqual(values, {'eid': '21'}) + self.assertEqual(values, {'eid': '21'}) def test_parse_message_doesnt_raise(self): - self.failUnlessEqual(parse_message_id('oijioj@bla.bla', 'tesapp'), None) - self.failUnlessEqual(parse_message_id('oijioj@bla', 'tesapp'), None) - self.failUnlessEqual(parse_message_id('oijioj', 'tesapp'), None) + self.assertEqual(parse_message_id('oijioj@bla.bla', 'tesapp'), None) + self.assertEqual(parse_message_id('oijioj@bla', 'tesapp'), None) + self.assertEqual(parse_message_id('oijioj', 'tesapp'), None) def test_nonregr_empty_message_id(self): @@ -86,7 +86,7 @@ req = self.request() u = self.create_user(req, 'toto') u.cw_adapt_to('IWorkflowable').fire_transition('deactivate', comment=u'yeah') - self.failIf(MAILBOX) + self.assertFalse(MAILBOX) self.commit() self.assertEqual(len(MAILBOX), 1) email = MAILBOX[0] diff -r aa30c665bd06 -r f2a5805615f8 test/data/bootstrap_cubes --- a/test/data/bootstrap_cubes Fri Sep 16 16:42:42 2011 +0200 +++ b/test/data/bootstrap_cubes Fri Sep 16 17:30:26 2011 +0200 @@ -1,1 +1,1 @@ -card, file, tag +card, file, tag, localperms diff -r aa30c665bd06 -r f2a5805615f8 test/data/rewrite/bootstrap_cubes --- a/test/data/rewrite/bootstrap_cubes Fri Sep 16 16:42:42 2011 +0200 +++ b/test/data/rewrite/bootstrap_cubes Fri Sep 16 17:30:26 2011 +0200 @@ -1,1 +1,1 @@ -card +card,localperms diff -r aa30c665bd06 -r f2a5805615f8 test/unittest_cwconfig.py --- a/test/unittest_cwconfig.py Fri Sep 16 16:42:42 2011 +0200 +++ b/test/unittest_cwconfig.py Fri Sep 16 17:30:26 2011 +0200 @@ -123,7 +123,7 @@ self.assertEqual(self.config.cubes_search_path(), [CUSTOM_CUBES_DIR, self.config.CUBES_DIR]) - self.failUnless('mycube' in self.config.available_cubes()) + self.assertTrue('mycube' in self.config.available_cubes()) # test cubes python path self.config.adjust_sys_path() import cubes diff -r aa30c665bd06 -r f2a5805615f8 test/unittest_entity.py --- a/test/unittest_entity.py Fri Sep 16 16:42:42 2011 +0200 +++ b/test/unittest_entity.py Fri Sep 16 17:30:26 2011 +0200 @@ -42,7 +42,7 @@ def test_boolean_value(self): e = self.vreg['etypes'].etype_class('CWUser')(self.request()) - self.failUnless(e) + self.assertTrue(e) def test_yams_inheritance(self): from entities import Note @@ -87,8 +87,8 @@ {'t': oe.eid, 'u': p.eid}) e = req.create_entity('Note', type=u'z') e.copy_relations(oe.eid) - self.failIf(e.ecrit_par) - self.failUnless(oe.ecrit_par) + self.assertFalse(e.ecrit_par) + self.assertTrue(oe.ecrit_par) def test_copy_with_composite(self): user = self.user() @@ -100,8 +100,8 @@ 'WHERE G name "users"')[0][0] e = self.execute('Any X WHERE X eid %(x)s', {'x': usereid}).get_entity(0, 0) e.copy_relations(user.eid) - self.failIf(e.use_email) - self.failIf(e.primary_email) + self.assertFalse(e.use_email) + self.assertFalse(e.primary_email) def test_copy_with_non_initial_state(self): user = self.user() @@ -128,7 +128,7 @@ groups = user.in_group self.assertEqual(sorted(user._cw_related_cache), ['in_group_subject', 'primary_email_subject']) for group in groups: - self.failIf('in_group_subject' in group._cw_related_cache, group._cw_related_cache.keys()) + self.assertFalse('in_group_subject' in group._cw_related_cache, group._cw_related_cache.keys()) def test_related_limit(self): req = self.request() @@ -179,7 +179,7 @@ try: # testing basic fetch_attrs attribute self.assertEqual(Personne.fetch_rql(user), - 'Any X,AA,AB,AC ORDERBY AA ASC ' + 'Any X,AA,AB,AC ORDERBY AA ' 'WHERE X is Personne, X nom AA, X prenom AB, X modification_date AC') # testing unknown attributes Personne.fetch_attrs = ('bloug', 'beep') @@ -187,36 +187,36 @@ # testing one non final relation Personne.fetch_attrs = ('nom', 'prenom', 'travaille') self.assertEqual(Personne.fetch_rql(user), - 'Any X,AA,AB,AC,AD ORDERBY AA ASC ' + 'Any X,AA,AB,AC,AD ORDERBY AA ' 'WHERE X is Personne, X nom AA, X prenom AB, X travaille AC?, AC nom AD') # testing two non final relations Personne.fetch_attrs = ('nom', 'prenom', 'travaille', 'evaluee') self.assertEqual(Personne.fetch_rql(user), - 'Any X,AA,AB,AC,AD,AE ORDERBY AA ASC ' + 'Any X,AA,AB,AC,AD,AE ORDERBY AA ' 'WHERE X is Personne, X nom AA, X prenom AB, X travaille AC?, AC nom AD, ' 'X evaluee AE?') # testing one non final relation with recursion Personne.fetch_attrs = ('nom', 'prenom', 'travaille') Societe.fetch_attrs = ('nom', 'evaluee') self.assertEqual(Personne.fetch_rql(user), - 'Any X,AA,AB,AC,AD,AE,AF ORDERBY AA ASC,AF DESC ' + 'Any X,AA,AB,AC,AD,AE,AF ORDERBY AA,AF DESC ' 'WHERE X is Personne, X nom AA, X prenom AB, X travaille AC?, AC nom AD, ' 'AC evaluee AE?, AE modification_date AF' ) # testing symmetric relation Personne.fetch_attrs = ('nom', 'connait') - self.assertEqual(Personne.fetch_rql(user), 'Any X,AA,AB ORDERBY AA ASC ' + self.assertEqual(Personne.fetch_rql(user), 'Any X,AA,AB ORDERBY AA ' 'WHERE X is Personne, X nom AA, X connait AB?') # testing optional relation peschema.subjrels['travaille'].rdef(peschema, seschema).cardinality = '?*' Personne.fetch_attrs = ('nom', 'prenom', 'travaille') Societe.fetch_attrs = ('nom',) self.assertEqual(Personne.fetch_rql(user), - 'Any X,AA,AB,AC,AD ORDERBY AA ASC WHERE X is Personne, X nom AA, X prenom AB, X travaille AC?, AC nom AD') + 'Any X,AA,AB,AC,AD ORDERBY AA WHERE X is Personne, X nom AA, X prenom AB, X travaille AC?, AC nom AD') # testing relation with cardinality > 1 peschema.subjrels['travaille'].rdef(peschema, seschema).cardinality = '**' self.assertEqual(Personne.fetch_rql(user), - 'Any X,AA,AB ORDERBY AA ASC WHERE X is Personne, X nom AA, X prenom AB') + 'Any X,AA,AB ORDERBY AA WHERE X is Personne, X nom AA, X prenom AB') # XXX test unauthorized attribute finally: # fetch_attrs restored by generic tearDown @@ -227,14 +227,20 @@ Personne = self.vreg['etypes'].etype_class('Personne') Note = self.vreg['etypes'].etype_class('Note') SubNote = self.vreg['etypes'].etype_class('SubNote') - self.failUnless(issubclass(self.vreg['etypes'].etype_class('SubNote'), Note)) + self.assertTrue(issubclass(self.vreg['etypes'].etype_class('SubNote'), Note)) Personne.fetch_attrs, Personne.fetch_order = fetch_config(('nom', 'type')) Note.fetch_attrs, Note.fetch_order = fetch_config(('type',)) SubNote.fetch_attrs, SubNote.fetch_order = fetch_config(('type',)) p = self.request().create_entity('Personne', nom=u'pouet') self.assertEqual(p.cw_related_rql('evaluee'), - 'Any X,AA,AB ORDERBY AA ASC WHERE E eid %(x)s, E evaluee X, ' - 'X type AA, X modification_date AB') + 'Any X,AA,AB ORDERBY AA WHERE E eid %(x)s, E evaluee X, ' + 'X type AA, X modification_date AB') + n = self.request().create_entity('Note') + self.assertEqual(n.cw_related_rql('evaluee', role='object', + targettypes=('Societe', 'Personne')), + "Any X,AA ORDERBY AB DESC WHERE E eid %(x)s, X evaluee E, " + "X is IN('Personne', 'Societe'), X nom AA, " + "X modification_date AB") Personne.fetch_attrs, Personne.fetch_order = fetch_config(('nom', )) # XXX self.assertEqual(p.cw_related_rql('evaluee'), @@ -246,8 +252,8 @@ 'Any X,AA ORDERBY AA DESC ' 'WHERE E eid %(x)s, E tags X, X modification_date AA') self.assertEqual(tag.cw_related_rql('tags', 'subject', ('Personne',)), - 'Any X,AA,AB ORDERBY AA ASC ' - 'WHERE E eid %(x)s, E tags X, X is IN (Personne), X nom AA, ' + 'Any X,AA,AB ORDERBY AA ' + 'WHERE E eid %(x)s, E tags X, X is Personne, X nom AA, ' 'X modification_date AB') def test_related_rql_ambiguous_cant_use_fetch_order(self): @@ -258,7 +264,7 @@ 'Any X,AA ORDERBY AA DESC ' 'WHERE E eid %(x)s, E tags X, X modification_date AA') - def test_related_rql_cant_fetch_ambiguous_rtype(self): + def test_related_rql_fetch_ambiguous_rtype(self): soc_etype = self.vreg['etypes'].etype_class('Societe') soc = soc_etype(self.request()) soc_etype.fetch_attrs = ('fournit',) @@ -266,15 +272,14 @@ self.vreg['etypes'].etype_class('Produit').fetch_attrs = ('fabrique_par',) self.vreg['etypes'].etype_class('Usine').fetch_attrs = ('lieu',) self.vreg['etypes'].etype_class('Personne').fetch_attrs = ('nom',) - # XXX should be improved: we could fetch fabrique_par object too self.assertEqual(soc.cw_related_rql('fournit', 'subject'), - 'Any X WHERE E eid %(x)s, E fournit X') + 'Any X,A WHERE E eid %(x)s, E fournit X, X fabrique_par A') def test_unrelated_rql_security_1_manager(self): user = self.request().user rql = user.cw_unrelated_rql('use_email', 'EmailAddress', 'subject')[0] self.assertEqual(rql, 'Any O,AA,AB,AC ORDERBY AC DESC ' - 'WHERE NOT EXISTS(ZZ use_email O), S eid %(x)s, ' + 'WHERE NOT ZZ use_email O, S eid %(x)s, ' 'O is EmailAddress, O address AA, O alias AB, O modification_date AC') def test_unrelated_rql_security_1_user(self): @@ -284,37 +289,37 @@ user = req.user rql = user.cw_unrelated_rql('use_email', 'EmailAddress', 'subject')[0] self.assertEqual(rql, 'Any O,AA,AB,AC ORDERBY AC DESC ' - 'WHERE NOT EXISTS(ZZ use_email O), S eid %(x)s, ' + 'WHERE NOT ZZ use_email O, S eid %(x)s, ' 'O is EmailAddress, O address AA, O alias AB, O modification_date AC') user = self.execute('Any X WHERE X login "admin"').get_entity(0, 0) rql = user.cw_unrelated_rql('use_email', 'EmailAddress', 'subject')[0] self.assertEqual(rql, 'Any O,AA,AB,AC ORDERBY AC DESC ' - 'WHERE NOT EXISTS(ZZ use_email O, ZZ is CWUser), S eid %(x)s, ' - 'O is EmailAddress, O address AA, O alias AB, O modification_date AC, A eid %(B)s, ' - 'EXISTS(S identity A, NOT A in_group C, C name "guests", C is CWGroup)') + 'WHERE NOT ZZ use_email O, S eid %(x)s, ' + 'O is EmailAddress, O address AA, O alias AB, O modification_date AC, AD eid %(AE)s, ' + 'EXISTS(S identity AD, NOT AD in_group AF, AF name "guests", AF is CWGroup), ZZ is CWUser') def test_unrelated_rql_security_1_anon(self): self.login('anon') user = self.request().user rql = user.cw_unrelated_rql('use_email', 'EmailAddress', 'subject')[0] self.assertEqual(rql, 'Any O,AA,AB,AC ORDERBY AC DESC ' - 'WHERE NOT EXISTS(ZZ use_email O, ZZ is CWUser), S eid %(x)s, ' - 'O is EmailAddress, O address AA, O alias AB, O modification_date AC, A eid %(B)s, ' - 'EXISTS(S identity A, NOT A in_group C, C name "guests", C is CWGroup)') + 'WHERE NOT ZZ use_email O, S eid %(x)s, ' + 'O is EmailAddress, O address AA, O alias AB, O modification_date AC, AD eid %(AE)s, ' + 'EXISTS(S identity AD, NOT AD in_group AF, AF name "guests", AF is CWGroup), ZZ is CWUser') def test_unrelated_rql_security_2(self): email = self.execute('INSERT EmailAddress X: X address "hop"').get_entity(0, 0) rql = email.cw_unrelated_rql('use_email', 'CWUser', 'object')[0] self.assertEqual(rql, 'Any S,AA,AB,AC,AD ORDERBY AA ' - 'WHERE NOT EXISTS(S use_email O), O eid %(x)s, S is CWUser, ' + 'WHERE NOT S use_email O, O eid %(x)s, S is CWUser, ' 'S login AA, S firstname AB, S surname AC, S modification_date AD') self.login('anon') email = self.execute('Any X WHERE X eid %(x)s', {'x': email.eid}).get_entity(0, 0) rql = email.cw_unrelated_rql('use_email', 'CWUser', 'object')[0] self.assertEqual(rql, 'Any S,AA,AB,AC,AD ORDERBY AA ' - 'WHERE NOT EXISTS(S use_email O), O eid %(x)s, S is CWUser, ' + 'WHERE NOT S use_email O, O eid %(x)s, S is CWUser, ' 'S login AA, S firstname AB, S surname AC, S modification_date AD, ' - 'A eid %(B)s, EXISTS(S identity A, NOT A in_group C, C name "guests", C is CWGroup)') + 'AE eid %(AF)s, EXISTS(S identity AE, NOT AE in_group AG, AG name "guests", AG is CWGroup)') def test_unrelated_rql_security_nonexistant(self): self.login('anon') @@ -323,7 +328,7 @@ self.assertEqual(rql, 'Any S,AA,AB,AC,AD ORDERBY AA ' 'WHERE S is CWUser, ' 'S login AA, S firstname AB, S surname AC, S modification_date AD, ' - 'A eid %(B)s, EXISTS(S identity A, NOT A in_group C, C name "guests", C is CWGroup)') + 'AE eid %(AF)s, EXISTS(S identity AE, NOT AE in_group AG, AG name "guests", AG is CWGroup)') def test_unrelated_rql_constraints_creation_subject(self): person = self.vreg['etypes'].etype_class('Personne')(self.request()) @@ -338,14 +343,15 @@ self.assertEqual( rql, 'Any S,AA,AB,AC ORDERBY AC DESC WHERE ' 'S is Personne, S nom AA, S prenom AB, S modification_date AC, ' - 'NOT (S connait A, A nom "toto"), A is Personne, EXISTS(S travaille B, B nom "tutu")') + 'NOT (S connait AD, AD nom "toto"), AD is Personne, ' + 'EXISTS(S travaille AE, AE nom "tutu")') def test_unrelated_rql_constraints_edition_subject(self): person = self.request().create_entity('Personne', nom=u'sylvain') rql = person.cw_unrelated_rql('connait', 'Personne', 'subject')[0] self.assertEqual( rql, 'Any O,AA,AB,AC ORDERBY AC DESC WHERE ' - 'NOT EXISTS(S connait O), S eid %(x)s, O is Personne, ' + 'NOT S connait O, S eid %(x)s, O is Personne, ' 'O nom AA, O prenom AB, O modification_date AC, ' 'NOT S identity O') @@ -354,23 +360,23 @@ rql = person.cw_unrelated_rql('connait', 'Personne', 'object')[0] self.assertEqual( rql, 'Any S,AA,AB,AC ORDERBY AC DESC WHERE ' - 'NOT EXISTS(S connait O), O eid %(x)s, S is Personne, ' + 'NOT S connait O, O eid %(x)s, S is Personne, ' 'S nom AA, S prenom AB, S modification_date AC, ' - 'NOT S identity O, NOT (S connait A, A nom "toto"), ' - 'EXISTS(S travaille B, B nom "tutu")') + 'NOT S identity O, NOT (S connait AD, AD nom "toto"), ' + 'EXISTS(S travaille AE, AE nom "tutu")') def test_unrelated_base(self): req = self.request() p = req.create_entity('Personne', nom=u'di mascio', prenom=u'adrien') e = req.create_entity('Tag', name=u'x') related = [r.eid for r in e.tags] - self.failUnlessEqual(related, []) + self.assertEqual(related, []) unrelated = [r[0] for r in e.unrelated('tags', 'Personne', 'subject')] - self.failUnless(p.eid in unrelated) + self.assertTrue(p.eid in unrelated) self.execute('SET X tags Y WHERE X is Tag, Y is Personne') e = self.execute('Any X WHERE X is Tag').get_entity(0, 0) unrelated = [r[0] for r in e.unrelated('tags', 'Personne', 'subject')] - self.failIf(p.eid in unrelated) + self.assertFalse(p.eid in unrelated) def test_unrelated_limit(self): req = self.request() @@ -538,7 +544,7 @@ p2 = req.create_entity('Personne', nom=u'toto') self.execute('SET X evaluee Y WHERE X nom "di mascio", Y nom "toto"') self.assertEqual(p1.evaluee[0].nom, "toto") - self.failUnless(not p1.reverse_evaluee) + self.assertTrue(not p1.reverse_evaluee) def test_complete_relation(self): session = self.session @@ -547,10 +553,10 @@ 'WHERE U login "admin", S1 name "activated", S2 name "deactivated"')[0][0] trinfo = self.execute('Any X WHERE X eid %(x)s', {'x': eid}).get_entity(0, 0) trinfo.complete() - self.failUnless(isinstance(trinfo.cw_attr_cache['creation_date'], datetime)) - self.failUnless(trinfo.cw_relation_cached('from_state', 'subject')) - self.failUnless(trinfo.cw_relation_cached('to_state', 'subject')) - self.failUnless(trinfo.cw_relation_cached('wf_info_for', 'subject')) + self.assertTrue(isinstance(trinfo.cw_attr_cache['creation_date'], datetime)) + self.assertTrue(trinfo.cw_relation_cached('from_state', 'subject')) + self.assertTrue(trinfo.cw_relation_cached('to_state', 'subject')) + self.assertTrue(trinfo.cw_relation_cached('wf_info_for', 'subject')) self.assertEqual(trinfo.by_transition, ()) def test_request_cache(self): @@ -558,7 +564,7 @@ user = self.execute('CWUser X WHERE X login "admin"', req=req).get_entity(0, 0) state = user.in_state[0] samestate = self.execute('State X WHERE X name "activated"', req=req).get_entity(0, 0) - self.failUnless(state is samestate) + self.assertTrue(state is samestate) def test_rest_path(self): req = self.request() diff -r aa30c665bd06 -r f2a5805615f8 test/unittest_rqlrewrite.py --- a/test/unittest_rqlrewrite.py Fri Sep 16 16:42:42 2011 +0200 +++ b/test/unittest_rqlrewrite.py Fri Sep 16 17:30:26 2011 +0200 @@ -110,7 +110,7 @@ 'P name "read", P require_group G') rqlst = parse('Card C') rewrite(rqlst, {('C', 'X'): (constraint,)}, {}) - self.failUnlessEqual(rqlst.as_string(), + self.assertEqual(rqlst.as_string(), u"Any C WHERE C is Card, B eid %(D)s, " "EXISTS(C in_state A, B in_group E, F require_state A, " "F name 'read', F require_group E, A is State, E is CWGroup, F is CWPermission)") @@ -129,13 +129,13 @@ "F name 'read', F require_group E, A is State, E is CWGroup, F is CWPermission), " "(EXISTS(S ref LIKE 'PUBLIC%')) OR (EXISTS(B in_group G, G name 'public', G is CWGroup)), " "S is Affaire") - self.failUnless('D' in kwargs) + self.assertTrue('D' in kwargs) def test_or(self): constraint = '(X identity U) OR (X in_state ST, CL identity U, CL in_state ST, ST name "subscribed")' rqlst = parse('Any S WHERE S owned_by C, C eid %(u)s, S is in (CWUser, CWGroup)') rewrite(rqlst, {('C', 'X'): (constraint,)}, {'u':1}) - self.failUnlessEqual(rqlst.as_string(), + self.assertEqual(rqlst.as_string(), "Any S WHERE S owned_by C, C eid %(u)s, S is IN(CWUser, CWGroup), A eid %(B)s, " "EXISTS((C identity A) OR (C in_state D, E identity A, " "E in_state D, D name 'subscribed'), D is State, E is CWUser)") @@ -145,7 +145,7 @@ 'P name "read", P require_group G') rqlst = parse('Any 2') # this is the simplified rql st for Any X WHERE X eid 12 rewrite(rqlst, {('2', 'X'): (constraint,)}, {}) - self.failUnlessEqual(rqlst.as_string(), + self.assertEqual(rqlst.as_string(), u"Any 2 WHERE B eid %(C)s, " "EXISTS(2 in_state A, B in_group D, E require_state A, " "E name 'read', E require_group D, A is State, D is CWGroup, E is CWPermission)") @@ -155,7 +155,7 @@ 'P name "read", P require_group G') rqlst = parse('Any A,C WHERE A documented_by C?') rewrite(rqlst, {('C', 'X'): (constraint,)}, {}) - self.failUnlessEqual(rqlst.as_string(), + self.assertEqual(rqlst.as_string(), "Any A,C WHERE A documented_by C?, A is Affaire " "WITH C BEING " "(Any C WHERE EXISTS(C in_state B, D in_group F, G require_state B, G name 'read', " @@ -166,7 +166,7 @@ 'P name "read", P require_group G') rqlst = parse('Any A,C,T WHERE A documented_by C?, C title T') rewrite(rqlst, {('C', 'X'): (constraint,)}, {}) - self.failUnlessEqual(rqlst.as_string(), + self.assertEqual(rqlst.as_string(), "Any A,C,T WHERE A documented_by C?, A is Affaire " "WITH C,T BEING " "(Any C,T WHERE C title T, EXISTS(C in_state B, D in_group F, " @@ -179,7 +179,7 @@ constraint2 = 'X in_state S, S name "public"' rqlst = parse('Any A,C,T WHERE A documented_by C?, C title T') rewrite(rqlst, {('C', 'X'): (constraint1, constraint2)}, {}) - self.failUnlessEqual(rqlst.as_string(), + self.assertEqual(rqlst.as_string(), "Any A,C,T WHERE A documented_by C?, A is Affaire " "WITH C,T BEING (Any C,T WHERE C title T, " "EXISTS(C in_state B, D in_group F, G require_state B, G name 'read', G require_group F), " @@ -194,7 +194,7 @@ rewrite(rqlst, {('LA', 'X'): (constraint1, constraint2), ('X', 'X'): (constraint3,), ('Y', 'X'): (constraint3,)}, {}) - self.failUnlessEqual(rqlst.as_string(), + self.assertEqual(rqlst.as_string(), u'Any X,LA,Y WHERE LA? documented_by X, LA concerne Y, B eid %(C)s, ' 'EXISTS(X created_by B), EXISTS(Y created_by B), ' 'X is Card, Y is IN(Division, Note, Societe) ' @@ -209,7 +209,7 @@ ('A', 'X'): (c2,), }, {}) # XXX suboptimal - self.failUnlessEqual(rqlst.as_string(), + self.assertEqual(rqlst.as_string(), "Any C,A,R WITH A,R,C BEING " "(Any A,R,C WHERE A ref R, A? inlined_card C, " "(A is NULL) OR (EXISTS(A inlined_card B, B require_permission D, " @@ -223,7 +223,7 @@ # rewrite(rqlst, {('C', 'X'): (c1,), # ('A', 'X'): (c2,), # }, {}) - # self.failUnlessEqual(rqlst.as_string(), + # self.assertEqual(rqlst.as_string(), # "") def test_optional_var_inlined_imbricated_error(self): @@ -243,14 +243,14 @@ snippet = ('X in_state S, S name "hop"') rqlst = parse('Card C WHERE C in_state STATE') rewrite(rqlst, {('C', 'X'): (snippet,)}, {}) - self.failUnlessEqual(rqlst.as_string(), + self.assertEqual(rqlst.as_string(), "Any C WHERE C in_state STATE, C is Card, " "EXISTS(STATE name 'hop'), STATE is State") def test_relation_optimization_1_rhs(self): snippet = ('TW subworkflow_exit X, TW name "hop"') rqlst = parse('WorkflowTransition C WHERE C subworkflow_exit EXIT') rewrite(rqlst, {('EXIT', 'X'): (snippet,)}, {}) - self.failUnlessEqual(rqlst.as_string(), + self.assertEqual(rqlst.as_string(), "Any C WHERE C subworkflow_exit EXIT, C is WorkflowTransition, " "EXISTS(C name 'hop'), EXIT is SubWorkflowExitPoint") @@ -259,14 +259,14 @@ snippet = ('X in_state S?, S name "hop"') rqlst = parse('Card C WHERE C in_state STATE?') rewrite(rqlst, {('C', 'X'): (snippet,)}, {}) - self.failUnlessEqual(rqlst.as_string(), + self.assertEqual(rqlst.as_string(), "Any C WHERE C in_state STATE?, C is Card, " "EXISTS(STATE name 'hop'), STATE is State") def test_relation_optimization_2_rhs(self): snippet = ('TW? subworkflow_exit X, TW name "hop"') rqlst = parse('SubWorkflowExitPoint EXIT WHERE C? subworkflow_exit EXIT') rewrite(rqlst, {('EXIT', 'X'): (snippet,)}, {}) - self.failUnlessEqual(rqlst.as_string(), + self.assertEqual(rqlst.as_string(), "Any EXIT WHERE C? subworkflow_exit EXIT, EXIT is SubWorkflowExitPoint, " "EXISTS(C name 'hop'), C is WorkflowTransition") @@ -275,14 +275,14 @@ snippet = ('X in_state S?, S name "hop"') rqlst = parse('Card C WHERE C in_state STATE') rewrite(rqlst, {('C', 'X'): (snippet,)}, {}) - self.failUnlessEqual(rqlst.as_string(), + self.assertEqual(rqlst.as_string(), "Any C WHERE C in_state STATE, C is Card, " "EXISTS(STATE name 'hop'), STATE is State") def test_relation_optimization_3_rhs(self): snippet = ('TW? subworkflow_exit X, TW name "hop"') rqlst = parse('WorkflowTransition C WHERE C subworkflow_exit EXIT') rewrite(rqlst, {('EXIT', 'X'): (snippet,)}, {}) - self.failUnlessEqual(rqlst.as_string(), + self.assertEqual(rqlst.as_string(), "Any C WHERE C subworkflow_exit EXIT, C is WorkflowTransition, " "EXISTS(C name 'hop'), EXIT is SubWorkflowExitPoint") @@ -291,14 +291,14 @@ snippet = ('X in_state S, S name "hop"') rqlst = parse('Card C WHERE C in_state STATE?') rewrite(rqlst, {('C', 'X'): (snippet,)}, {}) - self.failUnlessEqual(rqlst.as_string(), + self.assertEqual(rqlst.as_string(), "Any C WHERE C in_state STATE?, C is Card, " "EXISTS(C in_state A, A name 'hop', A is State), STATE is State") def test_relation_non_optimization_1_rhs(self): snippet = ('TW subworkflow_exit X, TW name "hop"') rqlst = parse('SubWorkflowExitPoint EXIT WHERE C? subworkflow_exit EXIT') rewrite(rqlst, {('EXIT', 'X'): (snippet,)}, {}) - self.failUnlessEqual(rqlst.as_string(), + self.assertEqual(rqlst.as_string(), "Any EXIT WHERE C? subworkflow_exit EXIT, EXIT is SubWorkflowExitPoint, " "EXISTS(A subworkflow_exit EXIT, A name 'hop', A is WorkflowTransition), " "C is WorkflowTransition") @@ -313,7 +313,7 @@ trinfo_constraint = ('X wf_info_for Y, Y require_permission P, P name "read"') rqlst = parse('Any U,T WHERE U is CWUser, T wf_info_for U') rewrite(rqlst, {('T', 'X'): (trinfo_constraint, 'X wf_info_for Y, Y in_group G, G name "managers"')}, {}) - self.failUnlessEqual(rqlst.as_string(), + self.assertEqual(rqlst.as_string(), u"Any U,T WHERE U is CWUser, T wf_info_for U, " "EXISTS(U in_group B, B name 'managers', B is CWGroup), T is TrInfo") @@ -322,14 +322,14 @@ trinfo_constraint = ('X wf_info_for Y, Y require_permission P, P name "read"') rqlst = parse('Any T WHERE T wf_info_for X') rewrite(rqlst, {('T', 'X'): (trinfo_constraint, 'X in_group G, G name "managers"')}, {}) - self.failUnlessEqual(rqlst.as_string(), + self.assertEqual(rqlst.as_string(), u'XXX dunno what should be generated') def test_add_ambiguity_exists(self): constraint = ('X concerne Y') rqlst = parse('Affaire X') rewrite(rqlst, {('X', 'X'): (constraint,)}, {}) - self.failUnlessEqual(rqlst.as_string(), + self.assertEqual(rqlst.as_string(), u"Any X WHERE X is Affaire, ((EXISTS(X concerne A, A is Division)) OR (EXISTS(X concerne C, C is Societe))) OR (EXISTS(X concerne B, B is Note))") def test_add_ambiguity_outerjoin(self): @@ -337,7 +337,7 @@ rqlst = parse('Any X,C WHERE X? documented_by C') rewrite(rqlst, {('X', 'X'): (constraint,)}, {}) # ambiguity are kept in the sub-query, no need to be resolved using OR - self.failUnlessEqual(rqlst.as_string(), + self.assertEqual(rqlst.as_string(), u"Any X,C WHERE X? documented_by C, C is Card WITH X BEING (Any X WHERE EXISTS(X concerne A), X is Affaire)") @@ -345,76 +345,76 @@ constraint = RRQLExpression('S owned_by U') rqlst = parse('Card C') rewrite(rqlst, {('C', 'S'): (constraint,)}, {}, 'SU') - self.failUnlessEqual(rqlst.as_string(), + self.assertEqual(rqlst.as_string(), u"Any C WHERE C is Card, A eid %(B)s, EXISTS(C owned_by A)") rqlst = parse('Card C') rewrite(rqlst, {('C', 'S'): (constraint,)}, {}, 'OU') - self.failUnlessEqual(rqlst.as_string(), + self.assertEqual(rqlst.as_string(), u"Any C WHERE C is Card") rqlst = parse('Card C') rewrite(rqlst, {('C', 'S'): (constraint,)}, {}, 'SOU') - self.failUnlessEqual(rqlst.as_string(), + self.assertEqual(rqlst.as_string(), u"Any C WHERE C is Card, A eid %(B)s, EXISTS(C owned_by A)") def test_rrqlexpr_nonexistant_subject_2(self): constraint = RRQLExpression('S owned_by U, O owned_by U, O is Card') rqlst = parse('Card C') rewrite(rqlst, {('C', 'S'): (constraint,)}, {}, 'SU') - self.failUnlessEqual(rqlst.as_string(), + self.assertEqual(rqlst.as_string(), 'Any C WHERE C is Card, A eid %(B)s, EXISTS(C owned_by A)') rqlst = parse('Card C') rewrite(rqlst, {('C', 'S'): (constraint,)}, {}, 'OU') - self.failUnlessEqual(rqlst.as_string(), + self.assertEqual(rqlst.as_string(), 'Any C WHERE C is Card, B eid %(D)s, EXISTS(A owned_by B, A is Card)') rqlst = parse('Card C') rewrite(rqlst, {('C', 'S'): (constraint,)}, {}, 'SOU') - self.failUnlessEqual(rqlst.as_string(), + self.assertEqual(rqlst.as_string(), 'Any C WHERE C is Card, A eid %(B)s, EXISTS(C owned_by A, D owned_by A, D is Card)') def test_rrqlexpr_nonexistant_subject_3(self): constraint = RRQLExpression('U in_group G, G name "users"') rqlst = parse('Card C') rewrite(rqlst, {('C', 'S'): (constraint,)}, {}, 'SU') - self.failUnlessEqual(rqlst.as_string(), + self.assertEqual(rqlst.as_string(), u'Any C WHERE C is Card, A eid %(B)s, EXISTS(A in_group D, D name "users", D is CWGroup)') def test_rrqlexpr_nonexistant_subject_4(self): constraint = RRQLExpression('U in_group G, G name "users", S owned_by U') rqlst = parse('Card C') rewrite(rqlst, {('C', 'S'): (constraint,)}, {}, 'SU') - self.failUnlessEqual(rqlst.as_string(), + self.assertEqual(rqlst.as_string(), u'Any C WHERE C is Card, A eid %(B)s, EXISTS(A in_group D, D name "users", C owned_by A, D is CWGroup)') rqlst = parse('Card C') rewrite(rqlst, {('C', 'S'): (constraint,)}, {}, 'OU') - self.failUnlessEqual(rqlst.as_string(), + self.assertEqual(rqlst.as_string(), u'Any C WHERE C is Card, A eid %(B)s, EXISTS(A in_group D, D name "users", D is CWGroup)') def test_rrqlexpr_nonexistant_subject_5(self): constraint = RRQLExpression('S owned_by Z, O owned_by Z, O is Card') rqlst = parse('Card C') rewrite(rqlst, {('C', 'S'): (constraint,)}, {}, 'S') - self.failUnlessEqual(rqlst.as_string(), + self.assertEqual(rqlst.as_string(), u"Any C WHERE C is Card, EXISTS(C owned_by A, A is CWUser)") def test_rqlexpr_not_relation_1_1(self): constraint = RRQLExpression('X owned_by Z, Z login "hop"', 'X') rqlst = parse('Affaire A WHERE NOT EXISTS(A documented_by C)') rewrite(rqlst, {('C', 'X'): (constraint,)}, {}, 'X') - self.failUnlessEqual(rqlst.as_string(), + self.assertEqual(rqlst.as_string(), u'Any A WHERE NOT EXISTS(A documented_by C, EXISTS(C owned_by B, B login "hop", B is CWUser), C is Card), A is Affaire') def test_rqlexpr_not_relation_1_2(self): constraint = RRQLExpression('X owned_by Z, Z login "hop"', 'X') rqlst = parse('Affaire A WHERE NOT EXISTS(A documented_by C)') rewrite(rqlst, {('A', 'X'): (constraint,)}, {}, 'X') - self.failUnlessEqual(rqlst.as_string(), + self.assertEqual(rqlst.as_string(), u'Any A WHERE NOT EXISTS(A documented_by C, C is Card), A is Affaire, EXISTS(A owned_by B, B login "hop", B is CWUser)') def test_rqlexpr_not_relation_2(self): constraint = RRQLExpression('X owned_by Z, Z login "hop"', 'X') rqlst = rqlhelper.parse('Affaire A WHERE NOT A documented_by C', annotate=False) rewrite(rqlst, {('C', 'X'): (constraint,)}, {}, 'X') - self.failUnlessEqual(rqlst.as_string(), + self.assertEqual(rqlst.as_string(), u'Any A WHERE NOT EXISTS(A documented_by C, EXISTS(C owned_by B, B login "hop", B is CWUser), C is Card), A is Affaire') diff -r aa30c665bd06 -r f2a5805615f8 test/unittest_rset.py --- a/test/unittest_rset.py Fri Sep 16 16:42:42 2011 +0200 +++ b/test/unittest_rset.py Fri Sep 16 17:30:26 2011 +0200 @@ -401,7 +401,7 @@ def test_entities(self): rset = self.execute('Any U,G WHERE U in_group G') # make sure we have at least one element - self.failUnless(rset) + self.assertTrue(rset) self.assertEqual(set(e.e_schema.type for e in rset.entities(0)), set(['CWUser',])) self.assertEqual(set(e.e_schema.type for e in rset.entities(1)), @@ -410,7 +410,7 @@ def test_iter_rows_with_entities(self): rset = self.execute('Any U,UN,G,GN WHERE U in_group G, U login UN, G name GN') # make sure we have at least one element - self.failUnless(rset) + self.assertTrue(rset) out = list(rset.iter_rows_with_entities())[0] self.assertEqual( out[0].login, out[1] ) self.assertEqual( out[2].name, out[3] ) diff -r aa30c665bd06 -r f2a5805615f8 test/unittest_schema.py --- a/test/unittest_schema.py Fri Sep 16 16:42:42 2011 +0200 +++ b/test/unittest_schema.py Fri Sep 16 17:30:26 2011 +0200 @@ -106,9 +106,9 @@ # isinstance(cstr, RQLConstraint) # -> expected to return RQLConstraint instances but not # RRQLVocabularyConstraint and QLUniqueConstraint - self.failIf(issubclass(RQLUniqueConstraint, RQLVocabularyConstraint)) - self.failIf(issubclass(RQLUniqueConstraint, RQLConstraint)) - self.failUnless(issubclass(RQLConstraint, RQLVocabularyConstraint)) + self.assertFalse(issubclass(RQLUniqueConstraint, RQLVocabularyConstraint)) + self.assertFalse(issubclass(RQLUniqueConstraint, RQLConstraint)) + self.assertTrue(issubclass(RQLConstraint, RQLVocabularyConstraint)) def test_entity_perms(self): self.assertEqual(eperson.get_groups('read'), set(('managers', 'users', 'guests'))) @@ -193,7 +193,7 @@ 'fabrique_par', 'final', 'firstname', 'for_user', 'fournit', 'from_entity', 'from_state', 'fulltext_container', 'fulltextindexed', - 'has_text', + 'has_group_permission', 'has_text', 'identity', 'in_group', 'in_state', 'indexed', 'initial_state', 'inlined', 'internationalizable', 'is', 'is_instance_of', @@ -225,12 +225,13 @@ rels = sorted(str(r) for r in eschema.subject_relations()) self.assertListEqual(rels, ['created_by', 'creation_date', 'custom_workflow', 'cw_source', 'cwuri', 'eid', - 'evaluee', 'firstname', 'has_text', 'identity', - 'in_group', 'in_state', 'is', - 'is_instance_of', 'last_login_time', - 'login', 'modification_date', 'owned_by', - 'primary_email', 'surname', 'upassword', - 'use_email']) + 'evaluee', 'firstname', 'has_group_permission', + 'has_text', 'identity', + 'in_group', 'in_state', 'is', + 'is_instance_of', 'last_login_time', + 'login', 'modification_date', 'owned_by', + 'primary_email', 'surname', 'upassword', + 'use_email']) rels = sorted(r.type for r in eschema.object_relations()) self.assertListEqual(rels, ['bookmarked_by', 'created_by', 'for_user', 'identity', 'owned_by', 'wf_info_for']) @@ -238,15 +239,15 @@ properties = rschema.rdef('CWAttribute', 'CWRType') self.assertEqual(properties.cardinality, '1*') constraints = properties.constraints - self.failUnlessEqual(len(constraints), 1, constraints) + self.assertEqual(len(constraints), 1, constraints) constraint = constraints[0] - self.failUnless(isinstance(constraint, RQLConstraint)) - self.failUnlessEqual(constraint.expression, 'O final TRUE') + self.assertTrue(isinstance(constraint, RQLConstraint)) + self.assertEqual(constraint.expression, 'O final TRUE') def test_fulltext_container(self): schema = loader.load(config) - self.failUnless('has_text' in schema['CWUser'].subject_relations()) - self.failIf('has_text' in schema['EmailAddress'].subject_relations()) + self.assertTrue('has_text' in schema['CWUser'].subject_relations()) + self.assertFalse('has_text' in schema['EmailAddress'].subject_relations()) def test_permission_settings(self): schema = loader.load(config) diff -r aa30c665bd06 -r f2a5805615f8 test/unittest_selectors.py --- a/test/unittest_selectors.py Fri Sep 16 16:42:42 2011 +0200 +++ b/test/unittest_selectors.py Fri Sep 16 17:30:26 2011 +0200 @@ -87,11 +87,11 @@ def test_composition(self): selector = (_1_() & _1_()) & (_1_() & _1_()) - self.failUnless(isinstance(selector, AndSelector)) + self.assertTrue(isinstance(selector, AndSelector)) self.assertEqual(len(selector.selectors), 4) self.assertEqual(selector(None), 4) selector = (_1_() & _0_()) | (_1_() & _1_()) - self.failUnless(isinstance(selector, OrSelector)) + self.assertTrue(isinstance(selector, OrSelector)) self.assertEqual(len(selector.selectors), 2) self.assertEqual(selector(None), 2) @@ -151,13 +151,13 @@ rset = f.as_rset() anyscore = is_instance('Any')(f.__class__, req, rset=rset) idownscore = adaptable('IDownloadable')(f.__class__, req, rset=rset) - self.failUnless(idownscore > anyscore, (idownscore, anyscore)) + self.assertTrue(idownscore > anyscore, (idownscore, anyscore)) filescore = is_instance('File')(f.__class__, req, rset=rset) - self.failUnless(filescore > idownscore, (filescore, idownscore)) + self.assertTrue(filescore > idownscore, (filescore, idownscore)) def test_etype_inheritance_no_yams_inheritance(self): cls = self.vreg['etypes'].etype_class('Personne') - self.failIf(is_instance('Societe').score_class(cls, self.request())) + self.assertFalse(is_instance('Societe').score_class(cls, self.request())) def test_yams_inheritance(self): cls = self.vreg['etypes'].etype_class('Transition') @@ -321,7 +321,7 @@ self.vreg._loadedmods[__name__] = {} self.vreg.register(SomeAction) SomeAction.__registered__(self.vreg['actions']) - self.failUnless(SomeAction in self.vreg['actions']['yo'], self.vreg['actions']) + self.assertTrue(SomeAction in self.vreg['actions']['yo'], self.vreg['actions']) try: # login as a simple user req = self.request() @@ -330,18 +330,18 @@ # it should not be possible to use SomeAction not owned objects req = self.request() rset = req.execute('Any G WHERE G is CWGroup, G name "managers"') - self.failIf('yo' in dict(self.pactions(req, rset))) + self.assertFalse('yo' in dict(self.pactions(req, rset))) # insert a new card, and check that we can use SomeAction on our object self.execute('INSERT Card C: C title "zoubidou"') self.commit() req = self.request() rset = req.execute('Card C WHERE C title "zoubidou"') - self.failUnless('yo' in dict(self.pactions(req, rset)), self.pactions(req, rset)) + self.assertTrue('yo' in dict(self.pactions(req, rset)), self.pactions(req, rset)) # make sure even managers can't use the action self.restore_connection() req = self.request() rset = req.execute('Card C WHERE C title "zoubidou"') - self.failIf('yo' in dict(self.pactions(req, rset))) + self.assertFalse('yo' in dict(self.pactions(req, rset))) finally: del self.vreg[SomeAction.__registry__][SomeAction.__regid__] diff -r aa30c665bd06 -r f2a5805615f8 test/unittest_utils.py --- a/test/unittest_utils.py Fri Sep 16 16:42:42 2011 +0200 +++ b/test/unittest_utils.py Fri Sep 16 17:30:26 2011 +0200 @@ -67,7 +67,7 @@ # XXX self.assertEqual(l[4], (1, 3)) - self.failIf(RepeatList(0, None)) + self.assertFalse(RepeatList(0, None)) def test_slice(self): l = RepeatList(3, (1, 3)) @@ -159,9 +159,17 @@ self.assertEqual(self.encode(TestCase), 'null') class HTMLHeadTC(CubicWebTC): + + def htmlhead(self, datadir_url): + req = self.request() + base_url = u'http://test.fr/data/' + req.datadir_url = base_url + head = HTMLHead(req) + return head + def test_concat_urls(self): base_url = u'http://test.fr/data/' - head = HTMLHead(base_url) + head = self.htmlhead(base_url) urls = [base_url + u'bob1.js', base_url + u'bob2.js', base_url + u'bob3.js'] @@ -171,7 +179,7 @@ def test_group_urls(self): base_url = u'http://test.fr/data/' - head = HTMLHead(base_url) + head = self.htmlhead(base_url) urls_spec = [(base_url + u'bob0.js', None), (base_url + u'bob1.js', None), (u'http://ext.com/bob2.js', None), @@ -196,7 +204,7 @@ def test_getvalue_with_concat(self): base_url = u'http://test.fr/data/' - head = HTMLHead(base_url) + head = self.htmlhead(base_url) head.add_js(base_url + u'bob0.js') head.add_js(base_url + u'bob1.js') head.add_js(u'http://ext.com/bob2.js') @@ -224,20 +232,22 @@ self.assertEqual(result, expected) def test_getvalue_without_concat(self): - base_url = u'http://test.fr/data/' - head = HTMLHead() - head.add_js(base_url + u'bob0.js') - head.add_js(base_url + u'bob1.js') - head.add_js(u'http://ext.com/bob2.js') - head.add_js(u'http://ext.com/bob3.js') - head.add_css(base_url + u'bob4.css') - head.add_css(base_url + u'bob5.css') - head.add_css(base_url + u'bob6.css', 'print') - head.add_css(base_url + u'bob7.css', 'print') - head.add_ie_css(base_url + u'bob8.css') - head.add_ie_css(base_url + u'bob9.css', 'print', u'[if lt IE 7]') - result = head.getvalue() - expected = u""" + self.config.global_set_option('concat-resources', False) + try: + base_url = u'http://test.fr/data/' + head = self.htmlhead(base_url) + head.add_js(base_url + u'bob0.js') + head.add_js(base_url + u'bob1.js') + head.add_js(u'http://ext.com/bob2.js') + head.add_js(u'http://ext.com/bob3.js') + head.add_css(base_url + u'bob4.css') + head.add_css(base_url + u'bob5.css') + head.add_css(base_url + u'bob6.css', 'print') + head.add_css(base_url + u'bob7.css', 'print') + head.add_ie_css(base_url + u'bob8.css') + head.add_ie_css(base_url + u'bob9.css', 'print', u'[if lt IE 7]') + result = head.getvalue() + expected = u""" @@ -253,7 +263,9 @@ """ - self.assertEqual(result, expected) + self.assertEqual(result, expected) + finally: + self.config.global_set_option('concat-resources', True) class DocTest(DocTest): from cubicweb import utils as module diff -r aa30c665bd06 -r f2a5805615f8 test/unittest_vregistry.py --- a/test/unittest_vregistry.py Fri Sep 16 16:42:42 2011 +0200 +++ b/test/unittest_vregistry.py Fri Sep 16 17:30:26 2011 +0200 @@ -56,7 +56,7 @@ def test_load_subinterface_based_appobjects(self): self.vreg.register_objects([join(BASE, 'web', 'views', 'iprogress.py')]) # check progressbar was kicked - self.failIf(self.vreg['views'].get('progressbar')) + self.assertFalse(self.vreg['views'].get('progressbar')) # we've to emulate register_objects to add custom MyCard objects path = [join(BASE, 'entities', '__init__.py'), join(BASE, 'entities', 'adapters.py'), @@ -74,8 +74,8 @@ def test_properties(self): self.vreg.reset() - self.failIf('system.version.cubicweb' in self.vreg['propertydefs']) - self.failUnless(self.vreg.property_info('system.version.cubicweb')) + self.assertFalse('system.version.cubicweb' in self.vreg['propertydefs']) + self.assertTrue(self.vreg.property_info('system.version.cubicweb')) self.assertRaises(UnknownProperty, self.vreg.property_info, 'a.non.existent.key') diff -r aa30c665bd06 -r f2a5805615f8 utils.py --- a/utils.py Fri Sep 16 16:42:42 2011 +0200 +++ b/utils.py Fri Sep 16 17:30:26 2011 +0200 @@ -227,7 +227,7 @@ xhtml_safe_script_opening = u'' - def __init__(self, datadir_url=None): + def __init__(self, req): super(HTMLHead, self).__init__() self.jsvars = [] self.jsfiles = [] @@ -235,8 +235,8 @@ self.ie_cssfiles = [] self.post_inlined_scripts = [] self.pagedata_unload = False - self.datadir_url = datadir_url - + self._cw = req + self.datadir_url = req.datadir_url def add_raw(self, rawheader): self.write(rawheader) @@ -348,20 +348,26 @@ w(vardecl + u'\n') w(self.xhtml_safe_script_closing) # 2/ css files - for cssfile, media in (self.group_urls(self.cssfiles) if self.datadir_url else self.cssfiles): + ie_cssfiles = ((x, (y, z)) for x, y, z in self.ie_cssfiles) + if self.datadir_url and self._cw.vreg.config['concat-resources']: + cssfiles = self.group_urls(self.cssfiles) + ie_cssfiles = self.group_urls(ie_cssfiles) + jsfiles = (x for x, _ in self.group_urls((x, None) for x in self.jsfiles)) + else: + cssfiles = self.cssfiles + jsfiles = self.jsfiles + for cssfile, media in cssfiles: w(u'\n' % (media, xml_escape(cssfile))) # 3/ ie css if necessary - if self.ie_cssfiles: - ie_cssfiles = ((x, (y, z)) for x, y, z in self.ie_cssfiles) - for cssfile, (media, iespec) in (self.group_urls(ie_cssfiles) if self.datadir_url else ie_cssfiles): + if self.ie_cssfiles: # use self.ie_cssfiles because `ie_cssfiles` is a genexp + for cssfile, (media, iespec) in ie_cssfiles: w(u' \n') # 4/ js files - jsfiles = ((x, None) for x in self.jsfiles) - for jsfile, media in self.group_urls(jsfiles) if self.datadir_url else jsfiles: + for jsfile in jsfiles: if skiphead: # Don't insert