[server] Make "sources_by_uri" and "sources_by_eid" properties of repository 3.25
author Denis Laxalde <denis.laxalde@logilab.fr>
Tue, 04 Apr 2017 16:28:50 +0200
branch 3.25
changeset 12142 db2fc87348ab
parent 12141 29d032bb70d8
child 12143 a446124bcf3c
[server] Make "sources_by_uri" and "sources_by_eid" properties of repository. I.e. do not populate these dicts at repo initialization (bootstrap step) but always use information from the database. This is needed because when multiple instances of the same application run, if one instance adds a CWSource the other ones will not see it. In particular, when using a scheduler instance, new CWSource entities will be added by the web instance and not seen by the scheduler, which is supposed to update them. We thus define properties for sources_by_eid and sources_by_uri instead of attributes on the repository instance. CWSource entities are thus retrieved from the database every time these properties are accessed. We factor out initialization of the "source" instance (subclass of cubicweb.server.source.AbstractSource) into a _sources() method. Note that this method takes care of calling the "init" method on the source as well as "set_schema" (previously done in repo.set_schema(), which now only touches system_source). Accordingly, the "init_sources_from_database" method is dropped along with the "add_source"/"remove_source" methods. In the syncsources hook, we thus drop: * the SourceAddedOp operation, which called repo.add_source(), so that the SourceAddedHook only cares about checking source configuration now; * the SourceRemovedOp and SourceRenamedOp operations, for the same reason; * SourceConfigUpdatedOp, as updating the live config of a source is meaningless once we rely on sources being retrieved from the database; * the SourceHostConfigUpdatedHook hook, which is now useless without a call to SourceConfigUpdatedOp. In the 3.10 migration script, remove usage of the sources_by_uri repo attribute which, unless I'm missing something, appears useless (at least now). 
In tests: * unittest_datafeed: remove the test_update_url method since we dropped the respective hook; * unittest_ldapsource: LDAPFeedUserDeletionTC.test_a_filter_inactivate() currently fails because it still relies on the live config being updated; this will be fixed in the next changeset, once all "live source" logic has been removed.
cubicweb/hooks/syncsources.py
cubicweb/misc/migration/3.10.0_Any.py
cubicweb/server/repository.py
cubicweb/server/test/unittest_datafeed.py
--- a/cubicweb/hooks/syncsources.py	Tue Feb 21 11:04:19 2017 +0100
+++ b/cubicweb/hooks/syncsources.py	Tue Apr 04 16:28:50 2017 +0200
@@ -35,11 +35,6 @@
 
 # repo sources synchronization #################################################
 
-class SourceAddedOp(hook.Operation):
-    entity = None # make pylint happy
-    def postcommit_event(self):
-        self.cnx.repo.add_source(self.entity)
-
 class SourceAddedHook(SourceHook):
     __regid__ = 'cw.sources.added'
     __select__ = SourceHook.__select__ & is_instance('CWSource')
@@ -56,14 +51,8 @@
         if self.entity.name != 'system':
             sourcecls.check_conf_dict(self.entity.eid, self.entity.host_config,
                                       fail_if_unknown=not self._cw.vreg.config.repairing)
-            SourceAddedOp(self._cw, entity=self.entity)
 
 
-class SourceRemovedOp(hook.Operation):
-    uri = None # make pylint happy
-    def postcommit_event(self):
-        self.cnx.repo.remove_source(self.uri)
-
 class SourceRemovedHook(SourceHook):
     __regid__ = 'cw.sources.removed'
     __select__ = SourceHook.__select__ & is_instance('CWSource')
@@ -72,34 +61,6 @@
         if self.entity.name == 'system':
             msg = _("You cannot remove the system source")
             raise validation_error(self.entity, {None: msg})
-        SourceRemovedOp(self._cw, uri=self.entity.name)
-
-
-class SourceConfigUpdatedOp(hook.DataOperationMixIn, hook.Operation):
-
-    def precommit_event(self):
-        self.__processed = []
-        for source in self.get_data():
-            if not self.cnx.deleted_in_transaction(source.eid):
-                conf = source.repo_source.check_config(source)
-                self.__processed.append( (source, conf) )
-
-    def postcommit_event(self):
-        for source, conf in self.__processed:
-            source.repo_source.update_config(source, conf)
-
-
-class SourceRenamedOp(hook.LateOperation):
-    oldname = newname = None # make pylint happy
-
-    def postcommit_event(self):
-        repo = self.cnx.repo
-        # XXX race condition
-        source = repo.sources_by_uri.pop(self.oldname)
-        source.uri = self.newname
-        source.public_config['uri'] = self.newname
-        repo.sources_by_uri[self.newname] = source
-        clear_cache(repo, 'source_defs')
 
 
 class SourceUpdatedHook(SourceHook):
