goa/gaesource.py
changeset 0 b97547f5f1fa
child 1132 96752791c2b6
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/goa/gaesource.py	Wed Nov 05 15:52:50 2008 +0100
@@ -0,0 +1,320 @@
+"""Adapter for google appengine source.
+
+:organization: Logilab
+:copyright: 2008 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
+"""
+__docformat__ = "restructuredtext en"
+
+from logilab.common.decorators import cached, clear_cache
+
+from cubicweb import AuthenticationError, UnknownEid, server
+from cubicweb.server.sources import AbstractSource, ConnectionWrapper
+from cubicweb.server.pool import SingleOperation
+from cubicweb.server.utils import crypt_password
+from cubicweb.goa.dbinit import set_user_groups
+from cubicweb.goa.rqlinterpreter import RQLInterpreter
+
+from google.appengine.api.datastore import Key, Entity, Get, Put, Delete
+from google.appengine.api.datastore import Query
+from google.appengine.api import datastore_errors, users
+    
+def _init_groups(guser, euser):
+    # set default groups
+    if guser is None:
+        groups = ['guests']
+    else:
+        groups = ['users']
+        if users.is_current_user_admin():
+            groups.append('managers')
+    set_user_groups(euser, groups)
+
+def _clear_related_cache(session, gaesubject, rtype, gaeobject):
+    subject, object = str(gaesubject.key()), str(gaeobject.key())
+    for eid, role in ((subject, 'subject'), (object, 'object')):
+        # clear related cache if necessary
+        try:
+            entity = session.entity_cache(eid)
+        except KeyError:
+            pass
+        else:
+            entity.clear_related_cache(rtype, role)
+    if gaesubject.kind() == 'EUser':
+        for asession in session.repo._sessions.itervalues():
+            if asession.user.eid == subject:
+                asession.user.clear_related_cache(rtype, 'subject')
+    if gaeobject.kind() == 'EUser':
+        for asession in session.repo._sessions.itervalues():
+            if asession.user.eid == object:
+                asession.user.clear_related_cache(rtype, 'object')
+
+def _mark_modified(session, gaeentity):
+    modified = session.query_data('modifiedentities', {}, setdefault=True)
+    modified[str(gaeentity.key())] = gaeentity
+    DatastorePutOp(session)
+
+def _rinfo(session, subject, rtype, object):
+    gaesubj = session.datastore_get(subject)
+    gaeobj = session.datastore_get(object)
+    rschema = session.vreg.schema.rschema(rtype)
+    cards = rschema.rproperty(gaesubj.kind(), gaeobj.kind(), 'cardinality')
+    return gaesubj, gaeobj, cards
+
+def _radd(session, gaeentity, targetkey, relation, card):
+    if card in '?1':
+        gaeentity[relation] = targetkey
+    else:
+        try:
+            related = gaeentity[relation]
+        except KeyError:
+            related = []
+        else:
+            if related is None:
+                related = []
+        related.append(targetkey)
+        gaeentity[relation] = related
+    _mark_modified(session, gaeentity)
+
+def _rdel(session, gaeentity, targetkey, relation, card):
+    if card in '?1':
+        gaeentity[relation] = None
+    else:
+        related = gaeentity[relation]
+        if related is not None:
+            related = [key for key in related if not key == targetkey]
+            gaeentity[relation] = related or None
+    _mark_modified(session, gaeentity)
+
+    
+class DatastorePutOp(SingleOperation):
+    """delayed put of entities to have less datastore write api calls
+
+    * save all modified entities at precommit (should be the first operation
+      processed, hence the 0 returned by insert_index())
+      
+    * in case others precommit operations modify some entities, resave modified
+      entities at commit. This suppose that no db changes will occurs during
+      commit event but it should be the case.
+    """
+    def insert_index(self):
+        return 0
+
+    def _put_entities(self):
+        pending = self.session.query_data('pendingeids', ())
+        modified = self.session.query_data('modifiedentities', {})
+        for eid, gaeentity in modified.iteritems():
+            assert not eid in pending
+            Put(gaeentity)
+        modified.clear()
+        
+    def commit_event(self):
+        self._put_entities()
+        
+    def precommit_event(self):
+        self._put_entities()
+
+
+class GAESource(AbstractSource):
+    """adapter for a system source on top of google appengine datastore"""
+
+    passwd_rql = "Any P WHERE X is EUser, X login %(login)s, X upassword P"
+    auth_rql = "Any X WHERE X is EUser, X login %(login)s, X upassword %(pwd)s"
+    _sols = ({'X': 'EUser', 'P': 'Password'},)
+    
+    options = ()
+    
+    def __init__(self, repo, appschema, source_config, *args, **kwargs):
+        AbstractSource.__init__(self, repo, appschema, source_config,
+                                *args, **kwargs)
+        if repo.config['use-google-auth']:
+            self.info('using google authentication service')
+            self.authenticate = self.authenticate_gauth
+        else:
+            self.authenticate = self.authenticate_local
+            
+    def reset_caches(self):
+        """method called during test to reset potential source caches"""
+        pass
+    
+    def init_creating(self):
+        pass
+
+    def init(self):
+        # XXX unregister unsupported hooks
+        from cubicweb.server.hooks import sync_owner_after_add_composite_relation
+        self.repo.hm.unregister_hook(sync_owner_after_add_composite_relation,
+                                     'after_add_relation', '')
+
+    def get_connection(self):
+        return ConnectionWrapper()
+    
+    # ISource interface #######################################################
+
+    def compile_rql(self, rql):
+        rqlst = self.repo.querier._rqlhelper.parse(rql)
+        rqlst.restricted_vars = ()
+        rqlst.children[0].solutions = self._sols
+        return rqlst
+    
+    def set_schema(self, schema):
+        """set the application'schema"""
+        self.interpreter = RQLInterpreter(schema)
+        self.schema = schema
+        if 'EUser' in schema and not self.repo.config['use-google-auth']:
+            # rql syntax trees used to authenticate users
+            self._passwd_rqlst = self.compile_rql(self.passwd_rql)
+            self._auth_rqlst = self.compile_rql(self.auth_rql)
+                
+    def support_entity(self, etype, write=False):
+        """return true if the given entity's type is handled by this adapter
+        if write is true, return true only if it's a RW support
+        """
+        return True
+    
+    def support_relation(self, rtype, write=False):
+        """return true if the given relation's type is handled by this adapter
+        if write is true, return true only if it's a RW support
+        """
+        return True
+
+    def authenticate_gauth(self, session, login, password):
+        guser = users.get_current_user()
+        # allowing or not anonymous connection should be done in the app.yaml
+        # file, suppose it's authorized if we are there
+        if guser is None:
+            login = u'anonymous'
+        else:
+            login = unicode(guser.nickname())
+        # XXX http://code.google.com/appengine/docs/users/userobjects.html
+        # use a reference property to automatically work with email address
+        # changes after the propagation feature is implemented
+        key = Key.from_path('EUser', 'key_' + login, parent=None)
+        try:
+            euser = session.datastore_get(key)
+            # XXX fix user. Required until we find a better way to fix broken records
+            if not euser.get('s_in_group'):
+                _init_groups(guser, euser)
+                Put(euser)
+            return str(key)
+        except datastore_errors.EntityNotFoundError:
+            # create a record for this user
+            euser = Entity('EUser', name='key_' + login)
+            euser['s_login'] = login
+            _init_groups(guser, euser)
+            Put(euser)
+            return str(euser.key())
+        
+    def authenticate_local(self, session, login, password):
+        """return EUser eid for the given login/password if this account is
+        defined in this source, else raise `AuthenticationError`
+
+        two queries are needed since passwords are stored crypted, so we have
+        to fetch the salt first
+        """
+        args = {'login': login, 'pwd' : password}
+        if password is not None:
+            rset = self.syntax_tree_search(session, self._passwd_rqlst, args)
+            try:
+                pwd = rset[0][0]
+            except IndexError:
+                raise AuthenticationError('bad login')
+            # passwords are stored using the bytea type, so we get a StringIO
+            if pwd is not None:
+                args['pwd'] = crypt_password(password, pwd[:2])
+        # get eid from login and (crypted) password
+        rset = self.syntax_tree_search(session, self._auth_rqlst, args)
+        try:
+            return rset[0][0]
+        except IndexError:
+            raise AuthenticationError('bad password')
+    
+    def syntax_tree_search(self, session, union, args=None, cachekey=None, 
+                           varmap=None):
+        """return result from this source for a rql query (actually from a rql
+        syntax tree and a solution dictionary mapping each used variable to a
+        possible type). If cachekey is given, the query necessary to fetch the
+        results (but not the results themselves) may be cached using this key.
+        """
+        results, description = self.interpreter.interpret(union, args,
+                                                          session.datastore_get)
+        return results # XXX description
+                
+    def flying_insert(self, table, session, union, args=None, varmap=None):
+        raise NotImplementedError
+    
+    def add_entity(self, session, entity):
+        """add a new entity to the source"""
+        # do not delay add_entity as other modifications, new created entity
+        # needs an eid
+        entity.put()
+        
+    def update_entity(self, session, entity):
+        """replace an entity in the source"""
+        gaeentity = entity.to_gae_model()
+        _mark_modified(session, entity.to_gae_model())
+        if gaeentity.kind() == 'EUser':
+            for asession in self.repo._sessions.itervalues():
+                if asession.user.eid == entity.eid:
+                    asession.user.update(dict(gaeentity))
+                
+    def delete_entity(self, session, etype, eid):
+        """delete an entity from the source"""
+        # do not delay delete_entity as other modifications to ensure
+        # consistency
+        key = Key(eid)
+        Delete(key)
+        session.clear_datastore_cache(key)
+        session.drop_entity_cache(eid)
+        session.query_data('modifiedentities', {}).pop(eid, None)
+
+    def add_relation(self, session, subject, rtype, object):
+        """add a relation to the source"""
+        gaesubj, gaeobj, cards = _rinfo(session, subject, rtype, object)
+        _radd(session, gaesubj, gaeobj.key(), 's_' + rtype, cards[0])
+        _radd(session, gaeobj, gaesubj.key(), 'o_' + rtype, cards[1])
+        _clear_related_cache(session, gaesubj, rtype, gaeobj)
+            
+    def delete_relation(self, session, subject, rtype, object):
+        """delete a relation from the source"""
+        gaesubj, gaeobj, cards = _rinfo(session, subject, rtype, object)
+        pending = session.query_data('pendingeids', set(), setdefault=True)
+        if not subject in pending:
+            _rdel(session, gaesubj, gaeobj.key(), 's_' + rtype, cards[0])
+        if not object in pending:
+            _rdel(session, gaeobj, gaesubj.key(), 'o_' + rtype, cards[1])
+        _clear_related_cache(session, gaesubj, rtype, gaeobj)
+        
+    # system source interface #################################################
+
+    def eid_type_source(self, session, eid):
+        """return a tuple (type, source, extid) for the entity with id <eid>"""
+        try:
+            key = Key(eid)
+        except datastore_errors.BadKeyError:
+            raise UnknownEid(eid)
+        return key.kind(), 'system', None
+    
+    def create_eid(self, session):
+        return None # let the datastore generating key
+
+    def add_info(self, session, entity, source, extid=None):
+        """add type and source info for an eid into the system table"""
+        pass
+
+    def delete_info(self, session, eid, etype, uri, extid):
+        """delete system information on deletion of an entity by transfering
+        record from the entities table to the deleted_entities table
+        """
+        pass
+        
+    def fti_unindex_entity(self, session, eid):
+        """remove text content for entity with the given eid from the full text
+        index
+        """
+        pass
+        
+    def fti_index_entity(self, session, entity):
+        """add text content of a created/modified entity to the full text index
+        """
+        pass
+