cubicweb/server/sources/native.py
changeset 11057 0b59724cb3f2
parent 11005 f8417bd135ed
child 11087 35b29f1eb37a
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/cubicweb/server/sources/native.py	Sat Jan 16 13:48:51 2016 +0100
@@ -0,0 +1,1813 @@
+# copyright 2003-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
+#
+# This file is part of CubicWeb.
+#
+# CubicWeb is free software: you can redistribute it and/or modify it under the
+# terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation, either version 2.1 of the License, or (at your option)
+# any later version.
+#
+# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Lesser General Public License along
+# with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
+"""Adapters for native cubicweb sources.
+
+Notes:
+* extids (aka external ids, the primary key of an entity in the external
+  source it comes from) are stored in a varchar column encoded as a base64
+  string. This is because they should actually be Bytes but we want an index
+  on them for fast querying.
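+
+  For instance (purely illustrative), an extid of b'42' is stored as the
+  result of b64encode(b'42').decode('ascii'), i.e. the string 'NDI='.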
+"""
+from __future__ import print_function
+
+__docformat__ = "restructuredtext en"
+
+from threading import Lock
+from datetime import datetime
+from base64 import b64encode
+from contextlib import contextmanager
+from os.path import basename
+import re
+import itertools
+import zipfile
+import logging
+import sys
+
+from six import PY2, text_type, binary_type, string_types
+from six.moves import range, cPickle as pickle
+
+from logilab.common.decorators import cached, clear_cache
+from logilab.common.configuration import Method
+from logilab.common.shellutils import getlogin
+from logilab.database import get_db_helper, sqlgen
+
+from yams.schema import role_name
+
+from cubicweb import (UnknownEid, AuthenticationError, ValidationError, Binary,
+                      UniqueTogetherError, UndoTransactionException, ViolatedConstraint)
+from cubicweb import transaction as tx, server, neg_role
+from cubicweb.utils import QueryCache
+from cubicweb.schema import VIRTUAL_RTYPES
+from cubicweb.cwconfig import CubicWebNoAppConfiguration
+from cubicweb.server import hook
+from cubicweb.server import schema2sql as y2sql
+from cubicweb.server.utils import crypt_password, eschema_eid, verify_and_update
+from cubicweb.server.sqlutils import SQL_PREFIX, SQLAdapterMixIn
+from cubicweb.server.rqlannotation import set_qdata
+from cubicweb.server.hook import CleanupDeletedEidsCacheOp
+from cubicweb.server.edition import EditedEntity
+from cubicweb.server.sources import AbstractSource, dbg_st_search, dbg_results
+from cubicweb.server.sources.rql2sql import SQLGenerator
+from cubicweb.statsd_logger import statsd_timeit
+
+
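+# ATTR_MAP entries ('ETYPE.attr' -> callback) are copied into the SQL
+# generator's attribute map (see NativeSQLSource.map_attribute below);
+# NONSYSTEM_ETYPES and NONSYSTEM_RELATIONS hold entity / relation types
+# not handled by this source.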
+ATTR_MAP = {}
+NONSYSTEM_ETYPES = set()
+NONSYSTEM_RELATIONS = set()
+
+class LogCursor(object):
+    def __init__(self, cursor):
+        self.cu = cursor
+
+    def execute(self, query, args=None):
+        """Execute a query.
+        It's a separate method just so that it shows up in profiling.
+        """
+        if server.DEBUG & server.DBG_SQL:
+            print('exec', query, args)
+        try:
+            self.cu.execute(str(query), args)
+        except Exception as ex:
+            print("sql: %r\n args: %s\ndbms message: %r" % (
+                query, args, ex.args[0]))
+            raise
+
+    def fetchall(self):
+        return self.cu.fetchall()
+
+    def fetchone(self):
+        return self.cu.fetchone()
+
+
+def sql_or_clauses(sql, clauses):
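+    """rewrite the WHERE part of `sql` so that the given `clauses` are OR'ed
+    together instead of being AND'ed with the rest of the restriction.
+
+    For instance (illustrative values only):
+
+        sql_or_clauses('SELECT 1 FROM t WHERE a=1 AND b=2 AND c=3',
+                       ['b=2', 'c=3'])
+
+    returns 'SELECT 1 FROM t WHERE a=1 AND (b=2 OR c=3)'.
+    """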
+    select, restr = sql.split(' WHERE ', 1)
+    restrclauses = restr.split(' AND ')
+    for clause in clauses:
+        restrclauses.remove(clause)
+    if restrclauses:
+        restr = '%s AND (%s)' % (' AND '.join(restrclauses),
+                                 ' OR '.join(clauses))
+    else:
+        restr = '(%s)' % ' OR '.join(clauses)
+    return '%s WHERE %s' % (select, restr)
+
+
+def rdef_table_column(rdef):
+    """return table and column used to store the given relation definition in
+    the database
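+
+    For instance (illustrative), with SQL_PREFIX being 'cw_', the relation
+    definition for a CWUser login attribute maps to ('cw_CWUser', 'cw_login').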
+    """
+    return (SQL_PREFIX + str(rdef.subject),
+            SQL_PREFIX + str(rdef.rtype))
+
+
+def rdef_physical_info(dbhelper, rdef):
+    """return backend type and a boolean flag if NULL values should be allowed
+    for a given relation definition
+    """
+    if not rdef.object.final:
+        # inlined relation: stored as an integer (eid) column; assume NULL is
+        # allowed unless the cardinality requires a value
+        return dbhelper.TYPE_MAPPING['Int'], rdef.cardinality[0] != '1'
+    coltype = y2sql.type_from_rdef(dbhelper, rdef, creating=False)
+    allownull = rdef.cardinality[0] != '1'
+    return coltype, allownull
+
+
+class _UndoException(Exception):
+    """something went wrong during undoing"""
+
+    def __unicode__(self):
+        """Called by the unicode builtin; should return a Unicode object
+
+        Type of _UndoException message must be `unicode` by design in CubicWeb.
+        """
+        assert isinstance(self.args[0], text_type)
+        return self.args[0]
+
+
+def _undo_check_relation_target(tentity, rdef, role):
+    """check linked entity has not been redirected for this relation"""
+    card = rdef.role_cardinality(role)
+    if card in '?1' and tentity.related(rdef.rtype, role):
+        raise _UndoException(tentity._cw._(
+            "Can't restore %(role)s relation %(rtype)s to entity %(eid)s which "
+            "is already linked using this relation.")
+                            % {'role': neg_role(role),
+                               'rtype': rdef.rtype,
+                               'eid': tentity.eid})
+
+def _undo_rel_info(cnx, subj, rtype, obj):
+    entities = []
+    for role, eid in (('subject', subj), ('object', obj)):
+        try:
+            entities.append(cnx.entity_from_eid(eid))
+        except UnknownEid:
+            raise _UndoException(cnx._(
+                "Can't restore relation %(rtype)s, %(role)s entity %(eid)s"
+                " doesn't exist anymore.")
+                                % {'role': cnx._(role),
+                                   'rtype': cnx._(rtype),
+                                   'eid': eid})
+    sentity, oentity = entities
+    try:
+        rschema = cnx.vreg.schema.rschema(rtype)
+        rdef = rschema.rdefs[(sentity.cw_etype, oentity.cw_etype)]
+    except KeyError:
+        raise _UndoException(cnx._(
+            "Can't restore relation %(rtype)s between %(subj)s and "
+            "%(obj)s, that relation does not exists anymore in the "
+            "schema.")
+                            % {'rtype': cnx._(rtype),
+                               'subj': subj,
+                               'obj': obj})
+    return sentity, oentity, rdef
+
+def _undo_has_later_transaction(cnx, eid):
+    return cnx.system_sql('''\
+SELECT T.tx_uuid FROM transactions AS TREF, transactions AS T
+WHERE TREF.tx_uuid='%(txuuid)s' AND T.tx_uuid!='%(txuuid)s'
+AND T.tx_time>=TREF.tx_time
+AND (EXISTS(SELECT 1 FROM tx_entity_actions AS TEA
+            WHERE TEA.tx_uuid=T.tx_uuid AND TEA.eid=%(eid)s)
+     OR EXISTS(SELECT 1 FROM tx_relation_actions as TRA
+               WHERE TRA.tx_uuid=T.tx_uuid AND (
+                   TRA.eid_from=%(eid)s OR TRA.eid_to=%(eid)s))
+     )''' % {'txuuid': cnx.transaction_data['undoing_uuid'],
+             'eid': eid}).fetchone()
+
+
+class DefaultEidGenerator(object):
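+    """generate new eids by incrementing the 'entities_id_seq' numrange through
+    a dedicated database connection, under a lock; the connection is recreated
+    transparently if the backend reports it has been closed or reset.
+    """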
+    __slots__ = ('source', 'cnx', 'lock')
+
+    def __init__(self, source):
+        self.source = source
+        self.cnx = None
+        self.lock = Lock()
+
+    def close(self):
+        if self.cnx:
+            self.cnx.close()
+        self.cnx = None
+
+    def create_eid(self, _cnx, count=1):
+        # lock needed to prevent 'Connection is busy with results for another
+        # command (0)' errors with SQLServer
+        assert count > 0
+        with self.lock:
+            return self._create_eid(count)
+
+    def _create_eid(self, count):
+        # internal method doing the eid creation without locking.
+        # needed for the recursive handling of disconnections (otherwise we
+        # would deadlock on self.lock)
+        source = self.source
+        if self.cnx is None:
+            self.cnx = source.get_connection()
+        cnx = self.cnx
+        try:
+            cursor = cnx.cursor()
+            for sql in source.dbhelper.sqls_increment_numrange('entities_id_seq', count):
+                cursor.execute(sql)
+            eid = cursor.fetchone()[0]
+        except (source.OperationalError, source.InterfaceError):
+            # FIXME: better detection of disconnection problems
+            source.warning("trying to reconnect create eid connection")
+            self.cnx = None
+            return self._create_eid(count)
+        except source.DbapiError as exc:
+            # We get this one with pyodbc and SQL Server when connection was reset
+            if exc.args[0] == '08S01':
+                source.warning("trying to reconnect create eid connection")
+                self.cnx = None
+                return self._create_eid(count)
+            else:
+                raise
+        except Exception: # WTF?
+            cnx.rollback()
+            self.cnx = None
+            source.exception('create eid failed in an unforeseen way on SQL statement %s', sql)
+            raise
+        else:
+            cnx.commit()
+            return eid
+
+
+class SQLITEEidGenerator(object):
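+    """eid generator variant for sqlite, incrementing 'entities_id_seq' through
+    the requesting connection (via source.doexec) rather than a dedicated one,
+    still under a lock.
+    """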
+    __slots__ = ('source', 'lock')
+
+    def __init__(self, source):
+        self.source = source
+        self.lock = Lock()
+
+    def close(self):
+        pass
+
+    def create_eid(self, cnx, count=1):
+        assert count > 0
+        source = self.source
+        with self.lock:
+            for sql in source.dbhelper.sqls_increment_numrange('entities_id_seq', count):
+                cursor = source.doexec(cnx, sql)
+            return cursor.fetchone()[0]
+
+
+class NativeSQLSource(SQLAdapterMixIn, AbstractSource):
+    """adapter for source using the native cubicweb schema (see below)
+    """
+    sqlgen_class = SQLGenerator
+    options = (
+        ('db-driver',
+         {'type' : 'string',
+          'default': 'postgres',
+          # XXX use choice type
+          'help': 'database driver (postgres, sqlite, sqlserver2005)',
+          'group': 'native-source', 'level': 0,
+          }),
+        ('db-host',
+         {'type' : 'string',
+          'default': '',
+          'help': 'database host',
+          'group': 'native-source', 'level': 1,
+          }),
+        ('db-port',
+         {'type' : 'string',
+          'default': '',
+          'help': 'database port',
+          'group': 'native-source', 'level': 1,
+          }),
+        ('db-name',
+         {'type' : 'string',
+          'default': Method('default_instance_id'),
+          'help': 'database name',
+          'group': 'native-source', 'level': 0,
+          }),
+        ('db-namespace',
+         {'type' : 'string',
+          'default': '',
+          'help': 'database namespace (schema) name',
+          'group': 'native-source', 'level': 1,
+          }),
+        ('db-user',
+         {'type' : 'string',
+          'default': CubicWebNoAppConfiguration.mode == 'user' and getlogin() or 'cubicweb',
+          'help': 'database user',
+          'group': 'native-source', 'level': 0,
+          }),
+        ('db-password',
+         {'type' : 'password',
+          'default': '',
+          'help': 'database password',
+          'group': 'native-source', 'level': 0,
+          }),
+        ('db-encoding',
+         {'type' : 'string',
+          'default': 'utf8',
+          'help': 'database encoding',
+          'group': 'native-source', 'level': 1,
+          }),
+        ('db-extra-arguments',
+         {'type' : 'string',
+          'default': '',
+          'help': 'set to "Trusted_Connection" if you are using SQLServer and '
+                  'want trusted authentication for the database connection',
+          'group': 'native-source', 'level': 2,
+          }),
+        ('db-statement-timeout',
+         {'type': 'int',
+          'default': 0,
+          'help': 'sql statement timeout, in milliseconds (postgres only)',
+          'group': 'native-source', 'level': 2,
+          }),
+    )
+
+    def __init__(self, repo, source_config, *args, **kwargs):
+        SQLAdapterMixIn.__init__(self, source_config, repairing=repo.config.repairing)
+        self.authentifiers = [LoginPasswordAuthentifier(self)]
+        if repo.config['allow-email-login']:
+            self.authentifiers.insert(0, EmailPasswordAuthentifier(self))
+        AbstractSource.__init__(self, repo, source_config, *args, **kwargs)
+        # sql generator
+        self._rql_sqlgen = self.sqlgen_class(self.schema, self.dbhelper,
+                                             ATTR_MAP.copy())
+        # full text index helper
+        self.do_fti = not repo.config['delay-full-text-indexation']
+        # sql queries cache
+        self._cache = QueryCache(repo.config['rql-cache-size'])
+        # (etype, attr) / storage mapping
+        self._storages = {}
+        self.binary_to_str = self.dbhelper.dbapi_module.binary_to_str
+        if self.dbdriver == 'sqlite':
+            self.eid_generator = SQLITEEidGenerator(self)
+        else:
+            self.eid_generator = DefaultEidGenerator(self)
+        self.create_eid = self.eid_generator.create_eid
+
+    def check_config(self, source_entity):
+        """check configuration of source entity"""
+        if source_entity.host_config:
+            msg = source_entity._cw._('the system source has its configuration '
+                                      'stored on the file-system')
+            raise ValidationError(source_entity.eid, {role_name('config', 'subject'): msg})
+
+    def add_authentifier(self, authentifier):
+        self.authentifiers.append(authentifier)
+        authentifier.source = self
+        authentifier.set_schema(self.schema)
+
+    def reset_caches(self):
+        """method called during test to reset potential source caches"""
+        self._cache = QueryCache(self.repo.config['rql-cache-size'])
+
+    def clear_eid_cache(self, eid, etype):
+        """clear potential caches for the given eid"""
+        self._cache.pop('Any X WHERE X eid %s, X is %s' % (eid, etype), None)
+        self._cache.pop('Any X WHERE X eid %s' % eid, None)
+        self._cache.pop('Any %s' % eid, None)
+
+    @statsd_timeit
+    def sqlexec(self, cnx, sql, args=None):
+        """execute the query and return its result"""
+        return self.process_result(self.doexec(cnx, sql, args))
+
+    def init_creating(self, cnxset=None):
+        # check full text index availability
+        if self.do_fti:
+            if cnxset is None:
+                _cnxset = self.repo._get_cnxset()
+            else:
+                _cnxset = cnxset
+            if not self.dbhelper.has_fti_table(_cnxset.cu):
+                if not self.repo.config.creating:
+                    self.critical('no text index table')
+                self.do_fti = False
+            if cnxset is None:
+                _cnxset.cnxset_freed()
+                self.repo._free_cnxset(_cnxset)
+
+    def backup(self, backupfile, confirm, format='native'):
+        """method called to create a backup of the source's data"""
+        if format == 'portable':
+            # ensure the schema is the one stored in the database: if repository
+            # started in quick_start mode, the file system's one has been loaded
+            # so force reload
+            if self.repo.config.quick_start:
+                self.repo.set_schema(self.repo.deserialize_schema(),
+                                     resetvreg=False)
+            helper = DatabaseIndependentBackupRestore(self)
+            self.close_source_connections()
+            try:
+                helper.backup(backupfile)
+            finally:
+                self.open_source_connections()
+        elif format == 'native':
+            self.close_source_connections()
+            try:
+                self.backup_to_file(backupfile, confirm)
+            finally:
+                self.open_source_connections()
+        else:
+            raise ValueError('Unknown format %r' % format)
+
+
+    def restore(self, backupfile, confirm, drop, format='native'):
+        """method called to restore a backup of source's data"""
+        if self.repo.config.init_cnxset_pool:
+            self.close_source_connections()
+        try:
+            if format == 'portable':
+                helper = DatabaseIndependentBackupRestore(self)
+                helper.restore(backupfile)
+            elif format == 'native':
+                self.restore_from_file(backupfile, confirm, drop=drop)
+            else:
+                raise ValueError('Unknown format %r' % format)
+        finally:
+            if self.repo.config.init_cnxset_pool:
+                self.open_source_connections()
+
+
+    def init(self, activated, source_entity):
+        try:
+            # test if 'asource' column exists
+            query = self.dbhelper.sql_add_limit_offset('SELECT asource FROM entities', 1)
+            source_entity._cw.system_sql(query)
+        except Exception as ex:
+            self.eid_type_source = self.eid_type_source_pre_131
+        super(NativeSQLSource, self).init(activated, source_entity)
+        self.init_creating(source_entity._cw.cnxset)
+
+    def shutdown(self):
+        self.eid_generator.close()
+
+    # XXX deprecates [un]map_attribute?
+    def map_attribute(self, etype, attr, cb, sourcedb=True):
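+        """register a callback applied to values of the given entity attribute
+        when they are fetched from the database (used by custom storages, see
+        `set_storage`).
+        """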
+        self._rql_sqlgen.attr_map[u'%s.%s' % (etype, attr)] = (cb, sourcedb)
+
+    def unmap_attribute(self, etype, attr):
+        self._rql_sqlgen.attr_map.pop(u'%s.%s' % (etype, attr), None)
+
+    def set_storage(self, etype, attr, storage):
+        storage_dict = self._storages.setdefault(etype, {})
+        storage_dict[attr] = storage
+        self.map_attribute(etype, attr,
+                           storage.callback, storage.is_source_callback)
+
+    def unset_storage(self, etype, attr):
+        self._storages[etype].pop(attr)
+        # if etype has no storage left, remove the entry
+        if not self._storages[etype]:
+            del self._storages[etype]
+        self.unmap_attribute(etype, attr)
+
+    def storage(self, etype, attr):
+        """return the storage for the given entity type / attribute
+        """
+        try:
+            return self._storages[etype][attr]
+        except KeyError:
+            raise Exception('no custom storage set for %s.%s' % (etype, attr))
+
+    # ISource interface #######################################################
+
+    @statsd_timeit
+    def compile_rql(self, rql, sols):
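+        """return the parsed and annotated RQL syntax tree for the given rql
+        string and solutions, ready for SQL generation.
+        """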
+        rqlst = self.repo.vreg.rqlhelper.parse(rql)
+        rqlst.restricted_vars = ()
+        rqlst.children[0].solutions = sols
+        self.repo.querier.sqlgen_annotate(rqlst)
+        set_qdata(self.schema.rschema, rqlst, ())
+        return rqlst
+
+    def set_schema(self, schema):
+        """set the instance'schema"""
+        self._cache = QueryCache(self.repo.config['rql-cache-size'])
+        self.cache_hit, self.cache_miss, self.no_cache = 0, 0, 0
+        self.schema = schema
+        try:
+            self._rql_sqlgen.schema = schema
+        except AttributeError:
+            pass # __init__
+        for authentifier in self.authentifiers:
+            authentifier.set_schema(self.schema)
+        clear_cache(self, 'need_fti_indexation')
+
+    def support_entity(self, etype, write=False):
+        """return true if the given entity's type is handled by this adapter
+        if write is true, return true only if it's a RW support
+        """
+        return not etype in NONSYSTEM_ETYPES
+
+    def support_relation(self, rtype, write=False):
+        """return true if the given relation's type is handled by this adapter
+        if write is true, return true only if it's a RW support
+        """
+        if write:
+            return not rtype in NONSYSTEM_RELATIONS
+        # due to current multi-sources implementation, the system source
+        # can't claim not supporting a relation
+        return True #not rtype == 'content_for'
+
+    @statsd_timeit
+    def authenticate(self, cnx, login, **kwargs):
+        """return CWUser eid for the given login and other authentication
+        information found in kwargs, else raise `AuthenticationError`
+        """
+        for authentifier in self.authentifiers:
+            try:
+                return authentifier.authenticate(cnx, login, **kwargs)
+            except AuthenticationError:
+                continue
+        raise AuthenticationError()
+
+    def syntax_tree_search(self, cnx, union, args=None, cachekey=None,
+                           varmap=None):
+        """return result from this source for a rql query (actually from
+        a rql syntax tree and a solution dictionary mapping each used
+        variable to a possible type). If cachekey is given, the query
+        necessary to fetch the results (but not the results themselves)
+        may be cached using this key.
+        """
+        assert dbg_st_search(self.uri, union, varmap, args, cachekey)
+        # remember the number of actually selected terms (sql generation may append some)
+        if cachekey is None:
+            self.no_cache += 1
+            # generate sql query if we are able to do so (not supported types...)
+            sql, qargs, cbs = self._rql_sqlgen.generate(union, args, varmap)
+        else:
+            # sql may be cached
+            try:
+                sql, qargs, cbs = self._cache[cachekey]
+                self.cache_hit += 1
+            except KeyError:
+                self.cache_miss += 1
+                sql, qargs, cbs = self._rql_sqlgen.generate(union, args, varmap)
+                self._cache[cachekey] = sql, qargs, cbs
+        args = self.merge_args(args, qargs)
+        assert isinstance(sql, string_types), repr(sql)
+        cursor = self.doexec(cnx, sql, args)
+        results = self.process_result(cursor, cnx, cbs)
+        assert dbg_results(results)
+        return results
+
+    @contextmanager
+    def _fixup_cw(self, cnx, entity):
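+        """context manager temporarily binding entity._cw to the given
+        connection, restoring the original value on exit.
+        """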
+        _cw = entity._cw
+        entity._cw = cnx
+        try:
+            yield
+        finally:
+            entity._cw = _cw
+
+    @contextmanager
+    def _storage_handler(self, cnx, entity, event):
+        # 1/ memorize values as they are before the storage is called.
+        #    For instance, the BFSStorage will replace the `data`
+        #    binary value with a Binary containing the destination path
+        #    on the filesystem. To make the entity.data usage absolutely
+        #    transparent, we'll have to reset entity.data to its binary
+        #    value once the SQL query has been executed
+        restore_values = []
+        if isinstance(entity, list):
+            entities = entity
+        else:
+            entities = [entity]
+        etype = entities[0].__regid__
+        for attr, storage in self._storages.get(etype, {}).items():
+            for entity in entities:
+                with self._fixup_cw(cnx, entity):
+                    if event == 'deleted':
+                        storage.entity_deleted(entity, attr)
+                    else:
+                        edited = entity.cw_edited
+                        if attr in edited:
+                            handler = getattr(storage, 'entity_%s' % event)
+                            to_restore = handler(entity, attr)
+                            restore_values.append((entity, attr, to_restore))
+        try:
+            yield # 2/ execute the source's instructions
+        finally:
+            # 3/ restore original values
+            for entity, attr, value in restore_values:
+                entity.cw_edited.edited_attribute(attr, value)
+
+    def add_entity(self, cnx, entity):
+        """add a new entity to the source"""
+        with self._storage_handler(cnx, entity, 'added'):
+            attrs = self.preprocess_entity(entity)
+            sql = self.sqlgen.insert(SQL_PREFIX + entity.cw_etype, attrs)
+            self.doexec(cnx, sql, attrs)
+            if cnx.ertype_supports_undo(entity.cw_etype):
+                self._record_tx_action(cnx, 'tx_entity_actions', u'C',
+                                       etype=text_type(entity.cw_etype), eid=entity.eid)
+
+    def update_entity(self, cnx, entity):
+        """replace an entity in the source"""
+        with self._storage_handler(cnx, entity, 'updated'):
+            attrs = self.preprocess_entity(entity)
+            if cnx.ertype_supports_undo(entity.cw_etype):
+                changes = self._save_attrs(cnx, entity, attrs)
+                self._record_tx_action(cnx, 'tx_entity_actions', u'U',
+                                       etype=text_type(entity.cw_etype), eid=entity.eid,
+                                       changes=self._binary(pickle.dumps(changes)))
+            sql = self.sqlgen.update(SQL_PREFIX + entity.cw_etype, attrs,
+                                     ['cw_eid'])
+            self.doexec(cnx, sql, attrs)
+
+    def delete_entity(self, cnx, entity):
+        """delete an entity from the source"""
+        with self._storage_handler(cnx, entity, 'deleted'):
+            if cnx.ertype_supports_undo(entity.cw_etype):
+                attrs = [SQL_PREFIX + r.type
+                         for r in entity.e_schema.subject_relations()
+                         if (r.final or r.inlined) and not r in VIRTUAL_RTYPES]
+                changes = self._save_attrs(cnx, entity, attrs)
+                self._record_tx_action(cnx, 'tx_entity_actions', u'D',
+                                       etype=text_type(entity.cw_etype), eid=entity.eid,
+                                       changes=self._binary(pickle.dumps(changes)))
+            attrs = {'cw_eid': entity.eid}
+            sql = self.sqlgen.delete(SQL_PREFIX + entity.cw_etype, attrs)
+            self.doexec(cnx, sql, attrs)
+
+    def add_relation(self, cnx, subject, rtype, object, inlined=False):
+        """add a relation to the source"""
+        self._add_relations(cnx,  rtype, [(subject, object)], inlined)
+        if cnx.ertype_supports_undo(rtype):
+            self._record_tx_action(cnx, 'tx_relation_actions', u'A',
+                                   eid_from=subject, rtype=text_type(rtype), eid_to=object)
+
+    def add_relations(self, cnx,  rtype, subj_obj_list, inlined=False):
+        """add a relations to the source"""
+        self._add_relations(cnx, rtype, subj_obj_list, inlined)
+        if cnx.ertype_supports_undo(rtype):
+            for subject, object in subj_obj_list:
+                self._record_tx_action(cnx, 'tx_relation_actions', u'A',
+                                       eid_from=subject, rtype=text_type(rtype), eid_to=object)
+
+    def _add_relations(self, cnx, rtype, subj_obj_list, inlined=False):
+        """add a relation to the source"""
+        sql = []
+        if inlined is False:
+            attrs = [{'eid_from': subject, 'eid_to': object}
+                     for subject, object in subj_obj_list]
+            sql.append((self.sqlgen.insert('%s_relation' % rtype, attrs[0]), attrs))
+        else: # used by data import
+            etypes = {}
+            for subject, object in subj_obj_list:
+                etype = cnx.entity_metas(subject)['type']
+                if etype in etypes:
+                    etypes[etype].append((subject, object))
+                else:
+                    etypes[etype] = [(subject, object)]
+            for subj_etype, subj_obj_list in etypes.items():
+                attrs = [{'cw_eid': subject, SQL_PREFIX + rtype: object}
+                         for subject, object in subj_obj_list]
+                sql.append((self.sqlgen.update(SQL_PREFIX + subj_etype, attrs[0],
+                                     ['cw_eid']),
+                            attrs))
+        for statement, attrs in sql:
+            self.doexecmany(cnx, statement, attrs)
+
+    def delete_relation(self, cnx, subject, rtype, object):
+        """delete a relation from the source"""
+        rschema = self.schema.rschema(rtype)
+        self._delete_relation(cnx, subject, rtype, object, rschema.inlined)
+        if cnx.ertype_supports_undo(rtype):
+            self._record_tx_action(cnx, 'tx_relation_actions', u'R',
+                                   eid_from=subject, rtype=text_type(rtype), eid_to=object)
+
+    def _delete_relation(self, cnx, subject, rtype, object, inlined=False):
+        """delete a relation from the source"""
+        if inlined:
+            table = SQL_PREFIX + cnx.entity_metas(subject)['type']
+            column = SQL_PREFIX + rtype
+            sql = 'UPDATE %s SET %s=NULL WHERE %seid=%%(eid)s' % (table, column,
+                                                                  SQL_PREFIX)
+            attrs = {'eid' : subject}
+        else:
+            attrs = {'eid_from': subject, 'eid_to': object}
+            sql = self.sqlgen.delete('%s_relation' % rtype, attrs)
+        self.doexec(cnx, sql, attrs)
+
+    @statsd_timeit
+    def doexec(self, cnx, query, args=None, rollback=True):
+        """Execute a query.
+        It's a separate method just so that it shows up in profiling.
+        """
+        cursor = cnx.cnxset.cu
+        if server.DEBUG & server.DBG_SQL:
+            print('exec', query, args, cnx.cnxset.cnx)
+        try:
+            # str(query) to avoid error if it's a unicode string
+            cursor.execute(str(query), args)
+        except Exception as ex:
+            if self.repo.config.mode != 'test':
+                # during tests we get those messages when trying to alter the
+                # sqlite db schema
+                self.info("sql: %r\n args: %s\ndbms message: %r",
+                              query, args, ex.args[0])
+            if rollback:
+                try:
+                    cnx.cnxset.rollback()
+                    if self.repo.config.mode != 'test':
+                        self.debug('transaction has been rolled back')
+                except Exception as ex:
+                    pass
+            if ex.__class__.__name__ == 'IntegrityError':
+                # need string comparison because of various backends
+                for arg in ex.args:
+                    # postgres, sqlserver
+                    mo = re.search("unique_[a-z0-9]{32}", arg)
+                    if mo is not None:
+                        raise UniqueTogetherError(cnx, cstrname=mo.group(0))
+                    # old sqlite
+                    mo = re.search('columns? (.*) (?:is|are) not unique', arg)
+                    if mo is not None: # sqlite in use
+                        # we left chop the 'cw_' prefix of attribute names
+                        rtypes = [c.strip()[3:]
+                                  for c in mo.group(1).split(',')]
+                        raise UniqueTogetherError(cnx, rtypes=rtypes)
+                    # sqlite after http://www.sqlite.org/cgi/src/info/c80e229dd9c1230a
+                    if arg.startswith('UNIQUE constraint failed:'):
+                        # message looks like: "UNIQUE constraint failed: foo.cw_bar, foo.cw_baz"
+                        # so drop the prefix, split on comma, drop the tablenames, and drop "cw_"
+                        columns = arg.split(':', 1)[1].split(',')
+                        rtypes = [c.split('.', 1)[1].strip()[3:] for c in columns]
+                        raise UniqueTogetherError(cnx, rtypes=rtypes)
+
+                    mo = re.search('"cstr[a-f0-9]{32}"', arg)
+                    if mo is not None:
+                        # postgresql
+                        raise ViolatedConstraint(cnx, cstrname=mo.group(0)[1:-1])
+                    if arg.startswith('CHECK constraint failed:'):
+                        # sqlite3 (new)
+                        raise ViolatedConstraint(cnx, cstrname=arg.split(':', 1)[1].strip())
+                    mo = re.match('^constraint (cstr.*) failed$', arg)
+                    if mo is not None:
+                        # sqlite3 (old)
+                        raise ViolatedConstraint(cnx, cstrname=mo.group(1))
+            raise
+        return cursor
+
+    @statsd_timeit
+    def doexecmany(self, cnx, query, args):
+        """Execute a query.
+        it's a function just so that it shows up in profiling
+        """
+        if server.DEBUG & server.DBG_SQL:
+            print('execmany', query, 'with', len(args), 'arguments', cnx.cnxset.cnx)
+        cursor = cnx.cnxset.cu
+        try:
+            # str(query) to avoid error if it's a unicode string
+            cursor.executemany(str(query), args)
+        except Exception as ex:
+            if self.repo.config.mode != 'test':
+                # during tests we get those messages when trying to alter the
+                # sqlite db schema
+                self.critical("sql many: %r\n args: %s\ndbms message: %r",
+                              query, args, ex.args[0])
+            try:
+                cnx.cnxset.rollback()
+                if self.repo.config.mode != 'test':
+                    self.critical('transaction has been rolled back')
+            except Exception:
+                pass
+            raise
+
+    # shortcuts to methods requiring advanced db helper usage ################
+
+    def update_rdef_column(self, cnx, rdef):
+        """update physical column for a relation definition (final or inlined)
+        """
+        table, column = rdef_table_column(rdef)
+        coltype, allownull = rdef_physical_info(self.dbhelper, rdef)
+        if not self.dbhelper.alter_column_support:
+            self.error("backend can't alter %s.%s to %s%s", table, column, coltype,
+                       not allownull and 'NOT NULL' or '')
+            return
+        self.dbhelper.change_col_type(LogCursor(cnx.cnxset.cu),
+                                      table, column, coltype, allownull)
+        self.info('altered %s.%s: now %s%s', table, column, coltype,
+                  not allownull and 'NOT NULL' or '')
+
+    def update_rdef_null_allowed(self, cnx, rdef):
+        """update NULL / NOT NULL of physical column for a relation definition
+        (final or inlined)
+        """
+        if not self.dbhelper.alter_column_support:
+            # not supported (and NOT NULL not set by yams in that case, so no
+            # worry)
+            return
+        table, column = rdef_table_column(rdef)
+        coltype, allownull = rdef_physical_info(self.dbhelper, rdef)
+        self.dbhelper.set_null_allowed(LogCursor(cnx.cnxset.cu),
+                                       table, column, coltype, allownull)
+
+    def update_rdef_indexed(self, cnx, rdef):
+        table, column = rdef_table_column(rdef)
+        if rdef.indexed:
+            self.create_index(cnx, table, column)
+        else:
+            self.drop_index(cnx, table, column)
+
+    def update_rdef_unique(self, cnx, rdef):
+        table, column = rdef_table_column(rdef)
+        if rdef.constraint_by_type('UniqueConstraint'):
+            self.create_index(cnx, table, column, unique=True)
+        else:
+            self.drop_index(cnx, table, column, unique=True)
+
+    def create_index(self, cnx, table, column, unique=False):
+        cursor = LogCursor(cnx.cnxset.cu)
+        self.dbhelper.create_index(cursor, table, column, unique)
+
+    def drop_index(self, cnx, table, column, unique=False):
+        cursor = LogCursor(cnx.cnxset.cu)
+        self.dbhelper.drop_index(cursor, table, column, unique)
+
+    # system source interface #################################################
+
+    def _eid_type_source(self, cnx, eid, sql):
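+        """execute the given sql and return the first result row; raise
+        UnknownEid if no row is found or if the query fails.
+        """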
+        try:
+            res = self.doexec(cnx, sql).fetchone()
+            if res is not None:
+                return res
+        except Exception:
+            self.exception('failed to query entities table for eid %s', eid)
+        raise UnknownEid(eid)
+
+    def eid_type_source(self, cnx, eid): # pylint: disable=E0202
+        """return a tuple (type, extid, source) for the entity with id <eid>"""
+        sql = 'SELECT type, extid, asource FROM entities WHERE eid=%s' % eid
+        res = self._eid_type_source(cnx, eid, sql)
+        if not isinstance(res, list):
+            res = list(res)
+        res[-2] = self.decode_extid(res[-2])
+        return res
+
+    def eid_type_source_pre_131(self, cnx, eid):
+        """return a tuple (type, extid, source) for the entity with id <eid>"""
+        sql = 'SELECT type, extid FROM entities WHERE eid=%s' % eid
+        res = self._eid_type_source(cnx, eid, sql)
+        if not isinstance(res, list):
+            res = list(res)
+        res[-1] = self.decode_extid(res[-1])
+        res.append("system")
+        return res
+
+    def extid2eid(self, cnx, extid):
+        """get eid from an external id. Return None if no record found."""
+        assert isinstance(extid, binary_type)
+        args = {'x': b64encode(extid).decode('ascii')}
+        cursor = self.doexec(cnx,
+                             'SELECT eid FROM entities WHERE extid=%(x)s',
+                             args)
+        # XXX testing rowcount causes a strange bug with sqlite, results are there
+        #     but rowcount is 0
+        #if cursor.rowcount > 0:
+        try:
+            result = cursor.fetchone()
+            if result:
+                return result[0]
+        except Exception:
+            pass
+        cursor = self.doexec(cnx,
+                             'SELECT eid FROM moved_entities WHERE extid=%(x)s',
+                             args)
+        try:
+            result = cursor.fetchone()
+            if result:
+                # entity was moved to the system source, return negative
+                # number to tell the external source to ignore it
+                return -result[0]
+        except Exception:
+            pass
+        return None
+
+    def _handle_is_relation_sql(self, cnx, sql, attrs):
+        """ Handler for specific is_relation sql that may be
+        overridden in some stores"""
+        self.doexec(cnx, sql % attrs)
+
+    _handle_insert_entity_sql = doexec
+    _handle_is_instance_of_sql = _handle_source_relation_sql = _handle_is_relation_sql
+
+    def add_info(self, cnx, entity, source, extid):
+        """add type and source info for an eid into the system table"""
+        assert cnx.cnxset is not None
+        # begin by inserting eid/type/source/extid into the entities table
+        if extid is not None:
+            assert isinstance(extid, binary_type)
+            extid = b64encode(extid).decode('ascii')
+        attrs = {'type': text_type(entity.cw_etype), 'eid': entity.eid, 'extid': extid,
+                 'asource': text_type(source.uri)}
+        self._handle_insert_entity_sql(cnx, self.sqlgen.insert('entities', attrs), attrs)
+        # insert core relations: is, is_instance_of and cw_source
+        try:
+            self._handle_is_relation_sql(cnx, 'INSERT INTO is_relation(eid_from,eid_to) VALUES (%s,%s)',
+                                         (entity.eid, eschema_eid(cnx, entity.e_schema)))
+        except IndexError:
+            # during schema serialization, skip
+            pass
+        else:
+            for eschema in entity.e_schema.ancestors() + [entity.e_schema]:
+                self._handle_is_relation_sql(cnx,
+                                             'INSERT INTO is_instance_of_relation(eid_from,eid_to) VALUES (%s,%s)',
+                                             (entity.eid, eschema_eid(cnx, eschema)))
+        if 'CWSource' in self.schema and source.eid is not None: # else, cw < 3.10
+            self._handle_is_relation_sql(cnx, 'INSERT INTO cw_source_relation(eid_from,eid_to) VALUES (%s,%s)',
+                                         (entity.eid, source.eid))
+        # now we can update the full text index
+        if self.need_fti_indexation(entity.cw_etype):
+            self.index_entity(cnx, entity=entity)
+
+    def update_info(self, cnx, entity, need_fti_update):
+        """mark entity as being modified, fulltext reindex if needed"""
+        if need_fti_update:
+            # reindex the entity only if this query is updating at least
+            # one indexable attribute
+            self.index_entity(cnx, entity=entity)
+
+    def delete_info_multi(self, cnx, entities):
+        """delete system information on deletion of a list of entities with the
+        same etype and belonging to the same source
+
+        * update the fti
+        * remove record from the `entities` table
+        """
+        self.fti_unindex_entities(cnx, entities)
+        attrs = {'eid': '(%s)' % ','.join([str(_e.eid) for _e in entities])}
+        self.doexec(cnx, self.sqlgen.delete_many('entities', attrs), attrs)
+
+    # undo support #############################################################
+
+    def undoable_transactions(self, cnx, ueid=None, **actionfilters):
+        """See :class:`cubicweb.repoapi.Connection.undoable_transactions`"""
+        # force filtering to connection's user if not a manager
+        if not cnx.user.is_in_group('managers'):
+            ueid = cnx.user.eid
+        restr = {}
+        if ueid is not None:
+            restr['tx_user'] = ueid
+        sql = self.sqlgen.select('transactions', restr, ('tx_uuid', 'tx_time', 'tx_user'))
+        if actionfilters:
+            # we will need subqueries to filter transactions according to
+            # actions done
+            tearestr = {} # filters on the tx_entity_actions table
+            trarestr = {} # filters on the tx_relation_actions table
+            genrestr = {} # generic filters, applicable to both tables
+            # unless public explicitly set to false, we only consider public
+            # actions
+            if actionfilters.pop('public', True):
+                genrestr['txa_public'] = True
+            # put additional filters in trarestr and/or tearestr
+            for key, val in actionfilters.items():
+                if key == 'etype':
+                    # filtering on etype implies filtering on entity actions
+                    # only, and with no eid specified
+                    assert actionfilters.get('action', 'C') in 'CUD'
+                    assert not 'eid' in actionfilters
+                    tearestr['etype'] = text_type(val)
+                elif key == 'eid':
+                    # eid filter may apply to 'eid' of tx_entity_actions or to
+                    # 'eid_from' OR 'eid_to' of tx_relation_actions
+                    if actionfilters.get('action', 'C') in 'CUD':
+                        tearestr['eid'] = val
+                    if actionfilters.get('action', 'A') in 'AR':
+                        trarestr['eid_from'] = val
+                        trarestr['eid_to'] = val
+                elif key == 'action':
+                    if val in 'CUD':
+                        tearestr['txa_action'] = text_type(val)
+                    else:
+                        assert val in 'AR'
+                        trarestr['txa_action'] = text_type(val)
+                else:
+                    raise AssertionError('unknown filter %s' % key)
+            assert trarestr or tearestr, "can't only filter on 'public'"
+            subqsqls = []
+            # append subqueries to the original query, using EXISTS()
+            if trarestr or (genrestr and not tearestr):
+                trarestr.update(genrestr)
+                trasql = self.sqlgen.select('tx_relation_actions', trarestr, ('1',))
+                if 'eid_from' in trarestr:
+                    # replace AND by OR between eid_from/eid_to restriction
+                    trasql = sql_or_clauses(trasql, ['eid_from = %(eid_from)s',
+                                                     'eid_to = %(eid_to)s'])
+                trasql += ' AND transactions.tx_uuid=tx_relation_actions.tx_uuid'
+                subqsqls.append('EXISTS(%s)' % trasql)
+            if tearestr or (genrestr and not trarestr):
+                tearestr.update(genrestr)
+                teasql = self.sqlgen.select('tx_entity_actions', tearestr, ('1',))
+                teasql += ' AND transactions.tx_uuid=tx_entity_actions.tx_uuid'
+                subqsqls.append('EXISTS(%s)' % teasql)
+            if restr:
+                sql += ' AND (%s)' % ' OR '.join(subqsqls)
+            else:
+                sql += ' WHERE %s' % ' OR '.join(subqsqls)
+            restr.update(trarestr)
+            restr.update(tearestr)
+        # we want results ordered by transaction time, descending
+        sql += ' ORDER BY tx_time DESC'
+        cu = self.doexec(cnx, sql, restr)
+        # turn results into transaction objects
+        return [tx.Transaction(cnx, *args) for args in cu.fetchall()]
+
+    def tx_info(self, cnx, txuuid):
+        """See :class:`cubicweb.repoapi.Connection.transaction_info`"""
+        return tx.Transaction(cnx, txuuid, *self._tx_info(cnx, text_type(txuuid)))
+
+    def tx_actions(self, cnx, txuuid, public):
+        """See :class:`cubicweb.repoapi.Connection.transaction_actions`"""
+        txuuid = text_type(txuuid)
+        self._tx_info(cnx, txuuid)
+        restr = {'tx_uuid': txuuid}
+        if public:
+            restr['txa_public'] = True
+        # XXX use generator to avoid loading everything in memory?
+        sql = self.sqlgen.select('tx_entity_actions', restr,
+                                 ('txa_action', 'txa_public', 'txa_order',
+                                  'etype', 'eid', 'changes'))
+        with cnx.ensure_cnx_set:
+            cu = self.doexec(cnx, sql, restr)
+            actions = [tx.EntityAction(a,p,o,et,e,c and pickle.loads(self.binary_to_str(c)))
+                       for a,p,o,et,e,c in cu.fetchall()]
+        sql = self.sqlgen.select('tx_relation_actions', restr,
+                                 ('txa_action', 'txa_public', 'txa_order',
+                                  'rtype', 'eid_from', 'eid_to'))
+        with cnx.ensure_cnx_set:
+            cu = self.doexec(cnx, sql, restr)
+            actions += [tx.RelationAction(*args) for args in cu.fetchall()]
+        return sorted(actions, key=lambda x: x.order)
+
+    def undo_transaction(self, cnx, txuuid):
+        """See :class:`cubicweb.repoapi.Connection.undo_transaction`
+
+        important note: while undoing a transaction, only hooks in the
+        'integrity', 'activeintegrity' and 'undo' categories are called.
+        """
+        errors = []
+        cnx.transaction_data['undoing_uuid'] = txuuid
+        with cnx.deny_all_hooks_but('integrity', 'activeintegrity', 'undo'):
+            with cnx.security_enabled(read=False):
+                for action in reversed(self.tx_actions(cnx, txuuid, False)):
+                    undomethod = getattr(self, '_undo_%s' % action.action.lower())
+                    errors += undomethod(cnx, action)
+        # remove the transactions record
+        self.doexec(cnx,
+                    "DELETE FROM transactions WHERE tx_uuid='%s'" % txuuid)
+        if errors:
+            raise UndoTransactionException(txuuid, errors)
+        else:
+            return
+
+    def start_undoable_transaction(self, cnx, uuid):
+        """connection callback to insert a transaction record in the transactions
+        table when some undoable transaction is started
+        """
+        ueid = cnx.user.eid
+        attrs = {'tx_uuid': uuid, 'tx_user': ueid, 'tx_time': datetime.utcnow()}
+        self.doexec(cnx, self.sqlgen.insert('transactions', attrs), attrs)
+
+    def _save_attrs(self, cnx, entity, attrs):
+        """return a pickleable dictionary containing current values for given
+        attributes of the entity
+        """
+        restr = {'cw_eid': entity.eid}
+        sql = self.sqlgen.select(SQL_PREFIX + entity.cw_etype, restr, attrs)
+        cu = self.doexec(cnx, sql, restr)
+        values = dict(zip(attrs, cu.fetchone()))
+        # ensure backend specific binary values are converted back to strings
+        eschema = entity.e_schema
+        for column in attrs:
+            # [3:] remove 'cw_' prefix
+            attr = column[3:]
+            if not eschema.subjrels[attr].final:
+                continue
+            if eschema.destination(attr) in ('Password', 'Bytes'):
+                value = values[column]
+                if value is not None:
+                    values[column] = self.binary_to_str(value)
+        return values
+
+    def _record_tx_action(self, cnx, table, action, **kwargs):
+        """record a transaction action in the given table (either
+        'tx_entity_actions' or 'tx_relation_actions')
+        """
+        kwargs['tx_uuid'] = cnx.transaction_uuid()
+        kwargs['txa_action'] = action
+        kwargs['txa_order'] = cnx.transaction_inc_action_counter()
+        kwargs['txa_public'] = not cnx.hooks_in_progress
+        self.doexec(cnx, self.sqlgen.insert(table, kwargs), kwargs)
+
+    def _tx_info(self, cnx, txuuid):
+        """return transaction's time and user of the transaction with the given uuid.
+
+        raise `NoSuchTransaction` if there is no such transaction of if the
+        connection's user isn't allowed to see it.
+        """
+        restr = {'tx_uuid': txuuid}
+        sql = self.sqlgen.select('transactions', restr,
+                                 ('tx_time', 'tx_user'))
+        cu = self.doexec(cnx, sql, restr)
+        try:
+            time, ueid = cu.fetchone()
+        except TypeError:
+            raise tx.NoSuchTransaction(txuuid)
+        if not (cnx.user.is_in_group('managers')
+                or cnx.user.eid == ueid):
+            raise tx.NoSuchTransaction(txuuid)
+        return time, ueid
+
+    def _reedit_entity(self, entity, changes, err):
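+        """rebuild `entity.cw_edited` from the recorded `changes` dictionary,
+        reporting through `err` any value that can't be restored because the
+        schema or the linked entities changed since the transaction.
+        """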
+        cnx = entity._cw
+        eid = entity.eid
+        entity.cw_edited = edited = EditedEntity(entity)
+        # check for schema changes, check that entities linked through inlined
+        # relations still exist, and rewrap binary values
+        eschema = entity.e_schema
+        getrschema = eschema.subjrels
+        for column, value in changes.items():
+            rtype = column[len(SQL_PREFIX):]
+            if rtype == "eid":
+                continue # XXX should even `eid` be stored in action changes?
+            try:
+                rschema = getrschema[rtype]
+            except KeyError:
+                err(cnx._("can't restore relation %(rtype)s of entity %(eid)s, "
+                              "this relation does not exist in the schema anymore.")
+                    % {'rtype': rtype, 'eid': eid})
+                continue
+            if not rschema.final:
+                if not rschema.inlined:
+                    assert value is None
+                # rschema is an inlined relation
+                elif value is not None:
+                    # not a deletion: we must put something in edited
+                    try:
+                        entity._cw.entity_from_eid(value) # check target exists
+                        edited[rtype] = value
+                    except UnknownEid:
+                        err(cnx._("can't restore entity %(eid)s of type %(eschema)s, "
+                                      "target of %(rtype)s (eid %(value)s) does not exist any longer")
+                            % locals())
+                        changes[column] = None
+            elif eschema.destination(rtype) in ('Bytes', 'Password'):
+                changes[column] = self._binary(value)
+                edited[rtype] = Binary(value)
+            elif PY2 and isinstance(value, str):
+                edited[rtype] = text_type(value, cnx.encoding, 'replace')
+            else:
+                edited[rtype] = value
+        # This must only be done after init_entity_caches: deferred to calling functions
+        # edited.check()
+
+    def _undo_d(self, cnx, action):
+        """undo an entity deletion"""
+        errors = []
+        err = errors.append
+        eid = action.eid
+        etype = action.etype
+        _ = cnx._
+        # get an entity instance
+        try:
+            entity = self.repo.vreg['etypes'].etype_class(etype)(cnx)
+        except Exception:
+            err("can't restore entity %s of type %s, type no more supported"
+                % (eid, etype))
+            return errors
+        self._reedit_entity(entity, action.changes, err)
+        entity.eid = eid
+        cnx.repo.init_entity_caches(cnx, entity, self)
+        entity.cw_edited.check()
+        self.repo.hm.call_hooks('before_add_entity', cnx, entity=entity)
+        # restore the entity
+        action.changes['cw_eid'] = eid
+        # restore record in entities (will update fti if needed)
+        self.add_info(cnx, entity, self, None)
+        sql = self.sqlgen.insert(SQL_PREFIX + etype, action.changes)
+        self.doexec(cnx, sql, action.changes)
+        self.repo.hm.call_hooks('after_add_entity', cnx, entity=entity)
+        return errors
+
+    def _undo_r(self, cnx, action):
+        """undo a relation removal"""
+        errors = []
+        subj, rtype, obj = action.eid_from, action.rtype, action.eid_to
+        try:
+            sentity, oentity, rdef = _undo_rel_info(cnx, subj, rtype, obj)
+        except _UndoException as ex:
+            errors.append(text_type(ex))
+        else:
+            for role, entity in (('subject', sentity),
+                                 ('object', oentity)):
+                try:
+                    _undo_check_relation_target(entity, rdef, role)
+                except _UndoException as ex:
+                    errors.append(text_type(ex))
+                    continue
+        if not errors:
+            self.repo.hm.call_hooks('before_add_relation', cnx,
+                                    eidfrom=subj, rtype=rtype, eidto=obj)
+            # add relation in the database
+            self._add_relations(cnx, rtype, [(subj, obj)], rdef.rtype.inlined)
+            # set related cache
+            cnx.update_rel_cache_add(subj, rtype, obj, rdef.rtype.symmetric)
+            self.repo.hm.call_hooks('after_add_relation', cnx,
+                                    eidfrom=subj, rtype=rtype, eidto=obj)
+        return errors
+
+    def _undo_c(self, cnx, action):
+        """undo an entity creation"""
+        eid = action.eid
+        # XXX done to avoid fetching all remaining relations for the entity;
+        # we should find an efficient way to do this (while keeping the current
+        # massive deletion performance)
+        if _undo_has_later_transaction(cnx, eid):
+            msg = cnx._('some later transaction(s) touch entity, undo them '
+                            'first')
+            raise ValidationError(eid, {None: msg})
+        etype = action.etype
+        # get an entity instance
+        try:
+            entity = self.repo.vreg['etypes'].etype_class(etype)(cnx)
+        except Exception:
+            return [cnx._(
+                "Can't undo creation of entity %(eid)s of type %(etype)s, type "
+                "no more supported" % {'eid': eid, 'etype': etype})]
+        entity.eid = eid
+        # for proper eid/type cache update
+        CleanupDeletedEidsCacheOp.get_instance(cnx).add_data(eid)
+        self.repo.hm.call_hooks('before_delete_entity', cnx, entity=entity)
+        # remove is / is_instance_of relations which are added using sql by
+        # hooks, hence invisible as transaction actions
+        self.doexec(cnx, 'DELETE FROM is_relation WHERE eid_from=%s' % eid)
+        self.doexec(cnx, 'DELETE FROM is_instance_of_relation WHERE eid_from=%s' % eid)
+        self.doexec(cnx, 'DELETE FROM cw_source_relation WHERE eid_from=%s' % eid)
+        # XXX check removal of inlined relation?
+        # delete the entity
+        attrs = {'cw_eid': eid}
+        sql = self.sqlgen.delete(SQL_PREFIX + entity.cw_etype, attrs)
+        self.doexec(cnx, sql, attrs)
+        # remove record from entities (will update fti if needed)
+        self.delete_info_multi(cnx, [entity])
+        self.repo.hm.call_hooks('after_delete_entity', cnx, entity=entity)
+        return ()
+
+    def _undo_u(self, cnx, action):
+        """undo an entity update"""
+        errors = []
+        err = errors.append
+        try:
+            entity = cnx.entity_from_eid(action.eid)
+        except UnknownEid:
+            err(cnx._("can't restore state of entity %s, it has been "
+                          "deleted inbetween") % action.eid)
+            return errors
+        self._reedit_entity(entity, action.changes, err)
+        entity.cw_edited.check()
+        self.repo.hm.call_hooks('before_update_entity', cnx, entity=entity)
+        sql = self.sqlgen.update(SQL_PREFIX + entity.cw_etype, action.changes,
+                                 ['cw_eid'])
+        self.doexec(cnx, sql, action.changes)
+        self.repo.hm.call_hooks('after_update_entity', cnx, entity=entity)
+        return errors
+
+    def _undo_a(self, cnx, action):
+        """undo a relation addition"""
+        errors = []
+        subj, rtype, obj = action.eid_from, action.rtype, action.eid_to
+        try:
+            sentity, oentity, rdef = _undo_rel_info(cnx, subj, rtype, obj)
+        except _UndoException as ex:
+            errors.append(text_type(ex))
+        else:
+            rschema = rdef.rtype
+            if rschema.inlined:
+                sql = 'SELECT 1 FROM cw_%s WHERE cw_eid=%s and cw_%s=%s'\
+                      % (sentity.cw_etype, subj, rtype, obj)
+            else:
+                sql = 'SELECT 1 FROM %s_relation WHERE eid_from=%s and eid_to=%s'\
+                      % (rtype, subj, obj)
+            cu = self.doexec(cnx, sql)
+            if cu.fetchone() is None:
+                errors.append(cnx._(
+                    "Can't undo addition of relation %(rtype)s from %(subj)s to"
+                    " %(obj)s, doesn't exist anymore" % locals()))
+        if not errors:
+            self.repo.hm.call_hooks('before_delete_relation', cnx,
+                                    eidfrom=subj, rtype=rtype, eidto=obj)
+            # delete relation from the database
+            self._delete_relation(cnx, subj, rtype, obj, rschema.inlined)
+            # set related cache
+            cnx.update_rel_cache_del(subj, rtype, obj, rschema.symmetric)
+            self.repo.hm.call_hooks('after_delete_relation', cnx,
+                                    eidfrom=subj, rtype=rtype, eidto=obj)
+        return errors
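+
+    # Note: the one-letter suffix of the _undo_* methods above matches the
+    # action code recorded in the tx_entity_actions / tx_relation_actions
+    # tables (see the txa_action CHAR(1) columns in sql_schema() below):
+    # 'C' for entity creation, 'U' for update, 'A' for relation addition,
+    # 'R' for relation removal.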
+
+    # full text index handling #################################################
+
+    @cached
+    def need_fti_indexation(self, etype):
+        eschema = self.schema.eschema(etype)
+        if any(eschema.indexable_attributes()):
+            return True
+        if any(eschema.fulltext_containers()):
+            return True
+        return False
+
+    def index_entity(self, cnx, entity):
+        """create an operation to [re]index textual content of the given entity
+        on commit
+        """
+        if self.do_fti:
+            FTIndexEntityOp.get_instance(cnx).add_data(entity.eid)
+
+    def fti_unindex_entities(self, cnx, entities):
+        """remove text content for entities from the full text index
+        """
+        cursor = cnx.cnxset.cu
+        cursor_unindex_object = self.dbhelper.cursor_unindex_object
+        try:
+            for entity in entities:
+                cursor_unindex_object(entity.eid, cursor)
+        except Exception: # let KeyboardInterrupt / SystemExit propagate
+            self.exception('error while unindexing %s', entity)
+
+
+    def fti_index_entities(self, cnx, entities):
+        """add text content of created/modified entities to the full text index
+        """
+        cursor_index_object = self.dbhelper.cursor_index_object
+        cursor = cnx.cnxset.cu
+        try:
+            # use cursor_index_object, not cursor_reindex_object, since
+            # unindexing is already done by FTIndexEntityOp
+            for entity in entities:
+                cursor_index_object(entity.eid,
+                                    entity.cw_adapt_to('IFTIndexable'),
+                                    cursor)
+        except Exception: # let KeyboardInterrupt / SystemExit propagate
+            self.exception('error while indexing %s', entity)
+
+
+class FTIndexEntityOp(hook.DataOperationMixIn, hook.LateOperation):
+    """operation to delay entity full text indexation to commit
+
+    since fti indexing may trigger discovery of other entities, it should be
+    triggered on precommit, not commit, and this should be done after other
+    precommit operations which may add relations to the entity
+    """
+
+    def precommit_event(self):
+        cnx = self.cnx
+        source = cnx.repo.system_source
+        pendingeids = cnx.transaction_data.get('pendingeids', ())
+        done = cnx.transaction_data.setdefault('indexedeids', set())
+        to_reindex = set()
+        for eid in self.get_data():
+            if eid in pendingeids or eid in done:
+                # entity added and deleted in the same transaction or already
+                # processed
+                continue
+            done.add(eid)
+            iftindexable = cnx.entity_from_eid(eid).cw_adapt_to('IFTIndexable')
+            to_reindex |= set(iftindexable.fti_containers())
+        source.fti_unindex_entities(cnx, to_reindex)
+        source.fti_index_entities(cnx, to_reindex)
+
+def sql_schema(driver):
+    helper = get_db_helper(driver)
+    typemap = helper.TYPE_MAPPING
+    schema = """
+/* Create the repository's system database */
+
+%s
+
+CREATE TABLE entities (
+  eid INTEGER PRIMARY KEY NOT NULL,
+  type VARCHAR(64) NOT NULL,
+  asource VARCHAR(128) NOT NULL,
+  extid VARCHAR(256)
+);;
+CREATE INDEX entities_type_idx ON entities(type);;
+CREATE TABLE moved_entities (
+  eid INTEGER PRIMARY KEY NOT NULL,
+  extid VARCHAR(256) UNIQUE NOT NULL
+);;
+
+CREATE TABLE transactions (
+  tx_uuid CHAR(32) PRIMARY KEY NOT NULL,
+  tx_user INTEGER NOT NULL,
+  tx_time %s NOT NULL
+);;
+CREATE INDEX transactions_tx_user_idx ON transactions(tx_user);;
+CREATE INDEX transactions_tx_time_idx ON transactions(tx_time);;
+
+CREATE TABLE tx_entity_actions (
+  tx_uuid CHAR(32) REFERENCES transactions(tx_uuid) ON DELETE CASCADE,
+  txa_action CHAR(1) NOT NULL,
+  txa_public %s NOT NULL,
+  txa_order INTEGER,
+  eid INTEGER NOT NULL,
+  etype VARCHAR(64) NOT NULL,
+  changes %s
+);;
+CREATE INDEX tx_entity_actions_txa_action_idx ON tx_entity_actions(txa_action);;
+CREATE INDEX tx_entity_actions_txa_public_idx ON tx_entity_actions(txa_public);;
+CREATE INDEX tx_entity_actions_eid_idx ON tx_entity_actions(eid);;
+CREATE INDEX tx_entity_actions_etype_idx ON tx_entity_actions(etype);;
+CREATE INDEX tx_entity_actions_tx_uuid_idx ON tx_entity_actions(tx_uuid);;
+
+CREATE TABLE tx_relation_actions (
+  tx_uuid CHAR(32) REFERENCES transactions(tx_uuid) ON DELETE CASCADE,
+  txa_action CHAR(1) NOT NULL,
+  txa_public %s NOT NULL,
+  txa_order INTEGER,
+  eid_from INTEGER NOT NULL,
+  eid_to INTEGER NOT NULL,
+  rtype VARCHAR(256) NOT NULL
+);;
+CREATE INDEX tx_relation_actions_txa_action_idx ON tx_relation_actions(txa_action);;
+CREATE INDEX tx_relation_actions_txa_public_idx ON tx_relation_actions(txa_public);;
+CREATE INDEX tx_relation_actions_eid_from_idx ON tx_relation_actions(eid_from);;
+CREATE INDEX tx_relation_actions_eid_to_idx ON tx_relation_actions(eid_to);;
+CREATE INDEX tx_relation_actions_tx_uuid_idx ON tx_relation_actions(tx_uuid);;
+""" % (helper.sql_create_numrange('entities_id_seq').replace(';', ';;'),
+       typemap['Datetime'],
+       typemap['Boolean'], typemap['Bytes'], typemap['Boolean'])
+    if helper.backend_name == 'sqlite':
+        # sqlite accepts the ON DELETE CASCADE syntax but does not enforce it,
+        # hence the explicit cleanup trigger below
+        schema += '''
+CREATE TRIGGER fkd_transactions
+BEFORE DELETE ON transactions
+FOR EACH ROW BEGIN
+    DELETE FROM tx_entity_actions WHERE tx_uuid=OLD.tx_uuid;
+    DELETE FROM tx_relation_actions WHERE tx_uuid=OLD.tx_uuid;
+END;;
+'''
+    schema += ';;'.join(helper.sqls_create_multicol_unique_index('entities', ['extid']))
+    schema += ';;\n'
+    return schema
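+
+# Minimal usage sketch (names are illustrative; it assumes the ';;' separators
+# used above delimit individual statements and that `cursor` is an open DB-API
+# cursor on the target database):
+#
+#   ddl = sql_schema('postgres')
+#   for statement in ddl.split(';;'):
+#       if statement.strip():
+#           cursor.execute(statement)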
+
+
+def sql_drop_schema(driver):
+    helper = get_db_helper(driver)
+    return """
+%s;
+%s
+DROP TABLE entities;
+DROP TABLE tx_entity_actions;
+DROP TABLE tx_relation_actions;
+DROP TABLE transactions;
+""" % (';'.join(helper.sqls_drop_multicol_unique_index('entities', ['extid'])),
+       helper.sql_drop_numrange('entities_id_seq'))
+
+
+def grant_schema(user, set_owner=True):
+    result = ''
+    for table in ('entities', 'entities_id_seq',
+                  'transactions', 'tx_entity_actions', 'tx_relation_actions'):
+        if set_owner:
+            result += 'ALTER TABLE %s OWNER TO %s;\n' % (table, user)
+        result += 'GRANT ALL ON %s TO %s;\n' % (table, user)
+    return result
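+
+# For illustration, grant_schema('bob', set_owner=False) yields one GRANT
+# statement per system table (user name hypothetical):
+#
+#   GRANT ALL ON entities TO bob;
+#   GRANT ALL ON entities_id_seq TO bob;
+#   GRANT ALL ON transactions TO bob;
+#   GRANT ALL ON tx_entity_actions TO bob;
+#   GRANT ALL ON tx_relation_actions TO bob;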
+
+
+class BaseAuthentifier(object):
+
+    def __init__(self, source=None):
+        self.source = source
+
+    def set_schema(self, schema):
+        """set the instance'schema"""
+        pass
+
+class LoginPasswordAuthentifier(BaseAuthentifier):
+    passwd_rql = 'Any P WHERE X is CWUser, X login %(login)s, X upassword P'
+    auth_rql = (u'Any X WHERE X is CWUser, X login %(login)s, X upassword %(pwd)s, '
+                'X cw_source S, S name "system"')
+    _sols = ({'X': 'CWUser', 'P': 'Password', 'S': 'CWSource'},)
+
+    def set_schema(self, schema):
+        """set the instance'schema"""
+        if 'CWUser' in schema: # probably an empty schema if not true...
+            # rql syntax trees used to authenticate users
+            self._passwd_rqlst = self.source.compile_rql(self.passwd_rql, self._sols)
+            self._auth_rqlst = self.source.compile_rql(self.auth_rql, self._sols)
+
+    def authenticate(self, cnx, login, password=None, **kwargs):
+        """return CWUser eid for the given login/password if this account is
+        defined in this source, else raise `AuthenticationError`
+
+        two queries are needed since passwords are stored as salted hashes: we
+        have to fetch the stored hash first in order to reuse its salt
+        """
+        args = {'login': login, 'pwd' : None}
+        if password is not None:
+            rset = self.source.syntax_tree_search(cnx, self._passwd_rqlst, args)
+            try:
+                pwd = rset[0][0]
+            except IndexError:
+                raise AuthenticationError('bad login')
+            if pwd is None:
+                # if pwd is None but a password is provided, something is wrong
+                raise AuthenticationError('bad password')
+            # passwords are stored using the Bytes type, so we get a Binary
+            # (file-like) object back
+            args['pwd'] = Binary(crypt_password(password, pwd.getvalue()))
+        # get eid from login and (crypted) password
+        rset = self.source.syntax_tree_search(cnx, self._auth_rqlst, args)
+        pwd = args['pwd']
+        try:
+            user = rset[0][0]
+            # If the stored hash uses a deprecated scheme (e.g. DES or MD5 used
+            # before 3.14.7), update with a fresh one
+            if pwd is not None and pwd.getvalue():
+                verify, newhash = verify_and_update(password, pwd.getvalue())
+                if not verify: # should not happen, but...
+                    raise AuthenticationError('bad password')
+                if newhash:
+                    cnx.system_sql("UPDATE %s SET %s=%%(newhash)s WHERE %s=%%(login)s" % (
+                                        SQL_PREFIX + 'CWUser',
+                                        SQL_PREFIX + 'upassword',
+                                        SQL_PREFIX + 'login'),
+                                       {'newhash': self.source._binary(newhash.encode('ascii')),
+                                        'login': login})
+                    cnx.commit()
+            return user
+        except IndexError:
+            raise AuthenticationError('bad password')
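+
+    # Typical use (sketch; `source`, `cnx` and the credentials are
+    # illustrative -- in practice the native source tries each of its
+    # registered authentifiers in turn):
+    #
+    #   authentifier = LoginPasswordAuthentifier(source)
+    #   authentifier.set_schema(source.schema)
+    #   user_eid = authentifier.authenticate(cnx, u'admin', password=u'secret')
+    #
+    # `authenticate` raises AuthenticationError when no CWUser matches the
+    # given credentials.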
+
+
+class EmailPasswordAuthentifier(BaseAuthentifier):
+    def authenticate(self, cnx, login, **authinfo):
+        # the email_auth flag prevents infinite recursion (the call to
+        # repo.check_auth_info at the end of this method may lead us here again)
+        if '@' not in login or authinfo.pop('email_auth', None):
+            raise AuthenticationError('not an email')
+        rset = cnx.execute('Any L WHERE U login L, U primary_email M, '
+                           'M address %(login)s', {'login': login},
+                           build_descr=False)
+        if rset.rowcount != 1:
+            raise AuthenticationError('unexisting email')
+        login = rset.rows[0][0]
+        authinfo['email_auth'] = True
+        return self.source.repo.check_auth_info(cnx, login, authinfo)
+
+
+class DatabaseIndependentBackupRestore(object):
+    """Helper class to perform db backend agnostic backup and restore
+
+    The backup and restore methods are used to dump / restore the
+    system database in a database independent format. The file is a
+    Zip archive containing the following files:
+
+    * format.txt: the format of the archive. Currently '1.1'
+    * tables.txt: list of filenames in the archive tables/ directory
+    * sequences.txt: list of filenames in the archive sequences/ directory
+    * numranges.txt: list of filenames in the archive numrange/ directory
+    * versions.txt: the list of cube versions from CWProperty
+    * tables/<tablename>.<chunkno>: pickled data
+    * sequences/<sequencename>: pickled data
+
+    The pickled data format for tables, numranges and sequences is a tuple of 3 elements:
+    * the table name
+    * a tuple of column names
+    * a list of rows (as tuples with one element per column)
+
+    Tables are saved in chunks across several files in order to keep
+    memory consumption reasonable.
+    """
+    blocksize = 100
+
+    def __init__(self, source):
+        """
+        :param source: an instance of the system source
+        """
+        self._source = source
+        self.logger = logging.getLogger('cubicweb.ctl')
+        self.logger.setLevel(logging.INFO)
+        self.logger.addHandler(logging.StreamHandler(sys.stdout))
+        self.schema = self._source.schema
+        self.dbhelper = self._source.dbhelper
+        self.cnx = None
+        self.cursor = None
+        self.sql_generator = sqlgen.SQLGenerator()
+
+    def get_connection(self):
+        return self._source.get_connection()
+
+    def backup(self, backupfile):
+        archive = zipfile.ZipFile(backupfile, 'w', allowZip64=True)
+        self.cnx = self.get_connection()
+        try:
+            self.cursor = self.cnx.cursor()
+            self.cursor.arraysize = 100
+            self.logger.info('writing metadata')
+            self.write_metadata(archive)
+            for seq in self.get_sequences():
+                self.logger.info('processing sequence %s', seq)
+                self.write_sequence(archive, seq)
+            for numrange in self.get_numranges():
+                self.logger.info('processing numrange %s', numrange)
+                self.write_numrange(archive, numrange)
+            for table in self.get_tables():
+                self.logger.info('processing table %s', table)
+                self.write_table(archive, table)
+        finally:
+            archive.close()
+            self.cnx.close()
+        self.logger.info('done')
+
+    def get_tables(self):
+        non_entity_tables = ['entities',
+                             'transactions',
+                             'tx_entity_actions',
+                             'tx_relation_actions',
+                             ]
+        etype_tables = []
+        relation_tables = []
+        prefix = 'cw_'
+        for etype in self.schema.entities():
+            eschema = self.schema.eschema(etype)
+            if eschema.final:
+                continue
+            etype_tables.append('%s%s' % (prefix, etype))
+        for rtype in self.schema.relations():
+            rschema = self.schema.rschema(rtype)
+            if rschema.final or rschema.inlined or rschema in VIRTUAL_RTYPES:
+                continue
+            relation_tables.append('%s_relation' % rtype)
+        return non_entity_tables + etype_tables + relation_tables
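+
+    # For a typical instance schema the returned list looks like (illustrative,
+    # non-exhaustive): ['entities', 'transactions', 'tx_entity_actions',
+    # 'tx_relation_actions', 'cw_CWUser', 'cw_CWGroup', 'owned_by_relation',
+    # 'in_group_relation', ...]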
+
+    def get_sequences(self):
+        return []
+
+    def get_numranges(self):
+        return ['entities_id_seq']
+
+    def write_metadata(self, archive):
+        archive.writestr('format.txt', '1.1')
+        archive.writestr('tables.txt', '\n'.join(self.get_tables()))
+        archive.writestr('sequences.txt', '\n'.join(self.get_sequences()))
+        archive.writestr('numranges.txt', '\n'.join(self.get_numranges()))
+        versions = self._get_versions()
+        versions_str = '\n'.join('%s %s' % (k, v)
+                                 for k, v in versions)
+        archive.writestr('versions.txt', versions_str)
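+
+    # versions.txt thus contains one "<pkey> <version>" pair per line, e.g.
+    # (version numbers are illustrative):
+    #
+    #   system.version.cubicweb 3.22.0
+    #   system.version.blog 1.10.0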
+
+    def write_sequence(self, archive, seq):
+        sql = self.dbhelper.sql_sequence_current_state(seq)
+        columns, rows_iterator = self._get_cols_and_rows(sql)
+        rows = list(rows_iterator)
+        serialized = self._serialize(seq, columns, rows)
+        archive.writestr('sequences/%s' % seq, serialized)
+
+    def write_numrange(self, archive, numrange):
+        sql = self.dbhelper.sql_numrange_current_state(numrange)
+        columns, rows_iterator = self._get_cols_and_rows(sql)
+        rows = list(rows_iterator)
+        serialized = self._serialize(numrange, columns, rows)
+        archive.writestr('numrange/%s' % numrange, serialized)
+
+    def write_table(self, archive, table):
+        nb_lines_sql = 'SELECT COUNT(*) FROM %s' % table
+        self.cursor.execute(nb_lines_sql)
+        rowcount = self.cursor.fetchone()[0]
+        sql = 'SELECT * FROM %s' % table
+        columns, rows_iterator = self._get_cols_and_rows(sql)
+        self.logger.info('number of rows: %d', rowcount)
+        blocksize = self.blocksize
+        if rowcount > 0:
+            for i, start in enumerate(range(0, rowcount, blocksize)):
+                rows = list(itertools.islice(rows_iterator, blocksize))
+                serialized = self._serialize(table, columns, rows)
+                archive.writestr('tables/%s.%04d' % (table, i), serialized)
+                self.logger.debug('wrote rows %d to %d (out of %d) to %s.%04d',
+                                  start, start+len(rows)-1,
+                                  rowcount,
+                                  table, i)
+        else:
+            rows = []
+            serialized = self._serialize(table, columns, rows)
+            archive.writestr('tables/%s.%04d' % (table, 0), serialized)
+
+    def _get_cols_and_rows(self, sql):
+        process_result = self._source.iter_process_result
+        self.cursor.execute(sql)
+        columns = (d[0] for d in self.cursor.description)
+        rows = process_result(self.cursor)
+        return tuple(columns), rows
+
+    def _serialize(self, name, columns, rows):
+        return pickle.dumps((name, columns, rows), pickle.HIGHEST_PROTOCOL)
+
+    def restore(self, backupfile):
+        archive = zipfile.ZipFile(backupfile, 'r', allowZip64=True)
+        self.cnx = self.get_connection()
+        self.cursor = self.cnx.cursor()
+        sequences, numranges, tables, table_chunks = self.read_metadata(archive, backupfile)
+        for seq in sequences:
+            self.logger.info('restoring sequence %s', seq)
+            self.read_sequence(archive, seq)
+        for numrange in numranges:
+            self.logger.info('restoring numrange %s', numrange)
+            self.read_numrange(archive, numrange)
+        for table in tables:
+            self.logger.info('restoring table %s', table)
+            self.read_table(archive, table, sorted(table_chunks[table]))
+        self.cnx.close()
+        archive.close()
+        self.logger.info('done')
+
+    def read_metadata(self, archive, backupfile):
+        # ZipFile.read() returns bytes; decode so the comparisons below work on
+        # both python 2 and 3
+        formatinfo = archive.read('format.txt').decode('ascii')
+        self.logger.info('checking metadata')
+        if formatinfo.strip() != "1.1":
+            self.logger.critical('Unsupported format in archive: %s', formatinfo)
+            raise ValueError('Unknown format in %s: %s' % (backupfile, formatinfo))
+        tables = archive.read('tables.txt').decode('ascii').splitlines()
+        sequences = archive.read('sequences.txt').decode('ascii').splitlines()
+        numranges = archive.read('numranges.txt').decode('ascii').splitlines()
+        file_versions = self._parse_versions(
+            archive.read('versions.txt').decode('ascii'))
+        versions = set(self._get_versions())
+        if file_versions != versions:
+            self.logger.critical('Unable to restore: versions do not match')
+            self.logger.critical('Expected:\n%s', '\n'.join('%s : %s' % (cube, ver)
+                                                            for cube, ver in sorted(versions)))
+            self.logger.critical('Found:\n%s', '\n'.join('%s : %s' % (cube, ver)
+                                                         for cube, ver in sorted(file_versions)))
+            raise ValueError('Unable to restore: versions do not match')
+        table_chunks = {}
+        for name in archive.namelist():
+            if not name.startswith('tables/'):
+                continue
+            filename = basename(name)
+            tablename, _ext = filename.rsplit('.', 1)
+            table_chunks.setdefault(tablename, []).append(name)
+        return sequences, numranges, tables, table_chunks
+
+    def read_sequence(self, archive, seq):
+        seqname, columns, rows = pickle.loads(archive.read('sequences/%s' % seq))
+        assert seqname == seq
+        assert len(rows) == 1
+        assert len(rows[0]) == 1
+        value = rows[0][0]
+        sql = self.dbhelper.sql_restart_sequence(seq, value)
+        self.cursor.execute(sql)
+        self.cnx.commit()
+
+    def read_numrange(self, archive, numrange):
+        rangename, columns, rows = pickle.loads(archive.read('numrange/%s' % numrange))
+        assert rangename == numrange
+        assert len(rows) == 1
+        assert len(rows[0]) == 1
+        value = rows[0][0]
+        sql = self.dbhelper.sql_restart_numrange(numrange, value)
+        self.cursor.execute(sql)
+        self.cnx.commit()
+
+    def read_table(self, archive, table, filenames):
+        merge_args = self._source.merge_args
+        self.cursor.execute('DELETE FROM %s' % table)
+        self.cnx.commit()
+        row_count = 0
+        for filename in filenames:
+            tablename, columns, rows = pickle.loads(archive.read(filename))
+            assert tablename == table
+            if not rows:
+                continue
+            insert = self.sql_generator.insert(table,
+                                               dict(zip(columns, rows[0])))
+            for row in rows:
+                self.cursor.execute(insert, merge_args(dict(zip(columns, row)), {}))
+            row_count += len(rows)
+            self.cnx.commit()
+        self.logger.info('inserted %d rows', row_count)
+
+
+    def _parse_versions(self, version_str):
+        versions = set()
+        for line in version_str.splitlines():
+            versions.add(tuple(line.split()))
+        return versions
+
+    def _get_versions(self):
+        version_sql = 'SELECT cw_pkey, cw_value FROM cw_CWProperty'
+        versions = []
+        self.cursor.execute(version_sql)
+        for pkey, value in self.cursor.fetchall():
+            if pkey.startswith(u'system.version'):
+                versions.append((pkey, value))
+        return versions