diff -r 8af7c6d86efb -r a964c40adbe3 server/repository.py --- a/server/repository.py Tue Jul 10 10:33:19 2012 +0200 +++ b/server/repository.py Tue Jul 10 15:07:52 2012 +0200 @@ -1,4 +1,4 @@ -# copyright 2003-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved. +# copyright 2003-2012 LOGILAB S.A. (Paris, FRANCE), all rights reserved. # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr # # This file is part of CubicWeb. @@ -65,7 +65,7 @@ ('cw_source', 'object'), ]) -def prefill_entity_caches(entity, relations): +def prefill_entity_caches(entity): session = entity._cw # prefill entity relation caches for rschema in entity.e_schema.subject_relations(): @@ -120,6 +120,21 @@ {'x': eidfrom, 'y': eidto}) + +class NullEventBus(object): + def publish(self, msg): + pass + + def add_subscription(self, topic, callback): + pass + + def start(self): + pass + + def stop(self): + pass + + class Repository(object): """a repository provides access to a set of persistent storages for entities and relations @@ -127,18 +142,22 @@ XXX protect pyro access """ - def __init__(self, config, vreg=None): + def __init__(self, config, tasks_manager=None, vreg=None): self.config = config if vreg is None: - vreg = cwvreg.CubicWebVRegistry(config) + vreg = cwvreg.CWRegistryStore(config) self.vreg = vreg + self._tasks_manager = tasks_manager + self.pyro_registered = False self.pyro_uri = None + self.app_instances_bus = NullEventBus() self.info('starting repository from %s', self.config.apphome) # dictionary of opened sessions self._sessions = {} + + # list of functions to be called at regular interval - self._looping_tasks = [] # list of running threads self._running_threads = [] # initial schema, should be build or replaced latter @@ -162,6 +181,9 @@ self.init_cnxset_pool() @onevent('after-registry-reload', self) def fix_user_classes(self): + # After registery reload the 'CWUser' class used for CWEtype + # changed. To any existing user object have a different class than + # the new loaded one. We are hot fixing this. usercls = self.vreg['etypes'].etype_class('CWUser') for session in self._sessions.values(): if not isinstance(session.user, InternalManager): @@ -232,8 +254,7 @@ or not 'CWSource' in self.schema: # # 3.10 migration self.system_source.init_creating() return - session = self.internal_session() - try: + with self.internal_session() as session: # FIXME: sources should be ordered (add_entity priority) for sourceent in session.execute( 'Any S, SN, SA, SC WHERE S is_instance_of CWSource, ' @@ -244,8 +265,6 @@ self.system_source.init(True, sourceent) continue self.add_source(sourceent, add_to_cnxsets=False) - finally: - session.close() def _clear_planning_caches(self): for cache in ('source_defs', 'is_multi_sources_relation', @@ -309,14 +328,13 @@ self.schema = schema def fill_schema(self): - """lod schema from the repository""" + """load schema from the repository""" from cubicweb.server.schemaserial import deserialize_schema self.info('loading schema from the repository') appschema = schema.CubicWebSchema(self.config.appid) self.set_schema(self.config.load_bootstrap_schema(), resetvreg=False) self.debug('deserializing db schema into %s %#x', appschema.name, id(appschema)) - session = self.internal_session() - try: + with self.internal_session() as session: try: deserialize_schema(appschema, session) except BadSchemaDefinition: @@ -327,11 +345,15 @@ raise Exception('Is the database initialised ? (cause: %s)' % (ex.args and ex.args[0].strip() or 'unknown')), \ None, sys.exc_info()[-1] - finally: - session.close() self.set_schema(appschema) - def start_looping_tasks(self): + + def _prepare_startup(self): + """Prepare "Repository as a server" for startup. + + * trigger server startup hook, + * register session clean up task. + """ if not (self.config.creating or self.config.repairing or self.config.quick_start): # call instance level initialisation hooks @@ -340,15 +362,23 @@ self.cleanup_session_time = self.config['cleanup-session-time'] or 60 * 60 * 24 assert self.cleanup_session_time > 0 cleanup_session_interval = min(60*60, self.cleanup_session_time / 3) - self.looping_task(cleanup_session_interval, self.clean_sessions) - assert isinstance(self._looping_tasks, list), 'already started' - for i, (interval, func, args) in enumerate(self._looping_tasks): - self._looping_tasks[i] = task = utils.LoopTask(self, interval, func, args) - self.info('starting task %s with interval %.2fs', task.name, - interval) - task.start() - # ensure no tasks will be further added - self._looping_tasks = tuple(self._looping_tasks) + assert self._tasks_manager is not None, "This Repository is not intended to be used as a server" + self._tasks_manager.add_looping_task(cleanup_session_interval, + self.clean_sessions) + + def start_looping_tasks(self): + """Actual "Repository as a server" startup. + + * trigger server startup hook, + * register session clean up task, + * start all tasks. + + XXX Other startup related stuffs are done elsewhere. In Repository + XXX __init__ or in external codes (various server managers). + """ + self._prepare_startup() + assert self._tasks_manager is not None, "This Repository is not intended to be used as a server" + self._tasks_manager.start() def looping_task(self, interval, func, *args): """register a function to be called every `interval` seconds. @@ -356,15 +386,12 @@ looping tasks can only be registered during repository initialization, once done this method will fail. """ - try: - self._looping_tasks.append( (interval, func, args) ) - except AttributeError: - raise RuntimeError("can't add looping task once the repository is started") + assert self._tasks_manager is not None, "This Repository is not intended to be used as a server" + self._tasks_manager.add_looping_task(interval, func, *args) def threaded_task(self, func): """start function in a separated thread""" - t = utils.RepoThread(func, self._running_threads) - t.start() + utils.RepoThread(func, self._running_threads).start() #@locked def _get_cnxset(self): @@ -392,21 +419,21 @@ connections """ assert not self.shutting_down, 'already shutting down' + if not (self.config.creating or self.config.repairing + or self.config.quick_start): + # then, the system source is still available + self.hm.call_hooks('before_server_shutdown', repo=self) self.shutting_down = True self.system_source.shutdown() - if isinstance(self._looping_tasks, tuple): # if tasks have been started - for looptask in self._looping_tasks: - self.info('canceling task %s...', looptask.name) - looptask.cancel() - looptask.join() - self.info('task %s finished', looptask.name) + if self._tasks_manager is not None: + self._tasks_manager.stop() + if not (self.config.creating or self.config.repairing + or self.config.quick_start): + self.hm.call_hooks('server_shutdown', repo=self) for thread in self._running_threads: self.info('waiting thread %s...', thread.getName()) thread.join() self.info('thread %s finished', thread.getName()) - if not (self.config.creating or self.config.repairing - or self.config.quick_start): - self.hm.call_hooks('server_shutdown', repo=self) self.close_sessions() while not self._cnxsets_pool.empty(): cnxset = self._cnxsets_pool.get_nowait() @@ -436,8 +463,10 @@ """validate authentication, raise AuthenticationError on failure, return associated CWUser's eid on success. """ - for source in self.sources: - if source.support_entity('CWUser'): + # iter on sources_by_uri then check enabled source since sources doesn't + # contain copy based sources + for source in self.sources_by_uri.itervalues(): + if self.config.source_enabled(source) and source.support_entity('CWUser'): try: return source.authenticate(session, login, **authinfo) except AuthenticationError: @@ -497,7 +526,8 @@ results['sql_no_cache'] = self.system_source.no_cache results['nb_open_sessions'] = len(self._sessions) results['nb_active_threads'] = threading.activeCount() - results['looping_tasks'] = ', '.join(str(t) for t in self._looping_tasks) + looping_tasks = self._tasks_manager._looping_tasks + results['looping_tasks'] = ', '.join(str(t) for t in looping_tasks) results['available_cnxsets'] = self._cnxsets_pool.qsize() results['threads'] = ', '.join(sorted(str(t) for t in threading.enumerate())) return results @@ -595,8 +625,7 @@ """ from logilab.common.changelog import Version vcconf = {} - session = self.internal_session() - try: + with self.internal_session() as session: for pk, version in session.execute( 'Any K,V WHERE P is CWProperty, P value V, P pkey K, ' 'P pkey ~="system.version.%"', build_descr=False): @@ -615,8 +644,6 @@ msg = ('instance has %s version %s but %s ' 'is installed. Run "cubicweb-ctl upgrade".') raise ExecutionError(msg % (cube, version, fsversion)) - finally: - session.close() return vcconf @cached @@ -637,14 +664,11 @@ This is a public method, not requiring a session id. """ - session = self.internal_session() - try: + with self.internal_session() as session: # don't use session.execute, we don't want rset.req set return self.querier.execute(session, 'Any K,V WHERE P is CWProperty,' 'P pkey K, P value V, NOT P for_user U', build_descr=False) - finally: - session.close() # XXX protect this method: anonymous should be allowed and registration # plugged @@ -653,10 +677,9 @@ given password. This method is designed to be used for anonymous registration on public web site. """ - session = self.internal_session() - # for consistency, keep same error as unique check hook (although not required) - errmsg = session._('the value "%s" is already used, use another one') - try: + with self.internal_session() as session: + # for consistency, keep same error as unique check hook (although not required) + errmsg = session._('the value "%s" is already used, use another one') if (session.execute('CWUser X WHERE X login %(login)s', {'login': login}, build_descr=False) or session.execute('CWUser X WHERE X use_email C, C address %(login)s', @@ -683,8 +706,6 @@ 'U primary_email X, U use_email X ' 'WHERE U login %(login)s', d, build_descr=False) session.commit() - finally: - session.close() return True def find_users(self, fetch_attrs, **query_attrs): @@ -707,8 +728,7 @@ for k in chain(fetch_attrs, query_attrs.iterkeys()): if k not in cwuserattrs: raise Exception('bad input for find_user') - session = self.internal_session() - try: + with self.internal_session() as session: varmaker = rqlvar_maker() vars = [(attr, varmaker.next()) for attr in fetch_attrs] rql = 'Any %s WHERE X is CWUser, ' % ','.join(var[1] for var in vars) @@ -717,8 +737,6 @@ for attr in query_attrs.iterkeys()), query_attrs) return rset.rows - finally: - session.close() def connect(self, login, **kwargs): """open a connection for a given user @@ -730,13 +748,10 @@ raise `ConnectionError` if we can't open a connection """ # use an internal connection - session = self.internal_session() - # try to get a user object - cnxprops = kwargs.pop('cnxprops', None) - try: + with self.internal_session() as session: + # try to get a user object + cnxprops = kwargs.pop('cnxprops', None) user = self.authenticate_user(session, login, **kwargs) - finally: - session.close() session = Session(user, self, cnxprops) user._cw = user.cw_rset.req = session user.cw_clear_relation_cache() @@ -861,6 +876,25 @@ del self._sessions[sessionid] self.info('closed session %s for user %s', sessionid, session.user.login) + def call_service(self, sessionid, regid, async, **kwargs): + """ + See :class:`cubicweb.dbapi.Connection.call_service` + and :class:`cubicweb.server.Service` + """ + def task(): + session = self._get_session(sessionid, setcnxset=True) + service = session.vreg['services'].select(regid, session, **kwargs) + try: + return service.call(**kwargs) + finally: + session.rollback() # free cnxset + if async: + self.info('calling service %s asynchronously', regid) + self.threaded_task(task) + else: + self.info('calling service %s synchronously', regid) + return task() + def user_info(self, sessionid, props=None): """this method should be used by client to: * check session id validity @@ -930,14 +964,11 @@ * list of (etype, eid) of entities of the given types which have been deleted since the given timestamp """ - session = self.internal_session() - updatetime = datetime.utcnow() - try: + with self.internal_session() as session: + updatetime = datetime.utcnow() modentities, delentities = self.system_source.modified_entities( session, etypes, mtime) return updatetime, modentities, delentities - finally: - session.close() # session handling ######################################################## @@ -1330,11 +1361,11 @@ extid = self.init_entity_caches(session, entity, source) if server.DEBUG & server.DBG_REPO: print 'ADD entity', self, entity.__regid__, entity.eid, edited - relations = [] - prefill_entity_caches(entity, relations) + prefill_entity_caches(entity) if source.should_call_hooks: self.hm.call_hooks('before_add_entity', session, entity=entity) - activintegrity = session.is_hook_category_activated('activeintegrity') + relations = [] + activeintegrity = session.is_hook_category_activated('activeintegrity') for attr in edited.iterkeys(): rschema = eschema.subjrels[attr] if not rschema.final: # inlined relation @@ -1342,7 +1373,7 @@ relations.append((attr, value)) session.update_rel_cache_add(entity.eid, attr, value) rdef = session.rtype_eids_rdef(attr, entity.eid, value) - if rdef.cardinality[1] in '1?' and activintegrity: + if rdef.cardinality[1] in '1?' and activeintegrity: with security_enabled(session, read=False): session.execute('DELETE X %s Y WHERE Y eid %%(y)s' % attr, {'x': entity.eid, 'y': value}) @@ -1654,6 +1685,7 @@ # only defining here to prevent pylint from complaining info = warning = error = critical = exception = debug = lambda msg,*a,**kw: None + def pyro_unregister(config): """unregister the repository from the pyro name server""" from logilab.common.pyro_ext import ns_unregister