new file mode 100644
@@ -0,0 +1,1507 @@
+# SPDX-License-Identifier: GPL-2.0+
+# Copyright 2026 Simon Glass <sjg@chromium.org>
+# pylint: disable=C0302
+
+"""Boss side of the distributed build protocol
+
+Manages SSH connections to remote workers and communicates using the JSON-lines
+protocol defined in worker.py. Each RemoteWorker wraps a persistent SSH process
+whose stdin/stdout carry the protocol messages.
+
Typical usage:
    w = RemoteWorker('myhost')
    w.init_git()                  # worker creates/reuses a git repo
    w.push_source(git_dir, ref)   # git push to the worker's repo
    w.start()                     # launches ssh, waits for 'ready'
    w.build_prepare(commits)      # worker creates Builder and worktrees
    w.build_board('sandbox', 'sandbox')
    result = w.recv()             # {'resp': 'build_result', ...}
    w.quit()
"""
+
+import datetime
+import json
+import os
+import queue
+import subprocess
+import sys
+import threading
+import time
+
+from buildman import builderthread
+from buildman import worker as worker_mod
+from u_boot_pylib import command
+from u_boot_pylib import tools
+from u_boot_pylib import tout
+
# SSH options shared with machine.py
SSH_OPTS = [
    # Never prompt for a password; fail instead so threads don't hang
    '-o', 'BatchMode=yes',
    # Auto-accept keys for hosts never seen before, but still reject
    # changed keys
    '-o', 'StrictHostKeyChecking=accept-new',
]

# Per-build timeout in seconds. If a worker doesn't respond within this
# time, the boss assumes the worker is dead or hung and stops using it.
BUILD_TIMEOUT = 300

# Interval in seconds between status summaries in the boss log
STATUS_INTERVAL = 60
+
+
class BossError(Exception):
    """Error communicating with a remote worker

    Base class for all boss-side failures (SSH, git push, protocol).
    """
+
+
class WorkerBusy(BossError):
    """Worker machine is already in use by another boss

    Raised by RemoteWorker.init_git() when the remote lock file is held
    and a worker process is still running there.
    """
+
+
def _run_ssh(hostname, remote_cmd, timeout=10):
    """Run a one-shot SSH command on a remote host

    Args:
        hostname (str): SSH hostname
        remote_cmd (str): Shell command to run on the remote host
        timeout (int): SSH connect timeout in seconds

    Returns:
        str: stdout from the command

    Raises:
        BossError: if the command fails
    """
    cmd = ['ssh', '-o', f'ConnectTimeout={timeout}']
    cmd.extend(SSH_OPTS)
    cmd.extend([hostname, '--', remote_cmd])
    try:
        result = command.run_pipe(
            [cmd], capture=True, capture_stderr=True,
            raise_on_error=True)
    except command.CommandExc as exc:
        raise BossError(f'SSH command failed on {hostname}: {exc}') from exc
    return result.stdout.strip() if result.stdout else ''
+
+
def kill_workers(machines):
    """Kill stale worker processes and remove lock files on remote machines

    Connects to each machine via SSH, kills any running worker processes
    and removes the lock file. Useful for cleaning up after a failed or
    interrupted distributed build.

    Args:
        machines (list of str): SSH hostnames to clean up

    Returns:
        int: 0 on success
    """

    results = {}
    lock = threading.Lock()

    # Shell snippet run on each host: kill any worker processes, say
    # what happened, then drop the lock file unconditionally
    kill_script = ('pids=$(pgrep -f "[p]ython3.*--worker" 2>/dev/null); '
                   'if [ -n "$pids" ]; then '
                   ' kill $pids 2>/dev/null; '
                   ' echo "killed $pids"; '
                   'else '
                   ' echo "no workers"; '
                   'fi; '
                   'rm -f ~/dev/.bm-worker/.lock'
                   )

    def _clean_host(hostname):
        """Clean one host, recording its outcome under the lock"""
        try:
            outcome = _run_ssh(hostname, kill_script)
        except BossError as exc:
            outcome = f'FAILED: {exc}'
        with lock:
            results[hostname] = outcome

    threads = [threading.Thread(target=_clean_host, args=(host,))
               for host in machines]
    for thr in threads:
        thr.start()
    for thr in threads:
        thr.join()

    for hostname in sorted(results):
        print(f' {hostname}: {results[hostname]}')
    return 0
+
+
class RemoteWorker: # pylint: disable=R0902
    """Manages one SSH connection to a remote buildman worker

    The startup sequence is:
        1. init_git() - create/reuse a git repo on the remote via
           one-shot SSH
        2. push_source() - git push the local tree to the remote repo
        3. start() - launch the worker from the pushed tree

    This ensures the worker runs the same version of buildman as the boss.

    Attributes:
        hostname (str): SSH hostname (user@host or just host)
        nthreads (int): Number of build threads the worker reported
        git_dir (str): Path to the worker's git directory
        work_dir (str): Path to the worker's work directory
    """

    def __init__(self, hostname, timeout=10, name=None):
        """Create a new remote worker connection

        Args:
            hostname (str): SSH hostname
            timeout (int): SSH connect timeout in seconds
            name (str or None): Short display name, defaults to hostname
        """
        self.hostname = hostname
        self.name = name or hostname
        self.timeout = timeout
        self.nthreads = 0
        self.slots = 1
        self.max_boards = 0
        self.bogomips = 0.0
        self.git_dir = ''
        self.work_dir = ''
        self.toolchains = {}
        self.closing = False
        # Protocol-traffic counters, for statistics
        self.bytes_sent = 0
        self.bytes_recv = 0
        self._proc = None
        self._stderr_lines = []
        self._stderr_thread = None

    def init_git(self, work_dir='~/dev/.bm-worker'):
        """Ensure a git repo exists on the remote host via one-shot SSH

        Reuses an existing repo if present, so that subsequent pushes
        only transfer the delta. Creates a lock file to prevent two
        bosses from using the same worker simultaneously. A lock is
        considered stale if no worker process is running.

        Args:
            work_dir (str): Fixed path for the work directory

        Raises:
            WorkerBusy: if another boss holds the lock
            BossError: if the SSH command fails
        """
        lock = f'{work_dir}/.lock'
        init_script = (f'mkdir -p {work_dir} && '
            # Check for lock — stale if no worker process is running
            f'if [ -f {lock} ]; then '
            f' if pgrep -f "[p]ython3.*--worker" >/dev/null 2>&1; then '
            f' echo BUSY; exit 0; '
            f' fi; '
            f' rm -f {lock}; '
            f'fi && '
            # Create lock and init git
            f'date +%s > {lock} && '
            f'(test -d {work_dir}/.git || git init -q {work_dir}) && '
            f'git -C {work_dir} config '
            f'receive.denyCurrentBranch updateInstead && '
            f'echo {work_dir}'
            )
        output = _run_ssh(self.hostname, init_script, self.timeout)
        if not output:
            raise BossError(
                f'init_git on {self.hostname} returned no work directory')
        last_line = output.splitlines()[-1].strip()
        if last_line == 'BUSY':
            raise WorkerBusy(f'{self.hostname} is busy (locked)')
        self.work_dir = last_line
        self.git_dir = os.path.join(self.work_dir, '.git')

    def start(self, debug=False):
        """Start the worker from the pushed source tree

        Launches the worker using the buildman from the pushed git tree.
        The source must already have been pushed via init_git() and
        push_source().

        A background thread forwards the worker's stderr to the boss's
        stderr, prefixed with the machine name, so that debug messages
        and errors are always visible.

        Args:
            debug (bool): True to pass -D to the worker for tracebacks

        Raises:
            BossError: if the SSH connection or worker startup fails
        """
        if not self.work_dir:
            raise BossError(f'No work_dir on {self.hostname} '
                            '(call init_git and push_source first)')
        worker_cmd = 'python3 tools/buildman/main.py --worker'
        if debug:
            worker_cmd += ' -D'
        ssh_cmd = [
            'ssh',
            '-o', f'ConnectTimeout={self.timeout}',
        ] + SSH_OPTS + [
            self.hostname, '--',
            f'cd {self.work_dir} && git checkout -qf work && '
            f'{worker_cmd}',
        ]
        try:
            # pylint: disable=R1732
            self._proc = subprocess.Popen(
                ssh_cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                stderr=subprocess.PIPE)
        except OSError as exc:
            raise BossError(
                f'Failed to start SSH to {self.hostname}: {exc}') from exc

        # Forward worker stderr in a background thread so debug messages
        # and errors are always visible
        self._stderr_lines = []
        self._stderr_thread = threading.Thread(
            target=self._forward_stderr, daemon=True)
        self._stderr_thread.start()

        resp = self._recv()
        if resp.get('resp') != 'ready':
            self.close()
            raise BossError(
                f'Worker on {self.hostname} did not send ready: {resp}')
        self.nthreads = resp.get('nthreads', 1)
        self.slots = resp.get('slots', 1)
        if not self.max_boards:
            self.max_boards = self.nthreads

    def _forward_stderr(self):
        """Forward worker stderr to boss stderr with machine name prefix

        Runs in a background thread. Saves lines for _get_stderr() too.
        """
        try:
            for raw in self._proc.stderr:
                line = raw.decode('utf-8', errors='replace').rstrip('\n')
                if line:
                    self._stderr_lines.append(line)
                    sys.stderr.write(f'[{self.name}] {line}\n')
                    sys.stderr.flush()
        except (OSError, ValueError):
            # Pipe closed during shutdown - just stop forwarding
            pass

    def _send(self, obj):
        """Send a JSON command to the worker

        Args:
            obj (dict): Command object to send

        Raises:
            BossError: if the SSH process is not running, or dies
                while the command is being written
        """
        if not self._proc or self._proc.poll() is not None:
            raise BossError(f'Worker on {self.hostname} is not running')
        line = json.dumps(obj) + '\n'
        data = line.encode('utf-8')
        try:
            self._proc.stdin.write(data)
            self._proc.stdin.flush()
        except (OSError, ValueError) as exc:
            # SSH can die between the poll() check and the write; turn
            # the broken pipe into the error type callers expect
            raise BossError(
                f'Worker on {self.hostname} went away: {exc}') from exc
        self.bytes_sent += len(data)

    def _recv(self):
        """Read the next protocol response from the worker

        Reads lines from stdout, skipping any that don't start with the
        'BM> ' prefix (e.g. SSH banners).

        Returns:
            dict: Parsed JSON response

        Raises:
            BossError: if the worker closes the connection or sends bad data
        """
        while True:
            raw = self._proc.stdout.readline()
            if not raw:
                stderr = self._get_stderr()
                raise BossError(f'Worker on {self.hostname} closed connection'
                                f'{": " + stderr if stderr else ""}')
            self.bytes_recv += len(raw)
            line = raw.decode('utf-8', errors='replace').rstrip('\n')
            if line.startswith(worker_mod.RESPONSE_PREFIX):
                payload = line[len(worker_mod.RESPONSE_PREFIX):]
                try:
                    return json.loads(payload)
                except json.JSONDecodeError as exc:
                    raise BossError(
                        f'Bad JSON from {self.hostname}: {exc}') from exc

    def _get_stderr(self):
        """Get the last stderr line from the worker

        Waits briefly for the stderr forwarding thread to finish
        collecting output, then returns the last non-empty line.

        Returns:
            str: Last non-empty line of stderr, or empty string
        """
        # The thread only exists after start(); before that it is None
        if self._stderr_thread:
            self._stderr_thread.join(timeout=2)
        for line in reversed(self._stderr_lines):
            if line.strip():
                return line.strip()
        return ''

    def push_source(self, local_git_dir, refspec):
        """Push source code to the worker's git repo

        Uses 'git push' over SSH to send commits to the worker.

        Args:
            local_git_dir (str): Path to local git directory
            refspec (str): Git refspec to push (e.g. 'HEAD:refs/heads/work')

        Raises:
            BossError: if the push fails
        """
        if not self.git_dir:
            raise BossError(
                f'No git_dir on {self.hostname} (call init_git first)')
        push_url = f'{self.hostname}:{self.git_dir}'
        try:
            command.run_pipe([['git', 'push', '--force', push_url, refspec]],
                             capture=True, capture_stderr=True,
                             raise_on_error=True, cwd=local_git_dir)
        except command.CommandExc as exc:
            raise BossError(
                f'git push to {self.hostname} failed: {exc}') from exc

    def configure(self, settings):
        """Send build settings to the worker

        Sends settings that affect how make is invoked (verbose, no_lto,
        allow_missing, etc.). Must be called after start() and before
        any build commands.

        Args:
            settings (dict): Build settings, e.g.:
                verbose_build (bool): Run make with V=1
                allow_missing (bool): Pass BINMAN_ALLOW_MISSING=1
                no_lto (bool): Pass NO_LTO=1
                reproducible_builds (bool): Pass SOURCE_DATE_EPOCH=0
                warnings_as_errors (bool): Pass KCFLAGS=-Werror
                mrproper (bool): Run make mrproper before config
                fallback_mrproper (bool): Retry with mrproper on failure

        Raises:
            BossError: if the worker rejects the settings
        """
        self._send({'cmd': 'configure', 'settings': settings})
        resp = self._recv()
        if resp.get('resp') != 'configure_done':
            raise BossError(
                f'Worker on {self.hostname} rejected configure: {resp}')

    def build_boards(self, boards, commits):
        """Send a build_boards command to the worker

        Tells the worker to build all boards for each commit. The
        worker handles checkout scheduling, parallelism and -j
        calculation internally.

        Args:
            boards (list of dict): Board info dicts with keys:
                board (str): Board target name
                defconfig (str): Defconfig target
                env (dict): Extra environment variables
            commits (list): Commit hashes in order, or [None] for
                current source
        """
        self._send({
            'cmd': 'build_boards',
            'boards': boards,
            'commits': commits,
        })

    def build_prepare(self, commits):
        """Send a build_prepare command to the worker

        Creates the Builder and worktrees. Follow with build_board()
        calls, then build_done().

        Args:
            commits (list): Commit hashes in order, or [None] for
                current source
        """
        self._send({'cmd': 'build_prepare', 'commits': commits,
                    'max_boards': self.max_boards})

    def build_board(self, board, arch):
        """Send a build_board command to add one board to the worker

        Args:
            board (str): Board target name
            arch (str): Board architecture
        """
        self._send({
            'cmd': 'build_board',
            'board': board,
            'arch': arch,
        })

    def build_done(self):
        """Tell the worker no more boards are coming"""
        self._send({'cmd': 'build_done'})

    def recv(self):
        """Receive the next response from the worker

        Returns:
            dict: Parsed JSON response
        """
        return self._recv()

    def quit(self):
        """Tell the worker to quit, remove the lock and close

        Returns:
            dict: The worker's final response, or {} if it had already
                gone away
        """
        try:
            self._send({'cmd': 'quit'})
            resp = self._recv()
        except BossError:
            resp = {}
        self.close()
        self.remove_lock()
        return resp

    def remove_lock(self):
        """Remove the lock file from the remote machine"""
        if self.work_dir:
            try:
                _run_ssh(self.hostname,
                         f'rm -f {self.work_dir}/.lock', self.timeout)
            except BossError:
                # Best-effort cleanup; a stale lock is reclaimed later
                pass

    def close(self):
        """Close the SSH connection

        Closes stdin first so SSH can flush any pending data (e.g. a
        quit command) to the remote, then waits briefly for SSH to
        exit on its own before terminating it.
        """
        if self._proc:
            try:
                self._proc.stdin.close()
            except OSError:
                pass
            try:
                self._proc.wait(timeout=2)
            except subprocess.TimeoutExpired:
                self._proc.terminate()
                try:
                    self._proc.wait(timeout=3)
                except subprocess.TimeoutExpired:
                    self._proc.kill()
            self._proc = None

    def __repr__(self):
        status = 'running' if self._proc else 'stopped'
        return (f'RemoteWorker({self.hostname}, '
                f'nthreads={self.nthreads}, {status})')

    def __del__(self):
        self.close()
+
+
def _format_bytes(nbytes):
    """Render a byte count as a short human-readable string (B/KB/MB)"""
    kib = 1024
    if nbytes >= kib * kib:
        return f'{nbytes / (1024 * 1024):.1f}MB'
    if nbytes >= kib:
        return f'{nbytes / 1024:.1f}KB'
    return f'{nbytes}B'
+
+
class _BossLog:
    """Central boss log for distributed builds

    Logs major events and periodic per-worker status summaries
    to .buildman.log in the builder output directory. All file writes
    and stats updates are serialised with a lock.
    """

    def __init__(self, base_dir):
        """Open the log file, creating base_dir if needed

        Args:
            base_dir (str): Directory in which to create .buildman.log
        """
        os.makedirs(base_dir, exist_ok=True)
        path = os.path.join(base_dir, '.buildman.log')
        # pylint: disable=R1732
        self._logf = open(path, 'w', encoding='utf-8')
        self._lock = threading.Lock()
        # worker name -> {'sent', 'recv', 'load_avg', 'nthreads'}
        self._stats = {}
        self._timer = None
        self._closed = False

    def log(self, msg):
        """Write a timestamped log entry

        Args:
            msg (str): Message to write
        """
        with self._lock:
            if self._logf:
                stamp = datetime.datetime.now().strftime('%H:%M:%S')
                self._logf.write(f'{stamp} {msg}\n')
                self._logf.flush()

    def init_worker(self, wrk):
        """Register a worker for status tracking

        Args:
            wrk (RemoteWorker): Worker to track (reads .name, .nthreads)
        """
        with self._lock:
            self._stats[wrk.name] = {
                'sent': 0,
                'recv': 0,
                'load_avg': 0.0,
                'nthreads': wrk.nthreads,
            }

    def record_sent(self, wrk_name, count=1):
        """Record boards sent to a worker

        Args:
            wrk_name (str): Worker name
            count (int): Number of boards sent
        """
        with self._lock:
            if wrk_name in self._stats:
                self._stats[wrk_name]['sent'] += count

    def record_recv(self, wrk_name, load_avg=0.0):
        """Record a reply received from a worker

        Args:
            wrk_name (str): Worker name
            load_avg (float): Load average the worker reported
        """
        with self._lock:
            if wrk_name in self._stats:
                self._stats[wrk_name]['recv'] += 1
                self._stats[wrk_name]['load_avg'] = load_avg

    def log_status(self):
        """Log a status summary for all workers"""
        with self._lock:
            parts = []
            total_load = 0.0
            total_threads = 0
            total_done = 0
            total_sent = 0
            for name, st in self._stats.items():
                nthreads = st['nthreads']
                # Express load as a percentage of the worker's threads
                cpu_pct = (st['load_avg'] / nthreads * 100 if nthreads else 0)
                parts.append(f'{name}:done={st["recv"]}/{st["sent"]}'
                             f' cpu={cpu_pct:.0f}%')
                total_load += st['load_avg']
                total_threads += nthreads
                total_done += st['recv']
                total_sent += st['sent']
            total_cpu = (total_load / total_threads * 100
                         if total_threads else 0)
            parts.append(f'TOTAL:done={total_done}/{total_sent}'
                         f' cpu={total_cpu:.0f}%')
            if self._logf:
                stamp = datetime.datetime.now().strftime('%H:%M:%S')
                self._logf.write(f'{stamp} STATUS {", ".join(parts)}\n')
                self._logf.flush()

    def start_timer(self):
        """Start the periodic status timer

        The timer fires every STATUS_INTERVAL seconds, logging a status
        summary and rescheduling itself until close() is called.
        """
        def _tick():
            # Stop the timer chain once close() has been called, so a
            # tick already in flight cannot re-arm the timer afterwards
            if self._closed:
                return
            self.log_status()
            self._timer = threading.Timer(STATUS_INTERVAL, _tick)
            self._timer.daemon = True
            self._timer.start()

        self._timer = threading.Timer(STATUS_INTERVAL, _tick)
        self._timer.daemon = True
        self._timer.start()

    def close(self):
        """Stop the timer and close the log file"""
        self._closed = True
        if self._timer:
            self._timer.cancel()
            self._timer = None
        with self._lock:
            if self._logf:
                self._logf.close()
                self._logf = None
+
+
def split_boards(board_selected, toolchains):
    """Split boards between local and remote machines

    Boards whose architecture has a toolchain on at least one remote machine
    are assigned to remote workers; all others stay local.

    NOTE(review): sandbox boards go remote only if 'sandbox' appears in
    toolchains, although dispatch elsewhere treats sandbox as buildable
    on any worker - confirm this is intended.

    Args:
        board_selected (dict): target_name -> Board for all selected boards
        toolchains (dict): Architecture -> gcc path on remote machines.
            Combined from all machines.

    Returns:
        tuple:
            dict: target_name -> Board for local builds
            dict: target_name -> Board for remote builds
    """
    remote_archs = set(toolchains) if toolchains else set()
    local = {}
    remote = {}
    for name, brd in board_selected.items():
        dest = remote if brd.arch in remote_archs else local
        dest[name] = brd
    return local, remote
