mirror of
https://chromium.googlesource.com/chromium/tools/depot_tools.git
synced 2026-01-11 02:31:29 +00:00
This CL adds pinentry support to ask for security key PIN. Security key PINs may be required (by the manufacturer, or configured by the user) to perform FIDO2 assertions. PIN entry is done by calling pinentry command (or overridden by LUCI_AUTH_PINENTRY environment variable), which we'll ask users to install during onboarding. Bug: 448235795 Change-Id: Ie87389330668dc5eaf8214699defec094757ca9e Reviewed-on: https://chromium-review.googlesource.com/c/chromium/tools/depot_tools/+/7004844 Reviewed-by: Jiewei Qian <qjw@chromium.org> Commit-Queue: Jiewei Qian <qjw@chromium.org> Reviewed-by: Chenlin Fan <fancl@chromium.org>
391 lines
12 KiB
Python
Executable File
391 lines
12 KiB
Python
Executable File
#!/usr/bin/env vpython3
|
|
# Copyright 2025 The ChromiumOS Authors
|
|
# Use of this source code is governed by a BSD-style license that can be
|
|
# found in the LICENSE file.
|
|
|
|
# [VPYTHON:BEGIN]
|
|
# python_version: "3.11"
|
|
# wheel: <
|
|
# name: "infra/python/wheels/cffi/${vpython_platform}"
|
|
# version: "version:1.15.1.chromium.2"
|
|
# >
|
|
# wheel: <
|
|
# name: "infra/python/wheels/cryptography/${vpython_platform}"
|
|
# version: "version:43.0.0"
|
|
# >
|
|
# wheel: <
|
|
# name: "infra/python/wheels/pycparser-py2_py3"
|
|
# version: "version:2.21"
|
|
# >
|
|
# wheel: <
|
|
# name: "infra/python/wheels/fido2-py3"
|
|
# version: "version:2.0.0"
|
|
# >
|
|
# [VPYTHON:END]
|
|
|
|
import argparse
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
from contextlib import contextmanager
|
|
import dataclasses
|
|
import json
|
|
import logging
|
|
import os
|
|
import signal
|
|
import subprocess
|
|
import sys
|
|
from threading import Event
|
|
import traceback
|
|
from typing import BinaryIO, Optional
|
|
|
|
from fido2.client import DefaultClientDataCollector
|
|
from fido2.client import Fido2Client, UserInteraction, WebAuthnClient
|
|
from fido2.hid import CtapHidDevice
|
|
from fido2.webauthn import AuthenticationResponse
|
|
from fido2.webauthn import PublicKeyCredentialRequestOptions, UserVerificationRequirement
|
|
|
|
try:
|
|
from fido2.client.windows import WindowsClient
|
|
except ImportError:
|
|
WindowsClient = None
|
|
|
|
_PLUGIN_ENDIANNESS = 'little'
|
|
_PLUGIN_HEADER_SIZE = 4
|
|
|
|
# Exit codes.
|
|
_EXIT_NO_FIDO2_DEVICES = 11
|
|
_EXIT_ALL_ASSERTIONS_FAILED = 12
|
|
_EXIT_NO_MATCHING_CRED = 13
|
|
_EXIT_PINENTRY_FAILED = 14
|
|
|
|
def read_full(r: BinaryIO, size: int) -> bytes:
|
|
"""Read an exact amount of data.
|
|
|
|
Raises exception on error or EOF.
|
|
"""
|
|
b = r.read(size)
|
|
if len(b) != size:
|
|
raise EOFError(f"premature EOF when reading {size} bytes from {r}.")
|
|
return b
|
|
|
|
|
|
def write_full(w: BinaryIO, b: bytes):
|
|
"""Write all bytes.
|
|
|
|
Raises IOError if the write isn't complete.
|
|
"""
|
|
written = w.write(b)
|
|
if written != len(b):
|
|
raise IOError(
|
|
f"failed to write fully, wrote {written} bytes out of {_PLUGIN_HEADER_SIZE} bytes."
|
|
)
|
|
|
|
|
|
def plugin_read(r: BinaryIO) -> bytes:
|
|
"""Read a framed WebAuthn plugin message.
|
|
|
|
A frame consists of: 4 bytes of little endian uint32 length, plus
|
|
this amount bytes of binary data.
|
|
"""
|
|
header = read_full(r, _PLUGIN_HEADER_SIZE)
|
|
length = int.from_bytes(header, _PLUGIN_ENDIANNESS, signed=False)
|
|
return read_full(r, length)
|
|
|
|
|
|
def plugin_write(w: BinaryIO, b: bytes):
|
|
"""Write a framed Webauthn plugin message.
|
|
|
|
A frame consists of: 4 bytes of little endian uint32 length, plus
|
|
this amount bytes of binary data.
|
|
"""
|
|
length = len(b)
|
|
header = length.to_bytes(_PLUGIN_HEADER_SIZE,
|
|
_PLUGIN_ENDIANNESS,
|
|
signed=False)
|
|
write_full(w, header)
|
|
write_full(w, b)
|
|
|
|
|
|
@dataclasses.dataclass
|
|
class PluginRequest:
|
|
origin: str
|
|
public_key_credential_request: PublicKeyCredentialRequestOptions
|
|
|
|
|
|
def parse_plugin_request(b: bytes) -> PluginRequest:
|
|
"""Parse a plugin request JSON string."""
|
|
j = json.loads(b)
|
|
|
|
req = PublicKeyCredentialRequestOptions.from_dict(j["requestData"])
|
|
|
|
# Apply overrides to certain fields.
|
|
req = PublicKeyCredentialRequestOptions(
|
|
challenge=req.challenge,
|
|
rp_id=req.rp_id,
|
|
allow_credentials=req.allow_credentials,
|
|
hints=req.hints,
|
|
|
|
# Default to 30s timeout.
|
|
timeout=req.timeout or 30_000,
|
|
|
|
# Discourage UV.
|
|
#
|
|
# ReAuth flow is triggered for user who's already logged in, so
|
|
# there's no need to ask for PIN/password authentication factor.
|
|
#
|
|
# Here we only want to test for user presence and ownership of
|
|
# the private key.
|
|
user_verification=UserVerificationRequirement.DISCOURAGED,
|
|
|
|
# Don't support extensions for now.
|
|
extensions=None,
|
|
)
|
|
|
|
return PluginRequest(
|
|
origin=j["origin"],
|
|
public_key_credential_request=req,
|
|
)
|
|
|
|
|
|
def encode_plugin_response(a: AuthenticationResponse) -> bytes:
|
|
"""Encode a plugin response to JSON."""
|
|
return json.dumps({
|
|
"type": "getResponse",
|
|
"responseData": dict(a),
|
|
"error": None,
|
|
}).encode('utf-8')
|
|
|
|
|
|
def request_pin_pinentry() -> Optional[str]:
|
|
"""Requests a PIN entry with pinentry program."""
|
|
try:
|
|
# Using pinentry to ask for PIN.
|
|
# https://www.gnupg.org/documentation/manuals/assuan/Client-requests.html
|
|
pinentry_path = os.environ.get('LUCI_AUTH_PINENTRY', 'pinentry')
|
|
proc = subprocess.Popen([pinentry_path],
|
|
stdin=subprocess.PIPE,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
text=True)
|
|
except FileNotFoundError as e:
|
|
traceback.print_exception(e, file=sys.stderr)
|
|
logging.error("PIN requested, but can't find a pinentry program.")
|
|
logging.error(
|
|
"Please install a suitable pinentry program for your operating system."
|
|
)
|
|
sys.exit(_EXIT_PINENTRY_FAILED)
|
|
|
|
pinentry_input = """
|
|
OPTION ttyname=/dev/tty
|
|
SETTITLE Chromium Infra Auth
|
|
SETDESC Enter the FIDO2 PIN for your security key, then touch your security key to continue.
|
|
SETPROMPT PIN:
|
|
GETPIN
|
|
"""
|
|
stdout, stderr = proc.communicate(pinentry_input)
|
|
|
|
if proc.returncode != 0:
|
|
logging.error('pinentry failed: %s', stderr)
|
|
sys.exit(_EXIT_PINENTRY_FAILED)
|
|
|
|
for line in stdout.splitlines():
|
|
if line.startswith('D '):
|
|
return line[2:].strip()
|
|
|
|
logging.warning(
|
|
'An empty PIN was entered. Security key assertion may fail.')
|
|
return None
|
|
|
|
|
|
def request_pin_mac() -> Optional[str]:
|
|
"""Request a PIN entry with macOS's built-in `osascript` utility."""
|
|
osascript_command = (
|
|
'text returned of ('
|
|
'display'
|
|
' dialog "Enter security key PIN.\\n\\nThen touch your security key to continue."'
|
|
' default answer ""'
|
|
' with hidden answer'
|
|
' with title "Chromium Infra Auth"'
|
|
')')
|
|
|
|
result = subprocess.run(
|
|
['osascript', '-e', osascript_command],
|
|
capture_output=True,
|
|
text=True,
|
|
)
|
|
|
|
if result.returncode != 0:
|
|
logging.error("PIN entry failed: %s", result.stderr)
|
|
return None
|
|
|
|
return result.stdout.strip()
|
|
|
|
|
|
class PinEntryInteraction(UserInteraction):
|
|
"""Handler when user interaction is required.
|
|
|
|
This plugin's stdin/stdout talks with git-credential-luci, so we fail
|
|
actions that require user input (this plugin shouldn't set any flag
|
|
that require user interaction).
|
|
"""
|
|
|
|
def prompt_up(self):
|
|
sys.stderr.write("\nTouch your blinking security key to continue.\n\n")
|
|
|
|
def request_pin(self, permissions, rp_id):
|
|
"""Ask for PIN entry with a GUI dialog by using a system tool.
|
|
|
|
We only handle Linux and MacOS here. We use Windows WebAuthn API
|
|
directly, which handles PIN entry if necessary.
|
|
"""
|
|
if sys.platform == 'darwin':
|
|
return request_pin_mac()
|
|
return request_pin_pinentry()
|
|
|
|
def request_uv(self, permissions, rp_id):
|
|
# Allows PIN entry.
|
|
return True
|
|
|
|
|
|
def get_clients(origin: str) -> list[tuple[WebAuthnClient, str]]:
|
|
"""Return WebAuthn clients.
|
|
|
|
The return value is a list of (WebAuthnClient, client description)
|
|
where we can send assertion requests to.
|
|
|
|
On Windows, this method returns a client that talks with Win32 API
|
|
if available.
|
|
"""
|
|
client_data_collector = DefaultClientDataCollector(origin)
|
|
|
|
# Use Windows WebAuthn API if available.
|
|
if WindowsClient and WindowsClient.is_available():
|
|
logging.debug("Using WindowsClient")
|
|
return [(WindowsClient(client_data_collector), "WindowsWebAuthn")]
|
|
|
|
user_interaction = PinEntryInteraction()
|
|
clients = []
|
|
for dev in CtapHidDevice.list_devices():
|
|
desc = dev.descriptor
|
|
desc_str = (f'CtapHidDevice {desc.product_name}'
|
|
f' (VID 0x{desc.vid:04x},'
|
|
f' PID 0x{desc.pid:04x}) at {desc.path}')
|
|
logging.debug("Found %s", desc_str)
|
|
clients.append((
|
|
Fido2Client(
|
|
dev,
|
|
client_data_collector=client_data_collector,
|
|
user_interaction=user_interaction,
|
|
),
|
|
desc_str,
|
|
))
|
|
|
|
return clients
|
|
|
|
|
|
def assert_on_client(*, client: WebAuthnClient, client_desc: str,
|
|
request: PublicKeyCredentialRequestOptions, cancel: Event):
|
|
try:
|
|
return client.get_assertion(request, cancel)
|
|
except Exception as e:
|
|
if not cancel.is_set():
|
|
logging.error("Assertion failed on %s: %s", client_desc, e)
|
|
return None
|
|
|
|
|
|
@contextmanager
|
|
def set_event_on_signal(signum: int, event: Event):
|
|
"""Return a context manager that sets `event` when `signum` is signaled."""
|
|
original_handler = signal.getsignal(signum)
|
|
|
|
def handler(signum, _):
|
|
logging.info("Signal %s received.", signal.strsignal(signum))
|
|
event.set()
|
|
|
|
signal.signal(signum, handler)
|
|
try:
|
|
yield
|
|
finally:
|
|
signal.signal(signum, original_handler)
|
|
|
|
|
|
def get_parser() -> argparse.ArgumentParser:
|
|
parser = argparse.ArgumentParser(
|
|
description=
|
|
"A LUCI Auth plugin to perform FIDO2 security key assertions", )
|
|
parser.add_argument(
|
|
"-l",
|
|
"--list-devices",
|
|
action="store_true",
|
|
default=False,
|
|
help=
|
|
"If set, detects FIDO devices, then print their information to stderr, "
|
|
"then exit this program. Useful for troubleshoot udev rules and "
|
|
"permission issues on Linux.")
|
|
return parser
|
|
|
|
|
|
def main():
|
|
args = get_parser().parse_args()
|
|
|
|
logging.basicConfig(level=logging.INFO)
|
|
|
|
# If requested, probe FIDO devices, print their information, then exit.
|
|
if args.list_devices:
|
|
# A "stub" origin to satisfy Fido2Client constructor.
|
|
stub_origin = "chromium.org"
|
|
clients = get_clients(stub_origin)
|
|
if clients:
|
|
logging.info("Found the following FIDO devices:")
|
|
for _, client_desc in clients:
|
|
logging.info(" * %s", client_desc)
|
|
else:
|
|
logging.info("No available FIDO device.")
|
|
sys.exit(0)
|
|
|
|
plugin_req = parse_plugin_request(plugin_read(sys.stdin.buffer))
|
|
|
|
clients = get_clients(plugin_req.origin)
|
|
if not clients:
|
|
logging.error("No available FIDO device.")
|
|
sys.exit(_EXIT_NO_FIDO2_DEVICES)
|
|
|
|
# Race and retrieve the first successful assertion.
|
|
outcome = None
|
|
cancel = Event()
|
|
with set_event_on_signal(signal.SIGINT, cancel), set_event_on_signal(
|
|
signal.SIGTERM,
|
|
cancel), ThreadPoolExecutor(max_workers=len(clients)) as executor:
|
|
futures = [
|
|
executor.submit(assert_on_client,
|
|
client=client,
|
|
client_desc=desc,
|
|
request=plugin_req.public_key_credential_request,
|
|
cancel=cancel) for client, desc in clients
|
|
]
|
|
for future in as_completed(futures):
|
|
if result := future.result():
|
|
outcome = result
|
|
cancel.set()
|
|
break
|
|
|
|
if not outcome:
|
|
logging.error("All assertions failed or timed out.")
|
|
sys.exit(_EXIT_ALL_ASSERTIONS_FAILED)
|
|
|
|
assertions = outcome.get_assertions()
|
|
if not assertions:
|
|
logging.error("No matching credential.")
|
|
sys.exit(_EXIT_NO_MATCHING_CRED)
|
|
elif len(assertions) > 1:
|
|
logging.warning(
|
|
"Multiple assertions returned for rp_id %s, selecting the first one.",
|
|
plugin_req.public_key_credential_request.rp_id)
|
|
|
|
# Write the first completed assertion.
|
|
plugin_write(sys.stdout.buffer,
|
|
encode_plugin_response(outcome.get_response(0)))
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|