16 # |
16 # |
17 # You should have received a copy of the GNU Lesser General Public License along |
17 # You should have received a copy of the GNU Lesser General Public License along |
18 # with CubicWeb. If not, see <http://www.gnu.org/licenses/>. |
18 # with CubicWeb. If not, see <http://www.gnu.org/licenses/>. |
19 """unit tests for module cubicweb.server.repository""" |
19 """unit tests for module cubicweb.server.repository""" |
20 |
20 |
21 import threading |
|
22 import time |
21 import time |
23 import logging |
22 import logging |
24 |
23 |
25 from six.moves import range |
24 from six.moves import range |
26 |
25 |
37 from cubicweb.devtools.repotest import tuplify |
36 from cubicweb.devtools.repotest import tuplify |
38 from cubicweb.server import hook |
37 from cubicweb.server import hook |
39 from cubicweb.server.sqlutils import SQL_PREFIX |
38 from cubicweb.server.sqlutils import SQL_PREFIX |
40 from cubicweb.server.hook import Hook |
39 from cubicweb.server.hook import Hook |
41 from cubicweb.server.sources import native |
40 from cubicweb.server.sources import native |
42 from cubicweb.server.session import SessionClosedError |
|
43 |
41 |
44 |
42 |
45 class RepositoryTC(CubicWebTC): |
43 class RepositoryTC(CubicWebTC): |
46 """ singleton providing access to a persistent storage for entities |
44 """ singleton providing access to a persistent storage for entities |
47 and relation |
45 and relation |
76 def test_all_entities_have_cw_source(self): |
74 def test_all_entities_have_cw_source(self): |
77 with self.admin_access.repo_cnx() as cnx: |
75 with self.admin_access.repo_cnx() as cnx: |
78 self.assertFalse(cnx.execute('Any X WHERE NOT X cw_source S')) |
76 self.assertFalse(cnx.execute('Any X WHERE NOT X cw_source S')) |
79 |
77 |
80 def test_connect(self): |
78 def test_connect(self): |
81 cnxid = self.repo.connect(self.admlogin, password=self.admpassword) |
79 session = self.repo.new_session(self.admlogin, password=self.admpassword) |
82 self.assertTrue(cnxid) |
80 self.assertTrue(session.sessionid) |
83 self.repo.close(cnxid) |
81 session.close() |
84 self.assertRaises(AuthenticationError, |
82 self.assertRaises(AuthenticationError, |
85 self.repo.connect, self.admlogin, password='nimportnawak') |
83 self.repo.connect, self.admlogin, password='nimportnawak') |
86 self.assertRaises(AuthenticationError, |
84 self.assertRaises(AuthenticationError, |
87 self.repo.connect, self.admlogin, password='') |
85 self.repo.connect, self.admlogin, password='') |
88 self.assertRaises(AuthenticationError, |
86 self.assertRaises(AuthenticationError, |
99 cnx.execute('INSERT CWUser X: X login %(login)s, X upassword %(passwd)s, ' |
97 cnx.execute('INSERT CWUser X: X login %(login)s, X upassword %(passwd)s, ' |
100 'X in_group G WHERE G name "users"', |
98 'X in_group G WHERE G name "users"', |
101 {'login': u"barnabé", 'passwd': u"héhéhé".encode('UTF8')}) |
99 {'login': u"barnabé", 'passwd': u"héhéhé".encode('UTF8')}) |
102 cnx.commit() |
100 cnx.commit() |
103 repo = self.repo |
101 repo = self.repo |
104 cnxid = repo.connect(u"barnabé", password=u"héhéhé".encode('UTF8')) |
102 session = repo.new_session(u"barnabé", password=u"héhéhé".encode('UTF8')) |
105 self.assertTrue(cnxid) |
103 self.assertTrue(session.sessionid) |
106 repo.close(cnxid) |
104 session.close() |
107 |
105 |
108 def test_rollback_on_execute_validation_error(self): |
106 def test_rollback_on_execute_validation_error(self): |
109 class ValidationErrorAfterHook(Hook): |
107 class ValidationErrorAfterHook(Hook): |
110 __regid__ = 'valerror-after-hook' |
108 __regid__ = 'valerror-after-hook' |
111 __select__ = Hook.__select__ & is_instance('CWGroup') |
109 __select__ = Hook.__select__ & is_instance('CWGroup') |
144 self.assertFalse(cnx.execute('Any X WHERE X is CWGroup, X name "toto"')) |
142 self.assertFalse(cnx.execute('Any X WHERE X is CWGroup, X name "toto"')) |
145 |
143 |
146 |
144 |
147 def test_close(self): |
145 def test_close(self): |
148 repo = self.repo |
146 repo = self.repo |
149 cnxid = repo.connect(self.admlogin, password=self.admpassword) |
147 session = repo.new_session(self.admlogin, password=self.admpassword) |
150 self.assertTrue(cnxid) |
148 self.assertTrue(session.sessionid) |
151 repo.close(cnxid) |
149 session.close() |
152 |
150 |
153 |
151 |
154 def test_initial_schema(self): |
152 def test_initial_schema(self): |
155 schema = self.repo.schema |
153 schema = self.repo.schema |
156 # check order of attributes is respected |
154 # check order of attributes is respected |
194 ownedby = schema.rschema('owned_by') |
192 ownedby = schema.rschema('owned_by') |
195 self.assertEqual(ownedby.objects('CWEType'), ('CWUser',)) |
193 self.assertEqual(ownedby.objects('CWEType'), ('CWUser',)) |
196 |
194 |
197 def test_internal_api(self): |
195 def test_internal_api(self): |
198 repo = self.repo |
196 repo = self.repo |
199 cnxid = repo.connect(self.admlogin, password=self.admpassword) |
197 session = repo.new_session(self.admlogin, password=self.admpassword) |
200 session = repo._get_session(cnxid) |
|
201 with session.new_cnx() as cnx: |
198 with session.new_cnx() as cnx: |
202 self.assertEqual(repo.type_and_source_from_eid(2, cnx), |
199 self.assertEqual(repo.type_and_source_from_eid(2, cnx), |
203 ('CWGroup', None, 'system')) |
200 ('CWGroup', None, 'system')) |
204 self.assertEqual(repo.type_from_eid(2, cnx), 'CWGroup') |
201 self.assertEqual(repo.type_from_eid(2, cnx), 'CWGroup') |
205 repo.close(cnxid) |
202 session.close() |
206 |
203 |
207 def test_public_api(self): |
204 def test_public_api(self): |
208 self.assertEqual(self.repo.get_schema(), self.repo.schema) |
205 self.assertEqual(self.repo.get_schema(), self.repo.schema) |
209 self.assertEqual(self.repo.source_defs(), {'system': {'type': 'native', |
206 self.assertEqual(self.repo.source_defs(), {'system': {'type': 'native', |
210 'uri': 'system', |
207 'uri': 'system', |