# Source code for pyplugins.hyper.portal

"""
Portal Plugin
=============

This module implements the Portal plugin for the Penguin hypervisor environment. It provides a mechanism for plugins to communicate with the hypervisor and each other using memory-mapped regions and hypercalls. The Portal plugin manages command and data transfer, interrupt handling, and memory region state for efficient and flexible plugin communication.

Usage
-----

The Portal plugin is loaded by the Penguin framework and is not intended for direct invocation. It provides an API for other plugins to register interrupt handlers, queue interrupts, and send/receive commands via the portal mechanism.

Example
-------

.. code-block:: python

    # Register an interrupt handler
    portal.register_interrupt_handler("my_plugin", my_handler_fn)

    # Queue an interrupt for a plugin
    portal.queue_interrupt("my_plugin")

Classes
-------

- PortalCmd: Encapsulates a command to be sent through the portal mechanism.
- Portal: Main plugin class for handling portal communication and interrupts.

Key Features
------------

- Memory-mapped command and data transfer
- Plugin interrupt registration and handling
- Command construction and parsing utilities

"""

from penguin import plugins, Plugin
from collections.abc import Iterator
from hyper.consts import igloo_hypercall_constants as iconsts
from hyper.consts import HYPER_OP as hop
from typing import Union, Callable, Optional, Any
import struct
import functools

# Sentinel PID value (u32 max) meaning "the current process"; PortalCmd
# substitutes this when no explicit pid is supplied.
CURRENT_PID_NUM = 0xffffffff

# Kernel-FFI helper plugin; used below for struct size lookups ("region_header").
kffi = plugins.kffi


# Global singleton for a no-operation command, lazily created by PortalCmd.none()
_NONE_CMD = None


class PortalCmd:
    """
    A single command to be sent through the portal mechanism.

    Centralizes construction of portal commands and keeps the portal
    output-handling code simple.

    Attributes:
        op (int): Operation code from HYPER_OP constants.
        addr (int): Address field value.
        size (int): Size field value.
        pid (int): Process ID, or CURRENT_PID_NUM for the current process.
        data (Optional[bytes]): Optional data payload for the command.
    """

    __slots__ = ('op', 'addr', 'size', 'pid', 'data')

    def __init__(
            self,
            op: Union[int, str],
            addr: int = 0,
            size: int = 0,
            pid: Optional[int] = None,
            data: Optional[bytes] = None
    ) -> None:
        """
        Initialize a portal command.

        **Parameters:**

        - `op` (int | str): Operation code from HYPER_OP constants, either as
          a raw integer or as a name (with or without the `HYPER_OP_` prefix).
        - `addr` (int): Address field value; None is treated as 0.
        - `size` (int): Size field value; None falls back to len(data).
        - `pid` (Optional[int]): Process ID, or None for the current process.
        - `data` (Optional[bytes]): Optional data payload for the command.

        **Raises:**

        - `ValueError`: If a string `op` does not name a HYPER_OP constant.
        - `TypeError`: If `op` is neither int nor str.

        **Returns:** None
        """
        if isinstance(op, int):
            resolved = op
        elif isinstance(op, str):
            # Accept both "read" and "HYPER_OP_READ" spellings.
            resolved = getattr(hop, f"HYPER_OP_{op.upper()}", None)
            if resolved is None:
                resolved = getattr(hop, op.upper(), None)
            if resolved is None:
                raise ValueError(f"Invalid operation name: {op}")
        else:
            raise TypeError(f"Operation must be int or str, got {type(op)}")
        self.op = resolved

        self.addr = 0 if addr is None else addr
        if size is not None:
            self.size = size
        else:
            # Derive the size from the payload when not given explicitly.
            self.size = len(data) if data else 0
        self.data = data
        self.pid = CURRENT_PID_NUM if pid is None else pid

    @classmethod
    def none(cls) -> "PortalCmd":
        """
        Return the shared no-operation command.

        **Returns:**

        - `PortalCmd`: A lazily-created singleton with HYPER_OP_NONE.
        """
        global _NONE_CMD
        if _NONE_CMD is None:
            _NONE_CMD = cls(hop.HYPER_OP_NONE, 0, 0, None, None)
        return _NONE_CMD
