Record a log of datafeed source imports (closes #2026097)
authorJulien Cristau <julien.cristau@logilab.fr>
Fri, 21 Oct 2011 14:32:37 +0200
changeset 7995 9a9f35ef418c
parent 7994 af3fb709c061
child 7996 8de58d2674d6
Record a log of datafeed source imports (closes #2026097) Formatting, css and js stolen from narval.
devtools/fill.py
entities/sources.py
entity.py
hooks/__init__.py
misc/migration/3.14.0_Any.py
schemas/base.py
server/sources/datafeed.py
server/test/unittest_msplanner.py
sobjects/parsers.py
test/unittest_schema.py
web/data/cubicweb.log.css
web/data/cubicweb.log.js
web/views/cwsources.py
--- a/devtools/fill.py	Fri Oct 21 14:32:37 2011 +0200
+++ b/devtools/fill.py	Fri Oct 21 14:32:37 2011 +0200
@@ -20,12 +20,14 @@
 
 __docformat__ = "restructuredtext en"
 
+import logging
 from random import randint, choice
 from copy import deepcopy
 from datetime import datetime, date, time, timedelta
 from decimal import Decimal
 
 from logilab.common import attrdict
+from logilab.mtconverter import xml_escape
 from yams.constraints import (SizeConstraint, StaticVocabularyConstraint,
                               IntervalBoundConstraint, BoundaryConstraint,
                               Attribute, actual_value)
@@ -238,6 +240,14 @@
         # raise exception
         return u'text/plain'
 
+    def generate_CWDataImport_log(self, entity, index, **kwargs):
+        # content_format attribute of EmailPart has no vocabulary constraint, we
+        # need this method else stupid values will be set which make mtconverter
+        # raise exception
+        logs =  [u'%s\t%s\t%s\t%s<br/>' % (logging.ERROR, 'http://url.com?arg1=hop&arg2=hip',
+                                           1, xml_escape('hjoio&oio"'))]
+        return u'<br/>'.join(logs)
+
 
 class autoextend(type):
     def __new__(mcs, name, bases, classdict):
--- a/entities/sources.py	Fri Oct 21 14:32:37 2011 +0200
+++ b/entities/sources.py	Fri Oct 21 14:32:37 2011 +0200
@@ -21,9 +21,11 @@
 
 import re
 from socket import gethostname
+import logging
 
 from logilab.common.textutils import text_to_dict
 from logilab.common.configuration import OptionError
+from logilab.mtconverter import xml_escape
 
 from cubicweb import ValidationError
 from cubicweb.entities import AnyEntity, fetch_config
@@ -131,3 +133,52 @@
     @property
     def cwsource(self):
         return self.cw_for_source[0]
+
+
+class CWDataImport(AnyEntity):
+    __regid__ = 'CWDataImport'
+
+    def __init__(self, *args, **kwargs):
+        super(CWDataImport, self).__init__(*args, **kwargs)
+        self._logs = []
+
+    def dc_title(self):
+        return '%s [%s]' % (self.printable_value('start_timestamp'),
+                            self.printable_value('status'))
+
+    @property
+    def cwsource(self):
+        return self.cw_import_of[0]
+
+    def record_debug(self, msg, path=None, line=None):
+        self._log(logging.DEBUG, msg, path, line)
+        self.debug(msg)
+
+    def record_info(self, msg, path=None, line=None):
+        self._log(logging.INFO, msg, path, line)
+        self.info(msg)
+
+    def record_warning(self, msg, path=None, line=None):
+        self._log(logging.WARNING, msg, path, line)
+        self.warning(msg)
+
+    def record_error(self, msg, path=None, line=None):
+        self._status = u'failed'
+        self._log(logging.ERROR, msg, path, line)
+        self.error(msg)
+
+    def record_fatal(self, msg, path=None, line=None):
+        self._status = u'failed'
+        self._log(logging.FATAL, msg, path, line)
+        self.fatal(msg)
+
+    def _log(self, severity, msg, path=None, line=None):
+        encodedmsg =  u'%s\t%s\t%s\t%s<br/>' % (severity, path or u'',
+                                                line or u'', xml_escape(msg))
+        self._logs.append(encodedmsg)
+
+    def write_log(self, session, **kwargs):
+        if 'status' not in kwargs:
+            kwargs['status'] = getattr(self, '_status', u'success')
+        self.set_attributes(log=u'<br/>'.join(self._logs), **kwargs)
+        self._logs = []
--- a/entity.py	Fri Oct 21 14:32:37 2011 +0200
+++ b/entity.py	Fri Oct 21 14:32:37 2011 +0200
@@ -835,7 +835,7 @@
             var = varmaker.next()
             rql.append('%s %s %s' % (V, attr, var))
             selected.append((attr, var))
-        # +1 since this doen't include the main variable
+        # +1 since this doesn't include the main variable
         lastattr = len(selected) + 1
         # don't fetch extra relation if attributes specified or of the entity is
         # coming from an external source (may lead to error)
--- a/hooks/__init__.py	Fri Oct 21 14:32:37 2011 +0200
+++ b/hooks/__init__.py	Fri Oct 21 14:32:37 2011 +0200
@@ -59,13 +59,23 @@
                     continue
                 session = repo.internal_session(safe=True)
                 try:
-                    stats = source.pull_data(session)
-                    if stats.get('created'):
-                        source.info('added %s entities', len(stats['created']))
-                    if stats.get('updated'):
-                        source.info('updated %s entities', len(stats['updated']))
+                    source.pull_data(session)
                 except Exception, exc:
                     session.exception('while trying to update feed %s', source)
                 finally:
                     session.close()
         self.repo.looping_task(60, update_feeds, self.repo)
+
+        def expire_dataimports(repo=self.repo):
+            for source in repo.sources_by_eid.itervalues():
+                if (not source.copy_based_source
+                    or not repo.config.source_enabled(source)):
+                    continue
+                session = repo.internal_session()
+                try:
+                    mindate = datetime.now() - timedelta(seconds=source.config['logs-lifetime'])
+                    session.execute('DELETE CWDataImport X WHERE X start_timestamp < %(time)s', {'time': mindate})
+                    session.commit()
+                finally:
+                    session.close()
+        self.repo.looping_task(60*60*24, expire_dataimports, self.repo)
--- a/misc/migration/3.14.0_Any.py	Fri Oct 21 14:32:37 2011 +0200
+++ b/misc/migration/3.14.0_Any.py	Fri Oct 21 14:32:37 2011 +0200
@@ -1,2 +1,3 @@
 config['rql-cache-size'] = config['rql-cache-size'] * 10
 
+add_entity_type('CWDataImport')
--- a/schemas/base.py	Fri Oct 21 14:32:37 2011 +0200
+++ b/schemas/base.py	Fri Oct 21 14:32:37 2011 +0200
@@ -305,6 +305,24 @@
     cardinality = '1*'
     composite = 'object'
 
+
+class CWDataImport(EntityType):
+    __permissions__ = ENTITY_MANAGERS_PERMISSIONS
+    start_timestamp = TZDatetime()
+    end_timestamp = TZDatetime()
+    log = String()
+    status = String(required=True, internationalizable=True, indexed=True,
+                    default='in progress',
+                    vocabulary=[_('in progress'), _('success'), _('failed')])
+
+class cw_import_of(RelationDefinition):
+    __permissions__ = RELATION_MANAGERS_PERMISSIONS
+    subject = 'CWDataImport'
+    object = 'CWSource'
+    cardinality = '1*'
+    composite = 'object'
+
+
 class CWSourceSchemaConfig(EntityType):
     __permissions__ = ENTITY_MANAGERS_PERMISSIONS
     cw_for_source = SubjectRelation(
--- a/server/sources/datafeed.py	Fri Oct 21 14:32:37 2011 +0200
+++ b/server/sources/datafeed.py	Fri Oct 21 14:32:37 2011 +0200
@@ -27,6 +27,7 @@
 from cookielib import CookieJar
 
 from lxml import etree
+from logilab.mtconverter import xml_escape
 
 from cubicweb import RegistryNotFound, ObjectNotFound, ValidationError, UnknownEid
 from cubicweb.server.sources import AbstractSource
@@ -71,7 +72,12 @@
                    'external source be deleted?'),
           'group': 'datafeed-source', 'level': 2,
           }),
