"""
HyperFile Plugin
================
This module implements the HyperFile plugin for the Penguin framework, enabling
hypercall-based file operations between a guest and the host. It provides a model
for virtual files that can be read, written, or controlled via ioctl/getattr
operations from the guest OS. The plugin is designed to be flexible and extensible,
allowing users to specify custom file behaviors via models.
Features
--------
- Handles hypercalls for file operations (read, write, ioctl, getattr)
- Supports dynamic file models for custom device/file behaviors
- Logs and tracks file operation results for analysis
- Provides default behaviors for unhandled operations
Example Usage
-------------
.. code-block:: python
from pyplugins.interventions.hyperfile import HyperFile
# Instantiate the plugin; it reads its "models", "log_file" and "logger" arguments via get_arg()
plugin = HyperFile()
File Model Example
------------------
.. code-block:: python
files = {
"/dev/zero": {
fops.HYP_READ: HyperFile.read_zero,
fops.HYP_WRITE: HyperFile.write_discard,
"size": 0,
}
}
Classes
-------
- HyperFile: Main plugin class implementing the hypercall interface.
Functions
---------
- hyper(name: str) -> int: Map operation name to hyperfile operation constant.
- hyper2name(num: int) -> str: Map hyperfile operation constant to operation name.
"""
import struct
from typing import Any, Dict, Tuple
from penguin import Plugin, plugins
from hyper.consts import igloo_hypercall_constants as iconsts
from hyper.consts import hyperfs_ops as hops
from hyper.consts import hyperfs_file_ops as fops
HYP_RETRY = 0xdeadbeef
try:
from penguin import yaml
except ImportError:
import yaml
[docs]
def hyper(name: str) -> int:
    """
    **Translate an operation name into its hyperfile operation constant.**

    **Parameters**
    - `name` (`str`): One of "read", "write", "ioctl" or "getattr".

    **Returns**
    - `int`: The matching `fops.HYP_*` constant.

    **Raises**
    - `ValueError`: If `name` is not one of the four known operations.
    """
    # Ordered (name, constant) pairs; scanned with == so lookup semantics
    # match a plain comparison chain.
    known_ops = (
        ("read", fops.HYP_READ),
        ("write", fops.HYP_WRITE),
        ("ioctl", fops.HYP_IOCTL),
        ("getattr", fops.HYP_GETATTR),
    )
    for op_name, op_const in known_ops:
        if name == op_name:
            return op_const
    raise ValueError(f"Unknown hyperfile operation {name}")
[docs]
def hyper2name(num: int) -> str:
    """
    **Translate a hyperfile operation constant back into its name.**

    **Parameters**
    - `num` (`int`): One of the `fops.HYP_*` operation constants.

    **Returns**
    - `str`: "read", "write", "ioctl" or "getattr".

    **Raises**
    - `ValueError`: If `num` matches none of the four known constants.
    """
    # Ordered (constant, name) pairs; the first constant equal to `num` wins.
    known_ops = (
        (fops.HYP_READ, "read"),
        (fops.HYP_WRITE, "write"),
        (fops.HYP_IOCTL, "ioctl"),
        (fops.HYP_GETATTR, "getattr"),
    )
    for op_const, op_name in known_ops:
        if num == op_const:
            return op_name
    raise ValueError(f"Unknown hyperfile operation {num}")
