cubicweb/dataimport/massive_store.py
changeset 11316 36c7cd362fc7
parent 11315 ad826d81e88e
child 11317 4085a452b6b4
equal deleted inserted replaced
11315:ad826d81e88e 11316:36c7cd362fc7
    19 
    19 
    20 import logging
    20 import logging
    21 from datetime import datetime
    21 from datetime import datetime
    22 from collections import defaultdict
    22 from collections import defaultdict
    23 from io import StringIO
    23 from io import StringIO
       
    24 from itertools import chain
    24 
    25 
    25 from six.moves import range
    26 from six.moves import range
    26 
    27 
    27 import pytz
    28 import pytz
    28 
    29 
   104         self.slave_mode = slave_mode
   105         self.slave_mode = slave_mode
   105         self.eids_seq_range = eids_seq_range
   106         self.eids_seq_range = eids_seq_range
   106 
   107 
   107         self.logger = logging.getLogger('dataimport.massive_store')
   108         self.logger = logging.getLogger('dataimport.massive_store')
   108         self.sql = cnx.system_sql
   109         self.sql = cnx.system_sql
   109         self.default_values = get_default_values(cnx.vreg.schema)
   110         self.schema = self._cnx.vreg.schema
       
   111         self.default_values = get_default_values(self.schema)
   110         self.get_next_eid = lambda g=self._get_eid_gen(): next(g)
   112         self.get_next_eid = lambda g=self._get_eid_gen(): next(g)
   111         self._dbh = PGHelper(cnx)
   113         self._dbh = PGHelper(cnx)
   112 
   114 
   113         cnx.read_security = False
   115         cnx.read_security = False
   114         cnx.write_security = False
   116         cnx.write_security = False
   141         self._etype_eid_idx = dict(cnx.execute('Any XN,X WHERE X is CWEType, X name XN'))
   143         self._etype_eid_idx = dict(cnx.execute('Any XN,X WHERE X is CWEType, X name XN'))
   142 
   144 
   143     # INIT FUNCTIONS ########################################################
   145     # INIT FUNCTIONS ########################################################
   144 
   146 
   145     def _drop_all_constraints(self):
   147     def _drop_all_constraints(self):
   146         schema = self._cnx.vreg.schema
   148         etypes_tables = ('cw_%s' % eschema.type.lower() for eschema in self.schema.entities()
   147         tables = ['cw_%s' % etype.type.lower()
   149                          if not eschema.final)
   148                   for etype in schema.entities() if not etype.final]
   150         rtypes_tables = ('%s_relation' % rschema.type.lower() for rschema in self.schema.relations()
   149         for rschema in schema.relations():
   151                          if rschema_has_table(rschema, skip_relations=PURE_VIRTUAL_RTYPES))
   150             if rschema.inlined:
   152         for tablename in chain(etypes_tables, rtypes_tables, ('entities',)):
   151                 continue
       
   152             elif rschema_has_table(rschema, skip_relations=PURE_VIRTUAL_RTYPES):
       
   153                 tables.append('%s_relation' % rschema.type.lower())
       
   154         tables.append('entities')
       
   155         for tablename in tables:
       
   156             self._store_and_drop_constraints(tablename)
   153             self._store_and_drop_constraints(tablename)
   157 
   154 
   158     def _store_and_drop_constraints(self, tablename):
   155     def _store_and_drop_constraints(self, tablename):
   159         # Create a table to save the constraints, it allows reloading even after crash
   156         # Create a table to save the constraints, it allows reloading even after crash
   160         self.sql('CREATE TABLE IF NOT EXISTS cwmassive_constraints'
   157         self.sql('CREATE TABLE IF NOT EXISTS cwmassive_constraints'
   183         for etype in (etype_from, etype_to):
   180         for etype in (etype_from, etype_to):
   184             if etype and etype not in self._init_uri_eid:
   181             if etype and etype not in self._init_uri_eid:
   185                 self._init_uri_eid_table(etype)
   182                 self._init_uri_eid_table(etype)
   186         if rtype not in self._uri_rtypes:
   183         if rtype not in self._uri_rtypes:
   187             # Create the temporary table
   184             # Create the temporary table
   188             if not self._cnx.repo.schema.rschema(rtype).inlined:
   185             if not self.schema.rschema(rtype).inlined:
   189                 self.sql('CREATE TABLE IF NOT EXISTS %(r)s_relation_iid_tmp'
   186                 self.sql('CREATE TABLE IF NOT EXISTS %(r)s_relation_iid_tmp'
   190                          '(uri_from character varying(%(s)s), uri_to character varying(%(s)s))'
   187                          '(uri_from character varying(%(s)s), uri_to character varying(%(s)s))'
   191                          % {'r': rtype, 's': self.iid_maxsize})
   188                          % {'r': rtype, 's': self.iid_maxsize})
   192             else:
   189             else:
   193                 self.logger.warning("inlined relation %s: cannot insert it", rtype)
   190                 self.logger.warning("inlined relation %s: cannot insert it", rtype)
   227             buf = StringIO('\n'.join(['%(uri_from)s\t%(uri_to)s' % d for d in data]))
   224             buf = StringIO('\n'.join(['%(uri_from)s\t%(uri_to)s' % d for d in data]))
   228             if not buf:
   225             if not buf:
   229                 self.logger.info('Empty Buffer for rtype %s', rtype)
   226                 self.logger.info('Empty Buffer for rtype %s', rtype)
   230                 continue
   227                 continue
   231             cursor = self._cnx.cnxset.cu
   228             cursor = self._cnx.cnxset.cu
   232             if not self._cnx.repo.schema.rschema(rtype).inlined:
   229             if not self.schema.rschema(rtype).inlined:
   233                 cursor.copy_from(buf, '%s_relation_iid_tmp' % rtype.lower(),
   230                 cursor.copy_from(buf, '%s_relation_iid_tmp' % rtype.lower(),
   234                                  null='NULL', columns=('uri_from', 'uri_to'))
   231                                  null='NULL', columns=('uri_from', 'uri_to'))
   235             else:
   232             else:
   236                 self.logger.warning("inlined relation %s: cannot insert it", rtype)
   233                 self.logger.warning("inlined relation %s: cannot insert it", rtype)
   237             buf.close()
   234             buf.close()
   258         self.flush_relations()
   255         self.flush_relations()
   259         if uri_label_from and etype_from not in self._uri_eid_inserted:
   256         if uri_label_from and etype_from not in self._uri_eid_inserted:
   260             self.fill_uri_eid_table(etype_from, uri_label_from)
   257             self.fill_uri_eid_table(etype_from, uri_label_from)
   261         if uri_label_to and etype_to not in self._uri_eid_inserted:
   258         if uri_label_to and etype_to not in self._uri_eid_inserted:
   262             self.fill_uri_eid_table(etype_to, uri_label_to)
   259             self.fill_uri_eid_table(etype_to, uri_label_to)
   263         if self._cnx.repo.schema.rschema(rtype).inlined:
   260         if self.schema.rschema(rtype).inlined:
   264             self.logger.warning("Can't insert inlined relation %s", rtype)
   261             self.logger.warning("Can't insert inlined relation %s", rtype)
   265             return
   262             return
   266         if uri_label_from and uri_label_to:
   263         if uri_label_from and uri_label_to:
   267             sql = '''INSERT INTO %(r)s_relation (eid_from, eid_to) SELECT DISTINCT O1.eid, O2.eid
   264             sql = '''INSERT INTO %(r)s_relation (eid_from, eid_to) SELECT DISTINCT O1.eid, O2.eid
   268             FROM %(r)s_relation_iid_tmp AS T, uri_eid_%(ef)s as O1, uri_eid_%(et)s as O2
   265             FROM %(r)s_relation_iid_tmp AS T, uri_eid_%(ef)s as O1, uri_eid_%(et)s as O2
   446         # Cleanup relations tables
   443         # Cleanup relations tables
   447         for etype in self._init_uri_eid:
   444         for etype in self._init_uri_eid:
   448             self.sql('DROP TABLE uri_eid_%s' % etype.lower())
   445             self.sql('DROP TABLE uri_eid_%s' % etype.lower())
   449         # Remove relations tables
   446         # Remove relations tables
   450         for rtype in self._uri_rtypes:
   447         for rtype in self._uri_rtypes:
   451             if not self._cnx.repo.schema.rschema(rtype).inlined:
   448             if not self.schema.rschema(rtype).inlined:
   452                 self.sql('DROP TABLE %(r)s_relation_iid_tmp' % {'r': rtype})
   449                 self.sql('DROP TABLE %(r)s_relation_iid_tmp' % {'r': rtype})
   453             else:
   450             else:
   454                 self.logger.warning("inlined relation %s: no cleanup to be done for it" % rtype)
   451                 self.logger.warning("inlined relation %s: no cleanup to be done for it" % rtype)
   455         # Create meta constraints (entities, is_instance_of, ...)
   452         # Create meta constraints (entities, is_instance_of, ...)
   456         self._create_metatables_constraints()
   453         self._create_metatables_constraints()