cubicweb/server/test/unittest_repository.py
changeset 11057 0b59724cb3f2
parent 11008 de86c6592cc7
child 11195 5de859b95988
equal deleted inserted replaced
11052:058bb3dc685f 11057:0b59724cb3f2
       
     1 # -*- coding: iso-8859-1 -*-
       
     2 # copyright 2003-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
       
     3 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
       
     4 #
       
     5 # This file is part of CubicWeb.
       
     6 #
       
     7 # CubicWeb is free software: you can redistribute it and/or modify it under the
       
     8 # terms of the GNU Lesser General Public License as published by the Free
       
     9 # Software Foundation, either version 2.1 of the License, or (at your option)
       
    10 # any later version.
       
    11 #
       
    12 # CubicWeb is distributed in the hope that it will be useful, but WITHOUT
       
    13 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
       
    14 # FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
       
    15 # details.
       
    16 #
       
    17 # You should have received a copy of the GNU Lesser General Public License along
       
    18 # with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
       
    19 """unit tests for module cubicweb.server.repository"""
       
    20 
       
    21 import threading
       
    22 import time
       
    23 import logging
       
    24 
       
    25 from six.moves import range
       
    26 
       
    27 from yams.constraints import UniqueConstraint
       
    28 from yams import register_base_type, unregister_base_type
       
    29 
       
    30 from logilab.database import get_db_helper
       
    31 
       
    32 from cubicweb import (BadConnectionId, ValidationError,
       
    33                       UnknownEid, AuthenticationError, Unauthorized, QueryError)
       
    34 from cubicweb.predicates import is_instance
       
    35 from cubicweb.schema import RQLConstraint
       
    36 from cubicweb.devtools.testlib import CubicWebTC
       
    37 from cubicweb.devtools.repotest import tuplify
       
    38 from cubicweb.server import hook
       
    39 from cubicweb.server.sqlutils import SQL_PREFIX
       
    40 from cubicweb.server.hook import Hook
       
    41 from cubicweb.server.sources import native
       
    42 from cubicweb.server.session import SessionClosedError
       
    43 
       
    44 
       
    45 class RepositoryTC(CubicWebTC):
       
    46     """ singleton providing access to a persistent storage for entities
       
    47     and relation
       
    48     """
       
    49 
       
    50     def test_unique_together_constraint(self):
       
    51         with self.admin_access.repo_cnx() as cnx:
       
    52             cnx.execute('INSERT Societe S: S nom "Logilab", S type "SSLL", S cp "75013"')
       
    53             with self.assertRaises(ValidationError) as wraperr:
       
    54                 cnx.execute('INSERT Societe S: S nom "Logilab", S type "SSLL", S cp "75013"')
       
    55             self.assertEqual(
       
    56                 {'cp': u'%(KEY-rtype)s is part of violated unicity constraint',
       
    57                  'nom': u'%(KEY-rtype)s is part of violated unicity constraint',
       
    58                  'type': u'%(KEY-rtype)s is part of violated unicity constraint',
       
    59                  '': u'some relations violate a unicity constraint'},
       
    60                 wraperr.exception.args[1])
       
    61 
       
    62     def test_unique_together_schema(self):
       
    63         person = self.repo.schema.eschema('Personne')
       
    64         self.assertEqual(len(person._unique_together), 1)
       
    65         self.assertItemsEqual(person._unique_together[0],
       
    66                                            ('nom', 'prenom', 'inline2'))
       
    67 
       
    68     def test_all_entities_have_owner(self):
       
    69         with self.admin_access.repo_cnx() as cnx:
       
    70             self.assertFalse(cnx.execute('Any X WHERE NOT X owned_by U'))
       
    71 
       
    72     def test_all_entities_have_is(self):
       
    73         with self.admin_access.repo_cnx() as cnx:
       
    74             self.assertFalse(cnx.execute('Any X WHERE NOT X is ET'))
       
    75 
       
    76     def test_all_entities_have_cw_source(self):
       
    77         with self.admin_access.repo_cnx() as cnx:
       
    78             self.assertFalse(cnx.execute('Any X WHERE NOT X cw_source S'))
       
    79 
       
    80     def test_connect(self):
       
    81         cnxid = self.repo.connect(self.admlogin, password=self.admpassword)
       
    82         self.assertTrue(cnxid)
       
    83         self.repo.close(cnxid)
       
    84         self.assertRaises(AuthenticationError,
       
    85                           self.repo.connect, self.admlogin, password='nimportnawak')
       
    86         self.assertRaises(AuthenticationError,
       
    87                           self.repo.connect, self.admlogin, password='')
       
    88         self.assertRaises(AuthenticationError,
       
    89                           self.repo.connect, self.admlogin, password=None)
       
    90         self.assertRaises(AuthenticationError,
       
    91                           self.repo.connect, None, password=None)
       
    92         self.assertRaises(AuthenticationError,
       
    93                           self.repo.connect, self.admlogin)
       
    94         self.assertRaises(AuthenticationError,
       
    95                           self.repo.connect, None)
       
    96 
       
    97     def test_login_upassword_accent(self):
       
    98         with self.admin_access.repo_cnx() as cnx:
       
    99             cnx.execute('INSERT CWUser X: X login %(login)s, X upassword %(passwd)s, '
       
   100                         'X in_group G WHERE G name "users"',
       
   101                         {'login': u"barnabé", 'passwd': u"héhéhé".encode('UTF8')})
       
   102             cnx.commit()
       
   103         repo = self.repo
       
   104         cnxid = repo.connect(u"barnabé", password=u"héhéhé".encode('UTF8'))
       
   105         self.assertTrue(cnxid)
       
   106         repo.close(cnxid)
       
   107 
       
   108     def test_rollback_on_execute_validation_error(self):
       
   109         class ValidationErrorAfterHook(Hook):
       
   110             __regid__ = 'valerror-after-hook'
       
   111             __select__ = Hook.__select__ & is_instance('CWGroup')
       
   112             events = ('after_update_entity',)
       
   113             def __call__(self):
       
   114                 raise ValidationError(self.entity.eid, {})
       
   115 
       
   116         with self.admin_access.repo_cnx() as cnx:
       
   117             with self.temporary_appobjects(ValidationErrorAfterHook):
       
   118                 self.assertRaises(ValidationError,
       
   119                                   cnx.execute, 'SET X name "toto" WHERE X is CWGroup, X name "guests"')
       
   120                 self.assertTrue(cnx.execute('Any X WHERE X is CWGroup, X name "toto"'))
       
   121                 with self.assertRaises(QueryError) as cm:
       
   122                     cnx.commit()
       
   123                 self.assertEqual(str(cm.exception), 'transaction must be rolled back')
       
   124                 cnx.rollback()
       
   125                 self.assertFalse(cnx.execute('Any X WHERE X is CWGroup, X name "toto"'))
       
   126 
       
   127     def test_rollback_on_execute_unauthorized(self):
       
   128         class UnauthorizedAfterHook(Hook):
       
   129             __regid__ = 'unauthorized-after-hook'
       
   130             __select__ = Hook.__select__ & is_instance('CWGroup')
       
   131             events = ('after_update_entity',)
       
   132             def __call__(self):
       
   133                 raise Unauthorized()
       
   134 
       
   135         with self.admin_access.repo_cnx() as cnx:
       
   136             with self.temporary_appobjects(UnauthorizedAfterHook):
       
   137                 self.assertRaises(Unauthorized,
       
   138                                   cnx.execute, 'SET X name "toto" WHERE X is CWGroup, X name "guests"')
       
   139                 self.assertTrue(cnx.execute('Any X WHERE X is CWGroup, X name "toto"'))
       
   140                 with self.assertRaises(QueryError) as cm:
       
   141                     cnx.commit()
       
   142                 self.assertEqual(str(cm.exception), 'transaction must be rolled back')
       
   143                 cnx.rollback()
       
   144                 self.assertFalse(cnx.execute('Any X WHERE X is CWGroup, X name "toto"'))
       
   145 
       
   146 
       
   147     def test_close(self):
       
   148         repo = self.repo
       
   149         cnxid = repo.connect(self.admlogin, password=self.admpassword)
       
   150         self.assertTrue(cnxid)
       
   151         repo.close(cnxid)
       
   152 
       
   153 
       
   154     def test_initial_schema(self):
       
   155         schema = self.repo.schema
       
   156         # check order of attributes is respected
       
   157         notin = set(('eid', 'is', 'is_instance_of', 'identity',
       
   158                      'creation_date', 'modification_date', 'cwuri',
       
   159                      'owned_by', 'created_by', 'cw_source',
       
   160                      'update_permission', 'read_permission',
       
   161                      'add_permission', 'in_basket'))
       
   162         self.assertListEqual(['relation_type',
       
   163                               'from_entity', 'to_entity',
       
   164                               'constrained_by',
       
   165                               'cardinality', 'ordernum', 'formula',
       
   166                               'indexed', 'fulltextindexed', 'internationalizable',
       
   167                               'defaultval', 'extra_props',
       
   168                               'description', 'description_format'],
       
   169                              [r.type
       
   170                               for r in schema.eschema('CWAttribute').ordered_relations()
       
   171                               if r.type not in notin])
       
   172 
       
   173         self.assertEqual(schema.eschema('CWEType').main_attribute(), 'name')
       
   174         self.assertEqual(schema.eschema('State').main_attribute(), 'name')
       
   175 
       
   176         constraints = schema.rschema('name').rdef('CWEType', 'String').constraints
       
   177         self.assertEqual(len(constraints), 2)
       
   178         for cstr in constraints[:]:
       
   179             if isinstance(cstr, UniqueConstraint):
       
   180                 constraints.remove(cstr)
       
   181                 break
       
   182         else:
       
   183             self.fail('unique constraint not found')
       
   184         sizeconstraint = constraints[0]
       
   185         self.assertEqual(sizeconstraint.min, None)
       
   186         self.assertEqual(sizeconstraint.max, 64)
       
   187 
       
   188         constraints = schema.rschema('relation_type').rdef('CWAttribute', 'CWRType').constraints
       
   189         self.assertEqual(len(constraints), 1)
       
   190         cstr = constraints[0]
       
   191         self.assertIsInstance(cstr, RQLConstraint)
       
   192         self.assertEqual(cstr.expression, 'O final TRUE')
       
   193 
       
   194         ownedby = schema.rschema('owned_by')
       
   195         self.assertEqual(ownedby.objects('CWEType'), ('CWUser',))
       
   196 
       
   197     def test_internal_api(self):
       
   198         repo = self.repo
       
   199         cnxid = repo.connect(self.admlogin, password=self.admpassword)
       
   200         session = repo._get_session(cnxid)
       
   201         with session.new_cnx() as cnx:
       
   202             self.assertEqual(repo.type_and_source_from_eid(2, cnx),
       
   203                              ('CWGroup', None, 'system'))
       
   204             self.assertEqual(repo.type_from_eid(2, cnx), 'CWGroup')
       
   205         repo.close(cnxid)
       
   206 
       
   207     def test_public_api(self):
       
   208         self.assertEqual(self.repo.get_schema(), self.repo.schema)
       
   209         self.assertEqual(self.repo.source_defs(), {'system': {'type': 'native',
       
   210                                                               'uri': 'system',
       
   211                                                               'use-cwuri-as-url': False}
       
   212                                                   })
       
   213         # .properties() return a result set
       
   214         self.assertEqual(self.repo.properties().rql, 'Any K,V WHERE P is CWProperty,P pkey K, P value V, NOT P for_user U')
       
   215 
       
   216     def test_schema_is_relation(self):
       
   217         with self.admin_access.repo_cnx() as cnx:
       
   218             no_is_rset = cnx.execute('Any X WHERE NOT X is ET')
       
   219             self.assertFalse(no_is_rset, no_is_rset.description)
       
   220 
       
   221     def test_delete_if_singlecard1(self):
       
   222         with self.admin_access.repo_cnx() as cnx:
       
   223             note = cnx.create_entity('Affaire')
       
   224             p1 = cnx.create_entity('Personne', nom=u'toto')
       
   225             cnx.execute('SET A todo_by P WHERE A eid %(x)s, P eid %(p)s',
       
   226                         {'x': note.eid, 'p': p1.eid})
       
   227             rset = cnx.execute('Any P WHERE A todo_by P, A eid %(x)s',
       
   228                                {'x': note.eid})
       
   229             self.assertEqual(len(rset), 1)
       
   230             p2 = cnx.create_entity('Personne', nom=u'tutu')
       
   231             cnx.execute('SET A todo_by P WHERE A eid %(x)s, P eid %(p)s',
       
   232                         {'x': note.eid, 'p': p2.eid})
       
   233             rset = cnx.execute('Any P WHERE A todo_by P, A eid %(x)s',
       
   234                                 {'x': note.eid})
       
   235             self.assertEqual(len(rset), 1)
       
   236             self.assertEqual(rset.rows[0][0], p2.eid)
       
   237 
       
   238     def test_delete_if_object_inlined_singlecard(self):
       
   239         with self.admin_access.repo_cnx() as cnx:
       
   240             c = cnx.create_entity('Card', title=u'Carte')
       
   241             cnx.create_entity('Personne', nom=u'Vincent', fiche=c)
       
   242             cnx.create_entity('Personne', nom=u'Florent', fiche=c)
       
   243             cnx.commit()
       
   244             self.assertEqual(len(c.reverse_fiche), 1)
       
   245 
       
   246     def test_delete_computed_relation_nonregr(self):
       
   247         with self.admin_access.repo_cnx() as cnx:
       
   248             c = cnx.create_entity('Personne', nom=u'Adam', login_user=cnx.user.eid)
       
   249             cnx.commit()
       
   250             c.cw_delete()
       
   251             cnx.commit()
       
   252 
       
   253     def test_cw_set_in_before_update(self):
       
   254         # local hook
       
   255         class DummyBeforeHook(Hook):
       
   256             __regid__ = 'dummy-before-hook'
       
   257             __select__ = Hook.__select__ & is_instance('EmailAddress')
       
   258             events = ('before_update_entity',)
       
   259             def __call__(self):
       
   260                 # safety belt: avoid potential infinite recursion if the test
       
   261                 #              fails (i.e. RuntimeError not raised)
       
   262                 pendings = self._cw.transaction_data.setdefault('pending', set())
       
   263                 if self.entity.eid not in pendings:
       
   264                     pendings.add(self.entity.eid)
       
   265                     self.entity.cw_set(alias=u'foo')
       
   266 
       
   267         with self.admin_access.repo_cnx() as cnx:
       
   268             with self.temporary_appobjects(DummyBeforeHook):
       
   269                 addr = cnx.create_entity('EmailAddress', address=u'a@b.fr')
       
   270                 addr.cw_set(address=u'a@b.com')
       
   271                 rset = cnx.execute('Any A,AA WHERE X eid %(x)s, X address A, X alias AA',
       
   272                                    {'x': addr.eid})
       
   273                 self.assertEqual(rset.rows, [[u'a@b.com', u'foo']])
       
   274 
       
   275     def test_cw_set_in_before_add(self):
       
   276         # local hook
       
   277         class DummyBeforeHook(Hook):
       
   278             __regid__ = 'dummy-before-hook'
       
   279             __select__ = Hook.__select__ & is_instance('EmailAddress')
       
   280             events = ('before_add_entity',)
       
   281             def __call__(self):
       
   282                 # cw_set is forbidden within before_add_entity()
       
   283                 self.entity.cw_set(alias=u'foo')
       
   284 
       
   285         with self.admin_access.repo_cnx() as cnx:
       
   286             with self.temporary_appobjects(DummyBeforeHook):
       
   287                 # XXX will fail with python -O
       
   288                 self.assertRaises(AssertionError, cnx.create_entity,
       
   289                                   'EmailAddress', address=u'a@b.fr')
       
   290 
       
   291     def test_multiple_edit_cw_set(self):
       
   292         """make sure cw_edited doesn't get cluttered
       
   293         by previous entities on multiple set
       
   294         """
       
   295         # local hook
       
   296         class DummyBeforeHook(Hook):
       
   297             _test = self # keep reference to test instance
       
   298             __regid__ = 'dummy-before-hook'
       
   299             __select__ = Hook.__select__ & is_instance('Affaire')
       
   300             events = ('before_update_entity',)
       
   301             def __call__(self):
       
   302                 # invoiced attribute shouldn't be considered "edited" before the hook
       
   303                 self._test.assertFalse('invoiced' in self.entity.cw_edited,
       
   304                                   'cw_edited cluttered by previous update')
       
   305                 self.entity.cw_edited['invoiced'] = 10
       
   306 
       
   307         with self.admin_access.repo_cnx() as cnx:
       
   308             with self.temporary_appobjects(DummyBeforeHook):
       
   309                 cnx.create_entity('Affaire', ref=u'AFF01')
       
   310                 cnx.create_entity('Affaire', ref=u'AFF02')
       
   311                 cnx.execute('SET A duration 10 WHERE A is Affaire')
       
   312 
       
   313 
       
   314     def test_user_friendly_error(self):
       
   315         from cubicweb.entities.adapters import IUserFriendlyUniqueTogether
       
   316         class MyIUserFriendlyUniqueTogether(IUserFriendlyUniqueTogether):
       
   317             __select__ = IUserFriendlyUniqueTogether.__select__ & is_instance('Societe')
       
   318             def raise_user_exception(self):
       
   319                 raise ValidationError(self.entity.eid, {'hip': 'hop'})
       
   320 
       
   321         with self.admin_access.repo_cnx() as cnx:
       
   322             with self.temporary_appobjects(MyIUserFriendlyUniqueTogether):
       
   323                 s = cnx.create_entity('Societe', nom=u'Logilab', type=u'ssll', cp=u'75013')
       
   324                 cnx.commit()
       
   325                 with self.assertRaises(ValidationError) as cm:
       
   326                     cnx.create_entity('Societe', nom=u'Logilab', type=u'ssll', cp=u'75013')
       
   327                 self.assertEqual(cm.exception.errors, {'hip': 'hop'})
       
   328                 cnx.rollback()
       
   329                 cnx.create_entity('Societe', nom=u'Logilab', type=u'ssll', cp=u'31400')
       
   330                 with self.assertRaises(ValidationError) as cm:
       
   331                     s.cw_set(cp=u'31400')
       
   332                 self.assertEqual(cm.exception.entity, s.eid)
       
   333                 self.assertEqual(cm.exception.errors, {'hip': 'hop'})
       
   334                 cnx.rollback()
       
   335 
       
   336     def test_attribute_cache(self):
       
   337         with self.admin_access.repo_cnx() as cnx:
       
   338             bk = cnx.create_entity('Bookmark', title=u'index', path=u'/')
       
   339             cnx.commit()
       
   340             self.assertEqual(bk.title, 'index')
       
   341             bk.cw_set(title=u'root')
       
   342             self.assertEqual(bk.title, 'root')
       
   343             cnx.commit()
       
   344             self.assertEqual(bk.title, 'root')
       
   345 
       
   346 class SchemaDeserialTC(CubicWebTC):
       
   347 
       
   348     appid = 'data-schemaserial'
       
   349 
       
   350     @classmethod
       
   351     def setUpClass(cls):
       
   352         register_base_type('BabarTestType', ('jungle_speed',))
       
   353         helper = get_db_helper('sqlite')
       
   354         helper.TYPE_MAPPING['BabarTestType'] = 'TEXT'
       
   355         helper.TYPE_CONVERTERS['BabarTestType'] = lambda x: '"%s"' % x
       
   356         super(SchemaDeserialTC, cls).setUpClass()
       
   357 
       
   358 
       
   359     @classmethod
       
   360     def tearDownClass(cls):
       
   361         unregister_base_type('BabarTestType')
       
   362         helper = get_db_helper('sqlite')
       
   363         helper.TYPE_MAPPING.pop('BabarTestType', None)
       
   364         helper.TYPE_CONVERTERS.pop('BabarTestType', None)
       
   365         super(SchemaDeserialTC, cls).tearDownClass()
       
   366 
       
   367     def test_deserialization_base(self):
       
   368         """Check the following deserialization
       
   369 
       
   370         * all CWEtype has name
       
   371         * Final type
       
   372         * CWUniqueTogetherConstraint
       
   373         * _unique_together__ content"""
       
   374         origshema = self.repo.schema
       
   375         try:
       
   376             self.repo.config.repairing = True # avoid versions checking
       
   377             self.repo.set_schema(self.repo.deserialize_schema())
       
   378             table = SQL_PREFIX + 'CWEType'
       
   379             namecol = SQL_PREFIX + 'name'
       
   380             finalcol = SQL_PREFIX + 'final'
       
   381             with self.admin_access.repo_cnx() as cnx:
       
   382                 cu = cnx.system_sql('SELECT %s FROM %s WHERE %s is NULL'
       
   383                                     % (namecol, table, finalcol))
       
   384                 self.assertEqual(cu.fetchall(), [])
       
   385                 cu = cnx.system_sql('SELECT %s FROM %s '
       
   386                                     'WHERE %s=%%(final)s ORDER BY %s'
       
   387                                     % (namecol, table, finalcol, namecol),
       
   388                                     {'final': True})
       
   389                 self.assertEqual(cu.fetchall(),
       
   390                                  [(u'BabarTestType',),
       
   391                                   (u'BigInt',), (u'Boolean',), (u'Bytes',),
       
   392                                   (u'Date',), (u'Datetime',),
       
   393                                   (u'Decimal',),(u'Float',),
       
   394                                   (u'Int',),
       
   395                                   (u'Interval',), (u'Password',),
       
   396                                   (u'String',),
       
   397                                   (u'TZDatetime',), (u'TZTime',), (u'Time',)])
       
   398                 sql = ("SELECT etype.cw_eid, etype.cw_name, cstr.cw_eid, rel.eid_to "
       
   399                        "FROM cw_CWUniqueTogetherConstraint as cstr, "
       
   400                        "     relations_relation as rel, "
       
   401                        "     cw_CWEType as etype "
       
   402                        "WHERE cstr.cw_eid = rel.eid_from "
       
   403                        "  AND cstr.cw_constraint_of = etype.cw_eid "
       
   404                        "  AND etype.cw_name = 'Personne' "
       
   405                        ";")
       
   406                 cu = cnx.system_sql(sql)
       
   407                 rows = cu.fetchall()
       
   408                 self.assertEqual(len(rows), 3)
       
   409                 person = self.repo.schema.eschema('Personne')
       
   410                 self.assertEqual(len(person._unique_together), 1)
       
   411                 self.assertItemsEqual(person._unique_together[0],
       
   412                                       ('nom', 'prenom', 'inline2'))
       
   413 
       
   414         finally:
       
   415             self.repo.set_schema(origshema)
       
   416 
       
   417     def test_custom_attribute_param(self):
       
   418         origshema = self.repo.schema
       
   419         try:
       
   420             self.repo.config.repairing = True # avoid versions checking
       
   421             self.repo.set_schema(self.repo.deserialize_schema())
       
   422             pes = self.repo.schema['Personne']
       
   423             attr = pes.rdef('custom_field_of_jungle')
       
   424             self.assertIn('jungle_speed', vars(attr))
       
   425             self.assertEqual(42, attr.jungle_speed)
       
   426         finally:
       
   427             self.repo.set_schema(origshema)
       
   428 
       
   429 
       
   430 
       
   431 class DataHelpersTC(CubicWebTC):
       
   432 
       
   433     def test_type_from_eid(self):
       
   434         with self.admin_access.repo_cnx() as cnx:
       
   435             self.assertEqual(self.repo.type_from_eid(2, cnx), 'CWGroup')
       
   436 
       
   437     def test_type_from_eid_raise(self):
       
   438         with self.admin_access.repo_cnx() as cnx:
       
   439             self.assertRaises(UnknownEid, self.repo.type_from_eid, -2, cnx)
       
   440 
       
   441     def test_add_delete_info(self):
       
   442         with self.admin_access.repo_cnx() as cnx:
       
   443             entity = self.repo.vreg['etypes'].etype_class('Personne')(cnx)
       
   444             entity.eid = -1
       
   445             entity.complete = lambda x: None
       
   446             self.repo.add_info(cnx, entity, self.repo.system_source)
       
   447             cu = cnx.system_sql('SELECT * FROM entities WHERE eid = -1')
       
   448             data = cu.fetchall()
       
   449             self.assertEqual(tuplify(data), [(-1, 'Personne', 'system', None)])
       
   450             self.repo._delete_cascade_multi(cnx, [entity])
       
   451             self.repo.system_source.delete_info_multi(cnx, [entity])
       
   452             cu = cnx.system_sql('SELECT * FROM entities WHERE eid = -1')
       
   453             data = cu.fetchall()
       
   454             self.assertEqual(data, [])
       
   455 
       
   456 
       
   457 class FTITC(CubicWebTC):
       
   458 
       
   459     def test_fulltext_container_entity(self):
       
   460         with self.admin_access.repo_cnx() as cnx:
       
   461             assert self.schema.rschema('use_email').fulltext_container == 'subject'
       
   462             toto = cnx.create_entity('EmailAddress', address=u'toto@logilab.fr')
       
   463             cnx.commit()
       
   464             rset = cnx.execute('Any X WHERE X has_text %(t)s', {'t': 'toto'})
       
   465             self.assertEqual(rset.rows, [])
       
   466             cnx.user.cw_set(use_email=toto)
       
   467             cnx.commit()
       
   468             rset = cnx.execute('Any X WHERE X has_text %(t)s', {'t': 'toto'})
       
   469             self.assertEqual(rset.rows, [[cnx.user.eid]])
       
   470             cnx.execute('DELETE X use_email Y WHERE X login "admin", Y eid %(y)s',
       
   471                         {'y': toto.eid})
       
   472             cnx.commit()
       
   473             rset = cnx.execute('Any X WHERE X has_text %(t)s', {'t': 'toto'})
       
   474             self.assertEqual(rset.rows, [])
       
   475             tutu = cnx.create_entity('EmailAddress', address=u'tutu@logilab.fr')
       
   476             cnx.user.cw_set(use_email=tutu)
       
   477             cnx.commit()
       
   478             rset = cnx.execute('Any X WHERE X has_text %(t)s', {'t': 'tutu'})
       
   479             self.assertEqual(rset.rows, [[cnx.user.eid]])
       
   480             tutu.cw_set(address=u'hip@logilab.fr')
       
   481             cnx.commit()
       
   482             rset = cnx.execute('Any X WHERE X has_text %(t)s', {'t': 'tutu'})
       
   483             self.assertEqual(rset.rows, [])
       
   484             rset = cnx.execute('Any X WHERE X has_text %(t)s', {'t': 'hip'})
       
   485             self.assertEqual(rset.rows, [[cnx.user.eid]])
       
   486 
       
   487     def test_no_uncessary_ftiindex_op(self):
       
   488         with self.admin_access.repo_cnx() as cnx:
       
   489             cnx.create_entity('Workflow',
       
   490                               name=u'dummy workflow',
       
   491                               description=u'huuuuu')
       
   492             self.assertFalse(any(x for x in cnx.pending_operations
       
   493                                  if isinstance(x, native.FTIndexEntityOp)))
       
   494 
       
   495 
       
   496 class DBInitTC(CubicWebTC):
       
   497 
       
   498     def test_versions_inserted(self):
       
   499         with self.admin_access.repo_cnx() as cnx:
       
   500             inserted = [r[0]
       
   501                         for r in cnx.execute('Any K ORDERBY K '
       
   502                                              'WHERE P pkey K, P pkey ~= "system.version.%"')]
       
   503             self.assertEqual(inserted,
       
   504                              [u'system.version.basket',
       
   505                               u'system.version.card',
       
   506                               u'system.version.comment',
       
   507                               u'system.version.cubicweb',
       
   508                               u'system.version.file',
       
   509                               u'system.version.localperms',
       
   510                               u'system.version.tag'])
       
   511 
       
