devtools/_apptest.py
branchtls-sprint
changeset 1802 d628defebc17
parent 1398 5fe84a5f7035
child 1977 606923dff11b
equal deleted inserted replaced
1801:672acc730ce5 1802:d628defebc17
     1 """Hidden internals for the devtools.apptest module
     1 """Hidden internals for the devtools.apptest module
     2 
     2 
     3 :organization: Logilab
     3 :organization: Logilab
     4 :copyright: 2001-2008 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
     4 :copyright: 2001-2009 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
     5 :contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
     5 :contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
     6 """
     6 """
     7 __docformat__ = "restructuredtext en"
     7 __docformat__ = "restructuredtext en"
     8 
     8 
     9 import sys, traceback
     9 import sys, traceback
    18 from cubicweb.web.application import CubicWebPublisher
    18 from cubicweb.web.application import CubicWebPublisher
    19 from cubicweb.web import Redirect
    19 from cubicweb.web import Redirect
    20 
    20 
    21 from cubicweb.devtools import ApptestConfiguration, init_test_database
    21 from cubicweb.devtools import ApptestConfiguration, init_test_database
    22 from cubicweb.devtools.fake import FakeRequest
    22 from cubicweb.devtools.fake import FakeRequest
    23     
    23 
    24 SYSTEM_ENTITIES = ('CWGroup', 'CWUser',
    24 SYSTEM_ENTITIES = ('CWGroup', 'CWUser',
    25                    'CWAttribute', 'CWRelation',
    25                    'CWAttribute', 'CWRelation',
    26                    'CWConstraint', 'CWConstraintType', 'CWProperty',
    26                    'CWConstraint', 'CWConstraintType', 'CWProperty',
    27                    'CWEType', 'CWRType',
    27                    'CWEType', 'CWRType',
    28                    'State', 'Transition', 'TrInfo',
    28                    'State', 'Transition', 'TrInfo',
    33     'identity',
    33     'identity',
    34     # metadata
    34     # metadata
    35     'is', 'is_instance_of', 'owned_by', 'created_by', 'specializes',
    35     'is', 'is_instance_of', 'owned_by', 'created_by', 'specializes',
    36     # workflow related
    36     # workflow related
    37     'state_of', 'transition_of', 'initial_state', 'allowed_transition',
    37     'state_of', 'transition_of', 'initial_state', 'allowed_transition',
    38     'destination_state', 'in_state', 'wf_info_for', 'from_state', 'to_state', 
    38     'destination_state', 'in_state', 'wf_info_for', 'from_state', 'to_state',
    39     'condition',
    39     'condition',
    40     # permission
    40     # permission
    41     'in_group', 'require_group', 'require_permission',
    41     'in_group', 'require_group', 'require_permission',
    42     'read_permission', 'update_permission', 'delete_permission', 'add_permission',
    42     'read_permission', 'update_permission', 'delete_permission', 'add_permission',
    43     # eproperty
    43     # eproperty
    44     'for_user',
    44     'for_user',
    45     # schema definition
    45     # schema definition
    46     'relation_type', 'from_entity', 'to_entity',
    46     'relation_type', 'from_entity', 'to_entity',
    47     'constrained_by', 'cstrtype', 'widget',
    47     'constrained_by', 'cstrtype', 'widget',
    48     # deducted from other relations
    48     # deducted from other relations
    49     'primary_email', 
    49     'primary_email',
    50                     )
    50                     )
    51 
    51 
    52 def unprotected_entities(app_schema, strict=False):
    52 def unprotected_entities(app_schema, strict=False):
    53     """returned a Set of each non final entity type, excluding CWGroup, and CWUser...
    53     """returned a Set of each non final entity type, excluding CWGroup, and CWUser...
    54     """
    54     """
    56         protected_entities = yams.schema.BASE_TYPES
    56         protected_entities = yams.schema.BASE_TYPES
    57     else:
    57     else:
    58         protected_entities = yams.schema.BASE_TYPES.union(set(SYSTEM_ENTITIES))
    58         protected_entities = yams.schema.BASE_TYPES.union(set(SYSTEM_ENTITIES))
    59     entities = set(app_schema.entities())
    59     entities = set(app_schema.entities())
    60     return entities - protected_entities
    60     return entities - protected_entities
    61     
    61 
    62 
    62 
    63 def ignore_relations(*relations):
    63 def ignore_relations(*relations):
    64     global SYSTEM_RELATIONS
    64     global SYSTEM_RELATIONS
    65     SYSTEM_RELATIONS += relations
    65     SYSTEM_RELATIONS += relations
    66 
    66 
    67 class TestEnvironment(object):
    67 class TestEnvironment(object):
    68     """TestEnvironment defines a context (e.g. a config + a given connection) in
    68     """TestEnvironment defines a context (e.g. a config + a given connection) in
    69     which the tests are executed
    69     which the tests are executed
    70     """
    70     """
    71     
    71 
    72     def __init__(self, appid, reporter=None, verbose=False,
    72     def __init__(self, appid, reporter=None, verbose=False,
    73                  configcls=ApptestConfiguration, requestcls=FakeRequest):
    73                  configcls=ApptestConfiguration, requestcls=FakeRequest):
    74         config = configcls(appid)
    74         config = configcls(appid)
    75         self.requestcls = requestcls
    75         self.requestcls = requestcls
    76         self.cnx = None
    76         self.cnx = None
   112         # XXX cnx decoration is usually done by the repository authentication manager,
   112         # XXX cnx decoration is usually done by the repository authentication manager,
   113         # necessary in authentication tests
   113         # necessary in authentication tests
   114         self.cnx.vreg = self.vreg
   114         self.cnx.vreg = self.vreg
   115         self.cnx.login = source['db-user']
   115         self.cnx.login = source['db-user']
   116         self.cnx.password = source['db-password']
   116         self.cnx.password = source['db-password']
   117         
   117 
   118 
   118 
   119     def create_user(self, login, groups=('users',), req=None):
   119     def create_user(self, login, groups=('users',), req=None):
   120         req = req or self.create_request()
   120         req = req or self.create_request()
   121         cursor = self._orig_cnx.cursor(req)
   121         cursor = self._orig_cnx.cursor(req)
   122         rset = cursor.execute('INSERT CWUser X: X login %(login)s, X upassword %(passwd)s,'
   122         rset = cursor.execute('INSERT CWUser X: X login %(login)s, X upassword %(passwd)s,'
   138                                     password or str(login),
   138                                     password or str(login),
   139                                     ConnectionProperties('inmemory'))
   139                                     ConnectionProperties('inmemory'))
   140         if login == self.vreg.config.anonymous_user()[0]:
   140         if login == self.vreg.config.anonymous_user()[0]:
   141             self.cnx.anonymous_connection = True
   141             self.cnx.anonymous_connection = True
   142         return self.cnx
   142         return self.cnx
   143     
   143 
   144     def restore_connection(self):
   144     def restore_connection(self):
   145         if not self.cnx is self._orig_cnx:
   145         if not self.cnx is self._orig_cnx:
   146             try:
   146             try:
   147                 self.cnx.close()
   147                 self.cnx.close()
   148             except ProgrammingError:
   148             except ProgrammingError:
   155         """executes <rql>, builds a resultset, and returns a couple (rset, req)
   155         """executes <rql>, builds a resultset, and returns a couple (rset, req)
   156         where req is a FakeRequest
   156         where req is a FakeRequest
   157         """
   157         """
   158         req = req or self.create_request(rql=rql)
   158         req = req or self.create_request(rql=rql)
   159         return self.cnx.cursor(req).execute(unicode(rql), args, eidkey)
   159         return self.cnx.cursor(req).execute(unicode(rql), args, eidkey)
   160     
   160 
   161     def create_request(self, rql=None, **kwargs):
   161     def create_request(self, rql=None, **kwargs):
   162         """executes <rql>, builds a resultset, and returns a
   162         """executes <rql>, builds a resultset, and returns a
   163         couple (rset, req) where req is a FakeRequest
   163         couple (rset, req) where req is a FakeRequest
   164         """
   164         """
   165         if rql:
   165         if rql:
   166             kwargs['rql'] = rql
   166             kwargs['rql'] = rql
   167         req = self.requestcls(self.vreg, form=kwargs)
   167         req = self.requestcls(self.vreg, form=kwargs)
   168         req.set_connection(self.cnx)
   168         req.set_connection(self.cnx)
   169         return req
   169         return req
   170         
   170 
   171     def get_rset_and_req(self, rql, optional_args=None, args=None, eidkey=None):
   171     def get_rset_and_req(self, rql, optional_args=None, args=None, eidkey=None):
   172         """executes <rql>, builds a resultset, and returns a
   172         """executes <rql>, builds a resultset, and returns a
   173         couple (rset, req) where req is a FakeRequest
   173         couple (rset, req) where req is a FakeRequest
   174         """
   174         """
   175         return (self.execute(rql, args, eidkey),
   175         return (self.execute(rql, args, eidkey),
   176                 self.create_request(rql=rql, **optional_args or {}))
   176                 self.create_request(rql=rql, **optional_args or {}))
   177     
   177 
   178     def check_view(self, rql, vid, optional_args, template='main'):
   178     def check_view(self, rql, vid, optional_args, template='main'):
   179         """checks if vreg.view() raises an exception in this environment
   179         """checks if vreg.view() raises an exception in this environment
   180 
   180 
   181         If any exception is raised in this method, it will be considered
   181         If any exception is raised in this method, it will be considered
   182         as a TestFailure
   182         as a TestFailure
   183         """
   183         """
   184         return self.call_view(vid, rql,
   184         return self.call_view(vid, rql,
   185                               template=template, optional_args=optional_args)
   185                               template=template, optional_args=optional_args)
   186     
   186 
   187     def call_view(self, vid, rql, template='main', optional_args=None):
   187     def call_view(self, vid, rql, template='main', optional_args=None):
   188         """shortcut for self.vreg.view()"""
   188         """shortcut for self.vreg.view()"""
   189         assert template
   189         assert template
   190         if optional_args is None:
   190         if optional_args is None:
   191             optional_args = {}
   191             optional_args = {}
   225         """returns a list of possible vids for <rql>"""
   225         """returns a list of possible vids for <rql>"""
   226         for action in self.vreg.possible_vobjects('actions', req, rset):
   226         for action in self.vreg.possible_vobjects('actions', req, rset):
   227             yield action
   227             yield action
   228 
   228 
   229 class ExistingTestEnvironment(TestEnvironment):
   229 class ExistingTestEnvironment(TestEnvironment):
   230     
   230 
   231     def __init__(self, appid, sourcefile, verbose=False):
   231     def __init__(self, appid, sourcefile, verbose=False):
   232         config = ApptestConfiguration(appid, sourcefile=sourcefile)
   232         config = ApptestConfiguration(appid, sourcefile=sourcefile)
   233         if verbose:
   233         if verbose:
   234             print "init test database ..."
   234             print "init test database ..."
   235         source = config.sources()['system']
   235         source = config.sources()['system']
   236         self.vreg = CubicWebRegistry(config)
   236         self.vreg = CubicWebRegistry(config)
   237         self.cnx = init_test_database(driver=source['db-driver'],
   237         self.cnx = init_test_database(driver=source['db-driver'],
   238                                       vreg=self.vreg)[1]
   238                                       vreg=self.vreg)[1]
   239         if verbose:
   239         if verbose:
   240             print "init done" 
   240             print "init done"
   241         self.app = CubicWebPublisher(config, vreg=self.vreg)
   241         self.app = CubicWebPublisher(config, vreg=self.vreg)
   242         self.verbose = verbose
   242         self.verbose = verbose
   243         # this is done when the publisher is opening a connection
   243         # this is done when the publisher is opening a connection
   244         self.cnx.vreg = self.vreg
   244         self.cnx.vreg = self.vreg
   245         
   245 
   246     def setup(self, config=None):
   246     def setup(self, config=None):
   247         """config is passed by TestSuite but is ignored in this environment"""
   247         """config is passed by TestSuite but is ignored in this environment"""
   248         cursor = self.cnx.cursor()
   248         cursor = self.cnx.cursor()
   249         self.last_eid = cursor.execute('Any X WHERE X creation_date D ORDERBY D DESC LIMIT 1').rows[0][0]
   249         self.last_eid = cursor.execute('Any X WHERE X creation_date D ORDERBY D DESC LIMIT 1').rows[0][0]
   250 
   250 
   252         """cancel inserted elements during tests"""
   252         """cancel inserted elements during tests"""
   253         cursor = self.cnx.cursor()
   253         cursor = self.cnx.cursor()
   254         cursor.execute('DELETE Any X WHERE X eid > %(x)s', {'x' : self.last_eid}, eid_key='x')
   254         cursor.execute('DELETE Any X WHERE X eid > %(x)s', {'x' : self.last_eid}, eid_key='x')
   255         print "cleaning done"
   255         print "cleaning done"
   256         self.cnx.commit()
   256         self.cnx.commit()
   257