goa/gaesource.py
changeset 0 b97547f5f1fa
child 1132 96752791c2b6
equal deleted inserted replaced
-1:000000000000 0:b97547f5f1fa
       
     1 """Adapter for google appengine source.
       
     2 
       
     3 :organization: Logilab
       
     4 :copyright: 2008 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
       
     5 :contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
       
     6 """
       
     7 __docformat__ = "restructuredtext en"
       
     8 
       
     9 from logilab.common.decorators import cached, clear_cache
       
    10 
       
    11 from cubicweb import AuthenticationError, UnknownEid, server
       
    12 from cubicweb.server.sources import AbstractSource, ConnectionWrapper
       
    13 from cubicweb.server.pool import SingleOperation
       
    14 from cubicweb.server.utils import crypt_password
       
    15 from cubicweb.goa.dbinit import set_user_groups
       
    16 from cubicweb.goa.rqlinterpreter import RQLInterpreter
       
    17 
       
    18 from google.appengine.api.datastore import Key, Entity, Get, Put, Delete
       
    19 from google.appengine.api.datastore import Query
       
    20 from google.appengine.api import datastore_errors, users
       
    21     
       
    22 def _init_groups(guser, euser):
       
    23     # set default groups
       
    24     if guser is None:
       
    25         groups = ['guests']
       
    26     else:
       
    27         groups = ['users']
       
    28         if users.is_current_user_admin():
       
    29             groups.append('managers')
       
    30     set_user_groups(euser, groups)
       
    31 
       
    32 def _clear_related_cache(session, gaesubject, rtype, gaeobject):
       
    33     subject, object = str(gaesubject.key()), str(gaeobject.key())
       
    34     for eid, role in ((subject, 'subject'), (object, 'object')):
       
    35         # clear related cache if necessary
       
    36         try:
       
    37             entity = session.entity_cache(eid)
       
    38         except KeyError:
       
    39             pass
       
    40         else:
       
    41             entity.clear_related_cache(rtype, role)
       
    42     if gaesubject.kind() == 'EUser':
       
    43         for asession in session.repo._sessions.itervalues():
       
    44             if asession.user.eid == subject:
       
    45                 asession.user.clear_related_cache(rtype, 'subject')
       
    46     if gaeobject.kind() == 'EUser':
       
    47         for asession in session.repo._sessions.itervalues():
       
    48             if asession.user.eid == object:
       
    49                 asession.user.clear_related_cache(rtype, 'object')
       
    50 
       
    51 def _mark_modified(session, gaeentity):
       
    52     modified = session.query_data('modifiedentities', {}, setdefault=True)
       
    53     modified[str(gaeentity.key())] = gaeentity
       
    54     DatastorePutOp(session)
       
    55 
       
    56 def _rinfo(session, subject, rtype, object):
       
    57     gaesubj = session.datastore_get(subject)
       
    58     gaeobj = session.datastore_get(object)
       
    59     rschema = session.vreg.schema.rschema(rtype)
       
    60     cards = rschema.rproperty(gaesubj.kind(), gaeobj.kind(), 'cardinality')
       
    61     return gaesubj, gaeobj, cards
       
    62 
       
    63 def _radd(session, gaeentity, targetkey, relation, card):
       
    64     if card in '?1':
       
    65         gaeentity[relation] = targetkey
       
    66     else:
       
    67         try:
       
    68             related = gaeentity[relation]
       
    69         except KeyError:
       
    70             related = []
       
    71         else:
       
    72             if related is None:
       
    73                 related = []
       
    74         related.append(targetkey)
       
    75         gaeentity[relation] = related
       
    76     _mark_modified(session, gaeentity)
       
    77 
       
    78 def _rdel(session, gaeentity, targetkey, relation, card):
       
    79     if card in '?1':
       
    80         gaeentity[relation] = None
       
    81     else:
       
    82         related = gaeentity[relation]
       
    83         if related is not None:
       
    84             related = [key for key in related if not key == targetkey]
       
    85             gaeentity[relation] = related or None
       
    86     _mark_modified(session, gaeentity)
       
    87 
       
    88     
       
    89 class DatastorePutOp(SingleOperation):
       
    90     """delayed put of entities to have less datastore write api calls
       
    91 
       
    92     * save all modified entities at precommit (should be the first operation
       
    93       processed, hence the 0 returned by insert_index())
       
    94       
       
    95     * in case others precommit operations modify some entities, resave modified
       
    96       entities at commit. This suppose that no db changes will occurs during
       
    97       commit event but it should be the case.
       
    98     """
       
    99     def insert_index(self):
       
   100         return 0
       
   101 
       
   102     def _put_entities(self):
       
   103         pending = self.session.query_data('pendingeids', ())
       
   104         modified = self.session.query_data('modifiedentities', {})
       
   105         for eid, gaeentity in modified.iteritems():
       
   106             assert not eid in pending
       
   107             Put(gaeentity)
       
   108         modified.clear()
       
   109         
       
   110     def commit_event(self):
       
   111         self._put_entities()
       
   112         
       
   113     def precommit_event(self):
       
   114         self._put_entities()
       
   115 
       
   116 
       
   117 class GAESource(AbstractSource):
       
   118     """adapter for a system source on top of google appengine datastore"""
       
   119 
       
   120     passwd_rql = "Any P WHERE X is EUser, X login %(login)s, X upassword P"
       
   121     auth_rql = "Any X WHERE X is EUser, X login %(login)s, X upassword %(pwd)s"
       
   122     _sols = ({'X': 'EUser', 'P': 'Password'},)
       
   123     
       
   124     options = ()
       
   125     
       
   126     def __init__(self, repo, appschema, source_config, *args, **kwargs):
       
   127         AbstractSource.__init__(self, repo, appschema, source_config,
       
   128                                 *args, **kwargs)
       
   129         if repo.config['use-google-auth']:
       
   130             self.info('using google authentication service')
       
   131             self.authenticate = self.authenticate_gauth
       
   132         else:
       
   133             self.authenticate = self.authenticate_local
       
   134             
       
   135     def reset_caches(self):
       
   136         """method called during test to reset potential source caches"""
       
   137         pass
       
   138     
       
   139     def init_creating(self):
       
   140         pass
       
   141 
       
   142     def init(self):
       
   143         # XXX unregister unsupported hooks
       
   144         from cubicweb.server.hooks import sync_owner_after_add_composite_relation
       
   145         self.repo.hm.unregister_hook(sync_owner_after_add_composite_relation,
       
   146                                      'after_add_relation', '')
       
   147 
       
   148     def get_connection(self):
       
   149         return ConnectionWrapper()
       
   150     
       
   151     # ISource interface #######################################################
       
   152 
       
   153     def compile_rql(self, rql):
       
   154         rqlst = self.repo.querier._rqlhelper.parse(rql)
       
   155         rqlst.restricted_vars = ()
       
   156         rqlst.children[0].solutions = self._sols
       
   157         return rqlst
       
   158     
       
   159     def set_schema(self, schema):
       
   160         """set the application'schema"""
       
   161         self.interpreter = RQLInterpreter(schema)
       
   162         self.schema = schema
       
   163         if 'EUser' in schema and not self.repo.config['use-google-auth']:
       
   164             # rql syntax trees used to authenticate users
       
   165             self._passwd_rqlst = self.compile_rql(self.passwd_rql)
       
   166             self._auth_rqlst = self.compile_rql(self.auth_rql)
       
   167                 
       
   168     def support_entity(self, etype, write=False):
       
   169         """return true if the given entity's type is handled by this adapter
       
   170         if write is true, return true only if it's a RW support
       
   171         """
       
   172         return True
       
   173     
       
   174     def support_relation(self, rtype, write=False):
       
   175         """return true if the given relation's type is handled by this adapter
       
   176         if write is true, return true only if it's a RW support
       
   177         """
       
   178         return True
       
   179 
       
   180     def authenticate_gauth(self, session, login, password):
       
   181         guser = users.get_current_user()
       
   182         # allowing or not anonymous connection should be done in the app.yaml
       
   183         # file, suppose it's authorized if we are there
       
   184         if guser is None:
       
   185             login = u'anonymous'
       
   186         else:
       
   187             login = unicode(guser.nickname())
       
   188         # XXX http://code.google.com/appengine/docs/users/userobjects.html
       
   189         # use a reference property to automatically work with email address
       
   190         # changes after the propagation feature is implemented
       
   191         key = Key.from_path('EUser', 'key_' + login, parent=None)
       
   192         try:
       
   193             euser = session.datastore_get(key)
       
   194             # XXX fix user. Required until we find a better way to fix broken records
       
   195             if not euser.get('s_in_group'):
       
   196                 _init_groups(guser, euser)
       
   197                 Put(euser)
       
   198             return str(key)
       
   199         except datastore_errors.EntityNotFoundError:
       
   200             # create a record for this user
       
   201             euser = Entity('EUser', name='key_' + login)
       
   202             euser['s_login'] = login
       
   203             _init_groups(guser, euser)
       
   204             Put(euser)
       
   205             return str(euser.key())
       
   206         
       
   207     def authenticate_local(self, session, login, password):
       
   208         """return EUser eid for the given login/password if this account is
       
   209         defined in this source, else raise `AuthenticationError`
       
   210 
       
   211         two queries are needed since passwords are stored crypted, so we have
       
   212         to fetch the salt first
       
   213         """
       
   214         args = {'login': login, 'pwd' : password}
       
   215         if password is not None:
       
   216             rset = self.syntax_tree_search(session, self._passwd_rqlst, args)
       
   217             try:
       
   218                 pwd = rset[0][0]
       
   219             except IndexError:
       
   220                 raise AuthenticationError('bad login')
       
   221             # passwords are stored using the bytea type, so we get a StringIO
       
   222             if pwd is not None:
       
   223                 args['pwd'] = crypt_password(password, pwd[:2])
       
   224         # get eid from login and (crypted) password
       
   225         rset = self.syntax_tree_search(session, self._auth_rqlst, args)
       
   226         try:
       
   227             return rset[0][0]
       
   228         except IndexError:
       
   229             raise AuthenticationError('bad password')
       
   230     
       
   231     def syntax_tree_search(self, session, union, args=None, cachekey=None, 
       
   232                            varmap=None):
       
   233         """return result from this source for a rql query (actually from a rql
       
   234         syntax tree and a solution dictionary mapping each used variable to a
       
   235         possible type). If cachekey is given, the query necessary to fetch the
       
   236         results (but not the results themselves) may be cached using this key.
       
   237         """
       
   238         results, description = self.interpreter.interpret(union, args,
       
   239                                                           session.datastore_get)
       
   240         return results # XXX description
       
   241                 
       
   242     def flying_insert(self, table, session, union, args=None, varmap=None):
       
   243         raise NotImplementedError
       
   244     
       
   245     def add_entity(self, session, entity):
       
   246         """add a new entity to the source"""
       
   247         # do not delay add_entity as other modifications, new created entity
       
   248         # needs an eid
       
   249         entity.put()
       
   250         
       
   251     def update_entity(self, session, entity):
       
   252         """replace an entity in the source"""
       
   253         gaeentity = entity.to_gae_model()
       
   254         _mark_modified(session, entity.to_gae_model())
       
   255         if gaeentity.kind() == 'EUser':
       
   256             for asession in self.repo._sessions.itervalues():
       
   257                 if asession.user.eid == entity.eid:
       
   258                     asession.user.update(dict(gaeentity))
       
   259                 
       
   260     def delete_entity(self, session, etype, eid):
       
   261         """delete an entity from the source"""
       
   262         # do not delay delete_entity as other modifications to ensure
       
   263         # consistency
       
   264         key = Key(eid)
       
   265         Delete(key)
       
   266         session.clear_datastore_cache(key)
       
   267         session.drop_entity_cache(eid)
       
   268         session.query_data('modifiedentities', {}).pop(eid, None)
       
   269 
       
   270     def add_relation(self, session, subject, rtype, object):
       
   271         """add a relation to the source"""
       
   272         gaesubj, gaeobj, cards = _rinfo(session, subject, rtype, object)
       
   273         _radd(session, gaesubj, gaeobj.key(), 's_' + rtype, cards[0])
       
   274         _radd(session, gaeobj, gaesubj.key(), 'o_' + rtype, cards[1])
       
   275         _clear_related_cache(session, gaesubj, rtype, gaeobj)
       
   276             
       
   277     def delete_relation(self, session, subject, rtype, object):
       
   278         """delete a relation from the source"""
       
   279         gaesubj, gaeobj, cards = _rinfo(session, subject, rtype, object)
       
   280         pending = session.query_data('pendingeids', set(), setdefault=True)
       
   281         if not subject in pending:
       
   282             _rdel(session, gaesubj, gaeobj.key(), 's_' + rtype, cards[0])
       
   283         if not object in pending:
       
   284             _rdel(session, gaeobj, gaesubj.key(), 'o_' + rtype, cards[1])
       
   285         _clear_related_cache(session, gaesubj, rtype, gaeobj)
       
   286         
       
   287     # system source interface #################################################
       
   288 
       
   289     def eid_type_source(self, session, eid):
       
   290         """return a tuple (type, source, extid) for the entity with id <eid>"""
       
   291         try:
       
   292             key = Key(eid)
       
   293         except datastore_errors.BadKeyError:
       
   294             raise UnknownEid(eid)
       
   295         return key.kind(), 'system', None
       
   296     
       
   297     def create_eid(self, session):
       
   298         return None # let the datastore generating key
       
   299 
       
   300     def add_info(self, session, entity, source, extid=None):
       
   301         """add type and source info for an eid into the system table"""
       
   302         pass
       
   303 
       
   304     def delete_info(self, session, eid, etype, uri, extid):
       
   305         """delete system information on deletion of an entity by transfering
       
   306         record from the entities table to the deleted_entities table
       
   307         """
       
   308         pass
       
   309         
       
   310     def fti_unindex_entity(self, session, eid):
       
   311         """remove text content for entity with the given eid from the full text
       
   312         index
       
   313         """
       
   314         pass
       
   315         
       
   316     def fti_index_entity(self, session, entity):
       
   317         """add text content of a created/modified entity to the full text index
       
   318         """
       
   319         pass
       
   320