diff -r 058bb3dc685f -r 0b59724cb3f2 server/sources/native.py
--- a/server/sources/native.py Mon Jan 04 18:40:30 2016 +0100
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,1813 +0,0 @@
-# copyright 2003-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
-# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
-#
-# This file is part of CubicWeb.
-#
-# CubicWeb is free software: you can redistribute it and/or modify it under the
-# terms of the GNU Lesser General Public License as published by the Free
-# Software Foundation, either version 2.1 of the License, or (at your option)
-# any later version.
-#
-# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
-# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
-# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
-# details.
-#
-# You should have received a copy of the GNU Lesser General Public License along
-# with CubicWeb. If not, see .
-"""Adapters for native cubicweb sources.
-
-Notes:
-* extid (aka external id, the primary key of an entity in the external source
- from which it comes from) are stored in a varchar column encoded as a base64
- string. This is because it should actually be Bytes but we want an index on
- it for fast querying.
-"""
-from __future__ import print_function
-
-__docformat__ = "restructuredtext en"
-
-from threading import Lock
-from datetime import datetime
-from base64 import b64encode
-from contextlib import contextmanager
-from os.path import basename
-import re
-import itertools
-import zipfile
-import logging
-import sys
-
-from six import PY2, text_type, binary_type, string_types
-from six.moves import range, cPickle as pickle
-
-from logilab.common.decorators import cached, clear_cache
-from logilab.common.configuration import Method
-from logilab.common.shellutils import getlogin
-from logilab.database import get_db_helper, sqlgen
-
-from yams.schema import role_name
-
-from cubicweb import (UnknownEid, AuthenticationError, ValidationError, Binary,
- UniqueTogetherError, UndoTransactionException, ViolatedConstraint)
-from cubicweb import transaction as tx, server, neg_role
-from cubicweb.utils import QueryCache
-from cubicweb.schema import VIRTUAL_RTYPES
-from cubicweb.cwconfig import CubicWebNoAppConfiguration
-from cubicweb.server import hook
-from cubicweb.server import schema2sql as y2sql
-from cubicweb.server.utils import crypt_password, eschema_eid, verify_and_update
-from cubicweb.server.sqlutils import SQL_PREFIX, SQLAdapterMixIn
-from cubicweb.server.rqlannotation import set_qdata
-from cubicweb.server.hook import CleanupDeletedEidsCacheOp
-from cubicweb.server.edition import EditedEntity
-from cubicweb.server.sources import AbstractSource, dbg_st_search, dbg_results
-from cubicweb.server.sources.rql2sql import SQLGenerator
-from cubicweb.statsd_logger import statsd_timeit
-
-
-ATTR_MAP = {}
-NONSYSTEM_ETYPES = set()
-NONSYSTEM_RELATIONS = set()
-
-class LogCursor(object):
- def __init__(self, cursor):
- self.cu = cursor
-
- def execute(self, query, args=None):
- """Execute a query.
- it's a function just so that it shows up in profiling
- """
- if server.DEBUG & server.DBG_SQL:
- print('exec', query, args)
- try:
- self.cu.execute(str(query), args)
- except Exception as ex:
- print("sql: %r\n args: %s\ndbms message: %r" % (
- query, args, ex.args[0]))
- raise
-
- def fetchall(self):
- return self.cu.fetchall()
-
- def fetchone(self):
- return self.cu.fetchone()
-
-
-def sql_or_clauses(sql, clauses):
- select, restr = sql.split(' WHERE ', 1)
- restrclauses = restr.split(' AND ')
- for clause in clauses:
- restrclauses.remove(clause)
- if restrclauses:
- restr = '%s AND (%s)' % (' AND '.join(restrclauses),
- ' OR '.join(clauses))
- else:
- restr = '(%s)' % ' OR '.join(clauses)
- return '%s WHERE %s' % (select, restr)
-
-
-def rdef_table_column(rdef):
- """return table and column used to store the given relation definition in
- the database
- """
- return (SQL_PREFIX + str(rdef.subject),
- SQL_PREFIX + str(rdef.rtype))
-
-
-def rdef_physical_info(dbhelper, rdef):
- """return backend type and a boolean flag if NULL values should be allowed
- for a given relation definition
- """
- if not rdef.object.final:
- return dbhelper.TYPE_MAPPING['Int']
- coltype = y2sql.type_from_rdef(dbhelper, rdef, creating=False)
- allownull = rdef.cardinality[0] != '1'
- return coltype, allownull
-
-
-class _UndoException(Exception):
- """something went wrong during undoing"""
-
- def __unicode__(self):
- """Called by the unicode builtin; should return a Unicode object
-
- Type of _UndoException message must be `unicode` by design in CubicWeb.
- """
- assert isinstance(self.args[0], text_type)
- return self.args[0]
-
-
-def _undo_check_relation_target(tentity, rdef, role):
- """check linked entity has not been redirected for this relation"""
- card = rdef.role_cardinality(role)
- if card in '?1' and tentity.related(rdef.rtype, role):
- raise _UndoException(tentity._cw._(
- "Can't restore %(role)s relation %(rtype)s to entity %(eid)s which "
- "is already linked using this relation.")
- % {'role': neg_role(role),
- 'rtype': rdef.rtype,
- 'eid': tentity.eid})
-
-def _undo_rel_info(cnx, subj, rtype, obj):
- entities = []
- for role, eid in (('subject', subj), ('object', obj)):
- try:
- entities.append(cnx.entity_from_eid(eid))
- except UnknownEid:
- raise _UndoException(cnx._(
- "Can't restore relation %(rtype)s, %(role)s entity %(eid)s"
- " doesn't exist anymore.")
- % {'role': cnx._(role),
- 'rtype': cnx._(rtype),
- 'eid': eid})
- sentity, oentity = entities
- try:
- rschema = cnx.vreg.schema.rschema(rtype)
- rdef = rschema.rdefs[(sentity.cw_etype, oentity.cw_etype)]
- except KeyError:
- raise _UndoException(cnx._(
- "Can't restore relation %(rtype)s between %(subj)s and "
- "%(obj)s, that relation does not exists anymore in the "
- "schema.")
- % {'rtype': cnx._(rtype),
- 'subj': subj,
- 'obj': obj})
- return sentity, oentity, rdef
-
-def _undo_has_later_transaction(cnx, eid):
- return cnx.system_sql('''\
-SELECT T.tx_uuid FROM transactions AS TREF, transactions AS T
-WHERE TREF.tx_uuid='%(txuuid)s' AND T.tx_uuid!='%(txuuid)s'
-AND T.tx_time>=TREF.tx_time
-AND (EXISTS(SELECT 1 FROM tx_entity_actions AS TEA
- WHERE TEA.tx_uuid=T.tx_uuid AND TEA.eid=%(eid)s)
- OR EXISTS(SELECT 1 FROM tx_relation_actions as TRA
- WHERE TRA.tx_uuid=T.tx_uuid AND (
- TRA.eid_from=%(eid)s OR TRA.eid_to=%(eid)s))
- )''' % {'txuuid': cnx.transaction_data['undoing_uuid'],
- 'eid': eid}).fetchone()
-
-
-class DefaultEidGenerator(object):
- __slots__ = ('source', 'cnx', 'lock')
-
- def __init__(self, source):
- self.source = source
- self.cnx = None
- self.lock = Lock()
-
- def close(self):
- if self.cnx:
- self.cnx.close()
- self.cnx = None
-
- def create_eid(self, _cnx, count=1):
- # lock needed to prevent 'Connection is busy with results for another
- # command (0)' errors with SQLServer
- assert count > 0
- with self.lock:
- return self._create_eid(count)
-
- def _create_eid(self, count):
- # internal function doing the eid creation without locking.
- # needed for the recursive handling of disconnections (otherwise we
- # deadlock on self._eid_cnx_lock
- source = self.source
- if self.cnx is None:
- self.cnx = source.get_connection()
- cnx = self.cnx
- try:
- cursor = cnx.cursor()
- for sql in source.dbhelper.sqls_increment_numrange('entities_id_seq', count):
- cursor.execute(sql)
- eid = cursor.fetchone()[0]
- except (source.OperationalError, source.InterfaceError):
- # FIXME: better detection of deconnection pb
- source.warning("trying to reconnect create eid connection")
- self.cnx = None
- return self._create_eid(count)
- except source.DbapiError as exc:
- # We get this one with pyodbc and SQL Server when connection was reset
- if exc.args[0] == '08S01':
- source.warning("trying to reconnect create eid connection")
- self.cnx = None
- return self._create_eid(count)
- else:
- raise
- except Exception: # WTF?
- cnx.rollback()
- self.cnx = None
- source.exception('create eid failed in an unforeseen way on SQL statement %s', sql)
- raise
- else:
- cnx.commit()
- return eid
-
-
-class SQLITEEidGenerator(object):
- __slots__ = ('source', 'lock')
-
- def __init__(self, source):
- self.source = source
- self.lock = Lock()
-
- def close(self):
- pass
-
- def create_eid(self, cnx, count=1):
- assert count > 0
- source = self.source
- with self.lock:
- for sql in source.dbhelper.sqls_increment_numrange('entities_id_seq', count):
- cursor = source.doexec(cnx, sql)
- return cursor.fetchone()[0]
-
-
-class NativeSQLSource(SQLAdapterMixIn, AbstractSource):
- """adapter for source using the native cubicweb schema (see below)
- """
- sqlgen_class = SQLGenerator
- options = (
- ('db-driver',
- {'type' : 'string',
- 'default': 'postgres',
- # XXX use choice type
- 'help': 'database driver (postgres, sqlite, sqlserver2005)',
- 'group': 'native-source', 'level': 0,
- }),
- ('db-host',
- {'type' : 'string',
- 'default': '',
- 'help': 'database host',
- 'group': 'native-source', 'level': 1,
- }),
- ('db-port',
- {'type' : 'string',
- 'default': '',
- 'help': 'database port',
- 'group': 'native-source', 'level': 1,
- }),
- ('db-name',
- {'type' : 'string',
- 'default': Method('default_instance_id'),
- 'help': 'database name',
- 'group': 'native-source', 'level': 0,
- }),
- ('db-namespace',
- {'type' : 'string',
- 'default': '',
- 'help': 'database namespace (schema) name',
- 'group': 'native-source', 'level': 1,
- }),
- ('db-user',
- {'type' : 'string',
- 'default': CubicWebNoAppConfiguration.mode == 'user' and getlogin() or 'cubicweb',
- 'help': 'database user',
- 'group': 'native-source', 'level': 0,
- }),
- ('db-password',
- {'type' : 'password',
- 'default': '',
- 'help': 'database password',
- 'group': 'native-source', 'level': 0,
- }),
- ('db-encoding',
- {'type' : 'string',
- 'default': 'utf8',
- 'help': 'database encoding',
- 'group': 'native-source', 'level': 1,
- }),
- ('db-extra-arguments',
- {'type' : 'string',
- 'default': '',
- 'help': 'set to "Trusted_Connection" if you are using SQLServer and '
- 'want trusted authentication for the database connection',
- 'group': 'native-source', 'level': 2,
- }),
- ('db-statement-timeout',
- {'type': 'int',
- 'default': 0,
- 'help': 'sql statement timeout, in milliseconds (postgres only)',
- 'group': 'native-source', 'level': 2,
- }),
- )
-
- def __init__(self, repo, source_config, *args, **kwargs):
- SQLAdapterMixIn.__init__(self, source_config, repairing=repo.config.repairing)
- self.authentifiers = [LoginPasswordAuthentifier(self)]
- if repo.config['allow-email-login']:
- self.authentifiers.insert(0, EmailPasswordAuthentifier(self))
- AbstractSource.__init__(self, repo, source_config, *args, **kwargs)
- # sql generator
- self._rql_sqlgen = self.sqlgen_class(self.schema, self.dbhelper,
- ATTR_MAP.copy())
- # full text index helper
- self.do_fti = not repo.config['delay-full-text-indexation']
- # sql queries cache
- self._cache = QueryCache(repo.config['rql-cache-size'])
- # (etype, attr) / storage mapping
- self._storages = {}
- self.binary_to_str = self.dbhelper.dbapi_module.binary_to_str
- if self.dbdriver == 'sqlite':
- self.eid_generator = SQLITEEidGenerator(self)
- else:
- self.eid_generator = DefaultEidGenerator(self)
- self.create_eid = self.eid_generator.create_eid
-
- def check_config(self, source_entity):
- """check configuration of source entity"""
- if source_entity.host_config:
- msg = source_entity._cw._('the system source has its configuration '
- 'stored on the file-system')
- raise ValidationError(source_entity.eid, {role_name('config', 'subject'): msg})
-
- def add_authentifier(self, authentifier):
- self.authentifiers.append(authentifier)
- authentifier.source = self
- authentifier.set_schema(self.schema)
-
- def reset_caches(self):
- """method called during test to reset potential source caches"""
- self._cache = QueryCache(self.repo.config['rql-cache-size'])
-
- def clear_eid_cache(self, eid, etype):
- """clear potential caches for the given eid"""
- self._cache.pop('Any X WHERE X eid %s, X is %s' % (eid, etype), None)
- self._cache.pop('Any X WHERE X eid %s' % eid, None)
- self._cache.pop('Any %s' % eid, None)
-
- @statsd_timeit
- def sqlexec(self, cnx, sql, args=None):
- """execute the query and return its result"""
- return self.process_result(self.doexec(cnx, sql, args))
-
- def init_creating(self, cnxset=None):
- # check full text index availibility
- if self.do_fti:
- if cnxset is None:
- _cnxset = self.repo._get_cnxset()
- else:
- _cnxset = cnxset
- if not self.dbhelper.has_fti_table(_cnxset.cu):
- if not self.repo.config.creating:
- self.critical('no text index table')
- self.do_fti = False
- if cnxset is None:
- _cnxset.cnxset_freed()
- self.repo._free_cnxset(_cnxset)
-
- def backup(self, backupfile, confirm, format='native'):
- """method called to create a backup of the source's data"""
- if format == 'portable':
- # ensure the schema is the one stored in the database: if repository
- # started in quick_start mode, the file system's one has been loaded
- # so force reload
- if self.repo.config.quick_start:
- self.repo.set_schema(self.repo.deserialize_schema(),
- resetvreg=False)
- helper = DatabaseIndependentBackupRestore(self)
- self.close_source_connections()
- try:
- helper.backup(backupfile)
- finally:
- self.open_source_connections()
- elif format == 'native':
- self.close_source_connections()
- try:
- self.backup_to_file(backupfile, confirm)
- finally:
- self.open_source_connections()
- else:
- raise ValueError('Unknown format %r' % format)
-
-
- def restore(self, backupfile, confirm, drop, format='native'):
- """method called to restore a backup of source's data"""
- if self.repo.config.init_cnxset_pool:
- self.close_source_connections()
- try:
- if format == 'portable':
- helper = DatabaseIndependentBackupRestore(self)
- helper.restore(backupfile)
- elif format == 'native':
- self.restore_from_file(backupfile, confirm, drop=drop)
- else:
- raise ValueError('Unknown format %r' % format)
- finally:
- if self.repo.config.init_cnxset_pool:
- self.open_source_connections()
-
-
- def init(self, activated, source_entity):
- try:
- # test if 'asource' column exists
- query = self.dbhelper.sql_add_limit_offset('SELECT asource FROM entities', 1)
- source_entity._cw.system_sql(query)
- except Exception as ex:
- self.eid_type_source = self.eid_type_source_pre_131
- super(NativeSQLSource, self).init(activated, source_entity)
- self.init_creating(source_entity._cw.cnxset)
-
- def shutdown(self):
- self.eid_generator.close()
-
- # XXX deprecates [un]map_attribute?
- def map_attribute(self, etype, attr, cb, sourcedb=True):
- self._rql_sqlgen.attr_map[u'%s.%s' % (etype, attr)] = (cb, sourcedb)
-
- def unmap_attribute(self, etype, attr):
- self._rql_sqlgen.attr_map.pop(u'%s.%s' % (etype, attr), None)
-
- def set_storage(self, etype, attr, storage):
- storage_dict = self._storages.setdefault(etype, {})
- storage_dict[attr] = storage
- self.map_attribute(etype, attr,
- storage.callback, storage.is_source_callback)
-
- def unset_storage(self, etype, attr):
- self._storages[etype].pop(attr)
- # if etype has no storage left, remove the entry
- if not self._storages[etype]:
- del self._storages[etype]
- self.unmap_attribute(etype, attr)
-
- def storage(self, etype, attr):
- """return the storage for the given entity type / attribute
- """
- try:
- return self._storages[etype][attr]
- except KeyError:
- raise Exception('no custom storage set for %s.%s' % (etype, attr))
-
- # ISource interface #######################################################
-
- @statsd_timeit
- def compile_rql(self, rql, sols):
- rqlst = self.repo.vreg.rqlhelper.parse(rql)
- rqlst.restricted_vars = ()
- rqlst.children[0].solutions = sols
- self.repo.querier.sqlgen_annotate(rqlst)
- set_qdata(self.schema.rschema, rqlst, ())
- return rqlst
-
- def set_schema(self, schema):
- """set the instance'schema"""
- self._cache = QueryCache(self.repo.config['rql-cache-size'])
- self.cache_hit, self.cache_miss, self.no_cache = 0, 0, 0
- self.schema = schema
- try:
- self._rql_sqlgen.schema = schema
- except AttributeError:
- pass # __init__
- for authentifier in self.authentifiers:
- authentifier.set_schema(self.schema)
- clear_cache(self, 'need_fti_indexation')
-
- def support_entity(self, etype, write=False):
- """return true if the given entity's type is handled by this adapter
- if write is true, return true only if it's a RW support
- """
- return not etype in NONSYSTEM_ETYPES
-
- def support_relation(self, rtype, write=False):
- """return true if the given relation's type is handled by this adapter
- if write is true, return true only if it's a RW support
- """
- if write:
- return not rtype in NONSYSTEM_RELATIONS
- # due to current multi-sources implementation, the system source
- # can't claim not supporting a relation
- return True #not rtype == 'content_for'
-
- @statsd_timeit
- def authenticate(self, cnx, login, **kwargs):
- """return CWUser eid for the given login and other authentication
- information found in kwargs, else raise `AuthenticationError`
- """
- for authentifier in self.authentifiers:
- try:
- return authentifier.authenticate(cnx, login, **kwargs)
- except AuthenticationError:
- continue
- raise AuthenticationError()
-
- def syntax_tree_search(self, cnx, union, args=None, cachekey=None,
- varmap=None):
- """return result from this source for a rql query (actually from
- a rql syntax tree and a solution dictionary mapping each used
- variable to a possible type). If cachekey is given, the query
- necessary to fetch the results (but not the results themselves)
- may be cached using this key.
- """
- assert dbg_st_search(self.uri, union, varmap, args, cachekey)
- # remember number of actually selected term (sql generation may append some)
- if cachekey is None:
- self.no_cache += 1
- # generate sql query if we are able to do so (not supported types...)
- sql, qargs, cbs = self._rql_sqlgen.generate(union, args, varmap)
- else:
- # sql may be cached
- try:
- sql, qargs, cbs = self._cache[cachekey]
- self.cache_hit += 1
- except KeyError:
- self.cache_miss += 1
- sql, qargs, cbs = self._rql_sqlgen.generate(union, args, varmap)
- self._cache[cachekey] = sql, qargs, cbs
- args = self.merge_args(args, qargs)
- assert isinstance(sql, string_types), repr(sql)
- cursor = self.doexec(cnx, sql, args)
- results = self.process_result(cursor, cnx, cbs)
- assert dbg_results(results)
- return results
-
- @contextmanager
- def _fixup_cw(self, cnx, entity):
- _cw = entity._cw
- entity._cw = cnx
- try:
- yield
- finally:
- entity._cw = _cw
-
- @contextmanager
- def _storage_handler(self, cnx, entity, event):
- # 1/ memorize values as they are before the storage is called.
- # For instance, the BFSStorage will replace the `data`
- # binary value with a Binary containing the destination path
- # on the filesystem. To make the entity.data usage absolutely
- # transparent, we'll have to reset entity.data to its binary
- # value once the SQL query will be executed
- restore_values = []
- if isinstance(entity, list):
- entities = entity
- else:
- entities = [entity]
- etype = entities[0].__regid__
- for attr, storage in self._storages.get(etype, {}).items():
- for entity in entities:
- with self._fixup_cw(cnx, entity):
- if event == 'deleted':
- storage.entity_deleted(entity, attr)
- else:
- edited = entity.cw_edited
- if attr in edited:
- handler = getattr(storage, 'entity_%s' % event)
- to_restore = handler(entity, attr)
- restore_values.append((entity, attr, to_restore))
- try:
- yield # 2/ execute the source's instructions
- finally:
- # 3/ restore original values
- for entity, attr, value in restore_values:
- entity.cw_edited.edited_attribute(attr, value)
-
- def add_entity(self, cnx, entity):
- """add a new entity to the source"""
- with self._storage_handler(cnx, entity, 'added'):
- attrs = self.preprocess_entity(entity)
- sql = self.sqlgen.insert(SQL_PREFIX + entity.cw_etype, attrs)
- self.doexec(cnx, sql, attrs)
- if cnx.ertype_supports_undo(entity.cw_etype):
- self._record_tx_action(cnx, 'tx_entity_actions', u'C',
- etype=text_type(entity.cw_etype), eid=entity.eid)
-
- def update_entity(self, cnx, entity):
- """replace an entity in the source"""
- with self._storage_handler(cnx, entity, 'updated'):
- attrs = self.preprocess_entity(entity)
- if cnx.ertype_supports_undo(entity.cw_etype):
- changes = self._save_attrs(cnx, entity, attrs)
- self._record_tx_action(cnx, 'tx_entity_actions', u'U',
- etype=text_type(entity.cw_etype), eid=entity.eid,
- changes=self._binary(pickle.dumps(changes)))
- sql = self.sqlgen.update(SQL_PREFIX + entity.cw_etype, attrs,
- ['cw_eid'])
- self.doexec(cnx, sql, attrs)
-
- def delete_entity(self, cnx, entity):
- """delete an entity from the source"""
- with self._storage_handler(cnx, entity, 'deleted'):
- if cnx.ertype_supports_undo(entity.cw_etype):
- attrs = [SQL_PREFIX + r.type
- for r in entity.e_schema.subject_relations()
- if (r.final or r.inlined) and not r in VIRTUAL_RTYPES]
- changes = self._save_attrs(cnx, entity, attrs)
- self._record_tx_action(cnx, 'tx_entity_actions', u'D',
- etype=text_type(entity.cw_etype), eid=entity.eid,
- changes=self._binary(pickle.dumps(changes)))
- attrs = {'cw_eid': entity.eid}
- sql = self.sqlgen.delete(SQL_PREFIX + entity.cw_etype, attrs)
- self.doexec(cnx, sql, attrs)
-
- def add_relation(self, cnx, subject, rtype, object, inlined=False):
- """add a relation to the source"""
- self._add_relations(cnx, rtype, [(subject, object)], inlined)
- if cnx.ertype_supports_undo(rtype):
- self._record_tx_action(cnx, 'tx_relation_actions', u'A',
- eid_from=subject, rtype=text_type(rtype), eid_to=object)
-
- def add_relations(self, cnx, rtype, subj_obj_list, inlined=False):
- """add a relations to the source"""
- self._add_relations(cnx, rtype, subj_obj_list, inlined)
- if cnx.ertype_supports_undo(rtype):
- for subject, object in subj_obj_list:
- self._record_tx_action(cnx, 'tx_relation_actions', u'A',
- eid_from=subject, rtype=text_type(rtype), eid_to=object)
-
- def _add_relations(self, cnx, rtype, subj_obj_list, inlined=False):
- """add a relation to the source"""
- sql = []
- if inlined is False:
- attrs = [{'eid_from': subject, 'eid_to': object}
- for subject, object in subj_obj_list]
- sql.append((self.sqlgen.insert('%s_relation' % rtype, attrs[0]), attrs))
- else: # used by data import
- etypes = {}
- for subject, object in subj_obj_list:
- etype = cnx.entity_metas(subject)['type']
- if etype in etypes:
- etypes[etype].append((subject, object))
- else:
- etypes[etype] = [(subject, object)]
- for subj_etype, subj_obj_list in etypes.items():
- attrs = [{'cw_eid': subject, SQL_PREFIX + rtype: object}
- for subject, object in subj_obj_list]
- sql.append((self.sqlgen.update(SQL_PREFIX + etype, attrs[0],
- ['cw_eid']),
- attrs))
- for statement, attrs in sql:
- self.doexecmany(cnx, statement, attrs)
-
- def delete_relation(self, cnx, subject, rtype, object):
- """delete a relation from the source"""
- rschema = self.schema.rschema(rtype)
- self._delete_relation(cnx, subject, rtype, object, rschema.inlined)
- if cnx.ertype_supports_undo(rtype):
- self._record_tx_action(cnx, 'tx_relation_actions', u'R',
- eid_from=subject, rtype=text_type(rtype), eid_to=object)
-
- def _delete_relation(self, cnx, subject, rtype, object, inlined=False):
- """delete a relation from the source"""
- if inlined:
- table = SQL_PREFIX + cnx.entity_metas(subject)['type']
- column = SQL_PREFIX + rtype
- sql = 'UPDATE %s SET %s=NULL WHERE %seid=%%(eid)s' % (table, column,
- SQL_PREFIX)
- attrs = {'eid' : subject}
- else:
- attrs = {'eid_from': subject, 'eid_to': object}
- sql = self.sqlgen.delete('%s_relation' % rtype, attrs)
- self.doexec(cnx, sql, attrs)
-
- @statsd_timeit
- def doexec(self, cnx, query, args=None, rollback=True):
- """Execute a query.
- it's a function just so that it shows up in profiling
- """
- cursor = cnx.cnxset.cu
- if server.DEBUG & server.DBG_SQL:
- print('exec', query, args, cnx.cnxset.cnx)
- try:
- # str(query) to avoid error if it's a unicode string
- cursor.execute(str(query), args)
- except Exception as ex:
- if self.repo.config.mode != 'test':
- # during test we get those message when trying to alter sqlite
- # db schema
- self.info("sql: %r\n args: %s\ndbms message: %r",
- query, args, ex.args[0])
- if rollback:
- try:
- cnx.cnxset.rollback()
- if self.repo.config.mode != 'test':
- self.debug('transaction has been rolled back')
- except Exception as ex:
- pass
- if ex.__class__.__name__ == 'IntegrityError':
- # need string comparison because of various backends
- for arg in ex.args:
- # postgres, sqlserver
- mo = re.search("unique_[a-z0-9]{32}", arg)
- if mo is not None:
- raise UniqueTogetherError(cnx, cstrname=mo.group(0))
- # old sqlite
- mo = re.search('columns? (.*) (?:is|are) not unique', arg)
- if mo is not None: # sqlite in use
- # we left chop the 'cw_' prefix of attribute names
- rtypes = [c.strip()[3:]
- for c in mo.group(1).split(',')]
- raise UniqueTogetherError(cnx, rtypes=rtypes)
- # sqlite after http://www.sqlite.org/cgi/src/info/c80e229dd9c1230a
- if arg.startswith('UNIQUE constraint failed:'):
- # message looks like: "UNIQUE constraint failed: foo.cw_bar, foo.cw_baz"
- # so drop the prefix, split on comma, drop the tablenames, and drop "cw_"
- columns = arg.split(':', 1)[1].split(',')
- rtypes = [c.split('.', 1)[1].strip()[3:] for c in columns]
- raise UniqueTogetherError(cnx, rtypes=rtypes)
-
- mo = re.search('"cstr[a-f0-9]{32}"', arg)
- if mo is not None:
- # postgresql
- raise ViolatedConstraint(cnx, cstrname=mo.group(0)[1:-1])
- if arg.startswith('CHECK constraint failed:'):
- # sqlite3 (new)
- raise ViolatedConstraint(cnx, cstrname=arg.split(':', 1)[1].strip())
- mo = re.match('^constraint (cstr.*) failed$', arg)
- if mo is not None:
- # sqlite3 (old)
- raise ViolatedConstraint(cnx, cstrname=mo.group(1))
- raise
- return cursor
-
- @statsd_timeit
- def doexecmany(self, cnx, query, args):
- """Execute a query.
- it's a function just so that it shows up in profiling
- """
- if server.DEBUG & server.DBG_SQL:
- print('execmany', query, 'with', len(args), 'arguments', cnx.cnxset.cnx)
- cursor = cnx.cnxset.cu
- try:
- # str(query) to avoid error if it's a unicode string
- cursor.executemany(str(query), args)
- except Exception as ex:
- if self.repo.config.mode != 'test':
- # during test we get those message when trying to alter sqlite
- # db schema
- self.critical("sql many: %r\n args: %s\ndbms message: %r",
- query, args, ex.args[0])
- try:
- cnx.cnxset.rollback()
- if self.repo.config.mode != 'test':
- self.critical('transaction has been rolled back')
- except Exception:
- pass
- raise
-
- # short cut to method requiring advanced db helper usage ##################
-
- def update_rdef_column(self, cnx, rdef):
- """update physical column for a relation definition (final or inlined)
- """
- table, column = rdef_table_column(rdef)
- coltype, allownull = rdef_physical_info(self.dbhelper, rdef)
- if not self.dbhelper.alter_column_support:
- self.error("backend can't alter %s.%s to %s%s", table, column, coltype,
- not allownull and 'NOT NULL' or '')
- return
- self.dbhelper.change_col_type(LogCursor(cnx.cnxset.cu),
- table, column, coltype, allownull)
- self.info('altered %s.%s: now %s%s', table, column, coltype,
- not allownull and 'NOT NULL' or '')
-
- def update_rdef_null_allowed(self, cnx, rdef):
- """update NULL / NOT NULL of physical column for a relation definition
- (final or inlined)
- """
- if not self.dbhelper.alter_column_support:
- # not supported (and NOT NULL not set by yams in that case, so no
- # worry)
- return
- table, column = rdef_table_column(rdef)
- coltype, allownull = rdef_physical_info(self.dbhelper, rdef)
- self.dbhelper.set_null_allowed(LogCursor(cnx.cnxset.cu),
- table, column, coltype, allownull)
-
- def update_rdef_indexed(self, cnx, rdef):
- table, column = rdef_table_column(rdef)
- if rdef.indexed:
- self.create_index(cnx, table, column)
- else:
- self.drop_index(cnx, table, column)
-
- def update_rdef_unique(self, cnx, rdef):
- table, column = rdef_table_column(rdef)
- if rdef.constraint_by_type('UniqueConstraint'):
- self.create_index(cnx, table, column, unique=True)
- else:
- self.drop_index(cnx, table, column, unique=True)
-
- def create_index(self, cnx, table, column, unique=False):
- cursor = LogCursor(cnx.cnxset.cu)
- self.dbhelper.create_index(cursor, table, column, unique)
-
- def drop_index(self, cnx, table, column, unique=False):
- cursor = LogCursor(cnx.cnxset.cu)
- self.dbhelper.drop_index(cursor, table, column, unique)
-
- # system source interface #################################################
-
- def _eid_type_source(self, cnx, eid, sql):
- try:
- res = self.doexec(cnx, sql).fetchone()
- if res is not None:
- return res
- except Exception:
- self.exception('failed to query entities table for eid %s', eid)
- raise UnknownEid(eid)
-
- def eid_type_source(self, cnx, eid): # pylint: disable=E0202
- """return a tuple (type, extid, source) for the entity with id """
- sql = 'SELECT type, extid, asource FROM entities WHERE eid=%s' % eid
- res = self._eid_type_source(cnx, eid, sql)
- if not isinstance(res, list):
- res = list(res)
- res[-2] = self.decode_extid(res[-2])
- return res
-
- def eid_type_source_pre_131(self, cnx, eid):
- """return a tuple (type, extid, source) for the entity with id """
- sql = 'SELECT type, extid FROM entities WHERE eid=%s' % eid
- res = self._eid_type_source(cnx, eid, sql)
- if not isinstance(res, list):
- res = list(res)
- res[-1] = self.decode_extid(res[-1])
- res.append("system")
- return res
-
- def extid2eid(self, cnx, extid):
- """get eid from an external id. Return None if no record found."""
- assert isinstance(extid, binary_type)
- args = {'x': b64encode(extid).decode('ascii')}
- cursor = self.doexec(cnx,
- 'SELECT eid FROM entities WHERE extid=%(x)s',
- args)
- # XXX testing rowcount cause strange bug with sqlite, results are there
- # but rowcount is 0
- #if cursor.rowcount > 0:
- try:
- result = cursor.fetchone()
- if result:
- return result[0]
- except Exception:
- pass
- cursor = self.doexec(cnx,
- 'SELECT eid FROM moved_entities WHERE extid=%(x)s',
- args)
- try:
- result = cursor.fetchone()
- if result:
- # entity was moved to the system source, return negative
- # number to tell the external source to ignore it
- return -result[0]
- except Exception:
- pass
- return None
-
- def _handle_is_relation_sql(self, cnx, sql, attrs):
- """ Handler for specific is_relation sql that may be
- overwritten in some stores"""
- self.doexec(cnx, sql % attrs)
-
- _handle_insert_entity_sql = doexec
- _handle_is_instance_of_sql = _handle_source_relation_sql = _handle_is_relation_sql
-
    def add_info(self, cnx, entity, source, extid):
        """add type and source info for an eid into the system table

        Inserts a row in the `entities` table, then the `is`,
        `is_instance_of` and `cw_source` metadata relations, and finally
        schedules fulltext indexation when the entity type is indexable.

        :param extid: external id as bytes, or None for system entities
        """
        assert cnx.cnxset is not None
        # begin by inserting eid/type/source/extid into the entities table
        if extid is not None:
            assert isinstance(extid, binary_type)
            # stored base64-encoded so it fits in an indexable varchar
            # column (see module docstring)
            extid = b64encode(extid).decode('ascii')
        attrs = {'type': text_type(entity.cw_etype), 'eid': entity.eid, 'extid': extid,
                 'asource': text_type(source.uri)}
        self._handle_insert_entity_sql(cnx, self.sqlgen.insert('entities', attrs), attrs)
        # insert core relations: is, is_instance_of and cw_source
        try:
            self._handle_is_relation_sql(cnx, 'INSERT INTO is_relation(eid_from,eid_to) VALUES (%s,%s)',
                                         (entity.eid, eschema_eid(cnx, entity.e_schema)))
        except IndexError:
            # during schema serialization, skip
            pass
        else:
            # one is_instance_of row per ancestor type plus the type itself
            for eschema in entity.e_schema.ancestors() + [entity.e_schema]:
                self._handle_is_relation_sql(cnx,
                                             'INSERT INTO is_instance_of_relation(eid_from,eid_to) VALUES (%s,%s)',
                                             (entity.eid, eschema_eid(cnx, eschema)))
        if 'CWSource' in self.schema and source.eid is not None: # else, cw < 3.10
            self._handle_is_relation_sql(cnx, 'INSERT INTO cw_source_relation(eid_from,eid_to) VALUES (%s,%s)',
                                         (entity.eid, source.eid))
        # now we can update the full text index
        if self.need_fti_indexation(entity.cw_etype):
            self.index_entity(cnx, entity=entity)
