server/test/unittest_repository.py
branchstable
changeset 1880 293fe4b49e28
parent 1787 71c143c0ada3
child 1954 9b20f3504af8
equal deleted inserted replaced
1879:cb3466e08d81 1880:293fe4b49e28
   197         self.assertEquals(len(rset), 1)
   197         self.assertEquals(len(rset), 1)
   198 
   198 
   199     def test_transaction_interleaved(self):
   199     def test_transaction_interleaved(self):
   200         self.skip('implement me')
   200         self.skip('implement me')
   201 
   201 
       
   202     def test_close_wait_processing_request(self):
       
   203         repo = self.repo
       
   204         cnxid = repo.connect(*self.default_user_password())
       
   205         repo.execute(cnxid, 'INSERT CWUser X: X login "toto", X upassword "tutu", X in_group G WHERE G name "users"')
       
   206         repo.commit(cnxid)
       
   207         # close has to be in the thread due to sqlite limitations
       
   208         def close_in_a_few_moment():
       
   209             time.sleep(0.1)
       
   210             repo.close(cnxid)
       
   211         t = threading.Thread(target=close_in_a_few_moment)
       
   212         t.start()
       
   213         try:
       
   214             print 'execute'
       
   215             repo.execute(cnxid, 'DELETE CWUser X WHERE X login "toto"')
       
   216             print 'commit'
       
   217             repo.commit(cnxid)
       
   218             print 'commited'
       
   219         finally:
       
   220             t.join()
       
   221 
   202     def test_initial_schema(self):
   222     def test_initial_schema(self):
   203         schema = self.repo.schema
   223         schema = self.repo.schema
   204         # check order of attributes is respected
   224         # check order of attributes is respected
   205         self.assertListEquals([r.type for r in schema.eschema('CWAttribute').ordered_relations()
   225         self.assertListEquals([r.type for r in schema.eschema('CWAttribute').ordered_relations()
   206                                if not r.type in ('eid', 'is', 'is_instance_of', 'identity',
   226                                if not r.type in ('eid', 'is', 'is_instance_of', 'identity',
   236         self.assertEquals(ownedby.objects('CWEType'), ('CWUser',))
   256         self.assertEquals(ownedby.objects('CWEType'), ('CWUser',))
   237 
   257 
   238     def test_pyro(self):
   258     def test_pyro(self):
   239         import Pyro
   259         import Pyro
   240         Pyro.config.PYRO_MULTITHREADED = 0
   260         Pyro.config.PYRO_MULTITHREADED = 0
   241         lock = threading.Lock()
   261         done = []
   242         # the client part has to be in the thread due to sqlite limitations
   262         # the client part has to be in the thread due to sqlite limitations
   243         t = threading.Thread(target=self._pyro_client, args=(lock,))
   263         t = threading.Thread(target=self._pyro_client, args=(done,))
   244         try:
   264         try:
   245             daemon = self.repo.pyro_register()
   265             daemon = self.repo.pyro_register()
   246             t.start()
   266             t.start()
   247             # connection
   267             while not done:
   248             daemon.handleRequests(1.0)
   268                 daemon.handleRequests(1.0)
   249             daemon.handleRequests(1.0)
   269             t.join(1)
   250             daemon.handleRequests(1.0)
   270             if t.isAlive():
   251             # get schema
   271                 self.fail('something went wrong, thread still alive')
   252             daemon.handleRequests(1.0)
       
   253             # execute
       
   254             daemon.handleRequests(1.0)
       
   255             t.join()
       
   256         finally:
   272         finally:
   257             repository.pyro_unregister(self.repo.config)
   273             repository.pyro_unregister(self.repo.config)
   258 
   274 
   259     def _pyro_client(self, lock):
   275     def _pyro_client(self, done):
   260         cnx = connect(self.repo.config.appid, u'admin', 'gingkow')
   276         cnx = connect(self.repo.config.appid, u'admin', 'gingkow')
   261         # check we can get the schema
   277         # check we can get the schema
   262         schema = cnx.get_schema()
   278         schema = cnx.get_schema()
   263         self.assertEquals(schema.__hashmode__, None)
   279         self.assertEquals(schema.__hashmode__, None)
   264         rset = cnx.cursor().execute('Any U,G WHERE U in_group G')
   280         cu = cnx.cursor()
   265 
   281         rset = cu.execute('Any U,G WHERE U in_group G')
       
   282         cnx.close()
       
   283         done.append(True)
   266 
   284 
   267     def test_internal_api(self):
   285     def test_internal_api(self):
   268         repo = self.repo
   286         repo = self.repo
   269         cnxid = repo.connect(*self.default_user_password())
   287         cnxid = repo.connect(*self.default_user_password())
   270         session = repo._get_session(cnxid, setpool=True)
   288         session = repo._get_session(cnxid, setpool=True)