author Denis Laxalde <>
Tue, 21 Feb 2017 11:04:19 +0100
changeset 12141 29d032bb70d8
parent 11871 5f71460236a4
child 12171 970c32a4c7b7
child 12195 81cf4cf60411
permissions -rw-r--r--
Add a "Contributing" section to README with patch submission guidelines. For the CubicWeb project and its dependencies, we now prefer patch submission and review by email on a public mailing list. We are thus moving away from the previous vcreview-based workflow taking place on the forge. This change is motivated by the following points: - the current reviewer assignment mechanism (pick a random reviewer, rely on reviewer availability rather than on willingness to review, send related patches to distinct people, etc.) is inefficient, if not counter-productive; - most of the time, discussion only happens between the patch submitter and a reviewer, with no easy way to widen the audience; - cubicweb-vcreview has no concept of patch series; - cubicweb-vcreview is not actively maintained anymore and its usability keeps deteriorating. We expect that email-based submission and review of patches will circumvent these limitations. Anybody interested in the project is welcome to subscribe to the mailing list and participate in the review process. This patch documents the basic workflow of patch submission by email.

# coding: utf-8
# copyright 2015-2016 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
# This file is part of CubicWeb.
# CubicWeb is free software: you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
# CubicWeb is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
# details.
# You should have received a copy of the GNU Lesser General Public License along
# with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.

from collections import defaultdict
from itertools import chain
import logging
from uuid import uuid4

from six import text_type
from six.moves import range

from cubicweb.dataimport import stores, pgstore
from cubicweb.server.schema2sql import eschema_sql_def