+
+
def _write_remote_result(builder, resp, board_selected, hostname):
    """Record one worker build result as if it had been built locally

    Produces the same directory layout and files ('done', 'err', 'log',
    'sizes') that BuilderThread would create for a local build, then
    feeds a CommandResult into builder.process_result() so the progress
    display updates.

    Args:
        builder (Builder): Builder object
        resp (dict): build_result response from a worker
        board_selected (dict): target_name -> Board, for looking up
            the Board object
        hostname (str): Remote machine that built this board
    """
    target = resp.get('board', '')
    commit_upto = resp.get('commit_upto', 0)
    ret_code = resp.get('return_code', 1)
    err_text = resp.get('stderr', '')

    out_dir = builder.get_build_dir(commit_upto, target)
    builderthread.mkdir(out_dir, parents=True)

    tools.write_file(os.path.join(out_dir, 'done'), f'{ret_code}\n',
                     binary=False)

    err_fname = os.path.join(out_dir, 'err')
    if err_text:
        tools.write_file(err_fname, err_text, binary=False)
    elif os.path.exists(err_fname):
        os.remove(err_fname)

    tools.write_file(os.path.join(out_dir, 'log'), resp.get('stdout', ''),
                     binary=False)

    raw = resp.get('sizes', {}).get('raw')
    if raw:
        # Strip any header line (starts with 'text') in case the worker
        # sends raw size output including the header
        size_lines = raw.splitlines()
        if size_lines and size_lines[0].lstrip().startswith('text'):
            raw = '\n'.join(size_lines[1:])
        if raw.strip():
            tools.write_file(os.path.join(out_dir, 'sizes'), raw,
                             binary=False)

    # Update the builder's progress display
    brd = board_selected.get(target)
    if brd:
        result = command.CommandResult(stderr=err_text, return_code=ret_code)
        result.brd = brd
        result.commit_upto = commit_upto
        result.already_done = False
        result.kconfig_reconfig = False
        result.remote = hostname
        builder.process_result(result)
+
+
class DemandState: # pylint: disable=R0903
    """Mutable state for a demand-driven worker build

    Tracks how many boards have been sent, received and are in-flight
    for a single worker during demand-driven dispatch.

    Attributes:
        sent: Total boards sent to the worker
        in_flight: Boards currently being built (sent - completed)
        expected: Total results expected (sent * ncommits)
        received: Results received so far
        board_results: Per-board result count (target -> int)
        ncommits: Number of commits being built
        grab_func: Callable(wrk, count) -> list of Board to get more
            boards from the shared pool
    """

    def __init__(self, sent, ncommits, grab_func):
        self.ncommits = ncommits
        self.grab_func = grab_func
        self.sent = sent
        self.in_flight = sent
        self.received = 0
        self.expected = ncommits * sent
        self.board_results = {}
+
+
class _DispatchContext:
    """Shared infrastructure for dispatching builds to workers

    Manages per-worker log files, worktree progress tracking, reader
    threads, and result processing. Used by both _dispatch_jobs() and
    _dispatch_demand() to avoid duplicating this infrastructure.

    Attributes:
        builder (Builder): Builder used to record results and progress
        board_selected (dict): target_name -> Board for remote builds
        boss_log (_BossLog or None): Central boss log, if enabled
        log_files (dict): RemoteWorker -> open per-worker log file
    """

    def __init__(self, workers, builder, board_selected, boss_log):
        """Set up per-worker logs and worktree-progress tracking

        Args:
            workers (list of RemoteWorker): Workers taking part
            builder (Builder): Builder object for result processing
            board_selected (dict): target_name -> Board for remote builds
            boss_log (_BossLog or None): Central boss log, if enabled
        """
        self.builder = builder
        self.board_selected = board_selected
        self.boss_log = boss_log
        # worker name -> (worktrees created, total expected)
        self._worktree_counts = {}
        self._worktree_lock = threading.Lock()

        # Open a log file per worker
        os.makedirs(builder.base_dir, exist_ok=True)
        self.log_files = {}
        for wrk in workers:
            path = os.path.join(builder.base_dir, f'worker-{wrk.name}.log')
            self.log_files[wrk] = open( # pylint: disable=R1732
                path, 'w', encoding='utf-8')

    def log(self, wrk, direction, msg):
        """Write a timestamped entry to a worker's log file

        Args:
            wrk (RemoteWorker): Worker whose log file to write
            direction (str): Marker such as '<<' or '!!' for the entry
            msg (str): Message text
        """
        logf = self.log_files.get(wrk)
        if logf:
            stamp = datetime.datetime.now().strftime('%H:%M:%S')
            logf.write(f'{stamp} {direction} {msg}\n')
            logf.flush()

    def update_progress(self, resp, wrk):
        """Handle worktree progress messages from a worker

        Args:
            resp (dict): Response from the worker
            wrk (RemoteWorker): Worker that sent the response

        Returns:
            bool: True if the response was a progress message
        """
        resp_type = resp.get('resp')
        if resp_type == 'build_started':
            with self._worktree_lock:
                # Keep any existing 'done' count; update the total
                num = resp.get('num_threads', wrk.nthreads)
                self._worktree_counts[wrk.name] = (self._worktree_counts.get(
                    wrk.name, (0, num))[0], num)
            return True
        if resp_type == 'worktree_created':
            with self._worktree_lock:
                done, total = self._worktree_counts.get(
                    wrk.name, (0, wrk.nthreads))
                self._worktree_counts[wrk.name] = (done + 1, total)
            # Called outside the lock: _refresh_progress takes it itself
            # and threading.Lock is not reentrant
            self._refresh_progress()
            return True
        return False

    def _refresh_progress(self):
        """Update the builder's progress string from worktree counts"""
        with self._worktree_lock:
            parts = []
            for name, (done, total) in sorted(self._worktree_counts.items()):
                if done < total:
                    parts.append(f'{name} {done}/{total}')
            self.builder.progress = ', '.join(parts)
        if self.builder.progress:
            self.builder.process_result(None)

    def start_reader(self, wrk):
        """Start a background reader thread for a worker

        The thread blocks in wrk.recv() and forwards responses to the
        returned queue, so that recv() below can apply a timeout.

        Args:
            wrk (RemoteWorker): Worker to read from

        Returns:
            queue.Queue: Queue that receives (status, value) tuples
        """
        recv_q = queue.Queue()

        def _reader():
            while True:
                try:
                    resp = wrk.recv()
                    recv_q.put(('ok', resp))
                except BossError as exc:
                    recv_q.put(('error', exc))
                    break
                except Exception: # pylint: disable=W0718
                    recv_q.put(('error', BossError(
                        f'Worker on {wrk.name} connection lost')))
                    break

        threading.Thread(target=_reader, daemon=True).start()
        return recv_q

    def recv(self, wrk, recv_q):
        """Get next response from queue with timeout

        Args:
            wrk (RemoteWorker): Worker the queue belongs to
            recv_q (queue.Queue): Response queue from start_reader()

        Returns:
            dict or None: Response, or None on error/timeout/worker error
        """
        try:
            status, val = recv_q.get(timeout=BUILD_TIMEOUT)
        except queue.Empty:
            self.log(wrk, '!!', f'Worker timed out after {BUILD_TIMEOUT}s')
            if not wrk.closing:
                print(f'\n Error from {wrk.name}: timed out')
            return None
        if status == 'error':
            self.log(wrk, '!!', str(val))
            if not wrk.closing:
                print(f'\n Error from {wrk.name}: {val}')
            return None
        resp = val
        self.log(wrk, '<<', json.dumps(resp, separators=(',', ':')))
        if resp.get('resp') == 'error':
            if not wrk.closing:
                print(f'\n Worker error on {wrk.name}: '
                      f'{resp.get("msg", "unknown")}')
            return None
        return resp

    def write_result(self, wrk, resp):
        """Write a build result and update progress

        Args:
            wrk (RemoteWorker): Worker that produced the result
            resp (dict): build_result response

        Returns:
            bool: True on success, False on error
        """
        if self.boss_log:
            self.boss_log.record_recv(wrk.name, resp.get('load_avg', 0.0))
        try:
            _write_remote_result(
                self.builder, resp, self.board_selected, wrk.name)
        except Exception as exc: # pylint: disable=W0718
            self.log(wrk, '!!', f'unexpected: {exc}')
            print(f'\n Unexpected error on {wrk.name}: {exc}')
            return False
        return True

    def wait_for_prepare(self, wrk, recv_q):
        """Wait for build_prepare_done, handling progress messages

        Args:
            wrk (RemoteWorker): Worker to wait on
            recv_q (queue.Queue): Response queue from start_reader()

        Returns:
            bool: True if prepare succeeded
        """
        while True:
            resp = self.recv(wrk, recv_q)
            if resp is None:
                return False
            if self.update_progress(resp, wrk):
                continue
            resp_type = resp.get('resp')
            if resp_type == 'build_prepare_done':
                return True
            if resp_type == 'heartbeat':
                continue
            self.log(wrk, '!!', f'unexpected during prepare: {resp_type}')
            return False

    @staticmethod
    def send_batch(wrk, boards):
        """Send a batch of boards to a worker

        Args:
            wrk (RemoteWorker): Worker to send to
            boards (list of Board): Boards to send

        Returns:
            int: Number of boards sent, or -1 on error
        """
        for brd in boards:
            try:
                wrk.build_board(brd.target, brd.arch)
            except BossError:
                return -1
        return len(boards)

    def collect_results(self, wrk, recv_q, state):
        """Collect results and send more boards as threads free up

        Args:
            wrk (RemoteWorker): Worker to collect from
            recv_q (queue.Queue): Response queue from start_reader()
            state (DemandState): Mutable build state for this worker

        Returns:
            bool: True if all expected results arrived (or the worker
                reported build_done), False on error
        """
        while state.received < state.expected:
            resp = self.recv(wrk, recv_q)
            if resp is None:
                return False
            resp_type = resp.get('resp')
            if resp_type == 'heartbeat':
                continue
            if resp_type == 'build_done':
                return True
            if resp_type != 'build_result':
                continue

            if not self.write_result(wrk, resp):
                return False
            state.received += 1

            # A board is finished once results for all commits are in;
            # then one worker slot is free and we can feed it more work
            target = resp.get('board')
            results = state.board_results
            results[target] = results.get(target, 0) + 1
            if results[target] == state.ncommits:
                state.in_flight -= 1
                if state.in_flight < wrk.max_boards:
                    more = state.grab_func(wrk, 1)
                    if more and self.send_batch(wrk, more) > 0:
                        state.sent += 1
                        state.in_flight += 1
                        state.expected += state.ncommits
                        if self.boss_log:
                            self.boss_log.record_sent(
                                wrk.name, state.ncommits)
        return True

    def recv_one(self, wrk, recv_q):
        """Receive one build result, skipping progress messages

        Args:
            wrk (RemoteWorker): Worker to receive from
            recv_q (queue.Queue): Response queue from start_reader()

        Returns:
            bool: True to continue, False to stop this worker
        """
        while True:
            resp = self.recv(wrk, recv_q)
            if resp is None:
                return False
            if self.update_progress(resp, wrk):
                continue
            resp_type = resp.get('resp')
            if resp_type == 'heartbeat':
                continue
            if resp_type == 'build_done':
                nexc = resp.get('exceptions', 0)
                if nexc:
                    self.log(wrk, '!!', f'worker finished with {nexc} '
                             f'thread exception(s)')
                return False
            if resp_type == 'build_result':
                return self.write_result(wrk, resp)
            # Unknown response type: ignore it and keep going
            return True

    def close(self):
        """Close all log files and the boss log"""
        for logf in self.log_files.values():
            logf.close()
        if self.boss_log:
            self.boss_log.log_status()
            self.boss_log.log('dispatch: end')
            self.boss_log.close()
+
+
+class WorkerPool:
+ """Manages a pool of remote workers for distributed builds
+
+ Handles starting workers, pushing source, distributing build jobs
+ and collecting results.
+
+ Attributes:
+ workers (list of RemoteWorker): Active workers
+ """
+
+ def __init__(self, machines):
+ """Create a worker pool from available machines
+
+ Args:
+ machines (list of Machine): Available machines from MachinePool
+ """
+ self.workers = []
+ self._machines = machines
+ self._boss_log = None
+
+ def start_all(self, git_dir, refspec, debug=False, settings=None):
+ """Start workers on all machines
+
+ Uses a three-phase approach so that each worker runs the same
+ version of buildman as the boss:
+ 1. Create git repos on all machines (parallel)
+ 2. Push source to all repos (parallel)
+ 3. Start workers from pushed source (parallel)
+ 4. Send build settings to all workers (parallel)
+
+ Args:
+ git_dir (str): Local git directory to push
+ refspec (str): Git refspec to push
+ debug (bool): True to pass -D to workers for tracebacks
+ settings (dict or None): Build settings to send to workers
+
+ Returns:
+ list of RemoteWorker: Workers that started successfully
+ """
+ # Phase 1: init git repos
+ ready = self._run_parallel(
+ 'Preparing', self._machines, self._init_one)
+
+ # Phase 2: push source
+ ready = self._run_parallel('Pushing source to', ready,
+ lambda wrk: wrk.push_source(git_dir, refspec))
+
+ # Phase 3: start workers
+ self.workers = self._run_parallel('Starting', ready,
+ lambda wrk: self._start_one(wrk, debug))
+
+ # Phase 4: send build settings
+ if settings and self.workers:
+ self._run_parallel('Configuring', self.workers,
+ lambda wrk: wrk.configure(settings))
+
+ return self.workers
+
+ def _init_one(self, mach):
+ """Create a RemoteWorker and initialise its git repo
+
+ Args:
+ mach: Machine object with hostname attribute
+
+ Returns:
+ RemoteWorker: Initialised worker
+ """
+ wrk = RemoteWorker(mach.hostname, name=mach.name)
+ wrk.toolchains = dict(mach.toolchains)
+ wrk.bogomips = mach.info.bogomips if mach.info else 0.0
+ wrk.max_boards = mach.max_boards
+ wrk.init_git()
+ return wrk
+
    @staticmethod
    def _start_one(wrk, debug=False):
        """Start the worker process from the pushed tree

        Thin wrapper so start_all() can hand it to _run_parallel().

        Args:
            wrk (RemoteWorker): Worker to start
            debug (bool): True to pass -D to the worker
        """
        wrk.start(debug=debug)
