--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/doc/3.18.rst Fri Oct 04 14:28:50 2013 +0200
@@ -0,0 +1,26 @@
+What's new in CubicWeb 3.18?
+============================
+
+New functionalities
+--------------------
+
+* add a security debugging tool
+ (see `#2920304 <http://www.cubicweb.org/2920304>`_)
+
+
+API changes
+-----------
+
+
+
+Deprecation
+---------------------
+
+
+
+Deprecated Code Drops
+----------------------
+
+* ``ldapuser`` have been dropped; use ``ldapfeed`` now
+ (see `#2936496 <http://www.cubicweb.org/2936496>`_)
+
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/doc/4.0.rst Fri Oct 04 14:28:50 2013 +0200
@@ -0,0 +1,8 @@
+What's new in CubicWeb 4.0?
+============================
+
+Deprecated Code Drops
+----------------------
+
+* The ldapuser source has been dropped. ldapfeed is the only official source
+ remaining for ldap.
--- a/etwist/request.py Thu Oct 03 15:37:45 2013 +0200
+++ b/etwist/request.py Fri Oct 04 14:28:50 2013 +0200
@@ -24,15 +24,21 @@
class CubicWebTwistedRequestAdapter(CubicWebRequestBase):
+ """ from twisted .req to cubicweb .form
+ req.files are put into .form[<filefield>]
+ """
def __init__(self, req, vreg, https):
self._twreq = req
super(CubicWebTwistedRequestAdapter, self).__init__(
vreg, https, req.args, headers=req.received_headers)
- for key, (name, stream) in req.files.iteritems():
- if name is None:
- self.form[key] = (name, stream)
- else:
- self.form[key] = (unicode(name, self.encoding), stream)
+ for key, name_stream_list in req.files.iteritems():
+ for name, stream in name_stream_list:
+ if name is not None:
+ name = unicode(name, self.encoding)
+ self.form.setdefault(key, []).append((name, stream))
+ # 3.16.4 backward compat
+ if len(self.form[key]) == 1:
+ self.form[key] = self.form[key][0]
self.content = self._twreq.content # stream
def http_method(self):
--- a/etwist/server.py Thu Oct 03 15:37:45 2013 +0200
+++ b/etwist/server.py Fri Oct 04 14:28:50 2013 +0200
@@ -244,7 +244,6 @@
self._do_process_multipart = True
self.process()
-
@monkeypatch(http.Request)
def process_multipart(self):
if not self._do_process_multipart:
@@ -254,16 +253,17 @@
keep_blank_values=1,
strict_parsing=1)
for key in form:
- value = form[key]
- if isinstance(value, list):
- self.args[key] = [v.value for v in value]
- elif value.filename:
- if value.done != -1: # -1 is transfer has been interrupted
- self.files[key] = (value.filename, value.file)
+ values = form[key]
+ if not isinstance(values, list):
+ values = [values]
+ for value in values:
+ if value.filename:
+ if value.done != -1: # -1 is transfer has been interrupted
+ self.files.setdefault(key, []).append((value.filename, value.file))
+ else:
+ self.files.setdefault(key, []).append((None, None))
else:
- self.files[key] = (None, None)
- else:
- self.args[key] = value.value
+ self.args.setdefault(key, []).append(value.value)
from logging import getLogger
from cubicweb import set_log_methods
--- a/schema.py Thu Oct 03 15:37:45 2013 +0200
+++ b/schema.py Fri Oct 04 14:28:50 2013 +0200
@@ -44,6 +44,15 @@
import cubicweb
from cubicweb import ETYPE_NAME_MAP, ValidationError, Unauthorized
+try:
+ from cubicweb import server
+except ImportError:
+ # We need to lookup DEBUG from there,
+ # however a pure dbapi client may not have it.
+ class server(object): pass
+ server.DEBUG = False
+
+
PURE_VIRTUAL_RTYPES = set(('identity', 'has_text',))
VIRTUAL_RTYPES = set(('eid', 'identity', 'has_text',))
@@ -268,13 +277,25 @@
return False
PermissionMixIn.has_perm = has_perm
+
def check_perm(self, _cw, action, **kwargs):
# NB: _cw may be a server transaction or a request object.
#
# check user is in an allowed group, if so that's enough internal
# transactions should always stop there
+ DBG = False
+ if server.DEBUG & server.DBG_SEC:
+ if action in server._SECURITY_CAPS:
+ _self_str = str(self)
+ if server._SECURITY_ITEMS:
+ if any(item in _self_str for item in server._SECURITY_ITEMS):
+ DBG = True
+ else:
+ DBG = True
groups = self.get_groups(action)
if _cw.user.matching_groups(groups):
+ if DBG:
+ print 'check_perm: %r %r: user matches %s' % (action, _self_str, groups)
return
# if 'owners' in allowed groups, check if the user actually owns this
# object, if so that's enough
@@ -284,8 +305,15 @@
if 'owners' in groups and (
kwargs.get('creating')
or ('eid' in kwargs and _cw.user.owns(kwargs['eid']))):
+ if DBG:
+ print ('check_perm: %r %r: user is owner or creation time' %
+ (action, _self_str))
return
# else if there is some rql expressions, check them
+ if DBG:
+ print ('check_perm: %r %r %s' %
+ (action, _self_str, [(rqlexpr, kwargs, rqlexpr.check(_cw, **kwargs))
+ for rqlexpr in self.get_rqlexprs(action)]))
if any(rqlexpr.check(_cw, **kwargs)
for rqlexpr in self.get_rqlexprs(action)):
return
--- a/server/__init__.py Thu Oct 03 15:37:45 2013 +0200
+++ b/server/__init__.py Fri Oct 04 14:28:50 2013 +0200
@@ -26,6 +26,7 @@
import sys
from os.path import join, exists
from glob import glob
+from contextlib import contextmanager
from logilab.common.modutils import LazyObject
from logilab.common.textutils import splitstrip
@@ -80,14 +81,57 @@
DBG_HOOKS = 16
#: operations
DBG_OPS = 32
+#: security
+DBG_SEC = 64
#: more verbosity
-DBG_MORE = 64
+DBG_MORE = 128
#: all level enabled
-DBG_ALL = DBG_RQL + DBG_SQL + DBG_REPO + DBG_MS + DBG_HOOKS + DBG_OPS + DBG_MORE
+DBG_ALL = DBG_RQL + DBG_SQL + DBG_REPO + DBG_MS + DBG_HOOKS + DBG_OPS + DBG_SEC + DBG_MORE
+
+_SECURITY_ITEMS = []
+_SECURITY_CAPS = ['read', 'add', 'update', 'delete']
#: current debug mode
DEBUG = 0
+@contextmanager
+def tunesecurity(items=(), capabilities=()):
+ """Context manager to use in conjunction with DBG_SEC.
+
+ This allows some tuning of:
+ * the monitored capabilities ('read', 'add', ....)
+ * the object being checked by the security checkers
+
+ When no item is given, all of them will be watched.
+ By default all capabilities are monitored, unless specified.
+
+ Example use::
+
+ from cubicweb.server import debugged, DBG_SEC, tunesecurity
+ with debugged(DBG_SEC):
+ with tunesecurity(items=('Elephant', 'trumps'),
+ capabilities=('update', 'delete')):
+ babar.cw_set(trumps=celeste)
+ flore.cw_delete()
+
+ ==>
+
+ check_perm: 'update' 'relation Elephant.trumps.Elephant'
+ [(ERQLExpression(Any X WHERE U has_update_permission X, X eid %(x)s, U eid %(u)s),
+ {'eid': 2167}, True)]
+ check_perm: 'delete' 'Elephant'
+ [(ERQLExpression(Any X WHERE U has_delete_permission X, X eid %(x)s, U eid %(u)s),
+ {'eid': 2168}, True)]
+
+ """
+ olditems = _SECURITY_ITEMS[:]
+ _SECURITY_ITEMS.extend(list(items))
+ oldactions = _SECURITY_CAPS[:]
+ _SECURITY_CAPS[:] = capabilities
+ yield
+ _SECURITY_ITEMS[:] = olditems
+ _SECURITY_CAPS[:] = oldactions
+
def set_debug(debugmode):
"""change the repository debugging mode"""
global DEBUG
@@ -305,7 +349,6 @@
SOURCE_TYPES = {'native': LazyObject('cubicweb.server.sources.native', 'NativeSQLSource'),
'datafeed': LazyObject('cubicweb.server.sources.datafeed', 'DataFeedSource'),
'ldapfeed': LazyObject('cubicweb.server.sources.ldapfeed', 'LDAPFeedSource'),
- 'ldapuser': LazyObject('cubicweb.server.sources.ldapuser', 'LDAPUserSource'),
'pyrorql': LazyObject('cubicweb.server.sources.pyrorql', 'PyroRQLSource'),
'zmqrql': LazyObject('cubicweb.server.sources.zmqrql', 'ZMQRQLSource'),
}
--- a/server/sources/ldapuser.py Thu Oct 03 15:37:45 2013 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,551 +0,0 @@
-# copyright 2003-2013 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
-# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
-#
-# This file is part of CubicWeb.
-#
-# CubicWeb is free software: you can redistribute it and/or modify it under the
-# terms of the GNU Lesser General Public License as published by the Free
-# Software Foundation, either version 2.1 of the License, or (at your option)
-# any later version.
-#
-# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
-# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
-# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
-# details.
-#
-# You should have received a copy of the GNU Lesser General Public License along
-# with CubicWeb. If not, see <http://www.gnu.org/licenses/>.
-"""cubicweb ldap user source
-
-this source is for now limited to a read-only CWUser source
-"""
-from __future__ import division, with_statement
-from base64 import b64decode
-
-import ldap
-from ldap.filter import escape_filter_chars
-
-from rql.nodes import Relation, VariableRef, Constant, Function
-
-import warnings
-from cubicweb import UnknownEid, RepositoryError
-from cubicweb.server import ldaputils
-from cubicweb.server.utils import cartesian_product
-from cubicweb.server.sources import (AbstractSource, TrFunc, GlobTrFunc,
- TimedCache)
-
-# search scopes
-BASE = ldap.SCOPE_BASE
-ONELEVEL = ldap.SCOPE_ONELEVEL
-SUBTREE = ldap.SCOPE_SUBTREE
-
-# map ldap protocol to their standard port
-PROTO_PORT = {'ldap': 389,
- 'ldaps': 636,
- 'ldapi': None,
- }
-
-
-# module is lazily imported
-warnings.warn('Imminent drop of ldapuser. Switch to ldapfeed now!',
- DeprecationWarning)
-
-
-class LDAPUserSource(ldaputils.LDAPSourceMixIn, AbstractSource):
- """LDAP read-only CWUser source"""
- support_entities = {'CWUser': False}
-
- options = ldaputils.LDAPSourceMixIn.options + (
-
- ('synchronization-interval',
- {'type' : 'time',
- 'default': '1d',
- 'help': 'interval between synchronization with the ldap \
-directory (default to once a day).',
- 'group': 'ldap-source', 'level': 3,
- }),
- ('cache-life-time',
- {'type' : 'time',
- 'default': '2h',
- 'help': 'life time of query cache (default to two hours).',
- 'group': 'ldap-source', 'level': 3,
- }),
-
- )
-
- def update_config(self, source_entity, typedconfig):
- """update configuration from source entity. `typedconfig` is config
- properly typed with defaults set
- """
- super(LDAPUserSource, self).update_config(source_entity, typedconfig)
- self._interval = typedconfig['synchronization-interval']
- self._cache_ttl = max(71, typedconfig['cache-life-time'])
- self.reset_caches()
- # XXX copy from datafeed source
- if source_entity is not None:
- self._entity_update(source_entity)
- self.config = typedconfig
- # /end XXX
-
- def reset_caches(self):
- """method called during test to reset potential source caches"""
- self._cache = {}
- self._query_cache = TimedCache(self._cache_ttl)
-
- def init(self, activated, source_entity):
- """method called by the repository once ready to handle request"""
- super(LDAPUserSource, self).init(activated, source_entity)
- if activated:
- self.info('ldap init')
- # set minimum period of 5min 1s (the additional second is to
- # minimize resonnance effet)
- if self.user_rev_attrs['email']:
- self.repo.looping_task(max(301, self._interval), self.synchronize)
- self.repo.looping_task(self._cache_ttl // 10,
- self._query_cache.clear_expired)
-
- def synchronize(self):
- with self.repo.internal_session() as session:
- self.pull_data(session)
-
- def pull_data(self, session, force=False, raise_on_error=False):
- """synchronize content known by this repository with content in the
- external repository
- """
- self.info('synchronizing ldap source %s', self.uri)
- ldap_emailattr = self.user_rev_attrs['email']
- assert ldap_emailattr
- execute = session.execute
- cursor = session.system_sql("SELECT eid, extid FROM entities WHERE "
- "source='%s'" % self.uri)
- for eid, b64extid in cursor.fetchall():
- extid = b64decode(b64extid)
- self.debug('ldap eid %s', eid)
- # if no result found, _search automatically delete entity information
- res = self._search(session, extid, BASE)
- self.debug('ldap search %s', res)
- if res:
- ldapemailaddr = res[0].get(ldap_emailattr)
- if ldapemailaddr:
- if isinstance(ldapemailaddr, list):
- ldapemailaddr = ldapemailaddr[0] # XXX consider only the first email in the list
- rset = execute('Any X,A WHERE '
- 'X address A, U use_email X, U eid %(u)s',
- {'u': eid})
- ldapemailaddr = unicode(ldapemailaddr)
- for emaileid, emailaddr, in rset:
- if emailaddr == ldapemailaddr:
- break
- else:
- self.debug('updating email address of user %s to %s',
- extid, ldapemailaddr)
- emailrset = execute('EmailAddress A WHERE A address %(addr)s',
- {'addr': ldapemailaddr})
- if emailrset:
- execute('SET U use_email X WHERE '
- 'X eid %(x)s, U eid %(u)s',
- {'x': emailrset[0][0], 'u': eid})
- elif rset:
- if not execute('SET X address %(addr)s WHERE '
- 'U primary_email X, U eid %(u)s',
- {'addr': ldapemailaddr, 'u': eid}):
- execute('SET X address %(addr)s WHERE '
- 'X eid %(x)s',
- {'addr': ldapemailaddr, 'x': rset[0][0]})
- else:
- # no email found, create it
- _insert_email(session, ldapemailaddr, eid)
- session.commit()
-
- def ldap_name(self, var):
- if var.stinfo['relations']:
- relname = iter(var.stinfo['relations']).next().r_type
- return self.user_rev_attrs.get(relname)
- return None
-
- def prepare_columns(self, mainvars, rqlst):
- """return two list describing how to build the final results
- from the result of an ldap search (ie a list of dictionary)
- """
- columns = []
- global_transforms = []
- for i, term in enumerate(rqlst.selection):
- if isinstance(term, Constant):
- columns.append(term)
- continue
- if isinstance(term, Function): # LOWER, UPPER, COUNT...
- var = term.get_nodes(VariableRef)[0]
- var = var.variable
- try:
- mainvar = var.stinfo['attrvar'].name
- except AttributeError: # no attrvar set
- mainvar = var.name
- assert mainvar in mainvars
- trname = term.name
- ldapname = self.ldap_name(var)
- if trname in ('COUNT', 'MIN', 'MAX', 'SUM'):
- global_transforms.append(GlobTrFunc(trname, i, ldapname))
- columns.append((mainvar, ldapname))
- continue
- if trname in ('LOWER', 'UPPER'):
- columns.append((mainvar, TrFunc(trname, i, ldapname)))
- continue
- raise NotImplementedError('no support for %s function' % trname)
- if term.name in mainvars:
- columns.append((term.name, 'dn'))
- continue
- var = term.variable
- mainvar = var.stinfo['attrvar'].name
- columns.append((mainvar, self.ldap_name(var)))
- #else:
- # # probably a bug in rql splitting if we arrive here
- # raise NotImplementedError
- return columns, global_transforms
-
- def syntax_tree_search(self, session, union,
- args=None, cachekey=None, varmap=None, debug=0):
- """return result from this source for a rql query (actually from a rql
- syntax tree and a solution dictionary mapping each used variable to a
- possible type). If cachekey is given, the query necessary to fetch the
- results (but not the results themselves) may be cached using this key.
- """
- self.debug('ldap syntax tree search')
- # XXX not handled : transform/aggregat function, join on multiple users...
- assert len(union.children) == 1, 'union not supported'
- rqlst = union.children[0]
- assert not rqlst.with_, 'subquery not supported'
- rqlkey = rqlst.as_string(kwargs=args)
- try:
- results = self._query_cache[rqlkey]
- except KeyError:
- try:
- results = self.rqlst_search(session, rqlst, args)
- self._query_cache[rqlkey] = results
- except ldap.SERVER_DOWN:
- # cant connect to server
- msg = session._("can't connect to source %s, some data may be missing")
- session.set_shared_data('sources_error', msg % self.uri, txdata=True)
- return []
- return results
-
- def rqlst_search(self, session, rqlst, args):
- mainvars = []
- for varname in rqlst.defined_vars:
- for sol in rqlst.solutions:
- if sol[varname] == 'CWUser':
- mainvars.append(varname)
- break
- assert mainvars, rqlst
- columns, globtransforms = self.prepare_columns(mainvars, rqlst)
- eidfilters = [lambda x: x > 0]
- allresults = []
- generator = RQL2LDAPFilter(self, session, args, mainvars)
- for mainvar in mainvars:
- # handle restriction
- try:
- eidfilters_, ldapfilter = generator.generate(rqlst, mainvar)
- except GotDN as ex:
- assert ex.dn, 'no dn!'
- try:
- res = [self._cache[ex.dn]]
- except KeyError:
- res = self._search(session, ex.dn, BASE)
- except UnknownEid as ex:
- # raised when we are looking for the dn of an eid which is not
- # coming from this source
- res = []
- else:
- eidfilters += eidfilters_
- res = self._search(session, self.user_base_dn,
- self.user_base_scope, ldapfilter)
- allresults.append(res)
- # 1. get eid for each dn and filter according to that eid if necessary
- for i, res in enumerate(allresults):
- filteredres = []
- for resdict in res:
- # get sure the entity exists in the system table
- eid = self.repo.extid2eid(self, resdict['dn'], 'CWUser', session)
- for eidfilter in eidfilters:
- if not eidfilter(eid):
- break
- else:
- resdict['eid'] = eid
- filteredres.append(resdict)
- allresults[i] = filteredres
- # 2. merge result for each "mainvar": cartesian product
- allresults = cartesian_product(allresults)
- # 3. build final result according to column definition
- result = []
- for rawline in allresults:
- rawline = dict(zip(mainvars, rawline))
- line = []
- for varname, ldapname in columns:
- if ldapname is None:
- value = None # no mapping available
- elif ldapname == 'dn':
- value = rawline[varname]['eid']
- elif isinstance(ldapname, Constant):
- if ldapname.type == 'Substitute':
- value = args[ldapname.value]
- else:
- value = ldapname.value
- elif isinstance(ldapname, TrFunc):
- value = ldapname.apply(rawline[varname])
- else:
- value = rawline[varname].get(ldapname)
- line.append(value)
- result.append(line)
- for trfunc in globtransforms:
- result = trfunc.apply(result)
- #print '--> ldap result', result
- return result
-
- def _process_ldap_item(self, dn, iterator):
- itemdict = super(LDAPUserSource, self)._process_ldap_item(dn, iterator)
- self._cache[dn] = itemdict
- return itemdict
-
- def _process_no_such_object(self, session, dn):
- eid = self.repo.extid2eid(self, dn, 'CWUser', session, insert=False)
- if eid:
- self.warning('deleting ldap user with eid %s and dn %s', eid, dn)
- entity = session.entity_from_eid(eid, 'CWUser')
- self.repo.delete_info(session, entity, self.uri)
- self.reset_caches()
-
- def before_entity_insertion(self, session, lid, etype, eid, sourceparams):
- """called by the repository when an eid has been attributed for an
- entity stored here but the entity has not been inserted in the system
- table yet.
-
- This method must return the an Entity instance representation of this
- entity.
- """
- self.debug('ldap before entity insertion')
- entity = super(LDAPUserSource, self).before_entity_insertion(
- session, lid, etype, eid, sourceparams)
- res = self._search(session, lid, BASE)[0]
- for attr in entity.e_schema.indexable_attributes():
- entity.cw_edited[attr] = res[self.user_rev_attrs[attr]]
- return entity
-
- def after_entity_insertion(self, session, lid, entity, sourceparams):
- """called by the repository after an entity stored here has been
- inserted in the system table.
- """
- self.debug('ldap after entity insertion')
- super(LDAPUserSource, self).after_entity_insertion(
- session, lid, entity, sourceparams)
- for group in self.user_default_groups:
- session.execute('SET X in_group G WHERE X eid %(x)s, G name %(group)s',
- {'x': entity.eid, 'group': group})
- # search for existant email first
- try:
- # lid = dn
- emailaddr = self._cache[lid][self.user_rev_attrs['email']]
- except KeyError:
- return
- if isinstance(emailaddr, list):
- emailaddr = emailaddr[0] # XXX consider only the first email in the list
- rset = session.execute('EmailAddress X WHERE X address %(addr)s',
- {'addr': emailaddr})
- if rset:
- session.execute('SET U primary_email X WHERE U eid %(u)s, X eid %(x)s',
- {'x': rset[0][0], 'u': entity.eid})
- else:
- # not found, create it
- _insert_email(session, emailaddr, entity.eid)
-
- def update_entity(self, session, entity):
- """replace an entity in the source"""
- raise RepositoryError('this source is read only')
-
- def delete_entity(self, session, entity):
- """delete an entity from the source"""
- raise RepositoryError('this source is read only')
-
-
-def _insert_email(session, emailaddr, ueid):
- session.execute('INSERT EmailAddress X: X address %(addr)s, U primary_email X '
- 'WHERE U eid %(x)s', {'addr': emailaddr, 'x': ueid})
-
-class GotDN(Exception):
- """exception used when a dn localizing the searched user has been found"""
- def __init__(self, dn):
- self.dn = dn
-
-
-class RQL2LDAPFilter(object):
- """generate an LDAP filter for a rql query"""
- def __init__(self, source, session, args=None, mainvars=()):
- self.source = source
- self.repo = source.repo
- self._ldap_attrs = source.user_rev_attrs
- self._base_filters = source.base_filters
- self._session = session
- if args is None:
- args = {}
- self._args = args
- self.mainvars = mainvars
-
- def generate(self, selection, mainvarname):
- self._filters = res = self._base_filters[:]
- self._mainvarname = mainvarname
- self._eidfilters = []
- self._done_not = set()
- restriction = selection.where
- if isinstance(restriction, Relation):
- # only a single relation, need to append result here (no AND/OR)
- filter = restriction.accept(self)
- if filter is not None:
- res.append(filter)
- elif restriction:
- restriction.accept(self)
- if len(res) > 1:
- return self._eidfilters, '(&%s)' % ''.join(res)
- return self._eidfilters, res[0]
-
- def visit_and(self, et):
- """generate filter for a AND subtree"""
- for c in et.children:
- part = c.accept(self)
- if part:
- self._filters.append(part)
-
- def visit_or(self, ou):
- """generate filter for a OR subtree"""
- res = []
- for c in ou.children:
- part = c.accept(self)
- if part:
- res.append(part)
- if res:
- if len(res) > 1:
- part = '(|%s)' % ''.join(res)
- else:
- part = res[0]
- self._filters.append(part)
-
- def visit_not(self, node):
- """generate filter for a OR subtree"""
- part = node.children[0].accept(self)
- if part:
- self._filters.append('(!(%s))'% part)
-
- def visit_relation(self, relation):
- """generate filter for a relation"""
- rtype = relation.r_type
- # don't care of type constraint statement (i.e. relation_type = 'is')
- if rtype == 'is':
- return ''
- lhs, rhs = relation.get_parts()
- # attribute relation
- if self.source.schema.rschema(rtype).final:
- # dunno what to do here, don't pretend anything else
- if lhs.name != self._mainvarname:
- if lhs.name in self.mainvars:
- # XXX check we don't have variable as rhs
- return
- raise NotImplementedError
- rhs_vars = rhs.get_nodes(VariableRef)
- if rhs_vars:
- if len(rhs_vars) > 1:
- raise NotImplementedError
- # selected variable, nothing to do here
- return
- # no variables in the RHS
- if isinstance(rhs.children[0], Function):
- res = rhs.children[0].accept(self)
- elif rtype != 'has_text':
- res = self._visit_attribute_relation(relation)
- else:
- raise NotImplementedError(relation)
- # regular relation XXX todo: in_group
- else:
- raise NotImplementedError(relation)
- return res
-
- def _visit_attribute_relation(self, relation):
- """generate filter for an attribute relation"""
- lhs, rhs = relation.get_parts()
- lhsvar = lhs.variable
- if relation.r_type == 'eid':
- # XXX hack
- # skip comparison sign
- eid = int(rhs.children[0].accept(self))
- if relation.neged(strict=True):
- self._done_not.add(relation.parent)
- self._eidfilters.append(lambda x: not x == eid)
- return
- if rhs.operator != '=':
- filter = {'>': lambda x: x > eid,
- '>=': lambda x: x >= eid,
- '<': lambda x: x < eid,
- '<=': lambda x: x <= eid,
- }[rhs.operator]
- self._eidfilters.append(filter)
- return
- dn = self.repo.eid2extid(self.source, eid, self._session)
- raise GotDN(dn)
- try:
- filter = '(%s%s)' % (self._ldap_attrs[relation.r_type],
- rhs.accept(self))
- except KeyError:
- # unsupported attribute
- self.source.warning('%s source can\'t handle relation %s, no '
- 'results will be returned from this source',
- self.source.uri, relation)
- raise UnknownEid # trick to return no result
- return filter
-
- def visit_comparison(self, cmp):
- """generate filter for a comparaison"""
- return '%s%s'% (cmp.operator, cmp.children[0].accept(self))
-
- def visit_mathexpression(self, mexpr):
- """generate filter for a mathematic expression"""
- raise NotImplementedError
-
- def visit_function(self, function):
- """generate filter name for a function"""
- if function.name == 'IN':
- return self.visit_in(function)
- raise NotImplementedError
-
- def visit_in(self, function):
- grandpapa = function.parent.parent
- ldapattr = self._ldap_attrs[grandpapa.r_type]
- res = []
- for c in function.children:
- part = c.accept(self)
- if part:
- res.append(part)
- if res:
- if len(res) > 1:
- part = '(|%s)' % ''.join('(%s=%s)' % (ldapattr, v) for v in res)
- else:
- part = '(%s=%s)' % (ldapattr, res[0])
- return part
-
- def visit_constant(self, constant):
- """generate filter name for a constant"""
- value = constant.value
- if constant.type is None:
- raise NotImplementedError
- if constant.type == 'Date':
- raise NotImplementedError
- #value = self.keyword_map[value]()
- elif constant.type == 'Substitute':
- value = self._args[constant.value]
- else:
- value = constant.value
- if isinstance(value, unicode):
- value = value.encode('utf8')
- else:
- value = str(value)
- return escape_filter_chars(value)
-
- def visit_variableref(self, variableref):
- """get the sql name for a variable reference"""
- pass
-
--- a/server/test/unittest_ldapsource.py Thu Oct 03 15:37:45 2013 +0200
+++ b/server/test/unittest_ldapsource.py Fri Oct 04 14:28:50 2013 +0200
@@ -33,7 +33,6 @@
from cubicweb.devtools.httptest import get_available_port
from cubicweb.devtools import get_test_db_handler
-from cubicweb.server.sources.ldapuser import GlobTrFunc, UnknownEid, RQL2LDAPFilter
CONFIG_LDAPFEED = u'''
user-base-dn=ou=People,dc=cubicweb,dc=test
@@ -453,386 +452,6 @@
self.setUpClass()
-class LDAPUserSourceTC(LDAPFeedTestBase):
- test_db_id = 'ldap-user'
- tags = CubicWebTC.tags | Tags(('ldap'))
-
- @classmethod
- def pre_setup_database(cls, session, config):
- session.create_entity('CWSource', name=u'ldap', type=u'ldapuser',
- url=URL, config=CONFIG_LDAPUSER)
- session.commit()
- # XXX keep it there
- session.execute('CWUser U')
-
- def setup_database(self):
- # XXX a traceback may appear in the logs of the test due to
- # the _init_repo method that may fail to connect to the ldap
- # source if its URI has changed (from what is stored in the
- # database). This TB is NOT a failure or so.
- with self.session.repo.internal_session(safe=True) as session:
- session.execute('SET S url %(url)s, S config %(conf)s '
- 'WHERE S is CWSource, S name "ldap"',
- {"conf": CONFIG_LDAPUSER, 'url': URL} )
- session.commit()
- self.pull()
-
- def assertMetadata(self, entity):
- self.assertEqual(entity.creation_date, None)
- self.assertEqual(entity.modification_date, None)
-
- def test_synchronize(self):
- source = self.repo.sources_by_uri['ldap']
- source.synchronize()
-
- def test_base(self):
- # check a known one
- rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
- e = rset.get_entity(0, 0)
- self.assertEqual(e.login, 'syt')
- e.complete()
- self.assertMetadata(e)
- self.assertEqual(e.firstname, None)
- self.assertEqual(e.surname, None)
- self.assertEqual(e.in_group[0].name, 'users')
- self.assertEqual(e.owned_by[0].login, 'syt')
- self.assertEqual(e.created_by, ())
- addresses = [pe.address for pe in e.use_email]
- addresses.sort()
- # should habe two element but ldapuser seems buggy. It's going to be dropped anyway.
- self.assertEqual(['sylvain.thenault@logilab.fr',], # 'syt@logilab.fr'],
- addresses)
- self.assertIn(e.primary_email[0].address,
- ['sylvain.thenault@logilab.fr', 'syt@logilab.fr'])
- # email content should be indexed on the user
- rset = self.sexecute('CWUser X WHERE X has_text "thenault"')
- self.assertEqual(rset.rows, [[e.eid]])
-
- def test_not(self):
- eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0]
- rset = self.sexecute('CWUser X WHERE NOT X eid %s' % eid)
- self.assert_(rset)
- self.assert_(not eid in (r[0] for r in rset))
-
- def test_multiple(self):
- seid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0]
- aeid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'adim'})[0][0]
- rset = self.sexecute('CWUser X, Y WHERE X login %(syt)s, Y login %(adim)s',
- {'syt': 'syt', 'adim': 'adim'})
- self.assertEqual(rset.rows, [[seid, aeid]])
- rset = self.sexecute('Any X,Y,L WHERE X login L, X login %(syt)s, Y login %(adim)s',
- {'syt': 'syt', 'adim': 'adim'})
- self.assertEqual(rset.rows, [[seid, aeid, 'syt']])
-
- def test_in(self):
- seid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0]
- aeid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'adim'})[0][0]
- rset = self.sexecute('Any X,L ORDERBY L WHERE X login IN("%s", "%s"), X login L' % ('syt', 'adim'))
- self.assertEqual(rset.rows, [[aeid, 'adim'], [seid, 'syt']])
-
- def test_relations(self):
- eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0]
- rset = self.sexecute('Any X,E WHERE X is CWUser, X login L, X primary_email E')
- self.assert_(eid in (r[0] for r in rset))
- rset = self.sexecute('Any X,L,E WHERE X is CWUser, X login L, X primary_email E')
- self.assert_('syt' in (r[1] for r in rset))
-
- def test_count(self):
- nbusers = self.sexecute('Any COUNT(X) WHERE X is CWUser')[0][0]
- # just check this is a possible number
- self.assert_(nbusers > 1, nbusers)
- self.assert_(nbusers < 30, nbusers)
-
- def test_upper(self):
- eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0]
- rset = self.sexecute('Any UPPER(L) WHERE X eid %s, X login L' % eid)
- self.assertEqual(rset[0][0], 'syt'.upper())
-
- def test_unknown_attr(self):
- eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0]
- rset = self.sexecute('Any L,C,M WHERE X eid %s, X login L, '
- 'X creation_date C, X modification_date M' % eid)
- self.assertEqual(rset[0][0], 'syt')
- self.assertEqual(rset[0][1], None)
- self.assertEqual(rset[0][2], None)
-
- def test_sort(self):
- logins = [l for l, in self.sexecute('Any L ORDERBY L WHERE X login L')]
- self.assertEqual(logins, sorted(logins))
-
- def test_lower_sort(self):
- logins = [l for l, in self.sexecute('Any L ORDERBY lower(L) WHERE X login L')]
- self.assertEqual(logins, sorted(logins))
-
- def test_or(self):
- rset = self.sexecute('DISTINCT Any X WHERE X login %(login)s OR (X in_group G, G name "managers")',
- {'login': 'syt'})
- self.assertEqual(len(rset), 2, rset.rows) # syt + admin
-
- def test_nonregr_set_owned_by(self):
- # test that when a user coming from ldap is triggering a transition
- # the related TrInfo has correct owner information
- self.sexecute('SET X in_group G WHERE X login %(syt)s, G name "managers"', {'syt': 'syt'})
- self.commit()
- syt = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'}).get_entity(0, 0)
- self.assertEqual([g.name for g in syt.in_group], ['managers', 'users'])
- cnx = self.login('syt', password='syt')
- cu = cnx.cursor()
- adim = cu.execute('CWUser X WHERE X login %(login)s', {'login': 'adim'}).get_entity(0, 0)
- iworkflowable = adim.cw_adapt_to('IWorkflowable')
- iworkflowable.fire_transition('deactivate')
- try:
- cnx.commit()
- adim.cw_clear_all_caches()
- self.assertEqual(adim.in_state[0].name, 'deactivated')
- trinfo = iworkflowable.latest_trinfo()
- self.assertEqual(trinfo.owned_by[0].login, 'syt')
- # select from_state to skip the user's creation TrInfo
- rset = self.sexecute('Any U ORDERBY D DESC WHERE WF wf_info_for X,'
- 'WF creation_date D, WF from_state FS,'
- 'WF owned_by U?, X eid %(x)s',
- {'x': adim.eid})
- self.assertEqual(rset.rows, [[syt.eid]])
- finally:
- # restore db state
- self.restore_connection()
- adim = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'adim'}).get_entity(0, 0)
- adim.cw_adapt_to('IWorkflowable').fire_transition('activate')
- self.sexecute('DELETE X in_group G WHERE X login %(syt)s, G name "managers"', {'syt': 'syt'})
-
- def test_same_column_names(self):
- self.sexecute('Any X, Y WHERE X copain Y, X login "comme", Y login "cochon"')
-
- def test_multiple_entities_from_different_sources(self):
- req = self.request()
- self.create_user(req, 'cochon')
- self.assertTrue(self.sexecute('Any X,Y WHERE X login %(syt)s, Y login "cochon"', {'syt': 'syt'}))
-
- def test_exists1(self):
- self.session.set_cnxset()
- self.session.create_entity('CWGroup', name=u'bougloup1')
- self.session.create_entity('CWGroup', name=u'bougloup2')
- self.sexecute('SET U in_group G WHERE G name ~= "bougloup%", U login "admin"')
- self.sexecute('SET U in_group G WHERE G name = "bougloup1", U login %(syt)s', {'syt': 'syt'})
- rset = self.sexecute('Any L,SN ORDERBY L WHERE X in_state S, '
- 'S name SN, X login L, EXISTS(X in_group G, G name ~= "bougloup%")')
- self.assertEqual(rset.rows, [['admin', 'activated'], ['syt', 'activated']])
-
- def test_exists2(self):
- req = self.request()
- self.create_user(req, 'comme')
- self.create_user(req, 'cochon')
- self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"')
- rset = self.sexecute('Any GN ORDERBY GN WHERE X in_group G, G name GN, '
- '(G name "managers" OR EXISTS(X copain T, T login in ("comme", "cochon")))')
- self.assertEqual(rset.rows, [['managers'], ['users']])
-
- def test_exists3(self):
- req = self.request()
- self.create_user(req, 'comme')
- self.create_user(req, 'cochon')
- self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"')
- self.assertTrue(self.sexecute('Any X, Y WHERE X copain Y, X login "comme", Y login "cochon"'))
- self.sexecute('SET X copain Y WHERE X login %(syt)s, Y login "cochon"', {'syt': 'syt'})
- self.assertTrue(self.sexecute('Any X, Y WHERE X copain Y, X login %(syt)s, Y login "cochon"', {'syt': 'syt'}))
- rset = self.sexecute('Any GN,L WHERE X in_group G, X login L, G name GN, G name "managers" '
- 'OR EXISTS(X copain T, T login in ("comme", "cochon"))')
- self.assertEqual(sorted(rset.rows), [['managers', 'admin'], ['users', 'comme'], ['users', 'syt']])
-
- def test_exists4(self):
- req = self.request()
- self.create_user(req, 'comme')
- self.create_user(req, 'cochon', groups=('users', 'guests'))
- self.create_user(req, 'billy')
- self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"')
- self.sexecute('SET X copain Y WHERE X login "cochon", Y login "cochon"')
- self.sexecute('SET X copain Y WHERE X login "comme", Y login "billy"')
- self.sexecute('SET X copain Y WHERE X login %(syt)s, Y login "billy"', {'syt': 'syt'})
- # search for group name, login where
- # CWUser copain with "comme" or "cochon" AND same login as the copain
- # OR
- # CWUser in_state activated AND not copain with billy
- #
- # SO we expect everybody but "comme" and "syt"
- rset= self.sexecute('Any GN,L WHERE X in_group G, X login L, G name GN, '
- 'EXISTS(X copain T, T login L, T login in ("comme", "cochon")) OR '
- 'EXISTS(X in_state S, S name "activated", NOT X copain T2, T2 login "billy")')
- all = self.sexecute('Any GN, L WHERE X in_group G, X login L, G name GN')
- all.rows.remove(['users', 'comme'])
- all.rows.remove(['users', 'syt'])
- self.assertEqual(sorted(rset.rows), sorted(all.rows))
-
- def test_exists5(self):
- req = self.request()
- self.create_user(req, 'comme')
- self.create_user(req, 'cochon', groups=('users', 'guests'))
- self.create_user(req, 'billy')
- self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"')
- self.sexecute('SET X copain Y WHERE X login "cochon", Y login "cochon"')
- self.sexecute('SET X copain Y WHERE X login "comme", Y login "billy"')
- self.sexecute('SET X copain Y WHERE X login %(syt)s, Y login "cochon"', {'syt': 'syt'})
- rset= self.sexecute('Any L WHERE X login L, '
- 'EXISTS(X copain T, T login in ("comme", "cochon")) AND '
- 'NOT EXISTS(X copain T2, T2 login "billy")')
- self.assertEqual(sorted(rset.rows), [['cochon'], ['syt']])
- rset= self.sexecute('Any GN,L WHERE X in_group G, X login L, G name GN, '
- 'EXISTS(X copain T, T login in ("comme", "cochon")) AND '
- 'NOT EXISTS(X copain T2, T2 login "billy")')
- self.assertEqual(sorted(rset.rows), [['guests', 'cochon'],
- ['users', 'cochon'],
- ['users', 'syt']])
-
- def test_cd_restriction(self):
- rset = self.sexecute('CWUser X WHERE X creation_date > "2009-02-01"')
- # admin/anon but no ldap user since it doesn't support creation_date
- self.assertEqual(sorted(e.login for e in rset.entities()),
- ['admin', 'anon'])
-
- def test_union(self):
- afeids = self.sexecute('State X')
- ueids = self.sexecute('CWUser X')
- rset = self.sexecute('(Any X WHERE X is State) UNION (Any X WHERE X is CWUser)')
- self.assertEqual(sorted(r[0] for r in rset.rows),
- sorted(r[0] for r in afeids + ueids))
-
- def _init_security_test(self):
- req = self.request()
- self.create_user(req, 'iaminguestsgrouponly', groups=('guests',))
- cnx = self.login('iaminguestsgrouponly')
- return cnx.cursor()
-
- def test_security1(self):
- cu = self._init_security_test()
- rset = cu.execute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
- self.assertEqual(rset.rows, [])
- rset = cu.execute('Any X WHERE X login "iaminguestsgrouponly"')
- self.assertEqual(len(rset.rows), 1)
-
- def test_security2(self):
- cu = self._init_security_test()
- rset = cu.execute('Any X WHERE X has_text %(syt)s', {'syt': 'syt'})
- self.assertEqual(rset.rows, [])
- rset = cu.execute('Any X WHERE X has_text "iaminguestsgrouponly"')
- self.assertEqual(len(rset.rows), 1)
-
- def test_security3(self):
- cu = self._init_security_test()
- rset = cu.execute('Any F WHERE X has_text %(syt)s, X firstname F', {'syt': 'syt'})
- self.assertEqual(rset.rows, [])
- rset = cu.execute('Any F WHERE X has_text "iaminguestsgrouponly", X firstname F')
- self.assertEqual(rset.rows, [[None]])
-
- def test_nonregr1(self):
- self.sexecute('Any X,AA ORDERBY AA DESC WHERE E eid %(x)s, E owned_by X, '
- 'X modification_date AA',
- {'x': self.session.user.eid})
-
- def test_nonregr2(self):
- self.sexecute('Any X,L,AA WHERE E eid %(x)s, E owned_by X, '
- 'X login L, X modification_date AA',
- {'x': self.session.user.eid})
-
- def test_nonregr3(self):
- self.sexecute('Any X,AA ORDERBY AA DESC WHERE E eid %(x)s, '
- 'X modification_date AA',
- {'x': self.session.user.eid})
-
- def test_nonregr4(self):
- emaileid = self.sexecute('INSERT EmailAddress X: X address "toto@logilab.org"')[0][0]
- self.sexecute('Any X,AA WHERE X use_email Y, Y eid %(x)s, X modification_date AA',
- {'x': emaileid})
-
- def test_nonregr5(self):
- # original jpl query:
- # Any X, NOW - CD, P WHERE P is Project, U interested_in P, U is CWUser,
- # U login "sthenault", X concerns P, X creation_date CD ORDERBY CD DESC LIMIT 5
- rql = ('Any X, NOW - CD, P ORDERBY CD DESC LIMIT 5 WHERE P bookmarked_by U, '
- 'U login "%s", P is X, X creation_date CD') % self.session.user.login
- self.sexecute(rql, )#{'x': })
-
- def test_nonregr6(self):
- self.sexecute('Any B,U,UL GROUPBY B,U,UL WHERE B created_by U?, B is File '
- 'WITH U,UL BEING (Any U,UL WHERE ME eid %(x)s, (EXISTS(U identity ME) '
- 'OR (EXISTS(U in_group G, G name IN("managers", "staff")))) '
- 'OR (EXISTS(U in_group H, ME in_group H, NOT H name "users")), U login UL, U is CWUser)',
- {'x': self.session.user.eid})
-
-class GlobTrFuncTC(TestCase):
-
- def test_count(self):
- trfunc = GlobTrFunc('count', 0)
- res = trfunc.apply([[1], [2], [3], [4]])
- self.assertEqual(res, [[4]])
- trfunc = GlobTrFunc('count', 1)
- res = trfunc.apply([[1, 2], [2, 4], [3, 6], [1, 5]])
- self.assertEqual(res, [[1, 2], [2, 1], [3, 1]])
-
- def test_sum(self):
- trfunc = GlobTrFunc('sum', 0)
- res = trfunc.apply([[1], [2], [3], [4]])
- self.assertEqual(res, [[10]])
- trfunc = GlobTrFunc('sum', 1)
- res = trfunc.apply([[1, 2], [2, 4], [3, 6], [1, 5]])
- self.assertEqual(res, [[1, 7], [2, 4], [3, 6]])
-
- def test_min(self):
- trfunc = GlobTrFunc('min', 0)
- res = trfunc.apply([[1], [2], [3], [4]])
- self.assertEqual(res, [[1]])
- trfunc = GlobTrFunc('min', 1)
- res = trfunc.apply([[1, 2], [2, 4], [3, 6], [1, 5]])
- self.assertEqual(res, [[1, 2], [2, 4], [3, 6]])
-
- def test_max(self):
- trfunc = GlobTrFunc('max', 0)
- res = trfunc.apply([[1], [2], [3], [4]])
- self.assertEqual(res, [[4]])
- trfunc = GlobTrFunc('max', 1)
- res = trfunc.apply([[1, 2], [2, 4], [3, 6], [1, 5]])
- self.assertEqual(res, [[1, 5], [2, 4], [3, 6]])
-
-
-class RQL2LDAPFilterTC(RQLGeneratorTC):
-
- tags = RQLGeneratorTC.tags | Tags(('ldap'))
-
- @property
- def schema(self):
- """return the application schema"""
- return self._schema
-
- def setUp(self):
- self.handler = get_test_db_handler(LDAPUserSourceTC.config)
- self.handler.build_db_cache('ldap-rqlgenerator', LDAPUserSourceTC.pre_setup_database)
- self.handler.restore_database('ldap-rqlgenerator')
- self._repo = repo = self.handler.get_repo()
- self._schema = repo.schema
- super(RQL2LDAPFilterTC, self).setUp()
- ldapsource = repo.sources[-1]
- self.cnxset = repo._get_cnxset()
- session = mock_object(cnxset=self.cnxset)
- self.o = RQL2LDAPFilter(ldapsource, session)
- self.ldapclasses = ''.join(ldapsource.base_filters)
-
- def tearDown(self):
- self._repo.turn_repo_off()
- super(RQL2LDAPFilterTC, self).tearDown()
-
- def test_base(self):
- rqlst = self._prepare('CWUser X WHERE X login "toto"').children[0]
- self.assertEqual(self.o.generate(rqlst, 'X')[1],
- '(&%s(uid=toto))' % self.ldapclasses)
-
- def test_kwargs(self):
- rqlst = self._prepare('CWUser X WHERE X login %(x)s').children[0]
- self.o._args = {'x': "toto"}
- self.assertEqual(self.o.generate(rqlst, 'X')[1],
- '(&%s(uid=toto))' % self.ldapclasses)
-
- def test_get_attr(self):
- rqlst = self._prepare('Any X WHERE E firstname X, E eid 12').children[0]
- self.assertRaises(UnknownEid, self.o.generate, rqlst, 'E')
-
if __name__ == '__main__':
unittest_main()
--- a/test/unittest_mail.py Thu Oct 03 15:37:45 2013 +0200
+++ b/test/unittest_mail.py Fri Oct 04 14:28:50 2013 +0200
@@ -31,7 +31,7 @@
def getlogin():
- """avoid usinng os.getlogin() because of strange tty / stdin problems
+ """avoid using os.getlogin() because of strange tty / stdin problems
(man 3 getlogin)
Another solution would be to use $LOGNAME, $USER or $USERNAME
"""
--- a/web/formfields.py Thu Oct 03 15:37:45 2013 +0200
+++ b/web/formfields.py Fri Oct 04 14:28:50 2013 +0200
@@ -760,8 +760,13 @@
# raise UnmodifiedField instead of returning None, since the later
# will try to remove already attached file if any
raise UnmodifiedField()
- # value is a 2-uple (filename, stream)
+ # value is a 2-uple (filename, stream) or a list of such
+ # tuples (multiple files)
try:
+ if isinstance(value, list):
+ value = value[0]
+ form.warning('mutiple files provided, however '
+ 'only the first will be picked')
filename, stream = value
except ValueError:
raise UnmodifiedField()
--- a/web/test/data/views.py Thu Oct 03 15:37:45 2013 +0200
+++ b/web/test/data/views.py Fri Oct 04 14:28:50 2013 +0200
@@ -18,6 +18,7 @@
from cubicweb.web import Redirect
from cubicweb.web.application import CubicWebPublisher
+from cubicweb.web.views.ajaxcontroller import ajaxfunc
# proof of concept : monkey patch handle method so that if we are in an
# anonymous session and __fblogin is found is req.form, the user with the
@@ -40,5 +41,35 @@
assert req.user.login == login
return orig_handle(self, req, path)
+
+def _recursive_replace_stream_by_content(tree):
+ """ Search for streams (i.e. object that have a 'read' method) in a tree
+ (which branches are lists or tuples), and substitute them by their content,
+ leaving other leafs identical. A copy of the tree with only lists as
+ branches is returned.
+ """
+ if not isinstance(tree, (list, tuple)):
+ if hasattr(tree, 'read'):
+ return tree.read()
+ return tree
+ else:
+ return [_recursive_replace_stream_by_content(value)
+ for value in tree]
+
+
+@ajaxfunc(output_type='json')
+def fileupload(self):
+ """ Return a json copy of the web request formin which uploaded files
+ are read and their content substitute the received streams.
+ """
+ try:
+ result_dict = {}
+ for key, value in self._cw.form.iteritems():
+ result_dict[key] = _recursive_replace_stream_by_content(value)
+ return result_dict
+ except Exception, ex:
+ import traceback as tb
+ tb.print_exc(ex)
+
orig_handle = CubicWebPublisher.main_handle_request
CubicWebPublisher.main_handle_request = auto_login_handle_request
--- a/web/test/unittest_web.py Thu Oct 03 15:37:45 2013 +0200
+++ b/web/test/unittest_web.py Fri Oct 04 14:28:50 2013 +0200
@@ -16,7 +16,17 @@
# You should have received a copy of the GNU Lesser General Public License along
# with CubicWeb. If not, see <http://www.gnu.org/licenses/>.
+from json import loads
+from os.path import join
+
+try:
+ import requests
+ assert [int(n) for n in requests.__version__.split('.', 2)][:2] >= [1, 2]
+except (ImportError, AssertionError):
+ requests = None
+
from logilab.common.testlib import TestCase, unittest_main
+from cubicweb.devtools.httptest import CubicWebServerTC
from cubicweb.devtools.fake import FakeRequest
class AjaxReplaceUrlTC(TestCase):
@@ -43,5 +53,45 @@
(cbname, qs, req.pageid),
req.html_headers.post_inlined_scripts[0])
+
+class FileUploadTC(CubicWebServerTC):
+
+ def setUp(self):
+ "Skip whole test class if a suitable requests module is not available"
+ if requests is None:
+ self.skipTest('Python ``requests`` module is not available')
+ super(FileUploadTC, self).setUp()
+
+ @property
+ def _post_url(self):
+ return self.request().build_url('ajax', fname='fileupload')
+
+ def _fobject(self, fname):
+ return open(join(self.datadir, fname), 'rb')
+
+ def _fcontent(self, fname):
+ return self._fobject(fname).read()
+
+ def test_single_file_upload(self):
+ files = {'file': ('schema.py', self._fobject('schema.py'))}
+ webreq = requests.post(self._post_url, files=files)
+ # check backward compat : a single uploaded file leads to a single
+ # 2-uple in the request form
+ expect = {'fname': u'fileupload',
+ 'file': ['schema.py', self._fcontent('schema.py')]}
+ self.assertEqual(webreq.status_code, 200)
+ self.assertDictEqual(expect, loads(webreq.content))
+
+ def test_multiple_file_upload(self):
+ files = [('files', ('schema.py', self._fobject('schema.py'))),
+ ('files', ('views.py', self._fobject('views.py')))]
+ webreq = requests.post(self._post_url, files=files,)
+ expect = {'fname': u'fileupload',
+ 'files': [['schema.py', self._fcontent('schema.py')],
+ ['views.py', self._fcontent('views.py')]],}
+ self.assertEqual(webreq.status_code, 200)
+ self.assertDictEqual(expect, loads(webreq.content))
+
+
if __name__ == '__main__':
unittest_main()
--- a/web/views/cwuser.py Thu Oct 03 15:37:45 2013 +0200
+++ b/web/views/cwuser.py Fri Oct 04 14:28:50 2013 +0200
@@ -189,8 +189,8 @@
__select__ = StartupView.__select__ & match_user_groups('managers')
cache_max_age = 0 # disable caching
# XXX one could wish to display for instance only user's firstname/surname
- # for non managers but filtering out NULL cause crash with an ldapuser
- # source.
+ # for non managers but filtering out NULL caused crash with an ldapuser
+ # source. The ldapuser source has been dropped and this code can be updated.
rql = ('Any U,US,F,S,U,UAA,UDS, L,UAA,USN,UDSN ORDERBY L WHERE U is CWUser, '
'U login L, U firstname F, U surname S, '
'U in_state US, US name USN, '
--- a/web/views/sessions.py Thu Oct 03 15:37:45 2013 +0200
+++ b/web/views/sessions.py Fri Oct 04 14:28:50 2013 +0200
@@ -100,8 +100,6 @@
def _update_last_login_time(self, req):
# XXX should properly detect missing permission / non writeable source
# and avoid "except (RepositoryError, Unauthorized)" below
- if req.user.cw_metainformation()['source']['type'] == 'ldapuser':
- return
try:
req.execute('SET X last_login_time NOW WHERE X eid %(x)s',
{'x' : req.user.eid})