--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/server/pool.py Wed Nov 05 15:52:50 2008 +0100
@@ -0,0 +1,273 @@
+"""CubicWeb server connections pool :
+
+* the rql repository has a limited number of connections pools, each of them
+ dealing with a set of connections on each source used by the repository
+
+* operation may be registered by hooks during a transaction, which will be
+ fired when the pool is commited or rollbacked
+
+This module defined the `ConnectionsPool` class and a set of abstract classes
+for operation.
+
+
+:organization: Logilab
+:copyright: 2001-2008 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
+"""
+__docformat__ = "restructuredtext en"
+
+import sys
+
+class ConnectionsPool(object):
+ """handle connections on a set of sources, at some point associated to a
+ user session
+ """
+
+ def __init__(self, sources):
+ # dictionnary of (source, connection), indexed by sources'uri
+ self.source_cnxs = {}
+ for source in sources:
+ self.source_cnxs[source.uri] = (source, source.get_connection())
+ if not 'system' in self.source_cnxs:
+ self.source_cnxs['system'] = self.source_cnxs[sources[0].uri]
+ self._cursors = {}
+
+ def commit(self):
+ """commit the current transaction for this user"""
+ # FIXME: what happends if a commit fail
+ # would need a two phases commit or like, but I don't know how to do
+ # this using the db-api...
+ for source, cnx in self.source_cnxs.values():
+ # let exception propagates
+ cnx.commit()
+
+ def rollback(self):
+ """rollback the current transaction for this user"""
+ for source, cnx in self.source_cnxs.values():
+ # catch exceptions, rollback other sources anyway
+ try:
+ cnx.rollback()
+ except:
+ source.critical('rollback error', exc_info=sys.exc_info())
+
+ def close(self, i_know_what_i_do=False):
+ """close all connections in the pool"""
+ if i_know_what_i_do is not True: # unexpected closing safety belt
+ raise RuntimeError('pool shouldn\'t be closed')
+ for cu in self._cursors.values():
+ try:
+ cu.close()
+ except:
+ continue
+ for _, cnx in self.source_cnxs.values():
+ try:
+ cnx.close()
+ except:
+ continue
+
+ # internals ###############################################################
+
+ def pool_set(self, session):
+ """pool is being set"""
+ self.check_connections()
+
+ def pool_reset(self, session):
+ """pool is being reseted"""
+ for source, cnx in self.source_cnxs.values():
+ source.pool_reset(cnx)
+
+ def __getitem__(self, uri):
+ """subscription notation provide access to sources'cursors"""
+ try:
+ cursor = self._cursors[uri]
+ except KeyError:
+ cursor = self.source_cnxs[uri][1].cursor()
+ if cursor is not None:
+ # None possible on sources without cursor support such as ldap
+ self._cursors[uri] = cursor
+ return cursor
+
+ def sources(self):
+ """return the source objects handled by this pool"""
+ # implementation details of flying insert requires the system source
+ # first
+ yield self.source_cnxs['system']
+ for uri, (source, cursor) in self.source_cnxs.items():
+ if uri == 'system':
+ continue
+ yield source
+ #return [source_cnx[0] for source_cnx in self.source_cnxs.values()]
+
+ def source(self, uid):
+ """return the source object with the given uri"""
+ return self.source_cnxs[uid][0]
+
+ def connection(self, uid):
+ """return the connection on the source object with the given uri"""
+ return self.source_cnxs[uid][1]
+
+ def reconnect(self, source):
+ """reopen a connection for this source"""
+ source.info('trying to reconnect')
+ self.source_cnxs[source.uri] = (source, source.get_connection())
+ del self._cursors[source.uri]
+
+ def check_connections(self):
+ for source, cnx in self.source_cnxs.itervalues():
+ newcnx = source.check_connection(cnx)
+ if newcnx is not None:
+ self.reset_connection(source, newcnx)
+
+ def reset_connection(self, source, cnx):
+ self.source_cnxs[source.uri] = (source, cnx)
+ self._cursors.pop(source.uri, None)
+
+
+class Operation(object):
+ """an operation is triggered on connections pool events related to
+ commit / rollback transations. Possible events are:
+
+ precommit:
+ the pool is preparing to commit. You shouldn't do anything things which
+ has to be reverted if the commit fail at this point, but you can freely
+ do any heavy computation or raise an exception if the commit can't go.
+ You can add some new operation during this phase but their precommit
+ event won't be triggered
+
+ commit:
+ the pool is preparing to commit. You should avoid to do to expensive
+ stuff or something that may cause an exception in this event
+
+ revertcommit:
+ if an operation failed while commited, this event is triggered for
+ all operations which had their commit event already to let them
+ revert things (including the operation which made fail the commit)
+
+ rollback:
+ the transaction has been either rollbacked either
+ * intentionaly
+ * a precommit event failed, all operations are rollbacked
+ * a commit event failed, all operations which are not been triggered for
+ commit are rollbacked
+
+ order of operations may be important, and is controlled according to:
+ * operation's class
+ """
+
+ def __init__(self, session, **kwargs):
+ self.session = session
+ self.user = session.user
+ self.repo = session.repo
+ self.schema = session.repo.schema
+ self.config = session.repo.config
+ self.__dict__.update(kwargs)
+ self.register(session)
+ # execution information
+ self.processed = None # 'precommit', 'commit'
+ self.failed = False
+
+ def register(self, session):
+ session.add_operation(self, self.insert_index())
+
+ def insert_index(self):
+ """return the index of the lastest instance which is not a
+ LateOperation instance
+ """
+ for i, op in enumerate(self.session.pending_operations):
+ if isinstance(op, (LateOperation, SingleLastOperation)):
+ return i
+ return None
+
+ def handle_event(self, event):
+ """delegate event handling to the opertaion"""
+ getattr(self, event)()
+
+ def precommit_event(self):
+ """the observed connections pool is preparing a commit"""
+
+ def revertprecommit_event(self):
+ """an error went when pre-commiting this operation or a later one
+
+ should revert pre-commit's changes but take care, they may have not
+ been all considered if it's this operation which failed
+ """
+
+ def commit_event(self):
+ """the observed connections pool is commiting"""
+ raise NotImplementedError()
+
+ def revertcommit_event(self):
+ """an error went when commiting this operation or a later one
+
+ should revert commit's changes but take care, they may have not
+ been all considered if it's this operation which failed
+ """
+
+ def rollback_event(self):
+ """the observed connections pool has been rollbacked
+
+ do nothing by default, the operation will just be removed from the pool
+ operation list
+ """
+
+
+class PreCommitOperation(Operation):
+ """base class for operation only defining a precommit operation
+ """
+
+ def precommit_event(self):
+ """the observed connections pool is preparing a commit"""
+ raise NotImplementedError()
+
+ def commit_event(self):
+ """the observed connections pool is commiting"""
+
+
+class LateOperation(Operation):
+ """special operation which should be called after all possible (ie non late)
+ operations
+ """
+ def insert_index(self):
+ """return the index of the lastest instance which is not a
+ SingleLastOperation instance
+ """
+ for i, op in enumerate(self.session.pending_operations):
+ if isinstance(op, SingleLastOperation):
+ return i
+ return None
+
+
+class SingleOperation(Operation):
+ """special operation which should be called once"""
+ def register(self, session):
+ """override register to handle cases where this operation has already
+ been added
+ """
+ operations = session.pending_operations
+ index = self.equivalent_index(operations)
+ if index is not None:
+ equivalent = operations.pop(index)
+ else:
+ equivalent = None
+ session.add_operation(self, self.insert_index())
+ return equivalent
+
+ def equivalent_index(self, operations):
+ """return the index of the equivalent operation if any"""
+ equivalents = [i for i, op in enumerate(operations)
+ if op.__class__ is self.__class__]
+ if equivalents:
+ return equivalents[0]
+ return None
+
+
+class SingleLastOperation(SingleOperation):
+ """special operation which should be called once and after all other
+ operations
+ """
+ def insert_index(self):
+ return None
+
+from logging import getLogger
+from cubicweb import set_log_methods
+set_log_methods(Operation, getLogger('cubicweb.session'))