+
+ def _run_parallel(self, label, items, func):
+ """Run a function on items in parallel, collecting successes
+
+ Args:
+ label (str): Progress label (e.g. 'Pushing source to')
+ items (list): Items to process
+ func (callable): Function to call on each item. May return
+ a replacement item; if None is returned, the original
+ item is kept.
+
+ Returns:
+ list: Items that succeeded (possibly replaced by func)
+ """
+ lock = threading.Lock()
+ results = []
+ done = []
+
+ def _run_one(item):
+ name = getattr(item, 'name', getattr(item, 'hostname', str(item)))
+ try:
+ replacement = func(item)
+ with lock:
+ results.append(replacement if replacement else item)
+ done.append(name)
+ tout.progress(f'{label} workers {len(done)}/'
+ f'{len(items)}: {", ".join(done)}')
+ except WorkerBusy:
+ with lock:
+ done.append(f'{name} (BUSY)')
+ tout.progress(f'{label} workers {len(done)}/'
+ f'{len(items)}: {", ".join(done)}')
+ except BossError as exc:
+ # Clean up lock if the worker was initialised
+ if hasattr(item, 'remove_lock'):
+ item.remove_lock()
+ with lock:
+ done.append(f'{name} (FAILED)')
+ tout.progress(f'{label} workers {len(done)}/'
+ f'{len(items)}: {", ".join(done)}')
+ print(f'\n Worker failed on {name}: {exc}')
+
+ tout.progress(f'{label} workers on {len(items)} machines')
+ threads = []
+ for item in items:
+ thr = threading.Thread(target=_run_one, args=(item,))
+ thr.start()
+ threads.append(thr)
+ for thr in threads:
+ thr.join()
+ tout.clear_progress()
+ return results
+
+ @staticmethod
+ def _get_capacity(wrk):
+ """Get a worker's build capacity score
+
+ Uses nthreads * bogomips as the capacity metric. Falls back to
+ nthreads alone if bogomips is not available.
+
+ Args:
+ wrk (RemoteWorker): Worker to score
+
+ Returns:
+ float: Capacity score (higher is faster)
+ """
+ bogo = wrk.bogomips if wrk.bogomips else 1.0
+ return wrk.nthreads * bogo
+
+ def _get_worker_for_arch(self, arch, assigned):
+ """Pick the next worker that supports a given architecture
+
+ Distributes boards proportionally to each worker's capacity
+ (nthreads * bogomips). Picks the capable worker whose current
+ assignment is most below its fair share.
+
+ Args:
+ arch (str): Board architecture (e.g. 'arm', 'aarch64')
+ assigned (dict): worker -> int count of boards assigned so far
+
+ Returns:
+ RemoteWorker or None: A worker with the right toolchain
+ """
+ if arch == 'sandbox':
+ capable = list(self.workers)
+ else:
+ capable = [w for w in self.workers if arch in w.toolchains]
+ if not capable:
+ return None
+
+ total_cap = sum(self._get_capacity(w) for w in capable)
+ if not total_cap:
+ total_cap = len(capable)
+
+ # Pick the worker with the lowest assigned / capacity ratio
+ best = min(capable, key=lambda w: (assigned.get(w, 0) /
+ (self._get_capacity(w) or 1)))
+ assigned[best] = assigned.get(best, 0) + 1
+ return best
+
+    def build_boards(self, board_selected, commits, builder, local_count=0):
+        """Build boards on remote workers and write results locally
+
+        Uses demand-driven dispatch: boards are fed to workers from a
+        shared pool. Each worker gets one board per thread initially,
+        then one more each time a board completes. Faster workers
+        naturally get more boards.
+
+        Args:
+            board_selected (dict): target_name -> Board to build remotely
+            commits (list of Commit or None): Commits to build
+            builder (Builder): Builder object for result processing
+            local_count (int): Number of boards being built locally
+        """
+        # Nothing to dispatch without workers or boards
+        if not self.workers or not board_selected:
+            return
+
+        # Treat the no-commit (current source) case as one commit
+        ncommits = max(1, len(commits)) if commits else 1
+
+        # Build a pool of boards that have work remaining
+        pool = list(board_selected.values())
+        if not builder.force_build:
+            # Keep only boards with at least one commit whose 'done'
+            # marker file does not yet exist
+            commit_range = range(len(commits)) if commits else range(1)
+            pool = [b for b in pool
+                    if any(not os.path.exists(
+                               builder.get_done_file(cu, b.target))
+                           for cu in commit_range)]
+
+        # Filter boards that no worker can handle
+        capable_archs = set()
+        for wrk in self.workers:
+            capable_archs.update(wrk.toolchains.keys())
+        # Sandbox needs no cross toolchain, so every worker can build it
+        capable_archs.add('sandbox')
+        skipped = [b for b in pool if b.arch not in capable_archs]
+        pool = [b for b in pool if b.arch in capable_archs]
+        if skipped:
+            # Keep the builder's expected job count accurate so its
+            # progress reporting is not skewed by skipped boards
+            builder.count -= len(skipped) * ncommits
+
+        if not pool:
+            print('No remote jobs to dispatch')
+            return
+
+        total_jobs = len(pool) * ncommits
+        # Count the local machine as one more builder in the summary
+        nmach = len(self.workers)
+        if local_count:
+            nmach += 1
+        parts = [f'{len(pool)} boards', f'{ncommits} commits {nmach} machines']
+        print(f'Building {" x ".join(parts)} (demand-driven)')
+
+        # Per-run boss log lives alongside the build output
+        self._boss_log = _BossLog(builder.base_dir)
+        self._boss_log.log(f'dispatch: {len(self.workers)} workers, '
+                           f'{total_jobs} total jobs')
+        for wrk in self.workers:
+            self._boss_log.init_worker(wrk)
+
+        self._dispatch_demand(pool, commits, builder, board_selected)
+
+ @staticmethod
+ def _grab_boards(pool, pool_lock, wrk, count):
+ """Take up to count boards from pool that wrk can build
+
+ Args:
+ pool (list of Board): Shared board pool (modified in place)
+ pool_lock (threading.Lock): Lock protecting the pool
+ wrk (RemoteWorker): Worker to match toolchains against
+ count (int): Maximum number of boards to take
+
+ Returns:
+ list of Board: Boards taken from the pool
+ """
+ with pool_lock:
+ batch = []
+ remaining = []
+ for brd in pool:
+ if len(batch) >= count:
+ remaining.append(brd)
+ elif (brd.arch == 'sandbox'
+ or brd.arch in wrk.toolchains):
+ batch.append(brd)
+ else:
+ remaining.append(brd)
+ pool[:] = remaining
+ return batch
+
+    def _dispatch_jobs(self, worker_jobs, builder, board_selected):
+        """Send build jobs to workers and collect results
+
+        Opens a log file per worker, then runs each worker's jobs
+        in a separate thread.
+
+        Args:
+            worker_jobs (dict): worker -> list of (board, commit_upto,
+                commit) tuples
+            builder (Builder): Builder for result processing
+            board_selected (dict): target_name -> Board mapping
+        """
+        # Shared dispatch state (per-worker readers/logs, result
+        # handling) used by every worker thread
+        ctx = _DispatchContext(worker_jobs.keys(), builder, board_selected,
+                               self._boss_log)
+
+        if ctx.boss_log:
+            ctx.boss_log.start_timer()
+
+        # One thread per worker; daemon so a hung SSH connection cannot
+        # keep the process alive after the main thread exits
+        threads = []
+        for wrk, wjobs in worker_jobs.items():
+            thr = threading.Thread(
+                target=self._run_batch_worker, args=(wrk, wjobs, ctx),
+                daemon=True)
+            thr.start()
+            threads.append(thr)
+        for thr in threads:
+            thr.join()
+
+        ctx.close()
+        self._boss_log = None
+
+    @staticmethod
+    def _run_batch_worker(wrk, wjobs, ctx):
+        """Send build commands to one worker and collect results
+
+        Args:
+            wrk (RemoteWorker): Worker to run
+            wjobs (list): List of (board, commit_upto, commit) tuples
+            ctx (_DispatchContext): Shared dispatch infrastructure
+        """
+        recv_q = ctx.start_reader(wrk)
+
+        # De-duplicate boards and commits from the job tuples,
+        # preserving first-seen order
+        board_infos = {}
+        commit_list = []
+        commit_set = set()
+        for brd, _, commit in wjobs:
+            target = brd.target
+            if target not in board_infos:
+                board_infos[target] = {
+                    'board': target, 'arch': brd.arch}
+            commit_hash = commit.hash if commit else None
+            if commit_hash not in commit_set:
+                commit_set.add(commit_hash)
+                commit_list.append(commit_hash)
+
+        boards_list = list(board_infos.values())
+        # The worker builds every board at every commit, so expect one
+        # result per (board, commit) pair
+        total = len(boards_list) * len(commit_list)
+
+        ctx.log(wrk, '>>', f'{len(boards_list)} boards x '
+                f'{len(commit_list)} commits')
+        if ctx.boss_log:
+            ctx.boss_log.log(f'{wrk.name}: {len(boards_list)} boards'
+                             f' x {len(commit_list)} commits')
+
+        try:
+            wrk.build_boards(boards_list, commit_list)
+        except BossError as exc:
+            ctx.log(wrk, '!!', str(exc))
+            # Stay quiet if shutdown is already in progress
+            if not wrk.closing:
+                print(f'\n Error from {wrk.name}: {exc}')
+            return
+        if ctx.boss_log:
+            ctx.boss_log.record_sent(wrk.name, total)
+
+        # Collect results; stop early if the connection drops
+        for _ in range(total):
+            if not ctx.recv_one(wrk, recv_q):
+                return
+
+    def _start_demand_worker(  # pylint: disable=R0913
+            self, wrk, ctx, commit_list, ncommits, pool, pool_lock):
+        """Prepare a worker and send the initial batch of boards
+
+        Args:
+            wrk (RemoteWorker): Worker to run
+            ctx (_DispatchContext): Shared dispatch infrastructure
+            commit_list (list of str): Commit hashes to build
+            ncommits (int): Number of commits
+            pool (list of Board): Shared board pool
+            pool_lock (threading.Lock): Lock protecting the pool
+
+        Returns:
+            tuple: (recv_q, state) on success, or (None, None) if the
+                worker failed during prepare or had no boards to build
+        """
+        recv_q = ctx.start_reader(wrk)
+
+        try:
+            wrk.build_prepare(commit_list)
+        except BossError as exc:
+            ctx.log(wrk, '!!', str(exc))
+            # Stay quiet if shutdown is already in progress
+            if not wrk.closing:
+                print(f'\n Error from {wrk.name}: {exc}')
+            return None, None
+
+        if not ctx.wait_for_prepare(wrk, recv_q):
+            return None, None
+
+        # Seed the worker with up to max_boards boards it can build
+        initial = self._grab_boards(pool, pool_lock, wrk, wrk.max_boards)
+        if not initial:
+            # No suitable boards for this worker; end its build session
+            # cleanly (ignore errors — we are abandoning it anyway)
+            try:
+                wrk.build_done()
+            except BossError:
+                pass
+            return None, None
+
+        count = ctx.send_batch(wrk, initial)
+        if count < 0:
+            return None, None
+        if ctx.boss_log:
+            ctx.boss_log.record_sent(wrk.name, count * ncommits)
+        ctx.log(wrk, '>>', f'{count} boards (initial,'
+                f' max_boards={wrk.max_boards})')
+
+        def _grab(w, n):
+            # Closure so DemandState can pull more boards from the
+            # shared pool as results come back
+            return self._grab_boards(pool, pool_lock, w, n)
+
+        state = DemandState(count, ncommits, _grab)
+        return recv_q, state
+
+    @staticmethod
+    def _finish_demand_worker(wrk, ctx, recv_q, state):
+        """Collect results and finish the demand-driven protocol
+
+        Args:
+            wrk (RemoteWorker): Worker to collect from
+            ctx (_DispatchContext): Shared dispatch infrastructure
+            recv_q (queue.Queue): Response queue from start_reader()
+            state (DemandState): Build state from _start_demand_worker()
+        """
+        # Receive results until the worker has no more outstanding work
+        ctx.collect_results(wrk, recv_q, state)
+
+        ctx.log(wrk, '>>', f'{state.sent} boards total')
+        try:
+            wrk.build_done()
+        except BossError as exc:
+            ctx.log(wrk, '!!', str(exc))
+            return
+
+        # Wait for worker's build_done
+        while True:
+            resp = ctx.recv(wrk, recv_q)
+            if resp is None:
+                # Connection dropped; give up on the acknowledgement
+                return
+            if resp.get('resp') == 'build_done':
+                break
+
+    def _dispatch_demand(self, pool, commits, builder, board_selected):
+        """Dispatch boards on demand from a shared pool
+
+        Each worker gets boards from the pool as it finishes previous
+        ones, so faster workers naturally get more work.
+
+        Args:
+            pool (list of Board): Boards available to build
+            commits (list of Commit or None): Commits to build
+            builder (Builder): Builder for result processing
+            board_selected (dict): target_name -> Board mapping
+        """
+        # A run with no commits builds the current source once (None)
+        commit_list = [c.hash if c else None for c in (commits or [None])]
+        ncommits = len(commit_list)
+        pool_lock = threading.Lock()
+
+        ctx = _DispatchContext(
+            self.workers, builder, board_selected, self._boss_log)
+
+        if ctx.boss_log:
+            ctx.boss_log.start_timer()
+
+        def _run_worker(wrk):
+            # Full per-worker lifecycle: prepare + initial batch, then
+            # collect results and finish the protocol
+            recv_q, state = self._start_demand_worker(
+                wrk, ctx, commit_list, ncommits, pool, pool_lock)
+            if recv_q is not None:
+                self._finish_demand_worker(wrk, ctx, recv_q, state)
+
+        # One thread per worker; daemon so a hung connection cannot
+        # block process exit
+        threads = []
+        for wrk in self.workers:
+            thr = threading.Thread(target=_run_worker, args=(wrk,),
+                                   daemon=True)
+            thr.start()
+            threads.append(thr)
+        for thr in threads:
+            thr.join()
+
+        ctx.close()
+        self._boss_log = None
+
+ def quit_all(self):
+ """Quit all workers gracefully"""
+ self.print_transfer_summary()
+ if self._boss_log:
+ self._boss_log.log('quit: shutting down')
+ self._boss_log.log_status()
+ self._boss_log.close()
+ self._boss_log = None
+ for wrk in self.workers:
+ try:
+ wrk.quit()
+ except BossError:
+ wrk.close()
+ self.workers = []
+
+ def print_transfer_summary(self):
+ """Print data transfer summary for all workers"""
+ if not self.workers:
+ return
+ total_sent = 0
+ total_recv = 0
+ parts = []
+ for wrk in self.workers:
+ sent = getattr(wrk, 'bytes_sent', 0)
+ recv = getattr(wrk, 'bytes_recv', 0)
+ total_sent += sent
+ total_recv += recv
+ name = getattr(wrk, 'name', '?')
+ parts.append(f'{name}:'
+ f'{_format_bytes(sent)}/'
+ f'{_format_bytes(recv)}')
+ sys.stderr.write(f'\nTransfer (sent/recv): {", ".join(parts)}'
+ f' total:{_format_bytes(total_sent)}/'
+ f'{_format_bytes(total_recv)}\n')
+ sys.stderr.flush()
+
+ def close_all(self):
+ """Stop all workers immediately
+
+ Use this on Ctrl-C. Sends a quit command to all workers first,
+ then waits briefly for the commands to travel through SSH
+ before closing the connections. This two-phase approach avoids
+ a race where closing SSH kills the connection before the quit
+ command is forwarded to the remote worker.
+ """
+ self.print_transfer_summary()
+ if self._boss_log:
+ self._boss_log.log('interrupted: Ctrl-C')
+ self._boss_log.log_status()
+ self._boss_log.close()
+ self._boss_log = None
+
+ # Suppress error messages from reader threads
+ for wrk in self.workers:
+ wrk.closing = True
+
+ # Phase 1: send quit to all workers
+ for wrk in self.workers:
+ try:
+ wrk._send({'cmd': 'quit'}) # pylint: disable=W0212
+ except BossError:
+ pass
+
+ # Brief pause so SSH can forward the quit commands to the
+ # remote workers before we tear down the connections
+ time.sleep(0.5)
+
+ # Phase 2: close all connections
+ for wrk in self.workers:
+ wrk.close()
+ wrk.remove_lock()
+ self.workers = []
@@ -43,6 +43,7 @@ def run_tests(skip_net_tests, debug, verbose, args):
from buildman import test_cfgutil
from buildman import test_machine
from buildman import test_worker
+ from buildman import test_boss
test_name = args.terms and args.terms[0] or None
if skip_net_tests:
@@ -67,6 +68,7 @@ def run_tests(skip_net_tests, debug, verbose, args):
test_builder.TestPrintBuildSummary,
test_machine,
test_worker,
+ test_boss,
'buildman.toolchain'])
return (0 if result.wasSuccessful() else 1)
new file mode 100644
@@ -0,0 +1,2645 @@
+# SPDX-License-Identifier: GPL-2.0+
+# Copyright 2026 Simon Glass <sjg@chromium.org>
+
+"""Tests for the boss module"""
+
+# pylint: disable=C0302,E1101,W0212,W0612
+
+import io
+import json
+import os
+import queue
+import random
+import shutil
+import subprocess
+import tempfile
+import threading
+import time
+import types
+import unittest
+from unittest import mock
+
+from u_boot_pylib import command
+from u_boot_pylib import terminal
+from u_boot_pylib import tools
+
+from buildman import boss
+from buildman import bsettings
+from buildman import machine
+from buildman import worker as worker_mod
+
+
+def _make_response(obj):
+ """Create a BM>-prefixed response line as bytes"""
+ return (worker_mod.RESPONSE_PREFIX + json.dumps(obj) + '\n').encode()
+
+
+class FakeProc:
+ """Fake subprocess.Popen for testing"""
+
+ def __init__(self, responses=None):
+ self.stdin = io.BytesIO()
+ self._responses = responses or []
+ self._resp_idx = 0
+ self.stdout = self
+ self.stderr = io.BytesIO(b'')
+ self._returncode = None
+
+ def readline(self):
+ """Return the next canned response line"""
+ if self._resp_idx < len(self._responses):
+ line = self._responses[self._resp_idx]
+ self._resp_idx += 1
+ return line
+ return b''
+
+ def poll(self):
+ """Return the process return code"""
+ return self._returncode
+
+ def terminate(self):
+ """Simulate SIGTERM"""
+ self._returncode = -15
+
+ def kill(self):
+ """Simulate SIGKILL"""
+ self._returncode = -9
+
+ def wait(self, timeout=None):
+ """Wait for the process (no-op)"""
+
+
+class TestRunSsh(unittest.TestCase):
+    """Test _run_ssh()"""
+
+    @mock.patch('buildman.boss.command.run_pipe')
+    def test_success(self, mock_pipe):
+        """Test successful one-shot SSH command"""
+        mock_pipe.return_value = mock.Mock(
+            stdout='/tmp/bm-worker-abc\n')
+        result = boss._run_ssh('host1', 'echo hello')
+        # _run_ssh strips the trailing newline from stdout
+        self.assertEqual(result, '/tmp/bm-worker-abc')
+
+    @mock.patch('buildman.boss.command.run_pipe')
+    def test_failure(self, mock_pipe):
+        """Test SSH command failure"""
+        # A CommandExc from run_pipe must surface as BossError
+        mock_pipe.side_effect = command.CommandExc(
+            'connection refused', command.CommandResult())
+        with self.assertRaises(boss.BossError) as ctx:
+            boss._run_ssh('host1', 'echo hello')
+        self.assertIn('SSH command failed', str(ctx.exception))
+
+
+class TestKillWorkers(unittest.TestCase):
+    """Test kill_workers()"""
+
+    @mock.patch('buildman.boss._run_ssh')
+    def test_kill_workers(self, mock_ssh):
+        """Test killing workers on multiple machines"""
+        # One canned reply per host
+        mock_ssh.side_effect = ['killed 1234', 'no workers']
+        with terminal.capture():
+            result = boss.kill_workers(['host1', 'host2'])
+        self.assertEqual(result, 0)
+        # One SSH invocation per machine
+        self.assertEqual(mock_ssh.call_count, 2)
+
+    @mock.patch('buildman.boss._run_ssh')
+    def test_kill_workers_ssh_failure(self, mock_ssh):
+        """Test that SSH failures are reported but do not abort"""
+        mock_ssh.side_effect = boss.BossError('connection refused')
+        with terminal.capture():
+            result = boss.kill_workers(['host1'])
+        # Still returns success even though the host was unreachable
+        self.assertEqual(result, 0)
+
+
+class TestRemoteWorkerInitGit(unittest.TestCase):
+    """Test RemoteWorker.init_git()"""
+
+    @mock.patch('buildman.boss._run_ssh')
+    def test_init_git(self, mock_ssh):
+        """Test successful git init"""
+        # The remote reports its work directory; git_dir derives from it
+        mock_ssh.return_value = '/tmp/bm-worker-abc'
+        w = boss.RemoteWorker('host1')
+        w.init_git()
+        self.assertEqual(w.work_dir, '/tmp/bm-worker-abc')
+        self.assertEqual(w.git_dir, '/tmp/bm-worker-abc/.git')
+
+    @mock.patch('buildman.boss._run_ssh')
+    def test_init_git_busy(self, mock_ssh):
+        """Test init_git when machine is locked"""
+        # A 'BUSY' marker means another boss holds the machine lock
+        mock_ssh.return_value = 'BUSY'
+        w = boss.RemoteWorker('host1')
+        with self.assertRaises(boss.WorkerBusy) as ctx:
+            w.init_git()
+        self.assertIn('busy', str(ctx.exception))
+
+    @mock.patch('buildman.boss._run_ssh')
+    def test_init_git_empty_output(self, mock_ssh):
+        """Test init_git with empty output"""
+        mock_ssh.return_value = ''
+        w = boss.RemoteWorker('host1')
+        with self.assertRaises(boss.BossError) as ctx:
+            w.init_git()
+        self.assertIn('returned no work directory', str(ctx.exception))
+
+    @mock.patch('buildman.boss._run_ssh')
+    def test_init_git_ssh_failure(self, mock_ssh):
+        """Test init_git when SSH fails"""
+        # The underlying BossError must propagate to the caller
+        mock_ssh.side_effect = boss.BossError('connection refused')
+        w = boss.RemoteWorker('host1')
+        with self.assertRaises(boss.BossError):
+            w.init_git()
+
+
+def _make_result(board, commit_upto=0, return_code=0,
+ stderr='', stdout=''):
+ """Create a build_result response dict"""
+ return {
+ 'resp': 'build_result',
+ 'board': board,
+ 'commit_upto': commit_upto,
+ 'return_code': return_code,
+ 'stderr': stderr,
+ 'stdout': stdout,
+ }
+
+
+def _make_builder(tmpdir, force_build=True):
+ """Create a mock Builder with base_dir set"""
+ builder = mock.Mock()
+ builder.force_build = force_build
+ builder.base_dir = tmpdir
+ builder.count = 0
+ builder.get_build_dir.side_effect = (
+ lambda c, b: os.path.join(tmpdir, b))
+ return builder
+
+
+def _start_worker(hostname, mock_popen, proc):
+ """Helper to create a worker with work_dir set and start it"""
+ mock_popen.return_value = proc
+ wrk = boss.RemoteWorker(hostname)
+ wrk.work_dir = '/tmp/bm-worker-123'
+ wrk.git_dir = '/tmp/bm-worker-123/.git'
+ wrk.start()
+ return wrk
+
+
+class TestRemoteWorkerStart(unittest.TestCase):
+    """Test RemoteWorker.start()"""
+
+    @mock.patch('subprocess.Popen')
+    def test_start_success(self, mock_popen):
+        """Test successful worker start"""
+        proc = FakeProc([
+            _make_response({'resp': 'ready', 'nthreads': 8}),
+        ])
+        w = _start_worker('myhost', mock_popen, proc)
+        # nthreads is taken from the worker's 'ready' response
+        self.assertEqual(w.nthreads, 8)
+
+        # Check SSH command runs from pushed tree
+        cmd = mock_popen.call_args[0][0]
+        self.assertIn('ssh', cmd)
+        self.assertIn('myhost', cmd)
+        self.assertIn('--worker', cmd[-1])
+
+    @mock.patch('subprocess.Popen')
+    def test_start_not_ready(self, mock_popen):
+        """Test start when worker sends unexpected response"""
+        proc = FakeProc([
+            _make_response({'resp': 'error', 'msg': 'broken'}),
+        ])
+        mock_popen.return_value = proc
+
+        w = boss.RemoteWorker('badhost')
+        w.work_dir = '/tmp/bm-test'
+        with self.assertRaises(boss.BossError) as ctx:
+            w.start()
+        self.assertIn('did not send ready', str(ctx.exception))
+
+    @mock.patch('subprocess.Popen')
+    def test_start_ssh_failure(self, mock_popen):
+        """Test start when SSH fails to launch"""
+        # Popen raising OSError simulates a missing ssh binary
+        mock_popen.side_effect = OSError('No such file')
+
+        w = boss.RemoteWorker('badhost')
+        w.work_dir = '/tmp/bm-test'
+        with self.assertRaises(boss.BossError) as ctx:
+            w.start()
+        self.assertIn('Failed to start SSH', str(ctx.exception))
+
+    @mock.patch('subprocess.Popen')
+    def test_start_connection_closed(self, mock_popen):
+        """Test start when connection closes immediately"""
+        # An empty response list makes readline() return EOF at once
+        proc = FakeProc([])  # No responses
+        mock_popen.return_value = proc
+
+        w = boss.RemoteWorker('deadhost')
+        w.work_dir = '/tmp/bm-test'
+        with self.assertRaises(boss.BossError) as ctx:
+            w.start()
+        self.assertIn('closed connection', str(ctx.exception))
+
+    def test_start_no_work_dir(self):
+        """Test start without init_git raises error"""
+        w = boss.RemoteWorker('host1')
+        with self.assertRaises(boss.BossError) as ctx:
+            w.start()
+        self.assertIn('call init_git', str(ctx.exception))
+
+    @mock.patch('subprocess.Popen')
+    def test_max_boards_defaults_to_nthreads(self, mock_popen):
+        """Test max_boards defaults to nthreads when not configured"""
+        proc = FakeProc([
+            _make_response({'resp': 'ready', 'nthreads': 64}),
+        ])
+        w = _start_worker('host1', mock_popen, proc)
+        self.assertEqual(w.max_boards, 64)
+
+    @mock.patch('subprocess.Popen')
+    def test_max_boards_preserved_when_set(self, mock_popen):
+        """Test max_boards keeps its configured value after start"""
+        proc = FakeProc([
+            _make_response({'resp': 'ready', 'nthreads': 256}),
+        ])
+        mock_popen.return_value = proc
+        w = boss.RemoteWorker('host1')
+        w.work_dir = '/tmp/bm-test'
+        # An explicit setting must not be clobbered by 'ready'
+        w.max_boards = 64
+        w.start()
+        self.assertEqual(w.nthreads, 256)
+        self.assertEqual(w.max_boards, 64)
+
+
+class TestRemoteWorkerPush(unittest.TestCase):
+    """Test RemoteWorker.push_source()"""
+
+    def test_push_no_init(self):
+        """Test push before init_git raises error"""
+        w = boss.RemoteWorker('host1')
+        with self.assertRaises(boss.BossError) as ctx:
+            w.push_source('/tmp/repo', 'HEAD:refs/heads/work')
+        self.assertIn('call init_git first', str(ctx.exception))
+
+    @mock.patch('buildman.boss.command.run_pipe')
+    def test_push_success(self, mock_pipe):
+        """Test successful git push"""
+        mock_pipe.return_value = mock.Mock(return_code=0)
+        w = boss.RemoteWorker('host1')
+        w.git_dir = '/tmp/bm-worker-123/.git'
+
+        w.push_source('/home/user/u-boot', 'HEAD:refs/heads/work')
+        # Inspect the git command passed to run_pipe
+        cmd = mock_pipe.call_args[0][0][0]
+        self.assertIn('git', cmd)
+        self.assertIn('push', cmd)
+        # Remote is the worker's repo, addressed over SSH
+        self.assertIn('host1:/tmp/bm-worker-123/.git', cmd)
+        self.assertIn('HEAD:refs/heads/work', cmd)
+
+    @mock.patch('buildman.boss.command.run_pipe')
+    def test_push_failure(self, mock_pipe):
+        """Test git push failure"""
+        # A CommandExc from git must surface as BossError
+        mock_pipe.side_effect = command.CommandExc(
+            'push failed', command.CommandResult())
+        w = boss.RemoteWorker('host1')
+        w.git_dir = '/tmp/bm/.git'
+
+        with self.assertRaises(boss.BossError) as ctx:
+            w.push_source('/tmp/repo', 'HEAD:refs/heads/work')
+        self.assertIn('git push', str(ctx.exception))
+
+
+class TestRemoteWorkerBuildBoards(unittest.TestCase):
+    """Test RemoteWorker.build_boards() and recv()"""
+
+    @mock.patch('subprocess.Popen')
+    def test_build_boards_and_recv(self, mock_popen):
+        """Test sending build_boards and receiving results"""
+        proc = FakeProc([
+            _make_response({'resp': 'ready', 'nthreads': 4}),
+            _make_response({
+                'resp': 'build_result', 'board': 'sandbox',
+                'commit_upto': 0, 'return_code': 0,
+                'stderr': '', 'stdout': '',
+            }),
+        ])
+        w = _start_worker('host1', mock_popen, proc)
+        boards = [{'board': 'sandbox', 'defconfig': 'sandbox_defconfig',
+                   'env': {}}]
+        w.build_boards(boards, ['abc123'])
+
+        # Check the command was sent (FakeProc captures stdin writes)
+        sent = proc.stdin.getvalue().decode()
+        obj = json.loads(sent)
+        self.assertEqual(obj['cmd'], 'build_boards')
+        self.assertEqual(len(obj['boards']), 1)
+        self.assertEqual(obj['boards'][0]['board'], 'sandbox')
+        self.assertEqual(obj['commits'], ['abc123'])
+
+        # Receive result
+        resp = w.recv()
+        self.assertEqual(resp['resp'], 'build_result')
+        self.assertEqual(resp['board'], 'sandbox')
+
+
+class TestRemoteWorkerQuit(unittest.TestCase):
+    """Test RemoteWorker.quit() and close()"""
+
+    @mock.patch('buildman.boss._run_ssh')
+    @mock.patch('subprocess.Popen')
+    def test_quit(self, mock_popen, mock_ssh):
+        """Test clean quit removes the lock"""
+        proc = FakeProc([
+            _make_response({'resp': 'ready', 'nthreads': 4}),
+            _make_response({'resp': 'quit_ack'}),
+        ])
+        w = _start_worker('host1', mock_popen, proc)
+        resp = w.quit()
+        self.assertEqual(resp.get('resp'), 'quit_ack')
+        # quit() drops the process handle once acknowledged
+        self.assertIsNone(w._proc)
+        # Lock removal SSH should have been called
+        mock_ssh.assert_called_once()
+        self.assertIn('rm -f', mock_ssh.call_args[0][1])
+
+    @mock.patch('subprocess.Popen')
+    def test_close_without_quit(self, mock_popen):
+        """Test close without sending quit"""
+        proc = FakeProc([
+            _make_response({'resp': 'ready', 'nthreads': 4}),
+        ])
+        w = _start_worker('host1', mock_popen, proc)
+        self.assertIsNotNone(w._proc)
+        w.close()
+        # close() also drops the process handle
+        self.assertIsNone(w._proc)
+
+    def test_close_when_not_started(self):
+        """Test close on a worker that was never started"""
+        w = boss.RemoteWorker('host1')
+        w.close()  # Should not raise
+
+
+class TestRemoteWorkerRecv(unittest.TestCase):
+    """Test response parsing"""
+
+    @mock.patch('subprocess.Popen')
+    def test_skip_non_protocol_lines(self, mock_popen):
+        """Test that non-BM> lines are skipped"""
+        # Login banners etc. must be ignored until a protocol line
+        proc = FakeProc([
+            b'Welcome to myhost\n',
+            b'Last login: Mon Jan 1\n',
+            _make_response({'resp': 'ready', 'nthreads': 2}),
+        ])
+        w = _start_worker('host1', mock_popen, proc)
+        self.assertEqual(w.nthreads, 2)
+
+    @mock.patch('subprocess.Popen')
+    def test_bad_json(self, mock_popen):
+        """Test bad JSON in protocol line"""
+        proc = FakeProc([
+            (worker_mod.RESPONSE_PREFIX + 'not json\n').encode(),
+        ])
+        mock_popen.return_value = proc
+
+        w = boss.RemoteWorker('host1')
+        # Bypass start() since only _recv() is under test
+        w._proc = proc
+        with self.assertRaises(boss.BossError) as ctx:
+            w._recv()
+        self.assertIn('Bad JSON', str(ctx.exception))
+
+
+class TestRemoteWorkerSend(unittest.TestCase):
+    """Test _send()"""
+
+    def test_send_when_not_running(self):
+        """Test sending to a stopped worker"""
+        # A worker that was never started has no process to write to
+        w = boss.RemoteWorker('host1')
+        with self.assertRaises(boss.BossError) as ctx:
+            w._send({'cmd': 'quit'})
+        self.assertIn('not running', str(ctx.exception))
+
+
+class TestRemoteWorkerRepr(unittest.TestCase):
+    """Test __repr__"""
+
+    def test_repr_stopped(self):
+        """Test repr when stopped"""
+        w = boss.RemoteWorker('host1')
+        self.assertIn('host1', repr(w))
+        self.assertIn('stopped', repr(w))
+
+    @mock.patch('subprocess.Popen')
+    def test_repr_running(self, mock_popen):
+        """Test repr when running"""
+        proc = FakeProc([
+            _make_response({'resp': 'ready', 'nthreads': 8}),
+        ])
+        w = _start_worker('host1', mock_popen, proc)
+        # A started worker reports its state and thread count
+        self.assertIn('running', repr(w))
+        self.assertIn('nthreads=8', repr(w))
+        w.close()
+
+
+class FakeBoard: # pylint: disable=R0903
+ """Fake board for testing split_boards()"""
+
+ def __init__(self, target, arch):
+ self.target = target
+ self.arch = arch
+
+
+class TestSplitBoards(unittest.TestCase):
+    """Test split_boards()"""
+
+    def test_all_local(self):
+        """Test when no remote toolchains match"""
+        boards = {
+            'sandbox': FakeBoard('sandbox', 'sandbox'),
+            'rpi': FakeBoard('rpi', 'arm'),
+        }
+        # Only an x86 toolchain is offered, so nothing goes remote
+        local, remote = boss.split_boards(boards, {'x86': '/usr/bin/gcc'})
+        self.assertEqual(len(local), 2)
+        self.assertEqual(len(remote), 0)
+
+    def test_all_remote(self):
+        """Test when all boards have remote toolchains"""
+        boards = {
+            'rpi': FakeBoard('rpi', 'arm'),
+            'odroid': FakeBoard('odroid', 'arm'),
+        }
+        local, remote = boss.split_boards(boards, {'arm': '/usr/bin/gcc'})
+        self.assertEqual(len(local), 0)
+        self.assertEqual(len(remote), 2)
+
+    def test_mixed(self):
+        """Test split with some local, some remote"""
+        boards = {
+            'sandbox': FakeBoard('sandbox', 'sandbox'),
+            'rpi': FakeBoard('rpi', 'arm'),
+            'qemu': FakeBoard('qemu', 'riscv'),
+        }
+        local, remote = boss.split_boards(
+            boards, {'arm': '/usr/bin/gcc', 'riscv': '/usr/bin/gcc'})
+        # sandbox stays local; the arm/riscv boards go remote
+        self.assertEqual(len(local), 1)
+        self.assertIn('sandbox', local)
+        self.assertEqual(len(remote), 2)
+        self.assertIn('rpi', remote)
+        self.assertIn('qemu', remote)
+
+    def test_empty_toolchains(self):
+        """Test with no remote toolchains"""
+        boards = {'sandbox': FakeBoard('sandbox', 'sandbox')}
+        local, remote = boss.split_boards(boards, {})
+        self.assertEqual(len(local), 1)
+        self.assertEqual(len(remote), 0)
+
+    def test_none_toolchains(self):
+        """Test with None toolchains"""
+        # None must behave like an empty toolchain mapping
+        boards = {'sandbox': FakeBoard('sandbox', 'sandbox')}
+        local, remote = boss.split_boards(boards, None)
+        self.assertEqual(len(local), 1)
+        self.assertEqual(len(remote), 0)
+
+
+class TestWriteRemoteResult(unittest.TestCase):
+    """Test _write_remote_result()"""
+
+    def test_success(self):
+        """Test writing a successful build result"""
+        with tempfile.TemporaryDirectory() as tmpdir:
+            build_dir = os.path.join(tmpdir, 'sandbox')
+            builder = mock.Mock()
+            builder.get_build_dir.return_value = build_dir
+
+            resp = {
+                'resp': 'build_result',
+                'board': 'sandbox',
+                'commit_upto': 0,
+                'return_code': 0,
+                'stderr': '',
+                'stdout': 'build output',
+            }
+            boss._write_remote_result(builder, resp, {}, 'host1')
+
+            build_dir = builder.get_build_dir.return_value
+            self.assertTrue(os.path.isdir(build_dir))
+
+            # 'done' holds the return code; 'log' holds stdout
+            self.assertEqual(
+                tools.read_file(os.path.join(build_dir, 'done'),
+                                binary=False), '0\n')
+            self.assertEqual(
+                tools.read_file(os.path.join(build_dir, 'log'),
+                                binary=False), 'build output')
+            # No stderr, so no 'err' file should be written
+            self.assertFalse(os.path.exists(
+                os.path.join(build_dir, 'err')))
+
+    def test_failure_with_stderr(self):
+        """Test writing a failed build result with stderr"""
+        with tempfile.TemporaryDirectory() as tmpdir:
+            build_dir = os.path.join(tmpdir, 'rpi')
+            builder = mock.Mock()
+            builder.get_build_dir.return_value = build_dir
+
+            resp = {
+                'resp': 'build_result',
+                'board': 'rpi',
+                'commit_upto': 1,
+                'return_code': 2,
+                'stderr': 'error: undefined reference',
+                'stdout': '',
+            }
+            boss._write_remote_result(builder, resp, {}, 'host1')
+
+            build_dir = builder.get_build_dir.return_value
+            # Non-zero return code recorded in 'done', stderr in 'err'
+            self.assertEqual(
+                tools.read_file(os.path.join(build_dir, 'done'),
+                                binary=False), '2\n')
+            self.assertEqual(
+                tools.read_file(os.path.join(build_dir, 'err'),
+                                binary=False),
+                'error: undefined reference')
+
+    def test_with_sizes(self):
+        """Test writing a result with size information"""
+        with tempfile.TemporaryDirectory() as tmpdir:
+            build_dir = os.path.join(tmpdir, 'sandbox')
+            builder = mock.Mock()
+            builder.get_build_dir.return_value = build_dir
+
+            sizes_raw = (' text data bss dec hex\n'
+                         ' 12345 1234 567 14146 374a\n')
+            resp = {
+                'resp': 'build_result',
+                'board': 'sandbox',
+                'commit_upto': 0,
+                'return_code': 0,
+                'stderr': '',
+                'stdout': '',
+                'sizes': {'raw': sizes_raw},
+            }
+            boss._write_remote_result(builder, resp, {}, 'host1')
+
+            # Boss strips the header line from sizes
+            build_dir = builder.get_build_dir.return_value
+            self.assertEqual(
+                tools.read_file(os.path.join(build_dir, 'sizes'),
+                                binary=False),
+                ' 12345 1234 567 14146 374a')
+
+
+class FakeMachineInfo:  # pylint: disable=R0903
+    """Fake machine info for testing"""
+
+    # Class attribute so every instance reports the same value;
+    # presumably consumed by capacity scoring (nthreads * bogomips)
+    bogomips = 5000.0
+
+
+class FakeMachine:  # pylint: disable=R0903
+    """Fake machine for testing WorkerPool"""
+
+    def __init__(self, hostname):
+        self.hostname = hostname
+        # Progress reporting reads 'name', falling back to 'hostname'
+        self.name = hostname
+        self.toolchains = {'arm': '/usr/bin/arm-linux-gnueabihf-gcc'}
+        self.info = FakeMachineInfo()
+        # 0 leaves max_boards unconfigured (defaults from nthreads)
+        self.max_boards = 0
+
+def _make_worker():
+ """Create a RemoteWorker with mocked subprocess for testing
+
+ Uses __new__ to avoid calling __init__ which requires real SSH.
+ Sets all attributes to safe defaults.
+ """
+ wrk = boss.RemoteWorker.__new__(boss.RemoteWorker)
+ wrk.hostname = 'host1'
+ wrk.name = 'host1'
+ wrk.nthreads = 4
+ wrk.bogomips = 5000.0
+ wrk.max_boards = 0
+ wrk.slots = 2
+ wrk.toolchains = {}
+ wrk._proc = mock.Mock()
+ wrk._proc.poll.return_value = None # process is running
+ wrk._log = None
+ wrk._closed = False
+ wrk._closing = False
+ wrk._stderr_buf = []
+ wrk._stderr_thread = None
+ wrk._stderr_lines = []
+ wrk._ready = queue.Queue()
+ wrk._lock_file = None
+ wrk._work_dir = ''
+ wrk._git_dir = ''
+ wrk.work_dir = ''
+ wrk.timeout = 10
+ wrk.bytes_sent = 0
+ wrk.bytes_recv = 0
+ wrk.closing = False
+ return wrk
+
+
+def _make_ctx(board_selected=None):
+ """Create a _DispatchContext with temp directory for testing
+
+ Returns:
+ tuple: (ctx, wrk, tmpdir) — caller must call ctx.close() and
+ shutil.rmtree(tmpdir)
+ """
+ wrk = mock.Mock(nthreads=4, closing=False, max_boards=0,
+ slots=2, toolchains={'arm': '/gcc'})
+ wrk.name = 'host1'
+ builder = mock.Mock()
+ tmpdir = tempfile.mkdtemp()
+ builder.base_dir = tmpdir
+ ctx = boss._DispatchContext(
+ workers=[wrk], builder=builder,
+ board_selected=board_selected or {},
+ boss_log=mock.Mock())
+ return ctx, wrk, tmpdir
+
+
+class TestWorkerPool(unittest.TestCase):
+    """Test WorkerPool"""
+
+    @mock.patch('subprocess.Popen')
+    @mock.patch('buildman.boss._run_ssh')
+    @mock.patch('buildman.boss.command.run_pipe')
+    def test_start_all(self, mock_pipe, mock_ssh, mock_popen):
+        """Test starting workers on multiple machines"""
+        mock_ssh.return_value = '/tmp/bm-1'
+        mock_pipe.return_value = mock.Mock(return_code=0)
+        proc1 = FakeProc([
+            _make_response({'resp': 'ready', 'nthreads': 4}),
+        ])
+        proc2 = FakeProc([
+            _make_response({'resp': 'ready', 'nthreads': 8}),
+        ])
+        # One fake SSH process per machine
+        mock_popen.side_effect = [proc1, proc2]
+
+        machines = [FakeMachine('host1'), FakeMachine('host2')]
+        pool = boss.WorkerPool(machines)
+        with terminal.capture():
+            workers = pool.start_all('/tmp/repo', 'HEAD:refs/heads/work')
+        self.assertEqual(len(workers), 2)
+
+    @mock.patch('subprocess.Popen')
+    @mock.patch('buildman.boss._run_ssh')
+    @mock.patch('buildman.boss.command.run_pipe')
+    def test_start_all_with_settings(self, mock_pipe, mock_ssh, mock_popen):
+        """Test that start_all sends settings via configure"""
+        mock_ssh.return_value = '/tmp/bm-1'
+        mock_pipe.return_value = mock.Mock(return_code=0)
+        proc1 = FakeProc([
+            _make_response({'resp': 'ready', 'nthreads': 4}),
+            _make_response({'resp': 'configure_done'}),
+        ])
+        proc2 = FakeProc([
+            _make_response({'resp': 'ready', 'nthreads': 8}),
+            _make_response({'resp': 'configure_done'}),
+        ])
+        mock_popen.side_effect = [proc1, proc2]
+
+        machines = [FakeMachine('host1'), FakeMachine('host2')]
+        pool = boss.WorkerPool(machines)
+        settings = {'no_lto': True, 'allow_missing': True}
+        with terminal.capture():
+            workers = pool.start_all('/tmp/repo', 'HEAD:refs/heads/work',
+                                     settings=settings)
+        self.assertEqual(len(workers), 2)
+        # Check that configure was sent (2nd response consumed)
+        self.assertEqual(proc1._resp_idx, 2)
+        self.assertEqual(proc2._resp_idx, 2)
+
+    @mock.patch('buildman.boss._run_ssh')
+    def test_start_all_init_failure(self, mock_ssh):
+        """Test start_all when init_git fails on one machine"""
+        call_count = [0]
+
+        def _side_effect(_hostname, _cmd, **_kwargs):
+            # Fail every second SSH call so one machine looks bad
+            call_count[0] += 1
+            if call_count[0] % 2 == 0:
+                raise boss.BossError('connection refused')
+            return '/tmp/bm-1'
+
+        mock_ssh.side_effect = _side_effect
+
+        machines = [FakeMachine('good'), FakeMachine('bad')]
+        pool = boss.WorkerPool(machines)
+        with terminal.capture():
+            workers = pool.start_all('/tmp/repo', 'HEAD:refs/heads/work')
+        # Only 'good' survives init phase; push and start not reached
+        # for 'bad'
+        self.assertLessEqual(len(workers), 1)
+
+    def test_quit_all(self):
+        """Test quitting all workers"""
+        pool = boss.WorkerPool([])
+        w1 = mock.Mock(spec=boss.RemoteWorker)
+        w2 = mock.Mock(spec=boss.RemoteWorker)
+        pool.workers = [w1, w2]
+        with terminal.capture():
+            pool.quit_all()
+        w1.quit.assert_called_once()
+        w2.quit.assert_called_once()
+        self.assertEqual(len(pool.workers), 0)
+
+    def test_quit_all_with_error(self):
+        """Test quit_all when a worker raises BossError"""
+        pool = boss.WorkerPool([])
+        w1 = mock.Mock(spec=boss.RemoteWorker)
+        w1.quit.side_effect = boss.BossError('connection lost')
+        pool.workers = [w1]
+        with terminal.capture():
+            pool.quit_all()
+        # A failed quit falls back to close()
+        w1.close.assert_called_once()
+        self.assertEqual(len(pool.workers), 0)
+
+    def test_build_boards_empty(self):
+        """Test build_boards with no workers or boards"""
+        pool = boss.WorkerPool([])
+        with terminal.capture():
+            pool.build_boards({}, None, mock.Mock())  # Should not raise
+
+
+class TestBuildBoards(unittest.TestCase):
+    """Test WorkerPool.build_boards() end-to-end
+
+    Workers are mocks whose recv() returns a canned sequence of protocol
+    responses, so the ordering of each response list mirrors the
+    demand-driven protocol: build_prepare_done, then one build_result per
+    board/commit, then build_done.
+    """
+
+    def setUp(self):
+        self._tmpdir = tempfile.mkdtemp()
+
+    def tearDown(self):
+        shutil.rmtree(self._tmpdir, ignore_errors=True)
+
+    @staticmethod
+    def _make_worker(hostname, toolchains, responses):
+        """Create a mock RemoteWorker with canned responses
+
+        Args:
+            hostname (str): Hostname for the worker
+            toolchains (dict): arch -> gcc path
+            responses (list of dict): Responses to return from recv()
+
+        Returns:
+            Mock: Mock RemoteWorker
+        """
+        wrk = mock.Mock(spec=boss.RemoteWorker)
+        wrk.hostname = hostname
+        wrk.name = hostname
+        wrk.toolchains = toolchains
+        wrk.nthreads = 4
+        wrk.max_boards = 4
+        wrk.bogomips = 5000.0
+        wrk.slots = 4
+        # Each recv() call consumes the next canned response in order
+        wrk.recv.side_effect = list(responses)
+        return wrk
+
+    def _demand_responses(self, *results):
+        """Build recv responses for the demand-driven protocol
+
+        Returns a list starting with build_prepare_done, followed by
+        the given results, and ending with build_done.
+        """
+        return ([{'resp': 'build_prepare_done'}] + list(results)
+                + [{'resp': 'build_done'}])
+
+    def test_build_boards_success(self):
+        """Test building boards across workers with correct results"""
+        wrk1 = self._make_worker('host1', {'arm': '/usr/bin/arm-gcc'},
+                                 self._demand_responses(_make_result('rpi')))
+        wrk2 = self._make_worker('host2',
+                                 {'riscv': '/usr/bin/riscv64-gcc'},
+                                 self._demand_responses(_make_result('odroid')))
+
+        pool = boss.WorkerPool([])
+        pool.workers = [wrk1, wrk2]
+
+        boards = {
+            'rpi': FakeBoard('rpi', 'arm'),
+            'odroid': FakeBoard('odroid', 'riscv'),
+        }
+        builder = _make_builder(self._tmpdir)
+
+        with terminal.capture():
+            pool.build_boards(boards, None, builder)
+
+        # Both workers should have received build_prepare
+        wrk1.build_prepare.assert_called_once()
+        wrk2.build_prepare.assert_called_once()
+
+        # Verify done files were written
+        for board in ['rpi', 'odroid']:
+            done_path = os.path.join(self._tmpdir, board, 'done')
+            self.assertTrue(os.path.exists(done_path))
+
+    def test_board_stays_on_same_worker(self):
+        """Test that all commits for a board go to the same worker"""
+        commits = [types.SimpleNamespace(hash=f'abc{i}') for i in range(3)]
+
+        # Two workers with different archs, each gets one board
+        wrk1 = self._make_worker('host1', {'arm': '/usr/bin/arm-gcc'},
+                                 self._demand_responses(
+                                     *[_make_result('rpi', commit_upto=i)
+                                       for i in range(3)]))
+        wrk2 = self._make_worker('host2',
+                                 {'riscv': '/usr/bin/riscv64-gcc'},
+                                 self._demand_responses(
+                                     *[_make_result('odroid', commit_upto=i)
+                                       for i in range(3)]))
+
+        pool = boss.WorkerPool([])
+        pool.workers = [wrk1, wrk2]
+
+        boards = {
+            'rpi': FakeBoard('rpi', 'arm'),
+            'odroid': FakeBoard('odroid', 'riscv'),
+        }
+        builder = _make_builder(self._tmpdir)
+
+        with terminal.capture():
+            pool.build_boards(boards, commits, builder)
+
+        # Each worker gets one board via build_board
+        wrk1.build_board.assert_called_once()
+        wrk2.build_board.assert_called_once()
+
+        # Check each worker got the right board
+        self.assertEqual(wrk1.build_board.call_args[0][0], 'rpi')
+        self.assertEqual(wrk2.build_board.call_args[0][0], 'odroid')
+
+    def test_arch_passed(self):
+        """Test that the board's arch is sent to the worker"""
+        wrk = self._make_worker(
+            'host1',
+            {'arm': '/usr/bin/arm-linux-gnueabihf-gcc'},
+            self._demand_responses(_make_result('rpi')))
+
+        pool = boss.WorkerPool([])
+        pool.workers = [wrk]
+
+        boards = {'rpi': FakeBoard('rpi', 'arm')}
+        builder = _make_builder(self._tmpdir)
+
+        with terminal.capture():
+            pool.build_boards(boards, None, builder)
+
+        wrk.build_board.assert_called_once()
+        # build_board(board, arch)
+        self.assertEqual(wrk.build_board.call_args[0][1], 'arm')
+
+    def test_worker_error_response(self):
+        """Test that error responses are caught and stop the worker"""
+        wrk = self._make_worker(
+            'host1', {'arm': '/usr/bin/arm-gcc'}, [
+                {'resp': 'error', 'msg': 'no work directory set up'},
+            ])
+
+        pool = boss.WorkerPool([])
+        pool.workers = [wrk]
+
+        boards = {
+            'rpi': FakeBoard('rpi', 'arm'),
+            'odroid': FakeBoard('odroid', 'arm'),
+        }
+        builder = _make_builder(self._tmpdir)
+
+        with terminal.capture():
+            pool.build_boards(boards, None, builder)
+
+        # build_prepare sent, error on first recv stops the worker
+        self.assertTrue(wrk.build_prepare.called)
+
+    def test_boss_error_stops_worker(self):
+        """Test that BossError from recv() stops the worker"""
+        wrk = self._make_worker(
+            'host1', {'arm': '/usr/bin/arm-gcc'}, [])
+        # Override the canned list: every recv() raises
+        wrk.recv.side_effect = boss.BossError('connection lost')
+
+        pool = boss.WorkerPool([])
+        pool.workers = [wrk]
+
+        boards = {
+            'rpi': FakeBoard('rpi', 'arm'),
+            'odroid': FakeBoard('odroid', 'arm'),
+        }
+        builder = _make_builder(self._tmpdir)
+
+        with terminal.capture():
+            pool.build_boards(boards, None, builder)
+
+        # build_prepare sent, but only one recv attempted before error
+        self.assertTrue(wrk.build_prepare.called)
+        self.assertEqual(wrk.recv.call_count, 1)
+
+    def test_toolchain_matching(self):
+        """Test boards only go to workers with the right toolchain"""
+        wrk_arm = self._make_worker(
+            'arm-host', {'arm': '/usr/bin/arm-gcc'},
+            self._demand_responses(
+                _make_result('rpi'),
+                _make_result('odroid'),
+            ))
+        wrk_riscv = self._make_worker(
+            'rv-host', {'riscv': '/usr/bin/riscv64-gcc'},
+            self._demand_responses(
+                _make_result('qemu_rv'),
+            ))
+
+        pool = boss.WorkerPool([])
+        pool.workers = [wrk_arm, wrk_riscv]
+
+        boards = {
+            'rpi': FakeBoard('rpi', 'arm'),
+            'odroid': FakeBoard('odroid', 'arm'),
+            'qemu_rv': FakeBoard('qemu_rv', 'riscv'),
+        }
+        builder = _make_builder(self._tmpdir)
+
+        with terminal.capture():
+            pool.build_boards(boards, None, builder)
+
+        # arm boards go to wrk_arm, riscv to wrk_riscv
+        arm_boards = {call[0][0]
+                      for call in wrk_arm.build_board.call_args_list}
+        rv_boards = {call[0][0]
+                     for call in wrk_riscv.build_board.call_args_list}
+        self.assertEqual(arm_boards, {'rpi', 'odroid'})
+        self.assertEqual(rv_boards, {'qemu_rv'})
+
+    def test_sandbox_any_worker(self):
+        """Test that sandbox boards can go to any worker"""
+        wrk = self._make_worker(
+            'host1', {'arm': '/usr/bin/arm-gcc'},
+            self._demand_responses(
+                _make_result('sandbox'),
+            ))
+
+        pool = boss.WorkerPool([])
+        pool.workers = [wrk]
+
+        boards = {'sandbox': FakeBoard('sandbox', 'sandbox')}
+        builder = _make_builder(self._tmpdir)
+
+        with terminal.capture():
+            pool.build_boards(boards, None, builder)
+
+        # sandbox should be sent to the worker even though it has
+        # no 'sandbox' toolchain — sandbox uses the host compiler
+        wrk.build_board.assert_called_once()
+        self.assertEqual(wrk.build_board.call_args[0][1], 'sandbox')
+
+    def test_skip_done_boards(self):
+        """Test that already-done boards are skipped without force"""
+        # Create a done file for 'rpi'
+        done_path = os.path.join(self._tmpdir, 'rpi_done')
+        tools.write_file(done_path, '0\n', binary=False)
+
+        wrk = self._make_worker(
+            'host1', {'arm': '/usr/bin/arm-gcc'},
+            self._demand_responses(
+                _make_result('odroid'),
+            ))
+
+        pool = boss.WorkerPool([])
+        pool.workers = [wrk]
+
+        boards = {
+            'rpi': FakeBoard('rpi', 'arm'),
+            'odroid': FakeBoard('odroid', 'arm'),
+        }
+        builder = _make_builder(self._tmpdir, force_build=False)
+        # Done files live at <tmpdir>/<board>_done for this test
+        builder.get_done_file.side_effect = (
+            lambda c, b: os.path.join(self._tmpdir, f'{b}_done'))
+
+        with terminal.capture():
+            pool.build_boards(boards, None, builder)
+
+        # Only odroid should be built (rpi has done file)
+        wrk.build_board.assert_called_once()
+        self.assertEqual(wrk.build_board.call_args[0][0], 'odroid')
+
+    def test_no_capable_worker(self):
+        """Test boards with no capable worker are silently skipped"""
+        wrk = self._make_worker(
+            'host1', {'arm': '/usr/bin/arm-gcc'},
+            [{'resp': 'build_prepare_done'}])
+
+        pool = boss.WorkerPool([])
+        pool.workers = [wrk]
+
+        boards = {'qemu_rv': FakeBoard('qemu_rv', 'riscv')}
+        builder = _make_builder(self._tmpdir)
+
+        with terminal.capture():
+            pool.build_boards(boards, None, builder)
+
+        # No build_board should be sent (no riscv worker)
+        wrk.build_board.assert_not_called()
+
+    def test_progress_updated(self):
+        """Test that process_result is called for each build result"""
+        wrk = self._make_worker(
+            'host1', {'arm': '/usr/bin/arm-gcc'},
+            self._demand_responses(
+                _make_result('rpi'),
+                _make_result('odroid'),
+            ))
+
+        pool = boss.WorkerPool([])
+        pool.workers = [wrk]
+
+        brd_rpi = FakeBoard('rpi', 'arm')
+        brd_odroid = FakeBoard('odroid', 'arm')
+        boards = {'rpi': brd_rpi, 'odroid': brd_odroid}
+        builder = _make_builder(self._tmpdir)
+
+        with terminal.capture():
+            pool.build_boards(boards, None, builder)
+
+        # process_result should be called for each board
+        self.assertEqual(builder.process_result.call_count, 2)
+        # Each result should have remote set to hostname
+        for call in builder.process_result.call_args_list:
+            result = call[0][0]
+            self.assertEqual(result.remote, 'host1')
+
+    def test_log_files_created(self):
+        """Test that worker log files are created in the output dir"""
+        wrk = self._make_worker(
+            'myhost', {'arm': '/usr/bin/arm-gcc'},
+            self._demand_responses(
+                _make_result('rpi'),
+            ))
+
+        pool = boss.WorkerPool([])
+        pool.workers = [wrk]
+
+        boards = {'rpi': FakeBoard('rpi', 'arm')}
+        builder = _make_builder(self._tmpdir)
+
+        with terminal.capture():
+            pool.build_boards(boards, None, builder)
+
+        # Log file is named after the worker; '>>' marks sent commands,
+        # '<<' marks received responses
+        log_path = os.path.join(self._tmpdir, 'worker-myhost.log')
+        self.assertTrue(os.path.exists(log_path))
+        content = tools.read_file(log_path, binary=False)
+        self.assertIn('>> 1 boards', content)
+        self.assertIn('<< ', content)
+        self.assertIn('build_result', content)
+
+    def test_heartbeat_resets_timeout(self):
+        """Test that heartbeat messages are accepted without error"""
+        wrk = self._make_worker(
+            'host1', {'arm': '/usr/bin/arm-gcc'}, [
+                {'resp': 'build_prepare_done'},
+                {'resp': 'heartbeat', 'board': 'rpi', 'thread': 0},
+                _make_result('rpi'),
+                {'resp': 'build_done'},
+            ])
+
+        pool = boss.WorkerPool([])
+        pool.workers = [wrk]
+
+        boards = {'rpi': FakeBoard('rpi', 'arm')}
+        builder = _make_builder(self._tmpdir)
+
+        with terminal.capture():
+            pool.build_boards(boards, None, builder)
+
+        # The heartbeat should be silently consumed, result processed
+        self.assertEqual(builder.process_result.call_count, 1)
+
+    def test_build_done_stops_worker(self):
+        """Test that build_done ends collection without timeout"""
+        wrk = self._make_worker(
+            'host1', {'arm': '/usr/bin/arm-gcc'}, [
+                {'resp': 'build_prepare_done'},
+                _make_result('rpi'),
+                # Worker had 2 boards but only 1 result, then
+                # build_done
+                {'resp': 'build_done', 'exceptions': 1},
+                # Final build_done response after boss sends
+                # build_done
+                {'resp': 'build_done'},
+            ])
+
+        pool = boss.WorkerPool([])
+        pool.workers = [wrk]
+
+        boards = {
+            'rpi': FakeBoard('rpi', 'arm'),
+            'odroid': FakeBoard('odroid', 'arm'),
+        }
+        builder = _make_builder(self._tmpdir)
+
+        with terminal.capture():
+            pool.build_boards(boards, None, builder)
+
+        # Only 1 result processed (odroid was lost to a thread
+        # exception)
+        self.assertEqual(builder.process_result.call_count, 1)
+
+
+class TestPipelinedBuilds(unittest.TestCase):
+    """Test pipelined builds with multiple slots"""
+
+    def setUp(self):
+        self._tmpdir = tempfile.mkdtemp()
+
+    def tearDown(self):
+        shutil.rmtree(self._tmpdir, ignore_errors=True)
+
+    def test_multiple_boards(self):
+        """Test that boss sends build_prepare then build_board for each"""
+        # Reuse the canned-worker helper from TestBuildBoards
+        wrk = TestBuildBoards._make_worker(
+            'host1', {'arm': '/usr/bin/arm-gcc'}, [
+                {'resp': 'build_prepare_done'},
+                _make_result('b1'),
+                _make_result('b2'),
+                _make_result('b3'),
+                _make_result('b4'),
+                {'resp': 'build_done'},
+            ])
+
+        pool = boss.WorkerPool([])
+        pool.workers = [wrk]
+
+        boards = {
+            'b1': FakeBoard('b1', 'arm'),
+            'b2': FakeBoard('b2', 'arm'),
+            'b3': FakeBoard('b3', 'arm'),
+            'b4': FakeBoard('b4', 'arm'),
+        }
+        builder = _make_builder(self._tmpdir)
+        with terminal.capture():
+            pool.build_boards(boards, None, builder)
+
+        # build_prepare called once, build_board called 4 times
+        wrk.build_prepare.assert_called_once()
+        sent_boards = {call[0][0]
+                       for call in wrk.build_board.call_args_list}
+        self.assertEqual(sent_boards, {'b1', 'b2', 'b3', 'b4'})
+        # All 4 results collected
+        self.assertEqual(builder.process_result.call_count, 4)
+
+    @mock.patch('subprocess.Popen')
+    def test_slots_from_ready(self, mock_popen):
+        """Test that slots is read from the worker's ready response"""
+        proc = FakeProc([
+            _make_response(
+                {'resp': 'ready', 'nthreads': 20, 'slots': 5}),
+        ])
+        w = _start_worker('host1', mock_popen, proc)
+        self.assertEqual(w.nthreads, 20)
+        self.assertEqual(w.slots, 5)
+
+    @mock.patch('subprocess.Popen')
+    def test_slots_default(self, mock_popen):
+        """Test that slots defaults to 1 for old workers"""
+        # A 'ready' response with no 'slots' key simulates an old worker
+        proc = FakeProc([
+            _make_response({'resp': 'ready', 'nthreads': 8}),
+        ])
+        w = _start_worker('host1', mock_popen, proc)
+        self.assertEqual(w.slots, 1)
+
+
+class TestBuildTimeout(unittest.TestCase):
+    """Test that the build timeout prevents hangs
+
+    BUILD_TIMEOUT is a module-level global in boss, so it is saved in
+    setUp() and restored in tearDown() to avoid leaking the short value
+    into other tests.
+    """
+
+    def setUp(self):
+        self._tmpdir = tempfile.mkdtemp()
+        self._orig_timeout = boss.BUILD_TIMEOUT
+
+    def tearDown(self):
+        shutil.rmtree(self._tmpdir, ignore_errors=True)
+        boss.BUILD_TIMEOUT = self._orig_timeout
+
+    def test_recv_timeout(self):
+        """Test that a hung worker times out instead of blocking"""
+
+        # Use a very short timeout so the test runs quickly
+        boss.BUILD_TIMEOUT = 0.5
+
+        wrk = TestBuildBoards._make_worker(
+            'slowhost', {'arm': '/usr/bin/arm-gcc'}, [])
+        wrk.nthreads = 1
+        wrk.max_boards = 1
+        wrk.slots = 1
+
+        # Simulate a worker that never responds: recv blocks forever
+        wrk.recv.side_effect = lambda: time.sleep(60)
+
+        pool = boss.WorkerPool([])
+        pool.workers = [wrk]
+
+        boards = {'rpi': FakeBoard('rpi', 'arm')}
+        builder = _make_builder(self._tmpdir)
+
+        # monotonic() is immune to wall-clock adjustments
+        start = time.monotonic()
+        with terminal.capture():
+            pool.build_boards(boards, None, builder)
+        elapsed = time.monotonic() - start
+
+        # Should complete quickly (within a few seconds), not hang
+        self.assertLess(elapsed, 10)
+
+        # No results should have been processed
+        builder.process_result.assert_not_called()
+
+
+class TestMachineMaxBoards(unittest.TestCase):
+ """Test per-machine max_boards config"""
+
+ def test_max_boards_from_config(self):
+ """Test [machine:name] section sets max_boards"""
+ bsettings.setup('')
+ bsettings.settings.read_string("""
+[machines]
+ruru
+weka
+
+[machine:ruru]
+max_boards = 64
+""")
+ pool = machine.MachinePool()
+ pool._load_from_config()
+ by_name = {m.name: m for m in pool.machines}
+ self.assertEqual(by_name['ruru'].max_boards, 64)
+ self.assertEqual(by_name['weka'].max_boards, 0)
+
+ def test_max_boards_default(self):
+ """Test max_boards is 0 when no per-machine section exists"""
+ mach = machine.Machine('host1')
+ self.assertEqual(mach.max_boards, 0)
+
+
+class TestGccVersion(unittest.TestCase):
+ """Test gcc_version()"""
+
+ def test_buildman_toolchain(self):
+ """Test extracting version from a buildman-fetched toolchain"""
+ path = ('/home/sglass/.buildman-toolchains/gcc-13.1.0-nolibc/'
+ 'aarch64-linux/bin/aarch64-linux-gcc')
+ self.assertEqual(machine.gcc_version(path), 'gcc-13.1.0-nolibc')
+
+ def test_different_version(self):
+ """Test a different gcc version"""
+ path = ('/home/sglass/.buildman-toolchains/gcc-11.1.0-nolibc/'
+ 'aarch64-linux/bin/aarch64-linux-gcc')
+ self.assertEqual(machine.gcc_version(path), 'gcc-11.1.0-nolibc')
+
+ def test_system_gcc(self):
+ """Test a system gcc with no version directory"""
+ self.assertIsNone(machine.gcc_version('/usr/bin/gcc'))
+
+ def test_empty(self):
+ """Test an empty path"""
+ self.assertIsNone(machine.gcc_version(''))
+
+
+
+class _PipelineWorker:  # pylint: disable=R0902
+    """Mock worker that simulates the demand-driven build protocol
+
+    Responds to build_prepare, build_board, and build_done commands
+    via the recv() queue, simulating real worker behaviour.
+    """
+
+    def __init__(self, nthreads, max_boards=0):
+        self.hostname = 'sim-host'
+        self.name = 'sim-host'
+        # Only an 'arm' toolchain, matching the boards used in the tests
+        self.toolchains = {'arm': '/usr/bin/arm-gcc'}
+        self.nthreads = nthreads
+        # In-flight board cap; defaults to one board per thread
+        self.max_boards = max_boards or nthreads
+        self.slots = nthreads
+        self.bogomips = 5000.0
+        self.closing = False
+
+        # Thread-safe queue of response dicts, drained by recv()
+        self._ready = queue.Queue()
+        self._commits = None
+
+    def build_prepare(self, commits):
+        """Accept prepare command and queue ready response"""
+        self._commits = commits
+        self._ready.put({'resp': 'build_prepare_done'})
+
+    def build_board(self, board, _arch):
+        """Queue results for this board across all commits"""
+
+        def _produce():
+            # Emit one result per commit, with a small random delay to
+            # simulate builds finishing out of lockstep
+            for cu in range(len(self._commits)):
+                time.sleep(random.uniform(0.001, 0.005))
+                self._ready.put({
+                    'resp': 'build_result',
+                    'board': board,
+                    'commit_upto': cu,
+                    'return_code': 0,
+                    'stderr': '',
+                    'stdout': '',
+                })
+
+        # Daemon thread so a failing test doesn't hang the suite
+        threading.Thread(target=_produce, daemon=True).start()
+
+    def build_done(self):
+        """Queue the build_done response after a short delay"""
+        def _respond():
+            time.sleep(0.05)
+            self._ready.put({'resp': 'build_done'})
+        threading.Thread(target=_respond, daemon=True).start()
+
+    def recv(self):
+        """Wait for the next result"""
+        # Blocks until a producer thread puts a response
+        return self._ready.get()
+
+
+class TestBuildBoardsUtilisation(unittest.TestCase):
+    """Test that build_boards dispatches correctly to workers
+
+    The boss sends one build_boards command per worker. The mock
+    worker simulates board-first scheduling and produces results.
+    """
+
+    def setUp(self):
+        self._tmpdir = tempfile.mkdtemp()
+
+    def tearDown(self):
+        shutil.rmtree(self._tmpdir, ignore_errors=True)
+
+    def _run_build(self, nboards, ncommits, nthreads, max_boards=0):
+        """Run a build and return the mock worker"""
+        wrk = _PipelineWorker(nthreads, max_boards=max_boards)
+
+        pool = boss.WorkerPool([])
+        pool.workers = [wrk]
+
+        # Synthesise nboards arm boards and ncommits fake commits
+        boards = {}
+        for i in range(nboards):
+            target = f'board{i}'
+            boards[target] = FakeBoard(target, 'arm')
+
+        commits = [types.SimpleNamespace(hash=f'commit{i}')
+                   for i in range(ncommits)]
+
+        builder = mock.Mock()
+        builder.force_build = True
+        builder.base_dir = self._tmpdir
+        builder.count = 0
+        builder.get_build_dir.side_effect = (
+            lambda c, b: os.path.join(self._tmpdir, b))
+
+        with terminal.capture():
+            pool.build_boards(boards, commits, builder)
+        return wrk
+
+    def test_all_results_collected(self):
+        """Verify boss collects all board x commit results"""
+        nboards = 30
+        ncommits = 10
+        wrk = self._run_build(nboards, ncommits, nthreads=8)
+
+        # The ready queue should be empty (boss drained everything)
+        self.assertTrue(wrk._ready.empty())
+
+    def test_large_scale(self):
+        """Test at realistic scale: 200 boards x 10 commits"""
+        wrk = self._run_build(nboards=200, ncommits=10,
+                              nthreads=32)
+        self.assertTrue(wrk._ready.empty())
+
+    def test_max_boards_caps_batch(self):
+        """Test that max_boards limits initial and in-flight boards"""
+        wrk = self._run_build(nboards=30, ncommits=5,
+                              nthreads=32, max_boards=8)
+        self.assertTrue(wrk._ready.empty())
+        # Worker should still receive all boards despite the cap
+        self.assertEqual(wrk.max_boards, 8)
+
+
+class TestFormatBytes(unittest.TestCase):
+ """Test _format_bytes()"""
+
+ def test_format_bytes(self):
+ """Test bytes, KB and MB ranges"""
+ self.assertEqual(boss._format_bytes(0), '0B')
+ self.assertEqual(boss._format_bytes(1023), '1023B')
+ self.assertEqual(boss._format_bytes(1024), '1.0KB')
+ self.assertEqual(boss._format_bytes(1536), '1.5KB')
+ self.assertEqual(boss._format_bytes(1024 * 1024), '1.0MB')
+ self.assertEqual(boss._format_bytes(5 * 1024 * 1024), '5.0MB')
+
+
+# Tests merged into TestWriteRemoteResult above
+
+ def test_with_stderr(self):
+ """Test writing result with stderr"""
+ bldr = mock.Mock()
+ bldr.get_build_dir.return_value = tempfile.mkdtemp()
+ resp = {
+ 'board': 'sandbox',
+ 'commit_upto': 0,
+ 'return_code': 2,
+ 'stderr': 'error: missing header\n',
+ 'stdout': '',
+ }
+ boss._write_remote_result(bldr, resp, {'sandbox': mock.Mock()},
+ 'host1')
+ build_dir = bldr.get_build_dir.return_value
+ err_path = os.path.join(build_dir, 'err')
+ self.assertIn('error:',
+ tools.read_file(err_path, binary=False))
+
+ shutil.rmtree(build_dir)
+
+ def test_removes_stale_err(self):
+ """Test that stale err file is removed on success"""
+ bldr = mock.Mock()
+ build_dir = tempfile.mkdtemp()
+ bldr.get_build_dir.return_value = build_dir
+ err_path = os.path.join(build_dir, 'err')
+ tools.write_file(err_path, 'old error', binary=False)
+ resp = {
+ 'board': 'sandbox',
+ 'commit_upto': 0,
+ 'return_code': 0,
+ 'stderr': '',
+ 'stdout': '',
+ }
+ boss._write_remote_result(bldr, resp, {'sandbox': mock.Mock()},
+ 'host1')
+ self.assertFalse(os.path.exists(err_path))
+
+ shutil.rmtree(build_dir)
+
+
+class TestRemoteWorkerMethods(unittest.TestCase):
+    """Test RemoteWorker send/recv/close methods"""
+
+    def test_send(self):
+        """Test _send writes JSON to stdin"""
+        wrk = _make_worker()
+        wrk._proc.stdin = mock.Mock()
+        wrk._send({'cmd': 'quit'})
+        wrk._proc.stdin.write.assert_called_once()
+        # _send writes bytes, so check the serialised command appears
+        data = wrk._proc.stdin.write.call_args[0][0]
+        self.assertIn(b'quit', data)
+
+    def test_send_broken_pipe(self):
+        """Test _send raises BrokenPipeError on broken pipe"""
+        wrk = _make_worker()
+        wrk._proc.stdin.write.side_effect = BrokenPipeError()
+        with self.assertRaises(BrokenPipeError):
+            wrk._send({'cmd': 'quit'})
+
+    def test_build_commands(self):
+        """Test build_prepare, build_board and build_done commands"""
+        wrk = _make_worker()
+        # Stub out _send so we can inspect the command dicts sent
+        wrk._send = mock.Mock()
+
+        wrk.build_prepare(['abc123'])
+        self.assertEqual(wrk._send.call_args[0][0]['cmd'],
+                         'build_prepare')
+
+        wrk._send.reset_mock()
+        wrk.build_board('sandbox', 'sandbox')
+        self.assertEqual(wrk._send.call_args[0][0]['cmd'],
+                         'build_board')
+
+        wrk._send.reset_mock()
+        wrk.build_done()
+        self.assertEqual(wrk._send.call_args[0][0]['cmd'],
+                         'build_done')
+
+    def test_quit(self):
+        """Test quit sends command and closes, handles errors"""
+        wrk = _make_worker()
+        wrk._send = mock.Mock()
+        wrk._recv = mock.Mock(return_value={'resp': 'quit_ack'})
+        wrk.close = mock.Mock()
+        wrk.quit()
+        wrk._send.assert_called_once()
+        wrk.close.assert_called_once()
+
+        # Error path: BossError during quit still closes
+        wrk2 = _make_worker()
+        wrk2._send = mock.Mock(side_effect=boss.BossError('gone'))
+        wrk2.close = mock.Mock()
+        wrk2.quit()
+        wrk2.close.assert_called_once()
+
+    def test_close_idempotent(self):
+        """Test close can be called multiple times"""
+        wrk = _make_worker()
+        wrk.close()
+        # close() drops the process reference...
+        self.assertIsNone(wrk._proc)
+        wrk.close()  # should not raise
+
+    @mock.patch('buildman.boss._run_ssh')
+    def test_remove_lock(self, mock_ssh):
+        """Test remove_lock: SSH call, no work_dir, SSH failure"""
+        wrk = _make_worker()
+        wrk.work_dir = '/tmp/bm'
+        wrk.remove_lock()
+        mock_ssh.assert_called_once()
+
+        # No work_dir: does nothing
+        wrk2 = _make_worker()
+        wrk2.work_dir = ''
+        wrk2.remove_lock()
+
+        # SSH failure: silently ignored
+        mock_ssh.side_effect = boss.BossError('gone')
+        wrk3 = _make_worker()
+        wrk3.work_dir = '/tmp/bm'
+        wrk3.remove_lock()
+
+
+
+
+
+class TestWorkerPoolCapacity(unittest.TestCase):
+ """Test WorkerPool capacity and arch assignment"""
+
+ def test_get_capacity(self):
+ """Test capacity calculation"""
+ wrk = mock.Mock(nthreads=8, bogomips=5000.0)
+ self.assertEqual(
+ boss.WorkerPool._get_capacity(wrk), 40000.0)
+
+ def test_get_capacity_no_bogomips(self):
+ """Test capacity with no bogomips falls back to nthreads"""
+ wrk = mock.Mock(nthreads=4, bogomips=0)
+ self.assertEqual(boss.WorkerPool._get_capacity(wrk), 4.0)
+
+ def test_get_worker_for_arch(self):
+ """Test arch-based worker selection"""
+ w1 = mock.Mock(nthreads=8, bogomips=5000.0,
+ toolchains={'arm': '/gcc'})
+ w2 = mock.Mock(nthreads=4, bogomips=5000.0,
+ toolchains={'arm': '/gcc'})
+ pool = boss.WorkerPool.__new__(boss.WorkerPool)
+ pool.workers = [w1, w2]
+ assigned = {}
+
+ # First assignment should go to w1 (higher capacity)
+ wrk = pool._get_worker_for_arch('arm', assigned)
+ self.assertEqual(wrk, w1)
+
+ # Second should go to w2 (w1 already has 1)
+ wrk = pool._get_worker_for_arch('arm', assigned)
+ self.assertEqual(wrk, w2)
+
+ def test_get_worker_sandbox(self):
+ """Test sandbox goes to any worker"""
+ w1 = mock.Mock(nthreads=4, bogomips=1000.0, toolchains={})
+ pool = boss.WorkerPool.__new__(boss.WorkerPool)
+ pool.workers = [w1]
+ assigned = {}
+ wrk = pool._get_worker_for_arch('sandbox', assigned)
+ self.assertEqual(wrk, w1)
+
+ def test_get_worker_no_capable(self):
+ """Test returns None when no worker supports arch"""
+ w1 = mock.Mock(nthreads=4, bogomips=1000.0,
+ toolchains={'arm': '/gcc'})
+ pool = boss.WorkerPool.__new__(boss.WorkerPool)
+ pool.workers = [w1]
+ self.assertIsNone(
+ pool._get_worker_for_arch('mips', {}))
+
+
+class TestBossLog(unittest.TestCase):
+    """Test _BossLog"""
+
+    def test_log_and_close(self):
+        """Test logging and closing"""
+        with tempfile.TemporaryDirectory() as tmpdir:
+            blog = boss._BossLog(tmpdir)
+            # Mock(name=...) sets the mock's own name, not the attribute,
+            # so 'name' must be assigned separately
+            wrk = mock.Mock(name='host1', nthreads=4)
+            wrk.name = 'host1'
+            blog.init_worker(wrk)
+            blog.log('test message')
+            blog.record_sent('host1', 3)
+            blog.record_recv('host1', load_avg=2.5)
+            blog.close()
+
+            # The log is written to .buildman.log inside the output dir
+            log_path = os.path.join(tmpdir, '.buildman.log')
+            content = tools.read_file(log_path, binary=False)
+            self.assertIn('test message', content)
+
+    def test_log_status(self):
+        """Test log_status writes counts"""
+        with tempfile.TemporaryDirectory() as tmpdir:
+            blog = boss._BossLog(tmpdir)
+            wrk = mock.Mock(name='host1', nthreads=4)
+            wrk.name = 'host1'
+            blog.init_worker(wrk)
+            blog.record_sent('host1', 5)
+            blog.record_recv('host1')
+            blog.record_recv('host1')
+            blog.log_status()
+            blog.close()
+
+            content = tools.read_file(
+                os.path.join(tmpdir, '.buildman.log'), binary=False)
+            self.assertIn('host1', content)
+
+    def test_start_timer(self):
+        """Test start_timer and close with elapsed"""
+        with tempfile.TemporaryDirectory() as tmpdir:
+            blog = boss._BossLog(tmpdir)
+            blog.start_timer()
+            # close() after start_timer() should not raise
+            blog.close()
+
+
+class TestDispatchContext(unittest.TestCase):
+    """Test _DispatchContext"""
+
+    def test_update_progress_build_started(self):
+        """Test worktree progress: build_started"""
+        wrk = mock.Mock(nthreads=4)
+        wrk.name = 'host1'
+        builder = mock.Mock()
+        with tempfile.TemporaryDirectory() as tmpdir:
+            builder.base_dir = tmpdir
+            ctx = boss._DispatchContext(
+                workers=[wrk], builder=builder,
+                board_selected={}, boss_log=mock.Mock())
+            resp = {'resp': 'build_started', 'num_threads': 8}
+            # True means the message was consumed as a progress update
+            self.assertTrue(ctx.update_progress(resp, wrk))
+            ctx.close()
+
+    def test_update_progress_worktree_created(self):
+        """Test worktree progress: worktree_created"""
+        wrk = mock.Mock(nthreads=2)
+        wrk.name = 'host1'
+        builder = mock.Mock()
+        with tempfile.TemporaryDirectory() as tmpdir:
+            builder.base_dir = tmpdir
+            ctx = boss._DispatchContext(
+                workers=[wrk], builder=builder,
+                board_selected={}, boss_log=mock.Mock())
+            # build_started first, then the per-thread worktree message
+            ctx.update_progress(
+                {'resp': 'build_started', 'num_threads': 2}, wrk)
+            self.assertTrue(ctx.update_progress(
+                {'resp': 'worktree_created'}, wrk))
+            ctx.close()
+
+    def test_update_progress_other(self):
+        """Test non-progress messages return False"""
+        wrk = mock.Mock(nthreads=4)
+        wrk.name = 'host1'
+        builder = mock.Mock()
+        with tempfile.TemporaryDirectory() as tmpdir:
+            builder.base_dir = tmpdir
+            ctx = boss._DispatchContext(
+                workers=[wrk], builder=builder,
+                board_selected={}, boss_log=mock.Mock())
+            # build_result is not a progress message, so it is not consumed
+            self.assertFalse(ctx.update_progress(
+                {'resp': 'build_result'}, wrk))
+            ctx.close()
+
+    def test_log(self):
+        """Test per-worker log file"""
+        wrk = mock.Mock(nthreads=4)
+        wrk.name = 'host1'
+        builder = mock.Mock()
+        with tempfile.TemporaryDirectory() as tmpdir:
+            builder.base_dir = tmpdir
+            ctx = boss._DispatchContext(
+                workers=[wrk], builder=builder,
+                board_selected={}, boss_log=mock.Mock())
+            ctx.log(wrk, '>>>', 'test message')
+            ctx.close()
+            # Each worker gets its own worker-<name>.log file
+            content = tools.read_file(
+                os.path.join(tmpdir, 'worker-host1.log'),
+                binary=False)
+            self.assertIn('test message', content)
+
+
+class TestRemoteWorkerClose(unittest.TestCase):
+    """Test RemoteWorker.close() error paths"""
+
+    def test_close_error_paths(self):
+        """Test close: stdin OSError, terminate timeout, kill timeout"""
+        # stdin.close() raises OSError
+        wrk = _make_worker()
+        wrk._proc.stdin.close.side_effect = OSError('broken')
+        wrk.close()
+        self.assertIsNone(wrk._proc)
+
+        # wait() times out, terminate succeeds
+        wrk2 = _make_worker()
+        wrk2._proc.wait.side_effect = [
+            subprocess.TimeoutExpired('ssh', 2),
+            None,
+        ]
+        wrk2.close()
+        self.assertIsNone(wrk2._proc)
+
+        # wait() times out twice, falls back to kill
+        wrk3 = _make_worker()
+        wrk3._proc.wait.side_effect = [
+            subprocess.TimeoutExpired('ssh', 2),
+            subprocess.TimeoutExpired('ssh', 3),
+        ]
+        wrk3.close()
+        self.assertIsNone(wrk3._proc)
+
+    def test_configure_rejected(self):
+        """Test configure raises on rejection"""
+        wrk = _make_worker()
+        wrk._send = mock.Mock()
+        # Worker answers with an error response instead of configure_done
+        wrk._recv = mock.Mock(return_value={'resp': 'error', 'msg': 'bad'})
+        with self.assertRaises(boss.BossError):
+            wrk.configure({'no_lto': True})
+
+    def test_get_stderr(self):
+        """Test _get_stderr returns last non-empty line, or empty"""
+        wrk = _make_worker()
+        wrk._stderr_thread = mock.Mock()
+        # Trailing blank lines are skipped; last real line wins
+        wrk._stderr_lines = ['first', '', 'last error', '']
+        self.assertEqual(wrk._get_stderr(), 'last error')
+
+        wrk._stderr_lines = []
+        self.assertEqual(wrk._get_stderr(), '')
+
+
+# Tests merged into TestWriteRemoteResult above
+
+ def test_sizes_with_header(self):
+ """Test that size output header line is stripped"""
+ bldr = mock.Mock()
+ build_dir = tempfile.mkdtemp()
+ bldr.get_build_dir.return_value = build_dir
+ resp = {
+ 'board': 'sandbox',
+ 'commit_upto': 0,
+ 'return_code': 0,
+ 'stderr': '',
+ 'stdout': '',
+ 'sizes': {
+ 'raw': (' text data bss dec hex\n'
+ ' 1000 200 100 1300 514\n')},
+ }
+ boss._write_remote_result(bldr, resp, {'sandbox': mock.Mock()},
+ 'host1')
+ sizes_content = tools.read_file(
+ os.path.join(build_dir, 'sizes'), binary=False)
+ self.assertNotIn('text', sizes_content)
+ self.assertIn('1000', sizes_content)
+
+ shutil.rmtree(build_dir)
+
+
+
+
+
+class TestWorkerPoolEdgeCases(unittest.TestCase):
+    """Test WorkerPool edge cases"""
+
+    def test_print_transfer_empty(self):
+        """Test print_transfer_summary with no workers"""
+        pool = boss.WorkerPool([])
+        with terminal.capture():
+            # Should not raise with an empty worker list
+            pool.print_transfer_summary()
+
+    def test_quit_all_with_boss_log(self):
+        """Test quit_all closes boss_log"""
+        pool = boss.WorkerPool([])
+        blog = mock.Mock()
+        pool._boss_log = blog
+        with terminal.capture():
+            pool.quit_all()
+        blog.log.assert_called()
+        blog.close.assert_called_once()
+
+    def test_build_boards_with_local_count(self):
+        """Test build_boards progress includes local count"""
+        pool = boss.WorkerPool([])
+        wrk = mock.Mock(
+            nthreads=4, bogomips=1000.0, slots=2,
+            toolchains={'arm': '/gcc'}, closing=False)
+        wrk.name = 'host1'
+        # Canned protocol sequence: prepare done, one result, done
+        wrk.recv.side_effect = [
+            {'resp': 'build_prepare_done'},
+            {'resp': 'build_result', 'board': 'rpi',
+             'commit_upto': 0, 'return_code': 0,
+             'stderr': '', 'stdout': ''},
+            {'resp': 'build_done'},
+        ]
+        pool.workers = [wrk]
+
+        builder = mock.Mock()
+        builder.force_build = True
+        builder.base_dir = tempfile.mkdtemp()
+        builder.count = 0
+        builder.get_build_dir.return_value = tempfile.mkdtemp()
+
+        boards = {'rpi': mock.Mock(target='rpi', arch='arm')}
+        with terminal.capture():
+            pool.build_boards(boards, None, builder, local_count=5)
+
+        shutil.rmtree(builder.base_dir)
+        shutil.rmtree(builder.get_build_dir.return_value)
+
+    def test_get_capacity_no_bogomips(self):
+        """Test _get_worker_for_arch falls back when bogomips is 0"""
+        w1 = mock.Mock(nthreads=4, bogomips=0, toolchains={'arm': '/gcc'})
+        pool = boss.WorkerPool.__new__(boss.WorkerPool)
+        pool.workers = [w1]
+        wrk = pool._get_worker_for_arch('arm', {})
+        self.assertEqual(wrk, w1)
+
+    def test_close_all(self):
+        """Test close_all with boss_log"""
+        pool = boss.WorkerPool([])
+        wrk = mock.Mock()
+        wrk.closing = False
+        wrk.bytes_sent = 100
+        wrk.bytes_recv = 200
+        wrk.name = 'host1'
+        pool.workers = [wrk]
+        blog = mock.Mock()
+        pool._boss_log = blog
+        with terminal.capture():
+            pool.close_all()
+        # close_all() must close, unlock and drop every worker
+        wrk.close.assert_called()
+        wrk.remove_lock.assert_called()
+        blog.close.assert_called_once()
+        self.assertEqual(len(pool.workers), 0)
+
+
+class TestDispatchContextRecv(unittest.TestCase):
+ """Test _DispatchContext.recv() error paths"""
+
+
+ @mock.patch.object(boss, 'BUILD_TIMEOUT', 0.01)
+ def test_recv_timeout(self):
+ """Test recv returns None on timeout"""
+ ctx, wrk, tmpdir = _make_ctx()
+ recv_q = queue.Queue() # empty queue
+ with terminal.capture():
+ result = ctx.recv(wrk, recv_q)
+ self.assertIsNone(result)
+ ctx.close()
+ shutil.rmtree(tmpdir)
+
+ def test_recv_error_response(self):
+ """Test recv returns None on error response"""
+ ctx, wrk, tmpdir = _make_ctx()
+ recv_q = queue.Queue()
+ recv_q.put(('error', 'something broke'))
+ with terminal.capture():
+ result = ctx.recv(wrk, recv_q)
+ self.assertIsNone(result)
+ ctx.close()
+ shutil.rmtree(tmpdir)
+
+ def test_recv_worker_error(self):
+ """Test recv returns None on worker error response"""
+ ctx, wrk, tmpdir = _make_ctx(
+ {'rpi': mock.Mock(target='rpi', arch='arm')})
+ recv_q = queue.Queue()
+ recv_q.put(('resp', {'resp': 'error', 'msg': 'oops'}))
+ with terminal.capture():
+ result = ctx.recv(wrk, recv_q)
+ self.assertIsNone(result)
+ ctx.close()
+ shutil.rmtree(tmpdir)
+
+ def test_write_result_exception(self):
+ """Test write_result catches exceptions"""
+ ctx, wrk, tmpdir = _make_ctx(
+ {'rpi': mock.Mock(target='rpi', arch='arm')})
+ ctx.builder.get_build_dir.side_effect = RuntimeError('boom')
+ resp = {'board': 'sandbox', 'commit_upto': 0,
+ 'return_code': 0}
+ with terminal.capture():
+ result = ctx.write_result(wrk, resp)
+ self.assertFalse(result)
+ ctx.close()
+ shutil.rmtree(tmpdir)
+
+ def test_wait_for_prepare(self):
+ """Test wait_for_prepare with prepare_done"""
+ ctx, wrk, tmpdir = _make_ctx()
+ recv_q = queue.Queue()
+ recv_q.put(('resp', {'resp': 'build_prepare_done'}))
+ result = ctx.wait_for_prepare(wrk, recv_q)
+ self.assertTrue(result)
+ ctx.close()
+ shutil.rmtree(tmpdir)
+
+ @mock.patch.object(boss, 'BUILD_TIMEOUT', 0.01)
+ def test_wait_for_prepare_timeout(self):
+ """Test wait_for_prepare returns False on timeout"""
+ ctx, wrk, tmpdir = _make_ctx()
+ recv_q = queue.Queue()
+ with terminal.capture():
+ result = ctx.wait_for_prepare(wrk, recv_q)
+ self.assertFalse(result)
+ ctx.close()
+ shutil.rmtree(tmpdir)
+
+ def test_send_batch_error(self):
+ """Test send_batch returns -1 on BossError"""
+ ctx, wrk, tmpdir = _make_ctx(
+ {'rpi': mock.Mock(target='rpi', arch='arm')})
+ wrk.build_board.side_effect = boss.BossError('gone')
+ brd = mock.Mock(target='rpi', arch='arm')
+ result = ctx.send_batch(wrk, [brd])
+ self.assertEqual(result, -1)
+ ctx.close()
+ shutil.rmtree(tmpdir)
+
+ def test_collect_results_heartbeat(self):
+ """Test collect_results skips heartbeat"""
+ ctx, wrk, tmpdir = _make_ctx(
+ {'rpi': mock.Mock(target='rpi', arch='arm')})
+ ctx.board_selected = {'rpi': mock.Mock(target='rpi', arch='arm')}
+ build_dir = tempfile.mkdtemp()
+ ctx.builder.get_build_dir.return_value = build_dir
+ recv_q = queue.Queue()
+ recv_q.put(('resp', {'resp': 'heartbeat'}))
+ recv_q.put(('resp', {'resp': 'build_result', 'board': 'rpi',
+ 'commit_upto': 0, 'return_code': 0,
+ 'stderr': '', 'stdout': ''}))
+ recv_q.put(('resp', {'resp': 'build_done'}))
+ state = boss.DemandState(
+ sent=1, ncommits=1, grab_func=lambda w, c: [])
+ with terminal.capture():
+ result = ctx.collect_results(wrk, recv_q, state)
+ self.assertTrue(result)
+ ctx.close()
+ shutil.rmtree(tmpdir)
+ shutil.rmtree(build_dir)
+
+
+class TestForwardStderr(unittest.TestCase):
+ """Test RemoteWorker._forward_stderr()"""
+
+ def test_forward_stderr(self):
+ """Test stderr collection and OSError handling"""
+ wrk = boss.RemoteWorker.__new__(boss.RemoteWorker)
+ wrk.name = 'host1'
+ wrk._stderr_lines = []
+ wrk._proc = mock.Mock()
+ wrk._proc.stderr = [b'error line 1\n', b'error line 2\n', b'']
+ with terminal.capture():
+ wrk._forward_stderr()
+ self.assertEqual(wrk._stderr_lines,
+ ['error line 1', 'error line 2'])
+
+ # OSError path: silently handled
+ wrk2 = boss.RemoteWorker.__new__(boss.RemoteWorker)
+ wrk2.name = 'host1'
+ wrk2._stderr_lines = []
+ wrk2._proc = mock.Mock()
+ wrk2._proc.stderr.__iter__ = mock.Mock(
+ side_effect=OSError('closed'))
+ wrk2._forward_stderr()
+
+
+class TestStartDebug(unittest.TestCase):
+ """Test RemoteWorker.start() debug flag (line 242)"""
+
+ @mock.patch('subprocess.Popen')
+ @mock.patch('buildman.boss._run_ssh')
+ def test_debug_flag(self, _mock_ssh, mock_popen):
+ """Test start() passes -D when debug=True"""
+ proc = mock.Mock()
+ proc.stdout.readline.return_value = (
+ b'BM> {"resp":"ready","nthreads":4,"slots":2}\n')
+ proc.poll.return_value = None
+ proc.stderr = [] # empty iterable for _forward_stderr thread
+ mock_popen.return_value = proc
+
+ wrk = boss.RemoteWorker.__new__(boss.RemoteWorker)
+ wrk.hostname = 'host1'
+ wrk.name = 'host1'
+ wrk._proc = None
+ wrk._closed = False
+ wrk._closing = False
+ wrk._stderr_lines = []
+ wrk._stderr_thread = None
+ wrk._ready = queue.Queue()
+ wrk._log = None
+ wrk.bytes_sent = 0
+ wrk.bytes_recv = 0
+ wrk.nthreads = 0
+ wrk.slots = 0
+ wrk.max_boards = 0
+ wrk.toolchains = {}
+ wrk.closing = False
+ wrk._work_dir = '/tmp/bm'
+ wrk._git_dir = '/tmp/bm/.git'
+ wrk.work_dir = '/tmp/bm'
+ wrk.timeout = 10
+
+ wrk.start(debug=True)
+ cmd = mock_popen.call_args[0][0]
+ self.assertIn('-D', ' '.join(cmd))
+
+
+class TestBossLogTimer(unittest.TestCase):
+ """Test _BossLog.start_timer() (lines 607-612)"""
+
+ def test_timer_ticks(self):
+ """Test that the timer fires and logs status"""
+ with tempfile.TemporaryDirectory() as tmpdir:
+ blog = boss._BossLog(tmpdir)
+ wrk = mock.Mock(nthreads=4)
+ wrk.name = 'host1'
+ blog.init_worker(wrk)
+ blog.record_sent('host1', 5)
+
+ # Patch STATUS_INTERVAL to fire quickly
+ with mock.patch.object(boss, 'STATUS_INTERVAL', 0.01):
+ blog.start_timer()
+ time.sleep(0.05)
+ blog.close()
+
+ content = tools.read_file(
+ os.path.join(tmpdir, '.buildman.log'), binary=False)
+ # Timer should have logged at least one status line
+ self.assertIn('host1', content)
+
+
+class TestWaitForPrepareProgress(unittest.TestCase):
+ """Test wait_for_prepare with progress and heartbeat messages"""
+
+
+ def test_heartbeat_during_prepare(self):
+ """Test heartbeat messages are skipped during prepare"""
+ ctx, wrk, tmpdir = _make_ctx()
+ recv_q = queue.Queue()
+ recv_q.put(('resp', {'resp': 'heartbeat'}))
+ recv_q.put(('resp', {'resp': 'build_prepare_done'}))
+ result = ctx.wait_for_prepare(wrk, recv_q)
+ self.assertTrue(result)
+ ctx.close()
+ shutil.rmtree(tmpdir)
+
+ def test_progress_during_prepare(self):
+ """Test worktree progress during prepare"""
+ ctx, wrk, tmpdir = _make_ctx()
+ recv_q = queue.Queue()
+ recv_q.put(('resp', {'resp': 'build_started', 'num_threads': 2}))
+ recv_q.put(('resp', {'resp': 'worktree_created'}))
+ recv_q.put(('resp', {'resp': 'build_prepare_done'}))
+ result = ctx.wait_for_prepare(wrk, recv_q)
+ self.assertTrue(result)
+ ctx.close()
+ shutil.rmtree(tmpdir)
+
+ def test_unexpected_during_prepare(self):
+ """Test unexpected response during prepare returns False"""
+ ctx, wrk, tmpdir = _make_ctx()
+ recv_q = queue.Queue()
+ recv_q.put(('resp', {'resp': 'something_weird'}))
+ result = ctx.wait_for_prepare(wrk, recv_q)
+ self.assertFalse(result)
+ ctx.close()
+ shutil.rmtree(tmpdir)
+
+
+class TestRecvOne(unittest.TestCase):
+ """Test _DispatchContext.recv_one()"""
+
+
+ @mock.patch.object(boss, 'BUILD_TIMEOUT', 0.01)
+ def test_recv_one_timeout(self):
+ """Test recv_one returns False on timeout"""
+ ctx, wrk, tmpdir = _make_ctx()
+ recv_q = queue.Queue()
+ with terminal.capture():
+ result = ctx.recv_one(wrk, recv_q)
+ self.assertFalse(result)
+ ctx.close()
+ shutil.rmtree(tmpdir)
+
+ def test_recv_one_build_done(self):
+ """Test recv_one handles build_done with exceptions"""
+ ctx, wrk, tmpdir = _make_ctx(
+ {'rpi': mock.Mock(target='rpi', arch='arm')})
+ recv_q = queue.Queue()
+ recv_q.put(('resp', {'resp': 'build_done', 'exceptions': 2}))
+ result = ctx.recv_one(wrk, recv_q)
+ self.assertFalse(result)
+ ctx.close()
+ shutil.rmtree(tmpdir)
+
+ def test_recv_one_build_result(self):
+ """Test recv_one processes build_result"""
+ ctx, wrk, tmpdir = _make_ctx(
+ {'rpi': mock.Mock(target='rpi', arch='arm')})
+ build_dir = tempfile.mkdtemp()
+ ctx.builder.get_build_dir.return_value = build_dir
+ recv_q = queue.Queue()
+ recv_q.put(('resp', {'resp': 'build_result', 'board': 'rpi',
+ 'commit_upto': 0, 'return_code': 0,
+ 'stderr': '', 'stdout': ''}))
+ with terminal.capture():
+ result = ctx.recv_one(wrk, recv_q)
+ self.assertTrue(result)
+ ctx.close()
+ shutil.rmtree(tmpdir)
+ shutil.rmtree(build_dir)
+
+ def test_recv_one_heartbeat(self):
+ """Test recv_one skips heartbeat then gets result"""
+ ctx, wrk, tmpdir = _make_ctx(
+ {'rpi': mock.Mock(target='rpi', arch='arm')})
+ build_dir = tempfile.mkdtemp()
+ ctx.builder.get_build_dir.return_value = build_dir
+ recv_q = queue.Queue()
+ recv_q.put(('resp', {'resp': 'heartbeat'}))
+ recv_q.put(('resp', {'resp': 'build_result', 'board': 'rpi',
+ 'commit_upto': 0, 'return_code': 0,
+ 'stderr': '', 'stdout': ''}))
+ with terminal.capture():
+ result = ctx.recv_one(wrk, recv_q)
+ self.assertTrue(result)
+ ctx.close()
+ shutil.rmtree(tmpdir)
+ shutil.rmtree(build_dir)
+
+ def test_recv_one_other(self):
+ """Test recv_one returns True for unknown response"""
+ ctx, wrk, tmpdir = _make_ctx(
+ {'rpi': mock.Mock(target='rpi', arch='arm')})
+ recv_q = queue.Queue()
+ recv_q.put(('resp', {'resp': 'something_else'}))
+ result = ctx.recv_one(wrk, recv_q)
+ self.assertTrue(result)
+ ctx.close()
+ shutil.rmtree(tmpdir)
+
+ def test_recv_one_progress(self):
+ """Test recv_one handles progress then result"""
+ ctx, wrk, tmpdir = _make_ctx(
+ {'rpi': mock.Mock(target='rpi', arch='arm')})
+ build_dir = tempfile.mkdtemp()
+ ctx.builder.get_build_dir.return_value = build_dir
+ recv_q = queue.Queue()
+ recv_q.put(('resp', {'resp': 'build_started', 'num_threads': 2}))
+ recv_q.put(('resp', {'resp': 'build_result', 'board': 'rpi',
+ 'commit_upto': 0, 'return_code': 0,
+ 'stderr': '', 'stdout': ''}))
+ with terminal.capture():
+ result = ctx.recv_one(wrk, recv_q)
+ self.assertTrue(result)
+ ctx.close()
+ shutil.rmtree(tmpdir)
+ shutil.rmtree(build_dir)
+
+
+class TestCollectResultsExtended(unittest.TestCase):
+ """Test collect_results with more board grabbing"""
+
+
+ def test_collect_grabs_more(self):
+ """Test collect_results grabs more boards when in_flight drops"""
+ ctx, wrk, tmpdir = _make_ctx(
+ {'rpi': mock.Mock(target='rpi', arch='arm')})
+ wrk.max_boards = 2
+ build_dir = tempfile.mkdtemp()
+ ctx.builder.get_build_dir.return_value = build_dir
+ extra_brd = mock.Mock(target='extra', arch='arm')
+ grab_calls = [0]
+
+ def grab(_w, _n):
+ grab_calls[0] += 1
+ if grab_calls[0] == 1:
+ return [extra_brd]
+ return []
+
+ state = boss.DemandState(sent=1, ncommits=1, grab_func=grab)
+
+ recv_q = queue.Queue()
+ recv_q.put(('resp', {'resp': 'build_result', 'board': 'rpi',
+ 'commit_upto': 0, 'return_code': 0,
+ 'stderr': '', 'stdout': ''}))
+ # After rpi completes, in_flight drops to 0 < max_boards=2,
+ # grab returns extra_brd, then build_done ends collection
+ recv_q.put(('resp', {'resp': 'build_done'}))
+
+ with terminal.capture():
+ ctx.collect_results(wrk, recv_q, state)
+
+ self.assertEqual(state.received, 1)
+ self.assertGreater(grab_calls[0], 0)
+ ctx.close()
+ shutil.rmtree(tmpdir)
+ shutil.rmtree(ctx.builder.get_build_dir.return_value)
+
+ def test_collect_write_failure(self):
+ """Test collect_results stops on write_result failure"""
+ ctx, wrk, tmpdir = _make_ctx(
+ {'rpi': mock.Mock(target='rpi', arch='arm')})
+ ctx.builder.get_build_dir.side_effect = RuntimeError('boom')
+
+ state = boss.DemandState(sent=1, ncommits=1,
+ grab_func=lambda w, n: [])
+ recv_q = queue.Queue()
+ recv_q.put(('resp', {'resp': 'build_result', 'board': 'rpi',
+ 'commit_upto': 0, 'return_code': 0,
+ 'stderr': '', 'stdout': ''}))
+ with terminal.capture():
+ result = ctx.collect_results(wrk, recv_q, state)
+ self.assertFalse(result)
+ ctx.close()
+ shutil.rmtree(tmpdir)
+
+
+class TestRunParallelErrors(unittest.TestCase):
+ """Test WorkerPool._run_parallel error paths"""
+
+ def test_worker_busy_and_boss_error(self):
+ """Test _run_parallel handles WorkerBusy and BossError"""
+ pool = boss.WorkerPool([])
+ busy_wrk = mock.Mock(name='busy1')
+ busy_wrk.name = 'busy1'
+ fail_wrk = mock.Mock(name='fail1')
+ fail_wrk.name = 'fail1'
+
+ calls = []
+
+ def func(item):
+ calls.append(item.name)
+ if item.name == 'busy1':
+ raise boss.WorkerBusy('too busy')
+ raise boss.BossError('failed')
+
+ with terminal.capture():
+ pool._run_parallel('Testing', [busy_wrk, fail_wrk], func)
+ fail_wrk.remove_lock.assert_called_once()
+
+
+class TestCloseAll(unittest.TestCase):
+ """Test WorkerPool.close_all() (lines 1533-1544)"""
+
+ def test_close_all_sends_quit(self):
+ """Test close_all sends quit and closes workers"""
+ pool = boss.WorkerPool([])
+ wrk = mock.Mock()
+ wrk.closing = False
+ wrk.bytes_sent = 0
+ wrk.bytes_recv = 0
+ wrk.name = 'host1'
+ pool.workers = [wrk]
+ blog = mock.Mock()
+ pool._boss_log = blog
+
+ with terminal.capture():
+ pool.close_all()
+
+ wrk.close.assert_called()
+ wrk.remove_lock.assert_called()
+ self.assertEqual(len(pool.workers), 0)
+
+
+class TestCollectResultsTimeout(unittest.TestCase):
+ """Test collect_results recv timeout and non-result responses"""
+
+
+ @mock.patch.object(boss, 'BUILD_TIMEOUT', 0.01)
+ def test_collect_timeout(self):
+ """Test collect_results returns False on recv timeout (line 933)"""
+ ctx, wrk, tmpdir = _make_ctx()
+ state = boss.DemandState(sent=1, ncommits=1,
+ grab_func=lambda w, n: [])
+ recv_q = queue.Queue()
+ with terminal.capture():
+ result = ctx.collect_results(wrk, recv_q, state)
+ self.assertFalse(result)
+ ctx.close()
+ shutil.rmtree(tmpdir)
+
+ def test_collect_skips_unknown(self):
+ """Test collect_results skips non-build_result responses (line 940)"""
+ ctx, wrk, tmpdir = _make_ctx()
+ state = boss.DemandState(sent=1, ncommits=1,
+ grab_func=lambda w, n: [])
+ recv_q = queue.Queue()
+ recv_q.put(('resp', {'resp': 'configure_done'})) # unknown
+ recv_q.put(('resp', {'resp': 'build_done'}))
+ with terminal.capture():
+ result = ctx.collect_results(wrk, recv_q, state)
+ self.assertTrue(result)
+ ctx.close()
+ shutil.rmtree(tmpdir)
+
+
+class TestDispatchJobs(unittest.TestCase):
+ """Test WorkerPool._dispatch_jobs() (lines 1290-1309)"""
+
+ def test_dispatch_jobs(self):
+ """Test _dispatch_jobs runs batch workers and closes context"""
+ pool = boss.WorkerPool([])
+ wrk = mock.Mock(nthreads=4, closing=False, slots=2)
+ wrk.name = 'host1'
+
+ brd = mock.Mock(target='rpi', arch='arm')
+ commit = mock.Mock(hash='abc123')
+ wjobs = [(brd, 0, commit)]
+
+ builder = mock.Mock()
+ tmpdir = tempfile.mkdtemp()
+ builder.base_dir = tmpdir
+ board_selected = {'rpi': brd}
+ blog = mock.Mock()
+ pool._boss_log = blog
+
+ # Mock build_boards to avoid actual protocol
+ wrk.build_boards = mock.Mock()
+ # recv_one will be called once — return False to end
+ with mock.patch.object(boss._DispatchContext, 'start_reader',
+ return_value=queue.Queue()), \
+ mock.patch.object(boss._DispatchContext, 'recv_one',
+ return_value=False):
+ with terminal.capture():
+ pool._dispatch_jobs({wrk: wjobs}, builder,
+ board_selected)
+
+ self.assertIsNone(pool._boss_log)
+ shutil.rmtree(tmpdir)
+
+
+class TestRunBatchWorker(unittest.TestCase):
+ """Test WorkerPool._run_batch_worker() (lines 1320-1358)"""
+
+ def test_batch_worker_success(self):
+ """Test _run_batch_worker sends build_boards and collects"""
+ wrk = mock.Mock(nthreads=4, closing=False, slots=2)
+ wrk.name = 'host1'
+ brd = mock.Mock(target='rpi', arch='arm')
+ commit = mock.Mock(hash='abc123')
+
+ builder = mock.Mock()
+ tmpdir = tempfile.mkdtemp()
+ builder.base_dir = tmpdir
+
+ ctx = boss._DispatchContext(
+ workers=[wrk], builder=builder,
+ board_selected={'rpi': brd}, boss_log=mock.Mock())
+
+ recv_q = queue.Queue()
+ recv_q.put(('resp', {'resp': 'build_result', 'board': 'rpi',
+ 'commit_upto': 0, 'return_code': 0,
+ 'stderr': '', 'stdout': ''}))
+ build_dir = tempfile.mkdtemp()
+ builder.get_build_dir.return_value = build_dir
+
+ with mock.patch.object(ctx, 'start_reader',
+ return_value=recv_q):
+ with terminal.capture():
+ boss.WorkerPool._run_batch_worker(
+ wrk, [(brd, 0, commit)], ctx)
+
+ wrk.build_boards.assert_called_once()
+ ctx.close()
+ shutil.rmtree(tmpdir)
+ shutil.rmtree(build_dir)
+
+ def test_batch_worker_build_error(self):
+ """Test _run_batch_worker handles build_boards BossError"""
+ wrk = mock.Mock(nthreads=4, closing=False, slots=2)
+ wrk.name = 'host1'
+ wrk.build_boards.side_effect = boss.BossError('gone')
+ brd = mock.Mock(target='rpi', arch='arm')
+ commit = mock.Mock(hash='abc123')
+
+ builder = mock.Mock()
+ tmpdir = tempfile.mkdtemp()
+ builder.base_dir = tmpdir
+
+ ctx = boss._DispatchContext(
+ workers=[wrk], builder=builder,
+ board_selected={}, boss_log=mock.Mock())
+
+ with mock.patch.object(ctx, 'start_reader',
+ return_value=queue.Queue()):
+ with terminal.capture():
+ boss.WorkerPool._run_batch_worker(
+ wrk, [(brd, 0, commit)], ctx)
+
+ ctx.close()
+ shutil.rmtree(tmpdir)
+
+
+class TestStartDemandWorker(unittest.TestCase):
+ """Test WorkerPool._start_demand_worker() (lines 1380-1400)"""
+
+ def _make_pool_and_ctx(self):
+ pool = boss.WorkerPool([])
+ wrk = mock.Mock(nthreads=4, closing=False, max_boards=2,
+ slots=2, toolchains={'arm': '/gcc'})
+ wrk.name = 'host1'
+ builder = mock.Mock()
+ tmpdir = tempfile.mkdtemp()
+ builder.base_dir = tmpdir
+ ctx = boss._DispatchContext(
+ workers=[wrk], builder=builder,
+ board_selected={}, boss_log=mock.Mock())
+ return pool, wrk, ctx, tmpdir
+
+ def test_prepare_error(self):
+ """Test _start_demand_worker handles build_prepare BossError"""
+ pool, wrk, ctx, tmpdir = self._make_pool_and_ctx()
+ wrk.build_prepare.side_effect = boss.BossError('gone')
+
+ with mock.patch.object(ctx, 'start_reader',
+ return_value=queue.Queue()):
+ with terminal.capture():
+ recv_q, state = pool._start_demand_worker(
+ wrk, ctx, ['abc'], 1, [], threading.Lock())
+ self.assertIsNone(recv_q)
+ self.assertIsNone(state)
+ ctx.close()
+ shutil.rmtree(tmpdir)
+
+ def test_prepare_timeout(self):
+ """Test _start_demand_worker when wait_for_prepare fails"""
+ pool, wrk, ctx, tmpdir = self._make_pool_and_ctx()
+
+ with mock.patch.object(ctx, 'start_reader',
+ return_value=queue.Queue()), \
+ mock.patch.object(ctx, 'wait_for_prepare',
+ return_value=False):
+ with terminal.capture():
+ recv_q, state = pool._start_demand_worker(
+ wrk, ctx, ['abc'], 1, [], threading.Lock())
+ self.assertIsNone(recv_q)
+ ctx.close()
+ shutil.rmtree(tmpdir)
+
+ def test_no_boards(self):
+ """Test _start_demand_worker when no boards available
+
+ Also covers the BossError path in build_done (lines 1395-1396).
+ """
+ pool, wrk, ctx, tmpdir = self._make_pool_and_ctx()
+ wrk.build_done.side_effect = boss.BossError('gone')
+
+ with mock.patch.object(ctx, 'start_reader',
+ return_value=queue.Queue()), \
+ mock.patch.object(ctx, 'wait_for_prepare',
+ return_value=True):
+ with terminal.capture():
+ recv_q, state = pool._start_demand_worker(
+ wrk, ctx, ['abc'], 1, [], threading.Lock())
+ self.assertIsNone(recv_q)
+ ctx.close()
+ shutil.rmtree(tmpdir)
+
+ def test_send_batch_failure(self):
+ """Test _start_demand_worker when send_batch fails"""
+ pool, wrk, ctx, tmpdir = self._make_pool_and_ctx()
+ brd = mock.Mock(target='rpi', arch='arm')
+ pool_list = [brd]
+
+ wrk.build_board.side_effect = boss.BossError('gone')
+
+ with mock.patch.object(ctx, 'start_reader',
+ return_value=queue.Queue()), \
+ mock.patch.object(ctx, 'wait_for_prepare',
+ return_value=True):
+ with terminal.capture():
+ recv_q, state = pool._start_demand_worker(
+ wrk, ctx, ['abc'], 1, pool_list,
+ threading.Lock())
+ self.assertIsNone(recv_q)
+ ctx.close()
+ shutil.rmtree(tmpdir)
+
+ def test_success(self):
+ """Test _start_demand_worker success path"""
+ pool, wrk, ctx, tmpdir = self._make_pool_and_ctx()
+ brd = mock.Mock(target='rpi', arch='arm')
+ pool_list = [brd]
+
+ with mock.patch.object(ctx, 'start_reader',
+ return_value=queue.Queue()), \
+ mock.patch.object(ctx, 'wait_for_prepare',
+ return_value=True):
+ with terminal.capture():
+ recv_q, state = pool._start_demand_worker(
+ wrk, ctx, ['abc'], 1, pool_list,
+ threading.Lock())
+ self.assertIsNotNone(recv_q)
+ self.assertIsNotNone(state)
+ self.assertEqual(state.sent, 1)
+ ctx.close()
+ shutil.rmtree(tmpdir)
+
+
+class TestFinishDemandWorker(unittest.TestCase):
+ """Test WorkerPool._finish_demand_worker() (lines 1429-1437)"""
+
+ def test_build_done_error(self):
+ """Test _finish_demand_worker when build_done raises"""
+ wrk = mock.Mock(nthreads=4, closing=False, max_boards=0,
+ slots=2)
+ wrk.name = 'host1'
+ wrk.build_done.side_effect = boss.BossError('gone')
+ builder = mock.Mock()
+ tmpdir = tempfile.mkdtemp()
+ builder.base_dir = tmpdir
+
+ ctx = boss._DispatchContext(
+ workers=[wrk], builder=builder,
+ board_selected={}, boss_log=mock.Mock())
+
+ state = boss.DemandState(sent=0, ncommits=1,
+ grab_func=lambda w, n: [])
+ recv_q = queue.Queue()
+
+ with mock.patch.object(ctx, 'collect_results'):
+ boss.WorkerPool._finish_demand_worker(
+ wrk, ctx, recv_q, state)
+ ctx.close()
+ shutil.rmtree(tmpdir)
+
+ def test_finish_waits_for_done(self):
+ """Test _finish_demand_worker waits for build_done response"""
+ wrk = mock.Mock(nthreads=4, closing=False, max_boards=0,
+ slots=2)
+ wrk.name = 'host1'
+ builder = mock.Mock()
+ tmpdir = tempfile.mkdtemp()
+ builder.base_dir = tmpdir
+
+ ctx = boss._DispatchContext(
+ workers=[wrk], builder=builder,
+ board_selected={}, boss_log=mock.Mock())
+
+ state = boss.DemandState(sent=0, ncommits=1,
+ grab_func=lambda w, n: [])
+ recv_q = queue.Queue()
+ recv_q.put(('resp', {'resp': 'heartbeat'}))
+ recv_q.put(('resp', {'resp': 'build_done'}))
+
+ with mock.patch.object(ctx, 'collect_results'):
+ boss.WorkerPool._finish_demand_worker(
+ wrk, ctx, recv_q, state)
+ ctx.close()
+ shutil.rmtree(tmpdir)
+
+ @mock.patch.object(boss, 'BUILD_TIMEOUT', 0.01)
+ def test_finish_recv_timeout(self):
+ """Test _finish_demand_worker handles recv timeout"""
+ wrk = mock.Mock(nthreads=4, closing=False, max_boards=0,
+ slots=2)
+ wrk.name = 'host1'
+ builder = mock.Mock()
+ tmpdir = tempfile.mkdtemp()
+ builder.base_dir = tmpdir
+
+ ctx = boss._DispatchContext(
+ workers=[wrk], builder=builder,
+ board_selected={}, boss_log=mock.Mock())
+
+ state = boss.DemandState(sent=0, ncommits=1,
+ grab_func=lambda w, n: [])
+ recv_q = queue.Queue()
+
+ with mock.patch.object(ctx, 'collect_results'), \
+ terminal.capture():
+ boss.WorkerPool._finish_demand_worker(
+ wrk, ctx, recv_q, state)
+ ctx.close()
+ shutil.rmtree(tmpdir)
+
+
+class TestCloseAllSignal(unittest.TestCase):
+ """Test close_all quit/close path (lines 1543-1544)"""
+
+ def test_close_all_quit_error(self):
+ """Test close_all handles _send BossError during quit"""
+ pool = boss.WorkerPool([])
+ wrk = mock.Mock()
+ wrk.closing = False
+ wrk.bytes_sent = 0
+ wrk.bytes_recv = 0
+ wrk.name = 'host1'
+ wrk._send.side_effect = boss.BossError('gone')
+ pool.workers = [wrk]
+ pool._boss_log = mock.Mock()
+
+ with terminal.capture():
+ pool.close_all()
+ wrk.close.assert_called()
+ self.assertEqual(len(pool.workers), 0)
+
+
+class TestZeroCapacity(unittest.TestCase):
+ """Test _get_worker_for_arch with zero total capacity (line 1183)"""
+
+ def test_zero_nthreads(self):
+ """Test workers with 0 nthreads don't cause division by zero"""
+ w1 = mock.Mock(nthreads=0, bogomips=0,
+ toolchains={'arm': '/gcc'})
+ pool = boss.WorkerPool.__new__(boss.WorkerPool)
+ pool.workers = [w1]
+ # Should not raise ZeroDivisionError
+ wrk = pool._get_worker_for_arch('arm', {})
+ self.assertEqual(wrk, w1)
+
+
+if __name__ == '__main__':
+    # Run this test module directly with the default unittest runner
+    unittest.main()
@@ -264,7 +264,8 @@ class TestMachinePool(unittest.TestCase):
'host2\n'
)
pool = machine.MachinePool()
- available = pool.probe_all()
+ with terminal.capture():
+ available = pool.probe_all()
self.assertEqual(len(available), 2)
self.assertEqual(pool.get_total_weight(), 14)
@@ -285,7 +286,8 @@ class TestMachinePool(unittest.TestCase):
'host2\n'
)
pool = machine.MachinePool()
- available = pool.probe_all()
+ with terminal.capture():
+ available = pool.probe_all()
self.assertEqual(len(available), 1)
self.assertEqual(available[0].hostname, 'host1')
@@ -311,8 +313,10 @@ sandbox : /usr/bin/gcc
'host1\n'
)
pool = machine.MachinePool()
- pool.probe_all()
- missing = pool.check_toolchains({'arm', 'sandbox'})
+ with terminal.capture():
+ pool.probe_all()
+ with terminal.capture():
+ missing = pool.check_toolchains({'arm', 'sandbox'})
self.assertEqual(missing, {})
@mock.patch('buildman.machine._run_ssh')
@@ -336,8 +340,10 @@ sandbox : /usr/bin/gcc
'host1\n'
)
pool = machine.MachinePool()
- pool.probe_all()
- missing = pool.check_toolchains({'arm', 'sandbox'})
+ with terminal.capture():
+ pool.probe_all()
+ with terminal.capture():
+ missing = pool.check_toolchains({'arm', 'sandbox'})
self.assertEqual(len(missing), 1)
m = list(missing.keys())[0]
self.assertIn('arm', missing[m])
@@ -683,7 +689,8 @@ class TestMachinePoolExtended(unittest.TestCase):
'mem_avail_mb': 8000, 'disk_avail_mb': 20000})
bsettings.add_file('[machines]\nhost1\n')
pool = machine.MachinePool()
- pool.probe_all()
+ with terminal.capture():
+ pool.probe_all()
# Just verify it doesn't crash
with terminal.capture():
pool.print_summary()
@@ -703,8 +710,10 @@ class TestMachinePoolExtended(unittest.TestCase):
mock_ssh.side_effect = ssh_side_effect
bsettings.add_file('[machines]\nhost1\n')
pool = machine.MachinePool()
- pool.probe_all()
- pool.check_toolchains({'arm', 'sandbox'})
+ with terminal.capture():
+ pool.probe_all()
+ with terminal.capture():
+ pool.check_toolchains({'arm', 'sandbox'})
with terminal.capture():
pool.print_summary(local_archs={'arm', 'sandbox'})
@@ -728,7 +737,8 @@ class TestMachinePoolExtended(unittest.TestCase):
mock_ssh.side_effect = ssh_side_effect
bsettings.add_file('[machines]\nhost1\n')
pool = machine.MachinePool()
- pool.probe_all()
+ with terminal.capture():
+ pool.probe_all()
local_gcc = {
'arm': f'{home}/.buildman-toolchains/gcc-13.1.0-nolibc/'
@@ -751,7 +761,8 @@ class TestMachinePoolExtended(unittest.TestCase):
with mock.patch.object(machine.Machine,
'_probe_toolchains_from_boss',
fake_probe):
- missing = pool.check_toolchains({'arm'}, local_gcc=local_gcc)
+ with terminal.capture():
+ missing = pool.check_toolchains({'arm'}, local_gcc=local_gcc)
# arm should be flagged as missing due to version mismatch
self.assertEqual(len(missing), 1)
@@ -839,7 +850,8 @@ class TestPrintSummaryEdgeCases(unittest.TestCase):
mock_ssh.side_effect = machine.MachineError('refused')
bsettings.add_file('[machines]\nhost1\n')
pool = machine.MachinePool()
- pool.probe_all()
+ with terminal.capture():
+ pool.probe_all()
# Should not crash with unavailable machine
with terminal.capture():
pool.print_summary()
@@ -851,7 +863,8 @@ class TestPrintSummaryEdgeCases(unittest.TestCase):
**MACHINE_INFO, 'load_1m': 10.0})
bsettings.add_file('[machines]\nhost1\n')
pool = machine.MachinePool()
- pool.probe_all()
+ with terminal.capture():
+ pool.probe_all()
with terminal.capture():
pool.print_summary()
@@ -865,7 +878,8 @@ class TestPrintSummaryEdgeCases(unittest.TestCase):
'[machines]\nhost1\n'
'[machine:host1]\nmax_boards = 50\n')
pool = machine.MachinePool()
- pool.probe_all()
+ with terminal.capture():
+ pool.probe_all()
with terminal.capture():
pool.print_summary()
@@ -877,7 +891,8 @@ class TestPrintSummaryEdgeCases(unittest.TestCase):
'mem_avail_mb': 8000, 'disk_avail_mb': 20000})
bsettings.add_file('[machines]\nhost1\n')
pool = machine.MachinePool()
- pool.probe_all()
+ with terminal.capture():
+ pool.probe_all()
pool.machines[0].tc_error = 'buildman not found'
with terminal.capture():
pool.print_summary(local_archs={'arm'})
@@ -890,7 +905,8 @@ class TestPrintSummaryEdgeCases(unittest.TestCase):
'mem_avail_mb': 8000, 'disk_avail_mb': 20000})
bsettings.add_file('[machines]\nhost1\n')
pool = machine.MachinePool()
- pool.probe_all()
+ with terminal.capture():
+ pool.probe_all()
pool.machines[0].toolchains = {'sandbox': '/usr/bin/gcc'}
local_gcc = {
'arm': os.path.expanduser(
@@ -912,7 +928,8 @@ class TestCheckToolchainsEdge(unittest.TestCase):
bsettings.add_file('[machines]\nhost1\n')
pool = machine.MachinePool()
# Machine is not probed, so not reachable
- result = pool.check_toolchains({'arm'})
+ with terminal.capture():
+ result = pool.check_toolchains({'arm'})
self.assertEqual(result, {})
@mock.patch('buildman.machine._run_ssh')
@@ -931,15 +948,17 @@ class TestCheckToolchainsEdge(unittest.TestCase):
mock_ssh.side_effect = ssh_side_effect
bsettings.add_file('[machines]\nhost1\n')
pool = machine.MachinePool()
- pool.probe_all()
+ with terminal.capture():
+ pool.probe_all()
local_gcc = {
'arm': f'{home}/.buildman-toolchains/gcc-13/arm/bin/gcc',
}
# fetch=True should trigger _fetch_all_missing
with mock.patch.object(pool, '_fetch_all_missing') as mock_fetch:
- pool.check_toolchains({'arm'}, fetch=True,
- local_gcc=local_gcc)
+ with terminal.capture():
+ pool.check_toolchains({'arm'}, fetch=True,
+ local_gcc=local_gcc)
mock_fetch.assert_called_once()
@@ -1011,7 +1030,8 @@ class TestPrintSummaryMissingNoVersion(unittest.TestCase):
'mem_avail_mb': 8000, 'disk_avail_mb': 20000})
bsettings.add_file('[machines]\nhost1\n')
pool = machine.MachinePool()
- pool.probe_all()
+ with terminal.capture():
+ pool.probe_all()
pool.machines[0].toolchains = {}
# arm has a version (under ~/.buildman-toolchains),
# sandbox does not