1 # copyright 2003-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved. |
|
2 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr |
|
3 # |
|
4 # This file is part of CubicWeb. |
|
5 # |
|
6 # CubicWeb is free software: you can redistribute it and/or modify it under the |
|
7 # terms of the GNU Lesser General Public License as published by the Free |
|
8 # Software Foundation, either version 2.1 of the License, or (at your option) |
|
9 # any later version. |
|
10 # |
|
11 # CubicWeb is distributed in the hope that it will be useful, but WITHOUT |
|
12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
|
13 # FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more |
|
14 # details. |
|
15 # |
|
16 # You should have received a copy of the GNU Lesser General Public License along |
|
17 # with CubicWeb. If not, see <http://www.gnu.org/licenses/>. |
|
18 """Adapters for native cubicweb sources. |
|
19 |
|
20 Notes: |
|
21 * extid (aka external id, the primary key of an entity in the external source |
|
22 from which it comes) are stored in a varchar column encoded as a base64 |
|
23 string. This is because it should actually be Bytes but we want an index on |
|
24 it for fast querying. |
|
25 """ |
|
26 from __future__ import print_function |
|
27 |
|
28 __docformat__ = "restructuredtext en" |
|
29 |
|
30 from threading import Lock |
|
31 from datetime import datetime |
|
32 from base64 import b64encode |
|
33 from contextlib import contextmanager |
|
34 from os.path import basename |
|
35 import re |
|
36 import itertools |
|
37 import zipfile |
|
38 import logging |
|
39 import sys |
|
40 |
|
41 from six import PY2, text_type, binary_type, string_types |
|
42 from six.moves import range, cPickle as pickle |
|
43 |
|
44 from logilab.common.decorators import cached, clear_cache |
|
45 from logilab.common.configuration import Method |
|
46 from logilab.common.shellutils import getlogin |
|
47 from logilab.database import get_db_helper, sqlgen |
|
48 |
|
49 from yams.schema import role_name |
|
50 |
|
51 from cubicweb import (UnknownEid, AuthenticationError, ValidationError, Binary, |
|
52 UniqueTogetherError, UndoTransactionException, ViolatedConstraint) |
|
53 from cubicweb import transaction as tx, server, neg_role |
|
54 from cubicweb.utils import QueryCache |
|
55 from cubicweb.schema import VIRTUAL_RTYPES |
|
56 from cubicweb.cwconfig import CubicWebNoAppConfiguration |
|
57 from cubicweb.server import hook |
|
58 from cubicweb.server import schema2sql as y2sql |
|
59 from cubicweb.server.utils import crypt_password, eschema_eid, verify_and_update |
|
60 from cubicweb.server.sqlutils import SQL_PREFIX, SQLAdapterMixIn |
|
61 from cubicweb.server.rqlannotation import set_qdata |
|
62 from cubicweb.server.hook import CleanupDeletedEidsCacheOp |
|
63 from cubicweb.server.edition import EditedEntity |
|
64 from cubicweb.server.sources import AbstractSource, dbg_st_search, dbg_results |
|
65 from cubicweb.server.sources.rql2sql import SQLGenerator |
|
66 from cubicweb.statsd_logger import statsd_timeit |
|
67 |
|
68 |
|
# mapping from '<etype>.<attr>' to a (callback, sourcedb) pair; a copy is
# handed to the rql-to-sql generator (see NativeSQLSource.__init__ and
# map_attribute/unmap_attribute)
ATTR_MAP = {}
# entity types NOT handled by the system source (see support_entity)
NONSYSTEM_ETYPES = set()
# relation types NOT writable through the system source (see support_relation)
NONSYSTEM_RELATIONS = set()
|
72 |
|
class LogCursor(object):
    """Thin wrapper around a DB-API cursor whose calls show up individually
    when profiling, and which echoes queries in SQL debug mode.
    """

    def __init__(self, cursor):
        self.cu = cursor

    def execute(self, query, args=None):
        """Execute a query.
        it's a function just so that it shows up in profiling
        """
        if server.DEBUG & server.DBG_SQL:
            print('exec', query, args)
        try:
            self.cu.execute(str(query), args)
        except Exception as ex:
            # dump the failing statement before propagating the error
            print("sql: %r\n args: %s\ndbms message: %r"
                  % (query, args, ex.args[0]))
            raise

    def fetchall(self):
        """return all remaining rows from the underlying cursor"""
        return self.cu.fetchall()

    def fetchone(self):
        """return the next row from the underlying cursor"""
        return self.cu.fetchone()
|
95 |
|
96 |
|
def sql_or_clauses(sql, clauses):
    """Rewrite `sql` so that the given WHERE `clauses`, currently AND-ed with
    the rest of the restriction, are OR-ed together instead.

    Each element of `clauses` must appear verbatim as an AND-ed clause of the
    query's WHERE part.
    """
    select, restr = sql.split(' WHERE ', 1)
    remaining = restr.split(' AND ')
    for clause in clauses:
        remaining.remove(clause)
    ored = '(%s)' % ' OR '.join(clauses)
    if remaining:
        restr = '%s AND %s' % (' AND '.join(remaining), ored)
    else:
        restr = ored
    return '%s WHERE %s' % (select, restr)
|
108 |
|
109 |
|
def rdef_table_column(rdef):
    """return table and column used to store the given relation definition in
    the database
    """
    table = SQL_PREFIX + str(rdef.subject)
    column = SQL_PREFIX + str(rdef.rtype)
    return table, column
|
116 |
|
117 |
|
def rdef_physical_info(dbhelper, rdef):
    """return backend type and a boolean flag if NULL values should be allowed
    for a given relation definition

    :param dbhelper: logilab.database helper for the backend in use
    :param rdef: the relation definition (final attribute or inlined relation)
    :return: a (coltype, allownull) tuple in every case
    """
    # NULL is allowed unless the subject cardinality is '1' (mandatory)
    allownull = rdef.cardinality[0] != '1'
    if not rdef.object.final:
        # inlined relation: the column stores the target entity's eid.
        # Fix: previously returned the bare coltype, while both callers
        # (update_rdef_column / update_rdef_null_allowed) unpack a pair.
        return dbhelper.TYPE_MAPPING['Int'], allownull
    coltype = y2sql.type_from_rdef(dbhelper, rdef, creating=False)
    return coltype, allownull
|
127 |
|
128 |
|
129 class _UndoException(Exception): |
|
130 """something went wrong during undoing""" |
|
131 |
|
132 def __unicode__(self): |
|
133 """Called by the unicode builtin; should return a Unicode object |
|
134 |
|
135 Type of _UndoException message must be `unicode` by design in CubicWeb. |
|
136 """ |
|
137 assert isinstance(self.args[0], text_type) |
|
138 return self.args[0] |
|
139 |
|
140 |
|
141 def _undo_check_relation_target(tentity, rdef, role): |
|
142 """check linked entity has not been redirected for this relation""" |
|
143 card = rdef.role_cardinality(role) |
|
144 if card in '?1' and tentity.related(rdef.rtype, role): |
|
145 raise _UndoException(tentity._cw._( |
|
146 "Can't restore %(role)s relation %(rtype)s to entity %(eid)s which " |
|
147 "is already linked using this relation.") |
|
148 % {'role': neg_role(role), |
|
149 'rtype': rdef.rtype, |
|
150 'eid': tentity.eid}) |
|
151 |
|
152 def _undo_rel_info(cnx, subj, rtype, obj): |
|
153 entities = [] |
|
154 for role, eid in (('subject', subj), ('object', obj)): |
|
155 try: |
|
156 entities.append(cnx.entity_from_eid(eid)) |
|
157 except UnknownEid: |
|
158 raise _UndoException(cnx._( |
|
159 "Can't restore relation %(rtype)s, %(role)s entity %(eid)s" |
|
160 " doesn't exist anymore.") |
|
161 % {'role': cnx._(role), |
|
162 'rtype': cnx._(rtype), |
|
163 'eid': eid}) |
|
164 sentity, oentity = entities |
|
165 try: |
|
166 rschema = cnx.vreg.schema.rschema(rtype) |
|
167 rdef = rschema.rdefs[(sentity.cw_etype, oentity.cw_etype)] |
|
168 except KeyError: |
|
169 raise _UndoException(cnx._( |
|
170 "Can't restore relation %(rtype)s between %(subj)s and " |
|
171 "%(obj)s, that relation does not exists anymore in the " |
|
172 "schema.") |
|
173 % {'rtype': cnx._(rtype), |
|
174 'subj': subj, |
|
175 'obj': obj}) |
|
176 return sentity, oentity, rdef |
|
177 |
|
178 def _undo_has_later_transaction(cnx, eid): |
|
179 return cnx.system_sql('''\ |
|
180 SELECT T.tx_uuid FROM transactions AS TREF, transactions AS T |
|
181 WHERE TREF.tx_uuid='%(txuuid)s' AND T.tx_uuid!='%(txuuid)s' |
|
182 AND T.tx_time>=TREF.tx_time |
|
183 AND (EXISTS(SELECT 1 FROM tx_entity_actions AS TEA |
|
184 WHERE TEA.tx_uuid=T.tx_uuid AND TEA.eid=%(eid)s) |
|
185 OR EXISTS(SELECT 1 FROM tx_relation_actions as TRA |
|
186 WHERE TRA.tx_uuid=T.tx_uuid AND ( |
|
187 TRA.eid_from=%(eid)s OR TRA.eid_to=%(eid)s)) |
|
188 )''' % {'txuuid': cnx.transaction_data['undoing_uuid'], |
|
189 'eid': eid}).fetchone() |
|
190 |
|
191 |
|
class DefaultEidGenerator(object):
    """Allocate eids through the backend's numrange support, using a private
    database connection (all backends but sqlite).
    """
    __slots__ = ('source', 'cnx', 'lock')

    def __init__(self, source):
        self.source = source
        # dedicated connection, opened lazily by _create_eid
        self.cnx = None
        self.lock = Lock()

    def close(self):
        """close the private eid connection, if one was opened"""
        if self.cnx:
            self.cnx.close()
        self.cnx = None

    def create_eid(self, _cnx, count=1):
        """reserve `count` new eids and return the resulting sequence value

        `_cnx` is unused here: allocation goes through a private connection.
        """
        # lock needed to prevent 'Connection is busy with results for another
        # command (0)' errors with SQLServer
        assert count > 0
        with self.lock:
            return self._create_eid(count)

    def _create_eid(self, count):
        # internal function doing the eid creation without locking.
        # needed for the recursive handling of disconnections (otherwise we
        # deadlock on self._eid_cnx_lock
        source = self.source
        if self.cnx is None:
            self.cnx = source.get_connection()
        cnx = self.cnx
        try:
            cursor = cnx.cursor()
            for sql in source.dbhelper.sqls_increment_numrange('entities_id_seq', count):
                cursor.execute(sql)
            # the last executed statement returns the new sequence value
            eid = cursor.fetchone()[0]
        except (source.OperationalError, source.InterfaceError):
            # FIXME: better detection of deconnection pb
            source.warning("trying to reconnect create eid connection")
            self.cnx = None
            # retry recursively on a fresh connection
            return self._create_eid(count)
        except source.DbapiError as exc:
            # We get this one with pyodbc and SQL Server when connection was reset
            if exc.args[0] == '08S01':
                source.warning("trying to reconnect create eid connection")
                self.cnx = None
                return self._create_eid(count)
            else:
                raise
        except Exception: # unexpected error: give up and drop the connection
            cnx.rollback()
            self.cnx = None
            source.exception('create eid failed in an unforeseen way on SQL statement %s', sql)
            raise
        else:
            cnx.commit()
            return eid
