diff -r 058bb3dc685f -r 0b59724cb3f2 cubicweb/server/test/unittest_postgres.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/cubicweb/server/test/unittest_postgres.py Sat Jan 16 13:48:51 2016 +0100 @@ -0,0 +1,186 @@ +# copyright 2003-2015 LOGILAB S.A. (Paris, FRANCE), all rights reserved. +# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr +# +# This file is part of CubicWeb. +# +# CubicWeb is free software: you can redistribute it and/or modify it under the +# terms of the GNU Lesser General Public License as published by the Free +# Software Foundation, either version 2.1 of the License, or (at your option) +# any later version. +# +# CubicWeb is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more +# details. +# +# You should have received a copy of the GNU Lesser General Public License along +# with CubicWeb. If not, see . + +from datetime import datetime +from threading import Thread + +from six.moves import range + +from logilab.common.testlib import SkipTest + +import logilab.database as lgdb +from cubicweb import ValidationError +from cubicweb.devtools import PostgresApptestConfiguration, startpgcluster, stoppgcluster +from cubicweb.devtools.testlib import CubicWebTC +from cubicweb.predicates import is_instance +from cubicweb.entities.adapters import IFTIndexableAdapter + +from unittest_querier import FixedOffset + + +def setUpModule(): + startpgcluster(__file__) + + +def tearDownModule(): + stoppgcluster(__file__) + + +class PostgresTimeoutConfiguration(PostgresApptestConfiguration): + def __init__(self, *args, **kwargs): + self.default_sources = PostgresApptestConfiguration.default_sources.copy() + self.default_sources['system'] = PostgresApptestConfiguration.default_sources['system'].copy() + self.default_sources['system']['db-statement-timeout'] = 200 + super(PostgresTimeoutConfiguration, self).__init__(*args, **kwargs) + + +class PostgresFTITC(CubicWebTC): + configcls = PostgresTimeoutConfiguration + + @classmethod + def setUpClass(cls): + cls.orig_connect_hooks = lgdb.SQL_CONNECT_HOOKS['postgres'][:] + + @classmethod + def tearDownClass(cls): + lgdb.SQL_CONNECT_HOOKS['postgres'] = cls.orig_connect_hooks + + def test_eid_range(self): + # concurrent allocation of eid ranges + source = self.session.repo.sources_by_uri['system'] + range1 = [] + range2 = [] + def allocate_eid_ranges(session, target): + for x in range(1, 10): + eid = source.create_eid(session, count=x) + target.extend(range(eid-x, eid)) + + t1 = Thread(target=lambda: allocate_eid_ranges(self.session, range1)) + t2 = Thread(target=lambda: allocate_eid_ranges(self.session, range2)) + t1.start() + t2.start() + t1.join() + t2.join() + self.assertEqual(range1, sorted(range1)) + self.assertEqual(range2, sorted(range2)) + self.assertEqual(set(), set(range1) & set(range2)) + + def test_occurence_count(self): + with self.admin_access.repo_cnx() as cnx: + c1 = cnx.create_entity('Card', title=u'c1', + content=u'cubicweb cubicweb cubicweb') + c2 = cnx.create_entity('Card', title=u'c3', + content=u'cubicweb') + c3 = cnx.create_entity('Card', title=u'c2', + content=u'cubicweb cubicweb') + cnx.commit() + self.assertEqual(cnx.execute('Card X ORDERBY FTIRANK(X) DESC ' + 'WHERE X has_text "cubicweb"').rows, + [[c1.eid,], [c3.eid,], [c2.eid,]]) + + + def test_attr_weight(self): + class CardIFTIndexableAdapter(IFTIndexableAdapter): + __select__ = is_instance('Card') + attr_weight = {'title': 'A'} + with self.temporary_appobjects(CardIFTIndexableAdapter): + with self.admin_access.repo_cnx() as cnx: + c1 = cnx.create_entity('Card', title=u'c1', + content=u'cubicweb cubicweb cubicweb') + c2 = cnx.create_entity('Card', title=u'c2', + content=u'cubicweb cubicweb') + c3 = cnx.create_entity('Card', title=u'cubicweb', + content=u'autre chose') + cnx.commit() + self.assertEqual(cnx.execute('Card X ORDERBY FTIRANK(X) DESC ' + 'WHERE X has_text "cubicweb"').rows, + [[c3.eid,], [c1.eid,], [c2.eid,]]) + + def test_entity_weight(self): + class PersonneIFTIndexableAdapter(IFTIndexableAdapter): + __select__ = is_instance('Personne') + entity_weight = 2.0 + with self.temporary_appobjects(PersonneIFTIndexableAdapter): + with self.admin_access.repo_cnx() as cnx: + c1 = cnx.create_entity('Personne', nom=u'c1', prenom=u'cubicweb') + c2 = cnx.create_entity('Comment', content=u'cubicweb cubicweb', + comments=c1) + c3 = cnx.create_entity('Comment', content=u'cubicweb cubicweb cubicweb', + comments=c1) + cnx.commit() + self.assertEqual(cnx.execute('Any X ORDERBY FTIRANK(X) DESC ' + 'WHERE X has_text "cubicweb"').rows, + [[c1.eid,], [c3.eid,], [c2.eid,]]) + + def test_tz_datetime(self): + with self.admin_access.repo_cnx() as cnx: + bob = cnx.create_entity('Personne', nom=u'bob', + tzdatenaiss=datetime(1977, 6, 7, 2, 0, tzinfo=FixedOffset(1))) + datenaiss = cnx.execute("Any XD WHERE X nom 'bob', X tzdatenaiss XD")[0][0] + self.assertIsNotNone(datenaiss.tzinfo) + self.assertEqual(datenaiss.utctimetuple()[:5], (1977, 6, 7, 1, 0)) + cnx.commit() + cnx.create_entity('Personne', nom=u'boby', + tzdatenaiss=datetime(1977, 6, 7, 2, 0)) + datenaiss = cnx.execute("Any XD WHERE X nom 'boby', X tzdatenaiss XD")[0][0] + self.assertIsNotNone(datenaiss.tzinfo) + self.assertEqual(datenaiss.utctimetuple()[:5], (1977, 6, 7, 2, 0)) + rset = cnx.execute("Any X WHERE X tzdatenaiss %(d)s", + {'d': datetime(1977, 6, 7, 2, 0, tzinfo=FixedOffset(1))}) + self.assertEqual(rset.rows, [[bob.eid]]) + + def test_constraint_validationerror(self): + with self.admin_access.repo_cnx() as cnx: + with cnx.allow_all_hooks_but('integrity'): + with self.assertRaises(ValidationError) as cm: + cnx.execute("INSERT Note N: N type 'nogood'") + self.assertEqual(cm.exception.errors, + {'type-subject': u'invalid value %(KEY-value)s, it must be one of %(KEY-choices)s'}) + self.assertEqual(cm.exception.msgargs, + {'type-subject-value': u'"nogood"', + 'type-subject-choices': u'"todo", "a", "b", "T", "lalala"'}) + + def test_statement_timeout(self): + with self.admin_access.repo_cnx() as cnx: + cnx.system_sql('select pg_sleep(0.1)') + with self.assertRaises(Exception): + cnx.system_sql('select pg_sleep(0.3)') + + +class PostgresLimitSizeTC(CubicWebTC): + configcls = PostgresApptestConfiguration + + def test(self): + with self.admin_access.repo_cnx() as cnx: + def sql(string): + return cnx.system_sql(string).fetchone()[0] + yield self.assertEqual, sql("SELECT limit_size('

hello

', 'text/html', 20)"), \ + '

hello

' + yield self.assertEqual, sql("SELECT limit_size('

hello

', 'text/html', 2)"), \ + 'he...' + yield self.assertEqual, sql("SELECT limit_size('
hello', 'text/html', 2)"), \ + 'he...' + yield self.assertEqual, sql("SELECT limit_size('hello', 'text/html', 2)"), \ + 'he...' + yield self.assertEqual, sql("SELECT limit_size('a>b', 'text/html', 2)"), \ + 'a>...' + + +if __name__ == '__main__': + from logilab.common.testlib import unittest_main + unittest_main()