[serverctl] allow to sync multiple and all sources in source-sync command
Breaking change: by default source are not synced anymore if it is fresh
(regards to synchronization-interval). To get the previous behavior an option
--force is added.
Closes #13886467
--- a/cubicweb/server/serverctl.py Wed Jun 22 15:57:17 2016 +0200
+++ b/cubicweb/server/serverctl.py Wed Jun 22 11:38:53 2016 +0200
@@ -969,21 +969,25 @@
class SynchronizeSourceCommand(Command):
- """Force a source synchronization.
+ """Force sources synchronization.
<instance>
the identifier of the instance
<source>
- the name of the source to synchronize.
+ names of the sources to synchronize, if empty all sources will be synced.
"""
name = 'source-sync'
- arguments = '<instance> <source>'
- min_args = max_args = 2
+ arguments = '<instance> [<source> <source> ...]'
+ min_args = 1
options = (
('loglevel',
{'short': 'l', 'type': 'choice', 'metavar': '<log level>',
'default': 'info', 'choices': ('debug', 'info', 'warning', 'error')},
),
+ ('force',
+ {'short': 'f', 'action': 'store_true', 'default': False,
+ 'help': 'force source synchronization (ignore synchronization interval)'},
+ ),
)
def run(self, args):
@@ -995,25 +999,38 @@
init_cmdline_log_threshold(config, self['loglevel'])
repo = repoapi.get_repository(config=config)
repo.hm.call_hooks('server_maintenance', repo=repo)
- status = 0
- try:
- try:
- source = repo.sources_by_uri[args[1]]
- except KeyError:
- raise ExecutionError('no source named %r' % args[1])
- with repo.internal_cnx() as cnx:
+ errors = False
+ with repo.internal_cnx() as cnx:
+ sources = []
+ if len(args) >= 2:
+ for name in args[1:]:
+ try:
+ source = repo.sources_by_uri[name]
+ except KeyError:
+ cnx.error('no source named %r' % name)
+ errors = True
+ else:
+ sources.append(source)
+ else:
+ for uri, source in list(repo.sources_by_uri.items()):
+ if (uri != 'system' and
+ repo.config.source_enabled(source) and
+ source.config['synchronize']):
+ sources.append(source)
+
+ for source in sources:
try:
- stats = source.pull_data(cnx, force=True, raise_on_error=True)
- except SourceException as exc:
- print("can't synchronize the source:", exc)
- status = 1
- stats = {}
- finally:
- repo.shutdown()
- for key, val in stats.items():
- if val:
- print(key, ':', val)
- sys.exit(status)
+ stats = source.pull_data(cnx, force=self['force'], raise_on_error=True)
+ except Exception:
+ cnx.exception('while trying to update source %s', source)
+ errors = True
+ else:
+ for key, val in stats.items():
+ if val:
+ print(key, ':', val)
+
+ if errors:
+ raise ExecutionError('All sources where not synced')
def permissionshandler(relation, perms):
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/cubicweb/server/test/data/entities.py Wed Jun 22 11:38:53 2016 +0200
@@ -0,0 +1,18 @@
+from cubicweb.server.sources import datafeed
+
+
+class SourceParserSuccess(datafeed.DataFeedParser):
+ __regid__ = 'test_source_parser_success'
+
+ def process(self, url, raise_on_error=False):
+ entity = self._cw.create_entity('Card', title=u'success')
+ self.notify_updated(entity)
+
+
+class SourceParserFail(SourceParserSuccess):
+ __regid__ = 'test_source_parser_fail'
+
+ def process(self, url, raise_on_error=False):
+ entity = self._cw.create_entity('Card', title=u'fail')
+ self.notify_updated(entity)
+ raise RuntimeError("fail")
--- a/cubicweb/server/test/unittest_serverctl.py Wed Jun 22 15:57:17 2016 +0200
+++ b/cubicweb/server/test/unittest_serverctl.py Wed Jun 22 11:38:53 2016 +0200
@@ -1,8 +1,9 @@
import os.path as osp
import shutil
+from cubicweb import ExecutionError
from cubicweb.devtools import testlib, ApptestConfiguration
-from cubicweb.server.serverctl import _local_dump, DBDumpCommand
+from cubicweb.server.serverctl import _local_dump, DBDumpCommand, SynchronizeSourceCommand
from cubicweb.server.serverconfig import ServerConfiguration
class ServerCTLTC(testlib.CubicWebTC):
@@ -20,6 +21,35 @@
DBDumpCommand(None).run([self.appid])
shutil.rmtree(osp.join(self.config.apphome, 'backup'))
+ def test_source_sync(self):
+ with self.admin_access.repo_cnx() as cnx:
+ cnx.create_entity('CWSource', name=u'success_feed', type=u'datafeed',
+ parser=u'test_source_parser_success',
+ url=u'ignored')
+ cnx.create_entity('CWSource', name=u'fail_feed', type=u'datafeed',
+ parser=u'test_source_parser_fail',
+ url=u'ignored')
+ cnx.commit()
+
+ cmd = SynchronizeSourceCommand(None)
+ cmd.config.force = 1
+
+ # Should sync all sources even if one failed
+ with self.assertRaises(ExecutionError) as exc:
+ cmd.run([self.appid])
+ self.assertEqual(len(cnx.find('Card', title=u'success')), 1)
+ self.assertEqual(len(cnx.find('Card', title=u'fail')), 0)
+ self.assertEqual(str(exc.exception), 'All sources where not synced')
+
+ # call with named sources
+ cmd.run([self.appid, u'success_feed'])
+ self.assertEqual(len(cnx.find('Card', title=u'success')), 2)
+
+ with self.assertRaises(ExecutionError) as exc:
+ cmd.run([self.appid, u'fail_feed'])
+ self.assertEqual(str(exc.exception), 'All sources where not synced')
+ self.assertEqual(len(cnx.find('Card', title=u'fail')), 0)
+
if __name__ == '__main__':
from unittest import main