[Concept,18/20] buildman: Add boss module for driving remote workers

Message ID 20260316154733.1587261-19-sjg@u-boot.org
State New
Headers
Series buildman: Add distributed builds |

Commit Message

Simon Glass March 16, 2026, 3:47 p.m. UTC
  From: Simon Glass <sjg@chromium.org>

Add the boss side of the distributed build protocol. The RemoteWorker
class manages a persistent SSH connection to a remote buildman worker,
providing methods to set up a work directory, push source via git, send
build commands and receive results.

WorkerPool manages parallel worker startup, source push, build settings
distribution, demand-driven board dispatch and result collection. Each
worker gets boards from a shared pool as it finishes previous ones, so
faster workers naturally get more work.

Features include capacity-weighted board distribution (nthreads *
bogomips), per-worker log files, build timeouts via reader threads,
transfer statistics, and clean shutdown on Ctrl-C via two-phase quit.

Tests are included in test_boss.py covering protocol handling, board
dispatch, worker lifecycle and error paths.

Signed-off-by: Simon Glass <sjg@chromium.org>
---

 tools/buildman/boss.py         | 1507 ++++++++++++++++++
 tools/buildman/main.py         |    2 +
 tools/buildman/test_boss.py    | 2645 ++++++++++++++++++++++++++++++++
 tools/buildman/test_machine.py |   62 +-
 4 files changed, 4195 insertions(+), 21 deletions(-)
 create mode 100644 tools/buildman/boss.py
 create mode 100644 tools/buildman/test_boss.py

-- 
2.43.0
  

Patch

