server/test/unittest_repository.py
author Aurelien Campeas <aurelien.campeas@logilab.fr>
Wed, 03 Jul 2013 14:33:27 +0200
branchstable
changeset 9130 0f1504a9fb51
parent 9128 d988eec2d5d3
child 9226 653f1d4a1101
permissions -rw-r--r--
[constraint] more robust unicity constraint failures reporting for end-users Postgres or Sqlserver have limits on the index names (around resp. 64 and 128 characters). Because `logilab.database` encodes the `unique together` constraint rtypes in the index names, we sometimes get truncated index names, from which it is impossible to retrieve all rtypes. In the long run, the way such index are named should be changed. In the short term, we try to reduce the end-user confusion resulting from this design flaw: * in source/native, the regex filtering ``IntegrityError`` message does not impose an `_idx` suffix, which indeed may be absent (the result being an UI message that resembles a catastrophic failure), * also we avoid including a trailing " (double quote) from the error message * in entities/adapters, the well-named ``IUserFriendly`` adapter is made a bit smarter about how to handle missing rtypes. * the adapter also always produces a global message explaining the issue (and the fact that sometimes, the user is not shown all the relevant info) * i18n is updated Closes #2793789

# -*- coding: iso-8859-1 -*-
# copyright 2003-2013 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
#
# CubicWeb is free software: you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
"""unit tests for module cubicweb.server.repository"""

import os
import sys
import threading
import time
import logging
from copy import deepcopy
from datetime import datetime

from logilab.common.testlib import TestCase, unittest_main

from yams.constraints import UniqueConstraint
from yams import register_base_type, unregister_base_type

from logilab.database import get_db_helper

from cubicweb import (BadConnectionId, RepositoryError, ValidationError,
                      UnknownEid, AuthenticationError, Unauthorized, QueryError)
from cubicweb.predicates import is_instance
from cubicweb.schema import CubicWebSchema, RQLConstraint
from cubicweb.dbapi import connect, multiple_connections_unfix
from cubicweb.devtools.testlib import CubicWebTC
from cubicweb.devtools.repotest import tuplify
from cubicweb.server import repository, hook
from cubicweb.server.sqlutils import SQL_PREFIX
from cubicweb.server.hook import Hook
from cubicweb.server.sources import native
from cubicweb.server.session import SessionClosedError


