--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/devtools/_apptest.py Wed Nov 05 15:52:50 2008 +0100
@@ -0,0 +1,257 @@
+"""Hidden internals for the devtools.apptest module
+
+:organization: Logilab
+:copyright: 2001-2008 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
+"""
+__docformat__ = "restructuredtext en"
+
+import sys, traceback
+
+from logilab.common.pytest import pause_tracing, resume_tracing
+
+import yams.schema
+
+from cubicweb.dbapi import repo_connect, ConnectionProperties, ProgrammingError
+from cubicweb.cwvreg import CubicWebRegistry
+
+from cubicweb.web.application import CubicWebPublisher
+from cubicweb.web import Redirect
+
+from cubicweb.devtools import ApptestConfiguration, init_test_database
+from cubicweb.devtools.fake import FakeRequest
+
+SYSTEM_ENTITIES = ('EGroup', 'EUser',
+ 'EFRDef', 'ENFRDef',
+ 'EConstraint', 'EConstraintType', 'EProperty',
+ 'EEType', 'ERType',
+ 'State', 'Transition', 'TrInfo',
+ 'RQLExpression',
+ )
+SYSTEM_RELATIONS = (
+ # virtual relation
+ 'identity',
+ # metadata
+ 'is', 'is_instance_of', 'owned_by', 'created_by', 'specializes',
+ # workflow related
+ 'state_of', 'transition_of', 'initial_state', 'allowed_transition',
+ 'destination_state', 'in_state', 'wf_info_for', 'from_state', 'to_state',
+ 'condition',
+ # permission
+ 'in_group', 'require_group', 'require_permission',
+ 'read_permission', 'update_permission', 'delete_permission', 'add_permission',
+ # eproperty
+ 'for_user',
+ # schema definition
+ 'relation_type', 'from_entity', 'to_entity',
+ 'constrained_by', 'cstrtype', 'widget',
+ # deducted from other relations
+ 'primary_email',
+ )
+
+def unprotected_entities(app_schema, strict=False):
+ """returned a Set of each non final entity type, excluding EGroup, and EUser...
+ """
+ if strict:
+ protected_entities = yams.schema.BASE_TYPES
+ else:
+ protected_entities = yams.schema.BASE_TYPES.union(set(SYSTEM_ENTITIES))
+ entities = set(app_schema.entities())
+ return entities - protected_entities
+
+
+def ignore_relations(*relations):
+ SYSTEM_RELATIONS += relations
+
+class TestEnvironment(object):
+ """TestEnvironment defines a context (e.g. a config + a given connection) in
+ which the tests are executed
+ """
+
+ def __init__(self, appid, reporter=None, verbose=False,
+ configcls=ApptestConfiguration, requestcls=FakeRequest):
+ config = configcls(appid)
+ self.requestcls = requestcls
+ self.cnx = None
+ config.db_perms = False
+ source = config.sources()['system']
+ if verbose:
+ print "init test database ..."
+ self.vreg = vreg = CubicWebRegistry(config)
+ self.admlogin = source['db-user']
+ # restore database <=> init database
+ self.restore_database()
+ if verbose:
+ print "init done"
+ login = source['db-user']
+ config.repository = lambda x=None: self.repo
+ self.app = CubicWebPublisher(config, vreg=vreg)
+ self.verbose = verbose
+ schema = self.vreg.schema
+ # else we may run into problems since email address are ususally share in app tests
+ # XXX should not be necessary anymore
+ schema.rschema('primary_email').set_rproperty('EUser', 'EmailAddress', 'composite', False)
+ self.deletable_entities = unprotected_entities(schema)
+
+ def restore_database(self):
+ """called by unittests' tearDown to restore the original database
+ """
+ try:
+ pause_tracing()
+ if self.cnx:
+ self.cnx.close()
+ source = self.vreg.config.sources()['system']
+ self.repo, self.cnx = init_test_database(driver=source['db-driver'],
+ vreg=self.vreg)
+ self._orig_cnx = self.cnx
+ resume_tracing()
+ except:
+ resume_tracing()
+ traceback.print_exc()
+ sys.exit(1)
+ # XXX cnx decoration is usually done by the repository authentication manager,
+ # necessary in authentication tests
+ self.cnx.vreg = self.vreg
+ self.cnx.login = source['db-user']
+ self.cnx.password = source['db-password']
+
+
+ def create_user(self, login, groups=('users',), req=None):
+ req = req or self.create_request()
+ cursor = self._orig_cnx.cursor(req)
+ rset = cursor.execute('INSERT EUser X: X login %(login)s, X upassword %(passwd)s,'
+ 'X in_state S WHERE S name "activated"',
+ {'login': unicode(login), 'passwd': login.encode('utf8')})
+ user = rset.get_entity(0, 0)
+ cursor.execute('SET X in_group G WHERE X eid %%(x)s, G name IN(%s)'
+ % ','.join(repr(g) for g in groups),
+ {'x': user.eid}, 'x')
+ user.clear_related_cache('in_group', 'subject')
+ self._orig_cnx.commit()
+ return user
+
+ def login(self, login):
+ if login == self.admlogin:
+ self.restore_connection()
+ else:
+ self.cnx = repo_connect(self.repo, unicode(login), str(login),
+ ConnectionProperties('inmemory'))
+ if login == self.vreg.config.anonymous_user()[0]:
+ self.cnx.anonymous_connection = True
+ return self.cnx
+
+ def restore_connection(self):
+ if not self.cnx is self._orig_cnx:
+ try:
+ self.cnx.close()
+ except ProgrammingError:
+ pass # already closed
+ self.cnx = self._orig_cnx
+
+ ############################################################################
+
+ def execute(self, rql, args=None, eidkey=None, req=None):
+ """executes <rql>, builds a resultset, and returns a couple (rset, req)
+ where req is a FakeRequest
+ """
+ req = req or self.create_request(rql=rql)
+ return self.cnx.cursor(req).execute(unicode(rql), args, eidkey)
+
+ def create_request(self, rql=None, **kwargs):
+ """executes <rql>, builds a resultset, and returns a
+ couple (rset, req) where req is a FakeRequest
+ """
+ if rql:
+ kwargs['rql'] = rql
+ req = self.requestcls(self.vreg, form=kwargs)
+ req.set_connection(self.cnx)
+ return req
+
+ def get_rset_and_req(self, rql, optional_args=None, args=None, eidkey=None):
+ """executes <rql>, builds a resultset, and returns a
+ couple (rset, req) where req is a FakeRequest
+ """
+ return (self.execute(rql, args, eidkey),
+ self.create_request(rql=rql, **optional_args or {}))
+
+ def check_view(self, rql, vid, optional_args, template='main'):
+ """checks if vreg.view() raises an exception in this environment
+
+ If any exception is raised in this method, it will be considered
+ as a TestFailure
+ """
+ return self.call_view(vid, rql,
+ template=template, optional_args=optional_args)
+
+ def call_view(self, vid, rql, template='main', optional_args=None):
+ """shortcut for self.vreg.view()"""
+ assert template
+ if optional_args is None:
+ optional_args = {}
+ optional_args['vid'] = vid
+ req = self.create_request(rql=rql, **optional_args)
+ return self.vreg.main_template(req, template)
+
+ def call_edit(self, req):
+ """shortcut for self.app.edit()"""
+ controller = self.app.select_controller('edit', req)
+ try:
+ controller.publish()
+ except Redirect:
+ result = 'success'
+ else:
+ raise Exception('edit should raise Redirect on success')
+ req.cnx.commit()
+ return result
+
+ def iter_possible_views(self, req, rset):
+ """returns a list of possible vids for <rql>"""
+ for view in self.vreg.possible_views(req, rset):
+ if view.category == 'startupview':
+ continue
+ yield view.id
+ if rset.rowcount == 1:
+ yield 'edition'
+
+ def iter_startup_views(self, req):
+ """returns the list of startup views"""
+ for view in self.vreg.possible_views(req, None):
+ if view.category != 'startupview':
+ continue
+ yield view.id
+
+ def iter_possible_actions(self, req, rset):
+ """returns a list of possible vids for <rql>"""
+ for action in self.vreg.possible_vobjects('actions', req, rset):
+ yield action
+
+class ExistingTestEnvironment(TestEnvironment):
+
+ def __init__(self, appid, sourcefile, verbose=False):
+ config = ApptestConfiguration(appid, sourcefile=sourcefile)
+ if verbose:
+ print "init test database ..."
+ source = config.sources()['system']
+ self.vreg = CubicWebRegistry(config)
+ repo, self.cnx = init_test_database(driver=source['db-driver'],
+ vreg=self.vreg)
+ if verbose:
+ print "init done"
+ self.app = CubicWebPublisher(config, vreg=self.vreg)
+ self.verbose = verbose
+ # this is done when the publisher is opening a connection
+ self.cnx.vreg = self.vreg
+ login = source['db-user']
+
+ def setup(self, config=None):
+ """config is passed by TestSuite but is ignored in this environment"""
+ cursor = self.cnx.cursor()
+ self.last_eid = cursor.execute('Any X WHERE X creation_date D ORDERBY D DESC LIMIT 1').rows[0][0]
+
+ def cleanup(self):
+ """cancel inserted elements during tests"""
+ cursor = self.cnx.cursor()
+ cursor.execute('DELETE Any X WHERE X eid > %(x)s', {'x' : self.last_eid}, eid_key='x')
+ print "cleaning done"
+ self.cnx.commit()
+