server/test/unittest_repository.py
changeset 4191 01638461d4b0
parent 3777 3ef8cdb5fb1c
child 4252 6c4f109c2b03
--- a/server/test/unittest_repository.py	Tue Dec 22 19:27:26 2009 +0100
+++ b/server/test/unittest_repository.py	Tue Dec 22 19:27:48 2009 +0100
@@ -60,7 +60,7 @@
 
     def test_schema_has_owner(self):
         repo = self.repo
-        cnxid = repo.connect(self.admlogin, self.admpassword)
+        cnxid = repo.connect(self.admlogin, password=self.admpassword)
         self.failIf(repo.execute(cnxid, 'CWEType X WHERE NOT X owned_by U'))
         self.failIf(repo.execute(cnxid, 'CWRType X WHERE NOT X owned_by U'))
         self.failIf(repo.execute(cnxid, 'CWAttribute X WHERE NOT X owned_by U'))
@@ -69,17 +69,21 @@
         self.failIf(repo.execute(cnxid, 'CWConstraintType X WHERE NOT X owned_by U'))
 
     def test_connect(self):
-        self.assert_(self.repo.connect(self.admlogin, self.admpassword))
+        self.assert_(self.repo.connect(self.admlogin, password=self.admpassword))
         self.assertRaises(AuthenticationError,
-                          self.repo.connect, self.admlogin, 'nimportnawak')
+                          self.repo.connect, self.admlogin, password='nimportnawak')
+        self.assertRaises(AuthenticationError,
+                          self.repo.connect, self.admlogin, password=None)
         self.assertRaises(AuthenticationError,
-                          self.repo.connect, self.admlogin, None)
+                          self.repo.connect, None, password=None)
         self.assertRaises(AuthenticationError,
-                          self.repo.connect, None, None)
+                          self.repo.connect, self.admlogin)
+        self.assertRaises(AuthenticationError,
+                          self.repo.connect, None)
 
     def test_execute(self):
         repo = self.repo
-        cnxid = repo.connect(self.admlogin, self.admpassword)
+        cnxid = repo.connect(self.admlogin, password=self.admpassword)
         repo.execute(cnxid, 'Any X')
         repo.execute(cnxid, 'Any X where X is Personne')
         repo.execute(cnxid, 'Any X where X is Personne, X nom ~= "to"')
@@ -88,15 +92,15 @@
 
     def test_login_upassword_accent(self):
         repo = self.repo
-        cnxid = repo.connect(self.admlogin, self.admpassword)
+        cnxid = repo.connect(self.admlogin, password=self.admpassword)
         repo.execute(cnxid, 'INSERT CWUser X: X login %(login)s, X upassword %(passwd)s, X in_group G WHERE G name "users"',
                      {'login': u"barnabé", 'passwd': u"héhéhé".encode('UTF8')})
         repo.commit(cnxid)
         repo.close(cnxid)
-        self.assert_(repo.connect(u"barnabé", u"héhéhé".encode('UTF8')))
+        self.assert_(repo.connect(u"barnabé", password=u"héhéhé".encode('UTF8')))
 
     def test_invalid_entity_rollback(self):
