169 DEFAULT_SECURITY = object() # evaluated to true by design |
168 DEFAULT_SECURITY = object() # evaluated to true by design |
170 |
169 |
class SessionClosedError(RuntimeError):
    """Error signalling that an operation was attempted on a closed session.

    It is a plain :exc:`RuntimeError` subclass carrying no extra state.
    """
173 |
172 |
class CnxSetTracker(object):
    """Keep track of which connection uses which cnxset.

    There should be one of these objects per session (including internal
    sessions).

    Session objects are responsible for creating their CnxSetTracker object.

    Connections should use the :meth:`record` and :meth:`forget` methods to
    inform the tracker of the cnxsets they have acquired and released.

    .. automethod:: cubicweb.server.session.CnxSetTracker.record
    .. automethod:: cubicweb.server.session.CnxSetTracker.forget

    Sessions use the :meth:`close` and :meth:`wait` methods when closing.

    .. automethod:: cubicweb.server.session.CnxSetTracker.close
    .. automethod:: cubicweb.server.session.CnxSetTracker.wait

    This object itself is threadsafe. It also requires callers to acquire its
    lock in some situations; the tracker can be used as a context manager
    (``with tracker:``) to hold that lock.
    """

    def __init__(self):
        # True until close() is called; an inactive tracker refuses records
        self._active = True
        # guards every attribute below and is notified by forget()
        self._condition = threading.Condition()
        # mapping: connection id -> cnxset currently held by that connection
        self._record = {}

    def __enter__(self):
        return self._condition.__enter__()

    def __exit__(self, *args):
        return self._condition.__exit__(*args)

    def record(self, cnxid, cnxset):
        """Inform the tracker that a cnxid has acquired a cnxset.

        This method is to be used by Connection objects.

        This method fails when:
        - The cnxid already has a recorded cnxset (:exc:`ValueError`).
        - The tracker is not active anymore (:exc:`SessionClosedError`).

        Notes about the caller:
        (1) It is responsible for retrieving a cnxset.
        (2) It must be prepared to release the cnxset if the
            `cnxset_tracker.record` call fails.
        (3) It should acquire the tracker lock until the very end of the
            operation.
        (4) However it must only lock the CnxSetTracker object after having
            retrieved the cnxset to prevent deadlock.

        A typical usage looks like::

            cnxset = repo._get_cnxset() # (1)
            try:
                with cnxset_tracker: # (3) and (4)
                    cnxset_tracker.record(caller.id, cnxset)
                    # (3') operation ends when caller is in expected state only
                    caller.cnxset = cnxset
            except Exception:
                repo._free_cnxset(cnxset) # (2)
                raise
        """
        # The caller is supposed to hold the lock already; taking it again is
        # harmless because threading.Condition() wraps a reentrant lock.
        with self._condition:
            if not self._active:
                raise SessionClosedError('Closed')
            # membership test (rather than comparing the stored value with
            # None) so that an existing entry can never be overwritten
            if cnxid in self._record:
                raise ValueError('connection "%s" already has a cnx_set (%r)'
                                 % (cnxid, self._record[cnxid]))
            self._record[cnxid] = cnxset

    def forget(self, cnxid, cnxset):
        """Inform the tracker that a cnxid has released a cnxset.

        This method is to be used by Connection objects.

        This method fails (:exc:`ValueError`) when:
        - The cnxid has no recorded cnxset.
        - The cnxset for the cnxid does not match the recorded one.

        Notes about the caller:
        (1) It is responsible for releasing the cnxset.
        (2) It should acquire the tracker lock during the operation to ensure
            the internal tracker state is always accurate regarding its own
            state.

        A typical usage looks like::

            cnxset = caller.cnxset
            try:
                with cnxset_tracker:
                    # (2) you can not have caller.cnxset out of sync with
                    # cnxset_tracker state while unlocked
                    caller.cnxset = None
                    cnxset_tracker.forget(caller.id, cnxset)
            finally:
                cnxset = repo._free_cnxset(cnxset) # (1)
        """
        with self._condition:
            old = self._record.get(cnxid)
            # an unknown cnxid is a mismatch too, even when `cnxset` is None;
            # without the membership test that case fell through to a
            # KeyError on removal
            if cnxid not in self._record or old is not cnxset:
                raise ValueError('recorded cnxset for "%s" mismatch: %r != %r'
                                 % (cnxid, old, cnxset))
            del self._record[cnxid]
            # wake up any session blocked in wait()
            self._condition.notify_all()

    def close(self):
        """Mark the tracker as inactive.

        This method is to be used by Session objects.

        An inactive tracker does not accept new records anymore.
        """
        with self._condition:
            self._active = False

    def wait(self, timeout=10):
        """Wait for all recorded cnxsets to be released.

        This method is to be used by Session objects.

        :param timeout: maximum number of seconds to wait, in total.

        Returns a tuple of connection ids that remain open.

        Raises :exc:`RuntimeError` if the tracker has not been closed first.
        """
        with self._condition:
            if self._active:
                raise RuntimeError('Cannot wait on active tracker.'
                                   ' Call tracker.close() first')
            # a single deadline bounds the total wait, however many times the
            # condition is notified before the records are all gone
            deadline = time() + timeout
            remaining = timeout
            while self._record and remaining > 0:
                self._condition.wait(remaining)
                remaining = deadline - time()
            return tuple(self._record)
