[Concept,17/20] buildman: Add worker mode for distributed builds

Message ID 20260316154733.1587261-18-sjg@u-boot.org
State New
Headers
Series buildman: Add distributed builds |

Commit Message

Simon Glass March 16, 2026, 3:47 p.m. UTC
  From: Simon Glass <sjg@chromium.org>

Add --worker flag which runs buildman in worker mode, accepting JSON
commands on stdin and streaming BM>-prefixed JSON responses on stdout.
This is the remote side of the distributed build system: the boss
starts a worker via 'ssh host buildman --worker', pushes source with
'git push', then sends build commands.

The worker uses Builder and BuilderThread with a custom subclass that
sends results over SSH instead of writing them to disk. Worktrees are
created sequentially before the Builder starts, with progress messages
sent to the boss. The worker supports both batch mode (build_boards)
and demand-driven mode (build_prepare / build_board / build_done).

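Build settings (no_lto, allow_missing, verbose, etc.) are received
from the boss via a 'configure' command and applied to every build.

Several private Builder members are renamed to be public as part of
this change: start_time, step, upto, timestamps, verbose,
prepare_working_space() and prepare_output_space().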

On exit (quit command, stdin close, or signal), the worker kills its
entire process group to clean up child make/cc1/ld processes.

Tests are included in test_worker.py covering command parsing, build
coordination, error handling and cleanup.

Signed-off-by: Simon Glass <sjg@chromium.org>
---

 tools/buildman/builder.py      |  53 +-
 tools/buildman/cmdline.py      |   3 +
 tools/buildman/control.py      |   5 +
 tools/buildman/main.py         |   2 +
 tools/buildman/test_builder.py |  28 +-
 tools/buildman/test_worker.py  | 896 ++++++++++++++++++++++++++++++
 tools/buildman/worker.py       | 985 +++++++++++++++++++++++++++++++++
 7 files changed, 1930 insertions(+), 42 deletions(-)
 create mode 100644 tools/buildman/test_worker.py
 create mode 100644 tools/buildman/worker.py

-- 
2.43.0
  

Patch

diff --git a/tools/buildman/builder.py b/tools/buildman/builder.py
index c4ec22dbc77..3264978a616 100644
--- a/tools/buildman/builder.py
+++ b/tools/buildman/builder.py
@@ -199,15 +199,12 @@  class Builder:
         _complete_delay: Expected delay until completion (timedelta)
         _next_delay_update: Next time we plan to display a progress update
                 (datatime)
-        num_threads: Number of builder threads to run
         _opts: DisplayOptions for result output
         _re_make_err: Compiled regex for make error detection
         _restarting_config: True if 'Restart config' is detected in output
         _result_handler: ResultHandler for displaying results
         _single_builder: BuilderThread object for the singer builder, if
             threading is not being used
-        _start_time: Start time for the build
-        _step: Step value for processing commits (1=all, 2=every other, etc.)
         _terminated: Thread was terminated due to an error
         _threads: List of active threads
         _timestamps: List of timestamps for the completion of the last
@@ -318,8 +315,8 @@  class Builder:
         self._build_period_us = None
         self._complete_delay = None
         self._next_delay_update = datetime.now()
-        self._start_time = None
-        self._step = step
+        self.start_time = None
+        self.step = step
         self.no_subdirs = no_subdirs
         self.full_path = full_path
         self.verbose_build = verbose_build
@@ -369,14 +366,14 @@  class Builder:
         # Attributes set by other methods
         self._build_period = None
         self._commit = None
-        self._upto = 0
+        self.upto = 0
         self._warned = 0
         self.fail = 0
         self.commit_count = 0
         self.commits = None
         self.count = 0
-        self._timestamps = collections.deque()
-        self._verbose = False
+        self.timestamps = collections.deque()
+        self.verbose = False
 
         # Note: baseline state for result summaries is now in ResultHandler
 
@@ -478,9 +475,9 @@  class Builder:
         build (one board, one commit).
         """
         now = datetime.now()
-        self._timestamps.append(now)
-        count = len(self._timestamps)
-        delta = self._timestamps[-1] - self._timestamps[0]
+        self.timestamps.append(now)
+        count = len(self.timestamps)
+        delta = self.timestamps[-1] - self.timestamps[0]
         seconds = delta.total_seconds()
 
         # If we have enough data, estimate build period (time taken for a
@@ -489,7 +486,7 @@  class Builder:
             self._next_delay_update = now + timedelta(seconds=2)
             if seconds > 0:
                 self._build_period = float(seconds) / count
-                todo = self.count - self._upto
+                todo = self.count - self.upto
                 self._complete_delay = timedelta(microseconds=
                         self._build_period * todo * 1000000)
                 # Round it
@@ -497,7 +494,7 @@  class Builder:
                         microseconds=self._complete_delay.microseconds)
 
         if seconds > 60:
-            self._timestamps.popleft()
+            self.timestamps.popleft()
             count -= 1
 
     def _select_commit(self, commit, checkout=True):
@@ -580,7 +577,7 @@  class Builder:
         if result:
             target = result.brd.target
 
-            self._upto += 1
+            self.upto += 1
             if result.return_code != 0:
                 self.fail += 1
             elif result.stderr:
@@ -592,7 +589,7 @@  class Builder:
             if self._opts.ide:
                 if result.stderr:
                     sys.stderr.write(result.stderr)
-            elif self._verbose:
+            elif self.verbose:
                 terminal.print_clear()
                 boards_selected = {target : result.brd}
                 self._result_handler.reset_result_summary(boards_selected)
@@ -602,13 +599,13 @@  class Builder:
             target = '(starting)'
 
         # Display separate counts for ok, warned and fail
-        ok = self._upto - self._warned - self.fail
+        ok = self.upto - self._warned - self.fail
         line = '\r' + self.col.build(self.col.GREEN, f'{ok:5d}')
         line += self.col.build(self.col.YELLOW, f'{self._warned:5d}')
         line += self.col.build(self.col.RED, f'{self.fail:5d}')
 
         line += f' /{self.count:<5d}  '
-        remaining = self.count - self._upto
+        remaining = self.count - self.upto
         if remaining:
             line += self.col.build(self.col.MAGENTA, f' -{remaining:<5d}  ')
         else:
@@ -1033,10 +1030,10 @@  class Builder:
             board_selected (dict): Selected boards to build
         """
         # First work out how many commits we will build
-        count = (self.commit_count + self._step - 1) // self._step
+        count = (self.commit_count + self.step - 1) // self.step
         self.count = len(board_selected) * count
-        self._upto = self._warned = self.fail = 0
-        self._timestamps = collections.deque()
+        self.upto = self._warned = self.fail = 0
+        self.timestamps = collections.deque()
 
     def get_thread_dir(self, thread_num):
         """Get the directory path to the working dir for a thread.
@@ -1114,7 +1111,7 @@  class Builder:
             else:
                 raise ValueError(f"Can't setup git repo with {setup_git}.")
 
-    def _prepare_working_space(self, max_threads, setup_git):
+    def prepare_working_space(self, max_threads, setup_git):
         """Prepare the working directory for use.
 
         Set up the git repo for each thread. Creates a linked working tree
@@ -1187,7 +1184,7 @@  class Builder:
                     to_remove.append(dirname)
         return to_remove
 
-    def _prepare_output_space(self):
+    def prepare_output_space(self):
         """Get the output directories ready to receive files.
 
         We delete any output directories which look like ones we need to
@@ -1223,16 +1220,16 @@  class Builder:
         """
         self.commit_count = len(commits) if commits else 1
         self.commits = commits
-        self._verbose = verbose
+        self.verbose = verbose
 
         self._result_handler.reset_result_summary(board_selected)
         builderthread.mkdir(self.base_dir, parents = True)
-        self._prepare_working_space(min(self.num_threads, len(board_selected)),
+        self.prepare_working_space(min(self.num_threads, len(board_selected)),
                 board_selected and commits is not None)
-        self._prepare_output_space()
+        self.prepare_output_space()
         if not self._opts.ide:
             tprint('\rStarting build...', newline=False)
-        self._start_time = datetime.now()
+        self.start_time = datetime.now()
         self._setup_build(board_selected, commits)
         self.count += extra_count
         self.process_result(None)
@@ -1246,7 +1243,7 @@  class Builder:
             job.work_in_output = self.work_in_output
             job.adjust_cfg = self.adjust_cfg
             job.fragments = fragments
-            job.step = self._step
+            job.step = self.step
             if self.num_threads:
                 self.queue.put(job)
             else:
@@ -1318,4 +1315,4 @@  class Builder:
         """
         self._result_handler.print_build_summary(
             self.count, self._already_done, self.kconfig_reconfig,
-            self._start_time, self.thread_exceptions)
+            self.start_time, self.thread_exceptions)
diff --git a/tools/buildman/cmdline.py b/tools/buildman/cmdline.py
index a85da069d24..5f3c47bf7fe 100644
--- a/tools/buildman/cmdline.py
+++ b/tools/buildman/cmdline.py
@@ -119,6 +119,9 @@  def add_upto_m(parser):
     parser.add_argument(
           '--maintainer-check', action='store_true',
           help='Check that maintainer entries exist for each board')
+    parser.add_argument('--worker', action='store_true', default=False,
+          help='Run in worker mode, accepting build commands on stdin '
+               '(used internally for distributed builds)')
     parser.add_argument(
           '--no-allow-missing', action='store_true', default=False,
           help='Disable telling binman to allow missing blobs')
diff --git a/tools/buildman/control.py b/tools/buildman/control.py
index 7515932a2e4..082db377293 100644
--- a/tools/buildman/control.py
+++ b/tools/buildman/control.py
@@ -759,6 +759,11 @@  def do_buildman(args, toolchains=None, make_func=None, brds=None,
     gitutil.setup()
     col = terminal.Color()
 
+    # Handle --worker: run in worker mode for distributed builds
+    if args.worker:
+        from buildman import worker  # pylint: disable=C0415
+        return worker.do_worker()
+
     # Handle --machines: probe remote machines and show status
     if args.machines or args.machines_fetch_arch:
         return machine.do_probe_machines(
diff --git a/tools/buildman/main.py b/tools/buildman/main.py
index faff7d41ceb..225e341fc26 100755
--- a/tools/buildman/main.py
+++ b/tools/buildman/main.py
@@ -42,6 +42,7 @@  def run_tests(skip_net_tests, debug, verbose, args):
     from buildman import test_builder
     from buildman import test_cfgutil
     from buildman import test_machine
+    from buildman import test_worker
 
     test_name = args.terms and args.terms[0] or None
     if skip_net_tests:
@@ -65,6 +66,7 @@  def run_tests(skip_net_tests, debug, verbose, args):
          test_builder.TestMake,
          test_builder.TestPrintBuildSummary,
          test_machine,
+         test_worker,
          'buildman.toolchain'])
 
     return (0 if result.wasSuccessful() else 1)
diff --git a/tools/buildman/test_builder.py b/tools/buildman/test_builder.py
index 48be83cf645..74f19ec9528 100644
--- a/tools/buildman/test_builder.py
+++ b/tools/buildman/test_builder.py
@@ -277,7 +277,7 @@  class TestPrepareThread(unittest.TestCase):
 
 
 class TestPrepareWorkingSpace(unittest.TestCase):
-    """Tests for Builder._prepare_working_space()"""
+    """Tests for Builder.prepare_working_space()"""
 
     def setUp(self):
         """Set up test fixtures"""
@@ -296,7 +296,7 @@  class TestPrepareWorkingSpace(unittest.TestCase):
     @mock.patch.object(builderthread, 'mkdir')
     def test_no_setup_git(self, mock_mkdir, mock_prepare_thread):
         """Test with setup_git=False"""
-        self.builder._prepare_working_space(2, False)
+        self.builder.prepare_working_space(2, False)
 
         mock_mkdir.assert_called_once()
         # Should prepare 2 threads with setup_git=False
@@ -312,7 +312,7 @@  class TestPrepareWorkingSpace(unittest.TestCase):
     def test_worktree_available(self, _mock_mkdir, mock_check_worktree,
                                 mock_prune, mock_prepare_thread):
         """Test when worktree is available"""
-        self.builder._prepare_working_space(3, True)
+        self.builder.prepare_working_space(3, True)
 
         mock_check_worktree.assert_called_once()
         mock_prune.assert_called_once()
@@ -329,7 +329,7 @@  class TestPrepareWorkingSpace(unittest.TestCase):
     def test_worktree_not_available(self, _mock_mkdir, mock_check_worktree,
                                     mock_prepare_thread):
         """Test when worktree is not available (falls back to clone)"""
-        self.builder._prepare_working_space(2, True)
+        self.builder.prepare_working_space(2, True)
 
         mock_check_worktree.assert_called_once()
         # Should prepare 2 threads with setup_git='clone'
@@ -341,7 +341,7 @@  class TestPrepareWorkingSpace(unittest.TestCase):
     @mock.patch.object(builderthread, 'mkdir')
     def test_zero_threads(self, _mock_mkdir, mock_prepare_thread):
         """Test with max_threads=0 (should still prepare 1 thread)"""
-        self.builder._prepare_working_space(0, False)
+        self.builder.prepare_working_space(0, False)
 
         # Should prepare at least 1 thread
         self.assertEqual(mock_prepare_thread.call_count, 1)
@@ -352,7 +352,7 @@  class TestPrepareWorkingSpace(unittest.TestCase):
     def test_no_git_dir(self, _mock_mkdir, mock_prepare_thread):
         """Test with no git_dir set"""
         self.builder.git_dir = None
-        self.builder._prepare_working_space(2, True)
+        self.builder.prepare_working_space(2, True)
 
         # _detect_git_setup returns False when git_dir is None
         self.assertEqual(mock_prepare_thread.call_count, 2)
@@ -368,7 +368,7 @@  class TestPrepareWorkingSpace(unittest.TestCase):
                         mock_prune, mock_prepare_thread):
         """Test lazy_thread_setup skips upfront thread preparation"""
         self.builder._lazy_thread_setup = True
-        self.builder._prepare_working_space(4, True)
+        self.builder.prepare_working_space(4, True)
 
         # Git setup type is detected so prepare_thread() can use it
         # later, but no threads are prepared upfront
@@ -510,7 +510,7 @@  class TestShowNotBuilt(unittest.TestCase):
 
 
 class TestPrepareOutputSpace(unittest.TestCase):
-    """Tests for _prepare_output_space() and _get_output_space_removals()"""
+    """Tests for prepare_output_space() and _get_output_space_removals()"""
 
     def setUp(self):
         """Set up test fixtures"""
@@ -561,25 +561,25 @@  class TestPrepareOutputSpace(unittest.TestCase):
         self.assertEqual(result, ['/tmp/test/02_g1234567_old'])
 
     @mock.patch.object(builder.Builder, '_get_output_space_removals')
-    def test_prepare_output_space_nothing_to_remove(self, mock_get_removals):
-        """Test _prepare_output_space with nothing to remove"""
+    def test_prepare_output_space_nothing_to_remove(self, mock_get_removals):
+        """Test prepare_output_space with nothing to remove"""
         mock_get_removals.return_value = []
         terminal.get_print_test_lines()  # Clear
 
-        self.builder._prepare_output_space()
+        self.builder.prepare_output_space()
 
         lines = terminal.get_print_test_lines()
         self.assertEqual(len(lines), 0)
 
     @mock.patch.object(shutil, 'rmtree')
     @mock.patch.object(builder.Builder, '_get_output_space_removals')
-    def test_prepare_output_space_removes_dirs(self, mock_get_removals,
+    def test_prepare_output_space_removes_dirs(self, mock_get_removals,
                                                mock_rmtree):
-        """Test _prepare_output_space removes old directories"""
+        """Test prepare_output_space removes old directories"""
         mock_get_removals.return_value = ['/tmp/test/old1', '/tmp/test/old2']
         terminal.get_print_test_lines()  # Clear
 
-        self.builder._prepare_output_space()
+        self.builder.prepare_output_space()
 
         # Check rmtree was called for each directory
         self.assertEqual(mock_rmtree.call_count, 2)
diff --git a/tools/buildman/test_worker.py b/tools/buildman/test_worker.py
new file mode 100644
index 00000000000..4a47501217b
--- /dev/null
+++ b/tools/buildman/test_worker.py
@@ -0,0 +1,896 @@ 
+# SPDX-License-Identifier: GPL-2.0+
+# Copyright 2026 Simon Glass <sjg@chromium.org>
+
+"""Tests for the worker module"""
+
+# pylint: disable=W0212,C0302
+
+import io
+import json
+import os
+import queue
+import signal
+import subprocess
+import tempfile
+import unittest
+from unittest import mock
+
+from u_boot_pylib.command import CommandExc, CommandResult
+
+from buildman import worker
+
+
+def _parse(line):
+    """Parse a protocol line into a dict"""
+    return json.loads(line[len(worker.RESPONSE_PREFIX):])
+
+
+class _ProtoTestBase(unittest.TestCase):
+    """Base class for tests that use the worker protocol
+
+    Provides capture/parse helpers and resets _protocol_out on tearDown.
+    """
+
+    def setUp(self):
+        self.buf = io.StringIO()
+        worker._protocol_out = self.buf
+
+    def tearDown(self):
+        worker._protocol_out = None
+
+    def get_resp(self):
+        """Parse the single response held in the capture buffer"""
+        return _parse(self.buf.getvalue())
+
+    def get_all_resp(self):
+        """Parse all responses from the capture buffer"""
+        return [_parse(line) for line in self.buf.getvalue().strip().split('\n')
+                if line]
+
+    def assert_resp(self, key, value):
+        """Assert a key in the single captured response equals value"""
+        self.assertEqual(self.get_resp()[key], value)
+
+    def assert_in_output(self, text):
+        """Assert text appears in raw protocol output"""
+        self.assertIn(text, self.buf.getvalue())
+
+
+class _RunWorkerBase(_ProtoTestBase):
+    """Base class for tests that run the full worker loop"""
+
+    def _run(self, stdin_text):
+        """Run the worker with given stdin, return (result, output lines)"""
+        buf = io.StringIO()
+        with mock.patch('buildman.worker.toolchain_mod.Toolchains'), \
+             mock.patch('sys.stdin', io.StringIO(stdin_text)), \
+             mock.patch('sys.stdout', buf):
+            result = worker.run_worker()
+        lines = [line for line in buf.getvalue().strip().split('\n')
+                 if line]
+        return result, lines
+
+
+class TestProtocol(_ProtoTestBase):
+    """Test _send(), _send_error() and _send_build_result()"""
+
+    def test_send(self):
+        """Test sending a response"""
+        worker._send({'resp': 'ready', 'nthreads': 4})
+        self.assertTrue(self.buf.getvalue().startswith(
+            worker.RESPONSE_PREFIX))
+        self.assert_resp('resp', 'ready')
+        self.assert_resp('nthreads', 4)
+
+    def test_send_error(self):
+        """Test sending an error response"""
+        worker._send_error('something broke')
+        self.assert_resp('resp', 'error')
+        self.assert_resp('msg', 'something broke')
+
+    def test_send_build_result_with_sizes(self):
+        """Test sending result with sizes"""
+        worker._send_build_result(
+            'sandbox', 0, 0,
+            sizes={'text': 1000, 'data': 200})
+        self.assertEqual(
+            self.get_resp()['sizes'], {'text': 1000, 'data': 200})
+
+    def test_send_build_result_without_sizes(self):
+        """Test sending result without sizes"""
+        worker._send_build_result('sandbox', 0, 0)
+        self.assertNotIn('sizes', self.get_resp())
+
+
+class TestUtilityFunctions(unittest.TestCase):
+    """Test _get_nthreads(), _get_load_avg() and _get_sizes()"""
+
+    def test_nthreads_normal(self):
+        """Test getting thread count"""
+        self.assertGreater(worker._get_nthreads(), 0)
+
+    @mock.patch('os.cpu_count', return_value=None)
+    def test_nthreads_none(self, _mock):
+        """Test when cpu_count returns None"""
+        self.assertEqual(worker._get_nthreads(), 1)
+
+    @mock.patch('os.cpu_count', side_effect=AttributeError)
+    def test_nthreads_attribute_error(self, _mock):
+        """Test when cpu_count raises AttributeError"""
+        self.assertEqual(worker._get_nthreads(), 1)
+
+    @mock.patch('builtins.open', side_effect=OSError('no file'))
+    def test_load_avg_no_proc(self, _mock):
+        """Test when /proc/loadavg is not available"""
+        self.assertEqual(worker._get_load_avg(), 0.0)
+
+    def test_get_sizes_no_elf(self):
+        """Test with no ELF file"""
+        with tempfile.TemporaryDirectory() as tmpdir:
+            self.assertEqual(worker._get_sizes(tmpdir), {})
+
+    @mock.patch('buildman.worker.subprocess.Popen')
+    def test_get_sizes_with_elf(self, mock_popen):
+        """Test with ELF file present"""
+        proc = mock.Mock()
+        proc.communicate.return_value = (
+            b'   text    data     bss     dec     hex filename\n'
+            b'  12345    1234     567   14146    374a u-boot\n',
+            b'')
+        proc.returncode = 0
+        mock_popen.return_value = proc
+        with tempfile.TemporaryDirectory() as tmpdir:
+            elf = os.path.join(tmpdir, 'u-boot')
+            with open(elf, 'w', encoding='utf-8') as fout:
+                fout.write('fake')
+            self.assertIn('raw', worker._get_sizes(tmpdir))
+
+    @mock.patch('buildman.worker.subprocess.Popen',
+                side_effect=OSError('no size'))
+    def test_get_sizes_popen_fails(self, _mock):
+        """Test when size command fails"""
+        with tempfile.TemporaryDirectory() as tmpdir:
+            elf = os.path.join(tmpdir, 'u-boot')
+            with open(elf, 'w', encoding='utf-8') as fout:
+                fout.write('fake')
+            self.assertEqual(worker._get_sizes(tmpdir), {})
+
+
+class TestCmdSetup(_ProtoTestBase):
+    """Test _cmd_setup()"""
+
+    @mock.patch('buildman.worker.command.run_one')
+    def test_auto_work_dir(self, mock_run):
+        """Test setup with auto-created work directory"""
+        mock_run.return_value = mock.Mock(return_code=0)
+        state = {}
+        result = worker._cmd_setup({'work_dir': ''}, state)
+        self.assertTrue(result)
+        self.assertIn('work_dir', state)
+        self.assertTrue(state.get('auto_work_dir'))
+        mock_run.assert_called_once()
+        self.addCleanup(lambda: os.path.isdir(state['work_dir'])
+                        and os.rmdir(state['work_dir']))
+
+    @mock.patch('buildman.worker.command.run_one')
+    def test_explicit_work_dir(self, mock_run):
+        """Test setup with explicit work directory"""
+        mock_run.return_value = mock.Mock(return_code=0)
+        with tempfile.TemporaryDirectory() as tmpdir:
+            state = {}
+            work_dir = os.path.join(tmpdir, 'build')
+            self.assertTrue(
+                worker._cmd_setup({'work_dir': work_dir}, state))
+            self.assertEqual(state['work_dir'], work_dir)
+            self.assertTrue(os.path.isdir(work_dir))
+            self.assertNotIn('auto_work_dir', state)
+
+    @mock.patch('buildman.worker.command.run_one')
+    def test_setup_returns_git_dir(self, mock_run):
+        """Test setup response includes git_dir"""
+        mock_run.return_value = mock.Mock(return_code=0)
+        with tempfile.TemporaryDirectory() as tmpdir:
+            worker._cmd_setup({'work_dir': tmpdir}, {})
+            self.assert_resp('resp', 'setup_done')
+            self.assertEqual(
+                self.get_resp()['git_dir'],
+                os.path.join(tmpdir, '.git'))
+
+    def test_setup_existing_git(self):
+        """Test setup skips git init if .git already exists"""
+        with tempfile.TemporaryDirectory() as tmpdir:
+            os.makedirs(os.path.join(tmpdir, '.git'))
+            with mock.patch(
+                    'buildman.worker.command.run_one') as mock_run:
+                self.assertTrue(
+                    worker._cmd_setup({'work_dir': tmpdir}, {}))
+            mock_run.assert_not_called()
+
+    @mock.patch('buildman.worker.command.run_one')
+    def test_git_init_fails(self, mock_run):
+        """Test setup when git init fails"""
+        mock_run.side_effect = CommandExc(
+            'git init failed', CommandResult())
+        with tempfile.TemporaryDirectory() as tmpdir:
+            result = worker._cmd_setup(
+                {'work_dir': os.path.join(tmpdir, 'new')}, {})
+        self.assertFalse(result)
+        self.assert_in_output('git init failed')
+
+
+class TestCmdQuit(_ProtoTestBase):
+    """Test _cmd_quit()"""
+
+    def test_quit(self):
+        """Test quit command"""
+        worker._cmd_quit({})
+        self.assert_resp('resp', 'quit_ack')
+
+    def test_quit_cleanup(self):
+        """Test quit cleans up auto work directory"""
+        tmpdir = tempfile.mkdtemp(prefix='bm-test-')
+        worker._cmd_quit({'work_dir': tmpdir, 'auto_work_dir': True})
+        self.assertFalse(os.path.exists(tmpdir))
+
+    def test_quit_preserves_explicit_dir(self):
+        """Test quit does not remove non-auto work directory"""
+        with tempfile.TemporaryDirectory() as tmpdir:
+            worker._cmd_quit({'work_dir': tmpdir})
+            self.assertTrue(os.path.isdir(tmpdir))
+
+
+class TestCmdConfigure(_ProtoTestBase):
+    """Test _cmd_configure()"""
+
+    def test_configure(self):
+        """Test that configure stores settings in state"""
+        state = {}
+        settings = {'no_lto': True, 'allow_missing': True}
+        self.assertTrue(
+            worker._cmd_configure({'settings': settings}, state))
+        self.assertEqual(state['settings'], settings)
+        self.assert_resp('resp', 'configure_done')
+
+    def test_configure_empty(self):
+        """Test configure with empty settings"""
+        state = {}
+        worker._cmd_configure({'settings': {}}, state)
+        self.assertEqual(state['settings'], {})
+
+
+class TestRunWorker(_RunWorkerBase):
+    """Test run_worker()"""
+
+    def test_empty_stdin(self):
+        """Test worker with empty stdin (unexpected close)"""
+        result, lines = self._run('')
+        self.assertEqual(result, 1)
+        self.assertEqual(_parse(lines[0])['resp'], 'ready')
+
+    def test_ready_includes_slots(self):
+        """Test that the ready response includes slots"""
+        _, lines = self._run('{"cmd": "quit"}\n')
+        obj = _parse(lines[0])
+        self.assertEqual(obj['resp'], 'ready')
+        self.assertIn('slots', obj)
+        self.assertGreaterEqual(obj['slots'], 1)
+
+    def test_quit_command(self):
+        """Test worker with quit command"""
+        result, lines = self._run('{"cmd": "quit"}\n')
+        self.assertEqual(result, 0)
+        self.assertEqual(len(lines), 2)  # ready + quit_ack
+
+    def test_invalid_json(self):
+        """Test worker with invalid JSON input"""
+        _, lines = self._run('not json\n{"cmd": "quit"}\n')
+        self.assertEqual(len(lines), 3)  # ready + error + quit_ack
+        self.assertIn('invalid JSON', _parse(lines[1])['msg'])
+
+    def test_unknown_command(self):
+        """Test worker with unknown command"""
+        _, lines = self._run(
+            '{"cmd": "dance"}\n{"cmd": "quit"}\n')
+        self.assertIn('unknown command', _parse(lines[1])['msg'])
+
+    def test_blank_lines(self):
+        """Test worker ignores blank lines"""
+        _, lines = self._run('\n\n{"cmd": "quit"}\n\n')
+        self.assertEqual(len(lines), 2)  # ready + quit_ack
+
+    def test_configure_command(self):
+        """Test configure command in worker loop"""
+        result, lines = self._run(
+            '{"cmd": "configure", "settings": {"no_lto": true}}\n'
+            '{"cmd": "quit"}\n')
+        self.assertEqual(result, 0)
+        self.assertEqual(_parse(lines[1])['resp'], 'configure_done')
+
+
+class TestRunWorkerDispatch(_RunWorkerBase):
+    """Test run_worker command dispatch"""
+
+    @mock.patch('buildman.worker._cmd_setup', return_value=True)
+    def test_setup(self, mock_fn):
+        """Test setup command is dispatched"""
+        self._run('{"cmd": "setup", "work_dir": "/tmp"}\n'
+                  '{"cmd": "quit"}\n')
+        mock_fn.assert_called_once()
+
+    @mock.patch('buildman.worker._cmd_build_boards')
+    def test_build_boards(self, mock_fn):
+        """Test build_boards command is dispatched"""
+        self._run('{"cmd": "build_boards", "boards": [], '
+                  '"commits": []}\n{"cmd": "quit"}\n')
+        mock_fn.assert_called_once()
+
+    @mock.patch('buildman.worker._cmd_build_prepare')
+    def test_build_prepare(self, mock_fn):
+        """Test build_prepare command is dispatched"""
+        self._run('{"cmd": "build_prepare", "commits": []}\n'
+                  '{"cmd": "quit"}\n')
+        mock_fn.assert_called_once()
+
+    @mock.patch('buildman.worker._cmd_build_board')
+    def test_build_board(self, mock_fn):
+        """Test build_board command is dispatched"""
+        self._run('{"cmd": "build_board", "board": "x"}\n'
+                  '{"cmd": "quit"}\n')
+        mock_fn.assert_called_once()
+
+    @mock.patch('buildman.worker._cmd_build_done')
+    def test_build_done(self, mock_fn):
+        """Test build_done command is dispatched"""
+        self._run('{"cmd": "build_done"}\n{"cmd": "quit"}\n')
+        mock_fn.assert_called_once()
+
+    def test_stdin_eof(self):
+        """Test worker handles stdin EOF"""
+        result, _lines = self._run('')
+        self.assertEqual(result, 1)
+
+    def test_queue_empty_retry(self):
+        """Test dispatch retries on queue.Empty"""
+        eof = object()
+        mock_queue = mock.Mock()
+        mock_queue.get.side_effect = [
+            queue.Empty(),
+            '{"cmd": "quit"}\n',
+        ]
+        worker._dispatch_commands(mock_queue, eof, {})
+        self.assert_in_output('quit_ack')
+        self.assertEqual(mock_queue.get.call_count, 2)
+
+
+class TestWorkerMake(unittest.TestCase):
+    """Test _worker_make()"""
+
+    @mock.patch('buildman.worker.subprocess.Popen')
+    def test_success(self, mock_popen):
+        """Test successful make invocation"""
+        proc = mock.Mock()
+        proc.communicate.return_value = (b'built ok\n', b'')
+        proc.returncode = 0
+        mock_popen.return_value = proc
+
+        result = worker._worker_make(
+            None, None, None, '/tmp',
+            'O=/tmp/out', '-s', '-j', '4', 'sandbox_defconfig',
+            env={'PATH': '/usr/bin'})
+        self.assertEqual(result.return_code, 0)
+        self.assertEqual(result.stdout, 'built ok\n')
+        self.assertEqual(mock_popen.call_args[0][0][0], 'make')
+
+    @mock.patch('buildman.worker.subprocess.Popen',
+                side_effect=FileNotFoundError('no make'))
+    def test_make_not_found(self, _mock_popen):
+        """Test when make binary is not found"""
+        result = worker._worker_make(
+            None, None, None, '/tmp', env={})
+        self.assertEqual(result.return_code, 1)
+        self.assertIn('make failed', result.stderr)
+
+
+class TestWorkerBuilderThread(_ProtoTestBase):
+    """Tests for the _WorkerBuilderThread subclass"""
+
+    def _make_thread(self):
+        """Return a _WorkerBuilderThread whose __init__ has not run
+
+        __init__ needs a real Builder, so the object is allocated with
+        __new__ and each test assigns the attributes it relies on.
+        """
+        cls = worker._WorkerBuilderThread
+        return cls.__new__(cls)
+
+    def test_write_result_is_noop(self):
+        """_write_result() can be called without any state set up"""
+        self._make_thread()._write_result(None, False, False)
+
+    def test_send_result(self):
+        """_send_result() emits a build_result naming the board"""
+        fake_result = mock.Mock(
+            brd=mock.Mock(target='sandbox'),
+            commit_upto=0, return_code=0,
+            stderr='', stdout='', out_dir='/nonexistent')
+        bthread = self._make_thread()
+        bthread._send_result(fake_result)
+        self.assert_resp('resp', 'build_result')
+        self.assert_resp('board', 'sandbox')
+
+    def test_run_job_sends_heartbeat(self):
+        """run_job() outputs a heartbeat; the parent run_job is stubbed"""
+        parent_cls = worker._WorkerBuilderThread.__bases__[0]
+        bthread = self._make_thread()
+        bthread.thread_num = 0
+        job = mock.Mock(brd=mock.Mock(target='sandbox'))
+        with mock.patch.object(parent_cls, 'run_job'):
+            bthread.run_job(job)
+        self.assert_in_output('heartbeat')
+
+    def test_checkout_with_commits(self):
+        """_checkout() returns the indexed commit and runs git once"""
+        commit = mock.Mock(hash='abc123')
+        bthread = self._make_thread()
+        bthread.builder = mock.Mock()
+        bthread.builder.commits = [commit]
+        bthread.builder.checkout = True
+
+        with mock.patch('buildman.worker._run_git') as mock_git, \
+             mock.patch('buildman.worker._remove_stale_lock'), \
+             tempfile.TemporaryDirectory() as tmpdir:
+            got = bthread._checkout(0, tmpdir)
+
+        self.assertEqual(got, commit)
+        mock_git.assert_called_once()
+
+    def test_checkout_no_commits(self):
+        """_checkout() returns 'current' when the builder has no commits"""
+        bthread = self._make_thread()
+        bthread.builder = mock.Mock(commits=None)
+        self.assertEqual(bthread._checkout(0, '/tmp'), 'current')
+
+
+class TestCmdBuildBoards(_ProtoTestBase):
+    """Tests for _cmd_build_boards()"""
+
+    def _make_state(self, tmpdir, **overrides):
+        """Build a state dict with default entries, updated by overrides"""
+        state = {
+            'work_dir': tmpdir,
+            'toolchains': mock.Mock(),
+            'settings': {},
+            'nthreads': 4,
+        }
+        state.update(overrides)
+        return state
+
+    def test_no_work_dir_error(self):
+        """A state whose work_dir is None gives 'no work directory'"""
+        worker._cmd_build_boards({
+            'boards': [{'board': 'x', 'arch': 'arm'}],
+            'commits': ['abc'],
+        }, {'work_dir': None})
+        self.assert_in_output('no work directory')
+
+    def test_no_boards_error(self):
+        """An empty 'boards' list gives a 'no boards' error"""
+        with tempfile.TemporaryDirectory() as tmpdir:
+            worker._cmd_build_boards(
+                {'boards': [], 'commits': ['abc']},
+                self._make_state(tmpdir))
+        self.assert_in_output('no boards')
+
+    def test_no_toolchains_error(self):
+        """A state without 'toolchains' gives 'no toolchains'"""
+        with tempfile.TemporaryDirectory() as tmpdir:
+            worker._cmd_build_boards({
+                'boards': [{'board': 'x', 'arch': 'arm'}],
+                'commits': ['abc'],
+            }, {'work_dir': tmpdir})
+        self.assert_in_output('no toolchains')
+
+    @mock.patch('buildman.worker._setup_worktrees')
+    @mock.patch('buildman.worker.builder_mod.Builder')
+    @mock.patch('buildman.worker.ResultHandler')
+    def test_creates_builder(self, _mock_rh_cls, mock_builder_cls,
+                             mock_setup_wt):
+        """Check the Builder kwargs, the init_build() args and build_done"""
+        mock_builder = mock.Mock()
+        mock_builder.run_build.return_value = (0, 0, [])
+        mock_builder_cls.return_value = mock_builder
+
+        with tempfile.TemporaryDirectory() as tmpdir:
+            worker._cmd_build_boards({
+                'boards': [
+                    {'board': 'sandbox', 'arch': 'sandbox'},
+                    {'board': 'rpi', 'arch': 'arm'},
+                ],
+                'commits': ['abc123', 'def456'],
+            }, self._make_state(tmpdir, nthreads=8,
+                                settings={'no_lto': True,
+                                          'force_build': True,
+                                          'kconfig_check': False}))
+
+        mock_setup_wt.assert_called_once()
+        kwargs = mock_builder_cls.call_args[1]
+        self.assertEqual(kwargs['thread_class'],
+                         worker._WorkerBuilderThread)
+        self.assertTrue(kwargs['no_lto'])
+        self.assertFalse(kwargs['kconfig_check'])
+
+        call_args = mock_builder.init_build.call_args
+        self.assertEqual(len(call_args[0][0]), 2)
+        self.assertIn('rpi', call_args[0][1])
+        self.assert_in_output('build_done')
+
+    @mock.patch('buildman.worker.builder_mod.Builder')
+    @mock.patch('buildman.worker.ResultHandler')
+    def test_no_commits(self, _mock_rh_cls, mock_builder_cls):
+        """With commits [None], init_build()'s first argument is None"""
+        mock_builder = mock.Mock()
+        mock_builder.run_build.return_value = (0, 0, [])
+        mock_builder_cls.return_value = mock_builder
+
+        with tempfile.TemporaryDirectory() as tmpdir:
+            worker._cmd_build_boards({
+                'boards': [{'board': 'sandbox', 'arch': 'sandbox'}],
+                'commits': [None],
+            }, self._make_state(tmpdir))
+
+        self.assertIsNone(
+            mock_builder.init_build.call_args[0][0])
+        self.assertTrue(mock_builder_cls.call_args[1]['kconfig_check'])
+
+    @mock.patch('buildman.worker._setup_worktrees')
+    @mock.patch('buildman.worker.builder_mod.Builder')
+    @mock.patch('buildman.worker.ResultHandler')
+    def test_build_crash(self, _mock_rh, mock_builder_cls, _mock_wt):
+        """An exception from run_build() is reported as "exceptions": 1"""
+        mock_builder = mock.Mock()
+        mock_builder.run_build.side_effect = RuntimeError('crash')
+        mock_builder_cls.return_value = mock_builder
+
+        with tempfile.TemporaryDirectory() as tmpdir:
+            worker._cmd_build_boards({
+                'boards': [{'board': 'x', 'arch': 'arm'}],
+                'commits': ['abc'],
+            }, self._make_state(tmpdir, nthreads=2))
+
+        self.assert_in_output('"exceptions": 1')
+
+
+class TestCmdBuildPrepare(_ProtoTestBase):
+    """Tests for _cmd_build_prepare()"""
+
+    def test_no_work_dir(self):
+        """An empty state is rejected with 'no work directory'"""
+        worker._cmd_build_prepare({}, {})
+        self.assert_in_output('no work directory')
+
+    def test_no_toolchains(self):
+        """A state holding only work_dir is rejected with 'no toolchains'"""
+        with tempfile.TemporaryDirectory() as tmpdir:
+            worker._cmd_build_prepare({}, {'work_dir': tmpdir})
+        self.assert_in_output('no toolchains')
+
+    @mock.patch('buildman.worker._setup_worktrees')
+    @mock.patch('buildman.worker._create_builder')
+    def test_success(self, mock_create, _mock_wt):
+        """A valid request stores a builder in state and is acknowledged"""
+        fake_builder = mock.Mock(
+            base_dir='/tmp/test', commit_count=1,
+            work_in_output=False)
+        mock_create.return_value = fake_builder
+        request = {'commits': ['abc123']}
+
+        with tempfile.TemporaryDirectory() as tmpdir:
+            os.makedirs(os.path.join(tmpdir, '.git'))
+            state = {
+                'work_dir': tmpdir,
+                'toolchains': mock.Mock(),
+                'nthreads': 2,
+                'settings': {},
+            }
+            worker._cmd_build_prepare(request, state)
+
+        self.assert_in_output('build_prepare_done')
+        self.assertIn('builder', state)
+
+
+class TestCmdBuildBoard(_ProtoTestBase):
+    """Tests for _cmd_build_board()"""
+
+    def test_no_builder(self):
+        """Without a builder in state, 'no builder' is reported"""
+        worker._cmd_build_board({'board': 'x'}, {})
+        self.assert_in_output('no builder')
+
+    def test_queues_job(self):
+        """With a builder in state, one job is put on its queue"""
+        fake_builder = mock.Mock(
+            commit_count=1, count=0, work_in_output=False,
+            adjust_cfg=None, step=1)
+        state = {'builder': fake_builder, 'commits': None}
+        request = {'board': 'sandbox', 'arch': 'sandbox'}
+        worker._cmd_build_board(request, state)
+        fake_builder.queue.put.assert_called_once()
+
+
+class TestCmdBuildDone(_ProtoTestBase):
+    """Tests for _cmd_build_done()"""
+
+    def test_no_builder(self):
+        """With no builder, build_done is sent with zero exceptions"""
+        worker._cmd_build_done({})
+        self.assert_resp('resp', 'build_done')
+        self.assert_resp('exceptions', 0)
+
+    def test_with_builder(self):
+        """A successful run sends build_done and drops the builder"""
+        fake_builder = mock.Mock()
+        fake_builder.run_build.return_value = (0, 0, [])
+        state = dict(builder=fake_builder, commits=['abc'])
+        worker._cmd_build_done(state)
+        self.assert_resp('resp', 'build_done')
+        self.assertNotIn('builder', state)
+
+    def test_builder_crash(self):
+        """A run_build() exception is counted and the builder dropped"""
+        fake_builder = mock.Mock()
+        fake_builder.run_build.side_effect = RuntimeError('boom')
+        state = dict(builder=fake_builder, commits=['abc'])
+        worker._cmd_build_done(state)
+        self.assert_resp('exceptions', 1)
+        self.assertNotIn('builder', state)
+
+
+class TestSetupWorktrees(_ProtoTestBase):
+    """Tests for _setup_worktrees(), with _run_git() mocked out"""
+
+    @mock.patch('buildman.worker._run_git')
+    def test_creates_worktrees(self, mock_git):
+        """Three threads give a prune, three adds and three responses"""
+        with tempfile.TemporaryDirectory() as tmpdir:
+            repo_git = os.path.join(tmpdir, '.git')
+            os.makedirs(repo_git)
+            worker._setup_worktrees(tmpdir, repo_git, 3)
+
+        self.assertEqual(mock_git.call_count, 4)  # prune + 3 adds
+        self.assertIn('prune', mock_git.call_args_list[0][0])
+        replies = self.get_all_resp()
+        self.assertEqual(len(replies), 3)
+        for reply in replies:
+            self.assertEqual(reply['resp'], 'worktree_created')
+
+    @mock.patch('buildman.worker._run_git')
+    def test_skips_existing_valid_worktree(self, mock_git):
+        """A .git file naming an existing gitdir means only prune runs"""
+        with tempfile.TemporaryDirectory() as tmpdir:
+            repo_git = os.path.join(tmpdir, '.git')
+            os.makedirs(repo_git)
+
+            wt_dir = os.path.join(tmpdir, '.bm-work', '00')
+            os.makedirs(wt_dir)
+            wt_gitdir = os.path.join(repo_git, 'worktrees', '00')
+            os.makedirs(wt_gitdir)
+            with open(os.path.join(wt_dir, '.git'), 'w',
+                      encoding='utf-8') as fout:
+                fout.write(f'gitdir: {wt_gitdir}\n')
+
+            worker._setup_worktrees(tmpdir, repo_git, 1)
+
+        self.assertEqual(mock_git.call_count, 1)  # prune only
+
+    @mock.patch('buildman.worker._run_git')
+    def test_replaces_stale_clone(self, mock_git):
+        """A thread dir whose .git is a directory is removed and re-added"""
+        with tempfile.TemporaryDirectory() as tmpdir:
+            repo_git = os.path.join(tmpdir, '.git')
+            os.makedirs(repo_git)
+
+            wt_dir = os.path.join(tmpdir, '.bm-work', '00')
+            old_clone = os.path.join(wt_dir, '.git')
+            os.makedirs(os.path.join(old_clone, 'objects'))
+
+            worker._setup_worktrees(tmpdir, repo_git, 1)
+            self.assertFalse(os.path.isdir(old_clone))
+
+        self.assertEqual(mock_git.call_count, 2)  # prune + add
+
+    @mock.patch('buildman.worker._run_git')
+    def test_stale_dot_git_file(self, mock_git):
+        """A .git file naming a missing gitdir is deleted"""
+        with tempfile.TemporaryDirectory() as tmpdir:
+            repo_git = os.path.join(tmpdir, '.git')
+            os.makedirs(repo_git)
+
+            wt_dir = os.path.join(tmpdir, '.bm-work', '00')
+            os.makedirs(wt_dir)
+            pointer = os.path.join(wt_dir, '.git')
+            with open(pointer, 'w', encoding='utf-8') as fout:
+                fout.write('gitdir: /nonexistent\n')
+
+            worker._setup_worktrees(tmpdir, repo_git, 1)
+            self.assertFalse(os.path.isfile(pointer))
+
+        self.assertGreaterEqual(mock_git.call_count, 2)
+
+
+class TestRunGit(unittest.TestCase):
+    """Tests for _run_git(), with subprocess.Popen mocked out"""
+
+    @mock.patch('buildman.worker.subprocess.Popen')
+    def test_success(self, mock_popen):
+        """A git process that exits 0 does not raise"""
+        fake_proc = mock.Mock(returncode=0)
+        fake_proc.communicate.return_value = (b'', b'')
+        mock_popen.return_value = fake_proc
+
+        worker._run_git('status', cwd='/tmp')
+
+    @mock.patch('buildman.worker.subprocess.Popen')
+    def test_failure(self, mock_popen):
+        """A non-zero exit raises OSError carrying git's stderr"""
+        fake_proc = mock.Mock(returncode=128)
+        fake_proc.communicate.return_value = (b'', b'fatal: bad ref\n')
+        mock_popen.return_value = fake_proc
+
+        with self.assertRaises(OSError) as ctx:
+            worker._run_git('checkout', 'bad', cwd='/tmp')
+        self.assertIn('bad ref', str(ctx.exception))
+
+    @mock.patch('buildman.worker.subprocess.Popen')
+    def test_timeout(self, mock_popen):
+        """A communicate() timeout raises OSError saying 'timed out'"""
+        # communicate() times out on the first call, then returns normally
+        fake_proc = mock.Mock()
+        fake_proc.communicate.side_effect = [
+            subprocess.TimeoutExpired('git', 30), (b'', b'')]
+        mock_popen.return_value = fake_proc
+
+        with self.assertRaises(OSError) as ctx:
+            worker._run_git('fetch', cwd='/tmp', timeout=30)
+        self.assertIn('timed out', str(ctx.exception))
+
+
+class TestResolveGitDir(unittest.TestCase):
+    """Tests for _resolve_git_dir(), using real temporary files"""
+
+    def test_directory(self):
+        """A real .git directory resolves to itself"""
+        with tempfile.TemporaryDirectory() as tmpdir:
+            repo_git = os.path.join(tmpdir, '.git')
+            os.makedirs(repo_git)
+            self.assertEqual(worker._resolve_git_dir(repo_git), repo_git)
+
+    def test_gitdir_file_absolute(self):
+        """A .git file naming an absolute gitdir resolves to that path"""
+        with tempfile.TemporaryDirectory() as tmpdir:
+            target = os.path.join(tmpdir, 'real.git')
+            os.makedirs(target)
+            pointer = os.path.join(tmpdir, '.git')
+            with open(pointer, 'w', encoding='utf-8') as fout:
+                fout.write(f'gitdir: {target}\n')
+            resolved = worker._resolve_git_dir(pointer)
+            self.assertEqual(resolved, target)
+
+    def test_gitdir_file_relative(self):
+        """A relative gitdir is joined to the .git file's directory"""
+        with tempfile.TemporaryDirectory() as tmpdir:
+            target = os.path.join(tmpdir, 'worktrees', 'wt1')
+            os.makedirs(target)
+            pointer = os.path.join(tmpdir, '.git')
+            with open(pointer, 'w', encoding='utf-8') as fout:
+                fout.write('gitdir: worktrees/wt1\n')
+            resolved = worker._resolve_git_dir(pointer)
+            # The expected path is the directory created above
+            self.assertEqual(resolved, target)
+
+
+class TestProcessManagement(unittest.TestCase):
+    """Test _kill_group(), _dbg() and signal handling"""
+
+    @mock.patch('os.killpg')
+    def test_kill_group_not_leader(self, mock_killpg):
+        """Test _kill_group does not call killpg when not group leader"""
+        self.addCleanup(setattr, worker, '_is_group_leader',
+                        worker._is_group_leader)
+        worker._is_group_leader = False
+        worker._kill_group()
+        mock_killpg.assert_not_called()
+
+    @mock.patch('os.killpg')
+    @mock.patch('os.getpgrp', return_value=12345)
+    def test_kill_group_leader(self, _mock_grp, mock_killpg):
+        """Test _kill_group calls killpg once when group leader"""
+        self.addCleanup(setattr, worker, '_is_group_leader',
+                        worker._is_group_leader)
+        worker._is_group_leader = True
+        worker._kill_group()
+        mock_killpg.assert_called_once()
+
+    @mock.patch('os.killpg', side_effect=OSError('no perm'))
+    @mock.patch('os.getpgrp', return_value=12345)
+    def test_kill_group_fails(self, _mock_grp, _mock_killpg):
+        """Test _kill_group handles OSError from killpg"""
+        self.addCleanup(setattr, worker, '_is_group_leader',
+                        worker._is_group_leader)
+        worker._is_group_leader = True
+        worker._kill_group()  # should not raise
+
+    def test_dbg_off(self):
+        """Test _dbg when debug is off"""
+        # Restore the flag afterwards, even if the test fails
+        self.addCleanup(setattr, worker, '_debug', worker._debug)
+        worker._debug = False
+        worker._dbg('test message')  # should not raise
+
+    def test_dbg_on(self):
+        """Test _dbg writes the message to stderr when debug is on"""
+        # Restore the flag afterwards, even if the test fails
+        self.addCleanup(setattr, worker, '_debug', worker._debug)
+        worker._debug = True
+        with mock.patch('sys.stderr', new_callable=io.StringIO) as err:
+            worker._dbg('hello')
+            self.assertIn('hello', err.getvalue())
+
+    def test_dbg_stderr_oserror(self):
+        """Test _dbg handles OSError from stderr"""
+        # Restore the flag afterwards, even if the test fails
+        self.addCleanup(setattr, worker, '_debug', worker._debug)
+        worker._debug = True
+        err = mock.Mock()
+        err.write.side_effect = OSError('broken pipe')
+        with mock.patch('sys.stderr', err):
+            worker._dbg('will fail')  # should not raise
+
+    @mock.patch('buildman.worker._kill_group')
+    @mock.patch('os._exit')
+    def test_exit_handler(self, mock_exit, mock_kill):
+        """Test signal handler calls _kill_group and os._exit"""
+        handlers = {}
+        # Only record the handlers; this process's real ones stay as-is
+
+        def capture_signal(signum, handler):
+            handlers[signum] = handler
+            return signal.getsignal(signum)
+
+        with mock.patch('signal.signal', side_effect=capture_signal), \
+             mock.patch('buildman.worker.toolchain_mod.Toolchains'), \
+             mock.patch('sys.stdin', io.StringIO('{"cmd":"quit"}\n')), \
+             mock.patch('sys.stdout', io.StringIO()):
+            worker.run_worker()
+
+        handler = handlers.get(signal.SIGTERM)
+        self.assertIsNotNone(handler)
+        mock_kill.reset_mock()
+        mock_exit.reset_mock()
+        handler(signal.SIGTERM, None)
+        mock_kill.assert_called()
+        mock_exit.assert_called_with(1)
+
+
+class TestDoWorker(unittest.TestCase):
+    """Tests for do_worker(), with run_worker() and the os calls mocked"""
+
+    @mock.patch('buildman.worker.run_worker', return_value=0)
+    @mock.patch('os.setpgrp')
+    @mock.patch('os.getpid', return_value=100)
+    @mock.patch('os.getpgrp', return_value=100)
+    def test_start(self, _grp, _pid, _setpgrp, mock_run):
+        """Check do_worker() calls run_worker(False) and returns its value"""
+        self.assertEqual(worker.do_worker(debug=False), 0)
+        mock_run.assert_called_once_with(False)
+
+    @mock.patch('buildman.worker.run_worker', return_value=0)
+    @mock.patch('os.setpgrp', side_effect=OSError('not allowed'))
+    @mock.patch('os.getpid', return_value=100)
+    @mock.patch('os.getpgrp', return_value=1)
+    def test_setpgrp_fails(self, _grp, _pid, _setpgrp, _mock_run):
+        """Check do_worker() returns 0 even if os.setpgrp() raises OSError"""
+        self.assertEqual(worker.do_worker(debug=False), 0)
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/tools/buildman/worker.py b/tools/buildman/worker.py
new file mode 100644
index 00000000000..ddf023a1979
--- /dev/null
+++ b/tools/buildman/worker.py
@@ -0,0 +1,985 @@ 
+# SPDX-License-Identifier: GPL-2.0+
+# Copyright 2026 Simon Glass <sjg@chromium.org>
+
+"""Worker mode for distributed builds
+
+A worker runs on a remote machine and receives build commands over stdin from a
+boss. Commands and responses use a JSON-lines protocol:
+
+Commands (boss -> worker, on stdin):
+    {"cmd": "setup", "work_dir": "/path"}
+    {"cmd": "configure", "settings": {"no_lto": true, ...}}
+    {"cmd": "build_boards",
+     "boards": [{"board": "sandbox", "arch": "sandbox"}],
+     "commits": ["<hash>", ...]}
+    {"cmd": "build_prepare", "commits": ["<hash>", ...]}
+    {"cmd": "build_board", "board": "sandbox", "arch": "sandbox"}
+    {"cmd": "build_done"}
+    {"cmd": "quit"}
+
+Responses (worker -> boss, on stdout):
+    Each line is prefixed with 'BM> ' followed by a JSON object:
+    BM> {"resp": "ready", "nthreads": 8, "slots": 2}
+    BM> {"resp": "setup_done", "work_dir": "/path", "git_dir": "/path/.git"}
+    BM> {"resp": "configure_done"}
+    BM> {"resp": "build_prepare_done"}
+    BM> {"resp": "build_result", "board": "sandbox", "commit_upto": 0,
+         "return_code": 0, "stderr": "", "sizes": {...}}
+    BM> {"resp": "build_done", "exceptions": 0}
+    BM> {"resp": "error", "msg": "..."}
+    BM> {"resp": "quit_ack"}
+
+The 'BM> ' prefix allows the boss to distinguish protocol messages from
+any stray output on the SSH connection (e.g. login banners, warnings).
+
+The worker uses Builder and BuilderThread from the local build path,
+with a custom BuilderThread subclass that sends results over SSH
+instead of writing them to disk.  This means the worker inherits the
+same board-first scheduling, per-thread worktrees, incremental builds
+and retry logic as local builds.
+
+Typical flow (batch mode):
+    1. Boss starts worker: ssh host buildman --worker
+    2. Worker sends 'ready' with nthreads
+    3. Boss sends 'setup' to create work directory with a git repo
+    4. Worker sends 'setup_done' with git_dir path
+    5. Boss pushes source: git push ssh://host/<git_dir> HEAD:refs/heads/work
+    6. Boss sends 'build_boards' with all boards and commits
+    7. Worker sets up per-thread worktrees, then creates a Builder
+       that runs BuilderThread instances which pick boards from a queue,
+       build all commits for each, and stream 'build_result' responses
+    8. Boss sends 'quit' when done
+
+Demand-driven flow:
+    Steps 1-5 same as above, then:
+    6. Boss sends 'build_prepare' with commits
+    7. Worker creates Builder and worktrees, sends 'build_prepare_done'
+    8. Boss sends 'build_board' commands one at a time from a shared
+       pool, sending more as results arrive to keep threads busy
+    9. Boss sends 'build_done' when no more boards
+   10. Worker drains queue, sends 'build_done', boss sends 'quit'
+"""
+
+import json
+import os
+import queue
+import signal
+import shutil
+import subprocess
+import sys
+import tempfile
+import traceback
+import threading
+
+from buildman.board import Board
+from buildman import builderthread
+from buildman import builder as builder_mod
+from buildman.outcome import DisplayOptions
+from buildman.resulthandler import ResultHandler
+from buildman import toolchain as toolchain_mod
+from u_boot_pylib import command
+from u_boot_pylib import terminal
+
+from patman.commit import Commit
+
+# Prefix on every response line, so the boss can pick them out of the stream
+RESPONSE_PREFIX = 'BM> '
+
+# Serialises the writes in _send(), which concurrent build threads may call
+_send_lock = threading.Lock()
+
+# Serialises the stderr writes made by _dbg()
+_debug_lock = threading.Lock()
+
+# Whether _dbg() prints anything (set by run_worker)
+_debug = False  # pylint: disable=C0103
+
+# True if we lead our process group; gates _kill_group() (set by do_worker)
+_is_group_leader = False  # pylint: disable=C0103
+
+# Stream for protocol messages; while it is None, _send() uses sys.stdout
+_protocol_out = None  # pylint: disable=C0103
+
+
+def _kill_group():
+    """Send SIGKILL to every process in our process group
+
+    The group holds this process plus the make, cc1, as, ld, etc.
+    spawned by build threads. This is a no-op unless do_worker()
+    confirmed we are the process group leader, so that unrelated
+    processes (e.g. the test runner) are never killed.
+    """
+    if _is_group_leader:
+        pgid = os.getpgrp()
+        _dbg(f'_kill_group: killing pgid {pgid}')
+        try:
+            os.killpg(pgid, signal.SIGKILL)
+        except OSError as exc:
+            _dbg(f'_kill_group: killpg failed: {exc}')
+        return
+    _dbg('_kill_group: not leader, skipping')
+
+
+def _dbg(msg):
+    """Write msg to stderr with a 'W: ' prefix, if debug mode is on
+
+    Args:
+        msg (str): Message to report
+    """
+    if _debug:
+        try:
+            with _debug_lock:
+                sys.stderr.write(f'W: {msg}\n')
+                sys.stderr.flush()
+        except OSError:
+            pass
+
+
+def _send(obj):
+    """Write one protocol response line for the boss
+
+    The line is RESPONSE_PREFIX plus obj encoded as JSON. _send_lock
+    stops lines from concurrent build threads interleaving. Output
+    goes to _protocol_out if set, otherwise to sys.stdout.
+
+    Args:
+        obj (dict): Response object to send
+    """
+    stream = _protocol_out or sys.stdout
+    with _send_lock:
+        stream.write(f'{RESPONSE_PREFIX}{json.dumps(obj)}\n')
+        stream.flush()
+
+
+def _send_error(msg):
+    """Report a failure to the boss as an 'error' response
+
+    Args:
+        msg (str): Error message
+    """
+    _send(dict(resp='error', msg=msg))
+
+
+def _send_build_result(board, commit_upto, return_code, **kwargs):
+    """Send a 'build_result' response for one board at one commit
+
+    Args:
+        board (str): Board target name
+        commit_upto (int): Commit number
+        return_code (int): Build return code
+        **kwargs: Optional keys: stderr, stdout, sizes
+    """
+    msg = dict(
+        resp='build_result',
+        board=board,
+        commit_upto=commit_upto,
+        return_code=return_code,
+        stderr=kwargs.get('stderr', ''),
+        stdout=kwargs.get('stdout', ''),
+        load_avg=_get_load_avg(),
+    )
+    # 'sizes' is left out of the message when it is missing or empty
+    if kwargs.get('sizes'):
+        msg['sizes'] = kwargs['sizes']
+    _send(msg)
+
+
+def _get_nthreads():
+    """Report how many build threads this machine can run
+
+    Returns:
+        int: os.cpu_count(), or 1 if that is unknown or unsupported
+    """
+    try:
+        return os.cpu_count() or 1
+    except (NotImplementedError, AttributeError):
+        return 1
+
+
+def _get_load_avg():
+    """Read the 1-minute load average from /proc/loadavg
+
+    Returns:
+        float: First field of /proc/loadavg, or 0.0 if it cannot be read
+    """
+    try:
+        with open('/proc/loadavg', encoding='utf-8') as inf:
+            return float(inf.read().split(maxsplit=1)[0])
+    except (IndexError, OSError, ValueError):
+        return 0.0
+
+
+def _get_sizes(out_dir):
+    """Run 'size' on the u-boot ELF file in a build output directory
+
+    subprocess.Popen is used directly, rather than command.run_pipe(),
+    because cros_subprocess relies on select(), which is limited to
+    FD_SETSIZE file descriptors. With many build threads the pipe
+    descriptors can go above 1024, at which point select() breaks.
+
+    Args:
+        out_dir (str): Build output directory
+
+    Returns:
+        dict: {'raw': <data lines from 'size'>}, or {} if unavailable
+    """
+    elf_path = os.path.join(out_dir, 'u-boot')
+    if not os.path.exists(elf_path):
+        return {}
+    try:
+        proc = subprocess.Popen(  # pylint: disable=R1732
+            ['size', elf_path], stdin=subprocess.DEVNULL,
+            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+        out_bytes, _ = proc.communicate()
+    except OSError:
+        return {}
+    if proc.returncode != 0:
+        return {}
+    # Skip the header line, matching what the local builderthread produces
+    rows = out_bytes.decode('utf-8', errors='replace').splitlines()
+    if len(rows) < 2:
+        return {}
+    return {'raw': '\n'.join(rows[1:])}
+
+
+def _worker_make(_commit, _brd, _stage, cwd, *args, **kwargs):
+    """Run make with subprocess.Popen and collect its output
+
+    cros_subprocess uses a select()-based communicate_filter, which
+    fails once file descriptor numbers pass FD_SETSIZE (1024), as can
+    happen on a worker running many parallel builds. Popen with
+    communicate() avoids this.
+
+    Args:
+        _commit: Unused (API compatibility with Builder.make)
+        _brd: Unused
+        _stage: Unused
+        cwd (str): Working directory
+        *args: Make arguments
+        **kwargs: 'env' (dict), if given, is the environment for make
+
+    Returns:
+        CommandResult: Output and return code, or return code 1 on error
+    """
+    cmd = ['make', *args]
+    env = kwargs.get('env')
+    result = command.CommandResult()
+    try:
+        proc = subprocess.Popen(  # pylint: disable=R1732
+            cmd, cwd=cwd, env=env, stdin=subprocess.DEVNULL,
+            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+        out, err = proc.communicate()
+    except Exception as exc:  # pylint: disable=W0718
+        # Report any failure to run make as a failed build
+        result.return_code = 1
+        result.stderr = f'make failed to start: {exc}'
+        result.combined = result.stderr
+        return result
+    result.stdout = out.decode('utf-8', errors='replace')
+    result.stderr = err.decode('utf-8', errors='replace')
+    result.combined = result.stdout + result.stderr
+    result.return_code = proc.returncode
+    return result
+
+
+def _run_git(*args, cwd=None, timeout=60):
+    """Run a git command using subprocess.Popen to avoid select() FD limit
+
+    On workers with many parallel builds, file descriptor numbers can
+    exceed FD_SETSIZE (1024), causing the select()-based cros_subprocess
+    to fail. Using subprocess.Popen with communicate() avoids this.
+
+    Args:
+        *args: Git command arguments (without 'git' prefix)
+        cwd (str): Working directory
+        timeout (int): Timeout in seconds for the command
+
+    Returns:
+        None: The command's stdout is read but discarded
+
+    Raises:
+        OSError: If git cannot be started, exits non-zero or times out
+    """
+    cmd = ['git'] + list(args)
+    proc = subprocess.Popen(  # pylint: disable=R1732
+        cmd, cwd=cwd, stdin=subprocess.DEVNULL,
+        stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+    try:
+        _, stderr = proc.communicate(timeout=timeout)
+    except subprocess.TimeoutExpired as exc:
+        proc.kill()
+        proc.communicate()  # wait for the killed process to finish
+        raise OSError(
+            f'git command timed out after {timeout}s: {cmd}') from exc
+    if proc.returncode != 0:
+        raise OSError(stderr.decode('utf-8', errors='replace').strip())
+
+
+def _resolve_git_dir(git_dir):
+    """Find the real git directory behind a .git path
+
+    A regular repo has a .git directory, which is returned as-is. A
+    worktree has a .git file starting with 'gitdir: <path>'; that path
+    is returned, resolved against the file's directory if relative.
+
+    Args:
+        git_dir (str): Path to a .git file or directory
+
+    Returns:
+        str: Path to the actual git directory
+    """
+    if not os.path.isfile(git_dir):
+        return git_dir
+    with open(git_dir, encoding='utf-8') as inf:
+        prefix, sep, target = inf.readline().strip().partition('gitdir: ')
+    # Not a worktree pointer: the line does not begin with 'gitdir: '
+    if prefix or not sep:
+        return git_dir
+    # os.path.join() returns target unchanged when it is absolute
+    return os.path.join(os.path.dirname(git_dir), target)
+
+
+def _remove_stale_lock(git_dir):
+    """Delete any index.lock left in a git directory by a killed run
+
+    If the worker is killed (e.g. boss timeout or Ctrl-C) while git is
+    checking out, an index.lock stays behind and has to be removed
+    before the next checkout can proceed. A missing lock is ignored.
+
+    Args:
+        git_dir (str): Path to a .git file or directory
+    """
+    lock = os.path.join(_resolve_git_dir(git_dir), 'index.lock')
+    try:
+        os.remove(lock)
+    except FileNotFoundError:
+        # No lock file, so there is nothing to clean up
+        pass
+
+
+def _setup_worktrees(work_dir, git_dir, num_threads):
+    """Create per-thread worktrees sequentially with progress messages
+
+    Sets up a git worktree for each build thread in
+    <work_dir>/.bm-work/NN before the Builder is created. This avoids the
+    problems of lazy per-thread creation: concurrent threads contending
+    on the index lock, and no progress messages reaching the boss while
+    threads are blocked.
+
+    An existing worktree whose git directory still exists is reused, with
+    only its stale index.lock removed. Anything else found in a thread
+    directory is deleted before the worktree is created, because
+    'git worktree add' refuses to use a non-empty directory. That covers
+    a full clone from an older buildman (.git is a directory), a worktree
+    whose gitdir has gone, and leftover files with no .git at all.
+
+    A 'worktree_created' message is sent after each thread so the boss
+    can show setup progress.
+
+    Args:
+        work_dir (str): Base work directory (Builder's base_dir)
+        git_dir (str): Git directory path (e.g. work_dir/.git)
+        num_threads (int): Number of threads to create worktrees for
+
+    Raises:
+        OSError: If a git command fails or times out
+    """
+    bm_work = os.path.join(work_dir, '.bm-work')
+    os.makedirs(bm_work, exist_ok=True)
+    src_dir = os.path.abspath(git_dir)
+
+    # Clean up a stale lock from a previous SIGKILL'd run, which would
+    # otherwise block git commands in the main repo
+    _remove_stale_lock(git_dir)
+
+    # Prune stale worktree entries before creating new ones
+    _run_git('worktree', 'prune', cwd=work_dir)
+
+    for i in range(num_threads):
+        thread_dir = os.path.join(bm_work, f'{i:02d}')
+        dot_git = os.path.join(thread_dir, '.git')
+
+        # Only a worktree (.git is a file) whose gitdir still exists can
+        # be reused. A .git directory is a full clone from an older
+        # buildman version, which does not share objects with the main
+        # repo.
+        reusable = (os.path.isfile(dot_git) and
+                    os.path.isdir(_resolve_git_dir(dot_git)))
+        if reusable:
+            _remove_stale_lock(dot_git)
+        else:
+            # Start from an empty directory: removing just the .git file
+            # would leave the old checkout behind and make
+            # 'git worktree add' fail because the path already exists
+            if os.path.isdir(thread_dir):
+                shutil.rmtree(thread_dir)
+            os.makedirs(thread_dir, exist_ok=True)
+            _run_git('--git-dir', src_dir, 'worktree',
+                     'add', '.', '--detach', cwd=thread_dir)
+
+        _send({'resp': 'worktree_created', 'thread': i})
+
+
+class _WorkerBuilderThread(builderthread.BuilderThread):
+    """BuilderThread variant that reports to the boss instead of to disk
+
+    Three hooks are replaced: _write_result() does nothing, since the
+    worker keeps no local tree of build output; _send_result() sends the
+    result to the boss as a protocol message rather than queueing it in
+    the builder; and _checkout() runs git checkout through _run_git()
+    (subprocess-based, to avoid the select() FD_SETSIZE limit on machines
+    with many threads).
+
+    run_job() is also wrapped so that a heartbeat goes out before each
+    job. Worktrees already exist by the time threads run (see
+    _setup_worktrees), so _checkout() never has to create one.
+    """
+
+    def run_job(self, job):
+        """Tell the boss we are alive, then run the job as usual"""
+        heartbeat = {'resp': 'heartbeat', 'board': job.brd.target,
+                     'thread': self.thread_num}
+        _send(heartbeat)
+        super().run_job(job)
+
+    def _write_result(self, result, keep_outputs, work_in_output):
+        """Do nothing: results go to the boss, not to disk"""
+
+    def _send_result(self, result):
+        """Report one build result to the boss
+
+        Image sizes are only collected for a successful build which has
+        an output directory; otherwise an empty dict is sent.
+        """
+        built_ok = result.out_dir and result.return_code == 0
+        sizes = _get_sizes(result.out_dir) if built_ok else {}
+        _send_build_result(
+            result.brd.target, result.commit_upto, result.return_code,
+            stderr=result.stderr or '', stdout=result.stdout or '',
+            sizes=sizes)
+
+    def _checkout(self, commit_upto, work_dir):
+        """Check out the requested commit in this thread's worktree
+
+        Args:
+            commit_upto (int): Index into the builder's commit list
+            work_dir (str): Worktree directory for this thread
+
+        Returns:
+            Commit or str: The commit selected, or 'current' when the
+                builder has no commits (building the current source)
+        """
+        if not self.builder.commits:
+            return 'current'
+
+        commit = self.builder.commits[commit_upto]
+        if self.builder.checkout:
+            # A killed checkout may have left a lock behind
+            _remove_stale_lock(os.path.join(work_dir, '.git'))
+            _run_git('checkout', '-f', commit.hash, cwd=work_dir)
+        return commit
+
+
+def _cmd_setup(req, state):
+    """Handle the 'setup' command
+
+    Creates or re-uses a work directory and initialises a git repo in it.
+    The boss can then use 'git push' over SSH to send source code
+    to the repo before issuing build commands.
+
+    Sends 'setup_done' with the work and git directories on success, or
+    an error message if 'git init' fails.
+
+    Args:
+        req (dict): Request with keys:
+            work_dir (str): Working directory path (auto-created if empty)
+        state (dict): Worker state, updated in place: 'work_dir' is set
+            to the directory in use and 'auto_work_dir' records whether
+            it was auto-created (and so may be deleted on quit)
+
+    Returns:
+        bool: True on success
+    """
+    work_dir = req.get('work_dir')
+    auto_created = not work_dir
+    if auto_created:
+        work_dir = tempfile.mkdtemp(prefix='bm-worker-')
+    os.makedirs(work_dir, exist_ok=True)
+    state['work_dir'] = work_dir
+
+    # Record this on every setup, not just for auto-created directories.
+    # Otherwise a flag left over from an earlier auto-created directory
+    # would make 'quit' delete a directory the boss supplied.
+    state['auto_work_dir'] = auto_created
+
+    # Initialise a git repo so the boss can push to it
+    git_dir = os.path.join(work_dir, '.git')
+    if not os.path.isdir(git_dir):
+        try:
+            command.run_one('git', 'init', cwd=work_dir,
+                            capture=True, raise_on_error=True)
+        except command.CommandExc as exc:
+            _send_error(f'git init failed: {exc}')
+            return False
+
+    _send({'resp': 'setup_done', 'work_dir': work_dir,
+           'git_dir': git_dir})
+    return True
+
+
+def _cmd_configure(req, state):
+    """Handle the 'configure' command
+
+    Remembers the build settings sent by the boss (flags such as verbose,
+    allow_missing and no_lto) so they can be applied to each later build,
+    then acknowledges with 'configure_done'.
+
+    Args:
+        req (dict): Request with 'settings' dict containing build flags;
+            an empty dict is stored if it is absent
+        state (dict): Worker state; 'settings' is replaced
+
+    Returns:
+        bool: True on success
+    """
+    build_flags = req.get('settings', {})
+    state['settings'] = build_flags
+    _dbg(f'configure: {build_flags}')
+    _send({'resp': 'configure_done'})
+    return True
+
+
+def _parse_commits(commit_hashes):
+    """Turn a list of commit hashes into Commit objects
+
+    Args:
+        commit_hashes (list): Commit hashes, or [None] for current source
+
+    Returns:
+        list of Commit or None: One Commit per hash, or None when the
+            list is empty or starts with None (build the current source)
+    """
+    if not commit_hashes or commit_hashes[0] is None:
+        return None
+    return [Commit(sha) for sha in commit_hashes]
+
+
+def _parse_boards(board_dicts):
+    """Build Board objects from the board dicts sent by the boss
+
+    Each Board is marked 'Active' and uses the target name for its last
+    three fields; the remaining fields other than arch are left empty.
+    If a target appears more than once, the last entry wins.
+
+    Args:
+        board_dicts (list of dict): Each with 'board' and 'arch' keys;
+            'arch' defaults to '' if missing
+
+    Returns:
+        dict: target_name -> Board mapping
+    """
+    def _to_entry(info):
+        name = info['board']
+        return name, Board('Active', info.get('arch', ''), '', '', '',
+                           name, name, name)
+
+    return dict(_to_entry(info) for info in board_dicts)
+
+
+def _run_build(bldr, commits, board_selected):
+    """Run a complete build and report 'build_done' to the boss
+
+    The 'exceptions' count in the message is the number of exceptions
+    returned by the builder, or 1 if run_build() itself raised (the
+    details then go to the debug log).
+
+    Args:
+        bldr (Builder): Configured builder
+        commits (list of Commit or None): Commits to build
+        board_selected (dict): target_name -> Board mapping
+    """
+    bldr.init_build(commits, board_selected, keep_outputs=False,
+                    verbose=False, fragments=None)
+    try:
+        _fail, _warned, exceptions = bldr.run_build(delay_summary=True)
+    except Exception as exc:  # pylint: disable=W0718
+        _dbg(f'run_build crashed: {exc}')
+        _dbg(traceback.format_exc())
+        num_exceptions = 1
+    else:
+        num_exceptions = len(exceptions)
+    _send({'resp': 'build_done', 'exceptions': num_exceptions})
+
+
+def _cmd_build_boards(req, state):
+    """Handle the 'build_boards' command (batch mode)
+
+    Checks that a work directory, some boards and toolchains are
+    available, sets up worktrees when building commits, then creates a
+    Builder using _WorkerBuilderThread and runs the whole build. An error
+    message is sent, and nothing is built, if a check fails.
+
+    Args:
+        req (dict): Request with:
+            boards (list of dict): Each with 'board' and 'arch'
+            commits (list): Commit hashes in order, or [None] for
+                current source
+        state (dict): Worker state
+    """
+    work_dir = state.get('work_dir')
+    board_dicts = req.get('boards', [])
+    toolchains = state.get('toolchains')
+
+    # Checked in this order; the first missing item is reported
+    preconditions = (
+        (work_dir, 'no work directory set up'),
+        (board_dicts, 'no boards specified'),
+        (toolchains, 'no toolchains available (run setup first)'),
+    )
+    for value, problem in preconditions:
+        if not value:
+            _send_error(problem)
+            return
+
+    nthreads = state.get('nthreads', _get_nthreads())
+    commits = _parse_commits(req.get('commits', [None]))
+    board_selected = _parse_boards(board_dicts)
+
+    # Use no more threads than there are boards, and give each thread's
+    # make a proportional share of the CPUs via -j
+    num_threads = min(nthreads, len(board_selected))
+    num_jobs = max(1, nthreads // num_threads)
+
+    num_commits = len(commits) if commits else 1
+    _dbg(f'build_boards: {len(board_selected)} boards x '
+         f'{num_commits} commits '
+         f'threads={num_threads} -j{num_jobs}')
+
+    # Worktrees are created up front, one at a time, so the boss gets
+    # progress messages (e.g. [ruru 3/256]) and its build timeout does
+    # not fire before any build results arrive. They are not needed when
+    # building the current source.
+    if commits is not None:
+        _send({'resp': 'build_started', 'num_threads': num_threads})
+        _setup_worktrees(work_dir, os.path.join(work_dir, '.git'),
+                         num_threads)
+
+    _run_build(_create_builder(state, num_threads, num_jobs), commits,
+               board_selected)
+
+
+def _create_builder(state, num_threads, num_jobs):
+    """Construct a Builder set up for use in a worker
+
+    The builder uses _WorkerBuilderThread and _worker_make, produces no
+    coloured or detailed display output, and takes its build flags from
+    the settings previously stored by the 'configure' command.
+
+    Args:
+        state (dict): Worker state with toolchains, work_dir, settings
+        num_threads (int): Number of build threads
+        num_jobs (int): Make -j value per thread
+
+    Returns:
+        Builder: Configured builder with threads started and waiting
+    """
+    work_dir = state['work_dir']
+    settings = state.get('settings', {})
+    toolchains = state['toolchains']
+
+    def flag(name, default=False):
+        """Look up a build setting from the boss, with a fallback"""
+        return settings.get(name, default)
+
+    no_colour = terminal.Color(terminal.COLOR_NEVER)
+    display = DisplayOptions(
+        show_errors=False, show_sizes=False, show_detail=False,
+        show_bloat=False, show_config=False, show_environment=False,
+        show_unknown=False, ide=True, list_error_boards=False)
+    result_handler = ResultHandler(no_colour, display)
+
+    # NOTE(review): force_build_failures is driven by the 'force_build'
+    # setting; confirm this is intended rather than a separate
+    # 'force_build_failures' setting
+    bldr = builder_mod.Builder(
+        toolchains, work_dir, os.path.join(work_dir, '.git'),
+        num_threads, num_jobs, no_colour, result_handler,
+        thread_class=_WorkerBuilderThread,
+        make_func=_worker_make,
+        handle_signals=False,
+        lazy_thread_setup=True,
+        checkout=True,
+        per_board_out_dir=False,
+        force_build=flag('force_build'),
+        force_build_failures=flag('force_build'),
+        no_lto=flag('no_lto'),
+        allow_missing=flag('allow_missing'),
+        verbose_build=flag('verbose_build'),
+        warnings_as_errors=flag('warnings_as_errors'),
+        mrproper=flag('mrproper'),
+        fallback_mrproper=flag('fallback_mrproper'),
+        config_only=flag('config_only'),
+        reproducible_builds=flag('reproducible_builds'),
+        force_config_on_failure=True,
+        kconfig_check=flag('kconfig_check', True),
+    )
+    result_handler.set_builder(bldr)
+    return bldr
+
+
+def _cmd_build_prepare(req, state):
+    """Handle the 'build_prepare' command
+
+    Creates a Builder with threads waiting for jobs. The boss follows
+    this with 'build_board' commands to feed boards one at a time, then
+    'build_done' to signal completion.
+
+    On success the builder and commits are stored in state under
+    'builder' and 'commits' and 'build_prepare_done' is sent. If there is
+    no work directory or no toolchains, an error message is sent instead
+    and state is left unchanged.
+
+    Args:
+        req (dict): Request with:
+            commits (list): Commit hashes in order, or [None] for
+                current source
+            max_boards (int): Optional; copied to the builder's
+                max_boards attribute (0 if absent)
+        state (dict): Worker state
+    """
+    work_dir = state.get('work_dir')
+    if not work_dir:
+        _send_error('no work directory set up')
+        return
+
+    toolchains = state.get('toolchains')
+    if not toolchains:
+        _send_error('no toolchains available (run setup first)')
+        return
+
+    commits = _parse_commits(req.get('commits', [None]))
+    nthreads = state.get('nthreads', _get_nthreads())
+    max_boards = req.get('max_boards', 0)
+
+    # The number of boards is not known yet, so use every thread and let
+    # the -j value be worked out later
+    num_threads = nthreads
+    num_jobs = None  # dynamic: nthreads / active_boards
+
+    _dbg(f'build_prepare: '
+         f'{len(commits) if commits else 1} commits '
+         f'threads={num_threads} max_boards={max_boards} -j=dynamic')
+
+    # Set up worktrees before creating the Builder
+    git_dir = os.path.join(work_dir, '.git')
+    if commits is not None:
+        _send({'resp': 'build_started', 'num_threads': num_threads})
+        _setup_worktrees(work_dir, git_dir, num_threads)
+
+    bldr = _create_builder(state, num_threads, num_jobs)
+    bldr.max_boards = max_boards
+
+    # Minimal init: set commits and prepare directories. Threads are
+    # already started by the Builder constructor, waiting on the queue.
+    # NOTE(review): presumably this mirrors the parts of
+    # Builder.init_build() which do not need the board list - confirm
+    # against builder.py if init_build() changes
+    bldr.commit_count = len(commits) if commits else 1
+    bldr.commits = commits
+    bldr.verbose = False
+    builderthread.mkdir(bldr.base_dir, parents=True)
+    bldr.prepare_working_space(num_threads, commits is not None)
+    bldr.prepare_output_space()
+    bldr.start_time = builder_mod.datetime.now()
+    # No jobs are queued yet; 'build_board' adds to count for each board
+    bldr.count = 0
+    bldr.upto = bldr._warned = bldr.fail = 0
+    bldr.timestamps = builder_mod.collections.deque()
+    bldr.thread_exceptions = []
+
+    # Keep these for the 'build_board' and 'build_done' handlers
+    state['builder'] = bldr
+    state['commits'] = commits
+    _send({'resp': 'build_prepare_done'})
+
+
+def _cmd_build_board(req, state):
+    """Handle the 'build_board' command
+
+    Adds one board to the running Builder's job queue. The job covers
+    all the commits given to 'build_prepare', so the builder's expected
+    build count grows by its commit count.
+
+    An error message is sent, and nothing is queued, if there is no
+    builder or the request does not name a board.
+
+    Args:
+        req (dict): Request with:
+            board (str): Board target name
+            arch (str): Board architecture ('' if absent)
+        state (dict): Worker state with 'builder' from build_prepare
+    """
+    bldr = state.get('builder')
+    if not bldr:
+        _send_error('no builder (send build_prepare first)')
+        return
+
+    # A malformed request must not bring down the dispatch loop with a
+    # KeyError; report it like the other handlers do
+    target = req.get('board')
+    if not target:
+        _send_error('no board specified')
+        return
+
+    arch = req.get('arch', '')
+    brd = Board('Active', arch, '', '', '', target, target, target)
+    commits = state.get('commits')
+
+    job = builderthread.BuilderJob()
+    job.brd = brd
+    job.commits = commits
+    job.keep_outputs = False
+    job.work_in_output = bldr.work_in_output
+    job.adjust_cfg = bldr.adjust_cfg
+    job.fragments = None
+    job.step = bldr.step
+
+    # Account for the new builds before a thread can pick up the job
+    bldr.count += bldr.commit_count
+    bldr.queue.put(job)
+
+
+def _cmd_build_done(state):
+    """Handle the 'build_done' command from the boss
+
+    Runs the prepared builder until its queued jobs have finished, then
+    replies with 'build_done' and forgets the builder and its commits.
+    The 'exceptions' count in the reply is the number of exceptions the
+    builder returned, 1 if run_build() itself raised, or 0 if no builder
+    was prepared.
+
+    Args:
+        state (dict): Worker state with 'builder' from build_prepare
+    """
+    bldr = state.get('builder')
+    if not bldr:
+        _send({'resp': 'build_done', 'exceptions': 0})
+        return
+
+    try:
+        _fail, _warned, exceptions = bldr.run_build(delay_summary=True)
+    except Exception as exc:  # pylint: disable=W0718
+        _dbg(f'run_build crashed: {exc}')
+        _dbg(traceback.format_exc())
+        num_exceptions = 1
+    else:
+        num_exceptions = len(exceptions)
+
+    _send({'resp': 'build_done', 'exceptions': num_exceptions})
+    for key in ('builder', 'commits'):
+        state.pop(key, None)
+
+
+def _cmd_quit(state):
+    """Handle the 'quit' command
+
+    Deletes the work directory if the worker created it itself, sends
+    quit_ack, then calls _kill_group() to take down the child processes
+    (make, cc1, etc.) along with this process.
+
+    Args:
+        state (dict): Worker state
+    """
+    if state.get('auto_work_dir'):
+        work_dir = state.get('work_dir', '')
+        if work_dir:
+            # Best effort: a failure to delete must not stop the quit
+            shutil.rmtree(work_dir, ignore_errors=True)
+    _send({'resp': 'quit_ack'})
+    _kill_group()
+
+
+def run_worker(debug=False):
+    """Main worker loop
+
+    Takes over stdout for the protocol, installs signal handlers, scans
+    for toolchains, announces 'ready' and then dispatches the JSON
+    commands which arrive on stdin until 'quit' or stdin closes.
+    Responses are sent as 'BM> ' prefixed JSON lines on the original
+    stdout. Builds run in parallel using Builder with a
+    _WorkerBuilderThread subclass.
+
+    Args:
+        debug (bool): True to print debug messages to stderr
+
+    Returns:
+        int: 0 on success, non-zero on error
+    """
+    global _debug, _protocol_out  # pylint: disable=W0603
+
+    _debug = debug
+
+    # Keep the real stdout for protocol messages only. Everything else
+    # which prints (tprint and other library output) is sent to stderr so
+    # it cannot corrupt the JSON protocol on the SSH pipe.
+    _protocol_out = sys.stdout
+    sys.stdout = sys.stderr
+
+    def _exit_handler(_signum, _frame):
+        # _kill_group() sends SIGKILL to the process group, which
+        # terminates everything including this process
+        _kill_group()
+        os._exit(1)  # pylint: disable=W0212
+
+    # Exit immediately on signals, killing all child processes. SIGHUP is
+    # sent by sshd when the SSH connection drops.
+    for signum in (signal.SIGTERM, signal.SIGINT, signal.SIGHUP):
+        signal.signal(signum, _exit_handler)
+
+    nthreads = _get_nthreads()
+
+    # Scan for toolchains at startup so the right cross-compiler can be
+    # selected for each board's architecture
+    toolchains = toolchain_mod.Toolchains()
+    toolchains.get_settings(show_warning=False)
+    toolchains.scan(verbose=False, raise_on_error=False)
+
+    _dbg(f'ready: {nthreads} threads')
+    _send({'resp': 'ready', 'nthreads': nthreads, 'slots': nthreads})
+
+    stop_event = threading.Event()
+    state = {
+        'work_dir': os.getcwd(),
+        'nthreads': nthreads,
+        'toolchains': toolchains,
+        'stop': stop_event,
+    }
+
+    # stdin is read by a background thread, so that EOF (boss
+    # disconnected) is noticed even while a long-running command such as
+    # build_boards is executing
+    cmd_queue = queue.Queue()
+    eof_sentinel = object()
+
+    def _stdin_reader():
+        while True:
+            raw = sys.stdin.readline()
+            if raw:
+                text = raw.strip()
+                if text:
+                    cmd_queue.put(text)
+                continue
+
+            # Boss disconnected: kill everything immediately. In
+            # production _kill_group() kills the process group; in tests,
+            # where it is a no-op, stop_event unblocks the build threads.
+            _dbg('stdin closed, killing group')
+            stop_event.set()
+            _kill_group()
+            cmd_queue.put(eof_sentinel)
+            return
+
+    reader = threading.Thread(target=_stdin_reader, daemon=True)
+    reader.start()
+
+    return _dispatch_commands(cmd_queue, eof_sentinel, state)
+
+
+def _dispatch_commands(cmd_queue, eof_sentinel, state):
+    """Read commands from the queue and dispatch them
+
+    Each queue item is a line of JSON holding an object with a 'cmd'
+    key. A line which is not valid JSON, is not a JSON object, or names
+    an unknown command is reported to the boss as an error and then
+    ignored, so a bad request cannot stop the worker.
+
+    Args:
+        cmd_queue (queue.Queue): Queue of JSON command strings
+        eof_sentinel (object): Sentinel value indicating stdin closed
+        state (dict): Worker state
+
+    Returns:
+        int: 0 on clean quit, 1 on unexpected stdin close
+    """
+    while True:
+        # Wake up regularly rather than blocking forever on the queue
+        try:
+            line = cmd_queue.get(timeout=1)
+        except queue.Empty:
+            continue
+        if line is eof_sentinel:
+            break
+
+        try:
+            req = json.loads(line)
+        except json.JSONDecodeError as exc:
+            _send_error(f'invalid JSON: {exc}')
+            continue
+
+        # Valid JSON need not be an object (e.g. '[]' or '"quit"'), in
+        # which case there is no 'cmd' to look up
+        if not isinstance(req, dict):
+            _send_error('invalid request: expected a JSON object')
+            continue
+
+        cmd = req.get('cmd', '')
+
+        if cmd == 'setup':
+            _cmd_setup(req, state)
+        elif cmd == 'configure':
+            _cmd_configure(req, state)
+        elif cmd == 'build_boards':
+            _cmd_build_boards(req, state)
+        elif cmd == 'build_prepare':
+            _cmd_build_prepare(req, state)
+        elif cmd == 'build_board':
+            _cmd_build_board(req, state)
+        elif cmd == 'build_done':
+            _cmd_build_done(state)
+        elif cmd == 'quit':
+            _cmd_quit(state)
+            return 0
+        else:
+            _send_error(f'unknown command: {cmd}')
+
+    # stdin closed without quit — boss was interrupted
+    return 1
+
+
+def do_worker(debug=False):
+    """Entry point for 'buildman --worker'
+
+    Tries to make this process a process group leader, records whether
+    it is one, then runs the worker loop.
+
+    Args:
+        debug (bool): True to print debug messages to stderr
+
+    Returns:
+        int: Return code from run_worker(): 0 on success
+    """
+    global _is_group_leader  # pylint: disable=W0603
+
+    # _kill_group() relies on us leading a process group so that it can
+    # kill all the child processes (make, cc1, as, ld) on exit. This
+    # lives here rather than in run_worker() so that tests can call
+    # run_worker() without becoming a process group leader.
+    try:
+        os.setpgrp()
+    except OSError:
+        # Expected when launched via SSH: sshd has already made us
+        # session and group leader (pid == pgid), so this fails with
+        # EPERM
+        pass
+    _is_group_leader = os.getpgrp() == os.getpid()
+    return run_worker(debug)