From patchwork Mon Mar 16 15:47:23 2026
X-Patchwork-Submitter: Simon Glass
X-Patchwork-Id: 2011
From: Simon Glass <sjg@u-boot.org>
To: U-Boot Concept <u-boot-concept@u-boot.org>
CC: Simon Glass
Date: Mon, 16 Mar 2026 09:47:23 -0600
Subject: [Concept] [PATCH 18/20] buildman: Add boss module for driving remote workers
Message-ID: <20260316154733.1587261-19-sjg@u-boot.org>
In-Reply-To: <20260316154733.1587261-1-sjg@u-boot.org>
References: <20260316154733.1587261-1-sjg@u-boot.org>
List-Id: Discussion and patches related to U-Boot Concept

Add the boss side of the distributed build protocol.

The RemoteWorker class manages a persistent SSH connection to a remote
buildman worker, providing methods to set up a work directory, push
source via git, send build commands and receive results.

WorkerPool manages parallel worker startup, source push, build-settings
distribution, demand-driven board dispatch and result collection. Each
worker gets boards from a shared pool as it finishes previous ones, so
faster workers naturally get more work.

Features include capacity-weighted board distribution
(nthreads * bogomips), per-worker log files, build timeouts via reader
threads, transfer statistics, and clean shutdown on Ctrl-C via a
two-phase quit.

Tests are included in test_boss.py covering protocol handling, board
dispatch, worker lifecycle and error paths.
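For reference, the wire format is line-oriented JSON: the boss writes one
JSON object per line on the worker's stdin, and the worker replies with
JSON lines carrying a 'BM> ' prefix so that SSH banners can be skipped.
The sketch below illustrates that framing only; it is not part of the
patch, and the real prefix constant lives in worker.py as
RESPONSE_PREFIX.

```python
import json

# Worker responses are prefixed so non-protocol output (e.g. SSH
# banners) can be filtered out; 'BM> ' matches the _recv() docstring.
RESPONSE_PREFIX = 'BM> '

def encode_cmd(obj):
    """Frame a command as one JSON line, as RemoteWorker._send() does."""
    return (json.dumps(obj) + '\n').encode('utf-8')

def decode_resp(raw):
    """Parse a response line; return None for lines without the prefix."""
    line = raw.decode('utf-8', errors='replace').rstrip('\n')
    if not line.startswith(RESPONSE_PREFIX):
        return None  # e.g. an SSH banner line
    return json.loads(line[len(RESPONSE_PREFIX):])

cmd = encode_cmd({'cmd': 'build_board', 'board': 'sandbox',
                  'arch': 'sandbox'})
resp = decode_resp(b'BM> {"resp": "build_result", "board": "sandbox"}\n')
```

Framing each message as a single line keeps the protocol trivially
parseable over a raw SSH pipe, with no length prefixes or delimiters
beyond the newline.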
Signed-off-by: Simon Glass
---
 tools/buildman/boss.py         | 1507 ++++++++++++++++++
 tools/buildman/main.py         |    2 +
 tools/buildman/test_boss.py    | 2645 ++++++++++++++++++++++++++++++++
 tools/buildman/test_machine.py |   62 +-
 4 files changed, 4195 insertions(+), 21 deletions(-)
 create mode 100644 tools/buildman/boss.py
 create mode 100644 tools/buildman/test_boss.py

--
2.43.0

diff --git a/tools/buildman/boss.py b/tools/buildman/boss.py
new file mode 100644
index 00000000000..b1d15fec28f
--- /dev/null
+++ b/tools/buildman/boss.py
@@ -0,0 +1,1507 @@
+# SPDX-License-Identifier: GPL-2.0+
+# Copyright 2026 Simon Glass
+# pylint: disable=C0302
+
+"""Boss side of the distributed build protocol
+
+Manages SSH connections to remote workers and communicates using the JSON-lines
+protocol defined in worker.py. Each RemoteWorker wraps a persistent SSH process
+whose stdin/stdout carry the protocol messages.
+
+Typical usage:
+    w = RemoteWorker('myhost', buildman_path='buildman')
+    w.start()                    # launches ssh, waits for 'ready'
+    w.setup()                    # worker creates git repo
+    w.push_source(git_dir, ref)  # git push to the worker's repo
+    w.build('sandbox', commit='abc123', commit_upto=0)
+    result = w.recv()            # {'resp': 'build_result', ...}
+    w.quit()
+"""
+
+import datetime
+import json
+import os
+import queue
+import subprocess
+import sys
+import threading
+import time
+
+from buildman import builderthread
+from buildman import worker as worker_mod
+from u_boot_pylib import command
+from u_boot_pylib import tools
+from u_boot_pylib import tout
+
+# SSH options shared with machine.py
+SSH_OPTS = [
+    '-o', 'BatchMode=yes',
+    '-o', 'StrictHostKeyChecking=accept-new',
+]
+
+# Per-build timeout in seconds. If a worker doesn't respond within this
+# time, the boss assumes the worker is dead or hung and stops using it.
+BUILD_TIMEOUT = 300
+
+# Interval in seconds between status summaries in the boss log
+STATUS_INTERVAL = 60
+
+
+class BossError(Exception):
+    """Error communicating with a remote worker"""
+
+
+class WorkerBusy(BossError):
+    """Worker machine is already in use by another boss"""
+
+
+def _run_ssh(hostname, remote_cmd, timeout=10):
+    """Run a one-shot SSH command on a remote host
+
+    Args:
+        hostname (str): SSH hostname
+        remote_cmd (str): Shell command to run on the remote host
+        timeout (int): SSH connect timeout in seconds
+
+    Returns:
+        str: stdout from the command
+
+    Raises:
+        BossError: if the command fails
+    """
+    ssh_cmd = [
+        'ssh',
+        '-o', f'ConnectTimeout={timeout}',
+    ] + SSH_OPTS + [hostname, '--', remote_cmd]
+    try:
+        result = command.run_pipe(
+            [ssh_cmd], capture=True, capture_stderr=True,
+            raise_on_error=True)
+        return result.stdout.strip() if result.stdout else ''
+    except command.CommandExc as exc:
+        raise BossError(f'SSH command failed on {hostname}: {exc}') from exc
+
+
+def kill_workers(machines):
+    """Kill stale worker processes and remove lock files on remote machines
+
+    Connects to each machine via SSH, kills any running worker processes
+    and removes the lock file. Useful for cleaning up after a failed or
+    interrupted distributed build.
+
+    Args:
+        machines (list of str): SSH hostnames to clean up
+
+    Returns:
+        int: 0 on success
+    """
+
+    results = {}
+    lock = threading.Lock()
+
+    def _kill_one(hostname):
+        kill_script = ('pids=$(pgrep -f "[p]ython3.*--worker" 2>/dev/null); '
+                       'if [ -n "$pids" ]; then '
+                       '  kill $pids 2>/dev/null; '
+                       '  echo "killed $pids"; '
+                       'else '
+                       '  echo "no workers"; '
+                       'fi; '
+                       'rm -f ~/dev/.bm-worker/.lock'
+                       )
+        try:
+            output = _run_ssh(hostname, kill_script)
+            with lock:
+                results[hostname] = output
+        except BossError as exc:
+            with lock:
+                results[hostname] = f'FAILED: {exc}'
+
+    threads = []
+    for hostname in machines:
+        thr = threading.Thread(target=_kill_one, args=(hostname,))
+        thr.start()
+        threads.append(thr)
+    for thr in threads:
+        thr.join()
+
+    for hostname, output in sorted(results.items()):
+        print(f' {hostname}: {output}')
+    return 0
+
+
+class RemoteWorker:  # pylint: disable=R0902
+    """Manages one SSH connection to a remote buildman worker
+
+    The startup sequence is:
+        1. init_git() - create a bare git repo on the remote via one-shot SSH
+        2. push_source() - git push the local tree to the remote repo
+        3. start() - launch the worker from the pushed tree
+
+    This ensures the worker runs the same version of buildman as the boss.
+
+    Attributes:
+        hostname (str): SSH hostname (user@host or just host)
+        nthreads (int): Number of build threads the worker reported
+        git_dir (str): Path to the worker's git directory
+        work_dir (str): Path to the worker's work directory
+    """
+
+    def __init__(self, hostname, timeout=10, name=None):
+        """Create a new remote worker connection
+
+        Args:
+            hostname (str): SSH hostname
+            timeout (int): SSH connect timeout in seconds
+            name (str or None): Short display name, defaults to hostname
+        """
+        self.hostname = hostname
+        self.name = name or hostname
+        self.timeout = timeout
+        self.nthreads = 0
+        self.slots = 1
+        self.max_boards = 0
+        self.bogomips = 0.0
+        self.git_dir = ''
+        self.work_dir = ''
+        self.toolchains = {}
+        self.closing = False
+        self.bytes_sent = 0
+        self.bytes_recv = 0
+        self._proc = None
+        self._stderr_lines = []
+        self._stderr_thread = None
+
+    def init_git(self, work_dir='~/dev/.bm-worker'):
+        """Ensure a git repo exists on the remote host via one-shot SSH
+
+        Reuses an existing repo if present, so that subsequent pushes
+        only transfer the delta. Creates a lock file to prevent two
+        bosses from using the same worker simultaneously. A lock is
+        considered stale if no worker process is running.
+
+        Args:
+            work_dir (str): Fixed path for the work directory
+
+        Raises:
+            WorkerBusy: if another boss holds the lock
+            BossError: if the SSH command fails
+        """
+        lock = f'{work_dir}/.lock'
+        init_script = (f'mkdir -p {work_dir} && '
+                       # Check for lock - stale if no worker process is running
+                       f'if [ -f {lock} ]; then '
+                       f'  if pgrep -f "[p]ython3.*--worker" >/dev/null 2>&1; then '
+                       f'    echo BUSY; exit 0; '
+                       f'  fi; '
+                       f'  rm -f {lock}; '
+                       f'fi && '
+                       # Create lock and init git
+                       f'date +%s > {lock} && '
+                       f'(test -d {work_dir}/.git || git init -q {work_dir}) && '
+                       f'git -C {work_dir} config '
+                       f'receive.denyCurrentBranch updateInstead && '
+                       f'echo {work_dir}'
+                       )
+        output = _run_ssh(self.hostname, init_script, self.timeout)
+        if not output:
+            raise BossError(
+                f'init_git on {self.hostname} returned no work directory')
+        last_line = output.splitlines()[-1].strip()
+        if last_line == 'BUSY':
+            raise WorkerBusy(f'{self.hostname} is busy (locked)')
+        self.work_dir = last_line
+        self.git_dir = os.path.join(self.work_dir, '.git')
+
+    def start(self, debug=False):
+        """Start the worker from the pushed source tree
+
+        Launches the worker using the buildman from the pushed git tree.
+        The source must already have been pushed via init_git() and
+        push_source().
+
+        A background thread forwards the worker's stderr to the boss's
+        stderr, prefixed with the machine name, so that debug messages
+        and errors are always visible.
+
+        Args:
+            debug (bool): True to pass -D to the worker for tracebacks
+
+        Raises:
+            BossError: if the SSH connection or worker startup fails
+        """
+        if not self.work_dir:
+            raise BossError(f'No work_dir on {self.hostname} '
+                            f'(call init_git and push_source first)')
+        worker_cmd = 'python3 tools/buildman/main.py --worker'
+        if debug:
+            worker_cmd += ' -D'
+        ssh_cmd = [
+            'ssh',
+            '-o', f'ConnectTimeout={self.timeout}',
+        ] + SSH_OPTS + [
+            self.hostname, '--',
+            f'cd {self.work_dir} && git checkout -qf work && '
+            f'{worker_cmd}',
+        ]
+        try:
+            # pylint: disable=R1732
+            self._proc = subprocess.Popen(
+                ssh_cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
+                stderr=subprocess.PIPE)
+        except OSError as exc:
+            raise BossError(
+                f'Failed to start SSH to {self.hostname}: {exc}') from exc
+
+        # Forward worker stderr in a background thread so debug messages
+        # and errors are always visible
+        self._stderr_lines = []
+        self._stderr_thread = threading.Thread(
+            target=self._forward_stderr, daemon=True)
+        self._stderr_thread.start()
+
+        resp = self._recv()
+        if resp.get('resp') != 'ready':
+            self.close()
+            raise BossError(
+                f'Worker on {self.hostname} did not send ready: {resp}')
+        self.nthreads = resp.get('nthreads', 1)
+        self.slots = resp.get('slots', 1)
+        if not self.max_boards:
+            self.max_boards = self.nthreads
+
+    def _forward_stderr(self):
+        """Forward worker stderr to boss stderr with machine name prefix
+
+        Runs in a background thread. Saves lines for _get_stderr() too.
+ """ + try: + for raw in self._proc.stderr: + line = raw.decode('utf-8', errors='replace').rstrip('\n') + if line: + self._stderr_lines.append(line) + sys.stderr.write(f'[{self.name}] {line}\n') + sys.stderr.flush() + except (OSError, ValueError): + pass + + def _send(self, obj): + """Send a JSON command to the worker + + Args: + obj (dict): Command object to send + + Raises: + BossError: if the SSH process is not running + """ + if not self._proc or self._proc.poll() is not None: + raise BossError(f'Worker on {self.hostname} is not running') + line = json.dumps(obj) + '\n' + data = line.encode('utf-8') + self.bytes_sent += len(data) + self._proc.stdin.write(data) + self._proc.stdin.flush() + + def _recv(self): + """Read the next protocol response from the worker + + Reads lines from stdout, skipping any that don't start with the + 'BM> ' prefix (e.g. SSH banners). + + Returns: + dict: Parsed JSON response + + Raises: + BossError: if the worker closes the connection or sends bad data + """ + while True: + raw = self._proc.stdout.readline() + if not raw: + stderr = self._get_stderr() + raise BossError(f'Worker on {self.hostname} closed connection' + f'{": " + stderr if stderr else ""}') + self.bytes_recv += len(raw) + line = raw.decode('utf-8', errors='replace').rstrip('\n') + if line.startswith(worker_mod.RESPONSE_PREFIX): + payload = line[len(worker_mod.RESPONSE_PREFIX):] + try: + return json.loads(payload) + except json.JSONDecodeError as exc: + raise BossError( + f'Bad JSON from {self.hostname}: {exc}') from exc + + def _get_stderr(self): + """Get the last stderr line from the worker + + Waits briefly for the stderr forwarding thread to finish + collecting output, then returns the last non-empty line. 
+
+        Returns:
+            str: Last non-empty line of stderr, or empty string
+        """
+        if hasattr(self, '_stderr_thread'):
+            self._stderr_thread.join(timeout=2)
+        for line in reversed(self._stderr_lines):
+            if line.strip():
+                return line.strip()
+        return ''
+
+    def push_source(self, local_git_dir, refspec):
+        """Push source code to the worker's git repo
+
+        Uses 'git push' over SSH to send commits to the worker.
+
+        Args:
+            local_git_dir (str): Path to local git directory
+            refspec (str): Git refspec to push (e.g. 'HEAD:refs/heads/work')
+
+        Raises:
+            BossError: if the push fails
+        """
+        if not self.git_dir:
+            raise BossError(
+                f'No git_dir on {self.hostname} (call init_git first)')
+        push_url = f'{self.hostname}:{self.git_dir}'
+        try:
+            command.run_pipe([['git', 'push', '--force', push_url, refspec]],
+                             capture=True, capture_stderr=True,
+                             raise_on_error=True, cwd=local_git_dir)
+        except command.CommandExc as exc:
+            raise BossError(
+                f'git push to {self.hostname} failed: {exc}') from exc
+
+    def configure(self, settings):
+        """Send build settings to the worker
+
+        Sends settings that affect how make is invoked (verbose, no_lto,
+        allow_missing, etc.). Must be called after start() and before
+        any build commands.
+
+        Args:
+            settings (dict): Build settings, e.g.:
+                verbose_build (bool): Run make with V=1
+                allow_missing (bool): Pass BINMAN_ALLOW_MISSING=1
+                no_lto (bool): Pass NO_LTO=1
+                reproducible_builds (bool): Pass SOURCE_DATE_EPOCH=0
+                warnings_as_errors (bool): Pass KCFLAGS=-Werror
+                mrproper (bool): Run make mrproper before config
+                fallback_mrproper (bool): Retry with mrproper on failure
+
+        Raises:
+            BossError: if the worker rejects the settings
+        """
+        self._send({'cmd': 'configure', 'settings': settings})
+        resp = self._recv()
+        if resp.get('resp') != 'configure_done':
+            raise BossError(
+                f'Worker on {self.hostname} rejected configure: {resp}')
+
+    def build_boards(self, boards, commits):
+        """Send a build_boards command to the worker
+
+        Tells the worker to build all boards for each commit. The
+        worker handles checkout scheduling, parallelism and -j
+        calculation internally.
+
+        Args:
+            boards (list of dict): Board info dicts with keys:
+                board (str): Board target name
+                defconfig (str): Defconfig target
+                env (dict): Extra environment variables
+            commits (list): Commit hashes in order, or [None] for
+                current source
+        """
+        self._send({
+            'cmd': 'build_boards',
+            'boards': boards,
+            'commits': commits,
+        })
+
+    def build_prepare(self, commits):
+        """Send a build_prepare command to the worker
+
+        Creates the Builder and worktrees. Follow with build_board()
+        calls, then build_done().
+
+        Args:
+            commits (list): Commit hashes in order, or [None] for
+                current source
+        """
+        self._send({'cmd': 'build_prepare', 'commits': commits,
+                    'max_boards': self.max_boards})
+
+    def build_board(self, board, arch):
+        """Send a build_board command to add one board to the worker
+
+        Args:
+            board (str): Board target name
+            arch (str): Board architecture
+        """
+        self._send({
+            'cmd': 'build_board',
+            'board': board,
+            'arch': arch,
+        })
+
+    def build_done(self):
+        """Tell the worker no more boards are coming"""
+        self._send({'cmd': 'build_done'})
+
+    def recv(self):
+        """Receive the next response from the worker
+
+        Returns:
+            dict: Parsed JSON response
+        """
+        return self._recv()
+
+    def quit(self):
+        """Tell the worker to quit, remove the lock and close"""
+        try:
+            self._send({'cmd': 'quit'})
+            resp = self._recv()
+        except BossError:
+            resp = {}
+        self.close()
+        self.remove_lock()
+        return resp
+
+    def remove_lock(self):
+        """Remove the lock file from the remote machine"""
+        if self.work_dir:
+            try:
+                _run_ssh(self.hostname,
+                         f'rm -f {self.work_dir}/.lock', self.timeout)
+            except BossError:
+                pass
+
+    def close(self):
+        """Close the SSH connection
+
+        Closes stdin first so SSH can flush any pending data (e.g. a
+        quit command) to the remote, then waits briefly for SSH to
+        exit on its own before terminating it.
+ """ + if self._proc: + try: + self._proc.stdin.close() + except OSError: + pass + try: + self._proc.wait(timeout=2) + except subprocess.TimeoutExpired: + self._proc.terminate() + try: + self._proc.wait(timeout=3) + except subprocess.TimeoutExpired: + self._proc.kill() + self._proc = None + + def __repr__(self): + status = 'running' if self._proc else 'stopped' + return (f'RemoteWorker({self.hostname}, ' + f'nthreads={self.nthreads}, {status})') + + def __del__(self): + self.close() + + +def _format_bytes(nbytes): + """Format a byte count as a human-readable string""" + if nbytes < 1024: + return f'{nbytes}B' + if nbytes < 1024 * 1024: + return f'{nbytes / 1024:.1f}KB' + return f'{nbytes / (1024 * 1024):.1f}MB' + + +class _BossLog: + """Central boss log for distributed builds + + Logs major events and periodic per-worker status summaries + to boss.log in the builder output directory. + """ + + def __init__(self, base_dir): + os.makedirs(base_dir, exist_ok=True) + path = os.path.join(base_dir, '.buildman.log') + # pylint: disable=R1732 + self._logf = open(path, 'w', encoding='utf-8') + self._lock = threading.Lock() + self._stats = {} + self._timer = None + self._closed = False + + def log(self, msg): + """Write a timestamped log entry""" + with self._lock: + if self._logf: + stamp = datetime.datetime.now().strftime('%H:%M:%S') + self._logf.write(f'{stamp} {msg}\n') + self._logf.flush() + + def init_worker(self, wrk): + """Register a worker for status tracking""" + with self._lock: + self._stats[wrk.name] = { + 'sent': 0, + 'recv': 0, + 'load_avg': 0.0, + 'nthreads': wrk.nthreads, + } + + def record_sent(self, wrk_name, count=1): + """Record boards sent to a worker""" + with self._lock: + if wrk_name in self._stats: + self._stats[wrk_name]['sent'] += count + + def record_recv(self, wrk_name, load_avg=0.0): + """Record a reply received from a worker""" + with self._lock: + if wrk_name in self._stats: + self._stats[wrk_name]['recv'] += 1 + 
self._stats[wrk_name]['load_avg'] = load_avg + + def log_status(self): + """Log a status summary for all workers""" + with self._lock: + parts = [] + total_load = 0.0 + total_threads = 0 + total_done = 0 + total_sent = 0 + for name, st in self._stats.items(): + nthreads = st['nthreads'] + cpu_pct = (st['load_avg'] / nthreads * 100 if nthreads else 0) + parts.append(f'{name}:done={st["recv"]}/{st["sent"]}' + f' cpu={cpu_pct:.0f}%') + total_load += st['load_avg'] + total_threads += nthreads + total_done += st['recv'] + total_sent += st['sent'] + total_cpu = (total_load / total_threads * 100 + if total_threads else 0) + parts.append(f'TOTAL:done={total_done}/{total_sent}' + f' cpu={total_cpu:.0f}%') + if self._logf: + stamp = datetime.datetime.now().strftime('%H:%M:%S') + self._logf.write(f'{stamp} STATUS {", ".join(parts)}\n') + self._logf.flush() + + def start_timer(self): + """Start the periodic status timer""" + def _tick(): + if not self._closed: + self.log_status() + self._timer = threading.Timer(STATUS_INTERVAL, _tick) + self._timer.daemon = True + self._timer.start() + self._timer = threading.Timer(STATUS_INTERVAL, _tick) + self._timer.daemon = True + self._timer.start() + + def close(self): + """Stop the timer and close the log file""" + self._closed = True + if self._timer: + self._timer.cancel() + self._timer = None + with self._lock: + if self._logf: + self._logf.close() + self._logf = None + + +def split_boards(board_selected, toolchains): + """Split boards between local and remote machines + + Boards whose architecture has a toolchain on at least one remote machine + are assigned to remote workers. The rest stay local. + + Args: + board_selected (dict): target_name -> Board for all selected boards + toolchains (dict): Architecture -> gcc path on remote machines. + Combined from all machines. 
+
+    Returns:
+        tuple:
+            dict: target_name -> Board for local builds
+            dict: target_name -> Board for remote builds
+    """
+    remote_archs = set(toolchains.keys()) if toolchains else set()
+    local = {}
+    remote = {}
+    for name, brd in board_selected.items():
+        if brd.arch in remote_archs:
+            remote[name] = brd
+        else:
+            local[name] = brd
+    return local, remote
+
+
+def _write_remote_result(builder, resp, board_selected, hostname):
+    """Write a remote build result and update builder progress
+
+    Creates the same directory structure and files that BuilderThread would
+    create for a local build, then calls builder.process_result() to
+    update the progress display.
+
+    Args:
+        builder (Builder): Builder object
+        resp (dict): build_result response from a worker
+        board_selected (dict): target_name -> Board, for looking up
+            the Board object
+        hostname (str): Remote machine that built this board
+    """
+    board = resp.get('board', '')
+    commit_upto = resp.get('commit_upto', 0)
+    return_code = resp.get('return_code', 1)
+    stderr = resp.get('stderr', '')
+
+    build_dir = builder.get_build_dir(commit_upto, board)
+    builderthread.mkdir(build_dir, parents=True)
+
+    tools.write_file(os.path.join(build_dir, 'done'),
+                     f'{return_code}\n', binary=False)
+
+    err_path = os.path.join(build_dir, 'err')
+    if stderr:
+        tools.write_file(err_path, stderr, binary=False)
+    elif os.path.exists(err_path):
+        os.remove(err_path)
+
+    tools.write_file(os.path.join(build_dir, 'log'),
+                     resp.get('stdout', ''), binary=False)
+
+    sizes = resp.get('sizes', {})
+    if sizes.get('raw'):
+        # Strip any header line (starts with 'text') in case the worker
+        # sends raw size output including the header
+        raw = sizes['raw']
+        lines = raw.splitlines()
+        if lines and lines[0].lstrip().startswith('text'):
+            raw = '\n'.join(lines[1:])
+        if raw.strip():
+            tools.write_file(os.path.join(build_dir, 'sizes'),
+                             raw, binary=False)
+
+    # Update the builder's progress display
+    brd = board_selected.get(board)
+    if brd:
+        result = command.CommandResult(stderr=stderr, return_code=return_code)
+        result.brd = brd
+        result.commit_upto = commit_upto
+        result.already_done = False
+        result.kconfig_reconfig = False
+        result.remote = hostname
+        builder.process_result(result)
+
+
+class DemandState:  # pylint: disable=R0903
+    """Mutable state for a demand-driven worker build
+
+    Tracks how many boards have been sent, received and are in-flight
+    for a single worker during demand-driven dispatch.
+
+    Attributes:
+        sent: Total boards sent to the worker
+        in_flight: Boards currently being built (sent - completed)
+        expected: Total results expected (sent * ncommits)
+        received: Results received so far
+        board_results: Per-board result count (target -> int)
+        ncommits: Number of commits being built
+        grab_func: Callable(wrk, count) -> list of Board to get more
+            boards from the shared pool
+    """
+
+    def __init__(self, sent, ncommits, grab_func):
+        self.sent = sent
+        self.in_flight = sent
+        self.expected = sent * ncommits
+        self.received = 0
+        self.board_results = {}
+        self.ncommits = ncommits
+        self.grab_func = grab_func
+
+
+class _DispatchContext:
+    """Shared infrastructure for dispatching builds to workers
+
+    Manages per-worker log files, worktree progress tracking, reader
+    threads, and result processing. Used by both _dispatch_jobs() and
+    _dispatch_demand() to avoid duplicating this infrastructure.
+ """ + + def __init__(self, workers, builder, board_selected, boss_log): + self.builder = builder + self.board_selected = board_selected + self.boss_log = boss_log + self._worktree_counts = {} + self._worktree_lock = threading.Lock() + + # Open a log file per worker + os.makedirs(builder.base_dir, exist_ok=True) + self.log_files = {} + for wrk in workers: + path = os.path.join(builder.base_dir, f'worker-{wrk.name}.log') + self.log_files[wrk] = open( # pylint: disable=R1732 + path, 'w', encoding='utf-8') + + def log(self, wrk, direction, msg): + """Write a timestamped entry to a worker's log file""" + logf = self.log_files.get(wrk) + if logf: + stamp = datetime.datetime.now().strftime('%H:%M:%S') + logf.write(f'{stamp} {direction} {msg}\n') + logf.flush() + + def update_progress(self, resp, wrk): + """Handle worktree progress messages from a worker + + Args: + resp (dict): Response from the worker + wrk (RemoteWorker): Worker that sent the response + + Returns: + bool: True if the response was a progress message + """ + resp_type = resp.get('resp') + if resp_type == 'build_started': + with self._worktree_lock: + num = resp.get('num_threads', wrk.nthreads) + self._worktree_counts[wrk.name] = (self._worktree_counts.get( + wrk.name, (0, num))[0], num) + return True + if resp_type == 'worktree_created': + with self._worktree_lock: + done, total = self._worktree_counts.get( + wrk.name, (0, wrk.nthreads)) + self._worktree_counts[wrk.name] = (done + 1, total) + self._refresh_progress() + return True + return False + + def _refresh_progress(self): + """Update the builder's progress string from worktree counts""" + with self._worktree_lock: + parts = [] + for name, (done, total) in sorted(self._worktree_counts.items()): + if done < total: + parts.append(f'{name} {done}/{total}') + self.builder.progress = ', '.join(parts) + if self.builder.progress: + self.builder.process_result(None) + + def start_reader(self, wrk): + """Start a background reader thread for a worker + + 
Returns: + queue.Queue: Queue that receives (status, value) tuples + """ + recv_q = queue.Queue() + + def _reader(): + while True: + try: + resp = wrk.recv() + recv_q.put(('ok', resp)) + except BossError as exc: + recv_q.put(('error', exc)) + break + except Exception: # pylint: disable=W0718 + recv_q.put(('error', BossError( + f'Worker on {wrk.name} connection lost'))) + break + + threading.Thread(target=_reader, daemon=True).start() + return recv_q + + def recv(self, wrk, recv_q): + """Get next response from queue with timeout + + Returns: + dict or None: Response, or None on error + """ + try: + status, val = recv_q.get(timeout=BUILD_TIMEOUT) + except queue.Empty: + self.log(wrk, '!!', f'Worker timed out after {BUILD_TIMEOUT}s') + if not wrk.closing: + print(f'\n Error from {wrk.name}: timed out') + return None + if status == 'error': + self.log(wrk, '!!', str(val)) + if not wrk.closing: + print(f'\n Error from {wrk.name}: {val}') + return None + resp = val + self.log(wrk, '<<', json.dumps(resp, separators=(',', ':'))) + if resp.get('resp') == 'error': + if not wrk.closing: + print(f'\n Worker error on {wrk.name}: ' + f'{resp.get("msg", "unknown")}') + return None + return resp + + def write_result(self, wrk, resp): + """Write a build result and update progress + + Returns: + bool: True on success, False on error + """ + if self.boss_log: + self.boss_log.record_recv(wrk.name, resp.get('load_avg', 0.0)) + try: + _write_remote_result( + self.builder, resp, self.board_selected, wrk.name) + except Exception as exc: # pylint: disable=W0718 + self.log(wrk, '!!', f'unexpected: {exc}') + print(f'\n Unexpected error on {wrk.name}: {exc}') + return False + return True + + def wait_for_prepare(self, wrk, recv_q): + """Wait for build_prepare_done, handling progress messages + + Returns: + bool: True if prepare succeeded + """ + while True: + resp = self.recv(wrk, recv_q) + if resp is None: + return False + if self.update_progress(resp, wrk): + continue + resp_type = 
resp.get('resp') + if resp_type == 'build_prepare_done': + return True + if resp_type == 'heartbeat': + continue + self.log(wrk, '!!', f'unexpected during prepare: {resp_type}') + return False + + @staticmethod + def send_batch(wrk, boards): + """Send a batch of boards to a worker + + Returns: + int: Number of boards sent, or -1 on error + """ + for brd in boards: + try: + wrk.build_board(brd.target, brd.arch) + except BossError: + return -1 + return len(boards) + + def collect_results(self, wrk, recv_q, state): + """Collect results and send more boards as threads free up + + Args: + wrk (RemoteWorker): Worker to collect from + recv_q (queue.Queue): Response queue from start_reader() + state (DemandState): Mutable build state for this worker + """ + while state.received < state.expected: + resp = self.recv(wrk, recv_q) + if resp is None: + return False + resp_type = resp.get('resp') + if resp_type == 'heartbeat': + continue + if resp_type == 'build_done': + return True + if resp_type != 'build_result': + continue + + if not self.write_result(wrk, resp): + return False + state.received += 1 + + target = resp.get('board') + results = state.board_results + results[target] = results.get(target, 0) + 1 + if results[target] == state.ncommits: + state.in_flight -= 1 + if state.in_flight < wrk.max_boards: + more = state.grab_func(wrk, 1) + if more and self.send_batch(wrk, more) > 0: + state.sent += 1 + state.in_flight += 1 + state.expected += state.ncommits + if self.boss_log: + self.boss_log.record_sent( + wrk.name, state.ncommits) + return True + + def recv_one(self, wrk, recv_q): + """Receive one build result, skipping progress messages + + Returns: + bool: True to continue, False to stop this worker + """ + while True: + resp = self.recv(wrk, recv_q) + if resp is None: + return False + if self.update_progress(resp, wrk): + continue + resp_type = resp.get('resp') + if resp_type == 'heartbeat': + continue + if resp_type == 'build_done': + nexc = resp.get('exceptions', 0) 
+ if nexc: + self.log(wrk, '!!', f'worker finished with {nexc} ' + f'thread exception(s)') + return False + if resp_type == 'build_result': + return self.write_result(wrk, resp) + return True + + def close(self): + """Close all log files and the boss log""" + for logf in self.log_files.values(): + logf.close() + if self.boss_log: + self.boss_log.log_status() + self.boss_log.log('dispatch: end') + self.boss_log.close() + + +class WorkerPool: + """Manages a pool of remote workers for distributed builds + + Handles starting workers, pushing source, distributing build jobs + and collecting results. + + Attributes: + workers (list of RemoteWorker): Active workers + """ + + def __init__(self, machines): + """Create a worker pool from available machines + + Args: + machines (list of Machine): Available machines from MachinePool + """ + self.workers = [] + self._machines = machines + self._boss_log = None + + def start_all(self, git_dir, refspec, debug=False, settings=None): + """Start workers on all machines + + Uses a four-phase approach so that each worker runs the same + version of buildman as the boss: + 1. Create git repos on all machines (parallel) + 2. Push source to all repos (parallel) + 3. Start workers from pushed source (parallel) + 4. 
Send build settings to all workers (parallel) + + Args: + git_dir (str): Local git directory to push + refspec (str): Git refspec to push + debug (bool): True to pass -D to workers for tracebacks + settings (dict or None): Build settings to send to workers + + Returns: + list of RemoteWorker: Workers that started successfully + """ + # Phase 1: init git repos + ready = self._run_parallel( + 'Preparing', self._machines, self._init_one) + + # Phase 2: push source + ready = self._run_parallel('Pushing source to', ready, + lambda wrk: wrk.push_source(git_dir, refspec)) + + # Phase 3: start workers + self.workers = self._run_parallel('Starting', ready, + lambda wrk: self._start_one(wrk, debug)) + + # Phase 4: send build settings + if settings and self.workers: + self._run_parallel('Configuring', self.workers, + lambda wrk: wrk.configure(settings)) + + return self.workers + + def _init_one(self, mach): + """Create a RemoteWorker and initialise its git repo + + Args: + mach: Machine object with hostname attribute + + Returns: + RemoteWorker: Initialised worker + """ + wrk = RemoteWorker(mach.hostname, name=mach.name) + wrk.toolchains = dict(mach.toolchains) + wrk.bogomips = mach.info.bogomips if mach.info else 0.0 + wrk.max_boards = mach.max_boards + wrk.init_git() + return wrk + + @staticmethod + def _start_one(wrk, debug=False): + """Start the worker process from the pushed tree + + Args: + wrk (RemoteWorker): Worker to start + debug (bool): True to pass -D to the worker + """ + wrk.start(debug=debug) + + def _run_parallel(self, label, items, func): + """Run a function on items in parallel, collecting successes + + Args: + label (str): Progress label (e.g. 'Pushing source to') + items (list): Items to process + func (callable): Function to call on each item. May return + a replacement item; if None is returned, the original + item is kept. 
+ + Returns: + list: Items that succeeded (possibly replaced by func) + """ + lock = threading.Lock() + results = [] + done = [] + + def _run_one(item): + name = getattr(item, 'name', getattr(item, 'hostname', str(item))) + try: + replacement = func(item) + with lock: + results.append(replacement if replacement else item) + done.append(name) + tout.progress(f'{label} workers {len(done)}/' + f'{len(items)}: {", ".join(done)}') + except WorkerBusy: + with lock: + done.append(f'{name} (BUSY)') + tout.progress(f'{label} workers {len(done)}/' + f'{len(items)}: {", ".join(done)}') + except BossError as exc: + # Clean up lock if the worker was initialised + if hasattr(item, 'remove_lock'): + item.remove_lock() + with lock: + done.append(f'{name} (FAILED)') + tout.progress(f'{label} workers {len(done)}/' + f'{len(items)}: {", ".join(done)}') + print(f'\n Worker failed on {name}: {exc}') + + tout.progress(f'{label} workers on {len(items)} machines') + threads = [] + for item in items: + thr = threading.Thread(target=_run_one, args=(item,)) + thr.start() + threads.append(thr) + for thr in threads: + thr.join() + tout.clear_progress() + return results + + @staticmethod + def _get_capacity(wrk): + """Get a worker's build capacity score + + Uses nthreads * bogomips as the capacity metric. Falls back to + nthreads alone if bogomips is not available. + + Args: + wrk (RemoteWorker): Worker to score + + Returns: + float: Capacity score (higher is faster) + """ + bogo = wrk.bogomips if wrk.bogomips else 1.0 + return wrk.nthreads * bogo + + def _get_worker_for_arch(self, arch, assigned): + """Pick the next worker that supports a given architecture + + Distributes boards proportionally to each worker's capacity + (nthreads * bogomips). Picks the capable worker whose current + assignment is most below its fair share. + + Args: + arch (str): Board architecture (e.g. 
'arm', 'aarch64') + assigned (dict): worker -> int count of boards assigned so far + + Returns: + RemoteWorker or None: A worker with the right toolchain + """ + if arch == 'sandbox': + capable = list(self.workers) + else: + capable = [w for w in self.workers if arch in w.toolchains] + if not capable: + return None + + total_cap = sum(self._get_capacity(w) for w in capable) + if not total_cap: + total_cap = len(capable) + + # Pick the worker with the lowest assigned / capacity ratio + best = min(capable, key=lambda w: (assigned.get(w, 0) / + (self._get_capacity(w) or 1))) + assigned[best] = assigned.get(best, 0) + 1 + return best + + def build_boards(self, board_selected, commits, builder, local_count=0): + """Build boards on remote workers and write results locally + + Uses demand-driven dispatch: boards are fed to workers from a + shared pool. Each worker gets one board per thread initially, + then one more each time a board completes. Faster workers + naturally get more boards. + + Args: + board_selected (dict): target_name -> Board to build remotely + commits (list of Commit or None): Commits to build + builder (Builder): Builder object for result processing + local_count (int): Number of boards being built locally + """ + if not self.workers or not board_selected: + return + + ncommits = max(1, len(commits)) if commits else 1 + + # Build a pool of boards that have work remaining + pool = list(board_selected.values()) + if not builder.force_build: + commit_range = range(len(commits)) if commits else range(1) + pool = [b for b in pool + if any(not os.path.exists( + builder.get_done_file(cu, b.target)) + for cu in commit_range)] + + # Filter boards that no worker can handle + capable_archs = set() + for wrk in self.workers: + capable_archs.update(wrk.toolchains.keys()) + capable_archs.add('sandbox') + skipped = [b for b in pool if b.arch not in capable_archs] + pool = [b for b in pool if b.arch in capable_archs] + if skipped: + builder.count -= len(skipped) * 
ncommits + + if not pool: + print('No remote jobs to dispatch') + return + + total_jobs = len(pool) * ncommits + nmach = len(self.workers) + if local_count: + nmach += 1 + parts = [f'{len(pool)} boards', f'{ncommits} commits {nmach} machines'] + print(f'Building {" x ".join(parts)} (demand-driven)') + + self._boss_log = _BossLog(builder.base_dir) + self._boss_log.log(f'dispatch: {len(self.workers)} workers, ' + f'{total_jobs} total jobs') + for wrk in self.workers: + self._boss_log.init_worker(wrk) + + self._dispatch_demand(pool, commits, builder, board_selected) + + @staticmethod + def _grab_boards(pool, pool_lock, wrk, count): + """Take up to count boards from pool that wrk can build + + Args: + pool (list of Board): Shared board pool (modified in place) + pool_lock (threading.Lock): Lock protecting the pool + wrk (RemoteWorker): Worker to match toolchains against + count (int): Maximum number of boards to take + + Returns: + list of Board: Boards taken from the pool + """ + with pool_lock: + batch = [] + remaining = [] + for brd in pool: + if len(batch) >= count: + remaining.append(brd) + elif (brd.arch == 'sandbox' + or brd.arch in wrk.toolchains): + batch.append(brd) + else: + remaining.append(brd) + pool[:] = remaining + return batch + + def _dispatch_jobs(self, worker_jobs, builder, board_selected): + """Send build jobs to workers and collect results + + Opens a log file per worker, then runs each worker's jobs + in a separate thread. 
+ + Args: + worker_jobs (dict): worker -> list of (board, commit_upto, + commit) tuples + builder (Builder): Builder for result processing + board_selected (dict): target_name -> Board mapping + """ + ctx = _DispatchContext(worker_jobs.keys(), builder, board_selected, + self._boss_log) + + if ctx.boss_log: + ctx.boss_log.start_timer() + + threads = [] + for wrk, wjobs in worker_jobs.items(): + thr = threading.Thread( + target=self._run_batch_worker, args=(wrk, wjobs, ctx), + daemon=True) + thr.start() + threads.append(thr) + for thr in threads: + thr.join() + + ctx.close() + self._boss_log = None + + @staticmethod + def _run_batch_worker(wrk, wjobs, ctx): + """Send build commands to one worker and collect results + + Args: + wrk (RemoteWorker): Worker to run + wjobs (list): List of (board, commit_upto, commit) tuples + ctx (_DispatchContext): Shared dispatch infrastructure + """ + recv_q = ctx.start_reader(wrk) + + board_infos = {} + commit_list = [] + commit_set = set() + for brd, _, commit in wjobs: + target = brd.target + if target not in board_infos: + board_infos[target] = { + 'board': target, 'arch': brd.arch} + commit_hash = commit.hash if commit else None + if commit_hash not in commit_set: + commit_set.add(commit_hash) + commit_list.append(commit_hash) + + boards_list = list(board_infos.values()) + total = len(boards_list) * len(commit_list) + + ctx.log(wrk, '>>', f'{len(boards_list)} boards x ' + f'{len(commit_list)} commits') + if ctx.boss_log: + ctx.boss_log.log(f'{wrk.name}: {len(boards_list)} boards' + f' x {len(commit_list)} commits') + + try: + wrk.build_boards(boards_list, commit_list) + except BossError as exc: + ctx.log(wrk, '!!', str(exc)) + if not wrk.closing: + print(f'\n Error from {wrk.name}: {exc}') + return + if ctx.boss_log: + ctx.boss_log.record_sent(wrk.name, total) + + for _ in range(total): + if not ctx.recv_one(wrk, recv_q): + return + + def _start_demand_worker( # pylint: disable=R0913 + self, wrk, ctx, commit_list, ncommits, pool, 
pool_lock): + """Prepare a worker and send the initial batch of boards + + Args: + wrk (RemoteWorker): Worker to run + ctx (_DispatchContext): Shared dispatch infrastructure + commit_list (list of str): Commit hashes to build + ncommits (int): Number of commits + pool (list of Board): Shared board pool + pool_lock (threading.Lock): Lock protecting the pool + + Returns: + tuple: (recv_q, state) on success, or (None, None) if the + worker failed during prepare or had no boards to build + """ + recv_q = ctx.start_reader(wrk) + + try: + wrk.build_prepare(commit_list) + except BossError as exc: + ctx.log(wrk, '!!', str(exc)) + if not wrk.closing: + print(f'\n Error from {wrk.name}: {exc}') + return None, None + + if not ctx.wait_for_prepare(wrk, recv_q): + return None, None + + initial = self._grab_boards(pool, pool_lock, wrk, wrk.max_boards) + if not initial: + try: + wrk.build_done() + except BossError: + pass + return None, None + + count = ctx.send_batch(wrk, initial) + if count < 0: + return None, None + if ctx.boss_log: + ctx.boss_log.record_sent(wrk.name, count * ncommits) + ctx.log(wrk, '>>', f'{count} boards (initial,' + f' max_boards={wrk.max_boards})') + + def _grab(w, n): + return self._grab_boards(pool, pool_lock, w, n) + + state = DemandState(count, ncommits, _grab) + return recv_q, state + + @staticmethod + def _finish_demand_worker(wrk, ctx, recv_q, state): + """Collect results and finish the demand-driven protocol + + Args: + wrk (RemoteWorker): Worker to collect from + ctx (_DispatchContext): Shared dispatch infrastructure + recv_q (queue.Queue): Response queue from start_reader() + state (DemandState): Build state from _start_demand_worker() + """ + ctx.collect_results(wrk, recv_q, state) + + ctx.log(wrk, '>>', f'{state.sent} boards total') + try: + wrk.build_done() + except BossError as exc: + ctx.log(wrk, '!!', str(exc)) + return + + # Wait for worker's build_done + while True: + resp = ctx.recv(wrk, recv_q) + if resp is None: + return + if 
resp.get('resp') == 'build_done': + break + + def _dispatch_demand(self, pool, commits, builder, board_selected): + """Dispatch boards on demand from a shared pool + + Each worker gets boards from the pool as it finishes previous + ones, so faster workers naturally get more work. + + Args: + pool (list of Board): Boards available to build + commits (list of Commit or None): Commits to build + builder (Builder): Builder for result processing + board_selected (dict): target_name -> Board mapping + """ + commit_list = [c.hash if c else None for c in (commits or [None])] + ncommits = len(commit_list) + pool_lock = threading.Lock() + + ctx = _DispatchContext( + self.workers, builder, board_selected, self._boss_log) + + if ctx.boss_log: + ctx.boss_log.start_timer() + + def _run_worker(wrk): + recv_q, state = self._start_demand_worker( + wrk, ctx, commit_list, ncommits, pool, pool_lock) + if recv_q is not None: + self._finish_demand_worker(wrk, ctx, recv_q, state) + + threads = [] + for wrk in self.workers: + thr = threading.Thread(target=_run_worker, args=(wrk,), + daemon=True) + thr.start() + threads.append(thr) + for thr in threads: + thr.join() + + ctx.close() + self._boss_log = None + + def quit_all(self): + """Quit all workers gracefully""" + self.print_transfer_summary() + if self._boss_log: + self._boss_log.log('quit: shutting down') + self._boss_log.log_status() + self._boss_log.close() + self._boss_log = None + for wrk in self.workers: + try: + wrk.quit() + except BossError: + wrk.close() + self.workers = [] + + def print_transfer_summary(self): + """Print data transfer summary for all workers""" + if not self.workers: + return + total_sent = 0 + total_recv = 0 + parts = [] + for wrk in self.workers: + sent = getattr(wrk, 'bytes_sent', 0) + recv = getattr(wrk, 'bytes_recv', 0) + total_sent += sent + total_recv += recv + name = getattr(wrk, 'name', '?') + parts.append(f'{name}:' + f'{_format_bytes(sent)}/' + f'{_format_bytes(recv)}') + 
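The transfer summary above renders byte counts with `_format_bytes()`, which is defined elsewhere in boss.py and not shown in this hunk. A minimal sketch of such a helper, purely for illustration (the name `format_bytes` and the exact units/precision here are assumptions, not the patch's implementation):

```python
def format_bytes(size):
    """Render a byte count as a short human-readable string.

    Hypothetical stand-in for boss.py's _format_bytes(); the real
    helper may pick different units or precision.
    """
    for unit in ('B', 'KiB', 'MiB', 'GiB'):
        if size < 1024 or unit == 'GiB':
            # Whole bytes need no decimal point
            return f'{size}{unit}' if unit == 'B' else f'{size:.1f}{unit}'
        size /= 1024

print(format_bytes(512))
print(format_bytes(3 * 1024 ** 2))
```

Keeping the per-worker strings short matters because the summary concatenates one `sent/recv` pair per worker on a single stderr line.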
sys.stderr.write(f'\nTransfer (sent/recv): {", ".join(parts)}' + f' total:{_format_bytes(total_sent)}/' + f'{_format_bytes(total_recv)}\n') + sys.stderr.flush() + + def close_all(self): + """Stop all workers immediately + + Use this on Ctrl-C. Sends a quit command to all workers first, + then waits briefly for the commands to travel through SSH + before closing the connections. This two-phase approach avoids + a race where closing SSH kills the connection before the quit + command is forwarded to the remote worker. + """ + self.print_transfer_summary() + if self._boss_log: + self._boss_log.log('interrupted: Ctrl-C') + self._boss_log.log_status() + self._boss_log.close() + self._boss_log = None + + # Suppress error messages from reader threads + for wrk in self.workers: + wrk.closing = True + + # Phase 1: send quit to all workers + for wrk in self.workers: + try: + wrk._send({'cmd': 'quit'}) # pylint: disable=W0212 + except BossError: + pass + + # Brief pause so SSH can forward the quit commands to the + # remote workers before we tear down the connections + time.sleep(0.5) + + # Phase 2: close all connections + for wrk in self.workers: + wrk.close() + wrk.remove_lock() + self.workers = [] diff --git a/tools/buildman/main.py b/tools/buildman/main.py index 225e341fc26..b9779c38408 100755 --- a/tools/buildman/main.py +++ b/tools/buildman/main.py @@ -43,6 +43,7 @@ def run_tests(skip_net_tests, debug, verbose, args): from buildman import test_cfgutil from buildman import test_machine from buildman import test_worker + from buildman import test_boss test_name = args.terms and args.terms[0] or None if skip_net_tests: @@ -67,6 +68,7 @@ def run_tests(skip_net_tests, debug, verbose, args): test_builder.TestPrintBuildSummary, test_machine, test_worker, + test_boss, 'buildman.toolchain']) return (0 if result.wasSuccessful() else 1) diff --git a/tools/buildman/test_boss.py b/tools/buildman/test_boss.py new file mode 100644 index 00000000000..2f6710c1a58 --- /dev/null +++ 
b/tools/buildman/test_boss.py @@ -0,0 +1,2645 @@ +# SPDX-License-Identifier: GPL-2.0+ +# Copyright 2026 Simon Glass + +"""Tests for the boss module""" + +# pylint: disable=C0302,E1101,W0212,W0612 + +import io +import json +import os +import queue +import random +import shutil +import subprocess +import tempfile +import threading +import time +import types +import unittest +from unittest import mock + +from u_boot_pylib import command +from u_boot_pylib import terminal +from u_boot_pylib import tools + +from buildman import boss +from buildman import bsettings +from buildman import machine +from buildman import worker as worker_mod + + +def _make_response(obj): + """Create a BM>-prefixed response line as bytes""" + return (worker_mod.RESPONSE_PREFIX + json.dumps(obj) + '\n').encode() + + +class FakeProc: + """Fake subprocess.Popen for testing""" + + def __init__(self, responses=None): + self.stdin = io.BytesIO() + self._responses = responses or [] + self._resp_idx = 0 + self.stdout = self + self.stderr = io.BytesIO(b'') + self._returncode = None + + def readline(self): + """Return the next canned response line""" + if self._resp_idx < len(self._responses): + line = self._responses[self._resp_idx] + self._resp_idx += 1 + return line + return b'' + + def poll(self): + """Return the process return code""" + return self._returncode + + def terminate(self): + """Simulate SIGTERM""" + self._returncode = -15 + + def kill(self): + """Simulate SIGKILL""" + self._returncode = -9 + + def wait(self, timeout=None): + """Wait for the process (no-op)""" + + +class TestRunSsh(unittest.TestCase): + """Test _run_ssh()""" + + @mock.patch('buildman.boss.command.run_pipe') + def test_success(self, mock_pipe): + """Test successful one-shot SSH command""" + mock_pipe.return_value = mock.Mock( + stdout='/tmp/bm-worker-abc\n') + result = boss._run_ssh('host1', 'echo hello') + self.assertEqual(result, '/tmp/bm-worker-abc') + + @mock.patch('buildman.boss.command.run_pipe') + def 
test_failure(self, mock_pipe): + """Test SSH command failure""" + mock_pipe.side_effect = command.CommandExc( + 'connection refused', command.CommandResult()) + with self.assertRaises(boss.BossError) as ctx: + boss._run_ssh('host1', 'echo hello') + self.assertIn('SSH command failed', str(ctx.exception)) + + +class TestKillWorkers(unittest.TestCase): + """Test kill_workers()""" + + @mock.patch('buildman.boss._run_ssh') + def test_kill_workers(self, mock_ssh): + """Test killing workers on multiple machines""" + mock_ssh.side_effect = ['killed 1234', 'no workers'] + with terminal.capture(): + result = boss.kill_workers(['host1', 'host2']) + self.assertEqual(result, 0) + self.assertEqual(mock_ssh.call_count, 2) + + @mock.patch('buildman.boss._run_ssh') + def test_kill_workers_ssh_failure(self, mock_ssh): + """Test that SSH failures are reported but do not abort""" + mock_ssh.side_effect = boss.BossError('connection refused') + with terminal.capture(): + result = boss.kill_workers(['host1']) + self.assertEqual(result, 0) + + +class TestRemoteWorkerInitGit(unittest.TestCase): + """Test RemoteWorker.init_git()""" + + @mock.patch('buildman.boss._run_ssh') + def test_init_git(self, mock_ssh): + """Test successful git init""" + mock_ssh.return_value = '/tmp/bm-worker-abc' + w = boss.RemoteWorker('host1') + w.init_git() + self.assertEqual(w.work_dir, '/tmp/bm-worker-abc') + self.assertEqual(w.git_dir, '/tmp/bm-worker-abc/.git') + + @mock.patch('buildman.boss._run_ssh') + def test_init_git_busy(self, mock_ssh): + """Test init_git when machine is locked""" + mock_ssh.return_value = 'BUSY' + w = boss.RemoteWorker('host1') + with self.assertRaises(boss.WorkerBusy) as ctx: + w.init_git() + self.assertIn('busy', str(ctx.exception)) + + @mock.patch('buildman.boss._run_ssh') + def test_init_git_empty_output(self, mock_ssh): + """Test init_git with empty output""" + mock_ssh.return_value = '' + w = boss.RemoteWorker('host1') + with self.assertRaises(boss.BossError) as ctx: + 
w.init_git() + self.assertIn('returned no work directory', str(ctx.exception)) + + @mock.patch('buildman.boss._run_ssh') + def test_init_git_ssh_failure(self, mock_ssh): + """Test init_git when SSH fails""" + mock_ssh.side_effect = boss.BossError('connection refused') + w = boss.RemoteWorker('host1') + with self.assertRaises(boss.BossError): + w.init_git() + + +def _make_result(board, commit_upto=0, return_code=0, + stderr='', stdout=''): + """Create a build_result response dict""" + return { + 'resp': 'build_result', + 'board': board, + 'commit_upto': commit_upto, + 'return_code': return_code, + 'stderr': stderr, + 'stdout': stdout, + } + + +def _make_builder(tmpdir, force_build=True): + """Create a mock Builder with base_dir set""" + builder = mock.Mock() + builder.force_build = force_build + builder.base_dir = tmpdir + builder.count = 0 + builder.get_build_dir.side_effect = ( + lambda c, b: os.path.join(tmpdir, b)) + return builder + + +def _start_worker(hostname, mock_popen, proc): + """Helper to create a worker with work_dir set and start it""" + mock_popen.return_value = proc + wrk = boss.RemoteWorker(hostname) + wrk.work_dir = '/tmp/bm-worker-123' + wrk.git_dir = '/tmp/bm-worker-123/.git' + wrk.start() + return wrk + + +class TestRemoteWorkerStart(unittest.TestCase): + """Test RemoteWorker.start()""" + + @mock.patch('subprocess.Popen') + def test_start_success(self, mock_popen): + """Test successful worker start""" + proc = FakeProc([ + _make_response({'resp': 'ready', 'nthreads': 8}), + ]) + w = _start_worker('myhost', mock_popen, proc) + self.assertEqual(w.nthreads, 8) + + # Check SSH command runs from pushed tree + cmd = mock_popen.call_args[0][0] + self.assertIn('ssh', cmd) + self.assertIn('myhost', cmd) + self.assertIn('--worker', cmd[-1]) + + @mock.patch('subprocess.Popen') + def test_start_not_ready(self, mock_popen): + """Test start when worker sends unexpected response""" + proc = FakeProc([ + _make_response({'resp': 'error', 'msg': 'broken'}), + ]) 
+ mock_popen.return_value = proc + + w = boss.RemoteWorker('badhost') + w.work_dir = '/tmp/bm-test' + with self.assertRaises(boss.BossError) as ctx: + w.start() + self.assertIn('did not send ready', str(ctx.exception)) + + @mock.patch('subprocess.Popen') + def test_start_ssh_failure(self, mock_popen): + """Test start when SSH fails to launch""" + mock_popen.side_effect = OSError('No such file') + + w = boss.RemoteWorker('badhost') + w.work_dir = '/tmp/bm-test' + with self.assertRaises(boss.BossError) as ctx: + w.start() + self.assertIn('Failed to start SSH', str(ctx.exception)) + + @mock.patch('subprocess.Popen') + def test_start_connection_closed(self, mock_popen): + """Test start when connection closes immediately""" + proc = FakeProc([]) # No responses + mock_popen.return_value = proc + + w = boss.RemoteWorker('deadhost') + w.work_dir = '/tmp/bm-test' + with self.assertRaises(boss.BossError) as ctx: + w.start() + self.assertIn('closed connection', str(ctx.exception)) + + def test_start_no_work_dir(self): + """Test start without init_git raises error""" + w = boss.RemoteWorker('host1') + with self.assertRaises(boss.BossError) as ctx: + w.start() + self.assertIn('call init_git', str(ctx.exception)) + + @mock.patch('subprocess.Popen') + def test_max_boards_defaults_to_nthreads(self, mock_popen): + """Test max_boards defaults to nthreads when not configured""" + proc = FakeProc([ + _make_response({'resp': 'ready', 'nthreads': 64}), + ]) + w = _start_worker('host1', mock_popen, proc) + self.assertEqual(w.max_boards, 64) + + @mock.patch('subprocess.Popen') + def test_max_boards_preserved_when_set(self, mock_popen): + """Test max_boards keeps its configured value after start""" + proc = FakeProc([ + _make_response({'resp': 'ready', 'nthreads': 256}), + ]) + mock_popen.return_value = proc + w = boss.RemoteWorker('host1') + w.work_dir = '/tmp/bm-test' + w.max_boards = 64 + w.start() + self.assertEqual(w.nthreads, 256) + self.assertEqual(w.max_boards, 64) + + +class 
TestRemoteWorkerPush(unittest.TestCase): + """Test RemoteWorker.push_source()""" + + def test_push_no_init(self): + """Test push before init_git raises error""" + w = boss.RemoteWorker('host1') + with self.assertRaises(boss.BossError) as ctx: + w.push_source('/tmp/repo', 'HEAD:refs/heads/work') + self.assertIn('call init_git first', str(ctx.exception)) + + @mock.patch('buildman.boss.command.run_pipe') + def test_push_success(self, mock_pipe): + """Test successful git push""" + mock_pipe.return_value = mock.Mock(return_code=0) + w = boss.RemoteWorker('host1') + w.git_dir = '/tmp/bm-worker-123/.git' + + w.push_source('/home/user/u-boot', 'HEAD:refs/heads/work') + cmd = mock_pipe.call_args[0][0][0] + self.assertIn('git', cmd) + self.assertIn('push', cmd) + self.assertIn('host1:/tmp/bm-worker-123/.git', cmd) + self.assertIn('HEAD:refs/heads/work', cmd) + + @mock.patch('buildman.boss.command.run_pipe') + def test_push_failure(self, mock_pipe): + """Test git push failure""" + mock_pipe.side_effect = command.CommandExc( + 'push failed', command.CommandResult()) + w = boss.RemoteWorker('host1') + w.git_dir = '/tmp/bm/.git' + + with self.assertRaises(boss.BossError) as ctx: + w.push_source('/tmp/repo', 'HEAD:refs/heads/work') + self.assertIn('git push', str(ctx.exception)) + + +class TestRemoteWorkerBuildBoards(unittest.TestCase): + """Test RemoteWorker.build_boards() and recv()""" + + @mock.patch('subprocess.Popen') + def test_build_boards_and_recv(self, mock_popen): + """Test sending build_boards and receiving results""" + proc = FakeProc([ + _make_response({'resp': 'ready', 'nthreads': 4}), + _make_response({ + 'resp': 'build_result', 'board': 'sandbox', + 'commit_upto': 0, 'return_code': 0, + 'stderr': '', 'stdout': '', + }), + ]) + w = _start_worker('host1', mock_popen, proc) + boards = [{'board': 'sandbox', 'defconfig': 'sandbox_defconfig', + 'env': {}}] + w.build_boards(boards, ['abc123']) + + # Check the command was sent + sent = proc.stdin.getvalue().decode() + obj 
= json.loads(sent) + self.assertEqual(obj['cmd'], 'build_boards') + self.assertEqual(len(obj['boards']), 1) + self.assertEqual(obj['boards'][0]['board'], 'sandbox') + self.assertEqual(obj['commits'], ['abc123']) + + # Receive result + resp = w.recv() + self.assertEqual(resp['resp'], 'build_result') + self.assertEqual(resp['board'], 'sandbox') + + +class TestRemoteWorkerQuit(unittest.TestCase): + """Test RemoteWorker.quit() and close()""" + + @mock.patch('buildman.boss._run_ssh') + @mock.patch('subprocess.Popen') + def test_quit(self, mock_popen, mock_ssh): + """Test clean quit removes the lock""" + proc = FakeProc([ + _make_response({'resp': 'ready', 'nthreads': 4}), + _make_response({'resp': 'quit_ack'}), + ]) + w = _start_worker('host1', mock_popen, proc) + resp = w.quit() + self.assertEqual(resp.get('resp'), 'quit_ack') + self.assertIsNone(w._proc) + # Lock removal SSH should have been called + mock_ssh.assert_called_once() + self.assertIn('rm -f', mock_ssh.call_args[0][1]) + + @mock.patch('subprocess.Popen') + def test_close_without_quit(self, mock_popen): + """Test close without sending quit""" + proc = FakeProc([ + _make_response({'resp': 'ready', 'nthreads': 4}), + ]) + w = _start_worker('host1', mock_popen, proc) + self.assertIsNotNone(w._proc) + w.close() + self.assertIsNone(w._proc) + + def test_close_when_not_started(self): + """Test close on a worker that was never started""" + w = boss.RemoteWorker('host1') + w.close() # Should not raise + + +class TestRemoteWorkerRecv(unittest.TestCase): + """Test response parsing""" + + @mock.patch('subprocess.Popen') + def test_skip_non_protocol_lines(self, mock_popen): + """Test that non-BM> lines are skipped""" + proc = FakeProc([ + b'Welcome to myhost\n', + b'Last login: Mon Jan 1\n', + _make_response({'resp': 'ready', 'nthreads': 2}), + ]) + w = _start_worker('host1', mock_popen, proc) + self.assertEqual(w.nthreads, 2) + + @mock.patch('subprocess.Popen') + def test_bad_json(self, mock_popen): + """Test bad JSON 
in protocol line""" + proc = FakeProc([ + (worker_mod.RESPONSE_PREFIX + 'not json\n').encode(), + ]) + mock_popen.return_value = proc + + w = boss.RemoteWorker('host1') + w._proc = proc + with self.assertRaises(boss.BossError) as ctx: + w._recv() + self.assertIn('Bad JSON', str(ctx.exception)) + + +class TestRemoteWorkerSend(unittest.TestCase): + """Test _send()""" + + def test_send_when_not_running(self): + """Test sending to a stopped worker""" + w = boss.RemoteWorker('host1') + with self.assertRaises(boss.BossError) as ctx: + w._send({'cmd': 'quit'}) + self.assertIn('not running', str(ctx.exception)) + + +class TestRemoteWorkerRepr(unittest.TestCase): + """Test __repr__""" + + def test_repr_stopped(self): + """Test repr when stopped""" + w = boss.RemoteWorker('host1') + self.assertIn('host1', repr(w)) + self.assertIn('stopped', repr(w)) + + @mock.patch('subprocess.Popen') + def test_repr_running(self, mock_popen): + """Test repr when running""" + proc = FakeProc([ + _make_response({'resp': 'ready', 'nthreads': 8}), + ]) + w = _start_worker('host1', mock_popen, proc) + self.assertIn('running', repr(w)) + self.assertIn('nthreads=8', repr(w)) + w.close() + + +class FakeBoard: # pylint: disable=R0903 + """Fake board for testing split_boards()""" + + def __init__(self, target, arch): + self.target = target + self.arch = arch + + +class TestSplitBoards(unittest.TestCase): + """Test split_boards()""" + + def test_all_local(self): + """Test when no remote toolchains match""" + boards = { + 'sandbox': FakeBoard('sandbox', 'sandbox'), + 'rpi': FakeBoard('rpi', 'arm'), + } + local, remote = boss.split_boards(boards, {'x86': '/usr/bin/gcc'}) + self.assertEqual(len(local), 2) + self.assertEqual(len(remote), 0) + + def test_all_remote(self): + """Test when all boards have remote toolchains""" + boards = { + 'rpi': FakeBoard('rpi', 'arm'), + 'odroid': FakeBoard('odroid', 'arm'), + } + local, remote = boss.split_boards(boards, {'arm': '/usr/bin/gcc'}) + 
self.assertEqual(len(local), 0) + self.assertEqual(len(remote), 2) + + def test_mixed(self): + """Test split with some local, some remote""" + boards = { + 'sandbox': FakeBoard('sandbox', 'sandbox'), + 'rpi': FakeBoard('rpi', 'arm'), + 'qemu': FakeBoard('qemu', 'riscv'), + } + local, remote = boss.split_boards( + boards, {'arm': '/usr/bin/gcc', 'riscv': '/usr/bin/gcc'}) + self.assertEqual(len(local), 1) + self.assertIn('sandbox', local) + self.assertEqual(len(remote), 2) + self.assertIn('rpi', remote) + self.assertIn('qemu', remote) + + def test_empty_toolchains(self): + """Test with no remote toolchains""" + boards = {'sandbox': FakeBoard('sandbox', 'sandbox')} + local, remote = boss.split_boards(boards, {}) + self.assertEqual(len(local), 1) + self.assertEqual(len(remote), 0) + + def test_none_toolchains(self): + """Test with None toolchains""" + boards = {'sandbox': FakeBoard('sandbox', 'sandbox')} + local, remote = boss.split_boards(boards, None) + self.assertEqual(len(local), 1) + self.assertEqual(len(remote), 0) + + +class TestWriteRemoteResult(unittest.TestCase): + """Test _write_remote_result()""" + + def test_success(self): + """Test writing a successful build result""" + with tempfile.TemporaryDirectory() as tmpdir: + build_dir = os.path.join(tmpdir, 'sandbox') + builder = mock.Mock() + builder.get_build_dir.return_value = build_dir + + resp = { + 'resp': 'build_result', + 'board': 'sandbox', + 'commit_upto': 0, + 'return_code': 0, + 'stderr': '', + 'stdout': 'build output', + } + boss._write_remote_result(builder, resp, {}, 'host1') + + build_dir = builder.get_build_dir.return_value + self.assertTrue(os.path.isdir(build_dir)) + + self.assertEqual( + tools.read_file(os.path.join(build_dir, 'done'), + binary=False), '0\n') + self.assertEqual( + tools.read_file(os.path.join(build_dir, 'log'), + binary=False), 'build output') + self.assertFalse(os.path.exists( + os.path.join(build_dir, 'err'))) + + def test_failure_with_stderr(self): + """Test writing a 
failed build result with stderr""" + with tempfile.TemporaryDirectory() as tmpdir: + build_dir = os.path.join(tmpdir, 'rpi') + builder = mock.Mock() + builder.get_build_dir.return_value = build_dir + + resp = { + 'resp': 'build_result', + 'board': 'rpi', + 'commit_upto': 1, + 'return_code': 2, + 'stderr': 'error: undefined reference', + 'stdout': '', + } + boss._write_remote_result(builder, resp, {}, 'host1') + + build_dir = builder.get_build_dir.return_value + self.assertEqual( + tools.read_file(os.path.join(build_dir, 'done'), + binary=False), '2\n') + self.assertEqual( + tools.read_file(os.path.join(build_dir, 'err'), + binary=False), + 'error: undefined reference') + + def test_with_sizes(self): + """Test writing a result with size information""" + with tempfile.TemporaryDirectory() as tmpdir: + build_dir = os.path.join(tmpdir, 'sandbox') + builder = mock.Mock() + builder.get_build_dir.return_value = build_dir + + sizes_raw = (' text data bss dec hex\n' + ' 12345 1234 567 14146 374a\n') + resp = { + 'resp': 'build_result', + 'board': 'sandbox', + 'commit_upto': 0, + 'return_code': 0, + 'stderr': '', + 'stdout': '', + 'sizes': {'raw': sizes_raw}, + } + boss._write_remote_result(builder, resp, {}, 'host1') + + # Boss strips the header line from sizes + build_dir = builder.get_build_dir.return_value + self.assertEqual( + tools.read_file(os.path.join(build_dir, 'sizes'), + binary=False), + ' 12345 1234 567 14146 374a') + + +class FakeMachineInfo: # pylint: disable=R0903 + """Fake machine info for testing""" + bogomips = 5000.0 + + +class FakeMachine: # pylint: disable=R0903 + """Fake machine for testing WorkerPool""" + + def __init__(self, hostname): + self.hostname = hostname + self.name = hostname + self.toolchains = {'arm': '/usr/bin/arm-linux-gnueabihf-gcc'} + self.info = FakeMachineInfo() + self.max_boards = 0 + +def _make_worker(): + """Create a RemoteWorker with mocked subprocess for testing + + Uses __new__ to avoid calling __init__ which requires real SSH. 
+ Sets all attributes to safe defaults. + """ + wrk = boss.RemoteWorker.__new__(boss.RemoteWorker) + wrk.hostname = 'host1' + wrk.name = 'host1' + wrk.nthreads = 4 + wrk.bogomips = 5000.0 + wrk.max_boards = 0 + wrk.slots = 2 + wrk.toolchains = {} + wrk._proc = mock.Mock() + wrk._proc.poll.return_value = None # process is running + wrk._log = None + wrk._closed = False + wrk._closing = False + wrk._stderr_buf = [] + wrk._stderr_thread = None + wrk._stderr_lines = [] + wrk._ready = queue.Queue() + wrk._lock_file = None + wrk._work_dir = '' + wrk._git_dir = '' + wrk.work_dir = '' + wrk.timeout = 10 + wrk.bytes_sent = 0 + wrk.bytes_recv = 0 + wrk.closing = False + return wrk + + +def _make_ctx(board_selected=None): + """Create a _DispatchContext with temp directory for testing + + Returns: + tuple: (ctx, wrk, tmpdir) — caller must call ctx.close() and + shutil.rmtree(tmpdir) + """ + wrk = mock.Mock(nthreads=4, closing=False, max_boards=0, + slots=2, toolchains={'arm': '/gcc'}) + wrk.name = 'host1' + builder = mock.Mock() + tmpdir = tempfile.mkdtemp() + builder.base_dir = tmpdir + ctx = boss._DispatchContext( + workers=[wrk], builder=builder, + board_selected=board_selected or {}, + boss_log=mock.Mock()) + return ctx, wrk, tmpdir + + +class TestWorkerPool(unittest.TestCase): + """Test WorkerPool""" + + @mock.patch('subprocess.Popen') + @mock.patch('buildman.boss._run_ssh') + @mock.patch('buildman.boss.command.run_pipe') + def test_start_all(self, mock_pipe, mock_ssh, mock_popen): + """Test starting workers on multiple machines""" + mock_ssh.return_value = '/tmp/bm-1' + mock_pipe.return_value = mock.Mock(return_code=0) + proc1 = FakeProc([ + _make_response({'resp': 'ready', 'nthreads': 4}), + ]) + proc2 = FakeProc([ + _make_response({'resp': 'ready', 'nthreads': 8}), + ]) + mock_popen.side_effect = [proc1, proc2] + + machines = [FakeMachine('host1'), FakeMachine('host2')] + pool = boss.WorkerPool(machines) + with terminal.capture(): + workers = 
pool.start_all('/tmp/repo', 'HEAD:refs/heads/work') + self.assertEqual(len(workers), 2) + + @mock.patch('subprocess.Popen') + @mock.patch('buildman.boss._run_ssh') + @mock.patch('buildman.boss.command.run_pipe') + def test_start_all_with_settings(self, mock_pipe, mock_ssh, mock_popen): + """Test that start_all sends settings via configure""" + mock_ssh.return_value = '/tmp/bm-1' + mock_pipe.return_value = mock.Mock(return_code=0) + proc1 = FakeProc([ + _make_response({'resp': 'ready', 'nthreads': 4}), + _make_response({'resp': 'configure_done'}), + ]) + proc2 = FakeProc([ + _make_response({'resp': 'ready', 'nthreads': 8}), + _make_response({'resp': 'configure_done'}), + ]) + mock_popen.side_effect = [proc1, proc2] + + machines = [FakeMachine('host1'), FakeMachine('host2')] + pool = boss.WorkerPool(machines) + settings = {'no_lto': True, 'allow_missing': True} + with terminal.capture(): + workers = pool.start_all('/tmp/repo', 'HEAD:refs/heads/work', + settings=settings) + self.assertEqual(len(workers), 2) + # Check that configure was sent (2nd response consumed) + self.assertEqual(proc1._resp_idx, 2) + self.assertEqual(proc2._resp_idx, 2) + + @mock.patch('buildman.boss._run_ssh') + def test_start_all_init_failure(self, mock_ssh): + """Test start_all when init_git fails on one machine""" + call_count = [0] + + def _side_effect(_hostname, _cmd, **_kwargs): + call_count[0] += 1 + if call_count[0] % 2 == 0: + raise boss.BossError('connection refused') + return '/tmp/bm-1' + + mock_ssh.side_effect = _side_effect + + machines = [FakeMachine('good'), FakeMachine('bad')] + pool = boss.WorkerPool(machines) + with terminal.capture(): + workers = pool.start_all('/tmp/repo', 'HEAD:refs/heads/work') + # Only 'good' survives init phase; push and start not reached + # for 'bad' + self.assertLessEqual(len(workers), 1) + + def test_quit_all(self): + """Test quitting all workers""" + pool = boss.WorkerPool([]) + w1 = mock.Mock(spec=boss.RemoteWorker) + w2 = 
mock.Mock(spec=boss.RemoteWorker) + pool.workers = [w1, w2] + with terminal.capture(): + pool.quit_all() + w1.quit.assert_called_once() + w2.quit.assert_called_once() + self.assertEqual(len(pool.workers), 0) + + def test_quit_all_with_error(self): + """Test quit_all when a worker raises BossError""" + pool = boss.WorkerPool([]) + w1 = mock.Mock(spec=boss.RemoteWorker) + w1.quit.side_effect = boss.BossError('connection lost') + pool.workers = [w1] + with terminal.capture(): + pool.quit_all() + w1.close.assert_called_once() + self.assertEqual(len(pool.workers), 0) + + def test_build_boards_empty(self): + """Test build_boards with no workers or boards""" + pool = boss.WorkerPool([]) + with terminal.capture(): + pool.build_boards({}, None, mock.Mock()) # Should not raise + + +class TestBuildBoards(unittest.TestCase): + """Test WorkerPool.build_boards() end-to-end""" + + def setUp(self): + self._tmpdir = tempfile.mkdtemp() + + def tearDown(self): + shutil.rmtree(self._tmpdir, ignore_errors=True) + + @staticmethod + def _make_worker(hostname, toolchains, responses): + """Create a mock RemoteWorker with canned responses + + Args: + hostname (str): Hostname for the worker + toolchains (dict): arch -> gcc path + responses (list of dict): Responses to return from recv() + + Returns: + Mock: Mock RemoteWorker + """ + wrk = mock.Mock(spec=boss.RemoteWorker) + wrk.hostname = hostname + wrk.name = hostname + wrk.toolchains = toolchains + wrk.nthreads = 4 + wrk.max_boards = 4 + wrk.bogomips = 5000.0 + wrk.slots = 4 + wrk.recv.side_effect = list(responses) + return wrk + + def _demand_responses(self, *results): + """Build recv responses for the demand-driven protocol + + Returns a list starting with build_prepare_done, followed by + the given results, and ending with build_done. 
+ """ + return ([{'resp': 'build_prepare_done'}] + list(results) + + [{'resp': 'build_done'}]) + + def test_build_boards_success(self): + """Test building boards across workers with correct results""" + wrk1 = self._make_worker('host1', {'arm': '/usr/bin/arm-gcc'}, + self._demand_responses(_make_result('rpi'))) + wrk2 = self._make_worker('host2', + {'riscv': '/usr/bin/riscv64-gcc'}, + self._demand_responses(_make_result('odroid'))) + + pool = boss.WorkerPool([]) + pool.workers = [wrk1, wrk2] + + boards = { + 'rpi': FakeBoard('rpi', 'arm'), + 'odroid': FakeBoard('odroid', 'riscv'), + } + builder = _make_builder(self._tmpdir) + + with terminal.capture(): + pool.build_boards(boards, None, builder) + + # Both workers should have received build_prepare + wrk1.build_prepare.assert_called_once() + wrk2.build_prepare.assert_called_once() + + # Verify done files were written + for board in ['rpi', 'odroid']: + done_path = os.path.join(self._tmpdir, board, 'done') + self.assertTrue(os.path.exists(done_path)) + + def test_board_stays_on_same_worker(self): + """Test that all commits for a board go to the same worker""" + commits = [types.SimpleNamespace(hash=f'abc{i}') for i in range(3)] + + # Two workers with different archs, each gets one board + wrk1 = self._make_worker('host1', {'arm': '/usr/bin/arm-gcc'}, + self._demand_responses( + *[_make_result('rpi', commit_upto=i) + for i in range(3)])) + wrk2 = self._make_worker('host2', + {'riscv': '/usr/bin/riscv64-gcc'}, + self._demand_responses( + *[_make_result('odroid', commit_upto=i) + for i in range(3)])) + + pool = boss.WorkerPool([]) + pool.workers = [wrk1, wrk2] + + boards = { + 'rpi': FakeBoard('rpi', 'arm'), + 'odroid': FakeBoard('odroid', 'riscv'), + } + builder = _make_builder(self._tmpdir) + + with terminal.capture(): + pool.build_boards(boards, commits, builder) + + # Each worker gets one board via build_board + wrk1.build_board.assert_called_once() + wrk2.build_board.assert_called_once() + + # Check each worker got 
the right board + self.assertEqual(wrk1.build_board.call_args[0][0], 'rpi') + self.assertEqual(wrk2.build_board.call_args[0][0], 'odroid') + + def test_arch_passed(self): + """Test that the board's arch is sent to the worker""" + wrk = self._make_worker( + 'host1', + {'arm': '/usr/bin/arm-linux-gnueabihf-gcc'}, + self._demand_responses(_make_result('rpi'))) + + pool = boss.WorkerPool([]) + pool.workers = [wrk] + + boards = {'rpi': FakeBoard('rpi', 'arm')} + builder = _make_builder(self._tmpdir) + + with terminal.capture(): + pool.build_boards(boards, None, builder) + + wrk.build_board.assert_called_once() + # build_board(board, arch) + self.assertEqual(wrk.build_board.call_args[0][1], 'arm') + + def test_worker_error_response(self): + """Test that error responses are caught and stop the worker""" + wrk = self._make_worker( + 'host1', {'arm': '/usr/bin/arm-gcc'}, [ + {'resp': 'error', 'msg': 'no work directory set up'}, + ]) + + pool = boss.WorkerPool([]) + pool.workers = [wrk] + + boards = { + 'rpi': FakeBoard('rpi', 'arm'), + 'odroid': FakeBoard('odroid', 'arm'), + } + builder = _make_builder(self._tmpdir) + + with terminal.capture(): + pool.build_boards(boards, None, builder) + + # build_prepare sent, error on first recv stops the worker + self.assertTrue(wrk.build_prepare.called) + + def test_boss_error_stops_worker(self): + """Test that BossError from recv() stops the worker""" + wrk = self._make_worker( + 'host1', {'arm': '/usr/bin/arm-gcc'}, []) + wrk.recv.side_effect = boss.BossError('connection lost') + + pool = boss.WorkerPool([]) + pool.workers = [wrk] + + boards = { + 'rpi': FakeBoard('rpi', 'arm'), + 'odroid': FakeBoard('odroid', 'arm'), + } + builder = _make_builder(self._tmpdir) + + with terminal.capture(): + pool.build_boards(boards, None, builder) + + # build_prepare sent, but only one recv attempted before error + self.assertTrue(wrk.build_prepare.called) + self.assertEqual(wrk.recv.call_count, 1) + + def test_toolchain_matching(self): + """Test 
boards only go to workers with the right toolchain""" + wrk_arm = self._make_worker( + 'arm-host', {'arm': '/usr/bin/arm-gcc'}, + self._demand_responses( + _make_result('rpi'), + _make_result('odroid'), + )) + wrk_riscv = self._make_worker( + 'rv-host', {'riscv': '/usr/bin/riscv64-gcc'}, + self._demand_responses( + _make_result('qemu_rv'), + )) + + pool = boss.WorkerPool([]) + pool.workers = [wrk_arm, wrk_riscv] + + boards = { + 'rpi': FakeBoard('rpi', 'arm'), + 'odroid': FakeBoard('odroid', 'arm'), + 'qemu_rv': FakeBoard('qemu_rv', 'riscv'), + } + builder = _make_builder(self._tmpdir) + + with terminal.capture(): + pool.build_boards(boards, None, builder) + + # arm boards go to wrk_arm, riscv to wrk_riscv + arm_boards = {call[0][0] + for call in wrk_arm.build_board.call_args_list} + rv_boards = {call[0][0] + for call in wrk_riscv.build_board.call_args_list} + self.assertEqual(arm_boards, {'rpi', 'odroid'}) + self.assertEqual(rv_boards, {'qemu_rv'}) + + def test_sandbox_any_worker(self): + """Test that sandbox boards can go to any worker""" + wrk = self._make_worker( + 'host1', {'arm': '/usr/bin/arm-gcc'}, + self._demand_responses( + _make_result('sandbox'), + )) + + pool = boss.WorkerPool([]) + pool.workers = [wrk] + + boards = {'sandbox': FakeBoard('sandbox', 'sandbox')} + builder = _make_builder(self._tmpdir) + + with terminal.capture(): + pool.build_boards(boards, None, builder) + + # sandbox should be sent to the worker even though it has + # no 'sandbox' toolchain — sandbox uses the host compiler + wrk.build_board.assert_called_once() + self.assertEqual(wrk.build_board.call_args[0][1], 'sandbox') + + def test_skip_done_boards(self): + """Test that already-done boards are skipped without force""" + # Create a done file for 'rpi' + done_path = os.path.join(self._tmpdir, 'rpi_done') + tools.write_file(done_path, '0\n', binary=False) + + wrk = self._make_worker( + 'host1', {'arm': '/usr/bin/arm-gcc'}, + self._demand_responses( + _make_result('odroid'), + )) + + 
pool = boss.WorkerPool([]) + pool.workers = [wrk] + + boards = { + 'rpi': FakeBoard('rpi', 'arm'), + 'odroid': FakeBoard('odroid', 'arm'), + } + builder = _make_builder(self._tmpdir, force_build=False) + builder.get_done_file.side_effect = ( + lambda c, b: os.path.join(self._tmpdir, f'{b}_done')) + + with terminal.capture(): + pool.build_boards(boards, None, builder) + + # Only odroid should be built (rpi has done file) + wrk.build_board.assert_called_once() + self.assertEqual(wrk.build_board.call_args[0][0], 'odroid') + + def test_no_capable_worker(self): + """Test boards with no capable worker are silently skipped""" + wrk = self._make_worker( + 'host1', {'arm': '/usr/bin/arm-gcc'}, + [{'resp': 'build_prepare_done'}]) + + pool = boss.WorkerPool([]) + pool.workers = [wrk] + + boards = {'qemu_rv': FakeBoard('qemu_rv', 'riscv')} + builder = _make_builder(self._tmpdir) + + with terminal.capture(): + pool.build_boards(boards, None, builder) + + # No build_board should be sent (no riscv worker) + wrk.build_board.assert_not_called() + + def test_progress_updated(self): + """Test that process_result is called for each build result""" + wrk = self._make_worker( + 'host1', {'arm': '/usr/bin/arm-gcc'}, + self._demand_responses( + _make_result('rpi'), + _make_result('odroid'), + )) + + pool = boss.WorkerPool([]) + pool.workers = [wrk] + + brd_rpi = FakeBoard('rpi', 'arm') + brd_odroid = FakeBoard('odroid', 'arm') + boards = {'rpi': brd_rpi, 'odroid': brd_odroid} + builder = _make_builder(self._tmpdir) + + with terminal.capture(): + pool.build_boards(boards, None, builder) + + # process_result should be called for each board + self.assertEqual(builder.process_result.call_count, 2) + # Each result should have remote set to hostname + for call in builder.process_result.call_args_list: + result = call[0][0] + self.assertEqual(result.remote, 'host1') + + def test_log_files_created(self): + """Test that worker log files are created in the output dir""" + wrk = self._make_worker( + 
'myhost', {'arm': '/usr/bin/arm-gcc'}, + self._demand_responses( + _make_result('rpi'), + )) + + pool = boss.WorkerPool([]) + pool.workers = [wrk] + + boards = {'rpi': FakeBoard('rpi', 'arm')} + builder = _make_builder(self._tmpdir) + + with terminal.capture(): + pool.build_boards(boards, None, builder) + + log_path = os.path.join(self._tmpdir, 'worker-myhost.log') + self.assertTrue(os.path.exists(log_path)) + content = tools.read_file(log_path, binary=False) + self.assertIn('>> 1 boards', content) + self.assertIn('<< ', content) + self.assertIn('build_result', content) + + def test_heartbeat_resets_timeout(self): + """Test that heartbeat messages are accepted without error""" + wrk = self._make_worker( + 'host1', {'arm': '/usr/bin/arm-gcc'}, [ + {'resp': 'build_prepare_done'}, + {'resp': 'heartbeat', 'board': 'rpi', 'thread': 0}, + _make_result('rpi'), + {'resp': 'build_done'}, + ]) + + pool = boss.WorkerPool([]) + pool.workers = [wrk] + + boards = {'rpi': FakeBoard('rpi', 'arm')} + builder = _make_builder(self._tmpdir) + + with terminal.capture(): + pool.build_boards(boards, None, builder) + + # The heartbeat should be silently consumed, result processed + self.assertEqual(builder.process_result.call_count, 1) + + def test_build_done_stops_worker(self): + """Test that build_done ends collection without timeout""" + wrk = self._make_worker( + 'host1', {'arm': '/usr/bin/arm-gcc'}, [ + {'resp': 'build_prepare_done'}, + _make_result('rpi'), + # Worker had 2 boards but only 1 result, then + # build_done + {'resp': 'build_done', 'exceptions': 1}, + # Final build_done response after boss sends + # build_done + {'resp': 'build_done'}, + ]) + + pool = boss.WorkerPool([]) + pool.workers = [wrk] + + boards = { + 'rpi': FakeBoard('rpi', 'arm'), + 'odroid': FakeBoard('odroid', 'arm'), + } + builder = _make_builder(self._tmpdir) + + with terminal.capture(): + pool.build_boards(boards, None, builder) + + # Only 1 result processed (odroid was lost to a thread + # exception) + 
self.assertEqual(builder.process_result.call_count, 1) + + +class TestPipelinedBuilds(unittest.TestCase): + """Test pipelined builds with multiple slots""" + + def setUp(self): + self._tmpdir = tempfile.mkdtemp() + + def tearDown(self): + shutil.rmtree(self._tmpdir, ignore_errors=True) + + def test_multiple_boards(self): + """Test that boss sends build_prepare then build_board for each""" + wrk = TestBuildBoards._make_worker( + 'host1', {'arm': '/usr/bin/arm-gcc'}, [ + {'resp': 'build_prepare_done'}, + _make_result('b1'), + _make_result('b2'), + _make_result('b3'), + _make_result('b4'), + {'resp': 'build_done'}, + ]) + + pool = boss.WorkerPool([]) + pool.workers = [wrk] + + boards = { + 'b1': FakeBoard('b1', 'arm'), + 'b2': FakeBoard('b2', 'arm'), + 'b3': FakeBoard('b3', 'arm'), + 'b4': FakeBoard('b4', 'arm'), + } + builder = _make_builder(self._tmpdir) + with terminal.capture(): + pool.build_boards(boards, None, builder) + + # build_prepare called once, build_board called 4 times + wrk.build_prepare.assert_called_once() + sent_boards = {call[0][0] + for call in wrk.build_board.call_args_list} + self.assertEqual(sent_boards, {'b1', 'b2', 'b3', 'b4'}) + # All 4 results collected + self.assertEqual(builder.process_result.call_count, 4) + + @mock.patch('subprocess.Popen') + def test_slots_from_ready(self, mock_popen): + """Test that slots is read from the worker's ready response""" + proc = FakeProc([ + _make_response( + {'resp': 'ready', 'nthreads': 20, 'slots': 5}), + ]) + w = _start_worker('host1', mock_popen, proc) + self.assertEqual(w.nthreads, 20) + self.assertEqual(w.slots, 5) + + @mock.patch('subprocess.Popen') + def test_slots_default(self, mock_popen): + """Test that slots defaults to 1 for old workers""" + proc = FakeProc([ + _make_response({'resp': 'ready', 'nthreads': 8}), + ]) + w = _start_worker('host1', mock_popen, proc) + self.assertEqual(w.slots, 1) + + +class TestBuildTimeout(unittest.TestCase): + """Test that the build timeout prevents hangs""" + + 
def setUp(self): + self._tmpdir = tempfile.mkdtemp() + self._orig_timeout = boss.BUILD_TIMEOUT + + def tearDown(self): + shutil.rmtree(self._tmpdir, ignore_errors=True) + boss.BUILD_TIMEOUT = self._orig_timeout + + def test_recv_timeout(self): + """Test that a hung worker times out instead of blocking""" + + # Use a very short timeout so the test runs quickly + boss.BUILD_TIMEOUT = 0.5 + + wrk = TestBuildBoards._make_worker( + 'slowhost', {'arm': '/usr/bin/arm-gcc'}, []) + wrk.nthreads = 1 + wrk.max_boards = 1 + wrk.slots = 1 + + # Simulate a worker that never responds: recv blocks forever + wrk.recv.side_effect = lambda: time.sleep(60) + + pool = boss.WorkerPool([]) + pool.workers = [wrk] + + boards = {'rpi': FakeBoard('rpi', 'arm')} + builder = _make_builder(self._tmpdir) + + start = time.monotonic() + with terminal.capture(): + pool.build_boards(boards, None, builder) + elapsed = time.monotonic() - start + + # Should complete quickly (within a few seconds), not hang + self.assertLess(elapsed, 10) + + # No results should have been processed + builder.process_result.assert_not_called() + + +class TestMachineMaxBoards(unittest.TestCase): + """Test per-machine max_boards config""" + + def test_max_boards_from_config(self): + """Test [machine:name] section sets max_boards""" + bsettings.setup('') + bsettings.settings.read_string(""" +[machines] +ruru +weka + +[machine:ruru] +max_boards = 64 +""") + pool = machine.MachinePool() + pool._load_from_config() + by_name = {m.name: m for m in pool.machines} + self.assertEqual(by_name['ruru'].max_boards, 64) + self.assertEqual(by_name['weka'].max_boards, 0) + + def test_max_boards_default(self): + """Test max_boards is 0 when no per-machine section exists""" + mach = machine.Machine('host1') + self.assertEqual(mach.max_boards, 0) + + +class TestGccVersion(unittest.TestCase): + """Test gcc_version()""" + + def test_buildman_toolchain(self): + """Test extracting version from a buildman-fetched toolchain""" + path = 
('/home/sglass/.buildman-toolchains/gcc-13.1.0-nolibc/' + 'aarch64-linux/bin/aarch64-linux-gcc') + self.assertEqual(machine.gcc_version(path), 'gcc-13.1.0-nolibc') + + def test_different_version(self): + """Test a different gcc version""" + path = ('/home/sglass/.buildman-toolchains/gcc-11.1.0-nolibc/' + 'aarch64-linux/bin/aarch64-linux-gcc') + self.assertEqual(machine.gcc_version(path), 'gcc-11.1.0-nolibc') + + def test_system_gcc(self): + """Test a system gcc with no version directory""" + self.assertIsNone(machine.gcc_version('/usr/bin/gcc')) + + def test_empty(self): + """Test an empty path""" + self.assertIsNone(machine.gcc_version('')) + + + +class _PipelineWorker: # pylint: disable=R0902 + """Mock worker that simulates the demand-driven build protocol + + Responds to build_prepare, build_board, and build_done commands + via the recv() queue, simulating real worker behaviour. + """ + + def __init__(self, nthreads, max_boards=0): + self.hostname = 'sim-host' + self.name = 'sim-host' + self.toolchains = {'arm': '/usr/bin/arm-gcc'} + self.nthreads = nthreads + self.max_boards = max_boards or nthreads + self.slots = nthreads + self.bogomips = 5000.0 + self.closing = False + + self._ready = queue.Queue() + self._commits = None + + def build_prepare(self, commits): + """Accept prepare command and queue ready response""" + self._commits = commits + self._ready.put({'resp': 'build_prepare_done'}) + + def build_board(self, board, _arch): + """Queue results for this board across all commits""" + + def _produce(): + for cu in range(len(self._commits)): + time.sleep(random.uniform(0.001, 0.005)) + self._ready.put({ + 'resp': 'build_result', + 'board': board, + 'commit_upto': cu, + 'return_code': 0, + 'stderr': '', + 'stdout': '', + }) + + threading.Thread(target=_produce, daemon=True).start() + + def build_done(self): + """Queue the build_done response after a short delay""" + def _respond(): + time.sleep(0.05) + self._ready.put({'resp': 'build_done'}) + 
threading.Thread(target=_respond, daemon=True).start() + + def recv(self): + """Wait for the next result""" + return self._ready.get() + + +class TestBuildBoardsUtilisation(unittest.TestCase): + """Test that build_boards dispatches correctly to workers + + The boss sends one build_boards command per worker. The mock + worker simulates board-first scheduling and produces results. + """ + + def setUp(self): + self._tmpdir = tempfile.mkdtemp() + + def tearDown(self): + shutil.rmtree(self._tmpdir, ignore_errors=True) + + def _run_build(self, nboards, ncommits, nthreads, max_boards=0): + """Run a build and return the mock worker""" + wrk = _PipelineWorker(nthreads, max_boards=max_boards) + + pool = boss.WorkerPool([]) + pool.workers = [wrk] + + boards = {} + for i in range(nboards): + target = f'board{i}' + boards[target] = FakeBoard(target, 'arm') + + commits = [types.SimpleNamespace(hash=f'commit{i}') + for i in range(ncommits)] + + builder = mock.Mock() + builder.force_build = True + builder.base_dir = self._tmpdir + builder.count = 0 + builder.get_build_dir.side_effect = ( + lambda c, b: os.path.join(self._tmpdir, b)) + + with terminal.capture(): + pool.build_boards(boards, commits, builder) + return wrk + + def test_all_results_collected(self): + """Verify boss collects all board x commit results""" + nboards = 30 + ncommits = 10 + wrk = self._run_build(nboards, ncommits, nthreads=8) + + # The ready queue should be empty (boss drained everything) + self.assertTrue(wrk._ready.empty()) + + def test_large_scale(self): + """Test at realistic scale: 200 boards x 10 commits""" + wrk = self._run_build(nboards=200, ncommits=10, + nthreads=32) + self.assertTrue(wrk._ready.empty()) + + def test_max_boards_caps_batch(self): + """Test that max_boards limits initial and in-flight boards""" + wrk = self._run_build(nboards=30, ncommits=5, + nthreads=32, max_boards=8) + self.assertTrue(wrk._ready.empty()) + # Worker should still receive all boards despite the cap + 
self.assertEqual(wrk.max_boards, 8)
+
+
+class TestFormatBytes(unittest.TestCase):
+    """Test _format_bytes()"""
+
+    def test_format_bytes(self):
+        """Test bytes, KB and MB ranges"""
+        self.assertEqual(boss._format_bytes(0), '0B')
+        self.assertEqual(boss._format_bytes(1023), '1023B')
+        self.assertEqual(boss._format_bytes(1024), '1.0KB')
+        self.assertEqual(boss._format_bytes(1536), '1.5KB')
+        self.assertEqual(boss._format_bytes(1024 * 1024), '1.0MB')
+        self.assertEqual(boss._format_bytes(5 * 1024 * 1024), '5.0MB')
+
+
+class TestWriteRemoteResultMore(unittest.TestCase):
+    """Further tests for _write_remote_result()"""
+
+    def test_with_stderr(self):
+        """Test writing result with stderr"""
+        bldr = mock.Mock()
+        bldr.get_build_dir.return_value = tempfile.mkdtemp()
+        resp = {
+            'board': 'sandbox',
+            'commit_upto': 0,
+            'return_code': 2,
+            'stderr': 'error: missing header\n',
+            'stdout': '',
+        }
+        boss._write_remote_result(bldr, resp, {'sandbox': mock.Mock()},
+                                  'host1')
+        build_dir = bldr.get_build_dir.return_value
+        err_path = os.path.join(build_dir, 'err')
+        self.assertIn('error:',
+                      tools.read_file(err_path, binary=False))
+
+        shutil.rmtree(build_dir)
+
+    def test_removes_stale_err(self):
+        """Test that stale err file is removed on success"""
+        bldr = mock.Mock()
+        build_dir = tempfile.mkdtemp()
+        bldr.get_build_dir.return_value = build_dir
+        err_path = os.path.join(build_dir, 'err')
+        tools.write_file(err_path, 'old error', binary=False)
+        resp = {
+            'board': 'sandbox',
+            'commit_upto': 0,
+            'return_code': 0,
+            'stderr': '',
+            'stdout': '',
+        }
+        boss._write_remote_result(bldr, resp, {'sandbox': mock.Mock()},
+                                  'host1')
+        self.assertFalse(os.path.exists(err_path))
+
+        shutil.rmtree(build_dir)
+
+
+class TestRemoteWorkerMethods(unittest.TestCase):
+    """Test RemoteWorker send/recv/close methods"""
+
+    def test_send(self):
+        """Test _send writes JSON to stdin"""
+        wrk = _make_worker()
+        wrk._proc.stdin = mock.Mock()
+        wrk._send({'cmd': 'quit'})
+        wrk._proc.stdin.write.assert_called_once()
+        data = 
wrk._proc.stdin.write.call_args[0][0] + self.assertIn(b'quit', data) + + def test_send_broken_pipe(self): + """Test _send raises BrokenPipeError on broken pipe""" + wrk = _make_worker() + wrk._proc.stdin.write.side_effect = BrokenPipeError() + with self.assertRaises(BrokenPipeError): + wrk._send({'cmd': 'quit'}) + + def test_build_commands(self): + """Test build_prepare, build_board and build_done commands""" + wrk = _make_worker() + wrk._send = mock.Mock() + + wrk.build_prepare(['abc123']) + self.assertEqual(wrk._send.call_args[0][0]['cmd'], + 'build_prepare') + + wrk._send.reset_mock() + wrk.build_board('sandbox', 'sandbox') + self.assertEqual(wrk._send.call_args[0][0]['cmd'], + 'build_board') + + wrk._send.reset_mock() + wrk.build_done() + self.assertEqual(wrk._send.call_args[0][0]['cmd'], + 'build_done') + + def test_quit(self): + """Test quit sends command and closes, handles errors""" + wrk = _make_worker() + wrk._send = mock.Mock() + wrk._recv = mock.Mock(return_value={'resp': 'quit_ack'}) + wrk.close = mock.Mock() + wrk.quit() + wrk._send.assert_called_once() + wrk.close.assert_called_once() + + # Error path: BossError during quit still closes + wrk2 = _make_worker() + wrk2._send = mock.Mock(side_effect=boss.BossError('gone')) + wrk2.close = mock.Mock() + wrk2.quit() + wrk2.close.assert_called_once() + + def test_close_idempotent(self): + """Test close can be called multiple times""" + wrk = _make_worker() + wrk.close() + self.assertIsNone(wrk._proc) + wrk.close() # should not raise + + @mock.patch('buildman.boss._run_ssh') + def test_remove_lock(self, mock_ssh): + """Test remove_lock: SSH call, no work_dir, SSH failure""" + wrk = _make_worker() + wrk.work_dir = '/tmp/bm' + wrk.remove_lock() + mock_ssh.assert_called_once() + + # No work_dir: does nothing + wrk2 = _make_worker() + wrk2.work_dir = '' + wrk2.remove_lock() + + # SSH failure: silently ignored + mock_ssh.side_effect = boss.BossError('gone') + wrk3 = _make_worker() + wrk3.work_dir = '/tmp/bm' + 
wrk3.remove_lock()
+
+
+class TestWorkerPoolCapacity(unittest.TestCase):
+    """Test WorkerPool capacity and arch assignment"""
+
+    def test_get_capacity(self):
+        """Test capacity calculation"""
+        wrk = mock.Mock(nthreads=8, bogomips=5000.0)
+        self.assertEqual(
+            boss.WorkerPool._get_capacity(wrk), 40000.0)
+
+    def test_get_capacity_no_bogomips(self):
+        """Test capacity with no bogomips falls back to nthreads"""
+        wrk = mock.Mock(nthreads=4, bogomips=0)
+        self.assertEqual(boss.WorkerPool._get_capacity(wrk), 4.0)
+
+    def test_get_worker_for_arch(self):
+        """Test arch-based worker selection"""
+        w1 = mock.Mock(nthreads=8, bogomips=5000.0,
+                       toolchains={'arm': '/gcc'})
+        w2 = mock.Mock(nthreads=4, bogomips=5000.0,
+                       toolchains={'arm': '/gcc'})
+        pool = boss.WorkerPool.__new__(boss.WorkerPool)
+        pool.workers = [w1, w2]
+        assigned = {}
+
+        # First assignment should go to w1 (higher capacity)
+        wrk = pool._get_worker_for_arch('arm', assigned)
+        self.assertEqual(wrk, w1)
+
+        # Second should go to w2 (w1 already has 1)
+        wrk = pool._get_worker_for_arch('arm', assigned)
+        self.assertEqual(wrk, w2)
+
+    def test_get_worker_sandbox(self):
+        """Test sandbox goes to any worker"""
+        w1 = mock.Mock(nthreads=4, bogomips=1000.0, toolchains={})
+        pool = boss.WorkerPool.__new__(boss.WorkerPool)
+        pool.workers = [w1]
+        assigned = {}
+        wrk = pool._get_worker_for_arch('sandbox', assigned)
+        self.assertEqual(wrk, w1)
+
+    def test_get_worker_no_capable(self):
+        """Test returns None when no worker supports arch"""
+        w1 = mock.Mock(nthreads=4, bogomips=1000.0,
+                       toolchains={'arm': '/gcc'})
+        pool = boss.WorkerPool.__new__(boss.WorkerPool)
+        pool.workers = [w1]
+        self.assertIsNone(
+            pool._get_worker_for_arch('mips', {}))
+
+
+class TestBossLog(unittest.TestCase):
+    """Test _BossLog"""
+
+    def test_log_and_close(self):
+        """Test logging and closing"""
+        with tempfile.TemporaryDirectory() as tmpdir:
+            blog = boss._BossLog(tmpdir)
+            wrk = mock.Mock(name='host1', nthreads=4)
+            wrk.name = 
'host1' + blog.init_worker(wrk) + blog.log('test message') + blog.record_sent('host1', 3) + blog.record_recv('host1', load_avg=2.5) + blog.close() + + log_path = os.path.join(tmpdir, '.buildman.log') + content = tools.read_file(log_path, binary=False) + self.assertIn('test message', content) + + def test_log_status(self): + """Test log_status writes counts""" + with tempfile.TemporaryDirectory() as tmpdir: + blog = boss._BossLog(tmpdir) + wrk = mock.Mock(name='host1', nthreads=4) + wrk.name = 'host1' + blog.init_worker(wrk) + blog.record_sent('host1', 5) + blog.record_recv('host1') + blog.record_recv('host1') + blog.log_status() + blog.close() + + content = tools.read_file( + os.path.join(tmpdir, '.buildman.log'), binary=False) + self.assertIn('host1', content) + + def test_start_timer(self): + """Test start_timer and close with elapsed""" + with tempfile.TemporaryDirectory() as tmpdir: + blog = boss._BossLog(tmpdir) + blog.start_timer() + blog.close() + + +class TestDispatchContext(unittest.TestCase): + """Test _DispatchContext""" + + def test_update_progress_build_started(self): + """Test worktree progress: build_started""" + wrk = mock.Mock(nthreads=4) + wrk.name = 'host1' + builder = mock.Mock() + with tempfile.TemporaryDirectory() as tmpdir: + builder.base_dir = tmpdir + ctx = boss._DispatchContext( + workers=[wrk], builder=builder, + board_selected={}, boss_log=mock.Mock()) + resp = {'resp': 'build_started', 'num_threads': 8} + self.assertTrue(ctx.update_progress(resp, wrk)) + ctx.close() + + def test_update_progress_worktree_created(self): + """Test worktree progress: worktree_created""" + wrk = mock.Mock(nthreads=2) + wrk.name = 'host1' + builder = mock.Mock() + with tempfile.TemporaryDirectory() as tmpdir: + builder.base_dir = tmpdir + ctx = boss._DispatchContext( + workers=[wrk], builder=builder, + board_selected={}, boss_log=mock.Mock()) + ctx.update_progress( + {'resp': 'build_started', 'num_threads': 2}, wrk) + self.assertTrue(ctx.update_progress( + 
{'resp': 'worktree_created'}, wrk)) + ctx.close() + + def test_update_progress_other(self): + """Test non-progress messages return False""" + wrk = mock.Mock(nthreads=4) + wrk.name = 'host1' + builder = mock.Mock() + with tempfile.TemporaryDirectory() as tmpdir: + builder.base_dir = tmpdir + ctx = boss._DispatchContext( + workers=[wrk], builder=builder, + board_selected={}, boss_log=mock.Mock()) + self.assertFalse(ctx.update_progress( + {'resp': 'build_result'}, wrk)) + ctx.close() + + def test_log(self): + """Test per-worker log file""" + wrk = mock.Mock(nthreads=4) + wrk.name = 'host1' + builder = mock.Mock() + with tempfile.TemporaryDirectory() as tmpdir: + builder.base_dir = tmpdir + ctx = boss._DispatchContext( + workers=[wrk], builder=builder, + board_selected={}, boss_log=mock.Mock()) + ctx.log(wrk, '>>>', 'test message') + ctx.close() + content = tools.read_file( + os.path.join(tmpdir, 'worker-host1.log'), + binary=False) + self.assertIn('test message', content) + + +class TestRemoteWorkerClose(unittest.TestCase): + """Test RemoteWorker.close() error paths""" + + def test_close_error_paths(self): + """Test close: stdin OSError, terminate timeout, kill timeout""" + # stdin.close() raises OSError + wrk = _make_worker() + wrk._proc.stdin.close.side_effect = OSError('broken') + wrk.close() + self.assertIsNone(wrk._proc) + + # wait() times out, terminate succeeds + wrk2 = _make_worker() + wrk2._proc.wait.side_effect = [ + subprocess.TimeoutExpired('ssh', 2), + None, + ] + wrk2.close() + self.assertIsNone(wrk2._proc) + + # wait() times out twice, falls back to kill + wrk3 = _make_worker() + wrk3._proc.wait.side_effect = [ + subprocess.TimeoutExpired('ssh', 2), + subprocess.TimeoutExpired('ssh', 3), + ] + wrk3.close() + self.assertIsNone(wrk3._proc) + + def test_configure_rejected(self): + """Test configure raises on rejection""" + wrk = _make_worker() + wrk._send = mock.Mock() + wrk._recv = mock.Mock(return_value={'resp': 'error', 'msg': 'bad'}) + with 
self.assertRaises(boss.BossError): + wrk.configure({'no_lto': True}) + + def test_get_stderr(self): + """Test _get_stderr returns last non-empty line, or empty""" + wrk = _make_worker() + wrk._stderr_thread = mock.Mock() + wrk._stderr_lines = ['first', '', 'last error', ''] + self.assertEqual(wrk._get_stderr(), 'last error') + + wrk._stderr_lines = [] + self.assertEqual(wrk._get_stderr(), '') + + +class TestWriteRemoteSizes(unittest.TestCase): + """Test _write_remote_result() size handling""" + + def test_sizes_with_header(self): + """Test that size output header line is stripped""" + bldr = mock.Mock() + build_dir = tempfile.mkdtemp() + bldr.get_build_dir.return_value = build_dir + resp = { + 'board': 'sandbox', + 'commit_upto': 0, + 'return_code': 0, + 'stderr': '', + 'stdout': '', + 'sizes': { + 'raw': (' text data bss dec hex\n' + ' 1000 200 100 1300 514\n')}, + } + boss._write_remote_result(bldr, resp, {'sandbox': mock.Mock()}, + 'host1') + sizes_content = tools.read_file( + os.path.join(build_dir, 'sizes'), binary=False) + self.assertNotIn('text', sizes_content) + self.assertIn('1000', sizes_content) + + shutil.rmtree(build_dir) + + +class TestWorkerPoolEdgeCases(unittest.TestCase): + """Test WorkerPool edge cases""" + + def test_print_transfer_empty(self): + """Test print_transfer_summary with no workers""" + pool = boss.WorkerPool([]) + with terminal.capture(): + pool.print_transfer_summary() + + def test_quit_all_with_boss_log(self): + """Test quit_all closes boss_log""" + pool = boss.WorkerPool([]) + blog = mock.Mock() + pool._boss_log = blog + with terminal.capture(): + pool.quit_all() + blog.log.assert_called() + blog.close.assert_called_once() + + def test_build_boards_with_local_count(self): + """Test build_boards progress includes local count""" + pool = boss.WorkerPool([]) + wrk = mock.Mock( + nthreads=4, bogomips=1000.0, slots=2, + toolchains={'arm': '/gcc'}, closing=False) + wrk.name = 'host1' + wrk.recv.side_effect = [ + {'resp': 'build_prepare_done'}, + {'resp': 'build_result', 'board': 
'rpi', + 'commit_upto': 0, 'return_code': 0, + 'stderr': '', 'stdout': ''}, + {'resp': 'build_done'}, + ] + pool.workers = [wrk] + + builder = mock.Mock() + builder.force_build = True + builder.base_dir = tempfile.mkdtemp() + builder.count = 0 + builder.get_build_dir.return_value = tempfile.mkdtemp() + + boards = {'rpi': mock.Mock(target='rpi', arch='arm')} + with terminal.capture(): + pool.build_boards(boards, None, builder, local_count=5) + + shutil.rmtree(builder.base_dir) + shutil.rmtree(builder.get_build_dir.return_value) + + def test_get_capacity_no_bogomips(self): + """Test _get_worker_for_arch falls back when bogomips is 0""" + w1 = mock.Mock(nthreads=4, bogomips=0, toolchains={'arm': '/gcc'}) + pool = boss.WorkerPool.__new__(boss.WorkerPool) + pool.workers = [w1] + wrk = pool._get_worker_for_arch('arm', {}) + self.assertEqual(wrk, w1) + + def test_close_all(self): + """Test close_all with boss_log""" + pool = boss.WorkerPool([]) + wrk = mock.Mock() + wrk.closing = False + wrk.bytes_sent = 100 + wrk.bytes_recv = 200 + wrk.name = 'host1' + pool.workers = [wrk] + blog = mock.Mock() + pool._boss_log = blog + with terminal.capture(): + pool.close_all() + wrk.close.assert_called() + wrk.remove_lock.assert_called() + blog.close.assert_called_once() + self.assertEqual(len(pool.workers), 0) + + +class TestDispatchContextRecv(unittest.TestCase): + """Test _DispatchContext.recv() error paths""" + + + @mock.patch.object(boss, 'BUILD_TIMEOUT', 0.01) + def test_recv_timeout(self): + """Test recv returns None on timeout""" + ctx, wrk, tmpdir = _make_ctx() + recv_q = queue.Queue() # empty queue + with terminal.capture(): + result = ctx.recv(wrk, recv_q) + self.assertIsNone(result) + ctx.close() + shutil.rmtree(tmpdir) + + def test_recv_error_response(self): + """Test recv returns None on error response""" + ctx, wrk, tmpdir = _make_ctx() + recv_q = queue.Queue() + recv_q.put(('error', 'something broke')) + with terminal.capture(): + result = ctx.recv(wrk, recv_q) + 
self.assertIsNone(result) + ctx.close() + shutil.rmtree(tmpdir) + + def test_recv_worker_error(self): + """Test recv returns None on worker error response""" + ctx, wrk, tmpdir = _make_ctx( + {'rpi': mock.Mock(target='rpi', arch='arm')}) + recv_q = queue.Queue() + recv_q.put(('resp', {'resp': 'error', 'msg': 'oops'})) + with terminal.capture(): + result = ctx.recv(wrk, recv_q) + self.assertIsNone(result) + ctx.close() + shutil.rmtree(tmpdir) + + def test_write_result_exception(self): + """Test write_result catches exceptions""" + ctx, wrk, tmpdir = _make_ctx( + {'rpi': mock.Mock(target='rpi', arch='arm')}) + ctx.builder.get_build_dir.side_effect = RuntimeError('boom') + resp = {'board': 'sandbox', 'commit_upto': 0, + 'return_code': 0} + with terminal.capture(): + result = ctx.write_result(wrk, resp) + self.assertFalse(result) + ctx.close() + shutil.rmtree(tmpdir) + + def test_wait_for_prepare(self): + """Test wait_for_prepare with prepare_done""" + ctx, wrk, tmpdir = _make_ctx() + recv_q = queue.Queue() + recv_q.put(('resp', {'resp': 'build_prepare_done'})) + result = ctx.wait_for_prepare(wrk, recv_q) + self.assertTrue(result) + ctx.close() + shutil.rmtree(tmpdir) + + @mock.patch.object(boss, 'BUILD_TIMEOUT', 0.01) + def test_wait_for_prepare_timeout(self): + """Test wait_for_prepare returns False on timeout""" + ctx, wrk, tmpdir = _make_ctx() + recv_q = queue.Queue() + with terminal.capture(): + result = ctx.wait_for_prepare(wrk, recv_q) + self.assertFalse(result) + ctx.close() + shutil.rmtree(tmpdir) + + def test_send_batch_error(self): + """Test send_batch returns -1 on BossError""" + ctx, wrk, tmpdir = _make_ctx( + {'rpi': mock.Mock(target='rpi', arch='arm')}) + wrk.build_board.side_effect = boss.BossError('gone') + brd = mock.Mock(target='rpi', arch='arm') + result = ctx.send_batch(wrk, [brd]) + self.assertEqual(result, -1) + ctx.close() + shutil.rmtree(tmpdir) + + def test_collect_results_heartbeat(self): + """Test collect_results skips heartbeat""" + ctx, 
wrk, tmpdir = _make_ctx( + {'rpi': mock.Mock(target='rpi', arch='arm')}) + ctx.board_selected = {'rpi': mock.Mock(target='rpi', arch='arm')} + build_dir = tempfile.mkdtemp() + ctx.builder.get_build_dir.return_value = build_dir + recv_q = queue.Queue() + recv_q.put(('resp', {'resp': 'heartbeat'})) + recv_q.put(('resp', {'resp': 'build_result', 'board': 'rpi', + 'commit_upto': 0, 'return_code': 0, + 'stderr': '', 'stdout': ''})) + recv_q.put(('resp', {'resp': 'build_done'})) + state = boss.DemandState( + sent=1, ncommits=1, grab_func=lambda w, c: []) + with terminal.capture(): + result = ctx.collect_results(wrk, recv_q, state) + self.assertTrue(result) + ctx.close() + shutil.rmtree(tmpdir) + shutil.rmtree(build_dir) + + +class TestForwardStderr(unittest.TestCase): + """Test RemoteWorker._forward_stderr()""" + + def test_forward_stderr(self): + """Test stderr collection and OSError handling""" + wrk = boss.RemoteWorker.__new__(boss.RemoteWorker) + wrk.name = 'host1' + wrk._stderr_lines = [] + wrk._proc = mock.Mock() + wrk._proc.stderr = [b'error line 1\n', b'error line 2\n', b''] + with terminal.capture(): + wrk._forward_stderr() + self.assertEqual(wrk._stderr_lines, + ['error line 1', 'error line 2']) + + # OSError path: silently handled + wrk2 = boss.RemoteWorker.__new__(boss.RemoteWorker) + wrk2.name = 'host1' + wrk2._stderr_lines = [] + wrk2._proc = mock.Mock() + wrk2._proc.stderr.__iter__ = mock.Mock( + side_effect=OSError('closed')) + wrk2._forward_stderr() + + +class TestStartDebug(unittest.TestCase): + """Test RemoteWorker.start() debug flag (line 242)""" + + @mock.patch('subprocess.Popen') + @mock.patch('buildman.boss._run_ssh') + def test_debug_flag(self, _mock_ssh, mock_popen): + """Test start() passes -D when debug=True""" + proc = mock.Mock() + proc.stdout.readline.return_value = ( + b'BM> {"resp":"ready","nthreads":4,"slots":2}\n') + proc.poll.return_value = None + proc.stderr = [] # empty iterable for _forward_stderr thread + mock_popen.return_value = 
proc + + wrk = boss.RemoteWorker.__new__(boss.RemoteWorker) + wrk.hostname = 'host1' + wrk.name = 'host1' + wrk._proc = None + wrk._closed = False + wrk._closing = False + wrk._stderr_lines = [] + wrk._stderr_thread = None + wrk._ready = queue.Queue() + wrk._log = None + wrk.bytes_sent = 0 + wrk.bytes_recv = 0 + wrk.nthreads = 0 + wrk.slots = 0 + wrk.max_boards = 0 + wrk.toolchains = {} + wrk.closing = False + wrk._work_dir = '/tmp/bm' + wrk._git_dir = '/tmp/bm/.git' + wrk.work_dir = '/tmp/bm' + wrk.timeout = 10 + + wrk.start(debug=True) + cmd = mock_popen.call_args[0][0] + self.assertIn('-D', ' '.join(cmd)) + + +class TestBossLogTimer(unittest.TestCase): + """Test _BossLog.start_timer() (lines 607-612)""" + + def test_timer_ticks(self): + """Test that the timer fires and logs status""" + with tempfile.TemporaryDirectory() as tmpdir: + blog = boss._BossLog(tmpdir) + wrk = mock.Mock(nthreads=4) + wrk.name = 'host1' + blog.init_worker(wrk) + blog.record_sent('host1', 5) + + # Patch STATUS_INTERVAL to fire quickly + with mock.patch.object(boss, 'STATUS_INTERVAL', 0.01): + blog.start_timer() + time.sleep(0.05) + blog.close() + + content = tools.read_file( + os.path.join(tmpdir, '.buildman.log'), binary=False) + # Timer should have logged at least one status line + self.assertIn('host1', content) + + +class TestWaitForPrepareProgress(unittest.TestCase): + """Test wait_for_prepare with progress and heartbeat messages""" + + + def test_heartbeat_during_prepare(self): + """Test heartbeat messages are skipped during prepare""" + ctx, wrk, tmpdir = _make_ctx() + recv_q = queue.Queue() + recv_q.put(('resp', {'resp': 'heartbeat'})) + recv_q.put(('resp', {'resp': 'build_prepare_done'})) + result = ctx.wait_for_prepare(wrk, recv_q) + self.assertTrue(result) + ctx.close() + shutil.rmtree(tmpdir) + + def test_progress_during_prepare(self): + """Test worktree progress during prepare""" + ctx, wrk, tmpdir = _make_ctx() + recv_q = queue.Queue() + recv_q.put(('resp', {'resp': 
'build_started', 'num_threads': 2})) + recv_q.put(('resp', {'resp': 'worktree_created'})) + recv_q.put(('resp', {'resp': 'build_prepare_done'})) + result = ctx.wait_for_prepare(wrk, recv_q) + self.assertTrue(result) + ctx.close() + shutil.rmtree(tmpdir) + + def test_unexpected_during_prepare(self): + """Test unexpected response during prepare returns False""" + ctx, wrk, tmpdir = _make_ctx() + recv_q = queue.Queue() + recv_q.put(('resp', {'resp': 'something_weird'})) + result = ctx.wait_for_prepare(wrk, recv_q) + self.assertFalse(result) + ctx.close() + shutil.rmtree(tmpdir) + + +class TestRecvOne(unittest.TestCase): + """Test _DispatchContext.recv_one()""" + + + @mock.patch.object(boss, 'BUILD_TIMEOUT', 0.01) + def test_recv_one_timeout(self): + """Test recv_one returns False on timeout""" + ctx, wrk, tmpdir = _make_ctx() + recv_q = queue.Queue() + with terminal.capture(): + result = ctx.recv_one(wrk, recv_q) + self.assertFalse(result) + ctx.close() + shutil.rmtree(tmpdir) + + def test_recv_one_build_done(self): + """Test recv_one handles build_done with exceptions""" + ctx, wrk, tmpdir = _make_ctx( + {'rpi': mock.Mock(target='rpi', arch='arm')}) + recv_q = queue.Queue() + recv_q.put(('resp', {'resp': 'build_done', 'exceptions': 2})) + result = ctx.recv_one(wrk, recv_q) + self.assertFalse(result) + ctx.close() + shutil.rmtree(tmpdir) + + def test_recv_one_build_result(self): + """Test recv_one processes build_result""" + ctx, wrk, tmpdir = _make_ctx( + {'rpi': mock.Mock(target='rpi', arch='arm')}) + build_dir = tempfile.mkdtemp() + ctx.builder.get_build_dir.return_value = build_dir + recv_q = queue.Queue() + recv_q.put(('resp', {'resp': 'build_result', 'board': 'rpi', + 'commit_upto': 0, 'return_code': 0, + 'stderr': '', 'stdout': ''})) + with terminal.capture(): + result = ctx.recv_one(wrk, recv_q) + self.assertTrue(result) + ctx.close() + shutil.rmtree(tmpdir) + shutil.rmtree(build_dir) + + def test_recv_one_heartbeat(self): + """Test recv_one skips heartbeat 
then gets result""" + ctx, wrk, tmpdir = _make_ctx( + {'rpi': mock.Mock(target='rpi', arch='arm')}) + build_dir = tempfile.mkdtemp() + ctx.builder.get_build_dir.return_value = build_dir + recv_q = queue.Queue() + recv_q.put(('resp', {'resp': 'heartbeat'})) + recv_q.put(('resp', {'resp': 'build_result', 'board': 'rpi', + 'commit_upto': 0, 'return_code': 0, + 'stderr': '', 'stdout': ''})) + with terminal.capture(): + result = ctx.recv_one(wrk, recv_q) + self.assertTrue(result) + ctx.close() + shutil.rmtree(tmpdir) + shutil.rmtree(build_dir) + + def test_recv_one_other(self): + """Test recv_one returns True for unknown response""" + ctx, wrk, tmpdir = _make_ctx( + {'rpi': mock.Mock(target='rpi', arch='arm')}) + recv_q = queue.Queue() + recv_q.put(('resp', {'resp': 'something_else'})) + result = ctx.recv_one(wrk, recv_q) + self.assertTrue(result) + ctx.close() + shutil.rmtree(tmpdir) + + def test_recv_one_progress(self): + """Test recv_one handles progress then result""" + ctx, wrk, tmpdir = _make_ctx( + {'rpi': mock.Mock(target='rpi', arch='arm')}) + build_dir = tempfile.mkdtemp() + ctx.builder.get_build_dir.return_value = build_dir + recv_q = queue.Queue() + recv_q.put(('resp', {'resp': 'build_started', 'num_threads': 2})) + recv_q.put(('resp', {'resp': 'build_result', 'board': 'rpi', + 'commit_upto': 0, 'return_code': 0, + 'stderr': '', 'stdout': ''})) + with terminal.capture(): + result = ctx.recv_one(wrk, recv_q) + self.assertTrue(result) + ctx.close() + shutil.rmtree(tmpdir) + shutil.rmtree(build_dir) + + +class TestCollectResultsExtended(unittest.TestCase): + """Test collect_results with more board grabbing""" + + + def test_collect_grabs_more(self): + """Test collect_results grabs more boards when in_flight drops""" + ctx, wrk, tmpdir = _make_ctx( + {'rpi': mock.Mock(target='rpi', arch='arm')}) + wrk.max_boards = 2 + build_dir = tempfile.mkdtemp() + ctx.builder.get_build_dir.return_value = build_dir + extra_brd = mock.Mock(target='extra', arch='arm') + 
grab_calls = [0] + + def grab(_w, _n): + grab_calls[0] += 1 + if grab_calls[0] == 1: + return [extra_brd] + return [] + + state = boss.DemandState(sent=1, ncommits=1, grab_func=grab) + + recv_q = queue.Queue() + recv_q.put(('resp', {'resp': 'build_result', 'board': 'rpi', + 'commit_upto': 0, 'return_code': 0, + 'stderr': '', 'stdout': ''})) + # After rpi completes, in_flight drops to 0 < max_boards=2, + # grab returns extra_brd, then build_done ends collection + recv_q.put(('resp', {'resp': 'build_done'})) + + with terminal.capture(): + ctx.collect_results(wrk, recv_q, state) + + self.assertEqual(state.received, 1) + self.assertGreater(grab_calls[0], 0) + ctx.close() + shutil.rmtree(tmpdir) + shutil.rmtree(ctx.builder.get_build_dir.return_value) + + def test_collect_write_failure(self): + """Test collect_results stops on write_result failure""" + ctx, wrk, tmpdir = _make_ctx( + {'rpi': mock.Mock(target='rpi', arch='arm')}) + ctx.builder.get_build_dir.side_effect = RuntimeError('boom') + + state = boss.DemandState(sent=1, ncommits=1, + grab_func=lambda w, n: []) + recv_q = queue.Queue() + recv_q.put(('resp', {'resp': 'build_result', 'board': 'rpi', + 'commit_upto': 0, 'return_code': 0, + 'stderr': '', 'stdout': ''})) + with terminal.capture(): + result = ctx.collect_results(wrk, recv_q, state) + self.assertFalse(result) + ctx.close() + shutil.rmtree(tmpdir) + + +class TestRunParallelErrors(unittest.TestCase): + """Test WorkerPool._run_parallel error paths""" + + def test_worker_busy_and_boss_error(self): + """Test _run_parallel handles WorkerBusy and BossError""" + pool = boss.WorkerPool([]) + busy_wrk = mock.Mock(name='busy1') + busy_wrk.name = 'busy1' + fail_wrk = mock.Mock(name='fail1') + fail_wrk.name = 'fail1' + + calls = [] + + def func(item): + calls.append(item.name) + if item.name == 'busy1': + raise boss.WorkerBusy('too busy') + raise boss.BossError('failed') + + with terminal.capture(): + pool._run_parallel('Testing', [busy_wrk, fail_wrk], func) + 
fail_wrk.remove_lock.assert_called_once() + + +class TestCloseAll(unittest.TestCase): + """Test WorkerPool.close_all() (lines 1533-1544)""" + + def test_close_all_sends_quit(self): + """Test close_all sends quit and closes workers""" + pool = boss.WorkerPool([]) + wrk = mock.Mock() + wrk.closing = False + wrk.bytes_sent = 0 + wrk.bytes_recv = 0 + wrk.name = 'host1' + pool.workers = [wrk] + blog = mock.Mock() + pool._boss_log = blog + + with terminal.capture(): + pool.close_all() + + wrk.close.assert_called() + wrk.remove_lock.assert_called() + self.assertEqual(len(pool.workers), 0) + + +class TestCollectResultsTimeout(unittest.TestCase): + """Test collect_results recv timeout and non-result responses""" + + + @mock.patch.object(boss, 'BUILD_TIMEOUT', 0.01) + def test_collect_timeout(self): + """Test collect_results returns False on recv timeout (line 933)""" + ctx, wrk, tmpdir = _make_ctx() + state = boss.DemandState(sent=1, ncommits=1, + grab_func=lambda w, n: []) + recv_q = queue.Queue() + with terminal.capture(): + result = ctx.collect_results(wrk, recv_q, state) + self.assertFalse(result) + ctx.close() + shutil.rmtree(tmpdir) + + def test_collect_skips_unknown(self): + """Test collect_results skips non-build_result responses (line 940)""" + ctx, wrk, tmpdir = _make_ctx() + state = boss.DemandState(sent=1, ncommits=1, + grab_func=lambda w, n: []) + recv_q = queue.Queue() + recv_q.put(('resp', {'resp': 'configure_done'})) # unknown + recv_q.put(('resp', {'resp': 'build_done'})) + with terminal.capture(): + result = ctx.collect_results(wrk, recv_q, state) + self.assertTrue(result) + ctx.close() + shutil.rmtree(tmpdir) + + +class TestDispatchJobs(unittest.TestCase): + """Test WorkerPool._dispatch_jobs() (lines 1290-1309)""" + + def test_dispatch_jobs(self): + """Test _dispatch_jobs runs batch workers and closes context""" + pool = boss.WorkerPool([]) + wrk = mock.Mock(nthreads=4, closing=False, slots=2) + wrk.name = 'host1' + + brd = mock.Mock(target='rpi', 
arch='arm') + commit = mock.Mock(hash='abc123') + wjobs = [(brd, 0, commit)] + + builder = mock.Mock() + tmpdir = tempfile.mkdtemp() + builder.base_dir = tmpdir + board_selected = {'rpi': brd} + blog = mock.Mock() + pool._boss_log = blog + + # Mock build_boards to avoid actual protocol + wrk.build_boards = mock.Mock() + # recv_one will be called once — return False to end + with mock.patch.object(boss._DispatchContext, 'start_reader', + return_value=queue.Queue()), \ + mock.patch.object(boss._DispatchContext, 'recv_one', + return_value=False): + with terminal.capture(): + pool._dispatch_jobs({wrk: wjobs}, builder, + board_selected) + + self.assertIsNone(pool._boss_log) + shutil.rmtree(tmpdir) + + +class TestRunBatchWorker(unittest.TestCase): + """Test WorkerPool._run_batch_worker() (lines 1320-1358)""" + + def test_batch_worker_success(self): + """Test _run_batch_worker sends build_boards and collects""" + wrk = mock.Mock(nthreads=4, closing=False, slots=2) + wrk.name = 'host1' + brd = mock.Mock(target='rpi', arch='arm') + commit = mock.Mock(hash='abc123') + + builder = mock.Mock() + tmpdir = tempfile.mkdtemp() + builder.base_dir = tmpdir + + ctx = boss._DispatchContext( + workers=[wrk], builder=builder, + board_selected={'rpi': brd}, boss_log=mock.Mock()) + + recv_q = queue.Queue() + recv_q.put(('resp', {'resp': 'build_result', 'board': 'rpi', + 'commit_upto': 0, 'return_code': 0, + 'stderr': '', 'stdout': ''})) + build_dir = tempfile.mkdtemp() + builder.get_build_dir.return_value = build_dir + + with mock.patch.object(ctx, 'start_reader', + return_value=recv_q): + with terminal.capture(): + boss.WorkerPool._run_batch_worker( + wrk, [(brd, 0, commit)], ctx) + + wrk.build_boards.assert_called_once() + ctx.close() + shutil.rmtree(tmpdir) + shutil.rmtree(build_dir) + + def test_batch_worker_build_error(self): + """Test _run_batch_worker handles build_boards BossError""" + wrk = mock.Mock(nthreads=4, closing=False, slots=2) + wrk.name = 'host1' + 
wrk.build_boards.side_effect = boss.BossError('gone') + brd = mock.Mock(target='rpi', arch='arm') + commit = mock.Mock(hash='abc123') + + builder = mock.Mock() + tmpdir = tempfile.mkdtemp() + builder.base_dir = tmpdir + + ctx = boss._DispatchContext( + workers=[wrk], builder=builder, + board_selected={}, boss_log=mock.Mock()) + + with mock.patch.object(ctx, 'start_reader', + return_value=queue.Queue()): + with terminal.capture(): + boss.WorkerPool._run_batch_worker( + wrk, [(brd, 0, commit)], ctx) + + ctx.close() + shutil.rmtree(tmpdir) + + +class TestStartDemandWorker(unittest.TestCase): + """Test WorkerPool._start_demand_worker() (lines 1380-1400)""" + + def _make_pool_and_ctx(self): + pool = boss.WorkerPool([]) + wrk = mock.Mock(nthreads=4, closing=False, max_boards=2, + slots=2, toolchains={'arm': '/gcc'}) + wrk.name = 'host1' + builder = mock.Mock() + tmpdir = tempfile.mkdtemp() + builder.base_dir = tmpdir + ctx = boss._DispatchContext( + workers=[wrk], builder=builder, + board_selected={}, boss_log=mock.Mock()) + return pool, wrk, ctx, tmpdir + + def test_prepare_error(self): + """Test _start_demand_worker handles build_prepare BossError""" + pool, wrk, ctx, tmpdir = self._make_pool_and_ctx() + wrk.build_prepare.side_effect = boss.BossError('gone') + + with mock.patch.object(ctx, 'start_reader', + return_value=queue.Queue()): + with terminal.capture(): + recv_q, state = pool._start_demand_worker( + wrk, ctx, ['abc'], 1, [], threading.Lock()) + self.assertIsNone(recv_q) + self.assertIsNone(state) + ctx.close() + shutil.rmtree(tmpdir) + + def test_prepare_timeout(self): + """Test _start_demand_worker when wait_for_prepare fails""" + pool, wrk, ctx, tmpdir = self._make_pool_and_ctx() + + with mock.patch.object(ctx, 'start_reader', + return_value=queue.Queue()), \ + mock.patch.object(ctx, 'wait_for_prepare', + return_value=False): + with terminal.capture(): + recv_q, state = pool._start_demand_worker( + wrk, ctx, ['abc'], 1, [], threading.Lock()) + 
self.assertIsNone(recv_q) + ctx.close() + shutil.rmtree(tmpdir) + + def test_no_boards(self): + """Test _start_demand_worker when no boards available + + Also covers the BossError path in build_done (lines 1395-1396). + """ + pool, wrk, ctx, tmpdir = self._make_pool_and_ctx() + wrk.build_done.side_effect = boss.BossError('gone') + + with mock.patch.object(ctx, 'start_reader', + return_value=queue.Queue()), \ + mock.patch.object(ctx, 'wait_for_prepare', + return_value=True): + with terminal.capture(): + recv_q, state = pool._start_demand_worker( + wrk, ctx, ['abc'], 1, [], threading.Lock()) + self.assertIsNone(recv_q) + ctx.close() + shutil.rmtree(tmpdir) + + def test_send_batch_failure(self): + """Test _start_demand_worker when send_batch fails""" + pool, wrk, ctx, tmpdir = self._make_pool_and_ctx() + brd = mock.Mock(target='rpi', arch='arm') + pool_list = [brd] + + wrk.build_board.side_effect = boss.BossError('gone') + + with mock.patch.object(ctx, 'start_reader', + return_value=queue.Queue()), \ + mock.patch.object(ctx, 'wait_for_prepare', + return_value=True): + with terminal.capture(): + recv_q, state = pool._start_demand_worker( + wrk, ctx, ['abc'], 1, pool_list, + threading.Lock()) + self.assertIsNone(recv_q) + ctx.close() + shutil.rmtree(tmpdir) + + def test_success(self): + """Test _start_demand_worker success path""" + pool, wrk, ctx, tmpdir = self._make_pool_and_ctx() + brd = mock.Mock(target='rpi', arch='arm') + pool_list = [brd] + + with mock.patch.object(ctx, 'start_reader', + return_value=queue.Queue()), \ + mock.patch.object(ctx, 'wait_for_prepare', + return_value=True): + with terminal.capture(): + recv_q, state = pool._start_demand_worker( + wrk, ctx, ['abc'], 1, pool_list, + threading.Lock()) + self.assertIsNotNone(recv_q) + self.assertIsNotNone(state) + self.assertEqual(state.sent, 1) + ctx.close() + shutil.rmtree(tmpdir) + + +class TestFinishDemandWorker(unittest.TestCase): + """Test WorkerPool._finish_demand_worker() (lines 1429-1437)""" + + 
def test_build_done_error(self): + """Test _finish_demand_worker when build_done raises""" + wrk = mock.Mock(nthreads=4, closing=False, max_boards=0, + slots=2) + wrk.name = 'host1' + wrk.build_done.side_effect = boss.BossError('gone') + builder = mock.Mock() + tmpdir = tempfile.mkdtemp() + builder.base_dir = tmpdir + + ctx = boss._DispatchContext( + workers=[wrk], builder=builder, + board_selected={}, boss_log=mock.Mock()) + + state = boss.DemandState(sent=0, ncommits=1, + grab_func=lambda w, n: []) + recv_q = queue.Queue() + + with mock.patch.object(ctx, 'collect_results'): + boss.WorkerPool._finish_demand_worker( + wrk, ctx, recv_q, state) + ctx.close() + shutil.rmtree(tmpdir) + + def test_finish_waits_for_done(self): + """Test _finish_demand_worker waits for build_done response""" + wrk = mock.Mock(nthreads=4, closing=False, max_boards=0, + slots=2) + wrk.name = 'host1' + builder = mock.Mock() + tmpdir = tempfile.mkdtemp() + builder.base_dir = tmpdir + + ctx = boss._DispatchContext( + workers=[wrk], builder=builder, + board_selected={}, boss_log=mock.Mock()) + + state = boss.DemandState(sent=0, ncommits=1, + grab_func=lambda w, n: []) + recv_q = queue.Queue() + recv_q.put(('resp', {'resp': 'heartbeat'})) + recv_q.put(('resp', {'resp': 'build_done'})) + + with mock.patch.object(ctx, 'collect_results'): + boss.WorkerPool._finish_demand_worker( + wrk, ctx, recv_q, state) + ctx.close() + shutil.rmtree(tmpdir) + + @mock.patch.object(boss, 'BUILD_TIMEOUT', 0.01) + def test_finish_recv_timeout(self): + """Test _finish_demand_worker handles recv timeout""" + wrk = mock.Mock(nthreads=4, closing=False, max_boards=0, + slots=2) + wrk.name = 'host1' + builder = mock.Mock() + tmpdir = tempfile.mkdtemp() + builder.base_dir = tmpdir + + ctx = boss._DispatchContext( + workers=[wrk], builder=builder, + board_selected={}, boss_log=mock.Mock()) + + state = boss.DemandState(sent=0, ncommits=1, + grab_func=lambda w, n: []) + recv_q = queue.Queue() + + with mock.patch.object(ctx, 
'collect_results'), \ + terminal.capture(): + boss.WorkerPool._finish_demand_worker( + wrk, ctx, recv_q, state) + ctx.close() + shutil.rmtree(tmpdir) + + +class TestCloseAllSignal(unittest.TestCase): + """Test close_all quit/close path (lines 1543-1544)""" + + def test_close_all_quit_error(self): + """Test close_all handles _send BossError during quit""" + pool = boss.WorkerPool([]) + wrk = mock.Mock() + wrk.closing = False + wrk.bytes_sent = 0 + wrk.bytes_recv = 0 + wrk.name = 'host1' + wrk._send.side_effect = boss.BossError('gone') + pool.workers = [wrk] + pool._boss_log = mock.Mock() + + with terminal.capture(): + pool.close_all() + wrk.close.assert_called() + self.assertEqual(len(pool.workers), 0) + + +class TestZeroCapacity(unittest.TestCase): + """Test _get_worker_for_arch with zero total capacity (line 1183)""" + + def test_zero_nthreads(self): + """Test workers with 0 nthreads don't cause division by zero""" + w1 = mock.Mock(nthreads=0, bogomips=0, + toolchains={'arm': '/gcc'}) + pool = boss.WorkerPool.__new__(boss.WorkerPool) + pool.workers = [w1] + # Should not raise ZeroDivisionError + wrk = pool._get_worker_for_arch('arm', {}) + self.assertEqual(wrk, w1) + + +if __name__ == '__main__': + unittest.main() diff --git a/tools/buildman/test_machine.py b/tools/buildman/test_machine.py index b635d1afb6f..eb28d313e3e 100644 --- a/tools/buildman/test_machine.py +++ b/tools/buildman/test_machine.py @@ -264,7 +264,8 @@ class TestMachinePool(unittest.TestCase): 'host2\n' ) pool = machine.MachinePool() - available = pool.probe_all() + with terminal.capture(): + available = pool.probe_all() self.assertEqual(len(available), 2) self.assertEqual(pool.get_total_weight(), 14) @@ -285,7 +286,8 @@ class TestMachinePool(unittest.TestCase): 'host2\n' ) pool = machine.MachinePool() - available = pool.probe_all() + with terminal.capture(): + available = pool.probe_all() self.assertEqual(len(available), 1) self.assertEqual(available[0].hostname, 'host1') @@ -311,8 +313,10 @@ 
sandbox : /usr/bin/gcc 'host1\n' ) pool = machine.MachinePool() - pool.probe_all() - missing = pool.check_toolchains({'arm', 'sandbox'}) + with terminal.capture(): + pool.probe_all() + with terminal.capture(): + missing = pool.check_toolchains({'arm', 'sandbox'}) self.assertEqual(missing, {}) @mock.patch('buildman.machine._run_ssh') @@ -336,8 +340,10 @@ sandbox : /usr/bin/gcc 'host1\n' ) pool = machine.MachinePool() - pool.probe_all() - missing = pool.check_toolchains({'arm', 'sandbox'}) + with terminal.capture(): + pool.probe_all() + with terminal.capture(): + missing = pool.check_toolchains({'arm', 'sandbox'}) self.assertEqual(len(missing), 1) m = list(missing.keys())[0] self.assertIn('arm', missing[m]) @@ -683,7 +689,8 @@ class TestMachinePoolExtended(unittest.TestCase): 'mem_avail_mb': 8000, 'disk_avail_mb': 20000}) bsettings.add_file('[machines]\nhost1\n') pool = machine.MachinePool() - pool.probe_all() + with terminal.capture(): + pool.probe_all() # Just verify it doesn't crash with terminal.capture(): pool.print_summary() @@ -703,8 +710,10 @@ class TestMachinePoolExtended(unittest.TestCase): mock_ssh.side_effect = ssh_side_effect bsettings.add_file('[machines]\nhost1\n') pool = machine.MachinePool() - pool.probe_all() - pool.check_toolchains({'arm', 'sandbox'}) + with terminal.capture(): + pool.probe_all() + with terminal.capture(): + pool.check_toolchains({'arm', 'sandbox'}) with terminal.capture(): pool.print_summary(local_archs={'arm', 'sandbox'}) @@ -728,7 +737,8 @@ class TestMachinePoolExtended(unittest.TestCase): mock_ssh.side_effect = ssh_side_effect bsettings.add_file('[machines]\nhost1\n') pool = machine.MachinePool() - pool.probe_all() + with terminal.capture(): + pool.probe_all() local_gcc = { 'arm': f'{home}/.buildman-toolchains/gcc-13.1.0-nolibc/' @@ -751,7 +761,8 @@ class TestMachinePoolExtended(unittest.TestCase): with mock.patch.object(machine.Machine, '_probe_toolchains_from_boss', fake_probe): - missing = pool.check_toolchains({'arm'}, 
local_gcc=local_gcc) + with terminal.capture(): + missing = pool.check_toolchains({'arm'}, local_gcc=local_gcc) # arm should be flagged as missing due to version mismatch self.assertEqual(len(missing), 1) @@ -839,7 +850,8 @@ class TestPrintSummaryEdgeCases(unittest.TestCase): mock_ssh.side_effect = machine.MachineError('refused') bsettings.add_file('[machines]\nhost1\n') pool = machine.MachinePool() - pool.probe_all() + with terminal.capture(): + pool.probe_all() # Should not crash with unavailable machine with terminal.capture(): pool.print_summary() @@ -851,7 +863,8 @@ class TestPrintSummaryEdgeCases(unittest.TestCase): **MACHINE_INFO, 'load_1m': 10.0}) bsettings.add_file('[machines]\nhost1\n') pool = machine.MachinePool() - pool.probe_all() + with terminal.capture(): + pool.probe_all() with terminal.capture(): pool.print_summary() @@ -865,7 +878,8 @@ class TestPrintSummaryEdgeCases(unittest.TestCase): '[machines]\nhost1\n' '[machine:host1]\nmax_boards = 50\n') pool = machine.MachinePool() - pool.probe_all() + with terminal.capture(): + pool.probe_all() with terminal.capture(): pool.print_summary() @@ -877,7 +891,8 @@ class TestPrintSummaryEdgeCases(unittest.TestCase): 'mem_avail_mb': 8000, 'disk_avail_mb': 20000}) bsettings.add_file('[machines]\nhost1\n') pool = machine.MachinePool() - pool.probe_all() + with terminal.capture(): + pool.probe_all() pool.machines[0].tc_error = 'buildman not found' with terminal.capture(): pool.print_summary(local_archs={'arm'}) @@ -890,7 +905,8 @@ class TestPrintSummaryEdgeCases(unittest.TestCase): 'mem_avail_mb': 8000, 'disk_avail_mb': 20000}) bsettings.add_file('[machines]\nhost1\n') pool = machine.MachinePool() - pool.probe_all() + with terminal.capture(): + pool.probe_all() pool.machines[0].toolchains = {'sandbox': '/usr/bin/gcc'} local_gcc = { 'arm': os.path.expanduser( @@ -912,7 +928,8 @@ class TestCheckToolchainsEdge(unittest.TestCase): bsettings.add_file('[machines]\nhost1\n') pool = machine.MachinePool() # Machine is not 
probed, so not reachable - result = pool.check_toolchains({'arm'}) + with terminal.capture(): + result = pool.check_toolchains({'arm'}) self.assertEqual(result, {}) @mock.patch('buildman.machine._run_ssh') @@ -931,15 +948,17 @@ class TestCheckToolchainsEdge(unittest.TestCase): mock_ssh.side_effect = ssh_side_effect bsettings.add_file('[machines]\nhost1\n') pool = machine.MachinePool() - pool.probe_all() + with terminal.capture(): + pool.probe_all() local_gcc = { 'arm': f'{home}/.buildman-toolchains/gcc-13/arm/bin/gcc', } # fetch=True should trigger _fetch_all_missing with mock.patch.object(pool, '_fetch_all_missing') as mock_fetch: - pool.check_toolchains({'arm'}, fetch=True, - local_gcc=local_gcc) + with terminal.capture(): + pool.check_toolchains({'arm'}, fetch=True, + local_gcc=local_gcc) mock_fetch.assert_called_once() @@ -1011,7 +1030,8 @@ class TestPrintSummaryMissingNoVersion(unittest.TestCase): 'mem_avail_mb': 8000, 'disk_avail_mb': 20000}) bsettings.add_file('[machines]\nhost1\n') pool = machine.MachinePool() - pool.probe_all() + with terminal.capture(): + pool.probe_all() pool.machines[0].toolchains = {} # arm has a version (under ~/.buildman-toolchains), # sandbox does not