server/pool.py
branchtls-sprint
changeset 1802 d628defebc17
parent 1132 96752791c2b6
child 1977 606923dff11b
equal deleted inserted replaced
1801:672acc730ce5 1802:d628defebc17
     1 """CubicWeb server connections pool :
     1 """CubicWeb server connections pool :
     2 
     2 
     3 * the rql repository has a limited number of connections pools, each of them
     3 * the rql repository has a limited number of connections pools, each of them
     4   dealing with a set of connections on each source used by the repository
     4   dealing with a set of connections on each source used by the repository
     5   
     5 
     6 * operation may be registered by hooks during a transaction, which will  be
     6 * operation may be registered by hooks during a transaction, which will  be
     7   fired when the pool is commited or rollbacked
     7   fired when the pool is commited or rollbacked
     8 
     8 
     9 This module defined the `ConnectionsPool` class and a set of abstract classes
     9 This module defined the `ConnectionsPool` class and a set of abstract classes
    10 for operation.
    10 for operation.
    15 :contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
    15 :contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
    16 """
    16 """
    17 __docformat__ = "restructuredtext en"
    17 __docformat__ = "restructuredtext en"
    18 
    18 
    19 import sys
    19 import sys
    20     
    20 
    21 class ConnectionsPool(object):
    21 class ConnectionsPool(object):
    22     """handle connections on a set of sources, at some point associated to a
    22     """handle connections on a set of sources, at some point associated to a
    23     user session
    23     user session
    24     """
    24     """
    25 
    25 
    38         # would need a two phases commit or like, but I don't know how to do
    38         # would need a two phases commit or like, but I don't know how to do
    39         # this using the db-api...
    39         # this using the db-api...
    40         for source, cnx in self.source_cnxs.values():
    40         for source, cnx in self.source_cnxs.values():
    41             # let exception propagates
    41             # let exception propagates
    42             cnx.commit()
    42             cnx.commit()
    43         
    43 
    44     def rollback(self):
    44     def rollback(self):
    45         """rollback the current transaction for this user"""
    45         """rollback the current transaction for this user"""
    46         for source, cnx in self.source_cnxs.values():
    46         for source, cnx in self.source_cnxs.values():
    47             # catch exceptions, rollback other sources anyway
    47             # catch exceptions, rollback other sources anyway
    48             try:
    48             try:
    62         for _, cnx in self.source_cnxs.values():
    62         for _, cnx in self.source_cnxs.values():
    63             try:
    63             try:
    64                 cnx.close()
    64                 cnx.close()
    65             except:
    65             except:
    66                 continue
    66                 continue
    67             
    67 
    68     # internals ###############################################################
    68     # internals ###############################################################
    69 
    69 
    70     def pool_set(self, session):
    70     def pool_set(self, session):
    71         """pool is being set"""
    71         """pool is being set"""
    72         self.check_connections()
    72         self.check_connections()
    73 
    73 
    74     def pool_reset(self, session):
    74     def pool_reset(self, session):
    75         """pool is being reseted"""
    75         """pool is being reseted"""
    76         for source, cnx in self.source_cnxs.values():
    76         for source, cnx in self.source_cnxs.values():
    77             source.pool_reset(cnx)
    77             source.pool_reset(cnx)
    78         
    78 
    79     def __getitem__(self, uri):
    79     def __getitem__(self, uri):
    80         """subscription notation provide access to sources'cursors"""
    80         """subscription notation provide access to sources'cursors"""
    81         try:
    81         try:
    82             cursor = self._cursors[uri]
    82             cursor = self._cursors[uri]
    83         except KeyError:
    83         except KeyError:
    84             cursor = self.source_cnxs[uri][1].cursor()
    84             cursor = self.source_cnxs[uri][1].cursor()
    85             if cursor is not None:
    85             if cursor is not None:
    86                 # None possible on sources without cursor support such as ldap
    86                 # None possible on sources without cursor support such as ldap
    87                 self._cursors[uri] = cursor
    87                 self._cursors[uri] = cursor
    88         return cursor
    88         return cursor
    89     
    89 
    90     def sources(self):
    90     def sources(self):
    91         """return the source objects handled by this pool"""
    91         """return the source objects handled by this pool"""
    92         # implementation details of flying insert requires the system source
    92         # implementation details of flying insert requires the system source
    93         # first
    93         # first
    94         yield self.source_cnxs['system']
    94         yield self.source_cnxs['system']
    95         for uri, (source, cursor) in self.source_cnxs.items():
    95         for uri, (source, cursor) in self.source_cnxs.items():
    96             if uri == 'system':
    96             if uri == 'system':
    97                 continue
    97                 continue
    98             yield source
    98             yield source
    99         #return [source_cnx[0] for source_cnx in self.source_cnxs.values()]
    99         #return [source_cnx[0] for source_cnx in self.source_cnxs.values()]
   100     
   100 
   101     def source(self, uid):
   101     def source(self, uid):
   102         """return the source object with the given uri"""
   102         """return the source object with the given uri"""
   103         return self.source_cnxs[uid][0]
   103         return self.source_cnxs[uid][0]
   104     
   104 
   105     def connection(self, uid):
   105     def connection(self, uid):
   106         """return the connection on the source object with the given uri"""
   106         """return the connection on the source object with the given uri"""
   107         return self.source_cnxs[uid][1]
   107         return self.source_cnxs[uid][1]
   108 
   108 
   109     def reconnect(self, source):
   109     def reconnect(self, source):
   110         """reopen a connection for this source"""
   110         """reopen a connection for this source"""
   111         source.info('trying to reconnect')
   111         source.info('trying to reconnect')
   112         self.source_cnxs[source.uri] = (source, source.get_connection())        
   112         self.source_cnxs[source.uri] = (source, source.get_connection())
   113         del self._cursors[source.uri]
   113         del self._cursors[source.uri]
   114 
   114 
   115     def check_connections(self):
   115     def check_connections(self):
   116         for source, cnx in self.source_cnxs.itervalues():
   116         for source, cnx in self.source_cnxs.itervalues():
   117             newcnx = source.check_connection(cnx)
   117             newcnx = source.check_connection(cnx)
   131       the pool is preparing to commit. You shouldn't do anything things which
   131       the pool is preparing to commit. You shouldn't do anything things which
   132       has to be reverted if the commit fail at this point, but you can freely
   132       has to be reverted if the commit fail at this point, but you can freely
   133       do any heavy computation or raise an exception if the commit can't go.
   133       do any heavy computation or raise an exception if the commit can't go.
   134       You can add some new operation during this phase but their precommit
   134       You can add some new operation during this phase but their precommit
   135       event won't be triggered
   135       event won't be triggered
   136       
   136 
   137     commit:
   137     commit:
   138       the pool is preparing to commit. You should avoid to do to expensive
   138       the pool is preparing to commit. You should avoid to do to expensive
   139       stuff or something that may cause an exception in this event
   139       stuff or something that may cause an exception in this event
   140       
   140 
   141     revertcommit:
   141     revertcommit:
   142       if an operation failed while commited, this event is triggered for
   142       if an operation failed while commited, this event is triggered for
   143       all operations which had their commit event already to let them
   143       all operations which had their commit event already to let them
   144       revert things (including the operation which made fail the commit)
   144       revert things (including the operation which made fail the commit)
   145 
   145 
   151         commit are rollbacked
   151         commit are rollbacked
   152 
   152 
   153     order of operations may be important, and is controlled according to:
   153     order of operations may be important, and is controlled according to:
   154     * operation's class
   154     * operation's class
   155     """
   155     """
   156     
   156 
   157     def __init__(self, session, **kwargs):
   157     def __init__(self, session, **kwargs):
   158         self.session = session
   158         self.session = session
   159         self.user = session.user
   159         self.user = session.user
   160         self.repo = session.repo
   160         self.repo = session.repo
   161         self.schema = session.repo.schema
   161         self.schema = session.repo.schema
   163         self.__dict__.update(kwargs)
   163         self.__dict__.update(kwargs)
   164         self.register(session)
   164         self.register(session)
   165         # execution information
   165         # execution information
   166         self.processed = None # 'precommit', 'commit'
   166         self.processed = None # 'precommit', 'commit'
   167         self.failed = False
   167         self.failed = False
   168         
   168 
   169     def register(self, session):
   169     def register(self, session):
   170         session.add_operation(self, self.insert_index())
   170         session.add_operation(self, self.insert_index())
   171         
   171 
   172     def insert_index(self):
   172     def insert_index(self):
   173         """return the index of  the lastest instance which is not a
   173         """return the index of  the lastest instance which is not a
   174         LateOperation instance
   174         LateOperation instance
   175         """
   175         """
   176         for i, op in enumerate(self.session.pending_operations):
   176         for i, op in enumerate(self.session.pending_operations):
   177             if isinstance(op, (LateOperation, SingleLastOperation)):
   177             if isinstance(op, (LateOperation, SingleLastOperation)):
   178                 return i
   178                 return i
   179         return None
   179         return None
   180     
   180 
   181     def handle_event(self, event):
   181     def handle_event(self, event):
   182         """delegate event handling to the opertaion"""
   182         """delegate event handling to the opertaion"""
   183         getattr(self, event)()
   183         getattr(self, event)()
   184 
   184 
   185     def precommit_event(self):
   185     def precommit_event(self):
   186         """the observed connections pool is preparing a commit"""
   186         """the observed connections pool is preparing a commit"""
   187     
   187 
   188     def revertprecommit_event(self):
   188     def revertprecommit_event(self):
   189         """an error went when pre-commiting this operation or a later one
   189         """an error went when pre-commiting this operation or a later one
   190         
   190 
   191         should revert pre-commit's changes but take care, they may have not
   191         should revert pre-commit's changes but take care, they may have not
   192         been all considered if it's this operation which failed
   192         been all considered if it's this operation which failed
   193         """
   193         """
   194 
   194 
   195     def commit_event(self):
   195     def commit_event(self):
   196         """the observed connections pool is commiting"""
   196         """the observed connections pool is commiting"""
   197         raise NotImplementedError()
   197         raise NotImplementedError()
   198     
   198 
   199     def revertcommit_event(self):
   199     def revertcommit_event(self):
   200         """an error went when commiting this operation or a later one
   200         """an error went when commiting this operation or a later one
   201         
   201 
   202         should revert commit's changes but take care, they may have not
   202         should revert commit's changes but take care, they may have not
   203         been all considered if it's this operation which failed
   203         been all considered if it's this operation which failed
   204         """
   204         """
   205     
   205 
   206     def rollback_event(self):
   206     def rollback_event(self):
   207         """the observed connections pool has been rollbacked
   207         """the observed connections pool has been rollbacked
   208         
   208 
   209         do nothing by default, the operation will just be removed from the pool
   209         do nothing by default, the operation will just be removed from the pool
   210         operation list
   210         operation list
   211         """
   211         """
   212 
   212 
   213 
   213 
   224 
   224 
   225 
   225 
   226 class LateOperation(Operation):
   226 class LateOperation(Operation):
   227     """special operation which should be called after all possible (ie non late)
   227     """special operation which should be called after all possible (ie non late)
   228     operations
   228     operations
   229     """    
   229     """
   230     def insert_index(self):
   230     def insert_index(self):
   231         """return the index of  the lastest instance which is not a
   231         """return the index of  the lastest instance which is not a
   232         SingleLastOperation instance
   232         SingleLastOperation instance
   233         """
   233         """
   234         for i, op in enumerate(self.session.pending_operations):
   234         for i, op in enumerate(self.session.pending_operations):
   236                 return i
   236                 return i
   237         return None
   237         return None
   238 
   238 
   239 
   239 
   240 class SingleOperation(Operation):
   240 class SingleOperation(Operation):
   241     """special operation which should be called once"""    
   241     """special operation which should be called once"""
   242     def register(self, session):
   242     def register(self, session):
   243         """override register to handle cases where this operation has already
   243         """override register to handle cases where this operation has already
   244         been added
   244         been added
   245         """
   245         """
   246         operations = session.pending_operations
   246         operations = session.pending_operations
   249             equivalent = operations.pop(index)
   249             equivalent = operations.pop(index)
   250         else:
   250         else:
   251             equivalent = None
   251             equivalent = None
   252         session.add_operation(self, self.insert_index())
   252         session.add_operation(self, self.insert_index())
   253         return equivalent
   253         return equivalent
   254     
   254 
   255     def equivalent_index(self, operations):
   255     def equivalent_index(self, operations):
   256         """return the index of the equivalent operation if any"""
   256         """return the index of the equivalent operation if any"""
   257         equivalents = [i for i, op in enumerate(operations)
   257         equivalents = [i for i, op in enumerate(operations)
   258                        if op.__class__ is self.__class__]
   258                        if op.__class__ is self.__class__]
   259         if equivalents:
   259         if equivalents:
   262 
   262 
   263 
   263 
   264 class SingleLastOperation(SingleOperation):
   264 class SingleLastOperation(SingleOperation):
   265     """special operation which should be called once and after all other
   265     """special operation which should be called once and after all other
   266     operations
   266     operations
   267     """    
   267     """
   268     def insert_index(self):
   268     def insert_index(self):
   269         return None
   269         return None
   270 
   270 
   271 from logging import getLogger
   271 from logging import getLogger
   272 from cubicweb import set_log_methods
   272 from cubicweb import set_log_methods