[multi-sources-removal] Kill repo.sources
authorSylvain Thénault <sylvain.thenault@logilab.fr>
Fri, 21 Jun 2013 16:01:59 +0200
changeset 9456 a79e88aad555
parent 9455 62e89e696a3b
child 9457 d5ed6efd6448
[multi-sources-removal] Kill repo.sources there is only the system source in there now! Related to #2919300
devtools/__init__.py
devtools/fake.py
devtools/repotest.py
hooks/__init__.py
server/__init__.py
server/migractions.py
server/pool.py
server/repository.py
server/ssplanner.py
server/test/unittest_datafeed.py
server/test/unittest_ssplanner.py
--- a/devtools/__init__.py	Fri Jun 21 16:18:20 2013 +0200
+++ b/devtools/__init__.py	Fri Jun 21 16:01:59 2013 +0200
@@ -120,8 +120,7 @@
         repo._type_source_cache = {}
         repo._extid_cache = {}
         repo.querier._rql_cache = {}
-        for source in repo.sources:
-            source.reset_caches()
+        repo.system_source.reset_caches()
         repo._needs_refresh = False
 
 
--- a/devtools/fake.py	Fri Jun 21 16:18:20 2013 +0200
+++ b/devtools/fake.py	Fri Jun 21 16:01:59 2013 +0200
@@ -173,7 +173,6 @@
         self.config = config or FakeConfig()
         self.vreg = vreg or CWRegistryStore(self.config, initlog=False)
         self.vreg.schema = schema
-        self.sources = []
 
     def internal_session(self):
         return FakeSession(self)
--- a/devtools/repotest.py	Fri Jun 21 16:18:20 2013 +0200
+++ b/devtools/repotest.py	Fri Jun 21 16:01:59 2013 +0200
@@ -277,35 +277,19 @@
 
 
 class BasePlannerTC(BaseQuerierTC):
-    newsources = ()
 
     def setup(self):
-        clear_cache(self.repo, 'is_multi_sources_relation')
         # XXX source_defs
         self.o = self.repo.querier
         self.session = self.repo._sessions.values()[0]
         self.cnxset = self.session.set_cnxset()
         self.schema = self.o.schema
-        self.sources = self.o._repo.sources
-        self.system = self.sources[-1]
+        self.system = self.repo.system_source
         do_monkey_patch()
         self._dumb_sessions = [] # by hi-jacked parent setup
         self.repo.vreg.rqlhelper.backend = 'postgres' # so FTIRANK is considered
-        self.newsources = []
-
-    def add_source(self, sourcecls, uri):
-        source = sourcecls(self.repo, {'uri': uri, 'type': 'whatever'})
-        if not source.copy_based_source:
-            self.sources.append(source)
-        self.newsources.append(source)
-        self.repo.sources_by_uri[uri] = source
-        setattr(self, uri, source)
 
     def tearDown(self):
-        for source in self.newsources:
-            if not source.copy_based_source:
-                self.sources.remove(source)
-            del self.repo.sources_by_uri[source.uri]
         undo_monkey_patch()
         for session in self._dumb_sessions:
             if session._cnx.cnxset is not None:
--- a/hooks/__init__.py	Fri Jun 21 16:18:20 2013 +0200
+++ b/hooks/__init__.py	Fri Jun 21 16:01:59 2013 +0200
@@ -53,12 +53,10 @@
 
     def __call__(self):
         def update_feeds(repo):
-            # don't iter on repo.sources which doesn't include copy based
-            # sources (the one we're looking for)
             # take a list to avoid iterating on a dictionary whose size may
             # change