|
246 |
|
247 |
|
class SQLITEEidGenerator(object):
    """Allocate eids for sqlite, going through the source's own connection
    and serializing increments with a lock.
    """
    __slots__ = ('source', 'lock')

    def __init__(self, source):
        self.source = source
        self.lock = Lock()

    def close(self):
        """nothing to release: no private connection is kept around"""

    def create_eid(self, cnx, count=1):
        """reserve `count` new eids and return the resulting sequence value"""
        assert count > 0
        source = self.source
        with self.lock:
            cursor = None
            for statement in source.dbhelper.sqls_increment_numrange('entities_id_seq', count):
                cursor = source.doexec(cnx, statement)
            return cursor.fetchone()[0]
|
265 |
|
266 |
|
class NativeSQLSource(SQLAdapterMixIn, AbstractSource):
    """adapter for source using the native cubicweb schema (see below)
    """
    # class used to turn RQL syntax trees into SQL queries
    sqlgen_class = SQLGenerator
    # configuration options for the system database connection, exposed in
    # the instance's sources file
    options = (
        ('db-driver',
         {'type' : 'string',
          'default': 'postgres',
          # XXX use choice type
          'help': 'database driver (postgres, sqlite, sqlserver2005)',
          'group': 'native-source', 'level': 0,
          }),
        ('db-host',
         {'type' : 'string',
          'default': '',
          'help': 'database host',
          'group': 'native-source', 'level': 1,
          }),
        ('db-port',
         {'type' : 'string',
          'default': '',
          'help': 'database port',
          'group': 'native-source', 'level': 1,
          }),
        ('db-name',
         {'type' : 'string',
          'default': Method('default_instance_id'),
          'help': 'database name',
          'group': 'native-source', 'level': 0,
          }),
        ('db-namespace',
         {'type' : 'string',
          'default': '',
          'help': 'database namespace (schema) name',
          'group': 'native-source', 'level': 1,
          }),
        ('db-user',
         {'type' : 'string',
          'default': CubicWebNoAppConfiguration.mode == 'user' and getlogin() or 'cubicweb',
          'help': 'database user',
          'group': 'native-source', 'level': 0,
          }),
        ('db-password',
         {'type' : 'password',
          'default': '',
          'help': 'database password',
          'group': 'native-source', 'level': 0,
          }),
        ('db-encoding',
         {'type' : 'string',
          'default': 'utf8',
          'help': 'database encoding',
          'group': 'native-source', 'level': 1,
          }),
        ('db-extra-arguments',
         {'type' : 'string',
          'default': '',
          'help': 'set to "Trusted_Connection" if you are using SQLServer and '
                  'want trusted authentication for the database connection',
          'group': 'native-source', 'level': 2,
          }),
        ('db-statement-timeout',
         {'type': 'int',
          'default': 0,
          'help': 'sql statement timeout, in milliseconds (postgres only)',
          'group': 'native-source', 'level': 2,
          }),
    )
|
335 |
|
    def __init__(self, repo, source_config, *args, **kwargs):
        """initialize the system source for repository `repo` from the
        `source_config` dictionary (driver, credentials, etc.)
        """
        SQLAdapterMixIn.__init__(self, source_config, repairing=repo.config.repairing)
        self.authentifiers = [LoginPasswordAuthentifier(self)]
        if repo.config['allow-email-login']:
            # try email-based authentication before plain login
            self.authentifiers.insert(0, EmailPasswordAuthentifier(self))
        AbstractSource.__init__(self, repo, source_config, *args, **kwargs)
        # sql generator
        self._rql_sqlgen = self.sqlgen_class(self.schema, self.dbhelper,
                                             ATTR_MAP.copy())
        # full text index helper
        self.do_fti = not repo.config['delay-full-text-indexation']
        # sql queries cache
        self._cache = QueryCache(repo.config['rql-cache-size'])
        # (etype, attr) / storage mapping
        self._storages = {}
        self.binary_to_str = self.dbhelper.dbapi_module.binary_to_str
        # sqlite gets a dedicated eid generator since it lacks the numrange
        # support used by the default one
        if self.dbdriver == 'sqlite':
            self.eid_generator = SQLITEEidGenerator(self)
        else:
            self.eid_generator = DefaultEidGenerator(self)
        self.create_eid = self.eid_generator.create_eid
|
357 |
|
358 def check_config(self, source_entity): |
|
359 """check configuration of source entity""" |
|
360 if source_entity.host_config: |
|
361 msg = source_entity._cw._('the system source has its configuration ' |
|
362 'stored on the file-system') |
|
363 raise ValidationError(source_entity.eid, {role_name('config', 'subject'): msg}) |
|
364 |
|
365 def add_authentifier(self, authentifier): |
|
366 self.authentifiers.append(authentifier) |
|
367 authentifier.source = self |
|
368 authentifier.set_schema(self.schema) |
|
369 |
|
370 def reset_caches(self): |
|
371 """method called during test to reset potential source caches""" |
|
372 self._cache = QueryCache(self.repo.config['rql-cache-size']) |
|
373 |
|
374 def clear_eid_cache(self, eid, etype): |
|
375 """clear potential caches for the given eid""" |
|
376 self._cache.pop('Any X WHERE X eid %s, X is %s' % (eid, etype), None) |
|
377 self._cache.pop('Any X WHERE X eid %s' % eid, None) |
|
378 self._cache.pop('Any %s' % eid, None) |
|
379 |
|
380 @statsd_timeit |
|
381 def sqlexec(self, cnx, sql, args=None): |
|
382 """execute the query and return its result""" |
|
383 return self.process_result(self.doexec(cnx, sql, args)) |
|
384 |
|
    def init_creating(self, cnxset=None):
        """detect whether full text indexing can actually be used, disabling
        it when the text index table is missing
        """
        # check full text index availibility
        if self.do_fti:
            # borrow a connections set when none was handed to us
            if cnxset is None:
                _cnxset = self.repo._get_cnxset()
            else:
                _cnxset = cnxset
            if not self.dbhelper.has_fti_table(_cnxset.cu):
                if not self.repo.config.creating:
                    self.critical('no text index table')
                self.do_fti = False
            # give the connections set back only if we borrowed it above
            if cnxset is None:
                _cnxset.cnxset_freed()
                self.repo._free_cnxset(_cnxset)
|
399 |
|
    def backup(self, backupfile, confirm, format='native'):
        """method called to create a backup of the source's data

        `format` is 'native' (backend specific dump) or 'portable' (database
        independent dump); anything else raises ValueError. Source connections
        are closed around the dump and reopened afterwards.
        """
        if format == 'portable':
            # ensure the schema is the one stored in the database: if repository
            # started in quick_start mode, the file system's one has been loaded
            # so force reload
            if self.repo.config.quick_start:
                self.repo.set_schema(self.repo.deserialize_schema(),
                                     resetvreg=False)
            helper = DatabaseIndependentBackupRestore(self)
            self.close_source_connections()
            try:
                helper.backup(backupfile)
            finally:
                self.open_source_connections()
        elif format == 'native':
            self.close_source_connections()
            try:
                self.backup_to_file(backupfile, confirm)
            finally:
                self.open_source_connections()
        else:
            raise ValueError('Unknown format %r' % format)
|
423 |
|
424 |
|
    def restore(self, backupfile, confirm, drop, format='native'):
        """method called to restore a backup of source's data

        `format` must match the one used at backup time ('native' or
        'portable'); anything else raises ValueError.
        """
        # only touch the connection pool when one was actually initialized
        if self.repo.config.init_cnxset_pool:
            self.close_source_connections()
        try:
            if format == 'portable':
                helper = DatabaseIndependentBackupRestore(self)
                helper.restore(backupfile)
            elif format == 'native':
                self.restore_from_file(backupfile, confirm, drop=drop)
            else:
                raise ValueError('Unknown format %r' % format)
        finally:
            if self.repo.config.init_cnxset_pool:
                self.open_source_connections()
|
440 |
|
441 |
|
    def init(self, activated, source_entity):
        """source initialization hook, probing the database layout"""
        try:
            # test if 'asource' column exists
            query = self.dbhelper.sql_add_limit_offset('SELECT asource FROM entities', 1)
            source_entity._cw.system_sql(query)
        except Exception as ex:
            # no 'asource' column: old entities table layout, fall back to the
            # backward compatible lookup method
            self.eid_type_source = self.eid_type_source_pre_131
        super(NativeSQLSource, self).init(activated, source_entity)
        self.init_creating(source_entity._cw.cnxset)
|
451 |
|
452 def shutdown(self): |
|
453 self.eid_generator.close() |
|
454 |
|
455 # XXX deprecates [un]map_attribute? |
|
456 def map_attribute(self, etype, attr, cb, sourcedb=True): |
|
457 self._rql_sqlgen.attr_map[u'%s.%s' % (etype, attr)] = (cb, sourcedb) |
|
458 |
|
459 def unmap_attribute(self, etype, attr): |
|
460 self._rql_sqlgen.attr_map.pop(u'%s.%s' % (etype, attr), None) |
|
461 |
|
462 def set_storage(self, etype, attr, storage): |
|
463 storage_dict = self._storages.setdefault(etype, {}) |
|
464 storage_dict[attr] = storage |
|
465 self.map_attribute(etype, attr, |
|
466 storage.callback, storage.is_source_callback) |
|
467 |
|
468 def unset_storage(self, etype, attr): |
|
469 self._storages[etype].pop(attr) |
|
470 # if etype has no storage left, remove the entry |
|
471 if not self._storages[etype]: |
|
472 del self._storages[etype] |
|
473 self.unmap_attribute(etype, attr) |
|
474 |
|
475 def storage(self, etype, attr): |
|
476 """return the storage for the given entity type / attribute |
|
477 """ |
|
478 try: |
|
479 return self._storages[etype][attr] |
|
480 except KeyError: |
|
481 raise Exception('no custom storage set for %s.%s' % (etype, attr)) |
|
482 |
|
483 # ISource interface ####################################################### |
|
484 |
|
    @statsd_timeit
    def compile_rql(self, rql, sols):
        """parse `rql` and return its syntax tree annotated for sql
        generation, using `sols` as the solutions of its first select
        """
        rqlst = self.repo.vreg.rqlhelper.parse(rql)
        rqlst.restricted_vars = ()
        rqlst.children[0].solutions = sols
        self.repo.querier.sqlgen_annotate(rqlst)
        set_qdata(self.schema.rschema, rqlst, ())
        return rqlst
