dataimport/deprecated.py
changeset 10513 7bec01a59f92
child 10589 7c23b7de2b8d
equal deleted inserted replaced
10512:99bdd4bddd77 10513:7bec01a59f92
       
     1 # copyright 2003-2015 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
       
     2 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
       
     3 #
       
     4 # This file is part of CubicWeb.
       
     5 #
       
     6 # CubicWeb is free software: you can redistribute it and/or modify it under the
       
     7 # terms of the GNU Lesser General Public License as published by the Free
       
     8 # Software Foundation, either version 2.1 of the License, or (at your option)
       
     9 # any later version.
       
    10 #
       
    11 # CubicWeb is distributed in the hope that it will be useful, but WITHOUT
       
    12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
       
    13 # FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
       
    14 # details.
       
    15 #
       
    16 # You should have received a copy of the GNU Lesser General Public License along
       
    17 # with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
       
    18 """Old and deprecated dataimport API that provides tools to import tabular data.
       
    19 
       
    20 
       
    21 Example of use (run this with `cubicweb-ctl shell instance import-script.py`):
       
    22 
       
    23 .. sourcecode:: python
       
    24 
       
    25   from cubicweb.dataimport import *
       
    26   # define data generators
       
    27   GENERATORS = []
       
    28 
       
    29   USERS = [('Prenom', 'firstname', ()),
       
    30            ('Nom', 'surname', ()),
       
    31            ('Identifiant', 'login', ()),
       
    32            ]
       
    33 
       
    34   def gen_users(ctl):
       
    35       for row in ctl.iter_and_commit('utilisateurs'):
       
    36           entity = mk_entity(row, USERS)
       
    37           entity['upassword'] = 'motdepasse'
       
    38           ctl.check('login', entity['login'], None)
       
    39           entity = ctl.store.prepare_insert_entity('CWUser', **entity)
       
    40           email = ctl.store.prepare_insert_entity('EmailAddress', address=row['email'])
       
    41           ctl.store.prepare_insert_relation(entity, 'use_email', email)
       
    42           ctl.store.rql('SET U in_group G WHERE G name "users", U eid %(x)s', {'x': entity})
       
    43 
       
    44   CHK = [('login', check_doubles, 'Utilisateurs Login',
       
    45           'Deux utilisateurs ne devraient pas avoir le meme login.'),
       
    46          ]
       
    47 
       
    48   GENERATORS.append( (gen_users, CHK) )
       
    49 
       
    50   # create controller
       
    51   ctl = CWImportController(RQLObjectStore(cnx))
       
    52   ctl.askerror = 1
       
    53   ctl.generators = GENERATORS
       
    54   ctl.data['utilisateurs'] = lazytable(ucsvreader(open('users.csv')))
       
    55   # run
       
    56   ctl.run()
       
    57 
       
     58 .. BUG files with a single column are not parsable
       
    59 .. TODO rollback() invocation is not possible yet
       
    60 """
       
    61 
       
    62 import sys
       
    63 import traceback
       
    64 from StringIO import StringIO
       
    65 
       
    66 from logilab.common import attrdict, shellutils
       
    67 from logilab.common.date import strptime
       
    68 from logilab.common.deprecation import deprecated, class_deprecated
       
    69 
       
    70 from cubicweb import QueryError
       
    71 from cubicweb.dataimport import callfunc_every
       
    72 
       
    73 
       
    74 @deprecated('[3.21] deprecated')
       
    75 def lazytable(reader):
       
    76     """The first row is taken to be the header of the table and
       
    77     used to output a dict for each row of data.
       
    78 
       
    79     >>> data = lazytable(ucsvreader(open(filename)))
       
    80     """
       
    81     header = reader.next()
       
    82     for row in reader:
       
    83         yield dict(zip(header, row))
       
    84 
       
    85 
       
    86 @deprecated('[3.21] deprecated')
       
    87 def lazydbtable(cu, table, headers, orderby=None):
       
    88     """return an iterator on rows of a sql table. On each row, fetch columns
       
    89     defined in headers and return values as a dictionary.
       
    90 
       
    91     >>> data = lazydbtable(cu, 'experimentation', ('id', 'nickname', 'gps'))
       
    92     """
       
    93     sql = 'SELECT %s FROM %s' % (','.join(headers), table,)
       
    94     if orderby:
       
    95         sql += ' ORDER BY %s' % ','.join(orderby)
       
    96     cu.execute(sql)
       
    97     while True:
       
    98         row = cu.fetchone()
       
    99         if row is None:
       
   100             break
       
   101         yield dict(zip(headers, row))
       
   102 
       
   103 
       
   104 @deprecated('[3.21] deprecated')
       
   105 def tell(msg):
       
   106     print msg
       
   107 
       
   108 
       
   109 @deprecated('[3.21] deprecated')
       
   110 def confirm(question):
       
   111     """A confirm function that asks for yes/no/abort and exits on abort."""
       
   112     answer = shellutils.ASK.ask(question, ('Y', 'n', 'abort'), 'Y')
       
   113     if answer == 'abort':
       
   114         sys.exit(1)
       
   115     return answer == 'Y'
       
   116 
       
   117 
       
   118 class catch_error(object):
       
   119     """Helper for @contextmanager decorator."""
       
   120     __metaclass__ = class_deprecated
       
   121     __deprecation_warning__ = '[3.21] deprecated'
       
   122 
       
   123     def __init__(self, ctl, key='unexpected error', msg=None):
       
   124         self.ctl = ctl
       
   125         self.key = key
       
   126         self.msg = msg
       
   127 
       
   128     def __enter__(self):
       
   129         return self
       
   130 
       
   131     def __exit__(self, type, value, traceback):
       
   132         if type is not None:
       
   133             if issubclass(type, (KeyboardInterrupt, SystemExit)):
       
   134                 return # re-raise
       
   135             if self.ctl.catcherrors:
       
   136                 self.ctl.record_error(self.key, None, type, value, traceback)
       
   137                 return True # silent
       
   138 
       
   139 @deprecated('[3.21] deprecated')
       
   140 def mk_entity(row, map):
       
   141     """Return a dict made from sanitized mapped values.
       
   142 
       
   143     ValueError can be raised on unexpected values found in checkers
       
   144 
       
   145     >>> row = {'myname': u'dupont'}
       
   146     >>> map = [('myname', u'name', (call_transform_method('title'),))]
       
   147     >>> mk_entity(row, map)
       
   148     {'name': u'Dupont'}
       
   149     >>> row = {'myname': u'dupont', 'optname': u''}
       
   150     >>> map = [('myname', u'name', (call_transform_method('title'),)),
       
   151     ...        ('optname', u'MARKER', (optional,))]
       
   152     >>> mk_entity(row, map)
       
   153     {'name': u'Dupont', 'optname': None}
       
   154     """
       
   155     res = {}
       
   156     assert isinstance(row, dict)
       
   157     assert isinstance(map, list)
       
   158     for src, dest, funcs in map:
       
   159         try:
       
   160             res[dest] = row[src]
       
   161         except KeyError:
       
   162             continue
       
   163         try:
       
   164             for func in funcs:
       
   165                 res[dest] = func(res[dest])
       
   166                 if res[dest] is None:
       
   167                     break
       
   168         except ValueError as err:
       
   169             raise ValueError('error with %r field: %s' % (src, err)), None, sys.exc_info()[-1]
       
   170     return res
       
   171 
       
   172 
       
   173 # base sanitizing/coercing functions ###########################################
       
   174 
       
   175 @deprecated('[3.21] deprecated')
       
   176 def optional(value):
       
   177     """checker to filter optional field
       
   178 
       
   179     If value is undefined (ex: empty string), return None that will
       
   180     break the checkers validation chain
       
   181 
       
   182     General use is to add 'optional' check in first condition to avoid
       
   183     ValueError by further checkers
       
   184 
       
   185     >>> MAPPER = [(u'value', 'value', (optional, int))]
       
   186     >>> row = {'value': u'XXX'}
       
   187     >>> mk_entity(row, MAPPER)
       
   188     {'value': None}
       
   189     >>> row = {'value': u'100'}
       
   190     >>> mk_entity(row, MAPPER)
       
   191     {'value': 100}
       
   192     """
       
   193     if value:
       
   194         return value
       
   195     return None
       
   196 
       
   197 
       
   198 @deprecated('[3.21] deprecated')
       
   199 def required(value):
       
   200     """raise ValueError if value is empty
       
   201 
       
   202     This check should be often found in last position in the chain.
       
   203     """
       
   204     if value:
       
   205         return value
       
   206     raise ValueError("required")
       
   207 
       
   208 
       
   209 @deprecated('[3.21] deprecated')
       
   210 def todatetime(format='%d/%m/%Y'):
       
   211     """return a transformation function to turn string input value into a
       
   212     `datetime.datetime` instance, using given format.
       
   213 
       
   214     Follow it by `todate` or `totime` functions from `logilab.common.date` if
       
   215     you want a `date`/`time` instance instead of `datetime`.
       
   216     """
       
   217     def coerce(value):
       
   218         return strptime(value, format)
       
   219     return coerce
       
   220 
       
   221 
       
   222 @deprecated('[3.21] deprecated')
       
   223 def call_transform_method(methodname, *args, **kwargs):
       
   224     """return value returned by calling the given method on input"""
       
   225     def coerce(value):
       
   226         return getattr(value, methodname)(*args, **kwargs)
       
   227     return coerce
       
   228 
       
   229 
       
   230 @deprecated('[3.21] deprecated')
       
   231 def call_check_method(methodname, *args, **kwargs):
       
   232     """check value returned by calling the given method on input is true,
       
   233     else raise ValueError
       
   234     """
       
   235     def check(value):
       
   236         if getattr(value, methodname)(*args, **kwargs):
       
   237             return value
       
   238         raise ValueError('%s not verified on %r' % (methodname, value))
       
   239     return check
       
   240 
       
   241 
       
   242 # base integrity checking functions ############################################
       
   243 
       
   244 @deprecated('[3.21] deprecated')
       
   245 def check_doubles(buckets):
       
   246     """Extract the keys that have more than one item in their bucket."""
       
   247     return [(k, len(v)) for k, v in buckets.items() if len(v) > 1]
       
   248 
       
   249 
       
   250 @deprecated('[3.21] deprecated')
       
   251 def check_doubles_not_none(buckets):
       
   252     """Extract the keys that have more than one item in their bucket."""
       
   253     return [(k, len(v)) for k, v in buckets.items()
       
   254             if k is not None and len(v) > 1]
       
   255 
       
   256 
       
   257 class ObjectStore(object):
       
   258     """Store objects in memory for *faster* validation (development mode)
       
   259 
       
   260     But it will not enforce the constraints of the schema and hence will miss some problems
       
   261 
       
   262     >>> store = ObjectStore()
       
   263     >>> user = store.prepare_insert_entity('CWUser', login=u'johndoe')
       
   264     >>> group = store.prepare_insert_entity('CWUser', name=u'unknown')
       
   265     >>> store.prepare_insert_relation(user, 'in_group', group)
       
   266     """
       
   267     __metaclass__ = class_deprecated
       
   268     __deprecation_warning__ = '[3.21] use the new importer API'
       
   269 
       
   270     def __init__(self):
       
   271         self.items = []
       
   272         self.eids = {}
       
   273         self.types = {}
       
   274         self.relations = set()
       
   275         self.indexes = {}
       
   276 
       
   277     def prepare_insert_entity(self, etype, **data):
       
   278         """Given an entity type, attributes and inlined relations, return an eid for the entity that
       
   279         would be inserted with a real store.
       
   280         """
       
   281         data = attrdict(data)
       
   282         data['eid'] = eid = len(self.items)
       
   283         self.items.append(data)
       
   284         self.eids[eid] = data
       
   285         self.types.setdefault(etype, []).append(eid)
       
   286         return eid
       
   287 
       
   288     def prepare_update_entity(self, etype, eid, **kwargs):
       
   289         """Given an entity type and eid, updates the corresponding fake entity with specified
       
   290         attributes and inlined relations.
       
   291         """
       
   292         assert eid in self.types[etype], 'Trying to update with wrong type {}'.format(etype)
       
   293         data = self.eids[eid]
       
   294         data.update(kwargs)
       
   295 
       
   296     def prepare_insert_relation(self, eid_from, rtype, eid_to, **kwargs):
       
   297         """Store into the `relations` attribute that a relation ``rtype`` exists between entities
       
   298         with eids ``eid_from`` and ``eid_to``.
       
   299         """
       
   300         relation = eid_from, rtype, eid_to
       
   301         self.relations.add(relation)
       
   302         return relation
       
   303 
       
   304     def flush(self):
       
   305         """Nothing to flush for this store."""
       
   306         pass
       
   307 
       
   308     def commit(self):
       
   309         """Nothing to commit for this store."""
       
   310         return
       
   311 
       
   312     def finish(self):
       
   313         """Nothing to do once import is terminated for this store."""
       
   314         pass
       
   315 
       
   316     @property
       
   317     def nb_inserted_entities(self):
       
   318         return len(self.eids)
       
   319 
       
   320     @property
       
   321     def nb_inserted_types(self):
       
   322         return len(self.types)
       
   323 
       
   324     @property
       
   325     def nb_inserted_relations(self):
       
   326         return len(self.relations)
       
   327 
       
   328     @deprecated('[3.21] use prepare_insert_entity instead')
       
   329     def create_entity(self, etype, **data):
       
   330         self.prepare_insert_entity(etype, **data)
       
   331         return attrdict(data)
       
   332 
       
   333     @deprecated('[3.21] use prepare_insert_relation instead')
       
   334     def relate(self, eid_from, rtype, eid_to, **kwargs):
       
   335         self.prepare_insert_relation(eid_from, rtype, eid_to, **kwargs)
       
   336 
       
   337 
       
   338 class CWImportController(object):
       
   339     """Controller of the data import process.
       
   340 
       
   341     >>> ctl = CWImportController(store)
       
   342     >>> ctl.generators = list_of_data_generators
       
   343     >>> ctl.data = dict_of_data_tables
       
   344     >>> ctl.run()
       
   345     """
       
   346     __metaclass__ = class_deprecated
       
   347     __deprecation_warning__ = '[3.21] use the new importer API'
       
   348 
       
   349     def __init__(self, store, askerror=0, catcherrors=None, tell=tell,
       
   350                  commitevery=50):
       
   351         self.store = store
       
   352         self.generators = None
       
   353         self.data = {}
       
   354         self.errors = None
       
   355         self.askerror = askerror
       
   356         if  catcherrors is None:
       
   357             catcherrors = askerror
       
   358         self.catcherrors = catcherrors
       
   359         self.commitevery = commitevery # set to None to do a single commit
       
   360         self._tell = tell
       
   361 
       
   362     def check(self, type, key, value):
       
   363         self._checks.setdefault(type, {}).setdefault(key, []).append(value)
       
   364 
       
   365     def check_map(self, entity, key, map, default):
       
   366         try:
       
   367             entity[key] = map[entity[key]]
       
   368         except KeyError:
       
   369             self.check(key, entity[key], None)
       
   370             entity[key] = default
       
   371 
       
   372     def record_error(self, key, msg=None, type=None, value=None, tb=None):
       
   373         tmp = StringIO()
       
   374         if type is None:
       
   375             traceback.print_exc(file=tmp)
       
   376         else:
       
   377             traceback.print_exception(type, value, tb, file=tmp)
       
   378         # use a list to avoid counting a <nb lines> errors instead of one
       
   379         errorlog = self.errors.setdefault(key, [])
       
   380         if msg is None:
       
   381             errorlog.append(tmp.getvalue().splitlines())
       
   382         else:
       
   383             errorlog.append( (msg, tmp.getvalue().splitlines()) )
       
   384 
       
   385     def run(self):
       
   386         self.errors = {}
       
   387         if self.commitevery is None:
       
   388             self.tell('Will commit all or nothing.')
       
   389         else:
       
   390             self.tell('Will commit every %s iterations' % self.commitevery)
       
   391         for func, checks in self.generators:
       
   392             self._checks = {}
       
   393             func_name = func.__name__
       
   394             self.tell("Run import function '%s'..." % func_name)
       
   395             try:
       
   396                 func(self)
       
   397             except Exception:
       
   398                 if self.catcherrors:
       
   399                     self.record_error(func_name, 'While calling %s' % func.__name__)
       
   400                 else:
       
   401                     self._print_stats()
       
   402                     raise
       
   403             for key, func, title, help in checks:
       
   404                 buckets = self._checks.get(key)
       
   405                 if buckets:
       
   406                     err = func(buckets)
       
   407                     if err:
       
   408                         self.errors[title] = (help, err)
       
   409         try:
       
   410             txuuid = self.store.commit()
       
   411             if txuuid is not None:
       
   412                 self.tell('Transaction commited (txuuid: %s)' % txuuid)
       
   413         except QueryError as ex:
       
   414             self.tell('Transaction aborted: %s' % ex)
       
   415         self._print_stats()
       
   416         if self.errors:
       
   417             if self.askerror == 2 or (self.askerror and confirm('Display errors ?')):
       
   418                 from pprint import pformat
       
   419                 for errkey, error in self.errors.items():
       
   420                     self.tell("\n%s (%s): %d\n" % (error[0], errkey, len(error[1])))
       
   421                     self.tell(pformat(sorted(error[1])))
       
   422 
       
   423     def _print_stats(self):
       
   424         nberrors = sum(len(err) for err in self.errors.itervalues())
       
   425         self.tell('\nImport statistics: %i entities, %i types, %i relations and %i errors'
       
   426                   % (self.store.nb_inserted_entities,
       
   427                      self.store.nb_inserted_types,
       
   428                      self.store.nb_inserted_relations,
       
   429                      nberrors))
       
   430 
       
   431     def get_data(self, key):
       
   432         return self.data.get(key)
       
   433 
       
   434     def index(self, name, key, value, unique=False):
       
   435         """create a new index
       
   436 
       
   437         If unique is set to True, only first occurence will be kept not the following ones
       
   438         """
       
   439         if unique:
       
   440             try:
       
   441                 if value in self.store.indexes[name][key]:
       
   442                     return
       
   443             except KeyError:
       
   444                 # we're sure that one is the first occurence; so continue...
       
   445                 pass
       
   446         self.store.indexes.setdefault(name, {}).setdefault(key, []).append(value)
       
   447 
       
   448     def tell(self, msg):
       
   449         self._tell(msg)
       
   450 
       
   451     def iter_and_commit(self, datakey):
       
   452         """iter rows, triggering commit every self.commitevery iterations"""
       
   453         if self.commitevery is None:
       
   454             return self.get_data(datakey)
       
   455         else:
       
   456             return callfunc_every(self.store.commit,
       
   457                                   self.commitevery,
       
   458                                   self.get_data(datakey))
       
   459 
       
   460