-
+        ('logs-lifetime',
+         {'type': 'time',
+          'default': '10d',
+          'help': ('Time before logs from datafeed imports are deleted.'),
+          'group': 'datafeed-source', 'level': 2,
+          }),
         )
     def __init__(self, repo, source_config, eid=None):
         AbstractSource.__init__(self, repo, source_config, eid)
@@ -188,7 +194,8 @@
             myuris = self.source_cwuris(session)
         else:
             myuris = None
-        parser = self._get_parser(session, sourceuris=myuris)
+        importlog = self.init_import_log(session)
+        parser = self._get_parser(session, sourceuris=myuris, import_log=importlog)
         if self.process_urls(parser, self.urls, raise_on_error):
             self.warning("some error occured, don't attempt to delete entities")
         elif self.config['delete-entities'] and myuris:
@@ -200,7 +207,13 @@
                 session.execute('DELETE %s X WHERE X eid IN (%s)'
                                 % (etype, ','.join(eids)))
         self.update_latest_retrieval(session)
-        return parser.stats
+        stats = parser.stats
+        if stats.get('created'):
+            importlog.record_info('added %s entities' % len(stats['created']))
+        if stats.get('updated'):
+            importlog.record_info('updated %s entities' % len(stats['updated']))
+        importlog.write_log(session, end_timestamp=self.latest_retrieval)
+        return stats
 
     def process_urls(self, parser, urls, raise_on_error=False):
         error = False
@@ -255,14 +268,20 @@
         return dict((b64decode(uri), (eid, type))
                     for uri, eid, type in session.system_sql(sql))
 