|
493 |
|
    def set_schema(self, schema):
        """set the instance's schema"""
        # a schema change invalidates every cached rql -> sql translation
        self._cache = QueryCache(self.repo.config['rql-cache-size'])
        self.cache_hit, self.cache_miss, self.no_cache = 0, 0, 0
        self.schema = schema
        try:
            self._rql_sqlgen.schema = schema
        except AttributeError:
            pass # __init__
        for authentifier in self.authentifiers:
            authentifier.set_schema(self.schema)
        clear_cache(self, 'need_fti_indexation')
|
506 |
|
507 def support_entity(self, etype, write=False): |
|
508 """return true if the given entity's type is handled by this adapter |
|
509 if write is true, return true only if it's a RW support |
|
510 """ |
|
511 return not etype in NONSYSTEM_ETYPES |
|
512 |
|
513 def support_relation(self, rtype, write=False): |
|
514 """return true if the given relation's type is handled by this adapter |
|
515 if write is true, return true only if it's a RW support |
|
516 """ |
|
517 if write: |
|
518 return not rtype in NONSYSTEM_RELATIONS |
|
519 # due to current multi-sources implementation, the system source |
|
520 # can't claim not supporting a relation |
|
521 return True #not rtype == 'content_for' |
|
522 |
|
523 @statsd_timeit |
|
524 def authenticate(self, cnx, login, **kwargs): |
|
525 """return CWUser eid for the given login and other authentication |
|
526 information found in kwargs, else raise `AuthenticationError` |
|
527 """ |
|
528 for authentifier in self.authentifiers: |
|
529 try: |
|
530 return authentifier.authenticate(cnx, login, **kwargs) |
|
531 except AuthenticationError: |
|
532 continue |
|
533 raise AuthenticationError() |
|
534 |
|
    def syntax_tree_search(self, cnx, union, args=None, cachekey=None,
                           varmap=None):
        """return result from this source for a rql query (actually from
        a rql syntax tree and a solution dictionary mapping each used
        variable to a possible type). If cachekey is given, the query
        necessary to fetch the results (but not the results themselves)
        may be cached using this key.
        """
        assert dbg_st_search(self.uri, union, varmap, args, cachekey)
        # remember number of actually selected term (sql generation may append some)
        if cachekey is None:
            self.no_cache += 1
            # generate sql query if we are able to do so (not supported types...)
            sql, qargs, cbs = self._rql_sqlgen.generate(union, args, varmap)
        else:
            # sql may be cached
            try:
                sql, qargs, cbs = self._cache[cachekey]
                self.cache_hit += 1
            except KeyError:
                self.cache_miss += 1
                sql, qargs, cbs = self._rql_sqlgen.generate(union, args, varmap)
                self._cache[cachekey] = sql, qargs, cbs
        # merge query arguments with those computed during sql generation
        args = self.merge_args(args, qargs)
        assert isinstance(sql, string_types), repr(sql)
        cursor = self.doexec(cnx, sql, args)
        results = self.process_result(cursor, cnx, cbs)
        assert dbg_results(results)
        return results
|
564 |
|
565 @contextmanager |
|
566 def _fixup_cw(self, cnx, entity): |
|
567 _cw = entity._cw |
|
568 entity._cw = cnx |
|
569 try: |
|
570 yield |
|
571 finally: |
|
572 entity._cw = _cw |
|
573 |
|
    @contextmanager
    def _storage_handler(self, cnx, entity, event):
        """context manager invoking custom storages around the SQL execution
        for `event` on `entity` (a single entity or a list of entities, all of
        the same type); `event` is 'added', 'updated' or 'deleted'
        """
        # 1/ memorize values as they are before the storage is called.
        # For instance, the BFSStorage will replace the `data`
        # binary value with a Binary containing the destination path
        # on the filesystem. To make the entity.data usage absolutely
        # transparent, we'll have to reset entity.data to its binary
        # value once the SQL query will be executed
        restore_values = []
        if isinstance(entity, list):
            entities = entity
        else:
            entities = [entity]
        etype = entities[0].__regid__
        for attr, storage in self._storages.get(etype, {}).items():
            for entity in entities:
                with self._fixup_cw(cnx, entity):
                    if event == 'deleted':
                        storage.entity_deleted(entity, attr)
                    else:
                        edited = entity.cw_edited
                        if attr in edited:
                            # dispatch to entity_added / entity_updated
                            handler = getattr(storage, 'entity_%s' % event)
                            to_restore = handler(entity, attr)
                            restore_values.append((entity, attr, to_restore))
        try:
            yield # 2/ execute the source's instructions
        finally:
            # 3/ restore original values
            for entity, attr, value in restore_values:
                entity.cw_edited.edited_attribute(attr, value)
|
605 |
|
    def add_entity(self, cnx, entity):
        """add a new entity to the source"""
        with self._storage_handler(cnx, entity, 'added'):
            attrs = self.preprocess_entity(entity)
            sql = self.sqlgen.insert(SQL_PREFIX + entity.cw_etype, attrs)
            self.doexec(cnx, sql, attrs)
            if cnx.ertype_supports_undo(entity.cw_etype):
                # record a 'C'reate action for the undo machinery
                self._record_tx_action(cnx, 'tx_entity_actions', u'C',
                                       etype=text_type(entity.cw_etype), eid=entity.eid)
|
615 |
|
    def update_entity(self, cnx, entity):
        """replace an entity in the source"""
        with self._storage_handler(cnx, entity, 'updated'):
            attrs = self.preprocess_entity(entity)
            if cnx.ertype_supports_undo(entity.cw_etype):
                # save previous attribute values so the 'U'pdate can be undone
                changes = self._save_attrs(cnx, entity, attrs)
                self._record_tx_action(cnx, 'tx_entity_actions', u'U',
                                       etype=text_type(entity.cw_etype), eid=entity.eid,
                                       changes=self._binary(pickle.dumps(changes)))
            sql = self.sqlgen.update(SQL_PREFIX + entity.cw_etype, attrs,
                                     ['cw_eid'])
            self.doexec(cnx, sql, attrs)
|
628 |
|
    def delete_entity(self, cnx, entity):
        """delete an entity from the source"""
        with self._storage_handler(cnx, entity, 'deleted'):
            if cnx.ertype_supports_undo(entity.cw_etype):
                # save final and inlined attributes so the 'D'elete can be
                # undone later
                attrs = [SQL_PREFIX + r.type
                         for r in entity.e_schema.subject_relations()
                         if (r.final or r.inlined) and not r in VIRTUAL_RTYPES]
                changes = self._save_attrs(cnx, entity, attrs)
                self._record_tx_action(cnx, 'tx_entity_actions', u'D',
                                       etype=text_type(entity.cw_etype), eid=entity.eid,
                                       changes=self._binary(pickle.dumps(changes)))
            attrs = {'cw_eid': entity.eid}
            sql = self.sqlgen.delete(SQL_PREFIX + entity.cw_etype, attrs)
            self.doexec(cnx, sql, attrs)
|
643 |
|
644 def add_relation(self, cnx, subject, rtype, object, inlined=False): |
|
645 """add a relation to the source""" |
|
646 self._add_relations(cnx, rtype, [(subject, object)], inlined) |
|
647 if cnx.ertype_supports_undo(rtype): |
|
648 self._record_tx_action(cnx, 'tx_relation_actions', u'A', |
|
649 eid_from=subject, rtype=text_type(rtype), eid_to=object) |
|
650 |
|
651 def add_relations(self, cnx, rtype, subj_obj_list, inlined=False): |
|
652 """add a relations to the source""" |
|
653 self._add_relations(cnx, rtype, subj_obj_list, inlined) |
|
654 if cnx.ertype_supports_undo(rtype): |
|
655 for subject, object in subj_obj_list: |
|
656 self._record_tx_action(cnx, 'tx_relation_actions', u'A', |
|
657 eid_from=subject, rtype=text_type(rtype), eid_to=object) |
|
658 |
|
659 def _add_relations(self, cnx, rtype, subj_obj_list, inlined=False): |
|
660 """add a relation to the source""" |
|
661 sql = [] |
|
662 if inlined is False: |
|
663 attrs = [{'eid_from': subject, 'eid_to': object} |
|
664 for subject, object in subj_obj_list] |
|
665 sql.append((self.sqlgen.insert('%s_relation' % rtype, attrs[0]), attrs)) |
|
666 else: # used by data import |
|
667 etypes = {} |
|
668 for subject, object in subj_obj_list: |
|
669 etype = cnx.entity_metas(subject)['type'] |
|
670 if etype in etypes: |
|
671 etypes[etype].append((subject, object)) |
|
672 else: |
|
673 etypes[etype] = [(subject, object)] |
|
674 for subj_etype, subj_obj_list in etypes.items(): |
|
675 attrs = [{'cw_eid': subject, SQL_PREFIX + rtype: object} |
|
676 for subject, object in subj_obj_list] |
|
677 sql.append((self.sqlgen.update(SQL_PREFIX + etype, attrs[0], |
|
678 ['cw_eid']), |
|
679 attrs)) |
|
680 for statement, attrs in sql: |
|
681 self.doexecmany(cnx, statement, attrs) |
|
682 |
|
683 def delete_relation(self, cnx, subject, rtype, object): |
|
684 """delete a relation from the source""" |
|
685 rschema = self.schema.rschema(rtype) |
|
686 self._delete_relation(cnx, subject, rtype, object, rschema.inlined) |
|
687 if cnx.ertype_supports_undo(rtype): |
|
688 self._record_tx_action(cnx, 'tx_relation_actions', u'R', |
|
689 eid_from=subject, rtype=text_type(rtype), eid_to=object) |
|
690 |
|
    def _delete_relation(self, cnx, subject, rtype, object, inlined=False):
        """delete a relation from the source"""
        if inlined:
            # inlined relation: reset the column on the subject entity's table
            table = SQL_PREFIX + cnx.entity_metas(subject)['type']
            column = SQL_PREFIX + rtype
            sql = 'UPDATE %s SET %s=NULL WHERE %seid=%%(eid)s' % (table, column,
                                                                  SQL_PREFIX)
            attrs = {'eid' : subject}
        else:
            # regular relation: delete the row from the relation table
            attrs = {'eid_from': subject, 'eid_to': object}
            sql = self.sqlgen.delete('%s_relation' % rtype, attrs)
        self.doexec(cnx, sql, attrs)
