devtools/apptest.py
changeset 0 b97547f5f1fa
child 17 62ce3e6126e0
equal deleted inserted replaced
-1:000000000000 0:b97547f5f1fa
       
     1 """This module provides misc utilities to test applications
       
     2 
       
     3 :organization: Logilab
       
     4 :copyright: 2001-2008 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
       
     5 :contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
       
     6 """
       
     7 __docformat__ = "restructuredtext en"
       
     8 
       
     9 from copy import deepcopy
       
    10 
       
    11 import simplejson
       
    12 
       
    13 from logilab.common.testlib import TestCase
       
    14 from logilab.common.pytest import nocoverage
       
    15 from logilab.common.umessage import message_from_string
       
    16 
       
    17 from cubicweb.devtools import init_test_database, TestServerConfiguration, ApptestConfiguration
       
    18 from cubicweb.devtools._apptest import TestEnvironment
       
    19 from cubicweb.devtools.fake import FakeRequest
       
    20 
       
    21 from cubicweb.dbapi import repo_connect, ConnectionProperties, ProgrammingError
       
    22 
       
    23 
       
    24 MAILBOX = []
       
    25 class Email:
       
    26     def __init__(self, recipients, msg):
       
    27         self.recipients = recipients
       
    28         self.msg = msg
       
    29 
       
    30     @property
       
    31     def message(self):
       
    32         return message_from_string(self.msg)
       
    33     
       
    34     def __repr__(self):
       
    35         return '<Email to %s with subject %s>' % (','.join(self.recipients),
       
    36                                                   self.message.get('Subject'))
       
    37     
       
    38 class MockSMTP:
       
    39     def __init__(self, server, port):
       
    40         pass
       
    41     def close(self):
       
    42         pass
       
    43     def sendmail(self, helo_addr, recipients, msg):
       
    44         MAILBOX.append(Email(recipients, msg))
       
    45 
       
    46 from cubicweb.server import hookhelper
       
    47 hookhelper.SMTP = MockSMTP
       
    48 
       
    49 
       
    50 def get_versions(self, checkversions=False):
       
    51     """return the a dictionary containing cubes used by this application
       
    52     as key with their version as value, including cubicweb version. This is a
       
    53     public method, not requiring a session id.
       
    54 
       
    55     replace Repository.get_versions by this method if you don't want versions
       
    56     checking
       
    57     """
       
    58     vcconf = {'cubicweb': self.config.cubicweb_version()}
       
    59     self.config.bootstrap_cubes()
       
    60     for pk in self.config.cubes():
       
    61         version = self.config.template_version(pk)
       
    62         vcconf[pk] = version
       
    63     self.config._cubes = None
       
    64     return vcconf
       
    65 
       
    66 
       
    67 @property
       
    68 def late_binding_env(self):
       
    69     """builds TestEnvironment as late as possible"""
       
    70     if not hasattr(self, '_env'):
       
    71         self.__class__._env = TestEnvironment('data', configcls=self.configcls,
       
    72                                               requestcls=self.requestcls)
       
    73     return self._env
       
    74 
       
    75 
       
    76 class autoenv(type):
       
    77     """automatically set environment on EnvBasedTC subclasses if necessary
       
    78     """
       
    79     def __new__(mcs, name, bases, classdict):
       
    80         env = classdict.get('env')
       
    81         # try to find env in one of the base classes
       
    82         if env is None:
       
    83             for base in bases:
       
    84                 env = getattr(base, 'env', None)
       
    85                 if env is not None:
       
    86                     classdict['env'] = env
       
    87                     break
       
    88         if not classdict.get('__abstract__')  and not classdict.get('env'):
       
    89             classdict['env'] = late_binding_env
       
    90         return super(autoenv, mcs).__new__(mcs, name, bases, classdict)
       
    91 
       
    92 
       
    93 class EnvBasedTC(TestCase):
       
    94     """abstract class for test using an apptest environment
       
    95     """
       
    96     __metaclass__ = autoenv
       
    97     __abstract__ = True
       
    98     env = None
       
    99     configcls = ApptestConfiguration
       
   100     requestcls = FakeRequest
       
   101     
       
   102     # user / session management ###############################################
       
   103 
       
   104     def user(self, req=None):
       
   105         if req is None:
       
   106             req = self.env.create_request()
       
   107             return self.env.cnx.user(req)
       
   108         else:
       
   109             return req.user
       
   110 
       
   111     def create_user(self, *args, **kwargs):
       
   112         return self.env.create_user(*args, **kwargs)
       
   113 
       
   114     def login(self, login):
       
   115         return self.env.login(login)
       
   116 
       
   117     def restore_connection(self):
       
   118         self.env.restore_connection()
       
   119         
       
   120     # db api ##################################################################
       
   121 
       
   122     @nocoverage
       
   123     def cursor(self, req=None):
       
   124         return self.env.cnx.cursor(req or self.request())
       
   125     
       
   126     @nocoverage
       
   127     def execute(self, *args, **kwargs):
       
   128         return self.env.execute(*args, **kwargs)
       
   129 
       
   130     @nocoverage
       
   131     def commit(self):
       
   132         self.env.cnx.commit()
       
   133     
       
   134     @nocoverage
       
   135     def rollback(self):
       
   136         try:
       
   137             self.env.cnx.rollback()
       
   138         except ProgrammingError:
       
   139             pass
       
   140         
       
   141     # other utilities #########################################################
       
   142     def set_debug(self, debugmode):
       
   143         from cubicweb.server import set_debug
       
   144         set_debug(debugmode)
       
   145     
       
   146     @property
       
   147     def config(self):
       
   148         return self.vreg.config
       
   149 
       
   150     def session(self):
       
   151         """return current server side session (using default manager account)"""
       
   152         return self.env.repo._sessions[self.env.cnx.sessionid]
       
   153     
       
   154     def request(self, *args, **kwargs):
       
   155         """return a web interface request"""
       
   156         return self.env.create_request(*args, **kwargs)
       
   157 
       
   158     @nocoverage
       
   159     def rset_and_req(self, *args, **kwargs):
       
   160         return self.env.get_rset_and_req(*args, **kwargs)
       
   161     
       
   162     def entity(self, rql, args=None, eidkey=None, req=None):
       
   163         return self.execute(rql, args, eidkey, req=req).get_entity(0, 0)
       
   164     
       
   165     def etype_instance(self, etype, req=None):
       
   166         req = req or self.request()
       
   167         e = self.env.vreg.etype_class(etype)(req, None, None)
       
   168         e.eid = None
       
   169         return e
       
   170     
       
   171     def add_entity(self, etype, **kwargs):
       
   172         rql = ['INSERT %s X' % etype]
       
   173 
       
   174         # dict for replacement in RQL Request
       
   175         rql_args = {}
       
   176 
       
   177         if kwargs: #
       
   178             rql.append(':')
       
   179             # dict to define new entities variables
       
   180             entities = {}
       
   181 
       
   182             # assignement part of the request
       
   183             sub_rql = []
       
   184             for key, value in kwargs.iteritems():
       
   185                 # entities
       
   186                 if hasattr(value, 'eid'): 
       
   187                     new_value = "%s__" % key.upper()
       
   188                     
       
   189                     entities[new_value] = value.eid
       
   190                     rql_args[new_value] = value.eid
       
   191                     
       
   192                     sub_rql.append("X %s %s" % (key, new_value))
       
   193                 # final attributes
       
   194                 else: 
       
   195                     sub_rql.append('X %s %%(%s)s' % (key, key))
       
   196                     rql_args[key] = value
       
   197             rql.append(', '.join(sub_rql))
       
   198 
       
   199 
       
   200             if entities:
       
   201                 rql.append('WHERE')
       
   202                 # WHERE part of the request (to link entity to they eid)
       
   203                 sub_rql = []
       
   204                 for key, value in entities.iteritems():
       
   205                     sub_rql.append("%s eid %%(%s)s" % (key, key))
       
   206                 rql.append(', '.join(sub_rql))
       
   207 
       
   208         rql = ' '.join(rql)
       
   209         rset = self.execute(rql, rql_args)
       
   210         return rset.get_entity(0, 0)
       
   211 
       
   212     def set_option(self, optname, value):
       
   213         self.vreg.config.global_set_option(optname, value)
       
   214 
       
   215     def pviews(self, req, rset):
       
   216         return sorted((a.id, a.__class__) for a in self.vreg.possible_views(req, rset)) 
       
   217         
       
   218     def pactions(self, req, rset, skipcategories=('addrelated', 'siteactions', 'useractions')):
       
   219         return [(a.id, a.__class__) for a in self.vreg.possible_vobjects('actions', req, rset)
       
   220                 if a.category not in skipcategories]
       
   221     def pactionsdict(self, req, rset, skipcategories=('addrelated', 'siteactions', 'useractions')):
       
   222         res = {}
       
   223         for a in self.vreg.possible_vobjects('actions', req, rset):
       
   224             if a.category not in skipcategories:
       
   225                 res.setdefault(a.category, []).append(a.__class__)
       
   226         return res
       
   227 
       
   228     def paddrelactions(self, req, rset):
       
   229         return [(a.id, a.__class__) for a in self.vreg.possible_vobjects('actions', req, rset)
       
   230                 if a.category == 'addrelated']
       
   231                
       
   232     def remote_call(self, fname, *args):
       
   233         """remote call simulation"""
       
   234         dump = simplejson.dumps
       
   235         args = [dump(arg) for arg in args]
       
   236         req = self.request(mode='remote', fname=fname, pageid='123', arg=args)
       
   237         ctrl = self.env.app.select_controller('json', req)
       
   238         return ctrl.publish(), req
       
   239 
       
   240     # default test setup and teardown #########################################
       
   241         
       
   242     def setup_database(self):
       
   243         pass
       
   244 
       
   245     def setUp(self):
       
   246         self.restore_connection()
       
   247         session = self.session()
       
   248         #self.maxeid = self.execute('Any MAX(X)')
       
   249         session.set_pool()
       
   250         self.maxeid = session.system_sql('SELECT MAX(eid) FROM entities').fetchone()[0]
       
   251         self.app = self.env.app
       
   252         self.vreg = self.env.app.vreg
       
   253         self.schema = self.vreg.schema
       
   254         self.vreg.config.mode = 'test'
       
   255         # set default-dest-addrs to a dumb email address to avoid mailbox or
       
   256         # mail queue pollution
       
   257         self.set_option('default-dest-addrs', ['whatever'])
       
   258         self.setup_database()
       
   259         self.commit()
       
   260         MAILBOX[:] = [] # reset mailbox
       
   261         
       
   262     @nocoverage
       
   263     def tearDown(self):
       
   264         self.rollback()
       
   265         # self.env.restore_database()
       
   266         self.env.restore_connection()
       
   267         self.session().unsafe_execute('DELETE Any X WHERE X eid > %s' % self.maxeid)
       
   268         self.commit()
       
   269 
       
   270 
       
   271 # XXX
       
   272 try:
       
   273     from cubicweb.web import Redirect
       
   274     from urllib import unquote
       
   275 except ImportError:
       
   276     pass # cubicweb-web not installed
       
   277 else:
       
   278     class ControllerTC(EnvBasedTC):
       
   279         def setUp(self):
       
   280             super(ControllerTC, self).setUp()
       
   281             self.req = self.request()
       
   282             self.ctrl = self.env.app.select_controller('edit', self.req)
       
   283 
       
   284         def publish(self, req):
       
   285             assert req is self.ctrl.req
       
   286             try:
       
   287                 result = self.ctrl.publish()
       
   288                 req.cnx.commit()
       
   289             except Redirect:
       
   290                 req.cnx.commit()
       
   291                 raise
       
   292             return result
       
   293 
       
   294         def expect_redirect_publish(self, req=None):
       
   295             if req is not None:
       
   296                 self.ctrl = self.env.app.select_controller('edit', req)
       
   297             else:
       
   298                 req = self.req
       
   299             try:
       
   300                 res = self.publish(req)
       
   301             except Redirect, ex:
       
   302                 try:
       
   303                     path, params = ex.location.split('?', 1)
       
   304                 except:
       
   305                     path, params = ex.location, ""
       
   306                 req._url = path
       
   307                 cleanup = lambda p: (p[0], unquote(p[1]))
       
   308                 params = dict(cleanup(p.split('=', 1)) for p in params.split('&') if p)
       
   309                 return req.relative_path(False), params # path.rsplit('/', 1)[-1], params
       
   310             else:
       
   311                 self.fail('expected a Redirect exception')
       
   312 
       
   313 
       
   314 def make_late_binding_repo_property(attrname):
       
   315     @property
       
   316     def late_binding(self):
       
   317         """builds cnx as late as possible"""
       
   318         if not hasattr(self, attrname):
       
   319             # sets explicit test mode here to avoid autoreload
       
   320             from cubicweb.cwconfig import CubicWebConfiguration
       
   321             CubicWebConfiguration.mode = 'test'
       
   322             cls = self.__class__
       
   323             config = self.repo_config or TestServerConfiguration('data')
       
   324             cls._repo, cls._cnx = init_test_database('sqlite',  config=config)
       
   325         return getattr(self, attrname)
       
   326     return late_binding
       
   327 
       
   328 
       
   329 class autorepo(type):
       
   330     """automatically set repository on RepositoryBasedTC subclasses if necessary
       
   331     """
       
   332     def __new__(mcs, name, bases, classdict):
       
   333         repo = classdict.get('repo')
       
   334         # try to find repo in one of the base classes
       
   335         if repo is None:
       
   336             for base in bases:
       
   337                 repo = getattr(base, 'repo', None)
       
   338                 if repo is not None:
       
   339                     classdict['repo'] = repo
       
   340                     break
       
   341         if name != 'RepositoryBasedTC' and not classdict.get('repo'):
       
   342             classdict['repo'] = make_late_binding_repo_property('_repo')
       
   343             classdict['cnx'] = make_late_binding_repo_property('_cnx')
       
   344         return super(autorepo, mcs).__new__(mcs, name, bases, classdict)
       
   345 
       
   346 
       
   347 class RepositoryBasedTC(TestCase):
       
   348     """abstract class for test using direct repository connections
       
   349     """
       
   350     __metaclass__ = autorepo
       
   351     repo_config = None # set a particular config instance if necessary
       
   352     
       
   353     # user / session management ###############################################
       
   354 
       
   355     def create_user(self, user, groups=('users',), password=None, commit=True):
       
   356         if password is None:
       
   357             password = user
       
   358         eid = self.execute('INSERT EUser X: X login %(x)s, X upassword %(p)s,'
       
   359                             'X in_state S WHERE S name "activated"',
       
   360                             {'x': unicode(user), 'p': password})[0][0]
       
   361         groups = ','.join(repr(group) for group in groups)
       
   362         self.execute('SET X in_group Y WHERE X eid %%(x)s, Y name IN (%s)' % groups,
       
   363                       {'x': eid})
       
   364         if commit:
       
   365             self.commit()
       
   366         self.session.reset_pool()        
       
   367         return eid
       
   368     
       
   369     def login(self, login, password=None):
       
   370         cnx = repo_connect(self.repo, unicode(login), password or login,
       
   371                            ConnectionProperties('inmemory'))
       
   372         self.cnxs.append(cnx)
       
   373         return cnx
       
   374 
       
   375     def current_session(self):
       
   376         return self.repo._sessions[self.cnxs[-1].sessionid]
       
   377     
       
   378     def restore_connection(self):
       
   379         assert len(self.cnxs) == 1, self.cnxs
       
   380         cnx = self.cnxs.pop()
       
   381         try:
       
   382             cnx.close()
       
   383         except Exception, ex:
       
   384             print "exception occured while closing connection", ex
       
   385         
       
   386     # db api ##################################################################
       
   387 
       
   388     def execute(self, rql, args=None, eid_key=None):
       
   389         assert self.session.id == self.cnxid
       
   390         rset = self.__execute(self.cnxid, rql, args, eid_key)
       
   391         rset.vreg = self.vreg
       
   392         rset.req = self.session
       
   393         # call to set_pool is necessary to avoid pb when using
       
   394         # application entities for convenience
       
   395         self.session.set_pool()
       
   396         return rset
       
   397     
       
   398     def commit(self):
       
   399         self.__commit(self.cnxid)
       
   400         self.session.set_pool()        
       
   401     
       
   402     def rollback(self):
       
   403         self.__rollback(self.cnxid)
       
   404         self.session.set_pool()        
       
   405     
       
   406     def close(self):
       
   407         self.__close(self.cnxid)
       
   408 
       
   409     # other utilities #########################################################
       
   410     def set_debug(self, debugmode):
       
   411         from cubicweb.server import set_debug
       
   412         set_debug(debugmode)
       
   413         
       
   414     def set_option(self, optname, value):
       
   415         self.vreg.config.global_set_option(optname, value)
       
   416             
       
   417     def add_entity(self, etype, **kwargs):
       
   418         restrictions = ', '.join('X %s %%(%s)s' % (key, key) for key in kwargs)
       
   419         rql = 'INSERT %s X' % etype
       
   420         if kwargs:
       
   421             rql += ': %s' % ', '.join('X %s %%(%s)s' % (key, key) for key in kwargs)
       
   422         rset = self.execute(rql, kwargs)
       
   423         return rset.get_entity(0, 0)
       
   424 
       
   425     def default_user_password(self):
       
   426         config = self.repo.config #TestConfiguration('data')
       
   427         user = unicode(config.sources()['system']['db-user'])
       
   428         passwd = config.sources()['system']['db-password']
       
   429         return user, passwd
       
   430     
       
   431     def close_connections(self):
       
   432         for cnx in self.cnxs:
       
   433             try:
       
   434                 cnx.rollback()
       
   435                 cnx.close()
       
   436             except:
       
   437                 continue
       
   438         self.cnxs = []
       
   439 
       
   440     pactions = EnvBasedTC.pactions.im_func
       
   441     pactionsdict = EnvBasedTC.pactionsdict.im_func
       
   442     
       
   443     # default test setup and teardown #########################################
       
   444     copy_schema = False
       
   445     
       
   446     def _prepare(self):
       
   447         MAILBOX[:] = [] # reset mailbox
       
   448         if hasattr(self, 'cnxid'):
       
   449             return
       
   450         repo = self.repo
       
   451         self.__execute = repo.execute
       
   452         self.__commit = repo.commit
       
   453         self.__rollback = repo.rollback
       
   454         self.__close = repo.close
       
   455         self.cnxid = repo.connect(*self.default_user_password())
       
   456         self.session = repo._sessions[self.cnxid]
       
   457         # XXX copy schema since hooks may alter it and it may be not fully
       
   458         #     cleaned (missing some schema synchronization support)
       
   459         try:
       
   460             origschema = repo.__schema
       
   461         except AttributeError:
       
   462             origschema = repo.schema
       
   463             repo.__schema = origschema
       
   464         if self.copy_schema:
       
   465             repo.schema = deepcopy(origschema)
       
   466             repo.set_schema(repo.schema) # reset hooks
       
   467             repo.vreg.update_schema(repo.schema)
       
   468         self.cnxs = []
       
   469         # reset caches, they may introduce bugs among tests
       
   470         repo._type_source_cache = {}
       
   471         repo._extid_cache = {}
       
   472         repo.querier._rql_cache = {}
       
   473         for source in repo.sources:
       
   474             source.reset_caches()
       
   475         for s in repo.sources:
       
   476             if hasattr(s, '_cache'):
       
   477                 s._cache = {}
       
   478 
       
   479     @property
       
   480     def config(self):
       
   481         return self.repo.config
       
   482 
       
   483     @property
       
   484     def vreg(self):
       
   485         return self.repo.vreg
       
   486 
       
   487     @property
       
   488     def schema(self):
       
   489         return self.repo.schema
       
   490     
       
   491     def setUp(self):
       
   492         self._prepare()
       
   493         self.session.set_pool()
       
   494         self.maxeid = self.session.system_sql('SELECT MAX(eid) FROM entities').fetchone()[0]
       
   495         #self.maxeid = self.execute('Any MAX(X)')
       
   496         
       
   497     def tearDown(self, close=True):
       
   498         self.close_connections()
       
   499         self.rollback()
       
   500         self.session.unsafe_execute('DELETE Any X WHERE X eid > %(x)s', {'x': self.maxeid})
       
   501         self.commit()
       
   502         if close:
       
   503             self.close()
       
   504