server/session.py
branchstable
changeset 2100 89b825cdec74
parent 2063 fe4278b50388
child 2182 488099333160
child 2188 3a57c8173290
equal deleted inserted replaced
2099:0cba78f4fa12 2100:89b825cdec74
     9 
     9 
    10 import sys
    10 import sys
    11 import threading
    11 import threading
    12 from time import time
    12 from time import time
    13 
    13 
       
    14 from logilab.common.deprecation import obsolete
    14 from rql.nodes import VariableRef, Function, ETYPE_PYOBJ_MAP, etype_from_pyobj
    15 from rql.nodes import VariableRef, Function, ETYPE_PYOBJ_MAP, etype_from_pyobj
    15 from yams import BASE_TYPES
    16 from yams import BASE_TYPES
    16 
    17 
    17 from cubicweb import RequestSessionMixIn, Binary, UnknownEid
    18 from cubicweb import RequestSessionMixIn, Binary, UnknownEid
    18 from cubicweb.dbapi import ConnectionProperties
    19 from cubicweb.dbapi import ConnectionProperties
    63         # shared data, used to communicate extra information between the client
    64         # shared data, used to communicate extra information between the client
    64         # and the rql server
    65         # and the rql server
    65         self.data = {}
    66         self.data = {}
    66         # i18n initialization
    67         # i18n initialization
    67         self.set_language(cnxprops.lang)
    68         self.set_language(cnxprops.lang)
       
    69         # internals
    68         self._threaddata = threading.local()
    70         self._threaddata = threading.local()
    69         self._threads_in_transaction = set()
    71         self._threads_in_transaction = set()
    70         self._closed = False
    72         self._closed = False
    71 
    73 
    72     def get_mode(self):
    74     def __str__(self):
    73         return getattr(self._threaddata, 'mode', 'read')
    75         return '<%ssession %s (%s 0x%x)>' % (self.cnxtype, self.user.login,
    74     def set_mode(self, value):
    76                                              self.id, id(self))
    75         self._threaddata.mode = value
    77     # resource accessors ######################################################
    76     # transaction mode (read/write), resetted to read on commit / rollback
    78 
    77     mode = property(get_mode, set_mode)
    79     def actual_session(self):
    78 
    80         """return the original parent session if any, else self"""
    79     def get_commit_state(self):
    81         return self
    80         return getattr(self._threaddata, 'commit_state', None)
    82 
    81     def set_commit_state(self, value):
    83     def etype_class(self, etype):
    82         self._threaddata.commit_state = value
    84         """return an entity class for the given entity type"""
    83     commit_state = property(get_commit_state, set_commit_state)
    85         return self.vreg.etype_class(etype)
    84 
    86 
    85     # set according to transaction mode for each query
    87     def entity(self, eid):
    86     @property
    88         """return a result set for the given eid"""
    87     def pool(self):
    89         return self.eid_rset(eid).get_entity(0, 0)
    88         return getattr(self._threaddata, 'pool', None)
    90 
    89 
    91     def system_sql(self, sql, args=None):
    90     # pending transaction operations
    92         """return a sql cursor on the system database"""
    91     @property
    93         if not sql.split(None, 1)[0].upper() == 'SELECT':
    92     def pending_operations(self):
    94             self.mode = 'write'
    93         try:
    95         cursor = self.pool['system']
    94             return self._threaddata.pending_operations
    96         self.pool.source('system').doexec(cursor, sql, args)
    95         except AttributeError:
    97         return cursor
    96             self._threaddata.pending_operations = []
       
    97             return self._threaddata.pending_operations
       
    98 
       
    99     # rql rewriter
       
   100     @property
       
   101     def rql_rewriter(self):
       
   102         try:
       
   103             return self._threaddata._rewriter
       
   104         except AttributeError:
       
   105             self._threaddata._rewriter = RQLRewriter(self.repo.querier, self)
       
   106             return self._threaddata._rewriter
       
   107 
       
   108     # transaction queries data
       
   109     @property
       
   110     def _query_data(self):
       
   111         try:
       
   112             return self._threaddata._query_data
       
   113         except AttributeError:
       
   114             self._threaddata._query_data = {}
       
   115             return self._threaddata._query_data
       
   116 
    98 
   117     def set_language(self, language):
    99     def set_language(self, language):
   118         """i18n configuration for translation"""
   100         """i18n configuration for translation"""
   119         vreg = self.vreg
   101         vreg = self.vreg
   120         language = language or self.user.property_value('ui.language')
   102         language = language or self.user.property_value('ui.language')
   130 
   112 
   131     def change_property(self, prop, value):
   113     def change_property(self, prop, value):
   132         assert prop == 'lang' # this is the only one changeable property for now
   114         assert prop == 'lang' # this is the only one changeable property for now
   133         self.set_language(value)
   115         self.set_language(value)
   134 
   116 
   135     def __str__(self):
   117     # connection management ###################################################
   136         return '<%ssession %s (%s 0x%x)>' % (self.cnxtype, self.user.login,
   118 
   137                                              self.id, id(self))
   119     def get_mode(self):
   138 
   120         return getattr(self._threaddata, 'mode', 'read')
   139     def etype_class(self, etype):
   121     def set_mode(self, value):
   140         """return an entity class for the given entity type"""
   122         self._threaddata.mode = value
   141         return self.vreg.etype_class(etype)
   123     mode = property(get_mode, set_mode,
   142 
   124                     doc='transaction mode (read/write), resetted to read on '
   143     def entity(self, eid):
   125                     'commit / rollback')
   144         """return a result set for the given eid"""
   126 
   145         return self.eid_rset(eid).get_entity(0, 0)
   127     def get_commit_state(self):
   146 
   128         return getattr(self._threaddata, 'commit_state', None)
   147     def _touch(self):
   129     def set_commit_state(self, value):
   148         """update latest session usage timestamp and reset mode to read
   130         self._threaddata.commit_state = value
   149         """
   131     commit_state = property(get_commit_state, set_commit_state)
   150         self.timestamp = time()
   132 
   151         self.local_perm_cache.clear()
   133     @property
   152         self._threaddata.mode = 'read'
   134     def pool(self):
       
   135         """connections pool, set according to transaction mode for each query"""
       
   136         return getattr(self._threaddata, 'pool', None)
   153 
   137 
   154     def set_pool(self):
   138     def set_pool(self):
   155         """the session need a pool to execute some queries"""
   139         """the session need a pool to execute some queries"""
   156         if self._closed:
   140         if self._closed:
   157             raise Exception('try to set pool on a closed session')
   141             raise Exception('try to set pool on a closed session')
   166                 raise
   150                 raise
   167             self._threads_in_transaction.add(threading.currentThread())
   151             self._threads_in_transaction.add(threading.currentThread())
   168         return self._threaddata.pool
   152         return self._threaddata.pool
   169 
   153 
   170     def reset_pool(self):
   154     def reset_pool(self):
   171         """the session has no longer using its pool, at least for some time
   155         """the session has no longer using its pool, at least for some time"""
   172         """
       
   173         # pool may be none if no operation has been done since last commit
   156         # pool may be none if no operation has been done since last commit
   174         # or rollback
   157         # or rollback
   175         if self.pool is not None and self.mode == 'read':
   158         if self.pool is not None and self.mode == 'read':
   176             # even in read mode, we must release the current transaction
   159             # even in read mode, we must release the current transaction
   177             pool = self.pool
   160             pool = self.pool
   179             pool.pool_reset()
   162             pool.pool_reset()
   180             self._threaddata.pool = None
   163             self._threaddata.pool = None
   181             # free pool once everything is done to avoid race-condition
   164             # free pool once everything is done to avoid race-condition
   182             self.repo._free_pool(pool)
   165             self.repo._free_pool(pool)
   183 
   166 
   184     def system_sql(self, sql, args=None):
   167     def _touch(self):
   185         """return a sql cursor on the system database"""
   168         """update latest session usage timestamp and reset mode to read"""
   186         if not sql.split(None, 1)[0].upper() == 'SELECT':
   169         self.timestamp = time()
   187             self.mode = 'write'
   170         self.local_perm_cache.clear()
   188         cursor = self.pool['system']
   171         self._threaddata.mode = 'read'
   189         self.pool.source('system').doexec(cursor, sql, args)
       
   190         return cursor
       
   191 
       
   192     def actual_session(self):
       
   193         """return the original parent session if any, else self"""
       
   194         return self
       
   195 
   172 
   196     # shared data handling ###################################################
   173     # shared data handling ###################################################
   197 
   174 
   198     def get_shared_data(self, key, default=None, pop=False):
   175     def get_shared_data(self, key, default=None, pop=False):
   199         """return value associated to `key` in session data"""
   176         """return value associated to `key` in session data"""
   203             return self.data.get(key, default)
   180             return self.data.get(key, default)
   204 
   181 
   205     def set_shared_data(self, key, value, querydata=False):
   182     def set_shared_data(self, key, value, querydata=False):
   206         """set value associated to `key` in session data"""
   183         """set value associated to `key` in session data"""
   207         if querydata:
   184         if querydata:
   208             self.set_query_data(key, value)
   185             self.transaction_data[key] = value
   209         else:
   186         else:
   210             self.data[key] = value
   187             self.data[key] = value
   211 
   188 
   212     # request interface #######################################################
   189     # request interface #######################################################
   213 
   190 
   289 
   266 
   290     def commit(self, reset_pool=True):
   267     def commit(self, reset_pool=True):
   291         """commit the current session's transaction"""
   268         """commit the current session's transaction"""
   292         if self.pool is None:
   269         if self.pool is None:
   293             assert not self.pending_operations
   270             assert not self.pending_operations
   294             self._query_data.clear()
   271             self.transaction_data.clear()
   295             self._touch()
   272             self._touch()
   296             return
   273             return
   297         if self.commit_state:
   274         if self.commit_state:
   298             return
   275             return
   299         # on rollback, an operation should have the following state
   276         # on rollback, an operation should have the following state
   322             self.pool.commit()
   299             self.pool.commit()
   323         finally:
   300         finally:
   324             self._touch()
   301             self._touch()
   325             self.commit_state = None
   302             self.commit_state = None
   326             self.pending_operations[:] = []
   303             self.pending_operations[:] = []
   327             self._query_data.clear()
   304             self.transaction_data.clear()
   328             if reset_pool:
   305             if reset_pool:
   329                 self.reset_pool()
   306                 self.reset_pool()
   330 
   307 
   331     def rollback(self, reset_pool=True):
   308     def rollback(self, reset_pool=True):
   332         """rollback the current session's transaction"""
   309         """rollback the current session's transaction"""
   333         if self.pool is None:
   310         if self.pool is None:
   334             assert not self.pending_operations
   311             assert not self.pending_operations
   335             self._query_data.clear()
   312             self.transaction_data.clear()
   336             self._touch()
   313             self._touch()
   337             return
   314             return
   338         try:
   315         try:
   339             while self.pending_operations:
   316             while self.pending_operations:
   340                 try:
   317                 try:
   345                     continue
   322                     continue
   346             self.pool.rollback()
   323             self.pool.rollback()
   347         finally:
   324         finally:
   348             self._touch()
   325             self._touch()
   349             self.pending_operations[:] = []
   326             self.pending_operations[:] = []
   350             self._query_data.clear()
   327             self.transaction_data.clear()
   351             if reset_pool:
   328             if reset_pool:
   352                 self.reset_pool()
   329                 self.reset_pool()
   353 
   330 
   354     def close(self):
   331     def close(self):
   355         """do not close pool on session close, since they are shared now"""
   332         """do not close pool on session close, since they are shared now"""
   372                            'session anyway', thread)
   349                            'session anyway', thread)
   373         self.rollback()
   350         self.rollback()
   374 
   351 
   375     # transaction data/operations management ##################################
   352     # transaction data/operations management ##################################
   376 
   353 
   377     def add_query_data(self, key, value):
   354     @property
   378         self._query_data.setdefault(key, []).append(value)
   355     def transaction_data(self):
   379 
   356         try:
   380     def set_query_data(self, key, value):
   357             return self._threaddata.transaction_data
   381         self._query_data[key] = value
   358         except AttributeError:
   382 
   359             self._threaddata.transaction_data = {}
       
   360             return self._threaddata.transaction_data
       
   361 
       
   362     @obsolete('use direct access to session.transaction_data')
   383     def query_data(self, key, default=None, setdefault=False, pop=False):
   363     def query_data(self, key, default=None, setdefault=False, pop=False):
   384         if setdefault:
   364         if setdefault:
   385             assert not pop
   365             assert not pop
   386             return self._query_data.setdefault(key, default)
   366             return self.transaction_data.setdefault(key, default)
   387         if pop:
   367         if pop:
   388             return self._query_data.pop(key, default)
   368             return self.transaction_data.pop(key, default)
   389         else:
   369         else:
   390             return self._query_data.get(key, default)
   370             return self.transaction_data.get(key, default)
       
   371 
       
   372     @property
       
   373     def pending_operations(self):
       
   374         try:
       
   375             return self._threaddata.pending_operations
       
   376         except AttributeError:
       
   377             self._threaddata.pending_operations = []
       
   378             return self._threaddata.pending_operations
       
   379 
   391 
   380 
   392     def add_operation(self, operation, index=None):
   381     def add_operation(self, operation, index=None):
   393         """add an observer"""
   382         """add an observer"""
   394         assert self.commit_state != 'commit'
   383         assert self.commit_state != 'commit'
   395         if index is not None:
   384         if index is not None:
   396             self.pending_operations.insert(index, operation)
   385             self.pending_operations.insert(index, operation)
   397         else:
   386         else:
   398             self.pending_operations.append(operation)
   387             self.pending_operations.append(operation)
   399 
   388 
   400     # querier helpers #########################################################
   389     # querier helpers #########################################################
       
   390 
       
   391     @property
       
   392     def rql_rewriter(self):
       
   393         try:
       
   394             return self._threaddata._rewriter
       
   395         except AttributeError:
       
   396             self._threaddata._rewriter = RQLRewriter(self.repo.querier, self)
       
   397             return self._threaddata._rewriter
   401 
   398 
   402     def build_description(self, rqlst, args, result):
   399     def build_description(self, rqlst, args, result):
   403         """build a description for a given result"""
   400         """build a description for a given result"""
   404         if len(rqlst.children) == 1 and len(rqlst.children[0].solutions) == 1:
   401         if len(rqlst.children) == 1 and len(rqlst.children[0].solutions) == 1:
   405             # easy, all lines are identical
   402             # easy, all lines are identical
   503         return self.parent_session.pool
   500         return self.parent_session.pool
   504     @property
   501     @property
   505     def pending_operations(self):
   502     def pending_operations(self):
   506         return self.parent_session.pending_operations
   503         return self.parent_session.pending_operations
   507     @property
   504     @property
   508     def _query_data(self):
   505     def transaction_data(self):
   509         return self.parent_session._query_data
   506         return self.parent_session.transaction_data
   510 
   507 
   511     def set_pool(self):
   508     def set_pool(self):
   512         """the session need a pool to execute some queries"""
   509         """the session need a pool to execute some queries"""
   513         self.parent_session.set_pool()
   510         self.parent_session.set_pool()
   514 
   511