|
703 |
|
704 @statsd_timeit |
|
705 def doexec(self, cnx, query, args=None, rollback=True): |
|
706 """Execute a query. |
|
707 it's a function just so that it shows up in profiling |
|
708 """ |
|
709 cursor = cnx.cnxset.cu |
|
710 if server.DEBUG & server.DBG_SQL: |
|
711 print('exec', query, args, cnx.cnxset.cnx) |
|
712 try: |
|
713 # str(query) to avoid error if it's a unicode string |
|
714 cursor.execute(str(query), args) |
|
715 except Exception as ex: |
|
716 if self.repo.config.mode != 'test': |
|
717 # during test we get those message when trying to alter sqlite |
|
718 # db schema |
|
719 self.info("sql: %r\n args: %s\ndbms message: %r", |
|
720 query, args, ex.args[0]) |
|
721 if rollback: |
|
722 try: |
|
723 cnx.cnxset.rollback() |
|
724 if self.repo.config.mode != 'test': |
|
725 self.debug('transaction has been rolled back') |
|
726 except Exception as ex: |
|
727 pass |
|
728 if ex.__class__.__name__ == 'IntegrityError': |
|
729 # need string comparison because of various backends |
|
730 for arg in ex.args: |
|
731 # postgres, sqlserver |
|
732 mo = re.search("unique_[a-z0-9]{32}", arg) |
|
733 if mo is not None: |
|
734 raise UniqueTogetherError(cnx, cstrname=mo.group(0)) |
|
735 # old sqlite |
|
736 mo = re.search('columns? (.*) (?:is|are) not unique', arg) |
|
737 if mo is not None: # sqlite in use |
|
738 # we left chop the 'cw_' prefix of attribute names |
|
739 rtypes = [c.strip()[3:] |
|
740 for c in mo.group(1).split(',')] |
|
741 raise UniqueTogetherError(cnx, rtypes=rtypes) |
|
742 # sqlite after http://www.sqlite.org/cgi/src/info/c80e229dd9c1230a |
|
743 if arg.startswith('UNIQUE constraint failed:'): |
|
744 # message looks like: "UNIQUE constraint failed: foo.cw_bar, foo.cw_baz" |
|
745 # so drop the prefix, split on comma, drop the tablenames, and drop "cw_" |
|
746 columns = arg.split(':', 1)[1].split(',') |
|
747 rtypes = [c.split('.', 1)[1].strip()[3:] for c in columns] |
|
748 raise UniqueTogetherError(cnx, rtypes=rtypes) |
|
749 |
|
750 mo = re.search('"cstr[a-f0-9]{32}"', arg) |
|
751 if mo is not None: |
|
752 # postgresql |
|
753 raise ViolatedConstraint(cnx, cstrname=mo.group(0)[1:-1]) |
|
754 if arg.startswith('CHECK constraint failed:'): |
|
755 # sqlite3 (new) |
|
756 raise ViolatedConstraint(cnx, cstrname=arg.split(':', 1)[1].strip()) |
|
757 mo = re.match('^constraint (cstr.*) failed$', arg) |
|
758 if mo is not None: |
|
759 # sqlite3 (old) |
|
760 raise ViolatedConstraint(cnx, cstrname=mo.group(1)) |
|
761 raise |
|
762 return cursor |
|
763 |
|
    @statsd_timeit
    def doexecmany(self, cnx, query, args):
        """Execute a query.
        it's a function just so that it shows up in profiling

        On failure, log, attempt a best-effort rollback and re-raise.
        """
        if server.DEBUG & server.DBG_SQL:
            print('execmany', query, 'with', len(args), 'arguments', cnx.cnxset.cnx)
        cursor = cnx.cnxset.cu
        try:
            # str(query) to avoid error if it's a unicode string
            cursor.executemany(str(query), args)
        except Exception as ex:
            if self.repo.config.mode != 'test':
                # during test we get those message when trying to alter sqlite
                # db schema
                self.critical("sql many: %r\n args: %s\ndbms message: %r",
                              query, args, ex.args[0])
            try:
                cnx.cnxset.rollback()
                if self.repo.config.mode != 'test':
                    self.critical('transaction has been rolled back')
            except Exception:
                pass
            raise
|
788 |
|
789 # short cut to method requiring advanced db helper usage ################## |
|
790 |
|
    def update_rdef_column(self, cnx, rdef):
        """update physical column for a relation definition (final or inlined)
        """
        table, column = rdef_table_column(rdef)
        coltype, allownull = rdef_physical_info(self.dbhelper, rdef)
        if not self.dbhelper.alter_column_support:
            # backend (e.g. old sqlite) cannot ALTER COLUMN: log and give up
            self.error("backend can't alter %s.%s to %s%s", table, column, coltype,
                       not allownull and 'NOT NULL' or '')
            return
        self.dbhelper.change_col_type(LogCursor(cnx.cnxset.cu),
                                      table, column, coltype, allownull)
        self.info('altered %s.%s: now %s%s', table, column, coltype,
                  not allownull and 'NOT NULL' or '')
|
804 |
|
805 def update_rdef_null_allowed(self, cnx, rdef): |
|
806 """update NULL / NOT NULL of physical column for a relation definition |
|
807 (final or inlined) |
|
808 """ |
|
809 if not self.dbhelper.alter_column_support: |
|
810 # not supported (and NOT NULL not set by yams in that case, so no |
|
811 # worry) |
|
812 return |
|
813 table, column = rdef_table_column(rdef) |
|
814 coltype, allownull = rdef_physical_info(self.dbhelper, rdef) |
|
815 self.dbhelper.set_null_allowed(LogCursor(cnx.cnxset.cu), |
|
816 table, column, coltype, allownull) |
|
817 |
|
818 def update_rdef_indexed(self, cnx, rdef): |
|
819 table, column = rdef_table_column(rdef) |
|
820 if rdef.indexed: |
|
821 self.create_index(cnx, table, column) |
|
822 else: |
|
823 self.drop_index(cnx, table, column) |
|
824 |
|
825 def update_rdef_unique(self, cnx, rdef): |
|
826 table, column = rdef_table_column(rdef) |
|
827 if rdef.constraint_by_type('UniqueConstraint'): |
|
828 self.create_index(cnx, table, column, unique=True) |
|
829 else: |
|
830 self.drop_index(cnx, table, column, unique=True) |
|
831 |
|
832 def create_index(self, cnx, table, column, unique=False): |
|
833 cursor = LogCursor(cnx.cnxset.cu) |
|
834 self.dbhelper.create_index(cursor, table, column, unique) |
|
835 |
|
836 def drop_index(self, cnx, table, column, unique=False): |
|
837 cursor = LogCursor(cnx.cnxset.cu) |
|
838 self.dbhelper.drop_index(cursor, table, column, unique) |
|
839 |
|
840 # system source interface ################################################# |
|
841 |
|
842 def _eid_type_source(self, cnx, eid, sql): |
|
843 try: |
|
844 res = self.doexec(cnx, sql).fetchone() |
|
845 if res is not None: |
|
846 return res |
|
847 except Exception: |
|
848 self.exception('failed to query entities table for eid %s', eid) |
|
849 raise UnknownEid(eid) |
|
850 |
|
851 def eid_type_source(self, cnx, eid): # pylint: disable=E0202 |
|
852 """return a tuple (type, extid, source) for the entity with id <eid>""" |
|
853 sql = 'SELECT type, extid, asource FROM entities WHERE eid=%s' % eid |
|
854 res = self._eid_type_source(cnx, eid, sql) |
|
855 if not isinstance(res, list): |
|
856 res = list(res) |
|
857 res[-2] = self.decode_extid(res[-2]) |
|
858 return res |
|
859 |
|
860 def eid_type_source_pre_131(self, cnx, eid): |
|
861 """return a tuple (type, extid, source) for the entity with id <eid>""" |
|
862 sql = 'SELECT type, extid FROM entities WHERE eid=%s' % eid |
|
863 res = self._eid_type_source(cnx, eid, sql) |
|
864 if not isinstance(res, list): |
|
865 res = list(res) |
|
866 res[-1] = self.decode_extid(res[-1]) |
|
867 res.append("system") |
|
868 return res |
|
869 |
|
870 def extid2eid(self, cnx, extid): |
|
871 """get eid from an external id. Return None if no record found.""" |
|
872 assert isinstance(extid, binary_type) |
|
873 args = {'x': b64encode(extid).decode('ascii')} |
|
874 cursor = self.doexec(cnx, |
|
875 'SELECT eid FROM entities WHERE extid=%(x)s', |
|
876 args) |
|
877 # XXX testing rowcount cause strange bug with sqlite, results are there |
|
878 # but rowcount is 0 |
|
879 #if cursor.rowcount > 0: |
|
880 try: |
|
881 result = cursor.fetchone() |
|
882 if result: |
|
883 return result[0] |
|
884 except Exception: |
|
885 pass |
|
886 cursor = self.doexec(cnx, |
|
887 'SELECT eid FROM moved_entities WHERE extid=%(x)s', |
|
888 args) |
|
889 try: |
|
890 result = cursor.fetchone() |
|
891 if result: |
|
892 # entity was moved to the system source, return negative |
|
893 # number to tell the external source to ignore it |
|
894 return -result[0] |
|
895 except Exception: |
|
896 pass |
|
897 return None |
|
898 |
|
899 def _handle_is_relation_sql(self, cnx, sql, attrs): |
|
900 """ Handler for specific is_relation sql that may be |
|
901 overwritten in some stores""" |
|
902 self.doexec(cnx, sql % attrs) |
|
903 |
|
    # default handlers: plain doexec for entity insertion, and the is_relation
    # handler shared by is_instance_of / cw_source insertions; stores may
    # override any of these aliases to hook the corresponding statements
    _handle_insert_entity_sql = doexec
    _handle_is_instance_of_sql = _handle_source_relation_sql = _handle_is_relation_sql
|
906 |
|
    def add_info(self, cnx, entity, source, extid):
        """add type and source info for an eid into the system table

        Inserts the entities record, then the core `is`, `is_instance_of` and
        `cw_source` relations, and finally triggers full text indexation when
        the entity type is indexable.
        """
        assert cnx.cnxset is not None
        # begin by inserting eid/type/source/extid into the entities table
        if extid is not None:
            assert isinstance(extid, binary_type)
            # extid stored base64 encoded so the varchar column can be indexed
            # (see module docstring)
            extid = b64encode(extid).decode('ascii')
        attrs = {'type': text_type(entity.cw_etype), 'eid': entity.eid, 'extid': extid,
                 'asource': text_type(source.uri)}
        self._handle_insert_entity_sql(cnx, self.sqlgen.insert('entities', attrs), attrs)
        # insert core relations: is, is_instance_of and cw_source
        try:
            self._handle_is_relation_sql(cnx, 'INSERT INTO is_relation(eid_from,eid_to) VALUES (%s,%s)',
                                         (entity.eid, eschema_eid(cnx, entity.e_schema)))
        except IndexError:
            # during schema serialization, skip
            pass
        else:
            # one is_instance_of row per ancestor type plus the type itself
            for eschema in entity.e_schema.ancestors() + [entity.e_schema]:
                self._handle_is_relation_sql(cnx,
                                             'INSERT INTO is_instance_of_relation(eid_from,eid_to) VALUES (%s,%s)',
                                             (entity.eid, eschema_eid(cnx, eschema)))
        if 'CWSource' in self.schema and source.eid is not None:  # else, cw < 3.10
            self._handle_is_relation_sql(cnx, 'INSERT INTO cw_source_relation(eid_from,eid_to) VALUES (%s,%s)',
                                         (entity.eid, source.eid))
        # now we can update the full text index
        if self.need_fti_indexation(entity.cw_etype):
            self.index_entity(cnx, entity=entity)
