reauth: add reauth capabilities to HttpConn

This CL makes it possible to create HttpConn RPC calls with ReAuth
credential.

Callers can specify whether they want to fail when ReAuth token isn't
available, or fallback to access token.

Bug: 442666611
Change-Id: I748a01634eea8ebe321779c8ae4f52d7beed3407
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/tools/depot_tools/+/6938952
Commit-Queue: Jiewei Qian <qjw@chromium.org>
Reviewed-by: Scott Lee <ddoman@chromium.org>
Reviewed-by: Allen Li <ayatane@chromium.org>
This commit is contained in:
Jiewei Qian
2025-09-16 00:32:10 -07:00
committed by LUCI CQ
parent 024aacb38b
commit 9d7425377e
2 changed files with 153 additions and 24 deletions

View File

@@ -24,6 +24,7 @@ sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import scm_mock
import gerrit_util
import auth
import metrics
import scm
import subprocess2
@@ -119,30 +120,33 @@ class CookiesAuthenticatorTest(unittest.TestCase):
self.addCleanup(mock.patch.stopall)
self.maxDiff = None
def assertAuthenticatedConnAuth(self,
auth: gerrit_util.CookiesAuthenticator,
host: str, expected: str):
def assertAuthenticatedConnAuth(
self, cookieAuth: gerrit_util.CookiesAuthenticator, host: str,
expected: str):
conn = makeConn(host)
auth.authenticate(conn)
cookieAuth.authenticate(conn)
self.assertEqual(conn.req_headers['Authorization'], expected)
def testGetNewPasswordUrl(self):
auth = gerrit_util.CookiesAuthenticator()
self.assertEqual('https://chromium.googlesource.com/new-password',
auth.get_new_password_url('chromium.googlesource.com'))
cookieAuth = gerrit_util.CookiesAuthenticator()
self.assertEqual(
'https://chromium.googlesource.com/new-password',
cookieAuth.get_new_password_url('chromium.googlesource.com'))
self.assertEqual(
'https://chrome-internal.googlesource.com/new-password',
auth.get_new_password_url(
cookieAuth.get_new_password_url(
'chrome-internal-review.googlesource.com'))
def testGetNewPasswordMessage(self):
auth = gerrit_util.CookiesAuthenticator()
cookieAuth = gerrit_util.CookiesAuthenticator()
self.assertIn(
'https://chromium.googlesource.com/new-password',
auth._get_new_password_message('chromium-review.googlesource.com'))
cookieAuth._get_new_password_message(
'chromium-review.googlesource.com'))
self.assertIn(
'https://chrome-internal.googlesource.com/new-password',
auth._get_new_password_message('chrome-internal.googlesource.com'))
cookieAuth._get_new_password_message(
'chrome-internal.googlesource.com'))
def testGetGitcookiesPath(self):
self.assertEqual(
@@ -161,9 +165,9 @@ class CookiesAuthenticatorTest(unittest.TestCase):
os.getenv.assert_called_with('GIT_COOKIES_PATH')
def testGitcookies(self):
auth = gerrit_util.CookiesAuthenticator()
cookieAuth = gerrit_util.CookiesAuthenticator()
self.assertEqual(
auth.gitcookies, {
cookieAuth.gitcookies, {
'chromium.googlesource.com':
('git-user.chromium.org', '1/chromium-secret'),
'chromium-review.googlesource.com':
@@ -175,23 +179,24 @@ class CookiesAuthenticatorTest(unittest.TestCase):
expected_chromium_header = (
'Basic Z2l0LXVzZXIuY2hyb21pdW0ub3JnOjEvY2hyb21pdW0tc2VjcmV0')
auth = gerrit_util.CookiesAuthenticator()
self.assertAuthenticatedConnAuth(auth, 'chromium.googlesource.com',
cookieAuth = gerrit_util.CookiesAuthenticator()
self.assertAuthenticatedConnAuth(cookieAuth,
'chromium.googlesource.com',
expected_chromium_header)
self.assertAuthenticatedConnAuth(auth,
self.assertAuthenticatedConnAuth(cookieAuth,
'chromium-review.googlesource.com',
expected_chromium_header)
self.assertAuthenticatedConnAuth(auth, 'some-review.example.com',
self.assertAuthenticatedConnAuth(cookieAuth, 'some-review.example.com',
'Bearer example-bearer-token')
def testGetAuthEmail(self):
auth = gerrit_util.CookiesAuthenticator()
cookieAuth = gerrit_util.CookiesAuthenticator()
self.assertEqual('user@chromium.org',
auth.get_auth_email('chromium.googlesource.com'))
cookieAuth.get_auth_email('chromium.googlesource.com'))
self.assertEqual(
'user@chromium.org',
auth.get_auth_email('chromium-review.googlesource.com'))
self.assertIsNone(auth.get_auth_email('some-review.example.com'))
cookieAuth.get_auth_email('chromium-review.googlesource.com'))
self.assertIsNone(cookieAuth.get_auth_email('some-review.example.com'))
class GceAuthenticatorTest(unittest.TestCase):
@@ -382,6 +387,53 @@ class GerritUtilTest(unittest.TestCase):
'body': '{"d": {"k": "v"}, "l": [1, 2, 3]}',
}, conn.req_params)
@mock.patch('auth.GerritAuthenticator.get_authorization_header')
@mock.patch('gerrit_util._Authenticator.get')
def testCreateHttpConn_ReAuth(self, mockAuth, getAuthHeader):
mockAuth.return_value = gerrit_util.GitCredsAuthenticator()
getAuthHeader.return_value = "BearerReAuth cafe"
reauth_context = auth.ReAuthContext(host="chromium",
project="infra/infra")
gerrit_util.CreateHttpConn('host.example.com',
'foo/bar',
reauth_context=reauth_context)
getAuthHeader.assert_called_with(reauth_context)
@mock.patch('auth.GerritAuthenticator.get_access_token')
@mock.patch('auth.GerritAuthenticator.get_authorization_header')
@mock.patch('gerrit_util._Authenticator.get')
def testCreateHttpConn_ReAuthOptional(self, mockAuth, getAuthHeader,
getAccessToken):
mockAuth.return_value = gerrit_util.GitCredsAuthenticator()
getAuthHeader.side_effect = auth.GitReAuthRequiredError()
getAccessToken.return_value = "decafe"
reauth_context = auth.ReAuthContext(host="chromium",
project="infra/infra")
gerrit_util.CreateHttpConn('host.example.com',
'foo/bar',
reauth_context=reauth_context,
reauth_is_optional=True)
getAuthHeader.assert_called_with(reauth_context)
getAccessToken.assert_called()
@mock.patch('auth.GerritAuthenticator.get_access_token')
@mock.patch('auth.GerritAuthenticator.get_authorization_header')
@mock.patch('gerrit_util._Authenticator.get')
def testCreateHttpConn_ReAuthRequired(self, mockAuth, getAuthHeader,
getAccessToken):
mockAuth.return_value = gerrit_util.GitCredsAuthenticator()
getAuthHeader.side_effect = auth.GitReAuthRequiredError()
getAccessToken.return_value = "decafe"
reauth_context = auth.ReAuthContext(host="chromium",
project="infra/infra")
with self.assertRaises(auth.GitReAuthRequiredError):
gerrit_util.CreateHttpConn('host.example.com',
'foo/bar',
reauth_context=reauth_context,
reauth_is_optional=False)
getAuthHeader.assert_called_with(reauth_context)
getAccessToken.assert_not_called()
def testReadHttpResponse_200(self):
conn = mock.Mock()
conn.req_params = {'uri': 'uri', 'method': 'method'}
@@ -704,6 +756,14 @@ class SSOAuthenticatorTest(unittest.TestCase):
with self.assertRaises(subprocess.TimeoutExpired):
self.sso._get_sso_info()
@mock.patch('gerrit_util.SSOAuthenticator.authenticate')
def testAttemptAuthenticateWithReAuth(self, mockAuthenticate):
conn = makeConn("chromium")
reauth_context = auth.ReAuthContext(host="chromium",
project="infra/infra")
out = self.sso.attempt_authenticate_with_reauth(conn, reauth_context)
mockAuthenticate.assert_called()
self.assertTrue(out)
class SSOHelperTest(unittest.TestCase):