-        cnxid = self.repo.connect(self.admlogin, self.admpassword)
+        cnxid = self.repo.connect(self.admlogin, password=self.admpassword)
         # no group
         self.repo.execute(cnxid,
                           'INSERT CWUser X: X login %(login)s, X upassword %(passwd)s',
@@ -106,7 +110,7 @@
 
     def test_close(self):
         repo = self.repo
-        cnxid = repo.connect(self.admlogin, self.admpassword)
+        cnxid = repo.connect(self.admlogin, password=self.admpassword)
         self.assert_(cnxid)
         repo.close(cnxid)
         self.assertRaises(BadConnectionId, repo.execute, cnxid, 'Any X')
@@ -117,9 +121,9 @@
 
     def test_shared_data(self):
         repo = self.repo
-        cnxid = repo.connect(self.admlogin, self.admpassword)
+        cnxid = repo.connect(self.admlogin, password=self.admpassword)
         repo.set_shared_data(cnxid, 'data', 4)
-        cnxid2 = repo.connect(self.admlogin, self.admpassword)
+        cnxid2 = repo.connect(self.admlogin, password=self.admpassword)
         self.assertEquals(repo.get_shared_data(cnxid, 'data'), 4)
         self.assertEquals(repo.get_shared_data(cnxid2, 'data'), None)
         repo.set_shared_data(cnxid2, 'data', 5)
@@ -137,14 +141,14 @@
 
     def test_check_session(self):
         repo = self.repo
-        cnxid = repo.connect(self.admlogin, self.admpassword)
+        cnxid = repo.connect(self.admlogin, password=self.admpassword)
         self.assertEquals(repo.check_session(cnxid), None)
         repo.close(cnxid)
         self.assertRaises(BadConnectionId, repo.check_session, cnxid)
 
     def test_transaction_base(self):
         repo = self.repo
-        cnxid = repo.connect(self.admlogin, self.admpassword)
+        cnxid = repo.connect(self.admlogin, password=self.admpassword)
         # check db state
         result = repo.execute(cnxid, 'Personne X')
         self.assertEquals(result.rowcount, 0)
@@ -163,7 +167,7 @@
 
     def test_transaction_base2(self):
         repo = self.repo
-        cnxid = repo.connect(self.admlogin, self.admpassword)
+        cnxid = repo.connect(self.admlogin, password=self.admpassword)
         # rollback relation insertion
         repo.execute(cnxid, "SET U in_group G WHERE U login 'admin', G name 'guests'")
         result = repo.execute(cnxid, "Any U WHERE U in_group G, U login 'admin', G name 'guests'")
@@ -174,7 +178,7 @@
 
     def test_transaction_base3(self):
         repo = self.repo
-        cnxid = repo.connect(self.admlogin, self.admpassword)
+        cnxid = repo.connect(self.admlogin, password=self.admpassword)
         # rollback state change which trigger TrInfo insertion
         user = repo._get_session(cnxid).user
         user.fire_transition('deactivate')
@@ -189,7 +193,7 @@
 
     def test_close_wait_processing_request(self):
         repo = self.repo
-        cnxid = repo.connect(self.admlogin, self.admpassword)
+        cnxid = repo.connect(self.admlogin, password=self.admpassword)
         repo.execute(cnxid, 'INSERT CWUser X: X login "toto", X upassword "tutu", X in_group G WHERE G name "users"')
         repo.commit(cnxid)
         # close has to be in the thread due to sqlite limitations
@@ -210,8 +214,11 @@
         self.assertListEquals([r.type for r in schema.eschema('CWAttribute').ordered_relations()
                                if not r.type in ('eid', 'is', 'is_instance_of', 'identity',
                                                  'creation_date', 'modification_date', 'cwuri',
-                                                 'owned_by', 'created_by')],
-                              ['relation_type', 'from_entity', 'to_entity', 'in_basket', 'constrained_by',
+                                                 'owned_by', 'created_by',
+                                                 'add_permission', 'delete_permission', 'read_permission')],
+                              ['relation_type',
+                               'from_entity', 'to_entity',
+                               'in_basket', 'constrained_by', 
                                'cardinality', 'ordernum',
                                'indexed', 'fulltextindexed', 'internationalizable',
                                'defaultval', 'description', 'description_format'])
@@ -219,7 +226,7 @@
         self.assertEquals(schema.eschema('CWEType').main_attribute(), 'name')
         self.assertEquals(schema.eschema('State').main_attribute(), 'name')
 
-        constraints = schema.rschema('name').rproperty('CWEType', 'String', 'constraints')
+        constraints = schema.rschema('name').rdef('CWEType', 'String').constraints
         self.assertEquals(len(constraints), 2)
         for cstr in constraints[:]:
             if isinstance(cstr, UniqueConstraint):
@@ -231,7 +238,7 @@
         self.assertEquals(sizeconstraint.min, None)
         self.assertEquals(sizeconstraint.max, 64)
 