|
935 |
|
936 def update_info(self, cnx, entity, need_fti_update): |
|
937 """mark entity as being modified, fulltext reindex if needed""" |
|
938 if need_fti_update: |
|
939 # reindex the entity only if this query is updating at least |
|
940 # one indexable attribute |
|
941 self.index_entity(cnx, entity=entity) |
|
942 |
|
943 def delete_info_multi(self, cnx, entities): |
|
944 """delete system information on deletion of a list of entities with the |
|
945 same etype and belinging to the same source |
|
946 |
|
947 * update the fti |
|
948 * remove record from the `entities` table |
|
949 """ |
|
950 self.fti_unindex_entities(cnx, entities) |
|
951 attrs = {'eid': '(%s)' % ','.join([str(_e.eid) for _e in entities])} |
|
952 self.doexec(cnx, self.sqlgen.delete_many('entities', attrs), attrs) |
|
953 |
|
954 # undo support ############################################################# |
|
955 |
|
    def undoable_transactions(self, cnx, ueid=None, **actionfilters):
        """See :class:`cubicweb.repoapi.Connection.undoable_transactions`

        `ueid` restricts to transactions of a given user (forced to the
        connection's user for non-managers). `actionfilters` may contain
        'public', 'etype', 'eid' and 'action' keys used to build EXISTS()
        subqueries on the recorded undo actions.
        """
        # force filtering to connection's user if not a manager
        if not cnx.user.is_in_group('managers'):
            ueid = cnx.user.eid
        restr = {}
        if ueid is not None:
            restr['tx_user'] = ueid
        sql = self.sqlgen.select('transactions', restr, ('tx_uuid', 'tx_time', 'tx_user'))
        if actionfilters:
            # we will need subqueries to filter transactions according to
            # actions done
            tearestr = {}  # filters on the tx_entity_actions table
            trarestr = {}  # filters on the tx_relation_actions table
            genrestr = {}  # generic filters, appliyable to both table
            # unless public explicitly set to false, we only consider public
            # actions
            if actionfilters.pop('public', True):
                genrestr['txa_public'] = True
            # put additional filters in trarestr and/or tearestr
            for key, val in actionfilters.items():
                if key == 'etype':
                    # filtering on etype implies filtering on entity actions
                    # only, and with no eid specified
                    assert actionfilters.get('action', 'C') in 'CUD'
                    assert not 'eid' in actionfilters
                    tearestr['etype'] = text_type(val)
                elif key == 'eid':
                    # eid filter may apply to 'eid' of tx_entity_actions or to
                    # 'eid_from' OR 'eid_to' of tx_relation_actions
                    if actionfilters.get('action', 'C') in 'CUD':
                        tearestr['eid'] = val
                    if actionfilters.get('action', 'A') in 'AR':
                        trarestr['eid_from'] = val
                        trarestr['eid_to'] = val
                elif key == 'action':
                    # C/U/D are entity actions, A/R are relation actions
                    if val in 'CUD':
                        tearestr['txa_action'] = text_type(val)
                    else:
                        assert val in 'AR'
                        trarestr['txa_action'] = text_type(val)
                else:
                    raise AssertionError('unknow filter %s' % key)
            assert trarestr or tearestr, "can't only filter on 'public'"
            subqsqls = []
            # append subqueries to the original query, using EXISTS()
            if trarestr or (genrestr and not tearestr):
                trarestr.update(genrestr)
                trasql = self.sqlgen.select('tx_relation_actions', trarestr, ('1',))
                if 'eid_from' in trarestr:
                    # replace AND by OR between eid_from/eid_to restriction
                    trasql = sql_or_clauses(trasql, ['eid_from = %(eid_from)s',
                                                     'eid_to = %(eid_to)s'])
                trasql += ' AND transactions.tx_uuid=tx_relation_actions.tx_uuid'
                subqsqls.append('EXISTS(%s)' % trasql)
            if tearestr or (genrestr and not trarestr):
                tearestr.update(genrestr)
                teasql = self.sqlgen.select('tx_entity_actions', tearestr, ('1',))
                teasql += ' AND transactions.tx_uuid=tx_entity_actions.tx_uuid'
                subqsqls.append('EXISTS(%s)' % teasql)
            if restr:
                sql += ' AND %s' % ' OR '.join(subqsqls)
            else:
                sql += ' WHERE %s' % ' OR '.join(subqsqls)
            restr.update(trarestr)
            restr.update(tearestr)
        # we want results ordered by transaction's time descendant
        sql += ' ORDER BY tx_time DESC'
        cu = self.doexec(cnx, sql, restr)
        # turn results into transaction objects
        return [tx.Transaction(cnx, *args) for args in cu.fetchall()]
|
1027 |
|
1028 def tx_info(self, cnx, txuuid): |
|
1029 """See :class:`cubicweb.repoapi.Connection.transaction_info`""" |
|
1030 return tx.Transaction(cnx, txuuid, *self._tx_info(cnx, text_type(txuuid))) |
|
1031 |
|
    def tx_actions(self, cnx, txuuid, public):
        """See :class:`cubicweb.repoapi.Connection.transaction_actions`

        Return entity and relation actions recorded for `txuuid`, sorted by
        action order. When `public` is true, actions recorded from within
        hooks are excluded.
        """
        txuuid = text_type(txuuid)
        # ensure the transaction exists and the user is allowed to see it
        self._tx_info(cnx, txuuid)
        restr = {'tx_uuid': txuuid}
        if public:
            restr['txa_public'] = True
        # XXX use generator to avoid loading everything in memory?
        sql = self.sqlgen.select('tx_entity_actions', restr,
                                 ('txa_action', 'txa_public', 'txa_order',
                                  'etype', 'eid', 'changes'))
        with cnx.ensure_cnx_set:
            cu = self.doexec(cnx, sql, restr)
            # `changes` holds pickled former attribute values written by
            # _record_tx_action (trusted, repository-generated data)
            actions = [tx.EntityAction(a,p,o,et,e,c and pickle.loads(self.binary_to_str(c)))
                       for a,p,o,et,e,c in cu.fetchall()]
        sql = self.sqlgen.select('tx_relation_actions', restr,
                                 ('txa_action', 'txa_public', 'txa_order',
                                  'rtype', 'eid_from', 'eid_to'))
        with cnx.ensure_cnx_set:
            cu = self.doexec(cnx, sql, restr)
            actions += [tx.RelationAction(*args) for args in cu.fetchall()]
        return sorted(actions, key=lambda x: x.order)
|
1054 |
|
1055 def undo_transaction(self, cnx, txuuid): |
|
1056 """See :class:`cubicweb.repoapi.Connection.undo_transaction` |
|
1057 |
|
1058 important note: while undoing of a transaction, only hooks in the |
|
1059 'integrity', 'activeintegrity' and 'undo' categories are called. |
|
1060 """ |
|
1061 errors = [] |
|
1062 cnx.transaction_data['undoing_uuid'] = txuuid |
|
1063 with cnx.deny_all_hooks_but('integrity', 'activeintegrity', 'undo'): |
|
1064 with cnx.security_enabled(read=False): |
|
1065 for action in reversed(self.tx_actions(cnx, txuuid, False)): |
|
1066 undomethod = getattr(self, '_undo_%s' % action.action.lower()) |
|
1067 errors += undomethod(cnx, action) |
|
1068 # remove the transactions record |
|
1069 self.doexec(cnx, |
|
1070 "DELETE FROM transactions WHERE tx_uuid='%s'" % txuuid) |
|
1071 if errors: |
|
1072 raise UndoTransactionException(txuuid, errors) |
|
1073 else: |
|
1074 return |
|
1075 |
|
1076 def start_undoable_transaction(self, cnx, uuid): |
|
1077 """connection callback to insert a transaction record in the transactions |
|
1078 table when some undoable transaction is started |
|
1079 """ |
|
1080 ueid = cnx.user.eid |
|
1081 attrs = {'tx_uuid': uuid, 'tx_user': ueid, 'tx_time': datetime.utcnow()} |
|
1082 self.doexec(cnx, self.sqlgen.insert('transactions', attrs), attrs) |
|
1083 |
|
1084 def _save_attrs(self, cnx, entity, attrs): |
|
1085 """return a pickleable dictionary containing current values for given |
|
1086 attributes of the entity |
|
1087 """ |
|
1088 restr = {'cw_eid': entity.eid} |
|
1089 sql = self.sqlgen.select(SQL_PREFIX + entity.cw_etype, restr, attrs) |
|
1090 cu = self.doexec(cnx, sql, restr) |
|
1091 values = dict(zip(attrs, cu.fetchone())) |
|
1092 # ensure backend specific binary are converted back to string |
|
1093 eschema = entity.e_schema |
|
1094 for column in attrs: |
|
1095 # [3:] remove 'cw_' prefix |
|
1096 attr = column[3:] |
|
1097 if not eschema.subjrels[attr].final: |
|
1098 continue |
|
1099 if eschema.destination(attr) in ('Password', 'Bytes'): |
|
1100 value = values[column] |
|
1101 if value is not None: |
|
1102 values[column] = self.binary_to_str(value) |
|
1103 return values |
|
1104 |
|
1105 def _record_tx_action(self, cnx, table, action, **kwargs): |
|
1106 """record a transaction action in the given table (either |
|
1107 'tx_entity_actions' or 'tx_relation_action') |
|
1108 """ |
|
1109 kwargs['tx_uuid'] = cnx.transaction_uuid() |
|
1110 kwargs['txa_action'] = action |
|
1111 kwargs['txa_order'] = cnx.transaction_inc_action_counter() |
|
1112 kwargs['txa_public'] = not cnx.hooks_in_progress |
|
1113 self.doexec(cnx, self.sqlgen.insert(table, kwargs), kwargs) |
|
1114 |
|
1115 def _tx_info(self, cnx, txuuid): |
|
1116 """return transaction's time and user of the transaction with the given uuid. |
|
1117 |
|
1118 raise `NoSuchTransaction` if there is no such transaction of if the |
|
1119 connection's user isn't allowed to see it. |
|
1120 """ |
|
1121 restr = {'tx_uuid': txuuid} |
|
1122 sql = self.sqlgen.select('transactions', restr, |
|
1123 ('tx_time', 'tx_user')) |
|
1124 cu = self.doexec(cnx, sql, restr) |
|
1125 try: |
|
1126 time, ueid = cu.fetchone() |
|
1127 except TypeError: |
|
1128 raise tx.NoSuchTransaction(txuuid) |
|
1129 if not (cnx.user.is_in_group('managers') |
|
1130 or cnx.user.eid == ueid): |
|
1131 raise tx.NoSuchTransaction(txuuid) |
|
1132 return time, ueid |
|
1133 |
|
1134 def _reedit_entity(self, entity, changes, err): |
|
1135 cnx = entity._cw |
|
1136 eid = entity.eid |
|
1137 entity.cw_edited = edited = EditedEntity(entity) |
|
1138 # check for schema changes, entities linked through inlined relation |
|
1139 # still exists, rewrap binary values |
|
1140 eschema = entity.e_schema |
|
1141 getrschema = eschema.subjrels |
|
1142 for column, value in changes.items(): |
|
1143 rtype = column[len(SQL_PREFIX):] |
|
1144 if rtype == "eid": |
|
1145 continue # XXX should even `eid` be stored in action changes? |
|
1146 try: |
|
1147 rschema = getrschema[rtype] |
|
1148 except KeyError: |
|
1149 err(cnx._("can't restore relation %(rtype)s of entity %(eid)s, " |
|
1150 "this relation does not exist in the schema anymore.") |
|
1151 % {'rtype': rtype, 'eid': eid}) |
|
1152 if not rschema.final: |
|
1153 if not rschema.inlined: |
|
1154 assert value is None |
|
1155 # rschema is an inlined relation |
|
1156 elif value is not None: |
|
1157 # not a deletion: we must put something in edited |
|
1158 try: |
|
1159 entity._cw.entity_from_eid(value) # check target exists |
|
1160 edited[rtype] = value |
|
1161 except UnknownEid: |
|
1162 err(cnx._("can't restore entity %(eid)s of type %(eschema)s, " |
|
1163 "target of %(rtype)s (eid %(value)s) does not exist any longer") |
|
1164 % locals()) |
|
1165 changes[column] = None |
|
1166 elif eschema.destination(rtype) in ('Bytes', 'Password'): |
|
1167 changes[column] = self._binary(value) |
|
1168 edited[rtype] = Binary(value) |
|
1169 elif PY2 and isinstance(value, str): |
|
1170 edited[rtype] = text_type(value, cnx.encoding, 'replace') |
|
1171 else: |
|
1172 edited[rtype] = value |
|
1173 # This must only be done after init_entitiy_caches : defered in calling functions |
|
1174 # edited.check() |
|
1175 |
|
    def _undo_d(self, cnx, action):
        """undo an entity deletion: recreate the entity from the attribute
        values saved in the action, firing add-entity hooks

        Return a list of error messages (empty on success).
        """
        errors = []
        err = errors.append
        eid = action.eid
        etype = action.etype
        _ = cnx._
        # get an entity instance
        try:
            entity = self.repo.vreg['etypes'].etype_class(etype)(cnx)
        except Exception:
            err("can't restore entity %s of type %s, type no more supported"
                % (eid, etype))
            return errors
        self._reedit_entity(entity, action.changes, err)
        entity.eid = eid
        cnx.repo.init_entity_caches(cnx, entity, self)
        # check() deferred until after init_entity_caches (see _reedit_entity)
        entity.cw_edited.check()
        self.repo.hm.call_hooks('before_add_entity', cnx, entity=entity)
        # restore the entity
        action.changes['cw_eid'] = eid
        # restore record in entities (will update fti if needed)
        self.add_info(cnx, entity, self, None)
        sql = self.sqlgen.insert(SQL_PREFIX + etype, action.changes)
        self.doexec(cnx, sql, action.changes)
        self.repo.hm.call_hooks('after_add_entity', cnx, entity=entity)
        return errors
