cubicweb/server/test/unittest_postgres.py
changeset 11057 0b59724cb3f2
parent 11034 75d752e6daf7
child 11077 09be48c01fa4
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/cubicweb/server/test/unittest_postgres.py	Sat Jan 16 13:48:51 2016 +0100
@@ -0,0 +1,186 @@
+# copyright 2003-2015 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
+#
+# This file is part of CubicWeb.
+#
+# CubicWeb is free software: you can redistribute it and/or modify it under the
+# terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation, either version 2.1 of the License, or (at your option)
+# any later version.
+#
+# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Lesser General Public License along
+# with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
+
+from datetime import datetime
+from threading import Thread
+
+from six.moves import range
+
+from logilab.common.testlib import SkipTest
+
+import logilab.database as lgdb
+from cubicweb import ValidationError
+from cubicweb.devtools import PostgresApptestConfiguration, startpgcluster, stoppgcluster
+from cubicweb.devtools.testlib import CubicWebTC
+from cubicweb.predicates import is_instance
+from cubicweb.entities.adapters import IFTIndexableAdapter
+
+from unittest_querier import FixedOffset
+
+
+def setUpModule():
+    startpgcluster(__file__)
+
+
+def tearDownModule():
+    stoppgcluster(__file__)
+
+
+class PostgresTimeoutConfiguration(PostgresApptestConfiguration):
+    def __init__(self, *args, **kwargs):
+        self.default_sources = PostgresApptestConfiguration.default_sources.copy()
+        self.default_sources['system'] = PostgresApptestConfiguration.default_sources['system'].copy()
+        self.default_sources['system']['db-statement-timeout'] = 200
+        super(PostgresTimeoutConfiguration, self).__init__(*args, **kwargs)
+
+
+class PostgresFTITC(CubicWebTC):
+    configcls = PostgresTimeoutConfiguration
+
+    @classmethod
+    def setUpClass(cls):
+        cls.orig_connect_hooks = lgdb.SQL_CONNECT_HOOKS['postgres'][:]
+
+    @classmethod
+    def tearDownClass(cls):
+        lgdb.SQL_CONNECT_HOOKS['postgres'] = cls.orig_connect_hooks
+
+    def test_eid_range(self):
+        # concurrent allocation of eid ranges
+        source = self.session.repo.sources_by_uri['system']
+        range1 = []
+        range2 = []
+        def allocate_eid_ranges(session, target):
+            for x in range(1, 10):
+                eid = source.create_eid(session, count=x)
+                target.extend(range(eid-x, eid))
+
+        t1 = Thread(target=lambda: allocate_eid_ranges(self.session, range1))
+        t2 = Thread(target=lambda: allocate_eid_ranges(self.session, range2))
+        t1.start()
+        t2.start()
+        t1.join()
+        t2.join()
+        self.assertEqual(range1, sorted(range1))
+        self.assertEqual(range2, sorted(range2))
+        self.assertEqual(set(), set(range1) & set(range2))
+
+    def test_occurence_count(self):
+        with self.admin_access.repo_cnx() as cnx:
+            c1 = cnx.create_entity('Card', title=u'c1',
+                                   content=u'cubicweb cubicweb cubicweb')
+            c2 = cnx.create_entity('Card', title=u'c3',
+                                   content=u'cubicweb')
+            c3 = cnx.create_entity('Card', title=u'c2',
+                                   content=u'cubicweb cubicweb')
+            cnx.commit()
+            self.assertEqual(cnx.execute('Card X ORDERBY FTIRANK(X) DESC '
+                                         'WHERE X has_text "cubicweb"').rows,
+                             [[c1.eid,], [c3.eid,], [c2.eid,]])
+
+
+    def test_attr_weight(self):
+        class CardIFTIndexableAdapter(IFTIndexableAdapter):
+            __select__ = is_instance('Card')
+            attr_weight = {'title': 'A'}
+        with self.temporary_appobjects(CardIFTIndexableAdapter):
+            with self.admin_access.repo_cnx() as cnx:
+                c1 = cnx.create_entity('Card', title=u'c1',
+                                       content=u'cubicweb cubicweb cubicweb')
+                c2 = cnx.create_entity('Card', title=u'c2',
+                                       content=u'cubicweb cubicweb')
+                c3 = cnx.create_entity('Card', title=u'cubicweb',
+                                       content=u'autre chose')
+                cnx.commit()
+                self.assertEqual(cnx.execute('Card X ORDERBY FTIRANK(X) DESC '
+                                             'WHERE X has_text "cubicweb"').rows,
+                                 [[c3.eid,], [c1.eid,], [c2.eid,]])
+
+    def test_entity_weight(self):
+        class PersonneIFTIndexableAdapter(IFTIndexableAdapter):
+            __select__ = is_instance('Personne')
+            entity_weight = 2.0
+        with self.temporary_appobjects(PersonneIFTIndexableAdapter):
+            with self.admin_access.repo_cnx() as cnx:
+                c1 = cnx.create_entity('Personne', nom=u'c1', prenom=u'cubicweb')
+                c2 = cnx.create_entity('Comment', content=u'cubicweb cubicweb',
+                                       comments=c1)
+                c3 = cnx.create_entity('Comment', content=u'cubicweb cubicweb cubicweb',
+                                       comments=c1)
+                cnx.commit()
+                self.assertEqual(cnx.execute('Any X ORDERBY FTIRANK(X) DESC '
+                                             'WHERE X has_text "cubicweb"').rows,
+                                  [[c1.eid,], [c3.eid,], [c2.eid,]])
+
+    def test_tz_datetime(self):
+        with self.admin_access.repo_cnx() as cnx:
+            bob = cnx.create_entity('Personne', nom=u'bob',
+                                   tzdatenaiss=datetime(1977, 6, 7, 2, 0, tzinfo=FixedOffset(1)))
+            datenaiss = cnx.execute("Any XD WHERE X nom 'bob', X tzdatenaiss XD")[0][0]
+            self.assertIsNotNone(datenaiss.tzinfo)
+            self.assertEqual(datenaiss.utctimetuple()[:5], (1977, 6, 7, 1, 0))
+            cnx.commit()
+            cnx.create_entity('Personne', nom=u'boby',
+                              tzdatenaiss=datetime(1977, 6, 7, 2, 0))
+            datenaiss = cnx.execute("Any XD WHERE X nom 'boby', X tzdatenaiss XD")[0][0]
+            self.assertIsNotNone(datenaiss.tzinfo)
+            self.assertEqual(datenaiss.utctimetuple()[:5], (1977, 6, 7, 2, 0))
+            rset = cnx.execute("Any X WHERE X tzdatenaiss %(d)s",
+                               {'d': datetime(1977, 6, 7, 2, 0, tzinfo=FixedOffset(1))})
+            self.assertEqual(rset.rows, [[bob.eid]])
+
+    def test_constraint_validationerror(self):
+        with self.admin_access.repo_cnx() as cnx:
+            with cnx.allow_all_hooks_but('integrity'):
+                with self.assertRaises(ValidationError) as cm:
+                    cnx.execute("INSERT Note N: N type 'nogood'")
+                self.assertEqual(cm.exception.errors,
+                        {'type-subject': u'invalid value %(KEY-value)s, it must be one of %(KEY-choices)s'})
+                self.assertEqual(cm.exception.msgargs,
+                        {'type-subject-value': u'"nogood"',
+                         'type-subject-choices': u'"todo", "a", "b", "T", "lalala"'})
+
+    def test_statement_timeout(self):
+        with self.admin_access.repo_cnx() as cnx:
+            cnx.system_sql('select pg_sleep(0.1)')
+            with self.assertRaises(Exception):
+                cnx.system_sql('select pg_sleep(0.3)')
+
+
+class PostgresLimitSizeTC(CubicWebTC):
+    configcls = PostgresApptestConfiguration
+
+    def test(self):
+        with self.admin_access.repo_cnx() as cnx:
+            def sql(string):
+                return cnx.system_sql(string).fetchone()[0]
+            yield self.assertEqual, sql("SELECT limit_size('<p>hello</p>', 'text/html', 20)"), \
+                '<p>hello</p>'
+            yield self.assertEqual, sql("SELECT limit_size('<p>hello</p>', 'text/html', 2)"), \
+                'he...'
+            yield self.assertEqual, sql("SELECT limit_size('<br/>hello', 'text/html', 2)"), \
+                'he...'
+            yield self.assertEqual, sql("SELECT limit_size('<span class=\"1\">he</span>llo', 'text/html', 2)"), \
+                'he...'
+            yield self.assertEqual, sql("SELECT limit_size('<span>a>b</span>', 'text/html', 2)"), \
+                'a>...'
+
+
+if __name__ == '__main__':
+    from logilab.common.testlib import unittest_main
+    unittest_main()