server/test/unittest_repository.py
changeset 6366 1806148d6ce8
parent 6279 42079f752a9c
parent 6364 ad9ed9803eb6
child 6401 d7f5d873e1b8
equal deleted inserted replaced
6333:e3994fcc21c3 6366:1806148d6ce8
    30 from logilab.common.testlib import TestCase, unittest_main
    30 from logilab.common.testlib import TestCase, unittest_main
    31 
    31 
    32 from yams.constraints import UniqueConstraint
    32 from yams.constraints import UniqueConstraint
    33 
    33 
    34 from cubicweb import (BadConnectionId, RepositoryError, ValidationError,
    34 from cubicweb import (BadConnectionId, RepositoryError, ValidationError,
    35                       UnknownEid, AuthenticationError)
    35                       UnknownEid, AuthenticationError, Unauthorized)
    36 from cubicweb.selectors import is_instance
    36 from cubicweb.selectors import is_instance
    37 from cubicweb.schema import CubicWebSchema, RQLConstraint
    37 from cubicweb.schema import CubicWebSchema, RQLConstraint
    38 from cubicweb.dbapi import connect, multiple_connections_unfix
    38 from cubicweb.dbapi import connect, multiple_connections_unfix
    39 from cubicweb.devtools.testlib import CubicWebTC
    39 from cubicweb.devtools.testlib import CubicWebTC
    40 from cubicweb.devtools.repotest import tuplify
    40 from cubicweb.devtools.repotest import tuplify
    63             namecol = SQL_PREFIX + 'name'
    63             namecol = SQL_PREFIX + 'name'
    64             finalcol = SQL_PREFIX + 'final'
    64             finalcol = SQL_PREFIX + 'final'
    65             self.session.set_pool()
    65             self.session.set_pool()
    66             cu = self.session.system_sql('SELECT %s FROM %s WHERE %s is NULL' % (
    66             cu = self.session.system_sql('SELECT %s FROM %s WHERE %s is NULL' % (
    67                 namecol, table, finalcol))
    67                 namecol, table, finalcol))
    68             self.assertEquals(cu.fetchall(), [])
    68             self.assertEqual(cu.fetchall(), [])
    69             cu = self.session.system_sql('SELECT %s FROM %s WHERE %s=%%(final)s ORDER BY %s'
    69             cu = self.session.system_sql('SELECT %s FROM %s WHERE %s=%%(final)s ORDER BY %s'
    70                                          % (namecol, table, finalcol, namecol), {'final': 'TRUE'})
    70                                          % (namecol, table, finalcol, namecol), {'final': 'TRUE'})
    71             self.assertEquals(cu.fetchall(), [(u'Boolean',), (u'Bytes',),
    71             self.assertEqual(cu.fetchall(), [(u'Boolean',), (u'Bytes',),
    72                                               (u'Date',), (u'Datetime',),
    72                                               (u'Date',), (u'Datetime',),
    73                                               (u'Decimal',),(u'Float',),
    73                                               (u'Decimal',),(u'Float',),
    74                                               (u'Int',),
    74                                               (u'Int',),
    75                                               (u'Interval',), (u'Password',),
    75                                               (u'Interval',), (u'Password',),
    76                                               (u'String',), (u'Time',)])
    76                                               (u'String',), (u'Time',)])
    82                    "  AND cstr.cw_constraint_of = etype.cw_eid "
    82                    "  AND cstr.cw_constraint_of = etype.cw_eid "
    83                    "  AND etype.cw_name = 'Personne' "
    83                    "  AND etype.cw_name = 'Personne' "
    84                    ";")
    84                    ";")
    85             cu = self.session.system_sql(sql)
    85             cu = self.session.system_sql(sql)
    86             rows = cu.fetchall()
    86             rows = cu.fetchall()
    87             self.assertEquals(len(rows), 3)
    87             self.assertEqual(len(rows), 3)
    88             self.test_unique_together()
    88             self.test_unique_together()
    89         finally:
    89         finally:
    90             self.repo.set_schema(origshema)
    90             self.repo.set_schema(origshema)
    91 
    91 
    92     def test_unique_together(self):
    92     def test_unique_together(self):
    93         person = self.repo.schema.eschema('Personne')
    93         person = self.repo.schema.eschema('Personne')
    94         self.assertEquals(len(person._unique_together), 1)
    94         self.assertEqual(len(person._unique_together), 1)
    95         self.assertUnorderedIterableEquals(person._unique_together[0],
    95         self.assertItemsEqual(person._unique_together[0],
    96                                            ('nom', 'prenom', 'inline2'))
    96                                            ('nom', 'prenom', 'inline2'))
    97 
    97 
    98     def test_schema_has_owner(self):
    98     def test_schema_has_owner(self):
    99         repo = self.repo
    99         repo = self.repo
   100         cnxid = repo.connect(self.admlogin, password=self.admpassword)
   100         cnxid = repo.connect(self.admlogin, password=self.admpassword)
   134                      {'login': u"barnabé", 'passwd': u"héhéhé".encode('UTF8')})
   134                      {'login': u"barnabé", 'passwd': u"héhéhé".encode('UTF8')})
   135         repo.commit(cnxid)
   135         repo.commit(cnxid)
   136         repo.close(cnxid)
   136         repo.close(cnxid)
   137         self.assert_(repo.connect(u"barnabé", password=u"héhéhé".encode('UTF8')))
   137         self.assert_(repo.connect(u"barnabé", password=u"héhéhé".encode('UTF8')))
   138 
   138 
   139     def test_invalid_entity_rollback(self):
   139     def test_rollback_on_commit_error(self):
   140         cnxid = self.repo.connect(self.admlogin, password=self.admpassword)
   140         cnxid = self.repo.connect(self.admlogin, password=self.admpassword)
   141         # no group
       
   142         self.repo.execute(cnxid,
   141         self.repo.execute(cnxid,
   143                           'INSERT CWUser X: X login %(login)s, X upassword %(passwd)s',
   142                           'INSERT CWUser X: X login %(login)s, X upassword %(passwd)s',
   144                           {'login': u"tutetute", 'passwd': 'tutetute'})
   143                           {'login': u"tutetute", 'passwd': 'tutetute'})
   145         self.assertRaises(ValidationError, self.repo.commit, cnxid)
   144         self.assertRaises(ValidationError, self.repo.commit, cnxid)
   146         self.failIf(self.repo.execute(cnxid, 'CWUser X WHERE X login "tutetute"'))
   145         self.failIf(self.repo.execute(cnxid, 'CWUser X WHERE X login "tutetute"'))
   147 
   146 
       
   147     def test_rollback_on_execute_validation_error(self):
       
   148         class ValidationErrorAfterHook(Hook):
       
   149             __regid__ = 'valerror-after-hook'
       
   150             __select__ = Hook.__select__ & is_instance('CWGroup')
       
   151             events = ('after_update_entity',)
       
   152             def __call__(self):
       
   153                 raise ValidationError(self.entity.eid, {})
       
   154         with self.temporary_appobjects(ValidationErrorAfterHook):
       
   155             self.assertRaises(ValidationError,
       
   156                               self.execute, 'SET X name "toto" WHERE X is CWGroup, X name "guests"')
       
   157             self.failIf(self.execute('Any X WHERE X is CWGroup, X name "toto"'))
       
   158 
       
   159     def test_rollback_on_execute_unauthorized(self):
       
   160         class UnauthorizedAfterHook(Hook):
       
   161             __regid__ = 'unauthorized-after-hook'
       
   162             __select__ = Hook.__select__ & is_instance('CWGroup')
       
   163             events = ('after_update_entity',)
       
   164             def __call__(self):
       
   165                 raise Unauthorized()
       
   166         with self.temporary_appobjects(UnauthorizedAfterHook):
       
   167             self.assertRaises(Unauthorized,
       
   168                               self.execute, 'SET X name "toto" WHERE X is CWGroup, X name "guests"')
       
   169             self.failIf(self.execute('Any X WHERE X is CWGroup, X name "toto"'))
       
   170 
       
   171 
   148     def test_close(self):
   172     def test_close(self):
   149         repo = self.repo
   173         repo = self.repo
   150         cnxid = repo.connect(self.admlogin, password=self.admpassword)
   174         cnxid = repo.connect(self.admlogin, password=self.admpassword)
   151         self.assert_(cnxid)
   175         self.assert_(cnxid)
   152         repo.close(cnxid)
   176         repo.close(cnxid)
   159     def test_shared_data(self):
   183     def test_shared_data(self):
   160         repo = self.repo
   184         repo = self.repo
   161         cnxid = repo.connect(self.admlogin, password=self.admpassword)
   185         cnxid = repo.connect(self.admlogin, password=self.admpassword)
   162         repo.set_shared_data(cnxid, 'data', 4)
   186         repo.set_shared_data(cnxid, 'data', 4)
   163         cnxid2 = repo.connect(self.admlogin, password=self.admpassword)
   187         cnxid2 = repo.connect(self.admlogin, password=self.admpassword)
   164         self.assertEquals(repo.get_shared_data(cnxid, 'data'), 4)
   188         self.assertEqual(repo.get_shared_data(cnxid, 'data'), 4)
   165         self.assertEquals(repo.get_shared_data(cnxid2, 'data'), None)
   189         self.assertEqual(repo.get_shared_data(cnxid2, 'data'), None)
   166         repo.set_shared_data(cnxid2, 'data', 5)
   190         repo.set_shared_data(cnxid2, 'data', 5)
   167         self.assertEquals(repo.get_shared_data(cnxid, 'data'), 4)
   191         self.assertEqual(repo.get_shared_data(cnxid, 'data'), 4)
   168         self.assertEquals(repo.get_shared_data(cnxid2, 'data'), 5)
   192         self.assertEqual(repo.get_shared_data(cnxid2, 'data'), 5)
   169         repo.get_shared_data(cnxid2, 'data', pop=True)
   193         repo.get_shared_data(cnxid2, 'data', pop=True)
   170         self.assertEquals(repo.get_shared_data(cnxid, 'data'), 4)
   194         self.assertEqual(repo.get_shared_data(cnxid, 'data'), 4)
   171         self.assertEquals(repo.get_shared_data(cnxid2, 'data'), None)
   195         self.assertEqual(repo.get_shared_data(cnxid2, 'data'), None)
   172         repo.close(cnxid)
   196         repo.close(cnxid)
   173         repo.close(cnxid2)
   197         repo.close(cnxid2)
   174         self.assertRaises(BadConnectionId, repo.get_shared_data, cnxid, 'data')
   198         self.assertRaises(BadConnectionId, repo.get_shared_data, cnxid, 'data')
   175         self.assertRaises(BadConnectionId, repo.get_shared_data, cnxid2, 'data')
   199         self.assertRaises(BadConnectionId, repo.get_shared_data, cnxid2, 'data')
   176         self.assertRaises(BadConnectionId, repo.set_shared_data, cnxid, 'data', 1)
   200         self.assertRaises(BadConnectionId, repo.set_shared_data, cnxid, 'data', 1)
   186     def test_transaction_base(self):
   210     def test_transaction_base(self):
   187         repo = self.repo
   211         repo = self.repo
   188         cnxid = repo.connect(self.admlogin, password=self.admpassword)
   212         cnxid = repo.connect(self.admlogin, password=self.admpassword)
   189         # check db state
   213         # check db state
   190         result = repo.execute(cnxid, 'Personne X')
   214         result = repo.execute(cnxid, 'Personne X')
   191         self.assertEquals(result.rowcount, 0)
   215         self.assertEqual(result.rowcount, 0)
   192         # rollback entity insertion
   216         # rollback entity insertion
   193         repo.execute(cnxid, "INSERT Personne X: X nom 'bidule'")
   217         repo.execute(cnxid, "INSERT Personne X: X nom 'bidule'")
   194         result = repo.execute(cnxid, 'Personne X')
   218         result = repo.execute(cnxid, 'Personne X')
   195         self.assertEquals(result.rowcount, 1)
   219         self.assertEqual(result.rowcount, 1)
   196         repo.rollback(cnxid)
   220         repo.rollback(cnxid)
   197         result = repo.execute(cnxid, 'Personne X')
   221         result = repo.execute(cnxid, 'Personne X')
   198         self.assertEquals(result.rowcount, 0, result.rows)
   222         self.assertEqual(result.rowcount, 0, result.rows)
   199         # commit
   223         # commit
   200         repo.execute(cnxid, "INSERT Personne X: X nom 'bidule'")
   224         repo.execute(cnxid, "INSERT Personne X: X nom 'bidule'")
   201         repo.commit(cnxid)
   225         repo.commit(cnxid)
   202         result = repo.execute(cnxid, 'Personne X')
   226         result = repo.execute(cnxid, 'Personne X')
   203         self.assertEquals(result.rowcount, 1)
   227         self.assertEqual(result.rowcount, 1)
   204 
   228 
   205     def test_transaction_base2(self):
   229     def test_transaction_base2(self):
   206         repo = self.repo
   230         repo = self.repo
   207         cnxid = repo.connect(self.admlogin, password=self.admpassword)
   231         cnxid = repo.connect(self.admlogin, password=self.admpassword)
   208         # rollback relation insertion
   232         # rollback relation insertion
   209         repo.execute(cnxid, "SET U in_group G WHERE U login 'admin', G name 'guests'")
   233         repo.execute(cnxid, "SET U in_group G WHERE U login 'admin', G name 'guests'")
   210         result = repo.execute(cnxid, "Any U WHERE U in_group G, U login 'admin', G name 'guests'")
   234         result = repo.execute(cnxid, "Any U WHERE U in_group G, U login 'admin', G name 'guests'")
   211         self.assertEquals(result.rowcount, 1)
   235         self.assertEqual(result.rowcount, 1)
   212         repo.rollback(cnxid)
   236         repo.rollback(cnxid)
   213         result = repo.execute(cnxid, "Any U WHERE U in_group G, U login 'admin', G name 'guests'")
   237         result = repo.execute(cnxid, "Any U WHERE U in_group G, U login 'admin', G name 'guests'")
   214         self.assertEquals(result.rowcount, 0, result.rows)
   238         self.assertEqual(result.rowcount, 0, result.rows)
   215 
   239 
   216     def test_transaction_base3(self):
   240     def test_transaction_base3(self):
   217         repo = self.repo
   241         repo = self.repo
   218         cnxid = repo.connect(self.admlogin, password=self.admpassword)
   242         cnxid = repo.connect(self.admlogin, password=self.admpassword)
   219         # rollback state change which trigger TrInfo insertion
   243         # rollback state change which trigger TrInfo insertion
   220         session = repo._get_session(cnxid)
   244         session = repo._get_session(cnxid)
   221         session.set_pool()
   245         session.set_pool()
   222         user = session.user
   246         user = session.user
   223         user.cw_adapt_to('IWorkflowable').fire_transition('deactivate')
   247         user.cw_adapt_to('IWorkflowable').fire_transition('deactivate')
   224         rset = repo.execute(cnxid, 'TrInfo T WHERE T wf_info_for X, X eid %(x)s', {'x': user.eid})
   248         rset = repo.execute(cnxid, 'TrInfo T WHERE T wf_info_for X, X eid %(x)s', {'x': user.eid})
   225         self.assertEquals(len(rset), 1)
   249         self.assertEqual(len(rset), 1)
   226         repo.rollback(cnxid)
   250         repo.rollback(cnxid)
   227         rset = repo.execute(cnxid, 'TrInfo T WHERE T wf_info_for X, X eid %(x)s', {'x': user.eid})
   251         rset = repo.execute(cnxid, 'TrInfo T WHERE T wf_info_for X, X eid %(x)s', {'x': user.eid})
   228         self.assertEquals(len(rset), 0)
   252         self.assertEqual(len(rset), 0)
   229 
   253 
   230     def test_transaction_interleaved(self):
   254     def test_transaction_interleaved(self):
   231         self.skip('implement me')
   255         self.skipTest('implement me')
   232 
   256 
   233     def test_close_kill_processing_request(self):
   257     def test_close_kill_processing_request(self):
   234         repo = self.repo
   258         repo = self.repo
   235         cnxid = repo.connect(self.admlogin, password=self.admpassword)
   259         cnxid = repo.connect(self.admlogin, password=self.admpassword)
   236         repo.execute(cnxid, 'INSERT CWUser X: X login "toto", X upassword "tutu", X in_group G WHERE G name "users"')
   260         repo.execute(cnxid, 'INSERT CWUser X: X login "toto", X upassword "tutu", X in_group G WHERE G name "users"')
   244         def run_transaction():
   268         def run_transaction():
   245             repo.execute(cnxid, 'DELETE CWUser X WHERE X login "toto"')
   269             repo.execute(cnxid, 'DELETE CWUser X WHERE X login "toto"')
   246             repo.commit(cnxid)
   270             repo.commit(cnxid)
   247         try:
   271         try:
   248             ex = self.assertRaises(Exception, run_transaction)
   272             ex = self.assertRaises(Exception, run_transaction)
   249             self.assertEquals(str(ex), 'try to access pool on a closed session')
   273             self.assertEqual(str(ex), 'try to access pool on a closed session')
   250         finally:
   274         finally:
   251             t.join()
   275             t.join()
   252 
   276 
   253     def test_initial_schema(self):
   277     def test_initial_schema(self):
   254         schema = self.repo.schema
   278         schema = self.repo.schema
   255         # check order of attributes is respected
   279         # check order of attributes is respected
   256         self.assertListEquals([r.type for r in schema.eschema('CWAttribute').ordered_relations()
   280         self.assertListEqual([r.type for r in schema.eschema('CWAttribute').ordered_relations()
   257                                if not r.type in ('eid', 'is', 'is_instance_of', 'identity',
   281                                if not r.type in ('eid', 'is', 'is_instance_of', 'identity',
   258                                                  'creation_date', 'modification_date', 'cwuri',
   282                                                  'creation_date', 'modification_date', 'cwuri',
   259                                                  'owned_by', 'created_by',
   283                                                  'owned_by', 'created_by',
   260                                                  'update_permission', 'read_permission',
   284                                                  'update_permission', 'read_permission',
   261                                                  'in_basket')],
   285                                                  'in_basket')],
   264                                'constrained_by',
   288                                'constrained_by',
   265                                'cardinality', 'ordernum',
   289                                'cardinality', 'ordernum',
   266                                'indexed', 'fulltextindexed', 'internationalizable',
   290                                'indexed', 'fulltextindexed', 'internationalizable',
   267                                'defaultval', 'description', 'description_format'])
   291                                'defaultval', 'description', 'description_format'])
   268 
   292 
   269         self.assertEquals(schema.eschema('CWEType').main_attribute(), 'name')
   293         self.assertEqual(schema.eschema('CWEType').main_attribute(), 'name')
   270         self.assertEquals(schema.eschema('State').main_attribute(), 'name')
   294         self.assertEqual(schema.eschema('State').main_attribute(), 'name')
   271 
   295 
   272         constraints = schema.rschema('name').rdef('CWEType', 'String').constraints
   296         constraints = schema.rschema('name').rdef('CWEType', 'String').constraints
   273         self.assertEquals(len(constraints), 2)
   297         self.assertEqual(len(constraints), 2)
   274         for cstr in constraints[:]:
   298         for cstr in constraints[:]:
   275             if isinstance(cstr, UniqueConstraint):
   299             if isinstance(cstr, UniqueConstraint):
   276                 constraints.remove(cstr)
   300                 constraints.remove(cstr)
   277                 break
   301                 break
   278         else:
   302         else:
   279             self.fail('unique constraint not found')
   303             self.fail('unique constraint not found')
   280         sizeconstraint = constraints[0]
   304         sizeconstraint = constraints[0]
   281         self.assertEquals(sizeconstraint.min, None)
   305         self.assertEqual(sizeconstraint.min, None)
   282         self.assertEquals(sizeconstraint.max, 64)
   306         self.assertEqual(sizeconstraint.max, 64)
   283 
   307 
   284         constraints = schema.rschema('relation_type').rdef('CWAttribute', 'CWRType').constraints
   308         constraints = schema.rschema('relation_type').rdef('CWAttribute', 'CWRType').constraints
   285         self.assertEquals(len(constraints), 1)
   309         self.assertEqual(len(constraints), 1)
   286         cstr = constraints[0]
   310         cstr = constraints[0]
   287         self.assert_(isinstance(cstr, RQLConstraint))
   311         self.assert_(isinstance(cstr, RQLConstraint))
   288         self.assertEquals(cstr.restriction, 'O final TRUE')
   312         self.assertEqual(cstr.restriction, 'O final TRUE')
   289 
   313 
   290         ownedby = schema.rschema('owned_by')
   314         ownedby = schema.rschema('owned_by')
   291         self.assertEquals(ownedby.objects('CWEType'), ('CWUser',))
   315         self.assertEqual(ownedby.objects('CWEType'), ('CWUser',))
   292 
   316 
   293     def test_pyro(self):
   317     def test_pyro(self):
   294         import Pyro
   318         import Pyro
   295         Pyro.config.PYRO_MULTITHREADED = 0
   319         Pyro.config.PYRO_MULTITHREADED = 0
   296         done = []
   320         done = []
   317             cnx.load_appobjects(subpath=('entities',))
   341             cnx.load_appobjects(subpath=('entities',))
   318             # check we can get the schema
   342             # check we can get the schema
   319             schema = cnx.get_schema()
   343             schema = cnx.get_schema()
   320             self.failUnless(cnx.vreg)
   344             self.failUnless(cnx.vreg)
   321             self.failUnless('etypes'in cnx.vreg)
   345             self.failUnless('etypes'in cnx.vreg)
   322             self.assertEquals(schema.__hashmode__, None)
   346             self.assertEqual(schema.__hashmode__, None)
   323             cu = cnx.cursor()
   347             cu = cnx.cursor()
   324             rset = cu.execute('Any U,G WHERE U in_group G')
   348             rset = cu.execute('Any U,G WHERE U in_group G')
   325             user = iter(rset.entities()).next()
   349             user = iter(rset.entities()).next()
   326             self.failUnless(user._cw)
   350             self.failUnless(user._cw)
   327             self.failUnless(user._cw.vreg)
   351             self.failUnless(user._cw.vreg)
   335 
   359 
   336     def test_internal_api(self):
   360     def test_internal_api(self):
   337         repo = self.repo
   361         repo = self.repo
   338         cnxid = repo.connect(self.admlogin, password=self.admpassword)
   362         cnxid = repo.connect(self.admlogin, password=self.admpassword)
   339         session = repo._get_session(cnxid, setpool=True)
   363         session = repo._get_session(cnxid, setpool=True)
   340         self.assertEquals(repo.type_and_source_from_eid(1, session),
   364         self.assertEqual(repo.type_and_source_from_eid(1, session),
   341                           ('CWGroup', 'system', None))
   365                           ('CWGroup', 'system', None))
   342         self.assertEquals(repo.type_from_eid(1, session), 'CWGroup')
   366         self.assertEqual(repo.type_from_eid(1, session), 'CWGroup')
   343         self.assertEquals(repo.source_from_eid(1, session).uri, 'system')
   367         self.assertEqual(repo.source_from_eid(1, session).uri, 'system')
   344         self.assertEquals(repo.eid2extid(repo.system_source, 1, session), None)
   368         self.assertEqual(repo.eid2extid(repo.system_source, 1, session), None)
   345         class dummysource: uri = 'toto'
   369         class dummysource: uri = 'toto'
   346         self.assertRaises(UnknownEid, repo.eid2extid, dummysource, 1, session)
   370         self.assertRaises(UnknownEid, repo.eid2extid, dummysource, 1, session)
   347 
   371 
   348     def test_public_api(self):
   372     def test_public_api(self):
   349         self.assertEquals(self.repo.get_schema(), self.repo.schema)
   373         self.assertEqual(self.repo.get_schema(), self.repo.schema)
   350         self.assertEquals(self.repo.source_defs(), {'system': {'adapter': 'native', 'uri': 'system'}})
   374         self.assertEqual(self.repo.source_defs(), {'system': {'adapter': 'native', 'uri': 'system'}})
   351         # .properties() return a result set
   375         # .properties() return a result set
   352         self.assertEquals(self.repo.properties().rql, 'Any K,V WHERE P is CWProperty,P pkey K, P value V, NOT P for_user U')
   376         self.assertEqual(self.repo.properties().rql, 'Any K,V WHERE P is CWProperty,P pkey K, P value V, NOT P for_user U')
   353 
   377 
   354     def test_session_api(self):
   378     def test_session_api(self):
   355         repo = self.repo
   379         repo = self.repo
   356         cnxid = repo.connect(self.admlogin, password=self.admpassword)
   380         cnxid = repo.connect(self.admlogin, password=self.admpassword)
   357         self.assertEquals(repo.user_info(cnxid), (5, 'admin', set([u'managers']), {}))
   381         self.assertEqual(repo.user_info(cnxid), (5, 'admin', set([u'managers']), {}))
   358         self.assertEquals(repo.describe(cnxid, 1), (u'CWGroup', u'system', None))
   382         self.assertEqual(repo.describe(cnxid, 1), (u'CWGroup', u'system', None))
   359         repo.close(cnxid)
   383         repo.close(cnxid)
   360         self.assertRaises(BadConnectionId, repo.user_info, cnxid)
   384         self.assertRaises(BadConnectionId, repo.user_info, cnxid)
   361         self.assertRaises(BadConnectionId, repo.describe, cnxid, 1)
   385         self.assertRaises(BadConnectionId, repo.describe, cnxid, 1)
   362 
   386 
   363     def test_shared_data_api(self):
   387     def test_shared_data_api(self):
   364         repo = self.repo
   388         repo = self.repo
   365         cnxid = repo.connect(self.admlogin, password=self.admpassword)
   389         cnxid = repo.connect(self.admlogin, password=self.admpassword)
   366         self.assertEquals(repo.get_shared_data(cnxid, 'data'), None)
   390         self.assertEqual(repo.get_shared_data(cnxid, 'data'), None)
   367         repo.set_shared_data(cnxid, 'data', 4)
   391         repo.set_shared_data(cnxid, 'data', 4)
   368         self.assertEquals(repo.get_shared_data(cnxid, 'data'), 4)
   392         self.assertEqual(repo.get_shared_data(cnxid, 'data'), 4)
   369         repo.get_shared_data(cnxid, 'data', pop=True)
   393         repo.get_shared_data(cnxid, 'data', pop=True)
   370         repo.get_shared_data(cnxid, 'whatever', pop=True)
   394         repo.get_shared_data(cnxid, 'whatever', pop=True)
   371         self.assertEquals(repo.get_shared_data(cnxid, 'data'), None)
   395         self.assertEqual(repo.get_shared_data(cnxid, 'data'), None)
   372         repo.close(cnxid)
   396         repo.close(cnxid)
   373         self.assertRaises(BadConnectionId, repo.set_shared_data, cnxid, 'data', 0)
   397         self.assertRaises(BadConnectionId, repo.set_shared_data, cnxid, 'data', 0)
   374         self.assertRaises(BadConnectionId, repo.get_shared_data, cnxid, 'data')
   398         self.assertRaises(BadConnectionId, repo.get_shared_data, cnxid, 'data')
   375 
   399 
   376     def test_schema_is_relation(self):
   400     def test_schema_is_relation(self):
   392         p1 = self.request().create_entity('Personne', nom=u'toto')
   416         p1 = self.request().create_entity('Personne', nom=u'toto')
   393         self.execute('SET A todo_by P WHERE A eid %(x)s, P eid %(p)s',
   417         self.execute('SET A todo_by P WHERE A eid %(x)s, P eid %(p)s',
   394                      {'x': note.eid, 'p': p1.eid})
   418                      {'x': note.eid, 'p': p1.eid})
   395         rset = self.execute('Any P WHERE A todo_by P, A eid %(x)s',
   419         rset = self.execute('Any P WHERE A todo_by P, A eid %(x)s',
   396                             {'x': note.eid})
   420                             {'x': note.eid})
   397         self.assertEquals(len(rset), 1)
   421         self.assertEqual(len(rset), 1)
   398         p2 = self.request().create_entity('Personne', nom=u'tutu')
   422         p2 = self.request().create_entity('Personne', nom=u'tutu')
   399         self.execute('SET A todo_by P WHERE A eid %(x)s, P eid %(p)s',
   423         self.execute('SET A todo_by P WHERE A eid %(x)s, P eid %(p)s',
   400                      {'x': note.eid, 'p': p2.eid})
   424                      {'x': note.eid, 'p': p2.eid})
   401         rset = self.execute('Any P WHERE A todo_by P, A eid %(x)s',
   425         rset = self.execute('Any P WHERE A todo_by P, A eid %(x)s',
   402                             {'x': note.eid})
   426                             {'x': note.eid})
   403         self.assertEquals(len(rset), 1)
   427         self.assertEqual(len(rset), 1)
   404         self.assertEquals(rset.rows[0][0], p2.eid)
   428         self.assertEqual(rset.rows[0][0], p2.eid)
   405 
   429 
   406     def test_delete_if_object_inlined_singlecard(self):
   430     def test_delete_if_object_inlined_singlecard(self):
   407         req = self.request()
   431         req = self.request()
   408         c = req.create_entity('Card', title=u'Carte')
   432         c = req.create_entity('Card', title=u'Carte')
   409         req.create_entity('Personne', nom=u'Vincent', fiche=c)
   433         req.create_entity('Personne', nom=u'Vincent', fiche=c)
   410         req.create_entity('Personne', nom=u'Florent', fiche=c)
   434         req.create_entity('Personne', nom=u'Florent', fiche=c)
   411         self.commit()
   435         self.commit()
   412         self.assertEquals(len(c.reverse_fiche), 1)
   436         self.assertEqual(len(c.reverse_fiche), 1)
   413 
   437 
   414     def test_set_attributes_in_before_update(self):
   438     def test_set_attributes_in_before_update(self):
   415         # local hook
   439         # local hook
   416         class DummyBeforeHook(Hook):
   440         class DummyBeforeHook(Hook):
   417             __regid__ = 'dummy-before-hook'
   441             __regid__ = 'dummy-before-hook'
   428             req = self.request()
   452             req = self.request()
   429             addr = req.create_entity('EmailAddress', address=u'a@b.fr')
   453             addr = req.create_entity('EmailAddress', address=u'a@b.fr')
   430             addr.set_attributes(address=u'a@b.com')
   454             addr.set_attributes(address=u'a@b.com')
   431             rset = self.execute('Any A,AA WHERE X eid %(x)s, X address A, X alias AA',
   455             rset = self.execute('Any A,AA WHERE X eid %(x)s, X address A, X alias AA',
   432                                 {'x': addr.eid})
   456                                 {'x': addr.eid})
   433             self.assertEquals(rset.rows, [[u'a@b.com', u'foo']])
   457             self.assertEqual(rset.rows, [[u'a@b.com', u'foo']])
   434 
   458 
   435     def test_set_attributes_in_before_add(self):
   459     def test_set_attributes_in_before_add(self):
   436         # local hook
   460         # local hook
   437         class DummyBeforeHook(Hook):
   461         class DummyBeforeHook(Hook):
   438             __regid__ = 'dummy-before-hook'
   462             __regid__ = 'dummy-before-hook'
   475         self.session.set_pool()
   499         self.session.set_pool()
   476         self.assert_(self.repo.system_source.create_eid(self.session))
   500         self.assert_(self.repo.system_source.create_eid(self.session))
   477 
   501 
   478     def test_source_from_eid(self):
   502     def test_source_from_eid(self):
   479         self.session.set_pool()
   503         self.session.set_pool()
   480         self.assertEquals(self.repo.source_from_eid(1, self.session),
   504         self.assertEqual(self.repo.source_from_eid(1, self.session),
   481                           self.repo.sources_by_uri['system'])
   505                           self.repo.sources_by_uri['system'])
   482 
   506 
   483     def test_source_from_eid_raise(self):
   507     def test_source_from_eid_raise(self):
   484         self.session.set_pool()
   508         self.session.set_pool()
   485         self.assertRaises(UnknownEid, self.repo.source_from_eid, -2, self.session)
   509         self.assertRaises(UnknownEid, self.repo.source_from_eid, -2, self.session)
   486 
   510 
   487     def test_type_from_eid(self):
   511     def test_type_from_eid(self):
   488         self.session.set_pool()
   512         self.session.set_pool()
   489         self.assertEquals(self.repo.type_from_eid(1, self.session), 'CWGroup')
   513         self.assertEqual(self.repo.type_from_eid(1, self.session), 'CWGroup')
   490 
   514 
   491     def test_type_from_eid_raise(self):
   515     def test_type_from_eid_raise(self):
   492         self.session.set_pool()
   516         self.session.set_pool()
   493         self.assertRaises(UnknownEid, self.repo.type_from_eid, -2, self.session)
   517         self.assertRaises(UnknownEid, self.repo.type_from_eid, -2, self.session)
   494 
   518 
   501         cu = self.session.system_sql('SELECT * FROM entities WHERE eid = -1')
   525         cu = self.session.system_sql('SELECT * FROM entities WHERE eid = -1')
   502         data = cu.fetchall()
   526         data = cu.fetchall()
   503         self.assertIsInstance(data[0][3], datetime)
   527         self.assertIsInstance(data[0][3], datetime)
   504         data[0] = list(data[0])
   528         data[0] = list(data[0])
   505         data[0][3] = None
   529         data[0][3] = None
   506         self.assertEquals(tuplify(data), [(-1, 'Personne', 'system', None, None)])
   530         self.assertEqual(tuplify(data), [(-1, 'Personne', 'system', None, None)])
   507         self.repo.delete_info(self.session, entity, 'system', None)
   531         self.repo.delete_info(self.session, entity, 'system', None)
   508         #self.repo.commit()
   532         #self.repo.commit()
   509         cu = self.session.system_sql('SELECT * FROM entities WHERE eid = -1')
   533         cu = self.session.system_sql('SELECT * FROM entities WHERE eid = -1')
   510         data = cu.fetchall()
   534         data = cu.fetchall()
   511         self.assertEquals(data, [])
   535         self.assertEqual(data, [])
   512 
   536 
   513 
   537 
   514 class FTITC(CubicWebTC):
   538 class FTITC(CubicWebTC):
   515 
   539 
   516     def test_reindex_and_modified_since(self):
   540     def test_reindex_and_modified_since(self):
   517         self.repo.system_source.multisources_etypes.add('Personne')
   541         self.repo.system_source.multisources_etypes.add('Personne')
   518         eidp = self.execute('INSERT Personne X: X nom "toto", X prenom "tutu"')[0][0]
   542         eidp = self.execute('INSERT Personne X: X nom "toto", X prenom "tutu"')[0][0]
   519         self.commit()
   543         self.commit()
   520         ts = datetime.now()
   544         ts = datetime.now()
   521         self.assertEquals(len(self.execute('Personne X WHERE X has_text "tutu"')), 1)
   545         self.assertEqual(len(self.execute('Personne X WHERE X has_text "tutu"')), 1)
   522         self.session.set_pool()
   546         self.session.set_pool()
   523         cu = self.session.system_sql('SELECT mtime, eid FROM entities WHERE eid = %s' % eidp)
   547         cu = self.session.system_sql('SELECT mtime, eid FROM entities WHERE eid = %s' % eidp)
   524         omtime = cu.fetchone()[0]
   548         omtime = cu.fetchone()[0]
   525         # our sqlite datetime adapter is ignore seconds fraction, so we have to
   549         # our sqlite datetime adapter is ignore seconds fraction, so we have to
   526         # ensure update is done the next seconds
   550         # ensure update is done the next seconds
   527         time.sleep(1 - (ts.second - int(ts.second)))
   551         time.sleep(1 - (ts.second - int(ts.second)))
   528         self.execute('SET X nom "tata" WHERE X eid %(x)s', {'x': eidp})
   552         self.execute('SET X nom "tata" WHERE X eid %(x)s', {'x': eidp})
   529         self.commit()
   553         self.commit()
   530         self.assertEquals(len(self.execute('Personne X WHERE X has_text "tutu"')), 1)
   554         self.assertEqual(len(self.execute('Personne X WHERE X has_text "tutu"')), 1)
   531         self.session.set_pool()
   555         self.session.set_pool()
   532         cu = self.session.system_sql('SELECT mtime FROM entities WHERE eid = %s' % eidp)
   556         cu = self.session.system_sql('SELECT mtime FROM entities WHERE eid = %s' % eidp)
   533         mtime = cu.fetchone()[0]
   557         mtime = cu.fetchone()[0]
   534         self.failUnless(omtime < mtime)
   558         self.failUnless(omtime < mtime)
   535         self.commit()
   559         self.commit()
   536         date, modified, deleted = self.repo.entities_modified_since(('Personne',), omtime)
   560         date, modified, deleted = self.repo.entities_modified_since(('Personne',), omtime)
   537         self.assertEquals(modified, [('Personne', eidp)])
   561         self.assertEqual(modified, [('Personne', eidp)])
   538         self.assertEquals(deleted, [])
   562         self.assertEqual(deleted, [])
   539         date, modified, deleted = self.repo.entities_modified_since(('Personne',), mtime)
   563         date, modified, deleted = self.repo.entities_modified_since(('Personne',), mtime)
   540         self.assertEquals(modified, [])
   564         self.assertEqual(modified, [])
   541         self.assertEquals(deleted, [])
   565         self.assertEqual(deleted, [])
   542         self.execute('DELETE Personne X WHERE X eid %(x)s', {'x': eidp})
   566         self.execute('DELETE Personne X WHERE X eid %(x)s', {'x': eidp})
   543         self.commit()
   567         self.commit()
   544         date, modified, deleted = self.repo.entities_modified_since(('Personne',), omtime)
   568         date, modified, deleted = self.repo.entities_modified_since(('Personne',), omtime)
   545         self.assertEquals(modified, [])
   569         self.assertEqual(modified, [])
   546         self.assertEquals(deleted, [('Personne', eidp)])
   570         self.assertEqual(deleted, [('Personne', eidp)])
   547 
   571 
   548     def test_fulltext_container_entity(self):
   572     def test_fulltext_container_entity(self):
   549         assert self.schema.rschema('use_email').fulltext_container == 'subject'
   573         assert self.schema.rschema('use_email').fulltext_container == 'subject'
   550         req = self.request()
   574         req = self.request()
   551         toto = req.create_entity('EmailAddress', address=u'toto@logilab.fr')
   575         toto = req.create_entity('EmailAddress', address=u'toto@logilab.fr')
   552         self.commit()
   576         self.commit()
   553         rset = req.execute('Any X WHERE X has_text %(t)s', {'t': 'toto'})
   577         rset = req.execute('Any X WHERE X has_text %(t)s', {'t': 'toto'})
   554         self.assertEquals(rset.rows, [])
   578         self.assertEqual(rset.rows, [])
   555         req.user.set_relations(use_email=toto)
   579         req.user.set_relations(use_email=toto)
   556         self.commit()
   580         self.commit()
   557         rset = req.execute('Any X WHERE X has_text %(t)s', {'t': 'toto'})
   581         rset = req.execute('Any X WHERE X has_text %(t)s', {'t': 'toto'})
   558         self.assertEquals(rset.rows, [[req.user.eid]])
   582         self.assertEqual(rset.rows, [[req.user.eid]])
   559         req.execute('DELETE X use_email Y WHERE X login "admin", Y eid %(y)s',
   583         req.execute('DELETE X use_email Y WHERE X login "admin", Y eid %(y)s',
   560                     {'y': toto.eid})
   584                     {'y': toto.eid})
   561         self.commit()
   585         self.commit()
   562         rset = req.execute('Any X WHERE X has_text %(t)s', {'t': 'toto'})
   586         rset = req.execute('Any X WHERE X has_text %(t)s', {'t': 'toto'})
   563         self.assertEquals(rset.rows, [])
   587         self.assertEqual(rset.rows, [])
   564         tutu = req.create_entity('EmailAddress', address=u'tutu@logilab.fr')
   588         tutu = req.create_entity('EmailAddress', address=u'tutu@logilab.fr')
   565         req.user.set_relations(use_email=tutu)
   589         req.user.set_relations(use_email=tutu)
   566         self.commit()
   590         self.commit()
   567         rset = req.execute('Any X WHERE X has_text %(t)s', {'t': 'tutu'})
   591         rset = req.execute('Any X WHERE X has_text %(t)s', {'t': 'tutu'})
   568         self.assertEquals(rset.rows, [[req.user.eid]])
   592         self.assertEqual(rset.rows, [[req.user.eid]])
   569         tutu.set_attributes(address=u'hip@logilab.fr')
   593         tutu.set_attributes(address=u'hip@logilab.fr')
   570         self.commit()
   594         self.commit()
   571         rset = req.execute('Any X WHERE X has_text %(t)s', {'t': 'tutu'})
   595         rset = req.execute('Any X WHERE X has_text %(t)s', {'t': 'tutu'})
   572         self.assertEquals(rset.rows, [])
   596         self.assertEqual(rset.rows, [])
   573         rset = req.execute('Any X WHERE X has_text %(t)s', {'t': 'hip'})
   597         rset = req.execute('Any X WHERE X has_text %(t)s', {'t': 'hip'})
   574         self.assertEquals(rset.rows, [[req.user.eid]])
   598         self.assertEqual(rset.rows, [[req.user.eid]])
   575 
   599 
   576     def test_no_uncessary_ftiindex_op(self):
   600     def test_no_uncessary_ftiindex_op(self):
   577         req = self.request()
   601         req = self.request()
   578         req.create_entity('Workflow', name=u'dummy workflow', description=u'huuuuu')
   602         req.create_entity('Workflow', name=u'dummy workflow', description=u'huuuuu')
   579         self.failIf(any(x for x in self.session.pending_operations
   603         self.failIf(any(x for x in self.session.pending_operations
   582 
   606 
   583 class DBInitTC(CubicWebTC):
   607 class DBInitTC(CubicWebTC):
   584 
   608 
   585     def test_versions_inserted(self):
   609     def test_versions_inserted(self):
   586         inserted = [r[0] for r in self.execute('Any K ORDERBY K WHERE P pkey K, P pkey ~= "system.version.%"')]
   610         inserted = [r[0] for r in self.execute('Any K ORDERBY K WHERE P pkey K, P pkey ~= "system.version.%"')]
   587         self.assertEquals(inserted,
   611         self.assertEqual(inserted,
   588                           [u'system.version.basket', u'system.version.card', u'system.version.comment',
   612                           [u'system.version.basket', u'system.version.card', u'system.version.comment',
   589                            u'system.version.cubicweb', u'system.version.email',
   613                            u'system.version.cubicweb', u'system.version.email',
   590                            u'system.version.file', u'system.version.folder',
   614                            u'system.version.file', u'system.version.folder',
   591                            u'system.version.tag'])
   615                            u'system.version.tag'])
   592 
   616 
   614 
   638 
   615         with self.temporary_appobjects(EcritParHook):
   639         with self.temporary_appobjects(EcritParHook):
   616             eidp = self.execute('INSERT Personne X: X nom "toto"')[0][0]
   640             eidp = self.execute('INSERT Personne X: X nom "toto"')[0][0]
   617             eidn = self.execute('INSERT Note X: X type "T"')[0][0]
   641             eidn = self.execute('INSERT Note X: X type "T"')[0][0]
   618             self.execute('SET N ecrit_par Y WHERE N type "T", Y nom "toto"')
   642             self.execute('SET N ecrit_par Y WHERE N type "T", Y nom "toto"')
   619             self.assertEquals(CALLED, [('before_add_relation', eidn, 'ecrit_par', eidp),
   643             self.assertEqual(CALLED, [('before_add_relation', eidn, 'ecrit_par', eidp),
   620                                        ('after_add_relation', eidn, 'ecrit_par', eidp)])
   644                                        ('after_add_relation', eidn, 'ecrit_par', eidp)])
   621             CALLED[:] = ()
   645             CALLED[:] = ()
   622             self.execute('DELETE N ecrit_par Y WHERE N type "T", Y nom "toto"')
   646             self.execute('DELETE N ecrit_par Y WHERE N type "T", Y nom "toto"')
   623             self.assertEquals(CALLED, [('before_delete_relation', eidn, 'ecrit_par', eidp),
   647             self.assertEqual(CALLED, [('before_delete_relation', eidn, 'ecrit_par', eidp),
   624                                        ('after_delete_relation', eidn, 'ecrit_par', eidp)])
   648                                        ('after_delete_relation', eidn, 'ecrit_par', eidp)])
   625             CALLED[:] = ()
   649             CALLED[:] = ()
   626             eidn = self.execute('INSERT Note N: N ecrit_par P WHERE P nom "toto"')[0][0]
   650             eidn = self.execute('INSERT Note N: N ecrit_par P WHERE P nom "toto"')[0][0]
   627             self.assertEquals(CALLED, [('before_add_relation', eidn, 'ecrit_par', eidp),
   651             self.assertEqual(CALLED, [('before_add_relation', eidn, 'ecrit_par', eidp),
   628                                        ('after_add_relation', eidn, 'ecrit_par', eidp)])
   652                                        ('after_add_relation', eidn, 'ecrit_par', eidp)])
   629 
   653 
   630     def test_unique_contraint(self):
   654     def test_unique_contraint(self):
   631         req = self.request()
   655         req = self.request()
   632         toto = req.create_entity('Personne', nom=u'toto')
   656         toto = req.create_entity('Personne', nom=u'toto')
   636         req.create_entity('Note', type=u'todo', inline1=a01)
   660         req.create_entity('Note', type=u'todo', inline1=a01)
   637         req.cnx.commit()
   661         req.cnx.commit()
   638         req = self.request()
   662         req = self.request()
   639         req.create_entity('Note', type=u'todo', inline1=a01)
   663         req.create_entity('Note', type=u'todo', inline1=a01)
   640         ex = self.assertRaises(ValidationError, req.cnx.commit)
   664         ex = self.assertRaises(ValidationError, req.cnx.commit)
   641         self.assertEquals(ex.errors, {'inline1-subject': u'RQLUniqueConstraint S type T, S inline1 A1, A1 todo_by C, Y type T, Y inline1 A2, A2 todo_by C failed'})
   665         self.assertEqual(ex.errors, {'inline1-subject': u'RQLUniqueConstraint S type T, S inline1 A1, A1 todo_by C, Y type T, Y inline1 A2, A2 todo_by C failed'})
   642 
   666 
   643 if __name__ == '__main__':
   667 if __name__ == '__main__':
   644     unittest_main()
   668     unittest_main()