class RepositoryTC(CubicWebTC):
    """ singleton providing access to a persistent storage for entities
    and relation
    """

    def test_unique_together_constraint(self):
        self.execute('INSERT Societe S: S nom "Logilab", S type "SSLL", S cp "75013"')
        with self.assertRaises(ValidationError) as wraperr:
            self.execute('INSERT Societe S: S nom "Logilab", S type "SSLL", S cp "75013"')
        self.assertEqual(
            {'cp': u'cp is part of violated unicity constraint',
             'nom': u'nom is part of violated unicity constraint',
             'type': u'type is part of violated unicity constraint',
             'unicity constraint': u'some relations violate a unicity constraint'},
            wraperr.exception.args[1])

    def test_unique_together_schema(self):
        person = self.repo.schema.eschema('Personne')
        self.assertEqual(len(person._unique_together), 1)
        self.assertItemsEqual(person._unique_together[0],
                                           ('nom', 'prenom', 'inline2'))

    def test_all_entities_have_owner(self):
        self.assertFalse(self.execute('Any X WHERE NOT X owned_by U'))

    def test_all_entities_have_is(self):
        self.assertFalse(self.execute('Any X WHERE NOT X is ET'))

    def test_all_entities_have_cw_source(self):
        self.assertFalse(self.execute('Any X WHERE NOT X cw_source S'))

    def test_connect(self):
        cnxid = self.repo.connect(self.admlogin, password=self.admpassword)
        self.assert_(cnxid)
        self.repo.close(cnxid)
        self.assertRaises(AuthenticationError,
                          self.repo.connect, self.admlogin, password='nimportnawak')
        self.assertRaises(AuthenticationError,
                          self.repo.connect, self.admlogin, password='')
        self.assertRaises(AuthenticationError,
                          self.repo.connect, self.admlogin, password=None)
        self.assertRaises(AuthenticationError,
                          self.repo.connect, None, password=None)
        self.assertRaises(AuthenticationError,
                          self.repo.connect, self.admlogin)
        self.assertRaises(AuthenticationError,
                          self.repo.connect, None)

    def test_execute(self):
        repo = self.repo
        cnxid = repo.connect(self.admlogin, password=self.admpassword)
        repo.execute(cnxid, 'Any X')
        repo.execute(cnxid, 'Any X where X is Personne')
        repo.execute(cnxid, 'Any X where X is Personne, X nom ~= "to"')
        repo.execute(cnxid, 'Any X WHERE X has_text %(text)s', {'text': u'\xe7a'})
        repo.close(cnxid)

    def test_login_upassword_accent(self):
        repo = self.repo
        cnxid = repo.connect(self.admlogin, password=self.admpassword)
        repo.execute(cnxid, 'INSERT CWUser X: X login %(login)s, X upassword %(passwd)s, X in_group G WHERE G name "users"',
                     {'login': u"barnab�", 'passwd': u"h�h�h�".encode('UTF8')})
        repo.commit(cnxid)
        repo.close(cnxid)
        cnxid = repo.connect(u"barnab�", password=u"h�h�h�".encode('UTF8'))
        self.assert_(cnxid)
        repo.close(cnxid)

    def test_rollback_on_commit_error(self):
        cnxid = self.repo.connect(self.admlogin, password=self.admpassword)
        self.repo.execute(cnxid,
                          'INSERT CWUser X: X login %(login)s, X upassword %(passwd)s',
                          {'login': u"tutetute", 'passwd': 'tutetute'})
        self.assertRaises(ValidationError, self.repo.commit, cnxid)
        self.assertFalse(self.repo.execute(cnxid, 'CWUser X WHERE X login "tutetute"'))
        self.repo.close(cnxid)

    def test_rollback_on_execute_validation_error(self):
        class ValidationErrorAfterHook(Hook):
            __regid__ = 'valerror-after-hook'
            __select__ = Hook.__select__ & is_instance('CWGroup')
            events = ('after_update_entity',)
            def __call__(self):
                raise ValidationError(self.entity.eid, {})
        with self.temporary_appobjects(ValidationErrorAfterHook):
            self.assertRaises(ValidationError,
                              self.execute, 'SET X name "toto" WHERE X is CWGroup, X name "guests"')
            self.assertTrue(self.execute('Any X WHERE X is CWGroup, X name "toto"'))
            with self.assertRaises(QueryError) as cm:
                self.commit()
            self.assertEqual(str(cm.exception), 'transaction must be rollbacked')
            self.rollback()
            self.assertFalse(self.execute('Any X WHERE X is CWGroup, X name "toto"'))

    def test_rollback_on_execute_unauthorized(self):
        class UnauthorizedAfterHook(Hook):
            __regid__ = 'unauthorized-after-hook'
            __select__ = Hook.__select__ & is_instance('CWGroup')
            events = ('after_update_entity',)
            def __call__(self):
                raise Unauthorized()
        with self.temporary_appobjects(UnauthorizedAfterHook):
            self.assertRaises(Unauthorized,
                              self.execute, 'SET X name "toto" WHERE X is CWGroup, X name "guests"')
            self.assertTrue(self.execute('Any X WHERE X is CWGroup, X name "toto"'))
            with self.assertRaises(QueryError) as cm:
                self.commit()
            self.assertEqual(str(cm.exception), 'transaction must be rollbacked')
            self.rollback()
            self.assertFalse(self.execute('Any X WHERE X is CWGroup, X name "toto"'))


    def test_close(self):
        repo = self.repo
        cnxid = repo.connect(self.admlogin, password=self.admpassword)
        self.assert_(cnxid)
        repo.close(cnxid)
        self.assertRaises(BadConnectionId, repo.execute, cnxid, 'Any X')

    def test_invalid_cnxid(self):
        self.assertRaises(BadConnectionId, self.repo.execute, 0, 'Any X')
        self.assertRaises(BadConnectionId, self.repo.close, None)

    def test_shared_data(self):
        repo = self.repo
        cnxid = repo.connect(self.admlogin, password=self.admpassword)
        repo.set_shared_data(cnxid, 'data', 4)
        cnxid2 = repo.connect(self.admlogin, password=self.admpassword)
        self.assertEqual(repo.get_shared_data(cnxid, 'data'), 4)
        self.assertEqual(repo.get_shared_data(cnxid2, 'data'), None)
        repo.set_shared_data(cnxid2, 'data', 5)
        self.assertEqual(repo.get_shared_data(cnxid, 'data'), 4)
        self.assertEqual(repo.get_shared_data(cnxid2, 'data'), 5)
        repo.get_shared_data(cnxid2, 'data', pop=True)
        self.assertEqual(repo.get_shared_data(cnxid, 'data'), 4)
        self.assertEqual(repo.get_shared_data(cnxid2, 'data'), None)
        repo.close(cnxid)
        repo.close(cnxid2)
        self.assertRaises(BadConnectionId, repo.get_shared_data, cnxid, 'data')
        self.assertRaises(BadConnectionId, repo.get_shared_data, cnxid2, 'data')
        self.assertRaises(BadConnectionId, repo.set_shared_data, cnxid, 'data', 1)
        self.assertRaises(BadConnectionId, repo.set_shared_data, cnxid2, 'data', 1)

    def test_check_session(self):
        repo = self.repo
        cnxid = repo.connect(self.admlogin, password=self.admpassword)
        self.assertIsInstance(repo.check_session(cnxid), float)
        repo.close(cnxid)
        self.assertRaises(BadConnectionId, repo.check_session, cnxid)

    def test_transaction_base(self):
        repo = self.repo
        cnxid = repo.connect(self.admlogin, password=self.admpassword)
        # check db state
        result = repo.execute(cnxid, 'Personne X')
        self.assertEqual(result.rowcount, 0)
        # rollback entity insertion
        repo.execute(cnxid, "INSERT Personne X: X nom 'bidule'")
        result = repo.execute(cnxid, 'Personne X')
        self.assertEqual(result.rowcount, 1)
        repo.rollback(cnxid)
        result = repo.execute(cnxid, 'Personne X')
        self.assertEqual(result.rowcount, 0, result.rows)
        # commit
        repo.execute(cnxid, "INSERT Personne X: X nom 'bidule'")
        repo.commit(cnxid)
        result = repo.execute(cnxid, 'Personne X')
        self.assertEqual(result.rowcount, 1)
        repo.close(cnxid)

    def test_transaction_base2(self):
        repo = self.repo
        cnxid = repo.connect(self.admlogin, password=self.admpassword)
        # rollback relation insertion
        repo.execute(cnxid, "SET U in_group G WHERE U login 'admin', G name 'guests'")
        result = repo.execute(cnxid, "Any U WHERE U in_group G, U login 'admin', G name 'guests'")
        self.assertEqual(result.rowcount, 1)
        repo.rollback(cnxid)
        result = repo.execute(cnxid, "Any U WHERE U in_group G, U login 'admin', G name 'guests'")
        self.assertEqual(result.rowcount, 0, result.rows)
        repo.close(cnxid)

    def test_transaction_base3(self):
        repo = self.repo
        cnxid = repo.connect(self.admlogin, password=self.admpassword)
        # rollback state change which trigger TrInfo insertion
        session = repo._get_session(cnxid)
        session.set_cnxset()
        user = session.user
        user.cw_adapt_to('IWorkflowable').fire_transition('deactivate')
        rset = repo.execute(cnxid, 'TrInfo T WHERE T wf_info_for X, X eid %(x)s', {'x': user.eid})
        self.assertEqual(len(rset), 1)
        repo.rollback(cnxid)
        rset = repo.execute(cnxid, 'TrInfo T WHERE T wf_info_for X, X eid %(x)s', {'x': user.eid})
        self.assertEqual(len(rset), 0)
        repo.close(cnxid)

    def test_transaction_interleaved(self):
        self.skipTest('implement me')

    def test_close_kill_processing_request(self):
        repo = self.repo
        cnxid = repo.connect(self.admlogin, password=self.admpassword)
        repo.execute(cnxid, 'INSERT CWUser X: X login "toto", X upassword "tutu", X in_group G WHERE G name "users"')
        repo.commit(cnxid)
        lock = threading.Lock()
        lock.acquire()
        # close has to be in the thread due to sqlite limitations
        def close_in_a_few_moment():
            lock.acquire()
            repo.close(cnxid)
        t = threading.Thread(target=close_in_a_few_moment)
        t.start()
        def run_transaction():
            lock.release()
            repo.execute(cnxid, 'DELETE CWUser X WHERE X login "toto"')
            repo.commit(cnxid)
        try:
            with self.assertRaises(SessionClosedError) as cm:
                run_transaction()
            self.assertEqual(str(cm.exception), 'try to access connections set on a closed session %s' % cnxid)
        finally:
            t.join()

    def test_initial_schema(self):
        schema = self.repo.schema
        # check order of attributes is respected
        notin = set(('eid', 'is', 'is_instance_of', 'identity',
                     'creation_date', 'modification_date', 'cwuri',
                     'owned_by', 'created_by', 'cw_source',
                     'update_permission', 'read_permission',
                     'in_basket'))
        self.assertListEqual(['relation_type',
                              'from_entity', 'to_entity',
                              'constrained_by',
                              'cardinality', 'ordernum',
                              'indexed', 'fulltextindexed', 'internationalizable',
                              'defaultval', 'extra_props',
                              'description', 'description_format'],
                             [r.type
                              for r in schema.eschema('CWAttribute').ordered_relations()
                              if r.type not in notin])

        self.assertEqual(schema.eschema('CWEType').main_attribute(), 'name')
        self.assertEqual(schema.eschema('State').main_attribute(), 'name')

        constraints = schema.rschema('name').rdef('CWEType', 'String').constraints
        self.assertEqual(len(constraints), 2)
        for cstr in constraints[:]:
            if isinstance(cstr, UniqueConstraint):
                constraints.remove(cstr)
                break
        else:
            self.fail('unique constraint not found')
        sizeconstraint = constraints[0]
        self.assertEqual(sizeconstraint.min, None)
        self.assertEqual(sizeconstraint.max, 64)

        constraints = schema.rschema('relation_type').rdef('CWAttribute', 'CWRType').constraints
        self.assertEqual(len(constraints), 1)
        cstr = constraints[0]
        self.assert_(isinstance(cstr, RQLConstraint))
        self.assertEqual(cstr.expression, 'O final TRUE')

        ownedby = schema.rschema('owned_by')
        self.assertEqual(ownedby.objects('CWEType'), ('CWUser',))

    def test_pyro(self):
        import Pyro
        Pyro.config.PYRO_MULTITHREADED = 0
        done = []
        self.repo.config.global_set_option('pyro-ns-host', 'NO_PYRONS')
        daemon = self.repo.pyro_register()
        try:
            uri = self.repo.pyro_uri.replace('PYRO', 'pyroloc')
            # the client part has to be in the thread due to sqlite limitations
            t = threading.Thread(target=self._pyro_client, args=(uri, done))
            t.start()
            while not done:
                daemon.handleRequests(1.0)
            t.join(1)
            if t.isAlive():
                self.fail('something went wrong, thread still alive')
        finally:
            repository.pyro_unregister(self.repo.config)
            from logilab.common import pyro_ext
            pyro_ext._DAEMONS.clear()


    def _pyro_client(self, uri, done):
        cnx = connect(uri,
                      u'admin', password='gingkow',
                      initlog=False) # don't reset logging configuration
        try:
            cnx.load_appobjects(subpath=('entities',))
            # check we can get the schema
            schema = cnx.get_schema()
            self.assertTrue(cnx.vreg)
            self.assertTrue('etypes'in cnx.vreg)
            cu = cnx.cursor()
            rset = cu.execute('Any U,G WHERE U in_group G')
            user = iter(rset.entities()).next()
            self.assertTrue(user._cw)
            self.assertTrue(user._cw.vreg)
            from cubicweb.entities import authobjs
            self.assertIsInstance(user._cw.user, authobjs.CWUser)
            # make sure the tcp connection is closed properly; yes, it's disgusting.
            adapter = cnx._repo.adapter
            cnx.close()
            adapter.release()
            done.append(True)
        finally:
            # connect monkey patch some method by default, remove them
            multiple_connections_unfix()


    def test_zmq(self):
        try:
            import zmq
        except ImportError:
            self.skipTest("zmq in not available")
        done = []
        from cubicweb.devtools import TestServerConfiguration as ServerConfiguration
        from cubicweb.server.cwzmq import ZMQRepositoryServer
        # the client part has to be in a thread due to sqlite limitations
        t = threading.Thread(target=self._zmq_client, args=(done,))
        t.start()

        zmq_server = ZMQRepositoryServer(self.repo)
        zmq_server.connect('zmqpickle-tcp://127.0.0.1:41415')

        t2 = threading.Thread(target=self._zmq_quit, args=(done, zmq_server,))
        t2.start()

        zmq_server.run()

        t2.join(1)
        t.join(1)

        if t.isAlive():
            self.fail('something went wrong, thread still alive')

    def _zmq_quit(self, done, srv):
        while not done:
            time.sleep(0.1)
        srv.quit()

    def _zmq_client(self, done):
        try:
            cnx = connect('zmqpickle-tcp://127.0.0.1:41415', u'admin', password=u'gingkow',
                          initlog=False) # don't reset logging configuration
            try:
                cnx.load_appobjects(subpath=('entities',))
                # check we can get the schema
                schema = cnx.get_schema()
                self.assertTrue(cnx.vreg)
                self.assertTrue('etypes'in cnx.vreg)
                cu = cnx.cursor()
                rset = cu.execute('Any U,G WHERE U in_group G')
                user = iter(rset.entities()).next()
                self.assertTrue(user._cw)
                self.assertTrue(user._cw.vreg)
                from cubicweb.entities import authobjs
                self.assertIsInstance(user._cw.user, authobjs.CWUser)
                cnx.close()
                done.append(True)
            finally:
                # connect monkey patch some method by default, remove them
                multiple_connections_unfix()
        finally:
            done.append(False)

    def test_internal_api(self):
        repo = self.repo
        cnxid = repo.connect(self.admlogin, password=self.admpassword)
        session = repo._get_session(cnxid, setcnxset=True)
        self.assertEqual(repo.type_and_source_from_eid(2, session),
                         ('CWGroup', 'system', None, 'system'))
        self.assertEqual(repo.type_from_eid(2, session), 'CWGroup')
        self.assertEqual(repo.source_from_eid(2, session).uri, 'system')
        self.assertEqual(repo.eid2extid(repo.system_source, 2, session), None)
        class dummysource: uri = 'toto'
        self.assertRaises(UnknownEid, repo.eid2extid, dummysource, 2, session)
        repo.close(cnxid)

    def test_public_api(self):
        self.assertEqual(self.repo.get_schema(), self.repo.schema)
        self.assertEqual(self.repo.source_defs(), {'system': {'type': 'native',
                                                              'uri': 'system',
                                                              'use-cwuri-as-url': False}
                                                  })
        # .properties() return a result set
        self.assertEqual(self.repo.properties().rql, 'Any K,V WHERE P is CWProperty,P pkey K, P value V, NOT P for_user U')

    def test_session_api(self):
        repo = self.repo
        cnxid = repo.connect(self.admlogin, password=self.admpassword)
        self.assertEqual(repo.user_info(cnxid), (6, 'admin', set([u'managers']), {}))
        self.assertEqual(repo.describe(cnxid, 2), (u'CWGroup', u'system', None, 'system'))
        repo.close(cnxid)
        self.assertRaises(BadConnectionId, repo.user_info, cnxid)
        self.assertRaises(BadConnectionId, repo.describe, cnxid, 1)

    def test_shared_data_api(self):
        repo = self.repo
        cnxid = repo.connect(self.admlogin, password=self.admpassword)
        self.assertEqual(repo.get_shared_data(cnxid, 'data'), None)
        repo.set_shared_data(cnxid, 'data', 4)
        self.assertEqual(repo.get_shared_data(cnxid, 'data'), 4)
        repo.get_shared_data(cnxid, 'data', pop=True)
        repo.get_shared_data(cnxid, 'whatever', pop=True)
        self.assertEqual(repo.get_shared_data(cnxid, 'data'), None)
        repo.close(cnxid)
        self.assertRaises(BadConnectionId, repo.set_shared_data, cnxid, 'data', 0)
        self.assertRaises(BadConnectionId, repo.get_shared_data, cnxid, 'data')

    def test_schema_is_relation(self):
        no_is_rset = self.execute('Any X WHERE NOT X is ET')
        self.assertFalse(no_is_rset, no_is_rset.description)

