diff -r 000000000000 -r b97547f5f1fa goa/gaesource.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/goa/gaesource.py Wed Nov 05 15:52:50 2008 +0100 @@ -0,0 +1,320 @@ +"""Adapter for google appengine source. + +:organization: Logilab +:copyright: 2008 LOGILAB S.A. (Paris, FRANCE), all rights reserved. +:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr +""" +__docformat__ = "restructuredtext en" + +from logilab.common.decorators import cached, clear_cache + +from cubicweb import AuthenticationError, UnknownEid, server +from cubicweb.server.sources import AbstractSource, ConnectionWrapper +from cubicweb.server.pool import SingleOperation +from cubicweb.server.utils import crypt_password +from cubicweb.goa.dbinit import set_user_groups +from cubicweb.goa.rqlinterpreter import RQLInterpreter + +from google.appengine.api.datastore import Key, Entity, Get, Put, Delete +from google.appengine.api.datastore import Query +from google.appengine.api import datastore_errors, users + +def _init_groups(guser, euser): + # set default groups + if guser is None: + groups = ['guests'] + else: + groups = ['users'] + if users.is_current_user_admin(): + groups.append('managers') + set_user_groups(euser, groups) + +def _clear_related_cache(session, gaesubject, rtype, gaeobject): + subject, object = str(gaesubject.key()), str(gaeobject.key()) + for eid, role in ((subject, 'subject'), (object, 'object')): + # clear related cache if necessary + try: + entity = session.entity_cache(eid) + except KeyError: + pass + else: + entity.clear_related_cache(rtype, role) + if gaesubject.kind() == 'EUser': + for asession in session.repo._sessions.itervalues(): + if asession.user.eid == subject: + asession.user.clear_related_cache(rtype, 'subject') + if gaeobject.kind() == 'EUser': + for asession in session.repo._sessions.itervalues(): + if asession.user.eid == object: + asession.user.clear_related_cache(rtype, 'object') + +def _mark_modified(session, gaeentity): + modified = session.query_data('modifiedentities', {}, setdefault=True) + modified[str(gaeentity.key())] = gaeentity + DatastorePutOp(session) + +def _rinfo(session, subject, rtype, object): + gaesubj = session.datastore_get(subject) + gaeobj = session.datastore_get(object) + rschema = session.vreg.schema.rschema(rtype) + cards = rschema.rproperty(gaesubj.kind(), gaeobj.kind(), 'cardinality') + return gaesubj, gaeobj, cards + +def _radd(session, gaeentity, targetkey, relation, card): + if card in '?1': + gaeentity[relation] = targetkey + else: + try: + related = gaeentity[relation] + except KeyError: + related = [] + else: + if related is None: + related = [] + related.append(targetkey) + gaeentity[relation] = related + _mark_modified(session, gaeentity) + +def _rdel(session, gaeentity, targetkey, relation, card): + if card in '?1': + gaeentity[relation] = None + else: + related = gaeentity[relation] + if related is not None: + related = [key for key in related if not key == targetkey] + gaeentity[relation] = related or None + _mark_modified(session, gaeentity) + + +class DatastorePutOp(SingleOperation): + """delayed put of entities to have less datastore write api calls + + * save all modified entities at precommit (should be the first operation + processed, hence the 0 returned by insert_index()) + + * in case others precommit operations modify some entities, resave modified + entities at commit. This suppose that no db changes will occurs during + commit event but it should be the case. + """ + def insert_index(self): + return 0 + + def _put_entities(self): + pending = self.session.query_data('pendingeids', ()) + modified = self.session.query_data('modifiedentities', {}) + for eid, gaeentity in modified.iteritems(): + assert not eid in pending + Put(gaeentity) + modified.clear() + + def commit_event(self): + self._put_entities() + + def precommit_event(self): + self._put_entities() + + +class GAESource(AbstractSource): + """adapter for a system source on top of google appengine datastore""" + + passwd_rql = "Any P WHERE X is EUser, X login %(login)s, X upassword P" + auth_rql = "Any X WHERE X is EUser, X login %(login)s, X upassword %(pwd)s" + _sols = ({'X': 'EUser', 'P': 'Password'},) + + options = () + + def __init__(self, repo, appschema, source_config, *args, **kwargs): + AbstractSource.__init__(self, repo, appschema, source_config, + *args, **kwargs) + if repo.config['use-google-auth']: + self.info('using google authentication service') + self.authenticate = self.authenticate_gauth + else: + self.authenticate = self.authenticate_local + + def reset_caches(self): + """method called during test to reset potential source caches""" + pass + + def init_creating(self): + pass + + def init(self): + # XXX unregister unsupported hooks + from cubicweb.server.hooks import sync_owner_after_add_composite_relation + self.repo.hm.unregister_hook(sync_owner_after_add_composite_relation, + 'after_add_relation', '') + + def get_connection(self): + return ConnectionWrapper() + + # ISource interface ####################################################### + + def compile_rql(self, rql): + rqlst = self.repo.querier._rqlhelper.parse(rql) + rqlst.restricted_vars = () + rqlst.children[0].solutions = self._sols + return rqlst + + def set_schema(self, schema): + """set the application'schema""" + self.interpreter = RQLInterpreter(schema) + self.schema = schema + if 'EUser' in schema and not self.repo.config['use-google-auth']: + # rql syntax trees used to authenticate users + self._passwd_rqlst = self.compile_rql(self.passwd_rql) + self._auth_rqlst = self.compile_rql(self.auth_rql) + + def support_entity(self, etype, write=False): + """return true if the given entity's type is handled by this adapter + if write is true, return true only if it's a RW support + """ + return True + + def support_relation(self, rtype, write=False): + """return true if the given relation's type is handled by this adapter + if write is true, return true only if it's a RW support + """ + return True + + def authenticate_gauth(self, session, login, password): + guser = users.get_current_user() + # allowing or not anonymous connection should be done in the app.yaml + # file, suppose it's authorized if we are there + if guser is None: + login = u'anonymous' + else: + login = unicode(guser.nickname()) + # XXX http://code.google.com/appengine/docs/users/userobjects.html + # use a reference property to automatically work with email address + # changes after the propagation feature is implemented + key = Key.from_path('EUser', 'key_' + login, parent=None) + try: + euser = session.datastore_get(key) + # XXX fix user. Required until we find a better way to fix broken records + if not euser.get('s_in_group'): + _init_groups(guser, euser) + Put(euser) + return str(key) + except datastore_errors.EntityNotFoundError: + # create a record for this user + euser = Entity('EUser', name='key_' + login) + euser['s_login'] = login + _init_groups(guser, euser) + Put(euser) + return str(euser.key()) + + def authenticate_local(self, session, login, password): + """return EUser eid for the given login/password if this account is + defined in this source, else raise `AuthenticationError` + + two queries are needed since passwords are stored crypted, so we have + to fetch the salt first + """ + args = {'login': login, 'pwd' : password} + if password is not None: + rset = self.syntax_tree_search(session, self._passwd_rqlst, args) + try: + pwd = rset[0][0] + except IndexError: + raise AuthenticationError('bad login') + # passwords are stored using the bytea type, so we get a StringIO + if pwd is not None: + args['pwd'] = crypt_password(password, pwd[:2]) + # get eid from login and (crypted) password + rset = self.syntax_tree_search(session, self._auth_rqlst, args) + try: + return rset[0][0] + except IndexError: + raise AuthenticationError('bad password') + + def syntax_tree_search(self, session, union, args=None, cachekey=None, + varmap=None): + """return result from this source for a rql query (actually from a rql + syntax tree and a solution dictionary mapping each used variable to a + possible type). If cachekey is given, the query necessary to fetch the + results (but not the results themselves) may be cached using this key. + """ + results, description = self.interpreter.interpret(union, args, + session.datastore_get) + return results # XXX description + + def flying_insert(self, table, session, union, args=None, varmap=None): + raise NotImplementedError + + def add_entity(self, session, entity): + """add a new entity to the source""" + # do not delay add_entity as other modifications, new created entity + # needs an eid + entity.put() + + def update_entity(self, session, entity): + """replace an entity in the source""" + gaeentity = entity.to_gae_model() + _mark_modified(session, entity.to_gae_model()) + if gaeentity.kind() == 'EUser': + for asession in self.repo._sessions.itervalues(): + if asession.user.eid == entity.eid: + asession.user.update(dict(gaeentity)) + + def delete_entity(self, session, etype, eid): + """delete an entity from the source""" + # do not delay delete_entity as other modifications to ensure + # consistency + key = Key(eid) + Delete(key) + session.clear_datastore_cache(key) + session.drop_entity_cache(eid) + session.query_data('modifiedentities', {}).pop(eid, None) + + def add_relation(self, session, subject, rtype, object): + """add a relation to the source""" + gaesubj, gaeobj, cards = _rinfo(session, subject, rtype, object) + _radd(session, gaesubj, gaeobj.key(), 's_' + rtype, cards[0]) + _radd(session, gaeobj, gaesubj.key(), 'o_' + rtype, cards[1]) + _clear_related_cache(session, gaesubj, rtype, gaeobj) + + def delete_relation(self, session, subject, rtype, object): + """delete a relation from the source""" + gaesubj, gaeobj, cards = _rinfo(session, subject, rtype, object) + pending = session.query_data('pendingeids', set(), setdefault=True) + if not subject in pending: + _rdel(session, gaesubj, gaeobj.key(), 's_' + rtype, cards[0]) + if not object in pending: + _rdel(session, gaeobj, gaesubj.key(), 'o_' + rtype, cards[1]) + _clear_related_cache(session, gaesubj, rtype, gaeobj) + + # system source interface ################################################# + + def eid_type_source(self, session, eid): + """return a tuple (type, source, extid) for the entity with id """ + try: + key = Key(eid) + except datastore_errors.BadKeyError: + raise UnknownEid(eid) + return key.kind(), 'system', None + + def create_eid(self, session): + return None # let the datastore generating key + + def add_info(self, session, entity, source, extid=None): + """add type and source info for an eid into the system table""" + pass + + def delete_info(self, session, eid, etype, uri, extid): + """delete system information on deletion of an entity by transfering + record from the entities table to the deleted_entities table + """ + pass + + def fti_unindex_entity(self, session, eid): + """remove text content for entity with the given eid from the full text + index + """ + pass + + def fti_index_entity(self, session, entity): + """add text content of a created/modified entity to the full text index + """ + pass +