Source code for pyplugins.apis.kffi

"""
KFFI API Plugin
================

This module provides the KFFI (Kernel Foreign Function Interface) plugin for the
Penguin framework. It enables calling kernel-space functions and interacting with
kernel memory from user-space plugins. The KFFI plugin abstracts low-level kernel
function invocation, argument marshalling, and result handling, allowing other
plugins to perform advanced kernel introspection and manipulation.

Features
--------

* Call arbitrary kernel functions with specified arguments.
* Read and write kernel memory.
* Marshal arguments and results between user-space and kernel-space.
* Supports type-safe function signatures and return values.
* Compile and inject dynamic C-structs/typedefs on the fly using DWARFFI.

Example usage
-------------

.. code-block:: python

    from penguin import plugins

    # Call a kernel function by name (or address) with up to 8 arguments
    addr = yield from plugins.kffi.call_kernel_function("igloo_kzalloc", 64)

    # Read a typed object from kernel memory at a specific address
    regs = yield from plugins.kffi.read_type(0xffff888000000000, "pt_regs")

    # Write raw bytes to kernel memory (via the mem plugin)
    yield from plugins.mem.write_bytes(0xffff888000000000, b"\x90\x90\x90\x90")

    # Load a custom struct layout into the emulator's architecture
    plugins.kffi.cdef("struct my_payload { int a; char b[10]; };")
"""

import inspect
import hashlib
import os
from os.path import isfile, join, realpath
from pathlib import Path
from typing import Any, Generator, Iterator, Optional, Tuple, Union

from dwarffi.instances import BoundTypeInstance, Ptr, EnumInstance
from dwarffi.dffi import DFFI
import struct

from wrappers.generic import Wrapper
from wrappers.ptregs_wrap import get_pt_regs_wrapper

from penguin import Plugin, getColoredLogger, plugins
from penguin.abi_info import ARCH_ABI_INFO


class KFFI(Plugin):
    """
    KFFI Plugin
    -----------

    Provides methods for calling kernel functions and interacting with kernel memory.

    Methods
    ~~~~~~~

    - ``call_kernel_function`` / ``call``: Call a kernel function with arguments.
    - ``cdef``: Compile C type definitions and load them into DWARFFI.
    - ``read_type`` / ``deref``: Read typed objects from kernel memory.
    - ``kmalloc`` / ``kfree``: Allocate and free kernel memory.
    - ``callback``: Register a Python callable behind a kernel trampoline.
    """

    def __init__(self) -> None:
        """
        Locate the ISF files next to the configured kernel and load them into DWARFFI.

        Raises:
            FileNotFoundError: If the kernel ISF (``cosi.<arch>.json.xz``) is missing.
        """
        self.outdir = self.get_arg("outdir")
        conf = self.get_arg("conf")
        kernel = conf["core"]["kernel"]
        arch = conf["core"]["arch"]
        # The ISF files use different architecture names than the config.
        if arch == "intel64":
            arch = "x86_64"
        elif arch == "aarch64":
            arch = "arm64"
        self.isf = realpath(join(kernel, f"../cosi.{arch}.json.xz"))
        self.logger = getColoredLogger("plugins.kffi")
        if not isfile(self.isf):
            self.logger.error(f"ISF file not found: {self.isf}")
            raise FileNotFoundError(f"ISF file not found: {self.isf}")
        self.logger.debug(f"Loading ISF file: {self.isf}")
        self.igloo_ko_isf = realpath(join(kernel, f"../igloo.ko.{arch}.json.xz"))
        self.ffi = DFFI([self.igloo_ko_isf, self.isf])
        # Keyed by trampoline id -> (func, num_args) AND by func -> trampoline id.
        self._tramp_callbacks = {}
        # Keyed by trampoline id -> address AND by func -> address.
        self._tramp_addresses = {}
        self.tramp_init = False

    def __init_tramp_functionality(self):
        """
        Lazily register the trampoline hypercall and portal interrupt handlers.

        Runs its body only once; later calls return immediately.
        """
        if self.tramp_init:
            return
        self.tramp_init = True
        # Register trampoline hit hypercall handler
        from hyper.consts import igloo_hypercall_constants as iconsts
        self.portal = plugins.portal
        self._on_tramp_hit_hypercall = self.portal.wrap(self._on_tramp_hit_hypercall)
        self.panda.hypercall(iconsts.IGLOO_HYP_TRAMP_HIT)(self._on_tramp_hit_hypercall)
        # Register with portal's interrupt handler system
        self.portal.register_interrupt_handler(
            "kffi", self._tramp_interrupt_handler)

    def cdef(self, source: str) -> None:
        """
        Compile C definitions on the fly and load them into DWARFFI.

        Automatically handles architecture-specific compiler flags, musl headers,
        and caches the ISF output to speed up subsequent runs.

        Args:
            source (str): The C code containing structs/enums/typedefs to compile.

        Raises:
            Exception: Whatever ``DFFI.cdef`` raises is logged and re-raised.
        """
        conf = self.get_arg("conf")
        proj_dir = self.get_arg("proj_dir")
        arch = conf["core"]["arch"]

        arch_info = ARCH_ABI_INFO[arch]
        # Only fall back to the first listed ABI when no default is declared;
        # evaluating the fallback eagerly would fail on an empty ABI table even
        # though a default exists.
        if "default_abi" in arch_info:
            abi = arch_info["default_abi"]
        else:
            abi = list(arch_info.get("abis", {}).keys())[0]
        abi_info = arch_info["abis"][abi]

        # Determine caching directory
        if proj_dir:
            cache_dir = Path(proj_dir).resolve() / "qcows" / "cache"
        else:
            cache_dir = Path(os.path.dirname(os.path.abspath(__file__))).resolve() / "qcows" / "cache"
        os.makedirs(cache_dir, exist_ok=True)

        # Hash input for caching
        hash_input = f"{arch}_{abi}_{source}".encode()
        cache_key = hashlib.sha256(hash_input).hexdigest()
        cache_path = cache_dir / f"kffi_cdef_{arch}_{abi}_{cache_key}.json.xz"

        # Check cache
        if cache_path.exists():
            self.logger.debug(f"Loading cached DWARFFI ISF from {cache_path}")
            self.ffi.load_isf(str(cache_path))
            return

        # Build strict cross-compilation flags based on ABI config
        headers_dir = f"/igloo_static/musl-headers/{abi_info['musl_arch_name']}/include"
        target = abi_info.get("target_triple", None) or arch_info["target_triple"]
        compiler_flags = [
            "-O3",
            "-g",
            "-gdwarf-4",
            "-fno-eliminate-unused-debug-types",
            "-c",
            "-target", target,
            "-isystem", headers_dir,
            "-nostdinc",
        ]
        for key, value in abi_info.get("m_flags", {}).items():
            compiler_flags.append(f"-m{key.replace('_', '-')}={value}")
        compiler_flags.extend(abi_info.get("extra_flags", []))

        self.logger.info(
            f"Compiling cdef for {arch} {abi}. Caching to {cache_path.name}")

        # Delegate to DFFI to invoke clang -> dwarf2json -> load -> cache
        try:
            self.ffi.cdef(
                source=source,
                compiler="clang-20",
                compiler_flags=compiler_flags,
                save_isf_to=str(cache_path)
            )
        except Exception as e:
            self.logger.error(f"Failed to compile and load cdef: {e}")
            raise

    def new(self, type_: str, init: Any = None) -> Any:
        """
        Create a new instance of a type.

        Args:
            type_ (str): Name of the type.
            init (Any): Initial value for the instance (optional).

        Returns:
            Any: Instance of the type, or None if ``DFFI.new`` raises
            ``KeyError`` or ``ValueError``.
        """
        try:
            return self.ffi.new(type_, init)
        except (KeyError, ValueError):
            return None

    def from_buffer(self, type_: str, buf: bytes, instance_offset_in_buffer: int = 0) -> Any:
        """
        Create an instance of a type from a buffer.

        Args:
            type_ (str): Name of the type.
            buf (bytes): Buffer containing the data.
            instance_offset_in_buffer (int): Offset in buffer (default: 0).

        Returns:
            Any: Instance of the type.
        """
        # Ensure we pass a bytearray to dwarffi
        return self.ffi.from_buffer(type_, bytearray(buf), offset=instance_offset_in_buffer)

    def get_field_casted(self, struct: Any, field: str) -> Any:
        """
        Get a field from a struct, casted to its declared type.

        Args:
            struct (Any): Struct instance.
            field (str): Field name.

        Returns:
            Any: Field value, or None if any error occurs (the error is logged).
        """
        try:
            return self.ffi.cast(struct._instance_type_def.fields[field].type_info["name"], getattr(struct, field))
        except Exception as e:
            self.logger.error(f"Error casting field {field} of struct {struct}: {e}")
            return None

    def read_type_panda(self, cpu: Any, addr: int, type_: str) -> Any:
        """
        Read a type from kernel memory using PANDA.

        Args:
            cpu (Any): CPU context.
            addr (int): Address to read from (a ``Ptr`` is also accepted).
            type_ (str): Name of the type.

        Returns:
            Any: Instance of the type, or None if read fails.
        """
        size = self.ffi.sizeof(type_)
        if not size:
            return None
        # Accept Ptr objects, like read_type does.
        if isinstance(addr, Ptr):
            addr = addr.address
        buf = plugins.mem.read_bytes_panda(cpu, addr, size)
        if not buf:
            self.logger.error(f"Failed to read bytes from {addr:#x}")
            return None
        return self.ffi.from_buffer(type_, bytearray(buf), address=addr)

    def read_type(self, addr: int, type_: str) -> Generator[Any, Any, Any]:
        """
        Read a type from kernel memory.

        Args:
            addr (int): Address to read from (a ``Ptr`` is also accepted).
            type_ (str): Name of the type.

        Returns:
            Any: Instance of the type, or None if read fails.
        """
        size = self.ffi.sizeof(type_)
        if not size:
            return None
        if isinstance(addr, Ptr):
            addr = addr.address
        buf = yield from plugins.mem.read_bytes(addr, size)
        if not buf:
            self.logger.error(f"Failed to read bytes from {addr:#x}")
            return None
        # Hand dwarffi a bytearray, as from_buffer and read_type_panda do.
        return self.ffi.from_buffer(type_, bytearray(buf), address=addr)

    def deref(self, ptr: Ptr) -> Generator[Any, Any, Any]:
        """
        Dereference a pointer to a type.

        Args:
            ptr (Ptr): Pointer object.

        Returns:
            Any: Value pointed to, or None if pointer is null.
        """
        if ptr.address == 0:
            self.logger.error(f"Pointer address is 0: {ptr}")
            return None
        val = yield from self.read_type(ptr.address, ptr.points_to_type_name)
        return val

    def ref(self, thing: Any) -> Optional[int]:
        """
        Get the address of an ffi type'd object (usually a struct).

        Args:
            thing (Any): Object.

        Returns:
            The result of ``DFFI.addressof(thing)``.
            NOTE(review): elsewhere in this class that result is used via its
            ``.address`` attribute, so it is presumably a pointer object rather
            than a plain int -- confirm against dwarffi.
        """
        return self.ffi.addressof(thing)

    def get_enum_dict(self, enum_name: str) -> Wrapper:
        """
        Get dictionary of enum constants.

        Args:
            enum_name (str): Name of the enum.

        Returns:
            Wrapper: Wrapper containing enum constants. If the enum is not found
            (or has no ``constants``), an error is logged and an empty plain
            ``dict`` is returned instead.
        """
        enum = self.ffi.get_type(enum_name)
        if not enum or not hasattr(enum, "constants"):
            self.logger.error(f"Enum {enum_name} not found in ISF")
            return {}
        return Wrapper(enum.constants)

    def get_struct_size(self, struct_name: str) -> Optional[int]:
        """
        Get the size of a struct.

        Args:
            struct_name (str): Name of the struct.

        Returns:
            Optional[int]: Size of the struct, or None if not found.
        """
        try:
            return self.ffi.sizeof(struct_name)
        except (KeyError, ValueError):
            return None

    def sizeof(self, struct_name: str) -> Optional[int]:
        """
        Alias for get_struct_size.

        Args:
            struct_name (str): Name of the struct.

        Returns:
            Optional[int]: Size of the struct, or None if not found.
        """
        return self.get_struct_size(struct_name)

    def get_function_address(self, function: str) -> Optional[int]:
        """
        Get the address of a kernel function.

        The igloo module ISF is consulted first, then the kernel ISF.

        Args:
            function (str): Name of the function.

        Returns:
            Optional[int]: Address of the function, or None if not found
            (a missing, None or zero address counts as not found).
        """
        for isf_path in (self.igloo_ko_isf, self.isf):
            sym = self.ffi.get_symbol(function, path=isf_path)
            if sym and hasattr(sym, "address") and sym.address not in [None, 0]:
                return sym.address
        return None

    def _fixup_igloo_module_baseaddr(self, addr):
        """Shift the symbol addresses of the igloo module ISF by ``addr``."""
        self.ffi.vtypejsons[self.igloo_ko_isf].shift_symbol_addresses(addr)

    @staticmethod
    def _nul_terminate(data: bytes) -> bytes:
        """Return ``data`` with a trailing NUL byte, adding one only if missing."""
        return data if data.endswith(b"\x00") else data + b"\x00"

    def _prepare_ffi_call(self, func_ptr: int, args: list, func_name: Optional[str] = None) -> Generator[Tuple[bytes, Optional[int], Optional[dict]], Any, Any]:
        """
        Prepare FFI call structure for kernel execution, using function signature if available.

        Args:
            func_ptr (int): Address of the kernel function to call.
            args (list): List of arguments to pass to the function (max 8).
            func_name (str, optional): Name of the function (for signature lookup).

        Returns:
            Tuple[bytes, Optional[int], Optional[dict]]: Serialized ``portal_ffi_call``
            structure, kernel memory address if allocated, and function signature
            ``type_info`` if available.

        Raises:
            ValueError: If more than 8 arguments are supplied.
            TypeError: If an argument cannot be marshalled.
            RuntimeError: If kernel memory allocation fails or the
                ``portal_ffi_call`` type cannot be instantiated.
        """
        self.logger.debug(
            f"Preparing FFI call: func_ptr={func_ptr:#x}, args={args}, func_name={func_name}")

        # Lookup function signature if possible
        func_typeinfo = None
        if func_name:
            sym = self.ffi.get_symbol(func_name)
            if sym and sym.type_info and sym.type_info.get("kind") == "function":
                func_typeinfo = sym.type_info

        # Validate arguments
        if len(args) > 8:
            raise ValueError(
                f"Too many arguments for FFI call: {len(args)} > 8")

        # Use signature to cast/corral arguments
        marshalled_args = []
        if func_typeinfo and "parameters" in func_typeinfo:
            params = func_typeinfo["parameters"]
            for i, arg in enumerate(args):
                if i < len(params):
                    param_type = params[i]["type"]
                    kind = param_type.get("kind")
                    if kind == "pointer":
                        # NOTE(review): with a known signature, pointer parameters
                        # accept only Ptr/int; str/bytes/struct arguments are
                        # rejected here even though the code below can marshal
                        # them. Confirm whether that strictness is intended.
                        if isinstance(arg, Ptr):
                            arg = arg.address
                        elif not isinstance(arg, int):
                            raise TypeError(f"Argument {i} expected pointer/int, got {type(arg)}")
                    # String: allow str/bytes
                    elif kind == "base" and param_type.get("name") in ("char", "unsigned char"):
                        if isinstance(arg, str):
                            arg = arg.encode() + b"\x00"
                        elif isinstance(arg, bytes):
                            arg = self._nul_terminate(arg)
                    # TODO: struct/array/enum
                marshalled_args.append(arg)
        else:
            marshalled_args = list(args)

        # First pass: classify each argument. Both maps are keyed by argument
        # index so that a struct argument can never shift the position of a
        # later string payload.
        payloads = {}        # arg index -> (offset in scratch buffer, bytes)
        boundtype_ptrs = {}  # arg index -> kernel address of the struct
        total_bytes = 0
        for i, arg in enumerate(marshalled_args):
            if isinstance(arg, (int, float)) or hasattr(arg, '_value'):
                continue  # passed by value, no kernel buffer needed
            if isinstance(arg, str):
                data = arg.encode() + b"\x00"
            elif isinstance(arg, bytes):
                data = self._nul_terminate(arg)
            elif isinstance(arg, BoundTypeInstance):
                base_addr = getattr(arg, "_base_address", None)
                if base_addr is None:
                    # Not backed by kernel memory yet: copy it into a fresh,
                    # 64-byte-aligned kernel allocation.
                    to_write = bytes(arg)
                    raw_addr = yield from self.kmalloc(len(to_write) + 64)
                    if not raw_addr:
                        raise RuntimeError("Failed to allocate kernel memory for BoundTypeInstance")
                    aligned_addr = (raw_addr + 63) & ~63
                    yield from plugins.mem.write_bytes(aligned_addr, to_write)
                    boundtype_ptrs[i] = aligned_addr
                else:
                    boundtype_ptrs[i] = self.ffi.addressof(arg).address
                continue
            elif hasattr(arg, '__bytes__'):
                data = bytes(arg)
            else:
                raise TypeError(f"Unsupported argument type for FFI: {type(arg)}")
            payloads[i] = (total_bytes, data)
            total_bytes += len(data)

        # One scratch allocation holds every str/bytes payload back to back.
        kmem_addr = None
        if payloads:
            # Allocate at least one byte so a pointer exists even when every
            # payload is empty.
            kmem_addr = yield from self.kmalloc(max(total_bytes, 1))
            if not kmem_addr:
                raise RuntimeError("Failed to allocate kernel memory for FFI args")
            for off, data in payloads.values():
                if data:
                    yield from plugins.mem.write_bytes(kmem_addr + off, data)

        ffi_call = self.new("portal_ffi_call")
        if ffi_call is None:
            raise RuntimeError("Type portal_ffi_call is not available in the loaded ISF")
        ffi_call.func_ptr = func_ptr
        ffi_call.num_args = len(marshalled_args)

        # Second pass: fill the argument slots using the classification above.
        for i, arg in enumerate(marshalled_args):
            if i in boundtype_ptrs:
                ffi_call.args[i] = boundtype_ptrs[i]
            elif i in payloads:
                ffi_call.args[i] = kmem_addr + payloads[i][0]
            elif isinstance(arg, float):
                # Pass the raw IEEE-754 double bit pattern in the integer slot.
                ffi_call.args[i] = struct.unpack('<Q', struct.pack('<d', arg))[0]
            else:
                ffi_call.args[i] = int(arg)

        return bytes(ffi_call), kmem_addr, func_typeinfo

    def call_kernel_function(
            self, func: Union[int, str], *args: Any) -> Generator[Any, Any, Any]:
        """
        Call a kernel function dynamically with the given arguments.

        This uses the FFI mechanism to directly call kernel functions.
        CAUTION: This is extremely powerful and can easily crash the kernel
        if used incorrectly. Only call functions that are safe to call from
        arbitrary contexts.

        Args:
            func (int or str): Function address or name.
            *args (Any): Arguments to pass to the function (max 8).

        Returns:
            Any: Return value from the kernel function, or None if call fails.
            When the function signature is known the value is converted
            according to its return type (int/float/bool, enum value, ``Ptr``,
            or a struct/union read from the returned address).

        Raises:
            ValueError: If ``func`` is neither ``int`` nor ``str``.

        Note:
            This leaks memory. We should have a better policy on that: kernel
            buffers allocated for arguments are not freed here.
        """
        if isinstance(func, str):
            func_name = func
            func_ptr = self.get_function_address(func)
            if func_ptr is None:
                self.logger.error(f"Function not found: {func}")
                return None
        elif isinstance(func, int):
            func_name = None
            func_ptr = func
        else:
            raise ValueError(f"Invalid function pointer type: {type(func)}")

        self.logger.debug(
            f"call_kernel_function: func_ptr={func_ptr:#x}, args={args}")

        buf, _kmem_addr, func_typeinfo = yield from self._prepare_ffi_call(func_ptr, args, func_name)

        # importing here to avoid circular import issues
        from hyper.portal import PortalCmd

        # Call the function
        response = yield PortalCmd("ffi_exec", size=len(buf), data=buf)
        if not response:
            self.logger.error(f"FFI call failed: func_ptr={func_ptr:#x}")
            return None

        # Parse the response
        result_struct = self.from_buffer("portal_ffi_call", response)
        result = result_struct.result

        # Marshal return value if function signature is available
        if func_typeinfo and "return_type" in func_typeinfo:
            ret_type = func_typeinfo["return_type"]
            kind = ret_type.get("kind")
            name = ret_type.get("name")
            if kind == "base":
                base_type = self.ffi.get_type(name)
                if base_type:
                    # Unsigned fixup
                    if base_type.signed is False and result < 0:
                        result = result % (1 << (base_type.size * 8))
                    # Convert to correct Python type
                    if base_type.kind in ("int", "pointer"):
                        result = int(result)
                    elif base_type.kind == "float":
                        result = float(result)
                    elif base_type.kind == "bool":
                        result = bool(result)
            elif kind == "enum":
                try:
                    enum_def = self.ffi.get_enum(name)
                    if enum_def:
                        result = EnumInstance(enum_def, result)._value
                except Exception as e:
                    self.logger.warning(f"Failed to cast return value to enum {name}: {e}")
            elif kind == "pointer":
                # Return a Ptr object
                ptr_type = ret_type.get("subtype")
                result = Ptr(result, ptr_type, self.ffi)
            elif kind in ("struct", "union"):
                # Read struct/union from kernel memory at returned address
                if result != 0:
                    result = yield from self.read_type(result, name)
                else:
                    result = None
        return result

    def call(self, func: Union[int, str], *args: Any) -> Generator[Any, Any, Any]:
        """
        Shorthand for ``call_kernel_function``.

        Args:
            func (int or str): Function address or name.
            *args (Any): Arguments to pass to the function (max 8).

        Returns:
            Any: Whatever ``call_kernel_function`` returns.
        """
        val = yield from self.call_kernel_function(func, *args)
        return val

    def kmalloc(self, size: int) -> Generator[Any, Any, Any]:
        """
        Allocate memory in the kernel by calling ``igloo_kzalloc``.

        Args:
            size (int): Size of memory to allocate.

        Returns:
            Any: Address of allocated memory, or None if the call fails.
        """
        val = yield from self.call_kernel_function("igloo_kzalloc", size)
        return val

    def kfree(self, addr: int) -> Generator[Any, Any, Any]:
        """
        Free memory in the kernel by calling ``igloo_kfree``.

        Args:
            addr (int): Address of memory to free.

        Returns:
            None
        """
        yield from self.call_kernel_function("igloo_kfree", addr)

    def kallsyms_lookup(self, symbol: str) -> Generator[Any, Any, Any]:
        """
        Look up a kernel symbol address using the ``kallsyms_lookup`` portal operation (simplified).

        Args:
            symbol (str): Name of the symbol to look up.

        Returns:
            Any: Address of the symbol as int, or None if not found.
        """
        if not symbol or not isinstance(symbol, str):
            self.logger.error("Symbol name must be a non-empty string")
            return None

        # Send symbol name as null-terminated bytes
        symbol_bytes = symbol.encode() + b"\x00"
        from hyper.portal import PortalCmd
        addr = yield PortalCmd("kallsyms_lookup", size=len(symbol_bytes), data=symbol_bytes)
        if not addr:
            self.logger.error(f"kallsyms_lookup: symbol not found: {symbol}")
            return None
        self.logger.debug(f"kallsyms_lookup: {symbol} -> {addr:#x}")
        return addr

    def generate_trampoline(self) -> Generator[Any, Any, Any]:
        """
        Request a trampoline from the kernel via portal.

        Returns:
            dict: Keys include ``tramp_id``, ``tramp_addr``, and ``status``.
            None is returned (and an error logged) when the portal gives no response.
        """
        from hyper.portal import PortalCmd
        self.__init_tramp_functionality()
        # Send empty buffer for trampoline generation
        response = yield PortalCmd("tramp_generate", size=0, data=b"")
        if not response:
            self.logger.error("Trampoline generation failed: no response")
            return None
        tramp_struct = self.from_buffer("portal_tramp_generate", response)
        return {
            "tramp_id": tramp_struct.tramp_id,
            "tramp_addr": tramp_struct.tramp_addr,
            "status": tramp_struct.status,
        }

    def callback(self, func) -> Generator[Any, Any, Any]:
        """
        Register a trampoline callback and return its guest virtual address.

        A trampoline is generated on first registration; registering the same
        callable again returns the previously obtained address.

        Args:
            func: Callable to invoke when the trampoline is hit. Its number of
                parameters decides how many arguments are fetched for it.

        Returns:
            The trampoline address reported by the kernel.

        Raises:
            RuntimeError: If the trampoline could not be generated.
        """
        if func in self._tramp_addresses:
            return self._tramp_addresses[func]

        tramp_info = yield from self.generate_trampoline()
        # generate_trampoline returns None on failure; fail loudly rather than
        # with an AttributeError on None.
        if tramp_info is None:
            raise RuntimeError("Failed to generate trampoline for callback")
        tramp_id = tramp_info.get("tramp_id")
        tramp_addr = tramp_info.get("tramp_addr")
        if tramp_id is None or tramp_addr is None:
            raise RuntimeError("Trampoline generation returned incomplete info")

        num_args = len(inspect.signature(func).parameters)
        self._tramp_callbacks[tramp_id] = (func, num_args)
        self._tramp_callbacks[func] = tramp_id
        self._tramp_addresses[tramp_id] = tramp_addr
        self._tramp_addresses[func] = tramp_addr
        return tramp_addr

    def get_callback_id(self, f: Union[int, Any]) -> Optional[int]:
        """
        Get the trampoline ID for a registered callback function or trampoline address.

        Args:
            f (int | Any): Callback function or trampoline address.

        Returns:
            Optional[int]: Trampoline ID, or None if not found.
        """
        if isinstance(f, int):
            # Addresses are stored per trampoline id (int keys); reverse-map them.
            for key, addr in self._tramp_addresses.items():
                if isinstance(key, int) and addr == f:
                    return key
        return self._tramp_callbacks.get(f, None)

    def _tramp_interrupt_handler(self):
        """
        Interrupt handler to register trampoline callbacks.

        Drains ``_pending_tramp_callbacks`` (if that attribute exists),
        generating one trampoline per pending callable and recording it.

        Returns:
            bool: Always False.
        """
        if not hasattr(self, '_pending_tramp_callbacks') or not self._pending_tramp_callbacks:
            return False

        # Take ownership of the queue so entries added meanwhile are kept for
        # the next interrupt.
        pending_tramp_callbacks = self._pending_tramp_callbacks[:]
        self._pending_tramp_callbacks = []

        for func in pending_tramp_callbacks:
            func_label = getattr(func, '__name__', repr(func))
            # generate_trampoline returns None on failure.
            tramp_info = (yield from self.generate_trampoline()) or {}
            tramp_id = tramp_info.get("tramp_id")
            tramp_addr = tramp_info.get("tramp_addr")
            tramp_status = tramp_info.get("status")
            if tramp_id is not None and tramp_addr is not None:
                num_args = len(inspect.signature(func).parameters)
                self._tramp_callbacks[tramp_id] = (func, num_args)
                self.logger.debug(f"Registered trampoline callback {func_label} with id={tramp_id} addr={tramp_addr}")
                # Set Callback info if exists
                if hasattr(self, '_tramp_proxy_map') and func in self._tramp_proxy_map:
                    cb = self._tramp_proxy_map[func]
                    cb.address = tramp_addr
                    cb.id = tramp_id
                    cb.status = tramp_status
                    cb.ready = True
            else:
                self.logger.error(f"Failed to register trampoline callback for {func_label}")
        return False

    def _on_tramp_hit_hypercall(self, cpu):
        """
        Handles trampoline hit hypercall and invokes the registered callback with pt_regs.

        The callback receives the wrapped ``pt_regs`` plus ``num_args - 1``
        arguments. An ``int`` result is stored as the return value, and the
        registers are written back only if they changed. Any exception raised
        while handling the hit is logged and swallowed.
        """
        tramp_id = self.panda.arch.get_arg(cpu, 1, convention="syscall")
        pt_regs_addr = self.panda.arch.get_arg(cpu, 2, convention="syscall")
        if not hasattr(self, '_tramp_callbacks') or tramp_id not in self._tramp_callbacks:
            self.logger.error(f"Trampoline hit for unknown id: {tramp_id}")
            return
        callback, num_args = self._tramp_callbacks[tramp_id]
        # Not every callable has __name__ (e.g. functools.partial objects).
        callback_label = getattr(callback, '__name__', repr(callback))
        self.logger.debug(f"Invoking trampoline callback for id={tramp_id}: {callback_label}")
        try:
            pt_regs_raw = yield from self.read_type(pt_regs_addr, "pt_regs")
            pt_regs = get_pt_regs_wrapper(self.panda, pt_regs_raw)
            original_bytes = pt_regs.to_bytes()[:]
            # Get args from pt_regs
            if num_args > 1:
                args = yield from pt_regs.get_args_portal(num_args - 1, convention="userland")
            else:
                args = []
            # Call callback with pt_regs and args
            result = callback(pt_regs, *args)
            if isinstance(result, Iterator):
                result = yield from result
            # If callback returns int, set as return value
            if isinstance(result, int):
                pt_regs.set_retval(result)
            new = pt_regs.to_bytes()
            if original_bytes != new:
                yield from plugins.mem.write_bytes(pt_regs_addr, new)
        except Exception as e:
            self.logger.error(f"Error in trampoline callback {callback_label}: {e}")