diff -r 000000000000 -r b97547f5f1fa server/pool.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/server/pool.py Wed Nov 05 15:52:50 2008 +0100 @@ -0,0 +1,273 @@ +"""CubicWeb server connections pool : + +* the rql repository has a limited number of connections pools, each of them + dealing with a set of connections on each source used by the repository + +* operation may be registered by hooks during a transaction, which will be + fired when the pool is commited or rollbacked + +This module defined the `ConnectionsPool` class and a set of abstract classes +for operation. + + +:organization: Logilab +:copyright: 2001-2008 LOGILAB S.A. (Paris, FRANCE), all rights reserved. +:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr +""" +__docformat__ = "restructuredtext en" + +import sys + +class ConnectionsPool(object): + """handle connections on a set of sources, at some point associated to a + user session + """ + + def __init__(self, sources): + # dictionnary of (source, connection), indexed by sources'uri + self.source_cnxs = {} + for source in sources: + self.source_cnxs[source.uri] = (source, source.get_connection()) + if not 'system' in self.source_cnxs: + self.source_cnxs['system'] = self.source_cnxs[sources[0].uri] + self._cursors = {} + + def commit(self): + """commit the current transaction for this user""" + # FIXME: what happends if a commit fail + # would need a two phases commit or like, but I don't know how to do + # this using the db-api... + for source, cnx in self.source_cnxs.values(): + # let exception propagates + cnx.commit() + + def rollback(self): + """rollback the current transaction for this user""" + for source, cnx in self.source_cnxs.values(): + # catch exceptions, rollback other sources anyway + try: + cnx.rollback() + except: + source.critical('rollback error', exc_info=sys.exc_info()) + + def close(self, i_know_what_i_do=False): + """close all connections in the pool""" + if i_know_what_i_do is not True: # unexpected closing safety belt + raise RuntimeError('pool shouldn\'t be closed') + for cu in self._cursors.values(): + try: + cu.close() + except: + continue + for _, cnx in self.source_cnxs.values(): + try: + cnx.close() + except: + continue + + # internals ############################################################### + + def pool_set(self, session): + """pool is being set""" + self.check_connections() + + def pool_reset(self, session): + """pool is being reseted""" + for source, cnx in self.source_cnxs.values(): + source.pool_reset(cnx) + + def __getitem__(self, uri): + """subscription notation provide access to sources'cursors""" + try: + cursor = self._cursors[uri] + except KeyError: + cursor = self.source_cnxs[uri][1].cursor() + if cursor is not None: + # None possible on sources without cursor support such as ldap + self._cursors[uri] = cursor + return cursor + + def sources(self): + """return the source objects handled by this pool""" + # implementation details of flying insert requires the system source + # first + yield self.source_cnxs['system'] + for uri, (source, cursor) in self.source_cnxs.items(): + if uri == 'system': + continue + yield source + #return [source_cnx[0] for source_cnx in self.source_cnxs.values()] + + def source(self, uid): + """return the source object with the given uri""" + return self.source_cnxs[uid][0] + + def connection(self, uid): + """return the connection on the source object with the given uri""" + return self.source_cnxs[uid][1] + + def reconnect(self, source): + """reopen a connection for this source""" + source.info('trying to reconnect') + self.source_cnxs[source.uri] = (source, source.get_connection()) + del self._cursors[source.uri] + + def check_connections(self): + for source, cnx in self.source_cnxs.itervalues(): + newcnx = source.check_connection(cnx) + if newcnx is not None: + self.reset_connection(source, newcnx) + + def reset_connection(self, source, cnx): + self.source_cnxs[source.uri] = (source, cnx) + self._cursors.pop(source.uri, None) + + +class Operation(object): + """an operation is triggered on connections pool events related to + commit / rollback transations. Possible events are: + + precommit: + the pool is preparing to commit. You shouldn't do anything things which + has to be reverted if the commit fail at this point, but you can freely + do any heavy computation or raise an exception if the commit can't go. + You can add some new operation during this phase but their precommit + event won't be triggered + + commit: + the pool is preparing to commit. You should avoid to do to expensive + stuff or something that may cause an exception in this event + + revertcommit: + if an operation failed while commited, this event is triggered for + all operations which had their commit event already to let them + revert things (including the operation which made fail the commit) + + rollback: + the transaction has been either rollbacked either + * intentionaly + * a precommit event failed, all operations are rollbacked + * a commit event failed, all operations which are not been triggered for + commit are rollbacked + + order of operations may be important, and is controlled according to: + * operation's class + """ + + def __init__(self, session, **kwargs): + self.session = session + self.user = session.user + self.repo = session.repo + self.schema = session.repo.schema + self.config = session.repo.config + self.__dict__.update(kwargs) + self.register(session) + # execution information + self.processed = None # 'precommit', 'commit' + self.failed = False + + def register(self, session): + session.add_operation(self, self.insert_index()) + + def insert_index(self): + """return the index of the lastest instance which is not a + LateOperation instance + """ + for i, op in enumerate(self.session.pending_operations): + if isinstance(op, (LateOperation, SingleLastOperation)): + return i + return None + + def handle_event(self, event): + """delegate event handling to the opertaion""" + getattr(self, event)() + + def precommit_event(self): + """the observed connections pool is preparing a commit""" + + def revertprecommit_event(self): + """an error went when pre-commiting this operation or a later one + + should revert pre-commit's changes but take care, they may have not + been all considered if it's this operation which failed + """ + + def commit_event(self): + """the observed connections pool is commiting""" + raise NotImplementedError() + + def revertcommit_event(self): + """an error went when commiting this operation or a later one + + should revert commit's changes but take care, they may have not + been all considered if it's this operation which failed + """ + + def rollback_event(self): + """the observed connections pool has been rollbacked + + do nothing by default, the operation will just be removed from the pool + operation list + """ + + +class PreCommitOperation(Operation): + """base class for operation only defining a precommit operation + """ + + def precommit_event(self): + """the observed connections pool is preparing a commit""" + raise NotImplementedError() + + def commit_event(self): + """the observed connections pool is commiting""" + + +class LateOperation(Operation): + """special operation which should be called after all possible (ie non late) + operations + """ + def insert_index(self): + """return the index of the lastest instance which is not a + SingleLastOperation instance + """ + for i, op in enumerate(self.session.pending_operations): + if isinstance(op, SingleLastOperation): + return i + return None + + +class SingleOperation(Operation): + """special operation which should be called once""" + def register(self, session): + """override register to handle cases where this operation has already + been added + """ + operations = session.pending_operations + index = self.equivalent_index(operations) + if index is not None: + equivalent = operations.pop(index) + else: + equivalent = None + session.add_operation(self, self.insert_index()) + return equivalent + + def equivalent_index(self, operations): + """return the index of the equivalent operation if any""" + equivalents = [i for i, op in enumerate(operations) + if op.__class__ is self.__class__] + if equivalents: + return equivalents[0] + return None + + +class SingleLastOperation(SingleOperation): + """special operation which should be called once and after all other + operations + """ + def insert_index(self): + return None + +from logging import getLogger +from cubicweb import set_log_methods +set_log_methods(Operation, getLogger('cubicweb.session'))