server/repository.py
changeset 2839 6419af16faa0
parent 2835 04034421b072
child 2847 c2ee28f4d4b1
equal deleted inserted replaced
2835:04034421b072 2839:6419af16faa0
    31 from cubicweb import (CW_SOFTWARE_ROOT, CW_MIGRATION_MAP, CW_EVENT_MANAGER,
    31 from cubicweb import (CW_SOFTWARE_ROOT, CW_MIGRATION_MAP, CW_EVENT_MANAGER,
    32                       UnknownEid, AuthenticationError, ExecutionError,
    32                       UnknownEid, AuthenticationError, ExecutionError,
    33                       ETypeNotSupportedBySources, RTypeNotSupportedBySources,
    33                       ETypeNotSupportedBySources, RTypeNotSupportedBySources,
    34                       BadConnectionId, Unauthorized, ValidationError,
    34                       BadConnectionId, Unauthorized, ValidationError,
    35                       typed_eid)
    35                       typed_eid)
    36 from cubicweb.cwvreg import CubicWebVRegistry
    36 from cubicweb import cwvreg, schema, server
    37 from cubicweb.schema import VIRTUAL_RTYPES, CubicWebSchema
    37 from cubicweb.server import utils, hook, pool, querier, sources
    38 from cubicweb import server
       
    39 from cubicweb.server.utils import RepoThread, LoopTask
       
    40 from cubicweb.server.pool import ConnectionsPool, LateOperation, SingleLastOperation
       
    41 from cubicweb.server.session import Session, InternalSession
    38 from cubicweb.server.session import Session, InternalSession
    42 from cubicweb.server.querier import QuerierHelper
    39 
    43 from cubicweb.server.sources import get_source
    40 
    44 from cubicweb.server.hookhelper import rproperty
    41 class CleanupEidTypeCacheOp(hook.SingleLastOperation):
    45 
       
    46 
       
    47 class CleanupEidTypeCacheOp(SingleLastOperation):
       
    48     """on rollback of an insert query or commit of a delete query, we have to
    42     """on rollback of an insert query or commit of a delete query, we have to
    49     clear repository's cache from no more valid entries
    43     clear repository's cache from no more valid entries
    50 
    44 
    51     NOTE: querier's rqlst/solutions cache may have been polluted too with
    45     NOTE: querier's rqlst/solutions cache may have been polluted too with
    52     queries such as Any X WHERE X eid 32 if 32 has been rolled back, however
    46     queries such as Any X WHERE X eid 32 if 32 has been rolled back, however
    72             self.repo.clear_caches(self.session.transaction_data['neweids'])
    66             self.repo.clear_caches(self.session.transaction_data['neweids'])
    73         except KeyError:
    67         except KeyError:
    74             pass
    68             pass
    75 
    69 
    76 
    70 
    77 class FTIndexEntityOp(LateOperation):
    71 class FTIndexEntityOp(hook.LateOperation):
    78     """operation to delay entity full text indexation to commit
    72     """operation to delay entity full text indexation to commit
    79 
    73 
    80     since fti indexing may trigger discovery of other entities, it should be
    74     since fti indexing may trigger discovery of other entities, it should be
    81     triggered on precommit, not commit, and this should be done after other
    75     triggered on precommit, not commit, and this should be done after other
    82     precommit operation which may add relations to the entity
    76     precommit operation which may add relations to the entity
   105     """
    99     """
   106     # skip delete queries (only?) if session is an internal session. This is
   100     # skip delete queries (only?) if session is an internal session. This is
   107     # the hooks' responsibility to ensure they do not violate relation's cardinality
   101     # the hooks' responsibility to ensure they do not violate relation's cardinality
   108     if session.is_super_session:
   102     if session.is_super_session:
   109         return
   103         return
   110     card = rproperty(session, rtype, eidfrom, eidto, 'cardinality')
   104     card = session.schema_rproperty(rtype, eidfrom, eidto, 'cardinality')
   111     # one may be tempted to check for neweids but this may cause more than one
   105     # one may be tempted to check for neweids but this may cause more than one
   112     # relation even with '1?' cardinality if those relations are added in the
   106     # relation even with '1?' cardinality if those relations are added in the
   113     # same transaction where the entity is being created. This never occurs from
   107     # same transaction where the entity is being created. This never occurs from
   114     # the web interface but may occur during tests or a dbapi connection (though
   108     # the web interface but may occur during tests or a dbapi connection (though
   115     # not expected for this).  So: don't do it, we pretend to ensure repository
   109     # not expected for this).  So: don't do it, we pretend to ensure repository
   134     """
   128     """
   135 
   129 
   136     def __init__(self, config, vreg=None, debug=False):
   130     def __init__(self, config, vreg=None, debug=False):
   137         self.config = config
   131         self.config = config
   138         if vreg is None:
   132         if vreg is None:
   139             vreg = CubicWebVRegistry(config, debug)
   133             vreg = cwvreg.CubicWebVRegistry(config, debug)
   140         self.vreg = vreg
   134         self.vreg = vreg
   141         self.pyro_registered = False
   135         self.pyro_registered = False
   142         self.info('starting repository from %s', self.config.apphome)
   136         self.info('starting repository from %s', self.config.apphome)
   143         # dictionary of opened sessions
   137         # dictionary of opened sessions
   144         self._sessions = {}
   138         self._sessions = {}
   145         # list of functions to be called at regular interval
   139         # list of functions to be called at regular interval
   146         self._looping_tasks = []
   140         self._looping_tasks = []
   147         # list of running threads
   141         # list of running threads
   148         self._running_threads = []
   142         self._running_threads = []
   149         # initial schema, should be built or replaced later
   143         # initial schema, should be built or replaced later
   150         self.schema = CubicWebSchema(config.appid)
   144         self.schema = schema.CubicWebSchema(config.appid)
   151         # querier helper, need to be created after sources initialization
   145         # querier helper, need to be created after sources initialization
   152         self.querier = QuerierHelper(self, self.schema)
   146         self.querier = querier.QuerierHelper(self, self.schema)
   153         # should we reindex on changes?
   147         # should we reindex on changes?
   154         self.do_fti = not config['delay-full-text-indexation']
   148         self.do_fti = not config['delay-full-text-indexation']
   155         # sources
   149         # sources
   156         self.sources = []
   150         self.sources = []
   157         self.sources_by_uri = {}
   151         self.sources_by_uri = {}
   172         self._type_source_cache = {}
   166         self._type_source_cache = {}
   173         # cache (extid, source uri) -> eid
   167         # cache (extid, source uri) -> eid
   174         self._extid_cache = {}
   168         self._extid_cache = {}
   175         # open some connections pools
   169         # open some connections pools
   176         self._available_pools = Queue.Queue()
   170         self._available_pools = Queue.Queue()
   177         self._available_pools.put_nowait(ConnectionsPool(self.sources))
   171         self._available_pools.put_nowait(pool.ConnectionsPool(self.sources))
   178         if config.read_instance_schema:
   172         if config.read_instance_schema:
   179             # normal start: load the instance schema from the database
   173             # normal start: load the instance schema from the database
   180             self.fill_schema()
   174             self.fill_schema()
   181         elif config.bootstrap_schema:
   175         elif config.bootstrap_schema:
   182             # usually during repository creation
   176             # usually during repository creation
   214         # initialization now that we know cubes
   208         # initialization now that we know cubes
   215         self._get_pool().close(True)
   209         self._get_pool().close(True)
   216         # list of available pools (we can't iterate on a Queue instance)
   210         # list of available pools (we can't iterate on a Queue instance)
   217         self.pools = []
   211         self.pools = []
   218         for i in xrange(config['connections-pool-size']):
   212         for i in xrange(config['connections-pool-size']):
   219             self.pools.append(ConnectionsPool(self.sources))
   213             self.pools.append(pool.ConnectionsPool(self.sources))
   220             self._available_pools.put_nowait(self.pools[-1])
   214             self._available_pools.put_nowait(self.pools[-1])
   221         self._shutting_down = False
   215         self._shutting_down = False
   222         self.hm = vreg['hooks']
   216         self.hm = vreg['hooks']
   223         if not (config.creating or config.repairing):
   217         if not (config.creating or config.repairing):
   224             # call instance level initialisation hooks
   218             # call instance level initialisation hooks
   229 
   223 
   230     # internals ###############################################################
   224     # internals ###############################################################
   231 
   225 
   232     def get_source(self, uri, source_config):
   226     def get_source(self, uri, source_config):
   233         source_config['uri'] = uri
   227         source_config['uri'] = uri
   234         return get_source(source_config, self.schema, self)
   228         return sources.get_source(source_config, self.schema, self)
   235 
   229 
   236     def set_schema(self, schema, resetvreg=True):
   230     def set_schema(self, schema, resetvreg=True):
   237         schema.rebuild_infered_relations()
   231         schema.rebuild_infered_relations()
   238         self.info('set schema %s %#x', schema.name, id(schema))
   232         self.info('set schema %s %#x', schema.name, id(schema))
   239         self.debug(', '.join(sorted(str(e) for e in schema.entities())))
   233         self.debug(', '.join(sorted(str(e) for e in schema.entities())))
   248 
   242 
   249     def fill_schema(self):
   243     def fill_schema(self):
   250         """load the schema from the repository"""
   244         """load the schema from the repository"""
   251         from cubicweb.server.schemaserial import deserialize_schema
   245         from cubicweb.server.schemaserial import deserialize_schema
   252         self.info('loading schema from the repository')
   246         self.info('loading schema from the repository')
   253         appschema = CubicWebSchema(self.config.appid)
   247         appschema = schema.CubicWebSchema(self.config.appid)
   254         self.set_schema(self.config.load_bootstrap_schema(), resetvreg=False)
   248         self.set_schema(self.config.load_bootstrap_schema(), resetvreg=False)
   255         self.debug('deserializing db schema into %s %#x', appschema.name, id(appschema))
   249         self.debug('deserializing db schema into %s %#x', appschema.name, id(appschema))
   256         session = self.internal_session()
   250         session = self.internal_session()
   257         try:
   251         try:
   258             try:
   252             try:
   271         self.config.init_cubes(self.get_cubes())
   265         self.config.init_cubes(self.get_cubes())
   272 
   266 
   273     def start_looping_tasks(self):
   267     def start_looping_tasks(self):
   274         assert isinstance(self._looping_tasks, list), 'already started'
   268         assert isinstance(self._looping_tasks, list), 'already started'
   275         for i, (interval, func, args) in enumerate(self._looping_tasks):
   269         for i, (interval, func, args) in enumerate(self._looping_tasks):
   276             self._looping_tasks[i] = task = LoopTask(interval, func, args)
   270             self._looping_tasks[i] = task = utils.LoopTask(interval, func, args)
   277             self.info('starting task %s with interval %.2fs', task.name,
   271             self.info('starting task %s with interval %.2fs', task.name,
   278                       interval)
   272                       interval)
   279             task.start()
   273             task.start()
   280         # ensure no tasks will be further added
   274         # ensure no tasks will be further added
   281         self._looping_tasks = tuple(self._looping_tasks)
   275         self._looping_tasks = tuple(self._looping_tasks)
   291         except AttributeError:
   285         except AttributeError:
   292             raise RuntimeError("can't add looping task once the repository is started")
   286             raise RuntimeError("can't add looping task once the repository is started")
   293 
   287 
   294     def threaded_task(self, func):
   288     def threaded_task(self, func):
   295         """start function in a separate thread"""
   289         """start function in a separate thread"""
   296         t = RepoThread(func, self._running_threads)
   290         t = utils.RepoThread(func, self._running_threads)
   297         t.start()
   291         t.start()
   298 
   292 
   299     #@locked
   293     #@locked
   300     def _get_pool(self):
   294     def _get_pool(self):
   301         try:
   295         try:
   896         rql = []
   890         rql = []
   897         eschema = self.schema.eschema(etype)
   891         eschema = self.schema.eschema(etype)
   898         pendingrtypes = session.transaction_data.get('pendingrtypes', ())
   892         pendingrtypes = session.transaction_data.get('pendingrtypes', ())
   899         for rschema, targetschemas, x in eschema.relation_definitions():
   893         for rschema, targetschemas, x in eschema.relation_definitions():
   900             rtype = rschema.type
   894             rtype = rschema.type
   901             if rtype in VIRTUAL_RTYPES or rtype in pendingrtypes:
   895             if rtype in schema.VIRTUAL_RTYPES or rtype in pendingrtypes:
   902                 continue
   896                 continue
   903             var = '%s%s' % (rtype.upper(), x.upper())
   897             var = '%s%s' % (rtype.upper(), x.upper())
   904             if x == 'subject':
   898             if x == 'subject':
   905                 # don't skip inlined relations so they are regularly
   899                 # don't skip inlined relations so they are regularly
   906                 # deleted and so hooks are correctly called
   900                 # deleted and so hooks are correctly called
   976         entity._is_saved = True # entity has an eid and is saved
   970         entity._is_saved = True # entity has an eid and is saved
   977         # prefill entity relation caches
   971         # prefill entity relation caches
   978         session.set_entity_cache(entity)
   972         session.set_entity_cache(entity)
   979         for rschema in eschema.subject_relations():
   973         for rschema in eschema.subject_relations():
   980             rtype = str(rschema)
   974             rtype = str(rschema)
   981             if rtype in VIRTUAL_RTYPES:
   975             if rtype in schema.VIRTUAL_RTYPES:
   982                 continue
   976                 continue
   983             if rschema.is_final():
   977             if rschema.is_final():
   984                 entity.setdefault(rtype, None)
   978                 entity.setdefault(rtype, None)
   985             else:
   979             else:
   986                 entity.set_related_cache(rtype, 'subject', session.empty_rset())
   980                 entity.set_related_cache(rtype, 'subject', session.empty_rset())
   987         for rschema in eschema.object_relations():
   981         for rschema in eschema.object_relations():
   988             rtype = str(rschema)
   982             rtype = str(rschema)
   989             if rtype in VIRTUAL_RTYPES:
   983             if rtype in schema.VIRTUAL_RTYPES:
   990                 continue
   984                 continue
   991             entity.set_related_cache(rtype, 'object', session.empty_rset())
   985             entity.set_related_cache(rtype, 'object', session.empty_rset())
   992         # set inline relation cache before call to after_add_entity
   986         # set inline relation cache before call to after_add_entity
   993         for attr, value in relations:
   987         for attr, value in relations:
   994             session.update_rel_cache_add(entity.eid, attr, value)
   988             session.update_rel_cache_add(entity.eid, attr, value)