#     def test_perfo(self):
#         self.set_debug(True)
#         from time import time, clock
#         t, c = time(), clock()
#         try:
#             self.create_user('toto')
#         finally:
#             self.set_debug(False)
#         print 'test time: %.3f (time) %.3f (cpu)' % ((time() - t), clock() - c)

    def test_delete_if_singlecard1(self):
        note = self.request().create_entity('Affaire')
        p1 = self.request().create_entity('Personne', nom=u'toto')
        self.execute('SET A todo_by P WHERE A eid %(x)s, P eid %(p)s',
                     {'x': note.eid, 'p': p1.eid})
        rset = self.execute('Any P WHERE A todo_by P, A eid %(x)s',
                            {'x': note.eid})
        self.assertEqual(len(rset), 1)
        p2 = self.request().create_entity('Personne', nom=u'tutu')
        self.execute('SET A todo_by P WHERE A eid %(x)s, P eid %(p)s',
                     {'x': note.eid, 'p': p2.eid})
        rset = self.execute('Any P WHERE A todo_by P, A eid %(x)s',
                            {'x': note.eid})
        self.assertEqual(len(rset), 1)
        self.assertEqual(rset.rows[0][0], p2.eid)

    def test_delete_if_object_inlined_singlecard(self):
        req = self.request()
        c = req.create_entity('Card', title=u'Carte')
        req.create_entity('Personne', nom=u'Vincent', fiche=c)
        req.create_entity('Personne', nom=u'Florent', fiche=c)
        self.commit()
        self.assertEqual(len(c.reverse_fiche), 1)

    def test_cw_set_in_before_update(self):
        # local hook
        class DummyBeforeHook(Hook):
            __regid__ = 'dummy-before-hook'
            __select__ = Hook.__select__ & is_instance('EmailAddress')
            events = ('before_update_entity',)
            def __call__(self):
                # safety belt: avoid potential infinite recursion if the test
                #              fails (i.e. RuntimeError not raised)
                pendings = self._cw.transaction_data.setdefault('pending', set())
                if self.entity.eid not in pendings:
                    pendings.add(self.entity.eid)
                    self.entity.cw_set(alias=u'foo')
        with self.temporary_appobjects(DummyBeforeHook):
            req = self.request()
            addr = req.create_entity('EmailAddress', address=u'a@b.fr')
            addr.cw_set(address=u'a@b.com')
            rset = self.execute('Any A,AA WHERE X eid %(x)s, X address A, X alias AA',
                                {'x': addr.eid})
            self.assertEqual(rset.rows, [[u'a@b.com', u'foo']])

    def test_cw_set_in_before_add(self):
        # local hook
        class DummyBeforeHook(Hook):
            __regid__ = 'dummy-before-hook'
            __select__ = Hook.__select__ & is_instance('EmailAddress')
            events = ('before_add_entity',)
            def __call__(self):
                # cw_set is forbidden within before_add_entity()
                self.entity.cw_set(alias=u'foo')
        with self.temporary_appobjects(DummyBeforeHook):
            req = self.request()
            # XXX will fail with python -O
            self.assertRaises(AssertionError, req.create_entity,
                              'EmailAddress', address=u'a@b.fr')

    def test_multiple_edit_cw_set(self):
        """make sure cw_edited doesn't get cluttered
        by previous entities on multiple set
        """
        # local hook
        class DummyBeforeHook(Hook):
            _test = self # keep reference to test instance
            __regid__ = 'dummy-before-hook'
            __select__ = Hook.__select__ & is_instance('Affaire')
            events = ('before_update_entity',)
            def __call__(self):
                # invoiced attribute shouldn't be considered "edited" before the hook
                self._test.assertFalse('invoiced' in self.entity.cw_edited,
                                  'cw_edited cluttered by previous update')
                self.entity.cw_edited['invoiced'] = 10
        with self.temporary_appobjects(DummyBeforeHook):
            req = self.request()
            req.create_entity('Affaire', ref=u'AFF01')
            req.create_entity('Affaire', ref=u'AFF02')
            req.execute('SET A duration 10 WHERE A is Affaire')

