devtools/fake.py
changeset 0 b97547f5f1fa
child 1481 8ea54e7be3e2
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/devtools/fake.py	Wed Nov 05 15:52:50 2008 +0100
@@ -0,0 +1,241 @@
+"""Fake objects to ease testing of cubicweb without a fully working environment
+
+:organization: Logilab
+:copyright: 2001-2008 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
+"""
+__docformat__ = "restructuredtext en"
+
+from logilab.common.testlib import mock_object as Mock
+from logilab.common.adbh import get_adv_func_helper
+
+from indexer import get_indexer
+
+from cubicweb import RequestSessionMixIn
+from cubicweb.web.request import CubicWebRequestBase
+from cubicweb.devtools import BASE_URL, BaseApptestConfiguration
+
+
+class FakeConfig(dict, BaseApptestConfiguration):
+    translations = {}
+    apphome = None
+    def __init__(self, appid='data', apphome=None, cubes=()):
+        self.appid = appid
+        self.apphome = apphome
+        self._cubes = cubes
+        self['auth-mode'] = 'cookie'
+        self['uid'] = None 
+        self['base-url'] = BASE_URL
+        self['rql-cache-size'] = 100
+       
+    def cubes(self, expand=False):
+        return self._cubes
+    
+    def sources(self):
+        return {}
+
+class FakeVReg(object):
+    def __init__(self, schema=None, config=None):
+        self.schema = schema
+        self.config = config or FakeConfig()
+        self.properties = {'ui.encoding': 'UTF8',
+                           'ui.language': 'en',
+                           }
+        
+    def property_value(self, key):
+        return self.properties[key]
+
+    _registries = {
+        'controllers' : [Mock(id='view'), Mock(id='login'),
+                         Mock(id='logout'), Mock(id='edit')],
+        'views' : [Mock(id='primary'), Mock(id='secondary'),
+                         Mock(id='oneline'), Mock(id='list')],
+        }
+    
+    def registry_objects(self, name, oid=None):
+        return self._registries[name]
+    
+    def etype_class(self, etype):
+        class Entity(dict):
+            e_schema = self.schema[etype]
+            def __init__(self, session, eid, row=0, col=0):
+                self.req = session
+                self.eid = eid
+                self.row, self.col = row, col
+            def set_eid(self, eid):
+                self.eid = self['eid'] = eid
+        return Entity
+
+
+class FakeRequest(CubicWebRequestBase):
+    """test implementation of an cubicweb request object"""
+
+    def __init__(self, *args, **kwargs):
+        if not (args or 'vreg' in kwargs):
+            kwargs['vreg'] = FakeVReg()
+        kwargs['https'] = False
+        self._url = kwargs.pop('url', 'view?rql=Blop&vid=blop')
+        super(FakeRequest, self).__init__(*args, **kwargs)
+        self._session_data = {}
+        self._headers = {}
+
+    def header_accept_language(self):
+        """returns an ordered list of preferred languages"""
+        return ('en',)
+
+    def header_if_modified_since(self):
+        return None
+
+    def base_url(self):
+        """return the root url of the application"""
+        return BASE_URL
+
+    def relative_path(self, includeparams=True):
+        """return the normalized path of the request (ie at least relative
+        to the application's root, but some other normalization may be needed
+        so that the returned path may be used to compare to generated urls
+        """
+        if self._url.startswith(BASE_URL):
+            url = self._url[len(BASE_URL):]
+        else:
+            url = self._url
+        if includeparams:
+            return url
+        return url.split('?', 1)[0]
+
+    def set_content_type(self, content_type, filename=None, encoding=None):
+        """set output content type for this request. An optional filename
+        may be given
+        """
+        pass
+
+    def set_header(self, header, value):
+        """set an output HTTP header"""
+        pass
+    
+    def add_header(self, header, value):
+        """set an output HTTP header"""
+        pass
+    
+    def remove_header(self, header):
+        """remove an output HTTP header"""
+        pass
+    
+    def get_header(self, header, default=None):
+        """return the value associated with the given input header,
+        raise KeyError if the header is not set
+        """
+        return self._headers.get(header, default)
+
+    def set_cookie(self, cookie, key, maxage=300):
+        """set / update a cookie key
+
+        by default, cookie will be available for the next 5 minutes
+        """
+        pass
+
+    def remove_cookie(self, cookie, key):
+        """remove a cookie by expiring it"""
+        pass
+
+    def validate_cache(self):
+        pass
+
+    # session compatibility (in some test are using this class to test server
+    # side views...)
+    def actual_session(self):
+        """return the original parent session if any, else self"""
+        return self
+
+    def unsafe_execute(self, *args, **kwargs):
+        """return the original parent session if any, else self"""
+        kwargs.pop('propagate', None)
+        return self.execute(*args, **kwargs)
+
+
+class FakeUser(object):
+    login = 'toto'
+    eid = 0
+    def in_groups(self, groups):
+        return True
+
+
+class FakeSession(RequestSessionMixIn):
+    def __init__(self, repo=None, user=None):
+        self.repo = repo
+        self.vreg = getattr(self.repo, 'vreg', FakeVReg())
+        self.pool = FakePool()
+        self.user = user or FakeUser()
+        self.is_internal_session = False
+        self.is_super_session = self.user.eid == -1
+        self._query_data = {}
+        
+    def execute(self, *args):
+        pass
+    def commit(self, *args):
+        self._query_data.clear()
+    def close(self, *args):
+        pass
+    def system_sql(self, sql, args=None):
+        pass
+
+    def decorate_rset(self, rset, propagate=False):
+        rset.vreg = self.vreg
+        rset.req = self
+        return rset
+
+    def set_entity_cache(self, entity):
+        pass
+    
+class FakeRepo(object):
+    querier = None
+    def __init__(self, schema, vreg=None, config=None):
+        self.extids = {}
+        self.eids = {}
+        self._count = 0
+        self.schema = schema
+        self.vreg = vreg or FakeVReg()
+        self.config = config or FakeConfig()
+
+    def internal_session(self):
+        return FakeSession(self)
+    
+    def extid2eid(self, source, extid, etype, session, insert=True):
+        try:
+            return self.extids[extid]
+        except KeyError:
+            if not insert:
+                return None
+            self._count += 1
+            eid = self._count
+            entity = source.before_entity_insertion(session, extid, etype, eid)
+            self.extids[extid] = eid
+            self.eids[eid] = extid
+            source.after_entity_insertion(session, extid, entity)
+            return eid
+        
+    def eid2extid(self, source, eid, session=None):
+        return self.eids[eid]
+
+
+class FakeSource(object):
+    dbhelper = get_adv_func_helper('sqlite')
+    indexer = get_indexer('sqlite', 'UTF8')
+    dbhelper.fti_uid_attr = indexer.uid_attr
+    dbhelper.fti_table = indexer.table
+    dbhelper.fti_restriction_sql = indexer.restriction_sql
+    dbhelper.fti_need_distinct_query = indexer.need_distinct
+    def __init__(self, uri):
+        self.uri = uri
+
+        
+class FakePool(object):
+    def source(self, uri):
+        return FakeSource(uri)
+
+# commented until proven to be useful
+## from logging import getLogger
+## from cubicweb import set_log_methods
+## for cls in (FakeConfig, FakeVReg, FakeRequest, FakeSession, FakeRepo,
+##             FakeSource, FakePool):
+##     set_log_methods(cls, getLogger('fake'))