diff --git a/tools/buildman/boss.py b/tools/buildman/boss.py
new file mode 100644
index 00000000000..b1d15fec28f
--- /dev/null
+++ b/tools/buildman/boss.py
@@ -0,0 +1,1507 @@ 
+# SPDX-License-Identifier: GPL-2.0+
+# Copyright 2026 Simon Glass <sjg@chromium.org>
+# pylint: disable=C0302
+
+"""Boss side of the distributed build protocol
+
+Manages SSH connections to remote workers and communicates using the JSON-lines
+protocol defined in worker.py. Each RemoteWorker wraps a persistent SSH process
+whose stdin/stdout carry the protocol messages.
+
+Typical usage:
+    w = RemoteWorker('myhost', buildman_path='buildman')
+    w.start()                      # launches ssh, waits for 'ready'
+    w.setup()                      # worker creates git repo
+    w.push_source(git_dir, ref)    # git push to the worker's repo
+    w.build('sandbox', commit='abc123', commit_upto=0)
+    result = w.recv()              # {'resp': 'build_result', ...}
+    w.quit()
+"""
+
+import datetime
+import json
+import os
+import queue
+import subprocess
+import sys
+import threading
+import time
+
+from buildman import builderthread
+from buildman import worker as worker_mod
+from u_boot_pylib import command
+from u_boot_pylib import tools
+from u_boot_pylib import tout
+
+# SSH options shared with machine.py.  BatchMode prevents interactive
+# password prompts; accept-new trusts a host key on first connection but
+# still rejects a changed key
+SSH_OPTS = [
+    '-o', 'BatchMode=yes',
+    '-o', 'StrictHostKeyChecking=accept-new',
+]
+
+# Per-build timeout in seconds. If a worker doesn't respond within this
+# time, the boss assumes the worker is dead or hung and stops using it.
+# Used as the queue.get() timeout in _DispatchContext.recv()
+BUILD_TIMEOUT = 300
+
+# Interval in seconds between status summaries in the boss log
+STATUS_INTERVAL = 60
+
+
+class BossError(Exception):
+    """Error communicating with a remote worker
+
+    Base class for boss-side failures (SSH errors, protocol errors,
+    bad JSON from a worker); WorkerBusy is the more specific subclass
+    """
+
+
+class WorkerBusy(BossError):
+    """Worker machine is already in use by another boss
+
+    Raised by RemoteWorker.init_git() when the remote lock file exists
+    and a worker process is still running on that machine
+    """
+
+
+def _run_ssh(hostname, remote_cmd, timeout=10):
+    """Run a one-shot SSH command on a remote host
+
+    Args:
+        hostname (str): SSH hostname
+        remote_cmd (str): Shell command to run on the remote host
+        timeout (int): SSH connect timeout in seconds
+
+    Returns:
+        str: stdout from the command
+
+    Raises:
+        BossError: if the command fails
+    """
+    ssh_cmd = [
+        'ssh',
+        '-o', f'ConnectTimeout={timeout}',
+    ] + SSH_OPTS + [hostname, '--', remote_cmd]
+    try:
+        result = command.run_pipe(
+            [ssh_cmd], capture=True, capture_stderr=True,
+            raise_on_error=True)
+        return result.stdout.strip() if result.stdout else ''
+    except command.CommandExc as exc:
+        raise BossError(f'SSH command failed on {hostname}: {exc}') from exc
+
+
+def kill_workers(machines):
+    """Kill stale worker processes and remove lock files on remote machines
+
+    Connects to each machine via SSH, kills any running worker processes
+    and removes the lock file. Useful for cleaning up after a failed or
+    interrupted distributed build.
+
+    Args:
+        machines (list of str): SSH hostnames to clean up
+
+    Returns:
+        int: 0 on success
+    """
+
+    results = {}
+    lock = threading.Lock()
+
+    def _kill_one(hostname):
+        kill_script = ('pids=$(pgrep -f "[p]ython3.*--worker" 2>/dev/null); '
+            'if [ -n "$pids" ]; then '
+            '  kill $pids 2>/dev/null; '
+            '  echo "killed $pids"; '
+            'else '
+            '  echo "no workers"; '
+            'fi; '
+            'rm -f ~/dev/.bm-worker/.lock'
+        )
+        try:
+            output = _run_ssh(hostname, kill_script)
+            with lock:
+                results[hostname] = output
+        except BossError as exc:
+            with lock:
+                results[hostname] = f'FAILED: {exc}'
+
+    threads = []
+    for hostname in machines:
+        thr = threading.Thread(target=_kill_one, args=(hostname,))
+        thr.start()
+        threads.append(thr)
+    for thr in threads:
+        thr.join()
+
+    for hostname, output in sorted(results.items()):
+        print(f'  {hostname}: {output}')
+    return 0
+
+
+class RemoteWorker:  # pylint: disable=R0902
+    """Manages one SSH connection to a remote buildman worker
+
+    The startup sequence is:
+        1. init_git() - create a bare git repo on the remote via one-shot SSH
+        2. push_source() - git push the local tree to the remote repo
+        3. start() - launch the worker from the pushed tree
+
+    This ensures the worker runs the same version of buildman as the boss.
+
+    Attributes:
+        hostname (str): SSH hostname (user@host or just host)
+        nthreads (int): Number of build threads the worker reported
+        git_dir (str): Path to the worker's git directory
+        work_dir (str): Path to the worker's work directory
+    """
+
+    def __init__(self, hostname, timeout=10, name=None):
+        """Create a new remote worker connection
+
+        Args:
+            hostname (str): SSH hostname
+            timeout (int): SSH connect timeout in seconds
+            name (str or None): Short display name, defaults to hostname
+        """
+        self.hostname = hostname
+        # Short name used in log files and stderr prefixes
+        self.name = name or hostname
+        self.timeout = timeout
+        # Filled in from the worker's 'ready' response in start()
+        self.nthreads = 0
+        self.slots = 1
+        # Board limit passed to build_prepare(); defaults to nthreads
+        # after start() if not set by the caller
+        self.max_boards = 0
+        self.bogomips = 0.0
+        # Remote paths, set by init_git()
+        self.git_dir = ''
+        self.work_dir = ''
+        self.toolchains = {}
+        # True once shutdown has begun; checked by dispatch code to
+        # suppress error reporting during teardown
+        self.closing = False
+        # Protocol transfer statistics in bytes
+        self.bytes_sent = 0
+        self.bytes_recv = 0
+        # subprocess.Popen for the persistent SSH connection, or None
+        self._proc = None
+        # Lines captured from worker stderr by _forward_stderr()
+        self._stderr_lines = []
+        # Thread running _forward_stderr(); created in start()
+        self._stderr_thread = None
+
+    def init_git(self, work_dir='~/dev/.bm-worker'):
+        """Ensure a git repo exists on the remote host via one-shot SSH
+
+        Reuses an existing repo if present, so that subsequent pushes
+        only transfer the delta. Creates a lock file to prevent two
+        bosses from using the same worker simultaneously. A lock is
+        considered stale if no worker process is running.
+
+        Args:
+            work_dir (str): Fixed path for the work directory
+
+        Raises:
+            WorkerBusy: if another boss holds the lock
+            BossError: if the SSH command fails
+        """
+        lock = f'{work_dir}/.lock'
+        init_script = (f'mkdir -p {work_dir} && '
+            # Check for lock — stale if no worker process is running
+            f'if [ -f {lock} ]; then '
+            f'  if pgrep -f "[p]ython3.*--worker" >/dev/null 2>&1; then '
+            f'    echo BUSY; exit 0; '
+            f'  fi; '
+            f'  rm -f {lock}; '
+            f'fi && '
+            # Create a timestamped lock and init git; updateInstead lets
+            # 'git push' update the checked-out branch of a non-bare repo
+            f'date +%s > {lock} && '
+            f'(test -d {work_dir}/.git || git init -q {work_dir}) && '
+            f'git -C {work_dir} config '
+            f'receive.denyCurrentBranch updateInstead && '
+            f'echo {work_dir}'
+        )
+        output = _run_ssh(self.hostname, init_script, self.timeout)
+        if not output:
+            raise BossError(
+                f'init_git on {self.hostname} returned no work directory')
+        # Only the last line carries the result: BUSY or the expanded
+        # work-directory path echoed by the script
+        last_line = output.splitlines()[-1].strip()
+        if last_line == 'BUSY':
+            raise WorkerBusy(f'{self.hostname} is busy (locked)')
+        self.work_dir = last_line
+        self.git_dir = os.path.join(self.work_dir, '.git')
+
+    def start(self, debug=False):
+        """Start the worker from the pushed source tree
+
+        Launches the worker using the buildman from the pushed git tree.
+        The source must already have been pushed via init_git() and
+        push_source().
+
+        A background thread forwards the worker's stderr to the boss's
+        stderr, prefixed with the machine name, so that debug messages
+        and errors are always visible.
+
+        Args:
+            debug (bool): True to pass -D to the worker for tracebacks
+
+        Raises:
+            BossError: if the SSH connection or worker startup fails
+        """
+        if not self.work_dir:
+            raise BossError(f'No work_dir on {self.hostname} '
+                f'(call init_git and push_source first)')
+        worker_cmd = 'python3 tools/buildman/main.py --worker'
+        if debug:
+            worker_cmd += ' -D'
+        ssh_cmd = [
+            'ssh',
+            '-o', f'ConnectTimeout={self.timeout}',
+        ] + SSH_OPTS + [
+            self.hostname, '--',
+            f'cd {self.work_dir} && git checkout -qf work && '
+            f'{worker_cmd}',
+        ]
+        try:
+            # pylint: disable=R1732
+            self._proc = subprocess.Popen(
+                ssh_cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
+                stderr=subprocess.PIPE)
+        except OSError as exc:
+            raise BossError(
+                f'Failed to start SSH to {self.hostname}: {exc}') from exc
+
+        # Forward worker stderr in a background thread so debug messages
+        # and errors are always visible
+        self._stderr_lines = []
+        self._stderr_thread = threading.Thread(
+            target=self._forward_stderr, daemon=True)
+        self._stderr_thread.start()
+
+        resp = self._recv()
+        if resp.get('resp') != 'ready':
+            self.close()
+            raise BossError(
+                f'Worker on {self.hostname} did not send ready: {resp}')
+        self.nthreads = resp.get('nthreads', 1)
+        self.slots = resp.get('slots', 1)
+        if not self.max_boards:
+            self.max_boards = self.nthreads
+
+    def _forward_stderr(self):
+        """Forward worker stderr to boss stderr with machine name prefix
+
+        Runs in a background thread. Saves lines for _get_stderr() too.
+        """
+        try:
+            # Iterating the pipe blocks until EOF, i.e. until the SSH
+            # process exits or the stream is closed
+            for raw in self._proc.stderr:
+                line = raw.decode('utf-8', errors='replace').rstrip('\n')
+                if line:
+                    self._stderr_lines.append(line)
+                    sys.stderr.write(f'[{self.name}] {line}\n')
+                    sys.stderr.flush()
+        except (OSError, ValueError):
+            # The pipe can be closed under us during shutdown; just stop
+            pass
+
+    def _send(self, obj):
+        """Send a JSON command to the worker
+
+        Args:
+            obj (dict): Command object to send
+
+        Raises:
+            BossError: if the SSH process is not running
+        """
+        if not self._proc or self._proc.poll() is not None:
+            raise BossError(f'Worker on {self.hostname} is not running')
+        line = json.dumps(obj) + '\n'
+        data = line.encode('utf-8')
+        self.bytes_sent += len(data)
+        self._proc.stdin.write(data)
+        self._proc.stdin.flush()
+
+    def _recv(self):
+        """Read the next protocol response from the worker
+
+        Reads lines from stdout, skipping any that don't start with the
+        'BM> ' prefix (e.g. SSH banners).
+
+        Returns:
+            dict: Parsed JSON response
+
+        Raises:
+            BossError: if the worker closes the connection or sends bad data
+        """
+        while True:
+            raw = self._proc.stdout.readline()
+            if not raw:
+                # EOF: SSH exited or the worker closed stdout.  Include
+                # the worker's last stderr line as a hint to the cause
+                stderr = self._get_stderr()
+                raise BossError(f'Worker on {self.hostname} closed connection'
+                    f'{": " + stderr if stderr else ""}')
+            self.bytes_recv += len(raw)
+            line = raw.decode('utf-8', errors='replace').rstrip('\n')
+            # Lines without the protocol prefix are silently skipped
+            if line.startswith(worker_mod.RESPONSE_PREFIX):
+                payload = line[len(worker_mod.RESPONSE_PREFIX):]
+                try:
+                    return json.loads(payload)
+                except json.JSONDecodeError as exc:
+                    raise BossError(
+                        f'Bad JSON from {self.hostname}: {exc}') from exc
+
+    def _get_stderr(self):
+        """Get the last stderr line from the worker
+
+        Waits briefly for the stderr forwarding thread to finish
+        collecting output, then returns the last non-empty line.
+
+        Returns:
+            str: Last non-empty line of stderr, or empty string
+        """
+        if hasattr(self, '_stderr_thread'):
+            self._stderr_thread.join(timeout=2)
+        for line in reversed(self._stderr_lines):
+            if line.strip():
+                return line.strip()
+        return ''
+
+    def push_source(self, local_git_dir, refspec):
+        """Push source code to the worker's git repo
+
+        Uses 'git push' over SSH to send commits to the worker.
+
+        Args:
+            local_git_dir (str): Path to local git directory
+            refspec (str): Git refspec to push (e.g. 'HEAD:refs/heads/work')
+
+        Raises:
+            BossError: if the push fails
+        """
+        if not self.git_dir:
+            raise BossError(
+                f'No git_dir on {self.hostname} (call init_git first)')
+        push_url = f'{self.hostname}:{self.git_dir}'
+        try:
+            command.run_pipe([['git', 'push', '--force', push_url, refspec]],
+                capture=True, capture_stderr=True,
+                raise_on_error=True, cwd=local_git_dir)
+        except command.CommandExc as exc:
+            raise BossError(
+                f'git push to {self.hostname} failed: {exc}') from exc
+
+    def configure(self, settings):
+        """Send build settings to the worker
+
+        Sends settings that affect how make is invoked (verbose, no_lto,
+        allow_missing, etc.). Must be called after start() and before
+        any build commands.
+
+        Args:
+            settings (dict): Build settings, e.g.:
+                verbose_build (bool): Run make with V=1
+                allow_missing (bool): Pass BINMAN_ALLOW_MISSING=1
+                no_lto (bool): Pass NO_LTO=1
+                reproducible_builds (bool): Pass SOURCE_DATE_EPOCH=0
+                warnings_as_errors (bool): Pass KCFLAGS=-Werror
+                mrproper (bool): Run make mrproper before config
+                fallback_mrproper (bool): Retry with mrproper on failure
+
+        Raises:
+            BossError: if the worker rejects the settings
+        """
+        self._send({'cmd': 'configure', 'settings': settings})
+        resp = self._recv()
+        if resp.get('resp') != 'configure_done':
+            raise BossError(
+                f'Worker on {self.hostname} rejected configure: {resp}')
+
+    def build_boards(self, boards, commits):
+        """Send a build_boards command to the worker
+
+        Tells the worker to build all boards for each commit. The
+        worker handles checkout scheduling, parallelism and -j
+        calculation internally.
+
+        Args:
+            boards (list of dict): Board info dicts with keys:
+                board (str): Board target name
+                defconfig (str): Defconfig target
+                env (dict): Extra environment variables
+            commits (list): Commit hashes in order, or [None] for
+                current source
+        """
+        self._send({
+            'cmd': 'build_boards',
+            'boards': boards,
+            'commits': commits,
+        })
+
+    def build_prepare(self, commits):
+        """Send a build_prepare command to the worker
+
+        Creates the Builder and worktrees. Follow with build_board()
+        calls, then build_done().
+
+        Args:
+            commits (list): Commit hashes in order, or [None] for
+                current source
+        """
+        self._send({'cmd': 'build_prepare', 'commits': commits,
+                    'max_boards': self.max_boards})
+
+    def build_board(self, board, arch):
+        """Send a build_board command to add one board to the worker
+
+        Args:
+            board (str): Board target name
+            arch (str): Board architecture
+        """
+        self._send({
+            'cmd': 'build_board',
+            'board': board,
+            'arch': arch,
+        })
+
+    def build_done(self):
+        """Tell the worker no more boards are coming
+
+        Sent after the last build_board() in a build_prepare() sequence
+        """
+        self._send({'cmd': 'build_done'})
+
+    def recv(self):
+        """Receive the next response from the worker
+
+        Public wrapper around _recv() for callers outside the class,
+        e.g. the dispatch reader threads.
+
+        Returns:
+            dict: Parsed JSON response
+
+        Raises:
+            BossError: if the worker closes the connection or sends bad data
+        """
+        return self._recv()
+
+    def quit(self):
+        """Tell the worker to quit, remove the lock and close
+
+        Returns:
+            dict: The worker's response to the quit command, or an
+                empty dict if the connection was already broken
+        """
+        try:
+            self._send({'cmd': 'quit'})
+            resp = self._recv()
+        except BossError:
+            # Connection already broken; still close and unlock below
+            resp = {}
+        self.close()
+        self.remove_lock()
+        return resp
+
+    def remove_lock(self):
+        """Remove the lock file from the remote machine"""
+        if self.work_dir:
+            try:
+                _run_ssh(self.hostname,
+                         f'rm -f {self.work_dir}/.lock', self.timeout)
+            except BossError:
+                pass
+
+    def close(self):
+        """Close the SSH connection
+
+        Closes stdin first so SSH can flush any pending data (e.g. a
+        quit command) to the remote, then waits briefly for SSH to
+        exit on its own before terminating it.
+        """
+        if self._proc:
+            try:
+                self._proc.stdin.close()
+            except OSError:
+                pass
+            try:
+                self._proc.wait(timeout=2)
+            except subprocess.TimeoutExpired:
+                # SSH did not exit on its own: escalate to terminate(),
+                # then kill() if it still refuses to die
+                self._proc.terminate()
+                try:
+                    self._proc.wait(timeout=3)
+                except subprocess.TimeoutExpired:
+                    self._proc.kill()
+            self._proc = None
+
+    def __repr__(self):
+        status = 'running' if self._proc else 'stopped'
+        return (f'RemoteWorker({self.hostname}, '
+                f'nthreads={self.nthreads}, {status})')
+
+    def __del__(self):
+        # Safety net: reap the SSH process even if the caller never
+        # called quit()/close()
+        self.close()
+
+
+def _format_bytes(nbytes):
+    """Format a byte count as a human-readable string"""
+    if nbytes < 1024:
+        return f'{nbytes}B'
+    if nbytes < 1024 * 1024:
+        return f'{nbytes / 1024:.1f}KB'
+    return f'{nbytes / (1024 * 1024):.1f}MB'
+
+
+class _BossLog:
+    """Central boss log for distributed builds
+
+    Logs major events and periodic per-worker status summaries
+    to .buildman.log in the given base directory.
+    """
+
+    def __init__(self, base_dir):
+        """Open the log file, creating base_dir if needed
+
+        Args:
+            base_dir (str): Directory to hold the log file
+        """
+        os.makedirs(base_dir, exist_ok=True)
+        path = os.path.join(base_dir, '.buildman.log')
+        # Kept open for the life of the build, hence the pylint disable
+        # pylint: disable=R1732
+        self._logf = open(path, 'w', encoding='utf-8')
+        # Protects _logf and _stats from concurrent worker threads
+        self._lock = threading.Lock()
+        # name -> {'sent', 'recv', 'load_avg', 'nthreads'} per worker
+        self._stats = {}
+        # threading.Timer for the periodic status summary, or None
+        self._timer = None
+        self._closed = False
+
+    def log(self, msg):
+        """Write a timestamped log entry
+
+        Args:
+            msg (str): Message to log
+        """
+        with self._lock:
+            if self._logf:
+                stamp = datetime.datetime.now().strftime('%H:%M:%S')
+                self._logf.write(f'{stamp} {msg}\n')
+                self._logf.flush()
+
+    def init_worker(self, wrk):
+        """Register a worker for status tracking
+
+        Args:
+            wrk (RemoteWorker): Worker to track
+        """
+        with self._lock:
+            self._stats[wrk.name] = {
+                'sent': 0,
+                'recv': 0,
+                'load_avg': 0.0,
+                'nthreads': wrk.nthreads,
+            }
+
+    def record_sent(self, wrk_name, count=1):
+        """Record boards sent to a worker
+
+        Args:
+            wrk_name (str): Name of the worker
+            count (int): Number of boards sent
+        """
+        with self._lock:
+            if wrk_name in self._stats:
+                self._stats[wrk_name]['sent'] += count
+
+    def record_recv(self, wrk_name, load_avg=0.0):
+        """Record a reply received from a worker
+
+        Args:
+            wrk_name (str): Name of the worker
+            load_avg (float): Load average the worker reported
+        """
+        with self._lock:
+            if wrk_name in self._stats:
+                self._stats[wrk_name]['recv'] += 1
+                self._stats[wrk_name]['load_avg'] = load_avg
+
+    def log_status(self):
+        """Log a status summary for all workers"""
+        with self._lock:
+            parts = []
+            total_load = 0.0
+            total_threads = 0
+            total_done = 0
+            total_sent = 0
+            for name, st in self._stats.items():
+                nthreads = st['nthreads']
+                # Approximate CPU usage: load average relative to the
+                # worker's thread count
+                cpu_pct = (st['load_avg'] / nthreads * 100 if nthreads else 0)
+                parts.append(f'{name}:done={st["recv"]}/{st["sent"]}'
+                    f' cpu={cpu_pct:.0f}%')
+                total_load += st['load_avg']
+                total_threads += nthreads
+                total_done += st['recv']
+                total_sent += st['sent']
+            total_cpu = (total_load / total_threads * 100
+                         if total_threads else 0)
+            parts.append(f'TOTAL:done={total_done}/{total_sent}'
+                         f' cpu={total_cpu:.0f}%')
+            if self._logf:
+                stamp = datetime.datetime.now().strftime('%H:%M:%S')
+                self._logf.write(f'{stamp} STATUS {", ".join(parts)}\n')
+                self._logf.flush()
+
+    def start_timer(self):
+        """Start the periodic status timer
+
+        The first summary appears after STATUS_INTERVAL seconds; each
+        tick then reschedules itself until close() sets _closed.
+        """
+        def _tick():
+            if not self._closed:
+                self.log_status()
+                self._timer = threading.Timer(STATUS_INTERVAL, _tick)
+                self._timer.daemon = True
+                self._timer.start()
+        self._timer = threading.Timer(STATUS_INTERVAL, _tick)
+        self._timer.daemon = True
+        self._timer.start()
+
+    def close(self):
+        """Stop the timer and close the log file"""
+        # Set _closed first so an in-flight _tick doesn't reschedule
+        self._closed = True
+        if self._timer:
+            self._timer.cancel()
+            self._timer = None
+        with self._lock:
+            if self._logf:
+                self._logf.close()
+                self._logf = None
+
+
+def split_boards(board_selected, toolchains):
+    """Split boards between local and remote machines
+
+    Boards whose architecture has a toolchain on at least one remote machine
+    are assigned to remote workers. The rest stay local.
+
+    Args:
+        board_selected (dict): target_name -> Board for all selected boards
+        toolchains (dict): Architecture -> gcc path on remote machines.
+            Combined from all machines.
+
+    Returns:
+        tuple:
+            dict: target_name -> Board for local builds
+            dict: target_name -> Board for remote builds
+    """
+    remote_archs = set(toolchains.keys()) if toolchains else set()
+    local = {}
+    remote = {}
+    for name, brd in board_selected.items():
+        if brd.arch in remote_archs:
+            remote[name] = brd
+        else:
+            local[name] = brd
+    return local, remote
+
+
+def _write_remote_result(builder, resp, board_selected, hostname):
+    """Write a remote build result and update builder progress
+
+    Creates the same directory structure and files that BuilderThread would
+    create for a local build, then calls builder.process_result() to
+    update the progress display.
+
+    Args:
+        builder (Builder): Builder object
+        resp (dict): build_result response from a worker
+        board_selected (dict): target_name -> Board, for looking up
+            the Board object
+        hostname (str): Remote machine that built this board
+    """
+    board = resp.get('board', '')
+    commit_upto = resp.get('commit_upto', 0)
+    # Default to failure (1) if the worker omitted the return code
+    return_code = resp.get('return_code', 1)
+    stderr = resp.get('stderr', '')
+
+    build_dir = builder.get_build_dir(commit_upto, board)
+    builderthread.mkdir(build_dir, parents=True)
+
+    # 'done' holds just the return code, one per line
+    tools.write_file(os.path.join(build_dir, 'done'),
+        f'{return_code}\n', binary=False)
+
+    # Write 'err' only when there is stderr; remove any stale file so
+    # errors from a previous run don't linger
+    err_path = os.path.join(build_dir, 'err')
+    if stderr:
+        tools.write_file(err_path, stderr, binary=False)
+    elif os.path.exists(err_path):
+        os.remove(err_path)
+
+    tools.write_file(os.path.join(build_dir, 'log'),
+        resp.get('stdout', ''), binary=False)
+
+    sizes = resp.get('sizes', {})
+    if sizes.get('raw'):
+        # Strip any header line (starts with 'text') in case the worker
+        # sends raw size output including the header
+        raw = sizes['raw']
+        lines = raw.splitlines()
+        if lines and lines[0].lstrip().startswith('text'):
+            raw = '\n'.join(lines[1:])
+        if raw.strip():
+            tools.write_file(os.path.join(build_dir, 'sizes'),
+                raw, binary=False)
+
+    # Update the builder's progress display
+    brd = board_selected.get(board)
+    if brd:
+        result = command.CommandResult(stderr=stderr, return_code=return_code)
+        result.brd = brd
+        result.commit_upto = commit_upto
+        result.already_done = False
+        result.kconfig_reconfig = False
+        # Record which machine built this board
+        result.remote = hostname
+        builder.process_result(result)
+
+
+class DemandState:  # pylint: disable=R0903
+    """Mutable state for a demand-driven worker build
+
+    Tracks how many boards have been sent, received and are in-flight
+    for a single worker during demand-driven dispatch.
+
+    Attributes:
+        sent: Total boards sent to the worker
+        in_flight: Boards currently being built (sent - completed)
+        expected: Total results expected (sent * ncommits)
+        received: Results received so far
+        board_results: Per-board result count (target -> int)
+        ncommits: Number of commits being built
+        grab_func: Callable(wrk, count) -> list of Board to get more
+            boards from the shared pool
+    """
+
+    def __init__(self, sent, ncommits, grab_func):
+        """Set up state for a worker that has been sent some boards
+
+        Args:
+            sent (int): Number of boards already sent to the worker
+            ncommits (int): Number of commits being built
+            grab_func (callable): Called to fetch more boards from the
+                shared pool
+        """
+        self.sent = sent
+        # All initially-sent boards start out in flight
+        self.in_flight = sent
+        # One result is expected per board per commit
+        self.expected = sent * ncommits
+        self.received = 0
+        self.board_results = {}
+        self.ncommits = ncommits
+        self.grab_func = grab_func
+
+
+class _DispatchContext:
+    """Shared infrastructure for dispatching builds to workers
+
+    Manages per-worker log files, worktree progress tracking, reader
+    threads, and result processing. Used by both _dispatch_jobs() and
+    _dispatch_demand() to avoid duplicating this infrastructure.
+    """
+
+    def __init__(self, workers, builder, board_selected, boss_log):
+        """Set up dispatch state and open per-worker log files
+
+        Args:
+            workers (list of RemoteWorker): Workers to dispatch to
+            builder (Builder): Builder used for results and progress
+            board_selected (dict): target_name -> Board
+            boss_log (_BossLog or None): Central boss log, if any
+        """
+        self.builder = builder
+        self.board_selected = board_selected
+        self.boss_log = boss_log
+        # Worker name -> (worktrees created, total expected)
+        self._worktree_counts = {}
+        self._worktree_lock = threading.Lock()
+
+        # Open a log file per worker; these stay open for the lifetime
+        # of the dispatch, hence the pylint R1732 disable
+        os.makedirs(builder.base_dir, exist_ok=True)
+        self.log_files = {}
+        for wrk in workers:
+            path = os.path.join(builder.base_dir, f'worker-{wrk.name}.log')
+            self.log_files[wrk] = open(  # pylint: disable=R1732
+                path, 'w', encoding='utf-8')
+
+    def log(self, wrk, direction, msg):
+        """Write a timestamped entry to a worker's log file"""
+        logf = self.log_files.get(wrk)
+        if logf:
+            stamp = datetime.datetime.now().strftime('%H:%M:%S')
+            logf.write(f'{stamp} {direction} {msg}\n')
+            logf.flush()
+
+    def update_progress(self, resp, wrk):
+        """Handle worktree progress messages from a worker
+
+        Args:
+            resp (dict): Response from the worker
+            wrk (RemoteWorker): Worker that sent the response
+
+        Returns:
+            bool: True if the response was a progress message
+        """
+        resp_type = resp.get('resp')
+        if resp_type == 'build_started':
+            with self._worktree_lock:
+                # Record the expected worktree total, preserving any
+                # 'done' count already accumulated for this worker
+                num = resp.get('num_threads', wrk.nthreads)
+                self._worktree_counts[wrk.name] = (self._worktree_counts.get(
+                        wrk.name, (0, num))[0], num)
+            return True
+        if resp_type == 'worktree_created':
+            with self._worktree_lock:
+                done, total = self._worktree_counts.get(
+                    wrk.name, (0, wrk.nthreads))
+                self._worktree_counts[wrk.name] = (done + 1, total)
+            self._refresh_progress()
+            return True
+        return False
+
+    def _refresh_progress(self):
+        """Update the builder's progress string from worktree counts"""
+        with self._worktree_lock:
+            parts = []
+            for name, (done, total) in sorted(self._worktree_counts.items()):
+                if done < total:
+                    parts.append(f'{name} {done}/{total}')
+            self.builder.progress = ', '.join(parts)
+            if self.builder.progress:
+                self.builder.process_result(None)
+
+    def start_reader(self, wrk):
+        """Start a background reader thread for a worker
+
+        Args:
+            wrk (RemoteWorker): Worker to read responses from
+
+        Returns:
+            queue.Queue: Queue that receives (status, value) tuples,
+                where status is 'ok' (value is the response dict) or
+                'error' (value is a BossError)
+        """
+        recv_q = queue.Queue()
+
+        def _reader():
+            while True:
+                try:
+                    resp = wrk.recv()
+                    recv_q.put(('ok', resp))
+                except BossError as exc:
+                    recv_q.put(('error', exc))
+                    break
+                except Exception:  # pylint: disable=W0718
+                    # Convert anything unexpected into a BossError so the
+                    # consumer has a single error type to handle
+                    recv_q.put(('error', BossError(
+                        f'Worker on {wrk.name} connection lost')))
+                    break
+
+        # Daemon thread: exits with the process even if the worker hangs
+        threading.Thread(target=_reader, daemon=True).start()
+        return recv_q
+
+    def recv(self, wrk, recv_q):
+        """Get next response from queue with timeout
+
+        Args:
+            wrk (RemoteWorker): Worker the queue belongs to
+            recv_q (queue.Queue): Queue from start_reader()
+
+        Returns:
+            dict or None: Response, or None on error
+        """
+        try:
+            status, val = recv_q.get(timeout=BUILD_TIMEOUT)
+        except queue.Empty:
+            self.log(wrk, '!!', f'Worker timed out after {BUILD_TIMEOUT}s')
+            # Suppress console noise when the worker is shutting down
+            if not wrk.closing:
+                print(f'\n  Error from {wrk.name}: timed out')
+            return None
+        if status == 'error':
+            self.log(wrk, '!!', str(val))
+            if not wrk.closing:
+                print(f'\n  Error from {wrk.name}: {val}')
+            return None
+        resp = val
+        self.log(wrk, '<<', json.dumps(resp, separators=(',', ':')))
+        # A worker-reported protocol error also counts as failure
+        if resp.get('resp') == 'error':
+            if not wrk.closing:
+                print(f'\n  Worker error on {wrk.name}: '
+                      f'{resp.get("msg", "unknown")}')
+            return None
+        return resp
+
+    def write_result(self, wrk, resp):
+        """Write a build result and update progress
+
+        Args:
+            wrk (RemoteWorker): Worker the result came from
+            resp (dict): build_result response
+
+        Returns:
+            bool: True on success, False on error
+        """
+        if self.boss_log:
+            self.boss_log.record_recv(wrk.name, resp.get('load_avg', 0.0))
+        try:
+            _write_remote_result(
+                self.builder, resp, self.board_selected, wrk.name)
+        except Exception as exc:  # pylint: disable=W0718
+            # Keep the dispatch alive: report and mark this result failed
+            self.log(wrk, '!!', f'unexpected: {exc}')
+            print(f'\n  Unexpected error on {wrk.name}: {exc}')
+            return False
+        return True
+
+    def wait_for_prepare(self, wrk, recv_q):
+        """Wait for build_prepare_done, handling progress messages
+
+        Returns:
+            bool: True if prepare succeeded
+        """
+        while True:
+            resp = self.recv(wrk, recv_q)
+            if resp is None:
+                return False
+            if self.update_progress(resp, wrk):
+                continue
+            resp_type = resp.get('resp')
+            if resp_type == 'build_prepare_done':
+                return True
+            if resp_type == 'heartbeat':
+                continue
+            self.log(wrk, '!!', f'unexpected during prepare: {resp_type}')
+            return False
+
+    @staticmethod
+    def send_batch(wrk, boards):
+        """Send a batch of boards to a worker
+
+        Returns:
+            int: Number of boards sent, or -1 on error
+        """
+        for brd in boards:
+            try:
+                wrk.build_board(brd.target, brd.arch)
+            except BossError:
+                return -1
+        return len(boards)
+
+    def collect_results(self, wrk, recv_q, state):
+        """Collect results and send more boards as threads free up
+
+        Args:
+            wrk (RemoteWorker): Worker to collect from
+            recv_q (queue.Queue): Response queue from start_reader()
+            state (DemandState): Mutable build state for this worker
+        """
+        while state.received < state.expected:
+            resp = self.recv(wrk, recv_q)
+            if resp is None:
+                return False
+            resp_type = resp.get('resp')
+            if resp_type == 'heartbeat':
+                continue
+            if resp_type == 'build_done':
+                return True
+            if resp_type != 'build_result':
+                continue
+
+            if not self.write_result(wrk, resp):
+                return False
+            state.received += 1
+
+            target = resp.get('board')
+            results = state.board_results
+            results[target] = results.get(target, 0) + 1
+            if results[target] == state.ncommits:
+                state.in_flight -= 1
+                if state.in_flight < wrk.max_boards:
+                    more = state.grab_func(wrk, 1)
+                    if more and self.send_batch(wrk, more) > 0:
+                        state.sent += 1
+                        state.in_flight += 1
+                        state.expected += state.ncommits
+                        if self.boss_log:
+                            self.boss_log.record_sent(
+                                wrk.name, state.ncommits)
+        return True
+
+    def recv_one(self, wrk, recv_q):
+        """Receive one build result, skipping progress messages
+
+        Returns:
+            bool: True to continue, False to stop this worker
+        """
+        while True:
+            resp = self.recv(wrk, recv_q)
+            if resp is None:
+                return False
+            if self.update_progress(resp, wrk):
+                continue
+            resp_type = resp.get('resp')
+            if resp_type == 'heartbeat':
+                continue
+            if resp_type == 'build_done':
+                nexc = resp.get('exceptions', 0)
+                if nexc:
+                    self.log(wrk, '!!', f'worker finished with {nexc} '
+                             f'thread exception(s)')
+                return False
+            if resp_type == 'build_result':
+                return self.write_result(wrk, resp)
+            return True
+
+    def close(self):
+        """Close all log files and the boss log"""
+        for logf in self.log_files.values():
+            logf.close()
+        if self.boss_log:
+            self.boss_log.log_status()
+            self.boss_log.log('dispatch: end')
+            self.boss_log.close()
+
+
class WorkerPool:
    """Manages a pool of remote workers for distributed builds

    Handles starting workers, pushing source, distributing build jobs
    and collecting results.

    Attributes:
        workers (list of RemoteWorker): Active workers
    """

    def __init__(self, machines):
        """Create a worker pool from available machines

        Args:
            machines (list of Machine): Available machines from MachinePool
        """
        self.workers = []
        self._machines = machines
        self._boss_log = None

    def start_all(self, git_dir, refspec, debug=False, settings=None):
        """Start workers on all machines

        Uses a four-phase approach so that each worker runs the same
        version of buildman as the boss:
            1. Create git repos on all machines (parallel)
            2. Push source to all repos (parallel)
            3. Start workers from pushed source (parallel)
            4. Send build settings to all workers (parallel)

        Args:
            git_dir (str): Local git directory to push
            refspec (str): Git refspec to push
            debug (bool): True to pass -D to workers for tracebacks
            settings (dict or None): Build settings to send to workers

        Returns:
            list of RemoteWorker: Workers that started successfully
        """
        # Phase 1: init git repos
        ready = self._run_parallel(
            'Preparing', self._machines, self._init_one)

        # Phase 2: push source
        ready = self._run_parallel('Pushing source to', ready,
            lambda wrk: wrk.push_source(git_dir, refspec))

        # Phase 3: start workers
        self.workers = self._run_parallel('Starting', ready,
            lambda wrk: self._start_one(wrk, debug))

        # Phase 4: send build settings
        if settings and self.workers:
            self._run_parallel('Configuring', self.workers,
                lambda wrk: wrk.configure(settings))

        return self.workers

    def _init_one(self, mach):
        """Create a RemoteWorker and initialise its git repo

        Args:
            mach: Machine object with hostname attribute

        Returns:
            RemoteWorker: Initialised worker
        """
        wrk = RemoteWorker(mach.hostname, name=mach.name)
        wrk.toolchains = dict(mach.toolchains)
        wrk.bogomips = mach.info.bogomips if mach.info else 0.0
        wrk.max_boards = mach.max_boards
        wrk.init_git()
        return wrk

    @staticmethod
    def _start_one(wrk, debug=False):
        """Start the worker process from the pushed tree

        Args:
            wrk (RemoteWorker): Worker to start
            debug (bool): True to pass -D to the worker
        """
        wrk.start(debug=debug)

    def _run_parallel(self, label, items, func):
        """Run a function on items in parallel, collecting successes

        Args:
            label (str): Progress label (e.g. 'Pushing source to')
            items (list): Items to process
            func (callable): Function to call on each item. May return
                a replacement item; if None is returned, the original
                item is kept.

        Returns:
            list: Items that succeeded (possibly replaced by func)
        """
        lock = threading.Lock()
        results = []
        done = []

        def _run_one(item):
            name = getattr(item, 'name', getattr(item, 'hostname', str(item)))
            try:
                replacement = func(item)
                with lock:
                    results.append(replacement if replacement else item)
                    done.append(name)
                    tout.progress(f'{label} workers {len(done)}/'
                        f'{len(items)}: {", ".join(done)}')
            except WorkerBusy:
                # Machine is locked by another boss: skip it quietly
                with lock:
                    done.append(f'{name} (BUSY)')
                    tout.progress(f'{label} workers {len(done)}/'
                        f'{len(items)}: {", ".join(done)}')
            except BossError as exc:
                # Clean up lock if the worker was initialised
                if hasattr(item, 'remove_lock'):
                    item.remove_lock()
                with lock:
                    done.append(f'{name} (FAILED)')
                    tout.progress(f'{label} workers {len(done)}/'
                        f'{len(items)}: {", ".join(done)}')
                print(f'\n  Worker failed on {name}: {exc}')

        tout.progress(f'{label} workers on {len(items)} machines')
        threads = []
        for item in items:
            thr = threading.Thread(target=_run_one, args=(item,))
            thr.start()
            threads.append(thr)
        for thr in threads:
            thr.join()
        tout.clear_progress()
        return results

    @staticmethod
    def _get_capacity(wrk):
        """Get a worker's build capacity score

        Uses nthreads * bogomips as the capacity metric. Falls back to
        nthreads alone if bogomips is not available.

        Args:
            wrk (RemoteWorker): Worker to score

        Returns:
            float: Capacity score (higher is faster)
        """
        bogo = wrk.bogomips if wrk.bogomips else 1.0
        return wrk.nthreads * bogo

    def _get_worker_for_arch(self, arch, assigned):
        """Pick the next worker that supports a given architecture

        Distributes boards proportionally to each worker's capacity
        (nthreads * bogomips). Picks the capable worker whose current
        assignment is most below its fair share.

        Args:
            arch (str): Board architecture (e.g. 'arm', 'aarch64')
            assigned (dict): worker -> int count of boards assigned so far

        Returns:
            RemoteWorker or None: A worker with the right toolchain
        """
        if arch == 'sandbox':
            # Sandbox builds need no cross toolchain, so any worker will do
            capable = list(self.workers)
        else:
            capable = [w for w in self.workers if arch in w.toolchains]
        if not capable:
            return None

        # Pick the worker with the lowest assigned / capacity ratio
        best = min(capable, key=lambda w: (assigned.get(w, 0) /
                                           (self._get_capacity(w) or 1)))
        assigned[best] = assigned.get(best, 0) + 1
        return best

    def build_boards(self, board_selected, commits, builder, local_count=0):
        """Build boards on remote workers and write results locally

        Uses demand-driven dispatch: boards are fed to workers from a
        shared pool. Each worker gets one board per thread initially,
        then one more each time a board completes. Faster workers
        naturally get more boards.

        Args:
            board_selected (dict): target_name -> Board to build remotely
            commits (list of Commit or None): Commits to build
            builder (Builder): Builder object for result processing
            local_count (int): Number of boards being built locally
        """
        if not self.workers or not board_selected:
            return

        ncommits = max(1, len(commits)) if commits else 1

        # Build a pool of boards that have work remaining
        pool = list(board_selected.values())
        if not builder.force_build:
            # Skip boards whose every commit already has a done-file
            commit_range = range(len(commits)) if commits else range(1)
            pool = [b for b in pool
                    if any(not os.path.exists(
                               builder.get_done_file(cu, b.target))
                           for cu in commit_range)]

        # Filter boards that no worker can handle
        capable_archs = set()
        for wrk in self.workers:
            capable_archs.update(wrk.toolchains.keys())
        capable_archs.add('sandbox')
        skipped = [b for b in pool if b.arch not in capable_archs]
        pool = [b for b in pool if b.arch in capable_archs]
        if skipped:
            builder.count -= len(skipped) * ncommits

        if not pool:
            print('No remote jobs to dispatch')
            return

        total_jobs = len(pool) * ncommits
        nmach = len(self.workers)
        if local_count:
            nmach += 1
        parts = [f'{len(pool)} boards', f'{ncommits} commits {nmach} machines']
        print(f'Building {" x ".join(parts)} (demand-driven)')

        self._boss_log = _BossLog(builder.base_dir)
        self._boss_log.log(f'dispatch: {len(self.workers)} workers, '
            f'{total_jobs} total jobs')
        for wrk in self.workers:
            self._boss_log.init_worker(wrk)

        self._dispatch_demand(pool, commits, builder, board_selected)

    @staticmethod
    def _grab_boards(pool, pool_lock, wrk, count):
        """Take up to count boards from pool that wrk can build

        Args:
            pool (list of Board): Shared board pool (modified in place)
            pool_lock (threading.Lock): Lock protecting the pool
            wrk (RemoteWorker): Worker to match toolchains against
            count (int): Maximum number of boards to take

        Returns:
            list of Board: Boards taken from the pool
        """
        with pool_lock:
            batch = []
            remaining = []
            for brd in pool:
                if len(batch) >= count:
                    remaining.append(brd)
                elif (brd.arch == 'sandbox'
                      or brd.arch in wrk.toolchains):
                    batch.append(brd)
                else:
                    remaining.append(brd)
            pool[:] = remaining
            return batch

    def _dispatch_jobs(self, worker_jobs, builder, board_selected):
        """Send build jobs to workers and collect results

        Opens a log file per worker, then runs each worker's jobs
        in a separate thread.

        Args:
            worker_jobs (dict): worker -> list of (board, commit_upto,
                commit) tuples
            builder (Builder): Builder for result processing
            board_selected (dict): target_name -> Board mapping
        """
        ctx = _DispatchContext(worker_jobs.keys(), builder, board_selected,
            self._boss_log)

        if ctx.boss_log:
            ctx.boss_log.start_timer()

        threads = []
        for wrk, wjobs in worker_jobs.items():
            thr = threading.Thread(
                target=self._run_batch_worker, args=(wrk, wjobs, ctx),
                daemon=True)
            thr.start()
            threads.append(thr)
        for thr in threads:
            thr.join()

        ctx.close()
        self._boss_log = None

    @staticmethod
    def _run_batch_worker(wrk, wjobs, ctx):
        """Send build commands to one worker and collect results

        Args:
            wrk (RemoteWorker): Worker to run
            wjobs (list): List of (board, commit_upto, commit) tuples
            ctx (_DispatchContext): Shared dispatch infrastructure
        """
        recv_q = ctx.start_reader(wrk)

        # Deduplicate the job list into a board list and a commit list,
        # preserving first-seen order
        board_infos = {}
        commit_list = []
        commit_set = set()
        for brd, _, commit in wjobs:
            target = brd.target
            if target not in board_infos:
                board_infos[target] = {
                    'board': target, 'arch': brd.arch}
            commit_hash = commit.hash if commit else None
            if commit_hash not in commit_set:
                commit_set.add(commit_hash)
                commit_list.append(commit_hash)

        boards_list = list(board_infos.values())
        total = len(boards_list) * len(commit_list)

        ctx.log(wrk, '>>', f'{len(boards_list)} boards x '
                f'{len(commit_list)} commits')
        if ctx.boss_log:
            ctx.boss_log.log(f'{wrk.name}: {len(boards_list)} boards'
                f' x {len(commit_list)} commits')

        try:
            wrk.build_boards(boards_list, commit_list)
        except BossError as exc:
            ctx.log(wrk, '!!', str(exc))
            if not wrk.closing:
                print(f'\n  Error from {wrk.name}: {exc}')
            return
        if ctx.boss_log:
            ctx.boss_log.record_sent(wrk.name, total)

        for _ in range(total):
            if not ctx.recv_one(wrk, recv_q):
                return

    def _start_demand_worker(  # pylint: disable=R0913
            self, wrk, ctx, commit_list, ncommits, pool, pool_lock):
        """Prepare a worker and send the initial batch of boards

        Args:
            wrk (RemoteWorker): Worker to run
            ctx (_DispatchContext): Shared dispatch infrastructure
            commit_list (list of str): Commit hashes to build
            ncommits (int): Number of commits
            pool (list of Board): Shared board pool
            pool_lock (threading.Lock): Lock protecting the pool

        Returns:
            tuple: (recv_q, state) on success, or (None, None) if the
                worker failed during prepare or had no boards to build
        """
        recv_q = ctx.start_reader(wrk)

        try:
            wrk.build_prepare(commit_list)
        except BossError as exc:
            ctx.log(wrk, '!!', str(exc))
            if not wrk.closing:
                print(f'\n  Error from {wrk.name}: {exc}')
            return None, None

        if not ctx.wait_for_prepare(wrk, recv_q):
            return None, None

        initial = self._grab_boards(pool, pool_lock, wrk, wrk.max_boards)
        if not initial:
            # Nothing this worker can build: finish the protocol cleanly
            try:
                wrk.build_done()
            except BossError:
                pass
            return None, None

        count = ctx.send_batch(wrk, initial)
        if count < 0:
            return None, None
        if ctx.boss_log:
            ctx.boss_log.record_sent(wrk.name, count * ncommits)
        ctx.log(wrk, '>>', f'{count} boards (initial,'
                f' max_boards={wrk.max_boards})')

        def _grab(w, n):
            return self._grab_boards(pool, pool_lock, w, n)

        state = DemandState(count, ncommits, _grab)
        return recv_q, state

    @staticmethod
    def _finish_demand_worker(wrk, ctx, recv_q, state):
        """Collect results and finish the demand-driven protocol

        Args:
            wrk (RemoteWorker): Worker to collect from
            ctx (_DispatchContext): Shared dispatch infrastructure
            recv_q (queue.Queue): Response queue from start_reader()
            state (DemandState): Build state from _start_demand_worker()
        """
        ctx.collect_results(wrk, recv_q, state)

        ctx.log(wrk, '>>', f'{state.sent} boards total')
        try:
            wrk.build_done()
        except BossError as exc:
            ctx.log(wrk, '!!', str(exc))
            return

        # Wait for worker's build_done
        while True:
            resp = ctx.recv(wrk, recv_q)
            if resp is None:
                return
            if resp.get('resp') == 'build_done':
                break

    def _dispatch_demand(self, pool, commits, builder, board_selected):
        """Dispatch boards on demand from a shared pool

        Each worker gets boards from the pool as it finishes previous
        ones, so faster workers naturally get more work.

        Args:
            pool (list of Board): Boards available to build
            commits (list of Commit or None): Commits to build
            builder (Builder): Builder for result processing
            board_selected (dict): target_name -> Board mapping
        """
        commit_list = [c.hash if c else None for c in (commits or [None])]
        ncommits = len(commit_list)
        pool_lock = threading.Lock()

        ctx = _DispatchContext(
            self.workers, builder, board_selected, self._boss_log)

        if ctx.boss_log:
            ctx.boss_log.start_timer()

        def _run_worker(wrk):
            recv_q, state = self._start_demand_worker(
                wrk, ctx, commit_list, ncommits, pool, pool_lock)
            if recv_q is not None:
                self._finish_demand_worker(wrk, ctx, recv_q, state)

        threads = []
        for wrk in self.workers:
            thr = threading.Thread(target=_run_worker, args=(wrk,),
                daemon=True)
            thr.start()
            threads.append(thr)
        for thr in threads:
            thr.join()

        ctx.close()
        self._boss_log = None

    def quit_all(self):
        """Quit all workers gracefully, closing the boss log first"""
        self.print_transfer_summary()
        if self._boss_log:
            self._boss_log.log('quit: shutting down')
            self._boss_log.log_status()
            self._boss_log.close()
            self._boss_log = None
        for wrk in self.workers:
            try:
                wrk.quit()
            except BossError:
                # Worker already gone: just drop the connection
                wrk.close()
        self.workers = []

    def print_transfer_summary(self):
        """Print data transfer summary for all workers"""
        if not self.workers:
            return
        total_sent = 0
        total_recv = 0
        parts = []
        for wrk in self.workers:
            sent = getattr(wrk, 'bytes_sent', 0)
            recv = getattr(wrk, 'bytes_recv', 0)
            total_sent += sent
            total_recv += recv
            name = getattr(wrk, 'name', '?')
            parts.append(f'{name}:'
                         f'{_format_bytes(sent)}/'
                         f'{_format_bytes(recv)}')
        sys.stderr.write(f'\nTransfer (sent/recv): {", ".join(parts)}'
            f' total:{_format_bytes(total_sent)}/'
            f'{_format_bytes(total_recv)}\n')
        sys.stderr.flush()

    def close_all(self):
        """Stop all workers immediately

        Use this on Ctrl-C. Sends a quit command to all workers first,
        then waits briefly for the commands to travel through SSH
        before closing the connections. This two-phase approach avoids
        a race where closing SSH kills the connection before the quit
        command is forwarded to the remote worker.
        """
        self.print_transfer_summary()
        if self._boss_log:
            self._boss_log.log('interrupted: Ctrl-C')
            self._boss_log.log_status()
            self._boss_log.close()
            self._boss_log = None

        # Suppress error messages from reader threads
        for wrk in self.workers:
            wrk.closing = True

        # Phase 1: send quit to all workers
        for wrk in self.workers:
            try:
                wrk._send({'cmd': 'quit'})  # pylint: disable=W0212
            except BossError:
                pass

        # Brief pause so SSH can forward the quit commands to the
        # remote workers before we tear down the connections
        time.sleep(0.5)

        # Phase 2: close all connections
        for wrk in self.workers:
            wrk.close()
            wrk.remove_lock()
        self.workers = []
