--- a/devtools/testlib.py Tue Aug 11 17:04:59 2009 +0200
+++ b/devtools/testlib.py Tue Aug 11 17:13:32 2009 +0200
@@ -1,4 +1,4 @@
-"""this module contains base classes for web tests
+"""this module contains base classes and utilities for cubicweb tests
:organization: Logilab
:copyright: 2001-2009 LOGILAB S.A. (Paris, FRANCE), license is LGPL v2.
@@ -7,84 +7,516 @@
"""
__docformat__ = "restructuredtext en"
+import os
import sys
+import re
+from urllib import unquote
from math import log
-from logilab.common.debugger import Debugger
-from logilab.common.testlib import InnerTest
-from logilab.common.pytest import nocoverage
+import simplejson
+
+import yams.schema
-from cubicweb.devtools import VIEW_VALIDATORS
-from cubicweb.devtools.apptest import EnvBasedTC
-from cubicweb.devtools._apptest import unprotected_entities, SYSTEM_RELATIONS
-from cubicweb.devtools.htmlparser import DTDValidator, SaxOnlyValidator, HTMLValidator
-from cubicweb.devtools.fill import insert_entity_queries, make_relations_queries
+from logilab.common.testlib import TestCase, InnerTest
+from logilab.common.pytest import nocoverage, pause_tracing, resume_tracing
+from logilab.common.debugger import Debugger
+from logilab.common.umessage import message_from_string
+from logilab.common.decorators import cached, classproperty
+from logilab.common.deprecation import deprecated
-from cubicweb.sobjects.notification import NotificationView
-
-from cubicweb.vregistry import NoSelectableObject
+from cubicweb import NoSelectableObject, cwconfig, devtools, web, server
+from cubicweb.dbapi import repo_connect, ConnectionProperties, ProgrammingError
+from cubicweb.sobjects import notification
+from cubicweb.web import application
+from cubicweb.devtools import SYSTEM_ENTITIES, SYSTEM_RELATIONS, VIEW_VALIDATORS
+from cubicweb.devtools import fake, htmlparser
-## TODO ###############
-# creation tests: make sure an entity was actually created
-# Existing Test Environment
+# low-level utilities ##########################################################
class CubicWebDebugger(Debugger):
-
+ """special debugger class providing a 'view' function which saves some
+ html into a temporary file and open a web browser to examinate it.
+ """
def do_view(self, arg):
import webbrowser
data = self._getval(arg)
file('/tmp/toto.html', 'w').write(data)
webbrowser.open('file:///tmp/toto.html')
-def how_many_dict(schema, cursor, how_many, skip):
- """compute how many entities by type we need to be able to satisfy relations
- cardinality
- """
- # compute how many entities by type we need to be able to satisfy relation constraint
- relmap = {}
- for rschema in schema.relations():
- if rschema.is_final():
- continue
- for subj, obj in rschema.iter_rdefs():
- card = rschema.rproperty(subj, obj, 'cardinality')
- if card[0] in '1?' and len(rschema.subjects(obj)) == 1:
- relmap.setdefault((rschema, subj), []).append(str(obj))
- if card[1] in '1?' and len(rschema.objects(subj)) == 1:
- relmap.setdefault((rschema, obj), []).append(str(subj))
- unprotected = unprotected_entities(schema)
- for etype in skip:
- unprotected.add(etype)
- howmanydict = {}
- for etype in unprotected_entities(schema, strict=True):
- howmanydict[str(etype)] = cursor.execute('Any COUNT(X) WHERE X is %s' % etype)[0][0]
- if etype in unprotected:
- howmanydict[str(etype)] += how_many
- for (rschema, etype), targets in relmap.iteritems():
- # XXX should 1. check no cycle 2. propagate changes
- relfactor = sum(howmanydict[e] for e in targets)
- howmanydict[str(etype)] = max(relfactor, howmanydict[etype])
- return howmanydict
-
def line_context_filter(line_no, center, before=3, after=None):
"""return true if line are in context
- if after is None: after = before"""
+
+ if after is None: after = before
+ """
if after is None:
after = before
return center - before <= line_no <= center + after
-## base webtest class #########################################################
-VALMAP = {None: None, 'dtd': DTDValidator, 'xml': SaxOnlyValidator}
+
+def unprotected_entities(schema, strict=False):
+ """returned a set of each non final entity type, excluding "system" entities
+ (eg CWGroup, CWUser...)
+ """
+ if strict:
+ protected_entities = yams.schema.BASE_TYPES
+ else:
+ protected_entities = yams.schema.BASE_TYPES.union(SYSTEM_ENTITIES)
+ return set(schema.entities()) - protected_entities
+
+
+def get_versions(self, checkversions=False):
+ """return the a dictionary containing cubes used by this instance
+ as key with their version as value, including cubicweb version. This is a
+ public method, not requiring a session id.
+
+ replace Repository.get_versions by this method if you don't want versions
+ checking
+ """
+ vcconf = {'cubicweb': self.config.cubicweb_version()}
+ self.config.bootstrap_cubes()
+ for pk in self.config.cubes():
+ version = self.config.cube_version(pk)
+ vcconf[pk] = version
+ self.config._cubes = None
+ return vcconf
+
+
+def refresh_repo(repo):
+ devtools.reset_test_database(repo.config)
+ for pool in repo.pools:
+ pool.reconnect()
+ repo._type_source_cache = {}
+ repo._extid_cache = {}
+ repo.querier._rql_cache = {}
+ for source in repo.sources:
+ source.reset_caches()
+
+
+# email handling, to test emails sent by an application ########################
+
+MAILBOX = []
+
+class Email:
+ """you'll get instances of Email into MAILBOX during tests that trigger
+ some notification.
+
+ * `msg` is the original message object
+
+ * `recipients` is a list of email address which are the recipients of this
+ message
+ """
+ def __init__(self, recipients, msg):
+ self.recipients = recipients
+ self.msg = msg
+
+ @property
+ def message(self):
+ return message_from_string(self.msg)
+
+ @property
+ def subject(self):
+ return self.message.get('Subject')
+
+ @property
+ def content(self):
+ return self.message.get_payload(decode=True)
+
+ def __repr__(self):
+ return '<Email to %s with subject %s>' % (','.join(self.recipients),
+ self.message.get('Subject'))
+
+# the trick to get email into MAILBOX instead of actually sent: monkey patch
+# cwconfig.SMTP object
+class MockSMTP:
+ def __init__(self, server, port):
+ pass
+ def close(self):
+ pass
+ def sendmail(self, helo_addr, recipients, msg):
+ MAILBOX.append(Email(recipients, msg))
+
+cwconfig.SMTP = MockSMTP
+
+
+# base class for cubicweb tests requiring a full cw environments ###############
+
+class CubicWebTC(TestCase):
+ """abstract class for test using an apptest environment
+
+ attributes:
+ `vreg`, the vregistry
+ `schema`, self.vreg.schema
+ `config`, cubicweb configuration
+ `cnx`, dbapi connection to the repository using an admin user
+ `session`, server side session associated to `cnx`
+ `app`, the cubicweb publisher (for web testing)
+ `repo`, the repository object
+
+ `admlogin`, login of the admin user
+ `admpassword`, password of the admin user
+
+ """
+ appid = 'data'
+ configcls = devtools.ApptestConfiguration
+
+ @classproperty
+ def config(cls):
+ """return the configuration object. Configuration is cached on the test
+ class.
+ """
+ try:
+ return cls.__dict__['_config']
+ except KeyError:
+ config = cls._config = cls.configcls(cls.appid)
+ config.mode = 'test'
+ return config
+
+ @classmethod
+ def init_config(cls, config):
+ """configuration initialization hooks. You may want to override this."""
+ source = config.sources()['system']
+ cls.admlogin = unicode(source['db-user'])
+ cls.admpassword = source['db-password']
+ # uncomment the line below if you want rql queries to be logged
+ #config.global_set_option('query-log-file', '/tmp/test_rql_log.' + `os.getpid()`)
+ config.global_set_option('log-file', None)
+ # set default-dest-addrs to a dumb email address to avoid mailbox or
+ # mail queue pollution
+ config.global_set_option('default-dest-addrs', ['whatever'])
+ try:
+ send_to = '%s@logilab.fr' % os.getlogin()
+ except OSError:
+ send_to = '%s@logilab.fr' % (os.environ.get('USER')
+ or os.environ.get('USERNAME')
+ or os.environ.get('LOGNAME'))
+ config.global_set_option('sender-addr', send_to)
+ config.global_set_option('default-dest-addrs', send_to)
+ config.global_set_option('sender-name', 'cubicweb-test')
+ config.global_set_option('sender-addr', 'cubicweb-test@logilab.fr')
+ # web resources
+ config.global_set_option('base-url', devtools.BASE_URL)
+ try:
+ config.global_set_option('embed-allowed', re.compile('.*'))
+ except: # not in server only configuration
+ pass
+
+ @classmethod
+ def _init_repo(cls):
+ """init the repository and connection to it.
+
+ Repository and connection are cached on the test class. Once
+ initialized, we simply reset connections and repository caches.
+ """
+ if not 'repo' in cls.__dict__:
+ cls._build_repo()
+ else:
+ cls.cnx.rollback()
+ cls._refresh_repo()
+
+ @classmethod
+ def _build_repo(cls):
+ cls.repo, cls.cnx = devtools.init_test_database(config=cls.config)
+ cls.init_config(cls.config)
+ cls.vreg = cls.repo.vreg
+ cls._orig_cnx = cls.cnx
+ cls.config.repository = lambda x=None: cls.repo
+ # necessary for authentication tests
+ cls.cnx.login = cls.admlogin
+ cls.cnx.password = cls.admpassword
+
+ @classmethod
+ def _refresh_repo(cls):
+ refresh_repo(cls.repo)
+
+ # global resources accessors ###############################################
+
+ @property
+ def schema(self):
+ """return the application schema"""
+ return self.vreg.schema
+
+ @property
+ def session(self):
+ """return current server side session (using default manager account)"""
+ return self.repo._sessions[self.cnx.sessionid]
+
+ @property
+ def adminsession(self):
+ """return current server side session (using default manager account)"""
+ return self.repo._sessions[self._orig_cnx.sessionid]
+
+ def set_option(self, optname, value):
+ self.config.global_set_option(optname, value)
+
+ def set_debug(self, debugmode):
+ server.set_debug(debugmode)
+
+ # default test setup and teardown #########################################
+
+ def setUp(self):
+ pause_tracing()
+ self._init_repo()
+ resume_tracing()
+ self.setup_database()
+ self.commit()
+ MAILBOX[:] = [] # reset mailbox
+
+ def setup_database(self):
+ """add your database setup code by overriding this method"""
+
+ # user / session management ###############################################
+
+ def user(self, req=None):
+ """return the application schema"""
+ if req is None:
+ req = self.request()
+ return self.cnx.user(req)
+ else:
+ return req.user
-class WebTest(EnvBasedTC):
- """base class for web tests"""
- __abstract__ = True
+ def create_user(self, login, groups=('users',), password=None, req=None,
+ commit=True):
+ """create and return a new user entity"""
+ if password is None:
+ password = login.encode('utf8')
+ cursor = self._orig_cnx.cursor(req or self.request())
+ rset = cursor.execute('INSERT CWUser X: X login %(login)s, X upassword %(passwd)s,'
+ 'X in_state S WHERE S name "activated"',
+ {'login': unicode(login), 'passwd': password})
+ user = rset.get_entity(0, 0)
+ cursor.execute('SET X in_group G WHERE X eid %%(x)s, G name IN(%s)'
+ % ','.join(repr(g) for g in groups),
+ {'x': user.eid}, 'x')
+ user.clear_related_cache('in_group', 'subject')
+ if commit:
+ self._orig_cnx.commit()
+ return user
+
+ def login(self, login, password=None):
+ """return a connection for the given login/password"""
+ if login == self.admlogin:
+ self.restore_connection()
+ else:
+ self.cnx = repo_connect(self.repo, unicode(login),
+ password or str(login),
+ ConnectionProperties('inmemory'))
+ if login == self.vreg.config.anonymous_user()[0]:
+ self.cnx.anonymous_connection = True
+ return self.cnx
+
+ def restore_connection(self):
+ if not self.cnx is self._orig_cnx:
+ try:
+ self.cnx.close()
+ except ProgrammingError:
+ pass # already closed
+ self.cnx = self._orig_cnx
+
+ # db api ##################################################################
+
+ @nocoverage
+ def cursor(self, req=None):
+ return self.cnx.cursor(req or self.request())
+
+ @nocoverage
+ def execute(self, rql, args=None, eidkey=None, req=None):
+ """executes <rql>, builds a resultset, and returns a couple (rset, req)
+ where req is a FakeRequest
+ """
+ req = req or self.request(rql=rql)
+ return self.cnx.cursor(req).execute(unicode(rql), args, eidkey)
+
+ @nocoverage
+ def commit(self):
+ self.cnx.commit()
+
+ @nocoverage
+ def rollback(self):
+ try:
+ self.cnx.rollback()
+ except ProgrammingError:
+ pass
+
+ # # server side db api #######################################################
+
+ def sexecute(self, rql, args=None, eid_key=None):
+ self.session.set_pool()
+ return self.session.execute(rql, args, eid_key)
+
+ # def scommit(self):
+ # self.repo.commit(self.cnxid)
+ # self.session.set_pool()
+
+ # def srollback(self):
+ # self.repo.rollback(self.cnxid)
+ # self.session.set_pool()
+
+ # def sclose(self):
+ # self.repo.close(self.cnxid)
+
+ # other utilities #########################################################
+
+ def entity(self, rql, args=None, eidkey=None, req=None):
+ return self.execute(rql, args, eidkey, req=req).get_entity(0, 0)
+
+ def add_entity(self, etype, req=None, **kwargs):
+ rql = ['INSERT %s X' % etype]
+ # dict for replacement in RQL Request
+ args = {}
+ if kwargs:
+ rql.append(':')
+ # dict to define new entities variables
+ entities = {}
+ # assignement part of the request
+ sub_rql = []
+ for key, value in kwargs.iteritems():
+ # entities
+ if hasattr(value, 'eid'):
+ new_value = "%s__" % key.upper()
+ entities[new_value] = value.eid
+ args[new_value] = value.eid
+
+ sub_rql.append("X %s %s" % (key, new_value))
+ # final attributes
+ else:
+ sub_rql.append('X %s %%(%s)s' % (key, key))
+ args[key] = value
+ rql.append(', '.join(sub_rql))
+ if entities:
+ rql.append('WHERE')
+ # WHERE part of the request (to link entity to they eid)
+ sub_rql = []
+ for key, value in entities.iteritems():
+ sub_rql.append("%s eid %%(%s)s" % (key, key))
+ rql.append(', '.join(sub_rql))
+ return self.execute(' '.join(rql), args, req=req).get_entity(0, 0)
+
+ # vregistry inspection utilities ###########################################
- pdbclass = CubicWebDebugger
- # this is a hook to be able to define a list of rql queries
- # that are application dependent and cannot be guessed automatically
- application_rql = []
+ def pviews(self, req, rset):
+ return sorted((a.id, a.__class__)
+ for a in self.vreg['views'].possible_views(req, rset=rset))
+
+ def pactions(self, req, rset,
+ skipcategories=('addrelated', 'siteactions', 'useractions')):
+ return [(a.id, a.__class__)
+ for a in self.vreg['actions'].possible_vobjects(req, rset=rset)
+ if a.category not in skipcategories]
+
+ def pactions_by_cats(self, req, rset, categories=('addrelated',)):
+ return [(a.id, a.__class__)
+ for a in self.vreg['actions'].possible_vobjects(req, rset=rset)
+ if a.category in categories]
+
+ def pactionsdict(self, req, rset,
+ skipcategories=('addrelated', 'siteactions', 'useractions')):
+ res = {}
+ for a in self.vreg['actions'].possible_vobjects(req, rset=rset):
+ if a.category not in skipcategories:
+ res.setdefault(a.category, []).append(a.__class__)
+ return res
+
+ def list_views_for(self, rset):
+ """returns the list of views that can be applied on `rset`"""
+ req = rset.req
+ only_once_vids = ('primary', 'secondary', 'text')
+ req.data['ex'] = ValueError("whatever")
+ viewsvreg = self.vreg['views']
+ for vid, views in viewsvreg.items():
+ if vid[0] == '_':
+ continue
+ if rset.rowcount > 1 and vid in only_once_vids:
+ continue
+ views = [view for view in views
+ if view.category != 'startupview'
+ and not issubclass(view, notification.NotificationView)]
+ if views:
+ try:
+ view = viewsvreg.select_best(views, req, rset=rset)
+ if view.linkable():
+ yield view
+ else:
+ not_selected(self.vreg, view)
+ # else the view is expected to be used as subview and should
+ # not be tested directly
+ except NoSelectableObject:
+ continue
+
+ def list_actions_for(self, rset):
+ """returns the list of actions that can be applied on `rset`"""
+ req = rset.req
+ for action in self.vreg['actions'].possible_objects(req, rset=rset):
+ yield action
+
+ def list_boxes_for(self, rset):
+ """returns the list of boxes that can be applied on `rset`"""
+ req = rset.req
+ for box in self.vreg['boxes'].possible_objects(req, rset=rset):
+ yield box
+
+ def list_startup_views(self):
+ """returns the list of startup views"""
+ req = self.request()
+ for view in self.vreg['views'].possible_views(req, None):
+ if view.category == 'startupview':
+ yield view.id
+ else:
+ not_selected(self.vreg, view)
+
+ # web ui testing utilities #################################################
+
+ @property
+ @cached
+ def app(self):
+ """return a cubicweb publisher"""
+ return application.CubicWebPublisher(self.config, vreg=self.vreg)
+
+ requestcls = fake.FakeRequest
+ def request(self, *args, **kwargs):
+ """return a web ui request"""
+ req = self.requestcls(self.vreg, form=kwargs)
+ req.set_connection(self.cnx)
+ return req
+
+ def remote_call(self, fname, *args):
+ """remote json call simulation"""
+ dump = simplejson.dumps
+ args = [dump(arg) for arg in args]
+ req = self.request(fname=fname, pageid='123', arg=args)
+ ctrl = self.vreg['controllers'].select('json', req)
+ return ctrl.publish(), req
+
+ def publish(self, req):
+ """call the publish method of the edit controller"""
+ ctrl = self.vreg['controllers'].select('edit', req)
+ try:
+ result = ctrl.publish()
+ req.cnx.commit()
+ except web.Redirect:
+ req.cnx.commit()
+ raise
+ return result
+
+ def expect_redirect_publish(self, req):
+ """call the publish method of the edit controller, expecting to get a
+ Redirect exception."""
+ try:
+ self.publish(req)
+ except web.Redirect, ex:
+ try:
+ path, params = ex.location.split('?', 1)
+ except:
+ path, params = ex.location, ""
+ req._url = path
+ cleanup = lambda p: (p[0], unquote(p[1]))
+ params = dict(cleanup(p.split('=', 1)) for p in params.split('&') if p)
+ return req.relative_path(False), params # path.rsplit('/', 1)[-1], params
+ else:
+ self.fail('expected a Redirect exception')
+
+ # content validation #######################################################
# validators are used to validate (XML, DTD, whatever) view's content
# validators availables are :
@@ -99,8 +531,8 @@
# snippets
#'text/html': DTDValidator,
#'application/xhtml+xml': DTDValidator,
- 'application/xml': SaxOnlyValidator,
- 'text/xml': SaxOnlyValidator,
+ 'application/xml': htmlparser.SaxOnlyValidator,
+ 'text/xml': htmlparser.SaxOnlyValidator,
'text/plain': None,
'text/comma-separated-values': None,
'text/x-vcard': None,
@@ -109,68 +541,9 @@
'image/png': None,
}
# maps vid : validator name (override content_type_validators)
- vid_validators = dict((vid, VALMAP[valkey])
+ vid_validators = dict((vid, htmlparser.VALMAP[valkey])
for vid, valkey in VIEW_VALIDATORS.iteritems())
- no_auto_populate = ()
- ignored_relations = ()
-
- def custom_populate(self, how_many, cursor):
- pass
-
- def post_populate(self, cursor):
- pass
-
- @nocoverage
- def auto_populate(self, how_many):
- """this method populates the database with `how_many` entities
- of each possible type. It also inserts random relations between them
- """
- cu = self.cursor()
- self.custom_populate(how_many, cu)
- vreg = self.vreg
- howmanydict = how_many_dict(self.schema, cu, how_many, self.no_auto_populate)
- for etype in unprotected_entities(self.schema):
- if etype in self.no_auto_populate:
- continue
- nb = howmanydict.get(etype, how_many)
- for rql, args in insert_entity_queries(etype, self.schema, vreg, nb):
- cu.execute(rql, args)
- edict = {}
- for etype in unprotected_entities(self.schema, strict=True):
- rset = cu.execute('%s X' % etype)
- edict[str(etype)] = set(row[0] for row in rset.rows)
- existingrels = {}
- ignored_relations = SYSTEM_RELATIONS + self.ignored_relations
- for rschema in self.schema.relations():
- if rschema.is_final() or rschema in ignored_relations:
- continue
- rset = cu.execute('DISTINCT Any X,Y WHERE X %s Y' % rschema)
- existingrels.setdefault(rschema.type, set()).update((x, y) for x, y in rset)
- q = make_relations_queries(self.schema, edict, cu, ignored_relations,
- existingrels=existingrels)
- for rql, args in q:
- cu.execute(rql, args)
- self.post_populate(cu)
- self.commit()
-
- @nocoverage
- def _check_html(self, output, view, template='main-template'):
- """raises an exception if the HTML is invalid"""
- try:
- validatorclass = self.vid_validators[view.id]
- except KeyError:
- if template is None:
- default_validator = HTMLValidator
- else:
- default_validator = DTDValidator
- validatorclass = self.content_type_validators.get(view.content_type,
- default_validator)
- if validatorclass is None:
- return None
- validator = validatorclass()
- return validator.parse_string(output.strip())
-
def view(self, vid, rset=None, req=None, template='main-template',
**kwargs):
@@ -244,9 +617,145 @@
raise AssertionError, msg, tcbk
+ @nocoverage
+ def _check_html(self, output, view, template='main-template'):
+ """raises an exception if the HTML is invalid"""
+ try:
+ validatorclass = self.vid_validators[view.id]
+ except KeyError:
+ if template is None:
+ default_validator = htmlparser.HTMLValidator
+ else:
+ default_validator = htmlparser.DTDValidator
+ validatorclass = self.content_type_validators.get(view.content_type,
+ default_validator)
+ if validatorclass is None:
+ return None
+ validator = validatorclass()
+ return validator.parse_string(output.strip())
+
+ # deprecated ###############################################################
+
+ @deprecated('use self.vreg["etypes"].etype_class(etype)(self.request())')
+ def etype_instance(self, etype, req=None):
+ req = req or self.request()
+ e = self.vreg['etypes'].etype_class(etype)(req)
+ e.eid = None
+ return e
+
+ @nocoverage
+ @deprecated('use req = self.request(); rset = req.execute()')
+ def rset_and_req(self, rql, optional_args=None, args=None, eidkey=None):
+ """executes <rql>, builds a resultset, and returns a
+ couple (rset, req) where req is a FakeRequest
+ """
+ return (self.execute(rql, args, eidkey),
+ self.request(rql=rql, **optional_args or {}))
+
+
+# auto-populating test classes and utilities ###################################
+
+from cubicweb.devtools.fill import insert_entity_queries, make_relations_queries
+
+def how_many_dict(schema, cursor, how_many, skip):
+ """compute how many entities by type we need to be able to satisfy relations
+ cardinality
+ """
+ # compute how many entities by type we need to be able to satisfy relation constraint
+ relmap = {}
+ for rschema in schema.relations():
+ if rschema.is_final():
+ continue
+ for subj, obj in rschema.iter_rdefs():
+ card = rschema.rproperty(subj, obj, 'cardinality')
+ if card[0] in '1?' and len(rschema.subjects(obj)) == 1:
+ relmap.setdefault((rschema, subj), []).append(str(obj))
+ if card[1] in '1?' and len(rschema.objects(subj)) == 1:
+ relmap.setdefault((rschema, obj), []).append(str(subj))
+ unprotected = unprotected_entities(schema)
+ for etype in skip:
+ unprotected.add(etype)
+ howmanydict = {}
+ for etype in unprotected_entities(schema, strict=True):
+ howmanydict[str(etype)] = cursor.execute('Any COUNT(X) WHERE X is %s' % etype)[0][0]
+ if etype in unprotected:
+ howmanydict[str(etype)] += how_many
+ for (rschema, etype), targets in relmap.iteritems():
+ # XXX should 1. check no cycle 2. propagate changes
+ relfactor = sum(howmanydict[e] for e in targets)
+ howmanydict[str(etype)] = max(relfactor, howmanydict[etype])
+ return howmanydict
+
+
+class AutoPopulateTest(CubicWebTC):
+ """base class for test with auto-populating of the database"""
+ __abstract__ = True
+
+ pdbclass = CubicWebDebugger
+ # this is a hook to be able to define a list of rql queries
+ # that are application dependent and cannot be guessed automatically
+ application_rql = []
+
+ no_auto_populate = ()
+ ignored_relations = ()
+
def to_test_etypes(self):
return unprotected_entities(self.schema, strict=True)
+ def custom_populate(self, how_many, cursor):
+ pass
+
+ def post_populate(self, cursor):
+ pass
+
+ @nocoverage
+ def auto_populate(self, how_many):
+ """this method populates the database with `how_many` entities
+ of each possible type. It also inserts random relations between them
+ """
+ cu = self.cursor()
+ self.custom_populate(how_many, cu)
+ vreg = self.vreg
+ howmanydict = how_many_dict(self.schema, cu, how_many, self.no_auto_populate)
+ for etype in unprotected_entities(self.schema):
+ if etype in self.no_auto_populate:
+ continue
+ nb = howmanydict.get(etype, how_many)
+ for rql, args in insert_entity_queries(etype, self.schema, vreg, nb):
+ cu.execute(rql, args)
+ edict = {}
+ for etype in unprotected_entities(self.schema, strict=True):
+ rset = cu.execute('%s X' % etype)
+ edict[str(etype)] = set(row[0] for row in rset.rows)
+ existingrels = {}
+ ignored_relations = SYSTEM_RELATIONS | set(self.ignored_relations)
+ for rschema in self.schema.relations():
+ if rschema.is_final() or rschema in ignored_relations:
+ continue
+ rset = cu.execute('DISTINCT Any X,Y WHERE X %s Y' % rschema)
+ existingrels.setdefault(rschema.type, set()).update((x, y) for x, y in rset)
+ q = make_relations_queries(self.schema, edict, cu, ignored_relations,
+ existingrels=existingrels)
+ for rql, args in q:
+ cu.execute(rql, args)
+ self.post_populate(cu)
+ self.commit()
+
+ def iter_individual_rsets(self, etypes=None, limit=None):
+ etypes = etypes or self.to_test_etypes()
+ for etype in etypes:
+ if limit:
+ rql = 'Any X LIMIT %s WHERE X is %s' % (limit, etype)
+ else:
+ rql = 'Any X WHERE X is %s' % etype
+ rset = self.execute(rql)
+ for row in xrange(len(rset)):
+ if limit and row > limit:
+ break
+ # XXX iirk
+ rset2 = rset.limit(limit=1, offset=row)
+ yield rset2
+
def iter_automatic_rsets(self, limit=10):
"""generates basic resultsets for each entity type"""
etypes = self.to_test_etypes()
@@ -267,54 +776,6 @@
for rql in self.application_rql:
yield self.execute(rql)
-
- def list_views_for(self, rset):
- """returns the list of views that can be applied on `rset`"""
- req = rset.req
- only_once_vids = ('primary', 'secondary', 'text')
- req.data['ex'] = ValueError("whatever")
- viewsvreg = self.vreg['views']
- for vid, views in viewsvreg.items():
- if vid[0] == '_':
- continue
- if rset.rowcount > 1 and vid in only_once_vids:
- continue
- views = [view for view in views
- if view.category != 'startupview'
- and not issubclass(view, NotificationView)]
- if views:
- try:
- view = viewsvreg.select_best(views, req, rset=rset)
- if view.linkable():
- yield view
- else:
- not_selected(self.vreg, view)
- # else the view is expected to be used as subview and should
- # not be tested directly
- except NoSelectableObject:
- continue
-
- def list_actions_for(self, rset):
- """returns the list of actions that can be applied on `rset`"""
- req = rset.req
- for action in self.vreg['actions'].possible_objects(req, rset=rset):
- yield action
-
- def list_boxes_for(self, rset):
- """returns the list of boxes that can be applied on `rset`"""
- req = rset.req
- for box in self.vreg['boxes'].possible_objects(req, rset=rset):
- yield box
-
- def list_startup_views(self):
- """returns the list of startup views"""
- req = self.request()
- for view in self.vreg['views'].possible_views(req, None):
- if view.category == 'startupview':
- yield view.id
- else:
- not_selected(self.vreg, view)
-
def _test_everything_for(self, rset):
"""this method tries to find everything that can be tested
for `rset` and yields a callable test (as needed in generative tests)
@@ -342,8 +803,16 @@
return '%s_%s_%s' % ('_'.join(rset.column_types(0)), objid, objtype)
-class AutomaticWebTest(WebTest):
+# concrete class for automated application testing ############################
+
+class AutomaticWebTest(AutoPopulateTest):
"""import this if you wan automatic tests to be ran"""
+ def setUp(self):
+ AutoPopulateTest.setUp(self)
+ # access to self.app for proper initialization of the authentication
+ # machinery (else some views may fail)
+ self.app
+
## one each
def test_one_each_config(self):
self.auto_populate(1)
@@ -365,17 +834,7 @@
yield self.view, vid, None, req
-class RealDBTest(WebTest):
-
- def iter_individual_rsets(self, etypes=None, limit=None):
- etypes = etypes or unprotected_entities(self.schema, strict=True)
- for etype in etypes:
- rset = self.execute('Any X WHERE X is %s' % etype)
- for row in xrange(len(rset)):
- if limit and row > limit:
- break
- rset2 = rset.limit(limit=1, offset=row)
- yield rset2
+# registry instrumentization ###################################################
def not_selected(vreg, appobject):
try:
@@ -383,10 +842,11 @@
except (KeyError, AttributeError):
pass
+
def vreg_instrumentize(testclass):
+ # XXX broken
from cubicweb.devtools.apptest import TestEnvironment
- env = testclass._env = TestEnvironment('data', configcls=testclass.configcls,
- requestcls=testclass.requestcls)
+ env = testclass._env = TestEnvironment('data', configcls=testclass.configcls)
for reg in env.vreg.values():
reg._selected = {}
try:
@@ -405,6 +865,7 @@
reg.__class__.select_best = instr_select_best
reg.__class__.__orig_select_best = orig_select_best
+
def print_untested_objects(testclass, skipregs=('hooks', 'etypes')):
for regname, reg in testclass._env.vreg.iteritems():
if regname in skipregs: