From: Simon Glass <simon.glass@canonical.com>
When the database is upgraded from before v5, existing series will have
a NULL upstream. Detect this after migration and print a warning listing
the affected series, so the user knows to set an upstream for them.
Add series_get_null_upstream() to query for series with NULL upstream.
Have start() return the old schema version so callers can check whether
a migration from a particular version occurred.
Signed-off-by: Simon Glass <simon.glass@canonical.com>
---
tools/patman/cser_helper.py | 13 ++++++++-
tools/patman/database.py | 20 +++++++++++++-
tools/patman/test_cseries.py | 52 ++++++++++++++++++++++++++++++++++++
3 files changed, 83 insertions(+), 2 deletions(-)
@@ -110,11 +110,22 @@ class CseriesHelper:
# For the first instance, start it up with the expected schema
self.db, is_new = Database.get_instance(fname)
if is_new:
- self.db.start()
+ old_version = self.db.start()
+ if old_version is not None and old_version < 5:
+ self._check_null_upstreams()
else:
# If a previous test has already checked the schema, just open it
self.db.open_it()
+ def _check_null_upstreams(self):
+ """Warn about series that have no upstream set"""
+ names = self.db.series_get_null_upstream()
+ if names:
+ tout.warning(
+ f'{len(names)} series without an upstream:')
+ for name in names:
+ tout.warning(f' {name}')
+
def close_database(self):
"""Close the database"""
if self.db:
@@ -84,7 +84,12 @@ class Database: # pylint:disable=R0904
return Database(db_path), True
def start(self):
- """Open the database read for use, migrate to latest schema"""
+ """Open the database ready for use, migrate to latest schema
+
+ Return:
+ int or None: Schema version before migration, or None if no
+ migration was needed
+ """
self.open_it()
old_version = self.get_schema_version()
if old_version > LATEST:
@@ -93,6 +98,9 @@ class Database: # pylint:disable=R0904
f'Database version {old_version} is too new (max'
f' {LATEST}); please update patman')
self.migrate_to(LATEST)
+ if old_version == LATEST:
+ return None
+ return old_version
def open_it(self):
"""Open the database, creating it if necessary"""
@@ -496,6 +504,16 @@ class Database: # pylint:disable=R0904
'UPDATE series SET upstream = ? WHERE id = ?',
(ups, series_idnum))
+ def series_get_null_upstream(self):
+ """Get a list of series names that have no upstream set
+
+ Return:
+ list of str: Series names with NULL upstream
+ """
+ res = self.execute(
+ 'SELECT name FROM series WHERE upstream IS NULL')
+ return [row[0] for row in res.fetchall()]
+
# ser_ver functions
def ser_ver_get_link(self, series_idnum, version):
@@ -3365,6 +3365,58 @@ Date: .*
db.start()
self.assertIn('is too new', err.getvalue())
+ def test_migrate_upstream_warning(self):
+ """Test that migrating to v5 warns about series without upstream"""
+ db = database.Database(f'{self.tmpdir}/.patman2.db')
+ with terminal.capture():
+ db.open_it()
+
+ # Create a v4 database with some series
+ with terminal.capture() as (out, _):
+ db.migrate_to(4)
+ self.assertEqual(
+ 'Update database to v1\nUpdate database to v2\n'
+ 'Update database to v3\nUpdate database to v4',
+ out.getvalue().strip())
+ db.execute(
+ "INSERT INTO series (name, desc, archived) "
+ "VALUES ('first', 'desc1', 0)")
+ db.execute(
+ "INSERT INTO series (name, desc, archived) "
+ "VALUES ('second', 'desc2', 0)")
+ db.execute(
+ "INSERT INTO series (name, desc, archived) "
+ "VALUES ('third', 'desc3', 0)")
+ db.commit()
+ db.close()
+
+ # Now open via CseriesHelper which triggers migration and check
+ self.make_git_tree()
+ cser = cseries.Cseries(self.tmpdir, terminal.COLOR_NEVER)
+ cser.topdir = self.tmpdir
+
+ # Point at our v4 database
+ database.Database.instances = {}
+ cser.db, _ = database.Database.get_instance(
+ f'{self.tmpdir}/.patman2.db')
+ with terminal.capture() as (_, err):
+ old_version = cser.db.start()
+ self.assertEqual(4, old_version)
+
+ # Set upstream on one series so only two are reported
+ idnum = cser.db.series_find_by_name('second')
+ cser.db.series_set_upstream(idnum, 'us')
+ cser.db.commit()
+
+ with terminal.capture() as (_, err):
+ cser._check_null_upstreams()
+ lines = err.getvalue().strip().splitlines()
+ self.assertEqual('2 series without an upstream:', lines[0])
+ self.assertEqual(' first', lines[1])
+ self.assertEqual(' third', lines[2])
+
+ cser.db.close()
+
def test_series_scan(self):
"""Test scanning a series for updates"""
cser, _ = self.setup_second()