-
- def update_info(self, cnx, entity, need_fti_update):
- """mark entity as being modified, fulltext reindex if needed"""
- if need_fti_update:
- # reindex the entity only if this query is updating at least
- # one indexable attribute
- self.index_entity(cnx, entity=entity)
-
- def delete_info_multi(self, cnx, entities):
- """delete system information on deletion of a list of entities with the
- same etype and belinging to the same source
-
- * update the fti
- * remove record from the `entities` table
- """
- self.fti_unindex_entities(cnx, entities)
- attrs = {'eid': '(%s)' % ','.join([str(_e.eid) for _e in entities])}
- self.doexec(cnx, self.sqlgen.delete_many('entities', attrs), attrs)
-
- # undo support #############################################################
-
- def undoable_transactions(self, cnx, ueid=None, **actionfilters):
- """See :class:`cubicweb.repoapi.Connection.undoable_transactions`"""
- # force filtering to connection's user if not a manager
- if not cnx.user.is_in_group('managers'):
- ueid = cnx.user.eid
- restr = {}
- if ueid is not None:
- restr['tx_user'] = ueid
- sql = self.sqlgen.select('transactions', restr, ('tx_uuid', 'tx_time', 'tx_user'))
- if actionfilters:
- # we will need subqueries to filter transactions according to
- # actions done
- tearestr = {} # filters on the tx_entity_actions table
- trarestr = {} # filters on the tx_relation_actions table
- genrestr = {} # generic filters, appliyable to both table
- # unless public explicitly set to false, we only consider public
- # actions
- if actionfilters.pop('public', True):
- genrestr['txa_public'] = True
- # put additional filters in trarestr and/or tearestr
- for key, val in actionfilters.items():
- if key == 'etype':
- # filtering on etype implies filtering on entity actions
- # only, and with no eid specified
- assert actionfilters.get('action', 'C') in 'CUD'
- assert not 'eid' in actionfilters
- tearestr['etype'] = text_type(val)
- elif key == 'eid':
- # eid filter may apply to 'eid' of tx_entity_actions or to
- # 'eid_from' OR 'eid_to' of tx_relation_actions
- if actionfilters.get('action', 'C') in 'CUD':
- tearestr['eid'] = val
- if actionfilters.get('action', 'A') in 'AR':
- trarestr['eid_from'] = val
- trarestr['eid_to'] = val
- elif key == 'action':
- if val in 'CUD':
- tearestr['txa_action'] = text_type(val)
- else:
- assert val in 'AR'
- trarestr['txa_action'] = text_type(val)
- else:
- raise AssertionError('unknow filter %s' % key)
- assert trarestr or tearestr, "can't only filter on 'public'"
- subqsqls = []
- # append subqueries to the original query, using EXISTS()
- if trarestr or (genrestr and not tearestr):
- trarestr.update(genrestr)
- trasql = self.sqlgen.select('tx_relation_actions', trarestr, ('1',))
- if 'eid_from' in trarestr:
- # replace AND by OR between eid_from/eid_to restriction
- trasql = sql_or_clauses(trasql, ['eid_from = %(eid_from)s',
- 'eid_to = %(eid_to)s'])
- trasql += ' AND transactions.tx_uuid=tx_relation_actions.tx_uuid'
- subqsqls.append('EXISTS(%s)' % trasql)
- if tearestr or (genrestr and not trarestr):
- tearestr.update(genrestr)
- teasql = self.sqlgen.select('tx_entity_actions', tearestr, ('1',))
- teasql += ' AND transactions.tx_uuid=tx_entity_actions.tx_uuid'
- subqsqls.append('EXISTS(%s)' % teasql)
- if restr:
- sql += ' AND %s' % ' OR '.join(subqsqls)
- else:
- sql += ' WHERE %s' % ' OR '.join(subqsqls)
- restr.update(trarestr)
- restr.update(tearestr)
- # we want results ordered by transaction's time descendant
- sql += ' ORDER BY tx_time DESC'
- cu = self.doexec(cnx, sql, restr)
- # turn results into transaction objects
- return [tx.Transaction(cnx, *args) for args in cu.fetchall()]
-
- def tx_info(self, cnx, txuuid):
- """See :class:`cubicweb.repoapi.Connection.transaction_info`"""
- return tx.Transaction(cnx, txuuid, *self._tx_info(cnx, text_type(txuuid)))
-
- def tx_actions(self, cnx, txuuid, public):
- """See :class:`cubicweb.repoapi.Connection.transaction_actions`"""
- txuuid = text_type(txuuid)
- self._tx_info(cnx, txuuid)
- restr = {'tx_uuid': txuuid}
- if public:
- restr['txa_public'] = True
- # XXX use generator to avoid loading everything in memory?
- sql = self.sqlgen.select('tx_entity_actions', restr,
- ('txa_action', 'txa_public', 'txa_order',
- 'etype', 'eid', 'changes'))
- with cnx.ensure_cnx_set:
- cu = self.doexec(cnx, sql, restr)
- actions = [tx.EntityAction(a,p,o,et,e,c and pickle.loads(self.binary_to_str(c)))
- for a,p,o,et,e,c in cu.fetchall()]
- sql = self.sqlgen.select('tx_relation_actions', restr,
- ('txa_action', 'txa_public', 'txa_order',
- 'rtype', 'eid_from', 'eid_to'))
- with cnx.ensure_cnx_set:
- cu = self.doexec(cnx, sql, restr)
- actions += [tx.RelationAction(*args) for args in cu.fetchall()]
- return sorted(actions, key=lambda x: x.order)
-
- def undo_transaction(self, cnx, txuuid):
- """See :class:`cubicweb.repoapi.Connection.undo_transaction`
-
- important note: while undoing of a transaction, only hooks in the
- 'integrity', 'activeintegrity' and 'undo' categories are called.
- """
- errors = []
- cnx.transaction_data['undoing_uuid'] = txuuid
- with cnx.deny_all_hooks_but('integrity', 'activeintegrity', 'undo'):
- with cnx.security_enabled(read=False):
- for action in reversed(self.tx_actions(cnx, txuuid, False)):
- undomethod = getattr(self, '_undo_%s' % action.action.lower())
- errors += undomethod(cnx, action)
- # remove the transactions record
- self.doexec(cnx,
- "DELETE FROM transactions WHERE tx_uuid='%s'" % txuuid)
- if errors:
- raise UndoTransactionException(txuuid, errors)
- else:
- return
-
- def start_undoable_transaction(self, cnx, uuid):
- """connection callback to insert a transaction record in the transactions
- table when some undoable transaction is started
- """
- ueid = cnx.user.eid
- attrs = {'tx_uuid': uuid, 'tx_user': ueid, 'tx_time': datetime.utcnow()}
- self.doexec(cnx, self.sqlgen.insert('transactions', attrs), attrs)
-
- def _save_attrs(self, cnx, entity, attrs):
- """return a pickleable dictionary containing current values for given
- attributes of the entity
- """
- restr = {'cw_eid': entity.eid}
- sql = self.sqlgen.select(SQL_PREFIX + entity.cw_etype, restr, attrs)
- cu = self.doexec(cnx, sql, restr)
- values = dict(zip(attrs, cu.fetchone()))
- # ensure backend specific binary are converted back to string
- eschema = entity.e_schema
- for column in attrs:
- # [3:] remove 'cw_' prefix
- attr = column[3:]
- if not eschema.subjrels[attr].final:
- continue
- if eschema.destination(attr) in ('Password', 'Bytes'):
- value = values[column]
- if value is not None:
- values[column] = self.binary_to_str(value)
- return values
-
- def _record_tx_action(self, cnx, table, action, **kwargs):
- """record a transaction action in the given table (either
- 'tx_entity_actions' or 'tx_relation_action')
- """
- kwargs['tx_uuid'] = cnx.transaction_uuid()
- kwargs['txa_action'] = action
- kwargs['txa_order'] = cnx.transaction_inc_action_counter()
- kwargs['txa_public'] = not cnx.hooks_in_progress
- self.doexec(cnx, self.sqlgen.insert(table, kwargs), kwargs)
-
- def _tx_info(self, cnx, txuuid):
- """return transaction's time and user of the transaction with the given uuid.
-
- raise `NoSuchTransaction` if there is no such transaction of if the
- connection's user isn't allowed to see it.
- """
- restr = {'tx_uuid': txuuid}
- sql = self.sqlgen.select('transactions', restr,
- ('tx_time', 'tx_user'))
- cu = self.doexec(cnx, sql, restr)
- try:
- time, ueid = cu.fetchone()
- except TypeError:
- raise tx.NoSuchTransaction(txuuid)
- if not (cnx.user.is_in_group('managers')
- or cnx.user.eid == ueid):
- raise tx.NoSuchTransaction(txuuid)
- return time, ueid
-
- def _reedit_entity(self, entity, changes, err):
- cnx = entity._cw
- eid = entity.eid
- entity.cw_edited = edited = EditedEntity(entity)
- # check for schema changes, entities linked through inlined relation
- # still exists, rewrap binary values
- eschema = entity.e_schema
- getrschema = eschema.subjrels
- for column, value in changes.items():
- rtype = column[len(SQL_PREFIX):]
- if rtype == "eid":
- continue # XXX should even `eid` be stored in action changes?
- try:
- rschema = getrschema[rtype]
- except KeyError:
- err(cnx._("can't restore relation %(rtype)s of entity %(eid)s, "
- "this relation does not exist in the schema anymore.")
- % {'rtype': rtype, 'eid': eid})
- if not rschema.final:
- if not rschema.inlined:
- assert value is None
- # rschema is an inlined relation
- elif value is not None:
- # not a deletion: we must put something in edited
- try:
- entity._cw.entity_from_eid(value) # check target exists
- edited[rtype] = value
- except UnknownEid:
- err(cnx._("can't restore entity %(eid)s of type %(eschema)s, "
- "target of %(rtype)s (eid %(value)s) does not exist any longer")
- % locals())
- changes[column] = None
- elif eschema.destination(rtype) in ('Bytes', 'Password'):
- changes[column] = self._binary(value)
- edited[rtype] = Binary(value)
- elif PY2 and isinstance(value, str):
- edited[rtype] = text_type(value, cnx.encoding, 'replace')
- else:
- edited[rtype] = value
- # This must only be done after init_entitiy_caches : defered in calling functions
- # edited.check()
-
    def _undo_d(self, cnx, action):
        """undo an entity deletion: recreate the entity with the attribute
        values recorded in the transaction action

        return a list of error messages (empty on success)
        """
        errors = []
        err = errors.append
        eid = action.eid
        etype = action.etype
        _ = cnx._
        # get an entity instance
        try:
            entity = self.repo.vreg['etypes'].etype_class(etype)(cnx)
        except Exception:
            err("can't restore entity %s of type %s, type no more supported"
                % (eid, etype))
            return errors
        # rebuild cw_edited from the recorded changes (may append to errors)
        self._reedit_entity(entity, action.changes, err)
        entity.eid = eid
        cnx.repo.init_entity_caches(cnx, entity, self)
        # check may only happen once entity caches are initialized
        entity.cw_edited.check()
        self.repo.hm.call_hooks('before_add_entity', cnx, entity=entity)
        # restore the entity
        action.changes['cw_eid'] = eid
        # restore record in entities (will update fti if needed)
        self.add_info(cnx, entity, self, None)
        sql = self.sqlgen.insert(SQL_PREFIX + etype, action.changes)
        self.doexec(cnx, sql, action.changes)
        self.repo.hm.call_hooks('after_add_entity', cnx, entity=entity)
        return errors