diff --git a/tools/buildman/main.py b/tools/buildman/main.py
index 225e341fc26..b9779c38408 100755
--- a/tools/buildman/main.py
+++ b/tools/buildman/main.py
@@ -43,6 +43,7 @@  def run_tests(skip_net_tests, debug, verbose, args):
     from buildman import test_cfgutil
     from buildman import test_machine
     from buildman import test_worker
+    from buildman import test_boss
 
     test_name = args.terms and args.terms[0] or None
     if skip_net_tests:
@@ -67,6 +68,7 @@  def run_tests(skip_net_tests, debug, verbose, args):
          test_builder.TestPrintBuildSummary,
          test_machine,
          test_worker,
+         test_boss,
          'buildman.toolchain'])
 
     return (0 if result.wasSuccessful() else 1)
diff --git a/tools/buildman/test_boss.py b/tools/buildman/test_boss.py
new file mode 100644
index 00000000000..2f6710c1a58
--- /dev/null
+++ b/tools/buildman/test_boss.py
@@ -0,0 +1,2645 @@ 
+# SPDX-License-Identifier: GPL-2.0+
+# Copyright 2026 Simon Glass <sjg@chromium.org>
+
+"""Tests for the boss module"""
+
+# pylint: disable=C0302,E1101,W0212,W0612
+
+import io
+import json
+import os
+import queue
+import random
+import shutil
+import subprocess
+import tempfile
+import threading
+import time
+import types
+import unittest
+from unittest import mock
+
+from u_boot_pylib import command
+from u_boot_pylib import terminal
+from u_boot_pylib import tools
+
+from buildman import boss
+from buildman import bsettings
+from buildman import machine
+from buildman import worker as worker_mod
+
+
def _make_response(obj):
    """Encode obj as a BM>-prefixed JSON response line (bytes)"""
    line = worker_mod.RESPONSE_PREFIX + json.dumps(obj) + '\n'
    return line.encode()
+
+
class FakeProc:
    """Fake subprocess.Popen for testing

    Serves a canned list of response lines through readline() and
    records terminate()/kill() as return codes.
    """

    def __init__(self, responses=None):
        self.stdin = io.BytesIO()
        self._responses = responses or []
        self._resp_idx = 0
        self.stdout = self
        self.stderr = io.BytesIO(b'')
        self._returncode = None

    def readline(self):
        """Return the next canned response line, or b'' when exhausted"""
        if self._resp_idx >= len(self._responses):
            return b''
        line = self._responses[self._resp_idx]
        self._resp_idx += 1
        return line

    def poll(self):
        """Return the process return code (None while 'running')"""
        return self._returncode

    def terminate(self):
        """Simulate SIGTERM"""
        self._returncode = -15

    def kill(self):
        """Simulate SIGKILL"""
        self._returncode = -9

    def wait(self, timeout=None):
        """Wait for the process (no-op)"""
+
+
class TestRunSsh(unittest.TestCase):
    """Test _run_ssh()"""

    @mock.patch('buildman.boss.command.run_pipe')
    def test_success(self, run_pipe):
        """Test successful one-shot SSH command"""
        run_pipe.return_value = mock.Mock(
            stdout='/tmp/bm-worker-abc\n')
        # Trailing newline is stripped from the command output
        self.assertEqual(boss._run_ssh('host1', 'echo hello'),
                         '/tmp/bm-worker-abc')

    @mock.patch('buildman.boss.command.run_pipe')
    def test_failure(self, run_pipe):
        """Test SSH command failure"""
        run_pipe.side_effect = command.CommandExc(
            'connection refused', command.CommandResult())
        with self.assertRaises(boss.BossError) as err:
            boss._run_ssh('host1', 'echo hello')
        self.assertIn('SSH command failed', str(err.exception))
+
+
class TestKillWorkers(unittest.TestCase):
    """Test kill_workers()"""

    @mock.patch('buildman.boss._run_ssh')
    def test_kill_workers(self, run_ssh):
        """Test killing workers on multiple machines"""
        run_ssh.side_effect = ['killed 1234', 'no workers']
        with terminal.capture():
            ret = boss.kill_workers(['host1', 'host2'])
        self.assertEqual(ret, 0)
        # One SSH invocation per machine
        self.assertEqual(run_ssh.call_count, 2)

    @mock.patch('buildman.boss._run_ssh')
    def test_kill_workers_ssh_failure(self, run_ssh):
        """Test that SSH failures are reported but do not abort"""
        run_ssh.side_effect = boss.BossError('connection refused')
        with terminal.capture():
            ret = boss.kill_workers(['host1'])
        self.assertEqual(ret, 0)
+
+
class TestRemoteWorkerInitGit(unittest.TestCase):
    """Test RemoteWorker.init_git()"""

    @mock.patch('buildman.boss._run_ssh')
    def test_init_git(self, run_ssh):
        """Test successful git init"""
        run_ssh.return_value = '/tmp/bm-worker-abc'
        wrk = boss.RemoteWorker('host1')
        wrk.init_git()
        # Both the work dir and its .git subdirectory are recorded
        self.assertEqual(wrk.work_dir, '/tmp/bm-worker-abc')
        self.assertEqual(wrk.git_dir, '/tmp/bm-worker-abc/.git')

    @mock.patch('buildman.boss._run_ssh')
    def test_init_git_busy(self, run_ssh):
        """Test init_git when machine is locked"""
        run_ssh.return_value = 'BUSY'
        wrk = boss.RemoteWorker('host1')
        with self.assertRaises(boss.WorkerBusy) as err:
            wrk.init_git()
        self.assertIn('busy', str(err.exception))

    @mock.patch('buildman.boss._run_ssh')
    def test_init_git_empty_output(self, run_ssh):
        """Test init_git with empty output"""
        run_ssh.return_value = ''
        wrk = boss.RemoteWorker('host1')
        with self.assertRaises(boss.BossError) as err:
            wrk.init_git()
        self.assertIn('returned no work directory', str(err.exception))

    @mock.patch('buildman.boss._run_ssh')
    def test_init_git_ssh_failure(self, run_ssh):
        """Test init_git when SSH fails"""
        run_ssh.side_effect = boss.BossError('connection refused')
        wrk = boss.RemoteWorker('host1')
        with self.assertRaises(boss.BossError):
            wrk.init_git()
+
+
+def _make_result(board, commit_upto=0, return_code=0,
+                  stderr='', stdout=''):
+    """Create a build_result response dict"""
+    return {
+        'resp': 'build_result',
+        'board': board,
+        'commit_upto': commit_upto,
+        'return_code': return_code,
+        'stderr': stderr,
+        'stdout': stdout,
+    }
+
+
+def _make_builder(tmpdir, force_build=True):
+    """Create a mock Builder with base_dir set"""
+    builder = mock.Mock()
+    builder.force_build = force_build
+    builder.base_dir = tmpdir
+    builder.count = 0
+    builder.get_build_dir.side_effect = (
+        lambda c, b: os.path.join(tmpdir, b))
+    return builder
+
+
def _start_worker(hostname, mock_popen, proc):
    """Create a worker with work_dir preset, then start it"""
    mock_popen.return_value = proc
    worker = boss.RemoteWorker(hostname)
    worker.work_dir = '/tmp/bm-worker-123'
    worker.git_dir = '/tmp/bm-worker-123/.git'
    worker.start()
    return worker
