siso: create _start_collector function.

It runs only when the collector subcommand is present, and attempts to handle starting and restarting the collector when needed.

Bug: b/455433899
Change-Id: I9a8b8001aec29b6ca3db61ec6150deb86a6a6964
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/tools/depot_tools/+/7183447
Reviewed-by: Junji Watanabe <jwata@google.com>
Commit-Queue: Alex Ovsienko <ovsienko@google.com>
This commit is contained in:
Alex Ovsienko
2025-12-07 15:57:33 -08:00
committed by LUCI CQ
parent 360ca09180
commit f8cc59a94b
2 changed files with 536 additions and 2 deletions

100
siso.py
View File

@@ -1,5 +1,5 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# Copyright 2023 The Chromium Authors. All rights reserved. # Copyright 2024 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be # Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file. # found in the LICENSE file.
"""This script is a wrapper around the siso binary that is pulled to """This script is a wrapper around the siso binary that is pulled to
@@ -8,6 +8,8 @@ binary when run inside a gclient source tree, so users can just type
"siso" on the command line.""" "siso" on the command line."""
import argparse import argparse
import json
import http.client
import os import os
import platform import platform
import shlex import shlex
@@ -15,6 +17,8 @@ import shutil
import signal import signal
import subprocess import subprocess
import sys import sys
import time
from enum import Enum
from typing import Optional from typing import Optional
import build_telemetry import build_telemetry
@@ -23,6 +27,7 @@ import gclient_paths
_SYSTEM_DICT = {"Windows": "windows", "Darwin": "mac", "Linux": "linux"} _SYSTEM_DICT = {"Windows": "windows", "Darwin": "mac", "Linux": "linux"}
_OTLP_DEFAULT_TCP_ENDPOINT = "127.0.0.1:4317"
_OTLP_HEALTH_PORT = 13133 _OTLP_HEALTH_PORT = 13133
@@ -122,6 +127,99 @@ def _kill_collector() -> bool:
return False return False
def _start_collector(siso_path: str, sockets_file: Optional[str],
                     project: str) -> bool:
    """Makes sure a collector with the requested configuration is running.

    A collector that already answers the health endpoint with the expected
    project and receiver endpoint is reused. One that answers but is
    unhealthy or configured differently is stopped via _kill_collector()
    before a new one is spawned. After spawning, the health endpoint is
    polled until about one second has passed since the first probe.

    Args:
        siso_path: Path to the siso binary; must provide `collector`.
        sockets_file: Optional value passed as --otel_socket. When unset,
            the collector is expected to report
            _OTLP_DEFAULT_TCP_ENDPOINT as its OTLP gRPC endpoint.
        project: Value passed as --project and expected in the collector's
            googlecloud exporter config.

    Returns:
        True if a healthy, correctly configured collector was observed;
        False if the subcommand is missing, the old collector could not be
        killed, or no healthy collector showed up in time.
    """
    if not _is_subcommand_present(siso_path, "collector"):
        print(f"Collector is not present in the submitted siso: {siso_path}")
        return False

    class Status(Enum):
        HEALTHY = 1
        WRONG_PROJECT = 2
        WRONG_ENDPOINT = 3
        UNHEALTHY = 4
        DEAD = 5

    # Bound every health probe so that a wedged process holding the health
    # port cannot block the wrapper forever (http.client blocks indefinitely
    # when no timeout is given).
    probe_timeout_secs = 1.0

    def fetch_json(conn: http.client.HTTPConnection, path: str):
        """GETs `path`; returns the decoded JSON body, or None on non-200."""
        conn.request("GET", path)
        response = conn.getresponse()
        # Always drain the body so the connection can be reused for the
        # next request.
        body = response.read()
        if response.status != 200:
            return None
        return json.loads(body)

    def lookup(config, *keys):
        """Walks nested dicts along `keys`; returns "" if the path breaks."""
        node = config
        for key in keys:
            if not isinstance(node, dict) or key not in node:
                return ""
            node = node[key]
        return node

    def collector_status() -> Status:
        conn = http.client.HTTPConnection(f"localhost:{_OTLP_HEALTH_PORT}",
                                          timeout=probe_timeout_secs)
        try:
            try:
                status = fetch_json(conn, "/health/status")
            except (OSError, http.client.HTTPException):
                # Connection refused/reset/timed out: nothing usable there.
                return Status.DEAD
            except ValueError:
                # Something answered with a 200 but not with valid JSON.
                return Status.UNHEALTHY
            if status is None:
                return Status.DEAD
            if (not isinstance(status, dict) or not status.get("healthy")
                    or status.get("status") != "StatusOK"):
                return Status.UNHEALTHY

            # The config is fetched once and used for both checks.
            try:
                config = fetch_json(conn, "/health/config")
            except (OSError, http.client.HTTPException, ValueError):
                return Status.UNHEALTHY
            if lookup(config, "exporters", "googlecloud",
                      "project") != project:
                return Status.WRONG_PROJECT
            expected_endpoint = sockets_file or _OTLP_DEFAULT_TCP_ENDPOINT
            endpoint = lookup(config, "receivers", "otlp", "protocols",
                              "grpc", "endpoint")
            if endpoint != expected_endpoint:
                return Status.WRONG_ENDPOINT
            return Status.HEALTHY
        finally:
            # Release the socket; this function is called in a polling loop.
            conn.close()

    def start_collector() -> None:
        # Popen is used because it does not block; the collector is meant
        # to keep running after this wrapper exits, hence the detached
        # session (POSIX) / new process group (Windows).
        creationflags = 0
        if platform.system() == "Windows":
            creationflags = subprocess.CREATE_NEW_PROCESS_GROUP
        cmd = [siso_path, "collector", "--project", project]
        if sockets_file:
            cmd += ["--otel_socket", sockets_file]
        subprocess.Popen(
            cmd,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            start_new_session=True,
            creationflags=creationflags,
        )

    # The one-second budget covers the initial probe, the optional kill and
    # the startup polling.
    start = time.time()
    status = collector_status()
    if status is Status.HEALTHY:
        return True
    if status is not Status.DEAD:
        # Something is listening but is not usable as-is; replace it.
        if not _kill_collector():
            return False

    start_collector()

    while time.time() - start < 1:
        if collector_status() is Status.HEALTHY:
            return True
        time.sleep(0.05)
    return False