class SchemaDeserialTC(CubicWebTC):

    appid = 'data-schemaserial'

    @classmethod
    def setUpClass(cls):
        register_base_type('BabarTestType', ('jungle_speed',))
        helper = get_db_helper('sqlite')
        helper.TYPE_MAPPING['BabarTestType'] = 'TEXT'
        helper.TYPE_CONVERTERS['BabarTestType'] = lambda x: '"%s"' % x
        super(SchemaDeserialTC, cls).setUpClass()


    @classmethod
    def tearDownClass(cls):
        unregister_base_type('BabarTestType')
        helper = get_db_helper('sqlite')
        helper.TYPE_MAPPING.pop('BabarTestType', None)
        helper.TYPE_CONVERTERS.pop('BabarTestType', None)
        super(SchemaDeserialTC, cls).tearDownClass()

    def test_deserialization_base(self):
        """Check the following deserialization

        * all CWEtype has name
        * Final type
        * CWUniqueTogetherConstraint
        * _unique_together__ content"""
        origshema = self.repo.schema
        try:
            self.repo.config.repairing = True # avoid versions checking
            self.repo.set_schema(self.repo.deserialize_schema())
            table = SQL_PREFIX + 'CWEType'
            namecol = SQL_PREFIX + 'name'
            finalcol = SQL_PREFIX + 'final'
            self.session.set_cnxset()
            cu = self.session.system_sql('SELECT %s FROM %s WHERE %s is NULL' % (
                namecol, table, finalcol))
            self.assertEqual(cu.fetchall(), [])
            cu = self.session.system_sql('SELECT %s FROM %s WHERE %s=%%(final)s ORDER BY %s'
                                         % (namecol, table, finalcol, namecol), {'final': True})
            self.assertEqual(cu.fetchall(), [(u'BabarTestType',),
                                             (u'BigInt',), (u'Boolean',), (u'Bytes',),
                                             (u'Date',), (u'Datetime',),
                                             (u'Decimal',),(u'Float',),
                                             (u'Int',),
                                             (u'Interval',), (u'Password',),
                                             (u'String',),
                                             (u'TZDatetime',), (u'TZTime',), (u'Time',)])
            sql = ("SELECT etype.cw_eid, etype.cw_name, cstr.cw_eid, rel.eid_to "
                   "FROM cw_CWUniqueTogetherConstraint as cstr, "
                   "     relations_relation as rel, "
                   "     cw_CWEType as etype "
                   "WHERE cstr.cw_eid = rel.eid_from "
                   "  AND cstr.cw_constraint_of = etype.cw_eid "
                   "  AND etype.cw_name = 'Personne' "
                   ";")
            cu = self.session.system_sql(sql)
            rows = cu.fetchall()
            self.assertEqual(len(rows), 3)
            person = self.repo.schema.eschema('Personne')
            self.assertEqual(len(person._unique_together), 1)
            self.assertItemsEqual(person._unique_together[0],
                                  ('nom', 'prenom', 'inline2'))

        finally:
            self.repo.set_schema(origshema)

    def test_custom_attribute_param(self):
        origshema = self.repo.schema
        try:
            self.repo.config.repairing = True # avoid versions checking
            self.repo.set_schema(self.repo.deserialize_schema())
            pes = self.repo.schema['Personne']
            attr = pes.rdef('custom_field_of_jungle')
            self.assertIn('jungle_speed', vars(attr))
            self.assertEqual(42, attr.jungle_speed)
        finally:
            self.repo.set_schema(origshema)