+
+
class TestRemoteWorkerStart(unittest.TestCase):
    """Test RemoteWorker.start()

    Each test feeds canned protocol responses through a FakeProc so no
    real SSH connection is made.
    """

    @mock.patch('subprocess.Popen')
    def test_start_success(self, mock_popen):
        """Test successful worker start"""
        # Worker announces itself ready with 8 build threads
        proc = FakeProc([
            _make_response({'resp': 'ready', 'nthreads': 8}),
        ])
        w = _start_worker('myhost', mock_popen, proc)
        self.assertEqual(w.nthreads, 8)

        # Check SSH command runs from pushed tree
        cmd = mock_popen.call_args[0][0]
        self.assertIn('ssh', cmd)
        self.assertIn('myhost', cmd)
        self.assertIn('--worker', cmd[-1])

    @mock.patch('subprocess.Popen')
    def test_start_not_ready(self, mock_popen):
        """Test start when worker sends unexpected response"""
        # An error response instead of 'ready' must fail the start
        proc = FakeProc([
            _make_response({'resp': 'error', 'msg': 'broken'}),
        ])
        mock_popen.return_value = proc

        w = boss.RemoteWorker('badhost')
        w.work_dir = '/tmp/bm-test'
        with self.assertRaises(boss.BossError) as ctx:
            w.start()
        self.assertIn('did not send ready', str(ctx.exception))

    @mock.patch('subprocess.Popen')
    def test_start_ssh_failure(self, mock_popen):
        """Test start when SSH fails to launch"""
        # Popen itself failing (e.g. ssh binary missing) becomes BossError
        mock_popen.side_effect = OSError('No such file')

        w = boss.RemoteWorker('badhost')
        w.work_dir = '/tmp/bm-test'
        with self.assertRaises(boss.BossError) as ctx:
            w.start()
        self.assertIn('Failed to start SSH', str(ctx.exception))

    @mock.patch('subprocess.Popen')
    def test_start_connection_closed(self, mock_popen):
        """Test start when connection closes immediately"""
        proc = FakeProc([])  # No responses
        mock_popen.return_value = proc

        w = boss.RemoteWorker('deadhost')
        w.work_dir = '/tmp/bm-test'
        with self.assertRaises(boss.BossError) as ctx:
            w.start()
        self.assertIn('closed connection', str(ctx.exception))

    def test_start_no_work_dir(self):
        """Test start without init_git raises error"""
        # start() requires init_git() to have set work_dir first
        w = boss.RemoteWorker('host1')
        with self.assertRaises(boss.BossError) as ctx:
            w.start()
        self.assertIn('call init_git', str(ctx.exception))

    @mock.patch('subprocess.Popen')
    def test_max_boards_defaults_to_nthreads(self, mock_popen):
        """Test max_boards defaults to nthreads when not configured"""
        proc = FakeProc([
            _make_response({'resp': 'ready', 'nthreads': 64}),
        ])
        w = _start_worker('host1', mock_popen, proc)
        self.assertEqual(w.max_boards, 64)

    @mock.patch('subprocess.Popen')
    def test_max_boards_preserved_when_set(self, mock_popen):
        """Test max_boards keeps its configured value after start"""
        proc = FakeProc([
            _make_response({'resp': 'ready', 'nthreads': 256}),
        ])
        mock_popen.return_value = proc
        w = boss.RemoteWorker('host1')
        w.work_dir = '/tmp/bm-test'
        # A pre-configured cap must not be overwritten by the ready message
        w.max_boards = 64
        w.start()
        self.assertEqual(w.nthreads, 256)
        self.assertEqual(w.max_boards, 64)
+
+
+class TestRemoteWorkerPush(unittest.TestCase):
+    """Test RemoteWorker.push_source(): git push to the remote work tree"""
+
+    def test_push_no_init(self):
+        """Test push before init_git raises error"""
+        w = boss.RemoteWorker('host1')
+        with self.assertRaises(boss.BossError) as ctx:
+            w.push_source('/tmp/repo', 'HEAD:refs/heads/work')
+        self.assertIn('call init_git first', str(ctx.exception))
+
+    @mock.patch('buildman.boss.command.run_pipe')
+    def test_push_success(self, mock_pipe):
+        """Test successful git push"""
+        mock_pipe.return_value = mock.Mock(return_code=0)
+        w = boss.RemoteWorker('host1')
+        w.git_dir = '/tmp/bm-worker-123/.git'
+
+        w.push_source('/home/user/u-boot', 'HEAD:refs/heads/work')
+        # First positional arg of run_pipe() is the list of pipelines;
+        # take the single git command from it
+        cmd = mock_pipe.call_args[0][0][0]
+        self.assertIn('git', cmd)
+        self.assertIn('push', cmd)
+        self.assertIn('host1:/tmp/bm-worker-123/.git', cmd)
+        self.assertIn('HEAD:refs/heads/work', cmd)
+
+    @mock.patch('buildman.boss.command.run_pipe')
+    def test_push_failure(self, mock_pipe):
+        """Test git push failure"""
+        # run_pipe() raises CommandExc on failure; push_source() should
+        # wrap this in a BossError mentioning 'git push'
+        mock_pipe.side_effect = command.CommandExc(
+            'push failed', command.CommandResult())
+        w = boss.RemoteWorker('host1')
+        w.git_dir = '/tmp/bm/.git'
+
+        with self.assertRaises(boss.BossError) as ctx:
+            w.push_source('/tmp/repo', 'HEAD:refs/heads/work')
+        self.assertIn('git push', str(ctx.exception))
+
+
+class TestRemoteWorkerBuildBoards(unittest.TestCase):
+    """Test RemoteWorker.build_boards() and recv()"""
+
+    @mock.patch('subprocess.Popen')
+    def test_build_boards_and_recv(self, mock_popen):
+        """Test sending build_boards and receiving results"""
+        proc = FakeProc([
+            _make_response({'resp': 'ready', 'nthreads': 4}),
+            _make_response({
+                'resp': 'build_result', 'board': 'sandbox',
+                'commit_upto': 0, 'return_code': 0,
+                'stderr': '', 'stdout': '',
+            }),
+        ])
+        w = _start_worker('host1', mock_popen, proc)
+        boards = [{'board': 'sandbox', 'defconfig': 'sandbox_defconfig',
+                    'env': {}}]
+        w.build_boards(boards, ['abc123'])
+
+        # Check the command was sent: parse the JSON line written to the
+        # fake process's stdin
+        sent = proc.stdin.getvalue().decode()
+        obj = json.loads(sent)
+        self.assertEqual(obj['cmd'], 'build_boards')
+        self.assertEqual(len(obj['boards']), 1)
+        self.assertEqual(obj['boards'][0]['board'], 'sandbox')
+        self.assertEqual(obj['commits'], ['abc123'])
+
+        # Receive result
+        resp = w.recv()
+        self.assertEqual(resp['resp'], 'build_result')
+        self.assertEqual(resp['board'], 'sandbox')
+
+
+class TestRemoteWorkerQuit(unittest.TestCase):
+    """Test RemoteWorker.quit() and close()"""
+
+    @mock.patch('buildman.boss._run_ssh')
+    @mock.patch('subprocess.Popen')
+    def test_quit(self, mock_popen, mock_ssh):
+        """Test clean quit removes the lock"""
+        proc = FakeProc([
+            _make_response({'resp': 'ready', 'nthreads': 4}),
+            _make_response({'resp': 'quit_ack'}),
+        ])
+        w = _start_worker('host1', mock_popen, proc)
+        resp = w.quit()
+        self.assertEqual(resp.get('resp'), 'quit_ack')
+        # quit() should also close the connection
+        self.assertIsNone(w._proc)
+        # Lock removal SSH should have been called
+        mock_ssh.assert_called_once()
+        self.assertIn('rm -f', mock_ssh.call_args[0][1])
+
+    @mock.patch('subprocess.Popen')
+    def test_close_without_quit(self, mock_popen):
+        """Test close without sending quit"""
+        proc = FakeProc([
+            _make_response({'resp': 'ready', 'nthreads': 4}),
+        ])
+        w = _start_worker('host1', mock_popen, proc)
+        self.assertIsNotNone(w._proc)
+        w.close()
+        self.assertIsNone(w._proc)
+
+    def test_close_when_not_started(self):
+        """Test close on a worker that was never started"""
+        w = boss.RemoteWorker('host1')
+        w.close()  # Should not raise
+
+
+class TestRemoteWorkerRecv(unittest.TestCase):
+    """Test response parsing on the SSH stdout stream"""
+
+    @mock.patch('subprocess.Popen')
+    def test_skip_non_protocol_lines(self, mock_popen):
+        """Test that non-BM> lines are skipped"""
+        # SSH sessions can emit banners / MOTD before the protocol starts;
+        # these must be ignored
+        proc = FakeProc([
+            b'Welcome to myhost\n',
+            b'Last login: Mon Jan 1\n',
+            _make_response({'resp': 'ready', 'nthreads': 2}),
+        ])
+        w = _start_worker('host1', mock_popen, proc)
+        self.assertEqual(w.nthreads, 2)
+
+    @mock.patch('subprocess.Popen')
+    def test_bad_json(self, mock_popen):
+        """Test bad JSON in protocol line"""
+        proc = FakeProc([
+            (worker_mod.RESPONSE_PREFIX + 'not json\n').encode(),
+        ])
+        mock_popen.return_value = proc
+
+        w = boss.RemoteWorker('host1')
+        # Attach the fake process directly, bypassing start()
+        w._proc = proc
+        with self.assertRaises(boss.BossError) as ctx:
+            w._recv()
+        self.assertIn('Bad JSON', str(ctx.exception))
+
+
+class TestRemoteWorkerSend(unittest.TestCase):
+    """Test _send()"""
+
+    def test_send_when_not_running(self):
+        """Test sending to a stopped worker"""
+        # _send() must fail cleanly when there is no SSH process
+        w = boss.RemoteWorker('host1')
+        with self.assertRaises(boss.BossError) as ctx:
+            w._send({'cmd': 'quit'})
+        self.assertIn('not running', str(ctx.exception))
+
+
+class TestRemoteWorkerRepr(unittest.TestCase):
+    """Test __repr__ reflects hostname and running state"""
+
+    def test_repr_stopped(self):
+        """Test repr when stopped"""
+        w = boss.RemoteWorker('host1')
+        self.assertIn('host1', repr(w))
+        self.assertIn('stopped', repr(w))
+
+    @mock.patch('subprocess.Popen')
+    def test_repr_running(self, mock_popen):
+        """Test repr when running"""
+        proc = FakeProc([
+            _make_response({'resp': 'ready', 'nthreads': 8}),
+        ])
+        w = _start_worker('host1', mock_popen, proc)
+        self.assertIn('running', repr(w))
+        self.assertIn('nthreads=8', repr(w))
+        w.close()
+
+
+class FakeBoard:  # pylint: disable=R0903
+    """Fake board for testing split_boards()
+
+    Provides only the attributes the boss code reads from a board.
+    """
+
+    def __init__(self, target, arch):
+        # target (str): Board target name, e.g. 'sandbox'
+        self.target = target
+        # arch (str): Architecture name used for toolchain matching
+        self.arch = arch
+
+
+class TestSplitBoards(unittest.TestCase):
+    """Test split_boards(): partition boards into local and remote sets"""
+
+    def test_all_local(self):
+        """Test when no remote toolchains match"""
+        boards = {
+            'sandbox': FakeBoard('sandbox', 'sandbox'),
+            'rpi': FakeBoard('rpi', 'arm'),
+        }
+        local, remote = boss.split_boards(boards, {'x86': '/usr/bin/gcc'})
+        self.assertEqual(len(local), 2)
+        self.assertEqual(len(remote), 0)
+
+    def test_all_remote(self):
+        """Test when all boards have remote toolchains"""
+        boards = {
+            'rpi': FakeBoard('rpi', 'arm'),
+            'odroid': FakeBoard('odroid', 'arm'),
+        }
+        local, remote = boss.split_boards(boards, {'arm': '/usr/bin/gcc'})
+        self.assertEqual(len(local), 0)
+        self.assertEqual(len(remote), 2)
+
+    def test_mixed(self):
+        """Test split with some local, some remote"""
+        boards = {
+            'sandbox': FakeBoard('sandbox', 'sandbox'),
+            'rpi': FakeBoard('rpi', 'arm'),
+            'qemu': FakeBoard('qemu', 'riscv'),
+        }
+        local, remote = boss.split_boards(
+            boards, {'arm': '/usr/bin/gcc', 'riscv': '/usr/bin/gcc'})
+        self.assertEqual(len(local), 1)
+        self.assertIn('sandbox', local)
+        self.assertEqual(len(remote), 2)
+        self.assertIn('rpi', remote)
+        self.assertIn('qemu', remote)
+
+    def test_empty_toolchains(self):
+        """Test with no remote toolchains"""
+        # With an empty mapping everything should stay local
+        boards = {'sandbox': FakeBoard('sandbox', 'sandbox')}
+        local, remote = boss.split_boards(boards, {})
+        self.assertEqual(len(local), 1)
+        self.assertEqual(len(remote), 0)
+
+    def test_none_toolchains(self):
+        """Test with None toolchains"""
+        # None must behave like an empty mapping, not raise
+        boards = {'sandbox': FakeBoard('sandbox', 'sandbox')}
+        local, remote = boss.split_boards(boards, None)
+        self.assertEqual(len(local), 1)
+        self.assertEqual(len(remote), 0)
+
+
+class TestWriteRemoteResult(unittest.TestCase):
+    """Test _write_remote_result(): writing build output to the build dir"""
+
+    def test_success(self):
+        """Test writing a successful build result"""
+        with tempfile.TemporaryDirectory() as tmpdir:
+            build_dir = os.path.join(tmpdir, 'sandbox')
+            builder = mock.Mock()
+            builder.get_build_dir.return_value = build_dir
+
+            resp = {
+                'resp': 'build_result',
+                'board': 'sandbox',
+                'commit_upto': 0,
+                'return_code': 0,
+                'stderr': '',
+                'stdout': 'build output',
+            }
+            boss._write_remote_result(builder, resp, {}, 'host1')
+
+            build_dir = builder.get_build_dir.return_value
+            self.assertTrue(os.path.isdir(build_dir))
+
+            # 'done' holds the return code, 'log' holds stdout; no 'err'
+            # file should be written when stderr is empty
+            self.assertEqual(
+                tools.read_file(os.path.join(build_dir, 'done'),
+                                binary=False), '0\n')
+            self.assertEqual(
+                tools.read_file(os.path.join(build_dir, 'log'),
+                                binary=False), 'build output')
+            self.assertFalse(os.path.exists(
+                os.path.join(build_dir, 'err')))
+
+    def test_failure_with_stderr(self):
+        """Test writing a failed build result with stderr"""
+        with tempfile.TemporaryDirectory() as tmpdir:
+            build_dir = os.path.join(tmpdir, 'rpi')
+            builder = mock.Mock()
+            builder.get_build_dir.return_value = build_dir
+
+            resp = {
+                'resp': 'build_result',
+                'board': 'rpi',
+                'commit_upto': 1,
+                'return_code': 2,
+                'stderr': 'error: undefined reference',
+                'stdout': '',
+            }
+            boss._write_remote_result(builder, resp, {}, 'host1')
+
+            build_dir = builder.get_build_dir.return_value
+            # Non-zero return code recorded in 'done', stderr in 'err'
+            self.assertEqual(
+                tools.read_file(os.path.join(build_dir, 'done'),
+                                binary=False), '2\n')
+            self.assertEqual(
+                tools.read_file(os.path.join(build_dir, 'err'),
+                                binary=False),
+                'error: undefined reference')
+
+    def test_with_sizes(self):
+        """Test writing a result with size information"""
+        with tempfile.TemporaryDirectory() as tmpdir:
+            build_dir = os.path.join(tmpdir, 'sandbox')
+            builder = mock.Mock()
+            builder.get_build_dir.return_value = build_dir
+
+            sizes_raw = ('   text    data     bss     dec     hex\n'
+                         '  12345    1234     567   14146    374a\n')
+            resp = {
+                'resp': 'build_result',
+                'board': 'sandbox',
+                'commit_upto': 0,
+                'return_code': 0,
+                'stderr': '',
+                'stdout': '',
+                'sizes': {'raw': sizes_raw},
+            }
+            boss._write_remote_result(builder, resp, {}, 'host1')
+
+            # Boss strips the header line from sizes
+            build_dir = builder.get_build_dir.return_value
+            self.assertEqual(
+                tools.read_file(os.path.join(build_dir, 'sizes'),
+                                binary=False),
+                '  12345    1234     567   14146    374a')
+
+
+class FakeMachineInfo:  # pylint: disable=R0903
+    """Fake machine info for testing"""
+    # Capacity figure read for capacity-weighted board distribution
+    bogomips = 5000.0
+
+
+class FakeMachine:  # pylint: disable=R0903
+    """Fake machine for testing WorkerPool
+
+    Carries only the attributes WorkerPool reads from a machine record.
+    """
+
+    def __init__(self, hostname):
+        self.hostname = hostname
+        self.name = hostname
+        # Single cross-compiler so toolchain matching has something to find
+        self.toolchains = {'arm': '/usr/bin/arm-linux-gnueabihf-gcc'}
+        self.info = FakeMachineInfo()
+        # 0 means 'no configured limit' (defaults elsewhere to nthreads)
+        self.max_boards = 0
+
+def _make_worker():
+    """Create a RemoteWorker with mocked subprocess for testing
+
+    Uses __new__ to avoid calling __init__ which requires real SSH.
+    Sets all attributes to safe defaults.
+
+    Returns:
+        RemoteWorker: Stub worker whose fake process reports as running
+    """
+    wrk = boss.RemoteWorker.__new__(boss.RemoteWorker)
+    wrk.hostname = 'host1'
+    wrk.name = 'host1'
+    wrk.nthreads = 4
+    wrk.bogomips = 5000.0
+    wrk.max_boards = 0
+    wrk.slots = 2
+    wrk.toolchains = {}
+    wrk._proc = mock.Mock()
+    wrk._proc.poll.return_value = None  # process is running
+    wrk._log = None
+    wrk._closed = False
+    wrk._closing = False
+    wrk._stderr_buf = []
+    wrk._stderr_thread = None
+    wrk._stderr_lines = []
+    wrk._ready = queue.Queue()
+    wrk._lock_file = None
+    wrk._work_dir = ''
+    wrk._git_dir = ''
+    wrk.work_dir = ''
+    wrk.timeout = 10
+    wrk.bytes_sent = 0
+    wrk.bytes_recv = 0
+    wrk.closing = False
+    return wrk
+
+
+def _make_ctx(board_selected=None):
+    """Create a _DispatchContext with temp directory for testing
+
+    Args:
+        board_selected (dict or None): board name -> board object, or None
+            for an empty selection
+
+    Returns:
+        tuple: (ctx, wrk, tmpdir) — caller must call ctx.close() and
+            shutil.rmtree(tmpdir)
+    """
+    wrk = mock.Mock(nthreads=4, closing=False, max_boards=0,
+                    slots=2, toolchains={'arm': '/gcc'})
+    # 'name' cannot be set via the Mock() constructor (it is a Mock
+    # constructor argument), so assign it afterwards
+    wrk.name = 'host1'
+    builder = mock.Mock()
+    tmpdir = tempfile.mkdtemp()
+    builder.base_dir = tmpdir
+    ctx = boss._DispatchContext(
+        workers=[wrk], builder=builder,
+        board_selected=board_selected or {},
+        boss_log=mock.Mock())
+    return ctx, wrk, tmpdir
+
+
+class TestWorkerPool(unittest.TestCase):
+    """Test WorkerPool start-up, configuration and shutdown"""
+
+    @mock.patch('subprocess.Popen')
+    @mock.patch('buildman.boss._run_ssh')
+    @mock.patch('buildman.boss.command.run_pipe')
+    def test_start_all(self, mock_pipe, mock_ssh, mock_popen):
+        """Test starting workers on multiple machines"""
+        mock_ssh.return_value = '/tmp/bm-1'
+        mock_pipe.return_value = mock.Mock(return_code=0)
+        proc1 = FakeProc([
+            _make_response({'resp': 'ready', 'nthreads': 4}),
+        ])
+        proc2 = FakeProc([
+            _make_response({'resp': 'ready', 'nthreads': 8}),
+        ])
+        # One fake SSH process per machine, in start order
+        mock_popen.side_effect = [proc1, proc2]
+
+        machines = [FakeMachine('host1'), FakeMachine('host2')]
+        pool = boss.WorkerPool(machines)
+        with terminal.capture():
+            workers = pool.start_all('/tmp/repo', 'HEAD:refs/heads/work')
+        self.assertEqual(len(workers), 2)
+
+    @mock.patch('subprocess.Popen')
+    @mock.patch('buildman.boss._run_ssh')
+    @mock.patch('buildman.boss.command.run_pipe')
+    def test_start_all_with_settings(self, mock_pipe, mock_ssh, mock_popen):
+        """Test that start_all sends settings via configure"""
+        mock_ssh.return_value = '/tmp/bm-1'
+        mock_pipe.return_value = mock.Mock(return_code=0)
+        proc1 = FakeProc([
+            _make_response({'resp': 'ready', 'nthreads': 4}),
+            _make_response({'resp': 'configure_done'}),
+        ])
+        proc2 = FakeProc([
+            _make_response({'resp': 'ready', 'nthreads': 8}),
+            _make_response({'resp': 'configure_done'}),
+        ])
+        mock_popen.side_effect = [proc1, proc2]
+
+        machines = [FakeMachine('host1'), FakeMachine('host2')]
+        pool = boss.WorkerPool(machines)
+        settings = {'no_lto': True, 'allow_missing': True}
+        with terminal.capture():
+            workers = pool.start_all('/tmp/repo', 'HEAD:refs/heads/work',
+                                     settings=settings)
+        self.assertEqual(len(workers), 2)
+        # Check that configure was sent (2nd response consumed)
+        self.assertEqual(proc1._resp_idx, 2)
+        self.assertEqual(proc2._resp_idx, 2)
+
+    @mock.patch('buildman.boss._run_ssh')
+    def test_start_all_init_failure(self, mock_ssh):
+        """Test start_all when init_git fails on one machine"""
+        call_count = [0]
+
+        # Fail every second SSH call so one machine's init succeeds and
+        # the other's fails
+        def _side_effect(_hostname, _cmd, **_kwargs):
+            call_count[0] += 1
+            if call_count[0] % 2 == 0:
+                raise boss.BossError('connection refused')
+            return '/tmp/bm-1'
+
+        mock_ssh.side_effect = _side_effect
+
+        machines = [FakeMachine('good'), FakeMachine('bad')]
+        pool = boss.WorkerPool(machines)
+        with terminal.capture():
+            workers = pool.start_all('/tmp/repo', 'HEAD:refs/heads/work')
+        # Only 'good' survives init phase; push and start not reached
+        # for 'bad'
+        self.assertLessEqual(len(workers), 1)
+
+    def test_quit_all(self):
+        """Test quitting all workers"""
+        pool = boss.WorkerPool([])
+        w1 = mock.Mock(spec=boss.RemoteWorker)
+        w2 = mock.Mock(spec=boss.RemoteWorker)
+        pool.workers = [w1, w2]
+        with terminal.capture():
+            pool.quit_all()
+        w1.quit.assert_called_once()
+        w2.quit.assert_called_once()
+        self.assertEqual(len(pool.workers), 0)
+
+    def test_quit_all_with_error(self):
+        """Test quit_all when a worker raises BossError"""
+        pool = boss.WorkerPool([])
+        w1 = mock.Mock(spec=boss.RemoteWorker)
+        w1.quit.side_effect = boss.BossError('connection lost')
+        pool.workers = [w1]
+        with terminal.capture():
+            pool.quit_all()
+        # A failed quit must still fall back to close()
+        w1.close.assert_called_once()
+        self.assertEqual(len(pool.workers), 0)
+
+    def test_build_boards_empty(self):
+        """Test build_boards with no workers or boards"""
+        pool = boss.WorkerPool([])
+        with terminal.capture():
+            pool.build_boards({}, None, mock.Mock())  # Should not raise
+
+
+class TestBuildBoards(unittest.TestCase):
+    """Test WorkerPool.build_boards() end-to-end"""
+
+    def setUp(self):
+        """Create a temporary output directory for each test"""
+        self._tmpdir = tempfile.mkdtemp()
+
+    def tearDown(self):
+        """Remove the temporary output directory"""
+        shutil.rmtree(self._tmpdir, ignore_errors=True)
+
+    @staticmethod
+    def _make_worker(hostname, toolchains, responses):
+        """Create a mock RemoteWorker with canned responses
+
+        Args:
+            hostname (str): Hostname for the worker
+            toolchains (dict): arch -> gcc path
+            responses (list of dict): Responses to return from recv()
+
+        Returns:
+            Mock: Mock RemoteWorker
+        """
+        wrk = mock.Mock(spec=boss.RemoteWorker)
+        wrk.hostname = hostname
+        wrk.name = hostname
+        wrk.toolchains = toolchains
+        wrk.nthreads = 4
+        wrk.max_boards = 4
+        wrk.bogomips = 5000.0
+        wrk.slots = 4
+        # Each recv() call pops the next canned response
+        wrk.recv.side_effect = list(responses)
+        return wrk
+
+    def _demand_responses(self, *results):
+        """Build recv responses for the demand-driven protocol
+
+        Returns a list starting with build_prepare_done, followed by
+        the given results, and ending with build_done.
+        """
+        return ([{'resp': 'build_prepare_done'}] + list(results)
+                + [{'resp': 'build_done'}])
+
+    def test_build_boards_success(self):
+        """Test building boards across workers with correct results"""
+        wrk1 = self._make_worker('host1', {'arm': '/usr/bin/arm-gcc'},
+            self._demand_responses(_make_result('rpi')))
+        wrk2 = self._make_worker('host2',
+            {'riscv': '/usr/bin/riscv64-gcc'},
+            self._demand_responses(_make_result('odroid')))
+
+        pool = boss.WorkerPool([])
+        pool.workers = [wrk1, wrk2]
+
+        boards = {
+            'rpi': FakeBoard('rpi', 'arm'),
+            'odroid': FakeBoard('odroid', 'riscv'),
+        }
+        builder = _make_builder(self._tmpdir)
+
+        with terminal.capture():
+            pool.build_boards(boards, None, builder)
+
+        # Both workers should have received build_prepare
+        wrk1.build_prepare.assert_called_once()
+        wrk2.build_prepare.assert_called_once()
+
+        # Verify done files were written
+        for board in ['rpi', 'odroid']:
+            done_path = os.path.join(self._tmpdir, board, 'done')
+            self.assertTrue(os.path.exists(done_path))
+
+    def test_board_stays_on_same_worker(self):
+        """Test that all commits for a board go to the same worker"""
+        commits = [types.SimpleNamespace(hash=f'abc{i}') for i in range(3)]
+
+        # Two workers with different archs, each gets one board
+        wrk1 = self._make_worker('host1', {'arm': '/usr/bin/arm-gcc'},
+            self._demand_responses(
+                *[_make_result('rpi', commit_upto=i)
+                  for i in range(3)]))
+        wrk2 = self._make_worker('host2',
+            {'riscv': '/usr/bin/riscv64-gcc'},
+            self._demand_responses(
+                *[_make_result('odroid', commit_upto=i)
+                  for i in range(3)]))
+
+        pool = boss.WorkerPool([])
+        pool.workers = [wrk1, wrk2]
+
+        boards = {
+            'rpi': FakeBoard('rpi', 'arm'),
+            'odroid': FakeBoard('odroid', 'riscv'),
+        }
+        builder = _make_builder(self._tmpdir)
+
+        with terminal.capture():
+            pool.build_boards(boards, commits, builder)
+
+        # Each worker gets one board via build_board
+        wrk1.build_board.assert_called_once()
+        wrk2.build_board.assert_called_once()
+
+        # Check each worker got the right board
+        self.assertEqual(wrk1.build_board.call_args[0][0], 'rpi')
+        self.assertEqual(wrk2.build_board.call_args[0][0], 'odroid')
+
+    def test_arch_passed(self):
+        """Test that the board's arch is sent to the worker"""
+        wrk = self._make_worker(
+            'host1',
+            {'arm': '/usr/bin/arm-linux-gnueabihf-gcc'},
+            self._demand_responses(_make_result('rpi')))
+
+        pool = boss.WorkerPool([])
+        pool.workers = [wrk]
+
+        boards = {'rpi': FakeBoard('rpi', 'arm')}
+        builder = _make_builder(self._tmpdir)
+
+        with terminal.capture():
+            pool.build_boards(boards, None, builder)
+
+        wrk.build_board.assert_called_once()
+        # build_board(board, arch)
+        self.assertEqual(wrk.build_board.call_args[0][1], 'arm')
+
+    def test_worker_error_response(self):
+        """Test that error responses are caught and stop the worker"""
+        wrk = self._make_worker(
+            'host1', {'arm': '/usr/bin/arm-gcc'}, [
+                {'resp': 'error', 'msg': 'no work directory set up'},
+            ])
+
+        pool = boss.WorkerPool([])
+        pool.workers = [wrk]
+
+        boards = {
+            'rpi': FakeBoard('rpi', 'arm'),
+            'odroid': FakeBoard('odroid', 'arm'),
+        }
+        builder = _make_builder(self._tmpdir)
+
+        with terminal.capture():
+            pool.build_boards(boards, None, builder)
+
+        # build_prepare sent, error on first recv stops the worker
+        self.assertTrue(wrk.build_prepare.called)
+
+    def test_boss_error_stops_worker(self):
+        """Test that BossError from recv() stops the worker"""
+        wrk = self._make_worker(
+            'host1', {'arm': '/usr/bin/arm-gcc'}, [])
+        # Override the canned list: every recv() raises
+        wrk.recv.side_effect = boss.BossError('connection lost')
+
+        pool = boss.WorkerPool([])
+        pool.workers = [wrk]
+
+        boards = {
+            'rpi': FakeBoard('rpi', 'arm'),
+            'odroid': FakeBoard('odroid', 'arm'),
+        }
+        builder = _make_builder(self._tmpdir)
+
+        with terminal.capture():
+            pool.build_boards(boards, None, builder)
+
+        # build_prepare sent, but only one recv attempted before error
+        self.assertTrue(wrk.build_prepare.called)
+        self.assertEqual(wrk.recv.call_count, 1)
+
+    def test_toolchain_matching(self):
+        """Test boards only go to workers with the right toolchain"""
+        wrk_arm = self._make_worker(
+            'arm-host', {'arm': '/usr/bin/arm-gcc'},
+            self._demand_responses(
+                _make_result('rpi'),
+                _make_result('odroid'),
+            ))
+        wrk_riscv = self._make_worker(
+            'rv-host', {'riscv': '/usr/bin/riscv64-gcc'},
+            self._demand_responses(
+                _make_result('qemu_rv'),
+            ))
+
+        pool = boss.WorkerPool([])
+        pool.workers = [wrk_arm, wrk_riscv]
+
+        boards = {
+            'rpi': FakeBoard('rpi', 'arm'),
+            'odroid': FakeBoard('odroid', 'arm'),
+            'qemu_rv': FakeBoard('qemu_rv', 'riscv'),
+        }
+        builder = _make_builder(self._tmpdir)
+
+        with terminal.capture():
+            pool.build_boards(boards, None, builder)
+
+        # arm boards go to wrk_arm, riscv to wrk_riscv
+        arm_boards = {call[0][0]
+                      for call in wrk_arm.build_board.call_args_list}
+        rv_boards = {call[0][0]
+                     for call in wrk_riscv.build_board.call_args_list}
+        self.assertEqual(arm_boards, {'rpi', 'odroid'})
+        self.assertEqual(rv_boards, {'qemu_rv'})
+
+    def test_sandbox_any_worker(self):
+        """Test that sandbox boards can go to any worker"""
+        wrk = self._make_worker(
+            'host1', {'arm': '/usr/bin/arm-gcc'},
+            self._demand_responses(
+                _make_result('sandbox'),
+            ))
+
+        pool = boss.WorkerPool([])
+        pool.workers = [wrk]
+
+        boards = {'sandbox': FakeBoard('sandbox', 'sandbox')}
+        builder = _make_builder(self._tmpdir)
+
+        with terminal.capture():
+            pool.build_boards(boards, None, builder)
+
+        # sandbox should be sent to the worker even though it has
+        # no 'sandbox' toolchain — sandbox uses the host compiler
+        wrk.build_board.assert_called_once()
+        self.assertEqual(wrk.build_board.call_args[0][1], 'sandbox')
+
+    def test_skip_done_boards(self):
+        """Test that already-done boards are skipped without force"""
+        # Create a done file for 'rpi'
+        done_path = os.path.join(self._tmpdir, 'rpi_done')
+        tools.write_file(done_path, '0\n', binary=False)
+
+        wrk = self._make_worker(
+            'host1', {'arm': '/usr/bin/arm-gcc'},
+            self._demand_responses(
+                _make_result('odroid'),
+            ))
+
+        pool = boss.WorkerPool([])
+        pool.workers = [wrk]
+
+        boards = {
+            'rpi': FakeBoard('rpi', 'arm'),
+            'odroid': FakeBoard('odroid', 'arm'),
+        }
+        builder = _make_builder(self._tmpdir, force_build=False)
+        # Route done-file lookups to the flat <board>_done files above
+        builder.get_done_file.side_effect = (
+            lambda c, b: os.path.join(self._tmpdir, f'{b}_done'))
+
+        with terminal.capture():
+            pool.build_boards(boards, None, builder)
+
+        # Only odroid should be built (rpi has done file)
+        wrk.build_board.assert_called_once()
+        self.assertEqual(wrk.build_board.call_args[0][0], 'odroid')
+
+    def test_no_capable_worker(self):
+        """Test boards with no capable worker are silently skipped"""
+        wrk = self._make_worker(
+            'host1', {'arm': '/usr/bin/arm-gcc'},
+            [{'resp': 'build_prepare_done'}])
+
+        pool = boss.WorkerPool([])
+        pool.workers = [wrk]
+
+        boards = {'qemu_rv': FakeBoard('qemu_rv', 'riscv')}
+        builder = _make_builder(self._tmpdir)
+
+        with terminal.capture():
+            pool.build_boards(boards, None, builder)
+
+        # No build_board should be sent (no riscv worker)
+        wrk.build_board.assert_not_called()
+
+    def test_progress_updated(self):
+        """Test that process_result is called for each build result"""
+        wrk = self._make_worker(
+            'host1', {'arm': '/usr/bin/arm-gcc'},
+            self._demand_responses(
+                _make_result('rpi'),
+                _make_result('odroid'),
+            ))
+
+        pool = boss.WorkerPool([])
+        pool.workers = [wrk]
+
+        brd_rpi = FakeBoard('rpi', 'arm')
+        brd_odroid = FakeBoard('odroid', 'arm')
+        boards = {'rpi': brd_rpi, 'odroid': brd_odroid}
+        builder = _make_builder(self._tmpdir)
+
+        with terminal.capture():
+            pool.build_boards(boards, None, builder)
+
+        # process_result should be called for each board
+        self.assertEqual(builder.process_result.call_count, 2)
+        # Each result should have remote set to hostname
+        for call in builder.process_result.call_args_list:
+            result = call[0][0]
+            self.assertEqual(result.remote, 'host1')
+
+    def test_log_files_created(self):
+        """Test that worker log files are created in the output dir"""
+        wrk = self._make_worker(
+            'myhost', {'arm': '/usr/bin/arm-gcc'},
+            self._demand_responses(
+                _make_result('rpi'),
+            ))
+
+        pool = boss.WorkerPool([])
+        pool.workers = [wrk]
+
+        boards = {'rpi': FakeBoard('rpi', 'arm')}
+        builder = _make_builder(self._tmpdir)
+
+        with terminal.capture():
+            pool.build_boards(boards, None, builder)
+
+        # Log file is named after the worker and records both directions
+        # of the protocol ('>>' sent, '<<' received)
+        log_path = os.path.join(self._tmpdir, 'worker-myhost.log')
+        self.assertTrue(os.path.exists(log_path))
+        content = tools.read_file(log_path, binary=False)
+        self.assertIn('>> 1 boards', content)
+        self.assertIn('<< ', content)
+        self.assertIn('build_result', content)
+
+    def test_heartbeat_resets_timeout(self):
+        """Test that heartbeat messages are accepted without error"""
+        wrk = self._make_worker(
+            'host1', {'arm': '/usr/bin/arm-gcc'}, [
+                {'resp': 'build_prepare_done'},
+                {'resp': 'heartbeat', 'board': 'rpi', 'thread': 0},
+                _make_result('rpi'),
+                {'resp': 'build_done'},
+            ])
+
+        pool = boss.WorkerPool([])
+        pool.workers = [wrk]
+
+        boards = {'rpi': FakeBoard('rpi', 'arm')}
+        builder = _make_builder(self._tmpdir)
+
+        with terminal.capture():
+            pool.build_boards(boards, None, builder)
+
+        # The heartbeat should be silently consumed, result processed
+        self.assertEqual(builder.process_result.call_count, 1)
+
+    def test_build_done_stops_worker(self):
+        """Test that build_done ends collection without timeout"""
+        wrk = self._make_worker(
+            'host1', {'arm': '/usr/bin/arm-gcc'}, [
+                {'resp': 'build_prepare_done'},
+                _make_result('rpi'),
+                # Worker had 2 boards but only 1 result, then
+                # build_done
+                {'resp': 'build_done', 'exceptions': 1},
+                # Final build_done response after boss sends
+                # build_done
+                {'resp': 'build_done'},
+            ])
+
+        pool = boss.WorkerPool([])
+        pool.workers = [wrk]
+
+        boards = {
+            'rpi': FakeBoard('rpi', 'arm'),
+            'odroid': FakeBoard('odroid', 'arm'),
+        }
+        builder = _make_builder(self._tmpdir)
+
+        with terminal.capture():
+            pool.build_boards(boards, None, builder)
+
+        # Only 1 result processed (odroid was lost to a thread
+        # exception)
+        self.assertEqual(builder.process_result.call_count, 1)
+
+
class TestPipelinedBuilds(unittest.TestCase):
    """Test pipelined builds with multiple slots"""

    def setUp(self):
        self._tmpdir = tempfile.mkdtemp()

    def tearDown(self):
        shutil.rmtree(self._tmpdir, ignore_errors=True)

    def test_multiple_boards(self):
        """Test that boss sends build_prepare then build_board for each"""
        targets = ['b1', 'b2', 'b3', 'b4']
        responses = [{'resp': 'build_prepare_done'}]
        responses.extend(_make_result(name) for name in targets)
        responses.append({'resp': 'build_done'})
        worker = TestBuildBoards._make_worker(
            'host1', {'arm': '/usr/bin/arm-gcc'}, responses)

        wpool = boss.WorkerPool([])
        wpool.workers = [worker]

        board_dict = {name: FakeBoard(name, 'arm') for name in targets}
        bldr = _make_builder(self._tmpdir)
        with terminal.capture():
            wpool.build_boards(board_dict, None, bldr)

        # build_prepare called once, build_board called 4 times
        worker.build_prepare.assert_called_once()
        dispatched = {call[0][0]
                      for call in worker.build_board.call_args_list}
        self.assertEqual(dispatched, set(targets))
        # All 4 results collected
        self.assertEqual(bldr.process_result.call_count, 4)

    @mock.patch('subprocess.Popen')
    def test_slots_from_ready(self, mock_popen):
        """Test that slots is read from the worker's ready response"""
        proc = FakeProc([
            _make_response(
                {'resp': 'ready', 'nthreads': 20, 'slots': 5}),
        ])
        worker = _start_worker('host1', mock_popen, proc)
        self.assertEqual(worker.nthreads, 20)
        self.assertEqual(worker.slots, 5)

    @mock.patch('subprocess.Popen')
    def test_slots_default(self, mock_popen):
        """Test that slots defaults to 1 for old workers"""
        proc = FakeProc([
            _make_response({'resp': 'ready', 'nthreads': 8}),
        ])
        worker = _start_worker('host1', mock_popen, proc)
        self.assertEqual(worker.slots, 1)