-
    def _undo_r(self, cnx, action):
        """undo a relation removal: re-add the relation recorded in `action`

        return a list of error messages (empty on success)
        """
        errors = []
        subj, rtype, obj = action.eid_from, action.rtype, action.eid_to
        try:
            sentity, oentity, rdef = _undo_rel_info(cnx, subj, rtype, obj)
        except _UndoException as ex:
            errors.append(text_type(ex))
        else:
            # check both ends of the relation still accept it
            for role, entity in (('subject', sentity),
                                 ('object', oentity)):
                try:
                    _undo_check_relation_target(entity, rdef, role)
                except _UndoException as ex:
                    errors.append(text_type(ex))
                    continue
        if not errors:
            self.repo.hm.call_hooks('before_add_relation', cnx,
                                    eidfrom=subj, rtype=rtype, eidto=obj)
            # add relation in the database
            self._add_relations(cnx, rtype, [(subj, obj)], rdef.rtype.inlined)
            # set related cache
            cnx.update_rel_cache_add(subj, rtype, obj, rdef.rtype.symmetric)
            self.repo.hm.call_hooks('after_add_relation', cnx,
                                    eidfrom=subj, rtype=rtype, eidto=obj)
        return errors
-
    def _undo_c(self, cnx, action):
        """undo an entity creation: delete the entity and its metadata

        :raise ValidationError: when later transactions also touched the
          entity, since they must be undone first
        """
        eid = action.eid
        # XXX done to avoid fetching all remaining relation for the entity
        # we should find an efficient way to do this (keeping current
        # massive deletion performance)
        if _undo_has_later_transaction(cnx, eid):
            msg = cnx._('some later transaction(s) touch entity, undo them '
                        'first')
            raise ValidationError(eid, {None: msg})
        etype = action.etype
        # get an entity instance
        try:
            entity = self.repo.vreg['etypes'].etype_class(etype)(cnx)
        except Exception:
            return [cnx._(
                "Can't undo creation of entity %(eid)s of type %(etype)s, type "
                "no more supported" % {'eid': eid, 'etype': etype})]
        entity.eid = eid
        # for proper eid/type cache update
        CleanupDeletedEidsCacheOp.get_instance(cnx).add_data(eid)
        self.repo.hm.call_hooks('before_delete_entity', cnx, entity=entity)
        # remove is / is_instance_of which are added using sql by hooks, hence
        # invisible as transaction action
        self.doexec(cnx, 'DELETE FROM is_relation WHERE eid_from=%s' % eid)
        self.doexec(cnx, 'DELETE FROM is_instance_of_relation WHERE eid_from=%s' % eid)
        self.doexec(cnx, 'DELETE FROM cw_source_relation WHERE eid_from=%s' % eid)
        # XXX check removal of inlined relation?
        # delete the entity
        attrs = {'cw_eid': eid}
        sql = self.sqlgen.delete(SQL_PREFIX + entity.cw_etype, attrs)
        self.doexec(cnx, sql, attrs)
        # remove record from entities (will update fti if needed)
        self.delete_info_multi(cnx, [entity])
        self.repo.hm.call_hooks('after_delete_entity', cnx, entity=entity)
        return ()
