28 |
28 |
29 from __future__ import print_function |
29 from __future__ import print_function |
30 |
30 |
31 from warnings import warn |
31 from warnings import warn |
32 from itertools import chain |
32 from itertools import chain |
33 from time import time, localtime, strftime |
|
34 from contextlib import contextmanager |
33 from contextlib import contextmanager |
35 from logging import getLogger |
34 from logging import getLogger |
36 |
35 |
37 from six.moves import range, queue |
36 from six.moves import range, queue |
38 |
37 |
42 from yams import BadSchemaDefinition |
41 from yams import BadSchemaDefinition |
43 from rql.utils import rqlvar_maker |
42 from rql.utils import rqlvar_maker |
44 |
43 |
45 from cubicweb import (CW_MIGRATION_MAP, QueryError, |
44 from cubicweb import (CW_MIGRATION_MAP, QueryError, |
46 UnknownEid, AuthenticationError, ExecutionError, |
45 UnknownEid, AuthenticationError, ExecutionError, |
47 BadConnectionId, |
|
48 UniqueTogetherError, ViolatedConstraint) |
46 UniqueTogetherError, ViolatedConstraint) |
49 from cubicweb import set_log_methods |
47 from cubicweb import set_log_methods |
50 from cubicweb import cwvreg, schema, server |
48 from cubicweb import cwvreg, schema, server |
51 from cubicweb.server import utils, hook, querier, sources |
49 from cubicweb.server import utils, hook, querier, sources |
52 from cubicweb.server.session import Session, InternalManager |
50 from cubicweb.server.session import Session, InternalManager |
218 vreg = cwvreg.CWRegistryStore(config) |
216 vreg = cwvreg.CWRegistryStore(config) |
219 self.vreg = vreg |
217 self.vreg = vreg |
220 self._scheduler = scheduler |
218 self._scheduler = scheduler |
221 |
219 |
222 self.app_instances_bus = NullEventBus() |
220 self.app_instances_bus = NullEventBus() |
223 # dictionary of opened sessions |
|
224 self._sessions = {} |
|
225 |
221 |
226 # list of functions to be called at regular interval |
222 # list of functions to be called at regular interval |
227 # list of running threads |
223 # list of running threads |
228 self._running_threads = [] |
224 self._running_threads = [] |
229 # initial schema, should be build or replaced latter |
225 # initial schema, should be build or replaced latter |
388 import traceback |
384 import traceback |
389 traceback.print_exc() |
385 traceback.print_exc() |
390 raise Exception('Is the database initialised ? (cause: %s)' % ex) |
386 raise Exception('Is the database initialised ? (cause: %s)' % ex) |
391 return appschema |
387 return appschema |
392 |
388 |
393 def _prepare_startup(self): |
|
394 """Prepare "Repository as a server" for startup. |
|
395 |
|
396 * register session clean up task. |
|
397 """ |
|
398 if not (self.config.creating or self.config.repairing |
|
399 or self.config.quick_start): |
|
400 # register a task to cleanup expired session |
|
401 if self._scheduler is not None: |
|
402 self.cleanup_session_time = self.config['cleanup-session-time'] or 60 * 60 * 24 |
|
403 assert self.cleanup_session_time > 0 |
|
404 cleanup_session_interval = min(60 * 60, self.cleanup_session_time / 3) |
|
405 self.looping_task(cleanup_session_interval, self.clean_sessions) |
|
406 |
|
407 def run_scheduler(self): |
389 def run_scheduler(self): |
408 """Start repository scheduler after preparing the repository for that. |
390 """Start repository scheduler after preparing the repository for that. |
409 |
391 |
410 * trigger server startup hook, |
392 * trigger server startup hook, |
411 * register session clean up task, |
|
412 * start the scheduler *and block*. |
393 * start the scheduler *and block*. |
413 |
394 |
414 XXX Other startup related stuffs are done elsewhere. In Repository |
395 XXX Other startup related stuffs are done elsewhere. In Repository |
415 XXX __init__ or in external codes (various server managers). |
396 XXX __init__ or in external codes (various server managers). |
416 """ |
397 """ |
417 self._prepare_startup() |
|
418 assert self._scheduler is not None, \ |
398 assert self._scheduler is not None, \ |
419 "This Repository is not intended to be used as a server" |
399 "This Repository is not intended to be used as a server" |
420 self.info( |
400 self.info( |
421 'starting repository scheduler with tasks: %s', |
401 'starting repository scheduler with tasks: %s', |
422 ', '.join(e.action.__name__ for e in self._scheduler.queue)) |
402 ', '.join(e.action.__name__ for e in self._scheduler.queue)) |
673 # try to get a user object |
653 # try to get a user object |
674 user = self.authenticate_user(cnx, login, **kwargs) |
654 user = self.authenticate_user(cnx, login, **kwargs) |
675 session = Session(user, self) |
655 session = Session(user, self) |
676 user._cw = user.cw_rset.req = session |
656 user._cw = user.cw_rset.req = session |
677 user.cw_clear_relation_cache() |
657 user.cw_clear_relation_cache() |
678 self._sessions[session.sessionid] = session |
|
679 self.info('opened session %s for user %s', session.sessionid, login) |
658 self.info('opened session %s for user %s', session.sessionid, login) |
680 with session.new_cnx() as cnx: |
659 with session.new_cnx() as cnx: |
681 self.hm.call_hooks('session_open', cnx) |
660 self.hm.call_hooks('session_open', cnx) |
682 # commit connection at this point in case write operation has been |
661 # commit connection at this point in case write operation has been |
683 # done during `session_open` hooks |
662 # done during `session_open` hooks |
687 @deprecated('[3.23] use .new_session instead (and get a plain session object)') |
666 @deprecated('[3.23] use .new_session instead (and get a plain session object)') |
688 def connect(self, login, **kwargs): |
667 def connect(self, login, **kwargs): |
689 return self.new_session(login, **kwargs).sessionid |
668 return self.new_session(login, **kwargs).sessionid |
690 |
669 |
691 # session handling ######################################################## |
670 # session handling ######################################################## |
692 |
|
693 def clean_sessions(self): |
|
694 """close sessions not used since an amount of time specified in the |
|
695 configuration |
|
696 """ |
|
697 mintime = time() - self.cleanup_session_time |
|
698 self.debug('cleaning session unused since %s', |
|
699 strftime('%H:%M:%S', localtime(mintime))) |
|
700 nbclosed = 0 |
|
701 for session in list(self._sessions.values()): |
|
702 if session.timestamp < mintime: |
|
703 session.close() |
|
704 nbclosed += 1 |
|
705 return nbclosed |
|
706 |
671 |
707 @contextmanager |
672 @contextmanager |
708 def internal_cnx(self): |
673 def internal_cnx(self): |
709 """Context manager returning a Connection using internal user which have |
674 """Context manager returning a Connection using internal user which have |
710 every access rights on the repository. |
675 every access rights on the repository. |