111 def init(self, activated, source_entity): |
111 def init(self, activated, source_entity): |
112 super(DataFeedSource, self).init(activated, source_entity) |
112 super(DataFeedSource, self).init(activated, source_entity) |
113 self.parser_id = source_entity.parser |
113 self.parser_id = source_entity.parser |
114 self.load_mapping(source_entity._cw) |
114 self.load_mapping(source_entity._cw) |
115 |
115 |
116 def _get_parser(self, session, **kwargs): |
116 def _get_parser(self, cnx, **kwargs): |
117 return self.repo.vreg['parsers'].select( |
117 return self.repo.vreg['parsers'].select( |
118 self.parser_id, session, source=self, **kwargs) |
118 self.parser_id, cnx, source=self, **kwargs) |
119 |
119 |
120 def load_mapping(self, session): |
120 def load_mapping(self, cnx): |
121 self.mapping = {} |
121 self.mapping = {} |
122 self.mapping_idx = {} |
122 self.mapping_idx = {} |
123 try: |
123 try: |
124 parser = self._get_parser(session) |
124 parser = self._get_parser(cnx) |
125 except (RegistryNotFound, ObjectNotFound): |
125 except (RegistryNotFound, ObjectNotFound): |
126 return # no parser yet, don't go further |
126 return # no parser yet, don't go further |
127 self._load_mapping(session, parser=parser) |
127 self._load_mapping(cnx, parser=parser) |
128 |
128 |
129 def add_schema_config(self, schemacfg, checkonly=False, parser=None): |
129 def add_schema_config(self, schemacfg, checkonly=False, parser=None): |
130 """added CWSourceSchemaConfig, modify mapping accordingly""" |
130 """added CWSourceSchemaConfig, modify mapping accordingly""" |
131 if parser is None: |
131 if parser is None: |
132 parser = self._get_parser(schemacfg._cw) |
132 parser = self._get_parser(schemacfg._cw) |
223 self.exception('error while processing %s: %s', |
223 self.exception('error while processing %s: %s', |
224 url, exc) |
224 url, exc) |
225 error = True |
225 error = True |
226 return error |
226 return error |
227 |
227 |
228 def before_entity_insertion(self, session, lid, etype, eid, sourceparams): |
228 def before_entity_insertion(self, cnx, lid, etype, eid, sourceparams): |
229 """called by the repository when an eid has been attributed for an |
229 """called by the repository when an eid has been attributed for an |
230 entity stored here but the entity has not been inserted in the system |
230 entity stored here but the entity has not been inserted in the system |
231 table yet. |
231 table yet. |
232 |
232 |
233 This method must return the an Entity instance representation of this |
233 This method must return the an Entity instance representation of this |
234 entity. |
234 entity. |
235 """ |
235 """ |
236 entity = super(DataFeedSource, self).before_entity_insertion( |
236 entity = super(DataFeedSource, self).before_entity_insertion( |
237 session, lid, etype, eid, sourceparams) |
237 cnx, lid, etype, eid, sourceparams) |
238 entity.cw_edited['cwuri'] = lid.decode('utf-8') |
238 entity.cw_edited['cwuri'] = lid.decode('utf-8') |
239 entity.cw_edited.set_defaults() |
239 entity.cw_edited.set_defaults() |
240 sourceparams['parser'].before_entity_copy(entity, sourceparams) |
240 sourceparams['parser'].before_entity_copy(entity, sourceparams) |
241 return entity |
241 return entity |
242 |
242 |
243 def after_entity_insertion(self, session, lid, entity, sourceparams): |
243 def after_entity_insertion(self, cnx, lid, entity, sourceparams): |
244 """called by the repository after an entity stored here has been |
244 """called by the repository after an entity stored here has been |
245 inserted in the system table. |
245 inserted in the system table. |
246 """ |
246 """ |
247 relations = preprocess_inlined_relations(session, entity) |
247 relations = preprocess_inlined_relations(cnx, entity) |
248 if session.is_hook_category_activated('integrity'): |
248 if cnx.is_hook_category_activated('integrity'): |
249 entity.cw_edited.check(creation=True) |
249 entity.cw_edited.check(creation=True) |
250 self.repo.system_source.add_entity(session, entity) |
250 self.repo.system_source.add_entity(cnx, entity) |
251 entity.cw_edited.saved = entity._cw_is_saved = True |
251 entity.cw_edited.saved = entity._cw_is_saved = True |
252 sourceparams['parser'].after_entity_copy(entity, sourceparams) |
252 sourceparams['parser'].after_entity_copy(entity, sourceparams) |
253 # call hooks for inlined relations |
253 # call hooks for inlined relations |
254 call_hooks = self.repo.hm.call_hooks |
254 call_hooks = self.repo.hm.call_hooks |
255 if self.should_call_hooks: |
255 if self.should_call_hooks: |
256 for attr, value in relations: |
256 for attr, value in relations: |
257 call_hooks('before_add_relation', session, |
257 call_hooks('before_add_relation', cnx, |
258 eidfrom=entity.eid, rtype=attr, eidto=value) |
258 eidfrom=entity.eid, rtype=attr, eidto=value) |
259 call_hooks('after_add_relation', session, |
259 call_hooks('after_add_relation', cnx, |
260 eidfrom=entity.eid, rtype=attr, eidto=value) |
260 eidfrom=entity.eid, rtype=attr, eidto=value) |
261 |
261 |
262 def source_cwuris(self, cnx): |
262 def source_cwuris(self, cnx): |
263 sql = ('SELECT extid, eid, type FROM entities, cw_source_relation ' |
263 sql = ('SELECT extid, eid, type FROM entities, cw_source_relation ' |
264 'WHERE entities.eid=cw_source_relation.eid_from ' |
264 'WHERE entities.eid=cw_source_relation.eid_from ' |
265 'AND cw_source_relation.eid_to=%s' % self.eid) |
265 'AND cw_source_relation.eid_to=%s' % self.eid) |
266 return dict((b64decode(uri), (eid, type)) |
266 return dict((b64decode(uri), (eid, type)) |
267 for uri, eid, type in cnx.system_sql(sql).fetchall()) |
267 for uri, eid, type in cnx.system_sql(sql).fetchall()) |
268 |
268 |
269 def init_import_log(self, session, **kwargs): |
269 def init_import_log(self, cnx, **kwargs): |
270 dataimport = session.create_entity('CWDataImport', cw_import_of=self, |
270 dataimport = cnx.create_entity('CWDataImport', cw_import_of=self, |
271 start_timestamp=datetime.utcnow(), |
271 start_timestamp=datetime.utcnow(), |
272 **kwargs) |
272 **kwargs) |
273 dataimport.init() |
273 dataimport.init() |
274 return dataimport |
274 return dataimport |
275 |
275 |
276 |
276 |
277 class DataFeedParser(AppObject): |
277 class DataFeedParser(AppObject): |
278 __registry__ = 'parsers' |
278 __registry__ = 'parsers' |
279 |
279 |
280 def __init__(self, session, source, sourceuris=None, import_log=None, **kwargs): |
280 def __init__(self, cnx, source, sourceuris=None, import_log=None, **kwargs): |
281 super(DataFeedParser, self).__init__(session, **kwargs) |
281 super(DataFeedParser, self).__init__(cnx, **kwargs) |
282 self.source = source |
282 self.source = source |
283 self.sourceuris = sourceuris |
283 self.sourceuris = sourceuris |
284 self.import_log = import_log |
284 self.import_log = import_log |
285 self.stats = {'created': set(), 'updated': set(), 'checked': set()} |
285 self.stats = {'created': set(), 'updated': set(), 'checked': set()} |
286 |
286 |
303 |
303 |
304 def extid2entity(self, uri, etype, **sourceparams): |
304 def extid2entity(self, uri, etype, **sourceparams): |
305 """return an entity for the given uri. May return None if it should be |
305 """return an entity for the given uri. May return None if it should be |
306 skipped |
306 skipped |
307 """ |
307 """ |
308 session = self._cw |
308 cnx = self._cw |
309 # if cwsource is specified and repository has a source with the same |
309 # if cwsource is specified and repository has a source with the same |
310 # name, call extid2eid on that source so entity will be properly seen as |
310 # name, call extid2eid on that source so entity will be properly seen as |
311 # coming from this source |
311 # coming from this source |
312 source_uri = sourceparams.pop('cwsource', None) |
312 source_uri = sourceparams.pop('cwsource', None) |
313 if source_uri is not None and source_uri != 'system': |
313 if source_uri is not None and source_uri != 'system': |
314 source = session.repo.sources_by_uri.get(source_uri, self.source) |
314 source = cnx.repo.sources_by_uri.get(source_uri, self.source) |
315 else: |
315 else: |
316 source = self.source |
316 source = self.source |
317 sourceparams['parser'] = self |
317 sourceparams['parser'] = self |
318 if isinstance(uri, unicode): |
318 if isinstance(uri, unicode): |
319 uri = uri.encode('utf-8') |
319 uri = uri.encode('utf-8') |
320 try: |
320 try: |
321 eid = session.repo.extid2eid(source, str(uri), etype, session, |
321 eid = cnx.repo.extid2eid(source, str(uri), etype, cnx, |
322 sourceparams=sourceparams) |
322 sourceparams=sourceparams) |
323 except ValidationError as ex: |
323 except ValidationError as ex: |
324 # XXX use critical so they are seen during tests. Should consider |
324 # XXX use critical so they are seen during tests. Should consider |
325 # raise_on_error instead? |
325 # raise_on_error instead? |
326 self.source.critical('error while creating %s: %s', etype, ex) |
326 self.source.critical('error while creating %s: %s', etype, ex) |
369 is actually deleted. Always return True by default, put more sensible |
369 is actually deleted. Always return True by default, put more sensible |
370 stuff in sub-classes. |
370 stuff in sub-classes. |
371 """ |
371 """ |
372 return True |
372 return True |
373 |
373 |
374 def handle_deletion(self, config, session, myuris): |
374 def handle_deletion(self, config, cnx, myuris): |
375 if config['delete-entities'] and myuris: |
375 if config['delete-entities'] and myuris: |
376 byetype = {} |
376 byetype = {} |
377 for extid, (eid, etype) in myuris.iteritems(): |
377 for extid, (eid, etype) in myuris.iteritems(): |
378 if self.is_deleted(extid, etype, eid): |
378 if self.is_deleted(extid, etype, eid): |
379 byetype.setdefault(etype, []).append(str(eid)) |
379 byetype.setdefault(etype, []).append(str(eid)) |
380 for etype, eids in byetype.iteritems(): |
380 for etype, eids in byetype.iteritems(): |
381 self.warning('delete %s %s entities', len(eids), etype) |
381 self.warning('delete %s %s entities', len(eids), etype) |
382 session.set_cnxset() |
382 cnx.execute('DELETE %s X WHERE X eid IN (%s)' |
383 session.execute('DELETE %s X WHERE X eid IN (%s)' |
383 % (etype, ','.join(eids))) |
384 % (etype, ','.join(eids))) |
384 cnx.commit() |
385 session.commit() |
|
386 |
385 |
387 def update_if_necessary(self, entity, attrs): |
386 def update_if_necessary(self, entity, attrs): |
388 entity.complete(tuple(attrs)) |
387 entity.complete(tuple(attrs)) |
389 # check modification date and compare attribute values to only update |
388 # check modification date and compare attribute values to only update |
390 # what's actually needed |
389 # what's actually needed |