111 with self.session.repo.internal_session() as isession: |
111 with self.session.repo.internal_session() as isession: |
112 lfsource = isession.repo.sources_by_uri['ldapuser'] |
112 lfsource = isession.repo.sources_by_uri['ldapuser'] |
113 stats = lfsource.pull_data(isession, force=True, raise_on_error=True) |
113 stats = lfsource.pull_data(isession, force=True, raise_on_error=True) |
114 isession.commit() |
114 isession.commit() |
115 |
115 |
116 def test_filter_inactivate(self): |
116 def test_a_filter_inactivate(self): |
117 """ filtered out people should be deactivated, unable to authenticate """ |
117 """ filtered out people should be deactivated, unable to authenticate """ |
118 source = self.session.execute('CWSource S WHERE S type="ldapfeed"').get_entity(0,0) |
118 source = self.session.execute('CWSource S WHERE S type="ldapfeed"').get_entity(0,0) |
119 config = source.repo_source.check_config(source) |
119 config = source.repo_source.check_config(source) |
120 # filter with adim's phone number |
120 # filter with adim's phone number |
121 config['user-filter'] = u'(%s=%s)' % ('telephoneNumber', '109') |
121 config['user-filter'] = u'(%s=%s)' % ('telephoneNumber', '109') |
124 self._pull() |
124 self._pull() |
125 self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='syt') |
125 self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='syt') |
126 self.assertEqual(self.execute('Any N WHERE U login "syt", ' |
126 self.assertEqual(self.execute('Any N WHERE U login "syt", ' |
127 'U in_state S, S name N').rows[0][0], |
127 'U in_state S, S name N').rows[0][0], |
128 'deactivated') |
128 'deactivated') |
|
129 self.assertEqual(self.execute('Any N WHERE U login "adim", ' |
|
130 'U in_state S, S name N').rows[0][0], |
|
131 'activated') |
|
132 # unfilter, syt should be activated again |
|
133 config['user-filter'] = u'' |
|
134 source.repo_source.update_config(source, config) |
|
135 self.commit() |
|
136 self._pull() |
|
137 self.assertEqual(self.execute('Any N WHERE U login "syt", ' |
|
138 'U in_state S, S name N').rows[0][0], |
|
139 'activated') |
129 self.assertEqual(self.execute('Any N WHERE U login "adim", ' |
140 self.assertEqual(self.execute('Any N WHERE U login "adim", ' |
130 'U in_state S, S name N').rows[0][0], |
141 'U in_state S, S name N').rows[0][0], |
131 'activated') |
142 'activated') |
132 |
143 |
133 def test_delete(self): |
144 def test_delete(self): |
147 self._pull() |
158 self._pull() |
148 # reset the fscking ldap thing |
159 # reset the fscking ldap thing |
149 self.tearDownClass() |
160 self.tearDownClass() |
150 self.setUpClass() |
161 self.setUpClass() |
151 self._pull() |
162 self._pull() |
152 # still deactivated, but a warning has been emitted ... |
|
153 self.assertEqual(self.execute('Any N WHERE U login "syt", ' |
163 self.assertEqual(self.execute('Any N WHERE U login "syt", ' |
154 'U in_state S, S name N').rows[0][0], |
164 'U in_state S, S name N').rows[0][0], |
155 'deactivated') |
165 'activated') |
156 # test reactivating the user isn't enough to authenticate, as the native source |
166 # test reactivating the user isn't enough to authenticate, as the native source |
157 # refuse to authenticate user from other sources |
167 # refuse to authenticate user from other sources |
158 os.system(deletecmd) |
168 os.system(deletecmd) |
159 self._pull() |
169 self._pull() |
160 user = self.execute('CWUser U WHERE U login "syt"').get_entity(0, 0) |
170 user = self.execute('CWUser U WHERE U login "syt"').get_entity(0, 0) |