class DataHelpersTC(CubicWebTC):

    def test_create_eid(self):
        self.session.set_cnxset()
        self.assert_(self.repo.system_source.create_eid(self.session))

    def test_source_from_eid(self):
        self.session.set_cnxset()
        self.assertEqual(self.repo.source_from_eid(1, self.session),
                          self.repo.sources_by_uri['system'])

    def test_source_from_eid_raise(self):
        self.session.set_cnxset()
        self.assertRaises(UnknownEid, self.repo.source_from_eid, -2, self.session)

    def test_type_from_eid(self):
        self.session.set_cnxset()
        self.assertEqual(self.repo.type_from_eid(2, self.session), 'CWGroup')

    def test_type_from_eid_raise(self):
        self.session.set_cnxset()
        self.assertRaises(UnknownEid, self.repo.type_from_eid, -2, self.session)

    def test_add_delete_info(self):
        entity = self.repo.vreg['etypes'].etype_class('Personne')(self.session)
        entity.eid = -1
        entity.complete = lambda x: None
        self.session.set_cnxset()
        self.repo.add_info(self.session, entity, self.repo.system_source)
        cu = self.session.system_sql('SELECT * FROM entities WHERE eid = -1')
        data = cu.fetchall()
        self.assertIsInstance(data[0][4], datetime)
        data[0] = list(data[0])
        data[0][4] = None
        self.assertEqual(tuplify(data), [(-1, 'Personne', 'system', 'system',
                                          None, None)])
        self.repo.delete_info(self.session, entity, 'system', None)
        #self.repo.commit()
        cu = self.session.system_sql('SELECT * FROM entities WHERE eid = -1')
        data = cu.fetchall()
        self.assertEqual(data, [])