def check_outdir(subcmd, out_dir): def check_outdir(subcmd, out_dir):
ninja_marker = os.path.join(out_dir, ".ninja_deps") ninja_marker = os.path.join(out_dir, ".ninja_deps")
if os.path.exists(ninja_marker): if os.path.exists(ninja_marker):

View File

@@ -1,5 +1,5 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# Copyright (c) 2025 The Chromium Authors. All rights reserved. # Copyright (c) 2024 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be # Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file. # found in the LICENSE file.
@@ -10,6 +10,7 @@ import sys
import unittest import unittest
import platform import platform
from unittest import mock from unittest import mock
import subprocess
ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, ROOT_DIR) sys.path.insert(0, ROOT_DIR)
@@ -24,9 +25,12 @@ class SisoTest(trial_dir.TestCase):
super().setUp() super().setUp()
self.previous_dir = os.getcwd() self.previous_dir = os.getcwd()
os.chdir(self.root_dir) os.chdir(self.root_dir)
self.patchers_to_stop = []
def tearDown(self):
    """Restores the working directory and stops patchers, newest first."""
    os.chdir(self.previous_dir)
    # Stop in reverse start order so nested patches unwind cleanly.
    for active_patcher in self.patchers_to_stop[::-1]:
        active_patcher.stop()
    super().tearDown()
def test_load_sisorc_no_file(self): def test_load_sisorc_no_file(self):
@@ -591,6 +595,438 @@ ninja --failure_verbose=false -k=0
) )
]) ])
def _start_collector_mocks(self):
    """Patches everything _start_collector touches and bundles the mocks.

    Each patcher is started and registered in self.patchers_to_stop so
    tearDown() can stop it. The returned MagicMock exposes every started
    mock as an attribute named after its key, plus `mock_conn`, the object
    the patched HTTPConnection constructor returns.
    """
    # Order matters only in that it is the order the patchers are started.
    patch_specs = (
        ('is_subcommand_present', 'siso._is_subcommand_present', {
            'return_value': True
        }),
        ('subprocess_run', 'siso.subprocess.run', {}),
        ('kill_collector', 'siso._kill_collector', {}),
        ('time_sleep', 'siso.time.sleep', {}),
        ('time_time', 'siso.time.time', {}),
        ('http_connection', 'siso.http.client.HTTPConnection', {}),
        ('subprocess_popen', 'siso.subprocess.Popen', {}),
    )
    patchers = {
        name: mock.patch(target, **kwargs)
        for name, target, kwargs in patch_specs
    }

    started = {}
    for name, patcher in patchers.items():
        started[name] = patcher.start()
        self.patchers_to_stop.append(patcher)

    # Make time advance quickly to prevent test timeouts.
    started['time_time'].side_effect = (1000 + tick * 0.1
                                        for tick in range(100))

    bundle = mock.MagicMock()
    for name, started_mock in started.items():
        setattr(bundle, name, started_mock)

    bundle.mock_conn = mock.Mock()
    bundle.http_connection.return_value = bundle.mock_conn
    return bundle
def _configure_http_responses(self,
                              mock_conn,
                              status_responses,
                              config_responses=None):
    """Scripts mock_conn.request()/getresponse() for the health endpoints.

    Each getresponse() answers the most recently requested path. Scripted
    (status_code, _) entries are consumed from the front of the given
    lists, which are mutated in place. Once a list is exhausted,
    '/health/status' yields 404 and '/health/config' yields 200 with an
    empty JSON object. Any other path yields 404.
    """
    if config_responses is None:
        config_responses = []

    requested_paths = []

    def canned(status_code, body=b''):
        return mock.Mock(status=status_code,
                         read=mock.Mock(return_value=body))

    def request_side_effect(method, path):
        requested_paths.append(path)

    def getresponse_side_effect():
        last_path = requested_paths[-1]

        if last_path == '/health/status':
            if status_responses:
                # Data will be handled by json_loads mock.
                code, _ = status_responses.pop(0)
                return canned(code)
            return canned(404)

        if last_path == '/health/config':
            if config_responses:
                # Data will be handled by json_loads mock.
                code, _ = config_responses.pop(0)
                return canned(code)
            return canned(200, b'{}')

        return mock.Mock(status=404)

    mock_conn.request.side_effect = request_side_effect
    mock_conn.getresponse.side_effect = getresponse_side_effect
def test_start_collector_subcommand_not_present(self):
    """A siso binary without `collector` must bail out before any work."""
    m = self._start_collector_mocks()
    # The shared helper defaults this mock to True; without overriding it
    # the "not present" branch is never taken and the test would only pass
    # because the unconfigured HTTP mock looks dead until the poll times
    # out.
    m.is_subcommand_present.return_value = False
    siso_path = "siso_path"
    project = "test-project"

    result = siso._start_collector(siso_path, None, project)

    self.assertFalse(result)
    m.is_subcommand_present.assert_called_once_with(siso_path, 'collector')
    # Nothing should be probed, killed or spawned on this path.
    m.http_connection.assert_not_called()
    m.kill_collector.assert_not_called()
    m.subprocess_popen.assert_not_called()
@mock.patch('siso.platform.system', return_value='Linux')
@mock.patch('siso.json.loads')
def test_start_collector_dead_then_healthy(self, mock_json_loads,
                                           _mock_system):
    """First probe gets a 404; a collector is spawned and turns healthy."""
    siso_path = "siso_path"
    project = "test-project"
    m = self._start_collector_mocks()
    self._configure_http_responses(
        m.mock_conn,
        status_responses=[(404, None), (200, None)],
        config_responses=[(200, None), (200, None)],
    )

    # Config reporting the requested project and the default endpoint.
    grpc = {'endpoint': siso._OTLP_DEFAULT_TCP_ENDPOINT}
    matching_config = {
        'exporters': {
            'googlecloud': {
                'project': project
            }
        },
        'receivers': {
            'otlp': {
                'protocols': {
                    'grpc': grpc
                }
            }
        },
    }
    # json.loads is mocked, so decoded payloads are supplied in call order.
    mock_json_loads.side_effect = [
        {
            'healthy': True,
            'status': 'StatusOK'
        },
        matching_config,
        matching_config,
    ]

    started = siso._start_collector(siso_path, None, project)

    self.assertTrue(started)
    expected_cmd = [siso_path, "collector", "--project", project]
    m.subprocess_popen.assert_called_once_with(expected_cmd,
                                               stdout=subprocess.DEVNULL,
                                               stderr=subprocess.DEVNULL,
                                               start_new_session=True,
                                               creationflags=0)
    m.kill_collector.assert_not_called()
@mock.patch('siso.platform.system', return_value='Linux')
@mock.patch('siso.json.loads')
def test_start_collector_unhealthy_then_healthy(self, mock_json_loads,
                                                _mock_system):
    """An unhealthy collector is killed and replaced by a healthy one."""
    siso_path = "siso_path"
    project = "test-project"
    m = self._start_collector_mocks()
    self._configure_http_responses(
        m.mock_conn,
        status_responses=[(200, None), (200, None)],
        config_responses=[(200, None), (200, None)],
    )

    grpc = {'endpoint': siso._OTLP_DEFAULT_TCP_ENDPOINT}
    matching_config = {
        'exporters': {
            'googlecloud': {
                'project': project
            }
        },
        'receivers': {
            'otlp': {
                'protocols': {
                    'grpc': grpc
                }
            }
        },
    }
    # The first decoded status is the unhealthy one seen before the restart.
    mock_json_loads.side_effect = [
        {
            'healthy': False,
            'status': 'NotOK'
        },
        {
            'healthy': True,
            'status': 'StatusOK'
        },
        matching_config,
        matching_config,
    ]

    started = siso._start_collector(siso_path, None, project)

    self.assertTrue(started)
    expected_cmd = [siso_path, "collector", "--project", project]
    m.subprocess_popen.assert_called_once_with(expected_cmd,
                                               stdout=subprocess.DEVNULL,
                                               stderr=subprocess.DEVNULL,
                                               start_new_session=True,
                                               creationflags=0)
    m.kill_collector.assert_called_once()
