server/sources/native.py
changeset 7398 26695dd703d8
parent 7342 d1c8b5b3531c
child 7514 32081892850e
equal deleted inserted replaced
7397:6a9e66d788b3 7398:26695dd703d8
   302                not getattr(repo.config, 'no_sqlite_wrap', False):
   302                not getattr(repo.config, 'no_sqlite_wrap', False):
   303             from cubicweb.server.sources.extlite import ConnectionWrapper
   303             from cubicweb.server.sources.extlite import ConnectionWrapper
   304             self.dbhelper.dbname = abspath(self.dbhelper.dbname)
   304             self.dbhelper.dbname = abspath(self.dbhelper.dbname)
   305             self.get_connection = lambda: ConnectionWrapper(self)
   305             self.get_connection = lambda: ConnectionWrapper(self)
   306             self.check_connection = lambda cnx: cnx
   306             self.check_connection = lambda cnx: cnx
   307             def pool_reset(cnx):
   307             def cnxset_freed(cnx):
   308                 cnx.close()
   308                 cnx.close()
   309             self.pool_reset = pool_reset
   309             self.cnxset_freed = cnxset_freed
   310         if self.dbdriver == 'sqlite':
   310         if self.dbdriver == 'sqlite':
   311             self._create_eid = None
   311             self._create_eid = None
   312             self.create_eid = self._create_eid_sqlite
   312             self.create_eid = self._create_eid_sqlite
   313         self.binary_to_str = self.dbhelper.dbapi_module.binary_to_str
   313         self.binary_to_str = self.dbhelper.dbapi_module.binary_to_str
   314 
   314 
   344 
   344 
   345     def sqlexec(self, session, sql, args=None):
   345     def sqlexec(self, session, sql, args=None):
   346         """execute the query and return its result"""
   346         """execute the query and return its result"""
   347         return self.process_result(self.doexec(session, sql, args))
   347         return self.process_result(self.doexec(session, sql, args))
   348 
   348 
   349     def init_creating(self, pool=None):
   349     def init_creating(self, cnxset=None):
   350         # check full text index availibility
   350         # check full text index availibility
   351         if self.do_fti:
   351         if self.do_fti:
   352             if pool is None:
   352             if cnxset is None:
   353                 _pool = self.repo._get_pool()
   353                 _cnxset = self.repo._get_cnxset()
   354                 _pool.pool_set()
   354                 _cnxset.cnxset_set()
   355             else:
   355             else:
   356                 _pool = pool
   356                 _cnxset = cnxset
   357             if not self.dbhelper.has_fti_table(_pool['system']):
   357             if not self.dbhelper.has_fti_table(_cnxset['system']):
   358                 if not self.repo.config.creating:
   358                 if not self.repo.config.creating:
   359                     self.critical('no text index table')
   359                     self.critical('no text index table')
   360                 self.do_fti = False
   360                 self.do_fti = False
   361             if pool is None:
   361             if cnxset is None:
   362                 _pool.pool_reset()
   362                 _cnxset.cnxset_freed()
   363                 self.repo._free_pool(_pool)
   363                 self.repo._free_cnxset(_cnxset)
   364 
   364 
   365     def backup(self, backupfile, confirm, format='native'):
   365     def backup(self, backupfile, confirm, format='native'):
   366         """method called to create a backup of the source's data"""
   366         """method called to create a backup of the source's data"""
   367         if format == 'portable':
   367         if format == 'portable':
   368             self.repo.fill_schema()
   368             self.repo.fill_schema()
   369             self.set_schema(self.repo.schema)
   369             self.set_schema(self.repo.schema)
   370             helper = DatabaseIndependentBackupRestore(self)
   370             helper = DatabaseIndependentBackupRestore(self)
   371             self.close_pool_connections()
   371             self.close_source_connections()
   372             try:
   372             try:
   373                 helper.backup(backupfile)
   373                 helper.backup(backupfile)
   374             finally:
   374             finally:
   375                 self.open_pool_connections()
   375                 self.open_source_connections()
   376         elif format == 'native':
   376         elif format == 'native':
   377             self.close_pool_connections()
   377             self.close_source_connections()
   378             try:
   378             try:
   379                 self.backup_to_file(backupfile, confirm)
   379                 self.backup_to_file(backupfile, confirm)
   380             finally:
   380             finally:
   381                 self.open_pool_connections()
   381                 self.open_source_connections()
   382         else:
   382         else:
   383             raise ValueError('Unknown format %r' % format)
   383             raise ValueError('Unknown format %r' % format)
   384 
   384 
   385 
   385 
   386     def restore(self, backupfile, confirm, drop, format='native'):
   386     def restore(self, backupfile, confirm, drop, format='native'):
   387         """method called to restore a backup of source's data"""
   387         """method called to restore a backup of source's data"""
   388         if self.repo.config.open_connections_pools:
   388         if self.repo.config.init_cnxset_pool:
   389             self.close_pool_connections()
   389             self.close_source_connections()
   390         try:
   390         try:
   391             if format == 'portable':
   391             if format == 'portable':
   392                 helper = DatabaseIndependentBackupRestore(self)
   392                 helper = DatabaseIndependentBackupRestore(self)
   393                 helper.restore(backupfile)
   393                 helper.restore(backupfile)
   394             elif format == 'native':
   394             elif format == 'native':
   395                 self.restore_from_file(backupfile, confirm, drop=drop)
   395                 self.restore_from_file(backupfile, confirm, drop=drop)
   396             else:
   396             else:
   397                 raise ValueError('Unknown format %r' % format)
   397                 raise ValueError('Unknown format %r' % format)
   398         finally:
   398         finally:
   399             if self.repo.config.open_connections_pools:
   399             if self.repo.config.init_cnxset_pool:
   400                 self.open_pool_connections()
   400                 self.open_source_connections()
   401 
   401 
   402 
   402 
   403     def init(self, activated, source_entity):
   403     def init(self, activated, source_entity):
   404         self.init_creating(source_entity._cw.pool)
   404         self.init_creating(source_entity._cw.cnxset)
   405 
   405 
   406     def shutdown(self):
   406     def shutdown(self):
   407         if self._eid_creation_cnx:
   407         if self._eid_creation_cnx:
   408             self._eid_creation_cnx.close()
   408             self._eid_creation_cnx.close()
   409             self._eid_creation_cnx = None
   409             self._eid_creation_cnx = None
   521                 # do not attempt to reconnect if there has been some write
   521                 # do not attempt to reconnect if there has been some write
   522                 # during the transaction
   522                 # during the transaction
   523                 raise
   523                 raise
   524             # FIXME: better detection of deconnection pb
   524             # FIXME: better detection of deconnection pb
   525             self.warning("trying to reconnect")
   525             self.warning("trying to reconnect")
   526             session.pool.reconnect(self)
   526             session.cnxset.reconnect(self)
   527             cursor = self.doexec(session, sql, args)
   527             cursor = self.doexec(session, sql, args)
   528         except (self.DbapiError,), exc:
   528         except (self.DbapiError,), exc:
   529             # We get this one with pyodbc and SQL Server when connection was reset
   529             # We get this one with pyodbc and SQL Server when connection was reset
   530             if exc.args[0] == '08S01' and session.mode != 'write':
   530             if exc.args[0] == '08S01' and session.mode != 'write':
   531                 self.warning("trying to reconnect")
   531                 self.warning("trying to reconnect")
   532                 session.pool.reconnect(self)
   532                 session.cnxset.reconnect(self)
   533                 cursor = self.doexec(session, sql, args)
   533                 cursor = self.doexec(session, sql, args)
   534             else:
   534             else:
   535                 raise
   535                 raise
   536         results = self.process_result(cursor, cbs, session=session)
   536         results = self.process_result(cursor, cbs, session=session)
   537         assert dbg_results(results)
   537         assert dbg_results(results)
   716 
   716 
   717     def doexec(self, session, query, args=None, rollback=True):
   717     def doexec(self, session, query, args=None, rollback=True):
   718         """Execute a query.
   718         """Execute a query.
   719         it's a function just so that it shows up in profiling
   719         it's a function just so that it shows up in profiling
   720         """
   720         """
   721         cursor = session.pool[self.uri]
   721         cursor = session.cnxset[self.uri]
   722         if server.DEBUG & server.DBG_SQL:
   722         if server.DEBUG & server.DBG_SQL:
   723             cnx = session.pool.connection(self.uri)
   723             cnx = session.cnxset.connection(self.uri)
   724             # getattr to get the actual connection if cnx is a ConnectionWrapper
   724             # getattr to get the actual connection if cnx is a ConnectionWrapper
   725             # instance
   725             # instance
   726             print 'exec', query, args, getattr(cnx, '_cnx', cnx)
   726             print 'exec', query, args, getattr(cnx, '_cnx', cnx)
   727         try:
   727         try:
   728             # str(query) to avoid error if it's an unicode string
   728             # str(query) to avoid error if it's an unicode string
   733                 # db schema
   733                 # db schema
   734                 self.critical("sql: %r\n args: %s\ndbms message: %r",
   734                 self.critical("sql: %r\n args: %s\ndbms message: %r",
   735                               query, args, ex.args[0])
   735                               query, args, ex.args[0])
   736             if rollback:
   736             if rollback:
   737                 try:
   737                 try:
   738                     session.pool.connection(self.uri).rollback()
   738                     session.cnxset.connection(self.uri).rollback()
   739                     if self.repo.config.mode != 'test':
   739                     if self.repo.config.mode != 'test':
   740                         self.critical('transaction has been rollbacked')
   740                         self.critical('transaction has been rollbacked')
   741                 except:
   741                 except:
   742                     pass
   742                     pass
   743             if ex.__class__.__name__ == 'IntegrityError':
   743             if ex.__class__.__name__ == 'IntegrityError':
   762         """Execute a query.
   762         """Execute a query.
   763         it's a function just so that it shows up in profiling
   763         it's a function just so that it shows up in profiling
   764         """
   764         """
   765         if server.DEBUG & server.DBG_SQL:
   765         if server.DEBUG & server.DBG_SQL:
   766             print 'execmany', query, 'with', len(args), 'arguments'
   766             print 'execmany', query, 'with', len(args), 'arguments'
   767         cursor = session.pool[self.uri]
   767         cursor = session.cnxset[self.uri]
   768         try:
   768         try:
   769             # str(query) to avoid error if it's an unicode string
   769             # str(query) to avoid error if it's an unicode string
   770             cursor.executemany(str(query), args)
   770             cursor.executemany(str(query), args)
   771         except Exception, ex:
   771         except Exception, ex:
   772             if self.repo.config.mode != 'test':
   772             if self.repo.config.mode != 'test':
   773                 # during test we get those message when trying to alter sqlite
   773                 # during test we get those message when trying to alter sqlite
   774                 # db schema
   774                 # db schema
   775                 self.critical("sql many: %r\n args: %s\ndbms message: %r",
   775                 self.critical("sql many: %r\n args: %s\ndbms message: %r",
   776                               query, args, ex.args[0])
   776                               query, args, ex.args[0])
   777             try:
   777             try:
   778                 session.pool.connection(self.uri).rollback()
   778                 session.cnxset.connection(self.uri).rollback()
   779                 if self.repo.config.mode != 'test':
   779                 if self.repo.config.mode != 'test':
   780                     self.critical('transaction has been rollbacked')
   780                     self.critical('transaction has been rollbacked')
   781             except:
   781             except:
   782                 pass
   782                 pass
   783             raise
   783             raise
   791         coltype, allownull = rdef_physical_info(self.dbhelper, rdef)
   791         coltype, allownull = rdef_physical_info(self.dbhelper, rdef)
   792         if not self.dbhelper.alter_column_support:
   792         if not self.dbhelper.alter_column_support:
   793             self.error("backend can't alter %s.%s to %s%s", table, column, coltype,
   793             self.error("backend can't alter %s.%s to %s%s", table, column, coltype,
   794                        not allownull and 'NOT NULL' or '')
   794                        not allownull and 'NOT NULL' or '')
   795             return
   795             return
   796         self.dbhelper.change_col_type(LogCursor(session.pool[self.uri]),
   796         self.dbhelper.change_col_type(LogCursor(session.cnxset[self.uri]),
   797                                       table, column, coltype, allownull)
   797                                       table, column, coltype, allownull)
   798         self.info('altered %s.%s: now %s%s', table, column, coltype,
   798         self.info('altered %s.%s: now %s%s', table, column, coltype,
   799                   not allownull and 'NOT NULL' or '')
   799                   not allownull and 'NOT NULL' or '')
   800 
   800 
   801     def update_rdef_null_allowed(self, session, rdef):
   801     def update_rdef_null_allowed(self, session, rdef):
   806             # not supported (and NOT NULL not set by yams in that case, so no
   806             # not supported (and NOT NULL not set by yams in that case, so no
   807             # worry)
   807             # worry)
   808             return
   808             return
   809         table, column = rdef_table_column(rdef)
   809         table, column = rdef_table_column(rdef)
   810         coltype, allownull = rdef_physical_info(self.dbhelper, rdef)
   810         coltype, allownull = rdef_physical_info(self.dbhelper, rdef)
   811         self.dbhelper.set_null_allowed(LogCursor(session.pool[self.uri]),
   811         self.dbhelper.set_null_allowed(LogCursor(session.cnxset[self.uri]),
   812                                        table, column, coltype, allownull)
   812                                        table, column, coltype, allownull)
   813 
   813 
   814     def update_rdef_indexed(self, session, rdef):
   814     def update_rdef_indexed(self, session, rdef):
   815         table, column = rdef_table_column(rdef)
   815         table, column = rdef_table_column(rdef)
   816         if rdef.indexed:
   816         if rdef.indexed:
   824             self.create_index(session, table, column, unique=True)
   824             self.create_index(session, table, column, unique=True)
   825         else:
   825         else:
   826             self.drop_index(session, table, column, unique=True)
   826             self.drop_index(session, table, column, unique=True)
   827 
   827 
   828     def create_index(self, session, table, column, unique=False):
   828     def create_index(self, session, table, column, unique=False):
   829         cursor = LogCursor(session.pool[self.uri])
   829         cursor = LogCursor(session.cnxset[self.uri])
   830         self.dbhelper.create_index(cursor, table, column, unique)
   830         self.dbhelper.create_index(cursor, table, column, unique)
   831 
   831 
   832     def drop_index(self, session, table, column, unique=False):
   832     def drop_index(self, session, table, column, unique=False):
   833         cursor = LogCursor(session.pool[self.uri])
   833         cursor = LogCursor(session.cnxset[self.uri])
   834         self.dbhelper.drop_index(cursor, table, column, unique)
   834         self.dbhelper.drop_index(cursor, table, column, unique)
   835 
   835 
   836     # system source interface #################################################
   836     # system source interface #################################################
   837 
   837 
   838     def eid_type_source(self, session, eid):
   838     def eid_type_source(self, session, eid):
   839         """return a tuple (type, source, extid) for the entity with id <eid>"""
   839         """return a tuple (type, source, extid) for the entity with id <eid>"""
   840         sql = 'SELECT type, source, extid FROM entities WHERE eid=%s' % eid
   840         sql = 'SELECT type, source, extid FROM entities WHERE eid=%s' % eid
   841         try:
   841         try:
   842             res = self.doexec(session, sql).fetchone()
   842             res = self.doexec(session, sql).fetchone()
   843         except:
   843         except:
   844             assert session.pool, 'session has no pool set'
   844             assert session.cnxset, 'session has no connections set'
   845             raise UnknownEid(eid)
   845             raise UnknownEid(eid)
   846         if res is None:
   846         if res is None:
   847             raise UnknownEid(eid)
   847             raise UnknownEid(eid)
   848         if res[-1] is not None:
   848         if res[-1] is not None:
   849             if not isinstance(res, list):
   849             if not isinstance(res, list):
  1133         """See :class:`cubicweb.dbapi.Connection.undo_transaction`
  1133         """See :class:`cubicweb.dbapi.Connection.undo_transaction`
  1134 
  1134 
  1135         important note: while undoing of a transaction, only hooks in the
  1135         important note: while undoing of a transaction, only hooks in the
  1136         'integrity', 'activeintegrity' and 'undo' categories are called.
  1136         'integrity', 'activeintegrity' and 'undo' categories are called.
  1137         """
  1137         """
  1138         # set mode so pool isn't released subsquently until commit/rollback
  1138         # set mode so connections set isn't released subsquently until commit/rollback
  1139         session.mode = 'write'
  1139         session.mode = 'write'
  1140         errors = []
  1140         errors = []
  1141         session.transaction_data['undoing_uuid'] = txuuid
  1141         session.transaction_data['undoing_uuid'] = txuuid
  1142         with hooks_control(session, session.HOOKS_DENY_ALL,
  1142         with hooks_control(session, session.HOOKS_DENY_ALL,
  1143                            'integrity', 'activeintegrity', 'undo'):
  1143                            'integrity', 'activeintegrity', 'undo'):
  1378         FTIndexEntityOp.get_instance(session).add_data(entity.eid)
  1378         FTIndexEntityOp.get_instance(session).add_data(entity.eid)
  1379 
  1379 
  1380     def fti_unindex_entities(self, session, entities):
  1380     def fti_unindex_entities(self, session, entities):
  1381         """remove text content for entities from the full text index
  1381         """remove text content for entities from the full text index
  1382         """
  1382         """
  1383         cursor = session.pool['system']
  1383         cursor = session.cnxset['system']
  1384         cursor_unindex_object = self.dbhelper.cursor_unindex_object
  1384         cursor_unindex_object = self.dbhelper.cursor_unindex_object
  1385         try:
  1385         try:
  1386             for entity in entities:
  1386             for entity in entities:
  1387                 cursor_unindex_object(entity.eid, cursor)
  1387                 cursor_unindex_object(entity.eid, cursor)
  1388         except Exception: # let KeyboardInterrupt / SystemExit propagate
  1388         except Exception: # let KeyboardInterrupt / SystemExit propagate
  1391 
  1391 
  1392     def fti_index_entities(self, session, entities):
  1392     def fti_index_entities(self, session, entities):
  1393         """add text content of created/modified entities to the full text index
  1393         """add text content of created/modified entities to the full text index
  1394         """
  1394         """
  1395         cursor_index_object = self.dbhelper.cursor_index_object
  1395         cursor_index_object = self.dbhelper.cursor_index_object
  1396         cursor = session.pool['system']
  1396         cursor = session.cnxset['system']
  1397         try:
  1397         try:
  1398             # use cursor_index_object, not cursor_reindex_object since
  1398             # use cursor_index_object, not cursor_reindex_object since
  1399             # unindexing done in the FTIndexEntityOp
  1399             # unindexing done in the FTIndexEntityOp
  1400             for entity in entities:
  1400             for entity in entities:
  1401                 cursor_index_object(entity.eid,
  1401                 cursor_index_object(entity.eid,