+    def init_import_log(self, session, **kwargs):
+        dataimport = session.create_entity('CWDataImport', cw_import_of=self,
+                                           start_timestamp=datetime.utcnow(),
+                                           **kwargs)
+        return dataimport
 
 class DataFeedParser(AppObject):
     __registry__ = 'parsers'
 
-    def __init__(self, session, source, sourceuris=None, **kwargs):
+    def __init__(self, session, source, sourceuris=None, import_log=None, **kwargs):
         super(DataFeedParser, self).__init__(session, **kwargs)
         self.source = source
         self.sourceuris = sourceuris
+        self.import_log = import_log
         self.stats = {'created': set(),
                       'updated': set()}
 
--- a/server/test/unittest_msplanner.py	Fri Oct 21 14:32:37 2011 +0200
+++ b/server/test/unittest_msplanner.py	Fri Oct 21 14:32:37 2011 +0200
@@ -64,7 +64,7 @@
 
 X_ALL_SOLS = sorted([{'X': 'Affaire'}, {'X': 'BaseTransition'}, {'X': 'Basket'},
                      {'X': 'Bookmark'}, {'X': 'CWAttribute'}, {'X': 'CWCache'},
-                     {'X': 'CWConstraint'}, {'X': 'CWConstraintType'}, {'X': 'CWEType'},
+                     {'X': 'CWConstraint'}, {'X': 'CWConstraintType'}, {'X': 'CWDataImport'}, {'X': 'CWEType'},
                      {'X': 'CWGroup'}, {'X': 'CWPermission'}, {'X': 'CWProperty'},
                      {'X': 'CWRType'}, {'X': 'CWRelation'},
                      {'X': 'CWSource'}, {'X': 'CWSourceHostConfig'}, {'X': 'CWSourceSchemaConfig'},
@@ -907,6 +907,7 @@
         ALL_SOLS = X_ALL_SOLS[:]
         ALL_SOLS.remove({'X': 'CWSourceHostConfig'}) # not authorized
         ALL_SOLS.remove({'X': 'CWSourceSchemaConfig'}) # not authorized
+        ALL_SOLS.remove({'X': 'CWDataImport'}) # not authorized
         self._test('Any MAX(X)',
                    [('FetchStep', [('Any E WHERE E type "X", E is Note', [{'E': 'Note'}])],
                      [self.cards, self.system],  None, {'E': 'table1.C0'}, []),
@@ -957,7 +958,7 @@
         ueid = self.session.user.eid
         X_ET_ALL_SOLS = []
         for s in X_ALL_SOLS:
-            if s in ({'X': 'CWSourceHostConfig'}, {'X': 'CWSourceSchemaConfig'}):
+            if s in ({'X': 'CWSourceHostConfig'}, {'X': 'CWSourceSchemaConfig'}, {'X': 'CWDataImport'}):
                 continue # not authorized
             ets = {'ET': 'CWEType'}
             ets.update(s)
@@ -990,7 +991,8 @@
                                         [{'X': 'BaseTransition', 'ET': 'CWEType'},
                                          {'X': 'Bookmark', 'ET': 'CWEType'}, {'X': 'CWAttribute', 'ET': 'CWEType'},
                                          {'X': 'CWCache', 'ET': 'CWEType'}, {'X': 'CWConstraint', 'ET': 'CWEType'},
-                                         {'X': 'CWConstraintType', 'ET': 'CWEType'}, {'X': 'CWEType', 'ET': 'CWEType'},
+                                         {'X': 'CWConstraintType', 'ET': 'CWEType'},
+                                         {'X': 'CWEType', 'ET': 'CWEType'},
                                          {'X': 'CWGroup', 'ET': 'CWEType'}, {'X': 'CWPermission', 'ET': 'CWEType'},
                                          {'X': 'CWProperty', 'ET': 'CWEType'}, {'X': 'CWRType', 'ET': 'CWEType'},
                                          {'X': 'CWSource', 'ET': 'CWEType'},
@@ -2661,7 +2663,7 @@
                      None, {'X': 'table0.C0'}, []),
                     ('UnionStep', None, None,
                      [('OneFetchStep',
-                       [(u'Any X WHERE X owned_by U, U login "anon", U is CWUser, X is IN(Affaire, BaseTransition, Basket, Bookmark, CWAttribute, CWCache, CWConstraint, CWConstraintType, CWEType, CWGroup, CWPermission, CWProperty, CWRType, CWRelation, CWSource, CWSourceHostConfig, CWSourceSchemaConfig, CWUniqueTogetherConstraint, CWUser, Division, Email, EmailAddress, EmailPart, EmailThread, ExternalUri, File, Folder, Personne, RQLExpression, Societe, SubDivision, SubWorkflowExitPoint, Tag, TrInfo, Transition, Workflow, WorkflowTransition)',
+                       [(u'Any X WHERE X owned_by U, U login "anon", U is CWUser, X is IN(Affaire, BaseTransition, Basket, Bookmark, CWAttribute, CWCache, CWConstraint, CWConstraintType, CWDataImport, CWEType, CWGroup, CWPermission, CWProperty, CWRType, CWRelation, CWSource, CWSourceHostConfig, CWSourceSchemaConfig, CWUniqueTogetherConstraint, CWUser, Division, Email, EmailAddress, EmailPart, EmailThread, ExternalUri, File, Folder, Personne, RQLExpression, Societe, SubDivision, SubWorkflowExitPoint, Tag, TrInfo, Transition, Workflow, WorkflowTransition)',
                          [{'U': 'CWUser', 'X': 'Affaire'},
                           {'U': 'CWUser', 'X': 'BaseTransition'},
                           {'U': 'CWUser', 'X': 'Basket'},
@@ -2670,6 +2672,7 @@
                           {'U': 'CWUser', 'X': 'CWCache'},
                           {'U': 'CWUser', 'X': 'CWConstraint'},
                           {'U': 'CWUser', 'X': 'CWConstraintType'},
+                          {'U': 'CWUser', 'X': 'CWDataImport'},
                           {'U': 'CWUser', 'X': 'CWEType'},
                           {'U': 'CWUser', 'X': 'CWGroup'},
                           {'U': 'CWUser', 'X': 'CWPermission'},
--- a/sobjects/parsers.py	Fri Oct 21 14:32:37 2011 +0200
+++ b/sobjects/parsers.py	Fri Oct 21 14:32:37 2011 +0200
@@ -233,13 +233,13 @@
             try:
                 related_items = rels[role][rtype]
             except KeyError:
-                self.source.error('relation %s-%s not found in xml export of %s',
-                                  rtype, role, etype)
+                self.import_log.record_error('relation %s-%s not found in xml export of %s'
+                                             % (rtype, role, etype))
                 continue
             try:
                 linker = self.select_linker(action, rtype, role, entity)
             except RegistryException:
-                self.source.error('no linker for action %s', action)
+                self.import_log.record_error('no linker for action %s' % action)
             else:
                 linker.link_items(related_items, rules)
 
@@ -430,15 +430,15 @@
         def issubset(x,y):
             return all(z in y for z in x)
         eids = [] # local eids
-        source = self.parser.source
+        log = self.parser.import_log
         for item, rels in others:
             if item['cwtype'] != ttype:
                 continue
             if not issubset(searchattrs, item):
                 item, rels = self.parser.complete_item(item, rels)
                 if not issubset(searchattrs, item):
-                    source.error('missing attribute, got %s expected keys %s',
-                                 item, searchattrs)
+                    log.record_error('missing attribute, got %s expected keys %s'
+                                     % (item, searchattrs))
                     continue
             # XXX str() needed with python < 2.6
             kwargs = dict((str(attr), item[attr]) for attr in searchattrs)
@@ -449,11 +449,11 @@
                 entity = self._cw.create_entity(item['cwtype'], **kwargs)
             else:
                 if len(targets) > 1:
-                    source.error('ambiguous link: found %s entity %s with attributes %s',
-                                 len(targets), item['cwtype'], kwargs)
+                    log.record_error('ambiguous link: found %s entity %s with attributes %s'
+                                     % (len(targets), item['cwtype'], kwargs))
                 else:
-                    source.error('can not find %s entity with attributes %s',
-                                 item['cwtype'], kwargs)
+                    log.record_error('can not find %s entity with attributes %s'
+                                     % (item['cwtype'], kwargs))
                 continue
             eids.append(entity.eid)
             self.parser.process_relations(entity, rels)
--- a/test/unittest_schema.py	Fri Oct 21 14:32:37 2011 +0200
+++ b/test/unittest_schema.py	Fri Oct 21 14:32:37 2011 +0200
@@ -161,8 +161,8 @@
         entities = sorted([str(e) for e in schema.entities()])
         expected_entities = ['BaseTransition', 'BigInt', 'Bookmark', 'Boolean', 'Bytes', 'Card',
                              'Date', 'Datetime', 'Decimal',
-                             'CWCache', 'CWConstraint', 'CWConstraintType', 'CWEType',
-                             'CWAttribute', 'CWGroup', 'EmailAddress', 'CWRelation',
+                             'CWCache', 'CWConstraint', 'CWConstraintType', 'CWDataImport',
+                             'CWEType', 'CWAttribute', 'CWGroup', 'EmailAddress', 'CWRelation',
                              'CWPermission', 'CWProperty', 'CWRType',
                              'CWSource', 'CWSourceHostConfig', 'CWSourceSchemaConfig',
                              'CWUniqueTogetherConstraint', 'CWUser',
@@ -183,12 +183,12 @@
                               'constrained_by', 'constraint_of',
                               'content', 'content_format',
                               'created_by', 'creation_date', 'cstrtype', 'custom_workflow',
-                              'cwuri', 'cw_for_source', 'cw_host_config_of', 'cw_schema', 'cw_source',
+                              'cwuri', 'cw_for_source', 'cw_import_of', 'cw_host_config_of', 'cw_schema', 'cw_source',
 
                               'data', 'data_encoding', 'data_format', 'data_name', 'default_workflow', 'defaultval', 'delete_permission',
                               'description', 'description_format', 'destination_state',
 
-                              'ecrit_par', 'eid', 'evaluee', 'expression', 'exprtype',
+                              'ecrit_par', 'eid', 'end_timestamp', 'evaluee', 'expression', 'exprtype',
 
                               'fabrique_par', 'final', 'firstname', 'for_user', 'fournit',
                               'from_entity', 'from_state', 'fulltext_container', 'fulltextindexed',
@@ -196,7 +196,7 @@
                               'identity', 'in_group', 'in_state', 'in_synchronization', 'indexed',
                               'initial_state', 'inlined', 'internationalizable', 'is', 'is_instance_of',
 
-                              'label', 'last_login_time', 'latest_retrieval', 'lieu', 'login',
+                              'label', 'last_login_time', 'latest_retrieval', 'lieu', 'log', 'login',
 
                               'mainvars', 'match_host', 'modification_date',
 
@@ -208,7 +208,7 @@
 
                               'read_permission', 'relation_type', 'relations', 'require_group',
 
-                              'specializes', 'state_of', 'subworkflow', 'subworkflow_exit', 'subworkflow_state', 'surname', 'symmetric', 'synopsis',
+                              'specializes', 'start_timestamp', 'state_of', 'status', 'subworkflow', 'subworkflow_exit', 'subworkflow_state', 'surname', 'symmetric', 'synopsis',
 
                               'tags', 'timestamp', 'title', 'to_entity', 'to_state', 'transition_of', 'travaille', 'type',
 
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/web/data/cubicweb.log.css	Fri Oct 21 14:32:37 2011 +0200
@@ -0,0 +1,118 @@
+/* sample css file for logs
+ *
+ * Copyright (c) 2003-2010 LOGILAB S.A. (Paris, FRANCE).
+ * http://www.logilab.fr/ -- mailto:contact@logilab.fr
+ */
+
+pre.rawtext {
+    overflow: auto;
+    max-width: 110em;
+    padding: 0 0 0 0;
+}
+
+table.listing td.logSeverity {
+    font-weight: bold;
+    padding-left: 0.5em;
+    padding-right: 1em;
+}
+
+table.listing pre{
+    color: black;
+}
+
+table.listing .logDebug a{
+    color : #444 ;
+}
+table.listing .logDebug td{
+    color : #444 ;
+    border-color: grey #AAA;
+}
+
+table.listing .logDebug pre{
+    background-color : transparent ;
+    border: none;
+}
+
+table.listing .logSeverity .internallink {
+    visibility: hidden;
+    color: #FF4500;
+    font-weight: bolder;
+}
+
+table.listing tr:hover .internallink {
+    visibility: visible;
+}
+
+table.listing .internallink:hover {
+    background-color: #FF4500;
+    color: White;
+    font-weight: bolder;
+}
+
+table.listing .logInfo a{
+    color : #240 ;
+}
+
+table.listing .logInfo td{
+    color : #240 ;
+    background-color : #DFD ;
+    border-color: grey #AFA;
+}
+
+table.listing .logInfo pre{
+    background-color : transparent ;
+    border: none;
+}
+
+table.listing .logWarning a{
+    color : #A42 ;
+}
+table.listing .logWarning td{
+    color : #A42 ;
+    background-color : #FFC ;
+    border-color: grey #FA6;
+}
+
+table.listing .logWarning pre{
+    background-color : transparent ;
+    border: none;
+}
+
+table.listing .logError a{
+    color : #A00 ;
+}
+table.listing .logError td{
+    color : #A00 ;
+    background-color : #FDD ;
+    border-color: grey #FAA;
+}
+
+table.listing .logError pre{
+    background-color : transparent ;
+    border: none;
+}
+
+table.listing .logFatal a{
+    color : #00A;
+}
+table.listing .logFatal td{
+    color : #00A;
+    background-color : #DDF ;
+    border-color: grey #AAF;
+}
+
+table.listing .logFatal pre{
+    background-color : transparent ;
+    border: none;
+}
+
+div.validPlan{
+  color: green;
+  text-align: center;
+}
+
+div.invalidPlan{
+  color: red;
+  text-align: center;
+}
+
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/web/data/cubicweb.log.js	Fri Oct 21 14:32:37 2011 +0200
@@ -0,0 +1,13 @@
+// This contains template-specific javascript
+
+function filterLog(domid, thresholdLevel) {
+    var logLevels = ["Debug", "Info", "Warning", "Error", "Fatal"]
+    var action = "hide";
+    for (var idx = 0; idx < logLevels.length; idx++){
+        var level = logLevels[idx];
+        if (level === thresholdLevel){
+            action = "show";
+        }
+        $('#'+domid+' .log' + level)[action]();
+    }
+}
--- a/web/views/cwsources.py	Fri Oct 21 14:32:37 2011 +0200
+++ b/web/views/cwsources.py	Fri Oct 21 14:32:37 2011 +0200
@@ -22,14 +22,21 @@
 __docformat__ = "restructuredtext en"
 _ = unicode
 
+import logging
 from itertools import repeat, chain
+from logilab.mtconverter import xml_escape
+from logilab.common.decorators import cachedproperty
 
-from cubicweb import Unauthorized
-from cubicweb.selectors import is_instance, score_entity, match_user_groups
+from cubicweb import Unauthorized, tags
+from cubicweb.utils import make_uid
+from cubicweb.selectors import (is_instance, score_entity, has_related_entities,
+                                match_user_groups, match_kwargs, match_view)
 from cubicweb.view import EntityView, StartupView
 from cubicweb.schema import META_RTYPES, VIRTUAL_RTYPES, display_name
-from cubicweb.web import uicfg, formwidgets as wdgs
-from cubicweb.web.views import tabs, actions, ibreadcrumbs, tableview, add_etype_button
+from cubicweb.web import uicfg, formwidgets as wdgs, facet
+from cubicweb.web.views import add_etype_button
+from cubicweb.web.views import (tabs, actions, ibreadcrumbs, navigation,
+                                tableview, pyviews)
 
 
 _abaa = uicfg.actionbox_appearsin_addmenu
@@ -37,6 +44,7 @@
 _abaa.tag_object_of(('CWSourceSchemaConfig', 'cw_schema', '*'), False)
 _abaa.tag_object_of(('CWSourceSchemaConfig', 'cw_for_source', '*'), False)
 _abaa.tag_object_of(('CWSourceSchemaConfig', 'cw_host_config_of', '*'), False)
+_abaa.tag_object_of(('CWDataImport', 'cw_import_of', '*'), False)
 
 _afs = uicfg.autoform_section
 _afs.tag_object_of(('*', 'cw_for_source', 'CWSource'), 'main', 'hidden')
@@ -62,13 +70,13 @@
 
 class CWSourcePrimaryView(tabs.TabbedPrimaryView):
     __select__ = is_instance('CWSource')
-    tabs = [_('cwsource-main'), _('cwsource-mapping')]
+    tabs = [_('cwsource-main'), _('cwsource-mapping'), _('cwsource-imports')]
     default_tab = 'cwsource-main'
 
 
 class CWSourceMainTab(tabs.PrimaryTab):
     __regid__ = 'cwsource-main'
-    __select__ = tabs.PrimaryTab.__select__ & is_instance('CWSource')
+    __select__ = is_instance('CWSource')
 
     def render_entity_attributes(self, entity):
         super(CWSourceMainTab, self).render_entity_attributes(entity)
@@ -93,7 +101,7 @@
 
 class CWSourceMappingTab(EntityView):
     __regid__ = 'cwsource-mapping'
-    __select__ = (tabs.PrimaryTab.__select__ & is_instance('CWSource')
+    __select__ = (is_instance('CWSource')
                   & match_user_groups('managers')
                   & score_entity(lambda x:x.type in MAPPED_SOURCE_TYPES))
 
@@ -248,6 +256,30 @@
     'pyrorql': PyroRQLMappingChecker,
     }
 
+
+class CWSourceImportsTab(EntityView):
+    __regid__ = 'cwsource-imports'
+    __select__ = (is_instance('CWSource')
+                  & has_related_entities('cw_import_of', 'object'))
+
+    def entity_call(self, entity):
+        rset = self._cw.execute('Any X, XST, XET, XS ORDERBY XST DESC WHERE '
+                                'X cw_import_of S, S eid %(s)s, X status XS, '
+                                'X start_timestamp XST, X end_timestamp XET',
+                                {'s': entity.eid})
+        self._cw.view('cw.imports-table', rset, w=self.w)
+
+
+class CWImportsTable(tableview.EntityTableView):
+    __regid__ = 'cw.imports-table'
+    __select__ = is_instance('CWDataImport')
+    columns = ['import', 'start_timestamp', 'end_timestamp']
+    column_renderers = {'import': tableview.MainEntityColRenderer()}
+    layout_args = {'display_filter': 'top'}
+
+
+
+
 # sources management view ######################################################
 
 class ManageSourcesAction(actions.ManagersAction):
@@ -272,12 +304,221 @@
 class CWSourcesTable(tableview.EntityTableView):
     __regid__ = 'cw.sources-table'
     __select__ = is_instance('CWSource')
-    columns = ['source', 'type', 'parser', 'latest_retrieval']
+    columns = ['source', 'type', 'parser', 'latest_retrieval', 'latest_import']
+
+    class LatestImportColRenderer(tableview.EntityTableColRenderer):
+        def render_cell(self, w, rownum):
+            entity = self.entity(rownum)
+            rset = self._cw.execute('Any X,XS,XST ORDERBY XST DESC LIMIT 1 WHERE '
+                                    'X cw_import_of S, S eid %(s)s, X status XS, '
+                                    'X start_timestamp XST', {'s': entity.eid})
+            if rset:
+                self._cw.view('incontext', rset, row=0, w=w)
+            else:
+                w(self.empty_cell_content)
+
+    column_renderers = {
+        'source': tableview.MainEntityColRenderer(),
+        'latest_import': LatestImportColRenderer(header=_('latest import'),
+                                                 sortable=False)
+        }
+
+# datafeed source import #######################################################
+
+REVERSE_SEVERITIES = {
+    logging.DEBUG :   _('DEBUG'),
+    logging.INFO :    _('INFO'),
+    logging.WARNING : _('WARNING'),
+    logging.ERROR :   _('ERROR'),
+    logging.FATAL :   _('FATAL')
+}
+
+
+def log_to_table(req, rawdata):
+    data = []
+    for msg_idx, msg in enumerate(rawdata.split('<br/>')):
+        record = msg.strip()
+        if not record:
+            continue
+        try:
+            severity, url, line, msg = record.split('\t', 3)
+        except ValueError:
+            req.warning('badly formated log %s' % record)
+            url = line = u''
+            severity = logging.DEBUG
+            msg = record
+        data.append( (severity, url, line, msg) )
+    return data
+
+
+class LogTableLayout(tableview.TableLayout):
+    __select__ = match_view('cw.log.table')
+    needs_js = tableview.TableLayout.needs_js + ('cubicweb.log.js',)
+    needs_css = tableview.TableLayout.needs_css + ('cubicweb.log.css',)
+    columns_css = {
+        0: 'logSeverity',
+        1: 'logPath',
+        2: 'logLine',
+        3: 'logMsg',
+        }
+
+    def render_table(self, w, actions, paginate):
+        default_level = self.view.cw_extra_kwargs['default_level']
+        if default_level != 'Debug':
+            self._cw.add_onload('$("select.logFilter").val("%s").change();'
+                           % self._cw.form.get('logLevel', default_level))
+        w(u'\n<form action="#"><fieldset>')
+        w(u'<label>%s</label>' % self._cw._(u'Message threshold'))
+        w(u'<select class="log_filter" onchange="filterLog(\'%s\', this.options[this.selectedIndex].value)">'
+          % self.view.domid)
+        for level in ('Debug', 'Info', 'Warning', 'Error', 'Fatal'):
+            w('<option value="%s">%s</option>' % (level, self._cw._(level)))
+        w(u'</select>')
+        w(u'</fieldset></form>')
+        super(LogTableLayout, self).render_table(w, actions, paginate)
+
+    def table_attributes(self):
+        attrs = super(LogTableLayout, self).table_attributes()
+        attrs['id'] = 'table'+self.view.domid
+        return attrs
+
+    def row_attributes(self, rownum):
+        attrs = super(LogTableLayout, self).row_attributes(rownum)
+        attrs['id'] = 'log_msg_%i' % rownum
+        severityname = REVERSE_SEVERITIES[int(self.view.pyvalue[rownum][0])]
+        attrs['class'] = 'log%s' % severityname.capitalize()
+        return attrs
+
+    def cell_attributes(self, rownum, colnum, colid):
+        attrs = super(LogTableLayout, self).cell_attributes(rownum, colnum, colid)
+        attrs['class'] = self.columns_css[colnum]
+        return attrs
+
+
+class LogTable(pyviews.PyValTableView):
+    __regid__ = 'cw.log.table'
+    headers = [_('severity'), _('url'), _('line'), _('message')]
+
+    @cachedproperty
+    def domid(self):
+        return make_uid('logTable')
 
-    # @tableview.etable_header_title('CWSource_plural', addcount=True)
-    # @tableview.etable_entity_sortvalue()
-    # def source_cell(self, w, entity):
-    #     w(entity.view('incontext'))
+    class SeverityRenderer(pyviews.PyValTableColRenderer):
+        def render_cell(self, w, rownum):
+            severity = self.data[rownum][0]
+            w(u'<a class="internallink" href="javascript:;" title="%(title)s" '
+              u'''onclick="document.location.hash='%(msg_id)s';">&#182;</a>'''
+              u'&#160;%(severity)s' % {
+                'severity': self._cw._(REVERSE_SEVERITIES[int(severity)]),
+                'title': self._cw._('permalink to this message'),
+                'msg_id': 'log_msg_%i' % rownum,
+            })
+        def sortvalue(self, rownum):
+            return int(self.data[rownum][0])
+
+    class URLRenderer(pyviews.PyValTableColRenderer):
+        def render_cell(self, w, rownum):
+            url = self.data[rownum][1]
+            w(url and tags.a(url, href=url) or u'&#160;')
+
+    class LineRenderer(pyviews.PyValTableColRenderer):
+        def render_cell(self, w, rownum):
+            line = self.data[rownum][2]
+            w(line or u'&#160;')
+
+    class MessageRenderer(pyviews.PyValTableColRenderer):
+        snip_over = 7
+        def render_cell(self, w, rownum):
+            msg = self.data[rownum][3]
+            lines = msg.splitlines()
+            if len(lines) <= self.snip_over:
+                w(u'<pre class="rawtext">%s</pre>' % msg)
+            else:
+                # The make_uid argument has no specific meaning here.
+                div_snip_id = make_uid(u'log_snip_')
+                div_full_id = make_uid(u'log_full_')
+                divs_id = (div_snip_id, div_full_id)
+                snip = u'\n'.join((lines[0], lines[1],
+                                   u'  ...',
+                                   u'    %i more lines [double click to expand]' % (len(lines)-4),
+                                   u'  ...',
+                                   lines[-2], lines[-1]))
+                divs = (
+                        (div_snip_id, snip, u'expand', "class='collapsed'"),
+                        (div_full_id, msg,  u'collapse', "class='hidden'")
+                )
+                for div_id, content, button, h_class in divs:
+                    text = self._cw._(button)
+                    js = u"toggleVisibility('%s'); toggleVisibility('%s');" % divs_id
+                    w(u'<div id="%s" %s>' % (div_id, h_class))
+                    w(u'<pre class="raw_test" ondblclick="javascript: %s" '
+                      u'title="%s" style="display: block;">' % (js, text))
+                    w(content)
+                    w(u'</pre>')
+                    w(u'</div>')
+
+    column_renderers = {0: SeverityRenderer(),
+                        1: URLRenderer(),
+                        2: LineRenderer(),
+                        3: MessageRenderer(),
+                        }
+
+
+class DataFeedSourceDataImport(EntityView):
+    __select__ = EntityView.__select__ & match_kwargs('rtype')
+    __regid__ = 'cw.formated_log'
+
+    def cell_call(self, row, col, rtype, loglevel='Info', **kwargs):
+        if 'dispctrl' in self.cw_extra_kwargs:
+            loglevel = self.cw_extra_kwargs['dispctrl'].get('loglevel', loglevel)
+        entity = self.cw_rset.get_entity(row, col)
+        value = getattr(entity, rtype)
+        if value:
+            self._cw.view('cw.log.table', pyvalue=log_to_table(self._cw, value),
+                          default_level=loglevel, w=self.w)
+        else:
+            self.w(self._cw._('no log to display'))
+
+
+_pvs.tag_attribute(('CWDataImport', 'log'), 'relations')
+_pvdc.tag_attribute(('CWDataImport', 'log'), {'vid': 'cw.formated_log'})
+_pvs.tag_subject_of(('CWDataImport', 'cw_import_of', '*'), 'hidden') # in breadcrumbs
+_pvs.tag_object_of(('*', 'cw_import_of', 'CWSource'), 'hidden') # in dedicated tab
+
+
+class CWDataImportIPrevNextAdapter(navigation.IPrevNextAdapter):
+    __select__ = is_instance('CWDataImport')
+
+    def next_entity(self):
+        if self.entity.start_timestamp is not None:
+            # add NOT X eid %(e)s because > may not be enough
+            rset = self._cw.execute(
+                'Any X,XSTS ORDERBY 2 LIMIT 1 WHERE X is CWDataImport, '
+                'X cw_import_of S, S eid %(s)s, NOT X eid %(e)s, '
+                'X start_timestamp XSTS, X start_timestamp > %(sts)s',
+                {'sts': self.entity.start_timestamp,
+                 'e': self.entity.eid,
+                 's': self.entity.cwsource.eid})
+            if rset:
+                return rset.get_entity(0, 0)
+
+    def previous_entity(self):
+        if self.entity.start_timestamp is not None:
+            # add NOT X eid %(e)s because < may not be enough
+            rset = self._cw.execute(
+                'Any X,XSTS ORDERBY 2 DESC LIMIT 1 WHERE X is CWDataImport, '
+                'X cw_import_of S, S eid %(s)s, NOT X eid %(e)s, '
+                'X start_timestamp XSTS, X start_timestamp < %(sts)s',
+                {'sts': self.entity.start_timestamp,
+                 'e': self.entity.eid,
+                 's': self.entity.cwsource.eid})
+            if rset:
+                return rset.get_entity(0, 0)
+
+class CWDataImportStatusFacet(facet.AttributeFacet):
+    __regid__ = 'datafeed.dataimport.status'
+    __select__ = is_instance('CWDataImport')
+    rtype = 'status'
 
 
 # breadcrumbs configuration ####################################################
@@ -286,3 +527,8 @@
     __select__ = is_instance('CWSourceHostConfig', 'CWSourceSchemaConfig')
     def parent_entity(self):
         return self.entity.cwsource
+
+class CWDataImportIBreadCrumbsAdapter(ibreadcrumbs.IBreadCrumbsAdapter):
+    __select__ = is_instance('CWDataImport')
+    def parent_entity(self):
+        return self.entity.cw_import_of[0]