[test] support for test on postgres database using the same mecanism as sqlite: one template database generated when necessary + actual test database created from the template
authorSylvain Thénault <sylvain.thenault@logilab.fr>
Wed, 16 Jun 2010 09:21:49 +0200
changeset 5754 51179e0bb250
parent 5753 cd20ddaef124
child 5762 730d458ec1bf
[test] support for test on postgres database using the same mecanism as sqlite: one template database generated when necessary + actual test database created from the template * * * [test] reset postgres database between test of the same fixture
devtools/__init__.py
server/serverctl.py
--- a/devtools/__init__.py	Tue Jun 15 18:19:44 2010 +0200
+++ b/devtools/__init__.py	Wed Jun 16 09:21:49 2010 +0200
@@ -21,6 +21,7 @@
 __docformat__ = "restructuredtext en"
 
 import os
+import sys
 import logging
 from datetime import timedelta
 from os.path import (abspath, join, exists, basename, dirname, normpath, split,
@@ -182,7 +183,8 @@
         return ('en', 'fr', 'de')
 
     def pyro_enabled(self):
-        # but export PYRO_MULTITHREAD=0 or you get problems with sqlite and threads
+        # but export PYRO_MULTITHREAD=0 or you get problems with sqlite and
+        # threads
         return True
 
 
@@ -206,8 +208,6 @@
         init_test_database_sqlite(config)
     elif driver == 'postgres':
         init_test_database_postgres(config)
-    elif driver == 'sqlserver2005':
-        init_test_database_sqlserver2005(config)
     else:
         raise ValueError('no initialization function for driver %r' % driver)
     config._cubes = None # avoid assertion error
@@ -223,10 +223,8 @@
     driver = config.sources()['system']['db-driver']
     if driver == 'sqlite':
         reset_test_database_sqlite(config)
-    elif driver in ('sqlserver2005', 'postgres'):
-        # XXX do something with dump/restore ?
-        print 'resetting the database is not done for', driver
-        print 'you should handle it manually'
+    elif driver == 'postgres':
+        init_test_database_postgres(config)
     else:
         raise ValueError('no reset function for driver %r' % driver)
 
@@ -235,11 +233,46 @@
 
 def init_test_database_postgres(config):
     """initialize a fresh postgresql databse used for testing purpose"""
-    if config.init_repository:
-        from cubicweb.server import init_repository
-        init_repository(config, interactive=False, drop=True)
+    from logilab.database import get_db_helper
+    from cubicweb.server import init_repository
+    from cubicweb.server.serverctl import (createdb, system_source_cnx,
+                                           _db_sys_cnx)
+    source = config.sources()['system']
+    dbname = source['db-name']
+    templdbname = dbname + '_template'
+    helper = get_db_helper('postgres')
+    # connect on the dbms system base to create our base
+    dbcnx = _db_sys_cnx(source, 'CREATE DATABASE and / or USER', verbose=0)
+    cursor = dbcnx.cursor()
+    try:
+        if dbname in helper.list_databases(cursor):
+            cursor.execute('DROP DATABASE %s' % dbname)
+        if not templdbname in helper.list_databases(cursor):
+            source['db-name'] = templdbname
+            createdb(helper, source, dbcnx, cursor)
+            dbcnx.commit()
+            cnx = system_source_cnx(source, special_privs='LANGUAGE C', verbose=0)
+            templcursor = cnx.cursor()
+            # XXX factorize with db-create code
+            helper.init_fti_extensions(templcursor)
+            # install plpythonu/plpgsql language if not installed by the cube
+            langs = sys.platform == 'win32' and ('plpgsql',) or ('plpythonu', 'plpgsql')
+            for extlang in langs:
+                helper.create_language(templcursor, extlang)
+            cnx.commit()
+            templcursor.close()
+            cnx.close()
+            init_repository(config, interactive=False)
+            source['db-name'] = dbname
+    except:
+        dbcnx.rollback()
+        # XXX drop template
+        raise
+    createdb(helper, source, dbcnx, cursor, template=templdbname)
+    dbcnx.commit()
+    dbcnx.close()
 
-### sqlserver2005 test database handling ############################################
+### sqlserver2005 test database handling #######################################
 
 def init_test_database_sqlserver2005(config):
     """initialize a fresh sqlserver databse used for testing purpose"""
--- a/server/serverctl.py	Tue Jun 15 18:19:44 2010 +0200
+++ b/server/serverctl.py	Wed Jun 16 09:21:49 2010 +0200
@@ -48,14 +48,16 @@
     if dbname is None:
         dbname = source['db-name']
     driver = source['db-driver']
-    print '-> connecting to %s database' % driver,
-    if dbhost:
-        print '%s@%s' % (dbname, dbhost),
-    else:
-        print dbname,
+    if verbose:
+        print '-> connecting to %s database' % driver,
+        if dbhost:
+            print '%s@%s' % (dbname, dbhost),
+        else:
+            print dbname,
     if not verbose or (not special_privs and source.get('db-user')):
         user = source['db-user']
-        print 'as', user
+        if verbose:
+            print 'as', user
         if source.get('db-password'):
             password = source['db-password']
         else:
@@ -272,6 +274,14 @@
 
 # repository specific commands ################################################
 
+def createdb(helper, source, dbcnx, cursor, **kwargs):
+    if dbcnx.logged_user != source['db-user']:
+        helper.create_database(cursor, source['db-name'], source['db-user'],
+                               source['db-encoding'], **kwargs)
+    else:
+        helper.create_database(cursor, source['db-name'],
+                               dbencoding=source['db-encoding'], **kwargs)
+
 class CreateInstanceDBCommand(Command):
     """Create the system database of an instance (run after 'create').
 
@@ -314,14 +324,13 @@
         source = config.sources()['system']
         dbname = source['db-name']
         driver = source['db-driver']
-        create_db = self.config.create_db
         helper = get_db_helper(driver)
         if driver == 'sqlite':
             if os.path.exists(dbname) and (
                 automatic or
                 ASK.confirm('Database %s already exists. Drop it?' % dbname)):
                 os.unlink(dbname)
-        elif create_db:
+        elif self.config.create_db:
             print '\n'+underline_title('Creating the system database')
             # connect on the dbms system base to create our base
             dbcnx = _db_sys_cnx(source, 'CREATE DATABASE and / or USER', verbose=verbose)
@@ -338,12 +347,7 @@
                         cursor.execute('DROP DATABASE %s' % dbname)
                     else:
                         return
-                if dbcnx.logged_user != source['db-user']:
-                    helper.create_database(cursor, dbname, source['db-user'],
-                                           source['db-encoding'])
-                else:
-                    helper.create_database(cursor, dbname,
-                                           dbencoding=source['db-encoding'])
+                createdb(helper, source, dbcnx, cursor)
                 dbcnx.commit()
                 print '-> database %s created.' % dbname
             except: