def init(self, activated, source_entity):
    """Initialise this source from its CWSource entity.

    Runs the base-class initialisation, remembers the parser identifier
    configured on ``source_entity`` and (re)loads the schema mapping using
    the entity's connection.
    """
    super(DataFeedSource, self).init(activated, source_entity)
    self.parser_id = source_entity.parser
    entity_cnx = source_entity._cw
    self.load_mapping(entity_cnx)
128 |
128 |
def _get_parser(self, cnx, **kwargs):
    """Select and return the parser registered under ``self.parser_id``.

    The parser is looked up in the repository's ``'parsers'`` registry and
    receives ``cnx``, this source (as ``source``) and any extra keyword
    arguments. Registry lookup errors propagate to the caller
    (``load_mapping`` catches ``RegistryNotFound`` / ``ObjectNotFound``).
    """
    parser_registry = self.repo.vreg['parsers']
    return parser_registry.select(self.parser_id, cnx, source=self, **kwargs)
132 |
132 |
def load_mapping(self, cnx):
    """Reset and rebuild the schema mapping of this source.

    ``self.mapping`` and ``self.mapping_idx`` are always reset to empty
    dicts. If no parser can be selected yet (registry or parser object not
    found), they are left empty. Otherwise the actual loading is delegated
    to ``_load_mapping``.
    """
    self.mapping, self.mapping_idx = {}, {}
    try:
        parser = self._get_parser(cnx)
    except (RegistryNotFound, ObjectNotFound):
        # no parser yet: nothing to map, keep the empty mapping
        return
    else:
        self._load_mapping(cnx, parser=parser)
141 |
141 |
142 def add_schema_config(self, schemacfg, checkonly=False, parser=None): |
142 def add_schema_config(self, schemacfg, checkonly=False, parser=None): |
143 """added CWSourceSchemaConfig, modify mapping accordingly""" |
143 """added CWSourceSchemaConfig, modify mapping accordingly""" |
144 if parser is None: |
144 if parser is None: |
145 parser = self._get_parser(schemacfg._cw) |
145 parser = self._get_parser(schemacfg._cw) |
236 self.exception('error while processing %s: %s', |
236 self.exception('error while processing %s: %s', |
237 url, exc) |
237 url, exc) |
238 error = True |
238 error = True |
239 return error |
239 return error |
240 |
240 |
def before_entity_insertion(self, cnx, lid, etype, eid, sourceparams):
    """Hook called once the repository has attributed an eid to an entity
    coming from this source, before it is inserted in the system table.

    Builds the entity through the base class, sets its ``cwuri`` to the
    UTF-8-decoded external identifier ``lid``, fills in default values and
    lets the parser found in ``sourceparams['parser']`` adjust it.

    Returns the Entity instance representing this entity.
    """
    base = super(DataFeedSource, self)
    entity = base.before_entity_insertion(cnx, lid, etype, eid, sourceparams)
    edited = entity.cw_edited
    edited['cwuri'] = lid.decode('utf-8')
    edited.set_defaults()
    parser = sourceparams['parser']
    parser.before_entity_copy(entity, sourceparams)
    return entity
255 |
255 |
def after_entity_insertion(self, cnx, lid, entity, sourceparams):
    """called by the repository after an entity stored here has been
    inserted in the system table.

    Steps, in order: extract the inlined relations of ``entity``, validate
    its edited values when the 'integrity' hook category is active, store
    it through the system source, mark it as saved, notify the parser from
    ``sourceparams['parser']``, then (if ``self.should_call_hooks``) fire
    the before/after add-relation hooks for each inlined relation.
    """
    # (attr, value) pairs, used below as relation type and target eid
    relations = preprocess_inlined_relations(cnx, entity)
    if cnx.is_hook_category_activated('integrity'):
        entity.cw_edited.check(creation=True)
    self.repo.system_source.add_entity(cnx, entity)
    # flag both the edited-values container and the entity as persisted
    entity.cw_edited.saved = entity._cw_is_saved = True
    sourceparams['parser'].after_entity_copy(entity, sourceparams)
    # call hooks for inlined relations
    call_hooks = self.repo.hm.call_hooks
    if self.should_call_hooks:
        for attr, value in relations:
            call_hooks('before_add_relation', cnx,
                       eidfrom=entity.eid, rtype=attr, eidto=value)
            call_hooks('after_add_relation', cnx,
                       eidfrom=entity.eid, rtype=attr, eidto=value)
274 |
274 |
def source_cwuris(self, cnx):
    """Return the entities already imported from this source.

    The result maps each base64-decoded ``extid`` to an ``(eid, type)``
    tuple, for every entity linked to this source through
    ``cw_source_relation``.
    """
    sql = ('SELECT extid, eid, type FROM entities, cw_source_relation '
           'WHERE entities.eid=cw_source_relation.eid_from '
           'AND cw_source_relation.eid_to=%s' % self.eid)
    rows = cnx.system_sql(sql).fetchall()
    known = {}
    for encoded_extid, entity_eid, entity_type in rows:
        # later rows win on duplicate extids, as with dict(iterable)
        known[b64decode(encoded_extid)] = (entity_eid, entity_type)
    return known