-
    def _undo_u(self, cnx, action):
        """undo an entity update: restore the attribute values recorded in
        the transaction action

        return a list of error messages (empty on success)
        """
        errors = []
        err = errors.append
        try:
            entity = cnx.entity_from_eid(action.eid)
        except UnknownEid:
            err(cnx._("can't restore state of entity %s, it has been "
                      "deleted inbetween") % action.eid)
            return errors
        # rebuild cw_edited from the recorded changes (may append to errors)
        self._reedit_entity(entity, action.changes, err)
        entity.cw_edited.check()
        self.repo.hm.call_hooks('before_update_entity', cnx, entity=entity)
        sql = self.sqlgen.update(SQL_PREFIX + entity.cw_etype, action.changes,
                                 ['cw_eid'])
        self.doexec(cnx, sql, action.changes)
        self.repo.hm.call_hooks('after_update_entity', cnx, entity=entity)
        return errors
-
    def _undo_a(self, cnx, action):
        """undo a relation addition: delete the relation recorded in `action`

        return a list of error messages (empty on success)
        """
        errors = []
        subj, rtype, obj = action.eid_from, action.rtype, action.eid_to
        try:
            sentity, oentity, rdef = _undo_rel_info(cnx, subj, rtype, obj)
        except _UndoException as ex:
            errors.append(text_type(ex))
        else:
            rschema = rdef.rtype
            # inlined relations live in a column of the subject's table,
            # other relations in a dedicated <rtype>_relation table
            if rschema.inlined:
                sql = 'SELECT 1 FROM cw_%s WHERE cw_eid=%s and cw_%s=%s'\
                      % (sentity.cw_etype, subj, rtype, obj)
            else:
                sql = 'SELECT 1 FROM %s_relation WHERE eid_from=%s and eid_to=%s'\
                      % (rtype, subj, obj)
            cu = self.doexec(cnx, sql)
            if cu.fetchone() is None:
                errors.append(cnx._(
                    "Can't undo addition of relation %(rtype)s from %(subj)s to"
                    " %(obj)s, doesn't exist anymore" % locals()))
        if not errors:
            self.repo.hm.call_hooks('before_delete_relation', cnx,
                                    eidfrom=subj, rtype=rtype, eidto=obj)
            # delete relation from the database
            self._delete_relation(cnx, subj, rtype, obj, rschema.inlined)
            # set related cache
            cnx.update_rel_cache_del(subj, rtype, obj, rschema.symmetric)
            self.repo.hm.call_hooks('after_delete_relation', cnx,
                                    eidfrom=subj, rtype=rtype, eidto=obj)
        return errors
