diff -r 058bb3dc685f -r 0b59724cb3f2 server/test/unittest_postgres.py --- a/server/test/unittest_postgres.py Mon Jan 04 18:40:30 2016 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,186 +0,0 @@ -# copyright 2003-2015 LOGILAB S.A. (Paris, FRANCE), all rights reserved. -# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr -# -# This file is part of CubicWeb. -# -# CubicWeb is free software: you can redistribute it and/or modify it under the -# terms of the GNU Lesser General Public License as published by the Free -# Software Foundation, either version 2.1 of the License, or (at your option) -# any later version. -# -# CubicWeb is distributed in the hope that it will be useful, but WITHOUT -# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS -# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more -# details. -# -# You should have received a copy of the GNU Lesser General Public License along -# with CubicWeb. If not, see . - -from datetime import datetime -from threading import Thread - -from six.moves import range - -from logilab.common.testlib import SkipTest - -import logilab.database as lgdb -from cubicweb import ValidationError -from cubicweb.devtools import PostgresApptestConfiguration, startpgcluster, stoppgcluster -from cubicweb.devtools.testlib import CubicWebTC -from cubicweb.predicates import is_instance -from cubicweb.entities.adapters import IFTIndexableAdapter - -from unittest_querier import FixedOffset - - -def setUpModule(): - startpgcluster(__file__) - - -def tearDownModule(): - stoppgcluster(__file__) - - -class PostgresTimeoutConfiguration(PostgresApptestConfiguration): - def __init__(self, *args, **kwargs): - self.default_sources = PostgresApptestConfiguration.default_sources.copy() - self.default_sources['system'] = PostgresApptestConfiguration.default_sources['system'].copy() - self.default_sources['system']['db-statement-timeout'] = 200 - super(PostgresTimeoutConfiguration, self).__init__(*args, **kwargs) - - -class PostgresFTITC(CubicWebTC): - configcls = PostgresTimeoutConfiguration - - @classmethod - def setUpClass(cls): - cls.orig_connect_hooks = lgdb.SQL_CONNECT_HOOKS['postgres'][:] - - @classmethod - def tearDownClass(cls): - lgdb.SQL_CONNECT_HOOKS['postgres'] = cls.orig_connect_hooks - - def test_eid_range(self): - # concurrent allocation of eid ranges - source = self.session.repo.sources_by_uri['system'] - range1 = [] - range2 = [] - def allocate_eid_ranges(session, target): - for x in range(1, 10): - eid = source.create_eid(session, count=x) - target.extend(range(eid-x, eid)) - - t1 = Thread(target=lambda: allocate_eid_ranges(self.session, range1)) - t2 = Thread(target=lambda: allocate_eid_ranges(self.session, range2)) - t1.start() - t2.start() - t1.join() - t2.join() - self.assertEqual(range1, sorted(range1)) - self.assertEqual(range2, sorted(range2)) - self.assertEqual(set(), set(range1) & set(range2)) - - def test_occurence_count(self): - with self.admin_access.repo_cnx() as cnx: - c1 = cnx.create_entity('Card', title=u'c1', - content=u'cubicweb cubicweb cubicweb') - c2 = cnx.create_entity('Card', title=u'c3', - content=u'cubicweb') - c3 = cnx.create_entity('Card', title=u'c2', - content=u'cubicweb cubicweb') - cnx.commit() - self.assertEqual(cnx.execute('Card X ORDERBY FTIRANK(X) DESC ' - 'WHERE X has_text "cubicweb"').rows, - [[c1.eid,], [c3.eid,], [c2.eid,]]) - - - def test_attr_weight(self): - class CardIFTIndexableAdapter(IFTIndexableAdapter): - __select__ = is_instance('Card') - attr_weight = {'title': 'A'} - with self.temporary_appobjects(CardIFTIndexableAdapter): - with self.admin_access.repo_cnx() as cnx: - c1 = cnx.create_entity('Card', title=u'c1', - content=u'cubicweb cubicweb cubicweb') - c2 = cnx.create_entity('Card', title=u'c2', - content=u'cubicweb cubicweb') - c3 = cnx.create_entity('Card', title=u'cubicweb', - content=u'autre chose') - cnx.commit() - self.assertEqual(cnx.execute('Card X ORDERBY FTIRANK(X) DESC ' - 'WHERE X has_text "cubicweb"').rows, - [[c3.eid,], [c1.eid,], [c2.eid,]]) - - def test_entity_weight(self): - class PersonneIFTIndexableAdapter(IFTIndexableAdapter): - __select__ = is_instance('Personne') - entity_weight = 2.0 - with self.temporary_appobjects(PersonneIFTIndexableAdapter): - with self.admin_access.repo_cnx() as cnx: - c1 = cnx.create_entity('Personne', nom=u'c1', prenom=u'cubicweb') - c2 = cnx.create_entity('Comment', content=u'cubicweb cubicweb', - comments=c1) - c3 = cnx.create_entity('Comment', content=u'cubicweb cubicweb cubicweb', - comments=c1) - cnx.commit() - self.assertEqual(cnx.execute('Any X ORDERBY FTIRANK(X) DESC ' - 'WHERE X has_text "cubicweb"').rows, - [[c1.eid,], [c3.eid,], [c2.eid,]]) - - def test_tz_datetime(self): - with self.admin_access.repo_cnx() as cnx: - bob = cnx.create_entity('Personne', nom=u'bob', - tzdatenaiss=datetime(1977, 6, 7, 2, 0, tzinfo=FixedOffset(1))) - datenaiss = cnx.execute("Any XD WHERE X nom 'bob', X tzdatenaiss XD")[0][0] - self.assertIsNotNone(datenaiss.tzinfo) - self.assertEqual(datenaiss.utctimetuple()[:5], (1977, 6, 7, 1, 0)) - cnx.commit() - cnx.create_entity('Personne', nom=u'boby', - tzdatenaiss=datetime(1977, 6, 7, 2, 0)) - datenaiss = cnx.execute("Any XD WHERE X nom 'boby', X tzdatenaiss XD")[0][0] - self.assertIsNotNone(datenaiss.tzinfo) - self.assertEqual(datenaiss.utctimetuple()[:5], (1977, 6, 7, 2, 0)) - rset = cnx.execute("Any X WHERE X tzdatenaiss %(d)s", - {'d': datetime(1977, 6, 7, 2, 0, tzinfo=FixedOffset(1))}) - self.assertEqual(rset.rows, [[bob.eid]]) - - def test_constraint_validationerror(self): - with self.admin_access.repo_cnx() as cnx: - with cnx.allow_all_hooks_but('integrity'): - with self.assertRaises(ValidationError) as cm: - cnx.execute("INSERT Note N: N type 'nogood'") - self.assertEqual(cm.exception.errors, - {'type-subject': u'invalid value %(KEY-value)s, it must be one of %(KEY-choices)s'}) - self.assertEqual(cm.exception.msgargs, - {'type-subject-value': u'"nogood"', - 'type-subject-choices': u'"todo", "a", "b", "T", "lalala"'}) - - def test_statement_timeout(self): - with self.admin_access.repo_cnx() as cnx: - cnx.system_sql('select pg_sleep(0.1)') - with self.assertRaises(Exception): - cnx.system_sql('select pg_sleep(0.3)') - - -class PostgresLimitSizeTC(CubicWebTC): - configcls = PostgresApptestConfiguration - - def test(self): - with self.admin_access.repo_cnx() as cnx: - def sql(string): - return cnx.system_sql(string).fetchone()[0] - yield self.assertEqual, sql("SELECT limit_size('

hello

', 'text/html', 20)"), \ - '

hello

' - yield self.assertEqual, sql("SELECT limit_size('

hello

', 'text/html', 2)"), \ - 'he...' - yield self.assertEqual, sql("SELECT limit_size('
hello', 'text/html', 2)"), \ - 'he...' - yield self.assertEqual, sql("SELECT limit_size('hello', 'text/html', 2)"), \ - 'he...' - yield self.assertEqual, sql("SELECT limit_size('a>b', 'text/html', 2)"), \ - 'a>...' - - -if __name__ == '__main__': - from logilab.common.testlib import unittest_main - unittest_main()