|
1203 |
|
    def _undo_r(self, cnx, action):
        """undo a relation removal: re-add the relation, firing add-relation
        hooks

        Return a list of error messages (empty on success).
        """
        errors = []
        subj, rtype, obj = action.eid_from, action.rtype, action.eid_to
        try:
            sentity, oentity, rdef = _undo_rel_info(cnx, subj, rtype, obj)
        except _UndoException as ex:
            errors.append(text_type(ex))
        else:
            # both ends must still accept the relation (cardinality/schema)
            for role, entity in (('subject', sentity),
                                 ('object', oentity)):
                try:
                    _undo_check_relation_target(entity, rdef, role)
                except _UndoException as ex:
                    errors.append(text_type(ex))
                    continue
        if not errors:
            self.repo.hm.call_hooks('before_add_relation', cnx,
                                    eidfrom=subj, rtype=rtype, eidto=obj)
            # add relation in the database
            self._add_relations(cnx, rtype, [(subj, obj)], rdef.rtype.inlined)
            # set related cache
            cnx.update_rel_cache_add(subj, rtype, obj, rdef.rtype.symmetric)
            self.repo.hm.call_hooks('after_add_relation', cnx,
                                    eidfrom=subj, rtype=rtype, eidto=obj)
        return errors
|
1230 |
|
    def _undo_c(self, cnx, action):
        """undo an entity creation: delete the entity and its core relations,
        firing delete-entity hooks

        Raise `ValidationError` when a later transaction touched the entity.
        Return a (possibly empty) sequence of error messages.
        """
        eid = action.eid
        # XXX done to avoid fetching all remaining relation for the entity
        # we should find an efficient way to do this (keeping current veolidf
        # massive deletion performance)
        if _undo_has_later_transaction(cnx, eid):
            msg = cnx._('some later transaction(s) touch entity, undo them '
                        'first')
            raise ValidationError(eid, {None: msg})
        etype = action.etype
        # get an entity instance
        try:
            entity = self.repo.vreg['etypes'].etype_class(etype)(cnx)
        except Exception:
            return [cnx._(
                "Can't undo creation of entity %(eid)s of type %(etype)s, type "
                "no more supported" % {'eid': eid, 'etype': etype})]
        entity.eid = eid
        # for proper eid/type cache update
        CleanupDeletedEidsCacheOp.get_instance(cnx).add_data(eid)
        self.repo.hm.call_hooks('before_delete_entity', cnx, entity=entity)
        # remove is / is_instance_of which are added using sql by hooks, hence
        # unvisible as transaction action
        self.doexec(cnx, 'DELETE FROM is_relation WHERE eid_from=%s' % eid)
        self.doexec(cnx, 'DELETE FROM is_instance_of_relation WHERE eid_from=%s' % eid)
        self.doexec(cnx, 'DELETE FROM cw_source_relation WHERE eid_from=%s' % eid)
        # XXX check removal of inlined relation?
        # delete the entity
        attrs = {'cw_eid': eid}
        sql = self.sqlgen.delete(SQL_PREFIX + entity.cw_etype, attrs)
        self.doexec(cnx, sql, attrs)
        # remove record from entities (will update fti if needed)
        self.delete_info_multi(cnx, [entity])
        self.repo.hm.call_hooks('after_delete_entity', cnx, entity=entity)
        return ()
|
1267 |
|
1268 def _undo_u(self, cnx, action): |
|
1269 """undo an entity update""" |
|
1270 errors = [] |
|
1271 err = errors.append |
|
1272 try: |
|
1273 entity = cnx.entity_from_eid(action.eid) |
|
1274 except UnknownEid: |
|
1275 err(cnx._("can't restore state of entity %s, it has been " |
|
1276 "deleted inbetween") % action.eid) |
|
1277 return errors |
|
1278 self._reedit_entity(entity, action.changes, err) |
|
1279 entity.cw_edited.check() |
|
1280 self.repo.hm.call_hooks('before_update_entity', cnx, entity=entity) |
|
1281 sql = self.sqlgen.update(SQL_PREFIX + entity.cw_etype, action.changes, |
|
1282 ['cw_eid']) |
|
1283 self.doexec(cnx, sql, action.changes) |
|
1284 self.repo.hm.call_hooks('after_update_entity', cnx, entity=entity) |
|
1285 return errors |
|
1286 |
|
    def _undo_a(self, cnx, action):
        """undo a relation addition: delete the relation again, firing
        delete-relation hooks

        Return a list of error messages (empty on success).
        """
        errors = []
        subj, rtype, obj = action.eid_from, action.rtype, action.eid_to
        try:
            sentity, oentity, rdef = _undo_rel_info(cnx, subj, rtype, obj)
        except _UndoException as ex:
            errors.append(text_type(ex))
        else:
            rschema = rdef.rtype
            # check the relation still exists, either as an inlined column or
            # in its dedicated relation table (eids are internal integers, not
            # user input)
            if rschema.inlined:
                sql = 'SELECT 1 FROM cw_%s WHERE cw_eid=%s and cw_%s=%s'\
                      % (sentity.cw_etype, subj, rtype, obj)
            else:
                sql = 'SELECT 1 FROM %s_relation WHERE eid_from=%s and eid_to=%s'\
                      % (rtype, subj, obj)
            cu = self.doexec(cnx, sql)
            if cu.fetchone() is None:
                errors.append(cnx._(
                    "Can't undo addition of relation %(rtype)s from %(subj)s to"
                    " %(obj)s, doesn't exist anymore" % locals()))
        if not errors:
            self.repo.hm.call_hooks('before_delete_relation', cnx,
                                    eidfrom=subj, rtype=rtype, eidto=obj)
            # delete relation from the database
            self._delete_relation(cnx, subj, rtype, obj, rschema.inlined)
            # set related cache
            cnx.update_rel_cache_del(subj, rtype, obj, rschema.symmetric)
            self.repo.hm.call_hooks('after_delete_relation', cnx,
                                    eidfrom=subj, rtype=rtype, eidto=obj)
        return errors
|
1318 |
|
1319 # full text index handling ################################################# |
|
1320 |
|
1321 @cached |
|
1322 def need_fti_indexation(self, etype): |
|
1323 eschema = self.schema.eschema(etype) |
|
1324 if any(eschema.indexable_attributes()): |
|
1325 return True |
|
1326 if any(eschema.fulltext_containers()): |
|
1327 return True |
|
1328 return False |
|
1329 |
|
1330 def index_entity(self, cnx, entity): |
|
1331 """create an operation to [re]index textual content of the given entity |
|
1332 on commit |
|
1333 """ |
|
1334 if self.do_fti: |
|
1335 FTIndexEntityOp.get_instance(cnx).add_data(entity.eid) |
|
1336 |
|
1337 def fti_unindex_entities(self, cnx, entities): |
|
1338 """remove text content for entities from the full text index |
|
1339 """ |
|
1340 cursor = cnx.cnxset.cu |
|
1341 cursor_unindex_object = self.dbhelper.cursor_unindex_object |
|
1342 try: |
|
1343 for entity in entities: |
|
1344 cursor_unindex_object(entity.eid, cursor) |
|
1345 except Exception: # let KeyboardInterrupt / SystemExit propagate |
|
1346 self.exception('error while unindexing %s', entity) |
|
1347 |
|
1348 |
|
1349 def fti_index_entities(self, cnx, entities): |
|
1350 """add text content of created/modified entities to the full text index |
|
1351 """ |
|
1352 cursor_index_object = self.dbhelper.cursor_index_object |
|
1353 cursor = cnx.cnxset.cu |
|
1354 try: |
|
1355 # use cursor_index_object, not cursor_reindex_object since |
|
1356 # unindexing done in the FTIndexEntityOp |
|
1357 for entity in entities: |
|
1358 cursor_index_object(entity.eid, |
|
1359 entity.cw_adapt_to('IFTIndexable'), |
|
1360 cursor) |
|
1361 except Exception: # let KeyboardInterrupt / SystemExit propagate |
|
1362 self.exception('error while indexing %s', entity) |
|
1363 |
|
1364 |
|
class FTIndexEntityOp(hook.DataOperationMixIn, hook.LateOperation):
    """operation to delay entity full text indexation to commit

    since fti indexing may trigger discovery of other entities, it should be
    triggered on precommit, not commit, and this should be done after other
    precommit operation which may add relations to the entity
    """

    def precommit_event(self):
        cnx = self.cnx
        source = cnx.repo.system_source
        pending = cnx.transaction_data.get('pendingeids', ())
        done = cnx.transaction_data.setdefault('indexedeids', set())
        containers = set()
        for eid in self.get_data():
            if eid in pending or eid in done:
                # entity added and deleted in the same transaction, or
                # already processed
                continue
            done.add(eid)
            adapted = cnx.entity_from_eid(eid).cw_adapt_to('IFTIndexable')
            containers.update(adapted.fti_containers())
        source.fti_unindex_entities(cnx, containers)
        source.fti_index_entities(cnx, containers)
