server/test/unittest_postgres.py
changeset 11057 0b59724cb3f2
parent 11052 058bb3dc685f
child 11058 23eb30449fe5
equal deleted inserted replaced
11052:058bb3dc685f 11057:0b59724cb3f2
     1 # copyright 2003-2015 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
       
     2 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
       
     3 #
       
     4 # This file is part of CubicWeb.
       
     5 #
       
     6 # CubicWeb is free software: you can redistribute it and/or modify it under the
       
     7 # terms of the GNU Lesser General Public License as published by the Free
       
     8 # Software Foundation, either version 2.1 of the License, or (at your option)
       
     9 # any later version.
       
    10 #
       
    11 # CubicWeb is distributed in the hope that it will be useful, but WITHOUT
       
    12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
       
    13 # FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
       
    14 # details.
       
    15 #
       
    16 # You should have received a copy of the GNU Lesser General Public License along
       
    17 # with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
       
    18 
       
    19 from datetime import datetime
       
    20 from threading import Thread
       
    21 
       
    22 from six.moves import range
       
    23 
       
    24 from logilab.common.testlib import SkipTest
       
    25 
       
    26 import logilab.database as lgdb
       
    27 from cubicweb import ValidationError
       
    28 from cubicweb.devtools import PostgresApptestConfiguration, startpgcluster, stoppgcluster
       
    29 from cubicweb.devtools.testlib import CubicWebTC
       
    30 from cubicweb.predicates import is_instance
       
    31 from cubicweb.entities.adapters import IFTIndexableAdapter
       
    32 
       
    33 from unittest_querier import FixedOffset
       
    34 
       
    35 
       
    36 def setUpModule():
       
    37     startpgcluster(__file__)
       
    38 
       
    39 
       
    40 def tearDownModule():
       
    41     stoppgcluster(__file__)
       
    42 
       
    43 
       
    44 class PostgresTimeoutConfiguration(PostgresApptestConfiguration):
       
    45     def __init__(self, *args, **kwargs):
       
    46         self.default_sources = PostgresApptestConfiguration.default_sources.copy()
       
    47         self.default_sources['system'] = PostgresApptestConfiguration.default_sources['system'].copy()
       
    48         self.default_sources['system']['db-statement-timeout'] = 200
       
    49         super(PostgresTimeoutConfiguration, self).__init__(*args, **kwargs)
       
    50 
       
    51 
       
    52 class PostgresFTITC(CubicWebTC):
       
    53     configcls = PostgresTimeoutConfiguration
       
    54 
       
    55     @classmethod
       
    56     def setUpClass(cls):
       
    57         cls.orig_connect_hooks = lgdb.SQL_CONNECT_HOOKS['postgres'][:]
       
    58 
       
    59     @classmethod
       
    60     def tearDownClass(cls):
       
    61         lgdb.SQL_CONNECT_HOOKS['postgres'] = cls.orig_connect_hooks
       
    62 
       
    63     def test_eid_range(self):
       
    64         # concurrent allocation of eid ranges
       
    65         source = self.session.repo.sources_by_uri['system']
       
    66         range1 = []
       
    67         range2 = []
       
    68         def allocate_eid_ranges(session, target):
       
    69             for x in range(1, 10):
       
    70                 eid = source.create_eid(session, count=x)
       
    71                 target.extend(range(eid-x, eid))
       
    72 
       
    73         t1 = Thread(target=lambda: allocate_eid_ranges(self.session, range1))
       
    74         t2 = Thread(target=lambda: allocate_eid_ranges(self.session, range2))
       
    75         t1.start()
       
    76         t2.start()
       
    77         t1.join()
       
    78         t2.join()
       
    79         self.assertEqual(range1, sorted(range1))
       
    80         self.assertEqual(range2, sorted(range2))
       
    81         self.assertEqual(set(), set(range1) & set(range2))
       
    82 
       
    83     def test_occurence_count(self):
       
    84         with self.admin_access.repo_cnx() as cnx:
       
    85             c1 = cnx.create_entity('Card', title=u'c1',
       
    86                                    content=u'cubicweb cubicweb cubicweb')
       
    87             c2 = cnx.create_entity('Card', title=u'c3',
       
    88                                    content=u'cubicweb')
       
    89             c3 = cnx.create_entity('Card', title=u'c2',
       
    90                                    content=u'cubicweb cubicweb')
       
    91             cnx.commit()
       
    92             self.assertEqual(cnx.execute('Card X ORDERBY FTIRANK(X) DESC '
       
    93                                          'WHERE X has_text "cubicweb"').rows,
       
    94                              [[c1.eid,], [c3.eid,], [c2.eid,]])
       
    95 
       
    96 
       
    97     def test_attr_weight(self):
       
    98         class CardIFTIndexableAdapter(IFTIndexableAdapter):
       
    99             __select__ = is_instance('Card')
       
   100             attr_weight = {'title': 'A'}
       
   101         with self.temporary_appobjects(CardIFTIndexableAdapter):
       
   102             with self.admin_access.repo_cnx() as cnx:
       
   103                 c1 = cnx.create_entity('Card', title=u'c1',
       
   104                                        content=u'cubicweb cubicweb cubicweb')
       
   105                 c2 = cnx.create_entity('Card', title=u'c2',
       
   106                                        content=u'cubicweb cubicweb')
       
   107                 c3 = cnx.create_entity('Card', title=u'cubicweb',
       
   108                                        content=u'autre chose')
       
   109                 cnx.commit()
       
   110                 self.assertEqual(cnx.execute('Card X ORDERBY FTIRANK(X) DESC '
       
   111                                              'WHERE X has_text "cubicweb"').rows,
       
   112                                  [[c3.eid,], [c1.eid,], [c2.eid,]])
       
   113 
       
   114     def test_entity_weight(self):
       
   115         class PersonneIFTIndexableAdapter(IFTIndexableAdapter):
       
   116             __select__ = is_instance('Personne')
       
   117             entity_weight = 2.0
       
   118         with self.temporary_appobjects(PersonneIFTIndexableAdapter):
       
   119             with self.admin_access.repo_cnx() as cnx:
       
   120                 c1 = cnx.create_entity('Personne', nom=u'c1', prenom=u'cubicweb')
       
   121                 c2 = cnx.create_entity('Comment', content=u'cubicweb cubicweb',
       
   122                                        comments=c1)
       
   123                 c3 = cnx.create_entity('Comment', content=u'cubicweb cubicweb cubicweb',
       
   124                                        comments=c1)
       
   125                 cnx.commit()
       
   126                 self.assertEqual(cnx.execute('Any X ORDERBY FTIRANK(X) DESC '
       
   127                                              'WHERE X has_text "cubicweb"').rows,
       
   128                                   [[c1.eid,], [c3.eid,], [c2.eid,]])
       
   129 
       
   130     def test_tz_datetime(self):
       
   131         with self.admin_access.repo_cnx() as cnx:
       
   132             bob = cnx.create_entity('Personne', nom=u'bob',
       
   133                                    tzdatenaiss=datetime(1977, 6, 7, 2, 0, tzinfo=FixedOffset(1)))
       
   134             datenaiss = cnx.execute("Any XD WHERE X nom 'bob', X tzdatenaiss XD")[0][0]
       
   135             self.assertIsNotNone(datenaiss.tzinfo)
       
   136             self.assertEqual(datenaiss.utctimetuple()[:5], (1977, 6, 7, 1, 0))
       
   137             cnx.commit()
       
   138             cnx.create_entity('Personne', nom=u'boby',
       
   139                               tzdatenaiss=datetime(1977, 6, 7, 2, 0))
       
   140             datenaiss = cnx.execute("Any XD WHERE X nom 'boby', X tzdatenaiss XD")[0][0]
       
   141             self.assertIsNotNone(datenaiss.tzinfo)
       
   142             self.assertEqual(datenaiss.utctimetuple()[:5], (1977, 6, 7, 2, 0))
       
   143             rset = cnx.execute("Any X WHERE X tzdatenaiss %(d)s",
       
   144                                {'d': datetime(1977, 6, 7, 2, 0, tzinfo=FixedOffset(1))})
       
   145             self.assertEqual(rset.rows, [[bob.eid]])
       
   146 
       
   147     def test_constraint_validationerror(self):
       
   148         with self.admin_access.repo_cnx() as cnx:
       
   149             with cnx.allow_all_hooks_but('integrity'):
       
   150                 with self.assertRaises(ValidationError) as cm:
       
   151                     cnx.execute("INSERT Note N: N type 'nogood'")
       
   152                 self.assertEqual(cm.exception.errors,
       
   153                         {'type-subject': u'invalid value %(KEY-value)s, it must be one of %(KEY-choices)s'})
       
   154                 self.assertEqual(cm.exception.msgargs,
       
   155                         {'type-subject-value': u'"nogood"',
       
   156                          'type-subject-choices': u'"todo", "a", "b", "T", "lalala"'})
       
   157 
       
   158     def test_statement_timeout(self):
       
   159         with self.admin_access.repo_cnx() as cnx:
       
   160             cnx.system_sql('select pg_sleep(0.1)')
       
   161             with self.assertRaises(Exception):
       
   162                 cnx.system_sql('select pg_sleep(0.3)')
       
   163 
       
   164 
       
   165 class PostgresLimitSizeTC(CubicWebTC):
       
   166     configcls = PostgresApptestConfiguration
       
   167 
       
   168     def test(self):
       
   169         with self.admin_access.repo_cnx() as cnx:
       
   170             def sql(string):
       
   171                 return cnx.system_sql(string).fetchone()[0]
       
   172             yield self.assertEqual, sql("SELECT limit_size('<p>hello</p>', 'text/html', 20)"), \
       
   173                 '<p>hello</p>'
       
   174             yield self.assertEqual, sql("SELECT limit_size('<p>hello</p>', 'text/html', 2)"), \
       
   175                 'he...'
       
   176             yield self.assertEqual, sql("SELECT limit_size('<br/>hello', 'text/html', 2)"), \
       
   177                 'he...'
       
   178             yield self.assertEqual, sql("SELECT limit_size('<span class=\"1\">he</span>llo', 'text/html', 2)"), \
       
   179                 'he...'
       
   180             yield self.assertEqual, sql("SELECT limit_size('<span>a>b</span>', 'text/html', 2)"), \
       
   181                 'a>...'
       
   182 
       
   183 
       
   184 if __name__ == '__main__':
       
   185     from logilab.common.testlib import unittest_main
       
   186     unittest_main()