+
+
class TestBuildTimeout(unittest.TestCase):
    """Test that the build timeout prevents hangs"""

    def setUp(self):
        self._tmpdir = tempfile.mkdtemp()
        # Save the module-level timeout so the test can shorten it and
        # tearDown() can put it back
        self._orig_timeout = boss.BUILD_TIMEOUT

    def tearDown(self):
        shutil.rmtree(self._tmpdir, ignore_errors=True)
        # Restore the global so later tests see the normal timeout
        boss.BUILD_TIMEOUT = self._orig_timeout

    def test_recv_timeout(self):
        """Test that a hung worker times out instead of blocking"""

        # Use a very short timeout so the test runs quickly
        boss.BUILD_TIMEOUT = 0.5

        wrk = TestBuildBoards._make_worker(
            'slowhost', {'arm': '/usr/bin/arm-gcc'}, [])
        wrk.nthreads = 1
        wrk.max_boards = 1
        wrk.slots = 1

        # Simulate a worker that never responds: recv blocks forever
        wrk.recv.side_effect = lambda: time.sleep(60)

        pool = boss.WorkerPool([])
        pool.workers = [wrk]

        boards = {'rpi': FakeBoard('rpi', 'arm')}
        builder = _make_builder(self._tmpdir)

        # Time the call to prove the timeout (not the 60s sleep) ended it
        start = time.monotonic()
        with terminal.capture():
            pool.build_boards(boards, None, builder)
        elapsed = time.monotonic() - start

        # Should complete quickly (within a few seconds), not hang
        self.assertLess(elapsed, 10)

        # No results should have been processed
        builder.process_result.assert_not_called()
+
+
class TestMachineMaxBoards(unittest.TestCase):
    """Test per-machine max_boards config"""

    def test_max_boards_from_config(self):
        """Test [machine:name] section sets max_boards"""
        bsettings.setup('')
        bsettings.settings.read_string("""
[machines]
ruru
weka

[machine:ruru]
max_boards = 64
""")
        mpool = machine.MachinePool()
        mpool._load_from_config()
        # Index the loaded machines by name for easy lookup
        machines = {mach.name: mach for mach in mpool.machines}
        self.assertEqual(machines['ruru'].max_boards, 64)
        self.assertEqual(machines['weka'].max_boards, 0)

    def test_max_boards_default(self):
        """Test max_boards is 0 when no per-machine section exists"""
        self.assertEqual(machine.Machine('host1').max_boards, 0)
+
+
class TestGccVersion(unittest.TestCase):
    """Test gcc_version()"""

    def test_buildman_toolchain(self):
        """Test extracting version from a buildman-fetched toolchain"""
        fname = ('/home/sglass/.buildman-toolchains/gcc-13.1.0-nolibc/'
                 'aarch64-linux/bin/aarch64-linux-gcc')
        self.assertEqual('gcc-13.1.0-nolibc', machine.gcc_version(fname))

    def test_different_version(self):
        """Test a different gcc version"""
        fname = ('/home/sglass/.buildman-toolchains/gcc-11.1.0-nolibc/'
                 'aarch64-linux/bin/aarch64-linux-gcc')
        self.assertEqual('gcc-11.1.0-nolibc', machine.gcc_version(fname))

    def test_system_gcc(self):
        """Test a system gcc with no version directory"""
        self.assertIsNone(machine.gcc_version('/usr/bin/gcc'))

    def test_empty(self):
        """Test an empty path"""
        self.assertIsNone(machine.gcc_version(''))
