--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/server/session.py Wed Nov 05 15:52:50 2008 +0100
@@ -0,0 +1,603 @@
+"""Repository users' and internal' sessions.
+
+:organization: Logilab
+:copyright: 2001-2008 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
+"""
+__docformat__ = "restructuredtext en"
+
+import sys
+import threading
+from time import time
+from types import NoneType
+from decimal import Decimal
+
+from mx.DateTime import DateTimeType, DateTimeDeltaType
+from rql.nodes import VariableRef, Function
+from yams import BASE_TYPES
+
+from cubicweb import RequestSessionMixIn, Binary
+from cubicweb.dbapi import ConnectionProperties
+from cubicweb.common.utils import make_uid
+from cubicweb.server.rqlrewrite import RQLRewriter
+
+def etype_from_pyobj(value):
+ """guess yams type from python value"""
+ # note:
+ # * Password is not selectable so no problem)
+ # * use type(value) and not value.__class__ since mx instances have no
+ # __class__ attribute
+ # * XXX Date, Time
+ return {bool: 'Boolean',
+ int: 'Int',
+ long: 'Int',
+ float: 'Float',
+ Decimal: 'Decimal',
+ unicode: 'String',
+ NoneType: None,
+ Binary: 'Bytes',
+ DateTimeType: 'Datetime',
+ DateTimeDeltaType: 'Interval',
+ }[type(value)]
+
+def is_final(rqlst, variable, args):
+ # try to find if this is a final var or not
+ for select in rqlst.children:
+ for sol in select.solutions:
+ etype = variable.get_type(sol, args)
+ if etype is None:
+ continue
+ if etype in BASE_TYPES:
+ return True
+ return False
+
+def _make_description(selected, args, solution):
+ """return a description for a result set"""
+ description = []
+ for term in selected:
+ description.append(term.get_type(solution, args))
+ return description
+
+#XXX rql <= 0.18.3 bw compat
+from rql import stmts
+if not hasattr(stmts.Union, 'get_variable_variables'):
+ def _union_get_variable_variables(self):
+ """return the set of variable names which take different type according to
+ the solution
+ """
+ change = set()
+ values = {}
+ for select in self.children:
+ change.update(select.get_variable_variables(values))
+ return change
+ stmts.Union.get_variable_variables = _union_get_variable_variables
+
+ def _select_get_variable_variables(self, _values=None):
+ """return the set of variable names which take different type according to
+ the solution
+ """
+ change = set()
+ if _values is None:
+ _values = {}
+ for solution in self.solutions:
+ for vname, etype in solution.iteritems():
+ if not vname in _values:
+ _values[vname] = etype
+ elif _values[vname] != etype:
+ change.add(vname)
+ return change
+ stmts.Select.get_variable_variables = _select_get_variable_variables
+
+class Session(RequestSessionMixIn):
+ """tie session id, user, connections pool and other session data all
+ together
+ """
+
+ def __init__(self, user, repo, cnxprops=None, _id=None):
+ super(Session, self).__init__(repo.vreg)
+ self.id = _id or make_uid(user.login.encode('UTF8'))
+ cnxprops = cnxprops or ConnectionProperties('inmemory')
+ self.user = user
+ self.repo = repo
+ self.cnxtype = cnxprops.cnxtype
+ self.creation = time()
+ self.timestamp = self.creation
+ self.is_internal_session = False
+ self.is_super_session = False
+ # short cut to querier .execute method
+ self._execute = repo.querier.execute
+ # shared data, used to communicate extra information between the client
+ # and the rql server
+ self.data = {}
+ # i18n initialization
+ self.set_language(cnxprops.lang)
+ self._threaddata = threading.local()
+
+ def get_mode(self):
+ return getattr(self._threaddata, 'mode', 'read')
+ def set_mode(self, value):
+ self._threaddata.mode = value
+ # transaction mode (read/write), resetted to read on commit / rollback
+ mode = property(get_mode, set_mode)
+
+ def get_commit_state(self):
+ return getattr(self._threaddata, 'commit_state', None)
+ def set_commit_state(self, value):
+ self._threaddata.commit_state = value
+ commit_state = property(get_commit_state, set_commit_state)
+
+ # set according to transaction mode for each query
+ @property
+ def pool(self):
+ return getattr(self._threaddata, 'pool', None)
+
+ # pending transaction operations
+ @property
+ def pending_operations(self):
+ try:
+ return self._threaddata.pending_operations
+ except AttributeError:
+ self._threaddata.pending_operations = []
+ return self._threaddata.pending_operations
+
+ # rql rewriter
+ @property
+ def rql_rewriter(self):
+ try:
+ return self._threaddata._rewriter
+ except AttributeError:
+ self._threaddata._rewriter = RQLRewriter(self.repo.querier, self)
+ return self._threaddata._rewriter
+
+ # transaction queries data
+ @property
+ def _query_data(self):
+ try:
+ return self._threaddata._query_data
+ except AttributeError:
+ self._threaddata._query_data = {}
+ return self._threaddata._query_data
+
+ def set_language(self, language):
+ """i18n configuration for translation"""
+ vreg = self.vreg
+ language = language or self.user.property_value('ui.language')
+ try:
+ self._ = self.__ = vreg.config.translations[language]
+ except KeyError:
+ language = vreg.property_value('ui.language')
+ try:
+ self._ = self.__ = vreg.config.translations[language]
+ except KeyError:
+ self._ = self.__ = unicode
+ self.lang = language
+
+ def change_property(self, prop, value):
+ assert prop == 'lang' # this is the only one changeable property for now
+ self.set_language(value)
+
+ def __str__(self):
+ return '<%ssession %s (%s 0x%x)>' % (self.cnxtype, self.user.login,
+ self.id, id(self))
+
+ def etype_class(self, etype):
+ """return an entity class for the given entity type"""
+ return self.vreg.etype_class(etype)
+
+ def entity(self, eid):
+ """return a result set for the given eid"""
+ return self.eid_rset(eid).get_entity(0, 0)
+
+ def _touch(self):
+ """update latest session usage timestamp and reset mode to read
+ """
+ self.timestamp = time()
+ self.local_perm_cache.clear()
+ self._threaddata.mode = 'read'
+
+ def set_pool(self):
+ """the session need a pool to execute some queries"""
+ if self.pool is None:
+ self._threaddata.pool = self.repo._get_pool()
+ try:
+ self._threaddata.pool.pool_set(self)
+ except:
+ self.repo._free_pool(self.pool)
+ self._threaddata.pool = None
+ raise
+ return self._threaddata.pool
+
+ def reset_pool(self):
+ """the session has no longer using its pool, at least for some time
+ """
+ # pool may be none if no operation has been done since last commit
+ # or rollback
+ if self.pool is not None and self.mode == 'read':
+ # even in read mode, we must release the current transaction
+ self.repo._free_pool(self.pool)
+ self.pool.pool_reset(self)
+ self._threaddata.pool = None
+
+ def system_sql(self, sql, args=None):
+ """return a sql cursor on the system database"""
+ if not sql.split(None, 1)[0].upper() == 'SELECT':
+ self.mode = 'write'
+ cursor = self.pool['system']
+ self.pool.source('system').doexec(cursor, sql, args)
+ return cursor
+
+ def actual_session(self):
+ """return the original parent session if any, else self"""
+ return self
+
+ # shared data handling ###################################################
+
+ def get_shared_data(self, key, default=None, pop=False):
+ """return value associated to `key` in session data"""
+ if pop:
+ return self.data.pop(key, default)
+ else:
+ return self.data.get(key, default)
+
+ def set_shared_data(self, key, value, querydata=False):
+ """set value associated to `key` in session data"""
+ if querydata:
+ self.set_query_data(key, value)
+ else:
+ self.data[key] = value
+
+ # request interface #######################################################
+
+ def set_entity_cache(self, entity):
+ # no entity cache in the server, too high risk of inconsistency
+ # between pre/post hooks
+ pass
+
+ def entity_cache(self, eid):
+ raise KeyError(eid)
+
+ def base_url(self):
+ return self.repo.config['base-url'] or u''
+
+ def from_controller(self):
+ """return the id (string) of the controller issuing the request (no
+ sense here, always return 'view')
+ """
+ return 'view'
+
+ def source_defs(self):
+ return self.repo.source_defs()
+
+ def describe(self, eid):
+ """return a tuple (type, sourceuri, extid) for the entity with id <eid>"""
+ return self.repo.type_and_source_from_eid(eid, self)
+
+ # db-api like interface ###################################################
+
+ def source_from_eid(self, eid):
+ """return the source where the entity with id <eid> is located"""
+ return self.repo.source_from_eid(eid, self)
+
+ def decorate_rset(self, rset, propagate=False):
+ rset.vreg = self.vreg
+ rset.req = propagate and self or self.actual_session()
+ return rset
+
+ @property
+ def super_session(self):
+ try:
+ csession = self._threaddata.childsession
+ except AttributeError:
+ if self.is_super_session:
+ csession = self
+ else:
+ csession = ChildSession(self)
+ self._threaddata.childsession = csession
+ # need shared pool set
+ self.set_pool()
+ return csession
+
+ def unsafe_execute(self, rql, kwargs=None, eid_key=None, build_descr=False,
+ propagate=False):
+ """like .execute but with security checking disabled (this method is
+ internal to the server, it's not part of the db-api)
+
+ if `propagate` is true, the super_session will be attached to the result
+ set instead of the parent session, hence further query done through
+ entities fetched from this result set will bypass security as well
+ """
+ return self.super_session.execute(rql, kwargs, eid_key, build_descr,
+ propagate)
+
+ @property
+ def cursor(self):
+ """return a rql cursor"""
+ return self
+
+ def execute(self, rql, kwargs=None, eid_key=None, build_descr=True,
+ propagate=False):
+ """db-api like method directly linked to the querier execute method
+
+ Becare that unlike actual cursor.execute, `build_descr` default to
+ false
+ """
+ rset = self._execute(self, rql, kwargs, eid_key, build_descr)
+ return self.decorate_rset(rset, propagate)
+
+ def commit(self, reset_pool=True):
+ """commit the current session's transaction"""
+ if self.pool is None:
+ assert not self.pending_operations
+ self._query_data.clear()
+ self._touch()
+ return
+ if self.commit_state:
+ return
+ # on rollback, an operation should have the following state
+ # information:
+ # - processed by the precommit/commit event or not
+ # - if processed, is it the failed operation
+ try:
+ for trstate in ('precommit', 'commit'):
+ processed = []
+ self.commit_state = trstate
+ try:
+ while self.pending_operations:
+ operation = self.pending_operations.pop(0)
+ operation.processed = trstate
+ processed.append(operation)
+ operation.handle_event('%s_event' % trstate)
+ self.pending_operations[:] = processed
+ self.debug('%s session %s done', trstate, self.id)
+ except:
+ self.exception('error while %sing', trstate)
+ operation.failed = True
+ for operation in processed:
+ operation.handle_event('revert%s_event' % trstate)
+ self.rollback(reset_pool)
+ raise
+ self.pool.commit()
+ finally:
+ self._touch()
+ self.commit_state = None
+ self.pending_operations[:] = []
+ self._query_data.clear()
+ if reset_pool:
+ self.reset_pool()
+
+ def rollback(self, reset_pool=True):
+ """rollback the current session's transaction"""
+ if self.pool is None:
+ assert not self.pending_operations
+ self._query_data.clear()
+ self._touch()
+ return
+ try:
+ while self.pending_operations:
+ try:
+ operation = self.pending_operations.pop(0)
+ operation.handle_event('rollback_event')
+ except:
+ self.critical('rollback error', exc_info=sys.exc_info())
+ continue
+ self.pool.rollback()
+ finally:
+ self._touch()
+ self.pending_operations[:] = []
+ self._query_data.clear()
+ if reset_pool:
+ self.reset_pool()
+
+ def close(self):
+ """do not close pool on session close, since they are shared now"""
+ self.rollback()
+
+ # transaction data/operations management ##################################
+
+ def add_query_data(self, key, value):
+ self._query_data.setdefault(key, []).append(value)
+
+ def set_query_data(self, key, value):
+ self._query_data[key] = value
+
+ def query_data(self, key, default=None, setdefault=False, pop=False):
+ if setdefault:
+ assert not pop
+ return self._query_data.setdefault(key, default)
+ if pop:
+ return self._query_data.pop(key, default)
+ else:
+ return self._query_data.get(key, default)
+
+ def add_operation(self, operation, index=None):
+ """add an observer"""
+ assert self.commit_state != 'commit'
+ if index is not None:
+ self.pending_operations.insert(index, operation)
+ else:
+ self.pending_operations.append(operation)
+
+ # querier helpers #########################################################
+
+ def build_description(self, rqlst, args, result):
+ """build a description for a given result"""
+ if len(rqlst.children) == 1 and len(rqlst.children[0].solutions) == 1:
+ # easy, all lines are identical
+ selected = rqlst.children[0].selection
+ solution = rqlst.children[0].solutions[0]
+ description = _make_description(selected, args, solution)
+ return [tuple(description)] * len(result)
+ # hard, delegate the work :o)
+ return self.manual_build_descr(rqlst, args, result)
+
+ def manual_build_descr(self, rqlst, args, result):
+ """build a description for a given result by analysing each row
+
+ XXX could probably be done more efficiently during execution of query
+ """
+ # not so easy, looks for variable which changes from one solution
+ # to another
+ unstables = rqlst.get_variable_variables()
+ basedescription = []
+ todetermine = []
+ selected = rqlst.children[0].selection # sample selection
+ for i, term in enumerate(selected):
+ if isinstance(term, Function) and term.descr().rtype is not None:
+ basedescription.append(term.get_type(term.descr().rtype, args))
+ continue
+ for vref in term.get_nodes(VariableRef):
+ if vref.name in unstables:
+ basedescription.append(None)
+ todetermine.append( (i, is_final(rqlst, vref.variable, args)) )
+ break
+ else:
+ # sample etype
+ etype = rqlst.children[0].solutions[0]
+ basedescription.append(term.get_type(etype, args))
+ if not todetermine:
+ return [tuple(basedescription)] * len(result)
+ return self._build_descr(result, basedescription, todetermine)
+
+ def _build_descr(self, result, basedescription, todetermine):
+ description = []
+ etype_from_eid = self.describe
+ for row in result:
+ row_descr = basedescription
+ for index, isfinal in todetermine:
+ value = row[index]
+ if value is None:
+ # None value inserted by an outer join, no type
+ row_descr[index] = None
+ continue
+ if isfinal:
+ row_descr[index] = etype_from_pyobj(value)
+ else:
+ row_descr[index] = etype_from_eid(value)[0]
+ description.append(tuple(row_descr))
+ return description
+
+
+class ChildSession(Session):
+ """child (or internal) session are used to hijack the security system
+ """
+ cnxtype = 'inmemory'
+
+ def __init__(self, parent_session):
+ self.id = None
+ self.is_internal_session = False
+ self.is_super_session = True
+ # session which has created this one
+ self.parent_session = parent_session
+ self.user = InternalManager()
+ self.repo = parent_session.repo
+ self.vreg = parent_session.vreg
+ self.data = parent_session.data
+ self.encoding = parent_session.encoding
+ self.lang = parent_session.lang
+ self._ = self.__ = parent_session._
+ # short cut to querier .execute method
+ self._execute = self.repo.querier.execute
+
+ @property
+ def super_session(self):
+ return self
+
+ def get_mode(self):
+ return self.parent_session.mode
+ def set_mode(self, value):
+ self.parent_session.set_mode(value)
+ mode = property(get_mode, set_mode)
+
+ def get_commit_state(self):
+ return self.parent_session.commit_state
+ def set_commit_state(self, value):
+ self.parent_session.set_commit_state(value)
+ commit_state = property(get_commit_state, set_commit_state)
+
+ @property
+ def pool(self):
+ return self.parent_session.pool
+ @property
+ def pending_operations(self):
+ return self.parent_session.pending_operations
+ @property
+ def _query_data(self):
+ return self.parent_session._query_data
+
+ def set_pool(self):
+ """the session need a pool to execute some queries"""
+ self.parent_session.set_pool()
+
+ def reset_pool(self):
+ """the session has no longer using its pool, at least for some time
+ """
+ self.parent_session.reset_pool()
+
+ def actual_session(self):
+ """return the original parent session if any, else self"""
+ return self.parent_session
+
+ def commit(self, reset_pool=True):
+ """commit the current session's transaction"""
+ self.parent_session.commit(reset_pool)
+
+ def rollback(self, reset_pool=True):
+ """rollback the current session's transaction"""
+ self.parent_session.rollback(reset_pool)
+
+ def close(self):
+ """do not close pool on session close, since they are shared now"""
+ self.rollback()
+
+ def user_data(self):
+ """returns a dictionnary with this user's information"""
+ return self.parent_session.user_data()
+
+
+class InternalSession(Session):
+ """special session created internaly by the repository"""
+
+ def __init__(self, repo, cnxprops=None):
+ super(InternalSession, self).__init__(_IMANAGER, repo, cnxprops,
+ _id='internal')
+ self.cnxtype = 'inmemory'
+ self.is_internal_session = True
+ self.is_super_session = True
+
+ @property
+ def super_session(self):
+ return self
+
+
+class InternalManager(object):
+ """a manager user with all access rights used internally for task such as
+ bootstrapping the repository or creating regular users according to
+ repository content
+ """
+ def __init__(self):
+ self.eid = -1
+ self.login = u'__internal_manager__'
+ self.properties = {}
+
+ def matching_groups(self, groups):
+ return 1
+
+ def is_in_group(self, group):
+ return True
+
+ def owns(self, eid):
+ return True
+
+ def has_permission(self, pname, contexteid=None):
+ return True
+
+ def property_value(self, key):
+ if key == 'ui.language':
+ return 'en'
+ return None
+
+_IMANAGER= InternalManager()
+
+from logging import getLogger
+from cubicweb import set_log_methods
+set_log_methods(Session, getLogger('cubicweb.session'))