[serverctl] allow to sync multiple and all sources in source-sync command
authorPhilippe Pepiot <philippe.pepiot@logilab.fr>
Wed, 22 Jun 2016 11:38:53 +0200
changeset 11355 47b0b08fbb4b
parent 11354 6b7f4c6745a0
child 11356 17070ff16000
[serverctl] allow to sync multiple and all sources in source-sync command Breaking change: by default source are not synced anymore if it is fresh (regards to synchronization-interval). To get the previous behavior an option --force is added. Closes #13886467
cubicweb/server/serverctl.py
cubicweb/server/test/data/entities.py
cubicweb/server/test/unittest_serverctl.py
--- a/cubicweb/server/serverctl.py	Wed Jun 22 15:57:17 2016 +0200
+++ b/cubicweb/server/serverctl.py	Wed Jun 22 11:38:53 2016 +0200
@@ -969,21 +969,25 @@
 
 
 class SynchronizeSourceCommand(Command):
-    """Force a source synchronization.
+    """Force sources synchronization.
 
     <instance>
       the identifier of the instance
     <source>
-      the name of the source to synchronize.
+      names of the sources to synchronize, if empty all sources will be synced.
     """
     name = 'source-sync'
-    arguments = '<instance> <source>'
-    min_args = max_args = 2
+    arguments = '<instance> [<source> <source> ...]'
+    min_args = 1
     options = (
         ('loglevel',
          {'short': 'l', 'type': 'choice', 'metavar': '<log level>',
           'default': 'info', 'choices': ('debug', 'info', 'warning', 'error')},
          ),
+        ('force',
+         {'short': 'f', 'action': 'store_true', 'default': False,
+          'help': 'force source synchronization (ignore synchronization interval)'},
+         ),
     )
 
     def run(self, args):
@@ -995,25 +999,38 @@
         init_cmdline_log_threshold(config, self['loglevel'])
         repo = repoapi.get_repository(config=config)
         repo.hm.call_hooks('server_maintenance', repo=repo)
-        status = 0
-        try:
-            try:
-                source = repo.sources_by_uri[args[1]]
-            except KeyError:
-                raise ExecutionError('no source named %r' % args[1])
-            with repo.internal_cnx() as cnx:
+        errors = False
+        with repo.internal_cnx() as cnx:
+            sources = []
+            if len(args) >= 2:
+                for name in args[1:]:
+                    try:
+                        source = repo.sources_by_uri[name]
+                    except KeyError:
+                        cnx.error('no source named %r' % name)
+                        errors = True
+                    else:
+                        sources.append(source)
+            else:
+                for uri, source in list(repo.sources_by_uri.items()):
+                    if (uri != 'system' and
+                            repo.config.source_enabled(source) and
+                            source.config['synchronize']):
+                        sources.append(source)
+
+            for source in sources:
                 try:
-                    stats = source.pull_data(cnx, force=True, raise_on_error=True)
-                except SourceException as exc:
-                    print("can't synchronize the source:", exc)
-                    status = 1
-                    stats = {}
-        finally:
-            repo.shutdown()
-        for key, val in stats.items():
-            if val:
-                print(key, ':', val)
-        sys.exit(status)
+                    stats = source.pull_data(cnx, force=self['force'], raise_on_error=True)
+                except Exception:
+                    cnx.exception('while trying to update source %s', source)
+                    errors = True
+                else:
+                    for key, val in stats.items():
+                        if val:
+                            print(key, ':', val)
+
+        if errors:
+            raise ExecutionError('All sources where not synced')
 
 
 def permissionshandler(relation, perms):
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/cubicweb/server/test/data/entities.py	Wed Jun 22 11:38:53 2016 +0200
@@ -0,0 +1,18 @@
+from cubicweb.server.sources import datafeed
+
+
+class SourceParserSuccess(datafeed.DataFeedParser):
+    __regid__ = 'test_source_parser_success'
+
+    def process(self, url, raise_on_error=False):
+        entity = self._cw.create_entity('Card', title=u'success')
+        self.notify_updated(entity)
+
+
+class SourceParserFail(SourceParserSuccess):
+    __regid__ = 'test_source_parser_fail'
+
+    def process(self, url, raise_on_error=False):
+        entity = self._cw.create_entity('Card', title=u'fail')
+        self.notify_updated(entity)
+        raise RuntimeError("fail")
--- a/cubicweb/server/test/unittest_serverctl.py	Wed Jun 22 15:57:17 2016 +0200
+++ b/cubicweb/server/test/unittest_serverctl.py	Wed Jun 22 11:38:53 2016 +0200
@@ -1,8 +1,9 @@
 import os.path as osp
 import shutil
 
+from cubicweb import ExecutionError
 from cubicweb.devtools import testlib, ApptestConfiguration
-from cubicweb.server.serverctl import _local_dump, DBDumpCommand
+from cubicweb.server.serverctl import _local_dump, DBDumpCommand, SynchronizeSourceCommand
 from cubicweb.server.serverconfig import ServerConfiguration
 
 class ServerCTLTC(testlib.CubicWebTC):
@@ -20,6 +21,35 @@
         DBDumpCommand(None).run([self.appid])
         shutil.rmtree(osp.join(self.config.apphome, 'backup'))
 
+    def test_source_sync(self):
+        with self.admin_access.repo_cnx() as cnx:
+            cnx.create_entity('CWSource', name=u'success_feed', type=u'datafeed',
+                              parser=u'test_source_parser_success',
+                              url=u'ignored')
+            cnx.create_entity('CWSource', name=u'fail_feed', type=u'datafeed',
+                              parser=u'test_source_parser_fail',
+                              url=u'ignored')
+            cnx.commit()
+
+            cmd = SynchronizeSourceCommand(None)
+            cmd.config.force = 1
+
+            # Should sync all sources even if one failed
+            with self.assertRaises(ExecutionError) as exc:
+                cmd.run([self.appid])
+            self.assertEqual(len(cnx.find('Card', title=u'success')), 1)
+            self.assertEqual(len(cnx.find('Card', title=u'fail')), 0)
+            self.assertEqual(str(exc.exception), 'All sources where not synced')
+
+            # call with named sources
+            cmd.run([self.appid, u'success_feed'])
+            self.assertEqual(len(cnx.find('Card', title=u'success')), 2)
+
+            with self.assertRaises(ExecutionError) as exc:
+                cmd.run([self.appid, u'fail_feed'])
+            self.assertEqual(str(exc.exception), 'All sources where not synced')
+            self.assertEqual(len(cnx.find('Card', title=u'fail')), 0)
+
 
 if __name__ == '__main__':
     from unittest import main