--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/cubicweb/devtools/fill.py Sat Jan 16 13:48:51 2016 +0100
@@ -0,0 +1,557 @@
+# -*- coding: iso-8859-1 -*-
+# copyright 2003-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
+#
+# This file is part of CubicWeb.
+#
+# CubicWeb is free software: you can redistribute it and/or modify it under the
+# terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation, either version 2.1 of the License, or (at your option)
+# any later version.
+#
+# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Lesser General Public License along
+# with CubicWeb. If not, see <http://www.gnu.org/licenses/>.
+"""This module defines functions / methods for creating test repositories"""
+from __future__ import print_function
+
+__docformat__ = "restructuredtext en"
+
+import logging
+from random import randint, choice
+from copy import deepcopy
+from datetime import datetime, date, time, timedelta
+from decimal import Decimal
+import inspect
+
+from six import text_type, add_metaclass
+from six.moves import range
+
+from logilab.common import attrdict
+from logilab.mtconverter import xml_escape
+from yams.constraints import (SizeConstraint, StaticVocabularyConstraint,
+ IntervalBoundConstraint, BoundaryConstraint,
+ Attribute, actual_value)
+from rql.utils import decompose_b26 as base_decompose_b26
+
+from cubicweb import Binary
+from cubicweb.schema import RQLConstraint
+
+def custom_range(start, stop, step):
+    """Yield `start`, `start + step`, ... for as long as the current value
+    is strictly lower than `stop`.
+
+    Only `<` and `+=` are applied to the values (this module calls it with
+    numbers, dates and timedeltas).
+    """
+    current = start
+    while True:
+        if not current < stop:
+            return
+        yield current
+        current += step
+
+def decompose_b26(index, ascii=False):
+    """Return the letter decomposition of `index`.
+
+    With `ascii` set, the default alphabet of `rql.utils.decompose_b26` is
+    used; otherwise an explicit alphabet starting with a non-ASCII letter is
+    handed to it.
+    """
+    if not ascii:
+        return base_decompose_b26(index, u'éabcdefghijklmnopqrstuvwxyz')
+    return base_decompose_b26(index)
+
+def get_max_length(eschema, attrname):
+    """Return the maximum length allowed for `attrname`.
+
+    This is the `max` of the first SizeConstraint having a truthy one among
+    the attribute's constraints, or 300 when no constraint qualifies.
+    """
+    constraints = eschema.rdef(attrname).constraints
+    sizes = (cst.max for cst in constraints
+             if isinstance(cst, SizeConstraint) and cst.max)
+    return next(sizes, 300)
+
+# Module-level registry of the values already produced for attributes having
+# unique values: maps (eschema, attrname) to the set of generated values.
+_GENERATED_VALUES = {}
+
+class _ValueGenerator(object):
+    """Generate integers / dates / strings / etc. to fill a DB table.
+
+    Values are produced by `generate_<type>` methods, looked up from the
+    lowercased name of the attribute's type. A `generate_<etype>_<attrname>`
+    or `generate_Any_<attrname>` method, when defined, takes precedence.
+    """
+
+    def __init__(self, eschema, choice_func=None):
+        """<choice_func> is a function that returns a list of possible
+        choices for a given entity type and an attribute name. It should
+        look like:
+
+            def values_for(etype, attrname):
+                # some stuff ...
+                return alist_of_acceptable_values # or None
+        """
+        self.choice_func = choice_func
+        self.eschema = eschema
+
+    def generate_attribute_value(self, entity, attrname, index=1, **kwargs):
+        """Return the value of `attrname` for `entity`, generating it if needed.
+
+        A value already present in `entity` is returned untouched. Otherwise a
+        value is generated, stored in `entity` and returned. For attributes
+        with unique values, `index` is incremented until the generated value
+        has not been produced yet for this (entity schema, attribute) pair.
+        """
+        if attrname in entity:
+            return entity[attrname]
+        eschema = self.eschema
+        unique = eschema.has_unique_values(attrname)
+        value = self.__generate_value(entity, attrname, index, **kwargs)
+        if unique:
+            generated = _GENERATED_VALUES.setdefault((eschema, attrname), set())
+            while value in generated:
+                index += 1
+                value = self.__generate_value(entity, attrname, index, **kwargs)
+            generated.add(value)
+        entity[attrname] = value
+        return value
+
+    def __generate_value(self, entity, attrname, index, **kwargs):
+        """Generate a consistent value for `attrname`.
+
+        Tried in order: a random pick among the values returned by
+        `choice_func`, an attribute specific generator and finally the
+        generator of the attribute's type.
+        """
+        eschema = self.eschema
+        attrtype = str(eschema.destination(attrname)).lower()
+        # Before calling generate_%s functions, try to find values domain
+        if self.choice_func is not None:
+            values_domain = self.choice_func(eschema, attrname)
+            if values_domain is not None:
+                return choice(values_domain)
+        gen_func = getattr(self, 'generate_%s_%s' % (eschema, attrname),
+                           getattr(self, 'generate_Any_%s' % attrname, None))
+        if gen_func is not None:
+            # attribute specific generators don't get the attribute name
+            return gen_func(entity, index, **kwargs)
+        # If no specific values domain, then generate a dummy value
+        gen_func = getattr(self, 'generate_%s' % (attrtype))
+        return gen_func(entity, attrname, index, **kwargs)
+
+    def generate_string(self, entity, attrname, index, format=None):
+        """Generate a consistent value for `attrname` if it's a string.
+
+        `format` selects the markup put around the generated text:
+        'text/html', 'text/rest' or plain text for anything else. The result
+        is truncated to the maximum length allowed for the attribute.
+        """
+        # First try to get choices
+        choosed = self.get_choice(entity, attrname)
+        if choosed is not None:
+            return choosed
+        # All other case, generate a default string
+        attrlength = get_max_length(self.eschema, attrname)
+        num_len = numlen(index)
+        if num_len >= attrlength:
+            # the index digits alone would fill the attribute: use a letter
+            # decomposition of the index instead
+            ascii = self.eschema.rdef(attrname).internationalizable
+            return ('&'+decompose_b26(index, ascii))[:attrlength]
+        # always use plain text when no format is specified
+        attrprefix = attrname[:max(attrlength-num_len-1, 0)]
+        if format == 'text/html':
+            value = u'<span>é%s<b>%d</b></span>' % (attrprefix, index)
+        elif format == 'text/rest':
+            value = u"""
+title
+-----
+
+* %s
+* %d
+* é&
+""" % (attrprefix, index)
+        else:
+            value = u'é&%s%d' % (attrprefix, index)
+        return value[:attrlength]
+
+    def generate_password(self, entity, attrname, index):
+        """Generate a value for `attrname` if it's a password: always u'toto'."""
+        return u'toto'
+
+    def generate_integer(self, entity, attrname, index):
+        """Generate a consistent value for `attrname` if it's an integer."""
+        return self._constrained_generate(entity, attrname, 0, 1, index)
+    generate_int = generate_bigint = generate_integer
+
+    def generate_float(self, entity, attrname, index):
+        """Generate a consistent value for `attrname` if it's a float."""
+        return self._constrained_generate(entity, attrname, 0.0, 1.0, index)
+
+    def generate_decimal(self, entity, attrname, index):
+        """Generate a consistent value for `attrname` if it's a decimal.
+
+        The value is the Decimal built from the text of a generated float.
+        """
+        return Decimal(str(self.generate_float(entity, attrname, index)))
+
+    def generate_datetime(self, entity, attrname, index):
+        """Generate a datetime from a random day between 2000 and 2004, at
+        11:MM where MM is `index` modulo 60; _constrained_generate() then
+        picks the value with a one hour step.
+        """
+        base = datetime(randint(2000, 2004), randint(1, 12), randint(1, 28), 11, index%60)
+        return self._constrained_generate(entity, attrname, base, timedelta(hours=1), index)
+
+    generate_tzdatetime = generate_datetime # XXX implementation should add a timezone
+
+    def generate_date(self, entity, attrname, index):
+        """Generate a date from a random day in the years 2000-2010;
+        _constrained_generate() then picks the value with a one day step.
+        """
+        base = date(randint(2000, 2010), 1, 1) + timedelta(randint(1, 365))
+        return self._constrained_generate(entity, attrname, base, timedelta(days=1), index)
+
+    def generate_interval(self, entity, attrname, index):
+        """Generate an interval (timedelta) from a random number of days
+        between 1 and 365; _constrained_generate() then picks the value with
+        a one day step.
+        """
+        base = timedelta(randint(1, 365))
+        return self._constrained_generate(entity, attrname, base, timedelta(days=1), index)
+
+    def generate_time(self, entity, attrname, index):
+        """Generate the time 11:MM where MM is `index` modulo 60."""
+        return time(11, index%60)
+
+    generate_tztime = generate_time # XXX implementation should add a timezone
+
+    def generate_bytes(self, entity, attrname, index, format=None):
+        """Generate a Binary holding the ascii encoding of `attrname`
+        followed by `index`, with a `filename` derived from `attrname`.
+        """
+        fakefile = Binary(("%s%s" % (attrname, index)).encode('ascii'))
+        fakefile.filename = u"file_%s" % attrname
+        return fakefile
+
+    def generate_boolean(self, entity, attrname, index):
+        """Generate a value for `attrname` if it's a boolean: True for an
+        even `index`, False otherwise.
+        """
+        return index % 2 == 0
+
+    def _constrained_generate(self, entity, attrname, base, step, index):
+        """Pick a value for `attrname` honouring its vocabulary / bounds.
+
+        A static vocabulary, if any, wins. Otherwise the value is chosen at
+        random among `minvalue`, `minvalue + step`, ... strictly below
+        `maxvalue`, where missing bounds are derived from `base`, `step` and
+        `index`. If that range is empty, `minvalue` is returned.
+        """
+        choosed = self.get_choice(entity, attrname)
+        if choosed is not None:
+            return choosed
+        # ensure index > 0
+        index += 1
+        minvalue, maxvalue = self.get_bounds(entity, attrname)
+        if maxvalue is None:
+            if minvalue is not None:
+                base = max(minvalue, base)
+            maxvalue = base + index * step
+        if minvalue is None:
+            minvalue = maxvalue - (index * step) # i.e. randint(-index, 0)
+        candidates = list(custom_range(minvalue, maxvalue, step))
+        if not candidates:
+            # the bounds leave no room (minvalue >= maxvalue): fall back on the
+            # lower bound instead of letting choice() fail on an empty sequence
+            return minvalue
+        return choice(candidates)
+
+    def _actual_boundary(self, entity, attrname, boundary):
+        """Return the concrete value of `boundary`.
+
+        A boundary which is an Attribute is resolved against `entity`, after
+        generating a value for the attribute it refers to if `entity` has
+        none yet. Any other boundary is returned as is.
+        """
+        if isinstance(boundary, Attribute):
+            # ensure we've a value for this attribute
+            entity[attrname] = None # infinite loop safety belt
+            if boundary.attr not in entity:
+                self.generate_attribute_value(entity, boundary.attr)
+            boundary = actual_value(boundary, entity)
+        return boundary
+
+    def get_bounds(self, entity, attrname):
+        """Return the (minvalue, maxvalue) couple coming from the interval
+        and boundary constraints of `attrname`; a missing bound is None.
+        """
+        minvalue = maxvalue = None
+        for cst in self.eschema.rdef(attrname).constraints:
+            if isinstance(cst, IntervalBoundConstraint):
+                minvalue = self._actual_boundary(entity, attrname, cst.minvalue)
+                maxvalue = self._actual_boundary(entity, attrname, cst.maxvalue)
+            elif isinstance(cst, BoundaryConstraint):
+                # operators starting with '<' give an upper bound, any other
+                # operator a lower bound
+                if cst.operator[0] == '<':
+                    maxvalue = self._actual_boundary(entity, attrname, cst.boundary)
+                else:
+                    minvalue = self._actual_boundary(entity, attrname, cst.boundary)
+        return minvalue, maxvalue
+
+    def get_choice(self, entity, attrname):
+        """Return a random value (converted to text) taken from the static
+        vocabulary of `attrname` if it has one, else None.
+        """
+        for cst in self.eschema.rdef(attrname).constraints:
+            if isinstance(cst, StaticVocabularyConstraint):
+                return text_type(choice(cst.vocabulary()))
+        return None
+
+    # XXX nothing to do here
+    def generate_Any_data_format(self, entity, index, **kwargs):
+        # data_format attribute of File has no vocabulary constraint, we
+        # need this method else stupid values will be set which make mtconverter
+        # raise exception
+        return u'application/octet-stream'
+
+    def generate_Any_content_format(self, entity, index, **kwargs):
+        # content_format attribute of EmailPart has no vocabulary constraint, we
+        # need this method else stupid values will be set which make mtconverter
+        # raise exception
+        return u'text/plain'
+
+    def generate_CWDataImport_log(self, entity, index, **kwargs):
+        # build a single log line made of tab separated fields (severity, url,
+        # a number and an xml-escaped message) ending with '<br/>'
+        logs = [u'%s\t%s\t%s\t%s<br/>' % (logging.ERROR, 'http://url.com?arg1=hop&arg2=hip',
+                                          1, xml_escape('hjoio&oio"'))]
+        return u'<br/>'.join(logs)
+
+
+class autoextend(type):
+    """Metaclass copying the callables defined in a class body onto
+    `_ValueGenerator`.
+
+    :raise TypeError: if a callable whose name starts with `generate_`
+      declares less than two positional arguments
+    """
+
+    def __new__(mcs, name, bases, classdict):
+        # inspect.getargspec() was removed in Python 3.11: use
+        # getfullargspec() where it exists and only fall back on getargspec()
+        # for interpreters lacking it (python 2)
+        getspec = getattr(inspect, 'getfullargspec', None) or inspect.getargspec
+        for attrname, attrvalue in classdict.items():
+            if not callable(attrvalue):
+                continue
+            if attrname.startswith('generate_') and \
+               len(getspec(attrvalue).args) < 2:
+                raise TypeError('generate_xxx must accept at least 1 argument')
+            setattr(_ValueGenerator, attrname, attrvalue)
+        return type.__new__(mcs, name, bases, classdict)
+
+
+@add_metaclass(autoextend)
+class ValueGenerator(_ValueGenerator):
+    """Value generator built with the `autoextend` metaclass; it defines
+    nothing by itself.
+    """
+
+
+def _default_choice_func(etype, attrname):
+    """Default `choice_func`: proposes no value domain, whatever the entity
+    type and the attribute.
+    """
+    return None
+
+def insert_entity_queries(etype, schema, vreg, entity_num,
+                          choice_func=_default_choice_func):
+    """Return a list of 'add entity' queries, as (query, args) couples.
+
+    :type etype: str
+    :param etype: the entity's type
+
+    :type schema: cubicweb.schema.Schema
+    :param schema: the instance schema
+
+    :param vreg: handed over untouched to make_entity()
+
+    :type entity_num: int
+    :param entity_num: the number of entities to insert
+
+    XXX FIXME: choice_func is here for *historical* reasons, it should
+    probably be replaced by a nicer way to specify choices
+    :type choice_func: function
+    :param choice_func: a function that takes an entity type, an attrname and
+      returns acceptable values for this attribute
+    """
+    queries = []
+    for index in range(entity_num):
+        items = list(make_entity(etype, schema, vreg, index, choice_func).items())
+        if not items:
+            # no attribute at all: plain insertion
+            queries.append(('INSERT %s X' % etype, {}))
+            continue
+        args = dict(items)
+        restrictions = ', '.join('X %s %%(%s)s' % (attrname, attrname)
+                                 for attrname, _ in items)
+        queries.append(('INSERT %s X: %s' % (etype, restrictions), args))
+        assert 'eid' not in args, args
+    return queries
+
+
+def make_entity(etype, schema, vreg, index=0, choice_func=_default_choice_func,
+                form=False):
+    """Generate a random entity and return it as a dict.
+
+    Attribute values are generated by a ValueGenerator and stored in the
+    returned dict; `eid` is never specified. For an attribute coming with an
+    `<attr>_format` companion, the format is generated first and given to the
+    generator of the attribute itself.
+
+    When `form` is true, each value is additionally encoded as text (dates
+    and floats using the ui formats read from `vreg`).
+    NOTE(review): the encoded value is only bound to a local variable and is
+    not written back into the returned dict -- confirm whether this is wanted.
+    """
+    eschema = schema.eschema(etype)
+    valgen = ValueGenerator(eschema, choice_func)
+    entity = attrdict()
+    # preprocessing: set `<attr>_format` attributes apart from the others
+    plain_attrs = []
+    with_format = {}
+    for rschema, attrschema in eschema.attribute_definitions():
+        attrname = rschema.type
+        if attrname == 'eid':
+            continue  # don't specify eids !
+        if attrname.endswith('_format') and attrname[:-7] in eschema.subject_relations():
+            with_format[attrname[:-7]] = attrschema
+        else:
+            plain_attrs.append((attrname, attrschema))
+    # ui property holding the strftime format of each temporal type
+    strftime_props = {'Date': 'ui.date-format',
+                      'Datetime': 'ui.datetime-format',
+                      'Time': 'ui.time-format'}
+    for attrname, attrschema in plain_attrs:
+        extra = {}
+        if attrname in with_format:
+            # first generate a format and record it, then ask for a value
+            # coherent with this format
+            extra['format'] = valgen.generate_attribute_value(
+                entity, attrname + '_format', index)
+        value = valgen.generate_attribute_value(entity, attrname, index, **extra)
+        if not form:
+            continue
+        # need to encode values
+        attrtype = attrschema.type
+        if attrtype == 'Bytes':
+            # twisted way
+            value = (value.filename, u"text/plain", value)
+        elif attrtype in strftime_props:
+            value = value.strftime(vreg.property_value(strftime_props[attrtype]))
+        elif attrtype == 'Float':
+            value = vreg.property_value('ui.float-format') % value
+        else:
+            value = text_type(value)
+    return entity
+
+
+
+def select(constraints, cnx, selectvar='O', objtype=None):
+    """Return the set of eids matching <constraints>.
+
+    <selectvar> should be either 'O' or 'S' to match schema definitions.
+    When <objtype> is given, the selected variable is also restricted to
+    this entity type.
+
+    This is a best-effort helper: if the query can't be built or executed, a
+    message is printed and an empty set is returned.
+    """
+    try:
+        rql = 'Any %s WHERE %s' % (selectvar, constraints)
+        if objtype:
+            rql += ', %s is %s' % (selectvar, objtype)
+        rset = cnx.execute(rql)
+    except Exception:
+        # deliberately broad: whatever goes wrong, behave as if nothing matched
+        print("could not restrict eid_list with given constraints (%r)" % constraints)
+        # same type as the successful path, so callers can treat both alike
+        return set()
+    return set(eid for eid, in rset.rows)
+
+
+
+def make_relations_queries(schema, edict, cnx, ignored_relations=(),
+                           existingrels=None):
+    """Build the generated RQL queries for relations.
+
+    :param schema: the instance schema
+
+    :param edict: mapping between etypes and eids
+
+    :param cnx: given as is to the RelationsQueriesGenerator doing the work
+
+    :param ignored_relations: list of relations to ignore (i.e. don't try
+      to generate insert queries for these relations)
+
+    :param existingrels: given as is to the RelationsQueriesGenerator
+
+    :return: the result of RelationsQueriesGenerator.compute_queries()
+    """
+    generator = RelationsQueriesGenerator(schema, cnx, existingrels)
+    queries = generator.compute_queries(edict, ignored_relations)
+    return queries
+
+def composite_relation(rschema):
+    """Tell whether `rschema` is composite for at least one entity type.
+
+    True if the first 'object' relation definition of one of its object
+    types has a 'subject' composite, or if the first 'subject' relation
+    definition of one of its subject types has an 'object' composite.
+    """
+    if any(eschema.rdef(rschema, 'object', takefirst=True).composite == 'subject'
+           for eschema in rschema.objects()):
+        return True
+    return any(eschema.rdef(rschema, 'subject', takefirst=True).composite == 'object'
+               for eschema in rschema.subjects())
+
+class RelationsQueriesGenerator(object):
+    """Generate the (query, args) couples inserting relations between
+    existing entities.
+
+    `existingrels` maps a relation type to the set of (subject eid,
+    object eid) couples already related; it is updated as queries are
+    generated.
+    """
+    rql_tmpl = 'SET S %s O WHERE S eid %%(subjeid)s, O eid %%(objeid)s'
+
+    def __init__(self, schema, cnx, existing=None):
+        self.schema = schema
+        self.cnx = cnx
+        # only replace a missing mapping: an empty one given by the caller
+        # (e.g. a fresh defaultdict) must be kept so it gets filled
+        self.existingrels = {} if existing is None else existing
+
+    def compute_queries(self, edict, ignored_relations):
+        """Yield (query, args) couples for every non final relation of the
+        schema which is not in `ignored_relations`.
+
+        `edict` maps entity types to eids; it is deep-copied for each
+        relation, so it is left untouched.
+        """
+        # 1/ skip final relations and explictly ignored relations
+        rels = sorted([rschema for rschema in self.schema.relations()
+                       if not (rschema.final or rschema in ignored_relations)],
+                      key=lambda x: not composite_relation(x))
+        # for each relation
+        # 2/ take each possible couple (subj, obj)
+        # 3/ analyze cardinality of relation
+        #    a/ if relation is mandatory, insert one relation
+        #    b/ else insert N relations where N is the mininum
+        #       of 20 and the number of existing targetable entities
+        for rschema in rels:
+            sym = set()
+            sedict = deepcopy(edict)
+            oedict = deepcopy(edict)
+            delayed = []
+            # for each couple (subjschema, objschema), insert relations
+            for subj, obj in rschema.rdefs:
+                sym.add((subj, obj))
+                if rschema.symmetric and (obj, subj) in sym:
+                    continue
+                subjcard, objcard = rschema.rdef(subj, obj).cardinality
+                # process mandatory relations first
+                if subjcard in '1+' or objcard in '1+' or composite_relation(rschema):
+                    for query, args in self.make_relation_queries(sedict, oedict,
+                                                                  rschema, subj, obj):
+                        yield query, args
+                else:
+                    delayed.append((subj, obj))
+            for subj, obj in delayed:
+                for query, args in self.make_relation_queries(sedict, oedict, rschema,
+                                                              subj, obj):
+                    yield query, args
+
+    def qargs(self, subjeids, objeids, subjcard, objcard, subjeid, objeid):
+        """Return the query arguments relating `subjeid` to `objeid`.
+
+        As a side effect, the subject (resp. object) eid is removed from
+        `subjeids` (resp. `objeids`) when the matching cardinality is one of
+        '?', '1' or '+'.
+        """
+        if subjcard in '?1+':
+            subjeids.remove(subjeid)
+        if objcard in '?1+':
+            objeids.remove(objeid)
+        return {'subjeid': subjeid, 'objeid': objeid}
+
+    def make_relation_queries(self, sedict, oedict, rschema, subj, obj):
+        """Yield (query, args) couples for the `subj rschema obj` relation
+        definition.
+
+        `sedict` / `oedict` map entity types to the eids still usable as
+        subject / object. Couples recorded for this relation type in
+        `self.existingrels` are never generated again, and each generated
+        couple is recorded there.
+
+        :raise Exception: through check_card_satisfied(), when a mandatory
+          cardinality can't be satisfied
+        """
+        rdef = rschema.rdef(subj, obj)
+        subjcard, objcard = rdef.cardinality
+        subjeids = sedict.get(subj, frozenset())
+        # a relation type nobody recorded anything for has no existing
+        # relation yet: start from an empty set instead of raising KeyError
+        used = self.existingrels.setdefault(rschema.type, set())
+        preexisting_subjrels = set(subj for subj, obj in used)
+        preexisting_objrels = set(obj for subj, obj in used)
+        # if there are constraints, only select appropriate objeids
+        q = self.rql_tmpl % rschema.type
+        constraints = [c for c in rdef.constraints
+                       if isinstance(c, RQLConstraint)]
+        if constraints:
+            restrictions = ', '.join(c.expression for c in constraints)
+            q += ', %s' % restrictions
+            # restrict object eids if possible
+            # XXX the original author flagged this restriction attempt as
+            # completely wrong; it is nevertheless still active
+            objeids = select(restrictions, self.cnx, objtype=obj)
+        else:
+            objeids = oedict.get(obj, frozenset())
+        if subjcard in '?1' or objcard in '?1':
+            # entities already related can't take part in a new relation on
+            # the side whose cardinality is at most one
+            for subjeid, objeid in used:
+                if subjcard in '?1' and subjeid in subjeids:
+                    subjeids.remove(subjeid)
+                if objcard in '?1' and objeid in objeids:
+                    objeids.remove(objeid)
+        if not subjeids:
+            check_card_satisfied(objcard, objeids, subj, rschema, obj)
+            return
+        if not objeids:
+            check_card_satisfied(subjcard, subjeids, subj, rschema, obj)
+            return
+        if subjcard in '?1+':
+            for subjeid in tuple(subjeids):
+                # do not insert relation if this entity already has a relation
+                if subjeid in preexisting_subjrels:
+                    continue
+                objeid = choose_eid(objeids, subjeid)
+                if objeid is None or (subjeid, objeid) in used:
+                    continue
+                yield q, self.qargs(subjeids, objeids, subjcard, objcard,
+                                    subjeid, objeid)
+                used.add((subjeid, objeid))
+                if not objeids:
+                    check_card_satisfied(subjcard, subjeids, subj, rschema, obj)
+                    break
+        elif objcard in '?1+':
+            for objeid in tuple(objeids):
+                # do not insert relation if this entity already has a relation
+                if objeid in preexisting_objrels:
+                    continue
+                subjeid = choose_eid(subjeids, objeid)
+                if subjeid is None or (subjeid, objeid) in used:
+                    continue
+                yield q, self.qargs(subjeids, objeids, subjcard, objcard,
+                                    subjeid, objeid)
+                used.add((subjeid, objeid))
+                if not subjeids:
+                    check_card_satisfied(objcard, objeids, subj, rschema, obj)
+                    break
+        else:
+            # FIXME: 20 should be read from config
+            subjpool = tuple(subjeids)
+            objpool = tuple(objeids)
+            subjeidsiter = [choice(subjpool) for i in range(min(len(subjpool), 20))]
+            objeidsiter = [choice(objpool) for i in range(min(len(objpool), 20))]
+            for subjeid, objeid in zip(subjeidsiter, objeidsiter):
+                if subjeid != objeid and (subjeid, objeid) not in used:
+                    used.add((subjeid, objeid))
+                    yield q, self.qargs(subjeids, objeids, subjcard, objcard,
+                                        subjeid, objeid)
+
+def check_card_satisfied(card, remaining, subj, rschema, obj):
+    """Raise an Exception if entities are left in `remaining` while `card`
+    is a mandatory cardinality ('1' or '+'); do nothing otherwise.
+    """
+    if card not in '1+' or not remaining:
+        return
+    message = "can't satisfy cardinality %s for relation %s %s %s" % (
+        card, subj, rschema, obj)
+    raise Exception(message)
+
+
+def choose_eid(values, avoid):
+    """Return a random element of `values` different from `avoid`, or None
+    when there is no such element.
+
+    `avoid` is excluded to prevent relating an entity to itself (like in
+    X comment X). Candidates are filtered up front, so neither an empty
+    `values` nor one holding only (copies of) `avoid` can fail or loop
+    forever.
+    """
+    candidates = [value for value in values if value != avoid]
+    if not candidates:
+        return None
+    return choice(candidates)
+
+
+
+# UTILITIES FUNCS ##############################################################
+def make_tel(num_tel):
+    """Take an integer, convert it to a string and insert a white space
+    every 2 chars (french notation).
+
+    Numbers of any length are grouped, e.g. 12345678 gives '12 34 56 78' and
+    1234567890 gives '12 34 56 78 90'; no trailing space is added.
+    """
+    digits = str(num_tel)
+    return ' '.join(digits[pos:pos + 2] for pos in range(0, len(digits), 2))
+
+
+def numlen(number):
+    """Return the number of characters of `str(number)`."""
+    as_text = str(number)
+    return len(as_text)