class FTITC(CubicWebTC):

    def test_reindex_and_modified_since(self):
        self.repo.system_source.multisources_etypes.add('Personne')
        eidp = self.execute('INSERT Personne X: X nom "toto", X prenom "tutu"')[0][0]
        self.commit()
        ts = datetime.now()
        self.assertEqual(len(self.execute('Personne X WHERE X has_text "tutu"')), 1)
        self.session.set_cnxset()
        cu = self.session.system_sql('SELECT mtime, eid FROM entities WHERE eid = %s' % eidp)
        omtime = cu.fetchone()[0]
        # our sqlite datetime adapter is ignore seconds fraction, so we have to
        # ensure update is done the next seconds
        time.sleep(1 - (ts.second - int(ts.second)))
        self.execute('SET X nom "tata" WHERE X eid %(x)s', {'x': eidp})
        self.commit()
        self.assertEqual(len(self.execute('Personne X WHERE X has_text "tutu"')), 1)
        self.session.set_cnxset()
        cu = self.session.system_sql('SELECT mtime FROM entities WHERE eid = %s' % eidp)
        mtime = cu.fetchone()[0]
        self.assertTrue(omtime < mtime)
        self.commit()
        date, modified, deleted = self.repo.entities_modified_since(('Personne',), omtime)
        self.assertEqual(modified, [('Personne', eidp)])
        self.assertEqual(deleted, [])
        date, modified, deleted = self.repo.entities_modified_since(('Personne',), mtime)
        self.assertEqual(modified, [])
        self.assertEqual(deleted, [])
        self.execute('DELETE Personne X WHERE X eid %(x)s', {'x': eidp})
        self.commit()
        date, modified, deleted = self.repo.entities_modified_since(('Personne',), omtime)
        self.assertEqual(modified, [])
        self.assertEqual(deleted, [('Personne', eidp)])

    def test_fulltext_container_entity(self):
        assert self.schema.rschema('use_email').fulltext_container == 'subject'
        req = self.request()
        toto = req.create_entity('EmailAddress', address=u'toto@logilab.fr')
        self.commit()
        rset = req.execute('Any X WHERE X has_text %(t)s', {'t': 'toto'})
        self.assertEqual(rset.rows, [])
        req.user.cw_set(use_email=toto)
        self.commit()
        rset = req.execute('Any X WHERE X has_text %(t)s', {'t': 'toto'})
        self.assertEqual(rset.rows, [[req.user.eid]])
        req.execute('DELETE X use_email Y WHERE X login "admin", Y eid %(y)s',
                    {'y': toto.eid})
        self.commit()
        rset = req.execute('Any X WHERE X has_text %(t)s', {'t': 'toto'})
        self.assertEqual(rset.rows, [])
        tutu = req.create_entity('EmailAddress', address=u'tutu@logilab.fr')
        req.user.cw_set(use_email=tutu)
        self.commit()
        rset = req.execute('Any X WHERE X has_text %(t)s', {'t': 'tutu'})
        self.assertEqual(rset.rows, [[req.user.eid]])
        tutu.cw_set(address=u'hip@logilab.fr')
        self.commit()
        rset = req.execute('Any X WHERE X has_text %(t)s', {'t': 'tutu'})
        self.assertEqual(rset.rows, [])
        rset = req.execute('Any X WHERE X has_text %(t)s', {'t': 'hip'})
        self.assertEqual(rset.rows, [[req.user.eid]])

    def test_no_uncessary_ftiindex_op(self):
        req = self.request()
        req.create_entity('Workflow', name=u'dummy workflow', description=u'huuuuu')
        self.assertFalse(any(x for x in self.session.pending_operations
                        if isinstance(x, native.FTIndexEntityOp)))


