server/pool.py
changeset 0 b97547f5f1fa
child 1132 96752791c2b6
equal deleted inserted replaced
-1:000000000000 0:b97547f5f1fa
       
     1 """CubicWeb server connections pool :
       
     2 
       
     3 * the rql repository has a limited number of connections pools, each of them
       
     4   dealing with a set of connections on each source used by the repository
       
     5   
       
     6 * operations may be registered by hooks during a transaction; they will be
       
     7   fired when the pool is committed or rolled back
       
     8 
       
     9 This module defines the `ConnectionsPool` class and a set of abstract classes
       
    10 for operations.
       
    11 
       
    12 
       
    13 :organization: Logilab
       
    14 :copyright: 2001-2008 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
       
    15 :contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
       
    16 """
       
    17 __docformat__ = "restructuredtext en"
       
    18 
       
    19 import sys
       
    20     
       
    21 class ConnectionsPool(object):
       
    22     """handle connections on a set of sources, at some point associated to a
       
    23     user session
       
    24     """
       
    25 
       
    26     def __init__(self, sources):
       
    27         # dictionnary of (source, connection), indexed by sources'uri
       
    28         self.source_cnxs = {}
       
    29         for source in sources:
       
    30             self.source_cnxs[source.uri] = (source, source.get_connection())
       
    31         if not 'system' in self.source_cnxs:
       
    32             self.source_cnxs['system'] = self.source_cnxs[sources[0].uri]
       
    33         self._cursors = {}
       
    34 
       
    35     def commit(self):
       
    36         """commit the current transaction for this user"""
       
    37         # FIXME: what happends if a commit fail
       
    38         # would need a two phases commit or like, but I don't know how to do
       
    39         # this using the db-api...
       
    40         for source, cnx in self.source_cnxs.values():
       
    41             # let exception propagates
       
    42             cnx.commit()
       
    43         
       
    44     def rollback(self):
       
    45         """rollback the current transaction for this user"""
       
    46         for source, cnx in self.source_cnxs.values():
       
    47             # catch exceptions, rollback other sources anyway
       
    48             try:
       
    49                 cnx.rollback()
       
    50             except:
       
    51                 source.critical('rollback error', exc_info=sys.exc_info())
       
    52 
       
    53     def close(self, i_know_what_i_do=False):
       
    54         """close all connections in the pool"""
       
    55         if i_know_what_i_do is not True: # unexpected closing safety belt
       
    56             raise RuntimeError('pool shouldn\'t be closed')
       
    57         for cu in self._cursors.values():
       
    58             try:
       
    59                 cu.close()
       
    60             except:
       
    61                 continue
       
    62         for _, cnx in self.source_cnxs.values():
       
    63             try:
       
    64                 cnx.close()
       
    65             except:
       
    66                 continue
       
    67             
       
    68     # internals ###############################################################
       
    69 
       
    70     def pool_set(self, session):
       
    71         """pool is being set"""
       
    72         self.check_connections()
       
    73 
       
    74     def pool_reset(self, session):
       
    75         """pool is being reseted"""
       
    76         for source, cnx in self.source_cnxs.values():
       
    77             source.pool_reset(cnx)
       
    78         
       
    79     def __getitem__(self, uri):
       
    80         """subscription notation provide access to sources'cursors"""
       
    81         try:
       
    82             cursor = self._cursors[uri]
       
    83         except KeyError:
       
    84             cursor = self.source_cnxs[uri][1].cursor()
       
    85             if cursor is not None:
       
    86                 # None possible on sources without cursor support such as ldap
       
    87                 self._cursors[uri] = cursor
       
    88         return cursor
       
    89     
       
    90     def sources(self):
       
    91         """return the source objects handled by this pool"""
       
    92         # implementation details of flying insert requires the system source
       
    93         # first
       
    94         yield self.source_cnxs['system']
       
    95         for uri, (source, cursor) in self.source_cnxs.items():
       
    96             if uri == 'system':
       
    97                 continue
       
    98             yield source
       
    99         #return [source_cnx[0] for source_cnx in self.source_cnxs.values()]
       
   100     
       
   101     def source(self, uid):
       
   102         """return the source object with the given uri"""
       
   103         return self.source_cnxs[uid][0]
       
   104     
       
   105     def connection(self, uid):
       
   106         """return the connection on the source object with the given uri"""
       
   107         return self.source_cnxs[uid][1]
       
   108 
       
   109     def reconnect(self, source):
       
   110         """reopen a connection for this source"""
       
   111         source.info('trying to reconnect')
       
   112         self.source_cnxs[source.uri] = (source, source.get_connection())        
       
   113         del self._cursors[source.uri]
       
   114 
       
   115     def check_connections(self):
       
   116         for source, cnx in self.source_cnxs.itervalues():
       
   117             newcnx = source.check_connection(cnx)
       
   118             if newcnx is not None:
       
   119                 self.reset_connection(source, newcnx)
       
   120 
       
   121     def reset_connection(self, source, cnx):
       
   122         self.source_cnxs[source.uri] = (source, cnx)
       
   123         self._cursors.pop(source.uri, None)
       
   124 
       
   125 
       
   126 class Operation(object):
       
   127     """an operation is triggered on connections pool events related to
       
   128     commit / rollback transations. Possible events are:
       
   129 
       
   130     precommit:
       
   131       the pool is preparing to commit. You shouldn't do anything things which
       
   132       has to be reverted if the commit fail at this point, but you can freely
       
   133       do any heavy computation or raise an exception if the commit can't go.
       
   134       You can add some new operation during this phase but their precommit
       
   135       event won't be triggered
       
   136       
       
   137     commit:
       
   138       the pool is preparing to commit. You should avoid to do to expensive
       
   139       stuff or something that may cause an exception in this event
       
   140       
       
   141     revertcommit:
       
   142       if an operation failed while commited, this event is triggered for
       
   143       all operations which had their commit event already to let them
       
   144       revert things (including the operation which made fail the commit)
       
   145 
       
   146     rollback:
       
   147       the transaction has been either rollbacked either
       
   148       * intentionaly
       
   149       * a precommit event failed, all operations are rollbacked
       
   150       * a commit event failed, all operations which are not been triggered for
       
   151         commit are rollbacked
       
   152 
       
   153     order of operations may be important, and is controlled according to:
       
   154     * operation's class
       
   155     """
       
   156     
       
   157     def __init__(self, session, **kwargs):
       
   158         self.session = session
       
   159         self.user = session.user
       
   160         self.repo = session.repo
       
   161         self.schema = session.repo.schema
       
   162         self.config = session.repo.config
       
   163         self.__dict__.update(kwargs)
       
   164         self.register(session)
       
   165         # execution information
       
   166         self.processed = None # 'precommit', 'commit'
       
   167         self.failed = False
       
   168         
       
   169     def register(self, session):
       
   170         session.add_operation(self, self.insert_index())
       
   171         
       
   172     def insert_index(self):
       
   173         """return the index of  the lastest instance which is not a
       
   174         LateOperation instance
       
   175         """
       
   176         for i, op in enumerate(self.session.pending_operations):
       
   177             if isinstance(op, (LateOperation, SingleLastOperation)):
       
   178                 return i
       
   179         return None
       
   180     
       
   181     def handle_event(self, event):
       
   182         """delegate event handling to the opertaion"""
       
   183         getattr(self, event)()
       
   184 
       
   185     def precommit_event(self):
       
   186         """the observed connections pool is preparing a commit"""
       
   187     
       
   188     def revertprecommit_event(self):
       
   189         """an error went when pre-commiting this operation or a later one
       
   190         
       
   191         should revert pre-commit's changes but take care, they may have not
       
   192         been all considered if it's this operation which failed
       
   193         """
       
   194 
       
   195     def commit_event(self):
       
   196         """the observed connections pool is commiting"""
       
   197         raise NotImplementedError()
       
   198     
       
   199     def revertcommit_event(self):
       
   200         """an error went when commiting this operation or a later one
       
   201         
       
   202         should revert commit's changes but take care, they may have not
       
   203         been all considered if it's this operation which failed
       
   204         """
       
   205     
       
   206     def rollback_event(self):
       
   207         """the observed connections pool has been rollbacked
       
   208         
       
   209         do nothing by default, the operation will just be removed from the pool
       
   210         operation list
       
   211         """
       
   212 
       
   213 
       
   214 class PreCommitOperation(Operation):
       
   215     """base class for operation only defining a precommit operation
       
   216     """
       
   217 
       
   218     def precommit_event(self):
       
   219         """the observed connections pool is preparing a commit"""
       
   220         raise NotImplementedError()
       
   221 
       
   222     def commit_event(self):
       
   223         """the observed connections pool is commiting"""
       
   224 
       
   225 
       
   226 class LateOperation(Operation):
       
   227     """special operation which should be called after all possible (ie non late)
       
   228     operations
       
   229     """    
       
   230     def insert_index(self):
       
   231         """return the index of  the lastest instance which is not a
       
   232         SingleLastOperation instance
       
   233         """
       
   234         for i, op in enumerate(self.session.pending_operations):
       
   235             if isinstance(op, SingleLastOperation):
       
   236                 return i
       
   237         return None
       
   238 
       
   239 
       
   240 class SingleOperation(Operation):
       
   241     """special operation which should be called once"""    
       
   242     def register(self, session):
       
   243         """override register to handle cases where this operation has already
       
   244         been added
       
   245         """
       
   246         operations = session.pending_operations
       
   247         index = self.equivalent_index(operations)
       
   248         if index is not None:
       
   249             equivalent = operations.pop(index)
       
   250         else:
       
   251             equivalent = None
       
   252         session.add_operation(self, self.insert_index())
       
   253         return equivalent
       
   254     
       
   255     def equivalent_index(self, operations):
       
   256         """return the index of the equivalent operation if any"""
       
   257         equivalents = [i for i, op in enumerate(operations)
       
   258                        if op.__class__ is self.__class__]
       
   259         if equivalents:
       
   260             return equivalents[0]
       
   261         return None
       
   262 
       
   263 
       
   264 class SingleLastOperation(SingleOperation):
       
   265     """special operation which should be called once and after all other
       
   266     operations
       
   267     """    
       
   268     def insert_index(self):
       
   269         return None
       
   270 
       
   271 from logging import getLogger
       
   272 from cubicweb import set_log_methods
       
   273 set_log_methods(Operation, getLogger('cubicweb.session'))