[docs]
class HyperFile(Plugin):
    """
    **Virtual file interface driven by guest hypercalls.**

    The plugin registers a handler for `IGLOO_HYPERFS_MAGIC` hypercalls and
    dispatches file operations (read, write, ioctl, getattr) to per-path
    models, recording every result so it can be dumped when the plugin is
    unloaded.

    **Attributes**
    - `arch_bytes` (`int`): Guest word size in bytes (`panda.bits // 8`).
    - `log_file` (`Optional[str]`): Path the results are dumped to, if any.
    - `files` (`Optional[Dict[str, Dict]]`): File models keyed by path.
    - `logger` (`Any`): Logger instance taken from the plugin arguments.
    - `endian` (`str`): `struct` byte-order prefix ('<' or '>').
    - `s_word`, `u_word` (`str`): Signed/unsigned `struct` codes for a guest word.
    - `results` (`Dict`): Recorded results, `{path: {event: [record, ...]}}`.
    - `default_model` (`Dict`): Handlers used for operations a model omits
      (only set when models are configured).
    """

    def __init__(self) -> None:
        """
        **Read plugin arguments, prepare struct formats and register the
        hypercall handler.**

        If no `models` argument was supplied the plugin stays inactive: no
        log file is touched and no hypercall handler is registered.

        **Raises**
        - `TypeError`: If `models` is supplied but is not a dict.

        **Returns**
        - `None`
        """
        panda = self.panda
        self.arch_bytes = panda.bits // 8
        self.log_file = self.get_arg("log_file")
        self.files = self.get_arg("models")
        self.logger = self.get_arg("logger")

        # Struct format strings for endianness and word size
        self.endian = '<' if panda.endianness == 'little' else '>'
        self.s_word, self.u_word = 'iI' if panda.bits == 32 else 'qQ'

        # We track when processes access or IOCTL files we've added here.
        # Initialised before the early return below so uninit() never sees a
        # missing attribute.
        self.results = {}  # path: {event: [record, ...]}
        # event="read": {readval: X, bytes_requested: Y, data: "0"}
        # event="write": {retval: X, bytes_requested: Y, offset: Z, data: ...}
        # event="ioctl": {cmd: X, arg: Y, retval: Z}
        # event="getattr": {size: X, retval: Y}

        if self.files is None:
            # We can be imported without files, but we'll ignore it
            return

        if self.log_file:
            # Initialize a blank file so we can tail it
            with open(self.log_file, "w"):
                pass

        # Explicit check rather than assert: asserts vanish under `python -O`
        if not isinstance(self.files, dict):
            raise TypeError(f"Files should be dict, not {self.files}")

        self.default_model = {
            fops.HYP_READ: self.read_unhandled,
            fops.HYP_WRITE: self.write_unhandled,
            fops.HYP_IOCTL: self.ioctl,
            fops.HYP_GETATTR: self.getattr,
            "size": 0,
        }

        # files = {filename: {HYP_READ: func, HYP_WRITE: func, HYP_IOCTL: func}}
        # On hypercall we dispatch to the appropriate handler.
        @panda.hypercall(iconsts.IGLOO_HYPERFS_MAGIC)
        def before_hypercall(cpu):
            # We pass args in the arch-syscall ABI specified in pypanda's arch.py
            # arm: x8/r7 r0, r1, r2
            # mips: v0, a0, a1, a2
            hc_type = panda.arch.get_arg(cpu, 1, convention="syscall")
            if hc_type == hops.HYP_FILE_OP:
                self.handle_file_op(cpu)
            elif hc_type == hops.HYP_GET_NUM_HYPERFILES:
                self.handle_get_num_hyperfiles(cpu)
            elif hc_type == hops.HYP_GET_HYPERFILE_PATHS:
                self.handle_get_hyperfile_paths(cpu)

    def handle_get_num_hyperfiles(self, cpu: Any) -> None:
        """
        **Write the number of configured hyperfiles into guest memory.**

        The destination address is hypercall argument 2; the count is packed
        as one unsigned guest word. If the write fails the guest return value
        is set to `HYP_RETRY`.

        **Parameters**
        - `cpu` (`Any`): The CPU context from Panda.

        **Returns**
        - `None`
        """
        num_hyperfiles_addr = self.panda.arch.get_arg(
            cpu, 2, convention="syscall")
        packed_count = struct.pack(
            f"{self.endian} {self.u_word}", len(self.files))
        try:
            plugins.mem.write_bytes_panda(
                cpu, num_hyperfiles_addr, packed_count)
        except ValueError:
            # Memory r/w failed - tell guest to retry
            self.panda.arch.set_retval(cpu, HYP_RETRY)
            self.logger.debug(
                "Failed to read/write number of hyperfiles from guest - retry")

    def handle_get_hyperfile_paths(self, cpu: Any) -> None:
        """
        **Copy every configured hyperfile path into guest-provided buffers.**

        Hypercall argument 2 points at a guest array holding one buffer
        pointer per configured file. All pointers are read first, then each
        path is written to its buffer. Any failed guest memory access sets
        the return value to `HYP_RETRY` and stops.

        **Parameters**
        - `cpu` (`Any`): The CPU context from Panda.

        **Returns**
        - `None`
        """
        array_ptr = self.panda.arch.get_arg(cpu, 2, convention="syscall")

        # Phase 1: collect the destination buffer pointer for each path
        path_bufs = []
        for index in range(len(self.files)):
            try:
                path_bufs.append(self.panda.virtual_memory_read(
                    cpu,
                    array_ptr + index * self.arch_bytes,
                    self.arch_bytes,
                    fmt="int",
                ))
            except ValueError:
                self.panda.arch.set_retval(cpu, HYP_RETRY)
                self.logger.debug(
                    "Failed to read hyperfile path ptr from guest - retry")
                return

        # Phase 2: write each path into its buffer.
        # NOTE(review): no NUL terminator is appended here - presumably the
        # guest zero-fills its buffers beforehand; confirm against the guest
        # side.
        for path, buf in zip(self.files, path_bufs):
            try:
                plugins.mem.write_bytes_panda(cpu, buf, path.encode())
            except ValueError:
                self.panda.arch.set_retval(cpu, HYP_RETRY)
                self.logger.debug(
                    "Failed to write hyperfile path to guest - retry")
                return

    def handle_file_op(self, cpu: Any) -> None:
        """
        **Handle a file operation hypercall (read, write, ioctl, getattr).**

        Reads the request struct from the address in hypercall argument 2,
        resolves the model for the requested path (falling back to the
        default model, which is then stored in `self.files`), runs the
        handler for the operation, records the result and sets the guest
        return value.

        Failed guest memory accesses set the return value to `HYP_RETRY`.
        An empty device name or an unknown operation type returns -22
        (-EINVAL) to the guest.

        **Parameters**
        - `cpu` (`Any`): The CPU context from Panda.

        **Returns**
        - `None`
        """
        # Request layout: a header (op type, path pointer) followed by an
        # op-specific payload.
        header_fmt = f"{self.endian} i {self.u_word}"
        read_fmt = write_fmt = f"{self.endian} {self.u_word} {self.u_word} q"
        ioctl_fmt = f"{self.endian} I {self.u_word}"
        getattr_fmt = f"{self.endian} {self.u_word}"
        sub_offset = struct.calcsize(header_fmt)
        hyperfs_data_size = sub_offset + max(
            struct.calcsize(fmt)
            for fmt in (read_fmt, write_fmt, ioctl_fmt, getattr_fmt))

        buf_addr = self.panda.arch.get_arg(cpu, 2, convention="syscall")
        try:
            buf = plugins.mem.read_bytes_panda(cpu, buf_addr, hyperfs_data_size)
        except ValueError:
            # Memory read failed - tell guest to retry
            self.panda.arch.set_retval(cpu, HYP_RETRY)
            self.logger.debug(
                "Failed to read hyperfile struct from guest - retry")
            return

        # Unpack request with our dynamic format string
        type_val, path_ptr = struct.unpack_from(header_fmt, buf)
        try:
            device_name = plugins.mem.read_str_panda(cpu, path_ptr)
        except ValueError:
            # Memory read failed - tell guest to retry
            self.panda.arch.set_retval(cpu, HYP_RETRY)
            self.logger.debug(
                "Failed to read hyperfile path from guest - retry")
            return

        if not device_name:
            # XXX: why does this happen? Probably a bug somewhere else?
            self.logger.warning(
                "Empty device name in hyperfile request - ignore")
            self.panda.arch.set_retval(
                cpu, self.panda.to_unsigned_guest(-22), failure=True)
            return

        # Reject operation types we cannot dispatch instead of raising from
        # inside the hypercall handler.
        known_ops = (fops.HYP_READ, fops.HYP_WRITE,
                     fops.HYP_IOCTL, fops.HYP_GETATTR)
        if type_val not in known_ops:
            self.logger.warning(
                f"Unknown hyperfile operation {type_val} on device {repr(device_name)} - ignore")
            self.panda.arch.set_retval(
                cpu, self.panda.to_unsigned_guest(-22), failure=True)
            return

        # Ensure we have a model - if we don't, warn and add default
        if device_name not in self.files:
            self.logger.warning(
                f"Detected {hyper2name(type_val)} event on device {repr(device_name)} but device is not in config. Using defaults.")
            # Shallow copy: the handlers are bound methods (XXX can't use deepcopy)
            self.files[device_name] = dict(self.default_model)
        model = self.files[device_name]

        # Ensure our model specifies the current behavior - if not, warn and
        # add default
        if type_val not in model:
            if not (type_val == fops.HYP_GETATTR and "size" in model):
                # If we have a size, we can handle getattr with our default
                # method (return size) and it's fine. Otherwise warn
                self.logger.warning(
                    f"Detected {hyper2name(type_val)} event on device {repr(device_name)} but this event is not modeled in config. Using default.")
            model[type_val] = self.default_model[type_val]
        handler = model[type_val]

        # Dispatch based on the type of operation
        if type_val == fops.HYP_READ:
            buffer, length, offset = struct.unpack_from(
                read_fmt, buf, sub_offset)
            new_buffer, retval = handler(device_name, buffer, length, offset)

            # We need to write new_buffer back into the guest at buffer
            # XXX: sizes? overflows?
            if new_buffer:
                try:
                    plugins.mem.write_bytes_panda(cpu, buffer, new_buffer)
                except ValueError:
                    self.logger.warning(
                        f"After reading hyperfile {device_name} failed to write result into guest memory at {buffer:x} - retry")
                    self.panda.arch.set_retval(cpu, HYP_RETRY)
                    # XXX: If we ever have stateful files, we'll need to tell
                    # it the read failed
                    return
            self.handle_result(device_name, "read", retval, length, new_buffer)

        elif type_val == fops.HYP_WRITE:
            buffer, length, offset = struct.unpack_from(
                write_fmt, buf, sub_offset)
            # We're writing data into our pseudofile. First we need to read
            # what the guest has given us as data to write.
            # XXX offset is _internal_ to our data structures, it's how far
            # into the file we've seeked. It's NOT related to the guest buffer
            try:
                contents = plugins.mem.read_bytes_panda(cpu, buffer, length)
            except ValueError:
                self.logger.warning(
                    f"Before writing to hyperfile {device_name} failed to read data out of guest memory at {buffer:x} with offset {offset:x}")
                self.panda.arch.set_retval(cpu, HYP_RETRY)
                # XXX: We might be able to get stuck in a loop here if hyperfs
                # isn't paging in what we expect
                return
            retval = handler(device_name, buffer, length, offset, contents)
            self.handle_result(
                device_name, "write", retval, length, offset, contents)

        elif type_val == fops.HYP_IOCTL:
            cmd, arg = struct.unpack_from(ioctl_fmt, buf, sub_offset)
            retval = handler(device_name, cmd, arg)
            self.handle_result(device_name, "ioctl", retval, cmd, arg)

        else:  # fops.HYP_GETATTR - the only remaining known operation
            retval, size_data = handler(device_name, model)
            size_bytes = struct.pack(f"{self.endian} q", size_data)
            size_ptr, = struct.unpack_from(getattr_fmt, buf, sub_offset)
            try:
                plugins.mem.write_bytes_panda(cpu, size_ptr, size_bytes)
            except ValueError:
                self.logger.debug(
                    "Failed to write hyperfile size into guest - retry(?)")
                self.panda.arch.set_retval(cpu, HYP_RETRY)
                return
            # Record only once the guest has actually received the size, so a
            # retried request is not logged twice (same as the read branch).
            self.handle_result(device_name, "getattr", retval, size_data)

        self.panda.arch.set_retval(cpu, self.panda.to_unsigned_guest(retval))

    def handle_result(self, device_name: str, event: str,
                      retval: int, *data: Any) -> None:
        """
        **Append one operation record to `self.results`.**

        Records are stored as `self.results[device_name][event]`, a list of
        dicts. Read/write payloads are decoded as UTF-8 with undecodable
        bytes dropped.

        **Parameters**
        - `device_name` (`str`): The name of the device/file.
        - `event` (`str`): "read", "write", "ioctl" or "getattr".
        - `retval` (`int`): The return value of the operation.
        - `*data` (`Any`): Event-specific values:
          read -> `(requested_length, buffer)`;
          write -> `(length, offset, buffer)`;
          ioctl -> `(cmd, arg)`;
          getattr -> `(size,)`.

        **Raises**
        - `ValueError`: If `event` is not one of the four known names.

        **Returns**
        - `None`
        """
        records = self.results.setdefault(
            device_name, {}).setdefault(event, [])

        if event == "read":
            requested_length, buffer = data
            # NOTE: the key really is "readval" (not "retval") for reads; it
            # is kept as-is because it is part of the emitted log format.
            result = {
                "readval": retval,
                "bytes_requested": requested_length,
                "data": buffer.decode("utf-8", errors="ignore"),
            }
        elif event == "write":
            length, offset, buffer = data
            result = {
                "retval": retval,
                "bytes_requested": length,
                "offset": offset,
                "data": buffer.decode("utf-8", errors="ignore"),
            }
        elif event == "ioctl":
            cmd, arg = data
            result = {
                "cmd": cmd,
                "arg": arg,
                "retval": retval,
            }
        elif event == "getattr":
            result = {
                "size": data[0],
                "retval": retval,
            }
        else:
            raise ValueError(f"Unknown event {event}")

        records.append(result)
        # XXX: results are only written out in uninit(); streaming them to
        # the log file per event has been considered for testing.

    @staticmethod
    def read_zero(devname: str, buffer: int, length: int,
                  offset: int) -> Tuple[bytes, int]:
        """
        **Read model whose whole content is the single ASCII character "0".**

        Note this is the byte `b"0"` (0x30), not a NUL byte, and the content
        is one byte long regardless of the requested length.

        **Parameters**
        - `devname` (`str`): Device name (unused).
        - `buffer` (`int`): Guest buffer address (unused).
        - `length` (`int`): Number of bytes requested.
        - `offset` (`int`): Offset into the file.

        **Returns**
        - `Tuple[bytes, int]`: (slice of `b"0"` selected by offset/length,
          length of that slice)
        """
        data = b"0"
        final_data = data[offset: offset + length]
        return (final_data, len(final_data))  # data, rv

    @staticmethod
    def write_discard(devname: str, buffer: int, length: int,
                      offset: int, contents: bytes) -> int:
        """
        **Discard written data and report the full length as written.**

        **Parameters**
        - `devname` (`str`): Device name (unused).
        - `buffer` (`int`): Guest buffer address (unused).
        - `length` (`int`): Number of bytes the guest asked to write.
        - `offset` (`int`): Offset into the file (unused).
        - `contents` (`bytes`): Data to write (unused).

        **Returns**
        - `int`: `length`, unchanged.
        """
        return length

    @staticmethod
    def ioctl(devname: str, cmd: int, arg: int) -> int:
        """
        **Default ioctl model: accept every command.**

        **Parameters**
        - `devname` (`str`): Device name (unused).
        - `cmd` (`int`): IOCTL command (unused).
        - `arg` (`int`): IOCTL argument (unused).

        **Returns**
        - `int`: Always 0.
        """
        return 0

    @staticmethod
    def ioctl_unhandled(devname: str, cmd: int, arg: int) -> int:
        """
        **Ioctl model that rejects every command.**

        **Parameters**
        - `devname` (`str`): Device name (unused).
        - `cmd` (`int`): IOCTL command (unused).
        - `arg` (`int`): IOCTL argument (unused).

        **Returns**
        - `int`: Always -25 (-ENOTTY).
        """
        return -25  # -ENOTTY

    @staticmethod
    def read_unhandled(filename: str, buffer: int, length: int,
                       offset: int) -> Tuple[bytes, int]:
        """
        **Default read model: return no data and fail with -EINVAL.**

        **Parameters**
        - `filename` (`str`): File name (unused).
        - `buffer` (`int`): Guest buffer address (unused).
        - `length` (`int`): Number of bytes requested (unused).
        - `offset` (`int`): Offset into the file (unused).

        **Returns**
        - `Tuple[bytes, int]`: `(b"", -22)`.
        """
        return (b"", -22)  # -EINVAL

    @staticmethod
    def write_unhandled(filename: str, buffer: int,
                        length: int, offset: int, contents: bytes) -> int:
        """
        **Default write model: fail with -EINVAL.**

        **Parameters**
        - `filename` (`str`): File name (unused).
        - `buffer` (`int`): Guest buffer address (unused).
        - `length` (`int`): Number of bytes to write (unused).
        - `offset` (`int`): Offset into the file (unused).
        - `contents` (`bytes`): Data to write (unused).

        **Returns**
        - `int`: Always -22 (-EINVAL).
        """
        return -22  # -EINVAL

    @staticmethod
    def getattr(device_name: str, model: Dict[str, Any]) -> Tuple[int, int]:
        """
        **Default getattr model: report the model's configured size.**

        Only the return value and size are produced here; the caller is
        responsible for writing the size into guest memory. This could be
        refactored to take the panda object and perform that write itself.

        **Parameters**
        - `device_name` (`str`): Device name (unused).
        - `model` (`Dict[str, Any]`): File model dictionary.

        **Returns**
        - `Tuple[int, int]`: `(0, model["size"])`, with the size defaulting
          to 0 when the model has no "size" entry.
        """
        return 0, model.get("size", 0)

    def uninit(self) -> None:
        """
        **Dump the recorded results to the log file on plugin unload.**

        Nothing is written when no log file was configured or when the plugin
        was loaded without models (in which case it never recorded anything
        and never initialised the log file).

        **Returns**
        - `None`
        """
        if not self.log_file or self.files is None:
            return
        with open(self.log_file, "w") as f:
            yaml.dump(self.results, f)