[Concept,06/24] pickman: Add tests to improve control.py coverage

Message ID 20251217022823.392557-7-sjg@u-boot.org
State New
Headers
Series pickman: Refine the feature set |

Commit Message

Simon Glass Dec. 17, 2025, 2:27 a.m. UTC
  From: Simon Glass <simon.glass@canonical.com>

Add tests to improve test coverage for control.py:
- test_poll_continues_on_step_error: Test poll warning on step error
- test_format_history_summary: Test history summary formatting
- test_get_next_commits_with_empty_lines: Test empty line handling
- test_commit_source_resolve_error: Test commit resolution failure
- test_unknown_command: Test unknown command handling
- test_review_with_mrs_no_comments: Test review with open MRs

This improves control.py coverage from 63% to 67%.

Co-developed-by: Claude Opus 4.5 <noreply@anthropic.com>
Signed-off-by: Simon Glass <simon.glass@canonical.com>
---

 tools/pickman/ftest.py | 179 +++++++++++++++++++++++++++++++++++++++++
 1 file changed, 179 insertions(+)
  

Patch

diff --git a/tools/pickman/ftest.py b/tools/pickman/ftest.py
index cc4a5727ac4..8865296ff11 100644
--- a/tools/pickman/ftest.py
+++ b/tools/pickman/ftest.py
@@ -20,6 +20,7 @@  sys.path.insert(0, os.path.join(our_path, '..'))
 # pylint: disable=wrong-import-position,import-error,cyclic-import
 from u_boot_pylib import command
 from u_boot_pylib import terminal
+from u_boot_pylib import tout
 
 from pickman import __main__ as pickman
 from pickman import control
@@ -1363,6 +1364,184 @@  class TestPoll(unittest.TestCase):
         self.assertEqual(ret, 0)
         self.assertEqual(call_count[0], 2)
 
