118 session.execute('DELETE X %s Y WHERE Y eid %%(y)s, ' |
118 session.execute('DELETE X %s Y WHERE Y eid %%(y)s, ' |
119 'NOT X eid %%(x)s' % rtype, |
119 'NOT X eid %%(x)s' % rtype, |
120 {'x': eidfrom, 'y': eidto}) |
120 {'x': eidfrom, 'y': eidto}) |
121 |
121 |
122 |
122 |
|
123 |
|
class NullEventBus(object):
    """Event bus stand-in on which every operation is a no-op.

    It exposes the `publish` / `add_subscription` / `start` / `stop` methods
    so it can be called unconditionally; each of them ignores its arguments
    and returns None.
    """

    def publish(self, msg):
        """Discard `msg`."""
        return None

    def add_subscription(self, topic, callback):
        """Ignore the subscription request: `callback` is never called."""
        return None

    def start(self):
        """Do nothing."""
        return None

    def stop(self):
        """Do nothing."""
        return None
|
136 |
|
137 |
123 class Repository(object): |
138 class Repository(object): |
124 """a repository provides access to a set of persistent storages for |
139 """a repository provides access to a set of persistent storages for |
125 entities and relations |
140 entities and relations |
126 |
141 |
127 XXX protect pyro access |
142 XXX protect pyro access |
128 """ |
143 """ |
129 |
144 |
130 def __init__(self, config, vreg=None): |
145 def __init__(self, config, tasks_manager=None, vreg=None): |
131 self.config = config |
146 self.config = config |
132 if vreg is None: |
147 if vreg is None: |
133 vreg = cwvreg.CubicWebVRegistry(config) |
148 vreg = cwvreg.CWRegistryStore(config) |
134 self.vreg = vreg |
149 self.vreg = vreg |
|
150 self._tasks_manager = tasks_manager |
|
151 |
135 self.pyro_registered = False |
152 self.pyro_registered = False |
136 self.pyro_uri = None |
153 self.pyro_uri = None |
|
154 self.app_instances_bus = NullEventBus() |
137 self.info('starting repository from %s', self.config.apphome) |
155 self.info('starting repository from %s', self.config.apphome) |
138 # dictionary of opened sessions |
156 # dictionary of opened sessions |
139 self._sessions = {} |
157 self._sessions = {} |
|
158 |
|
159 |
140 # list of functions to be called at regular interval |
160 # list of functions to be called at regular interval |
141 self._looping_tasks = [] |
|
142 # list of running threads |
161 # list of running threads |
143 self._running_threads = [] |
162 self._running_threads = [] |
144 # initial schema, should be build or replaced latter |
163 # initial schema, should be build or replaced latter |
145 self.schema = schema.CubicWebSchema(config.appid) |
164 self.schema = schema.CubicWebSchema(config.appid) |
146 self.vreg.schema = self.schema # until actual schema is loaded... |
165 self.vreg.schema = self.schema # until actual schema is loaded... |
230 self.sources_by_eid = {} |
252 self.sources_by_eid = {} |
231 if self.config.quick_start \ |
253 if self.config.quick_start \ |
232 or not 'CWSource' in self.schema: # # 3.10 migration |
254 or not 'CWSource' in self.schema: # # 3.10 migration |
233 self.system_source.init_creating() |
255 self.system_source.init_creating() |
234 return |
256 return |
235 session = self.internal_session() |
257 with self.internal_session() as session: |
236 try: |
|
237 # FIXME: sources should be ordered (add_entity priority) |
258 # FIXME: sources should be ordered (add_entity priority) |
238 for sourceent in session.execute( |
259 for sourceent in session.execute( |
239 'Any S, SN, SA, SC WHERE S is_instance_of CWSource, ' |
260 'Any S, SN, SA, SC WHERE S is_instance_of CWSource, ' |
240 'S name SN, S type SA, S config SC').entities(): |
261 'S name SN, S type SA, S config SC').entities(): |
241 if sourceent.name == 'system': |
262 if sourceent.name == 'system': |
242 self.system_source.eid = sourceent.eid |
263 self.system_source.eid = sourceent.eid |
243 self.sources_by_eid[sourceent.eid] = self.system_source |
264 self.sources_by_eid[sourceent.eid] = self.system_source |
244 self.system_source.init(True, sourceent) |
265 self.system_source.init(True, sourceent) |
245 continue |
266 continue |
246 self.add_source(sourceent, add_to_cnxsets=False) |
267 self.add_source(sourceent, add_to_cnxsets=False) |
247 finally: |
|
248 session.close() |
|
249 |
268 |
def _clear_planning_caches(self):
    """Call `clear_cache` on this repository for each planning related
    cache: source_defs, is_multi_sources_relation, can_cross_relation and
    rel_type_sources.
    """
    cache_names = ('source_defs', 'is_multi_sources_relation',
                   'can_cross_relation', 'rel_type_sources')
    for name in cache_names:
        clear_cache(self, name)
