mirror of
https://chromium.googlesource.com/chromium/tools/depot_tools.git
synced 2026-01-10 18:21:28 +00:00
Add luci-auth-fido2-plugin
This plugin handles FIDO2 security keys for doing auth and integrates with luci-auth (specifically git-credential-luci). (It's in Python because tl;dr the Python library is much better.) Bug: 433851494 Change-Id: Ib956b614588aad8ad4fda7619bfbae17a670438f Reviewed-on: https://chromium-review.googlesource.com/c/chromium/tools/depot_tools/+/6804585 Reviewed-by: Scott Lee <ddoman@chromium.org> Commit-Queue: Allen Li <ayatane@chromium.org>
This commit is contained in:
1
luci-auth-fido2-plugin
Symbolic link
1
luci-auth-fido2-plugin
Symbolic link
@@ -0,0 +1 @@
|
||||
luci_auth_fido2_plugin.py
|
||||
12
luci-auth-fido2-plugin.bat
Normal file
12
luci-auth-fido2-plugin.bat
Normal file
@@ -0,0 +1,12 @@
|
||||
@echo off
|
||||
:: Copyright 2025 The Chromium Authors. All rights reserved.
|
||||
:: Use of this source code is governed by a BSD-style license that can be
|
||||
:: found in the LICENSE file.
|
||||
setlocal
|
||||
|
||||
:: Ensure that "depot_tools" is somewhere in PATH so this tool can be used
|
||||
:: standalone, but allow other PATH manipulations to take priority.
|
||||
set PATH=%PATH%;%~dp0
|
||||
|
||||
:: Defer control.
|
||||
vpython3 "%~dp0\luci_auth_fido2_plugin.py" %*
|
||||
290
luci_auth_fido2_plugin.py
Executable file
290
luci_auth_fido2_plugin.py
Executable file
@@ -0,0 +1,290 @@
|
||||
#!/usr/bin/env vpython3
|
||||
# Copyright 2025 The ChromiumOS Authors
|
||||
# Use of this source code is governed by a BSD-style license that can be
|
||||
# found in the LICENSE file.
|
||||
|
||||
# [VPYTHON:BEGIN]
|
||||
# python_version: "3.11"
|
||||
# wheel: <
|
||||
# name: "infra/python/wheels/cffi/${vpython_platform}"
|
||||
# version: "version:1.15.1.chromium.2"
|
||||
# >
|
||||
# wheel: <
|
||||
# name: "infra/python/wheels/cryptography/${vpython_platform}"
|
||||
# version: "version:43.0.0"
|
||||
# >
|
||||
# wheel: <
|
||||
# name: "infra/python/wheels/pycparser-py2_py3"
|
||||
# version: "version:2.21"
|
||||
# >
|
||||
# wheel: <
|
||||
# name: "infra/python/wheels/fido2-py3"
|
||||
# version: "version:2.0.0"
|
||||
# >
|
||||
# [VPYTHON:END]
|
||||
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
from contextlib import contextmanager
|
||||
import ctypes
|
||||
import dataclasses
|
||||
import json
|
||||
import logging
|
||||
import signal
|
||||
import sys
|
||||
from threading import Event
|
||||
from typing import BinaryIO
|
||||
|
||||
from fido2.client import DefaultClientDataCollector
|
||||
from fido2.client import Fido2Client, UserInteraction, WebAuthnClient
|
||||
from fido2.hid import CtapHidDevice
|
||||
from fido2.webauthn import AuthenticationResponse
|
||||
from fido2.webauthn import PublicKeyCredentialRequestOptions, UserVerificationRequirement
|
||||
|
||||
try:
|
||||
from fido2.client.windows import WindowsClient
|
||||
except ImportError:
|
||||
WindowsClient = None
|
||||
|
||||
_PLUGIN_ENDIANNESS = 'little'
|
||||
_PLUGIN_HEADER_SIZE = 4
|
||||
|
||||
# Exit codes.
|
||||
_EXIT_NO_FIDO2_DEVICES = 11
|
||||
_EXIT_ALL_ASSERTIONS_FAILED = 12
|
||||
_EXIT_NO_MATCHING_CRED = 13
|
||||
|
||||
|
||||
def read_full(r: BinaryIO, size: int) -> bytes:
|
||||
"""Read an exact amount of data.
|
||||
|
||||
Raises exception on error or EOF.
|
||||
"""
|
||||
b = r.read(size)
|
||||
if len(b) != size:
|
||||
raise EOFError(f"premature EOF when reading {size} bytes from {r}.")
|
||||
return b
|
||||
|
||||
|
||||
def write_full(w: BinaryIO, b: bytes):
|
||||
"""Write all bytes.
|
||||
|
||||
Raises IOError if the write isn't complete.
|
||||
"""
|
||||
written = w.write(b)
|
||||
if written != len(b):
|
||||
raise IOError(
|
||||
f"failed to write fully, wrote {written} bytes out of {_PLUGIN_HEADER_SIZE} bytes."
|
||||
)
|
||||
|
||||
|
||||
def plugin_read(r: BinaryIO) -> bytes:
|
||||
"""Read a framed WebAuthn plugin message.
|
||||
|
||||
A frame consists of: 4 bytes of little endian uint32 length, plus
|
||||
this amount bytes of binary data.
|
||||
"""
|
||||
header = read_full(r, _PLUGIN_HEADER_SIZE)
|
||||
length = int.from_bytes(header, _PLUGIN_ENDIANNESS, signed=False)
|
||||
return read_full(r, length)
|
||||
|
||||
|
||||
def plugin_write(w: BinaryIO, b: bytes):
|
||||
"""Write a framed Webauthn plugin message.
|
||||
|
||||
A frame consists of: 4 bytes of little endian uint32 length, plus
|
||||
this amount bytes of binary data.
|
||||
"""
|
||||
length = len(b)
|
||||
header = length.to_bytes(_PLUGIN_HEADER_SIZE,
|
||||
_PLUGIN_ENDIANNESS,
|
||||
signed=False)
|
||||
write_full(w, header)
|
||||
write_full(w, b)
|
||||
|
||||
|
||||
@dataclasses.dataclass
|
||||
class PluginRequest:
|
||||
origin: str
|
||||
public_key_credential_request: PublicKeyCredentialRequestOptions
|
||||
|
||||
|
||||
def parse_plugin_request(b: bytes) -> PluginRequest:
|
||||
"""Parse a plugin request JSON string."""
|
||||
j = json.loads(b)
|
||||
|
||||
req = PublicKeyCredentialRequestOptions.from_dict(j["requestData"])
|
||||
|
||||
# Apply overrides to certain fields.
|
||||
req = PublicKeyCredentialRequestOptions(
|
||||
challenge=req.challenge,
|
||||
rp_id=req.rp_id,
|
||||
allow_credentials=req.allow_credentials,
|
||||
hints=req.hints,
|
||||
|
||||
# Default to 30s timeout.
|
||||
timeout=req.timeout or 30_000,
|
||||
|
||||
# Discourage UV.
|
||||
#
|
||||
# ReAuth flow is triggered for user who's already logged in, so
|
||||
# there's no need to ask for PIN/password authentication factor.
|
||||
#
|
||||
# Here we only want to test for user presence and ownership of
|
||||
# the private key.
|
||||
user_verification=UserVerificationRequirement.DISCOURAGED,
|
||||
|
||||
# Don't support extensions for now.
|
||||
extensions=None,
|
||||
)
|
||||
|
||||
return PluginRequest(
|
||||
origin=j["origin"],
|
||||
public_key_credential_request=req,
|
||||
)
|
||||
|
||||
|
||||
def encode_plugin_response(a: AuthenticationResponse) -> bytes:
|
||||
"""Encode a plugin response to JSON."""
|
||||
return json.dumps({
|
||||
"type": "getResponse",
|
||||
"responseData": dict(a),
|
||||
"error": None,
|
||||
}).encode('utf-8')
|
||||
|
||||
|
||||
class DiscardInteraction(UserInteraction):
|
||||
"""Handler when user interaction is required.
|
||||
|
||||
This plugin's stdin/stdout talks with git-credential-luci, so we fail
|
||||
actions that require user input (this plugin shouldn't set any flag
|
||||
that require user interaction).
|
||||
"""
|
||||
|
||||
def prompt_up(self):
|
||||
sys.stderr.write("\nTouch your blinking security key to continue.\n\n")
|
||||
|
||||
def request_pin(self, permissions, rp_id):
|
||||
# This plugin shouldn't set assertion flags that will require
|
||||
# PIN entry.
|
||||
return None
|
||||
|
||||
def request_uv(self, permissions, rp_id):
|
||||
# Don't allow user verification (UV), because we don't allow PIN
|
||||
# entry, UV will fail.
|
||||
return False
|
||||
|
||||
|
||||
def get_clients(origin: str) -> list[tuple[WebAuthnClient, str]]:
|
||||
"""Return WebAuthn clients.
|
||||
|
||||
The return value is a list of (WebAuthnClient, client description)
|
||||
where we can send assertion requests to.
|
||||
|
||||
On Windows, this method returns a client that talks with Win32 API
|
||||
if available.
|
||||
"""
|
||||
client_data_collector = DefaultClientDataCollector(origin)
|
||||
|
||||
# Use Windows WebAuthn API if available, and if we aren't running as admin.
|
||||
if WindowsClient is not None \
|
||||
and WindowsClient.is_available():
|
||||
if ctypes.windll.shell32.IsUserAnAdmin():
|
||||
logging.info("User is admin")
|
||||
else:
|
||||
logging.info("Using WindowsClient")
|
||||
return [(WindowsClient(client_data_collector), "WindowsWebAuthn")]
|
||||
|
||||
user_interaction = DiscardInteraction()
|
||||
clients = []
|
||||
for dev in CtapHidDevice.list_devices():
|
||||
desc = dev.descriptor
|
||||
desc_str = (f'CtapHidDevice {desc.product_name}'
|
||||
f' (VID 0x{desc.vid:04x},'
|
||||
f' PID 0x{desc.pid:04x}) at {desc.path}')
|
||||
logging.info("Found %s", desc_str)
|
||||
clients.append((
|
||||
Fido2Client(
|
||||
dev,
|
||||
client_data_collector=client_data_collector,
|
||||
user_interaction=user_interaction,
|
||||
),
|
||||
desc_str,
|
||||
))
|
||||
|
||||
return clients
|
||||
|
||||
|
||||
def assert_on_client(*, client: WebAuthnClient, client_desc: str,
|
||||
request: PublicKeyCredentialRequestOptions, cancel: Event):
|
||||
try:
|
||||
return client.get_assertion(request, cancel)
|
||||
except Exception as e:
|
||||
if not cancel.is_set():
|
||||
logging.error("Assertion failed on %s: %s", client_desc, e)
|
||||
return None
|
||||
|
||||
|
||||
@contextmanager
|
||||
def set_event_on_signal(signum: int, event: Event):
|
||||
"""Return a context manager that sets `event` when `signum` is signaled."""
|
||||
original_handler = signal.getsignal(signum)
|
||||
|
||||
def handler(signum, _):
|
||||
logging.info("Signal %s received.", signal.strsignal(signum))
|
||||
event.set()
|
||||
|
||||
signal.signal(signum, handler)
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
signal.signal(signum, original_handler)
|
||||
|
||||
|
||||
def main():
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
plugin_req = parse_plugin_request(plugin_read(sys.stdin.buffer))
|
||||
|
||||
clients = get_clients(plugin_req.origin)
|
||||
if not clients:
|
||||
logging.error("No available FIDO devices.")
|
||||
sys.exit(_EXIT_NO_FIDO2_DEVICES)
|
||||
|
||||
# Race and retrieve the first successful assertion.
|
||||
outcome = None
|
||||
cancel = Event()
|
||||
with set_event_on_signal(signal.SIGINT, cancel), set_event_on_signal(
|
||||
signal.SIGTERM,
|
||||
cancel), ThreadPoolExecutor(max_workers=len(clients)) as executor:
|
||||
futures = [
|
||||
executor.submit(assert_on_client,
|
||||
client=client,
|
||||
client_desc=desc,
|
||||
request=plugin_req.public_key_credential_request,
|
||||
cancel=cancel) for client, desc in clients
|
||||
]
|
||||
for future in as_completed(futures):
|
||||
if result := future.result():
|
||||
outcome = result
|
||||
cancel.set()
|
||||
break
|
||||
|
||||
if not outcome:
|
||||
logging.error("All assertions failed or timed out.")
|
||||
sys.exit(_EXIT_ALL_ASSERTIONS_FAILED)
|
||||
|
||||
assertions = outcome.get_assertions()
|
||||
if not assertions:
|
||||
logging.error("No matching credential.")
|
||||
sys.exit(_EXIT_NO_MATCHING_CRED)
|
||||
elif len(assertions) > 1:
|
||||
logging.warning(
|
||||
"Multiple assertions returned for rp_id %s, selecting the first one.",
|
||||
plugin_req.public_key_credential_request.rp_id)
|
||||
|
||||
# Write the first completed assertion.
|
||||
plugin_write(sys.stdout.buffer,
|
||||
encode_plugin_response(outcome.get_response(0)))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
69
tests/luci_auth_fido2_plugin_test.py
Executable file
69
tests/luci_auth_fido2_plugin_test.py
Executable file
@@ -0,0 +1,69 @@
|
||||
#!/usr/bin/env vpython3
|
||||
# coding=utf-8
|
||||
# Copyright (c) 2012 The Chromium Authors. All rights reserved.
|
||||
# Use of this source code is governed by a BSD-style license that can be
|
||||
# found in the LICENSE file.
|
||||
|
||||
# [VPYTHON:BEGIN]
|
||||
# python_version: "3.11"
|
||||
# wheel: <
|
||||
# name: "infra/python/wheels/cffi/${vpython_platform}"
|
||||
# version: "version:1.15.1.chromium.2"
|
||||
# >
|
||||
# wheel: <
|
||||
# name: "infra/python/wheels/cryptography/${vpython_platform}"
|
||||
# version: "version:43.0.0"
|
||||
# >
|
||||
# wheel: <
|
||||
# name: "infra/python/wheels/pycparser-py2_py3"
|
||||
# version: "version:2.21"
|
||||
# >
|
||||
# wheel: <
|
||||
# name: "infra/python/wheels/fido2-py3"
|
||||
# version: "version:2.0.0"
|
||||
# >
|
||||
# [VPYTHON:END]
|
||||
"""Unit tests for luci_auth_fido2_plugin.py."""
|
||||
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
import unittest
|
||||
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
from fido2.webauthn import PublicKeyCredentialDescriptor
|
||||
from fido2.webauthn import PublicKeyCredentialRequestOptions
|
||||
from fido2.webauthn import PublicKeyCredentialType
|
||||
from fido2.webauthn import UserVerificationRequirement
|
||||
|
||||
import luci_auth_fido2_plugin as plugin
|
||||
|
||||
|
||||
class TestFido2Plugin(unittest.TestCase):
|
||||
|
||||
def test_parse_plugin_request(self):
|
||||
req = b'{"type":"get","origin":"https://accounts.google.com","requestData":{"rpId":"google.com","challenge":"alice-==","timeout":30000,"allowCredentials":[{"type":"public-key","id":"key="}],"userVerification":"preferred","extensions":{"appid":"google.com"}}}'
|
||||
got = plugin.parse_plugin_request(req)
|
||||
want = plugin.PluginRequest(
|
||||
origin='https://accounts.google.com',
|
||||
public_key_credential_request=PublicKeyCredentialRequestOptions(
|
||||
challenge=b'jX\x9c{',
|
||||
timeout=30_000,
|
||||
rp_id='google.com',
|
||||
allow_credentials=[
|
||||
PublicKeyCredentialDescriptor(
|
||||
type=PublicKeyCredentialType.PUBLIC_KEY,
|
||||
id=b'\x91\xec',
|
||||
)
|
||||
],
|
||||
user_verification=UserVerificationRequirement.DISCOURAGED,
|
||||
),
|
||||
)
|
||||
self.assertEqual(got, want)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
logging.basicConfig(
|
||||
level=logging.DEBUG if '-v' in sys.argv else logging.ERROR)
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user