# HG changeset patch # User Julien Cristau # Date 1380889730 -7200 # Node ID 0dcc68dd8458f0ae236108400bc73796ea4c9648 # Parent c027ed79f1cecd5fac202e8dc3b1f98fda7a17b7# Parent fdd7f614ca2a1b1343bd9fe9b6e7556d24fc46a6 merge 3.17.8 into default diff -r fdd7f614ca2a -r 0dcc68dd8458 doc/3.18.rst --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/doc/3.18.rst Fri Oct 04 14:28:50 2013 +0200 @@ -0,0 +1,26 @@ +What's new in CubicWeb 3.18? +============================ + +New functionalities +-------------------- + +* add a security debugging tool + (see `#2920304 `_) + + +API changes +----------- + + + +Deprecation +--------------------- + + + +Deprecated Code Drops +---------------------- + +* ``ldapuser`` have been dropped; use ``ldapfeed`` now + (see `#2936496 `_) + diff -r fdd7f614ca2a -r 0dcc68dd8458 doc/4.0.rst --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/doc/4.0.rst Fri Oct 04 14:28:50 2013 +0200 @@ -0,0 +1,8 @@ +What's new in CubicWeb 4.0? +============================ + +Deprecated Code Drops +---------------------- + +* The ldapuser source has been dropped. ldapfeed is the only official source + remaining for ldap. diff -r fdd7f614ca2a -r 0dcc68dd8458 etwist/request.py --- a/etwist/request.py Thu Oct 03 15:37:45 2013 +0200 +++ b/etwist/request.py Fri Oct 04 14:28:50 2013 +0200 @@ -24,15 +24,21 @@ class CubicWebTwistedRequestAdapter(CubicWebRequestBase): + """ from twisted .req to cubicweb .form + req.files are put into .form[] + """ def __init__(self, req, vreg, https): self._twreq = req super(CubicWebTwistedRequestAdapter, self).__init__( vreg, https, req.args, headers=req.received_headers) - for key, (name, stream) in req.files.iteritems(): - if name is None: - self.form[key] = (name, stream) - else: - self.form[key] = (unicode(name, self.encoding), stream) + for key, name_stream_list in req.files.iteritems(): + for name, stream in name_stream_list: + if name is not None: + name = unicode(name, self.encoding) + self.form.setdefault(key, []).append((name, stream)) + # 3.16.4 backward compat + if len(self.form[key]) == 1: + self.form[key] = self.form[key][0] self.content = self._twreq.content # stream def http_method(self): diff -r fdd7f614ca2a -r 0dcc68dd8458 etwist/server.py --- a/etwist/server.py Thu Oct 03 15:37:45 2013 +0200 +++ b/etwist/server.py Fri Oct 04 14:28:50 2013 +0200 @@ -244,7 +244,6 @@ self._do_process_multipart = True self.process() - @monkeypatch(http.Request) def process_multipart(self): if not self._do_process_multipart: @@ -254,16 +253,17 @@ keep_blank_values=1, strict_parsing=1) for key in form: - value = form[key] - if isinstance(value, list): - self.args[key] = [v.value for v in value] - elif value.filename: - if value.done != -1: # -1 is transfer has been interrupted - self.files[key] = (value.filename, value.file) + values = form[key] + if not isinstance(values, list): + values = [values] + for value in values: + if value.filename: + if value.done != -1: # -1 is transfer has been interrupted + self.files.setdefault(key, []).append((value.filename, value.file)) + else: + self.files.setdefault(key, []).append((None, None)) else: - self.files[key] = (None, None) - else: - self.args[key] = value.value + self.args.setdefault(key, []).append(value.value) from logging import getLogger from cubicweb import set_log_methods diff -r fdd7f614ca2a -r 0dcc68dd8458 schema.py --- a/schema.py Thu Oct 03 15:37:45 2013 +0200 +++ b/schema.py Fri Oct 04 14:28:50 2013 +0200 @@ -44,6 +44,15 @@ import cubicweb from cubicweb import ETYPE_NAME_MAP, ValidationError, Unauthorized +try: + from cubicweb import server +except ImportError: + # We need to lookup DEBUG from there, + # however a pure dbapi client may not have it. + class server(object): pass + server.DEBUG = False + + PURE_VIRTUAL_RTYPES = set(('identity', 'has_text',)) VIRTUAL_RTYPES = set(('eid', 'identity', 'has_text',)) @@ -268,13 +277,25 @@ return False PermissionMixIn.has_perm = has_perm + def check_perm(self, _cw, action, **kwargs): # NB: _cw may be a server transaction or a request object. # # check user is in an allowed group, if so that's enough internal # transactions should always stop there + DBG = False + if server.DEBUG & server.DBG_SEC: + if action in server._SECURITY_CAPS: + _self_str = str(self) + if server._SECURITY_ITEMS: + if any(item in _self_str for item in server._SECURITY_ITEMS): + DBG = True + else: + DBG = True groups = self.get_groups(action) if _cw.user.matching_groups(groups): + if DBG: + print 'check_perm: %r %r: user matches %s' % (action, _self_str, groups) return # if 'owners' in allowed groups, check if the user actually owns this # object, if so that's enough @@ -284,8 +305,15 @@ if 'owners' in groups and ( kwargs.get('creating') or ('eid' in kwargs and _cw.user.owns(kwargs['eid']))): + if DBG: + print ('check_perm: %r %r: user is owner or creation time' % + (action, _self_str)) return # else if there is some rql expressions, check them + if DBG: + print ('check_perm: %r %r %s' % + (action, _self_str, [(rqlexpr, kwargs, rqlexpr.check(_cw, **kwargs)) + for rqlexpr in self.get_rqlexprs(action)])) if any(rqlexpr.check(_cw, **kwargs) for rqlexpr in self.get_rqlexprs(action)): return diff -r fdd7f614ca2a -r 0dcc68dd8458 server/__init__.py --- a/server/__init__.py Thu Oct 03 15:37:45 2013 +0200 +++ b/server/__init__.py Fri Oct 04 14:28:50 2013 +0200 @@ -26,6 +26,7 @@ import sys from os.path import join, exists from glob import glob +from contextlib import contextmanager from logilab.common.modutils import LazyObject from logilab.common.textutils import splitstrip @@ -80,14 +81,57 @@ DBG_HOOKS = 16 #: operations DBG_OPS = 32 +#: security +DBG_SEC = 64 #: more verbosity -DBG_MORE = 64 +DBG_MORE = 128 #: all level enabled -DBG_ALL = DBG_RQL + DBG_SQL + DBG_REPO + DBG_MS + DBG_HOOKS + DBG_OPS + DBG_MORE +DBG_ALL = DBG_RQL + DBG_SQL + DBG_REPO + DBG_MS + DBG_HOOKS + DBG_OPS + DBG_SEC + DBG_MORE + +_SECURITY_ITEMS = [] +_SECURITY_CAPS = ['read', 'add', 'update', 'delete'] #: current debug mode DEBUG = 0 +@contextmanager +def tunesecurity(items=(), capabilities=()): + """Context manager to use in conjunction with DBG_SEC. + + This allows some tuning of: + * the monitored capabilities ('read', 'add', ....) + * the object being checked by the security checkers + + When no item is given, all of them will be watched. + By default all capabilities are monitored, unless specified. + + Example use:: + + from cubicweb.server import debugged, DBG_SEC, tunesecurity + with debugged(DBG_SEC): + with tunesecurity(items=('Elephant', 'trumps'), + capabilities=('update', 'delete')): + babar.cw_set(trumps=celeste) + flore.cw_delete() + + ==> + + check_perm: 'update' 'relation Elephant.trumps.Elephant' + [(ERQLExpression(Any X WHERE U has_update_permission X, X eid %(x)s, U eid %(u)s), + {'eid': 2167}, True)] + check_perm: 'delete' 'Elephant' + [(ERQLExpression(Any X WHERE U has_delete_permission X, X eid %(x)s, U eid %(u)s), + {'eid': 2168}, True)] + + """ + olditems = _SECURITY_ITEMS[:] + _SECURITY_ITEMS.extend(list(items)) + oldactions = _SECURITY_CAPS[:] + _SECURITY_CAPS[:] = capabilities + yield + _SECURITY_ITEMS[:] = olditems + _SECURITY_CAPS[:] = oldactions + def set_debug(debugmode): """change the repository debugging mode""" global DEBUG @@ -305,7 +349,6 @@ SOURCE_TYPES = {'native': LazyObject('cubicweb.server.sources.native', 'NativeSQLSource'), 'datafeed': LazyObject('cubicweb.server.sources.datafeed', 'DataFeedSource'), 'ldapfeed': LazyObject('cubicweb.server.sources.ldapfeed', 'LDAPFeedSource'), - 'ldapuser': LazyObject('cubicweb.server.sources.ldapuser', 'LDAPUserSource'), 'pyrorql': LazyObject('cubicweb.server.sources.pyrorql', 'PyroRQLSource'), 'zmqrql': LazyObject('cubicweb.server.sources.zmqrql', 'ZMQRQLSource'), } diff -r fdd7f614ca2a -r 0dcc68dd8458 server/sources/ldapuser.py --- a/server/sources/ldapuser.py Thu Oct 03 15:37:45 2013 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,551 +0,0 @@ -# copyright 2003-2013 LOGILAB S.A. (Paris, FRANCE), all rights reserved. -# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr -# -# This file is part of CubicWeb. -# -# CubicWeb is free software: you can redistribute it and/or modify it under the -# terms of the GNU Lesser General Public License as published by the Free -# Software Foundation, either version 2.1 of the License, or (at your option) -# any later version. -# -# CubicWeb is distributed in the hope that it will be useful, but WITHOUT -# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS -# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more -# details. -# -# You should have received a copy of the GNU Lesser General Public License along -# with CubicWeb. If not, see . -"""cubicweb ldap user source - -this source is for now limited to a read-only CWUser source -""" -from __future__ import division, with_statement -from base64 import b64decode - -import ldap -from ldap.filter import escape_filter_chars - -from rql.nodes import Relation, VariableRef, Constant, Function - -import warnings -from cubicweb import UnknownEid, RepositoryError -from cubicweb.server import ldaputils -from cubicweb.server.utils import cartesian_product -from cubicweb.server.sources import (AbstractSource, TrFunc, GlobTrFunc, - TimedCache) - -# search scopes -BASE = ldap.SCOPE_BASE -ONELEVEL = ldap.SCOPE_ONELEVEL -SUBTREE = ldap.SCOPE_SUBTREE - -# map ldap protocol to their standard port -PROTO_PORT = {'ldap': 389, - 'ldaps': 636, - 'ldapi': None, - } - - -# module is lazily imported -warnings.warn('Imminent drop of ldapuser. Switch to ldapfeed now!', - DeprecationWarning) - - -class LDAPUserSource(ldaputils.LDAPSourceMixIn, AbstractSource): - """LDAP read-only CWUser source""" - support_entities = {'CWUser': False} - - options = ldaputils.LDAPSourceMixIn.options + ( - - ('synchronization-interval', - {'type' : 'time', - 'default': '1d', - 'help': 'interval between synchronization with the ldap \ -directory (default to once a day).', - 'group': 'ldap-source', 'level': 3, - }), - ('cache-life-time', - {'type' : 'time', - 'default': '2h', - 'help': 'life time of query cache (default to two hours).', - 'group': 'ldap-source', 'level': 3, - }), - - ) - - def update_config(self, source_entity, typedconfig): - """update configuration from source entity. `typedconfig` is config - properly typed with defaults set - """ - super(LDAPUserSource, self).update_config(source_entity, typedconfig) - self._interval = typedconfig['synchronization-interval'] - self._cache_ttl = max(71, typedconfig['cache-life-time']) - self.reset_caches() - # XXX copy from datafeed source - if source_entity is not None: - self._entity_update(source_entity) - self.config = typedconfig - # /end XXX - - def reset_caches(self): - """method called during test to reset potential source caches""" - self._cache = {} - self._query_cache = TimedCache(self._cache_ttl) - - def init(self, activated, source_entity): - """method called by the repository once ready to handle request""" - super(LDAPUserSource, self).init(activated, source_entity) - if activated: - self.info('ldap init') - # set minimum period of 5min 1s (the additional second is to - # minimize resonnance effet) - if self.user_rev_attrs['email']: - self.repo.looping_task(max(301, self._interval), self.synchronize) - self.repo.looping_task(self._cache_ttl // 10, - self._query_cache.clear_expired) - - def synchronize(self): - with self.repo.internal_session() as session: - self.pull_data(session) - - def pull_data(self, session, force=False, raise_on_error=False): - """synchronize content known by this repository with content in the - external repository - """ - self.info('synchronizing ldap source %s', self.uri) - ldap_emailattr = self.user_rev_attrs['email'] - assert ldap_emailattr - execute = session.execute - cursor = session.system_sql("SELECT eid, extid FROM entities WHERE " - "source='%s'" % self.uri) - for eid, b64extid in cursor.fetchall(): - extid = b64decode(b64extid) - self.debug('ldap eid %s', eid) - # if no result found, _search automatically delete entity information - res = self._search(session, extid, BASE) - self.debug('ldap search %s', res) - if res: - ldapemailaddr = res[0].get(ldap_emailattr) - if ldapemailaddr: - if isinstance(ldapemailaddr, list): - ldapemailaddr = ldapemailaddr[0] # XXX consider only the first email in the list - rset = execute('Any X,A WHERE ' - 'X address A, U use_email X, U eid %(u)s', - {'u': eid}) - ldapemailaddr = unicode(ldapemailaddr) - for emaileid, emailaddr, in rset: - if emailaddr == ldapemailaddr: - break - else: - self.debug('updating email address of user %s to %s', - extid, ldapemailaddr) - emailrset = execute('EmailAddress A WHERE A address %(addr)s', - {'addr': ldapemailaddr}) - if emailrset: - execute('SET U use_email X WHERE ' - 'X eid %(x)s, U eid %(u)s', - {'x': emailrset[0][0], 'u': eid}) - elif rset: - if not execute('SET X address %(addr)s WHERE ' - 'U primary_email X, U eid %(u)s', - {'addr': ldapemailaddr, 'u': eid}): - execute('SET X address %(addr)s WHERE ' - 'X eid %(x)s', - {'addr': ldapemailaddr, 'x': rset[0][0]}) - else: - # no email found, create it - _insert_email(session, ldapemailaddr, eid) - session.commit() - - def ldap_name(self, var): - if var.stinfo['relations']: - relname = iter(var.stinfo['relations']).next().r_type - return self.user_rev_attrs.get(relname) - return None - - def prepare_columns(self, mainvars, rqlst): - """return two list describing how to build the final results - from the result of an ldap search (ie a list of dictionary) - """ - columns = [] - global_transforms = [] - for i, term in enumerate(rqlst.selection): - if isinstance(term, Constant): - columns.append(term) - continue - if isinstance(term, Function): # LOWER, UPPER, COUNT... - var = term.get_nodes(VariableRef)[0] - var = var.variable - try: - mainvar = var.stinfo['attrvar'].name - except AttributeError: # no attrvar set - mainvar = var.name - assert mainvar in mainvars - trname = term.name - ldapname = self.ldap_name(var) - if trname in ('COUNT', 'MIN', 'MAX', 'SUM'): - global_transforms.append(GlobTrFunc(trname, i, ldapname)) - columns.append((mainvar, ldapname)) - continue - if trname in ('LOWER', 'UPPER'): - columns.append((mainvar, TrFunc(trname, i, ldapname))) - continue - raise NotImplementedError('no support for %s function' % trname) - if term.name in mainvars: - columns.append((term.name, 'dn')) - continue - var = term.variable - mainvar = var.stinfo['attrvar'].name - columns.append((mainvar, self.ldap_name(var))) - #else: - # # probably a bug in rql splitting if we arrive here - # raise NotImplementedError - return columns, global_transforms - - def syntax_tree_search(self, session, union, - args=None, cachekey=None, varmap=None, debug=0): - """return result from this source for a rql query (actually from a rql - syntax tree and a solution dictionary mapping each used variable to a - possible type). If cachekey is given, the query necessary to fetch the - results (but not the results themselves) may be cached using this key. - """ - self.debug('ldap syntax tree search') - # XXX not handled : transform/aggregat function, join on multiple users... - assert len(union.children) == 1, 'union not supported' - rqlst = union.children[0] - assert not rqlst.with_, 'subquery not supported' - rqlkey = rqlst.as_string(kwargs=args) - try: - results = self._query_cache[rqlkey] - except KeyError: - try: - results = self.rqlst_search(session, rqlst, args) - self._query_cache[rqlkey] = results - except ldap.SERVER_DOWN: - # cant connect to server - msg = session._("can't connect to source %s, some data may be missing") - session.set_shared_data('sources_error', msg % self.uri, txdata=True) - return [] - return results - - def rqlst_search(self, session, rqlst, args): - mainvars = [] - for varname in rqlst.defined_vars: - for sol in rqlst.solutions: - if sol[varname] == 'CWUser': - mainvars.append(varname) - break - assert mainvars, rqlst - columns, globtransforms = self.prepare_columns(mainvars, rqlst) - eidfilters = [lambda x: x > 0] - allresults = [] - generator = RQL2LDAPFilter(self, session, args, mainvars) - for mainvar in mainvars: - # handle restriction - try: - eidfilters_, ldapfilter = generator.generate(rqlst, mainvar) - except GotDN as ex: - assert ex.dn, 'no dn!' - try: - res = [self._cache[ex.dn]] - except KeyError: - res = self._search(session, ex.dn, BASE) - except UnknownEid as ex: - # raised when we are looking for the dn of an eid which is not - # coming from this source - res = [] - else: - eidfilters += eidfilters_ - res = self._search(session, self.user_base_dn, - self.user_base_scope, ldapfilter) - allresults.append(res) - # 1. get eid for each dn and filter according to that eid if necessary - for i, res in enumerate(allresults): - filteredres = [] - for resdict in res: - # get sure the entity exists in the system table - eid = self.repo.extid2eid(self, resdict['dn'], 'CWUser', session) - for eidfilter in eidfilters: - if not eidfilter(eid): - break - else: - resdict['eid'] = eid - filteredres.append(resdict) - allresults[i] = filteredres - # 2. merge result for each "mainvar": cartesian product - allresults = cartesian_product(allresults) - # 3. build final result according to column definition - result = [] - for rawline in allresults: - rawline = dict(zip(mainvars, rawline)) - line = [] - for varname, ldapname in columns: - if ldapname is None: - value = None # no mapping available - elif ldapname == 'dn': - value = rawline[varname]['eid'] - elif isinstance(ldapname, Constant): - if ldapname.type == 'Substitute': - value = args[ldapname.value] - else: - value = ldapname.value - elif isinstance(ldapname, TrFunc): - value = ldapname.apply(rawline[varname]) - else: - value = rawline[varname].get(ldapname) - line.append(value) - result.append(line) - for trfunc in globtransforms: - result = trfunc.apply(result) - #print '--> ldap result', result - return result - - def _process_ldap_item(self, dn, iterator): - itemdict = super(LDAPUserSource, self)._process_ldap_item(dn, iterator) - self._cache[dn] = itemdict - return itemdict - - def _process_no_such_object(self, session, dn): - eid = self.repo.extid2eid(self, dn, 'CWUser', session, insert=False) - if eid: - self.warning('deleting ldap user with eid %s and dn %s', eid, dn) - entity = session.entity_from_eid(eid, 'CWUser') - self.repo.delete_info(session, entity, self.uri) - self.reset_caches() - - def before_entity_insertion(self, session, lid, etype, eid, sourceparams): - """called by the repository when an eid has been attributed for an - entity stored here but the entity has not been inserted in the system - table yet. - - This method must return the an Entity instance representation of this - entity. - """ - self.debug('ldap before entity insertion') - entity = super(LDAPUserSource, self).before_entity_insertion( - session, lid, etype, eid, sourceparams) - res = self._search(session, lid, BASE)[0] - for attr in entity.e_schema.indexable_attributes(): - entity.cw_edited[attr] = res[self.user_rev_attrs[attr]] - return entity - - def after_entity_insertion(self, session, lid, entity, sourceparams): - """called by the repository after an entity stored here has been - inserted in the system table. - """ - self.debug('ldap after entity insertion') - super(LDAPUserSource, self).after_entity_insertion( - session, lid, entity, sourceparams) - for group in self.user_default_groups: - session.execute('SET X in_group G WHERE X eid %(x)s, G name %(group)s', - {'x': entity.eid, 'group': group}) - # search for existant email first - try: - # lid = dn - emailaddr = self._cache[lid][self.user_rev_attrs['email']] - except KeyError: - return - if isinstance(emailaddr, list): - emailaddr = emailaddr[0] # XXX consider only the first email in the list - rset = session.execute('EmailAddress X WHERE X address %(addr)s', - {'addr': emailaddr}) - if rset: - session.execute('SET U primary_email X WHERE U eid %(u)s, X eid %(x)s', - {'x': rset[0][0], 'u': entity.eid}) - else: - # not found, create it - _insert_email(session, emailaddr, entity.eid) - - def update_entity(self, session, entity): - """replace an entity in the source""" - raise RepositoryError('this source is read only') - - def delete_entity(self, session, entity): - """delete an entity from the source""" - raise RepositoryError('this source is read only') - - -def _insert_email(session, emailaddr, ueid): - session.execute('INSERT EmailAddress X: X address %(addr)s, U primary_email X ' - 'WHERE U eid %(x)s', {'addr': emailaddr, 'x': ueid}) - -class GotDN(Exception): - """exception used when a dn localizing the searched user has been found""" - def __init__(self, dn): - self.dn = dn - - -class RQL2LDAPFilter(object): - """generate an LDAP filter for a rql query""" - def __init__(self, source, session, args=None, mainvars=()): - self.source = source - self.repo = source.repo - self._ldap_attrs = source.user_rev_attrs - self._base_filters = source.base_filters - self._session = session - if args is None: - args = {} - self._args = args - self.mainvars = mainvars - - def generate(self, selection, mainvarname): - self._filters = res = self._base_filters[:] - self._mainvarname = mainvarname - self._eidfilters = [] - self._done_not = set() - restriction = selection.where - if isinstance(restriction, Relation): - # only a single relation, need to append result here (no AND/OR) - filter = restriction.accept(self) - if filter is not None: - res.append(filter) - elif restriction: - restriction.accept(self) - if len(res) > 1: - return self._eidfilters, '(&%s)' % ''.join(res) - return self._eidfilters, res[0] - - def visit_and(self, et): - """generate filter for a AND subtree""" - for c in et.children: - part = c.accept(self) - if part: - self._filters.append(part) - - def visit_or(self, ou): - """generate filter for a OR subtree""" - res = [] - for c in ou.children: - part = c.accept(self) - if part: - res.append(part) - if res: - if len(res) > 1: - part = '(|%s)' % ''.join(res) - else: - part = res[0] - self._filters.append(part) - - def visit_not(self, node): - """generate filter for a OR subtree""" - part = node.children[0].accept(self) - if part: - self._filters.append('(!(%s))'% part) - - def visit_relation(self, relation): - """generate filter for a relation""" - rtype = relation.r_type - # don't care of type constraint statement (i.e. relation_type = 'is') - if rtype == 'is': - return '' - lhs, rhs = relation.get_parts() - # attribute relation - if self.source.schema.rschema(rtype).final: - # dunno what to do here, don't pretend anything else - if lhs.name != self._mainvarname: - if lhs.name in self.mainvars: - # XXX check we don't have variable as rhs - return - raise NotImplementedError - rhs_vars = rhs.get_nodes(VariableRef) - if rhs_vars: - if len(rhs_vars) > 1: - raise NotImplementedError - # selected variable, nothing to do here - return - # no variables in the RHS - if isinstance(rhs.children[0], Function): - res = rhs.children[0].accept(self) - elif rtype != 'has_text': - res = self._visit_attribute_relation(relation) - else: - raise NotImplementedError(relation) - # regular relation XXX todo: in_group - else: - raise NotImplementedError(relation) - return res - - def _visit_attribute_relation(self, relation): - """generate filter for an attribute relation""" - lhs, rhs = relation.get_parts() - lhsvar = lhs.variable - if relation.r_type == 'eid': - # XXX hack - # skip comparison sign - eid = int(rhs.children[0].accept(self)) - if relation.neged(strict=True): - self._done_not.add(relation.parent) - self._eidfilters.append(lambda x: not x == eid) - return - if rhs.operator != '=': - filter = {'>': lambda x: x > eid, - '>=': lambda x: x >= eid, - '<': lambda x: x < eid, - '<=': lambda x: x <= eid, - }[rhs.operator] - self._eidfilters.append(filter) - return - dn = self.repo.eid2extid(self.source, eid, self._session) - raise GotDN(dn) - try: - filter = '(%s%s)' % (self._ldap_attrs[relation.r_type], - rhs.accept(self)) - except KeyError: - # unsupported attribute - self.source.warning('%s source can\'t handle relation %s, no ' - 'results will be returned from this source', - self.source.uri, relation) - raise UnknownEid # trick to return no result - return filter - - def visit_comparison(self, cmp): - """generate filter for a comparaison""" - return '%s%s'% (cmp.operator, cmp.children[0].accept(self)) - - def visit_mathexpression(self, mexpr): - """generate filter for a mathematic expression""" - raise NotImplementedError - - def visit_function(self, function): - """generate filter name for a function""" - if function.name == 'IN': - return self.visit_in(function) - raise NotImplementedError - - def visit_in(self, function): - grandpapa = function.parent.parent - ldapattr = self._ldap_attrs[grandpapa.r_type] - res = [] - for c in function.children: - part = c.accept(self) - if part: - res.append(part) - if res: - if len(res) > 1: - part = '(|%s)' % ''.join('(%s=%s)' % (ldapattr, v) for v in res) - else: - part = '(%s=%s)' % (ldapattr, res[0]) - return part - - def visit_constant(self, constant): - """generate filter name for a constant""" - value = constant.value - if constant.type is None: - raise NotImplementedError - if constant.type == 'Date': - raise NotImplementedError - #value = self.keyword_map[value]() - elif constant.type == 'Substitute': - value = self._args[constant.value] - else: - value = constant.value - if isinstance(value, unicode): - value = value.encode('utf8') - else: - value = str(value) - return escape_filter_chars(value) - - def visit_variableref(self, variableref): - """get the sql name for a variable reference""" - pass - diff -r fdd7f614ca2a -r 0dcc68dd8458 server/test/unittest_ldapsource.py --- a/server/test/unittest_ldapsource.py Thu Oct 03 15:37:45 2013 +0200 +++ b/server/test/unittest_ldapsource.py Fri Oct 04 14:28:50 2013 +0200 @@ -33,7 +33,6 @@ from cubicweb.devtools.httptest import get_available_port from cubicweb.devtools import get_test_db_handler -from cubicweb.server.sources.ldapuser import GlobTrFunc, UnknownEid, RQL2LDAPFilter CONFIG_LDAPFEED = u''' user-base-dn=ou=People,dc=cubicweb,dc=test @@ -453,386 +452,6 @@ self.setUpClass() -class LDAPUserSourceTC(LDAPFeedTestBase): - test_db_id = 'ldap-user' - tags = CubicWebTC.tags | Tags(('ldap')) - - @classmethod - def pre_setup_database(cls, session, config): - session.create_entity('CWSource', name=u'ldap', type=u'ldapuser', - url=URL, config=CONFIG_LDAPUSER) - session.commit() - # XXX keep it there - session.execute('CWUser U') - - def setup_database(self): - # XXX a traceback may appear in the logs of the test due to - # the _init_repo method that may fail to connect to the ldap - # source if its URI has changed (from what is stored in the - # database). This TB is NOT a failure or so. - with self.session.repo.internal_session(safe=True) as session: - session.execute('SET S url %(url)s, S config %(conf)s ' - 'WHERE S is CWSource, S name "ldap"', - {"conf": CONFIG_LDAPUSER, 'url': URL} ) - session.commit() - self.pull() - - def assertMetadata(self, entity): - self.assertEqual(entity.creation_date, None) - self.assertEqual(entity.modification_date, None) - - def test_synchronize(self): - source = self.repo.sources_by_uri['ldap'] - source.synchronize() - - def test_base(self): - # check a known one - rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'}) - e = rset.get_entity(0, 0) - self.assertEqual(e.login, 'syt') - e.complete() - self.assertMetadata(e) - self.assertEqual(e.firstname, None) - self.assertEqual(e.surname, None) - self.assertEqual(e.in_group[0].name, 'users') - self.assertEqual(e.owned_by[0].login, 'syt') - self.assertEqual(e.created_by, ()) - addresses = [pe.address for pe in e.use_email] - addresses.sort() - # should habe two element but ldapuser seems buggy. It's going to be dropped anyway. - self.assertEqual(['sylvain.thenault@logilab.fr',], # 'syt@logilab.fr'], - addresses) - self.assertIn(e.primary_email[0].address, - ['sylvain.thenault@logilab.fr', 'syt@logilab.fr']) - # email content should be indexed on the user - rset = self.sexecute('CWUser X WHERE X has_text "thenault"') - self.assertEqual(rset.rows, [[e.eid]]) - - def test_not(self): - eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0] - rset = self.sexecute('CWUser X WHERE NOT X eid %s' % eid) - self.assert_(rset) - self.assert_(not eid in (r[0] for r in rset)) - - def test_multiple(self): - seid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0] - aeid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'adim'})[0][0] - rset = self.sexecute('CWUser X, Y WHERE X login %(syt)s, Y login %(adim)s', - {'syt': 'syt', 'adim': 'adim'}) - self.assertEqual(rset.rows, [[seid, aeid]]) - rset = self.sexecute('Any X,Y,L WHERE X login L, X login %(syt)s, Y login %(adim)s', - {'syt': 'syt', 'adim': 'adim'}) - self.assertEqual(rset.rows, [[seid, aeid, 'syt']]) - - def test_in(self): - seid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0] - aeid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'adim'})[0][0] - rset = self.sexecute('Any X,L ORDERBY L WHERE X login IN("%s", "%s"), X login L' % ('syt', 'adim')) - self.assertEqual(rset.rows, [[aeid, 'adim'], [seid, 'syt']]) - - def test_relations(self): - eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0] - rset = self.sexecute('Any X,E WHERE X is CWUser, X login L, X primary_email E') - self.assert_(eid in (r[0] for r in rset)) - rset = self.sexecute('Any X,L,E WHERE X is CWUser, X login L, X primary_email E') - self.assert_('syt' in (r[1] for r in rset)) - - def test_count(self): - nbusers = self.sexecute('Any COUNT(X) WHERE X is CWUser')[0][0] - # just check this is a possible number - self.assert_(nbusers > 1, nbusers) - self.assert_(nbusers < 30, nbusers) - - def test_upper(self): - eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0] - rset = self.sexecute('Any UPPER(L) WHERE X eid %s, X login L' % eid) - self.assertEqual(rset[0][0], 'syt'.upper()) - - def test_unknown_attr(self): - eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0] - rset = self.sexecute('Any L,C,M WHERE X eid %s, X login L, ' - 'X creation_date C, X modification_date M' % eid) - self.assertEqual(rset[0][0], 'syt') - self.assertEqual(rset[0][1], None) - self.assertEqual(rset[0][2], None) - - def test_sort(self): - logins = [l for l, in self.sexecute('Any L ORDERBY L WHERE X login L')] - self.assertEqual(logins, sorted(logins)) - - def test_lower_sort(self): - logins = [l for l, in self.sexecute('Any L ORDERBY lower(L) WHERE X login L')] - self.assertEqual(logins, sorted(logins)) - - def test_or(self): - rset = self.sexecute('DISTINCT Any X WHERE X login %(login)s OR (X in_group G, G name "managers")', - {'login': 'syt'}) - self.assertEqual(len(rset), 2, rset.rows) # syt + admin - - def test_nonregr_set_owned_by(self): - # test that when a user coming from ldap is triggering a transition - # the related TrInfo has correct owner information - self.sexecute('SET X in_group G WHERE X login %(syt)s, G name "managers"', {'syt': 'syt'}) - self.commit() - syt = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'}).get_entity(0, 0) - self.assertEqual([g.name for g in syt.in_group], ['managers', 'users']) - cnx = self.login('syt', password='syt') - cu = cnx.cursor() - adim = cu.execute('CWUser X WHERE X login %(login)s', {'login': 'adim'}).get_entity(0, 0) - iworkflowable = adim.cw_adapt_to('IWorkflowable') - iworkflowable.fire_transition('deactivate') - try: - cnx.commit() - adim.cw_clear_all_caches() - self.assertEqual(adim.in_state[0].name, 'deactivated') - trinfo = iworkflowable.latest_trinfo() - self.assertEqual(trinfo.owned_by[0].login, 'syt') - # select from_state to skip the user's creation TrInfo - rset = self.sexecute('Any U ORDERBY D DESC WHERE WF wf_info_for X,' - 'WF creation_date D, WF from_state FS,' - 'WF owned_by U?, X eid %(x)s', - {'x': adim.eid}) - self.assertEqual(rset.rows, [[syt.eid]]) - finally: - # restore db state - self.restore_connection() - adim = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'adim'}).get_entity(0, 0) - adim.cw_adapt_to('IWorkflowable').fire_transition('activate') - self.sexecute('DELETE X in_group G WHERE X login %(syt)s, G name "managers"', {'syt': 'syt'}) - - def test_same_column_names(self): - self.sexecute('Any X, Y WHERE X copain Y, X login "comme", Y login "cochon"') - - def test_multiple_entities_from_different_sources(self): - req = self.request() - self.create_user(req, 'cochon') - self.assertTrue(self.sexecute('Any X,Y WHERE X login %(syt)s, Y login "cochon"', {'syt': 'syt'})) - - def test_exists1(self): - self.session.set_cnxset() - self.session.create_entity('CWGroup', name=u'bougloup1') - self.session.create_entity('CWGroup', name=u'bougloup2') - self.sexecute('SET U in_group G WHERE G name ~= "bougloup%", U login "admin"') - self.sexecute('SET U in_group G WHERE G name = "bougloup1", U login %(syt)s', {'syt': 'syt'}) - rset = self.sexecute('Any L,SN ORDERBY L WHERE X in_state S, ' - 'S name SN, X login L, EXISTS(X in_group G, G name ~= "bougloup%")') - self.assertEqual(rset.rows, [['admin', 'activated'], ['syt', 'activated']]) - - def test_exists2(self): - req = self.request() - self.create_user(req, 'comme') - self.create_user(req, 'cochon') - self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"') - rset = self.sexecute('Any GN ORDERBY GN WHERE X in_group G, G name GN, ' - '(G name "managers" OR EXISTS(X copain T, T login in ("comme", "cochon")))') - self.assertEqual(rset.rows, [['managers'], ['users']]) - - def test_exists3(self): - req = self.request() - self.create_user(req, 'comme') - self.create_user(req, 'cochon') - self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"') - self.assertTrue(self.sexecute('Any X, Y WHERE X copain Y, X login "comme", Y login "cochon"')) - self.sexecute('SET X copain Y WHERE X login %(syt)s, Y login "cochon"', {'syt': 'syt'}) - self.assertTrue(self.sexecute('Any X, Y WHERE X copain Y, X login %(syt)s, Y login "cochon"', {'syt': 'syt'})) - rset = self.sexecute('Any GN,L WHERE X in_group G, X login L, G name GN, G name "managers" ' - 'OR EXISTS(X copain T, T login in ("comme", "cochon"))') - self.assertEqual(sorted(rset.rows), [['managers', 'admin'], ['users', 'comme'], ['users', 'syt']]) - - def test_exists4(self): - req = self.request() - self.create_user(req, 'comme') - self.create_user(req, 'cochon', groups=('users', 'guests')) - self.create_user(req, 'billy') - self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"') - self.sexecute('SET X copain Y WHERE X login "cochon", Y login "cochon"') - self.sexecute('SET X copain Y WHERE X login "comme", Y login "billy"') - self.sexecute('SET X copain Y WHERE X login %(syt)s, Y login "billy"', {'syt': 'syt'}) - # search for group name, login where - # CWUser copain with "comme" or "cochon" AND same login as the copain - # OR - # CWUser in_state activated AND not copain with billy - # - # SO we expect everybody but "comme" and "syt" - rset= self.sexecute('Any GN,L WHERE X in_group G, X login L, G name GN, ' - 'EXISTS(X copain T, T login L, T login in ("comme", "cochon")) OR ' - 'EXISTS(X in_state S, S name "activated", NOT X copain T2, T2 login "billy")') - all = self.sexecute('Any GN, L WHERE X in_group G, X login L, G name GN') - all.rows.remove(['users', 'comme']) - all.rows.remove(['users', 'syt']) - self.assertEqual(sorted(rset.rows), sorted(all.rows)) - - def test_exists5(self): - req = self.request() - self.create_user(req, 'comme') - self.create_user(req, 'cochon', groups=('users', 'guests')) - self.create_user(req, 'billy') - self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"') - self.sexecute('SET X copain Y WHERE X login "cochon", Y login "cochon"') - self.sexecute('SET X copain Y WHERE X login "comme", Y login "billy"') - self.sexecute('SET X copain Y WHERE X login %(syt)s, Y login "cochon"', {'syt': 'syt'}) - rset= self.sexecute('Any L WHERE X login L, ' - 'EXISTS(X copain T, T login in ("comme", "cochon")) AND ' - 'NOT EXISTS(X copain T2, T2 login "billy")') - self.assertEqual(sorted(rset.rows), [['cochon'], ['syt']]) - rset= self.sexecute('Any GN,L WHERE X in_group G, X login L, G name GN, ' - 'EXISTS(X copain T, T login in ("comme", "cochon")) AND ' - 'NOT EXISTS(X copain T2, T2 login "billy")') - self.assertEqual(sorted(rset.rows), [['guests', 'cochon'], - ['users', 'cochon'], - ['users', 'syt']]) - - def test_cd_restriction(self): - rset = self.sexecute('CWUser X WHERE X creation_date > "2009-02-01"') - # admin/anon but no ldap user since it doesn't support creation_date - self.assertEqual(sorted(e.login for e in rset.entities()), - ['admin', 'anon']) - - def test_union(self): - afeids = self.sexecute('State X') - ueids = self.sexecute('CWUser X') - rset = self.sexecute('(Any X WHERE X is State) UNION (Any X WHERE X is CWUser)') - self.assertEqual(sorted(r[0] for r in rset.rows), - sorted(r[0] for r in afeids + ueids)) - - def _init_security_test(self): - req = self.request() - self.create_user(req, 'iaminguestsgrouponly', groups=('guests',)) - cnx = self.login('iaminguestsgrouponly') - return cnx.cursor() - - def test_security1(self): - cu = self._init_security_test() - rset = cu.execute('CWUser X WHERE X login %(login)s', {'login': 'syt'}) - self.assertEqual(rset.rows, []) - rset = cu.execute('Any X WHERE X login "iaminguestsgrouponly"') - self.assertEqual(len(rset.rows), 1) - - def test_security2(self): - cu = self._init_security_test() - rset = cu.execute('Any X WHERE X has_text %(syt)s', {'syt': 'syt'}) - self.assertEqual(rset.rows, []) - rset = cu.execute('Any X WHERE X has_text "iaminguestsgrouponly"') - self.assertEqual(len(rset.rows), 1) - - def test_security3(self): - cu = self._init_security_test() - rset = cu.execute('Any F WHERE X has_text %(syt)s, X firstname F', {'syt': 'syt'}) - self.assertEqual(rset.rows, []) - rset = cu.execute('Any F WHERE X has_text "iaminguestsgrouponly", X firstname F') - self.assertEqual(rset.rows, [[None]]) - - def test_nonregr1(self): - self.sexecute('Any X,AA ORDERBY AA DESC WHERE E eid %(x)s, E owned_by X, ' - 'X modification_date AA', - {'x': self.session.user.eid}) - - def test_nonregr2(self): - self.sexecute('Any X,L,AA WHERE E eid %(x)s, E owned_by X, ' - 'X login L, X modification_date AA', - {'x': self.session.user.eid}) - - def test_nonregr3(self): - self.sexecute('Any X,AA ORDERBY AA DESC WHERE E eid %(x)s, ' - 'X modification_date AA', - {'x': self.session.user.eid}) - - def test_nonregr4(self): - emaileid = self.sexecute('INSERT EmailAddress X: X address "toto@logilab.org"')[0][0] - self.sexecute('Any X,AA WHERE X use_email Y, Y eid %(x)s, X modification_date AA', - {'x': emaileid}) - - def test_nonregr5(self): - # original jpl query: - # Any X, NOW - CD, P WHERE P is Project, U interested_in P, U is CWUser, - # U login "sthenault", X concerns P, X creation_date CD ORDERBY CD DESC LIMIT 5 - rql = ('Any X, NOW - CD, P ORDERBY CD DESC LIMIT 5 WHERE P bookmarked_by U, ' - 'U login "%s", P is X, X creation_date CD') % self.session.user.login - self.sexecute(rql, )#{'x': }) - - def test_nonregr6(self): - self.sexecute('Any B,U,UL GROUPBY B,U,UL WHERE B created_by U?, B is File ' - 'WITH U,UL BEING (Any U,UL WHERE ME eid %(x)s, (EXISTS(U identity ME) ' - 'OR (EXISTS(U in_group G, G name IN("managers", "staff")))) ' - 'OR (EXISTS(U in_group H, ME in_group H, NOT H name "users")), U login UL, U is CWUser)', - {'x': self.session.user.eid}) - -class GlobTrFuncTC(TestCase): - - def test_count(self): - trfunc = GlobTrFunc('count', 0) - res = trfunc.apply([[1], [2], [3], [4]]) - self.assertEqual(res, [[4]]) - trfunc = GlobTrFunc('count', 1) - res = trfunc.apply([[1, 2], [2, 4], [3, 6], [1, 5]]) - self.assertEqual(res, [[1, 2], [2, 1], [3, 1]]) - - def test_sum(self): - trfunc = GlobTrFunc('sum', 0) - res = trfunc.apply([[1], [2], [3], [4]]) - self.assertEqual(res, [[10]]) - trfunc = GlobTrFunc('sum', 1) - res = trfunc.apply([[1, 2], [2, 4], [3, 6], [1, 5]]) - self.assertEqual(res, [[1, 7], [2, 4], [3, 6]]) - - def test_min(self): - trfunc = GlobTrFunc('min', 0) - res = trfunc.apply([[1], [2], [3], [4]]) - self.assertEqual(res, [[1]]) - trfunc = GlobTrFunc('min', 1) - res = trfunc.apply([[1, 2], [2, 4], [3, 6], [1, 5]]) - self.assertEqual(res, [[1, 2], [2, 4], [3, 6]]) - - def test_max(self): - trfunc = GlobTrFunc('max', 0) - res = trfunc.apply([[1], [2], [3], [4]]) - self.assertEqual(res, [[4]]) - trfunc = GlobTrFunc('max', 1) - res = trfunc.apply([[1, 2], [2, 4], [3, 6], [1, 5]]) - self.assertEqual(res, [[1, 5], [2, 4], [3, 6]]) - - -class RQL2LDAPFilterTC(RQLGeneratorTC): - - tags = RQLGeneratorTC.tags | Tags(('ldap')) - - @property - def schema(self): - """return the application schema""" - return self._schema - - def setUp(self): - self.handler = get_test_db_handler(LDAPUserSourceTC.config) - self.handler.build_db_cache('ldap-rqlgenerator', LDAPUserSourceTC.pre_setup_database) - self.handler.restore_database('ldap-rqlgenerator') - self._repo = repo = self.handler.get_repo() - self._schema = repo.schema - super(RQL2LDAPFilterTC, self).setUp() - ldapsource = repo.sources[-1] - self.cnxset = repo._get_cnxset() - session = mock_object(cnxset=self.cnxset) - self.o = RQL2LDAPFilter(ldapsource, session) - self.ldapclasses = ''.join(ldapsource.base_filters) - - def tearDown(self): - self._repo.turn_repo_off() - super(RQL2LDAPFilterTC, self).tearDown() - - def test_base(self): - rqlst = self._prepare('CWUser X WHERE X login "toto"').children[0] - self.assertEqual(self.o.generate(rqlst, 'X')[1], - '(&%s(uid=toto))' % self.ldapclasses) - - def test_kwargs(self): - rqlst = self._prepare('CWUser X WHERE X login %(x)s').children[0] - self.o._args = {'x': "toto"} - self.assertEqual(self.o.generate(rqlst, 'X')[1], - '(&%s(uid=toto))' % self.ldapclasses) - - def test_get_attr(self): - rqlst = self._prepare('Any X WHERE E firstname X, E eid 12').children[0] - self.assertRaises(UnknownEid, self.o.generate, rqlst, 'E') - if __name__ == '__main__': unittest_main() diff -r fdd7f614ca2a -r 0dcc68dd8458 test/unittest_mail.py --- a/test/unittest_mail.py Thu Oct 03 15:37:45 2013 +0200 +++ b/test/unittest_mail.py Fri Oct 04 14:28:50 2013 +0200 @@ -31,7 +31,7 @@ def getlogin(): - """avoid usinng os.getlogin() because of strange tty / stdin problems + """avoid using os.getlogin() because of strange tty / stdin problems (man 3 getlogin) Another solution would be to use $LOGNAME, $USER or $USERNAME """ diff -r fdd7f614ca2a -r 0dcc68dd8458 web/formfields.py --- a/web/formfields.py Thu Oct 03 15:37:45 2013 +0200 +++ b/web/formfields.py Fri Oct 04 14:28:50 2013 +0200 @@ -760,8 +760,13 @@ # raise UnmodifiedField instead of returning None, since the later # will try to remove already attached file if any raise UnmodifiedField() - # value is a 2-uple (filename, stream) + # value is a 2-uple (filename, stream) or a list of such + # tuples (multiple files) try: + if isinstance(value, list): + value = value[0] + form.warning('mutiple files provided, however ' + 'only the first will be picked') filename, stream = value except ValueError: raise UnmodifiedField() diff -r fdd7f614ca2a -r 0dcc68dd8458 web/test/data/views.py --- a/web/test/data/views.py Thu Oct 03 15:37:45 2013 +0200 +++ b/web/test/data/views.py Fri Oct 04 14:28:50 2013 +0200 @@ -18,6 +18,7 @@ from cubicweb.web import Redirect from cubicweb.web.application import CubicWebPublisher +from cubicweb.web.views.ajaxcontroller import ajaxfunc # proof of concept : monkey patch handle method so that if we are in an # anonymous session and __fblogin is found is req.form, the user with the @@ -40,5 +41,35 @@ assert req.user.login == login return orig_handle(self, req, path) + +def _recursive_replace_stream_by_content(tree): + """ Search for streams (i.e. object that have a 'read' method) in a tree + (which branches are lists or tuples), and substitute them by their content, + leaving other leafs identical. A copy of the tree with only lists as + branches is returned. + """ + if not isinstance(tree, (list, tuple)): + if hasattr(tree, 'read'): + return tree.read() + return tree + else: + return [_recursive_replace_stream_by_content(value) + for value in tree] + + +@ajaxfunc(output_type='json') +def fileupload(self): + """ Return a json copy of the web request formin which uploaded files + are read and their content substitute the received streams. + """ + try: + result_dict = {} + for key, value in self._cw.form.iteritems(): + result_dict[key] = _recursive_replace_stream_by_content(value) + return result_dict + except Exception, ex: + import traceback as tb + tb.print_exc(ex) + orig_handle = CubicWebPublisher.main_handle_request CubicWebPublisher.main_handle_request = auto_login_handle_request diff -r fdd7f614ca2a -r 0dcc68dd8458 web/test/unittest_web.py --- a/web/test/unittest_web.py Thu Oct 03 15:37:45 2013 +0200 +++ b/web/test/unittest_web.py Fri Oct 04 14:28:50 2013 +0200 @@ -16,7 +16,17 @@ # You should have received a copy of the GNU Lesser General Public License along # with CubicWeb. If not, see . +from json import loads +from os.path import join + +try: + import requests + assert [int(n) for n in requests.__version__.split('.', 2)][:2] >= [1, 2] +except (ImportError, AssertionError): + requests = None + from logilab.common.testlib import TestCase, unittest_main +from cubicweb.devtools.httptest import CubicWebServerTC from cubicweb.devtools.fake import FakeRequest class AjaxReplaceUrlTC(TestCase): @@ -43,5 +53,45 @@ (cbname, qs, req.pageid), req.html_headers.post_inlined_scripts[0]) + +class FileUploadTC(CubicWebServerTC): + + def setUp(self): + "Skip whole test class if a suitable requests module is not available" + if requests is None: + self.skipTest('Python ``requests`` module is not available') + super(FileUploadTC, self).setUp() + + @property + def _post_url(self): + return self.request().build_url('ajax', fname='fileupload') + + def _fobject(self, fname): + return open(join(self.datadir, fname), 'rb') + + def _fcontent(self, fname): + return self._fobject(fname).read() + + def test_single_file_upload(self): + files = {'file': ('schema.py', self._fobject('schema.py'))} + webreq = requests.post(self._post_url, files=files) + # check backward compat : a single uploaded file leads to a single + # 2-uple in the request form + expect = {'fname': u'fileupload', + 'file': ['schema.py', self._fcontent('schema.py')]} + self.assertEqual(webreq.status_code, 200) + self.assertDictEqual(expect, loads(webreq.content)) + + def test_multiple_file_upload(self): + files = [('files', ('schema.py', self._fobject('schema.py'))), + ('files', ('views.py', self._fobject('views.py')))] + webreq = requests.post(self._post_url, files=files,) + expect = {'fname': u'fileupload', + 'files': [['schema.py', self._fcontent('schema.py')], + ['views.py', self._fcontent('views.py')]],} + self.assertEqual(webreq.status_code, 200) + self.assertDictEqual(expect, loads(webreq.content)) + + if __name__ == '__main__': unittest_main() diff -r fdd7f614ca2a -r 0dcc68dd8458 web/views/cwuser.py --- a/web/views/cwuser.py Thu Oct 03 15:37:45 2013 +0200 +++ b/web/views/cwuser.py Fri Oct 04 14:28:50 2013 +0200 @@ -189,8 +189,8 @@ __select__ = StartupView.__select__ & match_user_groups('managers') cache_max_age = 0 # disable caching # XXX one could wish to display for instance only user's firstname/surname - # for non managers but filtering out NULL cause crash with an ldapuser - # source. + # for non managers but filtering out NULL caused crash with an ldapuser + # source. The ldapuser source has been dropped and this code can be updated. rql = ('Any U,US,F,S,U,UAA,UDS, L,UAA,USN,UDSN ORDERBY L WHERE U is CWUser, ' 'U login L, U firstname F, U surname S, ' 'U in_state US, US name USN, ' diff -r fdd7f614ca2a -r 0dcc68dd8458 web/views/sessions.py --- a/web/views/sessions.py Thu Oct 03 15:37:45 2013 +0200 +++ b/web/views/sessions.py Fri Oct 04 14:28:50 2013 +0200 @@ -100,8 +100,6 @@ def _update_last_login_time(self, req): # XXX should properly detect missing permission / non writeable source # and avoid "except (RepositoryError, Unauthorized)" below - if req.user.cw_metainformation()['source']['type'] == 'ldapuser': - return try: req.execute('SET X last_login_time NOW WHERE X eid %(x)s', {'x' : req.user.eid})