302 not getattr(repo.config, 'no_sqlite_wrap', False): |
302 not getattr(repo.config, 'no_sqlite_wrap', False): |
303 from cubicweb.server.sources.extlite import ConnectionWrapper |
303 from cubicweb.server.sources.extlite import ConnectionWrapper |
304 self.dbhelper.dbname = abspath(self.dbhelper.dbname) |
304 self.dbhelper.dbname = abspath(self.dbhelper.dbname) |
305 self.get_connection = lambda: ConnectionWrapper(self) |
305 self.get_connection = lambda: ConnectionWrapper(self) |
306 self.check_connection = lambda cnx: cnx |
306 self.check_connection = lambda cnx: cnx |
307 def pool_reset(cnx): |
307 def cnxset_freed(cnx): |
308 cnx.close() |
308 cnx.close() |
309 self.pool_reset = pool_reset |
309 self.cnxset_freed = cnxset_freed |
310 if self.dbdriver == 'sqlite': |
310 if self.dbdriver == 'sqlite': |
311 self._create_eid = None |
311 self._create_eid = None |
312 self.create_eid = self._create_eid_sqlite |
312 self.create_eid = self._create_eid_sqlite |
313 self.binary_to_str = self.dbhelper.dbapi_module.binary_to_str |
313 self.binary_to_str = self.dbhelper.dbapi_module.binary_to_str |
314 |
314 |
344 |
344 |
345 def sqlexec(self, session, sql, args=None): |
345 def sqlexec(self, session, sql, args=None): |
346 """execute the query and return its result""" |
346 """execute the query and return its result""" |
347 return self.process_result(self.doexec(session, sql, args)) |
347 return self.process_result(self.doexec(session, sql, args)) |
348 |
348 |
349 def init_creating(self, pool=None): |
349 def init_creating(self, cnxset=None): |
350 # check full text index availibility |
350 # check full text index availibility |
351 if self.do_fti: |
351 if self.do_fti: |
352 if pool is None: |
352 if cnxset is None: |
353 _pool = self.repo._get_pool() |
353 _cnxset = self.repo._get_cnxset() |
354 _pool.pool_set() |
354 _cnxset.cnxset_set() |
355 else: |
355 else: |
356 _pool = pool |
356 _cnxset = cnxset |
357 if not self.dbhelper.has_fti_table(_pool['system']): |
357 if not self.dbhelper.has_fti_table(_cnxset['system']): |
358 if not self.repo.config.creating: |
358 if not self.repo.config.creating: |
359 self.critical('no text index table') |
359 self.critical('no text index table') |
360 self.do_fti = False |
360 self.do_fti = False |
361 if pool is None: |
361 if cnxset is None: |
362 _pool.pool_reset() |
362 _cnxset.cnxset_freed() |
363 self.repo._free_pool(_pool) |
363 self.repo._free_cnxset(_cnxset) |
364 |
364 |
365 def backup(self, backupfile, confirm, format='native'): |
365 def backup(self, backupfile, confirm, format='native'): |
366 """method called to create a backup of the source's data""" |
366 """method called to create a backup of the source's data""" |
367 if format == 'portable': |
367 if format == 'portable': |
368 self.repo.fill_schema() |
368 self.repo.fill_schema() |
369 self.set_schema(self.repo.schema) |
369 self.set_schema(self.repo.schema) |
370 helper = DatabaseIndependentBackupRestore(self) |
370 helper = DatabaseIndependentBackupRestore(self) |
371 self.close_pool_connections() |
371 self.close_source_connections() |
372 try: |
372 try: |
373 helper.backup(backupfile) |
373 helper.backup(backupfile) |
374 finally: |
374 finally: |
375 self.open_pool_connections() |
375 self.open_source_connections() |
376 elif format == 'native': |
376 elif format == 'native': |
377 self.close_pool_connections() |
377 self.close_source_connections() |
378 try: |
378 try: |
379 self.backup_to_file(backupfile, confirm) |
379 self.backup_to_file(backupfile, confirm) |
380 finally: |
380 finally: |
381 self.open_pool_connections() |
381 self.open_source_connections() |
382 else: |
382 else: |
383 raise ValueError('Unknown format %r' % format) |
383 raise ValueError('Unknown format %r' % format) |
384 |
384 |
385 |
385 |
386 def restore(self, backupfile, confirm, drop, format='native'): |
386 def restore(self, backupfile, confirm, drop, format='native'): |
387 """method called to restore a backup of source's data""" |
387 """method called to restore a backup of source's data""" |
388 if self.repo.config.open_connections_pools: |
388 if self.repo.config.init_cnxset_pool: |
389 self.close_pool_connections() |
389 self.close_source_connections() |
390 try: |
390 try: |
391 if format == 'portable': |
391 if format == 'portable': |
392 helper = DatabaseIndependentBackupRestore(self) |
392 helper = DatabaseIndependentBackupRestore(self) |
393 helper.restore(backupfile) |
393 helper.restore(backupfile) |
394 elif format == 'native': |
394 elif format == 'native': |
395 self.restore_from_file(backupfile, confirm, drop=drop) |
395 self.restore_from_file(backupfile, confirm, drop=drop) |
396 else: |
396 else: |
397 raise ValueError('Unknown format %r' % format) |
397 raise ValueError('Unknown format %r' % format) |
398 finally: |
398 finally: |
399 if self.repo.config.open_connections_pools: |
399 if self.repo.config.init_cnxset_pool: |
400 self.open_pool_connections() |
400 self.open_source_connections() |
401 |
401 |
402 |
402 |
403 def init(self, activated, source_entity): |
403 def init(self, activated, source_entity): |
404 self.init_creating(source_entity._cw.pool) |
404 self.init_creating(source_entity._cw.cnxset) |
405 |
405 |
406 def shutdown(self): |
406 def shutdown(self): |
407 if self._eid_creation_cnx: |
407 if self._eid_creation_cnx: |
408 self._eid_creation_cnx.close() |
408 self._eid_creation_cnx.close() |
409 self._eid_creation_cnx = None |
409 self._eid_creation_cnx = None |
521 # do not attempt to reconnect if there has been some write |
521 # do not attempt to reconnect if there has been some write |
522 # during the transaction |
522 # during the transaction |
523 raise |
523 raise |
524 # FIXME: better detection of deconnection pb |
524 # FIXME: better detection of deconnection pb |
525 self.warning("trying to reconnect") |
525 self.warning("trying to reconnect") |
526 session.pool.reconnect(self) |
526 session.cnxset.reconnect(self) |
527 cursor = self.doexec(session, sql, args) |
527 cursor = self.doexec(session, sql, args) |
528 except (self.DbapiError,), exc: |
528 except (self.DbapiError,), exc: |
529 # We get this one with pyodbc and SQL Server when connection was reset |
529 # We get this one with pyodbc and SQL Server when connection was reset |
530 if exc.args[0] == '08S01' and session.mode != 'write': |
530 if exc.args[0] == '08S01' and session.mode != 'write': |
531 self.warning("trying to reconnect") |
531 self.warning("trying to reconnect") |
532 session.pool.reconnect(self) |
532 session.cnxset.reconnect(self) |
533 cursor = self.doexec(session, sql, args) |
533 cursor = self.doexec(session, sql, args) |
534 else: |
534 else: |
535 raise |
535 raise |
536 results = self.process_result(cursor, cbs, session=session) |
536 results = self.process_result(cursor, cbs, session=session) |
537 assert dbg_results(results) |
537 assert dbg_results(results) |
716 |
716 |
717 def doexec(self, session, query, args=None, rollback=True): |
717 def doexec(self, session, query, args=None, rollback=True): |
718 """Execute a query. |
718 """Execute a query. |
719 it's a function just so that it shows up in profiling |
719 it's a function just so that it shows up in profiling |
720 """ |
720 """ |
721 cursor = session.pool[self.uri] |
721 cursor = session.cnxset[self.uri] |
722 if server.DEBUG & server.DBG_SQL: |
722 if server.DEBUG & server.DBG_SQL: |
723 cnx = session.pool.connection(self.uri) |
723 cnx = session.cnxset.connection(self.uri) |
724 # getattr to get the actual connection if cnx is a ConnectionWrapper |
724 # getattr to get the actual connection if cnx is a ConnectionWrapper |
725 # instance |
725 # instance |
726 print 'exec', query, args, getattr(cnx, '_cnx', cnx) |
726 print 'exec', query, args, getattr(cnx, '_cnx', cnx) |
727 try: |
727 try: |
728 # str(query) to avoid error if it's an unicode string |
728 # str(query) to avoid error if it's an unicode string |
762 """Execute a query. |
762 """Execute a query. |
763 it's a function just so that it shows up in profiling |
763 it's a function just so that it shows up in profiling |
764 """ |
764 """ |
765 if server.DEBUG & server.DBG_SQL: |
765 if server.DEBUG & server.DBG_SQL: |
766 print 'execmany', query, 'with', len(args), 'arguments' |
766 print 'execmany', query, 'with', len(args), 'arguments' |
767 cursor = session.pool[self.uri] |
767 cursor = session.cnxset[self.uri] |
768 try: |
768 try: |
769 # str(query) to avoid error if it's an unicode string |
769 # str(query) to avoid error if it's an unicode string |
770 cursor.executemany(str(query), args) |
770 cursor.executemany(str(query), args) |
771 except Exception, ex: |
771 except Exception, ex: |
772 if self.repo.config.mode != 'test': |
772 if self.repo.config.mode != 'test': |
773 # during test we get those message when trying to alter sqlite |
773 # during test we get those message when trying to alter sqlite |
774 # db schema |
774 # db schema |
775 self.critical("sql many: %r\n args: %s\ndbms message: %r", |
775 self.critical("sql many: %r\n args: %s\ndbms message: %r", |
776 query, args, ex.args[0]) |
776 query, args, ex.args[0]) |
777 try: |
777 try: |
778 session.pool.connection(self.uri).rollback() |
778 session.cnxset.connection(self.uri).rollback() |
779 if self.repo.config.mode != 'test': |
779 if self.repo.config.mode != 'test': |
780 self.critical('transaction has been rollbacked') |
780 self.critical('transaction has been rollbacked') |
781 except: |
781 except: |
782 pass |
782 pass |
783 raise |
783 raise |
791 coltype, allownull = rdef_physical_info(self.dbhelper, rdef) |
791 coltype, allownull = rdef_physical_info(self.dbhelper, rdef) |
792 if not self.dbhelper.alter_column_support: |
792 if not self.dbhelper.alter_column_support: |
793 self.error("backend can't alter %s.%s to %s%s", table, column, coltype, |
793 self.error("backend can't alter %s.%s to %s%s", table, column, coltype, |
794 not allownull and 'NOT NULL' or '') |
794 not allownull and 'NOT NULL' or '') |
795 return |
795 return |
796 self.dbhelper.change_col_type(LogCursor(session.pool[self.uri]), |
796 self.dbhelper.change_col_type(LogCursor(session.cnxset[self.uri]), |
797 table, column, coltype, allownull) |
797 table, column, coltype, allownull) |
798 self.info('altered %s.%s: now %s%s', table, column, coltype, |
798 self.info('altered %s.%s: now %s%s', table, column, coltype, |
799 not allownull and 'NOT NULL' or '') |
799 not allownull and 'NOT NULL' or '') |
800 |
800 |
801 def update_rdef_null_allowed(self, session, rdef): |
801 def update_rdef_null_allowed(self, session, rdef): |
806 # not supported (and NOT NULL not set by yams in that case, so no |
806 # not supported (and NOT NULL not set by yams in that case, so no |
807 # worry) |
807 # worry) |
808 return |
808 return |
809 table, column = rdef_table_column(rdef) |
809 table, column = rdef_table_column(rdef) |
810 coltype, allownull = rdef_physical_info(self.dbhelper, rdef) |
810 coltype, allownull = rdef_physical_info(self.dbhelper, rdef) |
811 self.dbhelper.set_null_allowed(LogCursor(session.pool[self.uri]), |
811 self.dbhelper.set_null_allowed(LogCursor(session.cnxset[self.uri]), |
812 table, column, coltype, allownull) |
812 table, column, coltype, allownull) |
813 |
813 |
814 def update_rdef_indexed(self, session, rdef): |
814 def update_rdef_indexed(self, session, rdef): |
815 table, column = rdef_table_column(rdef) |
815 table, column = rdef_table_column(rdef) |
816 if rdef.indexed: |
816 if rdef.indexed: |
824 self.create_index(session, table, column, unique=True) |
824 self.create_index(session, table, column, unique=True) |
825 else: |
825 else: |
826 self.drop_index(session, table, column, unique=True) |
826 self.drop_index(session, table, column, unique=True) |
827 |
827 |
828 def create_index(self, session, table, column, unique=False): |
828 def create_index(self, session, table, column, unique=False): |
829 cursor = LogCursor(session.pool[self.uri]) |
829 cursor = LogCursor(session.cnxset[self.uri]) |
830 self.dbhelper.create_index(cursor, table, column, unique) |
830 self.dbhelper.create_index(cursor, table, column, unique) |
831 |
831 |
832 def drop_index(self, session, table, column, unique=False): |
832 def drop_index(self, session, table, column, unique=False): |
833 cursor = LogCursor(session.pool[self.uri]) |
833 cursor = LogCursor(session.cnxset[self.uri]) |
834 self.dbhelper.drop_index(cursor, table, column, unique) |
834 self.dbhelper.drop_index(cursor, table, column, unique) |
835 |
835 |
836 # system source interface ################################################# |
836 # system source interface ################################################# |
837 |
837 |
838 def eid_type_source(self, session, eid): |
838 def eid_type_source(self, session, eid): |
839 """return a tuple (type, source, extid) for the entity with id <eid>""" |
839 """return a tuple (type, source, extid) for the entity with id <eid>""" |
840 sql = 'SELECT type, source, extid FROM entities WHERE eid=%s' % eid |
840 sql = 'SELECT type, source, extid FROM entities WHERE eid=%s' % eid |
841 try: |
841 try: |
842 res = self.doexec(session, sql).fetchone() |
842 res = self.doexec(session, sql).fetchone() |
843 except: |
843 except: |
844 assert session.pool, 'session has no pool set' |
844 assert session.cnxset, 'session has no connections set' |
845 raise UnknownEid(eid) |
845 raise UnknownEid(eid) |
846 if res is None: |
846 if res is None: |
847 raise UnknownEid(eid) |
847 raise UnknownEid(eid) |
848 if res[-1] is not None: |
848 if res[-1] is not None: |
849 if not isinstance(res, list): |
849 if not isinstance(res, list): |
1133 """See :class:`cubicweb.dbapi.Connection.undo_transaction` |
1133 """See :class:`cubicweb.dbapi.Connection.undo_transaction` |
1134 |
1134 |
1135 important note: while undoing of a transaction, only hooks in the |
1135 important note: while undoing of a transaction, only hooks in the |
1136 'integrity', 'activeintegrity' and 'undo' categories are called. |
1136 'integrity', 'activeintegrity' and 'undo' categories are called. |
1137 """ |
1137 """ |
1138 # set mode so pool isn't released subsquently until commit/rollback |
1138 # set mode so connections set isn't released subsquently until commit/rollback |
1139 session.mode = 'write' |
1139 session.mode = 'write' |
1140 errors = [] |
1140 errors = [] |
1141 session.transaction_data['undoing_uuid'] = txuuid |
1141 session.transaction_data['undoing_uuid'] = txuuid |
1142 with hooks_control(session, session.HOOKS_DENY_ALL, |
1142 with hooks_control(session, session.HOOKS_DENY_ALL, |
1143 'integrity', 'activeintegrity', 'undo'): |
1143 'integrity', 'activeintegrity', 'undo'): |
1391 |
1391 |
1392 def fti_index_entities(self, session, entities): |
1392 def fti_index_entities(self, session, entities): |
1393 """add text content of created/modified entities to the full text index |
1393 """add text content of created/modified entities to the full text index |
1394 """ |
1394 """ |
1395 cursor_index_object = self.dbhelper.cursor_index_object |
1395 cursor_index_object = self.dbhelper.cursor_index_object |
1396 cursor = session.pool['system'] |
1396 cursor = session.cnxset['system'] |
1397 try: |
1397 try: |
1398 # use cursor_index_object, not cursor_reindex_object since |
1398 # use cursor_index_object, not cursor_reindex_object since |
1399 # unindexing done in the FTIndexEntityOp |
1399 # unindexing done in the FTIndexEntityOp |
1400 for entity in entities: |
1400 for entity in entities: |
1401 cursor_index_object(entity.eid, |
1401 cursor_index_object(entity.eid, |