class DBInitTC(CubicWebTC):

    def test_versions_inserted(self):
        inserted = [r[0] for r in self.execute('Any K ORDERBY K WHERE P pkey K, P pkey ~= "system.version.%"')]
        self.assertEqual(inserted,
                          [u'system.version.basket', u'system.version.card', u'system.version.comment',
                           u'system.version.cubicweb', u'system.version.email',
                           u'system.version.file', u'system.version.folder',
                           u'system.version.localperms', u'system.version.tag'])

CALLED = []

class InlineRelHooksTC(CubicWebTC):
    """test relation hooks are called for inlined relations
    """
    def setUp(self):
        CubicWebTC.setUp(self)
        CALLED[:] = ()

    def _after_relation_hook(self, cnxset, fromeid, rtype, toeid):
        self.called.append((fromeid, rtype, toeid))

    def test_inline_relation(self):
        """make sure <event>_relation hooks are called for inlined relation"""
        class EcritParHook(hook.Hook):
            __regid__ = 'inlinedrelhook'
            __select__ = hook.Hook.__select__ & hook.match_rtype('ecrit_par')
            events = ('before_add_relation', 'after_add_relation',
                      'before_delete_relation', 'after_delete_relation')
            def __call__(self):
                CALLED.append((self.event, self.eidfrom, self.rtype, self.eidto))

        with self.temporary_appobjects(EcritParHook):
            eidp = self.execute('INSERT Personne X: X nom "toto"')[0][0]
            eidn = self.execute('INSERT Note X: X type "T"')[0][0]
            self.execute('SET N ecrit_par Y WHERE N type "T", Y nom "toto"')
            self.assertEqual(CALLED, [('before_add_relation', eidn, 'ecrit_par', eidp),
                                       ('after_add_relation', eidn, 'ecrit_par', eidp)])
            CALLED[:] = ()
            self.execute('DELETE N ecrit_par Y WHERE N type "T", Y nom "toto"')
            self.assertEqual(CALLED, [('before_delete_relation', eidn, 'ecrit_par', eidp),
                                       ('after_delete_relation', eidn, 'ecrit_par', eidp)])
            CALLED[:] = ()
            eidn = self.execute('INSERT Note N: N ecrit_par P WHERE P nom "toto"')[0][0]
            self.assertEqual(CALLED, [('before_add_relation', eidn, 'ecrit_par', eidp),
                                       ('after_add_relation', eidn, 'ecrit_par', eidp)])

    def test_unique_contraint(self):
        req = self.request()
        toto = req.create_entity('Personne', nom=u'toto')
        a01 = req.create_entity('Affaire', ref=u'A01', todo_by=toto)
        req.cnx.commit()
        req = self.request()
        req.create_entity('Note', type=u'todo', inline1=a01)
        req.cnx.commit()
        req = self.request()
        req.create_entity('Note', type=u'todo', inline1=a01)
        with self.assertRaises(ValidationError) as cm:
            req.cnx.commit()
        self.assertEqual(cm.exception.errors, {'inline1-subject': u'RQLUniqueConstraint S type T, S inline1 A1, A1 todo_by C, Y type T, Y inline1 A2, A2 todo_by C failed'})

    def test_add_relations_at_creation_with_del_existing_rel(self):
        req = self.request()
        person = req.create_entity('Personne', nom=u'Toto', prenom=u'Lanturlu', sexe=u'M')
        users_rql = 'Any U WHERE U is CWGroup, U name "users"'
        users = self.execute(users_rql).get_entity(0, 0)
        req.create_entity('CWUser',
                      login=u'Toto',
                      upassword=u'firstname',
                      firstname=u'firstname',
                      surname=u'surname',
                      reverse_login_user=person,
                      in_group=users)
        self.commit()