+
+
+
+class _PipelineWorker:  # pylint: disable=R0902
+    """Mock worker that simulates the demand-driven build protocol
+
+    Responds to build_prepare, build_board, and build_done commands
+    via the recv() queue, simulating real worker behaviour.
+    """
+
+    def __init__(self, nthreads, max_boards=0):
+        self.hostname = 'sim-host'
+        self.name = 'sim-host'
+        self.toolchains = {'arm': '/usr/bin/arm-gcc'}
+        self.nthreads = nthreads
+        self.max_boards = max_boards or nthreads
+        self.slots = nthreads
+        self.bogomips = 5000.0
+        self.closing = False
+
+        self._ready = queue.Queue()
+        self._commits = None
+
+    def build_prepare(self, commits):
+        """Accept prepare command and queue ready response"""
+        self._commits = commits
+        self._ready.put({'resp': 'build_prepare_done'})
+
+    def build_board(self, board, _arch):
+        """Queue results for this board across all commits"""
+
+        def _produce():
+            for cu in range(len(self._commits)):
+                time.sleep(random.uniform(0.001, 0.005))
+                self._ready.put({
+                    'resp': 'build_result',
+                    'board': board,
+                    'commit_upto': cu,
+                    'return_code': 0,
+                    'stderr': '',
+                    'stdout': '',
+                })
+
+        threading.Thread(target=_produce, daemon=True).start()
+
+    def build_done(self):
+        """Queue the build_done response after a short delay"""
+        def _respond():
+            time.sleep(0.05)
+            self._ready.put({'resp': 'build_done'})
+        threading.Thread(target=_respond, daemon=True).start()
+
+    def recv(self):
+        """Wait for the next result"""
+        return self._ready.get()
+
+
class TestBuildBoardsUtilisation(unittest.TestCase):
    """Test that build_boards dispatches correctly to workers

    The boss sends one build_boards command per worker.  The mock
    worker simulates board-first scheduling and produces results.
    """

    def setUp(self):
        # Fresh scratch directory for the mock builder's output
        self._tmpdir = tempfile.mkdtemp()

    def tearDown(self):
        shutil.rmtree(self._tmpdir, ignore_errors=True)

    def _run_build(self, nboards, ncommits, nthreads, max_boards=0):
        """Run a build and return the mock worker

        Args:
            nboards (int): Number of fake boards to generate
            ncommits (int): Number of fake commits to build
            nthreads (int): Thread count for the simulated worker
            max_boards (int): Cap on boards in flight (0 means use
                nthreads)

        Returns:
            _PipelineWorker: The worker, so tests can inspect its queue
        """
        wrk = _PipelineWorker(nthreads, max_boards=max_boards)

        pool = boss.WorkerPool([])
        pool.workers = [wrk]

        boards = {}
        for i in range(nboards):
            target = f'board{i}'
            boards[target] = FakeBoard(target, 'arm')

        commits = [types.SimpleNamespace(hash=f'commit{i}')
                   for i in range(ncommits)]

        builder = mock.Mock()
        builder.force_build = True
        builder.base_dir = self._tmpdir
        builder.count = 0
        # Each board writes into its own directory under tmpdir
        builder.get_build_dir.side_effect = (
            lambda c, b: os.path.join(self._tmpdir, b))

        with terminal.capture():
            pool.build_boards(boards, commits, builder)
        return wrk

    def test_all_results_collected(self):
        """Verify boss collects all board x commit results"""
        nboards = 30
        ncommits = 10
        wrk = self._run_build(nboards, ncommits, nthreads=8)

        # The ready queue should be empty (boss drained everything)
        self.assertTrue(wrk._ready.empty())

    def test_large_scale(self):
        """Test at realistic scale: 200 boards x 10 commits"""
        wrk = self._run_build(nboards=200, ncommits=10,
                              nthreads=32)
        self.assertTrue(wrk._ready.empty())

    def test_max_boards_caps_batch(self):
        """Test that max_boards limits initial and in-flight boards"""
        wrk = self._run_build(nboards=30, ncommits=5,
                              nthreads=32, max_boards=8)
        self.assertTrue(wrk._ready.empty())
        # Worker should still receive all boards despite the cap
        self.assertEqual(wrk.max_boards, 8)
+
+
class TestFormatBytes(unittest.TestCase):
    """Test _format_bytes()"""

    def test_format_bytes(self):
        """Test bytes, KB and MB ranges"""
        cases = [
            (0, '0B'),
            (1023, '1023B'),
            (1024, '1.0KB'),
            (1536, '1.5KB'),
            (1024 * 1024, '1.0MB'),
            (5 * 1024 * 1024, '5.0MB'),
        ]
        for size, expected in cases:
            self.assertEqual(boss._format_bytes(size), expected)
+
+
class TestWriteRemoteResultExtra(unittest.TestCase):
    """Further tests for _write_remote_result()

    These methods previously followed a module-level comment, which does
    not close a class suite, so Python silently attached them to the
    preceding class (TestFormatBytes).  Give them their own class so
    they are grouped with the behaviour they actually test.
    """

    def test_with_stderr(self):
        """Test writing result with stderr"""
        bldr = mock.Mock()
        build_dir = tempfile.mkdtemp()
        # addCleanup ensures removal even if an assertion fails first
        self.addCleanup(shutil.rmtree, build_dir, ignore_errors=True)
        bldr.get_build_dir.return_value = build_dir
        resp = {
            'board': 'sandbox',
            'commit_upto': 0,
            'return_code': 2,
            'stderr': 'error: missing header\n',
            'stdout': '',
        }
        boss._write_remote_result(bldr, resp, {'sandbox': mock.Mock()},
                                  'host1')
        # A failing build must leave its stderr in the 'err' file
        err_path = os.path.join(build_dir, 'err')
        self.assertIn('error:',
                      tools.read_file(err_path, binary=False))

    def test_removes_stale_err(self):
        """Test that stale err file is removed on success"""
        bldr = mock.Mock()
        build_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, build_dir, ignore_errors=True)
        bldr.get_build_dir.return_value = build_dir
        err_path = os.path.join(build_dir, 'err')
        # Pre-create an 'err' file from an earlier (failed) build
        tools.write_file(err_path, 'old error', binary=False)
        resp = {
            'board': 'sandbox',
            'commit_upto': 0,
            'return_code': 0,
            'stderr': '',
            'stdout': '',
        }
        boss._write_remote_result(bldr, resp, {'sandbox': mock.Mock()},
                                  'host1')
        # Success must clear the stale error file
        self.assertFalse(os.path.exists(err_path))
+
+
class TestRemoteWorkerMethods(unittest.TestCase):
    """Test RemoteWorker send/recv/close methods"""

    def test_send(self):
        """Test _send writes JSON to stdin"""
        wrk = _make_worker()
        wrk._proc.stdin = mock.Mock()
        wrk._send({'cmd': 'quit'})
        # Exactly one write, containing the serialised command
        wrk._proc.stdin.write.assert_called_once()
        data = wrk._proc.stdin.write.call_args[0][0]
        self.assertIn(b'quit', data)

    def test_send_broken_pipe(self):
        """Test _send raises BrokenPipeError on broken pipe"""
        wrk = _make_worker()
        wrk._proc.stdin.write.side_effect = BrokenPipeError()
        with self.assertRaises(BrokenPipeError):
            wrk._send({'cmd': 'quit'})

    def test_build_commands(self):
        """Test build_prepare, build_board and build_done commands"""
        wrk = _make_worker()
        wrk._send = mock.Mock()

        # Each helper should emit the matching protocol command
        wrk.build_prepare(['abc123'])
        self.assertEqual(wrk._send.call_args[0][0]['cmd'],
                         'build_prepare')

        wrk._send.reset_mock()
        wrk.build_board('sandbox', 'sandbox')
        self.assertEqual(wrk._send.call_args[0][0]['cmd'],
                         'build_board')

        wrk._send.reset_mock()
        wrk.build_done()
        self.assertEqual(wrk._send.call_args[0][0]['cmd'],
                         'build_done')

    def test_quit(self):
        """Test quit sends command and closes, handles errors"""
        wrk = _make_worker()
        wrk._send = mock.Mock()
        wrk._recv = mock.Mock(return_value={'resp': 'quit_ack'})
        wrk.close = mock.Mock()
        wrk.quit()
        wrk._send.assert_called_once()
        wrk.close.assert_called_once()

        # Error path: BossError during quit still closes
        wrk2 = _make_worker()
        wrk2._send = mock.Mock(side_effect=boss.BossError('gone'))
        wrk2.close = mock.Mock()
        wrk2.quit()
        wrk2.close.assert_called_once()

    def test_close_idempotent(self):
        """Test close can be called multiple times"""
        wrk = _make_worker()
        wrk.close()
        self.assertIsNone(wrk._proc)
        wrk.close()  # should not raise

    @mock.patch('buildman.boss._run_ssh')
    def test_remove_lock(self, mock_ssh):
        """Test remove_lock: SSH call, no work_dir, SSH failure"""
        wrk = _make_worker()
        wrk.work_dir = '/tmp/bm'
        wrk.remove_lock()
        mock_ssh.assert_called_once()

        # No work_dir: does nothing
        wrk2 = _make_worker()
        wrk2.work_dir = ''
        wrk2.remove_lock()

        # SSH failure: silently ignored
        mock_ssh.side_effect = boss.BossError('gone')
        wrk3 = _make_worker()
        wrk3.work_dir = '/tmp/bm'
        wrk3.remove_lock()
+
+
+
+
+
class TestWorkerPoolCapacity(unittest.TestCase):
    """Test WorkerPool capacity and arch assignment"""

    @staticmethod
    def _pool_with(workers):
        """Build a WorkerPool without running __init__"""
        pool = boss.WorkerPool.__new__(boss.WorkerPool)
        pool.workers = workers
        return pool

    def test_get_capacity(self):
        """Test capacity calculation"""
        worker = mock.Mock(nthreads=8, bogomips=5000.0)
        self.assertEqual(
            boss.WorkerPool._get_capacity(worker), 40000.0)

    def test_get_capacity_no_bogomips(self):
        """Test capacity with no bogomips falls back to nthreads"""
        worker = mock.Mock(nthreads=4, bogomips=0)
        self.assertEqual(boss.WorkerPool._get_capacity(worker), 4.0)

    def test_get_worker_for_arch(self):
        """Test arch-based worker selection"""
        fast = mock.Mock(nthreads=8, bogomips=5000.0,
                         toolchains={'arm': '/gcc'})
        slow = mock.Mock(nthreads=4, bogomips=5000.0,
                         toolchains={'arm': '/gcc'})
        pool = self._pool_with([fast, slow])
        assigned = {}

        # First assignment should go to the higher-capacity worker
        self.assertEqual(pool._get_worker_for_arch('arm', assigned),
                         fast)

        # Second should go to the other one (fast already has 1)
        self.assertEqual(pool._get_worker_for_arch('arm', assigned),
                         slow)

    def test_get_worker_sandbox(self):
        """Test sandbox goes to any worker"""
        worker = mock.Mock(nthreads=4, bogomips=1000.0, toolchains={})
        pool = self._pool_with([worker])
        assigned = {}
        self.assertEqual(pool._get_worker_for_arch('sandbox', assigned),
                         worker)

    def test_get_worker_no_capable(self):
        """Test returns None when no worker supports arch"""
        worker = mock.Mock(nthreads=4, bogomips=1000.0,
                           toolchains={'arm': '/gcc'})
        pool = self._pool_with([worker])
        self.assertIsNone(
            pool._get_worker_for_arch('mips', {}))
+
+
class TestBossLog(unittest.TestCase):
    """Test _BossLog"""

    @staticmethod
    def _fake_worker():
        """Create a mock worker whose .name attribute reads 'host1'"""
        wrk = mock.Mock(name='host1', nthreads=4)
        wrk.name = 'host1'
        return wrk

    def test_log_and_close(self):
        """Test logging and closing"""
        with tempfile.TemporaryDirectory() as tmpdir:
            blog = boss._BossLog(tmpdir)
            blog.init_worker(self._fake_worker())
            blog.log('test message')
            blog.record_sent('host1', 3)
            blog.record_recv('host1', load_avg=2.5)
            blog.close()

            text = tools.read_file(
                os.path.join(tmpdir, '.buildman.log'), binary=False)
            self.assertIn('test message', text)

    def test_log_status(self):
        """Test log_status writes counts"""
        with tempfile.TemporaryDirectory() as tmpdir:
            blog = boss._BossLog(tmpdir)
            blog.init_worker(self._fake_worker())
            blog.record_sent('host1', 5)
            blog.record_recv('host1')
            blog.record_recv('host1')
            blog.log_status()
            blog.close()

            text = tools.read_file(
                os.path.join(tmpdir, '.buildman.log'), binary=False)
            self.assertIn('host1', text)

    def test_start_timer(self):
        """Test start_timer and close with elapsed"""
        with tempfile.TemporaryDirectory() as tmpdir:
            blog = boss._BossLog(tmpdir)
            blog.start_timer()
            blog.close()
+
+
class TestDispatchContext(unittest.TestCase):
    """Test _DispatchContext

    The original tests each repeated the same worker/builder/context
    construction; it is factored into two private helpers here.
    """

    @staticmethod
    def _new_worker(nthreads):
        """Create a mock worker called 'host1' with the given threads"""
        wrk = mock.Mock(nthreads=nthreads)
        wrk.name = 'host1'
        return wrk

    @staticmethod
    def _new_ctx(tmpdir, wrk):
        """Create a _DispatchContext logging under tmpdir for wrk"""
        builder = mock.Mock()
        builder.base_dir = tmpdir
        return boss._DispatchContext(
            workers=[wrk], builder=builder,
            board_selected={}, boss_log=mock.Mock())

    def test_update_progress_build_started(self):
        """Test worktree progress: build_started"""
        wrk = self._new_worker(4)
        with tempfile.TemporaryDirectory() as tmpdir:
            ctx = self._new_ctx(tmpdir, wrk)
            resp = {'resp': 'build_started', 'num_threads': 8}
            self.assertTrue(ctx.update_progress(resp, wrk))
            ctx.close()

    def test_update_progress_worktree_created(self):
        """Test worktree progress: worktree_created"""
        wrk = self._new_worker(2)
        with tempfile.TemporaryDirectory() as tmpdir:
            ctx = self._new_ctx(tmpdir, wrk)
            # worktree_created is only meaningful after build_started
            ctx.update_progress(
                {'resp': 'build_started', 'num_threads': 2}, wrk)
            self.assertTrue(ctx.update_progress(
                {'resp': 'worktree_created'}, wrk))
            ctx.close()

    def test_update_progress_other(self):
        """Test non-progress messages return False"""
        wrk = self._new_worker(4)
        with tempfile.TemporaryDirectory() as tmpdir:
            ctx = self._new_ctx(tmpdir, wrk)
            self.assertFalse(ctx.update_progress(
                {'resp': 'build_result'}, wrk))
            ctx.close()

    def test_log(self):
        """Test per-worker log file"""
        wrk = self._new_worker(4)
        with tempfile.TemporaryDirectory() as tmpdir:
            ctx = self._new_ctx(tmpdir, wrk)
            ctx.log(wrk, '>>>', 'test message')
            ctx.close()
            content = tools.read_file(
                os.path.join(tmpdir, 'worker-host1.log'),
                binary=False)
            self.assertIn('test message', content)
+
+
class TestRemoteWorkerClose(unittest.TestCase):
    """Test RemoteWorker.close() error paths"""

    def test_close_error_paths(self):
        """Test close: stdin OSError, terminate timeout, kill timeout"""
        # stdin.close() raises OSError: close() must still finish
        wrk = _make_worker()
        wrk._proc.stdin.close.side_effect = OSError('broken')
        wrk.close()
        self.assertIsNone(wrk._proc)

        # wait() times out, terminate succeeds
        wrk2 = _make_worker()
        wrk2._proc.wait.side_effect = [
            subprocess.TimeoutExpired('ssh', 2),
            None,
        ]
        wrk2.close()
        self.assertIsNone(wrk2._proc)

        # wait() times out twice, falls back to kill
        wrk3 = _make_worker()
        wrk3._proc.wait.side_effect = [
            subprocess.TimeoutExpired('ssh', 2),
            subprocess.TimeoutExpired('ssh', 3),
        ]
        wrk3.close()
        self.assertIsNone(wrk3._proc)

    def test_configure_rejected(self):
        """Test configure raises on rejection"""
        wrk = _make_worker()
        wrk._send = mock.Mock()
        # Worker answers with an error response: configure() must raise
        wrk._recv = mock.Mock(return_value={'resp': 'error', 'msg': 'bad'})
        with self.assertRaises(boss.BossError):
            wrk.configure({'no_lto': True})

    def test_get_stderr(self):
        """Test _get_stderr returns last non-empty line, or empty"""
        wrk = _make_worker()
        wrk._stderr_thread = mock.Mock()
        wrk._stderr_lines = ['first', '', 'last error', '']
        self.assertEqual(wrk._get_stderr(), 'last error')

        # With nothing captured an empty string comes back
        wrk._stderr_lines = []
        self.assertEqual(wrk._get_stderr(), '')
+
+
class TestWriteRemoteResultSizes(unittest.TestCase):
    """Size-file handling test for _write_remote_result()

    This method previously followed a module-level comment, which does
    not close a class suite, so Python silently attached it to the
    preceding class (TestRemoteWorkerClose).  Give it its own class so
    it is grouped with the behaviour it actually tests.
    """

    def test_sizes_with_header(self):
        """Test that size output header line is stripped"""
        bldr = mock.Mock()
        build_dir = tempfile.mkdtemp()
        # addCleanup ensures removal even if an assertion fails first
        self.addCleanup(shutil.rmtree, build_dir, ignore_errors=True)
        bldr.get_build_dir.return_value = build_dir
        resp = {
            'board': 'sandbox',
            'commit_upto': 0,
            'return_code': 0,
            'stderr': '',
            'stdout': '',
            'sizes': {
                'raw': ('   text    data     bss     dec     hex\n'
                        '  1000     200     100    1300     514\n')},
        }
        boss._write_remote_result(bldr, resp, {'sandbox': mock.Mock()},
                                  'host1')
        sizes_content = tools.read_file(
            os.path.join(build_dir, 'sizes'), binary=False)
        # The 'text data bss...' header must be dropped, data kept
        self.assertNotIn('text', sizes_content)
        self.assertIn('1000', sizes_content)
+
+
+
+
+
class TestWorkerPoolEdgeCases(unittest.TestCase):
    """Test WorkerPool edge cases"""

    def test_print_transfer_empty(self):
        """Test print_transfer_summary with no workers"""
        pool = boss.WorkerPool([])
        with terminal.capture():
            pool.print_transfer_summary()

    def test_quit_all_with_boss_log(self):
        """Test quit_all closes boss_log"""
        pool = boss.WorkerPool([])
        blog = mock.Mock()
        pool._boss_log = blog
        with terminal.capture():
            pool.quit_all()
        blog.log.assert_called()
        blog.close.assert_called_once()

    def test_build_boards_with_local_count(self):
        """Test build_boards progress includes local count"""
        pool = boss.WorkerPool([])
        wrk = mock.Mock(
            nthreads=4, bogomips=1000.0, slots=2,
            toolchains={'arm': '/gcc'}, closing=False)
        wrk.name = 'host1'
        wrk.recv.side_effect = [
            {'resp': 'build_prepare_done'},
            {'resp': 'build_result', 'board': 'rpi',
             'commit_upto': 0, 'return_code': 0,
             'stderr': '', 'stdout': ''},
            {'resp': 'build_done'},
        ]
        pool.workers = [wrk]

        builder = mock.Mock()
        builder.force_build = True
        # Use addCleanup so the temporary directories are removed even
        # if build_boards raises (the previous trailing rmtree calls
        # leaked them on failure)
        builder.base_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, builder.base_dir,
                        ignore_errors=True)
        builder.count = 0
        builder.get_build_dir.return_value = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree,
                        builder.get_build_dir.return_value,
                        ignore_errors=True)

        boards = {'rpi': mock.Mock(target='rpi', arch='arm')}
        with terminal.capture():
            pool.build_boards(boards, None, builder, local_count=5)

    def test_get_capacity_no_bogomips(self):
        """Test _get_worker_for_arch falls back when bogomips is 0"""
        w1 = mock.Mock(nthreads=4, bogomips=0, toolchains={'arm': '/gcc'})
        pool = boss.WorkerPool.__new__(boss.WorkerPool)
        pool.workers = [w1]
        wrk = pool._get_worker_for_arch('arm', {})
        self.assertEqual(wrk, w1)

    def test_close_all(self):
        """Test close_all with boss_log"""
        pool = boss.WorkerPool([])
        wrk = mock.Mock()
        wrk.closing = False
        wrk.bytes_sent = 100
        wrk.bytes_recv = 200
        wrk.name = 'host1'
        pool.workers = [wrk]
        blog = mock.Mock()
        pool._boss_log = blog
        with terminal.capture():
            pool.close_all()
        # Every worker must be closed, its lock removed, and the
        # worker list emptied
        wrk.close.assert_called()
        wrk.remove_lock.assert_called()
        blog.close.assert_called_once()
        self.assertEqual(len(pool.workers), 0)
