165 self.vreg = vreg |
165 self.vreg = vreg |
166 self._tasks_manager = tasks_manager |
166 self._tasks_manager = tasks_manager |
167 |
167 |
168 self.pyro_registered = False |
168 self.pyro_registered = False |
169 self.pyro_uri = None |
169 self.pyro_uri = None |
|
170 # every pyro client is handled in its own thread; map these threads to |
|
171 # the session we opened for them so we can clean up when they go away |
|
172 self._pyro_sessions = {} |
170 self.app_instances_bus = NullEventBus() |
173 self.app_instances_bus = NullEventBus() |
171 self.info('starting repository from %s', self.config.apphome) |
174 self.info('starting repository from %s', self.config.apphome) |
172 # dictionary of opened sessions |
175 # dictionary of opened sessions |
173 self._sessions = {} |
176 self._sessions = {} |
174 |
177 |
754 # use an internal connection |
757 # use an internal connection |
755 with self.internal_session() as session: |
758 with self.internal_session() as session: |
756 # try to get a user object |
759 # try to get a user object |
757 user = self.authenticate_user(session, login, **kwargs) |
760 user = self.authenticate_user(session, login, **kwargs) |
758 session = Session(user, self, cnxprops) |
761 session = Session(user, self, cnxprops) |
|
762 if threading.currentThread() in self._pyro_sessions: |
|
763 # assume no pyro client does one get_repository followed by |
|
764 # multiple repo.connect |
|
765 assert self._pyro_sessions[threading.currentThread()] == None |
|
766 self.debug('record session %s', session) |
|
767 self._pyro_sessions[threading.currentThread()] = session |
759 user._cw = user.cw_rset.req = session |
768 user._cw = user.cw_rset.req = session |
760 user.cw_clear_relation_cache() |
769 user.cw_clear_relation_cache() |
761 self._sessions[session.id] = session |
770 self._sessions[session.id] = session |
762 self.info('opened session %s for user %s', session.id, login) |
771 self.info('opened session %s for user %s', session.id, login) |
763 self.hm.call_hooks('session_open', session) |
772 self.hm.call_hooks('session_open', session) |
1635 self.pyro_registered = True |
1644 self.pyro_registered = True |
1636 # register a looping task to regularly ensure we're still registered |
1645 # register a looping task to regularly ensure we're still registered |
1637 # into the pyro name server |
1646 # into the pyro name server |
1638 if self._use_pyrons(): |
1647 if self._use_pyrons(): |
1639 self.looping_task(60*10, self._ensure_pyro_ns) |
1648 self.looping_task(60*10, self._ensure_pyro_ns) |
|
1649 pyro_sessions = self._pyro_sessions |
1640 # install hacky function to free cnxset |
1650 # install hacky function to free cnxset |
1641 self.looping_task(60, self._cleanup_pyro) |
def handleConnection(conn, tcpserver, sessions=pyro_sessions):
    """Register the current thread before handing the connection to Pyro.

    The thread is recorded in ``sessions`` with ``None`` as its value (no
    repository session opened yet); the actual work is then delegated to
    the ``handleConnection`` implementation of the adapter's class.
    """
    current = threading.currentThread()
    sessions[current] = None
    # look the implementation up on the class: the attribute on the adapter
    # instance is overridden by this very function
    adapter_cls = tcpserver.getAdapter().__class__
    return adapter_cls.handleConnection(tcpserver.getAdapter(), conn, tcpserver)
|
1654 daemon.getAdapter().handleConnection = handleConnection |
|
def removeConnection(conn, sessions=pyro_sessions):
    """Remove a Pyro connection and close the session tied to its thread.

    The daemon class's own ``removeConnection`` is called first; then the
    session recorded for the current thread (if any) is closed unless it
    is already closed.
    """
    daemon.__class__.removeConnection(daemon, conn)
    # pop instead of a plain lookup: once the connection is gone the thread
    # entry is useless, and keeping it would make the mapping grow forever
    # while holding references to finished threads and their sessions
    session = sessions.pop(threading.currentThread(), None)
    if session is None:
        # unknown thread, or client was not yet connected to the repo
        return
    if not session.closed:
        session.close()
|
1666 daemon.removeConnection = removeConnection |
1642 return daemon |
1667 return daemon |
1643 |
|
1644 def _cleanup_pyro(self): |
|
1645 """Very hacky function to cleanup session left by dead Pyro thread. |
|
1646 |
|
1647 There is no clean pyro callback to detect this. |
|
1648 """ |
|
1649 for session in self._sessions.values(): |
|
1650 for thread, cnxset in session._threads_in_transaction.copy(): |
|
1651 if not thread.isAlive(): |
|
1652 self.warning('Freeing cnxset used by dead pyro threads: %', |
|
1653 thread) |
|
1654 session._free_thread_cnxset(thread, cnxset) |
|
1655 |
1668 |
1656 def _ensure_pyro_ns(self): |
1669 def _ensure_pyro_ns(self): |
1657 if not self._use_pyrons(): |
1670 if not self._use_pyrons(): |
1658 return |
1671 return |
1659 from logilab.common import pyro_ext as pyro |
1672 from logilab.common import pyro_ext as pyro |