server/test/unittest_repository.py
branch stable
changeset 8463 a964c40adbe3
parent 8388 c6c624cea870
child 8483 4ba11607d84a
child 8520 fcd048fa6e6d
equal deleted inserted replaced
8461:8af7c6d86efb 8463:a964c40adbe3
     1 # -*- coding: iso-8859-1 -*-
     1 # -*- coding: iso-8859-1 -*-
     2 # copyright 2003-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
     2 # copyright 2003-2012 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
     3 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
     3 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
     4 #
     4 #
     5 # This file is part of CubicWeb.
     5 # This file is part of CubicWeb.
     6 #
     6 #
     7 # CubicWeb is free software: you can redistribute it and/or modify it under the
     7 # CubicWeb is free software: you can redistribute it and/or modify it under the
    32 
    32 
    33 from yams.constraints import UniqueConstraint
    33 from yams.constraints import UniqueConstraint
    34 
    34 
    35 from cubicweb import (BadConnectionId, RepositoryError, ValidationError,
    35 from cubicweb import (BadConnectionId, RepositoryError, ValidationError,
    36                       UnknownEid, AuthenticationError, Unauthorized, QueryError)
    36                       UnknownEid, AuthenticationError, Unauthorized, QueryError)
    37 from cubicweb.selectors import is_instance
    37 from cubicweb.predicates import is_instance
    38 from cubicweb.schema import CubicWebSchema, RQLConstraint
    38 from cubicweb.schema import CubicWebSchema, RQLConstraint
    39 from cubicweb.dbapi import connect, multiple_connections_unfix
    39 from cubicweb.dbapi import connect, multiple_connections_unfix, ConnectionProperties
    40 from cubicweb.devtools.testlib import CubicWebTC
    40 from cubicweb.devtools.testlib import CubicWebTC
    41 from cubicweb.devtools.repotest import tuplify
    41 from cubicweb.devtools.repotest import tuplify
    42 from cubicweb.server import repository, hook
    42 from cubicweb.server import repository, hook
    43 from cubicweb.server.sqlutils import SQL_PREFIX
    43 from cubicweb.server.sqlutils import SQL_PREFIX
    44 from cubicweb.server.hook import Hook
    44 from cubicweb.server.hook import Hook
   377             done.append(True)
   377             done.append(True)
   378         finally:
   378         finally:
   379             # connect monkey patch some method by default, remove them
   379             # connect monkey patch some method by default, remove them
   380             multiple_connections_unfix()
   380             multiple_connections_unfix()
   381 
   381 
       
   382 
       
   383     def test_zmq(self):
       
   384         try:
       
   385             import zmq
       
   386         except ImportError:
       
   387             self.skipTest("zmq in not available")
       
   388         done = []
       
   389         from cubicweb.devtools import TestServerConfiguration as ServerConfiguration
       
   390         from cubicweb.server.cwzmq import ZMQRepositoryServer
       
   391         # the client part has to be in a thread due to sqlite limitations
       
   392         t = threading.Thread(target=self._zmq_client, args=(done,))
       
   393         t.start()
       
   394 
       
   395         zmq_server = ZMQRepositoryServer(self.repo)
       
   396         zmq_server.connect('tcp://127.0.0.1:41415')
       
   397 
       
   398         t2 = threading.Thread(target=self._zmq_quit, args=(done, zmq_server,))
       
   399         t2.start()
       
   400 
       
   401         zmq_server.run()
       
   402 
       
   403         t2.join(1)
       
   404         t.join(1)
       
   405 
       
   406         if t.isAlive():
       
   407             self.fail('something went wrong, thread still alive')
       
   408 
       
   409     def _zmq_quit(self, done, srv):
       
   410         while not done:
       
   411             time.sleep(0.1)
       
   412         srv.quit()
       
   413 
       
   414     def _zmq_client(self, done):
       
   415         cnxprops = ConnectionProperties('zmq')
       
   416         try:
       
   417             cnx = connect(self.repo.config.appid, u'admin', password=u'gingkow',
       
   418                           host='tcp://127.0.0.1:41415',
       
   419                           cnxprops=cnxprops,
       
   420                           initlog=False) # don't reset logging configuration
       
   421             try:
       
   422                 cnx.load_appobjects(subpath=('entities',))
       
   423                 # check we can get the schema
       
   424                 schema = cnx.get_schema()
       
   425                 self.assertTrue(cnx.vreg)
       
   426                 self.assertTrue('etypes'in cnx.vreg)
       
   427                 cu = cnx.cursor()
       
   428                 rset = cu.execute('Any U,G WHERE U in_group G')
       
   429                 user = iter(rset.entities()).next()
       
   430                 self.assertTrue(user._cw)
       
   431                 self.assertTrue(user._cw.vreg)
       
   432                 from cubicweb.entities import authobjs
       
   433                 self.assertIsInstance(user._cw.user, authobjs.CWUser)
       
   434                 cnx.close()
       
   435                 done.append(True)
       
   436             finally:
       
   437                 # connect monkey patch some method by default, remove them
       
   438                 multiple_connections_unfix()
       
   439         finally:
       
   440             done.append(False)
       
   441 
   382     def test_internal_api(self):
   442     def test_internal_api(self):
   383         repo = self.repo
   443         repo = self.repo
   384         cnxid = repo.connect(self.admlogin, password=self.admpassword)
   444         cnxid = repo.connect(self.admlogin, password=self.admpassword)
   385         session = repo._get_session(cnxid, setcnxset=True)
   445         session = repo._get_session(cnxid, setcnxset=True)
   386         self.assertEqual(repo.type_and_source_from_eid(2, session),
   446         self.assertEqual(repo.type_and_source_from_eid(2, session),