# HG changeset patch # User Pierre-Yves David # Date 1372238499 -7200 # Node ID 5467fb9019315719f790ab8a34335eb4c6d7fe9f # Parent 39be1e548270c63a213d552e6d40adfd55b4496a [connection] transparent cnx_set handling Connection object while take cares of there cnxset themself (as dbapi connection does). The ``set_cnxset`` and ``free_cnxset`` operation are still available for backward compatibility purpose. The ``_auto_free_cnx_set`` is introduced to handle mixed usage. A new context manager ``connection.ensure_cnx_set`` is added for code that access ``cnx.cnxset`` directly and are not wrapped in any specific ``Connection`` method. A ``_with_cnx_set`` decorator is used on all Connection method that need a cnxset. diff -r 39be1e548270 -r 5467fb901931 devtools/repotest.py --- a/devtools/repotest.py Thu Jun 27 10:44:40 2013 +0200 +++ b/devtools/repotest.py Wed Jun 26 11:21:39 2013 +0200 @@ -324,7 +324,8 @@ select.solutions.sort() else: rqlst.solutions.sort() - return self.o.plan_factory(rqlst, kwargs, self.session) + with self.session.ensure_cnx_set: + return self.o.plan_factory(rqlst, kwargs, self.session) # monkey patch some methods to get predicatable results ####################### diff -r 39be1e548270 -r 5467fb901931 repoapi.py --- a/repoapi.py Thu Jun 27 10:44:40 2013 +0200 +++ b/repoapi.py Wed Jun 26 11:21:39 2013 +0200 @@ -189,11 +189,7 @@ session object""" if not self._open: raise ProgrammingError('Closed connection %s' % self._cnxid) - self._cnx.set_cnxset() - try: - yield self._cnx - finally: - self._cnx.free_cnxset() + yield self._cnx # Main Connection purpose in life ######################################### diff -r 39be1e548270 -r 5467fb901931 server/session.py --- a/server/session.py Thu Jun 27 10:44:40 2013 +0200 +++ b/server/session.py Wed Jun 26 11:21:39 2013 +0200 @@ -24,6 +24,8 @@ from uuid import uuid4 from warnings import warn import json +import functools +from contextlib import contextmanager from logilab.common.deprecation import deprecated from logilab.common.textutils import unormalize @@ -360,6 +362,16 @@ timeout -= time() - start return tuple(self._record) + +def _with_cnx_set(func): + """decorator for Connection method that ensure they run with a cnxset """ + @functools.wraps(func) + def wrapper(cnx, *args, **kwargs): + with cnx.ensure_cnx_set: + return func(cnx, *args, **kwargs) + return wrapper + + class Connection(RequestSessionBase): """Repository Connection @@ -426,6 +438,12 @@ self.connectionid = cnxid #: reentrance handling self.ctx_count = 0 + #: count the number of entry in a context needing a cnxset + self._cnxset_count = 0 + #: Boolean for compat with the older explicite set_cnxset/free_cnx API + #: When a call set_cnxset is done, no automatic freeing will be done + #: until free_cnx is called. + self._auto_free_cnx_set = True #: server.Repository object self.repo = session.repo @@ -541,7 +559,7 @@ self._cnxset = new_cnxset self.ctx_count += 1 - def set_cnxset(self): + def _set_cnxset(self): """the connection need a connections set to execute some queries""" if self.cnxset is None: cnxset = self.repo._get_cnxset() @@ -557,7 +575,7 @@ raise return self.cnxset - def free_cnxset(self, ignoremode=False): + def _free_cnxset(self, ignoremode=False): """the connection is no longer using its connections set, at least for some time""" # cnxset may be none if no operation has been done since last commit # or rollback @@ -569,6 +587,29 @@ cnxset.cnxset_freed() self.repo._free_cnxset(cnxset) + def set_cnxset(self): + self._auto_free_cnx_set = False + return self._set_cnxset() + + def free_cnxset(self, ignoremode=False): + self._auto_free_cnx_set = True + return self._free_cnxset(ignoremode=ignoremode) + + + @property + @contextmanager + def ensure_cnx_set(self): + assert self._cnxset_count >= 0 + if self._cnxset_count == 0: + self._set_cnxset() + try: + self._cnxset_count += 1 + yield + finally: + self._cnxset_count = max(self._cnxset_count - 1, 0) + if self._cnxset_count == 0 and self._auto_free_cnx_set: + self._free_cnxset() + # Entity cache management ################################################# # @@ -869,6 +910,7 @@ def source_defs(self): return self.repo.source_defs() + @_with_cnx_set def describe(self, eid, asdict=False): """return a tuple (type, sourceuri, extid) for the entity with id """ metas = self.repo.type_and_source_from_eid(eid, self) @@ -877,13 +919,14 @@ # XXX :-1 for cw compat, use asdict=True for full information return metas[:-1] - + @_with_cnx_set def source_from_eid(self, eid): """return the source where the entity with id is located""" return self.repo.source_from_eid(eid, self) # core method ############################################################# + @_with_cnx_set def execute(self, rql, kwargs=None, eid_key=None, build_descr=True): """db-api like method directly linked to the querier execute method. @@ -1018,24 +1061,22 @@ # resource accessors ###################################################### + @_with_cnx_set def call_service(self, regid, **kwargs): json.dumps(kwargs) # This line ensure that people use serialisable # argument for call service. this is very important # to enforce that from start to make sure RPC # version is available. self.info('calling service %s', regid) - self.set_cnxset() - try: - service = self.vreg['services'].select(regid, self, **kwargs) - result = service.call(**kwargs) - json.dumps(result) # This line ensure that service have serialisable - # output. this is very important to enforce that - # from start to make sure RPC version is - # available. - return result - finally: - self.free_cnxset() + service = self.vreg['services'].select(regid, self, **kwargs) + result = service.call(**kwargs) + json.dumps(result) # This line ensure that service have serialisable + # output. this is very important to enforce that + # from start to make sure RPC version is + # available. + return result + @_with_cnx_set def system_sql(self, sql, args=None, rollback_on_failure=True): """return a sql cursor on the system database""" if sql.split(None, 1)[0].upper() != 'SELECT': @@ -1385,6 +1426,7 @@ raise SessionClosedError('try to set connections set on a closed session %s' % self.id) return self._cnx.set_cnxset() free_cnxset = cnx_meth('free_cnxset') + ensure_cnx_set = cnx_attr('ensure_cnx_set') def _touch(self): """update latest session usage timestamp and reset mode to read""" diff -r 39be1e548270 -r 5467fb901931 server/sources/native.py --- a/server/sources/native.py Thu Jun 27 10:44:40 2013 +0200 +++ b/server/sources/native.py Wed Jun 26 11:21:39 2013 +0200 @@ -977,34 +977,35 @@ def add_info(self, session, entity, source, extid, complete): """add type and source info for an eid into the system table""" - # begin by inserting eid/type/source/extid into the entities table - if extid is not None: - assert isinstance(extid, str) - extid = b64encode(extid) - uri = 'system' if source.copy_based_source else source.uri - attrs = {'type': entity.cw_etype, 'eid': entity.eid, 'extid': extid, - 'source': uri, 'asource': source.uri, 'mtime': datetime.utcnow()} - self._handle_insert_entity_sql(session, self.sqlgen.insert('entities', attrs), attrs) - # insert core relations: is, is_instance_of and cw_source - try: - self._handle_is_relation_sql(session, 'INSERT INTO is_relation(eid_from,eid_to) VALUES (%s,%s)', - (entity.eid, eschema_eid(session, entity.e_schema))) - except IndexError: - # during schema serialization, skip - pass - else: - for eschema in entity.e_schema.ancestors() + [entity.e_schema]: - self._handle_is_relation_sql(session, - 'INSERT INTO is_instance_of_relation(eid_from,eid_to) VALUES (%s,%s)', - (entity.eid, eschema_eid(session, eschema))) - if 'CWSource' in self.schema and source.eid is not None: # else, cw < 3.10 - self._handle_is_relation_sql(session, 'INSERT INTO cw_source_relation(eid_from,eid_to) VALUES (%s,%s)', - (entity.eid, source.eid)) - # now we can update the full text index - if self.do_fti and self.need_fti_indexation(entity.cw_etype): - if complete: - entity.complete(entity.e_schema.indexable_attributes()) - self.index_entity(session, entity=entity) + with session.ensure_cnx_set: + # begin by inserting eid/type/source/extid into the entities table + if extid is not None: + assert isinstance(extid, str) + extid = b64encode(extid) + uri = 'system' if source.copy_based_source else source.uri + attrs = {'type': entity.cw_etype, 'eid': entity.eid, 'extid': extid, + 'source': uri, 'asource': source.uri, 'mtime': datetime.utcnow()} + self._handle_insert_entity_sql(session, self.sqlgen.insert('entities', attrs), attrs) + # insert core relations: is, is_instance_of and cw_source + try: + self._handle_is_relation_sql(session, 'INSERT INTO is_relation(eid_from,eid_to) VALUES (%s,%s)', + (entity.eid, eschema_eid(session, entity.e_schema))) + except IndexError: + # during schema serialization, skip + pass + else: + for eschema in entity.e_schema.ancestors() + [entity.e_schema]: + self._handle_is_relation_sql(session, + 'INSERT INTO is_instance_of_relation(eid_from,eid_to) VALUES (%s,%s)', + (entity.eid, eschema_eid(session, eschema))) + if 'CWSource' in self.schema and source.eid is not None: # else, cw < 3.10 + self._handle_is_relation_sql(session, 'INSERT INTO cw_source_relation(eid_from,eid_to) VALUES (%s,%s)', + (entity.eid, source.eid)) + # now we can update the full text index + if self.do_fti and self.need_fti_indexation(entity.cw_etype): + if complete: + entity.complete(entity.e_schema.indexable_attributes()) + self.index_entity(session, entity=entity) def update_info(self, session, entity, need_fti_update): """mark entity as being modified, fulltext reindex if needed""" @@ -1227,17 +1228,19 @@ raise `NoSuchTransaction` if there is no such transaction of if the session's user isn't allowed to see it. """ - restr = {'tx_uuid': txuuid} - sql = self.sqlgen.select('transactions', restr, ('tx_time', 'tx_user')) - cu = self.doexec(session, sql, restr) - try: - time, ueid = cu.fetchone() - except TypeError: - raise tx.NoSuchTransaction(txuuid) - if not (session.user.is_in_group('managers') - or session.user.eid == ueid): - raise tx.NoSuchTransaction(txuuid) - return time, ueid + with session.ensure_cnx_set: + restr = {'tx_uuid': txuuid} + sql = self.sqlgen.select('transactions', restr, + ('tx_time', 'tx_user')) + cu = self.doexec(session, sql, restr) + try: + time, ueid = cu.fetchone() + except TypeError: + raise tx.NoSuchTransaction(txuuid) + if not (session.user.is_in_group('managers') + or session.user.eid == ueid): + raise tx.NoSuchTransaction(txuuid) + return time, ueid def _reedit_entity(self, entity, changes, err): session = entity._cw