class MassiveObjectStore(stores.RQLObjectStore):
    """Store for massive import of data, with delayed insertion of meta data.

    WARNINGS:

    - This store may only be used with PostgreSQL for now, as it relies
      on the COPY FROM method, and on specific PostgreSQL tables to get all
      the indexes.

    - This store can only insert relations that are not inlined (i.e.,
      which do *not* have inlined=True in their definition in the schema),
      unless they are specified as entity attributes.

    It should be used as follows:

       store = MassiveObjectStore(cnx)
       eid_p = store.prepare_insert_entity('Person',
                                           name=u'Toto')
       eid_loc = store.prepare_insert_entity('Location',
                                             name=u'Somewhere')
       store.prepare_insert_relation(eid_p, 'lives_in', eid_loc)
       store.flush()
       ...
       store.commit()
       store.finish()

    Data given to the `prepare_insert_*` methods is buffered in memory, then copied by
    :meth:`flush` into temporary tables whose names are suffixed with the store's `uuid`.
    :meth:`finish` moves it into the actual tables and inserts the associated meta data.

    Full-text indexation is not handled, you'll have to reindex the proper entity types by yourself
    if desired.
    """

    def __init__(self, cnx, slave_mode=False, eids_seq_range=10000, metagen=None):
        """Create a MassiveObject store, with the following arguments:

        - `cnx`, a connection to the repository
        - `slave_mode`, if True, the store won't call :meth:`master_init` by itself and may not
          call :meth:`finish` (this is left to the master store)
        - `eids_seq_range`: size of eid range reserved by the store for each batch
        - `metagen`, optional :class:`MetadataGenerator` instance
        """
        super(MassiveObjectStore, self).__init__(cnx)

        # identifier of this store, used to suffix the names of its temporary tables
        self.uuid = text_type(uuid4()).replace('-', '')
        self.slave_mode = slave_mode
        if metagen is None:
            metagen = stores.MetadataGenerator(cnx)
        self.metagen = metagen

        self.logger = logging.getLogger('dataimport.massive_store')
        self.sql = cnx.system_sql
        self.schema = cnx.vreg.schema
        # {etype: {attribute name: schema default value}}
        self.default_values = get_default_values(self.schema)
        # the generator is bound once as the lambda's default argument, so every call shares it
        self.get_next_eid = lambda g=self._get_eid_gen(eids_seq_range): next(g)
        self._source_dbhelper = cnx.repo.system_source.dbhelper
        self._dbh = PGHelper(cnx)

        # in-memory buffers, emptied by `flush`
        self._data_entities = defaultdict(list)
        self._data_relations = defaultdict(list)
        # keys are the entity/relation types whose temporary table has been created (plus the
        # store itself once `master_init` ran); entity types map to their list of attribute names
        self._initialized = {}

    def _get_eid_gen(self, eids_seq_range):
        """Generate eids forever, reserving them by batches of `eids_seq_range` from the
        'entities_id_seq' sequence of the system source.
        """
        while True:
            # `create_eid` is expected to return the last eid of the newly reserved range
            last_eid = self._cnx.repo.system_source.create_eid(self._cnx, eids_seq_range)
            for eid in range(last_eid - eids_seq_range + 1, last_eid + 1):
                yield eid

    # master/slaves specific API

    def master_init(self, commit=True):
        """Initialize database for massive insertion.

        This is expected to be called once, by the master store in master/slaves configuration.
        It creates the `cwmassive_initialized` table, in which each store records the temporary
        tables it created, and commits if `commit` is true.
        """
        assert not self.slave_mode
        if self not in self._initialized:
            self.sql('CREATE TABLE cwmassive_initialized'
                     '(retype text, type varchar(128), uuid varchar(32))')
            self._initialized[self] = None
            if commit:
                self.commit()

    # SQL utilities #########################################################

    def _drop_metadata_constraints(self):
        """Drop constraints and indexes for the metadata tables.

        They will be recreated by the `finish` method.
        """
        rtypes = [rtype for rtype in self.metagen.meta_relations
                  if not self.schema.rschema(rtype).final]
        rtypes += ('is_instance_of', 'is', 'cw_source')
        for rtype in rtypes:
            # constraints first: dropping them also drops the indexes backing them
            self._dbh.drop_constraints(rtype + '_relation')
            self._dbh.drop_indexes(rtype + '_relation')
        # don't drop constraints for the entities table, the only one is the primary key's index on
        # eid and we want to keep it

    def restart_eid_sequence(self, start_eid):
        """Restart the 'entities_id_seq' eid sequence at `start_eid`.

        Eids already reserved by :meth:`get_next_eid` for the current batch are not discarded.
        """
        self.sql(self._source_dbhelper.sql_restart_numrange(
            'entities_id_seq', initial_value=start_eid))

    # store api ################################################################

    def prepare_insert_entity(self, etype, **data):
        """Given an entity type, attributes and inlined relations, returns the inserted entity's
        eid.

        Data is only buffered in memory until the next :meth:`flush`. If no `eid` is given in
        `data`, one is taken from the eids sequence.
        """
        if etype not in self._initialized:
            if not self.slave_mode:
                self.master_init(commit=False)
            tablename = 'cw_%s' % etype.lower()
            tmp_tablename = '%s_%s' % (tablename, self.uuid)
            self.sql("INSERT INTO cwmassive_initialized VALUES (%(e)s, 'etype', %(uuid)s)",
                     {'e': etype, 'uuid': self.uuid})
            attr_defs = eschema_sql_def(self._source_dbhelper, self.schema[etype])
            self.sql('CREATE TABLE %s(%s);' % (tmp_tablename,
                                               ', '.join('cw_%s %s' % (column, sqltype)
                                                         for column, sqltype in attr_defs)))
            self._initialized[etype] = [attr for attr, _ in attr_defs]

        if 'eid' not in data:
            # If eid is not given, use the next value from the eids sequence
            data['eid'] = self.get_next_eid()
        self._data_entities[etype].append(data)
        return data['eid']

    def prepare_insert_relation(self, eid_from, rtype, eid_to, **kwargs):
        """Insert into the database a  relation ``rtype`` between entities with eids ``eid_from``
        and ``eid_to``.

        Relation must not be inlined. Data is only buffered in memory until the next
        :meth:`flush`; extra keyword arguments are ignored.
        """
        if rtype not in self._initialized:
            if not self.slave_mode:
                self.master_init(commit=False)
            assert not self._cnx.vreg.schema.rschema(rtype).inlined
            self._initialized[rtype] = None
            tablename = '%s_relation' % rtype.lower()
            tmp_tablename = '%s_%s' % (tablename, self.uuid)
            self.sql("INSERT INTO cwmassive_initialized VALUES (%(r)s, 'rtype', %(uuid)s)",
                     {'r': rtype, 'uuid': self.uuid})
            self.sql('CREATE TABLE %s(eid_from integer, eid_to integer)' % tmp_tablename)
        self._data_relations[rtype].append({'eid_from': eid_from, 'eid_to': eid_to})

    def flush(self):
        """Flush the data buffered in memory to the store's temporary tables."""
        self.flush_entities()
        self.flush_relations()

    def finish(self):
        """Move data from all stores' temporary tables into the actual tables, insert entities
        meta data, remove temporary tables, restore dropped indexes and constraints, then commit.

        This may only be called by the master store.
        """
        assert not self.slave_mode, 'finish method should only be called by the master store'
        self.logger.info("Start cleaning")
        # Get all the initialized etypes/rtypes
        if self._dbh.table_exists('cwmassive_initialized'):
            cu = self.sql('SELECT retype, type, uuid FROM cwmassive_initialized')
            entities = defaultdict(list)
            relations = defaultdict(list)
            for retype, _type, uuid in cu.fetchall():
                if _type == 'rtype':
                    relations[retype].append(uuid)
                else:  # _type = 'etype'
                    entities[retype].append(uuid)
            # if there is some entities to insert, delete constraint on metadata tables once for all
            if entities:
                self._drop_metadata_constraints()
            # get back entity data from the temporary tables
            for etype, uuids in entities.items():
                tablename = 'cw_%s' % etype.lower()
                attr_defs = eschema_sql_def(self._source_dbhelper, self.schema[etype])
                columns = ','.join('cw_%s' % attr for attr, _ in attr_defs)
                for uuid in uuids:
                    tmp_tablename = '%s_%s' % (tablename, uuid)
                    self.sql('INSERT INTO %(table)s(%(columns)s) '
                             'SELECT %(columns)s FROM %(tmp_table)s'
                             % {'table': tablename, 'tmp_table': tmp_tablename,
                                'columns': columns})
                    self._insert_etype_metadata(etype, tmp_tablename)
                    self._tmp_data_cleanup(tmp_tablename, etype, uuid)
            # get back relation data from the temporary tables
            for rtype, uuids in relations.items():
                tablename = '%s_relation' % rtype.lower()
                for uuid in uuids:
                    tmp_tablename = '%s_%s' % (tablename, uuid)
                    self.fill_relation_table(tablename, tmp_tablename)
                    self._tmp_data_cleanup(tmp_tablename, rtype, uuid)
        # restore all deleted indexes and constraints
        self._dbh.restore_indexes_and_constraints()
        # delete the meta data table
        self.sql('DROP TABLE IF EXISTS cwmassive_initialized')
        self.commit()

    def _insert_etype_metadata(self, etype, tmp_tablename):
        """Massive insertion of meta data for `etype`, with new entities in `tmp_tablename`.
        """
        # insert standard metadata relations
        for rtype, eid in self.metagen.base_etype_rels(etype).items():
            self.fill_meta_relation_table(tmp_tablename, rtype, eid)
        # insert cw_source, is and is_instance_of relations (normally handled by the system source)
        self.fill_meta_relation_table(tmp_tablename, 'cw_source', self.metagen.source.eid)
        eschema = self.schema[etype]
        self.fill_meta_relation_table(tmp_tablename, 'is', eschema.eid)
        for parent_eschema in chain(eschema.ancestors(), [eschema]):
            self.fill_meta_relation_table(tmp_tablename, 'is_instance_of', parent_eschema.eid)
        # must come last: `fill_meta_relation_table` only handles entities that are not yet in
        # the `entities` table
        self.fill_entities_table(etype, tmp_tablename)

    def fill_entities_table(self, etype, tmp_tablename):
        """Insert into the `entities` table the entities of `tmp_tablename` not already there."""
        # finally insert records into the entities table
        self.sql("INSERT INTO entities(eid, type) "
                 "SELECT cw_eid, '%s' FROM %s "
                 "WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)"
                 % (etype, tmp_tablename))

    def fill_relation_table(self, tablename, tmp_tablename):
        """Copy distinct relations from `tmp_tablename` into `tablename`, skipping existing ones."""
        # XXX no index on the original relation table, EXISTS subquery may be sloooow
        self.sql('INSERT INTO %(table)s(eid_from, eid_to) SELECT DISTINCT '
                 'T.eid_from, T.eid_to FROM %(tmp_table)s AS T '
                 'WHERE NOT EXISTS (SELECT 1 FROM %(table)s AS TT WHERE '
                 'TT.eid_from=T.eid_from AND TT.eid_to=T.eid_to);'
                 % {'table': tablename, 'tmp_table': tmp_tablename})

    def fill_meta_relation_table(self, tmp_tablename, rtype, eid_to):
        """Relate each entity of `tmp_tablename` that is not yet in the `entities` table to
        `eid_to` through `rtype`.
        """
        self.sql("INSERT INTO %s_relation(eid_from, eid_to) SELECT cw_eid, %s FROM %s "
                 "WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)"
                 % (rtype, eid_to, tmp_tablename))

    def _tmp_data_cleanup(self, tmp_tablename, ertype, uuid):
        """Drop temporary (entity or relation) table and its record from cwmassive_initialized."""
        self.sql('DROP TABLE %(tmp_table)s' % {'tmp_table': tmp_tablename})
        self.sql('DELETE FROM cwmassive_initialized '
                 'WHERE retype = %(rtype)s AND uuid = %(uuid)s',
                 {'rtype': ertype, 'uuid': uuid})

    # FLUSH #################################################################

    def flush_relations(self):
        """Flush the relations data from in-memory structures to a temporary table."""
        for rtype, data in self._data_relations.items():
            if not data:
                # There is no data for this rtype for this flush round.
                continue
            buf = pgstore._create_copyfrom_buffer(data, ('eid_from', 'eid_to'))
            cursor = self._cnx.cnxset.cu
            tablename = '%s_relation' % rtype.lower()
            tmp_tablename = '%s_%s' % (tablename, self.uuid)
            cursor.copy_from(buf, tmp_tablename, null='NULL', columns=('eid_from', 'eid_to'))
            # Clear data cache
            self._data_relations[rtype] = []

    def flush_entities(self):
        """Flush the entities data from in-memory structures to a temporary table."""
        metagen = self.metagen
        for etype, data in self._data_entities.items():
            if not data:
                # There is no data for this etype for this flush round.
                continue
            attrs = self._initialized[etype]
            # every column gets a value: the schema default if any, else None (NULL)
            _base_data = dict.fromkeys(attrs)
            _base_data.update(self.default_values.get(etype, {}))
            _data = []
            for d in data:
                # do this first on `d`, because it won't fill keys associated to None as provided by
                # `_base_data`
                metagen.init_entity_attrs(etype, d['eid'], d)
                # XXX warn/raise if there is some key not in attrs?
                _d = _base_data.copy()
                _d.update(d)
                _data.append(_d)
            buf = pgstore._create_copyfrom_buffer(_data, attrs)
            tablename = 'cw_%s' % etype.lower()
            tmp_tablename = '%s_%s' % (tablename, self.uuid)
            columns = ['cw_%s' % attr for attr in attrs]
            cursor = self._cnx.cnxset.cu
            cursor.copy_from(buf, tmp_tablename, null='NULL', columns=columns)
            # Clear data cache
            self._data_entities[etype] = []

