--- a/server/repository.py Thu May 19 10:36:26 2011 +0200
+++ b/server/repository.py Thu May 19 10:53:11 2011 +0200
@@ -164,9 +164,9 @@
self._type_source_cache = {}
# cache (extid, source uri) -> eid
self._extid_cache = {}
- # open some connections pools
- if config.open_connections_pools:
- self.open_connections_pools()
+ # open some connections set
+ if config.init_cnxset_pool:
+ self.init_cnxset_pool()
@onevent('after-registry-reload', self)
def fix_user_classes(self):
usercls = self.vreg['etypes'].etype_class('CWUser')
@@ -174,10 +174,10 @@
if not isinstance(session.user, InternalManager):
session.user.__class__ = usercls
- def open_connections_pools(self):
+ def init_cnxset_pool(self):
config = self.config
- self._available_pools = Queue.Queue()
- self._available_pools.put_nowait(pool.ConnectionsPool(self.sources))
+ self._cnxsets_pool = Queue.Queue()
+ self._cnxsets_pool.put_nowait(pool.ConnectionsSet(self.sources))
if config.quick_start:
# quick start, usually only to get a minimal repository to get cubes
# information (eg dump/restore/...)
@@ -219,14 +219,14 @@
# configurate tsearch according to postgres version
for source in self.sources:
source.init_creating()
- # close initialization pool and reopen fresh ones for proper
+ # close initialization connetions set and reopen fresh ones for proper
# initialization now that we know cubes
- self._get_pool().close(True)
- # list of available pools (we can't iterate on Queue instance)
- self.pools = []
+ self._get_cnxset().close(True)
+ # list of available_cnxsets (we can't iterate on Queue instance)
+ self.cnxsets = []
for i in xrange(config['connections-pool-size']):
- self.pools.append(pool.ConnectionsPool(self.sources))
- self._available_pools.put_nowait(self.pools[-1])
+ self.cnxsets.append(pool.ConnectionsSet(self.sources))
+ self._cnxsets_pool.put_nowait(self.cnxsets[-1])
if config.quick_start:
config.init_cubes(self.get_cubes())
self.hm = hook.HooksManager(self.vreg)
@@ -249,7 +249,7 @@
self.sources_by_eid[sourceent.eid] = self.system_source
self.system_source.init(True, sourceent)
continue
- self.add_source(sourceent, add_to_pools=False)
+ self.add_source(sourceent, add_to_cnxsets=False)
finally:
session.close()
@@ -258,7 +258,7 @@
'can_cross_relation', 'rel_type_sources'):
clear_cache(self, cache)
- def add_source(self, sourceent, add_to_pools=True):
+ def add_source(self, sourceent, add_to_cnxsets=True):
source = self.get_source(sourceent.type, sourceent.name,
sourceent.host_config, sourceent.eid)
self.sources_by_eid[sourceent.eid] = source
@@ -266,15 +266,15 @@
if self.config.source_enabled(source):
# call source's init method to complete their initialisation if
# needed (for instance looking for persistent configuration using an
- # internal session, which is not possible until pools have been
+ # internal session, which is not possible until connections sets have been
# initialized)
source.init(True, sourceent)
if not source.copy_based_source:
self.sources.append(source)
self.querier.set_planner()
- if add_to_pools:
- for pool in self.pools:
- pool.add_source(source)
+ if add_to_cnxsets:
+ for cnxset in self.cnxsets:
+ cnxset.add_source(source)
else:
source.init(False, sourceent)
self._clear_planning_caches()
@@ -285,8 +285,8 @@
if self.config.source_enabled(source) and not source.copy_based_source:
self.sources.remove(source)
self.querier.set_planner()
- for pool in self.pools:
- pool.remove_source(source)
+ for cnxset in self.cnxsets:
+ cnxset.remove_source(source)
self._clear_planning_caches()
def get_source(self, type, uri, source_config, eid=None):
@@ -373,25 +373,25 @@
t.start()
#@locked
- def _get_pool(self):
+ def _get_cnxset(self):
try:
- return self._available_pools.get(True, timeout=5)
+ return self._cnxsets_pool.get(True, timeout=5)
except Queue.Empty:
- raise Exception('no pool available after 5 secs, probably either a '
+ raise Exception('no connections set available after 5 secs, probably either a '
'bug in code (too many uncommited/rollbacked '
'connections) or too much load on the server (in '
'which case you can try to set a bigger '
- 'connections pools size)')
+ 'connections pool size)')
- def _free_pool(self, pool):
- self._available_pools.put_nowait(pool)
+ def _free_cnxset(self, cnxset):
+ self._cnxsets_pool.put_nowait(cnxset)
def pinfo(self):
- # XXX: session.pool is accessed from a local storage, would be interesting
- # to see if there is a pool set in any thread specific data)
- return '%s: %s (%s)' % (self._available_pools.qsize(),
+ # XXX: session.cnxset is accessed from a local storage, would be interesting
+ # to see if there is a cnxset set in any thread specific data)
+ return '%s: %s (%s)' % (self._cnxsets_pool.qsize(),
','.join(session.user.login for session in self._sessions.values()
- if session.pool),
+ if session.cnxset),
threading.currentThread())
def shutdown(self):
"""called on server stop event to properly close opened sessions and
@@ -414,12 +414,12 @@
or self.config.quick_start):
self.hm.call_hooks('server_shutdown', repo=self)
self.close_sessions()
- while not self._available_pools.empty():
- pool = self._available_pools.get_nowait()
+ while not self._cnxsets_pool.empty():
+ cnxset = self._cnxsets_pool.get_nowait()
try:
- pool.close(True)
+ cnxset.close(True)
except:
- self.exception('error while closing %s' % pool)
+ self.exception('error while closing %s' % cnxset)
continue
if self.pyro_registered:
if self._use_pyrons():
@@ -501,7 +501,7 @@
results['nb_open_sessions'] = len(self._sessions)
results['nb_active_threads'] = threading.activeCount()
results['looping_tasks'] = ', '.join(str(t) for t in self._looping_tasks)
- results['available_pools'] = self._available_pools.qsize()
+ results['available_cnxsets'] = self._cnxsets_pool.qsize()
results['threads'] = ', '.join(sorted(str(t) for t in threading.enumerate()))
return results
@@ -543,9 +543,9 @@
_, sourceuri, extid = self.type_and_source_from_eid(foreid)
if sourceuri == 'system':
return self.config[option]
- pool = self._get_pool()
+ cnxset = self._get_cnxset()
try:
- cnx = pool.connection(sourceuri)
+ cnx = cnxset.connection(sourceuri)
# needed to check connection is valid and usable by the current
# thread
newcnx = self.sources_by_uri[sourceuri].check_connection(cnx)
@@ -553,7 +553,7 @@
cnx = newcnx
return cnx.get_option_value(option, extid)
finally:
- self._free_pool(pool)
+ self._free_cnxset(cnxset)
@cached
def get_versions(self, checkversions=False):
@@ -726,7 +726,7 @@
* build_descr is a flag indicating if the description should be
built on select queries
"""
- session = self._get_session(sessionid, setpool=True, txid=txid)
+ session = self._get_session(sessionid, setcnxset=True, txid=txid)
try:
try:
rset = self.querier.execute(session, rqlstring, args,
@@ -752,21 +752,21 @@
self.exception('unexpected error while executing %s with %s', rqlstring, args)
raise
finally:
- session.reset_pool()
+ session.free_cnxset()
def describe(self, sessionid, eid, txid=None):
"""return a tuple (type, source, extid) for the entity with id <eid>"""
- session = self._get_session(sessionid, setpool=True, txid=txid)
+ session = self._get_session(sessionid, setcnxset=True, txid=txid)
try:
return self.type_and_source_from_eid(eid, session)
finally:
- session.reset_pool()
+ session.free_cnxset()
def check_session(self, sessionid):
"""raise `BadConnectionId` if the connection is no more valid, else
return its latest activity timestamp.
"""
- return self._get_session(sessionid, setpool=False).timestamp
+ return self._get_session(sessionid, setcnxset=False).timestamp
def get_shared_data(self, sessionid, key, default=None, pop=False, txdata=False):
"""return value associated to key in the session's data dictionary or
@@ -777,7 +777,7 @@
If key isn't defined in the dictionnary, value specified by the
`default` argument will be returned.
"""
- session = self._get_session(sessionid, setpool=False)
+ session = self._get_session(sessionid, setcnxset=False)
return session.get_shared_data(key, default, pop, txdata)
def set_shared_data(self, sessionid, key, value, txdata=False):
@@ -787,7 +787,7 @@
transaction's data which are cleared on commit/rollback of the current
transaction.
"""
- session = self._get_session(sessionid, setpool=False)
+ session = self._get_session(sessionid, setcnxset=False)
session.set_shared_data(key, value, txdata)
def commit(self, sessionid, txid=None):
@@ -816,10 +816,10 @@
def close(self, sessionid, txid=None, checkshuttingdown=True):
"""close the session with the given id"""
- session = self._get_session(sessionid, setpool=True, txid=txid,
+ session = self._get_session(sessionid, setcnxset=True, txid=txid,
checkshuttingdown=checkshuttingdown)
# operation uncommited before close are rollbacked before hook is called
- session.rollback(reset_pool=False)
+ session.rollback(free_cnxset=False)
self.hm.call_hooks('session_close', session)
# commit session at this point in case write operation has been done
# during `session_close` hooks
@@ -834,7 +834,7 @@
* update user information on each user's request (i.e. groups and
custom properties)
"""
- session = self._get_session(sessionid, setpool=False)
+ session = self._get_session(sessionid, setcnxset=False)
if props is not None:
self.set_session_props(sessionid, props)
user = session.user
@@ -846,43 +846,43 @@
* update user information on each user's request (i.e. groups and
custom properties)
"""
- session = self._get_session(sessionid, setpool=False)
+ session = self._get_session(sessionid, setcnxset=False)
for prop, value in props.items():
session.change_property(prop, value)
def undoable_transactions(self, sessionid, ueid=None, txid=None,
**actionfilters):
"""See :class:`cubicweb.dbapi.Connection.undoable_transactions`"""
- session = self._get_session(sessionid, setpool=True, txid=txid)
+ session = self._get_session(sessionid, setcnxset=True, txid=txid)
try:
return self.system_source.undoable_transactions(session, ueid,
**actionfilters)
finally:
- session.reset_pool()
+ session.free_cnxset()
def transaction_info(self, sessionid, txuuid, txid=None):
"""See :class:`cubicweb.dbapi.Connection.transaction_info`"""
- session = self._get_session(sessionid, setpool=True, txid=txid)
+ session = self._get_session(sessionid, setcnxset=True, txid=txid)
try:
return self.system_source.tx_info(session, txuuid)
finally:
- session.reset_pool()
+ session.free_cnxset()
def transaction_actions(self, sessionid, txuuid, public=True, txid=None):
"""See :class:`cubicweb.dbapi.Connection.transaction_actions`"""
- session = self._get_session(sessionid, setpool=True, txid=txid)
+ session = self._get_session(sessionid, setcnxset=True, txid=txid)
try:
return self.system_source.tx_actions(session, txuuid, public)
finally:
- session.reset_pool()
+ session.free_cnxset()
def undo_transaction(self, sessionid, txuuid, txid=None):
"""See :class:`cubicweb.dbapi.Connection.undo_transaction`"""
- session = self._get_session(sessionid, setpool=True, txid=txid)
+ session = self._get_session(sessionid, setcnxset=True, txid=txid)
try:
return self.system_source.undo_transaction(session, txuuid)
finally:
- session.reset_pool()
+ session.free_cnxset()
# public (inter-repository) interface #####################################
@@ -934,14 +934,14 @@
"""return a dbapi like connection/cursor using internal user which
have every rights on the repository. You'll *have to* commit/rollback
or close (rollback implicitly) the session once the job's done, else
- you'll leak connections pool up to the time where no more pool is
+ you'll leak connections set up to the time where no one is
available, causing irremediable freeze...
"""
session = InternalSession(self, cnxprops)
- session.set_pool()
+ session.set_cnxset()
return session
- def _get_session(self, sessionid, setpool=False, txid=None,
+ def _get_session(self, sessionid, setcnxset=False, txid=None,
checkshuttingdown=True):
"""return the user associated to the given session identifier"""
if checkshuttingdown and self.shutting_down:
@@ -950,9 +950,9 @@
session = self._sessions[sessionid]
except KeyError:
raise BadConnectionId('No such session %s' % sessionid)
- if setpool:
- session.set_tx_data(txid) # must be done before set_pool
- session.set_pool()
+ if setcnxset:
+ session.set_tx_data(txid) # must be done before set_cnxset
+ session.set_cnxset()
return session
# data sources handling ###################################################
@@ -970,15 +970,15 @@
except KeyError:
if session is None:
session = self.internal_session()
- reset_pool = True
+ free_cnxset = True
else:
- reset_pool = False
+ free_cnxset = False
try:
etype, uri, extid = self.system_source.eid_type_source(session,
eid)
finally:
- if reset_pool:
- session.reset_pool()
+ if free_cnxset:
+ session.free_cnxset()
self._type_source_cache[eid] = (etype, uri, extid)
if uri != 'system':
self._extid_cache[(extid, uri)] = eid
@@ -1039,16 +1039,16 @@
return self._extid_cache[cachekey]
except KeyError:
pass
- reset_pool = False
+ free_cnxset = False
if session is None:
session = self.internal_session()
- reset_pool = True
+ free_cnxset = True
eid = self.system_source.extid2eid(session, uri, extid)
if eid is not None:
self._extid_cache[cachekey] = eid
self._type_source_cache[eid] = (etype, uri, extid)
- if reset_pool:
- session.reset_pool()
+ if free_cnxset:
+ session.free_cnxset()
return eid
if not insert:
return
@@ -1060,7 +1060,7 @@
# processing a commit, we have to use another one
if not session.is_internal_session:
session = self.internal_session()
- reset_pool = True
+ free_cnxset = True
try:
eid = self.system_source.create_eid(session)
self._extid_cache[cachekey] = eid
@@ -1074,10 +1074,10 @@
source.after_entity_insertion(session, extid, entity, sourceparams)
if source.should_call_hooks:
self.hm.call_hooks('after_add_entity', session, entity=entity)
- session.commit(reset_pool)
+ session.commit(free_cnxset)
return eid
except:
- session.rollback(reset_pool)
+ session.rollback(free_cnxset)
raise
def add_info(self, session, entity, source, extid=None, complete=True):