dataimport/massive_store.py
changeset 11057 0b59724cb3f2
parent 11052 058bb3dc685f
child 11058 23eb30449fe5
equal deleted inserted replaced
11052:058bb3dc685f 11057:0b59724cb3f2
     1 # coding: utf-8
       
     2 # copyright 2015 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
       
     3 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
       
     4 #
       
     5 # This file is part of CubicWeb.
       
     6 #
       
     7 # CubicWeb is free software: you can redistribute it and/or modify it under the
       
     8 # terms of the GNU Lesser General Public License as published by the Free
       
     9 # Software Foundation, either version 2.1 of the License, or (at your option)
       
    10 # any later version.
       
    11 #
       
    12 # CubicWeb is distributed in the hope that it will be useful, but WITHOUT ANY
       
    13 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
       
    14 # A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
       
    15 # details.
       
    16 #
       
    17 # You should have received a copy of the GNU Lesser General Public License along
       
    18 # with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
       
    19 
       
    20 import logging
       
    21 from datetime import datetime
       
    22 from collections import defaultdict
       
    23 from io import StringIO
       
    24 
       
    25 from six.moves import range
       
    26 
       
    27 from yams.constraints import SizeConstraint
       
    28 
       
    29 from psycopg2 import ProgrammingError
       
    30 
       
    31 from cubicweb.server.schema2sql import rschema_has_table
       
    32 from cubicweb.schema import PURE_VIRTUAL_RTYPES
       
    33 from cubicweb.dataimport import stores, pgstore
       
    34 from cubicweb.utils import make_uid
       
    35 from cubicweb.server.sqlutils import SQL_PREFIX
       
    36 
       
    37 
       
    38 class MassiveObjectStore(stores.RQLObjectStore):
       
    39     """
       
    40     Store for massive import of data, with delayed insertion of meta data.
       
    41 
       
    42     WARNINGS:
       
    43    - This store may be only used with PostgreSQL for now, as it relies
       
    44      on the COPY FROM method, and on specific PostgreSQL tables to get all
       
    45      the indexes.
       
    46    - This store can only insert relations that are not inlined (i.e.,
       
    47      which do *not* have inlined=True in their definition in the schema).
       
    48 
       
    49    It should be used as follows:
       
    50 
       
    51        store = MassiveObjectStore(cnx)
       
    52        store.init_rtype_table('Person', 'lives_in', 'Location')
       
    53        ...
       
    54 
       
    55        store.prepare_insert_entity('Person', subj_iid_attribute=person_iid, ...)
       
    56        store.prepare_insert_entity('Location', obj_iid_attribute=location_iid, ...)
       
    57        ...
       
    58 
       
    59        # subj_iid_attribute and obj_iid_attribute are argument names
       
    60        # chosen by the user (e.g. "cwuri"). These names can be identical.
       
    61        # person_iid and location_iid are unique IDs and depend on the data
       
    62        # (e.g URI).
       
    63        store.flush()
       
    64        store.relate_by_iid(person_iid, 'lives_in', location_iid)
       
    65        # For example:
       
    66        store.prepare_insert_entity('Person',
       
    67                                    cwuri='http://dbpedia.org/toto',
       
    68                                    name='Toto')
       
    69        store.prepare_insert_entity('Location',
       
    70                                    uri='http://geonames.org/11111',
       
    71                                    name='Somewhere')
       
    72        store.flush()
       
    73        store.relate_by_iid('http://dbpedia.org/toto',
       
    74                            'lives_in',
       
    75                            'http://geonames.org/11111')
       
    76        # Finally
       
    77        store.convert_relations('Person', 'lives_in', 'Location',
       
    78                                'subj_iid_attribute', 'obj_iid_attribute')
       
    79        # For the previous example:
       
    80        store.convert_relations('Person', 'lives_in', 'Location', 'cwuri', 'uri')
       
    81        ...
       
    82        store.commit()
       
    83        store.finish()
       
    84     """
       
    85     # max size of the iid, used to create the iid_eid conversion table
       
    86     iid_maxsize = 1024
       
    87 
       
    def __init__(self, cnx,
                 on_commit_callback=None, on_rollback_callback=None,
                 slave_mode=False,
                 source=None,
                 eids_seq_range=10000):
        """ Create a MassiveObject store, with the following attributes:

        - cnx: CubicWeb cnx
        - on_commit_callback: optional callable invoked (without arguments)
          by commit() before the actual database commit
        - on_rollback_callback: optional callable invoked as
          callback(exc, etype, data) when a flush fails; when unset the
          flush exception is re-raised (see on_rollback())
        - slave_mode: when True, skip constraint/index dropping and table
          bookkeeping (another, master store is assumed to handle them);
          finish() is forbidden in this mode
        - source: repository source to import into; defaults to the
          system source
        - eids_seq_range: size of eid range reserved by the store for each batch
        """
        super(MassiveObjectStore, self).__init__(cnx)
        self.logger = logging.getLogger('dataimport.massive_store')
        self._cnx = cnx
        # shortcut to raw SQL execution on the system source
        self.sql = cnx.system_sql
        # rtype -> list of {'uri_from': ..., 'uri_to': ...} pending rows
        # (fed by relate_by_iid, drained by flush_relations)
        self._data_uri_relations = defaultdict(list)
        self.eids_seq_range = eids_seq_range

        # etypes for which we have a uri_eid_%(etype)s table
        self._init_uri_eid = set()
        # etypes for which we have a uri_eid_%(e)s_idx index
        self._uri_eid_inserted = set()
        # set of rtypes for which we have a %(rtype)s_relation_iid_tmp table
        self._uri_rtypes = set()
        # set of etypes whose tables are created
        self._entities = set()
        # set of rtypes for which we have a %(rtype)s_relation_tmp table
        self._rtypes = set()

        self.slave_mode = slave_mode
        # etype -> {attribute: default} mapping taken from the schema
        # (get_default_values is presumably defined elsewhere in this
        # module -- not visible in this chunk)
        self.default_values = get_default_values(cnx.vreg.schema)
        pg_schema = cnx.repo.config.system_source_config.get('db-namespace') or 'public'
        # PostgreSQL introspection helper (PGHelper is presumably defined
        # elsewhere in this module -- not visible in this chunk)
        self._dbh = PGHelper(self._cnx, pg_schema)
        # etype -> list of pending entity rows, rtype -> pending eid pairs
        self._data_entities = defaultdict(list)
        self._data_relations = defaultdict(list)
        # timestamp used for creation/modification dates of this import run
        self._now = datetime.utcnow()
        # prefix used to generate a cwuri when the caller provides none
        self._default_cwuri = make_uid('_auto_generated')
        self._count_cwuri = 0
        self.on_commit_callback = on_commit_callback
        self.on_rollback_callback = on_rollback_callback
        # Do our meta tables already exist?
        self._init_massive_metatables()
        # bind a fresh eid generator; the lambda default freezes the
        # generator instance at construction time
        self.get_next_eid = lambda g=self._get_eid_gen(): next(g)
        # recreate them when self.finish() is called

        if not self.slave_mode:
            # drop constraints and metadata indexes up-front so COPY FROM
            # runs unimpeded; they are restored by finish()
            self._drop_all_constraints()
            self._drop_metatables_constraints()
        if source is None:
            source = cnx.repo.system_source
        self.source = source
        # etype name -> CWEType eid mapping
        self._etype_eid_idx = dict(cnx.execute('Any XN,X WHERE X is CWEType, X name XN'))
        # massive import bypasses the security checks entirely
        cnx.read_security = False
        cnx.write_security = False
       
   141 
       
   142     ### INIT FUNCTIONS ########################################################
       
   143 
       
   144     def _drop_all_constraints(self):
       
   145         schema = self._cnx.vreg.schema
       
   146         tables = ['cw_%s' % etype.type.lower()
       
   147                   for etype in schema.entities() if not etype.final]
       
   148         for rschema in schema.relations():
       
   149             if rschema.inlined:
       
   150                 continue
       
   151             elif rschema_has_table(rschema, skip_relations=PURE_VIRTUAL_RTYPES):
       
   152                 tables.append('%s_relation' % rschema.type.lower())
       
   153         tables.append('entities')
       
   154         for tablename in tables:
       
   155             self._store_and_drop_constraints(tablename)
       
   156 
       
   157     def _store_and_drop_constraints(self, tablename):
       
   158         if not self._constraint_table_created:
       
   159             # Create a table to save the constraints
       
   160             # Allow reload even after crash
       
   161             sql = "CREATE TABLE cwmassive_constraints (origtable text, query text, type varchar(256))"
       
   162             self.sql(sql)
       
   163             self._constraint_table_created = True
       
   164         constraints = self._dbh.application_constraints(tablename)
       
   165         for name, query in constraints.items():
       
   166             sql = 'INSERT INTO cwmassive_constraints VALUES (%(e)s, %(c)s, %(t)s)'
       
   167             self.sql(sql, {'e': tablename, 'c': query, 't': 'constraint'})
       
   168             sql = 'ALTER TABLE %s DROP CONSTRAINT %s' % (tablename, name)
       
   169             self.sql(sql)
       
   170 
       
   171     def reapply_all_constraints(self):
       
   172         if not self._dbh.table_exists('cwmassive_constraints'):
       
   173             self.logger.info('The table cwmassive_constraints does not exist')
       
   174             return
       
   175         sql = 'SELECT query FROM cwmassive_constraints WHERE type = %(t)s'
       
   176         crs = self.sql(sql, {'t': 'constraint'})
       
   177         for query, in crs.fetchall():
       
   178             self.sql(query)
       
   179             self.sql('DELETE FROM cwmassive_constraints WHERE type = %(t)s '
       
   180                      'AND query = %(q)s', {'t': 'constraint', 'q': query})
       
   181 
       
    def init_rtype_table(self, etype_from, rtype, etype_to):
        """ Build temporary table for standard rtype

        Creates (at most once) a uri_eid_<etype> mapping table for each
        given etype and a <rtype>_relation_iid_tmp table for the relation.
        Inlined relations are not supported and only produce a warning.
        """
        # Create an uri_eid table for each etype for a better
        # control of which etype is concerned by a particular
        # possibly multivalued relation.
        for etype in (etype_from, etype_to):
            if etype and etype not in self._init_uri_eid:
                self._init_uri_eid_table(etype)
        if rtype not in self._uri_rtypes:
            # Create the temporary table
            if not self._cnx.repo.schema.rschema(rtype).inlined:
                try:
                    # iid columns are sized by iid_maxsize (class attribute)
                    sql = 'CREATE TABLE %(r)s_relation_iid_tmp (uri_from character ' \
                          'varying(%(s)s), uri_to character varying(%(s)s))'
                    self.sql(sql % {'r': rtype, 's': self.iid_maxsize})
                except ProgrammingError:
                    # XXX Already exist (probably due to multiple import)
                    # NOTE(review): swallowing ProgrammingError leaves the
                    # current PostgreSQL transaction aborted -- presumably
                    # callers recover from this; confirm before relying on it
                    pass
            else:
                self.logger.warning("inlined relation %s: cannot insert it", rtype)
            # Add it to the initialized set
            self._uri_rtypes.add(rtype)
       
   204 
       
   205     def _init_uri_eid_table(self, etype):
       
   206         """ Build a temporary table for id/eid convertion
       
   207         """
       
   208         try:
       
   209             sql = "CREATE TABLE uri_eid_%(e)s (uri character varying(%(size)s), eid integer)"
       
   210             self.sql(sql % {'e': etype.lower(), 'size': self.iid_maxsize,})
       
   211         except ProgrammingError:
       
   212             # XXX Already exist (probably due to multiple import)
       
   213             pass
       
   214         # Add it to the initialized set
       
   215         self._init_uri_eid.add(etype)
       
   216 
       
   217     def _init_massive_metatables(self):
       
   218         # Check if our tables are not already created (i.e. a restart)
       
   219         self._initialized_table_created = self._dbh.table_exists('cwmassive_initialized')
       
   220         self._constraint_table_created = self._dbh.table_exists('cwmassive_constraints')
       
   221         self._metadata_table_created = self._dbh.table_exists('cwmassive_metadata')
       
   222 
       
   223     ### RELATE FUNCTION #######################################################
       
   224 
       
   225     def relate_by_iid(self, iid_from, rtype, iid_to):
       
   226         """Add new relation based on the internal id (iid)
       
   227         of the entities (not the eid)"""
       
   228         # Push data
       
   229         if isinstance(iid_from, unicode):
       
   230             iid_from = iid_from.encode('utf-8')
       
   231         if isinstance(iid_to, unicode):
       
   232             iid_to = iid_to.encode('utf-8')
       
   233         self._data_uri_relations[rtype].append({'uri_from': iid_from, 'uri_to': iid_to})
       
   234 
       
   235     ### FLUSH FUNCTIONS #######################################################
       
   236 
       
   237     def flush_relations(self):
       
   238         """ Flush the relations data
       
   239         """
       
   240         for rtype, data in self._data_uri_relations.items():
       
   241             if not data:
       
   242                 self.logger.info('No data for rtype %s', rtype)
       
   243             buf = StringIO('\n'.join(['%(uri_from)s\t%(uri_to)s' % d for d in data]))
       
   244             if not buf:
       
   245                 self.logger.info('Empty Buffer for rtype %s', rtype)
       
   246                 continue
       
   247             cursor = self._cnx.cnxset.cu
       
   248             if not self._cnx.repo.schema.rschema(rtype).inlined:
       
   249                 cursor.copy_from(buf, '%s_relation_iid_tmp' % rtype.lower(),
       
   250                                  null='NULL', columns=('uri_from', 'uri_to'))
       
   251             else:
       
   252                 self.logger.warning("inlined relation %s: cannot insert it", rtype)
       
   253             buf.close()
       
   254             # Clear data cache
       
   255             self._data_uri_relations[rtype] = []
       
   256 
       
   257     def fill_uri_eid_table(self, etype, uri_label):
       
   258         """ Fill the uri_eid table
       
   259         """
       
   260         self.logger.info('Fill uri_eid for etype %s', etype)
       
   261         sql = 'INSERT INTO uri_eid_%(e)s SELECT cw_%(l)s, cw_eid FROM cw_%(e)s'
       
   262         self.sql(sql % {'l': uri_label, 'e': etype.lower()})
       
   263         # Add indexes
       
   264         self.sql('CREATE INDEX uri_eid_%(e)s_idx ON uri_eid_%(e)s' '(uri)' % {'e': etype.lower()})
       
   265         # Set the etype as converted
       
   266         self._uri_eid_inserted.add(etype)
       
   267 
       
   268     def convert_relations(self, etype_from, rtype, etype_to,
       
   269                           uri_label_from='cwuri', uri_label_to='cwuri'):
       
   270         """ Flush the converted relations
       
   271         """
       
   272         # Always flush relations to be sure
       
   273         self.logger.info('Convert relations %s %s %s', etype_from, rtype, etype_to)
       
   274         self.flush_relations()
       
   275         if uri_label_from and etype_from not in self._uri_eid_inserted:
       
   276             self.fill_uri_eid_table(etype_from, uri_label_from)
       
   277         if uri_label_to and etype_to not in self._uri_eid_inserted:
       
   278             self.fill_uri_eid_table(etype_to, uri_label_to)
       
   279         if self._cnx.repo.schema.rschema(rtype).inlined:
       
   280             self.logger.warning("Can't insert inlined relation %s", rtype)
       
   281             return
       
   282         if uri_label_from and uri_label_to:
       
   283             sql = '''INSERT INTO %(r)s_relation (eid_from, eid_to) SELECT DISTINCT O1.eid, O2.eid
       
   284             FROM %(r)s_relation_iid_tmp AS T, uri_eid_%(ef)s as O1, uri_eid_%(et)s as O2
       
   285             WHERE O1.uri=T.uri_from AND O2.uri=T.uri_to AND NOT EXISTS (
       
   286             SELECT 1 FROM %(r)s_relation AS TT WHERE TT.eid_from=O1.eid AND TT.eid_to=O2.eid);
       
   287             '''
       
   288         elif uri_label_to:
       
   289             sql = '''INSERT INTO %(r)s_relation (eid_from, eid_to) SELECT DISTINCT
       
   290             CAST(T.uri_from AS INTEGER), O1.eid
       
   291             FROM %(r)s_relation_iid_tmp AS T, uri_eid_%(et)s as O1
       
   292             WHERE O1.uri=T.uri_to AND NOT EXISTS (
       
   293             SELECT 1 FROM %(r)s_relation AS TT WHERE
       
   294             TT.eid_from=CAST(T.uri_from AS INTEGER) AND TT.eid_to=O1.eid);
       
   295             '''
       
   296         elif uri_label_from:
       
   297             sql = '''INSERT INTO %(r)s_relation (eid_from, eid_to) SELECT DISTINCT O1.eid, T.uri_to
       
   298             O1.eid, CAST(T.uri_to AS INTEGER)
       
   299             FROM %(r)s_relation_iid_tmp AS T, uri_eid_%(ef)s as O1
       
   300             WHERE O1.uri=T.uri_from AND NOT EXISTS (
       
   301             SELECT 1 FROM %(r)s_relation AS TT WHERE
       
   302             TT.eid_from=O1.eid AND TT.eid_to=CAST(T.uri_to AS INTEGER));
       
   303             '''
       
   304         try:
       
   305             self.sql(sql % {'r': rtype.lower(),
       
   306                             'et': etype_to.lower() if etype_to else u'',
       
   307                             'ef': etype_from.lower() if etype_from else u''})
       
   308         except Exception as ex:
       
   309             self.logger.error("Can't insert relation %s: %s", rtype, ex)
       
   310 
       
   311     ### SQL UTILITIES #########################################################
       
   312 
       
   313     def drop_and_store_indexes(self, tablename):
       
   314         # Drop indexes and constraints
       
   315         if not self._constraint_table_created:
       
   316             # Create a table to save the constraints
       
   317             # Allow reload even after crash
       
   318             sql = "CREATE TABLE cwmassive_constraints (origtable text, query text, type varchar(256))"
       
   319             self.sql(sql)
       
   320             self._constraint_table_created = True
       
   321         self._drop_table_indexes(tablename)
       
   322 
       
   323     def _drop_table_indexes(self, tablename):
       
   324         """ Drop and store table constraints and indexes """
       
   325         indexes = self._dbh.application_indexes(tablename)
       
   326         for name, query in indexes.items():
       
   327             sql = 'INSERT INTO cwmassive_constraints VALUES (%(e)s, %(c)s, %(t)s)'
       
   328             self.sql(sql, {'e': tablename, 'c': query, 't': 'index'})
       
   329             sql = 'DROP INDEX %s' % name
       
   330             self.sql(sql)
       
   331 
       
   332     def reapply_constraint_index(self, tablename):
       
   333         if not self._dbh.table_exists('cwmassive_constraints'):
       
   334             self.logger.info('The table cwmassive_constraints does not exist')
       
   335             return
       
   336         sql = 'SELECT query FROM cwmassive_constraints WHERE origtable = %(e)s'
       
   337         crs = self.sql(sql, {'e': tablename})
       
   338         for query, in crs.fetchall():
       
   339             self.sql(query)
       
   340             self.sql('DELETE FROM cwmassive_constraints WHERE origtable = %(e)s '
       
   341                      'AND query = %(q)s', {'e': tablename, 'q': query})
       
   342 
       
   343     def _drop_metatables_constraints(self):
       
   344         """ Drop all the constraints for the meta data"""
       
   345         for tablename in ('created_by_relation', 'owned_by_relation',
       
   346                           'is_instance_of_relation', 'is_relation',
       
   347                           'entities'):
       
   348             self.drop_and_store_indexes(tablename)
       
   349 
       
   350     def _create_metatables_constraints(self):
       
   351         """ Create all the constraints for the meta data"""
       
   352         for tablename in ('entities',
       
   353                           'created_by_relation', 'owned_by_relation',
       
   354                           'is_instance_of_relation', 'is_relation'):
       
   355             # Indexes and constraints
       
   356             self.reapply_constraint_index(tablename)
       
   357 
       
   358     def init_relation_table(self, rtype):
       
   359         """ Get and remove all indexes for performance sake """
       
   360         # Create temporary table
       
   361         if not self.slave_mode and rtype not in self._rtypes:
       
   362             sql = "CREATE TABLE %s_relation_tmp (eid_from integer, eid_to integer)" % rtype.lower()
       
   363             self.sql(sql)
       
   364             # Drop indexes and constraints
       
   365             tablename = '%s_relation' % rtype.lower()
       
   366             self.drop_and_store_indexes(tablename)
       
   367             # Push the etype in the initialized table for easier restart
       
   368             self.init_create_initialized_table()
       
   369             sql = 'INSERT INTO cwmassive_initialized VALUES (%(e)s, %(t)s)'
       
   370             self.sql(sql, {'e': rtype, 't': 'rtype'})
       
   371             # Mark rtype as "initialized" for faster check
       
   372             self._rtypes.add(rtype)
       
   373 
       
   374     def init_create_initialized_table(self):
       
   375         """ Create the cwmassive initialized table
       
   376         """
       
   377         if not self._initialized_table_created:
       
   378             sql = "CREATE TABLE cwmassive_initialized (retype text, type varchar(128))"
       
   379             self.sql(sql)
       
   380             self._initialized_table_created = True
       
   381 
       
   382     def init_etype_table(self, etype):
       
   383         """ Add eid sequence to a particular etype table and
       
   384         remove all indexes for performance sake """
       
   385         if etype not in self._entities:
       
   386             # Only for non-initialized etype and not slave mode store
       
   387             if not self.slave_mode:
       
   388                 # Drop indexes and constraints
       
   389                 tablename = 'cw_%s' % etype.lower()
       
   390                 self.drop_and_store_indexes(tablename)
       
   391                 # Push the etype in the initialized table for easier restart
       
   392                 self.init_create_initialized_table()
       
   393                 sql = 'INSERT INTO cwmassive_initialized VALUES (%(e)s, %(t)s)'
       
   394                 self.sql(sql, {'e': etype, 't': 'etype'})
       
   395             # Mark etype as "initialized" for faster check
       
   396             self._entities.add(etype)
       
   397 
       
   398     def restart_eid_sequence(self, start_eid):
       
   399         self._cnx.system_sql(self._cnx.repo.system_source.dbhelper.sql_restart_numrange(
       
   400             'entities_id_seq', initial_value=start_eid))
       
   401         self._cnx.commit()
       
   402 
       
   403     ### ENTITIES CREATION #####################################################
       
   404 
       
   405     def _get_eid_gen(self):
       
   406         """ Function getting the next eid. This is done by preselecting
       
   407         a given number of eids from the 'entities_id_seq', and then
       
   408         storing them"""
       
   409         while True:
       
   410             last_eid = self._cnx.repo.system_source.create_eid(self._cnx, self.eids_seq_range)
       
   411             for eid in range(last_eid - self.eids_seq_range + 1, last_eid + 1):
       
   412                 yield eid
       
   413 
       
   414     def _apply_default_values(self, etype, kwargs):
       
   415         """Apply the default values for a given etype, attribute and value."""
       
   416         default_values = self.default_values[etype]
       
   417         missing_keys = set(default_values) - set(kwargs)
       
   418         kwargs.update((key, default_values[key]) for key in missing_keys)
       
   419 
       
   420     # store api ################################################################
       
   421 
       
   422     def prepare_insert_entity(self, etype, **kwargs):
       
   423         """Given an entity type, attributes and inlined relations, returns the inserted entity's
       
   424         eid.
       
   425         """
       
   426         # Init the table if necessary
       
   427         self.init_etype_table(etype)
       
   428         # Add meta data if not given
       
   429         if 'modification_date' not in kwargs:
       
   430             kwargs['modification_date'] = self._now
       
   431         if 'creation_date' not in kwargs:
       
   432             kwargs['creation_date'] = self._now
       
   433         if 'cwuri' not in kwargs:
       
   434             kwargs['cwuri'] = self._default_cwuri + str(self._count_cwuri)
       
   435             self._count_cwuri += 1
       
   436         if 'eid' not in kwargs:
       
   437             # If eid is not given and the eids sequence is set,
       
   438             # use the value from the sequence
       
   439             kwargs['eid'] = self.get_next_eid()
       
   440         self._apply_default_values(etype, kwargs)
       
   441         self._data_entities[etype].append(kwargs)
       
   442         return kwargs.get('eid')
       
   443 
       
   444     def prepare_insert_relation(self, eid_from, rtype, eid_to, **kwargs):
       
   445         """Insert into the database a  relation ``rtype`` between entities with eids ``eid_from``
       
   446         and ``eid_to``.
       
   447         """
       
   448         # Init the table if necessary
       
   449         self.init_relation_table(rtype)
       
   450         self._data_relations[rtype].append({'eid_from': eid_from, 'eid_to': eid_to})
       
   451 
       
   452     def flush(self):
       
   453         """Flush the data"""
       
   454         self.flush_entities()
       
   455         self.flush_internal_relations()
       
   456         self.flush_relations()
       
   457 
       
    def commit(self):
        """Commit the database transaction.

        Runs the user-supplied on_commit callback (if any) before the
        actual commit performed by the parent store.
        """
        self.on_commit()
        super(MassiveObjectStore, self).commit()
       
   462 
       
   463     def finish(self):
       
   464         """Remove temporary tables and columns."""
       
   465         self.logger.info("Start cleaning")
       
   466         if self.slave_mode:
       
   467             raise RuntimeError('Store cleanup is not allowed in slave mode')
       
   468         self.logger.info("Start cleaning")
       
   469         # Cleanup relations tables
       
   470         for etype in self._init_uri_eid:
       
   471             self.sql('DROP TABLE uri_eid_%s' % etype.lower())
       
   472         # Remove relations tables
       
   473         for rtype in self._uri_rtypes:
       
   474             if not self._cnx.repo.schema.rschema(rtype).inlined:
       
   475                 self.sql('DROP TABLE %(r)s_relation_iid_tmp' % {'r': rtype})
       
   476             else:
       
   477                 self.logger.warning("inlined relation %s: no cleanup to be done for it" % rtype)
       
   478         # Create meta constraints (entities, is_instance_of, ...)
       
   479         self._create_metatables_constraints()
       
   480         # Get all the initialized etypes/rtypes
       
   481         if self._dbh.table_exists('cwmassive_initialized'):
       
   482             crs = self.sql('SELECT retype, type FROM cwmassive_initialized')
       
   483             for retype, _type in crs.fetchall():
       
   484                 self.logger.info('Cleanup for %s' % retype)
       
   485                 if _type == 'etype':
       
   486                     # Cleanup entities tables - Recreate indexes
       
   487                     self._cleanup_entities(retype)
       
   488                 elif _type == 'rtype':
       
   489                     # Cleanup relations tables
       
   490                     self._cleanup_relations(retype)
       
   491                 self.sql('DELETE FROM cwmassive_initialized WHERE retype = %(e)s',
       
   492                          {'e': retype})
       
   493         self.reapply_all_constraints()
       
   494         # Delete the meta data table
       
   495         for table_name in ('cwmassive_initialized', 'cwmassive_constraints', 'cwmassive_metadata'):
       
   496             if self._dbh.table_exists(table_name):
       
   497                 self.sql('DROP TABLE %s' % table_name)
       
   498         self.commit()
       
   499 
       
   500     ### FLUSH #################################################################
       
   501 
       
   502     def on_commit(self):
       
   503         if self.on_commit_callback:
       
   504             self.on_commit_callback()
       
   505 
       
   506     def on_rollback(self, exc, etype, data):
       
   507         if self.on_rollback_callback:
       
   508             self.on_rollback_callback(exc, etype, data)
       
   509             self._cnx.rollback()
       
   510         else:
       
   511             raise exc
       
   512 
       
   513     def flush_internal_relations(self):
       
   514         """ Flush the relations data
       
   515         """
       
   516         for rtype, data in self._data_relations.items():
       
   517             if not data:
       
   518                 # There is no data for these etype for this flush round.
       
   519                 continue
       
   520             buf = pgstore._create_copyfrom_buffer(data, ('eid_from', 'eid_to'))
       
   521             if not buf:
       
   522                 # The buffer is empty. This is probably due to error in _create_copyfrom_buffer
       
   523                 raise ValueError
       
   524             cursor = self._cnx.cnxset.cu
       
   525             # Push into the tmp table
       
   526             cursor.copy_from(buf, '%s_relation_tmp' % rtype.lower(),
       
   527                              null='NULL', columns=('eid_from', 'eid_to'))
       
   528             # Clear data cache
       
   529             self._data_relations[rtype] = []
       
   530 
       
   531     def flush_entities(self):
       
   532         """ Flush the entities data
       
   533         """
       
   534         for etype, data in self._data_entities.items():
       
   535             if not data:
       
   536                 # There is no data for these etype for this flush round.
       
   537                 continue
       
   538             # XXX It may be interresting to directly infer the columns'
       
   539             # names from the schema instead of using .keys()
       
   540             columns = data[0].keys()
       
   541             # XXX For now, the _create_copyfrom_buffer does a "row[column]"
       
   542             # which can lead to a key error.
       
   543             # Thus we should create dictionary with all the keys.
       
   544             columns = set()
       
   545             for d in data:
       
   546                 columns.update(d.keys())
       
   547             _data = []
       
   548             _base_data = dict.fromkeys(columns)
       
   549             for d in data:
       
   550                 _d = _base_data.copy()
       
   551                 _d.update(d)
       
   552                 _data.append(_d)
       
   553             buf = pgstore._create_copyfrom_buffer(_data, columns)
       
   554             if not buf:
       
   555                 # The buffer is empty. This is probably due to error in _create_copyfrom_buffer
       
   556                 raise ValueError('Error in buffer creation for etype %s' % etype)
       
   557             columns = ['cw_%s' % attr for attr in columns]
       
   558             cursor = self._cnx.cnxset.cu
       
   559             try:
       
   560                 cursor.copy_from(buf, 'cw_%s' % etype.lower(), null='NULL', columns=columns)
       
   561             except Exception as exc:
       
   562                 self.on_rollback(exc, etype, data)
       
   563             # Clear data cache
       
   564             self._data_entities[etype] = []
       
   565         if not self.slave_mode:
       
   566             self.flush_meta_data()
       
   567 
       
   568     def flush_meta_data(self):
       
   569         """ Flush the meta data (entities table, is_instance table, ...)
       
   570         """
       
   571         if self.slave_mode:
       
   572             raise RuntimeError('Flushing meta data is not allow in slave mode')
       
   573         if not self._dbh.table_exists('cwmassive_initialized'):
       
   574             self.logger.info('No information available for initialized etypes/rtypes')
       
   575             return
       
   576         if not self._metadata_table_created:
       
   577             # Keep the correctly flush meta data in database
       
   578             sql = "CREATE TABLE cwmassive_metadata (etype text)"
       
   579             self.sql(sql)
       
   580             self._metadata_table_created = True
       
   581         crs = self.sql('SELECT etype FROM cwmassive_metadata')
       
   582         already_flushed = set(e for e, in crs.fetchall())
       
   583         crs = self.sql('SELECT retype FROM cwmassive_initialized WHERE type = %(t)s',
       
   584                        {'t': 'etype'})
       
   585         all_etypes = set(e for e, in crs.fetchall())
       
   586         for etype in all_etypes:
       
   587             if etype not in already_flushed:
       
   588                 # Deals with meta data
       
   589                 self.logger.info('Flushing meta data for %s' % etype)
       
   590                 self.insert_massive_meta_data(etype)
       
   591                 sql = 'INSERT INTO cwmassive_metadata VALUES (%(e)s)'
       
   592                 self.sql(sql, {'e': etype})
       
   593 
       
   594     def _cleanup_entities(self, etype):
       
   595         """ Cleanup etype table """
       
   596         # Create indexes and constraints
       
   597         tablename = SQL_PREFIX + etype.lower()
       
   598         self.reapply_constraint_index(tablename)
       
   599 
       
   600     def _cleanup_relations(self, rtype):
       
   601         """ Cleanup rtype table """
       
   602         # Push into relation table while removing duplicate
       
   603         sql = '''INSERT INTO %(r)s_relation (eid_from, eid_to) SELECT DISTINCT
       
   604                  T.eid_from, T.eid_to FROM %(r)s_relation_tmp AS T
       
   605                  WHERE NOT EXISTS (SELECT 1 FROM %(r)s_relation AS TT WHERE
       
   606                  TT.eid_from=T.eid_from AND TT.eid_to=T.eid_to);''' % {'r': rtype}
       
   607         self.sql(sql)
       
   608         # Drop temporary relation table
       
   609         sql = ('DROP TABLE %(r)s_relation_tmp' % {'r': rtype.lower()})
       
   610         self.sql(sql)
       
   611         # Create indexes and constraints
       
   612         tablename = '%s_relation' % rtype.lower()
       
   613         self.reapply_constraint_index(tablename)
       
   614 
       
   615     def insert_massive_meta_data(self, etype):
       
   616         """ Massive insertion of meta data for a given etype, based on SQL statements.
       
   617         """
       
   618         # Push data - Use coalesce to avoid NULL (and get 0), if there is no
       
   619         # entities of this type in the entities table.
       
   620         # Meta data relations
       
   621         self.metagen_push_relation(etype, self._cnx.user.eid, 'created_by_relation')
       
   622         self.metagen_push_relation(etype, self._cnx.user.eid, 'owned_by_relation')
       
   623         self.metagen_push_relation(etype, self.source.eid, 'cw_source_relation')
       
   624         self.metagen_push_relation(etype, self._etype_eid_idx[etype], 'is_relation')
       
   625         self.metagen_push_relation(etype, self._etype_eid_idx[etype], 'is_instance_of_relation')
       
   626         sql = ("INSERT INTO entities (eid, type, asource, extid) "
       
   627                "SELECT cw_eid, '%s', 'system', NULL FROM cw_%s "
       
   628                "WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)"
       
   629                % (etype, etype.lower()))
       
   630         self.sql(sql)
       
   631 
       
   632     def metagen_push_relation(self, etype, eid_to, rtype):
       
   633         sql = ("INSERT INTO %s (eid_from, eid_to) SELECT cw_eid, %s FROM cw_%s "
       
   634                "WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)"
       
   635                % (rtype, eid_to, etype.lower()))
       
   636         self.sql(sql)
       
   637 
       
   638 
       
   639 ### CONSTRAINTS MANAGEMENT FUNCTIONS  ##########################################
       
   640 
       
   641 def get_size_constraints(schema):
       
   642     """analyzes yams ``schema`` and returns the list of size constraints.
       
   643 
       
   644     The returned value is a dictionary mapping entity types to a
       
   645     sub-dictionnaries mapping attribute names -> max size.
       
   646     """
       
   647     size_constraints = {}
       
   648     # iterates on all entity types
       
   649     for eschema in schema.entities():
       
   650         # for each entity type, iterates on attribute definitions
       
   651         size_constraints[eschema.type] = eschema_constraints = {}
       
   652         for rschema, aschema in eschema.attribute_definitions():
       
   653             # for each attribute, if a size constraint is found,
       
   654             # append it to the size constraint list
       
   655             maxsize = None
       
   656             rdef = rschema.rdef(eschema, aschema)
       
   657             for constraint in rdef.constraints:
       
   658                 if isinstance(constraint, SizeConstraint):
       
   659                     maxsize = constraint.max
       
   660                     eschema_constraints[rschema.type] = maxsize
       
   661     return size_constraints
       
   662 
       
   663 def get_default_values(schema):
       
   664     """analyzes yams ``schema`` and returns the list of default values.
       
   665 
       
   666     The returned value is a dictionary mapping entity types to a
       
   667     sub-dictionnaries mapping attribute names -> default values.
       
   668     """
       
   669     default_values = {}
       
   670     # iterates on all entity types
       
   671     for eschema in schema.entities():
       
   672         # for each entity type, iterates on attribute definitions
       
   673         default_values[eschema.type] = eschema_constraints = {}
       
   674         for rschema, _ in eschema.attribute_definitions():
       
   675             # for each attribute, if a size constraint is found,
       
   676             # append it to the size constraint list
       
   677             if eschema.default(rschema.type) is not None:
       
   678                 eschema_constraints[rschema.type] = eschema.default(rschema.type)
       
   679     return default_values
       
   680 
       
   681 
       
   682 class PGHelper(object):
       
   683     def __init__(self, cnx, pg_schema='public'):
       
   684         self.cnx = cnx
       
   685         # Deals with pg schema, see #3216686
       
   686         self.pg_schema = pg_schema
       
   687 
       
   688     def application_indexes_constraints(self, tablename):
       
   689         """ Get all the indexes/constraints for a given tablename """
       
   690         indexes = self.application_indexes(tablename)
       
   691         constraints = self.application_constraints(tablename)
       
   692         _indexes = {}
       
   693         for name, query in indexes.items():
       
   694             # Remove pkey indexes (automatically created by constraints)
       
   695             # Specific cases of primary key, see #3224079
       
   696             if name not in constraints:
       
   697                 _indexes[name] = query
       
   698         return _indexes, constraints
       
   699 
       
   700     def table_exists(self, table_name):
       
   701         sql = "SELECT * from information_schema.tables WHERE table_name=%(t)s AND table_schema=%(s)s"
       
   702         crs = self.cnx.system_sql(sql, {'t': table_name, 's': self.pg_schema})
       
   703         res = crs.fetchall()
       
   704         if res:
       
   705             return True
       
   706         return False
       
   707 
       
   708     # def check_if_primary_key_exists_for_table(self, table_name):
       
   709     #     sql = ("SELECT constraint_name FROM information_schema.table_constraints "
       
   710     #            "WHERE constraint_type = 'PRIMARY KEY' AND table_name=%(t)s AND table_schema=%(s)s")
       
   711     #     crs = self.cnx.system_sql(sql, {'t': table_name, 's': self.pg_schema})
       
   712     #     res = crs.fetchall()
       
   713     #     if res:
       
   714     #         return True
       
   715     #     return False
       
   716 
       
   717     def index_query(self, name):
       
   718         """Get the request to be used to recreate the index"""
       
   719         return self.cnx.system_sql("SELECT pg_get_indexdef(c.oid) "
       
   720                                    "from pg_catalog.pg_class c "
       
   721                                    "LEFT JOIN pg_catalog.pg_namespace n "
       
   722                                    "ON n.oid = c.relnamespace "
       
   723                                    "WHERE c.relname = %(r)s AND n.nspname=%(n)s",
       
   724                                    {'r': name, 'n': self.pg_schema}).fetchone()[0]
       
   725 
       
   726     def constraint_query(self, name):
       
   727         """Get the request to be used to recreate the constraint"""
       
   728         return self.cnx.system_sql("SELECT pg_get_constraintdef(c.oid) "
       
   729                                    "from pg_catalog.pg_constraint c "
       
   730                                    "LEFT JOIN pg_catalog.pg_namespace n "
       
   731                                    "ON n.oid = c.connamespace "
       
   732                                    "WHERE c.conname = %(r)s AND n.nspname=%(n)s",
       
   733                                    {'r': name, 'n': self.pg_schema}).fetchone()[0]
       
   734 
       
   735     def index_list(self, tablename):
       
   736         # This SQL query (cf http://www.postgresql.org/message-id/432F450F.4080700@squiz.net)
       
   737         # aims at getting all the indexes for each table.
       
   738         sql = '''SELECT c.relname as "Name"
       
   739         FROM pg_catalog.pg_class c
       
   740         JOIN pg_catalog.pg_index i ON i.indexrelid = c.oid
       
   741         JOIN pg_catalog.pg_class c2 ON i.indrelid = c2.oid
       
   742         LEFT JOIN pg_catalog.pg_user u ON u.usesysid = c.relowner
       
   743         LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
       
   744         WHERE c.relkind IN ('i','')
       
   745         AND c2.relname = '%s'
       
   746         AND i.indisprimary = FALSE
       
   747         AND n.nspname NOT IN ('pg_catalog', 'pg_toast')
       
   748         AND pg_catalog.pg_table_is_visible(c.oid);''' % tablename
       
   749         return self.cnx.system_sql(sql).fetchall()
       
   750 
       
   751     def application_indexes(self, tablename):
       
   752         """ Iterate over all the indexes """
       
   753         indexes_list = self.index_list(tablename)
       
   754         indexes = {}
       
   755         for name, in indexes_list:
       
   756             indexes[name] = self.index_query(name)
       
   757         return indexes
       
   758 
       
   759     def constraint_list(self, tablename):
       
   760         sql = '''SELECT i.conname as "Name"
       
   761                  FROM pg_catalog.pg_class c
       
   762                  JOIN pg_catalog.pg_constraint i ON i.conrelid = c.oid
       
   763                  JOIN pg_catalog.pg_class c2 ON i.conrelid=c2.oid
       
   764                  LEFT JOIN pg_catalog.pg_user u ON u.usesysid = c.relowner
       
   765                  LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
       
   766                  WHERE
       
   767                    c2.relname = '%s'
       
   768                    AND n.nspname NOT IN ('pg_catalog', 'pg_toast')
       
   769                    AND pg_catalog.pg_table_is_visible(c.oid)
       
   770                  ''' % tablename
       
   771         return self.cnx.system_sql(sql).fetchall()
       
   772 
       
   773     def application_constraints(self, tablename):
       
   774         """ Iterate over all the constraints """
       
   775         constraint_list = self.constraint_list(tablename)
       
   776         constraints = {}
       
   777         for name, in constraint_list:
       
   778             query = self.constraint_query(name)
       
   779             constraints[name] = 'ALTER TABLE %s ADD CONSTRAINT %s %s' % (tablename, name, query)
       
   780         return constraints