class Portal(Plugin):
    """
    Portal is a plugin that manages communication and interrupts between
    plugins and the hypervisor.

    It provides methods for registering interrupt handlers, queuing
    interrupts, and reading/writing commands and data to memory-mapped
    regions.

    Attributes:
        outdir (str): Output directory for plugin data.
        endian_format (str): Endianness format character for struct operations.
        portal_interrupt (Optional[int]): Address of the portal interrupt.
        _interrupt_handlers (dict): Mapping of plugin names to their interrupt
            handler functions.
        _pending_interrupts (set): Set of plugin names with pending interrupts.
        regions_size (int): Size of the memory region (set when the guest
            registers it via IGLOO_HYPER_REGISTER_MEM_REGION).
    """

    def __init__(self) -> None:
        """
        Initialize the Portal plugin.

        - Sets up the output directory.
        - Registers memory region and portal interrupt handlers.
        - Initializes internal state for interrupt handling.

        **Returns:** None
        """
        self.outdir = self.get_arg("outdir")
        conf = self.get_arg("conf")
        # in single-cpu mode we can just cache the cpu
        # otherwise call panda.get_cpu
        if conf and "core" in conf:
            smp = int(conf["core"].get("smp", 1))
            if smp == 1:
                self._cached_single_cpu = None

                def get_cpu_fast():
                    # Lazily fetch and cache the lone CPU object.
                    if self._cached_single_cpu is None:
                        self._cached_single_cpu = self.panda.get_cpu()
                    return self._cached_single_cpu
                self._get_cpu = get_cpu_fast
            else:
                self._get_cpu = self.panda.get_cpu
        # NOTE(review): if conf is missing or lacks a "core" section,
        # self._get_cpu is never assigned and later calls would fail —
        # presumably "core" is always present; confirm against the config
        # schema.
        self.endian_format = '<' if self.panda.endianness == 'little' else '>'
        # Region header layout: op (u32), pid (u32), addr (u64), size (u64)
        # — matches the pack() call in _write_memregion_state.
        self.region_header_fmt = f"{self.endian_format}IIQQ"
        self.region_header_size = kffi.sizeof("region_header")
        self.region_header_struct = struct.Struct(self.region_header_fmt)
        self.mask = 0xFFFFFFFFFFFFFFFF if self.panda.bits == 64 else 0xFFFFFFFF
        self.portal_interrupt = None
        # Generic interrupts mechanism
        self._interrupt_handlers = {}  # plugin_name -> handler_function
        self._pending_interrupts = set()  # Set of plugin names with pending work
        self.panda.hypercall(iconsts.IGLOO_HYPER_REGISTER_MEM_REGION)(
            self._register_cpu_memregion)
        self.panda.hypercall(iconsts.IGLOO_HYPER_ENABLE_PORTAL_INTERRUPT)(
            self._register_portal_interrupt)
        # _portal_interrupt is a generator function (it uses `yield from`),
        # so it goes through self.wrap() to drive the portal command
        # protocol across successive hypercalls.
        self.panda.hypercall(iconsts.IGLOO_HYPER_PORTAL_INTERRUPT)(
            self.wrap(self._portal_interrupt))

    def _register_portal_interrupt(self, cpu: Any) -> None:
        """
        Record the guest address of the portal interrupt word for this CPU.

        Invoked via the IGLOO_HYPER_ENABLE_PORTAL_INTERRUPT hypercall.

        **Parameters:**

        - `cpu` (Any): CPU object.

        **Returns:** None
        """
        self.portal_interrupt = self.panda.arch.get_arg(
            cpu, 1, convention="syscall")
        # Second hypercall argument is expected to be zero.
        assert self.panda.arch.get_arg(
            cpu, 2, convention="syscall") == 0

    def _portal_interrupt(self, cpu: Any) -> Iterator:
        """
        Handle portal interrupts and process pending items from registered
        plugins.

        **Parameters:**

        - `cpu` (Any): CPU object.

        **Yields:** None
        """
        # Process one item from each plugin that has pending interrupts
        interrupts = self._pending_interrupts.copy()
        self._pending_interrupts.clear()
        self._portal_clear_interrupt()
        for plugin_name in list(interrupts):
            if plugin_name in self._interrupt_handlers:
                handler_fn = self._interrupt_handlers[plugin_name]
                self.logger.debug(f"Processing interrupt for {plugin_name}")
                # Call handler function without any arguments
                # Plugin is responsible for tracking its own pending work
                yield from handler_fn()
