Files
chromium_depot_tools/owners_client.py
Devon Loehr c48f866fcf Select a minimal number of owners for a set of files
This CL adds a function which takes a set of files, and attempts to
select a single owner for all of them. If it cannot, it falls back to
the standard owner selection algorithm, which may result in more owners
being chosen than necessary, but guarantees that a valid set of owners
is always returned.

Bug: 389069356
Change-Id: I985804040f149a02bfb5b3c6b946602a81334e7c
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/tools/depot_tools/+/6321289
Reviewed-by: Josip Sokcevic <sokcevic@chromium.org>
Commit-Queue: Devon Loehr <dloehr@google.com>
2025-03-04 12:19:30 -08:00

228 lines
8.3 KiB
Python

# Copyright (c) 2020 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import os
import random
import gerrit_util
import git_common
class OwnersClient(object):
"""Interact with OWNERS files in a repository.
This class allows you to interact with OWNERS files in a repository both the
Gerrit Code-Owners plugin REST API, and the owners database implemented by
Depot Tools in owners.py:
- List all the owners for a group of files.
- Check if files have been approved.
- Suggest owners for a group of files.
All code should use this class to interact with OWNERS files instead of the
owners database in owners.py
"""
# '*' means that everyone can approve.
EVERYONE = '*'
# Possible status of a file.
# - INSUFFICIENT_REVIEWERS: The path needs owners approval, but none of its
# owners is currently a reviewer of the change.
# - PENDING: An owner of this path has been added as reviewer, but approval
# has not been given yet.
# - APPROVED: The path has been approved by an owner.
APPROVED = 'APPROVED'
PENDING = 'PENDING'
INSUFFICIENT_REVIEWERS = 'INSUFFICIENT_REVIEWERS'
def ListOwners(self, path):
"""List all owners for a file.
The returned list is sorted so that better owners appear first.
"""
raise Exception('Not implemented')
def BatchListOwners(self, paths):
"""List all owners for a group of files.
Returns a dictionary {path: [owners]}.
"""
if not paths:
return dict()
nproc = min(gerrit_util.MAX_CONCURRENT_CONNECTION, len(paths))
with git_common.ScopedPool(nproc, kind='threads') as pool:
return dict(
pool.imap_unordered(lambda p: (p, self.ListOwners(p)), paths))
def GetFilesApprovalStatus(self, paths, approvers, reviewers):
"""Check the approval status for the given paths.
Utility method to check for approval status when a change has not yet
been created, given reviewers and approvers.
See GetChangeApprovalStatus for description of the returned value.
"""
approvers = set(approvers)
if approvers:
approvers.add(self.EVERYONE)
reviewers = set(reviewers)
if reviewers:
reviewers.add(self.EVERYONE)
status = {}
owners_by_path = self.BatchListOwners(paths)
for path, owners in owners_by_path.items():
owners = set(owners)
if owners.intersection(approvers):
status[path] = self.APPROVED
elif owners.intersection(reviewers):
status[path] = self.PENDING
else:
status[path] = self.INSUFFICIENT_REVIEWERS
return status
def ScoreOwners(self, paths, exclude=None):
"""Get sorted list of owners for the given paths."""
if not paths:
return []
exclude = exclude or []
owners = []
queues = self.BatchListOwners(paths).values()
for i in range(max(len(q) for q in queues)):
for q in queues:
if i < len(q) and q[i] not in owners and q[i] not in exclude:
owners.append(q[i])
return owners
def SuggestOwners(self, paths, exclude=None):
"""Suggest a set of owners for the given paths."""
exclude = exclude or []
paths_by_owner = {}
owners_by_path = self.BatchListOwners(paths)
for path, owners in owners_by_path.items():
for owner in owners:
paths_by_owner.setdefault(owner, set()).add(path)
selected = []
missing = set(paths)
for owner in self.ScoreOwners(paths, exclude=exclude):
missing_len = len(missing)
missing.difference_update(paths_by_owner[owner])
if missing_len > len(missing):
selected.append(owner)
if not missing:
break
return selected
def SuggestMinimalOwners(self,
paths: list[str],
exclude: list[str] = None) -> list[str]:
"""
Suggest a set of owners for the given paths. Never return an owner in
the |exclude| list.
Aims to provide only one, but will provide more if it's unable to
find a common owner.
"""
exclude = exclude or []
owners_by_path = self.BatchListOwners(paths)
if not owners_by_path:
return []
common_owners = set(owners_by_path.popitem()[1]) - set(exclude)
for _, owners in owners_by_path.items():
common_owners = common_owners.intersection(set(owners))
if not common_owners:
# This likely means some of the files had `noparent` set.
# Fall back to the default suggestion algorithm, which accounts
# for noparent but is liable to return many different owners
return self.SuggestOwners(paths, exclude)
# Return an arbitrary common owner, preferring those with a good score
sorted_common_owners = [
owner for owner in self.ScoreOwners(paths, exclude=exclude)
if owner in common_owners
]
# Return a singleton list so this function has a consistent return type
return sorted_common_owners[:1]
class GerritClient(OwnersClient):
"""Implement OwnersClient using OWNERS REST API."""
def __init__(self, host, project, branch):
super(GerritClient, self).__init__()
self._host = host
self._project = project
self._branch = branch
self._owners_cache = {}
self._best_owners_cache = {}
# Seed used by Gerrit to shuffle code owners that have the same score.
# Can be used to make the sort order stable across several requests,
# e.g. to get the same set of random code owners for different file
# paths that have the same code owners.
self._seed = random.getrandbits(30)
def _FetchOwners(self, path, cache, highest_score_only=False):
# Always use slashes as separators.
path = path.replace(os.sep, '/')
if path not in cache:
# GetOwnersForFile returns a list of account details sorted by order
# of best reviewer for path. If owners have the same score, the
# order is random, seeded by `self._seed`.
data = gerrit_util.GetOwnersForFile(
self._host,
self._project,
self._branch,
path,
resolve_all_users=False,
highest_score_only=highest_score_only,
seed=self._seed)
cache[path] = [
d['account']['email'] for d in data['code_owners']
if 'account' in d and 'email' in d['account']
]
# If owned_by_all_users is true, add everyone as an owner at the end
# of the owners list.
if data.get('owned_by_all_users', False):
cache[path].append(self.EVERYONE)
return cache[path]
def ListOwners(self, path):
return self._FetchOwners(path, self._owners_cache)
def ListBestOwners(self, path):
return self._FetchOwners(path,
self._best_owners_cache,
highest_score_only=True)
def BatchListBestOwners(self, paths):
"""List only the higest-scoring owners for a group of files.
Returns a dictionary {path: [owners]}.
"""
with git_common.ScopedPool(kind='threads') as pool:
return dict(
pool.imap_unordered(lambda p: (p, self.ListBestOwners(p)),
paths))
def GetCodeOwnersClient(host, project, branch):
"""Get a new OwnersClient.
Uses GerritClient and raises an exception if code-owners plugin is not
available."""
if gerrit_util.IsCodeOwnersEnabledOnHost(host):
return GerritClient(host, project, branch)
raise Exception(
'code-owners plugin is not enabled. Ask your host admin to enable it '
'on %s. Read more about code-owners at '
'https://chromium-review.googlesource.com/'
'plugins/code-owners/Documentation/index.html.' % host)