server/repository.py
changeset 9007 e27337dfec8c
parent 8954 9d30719142bf
child 9008 e0aa7cf8abf8
equal deleted inserted replaced
9006:e4ea8f9ffa11 9007:e27337dfec8c
   165         self.vreg = vreg
   165         self.vreg = vreg
   166         self._tasks_manager = tasks_manager
   166         self._tasks_manager = tasks_manager
   167 
   167 
   168         self.pyro_registered = False
   168         self.pyro_registered = False
   169         self.pyro_uri = None
   169         self.pyro_uri = None
       
   170         # every pyro client is handled in its own thread; map these threads to
       
   171         # the session we opened for them so we can clean up when they go away
       
   172         self._pyro_sessions = {}
   170         self.app_instances_bus = NullEventBus()
   173         self.app_instances_bus = NullEventBus()
   171         self.info('starting repository from %s', self.config.apphome)
   174         self.info('starting repository from %s', self.config.apphome)
   172         # dictionary of opened sessions
   175         # dictionary of opened sessions
   173         self._sessions = {}
   176         self._sessions = {}
   174 
   177 
   754         # use an internal connection
   757         # use an internal connection
   755         with self.internal_session() as session:
   758         with self.internal_session() as session:
   756             # try to get a user object
   759             # try to get a user object
   757             user = self.authenticate_user(session, login, **kwargs)
   760             user = self.authenticate_user(session, login, **kwargs)
   758         session = Session(user, self, cnxprops)
   761         session = Session(user, self, cnxprops)
       
   762         if threading.currentThread() in self._pyro_sessions:
       
   763             # assume a pyro client never calls get_repository once and then
       
   764             # repo.connect several times: one session per client thread
       
   765             assert self._pyro_sessions[threading.currentThread()] == None
       
   766             self.debug('record session %s', session)
       
   767             self._pyro_sessions[threading.currentThread()] = session
   759         user._cw = user.cw_rset.req = session
   768         user._cw = user.cw_rset.req = session
   760         user.cw_clear_relation_cache()
   769         user.cw_clear_relation_cache()
   761         self._sessions[session.id] = session
   770         self._sessions[session.id] = session
   762         self.info('opened session %s for user %s', session.id, login)
   771         self.info('opened session %s for user %s', session.id, login)
   763         self.hm.call_hooks('session_open', session)
   772         self.hm.call_hooks('session_open', session)
  1635         self.pyro_registered = True
  1644         self.pyro_registered = True
  1636         # register a looping task to regularly ensure we're still registered
  1645         # register a looping task to regularly ensure we're still registered
  1637         # into the pyro name server
  1646         # into the pyro name server
  1638         if self._use_pyrons():
  1647         if self._use_pyrons():
  1639             self.looping_task(60*10, self._ensure_pyro_ns)
  1648             self.looping_task(60*10, self._ensure_pyro_ns)
       
  1649         pyro_sessions = self._pyro_sessions
  1640         # install hacky function to free cnxset
  1650         # install hacky function to free cnxset
  1641         self.looping_task(60, self._cleanup_pyro)
  1651         def handleConnection(conn, tcpserver, sessions=pyro_sessions):
       
  1652             sessions[threading.currentThread()] = None
       
  1653             return tcpserver.getAdapter().__class__.handleConnection(tcpserver.getAdapter(), conn, tcpserver)
       
  1654         daemon.getAdapter().handleConnection = handleConnection
       
  1655         def removeConnection(conn, sessions=pyro_sessions):
       
  1656             daemon.__class__.removeConnection(daemon, conn)
       
  1657             try:
       
  1658                 session = sessions[threading.currentThread()]
       
  1659             except KeyError:
       
  1660                 return
       
  1661             if session is None:
       
  1662                 # client was not yet connected to the repo
       
  1663                 return
       
  1664             if not session.closed:
       
  1665                 session.close()
       
  1666         daemon.removeConnection = removeConnection
  1642         return daemon
  1667         return daemon
  1643 
       
  1644     def _cleanup_pyro(self):
       
  1645         """Very hacky function to clean up sessions left by dead Pyro threads.
       
  1646 
       
  1647         There is no clean pyro callback to detect this.
       
  1648         """
       
  1649         for session in self._sessions.values():
       
  1650             for thread, cnxset in session._threads_in_transaction.copy():
       
  1651                 if not thread.isAlive():
       
  1652                     self.warning('Freeing cnxset used by dead pyro threads: %',
       
  1653                                  thread)
       
  1654                     session._free_thread_cnxset(thread, cnxset)
       
  1655 
  1668 
  1656     def _ensure_pyro_ns(self):
  1669     def _ensure_pyro_ns(self):
  1657         if not self._use_pyrons():
  1670         if not self._use_pyrons():
  1658             return
  1671             return
  1659         from logilab.common import pyro_ext as pyro
  1672         from logilab.common import pyro_ext as pyro