[connection] transparent cnx_set handling
Connection object while take cares of there cnxset themself (as dbapi connection
does).
The ``set_cnxset`` and ``free_cnxset`` operation are still available for
backward compatibility purpose. The ``_auto_free_cnx_set`` is introduced to
handle mixed usage.
A new context manager ``connection.ensure_cnx_set`` is added for code that
access ``cnx.cnxset`` directly and are not wrapped in any specific
``Connection`` method.
A ``_with_cnx_set`` decorator is used on all Connection method that need a
cnxset.
--- a/devtools/repotest.py Thu Jun 27 10:44:40 2013 +0200
+++ b/devtools/repotest.py Wed Jun 26 11:21:39 2013 +0200
@@ -324,7 +324,8 @@
select.solutions.sort()
else:
rqlst.solutions.sort()
- return self.o.plan_factory(rqlst, kwargs, self.session)
+ with self.session.ensure_cnx_set:
+ return self.o.plan_factory(rqlst, kwargs, self.session)
# monkey patch some methods to get predicatable results #######################
--- a/repoapi.py Thu Jun 27 10:44:40 2013 +0200
+++ b/repoapi.py Wed Jun 26 11:21:39 2013 +0200
@@ -189,11 +189,7 @@
session object"""
if not self._open:
raise ProgrammingError('Closed connection %s' % self._cnxid)
- self._cnx.set_cnxset()
- try:
- yield self._cnx
- finally:
- self._cnx.free_cnxset()
+ yield self._cnx
# Main Connection purpose in life #########################################
--- a/server/session.py Thu Jun 27 10:44:40 2013 +0200
+++ b/server/session.py Wed Jun 26 11:21:39 2013 +0200
@@ -24,6 +24,8 @@
from uuid import uuid4
from warnings import warn
import json
+import functools
+from contextlib import contextmanager
from logilab.common.deprecation import deprecated
from logilab.common.textutils import unormalize
@@ -360,6 +362,16 @@
timeout -= time() - start
return tuple(self._record)
+
+def _with_cnx_set(func):
+ """decorator for Connection method that ensure they run with a cnxset """
+ @functools.wraps(func)
+ def wrapper(cnx, *args, **kwargs):
+ with cnx.ensure_cnx_set:
+ return func(cnx, *args, **kwargs)
+ return wrapper
+
+
class Connection(RequestSessionBase):
"""Repository Connection
@@ -426,6 +438,12 @@
self.connectionid = cnxid
#: reentrance handling
self.ctx_count = 0
+ #: count the number of entry in a context needing a cnxset
+ self._cnxset_count = 0
+ #: Boolean for compat with the older explicite set_cnxset/free_cnx API
+ #: When a call set_cnxset is done, no automatic freeing will be done
+ #: until free_cnx is called.
+ self._auto_free_cnx_set = True
#: server.Repository object
self.repo = session.repo
@@ -541,7 +559,7 @@
self._cnxset = new_cnxset
self.ctx_count += 1
- def set_cnxset(self):
+ def _set_cnxset(self):
"""the connection need a connections set to execute some queries"""
if self.cnxset is None:
cnxset = self.repo._get_cnxset()
@@ -557,7 +575,7 @@
raise
return self.cnxset
- def free_cnxset(self, ignoremode=False):
+ def _free_cnxset(self, ignoremode=False):
"""the connection is no longer using its connections set, at least for some time"""
# cnxset may be none if no operation has been done since last commit
# or rollback
@@ -569,6 +587,29 @@
cnxset.cnxset_freed()
self.repo._free_cnxset(cnxset)
+ def set_cnxset(self):
+ self._auto_free_cnx_set = False
+ return self._set_cnxset()
+
+ def free_cnxset(self, ignoremode=False):
+ self._auto_free_cnx_set = True
+ return self._free_cnxset(ignoremode=ignoremode)
+
+
+ @property
+ @contextmanager
+ def ensure_cnx_set(self):
+ assert self._cnxset_count >= 0
+ if self._cnxset_count == 0:
+ self._set_cnxset()
+ try:
+ self._cnxset_count += 1
+ yield
+ finally:
+ self._cnxset_count = max(self._cnxset_count - 1, 0)
+ if self._cnxset_count == 0 and self._auto_free_cnx_set:
+ self._free_cnxset()
+
# Entity cache management #################################################
#
@@ -869,6 +910,7 @@
def source_defs(self):
return self.repo.source_defs()
+ @_with_cnx_set
def describe(self, eid, asdict=False):
"""return a tuple (type, sourceuri, extid) for the entity with id <eid>"""
metas = self.repo.type_and_source_from_eid(eid, self)
@@ -877,13 +919,14 @@
# XXX :-1 for cw compat, use asdict=True for full information
return metas[:-1]
-
+ @_with_cnx_set
def source_from_eid(self, eid):
"""return the source where the entity with id <eid> is located"""
return self.repo.source_from_eid(eid, self)
# core method #############################################################
+ @_with_cnx_set
def execute(self, rql, kwargs=None, eid_key=None, build_descr=True):
"""db-api like method directly linked to the querier execute method.
@@ -1018,24 +1061,22 @@
# resource accessors ######################################################
+ @_with_cnx_set
def call_service(self, regid, **kwargs):
json.dumps(kwargs) # This line ensure that people use serialisable
# argument for call service. this is very important
# to enforce that from start to make sure RPC
# version is available.
self.info('calling service %s', regid)
- self.set_cnxset()
- try:
- service = self.vreg['services'].select(regid, self, **kwargs)
- result = service.call(**kwargs)
- json.dumps(result) # This line ensure that service have serialisable
- # output. this is very important to enforce that
- # from start to make sure RPC version is
- # available.
- return result
- finally:
- self.free_cnxset()
+ service = self.vreg['services'].select(regid, self, **kwargs)
+ result = service.call(**kwargs)
+ json.dumps(result) # This line ensure that service have serialisable
+ # output. this is very important to enforce that
+ # from start to make sure RPC version is
+ # available.
+ return result
+ @_with_cnx_set
def system_sql(self, sql, args=None, rollback_on_failure=True):
"""return a sql cursor on the system database"""
if sql.split(None, 1)[0].upper() != 'SELECT':
@@ -1385,6 +1426,7 @@
raise SessionClosedError('try to set connections set on a closed session %s' % self.id)
return self._cnx.set_cnxset()
free_cnxset = cnx_meth('free_cnxset')
+ ensure_cnx_set = cnx_attr('ensure_cnx_set')
def _touch(self):
"""update latest session usage timestamp and reset mode to read"""
--- a/server/sources/native.py Thu Jun 27 10:44:40 2013 +0200
+++ b/server/sources/native.py Wed Jun 26 11:21:39 2013 +0200
@@ -977,34 +977,35 @@
def add_info(self, session, entity, source, extid, complete):
"""add type and source info for an eid into the system table"""
- # begin by inserting eid/type/source/extid into the entities table
- if extid is not None:
- assert isinstance(extid, str)
- extid = b64encode(extid)
- uri = 'system' if source.copy_based_source else source.uri
- attrs = {'type': entity.cw_etype, 'eid': entity.eid, 'extid': extid,
- 'source': uri, 'asource': source.uri, 'mtime': datetime.utcnow()}
- self._handle_insert_entity_sql(session, self.sqlgen.insert('entities', attrs), attrs)
- # insert core relations: is, is_instance_of and cw_source
- try:
- self._handle_is_relation_sql(session, 'INSERT INTO is_relation(eid_from,eid_to) VALUES (%s,%s)',
- (entity.eid, eschema_eid(session, entity.e_schema)))
- except IndexError:
- # during schema serialization, skip
- pass
- else:
- for eschema in entity.e_schema.ancestors() + [entity.e_schema]:
- self._handle_is_relation_sql(session,
- 'INSERT INTO is_instance_of_relation(eid_from,eid_to) VALUES (%s,%s)',
- (entity.eid, eschema_eid(session, eschema)))
- if 'CWSource' in self.schema and source.eid is not None: # else, cw < 3.10
- self._handle_is_relation_sql(session, 'INSERT INTO cw_source_relation(eid_from,eid_to) VALUES (%s,%s)',
- (entity.eid, source.eid))
- # now we can update the full text index
- if self.do_fti and self.need_fti_indexation(entity.cw_etype):
- if complete:
- entity.complete(entity.e_schema.indexable_attributes())
- self.index_entity(session, entity=entity)
+ with session.ensure_cnx_set:
+ # begin by inserting eid/type/source/extid into the entities table
+ if extid is not None:
+ assert isinstance(extid, str)
+ extid = b64encode(extid)
+ uri = 'system' if source.copy_based_source else source.uri
+ attrs = {'type': entity.cw_etype, 'eid': entity.eid, 'extid': extid,
+ 'source': uri, 'asource': source.uri, 'mtime': datetime.utcnow()}
+ self._handle_insert_entity_sql(session, self.sqlgen.insert('entities', attrs), attrs)
+ # insert core relations: is, is_instance_of and cw_source
+ try:
+ self._handle_is_relation_sql(session, 'INSERT INTO is_relation(eid_from,eid_to) VALUES (%s,%s)',
+ (entity.eid, eschema_eid(session, entity.e_schema)))
+ except IndexError:
+ # during schema serialization, skip
+ pass
+ else:
+ for eschema in entity.e_schema.ancestors() + [entity.e_schema]:
+ self._handle_is_relation_sql(session,
+ 'INSERT INTO is_instance_of_relation(eid_from,eid_to) VALUES (%s,%s)',
+ (entity.eid, eschema_eid(session, eschema)))
+ if 'CWSource' in self.schema and source.eid is not None: # else, cw < 3.10
+ self._handle_is_relation_sql(session, 'INSERT INTO cw_source_relation(eid_from,eid_to) VALUES (%s,%s)',
+ (entity.eid, source.eid))
+ # now we can update the full text index
+ if self.do_fti and self.need_fti_indexation(entity.cw_etype):
+ if complete:
+ entity.complete(entity.e_schema.indexable_attributes())
+ self.index_entity(session, entity=entity)
def update_info(self, session, entity, need_fti_update):
"""mark entity as being modified, fulltext reindex if needed"""
@@ -1227,17 +1228,19 @@
raise `NoSuchTransaction` if there is no such transaction of if the
session's user isn't allowed to see it.
"""
- restr = {'tx_uuid': txuuid}
- sql = self.sqlgen.select('transactions', restr, ('tx_time', 'tx_user'))
- cu = self.doexec(session, sql, restr)
- try:
- time, ueid = cu.fetchone()
- except TypeError:
- raise tx.NoSuchTransaction(txuuid)
- if not (session.user.is_in_group('managers')
- or session.user.eid == ueid):
- raise tx.NoSuchTransaction(txuuid)
- return time, ueid
+ with session.ensure_cnx_set:
+ restr = {'tx_uuid': txuuid}
+ sql = self.sqlgen.select('transactions', restr,
+ ('tx_time', 'tx_user'))
+ cu = self.doexec(session, sql, restr)
+ try:
+ time, ueid = cu.fetchone()
+ except TypeError:
+ raise tx.NoSuchTransaction(txuuid)
+ if not (session.user.is_in_group('managers')
+ or session.user.eid == ueid):
+ raise tx.NoSuchTransaction(txuuid)
+ return time, ueid
def _reedit_entity(self, entity, changes, err):
session = entity._cw