25 from datetime import datetime, timedelta |
25 from datetime import datetime, timedelta |
26 from base64 import b64decode |
26 from base64 import b64decode |
27 from cookielib import CookieJar |
27 from cookielib import CookieJar |
28 |
28 |
29 from lxml import etree |
29 from lxml import etree |
|
30 from logilab.mtconverter import xml_escape |
30 |
31 |
31 from cubicweb import RegistryNotFound, ObjectNotFound, ValidationError, UnknownEid |
32 from cubicweb import RegistryNotFound, ObjectNotFound, ValidationError, UnknownEid |
32 from cubicweb.server.sources import AbstractSource |
33 from cubicweb.server.sources import AbstractSource |
33 from cubicweb.appobject import AppObject |
34 from cubicweb.appobject import AppObject |
34 |
35 |
69 'default': True, |
70 'default': True, |
70 'help': ('Should already imported entities not found anymore on the ' |
71 'help': ('Should already imported entities not found anymore on the ' |
71 'external source be deleted?'), |
72 'external source be deleted?'), |
72 'group': 'datafeed-source', 'level': 2, |
73 'group': 'datafeed-source', 'level': 2, |
73 }), |
74 }), |
74 |
75 ('logs-lifetime', |
|
76 {'type': 'time', |
|
77 'default': '10d', |
|
78 'help': ('Time before logs from datafeed imports are deleted.'), |
|
79 'group': 'datafeed-source', 'level': 2, |
|
80 }), |
75 ) |
81 ) |
76 def __init__(self, repo, source_config, eid=None): |
82 def __init__(self, repo, source_config, eid=None): |
77 AbstractSource.__init__(self, repo, source_config, eid) |
83 AbstractSource.__init__(self, repo, source_config, eid) |
78 self.update_config(None, self.check_conf_dict(eid, source_config, |
84 self.update_config(None, self.check_conf_dict(eid, source_config, |
79 fail_if_unknown=False)) |
85 fail_if_unknown=False)) |
186 def _pull_data(self, session, force=False, raise_on_error=False): |
192 def _pull_data(self, session, force=False, raise_on_error=False): |
187 if self.config['delete-entities']: |
193 if self.config['delete-entities']: |
188 myuris = self.source_cwuris(session) |
194 myuris = self.source_cwuris(session) |
189 else: |
195 else: |
190 myuris = None |
196 myuris = None |
191 parser = self._get_parser(session, sourceuris=myuris) |
197 importlog = self.init_import_log(session) |
|
198 parser = self._get_parser(session, sourceuris=myuris, import_log=importlog) |
192 if self.process_urls(parser, self.urls, raise_on_error): |
199 if self.process_urls(parser, self.urls, raise_on_error): |
193 self.warning("some error occured, don't attempt to delete entities") |
200 self.warning("some error occured, don't attempt to delete entities") |
194 elif self.config['delete-entities'] and myuris: |
201 elif self.config['delete-entities'] and myuris: |
195 byetype = {} |
202 byetype = {} |
196 for eid, etype in myuris.values(): |
203 for eid, etype in myuris.values(): |
198 self.error('delete %s entities %s', self.uri, byetype) |
205 self.error('delete %s entities %s', self.uri, byetype) |
199 for etype, eids in byetype.iteritems(): |
206 for etype, eids in byetype.iteritems(): |
200 session.execute('DELETE %s X WHERE X eid IN (%s)' |
207 session.execute('DELETE %s X WHERE X eid IN (%s)' |
201 % (etype, ','.join(eids))) |
208 % (etype, ','.join(eids))) |
202 self.update_latest_retrieval(session) |
209 self.update_latest_retrieval(session) |
203 return parser.stats |
210 stats = parser.stats |
|
211 if stats.get('created'): |
|
212 importlog.record_info('added %s entities' % len(stats['created'])) |
|
213 if stats.get('updated'): |
|
214 importlog.record_info('updated %s entities' % len(stats['updated'])) |
|
215 importlog.write_log(session, end_timestamp=self.latest_retrieval) |
|
216 return stats |
204 |
217 |
205 def process_urls(self, parser, urls, raise_on_error=False): |
218 def process_urls(self, parser, urls, raise_on_error=False): |
206 error = False |
219 error = False |
207 for url in urls: |
220 for url in urls: |
208 self.info('pulling data from %s', url) |
221 self.info('pulling data from %s', url) |
253 'WHERE entities.eid=cw_source_relation.eid_from ' |
266 'WHERE entities.eid=cw_source_relation.eid_from ' |
254 'AND cw_source_relation.eid_to=%s' % self.eid) |
267 'AND cw_source_relation.eid_to=%s' % self.eid) |
255 return dict((b64decode(uri), (eid, type)) |
268 return dict((b64decode(uri), (eid, type)) |
256 for uri, eid, type in session.system_sql(sql)) |
269 for uri, eid, type in session.system_sql(sql)) |
257 |
270 |
|
def init_import_log(self, session, **kwargs):
    """Create and return a ``CWDataImport`` entity recording one import run.

    The new entity is linked to this source through ``cw_import_of``.
    ``start_timestamp`` defaults to ``datetime.utcnow()``, a naive datetime
    in UTC.

    :param session: object exposing ``create_entity(etype, **attrs)``.
    :param kwargs: extra attributes forwarded to ``session.create_entity``.
        A ``start_timestamp`` supplied here overrides the default. Previously
        it raised ``TypeError`` for a duplicated keyword argument.
    :return: the object returned by ``session.create_entity``.
    """
    # setdefault lets callers provide their own start time without the
    # default keyword colliding with the one carried in **kwargs.
    kwargs.setdefault('start_timestamp', datetime.utcnow())
    return session.create_entity('CWDataImport', cw_import_of=self, **kwargs)
258 |
276 |
259 class DataFeedParser(AppObject): |
277 class DataFeedParser(AppObject): |
260 __registry__ = 'parsers' |
278 __registry__ = 'parsers' |
261 |
279 |
def __init__(self, session, source, sourceuris=None, import_log=None, **kwargs):
    """Set up a parser bound to ``source`` and running within ``session``.

    :param sourceuris: URIs already known for the source, as handed over by
        the caller, or None.
    :param import_log: optional import-log object, kept as ``self.import_log``.
    Any remaining keyword arguments are passed to the parent initializer.
    """
    super(DataFeedParser, self).__init__(session, **kwargs)
    self.source, self.sourceuris, self.import_log = source, sourceuris, import_log
    # Two empty sets, one collecting created entities and one collecting
    # updated entities.
    self.stats = dict(created=set(), updated=set())
268 |
287 |
269 def add_schema_config(self, schemacfg, checkonly=False): |
288 def add_schema_config(self, schemacfg, checkonly=False): |
270 """added CWSourceSchemaConfig, modify mapping accordingly""" |
289 """added CWSourceSchemaConfig, modify mapping accordingly""" |