+
+
class TestDispatchContextRecv(unittest.TestCase):
    """Test _DispatchContext.recv() error paths"""


    @mock.patch.object(boss, 'BUILD_TIMEOUT', 0.01)
    def test_recv_timeout(self):
        """Test recv returns None on timeout"""
        ctx, wrk, tmpdir = _make_ctx()
        recv_q = queue.Queue()  # empty queue
        with terminal.capture():
            result = ctx.recv(wrk, recv_q)
        # Nothing arrives within BUILD_TIMEOUT, so recv gives up
        self.assertIsNone(result)
        ctx.close()
        shutil.rmtree(tmpdir)

    def test_recv_error_response(self):
        """Test recv returns None on error response"""
        ctx, wrk, tmpdir = _make_ctx()
        recv_q = queue.Queue()
        # The reader thread signals a transport-level error
        recv_q.put(('error', 'something broke'))
        with terminal.capture():
            result = ctx.recv(wrk, recv_q)
        self.assertIsNone(result)
        ctx.close()
        shutil.rmtree(tmpdir)

    def test_recv_worker_error(self):
        """Test recv returns None on worker error response"""
        ctx, wrk, tmpdir = _make_ctx(
            {'rpi': mock.Mock(target='rpi', arch='arm')})
        recv_q = queue.Queue()
        # The worker itself reports a protocol-level error
        recv_q.put(('resp', {'resp': 'error', 'msg': 'oops'}))
        with terminal.capture():
            result = ctx.recv(wrk, recv_q)
        self.assertIsNone(result)
        ctx.close()
        shutil.rmtree(tmpdir)

    def test_write_result_exception(self):
        """Test write_result catches exceptions"""
        ctx, wrk, tmpdir = _make_ctx(
            {'rpi': mock.Mock(target='rpi', arch='arm')})
        # An exception while writing must be reported as failure
        # (False), not propagated
        ctx.builder.get_build_dir.side_effect = RuntimeError('boom')
        resp = {'board': 'sandbox', 'commit_upto': 0,
                'return_code': 0}
        with terminal.capture():
            result = ctx.write_result(wrk, resp)
        self.assertFalse(result)
        ctx.close()
        shutil.rmtree(tmpdir)

    def test_wait_for_prepare(self):
        """Test wait_for_prepare with prepare_done"""
        ctx, wrk, tmpdir = _make_ctx()
        recv_q = queue.Queue()
        recv_q.put(('resp', {'resp': 'build_prepare_done'}))
        result = ctx.wait_for_prepare(wrk, recv_q)
        self.assertTrue(result)
        ctx.close()
        shutil.rmtree(tmpdir)

    @mock.patch.object(boss, 'BUILD_TIMEOUT', 0.01)
    def test_wait_for_prepare_timeout(self):
        """Test wait_for_prepare returns False on timeout"""
        ctx, wrk, tmpdir = _make_ctx()
        recv_q = queue.Queue()
        with terminal.capture():
            result = ctx.wait_for_prepare(wrk, recv_q)
        self.assertFalse(result)
        ctx.close()
        shutil.rmtree(tmpdir)

    def test_send_batch_error(self):
        """Test send_batch returns -1 on BossError"""
        ctx, wrk, tmpdir = _make_ctx(
            {'rpi': mock.Mock(target='rpi', arch='arm')})
        # A dead connection surfaces as BossError from build_board
        wrk.build_board.side_effect = boss.BossError('gone')
        brd = mock.Mock(target='rpi', arch='arm')
        result = ctx.send_batch(wrk, [brd])
        self.assertEqual(result, -1)
        ctx.close()
        shutil.rmtree(tmpdir)

    def test_collect_results_heartbeat(self):
        """Test collect_results skips heartbeat"""
        ctx, wrk, tmpdir = _make_ctx(
            {'rpi': mock.Mock(target='rpi', arch='arm')})
        ctx.board_selected = {'rpi': mock.Mock(target='rpi', arch='arm')}
        build_dir = tempfile.mkdtemp()
        ctx.builder.get_build_dir.return_value = build_dir
        recv_q = queue.Queue()
        # A heartbeat must be consumed without counting as a result
        recv_q.put(('resp', {'resp': 'heartbeat'}))
        recv_q.put(('resp', {'resp': 'build_result', 'board': 'rpi',
                             'commit_upto': 0, 'return_code': 0,
                             'stderr': '', 'stdout': ''}))
        recv_q.put(('resp', {'resp': 'build_done'}))
        state = boss.DemandState(
            sent=1, ncommits=1, grab_func=lambda w, c: [])
        with terminal.capture():
            result = ctx.collect_results(wrk, recv_q, state)
        self.assertTrue(result)
        ctx.close()
        shutil.rmtree(tmpdir)
        shutil.rmtree(build_dir)
+
+
class TestForwardStderr(unittest.TestCase):
    """Test RemoteWorker._forward_stderr()"""

    @staticmethod
    def _bare_worker():
        """Create a RemoteWorker without running __init__"""
        wrk = boss.RemoteWorker.__new__(boss.RemoteWorker)
        wrk.name = 'host1'
        wrk._stderr_lines = []
        wrk._proc = mock.Mock()
        return wrk

    def test_forward_stderr(self):
        """Test stderr collection and OSError handling"""
        wrk = self._bare_worker()
        wrk._proc.stderr = [b'error line 1\n', b'error line 2\n', b'']
        with terminal.capture():
            wrk._forward_stderr()
        self.assertEqual(wrk._stderr_lines,
                         ['error line 1', 'error line 2'])

        # OSError path: silently handled
        other = self._bare_worker()
        other._proc.stderr.__iter__ = mock.Mock(
            side_effect=OSError('closed'))
        other._forward_stderr()
+
+
class TestStartDebug(unittest.TestCase):
    """Test RemoteWorker.start() debug flag (line 242)"""

    @mock.patch('subprocess.Popen')
    @mock.patch('buildman.boss._run_ssh')
    def test_debug_flag(self, _mock_ssh, mock_popen):
        """Test start() passes -D when debug=True"""
        proc = mock.Mock()
        # First line the boss reads is the framed 'ready' response
        proc.stdout.readline.return_value = (
            b'BM> {"resp":"ready","nthreads":4,"slots":2}\n')
        proc.poll.return_value = None
        proc.stderr = []  # empty iterable for _forward_stderr thread
        mock_popen.return_value = proc

        # Build a RemoteWorker by hand, bypassing __init__; the
        # attributes below mirror what __init__ normally sets.
        # NOTE(review): keep this list in sync with
        # RemoteWorker.__init__ in boss.py
        wrk = boss.RemoteWorker.__new__(boss.RemoteWorker)
        wrk.hostname = 'host1'
        wrk.name = 'host1'
        wrk._proc = None
        wrk._closed = False
        wrk._closing = False
        wrk._stderr_lines = []
        wrk._stderr_thread = None
        wrk._ready = queue.Queue()
        wrk._log = None
        wrk.bytes_sent = 0
        wrk.bytes_recv = 0
        wrk.nthreads = 0
        wrk.slots = 0
        wrk.max_boards = 0
        wrk.toolchains = {}
        wrk.closing = False
        wrk._work_dir = '/tmp/bm'
        wrk._git_dir = '/tmp/bm/.git'
        wrk.work_dir = '/tmp/bm'
        wrk.timeout = 10

        wrk.start(debug=True)
        # The ssh command line must include the worker's -D debug flag
        cmd = mock_popen.call_args[0][0]
        self.assertIn('-D', ' '.join(cmd))
