author | Laura Médioni <laura.medioni@logilab.fr> |
Mon, 28 Apr 2014 14:11:23 +0200 | |
changeset 9966 | 6c2d57d1b6de |
parent 9965 | a8769b752299 |
child 10203 | 1443fe643a38 |
child 10280 | 2cdab5e33542 |
permissions | -rw-r--r-- |
# -*- coding: iso-8859-1 -*- # copyright 2003-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved. # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr # # This file is part of CubicWeb. # # CubicWeb is free software: you can redistribute it and/or modify it under the # terms of the GNU Lesser General Public License as published by the Free # Software Foundation, either version 2.1 of the License, or (at your option) # any later version. # # CubicWeb is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS # FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more # details. # # You should have received a copy of the GNU Lesser General Public License along # with CubicWeb. If not, see <http://www.gnu.org/licenses/>. """unit tests for module cubicweb.server.repository""" import threading import time import logging from yams.constraints import UniqueConstraint from yams import register_base_type, unregister_base_type from logilab.database import get_db_helper from cubicweb import (BadConnectionId, ValidationError, UnknownEid, AuthenticationError, Unauthorized, QueryError) from cubicweb.predicates import is_instance from cubicweb.schema import RQLConstraint from cubicweb.dbapi import connect, multiple_connections_unfix from cubicweb.devtools.testlib import CubicWebTC from cubicweb.devtools.repotest import tuplify from cubicweb.server import repository, hook from cubicweb.server.sqlutils import SQL_PREFIX from cubicweb.server.hook import Hook from cubicweb.server.sources import native from cubicweb.server.session import SessionClosedError class RepositoryTC(CubicWebTC): """ singleton providing access to a persistent storage for entities and relation """ def test_unique_together_constraint(self): with self.admin_access.repo_cnx() as cnx: cnx.execute('INSERT Societe S: S nom "Logilab", S type "SSLL", S cp "75013"') with self.assertRaises(ValidationError) as wraperr: cnx.execute('INSERT Societe S: S nom "Logilab", S type "SSLL", S cp "75013"') self.assertEqual( {'cp': u'cp is part of violated unicity constraint', 'nom': u'nom is part of violated unicity constraint', 'type': u'type is part of violated unicity constraint', 'unicity constraint': u'some relations violate a unicity constraint'}, wraperr.exception.args[1]) def test_unique_together_schema(self): person = self.repo.schema.eschema('Personne') self.assertEqual(len(person._unique_together), 1) self.assertItemsEqual(person._unique_together[0], ('nom', 'prenom', 'inline2')) def test_all_entities_have_owner(self): with self.admin_access.repo_cnx() as cnx: self.assertFalse(cnx.execute('Any X WHERE NOT X owned_by U')) def test_all_entities_have_is(self): with self.admin_access.repo_cnx() as cnx: self.assertFalse(cnx.execute('Any X WHERE NOT X is ET')) def test_all_entities_have_cw_source(self): with self.admin_access.repo_cnx() as cnx: self.assertFalse(cnx.execute('Any X WHERE NOT X cw_source S')) def test_connect(self): cnxid = self.repo.connect(self.admlogin, password=self.admpassword) self.assert_(cnxid) self.repo.close(cnxid) self.assertRaises(AuthenticationError, self.repo.connect, self.admlogin, password='nimportnawak') self.assertRaises(AuthenticationError, self.repo.connect, self.admlogin, password='') self.assertRaises(AuthenticationError, self.repo.connect, self.admlogin, password=None) self.assertRaises(AuthenticationError, self.repo.connect, None, password=None) self.assertRaises(AuthenticationError, self.repo.connect, self.admlogin) self.assertRaises(AuthenticationError, self.repo.connect, None) def test_execute(self): repo = self.repo cnxid = repo.connect(self.admlogin, password=self.admpassword) repo.execute(cnxid, 'Any X') repo.execute(cnxid, 'Any X where X is Personne') repo.execute(cnxid, 'Any X where X is Personne, X nom ~= "to"') repo.execute(cnxid, 'Any X WHERE X has_text %(text)s', {'text': u'\xe7a'}) repo.close(cnxid) def test_login_upassword_accent(self): repo = self.repo cnxid = repo.connect(self.admlogin, password=self.admpassword) repo.execute(cnxid, 'INSERT CWUser X: X login %(login)s, X upassword %(passwd)s, X in_group G WHERE G name "users"', {'login': u"barnab�", 'passwd': u"h�h�h�".encode('UTF8')}) repo.commit(cnxid) repo.close(cnxid) cnxid = repo.connect(u"barnab�", password=u"h�h�h�".encode('UTF8')) self.assert_(cnxid) repo.close(cnxid) def test_rollback_on_commit_error(self): cnxid = self.repo.connect(self.admlogin, password=self.admpassword) self.repo.execute(cnxid, 'INSERT CWUser X: X login %(login)s, X upassword %(passwd)s', {'login': u"tutetute", 'passwd': 'tutetute'}) self.assertRaises(ValidationError, self.repo.commit, cnxid) self.assertFalse(self.repo.execute(cnxid, 'CWUser X WHERE X login "tutetute"')) self.repo.close(cnxid) def test_rollback_on_execute_validation_error(self): class ValidationErrorAfterHook(Hook): __regid__ = 'valerror-after-hook' __select__ = Hook.__select__ & is_instance('CWGroup') events = ('after_update_entity',) def __call__(self): raise ValidationError(self.entity.eid, {}) with self.admin_access.repo_cnx() as cnx: with self.temporary_appobjects(ValidationErrorAfterHook): self.assertRaises(ValidationError, cnx.execute, 'SET X name "toto" WHERE X is CWGroup, X name "guests"') self.assertTrue(cnx.execute('Any X WHERE X is CWGroup, X name "toto"')) with self.assertRaises(QueryError) as cm: cnx.commit() self.assertEqual(str(cm.exception), 'transaction must be rolled back') cnx.rollback() self.assertFalse(cnx.execute('Any X WHERE X is CWGroup, X name "toto"')) def test_rollback_on_execute_unauthorized(self): class UnauthorizedAfterHook(Hook): __regid__ = 'unauthorized-after-hook' __select__ = Hook.__select__ & is_instance('CWGroup') events = ('after_update_entity',) def __call__(self): raise Unauthorized() with self.admin_access.repo_cnx() as cnx: with self.temporary_appobjects(UnauthorizedAfterHook): self.assertRaises(Unauthorized, cnx.execute, 'SET X name "toto" WHERE X is CWGroup, X name "guests"') self.assertTrue(cnx.execute('Any X WHERE X is CWGroup, X name "toto"')) with self.assertRaises(QueryError) as cm: cnx.commit() self.assertEqual(str(cm.exception), 'transaction must be rolled back') cnx.rollback() self.assertFalse(cnx.execute('Any X WHERE X is CWGroup, X name "toto"')) def test_close(self): repo = self.repo cnxid = repo.connect(self.admlogin, password=self.admpassword) self.assert_(cnxid) repo.close(cnxid) self.assertRaises(BadConnectionId, repo.execute, cnxid, 'Any X') def test_invalid_cnxid(self): self.assertRaises(BadConnectionId, self.repo.execute, 0, 'Any X') self.assertRaises(BadConnectionId, self.repo.close, None) def test_shared_data(self): repo = self.repo cnxid = repo.connect(self.admlogin, password=self.admpassword) repo.set_shared_data(cnxid, 'data', 4) cnxid2 = repo.connect(self.admlogin, password=self.admpassword) self.assertEqual(repo.get_shared_data(cnxid, 'data'), 4) self.assertEqual(repo.get_shared_data(cnxid2, 'data'), None) repo.set_shared_data(cnxid2, 'data', 5) self.assertEqual(repo.get_shared_data(cnxid, 'data'), 4) self.assertEqual(repo.get_shared_data(cnxid2, 'data'), 5) repo.get_shared_data(cnxid2, 'data', pop=True) self.assertEqual(repo.get_shared_data(cnxid, 'data'), 4) self.assertEqual(repo.get_shared_data(cnxid2, 'data'), None) repo.close(cnxid) repo.close(cnxid2) self.assertRaises(BadConnectionId, repo.get_shared_data, cnxid, 'data') self.assertRaises(BadConnectionId, repo.get_shared_data, cnxid2, 'data') self.assertRaises(BadConnectionId, repo.set_shared_data, cnxid, 'data', 1) self.assertRaises(BadConnectionId, repo.set_shared_data, cnxid2, 'data', 1) def test_check_session(self): repo = self.repo cnxid = repo.connect(self.admlogin, password=self.admpassword) self.assertIsInstance(repo.check_session(cnxid), float) repo.close(cnxid) self.assertRaises(BadConnectionId, repo.check_session, cnxid) def test_transaction_base(self): repo = self.repo cnxid = repo.connect(self.admlogin, password=self.admpassword) # check db state result = repo.execute(cnxid, 'Personne X') self.assertEqual(result.rowcount, 0) # rollback entity insertion repo.execute(cnxid, "INSERT Personne X: X nom 'bidule'") result = repo.execute(cnxid, 'Personne X') self.assertEqual(result.rowcount, 1) repo.rollback(cnxid) result = repo.execute(cnxid, 'Personne X') self.assertEqual(result.rowcount, 0, result.rows) # commit repo.execute(cnxid, "INSERT Personne X: X nom 'bidule'") repo.commit(cnxid) result = repo.execute(cnxid, 'Personne X') self.assertEqual(result.rowcount, 1) repo.close(cnxid) def test_transaction_base2(self): repo = self.repo cnxid = repo.connect(self.admlogin, password=self.admpassword) # rollback relation insertion repo.execute(cnxid, "SET U in_group G WHERE U login 'admin', G name 'guests'") result = repo.execute(cnxid, "Any U WHERE U in_group G, U login 'admin', G name 'guests'") self.assertEqual(result.rowcount, 1) repo.rollback(cnxid) result = repo.execute(cnxid, "Any U WHERE U in_group G, U login 'admin', G name 'guests'") self.assertEqual(result.rowcount, 0, result.rows) repo.close(cnxid) def test_transaction_base3(self): repo = self.repo cnxid = repo.connect(self.admlogin, password=self.admpassword) # rollback state change which trigger TrInfo insertion session = repo._get_session(cnxid) user = session.user user.cw_adapt_to('IWorkflowable').fire_transition('deactivate') rset = repo.execute(cnxid, 'TrInfo T WHERE T wf_info_for X, X eid %(x)s', {'x': user.eid}) self.assertEqual(len(rset), 1) repo.rollback(cnxid) rset = repo.execute(cnxid, 'TrInfo T WHERE T wf_info_for X, X eid %(x)s', {'x': user.eid}) self.assertEqual(len(rset), 0) repo.close(cnxid) def test_close_kill_processing_request(self): repo = self.repo cnxid = repo.connect(self.admlogin, password=self.admpassword) repo.execute(cnxid, 'INSERT CWUser X: X login "toto", X upassword "tutu", X in_group G WHERE G name "users"') repo.commit(cnxid) lock = threading.Lock() lock.acquire() # close has to be in the thread due to sqlite limitations def close_in_a_few_moment(): lock.acquire() repo.close(cnxid) t = threading.Thread(target=close_in_a_few_moment) t.start() def run_transaction(): lock.release() repo.execute(cnxid, 'DELETE CWUser X WHERE X login "toto"') repo.commit(cnxid) try: with self.assertRaises(SessionClosedError) as cm: run_transaction() self.assertEqual(str(cm.exception), 'try to access connections set on a closed session %s' % cnxid) finally: t.join() def test_initial_schema(self): schema = self.repo.schema # check order of attributes is respected notin = set(('eid', 'is', 'is_instance_of', 'identity', 'creation_date', 'modification_date', 'cwuri', 'owned_by', 'created_by', 'cw_source', 'update_permission', 'read_permission', 'add_permission', 'in_basket')) self.assertListEqual(['relation_type', 'from_entity', 'to_entity', 'constrained_by', 'cardinality', 'ordernum', 'formula', 'indexed', 'fulltextindexed', 'internationalizable', 'defaultval', 'extra_props', 'description', 'description_format'], [r.type for r in schema.eschema('CWAttribute').ordered_relations() if r.type not in notin]) self.assertEqual(schema.eschema('CWEType').main_attribute(), 'name') self.assertEqual(schema.eschema('State').main_attribute(), 'name') constraints = schema.rschema('name').rdef('CWEType', 'String').constraints self.assertEqual(len(constraints), 2) for cstr in constraints[:]: if isinstance(cstr, UniqueConstraint): constraints.remove(cstr) break else: self.fail('unique constraint not found') sizeconstraint = constraints[0] self.assertEqual(sizeconstraint.min, None) self.assertEqual(sizeconstraint.max, 64) constraints = schema.rschema('relation_type').rdef('CWAttribute', 'CWRType').constraints self.assertEqual(len(constraints), 1) cstr = constraints[0] self.assert_(isinstance(cstr, RQLConstraint)) self.assertEqual(cstr.expression, 'O final TRUE') ownedby = schema.rschema('owned_by') self.assertEqual(ownedby.objects('CWEType'), ('CWUser',)) def test_pyro(self): import Pyro Pyro.config.PYRO_MULTITHREADED = 0 done = [] self.repo.config.global_set_option('pyro-ns-host', 'NO_PYRONS') daemon = self.repo.pyro_register() try: uri = self.repo.pyro_uri.replace('PYRO', 'pyroloc') # the client part has to be in the thread due to sqlite limitations t = threading.Thread(target=self._pyro_client, args=(uri, done)) t.start() while not done: daemon.handleRequests(1.0) t.join(1) if t.isAlive(): self.fail('something went wrong, thread still alive') finally: repository.pyro_unregister(self.repo.config) from logilab.common import pyro_ext pyro_ext._DAEMONS.clear() def _pyro_client(self, uri, done): cnx = connect(uri, u'admin', password='gingkow', initlog=False) # don't reset logging configuration try: cnx.load_appobjects(subpath=('entities',)) # check we can get the schema schema = cnx.get_schema() self.assertTrue(cnx.vreg) self.assertTrue('etypes'in cnx.vreg) cu = cnx.cursor() rset = cu.execute('Any U,G WHERE U in_group G') user = iter(rset.entities()).next() self.assertTrue(user._cw) self.assertTrue(user._cw.vreg) from cubicweb.entities import authobjs self.assertIsInstance(user._cw.user, authobjs.CWUser) # make sure the tcp connection is closed properly; yes, it's disgusting. adapter = cnx._repo.adapter cnx.close() adapter.release() done.append(True) finally: # connect monkey patch some method by default, remove them multiple_connections_unfix() def test_zmq(self): try: import zmq except ImportError: self.skipTest("zmq in not available") done = [] from cubicweb.devtools import TestServerConfiguration as ServerConfiguration from cubicweb.server.cwzmq import ZMQRepositoryServer # the client part has to be in a thread due to sqlite limitations t = threading.Thread(target=self._zmq_client, args=(done,)) t.start() zmq_server = ZMQRepositoryServer(self.repo) zmq_server.connect('zmqpickle-tcp://127.0.0.1:41415') t2 = threading.Thread(target=self._zmq_quit, args=(done, zmq_server,)) t2.start() zmq_server.run() t2.join(1) t.join(1) if t.isAlive(): self.fail('something went wrong, thread still alive') def _zmq_quit(self, done, srv): while not done: time.sleep(0.1) srv.quit() def _zmq_client(self, done): try: cnx = connect('zmqpickle-tcp://127.0.0.1:41415', u'admin', password=u'gingkow', initlog=False) # don't reset logging configuration try: cnx.load_appobjects(subpath=('entities',)) # check we can get the schema schema = cnx.get_schema() self.assertTrue(cnx.vreg) self.assertTrue('etypes'in cnx.vreg) cu = cnx.cursor() rset = cu.execute('Any U,G WHERE U in_group G') user = iter(rset.entities()).next() self.assertTrue(user._cw) self.assertTrue(user._cw.vreg) from cubicweb.entities import authobjs self.assertIsInstance(user._cw.user, authobjs.CWUser) cnx.close() done.append(True) finally: # connect monkey patch some method by default, remove them multiple_connections_unfix() finally: done.append(False) def test_internal_api(self): repo = self.repo cnxid = repo.connect(self.admlogin, password=self.admpassword) session = repo._get_session(cnxid, setcnxset=True) self.assertEqual(repo.type_and_source_from_eid(2, session), ('CWGroup', None, 'system')) self.assertEqual(repo.type_from_eid(2, session), 'CWGroup') repo.close(cnxid) def test_public_api(self): self.assertEqual(self.repo.get_schema(), self.repo.schema) self.assertEqual(self.repo.source_defs(), {'system': {'type': 'native', 'uri': 'system', 'use-cwuri-as-url': False} }) # .properties() return a result set self.assertEqual(self.repo.properties().rql, 'Any K,V WHERE P is CWProperty,P pkey K, P value V, NOT P for_user U') def test_session_api(self): repo = self.repo cnxid = repo.connect(self.admlogin, password=self.admpassword) self.assertEqual(repo.user_info(cnxid), (6, 'admin', set([u'managers']), {})) self.assertEqual({'type': u'CWGroup', 'extid': None, 'source': 'system'}, repo.entity_metas(cnxid, 2)) self.assertEqual(repo.describe(cnxid, 2), (u'CWGroup', 'system', None, 'system')) repo.close(cnxid) self.assertRaises(BadConnectionId, repo.user_info, cnxid) self.assertRaises(BadConnectionId, repo.describe, cnxid, 1) def test_shared_data_api(self): repo = self.repo cnxid = repo.connect(self.admlogin, password=self.admpassword) self.assertEqual(repo.get_shared_data(cnxid, 'data'), None) repo.set_shared_data(cnxid, 'data', 4) self.assertEqual(repo.get_shared_data(cnxid, 'data'), 4) repo.get_shared_data(cnxid, 'data', pop=True) repo.get_shared_data(cnxid, 'whatever', pop=True) self.assertEqual(repo.get_shared_data(cnxid, 'data'), None) repo.close(cnxid) self.assertRaises(BadConnectionId, repo.set_shared_data, cnxid, 'data', 0) self.assertRaises(BadConnectionId, repo.get_shared_data, cnxid, 'data') def test_schema_is_relation(self): with self.admin_access.repo_cnx() as cnx: no_is_rset = cnx.execute('Any X WHERE NOT X is ET') self.assertFalse(no_is_rset, no_is_rset.description) # def test_perfo(self): # self.set_debug(True) # from time import time, clock # t, c = time(), clock() # try: # self.create_user('toto') # finally: # self.set_debug(False) # print 'test time: %.3f (time) %.3f (cpu)' % ((time() - t), clock() - c) def test_delete_if_singlecard1(self): with self.admin_access.repo_cnx() as cnx: note = cnx.create_entity('Affaire') p1 = cnx.create_entity('Personne', nom=u'toto') cnx.execute('SET A todo_by P WHERE A eid %(x)s, P eid %(p)s', {'x': note.eid, 'p': p1.eid}) rset = cnx.execute('Any P WHERE A todo_by P, A eid %(x)s', {'x': note.eid}) self.assertEqual(len(rset), 1) p2 = cnx.create_entity('Personne', nom=u'tutu') cnx.execute('SET A todo_by P WHERE A eid %(x)s, P eid %(p)s', {'x': note.eid, 'p': p2.eid}) rset = cnx.execute('Any P WHERE A todo_by P, A eid %(x)s', {'x': note.eid}) self.assertEqual(len(rset), 1) self.assertEqual(rset.rows[0][0], p2.eid) def test_delete_if_object_inlined_singlecard(self): with self.admin_access.repo_cnx() as cnx: c = cnx.create_entity('Card', title=u'Carte') cnx.create_entity('Personne', nom=u'Vincent', fiche=c) cnx.create_entity('Personne', nom=u'Florent', fiche=c) cnx.commit() self.assertEqual(len(c.reverse_fiche), 1) def test_cw_set_in_before_update(self): # local hook class DummyBeforeHook(Hook): __regid__ = 'dummy-before-hook' __select__ = Hook.__select__ & is_instance('EmailAddress') events = ('before_update_entity',) def __call__(self): # safety belt: avoid potential infinite recursion if the test # fails (i.e. RuntimeError not raised) pendings = self._cw.transaction_data.setdefault('pending', set()) if self.entity.eid not in pendings: pendings.add(self.entity.eid) self.entity.cw_set(alias=u'foo') with self.admin_access.repo_cnx() as cnx: with self.temporary_appobjects(DummyBeforeHook): addr = cnx.create_entity('EmailAddress', address=u'a@b.fr') addr.cw_set(address=u'a@b.com') rset = cnx.execute('Any A,AA WHERE X eid %(x)s, X address A, X alias AA', {'x': addr.eid}) self.assertEqual(rset.rows, [[u'a@b.com', u'foo']]) def test_cw_set_in_before_add(self): # local hook class DummyBeforeHook(Hook): __regid__ = 'dummy-before-hook' __select__ = Hook.__select__ & is_instance('EmailAddress') events = ('before_add_entity',) def __call__(self): # cw_set is forbidden within before_add_entity() self.entity.cw_set(alias=u'foo') with self.admin_access.repo_cnx() as cnx: with self.temporary_appobjects(DummyBeforeHook): # XXX will fail with python -O self.assertRaises(AssertionError, cnx.create_entity, 'EmailAddress', address=u'a@b.fr') def test_multiple_edit_cw_set(self): """make sure cw_edited doesn't get cluttered by previous entities on multiple set """ # local hook class DummyBeforeHook(Hook): _test = self # keep reference to test instance __regid__ = 'dummy-before-hook' __select__ = Hook.__select__ & is_instance('Affaire') events = ('before_update_entity',) def __call__(self): # invoiced attribute shouldn't be considered "edited" before the hook self._test.assertFalse('invoiced' in self.entity.cw_edited, 'cw_edited cluttered by previous update') self.entity.cw_edited['invoiced'] = 10 with self.admin_access.repo_cnx() as cnx: with self.temporary_appobjects(DummyBeforeHook): cnx.create_entity('Affaire', ref=u'AFF01') cnx.create_entity('Affaire', ref=u'AFF02') cnx.execute('SET A duration 10 WHERE A is Affaire') def test_user_friendly_error(self): from cubicweb.entities.adapters import IUserFriendlyUniqueTogether class MyIUserFriendlyUniqueTogether(IUserFriendlyUniqueTogether): __select__ = IUserFriendlyUniqueTogether.__select__ & is_instance('Societe') def raise_user_exception(self): raise ValidationError(self.entity.eid, {'hip': 'hop'}) with self.admin_access.repo_cnx() as cnx: with self.temporary_appobjects(MyIUserFriendlyUniqueTogether): s = cnx.create_entity('Societe', nom=u'Logilab', type=u'ssll', cp=u'75013') cnx.commit() with self.assertRaises(ValidationError) as cm: cnx.create_entity('Societe', nom=u'Logilab', type=u'ssll', cp=u'75013') self.assertEqual(cm.exception.errors, {'hip': 'hop'}) cnx.rollback() cnx.create_entity('Societe', nom=u'Logilab', type=u'ssll', cp=u'31400') with self.assertRaises(ValidationError) as cm: s.cw_set(cp=u'31400') self.assertEqual(cm.exception.entity, s.eid) self.assertEqual(cm.exception.errors, {'hip': 'hop'}) cnx.rollback() class SchemaDeserialTC(CubicWebTC): appid = 'data-schemaserial' @classmethod def setUpClass(cls): register_base_type('BabarTestType', ('jungle_speed',)) helper = get_db_helper('sqlite') helper.TYPE_MAPPING['BabarTestType'] = 'TEXT' helper.TYPE_CONVERTERS['BabarTestType'] = lambda x: '"%s"' % x super(SchemaDeserialTC, cls).setUpClass() @classmethod def tearDownClass(cls): unregister_base_type('BabarTestType') helper = get_db_helper('sqlite') helper.TYPE_MAPPING.pop('BabarTestType', None) helper.TYPE_CONVERTERS.pop('BabarTestType', None) super(SchemaDeserialTC, cls).tearDownClass() def test_deserialization_base(self): """Check the following deserialization * all CWEtype has name * Final type * CWUniqueTogetherConstraint * _unique_together__ content""" origshema = self.repo.schema try: self.repo.config.repairing = True # avoid versions checking self.repo.set_schema(self.repo.deserialize_schema()) table = SQL_PREFIX + 'CWEType' namecol = SQL_PREFIX + 'name' finalcol = SQL_PREFIX + 'final' with self.admin_access.repo_cnx() as cnx: with cnx.ensure_cnx_set: cu = cnx.system_sql('SELECT %s FROM %s WHERE %s is NULL' % (namecol, table, finalcol)) self.assertEqual(cu.fetchall(), []) cu = cnx.system_sql('SELECT %s FROM %s ' 'WHERE %s=%%(final)s ORDER BY %s' % (namecol, table, finalcol, namecol), {'final': True}) self.assertEqual(cu.fetchall(), [(u'BabarTestType',), (u'BigInt',), (u'Boolean',), (u'Bytes',), (u'Date',), (u'Datetime',), (u'Decimal',),(u'Float',), (u'Int',), (u'Interval',), (u'Password',), (u'String',), (u'TZDatetime',), (u'TZTime',), (u'Time',)]) sql = ("SELECT etype.cw_eid, etype.cw_name, cstr.cw_eid, rel.eid_to " "FROM cw_CWUniqueTogetherConstraint as cstr, " " relations_relation as rel, " " cw_CWEType as etype " "WHERE cstr.cw_eid = rel.eid_from " " AND cstr.cw_constraint_of = etype.cw_eid " " AND etype.cw_name = 'Personne' " ";") cu = cnx.system_sql(sql) rows = cu.fetchall() self.assertEqual(len(rows), 3) person = self.repo.schema.eschema('Personne') self.assertEqual(len(person._unique_together), 1) self.assertItemsEqual(person._unique_together[0], ('nom', 'prenom', 'inline2')) finally: self.repo.set_schema(origshema) def test_custom_attribute_param(self): origshema = self.repo.schema try: self.repo.config.repairing = True # avoid versions checking self.repo.set_schema(self.repo.deserialize_schema()) pes = self.repo.schema['Personne'] attr = pes.rdef('custom_field_of_jungle') self.assertIn('jungle_speed', vars(attr)) self.assertEqual(42, attr.jungle_speed) finally: self.repo.set_schema(origshema) class DataHelpersTC(CubicWebTC): def test_type_from_eid(self): with self.admin_access.repo_cnx() as cnx: with cnx.ensure_cnx_set: self.assertEqual(self.repo.type_from_eid(2, cnx), 'CWGroup') def test_type_from_eid_raise(self): with self.admin_access.repo_cnx() as cnx: with cnx.ensure_cnx_set: self.assertRaises(UnknownEid, self.repo.type_from_eid, -2, cnx) def test_add_delete_info(self): with self.admin_access.repo_cnx() as cnx: with cnx.ensure_cnx_set: cnx.mode = 'write' entity = self.repo.vreg['etypes'].etype_class('Personne')(cnx) entity.eid = -1 entity.complete = lambda x: None self.repo.add_info(cnx, entity, self.repo.system_source) cu = cnx.system_sql('SELECT * FROM entities WHERE eid = -1') data = cu.fetchall() self.assertEqual(tuplify(data), [(-1, 'Personne', 'system', None)]) self.repo.delete_info(cnx, entity, 'system') #self.repo.commit() cu = cnx.system_sql('SELECT * FROM entities WHERE eid = -1') data = cu.fetchall() self.assertEqual(data, []) class FTITC(CubicWebTC): def test_fulltext_container_entity(self): with self.admin_access.repo_cnx() as cnx: assert self.schema.rschema('use_email').fulltext_container == 'subject' toto = cnx.create_entity('EmailAddress', address=u'toto@logilab.fr') cnx.commit() rset = cnx.execute('Any X WHERE X has_text %(t)s', {'t': 'toto'}) self.assertEqual(rset.rows, []) cnx.user.cw_set(use_email=toto) cnx.commit() rset = cnx.execute('Any X WHERE X has_text %(t)s', {'t': 'toto'}) self.assertEqual(rset.rows, [[cnx.user.eid]]) cnx.execute('DELETE X use_email Y WHERE X login "admin", Y eid %(y)s', {'y': toto.eid}) cnx.commit() rset = cnx.execute('Any X WHERE X has_text %(t)s', {'t': 'toto'}) self.assertEqual(rset.rows, []) tutu = cnx.create_entity('EmailAddress', address=u'tutu@logilab.fr') cnx.user.cw_set(use_email=tutu) cnx.commit() rset = cnx.execute('Any X WHERE X has_text %(t)s', {'t': 'tutu'}) self.assertEqual(rset.rows, [[cnx.user.eid]]) tutu.cw_set(address=u'hip@logilab.fr') cnx.commit() rset = cnx.execute('Any X WHERE X has_text %(t)s', {'t': 'tutu'}) self.assertEqual(rset.rows, []) rset = cnx.execute('Any X WHERE X has_text %(t)s', {'t': 'hip'}) self.assertEqual(rset.rows, [[cnx.user.eid]]) def test_no_uncessary_ftiindex_op(self): with self.admin_access.repo_cnx() as cnx: cnx.create_entity('Workflow', name=u'dummy workflow', description=u'huuuuu') self.assertFalse(any(x for x in cnx.pending_operations if isinstance(x, native.FTIndexEntityOp))) class DBInitTC(CubicWebTC): def test_versions_inserted(self): with self.admin_access.repo_cnx() as cnx: inserted = [r[0] for r in cnx.execute('Any K ORDERBY K ' 'WHERE P pkey K, P pkey ~= "system.version.%"')] self.assertEqual(inserted, [u'system.version.basket', u'system.version.card', u'system.version.comment', u'system.version.cubicweb', u'system.version.email', u'system.version.file', u'system.version.folder', u'system.version.localperms', u'system.version.tag']) CALLED = [] class InlineRelHooksTC(CubicWebTC): """test relation hooks are called for inlined relations """ def setUp(self): CubicWebTC.setUp(self) CALLED[:] = () def test_inline_relation(self): """make sure <event>_relation hooks are called for inlined relation""" class EcritParHook(hook.Hook): __regid__ = 'inlinedrelhook' __select__ = hook.Hook.__select__ & hook.match_rtype('ecrit_par') events = ('before_add_relation', 'after_add_relation', 'before_delete_relation', 'after_delete_relation') def __call__(self): CALLED.append((self.event, self.eidfrom, self.rtype, self.eidto)) with self.temporary_appobjects(EcritParHook): with self.admin_access.repo_cnx() as cnx: eidp = cnx.execute('INSERT Personne X: X nom "toto"')[0][0] eidn = cnx.execute('INSERT Note X: X type "T"')[0][0] cnx.execute('SET N ecrit_par Y WHERE N type "T", Y nom "toto"') self.assertEqual(CALLED, [('before_add_relation', eidn, 'ecrit_par', eidp), ('after_add_relation', eidn, 'ecrit_par', eidp)]) CALLED[:] = () cnx.execute('DELETE N ecrit_par Y WHERE N type "T", Y nom "toto"') self.assertEqual(CALLED, [('before_delete_relation', eidn, 'ecrit_par', eidp), ('after_delete_relation', eidn, 'ecrit_par', eidp)]) CALLED[:] = () eidn = cnx.execute('INSERT Note N: N ecrit_par P WHERE P nom "toto"')[0][0] self.assertEqual(CALLED, [('before_add_relation', eidn, 'ecrit_par', eidp), ('after_add_relation', eidn, 'ecrit_par', eidp)]) def test_unique_contraint(self): with self.admin_access.repo_cnx() as cnx: toto = cnx.create_entity('Personne', nom=u'toto') a01 = cnx.create_entity('Affaire', ref=u'A01', todo_by=toto) cnx.commit() cnx.create_entity('Note', type=u'todo', inline1=a01) cnx.commit() cnx.create_entity('Note', type=u'todo', inline1=a01) with self.assertRaises(ValidationError) as cm: cnx.commit() self.assertEqual(cm.exception.errors, {'inline1-subject': u'RQLUniqueConstraint S type T, S inline1 A1, ' 'A1 todo_by C, Y type T, Y inline1 A2, A2 todo_by C failed'}) def test_add_relations_at_creation_with_del_existing_rel(self): with self.admin_access.repo_cnx() as cnx: person = cnx.create_entity('Personne', nom=u'Toto', prenom=u'Lanturlu', sexe=u'M') users_rql = 'Any U WHERE U is CWGroup, U name "users"' users = cnx.execute(users_rql).get_entity(0, 0) cnx.create_entity('CWUser', login=u'Toto', upassword=u'firstname', firstname=u'firstname', surname=u'surname', reverse_login_user=person, in_group=users) cnx.commit() class PerformanceTest(CubicWebTC): def setUp(self): super(PerformanceTest, self).setUp() logger = logging.getLogger('cubicweb.session') #logger.handlers = [logging.StreamHandler(sys.stdout)] logger.setLevel(logging.INFO) self.info = logger.info def tearDown(self): super(PerformanceTest, self).tearDown() logger = logging.getLogger('cubicweb.session') logger.setLevel(logging.CRITICAL) def test_composite_deletion(self): with self.admin_access.repo_cnx() as cnx: personnes = [] t0 = time.time() for i in xrange(2000): p = cnx.create_entity('Personne', nom=u'Doe%03d'%i, prenom=u'John', sexe=u'M') personnes.append(p) abraham = cnx.create_entity('Personne', nom=u'Abraham', prenom=u'John', sexe=u'M') for j in xrange(0, 2000, 100): abraham.cw_set(personne_composite=personnes[j:j+100]) t1 = time.time() self.info('creation: %.2gs', (t1 - t0)) cnx.commit() t2 = time.time() self.info('commit creation: %.2gs', (t2 - t1)) cnx.execute('DELETE Personne P WHERE P eid %(eid)s', {'eid': abraham.eid}) t3 = time.time() self.info('deletion: %.2gs', (t3 - t2)) cnx.commit() t4 = time.time() self.info("commit deletion: %2gs", (t4 - t3)) def test_add_relation_non_inlined(self): with self.admin_access.repo_cnx() as cnx: personnes = [] for i in xrange(2000): p = cnx.create_entity('Personne', nom=u'Doe%03d'%i, prenom=u'John', sexe=u'M') personnes.append(p) cnx.commit() t0 = time.time() abraham = cnx.create_entity('Personne', nom=u'Abraham', prenom=u'John', sexe=u'M', personne_composite=personnes[:100]) t1 = time.time() self.info('creation: %.2gs', (t1 - t0)) for j in xrange(100, 2000, 100): abraham.cw_set(personne_composite=personnes[j:j+100]) t2 = time.time() self.info('more relations: %.2gs', (t2-t1)) cnx.commit() t3 = time.time() self.info('commit creation: %.2gs', (t3 - t2)) def test_add_relation_inlined(self): with self.admin_access.repo_cnx() as cnx: personnes = [] for i in xrange(2000): p = cnx.create_entity('Personne', nom=u'Doe%03d'%i, prenom=u'John', sexe=u'M') personnes.append(p) cnx.commit() t0 = time.time() abraham = cnx.create_entity('Personne', nom=u'Abraham', prenom=u'John', sexe=u'M', personne_inlined=personnes[:100]) t1 = time.time() self.info('creation: %.2gs', (t1 - t0)) for j in xrange(100, 2000, 100): abraham.cw_set(personne_inlined=personnes[j:j+100]) t2 = time.time() self.info('more relations: %.2gs', (t2-t1)) cnx.commit() t3 = time.time() self.info('commit creation: %.2gs', (t3 - t2)) def test_session_add_relation(self): """ to be compared with test_session_add_relations""" with self.admin_access.repo_cnx() as cnx: personnes = [] for i in xrange(2000): p = cnx.create_entity('Personne', nom=u'Doe%03d'%i, prenom=u'John', sexe=u'M') personnes.append(p) abraham = cnx.create_entity('Personne', nom=u'Abraham', prenom=u'John', sexe=u'M') cnx.commit() t0 = time.time() add_relation = cnx.add_relation for p in personnes: add_relation(abraham.eid, 'personne_composite', p.eid) cnx.commit() t1 = time.time() self.info('add relation: %.2gs', t1-t0) def test_session_add_relations (self): """ to be compared with test_session_add_relation""" with self.admin_access.repo_cnx() as cnx: personnes = [] for i in xrange(2000): p = cnx.create_entity('Personne', nom=u'Doe%03d'%i, prenom=u'John', sexe=u'M') personnes.append(p) abraham = cnx.create_entity('Personne', nom=u'Abraham', prenom=u'John', sexe=u'M') cnx.commit() t0 = time.time() add_relations = cnx.add_relations relations = [('personne_composite', [(abraham.eid, p.eid) for p in personnes])] add_relations(relations) cnx.commit() t1 = time.time() self.info('add relations: %.2gs', t1-t0) def test_session_add_relation_inlined(self): """ to be compared with test_session_add_relations""" with self.admin_access.repo_cnx() as cnx: personnes = [] for i in xrange(2000): p = cnx.create_entity('Personne', nom=u'Doe%03d'%i, prenom=u'John', sexe=u'M') personnes.append(p) abraham = cnx.create_entity('Personne', nom=u'Abraham', prenom=u'John', sexe=u'M') cnx.commit() t0 = time.time() add_relation = cnx.add_relation for p in personnes: add_relation(abraham.eid, 'personne_inlined', p.eid) cnx.commit() t1 = time.time() self.info('add relation (inlined): %.2gs', t1-t0) def test_session_add_relations_inlined (self): """ to be compared with test_session_add_relation""" with self.admin_access.repo_cnx() as cnx: personnes = [] for i in xrange(2000): p = cnx.create_entity('Personne', nom=u'Doe%03d'%i, prenom=u'John', sexe=u'M') personnes.append(p) abraham = cnx.create_entity('Personne', nom=u'Abraham', prenom=u'John', sexe=u'M') cnx.commit() t0 = time.time() add_relations = cnx.add_relations relations = [('personne_inlined', [(abraham.eid, p.eid) for p in personnes])] add_relations(relations) cnx.commit() t1 = time.time() self.info('add relations (inlined): %.2gs', t1-t0) def test_optional_relation_reset_1(self): with self.admin_access.repo_cnx() as cnx: p1 = cnx.create_entity('Personne', nom=u'Vincent') p2 = cnx.create_entity('Personne', nom=u'Florent') w = cnx.create_entity('Affaire', ref=u'wc') w.cw_set(todo_by=[p1,p2]) w.cw_clear_all_caches() cnx.commit() self.assertEqual(len(w.todo_by), 1) self.assertEqual(w.todo_by[0].eid, p2.eid) def test_optional_relation_reset_2(self): with self.admin_access.repo_cnx() as cnx: p1 = cnx.create_entity('Personne', nom=u'Vincent') p2 = cnx.create_entity('Personne', nom=u'Florent') w = cnx.create_entity('Affaire', ref=u'wc') w.cw_set(todo_by=p1) cnx.commit() w.cw_set(todo_by=p2) w.cw_clear_all_caches() cnx.commit() self.assertEqual(len(w.todo_by), 1) self.assertEqual(w.todo_by[0].eid, p2.eid) if __name__ == '__main__': from logilab.common.testlib import unittest_main unittest_main()