@@ -199,15 +199,12 @@ class Builder:
_complete_delay: Expected delay until completion (timedelta)
_next_delay_update: Next time we plan to display a progress update
(datatime)
- num_threads: Number of builder threads to run
_opts: DisplayOptions for result output
_re_make_err: Compiled regex for make error detection
_restarting_config: True if 'Restart config' is detected in output
_result_handler: ResultHandler for displaying results
_single_builder: BuilderThread object for the singer builder, if
threading is not being used
- _start_time: Start time for the build
- _step: Step value for processing commits (1=all, 2=every other, etc.)
_terminated: Thread was terminated due to an error
_threads: List of active threads
_timestamps: List of timestamps for the completion of the last
@@ -318,8 +315,8 @@ class Builder:
self._build_period_us = None
self._complete_delay = None
self._next_delay_update = datetime.now()
- self._start_time = None
- self._step = step
+ self.start_time = None
+ self.step = step
self.no_subdirs = no_subdirs
self.full_path = full_path
self.verbose_build = verbose_build
@@ -369,14 +366,14 @@ class Builder:
# Attributes set by other methods
self._build_period = None
self._commit = None
- self._upto = 0
+ self.upto = 0
self._warned = 0
self.fail = 0
self.commit_count = 0
self.commits = None
self.count = 0
- self._timestamps = collections.deque()
- self._verbose = False
+ self.timestamps = collections.deque()
+ self.verbose = False
# Note: baseline state for result summaries is now in ResultHandler
@@ -478,9 +475,9 @@ class Builder:
build (one board, one commit).
"""
now = datetime.now()
- self._timestamps.append(now)
- count = len(self._timestamps)
- delta = self._timestamps[-1] - self._timestamps[0]
+ self.timestamps.append(now)
+ count = len(self.timestamps)
+ delta = self.timestamps[-1] - self.timestamps[0]
seconds = delta.total_seconds()
# If we have enough data, estimate build period (time taken for a
@@ -489,7 +486,7 @@ class Builder:
self._next_delay_update = now + timedelta(seconds=2)
if seconds > 0:
self._build_period = float(seconds) / count
- todo = self.count - self._upto
+ todo = self.count - self.upto
self._complete_delay = timedelta(microseconds=
self._build_period * todo * 1000000)
# Round it
@@ -497,7 +494,7 @@ class Builder:
microseconds=self._complete_delay.microseconds)
if seconds > 60:
- self._timestamps.popleft()
+ self.timestamps.popleft()
count -= 1
def _select_commit(self, commit, checkout=True):
@@ -580,7 +577,7 @@ class Builder:
if result:
target = result.brd.target
- self._upto += 1
+ self.upto += 1
if result.return_code != 0:
self.fail += 1
elif result.stderr:
@@ -592,7 +589,7 @@ class Builder:
if self._opts.ide:
if result.stderr:
sys.stderr.write(result.stderr)
- elif self._verbose:
+ elif self.verbose:
terminal.print_clear()
boards_selected = {target : result.brd}
self._result_handler.reset_result_summary(boards_selected)
@@ -602,13 +599,13 @@ class Builder:
target = '(starting)'
# Display separate counts for ok, warned and fail
- ok = self._upto - self._warned - self.fail
+ ok = self.upto - self._warned - self.fail
line = '\r' + self.col.build(self.col.GREEN, f'{ok:5d}')
line += self.col.build(self.col.YELLOW, f'{self._warned:5d}')
line += self.col.build(self.col.RED, f'{self.fail:5d}')
line += f' /{self.count:<5d} '
- remaining = self.count - self._upto
+ remaining = self.count - self.upto
if remaining:
line += self.col.build(self.col.MAGENTA, f' -{remaining:<5d} ')
else:
@@ -1033,10 +1030,10 @@ class Builder:
board_selected (dict): Selected boards to build
"""
# First work out how many commits we will build
- count = (self.commit_count + self._step - 1) // self._step
+ count = (self.commit_count + self.step - 1) // self.step
self.count = len(board_selected) * count
- self._upto = self._warned = self.fail = 0
- self._timestamps = collections.deque()
+ self.upto = self._warned = self.fail = 0
+ self.timestamps = collections.deque()
def get_thread_dir(self, thread_num):
"""Get the directory path to the working dir for a thread.
@@ -1114,7 +1111,7 @@ class Builder:
else:
raise ValueError(f"Can't setup git repo with {setup_git}.")
- def _prepare_working_space(self, max_threads, setup_git):
+ def prepare_working_space(self, max_threads, setup_git):
"""Prepare the working directory for use.
Set up the git repo for each thread. Creates a linked working tree
@@ -1187,7 +1184,7 @@ class Builder:
to_remove.append(dirname)
return to_remove
- def _prepare_output_space(self):
+ def prepare_output_space(self):
"""Get the output directories ready to receive files.
We delete any output directories which look like ones we need to
@@ -1223,16 +1220,16 @@ class Builder:
"""
self.commit_count = len(commits) if commits else 1
self.commits = commits
- self._verbose = verbose
+ self.verbose = verbose
self._result_handler.reset_result_summary(board_selected)
builderthread.mkdir(self.base_dir, parents = True)
- self._prepare_working_space(min(self.num_threads, len(board_selected)),
+ self.prepare_working_space(min(self.num_threads, len(board_selected)),
board_selected and commits is not None)
- self._prepare_output_space()
+ self.prepare_output_space()
if not self._opts.ide:
tprint('\rStarting build...', newline=False)
- self._start_time = datetime.now()
+ self.start_time = datetime.now()
self._setup_build(board_selected, commits)
self.count += extra_count
self.process_result(None)
@@ -1246,7 +1243,7 @@ class Builder:
job.work_in_output = self.work_in_output
job.adjust_cfg = self.adjust_cfg
job.fragments = fragments
- job.step = self._step
+ job.step = self.step
if self.num_threads:
self.queue.put(job)
else:
@@ -1318,4 +1315,4 @@ class Builder:
"""
self._result_handler.print_build_summary(
self.count, self._already_done, self.kconfig_reconfig,
- self._start_time, self.thread_exceptions)
+ self.start_time, self.thread_exceptions)
@@ -119,6 +119,9 @@ def add_upto_m(parser):
parser.add_argument(
'--maintainer-check', action='store_true',
help='Check that maintainer entries exist for each board')
+ parser.add_argument('--worker', action='store_true', default=False,
+ help='Run in worker mode, accepting build commands on stdin '
+ '(used internally for distributed builds)')
parser.add_argument(
'--no-allow-missing', action='store_true', default=False,
help='Disable telling binman to allow missing blobs')
@@ -759,6 +759,11 @@ def do_buildman(args, toolchains=None, make_func=None, brds=None,
gitutil.setup()
col = terminal.Color()
+ # Handle --worker: run in worker mode for distributed builds
+ if args.worker:
+ from buildman import worker # pylint: disable=C0415
+ return worker.do_worker()
+
# Handle --machines: probe remote machines and show status
if args.machines or args.machines_fetch_arch:
return machine.do_probe_machines(
@@ -42,6 +42,7 @@ def run_tests(skip_net_tests, debug, verbose, args):
from buildman import test_builder
from buildman import test_cfgutil
from buildman import test_machine
+ from buildman import test_worker
test_name = args.terms and args.terms[0] or None
if skip_net_tests:
@@ -65,6 +66,7 @@ def run_tests(skip_net_tests, debug, verbose, args):
test_builder.TestMake,
test_builder.TestPrintBuildSummary,
test_machine,
+ test_worker,
'buildman.toolchain'])
return (0 if result.wasSuccessful() else 1)
@@ -277,7 +277,7 @@ class TestPrepareThread(unittest.TestCase):
class TestPrepareWorkingSpace(unittest.TestCase):
- """Tests for Builder._prepare_working_space()"""
+ """Tests for Builder.prepare_working_space()"""
def setUp(self):
"""Set up test fixtures"""
@@ -296,7 +296,7 @@ class TestPrepareWorkingSpace(unittest.TestCase):
@mock.patch.object(builderthread, 'mkdir')
def test_no_setup_git(self, mock_mkdir, mock_prepare_thread):
"""Test with setup_git=False"""
- self.builder._prepare_working_space(2, False)
+ self.builder.prepare_working_space(2, False)
mock_mkdir.assert_called_once()
# Should prepare 2 threads with setup_git=False
@@ -312,7 +312,7 @@ class TestPrepareWorkingSpace(unittest.TestCase):
def test_worktree_available(self, _mock_mkdir, mock_check_worktree,
mock_prune, mock_prepare_thread):
"""Test when worktree is available"""
- self.builder._prepare_working_space(3, True)
+ self.builder.prepare_working_space(3, True)
mock_check_worktree.assert_called_once()
mock_prune.assert_called_once()
@@ -329,7 +329,7 @@ class TestPrepareWorkingSpace(unittest.TestCase):
def test_worktree_not_available(self, _mock_mkdir, mock_check_worktree,
mock_prepare_thread):
"""Test when worktree is not available (falls back to clone)"""
- self.builder._prepare_working_space(2, True)
+ self.builder.prepare_working_space(2, True)
mock_check_worktree.assert_called_once()
# Should prepare 2 threads with setup_git='clone'
@@ -341,7 +341,7 @@ class TestPrepareWorkingSpace(unittest.TestCase):
@mock.patch.object(builderthread, 'mkdir')
def test_zero_threads(self, _mock_mkdir, mock_prepare_thread):
"""Test with max_threads=0 (should still prepare 1 thread)"""
- self.builder._prepare_working_space(0, False)
+ self.builder.prepare_working_space(0, False)
# Should prepare at least 1 thread
self.assertEqual(mock_prepare_thread.call_count, 1)
@@ -352,7 +352,7 @@ class TestPrepareWorkingSpace(unittest.TestCase):
def test_no_git_dir(self, _mock_mkdir, mock_prepare_thread):
"""Test with no git_dir set"""
self.builder.git_dir = None
- self.builder._prepare_working_space(2, True)
+ self.builder.prepare_working_space(2, True)
# _detect_git_setup returns False when git_dir is None
self.assertEqual(mock_prepare_thread.call_count, 2)
@@ -368,7 +368,7 @@ class TestPrepareWorkingSpace(unittest.TestCase):
mock_prune, mock_prepare_thread):
"""Test lazy_thread_setup skips upfront thread preparation"""
self.builder._lazy_thread_setup = True
- self.builder._prepare_working_space(4, True)
+ self.builder.prepare_working_space(4, True)
# Git setup type is detected so prepare_thread() can use it
# later, but no threads are prepared upfront
@@ -510,7 +510,7 @@ class TestShowNotBuilt(unittest.TestCase):
class TestPrepareOutputSpace(unittest.TestCase):
- """Tests for _prepare_output_space() and _get_output_space_removals()"""
+ """Tests for prepare_output_space() and _get_output_space_removals()"""
def setUp(self):
"""Set up test fixtures"""
@@ -561,25 +561,25 @@ class TestPrepareOutputSpace(unittest.TestCase):
self.assertEqual(result, ['/tmp/test/02_g1234567_old'])
@mock.patch.object(builder.Builder, '_get_output_space_removals')
- def test_prepare_output_space_nothing_to_remove(self, mock_get_removals):
- """Test _prepare_output_space with nothing to remove"""
+ def test_prepare_output_space_nothing_to_remove(self, mock_get_removals):
+ """Test prepare_output_space with nothing to remove"""
mock_get_removals.return_value = []
terminal.get_print_test_lines() # Clear
- self.builder._prepare_output_space()
+ self.builder.prepare_output_space()
lines = terminal.get_print_test_lines()
self.assertEqual(len(lines), 0)
@mock.patch.object(shutil, 'rmtree')
@mock.patch.object(builder.Builder, '_get_output_space_removals')
- def test_prepare_output_space_removes_dirs(self, mock_get_removals,
+ def test_prepare_output_space_removes_dirs(self, mock_get_removals,
mock_rmtree):
- """Test _prepare_output_space removes old directories"""
+ """Test prepare_output_space removes old directories"""
mock_get_removals.return_value = ['/tmp/test/old1', '/tmp/test/old2']
terminal.get_print_test_lines() # Clear
- self.builder._prepare_output_space()
+ self.builder.prepare_output_space()
# Check rmtree was called for each directory
self.assertEqual(mock_rmtree.call_count, 2)
new file mode 100644
@@ -0,0 +1,896 @@
+# SPDX-License-Identifier: GPL-2.0+
+# Copyright 2026 Simon Glass <sjg@chromium.org>
+
+"""Tests for the worker module"""
+
+# pylint: disable=W0212,C0302
+
+import io
+import json
+import os
+import queue
+import signal
+import subprocess
+import tempfile
+import unittest
+from unittest import mock
+
+from u_boot_pylib.command import CommandExc, CommandResult
+
+from buildman import worker
+
+
+def _parse(line):
+ """Parse a protocol line into a dict"""
+ return json.loads(line[len(worker.RESPONSE_PREFIX):])
+
+
+class _ProtoTestBase(unittest.TestCase):
+ """Base class for tests that use the worker protocol
+
+ Provides capture/parse helpers and resets _protocol_out on tearDown.
+ """
+
+ def setUp(self):
+ self.buf = io.StringIO()
+ worker._protocol_out = self.buf
+
+ def tearDown(self):
+ worker._protocol_out = None
+
+ def get_resp(self):
+ """Parse the last response written to the capture buffer"""
+ return _parse(self.buf.getvalue())
+
+ def get_all_resp(self):
+ """Parse all responses from the capture buffer"""
+ return [_parse(line) for line in self.buf.getvalue().strip().split('\n')
+ if line]
+
+ def assert_resp(self, key, value):
+ """Assert a key in the last response equals value"""
+ self.assertEqual(self.get_resp()[key], value)
+
+ def assert_in_output(self, text):
+ """Assert text appears in raw protocol output"""
+ self.assertIn(text, self.buf.getvalue())
+
+
+class _RunWorkerBase(_ProtoTestBase):
+ """Base class for tests that run the full worker loop"""
+
+ def _run(self, stdin_text):
+ """Run the worker with given stdin, return (result, output lines)"""
+ buf = io.StringIO()
+ with mock.patch('buildman.worker.toolchain_mod.Toolchains'), \
+ mock.patch('sys.stdin', io.StringIO(stdin_text)), \
+ mock.patch('sys.stdout', buf):
+ result = worker.run_worker()
+ lines = [line for line in buf.getvalue().strip().split('\n')
+ if line]
+ return result, lines
+
+
+class TestProtocol(_ProtoTestBase):
+ """Test _send(), _send_error() and _send_build_result()"""
+
+ def test_send(self):
+ """Test sending a response"""
+ worker._send({'resp': 'ready', 'nthreads': 4})
+ self.assertTrue(self.buf.getvalue().startswith(
+ worker.RESPONSE_PREFIX))
+ self.assert_resp('resp', 'ready')
+ self.assert_resp('nthreads', 4)
+
+ def test_send_error(self):
+ """Test sending an error response"""
+ worker._send_error('something broke')
+ self.assert_resp('resp', 'error')
+ self.assert_resp('msg', 'something broke')
+
+ def test_send_build_result_with_sizes(self):
+ """Test sending result with sizes"""
+ worker._send_build_result(
+ 'sandbox', 0, 0,
+ sizes={'text': 1000, 'data': 200})
+ self.assertEqual(
+ self.get_resp()['sizes'], {'text': 1000, 'data': 200})
+
+ def test_send_build_result_without_sizes(self):
+ """Test sending result without sizes"""
+ worker._send_build_result('sandbox', 0, 0)
+ self.assertNotIn('sizes', self.get_resp())
+
+
+class TestUtilityFunctions(unittest.TestCase):
+ """Test _get_nthreads(), _get_load_avg() and _get_sizes()"""
+
+ def test_nthreads_normal(self):
+ """Test getting thread count"""
+ self.assertGreater(worker._get_nthreads(), 0)
+
+ @mock.patch('os.cpu_count', return_value=None)
+ def test_nthreads_none(self, _mock):
+ """Test when cpu_count returns None"""
+ self.assertEqual(worker._get_nthreads(), 1)
+
+ @mock.patch('os.cpu_count', side_effect=AttributeError)
+ def test_nthreads_attribute_error(self, _mock):
+ """Test when cpu_count raises AttributeError"""
+ self.assertEqual(worker._get_nthreads(), 1)
+
+ @mock.patch('builtins.open', side_effect=OSError('no file'))
+ def test_load_avg_no_proc(self, _mock):
+ """Test when /proc/loadavg is not available"""
+ self.assertEqual(worker._get_load_avg(), 0.0)
+
+ def test_get_sizes_no_elf(self):
+ """Test with no ELF file"""
+ with tempfile.TemporaryDirectory() as tmpdir:
+ self.assertEqual(worker._get_sizes(tmpdir), {})
+
+ @mock.patch('buildman.worker.subprocess.Popen')
+ def test_get_sizes_with_elf(self, mock_popen):
+ """Test with ELF file present"""
+ proc = mock.Mock()
+ proc.communicate.return_value = (
+ b' text data bss dec hex filename\n'
+ b' 12345 1234 567 14146 374a u-boot\n',
+ b'')
+ proc.returncode = 0
+ mock_popen.return_value = proc
+ with tempfile.TemporaryDirectory() as tmpdir:
+ elf = os.path.join(tmpdir, 'u-boot')
+ with open(elf, 'w', encoding='utf-8') as fout:
+ fout.write('fake')
+ self.assertIn('raw', worker._get_sizes(tmpdir))
+
+ @mock.patch('buildman.worker.subprocess.Popen',
+ side_effect=OSError('no size'))
+ def test_get_sizes_popen_fails(self, _mock):
+ """Test when size command fails"""
+ with tempfile.TemporaryDirectory() as tmpdir:
+ elf = os.path.join(tmpdir, 'u-boot')
+ with open(elf, 'w', encoding='utf-8') as fout:
+ fout.write('fake')
+ self.assertEqual(worker._get_sizes(tmpdir), {})
+
+
+class TestCmdSetup(_ProtoTestBase):
+ """Test _cmd_setup()"""
+
+ @mock.patch('buildman.worker.command.run_one')
+ def test_auto_work_dir(self, mock_run):
+ """Test setup with auto-created work directory"""
+ mock_run.return_value = mock.Mock(return_code=0)
+ state = {}
+ result = worker._cmd_setup({'work_dir': ''}, state)
+ self.assertTrue(result)
+ self.assertIn('work_dir', state)
+ self.assertTrue(state.get('auto_work_dir'))
+ mock_run.assert_called_once()
+ self.addCleanup(lambda: os.path.isdir(state['work_dir'])
+ and os.rmdir(state['work_dir']))
+
+ @mock.patch('buildman.worker.command.run_one')
+ def test_explicit_work_dir(self, mock_run):
+ """Test setup with explicit work directory"""
+ mock_run.return_value = mock.Mock(return_code=0)
+ with tempfile.TemporaryDirectory() as tmpdir:
+ state = {}
+ work_dir = os.path.join(tmpdir, 'build')
+ self.assertTrue(
+ worker._cmd_setup({'work_dir': work_dir}, state))
+ self.assertEqual(state['work_dir'], work_dir)
+ self.assertTrue(os.path.isdir(work_dir))
+ self.assertNotIn('auto_work_dir', state)
+
+ @mock.patch('buildman.worker.command.run_one')
+ def test_setup_returns_git_dir(self, mock_run):
+ """Test setup response includes git_dir"""
+ mock_run.return_value = mock.Mock(return_code=0)
+ with tempfile.TemporaryDirectory() as tmpdir:
+ worker._cmd_setup({'work_dir': tmpdir}, {})
+ self.assert_resp('resp', 'setup_done')
+ self.assertEqual(
+ self.get_resp()['git_dir'],
+ os.path.join(tmpdir, '.git'))
+
+ def test_setup_existing_git(self):
+ """Test setup skips git init if .git already exists"""
+ with tempfile.TemporaryDirectory() as tmpdir:
+ os.makedirs(os.path.join(tmpdir, '.git'))
+ with mock.patch(
+ 'buildman.worker.command.run_one') as mock_run:
+ self.assertTrue(
+ worker._cmd_setup({'work_dir': tmpdir}, {}))
+ mock_run.assert_not_called()
+
+ @mock.patch('buildman.worker.command.run_one')
+ def test_git_init_fails(self, mock_run):
+ """Test setup when git init fails"""
+ mock_run.side_effect = CommandExc(
+ 'git init failed', CommandResult())
+ with tempfile.TemporaryDirectory() as tmpdir:
+ result = worker._cmd_setup(
+ {'work_dir': os.path.join(tmpdir, 'new')}, {})
+ self.assertFalse(result)
+ self.assert_in_output('git init failed')
+
+
+class TestCmdQuit(_ProtoTestBase):
+ """Test _cmd_quit()"""
+
+ def test_quit(self):
+ """Test quit command"""
+ worker._cmd_quit({})
+ self.assert_resp('resp', 'quit_ack')
+
+ def test_quit_cleanup(self):
+ """Test quit cleans up auto work directory"""
+ tmpdir = tempfile.mkdtemp(prefix='bm-test-')
+ worker._cmd_quit({'work_dir': tmpdir, 'auto_work_dir': True})
+ self.assertFalse(os.path.exists(tmpdir))
+
+ def test_quit_preserves_explicit_dir(self):
+ """Test quit does not remove non-auto work directory"""
+ with tempfile.TemporaryDirectory() as tmpdir:
+ worker._cmd_quit({'work_dir': tmpdir})
+ self.assertTrue(os.path.isdir(tmpdir))
+
+
+class TestCmdConfigure(_ProtoTestBase):
+ """Test _cmd_configure()"""
+
+ def test_configure(self):
+ """Test that configure stores settings in state"""
+ state = {}
+ settings = {'no_lto': True, 'allow_missing': True}
+ self.assertTrue(
+ worker._cmd_configure({'settings': settings}, state))
+ self.assertEqual(state['settings'], settings)
+ self.assert_resp('resp', 'configure_done')
+
+ def test_configure_empty(self):
+ """Test configure with empty settings"""
+ state = {}
+ worker._cmd_configure({'settings': {}}, state)
+ self.assertEqual(state['settings'], {})
+
+
+class TestRunWorker(_RunWorkerBase):
+ """Test run_worker()"""
+
+ def test_empty_stdin(self):
+ """Test worker with empty stdin (unexpected close)"""
+ result, lines = self._run('')
+ self.assertEqual(result, 1)
+ self.assertEqual(_parse(lines[0])['resp'], 'ready')
+
+ def test_ready_includes_slots(self):
+ """Test that the ready response includes slots"""
+ _, lines = self._run('{"cmd": "quit"}\n')
+ obj = _parse(lines[0])
+ self.assertEqual(obj['resp'], 'ready')
+ self.assertIn('slots', obj)
+ self.assertGreaterEqual(obj['slots'], 1)
+
+ def test_quit_command(self):
+ """Test worker with quit command"""
+ result, lines = self._run('{"cmd": "quit"}\n')
+ self.assertEqual(result, 0)
+ self.assertEqual(len(lines), 2) # ready + quit_ack
+
+ def test_invalid_json(self):
+ """Test worker with invalid JSON input"""
+ _, lines = self._run('not json\n{"cmd": "quit"}\n')
+ self.assertEqual(len(lines), 3) # ready + error + quit_ack
+ self.assertIn('invalid JSON', _parse(lines[1])['msg'])
+
+ def test_unknown_command(self):
+ """Test worker with unknown command"""
+ _, lines = self._run(
+ '{"cmd": "dance"}\n{"cmd": "quit"}\n')
+ self.assertIn('unknown command', _parse(lines[1])['msg'])
+
+ def test_blank_lines(self):
+ """Test worker ignores blank lines"""
+ _, lines = self._run('\n\n{"cmd": "quit"}\n\n')
+ self.assertEqual(len(lines), 2) # ready + quit_ack
+
+ def test_configure_command(self):
+ """Test configure command in worker loop"""
+ result, lines = self._run(
+ '{"cmd": "configure", "settings": {"no_lto": true}}\n'
+ '{"cmd": "quit"}\n')
+ self.assertEqual(result, 0)
+ self.assertEqual(_parse(lines[1])['resp'], 'configure_done')
+
+
+class TestRunWorkerDispatch(_RunWorkerBase):
+ """Test run_worker command dispatch"""
+
+ @mock.patch('buildman.worker._cmd_setup', return_value=True)
+ def test_setup(self, mock_fn):
+ """Test setup command is dispatched"""
+ self._run('{"cmd": "setup", "work_dir": "/tmp"}\n'
+ '{"cmd": "quit"}\n')
+ mock_fn.assert_called_once()
+
+ @mock.patch('buildman.worker._cmd_build_boards')
+ def test_build_boards(self, mock_fn):
+ """Test build_boards command is dispatched"""
+ self._run('{"cmd": "build_boards", "boards": [], '
+ '"commits": []}\n{"cmd": "quit"}\n')
+ mock_fn.assert_called_once()
+
+ @mock.patch('buildman.worker._cmd_build_prepare')
+ def test_build_prepare(self, mock_fn):
+ """Test build_prepare command is dispatched"""
+ self._run('{"cmd": "build_prepare", "commits": []}\n'
+ '{"cmd": "quit"}\n')
+ mock_fn.assert_called_once()
+
+ @mock.patch('buildman.worker._cmd_build_board')
+ def test_build_board(self, mock_fn):
+ """Test build_board command is dispatched"""
+ self._run('{"cmd": "build_board", "board": "x"}\n'
+ '{"cmd": "quit"}\n')
+ mock_fn.assert_called_once()
+
+ @mock.patch('buildman.worker._cmd_build_done')
+ def test_build_done(self, mock_fn):
+ """Test build_done command is dispatched"""
+ self._run('{"cmd": "build_done"}\n{"cmd": "quit"}\n')
+ mock_fn.assert_called_once()
+
+ def test_stdin_eof(self):
+ """Test worker handles stdin EOF"""
+ result, _lines = self._run('')
+ self.assertEqual(result, 1)
+
+ def test_queue_empty_retry(self):
+ """Test dispatch retries on queue.Empty"""
+ eof = object()
+ mock_queue = mock.Mock()
+ mock_queue.get.side_effect = [
+ queue.Empty(),
+ '{"cmd": "quit"}\n',
+ ]
+ worker._dispatch_commands(mock_queue, eof, {})
+ self.assert_in_output('quit_ack')
+ self.assertEqual(mock_queue.get.call_count, 2)
+
+
+class TestWorkerMake(unittest.TestCase):
+ """Test _worker_make()"""
+
+ @mock.patch('buildman.worker.subprocess.Popen')
+ def test_success(self, mock_popen):
+ """Test successful make invocation"""
+ proc = mock.Mock()
+ proc.communicate.return_value = (b'built ok\n', b'')
+ proc.returncode = 0
+ mock_popen.return_value = proc
+
+ result = worker._worker_make(
+ None, None, None, '/tmp',
+ 'O=/tmp/out', '-s', '-j', '4', 'sandbox_defconfig',
+ env={'PATH': '/usr/bin'})
+ self.assertEqual(result.return_code, 0)
+ self.assertEqual(result.stdout, 'built ok\n')
+ self.assertEqual(mock_popen.call_args[0][0][0], 'make')
+
+ @mock.patch('buildman.worker.subprocess.Popen',
+ side_effect=FileNotFoundError('no make'))
+ def test_make_not_found(self, _mock_popen):
+ """Test when make binary is not found"""
+ result = worker._worker_make(
+ None, None, None, '/tmp', env={})
+ self.assertEqual(result.return_code, 1)
+ self.assertIn('make failed', result.stderr)
+
+
+class TestWorkerBuilderThread(_ProtoTestBase):
+ """Test _WorkerBuilderThread"""
+
+ def _make_thread(self):
+ """Create an uninitialised thread instance for testing
+
+ Uses __new__ to avoid calling __init__ which requires a real
+ Builder. Tests must set any attributes they need.
+ """
+ return worker._WorkerBuilderThread.__new__(
+ worker._WorkerBuilderThread)
+
+ def test_write_result_is_noop(self):
+ """Test that _write_result does nothing"""
+ self._make_thread()._write_result(None, False, False)
+
+ def test_send_result(self):
+ """Test that _send_result sends a build_result message"""
+ thread = self._make_thread()
+ result = mock.Mock(
+ brd=mock.Mock(target='sandbox'),
+ commit_upto=0, return_code=0,
+ stderr='', stdout='', out_dir='/nonexistent')
+ thread._send_result(result)
+ self.assert_resp('resp', 'build_result')
+ self.assert_resp('board', 'sandbox')
+
+ def test_run_job_sends_heartbeat(self):
+ """Test run_job sends heartbeat"""
+ thread = self._make_thread()
+ thread.thread_num = 0
+ job = mock.Mock(brd=mock.Mock(target='sandbox'))
+ with mock.patch.object(worker._WorkerBuilderThread.__bases__[0],
+ 'run_job'):
+ thread.run_job(job)
+ self.assert_in_output('heartbeat')
+
+ def test_checkout_with_commits(self):
+ """Test _checkout with commits"""
+ thread = self._make_thread()
+ thread.builder = mock.Mock()
+ commit = mock.Mock(hash='abc123')
+ thread.builder.commits = [commit]
+ thread.builder.checkout = True
+
+ with mock.patch('buildman.worker._run_git') as mock_git, \
+ mock.patch('buildman.worker._remove_stale_lock'), \
+ tempfile.TemporaryDirectory() as tmpdir:
+ result = thread._checkout(0, tmpdir)
+
+ self.assertEqual(result, commit)
+ mock_git.assert_called_once()
+
+ def test_checkout_no_commits(self):
+ """Test _checkout without commits returns 'current'"""
+ thread = self._make_thread()
+ thread.builder = mock.Mock(commits=None)
+ self.assertEqual(thread._checkout(0, '/tmp'), 'current')
+
+
+class TestCmdBuildBoards(_ProtoTestBase):
+ """Test _cmd_build_boards"""
+
+ def _make_state(self, tmpdir, **overrides):
+ """Create a standard state dict for build tests"""
+ state = {
+ 'work_dir': tmpdir,
+ 'toolchains': mock.Mock(),
+ 'settings': {},
+ 'nthreads': 4,
+ }
+ state.update(overrides)
+ return state
+
+ def test_no_work_dir_error(self):
+ """Test error when no work directory set"""
+ worker._cmd_build_boards({
+ 'boards': [{'board': 'x', 'arch': 'arm'}],
+ 'commits': ['abc'],
+ }, {'work_dir': None})
+ self.assert_in_output('no work directory')
+
+ def test_no_boards_error(self):
+ """Test error when no boards specified"""
+ with tempfile.TemporaryDirectory() as tmpdir:
+ worker._cmd_build_boards(
+ {'boards': [], 'commits': ['abc']},
+ self._make_state(tmpdir))
+ self.assert_in_output('no boards')
+
+ def test_no_toolchains_error(self):
+ """Test error when toolchains not set up"""
+ with tempfile.TemporaryDirectory() as tmpdir:
+ worker._cmd_build_boards({
+ 'boards': [{'board': 'x', 'arch': 'arm'}],
+ 'commits': ['abc'],
+ }, {'work_dir': tmpdir})
+ self.assert_in_output('no toolchains')
+
+ @mock.patch('buildman.worker._setup_worktrees')
+ @mock.patch('buildman.worker.builder_mod.Builder')
+ @mock.patch('buildman.worker.ResultHandler')
+ def test_creates_builder(self, _mock_rh_cls, mock_builder_cls,
+ mock_setup_wt):
+ """Test that build_boards creates a Builder correctly"""
+ mock_builder = mock.Mock()
+ mock_builder.run_build.return_value = (0, 0, [])
+ mock_builder_cls.return_value = mock_builder
+
+ with tempfile.TemporaryDirectory() as tmpdir:
+ worker._cmd_build_boards({
+ 'boards': [
+ {'board': 'sandbox', 'arch': 'sandbox'},
+ {'board': 'rpi', 'arch': 'arm'},
+ ],
+ 'commits': ['abc123', 'def456'],
+ }, self._make_state(tmpdir, nthreads=8,
+ settings={'no_lto': True,
+ 'force_build': True,
+ 'kconfig_check': False}))
+
+ mock_setup_wt.assert_called_once()
+ kwargs = mock_builder_cls.call_args[1]
+ self.assertEqual(kwargs['thread_class'],
+ worker._WorkerBuilderThread)
+ self.assertTrue(kwargs['no_lto'])
+ self.assertFalse(kwargs['kconfig_check'])
+
+ call_args = mock_builder.init_build.call_args
+ self.assertEqual(len(call_args[0][0]), 2)
+ self.assertIn('rpi', call_args[0][1])
+ self.assert_in_output('build_done')
+
+ @mock.patch('buildman.worker.builder_mod.Builder')
+ @mock.patch('buildman.worker.ResultHandler')
+ def test_no_commits(self, _mock_rh_cls, mock_builder_cls):
+ """Test build_boards with no commits (current source)"""
+ mock_builder = mock.Mock()
+ mock_builder.run_build.return_value = (0, 0, [])
+ mock_builder_cls.return_value = mock_builder
+
+ with tempfile.TemporaryDirectory() as tmpdir:
+ worker._cmd_build_boards({
+ 'boards': [{'board': 'sandbox', 'arch': 'sandbox'}],
+ 'commits': [None],
+ }, self._make_state(tmpdir))
+
+ self.assertIsNone(
+ mock_builder.init_build.call_args[0][0])
+ self.assertTrue(mock_builder_cls.call_args[1]['kconfig_check'])
+
+ @mock.patch('buildman.worker._setup_worktrees')
+ @mock.patch('buildman.worker.builder_mod.Builder')
+ @mock.patch('buildman.worker.ResultHandler')
+ def test_build_crash(self, _mock_rh, mock_builder_cls, _mock_wt):
+ """Test build_boards when run_build crashes"""
+ mock_builder = mock.Mock()
+ mock_builder.run_build.side_effect = RuntimeError('crash')
+ mock_builder_cls.return_value = mock_builder
+
+ with tempfile.TemporaryDirectory() as tmpdir:
+ worker._cmd_build_boards({
+ 'boards': [{'board': 'x', 'arch': 'arm'}],
+ 'commits': ['abc'],
+ }, self._make_state(tmpdir, nthreads=2))
+
+ self.assert_in_output('"exceptions": 1')
+
+
+class TestCmdBuildPrepare(_ProtoTestBase):
+ """Test _cmd_build_prepare()"""
+
+ def test_no_work_dir(self):
+ """Test error when no work directory"""
+ worker._cmd_build_prepare({}, {})
+ self.assert_in_output('no work directory')
+
+ def test_no_toolchains(self):
+ """Test error when no toolchains"""
+ with tempfile.TemporaryDirectory() as tmpdir:
+ worker._cmd_build_prepare({}, {'work_dir': tmpdir})
+ self.assert_in_output('no toolchains')
+
+ @mock.patch('buildman.worker._setup_worktrees')
+ @mock.patch('buildman.worker._create_builder')
+ def test_success(self, mock_create, _mock_wt):
+ """Test successful build_prepare"""
+ mock_bldr = mock.Mock(
+ base_dir='/tmp/test', commit_count=1,
+ work_in_output=False)
+ mock_create.return_value = mock_bldr
+
+ with tempfile.TemporaryDirectory() as tmpdir:
+ os.makedirs(os.path.join(tmpdir, '.git'))
+ state = {
+ 'work_dir': tmpdir,
+ 'toolchains': mock.Mock(),
+ 'nthreads': 2,
+ 'settings': {},
+ }
+ worker._cmd_build_prepare(
+ {'commits': ['abc123']}, state)
+
+ self.assert_in_output('build_prepare_done')
+ self.assertIn('builder', state)
+
+
+class TestCmdBuildBoard(_ProtoTestBase):
+ """Test _cmd_build_board()"""
+
+ def test_no_builder(self):
+ """Test error when no builder"""
+ worker._cmd_build_board({'board': 'x'}, {})
+ self.assert_in_output('no builder')
+
+ def test_queues_job(self):
+ """Test that build_board queues a job"""
+ mock_bldr = mock.Mock(
+ commit_count=1, count=0, work_in_output=False,
+ adjust_cfg=None, step=1)
+ worker._cmd_build_board(
+ {'board': 'sandbox', 'arch': 'sandbox'},
+ {'builder': mock_bldr, 'commits': None})
+ mock_bldr.queue.put.assert_called_once()
+
+
class TestCmdBuildDone(_ProtoTestBase):
    """Test _cmd_build_done()"""

    def test_no_builder(self):
        """Test build_done with no builder

        Without a prior build_prepare there is nothing to drain; the
        worker should still answer 'build_done' with zero exceptions.
        """
        worker._cmd_build_done({})
        self.assert_resp('resp', 'build_done')
        self.assert_resp('exceptions', 0)

    def test_with_builder(self):
        """Test build_done with a builder

        run_build is mocked to succeed; the builder must be removed
        from the state afterwards so a new build_prepare can start
        fresh.
        """
        mock_bldr = mock.Mock()
        mock_bldr.run_build.return_value = (0, 0, [])
        state = {'builder': mock_bldr, 'commits': ['abc']}
        worker._cmd_build_done(state)
        self.assert_resp('resp', 'build_done')
        self.assertNotIn('builder', state)

    def test_builder_crash(self):
        """Test build_done when run_build crashes

        A crash must be reported as exceptions=1 and the builder still
        cleared from the state.
        """
        mock_bldr = mock.Mock()
        mock_bldr.run_build.side_effect = RuntimeError('boom')
        state = {'builder': mock_bldr, 'commits': ['abc']}
        worker._cmd_build_done(state)
        self.assert_resp('exceptions', 1)
        self.assertNotIn('builder', state)
+
+
class TestSetupWorktrees(_ProtoTestBase):
    """Test _setup_worktrees()"""

    @mock.patch('buildman.worker._run_git')
    def test_creates_worktrees(self, mock_git):
        """Test that worktrees are created for each thread

        With no existing worktrees, expects one 'worktree prune' plus
        one 'worktree add' per thread, and one 'worktree_created'
        response per thread.
        """
        with tempfile.TemporaryDirectory() as tmpdir:
            git_dir = os.path.join(tmpdir, '.git')
            os.makedirs(git_dir)
            worker._setup_worktrees(tmpdir, git_dir, 3)

        self.assertEqual(mock_git.call_count, 4)  # prune + 3 adds
        self.assertIn('prune', mock_git.call_args_list[0][0])
        resps = self.get_all_resp()
        self.assertEqual(len(resps), 3)
        for resp in resps:
            self.assertEqual(resp['resp'], 'worktree_created')

    @mock.patch('buildman.worker._run_git')
    def test_skips_existing_valid_worktree(self, mock_git):
        """Test that valid existing worktrees are reused

        A thread dir whose .git file points at an existing gitdir must
        be left alone: only 'worktree prune' should run.
        """
        with tempfile.TemporaryDirectory() as tmpdir:
            git_dir = os.path.join(tmpdir, '.git')
            os.makedirs(git_dir)

            # Fake a valid worktree: .git file referencing a real gitdir
            thread_dir = os.path.join(tmpdir, '.bm-work', '00')
            os.makedirs(thread_dir)
            real_gitdir = os.path.join(git_dir, 'worktrees', '00')
            os.makedirs(real_gitdir)
            with open(os.path.join(thread_dir, '.git'), 'w',
                      encoding='utf-8') as fout:
                fout.write(f'gitdir: {real_gitdir}\n')

            worker._setup_worktrees(tmpdir, git_dir, 1)

        self.assertEqual(mock_git.call_count, 1)  # prune only

    @mock.patch('buildman.worker._run_git')
    def test_replaces_stale_clone(self, mock_git):
        """Test that a full .git directory (old clone) is replaced"""
        with tempfile.TemporaryDirectory() as tmpdir:
            git_dir = os.path.join(tmpdir, '.git')
            os.makedirs(git_dir)

            # Fake an old-style full clone: .git is a directory
            thread_dir = os.path.join(tmpdir, '.bm-work', '00')
            clone_git = os.path.join(thread_dir, '.git')
            os.makedirs(os.path.join(clone_git, 'objects'))

            worker._setup_worktrees(tmpdir, git_dir, 1)
            self.assertFalse(os.path.isdir(clone_git))

        self.assertEqual(mock_git.call_count, 2)  # prune + add

    @mock.patch('buildman.worker._run_git')
    def test_stale_dot_git_file(self, mock_git):
        """Test removing stale .git file pointing to non-existent dir"""
        with tempfile.TemporaryDirectory() as tmpdir:
            git_dir = os.path.join(tmpdir, '.git')
            os.makedirs(git_dir)

            # .git file referencing a pruned (missing) gitdir
            thread_dir = os.path.join(tmpdir, '.bm-work', '00')
            os.makedirs(thread_dir)
            dot_git = os.path.join(thread_dir, '.git')
            with open(dot_git, 'w', encoding='utf-8') as fout:
                fout.write('gitdir: /nonexistent\n')

            worker._setup_worktrees(tmpdir, git_dir, 1)
            self.assertFalse(os.path.isfile(dot_git))

        self.assertGreaterEqual(mock_git.call_count, 2)
+
+
class TestRunGit(unittest.TestCase):
    """Test _run_git()"""

    @mock.patch('buildman.worker.subprocess.Popen')
    def test_success(self, mock_popen):
        """Test successful git command

        A zero exit status must not raise.
        """
        proc = mock.Mock()
        proc.communicate.return_value = (b'', b'')
        proc.returncode = 0
        mock_popen.return_value = proc
        worker._run_git('status', cwd='/tmp')

    @mock.patch('buildman.worker.subprocess.Popen')
    def test_failure(self, mock_popen):
        """Test failed git command

        A non-zero exit status raises OSError carrying git's stderr.
        """
        proc = mock.Mock()
        proc.communicate.return_value = (b'', b'fatal: bad ref\n')
        proc.returncode = 128
        mock_popen.return_value = proc
        with self.assertRaises(OSError) as ctx:
            worker._run_git('checkout', 'bad', cwd='/tmp')
        self.assertIn('bad ref', str(ctx.exception))

    @mock.patch('buildman.worker.subprocess.Popen')
    def test_timeout(self, mock_popen):
        """Test git command timeout

        The first communicate() raises TimeoutExpired; the second call
        (after kill) succeeds, and _run_git reports the timeout.
        """
        proc = mock.Mock()
        proc.communicate.side_effect = [
            subprocess.TimeoutExpired('git', 30),
            (b'', b''),
        ]
        mock_popen.return_value = proc
        with self.assertRaises(OSError) as ctx:
            worker._run_git('fetch', cwd='/tmp', timeout=30)
        self.assertIn('timed out', str(ctx.exception))
+
+
class TestResolveGitDir(unittest.TestCase):
    """Test _resolve_git_dir()"""

    def test_directory(self):
        """A plain .git directory resolves to itself"""
        with tempfile.TemporaryDirectory() as tmpdir:
            git_dir = os.path.join(tmpdir, '.git')
            os.makedirs(git_dir)
            self.assertEqual(worker._resolve_git_dir(git_dir), git_dir)

    def test_gitdir_file_absolute(self):
        """A .git file with an absolute gitdir path resolves to it"""
        with tempfile.TemporaryDirectory() as tmpdir:
            real_git = os.path.join(tmpdir, 'real.git')
            os.makedirs(real_git)
            dot_git = os.path.join(tmpdir, '.git')
            with open(dot_git, 'w', encoding='utf-8') as fout:
                fout.write(f'gitdir: {real_git}\n')
            self.assertEqual(worker._resolve_git_dir(dot_git), real_git)

    def test_gitdir_file_relative(self):
        """A relative gitdir path resolves against the file's parent"""
        with tempfile.TemporaryDirectory() as tmpdir:
            expected = os.path.join(tmpdir, 'worktrees', 'wt1')
            os.makedirs(expected)
            dot_git = os.path.join(tmpdir, '.git')
            with open(dot_git, 'w', encoding='utf-8') as fout:
                fout.write('gitdir: worktrees/wt1\n')
            self.assertEqual(worker._resolve_git_dir(dot_git), expected)
+
+
class TestProcessManagement(unittest.TestCase):
    """Test _kill_group(), _dbg() and signal handling"""

    @mock.patch('os.killpg')
    def test_kill_group_not_leader(self, mock_killpg):
        """Test _kill_group when not group leader

        Without leadership, killpg must never be called (it would take
        down unrelated processes such as the test runner).
        """
        old = worker._is_group_leader
        worker._is_group_leader = False
        worker._kill_group()
        mock_killpg.assert_not_called()
        worker._is_group_leader = old

    @mock.patch('os.killpg')
    @mock.patch('os.getpgrp', return_value=12345)
    def test_kill_group_leader(self, _mock_grp, mock_killpg):
        """Test _kill_group when group leader"""
        old = worker._is_group_leader
        worker._is_group_leader = True
        worker._kill_group()
        mock_killpg.assert_called_once()
        worker._is_group_leader = old

    @mock.patch('os.killpg', side_effect=OSError('no perm'))
    @mock.patch('os.getpgrp', return_value=12345)
    def test_kill_group_fails(self, _mock_grp, _mock_killpg):
        """Test _kill_group handles OSError"""
        old = worker._is_group_leader
        worker._is_group_leader = True
        worker._kill_group()  # should not raise
        worker._is_group_leader = old

    def test_dbg_off(self):
        """Test _dbg when debug is off"""
        old = worker._debug
        worker._debug = False
        worker._dbg('test message')  # should not raise
        worker._debug = old

    def test_dbg_on(self):
        """Test _dbg when debug is on"""
        old = worker._debug
        worker._debug = True
        with mock.patch('sys.stderr', new_callable=io.StringIO) as err:
            worker._dbg('hello')
            self.assertIn('hello', err.getvalue())
        worker._debug = old

    def test_dbg_stderr_oserror(self):
        """Test _dbg handles OSError from stderr"""
        old = worker._debug
        worker._debug = True
        err = mock.Mock()
        err.write.side_effect = OSError('broken pipe')
        with mock.patch('sys.stderr', err):
            worker._dbg('will fail')  # should not raise
        worker._debug = old

    @mock.patch('buildman.worker._kill_group')
    @mock.patch('os._exit')
    def test_exit_handler(self, mock_exit, mock_kill):
        """Test signal handler calls _kill_group and os._exit

        Captures the handler installed during run_worker(), invokes it
        directly, and checks it kills the group and exits with 1.
        """
        handlers = {}
        orig_signal = signal.signal

        def capture_signal(signum, handler):
            # Record the worker's handler but install SIG_DFL so the
            # test process keeps normal signal behaviour
            handlers[signum] = handler
            return orig_signal(signum, signal.SIG_DFL)

        with mock.patch('signal.signal', side_effect=capture_signal), \
                mock.patch('buildman.worker.toolchain_mod.Toolchains'), \
                mock.patch('sys.stdin', io.StringIO('{"cmd":"quit"}\n')), \
                mock.patch('sys.stdout', io.StringIO()):
            worker.run_worker()

        handler = handlers.get(signal.SIGTERM)
        self.assertIsNotNone(handler)
        mock_kill.reset_mock()
        mock_exit.reset_mock()
        handler(signal.SIGTERM, None)
        mock_kill.assert_called()
        mock_exit.assert_called_with(1)
+
+
class TestDoWorker(unittest.TestCase):
    """Test do_worker()"""

    @mock.patch('buildman.worker.run_worker', return_value=0)
    @mock.patch('os.setpgrp')
    @mock.patch('os.getpid', return_value=100)
    @mock.patch('os.getpgrp', return_value=100)
    def test_start(self, _grp, _pid, _setpgrp, mock_run):
        """Test do_worker sets process group and runs

        pid equals pgrp in this scenario; do_worker should delegate to
        run_worker with the debug flag.
        """
        self.assertEqual(worker.do_worker(debug=False), 0)
        mock_run.assert_called_once_with(False)

    @mock.patch('buildman.worker.run_worker', return_value=0)
    @mock.patch('os.setpgrp', side_effect=OSError('not allowed'))
    @mock.patch('os.getpid', return_value=100)
    @mock.patch('os.getpgrp', return_value=1)
    def test_setpgrp_fails(self, _grp, _pid, _setpgrp, _mock_run):
        """Test do_worker handles setpgrp failure

        setpgrp raising OSError must not prevent the worker running.
        """
        self.assertEqual(worker.do_worker(debug=False), 0)
+
+
# Allow running this test file directly
if __name__ == '__main__':
    unittest.main()
new file mode 100644
@@ -0,0 +1,985 @@
+# SPDX-License-Identifier: GPL-2.0+
+# Copyright 2026 Simon Glass <sjg@chromium.org>
+
+"""Worker mode for distributed builds
+
+A worker runs on a remote machine and receives build commands over stdin from a
+boss. Commands and responses use a JSON-lines protocol:
+
+Commands (boss -> worker, on stdin):
+ {"cmd": "setup", "work_dir": "/path"}
+ {"cmd": "configure", "settings": {"no_lto": true, ...}}
+ {"cmd": "build_boards",
+ "boards": [{"board": "sandbox", "arch": "sandbox"}],
+ "commits": ["<hash>", ...]}
+ {"cmd": "build_prepare", "commits": ["<hash>", ...]}
+ {"cmd": "build_board", "board": "sandbox", "arch": "sandbox"}
+ {"cmd": "build_done"}
+ {"cmd": "quit"}
+
+Responses (worker -> boss, on stdout):
+ Each line is prefixed with 'BM> ' followed by a JSON object:
+ BM> {"resp": "ready", "nthreads": 8, "slots": 2}
+ BM> {"resp": "setup_done", "work_dir": "/path", "git_dir": "/path/.git"}
+ BM> {"resp": "configure_done"}
+ BM> {"resp": "build_prepare_done"}
+ BM> {"resp": "build_result", "board": "sandbox", "commit_upto": 0,
+ "return_code": 0, "stderr": "", "sizes": {...}}
+ BM> {"resp": "build_done", "exceptions": 0}
+ BM> {"resp": "error", "msg": "..."}
+ BM> {"resp": "quit_ack"}
+
+The 'BM> ' prefix allows the boss to distinguish protocol messages from
+any stray output on the SSH connection (e.g. login banners, warnings).
+
+The worker uses Builder and BuilderThread from the local build path,
+with a custom BuilderThread subclass that sends results over SSH
+instead of writing them to disk. This means the worker inherits the
+same board-first scheduling, per-thread worktrees, incremental builds
+and retry logic as local builds.
+
+Typical flow (batch mode):
+ 1. Boss starts worker: ssh host buildman --worker
+ 2. Worker sends 'ready' with nthreads
+ 3. Boss sends 'setup' to create work directory with a git repo
+ 4. Worker sends 'setup_done' with git_dir path
+ 5. Boss pushes source: git push ssh://host/<git_dir> HEAD:refs/heads/work
+ 6. Boss sends 'build_boards' with all boards and commits
+ 7. Worker creates a Builder which sets up per-thread worktrees
+ and runs BuilderThread instances that pick boards from a queue,
+ build all commits for each, and stream 'build_result' responses
+ 8. Boss sends 'quit' when done
+
+Demand-driven flow:
+ Steps 1-5 same as above, then:
+ 6. Boss sends 'build_prepare' with commits
+ 7. Worker creates Builder and worktrees, sends 'build_prepare_done'
+ 8. Boss sends 'build_board' commands one at a time from a shared
+ pool, sending more as results arrive to keep threads busy
+ 9. Boss sends 'build_done' when no more boards
+ 10. Worker drains queue, sends 'build_done', boss sends 'quit'
+"""
+
+import json
+import os
+import queue
+import signal
+import shutil
+import subprocess
+import sys
+import tempfile
+import traceback
+import threading
+
+from buildman.board import Board
+from buildman import builderthread
+from buildman import builder as builder_mod
+from buildman.outcome import DisplayOptions
+from buildman.resulthandler import ResultHandler
+from buildman import toolchain as toolchain_mod
+from u_boot_pylib import command
+from u_boot_pylib import terminal
+
+from patman.commit import Commit
+
+# Protocol prefix for all worker responses
+RESPONSE_PREFIX = 'BM> '
+
+# Lock to prevent interleaved stdout writes from concurrent build threads
+_send_lock = threading.Lock()
+
+# Lock for debug output to stderr
+_debug_lock = threading.Lock()
+
+# Whether debug output is enabled (set by run_worker)
+_debug = False # pylint: disable=C0103
+
+# Whether this process is a process group leader (set by do_worker)
+_is_group_leader = False # pylint: disable=C0103
+
+# The real stdout for protocol messages (set by run_worker)
+_protocol_out = None # pylint: disable=C0103
+
+
def _kill_group():
    """Kill every process in our process group

    Sends SIGKILL to the whole group, taking down this process along
    with any make, cc1, as, ld, etc. started by build threads. This is
    a no-op unless do_worker() confirmed we lead the group, so that we
    never kill unrelated processes (e.g. the test runner).
    """
    if not _is_group_leader:
        _dbg('_kill_group: not leader, skipping')
        return
    pgid = os.getpgrp()
    _dbg(f'_kill_group: killing pgid {pgid}')
    try:
        os.killpg(pgid, signal.SIGKILL)
    except OSError as exc:
        _dbg(f'_kill_group: killpg failed: {exc}')
+
+
def _dbg(msg):
    """Print a debug message to stderr if debug mode is enabled

    Args:
        msg (str): Message to print
    """
    if not _debug:
        return
    with _debug_lock:
        try:
            sys.stderr.write(f'W: {msg}\n')
            sys.stderr.flush()
        except OSError:
            # stderr may be gone (e.g. broken pipe); drop the message
            pass
+
+
def _send(obj):
    """Send a JSON response to the boss

    Thread-safe: uses a lock to prevent interleaved writes from
    concurrent build threads. Writes to _protocol_out (the real
    stdout) rather than sys.stdout which is redirected to stderr.

    Args:
        obj (dict): Response object to send
    """
    stream = _protocol_out or sys.stdout
    line = RESPONSE_PREFIX + json.dumps(obj) + '\n'
    with _send_lock:
        stream.write(line)
        stream.flush()
+
+
def _send_error(msg):
    """Report an error to the boss over the protocol

    Args:
        msg (str): Error message
    """
    _send({'resp': 'error', 'msg': msg})
+
+
def _send_build_result(board, commit_upto, return_code, **kwargs):
    """Send a build result response to the boss

    Args:
        board (str): Board target name
        commit_upto (int): Commit number
        return_code (int): Build return code
        **kwargs: Optional keys: stderr, stdout, sizes
    """
    msg = {
        'resp': 'build_result',
        'board': board,
        'commit_upto': commit_upto,
        'return_code': return_code,
        'stderr': kwargs.get('stderr', ''),
        'stdout': kwargs.get('stdout', ''),
        'load_avg': _get_load_avg(),
    }
    sizes = kwargs.get('sizes')
    if sizes:
        # Only include sizes when something was collected
        msg['sizes'] = sizes
    _send(msg)
+
+
+def _get_nthreads():
+ """Get the number of available build threads
+
+ Returns:
+ int: Number of threads available for building
+ """
+ try:
+ return os.cpu_count() or 1
+ except (AttributeError, NotImplementedError):
+ return 1
+
+
+def _get_load_avg():
+ """Get the 1-minute load average
+
+ Returns:
+ float: 1-minute load average, or 0.0 if unavailable
+ """
+ try:
+ with open('/proc/loadavg', encoding='utf-8') as inf:
+ return float(inf.read().split()[0])
+ except (OSError, ValueError, IndexError):
+ return 0.0
+
+
+def _get_sizes(out_dir):
+ """Get the image sizes from a build output directory
+
+ Uses subprocess.Popen directly instead of command.run_pipe() to
+ avoid the select() FD_SETSIZE limit in cros_subprocess. With many
+ threads running builds, pipe file descriptors can exceed 1024,
+ causing select() to fail or corrupt memory.
+
+ Args:
+ out_dir (str): Build output directory
+
+ Returns:
+ dict: Size information, or empty dict if not available
+ """
+ elf = os.path.join(out_dir, 'u-boot')
+ if not os.path.exists(elf):
+ return {}
+ try:
+ proc = subprocess.Popen( # pylint: disable=R1732
+ ['size', elf], stdin=subprocess.DEVNULL,
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ stdout, _ = proc.communicate()
+ if proc.returncode == 0:
+ # Strip the header line from size output, keeping only data lines.
+ # This matches the format that local builderthread produces.
+ lines = stdout.decode('utf-8', errors='replace').splitlines()
+ if len(lines) > 1:
+ return {'raw': '\n'.join(lines[1:])}
+ except OSError:
+ pass
+ return {}
+
+
def _worker_make(_commit, _brd, _stage, cwd, *args, **kwargs):
    """Run make using subprocess.Popen to avoid select() FD limit

    On workers with many parallel builds, file descriptor numbers can
    exceed FD_SETSIZE (1024), causing the select()-based
    communicate_filter in cros_subprocess to fail. Using
    subprocess.Popen with communicate() avoids this.

    Args:
        _commit: Unused (API compatibility with Builder.make)
        _brd: Unused
        _stage: Unused
        cwd (str): Working directory
        *args: Make arguments
        **kwargs: Must include 'env' dict

    Returns:
        CommandResult: Result of the make command
    """
    cmd = ['make'] + list(args)
    try:
        proc = subprocess.Popen(  # pylint: disable=R1732
            cmd, cwd=cwd, env=kwargs.get('env'),
            stdin=subprocess.DEVNULL, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE)
        out, err = proc.communicate()
    except Exception as exc:  # pylint: disable=W0718
        # Report start-up failure as an ordinary failed result so the
        # build machinery handles it like any other error
        failed = command.CommandResult()
        failed.return_code = 1
        failed.stderr = f'make failed to start: {exc}'
        failed.combined = failed.stderr
        return failed
    result = command.CommandResult()
    result.stdout = out.decode('utf-8', errors='replace')
    result.stderr = err.decode('utf-8', errors='replace')
    result.combined = result.stdout + result.stderr
    result.return_code = proc.returncode
    return result
+
+
+def _run_git(*args, cwd=None, timeout=60):
+ """Run a git command using subprocess.Popen to avoid select() FD limit
+
+ On workers with many parallel builds, file descriptor numbers can
+ exceed FD_SETSIZE (1024), causing the select()-based cros_subprocess
+ to fail. Using subprocess.Popen with communicate() avoids this.
+
+ Args:
+ *args: Git command arguments (without 'git' prefix)
+ cwd (str): Working directory
+ timeout (int): Timeout in seconds for the command
+
+ Returns:
+ CommandResult: Result of the git command
+
+ Raises:
+ OSError: If the git command fails or times out
+ """
+ cmd = ['git'] + list(args)
+ proc = subprocess.Popen( # pylint: disable=R1732
+ cmd, cwd=cwd, stdin=subprocess.DEVNULL,
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ try:
+ _, stderr = proc.communicate(timeout=timeout)
+ except subprocess.TimeoutExpired as exc:
+ proc.kill()
+ proc.communicate()
+ raise OSError(
+ f'git command timed out after {timeout}s: {cmd}') from exc
+ if proc.returncode != 0:
+ raise OSError(stderr.decode('utf-8', errors='replace').strip())
+
+
+def _resolve_git_dir(git_dir):
+ """Resolve a .git entry to the actual git directory
+
+ For a regular repo, .git is a directory and is returned as-is.
+ For a worktree, .git is a file containing 'gitdir: <path>' and
+ the referenced directory is returned.
+
+ Args:
+ git_dir (str): Path to a .git file or directory
+
+ Returns:
+ str: Path to the actual git directory
+ """
+ if os.path.isfile(git_dir):
+ with open(git_dir, encoding='utf-8') as inf:
+ line = inf.readline().strip()
+ if line.startswith('gitdir: '):
+ path = line[8:]
+ if not os.path.isabs(path):
+ path = os.path.join(os.path.dirname(git_dir), path)
+ return path
+ return git_dir
+
+
def _remove_stale_lock(git_dir):
    """Remove a stale index.lock left by a previous SIGKILL'd run

    When the worker is killed (e.g. boss timeout or Ctrl-C), any
    in-progress git checkout leaves behind an index.lock. This must
    be cleaned up before the next checkout can proceed.

    Args:
        git_dir (str): Path to a .git file or directory
    """
    lock_path = os.path.join(_resolve_git_dir(git_dir), 'index.lock')
    try:
        os.remove(lock_path)
    except FileNotFoundError:
        # No lock present: nothing to do
        pass
+
+
def _setup_worktrees(work_dir, git_dir, num_threads):
    """Create per-thread worktrees sequentially with progress messages

    Sets up git worktrees for each build thread before the Builder is
    created. This avoids the problems of lazy per-thread creation:
    concurrent threads contending on the index lock, and no progress
    messages reaching the boss while threads are blocked.

    For existing valid worktrees (from a previous run), creation is
    skipped. A 'worktree_created' message is sent after each thread
    so the boss can show setup progress.

    Args:
        work_dir (str): Base work directory (Builder's base_dir)
        git_dir (str): Git directory path (e.g. work_dir/.git)
        num_threads (int): Number of threads to create worktrees for
    """
    # Thread worktrees live under .bm-work/00, .bm-work/01, ...
    bm_work = os.path.join(work_dir, '.bm-work')
    os.makedirs(bm_work, exist_ok=True)
    src_dir = os.path.abspath(git_dir)

    # Clean up stale locks from a previous SIGKILL'd run. Both the
    # main repo and the worktree gitdirs can have stale index.lock
    # files that would make git commands hang indefinitely.
    _remove_stale_lock(git_dir)

    # Prune stale worktree entries before creating new ones
    _run_git('worktree', 'prune', cwd=work_dir)

    for i in range(num_threads):
        thread_dir = os.path.join(bm_work, f'{i:02d}')
        dot_git = os.path.join(thread_dir, '.git')

        need_worktree = not os.path.exists(dot_git)
        if not need_worktree:
            if os.path.isdir(dot_git):
                # This is a full clone from an older buildman version,
                # not a worktree. Remove it so we can create a proper
                # worktree that shares objects with the main repo.
                shutil.rmtree(thread_dir)
                need_worktree = True
            else:
                # Validate existing worktree — it may be stale from a
                # previous killed run whose gitdir was pruned
                real_dir = _resolve_git_dir(dot_git)
                if not os.path.isdir(real_dir):
                    os.remove(dot_git)
                    need_worktree = True

        if need_worktree:
            os.makedirs(thread_dir, exist_ok=True)
            _run_git('--git-dir', src_dir, 'worktree',
                     'add', '.', '--detach', cwd=thread_dir)
        else:
            # Reusing a valid worktree: just clear any stale index.lock
            _remove_stale_lock(dot_git)

        # Per-thread progress message so the boss can show setup status
        _send({'resp': 'worktree_created', 'thread': i})
+
+
class _WorkerBuilderThread(builderthread.BuilderThread):
    """BuilderThread subclass that sends results over SSH

    Overrides _write_result() (no-op, since the worker doesn't write
    build output to a local directory tree), _send_result() (sends
    the result back to the boss as a JSON protocol message instead of
    putting it in the builder's out_queue), and _checkout() (uses
    subprocess.Popen for git checkout to avoid the select() FD_SETSIZE
    limit on machines with many threads).

    Worktrees are created sequentially before the Builder starts
    threads (see _setup_worktrees), so _checkout() only needs to
    do the checkout itself.
    """

    def run_job(self, job):
        """Run a job, sending a heartbeat so the boss knows we're alive

        Args:
            job (BuilderJob): Job to run; job.brd.target names the board
        """
        _send({'resp': 'heartbeat', 'board': job.brd.target,
               'thread': self.thread_num})
        super().run_job(job)

    def _write_result(self, result, keep_outputs, work_in_output):
        """Skip disk writes — results are sent over SSH"""

    def _send_result(self, result):
        """Send the build result to the boss over the SSH protocol

        Sizes are collected only for successful builds that produced
        an output directory.

        Args:
            result: Build result with brd, commit_upto, return_code,
                stdout, stderr and out_dir attributes
        """
        sizes = {}
        if result.out_dir and result.return_code == 0:
            sizes = _get_sizes(result.out_dir)
        _send_build_result(
            result.brd.target, result.commit_upto, result.return_code,
            stderr=result.stderr or '', stdout=result.stdout or '',
            sizes=sizes)

    def _checkout(self, commit_upto, work_dir):
        """Check out a commit using subprocess to avoid select() FD limit

        Worktrees are already set up by _setup_worktrees() before
        the Builder starts threads, so this only needs to do the
        checkout itself.

        Args:
            commit_upto (int): Index of the commit to check out
            work_dir (str): Thread's worktree directory

        Returns:
            Commit or str: Commit checked out, or 'current' when
                building the current source tree
        """
        if self.builder.commits:
            commit = self.builder.commits[commit_upto]
            if self.builder.checkout:
                git_dir = os.path.join(work_dir, '.git')
                # A previous killed run may have left index.lock behind
                _remove_stale_lock(git_dir)
                _run_git('checkout', '-f', commit.hash, cwd=work_dir)
        else:
            commit = 'current'
        return commit
+
+
def _cmd_setup(req, state):
    """Handle the 'setup' command

    Creates or re-uses a work directory and initialises a git repo in it.
    The boss can then use 'git push' over SSH to send source code
    to the repo before issuing build commands.

    Note: this command does not itself scan toolchains; the visible
    code only prepares the directory and repo. state['toolchains'] is
    presumably populated elsewhere (e.g. by run_worker) — confirm.

    Args:
        req (dict): Request with keys:
            work_dir (str): Working directory path (auto-created if empty)
        state (dict): Worker state, updated in place

    Returns:
        bool: True on success, False if 'git init' failed
    """
    work_dir = req.get('work_dir')
    if not work_dir:
        # No path given: create a private temp dir and record that we
        # chose it ourselves
        work_dir = tempfile.mkdtemp(prefix='bm-worker-')
        state['auto_work_dir'] = True
    os.makedirs(work_dir, exist_ok=True)
    state['work_dir'] = work_dir

    # Initialise a git repo so the boss can push to it
    git_dir = os.path.join(work_dir, '.git')
    if not os.path.isdir(git_dir):
        try:
            command.run_one('git', 'init', cwd=work_dir,
                            capture=True, raise_on_error=True)
        except command.CommandExc as exc:
            _send_error(f'git init failed: {exc}')
            return False

    _send({'resp': 'setup_done', 'work_dir': work_dir,
           'git_dir': git_dir})
    return True
+
+
def _cmd_configure(req, state):
    """Handle the 'configure' command

    Stores build settings received from the boss. These settings mirror
    the command-line flags that affect how make is invoked (verbose,
    allow_missing, no_lto, etc.) and are applied to every subsequent
    build.

    Args:
        req (dict): Request with 'settings' dict containing build flags
        state (dict): Worker state, updated in place

    Returns:
        bool: True on success
    """
    cfg = req.get('settings', {})
    state['settings'] = cfg
    _dbg(f'configure: {cfg}')
    _send({'resp': 'configure_done'})
    return True
+
+
+def _parse_commits(commit_hashes):
+ """Convert commit hashes to Commit objects
+
+ Args:
+ commit_hashes (list): Commit hashes, or [None] for current source
+
+ Returns:
+ list of Commit or None: Commit objects, or None for current source
+ """
+ if commit_hashes and commit_hashes[0] is not None:
+ return [Commit(h) for h in commit_hashes]
+ return None
+
+
+def _parse_boards(board_dicts):
+ """Convert board dicts from the boss into Board objects
+
+ Args:
+ board_dicts (list of dict): Each with 'board' and 'arch' keys
+
+ Returns:
+ dict: target_name -> Board mapping
+ """
+ board_selected = {}
+ for bd in board_dicts:
+ target = bd['board']
+ brd = Board('Active', bd.get('arch', ''), '', '', '',
+ target, target, target)
+ board_selected[target] = brd
+ return board_selected
+
+
def _run_build(bldr, commits, board_selected):
    """Run a build and send the final outcome over the protocol

    Args:
        bldr (Builder): Configured builder
        commits (list of Commit or None): Commits to build
        board_selected (dict): target_name -> Board mapping
    """
    bldr.init_build(commits, board_selected, keep_outputs=False,
                    verbose=False, fragments=None)
    try:
        _fail, _warned, exceptions = bldr.run_build(delay_summary=True)
    except Exception as exc:  # pylint: disable=W0718
        # Never let a build crash take down the worker: report it to
        # the boss and keep serving commands
        _dbg(f'run_build crashed: {exc}')
        _dbg(traceback.format_exc())
        _send({'resp': 'build_done', 'exceptions': 1})
        return
    _send({'resp': 'build_done',
           'exceptions': len(exceptions)})
+
+
def _cmd_build_boards(req, state):
    """Handle the 'build_boards' command

    Creates a Builder with a _WorkerBuilderThread subclass and runs the
    build. Results are streamed back over the SSH protocol as each
    commit completes.

    Args:
        req (dict): Request with:
            boards (list of dict): Each with 'board' and 'arch'
            commits (list): Commit hashes in order, or [None] for
                current source
        state (dict): Worker state
    """
    work_dir = state.get('work_dir')
    if not work_dir:
        _send_error('no work directory set up')
        return

    board_dicts = req.get('boards', [])
    if not board_dicts:
        _send_error('no boards specified')
        return

    toolchains = state.get('toolchains')
    if not toolchains:
        _send_error('no toolchains available (run setup first)')
        return

    nthreads = state.get('nthreads', _get_nthreads())
    commits = _parse_commits(req.get('commits', [None]))
    board_selected = _parse_boards(board_dicts)

    # Calculate thread/job split: enough threads to keep all CPUs
    # busy, with each thread running make with -j proportionally
    num_threads = min(nthreads, len(board_selected))
    num_jobs = max(1, nthreads // num_threads)

    _dbg(f'build_boards: {len(board_selected)} boards x '
         f'{len(commits) if commits else 1} commits '
         f'threads={num_threads} -j{num_jobs}')

    # Set up worktrees sequentially before creating the Builder.
    # This sends progress messages so the boss can show setup status
    # (e.g. [ruru 3/256]) and avoids the build timeout firing before
    # any build results arrive.
    git_dir = os.path.join(work_dir, '.git')
    if commits is not None:
        # NOTE(review): build_started is only sent when real commits
        # will be built, not for current-source builds — confirm the
        # boss expects this asymmetry
        _send({'resp': 'build_started', 'num_threads': num_threads})
    _setup_worktrees(work_dir, git_dir, num_threads)

    bldr = _create_builder(state, num_threads, num_jobs)
    # _run_build sends the final 'build_done' response
    _run_build(bldr, commits, board_selected)
+
+
def _create_builder(state, num_threads, num_jobs):
    """Create a Builder configured for worker use

    Args:
        state (dict): Worker state with toolchains, work_dir, settings
        num_threads (int): Number of build threads
        num_jobs (int): Make -j value per thread

    Returns:
        Builder: Configured builder with threads started and waiting
    """
    work_dir = state['work_dir']
    git_dir = os.path.join(work_dir, '.git')
    settings = state.get('settings', {})
    toolchains = state['toolchains']

    # No colour and all display options off: results are streamed back
    # to the boss by _WorkerBuilderThread instead of being shown here
    col = terminal.Color(terminal.COLOR_NEVER)
    opts = DisplayOptions(
        show_errors=False, show_sizes=False, show_detail=False,
        show_bloat=False, show_config=False, show_environment=False,
        show_unknown=False, ide=True, list_error_boards=False)
    result_handler = ResultHandler(col, opts)

    bldr = builder_mod.Builder(
        toolchains, work_dir, git_dir,
        num_threads, num_jobs, col, result_handler,
        # SSH-streaming thread class and the select()-safe make wrapper
        thread_class=_WorkerBuilderThread,
        make_func=_worker_make,
        handle_signals=False,
        # NOTE(review): lazy_thread_setup assumed to defer per-thread
        # setup; worktrees come from _setup_worktrees() — confirm
        lazy_thread_setup=True,
        checkout=True,
        per_board_out_dir=False,
        force_build=settings.get('force_build', False),
        force_build_failures=settings.get('force_build', False),
        no_lto=settings.get('no_lto', False),
        allow_missing=settings.get('allow_missing', False),
        verbose_build=settings.get('verbose_build', False),
        warnings_as_errors=settings.get('warnings_as_errors', False),
        mrproper=settings.get('mrproper', False),
        fallback_mrproper=settings.get('fallback_mrproper', False),
        config_only=settings.get('config_only', False),
        reproducible_builds=settings.get('reproducible_builds', False),
        force_config_on_failure=True,
        kconfig_check=settings.get('kconfig_check', True),
    )
    result_handler.set_builder(bldr)
    return bldr
+
+
def _cmd_build_prepare(req, state):
    """Handle the 'build_prepare' command

    Creates a Builder with threads waiting for jobs. The boss follows
    this with 'build_board' commands to feed boards one at a time, then
    'build_done' to signal completion.

    Args:
        req (dict): Request with:
            commits (list): Commit hashes in order, or [None] for
                current source
        state (dict): Worker state
    """
    work_dir = state.get('work_dir')
    if not work_dir:
        _send_error('no work directory set up')
        return

    toolchains = state.get('toolchains')
    if not toolchains:
        _send_error('no toolchains available (run setup first)')
        return

    commits = _parse_commits(req.get('commits', [None]))
    nthreads = state.get('nthreads', _get_nthreads())
    max_boards = req.get('max_boards', 0)

    num_threads = nthreads
    num_jobs = None  # dynamic: nthreads / active_boards

    _dbg(f'build_prepare: '
         f'{len(commits) if commits else 1} commits '
         f'threads={num_threads} max_boards={max_boards} -j=dynamic')

    # Set up worktrees before creating the Builder
    git_dir = os.path.join(work_dir, '.git')
    if commits is not None:
        _send({'resp': 'build_started', 'num_threads': num_threads})
    _setup_worktrees(work_dir, git_dir, num_threads)

    bldr = _create_builder(state, num_threads, num_jobs)
    bldr.max_boards = max_boards

    # Minimal init: set commits and prepare directories. Threads are
    # already started by the Builder constructor, waiting on the queue.
    bldr.commit_count = len(commits) if commits else 1
    bldr.commits = commits
    bldr.verbose = False
    builderthread.mkdir(bldr.base_dir, parents=True)
    bldr.prepare_working_space(num_threads, commits is not None)
    bldr.prepare_output_space()
    bldr.start_time = builder_mod.datetime.now()
    bldr.count = 0
    # Reset progress counters (including Builder's private _warned)
    bldr.upto = bldr._warned = bldr.fail = 0
    bldr.timestamps = builder_mod.collections.deque()
    bldr.thread_exceptions = []

    # Remember the builder/commits so build_board and build_done can
    # reach them in later commands
    state['builder'] = bldr
    state['commits'] = commits
    _send({'resp': 'build_prepare_done'})
+
+
def _cmd_build_board(req, state):
    """Handle the 'build_board' command

    Queues a single board onto the Builder created by 'build_prepare'.

    Args:
        req (dict): Request with:
            board (str): Board target name
            arch (str): Board architecture
        state (dict): Worker state with 'builder' from build_prepare
    """
    builder = state.get('builder')
    if not builder:
        _send_error('no builder (send build_prepare first)')
        return

    name = req['board']
    arch = req.get('arch', '')
    # Synthesise a Board record: the target is used for every name
    # field and the remaining fields are left blank
    brd = Board('Active', arch, '', '', '', name, name, name)

    job = builderthread.BuilderJob()
    job.brd = brd
    job.commits = state.get('commits')
    job.keep_outputs = False
    job.work_in_output = builder.work_in_output
    job.adjust_cfg = builder.adjust_cfg
    job.fragments = None
    job.step = builder.step
    # Each queued board adds one build per commit to the total
    builder.count += builder.commit_count
    builder.queue.put(job)
+
+
def _cmd_build_done(state):
    """Handle the 'build_done' command from the boss

    Waits for all queued jobs to finish, then sends build_done.

    The 'builder' and 'commits' entries are always removed from state
    afterwards (in a finally block), so a later build_prepare starts
    clean even if sending the response fails.

    Args:
        state (dict): Worker state with 'builder' from build_prepare
    """
    bldr = state.get('builder')
    if not bldr:
        # Nothing was prepared; report an empty, successful build
        _send({'resp': 'build_done', 'exceptions': 0})
        return

    try:
        # delay_summary: the boss prints the summary, not this worker
        _fail, _warned, exceptions = bldr.run_build(delay_summary=True)
    except Exception as exc:  # pylint: disable=W0718
        # Boundary handler: report the crash rather than killing the
        # worker loop; the boss sees a non-zero exception count
        _dbg(f'run_build crashed: {exc}')
        _dbg(traceback.format_exc())
        _send({'resp': 'build_done', 'exceptions': 1})
    else:
        _send({'resp': 'build_done',
               'exceptions': len(exceptions)})
    finally:
        # Guaranteed cleanup, previously duplicated on both exit paths
        state.pop('builder', None)
        state.pop('commits', None)
+
+
def _cmd_quit(state):
    """Handle the 'quit' command

    Removes the work directory when it was auto-created, acknowledges
    the quit, then SIGKILLs the whole process group — taking down all
    child processes (make, cc1, etc.) and this process with them.

    Args:
        state (dict): Worker state
    """
    wdir = state.get('work_dir', '')
    # Only delete a directory this worker created for itself
    if wdir and state.get('auto_work_dir'):
        shutil.rmtree(wdir, ignore_errors=True)
    # Acknowledge first so the boss sees the reply before we die
    _send({'resp': 'quit_ack'})
    _kill_group()
+
+
def run_worker(debug=False):
    """Main worker loop

    Reads JSON commands from stdin and dispatches them. Sends responses
    as 'BM> ' prefixed JSON lines on stdout. Builds run in parallel
    using Builder with a _WorkerBuilderThread subclass.

    Args:
        debug (bool): True to print debug messages to stderr

    Returns:
        int: 0 on success, non-zero on error
    """
    global _debug, _protocol_out  # pylint: disable=W0603

    _debug = debug

    # Keep the real stdout for protocol traffic and point sys.stdout at
    # stderr, so tprint and other library output cannot corrupt the
    # JSON stream on the SSH pipe
    _protocol_out = sys.stdout
    sys.stdout = sys.stderr

    # Die immediately on signals, taking every child process with us.
    # sshd sends SIGHUP when the SSH connection drops; _kill_group()
    # SIGKILLs the process group, which includes this process
    def _die(_signum, _frame):
        _kill_group()
        os._exit(1)  # pylint: disable=W0212
    signal.signal(signal.SIGTERM, _die)
    signal.signal(signal.SIGINT, _die)
    signal.signal(signal.SIGHUP, _die)

    n_threads = _get_nthreads()

    # Scan toolchains up front so each board's architecture can be
    # matched to a cross-compiler. The boss sets up the git repo and
    # pushes source over SSH before launching us, so there is no
    # 'setup' command — we are ready as soon as we start
    toolchains = toolchain_mod.Toolchains()
    toolchains.get_settings(show_warning=False)
    toolchains.scan(verbose=False, raise_on_error=False)

    _dbg(f'ready: {n_threads} threads')
    _send({'resp': 'ready', 'nthreads': n_threads, 'slots': n_threads})

    stop_flag = threading.Event()
    state = {
        'work_dir': os.getcwd(),
        'nthreads': n_threads,
        'toolchains': toolchains,
        'stop': stop_flag,
    }

    # A background thread reads stdin so that EOF (boss disconnected)
    # is noticed even while a long command such as build_boards is
    # running. On EOF the whole process group is killed so all child
    # make processes die too
    inbox = queue.Queue()
    eof_mark = object()

    def _read_stdin():
        # readline() returns '' only at EOF; blank lines are skipped
        for raw in iter(sys.stdin.readline, ''):
            text = raw.strip()
            if text:
                inbox.put(text)
        # EOF: boss gone. _kill_group() covers production (kills the
        # process group); stop_flag covers tests (where _kill_group is
        # a no-op) by unblocking build threads
        _dbg('stdin closed, killing group')
        stop_flag.set()
        _kill_group()
        inbox.put(eof_mark)

    threading.Thread(target=_read_stdin, daemon=True).start()

    return _dispatch_commands(inbox, eof_mark, state)
+
+
+def _dispatch_commands(cmd_queue, eof_sentinel, state):
+ """Read commands from the queue and dispatch them
+
+ Args:
+ cmd_queue (queue.Queue): Queue of JSON command strings
+ eof_sentinel (object): Sentinel value indicating stdin closed
+ state (dict): Worker state
+
+ Returns:
+ int: 0 on clean quit, 1 on unexpected stdin close
+ """
+ while True:
+ try:
+ line = cmd_queue.get(timeout=1)
+ except queue.Empty:
+ continue
+ if line is eof_sentinel:
+ break
+
+ try:
+ req = json.loads(line)
+ except json.JSONDecodeError as exc:
+ _send_error(f'invalid JSON: {exc}')
+ continue
+
+ cmd = req.get('cmd', '')
+
+ if cmd == 'setup':
+ _cmd_setup(req, state)
+ elif cmd == 'configure':
+ _cmd_configure(req, state)
+ elif cmd == 'build_boards':
+ _cmd_build_boards(req, state)
+ elif cmd == 'build_prepare':
+ _cmd_build_prepare(req, state)
+ elif cmd == 'build_board':
+ _cmd_build_board(req, state)
+ elif cmd == 'build_done':
+ _cmd_build_done(state)
+ elif cmd == 'quit':
+ _cmd_quit(state)
+ return 0
+ else:
+ _send_error(f'unknown command: {cmd}')
+
+ # stdin closed without quit — boss was interrupted
+ return 1
+
+
def do_worker(debug=False):
    """Entry point for 'buildman --worker'

    Args:
        debug (bool): True to print debug messages to stderr

    Returns:
        int: 0 on success
    """
    global _is_group_leader  # pylint: disable=W0603

    # Become a process group leader so _kill_group() can take down all
    # child processes (make, cc1, as, ld) on exit. Under SSH, sshd has
    # already made us session + group leader (pid == pgid), so
    # setpgrp() fails with EPERM — that is fine. This lives here rather
    # than in run_worker() so tests can call run_worker() without
    # becoming a group leader
    try:
        os.setpgrp()
    except OSError:
        pass
    _is_group_leader = os.getpid() == os.getpgrp()
    return run_worker(debug)