Source code for pyplugins.apis.osi

"""
OSI Plugin (osi.py) for Penguin
===============================

This module provides the OSI plugin for the Penguin framework, enabling coroutine-based access to guest OS state via the hypervisor portal. It allows querying of process information, file descriptors, memory mappings, process handles, arguments, and environment variables. The plugin is designed for advanced analysis, automation, and plugin interoperability.

Features
--------

- Retrieve process arguments, environment variables, and process names.
- List open file descriptors and resolve their names.
- Query memory mappings and locate mappings by address.
- Fetch process handles and detailed process information.
- All methods are coroutine-based and return wrapper objects or lists for further inspection.

Example Usage
-------------

.. code-block:: python

    from penguin import plugins

    # Get process name
    procname = yield from plugins.OSI.get_proc_name(pid)

    # Get process arguments
    args = yield from plugins.OSI.get_args(pid)

    # Get environment variables
    env = yield from plugins.OSI.get_env(pid)

    # Get open file descriptors
    fds = yield from plugins.OSI.get_fds(pid)

    # Get memory mappings
    mappings = yield from plugins.OSI.get_mappings(pid)

Purpose
-------

The OSI plugin gives analyses, automation tasks, and other plugins flexible, efficient, and scriptable access to guest OS state through a single coroutine-based introspection interface.
"""

from penguin import Plugin, plugins
from hyper.consts import HYPER_OP as hop
from hyper.portal import PortalCmd
from wrappers.generic import Wrapper
from wrappers.osi_wrap import MappingWrapper, MappingsWrapper
from typing import List, Dict, Any, Optional, Generator

kffi = plugins.kffi
CONST_UNKNOWN_STR = "[???]"


class OSI(Plugin):
    """
    OSI Plugin
    ==========

    Provides coroutine-based methods for querying OS-level information
    (processes, FDs, mappings) from the guest via the hypervisor portal.

    Attributes
    ----------
    logger : object
        Logger for debug and error messages.
    """

    def get_fd_name(self, fd: int,
                    pid: Optional[int] = None) -> Generator[Any, None, Optional[str]]:
        """
        Get the filename for a specific file descriptor.

        Uses get_fds to retrieve information for the single requested
        descriptor rather than listing them all.

        Parameters
        ----------
        fd : int
            File descriptor number.
        pid : int, optional
            Process ID, or None for current process.

        Returns
        -------
        str or None
            The file descriptor name, or None if not found.
        """
        self.logger.debug(f"get_fd_name called: fd={fd}")

        # Request only the single FD we need (more efficient than a full listing)
        fds = yield from self.get_fds(pid=pid, start_fd=fd, count=1)
        if fds and fds[0].fd == fd:
            fd_name = fds[0].name
            self.logger.debug(
                f"File descriptor name read successfully: {fd_name}")
            return fd_name
        return None
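
    # Usage sketch (hypothetical caller): from another plugin coroutine,
    # resolve where a process's stdout points; `pid` is assumed to come from
    # the surrounding analysis.
    #
    #     name = yield from plugins.OSI.get_fd_name(1, pid=pid)
    #     if name is not None:
    #         self.logger.info(f"stdout of {pid} is {name}")
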
    def get_args(self, pid: Optional[int] = None) -> Generator[Any, None, List[str]]:
        """
        Get the argument list for a process.

        Parameters
        ----------
        pid : int, optional
            Process ID, or None for current process.

        Returns
        -------
        List[str]
            List of argument strings (empty if not found).
        """
        self.logger.debug("get_args called")
        proc_args = yield PortalCmd(hop.HYPER_OP_READ_PROCARGS, pid=pid)
        if not proc_args:
            return []
        # Strip trailing NULs, decode, split, and keep only printable strings
        decoded = proc_args.rstrip(b'\0').decode('latin-1', errors='replace')
        args = decoded.split()
        return [arg for arg in args if arg.isprintable()]
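
    # Usage sketch (assumes a plugin coroutine context): reconstruct an
    # approximate command line from the argument list.
    #
    #     args = yield from plugins.OSI.get_args(pid)
    #     cmdline = " ".join(args)
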
    def get_proc_name(self, pid: Optional[int] = None) -> Generator[Any, None, str]:
        """
        Get the process name (first argument) for a process.

        Parameters
        ----------
        pid : int, optional
            Process ID, or None for current process.

        Returns
        -------
        str
            Process name or '[???]' if not found.
        """
        self.logger.debug("get_proc_name called")
        args = yield from self.get_args(pid)
        if args:
            return args[0]
        return CONST_UNKNOWN_STR
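
    # Usage sketch: filter events to a process of interest (the name check is
    # illustrative).
    #
    #     procname = yield from plugins.OSI.get_proc_name(pid)
    #     if procname.endswith("busybox"):
    #         ...
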
    def get_env(self, pid: Optional[int] = None) -> Generator[Any, None, Dict[str, str]]:
        """
        Get the environment variables for a process.

        Parameters
        ----------
        pid : int, optional
            Process ID, or None for current process.

        Returns
        -------
        Dict[str, str]
            Dictionary of environment variables (empty if not found).
        """
        self.logger.debug("get_env called")
        proc_env = yield PortalCmd(hop.HYPER_OP_READ_PROCENV, pid=pid)
        if proc_env:
            # Entries are NUL-separated KEY=VALUE strings. Split on the first
            # '=' only (values may contain '=') and skip malformed entries.
            pairs = [i.decode("latin-1").split("=", 1)
                     for i in proc_env.split(b"\0") if i]
            env = {p[0]: p[1] for p in pairs if len(p) == 2}
            self.logger.debug(f"Proc env read successfully: {env}")
            return env
        return {}
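
    # Usage sketch: look up a single variable from the returned dict.
    #
    #     env = yield from plugins.OSI.get_env(pid)
    #     ld_preload = env.get("LD_PRELOAD")
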
    def get_proc(self, pid: Optional[int] = None) -> Generator[Any, None, Optional[Wrapper]]:
        """
        Get detailed process information for a process.

        Parameters
        ----------
        pid : int, optional
            Process ID, or None for current process.

        Returns
        -------
        Wrapper or None
            Process information wrapper object, or None if not found.
        """
        proc_bytes = yield PortalCmd(hop.HYPER_OP_OSI_PROC, 0, 0, pid)
        if proc_bytes:
            pb = kffi.from_buffer("osi_proc", proc_bytes)
            wrap = Wrapper(pb)
            # The name is appended after the struct; decode up to the first
            # NUL terminator
            wrap.name = proc_bytes[pb.name_offset:].split(b"\0", 1)[0].decode("latin-1")
            return wrap
        return None
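
    # Usage sketch: the returned Wrapper exposes the fields of the kernel's
    # osi_proc struct plus the decoded `name` (other field names depend on
    # the struct definition).
    #
    #     proc = yield from plugins.OSI.get_proc(pid)
    #     if proc is not None:
    #         self.logger.info(f"process name: {proc.name}")
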
    def get_proc_exe(self, pid: Optional[int] = None) -> Generator[Any, None, str]:
        """
        Get the full executable path for a process.

        Parameters
        ----------
        pid : int, optional
            Process ID, or None for current process.

        Returns
        -------
        str
            Executable path, or '[???]' if not found.
        """
        exe_path_bytes = yield PortalCmd(hop.HYPER_OP_OSI_PROC_EXE, 0, 0, pid)
        if exe_path_bytes:
            return exe_path_bytes.decode('latin-1', errors='replace')
        return CONST_UNKNOWN_STR
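
    # Usage sketch: distinguish the binary on disk from whatever argv[0]
    # claims.
    #
    #     exe = yield from plugins.OSI.get_proc_exe(pid)
    #     if exe != CONST_UNKNOWN_STR:
    #         self.logger.info(f"executable path: {exe}")
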
    def get_mappings(self, pid: Optional[int] = None) -> Generator[Any, None, MappingsWrapper]:
        """
        Get memory mappings for a process.

        Parameters
        ----------
        pid : int, optional
            Process ID, or None for current process.

        Returns
        -------
        MappingsWrapper
            Wrapper containing all memory mappings (empty if not found).
        """
        self.logger.debug(f"get_mappings called for pid={pid}")
        all_mappings = []
        current_skip = 0
        total_count = 0

        while True:
            # Send the skip count in the addr field, as per the portal.c
            # implementation
            self.logger.debug(f"Fetching mappings with skip={current_skip}")
            mappings_bytes = yield PortalCmd(
                hop.HYPER_OP_OSI_MAPPINGS, current_skip, 0, pid)
            if not mappings_bytes:
                self.logger.debug("No mapping data received")
                break

            orh_struct = kffi.from_buffer("osi_result_header", mappings_bytes)
            count = orh_struct.result_count
            total_count = orh_struct.total_count

            # Actual size of the data returned from the kernel
            total_size = len(mappings_bytes)
            self.logger.debug(
                f"Received {count} mappings out of {total_count}, "
                f"buffer size: {total_size}")

            # Skip the header (two 64-bit counts)
            offset = 16
            mappings = []
            t_size = kffi.sizeof("osi_module")

            # Clamp count if the buffer cannot hold the reported module array
            expected_end = offset + (count * t_size)
            if expected_end > total_size:
                self.logger.warning(
                    f"Buffer too small for all mappings: need {expected_end}, "
                    f"got {total_size}. Adjusting count.")
                count = (total_size - offset) // t_size
                self.logger.warning(f"Adjusted mapping count to {count}")

            # Unpack each mapping entry
            for i in range(count):
                if offset + t_size > total_size:
                    self.logger.error(
                        f"Buffer too short for mapping {i}: offset {offset}, "
                        f"len {total_size}")
                    break
                try:
                    b = kffi.from_buffer(
                        "osi_module", mappings_bytes,
                        instance_offset_in_buffer=offset)
                    mapping = MappingWrapper(b)

                    # The name is a NUL-terminated string elsewhere in the
                    # buffer; validate name_offset before dereferencing
                    if mapping.name_offset and mapping.name_offset < total_size:
                        try:
                            end = mappings_bytes.find(b'\0', mapping.name_offset)
                            if end != -1 and end < total_size:
                                mapping.name = mappings_bytes[
                                    mapping.name_offset:end].decode(
                                        'latin-1', errors='replace')
                            else:
                                # No terminator found: use a bounded slice
                                max_name_len = total_size - mapping.name_offset
                                if max_name_len > 0:
                                    mapping.name = mappings_bytes[
                                        mapping.name_offset:
                                        mapping.name_offset + max_name_len].decode(
                                            'latin-1', errors='replace')
                                else:
                                    mapping.name = "[unknown]"
                        except Exception as e:
                            self.logger.warning(
                                f"Error decoding name for mapping {i}: {e}")
                            mapping.name = "[invalid name]"
                    else:
                        mapping.name = "[unknown]"

                    mappings.append(mapping)
                    offset += t_size  # Size of struct osi_module
                except Exception as e:
                    self.logger.error(f"Error unpacking mapping {i}: {e}")
                    break

            all_mappings.extend(mappings)

            # Done if the batch was empty or everything has been fetched
            if len(mappings) == 0 or len(all_mappings) >= total_count:
                break

            # Advance the skip count for the next request
            current_skip += len(mappings)

        self.logger.debug(f"Retrieved a total of {len(all_mappings)} mappings")
        return MappingsWrapper(all_mappings)
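
    # Usage sketch, assuming MappingsWrapper is iterable over its
    # MappingWrapper entries (get_mapping_by_addr below relies on their
    # start/end fields):
    #
    #     mappings = yield from plugins.OSI.get_mappings(pid)
    #     for m in mappings:
    #         self.logger.info(f"{m.start:#x}-{m.end:#x} {m.name}")
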
    def get_proc_handles(self) -> Generator[Any, None, List[Wrapper]]:
        """
        Retrieve a list of process handles from the kernel.

        Returns
        -------
        List[Wrapper]
            List of process handle objects with properties: pid, taskd,
            start_time (empty if not found).
        """
        self.logger.debug("get_proc_handles called")

        # Fetch proc handles from the kernel
        proc_handles_bytes = yield PortalCmd(hop.HYPER_OP_OSI_PROC_HANDLES, 0, 0)
        if not proc_handles_bytes:
            self.logger.debug("No process handles data received")
            return []

        # Actual size of the data returned from the kernel
        total_size = len(proc_handles_bytes)

        # Ensure we have enough data for the header
        if total_size < 16:
            self.logger.error(
                f"Buffer too small for header: {total_size} bytes")
            return []

        # Extract header information
        orh_struct = kffi.from_buffer("osi_result_header", proc_handles_bytes)
        count = orh_struct.result_count
        total_count = orh_struct.total_count
        self.logger.debug(
            f"Received {count} process handles out of {total_count}")

        # Sanity-check the reported count
        if count > 10000:
            self.logger.warning(
                f"Unreasonably large handle count: {count}, capping at 1000")
            count = 1000

        # Skip the header
        offset = kffi.sizeof("osi_result_header")
        handles = []
        handle_size = kffi.sizeof("osi_proc_handle")

        # Clamp count to the number of handles the buffer can actually hold
        max_possible_count = (total_size - offset) // handle_size
        if max_possible_count < count:
            self.logger.warning(
                f"Buffer can only fit {max_possible_count} handles out of "
                f"reported {count}")
            count = max_possible_count

        # Unpack each handle
        for i in range(count):
            if offset + handle_size > total_size:
                self.logger.error(
                    f"Buffer too short for handle {i}: offset {offset}, "
                    f"len {total_size}")
                break
            try:
                handle = kffi.from_buffer(
                    "osi_proc_handle", proc_handles_bytes,
                    instance_offset_in_buffer=offset)
                handles.append(Wrapper(handle))
                offset += handle_size
            except Exception as e:
                self.logger.error(f"Error unpacking handle {i}: {e}")
                break

        self.logger.debug(f"Retrieved {len(handles)} process handles")
        return handles
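
    # Usage sketch: enumerate live processes via their handles and resolve
    # names (one extra portal round-trip per process).
    #
    #     handles = yield from plugins.OSI.get_proc_handles()
    #     for h in handles:
    #         name = yield from plugins.OSI.get_proc_name(h.pid)
    #         self.logger.info(f"{h.pid}: {name}")
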
    def get_fds(self, pid: Optional[int] = None, start_fd: int = 0,
                count: Optional[int] = None) -> Generator[Any, None, List[Wrapper]]:
        """
        Retrieve file descriptors for a process.

        Parameters
        ----------
        pid : int, optional
            Process ID, or None for current process.
        start_fd : int, optional
            FD number to start listing from (default: 0).
        count : int, optional
            Maximum number of file descriptors to return (None for all).

        Returns
        -------
        List[Wrapper]
            List of file descriptor objects with fd and name properties
            (empty if not found).
        """
        # Ensure start_fd is an integer
        if start_fd is None:
            start_fd = 0
        self.logger.debug(
            f"get_fds called: start_fd={start_fd}, pid={pid}, count={count}")

        fds = []
        current_fd = start_fd

        while True:
            fds_bytes = yield PortalCmd(hop.HYPER_OP_READ_FDS, current_fd, 0, pid)
            if not fds_bytes:
                self.logger.debug("No file descriptors data received")
                break

            # Actual size of the data returned from the kernel
            total_size = len(fds_bytes)

            # Ensure we have enough data for the header
            if total_size < 16:
                self.logger.error(
                    f"Buffer too small for header: {total_size} bytes")
                return []

            # The header holds two little-endian 64-bit counts
            orh_struct = kffi.from_buffer("osi_result_header", fds_bytes)
            batch_count = orh_struct.result_count
            total_count = orh_struct.total_count
            self.logger.debug(
                f"Received {batch_count} file descriptors out of {total_count}")

            # Break if this batch is empty, to avoid an infinite loop
            if batch_count == 0:
                self.logger.debug(
                    "No file descriptors in this batch, breaking loop")
                break

            # Skip the header
            offset = kffi.sizeof("osi_result_header")
            fd_size = kffi.sizeof("osi_fd_entry")

            # Unpack each FD entry
            for i in range(batch_count):
                if offset + fd_size > total_size:
                    self.logger.error(
                        f"Buffer too short for FD {i}: offset {offset}, "
                        f"len {total_size}")
                    break
                try:
                    fd_entry = kffi.from_buffer(
                        "osi_fd_entry", fds_bytes,
                        instance_offset_in_buffer=offset)
                    fd_wrapper = Wrapper(fd_entry)

                    # Extract the path name via name_offset
                    if fd_entry.name_offset and fd_entry.name_offset < total_size:
                        try:
                            end = fds_bytes.find(b'\0', fd_entry.name_offset)
                            if end != -1 and end < total_size:
                                fd_wrapper.name = fds_bytes[
                                    fd_entry.name_offset:end].decode(
                                        'latin-1', errors='replace')
                            else:
                                # No terminator found: use a bounded slice
                                max_name_len = min(
                                    256, total_size - fd_entry.name_offset)
                                if max_name_len > 0:
                                    fd_wrapper.name = fds_bytes[
                                        fd_entry.name_offset:
                                        fd_entry.name_offset + max_name_len].decode(
                                            'latin-1', errors='replace')
                                else:
                                    fd_wrapper.name = "[unknown]"
                        except Exception as e:
                            self.logger.warning(
                                f"Error decoding name for FD {i}: {e}")
                            fd_wrapper.name = "[invalid name]"
                    else:
                        fd_wrapper.name = "[unknown]"

                    fds.append(fd_wrapper)
                    offset += fd_size
                except Exception as e:
                    self.logger.error(f"Error unpacking FD entry {i}: {e}")
                    break

            self.logger.debug(
                f"Retrieved {batch_count} file descriptors in this batch, "
                f"total now: {len(fds)}")

            # Advance by batch_count (not the accumulated total) so entries
            # are neither skipped nor refetched forever
            current_fd += batch_count

            # Stop once the kernel has reported everything, or we have enough
            if len(fds) >= total_count:
                break
            if count is not None and len(fds) >= count:
                break

        # Trim any excess beyond the requested count
        if count is not None and len(fds) > count:
            fds = fds[:count]
        return fds
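
    # Usage sketch: list the first few descriptors of a process.
    #
    #     fds = yield from plugins.OSI.get_fds(pid, start_fd=0, count=8)
    #     for f in fds:
    #         self.logger.info(f"fd {f.fd}: {f.name}")
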
    def get_mapping_by_addr(self, addr: int) -> Generator[Any, None, Optional[MappingWrapper]]:
        """
        Get the memory mapping containing a specific address.

        Parameters
        ----------
        addr : int
            Address to look up.

        Returns
        -------
        MappingWrapper or None
            Mapping containing the address, or None if not found.
        """
        self.logger.debug(f"get_mapping_by_addr called: addr={addr:#x}")
        maps = yield from self.get_mappings()
        if maps:
            mapping = maps.get_mapping_by_addr(addr)
            if mapping:
                self.logger.debug(
                    f"Mapping found: {mapping.name} at "
                    f"{mapping.start:#x} - {mapping.end:#x}")
                return mapping
        self.logger.debug(f"No mapping found for addr={addr:#x}")
        return None
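
    # Usage sketch: attribute a program-counter value to the module that
    # contains it (`pc` is illustrative).
    #
    #     mapping = yield from plugins.OSI.get_mapping_by_addr(pc)
    #     if mapping is not None:
    #         self.logger.info(f"{pc:#x} falls in {mapping.name}")
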