|
305 |
|
306 |
|
307 def _with_cnx_set(func): |
|
308 """decorator for Connection method that ensure they run with a cnxset """ |
|
309 @functools.wraps(func) |
|
310 def wrapper(cnx, *args, **kwargs): |
|
311 with cnx.ensure_cnx_set: |
|
312 return func(cnx, *args, **kwargs) |
|
313 return wrapper |
|
314 |
173 |
315 def _open_only(func): |
174 def _open_only(func): |
316 """decorator for Connection method that check it is open""" |
175 """decorator for Connection method that check it is open""" |
317 @functools.wraps(func) |
176 @functools.wraps(func) |
318 def check_open(cnx, *args, **kwargs): |
177 def check_open(cnx, *args, **kwargs): |
400 self.connectionid = '%s-%s' % (session.sessionid, uuid4().hex) |
258 self.connectionid = '%s-%s' % (session.sessionid, uuid4().hex) |
401 self.session = session |
259 self.session = session |
402 self.sessionid = session.sessionid |
260 self.sessionid = session.sessionid |
403 #: reentrance handling |
261 #: reentrance handling |
404 self.ctx_count = 0 |
262 self.ctx_count = 0 |
405 #: count the number of entry in a context needing a cnxset |
|
406 self._cnxset_count = 0 |
|
407 #: Boolean for compat with the older explicite set_cnxset/free_cnx API |
|
408 #: When a call set_cnxset is done, no automatic freeing will be done |
|
409 #: until free_cnx is called. |
|
410 self._auto_free_cnx_set = True |
|
411 |
263 |
412 #: server.Repository object |
264 #: server.Repository object |
413 self.repo = session.repo |
265 self.repo = session.repo |
414 self.vreg = self.repo.vreg |
266 self.vreg = self.repo.vreg |
415 self._execute = self.repo.querier.execute |
267 self._execute = self.repo.querier.execute |
416 |
268 |
417 # other session utility |
269 # other session utility |
418 self._session_timestamp = session._timestamp |
270 self._session_timestamp = session._timestamp |
419 |
271 |
420 #: connection set used to execute queries on sources |
|
421 self._cnxset = None |
|
422 #: CnxSetTracker used to report cnxset usage |
|
423 self._cnxset_tracker = CnxSetTracker() |
|
424 # internal (root) session |
272 # internal (root) session |
425 self.is_internal_session = isinstance(session.user, InternalManager) |
273 self.is_internal_session = isinstance(session.user, InternalManager) |
426 |
274 |
427 #: dict containing arbitrary data cleared at the end of the transaction |
275 #: dict containing arbitrary data cleared at the end of the transaction |
428 self.transaction_data = {} |
276 self.transaction_data = {} |
602 self.commit_state = None |
453 self.commit_state = None |
603 self.pruned_hooks_cache = {} |
454 self.pruned_hooks_cache = {} |
604 self.local_perm_cache.clear() |
455 self.local_perm_cache.clear() |
605 self.rewriter = RQLRewriter(self) |
456 self.rewriter = RQLRewriter(self) |
606 |
457 |
607 # Connection Set Management ############################################### |
|
@property
@_open_only
def cnxset(self):
    """Connections set currently held by this connection (may be None)."""
    return self._cnxset