-
- # full text index handling #################################################
-
- @cached
- def need_fti_indexation(self, etype):
- eschema = self.schema.eschema(etype)
- if any(eschema.indexable_attributes()):
- return True
- if any(eschema.fulltext_containers()):
- return True
- return False
-
- def index_entity(self, cnx, entity):
- """create an operation to [re]index textual content of the given entity
- on commit
- """
- if self.do_fti:
- FTIndexEntityOp.get_instance(cnx).add_data(entity.eid)
-
- def fti_unindex_entities(self, cnx, entities):
- """remove text content for entities from the full text index
- """
- cursor = cnx.cnxset.cu
- cursor_unindex_object = self.dbhelper.cursor_unindex_object
- try:
- for entity in entities:
- cursor_unindex_object(entity.eid, cursor)
- except Exception: # let KeyboardInterrupt / SystemExit propagate
- self.exception('error while unindexing %s', entity)
-
-
- def fti_index_entities(self, cnx, entities):
- """add text content of created/modified entities to the full text index
- """
- cursor_index_object = self.dbhelper.cursor_index_object
- cursor = cnx.cnxset.cu
- try:
- # use cursor_index_object, not cursor_reindex_object since
- # unindexing done in the FTIndexEntityOp
- for entity in entities:
- cursor_index_object(entity.eid,
- entity.cw_adapt_to('IFTIndexable'),
- cursor)
- except Exception: # let KeyboardInterrupt / SystemExit propagate
- self.exception('error while indexing %s', entity)
-
-
class FTIndexEntityOp(hook.DataOperationMixIn, hook.LateOperation):
    """operation to delay entity full text indexation to commit

    since fti indexing may trigger discovery of other entities, it should be
    triggered on precommit, not commit, and this should be done after other
    precommit operation which may add relations to the entity
    """

    def precommit_event(self):
        cnx = self.cnx
        source = cnx.repo.system_source
        pending = cnx.transaction_data.get('pendingeids', ())
        processed = cnx.transaction_data.setdefault('indexedeids', set())
        containers = set()
        for eid in self.get_data():
            if eid in pending or eid in processed:
                # entity added and deleted in the same transaction, or
                # already handled by a previous operation
                continue
            processed.add(eid)
            adapted = cnx.entity_from_eid(eid).cw_adapt_to('IFTIndexable')
            containers.update(adapted.fti_containers())
        source.fti_unindex_entities(cnx, containers)
        source.fti_index_entities(cnx, containers)