281 |
281 |
def init_import_log(self, cnx, **kwargs):
    """Create, initialise and return a ``CWDataImport`` entity for this source.

    The import log is linked to this source (``cw_import_of``), stamped
    with the current UTC time as ``start_timestamp``, and receives any
    extra attributes passed as keyword arguments.
    """
    started_at = datetime.utcnow()
    import_log = cnx.create_entity('CWDataImport', cw_import_of=self,
                                   start_timestamp=started_at, **kwargs)
    import_log.init()
    return import_log
288 |
288 |
289 |
289 |
290 class DataFeedParser(AppObject): |
290 class DataFeedParser(AppObject): |
291 __registry__ = 'parsers' |
291 __registry__ = 'parsers' |
292 |
292 |
def __init__(self, cnx, source, sourceuris=None, import_log=None, **kwargs):
    """Build a parser bound to ``cnx`` and to the data feed ``source``.

    :param sourceuris: optional mapping of already known source URIs
    :param import_log: optional import log entity
    Extra keyword arguments are forwarded to the base class. Statistics
    start as empty sets for 'created', 'updated' and 'checked'.
    """
    super(DataFeedParser, self).__init__(cnx, **kwargs)
    self.source = source
    self.sourceuris = sourceuris
    self.import_log = import_log
    self.stats = dict((category, set())
                      for category in ('created', 'updated', 'checked'))
299 |
299 |
343 |
343 |
344 def extid2entity(self, uri, etype, **sourceparams): |
344 def extid2entity(self, uri, etype, **sourceparams): |
345 """return an entity for the given uri. May return None if it should be |
345 """return an entity for the given uri. May return None if it should be |
346 skipped |
346 skipped |
347 """ |
347 """ |
348 session = self._cw |
348 cnx = self._cw |
349 # if cwsource is specified and repository has a source with the same |
349 # if cwsource is specified and repository has a source with the same |
350 # name, call extid2eid on that source so entity will be properly seen as |
350 # name, call extid2eid on that source so entity will be properly seen as |
351 # coming from this source |
351 # coming from this source |
352 source_uri = sourceparams.pop('cwsource', None) |
352 source_uri = sourceparams.pop('cwsource', None) |
353 if source_uri is not None and source_uri != 'system': |
353 if source_uri is not None and source_uri != 'system': |
354 source = session.repo.sources_by_uri.get(source_uri, self.source) |
354 source = cnx.repo.sources_by_uri.get(source_uri, self.source) |
355 else: |
355 else: |
356 source = self.source |
356 source = self.source |
357 sourceparams['parser'] = self |
357 sourceparams['parser'] = self |
358 if isinstance(uri, unicode): |
358 if isinstance(uri, unicode): |
359 uri = uri.encode('utf-8') |
359 uri = uri.encode('utf-8') |
360 try: |
360 try: |
361 eid = session.repo.extid2eid(source, str(uri), etype, session, |
361 eid = cnx.repo.extid2eid(source, str(uri), etype, cnx, |
362 sourceparams=sourceparams) |
362 sourceparams=sourceparams) |
363 except ValidationError as ex: |
363 except ValidationError as ex: |
364 # XXX use critical so they are seen during tests. Should consider |
364 # XXX use critical so they are seen during tests. Should consider |
365 # raise_on_error instead? |
365 # raise_on_error instead? |
366 self.source.critical('error while creating %s: %s', etype, ex) |
366 self.source.critical('error while creating %s: %s', etype, ex) |
409 is actually deleted. Always return True by default, put more sensible |
409 is actually deleted. Always return True by default, put more sensible |
410 stuff in sub-classes. |
410 stuff in sub-classes. |
411 """ |
411 """ |
412 return True |
412 return True |
413 |
413 |
def handle_deletion(self, config, cnx, myuris):
    """Delete entities that ``is_deleted`` reports as gone from the feed.

    Does nothing unless ``config['delete-entities']`` is true and
    ``myuris`` (a mapping ``extid -> (eid, etype)``) is non-empty. The
    matching eids are grouped by entity type, and one DELETE query is
    issued and committed per type.
    """
    if not config['delete-entities'] or not myuris:
        return
    eids_by_etype = {}
    for extid, (eid, etype) in myuris.iteritems():
        if not self.is_deleted(extid, etype, eid):
            continue
        eids_by_etype.setdefault(etype, []).append(str(eid))
    for etype, eids in eids_by_etype.iteritems():
        self.warning('delete %s %s entities', len(eids), etype)
        rql = 'DELETE %s X WHERE X eid IN (%s)' % (etype, ','.join(eids))
        cnx.execute(rql)
        cnx.commit()
|
426 |
425 |
427 def update_if_necessary(self, entity, attrs): |
426 def update_if_necessary(self, entity, attrs): |
428 entity.complete(tuple(attrs)) |
427 entity.complete(tuple(attrs)) |
429 # check modification date and compare attribute values to only update |
428 # check modification date and compare attribute values to only update |
430 # what's actually needed |
429 # what's actually needed |