[zmq] add unit tests for ZMQ-based repository (server and dbapi)
authorDavid Douard <david.douard@logilab.fr>
Wed, 04 Apr 2012 10:57:03 +0200
changeset 8353 c1cc2f1cd177
parent 8352 0e3b41118631
child 8354 a9984ceebc26
[zmq] add unit tests for ZMQ-based repository (server and dbapi)
server/test/unittest_repository.py
--- a/server/test/unittest_repository.py	Tue Apr 10 17:09:04 2012 +0200
+++ b/server/test/unittest_repository.py	Wed Apr 04 10:57:03 2012 +0200
@@ -36,7 +36,7 @@
                       UnknownEid, AuthenticationError, Unauthorized, QueryError)
 from cubicweb.predicates import is_instance
 from cubicweb.schema import CubicWebSchema, RQLConstraint
-from cubicweb.dbapi import connect, multiple_connections_unfix
+from cubicweb.dbapi import connect, multiple_connections_unfix, ConnectionProperties
 from cubicweb.devtools.testlib import CubicWebTC
 from cubicweb.devtools.repotest import tuplify
 from cubicweb.server import repository, hook
@@ -379,6 +379,63 @@
             # connect monkey patch some method by default, remove them
             multiple_connections_unfix()
 
+
+    def test_zmq(self):
+        try:
+            import zmq
+        except ImportError:
+            self.skipTest("zmq in not available")
+        done = []
+        from cubicweb.devtools import TestServerConfiguration as ServerConfiguration
+        from cubicweb.server.cwzmq import ZMQRepositoryServer
+        # the client part has to be in a thread due to sqlite limitations
+        t = threading.Thread(target=self._zmq_client, args=(done,))
+        t.start()
+
+        zmq_server = ZMQRepositoryServer(self.repo)
+        zmq_server.connect('tcp://127.0.0.1:41415')
+
+        t2 = threading.Thread(target=self._zmq_quit, args=(done, zmq_server,))
+        t2.start()
+
+        zmq_server.run()
+
+        t2.join(1)
+        t.join(1)
+
+        if t.isAlive():
+            self.fail('something went wrong, thread still alive')
+
+    def _zmq_quit(self, done, srv):
+        while not done:
+            time.sleep(0.1)
+        srv.quit()
+
+    def _zmq_client(self, done):
+        cnxprops = ConnectionProperties('zmq')
+        cnx = connect(self.repo.config.appid, u'admin', password=u'gingkow',
+                      host='tcp://127.0.0.1:41415',
+                      cnxprops=cnxprops,
+                      initlog=False) # don't reset logging configuration
+        try:
+            cnx.load_appobjects(subpath=('entities',))
+            # check we can get the schema
+            schema = cnx.get_schema()
+            self.assertTrue(cnx.vreg)
+            self.assertTrue('etypes'in cnx.vreg)
+            cu = cnx.cursor()
+            rset = cu.execute('Any U,G WHERE U in_group G')
+            user = iter(rset.entities()).next()
+            self.assertTrue(user._cw)
+            self.assertTrue(user._cw.vreg)
+            from cubicweb.entities import authobjs
+            self.assertIsInstance(user._cw.user, authobjs.CWUser)
+            cnx.close()
+            done.append(True)
+        finally:
+            # connect monkey patch some method by default, remove them
+            multiple_connections_unfix()
+
     def test_internal_api(self):
         repo = self.repo
         cnxid = repo.connect(self.admlogin, password=self.admpassword)