+    def test_poll_continues_on_step_error(self):
+        """Test poll continues when step returns non-zero."""
+        call_count = [0]
+
+        def mock_step(_args, _dbs):
+            call_count[0] += 1
+            if call_count[0] >= 2:
+                raise KeyboardInterrupt
+            return 1  # Return error
+
+        with mock.patch.object(control, 'do_step', mock_step):
+            with mock.patch('time.sleep'):
+                args = argparse.Namespace(
+                    cmd='poll', source='us/next', interval=1,
+                    remote='ci', target='master'
+                )
+                with terminal.capture() as (_, stderr):
+                    ret = control.do_poll(args, None)
+
+        self.assertEqual(ret, 0)
+        self.assertIn('returned 1', stderr.getvalue())
+
+
+class TestFormatHistorySummary(unittest.TestCase):
+    """Tests for format_history_summary function."""
+
+    def test_format_history_summary(self):
+        """Test formatting history summary."""
+        commits = [
+            control.CommitInfo('aaa111', 'aaa111a', 'First commit', 'Author 1'),
+            control.CommitInfo('bbb222', 'bbb222b', 'Second commit', 'Author 2'),
+        ]
+        result = control.format_history_summary('us/next', commits, 'cherry-abc')
+
+        self.assertIn('us/next', result)
+        self.assertIn('Branch: cherry-abc', result)
+        self.assertIn('- aaa111a First commit', result)
+        self.assertIn('- bbb222b Second commit', result)
+
+
+class TestGetNextCommitsEmptyLine(unittest.TestCase):
+    """Tests for get_next_commits with empty lines."""
+
+    def setUp(self):
+        """Set up test fixtures."""
+        fd, self.db_path = tempfile.mkstemp(suffix='.db')
+        os.close(fd)
+        os.unlink(self.db_path)
+        self.old_db_fname = control.DB_FNAME
+        control.DB_FNAME = self.db_path
+        database.Database.instances.clear()
+
+    def tearDown(self):
+        """Clean up test fixtures."""
+        control.DB_FNAME = self.old_db_fname
+        if os.path.exists(self.db_path):
+            os.unlink(self.db_path)
+        database.Database.instances.clear()
+        command.TEST_RESULT = None
+
+    def test_get_next_commits_with_empty_lines(self):
+        """Test get_next_commits handles empty lines in output."""
+        with terminal.capture():
+            dbs = database.Database(self.db_path)
+            dbs.start()
+            dbs.source_set('us/next', 'abc123')
+            dbs.commit()
+
+            # Log output with empty lines
+            log_output = (
+                'aaa111|aaa111a|Author 1|First commit|abc123\n'
+                '\n'  # Empty line
+                'bbb222|bbb222b|Author 2|Second commit|aaa111\n'
+                '\n'  # Another empty line
+            )
+            command.TEST_RESULT = command.CommandResult(stdout=log_output)
+
+            commits, merge_found, error = control.get_next_commits(dbs,
+                                                                   'us/next')
+            self.assertIsNone(error)
+            self.assertFalse(merge_found)
+            self.assertEqual(len(commits), 2)
+            dbs.close()
+
+
+class TestDoCommitSourceResolveError(unittest.TestCase):
+    """Tests for do_commit_source error handling."""
+
+    def setUp(self):
+        """Set up test fixtures."""
+        fd, self.db_path = tempfile.mkstemp(suffix='.db')
+        os.close(fd)
+        os.unlink(self.db_path)
+        self.old_db_fname = control.DB_FNAME
+        control.DB_FNAME = self.db_path
+        database.Database.instances.clear()
+
+    def tearDown(self):
+        """Clean up test fixtures."""
+        control.DB_FNAME = self.old_db_fname
+        if os.path.exists(self.db_path):
+            os.unlink(self.db_path)
+        database.Database.instances.clear()
+        command.TEST_RESULT = None
+
+    def test_commit_source_resolve_error(self):
+        """Test commit-source fails when commit can't be resolved."""
+        with terminal.capture():
+            dbs = database.Database(self.db_path)
+            dbs.start()
+            dbs.source_set('us/next', 'oldcommit12345')
+            dbs.commit()
+
+        database.Database.instances.clear()
+
+        def mock_git_fail(**_kwargs):
+            raise command.CommandExc('git error', command.CommandResult())
+
+        command.TEST_RESULT = mock_git_fail
+
+        args = argparse.Namespace(cmd='commit-source', source='us/next',
+                                  commit='badcommit')
+        with terminal.capture() as (_, stderr):
+            ret = control.do_commit_source(args, None)
+        self.assertEqual(ret, 1)
+        self.assertIn('Could not resolve', stderr.getvalue())
+
+
+class TestDoPickmanUnknownCommand(unittest.TestCase):
+    """Tests for do_pickman with unknown command."""
+
+    def setUp(self):
+        """Set up test fixtures."""
+        fd, self.db_path = tempfile.mkstemp(suffix='.db')
+        os.close(fd)
+        os.unlink(self.db_path)
+        self.old_db_fname = control.DB_FNAME
+        control.DB_FNAME = self.db_path
+        database.Database.instances.clear()
+
+    def tearDown(self):
+        """Clean up test fixtures."""
+        control.DB_FNAME = self.old_db_fname
+        if os.path.exists(self.db_path):
+            os.unlink(self.db_path)
+        database.Database.instances.clear()
+
+    def test_unknown_command(self):
+        """Test do_pickman returns 1 for unknown command."""
+        args = argparse.Namespace(cmd='unknown-command')
+        with terminal.capture():
+            ret = control.do_pickman(args)
+        self.assertEqual(ret, 1)
+
+
+class TestDoReviewWithMrs(unittest.TestCase):
+    """Tests for do_review with open MRs."""
+
+    def test_review_with_mrs_no_comments(self):
+        """Test review with open MRs but no comments."""
+        tout.init(tout.INFO)
+
+        mock_mr = {
+            'iid': 123,
+            'title': '[pickman] Test MR',
+            'web_url': 'https://gitlab.com/mr/123',
+        }
+        with mock.patch.object(gitlab_api, 'get_open_pickman_mrs',
+                               return_value=[mock_mr]):
+            with mock.patch.object(gitlab_api, 'get_mr_comments',
+                                   return_value=[]):
+                args = argparse.Namespace(cmd='review', remote='ci')
+                with terminal.capture() as (stdout, _):
+                    ret = control.do_review(args, None)
+
+        self.assertEqual(ret, 0)
+        self.assertIn('Found 1 open pickman MR', stdout.getvalue())
+
 
 if __name__ == '__main__':
     unittest.main()