diff -r 29450466273a -r ca853478aaa8 server/sources/native.py --- a/server/sources/native.py Tue Apr 01 16:12:50 2014 +0200 +++ b/server/sources/native.py Tue Apr 01 16:17:03 2014 +0200 @@ -153,34 +153,34 @@ 'rtype': rdef.rtype, 'eid': tentity.eid}) -def _undo_rel_info(session, subj, rtype, obj): +def _undo_rel_info(cnx, subj, rtype, obj): entities = [] for role, eid in (('subject', subj), ('object', obj)): try: - entities.append(session.entity_from_eid(eid)) + entities.append(cnx.entity_from_eid(eid)) except UnknownEid: - raise _UndoException(session._( + raise _UndoException(cnx._( "Can't restore relation %(rtype)s, %(role)s entity %(eid)s" " doesn't exist anymore.") - % {'role': session._(role), - 'rtype': session._(rtype), + % {'role': cnx._(role), + 'rtype': cnx._(rtype), 'eid': eid}) sentity, oentity = entities try: - rschema = session.vreg.schema.rschema(rtype) + rschema = cnx.vreg.schema.rschema(rtype) rdef = rschema.rdefs[(sentity.cw_etype, oentity.cw_etype)] except KeyError: - raise _UndoException(session._( + raise _UndoException(cnx._( "Can't restore relation %(rtype)s between %(subj)s and " "%(obj)s, that relation does not exists anymore in the " "schema.") - % {'rtype': session._(rtype), + % {'rtype': cnx._(rtype), 'subj': subj, 'obj': obj}) return sentity, oentity, rdef -def _undo_has_later_transaction(session, eid): - return session.system_sql('''\ +def _undo_has_later_transaction(cnx, eid): + return cnx.system_sql('''\ SELECT T.tx_uuid FROM transactions AS TREF, transactions AS T WHERE TREF.tx_uuid='%(txuuid)s' AND T.tx_uuid!='%(txuuid)s' AND T.tx_time>=TREF.tx_time @@ -189,7 +189,7 @@ OR EXISTS(SELECT 1 FROM tx_relation_actions as TRA WHERE TRA.tx_uuid=T.tx_uuid AND ( TRA.eid_from=%(eid)s OR TRA.eid_to=%(eid)s)) - )''' % {'txuuid': session.transaction_data['undoing_uuid'], + )''' % {'txuuid': cnx.transaction_data['undoing_uuid'], 'eid': eid}).fetchone() @@ -206,7 +206,7 @@ self.cnx.close() self.cnx = None - def create_eid(self, _session, count=1): + def create_eid(self, _cnx, count=1): # lock needed to prevent 'Connection is busy with results for another # command (0)' errors with SQLServer assert count > 0 @@ -259,12 +259,12 @@ def close(self): pass - def create_eid(self, session, count=1): + def create_eid(self, cnx, count=1): assert count > 0 source = self.source with self.lock: for sql in source.dbhelper.sqls_increment_numrange('entities_id_seq', count): - cursor = source.doexec(session, sql) + cursor = source.doexec(cnx, sql) return cursor.fetchone()[0] @@ -369,9 +369,9 @@ self._cache.pop('Any X WHERE X eid %s' % eid, None) self._cache.pop('Any %s' % eid, None) - def sqlexec(self, session, sql, args=None): + def sqlexec(self, cnx, sql, args=None): """execute the query and return its result""" - return self.process_result(self.doexec(session, sql, args)) + return self.process_result(self.doexec(cnx, sql, args)) def init_creating(self, cnxset=None): # check full text index availibility @@ -521,7 +521,7 @@ continue raise AuthenticationError() - def syntax_tree_search(self, session, union, args=None, cachekey=None, + def syntax_tree_search(self, cnx, union, args=None, cachekey=None, varmap=None): """return result from this source for a rql query (actually from a rql syntax tree and a solution dictionary mapping each used @@ -547,25 +547,25 @@ args = self.merge_args(args, qargs) assert isinstance(sql, basestring), repr(sql) try: - cursor = self.doexec(session, sql, args) + cursor = self.doexec(cnx, sql, args) except (self.OperationalError, self.InterfaceError): - if session.mode == 'write': + if cnx.mode == 'write': # do not attempt to reconnect if there has been some write # during the transaction raise # FIXME: better detection of deconnection pb self.warning("trying to reconnect") - session.cnxset.reconnect() - cursor = self.doexec(session, sql, args) + cnx.cnxset.reconnect() + cursor = self.doexec(cnx, sql, args) except self.DbapiError as exc: # We get this one with pyodbc and SQL Server when connection was reset - if exc.args[0] == '08S01' and session.mode != 'write': + if exc.args[0] == '08S01' and cnx.mode != 'write': self.warning("trying to reconnect") - session.cnxset.reconnect() - cursor = self.doexec(session, sql, args) + cnx.cnxset.reconnect() + cursor = self.doexec(cnx, sql, args) else: raise - results = self.process_result(cursor, cbs, session=session) + results = self.process_result(cursor, cbs, session=cnx) assert dbg_results(results) return results @@ -600,60 +600,60 @@ for entity, attr, value in restore_values: entity.cw_edited.edited_attribute(attr, value) - def add_entity(self, session, entity): + def add_entity(self, cnx, entity): """add a new entity to the source""" with self._storage_handler(entity, 'added'): attrs = self.preprocess_entity(entity) sql = self.sqlgen.insert(SQL_PREFIX + entity.cw_etype, attrs) - self.doexec(session, sql, attrs) - if session.ertype_supports_undo(entity.cw_etype): - self._record_tx_action(session, 'tx_entity_actions', 'C', + self.doexec(cnx, sql, attrs) + if cnx.ertype_supports_undo(entity.cw_etype): + self._record_tx_action(cnx, 'tx_entity_actions', 'C', etype=entity.cw_etype, eid=entity.eid) - def update_entity(self, session, entity): + def update_entity(self, cnx, entity): """replace an entity in the source""" with self._storage_handler(entity, 'updated'): attrs = self.preprocess_entity(entity) - if session.ertype_supports_undo(entity.cw_etype): - changes = self._save_attrs(session, entity, attrs) - self._record_tx_action(session, 'tx_entity_actions', 'U', + if cnx.ertype_supports_undo(entity.cw_etype): + changes = self._save_attrs(cnx, entity, attrs) + self._record_tx_action(cnx, 'tx_entity_actions', 'U', etype=entity.cw_etype, eid=entity.eid, changes=self._binary(dumps(changes))) sql = self.sqlgen.update(SQL_PREFIX + entity.cw_etype, attrs, ['cw_eid']) - self.doexec(session, sql, attrs) + self.doexec(cnx, sql, attrs) - def delete_entity(self, session, entity): + def delete_entity(self, cnx, entity): """delete an entity from the source""" with self._storage_handler(entity, 'deleted'): - if session.ertype_supports_undo(entity.cw_etype): + if cnx.ertype_supports_undo(entity.cw_etype): attrs = [SQL_PREFIX + r.type for r in entity.e_schema.subject_relations() if (r.final or r.inlined) and not r in VIRTUAL_RTYPES] - changes = self._save_attrs(session, entity, attrs) - self._record_tx_action(session, 'tx_entity_actions', 'D', + changes = self._save_attrs(cnx, entity, attrs) + self._record_tx_action(cnx, 'tx_entity_actions', 'D', etype=entity.cw_etype, eid=entity.eid, changes=self._binary(dumps(changes))) attrs = {'cw_eid': entity.eid} sql = self.sqlgen.delete(SQL_PREFIX + entity.cw_etype, attrs) - self.doexec(session, sql, attrs) + self.doexec(cnx, sql, attrs) - def add_relation(self, session, subject, rtype, object, inlined=False): + def add_relation(self, cnx, subject, rtype, object, inlined=False): """add a relation to the source""" - self._add_relations(session, rtype, [(subject, object)], inlined) - if session.ertype_supports_undo(rtype): - self._record_tx_action(session, 'tx_relation_actions', 'A', + self._add_relations(cnx, rtype, [(subject, object)], inlined) + if cnx.ertype_supports_undo(rtype): + self._record_tx_action(cnx, 'tx_relation_actions', 'A', eid_from=subject, rtype=rtype, eid_to=object) - def add_relations(self, session, rtype, subj_obj_list, inlined=False): + def add_relations(self, cnx, rtype, subj_obj_list, inlined=False): """add a relations to the source""" - self._add_relations(session, rtype, subj_obj_list, inlined) - if session.ertype_supports_undo(rtype): + self._add_relations(cnx, rtype, subj_obj_list, inlined) + if cnx.ertype_supports_undo(rtype): for subject, object in subj_obj_list: - self._record_tx_action(session, 'tx_relation_actions', 'A', + self._record_tx_action(cnx, 'tx_relation_actions', 'A', eid_from=subject, rtype=rtype, eid_to=object) - def _add_relations(self, session, rtype, subj_obj_list, inlined=False): + def _add_relations(self, cnx, rtype, subj_obj_list, inlined=False): """add a relation to the source""" sql = [] if inlined is False: @@ -663,7 +663,7 @@ else: # used by data import etypes = {} for subject, object in subj_obj_list: - etype = session.entity_metas(subject)['type'] + etype = cnx.entity_metas(subject)['type'] if etype in etypes: etypes[etype].append((subject, object)) else: @@ -675,20 +675,20 @@ ['cw_eid']), attrs)) for statement, attrs in sql: - self.doexecmany(session, statement, attrs) + self.doexecmany(cnx, statement, attrs) - def delete_relation(self, session, subject, rtype, object): + def delete_relation(self, cnx, subject, rtype, object): """delete a relation from the source""" rschema = self.schema.rschema(rtype) - self._delete_relation(session, subject, rtype, object, rschema.inlined) - if session.ertype_supports_undo(rtype): - self._record_tx_action(session, 'tx_relation_actions', 'R', + self._delete_relation(cnx, subject, rtype, object, rschema.inlined) + if cnx.ertype_supports_undo(rtype): + self._record_tx_action(cnx, 'tx_relation_actions', 'R', eid_from=subject, rtype=rtype, eid_to=object) - def _delete_relation(self, session, subject, rtype, object, inlined=False): + def _delete_relation(self, cnx, subject, rtype, object, inlined=False): """delete a relation from the source""" if inlined: - table = SQL_PREFIX + session.entity_metas(subject)['type'] + table = SQL_PREFIX + cnx.entity_metas(subject)['type'] column = SQL_PREFIX + rtype sql = 'UPDATE %s SET %s=NULL WHERE %seid=%%(eid)s' % (table, column, SQL_PREFIX) @@ -696,15 +696,15 @@ else: attrs = {'eid_from': subject, 'eid_to': object} sql = self.sqlgen.delete('%s_relation' % rtype, attrs) - self.doexec(session, sql, attrs) + self.doexec(cnx, sql, attrs) - def doexec(self, session, query, args=None, rollback=True): + def doexec(self, cnx, query, args=None, rollback=True): """Execute a query. it's a function just so that it shows up in profiling """ - cursor = session.cnxset.cu + cursor = cnx.cnxset.cu if server.DEBUG & server.DBG_SQL: - cnx = session.cnxset.cnx + cnx = cnx.cnxset.cnx # getattr to get the actual connection if cnx is a CnxLoggingWrapper # instance print 'exec', query, args, getattr(cnx, '_cnx', cnx) @@ -719,7 +719,7 @@ query, args, ex.args[0]) if rollback: try: - session.cnxset.rollback() + cnx.cnxset.rollback() if self.repo.config.mode != 'test': self.critical('transaction has been rolled back') except Exception as ex: @@ -730,31 +730,31 @@ # postgres, sqlserver mo = re.search("unique_[a-z0-9]{32}", arg) if mo is not None: - raise UniqueTogetherError(session, cstrname=mo.group(0)) + raise UniqueTogetherError(cnx, cstrname=mo.group(0)) # old sqlite mo = re.search('columns (.*) are not unique', arg) if mo is not None: # sqlite in use # we left chop the 'cw_' prefix of attribute names rtypes = [c.strip()[3:] for c in mo.group(1).split(',')] - raise UniqueTogetherError(session, rtypes=rtypes) + raise UniqueTogetherError(cnx, rtypes=rtypes) # sqlite after http://www.sqlite.org/cgi/src/info/c80e229dd9c1230a if arg.startswith('UNIQUE constraint failed:'): # message looks like: "UNIQUE constraint failed: foo.cw_bar, foo.cw_baz" # so drop the prefix, split on comma, drop the tablenames, and drop "cw_" columns = arg.split(':', 1)[1].split(',') rtypes = [c.split('.', 1)[1].strip()[3:] for c in columns] - raise UniqueTogetherError(session, rtypes=rtypes) + raise UniqueTogetherError(cnx, rtypes=rtypes) raise return cursor - def doexecmany(self, session, query, args): + def doexecmany(self, cnx, query, args): """Execute a query. it's a function just so that it shows up in profiling """ if server.DEBUG & server.DBG_SQL: print 'execmany', query, 'with', len(args), 'arguments' - cursor = session.cnxset.cu + cursor = cnx.cnxset.cu try: # str(query) to avoid error if it's an unicode string cursor.executemany(str(query), args) @@ -765,7 +765,7 @@ self.critical("sql many: %r\n args: %s\ndbms message: %r", query, args, ex.args[0]) try: - session.cnxset.rollback() + cnx.cnxset.rollback() if self.repo.config.mode != 'test': self.critical('transaction has been rolled back') except Exception: @@ -774,7 +774,7 @@ # short cut to method requiring advanced db helper usage ################## - def update_rdef_column(self, session, rdef): + def update_rdef_column(self, cnx, rdef): """update physical column for a relation definition (final or inlined) """ table, column = rdef_table_column(rdef) @@ -783,12 +783,12 @@ self.error("backend can't alter %s.%s to %s%s", table, column, coltype, not allownull and 'NOT NULL' or '') return - self.dbhelper.change_col_type(LogCursor(session.cnxset.cu), + self.dbhelper.change_col_type(LogCursor(cnx.cnxset.cu), table, column, coltype, allownull) self.info('altered %s.%s: now %s%s', table, column, coltype, not allownull and 'NOT NULL' or '') - def update_rdef_null_allowed(self, session, rdef): + def update_rdef_null_allowed(self, cnx, rdef): """update NULL / NOT NULL of physical column for a relation definition (final or inlined) """ @@ -798,62 +798,62 @@ return table, column = rdef_table_column(rdef) coltype, allownull = rdef_physical_info(self.dbhelper, rdef) - self.dbhelper.set_null_allowed(LogCursor(session.cnxset.cu), + self.dbhelper.set_null_allowed(LogCursor(cnx.cnxset.cu), table, column, coltype, allownull) - def update_rdef_indexed(self, session, rdef): + def update_rdef_indexed(self, cnx, rdef): table, column = rdef_table_column(rdef) if rdef.indexed: - self.create_index(session, table, column) + self.create_index(cnx, table, column) else: - self.drop_index(session, table, column) + self.drop_index(cnx, table, column) - def update_rdef_unique(self, session, rdef): + def update_rdef_unique(self, cnx, rdef): table, column = rdef_table_column(rdef) if rdef.constraint_by_type('UniqueConstraint'): - self.create_index(session, table, column, unique=True) + self.create_index(cnx, table, column, unique=True) else: - self.drop_index(session, table, column, unique=True) + self.drop_index(cnx, table, column, unique=True) - def create_index(self, session, table, column, unique=False): - cursor = LogCursor(session.cnxset.cu) + def create_index(self, cnx, table, column, unique=False): + cursor = LogCursor(cnx.cnxset.cu) self.dbhelper.create_index(cursor, table, column, unique) - def drop_index(self, session, table, column, unique=False): - cursor = LogCursor(session.cnxset.cu) + def drop_index(self, cnx, table, column, unique=False): + cursor = LogCursor(cnx.cnxset.cu) self.dbhelper.drop_index(cursor, table, column, unique) # system source interface ################################################# - def _eid_type_source(self, session, eid, sql, _retry=True): + def _eid_type_source(self, cnx, eid, sql, _retry=True): try: - res = self.doexec(session, sql).fetchone() + res = self.doexec(cnx, sql).fetchone() if res is not None: return res except (self.OperationalError, self.InterfaceError): - if session.mode == 'read' and _retry: + if cnx.mode == 'read' and _retry: self.warning("trying to reconnect (eid_type_source())") - session.cnxset.reconnect() - return self._eid_type_source(session, eid, sql, _retry=False) + cnx.cnxset.reconnect() + return self._eid_type_source(cnx, eid, sql, _retry=False) except Exception: - assert session.cnxset, 'session has no connections set' + assert cnx.cnxset, 'connection has no connections set' self.exception('failed to query entities table for eid %s', eid) raise UnknownEid(eid) - def eid_type_source(self, session, eid): # pylint: disable=E0202 + def eid_type_source(self, cnx, eid): # pylint: disable=E0202 """return a tuple (type, source, extid) for the entity with id """ sql = 'SELECT type, extid, asource FROM entities WHERE eid=%s' % eid - res = self._eid_type_source(session, eid, sql) + res = self._eid_type_source(cnx, eid, sql) if res[-2] is not None: if not isinstance(res, list): res = list(res) res[-2] = b64decode(res[-2]) return res - def eid_type_source_pre_131(self, session, eid): + def eid_type_source_pre_131(self, cnx, eid): """return a tuple (type, source, extid) for the entity with id """ sql = 'SELECT type, extid FROM entities WHERE eid=%s' % eid - res = self._eid_type_source(session, eid, sql) + res = self._eid_type_source(cnx, eid, sql) if not isinstance(res, list): res = list(res) if res[-1] is not None: @@ -861,10 +861,10 @@ res.append(res[1]) return res - def extid2eid(self, session, extid): + def extid2eid(self, cnx, extid): """get eid from an external id. Return None if no record found.""" assert isinstance(extid, str) - cursor = self.doexec(session, + cursor = self.doexec(cnx, 'SELECT eid FROM entities WHERE extid=%(x)s', {'x': b64encode(extid)}) # XXX testing rowcount cause strange bug with sqlite, results are there @@ -878,70 +878,70 @@ pass return None - def _handle_is_relation_sql(self, session, sql, attrs): + def _handle_is_relation_sql(self, cnx, sql, attrs): """ Handler for specific is_relation sql that may be overwritten in some stores""" - self.doexec(session, sql % attrs) + self.doexec(cnx, sql % attrs) _handle_insert_entity_sql = doexec _handle_is_instance_of_sql = _handle_source_relation_sql = _handle_is_relation_sql - def add_info(self, session, entity, source, extid, complete): + def add_info(self, cnx, entity, source, extid, complete): """add type and source info for an eid into the system table""" - with session.ensure_cnx_set: + with cnx.ensure_cnx_set: # begin by inserting eid/type/source/extid into the entities table if extid is not None: assert isinstance(extid, str) extid = b64encode(extid) attrs = {'type': entity.cw_etype, 'eid': entity.eid, 'extid': extid, 'asource': source.uri} - self._handle_insert_entity_sql(session, self.sqlgen.insert('entities', attrs), attrs) + self._handle_insert_entity_sql(cnx, self.sqlgen.insert('entities', attrs), attrs) # insert core relations: is, is_instance_of and cw_source try: - self._handle_is_relation_sql(session, 'INSERT INTO is_relation(eid_from,eid_to) VALUES (%s,%s)', - (entity.eid, eschema_eid(session, entity.e_schema))) + self._handle_is_relation_sql(cnx, 'INSERT INTO is_relation(eid_from,eid_to) VALUES (%s,%s)', + (entity.eid, eschema_eid(cnx, entity.e_schema))) except IndexError: # during schema serialization, skip pass else: for eschema in entity.e_schema.ancestors() + [entity.e_schema]: - self._handle_is_relation_sql(session, + self._handle_is_relation_sql(cnx, 'INSERT INTO is_instance_of_relation(eid_from,eid_to) VALUES (%s,%s)', - (entity.eid, eschema_eid(session, eschema))) + (entity.eid, eschema_eid(cnx, eschema))) if 'CWSource' in self.schema and source.eid is not None: # else, cw < 3.10 - self._handle_is_relation_sql(session, 'INSERT INTO cw_source_relation(eid_from,eid_to) VALUES (%s,%s)', + self._handle_is_relation_sql(cnx, 'INSERT INTO cw_source_relation(eid_from,eid_to) VALUES (%s,%s)', (entity.eid, source.eid)) # now we can update the full text index if self.do_fti and self.need_fti_indexation(entity.cw_etype): if complete: entity.complete(entity.e_schema.indexable_attributes()) - self.index_entity(session, entity=entity) + self.index_entity(cnx, entity=entity) - def update_info(self, session, entity, need_fti_update): + def update_info(self, cnx, entity, need_fti_update): """mark entity as being modified, fulltext reindex if needed""" if self.do_fti and need_fti_update: # reindex the entity only if this query is updating at least # one indexable attribute - self.index_entity(session, entity=entity) + self.index_entity(cnx, entity=entity) - def delete_info_multi(self, session, entities): + def delete_info_multi(self, cnx, entities): """delete system information on deletion of a list of entities with the same etype and belinging to the same source * update the fti * remove record from the `entities` table """ - self.fti_unindex_entities(session, entities) + self.fti_unindex_entities(cnx, entities) attrs = {'eid': '(%s)' % ','.join([str(_e.eid) for _e in entities])} - self.doexec(session, self.sqlgen.delete_many('entities', attrs), attrs) + self.doexec(cnx, self.sqlgen.delete_many('entities', attrs), attrs) # undo support ############################################################# - def undoable_transactions(self, session, ueid=None, **actionfilters): + def undoable_transactions(self, cnx, ueid=None, **actionfilters): """See :class:`cubicweb.repoapi.ClientConnection.undoable_transactions`""" - # force filtering to session's user if not a manager - if not session.user.is_in_group('managers'): - ueid = session.user.eid + # force filtering to connection's user if not a manager + if not cnx.user.is_in_group('managers'): + ueid = cnx.user.eid restr = {} if ueid is not None: restr['tx_user'] = ueid @@ -1005,18 +1005,18 @@ restr.update(tearestr) # we want results ordered by transaction's time descendant sql += ' ORDER BY tx_time DESC' - with session.ensure_cnx_set: - cu = self.doexec(session, sql, restr) + with cnx.ensure_cnx_set: + cu = self.doexec(cnx, sql, restr) # turn results into transaction objects return [tx.Transaction(*args) for args in cu.fetchall()] - def tx_info(self, session, txuuid): + def tx_info(self, cnx, txuuid): """See :class:`cubicweb.repoapi.ClientConnection.transaction_info`""" - return tx.Transaction(txuuid, *self._tx_info(session, txuuid)) + return tx.Transaction(txuuid, *self._tx_info(cnx, txuuid)) - def tx_actions(self, session, txuuid, public): + def tx_actions(self, cnx, txuuid, public): """See :class:`cubicweb.repoapi.ClientConnection.transaction_actions`""" - self._tx_info(session, txuuid) + self._tx_info(cnx, txuuid) restr = {'tx_uuid': txuuid} if public: restr['txa_public'] = True @@ -1024,54 +1024,54 @@ sql = self.sqlgen.select('tx_entity_actions', restr, ('txa_action', 'txa_public', 'txa_order', 'etype', 'eid', 'changes')) - cu = self.doexec(session, sql, restr) + cu = self.doexec(cnx, sql, restr) actions = [tx.EntityAction(a,p,o,et,e,c and loads(self.binary_to_str(c))) for a,p,o,et,e,c in cu.fetchall()] sql = self.sqlgen.select('tx_relation_actions', restr, ('txa_action', 'txa_public', 'txa_order', 'rtype', 'eid_from', 'eid_to')) - cu = self.doexec(session, sql, restr) + cu = self.doexec(cnx, sql, restr) actions += [tx.RelationAction(*args) for args in cu.fetchall()] return sorted(actions, key=lambda x: x.order) - def undo_transaction(self, session, txuuid): + def undo_transaction(self, cnx, txuuid): """See :class:`cubicweb.repoapi.ClientConnection.undo_transaction` important note: while undoing of a transaction, only hooks in the 'integrity', 'activeintegrity' and 'undo' categories are called. """ # set mode so connections set isn't released subsquently until commit/rollback - session.mode = 'write' + cnx.mode = 'write' errors = [] - session.transaction_data['undoing_uuid'] = txuuid - with session.deny_all_hooks_but('integrity', 'activeintegrity', 'undo'): - with session.security_enabled(read=False): - for action in reversed(self.tx_actions(session, txuuid, False)): + cnx.transaction_data['undoing_uuid'] = txuuid + with cnx.deny_all_hooks_but('integrity', 'activeintegrity', 'undo'): + with cnx.security_enabled(read=False): + for action in reversed(self.tx_actions(cnx, txuuid, False)): undomethod = getattr(self, '_undo_%s' % action.action.lower()) - errors += undomethod(session, action) + errors += undomethod(cnx, action) # remove the transactions record - self.doexec(session, + self.doexec(cnx, "DELETE FROM transactions WHERE tx_uuid='%s'" % txuuid) if errors: raise UndoTransactionException(txuuid, errors) else: return - def start_undoable_transaction(self, session, uuid): - """session callback to insert a transaction record in the transactions + def start_undoable_transaction(self, cnx, uuid): + """connection callback to insert a transaction record in the transactions table when some undoable transaction is started """ - ueid = session.user.eid + ueid = cnx.user.eid attrs = {'tx_uuid': uuid, 'tx_user': ueid, 'tx_time': datetime.utcnow()} - self.doexec(session, self.sqlgen.insert('transactions', attrs), attrs) + self.doexec(cnx, self.sqlgen.insert('transactions', attrs), attrs) - def _save_attrs(self, session, entity, attrs): + def _save_attrs(self, cnx, entity, attrs): """return a pickleable dictionary containing current values for given attributes of the entity """ restr = {'cw_eid': entity.eid} sql = self.sqlgen.select(SQL_PREFIX + entity.cw_etype, restr, attrs) - cu = self.doexec(session, sql, restr) + cu = self.doexec(cnx, sql, restr) values = dict(zip(attrs, cu.fetchone())) # ensure backend specific binary are converted back to string eschema = entity.e_schema @@ -1086,38 +1086,38 @@ values[column] = self.binary_to_str(value) return values - def _record_tx_action(self, session, table, action, **kwargs): + def _record_tx_action(self, cnx, table, action, **kwargs): """record a transaction action in the given table (either 'tx_entity_actions' or 'tx_relation_action') """ - kwargs['tx_uuid'] = session.transaction_uuid() + kwargs['tx_uuid'] = cnx.transaction_uuid() kwargs['txa_action'] = action - kwargs['txa_order'] = session.transaction_inc_action_counter() - kwargs['txa_public'] = session.running_dbapi_query - self.doexec(session, self.sqlgen.insert(table, kwargs), kwargs) + kwargs['txa_order'] = cnx.transaction_inc_action_counter() + kwargs['txa_public'] = cnx.running_dbapi_query + self.doexec(cnx, self.sqlgen.insert(table, kwargs), kwargs) - def _tx_info(self, session, txuuid): + def _tx_info(self, cnx, txuuid): """return transaction's time and user of the transaction with the given uuid. raise `NoSuchTransaction` if there is no such transaction of if the - session's user isn't allowed to see it. + connection's user isn't allowed to see it. """ - with session.ensure_cnx_set: + with cnx.ensure_cnx_set: restr = {'tx_uuid': txuuid} sql = self.sqlgen.select('transactions', restr, ('tx_time', 'tx_user')) - cu = self.doexec(session, sql, restr) + cu = self.doexec(cnx, sql, restr) try: time, ueid = cu.fetchone() except TypeError: raise tx.NoSuchTransaction(txuuid) - if not (session.user.is_in_group('managers') - or session.user.eid == ueid): + if not (cnx.user.is_in_group('managers') + or cnx.user.eid == ueid): raise tx.NoSuchTransaction(txuuid) return time, ueid def _reedit_entity(self, entity, changes, err): - session = entity._cw + cnx = entity._cw eid = entity.eid entity.cw_edited = edited = EditedEntity(entity) # check for schema changes, entities linked through inlined relation @@ -1131,7 +1131,7 @@ try: rschema = getrschema[rtype] except KeyError: - err(session._("can't restore relation %(rtype)s of entity %(eid)s, " + err(cnx._("can't restore relation %(rtype)s of entity %(eid)s, " "this relation does not exist in the schema anymore.") % {'rtype': rtype, 'eid': eid}) if not rschema.final: @@ -1144,53 +1144,53 @@ entity._cw.entity_from_eid(value) # check target exists edited[rtype] = value except UnknownEid: - err(session._("can't restore entity %(eid)s of type %(eschema)s, " + err(cnx._("can't restore entity %(eid)s of type %(eschema)s, " "target of %(rtype)s (eid %(value)s) does not exist any longer") % locals()) elif eschema.destination(rtype) in ('Bytes', 'Password'): changes[column] = self._binary(value) edited[rtype] = Binary(value) elif isinstance(value, str): - edited[rtype] = unicode(value, session.encoding, 'replace') + edited[rtype] = unicode(value, cnx.encoding, 'replace') else: edited[rtype] = value # This must only be done after init_entitiy_caches : defered in calling functions # edited.check() - def _undo_d(self, session, action): + def _undo_d(self, cnx, action): """undo an entity deletion""" errors = [] err = errors.append eid = action.eid etype = action.etype - _ = session._ + _ = cnx._ # get an entity instance try: - entity = self.repo.vreg['etypes'].etype_class(etype)(session) + entity = self.repo.vreg['etypes'].etype_class(etype)(cnx) except Exception: err("can't restore entity %s of type %s, type no more supported" % (eid, etype)) return errors self._reedit_entity(entity, action.changes, err) entity.eid = eid - session.repo.init_entity_caches(session, entity, self) + cnx.repo.init_entity_caches(cnx, entity, self) entity.cw_edited.check() - self.repo.hm.call_hooks('before_add_entity', session, entity=entity) + self.repo.hm.call_hooks('before_add_entity', cnx, entity=entity) # restore the entity action.changes['cw_eid'] = eid sql = self.sqlgen.insert(SQL_PREFIX + etype, action.changes) - self.doexec(session, sql, action.changes) + self.doexec(cnx, sql, action.changes) # restore record in entities (will update fti if needed) - self.add_info(session, entity, self, None, True) - self.repo.hm.call_hooks('after_add_entity', session, entity=entity) + self.add_info(cnx, entity, self, None, True) + self.repo.hm.call_hooks('after_add_entity', cnx, entity=entity) return errors - def _undo_r(self, session, action): + def _undo_r(self, cnx, action): """undo a relation removal""" errors = [] subj, rtype, obj = action.eid_from, action.rtype, action.eid_to try: - sentity, oentity, rdef = _undo_rel_info(session, subj, rtype, obj) + sentity, oentity, rdef = _undo_rel_info(cnx, subj, rtype, obj) except _UndoException as ex: errors.append(unicode(ex)) else: @@ -1202,78 +1202,78 @@ errors.append(unicode(ex)) continue if not errors: - self.repo.hm.call_hooks('before_add_relation', session, + self.repo.hm.call_hooks('before_add_relation', cnx, eidfrom=subj, rtype=rtype, eidto=obj) # add relation in the database - self._add_relations(session, rtype, [(subj, obj)], rdef.rtype.inlined) + self._add_relations(cnx, rtype, [(subj, obj)], rdef.rtype.inlined) # set related cache - session.update_rel_cache_add(subj, rtype, obj, rdef.rtype.symmetric) - self.repo.hm.call_hooks('after_add_relation', session, + cnx.update_rel_cache_add(subj, rtype, obj, rdef.rtype.symmetric) + self.repo.hm.call_hooks('after_add_relation', cnx, eidfrom=subj, rtype=rtype, eidto=obj) return errors - def _undo_c(self, session, action): + def _undo_c(self, cnx, action): """undo an entity creation""" eid = action.eid # XXX done to avoid fetching all remaining relation for the entity # we should find an efficient way to do this (keeping current veolidf # massive deletion performance) - if _undo_has_later_transaction(session, eid): - msg = session._('some later transaction(s) touch entity, undo them ' + if _undo_has_later_transaction(cnx, eid): + msg = cnx._('some later transaction(s) touch entity, undo them ' 'first') raise ValidationError(eid, {None: msg}) etype = action.etype # get an entity instance try: - entity = self.repo.vreg['etypes'].etype_class(etype)(session) + entity = self.repo.vreg['etypes'].etype_class(etype)(cnx) except Exception: - return [session._( + return [cnx._( "Can't undo creation of entity %(eid)s of type %(etype)s, type " "no more supported" % {'eid': eid, 'etype': etype})] entity.eid = eid # for proper eid/type cache update - CleanupDeletedEidsCacheOp.get_instance(session).add_data(eid) - self.repo.hm.call_hooks('before_delete_entity', session, entity=entity) + CleanupDeletedEidsCacheOp.get_instance(cnx).add_data(eid) + self.repo.hm.call_hooks('before_delete_entity', cnx, entity=entity) # remove is / is_instance_of which are added using sql by hooks, hence # unvisible as transaction action - self.doexec(session, 'DELETE FROM is_relation WHERE eid_from=%s' % eid) - self.doexec(session, 'DELETE FROM is_instance_of_relation WHERE eid_from=%s' % eid) - self.doexec(session, 'DELETE FROM cw_source_relation WHERE eid_from=%s' % self.eid) + self.doexec(cnx, 'DELETE FROM is_relation WHERE eid_from=%s' % eid) + self.doexec(cnx, 'DELETE FROM is_instance_of_relation WHERE eid_from=%s' % eid) + self.doexec(cnx, 'DELETE FROM cw_source_relation WHERE eid_from=%s' % self.eid) # XXX check removal of inlined relation? # delete the entity attrs = {'cw_eid': eid} sql = self.sqlgen.delete(SQL_PREFIX + entity.cw_etype, attrs) - self.doexec(session, sql, attrs) + self.doexec(cnx, sql, attrs) # remove record from entities (will update fti if needed) - self.delete_info_multi(session, [entity]) - self.repo.hm.call_hooks('after_delete_entity', session, entity=entity) + self.delete_info_multi(cnx, [entity]) + self.repo.hm.call_hooks('after_delete_entity', cnx, entity=entity) return () - def _undo_u(self, session, action): + def _undo_u(self, cnx, action): """undo an entity update""" errors = [] err = errors.append try: - entity = session.entity_from_eid(action.eid) + entity = cnx.entity_from_eid(action.eid) except UnknownEid: - err(session._("can't restore state of entity %s, it has been " + err(cnx._("can't restore state of entity %s, it has been " "deleted inbetween") % action.eid) return errors self._reedit_entity(entity, action.changes, err) entity.cw_edited.check() - self.repo.hm.call_hooks('before_update_entity', session, entity=entity) + self.repo.hm.call_hooks('before_update_entity', cnx, entity=entity) sql = self.sqlgen.update(SQL_PREFIX + entity.cw_etype, action.changes, ['cw_eid']) - self.doexec(session, sql, action.changes) - self.repo.hm.call_hooks('after_update_entity', session, entity=entity) + self.doexec(cnx, sql, action.changes) + self.repo.hm.call_hooks('after_update_entity', cnx, entity=entity) return errors - def _undo_a(self, session, action): + def _undo_a(self, cnx, action): """undo a relation addition""" errors = [] subj, rtype, obj = action.eid_from, action.rtype, action.eid_to try: - sentity, oentity, rdef = _undo_rel_info(session, subj, rtype, obj) + sentity, oentity, rdef = _undo_rel_info(cnx, subj, rtype, obj) except _UndoException as ex: errors.append(unicode(ex)) else: @@ -1284,19 +1284,19 @@ else: sql = 'SELECT 1 FROM %s_relation WHERE eid_from=%s and eid_to=%s'\ % (rtype, subj, obj) - cu = self.doexec(session, sql) + cu = self.doexec(cnx, sql) if cu.fetchone() is None: - errors.append(session._( + errors.append(cnx._( "Can't undo addition of relation %(rtype)s from %(subj)s to" " %(obj)s, doesn't exist anymore" % locals())) if not errors: - self.repo.hm.call_hooks('before_delete_relation', session, + self.repo.hm.call_hooks('before_delete_relation', cnx, eidfrom=subj, rtype=rtype, eidto=obj) # delete relation from the database - self._delete_relation(session, subj, rtype, obj, rschema.inlined) + self._delete_relation(cnx, subj, rtype, obj, rschema.inlined) # set related cache - session.update_rel_cache_del(subj, rtype, obj, rschema.symmetric) - self.repo.hm.call_hooks('after_delete_relation', session, + cnx.update_rel_cache_del(subj, rtype, obj, rschema.symmetric) + self.repo.hm.call_hooks('after_delete_relation', cnx, eidfrom=subj, rtype=rtype, eidto=obj) return errors @@ -1311,11 +1311,11 @@ return True return False - def index_entity(self, session, entity): + def index_entity(self, cnx, entity): """create an operation to [re]index textual content of the given entity on commit """ - FTIndexEntityOp.get_instance(session).add_data(entity.eid) + FTIndexEntityOp.get_instance(cnx).add_data(entity.eid) def fti_unindex_entities(self, cnx, entities): """remove text content for entities from the full text index @@ -1354,10 +1354,10 @@ """ def precommit_event(self): - session = self.session - source = session.repo.system_source - pendingeids = session.transaction_data.get('pendingeids', ()) - done = session.transaction_data.setdefault('indexedeids', set()) + cnx = self.cnx + source = cnx.repo.system_source + pendingeids = cnx.transaction_data.get('pendingeids', ()) + done = cnx.transaction_data.setdefault('indexedeids', set()) to_reindex = set() for eid in self.get_data(): if eid in pendingeids or eid in done: @@ -1365,10 +1365,10 @@ # processed continue done.add(eid) - iftindexable = session.entity_from_eid(eid).cw_adapt_to('IFTIndexable') + iftindexable = cnx.entity_from_eid(eid).cw_adapt_to('IFTIndexable') to_reindex |= set(iftindexable.fti_containers()) - source.fti_unindex_entities(session, to_reindex) - source.fti_index_entities(session, to_reindex) + source.fti_unindex_entities(cnx, to_reindex) + source.fti_index_entities(cnx, to_reindex) def sql_schema(driver): helper = get_db_helper(driver)