def get_default_values(schema):
    """analyzes yams ``schema`` and returns the default values of entity types' attributes.

    The returned value is a dictionary mapping every entity type name to a
    sub-dictionary mapping attribute names -> default values. Attributes without
    a default (i.e. whose default is None) are not included.
    """
    default_values = {}
    # iterates on all entity types
    for eschema in schema.entities():
        # for each entity type, iterates on attribute definitions
        default_values[eschema.type] = etype_defaults = {}
        for rschema, _ in eschema.attribute_definitions():
            # for each attribute, record its default value if one is defined
            default = eschema.default(rschema.type)
            if default is not None:
                etype_defaults[rschema.type] = default
    return default_values

class PGHelper(object):
    """This class provides some helper methods to manipulate a postgres database metadata (index and
    constraints).

    Dropped indexes and constraints are saved, as SQL statements, in the `cwmassive_constraints`
    table so they can be restored later, even after a crash.
    """

    def __init__(self, cnx):
        self.sql = cnx.system_sql
        # Deals with pg schema, see #3216686
        pg_schema = cnx.repo.config.system_source_config.get('db-namespace') or 'public'
        self.pg_schema = pg_schema

    def _create_constraints_table(self):
        """Create the table saving dropped indexes/constraints SQL, if it does not exist yet."""
        # Create a table to save the constraints, it allows reloading even after crash
        self.sql('CREATE TABLE IF NOT EXISTS cwmassive_constraints(sql TEXT, insert_order SERIAL)')

    def _save_and_drop(self, items, drop_sql):
        """Save each `{name: recreation sql}` item then execute `drop_sql % name`."""
        self._create_constraints_table()
        for name, query in items.items():
            self.sql('INSERT INTO cwmassive_constraints(sql) VALUES (%(sql)s)', {'sql': query})
            self.sql(drop_sql % name)

    def drop_indexes(self, tablename):
        """Drop indexes of `tablename` (except primary key), storing them for later restore."""
        self._save_and_drop(self.table_indexes(tablename), 'DROP INDEX %s')

    def drop_constraints(self, tablename):
        """Drop constraints of `tablename`, storing them for later restore."""
        self._save_and_drop(self.table_constraints(tablename),
                            'ALTER TABLE ' + tablename + ' DROP CONSTRAINT %s')

    def restore_indexes_and_constraints(self):
        """Restore saved indexes and constraints, most recently saved first, then drop the saving
        table. Does nothing if nothing has been saved.
        """
        if not self.table_exists('cwmassive_constraints'):
            return
        cu = self.sql('SELECT sql, insert_order FROM cwmassive_constraints '
                      'ORDER BY insert_order DESC')
        for query, order in cu.fetchall():
            self.sql(query)
            # forget each statement once replayed, so remaining rows are the ones still to restore
            self.sql('DELETE FROM cwmassive_constraints WHERE insert_order=%(order)s',
                     {'order': order})
        self.sql('DROP TABLE cwmassive_constraints')

    def table_exists(self, tablename):
        """Return True if the given table already exists in the database."""
        cu = self.sql('SELECT 1 from information_schema.tables '
                      'WHERE table_name=%(t)s AND table_schema=%(s)s',
                      {'t': tablename, 's': self.pg_schema})
        return bool(cu.fetchone())

    def table_indexes(self, tablename):
        """Return a dictionary of indexes {index name: index sql}, constraints included."""
        return {name: self._index_sql(name) for name in self._index_names(tablename)}

    def table_constraints(self, tablename):
        """Return a dictionary of constraints {constraint name: constraint sql}."""
        constraints = {}
        for name in self._constraint_names(tablename):
            query = self._constraint_sql(name)
            constraints[name] = 'ALTER TABLE %s ADD CONSTRAINT %s %s' % (tablename, name, query)
        return constraints

    def _index_names(self, tablename):
        """Return the names of all indexes in the given table (including constraints), except the
        primary key's one."""
        cu = self.sql("SELECT c.relname FROM pg_catalog.pg_class c "
                      "JOIN pg_catalog.pg_index i ON i.indexrelid = c.oid "
                      "JOIN pg_catalog.pg_class c2 ON i.indrelid = c2.oid "
                      "LEFT JOIN pg_catalog.pg_user u ON u.usesysid = c.relowner "
                      "LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace "
                      "WHERE c.relkind IN ('i','') "
                      " AND c2.relname = %(t)s "
                      " AND i.indisprimary = FALSE "
                      " AND n.nspname NOT IN ('pg_catalog', 'pg_toast') "
                      " AND pg_catalog.pg_table_is_visible(c.oid);", {'t': tablename})
        return [name for name, in cu.fetchall()]

    def _constraint_names(self, tablename):
        """Return the names of all constraints in the given table."""
        cu = self.sql("SELECT i.conname FROM pg_catalog.pg_class c "
                      "JOIN pg_catalog.pg_constraint i ON i.conrelid = c.oid "
                      "JOIN pg_catalog.pg_class c2 ON i.conrelid=c2.oid "
                      "LEFT JOIN pg_catalog.pg_user u ON u.usesysid = c.relowner "
                      "LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace "
                      "WHERE c2.relname = %(t)s "
                      "AND n.nspname NOT IN ('pg_catalog', 'pg_toast') "
                      "AND pg_catalog.pg_table_is_visible(c.oid)", {'t': tablename})
        return [name for name, in cu.fetchall()]

    def _index_sql(self, name):
        """Return the SQL to be used to recreate the index of the given name."""
        return self.sql('SELECT pg_get_indexdef(c.oid) FROM pg_catalog.pg_class c '
                        'LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace '
                        'WHERE c.relname = %(r)s AND n.nspname=%(n)s',
                        {'r': name, 'n': self.pg_schema}).fetchone()[0]

    def _constraint_sql(self, name):
        """Return the SQL definition (without `ALTER TABLE ... ADD CONSTRAINT name`) of the
        constraint of the given name."""
        return self.sql('SELECT pg_get_constraintdef(c.oid) FROM pg_catalog.pg_constraint c '
                        'LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.connamespace '
                        'WHERE c.conname = %(r)s AND n.nspname=%(n)s',
                        {'r': name, 'n': self.pg_schema}).fetchone()[0]