diff -r 058bb3dc685f -r 0b59724cb3f2 cubicweb/server/sources/native.py
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/cubicweb/server/sources/native.py	Sat Jan 16 13:48:51 2016 +0100
@@ -0,0 +1,1813 @@
+# copyright 2003-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
+#
+# This file is part of CubicWeb.
+#
+# CubicWeb is free software: you can redistribute it and/or modify it under the
+# terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation, either version 2.1 of the License, or (at your option)
+# any later version.
+#
+# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Lesser General Public License along
+# with CubicWeb. If not, see <http://www.gnu.org/licenses/>.
+"""Adapters for native cubicweb sources.
+
+Notes:
+* extids (aka external ids, the primary key of an entity in the external
+  source it comes from) are stored in a varchar column encoded as a base64
+  string: they should actually be Bytes, but we want an index on the column
+  for fast querying.
+"""
+from __future__ import print_function
+
+__docformat__ = "restructuredtext en"
+
+from threading import Lock
+from datetime import datetime
+from base64 import b64encode
+from contextlib import contextmanager
+from os.path import basename
+import re
+import itertools
+import zipfile
+import logging
+import sys
+
+from six import PY2, text_type, binary_type, string_types
+from six.moves import range, cPickle as pickle
+
+from logilab.common.decorators import cached, clear_cache
+from logilab.common.configuration import Method
+from logilab.common.shellutils import getlogin
+from logilab.database import get_db_helper, sqlgen
+
+from yams.schema import role_name
+
+from cubicweb import (UnknownEid, AuthenticationError, ValidationError, Binary,
+                      UniqueTogetherError, UndoTransactionException, ViolatedConstraint)
+from cubicweb import transaction as tx, server, neg_role
+from cubicweb.utils import QueryCache
+from cubicweb.schema import VIRTUAL_RTYPES
+from cubicweb.cwconfig import CubicWebNoAppConfiguration
+from cubicweb.server import hook
+from cubicweb.server import schema2sql as y2sql
+from cubicweb.server.utils import crypt_password, eschema_eid, verify_and_update
+from cubicweb.server.sqlutils import SQL_PREFIX, SQLAdapterMixIn
+from cubicweb.server.rqlannotation import set_qdata
+from cubicweb.server.hook import CleanupDeletedEidsCacheOp
+from cubicweb.server.edition import EditedEntity
+from cubicweb.server.sources import AbstractSource, dbg_st_search, dbg_results
+from cubicweb.server.sources.rql2sql import SQLGenerator
+from cubicweb.statsd_logger import statsd_timeit
+
+
+ATTR_MAP = {}
+NONSYSTEM_ETYPES = set()
+NONSYSTEM_RELATIONS = set()
+
+class LogCursor(object):
+    def __init__(self, cursor):
+        self.cu = cursor
+
+    def execute(self, query, args=None):
+        """Execute a query.
+        Wrapped in a dedicated method just so that it shows up in profiling.
+        """
+        if server.DEBUG & server.DBG_SQL:
+            print('exec', query, args)
+        try:
+            self.cu.execute(str(query), args)
+        except Exception as ex:
+            print("sql: %r\n args: %s\ndbms message: %r" % (
+                query, args, ex.args[0]))
+            raise
+
+    def fetchall(self):
+        return self.cu.fetchall()
+
+    def fetchone(self):
+        return self.cu.fetchone()
+
+
+def sql_or_clauses(sql, clauses):
+    """pull the given clauses out of `sql`'s WHERE part and put them back as a
+    single parenthesized group, OR'ed together and AND'ed with the remaining
+    restrictions
+    """
+    select, restr = sql.split(' WHERE ', 1)
+    restrclauses = restr.split(' AND ')
+    for clause in clauses:
+        restrclauses.remove(clause)
+    if restrclauses:
+        restr = '%s AND (%s)' % (' AND '.join(restrclauses),
+                                 ' OR '.join(clauses))
+    else:
+        restr = '(%s)' % ' OR '.join(clauses)
+    return '%s WHERE %s' % (select, restr)
+
+
+def rdef_table_column(rdef):
+    """return table and column used to store the given relation definition in
+    the database
+    """
+    return (SQL_PREFIX + str(rdef.subject),
+            SQL_PREFIX + str(rdef.rtype))
+
+
+def rdef_physical_info(dbhelper, rdef):
+    """return the backend type and a boolean flag telling whether NULL values
+    should be allowed, for a given relation definition
+    """
+    if not rdef.object.final:
+        # inlined relation: the column merely holds an eid
+        return dbhelper.TYPE_MAPPING['Int'], rdef.cardinality[0] != '1'
+    coltype = y2sql.type_from_rdef(dbhelper, rdef, creating=False)
+    allownull = rdef.cardinality[0] != '1'
+    return coltype, allownull
+
+
+class _UndoException(Exception):
+    """something went wrong during undoing"""
+
+    def __unicode__(self):
+        """Called by the unicode builtin; should return a Unicode object
+
+        Type of _UndoException message must be `unicode` by design in CubicWeb.
+        """
+        assert isinstance(self.args[0], text_type)
+        return self.args[0]
+
+
+def _undo_check_relation_target(tentity, rdef, role):
+    """check linked entity has not been redirected for this relation"""
+    card = rdef.role_cardinality(role)
+    if card in '?1' and tentity.related(rdef.rtype, role):
+        raise _UndoException(tentity._cw._(
+            "Can't restore %(role)s relation %(rtype)s to entity %(eid)s which "
+            "is already linked using this relation.")
+            % {'role': neg_role(role),
+               'rtype': rdef.rtype,
+               'eid': tentity.eid})
+
+def _undo_rel_info(cnx, subj, rtype, obj):
+    entities = []
+    for role, eid in (('subject', subj), ('object', obj)):
+        try:
+            entities.append(cnx.entity_from_eid(eid))
+        except UnknownEid:
+            raise _UndoException(cnx._(
+                "Can't restore relation %(rtype)s, %(role)s entity %(eid)s"
+                " doesn't exist anymore.")
+                % {'role': cnx._(role),
+                   'rtype': cnx._(rtype),
+                   'eid': eid})
+    sentity, oentity = entities
+    try:
+        rschema = cnx.vreg.schema.rschema(rtype)
+        rdef = rschema.rdefs[(sentity.cw_etype, oentity.cw_etype)]
+    except KeyError:
+        raise _UndoException(cnx._(
+            "Can't restore relation %(rtype)s between %(subj)s and "
+            "%(obj)s, that relation no longer exists in the "
+            "schema.")
+            % {'rtype': cnx._(rtype),
+               'subj': subj,
+               'obj': obj})
+    return sentity, oentity, rdef
+
+def _undo_has_later_transaction(cnx, eid):
+    return cnx.system_sql('''\
+SELECT T.tx_uuid FROM transactions AS TREF, transactions AS T
+WHERE TREF.tx_uuid='%(txuuid)s' AND T.tx_uuid!='%(txuuid)s'
+AND T.tx_time>=TREF.tx_time
+AND (EXISTS(SELECT 1 FROM tx_entity_actions AS TEA
+            WHERE TEA.tx_uuid=T.tx_uuid AND TEA.eid=%(eid)s)
+     OR EXISTS(SELECT 1 FROM tx_relation_actions as TRA
+               WHERE TRA.tx_uuid=T.tx_uuid AND (
+                   TRA.eid_from=%(eid)s OR TRA.eid_to=%(eid)s))
+     )''' % {'txuuid': cnx.transaction_data['undoing_uuid'],
+             'eid': eid}).fetchone()
+
+
+class DefaultEidGenerator(object):
+    __slots__ = ('source', 'cnx', 'lock')
+
+    def __init__(self, source):
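+        # Usage sketch (hypothetical session): the repository keeps one
+        # generator per system source and asks it for fresh eids, e.g.
+        #
+        #   gen = DefaultEidGenerator(source)
+        #   eid = gen.create_eid(cnx)   # serialized through self.lock
+        #
+        # The generator owns a dedicated connection and commits on its own,
+        # so allocated eids survive a rollback of the client transaction.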
self.source = source + self.cnx = None + self.lock = Lock() + + def close(self): + if self.cnx: + self.cnx.close() + self.cnx = None + + def create_eid(self, _cnx, count=1): + # lock needed to prevent 'Connection is busy with results for another + # command (0)' errors with SQLServer + assert count > 0 + with self.lock: + return self._create_eid(count) + + def _create_eid(self, count): + # internal function doing the eid creation without locking. + # needed for the recursive handling of disconnections (otherwise we + # deadlock on self._eid_cnx_lock + source = self.source + if self.cnx is None: + self.cnx = source.get_connection() + cnx = self.cnx + try: + cursor = cnx.cursor() + for sql in source.dbhelper.sqls_increment_numrange('entities_id_seq', count): + cursor.execute(sql) + eid = cursor.fetchone()[0] + except (source.OperationalError, source.InterfaceError): + # FIXME: better detection of deconnection pb + source.warning("trying to reconnect create eid connection") + self.cnx = None + return self._create_eid(count) + except source.DbapiError as exc: + # We get this one with pyodbc and SQL Server when connection was reset + if exc.args[0] == '08S01': + source.warning("trying to reconnect create eid connection") + self.cnx = None + return self._create_eid(count) + else: + raise + except Exception: # WTF? + cnx.rollback() + self.cnx = None + source.exception('create eid failed in an unforeseen way on SQL statement %s', sql) + raise + else: + cnx.commit() + return eid + + +class SQLITEEidGenerator(object): + __slots__ = ('source', 'lock') + + def __init__(self, source): + self.source = source + self.lock = Lock() + + def close(self): + pass + + def create_eid(self, cnx, count=1): + assert count > 0 + source = self.source + with self.lock: + for sql in source.dbhelper.sqls_increment_numrange('entities_id_seq', count): + cursor = source.doexec(cnx, sql) + return cursor.fetchone()[0] + + +class NativeSQLSource(SQLAdapterMixIn, AbstractSource): + """adapter for source using the native cubicweb schema (see below) + """ + sqlgen_class = SQLGenerator + options = ( + ('db-driver', + {'type' : 'string', + 'default': 'postgres', + # XXX use choice type + 'help': 'database driver (postgres, sqlite, sqlserver2005)', + 'group': 'native-source', 'level': 0, + }), + ('db-host', + {'type' : 'string', + 'default': '', + 'help': 'database host', + 'group': 'native-source', 'level': 1, + }), + ('db-port', + {'type' : 'string', + 'default': '', + 'help': 'database port', + 'group': 'native-source', 'level': 1, + }), + ('db-name', + {'type' : 'string', + 'default': Method('default_instance_id'), + 'help': 'database name', + 'group': 'native-source', 'level': 0, + }), + ('db-namespace', + {'type' : 'string', + 'default': '', + 'help': 'database namespace (schema) name', + 'group': 'native-source', 'level': 1, + }), + ('db-user', + {'type' : 'string', + 'default': CubicWebNoAppConfiguration.mode == 'user' and getlogin() or 'cubicweb', + 'help': 'database user', + 'group': 'native-source', 'level': 0, + }), + ('db-password', + {'type' : 'password', + 'default': '', + 'help': 'database password', + 'group': 'native-source', 'level': 0, + }), + ('db-encoding', + {'type' : 'string', + 'default': 'utf8', + 'help': 'database encoding', + 'group': 'native-source', 'level': 1, + }), + ('db-extra-arguments', + {'type' : 'string', + 'default': '', + 'help': 'set to "Trusted_Connection" if you are using SQLServer and ' + 'want trusted authentication for the database connection', + 'group': 'native-source', 'level': 2, + 
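+        # Illustrative configuration (made-up values) as these options would
+        # appear in the instance's sources file:
+        #
+        #   [system]
+        #   db-driver = postgres
+        #   db-host = localhost
+        #   db-name = myapp
+        #   db-user = cubicweb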
}), + ('db-statement-timeout', + {'type': 'int', + 'default': 0, + 'help': 'sql statement timeout, in milliseconds (postgres only)', + 'group': 'native-source', 'level': 2, + }), + ) + + def __init__(self, repo, source_config, *args, **kwargs): + SQLAdapterMixIn.__init__(self, source_config, repairing=repo.config.repairing) + self.authentifiers = [LoginPasswordAuthentifier(self)] + if repo.config['allow-email-login']: + self.authentifiers.insert(0, EmailPasswordAuthentifier(self)) + AbstractSource.__init__(self, repo, source_config, *args, **kwargs) + # sql generator + self._rql_sqlgen = self.sqlgen_class(self.schema, self.dbhelper, + ATTR_MAP.copy()) + # full text index helper + self.do_fti = not repo.config['delay-full-text-indexation'] + # sql queries cache + self._cache = QueryCache(repo.config['rql-cache-size']) + # (etype, attr) / storage mapping + self._storages = {} + self.binary_to_str = self.dbhelper.dbapi_module.binary_to_str + if self.dbdriver == 'sqlite': + self.eid_generator = SQLITEEidGenerator(self) + else: + self.eid_generator = DefaultEidGenerator(self) + self.create_eid = self.eid_generator.create_eid + + def check_config(self, source_entity): + """check configuration of source entity""" + if source_entity.host_config: + msg = source_entity._cw._('the system source has its configuration ' + 'stored on the file-system') + raise ValidationError(source_entity.eid, {role_name('config', 'subject'): msg}) + + def add_authentifier(self, authentifier): + self.authentifiers.append(authentifier) + authentifier.source = self + authentifier.set_schema(self.schema) + + def reset_caches(self): + """method called during test to reset potential source caches""" + self._cache = QueryCache(self.repo.config['rql-cache-size']) + + def clear_eid_cache(self, eid, etype): + """clear potential caches for the given eid""" + self._cache.pop('Any X WHERE X eid %s, X is %s' % (eid, etype), None) + self._cache.pop('Any X WHERE X eid %s' % eid, None) + self._cache.pop('Any %s' % eid, None) + + @statsd_timeit + def sqlexec(self, cnx, sql, args=None): + """execute the query and return its result""" + return self.process_result(self.doexec(cnx, sql, args)) + + def init_creating(self, cnxset=None): + # check full text index availibility + if self.do_fti: + if cnxset is None: + _cnxset = self.repo._get_cnxset() + else: + _cnxset = cnxset + if not self.dbhelper.has_fti_table(_cnxset.cu): + if not self.repo.config.creating: + self.critical('no text index table') + self.do_fti = False + if cnxset is None: + _cnxset.cnxset_freed() + self.repo._free_cnxset(_cnxset) + + def backup(self, backupfile, confirm, format='native'): + """method called to create a backup of the source's data""" + if format == 'portable': + # ensure the schema is the one stored in the database: if repository + # started in quick_start mode, the file system's one has been loaded + # so force reload + if self.repo.config.quick_start: + self.repo.set_schema(self.repo.deserialize_schema(), + resetvreg=False) + helper = DatabaseIndependentBackupRestore(self) + self.close_source_connections() + try: + helper.backup(backupfile) + finally: + self.open_source_connections() + elif format == 'native': + self.close_source_connections() + try: + self.backup_to_file(backupfile, confirm) + finally: + self.open_source_connections() + else: + raise ValueError('Unknown format %r' % format) + + + def restore(self, backupfile, confirm, drop, format='native'): + """method called to restore a backup of source's data""" + if self.repo.config.init_cnxset_pool: 
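+            # Note: 'native' goes through the backend's own dump format (e.g.
+            # a pg_dump archive for postgres), while 'portable' reads the zip
+            # archive produced by DatabaseIndependentBackupRestore below.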
+ self.close_source_connections() + try: + if format == 'portable': + helper = DatabaseIndependentBackupRestore(self) + helper.restore(backupfile) + elif format == 'native': + self.restore_from_file(backupfile, confirm, drop=drop) + else: + raise ValueError('Unknown format %r' % format) + finally: + if self.repo.config.init_cnxset_pool: + self.open_source_connections() + + + def init(self, activated, source_entity): + try: + # test if 'asource' column exists + query = self.dbhelper.sql_add_limit_offset('SELECT asource FROM entities', 1) + source_entity._cw.system_sql(query) + except Exception as ex: + self.eid_type_source = self.eid_type_source_pre_131 + super(NativeSQLSource, self).init(activated, source_entity) + self.init_creating(source_entity._cw.cnxset) + + def shutdown(self): + self.eid_generator.close() + + # XXX deprecates [un]map_attribute? + def map_attribute(self, etype, attr, cb, sourcedb=True): + self._rql_sqlgen.attr_map[u'%s.%s' % (etype, attr)] = (cb, sourcedb) + + def unmap_attribute(self, etype, attr): + self._rql_sqlgen.attr_map.pop(u'%s.%s' % (etype, attr), None) + + def set_storage(self, etype, attr, storage): + storage_dict = self._storages.setdefault(etype, {}) + storage_dict[attr] = storage + self.map_attribute(etype, attr, + storage.callback, storage.is_source_callback) + + def unset_storage(self, etype, attr): + self._storages[etype].pop(attr) + # if etype has no storage left, remove the entry + if not self._storages[etype]: + del self._storages[etype] + self.unmap_attribute(etype, attr) + + def storage(self, etype, attr): + """return the storage for the given entity type / attribute + """ + try: + return self._storages[etype][attr] + except KeyError: + raise Exception('no custom storage set for %s.%s' % (etype, attr)) + + # ISource interface ####################################################### + + @statsd_timeit + def compile_rql(self, rql, sols): + rqlst = self.repo.vreg.rqlhelper.parse(rql) + rqlst.restricted_vars = () + rqlst.children[0].solutions = sols + self.repo.querier.sqlgen_annotate(rqlst) + set_qdata(self.schema.rschema, rqlst, ()) + return rqlst + + def set_schema(self, schema): + """set the instance'schema""" + self._cache = QueryCache(self.repo.config['rql-cache-size']) + self.cache_hit, self.cache_miss, self.no_cache = 0, 0, 0 + self.schema = schema + try: + self._rql_sqlgen.schema = schema + except AttributeError: + pass # __init__ + for authentifier in self.authentifiers: + authentifier.set_schema(self.schema) + clear_cache(self, 'need_fti_indexation') + + def support_entity(self, etype, write=False): + """return true if the given entity's type is handled by this adapter + if write is true, return true only if it's a RW support + """ + return not etype in NONSYSTEM_ETYPES + + def support_relation(self, rtype, write=False): + """return true if the given relation's type is handled by this adapter + if write is true, return true only if it's a RW support + """ + if write: + return not rtype in NONSYSTEM_RELATIONS + # due to current multi-sources implementation, the system source + # can't claim not supporting a relation + return True #not rtype == 'content_for' + + @statsd_timeit + def authenticate(self, cnx, login, **kwargs): + """return CWUser eid for the given login and other authentication + information found in kwargs, else raise `AuthenticationError` + """ + for authentifier in self.authentifiers: + try: + return authentifier.authenticate(cnx, login, **kwargs) + except AuthenticationError: + continue + raise AuthenticationError() + + def 
syntax_tree_search(self, cnx, union, args=None, cachekey=None, + varmap=None): + """return result from this source for a rql query (actually from + a rql syntax tree and a solution dictionary mapping each used + variable to a possible type). If cachekey is given, the query + necessary to fetch the results (but not the results themselves) + may be cached using this key. + """ + assert dbg_st_search(self.uri, union, varmap, args, cachekey) + # remember number of actually selected term (sql generation may append some) + if cachekey is None: + self.no_cache += 1 + # generate sql query if we are able to do so (not supported types...) + sql, qargs, cbs = self._rql_sqlgen.generate(union, args, varmap) + else: + # sql may be cached + try: + sql, qargs, cbs = self._cache[cachekey] + self.cache_hit += 1 + except KeyError: + self.cache_miss += 1 + sql, qargs, cbs = self._rql_sqlgen.generate(union, args, varmap) + self._cache[cachekey] = sql, qargs, cbs + args = self.merge_args(args, qargs) + assert isinstance(sql, string_types), repr(sql) + cursor = self.doexec(cnx, sql, args) + results = self.process_result(cursor, cnx, cbs) + assert dbg_results(results) + return results + + @contextmanager + def _fixup_cw(self, cnx, entity): + _cw = entity._cw + entity._cw = cnx + try: + yield + finally: + entity._cw = _cw + + @contextmanager + def _storage_handler(self, cnx, entity, event): + # 1/ memorize values as they are before the storage is called. + # For instance, the BFSStorage will replace the `data` + # binary value with a Binary containing the destination path + # on the filesystem. To make the entity.data usage absolutely + # transparent, we'll have to reset entity.data to its binary + # value once the SQL query will be executed + restore_values = [] + if isinstance(entity, list): + entities = entity + else: + entities = [entity] + etype = entities[0].__regid__ + for attr, storage in self._storages.get(etype, {}).items(): + for entity in entities: + with self._fixup_cw(cnx, entity): + if event == 'deleted': + storage.entity_deleted(entity, attr) + else: + edited = entity.cw_edited + if attr in edited: + handler = getattr(storage, 'entity_%s' % event) + to_restore = handler(entity, attr) + restore_values.append((entity, attr, to_restore)) + try: + yield # 2/ execute the source's instructions + finally: + # 3/ restore original values + for entity, attr, value in restore_values: + entity.cw_edited.edited_attribute(attr, value) + + def add_entity(self, cnx, entity): + """add a new entity to the source""" + with self._storage_handler(cnx, entity, 'added'): + attrs = self.preprocess_entity(entity) + sql = self.sqlgen.insert(SQL_PREFIX + entity.cw_etype, attrs) + self.doexec(cnx, sql, attrs) + if cnx.ertype_supports_undo(entity.cw_etype): + self._record_tx_action(cnx, 'tx_entity_actions', u'C', + etype=text_type(entity.cw_etype), eid=entity.eid) + + def update_entity(self, cnx, entity): + """replace an entity in the source""" + with self._storage_handler(cnx, entity, 'updated'): + attrs = self.preprocess_entity(entity) + if cnx.ertype_supports_undo(entity.cw_etype): + changes = self._save_attrs(cnx, entity, attrs) + self._record_tx_action(cnx, 'tx_entity_actions', u'U', + etype=text_type(entity.cw_etype), eid=entity.eid, + changes=self._binary(pickle.dumps(changes))) + sql = self.sqlgen.update(SQL_PREFIX + entity.cw_etype, attrs, + ['cw_eid']) + self.doexec(cnx, sql, attrs) + + def delete_entity(self, cnx, entity): + """delete an entity from the source""" + with self._storage_handler(cnx, entity, 'deleted'): 
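+            # When the etype supports undo, the block below records a 'D'
+            # action plus a pickled snapshot of the remaining attribute and
+            # inlined-relation values in tx_entity_actions; _undo_d() replays
+            # exactly that snapshot to restore the entity.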
+ if cnx.ertype_supports_undo(entity.cw_etype): + attrs = [SQL_PREFIX + r.type + for r in entity.e_schema.subject_relations() + if (r.final or r.inlined) and not r in VIRTUAL_RTYPES] + changes = self._save_attrs(cnx, entity, attrs) + self._record_tx_action(cnx, 'tx_entity_actions', u'D', + etype=text_type(entity.cw_etype), eid=entity.eid, + changes=self._binary(pickle.dumps(changes))) + attrs = {'cw_eid': entity.eid} + sql = self.sqlgen.delete(SQL_PREFIX + entity.cw_etype, attrs) + self.doexec(cnx, sql, attrs) + + def add_relation(self, cnx, subject, rtype, object, inlined=False): + """add a relation to the source""" + self._add_relations(cnx, rtype, [(subject, object)], inlined) + if cnx.ertype_supports_undo(rtype): + self._record_tx_action(cnx, 'tx_relation_actions', u'A', + eid_from=subject, rtype=text_type(rtype), eid_to=object) + + def add_relations(self, cnx, rtype, subj_obj_list, inlined=False): + """add a relations to the source""" + self._add_relations(cnx, rtype, subj_obj_list, inlined) + if cnx.ertype_supports_undo(rtype): + for subject, object in subj_obj_list: + self._record_tx_action(cnx, 'tx_relation_actions', u'A', + eid_from=subject, rtype=text_type(rtype), eid_to=object) + + def _add_relations(self, cnx, rtype, subj_obj_list, inlined=False): + """add a relation to the source""" + sql = [] + if inlined is False: + attrs = [{'eid_from': subject, 'eid_to': object} + for subject, object in subj_obj_list] + sql.append((self.sqlgen.insert('%s_relation' % rtype, attrs[0]), attrs)) + else: # used by data import + etypes = {} + for subject, object in subj_obj_list: + etype = cnx.entity_metas(subject)['type'] + if etype in etypes: + etypes[etype].append((subject, object)) + else: + etypes[etype] = [(subject, object)] + for subj_etype, subj_obj_list in etypes.items(): + attrs = [{'cw_eid': subject, SQL_PREFIX + rtype: object} + for subject, object in subj_obj_list] + sql.append((self.sqlgen.update(SQL_PREFIX + etype, attrs[0], + ['cw_eid']), + attrs)) + for statement, attrs in sql: + self.doexecmany(cnx, statement, attrs) + + def delete_relation(self, cnx, subject, rtype, object): + """delete a relation from the source""" + rschema = self.schema.rschema(rtype) + self._delete_relation(cnx, subject, rtype, object, rschema.inlined) + if cnx.ertype_supports_undo(rtype): + self._record_tx_action(cnx, 'tx_relation_actions', u'R', + eid_from=subject, rtype=text_type(rtype), eid_to=object) + + def _delete_relation(self, cnx, subject, rtype, object, inlined=False): + """delete a relation from the source""" + if inlined: + table = SQL_PREFIX + cnx.entity_metas(subject)['type'] + column = SQL_PREFIX + rtype + sql = 'UPDATE %s SET %s=NULL WHERE %seid=%%(eid)s' % (table, column, + SQL_PREFIX) + attrs = {'eid' : subject} + else: + attrs = {'eid_from': subject, 'eid_to': object} + sql = self.sqlgen.delete('%s_relation' % rtype, attrs) + self.doexec(cnx, sql, attrs) + + @statsd_timeit + def doexec(self, cnx, query, args=None, rollback=True): + """Execute a query. 
+        Wrapped in a dedicated method just so that it shows up in profiling.
+        """
+        cursor = cnx.cnxset.cu
+        if server.DEBUG & server.DBG_SQL:
+            print('exec', query, args, cnx.cnxset.cnx)
+        try:
+            # str(query) to avoid error if it's a unicode string
+            cursor.execute(str(query), args)
+        except Exception as ex:
+            if self.repo.config.mode != 'test':
+                # during tests we get these messages when trying to alter the
+                # sqlite db schema
+                self.info("sql: %r\n args: %s\ndbms message: %r",
+                          query, args, ex.args[0])
+            if rollback:
+                try:
+                    cnx.cnxset.rollback()
+                    if self.repo.config.mode != 'test':
+                        self.debug('transaction has been rolled back')
+                except Exception:
+                    pass  # don't rebind `ex`; the original error is examined below
+            if ex.__class__.__name__ == 'IntegrityError':
+                # need string comparison because of various backends
+                for arg in ex.args:
+                    # postgres, sqlserver
+                    mo = re.search("unique_[a-z0-9]{32}", arg)
+                    if mo is not None:
+                        raise UniqueTogetherError(cnx, cstrname=mo.group(0))
+                    # old sqlite
+                    mo = re.search('columns? (.*) (?:is|are) not unique', arg)
+                    if mo is not None:  # sqlite in use
+                        # we left chop the 'cw_' prefix of attribute names
+                        rtypes = [c.strip()[3:]
+                                  for c in mo.group(1).split(',')]
+                        raise UniqueTogetherError(cnx, rtypes=rtypes)
+                    # sqlite after http://www.sqlite.org/cgi/src/info/c80e229dd9c1230a
+                    if arg.startswith('UNIQUE constraint failed:'):
+                        # message looks like: "UNIQUE constraint failed: foo.cw_bar, foo.cw_baz"
+                        # so drop the prefix, split on comma, drop the tablenames, and drop "cw_"
+                        columns = arg.split(':', 1)[1].split(',')
+                        rtypes = [c.split('.', 1)[1].strip()[3:] for c in columns]
+                        raise UniqueTogetherError(cnx, rtypes=rtypes)
+
+                    mo = re.search('"cstr[a-f0-9]{32}"', arg)
+                    if mo is not None:
+                        # postgresql
+                        raise ViolatedConstraint(cnx, cstrname=mo.group(0)[1:-1])
+                    if arg.startswith('CHECK constraint failed:'):
+                        # sqlite3 (new)
+                        raise ViolatedConstraint(cnx, cstrname=arg.split(':', 1)[1].strip())
+                    mo = re.match('^constraint (cstr.*) failed$', arg)
+                    if mo is not None:
+                        # sqlite3 (old)
+                        raise ViolatedConstraint(cnx, cstrname=mo.group(1))
+            raise
+        return cursor
+
+    @statsd_timeit
+    def doexecmany(self, cnx, query, args):
+        """Execute a query.
+        Wrapped in a dedicated method just so that it shows up in profiling.
+        """
+        if server.DEBUG & server.DBG_SQL:
+            print('execmany', query, 'with', len(args), 'arguments', cnx.cnxset.cnx)
+        cursor = cnx.cnxset.cu
+        try:
+            # str(query) to avoid error if it's a unicode string
+            cursor.executemany(str(query), args)
+        except Exception as ex:
+            if self.repo.config.mode != 'test':
+                # during tests we get these messages when trying to alter the
+                # sqlite db schema
+                self.critical("sql many: %r\n args: %s\ndbms message: %r",
+                              query, args, ex.args[0])
+            try:
+                cnx.cnxset.rollback()
+                if self.repo.config.mode != 'test':
+                    self.critical('transaction has been rolled back')
+            except Exception:
+                pass
+            raise
+
+    # shortcuts to methods requiring advanced db helper usage ##################
+
+    def update_rdef_column(self, cnx, rdef):
+        """update the physical column for a relation definition (final or inlined)
+        """
+        table, column = rdef_table_column(rdef)
+        coltype, allownull = rdef_physical_info(self.dbhelper, rdef)
+        if not self.dbhelper.alter_column_support:
+            self.error("backend can't alter %s.%s to %s%s", table, column, coltype,
+                       not allownull and 'NOT NULL' or '')
+            return
+        self.dbhelper.change_col_type(LogCursor(cnx.cnxset.cu),
+                                      table, column, coltype, allownull)
+        self.info('altered %s.%s: now %s%s', table, column, coltype,
+                  not allownull and 'NOT NULL' or '')
+
+    def update_rdef_null_allowed(self, cnx, rdef):
+        """update NULL / NOT NULL of the physical column for a relation
+        definition (final or inlined)
+        """
+        if not self.dbhelper.alter_column_support:
+            # not supported (and NOT NULL not set by yams in that case, so no
+            # worry)
+            return
+        table, column = rdef_table_column(rdef)
+        coltype, allownull = rdef_physical_info(self.dbhelper, rdef)
+        self.dbhelper.set_null_allowed(LogCursor(cnx.cnxset.cu),
+                                       table, column, coltype, allownull)
+
+    def update_rdef_indexed(self, cnx, rdef):
+        table, column = rdef_table_column(rdef)
+        if rdef.indexed:
+            self.create_index(cnx, table, column)
+        else:
+            self.drop_index(cnx, table, column)
+
+    def update_rdef_unique(self, cnx, rdef):
+        table, column = rdef_table_column(rdef)
+        if rdef.constraint_by_type('UniqueConstraint'):
+            self.create_index(cnx, table, column, unique=True)
+        else:
+            self.drop_index(cnx, table, column, unique=True)
+
+    def create_index(self, cnx, table, column, unique=False):
+        cursor = LogCursor(cnx.cnxset.cu)
+        self.dbhelper.create_index(cursor, table, column, unique)
+
+    def drop_index(self, cnx, table, column, unique=False):
+        cursor = LogCursor(cnx.cnxset.cu)
+        self.dbhelper.drop_index(cursor, table, column, unique)
+
+    # system source interface #################################################
+
+    def _eid_type_source(self, cnx, eid, sql):
+        try:
+            res = self.doexec(cnx, sql).fetchone()
+            if res is not None:
+                return res
+        except Exception:
+            self.exception('failed to query entities table for eid %s', eid)
+        raise UnknownEid(eid)
+
+    def eid_type_source(self, cnx, eid):  # pylint: disable=E0202
+        """return a tuple (type, extid, source) for the entity with id <eid>"""
+        sql = 'SELECT type, extid, asource FROM entities WHERE eid=%s' % eid
+        res = self._eid_type_source(cnx, eid, sql)
+        if not isinstance(res, list):
+            res = list(res)
+        res[-2] = self.decode_extid(res[-2])
+        return res
+
+    def eid_type_source_pre_131(self, cnx, eid):
+        """return a tuple (type, extid, source) for the entity with id <eid>"""
+        sql = 'SELECT type, extid FROM entities WHERE eid=%s' % eid
+        res = self._eid_type_source(cnx, eid, sql)
+        if not isinstance(res, list):
+            res = list(res)
+        res[-1] = self.decode_extid(res[-1])
+        res.append("system")
+        return res
+
+    def extid2eid(self, cnx, extid):
+        """get eid from an external id. Return None if no record found."""
+        assert isinstance(extid, binary_type)
+        args = {'x': b64encode(extid).decode('ascii')}
+        cursor = self.doexec(cnx,
+                             'SELECT eid FROM entities WHERE extid=%(x)s',
+                             args)
+        # XXX testing rowcount causes a strange bug with sqlite: results are
+        # there but rowcount is 0
+        #if cursor.rowcount > 0:
+        try:
+            result = cursor.fetchone()
+            if result:
+                return result[0]
+        except Exception:
+            pass
+        cursor = self.doexec(cnx,
+                             'SELECT eid FROM moved_entities WHERE extid=%(x)s',
+                             args)
+        try:
+            result = cursor.fetchone()
+            if result:
+                # entity was moved to the system source, return a negative
+                # number to tell the external source to ignore it
+                return -result[0]
+        except Exception:
+            pass
+        return None
+
+    def _handle_is_relation_sql(self, cnx, sql, attrs):
+        """Handler for specific is_relation sql that may be
+        overwritten in some stores"""
+        self.doexec(cnx, sql % attrs)
+
+    _handle_insert_entity_sql = doexec
+    _handle_is_instance_of_sql = _handle_source_relation_sql = _handle_is_relation_sql
+
+    def add_info(self, cnx, entity, source, extid):
+        """add type and source info for an eid into the system table"""
+        assert cnx.cnxset is not None
+        # begin by inserting eid/type/source/extid into the entities table
+        if extid is not None:
+            assert isinstance(extid, binary_type)
+            extid = b64encode(extid).decode('ascii')
+        attrs = {'type': text_type(entity.cw_etype), 'eid': entity.eid, 'extid': extid,
+                 'asource': text_type(source.uri)}
+        self._handle_insert_entity_sql(cnx, self.sqlgen.insert('entities', attrs), attrs)
+        # insert core relations: is, is_instance_of and cw_source
+        try:
+            self._handle_is_relation_sql(cnx, 'INSERT INTO is_relation(eid_from,eid_to) VALUES (%s,%s)',
+                                         (entity.eid, eschema_eid(cnx, entity.e_schema)))
+        except IndexError:
+            # during schema serialization, skip
+            pass
+        else:
+            for eschema in entity.e_schema.ancestors() + [entity.e_schema]:
+                self._handle_is_relation_sql(cnx,
+                                             'INSERT INTO is_instance_of_relation(eid_from,eid_to) VALUES (%s,%s)',
+                                             (entity.eid, eschema_eid(cnx, eschema)))
+        if 'CWSource' in self.schema and source.eid is not None:  # else, cw < 3.10
+            self._handle_is_relation_sql(cnx, 'INSERT INTO cw_source_relation(eid_from,eid_to) VALUES (%s,%s)',
+                                         (entity.eid, source.eid))
+        # now we can update the full text index
+        if self.need_fti_indexation(entity.cw_etype):
+            self.index_entity(cnx, entity=entity)
+
+    def update_info(self, cnx, entity, need_fti_update):
+        """mark entity as being modified, fulltext reindex if needed"""
+        if need_fti_update:
+            # reindex the entity only if this query is updating at least
+            # one indexable attribute
+            self.index_entity(cnx, entity=entity)
+
+    def delete_info_multi(self, cnx, entities):
+        """delete system information on deletion of a list of entities with the
+        same etype and belonging to the same source
+
+        * update the fti
+        * remove record from the `entities` table
+        """
+        self.fti_unindex_entities(cnx, entities)
+        attrs = {'eid': '(%s)' % ','.join([str(_e.eid) for _e in entities])}
+        self.doexec(cnx, self.sqlgen.delete_many('entities', attrs), attrs)
+
+    # undo support #############################################################
+
+    def undoable_transactions(self, cnx, ueid=None, **actionfilters):
+        """See :class:`cubicweb.repoapi.Connection.undoable_transactions`"""
+        # force filtering to the connection's user if not a manager
+        if not cnx.user.is_in_group('managers'):
+            ueid = cnx.user.eid
+        restr = {}
+        if ueid is not None:
+            restr['tx_user'] = ueid
+        sql = self.sqlgen.select('transactions', restr, ('tx_uuid', 'tx_time', 'tx_user'))
+        if actionfilters:
+            # we will need subqueries to filter transactions according to
+            # actions done
+            tearestr = {}  # filters on the tx_entity_actions table
+            trarestr = {}  # filters on the tx_relation_actions table
+            genrestr = {}  # generic filters, applicable to both tables
+            # unless public explicitly set to false, we only consider public
+            # actions
+            if actionfilters.pop('public', True):
+                genrestr['txa_public'] = True
+            # put additional filters in trarestr and/or tearestr
+            for key, val in actionfilters.items():
+                if key == 'etype':
+                    # filtering on etype implies filtering on entity actions
+                    # only, and with no eid specified
+                    assert actionfilters.get('action', 'C') in 'CUD'
+                    assert not 'eid' in actionfilters
+                    tearestr['etype'] = text_type(val)
+                elif key == 'eid':
+                    # eid filter may apply to 'eid' of tx_entity_actions or to
+                    # 'eid_from' OR 'eid_to' of tx_relation_actions
+                    if actionfilters.get('action', 'C') in 'CUD':
+                        tearestr['eid'] = val
+                    if actionfilters.get('action', 'A') in 'AR':
+                        trarestr['eid_from'] = val
+                        trarestr['eid_to'] = val
+                elif key == 'action':
+                    if val in 'CUD':
+                        tearestr['txa_action'] = text_type(val)
+                    else:
+                        assert val in 'AR'
+                        trarestr['txa_action'] = text_type(val)
+                else:
+                    raise AssertionError('unknown filter %s' % key)
+            assert trarestr or tearestr, "can't only filter on 'public'"
+            subqsqls = []
+            # append subqueries to the original query, using EXISTS()
+            if trarestr or (genrestr and not tearestr):
+                trarestr.update(genrestr)
+                trasql = self.sqlgen.select('tx_relation_actions', trarestr, ('1',))
+                if 'eid_from' in trarestr:
+                    # replace AND by OR between eid_from/eid_to restriction
+                    trasql = sql_or_clauses(trasql, ['eid_from = %(eid_from)s',
+                                                     'eid_to = %(eid_to)s'])
+                trasql += ' AND transactions.tx_uuid=tx_relation_actions.tx_uuid'
+                subqsqls.append('EXISTS(%s)' % trasql)
+            if tearestr or (genrestr and not trarestr):
+                tearestr.update(genrestr)
+                teasql = self.sqlgen.select('tx_entity_actions', tearestr, ('1',))
+                teasql += ' AND transactions.tx_uuid=tx_entity_actions.tx_uuid'
+                subqsqls.append('EXISTS(%s)' % teasql)
+            if restr:
+                sql += ' AND %s' % ' OR '.join(subqsqls)
+            else:
+                sql += ' WHERE %s' % ' OR '.join(subqsqls)
+            restr.update(trarestr)
+            restr.update(tearestr)
+        # we want results ordered by transaction's time, descending
+        sql += ' ORDER BY tx_time DESC'
+        cu = self.doexec(cnx, sql, restr)
+        # turn results into transaction objects
+        return [tx.Transaction(cnx, *args) for args in cu.fetchall()]
+
+    def tx_info(self, cnx, txuuid):
+        """See :class:`cubicweb.repoapi.Connection.transaction_info`"""
+        return tx.Transaction(cnx, txuuid, *self._tx_info(cnx, text_type(txuuid)))
+
+    def tx_actions(self, cnx, txuuid, public):
+        """See :class:`cubicweb.repoapi.Connection.transaction_actions`"""
+        txuuid = text_type(txuuid)
+        self._tx_info(cnx, txuuid)
+        restr = {'tx_uuid': txuuid}
+        if public:
+            restr['txa_public'] = True
+        # XXX use a generator to avoid loading everything in memory?
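+        # Sketch of how the two public entry points cooperate (hypothetical
+        # call):
+        #
+        #   txs = source.undoable_transactions(cnx, etype='CWUser')
+        #   actions = source.tx_actions(cnx, txs[0].uuid, public=True)
+        #
+        # tx_actions() merges rows from tx_entity_actions and
+        # tx_relation_actions and returns them sorted by txa_order.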
+ sql = self.sqlgen.select('tx_entity_actions', restr, + ('txa_action', 'txa_public', 'txa_order', + 'etype', 'eid', 'changes')) + with cnx.ensure_cnx_set: + cu = self.doexec(cnx, sql, restr) + actions = [tx.EntityAction(a,p,o,et,e,c and pickle.loads(self.binary_to_str(c))) + for a,p,o,et,e,c in cu.fetchall()] + sql = self.sqlgen.select('tx_relation_actions', restr, + ('txa_action', 'txa_public', 'txa_order', + 'rtype', 'eid_from', 'eid_to')) + with cnx.ensure_cnx_set: + cu = self.doexec(cnx, sql, restr) + actions += [tx.RelationAction(*args) for args in cu.fetchall()] + return sorted(actions, key=lambda x: x.order) + + def undo_transaction(self, cnx, txuuid): + """See :class:`cubicweb.repoapi.Connection.undo_transaction` + + important note: while undoing of a transaction, only hooks in the + 'integrity', 'activeintegrity' and 'undo' categories are called. + """ + errors = [] + cnx.transaction_data['undoing_uuid'] = txuuid + with cnx.deny_all_hooks_but('integrity', 'activeintegrity', 'undo'): + with cnx.security_enabled(read=False): + for action in reversed(self.tx_actions(cnx, txuuid, False)): + undomethod = getattr(self, '_undo_%s' % action.action.lower()) + errors += undomethod(cnx, action) + # remove the transactions record + self.doexec(cnx, + "DELETE FROM transactions WHERE tx_uuid='%s'" % txuuid) + if errors: + raise UndoTransactionException(txuuid, errors) + else: + return + + def start_undoable_transaction(self, cnx, uuid): + """connection callback to insert a transaction record in the transactions + table when some undoable transaction is started + """ + ueid = cnx.user.eid + attrs = {'tx_uuid': uuid, 'tx_user': ueid, 'tx_time': datetime.utcnow()} + self.doexec(cnx, self.sqlgen.insert('transactions', attrs), attrs) + + def _save_attrs(self, cnx, entity, attrs): + """return a pickleable dictionary containing current values for given + attributes of the entity + """ + restr = {'cw_eid': entity.eid} + sql = self.sqlgen.select(SQL_PREFIX + entity.cw_etype, restr, attrs) + cu = self.doexec(cnx, sql, restr) + values = dict(zip(attrs, cu.fetchone())) + # ensure backend specific binary are converted back to string + eschema = entity.e_schema + for column in attrs: + # [3:] remove 'cw_' prefix + attr = column[3:] + if not eschema.subjrels[attr].final: + continue + if eschema.destination(attr) in ('Password', 'Bytes'): + value = values[column] + if value is not None: + values[column] = self.binary_to_str(value) + return values + + def _record_tx_action(self, cnx, table, action, **kwargs): + """record a transaction action in the given table (either + 'tx_entity_actions' or 'tx_relation_action') + """ + kwargs['tx_uuid'] = cnx.transaction_uuid() + kwargs['txa_action'] = action + kwargs['txa_order'] = cnx.transaction_inc_action_counter() + kwargs['txa_public'] = not cnx.hooks_in_progress + self.doexec(cnx, self.sqlgen.insert(table, kwargs), kwargs) + + def _tx_info(self, cnx, txuuid): + """return transaction's time and user of the transaction with the given uuid. + + raise `NoSuchTransaction` if there is no such transaction of if the + connection's user isn't allowed to see it. 
+        """
+        restr = {'tx_uuid': txuuid}
+        sql = self.sqlgen.select('transactions', restr,
+                                 ('tx_time', 'tx_user'))
+        cu = self.doexec(cnx, sql, restr)
+        try:
+            time, ueid = cu.fetchone()
+        except TypeError:
+            raise tx.NoSuchTransaction(txuuid)
+        if not (cnx.user.is_in_group('managers')
+                or cnx.user.eid == ueid):
+            raise tx.NoSuchTransaction(txuuid)
+        return time, ueid
+
+    def _reedit_entity(self, entity, changes, err):
+        cnx = entity._cw
+        eid = entity.eid
+        entity.cw_edited = edited = EditedEntity(entity)
+        # check for schema changes, that entities linked through an inlined
+        # relation still exist, rewrap binary values
+        eschema = entity.e_schema
+        getrschema = eschema.subjrels
+        for column, value in changes.items():
+            rtype = column[len(SQL_PREFIX):]
+            if rtype == "eid":
+                continue  # XXX should even `eid` be stored in action changes?
+            try:
+                rschema = getrschema[rtype]
+            except KeyError:
+                err(cnx._("can't restore relation %(rtype)s of entity %(eid)s, "
+                          "this relation does not exist in the schema anymore.")
+                    % {'rtype': rtype, 'eid': eid})
+            if not rschema.final:
+                if not rschema.inlined:
+                    assert value is None
+                # rschema is an inlined relation
+                elif value is not None:
+                    # not a deletion: we must put something in edited
+                    try:
+                        entity._cw.entity_from_eid(value)  # check target exists
+                        edited[rtype] = value
+                    except UnknownEid:
+                        err(cnx._("can't restore entity %(eid)s of type %(eschema)s, "
+                                  "target of %(rtype)s (eid %(value)s) does not exist any longer")
+                            % locals())
+                        changes[column] = None
+            elif eschema.destination(rtype) in ('Bytes', 'Password'):
+                changes[column] = self._binary(value)
+                edited[rtype] = Binary(value)
+            elif PY2 and isinstance(value, str):
+                edited[rtype] = text_type(value, cnx.encoding, 'replace')
+            else:
+                edited[rtype] = value
+        # This must only be done after init_entity_caches: deferred in calling
+        # functions
+        # edited.check()
+
+    def _undo_d(self, cnx, action):
+        """undo an entity deletion"""
+        errors = []
+        err = errors.append
+        eid = action.eid
+        etype = action.etype
+        _ = cnx._
+        # get an entity instance
+        try:
+            entity = self.repo.vreg['etypes'].etype_class(etype)(cnx)
+        except Exception:
+            err("can't restore entity %s of type %s, type no longer supported"
+                % (eid, etype))
+            return errors
+        self._reedit_entity(entity, action.changes, err)
+        entity.eid = eid
+        cnx.repo.init_entity_caches(cnx, entity, self)
+        entity.cw_edited.check()
+        self.repo.hm.call_hooks('before_add_entity', cnx, entity=entity)
+        # restore the entity
+        action.changes['cw_eid'] = eid
+        # restore record in entities (will update fti if needed)
+        self.add_info(cnx, entity, self, None)
+        sql = self.sqlgen.insert(SQL_PREFIX + etype, action.changes)
+        self.doexec(cnx, sql, action.changes)
+        self.repo.hm.call_hooks('after_add_entity', cnx, entity=entity)
+        return errors
+
+    def _undo_r(self, cnx, action):
+        """undo a relation removal"""
+        errors = []
+        subj, rtype, obj = action.eid_from, action.rtype, action.eid_to
+        try:
+            sentity, oentity, rdef = _undo_rel_info(cnx, subj, rtype, obj)
+        except _UndoException as ex:
+            errors.append(text_type(ex))
+        else:
+            for role, entity in (('subject', sentity),
+                                 ('object', oentity)):
+                try:
+                    _undo_check_relation_target(entity, rdef, role)
+                except _UndoException as ex:
+                    errors.append(text_type(ex))
+                    continue
+            if not errors:
+                self.repo.hm.call_hooks('before_add_relation', cnx,
+                                        eidfrom=subj, rtype=rtype, eidto=obj)
+                # add relation in the database
+                self._add_relations(cnx, rtype, [(subj, obj)], rdef.rtype.inlined)
+                # set related cache
+                cnx.update_rel_cache_add(subj, rtype, obj, rdef.rtype.symmetric)
+                self.repo.hm.call_hooks('after_add_relation', cnx,
+                                        eidfrom=subj, rtype=rtype, eidto=obj)
+        return errors
+
+    def _undo_c(self, cnx, action):
+        """undo an entity creation"""
+        eid = action.eid
+        # XXX done to avoid fetching all remaining relations for the entity;
+        # we should find an efficient way to do this (keeping the current
+        # massive-deletion performance)
+        if _undo_has_later_transaction(cnx, eid):
+            msg = cnx._('some later transaction(s) touch entity, undo them '
+                        'first')
+            raise ValidationError(eid, {None: msg})
+        etype = action.etype
+        # get an entity instance
+        try:
+            entity = self.repo.vreg['etypes'].etype_class(etype)(cnx)
+        except Exception:
+            return [cnx._(
+                "Can't undo creation of entity %(eid)s of type %(etype)s, type "
+                "no longer supported" % {'eid': eid, 'etype': etype})]
+        entity.eid = eid
+        # for proper eid/type cache update
+        CleanupDeletedEidsCacheOp.get_instance(cnx).add_data(eid)
+        self.repo.hm.call_hooks('before_delete_entity', cnx, entity=entity)
+        # remove is / is_instance_of which are added using sql by hooks, hence
+        # invisible as transaction actions
+        self.doexec(cnx, 'DELETE FROM is_relation WHERE eid_from=%s' % eid)
+        self.doexec(cnx, 'DELETE FROM is_instance_of_relation WHERE eid_from=%s' % eid)
+        self.doexec(cnx, 'DELETE FROM cw_source_relation WHERE eid_from=%s' % eid)
+        # XXX check removal of inlined relation?
+        # delete the entity
+        attrs = {'cw_eid': eid}
+        sql = self.sqlgen.delete(SQL_PREFIX + entity.cw_etype, attrs)
+        self.doexec(cnx, sql, attrs)
+        # remove record from entities (will update fti if needed)
+        self.delete_info_multi(cnx, [entity])
+        self.repo.hm.call_hooks('after_delete_entity', cnx, entity=entity)
+        return ()
+
+    def _undo_u(self, cnx, action):
+        """undo an entity update"""
+        errors = []
+        err = errors.append
+        try:
+            entity = cnx.entity_from_eid(action.eid)
+        except UnknownEid:
+            err(cnx._("can't restore state of entity %s, it has been "
+                      "deleted in the meantime") % action.eid)
+            return errors
+        self._reedit_entity(entity, action.changes, err)
+        entity.cw_edited.check()
+        self.repo.hm.call_hooks('before_update_entity', cnx, entity=entity)
+        sql = self.sqlgen.update(SQL_PREFIX + entity.cw_etype, action.changes,
+                                 ['cw_eid'])
+        self.doexec(cnx, sql, action.changes)
+        self.repo.hm.call_hooks('after_update_entity', cnx, entity=entity)
+        return errors
+
+    def _undo_a(self, cnx, action):
+        """undo a relation addition"""
+        errors = []
+        subj, rtype, obj = action.eid_from, action.rtype, action.eid_to
+        try:
+            sentity, oentity, rdef = _undo_rel_info(cnx, subj, rtype, obj)
+        except _UndoException as ex:
+            errors.append(text_type(ex))
+        else:
+            rschema = rdef.rtype
+            if rschema.inlined:
+                sql = 'SELECT 1 FROM cw_%s WHERE cw_eid=%s and cw_%s=%s'\
+                      % (sentity.cw_etype, subj, rtype, obj)
+            else:
+                sql = 'SELECT 1 FROM %s_relation WHERE eid_from=%s and eid_to=%s'\
+                      % (rtype, subj, obj)
+            cu = self.doexec(cnx, sql)
+            if cu.fetchone() is None:
+                errors.append(cnx._(
+                    "Can't undo addition of relation %(rtype)s from %(subj)s to"
+                    " %(obj)s, doesn't exist anymore" % locals()))
+            if not errors:
+                self.repo.hm.call_hooks('before_delete_relation', cnx,
+                                        eidfrom=subj, rtype=rtype, eidto=obj)
+                # delete relation from the database
+                self._delete_relation(cnx, subj, rtype, obj, rschema.inlined)
+                # set related cache
+                cnx.update_rel_cache_del(subj, rtype, obj, rschema.symmetric)
+                self.repo.hm.call_hooks('after_delete_relation', cnx,
+                                        eidfrom=subj, rtype=rtype, eidto=obj)
+        return
errors + + # full text index handling ################################################# + + @cached + def need_fti_indexation(self, etype): + eschema = self.schema.eschema(etype) + if any(eschema.indexable_attributes()): + return True + if any(eschema.fulltext_containers()): + return True + return False + + def index_entity(self, cnx, entity): + """create an operation to [re]index textual content of the given entity + on commit + """ + if self.do_fti: + FTIndexEntityOp.get_instance(cnx).add_data(entity.eid) + + def fti_unindex_entities(self, cnx, entities): + """remove text content for entities from the full text index + """ + cursor = cnx.cnxset.cu + cursor_unindex_object = self.dbhelper.cursor_unindex_object + try: + for entity in entities: + cursor_unindex_object(entity.eid, cursor) + except Exception: # let KeyboardInterrupt / SystemExit propagate + self.exception('error while unindexing %s', entity) + + + def fti_index_entities(self, cnx, entities): + """add text content of created/modified entities to the full text index + """ + cursor_index_object = self.dbhelper.cursor_index_object + cursor = cnx.cnxset.cu + try: + # use cursor_index_object, not cursor_reindex_object since + # unindexing done in the FTIndexEntityOp + for entity in entities: + cursor_index_object(entity.eid, + entity.cw_adapt_to('IFTIndexable'), + cursor) + except Exception: # let KeyboardInterrupt / SystemExit propagate + self.exception('error while indexing %s', entity) + + +class FTIndexEntityOp(hook.DataOperationMixIn, hook.LateOperation): + """operation to delay entity full text indexation to commit + + since fti indexing may trigger discovery of other entities, it should be + triggered on precommit, not commit, and this should be done after other + precommit operation which may add relations to the entity + """ + + def precommit_event(self): + cnx = self.cnx + source = cnx.repo.system_source + pendingeids = cnx.transaction_data.get('pendingeids', ()) + done = cnx.transaction_data.setdefault('indexedeids', set()) + to_reindex = set() + for eid in self.get_data(): + if eid in pendingeids or eid in done: + # entity added and deleted in the same transaction or already + # processed + continue + done.add(eid) + iftindexable = cnx.entity_from_eid(eid).cw_adapt_to('IFTIndexable') + to_reindex |= set(iftindexable.fti_containers()) + source.fti_unindex_entities(cnx, to_reindex) + source.fti_index_entities(cnx, to_reindex) + +def sql_schema(driver): + helper = get_db_helper(driver) + typemap = helper.TYPE_MAPPING + schema = """ +/* Create the repository's system database */ + +%s + +CREATE TABLE entities ( + eid INTEGER PRIMARY KEY NOT NULL, + type VARCHAR(64) NOT NULL, + asource VARCHAR(128) NOT NULL, + extid VARCHAR(256) +);; +CREATE INDEX entities_type_idx ON entities(type);; +CREATE TABLE moved_entities ( + eid INTEGER PRIMARY KEY NOT NULL, + extid VARCHAR(256) UNIQUE NOT NULL +);; + +CREATE TABLE transactions ( + tx_uuid CHAR(32) PRIMARY KEY NOT NULL, + tx_user INTEGER NOT NULL, + tx_time %s NOT NULL +);; +CREATE INDEX transactions_tx_user_idx ON transactions(tx_user);; +CREATE INDEX transactions_tx_time_idx ON transactions(tx_time);; + +CREATE TABLE tx_entity_actions ( + tx_uuid CHAR(32) REFERENCES transactions(tx_uuid) ON DELETE CASCADE, + txa_action CHAR(1) NOT NULL, + txa_public %s NOT NULL, + txa_order INTEGER, + eid INTEGER NOT NULL, + etype VARCHAR(64) NOT NULL, + changes %s +);; +CREATE INDEX tx_entity_actions_txa_action_idx ON tx_entity_actions(txa_action);; +CREATE INDEX tx_entity_actions_txa_public_idx 
ON tx_entity_actions(txa_public);;
+CREATE INDEX tx_entity_actions_eid_idx ON tx_entity_actions(eid);;
+CREATE INDEX tx_entity_actions_etype_idx ON tx_entity_actions(etype);;
+CREATE INDEX tx_entity_actions_tx_uuid_idx ON tx_entity_actions(tx_uuid);;
+
+CREATE TABLE tx_relation_actions (
+  tx_uuid CHAR(32) REFERENCES transactions(tx_uuid) ON DELETE CASCADE,
+  txa_action CHAR(1) NOT NULL,
+  txa_public %s NOT NULL,
+  txa_order INTEGER,
+  eid_from INTEGER NOT NULL,
+  eid_to INTEGER NOT NULL,
+  rtype VARCHAR(256) NOT NULL
+);;
+CREATE INDEX tx_relation_actions_txa_action_idx ON tx_relation_actions(txa_action);;
+CREATE INDEX tx_relation_actions_txa_public_idx ON tx_relation_actions(txa_public);;
+CREATE INDEX tx_relation_actions_eid_from_idx ON tx_relation_actions(eid_from);;
+CREATE INDEX tx_relation_actions_eid_to_idx ON tx_relation_actions(eid_to);;
+CREATE INDEX tx_relation_actions_tx_uuid_idx ON tx_relation_actions(tx_uuid);;
+""" % (helper.sql_create_numrange('entities_id_seq').replace(';', ';;'),
+       typemap['Datetime'],
+       typemap['Boolean'], typemap['Bytes'], typemap['Boolean'])
+    if helper.backend_name == 'sqlite':
+        # sqlite accepts the ON DELETE CASCADE syntax but does nothing with it
+        schema += '''
+CREATE TRIGGER fkd_transactions
+BEFORE DELETE ON transactions
+FOR EACH ROW BEGIN
+    DELETE FROM tx_entity_actions WHERE tx_uuid=OLD.tx_uuid;
+    DELETE FROM tx_relation_actions WHERE tx_uuid=OLD.tx_uuid;
+END;;
+'''
+    schema += ';;'.join(helper.sqls_create_multicol_unique_index('entities', ['extid']))
+    schema += ';;\n'
+    return schema
+
+
+def sql_drop_schema(driver):
+    helper = get_db_helper(driver)
+    return """
+%s;
+%s
+DROP TABLE entities;
+DROP TABLE tx_entity_actions;
+DROP TABLE tx_relation_actions;
+DROP TABLE transactions;
+""" % (';'.join(helper.sqls_drop_multicol_unique_index('entities', ['extid'])),
+       helper.sql_drop_numrange('entities_id_seq'))
+
+
+def grant_schema(user, set_owner=True):
+    result = ''
+    for table in ('entities', 'entities_id_seq',
+                  'transactions', 'tx_entity_actions', 'tx_relation_actions'):
+        if set_owner:
+            # accumulate: a plain assignment here would discard the statements
+            # generated for the previous tables
+            result += 'ALTER TABLE %s OWNER TO %s;\n' % (table, user)
+        result += 'GRANT ALL ON %s TO %s;\n' % (table, user)
+    return result
+
+
+class BaseAuthentifier(object):
+
+    def __init__(self, source=None):
+        self.source = source
+
+    def set_schema(self, schema):
+        """set the instance's schema"""
+        pass
+
+class LoginPasswordAuthentifier(BaseAuthentifier):
+    passwd_rql = 'Any P WHERE X is CWUser, X login %(login)s, X upassword P'
+    auth_rql = (u'Any X WHERE X is CWUser, X login %(login)s, X upassword %(pwd)s, '
+                'X cw_source S, S name "system"')
+    _sols = ({'X': 'CWUser', 'P': 'Password', 'S': 'CWSource'},)
+
+    def set_schema(self, schema):
+        """set the instance's schema"""
+        if 'CWUser' in schema:  # probably an empty schema if not true...
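+            # Both RQL queries above are compiled once per schema; a sketch of
+            # the resulting flow in authenticate() below (made-up credentials):
+            #
+            #   eid = authentifier.authenticate(cnx, u'admin', password=u'secret')
+            #
+            # runs _passwd_rqlst to fetch the stored password hash/salt, then
+            # _auth_rqlst to match login + crypted password.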
+            # rql syntax trees used to authenticate users
+            self._passwd_rqlst = self.source.compile_rql(self.passwd_rql, self._sols)
+            self._auth_rqlst = self.source.compile_rql(self.auth_rql, self._sols)
+
+    def authenticate(self, cnx, login, password=None, **kwargs):
+        """return the CWUser eid for the given login/password if this account
+        is defined in this source, else raise `AuthenticationError`
+
+        two queries are needed since passwords are stored crypted, so we have
+        to fetch the salt first
+        """
+        args = {'login': login, 'pwd': None}
+        if password is not None:
+            rset = self.source.syntax_tree_search(cnx, self._passwd_rqlst, args)
+            try:
+                pwd = rset[0][0]
+            except IndexError:
+                raise AuthenticationError('bad login')
+            if pwd is None:
+                # if pwd is None but a password is provided, something is wrong
+                raise AuthenticationError('bad password')
+            # passwords are stored using the Bytes type, so we get a StringIO
+            args['pwd'] = Binary(crypt_password(password, pwd.getvalue()))
+        # get eid from login and (crypted) password
+        rset = self.source.syntax_tree_search(cnx, self._auth_rqlst, args)
+        pwd = args['pwd']
+        try:
+            user = rset[0][0]
+            # If the stored hash uses a deprecated scheme (e.g. DES or MD5 used
+            # before 3.14.7), update with a fresh one
+            if pwd is not None and pwd.getvalue():
+                verify, newhash = verify_and_update(password, pwd.getvalue())
+                if not verify:  # should not happen, but...
+                    raise AuthenticationError('bad password')
+                if newhash:
+                    cnx.system_sql("UPDATE %s SET %s=%%(newhash)s WHERE %s=%%(login)s" % (
+                        SQL_PREFIX + 'CWUser',
+                        SQL_PREFIX + 'upassword',
+                        SQL_PREFIX + 'login'),
+                        {'newhash': self.source._binary(newhash.encode('ascii')),
+                         'login': login})
+                    cnx.commit()
+            return user
+        except IndexError:
+            raise AuthenticationError('bad password')
+
+
+class EmailPasswordAuthentifier(BaseAuthentifier):
+    def authenticate(self, cnx, login, **authinfo):
+        # the email_auth flag prevents infinite recursion (the call to
+        # repo.check_auth_info at the end of this method may lead us here again)
+        if not '@' in login or authinfo.pop('email_auth', None):
+            raise AuthenticationError('not an email')
+        rset = cnx.execute('Any L WHERE U login L, U primary_email M, '
+                           'M address %(login)s', {'login': login},
+                           build_descr=False)
+        if rset.rowcount != 1:
+            raise AuthenticationError('unknown email')
+        login = rset.rows[0][0]
+        authinfo['email_auth'] = True
+        return self.source.repo.check_auth_info(cnx, login, authinfo)
+
+
+class DatabaseIndependentBackupRestore(object):
+    """Helper class to perform db backend agnostic backup and restore
+
+    The backup and restore methods are used to dump / restore the
+    system database in a database independent format. The file is a
+    Zip archive containing the following files:
+
+    * format.txt: the format of the archive. Currently '1.1'
+    * tables.txt: list of filenames in the archive tables/ directory
+    * sequences.txt: list of filenames in the archive sequences/ directory
+    * numranges.txt: list of filenames in the archive numrange/ directory
+    * versions.txt: the list of cube versions from CWProperty
+    * tables/<tablename>.<chunkno>: pickled data
+    * sequences/<sequencename>: pickled data
+
+    The pickled data format for tables, numranges and sequences is a tuple of
+    3 elements:
+    * the table name
+    * a tuple of column names
+    * a list of rows (as tuples with one element per column)
+
+    Tables are saved in chunks in different files in order to prevent
+    too high memory consumption.
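+
+    For instance, a freshly created instance could yield an archive laid out
+    as (hypothetical content)::
+
+        format.txt  tables.txt  sequences.txt  numranges.txt  versions.txt
+        numrange/entities_id_seq
+        tables/entities.0000
+        tables/cw_CWUser.0000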
+ """ + blocksize = 100 + + def __init__(self, source): + """ + :param: source an instance of the system source + """ + self._source = source + self.logger = logging.getLogger('cubicweb.ctl') + self.logger.setLevel(logging.INFO) + self.logger.addHandler(logging.StreamHandler(sys.stdout)) + self.schema = self._source.schema + self.dbhelper = self._source.dbhelper + self.cnx = None + self.cursor = None + self.sql_generator = sqlgen.SQLGenerator() + + def get_connection(self): + return self._source.get_connection() + + def backup(self, backupfile): + archive = zipfile.ZipFile(backupfile, 'w', allowZip64=True) + self.cnx = self.get_connection() + try: + self.cursor = self.cnx.cursor() + self.cursor.arraysize = 100 + self.logger.info('writing metadata') + self.write_metadata(archive) + for seq in self.get_sequences(): + self.logger.info('processing sequence %s', seq) + self.write_sequence(archive, seq) + for numrange in self.get_numranges(): + self.logger.info('processing numrange %s', numrange) + self.write_numrange(archive, numrange) + for table in self.get_tables(): + self.logger.info('processing table %s', table) + self.write_table(archive, table) + finally: + archive.close() + self.cnx.close() + self.logger.info('done') + + def get_tables(self): + non_entity_tables = ['entities', + 'transactions', + 'tx_entity_actions', + 'tx_relation_actions', + ] + etype_tables = [] + relation_tables = [] + prefix = 'cw_' + for etype in self.schema.entities(): + eschema = self.schema.eschema(etype) + if eschema.final: + continue + etype_tables.append('%s%s'%(prefix, etype)) + for rtype in self.schema.relations(): + rschema = self.schema.rschema(rtype) + if rschema.final or rschema.inlined or rschema in VIRTUAL_RTYPES: + continue + relation_tables.append('%s_relation' % rtype) + return non_entity_tables + etype_tables + relation_tables + + def get_sequences(self): + return [] + + def get_numranges(self): + return ['entities_id_seq'] + + def write_metadata(self, archive): + archive.writestr('format.txt', '1.1') + archive.writestr('tables.txt', '\n'.join(self.get_tables())) + archive.writestr('sequences.txt', '\n'.join(self.get_sequences())) + archive.writestr('numranges.txt', '\n'.join(self.get_numranges())) + versions = self._get_versions() + versions_str = '\n'.join('%s %s' % (k, v) + for k, v in versions) + archive.writestr('versions.txt', versions_str) + + def write_sequence(self, archive, seq): + sql = self.dbhelper.sql_sequence_current_state(seq) + columns, rows_iterator = self._get_cols_and_rows(sql) + rows = list(rows_iterator) + serialized = self._serialize(seq, columns, rows) + archive.writestr('sequences/%s' % seq, serialized) + + def write_numrange(self, archive, numrange): + sql = self.dbhelper.sql_numrange_current_state(numrange) + columns, rows_iterator = self._get_cols_and_rows(sql) + rows = list(rows_iterator) + serialized = self._serialize(numrange, columns, rows) + archive.writestr('numrange/%s' % numrange, serialized) + + def write_table(self, archive, table): + nb_lines_sql = 'SELECT COUNT(*) FROM %s' % table + self.cursor.execute(nb_lines_sql) + rowcount = self.cursor.fetchone()[0] + sql = 'SELECT * FROM %s' % table + columns, rows_iterator = self._get_cols_and_rows(sql) + self.logger.info('number of rows: %d', rowcount) + blocksize = self.blocksize + if rowcount > 0: + for i, start in enumerate(range(0, rowcount, blocksize)): + rows = list(itertools.islice(rows_iterator, blocksize)) + serialized = self._serialize(table, columns, rows) + archive.writestr('tables/%s.%04d' % (table, i), 
+
+    def _get_cols_and_rows(self, sql):
+        process_result = self._source.iter_process_result
+        self.cursor.execute(sql)
+        columns = (d[0] for d in self.cursor.description)
+        rows = process_result(self.cursor)
+        return tuple(columns), rows
+
+    def _serialize(self, name, columns, rows):
+        return pickle.dumps((name, columns, rows), pickle.HIGHEST_PROTOCOL)
+
+    def restore(self, backupfile):
+        archive = zipfile.ZipFile(backupfile, 'r', allowZip64=True)
+        self.cnx = self.get_connection()
+        self.cursor = self.cnx.cursor()
+        sequences, numranges, tables, table_chunks = self.read_metadata(archive, backupfile)
+        for seq in sequences:
+            self.logger.info('restoring sequence %s', seq)
+            self.read_sequence(archive, seq)
+        for numrange in numranges:
+            self.logger.info('restoring numrange %s', numrange)
+            self.read_numrange(archive, numrange)
+        for table in tables:
+            self.logger.info('restoring table %s', table)
+            self.read_table(archive, table, sorted(table_chunks[table]))
+        self.cnx.close()
+        archive.close()
+        self.logger.info('done')
+
+    def read_metadata(self, archive, backupfile):
+        # zipfile.read() returns bytes; decode early so the comparisons below
+        # and the SQL interpolation behave the same on Python 2 and 3
+        formatinfo = archive.read('format.txt').decode('ascii')
+        self.logger.info('checking metadata')
+        if formatinfo.strip() != "1.1":
+            self.logger.critical('Unsupported format in archive: %s', formatinfo)
+            raise ValueError('Unknown format in %s: %s' % (backupfile, formatinfo))
+        tables = archive.read('tables.txt').decode('ascii').splitlines()
+        sequences = archive.read('sequences.txt').decode('ascii').splitlines()
+        numranges = archive.read('numranges.txt').decode('ascii').splitlines()
+        file_versions = self._parse_versions(archive.read('versions.txt').decode('ascii'))
+        versions = set(self._get_versions())
+        if file_versions != versions:
+            self.logger.critical('Unable to restore: versions do not match')
+            self.logger.critical('Expected:\n%s', '\n'.join('%s: %s' % (cube, ver)
+                                                            for cube, ver in sorted(versions)))
+            self.logger.critical('Found:\n%s', '\n'.join('%s: %s' % (cube, ver)
+                                                         for cube, ver in sorted(file_versions)))
+            raise ValueError('Unable to restore: versions do not match')
+        table_chunks = {}
+        for name in archive.namelist():
+            if not name.startswith('tables/'):
+                continue
+            filename = basename(name)
+            tablename, _ext = filename.rsplit('.', 1)
+            table_chunks.setdefault(tablename, []).append(name)
+        return sequences, numranges, tables, table_chunks
+
+    def read_sequence(self, archive, seq):
+        seqname, columns, rows = pickle.loads(archive.read('sequences/%s' % seq))
+        assert seqname == seq
+        assert len(rows) == 1
+        assert len(rows[0]) == 1
+        value = rows[0][0]
+        sql = self.dbhelper.sql_restart_sequence(seq, value)
+        self.cursor.execute(sql)
+        self.cnx.commit()
+
+    def read_numrange(self, archive, numrange):
+        rangename, columns, rows = pickle.loads(archive.read('numrange/%s' % numrange))
+        assert rangename == numrange
+        assert len(rows) == 1
+        assert len(rows[0]) == 1
+        value = rows[0][0]
+        sql = self.dbhelper.sql_restart_numrange(numrange, value)
+        self.cursor.execute(sql)
+        self.cnx.commit()
+
+    def read_table(self, archive, table, filenames):
+        merge_args = self._source.merge_args
+        # wipe the current content, then reload it chunk by chunk
+        self.cursor.execute('DELETE FROM %s' % table)
+        self.cnx.commit()
+        row_count = 0
+        for filename in filenames:
+            tablename, columns, rows = pickle.loads(archive.read(filename))
+            assert tablename == table
+            if not rows:
+                continue
+            insert = self.sql_generator.insert(table,
+                                               dict(zip(columns, rows[0])))
+            for row in rows:
+                self.cursor.execute(insert, merge_args(dict(zip(columns, row)), {}))
+            row_count += len(rows)
+            self.cnx.commit()
+        self.logger.info('inserted %d rows', row_count)
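+
+    # Note on read_table(): the INSERT statement is generated once per chunk
+    # from the first row's columns and reused for every row, which is safe
+    # because all rows of a chunk were pickled with the same column tuple.
+    # A batched variant could rely on the standard DB-API executemany(), e.g.
+    # (a sketch, not part of the original code):
+    #   self.cursor.executemany(insert,
+    #                           [merge_args(dict(zip(columns, row)), {})
+    #                            for row in rows])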
+
+    def _parse_versions(self, version_str):
+        versions = set()
+        for line in version_str.splitlines():
+            versions.add(tuple(line.split()))
+        return versions
+
+    def _get_versions(self):
+        version_sql = 'SELECT cw_pkey, cw_value FROM cw_CWProperty'
+        versions = []
+        self.cursor.execute(version_sql)
+        for pkey, value in self.cursor.fetchall():
+            if pkey.startswith(u'system.version'):
+                versions.append((pkey, value))
+        return versions
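+
+# Usage sketch for the helper class above (hedged: the class name and the
+# backup path are illustrative assumptions, not taken from this hunk):
+#
+#   helper = DatabaseIndependentBackupRestore(system_source)
+#   helper.backup('/var/backups/myapp.backup')    # writes the zip archive
+#   helper.restore('/var/backups/myapp.backup')   # wipes and reloads tables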