server/repository.py
changeset 0 b97547f5f1fa
child 341 0a426be2f3a2
equal deleted inserted replaced
-1:000000000000 0:b97547f5f1fa
       
     1 """Defines the central class for the CubicWeb RQL server: the repository.
       
     2 
       
     3 The repository is an abstraction allowing execution of rql queries against
       
     4 data sources. Most of the work is actually done in helper classes. The
       
     5 repository mainly:
       
     6 
       
     7 * brings these classes all together to provide a single access
       
     8   point to a cubicweb application.
       
     9 * handles session management
       
     10 * provides methods for pyro registration, to be called if pyro is enabled
       
    11 
       
    12 
       
    13 :organization: Logilab
       
    14 :copyright: 2001-2008 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
       
    15 :contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
       
    16 """
       
# declare that docstrings in this module are written in reStructuredText
__docformat__ = "restructuredtext en"
       
    18 
       
    19 import sys
       
    20 import Queue
       
    21 from os.path import join, exists
       
    22 from time import time, localtime, strftime
       
    23 
       
    24 from mx.DateTime import now
       
    25 
       
    26 from logilab.common.decorators import cached
       
    27 
       
    28 from yams import BadSchemaDefinition
       
    29 from rql import RQLSyntaxError
       
    30 
       
    31 from cubicweb import (CW_SOFTWARE_ROOT, UnknownEid, AuthenticationError,
       
    32                       ETypeNotSupportedBySources, RTypeNotSupportedBySources,
       
    33                       BadConnectionId, Unauthorized, ValidationError,
       
    34                       ExecutionError, typed_eid,
       
    35                       CW_MIGRATION_MAP)
       
    36 from cubicweb.cwvreg import CubicWebRegistry
       
    37 from cubicweb.schema import CubicWebSchema
       
    38 
       
    39 from cubicweb.server.utils import RepoThread, LoopTask
       
    40 from cubicweb.server.pool import ConnectionsPool, LateOperation, SingleLastOperation
       
    41 from cubicweb.server.session import Session, InternalSession
       
    42 from cubicweb.server.querier import QuerierHelper
       
    43 from cubicweb.server.sources import get_source
       
    44 from cubicweb.server.hooksmanager import HooksManager
       
    45 from cubicweb.server.hookhelper import rproperty
       
    46 
       
    47 
       
    48 class CleanupEidTypeCacheOp(SingleLastOperation):
       
    49     """on rollback of a insert query or commit of delete query, we have to
       
    50     clear repository's cache from no more valid entries
       
    51 
       
    52     NOTE: querier's rqlst/solutions cache may have been polluted too with
       
    53     queries such as Any X WHERE X eid 32 if 32 has been rollbacked however
       
    54     generated queries are unpredictable and analysing all the cache probably
       
    55     too expensive. Notice that there is no pb when using args to specify eids
       
    56     instead of giving them into the rql string.
       
    57     """
       
    58 
       
    59     def commit_event(self):
       
    60         """the observed connections pool has been rollbacked,
       
    61         remove inserted eid from repository type/source cache
       
    62         """
       
    63         self.repo.clear_caches(self.session.query_data('pendingeids', ()))
       
    64         
       
    65     def rollback_event(self):
       
    66         """the observed connections pool has been rollbacked,
       
    67         remove inserted eid from repository type/source cache
       
    68         """
       
    69         self.repo.clear_caches(self.session.query_data('neweids', ()))
       
    70 
       
    71 
       
    72 class FTIndexEntityOp(LateOperation):
       
    73     """operation to delay entity full text indexation to commit
       
    74 
       
    75     since fti indexing may trigger discovery of other entities, it should be
       
    76     triggered on precommit, not commit, and this should be done after other
       
    77     precommit operation which may add relations to the entity
       
    78     """
       
    79 
       
    80     def precommit_event(self):
       
    81         session = self.session
       
    82         entity = self.entity
       
    83         if entity.eid in session.query_data('pendingeids', ()):
       
    84             return # entity added and deleted in the same transaction
       
    85         session.repo.system_source.fti_unindex_entity(session, entity.eid)
       
    86         for container in entity.fti_containers():
       
    87             session.repo.index_entity(session, container)
       
    88             
       
    89     def commit_event(self):
       
    90         pass
       
    91 
       
    92 def del_existing_rel_if_needed(session, eidfrom, rtype, eidto):
       
    93     """delete existing relation when adding a new one if card is 1 or ?
       
    94 
       
    95     have to be done once the new relation has been inserted to avoid having
       
    96     an entity without a relation for some time
       
    97 
       
    98     this kind of behaviour has to be done in the repository so we don't have
       
    99     hooks order hazardness
       
   100     """
       
   101     # skip delete queries (only?) if session is an internal session. This is
       
   102     # hooks responsability to ensure they do not violate relation's cardinality
       
   103     if session.is_super_session:
       
   104         return
       
   105     card = rproperty(session, rtype, eidfrom, eidto, 'cardinality')
       
   106     # one may be tented to check for neweids but this may cause more than one
       
   107     # relation even with '1?'  cardinality if thoses relations are added in the
       
   108     # same transaction where the entity is being created. This never occurs from
       
   109     # the web interface but may occurs during test or dbapi connection (though
       
   110     # not expected for this).  So: don't do it, we pretend to ensure repository
       
   111     # consistency.
       
   112     # XXX should probably not use unsafe_execute!
       
   113     if card[0] in '1?':
       
   114         rschema = session.repo.schema.rschema(rtype)
       
   115         if not rschema.inlined:
       
   116             session.unsafe_execute('DELETE X %s Y WHERE X eid %%(x)s, NOT Y eid %%(y)s' % rtype,
       
   117                                    {'x': eidfrom, 'y': eidto}, 'x')
       
   118     if card[1] in '1?':
       
   119         session.unsafe_execute('DELETE X %s Y WHERE NOT X eid %%(x)s, Y eid %%(y)s' % rtype,
       
   120                                {'x': eidfrom, 'y': eidto}, 'y')
       
   121 
       
   122     
       
   123 class Repository(object):
       
   124     """a repository provides access to a set of persistent storages for
       
   125     entities and relations
       
   126 
       
   127     XXX protect pyro access
       
   128     """
       
   129     
       
    def __init__(self, config, vreg=None, debug=False):
        """build the repository described by `config`

        :param config: the server configuration: its sources definitions,
          schema loading methods and options ('session-time',
          'connections-pool-size', ...) are used here
        :param vreg: application objects registry; when not given, a new
          `CubicWebRegistry` is built from `config` and `debug`
        :param debug: only forwarded to `CubicWebRegistry` when `vreg` is None
        """
        self.config = config
        if vreg is None:
            vreg = CubicWebRegistry(config, debug)
        self.vreg = vreg
        self.pyro_registered = False
        self.info('starting repository from %s', self.config.apphome)
        # dictionary of opened sessions
        self._sessions = {}
        # list of functions to be called at regular interval (frozen into a
        # tuple of started tasks by start_looping_tasks)
        self._looping_tasks = []
        # list of running threads
        self._running_threads = []
        # initial schema, should be built or replaced later
        self.schema = CubicWebSchema(config.appid)
        # querier helper, need to be created after sources initialization
        # NOTE(review): the querier is actually created *before* the sources
        # below -- confirm whether this comment is stale
        self.querier = QuerierHelper(self, self.schema)
        # sources
        self.sources = []
        self.sources_by_uri = {}
        # FIXME: store additional sources info in the system database ?
        # FIXME: sources should be ordered (add_entity priority)
        for uri, source_config in config.sources().items():
            if uri == 'admin':
                # not an actual source
                continue
            source = self.get_source(uri, source_config)
            self.sources_by_uri[uri] = source
            self.sources.append(source)
        # a source with uri 'system' is mandatory (KeyError otherwise)
        self.system_source = self.sources_by_uri['system']
        # ensure system source is the first one
        self.sources.remove(self.system_source)
        self.sources.insert(0, self.system_source)
        # cache eid -> type / source
        self._type_source_cache = {}
        # cache (extid, source uri) -> eid
        self._extid_cache = {}
        # create the hooks manager
        self.hm = HooksManager(self.schema)
        # open some connections pools: a single one first, so that internal
        # sessions may be used during initialization; it is closed at the end
        # of this method and replaced by 'connections-pool-size' fresh pools
        self._available_pools = Queue.Queue()
        self._available_pools.put_nowait(ConnectionsPool(self.sources))
        if config.read_application_schema:
            # normal start: load the application schema from the database
            self.fill_schema()
        elif config.bootstrap_schema:
            # usually during repository creation
            self.warning("set fs application'schema as bootstrap schema")
            config.bootstrap_cubes()
            self.set_bootstrap_schema(self.config.load_schema())
            # need to load the Any and EUser entity types
            self.vreg.schema = self.schema
            etdirectory = join(CW_SOFTWARE_ROOT, 'entities')
            self.vreg.load_file(etdirectory, '__init__.py')
            self.vreg.load_file(etdirectory, 'authobjs.py')
        else:
            # test start: use the file system schema (quicker)
            self.warning("set fs application'schema")
            config.bootstrap_cubes()
            self.set_schema(self.config.load_schema())
        if not config.creating:
            if 'EProperty' in self.schema:
                self.vreg.init_properties(self.properties())
            # call source's init method to complete their initialisation if
            # needed (for instance looking for persistent configuration using an
            # internal session, which is not possible until pools have been
            # initialized)
            for source in self.sources:
                source.init()
            # call application level initialisation hooks
            self.hm.call_hooks('server_startup', repo=self)
            # register a task to cleanup expired session, run every third of
            # the session time
            self.looping_task(self.config['session-time']/3.,
                              self.clean_sessions)
        else:
            # call init_creating so for instance native source can configurate
            # tsearch according to postgres version
            for source in self.sources:
                source.init_creating()
        # close initialization pool and reopen fresh ones for proper
        # initialization now that we know cubes
        self._get_pool().close(True)
        for i in xrange(config['connections-pool-size']):
            self._available_pools.put_nowait(ConnectionsPool(self.sources))
       
   214      
       
   215     # internals ###############################################################
       
   216 
       
   217     def get_source(self, uri, source_config):
       
   218         source_config['uri'] = uri
       
   219         return get_source(source_config, self.schema, self)
       
   220         
       
   221     def set_schema(self, schema, resetvreg=True):
       
   222         schema.rebuild_infered_relations()
       
   223         self.info('set schema %s %#x', schema.name, id(schema))
       
   224         self.debug(', '.join(sorted(str(e) for e in schema.entities())))
       
   225         self.querier.set_schema(schema)
       
   226         for source in self.sources:
       
   227             source.set_schema(schema)
       
   228         self.schema = schema
       
   229         if resetvreg:
       
   230             # full reload of all appobjects
       
   231             self.vreg.reset()
       
   232             self.vreg.set_schema(schema)
       
   233         self.hm.set_schema(schema)
       
   234         self.hm.register_system_hooks(self.config)
       
   235         # application specific hooks
       
   236         if self.config.application_hooks:
       
   237             self.info('loading application hooks')
       
   238             self.hm.register_hooks(self.config.load_hooks(self.vreg))
       
   239 
       
   240     def fill_schema(self):
       
   241         """lod schema from the repository"""
       
   242         from cubicweb.server.schemaserial import deserialize_schema
       
   243         self.info('loading schema from the repository')
       
   244         appschema = CubicWebSchema(self.config.appid)
       
   245         self.set_bootstrap_schema(self.config.load_bootstrap_schema())
       
   246         self.debug('deserializing db schema into %s %#x', appschema.name, id(appschema))
       
   247         session = self.internal_session()
       
   248         try:
       
   249             try:
       
   250                 deserialize_schema(appschema, session)
       
   251             except BadSchemaDefinition:
       
   252                 raise
       
   253             except Exception, ex:
       
   254                 raise Exception('Is the database initialised ? (cause: %s)' % 
       
   255                                 (ex.args and ex.args[0].strip() or 'unknown')), \
       
   256                                 None, sys.exc_info()[-1]
       
   257             self.info('set the actual schema')
       
   258             # XXX have to do this since EProperty isn't in the bootstrap schema
       
   259             #     it'll be redone in set_schema
       
   260             self.set_bootstrap_schema(appschema)
       
   261             # 2.49 migration
       
   262             if exists(join(self.config.apphome, 'vc.conf')):
       
   263                 session.set_pool()
       
   264                 if not 'template' in file(join(self.config.apphome, 'vc.conf')).read():
       
   265                     # remaning from cubicweb < 2.38...
       
   266                     session.execute('DELETE EProperty X WHERE X pkey "system.version.template"')
       
   267                     session.commit()
       
   268         finally:
       
   269             session.close()
       
   270         self.config.init_cubes(self.get_cubes())
       
   271         self.set_schema(appschema)
       
   272         
       
   273     def set_bootstrap_schema(self, schema):
       
   274         """disable hooks when setting a bootstrap schema, but restore
       
   275         the configuration for the next time
       
   276         """
       
   277         config = self.config
       
   278         # XXX refactor
       
   279         config.core_hooks = False
       
   280         config.usergroup_hooks = False
       
   281         config.schema_hooks = False
       
   282         config.notification_hooks = False
       
   283         config.application_hooks = False
       
   284         self.set_schema(schema, resetvreg=False)
       
   285         config.core_hooks = True
       
   286         config.usergroup_hooks = True
       
   287         config.schema_hooks = True
       
   288         config.notification_hooks = True
       
   289         config.application_hooks = True
       
   290             
       
   291     def start_looping_tasks(self):
       
   292         assert isinstance(self._looping_tasks, list), 'already started'
       
   293         for i, (interval, func) in enumerate(self._looping_tasks):
       
   294             self._looping_tasks[i] = task = LoopTask(interval, func)
       
   295             self.info('starting task %s with interval %.2fs', task.name,
       
   296                       interval)
       
   297             task.start()
       
   298         # ensure no tasks will be further added
       
   299         self._looping_tasks = tuple(self._looping_tasks)
       
   300 
       
   301     def looping_task(self, interval, func):
       
   302         """register a function to be called every `interval` seconds.
       
   303         
       
   304         looping tasks can only be registered during repository initialization,
       
   305         once done this method will fail.
       
   306         """
       
   307         try:
       
   308             self._looping_tasks.append( (interval, func) )
       
   309         except AttributeError:
       
   310             raise RuntimeError("can't add looping task once the repository is started")
       
   311 
       
   312     def threaded_task(self, func):
       
   313         """start function in a separated thread"""
       
   314         t = RepoThread(func, self._running_threads)
       
   315         t.start()
       
   316         
       
   317     #@locked
       
   318     def _get_pool(self):
       
   319         try:
       
   320             return self._available_pools.get(True, timeout=5)
       
   321         except Queue.Empty:
       
   322             raise Exception('no pool available after 5 secs, probably either a '
       
   323                             'bug in code (to many uncommited/rollbacked '
       
   324                             'connections) or to much load on the server (in '
       
   325                             'which case you can try to set a bigger '
       
   326                             'connections pools size)')
       
   327         
       
   328     def _free_pool(self, pool):
       
   329         pool.rollback()
       
   330         self._available_pools.put_nowait(pool)
       
   331 
       
   332     def pinfo(self):
       
   333         # XXX: session.pool is accessed from a local storage, would be interesting
       
   334         #      to see if there is a pool set in any thread specific data)
       
   335         import threading
       
   336         return '%s: %s (%s)' % (self._available_pools.qsize(),
       
   337                                 ','.join(session.user.login for session in self._sessions.values()
       
   338                                          if session.pool),
       
   339                                 threading.currentThread())
       
   340     def shutdown(self):
       
   341         """called on server stop event to properly close opened sessions and
       
   342         connections
       
   343         """
       
   344         if isinstance(self._looping_tasks, tuple): # if tasks have been started
       
   345             for looptask in self._looping_tasks:
       
   346                 self.info('canceling task %s...', looptask.name)
       
   347                 looptask.cancel()
       
   348                 looptask.join()
       
   349                 self.info('task %s finished', looptask.name)
       
   350         for thread in self._running_threads:
       
   351             self.info('waiting thread %s...', thread.name)
       
   352             thread.join()
       
   353             self.info('thread %s finished', thread.name)
       
   354         self.hm.call_hooks('server_shutdown', repo=self)
       
   355         self.close_sessions()
       
   356         while not self._available_pools.empty():
       
   357             pool = self._available_pools.get_nowait()
       
   358             try:
       
   359                 pool.close(True)
       
   360             except:
       
   361                 self.exception('error while closing %s' % pool)
       
   362                 continue
       
   363         if self.pyro_registered:
       
   364             pyro_unregister(self.config)
       
   365         hits, misses = self.querier.cache_hit, self.querier.cache_miss
       
   366         try:
       
   367             self.info('rqlt st cache hit/miss: %s/%s (%s%% hits)', hits, misses,
       
   368                       (hits * 100) / (hits + misses))
       
   369             hits, misses = self.system_source.cache_hit, self.system_source.cache_miss
       
   370             self.info('sql cache hit/miss: %s/%s (%s%% hits)', hits, misses,
       
   371                       (hits * 100) / (hits + misses))
       
   372             nocache  = self.system_source.no_cache
       
   373             self.info('sql cache usage: %s/%s (%s%%)', hits+ misses, nocache,
       
   374                       ((hits + misses) * 100) / (hits + misses + nocache))
       
   375         except ZeroDivisionError:
       
   376             pass
       
   377         
       
   378     def authenticate_user(self, session, login, password):
       
   379         """validate login / password, raise AuthenticationError on failure
       
   380         return associated EUser instance on success
       
   381         """
       
   382         for source in self.sources:
       
   383             if source.support_entity('EUser'):
       
   384                 try:
       
   385                     eid = source.authenticate(session, login, password)
       
   386                     break
       
   387                 except AuthenticationError:
       
   388                     continue
       
   389         else:
       
   390             raise AuthenticationError('authentication failed with all sources')
       
   391         euser = self._build_user(session, eid)
       
   392         if self.config.consider_user_state and \
       
   393                not euser.state in euser.AUTHENTICABLE_STATES:
       
   394             raise AuthenticationError('user is not in authenticable state')
       
   395         return euser
       
   396 
       
   397     def _build_user(self, session, eid):
       
   398         cls = self.vreg.etype_class('EUser')
       
   399         rql = cls.fetch_rql(session.user, ['X eid %(x)s'])
       
   400         rset = session.execute(rql, {'x': eid}, 'x')
       
   401         assert len(rset) == 1, rset
       
   402         euser = rset.get_entity(0, 0)
       
   403         # prefetch / cache euser's groups and properties. This is especially
       
   404         # useful for internal sessions to avoid security insertions
       
   405         euser.groups
       
   406         euser.properties
       
   407         return euser
       
   408         
       
   409     # public (dbapi) interface ################################################
       
   410             
       
   411     def get_schema(self):
       
   412         """return the application schema. This is a public method, not
       
   413         requiring a session id
       
   414         """
       
   415         try:
       
   416             # necessary to support pickling used by pyro
       
   417             self.schema.__hashmode__ = 'pickle'
       
   418             return self.schema
       
   419         finally:
       
   420             self.schema.__hashmode__ = None
       
   421 
       
   422     def get_cubes(self):
       
   423         """return the list of cubes used by this application. This is a
       
   424         public method, not requiring a session id.
       
   425         """
       
   426         versions = self.get_versions(not self.config.creating)
       
   427         cubes = list(versions)
       
   428         cubes.remove('cubicweb')
       
   429         return cubes
       
   430 
       
   431     @cached
       
   432     def get_versions(self, checkversions=False):
       
   433         """return the a dictionary containing cubes used by this application
       
   434         as key with their version as value, including cubicweb version. This is a
       
   435         public method, not requiring a session id.
       
   436         """
       
   437         from logilab.common.changelog import Version
       
   438         vcconf = {}
       
   439         session = self.internal_session()
       
   440         try:
       
   441             for pk, version in session.execute(
       
   442                 'Any K,V WHERE P is EProperty, P value V, P pkey K, '
       
   443                 'P pkey ~="system.version.%"', build_descr=False):
       
   444                 cube = pk.split('.')[-1]
       
   445                 # XXX cubicweb migration
       
   446                 if cube in CW_MIGRATION_MAP:
       
   447                     cube = CW_MIGRATION_MAP[cube]
       
   448                 version = Version(version)
       
   449                 vcconf[cube] = version
       
   450                 if checkversions:
       
   451                     if cube != 'cubicweb':
       
   452                         fsversion = self.config.cube_version(cube)
       
   453                     else:
       
   454                         fsversion = self.config.cubicweb_version()
       
   455                     if version < fsversion:
       
   456                         msg = ('application has %s version %s but %s '
       
   457                                'is installed. Run "cubicweb-ctl upgrade".')
       
   458                         raise ExecutionError(msg % (cube, version, fsversion))
       
   459         finally:
       
   460             session.close()
       
   461         return vcconf
       
   462     
       
   463     @cached
       
   464     def source_defs(self):
       
   465         sources = self.config.sources().copy()
       
   466         # remove manager information
       
   467         sources.pop('admin', None)
       
   468         # remove sensitive information
       
   469         for uri, sourcedef in sources.iteritems():
       
   470             sourcedef = sourcedef.copy()
       
   471             self.sources_by_uri[uri].remove_sensitive_information(sourcedef)
       
   472             sources[uri] = sourcedef
       
   473         return sources
       
   474 
       
   475     def properties(self):
       
   476         """return a result set containing system wide properties"""
       
   477         session = self.internal_session()
       
   478         try:
       
   479             return session.execute('Any K,V WHERE P is EProperty,'
       
   480                                    'P pkey K, P value V, NOT P for_user U',
       
   481                                    build_descr=False)
       
   482         finally:
       
   483             session.close()
       
   484 
       
   485     def register_user(self, login, password, **kwargs):
       
   486         """check a user with the given login exists, if not create it with the
       
   487         given password. This method is designed to be used for anonymous
       
   488         registration on public web site.
       
   489         """
       
   490         session = self.internal_session()
       
   491         try:
       
   492             if session.execute('EUser X WHERE X login %(login)s', {'login': login}):
       
   493                 return
       
   494             # we have to create the user
       
   495             user = self.vreg.etype_class('EUser')(session, None)
       
   496             if isinstance(password, unicode):
       
   497                 # password should *always* be utf8 encoded
       
   498                 password = password.encode('UTF8')
       
   499             kwargs['login'] = login
       
   500             kwargs['upassword'] = password
       
   501             user.update(kwargs)
       
   502             self.glob_add_entity(session, user)
       
   503             session.execute('SET X in_group G WHERE X eid %(x)s, G name "users"',
       
   504                             {'x': user.eid})
       
   505             session.commit()
       
   506         finally:
       
   507             session.close()
       
   508         
       
   509     def connect(self, login, password, cnxprops=None):
       
   510         """open a connection for a given user
       
   511 
       
   512         base_url may be needed to send mails
       
   513         cnxtype indicate if this is a pyro connection or a in-memory connection
       
   514         
       
   515         raise `AuthenticationError` if the authentication failed
       
   516         raise `ConnectionError` if we can't open a connection
       
   517         """
       
   518         # use an internal connection
       
   519         session = self.internal_session()
       
   520         # try to get a user object
       
   521         try:
       
   522             user = self.authenticate_user(session, login, password)
       
   523         finally:
       
   524             session.close()
       
   525         session = Session(user, self, cnxprops)
       
   526         user.req = user.rset.req = session
       
   527         user.clear_related_cache()
       
   528         self._sessions[session.id] = session
       
   529         self.info('opened %s', session)
       
   530         self.hm.call_hooks('session_open', session=session)
       
   531         # commit session at this point in case write operation has been done
       
   532         # during `session_open` hooks
       
   533         session.commit()
       
   534         return session.id
       
   535 
       
    def execute(self, sessionid, rqlstring, args=None, eid_key=None, build_descr=True):
        """execute a RQL query in the session identified by `sessionid`

        * rqlstring should be an unicode string or a plain ascii string
        * args the optional parameters used in the query
        * eid_key the name of the `args` key whose value is an eid, forwarded
          to the querier
        * build_descr is a flag indicating if the description should be
          built on select queries

        a pool is set on the session for the query and always reset
        afterwards. Errors other than Unauthorized, RQLSyntaxError and
        ValidationError are logged before being re-raised.
        """
        session = self._get_session(sessionid, setpool=True)
        # nested try blocks, presumably for python 2.4 compatibility (no
        # try/except/finally there)
        try:
            try:
                return self.querier.execute(session, rqlstring, args, eid_key,
                                            build_descr)
            except (Unauthorized, RQLSyntaxError):
                # expected errors: propagated without being logged
                raise
            except ValidationError, ex:
                # need ValidationError normalization here so error may pass
                # through pyro
                if hasattr(ex.entity, 'eid'):
                    # replace the entity by its eid, both as attribute and as
                    # first item of the exception args
                    # (note: this rebinds the `args` parameter, which is not
                    # used afterwards)
                    ex.entity = ex.entity.eid # error raised by yams
                    args = list(ex.args)
                    args[0] = ex.entity
                    ex.args = tuple(args)
                raise
            except:
                # FIXME: check error to catch internal errors
                self.exception('unexpected error')
                raise
        finally:
            session.reset_pool()
       
   566     
       
   567     def describe(self, sessionid, eid):
       
   568         """return a tuple (type, source, extid) for the entity with id <eid>"""
       
   569         session = self._get_session(sessionid, setpool=True)
       
   570         try:
       
   571             return self.type_and_source_from_eid(eid, session)
       
   572         finally:
       
   573             session.reset_pool()
       
   574 
       
   575     def check_session(self, sessionid):
       
   576         """raise `BadSessionId` if the connection is no more valid"""
       
   577         self._get_session(sessionid, setpool=False)
       
   578 
       
   579     def get_shared_data(self, sessionid, key, default=None, pop=False):
       
   580         """return the session's data dictionary"""
       
   581         session = self._get_session(sessionid, setpool=False)
       
   582         return session.get_shared_data(key, default, pop)
       
   583 
       
   584     def set_shared_data(self, sessionid, key, value, querydata=False):
       
   585         """set value associated to `key` in shared data
       
   586 
       
   587         if `querydata` is true, the value will be added to the repository
       
   588         session's query data which are cleared on commit/rollback of the current
       
   589         transaction, and won't be available through the connexion, only on the
       
   590         repository side.
       
   591         """
       
   592         session = self._get_session(sessionid, setpool=False)
       
   593         session.set_shared_data(key, value, querydata)
       
   594 
       
   595     def commit(self, sessionid):
       
   596         """commit transaction for the session with the given id"""
       
   597         self.debug('begin commit for session %s', sessionid)
       
   598         try:
       
   599             self._get_session(sessionid, setpool=True).commit()
       
   600         except (ValidationError, Unauthorized): 
       
   601             raise
       
   602         except:
       
   603             self.exception('unexpected error')
       
   604             raise
       
   605         
       
   606     def rollback(self, sessionid):
       
   607         """commit transaction for the session with the given id"""
       
   608         self.debug('begin rollback for session %s', sessionid)
       
   609         try:
       
   610             self._get_session(sessionid, setpool=True).rollback()
       
   611         except:
       
   612             self.exception('unexpected error')
       
   613             raise
       
   614 
       
   615     def close(self, sessionid):
       
   616         """close the session with the given id"""
       
   617         session = self._get_session(sessionid, setpool=True)
       
   618         # operation uncommited before close are rollbacked before hook is called
       
   619         session.rollback()
       
   620         self.hm.call_hooks('session_close', session=session)
       
   621         # commit session at this point in case write operation has been done
       
   622         # during `session_close` hooks
       
   623         session.commit()
       
   624         session.close()
       
   625         del self._sessions[sessionid]
       
   626         self.info('closed session %s for user %s', sessionid, session.user.login)
       
   627     
       
   628     def user_info(self, sessionid, props=None):
       
   629         """this method should be used by client to:
       
   630         * check session id validity
       
   631         * update user information on each user's request (i.e. groups and
       
   632           custom properties)
       
   633         """
       
   634         session = self._get_session(sessionid, setpool=False)
       
   635         if props:
       
   636             # update session properties
       
   637             for prop, value in props.items():
       
   638                 session.change_property(prop, value)
       
   639         user = session.user
       
   640         return user.eid, user.login, user.groups, user.properties
       
   641             
       
   642     # public (inter-repository) interface #####################################
       
   643     
       
   644     def entities_modified_since(self, etypes, mtime):
       
   645         """function designed to be called from an external repository which
       
   646         is using this one as a rql source for synchronization, and return a
       
   647         3-uple containing :
       
   648         * the local date
       
   649         * list of (etype, eid) of entities of the given types which have been
       
   650           modified since the given timestamp (actually entities whose full text
       
   651           index content has changed)
       
   652         * list of (etype, eid) of entities of the given types which have been
       
   653           deleted since the given timestamp
       
   654         """
       
   655         session = self.internal_session()
       
   656         updatetime = now()
       
   657         try:
       
   658             modentities, delentities = self.system_source.modified_entities(
       
   659                 session, etypes, mtime)
       
   660             return updatetime, modentities, delentities
       
   661         finally:
       
   662             session.close()
       
   663 
       
   664     # session handling ########################################################
       
   665         
       
   666     def close_sessions(self):
       
   667         """close every opened sessions"""
       
   668         for sessionid in self._sessions.keys():
       
   669             try:
       
   670                 self.close(sessionid)
       
   671             except:
       
   672                 self.exception('error while closing session %s' % sessionid)
       
   673 
       
   674     def clean_sessions(self):
       
   675         """close sessions not used since an amount of time specified in the
       
   676         configuration
       
   677         """
       
   678         mintime = time() - self.config['session-time']
       
   679         self.debug('cleaning session unused since %s',
       
   680                    strftime('%T', localtime(mintime)))
       
   681         nbclosed = 0
       
   682         for session in self._sessions.values():
       
   683             if session.timestamp < mintime:
       
   684                 self.close(session.id)
       
   685                 nbclosed += 1
       
   686         return nbclosed
       
   687     
       
   688     def internal_session(self, cnxprops=None):
       
   689         """return a dbapi like connection/cursor using internal user which
       
   690         have every rights on the repository. You'll *have to* commit/rollback
       
   691         or close (rollback implicitly) the session once the job's done, else
       
   692         you'll leak connections pool up to the time where no more pool is
       
   693         available, causing irremediable freeze...
       
   694         """
       
   695         session = InternalSession(self, cnxprops)
       
   696         session.set_pool()
       
   697         return session
       
   698             
       
   699     def _get_session(self, sessionid, setpool=False):
       
   700         """return the user associated to the given session identifier"""
       
   701         try:
       
   702             session = self._sessions[sessionid]
       
   703         except KeyError:
       
   704             raise BadConnectionId('No such session %s' % sessionid)
       
   705         if setpool:
       
   706             session.set_pool()
       
   707         return session
       
   708 
       
   709     # data sources handling ###################################################
       
    # * correspondence between eid and (type, source)
    # * correspondence between eid and local id (i.e. specific to a given source)
       
   712     # * searchable text indexes
       
   713     
       
   714     def type_and_source_from_eid(self, eid, session=None):
       
   715         """return a tuple (type, source, extid) for the entity with id <eid>"""
       
   716         try:
       
   717             eid = typed_eid(eid)
       
   718         except ValueError:
       
   719             raise UnknownEid(eid)
       
   720         try:
       
   721             return self._type_source_cache[eid]
       
   722         except KeyError:
       
   723             if session is None:
       
   724                 session = self.internal_session()
       
   725                 reset_pool = True
       
   726             else:
       
   727                 reset_pool = False
       
   728             try:
       
   729                 etype, uri, extid = self.system_source.eid_type_source(session,
       
   730                                                                        eid)
       
   731             finally:
       
   732                 if reset_pool:
       
   733                     session.reset_pool()
       
   734         self._type_source_cache[eid] = (etype, uri, extid)
       
   735         if uri != 'system':
       
   736             self._extid_cache[(extid, uri)] = eid
       
   737         return etype, uri, extid
       
   738 
       
   739     def clear_caches(self, eids):
       
   740         etcache = self._type_source_cache
       
   741         extidcache = self._extid_cache
       
   742         rqlcache = self.querier._rql_cache
       
   743         for eid in eids:
       
   744             try:
       
   745                 etype, uri, extid = etcache.pop(typed_eid(eid)) # may be a string in some cases
       
   746                 rqlcache.pop('%s X WHERE X eid %s' % (etype, eid), None)
       
   747                 extidcache.pop((extid, uri), None)
       
   748             except KeyError:
       
   749                 etype = None
       
   750             rqlcache.pop('Any X WHERE X eid %s' % eid, None)
       
   751             for source in self.sources:
       
   752                 source.clear_eid_cache(eid, etype)
       
   753                 
       
   754     def type_from_eid(self, eid, session=None):
       
   755         """return the type of the entity with id <eid>"""
       
   756         return self.type_and_source_from_eid(eid, session)[0]
       
   757     
       
   758     def source_from_eid(self, eid, session=None):
       
   759         """return the source for the given entity's eid"""
       
   760         return self.sources_by_uri[self.type_and_source_from_eid(eid, session)[1]]
       
   761         
       
   762     def eid2extid(self, source, eid, session=None):
       
   763         """get local id from an eid"""
       
   764         etype, uri, extid = self.type_and_source_from_eid(eid, session)
       
   765         if source.uri != uri:
       
   766             # eid not from the given source
       
   767             raise UnknownEid(eid)
       
   768         return extid
       
   769 
       
   770     def extid2eid(self, source, lid, etype, session=None, insert=True):
       
   771         """get eid from a local id. An eid is attributed if no record is found"""
       
   772         cachekey = (str(lid), source.uri)
       
   773         try:
       
   774             return self._extid_cache[cachekey]
       
   775         except KeyError:
       
   776             pass
       
   777         reset_pool = False
       
   778         if session is None:
       
   779             session = self.internal_session()
       
   780             reset_pool = True
       
   781         eid = self.system_source.extid2eid(session, source, lid)
       
   782         if eid is not None:
       
   783             self._extid_cache[cachekey] = eid
       
   784             self._type_source_cache[eid] = (etype, source.uri, lid)
       
   785             if reset_pool:
       
   786                 session.reset_pool()
       
   787             return eid
       
   788         if not insert:
       
   789             return
       
   790         # no link between lid and eid, create one using an internal session
       
   791         # since the current session user may not have required permissions to
       
   792         # do necessary stuff and we don't want to commit user session.
       
   793         #
       
   794         # More other, even if session is already an internal session but is
       
   795         # processing a commit, we have to use another one
       
   796         if not session.is_internal_session:
       
   797             session = self.internal_session()
       
   798             reset_pool = True
       
   799         try:
       
   800             eid = self.system_source.create_eid(session)
       
   801             self._extid_cache[cachekey] = eid
       
   802             self._type_source_cache[eid] = (etype, source.uri, lid)
       
   803             entity = source.before_entity_insertion(session, lid, etype, eid)
       
   804             if source.should_call_hooks:
       
   805                 self.hm.call_hooks('before_add_entity', etype, session, entity)
       
   806             self.add_info(session, entity, source, lid)
       
   807             source.after_entity_insertion(session, lid, entity)
       
   808             if source.should_call_hooks:
       
   809                 self.hm.call_hooks('after_add_entity', etype, session, entity)
       
   810             else:
       
   811                 # minimal meta-data
       
   812                 session.execute('SET X is E WHERE X eid %(x)s, E name %(name)s',
       
   813                                 {'x': entity.eid, 'name': entity.id}, 'x')
       
   814             session.commit(reset_pool)
       
   815             return eid
       
   816         except:
       
   817             session.rollback(reset_pool)
       
   818             raise
       
   819         
       
   820     def add_info(self, session, entity, source, extid=None, complete=True):
       
   821         """add type and source info for an eid into the system table,
       
   822         and index the entity with the full text index
       
   823         """
       
   824         # begin by inserting eid/type/source/extid into the entities table
       
   825         self.system_source.add_info(session, entity, source, extid)
       
   826         if complete:
       
   827             entity.complete(entity.e_schema.indexable_attributes())
       
   828         session.add_query_data('neweids', entity.eid)
       
   829         # now we can update the full text index
       
   830         FTIndexEntityOp(session, entity=entity)
       
   831         CleanupEidTypeCacheOp(session)
       
   832         
       
   833     def delete_info(self, session, eid):
       
   834         self._prepare_delete_info(session, eid)
       
   835         self._delete_info(session, eid)
       
   836         
       
   837     def _prepare_delete_info(self, session, eid):
       
   838         """prepare the repository for deletion of an entity:
       
   839         * update the fti
       
   840         * mark eid as being deleted in session info
       
   841         * setup cache update operation
       
   842         """
       
   843         self.system_source.fti_unindex_entity(session, eid)
       
   844         pending = session.query_data('pendingeids', set(), setdefault=True)
       
   845         pending.add(eid)
       
   846         CleanupEidTypeCacheOp(session)
       
   847         
       
   848     def _delete_info(self, session, eid):
       
   849         """delete system information on deletion of an entity:
       
   850         * delete all relations on this entity
       
   851         * transfer record from the entities table to the deleted_entities table
       
   852         """
       
   853         etype, uri, extid = self.type_and_source_from_eid(eid, session)
       
   854         self._clear_eid_relations(session, etype, eid)
       
   855         self.system_source.delete_info(session, eid, etype, uri, extid)
       
   856         
       
   857     def _clear_eid_relations(self, session, etype, eid):
       
   858         """when a entity is deleted, build and execute rql query to delete all
       
   859         its relations
       
   860         """
       
   861         rql = []
       
   862         eschema = self.schema.eschema(etype)
       
   863         for rschema, targetschemas, x in eschema.relation_definitions():
       
   864             rtype = rschema.type
       
   865             if rtype == 'identity':
       
   866                 continue
       
   867             var = '%s%s' % (rtype.upper(), x.upper())
       
   868             if x == 'subject':
       
   869                 # don't skip inlined relation so they are regularly
       
   870                 # deleted and so hooks are correctly called
       
   871                 rql.append('X %s %s' % (rtype, var))
       
   872             else:
       
   873                 rql.append('%s %s X' % (var, rtype))
       
   874         rql = 'DELETE %s WHERE X eid %%(x)s' % ','.join(rql)
       
   875         # unsafe_execute since we suppose that if user can delete the entity,
       
   876         # he can delete all its relations without security checking
       
   877         session.unsafe_execute(rql, {'x': eid}, 'x', build_descr=False)
       
   878 
       
   879     def index_entity(self, session, entity):
       
   880         """full text index a modified entity"""
       
   881         alreadydone = session.query_data('indexedeids', set(), setdefault=True)
       
   882         if entity.eid in alreadydone:
       
   883             self.info('skipping reindexation of %s, already done', entity.eid)
       
   884             return
       
   885         alreadydone.add(entity.eid)
       
   886         self.system_source.fti_index_entity(session, entity)
       
   887         
       
   888     def locate_relation_source(self, session, subject, rtype, object):
       
   889         subjsource = self.source_from_eid(subject, session)
       
   890         objsource = self.source_from_eid(object, session)
       
   891         if not (subjsource is objsource and subjsource.support_relation(rtype, 1)):
       
   892             source = self.system_source
       
   893             if not source.support_relation(rtype, 1):
       
   894                 raise RTypeNotSupportedBySources(rtype)
       
   895         else:
       
   896             source = subjsource
       
   897         return source
       
   898     
       
   899     def locate_etype_source(self, etype):
       
   900         for source in self.sources:
       
   901             if source.support_entity(etype, 1):
       
   902                 return source
       
   903         else:
       
   904             raise ETypeNotSupportedBySources(etype)
       
   905         
       
   906     def glob_add_entity(self, session, entity):
       
   907         """add an entity to the repository
       
   908         
       
   909         the entity eid should originaly be None and a unique eid is assigned to
       
   910         the entity instance
       
   911         """
       
   912         entity = entity.pre_add_hook()
       
   913         eschema = entity.e_schema
       
   914         etype = str(eschema)
       
   915         source = self.locate_etype_source(etype)
       
   916         # attribute an eid to the entity before calling hooks
       
   917         entity.set_eid(self.system_source.create_eid(session))
       
   918         entity._is_saved = False # entity has an eid but is not yet saved
       
   919         relations = []
       
   920         # if inlined relations are specified, fill entity's related cache to
       
   921         # avoid unnecessary queries
       
   922         for attr in entity.keys():
       
   923             rschema = eschema.subject_relation(attr)
       
   924             if not rschema.is_final(): # inlined relation
       
   925                 entity.set_related_cache(attr, 'subject',
       
   926                                          entity.req.eid_rset(entity[attr]))
       
   927                 relations.append((attr, entity[attr]))
       
   928         if source.should_call_hooks:
       
   929             self.hm.call_hooks('before_add_entity', etype, session, entity)
       
   930         entity.set_defaults()
       
   931         entity.check(creation=True)
       
   932         source.add_entity(session, entity)
       
   933         if source.uri != 'system':
       
   934             extid = source.get_extid(entity)
       
   935             self._extid_cache[(str(extid), source.uri)] = entity.eid
       
   936         else:
       
   937             extid = None
       
   938         self.add_info(session, entity, source, extid, complete=False)
       
   939         entity._is_saved = True # entity has an eid and is saved
       
   940         #print 'added', entity#, entity.items()
       
   941         # trigger after_add_entity after after_add_relation
       
   942         if source.should_call_hooks:
       
   943             self.hm.call_hooks('after_add_entity', etype, session, entity)
       
   944             # call hooks for inlined relations
       
   945             for attr, value in relations:
       
   946                 self.hm.call_hooks('before_add_relation', attr, session,
       
   947                                     entity.eid, attr, value)
       
   948                 self.hm.call_hooks('after_add_relation', attr, session,
       
   949                                     entity.eid, attr, value)
       
   950         return entity.eid
       
   951         
       
   952     def glob_update_entity(self, session, entity):
       
   953         """replace an entity in the repository
       
   954         the type and the eid of an entity must not be changed
       
   955         """
       
   956         #print 'update', entity
       
   957         entity.check()
       
   958         etype = str(entity.e_schema)
       
   959         eschema = entity.e_schema
       
   960         only_inline_rels, need_fti_update = True, False
       
   961         relations = []
       
   962         for attr in entity.keys():
       
   963             if attr == 'eid':
       
   964                 continue
       
   965             rschema = eschema.subject_relation(attr)
       
   966             if rschema.is_final():
       
   967                 if eschema.rproperty(attr, 'fulltextindexed'):
       
   968                     need_fti_update = True
       
   969                 only_inline_rels = False
       
   970             else:
       
   971                 # inlined relation
       
   972                 previous_value = entity.related(attr)
       
   973                 if previous_value:
       
   974                     previous_value = previous_value[0][0] # got a result set
       
   975                     self.hm.call_hooks('before_delete_relation', attr, session,
       
   976                                        entity.eid, attr, previous_value)
       
   977                 entity.set_related_cache(attr, 'subject',
       
   978                                          entity.req.eid_rset(entity[attr]))
       
   979                 relations.append((attr, entity[attr], previous_value))
       
   980         source = self.source_from_eid(entity.eid, session)
       
   981         if source.should_call_hooks:
       
   982             # call hooks for inlined relations
       
   983             for attr, value, _ in relations:
       
   984                 self.hm.call_hooks('before_add_relation', attr, session,
       
   985                                     entity.eid, attr, value)
       
   986             if not only_inline_rels:
       
   987                 self.hm.call_hooks('before_update_entity', etype, session,
       
   988                                     entity)
       
   989         source.update_entity(session, entity)
       
   990         if not only_inline_rels:
       
   991             if need_fti_update:
       
   992                 # reindex the entity only if this query is updating at least
       
   993                 # one indexable attribute
       
   994                 FTIndexEntityOp(session, entity=entity)
       
   995             if source.should_call_hooks:
       
   996                 self.hm.call_hooks('after_update_entity', etype, session,
       
   997                                     entity)
       
   998         if source.should_call_hooks:
       
   999             for attr, value, prevvalue in relations:
       
  1000                 if prevvalue:
       
  1001                     self.hm.call_hooks('after_delete_relation', attr, session,
       
  1002                                        entity.eid, attr, prevvalue)
       
  1003                 del_existing_rel_if_needed(session, entity.eid, attr, value)
       
  1004                 self.hm.call_hooks('after_add_relation', attr, session,
       
  1005                                     entity.eid, attr, value)
       
  1006 
       
  1007     def glob_delete_entity(self, session, eid):
       
  1008         """delete an entity and all related entities from the repository"""
       
  1009         #print 'deleting', eid
       
  1010         # call delete_info before hooks
       
  1011         self._prepare_delete_info(session, eid)
       
  1012         etype, uri, extid = self.type_and_source_from_eid(eid, session)
       
  1013         source = self.sources_by_uri[uri]
       
  1014         if source.should_call_hooks:
       
  1015             self.hm.call_hooks('before_delete_entity', etype, session, eid)
       
  1016         self._delete_info(session, eid)
       
  1017         source.delete_entity(session, etype, eid)
       
  1018         if source.should_call_hooks:
       
  1019             self.hm.call_hooks('after_delete_entity', etype, session, eid)
       
  1020         # don't clear cache here this is done in a hook on commit
       
  1021         
       
  1022     def glob_add_relation(self, session, subject, rtype, object):
       
  1023         """add a relation to the repository"""
       
  1024         assert subject is not None
       
  1025         assert rtype
       
  1026         assert object is not None
       
  1027         source = self.locate_relation_source(session, subject, rtype, object)
       
  1028         #print 'adding', subject, rtype, object, 'to', source
       
  1029         if source.should_call_hooks:
       
  1030             del_existing_rel_if_needed(session, subject, rtype, object)
       
  1031             self.hm.call_hooks('before_add_relation', rtype, session,
       
  1032                                subject, rtype, object)
       
  1033         source.add_relation(session, subject, rtype, object)
       
  1034         if source.should_call_hooks:
       
  1035             self.hm.call_hooks('after_add_relation', rtype, session,
       
  1036                                subject, rtype, object)
       
  1037 
       
  1038     def glob_delete_relation(self, session, subject, rtype, object):
       
  1039         """delete a relation from the repository"""
       
  1040         assert subject is not None
       
  1041         assert rtype
       
  1042         assert object is not None
       
  1043         source = self.locate_relation_source(session, subject, rtype, object)
       
  1044         #print 'delete rel', subject, rtype, object
       
  1045         if source.should_call_hooks:
       
  1046             self.hm.call_hooks('before_delete_relation', rtype, session,
       
  1047                                subject, rtype, object)
       
  1048         source.delete_relation(session, subject, rtype, object)
       
  1049         if self.schema.rschema(rtype).symetric:
       
  1050             # on symetric relation, we can't now in which sense it's
       
  1051             # stored so try to delete both
       
  1052             source.delete_relation(session, object, rtype, subject)
       
  1053         if source.should_call_hooks:
       
  1054             self.hm.call_hooks('after_delete_relation', rtype, session,
       
  1055                                subject, rtype, object)
       
  1056 
       
  1057 
       
  1058     # pyro handling ###########################################################
       
  1059     
       
  1060     def pyro_register(self, host=''):
       
  1061         """register the repository as a pyro object"""
       
  1062         from Pyro import core
       
  1063         port = self.config['pyro-port']
       
  1064         nshost, nsgroup = self.config['pyro-ns-host'], self.config['pyro-ns-group']
       
  1065         nsgroup = ':' + nsgroup
       
  1066         core.initServer(banner=0)
       
  1067         daemon = core.Daemon(host=host, port=port)
       
  1068         daemon.useNameServer(self.pyro_nameserver(nshost, nsgroup))
       
  1069         # use Delegation approach
       
  1070         impl = core.ObjBase()
       
  1071         impl.delegateTo(self)
       
  1072         nsid = self.config['pyro-id'] or self.config.appid
       
  1073         daemon.connect(impl, '%s.%s' % (nsgroup, nsid))
       
  1074         msg = 'repository registered as a pyro object using group %s and id %s'
       
  1075         self.info(msg, nsgroup, nsid)
       
  1076         self.pyro_registered = True
       
  1077         return daemon
       
  1078     
       
  1079     def pyro_nameserver(self, host=None, group=None):
       
  1080         """locate and bind the the name server to the daemon"""
       
  1081         from Pyro import naming, errors
       
  1082         # locate the name server
       
  1083         nameserver = naming.NameServerLocator().getNS(host)
       
  1084         if group is not None:
       
  1085             # make sure our namespace group exists
       
  1086             try:
       
  1087                 nameserver.createGroup(group)
       
  1088             except errors.NamingError:
       
  1089                 pass
       
  1090         return nameserver
       
  1091 
       
  1092 
       
  1093 def pyro_unregister(config):
       
  1094     """unregister the repository from the pyro name server"""
       
  1095     nshost, nsgroup = config['pyro-ns-host'], config['pyro-ns-group']
       
  1096     appid = config['pyro-id'] or config.appid
       
  1097     from Pyro import core, naming, errors
       
  1098     core.initClient(banner=False)
       
  1099     try:
       
  1100         nameserver = naming.NameServerLocator().getNS(nshost)
       
  1101     except errors.PyroError, ex:
       
  1102         # name server not responding
       
  1103         config.error('can\'t locate pyro name server: %s', ex)
       
  1104         return
       
  1105     try:
       
  1106         nameserver.unregister(':%s.%s' % (nsgroup, appid))
       
  1107         config.info('%s unregistered from pyro name server', appid)
       
  1108     except errors.NamingError:
       
  1109         config.warning('%s already unregistered from pyro name server', appid)
       
  1110 
       
  1111 
       
# Bind logging methods to the Repository class, using the
# 'cubicweb.repository' logger. NOTE(review): presumably this is what provides
# the self.debug / self.info / self.exception calls used by its methods.
from logging import getLogger
from cubicweb import set_log_methods
set_log_methods(Repository, getLogger('cubicweb.repository'))