[repo] Drop repo.new_session method
we should not go through Session to create Connection anymore.
This is unofficial API, no backward compat for now.
--- a/cubicweb/repoapi.py Tue Mar 14 11:07:58 2017 +0100
+++ b/cubicweb/repoapi.py Fri Mar 10 18:04:25 2017 +0100
@@ -45,8 +45,13 @@
def connect(repo, login, **kwargs):
"""Take credential and return associated Connection.
- raise AuthenticationError if the credential are invalid."""
- return repo.new_session(login, **kwargs).new_cnx()
+ raise AuthenticationError if the credential are invalid.
+ """
+ # use an internal connection to try to get a user object
+ with repo.internal_cnx() as cnx:
+ user = repo.authenticate_user(cnx, login, **kwargs)
+ return Connection(repo, user)
+
def anonymous_cnx(repo):
"""return a Connection for Anonymous user.
--- a/cubicweb/server/repository.py Tue Mar 14 11:07:58 2017 +0100
+++ b/cubicweb/server/repository.py Fri Mar 10 18:04:25 2017 +0100
@@ -642,22 +642,6 @@
query_attrs)
return rset.rows
- def new_session(self, login, **kwargs):
- """open a *new* session for a given user
-
- raise `AuthenticationError` if the authentication failed
- raise `ConnectionError` if we can't open a connection
- """
- # use an internal connection
- with self.internal_cnx() as cnx:
- # try to get a user object
- user = self.authenticate_user(cnx, login, **kwargs)
- session = Session(user, self)
- user._cw = user.cw_rset.req = session
- user.cw_clear_relation_cache()
- self.info('opened session %s for user %s', session, login)
- return session
-
# session handling ########################################################
@contextmanager
--- a/cubicweb/server/test/unittest_ldapsource.py Tue Mar 14 11:07:58 2017 +0100
+++ b/cubicweb/server/test/unittest_ldapsource.py Fri Mar 10 18:04:25 2017 +0100
@@ -326,6 +326,7 @@
def test_a_filter_inactivate(self):
""" filtered out people should be deactivated, unable to authenticate """
+ repo_source = self.repo.sources_by_uri['ldap']
with self.admin_access.repo_cnx() as cnx:
source = cnx.execute('CWSource S WHERE S type="ldapfeed"').get_entity(0, 0)
config = source.repo_source.check_config(source)
@@ -335,7 +336,8 @@
cnx.commit()
with self.repo.internal_cnx() as cnx:
self.pull(cnx)
- self.assertRaises(AuthenticationError, self.repo.new_session, 'syt', password='syt')
+ self.assertRaises(AuthenticationError,
+ repo_source.authenticate, cnx, 'syt', 'syt')
with self.admin_access.repo_cnx() as cnx:
self.assertEqual(cnx.execute('Any N WHERE U login "syt", '
'U in_state S, S name N').rows[0][0],
@@ -364,7 +366,9 @@
self.delete_ldap_entry('uid=syt,ou=People,dc=cubicweb,dc=test')
with self.repo.internal_cnx() as cnx:
self.pull(cnx)
- self.assertRaises(AuthenticationError, self.repo.new_session, 'syt', password='syt')
+ source = self.repo.sources_by_uri['ldap']
+ self.assertRaises(AuthenticationError,
+ source.authenticate, cnx, 'syt', 'syt')
with self.admin_access.repo_cnx() as cnx:
self.assertEqual(cnx.execute('Any N WHERE U login "syt", '
'U in_state S, S name N').rows[0][0],
@@ -401,6 +405,7 @@
# test reactivating BY HAND the user isn't enough to
# authenticate, as the native source refuse to authenticate
# user from other sources
+ repo_source = self.repo.sources_by_uri['ldap']
self.delete_ldap_entry('uid=syt,ou=People,dc=cubicweb,dc=test')
with self.repo.internal_cnx() as cnx:
self.pull(cnx)
@@ -409,15 +414,16 @@
user = cnx.execute('CWUser U WHERE U login "syt"').get_entity(0, 0)
user.cw_adapt_to('IWorkflowable').fire_transition('activate')
cnx.commit()
- with self.assertRaises(AuthenticationError):
- self.repo.new_session('syt', password='syt')
+ self.assertRaises(AuthenticationError,
+ repo_source.authenticate, cnx, 'syt', 'syt')
# ok now let's try to make it a system user
cnx.execute('SET X cw_source S WHERE X eid %(x)s, S name "system"', {'x': user.eid})
cnx.commit()
- # and that we can now authenticate again
- self.assertRaises(AuthenticationError, self.repo.new_session, 'syt', password='toto')
- self.assertTrue(self.repo.new_session('syt', password='syt'))
+ # and that we can now authenticate again
+ self.assertRaises(AuthenticationError,
+ repo_source.authenticate, cnx, 'syt', 'toto')
+ self.assertTrue(self.repo.authenticate_user(cnx, 'syt', password='syt'))
class LDAPFeedGroupTC(LDAPFeedTestBase):
--- a/cubicweb/server/test/unittest_repository.py Tue Mar 14 11:07:58 2017 +0100
+++ b/cubicweb/server/test/unittest_repository.py Fri Mar 10 18:04:25 2017 +0100
@@ -77,19 +77,21 @@
self.assertFalse(cnx.execute('Any X WHERE NOT X cw_source S'))
def test_connect(self):
- self.assertTrue(self.repo.new_session(self.admlogin, password=self.admpassword))
- self.assertRaises(AuthenticationError,
- self.repo.new_session, self.admlogin, password='nimportnawak')
- self.assertRaises(AuthenticationError,
- self.repo.new_session, self.admlogin, password='')
- self.assertRaises(AuthenticationError,
- self.repo.new_session, self.admlogin, password=None)
- self.assertRaises(AuthenticationError,
- self.repo.new_session, None, password=None)
- self.assertRaises(AuthenticationError,
- self.repo.new_session, self.admlogin)
- self.assertRaises(AuthenticationError,
- self.repo.new_session, None)
+ with self.repo.internal_cnx() as cnx:
+ self.assertTrue(
+ self.repo.authenticate_user(cnx, self.admlogin, password=self.admpassword))
+ self.assertRaises(AuthenticationError, self.repo.authenticate_user,
+ cnx, self.admlogin, password='nimportnawak')
+ self.assertRaises(AuthenticationError, self.repo.authenticate_user,
+ cnx, self.admlogin, password='')
+ self.assertRaises(AuthenticationError, self.repo.authenticate_user,
+ cnx, self.admlogin, password=None)
+ self.assertRaises(AuthenticationError, self.repo.authenticate_user,
+ cnx, None, password=None)
+ self.assertRaises(AuthenticationError, self.repo.authenticate_user,
+ cnx, self.admlogin)
+ self.assertRaises(AuthenticationError, self.repo.authenticate_user,
+ cnx, None)
def test_login_upassword_accent(self):
with self.admin_access.repo_cnx() as cnx:
@@ -97,8 +99,8 @@
'X in_group G WHERE G name "users"',
{'login': u"barnabé", 'passwd': u"héhéhé".encode('UTF8')})
cnx.commit()
- repo = self.repo
- self.assertTrue(repo.new_session(u"barnabé", password=u"héhéhé".encode('UTF8')))
+ repo = self.repo
+ self.assertTrue(repo.authenticate_user(cnx, u"barnabé", password=u"héhéhé".encode('UTF8')))
def test_rollback_on_execute_validation_error(self):
class ValidationErrorAfterHook(Hook):
@@ -181,12 +183,6 @@
ownedby = schema.rschema('owned_by')
self.assertEqual(ownedby.objects('CWEType'), ('CWUser',))
- def test_internal_api(self):
- repo = self.repo
- session = repo.new_session(self.admlogin, password=self.admpassword)
- with session.new_cnx() as cnx:
- self.assertEqual(repo.type_from_eid(2, cnx), 'CWGroup')
-
def test_public_api(self):
self.assertEqual(self.repo.get_schema(), self.repo.schema)
self.assertEqual(self.repo.source_defs(), {'system': {'type': 'native',
--- a/cubicweb/server/test/unittest_security.py Tue Mar 14 11:07:58 2017 +0100
+++ b/cubicweb/server/test/unittest_security.py Fri Mar 10 18:04:25 2017 +0100
@@ -85,13 +85,13 @@
oldhash = cnx.system_sql("SELECT cw_upassword FROM cw_CWUser "
"WHERE cw_login = 'oldpassword'").fetchone()[0]
oldhash = self.repo.system_source.binary_to_str(oldhash)
- session = self.repo.new_session('oldpassword', password='oldpassword')
+ self.repo.authenticate_user(cnx, 'oldpassword', password='oldpassword')
newhash = cnx.system_sql("SELECT cw_upassword FROM cw_CWUser "
"WHERE cw_login = 'oldpassword'").fetchone()[0]
newhash = self.repo.system_source.binary_to_str(newhash)
self.assertNotEqual(oldhash, newhash)
self.assertTrue(newhash.startswith(b'$6$'))
- session = self.repo.new_session('oldpassword', password='oldpassword')
+ self.repo.authenticate_user(cnx, 'oldpassword', password='oldpassword')
newnewhash = cnx.system_sql("SELECT cw_upassword FROM cw_CWUser WHERE "
"cw_login = 'oldpassword'").fetchone()[0]
newnewhash = self.repo.system_source.binary_to_str(newnewhash)
@@ -303,7 +303,8 @@
cnx.execute('SET X upassword %(passwd)s WHERE X eid %(x)s',
{'x': ueid, 'passwd': b'newpwd'})
cnx.commit()
- self.repo.new_session('user', password='newpwd')
+ with self.repo.internal_cnx() as cnx:
+ self.repo.authenticate_user(cnx, 'user', password='newpwd')
def test_user_cant_change_other_upassword(self):
with self.admin_access.repo_cnx() as cnx:
--- a/cubicweb/utils.py Tue Mar 14 11:07:58 2017 +0100
+++ b/cubicweb/utils.py Fri Mar 10 18:04:25 2017 +0100
@@ -51,6 +51,7 @@
random.seed()
def admincnx(appid):
+ from cubicweb import repoapi
from cubicweb.cwconfig import CubicWebConfiguration
from cubicweb.server.repository import Repository
config = CubicWebConfiguration.config_for(appid)
@@ -60,8 +61,7 @@
repo = Repository(config)
repo.bootstrap()
- session = repo.new_session(login, password=password)
- return session.new_cnx()
+ return repoapi.connect(repo, login, password=password)
def make_uid(key=None):