diff -r 000000000000 -r b97547f5f1fa server/migractions.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/server/migractions.py Wed Nov 05 15:52:50 2008 +0100 @@ -0,0 +1,1075 @@ +"""a class implementing basic actions used in migration scripts. + +The following schema actions are supported for now: +* add/drop/rename attribute +* add/drop entity/relation type +* rename entity type + +The following data actions are supported for now: +* add an entity +* execute raw RQL queries + + +:organization: Logilab +:copyright: 2001-2008 LOGILAB S.A. (Paris, FRANCE), all rights reserved. +:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr +""" +__docformat__ = "restructuredtext en" + +import sys +import os +from os.path import join, exists + +from mx.DateTime import now +from logilab.common.decorators import cached +from logilab.common.adbh import get_adv_func_helper + +from yams.constraints import SizeConstraint +from yams.schema2sql import eschema2sql, rschema2sql + +from cubicweb import AuthenticationError +from cubicweb.dbapi import get_repository, repo_connect +from cubicweb.common.migration import MigrationHelper, yes + +try: + from cubicweb.server import schemaserial as ss + from cubicweb.server.utils import manager_userpasswd + from cubicweb.server.sqlutils import sqlexec +except ImportError: # LAX + pass + +class ServerMigrationHelper(MigrationHelper): + """specific migration helper for server side migration scripts, + providind actions related to schema/data migration + """ + + def __init__(self, config, schema, interactive=True, + repo=None, cnx=None, verbosity=1, connect=True): + MigrationHelper.__init__(self, config, interactive, verbosity) + if not interactive: + assert cnx + assert repo + if cnx is not None: + assert repo + self._cnx = cnx + self.repo = repo + elif connect: + self.repo_connect() + if not schema: + schema = config.load_schema(expand_cubes=True) + self.new_schema = schema + self._synchronized = set() + + @cached + def repo_connect(self): + self.repo = get_repository(method='inmemory', config=self.config) + return self.repo + + def shutdown(self): + if self.repo is not None: + self.repo.shutdown() + + def rewrite_vcconfiguration(self): + """write current installed versions (of cubicweb software + and of each used cube) into the database + """ + self.cmd_set_property('system.version.cubicweb', self.config.cubicweb_version()) + for pkg in self.config.cubes(): + pkgversion = self.config.cube_version(pkg) + self.cmd_set_property('system.version.%s' % pkg.lower(), pkgversion) + self.commit() + + def backup_database(self, backupfile=None, askconfirm=True): + config = self.config + source = config.sources()['system'] + helper = get_adv_func_helper(source['db-driver']) + date = now().strftime('%Y-%m-%d_%H:%M:%S') + app = config.appid + backupfile = backupfile or join(config.backup_dir(), + '%s-%s.dump' % (app, date)) + if exists(backupfile): + if not self.confirm('a backup already exists for %s, overwrite it?' % app): + return + elif askconfirm and not self.confirm('backup %s database?' % app): + return + cmd = helper.backup_command(source['db-name'], source.get('db-host'), + source.get('db-user'), backupfile, + keepownership=False) + while True: + print cmd + if os.system(cmd): + print 'error while backuping the base' + answer = self.confirm('continue anyway?', + shell=False, abort=False, retry=True) + if not answer: + raise SystemExit(1) + if answer == 1: # 1: continue, 2: retry + break + else: + print 'database backup:', backupfile + break + + def restore_database(self, backupfile, drop=True): + config = self.config + source = config.sources()['system'] + helper = get_adv_func_helper(source['db-driver']) + app = config.appid + if not exists(backupfile): + raise Exception("backup file %s doesn't exist" % backupfile) + if self.confirm('restore %s database from %s ?' % (app, backupfile)): + for cmd in helper.restore_commands(source['db-name'], source.get('db-host'), + source.get('db-user'), backupfile, + source['db-encoding'], + keepownership=False, drop=drop): + while True: + print cmd + if os.system(cmd): + print 'error while restoring the base' + answer = self.confirm('continue anyway?', + shell=False, abort=False, retry=True) + if not answer: + raise SystemExit(1) + if answer == 1: # 1: continue, 2: retry + break + else: + break + print 'database restored' + + def migrate(self, vcconf, toupgrade, options): + if not options.fs_only: + if options.backup_db is None: + self.backup_database() + elif options.backup_db: + self.backup_database(askconfirm=False) + super(ServerMigrationHelper, self).migrate(vcconf, toupgrade, options) + self.rewrite_configuration() + + def process_script(self, migrscript, funcname=None, *args, **kwargs): + """execute a migration script + in interactive mode, display the migration script path, ask for + confirmation and execute it if confirmed + """ + if migrscript.endswith('.sql'): + if self.execscript_confirm(migrscript): + sqlexec(open(migrscript).read(), self.session.system_sql) + else: + return super(ServerMigrationHelper, self).process_script( + migrscript, funcname, *args, **kwargs) + + @property + def cnx(self): + """lazy connection""" + try: + return self._cnx + except AttributeError: + sourcescfg = self.repo.config.sources() + try: + login = sourcescfg['admin']['login'] + pwd = sourcescfg['admin']['password'] + except KeyError: + login, pwd = manager_userpasswd() + while True: + try: + self._cnx = repo_connect(self.repo, login, pwd) + if not 'managers' in self._cnx.user(self.session).groups: + print 'migration need an account in the managers group' + else: + break + except AuthenticationError: + print 'wrong user/password' + except (KeyboardInterrupt, EOFError): + print 'aborting...' + sys.exit(0) + try: + login, pwd = manager_userpasswd() + except (KeyboardInterrupt, EOFError): + print 'aborting...' + sys.exit(0) + return self._cnx + + @property + def session(self): + return self.repo._get_session(self.cnx.sessionid) + + @property + @cached + def rqlcursor(self): + """lazy rql cursor""" + return self.cnx.cursor(self.session) + + def commit(self): + if hasattr(self, '_cnx'): + self._cnx.commit() + + def rollback(self): + if hasattr(self, '_cnx'): + self._cnx.rollback() + + def rqlexecall(self, rqliter, cachekey=None, ask_confirm=True): + for rql, kwargs in rqliter: + self.rqlexec(rql, kwargs, cachekey, ask_confirm) + + @cached + def _create_context(self): + """return a dictionary to use as migration script execution context""" + context = super(ServerMigrationHelper, self)._create_context() + context.update({'checkpoint': self.checkpoint, + 'sql': self.sqlexec, + 'rql': self.rqlexec, + 'rqliter': self.rqliter, + 'schema': self.repo.schema, + 'newschema': self.new_schema, + 'cnx': self.cnx, + 'session' : self.session, + 'repo' : self.repo, + }) + return context + + @cached + def group_mapping(self): + """cached group mapping""" + return ss.group_mapping(self.rqlcursor) + + def exec_event_script(self, event, cubepath=None, funcname=None, + *args, **kwargs): + if cubepath: + apc = join(cubepath, 'migration', '%s.py' % event) + else: + apc = join(self.config.migration_scripts_dir(), '%s.py' % event) + if exists(apc): + if self.config.free_wheel: + from cubicweb.server.hooks import setowner_after_add_entity + self.repo.hm.unregister_hook(setowner_after_add_entity, + 'after_add_entity', '') + self.deactivate_verification_hooks() + self.info('executing %s', apc) + confirm = self.confirm + execscript_confirm = self.execscript_confirm + self.confirm = yes + self.execscript_confirm = yes + try: + return self.process_script(apc, funcname, *args, **kwargs) + finally: + self.confirm = confirm + self.execscript_confirm = execscript_confirm + if self.config.free_wheel: + self.repo.hm.register_hook(setowner_after_add_entity, + 'after_add_entity', '') + self.reactivate_verification_hooks() + + # base actions ############################################################ + + def checkpoint(self): + """checkpoint action""" + if self.confirm('commit now ?', shell=False): + self.commit() + + def cmd_add_cube(self, cube, update_database=True): + """update_database is telling if the database schema should be updated + or if only the relevant eproperty should be inserted (for the case where + a cube has been extracted from an existing application, so the + cube schema is already in there) + """ + newcubes = super(ServerMigrationHelper, self).cmd_add_cube( + cube) + if not newcubes: + return + for pack in newcubes: + self.cmd_set_property('system.version.'+pack, + self.config.cube_version(pack)) + if not update_database: + self.commit() + return + self.new_schema = self.config.load_schema() + new = set() + # execute pre-create files + for pack in reversed(newcubes): + self.exec_event_script('precreate', self.config.cube_dir(pack)) + # add new entity and relation types + for rschema in self.new_schema.relations(): + if not rschema in self.repo.schema: + self.cmd_add_relation_type(rschema.type) + new.add(rschema.type) + for eschema in self.new_schema.entities(): + if not eschema in self.repo.schema: + self.cmd_add_entity_type(eschema.type) + new.add(eschema.type) + # check if attributes has been added to existing entities + for rschema in self.new_schema.relations(): + existingschema = self.repo.schema.rschema(rschema.type) + for (fromtype, totype) in rschema.iter_rdefs(): + if existingschema.has_rdef(fromtype, totype): + continue + # check we should actually add the relation definition + if not (fromtype in new or totype in new or rschema in new): + continue + self.cmd_add_relation_definition(str(fromtype), rschema.type, + str(totype)) + # execute post-create files + for pack in reversed(newcubes): + self.exec_event_script('postcreate', self.config.cube_dir(pack)) + self.commit() + + def cmd_remove_cube(self, cube): + removedcubes = super(ServerMigrationHelper, self).cmd_remove_cube(cube) + if not removedcubes: + return + oldschema = self.new_schema + self.new_schema = newschema = self.config.load_schema() + reposchema = self.repo.schema + # execute pre-remove files + for pack in reversed(removedcubes): + self.exec_event_script('preremove', self.config.cube_dir(pack)) + # remove cubes'entity and relation types + for rschema in oldschema.relations(): + if not rschema in newschema and rschema in reposchema: + self.cmd_drop_relation_type(rschema.type) + for eschema in oldschema.entities(): + if not eschema in newschema and eschema in reposchema: + self.cmd_drop_entity_type(eschema.type) + for rschema in oldschema.relations(): + if rschema in newschema and rschema in reposchema: + # check if attributes/relations has been added to entities from + # other cubes + for fromtype, totype in rschema.iter_rdefs(): + if not newschema[rschema.type].has_rdef(fromtype, totype) and \ + reposchema[rschema.type].has_rdef(fromtype, totype): + self.cmd_drop_relation_definition( + str(fromtype), rschema.type, str(totype)) + # execute post-remove files + for pack in reversed(removedcubes): + self.exec_event_script('postremove', self.config.cube_dir(pack)) + self.rqlexec('DELETE EProperty X WHERE X pkey %(pk)s', + {'pk': u'system.version.'+pack}, ask_confirm=False) + self.commit() + + # schema migration actions ################################################ + + def cmd_add_attribute(self, etype, attrname, attrtype=None, commit=True): + """add a new attribute on the given entity type""" + if attrtype is None: + rschema = self.new_schema.rschema(attrname) + attrtype = rschema.objects(etype)[0] + self.cmd_add_relation_definition(etype, attrname, attrtype, commit=commit) + + def cmd_drop_attribute(self, etype, attrname, commit=True): + """drop an existing attribute from the given entity type + + `attrname` is a string giving the name of the attribute to drop + """ + rschema = self.repo.schema.rschema(attrname) + attrtype = rschema.objects(etype)[0] + self.cmd_drop_relation_definition(etype, attrname, attrtype, commit=commit) + + def cmd_rename_attribute(self, etype, oldname, newname, commit=True): + """rename an existing attribute of the given entity type + + `oldname` is a string giving the name of the existing attribute + `newname` is a string giving the name of the renamed attribute + """ + eschema = self.new_schema.eschema(etype) + attrtype = eschema.destination(newname) + # have to commit this first step anyway to get the definition + # actually in the schema + self.cmd_add_attribute(etype, newname, attrtype, commit=True) + # skipp NULL values if the attribute is required + rql = 'SET X %s VAL WHERE X is %s, X %s VAL' % (newname, etype, oldname) + card = eschema.rproperty(newname, 'cardinality')[0] + if card == '1': + rql += ', NOT X %s NULL' % oldname + self.rqlexec(rql, ask_confirm=self.verbosity>=2) + self.cmd_drop_attribute(etype, oldname, commit=commit) + + def cmd_add_entity_type(self, etype, auto=True, commit=True): + """register a new entity type + + in auto mode, automatically register entity's relation where the + targeted type is known + """ + applschema = self.repo.schema + if etype in applschema: + eschema = applschema[etype] + if eschema.is_final(): + applschema.del_entity_type(etype) + else: + eschema = self.new_schema.eschema(etype) + confirm = self.verbosity >= 2 + # register the entity into EEType + self.rqlexecall(ss.eschema2rql(eschema), ask_confirm=confirm) + # add specializes relation if needed + self.rqlexecall(ss.eschemaspecialize2rql(eschema), ask_confirm=confirm) + # register groups / permissions for the entity + self.rqlexecall(ss.erperms2rql(eschema, self.group_mapping()), + ask_confirm=confirm) + # register entity's attributes + for rschema, attrschema in eschema.attribute_definitions(): + # ignore those meta relations, they will be automatically added + if rschema.type in ('eid', 'creation_date', 'modification_date'): + continue + if not rschema.type in applschema: + # need to add the relation type and to commit to get it + # actually in the schema + self.cmd_add_relation_type(rschema.type, False, commit=True) + # register relation definition + self.rqlexecall(ss.rdef2rql(rschema, etype, attrschema.type), + ask_confirm=confirm) + if auto: + # we have commit here to get relation types actually in the schema + self.commit() + added = [] + for rschema in eschema.subject_relations(): + # attribute relation have already been processed and + # 'owned_by'/'created_by' will be automatically added + if rschema.final or rschema.type in ('owned_by', 'created_by', 'is', 'is_instance_of'): + continue + rtypeadded = rschema.type in applschema + for targetschema in rschema.objects(etype): + # ignore relations where the targeted type is not in the + # current application schema + targettype = targetschema.type + if not targettype in applschema and targettype != etype: + continue + if not rtypeadded: + # need to add the relation type and to commit to get it + # actually in the schema + added.append(rschema.type) + self.cmd_add_relation_type(rschema.type, False, commit=True) + rtypeadded = True + # register relation definition + # remember this two avoid adding twice non symetric relation + # such as "Emailthread forked_from Emailthread" + added.append((etype, rschema.type, targettype)) + self.rqlexecall(ss.rdef2rql(rschema, etype, targettype), + ask_confirm=confirm) + for rschema in eschema.object_relations(): + rtypeadded = rschema.type in applschema or rschema.type in added + for targetschema in rschema.subjects(etype): + # ignore relations where the targeted type is not in the + # current application schema + targettype = targetschema.type + # don't check targettype != etype since in this case the + # relation has already been added as a subject relation + if not targettype in applschema: + continue + if not rtypeadded: + # need to add the relation type and to commit to get it + # actually in the schema + self.cmd_add_relation_type(rschema.type, False, commit=True) + rtypeadded = True + elif (targettype, rschema.type, etype) in added: + continue + # register relation definition + self.rqlexecall(ss.rdef2rql(rschema, targettype, etype), + ask_confirm=confirm) + if commit: + self.commit() + + def cmd_drop_entity_type(self, etype, commit=True): + """unregister an existing entity type + + This will trigger deletion of necessary relation types and definitions + """ + # XXX what if we delete an entity type which is specialized by other types + # unregister the entity from EEType + self.rqlexec('DELETE EEType X WHERE X name %(etype)s', {'etype': etype}, + ask_confirm=self.verbosity>=2) + if commit: + self.commit() + + def cmd_rename_entity_type(self, oldname, newname, commit=True): + """rename an existing entity type in the persistent schema + + `oldname` is a string giving the name of the existing entity type + `newname` is a string giving the name of the renamed entity type + """ + self.rqlexec('SET ET name %(newname)s WHERE ET is EEType, ET name %(oldname)s', + {'newname' : unicode(newname), 'oldname' : oldname}) + if commit: + self.commit() + + def cmd_add_relation_type(self, rtype, addrdef=True, commit=True): + """register a new relation type named `rtype`, as described in the + schema description file. + + `addrdef` is a boolean value; when True, it will also add all relations + of the type just added found in the schema definition file. Note that it + implies an intermediate "commit" which commits the relation type + creation (but not the relation definitions themselves, for which + committing depends on the `commit` argument value). + + """ + rschema = self.new_schema.rschema(rtype) + # register the relation into ERType and insert necessary relation + # definitions + self.rqlexecall(ss.rschema2rql(rschema, addrdef=False), + ask_confirm=self.verbosity>=2) + # register groups / permissions for the relation + self.rqlexecall(ss.erperms2rql(rschema, self.group_mapping()), + ask_confirm=self.verbosity>=2) + if addrdef: + self.commit() + self.rqlexecall(ss.rdef2rql(rschema), + ask_confirm=self.verbosity>=2) + if commit: + self.commit() + + def cmd_drop_relation_type(self, rtype, commit=True): + """unregister an existing relation type""" + # unregister the relation from ERType + self.rqlexec('DELETE ERType X WHERE X name %r' % rtype, + ask_confirm=self.verbosity>=2) + if commit: + self.commit() + + def cmd_rename_relation(self, oldname, newname, commit=True): + """rename an existing relation + + `oldname` is a string giving the name of the existing relation + `newname` is a string giving the name of the renamed relation + """ + self.cmd_add_relation_type(newname, commit=True) + self.rqlexec('SET X %s Y WHERE X %s Y' % (newname, oldname), + ask_confirm=self.verbosity>=2) + self.cmd_drop_relation_type(oldname, commit=commit) + + def cmd_add_relation_definition(self, subjtype, rtype, objtype, commit=True): + """register a new relation definition, from its definition found in the + schema definition file + """ + rschema = self.new_schema.rschema(rtype) + if not rtype in self.repo.schema: + self.cmd_add_relation_type(rtype, addrdef=False, commit=True) + self.rqlexecall(ss.rdef2rql(rschema, subjtype, objtype), + ask_confirm=self.verbosity>=2) + if commit: + self.commit() + + def cmd_drop_relation_definition(self, subjtype, rtype, objtype, commit=True): + """unregister an existing relation definition""" + rschema = self.repo.schema.rschema(rtype) + # unregister the definition from EFRDef or ENFRDef + if rschema.is_final(): + etype = 'EFRDef' + else: + etype = 'ENFRDef' + rql = ('DELETE %s X WHERE X from_entity FE, FE name "%s",' + 'X relation_type RT, RT name "%s", X to_entity TE, TE name "%s"') + self.rqlexec(rql % (etype, subjtype, rtype, objtype), + ask_confirm=self.verbosity>=2) + if commit: + self.commit() + + def cmd_synchronize_permissions(self, ertype, commit=True): + """permission synchronization for an entity or relation type""" + if ertype in ('eid', 'has_text', 'identity'): + return + newrschema = self.new_schema[ertype] + teid = self.repo.schema[ertype].eid + if 'update' in newrschema.ACTIONS or newrschema.is_final(): + # entity type + exprtype = u'ERQLExpression' + else: + # relation type + exprtype = u'RRQLExpression' + assert teid, ertype + gm = self.group_mapping() + confirm = self.verbosity >= 2 + # * remove possibly deprecated permission (eg in the persistent schema + # but not in the new schema) + # * synchronize existing expressions + # * add new groups/expressions + for action in newrschema.ACTIONS: + perm = '%s_permission' % action + # handle groups + newgroups = list(newrschema.get_groups(action)) + for geid, gname in self.rqlexec('Any G, GN WHERE T %s G, G name GN, ' + 'T eid %%(x)s' % perm, {'x': teid}, 'x', + ask_confirm=False): + if not gname in newgroups: + if not confirm or self.confirm('remove %s permission of %s to %s?' + % (action, ertype, gname)): + self.rqlexec('DELETE T %s G WHERE G eid %%(x)s, T eid %s' + % (perm, teid), + {'x': geid}, 'x', ask_confirm=False) + else: + newgroups.remove(gname) + for gname in newgroups: + if not confirm or self.confirm('grant %s permission of %s to %s?' + % (action, ertype, gname)): + self.rqlexec('SET T %s G WHERE G eid %%(x)s, T eid %s' + % (perm, teid), + {'x': gm[gname]}, 'x', ask_confirm=False) + # handle rql expressions + newexprs = dict((expr.expression, expr) for expr in newrschema.get_rqlexprs(action)) + for expreid, expression in self.rqlexec('Any E, EX WHERE T %s E, E expression EX, ' + 'T eid %s' % (perm, teid), + ask_confirm=False): + if not expression in newexprs: + if not confirm or self.confirm('remove %s expression for %s permission of %s?' + % (expression, action, ertype)): + # deleting the relation will delete the expression entity + self.rqlexec('DELETE T %s E WHERE E eid %%(x)s, T eid %s' + % (perm, teid), + {'x': expreid}, 'x', ask_confirm=False) + else: + newexprs.pop(expression) + for expression in newexprs.values(): + expr = expression.expression + if not confirm or self.confirm('add %s expression for %s permission of %s?' + % (expr, action, ertype)): + self.rqlexec('INSERT RQLExpression X: X exprtype %%(exprtype)s, ' + 'X expression %%(expr)s, X mainvars %%(vars)s, T %s X ' + 'WHERE T eid %%(x)s' % perm, + {'expr': expr, 'exprtype': exprtype, + 'vars': expression.mainvars, 'x': teid}, 'x', + ask_confirm=False) + if commit: + self.commit() + + def cmd_synchronize_rschema(self, rtype, syncrdefs=True, syncperms=True, + commit=True): + """synchronize properties of the persistent relation schema against its + current definition: + + * description + * symetric, meta + * inlined + * relation definitions if `syncrdefs` + * permissions if `syncperms` + + physical schema changes should be handled by repository's schema hooks + """ + rtype = str(rtype) + if rtype in self._synchronized: + return + self._synchronized.add(rtype) + rschema = self.new_schema.rschema(rtype) + self.rqlexecall(ss.updaterschema2rql(rschema), + ask_confirm=self.verbosity>=2) + reporschema = self.repo.schema.rschema(rtype) + if syncrdefs: + for subj, obj in rschema.iter_rdefs(): + if not reporschema.has_rdef(subj, obj): + continue + self.cmd_synchronize_rdef_schema(subj, rschema, obj, + commit=False) + if syncperms: + self.cmd_synchronize_permissions(rtype, commit=False) + if commit: + self.commit() + + def cmd_synchronize_eschema(self, etype, syncperms=True, commit=True): + """synchronize properties of the persistent entity schema against + its current definition: + + * description + * internationalizable, fulltextindexed, indexed, meta + * relations from/to this entity + * permissions if `syncperms` + """ + etype = str(etype) + if etype in self._synchronized: + return + self._synchronized.add(etype) + repoeschema = self.repo.schema.eschema(etype) + try: + eschema = self.new_schema.eschema(etype) + except KeyError: + return + repospschema = repoeschema.specializes() + espschema = eschema.specializes() + if repospschema and not espschema: + self.rqlexec('DELETE X specializes Y WHERE X is EEType, X name %(x)s', + {'x': str(repoechema)}) + elif not repospschema and espschema: + self.rqlexec('SET X specializes Y WHERE X is EEType, X name %(x)s, ' + 'Y is EEType, Y name %(y)s', + {'x': str(repoechema), 'y': str(epschema)}) + self.rqlexecall(ss.updateeschema2rql(eschema), + ask_confirm=self.verbosity >= 2) + for rschema, targettypes, x in eschema.relation_definitions(True): + if x == 'subject': + if not rschema in repoeschema.subject_relations(): + continue + subjtypes, objtypes = [etype], targettypes + else: # x == 'object' + if not rschema in repoeschema.object_relations(): + continue + subjtypes, objtypes = targettypes, [etype] + self.cmd_synchronize_rschema(rschema, syncperms=syncperms, + syncrdefs=False, commit=False) + reporschema = self.repo.schema.rschema(rschema) + for subj in subjtypes: + for obj in objtypes: + if not reporschema.has_rdef(subj, obj): + continue + self.cmd_synchronize_rdef_schema(subj, rschema, obj, + commit=False) + if syncperms: + self.cmd_synchronize_permissions(etype, commit=False) + if commit: + self.commit() + + def cmd_synchronize_rdef_schema(self, subjtype, rtype, objtype, + commit=True): + """synchronize properties of the persistent relation definition schema + against its current definition: + * order and other properties + * constraints + """ + subjtype, objtype = str(subjtype), str(objtype) + rschema = self.new_schema.rschema(rtype) + reporschema = self.repo.schema.rschema(rschema) + if (subjtype, rschema, objtype) in self._synchronized: + return + self._synchronized.add((subjtype, rschema, objtype)) + if rschema.symetric: + self._synchronized.add((objtype, rschema, subjtype)) + confirm = self.verbosity >= 2 + # properties + self.rqlexecall(ss.updaterdef2rql(rschema, subjtype, objtype), + ask_confirm=confirm) + # constraints + newconstraints = list(rschema.rproperty(subjtype, objtype, 'constraints')) + # 1. remove old constraints and update constraints of the same type + # NOTE: don't use rschema.constraint_by_type because it may be + # out of sync with newconstraints when multiple + # constraints of the same type are used + for cstr in reporschema.rproperty(subjtype, objtype, 'constraints'): + for newcstr in newconstraints: + if newcstr.type() == cstr.type(): + break + else: + newcstr = None + if newcstr is None: + self.rqlexec('DELETE X constrained_by C WHERE C eid %(x)s', + {'x': cstr.eid}, 'x', + ask_confirm=confirm) + self.rqlexec('DELETE EConstraint C WHERE C eid %(x)s', + {'x': cstr.eid}, 'x', + ask_confirm=confirm) + else: + newconstraints.remove(newcstr) + values = {'x': cstr.eid, + 'v': unicode(newcstr.serialize())} + self.rqlexec('SET X value %(v)s WHERE X eid %(x)s', + values, 'x', ask_confirm=confirm) + # 2. add new constraints + for newcstr in newconstraints: + self.rqlexecall(ss.constraint2rql(rschema, subjtype, objtype, + newcstr), + ask_confirm=confirm) + if commit: + self.commit() + + def cmd_synchronize_schema(self, syncperms=True, commit=True): + """synchronize the persistent schema against the current definition + schema. + + It will synch common stuff between the definition schema and the + actual persistent schema, it won't add/remove any entity or relation. + """ + for etype in self.repo.schema.entities(): + self.cmd_synchronize_eschema(etype, syncperms=syncperms, commit=False) + if commit: + self.commit() + + def cmd_change_relation_props(self, subjtype, rtype, objtype, + commit=True, **kwargs): + """change some properties of a relation definition""" + assert kwargs + restriction = [] + if subjtype and subjtype != 'Any': + restriction.append('X from_entity FE, FE name "%s"' % subjtype) + if objtype and objtype != 'Any': + restriction.append('X to_entity TE, TE name "%s"' % objtype) + if rtype and rtype != 'Any': + restriction.append('X relation_type RT, RT name "%s"' % rtype) + assert restriction + values = [] + for k, v in kwargs.items(): + values.append('X %s %%(%s)s' % (k, k)) + if isinstance(v, str): + kwargs[k] = unicode(v) + rql = 'SET %s WHERE %s' % (','.join(values), ','.join(restriction)) + self.rqlexec(rql, kwargs, ask_confirm=self.verbosity>=2) + if commit: + self.commit() + + def cmd_set_size_constraint(self, etype, rtype, size, commit=True): + """set change size constraint of a string attribute + + if size is None any size constraint will be removed + """ + oldvalue = None + for constr in self.repo.schema.eschema(etype).constraints(rtype): + if isinstance(constr, SizeConstraint): + oldvalue = constr.max + if oldvalue == size: + return + if oldvalue is None and not size is None: + ceid = self.rqlexec('INSERT EConstraint C: C value %(v)s, C cstrtype CT ' + 'WHERE CT name "SizeConstraint"', + {'v': SizeConstraint(size).serialize()}, + ask_confirm=self.verbosity>=2)[0][0] + self.rqlexec('SET X constrained_by C WHERE X from_entity S, X relation_type R, ' + 'S name "%s", R name "%s", C eid %s' % (etype, rtype, ceid), + ask_confirm=self.verbosity>=2) + elif not oldvalue is None: + if not size is None: + self.rqlexec('SET C value %%(v)s WHERE X from_entity S, X relation_type R,' + 'X constrained_by C, C cstrtype CT, CT name "SizeConstraint",' + 'S name "%s", R name "%s"' % (etype, rtype), + {'v': unicode(SizeConstraint(size).serialize())}, + ask_confirm=self.verbosity>=2) + else: + self.rqlexec('DELETE X constrained_by C WHERE X from_entity S, X relation_type R,' + 'X constrained_by C, C cstrtype CT, CT name "SizeConstraint",' + 'S name "%s", R name "%s"' % (etype, rtype), + ask_confirm=self.verbosity>=2) + # cleanup unused constraints + self.rqlexec('DELETE EConstraint C WHERE NOT X constrained_by C') + if commit: + self.commit() + + # Workflows handling ###################################################### + + def cmd_add_state(self, name, stateof, initial=False, commit=False, **kwargs): + """method to ease workflow definition: add a state for one or more + entity type(s) + """ + stateeid = self.cmd_add_entity('State', name=name, **kwargs) + if not isinstance(stateof, (list, tuple)): + stateof = (stateof,) + for etype in stateof: + # XXX ensure etype validity + self.rqlexec('SET X state_of Y WHERE X eid %(x)s, Y name %(et)s', + {'x': stateeid, 'et': etype}, 'x', ask_confirm=False) + if initial: + self.rqlexec('SET ET initial_state S WHERE ET name %(et)s, S eid %(x)s', + {'x': stateeid, 'et': etype}, 'x', ask_confirm=False) + if commit: + self.commit() + return stateeid + + def cmd_add_transition(self, name, transitionof, fromstates, tostate, + requiredgroups=(), conditions=(), commit=False, **kwargs): + """method to ease workflow definition: add a transition for one or more + entity type(s), from one or more state and to a single state + """ + treid = self.cmd_add_entity('Transition', name=name, **kwargs) + if not isinstance(transitionof, (list, tuple)): + transitionof = (transitionof,) + for etype in transitionof: + # XXX ensure etype validity + self.rqlexec('SET X transition_of Y WHERE X eid %(x)s, Y name %(et)s', + {'x': treid, 'et': etype}, 'x', ask_confirm=False) + for stateeid in fromstates: + self.rqlexec('SET X allowed_transition Y WHERE X eid %(x)s, Y eid %(y)s', + {'x': stateeid, 'y': treid}, 'x', ask_confirm=False) + self.rqlexec('SET X destination_state Y WHERE X eid %(x)s, Y eid %(y)s', + {'x': treid, 'y': tostate}, 'x', ask_confirm=False) + self.cmd_set_transition_permissions(treid, requiredgroups, conditions, + reset=False) + if commit: + self.commit() + return treid + + def cmd_set_transition_permissions(self, treid, + requiredgroups=(), conditions=(), + reset=True, commit=False): + """set or add (if `reset` is False) groups and conditions for a + transition + """ + if reset: + self.rqlexec('DELETE T require_group G WHERE T eid %(x)s', + {'x': treid}, 'x', ask_confirm=False) + self.rqlexec('DELETE T condition R WHERE T eid %(x)s', + {'x': treid}, 'x', ask_confirm=False) + for gname in requiredgroups: + ### XXX ensure gname validity + self.rqlexec('SET T require_group G WHERE T eid %(x)s, G name %(gn)s', + {'x': treid, 'gn': gname}, 'x', ask_confirm=False) + if isinstance(conditions, basestring): + conditions = (conditions,) + for expr in conditions: + if isinstance(expr, str): + expr = unicode(expr) + self.rqlexec('INSERT RQLExpression X: X exprtype "ERQLExpression", ' + 'X expression %(expr)s, T condition X ' + 'WHERE T eid %(x)s', + {'x': treid, 'expr': expr}, 'x', ask_confirm=False) + if commit: + self.commit() + + # EProperty handling ###################################################### + + def cmd_property_value(self, pkey): + rql = 'Any V WHERE X is EProperty, X pkey %(k)s, X value V' + rset = self.rqlexec(rql, {'k': pkey}, ask_confirm=False) + return rset[0][0] + + def cmd_set_property(self, pkey, value): + value = unicode(value) + try: + prop = self.rqlexec('EProperty X WHERE X pkey %(k)s', {'k': pkey}, + ask_confirm=False).get_entity(0, 0) + except: + self.cmd_add_entity('EProperty', pkey=unicode(pkey), value=value) + else: + self.rqlexec('SET X value %(v)s WHERE X pkey %(k)s', + {'k': pkey, 'v': value}, ask_confirm=False) + + # other data migration commands ########################################### + + def cmd_add_entity(self, etype, *args, **kwargs): + """add a new entity of the given type""" + rql = 'INSERT %s X' % etype + relations = [] + restrictions = [] + for rtype, rvar in args: + relations.append('X %s %s' % (rtype, rvar)) + restrictions.append('%s eid %s' % (rvar, kwargs.pop(rvar))) + commit = kwargs.pop('commit', False) + for attr in kwargs: + relations.append('X %s %%(%s)s' % (attr, attr)) + if relations: + rql = '%s: %s' % (rql, ', '.join(relations)) + if restrictions: + rql = '%s WHERE %s' % (rql, ', '.join(restrictions)) + eid = self.rqlexec(rql, kwargs, ask_confirm=self.verbosity>=2).rows[0][0] + if commit: + self.commit() + return eid + + def sqlexec(self, sql, args=None, ask_confirm=True): + """execute the given sql if confirmed + + should only be used for low level stuff undoable with existing higher + level actions + """ + if not ask_confirm or self.confirm('execute sql: %s ?' % sql): + self.session.set_pool() # ensure pool is set + try: + cu = self.session.system_sql(sql, args) + except: + ex = sys.exc_info()[1] + if self.confirm('error: %s\nabort?' % ex): + raise + return + try: + return cu.fetchall() + except: + # no result to fetch + return + + def rqlexec(self, rql, kwargs=None, cachekey=None, ask_confirm=True): + """rql action""" + if not isinstance(rql, (tuple, list)): + rql = ( (rql, kwargs), ) + res = None + for rql, kwargs in rql: + if kwargs: + msg = '%s (%s)' % (rql, kwargs) + else: + msg = rql + if not ask_confirm or self.confirm('execute rql: %s ?' % msg): + try: + res = self.rqlcursor.execute(rql, kwargs, cachekey) + except Exception, ex: + if self.confirm('error: %s\nabort?' % ex): + raise + return res + + def rqliter(self, rql, kwargs=None, ask_confirm=True): + return ForRqlIterator(self, rql, None, ask_confirm) + + def cmd_deactivate_verification_hooks(self): + self.repo.hm.deactivate_verification_hooks() + + def cmd_reactivate_verification_hooks(self): + self.repo.hm.reactivate_verification_hooks() + + # broken db commands ###################################################### + + def cmd_change_attribute_type(self, etype, attr, newtype, commit=True): + """low level method to change the type of an entity attribute. This is + a quick hack which has some drawback: + * only works when the old type can be changed to the new type by the + underlying rdbms (eg using ALTER TABLE) + * the actual schema won't be updated until next startup + """ + rschema = self.repo.schema.rschema(attr) + oldtype = rschema.objects(etype)[0] + rdefeid = rschema.rproperty(etype, oldtype, 'eid') + sql = ("UPDATE EFRDef " + "SET to_entity=(SELECT eid FROM EEType WHERE name='%s')" + "WHERE eid=%s") % (newtype, rdefeid) + self.sqlexec(sql, ask_confirm=False) + dbhelper = self.repo.system_source.dbhelper + sqltype = dbhelper.TYPE_MAPPING[newtype] + sql = 'ALTER TABLE %s ALTER COLUMN %s TYPE %s' % (etype, attr, sqltype) + self.sqlexec(sql, ask_confirm=False) + if commit: + self.commit() + + def cmd_add_entity_type_table(self, etype, commit=True): + """low level method to create the sql table for an existing entity. + This may be useful on accidental desync between the repository schema + and a sql database + """ + dbhelper = self.repo.system_source.dbhelper + tablesql = eschema2sql(dbhelper, self.repo.schema.eschema(etype)) + for sql in tablesql.split(';'): + if sql.strip(): + self.sqlexec(sql) + if commit: + self.commit() + + def cmd_add_relation_type_table(self, rtype, commit=True): + """low level method to create the sql table for an existing relation. + This may be useful on accidental desync between the repository schema + and a sql database + """ + dbhelper = self.repo.system_source.dbhelper + tablesql = rschema2sql(dbhelper, self.repo.schema.rschema(rtype)) + for sql in tablesql.split(';'): + if sql.strip(): + self.sqlexec(sql) + if commit: + self.commit() + + +class ForRqlIterator: + """specific rql iterator to make the loop skipable""" + def __init__(self, helper, rql, kwargs, ask_confirm): + self._h = helper + self.rql = rql + self.kwargs = kwargs + self.ask_confirm = ask_confirm + self._rsetit = None + + def __iter__(self): + return self + + def next(self): + if self._rsetit is not None: + return self._rsetit.next() + rql, kwargs = self.rql, self.kwargs + if kwargs: + msg = '%s (%s)' % (rql, kwargs) + else: + msg = rql + if self.ask_confirm: + if not self._h.confirm('execute rql: %s ?' % msg): + raise StopIteration + try: + #print rql, kwargs + rset = self._h.rqlcursor.execute(rql, kwargs) + except Exception, ex: + if self._h.confirm('error: %s\nabort?' % ex): + raise + else: + raise StopIteration + self._rsetit = iter(rset) + return self._rsetit.next()