diff -r 000000000000 -r b97547f5f1fa server/session.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/server/session.py Wed Nov 05 15:52:50 2008 +0100 @@ -0,0 +1,603 @@ +"""Repository users' and internal' sessions. + +:organization: Logilab +:copyright: 2001-2008 LOGILAB S.A. (Paris, FRANCE), all rights reserved. +:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr +""" +__docformat__ = "restructuredtext en" + +import sys +import threading +from time import time +from types import NoneType +from decimal import Decimal + +from mx.DateTime import DateTimeType, DateTimeDeltaType +from rql.nodes import VariableRef, Function +from yams import BASE_TYPES + +from cubicweb import RequestSessionMixIn, Binary +from cubicweb.dbapi import ConnectionProperties +from cubicweb.common.utils import make_uid +from cubicweb.server.rqlrewrite import RQLRewriter + +def etype_from_pyobj(value): + """guess yams type from python value""" + # note: + # * Password is not selectable so no problem) + # * use type(value) and not value.__class__ since mx instances have no + # __class__ attribute + # * XXX Date, Time + return {bool: 'Boolean', + int: 'Int', + long: 'Int', + float: 'Float', + Decimal: 'Decimal', + unicode: 'String', + NoneType: None, + Binary: 'Bytes', + DateTimeType: 'Datetime', + DateTimeDeltaType: 'Interval', + }[type(value)] + +def is_final(rqlst, variable, args): + # try to find if this is a final var or not + for select in rqlst.children: + for sol in select.solutions: + etype = variable.get_type(sol, args) + if etype is None: + continue + if etype in BASE_TYPES: + return True + return False + +def _make_description(selected, args, solution): + """return a description for a result set""" + description = [] + for term in selected: + description.append(term.get_type(solution, args)) + return description + +#XXX rql <= 0.18.3 bw compat +from rql import stmts +if not hasattr(stmts.Union, 'get_variable_variables'): + def _union_get_variable_variables(self): + """return the set of variable names which take different type according to + the solution + """ + change = set() + values = {} + for select in self.children: + change.update(select.get_variable_variables(values)) + return change + stmts.Union.get_variable_variables = _union_get_variable_variables + + def _select_get_variable_variables(self, _values=None): + """return the set of variable names which take different type according to + the solution + """ + change = set() + if _values is None: + _values = {} + for solution in self.solutions: + for vname, etype in solution.iteritems(): + if not vname in _values: + _values[vname] = etype + elif _values[vname] != etype: + change.add(vname) + return change + stmts.Select.get_variable_variables = _select_get_variable_variables + +class Session(RequestSessionMixIn): + """tie session id, user, connections pool and other session data all + together + """ + + def __init__(self, user, repo, cnxprops=None, _id=None): + super(Session, self).__init__(repo.vreg) + self.id = _id or make_uid(user.login.encode('UTF8')) + cnxprops = cnxprops or ConnectionProperties('inmemory') + self.user = user + self.repo = repo + self.cnxtype = cnxprops.cnxtype + self.creation = time() + self.timestamp = self.creation + self.is_internal_session = False + self.is_super_session = False + # short cut to querier .execute method + self._execute = repo.querier.execute + # shared data, used to communicate extra information between the client + # and the rql server + self.data = {} + # i18n initialization + self.set_language(cnxprops.lang) + self._threaddata = threading.local() + + def get_mode(self): + return getattr(self._threaddata, 'mode', 'read') + def set_mode(self, value): + self._threaddata.mode = value + # transaction mode (read/write), resetted to read on commit / rollback + mode = property(get_mode, set_mode) + + def get_commit_state(self): + return getattr(self._threaddata, 'commit_state', None) + def set_commit_state(self, value): + self._threaddata.commit_state = value + commit_state = property(get_commit_state, set_commit_state) + + # set according to transaction mode for each query + @property + def pool(self): + return getattr(self._threaddata, 'pool', None) + + # pending transaction operations + @property + def pending_operations(self): + try: + return self._threaddata.pending_operations + except AttributeError: + self._threaddata.pending_operations = [] + return self._threaddata.pending_operations + + # rql rewriter + @property + def rql_rewriter(self): + try: + return self._threaddata._rewriter + except AttributeError: + self._threaddata._rewriter = RQLRewriter(self.repo.querier, self) + return self._threaddata._rewriter + + # transaction queries data + @property + def _query_data(self): + try: + return self._threaddata._query_data + except AttributeError: + self._threaddata._query_data = {} + return self._threaddata._query_data + + def set_language(self, language): + """i18n configuration for translation""" + vreg = self.vreg + language = language or self.user.property_value('ui.language') + try: + self._ = self.__ = vreg.config.translations[language] + except KeyError: + language = vreg.property_value('ui.language') + try: + self._ = self.__ = vreg.config.translations[language] + except KeyError: + self._ = self.__ = unicode + self.lang = language + + def change_property(self, prop, value): + assert prop == 'lang' # this is the only one changeable property for now + self.set_language(value) + + def __str__(self): + return '<%ssession %s (%s 0x%x)>' % (self.cnxtype, self.user.login, + self.id, id(self)) + + def etype_class(self, etype): + """return an entity class for the given entity type""" + return self.vreg.etype_class(etype) + + def entity(self, eid): + """return a result set for the given eid""" + return self.eid_rset(eid).get_entity(0, 0) + + def _touch(self): + """update latest session usage timestamp and reset mode to read + """ + self.timestamp = time() + self.local_perm_cache.clear() + self._threaddata.mode = 'read' + + def set_pool(self): + """the session need a pool to execute some queries""" + if self.pool is None: + self._threaddata.pool = self.repo._get_pool() + try: + self._threaddata.pool.pool_set(self) + except: + self.repo._free_pool(self.pool) + self._threaddata.pool = None + raise + return self._threaddata.pool + + def reset_pool(self): + """the session has no longer using its pool, at least for some time + """ + # pool may be none if no operation has been done since last commit + # or rollback + if self.pool is not None and self.mode == 'read': + # even in read mode, we must release the current transaction + self.repo._free_pool(self.pool) + self.pool.pool_reset(self) + self._threaddata.pool = None + + def system_sql(self, sql, args=None): + """return a sql cursor on the system database""" + if not sql.split(None, 1)[0].upper() == 'SELECT': + self.mode = 'write' + cursor = self.pool['system'] + self.pool.source('system').doexec(cursor, sql, args) + return cursor + + def actual_session(self): + """return the original parent session if any, else self""" + return self + + # shared data handling ################################################### + + def get_shared_data(self, key, default=None, pop=False): + """return value associated to `key` in session data""" + if pop: + return self.data.pop(key, default) + else: + return self.data.get(key, default) + + def set_shared_data(self, key, value, querydata=False): + """set value associated to `key` in session data""" + if querydata: + self.set_query_data(key, value) + else: + self.data[key] = value + + # request interface ####################################################### + + def set_entity_cache(self, entity): + # no entity cache in the server, too high risk of inconsistency + # between pre/post hooks + pass + + def entity_cache(self, eid): + raise KeyError(eid) + + def base_url(self): + return self.repo.config['base-url'] or u'' + + def from_controller(self): + """return the id (string) of the controller issuing the request (no + sense here, always return 'view') + """ + return 'view' + + def source_defs(self): + return self.repo.source_defs() + + def describe(self, eid): + """return a tuple (type, sourceuri, extid) for the entity with id """ + return self.repo.type_and_source_from_eid(eid, self) + + # db-api like interface ################################################### + + def source_from_eid(self, eid): + """return the source where the entity with id is located""" + return self.repo.source_from_eid(eid, self) + + def decorate_rset(self, rset, propagate=False): + rset.vreg = self.vreg + rset.req = propagate and self or self.actual_session() + return rset + + @property + def super_session(self): + try: + csession = self._threaddata.childsession + except AttributeError: + if self.is_super_session: + csession = self + else: + csession = ChildSession(self) + self._threaddata.childsession = csession + # need shared pool set + self.set_pool() + return csession + + def unsafe_execute(self, rql, kwargs=None, eid_key=None, build_descr=False, + propagate=False): + """like .execute but with security checking disabled (this method is + internal to the server, it's not part of the db-api) + + if `propagate` is true, the super_session will be attached to the result + set instead of the parent session, hence further query done through + entities fetched from this result set will bypass security as well + """ + return self.super_session.execute(rql, kwargs, eid_key, build_descr, + propagate) + + @property + def cursor(self): + """return a rql cursor""" + return self + + def execute(self, rql, kwargs=None, eid_key=None, build_descr=True, + propagate=False): + """db-api like method directly linked to the querier execute method + + Becare that unlike actual cursor.execute, `build_descr` default to + false + """ + rset = self._execute(self, rql, kwargs, eid_key, build_descr) + return self.decorate_rset(rset, propagate) + + def commit(self, reset_pool=True): + """commit the current session's transaction""" + if self.pool is None: + assert not self.pending_operations + self._query_data.clear() + self._touch() + return + if self.commit_state: + return + # on rollback, an operation should have the following state + # information: + # - processed by the precommit/commit event or not + # - if processed, is it the failed operation + try: + for trstate in ('precommit', 'commit'): + processed = [] + self.commit_state = trstate + try: + while self.pending_operations: + operation = self.pending_operations.pop(0) + operation.processed = trstate + processed.append(operation) + operation.handle_event('%s_event' % trstate) + self.pending_operations[:] = processed + self.debug('%s session %s done', trstate, self.id) + except: + self.exception('error while %sing', trstate) + operation.failed = True + for operation in processed: + operation.handle_event('revert%s_event' % trstate) + self.rollback(reset_pool) + raise + self.pool.commit() + finally: + self._touch() + self.commit_state = None + self.pending_operations[:] = [] + self._query_data.clear() + if reset_pool: + self.reset_pool() + + def rollback(self, reset_pool=True): + """rollback the current session's transaction""" + if self.pool is None: + assert not self.pending_operations + self._query_data.clear() + self._touch() + return + try: + while self.pending_operations: + try: + operation = self.pending_operations.pop(0) + operation.handle_event('rollback_event') + except: + self.critical('rollback error', exc_info=sys.exc_info()) + continue + self.pool.rollback() + finally: + self._touch() + self.pending_operations[:] = [] + self._query_data.clear() + if reset_pool: + self.reset_pool() + + def close(self): + """do not close pool on session close, since they are shared now""" + self.rollback() + + # transaction data/operations management ################################## + + def add_query_data(self, key, value): + self._query_data.setdefault(key, []).append(value) + + def set_query_data(self, key, value): + self._query_data[key] = value + + def query_data(self, key, default=None, setdefault=False, pop=False): + if setdefault: + assert not pop + return self._query_data.setdefault(key, default) + if pop: + return self._query_data.pop(key, default) + else: + return self._query_data.get(key, default) + + def add_operation(self, operation, index=None): + """add an observer""" + assert self.commit_state != 'commit' + if index is not None: + self.pending_operations.insert(index, operation) + else: + self.pending_operations.append(operation) + + # querier helpers ######################################################### + + def build_description(self, rqlst, args, result): + """build a description for a given result""" + if len(rqlst.children) == 1 and len(rqlst.children[0].solutions) == 1: + # easy, all lines are identical + selected = rqlst.children[0].selection + solution = rqlst.children[0].solutions[0] + description = _make_description(selected, args, solution) + return [tuple(description)] * len(result) + # hard, delegate the work :o) + return self.manual_build_descr(rqlst, args, result) + + def manual_build_descr(self, rqlst, args, result): + """build a description for a given result by analysing each row + + XXX could probably be done more efficiently during execution of query + """ + # not so easy, looks for variable which changes from one solution + # to another + unstables = rqlst.get_variable_variables() + basedescription = [] + todetermine = [] + selected = rqlst.children[0].selection # sample selection + for i, term in enumerate(selected): + if isinstance(term, Function) and term.descr().rtype is not None: + basedescription.append(term.get_type(term.descr().rtype, args)) + continue + for vref in term.get_nodes(VariableRef): + if vref.name in unstables: + basedescription.append(None) + todetermine.append( (i, is_final(rqlst, vref.variable, args)) ) + break + else: + # sample etype + etype = rqlst.children[0].solutions[0] + basedescription.append(term.get_type(etype, args)) + if not todetermine: + return [tuple(basedescription)] * len(result) + return self._build_descr(result, basedescription, todetermine) + + def _build_descr(self, result, basedescription, todetermine): + description = [] + etype_from_eid = self.describe + for row in result: + row_descr = basedescription + for index, isfinal in todetermine: + value = row[index] + if value is None: + # None value inserted by an outer join, no type + row_descr[index] = None + continue + if isfinal: + row_descr[index] = etype_from_pyobj(value) + else: + row_descr[index] = etype_from_eid(value)[0] + description.append(tuple(row_descr)) + return description + + +class ChildSession(Session): + """child (or internal) session are used to hijack the security system + """ + cnxtype = 'inmemory' + + def __init__(self, parent_session): + self.id = None + self.is_internal_session = False + self.is_super_session = True + # session which has created this one + self.parent_session = parent_session + self.user = InternalManager() + self.repo = parent_session.repo + self.vreg = parent_session.vreg + self.data = parent_session.data + self.encoding = parent_session.encoding + self.lang = parent_session.lang + self._ = self.__ = parent_session._ + # short cut to querier .execute method + self._execute = self.repo.querier.execute + + @property + def super_session(self): + return self + + def get_mode(self): + return self.parent_session.mode + def set_mode(self, value): + self.parent_session.set_mode(value) + mode = property(get_mode, set_mode) + + def get_commit_state(self): + return self.parent_session.commit_state + def set_commit_state(self, value): + self.parent_session.set_commit_state(value) + commit_state = property(get_commit_state, set_commit_state) + + @property + def pool(self): + return self.parent_session.pool + @property + def pending_operations(self): + return self.parent_session.pending_operations + @property + def _query_data(self): + return self.parent_session._query_data + + def set_pool(self): + """the session need a pool to execute some queries""" + self.parent_session.set_pool() + + def reset_pool(self): + """the session has no longer using its pool, at least for some time + """ + self.parent_session.reset_pool() + + def actual_session(self): + """return the original parent session if any, else self""" + return self.parent_session + + def commit(self, reset_pool=True): + """commit the current session's transaction""" + self.parent_session.commit(reset_pool) + + def rollback(self, reset_pool=True): + """rollback the current session's transaction""" + self.parent_session.rollback(reset_pool) + + def close(self): + """do not close pool on session close, since they are shared now""" + self.rollback() + + def user_data(self): + """returns a dictionnary with this user's information""" + return self.parent_session.user_data() + + +class InternalSession(Session): + """special session created internaly by the repository""" + + def __init__(self, repo, cnxprops=None): + super(InternalSession, self).__init__(_IMANAGER, repo, cnxprops, + _id='internal') + self.cnxtype = 'inmemory' + self.is_internal_session = True + self.is_super_session = True + + @property + def super_session(self): + return self + + +class InternalManager(object): + """a manager user with all access rights used internally for task such as + bootstrapping the repository or creating regular users according to + repository content + """ + def __init__(self): + self.eid = -1 + self.login = u'__internal_manager__' + self.properties = {} + + def matching_groups(self, groups): + return 1 + + def is_in_group(self, group): + return True + + def owns(self, eid): + return True + + def has_permission(self, pname, contexteid=None): + return True + + def property_value(self, key): + if key == 'ui.language': + return 'en' + return None + +_IMANAGER= InternalManager() + +from logging import getLogger +from cubicweb import set_log_methods +set_log_methods(Session, getLogger('cubicweb.session'))