devtools/_apptest.py
changeset 0 b97547f5f1fa
child 387 dbe9997ffc7e
equal deleted inserted replaced
-1:000000000000 0:b97547f5f1fa
       
     1 """Hidden internals for the devtools.apptest module
       
     2 
       
     3 :organization: Logilab
       
     4 :copyright: 2001-2008 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
       
     5 :contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
       
     6 """
       
     7 __docformat__ = "restructuredtext en"
       
     8 
       
     9 import sys, traceback
       
    10 
       
    11 from logilab.common.pytest import pause_tracing, resume_tracing
       
    12 
       
    13 import yams.schema
       
    14 
       
    15 from cubicweb.dbapi import repo_connect, ConnectionProperties, ProgrammingError
       
    16 from cubicweb.cwvreg import CubicWebRegistry
       
    17 
       
    18 from cubicweb.web.application import CubicWebPublisher
       
    19 from cubicweb.web import Redirect
       
    20 
       
    21 from cubicweb.devtools import ApptestConfiguration, init_test_database
       
    22 from cubicweb.devtools.fake import FakeRequest
       
    23     
       
    24 SYSTEM_ENTITIES = ('EGroup', 'EUser',
       
    25                    'EFRDef', 'ENFRDef',
       
    26                    'EConstraint', 'EConstraintType', 'EProperty',
       
    27                    'EEType', 'ERType',
       
    28                    'State', 'Transition', 'TrInfo',
       
    29                    'RQLExpression',
       
    30                    )
       
    31 SYSTEM_RELATIONS = (
       
    32     # virtual relation
       
    33     'identity',
       
    34     # metadata
       
    35     'is', 'is_instance_of', 'owned_by', 'created_by', 'specializes',
       
    36     # workflow related
       
    37     'state_of', 'transition_of', 'initial_state', 'allowed_transition',
       
    38     'destination_state', 'in_state', 'wf_info_for', 'from_state', 'to_state', 
       
    39     'condition',
       
    40     # permission
       
    41     'in_group', 'require_group', 'require_permission',
       
    42     'read_permission', 'update_permission', 'delete_permission', 'add_permission',
       
    43     # eproperty
       
    44     'for_user',
       
    45     # schema definition
       
    46     'relation_type', 'from_entity', 'to_entity',
       
    47     'constrained_by', 'cstrtype', 'widget',
       
    48     # deducted from other relations
       
    49     'primary_email', 
       
    50                     )
       
    51 
       
    52 def unprotected_entities(app_schema, strict=False):
       
    53     """returned a Set of each non final entity type, excluding EGroup, and EUser...
       
    54     """
       
    55     if strict:
       
    56         protected_entities = yams.schema.BASE_TYPES
       
    57     else:
       
    58         protected_entities = yams.schema.BASE_TYPES.union(set(SYSTEM_ENTITIES))
       
    59     entities = set(app_schema.entities())
       
    60     return entities - protected_entities
       
    61     
       
    62 
       
    63 def ignore_relations(*relations):
       
    64     SYSTEM_RELATIONS += relations
       
    65 
       
    66 class TestEnvironment(object):
       
    67     """TestEnvironment defines a context (e.g. a config + a given connection) in
       
    68     which the tests are executed
       
    69     """
       
    70     
       
    71     def __init__(self, appid, reporter=None, verbose=False,
       
    72                  configcls=ApptestConfiguration, requestcls=FakeRequest):
       
    73         config = configcls(appid)
       
    74         self.requestcls = requestcls
       
    75         self.cnx = None
       
    76         config.db_perms = False
       
    77         source = config.sources()['system']
       
    78         if verbose:
       
    79             print "init test database ..."
       
    80         self.vreg = vreg = CubicWebRegistry(config)
       
    81         self.admlogin = source['db-user']
       
    82         # restore database <=> init database
       
    83         self.restore_database()
       
    84         if verbose:
       
    85             print "init done"
       
    86         login = source['db-user']
       
    87         config.repository = lambda x=None: self.repo
       
    88         self.app = CubicWebPublisher(config, vreg=vreg)
       
    89         self.verbose = verbose
       
    90         schema = self.vreg.schema
       
    91         # else we may run into problems since email address are ususally share in app tests
       
    92         # XXX should not be necessary anymore
       
    93         schema.rschema('primary_email').set_rproperty('EUser', 'EmailAddress', 'composite', False)
       
    94         self.deletable_entities = unprotected_entities(schema)
       
    95 
       
    96     def restore_database(self):
       
    97         """called by unittests' tearDown to restore the original database
       
    98         """
       
    99         try:
       
   100             pause_tracing()
       
   101             if self.cnx:
       
   102                 self.cnx.close()
       
   103             source = self.vreg.config.sources()['system']
       
   104             self.repo, self.cnx = init_test_database(driver=source['db-driver'],
       
   105                                                      vreg=self.vreg)
       
   106             self._orig_cnx = self.cnx
       
   107             resume_tracing()
       
   108         except:
       
   109             resume_tracing()
       
   110             traceback.print_exc()
       
   111             sys.exit(1)
       
   112         # XXX cnx decoration is usually done by the repository authentication manager,
       
   113         # necessary in authentication tests
       
   114         self.cnx.vreg = self.vreg
       
   115         self.cnx.login = source['db-user']
       
   116         self.cnx.password = source['db-password']
       
   117         
       
   118 
       
   119     def create_user(self, login, groups=('users',), req=None):
       
   120         req = req or self.create_request()
       
   121         cursor = self._orig_cnx.cursor(req)
       
   122         rset = cursor.execute('INSERT EUser X: X login %(login)s, X upassword %(passwd)s,'
       
   123                               'X in_state S WHERE S name "activated"',
       
   124                               {'login': unicode(login), 'passwd': login.encode('utf8')})
       
   125         user = rset.get_entity(0, 0)
       
   126         cursor.execute('SET X in_group G WHERE X eid %%(x)s, G name IN(%s)'
       
   127                        % ','.join(repr(g) for g in groups),
       
   128                        {'x': user.eid}, 'x')
       
   129         user.clear_related_cache('in_group', 'subject')
       
   130         self._orig_cnx.commit()
       
   131         return user
       
   132 
       
   133     def login(self, login):
       
   134         if login == self.admlogin:
       
   135             self.restore_connection()
       
   136         else:
       
   137             self.cnx = repo_connect(self.repo, unicode(login), str(login),
       
   138                                     ConnectionProperties('inmemory'))
       
   139         if login == self.vreg.config.anonymous_user()[0]:
       
   140             self.cnx.anonymous_connection = True
       
   141         return self.cnx
       
   142     
       
   143     def restore_connection(self):
       
   144         if not self.cnx is self._orig_cnx:
       
   145             try:
       
   146                 self.cnx.close()
       
   147             except ProgrammingError:
       
   148                 pass # already closed
       
   149         self.cnx = self._orig_cnx
       
   150 
       
   151     ############################################################################
       
   152 
       
   153     def execute(self, rql, args=None, eidkey=None, req=None):
       
   154         """executes <rql>, builds a resultset, and returns a couple (rset, req)
       
   155         where req is a FakeRequest
       
   156         """
       
   157         req = req or self.create_request(rql=rql)
       
   158         return self.cnx.cursor(req).execute(unicode(rql), args, eidkey)
       
   159     
       
   160     def create_request(self, rql=None, **kwargs):
       
   161         """executes <rql>, builds a resultset, and returns a
       
   162         couple (rset, req) where req is a FakeRequest
       
   163         """
       
   164         if rql:
       
   165             kwargs['rql'] = rql
       
   166         req = self.requestcls(self.vreg, form=kwargs)
       
   167         req.set_connection(self.cnx)
       
   168         return req
       
   169         
       
   170     def get_rset_and_req(self, rql, optional_args=None, args=None, eidkey=None):
       
   171         """executes <rql>, builds a resultset, and returns a
       
   172         couple (rset, req) where req is a FakeRequest
       
   173         """
       
   174         return (self.execute(rql, args, eidkey),
       
   175                 self.create_request(rql=rql, **optional_args or {}))
       
   176     
       
   177     def check_view(self, rql, vid, optional_args, template='main'):
       
   178         """checks if vreg.view() raises an exception in this environment
       
   179 
       
   180         If any exception is raised in this method, it will be considered
       
   181         as a TestFailure
       
   182         """
       
   183         return self.call_view(vid, rql,
       
   184                               template=template, optional_args=optional_args)
       
   185     
       
   186     def call_view(self, vid, rql, template='main', optional_args=None):
       
   187         """shortcut for self.vreg.view()"""
       
   188         assert template
       
   189         if optional_args is None:
       
   190             optional_args = {}
       
   191         optional_args['vid'] = vid
       
   192         req = self.create_request(rql=rql, **optional_args)
       
   193         return self.vreg.main_template(req, template)
       
   194 
       
   195     def call_edit(self, req):
       
   196         """shortcut for self.app.edit()"""
       
   197         controller = self.app.select_controller('edit', req)
       
   198         try:
       
   199             controller.publish()
       
   200         except Redirect:
       
   201             result = 'success'
       
   202         else:
       
   203             raise Exception('edit should raise Redirect on success')
       
   204         req.cnx.commit()
       
   205         return result
       
   206 
       
   207     def iter_possible_views(self, req, rset):
       
   208         """returns a list of possible vids for <rql>"""
       
   209         for view in self.vreg.possible_views(req, rset):
       
   210             if view.category == 'startupview':
       
   211                 continue
       
   212             yield view.id
       
   213         if rset.rowcount == 1:
       
   214             yield 'edition'
       
   215 
       
   216     def iter_startup_views(self, req):
       
   217         """returns the list of startup views"""
       
   218         for view in self.vreg.possible_views(req, None):
       
   219             if view.category != 'startupview':
       
   220                 continue
       
   221             yield view.id
       
   222 
       
   223     def iter_possible_actions(self, req, rset):
       
   224         """returns a list of possible vids for <rql>"""
       
   225         for action in self.vreg.possible_vobjects('actions', req, rset):
       
   226             yield action
       
   227 
       
   228 class ExistingTestEnvironment(TestEnvironment):
       
   229     
       
   230     def __init__(self, appid, sourcefile, verbose=False):
       
   231         config = ApptestConfiguration(appid, sourcefile=sourcefile)
       
   232         if verbose:
       
   233             print "init test database ..."
       
   234         source = config.sources()['system']
       
   235         self.vreg = CubicWebRegistry(config)
       
   236         repo, self.cnx = init_test_database(driver=source['db-driver'],
       
   237                                             vreg=self.vreg)
       
   238         if verbose:
       
   239             print "init done" 
       
   240         self.app = CubicWebPublisher(config, vreg=self.vreg)
       
   241         self.verbose = verbose
       
   242         # this is done when the publisher is opening a connection
       
   243         self.cnx.vreg = self.vreg
       
   244         login = source['db-user']
       
   245         
       
   246     def setup(self, config=None):
       
   247         """config is passed by TestSuite but is ignored in this environment"""
       
   248         cursor = self.cnx.cursor()
       
   249         self.last_eid = cursor.execute('Any X WHERE X creation_date D ORDERBY D DESC LIMIT 1').rows[0][0]
       
   250 
       
   251     def cleanup(self):
       
   252         """cancel inserted elements during tests"""
       
   253         cursor = self.cnx.cursor()
       
   254         cursor.execute('DELETE Any X WHERE X eid > %(x)s', {'x' : self.last_eid}, eid_key='x')
       
   255         print "cleaning done"
       
   256         self.cnx.commit()
       
   257