[docs] def register_interrupt_handler( self, plugin_name: str, handler_fn: Callable[[], Iterator]) -> None: """ Register a plugin to handle portal interrupts. Parameters: - plugin_name (str): Name of the plugin. - handler_fn (Callable[[], Iterator]): Function to handle interrupts for this plugin. Must be a generator function that can be used with yield from. Returns: None """ self.logger.debug(f"Registering interrupt handler for {plugin_name}") # The handler function should be a wrapped generator self._interrupt_handlers[plugin_name] = handler_fn if plugin_name in self._pending_interrupts: self.logger.debug( f"Plugin {plugin_name} already had pending interrupts")
[docs] def queue_interrupt(self, plugin_name: str) -> bool: """ Queue an interrupt for a plugin. **Parameters:** - `plugin_name` (str): Name of the plugin. **Returns:** - `bool`: True if queued successfully, False otherwise. """ if plugin_name not in self._interrupt_handlers: self.logger.error( f"No interrupt handler registered for {plugin_name}") return False # Add plugin to pending set if plugin_name not in self._pending_interrupts: # Trigger an interrupt to process the item self._portal_set_interrupt() self._pending_interrupts.add(plugin_name) return True
def _cleanup_all_interrupts(self) -> None: """ Clean up all registered interrupt handlers and pending interrupts. **Returns:** None """ self._interrupt_handlers = {} self._pending_interrupts = set() def _portal_set_interrupt_value(self, value: int) -> None: """ Set the portal interrupt value in memory. **Parameters:** - `value` (int): Value to write to the portal interrupt address. **Returns:** None """ if self.portal_interrupt: buf = struct.pack(f"{self.endian_format}Q", value) try: plugins.mem.write_bytes_panda( self._get_cpu(), self.portal_interrupt, buf) except ValueError as e: # Failures are fine, we get them on the next portal interrupt # as long as we've queued an interrupt self.logger.debug(f"Failed to write portal interrupt: {e}") def _portal_set_interrupt(self) -> None: """ Set the portal interrupt to signal an event. **Returns:** None """ self._portal_set_interrupt_value(1) def _portal_clear_interrupt(self) -> None: """ Clear the portal interrupt. **Returns:** None """ self._portal_set_interrupt_value(0) ''' Our memregion is the first available memregion OR the one that is owned by us This can return none ''' def _read_memregion_state(self, cpum: tuple) -> tuple: """ Read the state of the memory region. **Parameters:** - `cpum` (tuple): Tuple of (cpu, cpu_memregion). **Returns:** - `(op, addr, size)`: Tuple of operation, address, and size. """ cpu, cpu_memregion = cpum try: buf = plugins.mem.read_bytes_panda(cpu, cpu_memregion, self.region_header_size) _, _, addr, size = self.region_header_struct.unpack(buf) except ValueError as e: self.logger.error(f"Failed to read memregion state: {e}") return 0, 0 return addr, size def _read_memregion_data(self, cpum: tuple, size: int) -> Optional[bytes]: """ Read data from the memory region. **Parameters:** - `cpum` (tuple): Tuple of (cpu, cpu_memregion). - `size` (int): Number of bytes to read. **Returns:** - `Optional[bytes]`: Data read from the memory region, or None on error. 
""" cpu, cpu_memregion = cpum if size > self.regions_size: self.logger.error( f"Size {size} exceeds chunk size {self.regions_size}") size = self.regions_size try: mem = plugins.mem.read_bytes_panda( cpu, cpu_memregion + self.region_header_size, size) return mem except ValueError as e: self.logger.error(f"Failed to read memory: {e}") def _read_memregion_state_and_data(self, cpum: tuple) -> tuple: """ Read the state and data from the memory region. **Parameters:** - `cpum` (tuple): Tuple of (cpu, cpu_memregion). **Returns:** - `(addr, size, data)`: Tuple of address, size, and data read from the memory region. """ cpu, cpu_memregion = cpum try: buf = plugins.mem.read_bytes_panda(cpu, cpu_memregion, self.region_header_size + self.regions_size) except ValueError as e: self.logger.error(f"Failed to read memregion state and data: {e}") return 0, 0, None _, _, addr, size = self.region_header_struct.unpack_from(buf, 0) data = buf[self.region_header_size:self.region_header_size + size] return addr, size, data def _write_memregion_state( self, cpum: tuple, op: int, addr: int, size: int, pid: Optional[int] = None, data: Optional[bytes] = None ) -> None: """ Write the state to the memory region. **Parameters:** - `cpum` (tuple): Tuple of (cpu, cpu_memregion). - `op` (int): Operation code. - `addr` (int): Address value. - `size` (int): Size value. - `pid` (Optional[int]): Process ID. 
**Returns:** None """ cpu, cpu_memregion = cpum if size > self.regions_size: self.logger.error( f"Size {size} exceeds chunk size {self.regions_size}") size = self.regions_size if size < 0: self.logger.error(f"Size {size} is negative") size = 0 if addr < 0: addr = addr & self.mask pid = pid or CURRENT_PID_NUM to_write = self.region_header_struct.pack(op, pid, addr, size) if data: if len(data) > self.regions_size: self.logger.error( f"Data length {len(data)} exceeds chunk size {self.regions_size}") data = data[:self.regions_size] to_write += data try: plugins.mem.write_bytes_panda(cpu, cpu_memregion, to_write) except ValueError as e: self.logger.error(f"Failed to write memregion state: {e}") def _handle_input_state(self, cpum: tuple, op: int) -> Optional[tuple]: """ Handle the input state from the memory region and process the operation. **Parameters:** - `cpum` (tuple): Tuple of (cpu, cpu_memregion). **Returns:** - `Optional[tuple]`: Operation and associated data, or None. """ in_op = None if op == hop.HYPER_OP_NONE: pass elif op & hop.HYPER_RESP_NONE == 0: self.logger.error(f"Invalid operation OP in return {op:#x}") elif op < hop.HYPER_RESP_NONE or op > hop.HYPER_RESP_MAX: self.logger.error(f"Invalid operation: {op:#x}") elif op == hop.HYPER_RESP_READ_OK: _, _, data = self._read_memregion_state_and_data(cpum) in_op = (op, data) elif op == hop.HYPER_RESP_READ_FAIL: self.logger.debug("Failed to read memory") elif op == hop.HYPER_RESP_READ_PARTIAL: _, _, data = self._read_memregion_state_and_data(cpum) in_op = (op, data) elif op == hop.HYPER_RESP_WRITE_OK: pass elif op == hop.HYPER_RESP_WRITE_FAIL: self.logger.debug("Failed to write memory") pass elif op == hop.HYPER_RESP_READ_NUM: _, size = self._read_memregion_state(cpum) in_op = (op, size) elif op == hop.HYPER_RESP_NONE: pass else: self.logger.error(f"Unknown operation: {op:#x}") return in_op def _write_portalcmd(self, cpum: tuple, cmd: PortalCmd) -> None: """ Write a PortalCmd to the memory region. 
**Parameters:** - `cpum` (tuple): Tuple of (cpu, cpu_memregion). - `cmd` (PortalCmd): PortalCmd instance to write. **Returns:** None """ self._write_memregion_state(cpum, cmd.op, cmd.addr, cmd.size, cmd.pid, cmd.data)
[docs] def wrap(self, f: Callable) -> Callable: """ Wrap a function to manage portal command iteration and state. **Parameters:** - `f` (Callable): Function to wrap. **Returns:** - `Callable`: Wrapped function. """ iterators = {} syscall_convention = self.panda.arch.call_conventions["syscall"] cpu_memregion_reg = syscall_convention[3] op_reg = syscall_convention[4] get_reg = self.panda.arch.get_reg get_cpu = self._get_cpu handle_input_state = self._handle_input_state write_portalcmd = self._write_portalcmd HYPER_RESP_READ_OK = hop.HYPER_RESP_READ_OK HYPER_RESP_READ_NUM = hop.HYPER_RESP_READ_NUM HYPER_RESP_READ_PARTIAL = hop.HYPER_RESP_READ_PARTIAL HYPER_OP_NONE = hop.HYPER_OP_NONE @functools.wraps(f) def wrapper(*args, **kwargs): if self._pending_interrupts: self.logger.debug("Pending interrupts detected, setting interrupt") self._portal_set_interrupt() cpu = get_cpu() cpu_memregion = get_reg(cpu, cpu_memregion_reg) cpum = cpu, cpu_memregion fn_return = None current_iter = iterators.get(cpu_memregion, None) if current_iter is None: # self.logger.debug("Creating new iterator") # Revert to calling the original function f with self_ fn_ret = f(*args, **kwargs) if not isinstance(fn_ret, Iterator): return fn_ret iterators[cpu_memregion] = fn_ret current_iter = fn_ret in_op = None new_iterator = True else: op = get_reg(cpu, op_reg) in_op = handle_input_state(cpum, op) new_iterator = False try: if not in_op: cmd = next(current_iter) elif in_op[0] == HYPER_RESP_READ_OK: cmd = current_iter.send(in_op[1]) elif in_op[0] == HYPER_RESP_READ_NUM: cmd = current_iter.send(in_op[1]) elif in_op[0] == HYPER_RESP_READ_PARTIAL: cmd = current_iter.send(in_op[1]) else: iterators[cpu_memregion] = None raise Exception(f"Invalid state cmd is {in_op}") except StopIteration as e: del iterators[cpu_memregion] # The function has completed, and we need to return the value fn_return = e.value cmd = PortalCmd.none() except Exception as e: self.logger.error(f"Error in portal iterator: {e}") cmd = e 
if cmd.__class__.__name__ == "PortalCmd": if not (new_iterator and cmd.op == HYPER_OP_NONE): write_portalcmd(cpum, cmd) elif isinstance(cmd, Exception): write_portalcmd(cpum, PortalCmd.none()) raise cmd elif cmd is not None: self.logger.error( f"Invalid return to portal: {type(cmd)} {cmd}") return fn_return return wrapper
def _register_cpu_memregion(self, cpu: Any) -> None: """ Register the memory region size for the current CPU. **Parameters:** - `cpu` (Any): CPU object. **Returns:** None """ self.regions_size = self.panda.arch.get_arg( cpu, 1, convention="syscall")
[docs] def uninit(self) -> None: """ Clean up all interrupt handlers and pending interrupts on plugin unload. **Returns:** None """ self._cleanup_all_interrupts()