server/repository.py
branchstable
changeset 8463 a964c40adbe3
parent 8456 c912d82f2166
child 8537 e30d0a7f0087
child 8547 f23ac525ddd1
equal deleted inserted replaced
8461:8af7c6d86efb 8463:a964c40adbe3
     1 # copyright 2003-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
     1 # copyright 2003-2012 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
     2 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
     2 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
     3 #
     3 #
     4 # This file is part of CubicWeb.
     4 # This file is part of CubicWeb.
     5 #
     5 #
     6 # CubicWeb is free software: you can redistribute it and/or modify it under the
     6 # CubicWeb is free software: you can redistribute it and/or modify it under the
    63 NO_CACHE_RELATIONS = set( [('owned_by', 'object'),
    63 NO_CACHE_RELATIONS = set( [('owned_by', 'object'),
    64                            ('created_by', 'object'),
    64                            ('created_by', 'object'),
    65                            ('cw_source', 'object'),
    65                            ('cw_source', 'object'),
    66                            ])
    66                            ])
    67 
    67 
    68 def prefill_entity_caches(entity, relations):
    68 def prefill_entity_caches(entity):
    69     session = entity._cw
    69     session = entity._cw
    70     # prefill entity relation caches
    70     # prefill entity relation caches
    71     for rschema in entity.e_schema.subject_relations():
    71     for rschema in entity.e_schema.subject_relations():
    72         rtype = str(rschema)
    72         rtype = str(rschema)
    73         if rtype in schema.VIRTUAL_RTYPES or (rtype, 'subject') in NO_CACHE_RELATIONS:
    73         if rtype in schema.VIRTUAL_RTYPES or (rtype, 'subject') in NO_CACHE_RELATIONS:
   118             session.execute('DELETE X %s Y WHERE Y eid %%(y)s, '
   118             session.execute('DELETE X %s Y WHERE Y eid %%(y)s, '
   119                             'NOT X eid %%(x)s' % rtype,
   119                             'NOT X eid %%(x)s' % rtype,
   120                             {'x': eidfrom, 'y': eidto})
   120                             {'x': eidfrom, 'y': eidto})
   121 
   121 
   122 
   122 
       
   123 
       
   124 class NullEventBus(object):
       
   125     def publish(self, msg):
       
   126         pass
       
   127 
       
   128     def add_subscription(self, topic, callback):
       
   129         pass
       
   130 
       
   131     def start(self):
       
   132         pass
       
   133 
       
   134     def stop(self):
       
   135         pass
       
   136 
       
   137 
   123 class Repository(object):
   138 class Repository(object):
   124     """a repository provides access to a set of persistent storages for
   139     """a repository provides access to a set of persistent storages for
   125     entities and relations
   140     entities and relations
   126 
   141 
   127     XXX protect pyro access
   142     XXX protect pyro access
   128     """
   143     """
   129 
   144 
   130     def __init__(self, config, vreg=None):
   145     def __init__(self, config, tasks_manager=None, vreg=None):
   131         self.config = config
   146         self.config = config
   132         if vreg is None:
   147         if vreg is None:
   133             vreg = cwvreg.CubicWebVRegistry(config)
   148             vreg = cwvreg.CWRegistryStore(config)
   134         self.vreg = vreg
   149         self.vreg = vreg
       
   150         self._tasks_manager = tasks_manager
       
   151 
   135         self.pyro_registered = False
   152         self.pyro_registered = False
   136         self.pyro_uri = None
   153         self.pyro_uri = None
       
   154         self.app_instances_bus = NullEventBus()
   137         self.info('starting repository from %s', self.config.apphome)
   155         self.info('starting repository from %s', self.config.apphome)
   138         # dictionary of opened sessions
   156         # dictionary of opened sessions
   139         self._sessions = {}
   157         self._sessions = {}
       
   158 
       
   159 
   140         # list of functions to be called at regular interval
   160         # list of functions to be called at regular interval
   141         self._looping_tasks = []
       
   142         # list of running threads
   161         # list of running threads
   143         self._running_threads = []
   162         self._running_threads = []
   144         # initial schema, should be build or replaced latter
   163         # initial schema, should be build or replaced latter
   145         self.schema = schema.CubicWebSchema(config.appid)
   164         self.schema = schema.CubicWebSchema(config.appid)
   146         self.vreg.schema = self.schema # until actual schema is loaded...
   165         self.vreg.schema = self.schema # until actual schema is loaded...
   160         # open some connections set
   179         # open some connections set
   161         if config.init_cnxset_pool:
   180         if config.init_cnxset_pool:
   162             self.init_cnxset_pool()
   181             self.init_cnxset_pool()
   163         @onevent('after-registry-reload', self)
   182         @onevent('after-registry-reload', self)
   164         def fix_user_classes(self):
   183         def fix_user_classes(self):
       
   184             # After registery reload the 'CWUser' class used for CWEtype
       
   185             # changed.  To any existing user object have a different class than
       
   186             # the new loaded one. We are hot fixing this.
   165             usercls = self.vreg['etypes'].etype_class('CWUser')
   187             usercls = self.vreg['etypes'].etype_class('CWUser')
   166             for session in self._sessions.values():
   188             for session in self._sessions.values():
   167                 if not isinstance(session.user, InternalManager):
   189                 if not isinstance(session.user, InternalManager):
   168                     session.user.__class__ = usercls
   190                     session.user.__class__ = usercls
   169 
   191 
   230         self.sources_by_eid = {}
   252         self.sources_by_eid = {}
   231         if self.config.quick_start \
   253         if self.config.quick_start \
   232                or not 'CWSource' in self.schema: # # 3.10 migration
   254                or not 'CWSource' in self.schema: # # 3.10 migration
   233             self.system_source.init_creating()
   255             self.system_source.init_creating()
   234             return
   256             return
   235         session = self.internal_session()
   257         with self.internal_session() as session:
   236         try:
       
   237             # FIXME: sources should be ordered (add_entity priority)
   258             # FIXME: sources should be ordered (add_entity priority)
   238             for sourceent in session.execute(
   259             for sourceent in session.execute(
   239                 'Any S, SN, SA, SC WHERE S is_instance_of CWSource, '
   260                 'Any S, SN, SA, SC WHERE S is_instance_of CWSource, '
   240                 'S name SN, S type SA, S config SC').entities():
   261                 'S name SN, S type SA, S config SC').entities():
   241                 if sourceent.name == 'system':
   262                 if sourceent.name == 'system':
   242                     self.system_source.eid = sourceent.eid
   263                     self.system_source.eid = sourceent.eid
   243                     self.sources_by_eid[sourceent.eid] = self.system_source
   264                     self.sources_by_eid[sourceent.eid] = self.system_source
   244                     self.system_source.init(True, sourceent)
   265                     self.system_source.init(True, sourceent)
   245                     continue
   266                     continue
   246                 self.add_source(sourceent, add_to_cnxsets=False)
   267                 self.add_source(sourceent, add_to_cnxsets=False)
   247         finally:
       
   248             session.close()
       
   249 
   268 
   250     def _clear_planning_caches(self):
   269     def _clear_planning_caches(self):
   251         for cache in ('source_defs', 'is_multi_sources_relation',
   270         for cache in ('source_defs', 'is_multi_sources_relation',
   252                       'can_cross_relation', 'rel_type_sources'):
   271                       'can_cross_relation', 'rel_type_sources'):
   253             clear_cache(self, cache)
   272             clear_cache(self, cache)
   307         for source in self.sources_by_uri.values():
   326         for source in self.sources_by_uri.values():
   308             source.set_schema(schema)
   327             source.set_schema(schema)
   309         self.schema = schema
   328         self.schema = schema
   310 
   329 
   311     def fill_schema(self):
   330     def fill_schema(self):
   312         """lod schema from the repository"""
   331         """load schema from the repository"""
   313         from cubicweb.server.schemaserial import deserialize_schema
   332         from cubicweb.server.schemaserial import deserialize_schema
   314         self.info('loading schema from the repository')
   333         self.info('loading schema from the repository')
   315         appschema = schema.CubicWebSchema(self.config.appid)
   334         appschema = schema.CubicWebSchema(self.config.appid)
   316         self.set_schema(self.config.load_bootstrap_schema(), resetvreg=False)
   335         self.set_schema(self.config.load_bootstrap_schema(), resetvreg=False)
   317         self.debug('deserializing db schema into %s %#x', appschema.name, id(appschema))
   336         self.debug('deserializing db schema into %s %#x', appschema.name, id(appschema))
   318         session = self.internal_session()
   337         with self.internal_session() as session:
   319         try:
       
   320             try:
   338             try:
   321                 deserialize_schema(appschema, session)
   339                 deserialize_schema(appschema, session)
   322             except BadSchemaDefinition:
   340             except BadSchemaDefinition:
   323                 raise
   341                 raise
   324             except Exception, ex:
   342             except Exception, ex:
   325                 import traceback
   343                 import traceback
   326                 traceback.print_exc()
   344                 traceback.print_exc()
   327                 raise Exception('Is the database initialised ? (cause: %s)' %
   345                 raise Exception('Is the database initialised ? (cause: %s)' %
   328                                 (ex.args and ex.args[0].strip() or 'unknown')), \
   346                                 (ex.args and ex.args[0].strip() or 'unknown')), \
   329                                 None, sys.exc_info()[-1]
   347                                 None, sys.exc_info()[-1]
   330         finally:
       
   331             session.close()
       
   332         self.set_schema(appschema)
   348         self.set_schema(appschema)
   333 
   349 
   334     def start_looping_tasks(self):
   350 
       
   351     def _prepare_startup(self):
       
   352         """Prepare "Repository as a server" for startup.
       
   353 
       
   354         * trigger server startup hook,
       
   355         * register session clean up task.
       
   356         """
   335         if not (self.config.creating or self.config.repairing
   357         if not (self.config.creating or self.config.repairing
   336                 or self.config.quick_start):
   358                 or self.config.quick_start):
   337             # call instance level initialisation hooks
   359             # call instance level initialisation hooks
   338             self.hm.call_hooks('server_startup', repo=self)
   360             self.hm.call_hooks('server_startup', repo=self)
   339             # register a task to cleanup expired session
   361             # register a task to cleanup expired session
   340             self.cleanup_session_time = self.config['cleanup-session-time'] or 60 * 60 * 24
   362             self.cleanup_session_time = self.config['cleanup-session-time'] or 60 * 60 * 24
   341             assert self.cleanup_session_time > 0
   363             assert self.cleanup_session_time > 0
   342             cleanup_session_interval = min(60*60, self.cleanup_session_time / 3)
   364             cleanup_session_interval = min(60*60, self.cleanup_session_time / 3)
   343             self.looping_task(cleanup_session_interval, self.clean_sessions)
   365             assert self._tasks_manager is not None, "This Repository is not intended to be used as a server"
   344         assert isinstance(self._looping_tasks, list), 'already started'
   366             self._tasks_manager.add_looping_task(cleanup_session_interval,
   345         for i, (interval, func, args) in enumerate(self._looping_tasks):
   367                                                  self.clean_sessions)
   346             self._looping_tasks[i] = task = utils.LoopTask(self, interval, func, args)
   368 
   347             self.info('starting task %s with interval %.2fs', task.name,
   369     def start_looping_tasks(self):
   348                       interval)
   370         """Actual "Repository as a server" startup.
   349             task.start()
   371 
   350         # ensure no tasks will be further added
   372         * trigger server startup hook,
   351         self._looping_tasks = tuple(self._looping_tasks)
   373         * register session clean up task,
       
   374         * start all tasks.
       
   375 
       
   376         XXX Other startup related stuffs are done elsewhere. In Repository
       
   377         XXX __init__ or in external codes (various server managers).
       
   378         """
       
   379         self._prepare_startup()
       
   380         assert self._tasks_manager is not None, "This Repository is not intended to be used as a server"
       
   381         self._tasks_manager.start()
   352 
   382 
   353     def looping_task(self, interval, func, *args):
   383     def looping_task(self, interval, func, *args):
   354         """register a function to be called every `interval` seconds.
   384         """register a function to be called every `interval` seconds.
   355 
   385 
   356         looping tasks can only be registered during repository initialization,
   386         looping tasks can only be registered during repository initialization,
   357         once done this method will fail.
   387         once done this method will fail.
   358         """
   388         """
   359         try:
   389         assert self._tasks_manager is not None, "This Repository is not intended to be used as a server"
   360             self._looping_tasks.append( (interval, func, args) )
   390         self._tasks_manager.add_looping_task(interval, func, *args)
   361         except AttributeError:
       
   362             raise RuntimeError("can't add looping task once the repository is started")
       
   363 
   391 
   364     def threaded_task(self, func):
   392     def threaded_task(self, func):
   365         """start function in a separated thread"""
   393         """start function in a separated thread"""
   366         t = utils.RepoThread(func, self._running_threads)
   394         utils.RepoThread(func, self._running_threads).start()
   367         t.start()
       
   368 
   395 
   369     #@locked
   396     #@locked
   370     def _get_cnxset(self):
   397     def _get_cnxset(self):
   371         try:
   398         try:
   372             return self._cnxsets_pool.get(True, timeout=5)
   399             return self._cnxsets_pool.get(True, timeout=5)
   390     def shutdown(self):
   417     def shutdown(self):
   391         """called on server stop event to properly close opened sessions and
   418         """called on server stop event to properly close opened sessions and
   392         connections
   419         connections
   393         """
   420         """
   394         assert not self.shutting_down, 'already shutting down'
   421         assert not self.shutting_down, 'already shutting down'
       
   422         if not (self.config.creating or self.config.repairing
       
   423                 or self.config.quick_start):
       
   424             # then, the system source is still available
       
   425             self.hm.call_hooks('before_server_shutdown', repo=self)
   395         self.shutting_down = True
   426         self.shutting_down = True
   396         self.system_source.shutdown()
   427         self.system_source.shutdown()
   397         if isinstance(self._looping_tasks, tuple): # if tasks have been started
   428         if self._tasks_manager is not None:
   398             for looptask in self._looping_tasks:
   429             self._tasks_manager.stop()
   399                 self.info('canceling task %s...', looptask.name)
   430         if not (self.config.creating or self.config.repairing
   400                 looptask.cancel()
   431                 or self.config.quick_start):
   401                 looptask.join()
   432             self.hm.call_hooks('server_shutdown', repo=self)
   402                 self.info('task %s finished', looptask.name)
       
   403         for thread in self._running_threads:
   433         for thread in self._running_threads:
   404             self.info('waiting thread %s...', thread.getName())
   434             self.info('waiting thread %s...', thread.getName())
   405             thread.join()
   435             thread.join()
   406             self.info('thread %s finished', thread.getName())
   436             self.info('thread %s finished', thread.getName())
   407         if not (self.config.creating or self.config.repairing
       
   408                 or self.config.quick_start):
       
   409             self.hm.call_hooks('server_shutdown', repo=self)
       
   410         self.close_sessions()
   437         self.close_sessions()
   411         while not self._cnxsets_pool.empty():
   438         while not self._cnxsets_pool.empty():
   412             cnxset = self._cnxsets_pool.get_nowait()
   439             cnxset = self._cnxsets_pool.get_nowait()
   413             try:
   440             try:
   414                 cnxset.close(True)
   441                 cnxset.close(True)
   434 
   461 
   435     def check_auth_info(self, session, login, authinfo):
   462     def check_auth_info(self, session, login, authinfo):
   436         """validate authentication, raise AuthenticationError on failure, return
   463         """validate authentication, raise AuthenticationError on failure, return
   437         associated CWUser's eid on success.
   464         associated CWUser's eid on success.
   438         """
   465         """
   439         for source in self.sources:
   466         # iter on sources_by_uri then check enabled source since sources doesn't
   440             if source.support_entity('CWUser'):
   467         # contain copy based sources
       
   468         for source in self.sources_by_uri.itervalues():
       
   469             if self.config.source_enabled(source) and source.support_entity('CWUser'):
   441                 try:
   470                 try:
   442                     return source.authenticate(session, login, **authinfo)
   471                     return source.authenticate(session, login, **authinfo)
   443                 except AuthenticationError:
   472                 except AuthenticationError:
   444                     continue
   473                     continue
   445         else:
   474         else:
   495         results['type_source_cache_size'] = len(self._type_source_cache)
   524         results['type_source_cache_size'] = len(self._type_source_cache)
   496         results['extid_cache_size'] = len(self._extid_cache)
   525         results['extid_cache_size'] = len(self._extid_cache)
   497         results['sql_no_cache'] = self.system_source.no_cache
   526         results['sql_no_cache'] = self.system_source.no_cache
   498         results['nb_open_sessions'] = len(self._sessions)
   527         results['nb_open_sessions'] = len(self._sessions)
   499         results['nb_active_threads'] = threading.activeCount()
   528         results['nb_active_threads'] = threading.activeCount()
   500         results['looping_tasks'] = ', '.join(str(t) for t in self._looping_tasks)
   529         looping_tasks = self._tasks_manager._looping_tasks
       
   530         results['looping_tasks'] = ', '.join(str(t) for t in looping_tasks)
   501         results['available_cnxsets'] = self._cnxsets_pool.qsize()
   531         results['available_cnxsets'] = self._cnxsets_pool.qsize()
   502         results['threads'] = ', '.join(sorted(str(t) for t in threading.enumerate()))
   532         results['threads'] = ', '.join(sorted(str(t) for t in threading.enumerate()))
   503         return results
   533         return results
   504 
   534 
   505     def gc_stats(self, nmax=20):
   535     def gc_stats(self, nmax=20):
   593 
   623 
   594         This is a public method, not requiring a session id.
   624         This is a public method, not requiring a session id.
   595         """
   625         """
   596         from logilab.common.changelog import Version
   626         from logilab.common.changelog import Version
   597         vcconf = {}
   627         vcconf = {}
   598         session = self.internal_session()
   628         with self.internal_session() as session:
   599         try:
       
   600             for pk, version in session.execute(
   629             for pk, version in session.execute(
   601                 'Any K,V WHERE P is CWProperty, P value V, P pkey K, '
   630                 'Any K,V WHERE P is CWProperty, P value V, P pkey K, '
   602                 'P pkey ~="system.version.%"', build_descr=False):
   631                 'P pkey ~="system.version.%"', build_descr=False):
   603                 cube = pk.split('.')[-1]
   632                 cube = pk.split('.')[-1]
   604                 # XXX cubicweb migration
   633                 # XXX cubicweb migration
   613                         fsversion = self.config.cubicweb_version()
   642                         fsversion = self.config.cubicweb_version()
   614                     if version < fsversion:
   643                     if version < fsversion:
   615                         msg = ('instance has %s version %s but %s '
   644                         msg = ('instance has %s version %s but %s '
   616                                'is installed. Run "cubicweb-ctl upgrade".')
   645                                'is installed. Run "cubicweb-ctl upgrade".')
   617                         raise ExecutionError(msg % (cube, version, fsversion))
   646                         raise ExecutionError(msg % (cube, version, fsversion))
   618         finally:
       
   619             session.close()
       
   620         return vcconf
   647         return vcconf
   621 
   648 
   622     @cached
   649     @cached
   623     def source_defs(self):
   650     def source_defs(self):
   624         """Return the a dictionary containing source uris as value and a
   651         """Return the a dictionary containing source uris as value and a
   635     def properties(self):
   662     def properties(self):
   636         """Return a result set containing system wide properties.
   663         """Return a result set containing system wide properties.
   637 
   664 
   638         This is a public method, not requiring a session id.
   665         This is a public method, not requiring a session id.
   639         """
   666         """
   640         session = self.internal_session()
   667         with self.internal_session() as session:
   641         try:
       
   642             # don't use session.execute, we don't want rset.req set
   668             # don't use session.execute, we don't want rset.req set
   643             return self.querier.execute(session, 'Any K,V WHERE P is CWProperty,'
   669             return self.querier.execute(session, 'Any K,V WHERE P is CWProperty,'
   644                                         'P pkey K, P value V, NOT P for_user U',
   670                                         'P pkey K, P value V, NOT P for_user U',
   645                                         build_descr=False)
   671                                         build_descr=False)
   646         finally:
       
   647             session.close()
       
   648 
   672 
   649     # XXX protect this method: anonymous should be allowed and registration
   673     # XXX protect this method: anonymous should be allowed and registration
   650     # plugged
   674     # plugged
   651     def register_user(self, login, password, email=None, **kwargs):
   675     def register_user(self, login, password, email=None, **kwargs):
   652         """check a user with the given login exists, if not create it with the
   676         """check a user with the given login exists, if not create it with the
   653         given password. This method is designed to be used for anonymous
   677         given password. This method is designed to be used for anonymous
   654         registration on public web site.
   678         registration on public web site.
   655         """
   679         """
   656         session = self.internal_session()
   680         with self.internal_session() as session:
   657         # for consistency, keep same error as unique check hook (although not required)
   681             # for consistency, keep same error as unique check hook (although not required)
   658         errmsg = session._('the value "%s" is already used, use another one')
   682             errmsg = session._('the value "%s" is already used, use another one')
   659         try:
       
   660             if (session.execute('CWUser X WHERE X login %(login)s', {'login': login},
   683             if (session.execute('CWUser X WHERE X login %(login)s', {'login': login},
   661                                 build_descr=False)
   684                                 build_descr=False)
   662                 or session.execute('CWUser X WHERE X use_email C, C address %(login)s',
   685                 or session.execute('CWUser X WHERE X use_email C, C address %(login)s',
   663                                    {'login': login}, build_descr=False)):
   686                                    {'login': login}, build_descr=False)):
   664                 qname = role_name('login', 'subject')
   687                 qname = role_name('login', 'subject')
   681                     raise ValidationError(None, {qname: errmsg % d['email']})
   704                     raise ValidationError(None, {qname: errmsg % d['email']})
   682                 session.execute('INSERT EmailAddress X: X address %(email)s, '
   705                 session.execute('INSERT EmailAddress X: X address %(email)s, '
   683                                 'U primary_email X, U use_email X '
   706                                 'U primary_email X, U use_email X '
   684                                 'WHERE U login %(login)s', d, build_descr=False)
   707                                 'WHERE U login %(login)s', d, build_descr=False)
   685             session.commit()
   708             session.commit()
   686         finally:
       
   687             session.close()
       
   688         return True
   709         return True
   689 
   710 
   690     def find_users(self, fetch_attrs, **query_attrs):
   711     def find_users(self, fetch_attrs, **query_attrs):
   691         """yield user attributes for cwusers matching the given query_attrs
   712         """yield user attributes for cwusers matching the given query_attrs
   692         (the result set cannot survive this method call)
   713         (the result set cannot survive this method call)
   705                                      if not rschema.meta)
   726                                      if not rschema.meta)
   706         cwuserattrs = self._cwuser_attrs
   727         cwuserattrs = self._cwuser_attrs
   707         for k in chain(fetch_attrs, query_attrs.iterkeys()):
   728         for k in chain(fetch_attrs, query_attrs.iterkeys()):
   708             if k not in cwuserattrs:
   729             if k not in cwuserattrs:
   709                 raise Exception('bad input for find_user')
   730                 raise Exception('bad input for find_user')
   710         session = self.internal_session()
   731         with self.internal_session() as session:
   711         try:
       
   712             varmaker = rqlvar_maker()
   732             varmaker = rqlvar_maker()
   713             vars = [(attr, varmaker.next()) for attr in fetch_attrs]
   733             vars = [(attr, varmaker.next()) for attr in fetch_attrs]
   714             rql = 'Any %s WHERE X is CWUser, ' % ','.join(var[1] for var in vars)
   734             rql = 'Any %s WHERE X is CWUser, ' % ','.join(var[1] for var in vars)
   715             rql += ','.join('X %s %s' % (var[0], var[1]) for var in vars) + ','
   735             rql += ','.join('X %s %s' % (var[0], var[1]) for var in vars) + ','
   716             rset = session.execute(rql + ','.join('X %s %%(%s)s' % (attr, attr)
   736             rset = session.execute(rql + ','.join('X %s %%(%s)s' % (attr, attr)
   717                                                   for attr in query_attrs.iterkeys()),
   737                                                   for attr in query_attrs.iterkeys()),
   718                                    query_attrs)
   738                                    query_attrs)
   719             return rset.rows
   739             return rset.rows
   720         finally:
       
   721             session.close()
       
   722 
   740 
   723     def connect(self, login, **kwargs):
   741     def connect(self, login, **kwargs):
   724         """open a connection for a given user
   742         """open a connection for a given user
   725 
   743 
   726         base_url may be needed to send mails
   744         base_url may be needed to send mails
   728 
   746 
   729         raise `AuthenticationError` if the authentication failed
   747         raise `AuthenticationError` if the authentication failed
   730         raise `ConnectionError` if we can't open a connection
   748         raise `ConnectionError` if we can't open a connection
   731         """
   749         """
   732         # use an internal connection
   750         # use an internal connection
   733         session = self.internal_session()
   751         with self.internal_session() as session:
   734         # try to get a user object
   752             # try to get a user object
   735         cnxprops = kwargs.pop('cnxprops', None)
   753             cnxprops = kwargs.pop('cnxprops', None)
   736         try:
       
   737             user = self.authenticate_user(session, login, **kwargs)
   754             user = self.authenticate_user(session, login, **kwargs)
   738         finally:
       
   739             session.close()
       
   740         session = Session(user, self, cnxprops)
   755         session = Session(user, self, cnxprops)
   741         user._cw = user.cw_rset.req = session
   756         user._cw = user.cw_rset.req = session
   742         user.cw_clear_relation_cache()
   757         user.cw_clear_relation_cache()
   743         self._sessions[session.id] = session
   758         self._sessions[session.id] = session
   744         self.info('opened session %s for user %s', session.id, login)
   759         self.info('opened session %s for user %s', session.id, login)
   859         session.commit()
   874         session.commit()
   860         session.close()
   875         session.close()
   861         del self._sessions[sessionid]
   876         del self._sessions[sessionid]
   862         self.info('closed session %s for user %s', sessionid, session.user.login)
   877         self.info('closed session %s for user %s', sessionid, session.user.login)
   863 
   878 
       
   879     def call_service(self, sessionid, regid, async, **kwargs):
       
   880         """
       
   881         See :class:`cubicweb.dbapi.Connection.call_service`
       
   882         and :class:`cubicweb.server.Service`
       
   883         """
       
   884         def task():
       
   885             session = self._get_session(sessionid, setcnxset=True)
       
   886             service = session.vreg['services'].select(regid, session, **kwargs)
       
   887             try:
       
   888                 return service.call(**kwargs)
       
   889             finally:
       
   890                 session.rollback() # free cnxset
       
   891         if async:
       
   892             self.info('calling service %s asynchronously', regid)
       
   893             self.threaded_task(task)
       
   894         else:
       
   895             self.info('calling service %s synchronously', regid)
       
   896             return task()
       
   897 
   864     def user_info(self, sessionid, props=None):
   898     def user_info(self, sessionid, props=None):
   865         """this method should be used by client to:
   899         """this method should be used by client to:
   866         * check session id validity
   900         * check session id validity
   867         * update user information on each user's request (i.e. groups and
   901         * update user information on each user's request (i.e. groups and
   868           custom properties)
   902           custom properties)
   928           modified since the given timestamp (actually entities whose full text
   962           modified since the given timestamp (actually entities whose full text
   929           index content has changed)
   963           index content has changed)
   930         * list of (etype, eid) of entities of the given types which have been
   964         * list of (etype, eid) of entities of the given types which have been
   931           deleted since the given timestamp
   965           deleted since the given timestamp
   932         """
   966         """
   933         session = self.internal_session()
   967         with self.internal_session() as session:
   934         updatetime = datetime.utcnow()
   968             updatetime = datetime.utcnow()
   935         try:
       
   936             modentities, delentities = self.system_source.modified_entities(
   969             modentities, delentities = self.system_source.modified_entities(
   937                 session, etypes, mtime)
   970                 session, etypes, mtime)
   938             return updatetime, modentities, delentities
   971             return updatetime, modentities, delentities
   939         finally:
       
   940             session.close()
       
   941 
   972 
   942     # session handling ########################################################
   973     # session handling ########################################################
   943 
   974 
   944     def close_sessions(self):
   975     def close_sessions(self):
   945         """close every opened sessions"""
   976         """close every opened sessions"""
  1328         entity.eid = self.system_source.create_eid(session)
  1359         entity.eid = self.system_source.create_eid(session)
  1329         # set caches asap
  1360         # set caches asap
  1330         extid = self.init_entity_caches(session, entity, source)
  1361         extid = self.init_entity_caches(session, entity, source)
  1331         if server.DEBUG & server.DBG_REPO:
  1362         if server.DEBUG & server.DBG_REPO:
  1332             print 'ADD entity', self, entity.__regid__, entity.eid, edited
  1363             print 'ADD entity', self, entity.__regid__, entity.eid, edited
  1333         relations = []
  1364         prefill_entity_caches(entity)
  1334         prefill_entity_caches(entity, relations)
       
  1335         if source.should_call_hooks:
  1365         if source.should_call_hooks:
  1336             self.hm.call_hooks('before_add_entity', session, entity=entity)
  1366             self.hm.call_hooks('before_add_entity', session, entity=entity)
  1337         activintegrity = session.is_hook_category_activated('activeintegrity')
  1367         relations = []
       
  1368         activeintegrity = session.is_hook_category_activated('activeintegrity')
  1338         for attr in edited.iterkeys():
  1369         for attr in edited.iterkeys():
  1339             rschema = eschema.subjrels[attr]
  1370             rschema = eschema.subjrels[attr]
  1340             if not rschema.final: # inlined relation
  1371             if not rschema.final: # inlined relation
  1341                 value = edited[attr]
  1372                 value = edited[attr]
  1342                 relations.append((attr, value))
  1373                 relations.append((attr, value))
  1343                 session.update_rel_cache_add(entity.eid, attr, value)
  1374                 session.update_rel_cache_add(entity.eid, attr, value)
  1344                 rdef = session.rtype_eids_rdef(attr, entity.eid, value)
  1375                 rdef = session.rtype_eids_rdef(attr, entity.eid, value)
  1345                 if rdef.cardinality[1] in '1?' and activintegrity:
  1376                 if rdef.cardinality[1] in '1?' and activeintegrity:
  1346                     with security_enabled(session, read=False):
  1377                     with security_enabled(session, read=False):
  1347                         session.execute('DELETE X %s Y WHERE Y eid %%(y)s' % attr,
  1378                         session.execute('DELETE X %s Y WHERE Y eid %%(y)s' % attr,
  1348                                         {'x': entity.eid, 'y': value})
  1379                                         {'x': entity.eid, 'y': value})
  1349         edited.set_defaults()
  1380         edited.set_defaults()
  1350         if session.is_hook_category_activated('integrity'):
  1381         if session.is_hook_category_activated('integrity'):
  1652 
  1683 
  1653     # these are overridden by set_log_methods below
  1684     # these are overridden by set_log_methods below
  1654     # only defining here to prevent pylint from complaining
  1685     # only defining here to prevent pylint from complaining
  1655     info = warning = error = critical = exception = debug = lambda msg,*a,**kw: None
  1686     info = warning = error = critical = exception = debug = lambda msg,*a,**kw: None
  1656 
  1687 
       
  1688 
  1657 def pyro_unregister(config):
  1689 def pyro_unregister(config):
  1658     """unregister the repository from the pyro name server"""
  1690     """unregister the repository from the pyro name server"""
  1659     from logilab.common.pyro_ext import ns_unregister
  1691     from logilab.common.pyro_ext import ns_unregister
  1660     appid = config['pyro-instance-id'] or config.appid
  1692     appid = config['pyro-instance-id'] or config.appid
  1661     ns_unregister(appid, config['pyro-ns-group'], config['pyro-ns-host'])
  1693     ns_unregister(appid, config['pyro-ns-group'], config['pyro-ns-host'])