server/session.py
changeset 10364 8b35a898b334
parent 10362 ec8c233ce54b
child 10366 38c7598b5c61
equal deleted inserted replaced
10363:e1ebf3d12098 10364:8b35a898b334
    17 # with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
    17 # with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
    18 """Repository users' and internal' sessions."""
    18 """Repository users' and internal' sessions."""
    19 __docformat__ = "restructuredtext en"
    19 __docformat__ = "restructuredtext en"
    20 
    20 
    21 import sys
    21 import sys
    22 import threading
       
    23 from time import time
    22 from time import time
    24 from uuid import uuid4
    23 from uuid import uuid4
    25 from warnings import warn
    24 from warnings import warn
    26 import functools
    25 import functools
    27 from contextlib import contextmanager
    26 from contextlib import contextmanager
   169 DEFAULT_SECURITY = object() # evaluated to true by design
   168 DEFAULT_SECURITY = object() # evaluated to true by design
   170 
   169 
   171 class SessionClosedError(RuntimeError):
   170 class SessionClosedError(RuntimeError):
   172     pass
   171     pass
   173 
   172 
   174 class CnxSetTracker(object):
       
   175     """Keep track of which connection use which cnxset.
       
   176 
       
   177     There should be one of these objects per session (including internal sessions).
       
   178 
       
   179     Session objects are responsible for creating their CnxSetTracker object.
       
   180 
       
   181     Connections should use the :meth:`record` and :meth:`forget` to inform the
       
   182     tracker of cnxsets they have acquired.
       
   183 
       
   184     .. automethod:: cubicweb.server.session.CnxSetTracker.record
       
   185     .. automethod:: cubicweb.server.session.CnxSetTracker.forget
       
   186 
       
   187     Sessions use the :meth:`close` and :meth:`wait` methods when closing.
       
   188 
       
   189     .. automethod:: cubicweb.server.session.CnxSetTracker.close
       
   190     .. automethod:: cubicweb.server.session.CnxSetTracker.wait
       
   191 
       
   192     This object itself is threadsafe. It also requires caller to acquired its
       
   193     lock in some situation.
       
   194     """
       
   195 
       
   196     def __init__(self):
       
   197         self._active = True
       
   198         self._condition = threading.Condition()
       
   199         self._record = {}
       
   200 
       
   201     def __enter__(self):
       
   202         return self._condition.__enter__()
       
   203 
       
   204     def __exit__(self, *args):
       
   205         return self._condition.__exit__(*args)
       
   206 
       
   207     def record(self, cnxid, cnxset):
       
   208         """Inform the tracker that a cnxid has acquired a cnxset
       
   209 
       
   210         This method is to be used by Connection objects.
       
   211 
       
   212         This method fails when:
       
   213         - The cnxid already has a recorded cnxset.
       
   214         - The tracker is not active anymore.
       
   215 
       
   216         Notes about the caller:
       
   217         (1) It is responsible for retrieving a cnxset.
       
   218         (2) It must be prepared to release the cnxset if the
       
   219             `cnxsettracker.forget` call fails.
       
   220         (3) It should acquire the tracker lock until the very end of the operation.
       
   221         (4) However it must only lock the CnxSetTracker object after having
       
   222             retrieved the cnxset to prevent deadlock.
       
   223 
       
   224         A typical usage look like::
       
   225 
       
   226         cnxset = repo._get_cnxset() # (1)
       
   227         try:
       
   228             with cnxset_tracker: # (3) and (4)
       
   229                 cnxset_tracker.record(caller.id, cnxset)
       
   230                 # (3') operation ends when caller is in expected state only
       
   231                 caller.cnxset = cnxset
       
   232         except Exception:
       
   233             repo._free_cnxset(cnxset) # (2)
       
   234             raise
       
   235         """
       
   236         # dubious since the caller is supposed to have acquired it anyway.
       
   237         with self._condition:
       
   238             if not self._active:
       
   239                 raise SessionClosedError('Closed')
       
   240             old = self._record.get(cnxid)
       
   241             if old is not None:
       
   242                 raise ValueError('connection "%s" already has a cnx_set (%r)'
       
   243                                  % (cnxid, old))
       
   244             self._record[cnxid] = cnxset
       
   245 
       
   246     def forget(self, cnxid, cnxset):
       
   247         """Inform the tracker that a cnxid have release a cnxset
       
   248 
       
   249         This methode is to be used by Connection object.
       
   250 
       
   251         This method fails when:
       
   252         - The cnxset for the cnxid does not match the recorded one.
       
   253 
       
   254         Notes about the caller:
       
   255         (1) It is responsible for releasing the cnxset.
       
   256         (2) It should acquire the tracker lock during the operation to ensure
       
   257             the internal tracker state is always accurate regarding its own state.
       
   258 
       
   259         A typical usage look like::
       
   260 
       
   261         cnxset = caller.cnxset
       
   262         try:
       
   263             with cnxset_tracker:
       
   264                 # (2) you can not have caller.cnxset out of sync with
       
   265                 #     cnxset_tracker state while unlocked
       
   266                 caller.cnxset = None
       
   267                 cnxset_tracker.forget(caller.id, cnxset)
       
   268         finally:
       
   269             cnxset = repo._free_cnxset(cnxset) # (1)
       
   270         """
       
   271         with self._condition:
       
   272             old = self._record.get(cnxid, None)
       
   273             if old is not cnxset:
       
   274                 raise ValueError('recorded cnxset for "%s" mismatch: %r != %r'
       
   275                                  % (cnxid, old, cnxset))
       
   276             self._record.pop(cnxid)
       
   277             self._condition.notify_all()
       
   278 
       
   279     def close(self):
       
   280         """Marks the tracker as inactive.
       
   281 
       
   282         This method is to be used by Session objects.
       
   283 
       
   284         An inactive tracker does not accept new records anymore.
       
   285         """
       
   286         with self._condition:
       
   287             self._active = False
       
   288 
       
   289     def wait(self, timeout=10):
       
   290         """Wait for all recorded cnxsets to be released
       
   291 
       
   292         This method is to be used by Session objects.
       
   293 
       
   294         Returns a tuple of connection ids that remain open.
       
   295         """
       
   296         with self._condition:
       
   297             if  self._active:
       
   298                 raise RuntimeError('Cannot wait on active tracker.'
       
   299                                    ' Call tracker.close() first')
       
   300             while self._record and timeout > 0:
       
   301                 start = time()
       
   302                 self._condition.wait(timeout)
       
   303                 timeout -= time() - start
       
   304             return tuple(self._record)
       
   305 
       
   306 
       
   307 def _with_cnx_set(func):
       
   308     """decorator for Connection method that ensure they run with a cnxset """
       
   309     @functools.wraps(func)
       
   310     def wrapper(cnx, *args, **kwargs):
       
   311         with cnx.ensure_cnx_set:
       
   312             return func(cnx, *args, **kwargs)
       
   313     return wrapper
       
   314 
   173 
   315 def _open_only(func):
   174 def _open_only(func):
   316     """decorator for Connection method that check it is open"""
   175     """decorator for Connection method that check it is open"""
   317     @functools.wraps(func)
   176     @functools.wraps(func)
   318     def check_open(cnx, *args, **kwargs):
   177     def check_open(cnx, *args, **kwargs):
   384 
   243 
   385       :attr:`read_security` and :attr:`write_security`, boolean flags telling if
   244       :attr:`read_security` and :attr:`write_security`, boolean flags telling if
   386       read/write security is currently activated.
   245       read/write security is currently activated.
   387 
   246 
   388     """
   247     """
   389 
   248     mode = 'write'
   390     is_request = False
   249     is_request = False
   391     hooks_in_progress = False
   250     hooks_in_progress = False
   392     is_repo_in_memory = True # bw compat
   251     is_repo_in_memory = True # bw compat
   393     mode = 'read'
       
   394 
   252 
   395     def __init__(self, session):
   253     def __init__(self, session):
   396         # using super(Connection, self) confuse some test hack
   254         # using super(Connection, self) confuse some test hack
   397         RequestSessionBase.__init__(self, session.vreg)
   255         RequestSessionBase.__init__(self, session.vreg)
   398         #: connection unique id
   256         #: connection unique id
   400         self.connectionid = '%s-%s' % (session.sessionid, uuid4().hex)
   258         self.connectionid = '%s-%s' % (session.sessionid, uuid4().hex)
   401         self.session = session
   259         self.session = session
   402         self.sessionid = session.sessionid
   260         self.sessionid = session.sessionid
   403         #: reentrance handling
   261         #: reentrance handling
   404         self.ctx_count = 0
   262         self.ctx_count = 0
   405         #: count the number of entry in a context needing a cnxset
       
   406         self._cnxset_count = 0
       
   407         #: Boolean for compat with the older explicite set_cnxset/free_cnx API
       
   408         #: When a call set_cnxset is done, no automatic freeing will be done
       
   409         #: until free_cnx is called.
       
   410         self._auto_free_cnx_set = True
       
   411 
   263 
   412         #: server.Repository object
   264         #: server.Repository object
   413         self.repo = session.repo
   265         self.repo = session.repo
   414         self.vreg = self.repo.vreg
   266         self.vreg = self.repo.vreg
   415         self._execute = self.repo.querier.execute
   267         self._execute = self.repo.querier.execute
   416 
   268 
   417         # other session utility
   269         # other session utility
   418         self._session_timestamp = session._timestamp
   270         self._session_timestamp = session._timestamp
   419 
   271 
   420         #: connection set used to execute queries on sources
       
   421         self._cnxset = None
       
   422         #: CnxSetTracker used to report cnxset usage
       
   423         self._cnxset_tracker = CnxSetTracker()
       
   424         # internal (root) session
   272         # internal (root) session
   425         self.is_internal_session = isinstance(session.user, InternalManager)
   273         self.is_internal_session = isinstance(session.user, InternalManager)
   426 
   274 
   427         #: dict containing arbitrary data cleared at the end of the transaction
   275         #: dict containing arbitrary data cleared at the end of the transaction
   428         self.transaction_data = {}
   276         self.transaction_data = {}
   538     # life cycle handling ####################################################
   386     # life cycle handling ####################################################
   539 
   387 
   540     def __enter__(self):
   388     def __enter__(self):
   541         assert self._open is None # first opening
   389         assert self._open is None # first opening
   542         self._open = True
   390         self._open = True
       
   391         self.cnxset = self.repo._get_cnxset()
   543         return self
   392         return self
   544 
   393 
   545     def __exit__(self, exctype=None, excvalue=None, tb=None):
   394     def __exit__(self, exctype=None, excvalue=None, tb=None):
   546         assert self._open # actually already open
   395         assert self._open # actually already open
   547         assert self._cnxset_count == 0
   396         self.clear()
   548         self.rollback()
       
   549         self._open = False
   397         self._open = False
       
   398         self.cnxset.cnxset_freed()
       
   399         self.repo._free_cnxset(self.cnxset)
       
   400         self.cnxset = None
   550 
   401 
   551     @contextmanager
   402     @contextmanager
   552     def running_hooks_ops(self):
   403     def running_hooks_ops(self):
   553         """this context manager should be called whenever hooks or operations
   404         """this context manager should be called whenever hooks or operations
   554         are about to be run (but after hook selection)
   405         are about to be run (but after hook selection)
   602         self.commit_state = None
   453         self.commit_state = None
   603         self.pruned_hooks_cache = {}
   454         self.pruned_hooks_cache = {}
   604         self.local_perm_cache.clear()
   455         self.local_perm_cache.clear()
   605         self.rewriter = RQLRewriter(self)
   456         self.rewriter = RQLRewriter(self)
   606 
   457 
   607     # Connection Set Management ###############################################
       
   608     @property
       
   609     @_open_only
       
   610     def cnxset(self):
       
   611         return self._cnxset
       
   612 
       
   613     @cnxset.setter
       
   614     @_open_only
       
   615     def cnxset(self, new_cnxset):
       
   616         with self._cnxset_tracker:
       
   617             old_cnxset = self._cnxset
       
   618             if new_cnxset is old_cnxset:
       
   619                 return #nothing to do
       
   620             if old_cnxset is not None:
       
   621                 old_cnxset.rollback()
       
   622                 self._cnxset = None
       
   623                 self.ctx_count -= 1
       
   624                 self._cnxset_tracker.forget(self.connectionid, old_cnxset)
       
   625             if new_cnxset is not None:
       
   626                 self._cnxset_tracker.record(self.connectionid, new_cnxset)
       
   627                 self._cnxset = new_cnxset
       
   628                 self.ctx_count += 1
       
   629 
       
   630     @_open_only
       
   631     def _set_cnxset(self):
       
   632         """the connection need a connections set to execute some queries"""
       
   633         if self.cnxset is None:
       
   634             cnxset = self.repo._get_cnxset()
       
   635             try:
       
   636                 self.cnxset = cnxset
       
   637             except:
       
   638                 self.repo._free_cnxset(cnxset)
       
   639                 raise
       
   640         return self.cnxset
       
   641 
       
   642     @_open_only
       
   643     def _free_cnxset(self, ignoremode=False):
       
   644         """the connection is no longer using its connections set, at least for some time"""
       
   645         # cnxset may be none if no operation has been done since last commit
       
   646         # or rollback
       
   647         cnxset = self.cnxset
       
   648         if cnxset is not None and (ignoremode or self.mode == 'read'):
       
   649             assert self._cnxset_count == 0
       
   650             try:
       
   651                 self.cnxset = None
       
   652             finally:
       
   653                 cnxset.cnxset_freed()
       
   654                 self.repo._free_cnxset(cnxset)
       
   655 
       
   656     @deprecated('[3.19] cnxset are automatically managed now.'
   458     @deprecated('[3.19] cnxset are automatically managed now.'
   657                 ' stop using explicit set and free.')
   459                 ' stop using explicit set and free.')
   658     def set_cnxset(self):
   460     def set_cnxset(self):
   659         self._auto_free_cnx_set = False
   461         pass
   660         return self._set_cnxset()
       
   661 
   462 
   662     @deprecated('[3.19] cnxset are automatically managed now.'
   463     @deprecated('[3.19] cnxset are automatically managed now.'
   663                 ' stop using explicit set and free.')
   464                 ' stop using explicit set and free.')
   664     def free_cnxset(self, ignoremode=False):
   465     def free_cnxset(self, ignoremode=False):
   665         self._auto_free_cnx_set = True
   466         pass
   666         return self._free_cnxset(ignoremode=ignoremode)
       
   667 
       
   668 
   467 
   669     @property
   468     @property
   670     @contextmanager
   469     @contextmanager
   671     @_open_only
   470     @_open_only
       
   471     @deprecated('[3.21] a cnxset is automatically set on __enter__ call now.'
       
   472                 ' stop using .ensure_cnx_set')
   672     def ensure_cnx_set(self):
   473     def ensure_cnx_set(self):
   673         assert self._cnxset_count >= 0
   474         yield
   674         if self._cnxset_count == 0:
       
   675             self._set_cnxset()
       
   676         try:
       
   677             self._cnxset_count += 1
       
   678             yield
       
   679         finally:
       
   680             self._cnxset_count = max(self._cnxset_count - 1, 0)
       
   681             if self._cnxset_count == 0 and self._auto_free_cnx_set:
       
   682                 self._free_cnxset()
       
   683 
       
   684 
   475 
   685     # Entity cache management #################################################
   476     # Entity cache management #################################################
   686     #
   477     #
   687     # The connection entity cache as held in cnx.transaction_data is removed at the
   478     # The connection entity cache as held in cnx.transaction_data is removed at the
   688     # end of the connection (commit and rollback)
   479     # end of the connection (commit and rollback)
   990     @_open_only
   781     @_open_only
   991     def source_defs(self):
   782     def source_defs(self):
   992         return self.repo.source_defs()
   783         return self.repo.source_defs()
   993 
   784 
   994     @deprecated('[3.19] use .entity_metas(eid) instead')
   785     @deprecated('[3.19] use .entity_metas(eid) instead')
   995     @_with_cnx_set
       
   996     @_open_only
   786     @_open_only
   997     def describe(self, eid, asdict=False):
   787     def describe(self, eid, asdict=False):
   998         """return a tuple (type, sourceuri, extid) for the entity with id <eid>"""
   788         """return a tuple (type, sourceuri, extid) for the entity with id <eid>"""
   999         etype, extid, source = self.repo.type_and_source_from_eid(eid, self)
   789         etype, extid, source = self.repo.type_and_source_from_eid(eid, self)
  1000         metas = {'type': etype, 'source': source, 'extid': extid}
   790         metas = {'type': etype, 'source': source, 'extid': extid}
  1001         if asdict:
   791         if asdict:
  1002             metas['asource'] = metas['source'] # XXX pre 3.19 client compat
   792             metas['asource'] = metas['source'] # XXX pre 3.19 client compat
  1003             return metas
   793             return metas
  1004         return etype, source, extid
   794         return etype, source, extid
  1005 
   795 
  1006     @_with_cnx_set
       
  1007     @_open_only
   796     @_open_only
  1008     def entity_metas(self, eid):
   797     def entity_metas(self, eid):
  1009         """return a tuple (type, sourceuri, extid) for the entity with id <eid>"""
   798         """return a tuple (type, sourceuri, extid) for the entity with id <eid>"""
  1010         etype, extid, source = self.repo.type_and_source_from_eid(eid, self)
   799         etype, extid, source = self.repo.type_and_source_from_eid(eid, self)
  1011         return {'type': etype, 'source': source, 'extid': extid}
   800         return {'type': etype, 'source': source, 'extid': extid}
  1012 
   801 
  1013     # core method #############################################################
   802     # core method #############################################################
  1014 
   803 
  1015     @_with_cnx_set
       
  1016     @_open_only
   804     @_open_only
  1017     def execute(self, rql, kwargs=None, build_descr=True):
   805     def execute(self, rql, kwargs=None, build_descr=True):
  1018         """db-api like method directly linked to the querier execute method.
   806         """db-api like method directly linked to the querier execute method.
  1019 
   807 
  1020         See :meth:`cubicweb.dbapi.Cursor.execute` documentation.
   808         See :meth:`cubicweb.dbapi.Cursor.execute` documentation.
  1024         rset.req = self
   812         rset.req = self
  1025         self._session_timestamp.touch()
   813         self._session_timestamp.touch()
  1026         return rset
   814         return rset
  1027 
   815 
  1028     @_open_only
   816     @_open_only
  1029     def rollback(self, free_cnxset=True, reset_pool=None):
   817     def rollback(self, free_cnxset=None, reset_pool=None):
  1030         """rollback the current transaction"""
   818         """rollback the current transaction"""
       
   819         if free_cnxset is not None:
       
   820             warn('[3.21] free_cnxset is now unneeded',
       
   821                  DeprecationWarning, stacklevel=2)
  1031         if reset_pool is not None:
   822         if reset_pool is not None:
  1032             warn('[3.13] use free_cnxset argument instead for reset_pool',
   823             warn('[3.13] reset_pool is now unneeded',
  1033                  DeprecationWarning, stacklevel=2)
   824                  DeprecationWarning, stacklevel=2)
  1034             free_cnxset = reset_pool
       
  1035         if self._cnxset_count != 0:
       
  1036             # we are inside ensure_cnx_set, don't lose it
       
  1037             free_cnxset = False
       
  1038         cnxset = self.cnxset
   825         cnxset = self.cnxset
  1039         if cnxset is None:
   826         assert cnxset is not None
  1040             self.clear()
       
  1041             self._session_timestamp.touch()
       
  1042             self.debug('rollback transaction %s done (no db activity)', self.connectionid)
       
  1043             return
       
  1044         try:
   827         try:
  1045             # by default, operations are executed with security turned off
   828             # by default, operations are executed with security turned off
  1046             with self.security_enabled(False, False):
   829             with self.security_enabled(False, False):
  1047                 while self.pending_operations:
   830                 while self.pending_operations:
  1048                     try:
   831                     try:
  1053                         continue
   836                         continue
  1054                 cnxset.rollback()
   837                 cnxset.rollback()
  1055                 self.debug('rollback for transaction %s done', self.connectionid)
   838                 self.debug('rollback for transaction %s done', self.connectionid)
  1056         finally:
   839         finally:
  1057             self._session_timestamp.touch()
   840             self._session_timestamp.touch()
  1058             if free_cnxset:
       
  1059                 self._free_cnxset(ignoremode=True)
       
  1060             self.clear()
   841             self.clear()
  1061 
   842 
  1062     @_open_only
   843     @_open_only
  1063     def commit(self, free_cnxset=True, reset_pool=None):
   844     def commit(self, free_cnxset=None, reset_pool=None):
  1064         """commit the current session's transaction"""
   845         """commit the current session's transaction"""
       
   846         if free_cnxset is not None:
       
   847             warn('[3.21] free_cnxset is now unneeded',
       
   848                  DeprecationWarning, stacklevel=2)
  1065         if reset_pool is not None:
   849         if reset_pool is not None:
  1066             warn('[3.13] use free_cnxset argument instead for reset_pool',
   850             warn('[3.13] reset_pool is now unneeded',
  1067                  DeprecationWarning, stacklevel=2)
   851                  DeprecationWarning, stacklevel=2)
  1068             free_cnxset = reset_pool
   852         assert self.cnxset is not None
  1069         if self.cnxset is None:
       
  1070             assert not self.pending_operations
       
  1071             self.clear()
       
  1072             self._session_timestamp.touch()
       
  1073             self.debug('commit transaction %s done (no db activity)', self.connectionid)
       
  1074             return
       
  1075         if self._cnxset_count != 0:
       
  1076             # we are inside ensure_cnx_set, don't lose it
       
  1077             free_cnxset = False
       
  1078         cstate = self.commit_state
   853         cstate = self.commit_state
  1079         if cstate == 'uncommitable':
   854         if cstate == 'uncommitable':
  1080             raise QueryError('transaction must be rolled back')
   855             raise QueryError('transaction must be rolled back')
  1081         if cstate == 'precommit':
   856         if cstate == 'precommit':
  1082             self.warn('calling commit in precommit makes no sense; ignoring commit')
   857             self.warn('calling commit in precommit makes no sense; ignoring commit')
  1132                                 self.critical('error while reverting precommit',
   907                                 self.critical('error while reverting precommit',
  1133                                               exc_info=True)
   908                                               exc_info=True)
  1134                     # XXX use slice notation since self.pending_operations is a
   909                     # XXX use slice notation since self.pending_operations is a
  1135                     # read-only property.
   910                     # read-only property.
  1136                     self.pending_operations[:] = processed + self.pending_operations
   911                     self.pending_operations[:] = processed + self.pending_operations
  1137                     self.rollback(free_cnxset)
   912                     self.rollback()
  1138                     raise
   913                     raise
  1139                 self.cnxset.commit()
   914                 self.cnxset.commit()
  1140                 self.commit_state = 'postcommit'
   915                 self.commit_state = 'postcommit'
  1141                 if debug:
   916                 if debug:
  1142                     print self.commit_state, '*' * 20
   917                     print self.commit_state, '*' * 20
  1153                                           exc_info=sys.exc_info())
   928                                           exc_info=sys.exc_info())
  1154                 self.debug('postcommit transaction %s done', self.connectionid)
   929                 self.debug('postcommit transaction %s done', self.connectionid)
  1155                 return self.transaction_uuid(set=False)
   930                 return self.transaction_uuid(set=False)
  1156         finally:
   931         finally:
  1157             self._session_timestamp.touch()
   932             self._session_timestamp.touch()
  1158             if free_cnxset:
       
  1159                 self._free_cnxset(ignoremode=True)
       
  1160             self.clear()
   933             self.clear()
  1161 
   934 
  1162     # resource accessors ######################################################
   935     # resource accessors ######################################################
  1163 
   936 
  1164     @_with_cnx_set
       
  1165     @_open_only
   937     @_open_only
  1166     def call_service(self, regid, **kwargs):
   938     def call_service(self, regid, **kwargs):
  1167         self.debug('calling service %s', regid)
   939         self.debug('calling service %s', regid)
  1168         service = self.vreg['services'].select(regid, self, **kwargs)
   940         service = self.vreg['services'].select(regid, self, **kwargs)
  1169         return service.call(**kwargs)
   941         return service.call(**kwargs)
  1170 
   942 
  1171     @_with_cnx_set
       
  1172     @_open_only
   943     @_open_only
  1173     def system_sql(self, sql, args=None, rollback_on_failure=True):
   944     def system_sql(self, sql, args=None, rollback_on_failure=True):
  1174         """return a sql cursor on the system database"""
   945         """return a sql cursor on the system database"""
  1175         if sql.split(None, 1)[0].upper() != 'SELECT':
       
  1176             self.mode = 'write'
       
  1177         source = self.repo.system_source
   946         source = self.repo.system_source
  1178         try:
   947         try:
  1179             return source.doexec(self, sql, args, rollback=rollback_on_failure)
   948             return source.doexec(self, sql, args, rollback=rollback_on_failure)
  1180         except (source.OperationalError, source.InterfaceError):
   949         except (source.OperationalError, source.InterfaceError):
  1181             if not rollback_on_failure:
   950             if not rollback_on_failure: