[entity] restrict creation form field vocabulary using __linkto information (closes #1799997)
authorFlorent Cayré <florent.cayre@gmail.com>
Thu, 20 Oct 2011 16:03:51 +0200
changeset 7973 64639bc94e25
parent 7971 3e51c2a577dd
child 7979 8bd5031e2201
[entity] restrict creation form field vocabulary using __linkto information (closes #1799997)
entity.py
i18n/fr.po
schema.py
test/data/schema.py
test/unittest_entity.py
web/formfields.py
--- a/entity.py	Thu Oct 20 14:22:16 2011 +0200
+++ b/entity.py	Thu Oct 20 16:03:51 2011 +0200
@@ -37,7 +37,8 @@
 from cubicweb.selectors import yes
 from cubicweb.appobject import AppObject
 from cubicweb.req import _check_cw_unsafe
-from cubicweb.schema import RQLVocabularyConstraint, RQLConstraint
+from cubicweb.schema import (RQLVocabularyConstraint, RQLConstraint,
+                             GeneratedConstraint)
 from cubicweb.rqlrewrite import RQLRewriter
 
 from cubicweb.uilib import soup2xhtml
@@ -65,6 +66,85 @@
         return False
     return True
 
+def rel_vars(rel):
+    return ((isinstance(rel.children[0], VariableRef)
+             and rel.children[0].variable or None),
+            (isinstance(rel.children[1].children[0], VariableRef)
+             and rel.children[1].children[0].variable or None)
+            )
+
+def rel_matches(rel, rtype, role, varname, operator='='):
+    if rel.r_type == rtype and rel.children[1].operator == operator:
+        same_role_var_idx = 0 if role == 'subject' else 1
+        variables = rel_vars(rel)
+        if variables[same_role_var_idx].name == varname:
+            return variables[1 - same_role_var_idx]
+
+def build_cstr_with_linkto_infos(cstr, args, searchedvar, evar,
+                                 lt_infos, eidvars):
+    """restrict vocabulary as much as possible in entity creation,
+    based on infos provided by __linkto form param.
+
+    Example based on following schema:
+
+      class works_in(RelationDefinition):
+          subject = 'CWUser'
+          object = 'Lab'
+          cardinality = '1*'
+          constraints = [RQLConstraint('S in_group G, O welcomes G')]
+
+      class welcomes(RelationDefinition):
+          subject = 'Lab'
+          object = 'CWGroup'
+
+    If you create a CWUser in the "scientists" CWGroup you can show
+    only the labs that welcome them using :
+
+      lt_infos = {('in_group', 'subject'): 321}
+
+    You get following restriction : 'O welcomes G, G eid 321'
+
+    """
+    st = cstr.snippet_rqlst.copy()
+    # replace relations in ST by eid infos from linkto where possible
+    for (info_rtype, info_role), eids in lt_infos.iteritems():
+        eid = eids[0] # NOTE: we currently assume a pruned lt_info with only 1 eid
+        for rel in st.iget_nodes(RqlRelation):
+            targetvar = rel_matches(rel, info_rtype, info_role, evar.name)
+            if targetvar is not None:
+                if targetvar.name in eidvars:
+                    rel.parent.remove(rel)
+                else:
+                    eidrel = make_relation(
+                        targetvar, 'eid', (targetvar.name, 'Substitute'),
+                        Constant)
+                    rel.parent.replace(rel, eidrel)
+                    args[targetvar.name] = eid
+                    eidvars.add(targetvar.name)
+    # if modified ST still contains evar references we must discard the
+    # constraint, otherwise evar is unknown in the final rql query which can
+    # lead to a SQL table cartesian product and multiple occurences of solutions
+    evarname = evar.name
+    for rel in st.iget_nodes(RqlRelation):
+        for variable in rel_vars(rel):
+            if variable and evarname == variable.name:
+                return
+    # else insert snippets into the global tree
+    return GeneratedConstraint(st, cstr.mainvars - set(evarname))
+
+def pruned_lt_info(eschema, lt_infos):
+    pruned = {}
+    for (lt_rtype, lt_role), eids in lt_infos.iteritems():
+        # we can only use lt_infos describing relation with a cardinality
+        # of value 1 towards the linked entity
+        if not len(eids) == 1:
+            continue
+        lt_card = eschema.rdef(lt_rtype, lt_role).cardinality[
+            0 if lt_role == 'subject' else 1]
+        if lt_card not in '?1':
+            continue
+        pruned[(lt_rtype, lt_role)] = eids
+    return pruned
 
 class Entity(AppObject):
     """an entity instance has e_schema automagically set on
@@ -931,17 +1011,22 @@
     # generic vocabulary methods ##############################################
 
     def cw_unrelated_rql(self, rtype, targettype, role, ordermethod=None,
-                         vocabconstraints=True):
+                         vocabconstraints=True, lt_infos={}):
         """build a rql to fetch `targettype` entities unrelated to this entity
         using (rtype, role) relation.
 
         Consider relation permissions so that returned entities may be actually
         linked by `rtype`.
+
+        `lt_infos` are supplementary informations, usually coming from __linkto
+        parameter, that can help further restricting the results in case current
+        entity is not yet created. It is a dict describing entities the current
+        entity will be linked to, which keys are (rtype, role) tuples and values
+        are a list of eids.
         """
         ordermethod = ordermethod or 'fetch_unrelated_order'
-        if isinstance(rtype, basestring):
-            rtype = self._cw.vreg.schema.rschema(rtype)
-        rdef = rtype.role_rdef(self.e_schema, targettype, role)
+        rschema = self._cw.vreg.schema.rschema(rtype)
+        rdef = rschema.role_rdef(self.e_schema, targettype, role)
         rewriter = RQLRewriter(self._cw)
         select = Select()
         # initialize some variables according to the `role` of `self` in the
@@ -955,20 +1040,20 @@
             searchedvar = subjvar = select.get_variable('S')
             evar = objvar = select.get_variable('O')
         select.add_selected(searchedvar)
-        # initialize some variables according to `self` existance
+        # initialize some variables according to `self` existence
         if rdef.role_cardinality(neg_role(role)) in '?1':
             # if cardinality in '1?', we want a target entity which isn't
             # already linked using this relation
-            var = select.get_variable('ZZ') # XXX unname when tests pass
+            variable = select.make_variable()
             if role == 'subject':
-                rel = make_relation(var, rtype.type, (searchedvar,), VariableRef)
+                rel = make_relation(variable, rtype, (searchedvar,), VariableRef)
             else:
-                rel = make_relation(searchedvar, rtype.type, (var,), VariableRef)
+                rel = make_relation(searchedvar, rtype, (variable,), VariableRef)
             select.add_restriction(Not(rel))
         elif self.has_eid():
             # elif we have an eid, we don't want a target entity which is
             # already linked to ourself through this relation
-            rel = make_relation(subjvar, rtype.type, (objvar,), VariableRef)
+            rel = make_relation(subjvar, rtype, (objvar,), VariableRef)
             select.add_restriction(Not(rel))
         if self.has_eid():
             rel = make_relation(evar, 'eid', ('x', 'Substitute'), Constant)
@@ -998,11 +1083,23 @@
             cstrcls = RQLVocabularyConstraint
         else:
             cstrcls = RQLConstraint
+        lt_infos = pruned_lt_info(self.e_schema, lt_infos or {})
+        # if there are still lt_infos, use set to keep track of added eid
+        # relations (adding twice the same eid relation is incorrect RQL)
+        eidvars = set()
         for cstr in rdef.constraints:
             # consider constraint.mainvars to check if constraint apply
             if isinstance(cstr, cstrcls) and searchedvar.name in cstr.mainvars:
-                if not self.has_eid() and evar.name in cstr.mainvars:
-                    continue
+                if not self.has_eid():
+                    if lt_infos:
+                        # we can perhaps further restrict with linkto infos using
+                        # a custom constraint built from cstr and lt_infos
+                        cstr = build_cstr_with_linkto_infos(
+                            cstr, args, searchedvar, evar, lt_infos, eidvars)
+                        if cstr is None:
+                            continue # could not build constraint -> discard
+                    elif evar.name in cstr.mainvars:
+                        continue
                 # compute a varmap suitable to RQLRewriter.rewrite argument
                 varmap = dict((v, v) for v in (searchedvar.name, evar.name)
                               if v in select.defined_vars and v in cstr.mainvars)
@@ -1028,12 +1125,13 @@
         return rql, args
 
     def unrelated(self, rtype, targettype, role='subject', limit=None,
-                  ordermethod=None): # XXX .cw_unrelated
+                  ordermethod=None, lt_infos={}): # XXX .cw_unrelated
         """return a result set of target type objects that may be related
         by a given relation, with self as subject or object
         """
         try:
-            rql, args = self.cw_unrelated_rql(rtype, targettype, role, ordermethod)
+            rql, args = self.cw_unrelated_rql(rtype, targettype, role,
+                                              ordermethod, lt_infos=lt_infos)
         except Unauthorized:
             return self._cw.empty_rset()
         # XXX should be set in unrelated rql when manipulating the AST
--- a/i18n/fr.po	Thu Oct 20 14:22:16 2011 +0200
+++ b/i18n/fr.po	Thu Oct 20 16:03:51 2011 +0200
@@ -223,7 +223,7 @@
 "<div>This schema of the data model <em>excludes</em> the meta-data, but you "
 "can also display a <a href=\"%s\">complete schema with meta-data</a>.</div>"
 msgstr ""
-"<div>Ce schéma du modèle de données <em>exclue</em> les méta-données, mais "
+"<div>Ce schéma du modèle de données <em>exclut</em> les méta-données, mais "
 "vous pouvez afficher un <a href=\"%s\">schéma complet</a>.</div>"
 
 msgid "<no relation>"
--- a/schema.py	Thu Oct 20 14:22:16 2011 +0200
+++ b/schema.py	Thu Oct 20 16:03:51 2011 +0200
@@ -850,23 +850,39 @@
         return self._check(session, **kwargs)
 
 
+def vargraph(rqlst):
+    """ builds an adjacency graph of variables from the rql syntax tree, e.g:
+    Any O,S WHERE T subworkflow_exit S, T subworkflow WF, O state_of WF
+    => {'WF': ['O', 'T'], 'S': ['T'], 'T': ['WF', 'S'], 'O': ['WF']}
+    """
+    vargraph = {}
+    for relation in rqlst.get_nodes(nodes.Relation):
+        try:
+            rhsvarname = relation.children[1].children[0].variable.name
+            lhsvarname = relation.children[0].name
+        except AttributeError:
+            pass
+        else:
+            vargraph.setdefault(lhsvarname, []).append(rhsvarname)
+            vargraph.setdefault(rhsvarname, []).append(lhsvarname)
+            #vargraph[(lhsvarname, rhsvarname)] = relation.r_type
+    return vargraph
+
+
+class GeneratedConstraint(object):
+    def __init__(self, rqlst, mainvars):
+        self.snippet_rqlst = rqlst
+        self.mainvars = mainvars
+        self.vargraph = vargraph(rqlst)
+
+
 class RRQLExpression(RQLExpression):
     def __init__(self, expression, mainvars=None, eid=None):
         if mainvars is None:
             mainvars = guess_rrqlexpr_mainvars(expression)
         RQLExpression.__init__(self, expression, mainvars, eid)
         # graph of links between variable, used by rql rewriter
-        self.vargraph = {}
-        for relation in self.rqlst.get_nodes(nodes.Relation):
-            try:
-                rhsvarname = relation.children[1].children[0].variable.name
-                lhsvarname = relation.children[0].name
-            except AttributeError:
-                pass
-            else:
-                self.vargraph.setdefault(lhsvarname, []).append(rhsvarname)
-                self.vargraph.setdefault(rhsvarname, []).append(lhsvarname)
-                #self.vargraph[(lhsvarname, rhsvarname)] = relation.r_type
+        self.vargraph = vargraph(self.rqlst)
 
     @property
     def full_rql(self):
--- a/test/data/schema.py	Thu Oct 20 14:22:16 2011 +0200
+++ b/test/data/schema.py	Thu Oct 20 16:03:51 2011 +0200
@@ -37,13 +37,19 @@
             # unittest_entity.py
             RQLVocabularyConstraint('NOT (S connait P, P nom "toto")'),
             RQLVocabularyConstraint('S travaille P, P nom "tutu"')])
+    actionnaire = SubjectRelation('Societe', cardinality='??',
+                                  constraints=[RQLConstraint('NOT EXISTS(O contrat_exclusif S)')])
+    dirige = SubjectRelation('Societe', cardinality='??',
+                             constraints=[RQLConstraint('S actionnaire O')])
+    associe = SubjectRelation('Personne', cardinality='1*',
+                              constraints=[RQLConstraint('S actionnaire SOC, O actionnaire SOC')])
 
 
 class Societe(EntityType):
     nom = String()
     evaluee = SubjectRelation('Note')
     fournit = SubjectRelation(('Service', 'Produit'), cardinality='1*')
-
+    contrat_exclusif = SubjectRelation('Personne', cardinality='??')
 
 class Service(EntityType):
     fabrique_par = SubjectRelation('Personne', cardinality='1*')
--- a/test/unittest_entity.py	Thu Oct 20 14:22:16 2011 +0200
+++ b/test/unittest_entity.py	Thu Oct 20 16:03:51 2011 +0200
@@ -25,7 +25,7 @@
 from cubicweb.mttransforms import HAS_TAL
 from cubicweb.entities import fetch_config
 from cubicweb.uilib import soup2xhtml
-
+from cubicweb.schema import RQLVocabularyConstraint
 
 class EntityTC(CubicWebTC):
 
@@ -239,7 +239,7 @@
         self.assertEqual(n.cw_related_rql('evaluee', role='object',
                                           targettypes=('Societe', 'Personne')),
                          "Any X,AA ORDERBY AB DESC WHERE E eid %(x)s, X evaluee E, "
-                         "X is IN('Personne', 'Societe'), X nom AA, "
+                         "X is IN(Personne, Societe), X nom AA, "
                          "X modification_date AB")
         Personne.fetch_attrs, Personne.cw_fetch_order = fetch_config(('nom', ))
         # XXX
@@ -279,7 +279,7 @@
         user = self.request().user
         rql = user.cw_unrelated_rql('use_email', 'EmailAddress', 'subject')[0]
         self.assertEqual(rql, 'Any O,AA,AB,AC ORDERBY AC DESC '
-                         'WHERE NOT ZZ use_email O, S eid %(x)s, '
+                         'WHERE NOT A use_email O, S eid %(x)s, '
                          'O is EmailAddress, O address AA, O alias AB, O modification_date AC')
 
     def test_unrelated_rql_security_1_user(self):
@@ -289,23 +289,23 @@
         user = req.user
         rql = user.cw_unrelated_rql('use_email', 'EmailAddress', 'subject')[0]
         self.assertEqual(rql, 'Any O,AA,AB,AC ORDERBY AC DESC '
-                         'WHERE NOT ZZ use_email O, S eid %(x)s, '
+                         'WHERE NOT A use_email O, S eid %(x)s, '
                          'O is EmailAddress, O address AA, O alias AB, O modification_date AC')
         user = self.execute('Any X WHERE X login "admin"').get_entity(0, 0)
         rql = user.cw_unrelated_rql('use_email', 'EmailAddress', 'subject')[0]
         self.assertEqual(rql, 'Any O,AA,AB,AC ORDERBY AC DESC '
-                         'WHERE NOT ZZ use_email O, S eid %(x)s, '
+                         'WHERE NOT A use_email O, S eid %(x)s, '
                          'O is EmailAddress, O address AA, O alias AB, O modification_date AC, AD eid %(AE)s, '
-                         'EXISTS(S identity AD, NOT AD in_group AF, AF name "guests", AF is CWGroup), ZZ is CWUser')
+                         'EXISTS(S identity AD, NOT AD in_group AF, AF name "guests", AF is CWGroup), A is CWUser')
 
     def test_unrelated_rql_security_1_anon(self):
         self.login('anon')
         user = self.request().user
         rql = user.cw_unrelated_rql('use_email', 'EmailAddress', 'subject')[0]
         self.assertEqual(rql, 'Any O,AA,AB,AC ORDERBY AC DESC '
-                         'WHERE NOT ZZ use_email O, S eid %(x)s, '
+                         'WHERE NOT A use_email O, S eid %(x)s, '
                          'O is EmailAddress, O address AA, O alias AB, O modification_date AC, AD eid %(AE)s, '
-                         'EXISTS(S identity AD, NOT AD in_group AF, AF name "guests", AF is CWGroup), ZZ is CWUser')
+                         'EXISTS(S identity AD, NOT AD in_group AF, AF name "guests", AF is CWGroup), A is CWUser')
 
     def test_unrelated_rql_security_2(self):
         email = self.execute('INSERT EmailAddress X: X address "hop"').get_entity(0, 0)
@@ -365,6 +365,76 @@
             'NOT S identity O, NOT (S connait AD, AD nom "toto"), '
             'EXISTS(S travaille AE, AE nom "tutu")')
 
+    def test_unrelated_rql_s_linkto_s(self):
+        req = self.request()
+        person = self.vreg['etypes'].etype_class('Personne')(req)
+        self.vreg['etypes'].etype_class('Personne').fetch_attrs = ()
+        soc = req.create_entity('Societe', nom=u'logilab')
+        lt_infos = {('actionnaire', 'subject'): [soc.eid]}
+        rql, args = person.cw_unrelated_rql('associe', 'Personne', 'subject',
+                                            lt_infos=lt_infos)
+        self.assertEqual(u'Any O ORDERBY O WHERE O is Personne, '
+                         u'EXISTS(AA eid %(SOC)s, O actionnaire AA)', rql)
+        self.assertEqual({'SOC': soc.eid}, args)
+
+    def test_unrelated_rql_s_linkto_o(self):
+        req = self.request()
+        person = self.vreg['etypes'].etype_class('Personne')(req)
+        self.vreg['etypes'].etype_class('Societe').fetch_attrs = ()
+        soc = req.create_entity('Societe', nom=u'logilab')
+        lt_infos = {('contrat_exclusif', 'object'): [soc.eid]}
+        rql, args = person.cw_unrelated_rql('actionnaire', 'Societe', 'subject',
+                                            lt_infos=lt_infos)
+        self.assertEqual(u'Any O ORDERBY O WHERE NOT A actionnaire O, '
+                         u'O is Societe, NOT EXISTS(O eid %(O)s), '
+                         u'A is Personne', rql)
+        self.assertEqual({'O': soc.eid}, args)
+
+    def test_unrelated_rql_o_linkto_s(self):
+        req = self.request()
+        soc = self.vreg['etypes'].etype_class('Societe')(req)
+        self.vreg['etypes'].etype_class('Personne').fetch_attrs = ()
+        person = req.create_entity('Personne', nom=u'florent')
+        lt_infos = {('contrat_exclusif', 'subject'): [person.eid]}
+        rql, args = soc.cw_unrelated_rql('actionnaire', 'Personne', 'object',
+                                         lt_infos=lt_infos)
+        self.assertEqual(u'Any S ORDERBY S WHERE NOT S actionnaire A, '
+                         u'S is Personne, NOT EXISTS(S eid %(S)s), '
+                         u'A is Societe', rql)
+        self.assertEqual({'S': person.eid}, args)
+
+    def test_unrelated_rql_o_linkto_o(self):
+        req = self.request()
+        soc = self.vreg['etypes'].etype_class('Societe')(req)
+        self.vreg['etypes'].etype_class('Personne').fetch_attrs = ()
+        person = req.create_entity('Personne', nom=u'florent')
+        lt_infos = {('actionnaire', 'object'): [person.eid]}
+        rql, args = soc.cw_unrelated_rql('dirige', 'Personne', 'object',
+                                         lt_infos=lt_infos)
+        self.assertEqual(u'Any S ORDERBY S WHERE NOT S dirige A, '
+                         u'S is Personne, EXISTS(S eid %(S)s), '
+                         u'A is Societe', rql)
+        self.assertEqual({'S': person.eid}, args)
+
+    def test_unrelated_rql_s_linkto_s_no_info(self):
+        req = self.request()
+        person = self.vreg['etypes'].etype_class('Personne')(req)
+        self.vreg['etypes'].etype_class('Personne').fetch_attrs = ()
+        soc = req.create_entity('Societe', nom=u'logilab')
+        rql, args = person.cw_unrelated_rql('associe', 'Personne', 'subject')
+        self.assertEqual(u'Any O ORDERBY O WHERE O is Personne', rql)
+        self.assertEqual({}, args)
+
+    def test_unrelated_rql_s_linkto_s_unused_info(self):
+        req = self.request()
+        person = self.vreg['etypes'].etype_class('Personne')(req)
+        self.vreg['etypes'].etype_class('Personne').fetch_attrs = ()
+        other_p = req.create_entity('Personne', nom=u'titi')
+        lt_infos = {('dirige', 'subject'): [other_p.eid]}
+        rql, args = person.cw_unrelated_rql('associe', 'Personne', 'subject',
+                                            lt_infos=lt_infos)
+        self.assertEqual(u'Any O ORDERBY O WHERE O is Personne', rql)
+
     def test_unrelated_base(self):
         req = self.request()
         p = req.create_entity('Personne', nom=u'di mascio', prenom=u'adrien')
--- a/web/formfields.py	Thu Oct 20 14:22:16 2011 +0200
+++ b/web/formfields.py	Thu Oct 20 16:03:51 2011 +0200
@@ -1107,8 +1107,8 @@
             done = set()
         res = []
         entity = form.edited_entity
-        for entity in entity.unrelated(self.name, targettype,
-                                       self.role, limit).entities():
+        for entity in entity.unrelated(self.name, targettype, self.role, limit,
+                                       lt_infos=form.linked_to).entities():
             if entity.eid in done:
                 continue
             done.add(entity.eid)