@@ -246,6 +246,7 @@ class CseriesHelper:
Return: tuple:
str: Series name
str: Series description
+ str or None: Upstream name
Raises:
ValueError: Series is not found
@@ -400,9 +401,9 @@ class CseriesHelper:
idnum = self.db.series_find_by_name(name, include_archived)
if not idnum:
return None
- name, desc = self.db.series_get_info(idnum)
+ name, desc, ups = self.db.series_get_info(idnum)
- return Series.from_fields(idnum, name, desc)
+ return Series.from_fields(idnum, name, desc, ups)
def _get_branch_name(self, name, version):
"""Get the branch name for a particular version
@@ -1441,7 +1442,7 @@ class CseriesHelper:
int: Number of version which need a 'scan'
"""
max_vers = self._series_max_version(ser.idnum)
- name, desc = self._get_series_info(ser.idnum)
+ name, desc, ups = self._get_series_info(ser.idnum)
coloured = self.col.build(self.col.BLACK, desc, bright=False,
back=self.col.YELLOW)
versions = self._get_version_list(ser.idnum)
@@ -1487,7 +1488,7 @@ class CseriesHelper:
all series
"""
max_vers = self._series_max_version(ser.idnum)
- name, desc = self._get_series_info(ser.idnum)
+ name, desc, ups = self._get_series_info(ser.idnum)
stats, pwc = self._series_get_version_stats(ser.idnum, max_vers)
states = {x.state for x in pwc.values()}
state = 'accepted'
@@ -295,10 +295,11 @@ class Database: # pylint:disable=R0904
list of Series
"""
res = self.execute(
- 'SELECT id, name, desc FROM series ' +
+ 'SELECT id, name, desc, upstream FROM series ' +
('WHERE archived = 0' if not include_archived else ''))
- return [Series.from_fields(idnum=idnum, name=name, desc=desc)
- for idnum, name, desc in res.fetchall()]
+ return [Series.from_fields(idnum=idnum, name=name, desc=desc,
+ ups=ups)
+ for idnum, name, desc, ups in res.fetchall()]
# series functions
@@ -349,12 +350,14 @@ class Database: # pylint:disable=R0904
Return: tuple:
str: Series name
str: Series description
+ str or None: Upstream name
Raises:
ValueError: Series is not found
"""
- res = self.execute('SELECT name, desc FROM series WHERE id = ?',
- (idnum,))
+ res = self.execute(
+ 'SELECT name, desc, upstream FROM series WHERE id = ?',
+ (idnum,))
recs = res.fetchall()
if len(recs) != 1:
raise ValueError(f'No series found (id {idnum} len {len(recs)})')
@@ -417,7 +420,7 @@ class Database: # pylint:disable=R0904
'GROUP BY series_id')
return res.fetchall()
- def series_add(self, name, desc):
+ def series_add(self, name, desc, ups=None):
"""Add a new series record
The new record is set to not archived
@@ -425,13 +428,14 @@ class Database: # pylint:disable=R0904
Args:
name (str): Series name
desc (str): Series description
+ ups (str or None): Name of the upstream for this series
Return:
int: ID num of the new series record
"""
self.execute(
- 'INSERT INTO series (name, desc, archived) '
- f"VALUES ('{name}', '{desc}', 0)")
+ 'INSERT INTO series (name, desc, archived, upstream) '
+ 'VALUES (?, ?, 0, ?)', (name, desc, ups))
return self.lastrowid()
def series_remove(self, idnum):
@@ -481,6 +485,17 @@ class Database: # pylint:disable=R0904
self.execute(
'UPDATE series SET name = ? WHERE id = ?', (name, series_idnum))
+ def series_set_upstream(self, series_idnum, ups):
+ """Update upstream for a series
+
+ Args:
+ series_idnum (int): ID num of the series
+ ups (str or None): Name of the upstream, or None to clear
+ """
+ self.execute(
+ 'UPDATE series SET upstream = ? WHERE id = ?',
+ (ups, series_idnum))
+
# ser_ver functions
def ser_ver_get_link(self, series_idnum, version):
@@ -72,11 +72,12 @@ class Series(dict):
return self[name]
@staticmethod
- def from_fields(idnum, name, desc):
+ def from_fields(idnum, name, desc, ups):
ser = Series()
ser.idnum = idnum
ser.name = name
ser.desc = desc
+ ser.upstream = ups
return ser
def AddTag(self, commit, line, name, value):
@@ -1879,6 +1879,36 @@ Tested-by: Mary Smith <msmith@wibble.com> # yak
self.run_args('upstream', 'list')
self.assertFalse(out.getvalue().strip())
+ def test_series_upstream(self):
+ """Test upstream field in the series table"""
+ cser = self.get_cser()
+
+ # Add a series without upstream
+ cser.db.series_add('first', 'my desc')
+ cser.db.commit()
+ slist = cser.db.series_get_dict()
+ self.assertIsNone(slist['first'].upstream)
+
+ # Add a series with upstream
+ cser.db.series_add('second', 'desc2', ups='us')
+ cser.db.commit()
+ slist = cser.db.series_get_dict()
+ self.assertIsNone(slist['first'].upstream)
+ self.assertEqual('us', slist['second'].upstream)
+
+ # Update upstream on existing series
+ idnum = cser.db.series_find_by_name('first')
+ cser.db.series_set_upstream(idnum, 'ci')
+ cser.db.commit()
+ slist = cser.db.series_get_dict()
+ self.assertEqual('ci', slist['first'].upstream)
+
+ # Clear upstream
+ cser.db.series_set_upstream(idnum, None)
+ cser.db.commit()
+ slist = cser.db.series_get_dict()
+ self.assertIsNone(slist['first'].upstream)
+
def test_series_add_mark(self):
"""Test marking a cseries with Change-Id fields"""
cser = self.get_cser()