@@ -112,26 +73,8 @@
             if oldname == 'system':
                 msg = _("You cannot rename the system source")
                 raise validation_error(self.entity, {('name', 'subject'): msg})
-            SourceRenamedOp(self._cw, oldname=oldname, newname=newname)
         if 'config' in self.entity.cw_edited or 'url' in self.entity.cw_edited:
             if self.entity.name == 'system' and self.entity.config:
                 msg = _("Configuration of the system source goes to "
                         "the 'sources' file, not in the database")
                 raise validation_error(self.entity, {('config', 'subject'): msg})
-            SourceConfigUpdatedOp.get_instance(self._cw).add_data(self.entity)
-
-
-class SourceHostConfigUpdatedHook(SourceHook):
-    __regid__ = 'cw.sources.hostconfigupdate'
-    __select__ = SourceHook.__select__ & is_instance('CWSourceHostConfig')
-    events = ('after_add_entity', 'after_update_entity', 'before_delete_entity',)
-    def __call__(self):
-        if self.entity.match(gethostname()):
-            if self.event == 'after_update_entity' and \
-                   not 'config' in self.entity.cw_edited:
-                return
-            try:
-                SourceConfigUpdatedOp.get_instance(self._cw).add_data(self.entity.cwsource)
-            except IndexError:
-                # XXX no source linked to the host config yet
-                pass
--- a/cubicweb/misc/migration/3.10.0_Any.py	Tue Feb 21 11:04:19 2017 +0100
+++ b/cubicweb/misc/migration/3.10.0_Any.py	Tue Apr 04 16:28:50 2017 +0200
@@ -1,10 +1,5 @@
 from six import text_type
 
-for uri, cfg in config.read_sources_file().items():
-    if uri in ('system', 'admin'):
-        continue
-    repo.sources_by_uri[uri] = repo.get_source(cfg['adapter'], uri, cfg.copy())
-
 add_entity_type('CWSource')
 add_relation_definition('CWSource', 'cw_source', 'CWSource')
 add_entity_type('CWSourceHostConfig')
@@ -21,7 +16,6 @@
 for uri, cfg in config.read_sources_file().items():
     if uri in ('system', 'admin'):
         continue
-    repo.sources_by_uri.pop(uri)
     config = u'\n'.join('%s=%s' % (key, value) for key, value in cfg.items()
                         if key != 'adapter' and value is not None)
     create_entity('CWSource', name=text_type(uri), type=text_type(cfg['adapter']),
--- a/cubicweb/server/repository.py	Tue Feb 21 11:04:19 2017 +0100
+++ b/cubicweb/server/repository.py	Tue Apr 04 16:28:50 2017 +0200
@@ -211,7 +211,6 @@
 
     def __init__(self, config, scheduler=None, vreg=None):
         self.config = config
-        self.sources_by_eid = {}
         if vreg is None:
             vreg = cwvreg.CWRegistryStore(config)
         self.vreg = vreg
@@ -230,7 +229,6 @@
         # sources (additional sources info in the system database)
         self.system_source = self.get_source('native', 'system',
                                              config.system_source_config.copy())
-        self.sources_by_uri = {'system': self.system_source}
         # querier helper, need to be created after sources initialization
         self.querier = querier.QuerierHelper(self, self.schema)
         # cache eid -> type
@@ -295,7 +293,6 @@
             self.system_source.init_creating()
         else:
             self._init_system_source()
-            self.init_sources_from_database()
             if 'CWProperty' in self.schema:
                 self.vreg.init_properties(self.properties())
         # 4. close initialization connection set and reopen fresh ones for
@@ -305,6 +302,41 @@
         # 5. call instance level initialisation hooks
         self.hm.call_hooks('server_startup', repo=self)
 
+    @property
+    def sources_by_uri(self):
+        mapping = {'system': self.system_source}
+        mapping.update((sourceent.name, source)
+                       for sourceent, source in self._sources())
+        return mapping
+
+    @property
+    def sources_by_eid(self):
+        mapping = {self.system_source.eid: self.system_source}
+        mapping.update((sourceent.eid, source)
+                       for sourceent, source in self._sources())
+        return mapping
+
+    def _sources(self):
+        if self.config.quick_start:
+            return
+        with self.internal_cnx() as cnx:
+            for sourceent in cnx.execute(
+                    'Any S, SN, SA, SC WHERE S is_instance_of CWSource, '
+                    'S name SN, S type SA, S config SC, S name != "system"').entities():
+                source = self.get_source(sourceent.type, sourceent.name,
+                                         sourceent.host_config, sourceent.eid)
+                if self.config.source_enabled(source):
+                    # call source's init method to complete their initialisation if
+                    # needed (for instance looking for persistent configuration using an
+                    # internal session, which is not possible until connections sets have been
+                    # initialized)
+                    source.init(True, sourceent)
+                else:
+                    source.init(False, sourceent)
+                source.set_schema(self.schema)
+                yield sourceent, source
+        self._clear_source_defs_caches()
+
     # internals ###############################################################
 
     def _init_system_source(self):
@@ -317,45 +349,8 @@
                 ' S name "system", S type SA, S config SC'
             ).one()
             self.system_source.eid = sourceent.eid
