devtools/apptest.py
changeset 2774 a9a2dca5db20
parent 2771 8074dd88e21b
parent 2773 b2530e3e0afb
child 2777 8f7fcbe11879
equal deleted inserted replaced
2771:8074dd88e21b 2774:a9a2dca5db20
     1 """This module provides misc utilities to test instances
       
     2 
       
     3 :organization: Logilab
       
     4 :copyright: 2001-2009 LOGILAB S.A. (Paris, FRANCE), license is LGPL v2.
       
     5 :contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
       
     6 :license: GNU Lesser General Public License, v2.1 - http://www.gnu.org/licenses
       
     7 """
       
     8 __docformat__ = "restructuredtext en"
       
     9 
       
    10 from copy import deepcopy
       
    11 
       
    12 import simplejson
       
    13 
       
    14 from logilab.common.testlib import TestCase
       
    15 from logilab.common.pytest import nocoverage
       
    16 from logilab.common.umessage import message_from_string
       
    17 
       
    18 from logilab.common.deprecation import deprecated
       
    19 
       
    20 from cubicweb.devtools import init_test_database, TestServerConfiguration, ApptestConfiguration
       
    21 from cubicweb.devtools._apptest import TestEnvironment
       
    22 from cubicweb.devtools.fake import FakeRequest
       
    23 
       
    24 from cubicweb.dbapi import repo_connect, ConnectionProperties, ProgrammingError
       
    25 
       
    26 
       
    27 MAILBOX = []
       
    28 class Email:
       
    29     def __init__(self, recipients, msg):
       
    30         self.recipients = recipients
       
    31         self.msg = msg
       
    32 
       
    33     @property
       
    34     def message(self):
       
    35         return message_from_string(self.msg)
       
    36 
       
    37     @property
       
    38     def subject(self):
       
    39         return self.message.get('Subject')
       
    40 
       
    41     @property
       
    42     def content(self):
       
    43         return self.message.get_payload(decode=True)
       
    44 
       
    45     def __repr__(self):
       
    46         return '<Email to %s with subject %s>' % (','.join(self.recipients),
       
    47                                                   self.message.get('Subject'))
       
    48 
       
    49 class MockSMTP:
       
    50     def __init__(self, server, port):
       
    51         pass
       
    52     def close(self):
       
    53         pass
       
    54     def sendmail(self, helo_addr, recipients, msg):
       
    55         MAILBOX.append(Email(recipients, msg))
       
    56 
       