-
def sql_schema(driver):
    """return the sql script creating the repository's low-level 'system'
    tables (entities, moved_entities, transactions and the undo log tables)
    for the given db driver
    """
    helper = get_db_helper(driver)
    typemap = helper.TYPE_MAPPING
    # NOTE: statements are separated by ';;' — presumably so callers can
    # split the script without being confused by ';' inside statements
    # (TODO confirm against the code executing this script)
    schema = """
/* Create the repository's system database */

%s

CREATE TABLE entities (
  eid INTEGER PRIMARY KEY NOT NULL,
  type VARCHAR(64) NOT NULL,
  asource VARCHAR(128) NOT NULL,
  extid VARCHAR(256)
);;
CREATE INDEX entities_type_idx ON entities(type);;
CREATE TABLE moved_entities (
  eid INTEGER PRIMARY KEY NOT NULL,
  extid VARCHAR(256) UNIQUE NOT NULL
);;

CREATE TABLE transactions (
  tx_uuid CHAR(32) PRIMARY KEY NOT NULL,
  tx_user INTEGER NOT NULL,
  tx_time %s NOT NULL
);;
CREATE INDEX transactions_tx_user_idx ON transactions(tx_user);;
CREATE INDEX transactions_tx_time_idx ON transactions(tx_time);;

CREATE TABLE tx_entity_actions (
  tx_uuid CHAR(32) REFERENCES transactions(tx_uuid) ON DELETE CASCADE,
  txa_action CHAR(1) NOT NULL,
  txa_public %s NOT NULL,
  txa_order INTEGER,
  eid INTEGER NOT NULL,
  etype VARCHAR(64) NOT NULL,
  changes %s
);;
CREATE INDEX tx_entity_actions_txa_action_idx ON tx_entity_actions(txa_action);;
CREATE INDEX tx_entity_actions_txa_public_idx ON tx_entity_actions(txa_public);;
CREATE INDEX tx_entity_actions_eid_idx ON tx_entity_actions(eid);;
CREATE INDEX tx_entity_actions_etype_idx ON tx_entity_actions(etype);;
CREATE INDEX tx_entity_actions_tx_uuid_idx ON tx_entity_actions(tx_uuid);;

CREATE TABLE tx_relation_actions (
  tx_uuid CHAR(32) REFERENCES transactions(tx_uuid) ON DELETE CASCADE,
  txa_action CHAR(1) NOT NULL,
  txa_public %s NOT NULL,
  txa_order INTEGER,
  eid_from INTEGER NOT NULL,
  eid_to INTEGER NOT NULL,
  rtype VARCHAR(256) NOT NULL
);;
CREATE INDEX tx_relation_actions_txa_action_idx ON tx_relation_actions(txa_action);;
CREATE INDEX tx_relation_actions_txa_public_idx ON tx_relation_actions(txa_public);;
CREATE INDEX tx_relation_actions_eid_from_idx ON tx_relation_actions(eid_from);;
CREATE INDEX tx_relation_actions_eid_to_idx ON tx_relation_actions(eid_to);;
CREATE INDEX tx_relation_actions_tx_uuid_idx ON tx_relation_actions(tx_uuid);;
""" % (helper.sql_create_numrange('entities_id_seq').replace(';', ';;'),
       typemap['Datetime'],
       typemap['Boolean'], typemap['Bytes'], typemap['Boolean'])
    if helper.backend_name == 'sqlite':
        # sqlite support the ON DELETE CASCADE syntax but do nothing
        schema += '''
CREATE TRIGGER fkd_transactions
BEFORE DELETE ON transactions
FOR EACH ROW BEGIN
    DELETE FROM tx_entity_actions WHERE tx_uuid=OLD.tx_uuid;
    DELETE FROM tx_relation_actions WHERE tx_uuid=OLD.tx_uuid;
END;;
'''
    # unique index on entities.extid (see module docstring: extid is stored
    # base64-encoded in a varchar so it can be indexed)
    schema += ';;'.join(helper.sqls_create_multicol_unique_index('entities', ['extid']))
    schema += ';;\n'
    return schema
