adds support for a portable db import/export format (closes: #1521112)
the db-dump, db-restore and db-copy cwctl commands are enhanced with a --format
option (defaults to 'native'). If the format is 'portable', the system database
dump/restore is performed using pure SQL queries, and the data is stored as
pickled Python objects in a zip archive.
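
For instance, with a hypothetical instance named 'myapp':

    cubicweb-ctl db-dump --format portable myapp
    cubicweb-ctl db-restore --format portable myapp <backupfile>

On restore, the format recorded in the dump (format.txt) takes precedence, so a
portable dump is recognized even without --format; passing the option is still
needed for db-restore to create the target database beforehand.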
--- a/server/migractions.py Thu Apr 21 16:33:55 2011 +0200
+++ b/server/migractions.py Thu Apr 21 12:35:41 2011 +0200
@@ -162,7 +162,7 @@
# server specific migration methods ########################################
- def backup_database(self, backupfile=None, askconfirm=True):
+ def backup_database(self, backupfile=None, askconfirm=True, format='native'):
config = self.config
repo = self.repo_connect()
# paths
@@ -185,16 +185,24 @@
# backup
tmpdir = tempfile.mkdtemp()
try:
+ failed = False
for source in repo.sources:
try:
- source.backup(osp.join(tmpdir, source.uri), self.confirm)
+ source.backup(osp.join(tmpdir, source.uri), self.confirm, format=format)
except Exception, ex:
print '-> error trying to backup %s [%s]' % (source.uri, ex)
if not self.confirm('Continue anyway?', default='n'):
raise SystemExit(1)
else:
- break
- else:
+ failed = True
+ with open(osp.join(tmpdir, 'format.txt'), 'w') as format_file:
+ format_file.write('%s\n' % format)
+ with open(osp.join(tmpdir, 'versions.txt'), 'w') as version_file:
+ versions = repo.get_versions()
+ for cube, version in versions.iteritems():
+ version_file.write('%s %s\n' % (cube, version))
+
+ if not failed:
bkup = tarfile.open(backupfile, 'w|gz')
for filename in os.listdir(tmpdir):
bkup.add(osp.join(tmpdir, filename), filename)
@@ -207,7 +215,7 @@
shutil.rmtree(tmpdir)
def restore_database(self, backupfile, drop=True, systemonly=True,
- askconfirm=True):
+ askconfirm=True, format='native'):
# check
if not osp.exists(backupfile):
raise ExecutionError("Backup file %s doesn't exist" % backupfile)
@@ -229,13 +237,18 @@
bkup = tarfile.open(backupfile, 'r|gz')
bkup.extractall(path=tmpdir)
bkup.close()
+ if osp.isfile(osp.join(tmpdir, 'format.txt')):
+ with open(osp.join(tmpdir, 'format.txt')) as format_file:
+ written_format = format_file.readline().strip()
+ if written_format in ('portable', 'native'):
+ format = written_format
self.config.open_connections_pools = False
repo = self.repo_connect()
for source in repo.sources:
if systemonly and source.uri != 'system':
continue
try:
- source.restore(osp.join(tmpdir, source.uri), self.confirm, drop)
+ source.restore(osp.join(tmpdir, source.uri), self.confirm, drop, format)
except Exception, exc:
print '-> error trying to restore %s [%s]' % (source.uri, exc)
if not self.confirm('Continue anyway?', default='n'):
--- a/server/serverctl.py Thu Apr 21 16:33:55 2011 +0200
+++ b/server/serverctl.py Thu Apr 21 12:35:41 2011 +0200
@@ -691,19 +691,20 @@
'Continue anyway?' % filename):
raise ExecutionError('Error while deleting remote dump at /tmp/%s' % filename)
-def _local_dump(appid, output):
+
+def _local_dump(appid, output, format='native'):
config = ServerConfiguration.config_for(appid)
config.quick_start = True
mih = config.migration_handler(connect=False, verbosity=1)
- mih.backup_database(output, askconfirm=False)
+ mih.backup_database(output, askconfirm=False, format=format)
mih.shutdown()
-def _local_restore(appid, backupfile, drop, systemonly=True):
+def _local_restore(appid, backupfile, drop, systemonly=True, format='native'):
config = ServerConfiguration.config_for(appid)
config.verbosity = 1 # else we won't be asked for confirmation on problems
config.quick_start = True
mih = config.migration_handler(connect=False, verbosity=1)
- mih.restore_database(backupfile, drop, systemonly, askconfirm=False)
+ mih.restore_database(backupfile, drop, systemonly, askconfirm=False, format=format)
repo = mih.repo_connect()
# version of the database
dbversions = repo.get_versions()
@@ -777,6 +778,12 @@
'default' : False,
'help': 'Use sudo on the remote host.'}
),
+ ('format',
+ {'short': 'f', 'default': 'native', 'type': 'choice',
+ 'choices': ('native', 'portable'),
+ 'help': '"native" format uses db backend utilities to dump the database. '
+ '"portable" format uses a database independent format'}
+ ),
)
def run(self, args):
@@ -785,7 +792,9 @@
host, appid = appid.split(':')
_remote_dump(host, appid, self.config.output, self.config.sudo)
else:
- _local_dump(appid, self.config.output)
+ _local_dump(appid, self.config.output, format=self.config.format)
+
+
class DBRestoreCommand(Command):
@@ -811,13 +820,33 @@
'instance data. In that case, <backupfile> is expected to be the '
'timestamp of the backup to restore, not a file'}
),
+ ('format',
+ {'short': 'f', 'default': 'native', 'type': 'choice',
+ 'choices': ('native', 'portable'),
+ 'help': 'the format used when dumping the database'}),
)
def run(self, args):
appid, backupfile = args
+ if self.config.format == 'portable':
+ # we need to ensure a DB exists before restoring from the portable format
+ if not self.config.no_drop:
+ try:
+ CWCTL.run(['db-create', '--automatic', appid])
+ except SystemExit, exc:
+ # continue if the command exited with status 0 (success)
+ if exc.code:
+ raise
_local_restore(appid, backupfile,
drop=not self.config.no_drop,
- systemonly=not self.config.restore_all)
+ systemonly=not self.config.restore_all,
+ format=self.config.format)
+ if self.config.format == 'portable':
+ try:
+ CWCTL.run(['db-rebuild-fti', appid])
+ except SystemExit, exc:
+ if exc.code:
+ raise
class DBCopyCommand(Command):
@@ -850,6 +879,12 @@
'default' : False,
'help': 'Use sudo on the remote host.'}
),
+ ('format',
+ {'short': 'f', 'default': 'native', 'type': 'choice',
+ 'choices': ('native', 'portable'),
+ 'help': '"native" format uses db backend utilities to dump the database. '
+ '"portable" format uses a database independent format'}
+ ),
)
def run(self, args):
@@ -861,8 +896,9 @@
host, srcappid = srcappid.split(':')
_remote_dump(host, srcappid, output, self.config.sudo)
else:
- _local_dump(srcappid, output)
- _local_restore(destappid, output, not self.config.no_drop)
+ _local_dump(srcappid, output, format=self.config.format)
+ _local_restore(destappid, output, not self.config.no_drop,
+ self.config.format)
if self.config.keep_dump:
print '-> you can get the dump file at', output
else:
--- a/server/sources/__init__.py Thu Apr 21 16:33:55 2011 +0200
+++ b/server/sources/__init__.py Thu Apr 21 12:35:41 2011 +0200
@@ -139,11 +139,11 @@
return -1
return cmp(self.uri, other.uri)
- def backup(self, backupfile, confirm):
+ def backup(self, backupfile, confirm, format='native'):
"""method called to create a backup of source's data"""
pass
- def restore(self, backupfile, confirm, drop):
+ def restore(self, backupfile, confirm, drop, format='native'):
"""method called to restore a backup of source's data"""
pass
--- a/server/sources/native.py Thu Apr 21 16:33:55 2011 +0200
+++ b/server/sources/native.py Thu Apr 21 12:35:41 2011 +0200
@@ -28,21 +28,29 @@
__docformat__ = "restructuredtext en"
-from pickle import loads, dumps
+try:
+ from cPickle import loads, dumps
+ import cPickle as pickle
+except ImportError:
+ from pickle import loads, dumps
+ import pickle
from threading import Lock
from datetime import datetime
from base64 import b64decode, b64encode
from contextlib import contextmanager
-from os.path import abspath
+from os.path import abspath, basename
import re
import itertools
+import zipfile
+import logging
+import sys
from logilab.common.compat import any
from logilab.common.cache import Cache
from logilab.common.decorators import cached, clear_cache
from logilab.common.configuration import Method
from logilab.common.shellutils import getlogin
-from logilab.database import get_db_helper
+from logilab.database import get_db_helper, sqlgen
from yams import schema2sql as y2sql
from yams.schema import role_name
@@ -354,24 +362,44 @@
_pool.pool_reset()
self.repo._free_pool(_pool)
- def backup(self, backupfile, confirm):
+ def backup(self, backupfile, confirm, format='native'):
"""method called to create a backup of the source's data"""
- self.close_pool_connections()
- try:
- self.backup_to_file(backupfile, confirm)
- finally:
- self.open_pool_connections()
+ if format == 'portable':
+ self.repo.fill_schema()
+ self.set_schema(self.repo.schema)
+ helper = DatabaseIndependentBackupRestore(self)
+ self.close_pool_connections()
+ try:
+ helper.backup(backupfile)
+ finally:
+ self.open_pool_connections()
+ elif format == 'native':
+ self.close_pool_connections()
+ try:
+ self.backup_to_file(backupfile, confirm)
+ finally:
+ self.open_pool_connections()
+ else:
+ raise ValueError('Unknown format %r' % format)
- def restore(self, backupfile, confirm, drop):
+
+ def restore(self, backupfile, confirm, drop, format='native'):
"""method called to restore a backup of source's data"""
if self.repo.config.open_connections_pools:
self.close_pool_connections()
try:
- self.restore_from_file(backupfile, confirm, drop=drop)
+ if format == 'portable':
+ helper = DatabaseIndependentBackupRestore(self)
+ helper.restore(backupfile)
+ elif format == 'native':
+ self.restore_from_file(backupfile, confirm, drop=drop)
+ else:
+ raise ValueError('Unknown format %r' % format)
finally:
if self.repo.config.open_connections_pools:
self.open_pool_connections()
+
def init(self, activated, source_entity):
self.init_creating(source_entity._cw.pool)
@@ -1564,3 +1592,218 @@
login = rset.rows[0][0]
authinfo['email_auth'] = True
return self.source.repo.check_auth_info(session, login, authinfo)
+
+class DatabaseIndependentBackupRestore(object):
+ """Helper class to perform db backend agnostic backup and restore
+
+ The backup and restore methods are used to dump / restore the
+ system database in a database independent format. The file is a
+ Zip archive containing the following files:
+
+ * format.txt: the format of the archive. Currently '1.0'
+ * tables.txt: list of filenames in the archive tables/ directory
+ * sequences.txt: list of filenames in the archive sequences/ directory
+ * versions.txt: the list of cube versions from CWProperty
+ * tables/<tablename>.<chunkno>: pickled data
+ * sequences/<sequencename>: pickled data
+
+ The pickled data format for tables and sequences is a tuple of 3 elements:
+ * the table name
+ * a tuple of column names
+ * a list of rows (as tuples with one element per column)
+
+ Tables are saved in chunks in separate files in order to avoid
+ excessive memory consumption.
+ """
+ def __init__(self, source):
+ """
+ :param source: an instance of the system source
+ """
+ self._source = source
+ self.logger = logging.getLogger('cubicweb.ctl')
+ self.logger.setLevel(logging.INFO)
+ self.logger.addHandler(logging.StreamHandler(sys.stdout))
+ self.schema = self._source.schema
+ self.dbhelper = self._source.dbhelper
+ self.cnx = None
+ self.cursor = None
+ self.sql_generator = sqlgen.SQLGenerator()
+
+ def get_connection(self):
+ return self._source.get_connection()
+
+ def backup(self, backupfile):
+ archive = zipfile.ZipFile(backupfile, 'w')
+ self.cnx = self.get_connection()
+ try:
+ self.cursor = self.cnx.cursor()
+ self.cursor.arraysize = 100
+ self.logger.info('writing metadata')
+ self.write_metadata(archive)
+ for seq in self.get_sequences():
+ self.logger.info('processing sequence %s', seq)
+ self.write_sequence(archive, seq)
+ for table in self.get_tables():
+ self.logger.info('processing table %s', table)
+ self.write_table(archive, table)
+ finally:
+ archive.close()
+ self.cnx.close()
+ self.logger.info('done')
+
+ def get_tables(self):
+ non_entity_tables = ['entities',
+ 'deleted_entities',
+ 'transactions',
+ 'tx_entity_actions',
+ 'tx_relation_actions',
+ ]
+ etype_tables = []
+ relation_tables = []
+ prefix = 'cw_'
+ for etype in self.schema.entities():
+ eschema = self.schema.eschema(etype)
+ if eschema.final:
+ continue
+ etype_tables.append('%s%s' % (prefix, etype))
+ for rtype in self.schema.relations():
+ rschema = self.schema.rschema(rtype)
+ if rschema.final or rschema.inlined:
+ continue
+ relation_tables.append('%s_relation' % rtype)
+ return non_entity_tables + etype_tables + relation_tables
+
+ def get_sequences(self):
+ return ['entities_id_seq']
+
+ def write_metadata(self, archive):
+ archive.writestr('format.txt', '1.0')
+ archive.writestr('tables.txt', '\n'.join(self.get_tables()))
+ archive.writestr('sequences.txt', '\n'.join(self.get_sequences()))
+ versions = self._get_versions()
+ versions_str = '\n'.join('%s %s' % (k, v)
+ for k, v in versions)
+ archive.writestr('versions.txt', versions_str)
+
+ def write_sequence(self, archive, seq):
+ sql = self.dbhelper.sql_sequence_current_state(seq)
+ columns, rows_iterator = self._get_cols_and_rows(sql)
+ rows = list(rows_iterator)
+ serialized = self._serialize(seq, columns, rows)
+ archive.writestr('sequences/%s' % seq, serialized)
+
+ def write_table(self, archive, table):
+ sql = 'SELECT * FROM %s' % table
+ columns, rows_iterator = self._get_cols_and_rows(sql)
+ self.logger.info('number of rows: %d', self.cursor.rowcount)
+ if table.startswith('cw_'): # entities
+ blocksize = 2000
+ else: # relations and metadata
+ blocksize = 10000
+ if self.cursor.rowcount > 0:
+ for i, start in enumerate(xrange(0, self.cursor.rowcount, blocksize)):
+ rows = list(itertools.islice(rows_iterator, blocksize))
+ serialized = self._serialize(table, columns, rows)
+ archive.writestr('tables/%s.%04d' % (table, i), serialized)
+ self.logger.debug('wrote rows %d to %d (out of %d) to %s.%04d',
+ start, start+len(rows)-1,
+ self.cursor.rowcount,
+ table, i)
+ else:
+ rows = []
+ serialized = self._serialize(table, columns, rows)
+ archive.writestr('tables/%s.%04d' % (table, 0), serialized)
+
+ def _get_cols_and_rows(self, sql):
+ process_result = self._source.iter_process_result
+ self.cursor.execute(sql)
+ columns = (d[0] for d in self.cursor.description)
+ rows = process_result(self.cursor)
+ return tuple(columns), rows
+
+ def _serialize(self, name, columns, rows):
+ return dumps((name, columns, rows), pickle.HIGHEST_PROTOCOL)
+
+ def restore(self, backupfile):
+ archive = zipfile.ZipFile(backupfile, 'r')
+ self.cnx = self.get_connection()
+ self.cursor = self.cnx.cursor()
+ sequences, tables, table_chunks = self.read_metadata(archive, backupfile)
+ for seq in sequences:
+ self.logger.info('restoring sequence %s', seq)
+ self.read_sequence(archive, seq)
+ for table in tables:
+ self.logger.info('restoring table %s', table)
+ self.read_table(archive, table, sorted(table_chunks[table]))
+ self.cnx.close()
+ archive.close()
+ self.logger.info('done')
+
+ def read_metadata(self, archive, backupfile):
+ formatinfo = archive.read('format.txt')
+ self.logger.info('checking metadata')
+ if formatinfo.strip() != "1.0":
+ self.logger.critical('Unsupported format in archive: %s', formatinfo)
+ raise ValueError('Unknown format in %s: %s' % (backupfile, formatinfo))
+ tables = archive.read('tables.txt').splitlines()
+ sequences = archive.read('sequences.txt').splitlines()
+ file_versions = self._parse_versions(archive.read('versions.txt'))
+ versions = set(self._get_versions())
+ if file_versions != versions:
+ self.logger.critical('Unable to restore: versions do not match')
+ expected = '\n'.join('%s %s' % (k, v) for k, v in sorted(versions))
+ found = '\n'.join('%s %s' % (k, v) for k, v in sorted(file_versions))
+ self.logger.critical('Expected:\n%s', expected)
+ self.logger.critical('Found:\n%s', found)
+ raise ValueError('Unable to restore: versions do not match')
+ table_chunks = {}
+ for name in archive.namelist():
+ if not name.startswith('tables/'):
+ continue
+ filename = basename(name)
+ tablename, _ext = filename.rsplit('.', 1)
+ table_chunks.setdefault(tablename, []).append(name)
+ return sequences, tables, table_chunks
+
+ def read_sequence(self, archive, seq):
+ seqname, columns, rows = loads(archive.read('sequences/%s' % seq))
+ assert seqname == seq
+ assert len(rows) == 1
+ assert len(rows[0]) == 1
+ value = rows[0][0]
+ sql = self.dbhelper.sql_restart_sequence(seq, value)
+ self.cursor.execute(sql)
+ self.cnx.commit()
+
+ def read_table(self, archive, table, filenames):
+ merge_args = self._source.merge_args
+ self.cursor.execute('DELETE FROM %s' % table)
+ self.cnx.commit()
+ row_count = 0
+ for filename in filenames:
+ tablename, columns, rows = loads(archive.read(filename))
+ assert tablename == table
+ if not rows:
+ continue
+ insert = self.sql_generator.insert(table,
+ dict(zip(columns, rows[0])))
+ for row in rows:
+ self.cursor.execute(insert, merge_args(dict(zip(columns, row)), {}))
+ row_count += len(rows)
+ self.cnx.commit()
+ self.logger.info('inserted %d rows', row_count)
+
+
+ def _parse_versions(self, version_str):
+ versions = set()
+ for line in version_str.splitlines():
+ versions.add(tuple(line.split()))
+ return versions
+
+ def _get_versions(self):
+ version_sql = 'SELECT cw_pkey, cw_value FROM cw_CWProperty'
+ versions = []
+ self.cursor.execute(version_sql)
+ for pkey, value in self.cursor.fetchall():
+ if pkey.startswith(u'system.version'):
+ versions.append((pkey, value))
+ return versions
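
Note: the following is an illustrative sketch, not part of the patch. It shows
how the zip payload documented in DatabaseIndependentBackupRestore's docstring
can be inspected; 'dump.zip' is a hypothetical path to the system source dump
extracted from the instance backup tarball.

    import zipfile
    from pickle import loads

    archive = zipfile.ZipFile('dump.zip', 'r')  # hypothetical dump path
    try:
        # the archive-level format marker is currently '1.0'
        assert archive.read('format.txt').strip() == '1.0'
        for name in archive.namelist():
            if not name.startswith('tables/'):
                continue
            # each chunk is a pickled (table name, column names, rows) tuple
            tablename, columns, rows = loads(archive.read(name))
            print '%s: %d rows (%s)' % (tablename, len(rows), ', '.join(columns))
    finally:
        archive.close()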