--- a/schema.py Thu Mar 31 09:39:51 2011 +0200
+++ b/schema.py Thu Mar 31 15:23:46 2011 +0200
@@ -28,6 +28,7 @@
from logilab.common.decorators import cached, clear_cache, monkeypatch
from logilab.common.logging_ext import set_log_methods
from logilab.common.deprecation import deprecated, class_moved
+from logilab.common.textutils import splitstrip
from logilab.common.graph import get_cycles
from logilab.common.compat import any
@@ -179,35 +180,6 @@
__builtins__['display_name'] = deprecated('[3.4] display_name should be imported from cubicweb.schema')(display_name)
-# rql expression utilities function ############################################
-
-def guess_rrqlexpr_mainvars(expression):
- defined = set(split_expression(expression))
- mainvars = []
- if 'S' in defined:
- mainvars.append('S')
- if 'O' in defined:
- mainvars.append('O')
- if 'U' in defined:
- mainvars.append('U')
- if not mainvars:
- raise Exception('unable to guess selection variables')
- return ','.join(sorted(mainvars))
-
-def split_expression(rqlstring):
- for expr in rqlstring.split(','):
- for noparen in expr.split('('):
- for word in noparen.split():
- yield word
-
-def normalize_expression(rqlstring):
- """normalize an rql expression to ease schema synchronization (avoid
- suppressing and reinserting an expression if only a space has been added/removed
- for instance)
- """
- return u', '.join(' '.join(expr.split()) for expr in rqlstring.split(','))
-
-
# Schema objects definition ###################################################
def ERSchema_display_name(self, req, form='', context=None):
@@ -640,175 +612,57 @@
def schema_by_eid(self, eid):
return self._eid_index[eid]
-
-# Possible constraints ########################################################
-
-class BaseRQLConstraint(BaseConstraint):
- """base class for rql constraints
- """
- distinct_query = None
-
- def __init__(self, restriction, mainvars=None):
- self.restriction = normalize_expression(restriction)
- if mainvars is None:
- mainvars = guess_rrqlexpr_mainvars(restriction)
- else:
- normmainvars = []
- for mainvar in mainvars.split(','):
- mainvar = mainvar.strip()
- if not mainvar.isalpha():
- raise Exception('bad mainvars %s' % mainvars)
- normmainvars.append(mainvar)
- assert mainvars, 'bad mainvars %s' % mainvars
- mainvars = ','.join(sorted(normmainvars))
- self.mainvars = mainvars
-
- def serialize(self):
- # start with a comma for bw compat, see below
- return ';' + self.mainvars + ';' + self.restriction
-
- @classmethod
- def deserialize(cls, value):
- # XXX < 3.5.10 bw compat
- if not value.startswith(';'):
- return cls(value)
- _, mainvars, restriction = value.split(';', 2)
- return cls(restriction, mainvars)
-
- def check(self, entity, rtype, value):
- """return true if the value satisfy the constraint, else false"""
- # implemented as a hook in the repository
- return 1
-
- def repo_check(self, session, eidfrom, rtype, eidto):
- """raise ValidationError if the relation doesn't satisfy the constraint
- """
- pass # this is a vocabulary constraint, not enforce XXX why?
-
- def __str__(self):
- if self.distinct_query:
- selop = 'Any'
- else:
- selop = 'DISTINCT Any'
- return '%s(%s %s WHERE %s)' % (self.__class__.__name__, selop,
- self.mainvars, self.restriction)
-
- def __repr__(self):
- return '<%s @%#x>' % (self.__str__(), id(self))
-
-
-class RQLVocabularyConstraint(BaseRQLConstraint):
- """the rql vocabulary constraint :
-
- limit the proposed values to a set of entities returned by a rql query,
- but this is not enforced at the repository level
-
- restriction is additional rql restriction that will be added to
- a predefined query, where the S and O variables respectivly represent
- the subject and the object of the relation
-
- mainvars is a string that should be used as selection variable (eg
- `'Any %s WHERE ...' % mainvars`). If not specified, an attempt will be
- done to guess it according to variable used in the expression.
- """
-
-
-class RepoEnforcedRQLConstraintMixIn(object):
+# Bases for manipulating RQL in schema #########################################
- def __init__(self, restriction, mainvars=None, msg=None):
- super(RepoEnforcedRQLConstraintMixIn, self).__init__(restriction, mainvars)
- self.msg = msg
-
- def serialize(self):
- # start with a semicolon for bw compat, see below
- return ';%s;%s\n%s' % (self.mainvars, self.restriction,
- self.msg or '')
-
- def deserialize(cls, value):
- # XXX < 3.5.10 bw compat
- if not value.startswith(';'):
- return cls(value)
- value, msg = value.split('\n', 1)
- _, mainvars, restriction = value.split(';', 2)
- return cls(restriction, mainvars, msg)
- deserialize = classmethod(deserialize)
+def guess_rrqlexpr_mainvars(expression):
+ defined = set(split_expression(expression))
+ mainvars = set()
+ if 'S' in defined:
+ mainvars.add('S')
+ if 'O' in defined:
+ mainvars.add('O')
+ if 'U' in defined:
+ mainvars.add('U')
+ if not mainvars:
+ raise Exception('unable to guess selection variables')
+ return mainvars
- def repo_check(self, session, eidfrom, rtype, eidto=None):
- """raise ValidationError if the relation doesn't satisfy the constraint
- """
- if not self.match_condition(session, eidfrom, eidto):
- # XXX at this point if both or neither of S and O are in mainvar we
- # dunno if the validation error `occurred` on eidfrom or eidto (from
- # user interface point of view)
- #
- # possible enhancement: check entity being created, it's probably
- # the main eid unless this is a composite relation
- if eidto is None or 'S' in self.mainvars or not 'O' in self.mainvars:
- maineid = eidfrom
- qname = role_name(rtype, 'subject')
- else:
- maineid = eidto
- qname = role_name(rtype, 'object')
- if self.msg:
- msg = session._(self.msg)
- else:
- msg = '%(constraint)s %(restriction)s failed' % {
- 'constraint': session._(self.type()),
- 'restriction': self.restriction}
- raise ValidationError(maineid, {qname: msg})
+def split_expression(rqlstring):
+ for expr in rqlstring.split(','):
+ for noparen in expr.split('('):
+ for word in noparen.split():
+ yield word
- def exec_query(self, session, eidfrom, eidto):
- if eidto is None:
- # checking constraint for an attribute relation
- restriction = 'S eid %(s)s, ' + self.restriction
- args = {'s': eidfrom}
- else:
- restriction = 'S eid %(s)s, O eid %(o)s, ' + self.restriction
- args = {'s': eidfrom, 'o': eidto}
- rql = 'Any %s WHERE %s' % (self.mainvars, restriction)
- if self.distinct_query:
- rql = 'DISTINCT ' + rql
- return session.execute(rql, args, build_descr=False)
-
-
-class RQLConstraint(RepoEnforcedRQLConstraintMixIn, RQLVocabularyConstraint):
- """the rql constraint is similar to the RQLVocabularyConstraint but
- are also enforced at the repository level
+def normalize_expression(rqlstring):
+ """normalize an rql expression to ease schema synchronization (avoid
+ suppressing and reinserting an expression if only a space has been
+ added/removed for instance)
"""
- distinct_query = False
-
- def match_condition(self, session, eidfrom, eidto):
- return self.exec_query(session, eidfrom, eidto)
-
-
-class RQLUniqueConstraint(RepoEnforcedRQLConstraintMixIn, BaseRQLConstraint):
- """the unique rql constraint check that the result of the query isn't
- greater than one.
-
- You *must* specify mainvars when instantiating the constraint since there is
- no way to guess it correctly (e.g. if using S,O or U the constraint will
- always be satisfied because we've to use a DISTINCT query).
- """
- # XXX turns mainvars into a required argument in __init__
- distinct_query = True
-
- def match_condition(self, session, eidfrom, eidto):
- return len(self.exec_query(session, eidfrom, eidto)) <= 1
+ return u', '.join(' '.join(expr.split()) for expr in rqlstring.split(','))
class RQLExpression(object):
+ """Base class for RQL expression used in schema (constraints and
+ permissions)
+ """
+ # these are overridden by set_log_methods below
+ # only defining here to prevent pylint from complaining
+ info = warning = error = critical = exception = debug = lambda msg,*a,**kw: None
def __init__(self, expression, mainvars, eid):
self.eid = eid # eid of the entity representing this rql expression
- if not isinstance(mainvars, unicode):
- mainvars = unicode(mainvars)
+ assert mainvars, 'bad mainvars %s' % mainvars
+ if isinstance(mainvars, basestring):
+ mainvars = set(splitstrip(mainvars))
+ elif not isinstance(mainvars, set):
+ mainvars = set(mainvars)
self.mainvars = mainvars
self.expression = normalize_expression(expression)
try:
self.rqlst = parse(self.full_rql, print_errors=False).children[0]
except RQLSyntaxError:
raise RQLSyntaxError(expression)
- for mainvar in mainvars.split(','):
+ for mainvar in mainvars:
if len(self.rqlst.defined_vars[mainvar].references()) <= 2:
_LOGGER.warn('You did not use the %s variable in your RQL '
'expression %s', mainvar, self)
@@ -832,6 +686,8 @@
def __setstate__(self, state):
self.__init__(*state)
+ # permission rql expression specific stuff #################################
+
@cached
def transform_has_permission(self):
found = None
@@ -942,12 +798,10 @@
@property
def minimal_rql(self):
- return 'Any %s WHERE %s' % (self.mainvars, self.expression)
+ return 'Any %s WHERE %s' % (','.join(sorted(self.mainvars)),
+ self.expression)
- # these are overridden by set_log_methods below
- # only defining here to prevent pylint from complaining
- info = warning = error = critical = exception = debug = lambda msg,*a,**kw: None
-
+# rql expressions for use in permission definition #############################
class ERQLExpression(RQLExpression):
def __init__(self, expression, mainvars=None, eid=None):
@@ -1024,12 +878,153 @@
kwargs['o'] = toeid
return self._check(session, **kwargs)
+
# in yams, default 'update' perm for attributes granted to managers and owners.
# Within cw, we want to default to users who may edit the entity holding the
# attribute.
ybo.DEFAULT_ATTRPERMS['update'] = (
'managers', ERQLExpression('U has_update_permission X'))
+# additional cw specific constraints ###########################################
+
+class BaseRQLConstraint(RRQLExpression, BaseConstraint):
+ """base class for rql constraints"""
+ distinct_query = None
+
+ def serialize(self):
+ # start with a comma for bw compat,see below
+ return ';' + ','.join(sorted(self.mainvars)) + ';' + self.expression
+
+ @classmethod
+ def deserialize(cls, value):
+ # XXX < 3.5.10 bw compat
+ if not value.startswith(';'):
+ return cls(value)
+ _, mainvars, expression = value.split(';', 2)
+ return cls(expression, mainvars)
+
+ def check(self, entity, rtype, value):
+ """return true if the value satisfy the constraint, else false"""
+ # implemented as a hook in the repository
+ return 1
+
+ def __str__(self):
+ if self.distinct_query:
+ selop = 'Any'
+ else:
+ selop = 'DISTINCT Any'
+ return '%s(%s %s WHERE %s)' % (self.__class__.__name__, selop,
+ ','.join(sorted(self.mainvars)),
+ self.expression)
+
+ def __repr__(self):
+ return '<%s @%#x>' % (self.__str__(), id(self))
+
+
+class RQLVocabularyConstraint(BaseRQLConstraint):
+ """the rql vocabulary constraint:
+
+ limit the proposed values to a set of entities returned by a rql query,
+ but this is not enforced at the repository level
+
+ `expression` is additional rql restriction that will be added to
+ a predefined query, where the S and O variables respectivly represent
+ the subject and the object of the relation
+
+ `mainvars` is a set of variables that should be used as selection variable
+ (eg `'Any %s WHERE ...' % mainvars`). If not specified, an attempt will be
+ done to guess it according to variable used in the expression.
+ """
+
+ def repo_check(self, session, eidfrom, rtype, eidto):
+ """raise ValidationError if the relation doesn't satisfy the constraint
+ """
+ pass # this is a vocabulary constraint, not enforce
+
+
+class RepoEnforcedRQLConstraintMixIn(object):
+
+ def __init__(self, expression, mainvars=None, msg=None):
+ super(RepoEnforcedRQLConstraintMixIn, self).__init__(expression, mainvars)
+ self.msg = msg
+
+ def serialize(self):
+ # start with a semicolon for bw compat, see below
+ return ';%s;%s\n%s' % (','.join(sorted(self.mainvars)), self.expression,
+ self.msg or '')
+
+ def deserialize(cls, value):
+ # XXX < 3.5.10 bw compat
+ if not value.startswith(';'):
+ return cls(value)
+ value, msg = value.split('\n', 1)
+ _, mainvars, expression = value.split(';', 2)
+ return cls(expression, mainvars, msg)
+ deserialize = classmethod(deserialize)
+
+ def repo_check(self, session, eidfrom, rtype, eidto=None):
+ """raise ValidationError if the relation doesn't satisfy the constraint
+ """
+ if not self.match_condition(session, eidfrom, eidto):
+ # XXX at this point if both or neither of S and O are in mainvar we
+ # dunno if the validation error `occurred` on eidfrom or eidto (from
+ # user interface point of view)
+ #
+ # possible enhancement: check entity being created, it's probably
+ # the main eid unless this is a composite relation
+ if eidto is None or 'S' in self.mainvars or not 'O' in self.mainvars:
+ maineid = eidfrom
+ qname = role_name(rtype, 'subject')
+ else:
+ maineid = eidto
+ qname = role_name(rtype, 'object')
+ if self.msg:
+ msg = session._(self.msg)
+ else:
+ msg = '%(constraint)s %(expression)s failed' % {
+ 'constraint': session._(self.type()),
+ 'expression': self.expression}
+ raise ValidationError(maineid, {qname: msg})
+
+ def exec_query(self, session, eidfrom, eidto):
+ if eidto is None:
+ # checking constraint for an attribute relation
+ expression = 'S eid %(s)s, ' + self.expression
+ args = {'s': eidfrom}
+ else:
+ expression = 'S eid %(s)s, O eid %(o)s, ' + self.expression
+ args = {'s': eidfrom, 'o': eidto}
+ rql = 'Any %s WHERE %s' % (','.join(sorted(self.mainvars)), expression)
+ if self.distinct_query:
+ rql = 'DISTINCT ' + rql
+ return session.execute(rql, args, build_descr=False)
+
+
+class RQLConstraint(RepoEnforcedRQLConstraintMixIn, RQLVocabularyConstraint):
+ """the rql constraint is similar to the RQLVocabularyConstraint but
+ are also enforced at the repository level
+ """
+ distinct_query = False
+
+ def match_condition(self, session, eidfrom, eidto):
+ return self.exec_query(session, eidfrom, eidto)
+
+
+class RQLUniqueConstraint(RepoEnforcedRQLConstraintMixIn, BaseRQLConstraint):
+ """the unique rql constraint check that the result of the query isn't
+ greater than one.
+
+ You *must* specify `mainvars` when instantiating the constraint since there
+ is no way to guess it correctly (e.g. if using S,O or U the constraint will
+ always be satisfied because we've to use a DISTINCT query).
+ """
+ # XXX turns mainvars into a required argument in __init__
+ distinct_query = True
+
+ def match_condition(self, session, eidfrom, eidto):
+ return len(self.exec_query(session, eidfrom, eidto)) <= 1
+
+
# workflow extensions #########################################################
from yams.buildobjs import _add_relation as yams_add_relation