[server/sources/native] deal with connections, not sessions
authorJulien Cristau <julien.cristau@logilab.fr>
Tue, 01 Apr 2014 16:17:03 +0200
changeset 9611 ca853478aaa8
parent 9610 29450466273a
child 9612 24460d4d64bf
[server/sources/native] deal with connections, not sessions
server/sources/native.py
--- a/server/sources/native.py	Tue Apr 01 16:12:50 2014 +0200
+++ b/server/sources/native.py	Tue Apr 01 16:17:03 2014 +0200
@@ -153,34 +153,34 @@
                                'rtype': rdef.rtype,
                                'eid': tentity.eid})
 
-def _undo_rel_info(session, subj, rtype, obj):
+def _undo_rel_info(cnx, subj, rtype, obj):
     entities = []
     for role, eid in (('subject', subj), ('object', obj)):
         try:
-            entities.append(session.entity_from_eid(eid))
+            entities.append(cnx.entity_from_eid(eid))
         except UnknownEid:
-            raise _UndoException(session._(
+            raise _UndoException(cnx._(
                 "Can't restore relation %(rtype)s, %(role)s entity %(eid)s"
                 " doesn't exist anymore.")
-                                % {'role': session._(role),
-                                   'rtype': session._(rtype),
+                                % {'role': cnx._(role),
+                                   'rtype': cnx._(rtype),
                                    'eid': eid})
     sentity, oentity = entities
     try:
-        rschema = session.vreg.schema.rschema(rtype)
+        rschema = cnx.vreg.schema.rschema(rtype)
         rdef = rschema.rdefs[(sentity.cw_etype, oentity.cw_etype)]
     except KeyError:
-        raise _UndoException(session._(
+        raise _UndoException(cnx._(
             "Can't restore relation %(rtype)s between %(subj)s and "
             "%(obj)s, that relation does not exists anymore in the "
             "schema.")
-                            % {'rtype': session._(rtype),
+                            % {'rtype': cnx._(rtype),
                                'subj': subj,
                                'obj': obj})
     return sentity, oentity, rdef
 
-def _undo_has_later_transaction(session, eid):
-    return session.system_sql('''\
+def _undo_has_later_transaction(cnx, eid):
+    return cnx.system_sql('''\
 SELECT T.tx_uuid FROM transactions AS TREF, transactions AS T
 WHERE TREF.tx_uuid='%(txuuid)s' AND T.tx_uuid!='%(txuuid)s'
 AND T.tx_time>=TREF.tx_time
@@ -189,7 +189,7 @@
      OR EXISTS(SELECT 1 FROM tx_relation_actions as TRA
                WHERE TRA.tx_uuid=T.tx_uuid AND (
                    TRA.eid_from=%(eid)s OR TRA.eid_to=%(eid)s))
-     )''' % {'txuuid': session.transaction_data['undoing_uuid'],
+     )''' % {'txuuid': cnx.transaction_data['undoing_uuid'],
              'eid': eid}).fetchone()
 
 
@@ -206,7 +206,7 @@
             self.cnx.close()
         self.cnx = None
 
-    def create_eid(self, _session, count=1):
+    def create_eid(self, _cnx, count=1):
         # lock needed to prevent 'Connection is busy with results for another
         # command (0)' errors with SQLServer
         assert count > 0
@@ -259,12 +259,12 @@
     def close(self):
         pass
 
-    def create_eid(self, session, count=1):
+    def create_eid(self, cnx, count=1):
         assert count > 0
         source = self.source
         with self.lock:
             for sql in source.dbhelper.sqls_increment_numrange('entities_id_seq', count):
-                cursor = source.doexec(session, sql)
+                cursor = source.doexec(cnx, sql)
             return cursor.fetchone()[0]
 
 
@@ -369,9 +369,9 @@
         self._cache.pop('Any X WHERE X eid %s' % eid, None)
         self._cache.pop('Any %s' % eid, None)
 
-    def sqlexec(self, session, sql, args=None):
+    def sqlexec(self, cnx, sql, args=None):
         """execute the query and return its result"""
-        return self.process_result(self.doexec(session, sql, args))
+        return self.process_result(self.doexec(cnx, sql, args))
 
     def init_creating(self, cnxset=None):
         # check full text index availibility
@@ -521,7 +521,7 @@
                 continue
         raise AuthenticationError()
 
-    def syntax_tree_search(self, session, union, args=None, cachekey=None,
+    def syntax_tree_search(self, cnx, union, args=None, cachekey=None,
                            varmap=None):
         """return result from this source for a rql query (actually from
         a rql syntax tree and a solution dictionary mapping each used
@@ -547,25 +547,25 @@
         args = self.merge_args(args, qargs)
         assert isinstance(sql, basestring), repr(sql)
         try:
-            cursor = self.doexec(session, sql, args)
+            cursor = self.doexec(cnx, sql, args)
         except (self.OperationalError, self.InterfaceError):
-            if session.mode == 'write':
+            if cnx.mode == 'write':
                 # do not attempt to reconnect if there has been some write
                 # during the transaction
                 raise
             # FIXME: better detection of deconnection pb
             self.warning("trying to reconnect")
-            session.cnxset.reconnect()
-            cursor = self.doexec(session, sql, args)
+            cnx.cnxset.reconnect()
+            cursor = self.doexec(cnx, sql, args)
         except self.DbapiError as exc:
             # We get this one with pyodbc and SQL Server when connection was reset
-            if exc.args[0] == '08S01' and session.mode != 'write':
+            if exc.args[0] == '08S01' and cnx.mode != 'write':
                 self.warning("trying to reconnect")
-                session.cnxset.reconnect()
-                cursor = self.doexec(session, sql, args)
+                cnx.cnxset.reconnect()
+                cursor = self.doexec(cnx, sql, args)
             else:
                 raise
-        results = self.process_result(cursor, cbs, session=session)
+        results = self.process_result(cursor, cbs, session=cnx)
         assert dbg_results(results)
         return results
 
@@ -600,60 +600,60 @@
             for entity, attr, value in restore_values:
                 entity.cw_edited.edited_attribute(attr, value)
 
-    def add_entity(self, session, entity):
+    def add_entity(self, cnx, entity):
         """add a new entity to the source"""
         with self._storage_handler(entity, 'added'):
             attrs = self.preprocess_entity(entity)
             sql = self.sqlgen.insert(SQL_PREFIX + entity.cw_etype, attrs)
-            self.doexec(session, sql, attrs)
-            if session.ertype_supports_undo(entity.cw_etype):
-                self._record_tx_action(session, 'tx_entity_actions', 'C',
+            self.doexec(cnx, sql, attrs)
+            if cnx.ertype_supports_undo(entity.cw_etype):
+                self._record_tx_action(cnx, 'tx_entity_actions', 'C',
                                        etype=entity.cw_etype, eid=entity.eid)
 
-    def update_entity(self, session, entity):
+    def update_entity(self, cnx, entity):
         """replace an entity in the source"""
         with self._storage_handler(entity, 'updated'):
             attrs = self.preprocess_entity(entity)
-            if session.ertype_supports_undo(entity.cw_etype):
-                changes = self._save_attrs(session, entity, attrs)
-                self._record_tx_action(session, 'tx_entity_actions', 'U',
+            if cnx.ertype_supports_undo(entity.cw_etype):
+                changes = self._save_attrs(cnx, entity, attrs)
+                self._record_tx_action(cnx, 'tx_entity_actions', 'U',
                                        etype=entity.cw_etype, eid=entity.eid,
                                        changes=self._binary(dumps(changes)))
             sql = self.sqlgen.update(SQL_PREFIX + entity.cw_etype, attrs,
                                      ['cw_eid'])
-            self.doexec(session, sql, attrs)
+            self.doexec(cnx, sql, attrs)
 
-    def delete_entity(self, session, entity):
+    def delete_entity(self, cnx, entity):
         """delete an entity from the source"""
         with self._storage_handler(entity, 'deleted'):
-            if session.ertype_supports_undo(entity.cw_etype):
+            if cnx.ertype_supports_undo(entity.cw_etype):
                 attrs = [SQL_PREFIX + r.type
                          for r in entity.e_schema.subject_relations()
                          if (r.final or r.inlined) and not r in VIRTUAL_RTYPES]
-                changes = self._save_attrs(session, entity, attrs)
-                self._record_tx_action(session, 'tx_entity_actions', 'D',
+                changes = self._save_attrs(cnx, entity, attrs)
+                self._record_tx_action(cnx, 'tx_entity_actions', 'D',
                                        etype=entity.cw_etype, eid=entity.eid,
                                        changes=self._binary(dumps(changes)))
             attrs = {'cw_eid': entity.eid}
             sql = self.sqlgen.delete(SQL_PREFIX + entity.cw_etype, attrs)
-            self.doexec(session, sql, attrs)
+            self.doexec(cnx, sql, attrs)
 
-    def add_relation(self, session, subject, rtype, object, inlined=False):
+    def add_relation(self, cnx, subject, rtype, object, inlined=False):
         """add a relation to the source"""
-        self._add_relations(session,  rtype, [(subject, object)], inlined)
-        if session.ertype_supports_undo(rtype):
-            self._record_tx_action(session, 'tx_relation_actions', 'A',
+        self._add_relations(cnx,  rtype, [(subject, object)], inlined)
+        if cnx.ertype_supports_undo(rtype):
+            self._record_tx_action(cnx, 'tx_relation_actions', 'A',
                                    eid_from=subject, rtype=rtype, eid_to=object)
 
-    def add_relations(self, session,  rtype, subj_obj_list, inlined=False):
+    def add_relations(self, cnx,  rtype, subj_obj_list, inlined=False):
         """add a relations to the source"""
-        self._add_relations(session, rtype, subj_obj_list, inlined)
-        if session.ertype_supports_undo(rtype):
+        self._add_relations(cnx, rtype, subj_obj_list, inlined)
+        if cnx.ertype_supports_undo(rtype):
             for subject, object in subj_obj_list:
-                self._record_tx_action(session, 'tx_relation_actions', 'A',
+                self._record_tx_action(cnx, 'tx_relation_actions', 'A',
                                        eid_from=subject, rtype=rtype, eid_to=object)
 
-    def _add_relations(self, session, rtype, subj_obj_list, inlined=False):
+    def _add_relations(self, cnx, rtype, subj_obj_list, inlined=False):
         """add a relation to the source"""
         sql = []
         if inlined is False:
@@ -663,7 +663,7 @@
         else: # used by data import
             etypes = {}
             for subject, object in subj_obj_list:
-                etype = session.entity_metas(subject)['type']
+                etype = cnx.entity_metas(subject)['type']
                 if etype in etypes:
                     etypes[etype].append((subject, object))
                 else:
@@ -675,20 +675,20 @@
                                      ['cw_eid']),
                             attrs))
         for statement, attrs in sql:
-            self.doexecmany(session, statement, attrs)
+            self.doexecmany(cnx, statement, attrs)
 
-    def delete_relation(self, session, subject, rtype, object):
+    def delete_relation(self, cnx, subject, rtype, object):
         """delete a relation from the source"""
         rschema = self.schema.rschema(rtype)
-        self._delete_relation(session, subject, rtype, object, rschema.inlined)
-        if session.ertype_supports_undo(rtype):
-            self._record_tx_action(session, 'tx_relation_actions', 'R',
+        self._delete_relation(cnx, subject, rtype, object, rschema.inlined)
+        if cnx.ertype_supports_undo(rtype):
+            self._record_tx_action(cnx, 'tx_relation_actions', 'R',
                                    eid_from=subject, rtype=rtype, eid_to=object)
 
-    def _delete_relation(self, session, subject, rtype, object, inlined=False):
+    def _delete_relation(self, cnx, subject, rtype, object, inlined=False):
         """delete a relation from the source"""
         if inlined:
-            table = SQL_PREFIX + session.entity_metas(subject)['type']
+            table = SQL_PREFIX + cnx.entity_metas(subject)['type']
             column = SQL_PREFIX + rtype
             sql = 'UPDATE %s SET %s=NULL WHERE %seid=%%(eid)s' % (table, column,
                                                                   SQL_PREFIX)
@@ -696,15 +696,15 @@
         else:
             attrs = {'eid_from': subject, 'eid_to': object}
             sql = self.sqlgen.delete('%s_relation' % rtype, attrs)
-        self.doexec(session, sql, attrs)
+        self.doexec(cnx, sql, attrs)
 
-    def doexec(self, session, query, args=None, rollback=True):
+    def doexec(self, cnx, query, args=None, rollback=True):
         """Execute a query.
         it's a function just so that it shows up in profiling
         """
-        cursor = session.cnxset.cu
+        cursor = cnx.cnxset.cu
         if server.DEBUG & server.DBG_SQL:
-            cnx = session.cnxset.cnx
+            cnx = cnx.cnxset.cnx
             # getattr to get the actual connection if cnx is a CnxLoggingWrapper
             # instance
             print 'exec', query, args, getattr(cnx, '_cnx', cnx)
@@ -719,7 +719,7 @@
                               query, args, ex.args[0])
             if rollback:
                 try:
-                    session.cnxset.rollback()
+                    cnx.cnxset.rollback()
                     if self.repo.config.mode != 'test':
                         self.critical('transaction has been rolled back')
                 except Exception as ex:
@@ -730,31 +730,31 @@
                     # postgres, sqlserver
                     mo = re.search("unique_[a-z0-9]{32}", arg)
                     if mo is not None:
-                        raise UniqueTogetherError(session, cstrname=mo.group(0))
+                        raise UniqueTogetherError(cnx, cstrname=mo.group(0))
                     # old sqlite
                     mo = re.search('columns (.*) are not unique', arg)
                     if mo is not None: # sqlite in use
                         # we left chop the 'cw_' prefix of attribute names
                         rtypes = [c.strip()[3:]
                                   for c in mo.group(1).split(',')]
-                        raise UniqueTogetherError(session, rtypes=rtypes)
+                        raise UniqueTogetherError(cnx, rtypes=rtypes)
                     # sqlite after http://www.sqlite.org/cgi/src/info/c80e229dd9c1230a
                     if arg.startswith('UNIQUE constraint failed:'):
                         # message looks like: "UNIQUE constraint failed: foo.cw_bar, foo.cw_baz"
                         # so drop the prefix, split on comma, drop the tablenames, and drop "cw_"
                         columns = arg.split(':', 1)[1].split(',')
                         rtypes = [c.split('.', 1)[1].strip()[3:] for c in columns]
-                        raise UniqueTogetherError(session, rtypes=rtypes)
+                        raise UniqueTogetherError(cnx, rtypes=rtypes)
             raise
         return cursor
 
-    def doexecmany(self, session, query, args):
+    def doexecmany(self, cnx, query, args):
         """Execute a query.
         it's a function just so that it shows up in profiling
         """
         if server.DEBUG & server.DBG_SQL:
             print 'execmany', query, 'with', len(args), 'arguments'
-        cursor = session.cnxset.cu
+        cursor = cnx.cnxset.cu
         try:
             # str(query) to avoid error if it's an unicode string
             cursor.executemany(str(query), args)
@@ -765,7 +765,7 @@
                 self.critical("sql many: %r\n args: %s\ndbms message: %r",
                               query, args, ex.args[0])
             try:
-                session.cnxset.rollback()
+                cnx.cnxset.rollback()
                 if self.repo.config.mode != 'test':
                     self.critical('transaction has been rolled back')
             except Exception:
@@ -774,7 +774,7 @@
 
     # short cut to method requiring advanced db helper usage ##################
 
-    def update_rdef_column(self, session, rdef):
+    def update_rdef_column(self, cnx, rdef):
         """update physical column for a relation definition (final or inlined)
         """
         table, column = rdef_table_column(rdef)
@@ -783,12 +783,12 @@
             self.error("backend can't alter %s.%s to %s%s", table, column, coltype,
                        not allownull and 'NOT NULL' or '')
             return
-        self.dbhelper.change_col_type(LogCursor(session.cnxset.cu),
+        self.dbhelper.change_col_type(LogCursor(cnx.cnxset.cu),
                                       table, column, coltype, allownull)
         self.info('altered %s.%s: now %s%s', table, column, coltype,
                   not allownull and 'NOT NULL' or '')
 
-    def update_rdef_null_allowed(self, session, rdef):
+    def update_rdef_null_allowed(self, cnx, rdef):
         """update NULL / NOT NULL of physical column for a relation definition
         (final or inlined)
         """
@@ -798,62 +798,62 @@
             return
         table, column = rdef_table_column(rdef)
         coltype, allownull = rdef_physical_info(self.dbhelper, rdef)
-        self.dbhelper.set_null_allowed(LogCursor(session.cnxset.cu),
+        self.dbhelper.set_null_allowed(LogCursor(cnx.cnxset.cu),
                                        table, column, coltype, allownull)
 
-    def update_rdef_indexed(self, session, rdef):
+    def update_rdef_indexed(self, cnx, rdef):
         table, column = rdef_table_column(rdef)
         if rdef.indexed:
-            self.create_index(session, table, column)
+            self.create_index(cnx, table, column)
         else:
-            self.drop_index(session, table, column)
+            self.drop_index(cnx, table, column)
 
-    def update_rdef_unique(self, session, rdef):
+    def update_rdef_unique(self, cnx, rdef):
         table, column = rdef_table_column(rdef)
         if rdef.constraint_by_type('UniqueConstraint'):
-            self.create_index(session, table, column, unique=True)
+            self.create_index(cnx, table, column, unique=True)
         else:
-            self.drop_index(session, table, column, unique=True)
+            self.drop_index(cnx, table, column, unique=True)
 
-    def create_index(self, session, table, column, unique=False):
-        cursor = LogCursor(session.cnxset.cu)
+    def create_index(self, cnx, table, column, unique=False):
+        cursor = LogCursor(cnx.cnxset.cu)
         self.dbhelper.create_index(cursor, table, column, unique)
 
-    def drop_index(self, session, table, column, unique=False):
-        cursor = LogCursor(session.cnxset.cu)
+    def drop_index(self, cnx, table, column, unique=False):
+        cursor = LogCursor(cnx.cnxset.cu)
         self.dbhelper.drop_index(cursor, table, column, unique)
 
     # system source interface #################################################
 
-    def _eid_type_source(self, session, eid, sql, _retry=True):
+    def _eid_type_source(self, cnx, eid, sql, _retry=True):
         try:
-            res = self.doexec(session, sql).fetchone()
+            res = self.doexec(cnx, sql).fetchone()
             if res is not None:
                 return res
         except (self.OperationalError, self.InterfaceError):
-            if session.mode == 'read' and _retry:
+            if cnx.mode == 'read' and _retry:
                 self.warning("trying to reconnect (eid_type_source())")
-                session.cnxset.reconnect()
-                return self._eid_type_source(session, eid, sql, _retry=False)
+                cnx.cnxset.reconnect()
+                return self._eid_type_source(cnx, eid, sql, _retry=False)
         except Exception:
-            assert session.cnxset, 'session has no connections set'
+            assert cnx.cnxset, 'connection has no connections set'
             self.exception('failed to query entities table for eid %s', eid)
         raise UnknownEid(eid)
 
-    def eid_type_source(self, session, eid): # pylint: disable=E0202
+    def eid_type_source(self, cnx, eid): # pylint: disable=E0202
         """return a tuple (type, source, extid) for the entity with id <eid>"""
         sql = 'SELECT type, extid, asource FROM entities WHERE eid=%s' % eid
-        res = self._eid_type_source(session, eid, sql)
+        res = self._eid_type_source(cnx, eid, sql)
         if res[-2] is not None:
             if not isinstance(res, list):
                 res = list(res)
             res[-2] = b64decode(res[-2])
         return res
 
-    def eid_type_source_pre_131(self, session, eid):
+    def eid_type_source_pre_131(self, cnx, eid):
         """return a tuple (type, source, extid) for the entity with id <eid>"""
         sql = 'SELECT type, extid FROM entities WHERE eid=%s' % eid
-        res = self._eid_type_source(session, eid, sql)
+        res = self._eid_type_source(cnx, eid, sql)
         if not isinstance(res, list):
             res = list(res)
         if res[-1] is not None:
@@ -861,10 +861,10 @@
         res.append(res[1])
         return res
 
-    def extid2eid(self, session, extid):
+    def extid2eid(self, cnx, extid):
         """get eid from an external id. Return None if no record found."""
         assert isinstance(extid, str)
-        cursor = self.doexec(session,
+        cursor = self.doexec(cnx,
                              'SELECT eid FROM entities WHERE extid=%(x)s',
                              {'x': b64encode(extid)})
         # XXX testing rowcount cause strange bug with sqlite, results are there
@@ -878,70 +878,70 @@
             pass
         return None
 
-    def _handle_is_relation_sql(self, session, sql, attrs):
+    def _handle_is_relation_sql(self, cnx, sql, attrs):
         """ Handler for specific is_relation sql that may be
         overwritten in some stores"""
-        self.doexec(session, sql % attrs)
+        self.doexec(cnx, sql % attrs)
 
     _handle_insert_entity_sql = doexec
     _handle_is_instance_of_sql = _handle_source_relation_sql = _handle_is_relation_sql
 
-    def add_info(self, session, entity, source, extid, complete):
+    def add_info(self, cnx, entity, source, extid, complete):
         """add type and source info for an eid into the system table"""
-        with session.ensure_cnx_set:
+        with cnx.ensure_cnx_set:
             # begin by inserting eid/type/source/extid into the entities table
             if extid is not None:
                 assert isinstance(extid, str)
                 extid = b64encode(extid)
             attrs = {'type': entity.cw_etype, 'eid': entity.eid, 'extid': extid,
                      'asource': source.uri}
-            self._handle_insert_entity_sql(session, self.sqlgen.insert('entities', attrs), attrs)
+            self._handle_insert_entity_sql(cnx, self.sqlgen.insert('entities', attrs), attrs)
             # insert core relations: is, is_instance_of and cw_source
             try:
-                self._handle_is_relation_sql(session, 'INSERT INTO is_relation(eid_from,eid_to) VALUES (%s,%s)',
-                                             (entity.eid, eschema_eid(session, entity.e_schema)))
+                self._handle_is_relation_sql(cnx, 'INSERT INTO is_relation(eid_from,eid_to) VALUES (%s,%s)',
+                                             (entity.eid, eschema_eid(cnx, entity.e_schema)))
             except IndexError:
                 # during schema serialization, skip
                 pass
             else:
                 for eschema in entity.e_schema.ancestors() + [entity.e_schema]:
-                    self._handle_is_relation_sql(session,
+                    self._handle_is_relation_sql(cnx,
                                                  'INSERT INTO is_instance_of_relation(eid_from,eid_to) VALUES (%s,%s)',
-                                                 (entity.eid, eschema_eid(session, eschema)))
+                                                 (entity.eid, eschema_eid(cnx, eschema)))
             if 'CWSource' in self.schema and source.eid is not None: # else, cw < 3.10
-                self._handle_is_relation_sql(session, 'INSERT INTO cw_source_relation(eid_from,eid_to) VALUES (%s,%s)',
+                self._handle_is_relation_sql(cnx, 'INSERT INTO cw_source_relation(eid_from,eid_to) VALUES (%s,%s)',
                                              (entity.eid, source.eid))
             # now we can update the full text index
             if self.do_fti and self.need_fti_indexation(entity.cw_etype):
                 if complete:
                     entity.complete(entity.e_schema.indexable_attributes())
-                self.index_entity(session, entity=entity)
+                self.index_entity(cnx, entity=entity)
 
-    def update_info(self, session, entity, need_fti_update):
+    def update_info(self, cnx, entity, need_fti_update):
         """mark entity as being modified, fulltext reindex if needed"""
         if self.do_fti and need_fti_update:
             # reindex the entity only if this query is updating at least
             # one indexable attribute
-            self.index_entity(session, entity=entity)
+            self.index_entity(cnx, entity=entity)
 
-    def delete_info_multi(self, session, entities):
+    def delete_info_multi(self, cnx, entities):
         """delete system information on deletion of a list of entities with the
         same etype and belinging to the same source
 
         * update the fti
         * remove record from the `entities` table
         """
-        self.fti_unindex_entities(session, entities)
+        self.fti_unindex_entities(cnx, entities)
         attrs = {'eid': '(%s)' % ','.join([str(_e.eid) for _e in entities])}
-        self.doexec(session, self.sqlgen.delete_many('entities', attrs), attrs)
+        self.doexec(cnx, self.sqlgen.delete_many('entities', attrs), attrs)
 
     # undo support #############################################################
 
-    def undoable_transactions(self, session, ueid=None, **actionfilters):
+    def undoable_transactions(self, cnx, ueid=None, **actionfilters):
         """See :class:`cubicweb.repoapi.ClientConnection.undoable_transactions`"""
-        # force filtering to session's user if not a manager
-        if not session.user.is_in_group('managers'):
-            ueid = session.user.eid
+        # force filtering to connection's user if not a manager
+        if not cnx.user.is_in_group('managers'):
+            ueid = cnx.user.eid
         restr = {}
         if ueid is not None:
             restr['tx_user'] = ueid
@@ -1005,18 +1005,18 @@
             restr.update(tearestr)
         # we want results ordered by transaction's time descendant
         sql += ' ORDER BY tx_time DESC'
-        with session.ensure_cnx_set:
-            cu = self.doexec(session, sql, restr)
+        with cnx.ensure_cnx_set:
+            cu = self.doexec(cnx, sql, restr)
             # turn results into transaction objects
             return [tx.Transaction(*args) for args in cu.fetchall()]
 
-    def tx_info(self, session, txuuid):
+    def tx_info(self, cnx, txuuid):
         """See :class:`cubicweb.repoapi.ClientConnection.transaction_info`"""
-        return tx.Transaction(txuuid, *self._tx_info(session, txuuid))
+        return tx.Transaction(txuuid, *self._tx_info(cnx, txuuid))
 
-    def tx_actions(self, session, txuuid, public):
+    def tx_actions(self, cnx, txuuid, public):
         """See :class:`cubicweb.repoapi.ClientConnection.transaction_actions`"""
-        self._tx_info(session, txuuid)
+        self._tx_info(cnx, txuuid)
         restr = {'tx_uuid': txuuid}
         if public:
             restr['txa_public'] = True
@@ -1024,54 +1024,54 @@
         sql = self.sqlgen.select('tx_entity_actions', restr,
                                  ('txa_action', 'txa_public', 'txa_order',
                                   'etype', 'eid', 'changes'))
-        cu = self.doexec(session, sql, restr)
+        cu = self.doexec(cnx, sql, restr)
         actions = [tx.EntityAction(a,p,o,et,e,c and loads(self.binary_to_str(c)))
                    for a,p,o,et,e,c in cu.fetchall()]
         sql = self.sqlgen.select('tx_relation_actions', restr,
                                  ('txa_action', 'txa_public', 'txa_order',
                                   'rtype', 'eid_from', 'eid_to'))
-        cu = self.doexec(session, sql, restr)
+        cu = self.doexec(cnx, sql, restr)
         actions += [tx.RelationAction(*args) for args in cu.fetchall()]
         return sorted(actions, key=lambda x: x.order)
 
-    def undo_transaction(self, session, txuuid):
+    def undo_transaction(self, cnx, txuuid):
         """See :class:`cubicweb.repoapi.ClientConnection.undo_transaction`
 
         important note: while undoing of a transaction, only hooks in the
         'integrity', 'activeintegrity' and 'undo' categories are called.
         """
         # set mode so connections set isn't released subsquently until commit/rollback
-        session.mode = 'write'
+        cnx.mode = 'write'
         errors = []
-        session.transaction_data['undoing_uuid'] = txuuid
-        with session.deny_all_hooks_but('integrity', 'activeintegrity', 'undo'):
-            with session.security_enabled(read=False):
-                for action in reversed(self.tx_actions(session, txuuid, False)):
+        cnx.transaction_data['undoing_uuid'] = txuuid
+        with cnx.deny_all_hooks_but('integrity', 'activeintegrity', 'undo'):
+            with cnx.security_enabled(read=False):
+                for action in reversed(self.tx_actions(cnx, txuuid, False)):
                     undomethod = getattr(self, '_undo_%s' % action.action.lower())
-                    errors += undomethod(session, action)
+                    errors += undomethod(cnx, action)
         # remove the transactions record
-        self.doexec(session,
+        self.doexec(cnx,
                     "DELETE FROM transactions WHERE tx_uuid='%s'" % txuuid)
         if errors:
             raise UndoTransactionException(txuuid, errors)
         else:
             return
 
-    def start_undoable_transaction(self, session, uuid):
-        """session callback to insert a transaction record in the transactions
+    def start_undoable_transaction(self, cnx, uuid):
+        """connection callback to insert a transaction record in the transactions
         table when some undoable transaction is started
         """
-        ueid = session.user.eid
+        ueid = cnx.user.eid
         attrs = {'tx_uuid': uuid, 'tx_user': ueid, 'tx_time': datetime.utcnow()}
-        self.doexec(session, self.sqlgen.insert('transactions', attrs), attrs)
+        self.doexec(cnx, self.sqlgen.insert('transactions', attrs), attrs)
 
-    def _save_attrs(self, session, entity, attrs):
+    def _save_attrs(self, cnx, entity, attrs):
         """return a pickleable dictionary containing current values for given
         attributes of the entity
         """
         restr = {'cw_eid': entity.eid}
         sql = self.sqlgen.select(SQL_PREFIX + entity.cw_etype, restr, attrs)
-        cu = self.doexec(session, sql, restr)
+        cu = self.doexec(cnx, sql, restr)
         values = dict(zip(attrs, cu.fetchone()))
         # ensure backend specific binary are converted back to string
         eschema = entity.e_schema
@@ -1086,38 +1086,38 @@
                     values[column] = self.binary_to_str(value)
         return values
 
-    def _record_tx_action(self, session, table, action, **kwargs):
+    def _record_tx_action(self, cnx, table, action, **kwargs):
         """record a transaction action in the given table (either
         'tx_entity_actions' or 'tx_relation_action')
         """
-        kwargs['tx_uuid'] = session.transaction_uuid()
+        kwargs['tx_uuid'] = cnx.transaction_uuid()
         kwargs['txa_action'] = action
-        kwargs['txa_order'] = session.transaction_inc_action_counter()
-        kwargs['txa_public'] = session.running_dbapi_query
-        self.doexec(session, self.sqlgen.insert(table, kwargs), kwargs)
+        kwargs['txa_order'] = cnx.transaction_inc_action_counter()
+        kwargs['txa_public'] = cnx.running_dbapi_query
+        self.doexec(cnx, self.sqlgen.insert(table, kwargs), kwargs)
 
-    def _tx_info(self, session, txuuid):
+    def _tx_info(self, cnx, txuuid):
         """return transaction's time and user of the transaction with the given uuid.
 
         raise `NoSuchTransaction` if there is no such transaction of if the
-        session's user isn't allowed to see it.
+        connection's user isn't allowed to see it.
         """
-        with session.ensure_cnx_set:
+        with cnx.ensure_cnx_set:
             restr = {'tx_uuid': txuuid}
             sql = self.sqlgen.select('transactions', restr,
                                      ('tx_time', 'tx_user'))
-            cu = self.doexec(session, sql, restr)
+            cu = self.doexec(cnx, sql, restr)
             try:
                 time, ueid = cu.fetchone()
             except TypeError:
                 raise tx.NoSuchTransaction(txuuid)
-            if not (session.user.is_in_group('managers')
-                    or session.user.eid == ueid):
+            if not (cnx.user.is_in_group('managers')
+                    or cnx.user.eid == ueid):
                 raise tx.NoSuchTransaction(txuuid)
             return time, ueid
 
     def _reedit_entity(self, entity, changes, err):
-        session = entity._cw
+        cnx = entity._cw
         eid = entity.eid
         entity.cw_edited = edited = EditedEntity(entity)
         # check for schema changes, entities linked through inlined relation
@@ -1131,7 +1131,7 @@
             try:
                 rschema = getrschema[rtype]
             except KeyError:
-                err(session._("can't restore relation %(rtype)s of entity %(eid)s, "
+                err(cnx._("can't restore relation %(rtype)s of entity %(eid)s, "
                               "this relation does not exist in the schema anymore.")
                     % {'rtype': rtype, 'eid': eid})
             if not rschema.final:
@@ -1144,53 +1144,53 @@
                         entity._cw.entity_from_eid(value) # check target exists
                         edited[rtype] = value
                     except UnknownEid:
-                        err(session._("can't restore entity %(eid)s of type %(eschema)s, "
+                        err(cnx._("can't restore entity %(eid)s of type %(eschema)s, "
                                       "target of %(rtype)s (eid %(value)s) does not exist any longer")
                             % locals())
             elif eschema.destination(rtype) in ('Bytes', 'Password'):
                 changes[column] = self._binary(value)
                 edited[rtype] = Binary(value)
             elif isinstance(value, str):
-                edited[rtype] = unicode(value, session.encoding, 'replace')
+                edited[rtype] = unicode(value, cnx.encoding, 'replace')
             else:
                 edited[rtype] = value
         # This must only be done after init_entitiy_caches : defered in calling functions
         # edited.check()
 
-    def _undo_d(self, session, action):
+    def _undo_d(self, cnx, action):
         """undo an entity deletion"""
         errors = []
         err = errors.append
         eid = action.eid
         etype = action.etype
-        _ = session._
+        _ = cnx._
         # get an entity instance
         try:
-            entity = self.repo.vreg['etypes'].etype_class(etype)(session)
+            entity = self.repo.vreg['etypes'].etype_class(etype)(cnx)
         except Exception:
             err("can't restore entity %s of type %s, type no more supported"
                 % (eid, etype))
             return errors
         self._reedit_entity(entity, action.changes, err)
         entity.eid = eid
-        session.repo.init_entity_caches(session, entity, self)
+        cnx.repo.init_entity_caches(cnx, entity, self)
         entity.cw_edited.check()
-        self.repo.hm.call_hooks('before_add_entity', session, entity=entity)
+        self.repo.hm.call_hooks('before_add_entity', cnx, entity=entity)
         # restore the entity
         action.changes['cw_eid'] = eid
         sql = self.sqlgen.insert(SQL_PREFIX + etype, action.changes)
-        self.doexec(session, sql, action.changes)
+        self.doexec(cnx, sql, action.changes)
         # restore record in entities (will update fti if needed)
-        self.add_info(session, entity, self, None, True)
-        self.repo.hm.call_hooks('after_add_entity', session, entity=entity)
+        self.add_info(cnx, entity, self, None, True)
+        self.repo.hm.call_hooks('after_add_entity', cnx, entity=entity)
         return errors
 
-    def _undo_r(self, session, action):
+    def _undo_r(self, cnx, action):
         """undo a relation removal"""
         errors = []
         subj, rtype, obj = action.eid_from, action.rtype, action.eid_to
         try:
-            sentity, oentity, rdef = _undo_rel_info(session, subj, rtype, obj)
+            sentity, oentity, rdef = _undo_rel_info(cnx, subj, rtype, obj)
         except _UndoException as ex:
             errors.append(unicode(ex))
         else:
@@ -1202,78 +1202,78 @@
                     errors.append(unicode(ex))
                     continue
         if not errors:
-            self.repo.hm.call_hooks('before_add_relation', session,
+            self.repo.hm.call_hooks('before_add_relation', cnx,
                                     eidfrom=subj, rtype=rtype, eidto=obj)
             # add relation in the database
-            self._add_relations(session, rtype, [(subj, obj)], rdef.rtype.inlined)
+            self._add_relations(cnx, rtype, [(subj, obj)], rdef.rtype.inlined)
             # set related cache
-            session.update_rel_cache_add(subj, rtype, obj, rdef.rtype.symmetric)
-            self.repo.hm.call_hooks('after_add_relation', session,
+            cnx.update_rel_cache_add(subj, rtype, obj, rdef.rtype.symmetric)
+            self.repo.hm.call_hooks('after_add_relation', cnx,
                                     eidfrom=subj, rtype=rtype, eidto=obj)
         return errors
 
-    def _undo_c(self, session, action):
+    def _undo_c(self, cnx, action):
         """undo an entity creation"""
         eid = action.eid
         # XXX done to avoid fetching all remaining relation for the entity
         # we should find an efficient way to do this (keeping current veolidf
         # massive deletion performance)
-        if _undo_has_later_transaction(session, eid):
-            msg = session._('some later transaction(s) touch entity, undo them '
+        if _undo_has_later_transaction(cnx, eid):
+            msg = cnx._('some later transaction(s) touch entity, undo them '
                             'first')
             raise ValidationError(eid, {None: msg})
         etype = action.etype
         # get an entity instance
         try:
-            entity = self.repo.vreg['etypes'].etype_class(etype)(session)
+            entity = self.repo.vreg['etypes'].etype_class(etype)(cnx)
         except Exception:
-            return [session._(
+            return [cnx._(
                 "Can't undo creation of entity %(eid)s of type %(etype)s, type "
                 "no more supported" % {'eid': eid, 'etype': etype})]
         entity.eid = eid
         # for proper eid/type cache update
-        CleanupDeletedEidsCacheOp.get_instance(session).add_data(eid)
-        self.repo.hm.call_hooks('before_delete_entity', session, entity=entity)
+        CleanupDeletedEidsCacheOp.get_instance(cnx).add_data(eid)
+        self.repo.hm.call_hooks('before_delete_entity', cnx, entity=entity)
         # remove is / is_instance_of which are added using sql by hooks, hence
         # unvisible as transaction action
-        self.doexec(session, 'DELETE FROM is_relation WHERE eid_from=%s' % eid)
-        self.doexec(session, 'DELETE FROM is_instance_of_relation WHERE eid_from=%s' % eid)
-        self.doexec(session, 'DELETE FROM cw_source_relation WHERE eid_from=%s' % self.eid)
+        self.doexec(cnx, 'DELETE FROM is_relation WHERE eid_from=%s' % eid)
+        self.doexec(cnx, 'DELETE FROM is_instance_of_relation WHERE eid_from=%s' % eid)
+        self.doexec(cnx, 'DELETE FROM cw_source_relation WHERE eid_from=%s' % self.eid)
         # XXX check removal of inlined relation?
         # delete the entity
         attrs = {'cw_eid': eid}
         sql = self.sqlgen.delete(SQL_PREFIX + entity.cw_etype, attrs)
-        self.doexec(session, sql, attrs)
+        self.doexec(cnx, sql, attrs)
         # remove record from entities (will update fti if needed)
-        self.delete_info_multi(session, [entity])
-        self.repo.hm.call_hooks('after_delete_entity', session, entity=entity)
+        self.delete_info_multi(cnx, [entity])
+        self.repo.hm.call_hooks('after_delete_entity', cnx, entity=entity)
         return ()
 
-    def _undo_u(self, session, action):
+    def _undo_u(self, cnx, action):
         """undo an entity update"""
         errors = []
         err = errors.append
         try:
-            entity = session.entity_from_eid(action.eid)
+            entity = cnx.entity_from_eid(action.eid)
         except UnknownEid:
-            err(session._("can't restore state of entity %s, it has been "
+            err(cnx._("can't restore state of entity %s, it has been "
                           "deleted inbetween") % action.eid)
             return errors
         self._reedit_entity(entity, action.changes, err)
         entity.cw_edited.check()
-        self.repo.hm.call_hooks('before_update_entity', session, entity=entity)
+        self.repo.hm.call_hooks('before_update_entity', cnx, entity=entity)
         sql = self.sqlgen.update(SQL_PREFIX + entity.cw_etype, action.changes,
                                  ['cw_eid'])
-        self.doexec(session, sql, action.changes)
-        self.repo.hm.call_hooks('after_update_entity', session, entity=entity)
+        self.doexec(cnx, sql, action.changes)
+        self.repo.hm.call_hooks('after_update_entity', cnx, entity=entity)
         return errors
 
-    def _undo_a(self, session, action):
+    def _undo_a(self, cnx, action):
         """undo a relation addition"""
         errors = []
         subj, rtype, obj = action.eid_from, action.rtype, action.eid_to
         try:
-            sentity, oentity, rdef = _undo_rel_info(session, subj, rtype, obj)
+            sentity, oentity, rdef = _undo_rel_info(cnx, subj, rtype, obj)
         except _UndoException as ex:
             errors.append(unicode(ex))
         else:
@@ -1284,19 +1284,19 @@
             else:
                 sql = 'SELECT 1 FROM %s_relation WHERE eid_from=%s and eid_to=%s'\
                       % (rtype, subj, obj)
-            cu = self.doexec(session, sql)
+            cu = self.doexec(cnx, sql)
             if cu.fetchone() is None:
-                errors.append(session._(
+                errors.append(cnx._(
                     "Can't undo addition of relation %(rtype)s from %(subj)s to"
                     " %(obj)s, doesn't exist anymore" % locals()))
         if not errors:
-            self.repo.hm.call_hooks('before_delete_relation', session,
+            self.repo.hm.call_hooks('before_delete_relation', cnx,
                                     eidfrom=subj, rtype=rtype, eidto=obj)
             # delete relation from the database
-            self._delete_relation(session, subj, rtype, obj, rschema.inlined)
+            self._delete_relation(cnx, subj, rtype, obj, rschema.inlined)
             # set related cache
-            session.update_rel_cache_del(subj, rtype, obj, rschema.symmetric)
-            self.repo.hm.call_hooks('after_delete_relation', session,
+            cnx.update_rel_cache_del(subj, rtype, obj, rschema.symmetric)
+            self.repo.hm.call_hooks('after_delete_relation', cnx,
                                     eidfrom=subj, rtype=rtype, eidto=obj)
         return errors
 
@@ -1311,11 +1311,11 @@
             return True
         return False
 
-    def index_entity(self, session, entity):
+    def index_entity(self, cnx, entity):
         """create an operation to [re]index textual content of the given entity
         on commit
         """
-        FTIndexEntityOp.get_instance(session).add_data(entity.eid)
+        FTIndexEntityOp.get_instance(cnx).add_data(entity.eid)
 
     def fti_unindex_entities(self, cnx, entities):
         """remove text content for entities from the full text index
@@ -1354,10 +1354,10 @@
     """
 
     def precommit_event(self):
-        session = self.session
-        source = session.repo.system_source
-        pendingeids = session.transaction_data.get('pendingeids', ())
-        done = session.transaction_data.setdefault('indexedeids', set())
+        cnx = self.cnx
+        source = cnx.repo.system_source
+        pendingeids = cnx.transaction_data.get('pendingeids', ())
+        done = cnx.transaction_data.setdefault('indexedeids', set())
         to_reindex = set()
         for eid in self.get_data():
             if eid in pendingeids or eid in done:
@@ -1365,10 +1365,10 @@
                 # processed
                 continue
             done.add(eid)
-            iftindexable = session.entity_from_eid(eid).cw_adapt_to('IFTIndexable')
+            iftindexable = cnx.entity_from_eid(eid).cw_adapt_to('IFTIndexable')
             to_reindex |= set(iftindexable.fti_containers())
-        source.fti_unindex_entities(session, to_reindex)
-        source.fti_index_entities(session, to_reindex)
+        source.fti_unindex_entities(cnx, to_reindex)
+        source.fti_index_entities(cnx, to_reindex)
 
 def sql_schema(driver):
     helper = get_db_helper(driver)