new file mode 100644
@@ -0,0 +1,319 @@
+# SPDX-License-Identifier: GPL-2.0
+#
+# Copyright 2025 Canonical Ltd
+#
+"""LSP-based line-level analysis for source code.
+
+This module provides functionality to analyse which lines in source files
+are active vs inactive based on preprocessor conditionals, using clangd's
+inactive regions feature via the Language Server Protocol (LSP).
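+
+Example usage (a sketch only; it assumes clangd is installed and that the
+tree has already been built so that .cmd files exist under build_dir):
+
+    analyser = LspAnalyser(build_dir, srcdir, used_sources)
+    results = analyser.process(jobs=4)
+    for path, res in results.items():
+        print(path, res.active_lines, res.inactive_lines)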
+"""
+
+import concurrent.futures
+import json
+import multiprocessing
+import os
+import re
+import tempfile
+import time
+
+from u_boot_pylib import tools, tout
+from analyser import Analyser, FileResult
+from lsp_client import LspClient
+
+
+def create_compile_commands(build_dir, srcdir):
+ """Create compile_commands.json using gen_compile_commands.py.
+
+ Args:
+ build_dir (str): Build directory path
+ srcdir (str): Source directory path
+
+ Returns:
+ list: List of compile command entries
+ """
+ # Use the same pattern as gen_compile_commands.py
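+    # The first line of a .cmd file typically looks like this (abridged
+    # example, assuming a Kbuild-style build tree):
+    #   savedcmd_lib/string.o := gcc -Wp,... -c -o lib/string.o lib/string.c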
+ line_pattern = re.compile(
+ r'^(saved)?cmd_[^ ]*\.o := (?P<command_prefix>.* )'
+ r'(?P<file_path>[^ ]*\.[cS]) *(;|$)')
+
+ compile_commands = []
+
+ # Walk through build directory looking for .cmd files
+ filename_matcher = re.compile(r'^\..*\.cmd$')
+ exclude_dirs = ['.git', 'Documentation', 'include', 'tools']
+
+ for dirpath, dirnames, filenames in os.walk(build_dir, topdown=True):
+        # Prune unwanted directories in place so os.walk() skips them
+        dirnames[:] = [d for d in dirnames if d not in exclude_dirs]
+
+ for filename in filenames:
+ if not filename_matcher.match(filename):
+ continue
+
+ cmd_file = os.path.join(dirpath, filename)
+ try:
+ with open(cmd_file, 'rt', encoding='utf-8') as f:
+ result = line_pattern.match(f.readline())
+ if result:
+ command_prefix = result.group('command_prefix')
+ file_path = result.group('file_path')
+
+ # Clean up command prefix (handle escaped #)
+ prefix = command_prefix.replace(r'\#', '#').replace(
+ '$(pound)', '#')
+
+ # Get absolute path to source file
+ abs_path = os.path.realpath(
+ os.path.join(srcdir, file_path))
+ if os.path.exists(abs_path):
+ compile_commands.append({
+ 'directory': srcdir,
+ 'file': abs_path,
+ 'command': prefix + file_path,
+ })
+            except OSError:
+ continue
+
+ return compile_commands
+
+
+def worker(args):
+ """Analyse a single source file using clangd LSP.
+
+ Args:
+ args (tuple): Tuple of (source_file, client)
+ where client is a shared LspClient instance
+
+ Returns:
+ tuple: (source_file, inactive_regions, error_msg)
+ """
+ source_file, client = args
+
+ try:
+ # Read file content
+ content = tools.read_file(source_file, binary=False)
+
+ # Open the document
+ client.notify('textDocument/didOpen', {
+ 'textDocument': {
+ 'uri': f'file://{source_file}',
+ 'languageId': 'c',
+ 'version': 1,
+ 'text': content
+ }
+ })
+
+ # Wait for clangd to process and send notifications
+ # Poll for inactive regions notification for this specific file
+ max_wait = 10 # seconds
+ start_time = time.time()
+ inactive_regions = None
+
+ while time.time() - start_time < max_wait:
+ time.sleep(0.1)
+
+            with client.lock:
+                # Take the inactiveRegions notification for this file; keep
+                # those for other files (their worker threads still need
+                # them) and drop everything else to avoid buildup
+                remaining = []
+                for notif in client.notifications:
+                    method = notif.get('method', '')
+                    if method != 'textDocument/clangd.inactiveRegions':
+                        continue
+                    params = notif.get('params', {})
+                    # Check if this notification is for our file
+                    if (inactive_regions is None and
+                            params.get('uri') == f'file://{source_file}'):
+                        inactive_regions = params.get('inactiveRegions', [])
+                    else:
+                        remaining.append(notif)
+                client.notifications = remaining
+
+            if inactive_regions is not None:
+                break
+
+ # Close the document to free resources
+ client.notify('textDocument/didClose', {
+ 'textDocument': {
+ 'uri': f'file://{source_file}'
+ }
+ })
+
+ if inactive_regions is None:
+ # No inactive regions notification received
+ # This could mean the file has no inactive code
+ inactive_regions = []
+
+ return (source_file, inactive_regions, None)
+
+ except Exception as e:
+ return (source_file, None, str(e))
+
+
+class LspAnalyser(Analyser): # pylint: disable=too-few-public-methods
+ """Analyser that uses clangd LSP to determine active lines.
+
+ This analyser uses the Language Server Protocol (LSP) with clangd to
+ identify inactive preprocessor regions in source files.
+ """
+
+ def __init__(self, build_dir, srcdir, used_sources, keep_temps=False):
+ """Set up the LSP analyser.
+
+ Args:
+ build_dir (str): Build directory containing .o and .cmd files
+ srcdir (str): Path to source root directory
+ used_sources (set): Set of source files that are compiled
+ keep_temps (bool): If True, keep temporary files for debugging
+ """
+ super().__init__(srcdir, keep_temps)
+ self.build_dir = build_dir
+ self.used_sources = used_sources
+
+ def extract_inactive_regions(self, jobs=None):
+ """Extract inactive regions from source files using clangd.
+
+ Args:
+ jobs (int): Number of parallel jobs (None = use all CPUs)
+
+ Returns:
+ dict: Mapping of source file paths to lists of inactive regions
+ """
+ # Create compile commands database
+ tout.progress('Building compile commands database...')
+ compile_commands = create_compile_commands(self.build_dir, self.srcdir)
+
+ # Filter to only .c and .S files that we need to analyse
+ filtered_files = []
+ for cmd in compile_commands:
+ source_file = cmd['file']
+            if (source_file in self.used_sources and
+                    source_file.endswith(('.c', '.S'))):
+                filtered_files.append(source_file)
+
+ tout.progress(f'Found {len(filtered_files)} source files to analyse')
+
+ if not filtered_files:
+ return {}
+
+ inactive = {}
+ errors = []
+
+ # Create a single clangd instance and use it for all files
+ with tempfile.TemporaryDirectory() as tmpdir:
+ # Write compile commands database
+ compile_db = os.path.join(tmpdir, 'compile_commands.json')
+ with open(compile_db, 'w', encoding='utf-8') as f:
+ json.dump(compile_commands, f)
+
+ # Start a single clangd server
+ tout.progress('Starting clangd server...')
+ with LspClient(['clangd', '--log=error',
+ f'--compile-commands-dir={tmpdir}']) as client:
+ result = client.init(f'file://{self.srcdir}')
+ if not result:
+ tout.error('Failed to start clangd')
+ return {}
+
+ # Determine number of workers
+ if jobs is None:
+ jobs = min(multiprocessing.cpu_count(), len(filtered_files))
+ elif jobs <= 0:
+ jobs = 1
+
+ tout.progress(f'Processing files with {jobs} workers...')
+
+ # Use ThreadPoolExecutor to process files in parallel
+ # (threads share the same clangd client)
+ with concurrent.futures.ThreadPoolExecutor(
+ max_workers=jobs) as executor:
+ # Submit all tasks
+ future_to_file = {
+ executor.submit(worker, (source_file, client)):
+ source_file
+ for source_file in filtered_files
+ }
+
+ # Collect results as they complete
+ completed = 0
+ for future in concurrent.futures.as_completed(future_to_file):
+ source_file = future_to_file[future]
+ completed += 1
+ tout.progress(
+ f'Processing {completed}/{len(filtered_files)}: ' +
+ f'{os.path.basename(source_file)}...')
+
+ try:
+ source_file_result, inactive_regions, error_msg = (
+ future.result())
+
+ if error_msg:
+ errors.append(f'{source_file}: {error_msg}')
+ elif inactive_regions is not None:
+ inactive[source_file_result] = (
+ inactive_regions)
+ except Exception as exc:
+ errors.append(f'{source_file}: {exc}')
+
+ # Report any errors
+ if errors:
+ for error in errors[:10]: # Show first 10 errors
+ tout.error(error)
+ if len(errors) > 10:
+ tout.error(f'... and {len(errors) - 10} more errors')
+ tout.warning(f'Failed to analyse {len(errors)} file(s) with LSP')
+
+ return inactive
+
+ def process(self, jobs=None):
+ """Perform line-level analysis using clangd LSP.
+
+ Args:
+ jobs (int): Number of parallel jobs (None = use all CPUs)
+
+ Returns:
+ dict: Mapping of source file paths to FileResult named tuples
+ """
+ tout.progress('Extracting inactive regions using clangd LSP...')
+ inactive_regions_map = self.extract_inactive_regions(jobs)
+
+ file_results = {}
+ for source_file in self.used_sources:
+ # Only process .c and .S files
+            if not source_file.endswith(('.c', '.S')):
+ continue
+
+ abs_path = os.path.realpath(source_file)
+ inactive_regions = inactive_regions_map.get(abs_path, [])
+
+ # Count total lines in the file
+ total_lines = self.count_lines(abs_path)
+
+ # Create line status dict
+ line_status = {}
+ # Set up all lines as active
+ for i in range(1, total_lines + 1):
+ line_status[i] = 'active'
+
+ # Mark inactive lines based on regions
+ # LSP uses 0-indexed line numbers
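+            # Each region is assumed to be an LSP Range-like dict, e.g.:
+            #   {'start': {'line': 10, 'character': 0},
+            #    'end': {'line': 14, 'character': 0}}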
+ for region in inactive_regions:
+ start_line = region['start']['line'] + 1
+ end_line = region['end']['line'] + 1
+ # Mark lines as inactive (inclusive range)
+ for line_num in range(start_line, end_line + 1):
+ if line_num <= total_lines:
+ line_status[line_num] = 'inactive'
+
+ inactive_lines = len([s for s in line_status.values()
+ if s == 'inactive'])
+ active_lines = total_lines - inactive_lines
+
+ file_results[abs_path] = FileResult(
+ total_lines=total_lines,
+ active_lines=active_lines,
+ inactive_lines=inactive_lines,
+ line_status=line_status
+ )
+
+ tout.info(f'Analysed {len(file_results)} files using clangd LSP')
+ return file_results
new file mode 100644
@@ -0,0 +1,225 @@
+# SPDX-License-Identifier: GPL-2.0
+#
+# Copyright 2025 Canonical Ltd
+#
+"""Minimal LSP (Language Server Protocol) client for clangd.
+
+This module provides a simple JSON-RPC 2.0 client for communicating with
+LSP servers like clangd. It focuses on the specific functionality needed
+for analysing inactive preprocessor regions.
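+
+Messages use the standard LSP framing: a Content-Length header, a blank
+line, then the JSON-RPC body, for example (sketch):
+
+    Content-Length: <body length in bytes>\r\n
+    \r\n
+    {"jsonrpc": "2.0", "id": 1, "method": "shutdown"}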
+"""
+
+import json
+import subprocess
+import threading
+import time
+from typing import Any, Dict, Optional
+
+
+class LspClient:
+ """Minimal LSP client for JSON-RPC 2.0 communication.
+
+ This client handles the basic LSP protocol communication over
+ stdin/stdout with a language server process.
+
+ Attributes:
+ process: The language server subprocess
+ next_id: Counter for JSON-RPC request IDs
+ responses: Dict mapping request IDs to response data
+        notifications: List of notifications pushed by the server
+        lock: Thread lock protecting responses and notifications
+        write_lock: Thread lock serialising writes to the server
+        reader_thread: Background thread reading server responses
+        running: True while the reader thread should keep running
+        stderr_output: Remaining stderr output captured at shutdown
+ """
+
+ def __init__(self, server_command):
+ """Init the LSP client and start the server.
+
+ Args:
+ server_command (list): Command to start the LSP server
+ (e.g., ['clangd', '--log=error'])
+ """
+ self.process = subprocess.Popen(
+ server_command,
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ text=True,
+ bufsize=0
+ )
+ self.next_id = 1
+ self.responses = {}
+        self.notifications = []
+        self.stderr_output = None
+        self.lock = threading.Lock()
+        self.write_lock = threading.Lock()
+ self.running = True
+
+ # Start background thread to read responses
+ self.reader_thread = threading.Thread(target=self._read_responses)
+ self.reader_thread.daemon = True
+ self.reader_thread.start()
+
+ def _read_responses(self):
+ """Background thread to read responses from the server"""
+ while self.running and self.process.poll() is None:
+ try:
+ # Read headers
+ headers = {}
+ while True:
+ line = self.process.stdout.readline()
+ if not line or line == '\r\n' or line == '\n':
+ break
+ if ':' in line:
+ key, value = line.split(':', 1)
+ headers[key.strip()] = value.strip()
+
+ if 'Content-Length' not in headers:
+ continue
+
+ # Read content
+ content_length = int(headers['Content-Length'])
+ content = self.process.stdout.read(content_length)
+
+ if not content:
+ break
+
+ # Parse JSON
+ message = json.loads(content)
+
+ # Store response or notification
+ with self.lock:
+ if 'id' in message:
+ # Response to a request
+ self.responses[message['id']] = message
+ else:
+ # Notification from server
+ self.notifications.append(message)
+
+ except (json.JSONDecodeError, ValueError):
+ continue
+ except Exception:
+ break
+
+ def _send_message(self, message: Dict[str, Any]):
+ """Send a JSON-RPC message to the server.
+
+ Args:
+ message: JSON-RPC message dictionary
+ """
+        content = json.dumps(message)
+        # json.dumps() escapes non-ASCII by default, so the character count
+        # equals the byte count required by the Content-Length header
+        headers = f'Content-Length: {len(content)}\r\n\r\n'
+        # Serialise writes: several worker threads may share this client
+        with self.write_lock:
+            self.process.stdin.write(headers + content)
+            self.process.stdin.flush()
+
+ def request(self, method: str, params: Optional[Dict] = None,
+ timeout: int = 30) -> Optional[Dict]:
+ """Send a JSON-RPC request and wait for response.
+
+ Args:
+ method: LSP method name (e.g., 'initialize')
+ params: Method parameters dictionary
+ timeout: Timeout in seconds (default: 30)
+
+ Returns:
+ Response dictionary, or None on timeout/error
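+
+        Example (a sketch; 'uri' is assumed to be a file:// URI string):
+            symbols = client.request('textDocument/documentSymbol',
+                                     {'textDocument': {'uri': uri}})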
+ """
+        with self.lock:
+            request_id = self.next_id
+            self.next_id += 1
+
+ message = {
+ 'jsonrpc': '2.0',
+ 'id': request_id,
+ 'method': method,
+ }
+ if params:
+ message['params'] = params
+
+ self._send_message(message)
+
+ # Wait for response
+ start_time = time.time()
+ while time.time() - start_time < timeout:
+ with self.lock:
+ if request_id in self.responses:
+ response = self.responses.pop(request_id)
+ if 'result' in response:
+ return response['result']
+ if 'error' in response:
+ raise RuntimeError(
+ f"LSP error: {response['error']}")
+ return response
+ time.sleep(0.01)
+
+ return None
+
+ def notify(self, method: str, params: Optional[Dict] = None):
+ """Send a JSON-RPC notification (no response expected).
+
+ Args:
+ method: LSP method name
+ params: Method parameters dictionary
+ """
+ message = {
+ 'jsonrpc': '2.0',
+ 'method': method,
+ }
+ if params:
+ message['params'] = params
+
+ self._send_message(message)
+
+ def init(self, root_uri: str, capabilities: Optional[Dict] = None) -> Dict:
+ """Send initialize request to the server.
+
+ Args:
+ root_uri: Workspace root URI (e.g., 'file:///path/to/workspace')
+ capabilities: Client capabilities dict
+
+ Returns:
+ Server capabilities from initialize response
+ """
+ if capabilities is None:
+ capabilities = {
+ 'textDocument': {
+ 'semanticTokens': {
+ 'requests': {
+ 'full': True
+ }
+ },
+ 'publishDiagnostics': {},
+ 'inactiveRegions': {
+ 'refreshSupport': False
+ }
+ }
+ }
+
+ result = self.request('initialize', {
+ 'processId': None,
+ 'rootUri': root_uri,
+ 'capabilities': capabilities
+ })
+
+ # Send initialized notification
+ self.notify('initialized', {})
+
+ return result
+
+    def shutdown(self):
+        """Shut down the language server and release its pipes"""
+        if not self.running:
+            return
+        try:
+            self.request('shutdown', timeout=5)
+            self.notify('exit')
+        except (OSError, ValueError):
+            # The server may already have gone away; still reap the process
+            pass
+        self.running = False
+        if self.process:
+            try:
+                self.process.wait(timeout=5)
+            except subprocess.TimeoutExpired:
+                self.process.kill()
+                self.process.wait()
+            # Capture any remaining stderr output so callers can inspect it
+            # after shutdown, then close the pipes to avoid ResourceWarnings
+            if self.process.stderr:
+                self.stderr_output = self.process.stderr.read()
+                self.process.stderr.close()
+            if self.process.stdin:
+                self.process.stdin.close()
+            if self.process.stdout:
+                self.process.stdout.close()
+
+ def __enter__(self):
+ """Context manager entry"""
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ """Context manager exit - ensure cleanup"""
+ self.shutdown()
new file mode 100755
@@ -0,0 +1,153 @@
+#!/usr/bin/env python3
+# SPDX-License-Identifier: GPL-2.0+
+#
+# Copyright 2025 Canonical Ltd
+#
+"""Test script for LSP client with clangd"""
+
+import json
+import os
+import sys
+import tempfile
+import time
+
+# Add this script's directory to the path so lsp_client can be imported
+sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
+
+from lsp_client import LspClient # pylint: disable=wrong-import-position
+
+
+def test_clangd():
+ """Test basic clangd functionality"""
+ # Create a temporary directory with a simple C file
+ with tempfile.TemporaryDirectory() as tmpdir:
+ # Create a C file with CONFIG-style inactive code
+ test_file = os.path.join(tmpdir, 'test.c')
+ with open(test_file, 'w', encoding='utf-8') as f:
+ f.write('''#include <stdio.h>
+
+// Simulate U-Boot style CONFIG options
+#define CONFIG_FEATURE_A 1
+
+void always_compiled(void)
+{
+ printf("Always here\\n");
+}
+
+#ifdef CONFIG_FEATURE_A
+void feature_a_code(void)
+{
+ printf("Feature A enabled\\n");
+}
+#endif
+
+#ifdef CONFIG_FEATURE_B
+void feature_b_code(void)
+{
+ printf("Feature B enabled (THIS SHOULD BE INACTIVE)\\n");
+}
+#endif
+
+#if 0
+void disabled_debug_code(void)
+{
+ printf("Debug code (INACTIVE)\\n");
+}
+#endif
+''')
+
+ # Create compile_commands.json
+ compile_commands = [
+ {
+ 'directory': tmpdir,
+ 'command': f'gcc -c {test_file}',
+ 'file': test_file
+ }
+ ]
+ compile_db = os.path.join(tmpdir, 'compile_commands.json')
+ with open(compile_db, 'w', encoding='utf-8') as f:
+ json.dump(compile_commands, f)
+
+ # Create .clangd config to enable inactive regions
+ clangd_config = os.path.join(tmpdir, '.clangd')
+ with open(clangd_config, 'w', encoding='utf-8') as f:
+ f.write('''InactiveRegions:
+ Opacity: 0.55
+''')
+
+ print(f'Created test file: {test_file}')
+ print(f'Created compile DB: {compile_db}')
+ print(f'Created clangd config: {clangd_config}')
+
+ # Start clangd
+        print('\nStarting clangd...')
+ with LspClient(['clangd', '--log=error',
+ f'--compile-commands-dir={tmpdir}']) as client:
+ print('Initialising...')
+ result = client.init(f'file://{tmpdir}')
+ print(f'Server capabilities: {result.get("capabilities", {}).keys()}')
+
+ # Open the document
+            print(f'\nOpening document: {test_file}')
+            with open(test_file, 'r', encoding='utf-8') as f:
+                content = f.read()
+
+            client.notify('textDocument/didOpen', {
+                'textDocument': {
+                    'uri': f'file://{test_file}',
+                    'languageId': 'c',
+                    'version': 1,
+                    'text': content
+                }
+            })
+
+            # Wait for clangd to index the file
+            print('\nWaiting for clangd to index file...')
+            time.sleep(3)
+
+            # Check for inactive regions notification
+            print('\nChecking for inactive regions notification...')
+            with client.lock:
+                notifications = list(client.notifications)
+
+            print(f'Received {len(notifications)} notifications:')
+            inactive_regions = None
+            for notif in notifications:
+                method = notif.get('method', 'unknown')
+                print(f'  - {method}')
+
+                # Look for the clangd inactive regions extension
+                if method == 'textDocument/clangd.inactiveRegions':
+                    params = notif.get('params', {})
+                    inactive_regions = params.get('inactiveRegions', [])
+                    print(f'      Found {len(inactive_regions)} '
+                          'inactive regions!')
+
+            if inactive_regions:
+                print('\nInactive regions:')
+                for region in inactive_regions:
+                    start = region['start']
+                    end = region['end']
+                    start_line = start['line'] + 1  # LSP is 0-indexed
+                    end_line = end['line'] + 1
+                    print(f'  Lines {start_line}-{end_line}')
+            else:
+                print('\nNo inactive regions received '
+                      '(feature may not be enabled)')
+
+            # Also show the file with line numbers for reference
+            print('\nFile contents:')
+            for i, line in enumerate(content.split('\n'), 1):
+                print(f'{i:3}: {line}')
+
+    print('\nTest completed!')
+
+    # Check clangd stderr for any errors; shutdown() captured the remaining
+    # output before closing the pipe
+    print('\n=== Clangd stderr output ===')
+    stderr_output = client.stderr_output
+    if stderr_output:
+        print(stderr_output[:1000])
+    else:
+        print('(no stderr output)')
+
+
+if __name__ == '__main__':
+ test_clangd()