class PerformanceTest(CubicWebTC):
    def setUp(self):
        super(PerformanceTest, self).setUp()
        logger = logging.getLogger('cubicweb.session')
        #logger.handlers = [logging.StreamHandler(sys.stdout)]
        logger.setLevel(logging.INFO)
        self.info = logger.info

    def tearDown(self):
        super(PerformanceTest, self).tearDown()
        logger = logging.getLogger('cubicweb.session')
        logger.setLevel(logging.CRITICAL)

    def test_composite_deletion(self):
        req = self.request()
        personnes = []
        t0 = time.time()
        for i in xrange(2000):
            p = req.create_entity('Personne', nom=u'Doe%03d'%i, prenom=u'John', sexe=u'M')
            personnes.append(p)
        abraham = req.create_entity('Personne', nom=u'Abraham', prenom=u'John', sexe=u'M')
        for j in xrange(0, 2000, 100):
            abraham.cw_set(personne_composite=personnes[j:j+100])
        t1 = time.time()
        self.info('creation: %.2gs', (t1 - t0))
        req.cnx.commit()
        t2 = time.time()
        self.info('commit creation: %.2gs', (t2 - t1))
        self.execute('DELETE Personne P WHERE P eid %(eid)s', {'eid': abraham.eid})
        t3 = time.time()
        self.info('deletion: %.2gs', (t3 - t2))
        req.cnx.commit()
        t4 = time.time()
        self.info("commit deletion: %2gs", (t4 - t3))

    def test_add_relation_non_inlined(self):
        req = self.request()
        personnes = []
        for i in xrange(2000):
            p = req.create_entity('Personne', nom=u'Doe%03d'%i, prenom=u'John', sexe=u'M')
            personnes.append(p)
        req.cnx.commit()
        t0 = time.time()
        abraham = req.create_entity('Personne', nom=u'Abraham', prenom=u'John', sexe=u'M',
                                    personne_composite=personnes[:100])
        t1 = time.time()
        self.info('creation: %.2gs', (t1 - t0))
        for j in xrange(100, 2000, 100):
            abraham.cw_set(personne_composite=personnes[j:j+100])
        t2 = time.time()
        self.info('more relations: %.2gs', (t2-t1))
        req.cnx.commit()
        t3 = time.time()
        self.info('commit creation: %.2gs', (t3 - t2))

    def test_add_relation_inlined(self):
        req = self.request()
        personnes = []
        for i in xrange(2000):
            p = req.create_entity('Personne', nom=u'Doe%03d'%i, prenom=u'John', sexe=u'M')
            personnes.append(p)
        req.cnx.commit()
        t0 = time.time()
        abraham = req.create_entity('Personne', nom=u'Abraham', prenom=u'John', sexe=u'M',
                                    personne_inlined=personnes[:100])
        t1 = time.time()
        self.info('creation: %.2gs', (t1 - t0))
        for j in xrange(100, 2000, 100):
            abraham.cw_set(personne_inlined=personnes[j:j+100])
        t2 = time.time()
        self.info('more relations: %.2gs', (t2-t1))
        req.cnx.commit()
        t3 = time.time()
        self.info('commit creation: %.2gs', (t3 - t2))


    def test_session_add_relation(self):
        """ to be compared with test_session_add_relations"""
        req = self.request()
        personnes = []
        for i in xrange(2000):
            p = req.create_entity('Personne', nom=u'Doe%03d'%i, prenom=u'John', sexe=u'M')
            personnes.append(p)
        abraham = req.create_entity('Personne', nom=u'Abraham', prenom=u'John', sexe=u'M')
        req.cnx.commit()
        t0 = time.time()
        add_relation = self.session.add_relation
        for p in personnes:
            add_relation(abraham.eid, 'personne_composite', p.eid)
        req.cnx.commit()
        t1 = time.time()
        self.info('add relation: %.2gs', t1-t0)

    def test_session_add_relations (self):
        """ to be compared with test_session_add_relation"""
        req = self.request()
        personnes = []
        for i in xrange(2000):
            p = req.create_entity('Personne', nom=u'Doe%03d'%i, prenom=u'John', sexe=u'M')
            personnes.append(p)
        abraham = req.create_entity('Personne', nom=u'Abraham', prenom=u'John', sexe=u'M')
        req.cnx.commit()
        t0 = time.time()
        add_relations = self.session.add_relations
        relations = [('personne_composite', [(abraham.eid, p.eid) for p in personnes])]
        add_relations(relations)
        req.cnx.commit()
        t1 = time.time()
        self.info('add relations: %.2gs', t1-t0)

    def test_session_add_relation_inlined(self):
        """ to be compared with test_session_add_relations"""
        req = self.request()
        personnes = []
        for i in xrange(2000):
            p = req.create_entity('Personne', nom=u'Doe%03d'%i, prenom=u'John', sexe=u'M')
            personnes.append(p)
        abraham = req.create_entity('Personne', nom=u'Abraham', prenom=u'John', sexe=u'M')
        req.cnx.commit()
        t0 = time.time()
        add_relation = self.session.add_relation
        for p in personnes:
            add_relation(abraham.eid, 'personne_inlined', p.eid)
        req.cnx.commit()
        t1 = time.time()
        self.info('add relation (inlined): %.2gs', t1-t0)

    def test_session_add_relations_inlined (self):
        """ to be compared with test_session_add_relation"""
        req = self.request()
        personnes = []
        for i in xrange(2000):
            p = req.create_entity('Personne', nom=u'Doe%03d'%i, prenom=u'John', sexe=u'M')
            personnes.append(p)
        abraham = req.create_entity('Personne', nom=u'Abraham', prenom=u'John', sexe=u'M')
        req.cnx.commit()
        t0 = time.time()
        add_relations = self.session.add_relations
        relations = [('personne_inlined', [(abraham.eid, p.eid) for p in personnes])]
        add_relations(relations)
        req.cnx.commit()
        t1 = time.time()
        self.info('add relations (inlined): %.2gs', t1-t0)

    def test_optional_relation_reset_1(self):
        req = self.request()
        p1 = req.create_entity('Personne', nom=u'Vincent')
        p2 = req.create_entity('Personne', nom=u'Florent')
        w = req.create_entity('Affaire', ref=u'wc')
        w.cw_set(todo_by=[p1,p2])
        w.cw_clear_all_caches()
        self.commit()
        self.assertEqual(len(w.todo_by), 1)
        self.assertEqual(w.todo_by[0].eid, p2.eid)

    def test_optional_relation_reset_2(self):
        req = self.request()
        p1 = req.create_entity('Personne', nom=u'Vincent')
        p2 = req.create_entity('Personne', nom=u'Florent')
        w = req.create_entity('Affaire', ref=u'wc')
        w.cw_set(todo_by=p1)
        self.commit()
        w.cw_set(todo_by=p2)
        w.cw_clear_all_caches()
        self.commit()
        self.assertEqual(len(w.todo_by), 1)
        self.assertEqual(w.todo_by[0].eid, p2.eid)


if __name__ == '__main__':
    unittest_main()