[Concept,2/2] pickman: Only treat missing table as fresh database

Message ID 20260316185102.3892597-3-sjg@u-boot.org
State New
Headers
Series pickman: Fix robustness issues with error handling and large prompts |

Commit Message

Simon Glass March 16, 2026, 6:51 p.m. UTC
  From: Simon Glass <simon.glass@canonical.com>

get_schema_version() catches OperationalError and returns 0 to indicate
a fresh database. However, it catches all OperationalError variants,
including "attempt to write a readonly database" and "database is
locked". When this happens, migrate_to() misinterprets an existing
database as empty and re-runs all migrations from scratch, destroying
the data.

Narrow the catch to only match "no such table", which is the genuine
fresh-database case. All other OperationalError variants are re-raised
so that callers see the real problem instead of silently losing data.

Signed-off-by: Simon Glass <simon.glass@canonical.com>
---

 tools/pickman/database.py | 14 ++++++++++++--
 tools/pickman/ftest.py    | 34 ++++++++++++++++++++++++++++++++++
 2 files changed, 46 insertions(+), 2 deletions(-)
  

Patch

diff --git a/tools/pickman/database.py b/tools/pickman/database.py
index 317668a979d..28b28896cb2 100644
--- a/tools/pickman/database.py
+++ b/tools/pickman/database.py
@@ -190,14 +190,24 @@  class Database:  # pylint: disable=too-many-public-methods
     def get_schema_version(self):
         """Get the version of the database's schema
 
+        Only returns 0 when the schema_version table genuinely does not
+        exist (i.e. a fresh database).  Other errors (read-only, locked,
+        corrupt) are re-raised so that migrate_to() does not accidentally
+        destroy an existing database by re-running all migrations.
+
         Return:
             int: Database version, 0 means there is no data
+
+        Raises:
+            sqlite3.OperationalError: For errors other than a missing table
         """
         try:
             self.cur.execute('SELECT version FROM schema_version')
             return self.cur.fetchone()[0]
-        except sqlite3.OperationalError:
-            return 0
+        except sqlite3.OperationalError as exc:
+            if 'no such table' in str(exc):
+                return 0
+            raise
 
     def execute(self, query, parameters=()):
         """Execute a database query
diff --git a/tools/pickman/ftest.py b/tools/pickman/ftest.py
index 52c807cf8cc..261ca4cd2d5 100644
--- a/tools/pickman/ftest.py
+++ b/tools/pickman/ftest.py
@@ -10,6 +10,7 @@  import asyncio
 import argparse
 import os
 import shutil
+import sqlite3
 import subprocess
 import sys
 import tempfile
@@ -398,6 +399,39 @@  class TestDatabase(unittest.TestCase):
             self.assertEqual(sources[1], ('branch-b', 'def456'))
             dbs.close()
 
+    def test_schema_version_readonly_raises(self):
+        """Test that a read-only database raises instead of returning 0.
+
+        Previously get_schema_version() caught all OperationalError and
+        returned 0, which caused migrate_to() to re-create all tables on
+        top of an existing (read-only) database, wiping the data.
+        """
+        with terminal.capture():
+            dbs = database.Database(self.db_path)
+            dbs.start()
+            dbs.source_set('us/next', 'abc123')
+            dbs.commit()
+
+            # Simulate a read-only error by replacing the cursor
+            real_cur = dbs.cur
+            mock_cur = mock.MagicMock()
+            mock_cur.execute.side_effect = sqlite3.OperationalError(
+                'attempt to write a readonly database')
+            dbs.cur = mock_cur
+            with self.assertRaises(sqlite3.OperationalError):
+                dbs.get_schema_version()
+            dbs.cur = real_cur
+            dbs.close()
+
+    def test_schema_version_missing_table_returns_zero(self):
+        """Test that a missing schema_version table returns 0."""
+        with terminal.capture():
+            dbs = database.Database(self.db_path)
+            dbs.open_it()
+            # Fresh database has no tables, should return 0
+            self.assertEqual(dbs.get_schema_version(), 0)
+            dbs.close()
+
 
 class TestDatabaseCommit(unittest.TestCase):
     """Tests for Database commit functions."""