server/test/unittest_ldapuser.py
branchstable
changeset 8146 67b9b273b70d
parent 7791 31bb51ea5485
child 8188 1867e252e487
equal deleted inserted replaced
8145:e88a24f88143 8146:67b9b273b70d
    30 from cubicweb.devtools.httptest import get_available_port
    30 from cubicweb.devtools.httptest import get_available_port
    31 from cubicweb.devtools import get_test_db_handler
    31 from cubicweb.devtools import get_test_db_handler
    32 
    32 
    33 from cubicweb.server.sources.ldapuser import *
    33 from cubicweb.server.sources.ldapuser import *
    34 
    34 
    35 SYT = 'syt'
       
    36 SYT_EMAIL = 'Sylvain Thenault'
       
    37 ADIM = 'adim'
       
    38 CONFIG = u'''host=%s
    35 CONFIG = u'''host=%s
    39 user-base-dn=ou=People,dc=cubicweb,dc=test
    36 user-base-dn=ou=People,dc=cubicweb,dc=test
    40 user-scope=ONELEVEL
    37 user-scope=ONELEVEL
    41 user-classes=top,posixAccount
    38 user-classes=top,posixAccount
    42 user-login-attr=uid
    39 user-login-attr=uid
    43 user-default-group=users
    40 user-default-group=users
    44 user-attrs-map=gecos:email,uid:login
    41 user-attrs-map=gecos:email,uid:login
    45 '''
    42 '''
    46 
    43 
    47 
       
    48 def nopwd_authenticate(self, session, login, password):
       
    49     """used to monkey patch the source to get successful authentication without
       
    50     upassword checking
       
    51     """
       
    52     assert login, 'no login!'
       
    53     searchfilter = [filter_format('(%s=%s)', (self.user_login_attr, login))]
       
    54     searchfilter.extend(self.base_filters)
       
    55     searchstr = '(&%s)' % ''.join(searchfilter)
       
    56     # first search the user
       
    57     try:
       
    58         user = self._search(session, self.user_base_dn, self.user_base_scope,
       
    59                             searchstr)[0]
       
    60     except IndexError:
       
    61         # no such user
       
    62         raise AuthenticationError()
       
    63     # don't check upassword !
       
    64     return self.repo.extid2eid(self, user['dn'], 'CWUser', session)
       
    65 
    44 
    66 def setUpModule(*args):
    45 def setUpModule(*args):
    67     create_slapd_configuration(LDAPUserSourceTC.config)
    46     create_slapd_configuration(LDAPUserSourceTC.config)
    68 
    47 
    69 def tearDownModule(*args):
    48 def tearDownModule(*args):
    89     #ldapuri = 'ldapi://' + join(basedir, "ldapi").replace('/', '%2f')
    68     #ldapuri = 'ldapi://' + join(basedir, "ldapi").replace('/', '%2f')
    90     port = get_available_port(xrange(9000, 9100))
    69     port = get_available_port(xrange(9000, 9100))
    91     host = 'localhost:%s' % port
    70     host = 'localhost:%s' % port
    92     ldapuri = 'ldap://%s' % host
    71     ldapuri = 'ldap://%s' % host
    93     cmdline = ["/usr/sbin/slapd", "-f",  slapdconf,  "-h",  ldapuri, "-d", "0"]
    72     cmdline = ["/usr/sbin/slapd", "-f",  slapdconf,  "-h",  ldapuri, "-d", "0"]
    94     print "Starting slapd on", ldapuri
    73     print 'Starting slapd:', ' '.join(cmdline)
    95     slapd_process = subprocess.Popen(cmdline)
    74     slapd_process = subprocess.Popen(cmdline)
    96     time.sleep(0.2)
    75     time.sleep(0.2)
    97     if slapd_process.poll() is None:
    76     if slapd_process.poll() is None:
    98         print "slapd started with pid %s" % slapd_process.pid
    77         print "slapd started with pid %s" % slapd_process.pid
    99     else:
    78     else:
   119     tags = CubicWebTC.tags | Tags(('ldap'))
    98     tags = CubicWebTC.tags | Tags(('ldap'))
   120 
    99 
   121     @classmethod
   100     @classmethod
   122     def pre_setup_database(cls, session, config):
   101     def pre_setup_database(cls, session, config):
   123         session.create_entity('CWSource', name=u'ldapuser', type=u'ldapuser',
   102         session.create_entity('CWSource', name=u'ldapuser', type=u'ldapuser',
   124                                     config=CONFIG)
   103                               config=CONFIG)
   125         session.commit()
   104         session.commit()
   126         # XXX keep it there
   105         # XXX keep it there
   127         session.execute('CWUser U')
   106         session.execute('CWUser U')
   128 
       
   129     def patch_authenticate(self):
       
   130         self._orig_authenticate = LDAPUserSource.authenticate
       
   131         LDAPUserSource.authenticate = nopwd_authenticate
       
   132 
       
   133     def tearDown(self):
       
   134         if hasattr(self, '_orig_authenticate'):
       
   135             LDAPUserSource.authenticate = self._orig_authenticate
       
   136         CubicWebTC.tearDown(self)
       
   137 
   107 
   138     def test_authenticate(self):
   108     def test_authenticate(self):
   139         source = self.repo.sources_by_uri['ldapuser']
   109         source = self.repo.sources_by_uri['ldapuser']
   140         self.session.set_cnxset()
   110         self.session.set_cnxset()
   141         self.assertRaises(AuthenticationError,
   111         self.assertRaises(AuthenticationError,
   145         source = self.repo.sources_by_uri['ldapuser']
   115         source = self.repo.sources_by_uri['ldapuser']
   146         source.synchronize()
   116         source.synchronize()
   147 
   117 
   148     def test_base(self):
   118     def test_base(self):
   149         # check a known one
   119         # check a known one
   150         rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': SYT})
   120         rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
   151         e = rset.get_entity(0, 0)
   121         e = rset.get_entity(0, 0)
   152         self.assertEqual(e.login, SYT)
   122         self.assertEqual(e.login, 'syt')
   153         e.complete()
   123         e.complete()
   154         self.assertEqual(e.creation_date, None)
   124         self.assertEqual(e.creation_date, None)
   155         self.assertEqual(e.modification_date, None)
   125         self.assertEqual(e.modification_date, None)
   156         self.assertEqual(e.firstname, None)
   126         self.assertEqual(e.firstname, None)
   157         self.assertEqual(e.surname, None)
   127         self.assertEqual(e.surname, None)
   158         self.assertEqual(e.in_group[0].name, 'users')
   128         self.assertEqual(e.in_group[0].name, 'users')
   159         self.assertEqual(e.owned_by[0].login, SYT)
   129         self.assertEqual(e.owned_by[0].login, 'syt')
   160         self.assertEqual(e.created_by, ())
   130         self.assertEqual(e.created_by, ())
   161         self.assertEqual(e.primary_email[0].address, SYT_EMAIL)
   131         self.assertEqual(e.primary_email[0].address, 'Sylvain Thenault')
   162         # email content should be indexed on the user
   132         # email content should be indexed on the user
   163         rset = self.sexecute('CWUser X WHERE X has_text "thenault"')
   133         rset = self.sexecute('CWUser X WHERE X has_text "thenault"')
   164         self.assertEqual(rset.rows, [[e.eid]])
   134         self.assertEqual(rset.rows, [[e.eid]])
   165 
   135 
   166     def test_not(self):
   136     def test_not(self):
   167         eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': SYT})[0][0]
   137         eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0]
   168         rset = self.sexecute('CWUser X WHERE NOT X eid %s' % eid)
   138         rset = self.sexecute('CWUser X WHERE NOT X eid %s' % eid)
   169         self.assert_(rset)
   139         self.assert_(rset)
   170         self.assert_(not eid in (r[0] for r in rset))
   140         self.assert_(not eid in (r[0] for r in rset))
   171 
   141 
   172     def test_multiple(self):
   142     def test_multiple(self):
   173         seid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': SYT})[0][0]
   143         seid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0]
   174         aeid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': ADIM})[0][0]
   144         aeid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'adim'})[0][0]
   175         rset = self.sexecute('CWUser X, Y WHERE X login %(syt)s, Y login %(adim)s',
   145         rset = self.sexecute('CWUser X, Y WHERE X login %(syt)s, Y login %(adim)s',
   176                             {'syt': SYT, 'adim': ADIM})
   146                             {'syt': 'syt', 'adim': 'adim'})
   177         self.assertEqual(rset.rows, [[seid, aeid]])
   147         self.assertEqual(rset.rows, [[seid, aeid]])
   178         rset = self.sexecute('Any X,Y,L WHERE X login L, X login %(syt)s, Y login %(adim)s',
   148         rset = self.sexecute('Any X,Y,L WHERE X login L, X login %(syt)s, Y login %(adim)s',
   179                             {'syt': SYT, 'adim': ADIM})
   149                             {'syt': 'syt', 'adim': 'adim'})
   180         self.assertEqual(rset.rows, [[seid, aeid, SYT]])
   150         self.assertEqual(rset.rows, [[seid, aeid, 'syt']])
   181 
   151 
   182     def test_in(self):
   152     def test_in(self):
   183         seid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': SYT})[0][0]
   153         seid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0]
   184         aeid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': ADIM})[0][0]
   154         aeid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'adim'})[0][0]
   185         rset = self.sexecute('Any X,L ORDERBY L WHERE X login IN("%s", "%s"), X login L' % (SYT, ADIM))
   155         rset = self.sexecute('Any X,L ORDERBY L WHERE X login IN("%s", "%s"), X login L' % ('syt', 'adim'))
   186         self.assertEqual(rset.rows, [[aeid, ADIM], [seid, SYT]])
   156         self.assertEqual(rset.rows, [[aeid, 'adim'], [seid, 'syt']])
   187 
   157 
   188     def test_relations(self):
   158     def test_relations(self):
   189         eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': SYT})[0][0]
   159         eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0]
   190         rset = self.sexecute('Any X,E WHERE X is CWUser, X login L, X primary_email E')
   160         rset = self.sexecute('Any X,E WHERE X is CWUser, X login L, X primary_email E')
   191         self.assert_(eid in (r[0] for r in rset))
   161         self.assert_(eid in (r[0] for r in rset))
   192         rset = self.sexecute('Any X,L,E WHERE X is CWUser, X login L, X primary_email E')
   162         rset = self.sexecute('Any X,L,E WHERE X is CWUser, X login L, X primary_email E')
   193         self.assert_(SYT in (r[1] for r in rset))
   163         self.assert_('syt' in (r[1] for r in rset))
   194 
   164 
   195     def test_count(self):
   165     def test_count(self):
   196         nbusers = self.sexecute('Any COUNT(X) WHERE X is CWUser')[0][0]
   166         nbusers = self.sexecute('Any COUNT(X) WHERE X is CWUser')[0][0]
   197         # just check this is a possible number
   167         # just check this is a possible number
   198         self.assert_(nbusers > 1, nbusers)
   168         self.assert_(nbusers > 1, nbusers)
   199         self.assert_(nbusers < 30, nbusers)
   169         self.assert_(nbusers < 30, nbusers)
   200 
   170 
   201     def test_upper(self):
   171     def test_upper(self):
   202         eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': SYT})[0][0]
   172         eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0]
   203         rset = self.sexecute('Any UPPER(L) WHERE X eid %s, X login L' % eid)
   173         rset = self.sexecute('Any UPPER(L) WHERE X eid %s, X login L' % eid)
   204         self.assertEqual(rset[0][0], SYT.upper())
   174         self.assertEqual(rset[0][0], 'syt'.upper())
   205 
   175 
   206     def test_unknown_attr(self):
   176     def test_unknown_attr(self):
   207         eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': SYT})[0][0]
   177         eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0]
   208         rset = self.sexecute('Any L,C,M WHERE X eid %s, X login L, '
   178         rset = self.sexecute('Any L,C,M WHERE X eid %s, X login L, '
   209                             'X creation_date C, X modification_date M' % eid)
   179                             'X creation_date C, X modification_date M' % eid)
   210         self.assertEqual(rset[0][0], SYT)
   180         self.assertEqual(rset[0][0], 'syt')
   211         self.assertEqual(rset[0][1], None)
   181         self.assertEqual(rset[0][1], None)
   212         self.assertEqual(rset[0][2], None)
   182         self.assertEqual(rset[0][2], None)
   213 
   183 
   214     def test_sort(self):
   184     def test_sort(self):
   215         logins = [l for l, in self.sexecute('Any L ORDERBY L WHERE X login L')]
   185         logins = [l for l, in self.sexecute('Any L ORDERBY L WHERE X login L')]
   219         logins = [l for l, in self.sexecute('Any L ORDERBY lower(L) WHERE X login L')]
   189         logins = [l for l, in self.sexecute('Any L ORDERBY lower(L) WHERE X login L')]
   220         self.assertEqual(logins, sorted(logins))
   190         self.assertEqual(logins, sorted(logins))
   221 
   191 
   222     def test_or(self):
   192     def test_or(self):
   223         rset = self.sexecute('DISTINCT Any X WHERE X login %(login)s OR (X in_group G, G name "managers")',
   193         rset = self.sexecute('DISTINCT Any X WHERE X login %(login)s OR (X in_group G, G name "managers")',
   224                             {'login': SYT})
   194                             {'login': 'syt'})
   225         self.assertEqual(len(rset), 2, rset.rows) # syt + admin
   195         self.assertEqual(len(rset), 2, rset.rows) # syt + admin
   226 
   196 
   227     def test_nonregr_set_owned_by(self):
   197     def test_nonregr_set_owned_by(self):
   228         # test that when a user coming from ldap is triggering a transition
   198         # test that when a user coming from ldap is triggering a transition
   229         # the related TrInfo has correct owner information
   199         # the related TrInfo has correct owner information
   230         self.sexecute('SET X in_group G WHERE X login %(syt)s, G name "managers"', {'syt': SYT})
   200         self.sexecute('SET X in_group G WHERE X login %(syt)s, G name "managers"', {'syt': 'syt'})
   231         self.commit()
   201         self.commit()
   232         syt = self.sexecute('CWUser X WHERE X login %(login)s', {'login': SYT}).get_entity(0, 0)
   202         syt = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'}).get_entity(0, 0)
   233         self.assertEqual([g.name for g in syt.in_group], ['managers', 'users'])
   203         self.assertEqual([g.name for g in syt.in_group], ['managers', 'users'])
   234         self.patch_authenticate()
   204         cnx = self.login('syt', password='syt')
   235         cnx = self.login(SYT, password='dummypassword')
       
   236         cu = cnx.cursor()
   205         cu = cnx.cursor()
   237         adim = cu.execute('CWUser X WHERE X login %(login)s', {'login': ADIM}).get_entity(0, 0)
   206         adim = cu.execute('CWUser X WHERE X login %(login)s', {'login': 'adim'}).get_entity(0, 0)
   238         iworkflowable = adim.cw_adapt_to('IWorkflowable')
   207         iworkflowable = adim.cw_adapt_to('IWorkflowable')
   239         iworkflowable.fire_transition('deactivate')
   208         iworkflowable.fire_transition('deactivate')
   240         try:
   209         try:
   241             cnx.commit()
   210             cnx.commit()
   242             adim.cw_clear_all_caches()
   211             adim.cw_clear_all_caches()
   243             self.assertEqual(adim.in_state[0].name, 'deactivated')
   212             self.assertEqual(adim.in_state[0].name, 'deactivated')
   244             trinfo = iworkflowable.latest_trinfo()
   213             trinfo = iworkflowable.latest_trinfo()
   245             self.assertEqual(trinfo.owned_by[0].login, SYT)
   214             self.assertEqual(trinfo.owned_by[0].login, 'syt')
   246             # select from_state to skip the user's creation TrInfo
   215             # select from_state to skip the user's creation TrInfo
   247             rset = self.sexecute('Any U ORDERBY D DESC WHERE WF wf_info_for X,'
   216             rset = self.sexecute('Any U ORDERBY D DESC WHERE WF wf_info_for X,'
   248                                 'WF creation_date D, WF from_state FS,'
   217                                 'WF creation_date D, WF from_state FS,'
   249                                 'WF owned_by U?, X eid %(x)s',
   218                                 'WF owned_by U?, X eid %(x)s',
   250                                 {'x': adim.eid})
   219                                 {'x': adim.eid})
   251             self.assertEqual(rset.rows, [[syt.eid]])
   220             self.assertEqual(rset.rows, [[syt.eid]])
   252         finally:
   221         finally:
   253             # restore db state
   222             # restore db state
   254             self.restore_connection()
   223             self.restore_connection()
   255             adim = self.sexecute('CWUser X WHERE X login %(login)s', {'login': ADIM}).get_entity(0, 0)
   224             adim = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'adim'}).get_entity(0, 0)
   256             adim.cw_adapt_to('IWorkflowable').fire_transition('activate')
   225             adim.cw_adapt_to('IWorkflowable').fire_transition('activate')
   257             self.sexecute('DELETE X in_group G WHERE X login %(syt)s, G name "managers"', {'syt': SYT})
   226             self.sexecute('DELETE X in_group G WHERE X login %(syt)s, G name "managers"', {'syt': 'syt'})
   258 
   227 
   259     def test_same_column_names(self):
   228     def test_same_column_names(self):
   260         self.sexecute('Any X, Y WHERE X copain Y, X login "comme", Y login "cochon"')
   229         self.sexecute('Any X, Y WHERE X copain Y, X login "comme", Y login "cochon"')
   261 
   230 
   262     def test_multiple_entities_from_different_sources(self):
   231     def test_multiple_entities_from_different_sources(self):
   263         req = self.request()
   232         req = self.request()
   264         self.create_user(req, 'cochon')
   233         self.create_user(req, 'cochon')
   265         self.assertTrue(self.sexecute('Any X,Y WHERE X login %(syt)s, Y login "cochon"', {'syt': SYT}))
   234         self.assertTrue(self.sexecute('Any X,Y WHERE X login %(syt)s, Y login "cochon"', {'syt': 'syt'}))
   266 
   235 
   267     def test_exists1(self):
   236     def test_exists1(self):
   268         self.session.set_cnxset()
   237         self.session.set_cnxset()
   269         self.session.create_entity('CWGroup', name=u'bougloup1')
   238         self.session.create_entity('CWGroup', name=u'bougloup1')
   270         self.session.create_entity('CWGroup', name=u'bougloup2')
   239         self.session.create_entity('CWGroup', name=u'bougloup2')
   271         self.sexecute('SET U in_group G WHERE G name ~= "bougloup%", U login "admin"')
   240         self.sexecute('SET U in_group G WHERE G name ~= "bougloup%", U login "admin"')
   272         self.sexecute('SET U in_group G WHERE G name = "bougloup1", U login %(syt)s', {'syt': SYT})
   241         self.sexecute('SET U in_group G WHERE G name = "bougloup1", U login %(syt)s', {'syt': 'syt'})
   273         rset = self.sexecute('Any L,SN ORDERBY L WHERE X in_state S, '
   242         rset = self.sexecute('Any L,SN ORDERBY L WHERE X in_state S, '
   274                              'S name SN, X login L, EXISTS(X in_group G, G name ~= "bougloup%")')
   243                              'S name SN, X login L, EXISTS(X in_group G, G name ~= "bougloup%")')
   275         self.assertEqual(rset.rows, [['admin', 'activated'], [SYT, 'activated']])
   244         self.assertEqual(rset.rows, [['admin', 'activated'], ['syt', 'activated']])
   276 
   245 
   277     def test_exists2(self):
   246     def test_exists2(self):
   278         req = self.request()
   247         req = self.request()
   279         self.create_user(req, 'comme')
   248         self.create_user(req, 'comme')
   280         self.create_user(req, 'cochon')
   249         self.create_user(req, 'cochon')
   287         req = self.request()
   256         req = self.request()
   288         self.create_user(req, 'comme')
   257         self.create_user(req, 'comme')
   289         self.create_user(req, 'cochon')
   258         self.create_user(req, 'cochon')
   290         self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"')
   259         self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"')
   291         self.assertTrue(self.sexecute('Any X, Y WHERE X copain Y, X login "comme", Y login "cochon"'))
   260         self.assertTrue(self.sexecute('Any X, Y WHERE X copain Y, X login "comme", Y login "cochon"'))
   292         self.sexecute('SET X copain Y WHERE X login %(syt)s, Y login "cochon"', {'syt': SYT})
   261         self.sexecute('SET X copain Y WHERE X login %(syt)s, Y login "cochon"', {'syt': 'syt'})
   293         self.assertTrue(self.sexecute('Any X, Y WHERE X copain Y, X login %(syt)s, Y login "cochon"', {'syt': SYT}))
   262         self.assertTrue(self.sexecute('Any X, Y WHERE X copain Y, X login %(syt)s, Y login "cochon"', {'syt': 'syt'}))
   294         rset = self.sexecute('Any GN,L WHERE X in_group G, X login L, G name GN, G name "managers" '
   263         rset = self.sexecute('Any GN,L WHERE X in_group G, X login L, G name GN, G name "managers" '
   295                              'OR EXISTS(X copain T, T login in ("comme", "cochon"))')
   264                              'OR EXISTS(X copain T, T login in ("comme", "cochon"))')
   296         self.assertEqual(sorted(rset.rows), [['managers', 'admin'], ['users', 'comme'], ['users', SYT]])
   265         self.assertEqual(sorted(rset.rows), [['managers', 'admin'], ['users', 'comme'], ['users', 'syt']])
   297 
   266 
   298     def test_exists4(self):
   267     def test_exists4(self):
   299         req = self.request()
   268         req = self.request()
   300         self.create_user(req, 'comme')
   269         self.create_user(req, 'comme')
   301         self.create_user(req, 'cochon', groups=('users', 'guests'))
   270         self.create_user(req, 'cochon', groups=('users', 'guests'))
   302         self.create_user(req, 'billy')
   271         self.create_user(req, 'billy')
   303         self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"')
   272         self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"')
   304         self.sexecute('SET X copain Y WHERE X login "cochon", Y login "cochon"')
   273         self.sexecute('SET X copain Y WHERE X login "cochon", Y login "cochon"')
   305         self.sexecute('SET X copain Y WHERE X login "comme", Y login "billy"')
   274         self.sexecute('SET X copain Y WHERE X login "comme", Y login "billy"')
   306         self.sexecute('SET X copain Y WHERE X login %(syt)s, Y login "billy"', {'syt': SYT})
   275         self.sexecute('SET X copain Y WHERE X login %(syt)s, Y login "billy"', {'syt': 'syt'})
   307         # search for group name, login where
   276         # search for group name, login where
   308         #   CWUser copain with "comme" or "cochon" AND same login as the copain
   277         #   CWUser copain with "comme" or "cochon" AND same login as the copain
   309         # OR
   278         # OR
   310         #   CWUser in_state activated AND not copain with billy
   279         #   CWUser in_state activated AND not copain with billy
   311         #
   280         #
   313         rset= self.sexecute('Any GN,L WHERE X in_group G, X login L, G name GN, '
   282         rset= self.sexecute('Any GN,L WHERE X in_group G, X login L, G name GN, '
   314                            'EXISTS(X copain T, T login L, T login in ("comme", "cochon")) OR '
   283                            'EXISTS(X copain T, T login L, T login in ("comme", "cochon")) OR '
   315                            'EXISTS(X in_state S, S name "activated", NOT X copain T2, T2 login "billy")')
   284                            'EXISTS(X in_state S, S name "activated", NOT X copain T2, T2 login "billy")')
   316         all = self.sexecute('Any GN, L WHERE X in_group G, X login L, G name GN')
   285         all = self.sexecute('Any GN, L WHERE X in_group G, X login L, G name GN')
   317         all.rows.remove(['users', 'comme'])
   286         all.rows.remove(['users', 'comme'])
   318         all.rows.remove(['users', SYT])
   287         all.rows.remove(['users', 'syt'])
   319         self.assertEqual(sorted(rset.rows), sorted(all.rows))
   288         self.assertEqual(sorted(rset.rows), sorted(all.rows))
   320 
   289 
   321     def test_exists5(self):
   290     def test_exists5(self):
   322         req = self.request()
   291         req = self.request()
   323         self.create_user(req, 'comme')
   292         self.create_user(req, 'comme')
   324         self.create_user(req, 'cochon', groups=('users', 'guests'))
   293         self.create_user(req, 'cochon', groups=('users', 'guests'))
   325         self.create_user(req, 'billy')
   294         self.create_user(req, 'billy')
   326         self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"')
   295         self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"')
   327         self.sexecute('SET X copain Y WHERE X login "cochon", Y login "cochon"')
   296         self.sexecute('SET X copain Y WHERE X login "cochon", Y login "cochon"')
   328         self.sexecute('SET X copain Y WHERE X login "comme", Y login "billy"')
   297         self.sexecute('SET X copain Y WHERE X login "comme", Y login "billy"')
   329         self.sexecute('SET X copain Y WHERE X login %(syt)s, Y login "cochon"', {'syt': SYT})
   298         self.sexecute('SET X copain Y WHERE X login %(syt)s, Y login "cochon"', {'syt': 'syt'})
   330         rset= self.sexecute('Any L WHERE X login L, '
   299         rset= self.sexecute('Any L WHERE X login L, '
   331                            'EXISTS(X copain T, T login in ("comme", "cochon")) AND '
   300                            'EXISTS(X copain T, T login in ("comme", "cochon")) AND '
   332                            'NOT EXISTS(X copain T2, T2 login "billy")')
   301                            'NOT EXISTS(X copain T2, T2 login "billy")')
   333         self.assertEqual(sorted(rset.rows), [['cochon'], [SYT]])
   302         self.assertEqual(sorted(rset.rows), [['cochon'], ['syt']])
   334         rset= self.sexecute('Any GN,L WHERE X in_group G, X login L, G name GN, '
   303         rset= self.sexecute('Any GN,L WHERE X in_group G, X login L, G name GN, '
   335                            'EXISTS(X copain T, T login in ("comme", "cochon")) AND '
   304                            'EXISTS(X copain T, T login in ("comme", "cochon")) AND '
   336                            'NOT EXISTS(X copain T2, T2 login "billy")')
   305                            'NOT EXISTS(X copain T2, T2 login "billy")')
   337         self.assertEqual(sorted(rset.rows), [['guests', 'cochon'],
   306         self.assertEqual(sorted(rset.rows), [['guests', 'cochon'],
   338                                               ['users', 'cochon'],
   307                                               ['users', 'cochon'],
   339                                               ['users', SYT]])
   308                                               ['users', 'syt']])
   340 
   309 
   341     def test_cd_restriction(self):
   310     def test_cd_restriction(self):
   342         rset = self.sexecute('CWUser X WHERE X creation_date > "2009-02-01"')
   311         rset = self.sexecute('CWUser X WHERE X creation_date > "2009-02-01"')
   343         # admin/anon but no ldap user since it doesn't support creation_date
   312         # admin/anon but no ldap user since it doesn't support creation_date
   344         self.assertEqual(sorted(e.login for e in rset.entities()),
   313         self.assertEqual(sorted(e.login for e in rset.entities()),
   357         cnx = self.login('iaminguestsgrouponly')
   326         cnx = self.login('iaminguestsgrouponly')
   358         return cnx.cursor()
   327         return cnx.cursor()
   359 
   328 
   360     def test_security1(self):
   329     def test_security1(self):
   361         cu = self._init_security_test()
   330         cu = self._init_security_test()
   362         rset = cu.execute('CWUser X WHERE X login %(login)s', {'login': SYT})
   331         rset = cu.execute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
   363         self.assertEqual(rset.rows, [])
   332         self.assertEqual(rset.rows, [])
   364         rset = cu.execute('Any X WHERE X login "iaminguestsgrouponly"')
   333         rset = cu.execute('Any X WHERE X login "iaminguestsgrouponly"')
   365         self.assertEqual(len(rset.rows), 1)
   334         self.assertEqual(len(rset.rows), 1)
   366 
   335 
   367     def test_security2(self):
   336     def test_security2(self):
   368         cu = self._init_security_test()
   337         cu = self._init_security_test()
   369         rset = cu.execute('Any X WHERE X has_text %(syt)s', {'syt': SYT})
   338         rset = cu.execute('Any X WHERE X has_text %(syt)s', {'syt': 'syt'})
   370         self.assertEqual(rset.rows, [])
   339         self.assertEqual(rset.rows, [])
   371         rset = cu.execute('Any X WHERE X has_text "iaminguestsgrouponly"')
   340         rset = cu.execute('Any X WHERE X has_text "iaminguestsgrouponly"')
   372         self.assertEqual(len(rset.rows), 1)
   341         self.assertEqual(len(rset.rows), 1)
   373 
   342 
   374     def test_security3(self):
   343     def test_security3(self):
   375         cu = self._init_security_test()
   344         cu = self._init_security_test()
   376         rset = cu.execute('Any F WHERE X has_text %(syt)s, X firstname F', {'syt': SYT})
   345         rset = cu.execute('Any F WHERE X has_text %(syt)s, X firstname F', {'syt': 'syt'})
   377         self.assertEqual(rset.rows, [])
   346         self.assertEqual(rset.rows, [])
   378         rset = cu.execute('Any F WHERE X has_text "iaminguestsgrouponly", X firstname F')
   347         rset = cu.execute('Any F WHERE X has_text "iaminguestsgrouponly", X firstname F')
   379         self.assertEqual(rset.rows, [[None]])
   348         self.assertEqual(rset.rows, [[None]])
   380 
   349 
   381     def test_copy_to_system_source(self):
   350     def test_copy_to_system_source(self):
   382         source = self.repo.sources_by_uri['ldapuser']
   351         source = self.repo.sources_by_uri['ldapuser']
   383         eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': SYT})[0][0]
   352         eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0]
   384         self.sexecute('SET X cw_source S WHERE X eid %(x)s, S name "system"', {'x': eid})
   353         self.sexecute('SET X cw_source S WHERE X eid %(x)s, S name "system"', {'x': eid})
   385         self.commit()
   354         self.commit()
   386         source.reset_caches()
   355         source.reset_caches()
   387         rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': SYT})
   356         rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
   388         self.assertEqual(len(rset), 1)
   357         self.assertEqual(len(rset), 1)
   389         e = rset.get_entity(0, 0)
   358         e = rset.get_entity(0, 0)
   390         self.assertEqual(e.eid, eid)
   359         self.assertEqual(e.eid, eid)
   391         self.assertEqual(e.cw_metainformation(), {'source': {'type': u'native', 'uri': u'system', 'use-cwuri-as-url': False},
   360         self.assertEqual(e.cw_metainformation(), {'source': {'type': u'native', 'uri': u'system', 'use-cwuri-as-url': False},
   392                                                   'type': 'CWUser',
   361                                                   'type': 'CWUser',
   394         self.assertEqual(e.cw_source[0].name, 'system')
   363         self.assertEqual(e.cw_source[0].name, 'system')
   395         self.assertTrue(e.creation_date)
   364         self.assertTrue(e.creation_date)
   396         self.assertTrue(e.modification_date)
   365         self.assertTrue(e.modification_date)
   397         # XXX test some password has been set
   366         # XXX test some password has been set
   398         source.synchronize()
   367         source.synchronize()
   399         rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': SYT})
   368         rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
   400         self.assertEqual(len(rset), 1)
   369         self.assertEqual(len(rset), 1)
   401 
   370 
   402     def test_nonregr1(self):
   371     def test_nonregr1(self):
   403         self.sexecute('Any X,AA ORDERBY AA DESC WHERE E eid %(x)s, E owned_by X, '
   372         self.sexecute('Any X,AA ORDERBY AA DESC WHERE E eid %(x)s, E owned_by X, '
   404                      'X modification_date AA',
   373                      'X modification_date AA',