@mock.patch('siso.platform.system', return_value='Windows')
@mock.patch('siso.json.loads')
def test_start_collector_dead_then_healthy_windows(self, mock_json_loads,
                                                   _mock_system):
    """On Windows the collector is spawned in a new process group."""
    siso_path = "siso_path"
    project = "test-project"
    m = self._start_collector_mocks()
    self._configure_http_responses(
        m.mock_conn,
        status_responses=[(404, None), (200, None)],
        config_responses=[(200, None), (200, None)],
    )

    grpc = {'endpoint': siso._OTLP_DEFAULT_TCP_ENDPOINT}
    matching_config = {
        'exporters': {
            'googlecloud': {
                'project': project
            }
        },
        'receivers': {
            'otlp': {
                'protocols': {
                    'grpc': grpc
                }
            }
        },
    }
    mock_json_loads.side_effect = [
        {
            'healthy': True,
            'status': 'StatusOK'
        },
        matching_config,
        matching_config,
    ]

    # On non-Windows platforms, subprocess.CREATE_NEW_PROCESS_GROUP does not
    # exist. We mock it here to make the test runnable on all platforms.
    # The assertions stay inside the patch because they read the constant.
    with mock.patch('subprocess.CREATE_NEW_PROCESS_GROUP', 512,
                    create=True):
        started = siso._start_collector(siso_path, None, project)

        self.assertTrue(started)
        expected_cmd = [siso_path, "collector", "--project", project]
        m.subprocess_popen.assert_called_once_with(
            expected_cmd,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            start_new_session=True,
            creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        m.kill_collector.assert_not_called()
@mock.patch('siso.platform.system', return_value='Linux')
@mock.patch('siso.json.loads')
def test_start_collector_wrong_project_then_healthy(self, mock_json_loads,
                                                    _mock_system):
    """A collector exporting to another project is killed and replaced."""
    siso_path = "siso_path"
    project = "test-project"
    m = self._start_collector_mocks()
    self._configure_http_responses(
        m.mock_conn,
        status_responses=[(200, None), (200, None)],
        config_responses=[(200, None), (200, None), (200, None)],
    )

    def config_for(project_name):
        # Both configs share the default endpoint; only the project differs.
        return {
            'exporters': {
                'googlecloud': {
                    'project': project_name
                }
            },
            'receivers': {
                'otlp': {
                    'protocols': {
                        'grpc': {
                            'endpoint': siso._OTLP_DEFAULT_TCP_ENDPOINT
                        }
                    }
                }
            },
        }

    healthy = {'healthy': True, 'status': 'StatusOK'}
    stale_config = config_for('wrong-project')
    matching_config = config_for(project)
    # Before the restart: healthy status, then the stale config.
    # After the restart: healthy status, then the matching config.
    mock_json_loads.side_effect = [
        healthy,
        stale_config,
        healthy,
        matching_config,
        matching_config,
    ]

    started = siso._start_collector(siso_path, None, project)

    self.assertTrue(started)
    expected_cmd = [siso_path, "collector", "--project", project]
    m.subprocess_popen.assert_called_once_with(expected_cmd,
                                               stdout=subprocess.DEVNULL,
                                               stderr=subprocess.DEVNULL,
                                               start_new_session=True,
                                               creationflags=0)
    m.kill_collector.assert_called_once()
@mock.patch('siso.json.loads')
def test_start_collector_already_healthy(self, mock_json_loads):
    """A healthy, matching collector is reused: no kill and no spawn."""
    siso_path = "siso_path"
    project = "test-project"
    m = self._start_collector_mocks()
    self._configure_http_responses(
        m.mock_conn,
        status_responses=[(200, None)],
        config_responses=[(200, None), (200, None)],
    )

    grpc = {'endpoint': siso._OTLP_DEFAULT_TCP_ENDPOINT}
    matching_config = {
        'exporters': {
            'googlecloud': {
                'project': project
            }
        },
        'receivers': {
            'otlp': {
                'protocols': {
                    'grpc': grpc
                }
            }
        },
    }
    mock_json_loads.side_effect = [
        {
            'healthy': True,
            'status': 'StatusOK'
        },
        matching_config,
        matching_config,
    ]

    started = siso._start_collector(siso_path, None, project)

    self.assertTrue(started)
    m.subprocess_popen.assert_not_called()
    m.kill_collector.assert_not_called()
@mock.patch('siso.platform.system', return_value='Linux')
def test_start_collector_never_healthy(self, _mock_system):
    """A collector that never answers healthily makes the call fail."""
    m = self._start_collector_mocks()
    siso_path = "siso_path"
    project = "test-project"
    # One scripted 404; once it is consumed the helper keeps answering 404
    # for '/health/status', so every probe looks dead.
    self._configure_http_responses(m.mock_conn,
                                   status_responses=[(404, None)])

    result = siso._start_collector(siso_path, None, project)

    # The mocked clock runs past the one-second budget, so the function
    # must give up and report failure rather than merely not crash.
    self.assertFalse(result)
    m.subprocess_popen.assert_called_once_with(
        [siso_path, "collector", "--project", project],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        start_new_session=True,
        creationflags=0)
    m.kill_collector.assert_not_called()
@mock.patch('siso.platform.system', return_value='Linux')
@mock.patch('siso.json.loads')
def test_start_collector_healthy_after_retries(self, mock_json_loads,
                                               _mock_system):
    """The collector needs several polls after spawn before it is healthy."""
    siso_path = "siso_path"
    project = "test-project"
    m = self._start_collector_mocks()
    # Three 404s (the initial probe plus two polls), then a 200.
    not_found = (404, None)
    self._configure_http_responses(
        m.mock_conn,
        status_responses=[not_found, not_found, not_found, (200, None)],
        config_responses=[(200, None), (200, None)],
    )

    grpc = {'endpoint': siso._OTLP_DEFAULT_TCP_ENDPOINT}
    matching_config = {
        'exporters': {
            'googlecloud': {
                'project': project
            }
        },
        'receivers': {
            'otlp': {
                'protocols': {
                    'grpc': grpc
                }
            }
        },
    }
    mock_json_loads.side_effect = [
        {
            'healthy': True,
            'status': 'StatusOK'
        },
        matching_config,
        matching_config,
    ]

    started = siso._start_collector(siso_path, None, project)

    self.assertTrue(started)
    expected_cmd = [siso_path, "collector", "--project", project]
    m.subprocess_popen.assert_called_once_with(expected_cmd,
                                               stdout=subprocess.DEVNULL,
                                               stderr=subprocess.DEVNULL,
                                               start_new_session=True,
                                               creationflags=0)
    m.kill_collector.assert_not_called()
@mock.patch('siso.platform.system', return_value='Linux')
@mock.patch('siso.json.loads')
def test_start_collector_with_sockets_file(self, mock_json_loads,
                                           _mock_system):
    """A sockets file is passed as --otel_socket and expected as endpoint."""
    siso_path = "siso_path"
    project = "test-project"
    sockets_file = "/tmp/test-socket.sock"
    m = self._start_collector_mocks()
    self._configure_http_responses(
        m.mock_conn,
        status_responses=[(404, None), (200, None)],
        config_responses=[(200, None), (200, None)],
    )

    # The receiver endpoint reported here is the sockets file itself.
    socket_config = {
        'exporters': {
            'googlecloud': {
                'project': project
            }
        },
        'receivers': {
            'otlp': {
                'protocols': {
                    'grpc': {
                        'endpoint': sockets_file
                    }
                }
            }
        },
    }
    mock_json_loads.side_effect = [
        {
            'healthy': True,
            'status': 'StatusOK'
        },
        socket_config,
        socket_config,
    ]

    started = siso._start_collector(siso_path, sockets_file, project)

    self.assertTrue(started)
    expected_cmd = [
        siso_path, "collector", "--project", project, "--otel_socket",
        sockets_file
    ]
    m.subprocess_popen.assert_called_once_with(expected_cmd,
                                               stdout=subprocess.DEVNULL,
                                               stderr=subprocess.DEVNULL,
                                               start_new_session=True,
                                               creationflags=0)
    m.kill_collector.assert_not_called()
if __name__ == '__main__': if __name__ == '__main__':
# Suppress print to console for unit tests. # Suppress print to console for unit tests.
unittest.main(buffer=True) unittest.main(buffer=True)