# HG changeset patch # User Florent Cayré # Date 1319119431 -7200 # Node ID 64639bc94e25d661853580212aafe73ed3776e64 # Parent 3e51c2a577dd149219e508d29342477874dc2ba5 [entity] restrict creation form field vocabulary using __linkto information (closes #1799997) diff -r 3e51c2a577dd -r 64639bc94e25 entity.py --- a/entity.py Thu Oct 20 14:22:16 2011 +0200 +++ b/entity.py Thu Oct 20 16:03:51 2011 +0200 @@ -37,7 +37,8 @@ from cubicweb.selectors import yes from cubicweb.appobject import AppObject from cubicweb.req import _check_cw_unsafe -from cubicweb.schema import RQLVocabularyConstraint, RQLConstraint +from cubicweb.schema import (RQLVocabularyConstraint, RQLConstraint, + GeneratedConstraint) from cubicweb.rqlrewrite import RQLRewriter from cubicweb.uilib import soup2xhtml @@ -65,6 +66,85 @@ return False return True +def rel_vars(rel): + return ((isinstance(rel.children[0], VariableRef) + and rel.children[0].variable or None), + (isinstance(rel.children[1].children[0], VariableRef) + and rel.children[1].children[0].variable or None) + ) + +def rel_matches(rel, rtype, role, varname, operator='='): + if rel.r_type == rtype and rel.children[1].operator == operator: + same_role_var_idx = 0 if role == 'subject' else 1 + variables = rel_vars(rel) + if variables[same_role_var_idx].name == varname: + return variables[1 - same_role_var_idx] + +def build_cstr_with_linkto_infos(cstr, args, searchedvar, evar, + lt_infos, eidvars): + """restrict vocabulary as much as possible in entity creation, + based on infos provided by __linkto form param. + + Example based on following schema: + + class works_in(RelationDefinition): + subject = 'CWUser' + object = 'Lab' + cardinality = '1*' + constraints = [RQLConstraint('S in_group G, O welcomes G')] + + class welcomes(RelationDefinition): + subject = 'Lab' + object = 'CWGroup' + + If you create a CWUser in the "scientists" CWGroup you can show + only the labs that welcome them using : + + lt_infos = {('in_group', 'subject'): 321} + + You get following restriction : 'O welcomes G, G eid 321' + + """ + st = cstr.snippet_rqlst.copy() + # replace relations in ST by eid infos from linkto where possible + for (info_rtype, info_role), eids in lt_infos.iteritems(): + eid = eids[0] # NOTE: we currently assume a pruned lt_info with only 1 eid + for rel in st.iget_nodes(RqlRelation): + targetvar = rel_matches(rel, info_rtype, info_role, evar.name) + if targetvar is not None: + if targetvar.name in eidvars: + rel.parent.remove(rel) + else: + eidrel = make_relation( + targetvar, 'eid', (targetvar.name, 'Substitute'), + Constant) + rel.parent.replace(rel, eidrel) + args[targetvar.name] = eid + eidvars.add(targetvar.name) + # if modified ST still contains evar references we must discard the + # constraint, otherwise evar is unknown in the final rql query which can + # lead to a SQL table cartesian product and multiple occurences of solutions + evarname = evar.name + for rel in st.iget_nodes(RqlRelation): + for variable in rel_vars(rel): + if variable and evarname == variable.name: + return + # else insert snippets into the global tree + return GeneratedConstraint(st, cstr.mainvars - set(evarname)) + +def pruned_lt_info(eschema, lt_infos): + pruned = {} + for (lt_rtype, lt_role), eids in lt_infos.iteritems(): + # we can only use lt_infos describing relation with a cardinality + # of value 1 towards the linked entity + if not len(eids) == 1: + continue + lt_card = eschema.rdef(lt_rtype, lt_role).cardinality[ + 0 if lt_role == 'subject' else 1] + if lt_card not in '?1': + continue + pruned[(lt_rtype, lt_role)] = eids + return pruned class Entity(AppObject): """an entity instance has e_schema automagically set on @@ -931,17 +1011,22 @@ # generic vocabulary methods ############################################## def cw_unrelated_rql(self, rtype, targettype, role, ordermethod=None, - vocabconstraints=True): + vocabconstraints=True, lt_infos={}): """build a rql to fetch `targettype` entities unrelated to this entity using (rtype, role) relation. Consider relation permissions so that returned entities may be actually linked by `rtype`. + + `lt_infos` are supplementary informations, usually coming from __linkto + parameter, that can help further restricting the results in case current + entity is not yet created. It is a dict describing entities the current + entity will be linked to, which keys are (rtype, role) tuples and values + are a list of eids. """ ordermethod = ordermethod or 'fetch_unrelated_order' - if isinstance(rtype, basestring): - rtype = self._cw.vreg.schema.rschema(rtype) - rdef = rtype.role_rdef(self.e_schema, targettype, role) + rschema = self._cw.vreg.schema.rschema(rtype) + rdef = rschema.role_rdef(self.e_schema, targettype, role) rewriter = RQLRewriter(self._cw) select = Select() # initialize some variables according to the `role` of `self` in the @@ -955,20 +1040,20 @@ searchedvar = subjvar = select.get_variable('S') evar = objvar = select.get_variable('O') select.add_selected(searchedvar) - # initialize some variables according to `self` existance + # initialize some variables according to `self` existence if rdef.role_cardinality(neg_role(role)) in '?1': # if cardinality in '1?', we want a target entity which isn't # already linked using this relation - var = select.get_variable('ZZ') # XXX unname when tests pass + variable = select.make_variable() if role == 'subject': - rel = make_relation(var, rtype.type, (searchedvar,), VariableRef) + rel = make_relation(variable, rtype, (searchedvar,), VariableRef) else: - rel = make_relation(searchedvar, rtype.type, (var,), VariableRef) + rel = make_relation(searchedvar, rtype, (variable,), VariableRef) select.add_restriction(Not(rel)) elif self.has_eid(): # elif we have an eid, we don't want a target entity which is # already linked to ourself through this relation - rel = make_relation(subjvar, rtype.type, (objvar,), VariableRef) + rel = make_relation(subjvar, rtype, (objvar,), VariableRef) select.add_restriction(Not(rel)) if self.has_eid(): rel = make_relation(evar, 'eid', ('x', 'Substitute'), Constant) @@ -998,11 +1083,23 @@ cstrcls = RQLVocabularyConstraint else: cstrcls = RQLConstraint + lt_infos = pruned_lt_info(self.e_schema, lt_infos or {}) + # if there are still lt_infos, use set to keep track of added eid + # relations (adding twice the same eid relation is incorrect RQL) + eidvars = set() for cstr in rdef.constraints: # consider constraint.mainvars to check if constraint apply if isinstance(cstr, cstrcls) and searchedvar.name in cstr.mainvars: - if not self.has_eid() and evar.name in cstr.mainvars: - continue + if not self.has_eid(): + if lt_infos: + # we can perhaps further restrict with linkto infos using + # a custom constraint built from cstr and lt_infos + cstr = build_cstr_with_linkto_infos( + cstr, args, searchedvar, evar, lt_infos, eidvars) + if cstr is None: + continue # could not build constraint -> discard + elif evar.name in cstr.mainvars: + continue # compute a varmap suitable to RQLRewriter.rewrite argument varmap = dict((v, v) for v in (searchedvar.name, evar.name) if v in select.defined_vars and v in cstr.mainvars) @@ -1028,12 +1125,13 @@ return rql, args def unrelated(self, rtype, targettype, role='subject', limit=None, - ordermethod=None): # XXX .cw_unrelated + ordermethod=None, lt_infos={}): # XXX .cw_unrelated """return a result set of target type objects that may be related by a given relation, with self as subject or object """ try: - rql, args = self.cw_unrelated_rql(rtype, targettype, role, ordermethod) + rql, args = self.cw_unrelated_rql(rtype, targettype, role, + ordermethod, lt_infos=lt_infos) except Unauthorized: return self._cw.empty_rset() # XXX should be set in unrelated rql when manipulating the AST diff -r 3e51c2a577dd -r 64639bc94e25 i18n/fr.po --- a/i18n/fr.po Thu Oct 20 14:22:16 2011 +0200 +++ b/i18n/fr.po Thu Oct 20 16:03:51 2011 +0200 @@ -223,7 +223,7 @@ "
This schema of the data model excludes the meta-data, but you " "can also display a complete schema with meta-data.
" msgstr "" -"
Ce schéma du modèle de données exclue les méta-données, mais " +"
Ce schéma du modèle de données exclut les méta-données, mais " "vous pouvez afficher un schéma complet.
" msgid "" diff -r 3e51c2a577dd -r 64639bc94e25 schema.py --- a/schema.py Thu Oct 20 14:22:16 2011 +0200 +++ b/schema.py Thu Oct 20 16:03:51 2011 +0200 @@ -850,23 +850,39 @@ return self._check(session, **kwargs) +def vargraph(rqlst): + """ builds an adjacency graph of variables from the rql syntax tree, e.g: + Any O,S WHERE T subworkflow_exit S, T subworkflow WF, O state_of WF + => {'WF': ['O', 'T'], 'S': ['T'], 'T': ['WF', 'S'], 'O': ['WF']} + """ + vargraph = {} + for relation in rqlst.get_nodes(nodes.Relation): + try: + rhsvarname = relation.children[1].children[0].variable.name + lhsvarname = relation.children[0].name + except AttributeError: + pass + else: + vargraph.setdefault(lhsvarname, []).append(rhsvarname) + vargraph.setdefault(rhsvarname, []).append(lhsvarname) + #vargraph[(lhsvarname, rhsvarname)] = relation.r_type + return vargraph + + +class GeneratedConstraint(object): + def __init__(self, rqlst, mainvars): + self.snippet_rqlst = rqlst + self.mainvars = mainvars + self.vargraph = vargraph(rqlst) + + class RRQLExpression(RQLExpression): def __init__(self, expression, mainvars=None, eid=None): if mainvars is None: mainvars = guess_rrqlexpr_mainvars(expression) RQLExpression.__init__(self, expression, mainvars, eid) # graph of links between variable, used by rql rewriter - self.vargraph = {} - for relation in self.rqlst.get_nodes(nodes.Relation): - try: - rhsvarname = relation.children[1].children[0].variable.name - lhsvarname = relation.children[0].name - except AttributeError: - pass - else: - self.vargraph.setdefault(lhsvarname, []).append(rhsvarname) - self.vargraph.setdefault(rhsvarname, []).append(lhsvarname) - #self.vargraph[(lhsvarname, rhsvarname)] = relation.r_type + self.vargraph = vargraph(self.rqlst) @property def full_rql(self): diff -r 3e51c2a577dd -r 64639bc94e25 test/data/schema.py --- a/test/data/schema.py Thu Oct 20 14:22:16 2011 +0200 +++ b/test/data/schema.py Thu Oct 20 16:03:51 2011 +0200 @@ -37,13 +37,19 @@ # unittest_entity.py RQLVocabularyConstraint('NOT (S connait P, P nom "toto")'), RQLVocabularyConstraint('S travaille P, P nom "tutu"')]) + actionnaire = SubjectRelation('Societe', cardinality='??', + constraints=[RQLConstraint('NOT EXISTS(O contrat_exclusif S)')]) + dirige = SubjectRelation('Societe', cardinality='??', + constraints=[RQLConstraint('S actionnaire O')]) + associe = SubjectRelation('Personne', cardinality='1*', + constraints=[RQLConstraint('S actionnaire SOC, O actionnaire SOC')]) class Societe(EntityType): nom = String() evaluee = SubjectRelation('Note') fournit = SubjectRelation(('Service', 'Produit'), cardinality='1*') - + contrat_exclusif = SubjectRelation('Personne', cardinality='??') class Service(EntityType): fabrique_par = SubjectRelation('Personne', cardinality='1*') diff -r 3e51c2a577dd -r 64639bc94e25 test/unittest_entity.py --- a/test/unittest_entity.py Thu Oct 20 14:22:16 2011 +0200 +++ b/test/unittest_entity.py Thu Oct 20 16:03:51 2011 +0200 @@ -25,7 +25,7 @@ from cubicweb.mttransforms import HAS_TAL from cubicweb.entities import fetch_config from cubicweb.uilib import soup2xhtml - +from cubicweb.schema import RQLVocabularyConstraint class EntityTC(CubicWebTC): @@ -239,7 +239,7 @@ self.assertEqual(n.cw_related_rql('evaluee', role='object', targettypes=('Societe', 'Personne')), "Any X,AA ORDERBY AB DESC WHERE E eid %(x)s, X evaluee E, " - "X is IN('Personne', 'Societe'), X nom AA, " + "X is IN(Personne, Societe), X nom AA, " "X modification_date AB") Personne.fetch_attrs, Personne.cw_fetch_order = fetch_config(('nom', )) # XXX @@ -279,7 +279,7 @@ user = self.request().user rql = user.cw_unrelated_rql('use_email', 'EmailAddress', 'subject')[0] self.assertEqual(rql, 'Any O,AA,AB,AC ORDERBY AC DESC ' - 'WHERE NOT ZZ use_email O, S eid %(x)s, ' + 'WHERE NOT A use_email O, S eid %(x)s, ' 'O is EmailAddress, O address AA, O alias AB, O modification_date AC') def test_unrelated_rql_security_1_user(self): @@ -289,23 +289,23 @@ user = req.user rql = user.cw_unrelated_rql('use_email', 'EmailAddress', 'subject')[0] self.assertEqual(rql, 'Any O,AA,AB,AC ORDERBY AC DESC ' - 'WHERE NOT ZZ use_email O, S eid %(x)s, ' + 'WHERE NOT A use_email O, S eid %(x)s, ' 'O is EmailAddress, O address AA, O alias AB, O modification_date AC') user = self.execute('Any X WHERE X login "admin"').get_entity(0, 0) rql = user.cw_unrelated_rql('use_email', 'EmailAddress', 'subject')[0] self.assertEqual(rql, 'Any O,AA,AB,AC ORDERBY AC DESC ' - 'WHERE NOT ZZ use_email O, S eid %(x)s, ' + 'WHERE NOT A use_email O, S eid %(x)s, ' 'O is EmailAddress, O address AA, O alias AB, O modification_date AC, AD eid %(AE)s, ' - 'EXISTS(S identity AD, NOT AD in_group AF, AF name "guests", AF is CWGroup), ZZ is CWUser') + 'EXISTS(S identity AD, NOT AD in_group AF, AF name "guests", AF is CWGroup), A is CWUser') def test_unrelated_rql_security_1_anon(self): self.login('anon') user = self.request().user rql = user.cw_unrelated_rql('use_email', 'EmailAddress', 'subject')[0] self.assertEqual(rql, 'Any O,AA,AB,AC ORDERBY AC DESC ' - 'WHERE NOT ZZ use_email O, S eid %(x)s, ' + 'WHERE NOT A use_email O, S eid %(x)s, ' 'O is EmailAddress, O address AA, O alias AB, O modification_date AC, AD eid %(AE)s, ' - 'EXISTS(S identity AD, NOT AD in_group AF, AF name "guests", AF is CWGroup), ZZ is CWUser') + 'EXISTS(S identity AD, NOT AD in_group AF, AF name "guests", AF is CWGroup), A is CWUser') def test_unrelated_rql_security_2(self): email = self.execute('INSERT EmailAddress X: X address "hop"').get_entity(0, 0) @@ -365,6 +365,76 @@ 'NOT S identity O, NOT (S connait AD, AD nom "toto"), ' 'EXISTS(S travaille AE, AE nom "tutu")') + def test_unrelated_rql_s_linkto_s(self): + req = self.request() + person = self.vreg['etypes'].etype_class('Personne')(req) + self.vreg['etypes'].etype_class('Personne').fetch_attrs = () + soc = req.create_entity('Societe', nom=u'logilab') + lt_infos = {('actionnaire', 'subject'): [soc.eid]} + rql, args = person.cw_unrelated_rql('associe', 'Personne', 'subject', + lt_infos=lt_infos) + self.assertEqual(u'Any O ORDERBY O WHERE O is Personne, ' + u'EXISTS(AA eid %(SOC)s, O actionnaire AA)', rql) + self.assertEqual({'SOC': soc.eid}, args) + + def test_unrelated_rql_s_linkto_o(self): + req = self.request() + person = self.vreg['etypes'].etype_class('Personne')(req) + self.vreg['etypes'].etype_class('Societe').fetch_attrs = () + soc = req.create_entity('Societe', nom=u'logilab') + lt_infos = {('contrat_exclusif', 'object'): [soc.eid]} + rql, args = person.cw_unrelated_rql('actionnaire', 'Societe', 'subject', + lt_infos=lt_infos) + self.assertEqual(u'Any O ORDERBY O WHERE NOT A actionnaire O, ' + u'O is Societe, NOT EXISTS(O eid %(O)s), ' + u'A is Personne', rql) + self.assertEqual({'O': soc.eid}, args) + + def test_unrelated_rql_o_linkto_s(self): + req = self.request() + soc = self.vreg['etypes'].etype_class('Societe')(req) + self.vreg['etypes'].etype_class('Personne').fetch_attrs = () + person = req.create_entity('Personne', nom=u'florent') + lt_infos = {('contrat_exclusif', 'subject'): [person.eid]} + rql, args = soc.cw_unrelated_rql('actionnaire', 'Personne', 'object', + lt_infos=lt_infos) + self.assertEqual(u'Any S ORDERBY S WHERE NOT S actionnaire A, ' + u'S is Personne, NOT EXISTS(S eid %(S)s), ' + u'A is Societe', rql) + self.assertEqual({'S': person.eid}, args) + + def test_unrelated_rql_o_linkto_o(self): + req = self.request() + soc = self.vreg['etypes'].etype_class('Societe')(req) + self.vreg['etypes'].etype_class('Personne').fetch_attrs = () + person = req.create_entity('Personne', nom=u'florent') + lt_infos = {('actionnaire', 'object'): [person.eid]} + rql, args = soc.cw_unrelated_rql('dirige', 'Personne', 'object', + lt_infos=lt_infos) + self.assertEqual(u'Any S ORDERBY S WHERE NOT S dirige A, ' + u'S is Personne, EXISTS(S eid %(S)s), ' + u'A is Societe', rql) + self.assertEqual({'S': person.eid}, args) + + def test_unrelated_rql_s_linkto_s_no_info(self): + req = self.request() + person = self.vreg['etypes'].etype_class('Personne')(req) + self.vreg['etypes'].etype_class('Personne').fetch_attrs = () + soc = req.create_entity('Societe', nom=u'logilab') + rql, args = person.cw_unrelated_rql('associe', 'Personne', 'subject') + self.assertEqual(u'Any O ORDERBY O WHERE O is Personne', rql) + self.assertEqual({}, args) + + def test_unrelated_rql_s_linkto_s_unused_info(self): + req = self.request() + person = self.vreg['etypes'].etype_class('Personne')(req) + self.vreg['etypes'].etype_class('Personne').fetch_attrs = () + other_p = req.create_entity('Personne', nom=u'titi') + lt_infos = {('dirige', 'subject'): [other_p.eid]} + rql, args = person.cw_unrelated_rql('associe', 'Personne', 'subject', + lt_infos=lt_infos) + self.assertEqual(u'Any O ORDERBY O WHERE O is Personne', rql) + def test_unrelated_base(self): req = self.request() p = req.create_entity('Personne', nom=u'di mascio', prenom=u'adrien') diff -r 3e51c2a577dd -r 64639bc94e25 web/formfields.py --- a/web/formfields.py Thu Oct 20 14:22:16 2011 +0200 +++ b/web/formfields.py Thu Oct 20 16:03:51 2011 +0200 @@ -1107,8 +1107,8 @@ done = set() res = [] entity = form.edited_entity - for entity in entity.unrelated(self.name, targettype, - self.role, limit).entities(): + for entity in entity.unrelated(self.name, targettype, self.role, limit, + lt_infos=form.linked_to).entities(): if entity.eid in done: continue done.add(entity.eid)