307 for source in self.sources_by_uri.values(): |
326 for source in self.sources_by_uri.values(): |
308 source.set_schema(schema) |
327 source.set_schema(schema) |
309 self.schema = schema |
328 self.schema = schema |
310 |
329 |
311 def fill_schema(self): |
330 def fill_schema(self): |
312 """lod schema from the repository""" |
331 """load schema from the repository""" |
313 from cubicweb.server.schemaserial import deserialize_schema |
332 from cubicweb.server.schemaserial import deserialize_schema |
314 self.info('loading schema from the repository') |
333 self.info('loading schema from the repository') |
315 appschema = schema.CubicWebSchema(self.config.appid) |
334 appschema = schema.CubicWebSchema(self.config.appid) |
316 self.set_schema(self.config.load_bootstrap_schema(), resetvreg=False) |
335 self.set_schema(self.config.load_bootstrap_schema(), resetvreg=False) |
317 self.debug('deserializing db schema into %s %#x', appschema.name, id(appschema)) |
336 self.debug('deserializing db schema into %s %#x', appschema.name, id(appschema)) |
318 session = self.internal_session() |
337 with self.internal_session() as session: |
319 try: |
|
320 try: |
338 try: |
321 deserialize_schema(appschema, session) |
339 deserialize_schema(appschema, session) |
322 except BadSchemaDefinition: |
340 except BadSchemaDefinition: |
323 raise |
341 raise |
324 except Exception, ex: |
342 except Exception, ex: |
325 import traceback |
343 import traceback |
326 traceback.print_exc() |
344 traceback.print_exc() |
327 raise Exception('Is the database initialised ? (cause: %s)' % |
345 raise Exception('Is the database initialised ? (cause: %s)' % |
328 (ex.args and ex.args[0].strip() or 'unknown')), \ |
346 (ex.args and ex.args[0].strip() or 'unknown')), \ |
329 None, sys.exc_info()[-1] |
347 None, sys.exc_info()[-1] |
330 finally: |
|
331 session.close() |
|
332 self.set_schema(appschema) |
348 self.set_schema(appschema) |
333 |
349 |
def _prepare_startup(self):
    """Prepare "Repository as a server" for startup.

    Nothing is done when the configuration is flagged as creating,
    repairing or quick_start. Otherwise:

    * call the 'server_startup' hooks,
    * register `clean_sessions` as a looping task on the tasks manager.
    """
    if (self.config.creating or self.config.repairing
        or self.config.quick_start):
        return
    # call instance level initialisation hooks
    self.hm.call_hooks('server_startup', repo=self)
    # register a task to cleanup expired session; 60 * 60 * 24 is used
    # when the 'cleanup-session-time' option has a false value
    self.cleanup_session_time = self.config['cleanup-session-time'] or 60 * 60 * 24
    assert self.cleanup_session_time > 0
    # run the cleanup at most every 60*60 and at most every third of the
    # cleanup time
    cleanup_session_interval = min(60*60, self.cleanup_session_time / 3)
    tasks_manager = self._tasks_manager
    assert tasks_manager is not None, "This Repository is not intended to be used as a server"
    tasks_manager.add_looping_task(cleanup_session_interval,
                                   self.clean_sessions)

def start_looping_tasks(self):
    """Actual "Repository as a server" startup.

    Run `_prepare_startup` (startup hooks and session clean up task
    registration), then start the tasks manager.

    XXX Other startup related stuffs are done elsewhere. In Repository
    XXX __init__ or in external codes (various server managers).
    """
    self._prepare_startup()
    tasks_manager = self._tasks_manager
    assert tasks_manager is not None, "This Repository is not intended to be used as a server"
    tasks_manager.start()
352 |
382 |
def looping_task(self, interval, func, *args):
    """register `func` to be called every `interval` seconds with `args`.

    The registration is delegated to the repository's tasks manager; an
    AssertionError is raised when the repository has none.
    """
    tasks_manager = self._tasks_manager
    assert tasks_manager is not None, "This Repository is not intended to be used as a server"
    tasks_manager.add_looping_task(interval, func, *args)
