[multi-sources-removal] Kill repo.sources
Only the system source remains in it now, so the attribute is redundant.
Related to #2919300
--- a/devtools/__init__.py Fri Jun 21 16:18:20 2013 +0200
+++ b/devtools/__init__.py Fri Jun 21 16:01:59 2013 +0200
@@ -120,8 +120,7 @@
repo._type_source_cache = {}
repo._extid_cache = {}
repo.querier._rql_cache = {}
- for source in repo.sources:
- source.reset_caches()
+ repo.system_source.reset_caches()
repo._needs_refresh = False
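
For context, the refresh helper now reads roughly as below; a minimal
sketch assuming a repository object with the same cache attributes as in
the hunk above (refresh_repo_caches is a hypothetical name for the
enclosing function):

    def refresh_repo_caches(repo):
        # drop the repository-level caches, then delegate to the single
        # remaining source: there is no repo.sources list to iterate anymore
        repo._type_source_cache = {}
        repo._extid_cache = {}
        repo.querier._rql_cache = {}
        repo.system_source.reset_caches()
        repo._needs_refresh = False
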
--- a/devtools/fake.py Fri Jun 21 16:18:20 2013 +0200
+++ b/devtools/fake.py Fri Jun 21 16:01:59 2013 +0200
@@ -173,7 +173,6 @@
self.config = config or FakeConfig()
self.vreg = vreg or CWRegistryStore(self.config, initlog=False)
self.vreg.schema = schema
- self.sources = []
def internal_session(self):
return FakeSession(self)
--- a/devtools/repotest.py Fri Jun 21 16:18:20 2013 +0200
+++ b/devtools/repotest.py Fri Jun 21 16:01:59 2013 +0200
@@ -277,35 +277,19 @@
class BasePlannerTC(BaseQuerierTC):
- newsources = ()
def setup(self):
- clear_cache(self.repo, 'is_multi_sources_relation')
# XXX source_defs
self.o = self.repo.querier
self.session = self.repo._sessions.values()[0]
self.cnxset = self.session.set_cnxset()
self.schema = self.o.schema
- self.sources = self.o._repo.sources
- self.system = self.sources[-1]
+ self.system = self.repo.system_source
do_monkey_patch()
self._dumb_sessions = [] # filled by the hijacked parent setup
self.repo.vreg.rqlhelper.backend = 'postgres' # so FTIRANK is considered
- self.newsources = []
-
- def add_source(self, sourcecls, uri):
- source = sourcecls(self.repo, {'uri': uri, 'type': 'whatever'})
- if not source.copy_based_source:
- self.sources.append(source)
- self.newsources.append(source)
- self.repo.sources_by_uri[uri] = source
- setattr(self, uri, source)
def tearDown(self):
- for source in self.newsources:
- if not source.copy_based_source:
- self.sources.remove(source)
- del self.repo.sources_by_uri[source.uri]
undo_monkey_patch()
for session in self._dumb_sessions:
if session._cnx.cnxset is not None:
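
With add_source and its tearDown bookkeeping gone, a test that still needs
an extra (copy-based) source only has to touch sources_by_uri; a hedged
sketch, where register_test_source and unregister_test_source are
hypothetical helpers, not part of this patch:

    def register_test_source(repo, source):
        # non-system sources are copy-based and looked up through
        # sources_by_uri only; there is no parallel list to keep in sync
        repo.sources_by_uri[source.uri] = source

    def unregister_test_source(repo, source):
        del repo.sources_by_uri[source.uri]
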
--- a/hooks/__init__.py Fri Jun 21 16:18:20 2013 +0200
+++ b/hooks/__init__.py Fri Jun 21 16:01:59 2013 +0200
@@ -53,12 +53,10 @@
def __call__(self):
def update_feeds(repo):
- # don't iter on repo.sources which doesn't include copy based
- # sources (the one we're looking for)
# take a list to avoid iterating on a dictionary whose size may
# change
- for source in list(repo.sources_by_eid.values()):
- if (not source.copy_based_source
+ for uri, source in list(repo.sources_by_uri.iteritems()):
+ if (uri == 'system'
or not repo.config.source_enabled(source)
or not source.config['synchronize']):
continue
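
The rewritten loop, extracted as a runnable sketch; iter_synchronizable_sources
is a hypothetical name, the attribute accesses mirror the hunk above:

    def iter_synchronizable_sources(repo):
        # every non-system entry of sources_by_uri is a copy-based
        # (datafeed) source; take a list because the dictionary may
        # change size during the loop
        for uri, source in list(repo.sources_by_uri.iteritems()):
            if (uri == 'system'
                or not repo.config.source_enabled(source)
                or not source.config['synchronize']):
                continue
            yield source
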
--- a/server/__init__.py Fri Jun 21 16:18:20 2013 +0200
+++ b/server/__init__.py Fri Jun 21 16:01:59 2013 +0200
@@ -284,7 +284,6 @@
config._cubes = None # avoid assertion error
repo, cnx = in_memory_repo_cnx(config, login, password=pwd)
repo.system_source.eid = ssource.eid # redo this manually
- assert len(repo.sources) == 1, repo.sources
handler = config.migration_handler(schema, interactive=False,
cnx=cnx, repo=repo)
# install additional driver specific sql files
--- a/server/migractions.py Fri Jun 21 16:18:20 2013 +0200
+++ b/server/migractions.py Fri Jun 21 16:01:59 2013 +0200
@@ -187,18 +187,18 @@
open(backupfile,'w').close() # kinda lock
os.chmod(backupfile, 0600)
# backup
+ source = repo.system_source
tmpdir = tempfile.mkdtemp()
try:
failed = False
- for source in repo.sources:
- try:
- source.backup(osp.join(tmpdir, source.uri), self.confirm, format=format)
- except Exception as ex:
- print '-> error trying to backup %s [%s]' % (source.uri, ex)
- if not self.confirm('Continue anyway?', default='n'):
- raise SystemExit(1)
- else:
- failed = True
+ try:
+ source.backup(osp.join(tmpdir, source.uri), self.confirm, format=format)
+ except Exception as ex:
+ print '-> error trying to backup %s [%s]' % (source.uri, ex)
+ if not self.confirm('Continue anyway?', default='n'):
+ raise SystemExit(1)
+ else:
+ failed = True
with open(osp.join(tmpdir, 'format.txt'), 'w') as format_file:
format_file.write('%s\n' % format)
with open(osp.join(tmpdir, 'versions.txt'), 'w') as version_file:
@@ -247,15 +247,13 @@
format = written_format
self.config.init_cnxset_pool = False
repo = self.repo_connect()
- for source in repo.sources:
- if systemonly and source.uri != 'system':
- continue
- try:
- source.restore(osp.join(tmpdir, source.uri), self.confirm, drop, format)
- except Exception as exc:
- print '-> error trying to restore %s [%s]' % (source.uri, exc)
- if not self.confirm('Continue anyway?', default='n'):
- raise SystemExit(1)
+ source = repo.system_source
+ try:
+ source.restore(osp.join(tmpdir, source.uri), self.confirm, drop, format)
+ except Exception as exc:
+ print '-> error trying to restore %s [%s]' % (source.uri, exc)
+ if not self.confirm('Continue anyway?', default='n'):
+ raise SystemExit(1)
shutil.rmtree(tmpdir)
# call hooks
repo.init_cnxset_pool()
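
Boiled down, the backup path becomes the sketch below; do_system_backup is
a hypothetical wrapper, error handling is reduced to the essentials, and
the real method additionally writes versions.txt and keeps its
confirm/retry logic:

    import os.path as osp
    import shutil
    import tempfile

    def do_system_backup(repo, confirm, format='native'):
        # only the system source is backed up; the per-source loop is gone
        source = repo.system_source
        tmpdir = tempfile.mkdtemp()
        try:
            source.backup(osp.join(tmpdir, source.uri), confirm, format=format)
            with open(osp.join(tmpdir, 'format.txt'), 'w') as format_file:
                format_file.write('%s\n' % format)
        finally:
            shutil.rmtree(tmpdir)
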
--- a/server/pool.py Fri Jun 21 16:18:20 2013 +0200
+++ b/server/pool.py Fri Jun 21 16:01:59 2013 +0200
@@ -1,4 +1,4 @@
-# copyright 2003-2012 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+# copyright 2003-2013 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
@@ -29,17 +29,17 @@
:class:`Session`
"""
- def __init__(self, sources):
+ # since 3.19, we only have to manage the system source connection
+ def __init__(self, system_source):
# dictionary of (source, connection), indexed by the source's uri
self.source_cnxs = {}
- for source in sources:
- self.add_source(source)
- if not 'system' in self.source_cnxs:
- self.source_cnxs['system'] = self.source_cnxs[sources[0].uri]
+ self.source_cnxs['system'] = (system_source,
+ system_source.get_connection())
self._cursors = {}
def __getitem__(self, uri):
"""subscription notation provide access to sources'cursors"""
+ assert uri == 'system'
try:
cursor = self._cursors[uri]
except KeyError:
@@ -49,15 +49,6 @@
self._cursors[uri] = cursor
return cursor
- def add_source(self, source):
- assert not source.uri in self.source_cnxs
- self.source_cnxs[source.uri] = (source, source.get_connection())
-
- def remove_source(self, source):
- source, cnx = self.source_cnxs.pop(source.uri)
- cnx.close()
- self._cursors.pop(source.uri, None)
-
def commit(self):
"""commit the current transaction for this user"""
# FIXME: what happens if a commit fails
@@ -110,11 +101,6 @@
# implementation details of flying insert require the system source
# first
yield self.source_cnxs['system'][0]
- for uri, (source, cnx) in self.source_cnxs.items():
- if uri == 'system':
- continue
- yield source
- #return [source_cnx[0] for source_cnx in self.source_cnxs.itervalues()]
def source(self, uid):
"""return the source object with the given uri"""
--- a/server/repository.py Fri Jun 21 16:18:20 2013 +0200
+++ b/server/repository.py Fri Jun 21 16:01:59 2013 +0200
@@ -190,7 +190,6 @@
# sources (additional sources info in the system database)
self.system_source = self.get_source('native', 'system',
config.sources()['system'].copy())
- self.sources = [self.system_source]
self.sources_by_uri = {'system': self.system_source}
# querier helper, need to be created after sources initialization
self.querier = querier.QuerierHelper(self, self.schema)
@@ -220,7 +219,7 @@
self._cnxsets_pool = Queue.Queue()
# 0. init a cnxset that will be used to fetch bootstrap information from
# the database
- self._cnxsets_pool.put_nowait(pool.ConnectionsSet(self.sources))
+ self._cnxsets_pool.put_nowait(pool.ConnectionsSet(self.system_source))
# 1. set used cubes
if config.creating or not config.read_instance_schema:
config.bootstrap_cubes()
@@ -251,8 +250,7 @@
if config.creating:
# call init_creating so that for instance native source can
# configure tsearch according to postgres version
- for source in self.sources:
- source.init_creating()
+ self.system_source.init_creating()
else:
self.init_sources_from_database()
if 'CWProperty' in self.schema:
@@ -262,7 +260,7 @@
self._get_cnxset().close(True)
self.cnxsets = [] # list of available cnxsets (can't iterate on a Queue)
for i in xrange(config['connections-pool-size']):
- self.cnxsets.append(pool.ConnectionsSet(self.sources))
+ self.cnxsets.append(pool.ConnectionsSet(self.system_source))
self._cnxsets_pool.put_nowait(self.cnxsets[-1])
# internals ###############################################################
@@ -286,8 +284,7 @@
self.add_source(sourceent)
def _clear_planning_caches(self):
- for cache in ('source_defs', 'is_multi_sources_relation'):
- clear_cache(self, cache)
+ clear_cache(self, 'source_defs')
def add_source(self, sourceent):
source = self.get_source(sourceent.type, sourceent.name,
@@ -324,8 +321,6 @@
else:
self.vreg._set_schema(schema)
self.querier.set_schema(schema)
- # don't use self.sources, we may want to give schema even to disabled
- # sources
for source in self.sources_by_uri.itervalues():
source.set_schema(schema)
self.schema = schema
@@ -996,8 +991,7 @@
except KeyError:
etype = None
rqlcache.pop( ('Any X WHERE X eid %s' % eid,), None)
- for source in self.sources:
- source.clear_eid_cache(eid, etype)
+ self.system_source.clear_eid_cache(eid, etype)
def type_from_eid(self, eid, session=None):
"""return the type of the entity with id <eid>"""
@@ -1504,15 +1498,6 @@
self.info('repository re-registered as a pyro object %s',
self.pyro_appid)
- # multi-sources planner helpers ###########################################
-
- @cached
- def is_multi_sources_relation(self, rtype):
- warn('[3.18] old multi-source system will go away in the next version',
- DeprecationWarning)
- return any(source for source in self.sources
- if not source is self.system_source
- and source.support_relation(rtype))
# these are overridden by set_log_methods below
# only defining here to prevent pylint from complaining
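
The connection pool setup, condensed into a sketch; build_cnxset_pool is a
hypothetical name, the option and module references are taken from the
hunks above:

    import Queue

    from cubicweb.server import pool

    def build_cnxset_pool(repo, config):
        # since 3.19 every cnxset wraps the single system source connection
        repo._cnxsets_pool = Queue.Queue()
        repo.cnxsets = []
        for i in xrange(config['connections-pool-size']):
            repo.cnxsets.append(pool.ConnectionsSet(repo.system_source))
            repo._cnxsets_pool.put_nowait(repo.cnxsets[-1])
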
--- a/server/ssplanner.py Fri Jun 21 16:18:20 2013 +0200
+++ b/server/ssplanner.py Fri Jun 21 16:01:59 2013 +0200
@@ -1,4 +1,4 @@
-# copyright 2003-2012 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+# copyright 2003-2013 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
@@ -145,7 +145,7 @@
the rqlst should not be tagged at this point.
"""
plan.preprocess(rqlst)
- return (OneFetchStep(plan, rqlst, plan.session.repo.sources),)
+ return (OneFetchStep(plan, rqlst),)
def build_insert_plan(self, plan, rqlst):
"""get an execution plan from an INSERT RQL query"""
@@ -311,24 +311,6 @@
maprepr[var] = '%s.%s' % (tablesinorder[table], col)
return maprepr
-def offset_result(offset, result):
- offset -= len(result)
- if offset < 0:
- result = result[offset:]
- offset = None
- elif offset == 0:
- offset = None
- result = ()
- return offset, result
-
-
-class LimitOffsetMixIn(object):
- limit = offset = None
- def set_limit_offset(self, limit, offset):
- self.limit = limit
- self.offset = offset or None
-
-
class Step(object):
"""base abstract class for execution step"""
def __init__(self, plan):
@@ -357,22 +339,14 @@
[step.test_repr() for step in self.children],)
-class OneFetchStep(LimitOffsetMixIn, Step):
+class OneFetchStep(Step):
"""step consisting in fetching data from sources and directly returning
results
"""
- def __init__(self, plan, union, sources, inputmap=None):
+ def __init__(self, plan, union, inputmap=None):
Step.__init__(self, plan)
self.union = union
- self.sources = sources
self.inputmap = inputmap
- self.set_limit_offset(union.children[-1].limit, union.children[-1].offset)
-
- def set_limit_offset(self, limit, offset):
- LimitOffsetMixIn.set_limit_offset(self, limit, offset)
- for select in self.union.children:
- select.limit = limit
- select.offset = offset
def execute(self):
"""call .syntax_tree_search with the given syntax tree on each
@@ -395,31 +369,9 @@
cachekey = tuple(cachekey)
else:
cachekey = union.as_string()
- result = []
- # limit / offset processing
- limit = self.limit
- offset = self.offset
- if offset is not None:
- if len(self.sources) > 1:
- # we'll have to deal with limit/offset by ourself
- if union.children[-1].limit:
- union.children[-1].limit = limit + offset
- union.children[-1].offset = None
- else:
- offset, limit = None, None
- for source in self.sources:
- if offset is None and limit is not None:
- # modifying the sample rqlst is enough since sql generation
- # will pick it here as well
- union.children[-1].limit = limit - len(result)
- result_ = source.syntax_tree_search(session, union, args, cachekey,
- inputmap)
- if offset is not None:
- offset, result_ = offset_result(offset, result_)
- result += result_
- if limit is not None:
- if len(result) >= limit:
- return result[:limit]
+ # get results for query
+ source = session.repo.system_source
+ result = source.syntax_tree_search(session, union, args, cachekey, inputmap)
#print 'ONEFETCH RESULT %s' % (result)
return result
@@ -432,8 +384,7 @@
return (self.__class__.__name__,
sorted((r.as_string(kwargs=self.plan.args), r.solutions)
for r in self.union.children),
- self.limit, self.offset,
- sorted(self.sources), inputmap)
+ inputmap)
# UPDATE/INSERT/DELETE steps ##################################################
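
Since a single source now executes the whole union, LIMIT/OFFSET can stay
on the select nodes and be handled by the generated SQL, which is why
LimitOffsetMixIn and offset_result could go. The surviving step, reduced
to a sketch (cachekey computation is elided here; see the hunk above for
the full version):

    class OneFetchStep(Step):
        """fetch results from the system source and return them directly"""
        def __init__(self, plan, union, inputmap=None):
            Step.__init__(self, plan)
            self.union = union
            self.inputmap = inputmap

        def execute(self):
            session = self.plan.session
            # no per-source loop and no python-side limit/offset
            # bookkeeping: the system source gets the union as-is
            return session.repo.system_source.syntax_tree_search(
                session, self.union, self.plan.args,
                self.union.as_string(), self.inputmap)
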
--- a/server/test/unittest_datafeed.py Fri Jun 21 16:18:20 2013 +0200
+++ b/server/test/unittest_datafeed.py Fri Jun 21 16:01:59 2013 +0200
@@ -31,7 +31,6 @@
def test(self):
self.assertIn('myfeed', self.repo.sources_by_uri)
dfsource = self.repo.sources_by_uri['myfeed']
- self.assertNotIn(dfsource, self.repo.sources)
self.assertEqual(dfsource.latest_retrieval, None)
self.assertEqual(dfsource.synchro_interval, timedelta(seconds=60))
self.assertFalse(dfsource.fresh())
--- a/server/test/unittest_ssplanner.py Fri Jun 21 16:18:20 2013 +0200
+++ b/server/test/unittest_ssplanner.py Fri Jun 21 16:01:59 2013 +0200
@@ -51,8 +51,7 @@
[{'X': 'Basket', 'XN': 'String'},
{'X': 'State', 'XN': 'String'},
{'X': 'Folder', 'XN': 'String'}])],
- None, None,
- [self.system], None, [])])
+ None, [])])
def test_groupeded_ambigous_sol(self):
self._test('Any XN,COUNT(X) GROUPBY XN WHERE X name XN, X is IN (Basket, State, Folder)',
@@ -60,8 +59,7 @@
[{'X': 'Basket', 'XN': 'String'},
{'X': 'State', 'XN': 'String'},
{'X': 'Folder', 'XN': 'String'}])],
- None, None,
- [self.system], None, [])])
+ None, [])])
if __name__ == '__main__':
from logilab.common.testlib import unittest_main