diff --git a/luci-auth-fido2-plugin b/luci-auth-fido2-plugin new file mode 120000 index 0000000000..05b52fafb2 --- /dev/null +++ b/luci-auth-fido2-plugin @@ -0,0 +1 @@ +luci_auth_fido2_plugin.py \ No newline at end of file diff --git a/luci-auth-fido2-plugin.bat b/luci-auth-fido2-plugin.bat new file mode 100644 index 0000000000..425d5848c4 --- /dev/null +++ b/luci-auth-fido2-plugin.bat @@ -0,0 +1,12 @@ +@echo off +:: Copyright 2025 The Chromium Authors. All rights reserved. +:: Use of this source code is governed by a BSD-style license that can be +:: found in the LICENSE file. +setlocal + +:: Ensure that "depot_tools" is somewhere in PATH so this tool can be used +:: standalone, but allow other PATH manipulations to take priority. +set PATH=%PATH%;%~dp0 + +:: Defer control. +vpython3 "%~dp0\luci_auth_fido2_plugin.py" %* diff --git a/luci_auth_fido2_plugin.py b/luci_auth_fido2_plugin.py new file mode 100755 index 0000000000..d6fb28edc9 --- /dev/null +++ b/luci_auth_fido2_plugin.py @@ -0,0 +1,290 @@ +#!/usr/bin/env vpython3 +# Copyright 2025 The ChromiumOS Authors +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. + +# [VPYTHON:BEGIN] +# python_version: "3.11" +# wheel: < +# name: "infra/python/wheels/cffi/${vpython_platform}" +# version: "version:1.15.1.chromium.2" +# > +# wheel: < +# name: "infra/python/wheels/cryptography/${vpython_platform}" +# version: "version:43.0.0" +# > +# wheel: < +# name: "infra/python/wheels/pycparser-py2_py3" +# version: "version:2.21" +# > +# wheel: < +# name: "infra/python/wheels/fido2-py3" +# version: "version:2.0.0" +# > +# [VPYTHON:END] + +from concurrent.futures import ThreadPoolExecutor, as_completed +from contextlib import contextmanager +import ctypes +import dataclasses +import json +import logging +import signal +import sys +from threading import Event +from typing import BinaryIO + +from fido2.client import DefaultClientDataCollector +from fido2.client import Fido2Client, UserInteraction, WebAuthnClient +from fido2.hid import CtapHidDevice +from fido2.webauthn import AuthenticationResponse +from fido2.webauthn import PublicKeyCredentialRequestOptions, UserVerificationRequirement + +try: + from fido2.client.windows import WindowsClient +except ImportError: + WindowsClient = None + +_PLUGIN_ENDIANNESS = 'little' +_PLUGIN_HEADER_SIZE = 4 + +# Exit codes. +_EXIT_NO_FIDO2_DEVICES = 11 +_EXIT_ALL_ASSERTIONS_FAILED = 12 +_EXIT_NO_MATCHING_CRED = 13 + + +def read_full(r: BinaryIO, size: int) -> bytes: + """Read an exact amount of data. + + Raises exception on error or EOF. + """ + b = r.read(size) + if len(b) != size: + raise EOFError(f"premature EOF when reading {size} bytes from {r}.") + return b + + +def write_full(w: BinaryIO, b: bytes): + """Write all bytes. + + Raises IOError if the write isn't complete. + """ + written = w.write(b) + if written != len(b): + raise IOError( + f"failed to write fully, wrote {written} bytes out of {_PLUGIN_HEADER_SIZE} bytes." + ) + + +def plugin_read(r: BinaryIO) -> bytes: + """Read a framed WebAuthn plugin message. + + A frame consists of: 4 bytes of little endian uint32 length, plus + this amount bytes of binary data. + """ + header = read_full(r, _PLUGIN_HEADER_SIZE) + length = int.from_bytes(header, _PLUGIN_ENDIANNESS, signed=False) + return read_full(r, length) + + +def plugin_write(w: BinaryIO, b: bytes): + """Write a framed Webauthn plugin message. + + A frame consists of: 4 bytes of little endian uint32 length, plus + this amount bytes of binary data. + """ + length = len(b) + header = length.to_bytes(_PLUGIN_HEADER_SIZE, + _PLUGIN_ENDIANNESS, + signed=False) + write_full(w, header) + write_full(w, b) + + +@dataclasses.dataclass +class PluginRequest: + origin: str + public_key_credential_request: PublicKeyCredentialRequestOptions + + +def parse_plugin_request(b: bytes) -> PluginRequest: + """Parse a plugin request JSON string.""" + j = json.loads(b) + + req = PublicKeyCredentialRequestOptions.from_dict(j["requestData"]) + + # Apply overrides to certain fields. + req = PublicKeyCredentialRequestOptions( + challenge=req.challenge, + rp_id=req.rp_id, + allow_credentials=req.allow_credentials, + hints=req.hints, + + # Default to 30s timeout. + timeout=req.timeout or 30_000, + + # Discourage UV. + # + # ReAuth flow is triggered for user who's already logged in, so + # there's no need to ask for PIN/password authentication factor. + # + # Here we only want to test for user presence and ownership of + # the private key. + user_verification=UserVerificationRequirement.DISCOURAGED, + + # Don't support extensions for now. + extensions=None, + ) + + return PluginRequest( + origin=j["origin"], + public_key_credential_request=req, + ) + + +def encode_plugin_response(a: AuthenticationResponse) -> bytes: + """Encode a plugin response to JSON.""" + return json.dumps({ + "type": "getResponse", + "responseData": dict(a), + "error": None, + }).encode('utf-8') + + +class DiscardInteraction(UserInteraction): + """Handler when user interaction is required. + + This plugin's stdin/stdout talks with git-credential-luci, so we fail + actions that require user input (this plugin shouldn't set any flag + that require user interaction). + """ + + def prompt_up(self): + sys.stderr.write("\nTouch your blinking security key to continue.\n\n") + + def request_pin(self, permissions, rp_id): + # This plugin shouldn't set assertion flags that will require + # PIN entry. + return None + + def request_uv(self, permissions, rp_id): + # Don't allow user verification (UV), because we don't allow PIN + # entry, UV will fail. + return False + + +def get_clients(origin: str) -> list[tuple[WebAuthnClient, str]]: + """Return WebAuthn clients. + + The return value is a list of (WebAuthnClient, client description) + where we can send assertion requests to. + + On Windows, this method returns a client that talks with Win32 API + if available. + """ + client_data_collector = DefaultClientDataCollector(origin) + + # Use Windows WebAuthn API if available, and if we aren't running as admin. + if WindowsClient is not None \ + and WindowsClient.is_available(): + if ctypes.windll.shell32.IsUserAnAdmin(): + logging.info("User is admin") + else: + logging.info("Using WindowsClient") + return [(WindowsClient(client_data_collector), "WindowsWebAuthn")] + + user_interaction = DiscardInteraction() + clients = [] + for dev in CtapHidDevice.list_devices(): + desc = dev.descriptor + desc_str = (f'CtapHidDevice {desc.product_name}' + f' (VID 0x{desc.vid:04x},' + f' PID 0x{desc.pid:04x}) at {desc.path}') + logging.info("Found %s", desc_str) + clients.append(( + Fido2Client( + dev, + client_data_collector=client_data_collector, + user_interaction=user_interaction, + ), + desc_str, + )) + + return clients + + +def assert_on_client(*, client: WebAuthnClient, client_desc: str, + request: PublicKeyCredentialRequestOptions, cancel: Event): + try: + return client.get_assertion(request, cancel) + except Exception as e: + if not cancel.is_set(): + logging.error("Assertion failed on %s: %s", client_desc, e) + return None + + +@contextmanager +def set_event_on_signal(signum: int, event: Event): + """Return a context manager that sets `event` when `signum` is signaled.""" + original_handler = signal.getsignal(signum) + + def handler(signum, _): + logging.info("Signal %s received.", signal.strsignal(signum)) + event.set() + + signal.signal(signum, handler) + try: + yield + finally: + signal.signal(signum, original_handler) + + +def main(): + logging.basicConfig(level=logging.INFO) + plugin_req = parse_plugin_request(plugin_read(sys.stdin.buffer)) + + clients = get_clients(plugin_req.origin) + if not clients: + logging.error("No available FIDO devices.") + sys.exit(_EXIT_NO_FIDO2_DEVICES) + + # Race and retrieve the first successful assertion. + outcome = None + cancel = Event() + with set_event_on_signal(signal.SIGINT, cancel), set_event_on_signal( + signal.SIGTERM, + cancel), ThreadPoolExecutor(max_workers=len(clients)) as executor: + futures = [ + executor.submit(assert_on_client, + client=client, + client_desc=desc, + request=plugin_req.public_key_credential_request, + cancel=cancel) for client, desc in clients + ] + for future in as_completed(futures): + if result := future.result(): + outcome = result + cancel.set() + break + + if not outcome: + logging.error("All assertions failed or timed out.") + sys.exit(_EXIT_ALL_ASSERTIONS_FAILED) + + assertions = outcome.get_assertions() + if not assertions: + logging.error("No matching credential.") + sys.exit(_EXIT_NO_MATCHING_CRED) + elif len(assertions) > 1: + logging.warning( + "Multiple assertions returned for rp_id %s, selecting the first one.", + plugin_req.public_key_credential_request.rp_id) + + # Write the first completed assertion. + plugin_write(sys.stdout.buffer, + encode_plugin_response(outcome.get_response(0))) + + +if __name__ == "__main__": + main() diff --git a/tests/luci_auth_fido2_plugin_test.py b/tests/luci_auth_fido2_plugin_test.py new file mode 100755 index 0000000000..9885415066 --- /dev/null +++ b/tests/luci_auth_fido2_plugin_test.py @@ -0,0 +1,69 @@ +#!/usr/bin/env vpython3 +# coding=utf-8 +# Copyright (c) 2012 The Chromium Authors. All rights reserved. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. + +# [VPYTHON:BEGIN] +# python_version: "3.11" +# wheel: < +# name: "infra/python/wheels/cffi/${vpython_platform}" +# version: "version:1.15.1.chromium.2" +# > +# wheel: < +# name: "infra/python/wheels/cryptography/${vpython_platform}" +# version: "version:43.0.0" +# > +# wheel: < +# name: "infra/python/wheels/pycparser-py2_py3" +# version: "version:2.21" +# > +# wheel: < +# name: "infra/python/wheels/fido2-py3" +# version: "version:2.0.0" +# > +# [VPYTHON:END] +"""Unit tests for luci_auth_fido2_plugin.py.""" + +import logging +import os +import sys +import unittest + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from fido2.webauthn import PublicKeyCredentialDescriptor +from fido2.webauthn import PublicKeyCredentialRequestOptions +from fido2.webauthn import PublicKeyCredentialType +from fido2.webauthn import UserVerificationRequirement + +import luci_auth_fido2_plugin as plugin + + +class TestFido2Plugin(unittest.TestCase): + + def test_parse_plugin_request(self): + req = b'{"type":"get","origin":"https://accounts.google.com","requestData":{"rpId":"google.com","challenge":"alice-==","timeout":30000,"allowCredentials":[{"type":"public-key","id":"key="}],"userVerification":"preferred","extensions":{"appid":"google.com"}}}' + got = plugin.parse_plugin_request(req) + want = plugin.PluginRequest( + origin='https://accounts.google.com', + public_key_credential_request=PublicKeyCredentialRequestOptions( + challenge=b'jX\x9c{', + timeout=30_000, + rp_id='google.com', + allow_credentials=[ + PublicKeyCredentialDescriptor( + type=PublicKeyCredentialType.PUBLIC_KEY, + id=b'\x91\xec', + ) + ], + user_verification=UserVerificationRequirement.DISCOURAGED, + ), + ) + self.assertEqual(got, want) + + +if __name__ == '__main__': + logging.basicConfig( + level=logging.DEBUG if '-v' in sys.argv else logging.ERROR) + unittest.main()