backport unittest_multisources from cubicweb-multisources
authorSylvain Thenault <sylvain.thenault@logilab.fr>
Wed, 07 Jan 2009 15:00:30 +0100
changeset 342 6becc066fc00
parent 341 0a426be2f3a2
child 346 5bbb01a133ae
backport unittest_multisources from cubicweb-multisources
server/test/unittest_multisources.py
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/server/test/unittest_multisources.py	Wed Jan 07 15:00:30 2009 +0100
@@ -0,0 +1,249 @@
+from os.path import dirname, join, abspath
+from logilab.common.decorators import cached
+from mx.DateTime import now
+
+from cubicweb.devtools import TestServerConfiguration, init_test_database
+from cubicweb.devtools.apptest import RepositoryBasedTC
+from cubicweb.devtools.repotest import do_monkey_patch, undo_monkey_patch
+
+class TwoSourcesConfiguration(TestServerConfiguration):
+    sourcefile = 'sources_multi'
+
+        
+class ExternalSourceConfiguration(TestServerConfiguration):
+    sourcefile = 'sources_extern'
+
+repo2, cnx2 = init_test_database('sqlite', config=ExternalSourceConfiguration('data'))
+cu = cnx2.cursor()
+ec1 = cu.execute('INSERT Card X: X title "C3: An external card", X wikiid "aaa"')[0][0]
+cu.execute('INSERT Card X: X title "C4: Ze external card", X wikiid "zzz"')
+aff1 = cu.execute('INSERT Affaire X: X ref "AFFREF", X in_state S WHERE S name "pitetre"')[0][0]
+cnx2.commit()
+
+MTIME = now() - 0.1
+
+# XXX, access existing connection, no pyro connection
+from cubicweb.server.sources.pyrorql import PyroRQLSource
+PyroRQLSource.get_connection = lambda x: cnx2
+# necessary since the repository is closing its initial connections pool though
+# we want to keep cnx2 valid
+from cubicweb.dbapi import Connection
+Connection.close = lambda x: None
+
+class TwoSourcesTC(RepositoryBasedTC):
+    repo_config = TwoSourcesConfiguration('data')
+
+    def setUp(self):
+        RepositoryBasedTC.setUp(self)
+        # trigger discovery
+        self.execute('Card X')
+        self.execute('Affaire X')
+        self.execute('State X')
+        self.commit()
+        # don't delete external entities!
+        self.maxeid = self.session.system_sql('SELECT MAX(eid) FROM entities').fetchone()[0]
+        # add some entities
+        self.ic1 = self.execute('INSERT Card X: X title "C1: An internal card", X wikiid "aaai"')[0][0]
+        self.ic2 = self.execute('INSERT Card X: X title "C2: Ze internal card", X wikiid "zzzi"')[0][0]
+        self.commit()
+        do_monkey_patch()
+        
+    def tearDown(self):
+        RepositoryBasedTC.tearDown(self)
+        undo_monkey_patch()
+
+    def test_eid_comp(self):
+        rset = self.execute('Card X WHERE X eid > 1')
+        self.assertEquals(len(rset), 4)
+        rset = self.execute('Any X,T WHERE X title T, X eid > 1')
+        self.assertEquals(len(rset), 4)
+        
+    def test_metainformation(self):
+        rset = self.execute('Card X ORDERBY T WHERE X title T')
+        # 2 added to the system source, 2 added to the external source
+        self.assertEquals(len(rset), 4)
+        # since they are orderd by eid, we know the 3 first one is coming from the system source
+        # and the others from external source
+        self.assertEquals(rset.get_entity(0, 0).metainformation(), 
+                          {'source': {'adapter': 'native', 'uri': 'system'}, 
+                           'type': u'Card', 'extid': None})
+        externent = rset.get_entity(3, 0)
+        metainf = externent.metainformation()
+        self.assertEquals(metainf['source'], {'adapter': 'pyrorql', 'uri': 'extern'})
+        self.assertEquals(metainf['type'], 'Card')
+        self.assert_(metainf['extid'])
+        etype = self.execute('Any ETN WHERE X is ET, ET name ETN, X eid %(x)s',
+                             {'x': externent.eid}, 'x')[0][0]
+        self.assertEquals(etype, 'Card')
+        
+    def test_order_limit_offset(self):
+        rsetbase = self.execute('Any W,X ORDERBY W,X WHERE X wikiid W')
+        self.assertEquals(len(rsetbase), 4)
+        self.assertEquals(sorted(rsetbase.rows), rsetbase.rows)
+        rset = self.execute('Any W,X ORDERBY W,X LIMIT 2 OFFSET 2 WHERE X wikiid W')
+        self.assertEquals(rset.rows, rsetbase.rows[2:4])
+
+    def test_has_text(self):
+        self.repo.sources[-1].synchronize(MTIME) # in case fti_update has been run before
+        self.failUnless(self.execute('Any X WHERE X has_text "affref"'))
+        self.failUnless(self.execute('Affaire X WHERE X has_text "affref"'))
+
+    def test_anon_has_text(self):
+        self.repo.sources[-1].synchronize(MTIME) # in case fti_update has been run before
+        self.execute('INSERT Affaire X: X ref "no readable card"')[0][0]
+        aff1 = self.execute('INSERT Affaire X: X ref "card"')[0][0]
+        # grant read access
+        self.execute('SET X owned_by U WHERE X eid %(x)s, U login "anon"', {'x': aff1}, 'x')
+        self.commit()
+        cnx = self.login('anon')
+        cu = cnx.cursor()
+        rset = cu.execute('Any X WHERE X has_text "card"')
+        self.assertEquals(len(rset), 5)
+
+    def test_synchronization(self):
+        cu = cnx2.cursor()
+        cu.execute('SET X ref "BLAH" WHERE X eid %(x)s', {'x': aff1}, 'x')
+        aff2 = cu.execute('INSERT Affaire X: X ref "AFFREUX", X in_state S WHERE S name "pitetre"')[0][0]
+        cnx2.commit()
+        try:
+            # force sync
+            self.repo.sources[-1].synchronize(MTIME)
+            self.failUnless(self.execute('Any X WHERE X has_text "blah"'))
+            self.failUnless(self.execute('Any X WHERE X has_text "affreux"'))
+            cu.execute('DELETE Affaire X WHERE X eid %(x)s', {'x': aff2})
+            cnx2.commit()
+            self.repo.sources[-1].synchronize(MTIME)
+            self.failIf(self.execute('Any X WHERE X has_text "affreux"'))
+        finally:
+            # restore state
+            cu.execute('SET X ref "AFFREF" WHERE X eid %(x)s', {'x': aff1}, 'x')
+            cnx2.commit()
+
+    def test_simplifiable_var(self):
+        affeid = self.execute('Affaire X WHERE X ref "AFFREF"')[0][0]
+        rset = self.execute('Any X,AA,AB WHERE E eid %(x)s, E in_state X, X name AA, X modification_date AB',
+                            {'x': affeid}, 'x')
+        self.assertEquals(len(rset), 1)
+        self.assertEquals(rset[0][1], "pitetre")
+
+    def test_simplifiable_var_2(self):
+        affeid = self.execute('Affaire X WHERE X ref "AFFREF"')[0][0]
+        rset = self.execute('Any E WHERE E eid %(x)s, E in_state S, NOT S name "moved"',
+                            {'x': affeid, 'u': self.session.user.eid}, 'x')
+        self.assertEquals(len(rset), 1)
+
+    def test_sort_func(self):
+        self.execute('Affaire X ORDERBY DUMB_SORT(RF) WHERE X ref RF')
+        
+    def test_sort_func_ambigous(self):
+        self.execute('Any X ORDERBY DUMB_SORT(RF) WHERE X title RF')
+
+    def test_in_eid(self):
+        iec1 = self.repo.extid2eid(self.repo.sources[-1], ec1, 'Card', self.session)
+        rset = self.execute('Any X WHERE X eid IN (%s, %s)' % (iec1, self.ic1))
+        self.assertEquals(sorted(r[0] for r in rset.rows), sorted([iec1, self.ic1]))
+        
+    def test_greater_eid(self):
+        rset = self.execute('Any X WHERE X eid > %s' % self.maxeid)
+        self.assertEquals(len(rset.rows), 2) # self.ic1 and self.ic2
+        ec2 = cu.execute('INSERT Card X: X title "glup"')[0][0]
+        cnx2.commit()
+        # 'X eid > something' should not trigger discovery
+        rset = self.execute('Any X WHERE X eid > %s' % self.maxeid)
+        self.assertEquals(len(rset.rows), 2)
+        # trigger discovery using another query
+        crset = self.execute('Card X WHERE X title "glup"')
+        self.assertEquals(len(crset.rows), 1) 
+        rset = self.execute('Any X WHERE X eid > %s' % self.maxeid)
+        self.assertEquals(len(rset.rows), 3)
+        rset = self.execute('Any MAX(X)')
+        self.assertEquals(len(rset.rows), 1)
+        self.assertEquals(rset.rows[0][0], crset[0][0])
+        
+    def test_attr_unification_1(self):
+        n1 = self.execute('INSERT Note X: X type "AFFREF"')[0][0]
+        n2 = self.execute('INSERT Note X: X type "AFFREU"')[0][0]
+        rset = self.execute('Any X,Y WHERE X is Note, Y is Affaire, X type T, Y ref T')
+        self.assertEquals(len(rset), 1, rset.rows)
+
+    def test_attr_unification_2(self):
+        ec2 = cu.execute('INSERT Card X: X title "AFFREF"')[0][0]
+        cnx2.commit()
+        try:
+            c1 = self.execute('INSERT Card C: C title "AFFREF"')[0][0]
+            rset = self.execute('Any X,Y WHERE X is Card, Y is Affaire, X title T, Y ref T')
+            self.assertEquals(len(rset), 2, rset.rows)
+        finally:
+            cu.execute('DELETE Card X WHERE X eid %(x)s', {'x': ec2}, 'x')
+            cnx2.commit()
+
+    def test_attr_unification_neq_1(self):
+        # XXX complete
+        self.execute('Any X,Y WHERE X is Note, Y is Affaire, X creation_date D, Y creation_date > D')
+
+    def test_attr_unification_neq_2(self):
+        # XXX complete
+        self.execute('Any X,Y WHERE X is Card, Y is Affaire, X creation_date D, Y creation_date > D')
+        
+    def test_union(self):
+        afeids = self.execute('Affaire X')
+        ueids = self.execute('EUser X')
+        rset = self.execute('(Any X WHERE X is Affaire) UNION (Any X WHERE X is EUser)')
+        self.assertEquals(sorted(r[0] for r in rset.rows),
+                          sorted(r[0] for r in afeids + ueids))
+        
+    def test_subquery1(self):
+        rsetbase = self.execute('Any W,X WITH W,X BEING (Any W,X ORDERBY W,X WHERE X wikiid W)')
+        self.assertEquals(len(rsetbase), 4)
+        self.assertEquals(sorted(rsetbase.rows), rsetbase.rows)
+        rset = self.execute('Any W,X LIMIT 2 OFFSET 2 WITH W,X BEING (Any W,X ORDERBY W,X WHERE X wikiid W)')
+        self.assertEquals(rset.rows, rsetbase.rows[2:4])
+        rset = self.execute('Any W,X ORDERBY W,X LIMIT 2 OFFSET 2 WITH W,X BEING (Any W,X WHERE X wikiid W)')
+        self.assertEquals(rset.rows, rsetbase.rows[2:4])
+        rset = self.execute('Any W,X WITH W,X BEING (Any W,X ORDERBY W,X LIMIT 2 OFFSET 2 WHERE X wikiid W)')
+        self.assertEquals(rset.rows, rsetbase.rows[2:4])
+        
+    def test_subquery2(self):
+        affeid = self.execute('Affaire X WHERE X ref "AFFREF"')[0][0]
+        rset =self.execute('Any X,AA,AB WITH X,AA,AB BEING (Any X,AA,AB WHERE E eid %(x)s, E in_state X, X name AA, X modification_date AB)',
+                           {'x': affeid})
+        self.assertEquals(len(rset), 1)
+        self.assertEquals(rset[0][1], "pitetre")
+
+    def test_not_relation(self):
+        states = set(tuple(x) for x in self.execute('Any S,SN WHERE S is State, S name SN'))
+        userstate = self.session.user.in_state[0]
+        states.remove((userstate.eid, userstate.name))
+        notstates = set(tuple(x) for x in self.execute('Any S,SN WHERE S is State, S name SN, NOT X in_state S, X eid %(x)s',
+                                                       {'x': self.session.user.eid}, 'x'))
+        self.assertEquals(notstates, states)
+        aff1 = self.execute('Any X WHERE X is Affaire, X ref "AFFREF"')[0][0]
+        aff1stateeid, aff1statename = self.execute('Any S,SN WHERE X eid %(x)s, X in_state S, S name SN', {'x': aff1}, 'x')[0]
+        self.assertEquals(aff1statename, 'pitetre')
+        states.add((userstate.eid, userstate.name))
+        states.remove((aff1stateeid, aff1statename))
+        notstates = set(tuple(x) for x in self.execute('Any S,SN WHERE S is State, S name SN, NOT X in_state S, X eid %(x)s',
+                                                       {'x': aff1}, 'x'))
+        self.set_debug(False)
+        self.assertSetEquals(notstates, states)
+        
+    def test_nonregr1(self):
+        ueid = self.session.user.eid
+        affaire = self.execute('Affaire X WHERE X ref "AFFREF"').get_entity(0, 0)
+        self.execute('Any U WHERE U in_group G, (G name IN ("managers", "logilab") OR (X require_permission P?, P name "bla", P require_group G)), X eid %(x)s, U eid %(u)s',
+                     {'x': affaire.eid, 'u': ueid})
+        
+    def test_nonregr2(self):
+        treid = self.session.user.latest_trinfo().eid
+        rset = self.execute('Any X ORDERBY D DESC WHERE E eid %(x)s, E wf_info_for X, X modification_date D',
+                            {'x': treid})
+        self.assertEquals(len(rset), 1)
+        self.assertEquals(rset.rows[0], [self.session.user.eid])
+
+
+    def test_nonregr3(self):
+        self.execute('DELETE Card X WHERE X eid %(x)s, NOT X multisource_inlined_rel Y', {'x': self.ic1})
+        
+if __name__ == '__main__':
+    from logilab.common.testlib import unittest_main
+    unittest_main()