363 |
391 |
def threaded_task(self, func):
    """run `func` in a separate thread

    The thread is a `utils.RepoThread` built with the repository's list of
    running threads, and is started right away.
    """
    thread = utils.RepoThread(func, self._running_threads)
    thread.start()
|
368 |
395 |
369 #@locked |
396 #@locked |
370 def _get_cnxset(self): |
397 def _get_cnxset(self): |
371 try: |
398 try: |
372 return self._cnxsets_pool.get(True, timeout=5) |
399 return self._cnxsets_pool.get(True, timeout=5) |
390 def shutdown(self): |
417 def shutdown(self): |
391 """called on server stop event to properly close opened sessions and |
418 """called on server stop event to properly close opened sessions and |
392 connections |
419 connections |
393 """ |
420 """ |
394 assert not self.shutting_down, 'already shutting down' |
421 assert not self.shutting_down, 'already shutting down' |
|
422 if not (self.config.creating or self.config.repairing |
|
423 or self.config.quick_start): |
|
424 # then, the system source is still available |
|
425 self.hm.call_hooks('before_server_shutdown', repo=self) |
395 self.shutting_down = True |
426 self.shutting_down = True |
396 self.system_source.shutdown() |
427 self.system_source.shutdown() |
397 if isinstance(self._looping_tasks, tuple): # if tasks have been started |
428 if self._tasks_manager is not None: |
398 for looptask in self._looping_tasks: |
429 self._tasks_manager.stop() |
399 self.info('canceling task %s...', looptask.name) |
430 if not (self.config.creating or self.config.repairing |
400 looptask.cancel() |
431 or self.config.quick_start): |
401 looptask.join() |
432 self.hm.call_hooks('server_shutdown', repo=self) |
402 self.info('task %s finished', looptask.name) |
|
403 for thread in self._running_threads: |
433 for thread in self._running_threads: |
404 self.info('waiting thread %s...', thread.getName()) |
434 self.info('waiting thread %s...', thread.getName()) |
405 thread.join() |
435 thread.join() |
406 self.info('thread %s finished', thread.getName()) |
436 self.info('thread %s finished', thread.getName()) |
407 if not (self.config.creating or self.config.repairing |
|
408 or self.config.quick_start): |
|
409 self.hm.call_hooks('server_shutdown', repo=self) |
|
410 self.close_sessions() |
437 self.close_sessions() |
411 while not self._cnxsets_pool.empty(): |
438 while not self._cnxsets_pool.empty(): |
412 cnxset = self._cnxsets_pool.get_nowait() |
439 cnxset = self._cnxsets_pool.get_nowait() |
413 try: |
440 try: |
414 cnxset.close(True) |
441 cnxset.close(True) |
def properties(self):
    """Return a result set containing system wide properties, that is the
    key and value of each CWProperty which is not linked to a user.

    This is a public method, not requiring a session id.
    """
    rql = ('Any K,V WHERE P is CWProperty,'
           'P pkey K, P value V, NOT P for_user U')
    with self.internal_session() as session:
        # go through the querier rather than session.execute: we don't
        # want rset.req set
        return self.querier.execute(session, rql, build_descr=False)