@cnxset.setter
@_open_only
def cnxset(self, new_cnxset):
    """Replace the held connections set, keeping the tracker in sync.

    The whole swap happens with the tracker lock held. A previously held
    connections set is rolled back and forgotten by the tracker; a new,
    non-None one is recorded. ``ctx_count`` is decremented when a
    connections set is dropped and incremented when one is installed.
    """
    tracker = self._cnxset_tracker
    with tracker:
        previous = self._cnxset
        if previous is new_cnxset:
            # nothing to do
            return
        if previous is not None:
            previous.rollback()
            self._cnxset = None
            self.ctx_count -= 1
            tracker.forget(self.connectionid, previous)
        if new_cnxset is not None:
            tracker.record(self.connectionid, new_cnxset)
            self._cnxset = new_cnxset
            self.ctx_count += 1
|
629 |
|
@_open_only
def _set_cnxset(self):
    """Make sure the connection holds a connections set and return it.

    When none is held yet, one is requested from the repository and
    installed through the ``cnxset`` property; if installing it fails, it
    is given back to the repository before the error propagates.
    """
    if self.cnxset is not None:
        return self.cnxset
    acquired = self.repo._get_cnxset()
    try:
        self.cnxset = acquired
    except:
        # deliberately bare: whatever interrupted the assignment, the
        # connections set must go back to the repository before re-raising
        self.repo._free_cnxset(acquired)
        raise
    return self.cnxset
|
641 |
|
@_open_only
def _free_cnxset(self, ignoremode=False):
    """The connection is no longer using its connections set, at least for
    some time.

    The connections set is released only when one is held and either
    `ignoremode` is true or the connection mode is ``'read'``.

    :param ignoremode: release the connections set whatever the current
      mode is.
    """
    # cnxset may be None if no operation has been done since last commit
    # or rollback
    cnxset = self.cnxset
    if cnxset is None:
        return
    if not (ignoremode or self.mode == 'read'):
        return
    assert self._cnxset_count == 0
    try:
        self.cnxset = None
    finally:
        # Hand the connections set back to the repository even if the
        # `cnxset_freed` notification raises, otherwise it would never be
        # returned.
        try:
            cnxset.cnxset_freed()
        finally:
            self.repo._free_cnxset(cnxset)
|
655 |
|
656 @deprecated('[3.19] cnxset are automatically managed now.' |
458 @deprecated('[3.19] cnxset are automatically managed now.' |
657 ' stop using explicit set and free.') |
459 ' stop using explicit set and free.') |
658 def set_cnxset(self): |
460 def set_cnxset(self): |
659 self._auto_free_cnx_set = False |
461 pass |
660 return self._set_cnxset() |
|
661 |
462 |
662 @deprecated('[3.19] cnxset are automatically managed now.' |
463 @deprecated('[3.19] cnxset are automatically managed now.' |
663 ' stop using explicit set and free.') |
464 ' stop using explicit set and free.') |
664 def free_cnxset(self, ignoremode=False): |
465 def free_cnxset(self, ignoremode=False): |
665 self._auto_free_cnx_set = True |
466 pass |
666 return self._free_cnxset(ignoremode=ignoremode) |
|
667 |
|
668 |
467 |
669 @property |
468 @property |
670 @contextmanager |
469 @contextmanager |
671 @_open_only |
470 @_open_only |
|
471 @deprecated('[3.21] a cnxset is automatically set on __enter__ call now.' |
|
472 ' stop using .ensure_cnx_set') |
672 def ensure_cnx_set(self): |
473 def ensure_cnx_set(self): |
673 assert self._cnxset_count >= 0 |
474 yield |
674 if self._cnxset_count == 0: |
|
675 self._set_cnxset() |
|
676 try: |
|
677 self._cnxset_count += 1 |
|
678 yield |
|
679 finally: |
|
680 self._cnxset_count = max(self._cnxset_count - 1, 0) |
|
681 if self._cnxset_count == 0 and self._auto_free_cnx_set: |
|
682 self._free_cnxset() |
|
683 |
|
684 |
475 |
685 # Entity cache management ################################################# |
476 # Entity cache management ################################################# |
686 # |
477 # |
687 # The connection entity cache as held in cnx.transaction_data is removed at the |
478 # The connection entity cache as held in cnx.transaction_data is removed at the |
688 # end of the connection (commit and rollback) |
479 # end of the connection (commit and rollback) |
@_open_only
def source_defs(self):
    """Return the source definitions, as given by the repository."""
    repository = self.repo
    return repository.source_defs()
