server/test/unittest_repository.py
branchtls-sprint
changeset 1398 5fe84a5f7035
parent 1263 01152fffd593
child 1787 71c143c0ada3
equal deleted inserted replaced
1397:6cbc7bc8ea6d 1398:5fe84a5f7035
    44     def test_fill_schema(self):
    44     def test_fill_schema(self):
    45         self.repo.schema = CubicWebSchema(self.repo.config.appid)
    45         self.repo.schema = CubicWebSchema(self.repo.config.appid)
    46         self.repo.config._cubes = None # avoid assertion error
    46         self.repo.config._cubes = None # avoid assertion error
    47         self.repo.fill_schema()
    47         self.repo.fill_schema()
    48         pool = self.repo._get_pool()
    48         pool = self.repo._get_pool()
    49         table = SQL_PREFIX + 'EEType'
    49         table = SQL_PREFIX + 'CWEType'
    50         namecol = SQL_PREFIX + 'name'
    50         namecol = SQL_PREFIX + 'name'
    51         finalcol = SQL_PREFIX + 'final'
    51         finalcol = SQL_PREFIX + 'final'
    52         try:
    52         try:
    53             sqlcursor = pool['system']
    53             sqlcursor = pool['system']
    54             sqlcursor.execute('SELECT %s FROM %s WHERE %s is NULL' % (
    54             sqlcursor.execute('SELECT %s FROM %s WHERE %s is NULL' % (
    66             self.repo._free_pool(pool)
    66             self.repo._free_pool(pool)
    67             
    67             
    68     def test_schema_has_owner(self):
    68     def test_schema_has_owner(self):
    69         repo = self.repo
    69         repo = self.repo
    70         cnxid = repo.connect(*self.default_user_password())
    70         cnxid = repo.connect(*self.default_user_password())
    71         self.failIf(repo.execute(cnxid, 'EEType X WHERE NOT X owned_by U'))
    71         self.failIf(repo.execute(cnxid, 'CWEType X WHERE NOT X owned_by U'))
    72         self.failIf(repo.execute(cnxid, 'ERType X WHERE NOT X owned_by U'))
    72         self.failIf(repo.execute(cnxid, 'CWRType X WHERE NOT X owned_by U'))
    73         self.failIf(repo.execute(cnxid, 'EFRDef X WHERE NOT X owned_by U'))
    73         self.failIf(repo.execute(cnxid, 'CWAttribute X WHERE NOT X owned_by U'))
    74         self.failIf(repo.execute(cnxid, 'ENFRDef X WHERE NOT X owned_by U'))
    74         self.failIf(repo.execute(cnxid, 'CWRelation X WHERE NOT X owned_by U'))
    75         self.failIf(repo.execute(cnxid, 'EConstraint X WHERE NOT X owned_by U'))
    75         self.failIf(repo.execute(cnxid, 'CWConstraint X WHERE NOT X owned_by U'))
    76         self.failIf(repo.execute(cnxid, 'EConstraintType X WHERE NOT X owned_by U'))
    76         self.failIf(repo.execute(cnxid, 'CWConstraintType X WHERE NOT X owned_by U'))
    77         
    77         
    78     def test_connect(self):
    78     def test_connect(self):
    79         login, passwd = self.default_user_password()
    79         login, passwd = self.default_user_password()
    80         self.assert_(self.repo.connect(login, passwd))
    80         self.assert_(self.repo.connect(login, passwd))
    81         self.assertRaises(AuthenticationError,
    81         self.assertRaises(AuthenticationError,
    95         repo.close(cnxid)
    95         repo.close(cnxid)
    96         
    96         
    97     def test_login_upassword_accent(self):
    97     def test_login_upassword_accent(self):
    98         repo = self.repo
    98         repo = self.repo
    99         cnxid = repo.connect(*self.default_user_password())
    99         cnxid = repo.connect(*self.default_user_password())
   100         repo.execute(cnxid, 'INSERT EUser X: X login %(login)s, X upassword %(passwd)s, X in_state S, X in_group G WHERE S name "activated", G name "users"',
   100         repo.execute(cnxid, 'INSERT CWUser X: X login %(login)s, X upassword %(passwd)s, X in_state S, X in_group G WHERE S name "activated", G name "users"',
   101                      {'login': u"barnabé", 'passwd': u"héhéhé".encode('UTF8')})
   101                      {'login': u"barnabé", 'passwd': u"héhéhé".encode('UTF8')})
   102         repo.commit(cnxid)
   102         repo.commit(cnxid)
   103         repo.close(cnxid)
   103         repo.close(cnxid)
   104         self.assert_(repo.connect(u"barnabé", u"héhéhé".encode('UTF8')))
   104         self.assert_(repo.connect(u"barnabé", u"héhéhé".encode('UTF8')))
   105     
   105     
   106     def test_invalid_entity_rollback(self):
   106     def test_invalid_entity_rollback(self):
   107         repo = self.repo
   107         repo = self.repo
   108         cnxid = repo.connect(*self.default_user_password())
   108         cnxid = repo.connect(*self.default_user_password())
   109         repo.execute(cnxid, 'INSERT EUser X: X login %(login)s, X upassword %(passwd)s, X in_state S WHERE S name "activated"',
   109         repo.execute(cnxid, 'INSERT CWUser X: X login %(login)s, X upassword %(passwd)s, X in_state S WHERE S name "activated"',
   110                      {'login': u"tutetute", 'passwd': 'tutetute'})
   110                      {'login': u"tutetute", 'passwd': 'tutetute'})
   111         self.assertRaises(ValidationError, repo.commit, cnxid)
   111         self.assertRaises(ValidationError, repo.commit, cnxid)
   112         rset = repo.execute(cnxid, 'EUser X WHERE X login "tutetute"')
   112         rset = repo.execute(cnxid, 'CWUser X WHERE X login "tutetute"')
   113         self.assertEquals(rset.rowcount, 0)
   113         self.assertEquals(rset.rowcount, 0)
   114         
   114         
   115     def test_close(self):
   115     def test_close(self):
   116         repo = self.repo
   116         repo = self.repo
   117         cnxid = repo.connect(*self.default_user_password())
   117         cnxid = repo.connect(*self.default_user_password())
   199         self.skip('implement me')
   199         self.skip('implement me')
   200 
   200 
   201     def test_initial_schema(self):
   201     def test_initial_schema(self):
   202         schema = self.repo.schema
   202         schema = self.repo.schema
   203         # check order of attributes is respected
   203         # check order of attributes is respected
   204         self.assertListEquals([r.type for r in schema.eschema('EFRDef').ordered_relations()
   204         self.assertListEquals([r.type for r in schema.eschema('CWAttribute').ordered_relations()
   205                                if not r.type in ('eid', 'is', 'is_instance_of', 'identity', 
   205                                if not r.type in ('eid', 'is', 'is_instance_of', 'identity', 
   206                                                  'creation_date', 'modification_date',
   206                                                  'creation_date', 'modification_date',
   207                                                  'owned_by', 'created_by')],
   207                                                  'owned_by', 'created_by')],
   208                               ['relation_type', 'from_entity', 'to_entity', 'constrained_by',
   208                               ['relation_type', 'from_entity', 'to_entity', 'constrained_by',
   209                                'cardinality', 'ordernum', 
   209                                'cardinality', 'ordernum', 
   210                                'indexed', 'fulltextindexed', 'internationalizable',
   210                                'indexed', 'fulltextindexed', 'internationalizable',
   211                                'defaultval', 'description_format', 'description'])
   211                                'defaultval', 'description_format', 'description'])
   212 
   212 
   213         self.assertEquals(schema.eschema('EEType').main_attribute(), 'name')
   213         self.assertEquals(schema.eschema('CWEType').main_attribute(), 'name')
   214         self.assertEquals(schema.eschema('State').main_attribute(), 'name')
   214         self.assertEquals(schema.eschema('State').main_attribute(), 'name')
   215 
   215 
   216         constraints = schema.rschema('name').rproperty('EEType', 'String', 'constraints')
   216         constraints = schema.rschema('name').rproperty('CWEType', 'String', 'constraints')
   217         self.assertEquals(len(constraints), 2)
   217         self.assertEquals(len(constraints), 2)
   218         for cstr in constraints[:]:
   218         for cstr in constraints[:]:
   219             if isinstance(cstr, UniqueConstraint):
   219             if isinstance(cstr, UniqueConstraint):
   220                 constraints.remove(cstr)
   220                 constraints.remove(cstr)
   221                 break
   221                 break
   223             self.fail('unique constraint not found')
   223             self.fail('unique constraint not found')
   224         sizeconstraint = constraints[0]
   224         sizeconstraint = constraints[0]
   225         self.assertEquals(sizeconstraint.min, None)
   225         self.assertEquals(sizeconstraint.min, None)
   226         self.assertEquals(sizeconstraint.max, 64)
   226         self.assertEquals(sizeconstraint.max, 64)
   227 
   227 
   228         constraints = schema.rschema('relation_type').rproperty('EFRDef', 'ERType', 'constraints')
   228         constraints = schema.rschema('relation_type').rproperty('CWAttribute', 'CWRType', 'constraints')
   229         self.assertEquals(len(constraints), 1)
   229         self.assertEquals(len(constraints), 1)
   230         cstr = constraints[0]
   230         cstr = constraints[0]
   231         self.assert_(isinstance(cstr, RQLConstraint))
   231         self.assert_(isinstance(cstr, RQLConstraint))
   232         self.assertEquals(cstr.restriction, 'O final TRUE')
   232         self.assertEquals(cstr.restriction, 'O final TRUE')
   233 
   233 
   234         ownedby = schema.rschema('owned_by')
   234         ownedby = schema.rschema('owned_by')
   235         self.assertEquals(ownedby.objects('EEType'), ('EUser',))
   235         self.assertEquals(ownedby.objects('CWEType'), ('CWUser',))
   236 
   236 
   237     def test_pyro(self):
   237     def test_pyro(self):
   238         import Pyro
   238         import Pyro
   239         Pyro.config.PYRO_MULTITHREADED = 0
   239         Pyro.config.PYRO_MULTITHREADED = 0
   240         lock = threading.Lock()
   240         lock = threading.Lock()
   265 
   265 
   266     def test_internal_api(self):
   266     def test_internal_api(self):
   267         repo = self.repo
   267         repo = self.repo
   268         cnxid = repo.connect(*self.default_user_password())
   268         cnxid = repo.connect(*self.default_user_password())
   269         session = repo._get_session(cnxid, setpool=True)
   269         session = repo._get_session(cnxid, setpool=True)
   270         self.assertEquals(repo.type_and_source_from_eid(1, session), ('EGroup', 'system', None))
   270         self.assertEquals(repo.type_and_source_from_eid(1, session), ('CWGroup', 'system', None))
   271         self.assertEquals(repo.type_from_eid(1, session), 'EGroup')
   271         self.assertEquals(repo.type_from_eid(1, session), 'CWGroup')
   272         self.assertEquals(repo.source_from_eid(1, session).uri, 'system')
   272         self.assertEquals(repo.source_from_eid(1, session).uri, 'system')
   273         self.assertEquals(repo.eid2extid(repo.system_source, 1, session), None)
   273         self.assertEquals(repo.eid2extid(repo.system_source, 1, session), None)
   274         class dummysource: uri = 'toto'
   274         class dummysource: uri = 'toto'
   275         self.assertRaises(UnknownEid, repo.eid2extid, dummysource, 1, session)
   275         self.assertRaises(UnknownEid, repo.eid2extid, dummysource, 1, session)
   276 
   276 
   277     def test_public_api(self):
   277     def test_public_api(self):
   278         self.assertEquals(self.repo.get_schema(), self.repo.schema)
   278         self.assertEquals(self.repo.get_schema(), self.repo.schema)
   279         self.assertEquals(self.repo.source_defs(), {'system': {'adapter': 'native', 'uri': 'system'}})
   279         self.assertEquals(self.repo.source_defs(), {'system': {'adapter': 'native', 'uri': 'system'}})
   280         # .properties() return a result set
   280         # .properties() return a result set
   281         self.assertEquals(self.repo.properties().rql, 'Any K,V WHERE P is EProperty,P pkey K, P value V, NOT P for_user U')
   281         self.assertEquals(self.repo.properties().rql, 'Any K,V WHERE P is CWProperty,P pkey K, P value V, NOT P for_user U')
   282 
   282 
   283     def test_session_api(self):
   283     def test_session_api(self):
   284         repo = self.repo
   284         repo = self.repo
   285         cnxid = repo.connect(*self.default_user_password())
   285         cnxid = repo.connect(*self.default_user_password())
   286         self.assertEquals(repo.user_info(cnxid), (5, 'admin', set([u'managers']), {}))
   286         self.assertEquals(repo.user_info(cnxid), (5, 'admin', set([u'managers']), {}))
   287         self.assertEquals(repo.describe(cnxid, 1), (u'EGroup', u'system', None))
   287         self.assertEquals(repo.describe(cnxid, 1), (u'CWGroup', u'system', None))
   288         repo.close(cnxid)
   288         repo.close(cnxid)
   289         self.assertRaises(BadConnectionId, repo.user_info, cnxid)
   289         self.assertRaises(BadConnectionId, repo.user_info, cnxid)
   290         self.assertRaises(BadConnectionId, repo.describe, cnxid, 1)
   290         self.assertRaises(BadConnectionId, repo.describe, cnxid, 1)
   291 
   291 
   292     def test_shared_data_api(self):
   292     def test_shared_data_api(self):
   323 
   323 
   324     def test_source_from_eid_raise(self):
   324     def test_source_from_eid_raise(self):
   325         self.assertRaises(UnknownEid, self.repo.source_from_eid, -2, self.session)
   325         self.assertRaises(UnknownEid, self.repo.source_from_eid, -2, self.session)
   326 
   326 
   327     def test_type_from_eid(self):
   327     def test_type_from_eid(self):
   328         self.assertEquals(self.repo.type_from_eid(1, self.session), 'EGroup')
   328         self.assertEquals(self.repo.type_from_eid(1, self.session), 'CWGroup')
   329         
   329         
   330     def test_type_from_eid_raise(self):
   330     def test_type_from_eid_raise(self):
   331         self.assertRaises(UnknownEid, self.repo.type_from_eid, -2, self.session)
   331         self.assertRaises(UnknownEid, self.repo.type_from_eid, -2, self.session)
   332         
   332         
   333     def test_add_delete_info(self):
   333     def test_add_delete_info(self):
   447         
   447         
   448     def test_after_add_inline(self):
   448     def test_after_add_inline(self):
   449         """make sure after_<event>_relation hooks are deferred"""
   449         """make sure after_<event>_relation hooks are deferred"""
   450         self.hm.register_hook(self._after_relation_hook,
   450         self.hm.register_hook(self._after_relation_hook,
   451                              'after_add_relation', 'in_state')
   451                              'after_add_relation', 'in_state')
   452         eidp = self.execute('INSERT EUser X: X login "toto", X upassword "tutu", X in_state S WHERE S name "activated"')[0][0]
   452         eidp = self.execute('INSERT CWUser X: X login "toto", X upassword "tutu", X in_state S WHERE S name "activated"')[0][0]
   453         eids = self.execute('State X WHERE X name "activated"')[0][0]
   453         eids = self.execute('State X WHERE X name "activated"')[0][0]
   454         self.assertEquals(self.called, [(eidp, 'in_state', eids,)])
   454         self.assertEquals(self.called, [(eidp, 'in_state', eids,)])
   455     
   455     
   456     def test_before_delete_inline_relation(self):
   456     def test_before_delete_inline_relation(self):
   457         """make sure before_<event>_relation hooks are called directly"""
   457         """make sure before_<event>_relation hooks are called directly"""