Source code for penguin.gen_image

"""
penguin.gen_image
=================

Filesystem image generation utilities for the Penguin emulation environment.

This module provides functions for generating disk images, adding required files,
and handling image creation via CLI or programmatically.
"""

import logging
import os
import subprocess
import sys
import tempfile
from pathlib import Path
from subprocess import check_output
from random import randint
from penguin.defaults import default_preinit_script
from penguin.utils import get_arch_dir, get_driver_kmod_path
import tarfile
import time
import io

import click

from penguin import getColoredLogger
from penguin.penguin_config import load_config

"""
gen_image can be run as a separate script if this is loaded at the module
level. This makes it easier to profile.
"""

"""
This class wrapped what used to be a libguestfs interface

At this point it allows us to pretend that the temporary directory we have
is another file system
"""
logger = getColoredLogger("penguin.gen_image")


[docs] def get_mount_type(path: str) -> str | None: """ Get the filesystem type of the mount at the given path. :param path: Path to check. :type path: str :return: Filesystem type string or None. :rtype: str or None """ try: stat_output = subprocess.check_output(["stat", "-f", "-c", "%T", path]) return stat_output.decode("utf-8").strip().lower() except subprocess.CalledProcessError: return None
[docs] def tar_add_min_files(tf_path: str, config: dict) -> None: """ Add minimal required files to a tar archive for the image. :param tf_path: Path to the tar file. :type tf_path: str :param config: Configuration dictionary. :type config: dict """ arch_dir = get_arch_dir(config) with tarfile.open(tf_path, "a") as tf: # Add igloo/ directory igloo_dir = tarfile.TarInfo(name="igloo/") igloo_dir.type = tarfile.DIRTYPE igloo_dir.mode = 0o755 igloo_dir.mtime = int(time.time()) igloo_dir.uname = "root" igloo_dir.gname = "root" tf.addfile(igloo_dir) # Add igloo/boot/ directory igloo_boot_dir = tarfile.TarInfo(name="igloo/boot/") igloo_boot_dir.type = tarfile.DIRTYPE igloo_boot_dir.mode = 0o755 igloo_boot_dir.mtime = int(time.time()) igloo_boot_dir.uname = "root" igloo_boot_dir.gname = "root" tf.addfile(igloo_boot_dir) # /igloo/boot/preinit init_bytes = default_preinit_script.encode() ti = tarfile.TarInfo(name="igloo/boot/preinit") ti.size = len(init_bytes) ti.mode = 0o755 ti.mtime = int(time.time()) ti.uname = "root" ti.gname = "root" tf.addfile(ti, fileobj=io.BytesIO(init_bytes)) # /igloo/boot/busybox busybox_path = os.path.join(arch_dir, "busybox") tf.add(busybox_path, arcname="igloo/boot/busybox", filter=lambda ti: (setattr(ti, 'mode', 0o755) or ti)) # /igloo/boot/hyp_file_op shr = os.path.join(arch_dir, "hyp_file_op") tf.add(shr, arcname="igloo/boot/hyp_file_op", filter=lambda ti: (setattr(ti, 'mode', 0o755) or ti)) # /igloo/boot/send_portalcall spc = os.path.join(arch_dir, "send_portalcall") tf.add(spc, arcname="igloo/boot/send_portalcall", filter=lambda ti: (setattr(ti, 'mode', 0o755) or ti)) # /igloo/boot/sh (symlink) symlink_info = tarfile.TarInfo(name="igloo/boot/sh") symlink_info.type = tarfile.SYMTYPE symlink_info.linkname = "/igloo/boot/busybox" symlink_info.mode = 0o777 symlink_info.mtime = int(time.time()) symlink_info.uname = "root" symlink_info.gname = "root" tf.addfile(symlink_info) # /igloo/boot/igloo.ko driver = get_driver_kmod_path(config) tf.add(driver, arcname="igloo/boot/igloo.ko", filter=lambda ti: (setattr(ti, 'mode', 0o755) or ti))
[docs] def make_image(fs: str, out: str, artifacts: str | None, config: dict) -> None: """ Generate a new disk image from the given filesystem and configuration. :param fs: Path to input filesystem tarball. :type fs: str :param out: Path to output qcow image. :type out: str :param artifacts: Path to artifacts directory. :type artifacts: str or None :param config: Configuration dictionary. :type config: dict """ logger.debug("Generating new image from config...") IN_TARBALL = Path(fs) ARTIFACTS = Path(artifacts or "/tmp") QCOW = Path(out) ARTIFACTS.mkdir(exist_ok=True) # Unique suffix to avoid conflicts suffix = randint(0, 1000000) delete_tar = True MODIFIED_TARBALL = Path(ARTIFACTS, f"fs_out_{suffix}.tar") with tempfile.TemporaryDirectory() as TMP_DIR: uncompressed_tar = Path(TMP_DIR, f"uncompressed_{suffix}.tar") check_output(f"pigz -dc '{str(IN_TARBALL)}' > '{uncompressed_tar}'", shell=True) # Add files directly to the tar tar_add_min_files(uncompressed_tar, config) check_output(f"pigz -c '{uncompressed_tar}' > '{MODIFIED_TARBALL}'", shell=True) TARBALL = MODIFIED_TARBALL # 1GB of padding. XXX is this a good amount - does it slow things down if it's too much? # Our disk images are sparse, so this doesn't actually take up any space? PADDING_MB = 4096 # Was 1024 BLOCK_SIZE = 4096 # Calculate image and filesystem size UNPACKED_SIZE = int(check_output(f'zcat "{TARBALL}" | wc -c', shell=True)) UNPACKED_SIZE = UNPACKED_SIZE + 1024 * 1024 * PADDING_MB REQUIRED_BLOCKS = int((UNPACKED_SIZE + BLOCK_SIZE - 1) / BLOCK_SIZE + 4096) # Was +1024 FILESYSTEM_SIZE = int(REQUIRED_BLOCKS * BLOCK_SIZE) # Calculate the number of inodes - err on the side of too big since we'll add more to the FS later INODE_SIZE = 8192 # For every 8KB of disk space, we'll allocate an inode NUMBER_OF_INODES = int(FILESYSTEM_SIZE / INODE_SIZE) NUMBER_OF_INODES = ( NUMBER_OF_INODES + 1000 ) # Padding for more files getting added later def _make_img(work_dir, qcow, delete_tar): IMAGE = Path(work_dir, "image.raw") # Create raw image file check_output(["truncate", "-s", str(FILESYSTEM_SIZE), IMAGE]) # Format as ext4 and populate directly from the tarball check_output([ "mke2fs", "-t", "ext4", "-N", str(NUMBER_OF_INODES), "-d", str(uncompressed_tar), str(IMAGE) ]) # Convert to qcow2 check_output(["qemu-img", "convert", "-f", "raw", "-O", "qcow2", str(IMAGE), str(qcow)]) if delete_tar: check_output(["rm", str(TARBALL)]) # if our QCOW path is a lustrefs we need to operate within the workdir and copy the qcow out if get_mount_type(QCOW.parent) == "lustre": # Need to convert to qcow within the workdir _make_img(TMP_DIR, Path(TMP_DIR, "image.qcow"), delete_tar) check_output(["mv", Path(TMP_DIR, "image.qcow"), str(QCOW)]) else: _make_img(TMP_DIR, QCOW, delete_tar)
[docs] def fakeroot_gen_image(fs: str, out: str, artifacts: str, proj_dir: str, config: str) -> str: """ Run image generation under fakeroot. :param fs: Path to input filesystem tarball. :type fs: str :param out: Path to output qcow image. :type out: str :param artifacts: Path to artifacts directory. :type artifacts: str :param proj_dir: Path to project directory. :type proj_dir: str :param config: Path to config file. :type config: str :return: Path to generated image. :rtype: str :raises Exception: If image generation fails. """ o = Path(out) cmd = [ "fakeroot", "gen_image", "--fs", str(fs), "--out", str(o), "--artifacts", str(artifacts), "--proj", str(proj_dir), "--config", str(config), ] if logger.level == logging.DEBUG: cmd.extend(["--verbose"]) p = subprocess.Popen(cmd, stdout=sys.stdout, stderr=sys.stderr) p.wait() if p.returncode != 0: raise Exception(f"Image generation failed with code {p.returncode}") if o.exists(): return str(o) raise Exception("No image generated")
@click.command() @click.option("--fs", required=True, help="Path to a filesystem as a tar gz") @click.option("--out", required=True, help="Path to a qcow to be created") @click.option("--artifacts", default=None, help="Path to a directory for artifacts") @click.option("--proj", required=True, help="Path to a project directory") @click.option("--config", default=None, help="Path to config file") @click.option("-v", "--verbose", count=True) def makeImage(fs: str, out: str, artifacts: str | None, proj: str, config: str, verbose: int) -> None: """ CLI entrypoint for image generation. :param fs: Path to input filesystem tarball. :type fs: str :param out: Path to output qcow image. :type out: str :param artifacts: Path to artifacts directory. :type artifacts: str or None :param proj: Path to project directory. :type proj: str :param config: Path to config file. :type config: str :param verbose: Verbosity level. :type verbose: int """ if verbose: logger.setLevel(logging.DEBUG) if not os.path.isfile(config): logger.error(f"Config file {config} not found") sys.exit(1) try: c = load_config(proj, config) make_image(fs, out, artifacts, c) except Exception as e: logger.error("Failed to generate image") # Show exception logger.error(e, exc_info=True, stack_info=True) sys.exit(1) if __name__ == "__main__": makeImage()