-
-
def sql_drop_schema(driver):
    """return the sql script dropping every 'system' table created by
    :func:`sql_schema` for the given db driver
    """
    helper = get_db_helper(driver)
    # bug fix: moved_entities is created by sql_schema but was previously
    # never dropped here, leaving a stale table behind
    return """
%s;
%s
DROP TABLE entities;
DROP TABLE moved_entities;
DROP TABLE tx_entity_actions;
DROP TABLE tx_relation_actions;
DROP TABLE transactions;
""" % (';'.join(helper.sqls_drop_multicol_unique_index('entities', ['extid'])),
       helper.sql_drop_numrange('entities_id_seq'))
-
-
def grant_schema(user, set_owner=True):
    """return sql statements granting the given db user all rights on the
    repository 'system' tables; when `set_owner` is true, also transfer
    ownership of those tables to the user

    NOTE(review): moved_entities is not in this list — confirm whether it
    should be granted as well.
    """
    result = ''
    for table in ('entities', 'entities_id_seq',
                  'transactions', 'tx_entity_actions', 'tx_relation_actions'):
        if set_owner:
            # bug fix: use += — plain assignment discarded all statements
            # accumulated for the previous tables
            result += 'ALTER TABLE %s OWNER TO %s;\n' % (table, user)
        result += 'GRANT ALL ON %s TO %s;\n' % (table, user)
    return result
-
-
class BaseAuthentifier(object):
    """base class for authentifier plugins attached to a source"""

    def __init__(self, source=None):
        # the source this authentifier is plugged into
        self.source = source

    def set_schema(self, schema):
        """set the instance'schema (no-op by default, override as needed)"""
-
class LoginPasswordAuthentifier(BaseAuthentifier):
    """authentifier checking a login/password couple against CWUser entities
    stored in the system source
    """
    # fetch the stored password hash for a login (needed to get the salt
    # before crypting the submitted password)
    passwd_rql = 'Any P WHERE X is CWUser, X login %(login)s, X upassword P'
    # fetch the user eid matching a login and a crypted password
    auth_rql = (u'Any X WHERE X is CWUser, X login %(login)s, X upassword %(pwd)s, '
                'X cw_source S, S name "system"')
    _sols = ({'X': 'CWUser', 'P': 'Password', 'S': 'CWSource'},)

    def set_schema(self, schema):
        """set the instance'schema"""
        if 'CWUser' in schema: # probably an empty schema if not true...
            # rql syntax trees used to authenticate users
            self._passwd_rqlst = self.source.compile_rql(self.passwd_rql, self._sols)
            self._auth_rqlst = self.source.compile_rql(self.auth_rql, self._sols)

    def authenticate(self, cnx, login, password=None, **kwargs):
        """return CWUser eid for the given login/password if this account is
        defined in this source, else raise `AuthenticationError`

        two queries are needed since passwords are stored crypted, so we have
        to fetch the salt first
        """
        args = {'login': login, 'pwd' : None}
        if password is not None:
            rset = self.source.syntax_tree_search(cnx, self._passwd_rqlst, args)
            try:
                pwd = rset[0][0]
            except IndexError:
                raise AuthenticationError('bad login')
            if pwd is None:
                # if pwd is None but a password is provided, something is wrong
                raise AuthenticationError('bad password')
            # passwords are stored using the Bytes type, so we get a StringIO
            args['pwd'] = Binary(crypt_password(password, pwd.getvalue()))
        # get eid from login and (crypted) password
        rset = self.source.syntax_tree_search(cnx, self._auth_rqlst, args)
        pwd = args['pwd']
        try:
            user = rset[0][0]
            # If the stored hash uses a deprecated scheme (e.g. DES or MD5 used
            # before 3.14.7), update with a fresh one
            if pwd is not None and pwd.getvalue():
                verify, newhash = verify_and_update(password, pwd.getvalue())
                if not verify: # should not happen, but...
                    raise AuthenticationError('bad password')
                if newhash:
                    # transparently rehash the password with the current scheme
                    cnx.system_sql("UPDATE %s SET %s=%%(newhash)s WHERE %s=%%(login)s" % (
                                       SQL_PREFIX + 'CWUser',
                                       SQL_PREFIX + 'upassword',
                                       SQL_PREFIX + 'login'),
                                   {'newhash': self.source._binary(newhash.encode('ascii')),
                                    'login': login})
                    cnx.commit()
            return user
        except IndexError:
            raise AuthenticationError('bad password')
-
-
class EmailPasswordAuthentifier(BaseAuthentifier):
    """authentifier allowing users to log in with their primary email
    address instead of their login
    """

    def authenticate(self, cnx, login, **authinfo):
        """translate an email address into the matching login, then delegate
        to the repository's regular authentication machinery

        raise `AuthenticationError` when `login` is not an email address or
        does not match exactly one primary email
        """
        # email_auth flag prevent from infinite recursion (call to
        # repo.check_auth_info at the end of this method may lead us here again)
        if '@' not in login or authinfo.pop('email_auth', None):
            raise AuthenticationError('not an email')
        rset = cnx.execute('Any L WHERE U login L, U primary_email M, '
                           'M address %(login)s', {'login': login},
                           build_descr=False)
        if rset.rowcount != 1:
            raise AuthenticationError('unexisting email')
        login = rset.rows[0][0]
        authinfo['email_auth'] = True
        return self.source.repo.check_auth_info(cnx, login, authinfo)
