Source code for pyplugins.interventions.mount

"""
Mount Tracker Plugin
====================

This module provides a passive tracker for mount attempts within a guest environment.
It is intended for use with the Penguin analysis framework and is implemented as a plugin.

Purpose
-------

- Tracks when the guest tries to mount filesystems.
- Records unsupported filesystem types (e.g., when mount returns EINVAL) to inform kernel support decisions.
- Logs attempts to mount missing devices.
- Can optionally fake mount successes for specific targets or all mounts, aiding in analysis and mitigation.

Usage
-----

The plugin can be configured with the following arguments:
- `outdir`: Output directory for logs.
- `fake_mounts`: List of mount targets to fake as successful.
- `all_succeed`: If set, all mount attempts are faked as successful.
- `verbose`: Enables debug logging.

Example
-------

All mount attempts are logged to `mounts.csv` in the specified output directory.

"""

from os.path import join as pjoin
from penguin import plugins, Plugin

mount_log = "mounts.csv"


[docs] class MountTracker(Plugin): """ MountTracker Plugin =================== Tracks and logs mount attempts in the guest. Attributes ---------- outdir : str Output directory for logs. mounts : set of tuple Set of (source, target, fs_type) tuples already logged. fake_mounts : list of str List of mount targets to fake as successful. all_succeed : bool If True, all mount attempts are faked as successful. Behavior -------- - Subscribes to exec events to detect `/bin/mount` invocations. - Hooks the mount syscall return to log and optionally fake mount results. """ def __init__(self): """ Initialize the MountTracker plugin. Reads configuration arguments, subscribes to exec events and mount syscall returns, sets up logging and internal state. Returns ------- None """ self.outdir = self.get_arg("outdir") # Use the Execs plugin interface for exec events plugins.subscribe(plugins.Execs, "exec_event", self.find_mount) self.mounts = set() self.fake_mounts = self.get_arg("fake_mounts") or [] self.all_succeed = self.get_arg("all_succeed") or False if self.get_arg_bool("verbose"): self.logger.setLevel("DEBUG") plugins.syscalls.syscall("on_sys_mount_return")(self.post_mount)
[docs] def post_mount(self, regs, proto, syscall, source, target, fs_type, flags, data): """ Coroutine callback for the mount syscall return. Reads the mount arguments from memory, logs the attempt, and optionally fakes the result. Parameters ---------- regs : object Register state proto : object Protocol context (opaque, framework-specific) syscall : object Syscall context, with `.retval` for return value source : int Pointer to source device string target : int Pointer to mount target string fs_type : int Pointer to filesystem type string flags : int Mount flags data : int Pointer to mount data Returns ------- None """ source_str = yield from plugins.mem.read_str(source) target_str = yield from plugins.mem.read_str(target) fs_type_str = yield from plugins.mem.read_str(fs_type) results = { "source": source_str, "target": target_str, "fs_type": fs_type_str, } retval = syscall.retval self.log_mount(retval, results) if retval == -16: # EBUSY # Already mounted - we could perhaps use this info to drop the mount from our init script? # Just pretend it was a success syscall.retval = 0 elif retval < 0: if results["target"] in self.fake_mounts: self.logger.debug(f"Fake mount: {results['target']}") syscall.retval = 0 if self.all_succeed: # Always pretend it was a success? syscall.retval = 0
[docs] def find_mount(self, event: dict) -> None: """ Detects `/bin/mount` invocations from exec events and logs them. Parameters ---------- event : dict Exec event dictionary, expected to have 'procname' and 'argv' keys. Returns ------- None """ fname = event.get('procname', None) argv = event.get('argv', []) if fname == "/bin/mount": argc = len(argv) if argc >= 5 and argv[0] == "mount" and argv[1] == "-t": results = { "source": argv[3], "target": argv[4], "fs_type": argv[2], } self.log_mount(-1, results)
[docs] def log_mount(self, retval: int, results: dict) -> None: """ Logs a mount attempt to the output CSV file if not already logged. Parameters ---------- retval : int Return value of the mount syscall or -1 for exec events. results : dict Dictionary with keys 'source', 'target', 'fs_type'. Returns ------- None """ src = results["source"] tgt = results["target"] fs = results["fs_type"] if (src, tgt, fs) not in self.mounts: self.mounts.add((src, tgt, fs)) with open(pjoin(self.outdir, mount_log), "a") as f: f.write(f"{src},{tgt},{fs},{retval}\n") self.logger.debug( f"Mount returns {retval} for: mount -t {fs} {src} {tgt}")