|
1389 |
|
def sql_schema(driver):
    """return the sql statements creating the native source's core system
    tables (entities, moved_entities, transactions and the undo action
    tables) for the given database `driver`

    Statements are separated by ';;' so callers can split them even though
    some statements (the sqlite trigger) contain plain ';'.
    """
    helper = get_db_helper(driver)
    typemap = helper.TYPE_MAPPING
    schema = """
/* Create the repository's system database */

%s

CREATE TABLE entities (
  eid INTEGER PRIMARY KEY NOT NULL,
  type VARCHAR(64) NOT NULL,
  asource VARCHAR(128) NOT NULL,
  extid VARCHAR(256)
);;
CREATE INDEX entities_type_idx ON entities(type);;
CREATE TABLE moved_entities (
  eid INTEGER PRIMARY KEY NOT NULL,
  extid VARCHAR(256) UNIQUE NOT NULL
);;

CREATE TABLE transactions (
  tx_uuid CHAR(32) PRIMARY KEY NOT NULL,
  tx_user INTEGER NOT NULL,
  tx_time %s NOT NULL
);;
CREATE INDEX transactions_tx_user_idx ON transactions(tx_user);;
CREATE INDEX transactions_tx_time_idx ON transactions(tx_time);;

CREATE TABLE tx_entity_actions (
  tx_uuid CHAR(32) REFERENCES transactions(tx_uuid) ON DELETE CASCADE,
  txa_action CHAR(1) NOT NULL,
  txa_public %s NOT NULL,
  txa_order INTEGER,
  eid INTEGER NOT NULL,
  etype VARCHAR(64) NOT NULL,
  changes %s
);;
CREATE INDEX tx_entity_actions_txa_action_idx ON tx_entity_actions(txa_action);;
CREATE INDEX tx_entity_actions_txa_public_idx ON tx_entity_actions(txa_public);;
CREATE INDEX tx_entity_actions_eid_idx ON tx_entity_actions(eid);;
CREATE INDEX tx_entity_actions_etype_idx ON tx_entity_actions(etype);;
CREATE INDEX tx_entity_actions_tx_uuid_idx ON tx_entity_actions(tx_uuid);;

CREATE TABLE tx_relation_actions (
  tx_uuid CHAR(32) REFERENCES transactions(tx_uuid) ON DELETE CASCADE,
  txa_action CHAR(1) NOT NULL,
  txa_public %s NOT NULL,
  txa_order INTEGER,
  eid_from INTEGER NOT NULL,
  eid_to INTEGER NOT NULL,
  rtype VARCHAR(256) NOT NULL
);;
CREATE INDEX tx_relation_actions_txa_action_idx ON tx_relation_actions(txa_action);;
CREATE INDEX tx_relation_actions_txa_public_idx ON tx_relation_actions(txa_public);;
CREATE INDEX tx_relation_actions_eid_from_idx ON tx_relation_actions(eid_from);;
CREATE INDEX tx_relation_actions_eid_to_idx ON tx_relation_actions(eid_to);;
CREATE INDEX tx_relation_actions_tx_uuid_idx ON tx_relation_actions(tx_uuid);;
""" % (helper.sql_create_numrange('entities_id_seq').replace(';', ';;'),
       typemap['Datetime'],
       typemap['Boolean'], typemap['Bytes'], typemap['Boolean'])
    if helper.backend_name == 'sqlite':
        # sqlite support the ON DELETE CASCADE syntax but do nothing
        schema += '''
CREATE TRIGGER fkd_transactions
BEFORE DELETE ON transactions
FOR EACH ROW BEGIN
    DELETE FROM tx_entity_actions WHERE tx_uuid=OLD.tx_uuid;
    DELETE FROM tx_relation_actions WHERE tx_uuid=OLD.tx_uuid;
END;;
'''
    schema += ';;'.join(helper.sqls_create_multicol_unique_index('entities', ['extid']))
    schema += ';;\n'
    return schema
|
1463 |
|
1464 |
|
def sql_drop_schema(driver):
    """return the sql statements dropping everything created by
    :func:`sql_schema` for the given database `driver`
    """
    helper = get_db_helper(driver)
    return """
%s;
%s
DROP TABLE entities;
DROP TABLE tx_entity_actions;
DROP TABLE tx_relation_actions;
DROP TABLE transactions;
""" % (';'.join(helper.sqls_drop_multicol_unique_index('entities', ['extid'])),
       helper.sql_drop_numrange('entities_id_seq'))
|
1476 |
|
1477 |
|
def grant_schema(user, set_owner=True):
    """return GRANT (and optionally ALTER ... OWNER) sql statements giving
    `user` all rights on the native source's system tables

    :param user: name of the database user to grant rights to
    :param set_owner: when true, also emit an ALTER TABLE ... OWNER statement
                      per table
    """
    result = ''
    for table in ('entities', 'entities_id_seq',
                  'transactions', 'tx_entity_actions', 'tx_relation_actions'):
        if set_owner:
            # += (not =): previously `result =` discarded the statements
            # accumulated for earlier tables
            result += 'ALTER TABLE %s OWNER TO %s;\n' % (table, user)
        result += 'GRANT ALL ON %s TO %s;\n' % (table, user)
    return result
|
1486 |
|
1487 |
|
class BaseAuthentifier(object):
    """base class for authentication plugins plugged into the native source"""

    def __init__(self, source=None):
        self.source = source

    def set_schema(self, schema):
        """set the instance'schema; base implementation does nothing"""
|
1496 |
|
class LoginPasswordAuthentifier(BaseAuthentifier):
    """authenticate a CWUser from its login and password stored in the system
    source
    """
    # rql fetching the stored (salted) password hash for a login
    passwd_rql = 'Any P WHERE X is CWUser, X login %(login)s, X upassword P'
    # rql fetching the user eid matching login + crypted password
    auth_rql = (u'Any X WHERE X is CWUser, X login %(login)s, X upassword %(pwd)s, '
                'X cw_source S, S name "system"')
    _sols = ({'X': 'CWUser', 'P': 'Password', 'S': 'CWSource'},)

    def set_schema(self, schema):
        """set the instance'schema"""
        if 'CWUser' in schema:  # probably an empty schema if not true...
            # rql syntax trees used to authenticate users
            self._passwd_rqlst = self.source.compile_rql(self.passwd_rql, self._sols)
            self._auth_rqlst = self.source.compile_rql(self.auth_rql, self._sols)

    def authenticate(self, cnx, login, password=None, **kwargs):
        """return CWUser eid for the given login/password if this account is
        defined in this source, else raise `AuthenticationError`

        two queries are needed since passwords are stored crypted, so we have
        to fetch the salt first
        """
        args = {'login': login, 'pwd' : None}
        if password is not None:
            rset = self.source.syntax_tree_search(cnx, self._passwd_rqlst, args)
            try:
                pwd = rset[0][0]
            except IndexError:
                raise AuthenticationError('bad login')
            if pwd is None:
                # if pwd is None but a password is provided, something is wrong
                raise AuthenticationError('bad password')
            # passwords are stored using the Bytes type, so we get a StringIO
            args['pwd'] = Binary(crypt_password(password, pwd.getvalue()))
        # get eid from login and (crypted) password
        rset = self.source.syntax_tree_search(cnx, self._auth_rqlst, args)
        pwd = args['pwd']
        try:
            user = rset[0][0]
            # If the stored hash uses a deprecated scheme (e.g. DES or MD5 used
            # before 3.14.7), update with a fresh one
            if pwd is not None and pwd.getvalue():
                verify, newhash = verify_and_update(password, pwd.getvalue())
                if not verify: # should not happen, but...
                    raise AuthenticationError('bad password')
                if newhash:
                    # re-hash with the current scheme and persist it
                    cnx.system_sql("UPDATE %s SET %s=%%(newhash)s WHERE %s=%%(login)s" % (
                        SQL_PREFIX + 'CWUser',
                        SQL_PREFIX + 'upassword',
                        SQL_PREFIX + 'login'),
                                   {'newhash': self.source._binary(newhash.encode('ascii')),
                                    'login': login})
                    cnx.commit()
            return user
        except IndexError:
            # empty rset: login/password pair did not match
            raise AuthenticationError('bad password')
|
1551 |
|
1552 |
|
class EmailPasswordAuthentifier(BaseAuthentifier):
    """Authentifier allowing login using the primary email of an account.

    When the given login looks like an email address, look up the CWUser
    owning it as primary email, then retry authentication with the actual
    login of that user.
    """

    def authenticate(self, cnx, login, **authinfo):
        # email_auth flag prevent from infinite recursion (call to
        # repo.check_auth_info at the end of this method may lead us here again)
        if '@' not in login or authinfo.pop('email_auth', None):
            raise AuthenticationError('not an email')
        rset = cnx.execute('Any L WHERE U login L, U primary_email M, '
                           'M address %(login)s', {'login': login},
                           build_descr=False)
        if rset.rowcount != 1:
            # zero or several users matching this email: refuse to guess
            raise AuthenticationError('unexisting email')
        login = rset.rows[0][0]
        authinfo['email_auth'] = True
        return self.source.repo.check_auth_info(cnx, login, authinfo)
|
1567 |
|
1568 |
|
class DatabaseIndependentBackupRestore(object):
    """Helper class to perform db backend agnostic backup and restore

    The backup and restore methods are used to dump / restore the
    system database in a database independent format. The file is a
    Zip archive containing the following files:

    * format.txt: the format of the archive. Currently '1.1'
    * tables.txt: list of filenames in the archive tables/ directory
    * sequences.txt: list of filenames in the archive sequences/ directory
    * numranges.txt: list of filenames in the archive numrange/ directory
    * versions.txt: the list of cube versions from CWProperty
    * tables/<tablename>.<chunkno>: pickled data
    * sequences/<sequencename>: pickled data

    The pickled data format for tables, numranges and sequences is a tuple of 3 elements:
    * the table name
    * a tuple of column names
    * a list of rows (as tuples with one element per column)

    Tables are saved in chunks in different files in order to prevent
    a too high memory consumption.
    """
    # maximum number of rows stored in a single tables/<name>.<chunkno> entry
    blocksize = 100

    def __init__(self, source):
        """
        :param: source an instance of the system source
        """
        self._source = source
        # log progress to stdout at INFO level so cubicweb-ctl users see it
        self.logger = logging.getLogger('cubicweb.ctl')
        self.logger.setLevel(logging.INFO)
        self.logger.addHandler(logging.StreamHandler(sys.stdout))
        self.schema = self._source.schema
        self.dbhelper = self._source.dbhelper
        # connection and cursor are opened lazily by backup()/restore()
        self.cnx = None
        self.cursor = None
        self.sql_generator = sqlgen.SQLGenerator()
