@@ -8,7 +8,7 @@
import re
import os
import shutil
-from subprocess import call, check_call, check_output, CalledProcessError
+from subprocess import call, check_call, check_output, CalledProcessError, run
from subprocess import DEVNULL
import tempfile
@@ -38,11 +38,20 @@ class FsHelper:
fsh.mk_fs()
...
+ To create an encrypted LUKS1 partition:
+
+ with FsHelper(ubman.config, 'ext4', 10, 'mmc1',
+ encrypt_passphrase='test') as fsh:
+ # create files in the fsh.srcdir directory
+ fsh.mk_fs() # Creates and encrypts the filesystem
+ ...
+
Properties:
fs_img (str): Filename for the filesystem image; this is set to a
default value but can be overwritten
"""
- def __init__(self, config, fs_type, size_mb, prefix, part_mb=None):
+ def __init__(self, config, fs_type, size_mb, prefix, part_mb=None,
+ encrypt_passphrase=None):
"""Set up a new object
Args:
@@ -54,6 +63,8 @@ class FsHelper:
part_mb (int, optional): Size of partition in MB. If None, defaults
to size_mb. This can be used to make the partition larger than
the filesystem, to create space for disk-encryption metadata
+ encrypt_passphrase (str, optional): If provided, encrypt the
+ filesystem with LUKS1 using this passphrase
"""
if ('fat' not in fs_type and 'ext' not in fs_type and
fs_type not in ['exfat', 'fs_generic']):
@@ -65,6 +76,7 @@ class FsHelper:
self.partition_mb = part_mb if part_mb is not None else size_mb
self.prefix = prefix
self.quiet = True
+ self.encrypt_passphrase = encrypt_passphrase
# Use a default filename; the caller can adjust it
leaf = f'{prefix}.{fs_type}.img'
@@ -136,6 +148,10 @@ class FsHelper:
check_call(f'mcopy -i {fs_img} {flags} {self.srcdir}/* ::/',
shell=True)
+ # Encrypt the filesystem if requested
+ if self.encrypt_passphrase:
+ self.encrypt_luks(self.encrypt_passphrase)
+
def setup(self):
"""Set up the srcdir ready to receive files"""
if not self.srcdir:
@@ -149,6 +165,95 @@ class FsHelper:
self.tmpdir = tempfile.TemporaryDirectory('fs_helper')
self.srcdir = self.tmpdir.name
+ def encrypt_luks(self, passphrase):
+ """Encrypt the filesystem image with LUKS1
+
+ This replaces the filesystem image with a LUKS1-encrypted version.
+ LUKS1 is used because U-Boot's unlock implementation currently only
+ supports LUKS version 1.
+
+ Args:
+ passphrase (str): Passphrase for the LUKS container
+
+ Returns:
+ str: Path to the encrypted image
+
+ Raises:
+ CalledProcessError: If cryptsetup is not available or fails
+ """
+ # LUKS1 encryption parameters
+ cipher = 'aes-cbc-essiv:sha256'
+ key_size = 256
+ hash_alg = 'sha256'
+
+ # Save the original filesystem image
+ orig_fs_img = f'{self.fs_img}.orig'
+ os.rename(self.fs_img, orig_fs_img)
+
+ # Create a new image file for the LUKS container
+ luks_img = self.fs_img
+ luks_size_mb = self.partition_mb
+ check_call(f'dd if=/dev/zero of={luks_img} bs=1M count={luks_size_mb}',
+ shell=True, stdout=DEVNULL if self.quiet else None)
+
+ # Ensure device-mapper kernel module is loaded
+ if not os.path.exists('/sys/class/misc/device-mapper'):
+ # Try to load the dm_mod kernel module
+ result = run(['sudo', 'modprobe', 'dm_mod'],
+ stdout=DEVNULL, stderr=DEVNULL, check=False)
+ if result.returncode != 0:
+ raise RuntimeError(
+ 'Device-mapper is not available. Please ensure the dm_mod '
+ 'kernel module is loaded and you have permission to use '
+ 'device-mapper. This is required for LUKS encryption tests.')
+
+ device_name = f'luks_test_{os.getpid()}'
+
+ # Clean up any stale device with the same name
+ run(['sudo', 'cryptsetup', 'close', device_name],
+ stdout=DEVNULL, stderr=DEVNULL, check=False)
+
+ try:
+ # Format as LUKS1
+ run(['cryptsetup', 'luksFormat',
+ '--type', 'luks1',
+ '--cipher', cipher,
+ '--key-size', str(key_size),
+ '--hash', hash_alg,
+ '--iter-time', '10', # Very fast for testing (low security)
+ luks_img],
+ input=f'{passphrase}\n'.encode(),
+ stdout=DEVNULL if self.quiet else None,
+ stderr=DEVNULL if self.quiet else None,
+ check=True)
+
+ # Open the LUKS device (requires sudo)
+ # Use --key-file=- to read passphrase from stdin
+ result = run(['sudo', 'cryptsetup', 'open', '--key-file=-',
+ luks_img, device_name], input=passphrase.encode(),
+ stdout=DEVNULL if self.quiet else None, stderr=None,
+ check=True)
+ # Copy the filesystem data into the LUKS container
+ check_call(f'sudo dd if={orig_fs_img} of=/dev/mapper/{device_name} bs=1M',
+ shell=True, stdout=DEVNULL if self.quiet else None)
+
+ # Remove the original filesystem image
+ os.remove(orig_fs_img)
+
+ except Exception:
+ # Clean up on error
+ if os.path.exists(luks_img):
+ os.remove(luks_img)
+ if os.path.exists(orig_fs_img):
+ os.rename(orig_fs_img, self.fs_img)
+ raise
+ finally:
+ # Always close the device if it's still open
+ run(['sudo', 'cryptsetup', 'close', device_name],
+ stdout=DEVNULL, stderr=DEVNULL, check=False)
+
+ return self.fs_img
+
def cleanup(self):
"""Remove created image"""
if self.tmpdir: