sobjects/parsers.py
branchstable
changeset 7075 4751d77394b1
parent 7002 29f085f6177b
child 7351 ed66f236715d
equal deleted inserted replaced
7073:4ce9e536dd66 7075:4751d77394b1
       
     1 # copyright 2010-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
       
     2 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
       
     3 #
       
     4 # This file is part of CubicWeb.
       
     5 #
       
     6 # CubicWeb is free software: you can redistribute it and/or modify it under the
       
     7 # terms of the GNU Lesser General Public License as published by the Free
       
     8 # Software Foundation, either version 2.1 of the License, or (at your option)
       
     9 # any later version.
       
    10 #
       
    11 # CubicWeb is distributed in the hope that it will be useful, but WITHOUT
       
    12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
       
    13 # FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
       
    14 # details.
       
    15 #
       
    16 # You should have received a copy of the GNU Lesser General Public License along
       
    17 # with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
       
    18 """datafeed parser for xml generated by cubicweb"""
       
    19 
       
    20 import urllib2
       
    21 import StringIO
       
    22 import os.path as osp
       
    23 from cookielib import CookieJar
       
    24 from datetime import datetime, timedelta
       
    25 
       
    26 from lxml import etree
       
    27 
       
    28 from logilab.common.date import todate, totime
       
    29 from logilab.common.textutils import splitstrip, text_to_dict
       
    30 
       
    31 from yams.constraints import BASE_CONVERTERS
       
    32 from yams.schema import role_name as rn
       
    33 
       
    34 from cubicweb import ValidationError, typed_eid
       
    35 from cubicweb.server.sources import datafeed
       
    36 
       
    37 def ensure_str_keys(dict):
       
    38     for key in dict:
       
    39         dict[str(key)] = dict.pop(key)
       
    40 
       
    41 # see cubicweb.web.views.xmlrss.SERIALIZERS
       
    42 DEFAULT_CONVERTERS = BASE_CONVERTERS.copy()
       
    43 DEFAULT_CONVERTERS['String'] = unicode
       
    44 DEFAULT_CONVERTERS['Password'] = lambda x: x.encode('utf8')
       
    45 def convert_date(ustr):
       
    46     return todate(datetime.strptime(ustr, '%Y-%m-%d'))
       
    47 DEFAULT_CONVERTERS['Date'] = convert_date
       
    48 def convert_datetime(ustr):
       
    49     if '.' in ustr: # assume %Y-%m-%d %H:%M:%S.mmmmmm
       
    50         ustr = ustr.split('.',1)[0]
       
    51     return datetime.strptime(ustr, '%Y-%m-%d %H:%M:%S')
       
    52 DEFAULT_CONVERTERS['Datetime'] = convert_datetime
       
    53 def convert_time(ustr):
       
    54     return totime(datetime.strptime(ustr, '%H:%M:%S'))
       
    55 DEFAULT_CONVERTERS['Time'] = convert_time
       
    56 def convert_interval(ustr):
       
    57     return time(seconds=int(ustr))
       
    58 DEFAULT_CONVERTERS['Interval'] = convert_interval
       
    59 
       
    60 # use a cookie enabled opener to use session cookie if any
       
    61 _OPENER = urllib2.build_opener()
       
    62 try:
       
    63     from logilab.common import urllib2ext
       
    64     _OPENER.add_handler(urllib2ext.HTTPGssapiAuthHandler())
       
    65 except ImportError: # python-kerberos not available
       
    66     pass
       
    67 _OPENER.add_handler(urllib2.HTTPCookieProcessor(CookieJar()))
       
    68 
       
    69 def extract_typed_attrs(eschema, stringdict, converters=DEFAULT_CONVERTERS):
       
    70     typeddict = {}
       
    71     for rschema in eschema.subject_relations():
       
    72         if rschema.final and rschema in stringdict:
       
    73             if rschema == 'eid':
       
    74                 continue
       
    75             attrtype = eschema.destination(rschema)
       
    76             typeddict[rschema.type] = converters[attrtype](stringdict[rschema])
       
    77     return typeddict
       
    78 
       
    79 def _entity_etree(parent):
       
    80     for node in list(parent):
       
    81         try:
       
    82             item = {'cwtype': unicode(node.tag),
       
    83                     'cwuri': node.attrib['cwuri'],
       
    84                     'eid': typed_eid(node.attrib['eid']),
       
    85                     }
       
    86         except KeyError:
       
    87             # cw < 3.11 compat mode XXX
       
    88             item = {'cwtype': unicode(node.tag),
       
    89                     'cwuri': node.find('cwuri').text,
       
    90                     'eid': typed_eid(node.find('eid').text),
       
    91                     }
       
    92         rels = {}
       
    93         for child in node:
       
    94             role = child.get('role')
       
    95             if child.get('role'):
       
    96                 # relation
       
    97                 related = rels.setdefault(role, {}).setdefault(child.tag, [])
       
    98                 related += [ritem for ritem, _ in _entity_etree(child)]
       
    99             else:
       
   100                 # attribute
       
   101                 item[child.tag] = unicode(child.text)
       
   102         yield item, rels
       
   103 
       
   104 def build_search_rql(etype, attrs):
       
   105     restrictions = []
       
   106     for attr in attrs:
       
   107         restrictions.append('X %(attr)s %%(%(attr)s)s' % {'attr': attr})
       
   108     return 'Any X WHERE X is %s, %s' % (etype, ','.join(restrictions))
       
   109 
       
   110 def rtype_role_rql(rtype, role):
       
   111     if role == 'object':
       
   112         return 'Y %s X WHERE X eid %%(x)s' % rtype
       
   113     else:
       
   114         return 'X %s Y WHERE X eid %%(x)s' % rtype
       
   115 
       
   116 
       
   117 def _check_no_option(action, options, eid, _):
       
   118     if options:
       
   119         msg = _("'%s' action doesn't take any options") % action
       
   120         raise ValidationError(eid, {rn('options', 'subject'): msg})
       
   121 
       
   122 def _check_linkattr_option(action, options, eid, _):
       
   123     if not 'linkattr' in options:
       
   124         msg = _("'%s' action require 'linkattr' option") % action
       
   125         raise ValidationError(eid, {rn('options', 'subject'): msg})
       
   126 
       
   127 
       
   128 class CWEntityXMLParser(datafeed.DataFeedParser):
       
   129     """datafeed parser for the 'xml' entity view"""
       
   130     __regid__ = 'cw.entityxml'
       
   131 
       
   132     action_options = {
       
   133         'copy': _check_no_option,
       
   134         'link-or-create': _check_linkattr_option,
       
   135         'link': _check_linkattr_option,
       
   136         }
       
   137 
       
   138     def __init__(self, *args, **kwargs):
       
   139         super(CWEntityXMLParser, self).__init__(*args, **kwargs)
       
   140         self.action_methods = {
       
   141             'copy': self.related_copy,
       
   142             'link-or-create': self.related_link_or_create,
       
   143             'link': self.related_link,
       
   144             }
       
   145 
       
   146     # mapping handling #########################################################
       
   147 
       
   148     def add_schema_config(self, schemacfg, checkonly=False):
       
   149         """added CWSourceSchemaConfig, modify mapping accordingly"""
       
   150         _ = self._cw._
       
   151         try:
       
   152             rtype = schemacfg.schema.rtype.name
       
   153         except AttributeError:
       
   154             msg = _("entity and relation types can't be mapped, only attributes "
       
   155                     "or relations")
       
   156             raise ValidationError(schemacfg.eid, {rn('cw_for_schema', 'subject'): msg})
       
   157         if schemacfg.options:
       
   158             options = text_to_dict(schemacfg.options)
       
   159         else:
       
   160             options = {}
       
   161         try:
       
   162             role = options.pop('role')
       
   163             if role not in ('subject', 'object'):
       
   164                 raise KeyError
       
   165         except KeyError:
       
   166             msg = _('"role=subject" or "role=object" must be specified in options')
       
   167             raise ValidationError(schemacfg.eid, {rn('options', 'subject'): msg})
       
   168         try:
       
   169             action = options.pop('action')
       
   170             self.action_options[action](action, options, schemacfg.eid, _)
       
   171         except KeyError:
       
   172             msg = _('"action" must be specified in options; allowed values are '
       
   173                     '%s') % ', '.join(self.action_methods)
       
   174             raise ValidationError(schemacfg.eid, {rn('options', 'subject'): msg})
       
   175         if not checkonly:
       
   176             if role == 'subject':
       
   177                 etype = schemacfg.schema.stype.name
       
   178                 ttype = schemacfg.schema.otype.name
       
   179             else:
       
   180                 etype = schemacfg.schema.otype.name
       
   181                 ttype = schemacfg.schema.stype.name
       
   182             etyperules = self.source.mapping.setdefault(etype, {})
       
   183             etyperules.setdefault((rtype, role, action), []).append(
       
   184                 (ttype, options) )
       
   185             self.source.mapping_idx[schemacfg.eid] = (
       
   186                 etype, rtype, role, action, ttype)
       
   187 
       
   188     def del_schema_config(self, schemacfg, checkonly=False):
       
   189         """deleted CWSourceSchemaConfig, modify mapping accordingly"""
       
   190         etype, rtype, role, action, ttype = self.source.mapping_idx[schemacfg.eid]
       
   191         rules = self.source.mapping[etype][(rtype, role, action)]
       
   192         rules = [x for x in rules if not x[0] == ttype]
       
   193         if not rules:
       
   194             del self.source.mapping[etype][(rtype, role, action)]
       
   195 
       
   196     # import handling ##########################################################
       
   197 
       
   198     def process(self, url, partialcommit=True):
       
   199         """IDataFeedParser main entry point"""
       
   200         # XXX suppression support according to source configuration. If set, get
       
   201         # all cwuri of entities from this source, and compare with newly
       
   202         # imported ones
       
   203         error = False
       
   204         for item, rels in self.parse(url):
       
   205             cwuri = item['cwuri']
       
   206             try:
       
   207                 self.process_item(item, rels)
       
   208                 if partialcommit:
       
   209                     # commit+set_pool instead of commit(reset_pool=False) to let
       
   210                     # other a chance to get our pool
       
   211                     self._cw.commit()
       
   212                     self._cw.set_pool()
       
   213             except ValidationError, exc:
       
   214                 if partialcommit:
       
   215                     self.source.error('Skipping %s because of validation error %s' % (cwuri, exc))
       
   216                     self._cw.rollback()
       
   217                     self._cw.set_pool()
       
   218                     error = True
       
   219                 else:
       
   220                     raise
       
   221         return error
       
   222 
       
   223     def parse(self, url):
       
   224         if not url.startswith('http'):
       
   225             stream = StringIO.StringIO(url)
       
   226         else:
       
   227             for mappedurl in HOST_MAPPING:
       
   228                 if url.startswith(mappedurl):
       
   229                     url = url.replace(mappedurl, HOST_MAPPING[mappedurl], 1)
       
   230                     break
       
   231             self.source.info('GET %s', url)
       
   232             stream = _OPENER.open(url)
       
   233         return _entity_etree(etree.parse(stream).getroot())
       
   234 
       
   235     def process_one(self, url):
       
   236         # XXX assert len(root.children) == 1
       
   237         for item, rels in self.parse(url):
       
   238             return self.process_item(item, rels)
       
   239 
       
   240     def process_item(self, item, rels):
       
   241         entity = self.extid2entity(str(item.pop('cwuri')),
       
   242                                    item.pop('cwtype'),
       
   243                                    item=item)
       
   244         if not (self.created_during_pull(entity)
       
   245                 or self.updated_during_pull(entity)):
       
   246             self.notify_updated(entity)
       
   247             item.pop('eid')
       
   248             # XXX check modification date
       
   249             attrs = extract_typed_attrs(entity.e_schema, item)
       
   250             entity.set_attributes(**attrs)
       
   251         for (rtype, role, action), rules in self.source.mapping.get(entity.__regid__, {}).iteritems():
       
   252             try:
       
   253                 rel = rels[role][rtype]
       
   254             except KeyError:
       
   255                 self.source.error('relation %s-%s doesn\'t seem exported in %s xml',
       
   256                                   rtype, role, entity.__regid__)
       
   257                 continue
       
   258             try:
       
   259                 actionmethod = self.action_methods[action]
       
   260             except KeyError:
       
   261                 raise Exception('Unknown action %s' % action)
       
   262             actionmethod(entity, rtype, role, rel, rules)
       
   263         return entity
       
   264 
       
   265     def before_entity_copy(self, entity, sourceparams):
       
   266         """IDataFeedParser callback"""
       
   267         attrs = extract_typed_attrs(entity.e_schema, sourceparams['item'])
       
   268         entity.cw_edited.update(attrs)
       
   269 
       
   270     def related_copy(self, entity, rtype, role, value, rules):
       
   271         """implementation of 'copy' action
       
   272 
       
   273         Takes no option.
       
   274         """
       
   275         assert not any(x[1] for x in rules), "'copy' action takes no option"
       
   276         ttypes = set([x[0] for x in rules])
       
   277         value = [item for item in value if item['cwtype'] in ttypes]
       
   278         eids = [] # local eids
       
   279         if not value:
       
   280             self._clear_relation(entity, rtype, role, ttypes)
       
   281             return
       
   282         for item in value:
       
   283             eids.append(self.process_one(self._complete_url(item)).eid)
       
   284         self._set_relation(entity, rtype, role, eids)
       
   285 
       
   286     def related_link(self, entity, rtype, role, value, rules):
       
   287         """implementation of 'link' action
       
   288 
       
   289         requires an options to control search of the linked entity.
       
   290         """
       
   291         for ttype, options in rules:
       
   292             assert 'linkattr' in options, (
       
   293                 "'link-or-create' action require a list of attributes used to "
       
   294                 "search if the entity already exists")
       
   295             self._related_link(entity, rtype, role, ttype, value, [options['linkattr']],
       
   296                                self._log_not_found)
       
   297 
       
   298     def related_link_or_create(self, entity, rtype, role, value, rules):
       
   299         """implementation of 'link-or-create' action
       
   300 
       
   301         requires an options to control search of the linked entity.
       
   302         """
       
   303         for ttype, options in rules:
       
   304             assert 'linkattr' in options, (
       
   305                 "'link-or-create' action require a list of attributes used to "
       
   306                 "search if the entity already exists")
       
   307             self._related_link(entity, rtype, role, ttype, value, [options['linkattr']],
       
   308                                self._create_not_found)
       
   309 
       
   310     def _log_not_found(self, entity, rtype, role, ritem, searchvalues):
       
   311         self.source.error('can find %s entity with attributes %s',
       
   312                           ritem['cwtype'], searchvalues)
       
   313 
       
   314     def _create_not_found(self, entity, rtype, role, ritem, searchvalues):
       
   315         ensure_str_keys(searchvalues) # XXX necessary with python < 2.6
       
   316         return self._cw.create_entity(ritem['cwtype'], **searchvalues).eid
       
   317 
       
   318     def _related_link(self, entity, rtype, role, ttype, value, searchattrs,
       
   319                       notfound_callback):
       
   320         eids = [] # local eids
       
   321         for item in value:
       
   322             if item['cwtype'] != ttype:
       
   323                 continue
       
   324             if not all(attr in item for attr in searchattrs):
       
   325                 # need to fetch related entity's xml
       
   326                 ritems = list(self.parse(self._complete_url(item, False)))
       
   327                 assert len(ritems) == 1, 'unexpected xml'
       
   328                 ritem = ritems[0][0] # list of 2-uples
       
   329                 assert all(attr in ritem for attr in searchattrs), \
       
   330                        'missing attribute, got %s expected keys %s' % (item, searchattrs)
       
   331             else:
       
   332                 ritem = item
       
   333             kwargs = dict((attr, ritem[attr]) for attr in searchattrs)
       
   334             rql = build_search_rql(item['cwtype'], kwargs)
       
   335             rset = self._cw.execute(rql, kwargs)
       
   336             if rset:
       
   337                 assert len(rset) == 1
       
   338                 eids.append(rset[0][0])
       
   339             else:
       
   340                 eid = notfound_callback(entity, rtype, role, ritem, kwargs)
       
   341                 if eid is not None:
       
   342                     eids.append(eid)
       
   343         if not eids:
       
   344             self._clear_relation(entity, rtype, role, (ttype,))
       
   345         else:
       
   346             self._set_relation(entity, rtype, role, eids)
       
   347 
       
   348     def _complete_url(self, item, add_relations=True):
       
   349         itemurl = item['cwuri'] + '?vid=xml'
       
   350         for rtype, role, _ in self.source.mapping.get(item['cwtype'], ()):
       
   351             itemurl += '&relation=%s_%s' % (rtype, role)
       
   352         return itemurl
       
   353 
       
   354     def _clear_relation(self, entity, rtype, role, ttypes):
       
   355         if entity.eid not in self.stats['created']:
       
   356             if len(ttypes) > 1:
       
   357                 typerestr = ', Y is IN(%s)' % ','.join(ttypes)
       
   358             else:
       
   359                 typerestr = ', Y is %s' % ','.join(ttypes)
       
   360             self._cw.execute('DELETE ' + rtype_role_rql(rtype, role) + typerestr,
       
   361                              {'x': entity.eid})
       
   362 
       
   363     def _set_relation(self, entity, rtype, role, eids):
       
   364         eidstr = ','.join(str(eid) for eid in eids)
       
   365         rql = rtype_role_rql(rtype, role)
       
   366         self._cw.execute('DELETE %s, NOT Y eid IN (%s)' % (rql, eidstr),
       
   367                          {'x': entity.eid})
       
   368         if role == 'object':
       
   369             rql = 'SET %s, Y eid IN (%s), NOT Y %s X' % (rql, eidstr, rtype)
       
   370         else:
       
   371             rql = 'SET %s, Y eid IN (%s), NOT X %s Y' % (rql, eidstr, rtype)
       
   372         self._cw.execute(rql, {'x': entity.eid})
       
   373 
       
   374 def registration_callback(vreg):
       
   375     vreg.register_all(globals().values(), __name__)
       
   376     global HOST_MAPPING
       
   377     HOST_MAPPING = {}
       
   378     if vreg.config.apphome:
       
   379         host_mapping_file = osp.join(vreg.config.apphome, 'hostmapping.py')
       
   380         if osp.exists(host_mapping_file):
       
   381             HOST_MAPPING = eval(file(host_mapping_file).read())
       
   382             vreg.info('using host mapping %s from %s', HOST_MAPPING, host_mapping_file)