dataimport/massive_store.py
author Julien Cristau <julien.cristau@logilab.fr>
Mon, 09 Nov 2015 15:51:02 +0100
changeset 10872 ff4f94cfa2fb
parent 10871 1d4a94d04ec6
child 10873 0611466ce367
permissions -rw-r--r--
[dataimport] turn eids_seq_{start,range} into class attributes Only the master-mode store will actually use eids_seq_start to reset the eid sequence.
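For illustration only, a hypothetical subclass (not part of this changeset) could pin the sequence start and batch size by overriding the new class attributes:

    class MyMassiveStore(MassiveObjectStore):
        eids_seq_start = 0       # master-mode store resets entities_id_seq to start at 1
        eids_seq_range = 100000  # reserve eids in batches of 100000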

# coding: utf-8
# copyright 2015 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
#
# CubicWeb is free software: you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# CubicWeb is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.

import logging
from datetime import datetime
from collections import defaultdict
from io import StringIO

from six.moves import range

from yams.constraints import SizeConstraint

from psycopg2 import ProgrammingError

from cubicweb.dataimport import stores, pgstore
from cubicweb.utils import make_uid
from cubicweb.server.sqlutils import SQL_PREFIX


class MassiveObjectStore(stores.RQLObjectStore):
    """
    Store for massive import of data, with delayed insertion of meta data.

    WARNINGS:
    - This store may only be used with PostgreSQL for now, as it relies
      on the COPY FROM method, and on specific PostgreSQL tables to get all
      the indexes.
    - This store can only insert relations that are not inlined (i.e.,
      which do *not* have inlined=True in their definition in the schema).

    It should be used as follows:

       store = MassiveObjectStore(cnx)
       store.init_rtype_table('Person', 'lives_in', 'Location')
       ...

       store.create_entity('Person', subj_iid_attribute=person_iid, ...)
       store.create_entity('Location', obj_iid_attribute=location_iid, ...)
       ...

       # subj_iid_attribute and obj_iid_attribute are argument names
       # chosen by the user (e.g. "cwuri"). These names can be identical.
       # person_iid and location_iid are unique IDs and depend on the data
       # (e.g URI).
       store.flush()
       store.relate_by_iid(person_iid, 'lives_in', location_iid)
       # For example:
       store.create_entity('Person',
                           cwuri='http://dbpedia.org/toto',
                           name='Toto')
       store.create_entity('Location',
                           uri='http://geonames.org/11111',
                           name='Somewhere')
       store.flush()
       store.relate_by_iid('http://dbpedia.org/toto',
                           'lives_in',
                           'http://geonames.org/11111')
       # Finally
       store.flush_meta_data()
       store.convert_relations('Person', 'lives_in', 'Location',
                               'subj_iid_attribute', 'obj_iid_attribute')
       # For the previous example:
       store.convert_relations('Person', 'lives_in', 'Location', 'cwuri', 'uri')
       ...
       store.cleanup()
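
       Entities and relations can also be added through the generic store
       API once their eids are known; a minimal sketch (attribute names are
       illustrative):

       eid_p = store.prepare_insert_entity('Person', name='Toto')
       eid_l = store.prepare_insert_entity('Location', name='Somewhere')
       store.prepare_insert_relation(eid_p, 'lives_in', eid_l)
       store.flush()
       store.flush_meta_data()
       store.finish()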
    """
    # size of eid range reserved by the store for each batch
    eids_seq_range = 10000
    # initial eid (None means use the value in the db)
    eids_seq_start = None

    def __init__(self, cnx, autoflush_metadata=True,
                 commit_at_flush=True,
                 iid_maxsize=1024, uri_param_name='rdf:about',
                 on_commit_callback=None, on_rollback_callback=None,
                 slave_mode=False,
                 source=None):
        """ Create a MassiveObject store, with the following attributes:

        - cnx: CubicWeb cnx
        - autoflush_metadata: Boolean.
                              Automatically flush the metadata after
                              each flush()
        - commit_at_flush: Boolean. Commit after each flush().
        - iid_maxsize: Int. Max size of the iid, used to create the
                    iid_eid convertion table.
        - uri_param_name: String. If given, will use this parameter to get cw_uri
                          for entities.
        """
        super(MassiveObjectStore, self).__init__(cnx)
        self._cnx = cnx
        self.sql = cnx.system_sql
        self.iid_maxsize = iid_maxsize
        self.commit_at_flush = commit_at_flush
        self._data_uri_relations = defaultdict(list)
        self._initialized = {'init_uri_eid': set(),
                             'uri_eid_inserted': set(),
                             'uri_rtypes': set(),
                             'entities': set(),
                             'rtypes': set(),
                            }
        self.logger = logging.getLogger('dataio.massiveimport')
        self.autoflush_metadata = autoflush_metadata
        self.slave_mode = slave_mode
        self.size_constraints = get_size_constraints(cnx.vreg.schema)
        self.default_values = get_default_values(cnx.vreg.schema)
        pg_schema = cnx.repo.config.system_source_config.get('db-namespace', 'public')
        self._dbh = PGHelper(self._cnx, pg_schema)
        self._data_entities = defaultdict(list)
        self._data_relations = defaultdict(list)
        self._now = datetime.now()
        self._default_cwuri = make_uid('_auto_generated')
        self.uri_param_name = uri_param_name
        self._count_cwuri = 0
        self.on_commit_callback = on_commit_callback
        self.on_rollback_callback = on_rollback_callback
        # Initialize the dataio meta tables for warm restart
        self._init_dataio_metatables()
        # If an initial eid is given, restart the entities_id_seq from it (master mode only)
        if self.eids_seq_start is not None and not self.slave_mode:
            self._cnx.system_sql(self._cnx.repo.system_source.dbhelper.sql_restart_numrange(
                'entities_id_seq', initial_value=self.eids_seq_start + 1))
            cnx.commit()
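        # get_next_eid draws eids from a generator that pre-allocates them by
        # batches of eids_seq_range (see _get_eid_gen below)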
        self.get_next_eid = lambda g=self._get_eid_gen(): next(g)
        # the dropped constraints are recreated when self.finish() is called
        if not self.slave_mode:
            self._drop_metatables_constraints()
        if source is None:
            source = cnx.repo.system_source
        self.source = source
        self._etype_eid_idx = dict(cnx.execute('Any XN,X WHERE X is CWEType, X name XN'))
        cnx.read_security = False
        cnx.write_security = False

    ### INIT FUNCTIONS ########################################################

    def init_rtype_table(self, etype_from, rtype, etype_to):
        """ Build temporary table a for standard rtype """
        # Create an uri_eid table for each etype for a better
        # control of which etype is concerns for a particular
        # possibly multivalued relation.
        for etype in (etype_from, etype_to):
            if etype and etype not in self._initialized['init_uri_eid']:
                self._init_uri_eid_table(etype)
        if rtype not in self._initialized['uri_rtypes']:
            # Create the temporary tables
            if not self._cnx.repo.schema.rschema(rtype).inlined:
                try:
                    sql = 'CREATE TABLE %(r)s_relation_iid_tmp (uri_from character ' \
                          'varying(%(s)s), uri_to character varying(%(s)s))'
                    self.sql(sql % {'r': rtype, 's': self.iid_maxsize})
                except ProgrammingError:
                    # XXX Already exists (probably due to multiple imports)
                    pass
            else:
                self.logger.warning("inlined relation %s: cannot insert it", rtype)
            # Add it to the initialized set
            self._initialized['uri_rtypes'].add(rtype)

    def _init_uri_eid_table(self, etype):
        """ Build a temporary table for id/eid convertion
        """
        try:
            sql = "CREATE TABLE uri_eid_%(e)s (uri character varying(%(size)s), eid integer)"
            self.sql(sql % {'e': etype.lower(), 'size': self.iid_maxsize,})
        except ProgrammingError:
            # XXX Already exists (probably due to multiple imports)
            pass
        # Add it to the initialized set
        self._initialized['init_uri_eid'].add(etype)

    def _init_dataio_metatables(self):
        """ Initialized the meta tables of dataio for warm restart
        """
        # Check if dataio tables are not already created (i.e. a restart)
        self._initialized_table_created = self._dbh.table_exists('dataio_initialized')
        self._constraint_table_created = self._dbh.table_exists('dataio_constraints')
        self._metadata_table_created = self._dbh.table_exists('dataio_metadata')

    ### RELATE FUNCTION #######################################################

    def relate_by_iid(self, iid_from, rtype, iid_to):
        """Add new relation based on the internal id (iid)
        of the entities (not the eid)"""
        # Push data
        if isinstance(iid_from, unicode):
            iid_from = iid_from.encode('utf-8')
        if isinstance(iid_to, unicode):
            iid_to = iid_to.encode('utf-8')
        self._data_uri_relations[rtype].append({'uri_from': iid_from, 'uri_to': iid_to})

    ### FLUSH FUNCTIONS #######################################################

    def flush_relations(self):
        """ Flush the relations data
        """
        for rtype, data in self._data_uri_relations.items():
            if not data:
                self.logger.info('No data for rtype %s', rtype)
                continue
            buf = StringIO('\n'.join(['%(uri_from)s\t%(uri_to)s' % d for d in data]))
            cursor = self._cnx.cnxset.cu
            if not self._cnx.repo.schema.rschema(rtype).inlined:
                cursor.copy_from(buf, '%s_relation_iid_tmp' % rtype.lower(),
                                 null='NULL', columns=('uri_from', 'uri_to'))
            else:
                self.logger.warning("inlined relation %s: cannot insert it", rtype)
            buf.close()
            # Clear data cache
            self._data_uri_relations[rtype] = []
            # Commit if asked
            if self.commit_at_flush:
                self.commit()

    def fill_uri_eid_table(self, etype, uri_label):
        """ Fill the uri_eid table
        """
        self.logger.info('Fill uri_eid for etype %s', etype)
        sql = 'INSERT INTO uri_eid_%(e)s SELECT cw_%(l)s, cw_eid FROM cw_%(e)s'
        self.sql(sql % {'l': uri_label, 'e': etype.lower()})
        # Add indexes
        self.sql('CREATE INDEX uri_eid_%(e)s_idx ON uri_eid_%(e)s (uri)' % {'e': etype.lower()})
        # Set the etype as converted
        self._initialized['uri_eid_inserted'].add(etype)
        self.commit()

    def convert_relations(self, etype_from, rtype, etype_to,
                          uri_label_from='cwuri', uri_label_to='cwuri'):
        """ Flush the converted relations
        """
        # Always flush relations to be sure
        self.logger.info('Convert relations %s %s %s', etype_from, rtype, etype_to)
        self.flush_relations()
        if uri_label_from and etype_from not in self._initialized['uri_eid_inserted']:
            self.fill_uri_eid_table(etype_from, uri_label_from)
        if uri_label_to and etype_to not in self._initialized['uri_eid_inserted']:
            self.fill_uri_eid_table(etype_to, uri_label_to)
        if self._cnx.repo.schema.rschema(rtype).inlined:
            self.logger.warning("Can't insert inlined relation %s", rtype)
            return
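        # Resolve iids to eids through the uri_eid_<etype> tables. When no
        # uri_label is given for one side, the corresponding uri_from/uri_to
        # column is assumed to already hold an eid and is cast to INTEGER.
        # The NOT EXISTS clause skips relations that already exist.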
        if uri_label_from and uri_label_to:
            sql = '''INSERT INTO %(r)s_relation (eid_from, eid_to) SELECT DISTINCT O1.eid, O2.eid
            FROM %(r)s_relation_iid_tmp AS T, uri_eid_%(ef)s as O1, uri_eid_%(et)s as O2
            WHERE O1.uri=T.uri_from AND O2.uri=T.uri_to AND NOT EXISTS (
            SELECT 1 FROM %(r)s_relation AS TT WHERE TT.eid_from=O1.eid AND TT.eid_to=O2.eid);
            '''
        elif uri_label_to:
            sql = '''INSERT INTO %(r)s_relation (eid_from, eid_to) SELECT DISTINCT
            CAST(T.uri_from AS INTEGER), O1.eid
            FROM %(r)s_relation_iid_tmp AS T, uri_eid_%(et)s as O1
            WHERE O1.uri=T.uri_to AND NOT EXISTS (
            SELECT 1 FROM %(r)s_relation AS TT WHERE
            TT.eid_from=CAST(T.uri_from AS INTEGER) AND TT.eid_to=O1.eid);
            '''
        elif uri_label_from:
            sql = '''INSERT INTO %(r)s_relation (eid_from, eid_to) SELECT DISTINCT
            O1.eid, CAST(T.uri_to AS INTEGER)
            FROM %(r)s_relation_iid_tmp AS T, uri_eid_%(ef)s as O1
            WHERE O1.uri=T.uri_from AND NOT EXISTS (
            SELECT 1 FROM %(r)s_relation AS TT WHERE
            TT.eid_from=O1.eid AND TT.eid_to=CAST(T.uri_to AS INTEGER));
            '''
        try:
            self.sql(sql % {'r': rtype.lower(),
                            'et': etype_to.lower() if etype_to else u'',
                            'ef': etype_from.lower() if etype_from else u''})
        except Exception as ex:
            self.logger.error("Can't insert relation %s: %s", rtype, ex)
        self.commit()

    ### SQL UTILITIES #########################################################

    def drop_and_store_indexes_constraints(self, tablename):
        # Drop indexes and constraints
        if not self._constraint_table_created:
            # Create a table to save the constraints
            # Allow reload even after crash
            sql = "CREATE TABLE dataio_constraints (origtable text, query text, type varchar(256))"
            self.sql(sql)
            self._constraint_table_created = True
        self._drop_table_constraints_indexes(tablename)

    def _drop_table_constraints_indexes(self, tablename):
        """ Drop and store table constraints and indexes """
        indexes, constraints = self._dbh.application_indexes_constraints(tablename)
        for name, query in constraints.items():
            sql = 'INSERT INTO dataio_constraints VALUES (%(e)s, %(c)s, %(t)s)'
            self.sql(sql, {'e': tablename, 'c': query, 't': 'constraint'})
            sql = 'ALTER TABLE %s DROP CONSTRAINT %s CASCADE' % (tablename, name)
            self.sql(sql)
        for name, query in indexes.items():
            sql = 'INSERT INTO dataio_constraints VALUES (%(e)s, %(c)s, %(t)s)'
            self.sql(sql, {'e': tablename, 'c': query, 't': 'index'})
            sql = 'DROP INDEX %s' % name
            self.sql(sql)

    def reapply_constraint_index(self, tablename):
        if not self._dbh.table_exists('dataio_constraints'):
            self.logger.info('The table dataio_constraints does not exist '
                             '(keep_index option should be True)')
            return
        sql = 'SELECT query FROM dataio_constraints WHERE origtable = %(e)s'
        crs = self.sql(sql, {'e': tablename})
        for query, in crs.fetchall():
            self.sql(query)
            self.sql('DELETE FROM dataio_constraints WHERE origtable = %(e)s '
                     'AND query = %(q)s', {'e': tablename, 'q': query})

    def _drop_metatables_constraints(self):
        """ Drop all the constraints for the meta data"""
        for tablename in ('created_by_relation', 'owned_by_relation',
                          'is_instance_of_relation', 'is_relation',
                          'entities'):
            self.drop_and_store_indexes_constraints(tablename)

    def _create_metatables_constraints(self):
        """ Create all the constraints for the meta data"""
        for tablename in ('entities',
                          'created_by_relation', 'owned_by_relation',
                          'is_instance_of_relation', 'is_relation'):
            # Indexes and constraints
            self.reapply_constraint_index(tablename)

    def init_relation_table(self, rtype):
        """ Get and remove all indexes for performance sake """
        # Create temporary table
        if not self.slave_mode and rtype not in self._initialized['rtypes']:
            sql = "CREATE TABLE %s_relation_tmp (eid_from integer, eid_to integer)" % rtype.lower()
            self.sql(sql)
            # Drop indexes and constraints
            tablename = '%s_relation' % rtype.lower()
            self.drop_and_store_indexes_constraints(tablename)
            # Push the etype in the initialized table for easier restart
            self.init_create_initialized_table()
            sql = 'INSERT INTO dataio_initialized VALUES (%(e)s, %(t)s)'
            self.sql(sql, {'e': rtype, 't': 'rtype'})
            # Mark rtype as "initialized" for faster check
            self._initialized['rtypes'].add(rtype)

    def init_create_initialized_table(self):
        """ Create the dataio initialized table
        """
        if not self._initialized_table_created:
            sql = "CREATE TABLE dataio_initialized (retype text, type varchar(128))"
            self.sql(sql)
            self._initialized_table_created = True

    def init_etype_table(self, etype):
        """ Add eid sequence to a particular etype table and
        remove all indexes for performance sake """
        if etype not in self._initialized['entities']:
            # Only for non-initialized etype and not slave mode store
            if not self.slave_mode:
                if self.eids_seq_range is None:
                    # Eids are directly set by the entities_id_seq.
                    # We attach this sequence to all the created etypes.
                    sql = ("ALTER TABLE cw_%s ALTER COLUMN cw_eid "
                           "SET DEFAULT nextval('entities_id_seq')" % etype.lower())
                    self.sql(sql)
                # Drop indexes and constraints
                tablename = 'cw_%s' % etype.lower()
                self.drop_and_store_indexes_constraints(tablename)
                # Push the etype in the initialized table for easier restart
                self.init_create_initialized_table()
                sql = 'INSERT INTO dataio_initialized VALUES (%(e)s, %(t)s)'
                self.sql(sql, {'e': etype, 't': 'etype'})
            # Mark etype as "initialized" for faster check
            self._initialized['entities'].add(etype)


    ### ENTITIES CREATION #####################################################

    def _get_eid_gen(self):
        """ Function getting the next eid. This is done by preselecting
        a given number of eids from the 'entities_id_seq', and then
        storing them"""
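        # e.g. with eids_seq_range = 10000, if create_eid() advances the
        # sequence to 50000, eids 40001..50000 are yielded before reserving
        # a new batch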
        while True:
            last_eid = self._cnx.repo.system_source.create_eid(self._cnx, self.eids_seq_range)
            for eid in range(last_eid - self.eids_seq_range + 1, last_eid + 1):
                yield eid

    def _apply_size_constraints(self, etype, kwargs):
        """Apply the size constraints for a given etype, attribute and value."""
        size_constraints = self.size_constraints[etype]
        for attr, value in kwargs.items():
            if value:
                maxsize = size_constraints.get(attr)
                if maxsize is not None and len(value) > maxsize:
                    kwargs[attr] = value[:maxsize-4] + '...'

    def _apply_default_values(self, etype, kwargs):
        """Apply the default values for a given etype, attribute and value."""
        default_values = self.default_values[etype]
        missing_keys = set(default_values) - set(kwargs)
        kwargs.update((key, default_values[key]) for key in missing_keys)

    # store api ################################################################

    def prepare_insert_entity(self, etype, **kwargs):
        """Given an entity type, attributes and inlined relations, returns the inserted entity's
        eid.
        """
        # Init the table if necessary
        self.init_etype_table(etype)
        # Add meta data if not given
        if 'modification_date' not in kwargs:
            kwargs['modification_date'] = self._now
        if 'creation_date' not in kwargs:
            kwargs['creation_date'] = self._now
        if 'cwuri' not in kwargs:
            if self.uri_param_name and self.uri_param_name in kwargs:
                kwargs['cwuri'] = kwargs[self.uri_param_name]
            else:
                kwargs['cwuri'] = self._default_cwuri + str(self._count_cwuri)
                self._count_cwuri += 1
        if 'eid' not in kwargs and self.eids_seq_range is not None:
            # If eid is not given and the eids sequence is set,
            # use the value from the sequence
            kwargs['eid'] = self.get_next_eid()
        self._apply_size_constraints(etype, kwargs)
        self._apply_default_values(etype, kwargs)
        self._data_entities[etype].append(kwargs)
        return kwargs.get('eid')

    def prepare_insert_relation(self, eid_from, rtype, eid_to, **kwargs):
        """Insert into the database a  relation ``rtype`` between entities with eids ``eid_from``
        and ``eid_to``.
        """
        # Init the table if necessary
        self.init_relation_table(rtype)
        self._data_relations[rtype].append({'eid_from': eid_from, 'eid_to': eid_to})

    def flush(self):
        """Flush the data"""
        self.flush_entities()
        self.flush_internal_relations()
        self.flush_relations()

    def commit(self):
        """Commit the database transaction."""
        self.on_commit()
        super(MassiveObjectStore, self).commit()

    def finish(self):
        """Remove temporary tables and columns."""
        self.logger.info("Start cleaning")
        if self.slave_mode:
            raise RuntimeError('Store cleanup is not allowed in slave mode')
        self.logger.info("Start cleaning")
        # Cleanup relations tables
        for etype in self._initialized['init_uri_eid']:
            self.sql('DROP TABLE uri_eid_%s' % etype.lower())
        # Remove relations tables
        for rtype in self._initialized['uri_rtypes']:
            if not self._cnx.repo.schema.rschema(rtype).inlined:
                self.sql('DROP TABLE %(r)s_relation_iid_tmp' % {'r': rtype})
            else:
                self.logger.warning("inlined relation %s: no cleanup to be done for it" % rtype)
        self.commit()
        # Get all the initialized etypes/rtypes
        if self._dbh.table_exists('dataio_initialized'):
            crs = self.sql('SELECT retype, type FROM dataio_initialized')
            for retype, _type in crs.fetchall():
                self.logger.info('Cleanup for %s' % retype)
                if _type == 'etype':
                    # Cleanup entities tables - Recreate indexes
                    self._cleanup_entities(retype)
                elif _type == 'rtype':
                    # Cleanup relations tables
                    self._cleanup_relations(retype)
                self.sql('DELETE FROM dataio_initialized WHERE retype = %(e)s',
                         {'e': retype})
        # Create meta constraints (entities, is_instance_of, ...)
        self._create_metatables_constraints()
        self.commit()
        # Delete the meta data table
        for table_name in ('dataio_initialized', 'dataio_constraints', 'dataio_metadata'):
            if self._dbh.table_exists(table_name):
                self.sql('DROP TABLE %s' % table_name)
        self.commit()

    ### FLUSH #################################################################

    def on_commit(self):
        if self.on_commit_callback:
            self.on_commit_callback()

    def on_rollback(self, exc, etype, data):
        if self.on_rollback_callback:
            self.on_rollback_callback(exc, etype, data)
            self._cnx.rollback()
        else:
            raise exc

    def flush_internal_relations(self):
        """ Flush the relations data
        """
        for rtype, data in self._data_relations.items():
            if not data:
                # There is no data for this etype for this flush round.
                continue
            buf = pgstore._create_copyfrom_buffer(data, ('eid_from', 'eid_to'))
            if not buf:
                # The buffer is empty. This is probably due to error in _create_copyfrom_buffer
                raise ValueError('Error in buffer creation for rtype %s' % rtype)
            cursor = self._cnx.cnxset.cu
            # Push into the tmp table
            cursor.copy_from(buf, '%s_relation_tmp' % rtype.lower(),
                             null='NULL', columns=('eid_from', 'eid_to'))
            # Clear data cache
            self._data_relations[rtype] = []
            # Commit if asked
            if self.commit_at_flush:
                self.commit()

    def flush_entities(self):
        """ Flush the entities data
        """
        for etype, data in self._data_entities.items():
            if not data:
                # There is no data for this etype for this flush round.
                continue
            # XXX It may be interesting to directly infer the columns'
            # names from the schema instead of using the rows' keys.
            # For now, _create_copyfrom_buffer does a "row[column]" which can
            # lead to a KeyError, so build dictionaries with all the keys.
            columns = set()
            for d in data:
                columns.update(d.keys())
            _data = []
            _base_data = dict.fromkeys(columns)
            for d in data:
                _d = _base_data.copy()
                _d.update(d)
                _data.append(_d)
            buf = pgstore._create_copyfrom_buffer(_data, columns)
            if not buf:
                # The buffer is empty. This is probably due to error in _create_copyfrom_buffer
                raise ValueError('Error in buffer creation for etype %s' % etype)
            columns = ['cw_%s' % attr for attr in columns]
            cursor = self._cnx.cnxset.cu
            try:
                cursor.copy_from(buf, 'cw_%s' % etype.lower(), null='NULL', columns=columns)
            except Exception as exc:
                self.on_rollback(exc, etype, data)
            # Clear data cache
            self._data_entities[etype] = []
        if self.autoflush_metadata:
            self.flush_meta_data()
        # Commit if asked
        if self.commit_at_flush:
            self.commit()

    def flush_meta_data(self):
        """ Flush the meta data (entities table, is_instance table, ...)
        """
        if self.slave_mode:
            raise RuntimeError('Flushing meta data is not allowed in slave mode')
        if not self._dbh.table_exists('dataio_initialized'):
            self.logger.info('No information available for initialized etypes/rtypes')
            return
        if not self._metadata_table_created:
            # Keep track of the etypes whose meta data have already been flushed
            sql = "CREATE TABLE dataio_metadata (etype text)"
            self.sql(sql)
            self._metadata_table_created = True
        crs = self.sql('SELECT etype FROM dataio_metadata')
        already_flushed = set(e for e, in crs.fetchall())
        crs = self.sql('SELECT retype FROM dataio_initialized WHERE type = %(t)s',
                       {'t': 'etype'})
        all_etypes = set(e for e, in crs.fetchall())
        for etype in all_etypes:
            if etype not in already_flushed:
                # Deals with meta data
                self.logger.info('Flushing meta data for %s' % etype)
                self.insert_massive_meta_data(etype)
                sql = 'INSERT INTO dataio_metadata VALUES (%(e)s)'
                self.sql(sql, {'e': etype})
        # Final commit
        self.commit()

    def _cleanup_entities(self, etype):
        """ Cleanup etype table """
        if self.eids_seq_range is None:
            # Remove DEFAULT eids sequence if added
            sql = 'ALTER TABLE cw_%s ALTER COLUMN cw_eid DROP DEFAULT;' % etype.lower()
            self.sql(sql)
        # Create indexes and constraints
        tablename = SQL_PREFIX + etype.lower()
        self.reapply_constraint_index(tablename)

    def _cleanup_relations(self, rtype):
        """ Cleanup rtype table """
        # Push into relation table while removing duplicate
        sql = '''INSERT INTO %(r)s_relation (eid_from, eid_to) SELECT DISTINCT
                 T.eid_from, T.eid_to FROM %(r)s_relation_tmp AS T
                 WHERE NOT EXISTS (SELECT 1 FROM %(r)s_relation AS TT WHERE
                 TT.eid_from=T.eid_from AND TT.eid_to=T.eid_to);''' % {'r': rtype}
        self.sql(sql)
        # Drop temporary relation table
        sql = ('DROP TABLE %(r)s_relation_tmp' % {'r': rtype.lower()})
        self.sql(sql)
        # Create indexes and constraints
        tablename = '%s_relation' % rtype.lower()
        self.reapply_constraint_index(tablename)

    def insert_massive_meta_data(self, etype):
        """ Massive insertion of meta data for a given etype, based on SQL statements.
        """
        # Push the meta data relations (created_by, owned_by, cw_source, is,
        # is_instance_of) and register the entities, skipping rows already
        # present in the entities table.
        self.metagen_push_relation(etype, self._cnx.user.eid, 'created_by_relation')
        self.metagen_push_relation(etype, self._cnx.user.eid, 'owned_by_relation')
        self.metagen_push_relation(etype, self.source.eid, 'cw_source_relation')
        self.metagen_push_relation(etype, self._etype_eid_idx[etype], 'is_relation')
        self.metagen_push_relation(etype, self._etype_eid_idx[etype], 'is_instance_of_relation')
        sql = ("INSERT INTO entities (eid, type, asource, extid) "
               "SELECT cw_eid, '%s', 'system', NULL FROM cw_%s "
               "WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)"
               % (etype, etype.lower()))
        self.sql(sql)

    def metagen_push_relation(self, etype, eid_to, rtype):
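        # e.g. for ('Person', self._cnx.user.eid, 'created_by_relation'), this
        # links every cw_person row not yet registered in the entities table
        # to the importing user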
        sql = ("INSERT INTO %s (eid_from, eid_to) SELECT cw_eid, %s FROM cw_%s "
               "WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)"
               % (rtype, eid_to, etype.lower()))
        self.sql(sql)


### CONSTRAINTS MANAGEMENT FUNCTIONS  ##########################################

def get_size_constraints(schema):
    """analyzes yams ``schema`` and returns the list of size constraints.

    The returned value is a dictionary mapping entity types to a
    sub-dictionnaries mapping attribute names -> max size.
    """
    size_constraints = {}
    # iterates on all entity types
    for eschema in schema.entities():
        # for each entity type, iterates on attribute definitions
        size_constraints[eschema.type] = eschema_constraints = {}
        for rschema, aschema in eschema.attribute_definitions():
            # for each attribute, if a size constraint is found,
            # append it to the size constraint list
            maxsize = None
            rdef = rschema.rdef(eschema, aschema)
            for constraint in rdef.constraints:
                if isinstance(constraint, SizeConstraint):
                    maxsize = constraint.max
                    eschema_constraints[rschema.type] = maxsize
    return size_constraints

def get_default_values(schema):
    """analyzes yams ``schema`` and returns the list of default values.

    The returned value is a dictionary mapping entity types to a
    sub-dictionnaries mapping attribute names -> default values.
    """
    default_values = {}
    # iterates on all entity types
    for eschema in schema.entities():
        # for each entity type, iterates on attribute definitions
        default_values[eschema.type] = eschema_constraints = {}
        for rschema, _ in eschema.attribute_definitions():
            # for each attribute, if a default value is defined,
            # add it to the default values dict
            if eschema.default(rschema.type) is not None:
                eschema_constraints[rschema.type] = eschema.default(rschema.type)
    return default_values


class PGHelper(object):
    def __init__(self, cnx, pg_schema='public'):
        self.cnx = cnx
        # Deals with pg schema, see #3216686
        self.pg_schema = pg_schema

    def application_indexes_constraints(self, tablename):
        """ Get all the indexes/constraints for a given tablename """
        indexes = self.application_indexes(tablename)
        constraints = self.application_constraints(tablename)
        _indexes = {}
        for name, query in indexes.items():
            # Remove pkey indexes (automatically created by constraints)
            # Specific cases of primary key, see #3224079
            if name not in constraints:
                _indexes[name] = query
        return _indexes, constraints

    def table_exists(self, table_name):
        sql = "SELECT * from information_schema.tables WHERE table_name=%(t)s AND table_schema=%(s)s"
        crs = self.cnx.system_sql(sql, {'t': table_name, 's': self.pg_schema})
        res = crs.fetchall()
        if res:
            return True
        return False

    # def check_if_primary_key_exists_for_table(self, table_name):
    #     sql = ("SELECT constraint_name FROM information_schema.table_constraints "
    #            "WHERE constraint_type = 'PRIMARY KEY' AND table_name=%(t)s AND table_schema=%(s)s")
    #     crs = self.cnx.system_sql(sql, {'t': table_name, 's': self.pg_schema})
    #     res = crs.fetchall()
    #     if res:
    #         return True
    #     return False

    def index_query(self, name):
        """Get the request to be used to recreate the index"""
        return self.cnx.system_sql("SELECT pg_get_indexdef(c.oid) "
                                   "from pg_catalog.pg_class c "
                                   "LEFT JOIN pg_catalog.pg_namespace n "
                                   "ON n.oid = c.relnamespace "
                                   "WHERE c.relname = %(r)s AND n.nspname=%(n)s",
                                   {'r': name, 'n': self.pg_schema}).fetchone()[0]

    def constraint_query(self, name):
        """Get the request to be used to recreate the constraint"""
        return self.cnx.system_sql("SELECT pg_get_constraintdef(c.oid) "
                                   "from pg_catalog.pg_constraint c "
                                   "LEFT JOIN pg_catalog.pg_namespace n "
                                   "ON n.oid = c.connamespace "
                                   "WHERE c.conname = %(r)s AND n.nspname=%(n)s",
                                   {'r': name, 'n': self.pg_schema}).fetchone()[0]

    def application_indexes(self, tablename):
        """ Iterate over all the indexes """
        # This SQL query (cf http://www.postgresql.org/message-id/432F450F.4080700@squiz.net)
        # aims at getting all the indexes for each table.
        sql = '''SELECT c.relname as "Name"
        FROM pg_catalog.pg_class c
        JOIN pg_catalog.pg_index i ON i.indexrelid = c.oid
        JOIN pg_catalog.pg_class c2 ON i.indrelid = c2.oid
        LEFT JOIN pg_catalog.pg_user u ON u.usesysid = c.relowner
        LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
        WHERE c.relkind IN ('i','')
        AND c2.relname = '%s'
        AND n.nspname NOT IN ('pg_catalog', 'pg_toast')
        AND pg_catalog.pg_table_is_visible(c.oid);''' % tablename
        indexes_list = self.cnx.system_sql(sql).fetchall()
        indexes = {}
        for name, in indexes_list:
            indexes[name] = self.index_query(name)
        return indexes

    def application_constraints(self, tablename):
        """ Iterate over all the constraints """
        sql = '''SELECT i.conname as "Name"
                 FROM pg_catalog.pg_class c JOIN pg_catalog.pg_constraint i
                 ON i.conrelid = c.oid JOIN pg_catalog.pg_class c2 ON i.conrelid=c2.oid
                 LEFT JOIN pg_catalog.pg_user u ON u.usesysid = c.relowner
                 LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
                 WHERE c2.relname = '%s' AND n.nspname NOT IN ('pg_catalog', 'pg_toast')
                 AND pg_catalog.pg_table_is_visible(c.oid);''' % tablename
        indexes_list = self.cnx.system_sql(sql).fetchall()
        constraints = {}
        for name, in indexes_list:
            query = self.constraint_query(name)
            constraints[name] = 'ALTER TABLE %s ADD CONSTRAINT %s %s' % (tablename, name, query)
        return constraints