server/test/unittest_postgres.py
changeset 11057 0b59724cb3f2
parent 11052 058bb3dc685f
child 11058 23eb30449fe5
--- a/server/test/unittest_postgres.py	Mon Jan 04 18:40:30 2016 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,186 +0,0 @@
-# copyright 2003-2015 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
-# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
-#
-# This file is part of CubicWeb.
-#
-# CubicWeb is free software: you can redistribute it and/or modify it under the
-# terms of the GNU Lesser General Public License as published by the Free
-# Software Foundation, either version 2.1 of the License, or (at your option)
-# any later version.
-#
-# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
-# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
-# FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
-# details.
-#
-# You should have received a copy of the GNU Lesser General Public License along
-# with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
-
-from datetime import datetime
-from threading import Thread
-
-from six.moves import range
-
-from logilab.common.testlib import SkipTest
-
-import logilab.database as lgdb
-from cubicweb import ValidationError
-from cubicweb.devtools import PostgresApptestConfiguration, startpgcluster, stoppgcluster
-from cubicweb.devtools.testlib import CubicWebTC
-from cubicweb.predicates import is_instance
-from cubicweb.entities.adapters import IFTIndexableAdapter
-
-from unittest_querier import FixedOffset
-
-
-def setUpModule():
-    startpgcluster(__file__)
-
-
-def tearDownModule():
-    stoppgcluster(__file__)
-
-
-class PostgresTimeoutConfiguration(PostgresApptestConfiguration):
-    def __init__(self, *args, **kwargs):
-        self.default_sources = PostgresApptestConfiguration.default_sources.copy()
-        self.default_sources['system'] = PostgresApptestConfiguration.default_sources['system'].copy()
-        self.default_sources['system']['db-statement-timeout'] = 200
-        super(PostgresTimeoutConfiguration, self).__init__(*args, **kwargs)
-
-
-class PostgresFTITC(CubicWebTC):
-    configcls = PostgresTimeoutConfiguration
-
-    @classmethod
-    def setUpClass(cls):
-        cls.orig_connect_hooks = lgdb.SQL_CONNECT_HOOKS['postgres'][:]
-
-    @classmethod
-    def tearDownClass(cls):
-        lgdb.SQL_CONNECT_HOOKS['postgres'] = cls.orig_connect_hooks
-
-    def test_eid_range(self):
-        # concurrent allocation of eid ranges
-        source = self.session.repo.sources_by_uri['system']
-        range1 = []
-        range2 = []
-        def allocate_eid_ranges(session, target):
-            for x in range(1, 10):
-                eid = source.create_eid(session, count=x)
-                target.extend(range(eid-x, eid))
-
-        t1 = Thread(target=lambda: allocate_eid_ranges(self.session, range1))
-        t2 = Thread(target=lambda: allocate_eid_ranges(self.session, range2))
-        t1.start()
-        t2.start()
-        t1.join()
-        t2.join()
-        self.assertEqual(range1, sorted(range1))
-        self.assertEqual(range2, sorted(range2))
-        self.assertEqual(set(), set(range1) & set(range2))
-
-    def test_occurence_count(self):
-        with self.admin_access.repo_cnx() as cnx:
-            c1 = cnx.create_entity('Card', title=u'c1',
-                                   content=u'cubicweb cubicweb cubicweb')
-            c2 = cnx.create_entity('Card', title=u'c3',
-                                   content=u'cubicweb')
-            c3 = cnx.create_entity('Card', title=u'c2',
-                                   content=u'cubicweb cubicweb')
-            cnx.commit()
-            self.assertEqual(cnx.execute('Card X ORDERBY FTIRANK(X) DESC '
-                                         'WHERE X has_text "cubicweb"').rows,
-                             [[c1.eid,], [c3.eid,], [c2.eid,]])
-
-
-    def test_attr_weight(self):
-        class CardIFTIndexableAdapter(IFTIndexableAdapter):
-            __select__ = is_instance('Card')
-            attr_weight = {'title': 'A'}
-        with self.temporary_appobjects(CardIFTIndexableAdapter):
-            with self.admin_access.repo_cnx() as cnx:
-                c1 = cnx.create_entity('Card', title=u'c1',
-                                       content=u'cubicweb cubicweb cubicweb')
-                c2 = cnx.create_entity('Card', title=u'c2',
-                                       content=u'cubicweb cubicweb')
-                c3 = cnx.create_entity('Card', title=u'cubicweb',
-                                       content=u'autre chose')
-                cnx.commit()
-                self.assertEqual(cnx.execute('Card X ORDERBY FTIRANK(X) DESC '
-                                             'WHERE X has_text "cubicweb"').rows,
-                                 [[c3.eid,], [c1.eid,], [c2.eid,]])
-
-    def test_entity_weight(self):
-        class PersonneIFTIndexableAdapter(IFTIndexableAdapter):
-            __select__ = is_instance('Personne')
-            entity_weight = 2.0
-        with self.temporary_appobjects(PersonneIFTIndexableAdapter):
-            with self.admin_access.repo_cnx() as cnx:
-                c1 = cnx.create_entity('Personne', nom=u'c1', prenom=u'cubicweb')
-                c2 = cnx.create_entity('Comment', content=u'cubicweb cubicweb',
-                                       comments=c1)
-                c3 = cnx.create_entity('Comment', content=u'cubicweb cubicweb cubicweb',
-                                       comments=c1)
-                cnx.commit()
-                self.assertEqual(cnx.execute('Any X ORDERBY FTIRANK(X) DESC '
-                                             'WHERE X has_text "cubicweb"').rows,
-                                  [[c1.eid,], [c3.eid,], [c2.eid,]])
-
-    def test_tz_datetime(self):
-        with self.admin_access.repo_cnx() as cnx:
-            bob = cnx.create_entity('Personne', nom=u'bob',
-                                   tzdatenaiss=datetime(1977, 6, 7, 2, 0, tzinfo=FixedOffset(1)))
-            datenaiss = cnx.execute("Any XD WHERE X nom 'bob', X tzdatenaiss XD")[0][0]
-            self.assertIsNotNone(datenaiss.tzinfo)
-            self.assertEqual(datenaiss.utctimetuple()[:5], (1977, 6, 7, 1, 0))
-            cnx.commit()
-            cnx.create_entity('Personne', nom=u'boby',
-                              tzdatenaiss=datetime(1977, 6, 7, 2, 0))
-            datenaiss = cnx.execute("Any XD WHERE X nom 'boby', X tzdatenaiss XD")[0][0]
-            self.assertIsNotNone(datenaiss.tzinfo)
-            self.assertEqual(datenaiss.utctimetuple()[:5], (1977, 6, 7, 2, 0))
-            rset = cnx.execute("Any X WHERE X tzdatenaiss %(d)s",
-                               {'d': datetime(1977, 6, 7, 2, 0, tzinfo=FixedOffset(1))})
-            self.assertEqual(rset.rows, [[bob.eid]])
-
-    def test_constraint_validationerror(self):
-        with self.admin_access.repo_cnx() as cnx:
-            with cnx.allow_all_hooks_but('integrity'):
-                with self.assertRaises(ValidationError) as cm:
-                    cnx.execute("INSERT Note N: N type 'nogood'")
-                self.assertEqual(cm.exception.errors,
-                        {'type-subject': u'invalid value %(KEY-value)s, it must be one of %(KEY-choices)s'})
-                self.assertEqual(cm.exception.msgargs,
-                        {'type-subject-value': u'"nogood"',
-                         'type-subject-choices': u'"todo", "a", "b", "T", "lalala"'})
-
-    def test_statement_timeout(self):
-        with self.admin_access.repo_cnx() as cnx:
-            cnx.system_sql('select pg_sleep(0.1)')
-            with self.assertRaises(Exception):
-                cnx.system_sql('select pg_sleep(0.3)')
-
-
-class PostgresLimitSizeTC(CubicWebTC):
-    configcls = PostgresApptestConfiguration
-
-    def test(self):
-        with self.admin_access.repo_cnx() as cnx:
-            def sql(string):
-                return cnx.system_sql(string).fetchone()[0]
-            yield self.assertEqual, sql("SELECT limit_size('<p>hello</p>', 'text/html', 20)"), \
-                '<p>hello</p>'
-            yield self.assertEqual, sql("SELECT limit_size('<p>hello</p>', 'text/html', 2)"), \
-                'he...'
-            yield self.assertEqual, sql("SELECT limit_size('<br/>hello', 'text/html', 2)"), \
-                'he...'
-            yield self.assertEqual, sql("SELECT limit_size('<span class=\"1\">he</span>llo', 'text/html', 2)"), \
-                'he...'
-            yield self.assertEqual, sql("SELECT limit_size('<span>a>b</span>', 'text/html', 2)"), \
-                'a>...'
-
-
-if __name__ == '__main__':
-    from logilab.common.testlib import unittest_main
-    unittest_main()