[Concept,7/9] codman: Begin an experimental lsp analyser

Message ID 20251124134932.1991031-8-sjg@u-boot.org
State New
Headers
Series codman: Add a new source-code analysis tool |

Commit Message

Simon Glass Nov. 24, 2025, 1:49 p.m. UTC
  From: Simon Glass <simon.glass@canonical.com>

It is possible to use an LSP to determine which code is used, at least
to some degree.

Make a start on this, in the hope that future work may prove out the
concept.

So far I have not found this to be particularly useful, since it does
not seem to handle IS_ENABLED() and similar macros when working out
inactive regions.

Co-developed-by: Claude <noreply@anthropic.com>
Signed-off-by: Simon Glass <simon.glass@canonical.com>
---

 tools/codman/lsp.py        | 319 +++++++++++++++++++++++++++++++++++++
 tools/codman/lsp_client.py | 225 ++++++++++++++++++++++++++
 tools/codman/test_lsp.py   | 153 ++++++++++++++++++
 3 files changed, 697 insertions(+)
 create mode 100644 tools/codman/lsp.py
 create mode 100644 tools/codman/lsp_client.py
 create mode 100755 tools/codman/test_lsp.py
  

Patch

diff --git a/tools/codman/lsp.py b/tools/codman/lsp.py
new file mode 100644
index 00000000000..143fe22a7e1
--- /dev/null
+++ b/tools/codman/lsp.py
@@ -0,0 +1,319 @@ 
+# SPDX-License-Identifier: GPL-2.0
+#
+# Copyright 2025 Canonical Ltd
+#
+"""LSP-based line-level analysis for source code.
+
+This module provides functionality to analyse which lines in source files
+are active vs inactive based on preprocessor conditionals, using clangd's
+inactive regions feature via the Language Server Protocol (LSP).
+"""
+
+import concurrent.futures
+import json
+import multiprocessing
+import os
+import re
+import tempfile
+import time
+
+from u_boot_pylib import tools, tout
+from analyser import Analyser, FileResult
+from lsp_client import LspClient
+
+
+def create_compile_commands(build_dir, srcdir):
+    """Collect compile-command entries from the .cmd files of a build.
+
+    Args:
+        build_dir (str): Build directory to search for .*.cmd files
+        srcdir (str): Source directory which the file paths are relative to
+
+    Returns:
+        list of dict: Entries with 'directory', 'file' and 'command' keys
+    """
+    # Use the same pattern as gen_compile_commands.py
+    line_pattern = re.compile(
+        r'^(saved)?cmd_[^ ]*\.o := (?P<command_prefix>.* )'
+        r'(?P<file_path>[^ ]*\.[cS]) *(;|$)')
+
+    compile_commands = []
+
+    # Walk through build directory looking for .cmd files
+    filename_matcher = re.compile(r'^\..*\.cmd$')
+    exclude_dirs = ['.git', 'Documentation', 'include', 'tools']
+
+    for dirpath, dirnames, filenames in os.walk(build_dir, topdown=True):
+        # Prune unwanted directories; os.walk() needs an in-place update
+        dirnames[:] = [d for d in dirnames if d not in exclude_dirs]
+
+        for filename in filenames:
+            if not filename_matcher.match(filename):
+                continue
+
+            cmd_file = os.path.join(dirpath, filename)
+            try:
+                with open(cmd_file, 'rt', encoding='utf-8') as f:
+                    result = line_pattern.match(f.readline())
+                    if result:
+                        command_prefix = result.group('command_prefix')
+                        file_path = result.group('file_path')
+
+                        # Clean up command prefix (handle escaped #)
+                        prefix = command_prefix.replace(r'\#', '#').replace(
+                            '$(pound)', '#')
+
+                        # Get absolute path to source file
+                        abs_path = os.path.realpath(
+                            os.path.join(srcdir, file_path))
+                        if os.path.exists(abs_path):
+                            compile_commands.append({
+                                'directory': srcdir,
+                                'file': abs_path,
+                                'command': prefix + file_path,
+                            })
+            except (OSError, UnicodeDecodeError):
+                continue
+
+    return compile_commands
+
+
+def worker(args):
+    """Analyse a single source file using clangd LSP.
+
+    Args:
+        args (tuple): (source_file, client); client is a shared LspClient
+
+    Returns:
+        tuple: (source_file, inactive_regions, error_msg) where error_msg
+            is None on success and inactive_regions is None on failure
+    """
+    source_file, client = args
+    uri = f'file://{source_file}'
+
+    try:
+        # Read file content
+        content = tools.read_file(source_file, binary=False)
+
+        # Open the document
+        client.notify('textDocument/didOpen', {
+            'textDocument': {
+                'uri': uri,
+                'languageId': 'c',
+                'version': 1,
+                'text': content
+            }
+        })
+
+        # Wait for clangd to process and send notifications
+        # Poll for inactive regions notification for this specific file
+        max_wait = 10  # seconds
+        start_time = time.monotonic()
+        inactive_regions = None
+
+        while time.monotonic() - start_time < max_wait:
+            time.sleep(0.1)
+
+            # The client is shared with other workers, so keep any
+            # inactive-regions notifications for their files. Everything
+            # else (e.g. diagnostics) is dropped to avoid buildup
+            with client.lock:
+                pending = []
+                for notif in client.notifications:
+                    method = notif.get('method', '')
+                    if method != 'textDocument/clangd.inactiveRegions':
+                        continue
+                    params = notif.get('params', {})
+                    if params.get('uri', '') != uri:
+                        pending.append(notif)
+                    elif inactive_regions is None:
+                        inactive_regions = params.get('inactiveRegions', [])
+                client.notifications = pending
+
+            if inactive_regions is not None:
+                break
+
+        # Close the document to free resources
+        client.notify('textDocument/didClose', {
+            'textDocument': {'uri': uri}
+        })
+
+        if inactive_regions is None:
+            # No inactive regions notification received
+            # This could mean the file has no inactive code
+            inactive_regions = []
+
+        return (source_file, inactive_regions, None)
+
+    except Exception as e:
+        return (source_file, None, str(e))
+
+
+class LspAnalyser(Analyser):  # pylint: disable=too-few-public-methods
+    """Analyser that uses clangd LSP to determine active lines.
+
+    This analyser uses the Language Server Protocol (LSP) with clangd to
+    identify inactive preprocessor regions in source files.
+    """
+
+    def __init__(self, build_dir, srcdir, used_sources, keep_temps=False):
+        """Set up the LSP analyser.
+
+        Args:
+            build_dir (str): Build directory containing .o and .cmd files
+            srcdir (str): Path to source root directory
+            used_sources (set): Set of source files that are compiled
+            keep_temps (bool): If True, keep temporary files for debugging
+        """
+        super().__init__(srcdir, keep_temps)
+        self.build_dir = build_dir
+        self.used_sources = used_sources
+
+    def extract_inactive_regions(self, jobs=None):
+        """Extract inactive regions from source files using clangd.
+
+        Args:
+            jobs (int): Number of parallel jobs (None = use all CPUs)
+
+        Returns:
+            dict: Mapping of source file paths to lists of inactive regions
+        """
+        # Create compile commands database
+        tout.progress('Building compile commands database...')
+        compile_commands = create_compile_commands(self.build_dir, self.srcdir)
+
+        # Filter to only .c and .S files that we need to analyse. Use a set
+        # since more than one .cmd file may refer to the same source file,
+        # but each file must only be opened in clangd once
+        wanted = {cmd['file'] for cmd in compile_commands
+                  if cmd['file'] in self.used_sources and
+                  cmd['file'].endswith(('.c', '.S'))}
+        filtered_files = sorted(wanted)
+
+        tout.progress(f'Found {len(filtered_files)} source files to analyse')
+
+        if not filtered_files:
+            return {}
+
+        inactive = {}
+        errors = []
+
+        # Create a single clangd instance and use it for all files
+        with tempfile.TemporaryDirectory() as tmpdir:
+            # Write compile commands database
+            compile_db = os.path.join(tmpdir, 'compile_commands.json')
+            with open(compile_db, 'w', encoding='utf-8') as f:
+                json.dump(compile_commands, f)
+
+            # Start a single clangd server
+            tout.progress('Starting clangd server...')
+            with LspClient(['clangd', '--log=error',
+                           f'--compile-commands-dir={tmpdir}']) as client:
+                result = client.init(f'file://{self.srcdir}')
+                if not result:
+                    tout.error('Failed to start clangd')
+                    return {}
+
+                # Determine number of workers
+                if jobs is None:
+                    jobs = min(multiprocessing.cpu_count(), len(filtered_files))
+                elif jobs <= 0:
+                    jobs = 1
+
+                tout.progress(f'Processing files with {jobs} workers...')
+
+                # Use ThreadPoolExecutor to process files in parallel
+                # (threads share the same clangd client)
+                with concurrent.futures.ThreadPoolExecutor(
+                        max_workers=jobs) as executor:
+                    # Submit all tasks
+                    future_to_file = {
+                        executor.submit(worker, (source_file, client)):
+                        source_file
+                        for source_file in filtered_files
+                    }
+
+                    # Collect results as they complete
+                    completed = 0
+                    for future in concurrent.futures.as_completed(future_to_file):
+                        source_file = future_to_file[future]
+                        completed += 1
+                        tout.progress(
+                            f'Processing {completed}/{len(filtered_files)}: ' +
+                            f'{os.path.basename(source_file)}...')
+
+                        try:
+                            source_file_result, inactive_regions, error_msg = (
+                                future.result())
+
+                            if error_msg:
+                                errors.append(f'{source_file}: {error_msg}')
+                            elif inactive_regions is not None:
+                                inactive[source_file_result] = (
+                                    inactive_regions)
+                        except Exception as exc:
+                            errors.append(f'{source_file}: {exc}')
+
+        # Report any errors
+        if errors:
+            for error in errors[:10]:  # Show first 10 errors
+                tout.error(error)
+            if len(errors) > 10:
+                tout.error(f'... and {len(errors) - 10} more errors')
+            tout.warning(f'Failed to analyse {len(errors)} file(s) with LSP')
+
+        return inactive
+
+    def process(self, jobs=None):
+        """Perform line-level analysis using clangd LSP.
+
+        Args:
+            jobs (int): Number of parallel jobs (None = use all CPUs)
+
+        Returns:
+            dict: Mapping of source file paths to FileResult named tuples
+        """
+        tout.progress('Extracting inactive regions using clangd LSP...')
+        inactive_regions_map = self.extract_inactive_regions(jobs)
+
+        file_results = {}
+        for source_file in self.used_sources:
+            # Only process .c and .S files
+            if not (source_file.endswith('.c') or source_file.endswith('.S')):
+                continue
+
+            abs_path = os.path.realpath(source_file)
+            inactive_regions = inactive_regions_map.get(abs_path, [])
+
+            # Count total lines in the file
+            total_lines = self.count_lines(abs_path)
+
+            # Create line status dict
+            line_status = {}
+            # Set up all lines as active
+            for i in range(1, total_lines + 1):
+                line_status[i] = 'active'
+
+            # Mark inactive lines based on regions
+            # LSP uses 0-indexed line numbers
+            for region in inactive_regions:
+                start_line = region['start']['line'] + 1
+                end_line = region['end']['line'] + 1
+                # Mark lines as inactive (inclusive range)
+                for line_num in range(start_line, end_line + 1):
+                    if line_num <= total_lines:
+                        line_status[line_num] = 'inactive'
+
+            inactive_lines = len([s for s in line_status.values()
+                                 if s == 'inactive'])
+            active_lines = total_lines - inactive_lines
+
+            file_results[abs_path] = FileResult(
+                total_lines=total_lines,
+                active_lines=active_lines,
+                inactive_lines=inactive_lines,
+                line_status=line_status
+            )
+
+        tout.info(f'Analysed {len(file_results)} files using clangd LSP')
+        return file_results
diff --git a/tools/codman/lsp_client.py b/tools/codman/lsp_client.py
new file mode 100644
index 00000000000..954879a651e
--- /dev/null
+++ b/tools/codman/lsp_client.py
@@ -0,0 +1,225 @@ 
+# SPDX-License-Identifier: GPL-2.0
+#
+# Copyright 2025 Canonical Ltd
+#
+"""Minimal LSP (Language Server Protocol) client for clangd.
+
+This module provides a simple JSON-RPC 2.0 client for communicating with
+LSP servers like clangd. It focuses on the specific functionality needed
+for analyzing inactive preprocessor regions.
+"""
+
+import json
+import subprocess
+import threading
+from typing import Any, Dict, Optional
+
+
+class LspClient:
+    """Minimal LSP client for JSON-RPC 2.0 communication.
+
+    This client handles the basic LSP protocol communication over
+    stdin/stdout with a language server process.
+
+    Attributes:
+        process: The language server subprocess
+        next_id: Counter for JSON-RPC request IDs
+        responses: Dict mapping request IDs to response data
+        notifications: List of other messages received from the server
+        lock: Thread lock for responses and notifications
+        send_lock: Thread lock which serialises writes to the server
+        running: False once shutdown() has been called
+        reader_thread: Background thread reading server responses
+    """
+
+    def __init__(self, server_command):
+        """Init the LSP client and start the server.
+
+        Args:
+            server_command (list): Command to start the LSP server
+                (e.g., ['clangd', '--log=error'])
+        """
+        self.process = subprocess.Popen(
+            server_command,
+            stdin=subprocess.PIPE,
+            stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE,
+            text=True,
+            bufsize=0
+        )
+        self.next_id = 1
+        self.responses = {}
+        self.notifications = []
+        self.lock = threading.Lock()
+        self.send_lock = threading.Lock()
+        self.running = True
+
+        # Start background thread to read responses
+        self.reader_thread = threading.Thread(target=self._read_responses,
+                                              daemon=True)
+        self.reader_thread.start()
+
+    def _read_responses(self):
+        """Background thread to read responses from the server"""
+        while self.running and self.process.poll() is None:
+            try:
+                # Read headers, up to the blank line which ends them
+                headers = {}
+                while True:
+                    line = self.process.stdout.readline()
+                    if not line:
+                        return  # EOF: the server has closed its stdout
+                    if line in ('\r\n', '\n'):
+                        break
+                    if ':' in line:
+                        key, value = line.split(':', 1)
+                        headers[key.strip()] = value.strip()
+
+                if 'Content-Length' not in headers:
+                    continue
+
+                # NOTE(review): Content-Length is in bytes but this pipe is
+                # in text mode, so read() counts characters; the two differ
+                # if the payload contains non-ASCII text
+                content_length = int(headers['Content-Length'])
+                content = self.process.stdout.read(content_length)
+                if not content:
+                    break
+
+                message = json.loads(content)
+                # A response has an 'id' but no 'method' (which would make
+                # it a request from the server); keep the rest separately
+                with self.lock:
+                    if 'id' in message and 'method' not in message:
+                        self.responses[message['id']] = message
+                    else:
+                        self.notifications.append(message)
+
+            except (json.JSONDecodeError, ValueError):
+                continue
+            except Exception:
+                break
+
+    def _send_message(self, message: Dict[str, Any]):
+        """Send a JSON-RPC message to the server.
+
+        Args:
+            message: JSON-RPC message dictionary
+        """
+        # json.dumps() escapes non-ASCII, so len() is also the byte count
+        content = json.dumps(message)
+        headers = f'Content-Length: {len(content)}\r\n\r\n'
+        # Several threads may send at once: keep each message in one piece
+        with self.send_lock:
+            self.process.stdin.write(headers + content)
+            self.process.stdin.flush()
+
+    def request(self, method: str, params: Optional[Dict] = None,
+                timeout: int = 30) -> Optional[Dict]:
+        """Send a JSON-RPC request and wait for response.
+
+        Args:
+            method: LSP method name (e.g., 'initialize')
+            params: Method parameters dictionary
+            timeout: Timeout in seconds (default: 30)
+
+        Returns:
+            The 'result' of the response, or None on timeout
+
+        Raises:
+            RuntimeError: The server replied with an error
+        """
+        request_id = self.next_id
+        self.next_id += 1
+
+        message = {'jsonrpc': '2.0', 'id': request_id, 'method': method}
+        if params is not None:
+            message['params'] = params
+        self._send_message(message)
+
+        import time  # pylint: disable=import-outside-toplevel
+        deadline = time.monotonic() + timeout
+        while time.monotonic() < deadline:
+            with self.lock:
+                response = self.responses.pop(request_id, None)
+            if response is None:
+                time.sleep(0.01)
+                continue
+            if 'result' in response:
+                return response['result']
+            if 'error' in response:
+                raise RuntimeError(f"LSP error: {response['error']}")
+            return response
+
+        return None
+
+    def notify(self, method: str, params: Optional[Dict] = None):
+        """Send a JSON-RPC notification (no response expected).
+
+        Args:
+            method: LSP method name
+            params: Method parameters dictionary
+        """
+        message = {'jsonrpc': '2.0', 'method': method}
+        if params is not None:
+            message['params'] = params
+        self._send_message(message)
+
+    def init(self, root_uri: str, capabilities: Optional[Dict] = None) -> Dict:
+        """Send initialize request to the server.
+
+        Args:
+            root_uri: Workspace root URI (e.g., 'file:///path/to/workspace')
+            capabilities: Client capabilities dict
+
+        Returns:
+            Result of the initialize request (the server's capabilities are
+            under 'capabilities'), or None on timeout
+        """
+        if capabilities is None:
+            capabilities = {
+                'textDocument': {
+                    'semanticTokens': {'requests': {'full': True}},
+                    'publishDiagnostics': {},
+                    'inactiveRegions': {'refreshSupport': False},
+                }
+            }
+
+        result = self.request('initialize', {
+            'processId': None,
+            'rootUri': root_uri,
+            'capabilities': capabilities
+        })
+
+        # Send initialized notification
+        self.notify('initialized', {})
+
+        return result
+
+    def shutdown(self):
+        """Shut down the language server, always releasing its resources"""
+        try:
+            self.request('shutdown')
+            self.notify('exit')
+        except (OSError, ValueError, RuntimeError):
+            pass  # server is gone or misbehaving: just clean up below
+        self.running = False
+        try:
+            self.process.wait(timeout=5)
+        except subprocess.TimeoutExpired:
+            self.process.kill()
+            self.process.wait()
+        self.reader_thread.join(timeout=1)
+        # Close file descriptors to avoid ResourceWarnings
+        for pipe in (self.process.stdin, self.process.stdout,
+                     self.process.stderr):
+            if pipe:
+                pipe.close()
+
+    def __enter__(self):
+        """Context manager entry"""
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        """Context manager exit - ensure cleanup"""
+        self.shutdown()
diff --git a/tools/codman/test_lsp.py b/tools/codman/test_lsp.py
new file mode 100755
index 00000000000..1070ce655fb
--- /dev/null
+++ b/tools/codman/test_lsp.py
@@ -0,0 +1,153 @@ 
+#!/usr/bin/env python3
+# SPDX-License-Identifier: GPL-2.0+
+#
+# Copyright 2025 Canonical Ltd
+#
+"""Test script for LSP client with clangd"""
+
+import json
+import os
+import sys
+import tempfile
+import time
+
+# Add parent directory to path
+sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
+
+from lsp_client import LspClient  # pylint: disable=wrong-import-position
+
+
+def test_clangd():
+    """Test basic clangd functionality"""
+    # Create a temporary directory with a simple C file
+    with tempfile.TemporaryDirectory() as tmpdir:
+        # Create a C file with CONFIG-style inactive code
+        test_file = os.path.join(tmpdir, 'test.c')
+        with open(test_file, 'w', encoding='utf-8') as f:
+            f.write('''#include <stdio.h>
+
+// Simulate U-Boot style CONFIG options
+#define CONFIG_FEATURE_A 1
+
+void always_compiled(void)
+{
+    printf("Always here\\n");
+}
+
+#ifdef CONFIG_FEATURE_A
+void feature_a_code(void)
+{
+    printf("Feature A enabled\\n");
+}
+#endif
+
+#ifdef CONFIG_FEATURE_B
+void feature_b_code(void)
+{
+    printf("Feature B enabled (THIS SHOULD BE INACTIVE)\\n");
+}
+#endif
+
+#if 0
+void disabled_debug_code(void)
+{
+    printf("Debug code (INACTIVE)\\n");
+}
+#endif
+''')
+
+        # Create compile_commands.json
+        compile_commands = [
+            {
+                'directory': tmpdir,
+                'command': f'gcc -c {test_file}',
+                'file': test_file
+            }
+        ]
+        compile_db = os.path.join(tmpdir, 'compile_commands.json')
+        with open(compile_db, 'w', encoding='utf-8') as f:
+            json.dump(compile_commands, f)
+
+        # Create .clangd config to enable inactive regions
+        clangd_config = os.path.join(tmpdir, '.clangd')
+        with open(clangd_config, 'w', encoding='utf-8') as f:
+            f.write('''InactiveRegions:
+  Opacity: 0.55
+''')
+
+        print(f'Created test file: {test_file}')
+        print(f'Created compile DB: {compile_db}')
+        print(f'Created clangd config: {clangd_config}')
+
+        # Start clangd
+        print('\nStarting clangd...')
+        with LspClient(['clangd', '--log=error',
+                        f'--compile-commands-dir={tmpdir}']) as client:
+            print('Initialising...')
+            result = client.init(f'file://{tmpdir}')
+            print(f'Server capabilities: {result.get("capabilities", {}).keys()}')
+
+            # Open the document
+            print(f'\nOpening document: {test_file}')
+            with open(test_file, 'r', encoding='utf-8') as f:
+                content = f.read()
+
+            client.notify('textDocument/didOpen', {
+                'textDocument': {
+                    'uri': f'file://{test_file}',
+                    'languageId': 'c',
+                    'version': 1,
+                    'text': content
+                }
+            })
+
+            # Wait for clangd to index the file
+            print('\nWaiting for clangd to index file...')
+            time.sleep(3)
+
+            # Check for inactive regions notification
+            print('\nChecking for inactive regions notification...')
+            with client.lock:
+                notifications = list(client.notifications)
+
+            print(f'Received {len(notifications)} notifications:')
+            inactive_regions = None
+            for notif in notifications:
+                method = notif.get('method', 'unknown')
+                print(f'  - {method}')
+
+                # Look for the clangd inactive regions extension
+                if method == 'textDocument/clangd.inactiveRegions':
+                    params = notif.get('params', {})
+                    inactive_regions = params.get('inactiveRegions', [])
+                    print(f'    Found {len(inactive_regions)} inactive regions!')
+
+            if inactive_regions:
+                print('\nInactive regions:')
+                for region in inactive_regions:
+                    start_line = region['start']['line'] + 1  # LSP is 0-indexed
+                    end_line = region['end']['line'] + 1
+                    print(f'  Lines {start_line}-{end_line}')
+            else:
+                print('\nNo inactive regions received (feature may not be enabled)')
+
+            # Also show the file with line numbers for reference
+            print('\nFile contents:')
+            for i, line in enumerate(content.splitlines(), 1):
+                print(f'{i:3}: {line}')
+
+            print('\nTest completed!')
+
+            # Show clangd's stderr so far; read() would wait for clangd to exit
+            print('\n=== Clangd stderr output ===')
+            stderr_fd = client.process.stderr.fileno()
+            os.set_blocking(stderr_fd, False)
+            try:
+                output = os.read(stderr_fd, 1000).decode(errors='replace')
+            except BlockingIOError:
+                output = ''
+            print(output or '(no stderr output)')
+
+
+if __name__ == '__main__':
+    test_clangd()  # run the manual clangd check when executed as a script