[pyramid] Override cubicweb.hooks.syncsession.get_user_sessions() for Pyramid
authorSylvain Thénault <sylvain.thenault@logilab.fr>
Thu, 06 Oct 2016 14:25:18 +0200
changeset 11702 be23c3813bbf
parent 11701 ca536eec556b
child 11703 670aa9bf0b6c
[pyramid] Override cubicweb.hooks.syncsession.get_user_sessions() for Pyramid Closes #13436818
cubicweb/pyramid/__init__.py
cubicweb/pyramid/syncsession.py
cubicweb/pyramid/test/test_hooks.py
--- a/cubicweb/pyramid/__init__.py	Mon Jun 06 15:37:01 2016 +0200
+++ b/cubicweb/pyramid/__init__.py	Thu Oct 06 14:25:18 2016 +0200
@@ -188,6 +188,7 @@
     config.include('cubicweb.pyramid.tools')
     config.include('cubicweb.pyramid.predicates')
     config.include('cubicweb.pyramid.core')
+    config.include('cubicweb.pyramid.syncsession')
 
     if asbool(config.registry.settings.get('cubicweb.bwcompat', True)):
         config.include('cubicweb.pyramid.bwcompat')
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/cubicweb/pyramid/syncsession.py	Thu Oct 06 14:25:18 2016 +0200
@@ -0,0 +1,29 @@
+# copyright 2016 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
+#
+# This file is part of CubicWeb.
+#
+# CubicWeb is free software: you can redistribute it and/or modify it under the
+# terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation, either version 2.1 of the License, or (at your option)
+# any later version.
+#
+# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Lesser General Public License along
+# with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
+"""Override cubicweb's syncsession hooks to handle them in the pyramid's way"""
+
+from logilab.common.decorators import monkeypatch
+from cubicweb.hooks import syncsession
+
+
+def includeme(config):
+
+    @monkeypatch(syncsession)
+    def get_user_sessions(cnx, user_eid):
+        if cnx.user.eid == user_eid:
+            yield cnx
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/cubicweb/pyramid/test/test_hooks.py	Thu Oct 06 14:25:18 2016 +0200
@@ -0,0 +1,77 @@
+from six import text_type
+
+from cubicweb.pyramid.test import PyramidCWTest
+from cubicweb.pyramid import tools
+
+
+def set_language(request):
+    lang = request.POST.get('lang', None)
+    cnx = request.cw_cnx
+    if lang is None:
+        cnx.execute('DELETE CWProperty X WHERE X for_user U, U eid %(u)s',
+                    {'u': cnx.user.eid})
+    else:
+        cnx.user.set_property(u'ui.language', text_type(lang))
+    cnx.commit()
+
+    request.response.text = text_type(cnx.user.properties.get('ui.language', ''))
+    return request.response
+
+
+def add_remove_group(request):
+    add_remove = request.POST['add_remove']
+    cnx = request.cw_cnx
+    if add_remove == 'add':
+        cnx.execute('SET U in_group G WHERE G name "users", U eid %(u)s',
+                    {'u': cnx.user.eid})
+    else:
+        cnx.execute('DELETE U in_group G WHERE G name "users", U eid %(u)s',
+                    {'u': cnx.user.eid})
+    cnx.commit()
+
+    request.response.text = text_type(','.join(sorted(cnx.user.groups)))
+    return request.response
+
+
+class SessionSyncHoooksTC(PyramidCWTest):
+
+    def includeme(self, config):
+        for view in (set_language, add_remove_group):
+            config.add_route(view.__name__, '/' + view.__name__)
+            config.add_view(view, route_name=view.__name__)
+
+    def setUp(self):
+        super(SessionSyncHoooksTC, self).setUp()
+        with self.admin_access.repo_cnx() as cnx:
+            self.admin_eid = cnx.user.eid
+
+    def test_sync_props(self):
+        # initialize a pyramid session using admin credentials
+        res = self.webapp.post('/login', {
+            '__login': self.admlogin, '__password': self.admpassword})
+        self.assertEqual(res.status_int, 303)
+        # new property
+        res = self.webapp.post('/set_language', {'lang': 'fr'})
+        self.assertEqual(res.text, 'fr')
+        # updated property
+        res = self.webapp.post('/set_language', {'lang': 'en'})
+        self.assertEqual(res.text, 'en')
+        # removed property
+        res = self.webapp.post('/set_language')
+        self.assertEqual(res.text, '')
+
+    def test_sync_groups(self):
+        # initialize a pyramid session using admin credentials
+        res = self.webapp.post('/login', {
+            '__login': self.admlogin, '__password': self.admpassword})
+        self.assertEqual(res.status_int, 303)
+        # XXX how to get pyramid request using this session?
+        res = self.webapp.post('/add_remove_group', {'add_remove': 'add'})
+        self.assertEqual(res.text, 'managers,users')
+        res = self.webapp.post('/add_remove_group', {'add_remove': 'remove'})
+        self.assertEqual(res.text, 'managers')
+
+
+if __name__ == '__main__':
+    from unittest import main
+    main()