dataimport/massive_store.py
changeset 10853 de741492538d
child 10854 f437787d8849
equal deleted inserted replaced
10852:e35d23686d1f 10853:de741492538d
       
     1 # coding: utf-8
       
     2 # copyright 2015 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
       
     3 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
       
     4 #
       
     5 # This file is part of CubicWeb.
       
     6 #
       
     7 # CubicWeb is free software: you can redistribute it and/or modify it under the
       
     8 # terms of the GNU Lesser General Public License as published by the Free
       
     9 # Software Foundation, either version 2.1 of the License, or (at your option)
       
    10 # any later version.
       
    11 #
       
    12 # CubicWeb is distributed in the hope that it will be useful, but WITHOUT ANY
       
    13 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
       
    14 # A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
       
    15 # details.
       
    16 #
       
    17 # You should have received a copy of the GNU Lesser General Public License along
       
    18 # with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
       
    19 
       
    20 import logging
       
    21 from datetime import datetime
       
    22 from collections import defaultdict
       
    23 from StringIO import StringIO
       
    24 
       
    25 from yams.constraints import SizeConstraint
       
    26 
       
    27 from psycopg2 import ProgrammingError
       
    28 
       
    29 from cubicweb.dataimport import stores, pgstore
       
    30 from cubicweb.utils import make_uid
       
    31 from cubicweb.server.sqlutils import SQL_PREFIX
       
    32 
       
    33 
       
    34 class MassiveObjectStore(stores.RQLObjectStore):
       
    35     """
       
    36     Store for massive import of data, with delayed insertion of meta data.
       
    37 
       
    38     WARNINGS:
       
    39    - This store may be only used with PostgreSQL for now, as it relies
       
    40      on the COPY FROM method, and on specific PostgreSQL tables to get all
       
    41      the indexes.
       
    42    - This store can only insert relations that are not inlined (i.e.,
       
    43      which do *not* have inlined=True in their definition in the schema).
       
    44 
       
    45 
       
    46    It should be used as follows:
       
    47 
       
    48        store = MassiveObjectStore(cnx)
       
    49        store.init_rtype_table('Person', 'lives_in', 'Location')
       
    50        ...
       
    51 
       
    52        store.create_entity('Person', subj_iid_attribute=person_iid, ...)
       
    53        store.create_entity('Location', obj_iid_attribute=location_iid, ...)
       
    54        ...
       
    55 
       
    56        # subj_iid_attribute and obj_iid_attribute are argument names
       
    57        # chosen by the user (e.g. "cwuri"). These names can be identical.
       
    58        # person_iid and location_iid are unique IDs and depend on the data
       
    59        # (e.g URI).
       
    60        store.flush()
       
    61        store.relate_by_iid(person_iid, 'lives_in', location_iid)
       
    62        # For example:
       
    63        store.create_entity('Person',
       
    64                            cwuri='http://dbpedia.org/toto',
       
    65                            name='Toto')
       
    66        store.create_entity('Location',
       
    67                            uri='http://geonames.org/11111',
       
    68                            name='Somewhere')
       
    69        store.flush()
       
    70        store.relate_by_iid('http://dbpedia.org/toto',
       
    71                        'lives_in',
       
    72                        'http://geonames.org/11111')
       
    73        # Finally
       
    74        store.flush_meta_data()
       
    75        store.convert_relations('Person', 'lives_in', 'Location',
       
    76                                'subj_iid_attribute', 'obj_iid_attribute')
       
    77        # For the previous example:
       
    78        store.convert_relations('Person', 'lives_in', 'Location', 'cwuri', 'uri')
       
    79        ...
       
    80        store.cleanup()
       
    81     """
       
    82 
       
    def __init__(self, cnx, autoflush_metadata=True,
                 replace_sep='', commit_at_flush=True,
                 drop_index=True,
                 pg_schema='public',
                 iid_maxsize=1024, uri_param_name='rdf:about',
                 eids_seq_range=10000, eids_seq_start=None,
                 on_commit_callback=None, on_rollback_callback=None,
                 slave_mode=False, build_entities=False,
                 source=None):
        """ Create a MassiveObject store, with the following attributes:

        - cnx: CubicWeb cnx
        - autoflush_metadata: Boolean.
                              Automatically flush the metadata after
                              each flush()
        - replace_sep: String. Replace separator used for
                       (COPY FROM) buffer creation.
        - commit_at_flush: Boolean. Commit after each flush().
        - drop_index: Boolean. Drop SQL index before COPY FROM
        - pg_schema: String. PostgreSQL schema given to the PGHelper
                     (a false value means 'public').
        - iid_maxsize: Int. Max size of the iid, used to create the
                    iid_eid conversion table.
        - uri_param_name: String. If given, will use this parameter to get cw_uri
                          for entities.
        - eids_seq_range: Int. Range of the eids_seq_range to be fetched each time
                               by the store (default is 10000).
                               If None, the sequence eids is attached to each entity tables
                               (backward compatibility with the 0.2.0).
        - eids_seq_start: Int. Set the eids sequence value (if None, nothing is done).
        - on_commit_callback: callable without argument, called by commit()
                              before the actual commit.
        - on_rollback_callback: callable(exc, etype, data), called by
                                on_rollback(); when None, on_rollback()
                                re-raises the exception instead.
        - slave_mode: Boolean. If True, the meta tables constraints are not
                      dropped here, and init_etype_table() /
                      init_relation_table() neither alter the tables nor
                      record them in dataio_initialized.
        - build_entities: Boolean. If True, create_entity returns a CW etype object
          (but WITHOUT eid !). NOTE(review): stored only; create_entity()
          does not consult it.
        - source: the system source to use (defaults to
                  cnx.repo.system_source).
        """
        super(MassiveObjectStore, self).__init__(cnx)
        self.logger = logging.getLogger('dataio.massiveimport')
        self._cnx = cnx
        self.sql = cnx.system_sql
        self.iid_maxsize = iid_maxsize
        self.replace_sep = replace_sep
        self.commit_at_flush = commit_at_flush
        self.autoflush_metadata = autoflush_metadata
        self.drop_index = drop_index
        self.slave_mode = slave_mode
        self.uri_param_name = uri_param_name
        self.on_commit_callback = on_commit_callback
        self.on_rollback_callback = on_rollback_callback
        # Pending data, pushed to the database by the flush methods
        self._data_uri_relations = defaultdict(list)
        self._data_entities = defaultdict(list)
        self._data_relations = defaultdict(list)
        # Internal markers of initialization (see the init_* methods)
        self._initialized = {'init_uri_eid': set(),
                             'uri_eid_inserted': set(),
                             'uri_rtypes': set(),
                             'entities': set(),
                             'rtypes': set(),
                            }
        self.size_constraints = get_size_constraints(cnx.vreg.schema)
        self.default_values = get_default_values(cnx.vreg.schema)
        self._dbh = PGHelper(self._cnx, pg_schema or 'public')
        self._build_entities = build_entities
        self._now = datetime.now()
        # Prefix of the generated cwuri of entities created without one
        self._default_cwuri = make_uid('_auto_generated')
        self._count_cwuri = 0
        # Initialized the meta tables of dataio for warm restart
        self._init_dataio_metatables()
        self._eids_seq_range = eids_seq_range
        self._eids_seq_start = eids_seq_start
        if self._eids_seq_start is not None:
            self._cnx.system_sql(self._cnx.repo.system_source.dbhelper.sql_restart_numrange(
                'entities_id_seq', initial_value=self._eids_seq_start + 1))
            cnx.commit()
        self.get_next_eid = self._get_eid_gen().next
        # recreate then when self.cleanup() is called
        if not self.slave_mode and self.drop_index:
            self._drop_metatables_constraints()
        if source is None:
            source = cnx.repo.system_source
        self.source = source
        self._etype_eid_idx = dict(cnx.execute('Any XN,X WHERE X is CWEType, X name XN'))
        cnx.read_security = False
        cnx.write_security = False
       
   166 
       
   167     ### INIT FUNCTIONS ########################################################
       
   168 
       
   169     def init_rtype_table(self, etype_from, rtype, etype_to):
       
   170         """ Build temporary table a for standard rtype """
       
   171         # Create an uri_eid table for each etype for a better
       
   172         # control of which etype is concerns for a particular
       
   173         # possibly multivalued relation.
       
   174         for etype in (etype_from, etype_to):
       
   175             if etype and etype not in self._initialized['init_uri_eid']:
       
   176                 self._init_uri_eid_table(etype)
       
   177         if rtype not in self._initialized['uri_rtypes']:
       
   178             # Create the temporary tables
       
   179             if not self._cnx.repo.schema.rschema(rtype).inlined:
       
   180                 try:
       
   181                     sql = 'CREATE TABLE %(r)s_relation_iid_tmp (uri_from character ' \
       
   182                           'varying(%(s)s), uri_to character varying(%(s)s))'
       
   183                     self.sql(sql % {'r': rtype, 's': self.iid_maxsize})
       
   184                 except ProgrammingError:
       
   185                     # XXX Already exist (probably due to multiple import)
       
   186                     pass
       
   187             else:
       
   188                 self.logger.warning("inlined relation %s: cannot insert it", rtype)
       
   189             #Add it to the initialized set
       
   190             self._initialized['uri_rtypes'].add(rtype)
       
   191 
       
   192     def _init_uri_eid_table(self, etype):
       
   193         """ Build a temporary table for id/eid convertion
       
   194         """
       
   195         try:
       
   196             sql = "CREATE TABLE uri_eid_%(e)s (uri character varying(%(size)s), eid integer)"
       
   197             self.sql(sql % {'e': etype.lower(), 'size': self.iid_maxsize,})
       
   198         except ProgrammingError:
       
   199             # XXX Already exist (probably due to multiple import)
       
   200             pass
       
   201         # Add it to the initialized set
       
   202         self._initialized['init_uri_eid'].add(etype)
       
   203 
       
   204     def _init_dataio_metatables(self):
       
   205         """ Initialized the meta tables of dataio for warm restart
       
   206         """
       
   207         # Check if dataio tables are not already created (i.e. a restart)
       
   208         self._initialized_table_created = self._dbh.table_exists('dataio_initialized')
       
   209         self._constraint_table_created = self._dbh.table_exists('dataio_constraints')
       
   210         self._metadata_table_created = self._dbh.table_exists('dataio_metadata')
       
   211 
       
   212     ### RELATE FUNCTION #######################################################
       
   213 
       
   214     def relate_by_iid(self, iid_from, rtype, iid_to):
       
   215         """Add new relation based on the internal id (iid)
       
   216         of the entities (not the eid)"""
       
   217         # Push data
       
   218         if isinstance(iid_from, unicode):
       
   219             iid_from = iid_from.encode('utf-8')
       
   220         if isinstance(iid_to, unicode):
       
   221             iid_to = iid_to.encode('utf-8')
       
   222         self._data_uri_relations[rtype].append({'uri_from': iid_from, 'uri_to': iid_to})
       
   223 
       
   224     ### FLUSH FUNCTIONS #######################################################
       
   225 
       
   226     def flush_relations(self):
       
   227         """ Flush the relations data
       
   228         """
       
   229         for rtype, data in self._data_uri_relations.iteritems():
       
   230             if not data:
       
   231                 self.logger.info('No data for rtype %s', rtype)
       
   232             buf = StringIO('\n'.join(['%(uri_from)s\t%(uri_to)s' % d for d in data]))
       
   233             if not buf:
       
   234                 self.logger.info('Empty Buffer for rtype %s', rtype)
       
   235                 continue
       
   236             cursor = self._cnx.cnxset.cu
       
   237             if not self._cnx.repo.schema.rschema(rtype).inlined:
       
   238                 cursor.copy_from(buf, '%s_relation_iid_tmp' % rtype.lower(),
       
   239                                  null='NULL', columns=('uri_from', 'uri_to'))
       
   240             else:
       
   241                 self.logger.warning("inlined relation %s: cannot insert it", rtype)
       
   242             buf.close()
       
   243             # Clear data cache
       
   244             self._data_uri_relations[rtype] = []
       
   245             # Commit if asked
       
   246             if self.commit_at_flush:
       
   247                 self.commit()
       
   248 
       
   249     def fill_uri_eid_table(self, etype, uri_label):
       
   250         """ Fill the uri_eid table
       
   251         """
       
   252         self.logger.info('Fill uri_eid for etype %s', etype)
       
   253         sql = 'INSERT INTO uri_eid_%(e)s SELECT cw_%(l)s, cw_eid FROM cw_%(e)s'
       
   254         self.sql(sql % {'l': uri_label, 'e': etype.lower()})
       
   255         # Add indexes
       
   256         self.sql('CREATE INDEX uri_eid_%(e)s_idx ON uri_eid_%(e)s' '(uri)' % {'e': etype.lower()})
       
   257         # Set the etype as converted
       
   258         self._initialized['uri_eid_inserted'].add(etype)
       
   259         self.commit()
       
   260 
       
   261     def convert_relations(self, etype_from, rtype, etype_to,
       
   262                           uri_label_from='cwuri', uri_label_to='cwuri'):
       
   263         """ Flush the converted relations
       
   264         """
       
   265         # Always flush relations to be sure
       
   266         self.logger.info('Convert relations %s %s %s', etype_from, rtype, etype_to)
       
   267         self.flush_relations()
       
   268         if uri_label_from and etype_from not in self._initialized['uri_eid_inserted']:
       
   269             self.fill_uri_eid_table(etype_from, uri_label_from)
       
   270         if uri_label_to and etype_to not in self._initialized['uri_eid_inserted']:
       
   271             self.fill_uri_eid_table(etype_to, uri_label_to)
       
   272         if self._cnx.repo.schema.rschema(rtype).inlined:
       
   273             self.logger.warning("Can't insert inlined relation %s", rtype)
       
   274             return
       
   275         if uri_label_from and uri_label_to:
       
   276             sql = '''INSERT INTO %(r)s_relation (eid_from, eid_to) SELECT DISTINCT O1.eid, O2.eid
       
   277             FROM %(r)s_relation_iid_tmp AS T, uri_eid_%(ef)s as O1, uri_eid_%(et)s as O2
       
   278             WHERE O1.uri=T.uri_from AND O2.uri=T.uri_to AND NOT EXISTS (
       
   279             SELECT 1 FROM %(r)s_relation AS TT WHERE TT.eid_from=O1.eid AND TT.eid_to=O2.eid);
       
   280             '''
       
   281         elif uri_label_to:
       
   282             sql = '''INSERT INTO %(r)s_relation (eid_from, eid_to) SELECT DISTINCT
       
   283             CAST(T.uri_from AS INTEGER), O1.eid
       
   284             FROM %(r)s_relation_iid_tmp AS T, uri_eid_%(et)s as O1
       
   285             WHERE O1.uri=T.uri_to AND NOT EXISTS (
       
   286             SELECT 1 FROM %(r)s_relation AS TT WHERE
       
   287             TT.eid_from=CAST(T.uri_from AS INTEGER) AND TT.eid_to=O1.eid);
       
   288             '''
       
   289         elif uri_label_from:
       
   290             sql = '''INSERT INTO %(r)s_relation (eid_from, eid_to) SELECT DISTINCT O1.eid, T.uri_to
       
   291             O1.eid, CAST(T.uri_to AS INTEGER)
       
   292             FROM %(r)s_relation_iid_tmp AS T, uri_eid_%(ef)s as O1
       
   293             WHERE O1.uri=T.uri_from AND NOT EXISTS (
       
   294             SELECT 1 FROM %(r)s_relation AS TT WHERE
       
   295             TT.eid_from=O1.eid AND TT.eid_to=CAST(T.uri_to AS INTEGER));
       
   296             '''
       
   297         try:
       
   298             self.sql(sql % {'r': rtype.lower(),
       
   299                             'et': etype_to.lower() if etype_to else u'',
       
   300                             'ef': etype_from.lower() if etype_from else u''})
       
   301         except Exception as ex:
       
   302             self.logger.error("Can't insert relation %s: %s", rtype, ex)
       
   303         self.commit()
       
   304 
       
   305     ### SQL UTILITIES #########################################################
       
   306 
       
   307     def drop_and_store_indexes_constraints(self, tablename):
       
   308         # Drop indexes and constraints
       
   309         if not self._constraint_table_created:
       
   310             # Create a table to save the constraints
       
   311             # Allow reload even after crash
       
   312             sql = "CREATE TABLE dataio_constraints (origtable text, query text, type varchar(256))"
       
   313             self.sql(sql)
       
   314             self._constraint_table_created = True
       
   315         self._drop_table_constraints_indexes(tablename)
       
   316 
       
   317     def _drop_table_constraints_indexes(self, tablename):
       
   318         """ Drop and store table constraints and indexes """
       
   319         indexes, constraints = self._dbh.application_indexes_constraints(tablename)
       
   320         for name, query in constraints.iteritems():
       
   321             sql = 'INSERT INTO dataio_constraints VALUES (%(e)s, %(c)s, %(t)s)'
       
   322             self.sql(sql, {'e': tablename, 'c': query, 't': 'constraint'})
       
   323             sql = 'ALTER TABLE %s DROP CONSTRAINT %s CASCADE' % (tablename, name)
       
   324             self.sql(sql)
       
   325         for name, query in indexes.iteritems():
       
   326             sql = 'INSERT INTO dataio_constraints VALUES (%(e)s, %(c)s, %(t)s)'
       
   327             self.sql(sql, {'e': tablename, 'c': query, 't': 'index'})
       
   328             sql = 'DROP INDEX %s' % name
       
   329             self.sql(sql)
       
   330 
       
   331     def reapply_constraint_index(self, tablename):
       
   332         if not self._dbh.table_exists('dataio_constraints'):
       
   333             self.logger.info('The table dataio_constraints does not exist '
       
   334                              '(keep_index option should be True)')
       
   335             return
       
   336         sql = 'SELECT query FROM dataio_constraints WHERE origtable = %(e)s'
       
   337         crs = self.sql(sql, {'e': tablename})
       
   338         for query, in crs.fetchall():
       
   339             self.sql(query)
       
   340             self.sql('DELETE FROM dataio_constraints WHERE origtable = %(e)s '
       
   341                      'AND query = %(q)s', {'e': tablename, 'q': query})
       
   342 
       
   343     def _drop_metatables_constraints(self):
       
   344         """ Drop all the constraints for the meta data"""
       
   345         for tablename in ('created_by_relation', 'owned_by_relation',
       
   346                           'is_instance_of_relation', 'identity_relation',
       
   347                           'entities'):
       
   348             self.drop_and_store_indexes_constraints(tablename)
       
   349 
       
   350     def _create_metatables_constraints(self):
       
   351         """ Create all the constraints for the meta data"""
       
   352         for tablename in ('entities',
       
   353                           'created_by_relation', 'owned_by_relation',
       
   354                           'is_instance_of_relation', 'identity_relation'):
       
   355             # Indexes and constraints
       
   356             if self.drop_index:
       
   357                 self.reapply_constraint_index(tablename)
       
   358 
       
   359     def init_relation_table(self, rtype):
       
   360         """ Get and remove all indexes for performance sake """
       
   361         # Create temporary table
       
   362         if not self.slave_mode and rtype not in self._initialized['rtypes']:
       
   363             sql = "CREATE TABLE %s_relation_tmp (eid_from integer, eid_to integer)" % rtype.lower()
       
   364             self.sql(sql)
       
   365             if self.drop_index:
       
   366                 # Drop indexes and constraints
       
   367                 tablename = '%s_relation' % rtype.lower()
       
   368                 self.drop_and_store_indexes_constraints(tablename)
       
   369             # Push the etype in the initialized table for easier restart
       
   370             self.init_create_initialized_table()
       
   371             sql = 'INSERT INTO dataio_initialized VALUES (%(e)s, %(t)s)'
       
   372             self.sql(sql, {'e': rtype, 't': 'rtype'})
       
   373             # Mark rtype as "initialized" for faster check
       
   374             self._initialized['rtypes'].add(rtype)
       
   375 
       
   376     def init_create_initialized_table(self):
       
   377         """ Create the dataio initialized table
       
   378         """
       
   379         if not self._initialized_table_created:
       
   380             sql = "CREATE TABLE dataio_initialized (retype text, type varchar(128))"
       
   381             self.sql(sql)
       
   382             self._initialized_table_created = True
       
   383 
       
   384     def init_etype_table(self, etype):
       
   385         """ Add eid sequence to a particular etype table and
       
   386         remove all indexes for performance sake """
       
   387         if etype not in self._initialized['entities']:
       
   388             # Only for non-initialized etype and not slave mode store
       
   389             if not self.slave_mode:
       
   390                 if self._eids_seq_range is None:
       
   391                     # Eids are directly set by the entities_id_seq.
       
   392                     # We attach this sequence to all the created etypes.
       
   393                     sql = ("ALTER TABLE cw_%s ALTER COLUMN cw_eid "
       
   394                            "SET DEFAULT nextval('entities_id_seq')" % etype.lower())
       
   395                     self.sql(sql)
       
   396                 if self.drop_index:
       
   397                     # Drop indexes and constraints
       
   398                     tablename = 'cw_%s' % etype.lower()
       
   399                     self.drop_and_store_indexes_constraints(tablename)
       
   400                 # Push the etype in the initialized table for easier restart
       
   401                 self.init_create_initialized_table()
       
   402                 sql = 'INSERT INTO dataio_initialized VALUES (%(e)s, %(t)s)'
       
   403                 self.sql(sql, {'e': etype, 't': 'etype'})
       
   404             # Mark etype as "initialized" for faster check
       
   405             self._initialized['entities'].add(etype)
       
   406 
       
   407 
       
   408     ### ENTITIES CREATION #####################################################
       
   409 
       
   410     def _get_eid_gen(self):
       
   411         """ Function getting the next eid. This is done by preselecting
       
   412         a given number of eids from the 'entities_id_seq', and then
       
   413         storing them"""
       
   414         while True:
       
   415             last_eid = self._cnx.repo.system_source.create_eid(self._cnx, self._eids_seq_range)
       
   416             for eid in xrange(last_eid - self._eids_seq_range + 1, last_eid + 1):
       
   417                 yield eid
       
   418 
       
   419     def apply_size_constraints(self, etype, kwargs):
       
   420         """ Apply the size constraints for a given etype, attribute and value
       
   421         """
       
   422         size_constraints = self.size_constraints[etype]
       
   423         for attr, value in kwargs.items():
       
   424             if value:
       
   425                 maxsize = size_constraints.get(attr)
       
   426                 if maxsize is not None and len(value) > maxsize:
       
   427                     kwargs[attr] = value[:maxsize-4] + '...'
       
   428         return kwargs
       
   429 
       
   430     def apply_default_values(self, etype, kwargs):
       
   431         """ Apply the default values for a given etype, attribute and value
       
   432         """
       
   433         default_values = self.default_values[etype]
       
   434         missing_keys = set(default_values) - set(kwargs)
       
   435         kwargs.update((key, default_values[key]) for key in missing_keys)
       
   436         return kwargs
       
   437 
       
   438     def create_entity(self, etype, **kwargs):
       
   439         """ Create an entity
       
   440         """
       
   441         # Init the table if necessary
       
   442         self.init_etype_table(etype)
       
   443         # Add meta data if not given
       
   444         if 'modification_date' not in kwargs:
       
   445             kwargs['modification_date'] = self._now
       
   446         if 'creation_date' not in kwargs:
       
   447             kwargs['creation_date'] = self._now
       
   448         if 'cwuri' not in kwargs:
       
   449             if self.uri_param_name and self.uri_param_name in kwargs:
       
   450                 kwargs['cwuri'] = kwargs[self.uri_param_name]
       
   451             else:
       
   452                 kwargs['cwuri'] = self._default_cwuri + str(self._count_cwuri)
       
   453                 self._count_cwuri += 1
       
   454         if 'eid' not in kwargs and self._eids_seq_range is not None:
       
   455             # If eid is not given and the eids sequence is set,
       
   456             # use the value from the sequence
       
   457             kwargs['eid'] = self.get_next_eid()
       
   458         # Check size constraints
       
   459         kwargs = self.apply_size_constraints(etype, kwargs)
       
   460         # Apply default values
       
   461         kwargs = self.apply_default_values(etype, kwargs)
       
   462         # Push data / Return entity
       
   463         self._data_entities[etype].append(kwargs)
       
   464         entity = self._cnx.vreg['etypes'].etype_class(etype)(self._cnx)
       
   465         entity.cw_attr_cache.update(kwargs)
       
   466         if 'eid' in kwargs:
       
   467             entity.eid = kwargs['eid']
       
   468         return entity
       
   469 
       
   470     ### RELATIONS CREATION ####################################################
       
   471 
       
   472     def relate(self, subj_eid, rtype, obj_eid, *args, **kwargs):
       
   473         """ Compatibility with other stores
       
   474         """
       
   475         # Init the table if necessary
       
   476         self.init_relation_table(rtype)
       
   477         self._data_relations[rtype].append({'eid_from': subj_eid, 'eid_to': obj_eid})
       
   478 
       
   479 
       
   480     ### FLUSH #################################################################
       
   481 
       
   482     def on_commit(self):
       
   483         if self.on_commit_callback:
       
   484             self.on_commit_callback()
       
   485 
       
   486     def on_rollback(self, exc, etype, data):
       
   487         if self.on_rollback_callback:
       
   488             self.on_rollback_callback(exc, etype, data)
       
   489             self._cnx.rollback()
       
   490         else:
       
   491             raise exc
       
   492 
       
   493     def commit(self):
       
   494         self.on_commit()
       
   495         super(MassiveObjectStore, self).commit()
       
   496 
       
   497     def flush(self):
       
   498         """ Flush the data
       
   499         """
       
   500         self.flush_entities()
       
   501         self.flush_internal_relations()
       
   502         self.flush_relations()
       
   503 
       
   504     def flush_internal_relations(self):
       
   505         """ Flush the relations data
       
   506         """
       
   507         for rtype, data in self._data_relations.iteritems():
       
   508             if not data:
       
   509                 # There is no data for these etype for this flush round.
       
   510                 continue
       
   511             buf = pgstore._create_copyfrom_buffer(data, ('eid_from', 'eid_to'),
       
   512                                                   replace_sep=self.replace_sep)
       
   513             if not buf:
       
   514                 # The buffer is empty. This is probably due to error in _create_copyfrom_buffer
       
   515                 raise ValueError
       
   516             cursor = self._cnx.cnxset.cu
       
   517             # Push into the tmp table
       
   518             cursor.copy_from(buf, '%s_relation_tmp' % rtype.lower(),
       
   519                              null='NULL', columns=('eid_from', 'eid_to'))
       
   520             # Clear data cache
       
   521             self._data_relations[rtype] = []
       
   522             # Commit if asked
       
   523             if self.commit_at_flush:
       
   524                 self.commit()
       
   525 
       
   526     def flush_entities(self):
       
   527         """ Flush the entities data
       
   528         """
       
   529         for etype, data in self._data_entities.iteritems():
       
   530             if not data:
       
   531                 # There is no data for these etype for this flush round.
       
   532                 continue
       
   533             # XXX It may be interresting to directly infer the columns'
       
   534             # names from the schema instead of using .keys()
       
   535             columns = data[0].keys()
       
   536             # XXX For now, the _create_copyfrom_buffer does a "row[column]"
       
   537             # which can lead to a key error.
       
   538             # Thus we should create dictionary with all the keys.
       
   539             columns = set()
       
   540             for d in data:
       
   541                 columns.update(d.keys())
       
   542             _data = []
       
   543             _base_data = dict.fromkeys(columns)
       
   544             for d in data:
       
   545                 _d = _base_data.copy()
       
   546                 _d.update(d)
       
   547                 _data.append(_d)
       
   548             buf = pgstore._create_copyfrom_buffer(_data, columns,
       
   549                                                   replace_sep=self.replace_sep)
       
   550             if not buf:
       
   551                 # The buffer is empty. This is probably due to error in _create_copyfrom_buffer
       
   552                 raise ValueError('Error in buffer creation for etype %s' % etype)
       
   553             columns = ['cw_%s' % attr for attr in columns]
       
   554             cursor = self._cnx.cnxset.cu
       
   555             try:
       
   556                 cursor.copy_from(buf, 'cw_%s' % etype.lower(), null='NULL', columns=columns)
       
   557             except Exception as exc:
       
   558                 self.on_rollback(exc, etype, data)
       
   559             # Clear data cache
       
   560             self._data_entities[etype] = []
       
   561         if self.autoflush_metadata:
       
   562             self.flush_meta_data()
       
   563         # Commit if asked
       
   564         if self.commit_at_flush:
       
   565             self.commit()
       
   566 
       
   567     def flush_meta_data(self):
       
   568         """ Flush the meta data (entities table, is_instance table, ...)
       
   569         """
       
   570         if self.slave_mode:
       
   571             raise RuntimeError('Flushing meta data is not allow in slave mode')
       
   572         if not self._dbh.table_exists('dataio_initialized'):
       
   573             self.logger.info('No information available for initialized etypes/rtypes')
       
   574             return
       
   575         if not self._metadata_table_created:
       
   576             # Keep the correctly flush meta data in database
       
   577             sql = "CREATE TABLE dataio_metadata (etype text)"
       
   578             self.sql(sql)
       
   579             self._metadata_table_created = True
       
   580         crs = self.sql('SELECT etype FROM dataio_metadata')
       
   581         already_flushed = set(e for e, in crs.fetchall())
       
   582         crs = self.sql('SELECT retype FROM dataio_initialized WHERE type = %(t)s',
       
   583                        {'t': 'etype'})
       
   584         all_etypes = set(e for e, in crs.fetchall())
       
   585         for etype in all_etypes:
       
   586             if etype not in already_flushed:
       
   587                 # Deals with meta data
       
   588                 self.logger.info('Flushing meta data for %s' % etype)
       
   589                 self.insert_massive_meta_data(etype)
       
   590                 sql = 'INSERT INTO dataio_metadata VALUES (%(e)s)'
       
   591                 self.sql(sql, {'e': etype})
       
   592         # Final commit
       
   593         self.commit()
       
   594 
       
   595     def _cleanup_entities(self, etype):
       
   596         """ Cleanup etype table """
       
   597         if self._eids_seq_range is None:
       
   598             # Remove DEFAULT eids sequence if added
       
   599             sql = 'ALTER TABLE cw_%s ALTER COLUMN cw_eid DROP DEFAULT;' % etype.lower()
       
   600             self.sql(sql)
       
   601         # Create indexes and constraints
       
   602         if self.drop_index:
       
   603             tablename = SQL_PREFIX + etype.lower()
       
   604             self.reapply_constraint_index(tablename)
       
   605 
       
   606     def _cleanup_relations(self, rtype):
       
   607         """ Cleanup rtype table """
       
   608         # Push into relation table while removing duplicate
       
   609         sql = '''INSERT INTO %(r)s_relation (eid_from, eid_to) SELECT DISTINCT
       
   610                  T.eid_from, T.eid_to FROM %(r)s_relation_tmp AS T
       
   611                  WHERE NOT EXISTS (SELECT 1 FROM %(r)s_relation AS TT WHERE
       
   612                  TT.eid_from=T.eid_from AND TT.eid_to=T.eid_to);''' % {'r': rtype}
       
   613         self.sql(sql)
       
   614         # Drop temporary relation table
       
   615         sql = ('DROP TABLE %(r)s_relation_tmp' % {'r': rtype.lower()})
       
   616         self.sql(sql)
       
   617         # Create indexes and constraints
       
   618         if self.drop_index:
       
   619             tablename = '%s_relation' % rtype.lower()
       
   620             self.reapply_constraint_index(tablename)
       
   621 
       
   622     def cleanup(self):
       
   623         """ Remove temporary tables and columns
       
   624         """
       
   625         self.logger.info("Start cleaning")
       
   626         if self.slave_mode:
       
   627             raise RuntimeError('Store cleanup is not allowed in slave mode')
       
   628         self.logger.info("Start cleaning")
       
   629         # Cleanup relations tables
       
   630         for etype in self._initialized['init_uri_eid']:
       
   631             self.sql('DROP TABLE uri_eid_%s' % etype.lower())
       
   632         # Remove relations tables
       
   633         for rtype in self._initialized['uri_rtypes']:
       
   634             if not self._cnx.repo.schema.rschema(rtype).inlined:
       
   635                 self.sql('DROP TABLE %(r)s_relation_iid_tmp' % {'r': rtype})
       
   636             else:
       
   637                 self.logger.warning("inlined relation %s: no cleanup to be done for it" % rtype)
       
   638         self.commit()
       
   639         # Get all the initialized etypes/rtypes
       
   640         if self._dbh.table_exists('dataio_initialized'):
       
   641             crs = self.sql('SELECT retype, type FROM dataio_initialized')
       
   642             for retype, _type in crs.fetchall():
       
   643                 self.logger.info('Cleanup for %s' % retype)
       
   644                 if _type == 'etype':
       
   645                     # Cleanup entities tables - Recreate indexes
       
   646                     self._cleanup_entities(retype)
       
   647                 elif _type == 'rtype':
       
   648                     # Cleanup relations tables
       
   649                     self._cleanup_relations(retype)
       
   650                 self.sql('DELETE FROM dataio_initialized WHERE retype = %(e)s',
       
   651                          {'e': retype})
       
   652         # Create meta constraints (entities, is_instance_of, ...)
       
   653         self._create_metatables_constraints()
       
   654         self.commit()
       
   655         # Delete the meta data table
       
   656         for table_name in ('dataio_initialized', 'dataio_constraints', 'dataio_metadata'):
       
   657             if self._dbh.table_exists(table_name):
       
   658                 self.sql('DROP TABLE %s' % table_name)
       
   659         self.commit()
       
   660 
       
   661     def insert_massive_meta_data(self, etype):
       
   662         """ Massive insertion of meta data for a given etype, based on SQL statements.
       
   663         """
       
   664         # Push data - Use coalesce to avoid NULL (and get 0), if there is no
       
   665         # entities of this type in the entities table.
       
   666         # Meta data relations
       
   667         self.metagen_push_relation(etype, self._cnx.user.eid, 'created_by_relation')
       
   668         self.metagen_push_relation(etype, self._cnx.user.eid, 'owned_by_relation')
       
   669         self.metagen_push_relation(etype, self.source.eid, 'cw_source_relation')
       
   670         self.metagen_push_relation(etype, self._etype_eid_idx[etype], 'is_relation')
       
   671         self.metagen_push_relation(etype, self._etype_eid_idx[etype], 'is_instance_of_relation')
       
   672         sql = ("INSERT INTO entities (eid, type, asource, extid) "
       
   673                "SELECT cw_eid, '%s', 'system', NULL FROM cw_%s "
       
   674                "WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)"
       
   675                % (etype, etype.lower()))
       
   676         self.sql(sql)
       
   677 
       
   678     def metagen_push_relation(self, etype, eid_to, rtype):
       
   679         sql = ("INSERT INTO %s (eid_from, eid_to) SELECT cw_eid, %s FROM cw_%s "
       
   680                "WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)"
       
   681                % (rtype, eid_to, etype.lower()))
       
   682         self.sql(sql)
       
   683 
       
   684 
       
   685 ### CONSTRAINTS MANAGEMENT FUNCTIONS  ##########################################
       
   686 
       
   687 def get_size_constraints(schema):
       
   688     """analyzes yams ``schema`` and returns the list of size constraints.
       
   689 
       
   690     The returned value is a dictionary mapping entity types to a
       
   691     sub-dictionnaries mapping attribute names -> max size.
       
   692     """
       
   693     size_constraints = {}
       
   694     # iterates on all entity types
       
   695     for eschema in schema.entities():
       
   696         # for each entity type, iterates on attribute definitions
       
   697         size_constraints[eschema.type] = eschema_constraints = {}
       
   698         for rschema, aschema in eschema.attribute_definitions():
       
   699             # for each attribute, if a size constraint is found,
       
   700             # append it to the size constraint list
       
   701             maxsize = None
       
   702             rdef = rschema.rdef(eschema, aschema)
       
   703             for constraint in rdef.constraints:
       
   704                 if isinstance(constraint, SizeConstraint):
       
   705                     maxsize = constraint.max
       
   706                     eschema_constraints[rschema.type] = maxsize
       
   707     return size_constraints
       
   708 
       
   709 def get_default_values(schema):
       
   710     """analyzes yams ``schema`` and returns the list of default values.
       
   711 
       
   712     The returned value is a dictionary mapping entity types to a
       
   713     sub-dictionnaries mapping attribute names -> default values.
       
   714     """
       
   715     default_values = {}
       
   716     # iterates on all entity types
       
   717     for eschema in schema.entities():
       
   718         # for each entity type, iterates on attribute definitions
       
   719         default_values[eschema.type] = eschema_constraints = {}
       
   720         for rschema, _ in eschema.attribute_definitions():
       
   721             # for each attribute, if a size constraint is found,
       
   722             # append it to the size constraint list
       
   723             if eschema.default(rschema.type) is not None:
       
   724                 eschema_constraints[rschema.type] = eschema.default(rschema.type)
       
   725     return default_values
       
   726 
       
   727 
       
   728 class PGHelper(object):
       
   729     def __init__(self, cnx, pg_schema='public'):
       
   730         self.cnx = cnx
       
   731         # Deals with pg schema, see #3216686
       
   732         self.pg_schema = pg_schema
       
   733 
       
   734     def application_indexes_constraints(self, tablename):
       
   735         """ Get all the indexes/constraints for a given tablename """
       
   736         indexes = self.application_indexes(tablename)
       
   737         constraints = self.application_constraints(tablename)
       
   738         _indexes = {}
       
   739         for name, query in indexes.iteritems():
       
   740             # Remove pkey indexes (automatically created by constraints)
       
   741             # Specific cases of primary key, see #3224079
       
   742             if name not in constraints:
       
   743                 _indexes[name] = query
       
   744         return _indexes, constraints
       
   745 
       
   746     def table_exists(self, table_name):
       
   747         sql = "SELECT * from information_schema.tables WHERE table_name=%(t)s AND table_schema=%(s)s"
       
   748         crs = self.cnx.system_sql(sql, {'t': table_name, 's': self.pg_schema})
       
   749         res = crs.fetchall()
       
   750         if res:
       
   751             return True
       
   752         return False
       
   753 
       
   754     # def check_if_primary_key_exists_for_table(self, table_name):
       
   755     #     sql = ("SELECT constraint_name FROM information_schema.table_constraints "
       
   756     #            "WHERE constraint_type = 'PRIMARY KEY' AND table_name=%(t)s AND table_schema=%(s)s")
       
   757     #     crs = self.cnx.system_sql(sql, {'t': table_name, 's': self.pg_schema})
       
   758     #     res = crs.fetchall()
       
   759     #     if res:
       
   760     #         return True
       
   761     #     return False
       
   762 
       
   763     def index_query(self, name):
       
   764         """Get the request to be used to recreate the index"""
       
   765         return self.cnx.system_sql("SELECT pg_get_indexdef(c.oid) "
       
   766                                    "from pg_catalog.pg_class c "
       
   767                                    "LEFT JOIN pg_catalog.pg_namespace n "
       
   768                                    "ON n.oid = c.relnamespace "
       
   769                                    "WHERE c.relname = %(r)s AND n.nspname=%(n)s",
       
   770                                    {'r': name, 'n': self.pg_schema}).fetchone()[0]
       
   771 
       
   772     def constraint_query(self, name):
       
   773         """Get the request to be used to recreate the constraint"""
       
   774         return self.cnx.system_sql("SELECT pg_get_constraintdef(c.oid) "
       
   775                                    "from pg_catalog.pg_constraint c "
       
   776                                    "LEFT JOIN pg_catalog.pg_namespace n "
       
   777                                    "ON n.oid = c.connamespace "
       
   778                                    "WHERE c.conname = %(r)s AND n.nspname=%(n)s",
       
   779                                    {'r': name, 'n': self.pg_schema}).fetchone()[0]
       
   780 
       
   781     def application_indexes(self, tablename):
       
   782         """ Iterate over all the indexes """
       
   783         # This SQL query (cf http://www.postgresql.org/message-id/432F450F.4080700@squiz.net)
       
   784         # aims at getting all the indexes for each table.
       
   785         sql = '''SELECT c.relname as "Name"
       
   786         FROM pg_catalog.pg_class c
       
   787         JOIN pg_catalog.pg_index i ON i.indexrelid = c.oid
       
   788         JOIN pg_catalog.pg_class c2 ON i.indrelid = c2.oid
       
   789         LEFT JOIN pg_catalog.pg_user u ON u.usesysid = c.relowner
       
   790         LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
       
   791         WHERE c.relkind IN ('i','')
       
   792         AND c2.relname = '%s'
       
   793         AND n.nspname NOT IN ('pg_catalog', 'pg_toast')
       
   794         AND pg_catalog.pg_table_is_visible(c.oid);''' % tablename
       
   795         indexes_list = self.cnx.system_sql(sql).fetchall()
       
   796         indexes = {}
       
   797         for name, in indexes_list:
       
   798             indexes[name] = self.index_query(name)
       
   799         return indexes
       
   800 
       
   801     def application_constraints(self, tablename):
       
   802         """ Iterate over all the constraints """
       
   803         sql = '''SELECT i.conname as "Name"
       
   804                  FROM pg_catalog.pg_class c JOIN pg_catalog.pg_constraint i
       
   805                  ON i.conrelid = c.oid JOIN pg_catalog.pg_class c2 ON i.conrelid=c2.oid
       
   806                  LEFT JOIN pg_catalog.pg_user u ON u.usesysid = c.relowner
       
   807                  LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
       
   808                  WHERE c2.relname = '%s' AND n.nspname NOT IN ('pg_catalog', 'pg_toast')
       
   809                  AND pg_catalog.pg_table_is_visible(c.oid);''' % tablename
       
   810         indexes_list = self.cnx.system_sql(sql).fetchall()
       
   811         constraints = {}
       
   812         for name, in indexes_list:
       
   813             query = self.constraint_query(name)
       
   814             constraints[name] = 'ALTER TABLE %s ADD CONSTRAINT %s %s' % (tablename, name, query)
       
   815         return constraints