-            self.sources_by_eid[sourceent.eid] = self.system_source
             self.system_source.init(True, sourceent)
 
-    def init_sources_from_database(self):
-        if self.config.quick_start:
-            return
-        with self.internal_cnx() as cnx:
-            # FIXME: sources should be ordered (add_entity priority)
-            for sourceent in cnx.execute(
-                    'Any S, SN, SA, SC WHERE S is_instance_of CWSource, '
-                    'S name SN, S type SA, S config SC, S name != "system"').entities():
-                self.add_source(sourceent)
-
-    def add_source(self, sourceent):
-        try:
-            source = self.get_source(sourceent.type, sourceent.name,
-                                     sourceent.host_config, sourceent.eid)
-        except RuntimeError:
-            if self.config.repairing:
-                self.exception('cant setup source %s, skipped', sourceent.name)
-                return
-            raise
-        self.sources_by_eid[sourceent.eid] = source
-        self.sources_by_uri[sourceent.name] = source
-        if self.config.source_enabled(source):
-            # call source's init method to complete their initialisation if
-            # needed (for instance looking for persistent configuration using an
-            # internal session, which is not possible until connections sets have been
-            # initialized)
-            source.init(True, sourceent)
-        else:
-            source.init(False, sourceent)
-        self._clear_source_defs_caches()
-
-    def remove_source(self, uri):
-        source = self.sources_by_uri.pop(uri)
-        del self.sources_by_eid[source.eid]
-        self._clear_source_defs_caches()
-
     def get_source(self, type, uri, source_config, eid=None):
         # set uri and type in source config so it's available through
         # source_defs()
@@ -371,8 +366,7 @@
         else:
             self.vreg._set_schema(schema)
         self.querier.set_schema(schema)
-        for source in self.sources_by_uri.values():
-            source.set_schema(schema)
+        self.system_source.set_schema(schema)
         self.schema = schema
 
     def deserialize_schema(self):
--- a/cubicweb/server/test/unittest_datafeed.py	Tue Feb 21 11:04:19 2017 +0100
+++ b/cubicweb/server/test/unittest_datafeed.py	Tue Apr 04 16:28:50 2017 +0200
@@ -116,8 +116,9 @@
         dfsource = self.repo.sources_by_uri[u'ô myfeed']
         with self.admin_access.repo_cnx() as cnx:
             cnx.entity_from_eid(dfsource.eid).cw_set(url=u"http://pouet.com\nhttp://pouet.org")
-            self.assertEqual(dfsource.urls, [u'ignored'])
             cnx.commit()
+        self.assertEqual(dfsource.urls, [u'ignored'])
+        dfsource = self.repo.sources_by_uri[u'ô myfeed']
         self.assertEqual(dfsource.urls, [u"http://pouet.com", u"http://pouet.org"])
 
     def test_parser_not_found(self):