server/repository.py
changeset 7398 26695dd703d8
parent 7373 e5e6ef56cfb5
child 7399 972ed1843bd8
equal deleted inserted replaced
7397:6a9e66d788b3 7398:26695dd703d8
   162         self.querier = querier.QuerierHelper(self, self.schema)
   162         self.querier = querier.QuerierHelper(self, self.schema)
   163         # cache eid -> type / source
   163         # cache eid -> type / source
   164         self._type_source_cache = {}
   164         self._type_source_cache = {}
   165         # cache (extid, source uri) -> eid
   165         # cache (extid, source uri) -> eid
   166         self._extid_cache = {}
   166         self._extid_cache = {}
   167         # open some connections pools
   167         # open some connections set
   168         if config.open_connections_pools:
   168         if config.init_cnxset_pool:
   169             self.open_connections_pools()
   169             self.init_cnxset_pool()
   170         @onevent('after-registry-reload', self)
   170         @onevent('after-registry-reload', self)
   171         def fix_user_classes(self):
   171         def fix_user_classes(self):
   172             usercls = self.vreg['etypes'].etype_class('CWUser')
   172             usercls = self.vreg['etypes'].etype_class('CWUser')
   173             for session in self._sessions.values():
   173             for session in self._sessions.values():
   174                 if not isinstance(session.user, InternalManager):
   174                 if not isinstance(session.user, InternalManager):
   175                     session.user.__class__ = usercls
   175                     session.user.__class__ = usercls
   176 
   176 
   177     def open_connections_pools(self):
   177     def init_cnxset_pool(self):
   178         config = self.config
   178         config = self.config
   179         self._available_pools = Queue.Queue()
   179         self._cnxsets_pool = Queue.Queue()
   180         self._available_pools.put_nowait(pool.ConnectionsPool(self.sources))
   180         self._cnxsets_pool.put_nowait(pool.ConnectionsSet(self.sources))
   181         if config.quick_start:
   181         if config.quick_start:
   182             # quick start, usually only to get a minimal repository to get cubes
   182             # quick start, usually only to get a minimal repository to get cubes
   183             # information (eg dump/restore/...)
   183             # information (eg dump/restore/...)
   184             config._cubes = ()
   184             config._cubes = ()
   185             # only load hooks and entity classes in the registry
   185             # only load hooks and entity classes in the registry
   217         else:
   217         else:
   218             # call init_creating so that for instance native source can
   218             # call init_creating so that for instance native source can
   219             # configurate tsearch according to postgres version
   219             # configurate tsearch according to postgres version
   220             for source in self.sources:
   220             for source in self.sources:
   221                 source.init_creating()
   221                 source.init_creating()
   222         # close initialization pool and reopen fresh ones for proper
   222         # close initialization connetions set and reopen fresh ones for proper
   223         # initialization now that we know cubes
   223         # initialization now that we know cubes
   224         self._get_pool().close(True)
   224         self._get_cnxset().close(True)
   225         # list of available pools (we can't iterate on Queue instance)
   225         # list of available_cnxsets (we can't iterate on Queue instance)
   226         self.pools = []
   226         self.cnxsets = []
   227         for i in xrange(config['connections-pool-size']):
   227         for i in xrange(config['connections-pool-size']):
   228             self.pools.append(pool.ConnectionsPool(self.sources))
   228             self.cnxsets.append(pool.ConnectionsSet(self.sources))
   229             self._available_pools.put_nowait(self.pools[-1])
   229             self._cnxsets_pool.put_nowait(self.cnxsets[-1])
   230         if config.quick_start:
   230         if config.quick_start:
   231             config.init_cubes(self.get_cubes())
   231             config.init_cubes(self.get_cubes())
   232         self.hm = hook.HooksManager(self.vreg)
   232         self.hm = hook.HooksManager(self.vreg)
   233 
   233 
   234     # internals ###############################################################
   234     # internals ###############################################################
   247                 if sourceent.name == 'system':
   247                 if sourceent.name == 'system':
   248                     self.system_source.eid = sourceent.eid
   248                     self.system_source.eid = sourceent.eid
   249                     self.sources_by_eid[sourceent.eid] = self.system_source
   249                     self.sources_by_eid[sourceent.eid] = self.system_source
   250                     self.system_source.init(True, sourceent)
   250                     self.system_source.init(True, sourceent)
   251                     continue
   251                     continue
   252                 self.add_source(sourceent, add_to_pools=False)
   252                 self.add_source(sourceent, add_to_cnxsets=False)
   253         finally:
   253         finally:
   254             session.close()
   254             session.close()
   255 
   255 
   256     def _clear_planning_caches(self):
   256     def _clear_planning_caches(self):
   257         for cache in ('source_defs', 'is_multi_sources_relation',
   257         for cache in ('source_defs', 'is_multi_sources_relation',
   258                       'can_cross_relation', 'rel_type_sources'):
   258                       'can_cross_relation', 'rel_type_sources'):
   259             clear_cache(self, cache)
   259             clear_cache(self, cache)
   260 
   260 
   261     def add_source(self, sourceent, add_to_pools=True):
   261     def add_source(self, sourceent, add_to_cnxsets=True):
   262         source = self.get_source(sourceent.type, sourceent.name,
   262         source = self.get_source(sourceent.type, sourceent.name,
   263                                  sourceent.host_config, sourceent.eid)
   263                                  sourceent.host_config, sourceent.eid)
   264         self.sources_by_eid[sourceent.eid] = source
   264         self.sources_by_eid[sourceent.eid] = source
   265         self.sources_by_uri[sourceent.name] = source
   265         self.sources_by_uri[sourceent.name] = source
   266         if self.config.source_enabled(source):
   266         if self.config.source_enabled(source):
   267             # call source's init method to complete their initialisation if
   267             # call source's init method to complete their initialisation if
   268             # needed (for instance looking for persistent configuration using an
   268             # needed (for instance looking for persistent configuration using an
   269             # internal session, which is not possible until pools have been
   269             # internal session, which is not possible until connections sets have been
   270             # initialized)
   270             # initialized)
   271             source.init(True, sourceent)
   271             source.init(True, sourceent)
   272             if not source.copy_based_source:
   272             if not source.copy_based_source:
   273                 self.sources.append(source)
   273                 self.sources.append(source)
   274                 self.querier.set_planner()
   274                 self.querier.set_planner()
   275                 if add_to_pools:
   275                 if add_to_cnxsets:
   276                     for pool in self.pools:
   276                     for cnxset in self.cnxsets:
   277                         pool.add_source(source)
   277                        cnxset.add_source(source)
   278         else:
   278         else:
   279             source.init(False, sourceent)
   279             source.init(False, sourceent)
   280         self._clear_planning_caches()
   280         self._clear_planning_caches()
   281 
   281 
   282     def remove_source(self, uri):
   282     def remove_source(self, uri):
   283         source = self.sources_by_uri.pop(uri)
   283         source = self.sources_by_uri.pop(uri)
   284         del self.sources_by_eid[source.eid]
   284         del self.sources_by_eid[source.eid]
   285         if self.config.source_enabled(source) and not source.copy_based_source:
   285         if self.config.source_enabled(source) and not source.copy_based_source:
   286             self.sources.remove(source)
   286             self.sources.remove(source)
   287             self.querier.set_planner()
   287             self.querier.set_planner()
   288             for pool in self.pools:
   288             for cnxset in self.cnxsets:
   289                 pool.remove_source(source)
   289                 cnxset.remove_source(source)
   290         self._clear_planning_caches()
   290         self._clear_planning_caches()
   291 
   291 
   292     def get_source(self, type, uri, source_config, eid=None):
   292     def get_source(self, type, uri, source_config, eid=None):
   293         # set uri and type in source config so it's available through
   293         # set uri and type in source config so it's available through
   294         # source_defs()
   294         # source_defs()
   371         """start function in a separated thread"""
   371         """start function in a separated thread"""
   372         t = utils.RepoThread(func, self._running_threads)
   372         t = utils.RepoThread(func, self._running_threads)
   373         t.start()
   373         t.start()
   374 
   374 
   375     #@locked
   375     #@locked
   376     def _get_pool(self):
   376     def _get_cnxset(self):
   377         try:
   377         try:
   378             return self._available_pools.get(True, timeout=5)
   378             return self._cnxsets_pool.get(True, timeout=5)
   379         except Queue.Empty:
   379         except Queue.Empty:
   380             raise Exception('no pool available after 5 secs, probably either a '
   380             raise Exception('no connections set available after 5 secs, probably either a '
   381                             'bug in code (too many uncommited/rollbacked '
   381                             'bug in code (too many uncommited/rollbacked '
   382                             'connections) or too much load on the server (in '
   382                             'connections) or too much load on the server (in '
   383                             'which case you can try to set a bigger '
   383                             'which case you can try to set a bigger '
   384                             'connections pools size)')
   384                             'connections pool size)')
   385 
   385 
   386     def _free_pool(self, pool):
   386     def _free_cnxset(self, cnxset):
   387         self._available_pools.put_nowait(pool)
   387         self._cnxsets_pool.put_nowait(cnxset)
   388 
   388 
   389     def pinfo(self):
   389     def pinfo(self):
   390         # XXX: session.pool is accessed from a local storage, would be interesting
   390         # XXX: session.cnxset is accessed from a local storage, would be interesting
   391         #      to see if there is a pool set in any thread specific data)
   391         #      to see if there is a cnxset set in any thread specific data)
   392         return '%s: %s (%s)' % (self._available_pools.qsize(),
   392         return '%s: %s (%s)' % (self._cnxsets_pool.qsize(),
   393                                 ','.join(session.user.login for session in self._sessions.values()
   393                                 ','.join(session.user.login for session in self._sessions.values()
   394                                          if session.pool),
   394                                          if session.cnxset),
   395                                 threading.currentThread())
   395                                 threading.currentThread())
   396     def shutdown(self):
   396     def shutdown(self):
   397         """called on server stop event to properly close opened sessions and
   397         """called on server stop event to properly close opened sessions and
   398         connections
   398         connections
   399         """
   399         """
   412             self.info('thread %s finished', thread.getName())
   412             self.info('thread %s finished', thread.getName())
   413         if not (self.config.creating or self.config.repairing
   413         if not (self.config.creating or self.config.repairing
   414                 or self.config.quick_start):
   414                 or self.config.quick_start):
   415             self.hm.call_hooks('server_shutdown', repo=self)
   415             self.hm.call_hooks('server_shutdown', repo=self)
   416         self.close_sessions()
   416         self.close_sessions()
   417         while not self._available_pools.empty():
   417         while not self._cnxsets_pool.empty():
   418             pool = self._available_pools.get_nowait()
   418             cnxset = self._cnxsets_pool.get_nowait()
   419             try:
   419             try:
   420                 pool.close(True)
   420                 cnxset.close(True)
   421             except:
   421             except:
   422                 self.exception('error while closing %s' % pool)
   422                 self.exception('error while closing %s' % cnxset)
   423                 continue
   423                 continue
   424         if self.pyro_registered:
   424         if self.pyro_registered:
   425             if self._use_pyrons():
   425             if self._use_pyrons():
   426                 pyro_unregister(self.config)
   426                 pyro_unregister(self.config)
   427             self.pyro_uri = None
   427             self.pyro_uri = None
   499             results['%s_cache_hit_percent' % title] = (hits * 100) / (hits + misses)
   499             results['%s_cache_hit_percent' % title] = (hits * 100) / (hits + misses)
   500         results['sql_no_cache'] = self.system_source.no_cache
   500         results['sql_no_cache'] = self.system_source.no_cache
   501         results['nb_open_sessions'] = len(self._sessions)
   501         results['nb_open_sessions'] = len(self._sessions)
   502         results['nb_active_threads'] = threading.activeCount()
   502         results['nb_active_threads'] = threading.activeCount()
   503         results['looping_tasks'] = ', '.join(str(t) for t in self._looping_tasks)
   503         results['looping_tasks'] = ', '.join(str(t) for t in self._looping_tasks)
   504         results['available_pools'] = self._available_pools.qsize()
   504         results['available_cnxsets'] = self._cnxsets_pool.qsize()
   505         results['threads'] = ', '.join(sorted(str(t) for t in threading.enumerate()))
   505         results['threads'] = ', '.join(sorted(str(t) for t in threading.enumerate()))
   506         return results
   506         return results
   507 
   507 
   508     def get_schema(self):
   508     def get_schema(self):
   509         """Return the instance schema.
   509         """Return the instance schema.
   541         if foreid is None:
   541         if foreid is None:
   542             return self.config[option]
   542             return self.config[option]
   543         _, sourceuri, extid = self.type_and_source_from_eid(foreid)
   543         _, sourceuri, extid = self.type_and_source_from_eid(foreid)
   544         if sourceuri == 'system':
   544         if sourceuri == 'system':
   545             return self.config[option]
   545             return self.config[option]
   546         pool = self._get_pool()
   546         cnxset = self._get_cnxset()
   547         try:
   547         try:
   548             cnx = pool.connection(sourceuri)
   548             cnx = cnxset.connection(sourceuri)
   549             # needed to check connection is valid and usable by the current
   549             # needed to check connection is valid and usable by the current
   550             # thread
   550             # thread
   551             newcnx = self.sources_by_uri[sourceuri].check_connection(cnx)
   551             newcnx = self.sources_by_uri[sourceuri].check_connection(cnx)
   552             if newcnx is not None:
   552             if newcnx is not None:
   553                 cnx = newcnx
   553                 cnx = newcnx
   554             return cnx.get_option_value(option, extid)
   554             return cnx.get_option_value(option, extid)
   555         finally:
   555         finally:
   556             self._free_pool(pool)
   556             self._free_cnxset(cnxset)
   557 
   557 
   558     @cached
   558     @cached
   559     def get_versions(self, checkversions=False):
   559     def get_versions(self, checkversions=False):
   560         """Return the a dictionary containing cubes used by this instance
   560         """Return the a dictionary containing cubes used by this instance
   561         as key with their version as value, including cubicweb version.
   561         as key with their version as value, including cubicweb version.
   724         * rqlstring should be an unicode string or a plain ascii string
   724         * rqlstring should be an unicode string or a plain ascii string
   725         * args the optional parameters used in the query
   725         * args the optional parameters used in the query
   726         * build_descr is a flag indicating if the description should be
   726         * build_descr is a flag indicating if the description should be
   727           built on select queries
   727           built on select queries
   728         """
   728         """
   729         session = self._get_session(sessionid, setpool=True, txid=txid)
   729         session = self._get_session(sessionid, setcnxset=True, txid=txid)
   730         try:
   730         try:
   731             try:
   731             try:
   732                 rset = self.querier.execute(session, rqlstring, args,
   732                 rset = self.querier.execute(session, rqlstring, args,
   733                                             build_descr)
   733                                             build_descr)
   734                 # NOTE: the web front will (re)build it when needed
   734                 # NOTE: the web front will (re)build it when needed
   750             except:
   750             except:
   751                 # FIXME: check error to catch internal errors
   751                 # FIXME: check error to catch internal errors
   752                 self.exception('unexpected error while executing %s with %s', rqlstring, args)
   752                 self.exception('unexpected error while executing %s with %s', rqlstring, args)
   753                 raise
   753                 raise
   754         finally:
   754         finally:
   755             session.reset_pool()
   755             session.free_cnxset()
   756 
   756 
   757     def describe(self, sessionid, eid, txid=None):
   757     def describe(self, sessionid, eid, txid=None):
   758         """return a tuple (type, source, extid) for the entity with id <eid>"""
   758         """return a tuple (type, source, extid) for the entity with id <eid>"""
   759         session = self._get_session(sessionid, setpool=True, txid=txid)
   759         session = self._get_session(sessionid, setcnxset=True, txid=txid)
   760         try:
   760         try:
   761             return self.type_and_source_from_eid(eid, session)
   761             return self.type_and_source_from_eid(eid, session)
   762         finally:
   762         finally:
   763             session.reset_pool()
   763             session.free_cnxset()
   764 
   764 
   765     def check_session(self, sessionid):
   765     def check_session(self, sessionid):
   766         """raise `BadConnectionId` if the connection is no more valid, else
   766         """raise `BadConnectionId` if the connection is no more valid, else
   767         return its latest activity timestamp.
   767         return its latest activity timestamp.
   768         """
   768         """
   769         return self._get_session(sessionid, setpool=False).timestamp
   769         return self._get_session(sessionid, setcnxset=False).timestamp
   770 
   770 
   771     def get_shared_data(self, sessionid, key, default=None, pop=False, txdata=False):
   771     def get_shared_data(self, sessionid, key, default=None, pop=False, txdata=False):
   772         """return value associated to key in the session's data dictionary or
   772         """return value associated to key in the session's data dictionary or
   773         session's transaction's data if `txdata` is true.
   773         session's transaction's data if `txdata` is true.
   774 
   774 
   775         If pop is True, value will be removed from the dictionnary.
   775         If pop is True, value will be removed from the dictionnary.
   776 
   776 
   777         If key isn't defined in the dictionnary, value specified by the
   777         If key isn't defined in the dictionnary, value specified by the
   778         `default` argument will be returned.
   778         `default` argument will be returned.
   779         """
   779         """
   780         session = self._get_session(sessionid, setpool=False)
   780         session = self._get_session(sessionid, setcnxset=False)
   781         return session.get_shared_data(key, default, pop, txdata)
   781         return session.get_shared_data(key, default, pop, txdata)
   782 
   782 
   783     def set_shared_data(self, sessionid, key, value, txdata=False):
   783     def set_shared_data(self, sessionid, key, value, txdata=False):
   784         """set value associated to `key` in shared data
   784         """set value associated to `key` in shared data
   785 
   785 
   786         if `txdata` is true, the value will be added to the repository session's
   786         if `txdata` is true, the value will be added to the repository session's
   787         transaction's data which are cleared on commit/rollback of the current
   787         transaction's data which are cleared on commit/rollback of the current
   788         transaction.
   788         transaction.
   789         """
   789         """
   790         session = self._get_session(sessionid, setpool=False)
   790         session = self._get_session(sessionid, setcnxset=False)
   791         session.set_shared_data(key, value, txdata)
   791         session.set_shared_data(key, value, txdata)
   792 
   792 
   793     def commit(self, sessionid, txid=None):
   793     def commit(self, sessionid, txid=None):
   794         """commit transaction for the session with the given id"""
   794         """commit transaction for the session with the given id"""
   795         self.debug('begin commit for session %s', sessionid)
   795         self.debug('begin commit for session %s', sessionid)
   814             self.exception('unexpected error')
   814             self.exception('unexpected error')
   815             raise
   815             raise
   816 
   816 
   817     def close(self, sessionid, txid=None, checkshuttingdown=True):
   817     def close(self, sessionid, txid=None, checkshuttingdown=True):
   818         """close the session with the given id"""
   818         """close the session with the given id"""
   819         session = self._get_session(sessionid, setpool=True, txid=txid,
   819         session = self._get_session(sessionid, setcnxset=True, txid=txid,
   820                                     checkshuttingdown=checkshuttingdown)
   820                                     checkshuttingdown=checkshuttingdown)
   821         # operation uncommited before close are rollbacked before hook is called
   821         # operation uncommited before close are rollbacked before hook is called
   822         session.rollback(reset_pool=False)
   822         session.rollback(free_cnxset=False)
   823         self.hm.call_hooks('session_close', session)
   823         self.hm.call_hooks('session_close', session)
   824         # commit session at this point in case write operation has been done
   824         # commit session at this point in case write operation has been done
   825         # during `session_close` hooks
   825         # during `session_close` hooks
   826         session.commit()
   826         session.commit()
   827         session.close()
   827         session.close()
   832         """this method should be used by client to:
   832         """this method should be used by client to:
   833         * check session id validity
   833         * check session id validity
   834         * update user information on each user's request (i.e. groups and
   834         * update user information on each user's request (i.e. groups and
   835           custom properties)
   835           custom properties)
   836         """
   836         """
   837         session = self._get_session(sessionid, setpool=False)
   837         session = self._get_session(sessionid, setcnxset=False)
   838         if props is not None:
   838         if props is not None:
   839             self.set_session_props(sessionid, props)
   839             self.set_session_props(sessionid, props)
   840         user = session.user
   840         user = session.user
   841         return user.eid, user.login, user.groups, user.properties
   841         return user.eid, user.login, user.groups, user.properties
   842 
   842 
   844         """this method should be used by client to:
   844         """this method should be used by client to:
   845         * check session id validity
   845         * check session id validity
   846         * update user information on each user's request (i.e. groups and
   846         * update user information on each user's request (i.e. groups and
   847           custom properties)
   847           custom properties)
   848         """
   848         """
   849         session = self._get_session(sessionid, setpool=False)
   849         session = self._get_session(sessionid, setcnxset=False)
   850         for prop, value in props.items():
   850         for prop, value in props.items():
   851             session.change_property(prop, value)
   851             session.change_property(prop, value)
   852 
   852 
   853     def undoable_transactions(self, sessionid, ueid=None, txid=None,
   853     def undoable_transactions(self, sessionid, ueid=None, txid=None,
   854                               **actionfilters):
   854                               **actionfilters):
   855         """See :class:`cubicweb.dbapi.Connection.undoable_transactions`"""
   855         """See :class:`cubicweb.dbapi.Connection.undoable_transactions`"""
   856         session = self._get_session(sessionid, setpool=True, txid=txid)
   856         session = self._get_session(sessionid, setcnxset=True, txid=txid)
   857         try:
   857         try:
   858             return self.system_source.undoable_transactions(session, ueid,
   858             return self.system_source.undoable_transactions(session, ueid,
   859                                                             **actionfilters)
   859                                                             **actionfilters)
   860         finally:
   860         finally:
   861             session.reset_pool()
   861             session.free_cnxset()
   862 
   862 
   863     def transaction_info(self, sessionid, txuuid, txid=None):
   863     def transaction_info(self, sessionid, txuuid, txid=None):
   864         """See :class:`cubicweb.dbapi.Connection.transaction_info`"""
   864         """See :class:`cubicweb.dbapi.Connection.transaction_info`"""
   865         session = self._get_session(sessionid, setpool=True, txid=txid)
   865         session = self._get_session(sessionid, setcnxset=True, txid=txid)
   866         try:
   866         try:
   867             return self.system_source.tx_info(session, txuuid)
   867             return self.system_source.tx_info(session, txuuid)
   868         finally:
   868         finally:
   869             session.reset_pool()
   869             session.free_cnxset()
   870 
   870 
   871     def transaction_actions(self, sessionid, txuuid, public=True, txid=None):
   871     def transaction_actions(self, sessionid, txuuid, public=True, txid=None):
   872         """See :class:`cubicweb.dbapi.Connection.transaction_actions`"""
   872         """See :class:`cubicweb.dbapi.Connection.transaction_actions`"""
   873         session = self._get_session(sessionid, setpool=True, txid=txid)
   873         session = self._get_session(sessionid, setcnxset=True, txid=txid)
   874         try:
   874         try:
   875             return self.system_source.tx_actions(session, txuuid, public)
   875             return self.system_source.tx_actions(session, txuuid, public)
   876         finally:
   876         finally:
   877             session.reset_pool()
   877             session.free_cnxset()
   878 
   878 
   879     def undo_transaction(self, sessionid, txuuid, txid=None):
   879     def undo_transaction(self, sessionid, txuuid, txid=None):
   880         """See :class:`cubicweb.dbapi.Connection.undo_transaction`"""
   880         """See :class:`cubicweb.dbapi.Connection.undo_transaction`"""
   881         session = self._get_session(sessionid, setpool=True, txid=txid)
   881         session = self._get_session(sessionid, setcnxset=True, txid=txid)
   882         try:
   882         try:
   883             return self.system_source.undo_transaction(session, txuuid)
   883             return self.system_source.undo_transaction(session, txuuid)
   884         finally:
   884         finally:
   885             session.reset_pool()
   885             session.free_cnxset()
   886 
   886 
   887     # public (inter-repository) interface #####################################
   887     # public (inter-repository) interface #####################################
   888 
   888 
   889     def entities_modified_since(self, etypes, mtime):
   889     def entities_modified_since(self, etypes, mtime):
   890         """function designed to be called from an external repository which
   890         """function designed to be called from an external repository which
   932 
   932 
   933     def internal_session(self, cnxprops=None):
   933     def internal_session(self, cnxprops=None):
   934         """return a dbapi like connection/cursor using internal user which
   934         """return a dbapi like connection/cursor using internal user which
   935         have every rights on the repository. You'll *have to* commit/rollback
   935         have every rights on the repository. You'll *have to* commit/rollback
   936         or close (rollback implicitly) the session once the job's done, else
   936         or close (rollback implicitly) the session once the job's done, else
   937         you'll leak connections pool up to the time where no more pool is
   937         you'll leak connections set up to the time where no one is
   938         available, causing irremediable freeze...
   938         available, causing irremediable freeze...
   939         """
   939         """
   940         session = InternalSession(self, cnxprops)
   940         session = InternalSession(self, cnxprops)
   941         session.set_pool()
   941         session.set_cnxset()
   942         return session
   942         return session
   943 
   943 
   944     def _get_session(self, sessionid, setpool=False, txid=None,
   944     def _get_session(self, sessionid, setcnxset=False, txid=None,
   945                      checkshuttingdown=True):
   945                      checkshuttingdown=True):
   946         """return the user associated to the given session identifier"""
   946         """return the user associated to the given session identifier"""
   947         if checkshuttingdown and self.shutting_down:
   947         if checkshuttingdown and self.shutting_down:
   948             raise Exception('Repository is shutting down')
   948             raise Exception('Repository is shutting down')
   949         try:
   949         try:
   950             session = self._sessions[sessionid]
   950             session = self._sessions[sessionid]
   951         except KeyError:
   951         except KeyError:
   952             raise BadConnectionId('No such session %s' % sessionid)
   952             raise BadConnectionId('No such session %s' % sessionid)
   953         if setpool:
   953         if setcnxset:
   954             session.set_tx_data(txid) # must be done before set_pool
   954             session.set_tx_data(txid) # must be done before set_cnxset
   955             session.set_pool()
   955             session.set_cnxset()
   956         return session
   956         return session
   957 
   957 
   958     # data sources handling ###################################################
   958     # data sources handling ###################################################
   959     # * correspondance between eid and (type, source)
   959     # * correspondance between eid and (type, source)
   960     # * correspondance between eid and local id (i.e. specific to a given source)
   960     # * correspondance between eid and local id (i.e. specific to a given source)
   968         try:
   968         try:
   969             return self._type_source_cache[eid]
   969             return self._type_source_cache[eid]
   970         except KeyError:
   970         except KeyError:
   971             if session is None:
   971             if session is None:
   972                 session = self.internal_session()
   972                 session = self.internal_session()
   973                 reset_pool = True
   973                 free_cnxset = True
   974             else:
   974             else:
   975                 reset_pool = False
   975                 free_cnxset = False
   976             try:
   976             try:
   977                 etype, uri, extid = self.system_source.eid_type_source(session,
   977                 etype, uri, extid = self.system_source.eid_type_source(session,
   978                                                                        eid)
   978                                                                        eid)
   979             finally:
   979             finally:
   980                 if reset_pool:
   980                 if free_cnxset:
   981                     session.reset_pool()
   981                     session.free_cnxset()
   982         self._type_source_cache[eid] = (etype, uri, extid)
   982         self._type_source_cache[eid] = (etype, uri, extid)
   983         if uri != 'system':
   983         if uri != 'system':
   984             self._extid_cache[(extid, uri)] = eid
   984             self._extid_cache[(extid, uri)] = eid
   985         return etype, uri, extid
   985         return etype, uri, extid
   986 
   986 
  1037         cachekey = (extid, uri)
  1037         cachekey = (extid, uri)
  1038         try:
  1038         try:
  1039             return self._extid_cache[cachekey]
  1039             return self._extid_cache[cachekey]
  1040         except KeyError:
  1040         except KeyError:
  1041             pass
  1041             pass
  1042         reset_pool = False
  1042         free_cnxset = False
  1043         if session is None:
  1043         if session is None:
  1044             session = self.internal_session()
  1044             session = self.internal_session()
  1045             reset_pool = True
  1045             free_cnxset = True
  1046         eid = self.system_source.extid2eid(session, uri, extid)
  1046         eid = self.system_source.extid2eid(session, uri, extid)
  1047         if eid is not None:
  1047         if eid is not None:
  1048             self._extid_cache[cachekey] = eid
  1048             self._extid_cache[cachekey] = eid
  1049             self._type_source_cache[eid] = (etype, uri, extid)
  1049             self._type_source_cache[eid] = (etype, uri, extid)
  1050             if reset_pool:
  1050             if free_cnxset:
  1051                 session.reset_pool()
  1051                 session.free_cnxset()
  1052             return eid
  1052             return eid
  1053         if not insert:
  1053         if not insert:
  1054             return
  1054             return
  1055         # no link between extid and eid, create one using an internal session
  1055         # no link between extid and eid, create one using an internal session
  1056         # since the current session user may not have required permissions to
  1056         # since the current session user may not have required permissions to
  1058         #
  1058         #
  1059         # Moreover, even if session is already an internal session but is
  1059         # Moreover, even if session is already an internal session but is
  1060         # processing a commit, we have to use another one
  1060         # processing a commit, we have to use another one
  1061         if not session.is_internal_session:
  1061         if not session.is_internal_session:
  1062             session = self.internal_session()
  1062             session = self.internal_session()
  1063             reset_pool = True
  1063             free_cnxset = True
  1064         try:
  1064         try:
  1065             eid = self.system_source.create_eid(session)
  1065             eid = self.system_source.create_eid(session)
  1066             self._extid_cache[cachekey] = eid
  1066             self._extid_cache[cachekey] = eid
  1067             self._type_source_cache[eid] = (etype, uri, extid)
  1067             self._type_source_cache[eid] = (etype, uri, extid)
  1068             entity = source.before_entity_insertion(
  1068             entity = source.before_entity_insertion(
  1072             # XXX call add_info with complete=False ?
  1072             # XXX call add_info with complete=False ?
  1073             self.add_info(session, entity, source, extid)
  1073             self.add_info(session, entity, source, extid)
  1074             source.after_entity_insertion(session, extid, entity, sourceparams)
  1074             source.after_entity_insertion(session, extid, entity, sourceparams)
  1075             if source.should_call_hooks:
  1075             if source.should_call_hooks:
  1076                 self.hm.call_hooks('after_add_entity', session, entity=entity)
  1076                 self.hm.call_hooks('after_add_entity', session, entity=entity)
  1077             session.commit(reset_pool)
  1077             session.commit(free_cnxset)
  1078             return eid
  1078             return eid
  1079         except:
  1079         except:
  1080             session.rollback(reset_pool)
  1080             session.rollback(free_cnxset)
  1081             raise
  1081             raise
  1082 
  1082 
  1083     def add_info(self, session, entity, source, extid=None, complete=True):
  1083     def add_info(self, session, entity, source, extid=None, complete=True):
  1084         """add type and source info for an eid into the system table,
  1084         """add type and source info for an eid into the system table,
  1085         and index the entity with the full text index
  1085         and index the entity with the full text index