server/pool.py
branch oldstable
changeset 4985 02b52bf9f5f8
parent 4721 8f63691ccb7f
child 5421 8167de96c523
equal deleted inserted replaced
4563:c25da7573ebd 4985:02b52bf9f5f8
     1 """CubicWeb server connections pool :
     1 """CubicWeb server connections pool : the repository has a limited number of
     2 
     2 connections pools, each of them dealing with a set of connections on each source
     3 * the rql repository has a limited number of connections pools, each of them
     3 used by the repository. A connections pool (`ConnectionsPool`) is an
     4   dealing with a set of connections on each source used by the repository
     4 abstraction for a group of connections to each source.
     5 
       
     6 * operation may be registered by hooks during a transaction, which will  be
       
     7   fired when the pool is committed or rolled back
       
     8 
       
     9 This module defines the `ConnectionsPool` class and a set of abstract classes
       
    10 for operation.
       
    11 
     5 
    12 
     6 
    13 :organization: Logilab
     7 :organization: Logilab
    14 :copyright: 2001-2010 LOGILAB S.A. (Paris, FRANCE), license is LGPL v2.
     8 :copyright: 2001-2010 LOGILAB S.A. (Paris, FRANCE), license is LGPL v2.
    15 :contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
     9 :contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
    91     def sources(self):
    85     def sources(self):
    92         """return the source objects handled by this pool"""
    86         """return the source objects handled by this pool"""
    93         # implementation details of flying insert requires the system source
    87         # implementation details of flying insert requires the system source
    94         # first
    88         # first
    95         yield self.source_cnxs['system'][0]
    89         yield self.source_cnxs['system'][0]
    96         for uri, (source, cursor) in self.source_cnxs.items():
    90         for uri, (source, cnx) in self.source_cnxs.items():
    97             if uri == 'system':
    91             if uri == 'system':
    98                 continue
    92                 continue
    99             yield source
    93             yield source
   100         #return [source_cnx[0] for source_cnx in self.source_cnxs.values()]
    94         #return [source_cnx[0] for source_cnx in self.source_cnxs.values()]
   101 
    95 
   113         if source is None:
   107         if source is None:
   114             sources = self.sources()
   108             sources = self.sources()
   115         else:
   109         else:
   116             sources = (source,)
   110             sources = (source,)
   117         for source in sources:
   111         for source in sources:
       
   112             try:
       
   113                 # properly close existing connection if any
       
   114                 self.source_cnxs[source.uri][1].close()
       
   115             except:
       
   116                 pass
   118             source.info('trying to reconnect')
   117             source.info('trying to reconnect')
   119             self.source_cnxs[source.uri] = (source, source.get_connection())
   118             self.source_cnxs[source.uri] = (source, source.get_connection())
   120             self._cursors.pop(source.uri, None)
   119             self._cursors.pop(source.uri, None)
   121 
   120 
   122     def check_connections(self):
   121     def check_connections(self):
   128     def reset_connection(self, source, cnx):
   127     def reset_connection(self, source, cnx):
   129         self.source_cnxs[source.uri] = (source, cnx)
   128         self.source_cnxs[source.uri] = (source, cnx)
   130         self._cursors.pop(source.uri, None)
   129         self._cursors.pop(source.uri, None)
   131 
   130 
   132 
   131 
   133 class Operation(object):
   132 from cubicweb.server.hook import (Operation, LateOperation, SingleOperation,
   134     """an operation is triggered on connections pool events related to
   133                                   SingleLastOperation)
   135     commit / rollback transactions. Possible events are:
   134 from logilab.common.deprecation import class_moved, class_renamed
   136 
   135 Operation = class_moved(Operation)
   137     precommit:
   136 PreCommitOperation = class_renamed('PreCommitOperation', Operation)
   138       the pool is preparing to commit. You shouldn't do anything which
   137 LateOperation = class_moved(LateOperation)
   139       has to be reverted if the commit fails at this point, but you can freely
   138 SingleOperation = class_moved(SingleOperation)
   140       do any heavy computation or raise an exception if the commit can't go.
   139 SingleLastOperation = class_moved(SingleLastOperation)
   141       You can add some new operation during this phase but their precommit
       
   142       event won't be triggered
       
   143 
       
   144     commit:
       
   145       the pool is preparing to commit. You should avoid doing expensive
       
   146       stuff or something that may cause an exception in this event
       
   147 
       
   148     revertcommit:
       
   149       if an operation failed while commited, this event is triggered for
       
   150       all operations which had their commit event already to let them
       
   151       revert things (including the operation which made fail the commit)
       
   152 
       
   153     rollback:
       
   154       the transaction has been rolled back, either
       
   155       * intentionally
       
   156       * a precommit event failed, all operations are rollbacked
       
   157       * a commit event failed, all operations which are not been triggered for
       
   158         commit are rollbacked
       
   159 
       
   160     order of operations may be important, and is controlled according to:
       
   161     * operation's class
       
   162     """
       
   163 
       
   164     def __init__(self, session, **kwargs):
       
   165         self.session = session
       
   166         self.user = session.user
       
   167         self.repo = session.repo
       
   168         self.schema = session.repo.schema
       
   169         self.config = session.repo.config
       
   170         self.__dict__.update(kwargs)
       
   171         self.register(session)
       
   172         # execution information
       
   173         self.processed = None # 'precommit', 'commit'
       
   174         self.failed = False
       
   175 
       
   176     def register(self, session):
       
   177         session.add_operation(self, self.insert_index())
       
   178 
       
   179     def insert_index(self):
       
   180         """return the index of  the lastest instance which is not a
       
   181         LateOperation instance
       
   182         """
       
   183         # faster by inspecting operation in reverse order for heavy transactions
       
   184         i = None
       
   185         for i, op in enumerate(reversed(self.session.pending_operations)):
       
   186             if isinstance(op, (LateOperation, SingleLastOperation)):
       
   187                 continue
       
   188             return -i or None
       
   189         if i is None:
       
   190             return None
       
   191         return -(i + 1)
       
   192 
       
   193     def handle_event(self, event):
       
   194         """delegate event handling to the operation"""
       
   195         getattr(self, event)()
       
   196 
       
   197     def precommit_event(self):
       
   198         """the observed connections pool is preparing a commit"""
       
   199 
       
   200     def revertprecommit_event(self):
       
   201         """an error went when pre-commiting this operation or a later one
       
   202 
       
   203         should revert pre-commit's changes but take care, they may have not
       
   204         been all considered if it's this operation which failed
       
   205         """
       
   206 
       
   207     def commit_event(self):
       
   208         """the observed connections pool is commiting"""
       
   209         raise NotImplementedError()
       
   210 
       
   211     def revertcommit_event(self):
       
   212         """an error went when commiting this operation or a later one
       
   213 
       
   214         should revert commit's changes but take care, they may have not
       
   215         been all considered if it's this operation which failed
       
   216         """
       
   217 
       
   218     def rollback_event(self):
       
   219         """the observed connections pool has been rollbacked
       
   220 
       
   221         do nothing by default, the operation will just be removed from the pool
       
   222         operation list
       
   223         """
       
   224 
       
   225     def postcommit_event(self):
       
   226         """the observed connections pool has committed"""
       
   227 
       
   228 
       
   229 class PreCommitOperation(Operation):
       
   230     """base class for operation only defining a precommit operation
       
   231     """
       
   232 
       
   233     def precommit_event(self):
       
   234         """the observed connections pool is preparing a commit"""
       
   235         raise NotImplementedError()
       
   236 
       
   237     def commit_event(self):
       
   238         """the observed connections pool is commiting"""
       
   239 
       
   240 
       
   241 class LateOperation(Operation):
       
   242     """special operation which should be called after all possible (ie non late)
       
   243     operations
       
   244     """
       
   245     def insert_index(self):
       
   246         """return the index of  the lastest instance which is not a
       
   247         SingleLastOperation instance
       
   248         """
       
   249         # faster by inspecting operation in reverse order for heavy transactions
       
   250         i = None
       
   251         for i, op in enumerate(reversed(self.session.pending_operations)):
       
   252             if isinstance(op, SingleLastOperation):
       
   253                 continue
       
   254             return -i or None
       
   255         if i is None:
       
   256             return None
       
   257         return -(i + 1)
       
   258 
       
   259 
       
   260 class SingleOperation(Operation):
       
   261     """special operation which should be called once"""
       
   262     def register(self, session):
       
   263         """override register to handle cases where this operation has already
       
   264         been added
       
   265         """
       
   266         operations = session.pending_operations
       
   267         index = self.equivalent_index(operations)
       
   268         if index is not None:
       
   269             equivalent = operations.pop(index)
       
   270         else:
       
   271             equivalent = None
       
   272         session.add_operation(self, self.insert_index())
       
   273         return equivalent
       
   274 
       
   275     def equivalent_index(self, operations):
       
   276         """return the index of the equivalent operation if any"""
       
   277         for i, op in enumerate(reversed(operations)):
       
   278             if op.__class__ is self.__class__:
       
   279                 return -(i+1)
       
   280         return None
       
   281 
       
   282 
       
   283 class SingleLastOperation(SingleOperation):
       
   284     """special operation which should be called once and after all other
       
   285     operations
       
   286     """
       
   287     def insert_index(self):
       
   288         return None
       
   289 
       
   290 from logging import getLogger
       
   291 from cubicweb import set_log_methods
       
   292 set_log_methods(Operation, getLogger('cubicweb.session'))