|
1 """Adapter for google appengine source. |
|
2 |
|
3 :organization: Logilab |
|
4 :copyright: 2008 LOGILAB S.A. (Paris, FRANCE), all rights reserved. |
|
5 :contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr |
|
6 """ |
|
7 __docformat__ = "restructuredtext en" |
|
8 |
|
9 from logilab.common.decorators import cached, clear_cache |
|
10 |
|
11 from cubicweb import AuthenticationError, UnknownEid, server |
|
12 from cubicweb.server.sources import AbstractSource, ConnectionWrapper |
|
13 from cubicweb.server.pool import SingleOperation |
|
14 from cubicweb.server.utils import crypt_password |
|
15 from cubicweb.goa.dbinit import set_user_groups |
|
16 from cubicweb.goa.rqlinterpreter import RQLInterpreter |
|
17 |
|
18 from google.appengine.api.datastore import Key, Entity, Get, Put, Delete |
|
19 from google.appengine.api.datastore import Query |
|
20 from google.appengine.api import datastore_errors, users |
|
21 |
|
22 def _init_groups(guser, euser): |
|
23 # set default groups |
|
24 if guser is None: |
|
25 groups = ['guests'] |
|
26 else: |
|
27 groups = ['users'] |
|
28 if users.is_current_user_admin(): |
|
29 groups.append('managers') |
|
30 set_user_groups(euser, groups) |
|
31 |
|
32 def _clear_related_cache(session, gaesubject, rtype, gaeobject): |
|
33 subject, object = str(gaesubject.key()), str(gaeobject.key()) |
|
34 for eid, role in ((subject, 'subject'), (object, 'object')): |
|
35 # clear related cache if necessary |
|
36 try: |
|
37 entity = session.entity_cache(eid) |
|
38 except KeyError: |
|
39 pass |
|
40 else: |
|
41 entity.clear_related_cache(rtype, role) |
|
42 if gaesubject.kind() == 'EUser': |
|
43 for asession in session.repo._sessions.itervalues(): |
|
44 if asession.user.eid == subject: |
|
45 asession.user.clear_related_cache(rtype, 'subject') |
|
46 if gaeobject.kind() == 'EUser': |
|
47 for asession in session.repo._sessions.itervalues(): |
|
48 if asession.user.eid == object: |
|
49 asession.user.clear_related_cache(rtype, 'object') |
|
50 |
|
51 def _mark_modified(session, gaeentity): |
|
52 modified = session.query_data('modifiedentities', {}, setdefault=True) |
|
53 modified[str(gaeentity.key())] = gaeentity |
|
54 DatastorePutOp(session) |
|
55 |
|
56 def _rinfo(session, subject, rtype, object): |
|
57 gaesubj = session.datastore_get(subject) |
|
58 gaeobj = session.datastore_get(object) |
|
59 rschema = session.vreg.schema.rschema(rtype) |
|
60 cards = rschema.rproperty(gaesubj.kind(), gaeobj.kind(), 'cardinality') |
|
61 return gaesubj, gaeobj, cards |
|
62 |
|
63 def _radd(session, gaeentity, targetkey, relation, card): |
|
64 if card in '?1': |
|
65 gaeentity[relation] = targetkey |
|
66 else: |
|
67 try: |
|
68 related = gaeentity[relation] |
|
69 except KeyError: |
|
70 related = [] |
|
71 else: |
|
72 if related is None: |
|
73 related = [] |
|
74 related.append(targetkey) |
|
75 gaeentity[relation] = related |
|
76 _mark_modified(session, gaeentity) |
|
77 |
|
78 def _rdel(session, gaeentity, targetkey, relation, card): |
|
79 if card in '?1': |
|
80 gaeentity[relation] = None |
|
81 else: |
|
82 related = gaeentity[relation] |
|
83 if related is not None: |
|
84 related = [key for key in related if not key == targetkey] |
|
85 gaeentity[relation] = related or None |
|
86 _mark_modified(session, gaeentity) |
|
87 |
|
88 |
|
89 class DatastorePutOp(SingleOperation): |
|
90 """delayed put of entities to have less datastore write api calls |
|
91 |
|
92 * save all modified entities at precommit (should be the first operation |
|
93 processed, hence the 0 returned by insert_index()) |
|
94 |
|
95 * in case others precommit operations modify some entities, resave modified |
|
96 entities at commit. This suppose that no db changes will occurs during |
|
97 commit event but it should be the case. |
|
98 """ |
|
99 def insert_index(self): |
|
100 return 0 |
|
101 |
|
102 def _put_entities(self): |
|
103 pending = self.session.query_data('pendingeids', ()) |
|
104 modified = self.session.query_data('modifiedentities', {}) |
|
105 for eid, gaeentity in modified.iteritems(): |
|
106 assert not eid in pending |
|
107 Put(gaeentity) |
|
108 modified.clear() |
|
109 |
|
110 def commit_event(self): |
|
111 self._put_entities() |
|
112 |
|
113 def precommit_event(self): |
|
114 self._put_entities() |
|
115 |
|
116 |
|
117 class GAESource(AbstractSource): |
|
118 """adapter for a system source on top of google appengine datastore""" |
|
119 |
|
120 passwd_rql = "Any P WHERE X is EUser, X login %(login)s, X upassword P" |
|
121 auth_rql = "Any X WHERE X is EUser, X login %(login)s, X upassword %(pwd)s" |
|
122 _sols = ({'X': 'EUser', 'P': 'Password'},) |
|
123 |
|
124 options = () |
|
125 |
|
126 def __init__(self, repo, appschema, source_config, *args, **kwargs): |
|
127 AbstractSource.__init__(self, repo, appschema, source_config, |
|
128 *args, **kwargs) |
|
129 if repo.config['use-google-auth']: |
|
130 self.info('using google authentication service') |
|
131 self.authenticate = self.authenticate_gauth |
|
132 else: |
|
133 self.authenticate = self.authenticate_local |
|
134 |
|
135 def reset_caches(self): |
|
136 """method called during test to reset potential source caches""" |
|
137 pass |
|
138 |
|
139 def init_creating(self): |
|
140 pass |
|
141 |
|
142 def init(self): |
|
143 # XXX unregister unsupported hooks |
|
144 from cubicweb.server.hooks import sync_owner_after_add_composite_relation |
|
145 self.repo.hm.unregister_hook(sync_owner_after_add_composite_relation, |
|
146 'after_add_relation', '') |
|
147 |
|
148 def get_connection(self): |
|
149 return ConnectionWrapper() |
|
150 |
|
151 # ISource interface ####################################################### |
|
152 |
|
153 def compile_rql(self, rql): |
|
154 rqlst = self.repo.querier._rqlhelper.parse(rql) |
|
155 rqlst.restricted_vars = () |
|
156 rqlst.children[0].solutions = self._sols |
|
157 return rqlst |
|
158 |
|
159 def set_schema(self, schema): |
|
160 """set the application'schema""" |
|
161 self.interpreter = RQLInterpreter(schema) |
|
162 self.schema = schema |
|
163 if 'EUser' in schema and not self.repo.config['use-google-auth']: |
|
164 # rql syntax trees used to authenticate users |
|
165 self._passwd_rqlst = self.compile_rql(self.passwd_rql) |
|
166 self._auth_rqlst = self.compile_rql(self.auth_rql) |
|
167 |
|
168 def support_entity(self, etype, write=False): |
|
169 """return true if the given entity's type is handled by this adapter |
|
170 if write is true, return true only if it's a RW support |
|
171 """ |
|
172 return True |
|
173 |
|
174 def support_relation(self, rtype, write=False): |
|
175 """return true if the given relation's type is handled by this adapter |
|
176 if write is true, return true only if it's a RW support |
|
177 """ |
|
178 return True |
|
179 |
|
180 def authenticate_gauth(self, session, login, password): |
|
181 guser = users.get_current_user() |
|
182 # allowing or not anonymous connection should be done in the app.yaml |
|
183 # file, suppose it's authorized if we are there |
|
184 if guser is None: |
|
185 login = u'anonymous' |
|
186 else: |
|
187 login = unicode(guser.nickname()) |
|
188 # XXX http://code.google.com/appengine/docs/users/userobjects.html |
|
189 # use a reference property to automatically work with email address |
|
190 # changes after the propagation feature is implemented |
|
191 key = Key.from_path('EUser', 'key_' + login, parent=None) |
|
192 try: |
|
193 euser = session.datastore_get(key) |
|
194 # XXX fix user. Required until we find a better way to fix broken records |
|
195 if not euser.get('s_in_group'): |
|
196 _init_groups(guser, euser) |
|
197 Put(euser) |
|
198 return str(key) |
|
199 except datastore_errors.EntityNotFoundError: |
|
200 # create a record for this user |
|
201 euser = Entity('EUser', name='key_' + login) |
|
202 euser['s_login'] = login |
|
203 _init_groups(guser, euser) |
|
204 Put(euser) |
|
205 return str(euser.key()) |
|
206 |
|
207 def authenticate_local(self, session, login, password): |
|
208 """return EUser eid for the given login/password if this account is |
|
209 defined in this source, else raise `AuthenticationError` |
|
210 |
|
211 two queries are needed since passwords are stored crypted, so we have |
|
212 to fetch the salt first |
|
213 """ |
|
214 args = {'login': login, 'pwd' : password} |
|
215 if password is not None: |
|
216 rset = self.syntax_tree_search(session, self._passwd_rqlst, args) |
|
217 try: |
|
218 pwd = rset[0][0] |
|
219 except IndexError: |
|
220 raise AuthenticationError('bad login') |
|
221 # passwords are stored using the bytea type, so we get a StringIO |
|
222 if pwd is not None: |
|
223 args['pwd'] = crypt_password(password, pwd[:2]) |
|
224 # get eid from login and (crypted) password |
|
225 rset = self.syntax_tree_search(session, self._auth_rqlst, args) |
|
226 try: |
|
227 return rset[0][0] |
|
228 except IndexError: |
|
229 raise AuthenticationError('bad password') |
|
230 |
|
231 def syntax_tree_search(self, session, union, args=None, cachekey=None, |
|
232 varmap=None): |
|
233 """return result from this source for a rql query (actually from a rql |
|
234 syntax tree and a solution dictionary mapping each used variable to a |
|
235 possible type). If cachekey is given, the query necessary to fetch the |
|
236 results (but not the results themselves) may be cached using this key. |
|
237 """ |
|
238 results, description = self.interpreter.interpret(union, args, |
|
239 session.datastore_get) |
|
240 return results # XXX description |
|
241 |
|
242 def flying_insert(self, table, session, union, args=None, varmap=None): |
|
243 raise NotImplementedError |
|
244 |
|
245 def add_entity(self, session, entity): |
|
246 """add a new entity to the source""" |
|
247 # do not delay add_entity as other modifications, new created entity |
|
248 # needs an eid |
|
249 entity.put() |
|
250 |
|
251 def update_entity(self, session, entity): |
|
252 """replace an entity in the source""" |
|
253 gaeentity = entity.to_gae_model() |
|
254 _mark_modified(session, entity.to_gae_model()) |
|
255 if gaeentity.kind() == 'EUser': |
|
256 for asession in self.repo._sessions.itervalues(): |
|
257 if asession.user.eid == entity.eid: |
|
258 asession.user.update(dict(gaeentity)) |
|
259 |
|
260 def delete_entity(self, session, etype, eid): |
|
261 """delete an entity from the source""" |
|
262 # do not delay delete_entity as other modifications to ensure |
|
263 # consistency |
|
264 key = Key(eid) |
|
265 Delete(key) |
|
266 session.clear_datastore_cache(key) |
|
267 session.drop_entity_cache(eid) |
|
268 session.query_data('modifiedentities', {}).pop(eid, None) |
|
269 |
|
270 def add_relation(self, session, subject, rtype, object): |
|
271 """add a relation to the source""" |
|
272 gaesubj, gaeobj, cards = _rinfo(session, subject, rtype, object) |
|
273 _radd(session, gaesubj, gaeobj.key(), 's_' + rtype, cards[0]) |
|
274 _radd(session, gaeobj, gaesubj.key(), 'o_' + rtype, cards[1]) |
|
275 _clear_related_cache(session, gaesubj, rtype, gaeobj) |
|
276 |
|
277 def delete_relation(self, session, subject, rtype, object): |
|
278 """delete a relation from the source""" |
|
279 gaesubj, gaeobj, cards = _rinfo(session, subject, rtype, object) |
|
280 pending = session.query_data('pendingeids', set(), setdefault=True) |
|
281 if not subject in pending: |
|
282 _rdel(session, gaesubj, gaeobj.key(), 's_' + rtype, cards[0]) |
|
283 if not object in pending: |
|
284 _rdel(session, gaeobj, gaesubj.key(), 'o_' + rtype, cards[1]) |
|
285 _clear_related_cache(session, gaesubj, rtype, gaeobj) |
|
286 |
|
287 # system source interface ################################################# |
|
288 |
|
289 def eid_type_source(self, session, eid): |
|
290 """return a tuple (type, source, extid) for the entity with id <eid>""" |
|
291 try: |
|
292 key = Key(eid) |
|
293 except datastore_errors.BadKeyError: |
|
294 raise UnknownEid(eid) |
|
295 return key.kind(), 'system', None |
|
296 |
|
297 def create_eid(self, session): |
|
298 return None # let the datastore generating key |
|
299 |
|
300 def add_info(self, session, entity, source, extid=None): |
|
301 """add type and source info for an eid into the system table""" |
|
302 pass |
|
303 |
|
304 def delete_info(self, session, eid, etype, uri, extid): |
|
305 """delete system information on deletion of an entity by transfering |
|
306 record from the entities table to the deleted_entities table |
|
307 """ |
|
308 pass |
|
309 |
|
310 def fti_unindex_entity(self, session, eid): |
|
311 """remove text content for entity with the given eid from the full text |
|
312 index |
|
313 """ |
|
314 pass |
|
315 |
|
316 def fti_index_entity(self, session, entity): |
|
317 """add text content of a created/modified entity to the full text index |
|
318 """ |
|
319 pass |
|
320 |