server/sources/native.py
changeset 9611 ca853478aaa8
parent 9605 e6b8931abe17
child 9664 5ef5494b6b0b
equal deleted inserted replaced
9610:29450466273a 9611:ca853478aaa8
   151             "is already linked using this relation.")
   151             "is already linked using this relation.")
   152                             % {'role': neg_role(role),
   152                             % {'role': neg_role(role),
   153                                'rtype': rdef.rtype,
   153                                'rtype': rdef.rtype,
   154                                'eid': tentity.eid})
   154                                'eid': tentity.eid})
   155 
   155 
   156 def _undo_rel_info(session, subj, rtype, obj):
   156 def _undo_rel_info(cnx, subj, rtype, obj):
   157     entities = []
   157     entities = []
   158     for role, eid in (('subject', subj), ('object', obj)):
   158     for role, eid in (('subject', subj), ('object', obj)):
   159         try:
   159         try:
   160             entities.append(session.entity_from_eid(eid))
   160             entities.append(cnx.entity_from_eid(eid))
   161         except UnknownEid:
   161         except UnknownEid:
   162             raise _UndoException(session._(
   162             raise _UndoException(cnx._(
   163                 "Can't restore relation %(rtype)s, %(role)s entity %(eid)s"
   163                 "Can't restore relation %(rtype)s, %(role)s entity %(eid)s"
   164                 " doesn't exist anymore.")
   164                 " doesn't exist anymore.")
   165                                 % {'role': session._(role),
   165                                 % {'role': cnx._(role),
   166                                    'rtype': session._(rtype),
   166                                    'rtype': cnx._(rtype),
   167                                    'eid': eid})
   167                                    'eid': eid})
   168     sentity, oentity = entities
   168     sentity, oentity = entities
   169     try:
   169     try:
   170         rschema = session.vreg.schema.rschema(rtype)
   170         rschema = cnx.vreg.schema.rschema(rtype)
   171         rdef = rschema.rdefs[(sentity.cw_etype, oentity.cw_etype)]
   171         rdef = rschema.rdefs[(sentity.cw_etype, oentity.cw_etype)]
   172     except KeyError:
   172     except KeyError:
   173         raise _UndoException(session._(
   173         raise _UndoException(cnx._(
   174             "Can't restore relation %(rtype)s between %(subj)s and "
   174             "Can't restore relation %(rtype)s between %(subj)s and "
   175             "%(obj)s, that relation does not exists anymore in the "
   175             "%(obj)s, that relation does not exists anymore in the "
   176             "schema.")
   176             "schema.")
   177                             % {'rtype': session._(rtype),
   177                             % {'rtype': cnx._(rtype),
   178                                'subj': subj,
   178                                'subj': subj,
   179                                'obj': obj})
   179                                'obj': obj})
   180     return sentity, oentity, rdef
   180     return sentity, oentity, rdef
   181 
   181 
   182 def _undo_has_later_transaction(session, eid):
   182 def _undo_has_later_transaction(cnx, eid):
   183     return session.system_sql('''\
   183     return cnx.system_sql('''\
   184 SELECT T.tx_uuid FROM transactions AS TREF, transactions AS T
   184 SELECT T.tx_uuid FROM transactions AS TREF, transactions AS T
   185 WHERE TREF.tx_uuid='%(txuuid)s' AND T.tx_uuid!='%(txuuid)s'
   185 WHERE TREF.tx_uuid='%(txuuid)s' AND T.tx_uuid!='%(txuuid)s'
   186 AND T.tx_time>=TREF.tx_time
   186 AND T.tx_time>=TREF.tx_time
   187 AND (EXISTS(SELECT 1 FROM tx_entity_actions AS TEA
   187 AND (EXISTS(SELECT 1 FROM tx_entity_actions AS TEA
   188             WHERE TEA.tx_uuid=T.tx_uuid AND TEA.eid=%(eid)s)
   188             WHERE TEA.tx_uuid=T.tx_uuid AND TEA.eid=%(eid)s)
   189      OR EXISTS(SELECT 1 FROM tx_relation_actions as TRA
   189      OR EXISTS(SELECT 1 FROM tx_relation_actions as TRA
   190                WHERE TRA.tx_uuid=T.tx_uuid AND (
   190                WHERE TRA.tx_uuid=T.tx_uuid AND (
   191                    TRA.eid_from=%(eid)s OR TRA.eid_to=%(eid)s))
   191                    TRA.eid_from=%(eid)s OR TRA.eid_to=%(eid)s))
   192      )''' % {'txuuid': session.transaction_data['undoing_uuid'],
   192      )''' % {'txuuid': cnx.transaction_data['undoing_uuid'],
   193              'eid': eid}).fetchone()
   193              'eid': eid}).fetchone()
   194 
   194 
   195 
   195 
   196 class DefaultEidGenerator(object):
   196 class DefaultEidGenerator(object):
   197     __slots__ = ('source', 'cnx', 'lock')
   197     __slots__ = ('source', 'cnx', 'lock')
   204     def close(self):
   204     def close(self):
   205         if self.cnx:
   205         if self.cnx:
   206             self.cnx.close()
   206             self.cnx.close()
   207         self.cnx = None
   207         self.cnx = None
   208 
   208 
   209     def create_eid(self, _session, count=1):
   209     def create_eid(self, _cnx, count=1):
   210         # lock needed to prevent 'Connection is busy with results for another
   210         # lock needed to prevent 'Connection is busy with results for another
   211         # command (0)' errors with SQLServer
   211         # command (0)' errors with SQLServer
   212         assert count > 0
   212         assert count > 0
   213         with self.lock:
   213         with self.lock:
   214             return self._create_eid(count)
   214             return self._create_eid(count)
   257         self.lock = Lock()
   257         self.lock = Lock()
   258 
   258 
   259     def close(self):
   259     def close(self):
   260         pass
   260         pass
   261 
   261 
   262     def create_eid(self, session, count=1):
   262     def create_eid(self, cnx, count=1):
   263         assert count > 0
   263         assert count > 0
   264         source = self.source
   264         source = self.source
   265         with self.lock:
   265         with self.lock:
   266             for sql in source.dbhelper.sqls_increment_numrange('entities_id_seq', count):
   266             for sql in source.dbhelper.sqls_increment_numrange('entities_id_seq', count):
   267                 cursor = source.doexec(session, sql)
   267                 cursor = source.doexec(cnx, sql)
   268             return cursor.fetchone()[0]
   268             return cursor.fetchone()[0]
   269 
   269 
   270 
   270 
   271 class NativeSQLSource(SQLAdapterMixIn, AbstractSource):
   271 class NativeSQLSource(SQLAdapterMixIn, AbstractSource):
   272     """adapter for source using the native cubicweb schema (see below)
   272     """adapter for source using the native cubicweb schema (see below)
   367         """clear potential caches for the given eid"""
   367         """clear potential caches for the given eid"""
   368         self._cache.pop('Any X WHERE X eid %s, X is %s' % (eid, etype), None)
   368         self._cache.pop('Any X WHERE X eid %s, X is %s' % (eid, etype), None)
   369         self._cache.pop('Any X WHERE X eid %s' % eid, None)
   369         self._cache.pop('Any X WHERE X eid %s' % eid, None)
   370         self._cache.pop('Any %s' % eid, None)
   370         self._cache.pop('Any %s' % eid, None)
   371 
   371 
   372     def sqlexec(self, session, sql, args=None):
   372     def sqlexec(self, cnx, sql, args=None):
   373         """execute the query and return its result"""
   373         """execute the query and return its result"""
   374         return self.process_result(self.doexec(session, sql, args))
   374         return self.process_result(self.doexec(cnx, sql, args))
   375 
   375 
   376     def init_creating(self, cnxset=None):
   376     def init_creating(self, cnxset=None):
   377         # check full text index availibility
   377         # check full text index availibility
   378         if self.do_fti:
   378         if self.do_fti:
   379             if cnxset is None:
   379             if cnxset is None:
   519                 return authentifier.authenticate(cnx, login, **kwargs)
   519                 return authentifier.authenticate(cnx, login, **kwargs)
   520             except AuthenticationError:
   520             except AuthenticationError:
   521                 continue
   521                 continue
   522         raise AuthenticationError()
   522         raise AuthenticationError()
   523 
   523 
   524     def syntax_tree_search(self, session, union, args=None, cachekey=None,
   524     def syntax_tree_search(self, cnx, union, args=None, cachekey=None,
   525                            varmap=None):
   525                            varmap=None):
   526         """return result from this source for a rql query (actually from
   526         """return result from this source for a rql query (actually from
   527         a rql syntax tree and a solution dictionary mapping each used
   527         a rql syntax tree and a solution dictionary mapping each used
   528         variable to a possible type). If cachekey is given, the query
   528         variable to a possible type). If cachekey is given, the query
   529         necessary to fetch the results (but not the results themselves)
   529         necessary to fetch the results (but not the results themselves)
   545                 sql, qargs, cbs = self._rql_sqlgen.generate(union, args, varmap)
   545                 sql, qargs, cbs = self._rql_sqlgen.generate(union, args, varmap)
   546                 self._cache[cachekey] = sql, qargs, cbs
   546                 self._cache[cachekey] = sql, qargs, cbs
   547         args = self.merge_args(args, qargs)
   547         args = self.merge_args(args, qargs)
   548         assert isinstance(sql, basestring), repr(sql)
   548         assert isinstance(sql, basestring), repr(sql)
   549         try:
   549         try:
   550             cursor = self.doexec(session, sql, args)
   550             cursor = self.doexec(cnx, sql, args)
   551         except (self.OperationalError, self.InterfaceError):
   551         except (self.OperationalError, self.InterfaceError):
   552             if session.mode == 'write':
   552             if cnx.mode == 'write':
   553                 # do not attempt to reconnect if there has been some write
   553                 # do not attempt to reconnect if there has been some write
   554                 # during the transaction
   554                 # during the transaction
   555                 raise
   555                 raise
   556             # FIXME: better detection of deconnection pb
   556             # FIXME: better detection of deconnection pb
   557             self.warning("trying to reconnect")
   557             self.warning("trying to reconnect")
   558             session.cnxset.reconnect()
   558             cnx.cnxset.reconnect()
   559             cursor = self.doexec(session, sql, args)
   559             cursor = self.doexec(cnx, sql, args)
   560         except self.DbapiError as exc:
   560         except self.DbapiError as exc:
   561             # We get this one with pyodbc and SQL Server when connection was reset
   561             # We get this one with pyodbc and SQL Server when connection was reset
   562             if exc.args[0] == '08S01' and session.mode != 'write':
   562             if exc.args[0] == '08S01' and cnx.mode != 'write':
   563                 self.warning("trying to reconnect")
   563                 self.warning("trying to reconnect")
   564                 session.cnxset.reconnect()
   564                 cnx.cnxset.reconnect()
   565                 cursor = self.doexec(session, sql, args)
   565                 cursor = self.doexec(cnx, sql, args)
   566             else:
   566             else:
   567                 raise
   567                 raise
   568         results = self.process_result(cursor, cbs, session=session)
   568         results = self.process_result(cursor, cbs, session=cnx)
   569         assert dbg_results(results)
   569         assert dbg_results(results)
   570         return results
   570         return results
   571 
   571 
   572     @contextmanager
   572     @contextmanager
   573     def _storage_handler(self, entity, event):
   573     def _storage_handler(self, entity, event):
   598         finally:
   598         finally:
   599             # 3/ restore original values
   599             # 3/ restore original values
   600             for entity, attr, value in restore_values:
   600             for entity, attr, value in restore_values:
   601                 entity.cw_edited.edited_attribute(attr, value)
   601                 entity.cw_edited.edited_attribute(attr, value)
   602 
   602 
   603     def add_entity(self, session, entity):
   603     def add_entity(self, cnx, entity):
   604         """add a new entity to the source"""
   604         """add a new entity to the source"""
   605         with self._storage_handler(entity, 'added'):
   605         with self._storage_handler(entity, 'added'):
   606             attrs = self.preprocess_entity(entity)
   606             attrs = self.preprocess_entity(entity)
   607             sql = self.sqlgen.insert(SQL_PREFIX + entity.cw_etype, attrs)
   607             sql = self.sqlgen.insert(SQL_PREFIX + entity.cw_etype, attrs)
   608             self.doexec(session, sql, attrs)
   608             self.doexec(cnx, sql, attrs)
   609             if session.ertype_supports_undo(entity.cw_etype):
   609             if cnx.ertype_supports_undo(entity.cw_etype):
   610                 self._record_tx_action(session, 'tx_entity_actions', 'C',
   610                 self._record_tx_action(cnx, 'tx_entity_actions', 'C',
   611                                        etype=entity.cw_etype, eid=entity.eid)
   611                                        etype=entity.cw_etype, eid=entity.eid)
   612 
   612 
   613     def update_entity(self, session, entity):
   613     def update_entity(self, cnx, entity):
   614         """replace an entity in the source"""
   614         """replace an entity in the source"""
   615         with self._storage_handler(entity, 'updated'):
   615         with self._storage_handler(entity, 'updated'):
   616             attrs = self.preprocess_entity(entity)
   616             attrs = self.preprocess_entity(entity)
   617             if session.ertype_supports_undo(entity.cw_etype):
   617             if cnx.ertype_supports_undo(entity.cw_etype):
   618                 changes = self._save_attrs(session, entity, attrs)
   618                 changes = self._save_attrs(cnx, entity, attrs)
   619                 self._record_tx_action(session, 'tx_entity_actions', 'U',
   619                 self._record_tx_action(cnx, 'tx_entity_actions', 'U',
   620                                        etype=entity.cw_etype, eid=entity.eid,
   620                                        etype=entity.cw_etype, eid=entity.eid,
   621                                        changes=self._binary(dumps(changes)))
   621                                        changes=self._binary(dumps(changes)))
   622             sql = self.sqlgen.update(SQL_PREFIX + entity.cw_etype, attrs,
   622             sql = self.sqlgen.update(SQL_PREFIX + entity.cw_etype, attrs,
   623                                      ['cw_eid'])
   623                                      ['cw_eid'])
   624             self.doexec(session, sql, attrs)
   624             self.doexec(cnx, sql, attrs)
   625 
   625 
   626     def delete_entity(self, session, entity):
   626     def delete_entity(self, cnx, entity):
   627         """delete an entity from the source"""
   627         """delete an entity from the source"""
   628         with self._storage_handler(entity, 'deleted'):
   628         with self._storage_handler(entity, 'deleted'):
   629             if session.ertype_supports_undo(entity.cw_etype):
   629             if cnx.ertype_supports_undo(entity.cw_etype):
   630                 attrs = [SQL_PREFIX + r.type
   630                 attrs = [SQL_PREFIX + r.type
   631                          for r in entity.e_schema.subject_relations()
   631                          for r in entity.e_schema.subject_relations()
   632                          if (r.final or r.inlined) and not r in VIRTUAL_RTYPES]
   632                          if (r.final or r.inlined) and not r in VIRTUAL_RTYPES]
   633                 changes = self._save_attrs(session, entity, attrs)
   633                 changes = self._save_attrs(cnx, entity, attrs)
   634                 self._record_tx_action(session, 'tx_entity_actions', 'D',
   634                 self._record_tx_action(cnx, 'tx_entity_actions', 'D',
   635                                        etype=entity.cw_etype, eid=entity.eid,
   635                                        etype=entity.cw_etype, eid=entity.eid,
   636                                        changes=self._binary(dumps(changes)))
   636                                        changes=self._binary(dumps(changes)))
   637             attrs = {'cw_eid': entity.eid}
   637             attrs = {'cw_eid': entity.eid}
   638             sql = self.sqlgen.delete(SQL_PREFIX + entity.cw_etype, attrs)
   638             sql = self.sqlgen.delete(SQL_PREFIX + entity.cw_etype, attrs)
   639             self.doexec(session, sql, attrs)
   639             self.doexec(cnx, sql, attrs)
   640 
   640 
   641     def add_relation(self, session, subject, rtype, object, inlined=False):
   641     def add_relation(self, cnx, subject, rtype, object, inlined=False):
   642         """add a relation to the source"""
   642         """add a relation to the source"""
   643         self._add_relations(session,  rtype, [(subject, object)], inlined)
   643         self._add_relations(cnx,  rtype, [(subject, object)], inlined)
   644         if session.ertype_supports_undo(rtype):
   644         if cnx.ertype_supports_undo(rtype):
   645             self._record_tx_action(session, 'tx_relation_actions', 'A',
   645             self._record_tx_action(cnx, 'tx_relation_actions', 'A',
   646                                    eid_from=subject, rtype=rtype, eid_to=object)
   646                                    eid_from=subject, rtype=rtype, eid_to=object)
   647 
   647 
   648     def add_relations(self, session,  rtype, subj_obj_list, inlined=False):
   648     def add_relations(self, cnx,  rtype, subj_obj_list, inlined=False):
   649         """add a relations to the source"""
   649         """add a relations to the source"""
   650         self._add_relations(session, rtype, subj_obj_list, inlined)
   650         self._add_relations(cnx, rtype, subj_obj_list, inlined)
   651         if session.ertype_supports_undo(rtype):
   651         if cnx.ertype_supports_undo(rtype):
   652             for subject, object in subj_obj_list:
   652             for subject, object in subj_obj_list:
   653                 self._record_tx_action(session, 'tx_relation_actions', 'A',
   653                 self._record_tx_action(cnx, 'tx_relation_actions', 'A',
   654                                        eid_from=subject, rtype=rtype, eid_to=object)
   654                                        eid_from=subject, rtype=rtype, eid_to=object)
   655 
   655 
   656     def _add_relations(self, session, rtype, subj_obj_list, inlined=False):
   656     def _add_relations(self, cnx, rtype, subj_obj_list, inlined=False):
   657         """add a relation to the source"""
   657         """add a relation to the source"""
   658         sql = []
   658         sql = []
   659         if inlined is False:
   659         if inlined is False:
   660             attrs = [{'eid_from': subject, 'eid_to': object}
   660             attrs = [{'eid_from': subject, 'eid_to': object}
   661                      for subject, object in subj_obj_list]
   661                      for subject, object in subj_obj_list]
   662             sql.append((self.sqlgen.insert('%s_relation' % rtype, attrs[0]), attrs))
   662             sql.append((self.sqlgen.insert('%s_relation' % rtype, attrs[0]), attrs))
   663         else: # used by data import
   663         else: # used by data import
   664             etypes = {}
   664             etypes = {}
   665             for subject, object in subj_obj_list:
   665             for subject, object in subj_obj_list:
   666                 etype = session.entity_metas(subject)['type']
   666                 etype = cnx.entity_metas(subject)['type']
   667                 if etype in etypes:
   667                 if etype in etypes:
   668                     etypes[etype].append((subject, object))
   668                     etypes[etype].append((subject, object))
   669                 else:
   669                 else:
   670                     etypes[etype] = [(subject, object)]
   670                     etypes[etype] = [(subject, object)]
   671             for subj_etype, subj_obj_list in etypes.iteritems():
   671             for subj_etype, subj_obj_list in etypes.iteritems():
   673                          for subject, object in subj_obj_list]
   673                          for subject, object in subj_obj_list]
   674                 sql.append((self.sqlgen.update(SQL_PREFIX + etype, attrs[0],
   674                 sql.append((self.sqlgen.update(SQL_PREFIX + etype, attrs[0],
   675                                      ['cw_eid']),
   675                                      ['cw_eid']),
   676                             attrs))
   676                             attrs))
   677         for statement, attrs in sql:
   677         for statement, attrs in sql:
   678             self.doexecmany(session, statement, attrs)
   678             self.doexecmany(cnx, statement, attrs)
   679 
   679 
   680     def delete_relation(self, session, subject, rtype, object):
   680     def delete_relation(self, cnx, subject, rtype, object):
   681         """delete a relation from the source"""
   681         """delete a relation from the source"""
   682         rschema = self.schema.rschema(rtype)
   682         rschema = self.schema.rschema(rtype)
   683         self._delete_relation(session, subject, rtype, object, rschema.inlined)
   683         self._delete_relation(cnx, subject, rtype, object, rschema.inlined)
   684         if session.ertype_supports_undo(rtype):
   684         if cnx.ertype_supports_undo(rtype):
   685             self._record_tx_action(session, 'tx_relation_actions', 'R',
   685             self._record_tx_action(cnx, 'tx_relation_actions', 'R',
   686                                    eid_from=subject, rtype=rtype, eid_to=object)
   686                                    eid_from=subject, rtype=rtype, eid_to=object)
   687 
   687 
   688     def _delete_relation(self, session, subject, rtype, object, inlined=False):
   688     def _delete_relation(self, cnx, subject, rtype, object, inlined=False):
   689         """delete a relation from the source"""
   689         """delete a relation from the source"""
   690         if inlined:
   690         if inlined:
   691             table = SQL_PREFIX + session.entity_metas(subject)['type']
   691             table = SQL_PREFIX + cnx.entity_metas(subject)['type']
   692             column = SQL_PREFIX + rtype
   692             column = SQL_PREFIX + rtype
   693             sql = 'UPDATE %s SET %s=NULL WHERE %seid=%%(eid)s' % (table, column,
   693             sql = 'UPDATE %s SET %s=NULL WHERE %seid=%%(eid)s' % (table, column,
   694                                                                   SQL_PREFIX)
   694                                                                   SQL_PREFIX)
   695             attrs = {'eid' : subject}
   695             attrs = {'eid' : subject}
   696         else:
   696         else:
   697             attrs = {'eid_from': subject, 'eid_to': object}
   697             attrs = {'eid_from': subject, 'eid_to': object}
   698             sql = self.sqlgen.delete('%s_relation' % rtype, attrs)
   698             sql = self.sqlgen.delete('%s_relation' % rtype, attrs)
   699         self.doexec(session, sql, attrs)
   699         self.doexec(cnx, sql, attrs)
   700 
   700 
   701     def doexec(self, session, query, args=None, rollback=True):
   701     def doexec(self, cnx, query, args=None, rollback=True):
   702         """Execute a query.
   702         """Execute a query.
   703         it's a function just so that it shows up in profiling
   703         it's a function just so that it shows up in profiling
   704         """
   704         """
   705         cursor = session.cnxset.cu
   705         cursor = cnx.cnxset.cu
   706         if server.DEBUG & server.DBG_SQL:
   706         if server.DEBUG & server.DBG_SQL:
   707             cnx = session.cnxset.cnx
   707             cnx = cnx.cnxset.cnx
   708             # getattr to get the actual connection if cnx is a CnxLoggingWrapper
   708             # getattr to get the actual connection if cnx is a CnxLoggingWrapper
   709             # instance
   709             # instance
   710             print 'exec', query, args, getattr(cnx, '_cnx', cnx)
   710             print 'exec', query, args, getattr(cnx, '_cnx', cnx)
   711         try:
   711         try:
   712             # str(query) to avoid error if it's an unicode string
   712             # str(query) to avoid error if it's an unicode string
   717                 # db schema
   717                 # db schema
   718                 self.critical("sql: %r\n args: %s\ndbms message: %r",
   718                 self.critical("sql: %r\n args: %s\ndbms message: %r",
   719                               query, args, ex.args[0])
   719                               query, args, ex.args[0])
   720             if rollback:
   720             if rollback:
   721                 try:
   721                 try:
   722                     session.cnxset.rollback()
   722                     cnx.cnxset.rollback()
   723                     if self.repo.config.mode != 'test':
   723                     if self.repo.config.mode != 'test':
   724                         self.critical('transaction has been rolled back')
   724                         self.critical('transaction has been rolled back')
   725                 except Exception as ex:
   725                 except Exception as ex:
   726                     pass
   726                     pass
   727             if ex.__class__.__name__ == 'IntegrityError':
   727             if ex.__class__.__name__ == 'IntegrityError':
   728                 # need string comparison because of various backends
   728                 # need string comparison because of various backends
   729                 for arg in ex.args:
   729                 for arg in ex.args:
   730                     # postgres, sqlserver
   730                     # postgres, sqlserver
   731                     mo = re.search("unique_[a-z0-9]{32}", arg)
   731                     mo = re.search("unique_[a-z0-9]{32}", arg)
   732                     if mo is not None:
   732                     if mo is not None:
   733                         raise UniqueTogetherError(session, cstrname=mo.group(0))
   733                         raise UniqueTogetherError(cnx, cstrname=mo.group(0))
   734                     # old sqlite
   734                     # old sqlite
   735                     mo = re.search('columns (.*) are not unique', arg)
   735                     mo = re.search('columns (.*) are not unique', arg)
   736                     if mo is not None: # sqlite in use
   736                     if mo is not None: # sqlite in use
   737                         # we left chop the 'cw_' prefix of attribute names
   737                         # we left chop the 'cw_' prefix of attribute names
   738                         rtypes = [c.strip()[3:]
   738                         rtypes = [c.strip()[3:]
   739                                   for c in mo.group(1).split(',')]
   739                                   for c in mo.group(1).split(',')]
   740                         raise UniqueTogetherError(session, rtypes=rtypes)
   740                         raise UniqueTogetherError(cnx, rtypes=rtypes)
   741                     # sqlite after http://www.sqlite.org/cgi/src/info/c80e229dd9c1230a
   741                     # sqlite after http://www.sqlite.org/cgi/src/info/c80e229dd9c1230a
   742                     if arg.startswith('UNIQUE constraint failed:'):
   742                     if arg.startswith('UNIQUE constraint failed:'):
   743                         # message looks like: "UNIQUE constraint failed: foo.cw_bar, foo.cw_baz"
   743                         # message looks like: "UNIQUE constraint failed: foo.cw_bar, foo.cw_baz"
   744                         # so drop the prefix, split on comma, drop the tablenames, and drop "cw_"
   744                         # so drop the prefix, split on comma, drop the tablenames, and drop "cw_"
   745                         columns = arg.split(':', 1)[1].split(',')
   745                         columns = arg.split(':', 1)[1].split(',')
   746                         rtypes = [c.split('.', 1)[1].strip()[3:] for c in columns]
   746                         rtypes = [c.split('.', 1)[1].strip()[3:] for c in columns]
   747                         raise UniqueTogetherError(session, rtypes=rtypes)
   747                         raise UniqueTogetherError(cnx, rtypes=rtypes)
   748             raise
   748             raise
   749         return cursor
   749         return cursor
   750 
   750 
   751     def doexecmany(self, session, query, args):
   751     def doexecmany(self, cnx, query, args):
   752         """Execute a query.
   752         """Execute a query.
   753         it's a function just so that it shows up in profiling
   753         it's a function just so that it shows up in profiling
   754         """
   754         """
   755         if server.DEBUG & server.DBG_SQL:
   755         if server.DEBUG & server.DBG_SQL:
   756             print 'execmany', query, 'with', len(args), 'arguments'
   756             print 'execmany', query, 'with', len(args), 'arguments'
   757         cursor = session.cnxset.cu
   757         cursor = cnx.cnxset.cu
   758         try:
   758         try:
   759             # str(query) to avoid error if it's an unicode string
   759             # str(query) to avoid error if it's an unicode string
   760             cursor.executemany(str(query), args)
   760             cursor.executemany(str(query), args)
   761         except Exception as ex:
   761         except Exception as ex:
   762             if self.repo.config.mode != 'test':
   762             if self.repo.config.mode != 'test':
   763                 # during test we get those message when trying to alter sqlite
   763                 # during test we get those message when trying to alter sqlite
   764                 # db schema
   764                 # db schema
   765                 self.critical("sql many: %r\n args: %s\ndbms message: %r",
   765                 self.critical("sql many: %r\n args: %s\ndbms message: %r",
   766                               query, args, ex.args[0])
   766                               query, args, ex.args[0])
   767             try:
   767             try:
   768                 session.cnxset.rollback()
   768                 cnx.cnxset.rollback()
   769                 if self.repo.config.mode != 'test':
   769                 if self.repo.config.mode != 'test':
   770                     self.critical('transaction has been rolled back')
   770                     self.critical('transaction has been rolled back')
   771             except Exception:
   771             except Exception:
   772                 pass
   772                 pass
   773             raise
   773             raise
   774 
   774 
   775     # short cut to method requiring advanced db helper usage ##################
   775     # short cut to method requiring advanced db helper usage ##################
   776 
   776 
   777     def update_rdef_column(self, session, rdef):
   777     def update_rdef_column(self, cnx, rdef):
   778         """update physical column for a relation definition (final or inlined)
   778         """update physical column for a relation definition (final or inlined)
   779         """
   779         """
   780         table, column = rdef_table_column(rdef)
   780         table, column = rdef_table_column(rdef)
   781         coltype, allownull = rdef_physical_info(self.dbhelper, rdef)
   781         coltype, allownull = rdef_physical_info(self.dbhelper, rdef)
   782         if not self.dbhelper.alter_column_support:
   782         if not self.dbhelper.alter_column_support:
   783             self.error("backend can't alter %s.%s to %s%s", table, column, coltype,
   783             self.error("backend can't alter %s.%s to %s%s", table, column, coltype,
   784                        not allownull and 'NOT NULL' or '')
   784                        not allownull and 'NOT NULL' or '')
   785             return
   785             return
   786         self.dbhelper.change_col_type(LogCursor(session.cnxset.cu),
   786         self.dbhelper.change_col_type(LogCursor(cnx.cnxset.cu),
   787                                       table, column, coltype, allownull)
   787                                       table, column, coltype, allownull)
   788         self.info('altered %s.%s: now %s%s', table, column, coltype,
   788         self.info('altered %s.%s: now %s%s', table, column, coltype,
   789                   not allownull and 'NOT NULL' or '')
   789                   not allownull and 'NOT NULL' or '')
   790 
   790 
   791     def update_rdef_null_allowed(self, session, rdef):
   791     def update_rdef_null_allowed(self, cnx, rdef):
   792         """update NULL / NOT NULL of physical column for a relation definition
   792         """update NULL / NOT NULL of physical column for a relation definition
   793         (final or inlined)
   793         (final or inlined)
   794         """
   794         """
   795         if not self.dbhelper.alter_column_support:
   795         if not self.dbhelper.alter_column_support:
   796             # not supported (and NOT NULL not set by yams in that case, so no
   796             # not supported (and NOT NULL not set by yams in that case, so no
   797             # worry)
   797             # worry)
   798             return
   798             return
   799         table, column = rdef_table_column(rdef)
   799         table, column = rdef_table_column(rdef)
   800         coltype, allownull = rdef_physical_info(self.dbhelper, rdef)
   800         coltype, allownull = rdef_physical_info(self.dbhelper, rdef)
   801         self.dbhelper.set_null_allowed(LogCursor(session.cnxset.cu),
   801         self.dbhelper.set_null_allowed(LogCursor(cnx.cnxset.cu),
   802                                        table, column, coltype, allownull)
   802                                        table, column, coltype, allownull)
   803 
   803 
   804     def update_rdef_indexed(self, session, rdef):
   804     def update_rdef_indexed(self, cnx, rdef):
   805         table, column = rdef_table_column(rdef)
   805         table, column = rdef_table_column(rdef)
   806         if rdef.indexed:
   806         if rdef.indexed:
   807             self.create_index(session, table, column)
   807             self.create_index(cnx, table, column)
   808         else:
   808         else:
   809             self.drop_index(session, table, column)
   809             self.drop_index(cnx, table, column)
   810 
   810 
   811     def update_rdef_unique(self, session, rdef):
   811     def update_rdef_unique(self, cnx, rdef):
   812         table, column = rdef_table_column(rdef)
   812         table, column = rdef_table_column(rdef)
   813         if rdef.constraint_by_type('UniqueConstraint'):
   813         if rdef.constraint_by_type('UniqueConstraint'):
   814             self.create_index(session, table, column, unique=True)
   814             self.create_index(cnx, table, column, unique=True)
   815         else:
   815         else:
   816             self.drop_index(session, table, column, unique=True)
   816             self.drop_index(cnx, table, column, unique=True)
   817 
   817 
   818     def create_index(self, session, table, column, unique=False):
   818     def create_index(self, cnx, table, column, unique=False):
   819         cursor = LogCursor(session.cnxset.cu)
   819         cursor = LogCursor(cnx.cnxset.cu)
   820         self.dbhelper.create_index(cursor, table, column, unique)
   820         self.dbhelper.create_index(cursor, table, column, unique)
   821 
   821 
   822     def drop_index(self, session, table, column, unique=False):
   822     def drop_index(self, cnx, table, column, unique=False):
   823         cursor = LogCursor(session.cnxset.cu)
   823         cursor = LogCursor(cnx.cnxset.cu)
   824         self.dbhelper.drop_index(cursor, table, column, unique)
   824         self.dbhelper.drop_index(cursor, table, column, unique)
   825 
   825 
   826     # system source interface #################################################
   826     # system source interface #################################################
   827 
   827 
   828     def _eid_type_source(self, session, eid, sql, _retry=True):
   828     def _eid_type_source(self, cnx, eid, sql, _retry=True):
   829         try:
   829         try:
   830             res = self.doexec(session, sql).fetchone()
   830             res = self.doexec(cnx, sql).fetchone()
   831             if res is not None:
   831             if res is not None:
   832                 return res
   832                 return res
   833         except (self.OperationalError, self.InterfaceError):
   833         except (self.OperationalError, self.InterfaceError):
   834             if session.mode == 'read' and _retry:
   834             if cnx.mode == 'read' and _retry:
   835                 self.warning("trying to reconnect (eid_type_source())")
   835                 self.warning("trying to reconnect (eid_type_source())")
   836                 session.cnxset.reconnect()
   836                 cnx.cnxset.reconnect()
   837                 return self._eid_type_source(session, eid, sql, _retry=False)
   837                 return self._eid_type_source(cnx, eid, sql, _retry=False)
   838         except Exception:
   838         except Exception:
   839             assert session.cnxset, 'session has no connections set'
   839             assert cnx.cnxset, 'connection has no connections set'
   840             self.exception('failed to query entities table for eid %s', eid)
   840             self.exception('failed to query entities table for eid %s', eid)
   841         raise UnknownEid(eid)
   841         raise UnknownEid(eid)
   842 
   842 
   843     def eid_type_source(self, session, eid): # pylint: disable=E0202
   843     def eid_type_source(self, cnx, eid): # pylint: disable=E0202
   844         """return a tuple (type, source, extid) for the entity with id <eid>"""
   844         """return a tuple (type, source, extid) for the entity with id <eid>"""
   845         sql = 'SELECT type, extid, asource FROM entities WHERE eid=%s' % eid
   845         sql = 'SELECT type, extid, asource FROM entities WHERE eid=%s' % eid
   846         res = self._eid_type_source(session, eid, sql)
   846         res = self._eid_type_source(cnx, eid, sql)
   847         if res[-2] is not None:
   847         if res[-2] is not None:
   848             if not isinstance(res, list):
   848             if not isinstance(res, list):
   849                 res = list(res)
   849                 res = list(res)
   850             res[-2] = b64decode(res[-2])
   850             res[-2] = b64decode(res[-2])
   851         return res
   851         return res
   852 
   852 
   853     def eid_type_source_pre_131(self, session, eid):
   853     def eid_type_source_pre_131(self, cnx, eid):
   854         """return a tuple (type, source, extid) for the entity with id <eid>"""
   854         """return a tuple (type, source, extid) for the entity with id <eid>"""
   855         sql = 'SELECT type, extid FROM entities WHERE eid=%s' % eid
   855         sql = 'SELECT type, extid FROM entities WHERE eid=%s' % eid
   856         res = self._eid_type_source(session, eid, sql)
   856         res = self._eid_type_source(cnx, eid, sql)
   857         if not isinstance(res, list):
   857         if not isinstance(res, list):
   858             res = list(res)
   858             res = list(res)
   859         if res[-1] is not None:
   859         if res[-1] is not None:
   860             res[-1] = b64decode(res[-1])
   860             res[-1] = b64decode(res[-1])
   861         res.append(res[1])
   861         res.append(res[1])
   862         return res
   862         return res
   863 
   863 
   864     def extid2eid(self, session, extid):
   864     def extid2eid(self, cnx, extid):
   865         """get eid from an external id. Return None if no record found."""
   865         """get eid from an external id. Return None if no record found."""
   866         assert isinstance(extid, str)
   866         assert isinstance(extid, str)
   867         cursor = self.doexec(session,
   867         cursor = self.doexec(cnx,
   868                              'SELECT eid FROM entities WHERE extid=%(x)s',
   868                              'SELECT eid FROM entities WHERE extid=%(x)s',
   869                              {'x': b64encode(extid)})
   869                              {'x': b64encode(extid)})
   870         # XXX testing rowcount cause strange bug with sqlite, results are there
   870         # XXX testing rowcount cause strange bug with sqlite, results are there
   871         #     but rowcount is 0
   871         #     but rowcount is 0
   872         #if cursor.rowcount > 0:
   872         #if cursor.rowcount > 0:
   876                 return result[0]
   876                 return result[0]
   877         except Exception:
   877         except Exception:
   878             pass
   878             pass
   879         return None
   879         return None
   880 
   880 
   881     def _handle_is_relation_sql(self, session, sql, attrs):
   881     def _handle_is_relation_sql(self, cnx, sql, attrs):
   882         """ Handler for specific is_relation sql that may be
   882         """ Handler for specific is_relation sql that may be
   883         overwritten in some stores"""
   883         overwritten in some stores"""
   884         self.doexec(session, sql % attrs)
   884         self.doexec(cnx, sql % attrs)
   885 
   885 
   886     _handle_insert_entity_sql = doexec
   886     _handle_insert_entity_sql = doexec
   887     _handle_is_instance_of_sql = _handle_source_relation_sql = _handle_is_relation_sql
   887     _handle_is_instance_of_sql = _handle_source_relation_sql = _handle_is_relation_sql
   888 
   888 
   889     def add_info(self, session, entity, source, extid, complete):
   889     def add_info(self, cnx, entity, source, extid, complete):
   890         """add type and source info for an eid into the system table"""
   890         """add type and source info for an eid into the system table"""
   891         with session.ensure_cnx_set:
   891         with cnx.ensure_cnx_set:
   892             # begin by inserting eid/type/source/extid into the entities table
   892             # begin by inserting eid/type/source/extid into the entities table
   893             if extid is not None:
   893             if extid is not None:
   894                 assert isinstance(extid, str)
   894                 assert isinstance(extid, str)
   895                 extid = b64encode(extid)
   895                 extid = b64encode(extid)
   896             attrs = {'type': entity.cw_etype, 'eid': entity.eid, 'extid': extid,
   896             attrs = {'type': entity.cw_etype, 'eid': entity.eid, 'extid': extid,
   897                      'asource': source.uri}
   897                      'asource': source.uri}
   898             self._handle_insert_entity_sql(session, self.sqlgen.insert('entities', attrs), attrs)
   898             self._handle_insert_entity_sql(cnx, self.sqlgen.insert('entities', attrs), attrs)
   899             # insert core relations: is, is_instance_of and cw_source
   899             # insert core relations: is, is_instance_of and cw_source
   900             try:
   900             try:
   901                 self._handle_is_relation_sql(session, 'INSERT INTO is_relation(eid_from,eid_to) VALUES (%s,%s)',
   901                 self._handle_is_relation_sql(cnx, 'INSERT INTO is_relation(eid_from,eid_to) VALUES (%s,%s)',
   902                                              (entity.eid, eschema_eid(session, entity.e_schema)))
   902                                              (entity.eid, eschema_eid(cnx, entity.e_schema)))
   903             except IndexError:
   903             except IndexError:
   904                 # during schema serialization, skip
   904                 # during schema serialization, skip
   905                 pass
   905                 pass
   906             else:
   906             else:
   907                 for eschema in entity.e_schema.ancestors() + [entity.e_schema]:
   907                 for eschema in entity.e_schema.ancestors() + [entity.e_schema]:
   908                     self._handle_is_relation_sql(session,
   908                     self._handle_is_relation_sql(cnx,
   909                                                  'INSERT INTO is_instance_of_relation(eid_from,eid_to) VALUES (%s,%s)',
   909                                                  'INSERT INTO is_instance_of_relation(eid_from,eid_to) VALUES (%s,%s)',
   910                                                  (entity.eid, eschema_eid(session, eschema)))
   910                                                  (entity.eid, eschema_eid(cnx, eschema)))
   911             if 'CWSource' in self.schema and source.eid is not None: # else, cw < 3.10
   911             if 'CWSource' in self.schema and source.eid is not None: # else, cw < 3.10
   912                 self._handle_is_relation_sql(session, 'INSERT INTO cw_source_relation(eid_from,eid_to) VALUES (%s,%s)',
   912                 self._handle_is_relation_sql(cnx, 'INSERT INTO cw_source_relation(eid_from,eid_to) VALUES (%s,%s)',
   913                                              (entity.eid, source.eid))
   913                                              (entity.eid, source.eid))
   914             # now we can update the full text index
   914             # now we can update the full text index
   915             if self.do_fti and self.need_fti_indexation(entity.cw_etype):
   915             if self.do_fti and self.need_fti_indexation(entity.cw_etype):
   916                 if complete:
   916                 if complete:
   917                     entity.complete(entity.e_schema.indexable_attributes())
   917                     entity.complete(entity.e_schema.indexable_attributes())
   918                 self.index_entity(session, entity=entity)
   918                 self.index_entity(cnx, entity=entity)
   919 
   919 
   920     def update_info(self, session, entity, need_fti_update):
   920     def update_info(self, cnx, entity, need_fti_update):
   921         """mark entity as being modified, fulltext reindex if needed"""
   921         """mark entity as being modified, fulltext reindex if needed"""
   922         if self.do_fti and need_fti_update:
   922         if self.do_fti and need_fti_update:
   923             # reindex the entity only if this query is updating at least
   923             # reindex the entity only if this query is updating at least
   924             # one indexable attribute
   924             # one indexable attribute
   925             self.index_entity(session, entity=entity)
   925             self.index_entity(cnx, entity=entity)
   926 
   926 
   927     def delete_info_multi(self, session, entities):
   927     def delete_info_multi(self, cnx, entities):
   928         """delete system information on deletion of a list of entities with the
   928         """delete system information on deletion of a list of entities with the
   929         same etype and belinging to the same source
   929         same etype and belinging to the same source
   930 
   930 
   931         * update the fti
   931         * update the fti
   932         * remove record from the `entities` table
   932         * remove record from the `entities` table
   933         """
   933         """
   934         self.fti_unindex_entities(session, entities)
   934         self.fti_unindex_entities(cnx, entities)
   935         attrs = {'eid': '(%s)' % ','.join([str(_e.eid) for _e in entities])}
   935         attrs = {'eid': '(%s)' % ','.join([str(_e.eid) for _e in entities])}
   936         self.doexec(session, self.sqlgen.delete_many('entities', attrs), attrs)
   936         self.doexec(cnx, self.sqlgen.delete_many('entities', attrs), attrs)
   937 
   937 
   938     # undo support #############################################################
   938     # undo support #############################################################
   939 
   939 
   940     def undoable_transactions(self, session, ueid=None, **actionfilters):
   940     def undoable_transactions(self, cnx, ueid=None, **actionfilters):
   941         """See :class:`cubicweb.repoapi.ClientConnection.undoable_transactions`"""
   941         """See :class:`cubicweb.repoapi.ClientConnection.undoable_transactions`"""
   942         # force filtering to session's user if not a manager
   942         # force filtering to connection's user if not a manager
   943         if not session.user.is_in_group('managers'):
   943         if not cnx.user.is_in_group('managers'):
   944             ueid = session.user.eid
   944             ueid = cnx.user.eid
   945         restr = {}
   945         restr = {}
   946         if ueid is not None:
   946         if ueid is not None:
   947             restr['tx_user'] = ueid
   947             restr['tx_user'] = ueid
   948         sql = self.sqlgen.select('transactions', restr, ('tx_uuid', 'tx_time', 'tx_user'))
   948         sql = self.sqlgen.select('transactions', restr, ('tx_uuid', 'tx_time', 'tx_user'))
   949         if actionfilters:
   949         if actionfilters:
  1003                 sql += ' WHERE %s' % ' OR '.join(subqsqls)
  1003                 sql += ' WHERE %s' % ' OR '.join(subqsqls)
  1004             restr.update(trarestr)
  1004             restr.update(trarestr)
  1005             restr.update(tearestr)
  1005             restr.update(tearestr)
  1006         # we want results ordered by transaction's time descendant
  1006         # we want results ordered by transaction's time descendant
  1007         sql += ' ORDER BY tx_time DESC'
  1007         sql += ' ORDER BY tx_time DESC'
  1008         with session.ensure_cnx_set:
  1008         with cnx.ensure_cnx_set:
  1009             cu = self.doexec(session, sql, restr)
  1009             cu = self.doexec(cnx, sql, restr)
  1010             # turn results into transaction objects
  1010             # turn results into transaction objects
  1011             return [tx.Transaction(*args) for args in cu.fetchall()]
  1011             return [tx.Transaction(*args) for args in cu.fetchall()]
  1012 
  1012 
  1013     def tx_info(self, session, txuuid):
  1013     def tx_info(self, cnx, txuuid):
  1014         """See :class:`cubicweb.repoapi.ClientConnection.transaction_info`"""
  1014         """See :class:`cubicweb.repoapi.ClientConnection.transaction_info`"""
  1015         return tx.Transaction(txuuid, *self._tx_info(session, txuuid))
  1015         return tx.Transaction(txuuid, *self._tx_info(cnx, txuuid))
  1016 
  1016 
  1017     def tx_actions(self, session, txuuid, public):
  1017     def tx_actions(self, cnx, txuuid, public):
  1018         """See :class:`cubicweb.repoapi.ClientConnection.transaction_actions`"""
  1018         """See :class:`cubicweb.repoapi.ClientConnection.transaction_actions`"""
  1019         self._tx_info(session, txuuid)
  1019         self._tx_info(cnx, txuuid)
  1020         restr = {'tx_uuid': txuuid}
  1020         restr = {'tx_uuid': txuuid}
  1021         if public:
  1021         if public:
  1022             restr['txa_public'] = True
  1022             restr['txa_public'] = True
  1023         # XXX use generator to avoid loading everything in memory?
  1023         # XXX use generator to avoid loading everything in memory?
  1024         sql = self.sqlgen.select('tx_entity_actions', restr,
  1024         sql = self.sqlgen.select('tx_entity_actions', restr,
  1025                                  ('txa_action', 'txa_public', 'txa_order',
  1025                                  ('txa_action', 'txa_public', 'txa_order',
  1026                                   'etype', 'eid', 'changes'))
  1026                                   'etype', 'eid', 'changes'))
  1027         cu = self.doexec(session, sql, restr)
  1027         cu = self.doexec(cnx, sql, restr)
  1028         actions = [tx.EntityAction(a,p,o,et,e,c and loads(self.binary_to_str(c)))
  1028         actions = [tx.EntityAction(a,p,o,et,e,c and loads(self.binary_to_str(c)))
  1029                    for a,p,o,et,e,c in cu.fetchall()]
  1029                    for a,p,o,et,e,c in cu.fetchall()]
  1030         sql = self.sqlgen.select('tx_relation_actions', restr,
  1030         sql = self.sqlgen.select('tx_relation_actions', restr,
  1031                                  ('txa_action', 'txa_public', 'txa_order',
  1031                                  ('txa_action', 'txa_public', 'txa_order',
  1032                                   'rtype', 'eid_from', 'eid_to'))
  1032                                   'rtype', 'eid_from', 'eid_to'))
  1033         cu = self.doexec(session, sql, restr)
  1033         cu = self.doexec(cnx, sql, restr)
  1034         actions += [tx.RelationAction(*args) for args in cu.fetchall()]
  1034         actions += [tx.RelationAction(*args) for args in cu.fetchall()]
  1035         return sorted(actions, key=lambda x: x.order)
  1035         return sorted(actions, key=lambda x: x.order)
  1036 
  1036 
  1037     def undo_transaction(self, session, txuuid):
  1037     def undo_transaction(self, cnx, txuuid):
  1038         """See :class:`cubicweb.repoapi.ClientConnection.undo_transaction`
  1038         """See :class:`cubicweb.repoapi.ClientConnection.undo_transaction`
  1039 
  1039 
  1040         important note: while undoing of a transaction, only hooks in the
  1040         important note: while undoing of a transaction, only hooks in the
  1041         'integrity', 'activeintegrity' and 'undo' categories are called.
  1041         'integrity', 'activeintegrity' and 'undo' categories are called.
  1042         """
  1042         """
  1043         # set mode so connections set isn't released subsquently until commit/rollback
  1043         # set mode so connections set isn't released subsquently until commit/rollback
  1044         session.mode = 'write'
  1044         cnx.mode = 'write'
  1045         errors = []
  1045         errors = []
  1046         session.transaction_data['undoing_uuid'] = txuuid
  1046         cnx.transaction_data['undoing_uuid'] = txuuid
  1047         with session.deny_all_hooks_but('integrity', 'activeintegrity', 'undo'):
  1047         with cnx.deny_all_hooks_but('integrity', 'activeintegrity', 'undo'):
  1048             with session.security_enabled(read=False):
  1048             with cnx.security_enabled(read=False):
  1049                 for action in reversed(self.tx_actions(session, txuuid, False)):
  1049                 for action in reversed(self.tx_actions(cnx, txuuid, False)):
  1050                     undomethod = getattr(self, '_undo_%s' % action.action.lower())
  1050                     undomethod = getattr(self, '_undo_%s' % action.action.lower())
  1051                     errors += undomethod(session, action)
  1051                     errors += undomethod(cnx, action)
  1052         # remove the transactions record
  1052         # remove the transactions record
  1053         self.doexec(session,
  1053         self.doexec(cnx,
  1054                     "DELETE FROM transactions WHERE tx_uuid='%s'" % txuuid)
  1054                     "DELETE FROM transactions WHERE tx_uuid='%s'" % txuuid)
  1055         if errors:
  1055         if errors:
  1056             raise UndoTransactionException(txuuid, errors)
  1056             raise UndoTransactionException(txuuid, errors)
  1057         else:
  1057         else:
  1058             return
  1058             return
  1059 
  1059 
  1060     def start_undoable_transaction(self, session, uuid):
  1060     def start_undoable_transaction(self, cnx, uuid):
  1061         """session callback to insert a transaction record in the transactions
  1061         """connection callback to insert a transaction record in the transactions
  1062         table when some undoable transaction is started
  1062         table when some undoable transaction is started
  1063         """
  1063         """
  1064         ueid = session.user.eid
  1064         ueid = cnx.user.eid
  1065         attrs = {'tx_uuid': uuid, 'tx_user': ueid, 'tx_time': datetime.utcnow()}
  1065         attrs = {'tx_uuid': uuid, 'tx_user': ueid, 'tx_time': datetime.utcnow()}
  1066         self.doexec(session, self.sqlgen.insert('transactions', attrs), attrs)
  1066         self.doexec(cnx, self.sqlgen.insert('transactions', attrs), attrs)
  1067 
  1067 
  1068     def _save_attrs(self, session, entity, attrs):
  1068     def _save_attrs(self, cnx, entity, attrs):
  1069         """return a pickleable dictionary containing current values for given
  1069         """return a pickleable dictionary containing current values for given
  1070         attributes of the entity
  1070         attributes of the entity
  1071         """
  1071         """
  1072         restr = {'cw_eid': entity.eid}
  1072         restr = {'cw_eid': entity.eid}
  1073         sql = self.sqlgen.select(SQL_PREFIX + entity.cw_etype, restr, attrs)
  1073         sql = self.sqlgen.select(SQL_PREFIX + entity.cw_etype, restr, attrs)
  1074         cu = self.doexec(session, sql, restr)
  1074         cu = self.doexec(cnx, sql, restr)
  1075         values = dict(zip(attrs, cu.fetchone()))
  1075         values = dict(zip(attrs, cu.fetchone()))
  1076         # ensure backend specific binary are converted back to string
  1076         # ensure backend specific binary are converted back to string
  1077         eschema = entity.e_schema
  1077         eschema = entity.e_schema
  1078         for column in attrs:
  1078         for column in attrs:
  1079             # [3:] remove 'cw_' prefix
  1079             # [3:] remove 'cw_' prefix
  1084                 value = values[column]
  1084                 value = values[column]
  1085                 if value is not None:
  1085                 if value is not None:
  1086                     values[column] = self.binary_to_str(value)
  1086                     values[column] = self.binary_to_str(value)
  1087         return values
  1087         return values
  1088 
  1088 
  1089     def _record_tx_action(self, session, table, action, **kwargs):
  1089     def _record_tx_action(self, cnx, table, action, **kwargs):
  1090         """record a transaction action in the given table (either
  1090         """record a transaction action in the given table (either
  1091         'tx_entity_actions' or 'tx_relation_action')
  1091         'tx_entity_actions' or 'tx_relation_action')
  1092         """
  1092         """
  1093         kwargs['tx_uuid'] = session.transaction_uuid()
  1093         kwargs['tx_uuid'] = cnx.transaction_uuid()
  1094         kwargs['txa_action'] = action
  1094         kwargs['txa_action'] = action
  1095         kwargs['txa_order'] = session.transaction_inc_action_counter()
  1095         kwargs['txa_order'] = cnx.transaction_inc_action_counter()
  1096         kwargs['txa_public'] = session.running_dbapi_query
  1096         kwargs['txa_public'] = cnx.running_dbapi_query
  1097         self.doexec(session, self.sqlgen.insert(table, kwargs), kwargs)
  1097         self.doexec(cnx, self.sqlgen.insert(table, kwargs), kwargs)
  1098 
  1098 
  1099     def _tx_info(self, session, txuuid):
  1099     def _tx_info(self, cnx, txuuid):
  1100         """return transaction's time and user of the transaction with the given uuid.
  1100         """return transaction's time and user of the transaction with the given uuid.
  1101 
  1101 
  1102         raise `NoSuchTransaction` if there is no such transaction of if the
  1102         raise `NoSuchTransaction` if there is no such transaction of if the
  1103         session's user isn't allowed to see it.
  1103         connection's user isn't allowed to see it.
  1104         """
  1104         """
  1105         with session.ensure_cnx_set:
  1105         with cnx.ensure_cnx_set:
  1106             restr = {'tx_uuid': txuuid}
  1106             restr = {'tx_uuid': txuuid}
  1107             sql = self.sqlgen.select('transactions', restr,
  1107             sql = self.sqlgen.select('transactions', restr,
  1108                                      ('tx_time', 'tx_user'))
  1108                                      ('tx_time', 'tx_user'))
  1109             cu = self.doexec(session, sql, restr)
  1109             cu = self.doexec(cnx, sql, restr)
  1110             try:
  1110             try:
  1111                 time, ueid = cu.fetchone()
  1111                 time, ueid = cu.fetchone()
  1112             except TypeError:
  1112             except TypeError:
  1113                 raise tx.NoSuchTransaction(txuuid)
  1113                 raise tx.NoSuchTransaction(txuuid)
  1114             if not (session.user.is_in_group('managers')
  1114             if not (cnx.user.is_in_group('managers')
  1115                     or session.user.eid == ueid):
  1115                     or cnx.user.eid == ueid):
  1116                 raise tx.NoSuchTransaction(txuuid)
  1116                 raise tx.NoSuchTransaction(txuuid)
  1117             return time, ueid
  1117             return time, ueid
  1118 
  1118 
  1119     def _reedit_entity(self, entity, changes, err):
  1119     def _reedit_entity(self, entity, changes, err):
  1120         session = entity._cw
  1120         cnx = entity._cw
  1121         eid = entity.eid
  1121         eid = entity.eid
  1122         entity.cw_edited = edited = EditedEntity(entity)
  1122         entity.cw_edited = edited = EditedEntity(entity)
  1123         # check for schema changes, entities linked through inlined relation
  1123         # check for schema changes, entities linked through inlined relation
  1124         # still exists, rewrap binary values
  1124         # still exists, rewrap binary values
  1125         eschema = entity.e_schema
  1125         eschema = entity.e_schema
  1129             if rtype == "eid":
  1129             if rtype == "eid":
  1130                 continue # XXX should even `eid` be stored in action changes?
  1130                 continue # XXX should even `eid` be stored in action changes?
  1131             try:
  1131             try:
  1132                 rschema = getrschema[rtype]
  1132                 rschema = getrschema[rtype]
  1133             except KeyError:
  1133             except KeyError:
  1134                 err(session._("can't restore relation %(rtype)s of entity %(eid)s, "
  1134                 err(cnx._("can't restore relation %(rtype)s of entity %(eid)s, "
  1135                               "this relation does not exist in the schema anymore.")
  1135                               "this relation does not exist in the schema anymore.")
  1136                     % {'rtype': rtype, 'eid': eid})
  1136                     % {'rtype': rtype, 'eid': eid})
  1137             if not rschema.final:
  1137             if not rschema.final:
  1138                 if not rschema.inlined:
  1138                 if not rschema.inlined:
  1139                     assert value is None
  1139                     assert value is None
  1142                     # not a deletion: we must put something in edited
  1142                     # not a deletion: we must put something in edited
  1143                     try:
  1143                     try:
  1144                         entity._cw.entity_from_eid(value) # check target exists
  1144                         entity._cw.entity_from_eid(value) # check target exists
  1145                         edited[rtype] = value
  1145                         edited[rtype] = value
  1146                     except UnknownEid:
  1146                     except UnknownEid:
  1147                         err(session._("can't restore entity %(eid)s of type %(eschema)s, "
  1147                         err(cnx._("can't restore entity %(eid)s of type %(eschema)s, "
  1148                                       "target of %(rtype)s (eid %(value)s) does not exist any longer")
  1148                                       "target of %(rtype)s (eid %(value)s) does not exist any longer")
  1149                             % locals())
  1149                             % locals())
  1150             elif eschema.destination(rtype) in ('Bytes', 'Password'):
  1150             elif eschema.destination(rtype) in ('Bytes', 'Password'):
  1151                 changes[column] = self._binary(value)
  1151                 changes[column] = self._binary(value)
  1152                 edited[rtype] = Binary(value)
  1152                 edited[rtype] = Binary(value)
  1153             elif isinstance(value, str):
  1153             elif isinstance(value, str):
  1154                 edited[rtype] = unicode(value, session.encoding, 'replace')
  1154                 edited[rtype] = unicode(value, cnx.encoding, 'replace')
  1155             else:
  1155             else:
  1156                 edited[rtype] = value
  1156                 edited[rtype] = value
  1157         # This must only be done after init_entitiy_caches : defered in calling functions
  1157         # This must only be done after init_entitiy_caches : defered in calling functions
  1158         # edited.check()
  1158         # edited.check()
  1159 
  1159 
  1160     def _undo_d(self, session, action):
  1160     def _undo_d(self, cnx, action):
  1161         """undo an entity deletion"""
  1161         """undo an entity deletion"""
  1162         errors = []
  1162         errors = []
  1163         err = errors.append
  1163         err = errors.append
  1164         eid = action.eid
  1164         eid = action.eid
  1165         etype = action.etype
  1165         etype = action.etype
  1166         _ = session._
  1166         _ = cnx._
  1167         # get an entity instance
  1167         # get an entity instance
  1168         try:
  1168         try:
  1169             entity = self.repo.vreg['etypes'].etype_class(etype)(session)
  1169             entity = self.repo.vreg['etypes'].etype_class(etype)(cnx)
  1170         except Exception:
  1170         except Exception:
  1171             err("can't restore entity %s of type %s, type no more supported"
  1171             err("can't restore entity %s of type %s, type no more supported"
  1172                 % (eid, etype))
  1172                 % (eid, etype))
  1173             return errors
  1173             return errors
  1174         self._reedit_entity(entity, action.changes, err)
  1174         self._reedit_entity(entity, action.changes, err)
  1175         entity.eid = eid
  1175         entity.eid = eid
  1176         session.repo.init_entity_caches(session, entity, self)
  1176         cnx.repo.init_entity_caches(cnx, entity, self)
  1177         entity.cw_edited.check()
  1177         entity.cw_edited.check()
  1178         self.repo.hm.call_hooks('before_add_entity', session, entity=entity)
  1178         self.repo.hm.call_hooks('before_add_entity', cnx, entity=entity)
  1179         # restore the entity
  1179         # restore the entity
  1180         action.changes['cw_eid'] = eid
  1180         action.changes['cw_eid'] = eid
  1181         sql = self.sqlgen.insert(SQL_PREFIX + etype, action.changes)
  1181         sql = self.sqlgen.insert(SQL_PREFIX + etype, action.changes)
  1182         self.doexec(session, sql, action.changes)
  1182         self.doexec(cnx, sql, action.changes)
  1183         # restore record in entities (will update fti if needed)
  1183         # restore record in entities (will update fti if needed)
  1184         self.add_info(session, entity, self, None, True)
  1184         self.add_info(cnx, entity, self, None, True)
  1185         self.repo.hm.call_hooks('after_add_entity', session, entity=entity)
  1185         self.repo.hm.call_hooks('after_add_entity', cnx, entity=entity)
  1186         return errors
  1186         return errors
  1187 
  1187 
  1188     def _undo_r(self, session, action):
  1188     def _undo_r(self, cnx, action):
  1189         """undo a relation removal"""
  1189         """undo a relation removal"""
  1190         errors = []
  1190         errors = []
  1191         subj, rtype, obj = action.eid_from, action.rtype, action.eid_to
  1191         subj, rtype, obj = action.eid_from, action.rtype, action.eid_to
  1192         try:
  1192         try:
  1193             sentity, oentity, rdef = _undo_rel_info(session, subj, rtype, obj)
  1193             sentity, oentity, rdef = _undo_rel_info(cnx, subj, rtype, obj)
  1194         except _UndoException as ex:
  1194         except _UndoException as ex:
  1195             errors.append(unicode(ex))
  1195             errors.append(unicode(ex))
  1196         else:
  1196         else:
  1197             for role, entity in (('subject', sentity),
  1197             for role, entity in (('subject', sentity),
  1198                                  ('object', oentity)):
  1198                                  ('object', oentity)):
  1200                     _undo_check_relation_target(entity, rdef, role)
  1200                     _undo_check_relation_target(entity, rdef, role)
  1201                 except _UndoException as ex:
  1201                 except _UndoException as ex:
  1202                     errors.append(unicode(ex))
  1202                     errors.append(unicode(ex))
  1203                     continue
  1203                     continue
  1204         if not errors:
  1204         if not errors:
  1205             self.repo.hm.call_hooks('before_add_relation', session,
  1205             self.repo.hm.call_hooks('before_add_relation', cnx,
  1206                                     eidfrom=subj, rtype=rtype, eidto=obj)
  1206                                     eidfrom=subj, rtype=rtype, eidto=obj)
  1207             # add relation in the database
  1207             # add relation in the database
  1208             self._add_relations(session, rtype, [(subj, obj)], rdef.rtype.inlined)
  1208             self._add_relations(cnx, rtype, [(subj, obj)], rdef.rtype.inlined)
  1209             # set related cache
  1209             # set related cache
  1210             session.update_rel_cache_add(subj, rtype, obj, rdef.rtype.symmetric)
  1210             cnx.update_rel_cache_add(subj, rtype, obj, rdef.rtype.symmetric)
  1211             self.repo.hm.call_hooks('after_add_relation', session,
  1211             self.repo.hm.call_hooks('after_add_relation', cnx,
  1212                                     eidfrom=subj, rtype=rtype, eidto=obj)
  1212                                     eidfrom=subj, rtype=rtype, eidto=obj)
  1213         return errors
  1213         return errors
  1214 
  1214 
  1215     def _undo_c(self, session, action):
  1215     def _undo_c(self, cnx, action):
  1216         """undo an entity creation"""
  1216         """undo an entity creation"""
  1217         eid = action.eid
  1217         eid = action.eid
  1218         # XXX done to avoid fetching all remaining relation for the entity
  1218         # XXX done to avoid fetching all remaining relation for the entity
  1219         # we should find an efficient way to do this (keeping current veolidf
  1219         # we should find an efficient way to do this (keeping current veolidf
  1220         # massive deletion performance)
  1220         # massive deletion performance)
  1221         if _undo_has_later_transaction(session, eid):
  1221         if _undo_has_later_transaction(cnx, eid):
  1222             msg = session._('some later transaction(s) touch entity, undo them '
  1222             msg = cnx._('some later transaction(s) touch entity, undo them '
  1223                             'first')
  1223                             'first')
  1224             raise ValidationError(eid, {None: msg})
  1224             raise ValidationError(eid, {None: msg})
  1225         etype = action.etype
  1225         etype = action.etype
  1226         # get an entity instance
  1226         # get an entity instance
  1227         try:
  1227         try:
  1228             entity = self.repo.vreg['etypes'].etype_class(etype)(session)
  1228             entity = self.repo.vreg['etypes'].etype_class(etype)(cnx)
  1229         except Exception:
  1229         except Exception:
  1230             return [session._(
  1230             return [cnx._(
  1231                 "Can't undo creation of entity %(eid)s of type %(etype)s, type "
  1231                 "Can't undo creation of entity %(eid)s of type %(etype)s, type "
  1232                 "no more supported" % {'eid': eid, 'etype': etype})]
  1232                 "no more supported" % {'eid': eid, 'etype': etype})]
  1233         entity.eid = eid
  1233         entity.eid = eid
  1234         # for proper eid/type cache update
  1234         # for proper eid/type cache update
  1235         CleanupDeletedEidsCacheOp.get_instance(session).add_data(eid)
  1235         CleanupDeletedEidsCacheOp.get_instance(cnx).add_data(eid)
  1236         self.repo.hm.call_hooks('before_delete_entity', session, entity=entity)
  1236         self.repo.hm.call_hooks('before_delete_entity', cnx, entity=entity)
  1237         # remove is / is_instance_of which are added using sql by hooks, hence
  1237         # remove is / is_instance_of which are added using sql by hooks, hence
  1238         # unvisible as transaction action
  1238         # unvisible as transaction action
  1239         self.doexec(session, 'DELETE FROM is_relation WHERE eid_from=%s' % eid)
  1239         self.doexec(cnx, 'DELETE FROM is_relation WHERE eid_from=%s' % eid)
  1240         self.doexec(session, 'DELETE FROM is_instance_of_relation WHERE eid_from=%s' % eid)
  1240         self.doexec(cnx, 'DELETE FROM is_instance_of_relation WHERE eid_from=%s' % eid)
  1241         self.doexec(session, 'DELETE FROM cw_source_relation WHERE eid_from=%s' % self.eid)
  1241         self.doexec(cnx, 'DELETE FROM cw_source_relation WHERE eid_from=%s' % self.eid)
  1242         # XXX check removal of inlined relation?
  1242         # XXX check removal of inlined relation?
  1243         # delete the entity
  1243         # delete the entity
  1244         attrs = {'cw_eid': eid}
  1244         attrs = {'cw_eid': eid}
  1245         sql = self.sqlgen.delete(SQL_PREFIX + entity.cw_etype, attrs)
  1245         sql = self.sqlgen.delete(SQL_PREFIX + entity.cw_etype, attrs)
  1246         self.doexec(session, sql, attrs)
  1246         self.doexec(cnx, sql, attrs)
  1247         # remove record from entities (will update fti if needed)
  1247         # remove record from entities (will update fti if needed)
  1248         self.delete_info_multi(session, [entity])
  1248         self.delete_info_multi(cnx, [entity])
  1249         self.repo.hm.call_hooks('after_delete_entity', session, entity=entity)
  1249         self.repo.hm.call_hooks('after_delete_entity', cnx, entity=entity)
  1250         return ()
  1250         return ()
  1251 
  1251 
  1252     def _undo_u(self, session, action):
  1252     def _undo_u(self, cnx, action):
  1253         """undo an entity update"""
  1253         """undo an entity update"""
  1254         errors = []
  1254         errors = []
  1255         err = errors.append
  1255         err = errors.append
  1256         try:
  1256         try:
  1257             entity = session.entity_from_eid(action.eid)
  1257             entity = cnx.entity_from_eid(action.eid)
  1258         except UnknownEid:
  1258         except UnknownEid:
  1259             err(session._("can't restore state of entity %s, it has been "
  1259             err(cnx._("can't restore state of entity %s, it has been "
  1260                           "deleted inbetween") % action.eid)
  1260                           "deleted inbetween") % action.eid)
  1261             return errors
  1261             return errors
  1262         self._reedit_entity(entity, action.changes, err)
  1262         self._reedit_entity(entity, action.changes, err)
  1263         entity.cw_edited.check()
  1263         entity.cw_edited.check()
  1264         self.repo.hm.call_hooks('before_update_entity', session, entity=entity)
  1264         self.repo.hm.call_hooks('before_update_entity', cnx, entity=entity)
  1265         sql = self.sqlgen.update(SQL_PREFIX + entity.cw_etype, action.changes,
  1265         sql = self.sqlgen.update(SQL_PREFIX + entity.cw_etype, action.changes,
  1266                                  ['cw_eid'])
  1266                                  ['cw_eid'])
  1267         self.doexec(session, sql, action.changes)
  1267         self.doexec(cnx, sql, action.changes)
  1268         self.repo.hm.call_hooks('after_update_entity', session, entity=entity)
  1268         self.repo.hm.call_hooks('after_update_entity', cnx, entity=entity)
  1269         return errors
  1269         return errors
  1270 
  1270 
  1271     def _undo_a(self, session, action):
  1271     def _undo_a(self, cnx, action):
  1272         """undo a relation addition"""
  1272         """undo a relation addition"""
  1273         errors = []
  1273         errors = []
  1274         subj, rtype, obj = action.eid_from, action.rtype, action.eid_to
  1274         subj, rtype, obj = action.eid_from, action.rtype, action.eid_to
  1275         try:
  1275         try:
  1276             sentity, oentity, rdef = _undo_rel_info(session, subj, rtype, obj)
  1276             sentity, oentity, rdef = _undo_rel_info(cnx, subj, rtype, obj)
  1277         except _UndoException as ex:
  1277         except _UndoException as ex:
  1278             errors.append(unicode(ex))
  1278             errors.append(unicode(ex))
  1279         else:
  1279         else:
  1280             rschema = rdef.rtype
  1280             rschema = rdef.rtype
  1281             if rschema.inlined:
  1281             if rschema.inlined:
  1282                 sql = 'SELECT 1 FROM cw_%s WHERE cw_eid=%s and cw_%s=%s'\
  1282                 sql = 'SELECT 1 FROM cw_%s WHERE cw_eid=%s and cw_%s=%s'\
  1283                       % (sentity.cw_etype, subj, rtype, obj)
  1283                       % (sentity.cw_etype, subj, rtype, obj)
  1284             else:
  1284             else:
  1285                 sql = 'SELECT 1 FROM %s_relation WHERE eid_from=%s and eid_to=%s'\
  1285                 sql = 'SELECT 1 FROM %s_relation WHERE eid_from=%s and eid_to=%s'\
  1286                       % (rtype, subj, obj)
  1286                       % (rtype, subj, obj)
  1287             cu = self.doexec(session, sql)
  1287             cu = self.doexec(cnx, sql)
  1288             if cu.fetchone() is None:
  1288             if cu.fetchone() is None:
  1289                 errors.append(session._(
  1289                 errors.append(cnx._(
  1290                     "Can't undo addition of relation %(rtype)s from %(subj)s to"
  1290                     "Can't undo addition of relation %(rtype)s from %(subj)s to"
  1291                     " %(obj)s, doesn't exist anymore" % locals()))
  1291                     " %(obj)s, doesn't exist anymore" % locals()))
  1292         if not errors:
  1292         if not errors:
  1293             self.repo.hm.call_hooks('before_delete_relation', session,
  1293             self.repo.hm.call_hooks('before_delete_relation', cnx,
  1294                                     eidfrom=subj, rtype=rtype, eidto=obj)
  1294                                     eidfrom=subj, rtype=rtype, eidto=obj)
  1295             # delete relation from the database
  1295             # delete relation from the database
  1296             self._delete_relation(session, subj, rtype, obj, rschema.inlined)
  1296             self._delete_relation(cnx, subj, rtype, obj, rschema.inlined)
  1297             # set related cache
  1297             # set related cache
  1298             session.update_rel_cache_del(subj, rtype, obj, rschema.symmetric)
  1298             cnx.update_rel_cache_del(subj, rtype, obj, rschema.symmetric)
  1299             self.repo.hm.call_hooks('after_delete_relation', session,
  1299             self.repo.hm.call_hooks('after_delete_relation', cnx,
  1300                                     eidfrom=subj, rtype=rtype, eidto=obj)
  1300                                     eidfrom=subj, rtype=rtype, eidto=obj)
  1301         return errors
  1301         return errors
  1302 
  1302 
  1303     # full text index handling #################################################
  1303     # full text index handling #################################################
  1304 
  1304 
  1309             return True
  1309             return True
  1310         if any(eschema.fulltext_containers()):
  1310         if any(eschema.fulltext_containers()):
  1311             return True
  1311             return True
  1312         return False
  1312         return False
  1313 
  1313 
  1314     def index_entity(self, session, entity):
  1314     def index_entity(self, cnx, entity):
  1315         """create an operation to [re]index textual content of the given entity
  1315         """create an operation to [re]index textual content of the given entity
  1316         on commit
  1316         on commit
  1317         """
  1317         """
  1318         FTIndexEntityOp.get_instance(session).add_data(entity.eid)
  1318         FTIndexEntityOp.get_instance(cnx).add_data(entity.eid)
  1319 
  1319 
  1320     def fti_unindex_entities(self, cnx, entities):
  1320     def fti_unindex_entities(self, cnx, entities):
  1321         """remove text content for entities from the full text index
  1321         """remove text content for entities from the full text index
  1322         """
  1322         """
  1323         cursor = cnx.cnxset.cu
  1323         cursor = cnx.cnxset.cu
  1352     triggered on precommit, not commit, and this should be done after other
  1352     triggered on precommit, not commit, and this should be done after other
  1353     precommit operation which may add relations to the entity
  1353     precommit operation which may add relations to the entity
  1354     """
  1354     """
  1355 
  1355 
  1356     def precommit_event(self):
  1356     def precommit_event(self):
  1357         session = self.session
  1357         cnx = self.cnx
  1358         source = session.repo.system_source
  1358         source = cnx.repo.system_source
  1359         pendingeids = session.transaction_data.get('pendingeids', ())
  1359         pendingeids = cnx.transaction_data.get('pendingeids', ())
  1360         done = session.transaction_data.setdefault('indexedeids', set())
  1360         done = cnx.transaction_data.setdefault('indexedeids', set())
  1361         to_reindex = set()
  1361         to_reindex = set()
  1362         for eid in self.get_data():
  1362         for eid in self.get_data():
  1363             if eid in pendingeids or eid in done:
  1363             if eid in pendingeids or eid in done:
  1364                 # entity added and deleted in the same transaction or already
  1364                 # entity added and deleted in the same transaction or already
  1365                 # processed
  1365                 # processed
  1366                 continue
  1366                 continue
  1367             done.add(eid)
  1367             done.add(eid)
  1368             iftindexable = session.entity_from_eid(eid).cw_adapt_to('IFTIndexable')
  1368             iftindexable = cnx.entity_from_eid(eid).cw_adapt_to('IFTIndexable')
  1369             to_reindex |= set(iftindexable.fti_containers())
  1369             to_reindex |= set(iftindexable.fti_containers())
  1370         source.fti_unindex_entities(session, to_reindex)
  1370         source.fti_unindex_entities(cnx, to_reindex)
  1371         source.fti_index_entities(session, to_reindex)
  1371         source.fti_index_entities(cnx, to_reindex)
  1372 
  1372 
  1373 def sql_schema(driver):
  1373 def sql_schema(driver):
  1374     helper = get_db_helper(driver)
  1374     helper = get_db_helper(driver)
  1375     typemap = helper.TYPE_MAPPING
  1375     typemap = helper.TYPE_MAPPING
  1376     schema = """
  1376     schema = """