Files
chromium_depot_tools/tests/siso_test.py
Alex Ovsienko 02f1c0f8b7 siso: rewrite the entire siso test to pytest.
With this tests will be easier to read and support.

Also have improved parameterized tests.

The reason why pytest was picked is because as my knowledge goes this is the most up to date and supported unit test framework in Python. The current parameterized tests state is primitive, and the parameterized library was last updated 2 years ago. pytest will help us adopt best practices that later perhaps we could spread to other unit tests in depot_tools.

Bug: 459690822
Change-Id: I67d16b56ff3c4dbd260ea0b07354ef766a6a6964
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/tools/depot_tools/+/7246550
Reviewed-by: Gavin Mak <gavinmak@google.com>
Commit-Queue: Alex Ovsienko <ovsienko@google.com>
Reviewed-by: Junji Watanabe <jwata@google.com>
2025-12-17 16:05:19 -08:00

1055 lines
39 KiB
Python
Executable File

#!/usr/bin/env vpython3
# Copyright (c) 2024 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import io
import os
import shlex
import sys
import pytest
import subprocess
import itertools
from pathlib import Path
from typing import Any, Dict, List, Tuple, Generator, Optional
ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, ROOT_DIR)
import siso
# These are required for fixtures to work.
# pylint: disable=redefined-outer-name,unused-argument
@pytest.fixture
def siso_test_fixture(tmp_path: Path,
mocker: Any) -> Generator[None, None, None]:
# Replace trial dir functionality with tmp_parth.
previous_dir = os.getcwd()
os.chdir(tmp_path)
mocker.patch("siso.getpass.getuser", return_value="testuser")
yield
os.chdir(previous_dir)
def test_load_sisorc_no_file(siso_test_fixture: Any) -> None:
global_flags, subcmd_flags = siso.load_sisorc(
os.path.join("build", "config", "siso", ".sisorc"))
assert global_flags == []
assert subcmd_flags == {}
def test_load_sisorc(siso_test_fixture: Any) -> None:
sisorc = os.path.join("build", "config", "siso", ".sisorc")
os.makedirs(os.path.dirname(sisorc))
with open(sisorc, "w") as f:
f.write("""
# comment
-credential_helper=gcloud
ninja --failure_verbose=false -k=0
""")
global_flags, subcmd_flags = siso.load_sisorc(sisorc)
assert global_flags == ["-credential_helper=gcloud"]
assert subcmd_flags == {"ninja": ["--failure_verbose=false", "-k=0"]}
def test_apply_sisorc_none(siso_test_fixture: Any) -> None:
new_args = siso.apply_sisorc([], {}, ["ninja", "-C", "out/Default"],
"ninja")
assert new_args == ["ninja", "-C", "out/Default"]
def test_apply_sisorc_nosubcmd(siso_test_fixture: Any) -> None:
new_args = siso.apply_sisorc([], {"ninja": ["-k=0"]}, ["-version"], "")
assert new_args == ["-version"]
def test_apply_sisorc(siso_test_fixture: Any) -> None:
new_args = siso.apply_sisorc(
["-credential_helper=luci-auth"], {"ninja": ["-k=0"]},
["-log_dir=/tmp", "ninja", "-C", "out/Default"], "ninja")
assert new_args == [
"-credential_helper=luci-auth", "-log_dir=/tmp", "ninja", "-k=0", "-C",
"out/Default"
]
def test_is_subcommand_present(mocker: Any) -> None:
mock_call = mocker.patch("siso.subprocess.call")
def side_effect(cmd, *_, **__):
if cmd[2] in ["collector", "ninja"]:
return 0
return 2
mock_call.side_effect = side_effect
assert siso._is_subcommand_present("siso_path", "collector")
assert siso._is_subcommand_present("siso_path", "ninja")
assert not siso._is_subcommand_present("siso_path", "unknown")
@pytest.mark.parametrize("args, want", [
pytest.param(["ninja", "-C", "out/Default"], [
"ninja", "-C", "out/Default", "--metrics_labels",
f"type=developer,tool=siso,host_os={siso._SYSTEM_DICT.get(sys.platform, sys.platform)}"
],
id="no_labels"),
pytest.param(["ninja", "-C", "out/Default", "--metrics_labels=foo=bar"],
["ninja", "-C", "out/Default", "--metrics_labels=foo=bar"],
id="labels_exist"),
])
def test_apply_metrics_labels(args: List[str], want: List[str]) -> None:
got = siso.apply_metrics_labels(args)
assert got == want
@pytest.mark.parametrize("args, env, want", [
pytest.param(["ninja", "-C", "out/Default"], {},
["ninja", "-C", "out/Default"],
id="no_env_flags"),
pytest.param([
"ninja", "-C", "out/Default", "--enable_cloud_monitoring",
"--enable_cloud_profiler"
], {}, [
"ninja", "-C", "out/Default", "--enable_cloud_monitoring",
"--enable_cloud_profiler"
],
id="some_already_applied_no_env_flags"),
pytest.param(
["ninja", "-C", "out/Default", "--metrics_project", "some_project"], {},
[
"ninja", "-C", "out/Default", "--metrics_project", "some_project",
"--enable_cloud_monitoring", "--enable_cloud_profiler",
"--enable_cloud_trace", "--enable_cloud_logging"
],
id="metrics_project_set"),
pytest.param(["ninja", "-C", "out/Default"],
{"RBE_metrics_project": "some_project"}, [
"ninja", "-C", "out/Default", "--enable_cloud_monitoring",
"--enable_cloud_profiler", "--enable_cloud_trace",
"--enable_cloud_logging"
],
id="metrics_project_set_thru_env"),
pytest.param(["ninja", "-C", "out/Default", "--project", "some_project"],
{}, [
"ninja", "-C", "out/Default", "--project", "some_project",
"--enable_cloud_monitoring", "--enable_cloud_profiler",
"--enable_cloud_trace", "--enable_cloud_logging",
"--metrics_project=some_project"
],
id="cloud_project_set"),
pytest.param(["ninja", "-C", "out/Default"],
{"SISO_PROJECT": "some_project"}, [
"ninja", "-C", "out/Default", "--enable_cloud_monitoring",
"--enable_cloud_profiler", "--enable_cloud_trace",
"--enable_cloud_logging", "--metrics_project=some_project"
],
id="cloud_project_set_thru_env"),
pytest.param(
["ninja", "-C", "out/Default", "--enable_cloud_profiler=false"],
{"SISO_PROJECT": "some_project"}, [
"ninja", "-C", "out/Default", "--enable_cloud_profiler=false",
"--enable_cloud_monitoring", "--enable_cloud_trace",
"--enable_cloud_logging", "--metrics_project=some_project"
],
id="respects_set_flags"),
])
def test_apply_telemetry_flags(args: List[str], env: Dict[str, str],
want: List[str]) -> None:
got = siso.apply_telemetry_flags(args, env)
assert got == want
def test_apply_telemetry_flags_sets_expected_env_var(mocker: Any) -> None:
mocker.patch.dict("os.environ", {})
args = [
"ninja",
"-C",
"out/Default",
]
env = {}
_ = siso.apply_telemetry_flags(args, env)
assert env.get("GOOGLE_API_USE_CLIENT_CERTIFICATE") == "false"
@pytest.mark.parametrize("args, env, want", [
pytest.param(
["--metrics_project", "proj1"], {}, "proj1", id="metrics_project_arg"),
pytest.param(["--project", "proj2"], {}, "proj2", id="project_arg"),
pytest.param(["--metrics_project", "proj1", "--project", "proj2"], {},
"proj1",
id="metrics_project_and_project_args"),
pytest.param([], {"RBE_metrics_project": "proj3"},
"proj3",
id="rbe_metrics_project_env"),
pytest.param([], {"SISO_PROJECT": "proj4"}, "proj4", id="siso_project_env"),
pytest.param([], {
"RBE_metrics_project": "proj3",
"SISO_PROJECT": "proj4"
},
"proj3",
id="rbe_and_siso_project_env"),
pytest.param(["--project", "proj2"], {"RBE_metrics_project": "proj3"},
"proj2",
id="project_arg_and_rbe_env"),
pytest.param(["--metrics_project", "proj1"],
{"RBE_metrics_project": "proj3"},
"proj1",
id="metrics_project_arg_and_rbe_env"),
pytest.param([], {}, "", id="no_project"),
pytest.param(["-metrics_project", "proj1"], {},
"proj1",
id="short_metrics_project_arg"),
pytest.param(["-project", "proj2"], {}, "proj2", id="short_project_arg"),
])
def test_fetch_metrics_project(args: List[str], env: Dict[str, str],
want: str) -> None:
got = siso._fetch_metrics_project(args, env)
assert got == want
@pytest.mark.parametrize("platform, env_vars, want_path_template", [
("Linux", {
"XDG_RUNTIME_DIR": os.path.join("{root_dir}", "run", "user", "1000")
}, os.path.join("{root_dir}", "run", "user", "1000", "{user}", "siso")),
("Linux", {}, os.path.join("/tmp", "{user}", "siso")),
("Darwin", {
"TMPDIR": os.path.join("{root_dir}", "var", "folders", "12", "345..."),
},
os.path.join("{root_dir}", "var", "folders", "12", "345...", "{user}",
"siso")),
("Darwin", {}, os.path.join("/tmp", "{user}", "siso")),
("Linux", {
"XDG_RUNTIME_DIR": "a" * 100
}, os.path.join("/tmp", "{user}", "siso")),
])
def test_resolve_sockets_folder(siso_test_fixture: Any, tmp_path: Path,
platform: str, env_vars: Dict[str, str],
want_path_template: str, mocker: Any) -> None:
user = "testuser"
# Replace placeholders in paths
for key, value in env_vars.items():
env_vars[key] = value.format(root_dir=str(tmp_path))
want_path = want_path_template.format(root_dir=str(tmp_path), user=user)
mocker.patch("sys.platform", new=platform.lower())
path, length = siso._resolve_sockets_folder(env_vars)
# If the desired path is too long, the function will fall back to /tmp/<user>/siso
if (104 - len(want_path) - 6) < 1:
expected_path = os.path.join("/tmp", user, "siso")
else:
expected_path = want_path
expected_len = 104 - len(expected_path) - 6
# Windows.
assert path == expected_path
assert length == expected_len
assert os.path.isdir(path)
def test_handle_collector_args_disabled(mocker: Any) -> None:
mock_fetch = mocker.patch("siso._fetch_metrics_project",
return_value="test-project")
mock_start_collector = mocker.patch("siso._start_collector")
mocker.patch("sys.platform", new="linux")
siso_path = "path/to/siso"
out_dir = "out/Default"
env = {"SISO_PROJECT": "test-project"}
args = ["ninja", "-C", out_dir]
result = siso._handle_collector_args(siso_path, args, env)
assert result == args
mock_fetch.assert_not_called()
mock_start_collector.assert_not_called()
@pytest.mark.skipif(sys.platform == "win32",
reason="Skipping Linux-specific test on Windows")
def test_handle_collector_args_starts_linux(siso_test_fixture: Any,
mocker: Any) -> None:
mock_start_collector = mocker.patch("siso._start_collector",
return_value=True)
mocker.patch("sys.platform", new="linux")
siso_path = "path/to/siso"
env = {"SISO_PROJECT": "test-project", "XDG_RUNTIME_DIR": "/tmp/run"}
args = ["ninja", "--enable_collector"]
captured_args = []
def fetch_metrics_project_side_effect(args, env):
captured_args.append(list(args))
return "test-project"
mocker.patch("siso._fetch_metrics_project",
side_effect=fetch_metrics_project_side_effect)
result = siso._handle_collector_args(siso_path, args, env)
sockets_file = os.path.join("/tmp", "run", "testuser", "siso",
"test-project.sock")
assert result == [
"ninja", "--enable_collector",
f"--collector_address=unix://{sockets_file}"
]
assert captured_args == [["ninja", "--enable_collector"]]
mock_start_collector.assert_called_once_with(siso_path, sockets_file,
"test-project")
def test_handle_collector_args_starts_windows(mocker: Any) -> None:
mock_start_collector = mocker.patch("siso._start_collector",
return_value=True)
mock_fetch = mocker.patch("siso._fetch_metrics_project",
return_value="test-project")
mocker.patch("sys.platform", new="win32")
siso_path = "path/to/siso"
env = {"SISO_PROJECT": "test-project"}
args = ["ninja", "--enable_collector"]
original_args = list(args)
result = siso._handle_collector_args(siso_path, args, env)
assert result == ["ninja", "--enable_collector"]
mock_fetch.assert_called_once_with(original_args, env)
mock_start_collector.assert_called_once_with(siso_path, None,
"test-project")
@pytest.mark.skipif(sys.platform == "win32",
reason="Skipping Linux-specific test on Windows")
def test_handle_collector_args_fails(siso_test_fixture: Any,
mocker: Any) -> None:
mock_start_collector = mocker.patch("siso._start_collector",
return_value=False)
mocker.patch("sys.platform", new="linux")
siso_path = "path/to/siso"
env = {"SISO_PROJECT": "test-project", "XDG_RUNTIME_DIR": "/tmp/run"}
args = ["ninja", "--enable_collector"]
captured_args = []
def fetch_metrics_project_side_effect(args, env):
captured_args.append(list(args))
return "test-project"
mocker.patch("siso._fetch_metrics_project",
side_effect=fetch_metrics_project_side_effect)
result = siso._handle_collector_args(siso_path, args, env)
assert result == ["ninja", "--enable_collector=false"]
assert captured_args == [["ninja", "--enable_collector"]]
sockets_file = os.path.join("/tmp", "run", "testuser", "siso",
"test-project.sock")
mock_start_collector.assert_called_once_with(siso_path, sockets_file,
"test-project")
@pytest.fixture
def start_collector_mocks(mocker: Any) -> Dict[str, Any]:
mocks = {
"is_subcommand_present":
mocker.patch("siso._is_subcommand_present", return_value=True),
"subprocess_run":
mocker.patch("siso.subprocess.run"),
"kill_collector":
mocker.patch("siso._kill_collector"),
"time_sleep":
mocker.patch("siso.time.sleep"),
"time_time":
mocker.patch("siso.time.time",
side_effect=(1000 + i * 0.1 for i in range(100))),
"http_connection":
mocker.patch("siso.http.client.HTTPConnection"),
"subprocess_popen":
mocker.patch("siso.subprocess.Popen"),
}
mock_conn = mocker.Mock()
mocks["http_connection"].return_value = mock_conn
mocks["mock_conn"] = mock_conn
return mocks
def _configure_http_responses(
mocker: Any,
mock_conn: Any,
status_responses: List[Tuple[int, Any]],
config_responses: Optional[List[Tuple[int, Any]]] = None) -> None:
if config_responses is None:
config_responses = []
request_path_history = []
def request_side_effect(method, path):
request_path_history.append(path)
def getresponse_side_effect():
path = request_path_history[-1]
if path == "/health/status":
if not status_responses:
return mocker.Mock(status=404,
read=mocker.Mock(return_value=b""))
status_code, _ = status_responses.pop(0)
return mocker.Mock(status=status_code,
read=mocker.Mock(return_value=b""))
if path == "/health/config":
if not config_responses:
return mocker.Mock(status=200,
read=mocker.Mock(return_value=b"{}"))
status_code, _ = config_responses.pop(0)
return mocker.Mock(status=status_code,
read=mocker.Mock(return_value=b""))
return mocker.Mock(status=404)
mock_conn.request.side_effect = request_side_effect
mock_conn.getresponse.side_effect = getresponse_side_effect
def test_start_collector_removes_existing_socket_file(
start_collector_mocks: Dict[str, Any], mocker: Any) -> None:
mocker.patch("sys.platform", new="linux")
mock_os_path_exists = mocker.patch("os.path.exists", return_value=True)
mock_os_remove = mocker.patch("os.remove")
siso_path = "siso_path"
project = "test-project"
sockets_file = os.path.join("/tmp", "test.sock")
_configure_http_responses(mocker,
start_collector_mocks["mock_conn"],
status_responses=[(404, None), (200, None)],
config_responses=[(200, None), (200, None)])
status_healthy = {"healthy": True, "status": "StatusOK"}
config = {
"receivers": {
"otlp": {
"protocols": {
"grpc": {
"endpoint": sockets_file
}
}
}
}
}
mocker.patch("siso.json.loads",
side_effect=[status_healthy, config, config])
siso._start_collector(siso_path, sockets_file, project)
mock_os_path_exists.assert_called_with(sockets_file)
mock_os_remove.assert_called_with(sockets_file)
def test_start_collector_remove_socket_file_fails(
start_collector_mocks: Dict[str, Any], mocker: Any) -> None:
mocker.patch("sys.platform", new="linux")
mock_os_path_exists = mocker.patch("os.path.exists", return_value=True)
mock_os_remove = mocker.patch("os.remove",
side_effect=OSError("Permission denied"))
mock_stderr = mocker.patch("sys.stderr", new_callable=io.StringIO)
siso_path = "siso_path"
project = "test-project"
sockets_file = os.path.join("/tmp", "test.sock")
_configure_http_responses(mocker,
start_collector_mocks["mock_conn"],
status_responses=[(404, None), (200, None)],
config_responses=[(200, None), (200, None)])
status_healthy = {"healthy": True, "status": "StatusOK"}
config = {
"receivers": {
"otlp": {
"protocols": {
"grpc": {
"endpoint": siso._OTLP_DEFAULT_TCP_ENDPOINT
}
}
}
}
}
mocker.patch("siso.json.loads",
side_effect=[status_healthy, config, config])
siso._start_collector(siso_path, sockets_file, project)
mock_os_path_exists.assert_called_with(sockets_file)
mock_os_remove.assert_called_with(sockets_file)
assert f"Failed to remove {sockets_file}" in mock_stderr.getvalue()
@pytest.mark.parametrize(
"global_flags, subcmd_flags, args, subcmd, should_collect_logs, env, want, want_stderr",
[
pytest.param([], {}, ["other", "-C", "out/Default"], "other", True, {},
["other", "-C", "out/Default"], "",
id="no_ninja"),
pytest.param(
[], {}, ["ninja", "-C", "out/Default"], "ninja", False, {}, [
"ninja",
"-C",
"out/Default",
"--metrics_labels",
f"type=developer,tool=siso,host_os={siso._SYSTEM_DICT.get(sys.platform, sys.platform)}",
],
"",
id="ninja_no_logs"),
pytest.param(
[], {}, ["ninja", "-C", "out/Default"], "ninja", True, {}, [
"ninja",
"-C",
"out/Default",
"--metrics_labels",
f"type=developer,tool=siso,host_os={siso._SYSTEM_DICT.get(sys.platform, sys.platform)}",
],
"",
id="ninja_with_logs_no_project"),
pytest.param(
[], {},
["ninja", "-C", "out/Default", "--project=test-project"], "ninja",
True, {}, [
"ninja",
"-C",
"out/Default",
"--project=test-project",
"--metrics_labels",
f"type=developer,tool=siso,host_os={siso._SYSTEM_DICT.get(sys.platform, sys.platform)}",
"--enable_cloud_monitoring",
"--enable_cloud_profiler",
"--enable_cloud_trace",
"--enable_cloud_logging",
"--metrics_project=test-project",
],
"",
id="ninja_with_logs_with_project_in_args"),
pytest.param(
[], {}, ["ninja", "-C", "out/Default"], "ninja", True,
{"SISO_PROJECT": "test-project"},
[
"ninja",
"-C",
"out/Default",
"--metrics_labels",
f"type=developer,tool=siso,host_os={siso._SYSTEM_DICT.get(sys.platform, sys.platform)}",
"--enable_cloud_monitoring",
"--enable_cloud_profiler",
"--enable_cloud_trace",
"--enable_cloud_logging",
"--metrics_project=test-project",
],
"",
id="ninja_with_logs_with_project_in_env"),
pytest.param(
["-gflag"], {"ninja": ["-sflag"]}, ["ninja", "-C", "out/Default"],
"ninja", False, {}, [
"-gflag", "ninja", "-sflag", "-C", "out/Default",
"--metrics_labels",
f"type=developer,tool=siso,host_os={siso._SYSTEM_DICT.get(sys.platform, sys.platform)}",
],
"depot_tools/siso.py: %s\n" \
% shlex.join(["-gflag", "ninja", "-sflag", "-C", "out/Default"]),
id="with_sisorc"),
pytest.param(
["-gflag_only"],
{},
["ninja", "-C", "out/Default"],
"ninja",
False,
{},
[
"-gflag_only",
"ninja",
"-C",
"out/Default",
"--metrics_labels",
f"type=developer,tool=siso,host_os={siso._SYSTEM_DICT.get(sys.platform, sys.platform)}",
],
"depot_tools/siso.py: %s\n"
% shlex.join(["-gflag_only", "ninja", "-C", "out/Default"]),
id="with_sisorc_global_flags_only"
),
pytest.param(
[],
{"ninja": ["-sflag_only"]},
["ninja", "-C", "out/Default"],
"ninja",
False,
{},
[
"ninja",
"-sflag_only",
"-C",
"out/Default",
"--metrics_labels",
f"type=developer,tool=siso,host_os={siso._SYSTEM_DICT.get(sys.platform, sys.platform)}",
],
"depot_tools/siso.py: %s\n"
% shlex.join(["ninja", "-sflag_only", "-C", "out/Default"]),
id="with_sisorc_subcmd_flags_only"
),
pytest.param(
["-gflag_tel"],
{"ninja": ["-sflag_tel"]},
["ninja", "-C", "out/Default"],
"ninja",
True,
{"SISO_PROJECT": "telemetry-project"},
[
"-gflag_tel",
"ninja",
"-sflag_tel",
"-C",
"out/Default",
"--metrics_labels",
f"type=developer,tool=siso,host_os={siso._SYSTEM_DICT.get(sys.platform, sys.platform)}",
"--enable_cloud_monitoring",
"--enable_cloud_profiler",
"--enable_cloud_trace",
"--enable_cloud_logging",
"--metrics_project=telemetry-project",
],
"depot_tools/siso.py: %s\n"
% shlex.join(["-gflag_tel", "ninja", "-sflag_tel", "-C", "out/Default"]),
id="with_sisorc_global_and_subcmd_flags_and_telemetry"
),
pytest.param(
["-gflag_non_ninja"],
{"other_subcmd": ["-sflag_non_ninja"]},
["other_subcmd", "-C", "out/Default"],
"other_subcmd",
True,
{"SISO_PROJECT": "telemetry-project"},
[
"-gflag_non_ninja",
"other_subcmd",
"-sflag_non_ninja",
"-C",
"out/Default",
],
"depot_tools/siso.py: %s\n"
% shlex.join(["-gflag_non_ninja", "other_subcmd", "-sflag_non_ninja", "-C", "out/Default"]),
id="with_sisorc_non_ninja_subcmd"
),])
def test_process_args(global_flags: List[str], subcmd_flags: Dict[str,
List[str]],
args: List[str], subcmd: str, should_collect_logs: bool,
env: Dict[str, str], want: List[str], want_stderr: str,
siso_test_fixture: Any, mocker: Any) -> None:
mock_stderr = mocker.patch("sys.stderr", new_callable=io.StringIO)
got = siso._process_args(global_flags, subcmd_flags, args, subcmd,
should_collect_logs, env)
assert got == want
assert mock_stderr.getvalue() == want_stderr
# Else it won"t even compile on Windows.
if sys.platform != "win32":
SIGKILL = siso.signal.SIGKILL # pylint: disable=no-member
else:
SIGKILL = None
@pytest.mark.skipif(sys.platform == "win32", reason="Not applicable on Windows")
@pytest.mark.parametrize(
"stdout, stderr, returncode, kill_side_effect, expected_result, expected_kill_args",
[
pytest.param(
b"123\n", b"", 0, None, True,
(123, SIGKILL), id="found_and_killed"),
pytest.param(b"",
b"lsof: no process found\n",
1,
None,
False,
None,
id="process_not_found"),
pytest.param(b"123\n",
b"",
0,
OSError("Operation not permitted"),
False, (123, SIGKILL),
id="kill_fails"),
pytest.param(b"\n", b"", 0, None, False, None, id="no_pids_found"),
pytest.param(b"0\n123\n456\n",
b"",
0,
None,
True, (123, SIGKILL),
id="multiple_pids_found"),
])
def test_kill_collector_posix(stdout: bytes, stderr: bytes, returncode: int,
kill_side_effect: Optional[OSError],
expected_result: bool,
expected_kill_args: Optional[Tuple[int, Any]],
mocker: Any) -> None:
mocker.patch("sys.platform", new="linux")
mock_os_kill = mocker.patch("siso.os.kill")
mock_subprocess_run = mocker.patch("siso.subprocess.run")
mock_subprocess_run.return_value = mocker.Mock(stdout=stdout,
stderr=stderr,
returncode=returncode)
mock_os_kill.side_effect = kill_side_effect
result = siso._kill_collector()
assert result == expected_result
mock_subprocess_run.assert_called_once_with(
["lsof", "-t", f"-i:{siso._OTLP_HEALTH_PORT}"], capture_output=True)
if expected_kill_args:
mock_os_kill.assert_called_once_with(*expected_kill_args)
else:
mock_os_kill.assert_not_called()
@pytest.mark.skipif(sys.platform != "win32", reason="Only for Windows")
@pytest.mark.parametrize("run_effects, expected_result, expected_calls", [
pytest.param([
(f" TCP 127.0.0.1:{siso._OTLP_HEALTH_PORT} [::]:0 LISTENING 1234\r\n"
.encode("utf-8"), b"", 0),
(b"", b"", 0),
],
True, [
["netstat", "-aon"],
["taskkill", "/F", "/T", "/PID", "1234"],
],
id="found_and_killed"),
pytest.param([
(b" TCP 0.0.0.0:135 0.0.0.0:0 LISTENING 868\r\n",
b"", 0),
],
False, [["netstat", "-aon"]],
id="process_not_found"),
pytest.param([
(f" TCP 127.0.0.1:{siso._OTLP_HEALTH_PORT} [::]:0 LISTENING 1234\r\n TCP 127.0.0.1:{siso._OTLP_HEALTH_PORT} [::]:0 LISTENING 5678\r\n"
.encode("utf-8"), b"", 0),
(b"", b"", 0),
],
True, [
["netstat", "-aon"],
["taskkill", "/F", "/T", "/PID", "1234"],
],
id="multiple_pids_found"),
pytest.param([
(b"", b"netstat error\n", 1),
],
False, [["netstat", "-aon"]],
id="netstat_fails"),
pytest.param([
(f" TCP 127.0.0.1:{siso._OTLP_HEALTH_PORT} [::]:0 LISTENING 1234\r\n"
.encode("utf-8"), b"", 0),
(b"", b"ERROR: Cannot terminate process.", 1),
],
False, [
["netstat", "-aon"],
["taskkill", "/F", "/T", "/PID", "1234"],
],
id="taskkill_fails"),
])
def test_kill_collector_windows(run_effects: List[Tuple[bytes, bytes, int]],
expected_result: bool,
expected_calls: List[List[str]],
mocker: Any) -> None:
mock_subprocess_run = mocker.patch("siso.subprocess.run")
mock_subprocess_run.side_effect = [
mocker.Mock(stdout=stdout, stderr=stderr, returncode=returncode)
for stdout, stderr, returncode in run_effects
]
result = siso._kill_collector()
assert result == expected_result
calls = [mocker.call(c, capture_output=True) for c in expected_calls]
mock_subprocess_run.assert_has_calls(calls)
assert mock_subprocess_run.call_count == len(calls)
def test_start_collector_subcommand_not_present(
start_collector_mocks: Dict[str, Any]) -> None:
m = start_collector_mocks
m["is_subcommand_present"].return_value = False
siso_path = "siso_path"
project = "test-project"
result = siso._start_collector(siso_path, None, project)
assert not result
m["is_subcommand_present"].assert_called_once_with(siso_path, "collector")
@pytest.mark.parametrize(
"platform, creationflags",
[
("linux", 0),
("win32", 512), # subprocess.CREATE_NEW_PROCESS_GROUP
])
def test_start_collector_dead_then_healthy(platform: str, creationflags: int,
start_collector_mocks: Dict[str,
Any],
mocker: Any) -> None:
mocker.patch("sys.platform", new=platform)
mocker.patch("subprocess.CREATE_NEW_PROCESS_GROUP",
creationflags,
create=True)
mock_json_loads = mocker.patch("siso.json.loads")
m = start_collector_mocks
siso_path = "siso_path"
project = "test-project"
_configure_http_responses(mocker,
m["mock_conn"],
status_responses=[(404, None), (200, None)],
config_responses=[(200, None)])
status_healthy = {"healthy": True, "status": "StatusOK"}
config = {
"receivers": {
"otlp": {
"protocols": {
"grpc": {
"endpoint": siso._OTLP_DEFAULT_TCP_ENDPOINT
}
}
}
}
}
mock_json_loads.side_effect = [status_healthy, config]
result = siso._start_collector(siso_path, None, project)
assert result
m["subprocess_popen"].assert_called_once_with(
[siso_path, "collector", "--project", project],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
start_new_session=True,
creationflags=creationflags)
m["kill_collector"].assert_not_called()
def test_start_collector_unhealthy_then_healthy(
start_collector_mocks: Dict[str, Any], mocker: Any) -> None:
mocker.patch("sys.platform", new="linux")
mock_json_loads = mocker.patch("siso.json.loads")
m = start_collector_mocks
siso_path = "siso_path"
project = "test-project"
_configure_http_responses(mocker,
m["mock_conn"],
status_responses=[(200, None), (200, None)],
config_responses=[(200, None), (200, None)])
status_unhealthy = {"healthy": False, "status": "NotOK"}
status_healthy = {"healthy": True, "status": "StatusOK"}
config_project_full = {
"exporters": {
"googlecloud": {
"project": project
}
},
"receivers": {
"otlp": {
"protocols": {
"grpc": {
"endpoint": siso._OTLP_DEFAULT_TCP_ENDPOINT
}
}
}
}
}
mock_json_loads.side_effect = [
status_unhealthy, status_healthy, config_project_full,
config_project_full
]
result = siso._start_collector(siso_path, None, project)
assert result
m["subprocess_popen"].assert_called_once_with(
[siso_path, "collector", "--project", project],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
start_new_session=True,
creationflags=0)
m["kill_collector"].assert_called_once()
def test_start_collector_already_healthy(start_collector_mocks: Dict[str, Any],
mocker: Any) -> None:
mock_json_loads = mocker.patch("siso.json.loads")
m = start_collector_mocks
siso_path = "siso_path"
project = "test-project"
_configure_http_responses(mocker,
m["mock_conn"],
status_responses=[(200, None)],
config_responses=[(200, None), (200, None)])
status_healthy = {"healthy": True, "status": "StatusOK"}
config_project_full = {
"exporters": {
"googlecloud": {
"project": project
}
},
"receivers": {
"otlp": {
"protocols": {
"grpc": {
"endpoint": siso._OTLP_DEFAULT_TCP_ENDPOINT
}
}
}
}
}
mock_json_loads.side_effect = [
status_healthy, config_project_full, config_project_full
]
result = siso._start_collector(siso_path, None, project)
assert result
m["subprocess_popen"].assert_not_called()
m["kill_collector"].assert_not_called()
def test_start_collector_never_healthy(start_collector_mocks: Dict[str, Any],
mocker: Any) -> None:
mocker.patch("sys.platform", new="linux")
m = start_collector_mocks
siso_path = "siso_path"
project = "test-project"
_configure_http_responses(mocker,
m["mock_conn"],
status_responses=[(404, None)])
siso._start_collector(siso_path, None, project)
m["subprocess_popen"].assert_called_once_with(
[siso_path, "collector", "--project", project],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
start_new_session=True,
creationflags=0)
m["kill_collector"].assert_not_called()
def test_start_collector_healthy_after_retries(start_collector_mocks: Dict[str,
Any],
mocker: Any) -> None:
mocker.patch("sys.platform", new="linux")
mock_json_loads = mocker.patch("siso.json.loads")
m = start_collector_mocks
siso_path = "siso_path"
project = "test-project"
_configure_http_responses(mocker,
m["mock_conn"],
status_responses=[(404, None), (404, None),
(404, None), (200, None)],
config_responses=[(200, None), (200, None)])
status_healthy = {"healthy": True, "status": "StatusOK"}
config_project_full = {
"exporters": {
"googlecloud": {
"project": project
}
},
"receivers": {
"otlp": {
"protocols": {
"grpc": {
"endpoint": siso._OTLP_DEFAULT_TCP_ENDPOINT
}
}
}
}
}
mock_json_loads.side_effect = [
status_healthy, config_project_full, config_project_full
]
result = siso._start_collector(siso_path, None, project)
assert result
m["subprocess_popen"].assert_called_once_with(
[siso_path, "collector", "--project", project],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
start_new_session=True,
creationflags=0)
m["kill_collector"].assert_not_called()
@pytest.mark.parametrize(
"os_path_exists_side_effect, expected_result, http_status_responses, json_loads_side_effect_values, expected_os_exists_calls",
[
(itertools.repeat(True), True, [
(404, None), (200, None)
], ["status_healthy", "config_with_socket"], 2),
([False] * 8 + [True], True, [(404, None)] + [(200, None)] * 9,
["status_healthy", "config_with_socket"] * 9, 9),
([False] * 10, False, [(404, None)] + [(200, None)] * 9,
["status_healthy", "config_with_socket"] * 9, 10),
],
ids=["socket_exists", "appears_later", "never_appears"])
def test_start_collector_with_sockets_file(
start_collector_mocks: Dict[str, Any], mocker: Any,
os_path_exists_side_effect: Any, expected_result: bool,
http_status_responses: List[Tuple[int, Any]],
json_loads_side_effect_values: List[str],
expected_os_exists_calls: int) -> None:
mocker.patch("sys.platform", new="linux")
mock_json_loads = mocker.patch("siso.json.loads")
mocker.patch("os.path.isfile", return_value=False)
mock_os_exists = mocker.patch("os.path.exists")
mocker.patch("os.remove")
siso_path = "siso_path"
project = "test-project"
sockets_file = os.path.join("/tmp", "test-socket.sock")
status_healthy = {"healthy": True, "status": "StatusOK"}
config_with_socket = {
"receivers": {
"otlp": {
"protocols": {
"grpc": {
"endpoint": sockets_file
}
}
}
}
}
json_loads_map = {
"status_healthy": status_healthy,
"config_with_socket": config_with_socket,
}
json_loads_side_effect = [
json_loads_map[v] for v in json_loads_side_effect_values
]
m = start_collector_mocks
mock_os_exists.side_effect = os_path_exists_side_effect
mock_json_loads.side_effect = json_loads_side_effect
_configure_http_responses(mocker,
m["mock_conn"],
status_responses=list(http_status_responses),
config_responses=[(200, None)] * 20)
result = siso._start_collector(siso_path, sockets_file, project)
assert result == expected_result
m["subprocess_popen"].assert_called_once_with([
siso_path, "collector", "--project", project, "--collector_address",
f"unix://{sockets_file}"
],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
start_new_session=True,
creationflags=0)
m["kill_collector"].assert_not_called()
assert mock_os_exists.call_count == expected_os_exists_calls
# Stanza to have pytest be executed.
if __name__ == "__main__":
sys.exit(pytest.main([__file__] + sys.argv[1:]))