cubicweb/server/sources/native.py
changeset 11057 0b59724cb3f2
parent 11005 f8417bd135ed
child 11087 35b29f1eb37a
equal deleted inserted replaced
11052:058bb3dc685f 11057:0b59724cb3f2
       
     1 # copyright 2003-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
       
     2 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
       
     3 #
       
     4 # This file is part of CubicWeb.
       
     5 #
       
     6 # CubicWeb is free software: you can redistribute it and/or modify it under the
       
     7 # terms of the GNU Lesser General Public License as published by the Free
       
     8 # Software Foundation, either version 2.1 of the License, or (at your option)
       
     9 # any later version.
       
    10 #
       
    11 # CubicWeb is distributed in the hope that it will be useful, but WITHOUT
       
    12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
       
    13 # FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
       
    14 # details.
       
    15 #
       
    16 # You should have received a copy of the GNU Lesser General Public License along
       
    17 # with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
       
    18 """Adapters for native cubicweb sources.
       
    19 
       
    20 Notes:
       
    21 * extid (aka external id, the primary key of an entity in the external source
       
    22   from which it comes from) are stored in a varchar column encoded as a base64
       
    23   string. This is because it should actually be Bytes but we want an index on
       
    24   it for fast querying.
       
    25 """
       
    26 from __future__ import print_function
       
    27 
       
    28 __docformat__ = "restructuredtext en"
       
    29 
       
    30 from threading import Lock
       
    31 from datetime import datetime
       
    32 from base64 import b64encode
       
    33 from contextlib import contextmanager
       
    34 from os.path import basename
       
    35 import re
       
    36 import itertools
       
    37 import zipfile
       
    38 import logging
       
    39 import sys
       
    40 
       
    41 from six import PY2, text_type, binary_type, string_types
       
    42 from six.moves import range, cPickle as pickle
       
    43 
       
    44 from logilab.common.decorators import cached, clear_cache
       
    45 from logilab.common.configuration import Method
       
    46 from logilab.common.shellutils import getlogin
       
    47 from logilab.database import get_db_helper, sqlgen
       
    48 
       
    49 from yams.schema import role_name
       
    50 
       
    51 from cubicweb import (UnknownEid, AuthenticationError, ValidationError, Binary,
       
    52                       UniqueTogetherError, UndoTransactionException, ViolatedConstraint)
       
    53 from cubicweb import transaction as tx, server, neg_role
       
    54 from cubicweb.utils import QueryCache
       
    55 from cubicweb.schema import VIRTUAL_RTYPES
       
    56 from cubicweb.cwconfig import CubicWebNoAppConfiguration
       
    57 from cubicweb.server import hook
       
    58 from cubicweb.server import schema2sql as y2sql
       
    59 from cubicweb.server.utils import crypt_password, eschema_eid, verify_and_update
       
    60 from cubicweb.server.sqlutils import SQL_PREFIX, SQLAdapterMixIn
       
    61 from cubicweb.server.rqlannotation import set_qdata
       
    62 from cubicweb.server.hook import CleanupDeletedEidsCacheOp
       
    63 from cubicweb.server.edition import EditedEntity
       
    64 from cubicweb.server.sources import AbstractSource, dbg_st_search, dbg_results
       
    65 from cubicweb.server.sources.rql2sql import SQLGenerator
       
    66 from cubicweb.statsd_logger import statsd_timeit
       
    67 
       
    68 
       
    69 ATTR_MAP = {}
       
    70 NONSYSTEM_ETYPES = set()
       
    71 NONSYSTEM_RELATIONS = set()
       
    72 
       
    73 class LogCursor(object):
       
    74     def __init__(self, cursor):
       
    75         self.cu = cursor
       
    76 
       
    77     def execute(self, query, args=None):
       
    78         """Execute a query.
       
    79         it's a function just so that it shows up in profiling
       
    80         """
       
    81         if server.DEBUG & server.DBG_SQL:
       
    82             print('exec', query, args)
       
    83         try:
       
    84             self.cu.execute(str(query), args)
       
    85         except Exception as ex:
       
    86             print("sql: %r\n args: %s\ndbms message: %r" % (
       
    87                 query, args, ex.args[0]))
       
    88             raise
       
    89 
       
    90     def fetchall(self):
       
    91         return self.cu.fetchall()
       
    92 
       
    93     def fetchone(self):
       
    94         return self.cu.fetchone()
       
    95 
       
    96 
       
    97 def sql_or_clauses(sql, clauses):
       
    98     select, restr = sql.split(' WHERE ', 1)
       
    99     restrclauses = restr.split(' AND ')
       
   100     for clause in clauses:
       
   101         restrclauses.remove(clause)
       
   102     if restrclauses:
       
   103         restr = '%s AND (%s)' % (' AND '.join(restrclauses),
       
   104                                  ' OR '.join(clauses))
       
   105     else:
       
   106         restr = '(%s)' % ' OR '.join(clauses)
       
   107     return '%s WHERE %s' % (select, restr)
       
   108 
       
   109 
       
   110 def rdef_table_column(rdef):
       
   111     """return table and column used to store the given relation definition in
       
   112     the database
       
   113     """
       
   114     return (SQL_PREFIX + str(rdef.subject),
       
   115             SQL_PREFIX + str(rdef.rtype))
       
   116 
       
   117 
       
   118 def rdef_physical_info(dbhelper, rdef):
       
   119     """return backend type and a boolean flag if NULL values should be allowed
       
   120     for a given relation definition
       
   121     """
       
   122     if not rdef.object.final:
       
   123         return dbhelper.TYPE_MAPPING['Int']
       
   124     coltype = y2sql.type_from_rdef(dbhelper, rdef, creating=False)
       
   125     allownull = rdef.cardinality[0] != '1'
       
   126     return coltype, allownull
       
   127 
       
   128 
       
   129 class _UndoException(Exception):
       
   130     """something went wrong during undoing"""
       
   131 
       
   132     def __unicode__(self):
       
   133         """Called by the unicode builtin; should return a Unicode object
       
   134 
       
   135         Type of _UndoException message must be `unicode` by design in CubicWeb.
       
   136         """
       
   137         assert isinstance(self.args[0], text_type)
       
   138         return self.args[0]
       
   139 
       
   140 
       
   141 def _undo_check_relation_target(tentity, rdef, role):
       
   142     """check linked entity has not been redirected for this relation"""
       
   143     card = rdef.role_cardinality(role)
       
   144     if card in '?1' and tentity.related(rdef.rtype, role):
       
   145         raise _UndoException(tentity._cw._(
       
   146             "Can't restore %(role)s relation %(rtype)s to entity %(eid)s which "
       
   147             "is already linked using this relation.")
       
   148                             % {'role': neg_role(role),
       
   149                                'rtype': rdef.rtype,
       
   150                                'eid': tentity.eid})
       
   151 
       
   152 def _undo_rel_info(cnx, subj, rtype, obj):
       
   153     entities = []
       
   154     for role, eid in (('subject', subj), ('object', obj)):
       
   155         try:
       
   156             entities.append(cnx.entity_from_eid(eid))
       
   157         except UnknownEid:
       
   158             raise _UndoException(cnx._(
       
   159                 "Can't restore relation %(rtype)s, %(role)s entity %(eid)s"
       
   160                 " doesn't exist anymore.")
       
   161                                 % {'role': cnx._(role),
       
   162                                    'rtype': cnx._(rtype),
       
   163                                    'eid': eid})
       
   164     sentity, oentity = entities
       
   165     try:
       
   166         rschema = cnx.vreg.schema.rschema(rtype)
       
   167         rdef = rschema.rdefs[(sentity.cw_etype, oentity.cw_etype)]
       
   168     except KeyError:
       
   169         raise _UndoException(cnx._(
       
   170             "Can't restore relation %(rtype)s between %(subj)s and "
       
   171             "%(obj)s, that relation does not exists anymore in the "
       
   172             "schema.")
       
   173                             % {'rtype': cnx._(rtype),
       
   174                                'subj': subj,
       
   175                                'obj': obj})
       
   176     return sentity, oentity, rdef
       
   177 
       
   178 def _undo_has_later_transaction(cnx, eid):
       
   179     return cnx.system_sql('''\
       
   180 SELECT T.tx_uuid FROM transactions AS TREF, transactions AS T
       
   181 WHERE TREF.tx_uuid='%(txuuid)s' AND T.tx_uuid!='%(txuuid)s'
       
   182 AND T.tx_time>=TREF.tx_time
       
   183 AND (EXISTS(SELECT 1 FROM tx_entity_actions AS TEA
       
   184             WHERE TEA.tx_uuid=T.tx_uuid AND TEA.eid=%(eid)s)
       
   185      OR EXISTS(SELECT 1 FROM tx_relation_actions as TRA
       
   186                WHERE TRA.tx_uuid=T.tx_uuid AND (
       
   187                    TRA.eid_from=%(eid)s OR TRA.eid_to=%(eid)s))
       
   188      )''' % {'txuuid': cnx.transaction_data['undoing_uuid'],
       
   189              'eid': eid}).fetchone()
       
   190 
       
   191 
       
   192 class DefaultEidGenerator(object):
       
   193     __slots__ = ('source', 'cnx', 'lock')
       
   194 
       
   195     def __init__(self, source):
       
   196         self.source = source
       
   197         self.cnx = None
       
   198         self.lock = Lock()
       
   199 
       
   200     def close(self):
       
   201         if self.cnx:
       
   202             self.cnx.close()
       
   203         self.cnx = None
       
   204 
       
   205     def create_eid(self, _cnx, count=1):
       
   206         # lock needed to prevent 'Connection is busy with results for another
       
   207         # command (0)' errors with SQLServer
       
   208         assert count > 0
       
   209         with self.lock:
       
   210             return self._create_eid(count)
       
   211 
       
   212     def _create_eid(self, count):
       
   213         # internal function doing the eid creation without locking.
       
   214         # needed for the recursive handling of disconnections (otherwise we
       
   215         # deadlock on self._eid_cnx_lock
       
   216         source = self.source
       
   217         if self.cnx is None:
       
   218             self.cnx = source.get_connection()
       
   219         cnx = self.cnx
       
   220         try:
       
   221             cursor = cnx.cursor()
       
   222             for sql in source.dbhelper.sqls_increment_numrange('entities_id_seq', count):
       
   223                 cursor.execute(sql)
       
   224             eid = cursor.fetchone()[0]
       
   225         except (source.OperationalError, source.InterfaceError):
       
   226             # FIXME: better detection of deconnection pb
       
   227             source.warning("trying to reconnect create eid connection")
       
   228             self.cnx = None
       
   229             return self._create_eid(count)
       
   230         except source.DbapiError as exc:
       
   231             # We get this one with pyodbc and SQL Server when connection was reset
       
   232             if exc.args[0] == '08S01':
       
   233                 source.warning("trying to reconnect create eid connection")
       
   234                 self.cnx = None
       
   235                 return self._create_eid(count)
       
   236             else:
       
   237                 raise
       
   238         except Exception: # WTF?
       
   239             cnx.rollback()
       
   240             self.cnx = None
       
   241             source.exception('create eid failed in an unforeseen way on SQL statement %s', sql)
       
   242             raise
       
   243         else:
       
   244             cnx.commit()
       
   245             return eid
       
   246 
       
   247 
       
   248 class SQLITEEidGenerator(object):
       
   249     __slots__ = ('source', 'lock')
       
   250 
       
   251     def __init__(self, source):
       
   252         self.source = source
       
   253         self.lock = Lock()
       
   254 
       
   255     def close(self):
       
   256         pass
       
   257 
       
   258     def create_eid(self, cnx, count=1):
       
   259         assert count > 0
       
   260         source = self.source
       
   261         with self.lock:
       
   262             for sql in source.dbhelper.sqls_increment_numrange('entities_id_seq', count):
       
   263                 cursor = source.doexec(cnx, sql)
       
   264             return cursor.fetchone()[0]
       
   265 
       
   266 
       
   267 class NativeSQLSource(SQLAdapterMixIn, AbstractSource):
       
   268     """adapter for source using the native cubicweb schema (see below)
       
   269     """
       
   270     sqlgen_class = SQLGenerator
       
   271     options = (
       
   272         ('db-driver',
       
   273          {'type' : 'string',
       
   274           'default': 'postgres',
       
   275           # XXX use choice type
       
   276           'help': 'database driver (postgres, sqlite, sqlserver2005)',
       
   277           'group': 'native-source', 'level': 0,
       
   278           }),
       
   279         ('db-host',
       
   280          {'type' : 'string',
       
   281           'default': '',
       
   282           'help': 'database host',
       
   283           'group': 'native-source', 'level': 1,
       
   284           }),
       
   285         ('db-port',
       
   286          {'type' : 'string',
       
   287           'default': '',
       
   288           'help': 'database port',
       
   289           'group': 'native-source', 'level': 1,
       
   290           }),
       
   291         ('db-name',
       
   292          {'type' : 'string',
       
   293           'default': Method('default_instance_id'),
       
   294           'help': 'database name',
       
   295           'group': 'native-source', 'level': 0,
       
   296           }),
       
   297         ('db-namespace',
       
   298          {'type' : 'string',
       
   299           'default': '',
       
   300           'help': 'database namespace (schema) name',
       
   301           'group': 'native-source', 'level': 1,
       
   302           }),
       
   303         ('db-user',
       
   304          {'type' : 'string',
       
   305           'default': CubicWebNoAppConfiguration.mode == 'user' and getlogin() or 'cubicweb',
       
   306           'help': 'database user',
       
   307           'group': 'native-source', 'level': 0,
       
   308           }),
       
   309         ('db-password',
       
   310          {'type' : 'password',
       
   311           'default': '',
       
   312           'help': 'database password',
       
   313           'group': 'native-source', 'level': 0,
       
   314           }),
       
   315         ('db-encoding',
       
   316          {'type' : 'string',
       
   317           'default': 'utf8',
       
   318           'help': 'database encoding',
       
   319           'group': 'native-source', 'level': 1,
       
   320           }),
       
   321         ('db-extra-arguments',
       
   322          {'type' : 'string',
       
   323           'default': '',
       
   324           'help': 'set to "Trusted_Connection" if you are using SQLServer and '
       
   325                   'want trusted authentication for the database connection',
       
   326           'group': 'native-source', 'level': 2,
       
   327           }),
       
   328         ('db-statement-timeout',
       
   329          {'type': 'int',
       
   330           'default': 0,
       
   331           'help': 'sql statement timeout, in milliseconds (postgres only)',
       
   332           'group': 'native-source', 'level': 2,
       
   333           }),
       
   334     )
       
   335 
       
   336     def __init__(self, repo, source_config, *args, **kwargs):
       
   337         SQLAdapterMixIn.__init__(self, source_config, repairing=repo.config.repairing)
       
   338         self.authentifiers = [LoginPasswordAuthentifier(self)]
       
   339         if repo.config['allow-email-login']:
       
   340             self.authentifiers.insert(0, EmailPasswordAuthentifier(self))
       
   341         AbstractSource.__init__(self, repo, source_config, *args, **kwargs)
       
   342         # sql generator
       
   343         self._rql_sqlgen = self.sqlgen_class(self.schema, self.dbhelper,
       
   344                                              ATTR_MAP.copy())
       
   345         # full text index helper
       
   346         self.do_fti = not repo.config['delay-full-text-indexation']
       
   347         # sql queries cache
       
   348         self._cache = QueryCache(repo.config['rql-cache-size'])
       
   349         # (etype, attr) / storage mapping
       
   350         self._storages = {}
       
   351         self.binary_to_str = self.dbhelper.dbapi_module.binary_to_str
       
   352         if self.dbdriver == 'sqlite':
       
   353             self.eid_generator = SQLITEEidGenerator(self)
       
   354         else:
       
   355             self.eid_generator = DefaultEidGenerator(self)
       
   356         self.create_eid = self.eid_generator.create_eid
       
   357 
       
   358     def check_config(self, source_entity):
       
   359         """check configuration of source entity"""
       
   360         if source_entity.host_config:
       
   361             msg = source_entity._cw._('the system source has its configuration '
       
   362                                       'stored on the file-system')
       
   363             raise ValidationError(source_entity.eid, {role_name('config', 'subject'): msg})
       
   364 
       
   365     def add_authentifier(self, authentifier):
       
   366         self.authentifiers.append(authentifier)
       
   367         authentifier.source = self
       
   368         authentifier.set_schema(self.schema)
       
   369 
       
   370     def reset_caches(self):
       
   371         """method called during test to reset potential source caches"""
       
   372         self._cache = QueryCache(self.repo.config['rql-cache-size'])
       
   373 
       
   374     def clear_eid_cache(self, eid, etype):
       
   375         """clear potential caches for the given eid"""
       
   376         self._cache.pop('Any X WHERE X eid %s, X is %s' % (eid, etype), None)
       
   377         self._cache.pop('Any X WHERE X eid %s' % eid, None)
       
   378         self._cache.pop('Any %s' % eid, None)
       
   379 
       
   380     @statsd_timeit
       
   381     def sqlexec(self, cnx, sql, args=None):
       
   382         """execute the query and return its result"""
       
   383         return self.process_result(self.doexec(cnx, sql, args))
       
   384 
       
   385     def init_creating(self, cnxset=None):
       
   386         # check full text index availibility
       
   387         if self.do_fti:
       
   388             if cnxset is None:
       
   389                 _cnxset = self.repo._get_cnxset()
       
   390             else:
       
   391                 _cnxset = cnxset
       
   392             if not self.dbhelper.has_fti_table(_cnxset.cu):
       
   393                 if not self.repo.config.creating:
       
   394                     self.critical('no text index table')
       
   395                 self.do_fti = False
       
   396             if cnxset is None:
       
   397                 _cnxset.cnxset_freed()
       
   398                 self.repo._free_cnxset(_cnxset)
       
   399 
       
   400     def backup(self, backupfile, confirm, format='native'):
       
   401         """method called to create a backup of the source's data"""
       
   402         if format == 'portable':
       
   403             # ensure the schema is the one stored in the database: if repository
       
   404             # started in quick_start mode, the file system's one has been loaded
       
   405             # so force reload
       
   406             if self.repo.config.quick_start:
       
   407                 self.repo.set_schema(self.repo.deserialize_schema(),
       
   408                                      resetvreg=False)
       
   409             helper = DatabaseIndependentBackupRestore(self)
       
   410             self.close_source_connections()
       
   411             try:
       
   412                 helper.backup(backupfile)
       
   413             finally:
       
   414                 self.open_source_connections()
       
   415         elif format == 'native':
       
   416             self.close_source_connections()
       
   417             try:
       
   418                 self.backup_to_file(backupfile, confirm)
       
   419             finally:
       
   420                 self.open_source_connections()
       
   421         else:
       
   422             raise ValueError('Unknown format %r' % format)
       
   423 
       
   424 
       
   425     def restore(self, backupfile, confirm, drop, format='native'):
       
   426         """method called to restore a backup of source's data"""
       
   427         if self.repo.config.init_cnxset_pool:
       
   428             self.close_source_connections()
       
   429         try:
       
   430             if format == 'portable':
       
   431                 helper = DatabaseIndependentBackupRestore(self)
       
   432                 helper.restore(backupfile)
       
   433             elif format == 'native':
       
   434                 self.restore_from_file(backupfile, confirm, drop=drop)
       
   435             else:
       
   436                 raise ValueError('Unknown format %r' % format)
       
   437         finally:
       
   438             if self.repo.config.init_cnxset_pool:
       
   439                 self.open_source_connections()
       
   440 
       
   441 
       
   442     def init(self, activated, source_entity):
       
   443         try:
       
   444             # test if 'asource' column exists
       
   445             query = self.dbhelper.sql_add_limit_offset('SELECT asource FROM entities', 1)
       
   446             source_entity._cw.system_sql(query)
       
   447         except Exception as ex:
       
   448             self.eid_type_source = self.eid_type_source_pre_131
       
   449         super(NativeSQLSource, self).init(activated, source_entity)
       
   450         self.init_creating(source_entity._cw.cnxset)
       
   451 
       
   452     def shutdown(self):
       
   453         self.eid_generator.close()
       
   454 
       
   455     # XXX deprecates [un]map_attribute?
       
   456     def map_attribute(self, etype, attr, cb, sourcedb=True):
       
   457         self._rql_sqlgen.attr_map[u'%s.%s' % (etype, attr)] = (cb, sourcedb)
       
   458 
       
   459     def unmap_attribute(self, etype, attr):
       
   460         self._rql_sqlgen.attr_map.pop(u'%s.%s' % (etype, attr), None)
       
   461 
       
   462     def set_storage(self, etype, attr, storage):
       
   463         storage_dict = self._storages.setdefault(etype, {})
       
   464         storage_dict[attr] = storage
       
   465         self.map_attribute(etype, attr,
       
   466                            storage.callback, storage.is_source_callback)
       
   467 
       
   468     def unset_storage(self, etype, attr):
       
   469         self._storages[etype].pop(attr)
       
   470         # if etype has no storage left, remove the entry
       
   471         if not self._storages[etype]:
       
   472             del self._storages[etype]
       
   473         self.unmap_attribute(etype, attr)
       
   474 
       
   475     def storage(self, etype, attr):
       
   476         """return the storage for the given entity type / attribute
       
   477         """
       
   478         try:
       
   479             return self._storages[etype][attr]
       
   480         except KeyError:
       
   481             raise Exception('no custom storage set for %s.%s' % (etype, attr))
       
   482 
       
   483     # ISource interface #######################################################
       
   484 
       
   485     @statsd_timeit
       
   486     def compile_rql(self, rql, sols):
       
   487         rqlst = self.repo.vreg.rqlhelper.parse(rql)
       
   488         rqlst.restricted_vars = ()
       
   489         rqlst.children[0].solutions = sols
       
   490         self.repo.querier.sqlgen_annotate(rqlst)
       
   491         set_qdata(self.schema.rschema, rqlst, ())
       
   492         return rqlst
       
   493 
       
   494     def set_schema(self, schema):
       
   495         """set the instance'schema"""
       
   496         self._cache = QueryCache(self.repo.config['rql-cache-size'])
       
   497         self.cache_hit, self.cache_miss, self.no_cache = 0, 0, 0
       
   498         self.schema = schema
       
   499         try:
       
   500             self._rql_sqlgen.schema = schema
       
   501         except AttributeError:
       
   502             pass # __init__
       
   503         for authentifier in self.authentifiers:
       
   504             authentifier.set_schema(self.schema)
       
   505         clear_cache(self, 'need_fti_indexation')
       
   506 
       
   507     def support_entity(self, etype, write=False):
       
   508         """return true if the given entity's type is handled by this adapter
       
   509         if write is true, return true only if it's a RW support
       
   510         """
       
   511         return not etype in NONSYSTEM_ETYPES
       
   512 
       
   513     def support_relation(self, rtype, write=False):
       
   514         """return true if the given relation's type is handled by this adapter
       
   515         if write is true, return true only if it's a RW support
       
   516         """
       
   517         if write:
       
   518             return not rtype in NONSYSTEM_RELATIONS
       
   519         # due to current multi-sources implementation, the system source
       
   520         # can't claim not supporting a relation
       
   521         return True #not rtype == 'content_for'
       
   522 
       
   523     @statsd_timeit
       
   524     def authenticate(self, cnx, login, **kwargs):
       
   525         """return CWUser eid for the given login and other authentication
       
   526         information found in kwargs, else raise `AuthenticationError`
       
   527         """
       
   528         for authentifier in self.authentifiers:
       
   529             try:
       
   530                 return authentifier.authenticate(cnx, login, **kwargs)
       
   531             except AuthenticationError:
       
   532                 continue
       
   533         raise AuthenticationError()
       
   534 
       
   535     def syntax_tree_search(self, cnx, union, args=None, cachekey=None,
       
   536                            varmap=None):
       
   537         """return result from this source for a rql query (actually from
       
   538         a rql syntax tree and a solution dictionary mapping each used
       
   539         variable to a possible type). If cachekey is given, the query
       
   540         necessary to fetch the results (but not the results themselves)
       
   541         may be cached using this key.
       
   542         """
       
   543         assert dbg_st_search(self.uri, union, varmap, args, cachekey)
       
   544         # remember number of actually selected term (sql generation may append some)
       
   545         if cachekey is None:
       
   546             self.no_cache += 1
       
   547             # generate sql query if we are able to do so (not supported types...)
       
   548             sql, qargs, cbs = self._rql_sqlgen.generate(union, args, varmap)
       
   549         else:
       
   550             # sql may be cached
       
   551             try:
       
   552                 sql, qargs, cbs = self._cache[cachekey]
       
   553                 self.cache_hit += 1
       
   554             except KeyError:
       
   555                 self.cache_miss += 1
       
   556                 sql, qargs, cbs = self._rql_sqlgen.generate(union, args, varmap)
       
   557                 self._cache[cachekey] = sql, qargs, cbs
       
   558         args = self.merge_args(args, qargs)
       
   559         assert isinstance(sql, string_types), repr(sql)
       
   560         cursor = self.doexec(cnx, sql, args)
       
   561         results = self.process_result(cursor, cnx, cbs)
       
   562         assert dbg_results(results)
       
   563         return results
       
   564 
       
   565     @contextmanager
       
   566     def _fixup_cw(self, cnx, entity):
       
   567         _cw = entity._cw
       
   568         entity._cw = cnx
       
   569         try:
       
   570             yield
       
   571         finally:
       
   572             entity._cw = _cw
       
   573 
       
   574     @contextmanager
       
   575     def _storage_handler(self, cnx, entity, event):
       
   576         # 1/ memorize values as they are before the storage is called.
       
   577         #    For instance, the BFSStorage will replace the `data`
       
   578         #    binary value with a Binary containing the destination path
       
   579         #    on the filesystem. To make the entity.data usage absolutely
       
   580         #    transparent, we'll have to reset entity.data to its binary
       
   581         #    value once the SQL query will be executed
       
   582         restore_values = []
       
   583         if isinstance(entity, list):
       
   584             entities = entity
       
   585         else:
       
   586             entities = [entity]
       
   587         etype = entities[0].__regid__
       
   588         for attr, storage in self._storages.get(etype, {}).items():
       
   589             for entity in entities:
       
   590                 with self._fixup_cw(cnx, entity):
       
   591                     if event == 'deleted':
       
   592                         storage.entity_deleted(entity, attr)
       
   593                     else:
       
   594                         edited = entity.cw_edited
       
   595                         if attr in edited:
       
   596                             handler = getattr(storage, 'entity_%s' % event)
       
   597                             to_restore = handler(entity, attr)
       
   598                             restore_values.append((entity, attr, to_restore))
       
   599         try:
       
   600             yield # 2/ execute the source's instructions
       
   601         finally:
       
   602             # 3/ restore original values
       
   603             for entity, attr, value in restore_values:
       
   604                 entity.cw_edited.edited_attribute(attr, value)
       
   605 
       
   606     def add_entity(self, cnx, entity):
       
   607         """add a new entity to the source"""
       
   608         with self._storage_handler(cnx, entity, 'added'):
       
   609             attrs = self.preprocess_entity(entity)
       
   610             sql = self.sqlgen.insert(SQL_PREFIX + entity.cw_etype, attrs)
       
   611             self.doexec(cnx, sql, attrs)
       
   612             if cnx.ertype_supports_undo(entity.cw_etype):
       
   613                 self._record_tx_action(cnx, 'tx_entity_actions', u'C',
       
   614                                        etype=text_type(entity.cw_etype), eid=entity.eid)
       
   615 
       
   616     def update_entity(self, cnx, entity):
       
   617         """replace an entity in the source"""
       
   618         with self._storage_handler(cnx, entity, 'updated'):
       
   619             attrs = self.preprocess_entity(entity)
       
   620             if cnx.ertype_supports_undo(entity.cw_etype):
       
   621                 changes = self._save_attrs(cnx, entity, attrs)
       
   622                 self._record_tx_action(cnx, 'tx_entity_actions', u'U',
       
   623                                        etype=text_type(entity.cw_etype), eid=entity.eid,
       
   624                                        changes=self._binary(pickle.dumps(changes)))
       
   625             sql = self.sqlgen.update(SQL_PREFIX + entity.cw_etype, attrs,
       
   626                                      ['cw_eid'])
       
   627             self.doexec(cnx, sql, attrs)
       
   628 
       
   629     def delete_entity(self, cnx, entity):
       
   630         """delete an entity from the source"""
       
   631         with self._storage_handler(cnx, entity, 'deleted'):
       
   632             if cnx.ertype_supports_undo(entity.cw_etype):
       
   633                 attrs = [SQL_PREFIX + r.type
       
   634                          for r in entity.e_schema.subject_relations()
       
   635                          if (r.final or r.inlined) and not r in VIRTUAL_RTYPES]
       
   636                 changes = self._save_attrs(cnx, entity, attrs)
       
   637                 self._record_tx_action(cnx, 'tx_entity_actions', u'D',
       
   638                                        etype=text_type(entity.cw_etype), eid=entity.eid,
       
   639                                        changes=self._binary(pickle.dumps(changes)))
       
   640             attrs = {'cw_eid': entity.eid}
       
   641             sql = self.sqlgen.delete(SQL_PREFIX + entity.cw_etype, attrs)
       
   642             self.doexec(cnx, sql, attrs)
       
   643 
       
   644     def add_relation(self, cnx, subject, rtype, object, inlined=False):
       
   645         """add a relation to the source"""
       
   646         self._add_relations(cnx,  rtype, [(subject, object)], inlined)
       
   647         if cnx.ertype_supports_undo(rtype):
       
   648             self._record_tx_action(cnx, 'tx_relation_actions', u'A',
       
   649                                    eid_from=subject, rtype=text_type(rtype), eid_to=object)
       
   650 
       
   651     def add_relations(self, cnx,  rtype, subj_obj_list, inlined=False):
       
   652         """add a relations to the source"""
       
   653         self._add_relations(cnx, rtype, subj_obj_list, inlined)
       
   654         if cnx.ertype_supports_undo(rtype):
       
   655             for subject, object in subj_obj_list:
       
   656                 self._record_tx_action(cnx, 'tx_relation_actions', u'A',
       
   657                                        eid_from=subject, rtype=text_type(rtype), eid_to=object)
       
   658 
       
   659     def _add_relations(self, cnx, rtype, subj_obj_list, inlined=False):
       
   660         """add a relation to the source"""
       
   661         sql = []
       
   662         if inlined is False:
       
   663             attrs = [{'eid_from': subject, 'eid_to': object}
       
   664                      for subject, object in subj_obj_list]
       
   665             sql.append((self.sqlgen.insert('%s_relation' % rtype, attrs[0]), attrs))
       
   666         else: # used by data import
       
   667             etypes = {}
       
   668             for subject, object in subj_obj_list:
       
   669                 etype = cnx.entity_metas(subject)['type']
       
   670                 if etype in etypes:
       
   671                     etypes[etype].append((subject, object))
       
   672                 else:
       
   673                     etypes[etype] = [(subject, object)]
       
   674             for subj_etype, subj_obj_list in etypes.items():
       
   675                 attrs = [{'cw_eid': subject, SQL_PREFIX + rtype: object}
       
   676                          for subject, object in subj_obj_list]
       
   677                 sql.append((self.sqlgen.update(SQL_PREFIX + etype, attrs[0],
       
   678                                      ['cw_eid']),
       
   679                             attrs))
       
   680         for statement, attrs in sql:
       
   681             self.doexecmany(cnx, statement, attrs)
       
   682 
       
   683     def delete_relation(self, cnx, subject, rtype, object):
       
   684         """delete a relation from the source"""
       
   685         rschema = self.schema.rschema(rtype)
       
   686         self._delete_relation(cnx, subject, rtype, object, rschema.inlined)
       
   687         if cnx.ertype_supports_undo(rtype):
       
   688             self._record_tx_action(cnx, 'tx_relation_actions', u'R',
       
   689                                    eid_from=subject, rtype=text_type(rtype), eid_to=object)
       
   690 
       
   691     def _delete_relation(self, cnx, subject, rtype, object, inlined=False):
       
   692         """delete a relation from the source"""
       
   693         if inlined:
       
   694             table = SQL_PREFIX + cnx.entity_metas(subject)['type']
       
   695             column = SQL_PREFIX + rtype
       
   696             sql = 'UPDATE %s SET %s=NULL WHERE %seid=%%(eid)s' % (table, column,
       
   697                                                                   SQL_PREFIX)
       
   698             attrs = {'eid' : subject}
       
   699         else:
       
   700             attrs = {'eid_from': subject, 'eid_to': object}
       
   701             sql = self.sqlgen.delete('%s_relation' % rtype, attrs)
       
   702         self.doexec(cnx, sql, attrs)
       
   703 
       
   704     @statsd_timeit
       
   705     def doexec(self, cnx, query, args=None, rollback=True):
       
   706         """Execute a query.
       
   707         it's a function just so that it shows up in profiling
       
   708         """
       
   709         cursor = cnx.cnxset.cu
       
   710         if server.DEBUG & server.DBG_SQL:
       
   711             print('exec', query, args, cnx.cnxset.cnx)
       
   712         try:
       
   713             # str(query) to avoid error if it's a unicode string
       
   714             cursor.execute(str(query), args)
       
   715         except Exception as ex:
       
   716             if self.repo.config.mode != 'test':
       
   717                 # during test we get those message when trying to alter sqlite
       
   718                 # db schema
       
   719                 self.info("sql: %r\n args: %s\ndbms message: %r",
       
   720                               query, args, ex.args[0])
       
   721             if rollback:
       
   722                 try:
       
   723                     cnx.cnxset.rollback()
       
   724                     if self.repo.config.mode != 'test':
       
   725                         self.debug('transaction has been rolled back')
       
   726                 except Exception as ex:
       
   727                     pass
       
   728             if ex.__class__.__name__ == 'IntegrityError':
       
   729                 # need string comparison because of various backends
       
   730                 for arg in ex.args:
       
   731                     # postgres, sqlserver
       
   732                     mo = re.search("unique_[a-z0-9]{32}", arg)
       
   733                     if mo is not None:
       
   734                         raise UniqueTogetherError(cnx, cstrname=mo.group(0))
       
   735                     # old sqlite
       
   736                     mo = re.search('columns? (.*) (?:is|are) not unique', arg)
       
   737                     if mo is not None: # sqlite in use
       
   738                         # we left chop the 'cw_' prefix of attribute names
       
   739                         rtypes = [c.strip()[3:]
       
   740                                   for c in mo.group(1).split(',')]
       
   741                         raise UniqueTogetherError(cnx, rtypes=rtypes)
       
   742                     # sqlite after http://www.sqlite.org/cgi/src/info/c80e229dd9c1230a
       
   743                     if arg.startswith('UNIQUE constraint failed:'):
       
   744                         # message looks like: "UNIQUE constraint failed: foo.cw_bar, foo.cw_baz"
       
   745                         # so drop the prefix, split on comma, drop the tablenames, and drop "cw_"
       
   746                         columns = arg.split(':', 1)[1].split(',')
       
   747                         rtypes = [c.split('.', 1)[1].strip()[3:] for c in columns]
       
   748                         raise UniqueTogetherError(cnx, rtypes=rtypes)
       
   749 
       
   750                     mo = re.search('"cstr[a-f0-9]{32}"', arg)
       
   751                     if mo is not None:
       
   752                         # postgresql
       
   753                         raise ViolatedConstraint(cnx, cstrname=mo.group(0)[1:-1])
       
   754                     if arg.startswith('CHECK constraint failed:'):
       
   755                         # sqlite3 (new)
       
   756                         raise ViolatedConstraint(cnx, cstrname=arg.split(':', 1)[1].strip())
       
   757                     mo = re.match('^constraint (cstr.*) failed$', arg)
       
   758                     if mo is not None:
       
   759                         # sqlite3 (old)
       
   760                         raise ViolatedConstraint(cnx, cstrname=mo.group(1))
       
   761             raise
       
   762         return cursor
       
   763 
       
   764     @statsd_timeit
       
   765     def doexecmany(self, cnx, query, args):
       
   766         """Execute a query.
       
   767         it's a function just so that it shows up in profiling
       
   768         """
       
   769         if server.DEBUG & server.DBG_SQL:
       
   770             print('execmany', query, 'with', len(args), 'arguments', cnx.cnxset.cnx)
       
   771         cursor = cnx.cnxset.cu
       
   772         try:
       
   773             # str(query) to avoid error if it's a unicode string
       
   774             cursor.executemany(str(query), args)
       
   775         except Exception as ex:
       
   776             if self.repo.config.mode != 'test':
       
   777                 # during test we get those message when trying to alter sqlite
       
   778                 # db schema
       
   779                 self.critical("sql many: %r\n args: %s\ndbms message: %r",
       
   780                               query, args, ex.args[0])
       
   781             try:
       
   782                 cnx.cnxset.rollback()
       
   783                 if self.repo.config.mode != 'test':
       
   784                     self.critical('transaction has been rolled back')
       
   785             except Exception:
       
   786                 pass
       
   787             raise
       
   788 
       
   789     # short cut to method requiring advanced db helper usage ##################
       
   790 
       
   791     def update_rdef_column(self, cnx, rdef):
       
   792         """update physical column for a relation definition (final or inlined)
       
   793         """
       
   794         table, column = rdef_table_column(rdef)
       
   795         coltype, allownull = rdef_physical_info(self.dbhelper, rdef)
       
   796         if not self.dbhelper.alter_column_support:
       
   797             self.error("backend can't alter %s.%s to %s%s", table, column, coltype,
       
   798                        not allownull and 'NOT NULL' or '')
       
   799             return
       
   800         self.dbhelper.change_col_type(LogCursor(cnx.cnxset.cu),
       
   801                                       table, column, coltype, allownull)
       
   802         self.info('altered %s.%s: now %s%s', table, column, coltype,
       
   803                   not allownull and 'NOT NULL' or '')
       
   804 
       
   805     def update_rdef_null_allowed(self, cnx, rdef):
       
   806         """update NULL / NOT NULL of physical column for a relation definition
       
   807         (final or inlined)
       
   808         """
       
   809         if not self.dbhelper.alter_column_support:
       
   810             # not supported (and NOT NULL not set by yams in that case, so no
       
   811             # worry)
       
   812             return
       
   813         table, column = rdef_table_column(rdef)
       
   814         coltype, allownull = rdef_physical_info(self.dbhelper, rdef)
       
   815         self.dbhelper.set_null_allowed(LogCursor(cnx.cnxset.cu),
       
   816                                        table, column, coltype, allownull)
       
   817 
       
   818     def update_rdef_indexed(self, cnx, rdef):
       
   819         table, column = rdef_table_column(rdef)
       
   820         if rdef.indexed:
       
   821             self.create_index(cnx, table, column)
       
   822         else:
       
   823             self.drop_index(cnx, table, column)
       
   824 
       
   825     def update_rdef_unique(self, cnx, rdef):
       
   826         table, column = rdef_table_column(rdef)
       
   827         if rdef.constraint_by_type('UniqueConstraint'):
       
   828             self.create_index(cnx, table, column, unique=True)
       
   829         else:
       
   830             self.drop_index(cnx, table, column, unique=True)
       
   831 
       
   832     def create_index(self, cnx, table, column, unique=False):
       
   833         cursor = LogCursor(cnx.cnxset.cu)
       
   834         self.dbhelper.create_index(cursor, table, column, unique)
       
   835 
       
   836     def drop_index(self, cnx, table, column, unique=False):
       
   837         cursor = LogCursor(cnx.cnxset.cu)
       
   838         self.dbhelper.drop_index(cursor, table, column, unique)
       
   839 
       
   840     # system source interface #################################################
       
   841 
       
   842     def _eid_type_source(self, cnx, eid, sql):
       
   843         try:
       
   844             res = self.doexec(cnx, sql).fetchone()
       
   845             if res is not None:
       
   846                 return res
       
   847         except Exception:
       
   848             self.exception('failed to query entities table for eid %s', eid)
       
   849         raise UnknownEid(eid)
       
   850 
       
   851     def eid_type_source(self, cnx, eid): # pylint: disable=E0202
       
   852         """return a tuple (type, extid, source) for the entity with id <eid>"""
       
   853         sql = 'SELECT type, extid, asource FROM entities WHERE eid=%s' % eid
       
   854         res = self._eid_type_source(cnx, eid, sql)
       
   855         if not isinstance(res, list):
       
   856             res = list(res)
       
   857         res[-2] = self.decode_extid(res[-2])
       
   858         return res
       
   859 
       
   860     def eid_type_source_pre_131(self, cnx, eid):
       
   861         """return a tuple (type, extid, source) for the entity with id <eid>"""
       
   862         sql = 'SELECT type, extid FROM entities WHERE eid=%s' % eid
       
   863         res = self._eid_type_source(cnx, eid, sql)
       
   864         if not isinstance(res, list):
       
   865             res = list(res)
       
   866         res[-1] = self.decode_extid(res[-1])
       
   867         res.append("system")
       
   868         return res
       
   869 
       
   870     def extid2eid(self, cnx, extid):
       
   871         """get eid from an external id. Return None if no record found."""
       
   872         assert isinstance(extid, binary_type)
       
   873         args = {'x': b64encode(extid).decode('ascii')}
       
   874         cursor = self.doexec(cnx,
       
   875                              'SELECT eid FROM entities WHERE extid=%(x)s',
       
   876                              args)
       
   877         # XXX testing rowcount cause strange bug with sqlite, results are there
       
   878         #     but rowcount is 0
       
   879         #if cursor.rowcount > 0:
       
   880         try:
       
   881             result = cursor.fetchone()
       
   882             if result:
       
   883                 return result[0]
       
   884         except Exception:
       
   885             pass
       
   886         cursor = self.doexec(cnx,
       
   887                              'SELECT eid FROM moved_entities WHERE extid=%(x)s',
       
   888                              args)
       
   889         try:
       
   890             result = cursor.fetchone()
       
   891             if result:
       
   892                 # entity was moved to the system source, return negative
       
   893                 # number to tell the external source to ignore it
       
   894                 return -result[0]
       
   895         except Exception:
       
   896             pass
       
   897         return None
       
   898 
       
   899     def _handle_is_relation_sql(self, cnx, sql, attrs):
       
   900         """ Handler for specific is_relation sql that may be
       
   901         overwritten in some stores"""
       
   902         self.doexec(cnx, sql % attrs)
       
   903 
       
    # Class-level aliases: the entity insertion handler is plain `doexec`,
    # while the is_instance_of / source relation handlers are the same
    # function as `_handle_is_relation_sql`.
    _handle_insert_entity_sql = doexec
    _handle_is_instance_of_sql = _handle_source_relation_sql = _handle_is_relation_sql
       
   906 
       
   907     def add_info(self, cnx, entity, source, extid):
       
   908         """add type and source info for an eid into the system table"""
       
   909         assert cnx.cnxset is not None
       
   910         # begin by inserting eid/type/source/extid into the entities table
       
   911         if extid is not None:
       
   912             assert isinstance(extid, binary_type)
       
   913             extid = b64encode(extid).decode('ascii')
       
   914         attrs = {'type': text_type(entity.cw_etype), 'eid': entity.eid, 'extid': extid,
       
   915                  'asource': text_type(source.uri)}
       
   916         self._handle_insert_entity_sql(cnx, self.sqlgen.insert('entities', attrs), attrs)
       
   917         # insert core relations: is, is_instance_of and cw_source
       
   918         try:
       
   919             self._handle_is_relation_sql(cnx, 'INSERT INTO is_relation(eid_from,eid_to) VALUES (%s,%s)',
       
   920                                          (entity.eid, eschema_eid(cnx, entity.e_schema)))
       
   921         except IndexError:
       
   922             # during schema serialization, skip
       
   923             pass
       
   924         else:
       
   925             for eschema in entity.e_schema.ancestors() + [entity.e_schema]:
       
   926                 self._handle_is_relation_sql(cnx,
       
   927                                              'INSERT INTO is_instance_of_relation(eid_from,eid_to) VALUES (%s,%s)',
       
   928                                              (entity.eid, eschema_eid(cnx, eschema)))
       
   929         if 'CWSource' in self.schema and source.eid is not None: # else, cw < 3.10
       
   930             self._handle_is_relation_sql(cnx, 'INSERT INTO cw_source_relation(eid_from,eid_to) VALUES (%s,%s)',
       
   931                                          (entity.eid, source.eid))
       
   932         # now we can update the full text index
       
   933         if self.need_fti_indexation(entity.cw_etype):
       
   934             self.index_entity(cnx, entity=entity)
       
   935 
       
   936     def update_info(self, cnx, entity, need_fti_update):
       
   937         """mark entity as being modified, fulltext reindex if needed"""
       
   938         if need_fti_update:
       
   939             # reindex the entity only if this query is updating at least
       
   940             # one indexable attribute
       
   941             self.index_entity(cnx, entity=entity)
       
   942 
       
   943     def delete_info_multi(self, cnx, entities):
       
   944         """delete system information on deletion of a list of entities with the
       
   945         same etype and belinging to the same source
       
   946 
       
   947         * update the fti
       
   948         * remove record from the `entities` table
       
   949         """
       
   950         self.fti_unindex_entities(cnx, entities)
       
   951         attrs = {'eid': '(%s)' % ','.join([str(_e.eid) for _e in entities])}
       
   952         self.doexec(cnx, self.sqlgen.delete_many('entities', attrs), attrs)
       
   953 
       
   954     # undo support #############################################################
       
   955 
       
   956     def undoable_transactions(self, cnx, ueid=None, **actionfilters):
       
   957         """See :class:`cubicweb.repoapi.Connection.undoable_transactions`"""
       
   958         # force filtering to connection's user if not a manager
       
   959         if not cnx.user.is_in_group('managers'):
       
   960             ueid = cnx.user.eid
       
   961         restr = {}
       
   962         if ueid is not None:
       
   963             restr['tx_user'] = ueid
       
   964         sql = self.sqlgen.select('transactions', restr, ('tx_uuid', 'tx_time', 'tx_user'))
       
   965         if actionfilters:
       
   966             # we will need subqueries to filter transactions according to
       
   967             # actions done
       
   968             tearestr = {} # filters on the tx_entity_actions table
       
   969             trarestr = {} # filters on the tx_relation_actions table
       
   970             genrestr = {} # generic filters, appliyable to both table
       
   971             # unless public explicitly set to false, we only consider public
       
   972             # actions
       
   973             if actionfilters.pop('public', True):
       
   974                 genrestr['txa_public'] = True
       
   975             # put additional filters in trarestr and/or tearestr
       
   976             for key, val in actionfilters.items():
       
   977                 if key == 'etype':
       
   978                     # filtering on etype implies filtering on entity actions
       
   979                     # only, and with no eid specified
       
   980                     assert actionfilters.get('action', 'C') in 'CUD'
       
   981                     assert not 'eid' in actionfilters
       
   982                     tearestr['etype'] = text_type(val)
       
   983                 elif key == 'eid':
       
   984                     # eid filter may apply to 'eid' of tx_entity_actions or to
       
   985                     # 'eid_from' OR 'eid_to' of tx_relation_actions
       
   986                     if actionfilters.get('action', 'C') in 'CUD':
       
   987                         tearestr['eid'] = val
       
   988                     if actionfilters.get('action', 'A') in 'AR':
       
   989                         trarestr['eid_from'] = val
       
   990                         trarestr['eid_to'] = val
       
   991                 elif key == 'action':
       
   992                     if val in 'CUD':
       
   993                         tearestr['txa_action'] = text_type(val)
       
   994                     else:
       
   995                         assert val in 'AR'
       
   996                         trarestr['txa_action'] = text_type(val)
       
   997                 else:
       
   998                     raise AssertionError('unknow filter %s' % key)
       
   999             assert trarestr or tearestr, "can't only filter on 'public'"
       
  1000             subqsqls = []
       
  1001             # append subqueries to the original query, using EXISTS()
       
  1002             if trarestr or (genrestr and not tearestr):
       
  1003                 trarestr.update(genrestr)
       
  1004                 trasql = self.sqlgen.select('tx_relation_actions', trarestr, ('1',))
       
  1005                 if 'eid_from' in trarestr:
       
  1006                     # replace AND by OR between eid_from/eid_to restriction
       
  1007                     trasql = sql_or_clauses(trasql, ['eid_from = %(eid_from)s',
       
  1008                                                      'eid_to = %(eid_to)s'])
       
  1009                 trasql += ' AND transactions.tx_uuid=tx_relation_actions.tx_uuid'
       
  1010                 subqsqls.append('EXISTS(%s)' % trasql)
       
  1011             if tearestr or (genrestr and not trarestr):
       
  1012                 tearestr.update(genrestr)
       
  1013                 teasql = self.sqlgen.select('tx_entity_actions', tearestr, ('1',))
       
  1014                 teasql += ' AND transactions.tx_uuid=tx_entity_actions.tx_uuid'
       
  1015                 subqsqls.append('EXISTS(%s)' % teasql)
       
  1016             if restr:
       
  1017                 sql += ' AND %s' % ' OR '.join(subqsqls)
       
  1018             else:
       
  1019                 sql += ' WHERE %s' % ' OR '.join(subqsqls)
       
  1020             restr.update(trarestr)
       
  1021             restr.update(tearestr)
       
  1022         # we want results ordered by transaction's time descendant
       
  1023         sql += ' ORDER BY tx_time DESC'
       
  1024         cu = self.doexec(cnx, sql, restr)
       
  1025         # turn results into transaction objects
       
  1026         return [tx.Transaction(cnx, *args) for args in cu.fetchall()]
       
  1027 
       
  1028     def tx_info(self, cnx, txuuid):
       
  1029         """See :class:`cubicweb.repoapi.Connection.transaction_info`"""
       
  1030         return tx.Transaction(cnx, txuuid, *self._tx_info(cnx, text_type(txuuid)))
       
  1031 
       
  1032     def tx_actions(self, cnx, txuuid, public):
       
  1033         """See :class:`cubicweb.repoapi.Connection.transaction_actions`"""
       
  1034         txuuid = text_type(txuuid)
       
  1035         self._tx_info(cnx, txuuid)
       
  1036         restr = {'tx_uuid': txuuid}
       
  1037         if public:
       
  1038             restr['txa_public'] = True
       
  1039         # XXX use generator to avoid loading everything in memory?
       
  1040         sql = self.sqlgen.select('tx_entity_actions', restr,
       
  1041                                  ('txa_action', 'txa_public', 'txa_order',
       
  1042                                   'etype', 'eid', 'changes'))
       
  1043         with cnx.ensure_cnx_set:
       
  1044             cu = self.doexec(cnx, sql, restr)
       
  1045             actions = [tx.EntityAction(a,p,o,et,e,c and pickle.loads(self.binary_to_str(c)))
       
  1046                        for a,p,o,et,e,c in cu.fetchall()]
       
  1047         sql = self.sqlgen.select('tx_relation_actions', restr,
       
  1048                                  ('txa_action', 'txa_public', 'txa_order',
       
  1049                                   'rtype', 'eid_from', 'eid_to'))
       
  1050         with cnx.ensure_cnx_set:
       
  1051             cu = self.doexec(cnx, sql, restr)
       
  1052             actions += [tx.RelationAction(*args) for args in cu.fetchall()]
       
  1053         return sorted(actions, key=lambda x: x.order)
       
  1054 
       
  1055     def undo_transaction(self, cnx, txuuid):
       
  1056         """See :class:`cubicweb.repoapi.Connection.undo_transaction`
       
  1057 
       
  1058         important note: while undoing of a transaction, only hooks in the
       
  1059         'integrity', 'activeintegrity' and 'undo' categories are called.
       
  1060         """
       
  1061         errors = []
       
  1062         cnx.transaction_data['undoing_uuid'] = txuuid
       
  1063         with cnx.deny_all_hooks_but('integrity', 'activeintegrity', 'undo'):
       
  1064             with cnx.security_enabled(read=False):
       
  1065                 for action in reversed(self.tx_actions(cnx, txuuid, False)):
       
  1066                     undomethod = getattr(self, '_undo_%s' % action.action.lower())
       
  1067                     errors += undomethod(cnx, action)
       
  1068         # remove the transactions record
       
  1069         self.doexec(cnx,
       
  1070                     "DELETE FROM transactions WHERE tx_uuid='%s'" % txuuid)
       
  1071         if errors:
       
  1072             raise UndoTransactionException(txuuid, errors)
       
  1073         else:
       
  1074             return
       
  1075 
       
  1076     def start_undoable_transaction(self, cnx, uuid):
       
  1077         """connection callback to insert a transaction record in the transactions
       
  1078         table when some undoable transaction is started
       
  1079         """
       
  1080         ueid = cnx.user.eid
       
  1081         attrs = {'tx_uuid': uuid, 'tx_user': ueid, 'tx_time': datetime.utcnow()}
       
  1082         self.doexec(cnx, self.sqlgen.insert('transactions', attrs), attrs)
       
  1083 
       
  1084     def _save_attrs(self, cnx, entity, attrs):
       
  1085         """return a pickleable dictionary containing current values for given
       
  1086         attributes of the entity
       
  1087         """
       
  1088         restr = {'cw_eid': entity.eid}
       
  1089         sql = self.sqlgen.select(SQL_PREFIX + entity.cw_etype, restr, attrs)
       
  1090         cu = self.doexec(cnx, sql, restr)
       
  1091         values = dict(zip(attrs, cu.fetchone()))
       
  1092         # ensure backend specific binary are converted back to string
       
  1093         eschema = entity.e_schema
       
  1094         for column in attrs:
       
  1095             # [3:] remove 'cw_' prefix
       
  1096             attr = column[3:]
       
  1097             if not eschema.subjrels[attr].final:
       
  1098                 continue
       
  1099             if eschema.destination(attr) in ('Password', 'Bytes'):
       
  1100                 value = values[column]
       
  1101                 if value is not None:
       
  1102                     values[column] = self.binary_to_str(value)
       
  1103         return values
       
  1104 
       
  1105     def _record_tx_action(self, cnx, table, action, **kwargs):
       
  1106         """record a transaction action in the given table (either
       
  1107         'tx_entity_actions' or 'tx_relation_action')
       
  1108         """
       
  1109         kwargs['tx_uuid'] = cnx.transaction_uuid()
       
  1110         kwargs['txa_action'] = action
       
  1111         kwargs['txa_order'] = cnx.transaction_inc_action_counter()
       
  1112         kwargs['txa_public'] = not cnx.hooks_in_progress
       
  1113         self.doexec(cnx, self.sqlgen.insert(table, kwargs), kwargs)
       
  1114 
       
  1115     def _tx_info(self, cnx, txuuid):
       
  1116         """return transaction's time and user of the transaction with the given uuid.
       
  1117 
       
  1118         raise `NoSuchTransaction` if there is no such transaction of if the
       
  1119         connection's user isn't allowed to see it.
       
  1120         """
       
  1121         restr = {'tx_uuid': txuuid}
       
  1122         sql = self.sqlgen.select('transactions', restr,
       
  1123                                  ('tx_time', 'tx_user'))
       
  1124         cu = self.doexec(cnx, sql, restr)
       
  1125         try:
       
  1126             time, ueid = cu.fetchone()
       
  1127         except TypeError:
       
  1128             raise tx.NoSuchTransaction(txuuid)
       
  1129         if not (cnx.user.is_in_group('managers')
       
  1130                 or cnx.user.eid == ueid):
       
  1131             raise tx.NoSuchTransaction(txuuid)
       
  1132         return time, ueid
       
  1133 
       
  1134     def _reedit_entity(self, entity, changes, err):
       
  1135         cnx = entity._cw
       
  1136         eid = entity.eid
       
  1137         entity.cw_edited = edited = EditedEntity(entity)
       
  1138         # check for schema changes, entities linked through inlined relation
       
  1139         # still exists, rewrap binary values
       
  1140         eschema = entity.e_schema
       
  1141         getrschema = eschema.subjrels
       
  1142         for column, value in changes.items():
       
  1143             rtype = column[len(SQL_PREFIX):]
       
  1144             if rtype == "eid":
       
  1145                 continue # XXX should even `eid` be stored in action changes?
       
  1146             try:
       
  1147                 rschema = getrschema[rtype]
       
  1148             except KeyError:
       
  1149                 err(cnx._("can't restore relation %(rtype)s of entity %(eid)s, "
       
  1150                               "this relation does not exist in the schema anymore.")
       
  1151                     % {'rtype': rtype, 'eid': eid})
       
  1152             if not rschema.final:
       
  1153                 if not rschema.inlined:
       
  1154                     assert value is None
       
  1155                 # rschema is an inlined relation
       
  1156                 elif value is not None:
       
  1157                     # not a deletion: we must put something in edited
       
  1158                     try:
       
  1159                         entity._cw.entity_from_eid(value) # check target exists
       
  1160                         edited[rtype] = value
       
  1161                     except UnknownEid:
       
  1162                         err(cnx._("can't restore entity %(eid)s of type %(eschema)s, "
       
  1163                                       "target of %(rtype)s (eid %(value)s) does not exist any longer")
       
  1164                             % locals())
       
  1165                         changes[column] = None
       
  1166             elif eschema.destination(rtype) in ('Bytes', 'Password'):
       
  1167                 changes[column] = self._binary(value)
       
  1168                 edited[rtype] = Binary(value)
       
  1169             elif PY2 and isinstance(value, str):
       
  1170                 edited[rtype] = text_type(value, cnx.encoding, 'replace')
       
  1171             else:
       
  1172                 edited[rtype] = value
       
  1173         # This must only be done after init_entitiy_caches : defered in calling functions
       
  1174         # edited.check()
       
  1175 
       
  1176     def _undo_d(self, cnx, action):
       
  1177         """undo an entity deletion"""
       
  1178         errors = []
       
  1179         err = errors.append
       
  1180         eid = action.eid
       
  1181         etype = action.etype
       
  1182         _ = cnx._
       
  1183         # get an entity instance
       
  1184         try:
       
  1185             entity = self.repo.vreg['etypes'].etype_class(etype)(cnx)
       
  1186         except Exception:
       
  1187             err("can't restore entity %s of type %s, type no more supported"
       
  1188                 % (eid, etype))
       
  1189             return errors
       
  1190         self._reedit_entity(entity, action.changes, err)
       
  1191         entity.eid = eid
       
  1192         cnx.repo.init_entity_caches(cnx, entity, self)
       
  1193         entity.cw_edited.check()
       
  1194         self.repo.hm.call_hooks('before_add_entity', cnx, entity=entity)
       
  1195         # restore the entity
       
  1196         action.changes['cw_eid'] = eid
       
  1197         # restore record in entities (will update fti if needed)
       
  1198         self.add_info(cnx, entity, self, None)
       
  1199         sql = self.sqlgen.insert(SQL_PREFIX + etype, action.changes)
       
  1200         self.doexec(cnx, sql, action.changes)
       
  1201         self.repo.hm.call_hooks('after_add_entity', cnx, entity=entity)
       
  1202         return errors
       
  1203 
       
  1204     def _undo_r(self, cnx, action):
       
  1205         """undo a relation removal"""
       
  1206         errors = []
       
  1207         subj, rtype, obj = action.eid_from, action.rtype, action.eid_to
       
  1208         try:
       
  1209             sentity, oentity, rdef = _undo_rel_info(cnx, subj, rtype, obj)
       
  1210         except _UndoException as ex:
       
  1211             errors.append(text_type(ex))
       
  1212         else:
       
  1213             for role, entity in (('subject', sentity),
       
  1214                                  ('object', oentity)):
       
  1215                 try:
       
  1216                     _undo_check_relation_target(entity, rdef, role)
       
  1217                 except _UndoException as ex:
       
  1218                     errors.append(text_type(ex))
       
  1219                     continue
       
  1220         if not errors:
       
  1221             self.repo.hm.call_hooks('before_add_relation', cnx,
       
  1222                                     eidfrom=subj, rtype=rtype, eidto=obj)
       
  1223             # add relation in the database
       
  1224             self._add_relations(cnx, rtype, [(subj, obj)], rdef.rtype.inlined)
       
  1225             # set related cache
       
  1226             cnx.update_rel_cache_add(subj, rtype, obj, rdef.rtype.symmetric)
       
  1227             self.repo.hm.call_hooks('after_add_relation', cnx,
       
  1228                                     eidfrom=subj, rtype=rtype, eidto=obj)
       
  1229         return errors
       
  1230 
       
  1231     def _undo_c(self, cnx, action):
       
  1232         """undo an entity creation"""
       
  1233         eid = action.eid
       
  1234         # XXX done to avoid fetching all remaining relation for the entity
       
  1235         # we should find an efficient way to do this (keeping current veolidf
       
  1236         # massive deletion performance)
       
  1237         if _undo_has_later_transaction(cnx, eid):
       
  1238             msg = cnx._('some later transaction(s) touch entity, undo them '
       
  1239                             'first')
       
  1240             raise ValidationError(eid, {None: msg})
       
  1241         etype = action.etype
       
  1242         # get an entity instance
       
  1243         try:
       
  1244             entity = self.repo.vreg['etypes'].etype_class(etype)(cnx)
       
  1245         except Exception:
       
  1246             return [cnx._(
       
  1247                 "Can't undo creation of entity %(eid)s of type %(etype)s, type "
       
  1248                 "no more supported" % {'eid': eid, 'etype': etype})]
       
  1249         entity.eid = eid
       
  1250         # for proper eid/type cache update
       
  1251         CleanupDeletedEidsCacheOp.get_instance(cnx).add_data(eid)
       
  1252         self.repo.hm.call_hooks('before_delete_entity', cnx, entity=entity)
       
  1253         # remove is / is_instance_of which are added using sql by hooks, hence
       
  1254         # unvisible as transaction action
       
  1255         self.doexec(cnx, 'DELETE FROM is_relation WHERE eid_from=%s' % eid)
       
  1256         self.doexec(cnx, 'DELETE FROM is_instance_of_relation WHERE eid_from=%s' % eid)
       
  1257         self.doexec(cnx, 'DELETE FROM cw_source_relation WHERE eid_from=%s' % eid)
       
  1258         # XXX check removal of inlined relation?
       
  1259         # delete the entity
       
  1260         attrs = {'cw_eid': eid}
       
  1261         sql = self.sqlgen.delete(SQL_PREFIX + entity.cw_etype, attrs)
       
  1262         self.doexec(cnx, sql, attrs)
       
  1263         # remove record from entities (will update fti if needed)
       
  1264         self.delete_info_multi(cnx, [entity])
       
  1265         self.repo.hm.call_hooks('after_delete_entity', cnx, entity=entity)
       
  1266         return ()
       
  1267 
       
  1268     def _undo_u(self, cnx, action):
       
  1269         """undo an entity update"""
       
  1270         errors = []
       
  1271         err = errors.append
       
  1272         try:
       
  1273             entity = cnx.entity_from_eid(action.eid)
       
  1274         except UnknownEid:
       
  1275             err(cnx._("can't restore state of entity %s, it has been "
       
  1276                           "deleted inbetween") % action.eid)
       
  1277             return errors
       
  1278         self._reedit_entity(entity, action.changes, err)
       
  1279         entity.cw_edited.check()
       
  1280         self.repo.hm.call_hooks('before_update_entity', cnx, entity=entity)
       
  1281         sql = self.sqlgen.update(SQL_PREFIX + entity.cw_etype, action.changes,
       
  1282                                  ['cw_eid'])
       
  1283         self.doexec(cnx, sql, action.changes)
       
  1284         self.repo.hm.call_hooks('after_update_entity', cnx, entity=entity)
       
  1285         return errors
       
  1286 
       
  1287     def _undo_a(self, cnx, action):
       
  1288         """undo a relation addition"""
       
  1289         errors = []
       
  1290         subj, rtype, obj = action.eid_from, action.rtype, action.eid_to
       
  1291         try:
       
  1292             sentity, oentity, rdef = _undo_rel_info(cnx, subj, rtype, obj)
       
  1293         except _UndoException as ex:
       
  1294             errors.append(text_type(ex))
       
  1295         else:
       
  1296             rschema = rdef.rtype
       
  1297             if rschema.inlined:
       
  1298                 sql = 'SELECT 1 FROM cw_%s WHERE cw_eid=%s and cw_%s=%s'\
       
  1299                       % (sentity.cw_etype, subj, rtype, obj)
       
  1300             else:
       
  1301                 sql = 'SELECT 1 FROM %s_relation WHERE eid_from=%s and eid_to=%s'\
       
  1302                       % (rtype, subj, obj)
       
  1303             cu = self.doexec(cnx, sql)
       
  1304             if cu.fetchone() is None:
       
  1305                 errors.append(cnx._(
       
  1306                     "Can't undo addition of relation %(rtype)s from %(subj)s to"
       
  1307                     " %(obj)s, doesn't exist anymore" % locals()))
       
  1308         if not errors:
       
  1309             self.repo.hm.call_hooks('before_delete_relation', cnx,
       
  1310                                     eidfrom=subj, rtype=rtype, eidto=obj)
       
  1311             # delete relation from the database
       
  1312             self._delete_relation(cnx, subj, rtype, obj, rschema.inlined)
       
  1313             # set related cache
       
  1314             cnx.update_rel_cache_del(subj, rtype, obj, rschema.symmetric)
       
  1315             self.repo.hm.call_hooks('after_delete_relation', cnx,
       
  1316                                     eidfrom=subj, rtype=rtype, eidto=obj)
       
  1317         return errors
       
  1318 
       
  1319     # full text index handling #################################################
       
  1320 
       
  1321     @cached
       
  1322     def need_fti_indexation(self, etype):
       
  1323         eschema = self.schema.eschema(etype)
       
  1324         if any(eschema.indexable_attributes()):
       
  1325             return True
       
  1326         if any(eschema.fulltext_containers()):
       
  1327             return True
       
  1328         return False
       
  1329 
       
  1330     def index_entity(self, cnx, entity):
       
  1331         """create an operation to [re]index textual content of the given entity
       
  1332         on commit
       
  1333         """
       
  1334         if self.do_fti:
       
  1335             FTIndexEntityOp.get_instance(cnx).add_data(entity.eid)
       
  1336 
       
  1337     def fti_unindex_entities(self, cnx, entities):
       
  1338         """remove text content for entities from the full text index
       
  1339         """
       
  1340         cursor = cnx.cnxset.cu
       
  1341         cursor_unindex_object = self.dbhelper.cursor_unindex_object
       
  1342         try:
       
  1343             for entity in entities:
       
  1344                 cursor_unindex_object(entity.eid, cursor)
       
  1345         except Exception: # let KeyboardInterrupt / SystemExit propagate
       
  1346             self.exception('error while unindexing %s', entity)
       
  1347 
       
  1348 
       
  1349     def fti_index_entities(self, cnx, entities):
       
  1350         """add text content of created/modified entities to the full text index
       
  1351         """
       
  1352         cursor_index_object = self.dbhelper.cursor_index_object
       
  1353         cursor = cnx.cnxset.cu
       
  1354         try:
       
  1355             # use cursor_index_object, not cursor_reindex_object since
       
  1356             # unindexing done in the FTIndexEntityOp
       
  1357             for entity in entities:
       
  1358                 cursor_index_object(entity.eid,
       
  1359                                     entity.cw_adapt_to('IFTIndexable'),
       
  1360                                     cursor)
       
  1361         except Exception: # let KeyboardInterrupt / SystemExit propagate
       
  1362             self.exception('error while indexing %s', entity)
       
  1363 
       
  1364 
       
  1365 class FTIndexEntityOp(hook.DataOperationMixIn, hook.LateOperation):
       
  1366     """operation to delay entity full text indexation to commit
       
  1367 
       
  1368     since fti indexing may trigger discovery of other entities, it should be
       
  1369     triggered on precommit, not commit, and this should be done after other
       
  1370     precommit operation which may add relations to the entity
       
  1371     """
       
  1372 
       
  1373     def precommit_event(self):
       
  1374         cnx = self.cnx
       
  1375         source = cnx.repo.system_source
       
  1376         pendingeids = cnx.transaction_data.get('pendingeids', ())
       
  1377         done = cnx.transaction_data.setdefault('indexedeids', set())
       
  1378         to_reindex = set()
       
  1379         for eid in self.get_data():
       
  1380             if eid in pendingeids or eid in done:
       
  1381                 # entity added and deleted in the same transaction or already
       
  1382                 # processed
       
  1383                 continue
       
  1384             done.add(eid)
       
  1385             iftindexable = cnx.entity_from_eid(eid).cw_adapt_to('IFTIndexable')
       
  1386             to_reindex |= set(iftindexable.fti_containers())
       
  1387         source.fti_unindex_entities(cnx, to_reindex)
       
  1388         source.fti_index_entities(cnx, to_reindex)
       
  1389 
       
  1390 def sql_schema(driver):
       
  1391     helper = get_db_helper(driver)
       
  1392     typemap = helper.TYPE_MAPPING
       
  1393     schema = """
       
  1394 /* Create the repository's system database */
       
  1395 
       
  1396 %s
       
  1397 
       
  1398 CREATE TABLE entities (
       
  1399   eid INTEGER PRIMARY KEY NOT NULL,
       
  1400   type VARCHAR(64) NOT NULL,
       
  1401   asource VARCHAR(128) NOT NULL,
       
  1402   extid VARCHAR(256)
       
  1403 );;
       
  1404 CREATE INDEX entities_type_idx ON entities(type);;
       
  1405 CREATE TABLE moved_entities (
       
  1406   eid INTEGER PRIMARY KEY NOT NULL,
       
  1407   extid VARCHAR(256) UNIQUE NOT NULL
       
  1408 );;
       
  1409 
       
  1410 CREATE TABLE transactions (
       
  1411   tx_uuid CHAR(32) PRIMARY KEY NOT NULL,
       
  1412   tx_user INTEGER NOT NULL,
       
  1413   tx_time %s NOT NULL
       
  1414 );;
       
  1415 CREATE INDEX transactions_tx_user_idx ON transactions(tx_user);;
       
  1416 CREATE INDEX transactions_tx_time_idx ON transactions(tx_time);;
       
  1417 
       
  1418 CREATE TABLE tx_entity_actions (
       
  1419   tx_uuid CHAR(32) REFERENCES transactions(tx_uuid) ON DELETE CASCADE,
       
  1420   txa_action CHAR(1) NOT NULL,
       
  1421   txa_public %s NOT NULL,
       
  1422   txa_order INTEGER,
       
  1423   eid INTEGER NOT NULL,
       
  1424   etype VARCHAR(64) NOT NULL,
       
  1425   changes %s
       
  1426 );;
       
  1427 CREATE INDEX tx_entity_actions_txa_action_idx ON tx_entity_actions(txa_action);;
       
  1428 CREATE INDEX tx_entity_actions_txa_public_idx ON tx_entity_actions(txa_public);;
       
  1429 CREATE INDEX tx_entity_actions_eid_idx ON tx_entity_actions(eid);;
       
  1430 CREATE INDEX tx_entity_actions_etype_idx ON tx_entity_actions(etype);;
       
  1431 CREATE INDEX tx_entity_actions_tx_uuid_idx ON tx_entity_actions(tx_uuid);;
       
  1432 
       
  1433 CREATE TABLE tx_relation_actions (
       
  1434   tx_uuid CHAR(32) REFERENCES transactions(tx_uuid) ON DELETE CASCADE,
       
  1435   txa_action CHAR(1) NOT NULL,
       
  1436   txa_public %s NOT NULL,
       
  1437   txa_order INTEGER,
       
  1438   eid_from INTEGER NOT NULL,
       
  1439   eid_to INTEGER NOT NULL,
       
  1440   rtype VARCHAR(256) NOT NULL
       
  1441 );;
       
  1442 CREATE INDEX tx_relation_actions_txa_action_idx ON tx_relation_actions(txa_action);;
       
  1443 CREATE INDEX tx_relation_actions_txa_public_idx ON tx_relation_actions(txa_public);;
       
  1444 CREATE INDEX tx_relation_actions_eid_from_idx ON tx_relation_actions(eid_from);;
       
  1445 CREATE INDEX tx_relation_actions_eid_to_idx ON tx_relation_actions(eid_to);;
       
  1446 CREATE INDEX tx_relation_actions_tx_uuid_idx ON tx_relation_actions(tx_uuid);;
       
  1447 """ % (helper.sql_create_numrange('entities_id_seq').replace(';', ';;'),
       
  1448        typemap['Datetime'],
       
  1449        typemap['Boolean'], typemap['Bytes'], typemap['Boolean'])
       
  1450     if helper.backend_name == 'sqlite':
       
  1451         # sqlite support the ON DELETE CASCADE syntax but do nothing
       
  1452         schema += '''
       
  1453 CREATE TRIGGER fkd_transactions
       
  1454 BEFORE DELETE ON transactions
       
  1455 FOR EACH ROW BEGIN
       
  1456     DELETE FROM tx_entity_actions WHERE tx_uuid=OLD.tx_uuid;
       
  1457     DELETE FROM tx_relation_actions WHERE tx_uuid=OLD.tx_uuid;
       
  1458 END;;
       
  1459 '''
       
  1460     schema += ';;'.join(helper.sqls_create_multicol_unique_index('entities', ['extid']))
       
  1461     schema += ';;\n'
       
  1462     return schema
       
  1463 
       
  1464 
       
  1465 def sql_drop_schema(driver):
       
  1466     helper = get_db_helper(driver)
       
  1467     return """
       
  1468 %s;
       
  1469 %s
       
  1470 DROP TABLE entities;
       
  1471 DROP TABLE tx_entity_actions;
       
  1472 DROP TABLE tx_relation_actions;
       
  1473 DROP TABLE transactions;
       
  1474 """ % (';'.join(helper.sqls_drop_multicol_unique_index('entities', ['extid'])),
       
  1475        helper.sql_drop_numrange('entities_id_seq'))
       
  1476 
       
  1477 
       
  1478 def grant_schema(user, set_owner=True):
       
  1479     result = ''
       
  1480     for table in ('entities', 'entities_id_seq',
       
  1481                   'transactions', 'tx_entity_actions', 'tx_relation_actions'):
       
  1482         if set_owner:
       
  1483             result = 'ALTER TABLE %s OWNER TO %s;\n' % (table, user)
       
  1484         result += 'GRANT ALL ON %s TO %s;\n' % (table, user)
       
  1485     return result
       
  1486 
       
  1487 
       
  1488 class BaseAuthentifier(object):
       
  1489 
       
  1490     def __init__(self, source=None):
       
  1491         self.source = source
       
  1492 
       
  1493     def set_schema(self, schema):
       
  1494         """set the instance'schema"""
       
  1495         pass
       
  1496 
       
  1497 class LoginPasswordAuthentifier(BaseAuthentifier):
       
  1498     passwd_rql = 'Any P WHERE X is CWUser, X login %(login)s, X upassword P'
       
  1499     auth_rql = (u'Any X WHERE X is CWUser, X login %(login)s, X upassword %(pwd)s, '
       
  1500                 'X cw_source S, S name "system"')
       
  1501     _sols = ({'X': 'CWUser', 'P': 'Password', 'S': 'CWSource'},)
       
  1502 
       
  1503     def set_schema(self, schema):
       
  1504         """set the instance'schema"""
       
  1505         if 'CWUser' in schema: # probably an empty schema if not true...
       
  1506             # rql syntax trees used to authenticate users
       
  1507             self._passwd_rqlst = self.source.compile_rql(self.passwd_rql, self._sols)
       
  1508             self._auth_rqlst = self.source.compile_rql(self.auth_rql, self._sols)
       
  1509 
       
  1510     def authenticate(self, cnx, login, password=None, **kwargs):
       
  1511         """return CWUser eid for the given login/password if this account is
       
  1512         defined in this source, else raise `AuthenticationError`
       
  1513 
       
  1514         two queries are needed since passwords are stored crypted, so we have
       
  1515         to fetch the salt first
       
  1516         """
       
  1517         args = {'login': login, 'pwd' : None}
       
  1518         if password is not None:
       
  1519             rset = self.source.syntax_tree_search(cnx, self._passwd_rqlst, args)
       
  1520             try:
       
  1521                 pwd = rset[0][0]
       
  1522             except IndexError:
       
  1523                 raise AuthenticationError('bad login')
       
  1524             if pwd is None:
       
  1525                 # if pwd is None but a password is provided, something is wrong
       
  1526                 raise AuthenticationError('bad password')
       
  1527             # passwords are stored using the Bytes type, so we get a StringIO
       
  1528             args['pwd'] = Binary(crypt_password(password, pwd.getvalue()))
       
  1529         # get eid from login and (crypted) password
       
  1530         rset = self.source.syntax_tree_search(cnx, self._auth_rqlst, args)
       
  1531         pwd = args['pwd']
       
  1532         try:
       
  1533             user = rset[0][0]
       
  1534             # If the stored hash uses a deprecated scheme (e.g. DES or MD5 used
       
  1535             # before 3.14.7), update with a fresh one
       
  1536             if pwd is not None and pwd.getvalue():
       
  1537                 verify, newhash = verify_and_update(password, pwd.getvalue())
       
  1538                 if not verify: # should not happen, but...
       
  1539                     raise AuthenticationError('bad password')
       
  1540                 if newhash:
       
  1541                     cnx.system_sql("UPDATE %s SET %s=%%(newhash)s WHERE %s=%%(login)s" % (
       
  1542                                         SQL_PREFIX + 'CWUser',
       
  1543                                         SQL_PREFIX + 'upassword',
       
  1544                                         SQL_PREFIX + 'login'),
       
  1545                                        {'newhash': self.source._binary(newhash.encode('ascii')),
       
  1546                                         'login': login})
       
  1547                     cnx.commit()
       
  1548             return user
       
  1549         except IndexError:
       
  1550             raise AuthenticationError('bad password')
       
  1551 
       
  1552 
       
  1553 class EmailPasswordAuthentifier(BaseAuthentifier):
       
  1554     def authenticate(self, cnx, login, **authinfo):
       
  1555         # email_auth flag prevent from infinite recursion (call to
       
  1556         # repo.check_auth_info at the end of this method may lead us here again)
       
  1557         if not '@' in login or authinfo.pop('email_auth', None):
       
  1558             raise AuthenticationError('not an email')
       
  1559         rset = cnx.execute('Any L WHERE U login L, U primary_email M, '
       
  1560                                'M address %(login)s', {'login': login},
       
  1561                                build_descr=False)
       
  1562         if rset.rowcount != 1:
       
  1563             raise AuthenticationError('unexisting email')
       
  1564         login = rset.rows[0][0]
       
  1565         authinfo['email_auth'] = True
       
  1566         return self.source.repo.check_auth_info(cnx, login, authinfo)
       
  1567 
       
  1568 
       
  1569 class DatabaseIndependentBackupRestore(object):
       
  1570     """Helper class to perform db backend agnostic backup and restore
       
  1571 
       
  1572     The backup and restore methods are used to dump / restore the
       
  1573     system database in a database independent format. The file is a
       
  1574     Zip archive containing the following files:
       
  1575 
       
  1576     * format.txt: the format of the archive. Currently '1.1'
       
  1577     * tables.txt: list of filenames in the archive tables/ directory
       
  1578     * sequences.txt: list of filenames in the archive sequences/ directory
       
  1579     * numranges.txt: list of filenames in the archive numrange/ directory
       
  1580     * versions.txt: the list of cube versions from CWProperty
       
  1581     * tables/<tablename>.<chunkno>: pickled data
       
  1582     * sequences/<sequencename>: pickled data
       
  1583 
       
  1584     The pickled data format for tables, numranges and sequences is a tuple of 3 elements:
       
  1585     * the table name
       
  1586     * a tuple of column names
       
  1587     * a list of rows (as tuples with one element per column)
       
  1588 
       
  1589     Tables are saved in chunks in different files in order to prevent
       
  1590     a too high memory consumption.
       
  1591     """
       
  1592     blocksize = 100
       
  1593 
       
  1594     def __init__(self, source):
       
  1595         """
       
  1596         :param: source an instance of the system source
       
  1597         """
       
  1598         self._source = source
       
  1599         self.logger = logging.getLogger('cubicweb.ctl')
       
  1600         self.logger.setLevel(logging.INFO)
       
  1601         self.logger.addHandler(logging.StreamHandler(sys.stdout))
       
  1602         self.schema = self._source.schema
       
  1603         self.dbhelper = self._source.dbhelper
       
  1604         self.cnx = None
       
  1605         self.cursor = None
       
  1606         self.sql_generator = sqlgen.SQLGenerator()
       
  1607 
       
  1608     def get_connection(self):
       
  1609         return self._source.get_connection()
       
  1610 
       
  1611     def backup(self, backupfile):
       
  1612         archive = zipfile.ZipFile(backupfile, 'w', allowZip64=True)
       
  1613         self.cnx = self.get_connection()
       
  1614         try:
       
  1615             self.cursor = self.cnx.cursor()
       
  1616             self.cursor.arraysize = 100
       
  1617             self.logger.info('writing metadata')
       
  1618             self.write_metadata(archive)
       
  1619             for seq in self.get_sequences():
       
  1620                 self.logger.info('processing sequence %s', seq)
       
  1621                 self.write_sequence(archive, seq)
       
  1622             for numrange in self.get_numranges():
       
  1623                 self.logger.info('processing numrange %s', numrange)
       
  1624                 self.write_numrange(archive, numrange)
       
  1625             for table in self.get_tables():
       
  1626                 self.logger.info('processing table %s', table)
       
  1627                 self.write_table(archive, table)
       
  1628         finally:
       
  1629             archive.close()
       
  1630             self.cnx.close()
       
  1631         self.logger.info('done')
       
  1632 
       
  1633     def get_tables(self):
       
  1634         non_entity_tables = ['entities',
       
  1635                              'transactions',
       
  1636                              'tx_entity_actions',
       
  1637                              'tx_relation_actions',
       
  1638                              ]
       
  1639         etype_tables = []
       
  1640         relation_tables = []
       
  1641         prefix = 'cw_'
       
  1642         for etype in self.schema.entities():
       
  1643             eschema = self.schema.eschema(etype)
       
  1644             if eschema.final:
       
  1645                 continue
       
  1646             etype_tables.append('%s%s'%(prefix, etype))
       
  1647         for rtype in self.schema.relations():
       
  1648             rschema = self.schema.rschema(rtype)
       
  1649             if rschema.final or rschema.inlined or rschema in VIRTUAL_RTYPES:
       
  1650                 continue
       
  1651             relation_tables.append('%s_relation' % rtype)
       
  1652         return non_entity_tables + etype_tables + relation_tables
       
  1653 
       
  1654     def get_sequences(self):
       
  1655         return []
       
  1656 
       
  1657     def get_numranges(self):
       
  1658         return ['entities_id_seq']
       
  1659 
       
  1660     def write_metadata(self, archive):
       
  1661         archive.writestr('format.txt', '1.1')
       
  1662         archive.writestr('tables.txt', '\n'.join(self.get_tables()))
       
  1663         archive.writestr('sequences.txt', '\n'.join(self.get_sequences()))
       
  1664         archive.writestr('numranges.txt', '\n'.join(self.get_numranges()))
       
  1665         versions = self._get_versions()
       
  1666         versions_str = '\n'.join('%s %s' % (k, v)
       
  1667                                  for k, v in versions)
       
  1668         archive.writestr('versions.txt', versions_str)
       
  1669 
       
  1670     def write_sequence(self, archive, seq):
       
  1671         sql = self.dbhelper.sql_sequence_current_state(seq)
       
  1672         columns, rows_iterator = self._get_cols_and_rows(sql)
       
  1673         rows = list(rows_iterator)
       
  1674         serialized = self._serialize(seq, columns, rows)
       
  1675         archive.writestr('sequences/%s' % seq, serialized)
       
  1676 
       
  1677     def write_numrange(self, archive, numrange):
       
  1678         sql = self.dbhelper.sql_numrange_current_state(numrange)
       
  1679         columns, rows_iterator = self._get_cols_and_rows(sql)
       
  1680         rows = list(rows_iterator)
       
  1681         serialized = self._serialize(numrange, columns, rows)
       
  1682         archive.writestr('numrange/%s' % numrange, serialized)
       
  1683 
       
  1684     def write_table(self, archive, table):
       
  1685         nb_lines_sql = 'SELECT COUNT(*) FROM %s' % table
       
  1686         self.cursor.execute(nb_lines_sql)
       
  1687         rowcount = self.cursor.fetchone()[0]
       
  1688         sql = 'SELECT * FROM %s' % table
       
  1689         columns, rows_iterator = self._get_cols_and_rows(sql)
       
  1690         self.logger.info('number of rows: %d', rowcount)
       
  1691         blocksize = self.blocksize
       
  1692         if rowcount > 0:
       
  1693             for i, start in enumerate(range(0, rowcount, blocksize)):
       
  1694                 rows = list(itertools.islice(rows_iterator, blocksize))
       
  1695                 serialized = self._serialize(table, columns, rows)
       
  1696                 archive.writestr('tables/%s.%04d' % (table, i), serialized)
       
  1697                 self.logger.debug('wrote rows %d to %d (out of %d) to %s.%04d',
       
  1698                                   start, start+len(rows)-1,
       
  1699                                   rowcount,
       
  1700                                   table, i)
       
  1701         else:
       
  1702             rows = []
       
  1703             serialized = self._serialize(table, columns, rows)
       
  1704             archive.writestr('tables/%s.%04d' % (table, 0), serialized)
       
  1705 
       
  1706     def _get_cols_and_rows(self, sql):
       
  1707         process_result = self._source.iter_process_result
       
  1708         self.cursor.execute(sql)
       
  1709         columns = (d[0] for d in self.cursor.description)
       
  1710         rows = process_result(self.cursor)
       
  1711         return tuple(columns), rows
       
  1712 
       
  1713     def _serialize(self, name, columns, rows):
       
  1714         return pickle.dumps((name, columns, rows), pickle.HIGHEST_PROTOCOL)
       
  1715 
       
  1716     def restore(self, backupfile):
       
  1717         archive = zipfile.ZipFile(backupfile, 'r', allowZip64=True)
       
  1718         self.cnx = self.get_connection()
       
  1719         self.cursor = self.cnx.cursor()
       
  1720         sequences, numranges, tables, table_chunks = self.read_metadata(archive, backupfile)
       
  1721         for seq in sequences:
       
  1722             self.logger.info('restoring sequence %s', seq)
       
  1723             self.read_sequence(archive, seq)
       
  1724         for numrange in numranges:
       
  1725             self.logger.info('restoring numrange %s', numrange)
       
  1726             self.read_numrange(archive, numrange)
       
  1727         for table in tables:
       
  1728             self.logger.info('restoring table %s', table)
       
  1729             self.read_table(archive, table, sorted(table_chunks[table]))
       
  1730         self.cnx.close()
       
  1731         archive.close()
       
  1732         self.logger.info('done')
       
  1733 
       
  1734     def read_metadata(self, archive, backupfile):
       
  1735         formatinfo = archive.read('format.txt')
       
  1736         self.logger.info('checking metadata')
       
  1737         if formatinfo.strip() != "1.1":
       
  1738             self.logger.critical('Unsupported format in archive: %s', formatinfo)
       
  1739             raise ValueError('Unknown format in %s: %s' % (backupfile, formatinfo))
       
  1740         tables = archive.read('tables.txt').splitlines()
       
  1741         sequences = archive.read('sequences.txt').splitlines()
       
  1742         numranges = archive.read('numranges.txt').splitlines()
       
  1743         file_versions = self._parse_versions(archive.read('versions.txt'))
       
  1744         versions = set(self._get_versions())
       
  1745         if file_versions != versions:
       
  1746             self.logger.critical('Unable to restore : versions do not match')
       
  1747             self.logger.critical('Expected:\n%s', '\n'.join('%s : %s' % (cube, ver)
       
  1748                                                             for cube, ver in sorted(versions)))
       
  1749             self.logger.critical('Found:\n%s', '\n'.join('%s : %s' % (cube, ver)
       
  1750                                                          for cube, ver in sorted(file_versions)))
       
  1751             raise ValueError('Unable to restore : versions do not match')
       
  1752         table_chunks = {}
       
  1753         for name in archive.namelist():
       
  1754             if not name.startswith('tables/'):
       
  1755                 continue
       
  1756             filename = basename(name)
       
  1757             tablename, _ext = filename.rsplit('.', 1)
       
  1758             table_chunks.setdefault(tablename, []).append(name)
       
  1759         return sequences, numranges, tables, table_chunks
       
  1760 
       
  1761     def read_sequence(self, archive, seq):
       
  1762         seqname, columns, rows = pickle.loads(archive.read('sequences/%s' % seq))
       
  1763         assert seqname == seq
       
  1764         assert len(rows) == 1
       
  1765         assert len(rows[0]) == 1
       
  1766         value = rows[0][0]
       
  1767         sql = self.dbhelper.sql_restart_sequence(seq, value)
       
  1768         self.cursor.execute(sql)
       
  1769         self.cnx.commit()
       
  1770 
       
  1771     def read_numrange(self, archive, numrange):
       
  1772         rangename, columns, rows = pickle.loads(archive.read('numrange/%s' % numrange))
       
  1773         assert rangename == numrange
       
  1774         assert len(rows) == 1
       
  1775         assert len(rows[0]) == 1
       
  1776         value = rows[0][0]
       
  1777         sql = self.dbhelper.sql_restart_numrange(numrange, value)
       
  1778         self.cursor.execute(sql)
       
  1779         self.cnx.commit()
       
  1780 
       
  1781     def read_table(self, archive, table, filenames):
       
  1782         merge_args = self._source.merge_args
       
  1783         self.cursor.execute('DELETE FROM %s' % table)
       
  1784         self.cnx.commit()
       
  1785         row_count = 0
       
  1786         for filename in filenames:
       
  1787             tablename, columns, rows = pickle.loads(archive.read(filename))
       
  1788             assert tablename == table
       
  1789             if not rows:
       
  1790                 continue
       
  1791             insert = self.sql_generator.insert(table,
       
  1792                                                dict(zip(columns, rows[0])))
       
  1793             for row in rows:
       
  1794                 self.cursor.execute(insert, merge_args(dict(zip(columns, row)), {}))
       
  1795             row_count += len(rows)
       
  1796             self.cnx.commit()
       
  1797         self.logger.info('inserted %d rows', row_count)
       
  1798 
       
  1799 
       
  1800     def _parse_versions(self, version_str):
       
  1801         versions = set()
       
  1802         for line in version_str.splitlines():
       
  1803             versions.add(tuple(line.split()))
       
  1804         return versions
       
  1805 
       
  1806     def _get_versions(self):
       
  1807         version_sql = 'SELECT cw_pkey, cw_value FROM cw_CWProperty'
       
  1808         versions = []
       
  1809         self.cursor.execute(version_sql)
       
  1810         for pkey, value in self.cursor.fetchall():
       
  1811             if pkey.startswith(u'system.version'):
       
  1812                 versions.append((pkey, value))
       
  1813         return versions