-            for source in list(repo.sources_by_eid.values()):
-                if (not source.copy_based_source
+            for uri, source in list(repo.sources_by_uri.iteritems()):
+                if (uri == 'system'
                     or not repo.config.source_enabled(source)
                     or not source.config['synchronize']):
                     continue
--- a/server/__init__.py	Fri Jun 21 16:18:20 2013 +0200
+++ b/server/__init__.py	Fri Jun 21 16:01:59 2013 +0200
@@ -284,7 +284,6 @@
     config._cubes = None # avoid assertion error
     repo, cnx = in_memory_repo_cnx(config, login, password=pwd)
     repo.system_source.eid = ssource.eid # redo this manually
-    assert len(repo.sources) == 1, repo.sources
     handler = config.migration_handler(schema, interactive=False,
                                        cnx=cnx, repo=repo)
     # install additional driver specific sql files
--- a/server/migractions.py	Fri Jun 21 16:18:20 2013 +0200
+++ b/server/migractions.py	Fri Jun 21 16:01:59 2013 +0200
@@ -187,18 +187,18 @@
         open(backupfile,'w').close() # kinda lock
         os.chmod(backupfile, 0600)
         # backup
+        source = repo.system_source
         tmpdir = tempfile.mkdtemp()
         try:
             failed = False
-            for source in repo.sources:
-                try:
-                    source.backup(osp.join(tmpdir, source.uri), self.confirm, format=format)
-                except Exception as ex:
-                    print '-> error trying to backup %s [%s]' % (source.uri, ex)
-                    if not self.confirm('Continue anyway?', default='n'):
-                        raise SystemExit(1)
-                    else:
-                        failed = True
+            try:
+                source.backup(osp.join(tmpdir, source.uri), self.confirm, format=format)
+            except Exception as ex:
+                print '-> error trying to backup %s [%s]' % (source.uri, ex)
+                if not self.confirm('Continue anyway?', default='n'):
+                    raise SystemExit(1)
+                else:
+                    failed = True
             with open(osp.join(tmpdir, 'format.txt'), 'w') as format_file:
                 format_file.write('%s\n' % format)
             with open(osp.join(tmpdir, 'versions.txt'), 'w') as version_file:
@@ -247,15 +247,13 @@
                     format = written_format
         self.config.init_cnxset_pool = False
         repo = self.repo_connect()
-        for source in repo.sources:
-            if systemonly and source.uri != 'system':
-                continue
-            try:
-                source.restore(osp.join(tmpdir, source.uri), self.confirm, drop, format)
-            except Exception as exc:
-                print '-> error trying to restore %s [%s]' % (source.uri, exc)
-                if not self.confirm('Continue anyway?', default='n'):
-                    raise SystemExit(1)
+        source = repo.system_source
+        try:
+            source.restore(osp.join(tmpdir, source.uri), self.confirm, drop, format)
+        except Exception as exc:
+            print '-> error trying to restore %s [%s]' % (source.uri, exc)
+            if not self.confirm('Continue anyway?', default='n'):
+                raise SystemExit(1)
         shutil.rmtree(tmpdir)
         # call hooks
         repo.init_cnxset_pool()
--- a/server/pool.py	Fri Jun 21 16:18:20 2013 +0200
+++ b/server/pool.py	Fri Jun 21 16:01:59 2013 +0200
@@ -1,4 +1,4 @@
-# copyright 2003-2012 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+# copyright 2003-2013 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
 #
 # This file is part of CubicWeb.
@@ -29,17 +29,17 @@
     :class:`Session`
     """
 
-    def __init__(self, sources):
+    # since 3.19, we only have to manage the system source connection
+    def __init__(self, system_source):
         # dictionary of (source, connection), indexed by sources'uri
         self.source_cnxs = {}
-        for source in sources:
-            self.add_source(source)
-        if not 'system' in self.source_cnxs:
-            self.source_cnxs['system'] = self.source_cnxs[sources[0].uri]
+        self.source_cnxs['system'] = (system_source,
+                                      system_source.get_connection())
         self._cursors = {}
 
     def __getitem__(self, uri):
         """subscription notation provide access to sources'cursors"""
+        assert uri == 'system'
         try:
             cursor = self._cursors[uri]
         except KeyError:
@@ -49,15 +49,6 @@
                 self._cursors[uri] = cursor
         return cursor
 
-    def add_source(self, source):
-        assert not source.uri in self.source_cnxs
-        self.source_cnxs[source.uri] = (source, source.get_connection())
-
-    def remove_source(self, source):
-        source, cnx = self.source_cnxs.pop(source.uri)
-        cnx.close()
-        self._cursors.pop(source.uri, None)
-
     def commit(self):
         """commit the current transaction for this user"""
         # FIXME: what happends if a commit fail
@@ -110,11 +101,6 @@
         # implementation details of flying insert requires the system source
         # first
         yield self.source_cnxs['system'][0]
-        for uri, (source, cnx) in self.source_cnxs.items():
-            if uri == 'system':
-                continue
-            yield source
-        #return [source_cnx[0] for source_cnx in self.source_cnxs.itervalues()]
 
     def source(self, uid):
         """return the source object with the given uri"""
--- a/server/repository.py	Fri Jun 21 16:18:20 2013 +0200
+++ b/server/repository.py	Fri Jun 21 16:01:59 2013 +0200
@@ -190,7 +190,6 @@
         # sources (additional sources info in the system database)
         self.system_source = self.get_source('native', 'system',
                                              config.sources()['system'].copy())
-        self.sources = [self.system_source]
         self.sources_by_uri = {'system': self.system_source}
         # querier helper, need to be created after sources initialization
         self.querier = querier.QuerierHelper(self, self.schema)
@@ -220,7 +219,7 @@
         self._cnxsets_pool = Queue.Queue()
         # 0. init a cnxset that will be used to fetch bootstrap information from
         #    the database
-        self._cnxsets_pool.put_nowait(pool.ConnectionsSet(self.sources))
+        self._cnxsets_pool.put_nowait(pool.ConnectionsSet(self.system_source))
         # 1. set used cubes
         if config.creating or not config.read_instance_schema:
             config.bootstrap_cubes()
@@ -251,8 +250,7 @@
         if config.creating:
             # call init_creating so that for instance native source can
             # configurate tsearch according to postgres version
-            for source in self.sources:
-                source.init_creating()
+            self.system_source.init_creating()
         else:
             self.init_sources_from_database()
             if 'CWProperty' in self.schema:
@@ -262,7 +260,7 @@
         self._get_cnxset().close(True)
         self.cnxsets = [] # list of available cnxsets (can't iterate on a Queue)
         for i in xrange(config['connections-pool-size']):
-            self.cnxsets.append(pool.ConnectionsSet(self.sources))
+            self.cnxsets.append(pool.ConnectionsSet(self.system_source))
             self._cnxsets_pool.put_nowait(self.cnxsets[-1])
 
     # internals ###############################################################
@@ -286,8 +284,7 @@
                 self.add_source(sourceent)
 
     def _clear_planning_caches(self):
-        for cache in ('source_defs', 'is_multi_sources_relation'):
-            clear_cache(self, cache)
+        clear_cache(self, 'source_defs')
 
     def add_source(self, sourceent):
         source = self.get_source(sourceent.type, sourceent.name,
@@ -324,8 +321,6 @@
         else:
             self.vreg._set_schema(schema)
         self.querier.set_schema(schema)
-        # don't use self.sources, we may want to give schema even to disabled
-        # sources
         for source in self.sources_by_uri.itervalues():
             source.set_schema(schema)
         self.schema = schema
@@ -996,8 +991,7 @@
             except KeyError:
                 etype = None
             rqlcache.pop( ('Any X WHERE X eid %s' % eid,), None)
-            for source in self.sources:
-                source.clear_eid_cache(eid, etype)
+            self.system_source.clear_eid_cache(eid, etype)
 
     def type_from_eid(self, eid, session=None):
         """return the type of the entity with id <eid>"""
@@ -1504,15 +1498,6 @@
         self.info('repository re-registered as a pyro object %s',
                   self.pyro_appid)
 
-    # multi-sources planner helpers ###########################################
-
-    @cached
-    def is_multi_sources_relation(self, rtype):
-        warn('[3.18] old multi-source system will go away in the next version',
-             DeprecationWarning)
-        return any(source for source in self.sources
-                   if not source is self.system_source
-                   and source.support_relation(rtype))
 
     # these are overridden by set_log_methods below
     # only defining here to prevent pylint from complaining
--- a/server/ssplanner.py	Fri Jun 21 16:18:20 2013 +0200
+++ b/server/ssplanner.py	Fri Jun 21 16:01:59 2013 +0200
@@ -1,4 +1,4 @@
-# copyright 2003-2012 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+# copyright 2003-2013 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
 #
 # This file is part of CubicWeb.
@@ -145,7 +145,7 @@
         the rqlst should not be tagged at this point.
         """
         plan.preprocess(rqlst)
-        return (OneFetchStep(plan, rqlst, plan.session.repo.sources),)
+        return (OneFetchStep(plan, rqlst),)
 
     def build_insert_plan(self, plan, rqlst):
         """get an execution plan from an INSERT RQL query"""
@@ -311,24 +311,6 @@
         maprepr[var] = '%s.%s' % (tablesinorder[table], col)
     return maprepr
 
-def offset_result(offset, result):
-    offset -= len(result)
-    if offset < 0:
-        result = result[offset:]
-        offset = None
-    elif offset == 0:
-        offset = None
-        result = ()
-    return offset, result
-
-
-class LimitOffsetMixIn(object):
-    limit = offset = None
-    def set_limit_offset(self, limit, offset):
-        self.limit = limit
-        self.offset = offset or None
-
-
 class Step(object):
     """base abstract class for execution step"""
     def __init__(self, plan):
@@ -357,22 +339,14 @@
             [step.test_repr() for step in self.children],)
 
 
-class OneFetchStep(LimitOffsetMixIn, Step):
+class OneFetchStep(Step):
     """step consisting in fetching data from sources and directly returning
     results
     """
-    def __init__(self, plan, union, sources, inputmap=None):
+    def __init__(self, plan, union, inputmap=None):
         Step.__init__(self, plan)
         self.union = union
-        self.sources = sources
         self.inputmap = inputmap
-        self.set_limit_offset(union.children[-1].limit, union.children[-1].offset)
-
-    def set_limit_offset(self, limit, offset):
-        LimitOffsetMixIn.set_limit_offset(self, limit, offset)
-        for select in self.union.children:
-            select.limit = limit
-            select.offset = offset
 
     def execute(self):
         """call .syntax_tree_search with the given syntax tree on each
@@ -395,31 +369,9 @@
             cachekey = tuple(cachekey)
         else:
             cachekey = union.as_string()
-        result = []
-        # limit / offset processing
-        limit = self.limit
-        offset = self.offset
-        if offset is not None:
-            if len(self.sources) > 1:
-                # we'll have to deal with limit/offset by ourself
-                if union.children[-1].limit:
-                    union.children[-1].limit = limit + offset
-                union.children[-1].offset = None
-            else:
-                offset, limit = None, None
-        for source in self.sources:
-            if offset is None and limit is not None:
-                # modifying the sample rqlst is enough since sql generation
-                # will pick it here as well
-                union.children[-1].limit = limit - len(result)
-            result_ = source.syntax_tree_search(session, union, args, cachekey,
-                                                inputmap)
-            if offset is not None:
-                offset, result_ = offset_result(offset, result_)
-            result += result_
-            if limit is not None:
-                if len(result) >= limit:
-                    return result[:limit]
+        # get results for query
+        source = session.repo.system_source
+        result = source.syntax_tree_search(session, union, args, cachekey, inputmap)
         #print 'ONEFETCH RESULT %s' % (result)
         return result
 
@@ -432,8 +384,7 @@
         return (self.__class__.__name__,
                 sorted((r.as_string(kwargs=self.plan.args), r.solutions)
                        for r in self.union.children),
-                self.limit, self.offset,
-                sorted(self.sources), inputmap)
+                inputmap)
 
 
 # UPDATE/INSERT/DELETE steps ##################################################
--- a/server/test/unittest_datafeed.py	Fri Jun 21 16:18:20 2013 +0200
+++ b/server/test/unittest_datafeed.py	Fri Jun 21 16:01:59 2013 +0200
@@ -31,7 +31,6 @@
     def test(self):
         self.assertIn('myfeed', self.repo.sources_by_uri)
         dfsource = self.repo.sources_by_uri['myfeed']
-        self.assertNotIn(dfsource, self.repo.sources)
         self.assertEqual(dfsource.latest_retrieval, None)
         self.assertEqual(dfsource.synchro_interval, timedelta(seconds=60))
         self.assertFalse(dfsource.fresh())
--- a/server/test/unittest_ssplanner.py	Fri Jun 21 16:18:20 2013 +0200
+++ b/server/test/unittest_ssplanner.py	Fri Jun 21 16:01:59 2013 +0200
@@ -51,8 +51,7 @@
                                        [{'X': 'Basket', 'XN': 'String'},
                                         {'X': 'State', 'XN': 'String'},
                                         {'X': 'Folder', 'XN': 'String'}])],
-                     None, None,
-                     [self.system], None, [])])
+                     None, [])])
 
     def test_groupeded_ambigous_sol(self):
         self._test('Any XN,COUNT(X) GROUPBY XN WHERE X name XN, X is IN (Basket, State, Folder)',
@@ -60,8 +59,7 @@
                                        [{'X': 'Basket', 'XN': 'String'},
                                         {'X': 'State', 'XN': 'String'},
                                         {'X': 'Folder', 'XN': 'String'}])],
-                     None, None,
-                     [self.system], None, [])])
+                     None, [])])
 
 if __name__ == '__main__':
     from logilab.common.testlib import unittest_main