-        constraints = schema.rschema('relation_type').rproperty('CWAttribute', 'CWRType', 'constraints')
+        constraints = schema.rschema('relation_type').rdef('CWAttribute', 'CWRType').constraints
         self.assertEquals(len(constraints), 1)
         cstr = constraints[0]
         self.assert_(isinstance(cstr, RQLConstraint))
@@ -258,7 +265,7 @@
             repository.pyro_unregister(self.repo.config)
 
     def _pyro_client(self, done):
-        cnx = connect(self.repo.config.appid, u'admin', 'gingkow')
+        cnx = connect(self.repo.config.appid, u'admin', password='gingkow')
         try:
             # check we can get the schema
             schema = cnx.get_schema()
@@ -273,7 +280,7 @@
 
     def test_internal_api(self):
         repo = self.repo
-        cnxid = repo.connect(self.admlogin, self.admpassword)
+        cnxid = repo.connect(self.admlogin, password=self.admpassword)
         session = repo._get_session(cnxid, setpool=True)
         self.assertEquals(repo.type_and_source_from_eid(1, session),
                           ('CWGroup', 'system', None))
@@ -291,7 +298,7 @@
 
     def test_session_api(self):
         repo = self.repo
-        cnxid = repo.connect(self.admlogin, self.admpassword)
+        cnxid = repo.connect(self.admlogin, password=self.admpassword)
         self.assertEquals(repo.user_info(cnxid), (5, 'admin', set([u'managers']), {}))
         self.assertEquals(repo.describe(cnxid, 1), (u'CWGroup', u'system', None))
         repo.close(cnxid)
@@ -300,7 +307,7 @@
 
     def test_shared_data_api(self):
         repo = self.repo
-        cnxid = repo.connect(self.admlogin, self.admpassword)
+        cnxid = repo.connect(self.admlogin, password=self.admpassword)
         self.assertEquals(repo.get_shared_data(cnxid, 'data'), None)
         repo.set_shared_data(cnxid, 'data', 4)
         self.assertEquals(repo.get_shared_data(cnxid, 'data'), 4)
@@ -326,14 +333,14 @@
 #         print 'test time: %.3f (time) %.3f (cpu)' % ((time() - t), clock() - c)
 
     def test_delete_if_singlecard1(self):
-        note = self.add_entity('Affaire')
-        p1 = self.add_entity('Personne', nom=u'toto')
+        note = self.request().create_entity('Affaire')
+        p1 = self.request().create_entity('Personne', nom=u'toto')
         self.execute('SET A todo_by P WHERE A eid %(x)s, P eid %(p)s',
                      {'x': note.eid, 'p': p1.eid})
         rset = self.execute('Any P WHERE A todo_by P, A eid %(x)s',
                             {'x': note.eid})
         self.assertEquals(len(rset), 1)
-        p2 = self.add_entity('Personne', nom=u'tutu')
+        p2 = self.request().create_entity('Personne', nom=u'tutu')
         self.execute('SET A todo_by P WHERE A eid %(x)s, P eid %(p)s',
                      {'x': note.eid, 'p': p2.eid})
         rset = self.execute('Any P WHERE A todo_by P, A eid %(x)s',
@@ -419,7 +426,7 @@
 
     def test_composite_entity(self):
         assert self.schema.rschema('use_email').fulltext_container == 'subject'
-        eid = self.add_entity('EmailAddress', address=u'toto@logilab.fr').eid
+        eid = self.request().create_entity('EmailAddress', address=u'toto@logilab.fr').eid
         self.commit()
         rset = self.execute('Any X WHERE X has_text %(t)s', {'t': 'toto'})
         self.assertEquals(rset.rows, [[eid]])
@@ -431,7 +438,7 @@
         self.commit()
         rset = self.execute('Any X WHERE X has_text %(t)s', {'t': 'toto'})
         self.assertEquals(rset.rows, [])
-        eid = self.add_entity('EmailAddress', address=u'tutu@logilab.fr').eid
+        eid = self.request().create_entity('EmailAddress', address=u'tutu@logilab.fr').eid
         self.execute('SET X use_email Y WHERE X login "admin", Y eid %(y)s', {'y': eid})
         self.commit()
         rset = self.execute('Any X WHERE X has_text %(t)s', {'t': 'tutu'})