993 |
784 |
994 @deprecated('[3.19] use .entity_metas(eid) instead') |
785 @deprecated('[3.19] use .entity_metas(eid) instead') |
995 @_with_cnx_set |
|
996 @_open_only |
786 @_open_only |
997 def describe(self, eid, asdict=False): |
787 def describe(self, eid, asdict=False): |
998 """return a tuple (type, sourceuri, extid) for the entity with id <eid>""" |
788 """return a tuple (type, sourceuri, extid) for the entity with id <eid>""" |
999 etype, extid, source = self.repo.type_and_source_from_eid(eid, self) |
789 etype, extid, source = self.repo.type_and_source_from_eid(eid, self) |
1000 metas = {'type': etype, 'source': source, 'extid': extid} |
790 metas = {'type': etype, 'source': source, 'extid': extid} |
1001 if asdict: |
791 if asdict: |
1002 metas['asource'] = metas['source'] # XXX pre 3.19 client compat |
792 metas['asource'] = metas['source'] # XXX pre 3.19 client compat |
1003 return metas |
793 return metas |
1004 return etype, source, extid |
794 return etype, source, extid |
1005 |
795 |
1006 @_with_cnx_set |
|
1007 @_open_only |
796 @_open_only |
1008 def entity_metas(self, eid): |
797 def entity_metas(self, eid): |
1009 """return a tuple (type, sourceuri, extid) for the entity with id <eid>""" |
798 """return a tuple (type, sourceuri, extid) for the entity with id <eid>""" |
1010 etype, extid, source = self.repo.type_and_source_from_eid(eid, self) |
799 etype, extid, source = self.repo.type_and_source_from_eid(eid, self) |
1011 return {'type': etype, 'source': source, 'extid': extid} |
800 return {'type': etype, 'source': source, 'extid': extid} |
1012 |
801 |
1013 # core method ############################################################# |
802 # core method ############################################################# |
1014 |
803 |
1015 @_with_cnx_set |
|
1016 @_open_only |
804 @_open_only |
1017 def execute(self, rql, kwargs=None, build_descr=True): |
805 def execute(self, rql, kwargs=None, build_descr=True): |
1018 """db-api like method directly linked to the querier execute method. |
806 """db-api like method directly linked to the querier execute method. |
1019 |
807 |
1020 See :meth:`cubicweb.dbapi.Cursor.execute` documentation. |
808 See :meth:`cubicweb.dbapi.Cursor.execute` documentation. |
1024 rset.req = self |
812 rset.req = self |
1025 self._session_timestamp.touch() |
813 self._session_timestamp.touch() |
1026 return rset |
814 return rset |
1027 |
815 |
1028 @_open_only |
816 @_open_only |
1029 def rollback(self, free_cnxset=True, reset_pool=None): |
817 def rollback(self, free_cnxset=None, reset_pool=None): |
1030 """rollback the current transaction""" |
818 """rollback the current transaction""" |
|
819 if free_cnxset is not None: |
|
820 warn('[3.21] free_cnxset is now unneeded', |
|
821 DeprecationWarning, stacklevel=2) |
1031 if reset_pool is not None: |
822 if reset_pool is not None: |
1032 warn('[3.13] use free_cnxset argument instead for reset_pool', |
823 warn('[3.13] reset_pool is now unneeded', |
1033 DeprecationWarning, stacklevel=2) |
824 DeprecationWarning, stacklevel=2) |
1034 free_cnxset = reset_pool |
|
1035 if self._cnxset_count != 0: |
|
1036 # we are inside ensure_cnx_set, don't lose it |
|
1037 free_cnxset = False |
|
1038 cnxset = self.cnxset |
825 cnxset = self.cnxset |
1039 if cnxset is None: |
826 assert cnxset is not None |
1040 self.clear() |
|
1041 self._session_timestamp.touch() |
|
1042 self.debug('rollback transaction %s done (no db activity)', self.connectionid) |
|
1043 return |
|
1044 try: |
827 try: |
1045 # by default, operations are executed with security turned off |
828 # by default, operations are executed with security turned off |
1046 with self.security_enabled(False, False): |
829 with self.security_enabled(False, False): |
1047 while self.pending_operations: |
830 while self.pending_operations: |
1048 try: |
831 try: |
1053 continue |
836 continue |
1054 cnxset.rollback() |
837 cnxset.rollback() |
1055 self.debug('rollback for transaction %s done', self.connectionid) |
838 self.debug('rollback for transaction %s done', self.connectionid) |
1056 finally: |
839 finally: |
1057 self._session_timestamp.touch() |
840 self._session_timestamp.touch() |
1058 if free_cnxset: |
|
1059 self._free_cnxset(ignoremode=True) |
|
1060 self.clear() |
841 self.clear() |
1061 |
842 |
1062 @_open_only |
843 @_open_only |
1063 def commit(self, free_cnxset=True, reset_pool=None): |
844 def commit(self, free_cnxset=None, reset_pool=None): |
1064 """commit the current session's transaction""" |
845 """commit the current session's transaction""" |
|
846 if free_cnxset is not None: |
|
847 warn('[3.21] free_cnxset is now unneeded', |
|
848 DeprecationWarning, stacklevel=2) |
1065 if reset_pool is not None: |
849 if reset_pool is not None: |
1066 warn('[3.13] use free_cnxset argument instead for reset_pool', |
850 warn('[3.13] reset_pool is now unneeded', |
1067 DeprecationWarning, stacklevel=2) |
851 DeprecationWarning, stacklevel=2) |
1068 free_cnxset = reset_pool |
852 assert self.cnxset is not None |
1069 if self.cnxset is None: |
|
1070 assert not self.pending_operations |
|
1071 self.clear() |
|
1072 self._session_timestamp.touch() |
|
1073 self.debug('commit transaction %s done (no db activity)', self.connectionid) |
|
1074 return |
|
1075 if self._cnxset_count != 0: |
|
1076 # we are inside ensure_cnx_set, don't lose it |
|
1077 free_cnxset = False |
|
1078 cstate = self.commit_state |
853 cstate = self.commit_state |
1079 if cstate == 'uncommitable': |
854 if cstate == 'uncommitable': |
1080 raise QueryError('transaction must be rolled back') |
855 raise QueryError('transaction must be rolled back') |
1081 if cstate == 'precommit': |
856 if cstate == 'precommit': |
1082 self.warn('calling commit in precommit makes no sense; ignoring commit') |
857 self.warn('calling commit in precommit makes no sense; ignoring commit') |
1153 exc_info=sys.exc_info()) |
928 exc_info=sys.exc_info()) |
1154 self.debug('postcommit transaction %s done', self.connectionid) |
929 self.debug('postcommit transaction %s done', self.connectionid) |
1155 return self.transaction_uuid(set=False) |
930 return self.transaction_uuid(set=False) |
1156 finally: |
931 finally: |
1157 self._session_timestamp.touch() |
932 self._session_timestamp.touch() |
1158 if free_cnxset: |
|
1159 self._free_cnxset(ignoremode=True) |
|
1160 self.clear() |
933 self.clear() |
1161 |
934 |
1162 # resource accessors ###################################################### |
935 # resource accessors ###################################################### |
1163 |
936 |
1164 @_with_cnx_set |
|
1165 @_open_only |
937 @_open_only |
1166 def call_service(self, regid, **kwargs): |
938 def call_service(self, regid, **kwargs): |
1167 self.debug('calling service %s', regid) |
939 self.debug('calling service %s', regid) |
1168 service = self.vreg['services'].select(regid, self, **kwargs) |
940 service = self.vreg['services'].select(regid, self, **kwargs) |
1169 return service.call(**kwargs) |
941 return service.call(**kwargs) |
1170 |
942 |
1171 @_with_cnx_set |
|
1172 @_open_only |
943 @_open_only |
1173 def system_sql(self, sql, args=None, rollback_on_failure=True): |
944 def system_sql(self, sql, args=None, rollback_on_failure=True): |
1174 """return a sql cursor on the system database""" |
945 """return a sql cursor on the system database""" |
1175 if sql.split(None, 1)[0].upper() != 'SELECT': |
|
1176 self.mode = 'write' |
|
1177 source = self.repo.system_source |
946 source = self.repo.system_source |
1178 try: |
947 try: |
1179 return source.doexec(self, sql, args, rollback=rollback_on_failure) |
948 return source.doexec(self, sql, args, rollback=rollback_on_failure) |
1180 except (source.OperationalError, source.InterfaceError): |
949 except (source.OperationalError, source.InterfaceError): |
1181 if not rollback_on_failure: |
950 if not rollback_on_failure: |