-
-
class DatabaseIndependentBackupRestore(object):
    """Helper class to perform db backend agnostic backup and restore

    The backup and restore methods are used to dump / restore the
    system database in a database independent format. The file is a
    Zip archive containing the following files:

    * format.txt: the format of the archive. Currently '1.1'
    * tables.txt: list of filenames in the archive tables/ directory
    * sequences.txt: list of filenames in the archive sequences/ directory
    * numranges.txt: list of filenames in the archive numrange/ directory
    * versions.txt: the list of cube versions from CWProperty
    * tables/.: pickled data
    * sequences/: pickled data

    The pickled data format for tables, numranges and sequences is a tuple of 3 elements:
    * the table name
    * a tuple of column names
    * a list of rows (as tuples with one element per column)

    Tables are saved in chunks in different files in order to prevent
    a too high memory consumption.
    """
    # number of rows per pickled chunk under tables/
    blocksize = 100

    def __init__(self, source):
        """
        :param: source an instance of the system source
        """
        self._source = source
        self.logger = logging.getLogger('cubicweb.ctl')
        self.logger.setLevel(logging.INFO)
        self.logger.addHandler(logging.StreamHandler(sys.stdout))
        self.schema = self._source.schema
        self.dbhelper = self._source.dbhelper
        # cnx/cursor are opened lazily by backup()/restore()
        self.cnx = None
        self.cursor = None
        self.sql_generator = sqlgen.SQLGenerator()

    def get_connection(self):
        """open and return a new raw DB connection from the source"""
        return self._source.get_connection()

    def backup(self, backupfile):
        """dump the whole system database into a zip archive at `backupfile`"""
        archive = zipfile.ZipFile(backupfile, 'w', allowZip64=True)
        self.cnx = self.get_connection()
        try:
            self.cursor = self.cnx.cursor()
            self.cursor.arraysize = 100
            self.logger.info('writing metadata')
            self.write_metadata(archive)
            for seq in self.get_sequences():
                self.logger.info('processing sequence %s', seq)
                self.write_sequence(archive, seq)
            for numrange in self.get_numranges():
                self.logger.info('processing numrange %s', numrange)
                self.write_numrange(archive, numrange)
            for table in self.get_tables():
                self.logger.info('processing table %s', table)
                self.write_table(archive, table)
        finally:
            archive.close()
            self.cnx.close()
        self.logger.info('done')

    def get_tables(self):
        """return the list of all tables to dump: fixed system tables plus
        one table per non-final entity type and per non-inlined relation
        """
        non_entity_tables = ['entities',
                             'transactions',
                             'tx_entity_actions',
                             'tx_relation_actions',
                             ]
        etype_tables = []
        relation_tables = []
        prefix = 'cw_'
        for etype in self.schema.entities():
            eschema = self.schema.eschema(etype)
            if eschema.final:
                continue
            etype_tables.append('%s%s'%(prefix, etype))
        for rtype in self.schema.relations():
            rschema = self.schema.rschema(rtype)
            # final and inlined relations are stored as columns of the
            # entity tables; virtual relations have no table at all
            if rschema.final or rschema.inlined or rschema in VIRTUAL_RTYPES:
                continue
            relation_tables.append('%s_relation' % rtype)
        return non_entity_tables + etype_tables + relation_tables

    def get_sequences(self):
        """return the list of database sequences to dump (none by default)"""
        return []

    def get_numranges(self):
        """return the list of numranges to dump (the eid allocator)"""
        return ['entities_id_seq']

    def write_metadata(self, archive):
        """write format/tables/sequences/numranges/versions metadata files"""
        archive.writestr('format.txt', '1.1')
        archive.writestr('tables.txt', '\n'.join(self.get_tables()))
        archive.writestr('sequences.txt', '\n'.join(self.get_sequences()))
        archive.writestr('numranges.txt', '\n'.join(self.get_numranges()))
        versions = self._get_versions()
        versions_str = '\n'.join('%s %s' % (k, v)
                                 for k, v in versions)
        archive.writestr('versions.txt', versions_str)

    def write_sequence(self, archive, seq):
        """pickle the current state of sequence `seq` into the archive"""
        sql = self.dbhelper.sql_sequence_current_state(seq)
        columns, rows_iterator = self._get_cols_and_rows(sql)
        rows = list(rows_iterator)
        serialized = self._serialize(seq, columns, rows)
        archive.writestr('sequences/%s' % seq, serialized)

    def write_numrange(self, archive, numrange):
        """pickle the current state of numrange `numrange` into the archive"""
        sql = self.dbhelper.sql_numrange_current_state(numrange)
        columns, rows_iterator = self._get_cols_and_rows(sql)
        rows = list(rows_iterator)
        serialized = self._serialize(numrange, columns, rows)
        archive.writestr('numrange/%s' % numrange, serialized)

    def write_table(self, archive, table):
        """pickle the content of `table` into the archive, in chunks of
        `blocksize` rows to bound memory consumption
        """
        nb_lines_sql = 'SELECT COUNT(*) FROM %s' % table
        self.cursor.execute(nb_lines_sql)
        rowcount = self.cursor.fetchone()[0]
        sql = 'SELECT * FROM %s' % table
        columns, rows_iterator = self._get_cols_and_rows(sql)
        self.logger.info('number of rows: %d', rowcount)
        blocksize = self.blocksize
        if rowcount > 0:
            for i, start in enumerate(range(0, rowcount, blocksize)):
                rows = list(itertools.islice(rows_iterator, blocksize))
                serialized = self._serialize(table, columns, rows)
                archive.writestr('tables/%s.%04d' % (table, i), serialized)
                self.logger.debug('wrote rows %d to %d (out of %d) to %s.%04d',
                                  start, start+len(rows)-1,
                                  rowcount,
                                  table, i)
        else:
            # always write at least one (empty) chunk so restore finds the table
            rows = []
            serialized = self._serialize(table, columns, rows)
            archive.writestr('tables/%s.%04d' % (table, 0), serialized)

    def _get_cols_and_rows(self, sql):
        """execute `sql` and return (column names tuple, lazy rows iterator)"""
        process_result = self._source.iter_process_result
        self.cursor.execute(sql)
        columns = (d[0] for d in self.cursor.description)
        rows = process_result(self.cursor)
        return tuple(columns), rows

    def _serialize(self, name, columns, rows):
        """pickle one (name, columns, rows) chunk"""
        return pickle.dumps((name, columns, rows), pickle.HIGHEST_PROTOCOL)

    def restore(self, backupfile):
        """load the database content back from a zip archive produced by
        :meth:`backup`; existing rows of restored tables are deleted first
        """
        archive = zipfile.ZipFile(backupfile, 'r', allowZip64=True)
        self.cnx = self.get_connection()
        self.cursor = self.cnx.cursor()
        sequences, numranges, tables, table_chunks = self.read_metadata(archive, backupfile)
        for seq in sequences:
            self.logger.info('restoring sequence %s', seq)
            self.read_sequence(archive, seq)
        for numrange in numranges:
            self.logger.info('restoring numrange %s', numrange)
            self.read_numrange(archive, numrange)
        for table in tables:
            self.logger.info('restoring table %s', table)
            self.read_table(archive, table, sorted(table_chunks[table]))
        self.cnx.close()
        archive.close()
        self.logger.info('done')

    def read_metadata(self, archive, backupfile):
        """check the archive format and versions, and return the
        (sequences, numranges, tables, table_chunks) restore plan
        """
        formatinfo = archive.read('format.txt')
        self.logger.info('checking metadata')
        # NOTE(review): under Python 3, archive.read() returns bytes, so the
        # comparison with the str "1.1" would always fail — confirm py3 path
        if formatinfo.strip() != "1.1":
            self.logger.critical('Unsupported format in archive: %s', formatinfo)
            raise ValueError('Unknown format in %s: %s' % (backupfile, formatinfo))
        tables = archive.read('tables.txt').splitlines()
        sequences = archive.read('sequences.txt').splitlines()
        numranges = archive.read('numranges.txt').splitlines()
        file_versions = self._parse_versions(archive.read('versions.txt'))
        versions = set(self._get_versions())
        if file_versions != versions:
            self.logger.critical('Unable to restore : versions do not match')
            self.logger.critical('Expected:\n%s', '\n'.join('%s : %s' % (cube, ver)
                                                            for cube, ver in sorted(versions)))
            self.logger.critical('Found:\n%s', '\n'.join('%s : %s' % (cube, ver)
                                                         for cube, ver in sorted(file_versions)))
            raise ValueError('Unable to restore : versions do not match')
        # map each table name to its list of chunk filenames
        table_chunks = {}
        for name in archive.namelist():
            if not name.startswith('tables/'):
                continue
            filename = basename(name)
            tablename, _ext = filename.rsplit('.', 1)
            table_chunks.setdefault(tablename, []).append(name)
        return sequences, numranges, tables, table_chunks

    def read_sequence(self, archive, seq):
        """restart sequence `seq` at the value stored in the archive"""
        seqname, columns, rows = pickle.loads(archive.read('sequences/%s' % seq))
        assert seqname == seq
        assert len(rows) == 1
        assert len(rows[0]) == 1
        value = rows[0][0]
        sql = self.dbhelper.sql_restart_sequence(seq, value)
        self.cursor.execute(sql)
        self.cnx.commit()

    def read_numrange(self, archive, numrange):
        """restart numrange `numrange` at the value stored in the archive"""
        rangename, columns, rows = pickle.loads(archive.read('numrange/%s' % numrange))
        assert rangename == numrange
        assert len(rows) == 1
        assert len(rows[0]) == 1
        value = rows[0][0]
        sql = self.dbhelper.sql_restart_numrange(numrange, value)
        self.cursor.execute(sql)
        self.cnx.commit()

    def read_table(self, archive, table, filenames):
        """empty `table` then reload its rows from the given chunk files"""
        merge_args = self._source.merge_args
        self.cursor.execute('DELETE FROM %s' % table)
        self.cnx.commit()
        row_count = 0
        for filename in filenames:
            tablename, columns, rows = pickle.loads(archive.read(filename))
            assert tablename == table
            if not rows:
                continue
            # build the INSERT statement once per chunk from the first row
            insert = self.sql_generator.insert(table,
                                               dict(zip(columns, rows[0])))
            for row in rows:
                self.cursor.execute(insert, merge_args(dict(zip(columns, row)), {}))
            row_count += len(rows)
            self.cnx.commit()
        self.logger.info('inserted %d rows', row_count)


    def _parse_versions(self, version_str):
        """parse a versions.txt body into a set of (cube, version) tuples"""
        versions = set()
        for line in version_str.splitlines():
            versions.add(tuple(line.split()))
        return versions

    def _get_versions(self):
        """return the [(property key, version)] list of installed cube
        versions read from the CWProperty table
        """
        version_sql = 'SELECT cw_pkey, cw_value FROM cw_CWProperty'
        versions = []
        self.cursor.execute(version_sql)
        for pkey, value in self.cursor.fetchall():
            if pkey.startswith(u'system.version'):
                versions.append((pkey, value))
        return versions