--- a/devtools/apptest.py Fri Apr 24 19:49:39 2009 +0200
+++ b/devtools/apptest.py Fri Apr 24 19:49:47 2009 +0200
@@ -32,11 +32,11 @@
@property
def message(self):
return message_from_string(self.msg)
-
+
def __repr__(self):
return '<Email to %s with subject %s>' % (','.join(self.recipients),
self.message.get('Subject'))
-
+
class MockSMTP:
def __init__(self, server, port):
pass
@@ -100,7 +100,7 @@
env = None
configcls = ApptestConfiguration
requestcls = FakeRequest
-
+
# user / session management ###############################################
def user(self, req=None):
@@ -118,13 +118,13 @@
def restore_connection(self):
self.env.restore_connection()
-
+
# db api ##################################################################
@nocoverage
def cursor(self, req=None):
return self.env.cnx.cursor(req or self.request())
-
+
@nocoverage
def execute(self, *args, **kwargs):
return self.env.execute(*args, **kwargs)
@@ -132,19 +132,19 @@
@nocoverage
def commit(self):
self.env.cnx.commit()
-
+
@nocoverage
def rollback(self):
try:
self.env.cnx.rollback()
except ProgrammingError:
pass
-
+
# other utilities #########################################################
def set_debug(self, debugmode):
from cubicweb.server import set_debug
set_debug(debugmode)
-
+
@property
def config(self):
return self.vreg.config
@@ -152,7 +152,7 @@
def session(self):
"""return current server side session (using default manager account)"""
return self.env.repo._sessions[self.env.cnx.sessionid]
-
+
def request(self, *args, **kwargs):
"""return a web interface request"""
return self.env.create_request(*args, **kwargs)
@@ -160,16 +160,16 @@
@nocoverage
def rset_and_req(self, *args, **kwargs):
return self.env.get_rset_and_req(*args, **kwargs)
-
+
def entity(self, rql, args=None, eidkey=None, req=None):
return self.execute(rql, args, eidkey, req=req).get_entity(0, 0)
-
+
def etype_instance(self, etype, req=None):
req = req or self.request()
e = self.env.vreg.etype_class(etype)(req, None, None)
e.eid = None
return e
-
+
def add_entity(self, etype, **kwargs):
rql = ['INSERT %s X' % etype]
@@ -185,15 +185,15 @@
sub_rql = []
for key, value in kwargs.iteritems():
# entities
- if hasattr(value, 'eid'):
+ if hasattr(value, 'eid'):
new_value = "%s__" % key.upper()
-
+
entities[new_value] = value.eid
rql_args[new_value] = value.eid
-
+
sub_rql.append("X %s %s" % (key, new_value))
# final attributes
- else:
+ else:
sub_rql.append('X %s %%(%s)s' % (key, key))
rql_args[key] = value
rql.append(', '.join(sub_rql))
@@ -215,8 +215,8 @@
self.vreg.config.global_set_option(optname, value)
def pviews(self, req, rset):
- return sorted((a.id, a.__class__) for a in self.vreg.possible_views(req, rset))
-
+ return sorted((a.id, a.__class__) for a in self.vreg.possible_views(req, rset))
+
def pactions(self, req, rset, skipcategories=('addrelated', 'siteactions', 'useractions')):
return [(a.id, a.__class__) for a in self.vreg.possible_vobjects('actions', req, rset)
if a.category not in skipcategories]
@@ -224,7 +224,7 @@
def pactions_by_cats(self, req, rset, categories=('addrelated',)):
return [(a.id, a.__class__) for a in self.vreg.possible_vobjects('actions', req, rset)
if a.category in categories]
-
+
paddrelactions = deprecated_function(pactions_by_cats)
def pactionsdict(self, req, rset, skipcategories=('addrelated', 'siteactions', 'useractions')):
@@ -234,7 +234,7 @@
res.setdefault(a.category, []).append(a.__class__)
return res
-
+
def remote_call(self, fname, *args):
"""remote call simulation"""
dump = simplejson.dumps
@@ -244,7 +244,7 @@
return ctrl.publish(), req
# default test setup and teardown #########################################
-
+
def setup_database(self):
pass
@@ -264,7 +264,7 @@
self.setup_database()
self.commit()
MAILBOX[:] = [] # reset mailbox
-
+
@nocoverage
def tearDown(self):
self.rollback()
@@ -355,7 +355,7 @@
"""
__metaclass__ = autorepo
repo_config = None # set a particular config instance if necessary
-
+
# user / session management ###############################################
def create_user(self, user, groups=('users',), password=None, commit=True):
@@ -369,9 +369,9 @@
{'x': eid})
if commit:
self.commit()
- self.session.reset_pool()
+ self.session.reset_pool()
return eid
-
+
def login(self, login, password=None):
cnx = repo_connect(self.repo, unicode(login), password or login,
ConnectionProperties('inmemory'))
@@ -380,7 +380,7 @@
def current_session(self):
return self.repo._sessions[self.cnxs[-1].sessionid]
-
+
def restore_connection(self):
assert len(self.cnxs) == 1, self.cnxs
cnx = self.cnxs.pop()
@@ -388,7 +388,7 @@
cnx.close()
except Exception, ex:
print "exception occured while closing connection", ex
-
+
# db api ##################################################################
def execute(self, rql, args=None, eid_key=None):
@@ -400,27 +400,27 @@
# application entities for convenience
self.session.set_pool()
return rset
-
+
def commit(self):
self.__commit(self.cnxid)
- self.session.set_pool()
-
+ self.session.set_pool()
+
def rollback(self):
self.__rollback(self.cnxid)
- self.session.set_pool()
-
+ self.session.set_pool()
+
def close(self):
self.__close(self.cnxid)
# other utilities #########################################################
-
+
def set_debug(self, debugmode):
from cubicweb.server import set_debug
set_debug(debugmode)
-
+
def set_option(self, optname, value):
self.vreg.config.global_set_option(optname, value)
-
+
def add_entity(self, etype, **kwargs):
restrictions = ', '.join('X %s %%(%s)s' % (key, key) for key in kwargs)
rql = 'INSERT %s X' % etype
@@ -434,7 +434,7 @@
user = unicode(config.sources()['system']['db-user'])
passwd = config.sources()['system']['db-password']
return user, passwd
-
+
def close_connections(self):
for cnx in self.cnxs:
try:
@@ -446,10 +446,10 @@
pactions = EnvBasedTC.pactions.im_func
pactionsdict = EnvBasedTC.pactionsdict.im_func
-
+
# default test setup and teardown #########################################
copy_schema = False
-
+
def _prepare(self):
MAILBOX[:] = [] # reset mailbox
if hasattr(self, 'cnxid'):
@@ -494,15 +494,15 @@
@property
def schema(self):
return self.repo.schema
-
+
def setUp(self):
self._prepare()
self.session.set_pool()
self.maxeid = self.session.system_sql('SELECT MAX(eid) FROM entities').fetchone()[0]
-
+
def tearDown(self):
self.close_connections()
self.rollback()
self.session.unsafe_execute('DELETE Any X WHERE X eid > %(x)s', {'x': self.maxeid})
self.commit()
-
+