--- a/server/test/unittest_repository.py Tue Apr 10 17:09:04 2012 +0200
+++ b/server/test/unittest_repository.py Wed Apr 04 10:57:03 2012 +0200
@@ -36,7 +36,7 @@
UnknownEid, AuthenticationError, Unauthorized, QueryError)
from cubicweb.predicates import is_instance
from cubicweb.schema import CubicWebSchema, RQLConstraint
-from cubicweb.dbapi import connect, multiple_connections_unfix
+from cubicweb.dbapi import connect, multiple_connections_unfix, ConnectionProperties
from cubicweb.devtools.testlib import CubicWebTC
from cubicweb.devtools.repotest import tuplify
from cubicweb.server import repository, hook
@@ -379,6 +379,63 @@
# connect monkey patch some method by default, remove them
multiple_connections_unfix()
+
+ def test_zmq(self):
+ try:
+ import zmq
+ except ImportError:
+ self.skipTest("zmq in not available")
+ done = []
+ from cubicweb.devtools import TestServerConfiguration as ServerConfiguration
+ from cubicweb.server.cwzmq import ZMQRepositoryServer
+ # the client part has to be in a thread due to sqlite limitations
+ t = threading.Thread(target=self._zmq_client, args=(done,))
+ t.start()
+
+ zmq_server = ZMQRepositoryServer(self.repo)
+ zmq_server.connect('tcp://127.0.0.1:41415')
+
+ t2 = threading.Thread(target=self._zmq_quit, args=(done, zmq_server,))
+ t2.start()
+
+ zmq_server.run()
+
+ t2.join(1)
+ t.join(1)
+
+ if t.isAlive():
+ self.fail('something went wrong, thread still alive')
+
+ def _zmq_quit(self, done, srv):
+ while not done:
+ time.sleep(0.1)
+ srv.quit()
+
+ def _zmq_client(self, done):
+ cnxprops = ConnectionProperties('zmq')
+ cnx = connect(self.repo.config.appid, u'admin', password=u'gingkow',
+ host='tcp://127.0.0.1:41415',
+ cnxprops=cnxprops,
+ initlog=False) # don't reset logging configuration
+ try:
+ cnx.load_appobjects(subpath=('entities',))
+ # check we can get the schema
+ schema = cnx.get_schema()
+ self.assertTrue(cnx.vreg)
+ self.assertTrue('etypes'in cnx.vreg)
+ cu = cnx.cursor()
+ rset = cu.execute('Any U,G WHERE U in_group G')
+ user = iter(rset.entities()).next()
+ self.assertTrue(user._cw)
+ self.assertTrue(user._cw.vreg)
+ from cubicweb.entities import authobjs
+ self.assertIsInstance(user._cw.user, authobjs.CWUser)
+ cnx.close()
+ done.append(True)
+ finally:
+ # connect monkey patch some method by default, remove them
+ multiple_connections_unfix()
+
def test_internal_api(self):
repo = self.repo
cnxid = repo.connect(self.admlogin, password=self.admpassword)