[pyramid] Override cubicweb.hooks.syncsession.get_user_sessions() for Pyramid
Closes #13436818
--- a/cubicweb/pyramid/__init__.py Mon Jun 06 15:37:01 2016 +0200
+++ b/cubicweb/pyramid/__init__.py Thu Oct 06 14:25:18 2016 +0200
@@ -188,6 +188,7 @@
config.include('cubicweb.pyramid.tools')
config.include('cubicweb.pyramid.predicates')
config.include('cubicweb.pyramid.core')
+ config.include('cubicweb.pyramid.syncsession')
if asbool(config.registry.settings.get('cubicweb.bwcompat', True)):
config.include('cubicweb.pyramid.bwcompat')
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/cubicweb/pyramid/syncsession.py Thu Oct 06 14:25:18 2016 +0200
@@ -0,0 +1,29 @@
+# copyright 2016 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
+#
+# This file is part of CubicWeb.
+#
+# CubicWeb is free software: you can redistribute it and/or modify it under the
+# terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation, either version 2.1 of the License, or (at your option)
+# any later version.
+#
+# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Lesser General Public License along
+# with CubicWeb. If not, see <http://www.gnu.org/licenses/>.
+"""Override cubicweb's syncsession hooks to handle them in the pyramid's way"""
+
+from logilab.common.decorators import monkeypatch
+from cubicweb.hooks import syncsession
+
+
+def includeme(config):
+
+ @monkeypatch(syncsession)
+ def get_user_sessions(cnx, user_eid):
+ if cnx.user.eid == user_eid:
+ yield cnx
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/cubicweb/pyramid/test/test_hooks.py Thu Oct 06 14:25:18 2016 +0200
@@ -0,0 +1,77 @@
+from six import text_type
+
+from cubicweb.pyramid.test import PyramidCWTest
+from cubicweb.pyramid import tools
+
+
+def set_language(request):
+ lang = request.POST.get('lang', None)
+ cnx = request.cw_cnx
+ if lang is None:
+ cnx.execute('DELETE CWProperty X WHERE X for_user U, U eid %(u)s',
+ {'u': cnx.user.eid})
+ else:
+ cnx.user.set_property(u'ui.language', text_type(lang))
+ cnx.commit()
+
+ request.response.text = text_type(cnx.user.properties.get('ui.language', ''))
+ return request.response
+
+
+def add_remove_group(request):
+ add_remove = request.POST['add_remove']
+ cnx = request.cw_cnx
+ if add_remove == 'add':
+ cnx.execute('SET U in_group G WHERE G name "users", U eid %(u)s',
+ {'u': cnx.user.eid})
+ else:
+ cnx.execute('DELETE U in_group G WHERE G name "users", U eid %(u)s',
+ {'u': cnx.user.eid})
+ cnx.commit()
+
+ request.response.text = text_type(','.join(sorted(cnx.user.groups)))
+ return request.response
+
+
+class SessionSyncHoooksTC(PyramidCWTest):
+
+ def includeme(self, config):
+ for view in (set_language, add_remove_group):
+ config.add_route(view.__name__, '/' + view.__name__)
+ config.add_view(view, route_name=view.__name__)
+
+ def setUp(self):
+ super(SessionSyncHoooksTC, self).setUp()
+ with self.admin_access.repo_cnx() as cnx:
+ self.admin_eid = cnx.user.eid
+
+ def test_sync_props(self):
+ # initialize a pyramid session using admin credentials
+ res = self.webapp.post('/login', {
+ '__login': self.admlogin, '__password': self.admpassword})
+ self.assertEqual(res.status_int, 303)
+ # new property
+ res = self.webapp.post('/set_language', {'lang': 'fr'})
+ self.assertEqual(res.text, 'fr')
+ # updated property
+ res = self.webapp.post('/set_language', {'lang': 'en'})
+ self.assertEqual(res.text, 'en')
+ # removed property
+ res = self.webapp.post('/set_language')
+ self.assertEqual(res.text, '')
+
+ def test_sync_groups(self):
+ # initialize a pyramid session using admin credentials
+ res = self.webapp.post('/login', {
+ '__login': self.admlogin, '__password': self.admpassword})
+ self.assertEqual(res.status_int, 303)
+ # XXX how to get pyramid request using this session?
+ res = self.webapp.post('/add_remove_group', {'add_remove': 'add'})
+ self.assertEqual(res.text, 'managers,users')
+ res = self.webapp.post('/add_remove_group', {'add_remove': 'remove'})
+ self.assertEqual(res.text, 'managers')
+
+
+if __name__ == '__main__':
+ from unittest import main
+ main()