+
+
+class TestBossLogTimer(unittest.TestCase):
+    """Test _BossLog.start_timer() (lines 607-612)"""
+
+    def test_timer_ticks(self):
+        """Test that the timer fires and logs status"""
+        with tempfile.TemporaryDirectory() as tmpdir:
+            blog = boss._BossLog(tmpdir)
+            wrk = mock.Mock(nthreads=4)
+            wrk.name = 'host1'
+            blog.init_worker(wrk)
+            blog.record_sent('host1', 5)
+
+            # Patch STATUS_INTERVAL to fire quickly
+            with mock.patch.object(boss, 'STATUS_INTERVAL', 0.01):
+                blog.start_timer()
+                # Sleep a few intervals so the timer gets a chance to
+                # fire at least once before the log is closed
+                time.sleep(0.05)
+            blog.close()
+
+            content = tools.read_file(
+                os.path.join(tmpdir, '.buildman.log'), binary=False)
+            # Timer should have logged at least one status line
+            self.assertIn('host1', content)
+
+
+class TestWaitForPrepareProgress(unittest.TestCase):
+    """Test wait_for_prepare with progress and heartbeat messages"""
+
+
+    def test_heartbeat_during_prepare(self):
+        """Test heartbeat messages are skipped during prepare"""
+        ctx, wrk, tmpdir = _make_ctx()
+        recv_q = queue.Queue()
+        recv_q.put(('resp', {'resp': 'heartbeat'}))
+        recv_q.put(('resp', {'resp': 'build_prepare_done'}))
+        result = ctx.wait_for_prepare(wrk, recv_q)
+        self.assertTrue(result)
+        ctx.close()
+        shutil.rmtree(tmpdir)
+
+    def test_progress_during_prepare(self):
+        """Test worktree progress during prepare"""
+        ctx, wrk, tmpdir = _make_ctx()
+        recv_q = queue.Queue()
+        recv_q.put(('resp', {'resp': 'build_started', 'num_threads': 2}))
+        recv_q.put(('resp', {'resp': 'worktree_created'}))
+        recv_q.put(('resp', {'resp': 'build_prepare_done'}))
+        result = ctx.wait_for_prepare(wrk, recv_q)
+        self.assertTrue(result)
+        ctx.close()
+        shutil.rmtree(tmpdir)
+
+    def test_unexpected_during_prepare(self):
+        """Test unexpected response during prepare returns False"""
+        ctx, wrk, tmpdir = _make_ctx()
+        recv_q = queue.Queue()
+        recv_q.put(('resp', {'resp': 'something_weird'}))
+        result = ctx.wait_for_prepare(wrk, recv_q)
+        self.assertFalse(result)
+        ctx.close()
+        shutil.rmtree(tmpdir)
+
+
+class TestRecvOne(unittest.TestCase):
+    """Test _DispatchContext.recv_one()"""
+
+
+    @mock.patch.object(boss, 'BUILD_TIMEOUT', 0.01)
+    def test_recv_one_timeout(self):
+        """Test recv_one returns False on timeout"""
+        # Empty queue plus a tiny patched timeout forces the timeout path
+        ctx, wrk, tmpdir = _make_ctx()
+        recv_q = queue.Queue()
+        with terminal.capture():
+            result = ctx.recv_one(wrk, recv_q)
+        self.assertFalse(result)
+        ctx.close()
+        shutil.rmtree(tmpdir)
+
+    def test_recv_one_build_done(self):
+        """Test recv_one handles build_done with exceptions"""
+        ctx, wrk, tmpdir = _make_ctx(
+            {'rpi': mock.Mock(target='rpi', arch='arm')})
+        recv_q = queue.Queue()
+        # build_done ends the exchange, so recv_one() reports False
+        recv_q.put(('resp', {'resp': 'build_done', 'exceptions': 2}))
+        result = ctx.recv_one(wrk, recv_q)
+        self.assertFalse(result)
+        ctx.close()
+        shutil.rmtree(tmpdir)
+
+    def test_recv_one_build_result(self):
+        """Test recv_one processes build_result"""
+        ctx, wrk, tmpdir = _make_ctx(
+            {'rpi': mock.Mock(target='rpi', arch='arm')})
+        build_dir = tempfile.mkdtemp()
+        ctx.builder.get_build_dir.return_value = build_dir
+        recv_q = queue.Queue()
+        recv_q.put(('resp', {'resp': 'build_result', 'board': 'rpi',
+                             'commit_upto': 0, 'return_code': 0,
+                             'stderr': '', 'stdout': ''}))
+        with terminal.capture():
+            result = ctx.recv_one(wrk, recv_q)
+        self.assertTrue(result)
+        ctx.close()
+        shutil.rmtree(tmpdir)
+        shutil.rmtree(build_dir)
+
+    def test_recv_one_heartbeat(self):
+        """Test recv_one skips heartbeat then gets result"""
+        ctx, wrk, tmpdir = _make_ctx(
+            {'rpi': mock.Mock(target='rpi', arch='arm')})
+        build_dir = tempfile.mkdtemp()
+        ctx.builder.get_build_dir.return_value = build_dir
+        recv_q = queue.Queue()
+        # Heartbeat is skipped; the result that follows is processed
+        recv_q.put(('resp', {'resp': 'heartbeat'}))
+        recv_q.put(('resp', {'resp': 'build_result', 'board': 'rpi',
+                             'commit_upto': 0, 'return_code': 0,
+                             'stderr': '', 'stdout': ''}))
+        with terminal.capture():
+            result = ctx.recv_one(wrk, recv_q)
+        self.assertTrue(result)
+        ctx.close()
+        shutil.rmtree(tmpdir)
+        shutil.rmtree(build_dir)
+
+    def test_recv_one_other(self):
+        """Test recv_one returns True for unknown response"""
+        # Unknown responses are tolerated and do not stop the exchange
+        ctx, wrk, tmpdir = _make_ctx(
+            {'rpi': mock.Mock(target='rpi', arch='arm')})
+        recv_q = queue.Queue()
+        recv_q.put(('resp', {'resp': 'something_else'}))
+        result = ctx.recv_one(wrk, recv_q)
+        self.assertTrue(result)
+        ctx.close()
+        shutil.rmtree(tmpdir)
+
+    def test_recv_one_progress(self):
+        """Test recv_one handles progress then result"""
+        ctx, wrk, tmpdir = _make_ctx(
+            {'rpi': mock.Mock(target='rpi', arch='arm')})
+        build_dir = tempfile.mkdtemp()
+        ctx.builder.get_build_dir.return_value = build_dir
+        recv_q = queue.Queue()
+        recv_q.put(('resp', {'resp': 'build_started', 'num_threads': 2}))
+        recv_q.put(('resp', {'resp': 'build_result', 'board': 'rpi',
+                             'commit_upto': 0, 'return_code': 0,
+                             'stderr': '', 'stdout': ''}))
+        with terminal.capture():
+            result = ctx.recv_one(wrk, recv_q)
+        self.assertTrue(result)
+        ctx.close()
+        shutil.rmtree(tmpdir)
+        shutil.rmtree(build_dir)
+
+
+class TestCollectResultsExtended(unittest.TestCase):
+    """Test collect_results with more board grabbing"""
+
+
+    def test_collect_grabs_more(self):
+        """Test collect_results grabs more boards when in_flight drops"""
+        ctx, wrk, tmpdir = _make_ctx(
+            {'rpi': mock.Mock(target='rpi', arch='arm')})
+        wrk.max_boards = 2
+        build_dir = tempfile.mkdtemp()
+        ctx.builder.get_build_dir.return_value = build_dir
+        extra_brd = mock.Mock(target='extra', arch='arm')
+        grab_calls = [0]
+
+        # First call hands out one extra board; after that the pool is
+        # empty
+        def grab(_w, _n):
+            grab_calls[0] += 1
+            if grab_calls[0] == 1:
+                return [extra_brd]
+            return []
+
+        state = boss.DemandState(sent=1, ncommits=1, grab_func=grab)
+
+        recv_q = queue.Queue()
+        recv_q.put(('resp', {'resp': 'build_result', 'board': 'rpi',
+                             'commit_upto': 0, 'return_code': 0,
+                             'stderr': '', 'stdout': ''}))
+        # After rpi completes, in_flight drops to 0 < max_boards=2,
+        # grab returns extra_brd, then build_done ends collection
+        recv_q.put(('resp', {'resp': 'build_done'}))
+
+        with terminal.capture():
+            ctx.collect_results(wrk, recv_q, state)
+
+        self.assertEqual(state.received, 1)
+        self.assertGreater(grab_calls[0], 0)
+        ctx.close()
+        shutil.rmtree(tmpdir)
+        # get_build_dir.return_value is the build_dir created above
+        shutil.rmtree(ctx.builder.get_build_dir.return_value)
+
+    def test_collect_write_failure(self):
+        """Test collect_results stops on write_result failure"""
+        ctx, wrk, tmpdir = _make_ctx(
+            {'rpi': mock.Mock(target='rpi', arch='arm')})
+        # Writing the result needs the build dir, so this raise makes
+        # the write fail and collection must stop with False
+        ctx.builder.get_build_dir.side_effect = RuntimeError('boom')
+
+        state = boss.DemandState(sent=1, ncommits=1,
+                                 grab_func=lambda w, n: [])
+        recv_q = queue.Queue()
+        recv_q.put(('resp', {'resp': 'build_result', 'board': 'rpi',
+                             'commit_upto': 0, 'return_code': 0,
+                             'stderr': '', 'stdout': ''}))
+        with terminal.capture():
+            result = ctx.collect_results(wrk, recv_q, state)
+        self.assertFalse(result)
+        ctx.close()
+        shutil.rmtree(tmpdir)
+
+
+class TestRunParallelErrors(unittest.TestCase):
+    """Test WorkerPool._run_parallel error paths"""
+
+    def test_worker_busy_and_boss_error(self):
+        """Test _run_parallel handles WorkerBusy and BossError"""
+        pool = boss.WorkerPool([])
+        busy_wrk = mock.Mock(name='busy1')
+        busy_wrk.name = 'busy1'
+        fail_wrk = mock.Mock(name='fail1')
+        fail_wrk.name = 'fail1'
+
+        calls = []
+
+        def func(item):
+            calls.append(item.name)
+            if item.name == 'busy1':
+                raise boss.WorkerBusy('too busy')
+            raise boss.BossError('failed')
+
+        with terminal.capture():
+            pool._run_parallel('Testing', [busy_wrk, fail_wrk], func)
+        fail_wrk.remove_lock.assert_called_once()
+
+
+class TestCloseAll(unittest.TestCase):
+    """Test WorkerPool.close_all() (lines 1533-1544)"""
+
+    def test_close_all_sends_quit(self):
+        """Test close_all sends quit and closes workers"""
+        pool = boss.WorkerPool([])
+        wrk = mock.Mock()
+        wrk.closing = False
+        wrk.bytes_sent = 0
+        wrk.bytes_recv = 0
+        wrk.name = 'host1'
+        pool.workers = [wrk]
+        blog = mock.Mock()
+        pool._boss_log = blog
+
+        with terminal.capture():
+            pool.close_all()
+
+        wrk.close.assert_called()
+        wrk.remove_lock.assert_called()
+        self.assertEqual(len(pool.workers), 0)
+
+
+class TestCollectResultsTimeout(unittest.TestCase):
+    """Test collect_results recv timeout and non-result responses"""
+
+
+    @mock.patch.object(boss, 'BUILD_TIMEOUT', 0.01)
+    def test_collect_timeout(self):
+        """Test collect_results returns False on recv timeout (line 933)"""
+        ctx, wrk, tmpdir = _make_ctx()
+        state = boss.DemandState(sent=1, ncommits=1,
+                                 grab_func=lambda w, n: [])
+        recv_q = queue.Queue()
+        with terminal.capture():
+            result = ctx.collect_results(wrk, recv_q, state)
+        self.assertFalse(result)
+        ctx.close()
+        shutil.rmtree(tmpdir)
+
+    def test_collect_skips_unknown(self):
+        """Test collect_results skips non-build_result responses (line 940)"""
+        ctx, wrk, tmpdir = _make_ctx()
+        state = boss.DemandState(sent=1, ncommits=1,
+                                 grab_func=lambda w, n: [])
+        recv_q = queue.Queue()
+        recv_q.put(('resp', {'resp': 'configure_done'}))  # unknown
+        recv_q.put(('resp', {'resp': 'build_done'}))
+        with terminal.capture():
+            result = ctx.collect_results(wrk, recv_q, state)
+        self.assertTrue(result)
+        ctx.close()
+        shutil.rmtree(tmpdir)
+
+
+class TestDispatchJobs(unittest.TestCase):
+    """Test WorkerPool._dispatch_jobs() (lines 1290-1309)"""
+
+    def test_dispatch_jobs(self):
+        """Test _dispatch_jobs runs batch workers and closes context"""
+        pool = boss.WorkerPool([])
+        wrk = mock.Mock(nthreads=4, closing=False, slots=2)
+        wrk.name = 'host1'
+
+        # A single job: build board 'rpi' at commit index 0
+        brd = mock.Mock(target='rpi', arch='arm')
+        commit = mock.Mock(hash='abc123')
+        wjobs = [(brd, 0, commit)]
+
+        builder = mock.Mock()
+        tmpdir = tempfile.mkdtemp()
+        builder.base_dir = tmpdir
+        board_selected = {'rpi': brd}
+        blog = mock.Mock()
+        pool._boss_log = blog
+
+        # Mock build_boards to avoid actual protocol
+        wrk.build_boards = mock.Mock()
+        # recv_one will be called once — return False to end
+        with mock.patch.object(boss._DispatchContext, 'start_reader',
+                               return_value=queue.Queue()), \
+             mock.patch.object(boss._DispatchContext, 'recv_one',
+                               return_value=False):
+            with terminal.capture():
+                pool._dispatch_jobs({wrk: wjobs}, builder,
+                                    board_selected)
+
+        # The log reference must be dropped once dispatch completes
+        self.assertIsNone(pool._boss_log)
+        shutil.rmtree(tmpdir)
+
+
+class TestRunBatchWorker(unittest.TestCase):
+    """Test WorkerPool._run_batch_worker() (lines 1320-1358)"""
+
+    def test_batch_worker_success(self):
+        """Test _run_batch_worker sends build_boards and collects"""
+        wrk = mock.Mock(nthreads=4, closing=False, slots=2)
+        wrk.name = 'host1'
+        brd = mock.Mock(target='rpi', arch='arm')
+        commit = mock.Mock(hash='abc123')
+
+        builder = mock.Mock()
+        tmpdir = tempfile.mkdtemp()
+        builder.base_dir = tmpdir
+
+        ctx = boss._DispatchContext(
+            workers=[wrk], builder=builder,
+            board_selected={'rpi': brd}, boss_log=mock.Mock())
+
+        # Queue one successful result for the board being built
+        recv_q = queue.Queue()
+        recv_q.put(('resp', {'resp': 'build_result', 'board': 'rpi',
+                             'commit_upto': 0, 'return_code': 0,
+                             'stderr': '', 'stdout': ''}))
+        build_dir = tempfile.mkdtemp()
+        builder.get_build_dir.return_value = build_dir
+
+        with mock.patch.object(ctx, 'start_reader',
+                               return_value=recv_q):
+            with terminal.capture():
+                boss.WorkerPool._run_batch_worker(
+                    wrk, [(brd, 0, commit)], ctx)
+
+        # The batch command must have been sent exactly once
+        wrk.build_boards.assert_called_once()
+        ctx.close()
+        shutil.rmtree(tmpdir)
+        shutil.rmtree(build_dir)
+
+    def test_batch_worker_build_error(self):
+        """Test _run_batch_worker handles build_boards BossError"""
+        wrk = mock.Mock(nthreads=4, closing=False, slots=2)
+        wrk.name = 'host1'
+        # Sending the build command fails; _run_batch_worker() must
+        # handle this without raising
+        wrk.build_boards.side_effect = boss.BossError('gone')
+        brd = mock.Mock(target='rpi', arch='arm')
+        commit = mock.Mock(hash='abc123')
+
+        builder = mock.Mock()
+        tmpdir = tempfile.mkdtemp()
+        builder.base_dir = tmpdir
+
+        ctx = boss._DispatchContext(
+            workers=[wrk], builder=builder,
+            board_selected={}, boss_log=mock.Mock())
+
+        with mock.patch.object(ctx, 'start_reader',
+                               return_value=queue.Queue()):
+            with terminal.capture():
+                boss.WorkerPool._run_batch_worker(
+                    wrk, [(brd, 0, commit)], ctx)
+
+        ctx.close()
+        shutil.rmtree(tmpdir)
+
+
+class TestStartDemandWorker(unittest.TestCase):
+    """Test WorkerPool._start_demand_worker() (lines 1380-1400)"""
+
+    def _make_pool_and_ctx(self):
+        """Create a pool, a mock worker and a dispatch context
+
+        Returns:
+            tuple:
+                WorkerPool: Empty pool
+                Mock: Worker with capacity for two boards
+                _DispatchContext: Context wrapping the mock builder
+                str: Temporary directory used as the builder base_dir
+        """
+        pool = boss.WorkerPool([])
+        wrk = mock.Mock(nthreads=4, closing=False, max_boards=2,
+                        slots=2, toolchains={'arm': '/gcc'})
+        wrk.name = 'host1'
+        builder = mock.Mock()
+        tmpdir = tempfile.mkdtemp()
+        builder.base_dir = tmpdir
+        ctx = boss._DispatchContext(
+            workers=[wrk], builder=builder,
+            board_selected={}, boss_log=mock.Mock())
+        return pool, wrk, ctx, tmpdir
+
+    def test_prepare_error(self):
+        """Test _start_demand_worker handles build_prepare BossError"""
+        pool, wrk, ctx, tmpdir = self._make_pool_and_ctx()
+        wrk.build_prepare.side_effect = boss.BossError('gone')
+
+        with mock.patch.object(ctx, 'start_reader',
+                               return_value=queue.Queue()):
+            with terminal.capture():
+                recv_q, state = pool._start_demand_worker(
+                    wrk, ctx, ['abc'], 1, [], threading.Lock())
+        # Failure is reported as a (None, None) pair, not an exception
+        self.assertIsNone(recv_q)
+        self.assertIsNone(state)
+        ctx.close()
+        shutil.rmtree(tmpdir)
+
+    def test_prepare_timeout(self):
+        """Test _start_demand_worker when wait_for_prepare fails"""
+        pool, wrk, ctx, tmpdir = self._make_pool_and_ctx()
+
+        with mock.patch.object(ctx, 'start_reader',
+                               return_value=queue.Queue()), \
+             mock.patch.object(ctx, 'wait_for_prepare',
+                               return_value=False):
+            with terminal.capture():
+                recv_q, state = pool._start_demand_worker(
+                    wrk, ctx, ['abc'], 1, [], threading.Lock())
+        self.assertIsNone(recv_q)
+        ctx.close()
+        shutil.rmtree(tmpdir)
+
+    def test_no_boards(self):
+        """Test _start_demand_worker when no boards available
+
+        Also covers the BossError path in build_done (lines 1395-1396).
+        """
+        pool, wrk, ctx, tmpdir = self._make_pool_and_ctx()
+        wrk.build_done.side_effect = boss.BossError('gone')
+
+        with mock.patch.object(ctx, 'start_reader',
+                               return_value=queue.Queue()), \
+             mock.patch.object(ctx, 'wait_for_prepare',
+                               return_value=True):
+            with terminal.capture():
+                recv_q, state = pool._start_demand_worker(
+                    wrk, ctx, ['abc'], 1, [], threading.Lock())
+        self.assertIsNone(recv_q)
+        ctx.close()
+        shutil.rmtree(tmpdir)
+
+    def test_send_batch_failure(self):
+        """Test _start_demand_worker when send_batch fails"""
+        pool, wrk, ctx, tmpdir = self._make_pool_and_ctx()
+        brd = mock.Mock(target='rpi', arch='arm')
+        pool_list = [brd]
+
+        # Sending the first board fails, so startup must be abandoned
+        wrk.build_board.side_effect = boss.BossError('gone')
+
+        with mock.patch.object(ctx, 'start_reader',
+                               return_value=queue.Queue()), \
+             mock.patch.object(ctx, 'wait_for_prepare',
+                               return_value=True):
+            with terminal.capture():
+                recv_q, state = pool._start_demand_worker(
+                    wrk, ctx, ['abc'], 1, pool_list,
+                    threading.Lock())
+        self.assertIsNone(recv_q)
+        ctx.close()
+        shutil.rmtree(tmpdir)
+
+    def test_success(self):
+        """Test _start_demand_worker success path"""
+        pool, wrk, ctx, tmpdir = self._make_pool_and_ctx()
+        brd = mock.Mock(target='rpi', arch='arm')
+        pool_list = [brd]
+
+        with mock.patch.object(ctx, 'start_reader',
+                               return_value=queue.Queue()), \
+             mock.patch.object(ctx, 'wait_for_prepare',
+                               return_value=True):
+            with terminal.capture():
+                recv_q, state = pool._start_demand_worker(
+                    wrk, ctx, ['abc'], 1, pool_list,
+                    threading.Lock())
+        self.assertIsNotNone(recv_q)
+        self.assertIsNotNone(state)
+        # The one available board was dispatched
+        self.assertEqual(state.sent, 1)
+        ctx.close()
+        shutil.rmtree(tmpdir)
+
+
+class TestFinishDemandWorker(unittest.TestCase):
+    """Test WorkerPool._finish_demand_worker() (lines 1429-1437)"""
+
+    def test_build_done_error(self):
+        """Test _finish_demand_worker when build_done raises"""
+        wrk = mock.Mock(nthreads=4, closing=False, max_boards=0,
+                        slots=2)
+        wrk.name = 'host1'
+        # build_done fails; _finish_demand_worker() must not propagate
+        wrk.build_done.side_effect = boss.BossError('gone')
+        builder = mock.Mock()
+        tmpdir = tempfile.mkdtemp()
+        builder.base_dir = tmpdir
+
+        ctx = boss._DispatchContext(
+            workers=[wrk], builder=builder,
+            board_selected={}, boss_log=mock.Mock())
+
+        state = boss.DemandState(sent=0, ncommits=1,
+                                 grab_func=lambda w, n: [])
+        recv_q = queue.Queue()
+
+        with mock.patch.object(ctx, 'collect_results'):
+            boss.WorkerPool._finish_demand_worker(
+                wrk, ctx, recv_q, state)
+        ctx.close()
+        shutil.rmtree(tmpdir)
+
+    def test_finish_waits_for_done(self):
+        """Test _finish_demand_worker waits for build_done response"""
+        wrk = mock.Mock(nthreads=4, closing=False, max_boards=0,
+                        slots=2)
+        wrk.name = 'host1'
+        builder = mock.Mock()
+        tmpdir = tempfile.mkdtemp()
+        builder.base_dir = tmpdir
+
+        ctx = boss._DispatchContext(
+            workers=[wrk], builder=builder,
+            board_selected={}, boss_log=mock.Mock())
+
+        state = boss.DemandState(sent=0, ncommits=1,
+                                 grab_func=lambda w, n: [])
+        recv_q = queue.Queue()
+        # A heartbeat is skipped before the final build_done arrives
+        recv_q.put(('resp', {'resp': 'heartbeat'}))
+        recv_q.put(('resp', {'resp': 'build_done'}))
+
+        with mock.patch.object(ctx, 'collect_results'):
+            boss.WorkerPool._finish_demand_worker(
+                wrk, ctx, recv_q, state)
+        ctx.close()
+        shutil.rmtree(tmpdir)
+
+    @mock.patch.object(boss, 'BUILD_TIMEOUT', 0.01)
+    def test_finish_recv_timeout(self):
+        """Test _finish_demand_worker handles recv timeout"""
+        wrk = mock.Mock(nthreads=4, closing=False, max_boards=0,
+                        slots=2)
+        wrk.name = 'host1'
+        builder = mock.Mock()
+        tmpdir = tempfile.mkdtemp()
+        builder.base_dir = tmpdir
+
+        ctx = boss._DispatchContext(
+            workers=[wrk], builder=builder,
+            board_selected={}, boss_log=mock.Mock())
+
+        state = boss.DemandState(sent=0, ncommits=1,
+                                 grab_func=lambda w, n: [])
+        # Empty queue plus a tiny timeout: build_done never arrives
+        recv_q = queue.Queue()
+
+        with mock.patch.object(ctx, 'collect_results'), \
+             terminal.capture():
+            boss.WorkerPool._finish_demand_worker(
+                wrk, ctx, recv_q, state)
+        ctx.close()
+        shutil.rmtree(tmpdir)
+
+
+class TestCloseAllSignal(unittest.TestCase):
+    """Test close_all quit/close path (lines 1543-1544)"""
+
+    def test_close_all_quit_error(self):
+        """Test close_all handles _send BossError during quit"""
+        pool = boss.WorkerPool([])
+        wrk = mock.Mock()
+        wrk.closing = False
+        wrk.bytes_sent = 0
+        wrk.bytes_recv = 0
+        wrk.name = 'host1'
+        wrk._send.side_effect = boss.BossError('gone')
+        pool.workers = [wrk]
+        pool._boss_log = mock.Mock()
+
+        with terminal.capture():
+            pool.close_all()
+        wrk.close.assert_called()
+        self.assertEqual(len(pool.workers), 0)
+
+
+class TestZeroCapacity(unittest.TestCase):
+    """Test _get_worker_for_arch with zero total capacity (line 1183)"""
+
+    def test_zero_nthreads(self):
+        """Test workers with 0 nthreads don't cause division by zero"""
+        w1 = mock.Mock(nthreads=0, bogomips=0,
+                        toolchains={'arm': '/gcc'})
+        pool = boss.WorkerPool.__new__(boss.WorkerPool)
+        pool.workers = [w1]
+        # Should not raise ZeroDivisionError
+        wrk = pool._get_worker_for_arch('arm', {})
+        self.assertEqual(wrk, w1)
+
+
+# Allow running this file directly as a standalone test script
+if __name__ == '__main__':
+    unittest.main()
diff --git a/tools/buildman/test_machine.py b/tools/buildman/test_machine.py
index b635d1afb6f..eb28d313e3e 100644
--- a/tools/buildman/test_machine.py
+++ b/tools/buildman/test_machine.py
@@ -264,7 +264,8 @@  class TestMachinePool(unittest.TestCase):
             'host2\n'
         )
         pool = machine.MachinePool()
-        available = pool.probe_all()
+        with terminal.capture():
+            available = pool.probe_all()
         self.assertEqual(len(available), 2)
         self.assertEqual(pool.get_total_weight(), 14)
 
@@ -285,7 +286,8 @@  class TestMachinePool(unittest.TestCase):
             'host2\n'
         )
         pool = machine.MachinePool()
-        available = pool.probe_all()
+        with terminal.capture():
+            available = pool.probe_all()
         self.assertEqual(len(available), 1)
         self.assertEqual(available[0].hostname, 'host1')
 
@@ -311,8 +313,10 @@  sandbox   : /usr/bin/gcc
             'host1\n'
         )
         pool = machine.MachinePool()
-        pool.probe_all()
-        missing = pool.check_toolchains({'arm', 'sandbox'})
+        with terminal.capture():
+            pool.probe_all()
+        with terminal.capture():
+            missing = pool.check_toolchains({'arm', 'sandbox'})
         self.assertEqual(missing, {})
 
     @mock.patch('buildman.machine._run_ssh')
@@ -336,8 +340,10 @@  sandbox   : /usr/bin/gcc
             'host1\n'
         )
         pool = machine.MachinePool()
-        pool.probe_all()
-        missing = pool.check_toolchains({'arm', 'sandbox'})
+        with terminal.capture():
+            pool.probe_all()
+        with terminal.capture():
+            missing = pool.check_toolchains({'arm', 'sandbox'})
         self.assertEqual(len(missing), 1)
         m = list(missing.keys())[0]
         self.assertIn('arm', missing[m])
@@ -683,7 +689,8 @@  class TestMachinePoolExtended(unittest.TestCase):
             'mem_avail_mb': 8000, 'disk_avail_mb': 20000})
         bsettings.add_file('[machines]\nhost1\n')
         pool = machine.MachinePool()
-        pool.probe_all()
+        with terminal.capture():
+            pool.probe_all()
         # Just verify it doesn't crash
         with terminal.capture():
             pool.print_summary()
@@ -703,8 +710,10 @@  class TestMachinePoolExtended(unittest.TestCase):
         mock_ssh.side_effect = ssh_side_effect
         bsettings.add_file('[machines]\nhost1\n')
         pool = machine.MachinePool()
-        pool.probe_all()
-        pool.check_toolchains({'arm', 'sandbox'})
+        with terminal.capture():
+            pool.probe_all()
+        with terminal.capture():
+            pool.check_toolchains({'arm', 'sandbox'})
         with terminal.capture():
             pool.print_summary(local_archs={'arm', 'sandbox'})
 
@@ -728,7 +737,8 @@  class TestMachinePoolExtended(unittest.TestCase):
         mock_ssh.side_effect = ssh_side_effect
         bsettings.add_file('[machines]\nhost1\n')
         pool = machine.MachinePool()
-        pool.probe_all()
+        with terminal.capture():
+            pool.probe_all()
 
         local_gcc = {
             'arm': f'{home}/.buildman-toolchains/gcc-13.1.0-nolibc/'
@@ -751,7 +761,8 @@  class TestMachinePoolExtended(unittest.TestCase):
         with mock.patch.object(machine.Machine,
                                '_probe_toolchains_from_boss',
                                fake_probe):
-            missing = pool.check_toolchains({'arm'}, local_gcc=local_gcc)
+            with terminal.capture():
+                missing = pool.check_toolchains({'arm'}, local_gcc=local_gcc)
 
         # arm should be flagged as missing due to version mismatch
         self.assertEqual(len(missing), 1)
@@ -839,7 +850,8 @@  class TestPrintSummaryEdgeCases(unittest.TestCase):
         mock_ssh.side_effect = machine.MachineError('refused')
         bsettings.add_file('[machines]\nhost1\n')
         pool = machine.MachinePool()
-        pool.probe_all()
+        with terminal.capture():
+            pool.probe_all()
         # Should not crash with unavailable machine
         with terminal.capture():
             pool.print_summary()
@@ -851,7 +863,8 @@  class TestPrintSummaryEdgeCases(unittest.TestCase):
             **MACHINE_INFO, 'load_1m': 10.0})
         bsettings.add_file('[machines]\nhost1\n')
         pool = machine.MachinePool()
-        pool.probe_all()
+        with terminal.capture():
+            pool.probe_all()
         with terminal.capture():
             pool.print_summary()
 
@@ -865,7 +878,8 @@  class TestPrintSummaryEdgeCases(unittest.TestCase):
             '[machines]\nhost1\n'
             '[machine:host1]\nmax_boards = 50\n')
         pool = machine.MachinePool()
-        pool.probe_all()
+        with terminal.capture():
+            pool.probe_all()
         with terminal.capture():
             pool.print_summary()
 
@@ -877,7 +891,8 @@  class TestPrintSummaryEdgeCases(unittest.TestCase):
             'mem_avail_mb': 8000, 'disk_avail_mb': 20000})
         bsettings.add_file('[machines]\nhost1\n')
         pool = machine.MachinePool()
-        pool.probe_all()
+        with terminal.capture():
+            pool.probe_all()
         pool.machines[0].tc_error = 'buildman not found'
         with terminal.capture():
             pool.print_summary(local_archs={'arm'})
@@ -890,7 +905,8 @@  class TestPrintSummaryEdgeCases(unittest.TestCase):
             'mem_avail_mb': 8000, 'disk_avail_mb': 20000})
         bsettings.add_file('[machines]\nhost1\n')
         pool = machine.MachinePool()
-        pool.probe_all()
+        with terminal.capture():
+            pool.probe_all()
         pool.machines[0].toolchains = {'sandbox': '/usr/bin/gcc'}
         local_gcc = {
             'arm': os.path.expanduser(
@@ -912,7 +928,8 @@  class TestCheckToolchainsEdge(unittest.TestCase):
         bsettings.add_file('[machines]\nhost1\n')
         pool = machine.MachinePool()
         # Machine is not probed, so not reachable
-        result = pool.check_toolchains({'arm'})
+        with terminal.capture():
+            result = pool.check_toolchains({'arm'})
         self.assertEqual(result, {})
 
     @mock.patch('buildman.machine._run_ssh')
@@ -931,15 +948,17 @@  class TestCheckToolchainsEdge(unittest.TestCase):
         mock_ssh.side_effect = ssh_side_effect
         bsettings.add_file('[machines]\nhost1\n')
         pool = machine.MachinePool()
-        pool.probe_all()
+        with terminal.capture():
+            pool.probe_all()
 
         local_gcc = {
             'arm': f'{home}/.buildman-toolchains/gcc-13/arm/bin/gcc',
         }
         # fetch=True should trigger _fetch_all_missing
         with mock.patch.object(pool, '_fetch_all_missing') as mock_fetch:
-            pool.check_toolchains({'arm'}, fetch=True,
-                                  local_gcc=local_gcc)
+            with terminal.capture():
+                pool.check_toolchains({'arm'}, fetch=True,
+                                      local_gcc=local_gcc)
             mock_fetch.assert_called_once()
 
 
@@ -1011,7 +1030,8 @@  class TestPrintSummaryMissingNoVersion(unittest.TestCase):
             'mem_avail_mb': 8000, 'disk_avail_mb': 20000})
         bsettings.add_file('[machines]\nhost1\n')
         pool = machine.MachinePool()
-        pool.probe_all()
+        with terminal.capture():
+            pool.probe_all()
         pool.machines[0].toolchains = {}
         # arm has a version (under ~/.buildman-toolchains),
         # sandbox does not