cubicweb/devtools/repotest.py
changeset 12235 03b94c9863de
parent 12125 1d3a9bb46339
child 12237 2dd0dcb2e5f9
equal deleted inserted replaced
12234:14fdd5888cbb 12235:03b94c9863de
     1 # copyright 2003-2016 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
     1 # copyright 2003 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
     2 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
     2 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
     3 #
     3 #
     4 # This file is part of CubicWeb.
     4 # This file is part of CubicWeb.
     5 #
     5 #
     6 # CubicWeb is free software: you can redistribute it and/or modify it under the
     6 # CubicWeb is free software: you can redistribute it and/or modify it under the
    22 from __future__ import print_function
    22 from __future__ import print_function
    23 
    23 
    24 from contextlib import contextmanager
    24 from contextlib import contextmanager
    25 from pprint import pprint
    25 from pprint import pprint
    26 
    26 
    27 from logilab.common.testlib import SkipTest
    27 
    28 
       
    29 from cubicweb.devtools.testlib import RepoAccess
       
    30 from cubicweb.entities.authobjs import user_session_cache_key
    28 from cubicweb.entities.authobjs import user_session_cache_key
       
    29 from cubicweb.server import set_debug, debugged
       
    30 from cubicweb.server.sources.rql2sql import remove_unused_solutions
       
    31 
       
    32 from .testlib import RepoAccess, BaseTestCase
       
    33 from .fake import FakeRequest
       
    34 
    31 
    35 
    32 def tuplify(mylist):
    36 def tuplify(mylist):
    33     return [tuple(item) for item in mylist]
    37     return [tuple(item) for item in mylist]
    34 
    38 
    35 
    39 
   127         for rdef in x.rdefs.values():
   131         for rdef in x.rdefs.values():
   128             rdef.eid = schema_eids[(rdef.subject, rdef.rtype, rdef.object)]
   132             rdef.eid = schema_eids[(rdef.subject, rdef.rtype, rdef.object)]
   129             schema._eid_index[rdef.eid] = rdef
   133             schema._eid_index[rdef.eid] = rdef
   130 
   134 
   131 
   135 
   132 from logilab.common.testlib import TestCase, mock_object
   136 class BaseQuerierTC(BaseTestCase):
   133 from logilab.database import get_db_helper
       
   134 
       
   135 from rql import RQLHelper
       
   136 
       
   137 from cubicweb.devtools.testlib import BaseTestCase
       
   138 from cubicweb.devtools.fake import FakeRepo, FakeConfig, FakeRequest, FakeConnection
       
   139 from cubicweb.server import set_debug, debugged
       
   140 from cubicweb.server.querier import QuerierHelper
       
   141 from cubicweb.server.sources.rql2sql import SQLGenerator, remove_unused_solutions
       
   142 
       
   143 class RQLGeneratorTC(BaseTestCase):
       
   144     schema = backend = None # set this in concrete class
       
   145 
       
   146     @classmethod
       
   147     def setUpClass(cls):
       
   148         if cls.backend is not None:
       
   149             try:
       
   150                 cls.dbhelper = get_db_helper(cls.backend)
       
   151             except ImportError as ex:
       
   152                 raise SkipTest(str(ex))
       
   153 
       
   154     def setUp(self):
       
   155         self.repo = FakeRepo(self.schema, config=FakeConfig(apphome=self.datadir))
       
   156         self.repo.system_source = mock_object(dbdriver=self.backend)
       
   157         self.rqlhelper = RQLHelper(self.schema,
       
   158                                    special_relations={'eid': 'uid',
       
   159                                                       'has_text': 'fti'},
       
   160                                    backend=self.backend)
       
   161         self.qhelper = QuerierHelper(self.repo, self.schema)
       
   162         ExecutionPlan._check_permissions = _dummy_check_permissions
       
   163         rqlannotation._select_principal = _select_principal
       
   164         if self.backend is not None:
       
   165             self.o = SQLGenerator(self.schema, self.dbhelper)
       
   166 
       
   167     def tearDown(self):
       
   168         ExecutionPlan._check_permissions = _orig_check_permissions
       
   169         rqlannotation._select_principal = _orig_select_principal
       
   170 
       
   171     def set_debug(self, debug):
       
   172         set_debug(debug)
       
   173     def debugged(self, debug):
       
   174         return debugged(debug)
       
   175 
       
   176     def _prepare(self, rql):
       
   177         #print '******************** prepare', rql
       
   178         union = self.rqlhelper.parse(rql)
       
   179         #print '********* parsed', union.as_string()
       
   180         self.rqlhelper.compute_solutions(union)
       
   181         #print '********* solutions', solutions
       
   182         self.rqlhelper.simplify(union)
       
   183         #print '********* simplified', union.as_string()
       
   184         plan = self.qhelper.plan_factory(union, {}, FakeConnection(self.repo))
       
   185         plan.preprocess(union)
       
   186         for select in union.children:
       
   187             select.solutions.sort(key=lambda x: list(x.items()))
       
   188         #print '********* ppsolutions', solutions
       
   189         return union
       
   190 
       
   191 
       
   192 class BaseQuerierTC(TestCase):
       
   193     repo = None # set this in concrete class
   137     repo = None # set this in concrete class
   194 
   138 
   195     def setUp(self):
   139     def setUp(self):
   196         self.o = self.repo.querier
   140         self.o = self.repo.querier
   197         self.admin_access = RepoAccess(self.repo, 'admin', FakeRequest)
   141         self.admin_access = RepoAccess(self.repo, 'admin', FakeRequest)
   314 def _check_permissions(*args, **kwargs):
   258 def _check_permissions(*args, **kwargs):
   315     res, restricted = _orig_check_permissions(*args, **kwargs)
   259     res, restricted = _orig_check_permissions(*args, **kwargs)
   316     res = DumbOrderedDict(sorted(res.items(), key=lambda x: [list(y.items()) for y in x[1]]))
   260     res = DumbOrderedDict(sorted(res.items(), key=lambda x: [list(y.items()) for y in x[1]]))
   317     return res, restricted
   261     return res, restricted
   318 
   262 
   319 def _dummy_check_permissions(self, rqlst):
       
   320     return {(): rqlst.solutions}, set()
       
   321 
   263 
   322 from cubicweb.server import rqlannotation
   264 from cubicweb.server import rqlannotation
   323 _orig_select_principal = rqlannotation._select_principal
   265 _orig_select_principal = rqlannotation._select_principal
   324 
   266 
   325 def _select_principal(scope, relations):
   267 def _select_principal(scope, relations):