|
1607 |
|
def get_connection(self):
    """Return a fresh low-level database connection from the system source."""
    return self._source.get_connection()
|
1610 |
|
def backup(self, backupfile):
    """Dump the whole system database into *backupfile* as a Zip archive.

    Writes the metadata manifest first, then one archive entry per
    sequence, numrange and table.  The archive and the connection are
    closed even if a write fails.
    """
    archive = zipfile.ZipFile(backupfile, 'w', allowZip64=True)
    self.cnx = self.get_connection()
    try:
        self.cursor = self.cnx.cursor()
        # fetch rows by batches of 100 to keep memory consumption low
        self.cursor.arraysize = 100
        self.logger.info('writing metadata')
        self.write_metadata(archive)
        for seq in self.get_sequences():
            self.logger.info('processing sequence %s', seq)
            self.write_sequence(archive, seq)
        for numrange in self.get_numranges():
            self.logger.info('processing numrange %s', numrange)
            self.write_numrange(archive, numrange)
        for table in self.get_tables():
            self.logger.info('processing table %s', table)
            self.write_table(archive, table)
    finally:
        archive.close()
        self.cnx.close()
    self.logger.info('done')
|
1632 |
|
def get_tables(self):
    """Return the names of every table to dump: the fixed metadata tables,
    one ``cw_<etype>`` table per non-final entity type and one
    ``<rtype>_relation`` table per regular (non-final, non-inlined,
    non-virtual) relation type.
    """
    tables = ['entities',
              'transactions',
              'tx_entity_actions',
              'tx_relation_actions',
              ]
    prefix = 'cw_'
    # one table per non-final entity type
    tables += ['%s%s' % (prefix, etype)
               for etype in self.schema.entities()
               if not self.schema.eschema(etype).final]
    # one table per regular relation type
    for rtype in self.schema.relations():
        rschema = self.schema.rschema(rtype)
        if not (rschema.final or rschema.inlined
                or rschema in VIRTUAL_RTYPES):
            tables.append('%s_relation' % rtype)
    return tables
|
1653 |
|
def get_sequences(self):
    """Return the database sequences to dump; none for the native source."""
    sequences = []
    return sequences
|
1656 |
|
def get_numranges(self):
    """Return the numranges to dump: only the eid allocator of entities."""
    numranges = ['entities_id_seq']
    return numranges
|
1659 |
|
def write_metadata(self, archive):
    """Write the manifest entries (format, table/sequence/numrange name
    lists and cube versions) at the beginning of the archive.
    """
    writestr = archive.writestr
    writestr('format.txt', '1.1')
    for manifest, names in (('tables.txt', self.get_tables()),
                            ('sequences.txt', self.get_sequences()),
                            ('numranges.txt', self.get_numranges())):
        writestr(manifest, '\n'.join(names))
    writestr('versions.txt',
             '\n'.join('%s %s' % version for version in self._get_versions()))
|
1669 |
|
def write_sequence(self, archive, seq):
    """Dump the current state of sequence *seq* as ``sequences/<seq>``."""
    sql = self.dbhelper.sql_sequence_current_state(seq)
    columns, rows_iterator = self._get_cols_and_rows(sql)
    archive.writestr('sequences/%s' % seq,
                     self._serialize(seq, columns, list(rows_iterator)))
|
1676 |
|
def write_numrange(self, archive, numrange):
    """Dump the current state of numrange *numrange* as ``numrange/<name>``."""
    sql = self.dbhelper.sql_numrange_current_state(numrange)
    columns, rows_iterator = self._get_cols_and_rows(sql)
    archive.writestr('numrange/%s' % numrange,
                     self._serialize(numrange, columns, list(rows_iterator)))
|
1683 |
|
def write_table(self, archive, table):
    """Dump *table* into the archive as one or more pickled chunks.

    Each chunk holds at most ``self.blocksize`` rows and is stored as
    ``tables/<table>.<chunkno>``.  An empty table still produces one
    (empty) chunk so the restore side finds at least one file per table.
    """
    nb_lines_sql = 'SELECT COUNT(*) FROM %s' % table
    self.cursor.execute(nb_lines_sql)
    rowcount = self.cursor.fetchone()[0]
    sql = 'SELECT * FROM %s' % table
    columns, rows_iterator = self._get_cols_and_rows(sql)
    self.logger.info('number of rows: %d', rowcount)
    blocksize = self.blocksize
    if rowcount > 0:
        for i, start in enumerate(range(0, rowcount, blocksize)):
            # islice consumes the shared cursor iterator chunk by chunk
            rows = list(itertools.islice(rows_iterator, blocksize))
            serialized = self._serialize(table, columns, rows)
            archive.writestr('tables/%s.%04d' % (table, i), serialized)
            self.logger.debug('wrote rows %d to %d (out of %d) to %s.%04d',
                              start, start+len(rows)-1,
                              rowcount,
                              table, i)
    else:
        # empty table: write a single chunk with no rows
        rows = []
        serialized = self._serialize(table, columns, rows)
        archive.writestr('tables/%s.%04d' % (table, 0), serialized)
|
1705 |
|
1706 def _get_cols_and_rows(self, sql): |
|
1707 process_result = self._source.iter_process_result |
|
1708 self.cursor.execute(sql) |
|
1709 columns = (d[0] for d in self.cursor.description) |
|
1710 rows = process_result(self.cursor) |
|
1711 return tuple(columns), rows |
|
1712 |
|
1713 def _serialize(self, name, columns, rows): |
|
1714 return pickle.dumps((name, columns, rows), pickle.HIGHEST_PROTOCOL) |
|
1715 |
|
def restore(self, backupfile):
    """Restore the system database from *backupfile* (a Zip archive
    produced by :meth:`backup`).

    Restores sequences, numranges and tables in that order.  The
    connection and the archive are closed even if a read or an insert
    fails (mirrors the try/finally discipline of :meth:`backup`; the
    previous implementation leaked both on error).
    """
    archive = zipfile.ZipFile(backupfile, 'r', allowZip64=True)
    self.cnx = self.get_connection()
    try:
        self.cursor = self.cnx.cursor()
        sequences, numranges, tables, table_chunks = self.read_metadata(archive, backupfile)
        for seq in sequences:
            self.logger.info('restoring sequence %s', seq)
            self.read_sequence(archive, seq)
        for numrange in numranges:
            self.logger.info('restoring numrange %s', numrange)
            self.read_numrange(archive, numrange)
        for table in tables:
            self.logger.info('restoring table %s', table)
            # chunks are restored in file-name order (chunkno suffix)
            self.read_table(archive, table, sorted(table_chunks[table]))
    finally:
        self.cnx.close()
        archive.close()
    self.logger.info('done')
|
1733 |
|
def read_metadata(self, archive, backupfile):
    """Check the archive format and versions, then return
    ``(sequences, numranges, tables, table_chunks)`` where table_chunks
    maps each table name to the list of its chunk entry names.

    :raises ValueError: on unknown archive format or version mismatch.

    Archive entries are decoded to text: on Python 3 ``ZipFile.read``
    returns bytes, so without decoding the format check and the name
    lists silently broke (bytes never equal the str constants).
    """
    formatinfo = archive.read('format.txt').decode('utf-8')
    self.logger.info('checking metadata')
    if formatinfo.strip() != "1.1":
        self.logger.critical('Unsupported format in archive: %s', formatinfo)
        raise ValueError('Unknown format in %s: %s' % (backupfile, formatinfo))
    tables = archive.read('tables.txt').decode('utf-8').splitlines()
    sequences = archive.read('sequences.txt').decode('utf-8').splitlines()
    numranges = archive.read('numranges.txt').decode('utf-8').splitlines()
    file_versions = self._parse_versions(archive.read('versions.txt').decode('utf-8'))
    versions = set(self._get_versions())
    if file_versions != versions:
        self.logger.critical('Unable to restore : versions do not match')
        self.logger.critical('Expected:\n%s', '\n'.join('%s : %s' % (cube, ver)
                                                        for cube, ver in sorted(versions)))
        self.logger.critical('Found:\n%s', '\n'.join('%s : %s' % (cube, ver)
                                                     for cube, ver in sorted(file_versions)))
        raise ValueError('Unable to restore : versions do not match')
    table_chunks = {}
    for name in archive.namelist():
        if not name.startswith('tables/'):
            continue
        filename = basename(name)
        # strip the .<chunkno> suffix to recover the table name
        tablename, _ext = filename.rsplit('.', 1)
        table_chunks.setdefault(tablename, []).append(name)
    return sequences, numranges, tables, table_chunks
|
1760 |
|
def read_sequence(self, archive, seq):
    """Restore sequence *seq* from its pickled archive entry and commit."""
    name, _columns, rows = pickle.loads(archive.read('sequences/%s' % seq))
    assert name == seq
    assert len(rows) == 1
    assert len(rows[0]) == 1
    restart_sql = self.dbhelper.sql_restart_sequence(seq, rows[0][0])
    self.cursor.execute(restart_sql)
    self.cnx.commit()
|
1770 |
|
def read_numrange(self, archive, numrange):
    """Restore numrange *numrange* from its pickled archive entry and commit."""
    name, _columns, rows = pickle.loads(archive.read('numrange/%s' % numrange))
    assert name == numrange
    assert len(rows) == 1
    assert len(rows[0]) == 1
    restart_sql = self.dbhelper.sql_restart_numrange(numrange, rows[0][0])
    self.cursor.execute(restart_sql)
    self.cnx.commit()
|
1780 |
|
def read_table(self, archive, table, filenames):
    """Restore *table* from its chunk entries *filenames*.

    The table is emptied first, then each chunk's rows are re-inserted;
    one commit per chunk.  Row values go through the source's
    ``merge_args`` before insertion -- presumably to adapt Python values
    to the backend's expected SQL types (TODO confirm against the source
    implementation).
    """
    merge_args = self._source.merge_args
    self.cursor.execute('DELETE FROM %s' % table)
    self.cnx.commit()
    row_count = 0
    for filename in filenames:
        tablename, columns, rows = pickle.loads(archive.read(filename))
        assert tablename == table
        if not rows:
            continue
        # build the INSERT statement once per chunk from the first row
        insert = self.sql_generator.insert(table,
                                           dict(zip(columns, rows[0])))
        for row in rows:
            self.cursor.execute(insert, merge_args(dict(zip(columns, row)), {}))
        row_count += len(rows)
        self.cnx.commit()
    self.logger.info('inserted %d rows', row_count)
|
1798 |
|
1799 |
|
1800 def _parse_versions(self, version_str): |
|
1801 versions = set() |
|
1802 for line in version_str.splitlines(): |
|
1803 versions.add(tuple(line.split())) |
|
1804 return versions |
|
1805 |
|
1806 def _get_versions(self): |
|
1807 version_sql = 'SELECT cw_pkey, cw_value FROM cw_CWProperty' |
|
1808 versions = [] |
|
1809 self.cursor.execute(version_sql) |
|
1810 for pkey, value in self.cursor.fetchall(): |
|
1811 if pkey.startswith(u'system.version'): |
|
1812 versions.append((pkey, value)) |
|
1813 return versions |
|