|
648 |
672 |
649 # XXX protect this method: anonymous should be allowed and registration |
673 # XXX protect this method: anonymous should be allowed and registration |
650 # plugged |
674 # plugged |
651 def register_user(self, login, password, email=None, **kwargs): |
675 def register_user(self, login, password, email=None, **kwargs): |
652 """check a user with the given login exists, if not create it with the |
676 """check a user with the given login exists, if not create it with the |
653 given password. This method is designed to be used for anonymous |
677 given password. This method is designed to be used for anonymous |
654 registration on public web site. |
678 registration on public web site. |
655 """ |
679 """ |
656 session = self.internal_session() |
680 with self.internal_session() as session: |
657 # for consistency, keep same error as unique check hook (although not required) |
681 # for consistency, keep same error as unique check hook (although not required) |
658 errmsg = session._('the value "%s" is already used, use another one') |
682 errmsg = session._('the value "%s" is already used, use another one') |
659 try: |
|
660 if (session.execute('CWUser X WHERE X login %(login)s', {'login': login}, |
683 if (session.execute('CWUser X WHERE X login %(login)s', {'login': login}, |
661 build_descr=False) |
684 build_descr=False) |
662 or session.execute('CWUser X WHERE X use_email C, C address %(login)s', |
685 or session.execute('CWUser X WHERE X use_email C, C address %(login)s', |
663 {'login': login}, build_descr=False)): |
686 {'login': login}, build_descr=False)): |
664 qname = role_name('login', 'subject') |
687 qname = role_name('login', 'subject') |
705 if not rschema.meta) |
726 if not rschema.meta) |
706 cwuserattrs = self._cwuser_attrs |
727 cwuserattrs = self._cwuser_attrs |
707 for k in chain(fetch_attrs, query_attrs.iterkeys()): |
728 for k in chain(fetch_attrs, query_attrs.iterkeys()): |
708 if k not in cwuserattrs: |
729 if k not in cwuserattrs: |
709 raise Exception('bad input for find_user') |
730 raise Exception('bad input for find_user') |
710 session = self.internal_session() |
731 with self.internal_session() as session: |
711 try: |
|
712 varmaker = rqlvar_maker() |
732 varmaker = rqlvar_maker() |
713 vars = [(attr, varmaker.next()) for attr in fetch_attrs] |
733 vars = [(attr, varmaker.next()) for attr in fetch_attrs] |
714 rql = 'Any %s WHERE X is CWUser, ' % ','.join(var[1] for var in vars) |
734 rql = 'Any %s WHERE X is CWUser, ' % ','.join(var[1] for var in vars) |
715 rql += ','.join('X %s %s' % (var[0], var[1]) for var in vars) + ',' |
735 rql += ','.join('X %s %s' % (var[0], var[1]) for var in vars) + ',' |
716 rset = session.execute(rql + ','.join('X %s %%(%s)s' % (attr, attr) |
736 rset = session.execute(rql + ','.join('X %s %%(%s)s' % (attr, attr) |
717 for attr in query_attrs.iterkeys()), |
737 for attr in query_attrs.iterkeys()), |
718 query_attrs) |
738 query_attrs) |
719 return rset.rows |
739 return rset.rows |
720 finally: |
|
721 session.close() |
|
722 |
740 |
723 def connect(self, login, **kwargs): |
741 def connect(self, login, **kwargs): |
724 """open a connection for a given user |
742 """open a connection for a given user |
725 |
743 |
726 base_url may be needed to send mails |
744 base_url may be needed to send mails |
728 |
746 |
729 raise `AuthenticationError` if the authentication failed |
747 raise `AuthenticationError` if the authentication failed |
730 raise `ConnectionError` if we can't open a connection |
748 raise `ConnectionError` if we can't open a connection |
731 """ |
749 """ |
732 # use an internal connection |
750 # use an internal connection |
733 session = self.internal_session() |
751 with self.internal_session() as session: |
734 # try to get a user object |
752 # try to get a user object |
735 cnxprops = kwargs.pop('cnxprops', None) |
753 cnxprops = kwargs.pop('cnxprops', None) |
736 try: |
|
737 user = self.authenticate_user(session, login, **kwargs) |
754 user = self.authenticate_user(session, login, **kwargs) |
738 finally: |
|
739 session.close() |
|
740 session = Session(user, self, cnxprops) |
755 session = Session(user, self, cnxprops) |
741 user._cw = user.cw_rset.req = session |
756 user._cw = user.cw_rset.req = session |
742 user.cw_clear_relation_cache() |
757 user.cw_clear_relation_cache() |
743 self._sessions[session.id] = session |
758 self._sessions[session.id] = session |
744 self.info('opened session %s for user %s', session.id, login) |
759 self.info('opened session %s for user %s', session.id, login) |
859 session.commit() |
874 session.commit() |
860 session.close() |
875 session.close() |
861 del self._sessions[sessionid] |
876 del self._sessions[sessionid] |
862 self.info('closed session %s for user %s', sessionid, session.user.login) |
877 self.info('closed session %s for user %s', sessionid, session.user.login) |
863 |
878 |
|
879 def call_service(self, sessionid, regid, async, **kwargs): |
|
880 """ |
|
881 See :class:`cubicweb.dbapi.Connection.call_service` |
|
882 and :class:`cubicweb.server.Service` |
|
883 """ |
|
884 def task(): |
|
885 session = self._get_session(sessionid, setcnxset=True) |
|
886 service = session.vreg['services'].select(regid, session, **kwargs) |
|
887 try: |
|
888 return service.call(**kwargs) |
|
889 finally: |
|
890 session.rollback() # free cnxset |
|
891 if async: |
|
892 self.info('calling service %s asynchronously', regid) |
|
893 self.threaded_task(task) |
|
894 else: |
|
895 self.info('calling service %s synchronously', regid) |
|
896 return task() |
|
897 |
864 def user_info(self, sessionid, props=None): |
898 def user_info(self, sessionid, props=None): |
865 """this method should be used by client to: |
899 """this method should be used by client to: |
866 * check session id validity |
900 * check session id validity |
867 * update user information on each user's request (i.e. groups and |
901 * update user information on each user's request (i.e. groups and |
868 custom properties) |
902 custom properties) |
928 modified since the given timestamp (actually entities whose full text |
962 modified since the given timestamp (actually entities whose full text |
929 index content has changed) |
963 index content has changed) |
930 * list of (etype, eid) of entities of the given types which have been |
964 * list of (etype, eid) of entities of the given types which have been |
931 deleted since the given timestamp |
965 deleted since the given timestamp |
932 """ |
966 """ |
933 session = self.internal_session() |
967 with self.internal_session() as session: |
934 updatetime = datetime.utcnow() |
968 updatetime = datetime.utcnow() |
935 try: |
|
936 modentities, delentities = self.system_source.modified_entities( |
969 modentities, delentities = self.system_source.modified_entities( |
937 session, etypes, mtime) |
970 session, etypes, mtime) |
938 return updatetime, modentities, delentities |
971 return updatetime, modentities, delentities |
939 finally: |
|
940 session.close() |
|
941 |
972 |
942 # session handling ######################################################## |
973 # session handling ######################################################## |
943 |
974 |
944 def close_sessions(self): |
975 def close_sessions(self): |
945 """close every opened sessions""" |
976 """close every opened sessions""" |
1328 entity.eid = self.system_source.create_eid(session) |
1359 entity.eid = self.system_source.create_eid(session) |
1329 # set caches asap |
1360 # set caches asap |
1330 extid = self.init_entity_caches(session, entity, source) |
1361 extid = self.init_entity_caches(session, entity, source) |
1331 if server.DEBUG & server.DBG_REPO: |
1362 if server.DEBUG & server.DBG_REPO: |
1332 print 'ADD entity', self, entity.__regid__, entity.eid, edited |
1363 print 'ADD entity', self, entity.__regid__, entity.eid, edited |
1333 relations = [] |
1364 prefill_entity_caches(entity) |
1334 prefill_entity_caches(entity, relations) |
|
1335 if source.should_call_hooks: |
1365 if source.should_call_hooks: |
1336 self.hm.call_hooks('before_add_entity', session, entity=entity) |
1366 self.hm.call_hooks('before_add_entity', session, entity=entity) |
1337 activintegrity = session.is_hook_category_activated('activeintegrity') |
1367 relations = [] |
|
1368 activeintegrity = session.is_hook_category_activated('activeintegrity') |
1338 for attr in edited.iterkeys(): |
1369 for attr in edited.iterkeys(): |
1339 rschema = eschema.subjrels[attr] |
1370 rschema = eschema.subjrels[attr] |
1340 if not rschema.final: # inlined relation |
1371 if not rschema.final: # inlined relation |
1341 value = edited[attr] |
1372 value = edited[attr] |
1342 relations.append((attr, value)) |
1373 relations.append((attr, value)) |
1343 session.update_rel_cache_add(entity.eid, attr, value) |
1374 session.update_rel_cache_add(entity.eid, attr, value) |
1344 rdef = session.rtype_eids_rdef(attr, entity.eid, value) |
1375 rdef = session.rtype_eids_rdef(attr, entity.eid, value) |
1345 if rdef.cardinality[1] in '1?' and activintegrity: |
1376 if rdef.cardinality[1] in '1?' and activeintegrity: |
1346 with security_enabled(session, read=False): |
1377 with security_enabled(session, read=False): |
1347 session.execute('DELETE X %s Y WHERE Y eid %%(y)s' % attr, |
1378 session.execute('DELETE X %s Y WHERE Y eid %%(y)s' % attr, |
1348 {'x': entity.eid, 'y': value}) |
1379 {'x': entity.eid, 'y': value}) |
1349 edited.set_defaults() |
1380 edited.set_defaults() |
1350 if session.is_hook_category_activated('integrity'): |
1381 if session.is_hook_category_activated('integrity'): |