# Module-level log of hook calls: InlineRelHooksTC.setUp empties it and the
# hook defined in test_inline_relation appends one tuple per invocation.
CALLED = []
       
   513 
       
   514 class InlineRelHooksTC(CubicWebTC):
       
   515     """test relation hooks are called for inlined relations
       
   516     """
       
   517     def setUp(self):
       
   518         CubicWebTC.setUp(self)
       
   519         CALLED[:] = ()
       
   520 
       
   521     def test_inline_relation(self):
       
   522         """make sure <event>_relation hooks are called for inlined relation"""
       
   523 
       
   524         class EcritParHook(hook.Hook):
       
   525             __regid__ = 'inlinedrelhook'
       
   526             __select__ = hook.Hook.__select__ & hook.match_rtype('ecrit_par')
       
   527             events = ('before_add_relation', 'after_add_relation',
       
   528                       'before_delete_relation', 'after_delete_relation')
       
   529             def __call__(self):
       
   530                 CALLED.append((self.event, self.eidfrom, self.rtype, self.eidto))
       
   531 
       
   532         with self.temporary_appobjects(EcritParHook):
       
   533             with self.admin_access.repo_cnx() as cnx:
       
   534                 eidp = cnx.execute('INSERT Personne X: X nom "toto"')[0][0]
       
   535                 eidn = cnx.execute('INSERT Note X: X type "T"')[0][0]
       
   536                 cnx.execute('SET N ecrit_par Y WHERE N type "T", Y nom "toto"')
       
   537                 self.assertEqual(CALLED, [('before_add_relation', eidn, 'ecrit_par', eidp),
       
   538                                           ('after_add_relation', eidn, 'ecrit_par', eidp)])
       
   539                 CALLED[:] = ()
       
   540                 cnx.execute('DELETE N ecrit_par Y WHERE N type "T", Y nom "toto"')
       
   541                 self.assertEqual(CALLED, [('before_delete_relation', eidn, 'ecrit_par', eidp),
       
   542                                           ('after_delete_relation', eidn, 'ecrit_par', eidp)])
       
   543                 CALLED[:] = ()
       
   544                 eidn = cnx.execute('INSERT Note N: N ecrit_par P WHERE P nom "toto"')[0][0]
       
   545                 self.assertEqual(CALLED, [('before_add_relation', eidn, 'ecrit_par', eidp),
       
   546                                           ('after_add_relation', eidn, 'ecrit_par', eidp)])
       
   547 
       
   548     def test_unique_contraint(self):
       
   549         with self.admin_access.repo_cnx() as cnx:
       
   550             toto = cnx.create_entity('Personne', nom=u'toto')
       
   551             a01 = cnx.create_entity('Affaire', ref=u'A01', todo_by=toto)
       
   552             cnx.commit()
       
   553             cnx.create_entity('Note', type=u'todo', inline1=a01)
       
   554             cnx.commit()
       
   555             cnx.create_entity('Note', type=u'todo', inline1=a01)
       
   556             with self.assertRaises(ValidationError) as cm:
       
   557                 cnx.commit()
       
   558             self.assertEqual(cm.exception.errors,
       
   559                              {'inline1-subject': u'RQLUniqueConstraint S type T, S inline1 A1, '
       
   560                               'A1 todo_by C, Y type T, Y inline1 A2, A2 todo_by C failed'})
       
   561 
       
   562     def test_add_relations_at_creation_with_del_existing_rel(self):
       
   563         with self.admin_access.repo_cnx() as cnx:
       
   564             person = cnx.create_entity('Personne',
       
   565                                        nom=u'Toto',
       
   566                                        prenom=u'Lanturlu',
       
   567                                        sexe=u'M')
       
   568             users_rql = 'Any U WHERE U is CWGroup, U name "users"'
       
   569             users = cnx.execute(users_rql).get_entity(0, 0)
       
   570             cnx.create_entity('CWUser',
       
   571                               login=u'Toto',
       
   572                               upassword=u'firstname',
       
   573                               firstname=u'firstname',
       
   574                               surname=u'surname',
       
   575                               reverse_login_user=person,
       
   576                               in_group=users)
       
   577             cnx.commit()
       
   578 
       
   579 
       
   580 class PerformanceTest(CubicWebTC):
       
   581     def setUp(self):
       
   582         super(PerformanceTest, self).setUp()
       
   583         logger = logging.getLogger('cubicweb.session')
       
   584         #logger.handlers = [logging.StreamHandler(sys.stdout)]
       
   585         logger.setLevel(logging.INFO)
       
   586         self.info = logger.info
       
   587 
       
   588     def tearDown(self):
       
   589         super(PerformanceTest, self).tearDown()
       
   590         logger = logging.getLogger('cubicweb.session')
       
   591         logger.setLevel(logging.CRITICAL)
       
   592 
       
   593     def test_composite_deletion(self):
       
   594         with self.admin_access.repo_cnx() as cnx:
       
   595             personnes = []
       
   596             t0 = time.time()
       
   597             for i in range(2000):
       
   598                 p = cnx.create_entity('Personne', nom=u'Doe%03d'%i, prenom=u'John', sexe=u'M')
       
   599                 personnes.append(p)
       
   600             abraham = cnx.create_entity('Personne', nom=u'Abraham', prenom=u'John', sexe=u'M')
       
   601             for j in range(0, 2000, 100):
       
   602                 abraham.cw_set(personne_composite=personnes[j:j+100])
       
   603             t1 = time.time()
       
   604             self.info('creation: %.2gs', (t1 - t0))
       
   605             cnx.commit()
       
   606             t2 = time.time()
       
   607             self.info('commit creation: %.2gs', (t2 - t1))
       
   608             cnx.execute('DELETE Personne P WHERE P eid %(eid)s', {'eid': abraham.eid})
       
   609             t3 = time.time()
       
   610             self.info('deletion: %.2gs', (t3 - t2))
       
   611             cnx.commit()
       
   612             t4 = time.time()
       
   613             self.info("commit deletion: %2gs", (t4 - t3))
       
   614 
       
   615     def test_add_relation_non_inlined(self):
       
   616         with self.admin_access.repo_cnx() as cnx:
       
   617             personnes = []
       
   618             for i in range(2000):
       
   619                 p = cnx.create_entity('Personne', nom=u'Doe%03d'%i, prenom=u'John', sexe=u'M')
       
   620                 personnes.append(p)
       
   621             cnx.commit()
       
   622             t0 = time.time()
       
   623             abraham = cnx.create_entity('Personne', nom=u'Abraham', prenom=u'John', sexe=u'M',
       
   624                                         personne_composite=personnes[:100])
       
   625             t1 = time.time()
       
   626             self.info('creation: %.2gs', (t1 - t0))
       
   627             for j in range(100, 2000, 100):
       
   628                 abraham.cw_set(personne_composite=personnes[j:j+100])
       
   629             t2 = time.time()
       
   630             self.info('more relations: %.2gs', (t2-t1))
       
   631             cnx.commit()
       
   632             t3 = time.time()
       
   633             self.info('commit creation: %.2gs', (t3 - t2))
       
   634 
       
   635     def test_add_relation_inlined(self):
       
   636         with self.admin_access.repo_cnx() as cnx:
       
   637             personnes = []
       
   638             for i in range(2000):
       
   639                 p = cnx.create_entity('Personne', nom=u'Doe%03d'%i, prenom=u'John', sexe=u'M')
       
   640                 personnes.append(p)
       
   641             cnx.commit()
       
   642             t0 = time.time()
       
   643             abraham = cnx.create_entity('Personne', nom=u'Abraham', prenom=u'John', sexe=u'M',
       
   644                                         personne_inlined=personnes[:100])
       
   645             t1 = time.time()
       
   646             self.info('creation: %.2gs', (t1 - t0))
       
   647             for j in range(100, 2000, 100):
       
   648                 abraham.cw_set(personne_inlined=personnes[j:j+100])
       
   649             t2 = time.time()
       
   650             self.info('more relations: %.2gs', (t2-t1))
       
   651             cnx.commit()
       
   652             t3 = time.time()
       
   653             self.info('commit creation: %.2gs', (t3 - t2))
       
   654 
       
   655 
       
   656     def test_session_add_relation(self):
       
   657         """ to be compared with test_session_add_relations"""
       
   658         with self.admin_access.repo_cnx() as cnx:
       
   659             personnes = []
       
   660             for i in range(2000):
       
   661                 p = cnx.create_entity('Personne', nom=u'Doe%03d'%i, prenom=u'John', sexe=u'M')
       
   662                 personnes.append(p)
       
   663             abraham = cnx.create_entity('Personne', nom=u'Abraham', prenom=u'John', sexe=u'M')
       
   664             cnx.commit()
       
   665             t0 = time.time()
       
   666             add_relation = cnx.add_relation
       
   667             for p in personnes:
       
   668                 add_relation(abraham.eid, 'personne_composite', p.eid)
       
   669             cnx.commit()
       
   670             t1 = time.time()
       
   671             self.info('add relation: %.2gs', t1-t0)
       
   672 
       
   673     def test_session_add_relations (self):
       
   674         """ to be compared with test_session_add_relation"""
       
   675         with self.admin_access.repo_cnx() as cnx:
       
   676             personnes = []
       
   677             for i in range(2000):
       
   678                 p = cnx.create_entity('Personne', nom=u'Doe%03d'%i, prenom=u'John', sexe=u'M')
       
   679                 personnes.append(p)
       
   680             abraham = cnx.create_entity('Personne', nom=u'Abraham', prenom=u'John', sexe=u'M')
       
   681             cnx.commit()
       
   682             t0 = time.time()
       
   683             add_relations = cnx.add_relations
       
   684             relations = [('personne_composite', [(abraham.eid, p.eid) for p in personnes])]
       
   685             add_relations(relations)
       
   686             cnx.commit()
       
   687             t1 = time.time()
       
   688             self.info('add relations: %.2gs', t1-t0)
       
   689 
       
   690     def test_session_add_relation_inlined(self):
       
   691         """ to be compared with test_session_add_relations"""
       
   692         with self.admin_access.repo_cnx() as cnx:
       
   693             personnes = []
       
   694             for i in range(2000):
       
   695                 p = cnx.create_entity('Personne', nom=u'Doe%03d'%i, prenom=u'John', sexe=u'M')
       
   696                 personnes.append(p)
       
   697             abraham = cnx.create_entity('Personne', nom=u'Abraham', prenom=u'John', sexe=u'M')
       
   698             cnx.commit()
       
   699             t0 = time.time()
       
   700             add_relation = cnx.add_relation
       
   701             for p in personnes:
       
   702                 add_relation(abraham.eid, 'personne_inlined', p.eid)
       
   703             cnx.commit()
       
   704             t1 = time.time()
       
   705             self.info('add relation (inlined): %.2gs', t1-t0)
       
   706 
       
   707     def test_session_add_relations_inlined (self):
       
   708         """ to be compared with test_session_add_relation"""
       
   709         with self.admin_access.repo_cnx() as cnx:
       
   710             personnes = []
       
   711             for i in range(2000):
       
   712                 p = cnx.create_entity('Personne', nom=u'Doe%03d'%i, prenom=u'John', sexe=u'M')
       
   713                 personnes.append(p)
       
   714             abraham = cnx.create_entity('Personne', nom=u'Abraham', prenom=u'John', sexe=u'M')
       
   715             cnx.commit()
       
   716             t0 = time.time()
       
   717             add_relations = cnx.add_relations
       
   718             relations = [('personne_inlined', [(abraham.eid, p.eid) for p in personnes])]
       
   719             add_relations(relations)
       
   720             cnx.commit()
       
   721             t1 = time.time()
       
   722             self.info('add relations (inlined): %.2gs', t1-t0)
       
   723 
       
   724     def test_optional_relation_reset_1(self):
       
   725         with self.admin_access.repo_cnx() as cnx:
       
   726             p1 = cnx.create_entity('Personne', nom=u'Vincent')
       
   727             p2 = cnx.create_entity('Personne', nom=u'Florent')
       
   728             w = cnx.create_entity('Affaire', ref=u'wc')
       
   729             w.cw_set(todo_by=[p1,p2])
       
   730             w.cw_clear_all_caches()
       
   731             cnx.commit()
       
   732             self.assertEqual(len(w.todo_by), 1)
       
   733             self.assertEqual(w.todo_by[0].eid, p2.eid)
       
   734 
       
   735     def test_optional_relation_reset_2(self):
       
   736         with self.admin_access.repo_cnx() as cnx:
       
   737             p1 = cnx.create_entity('Personne', nom=u'Vincent')
       
   738             p2 = cnx.create_entity('Personne', nom=u'Florent')
       
   739             w = cnx.create_entity('Affaire', ref=u'wc')
       
   740             w.cw_set(todo_by=p1)
       
   741             cnx.commit()
       
   742             w.cw_set(todo_by=p2)
       
   743             w.cw_clear_all_caches()
       
   744             cnx.commit()
       
   745             self.assertEqual(len(w.todo_by), 1)
       
   746             self.assertEqual(w.todo_by[0].eid, p2.eid)
       
   747 
       
   748 
       
   749 if __name__ == '__main__':
       
   750     from logilab.common.testlib import unittest_main
       
   751     unittest_main()