# rebind cwconfig.SMTP to the mock at import time, so that code instantiating
# cwconfig.SMTP gets a MockSMTP whose sendmail() only fills MAILBOX
from cubicweb import cwconfig
cwconfig.SMTP = MockSMTP
       
    59 
       
    60 
       
    61 def get_versions(self, checkversions=False):
       
    62     """return the a dictionary containing cubes used by this instance
       
    63     as key with their version as value, including cubicweb version. This is a
       
    64     public method, not requiring a session id.
       
    65 
       
    66     replace Repository.get_versions by this method if you don't want versions
       
    67     checking
       
    68     """
       
    69     vcconf = {'cubicweb': self.config.cubicweb_version()}
       
    70     self.config.bootstrap_cubes()
       
    71     for pk in self.config.cubes():
       
    72         version = self.config.cube_version(pk)
       
    73         vcconf[pk] = version
       
    74     self.config._cubes = None
       
    75     return vcconf
       
    76 
       
    77 
       
    78 @property
       
    79 def late_binding_env(self):
       
    80     """builds TestEnvironment as late as possible"""
       
    81     if not hasattr(self, '_env'):
       
    82         self.__class__._env = TestEnvironment('data', configcls=self.configcls,
       
    83                                               requestcls=self.requestcls)
       
    84     return self._env
       
    85 
       
    86 
       
    87 class autoenv(type):
       
    88     """automatically set environment on EnvBasedTC subclasses if necessary
       
    89     """
       
    90     def __new__(mcs, name, bases, classdict):
       
    91         env = classdict.get('env')
       
    92         # try to find env in one of the base classes
       
    93         if env is None:
       
    94             for base in bases:
       
    95                 env = getattr(base, 'env', None)
       
    96                 if env is not None:
       
    97                     classdict['env'] = env
       
    98                     break
       
    99         if not classdict.get('__abstract__')  and not classdict.get('env'):
       
   100             classdict['env'] = late_binding_env
       
   101         return super(autoenv, mcs).__new__(mcs, name, bases, classdict)
       
   102 
       
   103 
       
   104 class EnvBasedTC(TestCase):
       
   105     """abstract class for test using an apptest environment
       
   106     """
       
   107     __metaclass__ = autoenv
       
   108     __abstract__ = True
       
   109     env = None
       
   110     configcls = ApptestConfiguration
       
   111     requestcls = FakeRequest
       
   112 
       
   113     # user / session management ###############################################
       
   114 
       
   115     def user(self, req=None):
       
   116         if req is None:
       
   117             req = self.env.create_request()
       
   118             return self.env.cnx.user(req)
       
   119         else:
       
   120             return req.user
       
   121 
       
   122     def create_user(self, *args, **kwargs):
       
   123         return self.env.create_user(*args, **kwargs)
       
   124 
       
   125     def login(self, login, password=None):
       
   126         return self.env.login(login, password)
       
   127 
       
   128     def restore_connection(self):
       
   129         self.env.restore_connection()
       
   130 
       
   131     # db api ##################################################################
       
   132 
       
   133     @nocoverage
       
   134     def cursor(self, req=None):
       
   135         return self.env.cnx.cursor(req or self.request())
       
   136 
       
   137     @nocoverage
       
   138     def execute(self, *args, **kwargs):
       
   139         return self.env.execute(*args, **kwargs)
       
   140 
       
   141     @nocoverage
       
   142     def commit(self):
       
   143         self.env.cnx.commit()
       
   144 
       
   145     @nocoverage
       
   146     def rollback(self):
       
   147         try:
       
   148             self.env.cnx.rollback()
       
   149         except ProgrammingError:
       
   150             pass
       
   151 
       
   152     # other utilities #########################################################
       
   153     def set_debug(self, debugmode):
       
   154         from cubicweb.server import set_debug
       
   155         set_debug(debugmode)
       
   156 
       
   157     @property
       
   158     def config(self):
       
   159         return self.vreg.config
       
   160 
       
   161     def session(self):
       
   162         """return current server side session (using default manager account)"""
       
   163         return self.env.repo._sessions[self.env.cnx.sessionid]
       
   164 
       
   165     def request(self, *args, **kwargs):
       
   166         """return a web interface request"""
       
   167         return self.env.create_request(*args, **kwargs)
       
   168 
       
   169     @nocoverage
       
   170     def rset_and_req(self, *args, **kwargs):
       
   171         return self.env.get_rset_and_req(*args, **kwargs)
       
   172 
       
   173     def entity(self, rql, args=None, eidkey=None, req=None):
       
   174         return self.execute(rql, args, eidkey, req=req).get_entity(0, 0)
       
   175 
       
   176     def etype_instance(self, etype, req=None):
       
   177         req = req or self.request()
       
   178         e = self.env.vreg['etypes'].etype_class(etype)(req)
       
   179         e.eid = None
       
   180         return e
       
   181 
       
   182     def add_entity(self, etype, **kwargs):
       
   183         rql = ['INSERT %s X' % etype]
       
   184 
       
   185         # dict for replacement in RQL Request
       
   186         rql_args = {}
       
   187 
       
   188         if kwargs: #
       
   189             rql.append(':')
       
   190             # dict to define new entities variables
       
   191             entities = {}
       
   192 
       
   193             # assignement part of the request
       
   194             sub_rql = []
       
   195             for key, value in kwargs.iteritems():
       
   196                 # entities
       
   197                 if hasattr(value, 'eid'):
       
   198                     new_value = "%s__" % key.upper()
       
   199 
       
   200                     entities[new_value] = value.eid
       
   201                     rql_args[new_value] = value.eid
       
   202 
       
   203                     sub_rql.append("X %s %s" % (key, new_value))
       
   204                 # final attributes
       
   205                 else:
       
   206                     sub_rql.append('X %s %%(%s)s' % (key, key))
       
   207                     rql_args[key] = value
       
   208             rql.append(', '.join(sub_rql))
       
   209 
       
   210 
       
   211             if entities:
       
   212                 rql.append('WHERE')
       
   213                 # WHERE part of the request (to link entity to they eid)
       
   214                 sub_rql = []
       
   215                 for key, value in entities.iteritems():
       
   216                     sub_rql.append("%s eid %%(%s)s" % (key, key))
       
   217                 rql.append(', '.join(sub_rql))
       
   218 
       
   219         rql = ' '.join(rql)
       
   220         rset = self.execute(rql, rql_args)
       
   221         return rset.get_entity(0, 0)
       
   222 
       
   223     def set_option(self, optname, value):
       
   224         self.vreg.config.global_set_option(optname, value)
       
   225 
       
   226     def pviews(self, req, rset):
       
   227         return sorted((a.id, a.__class__) for a in self.vreg['views'].possible_views(req, rset=rset))
       
   228 
       
   229     def pactions(self, req, rset, skipcategories=('addrelated', 'siteactions', 'useractions')):
       
   230         return [(a.id, a.__class__) for a in self.vreg['actions'].possible_vobjects(req, rset=rset)
       
   231                 if a.category not in skipcategories]
       
   232 
       
   233     def pactions_by_cats(self, req, rset, categories=('addrelated',)):
       
   234         return [(a.id, a.__class__) for a in self.vreg['actions'].possible_vobjects(req, rset=rset)
       
   235                 if a.category in categories]
       
   236 
       
   237     paddrelactions = deprecated()(pactions_by_cats)
       
   238 
       
   239     def pactionsdict(self, req, rset, skipcategories=('addrelated', 'siteactions', 'useractions')):
       
   240         res = {}
       
   241         for a in self.vreg['actions'].possible_vobjects(req, rset=rset):
       
   242             if a.category not in skipcategories:
       
   243                 res.setdefault(a.category, []).append(a.__class__)
       
   244         return res
       
   245 
       
   246 
       
   247     def remote_call(self, fname, *args):
       
   248         """remote call simulation"""
       
   249         dump = simplejson.dumps
       
   250         args = [dump(arg) for arg in args]
       
   251         req = self.request(fname=fname, pageid='123', arg=args)
       
   252         ctrl = self.vreg['controllers'].select('json', req)
       
   253         return ctrl.publish(), req
       
   254 
       
   255     # default test setup and teardown #########################################
       
   256 
       
   257     def setup_database(self):
       
   258         pass
       
   259 
       
   260     def setUp(self):
       
   261         self.restore_connection()
       
   262         session = self.session()
       
   263         #self.maxeid = self.execute('Any MAX(X)')
       
   264         session.set_pool()
       
   265         self.maxeid = session.system_sql('SELECT MAX(eid) FROM entities').fetchone()[0]
       
   266         self.app = self.env.app
       
   267         self.vreg = self.env.app.vreg
       
   268         self.schema = self.vreg.schema
       
   269         self.vreg.config.mode = 'test'
       
   270         # set default-dest-addrs to a dumb email address to avoid mailbox or
       
   271         # mail queue pollution
       
   272         self.set_option('default-dest-addrs', ['whatever'])
       
   273         self.setup_database()
       
   274         self.commit()
       
   275         MAILBOX[:] = [] # reset mailbox
       
   276 
       
   277     @nocoverage
       
   278     def tearDown(self):
       
   279         self.rollback()
       
   280         # self.env.restore_database()
       
   281         self.env.restore_connection()
       
   282         self.session().unsafe_execute('DELETE Any X WHERE X eid > %s' % self.maxeid)
       
   283         self.commit()
       
   284 
       
   285     # global resources accessors ###############################################
       
   286 
       
   287 # XXX
       
   288 try:
       
   289     from cubicweb.web import Redirect
       
   290     from urllib import unquote
       
   291 except ImportError:
       
   292     pass # cubicweb-web not installed
       
   293 else:
       
   294     class ControllerTC(EnvBasedTC):
       
   295         def setUp(self):
       
   296             super(ControllerTC, self).setUp()
       
   297             self.req = self.request()
       
   298             self.ctrl = self.vreg['controllers'].select('edit', self.req)
       
   299 
       
   300         def publish(self, req):
       
   301             assert req is self.ctrl.req
       
   302             try:
       
   303                 result = self.ctrl.publish()
       
   304                 req.cnx.commit()
       
   305             except Redirect:
       
   306                 req.cnx.commit()
       
   307                 raise
       
   308             return result
       
   309 
       
   310         def expect_redirect_publish(self, req=None):
       
   311             if req is not None:
       
   312                 self.ctrl = self.vreg['controllers'].select('edit', req)
       
   313             else:
       
   314                 req = self.req
       
   315             try:
       
   316                 res = self.publish(req)
       
   317             except Redirect, ex:
       
   318                 try:
       
   319                     path, params = ex.location.split('?', 1)
       
   320                 except:
       
   321                     path, params = ex.location, ""
       
   322                 req._url = path
       
   323                 cleanup = lambda p: (p[0], unquote(p[1]))
       
   324                 params = dict(cleanup(p.split('=', 1)) for p in params.split('&') if p)
       
   325                 return req.relative_path(False), params # path.rsplit('/', 1)[-1], params
       
   326             else:
       
   327                 self.fail('expected a Redirect exception')
       
   328 
       
   329 
       
   330 def make_late_binding_repo_property(attrname):
       
   331     @property
       
   332     def late_binding(self):
       
   333         """builds cnx as late as possible"""
       
   334         if not hasattr(self, attrname):
       
   335             # sets explicit test mode here to avoid autoreload
       
   336             from cubicweb.cwconfig import CubicWebConfiguration
       
   337             CubicWebConfiguration.mode = 'test'
       
   338             cls = self.__class__
       
   339             config = self.repo_config or TestServerConfiguration('data')
       
   340             cls._repo, cls._cnx = init_test_database('sqlite',  config=config)
       
   341         return getattr(self, attrname)
       
   342     return late_binding
       
   343 
       
   344 
       
   345 class autorepo(type):
       
   346     """automatically set repository on RepositoryBasedTC subclasses if necessary
       
   347     """
       
   348     def __new__(mcs, name, bases, classdict):
       
   349         repo = classdict.get('repo')
       
   350         # try to find repo in one of the base classes
       
   351         if repo is None:
       
   352             for base in bases:
       
   353                 repo = getattr(base, 'repo', None)
       
   354                 if repo is not None:
       
   355                     classdict['repo'] = repo
       
   356                     break
       
   357         if name != 'RepositoryBasedTC' and not classdict.get('repo'):
       
   358             classdict['repo'] = make_late_binding_repo_property('_repo')
       
   359             classdict['cnx'] = make_late_binding_repo_property('_cnx')
       
   360         return super(autorepo, mcs).__new__(mcs, name, bases, classdict)
       
   361 
       
   362 
       
   363 class RepositoryBasedTC(TestCase):
       
   364     """abstract class for test using direct repository connections
       
   365     """
       
   366     __metaclass__ = autorepo
       
   367     repo_config = None # set a particular config instance if necessary
       
   368 
       
   369     # user / session management ###############################################
       
   370 
       
   371     def create_user(self, user, groups=('users',), password=None, commit=True):
       
   372         if password is None:
       
   373             password = user
       
   374         eid = self.execute('INSERT CWUser X: X login %(x)s, X upassword %(p)s,'
       
   375                             'X in_state S WHERE S name "activated"',
       
   376                             {'x': unicode(user), 'p': password})[0][0]
       
   377         groups = ','.join(repr(group) for group in groups)
       
   378         self.execute('SET X in_group Y WHERE X eid %%(x)s, Y name IN (%s)' % groups,
       
   379                       {'x': eid})
       
   380         if commit:
       
   381             self.commit()
       
   382         self.session.reset_pool()
       
   383         return eid
       
   384 
       
   385     def login(self, login, password=None):
       
   386         cnx = repo_connect(self.repo, unicode(login), password or login,
       
   387                            ConnectionProperties('inmemory'))
       
   388         self.cnxs.append(cnx)
       
   389         return cnx
       
   390 
       
   391     def current_session(self):
       
   392         return self.repo._sessions[self.cnxs[-1].sessionid]
       
   393 
       
   394     def restore_connection(self):
       
   395         assert len(self.cnxs) == 1, self.cnxs
       
   396         cnx = self.cnxs.pop()
       
   397         try:
       
   398             cnx.close()
       
   399         except Exception, ex:
       
   400             print "exception occured while closing connection", ex
       
   401 
       
   402     # db api ##################################################################
       
   403 
       
   404     def execute(self, rql, args=None, eid_key=None):
       
   405         assert self.session.id == self.cnxid
       
   406         rset = self.__execute(self.cnxid, rql, args, eid_key)
       
   407         rset.vreg = self.vreg
       
   408         rset.req = self.session
       
   409         # call to set_pool is necessary to avoid pb when using
       
   410         # instance entities for convenience
       
   411         self.session.set_pool()
       
   412         return rset
       
   413 
       
   414     def commit(self):
       
   415         self.__commit(self.cnxid)
       
   416         self.session.set_pool()
       
   417 
       
   418     def rollback(self):
       
   419         self.__rollback(self.cnxid)
       
   420         self.session.set_pool()
       
   421 
       
   422     def close(self):
       
   423         self.__close(self.cnxid)
       
   424 
       
   425     # other utilities #########################################################
       
   426 
       
   427     def set_debug(self, debugmode):
       
   428         from cubicweb.server import set_debug
       
   429         set_debug(debugmode)
       
   430 
       
   431     def set_option(self, optname, value):
       
   432         self.vreg.config.global_set_option(optname, value)
       
   433 
       
   434     def add_entity(self, etype, **kwargs):
       
   435         restrictions = ', '.join('X %s %%(%s)s' % (key, key) for key in kwargs)
       
   436         rql = 'INSERT %s X' % etype
       
   437         if kwargs:
       
   438             rql += ': %s' % ', '.join('X %s %%(%s)s' % (key, key) for key in kwargs)
       
   439         rset = self.execute(rql, kwargs)
       
   440         return rset.get_entity(0, 0)
       
   441 
       
   442     def default_user_password(self):
       
   443         config = self.repo.config #TestConfiguration('data')
       
   444         user = unicode(config.sources()['system']['db-user'])
       
   445         passwd = config.sources()['system']['db-password']
       
   446         return user, passwd
       
   447 
       
   448     def close_connections(self):
       
   449         for cnx in self.cnxs:
       
   450             try:
       
   451                 cnx.rollback()
       
   452                 cnx.close()
       
   453             except:
       
   454                 continue
       
   455         self.cnxs = []
       
   456 
       
   457     pactions = EnvBasedTC.pactions.im_func
       
   458     pactionsdict = EnvBasedTC.pactionsdict.im_func
       
   459 
       
   460     # default test setup and teardown #########################################
       
   461 
       
   462     def _prepare(self):
       
   463         MAILBOX[:] = [] # reset mailbox
       
   464         if hasattr(self, 'cnxid'):
       
   465             return
       
   466         repo = self.repo
       
   467         self.__execute = repo.execute
       
   468         self.__commit = repo.commit
       
   469         self.__rollback = repo.rollback
       
   470         self.__close = repo.close
       
   471         self.cnxid = self.cnx.sessionid
       
   472         self.session = repo._sessions[self.cnxid]
       
   473         self.cnxs = []
       
   474         # reset caches, they may introduce bugs among tests
       
   475         repo._type_source_cache = {}
       
   476         repo._extid_cache = {}
       
   477         repo.querier._rql_cache = {}
       
   478         for source in repo.sources:
       
   479             source.reset_caches()
       
   480         for s in repo.sources:
       
   481             if hasattr(s, '_cache'):
       
   482                 s._cache = {}
       
   483 
       
   484     @property
       
   485     def config(self):
       
   486         return self.repo.config
       
   487 
       
   488     @property
       
   489     def vreg(self):
       
   490         return self.repo.vreg
       
   491 
       
   492     @property
       
   493     def schema(self):
       
   494         return self.repo.schema
       
   495 
       
   496     def setUp(self):
       
   497         self._prepare()
       
   498         self.session.set_pool()
       
   499         self.maxeid = self.session.system_sql('SELECT MAX(eid) FROM entities').fetchone()[0]
       
   500 
       
   501     def tearDown(self):
       
   502         self.close_connections()
       
   503         self.rollback()
       
   504         self.session.unsafe_execute('DELETE Any X WHERE X eid > %(x)s', {'x': self.maxeid})
       
   505         self.commit()
       
   506