[multi-sources] support for moving an entity from an external source (closes #343818)
authorSylvain Thénault <sylvain.thenault@logilab.fr>
Thu, 19 May 2011 10:53:17 +0200
changeset 7399 972ed1843bd8
parent 7398 26695dd703d8
child 7400 2391a6f526bf
[multi-sources] support for moving an entity from an external source (closes #343818) The original need is to move a user from an ldap source to the system source so we can delete it from ldap without losing information in the cubicweb instance. We can't wait for the user to be deleted from the ldap since it will then be too late to get back the user's attributes, so it has to be a manual operation performed before the actual deletion. This makes sense for other sources as well. So the idea is to make the "Any cw_source CWSource" relation editable by managers, and to watch for changes to it. We then check that the move is possible (i.e. from an external source to the system source) and do the necessary work (essentially changing source information and copying data into the system source). The remaining problem is that we don't want the moved entity to be reimported later. To distinguish this state, the trick is to change the associated record in the 'entities' system table with eid=-eid while leaving other fields unchanged, and to add a new record with eid=eid, source='system'. External sources will then have to consider the case where `extid2eid` returns a negative eid as 'this entity was known but has been moved, ignore it'. Note that no UI is provided yet; it currently has to be done in a c-c shell.
hooks/metadata.py
hooks/syncsources.py
misc/migration/3.13.0_Any.py
schemas/base.py
server/repository.py
server/sources/datafeed.py
server/sources/ldapuser.py
server/sources/pyrorql.py
server/test/unittest_ldapuser.py
sobjects/parsers.py
sobjects/test/unittest_parsers.py
--- a/hooks/metadata.py	Thu May 19 10:53:11 2011 +0200
+++ b/hooks/metadata.py	Thu May 19 10:53:17 2011 +0200
@@ -23,6 +23,7 @@
 
 from cubicweb.selectors import is_instance
 from cubicweb.server import hook
+from cubicweb.server.edition import EditedEntity
 
 
 class MetaDataHook(hook.Hook):
@@ -142,3 +143,73 @@
             session.repo.system_source.index_entity(
                 session, session.entity_from_eid(self.eidto))
 
+
+
+# entity source handling #######################################################
+
+class ChangeEntityUpdateCaches(hook.Operation):
+    def postcommit_event(self):
+        self.oldsource.reset_caches()
+        repo = self.session.repo
+        entity = self.entity
+        extid = entity.cw_metainformation()['extid']
+        repo._type_source_cache[entity.eid] = (
+            entity.__regid__, self.newsource.uri, None)
+        if self.oldsource.copy_based_source:
+            uri = 'system'
+        else:
+            uri = self.oldsource.uri
+        repo._extid_cache[(extid, uri)] = -entity.eid
+
+class ChangeEntitySourceDeleteHook(MetaDataHook):
+    """support for moving an entity from an external source by watching 'Any
+    cw_source CWSource' relation
+    """
+
+    __regid__ = 'cw.metadata.source-change'
+    __select__ = MetaDataHook.__select__ & hook.match_rtype('cw_source')
+    events = ('before_delete_relation',)
+
+    def __call__(self):
+        if (self._cw.deleted_in_transaction(self.eidfrom)
+            or self._cw.deleted_in_transaction(self.eidto)):
+            return
+        schange = self._cw.transaction_data.setdefault('cw_source_change', {})
+        schange[self.eidfrom] = self.eidto
+
+class ChangeEntitySourceAddHook(MetaDataHook):
+    __regid__ = 'cw.metadata.source-change'
+    __select__ = MetaDataHook.__select__ & hook.match_rtype('cw_source')
+    events = ('before_add_relation',)
+
+    def __call__(self):
+        schange = self._cw.transaction_data.get('cw_source_change')
+        if schange is not None and self.eidfrom in schange:
+            newsource = self._cw.entity_from_eid(self.eidto)
+            if newsource.name != 'system':
+                raise Exception('changing source to something else than the '
+                                'system source is unsupported')
+            syssource = newsource.repo_source
+            oldsource = self._cw.entity_from_eid(schange[self.eidfrom])
+            entity = self._cw.entity_from_eid(self.eidfrom)
+            # copy entity if necessary
+            if not oldsource.repo_source.copy_based_source:
+                entity.complete(skip_bytes=False)
+                entity.cw_edited = EditedEntity(entity, **entity.cw_attr_cache)
+                syssource.add_entity(self._cw, entity)
+            # we don't want the moved entity to be reimported later.  To
+            # distinguish this state, the trick is to change the associated
+            # record in the 'entities' system table with eid=-eid while leaving
+            # other fields unchanged, and to add a new record with eid=eid,
+            # source='system'. External sources will then have to consider the
+            # case where `extid2eid` returns a negative eid as 'this entity was
+            # known but has been moved, ignore it'.
+            self._cw.system_sql('UPDATE entities SET eid=-eid,source=%(source)s WHERE eid=%(eid)s',
+                                {'eid': self.eidfrom, 'source': newsource.name})
+            attrs = {'type': entity.__regid__, 'eid': entity.eid, 'extid': None,
+                     'source': 'system', 'mtime': datetime.now()}
+            self._cw.system_sql(syssource.sqlgen.insert('entities', attrs), attrs)
+            # register an operation to update repository/sources caches
+            ChangeEntityUpdateCaches(self._cw, entity=entity,
+                                     oldsource=oldsource.repo_source,
+                                     newsource=syssource)
--- a/hooks/syncsources.py	Thu May 19 10:53:11 2011 +0200
+++ b/hooks/syncsources.py	Thu May 19 10:53:17 2011 +0200
@@ -30,6 +30,8 @@
     category = 'cw.sources'
 
 
+# repo sources synchronization #################################################
+
 class SourceAddedOp(hook.Operation):
     def postcommit_event(self):
         self.session.repo.add_source(self.entity)
@@ -100,8 +102,10 @@
                 pass
 
 
-# source mapping synchronization. Expect cw_for_source/cw_schema are immutable
-# relations (i.e. can't change from a source or schema to another).
+# source mapping synchronization ###############################################
+#
+# Expect cw_for_source/cw_schema are immutable relations (i.e. can't change from
+# a source or schema to another).
 
 class SourceMappingDeleteHook(SourceHook):
     """check cw_for_source and cw_schema are immutable relations
@@ -161,3 +165,4 @@
         SourceMappingChangedOp.get_instance(self._cw).add_data(
             (self._cw.entity_from_eid(self.eidfrom),
              self._cw.entity_from_eid(self.eidto)) )
+
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/misc/migration/3.13.0_Any.py	Thu May 19 10:53:17 2011 +0200
@@ -0,0 +1,1 @@
+sync_schema_props_perms('cw_source', syncprops=False)
--- a/schemas/base.py	Thu May 19 10:53:11 2011 +0200
+++ b/schemas/base.py	Thu May 19 10:53:17 2011 +0200
@@ -307,8 +307,8 @@
 class cw_source(RelationDefinition):
     __permissions__ = {
         'read':   ('managers', 'users', 'guests'),
-        'add':    (),
-        'delete': (),
+        'add':    ('managers',),
+        'delete': ('managers',),
         }
     subject = '*'
     object = 'CWSource'
--- a/server/repository.py	Thu May 19 10:53:11 2011 +0200
+++ b/server/repository.py	Thu May 19 10:53:17 2011 +0200
@@ -160,7 +160,7 @@
         self.sources_by_uri = {'system': self.system_source}
         # querier helper, need to be created after sources initialization
         self.querier = querier.QuerierHelper(self, self.schema)
-        # cache eid -> type / source
+        # cache eid -> (type, source, extid)
         self._type_source_cache = {}
         # cache (extid, source uri) -> eid
         self._extid_cache = {}
@@ -1032,7 +1032,28 @@
 
     def extid2eid(self, source, extid, etype, session=None, insert=True,
                   sourceparams=None):
-        """get eid from a local id. An eid is attributed if no record is found"""
+        """Return eid from a local id. If the eid is a negative integer, that
+        means the entity is known but has been copied back to the system source
+        hence should be ignored.
+
+        If no record is found, ie the entity is not known yet:
+
+        1. an eid is attributed
+
+        2. the source's :meth:`before_entity_insertion` method is called to
+           build the entity instance
+
+        3. unless source's :attr:`should_call_hooks` tells otherwise,
+          'before_add_entity' hooks are called
+
+        4. record is added into the system source
+
+        5. the source's :meth:`after_entity_insertion` method is called to
+           complete building of the entity instance
+
+        6. unless source's :attr:`should_call_hooks` tells otherwise,
+          'after_add_entity' hooks are called
+        """
         uri = 'system' if source.copy_based_source else source.uri
         cachekey = (extid, uri)
         try:
--- a/server/sources/datafeed.py	Thu May 19 10:53:11 2011 +0200
+++ b/server/sources/datafeed.py	Thu May 19 10:53:17 2011 +0200
@@ -27,7 +27,7 @@
 
 from lxml import etree
 
-from cubicweb import RegistryNotFound, ObjectNotFound, ValidationError
+from cubicweb import RegistryNotFound, ObjectNotFound, ValidationError, UnknownEid
 from cubicweb.server.sources import AbstractSource
 from cubicweb.appobject import AppObject
 
@@ -218,9 +218,23 @@
         raise ValidationError(schemacfg.eid, {None: msg})
 
     def extid2entity(self, uri, etype, **sourceparams):
+        """return an entity for the given uri. May return None if it should be
+        skipped
+        """
         sourceparams['parser'] = self
         eid = self.source.extid2eid(str(uri), etype, self._cw,
                                     sourceparams=sourceparams)
+        if eid < 0:
+            # entity has been moved away from its original source
+            #
+            # Don't give etype to entity_from_eid so we get UnknownEid if the
+            # entity has been removed
+            try:
+                entity = self._cw.entity_from_eid(-eid)
+            except UnknownEid:
+                return None
+            self.notify_updated(entity) # avoid later update from the source's data
+            return entity
         if self.sourceuris is not None:
             self.sourceuris.pop(str(uri), None)
         return self._cw.entity_from_eid(eid, etype)
--- a/server/sources/ldapuser.py	Thu May 19 10:53:11 2011 +0200
+++ b/server/sources/ldapuser.py	Thu May 19 10:53:17 2011 +0200
@@ -310,7 +310,11 @@
         except Exception:
             self.error('while trying to authenticate %s', user, exc_info=True)
             raise AuthenticationError()
-        return self.extid2eid(user['dn'], 'CWUser', session)
+        eid = self.extid2eid(user['dn'], 'CWUser', session)
+        if eid < 0:
+            # user has been moved away from this source
+            raise AuthenticationError()
+        return eid
 
     def ldap_name(self, var):
         if var.stinfo['relations']:
@@ -392,7 +396,7 @@
                     break
         assert mainvars, rqlst
         columns, globtransforms = self.prepare_columns(mainvars, rqlst)
-        eidfilters = []
+        eidfilters = [lambda x: x > 0]
         allresults = []
         generator = RQL2LDAPFilter(self, session, args, mainvars)
         for mainvar in mainvars:
--- a/server/sources/pyrorql.py	Thu May 19 10:53:11 2011 +0200
+++ b/server/sources/pyrorql.py	Thu May 19 10:53:17 2011 +0200
@@ -234,8 +234,10 @@
         etype, dexturi, dextid = cnx.describe(extid)
         if dexturi == 'system' or not (
             dexturi in self.repo.sources_by_uri or self._skip_externals):
-            return self.repo.extid2eid(self, str(extid), etype, session), True
-        if dexturi in self.repo.sources_by_uri:
+            eid = self.repo.extid2eid(self, str(extid), etype, session)
+            if eid > 0:
+                return eid, True
+        elif dexturi in self.repo.sources_by_uri:
             source = self.repo.sources_by_uri[dexturi]
             cnx = session.cnxset.connection(source.uri)
             eid = source.local_eid(cnx, dextid, session)[0]
--- a/server/test/unittest_ldapuser.py	Thu May 19 10:53:11 2011 +0200
+++ b/server/test/unittest_ldapuser.py	Thu May 19 10:53:17 2011 +0200
@@ -378,6 +378,23 @@
         rset = cu.execute('Any F WHERE X has_text "iaminguestsgrouponly", X firstname F')
         self.assertEqual(rset.rows, [[None]])
 
+    def test_copy_to_system_source(self):
+        eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': SYT})[0][0]
+        self.sexecute('SET X cw_source S WHERE X eid %(x)s, S name "system"', {'x': eid})
+        self.commit()
+        rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': SYT})
+        self.assertEqual(len(rset), 1)
+        e = rset.get_entity(0, 0)
+        self.assertEqual(e.eid, eid)
+        self.assertEqual(e.cw_metainformation(), {'source': {'type': u'native', 'uri': u'system'},
+                                                  'type': 'CWUser',
+                                                  'extid': None})
+        self.assertEqual(e.cw_source[0].name, 'system')
+        source = self.repo.sources_by_uri['ldapuser']
+        source.synchronize()
+        rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': SYT})
+        self.assertEqual(len(rset), 1)
+
     def test_nonregr1(self):
         self.sexecute('Any X,AA ORDERBY AA DESC WHERE E eid %(x)s, E owned_by X, '
                      'X modification_date AA',
--- a/sobjects/parsers.py	Thu May 19 10:53:11 2011 +0200
+++ b/sobjects/parsers.py	Thu May 19 10:53:17 2011 +0200
@@ -202,6 +202,8 @@
     def process_item(self, item, rels):
         entity = self.extid2entity(str(item.pop('cwuri')),  item.pop('cwtype'),
                                    item=item)
+        if entity is None:
+            return None
         if not (self.created_during_pull(entity) or self.updated_during_pull(entity)):
             self.notify_updated(entity)
             item.pop('eid')
@@ -233,17 +235,18 @@
         Takes no option.
         """
         assert not any(x[1] for x in rules), "'copy' action takes no option"
-        ttypes = set([x[0] for x in rules])
-        others = [item for item in others if item['cwtype'] in ttypes]
+        ttypes = frozenset([x[0] for x in rules])
         eids = [] # local eids
-        if not others:
+        for item in others:
+            if item['cwtype'] in ttypes:
+                item, _rels = self._complete_item(item)
+                other_entity = self.process_item(item, [])
+                if other_entity is not None:
+                    eids.append(other_entity.eid)
+        if eids:
+            self._set_relation(entity, rtype, role, eids)
+        else:
             self._clear_relation(entity, rtype, role, ttypes)
-            return
-        for item in others:
-            item, _rels = self._complete_item(item)
-            other_entity = self.process_item(item, [])
-            eids.append(other_entity.eid)
-        self._set_relation(entity, rtype, role, eids)
 
     def related_link(self, entity, rtype, role, others, rules):
         """implementation of 'link' action
@@ -297,10 +300,10 @@
             else:
                 self.source.error('can not find %s entity with attributes %s',
                                   item['cwtype'], kwargs)
-        if not eids:
+        if eids:
+            self._set_relation(entity, rtype, role, eids)
+        else:
             self._clear_relation(entity, rtype, role, (ttype,))
-        else:
-            self._set_relation(entity, rtype, role, eids)
 
     def _complete_item(self, item, add_relations=True):
         itemurl = item['cwuri'] + '?vid=xml'
@@ -321,18 +324,16 @@
                              {'x': entity.eid})
 
     def _set_relation(self, entity, rtype, role, eids):
+        assert eids
         rqlbase = rtype_role_rql(rtype, role)
-        rql = 'DELETE %s' % rqlbase
-        if eids:
-            eidstr = ','.join(str(eid) for eid in eids)
-            rql += ', NOT Y eid IN (%s)' % eidstr
+        eidstr = ','.join(str(eid) for eid in eids)
+        self._cw.execute('DELETE %s, NOT Y eid IN (%s)' % (rqlbase, eidstr),
+                         {'x': entity.eid})
+        if role == 'object':
+            rql = 'SET %s, Y eid IN (%s), NOT Y %s X' % (rqlbase, eidstr, rtype)
+        else:
+            rql = 'SET %s, Y eid IN (%s), NOT X %s Y' % (rqlbase, eidstr, rtype)
         self._cw.execute(rql, {'x': entity.eid})
-        if eids:
-            if role == 'object':
-                rql = 'SET %s, Y eid IN (%s), NOT Y %s X' % (rqlbase, eidstr, rtype)
-            else:
-                rql = 'SET %s, Y eid IN (%s), NOT X %s Y' % (rqlbase, eidstr, rtype)
-            self._cw.execute(rql, {'x': entity.eid})
 
 def registration_callback(vreg):
     vreg.register_all(globals().values(), __name__)
--- a/sobjects/test/unittest_parsers.py	Thu May 19 10:53:11 2011 +0200
+++ b/sobjects/test/unittest_parsers.py	Thu May 19 10:53:17 2011 +0200
@@ -164,6 +164,45 @@
         stats = dfsource.pull_data(session, force=True, raise_on_error=True)
         self.assertEqual(stats['created'], set())
         self.assertEqual(len(stats['updated']), 2)
+        session.commit()
+
+        # test move to system source
+        self.sexecute('SET X cw_source S WHERE X eid %(x)s, S name "system"', {'x': email.eid})
+        self.commit()
+        rset = self.sexecute('EmailAddress X WHERE X address "syt@logilab.fr"')
+        self.assertEqual(len(rset), 1)
+        e = rset.get_entity(0, 0)
+        self.assertEqual(e.eid, email.eid)
+        self.assertEqual(e.cw_metainformation(), {'source': {'type': u'native', 'uri': u'system'},
+                                                  'type': 'EmailAddress',
+                                                  'extid': None})
+        self.assertEqual(e.cw_source[0].name, 'system')
+        self.assertEqual(e.reverse_use_email[0].login, 'sthenault')
+        self.commit()
+        # test everything is still fine after source synchronization
+        session.set_pool()
+        stats = dfsource.pull_data(session, force=True, raise_on_error=True)
+        rset = self.sexecute('EmailAddress X WHERE X address "syt@logilab.fr"')
+        self.assertEqual(len(rset), 1)
+        e = rset.get_entity(0, 0)
+        self.assertEqual(e.eid, email.eid)
+        self.assertEqual(e.cw_metainformation(), {'source': {'type': u'native', 'uri': u'system'},
+                                                  'type': 'EmailAddress',
+                                                  'extid': None})
+        self.assertEqual(e.cw_source[0].name, 'system')
+        self.assertEqual(e.reverse_use_email[0].login, 'sthenault')
+        session.commit()
+
+        # test delete entity
+        e.cw_delete()
+        self.commit()
+        # test everything is still fine after source synchronization
+        session.set_pool()
+        stats = dfsource.pull_data(session, force=True, raise_on_error=True)
+        rset = self.sexecute('EmailAddress X WHERE X address "syt@logilab.fr"')
+        self.assertEqual(len(rset), 0)
+        rset = self.sexecute('Any X WHERE X use_email E, X login "sthenault"')
+        self.assertEqual(len(rset), 0)
 
 if __name__ == '__main__':
     from logilab.common.testlib import unittest_main