Source code for pyplugins.apis.uprobes

"""
.. include:: /docs/uprobes.md
   :parser: myst_parser.sphinx_
"""

from penguin import plugins, Plugin
from typing import Dict, Any, Callable, Optional, Iterator, Union, List
from hyper.consts import igloo_hypercall_constants as iconsts
from hyper.consts import portal_type
from hyper.portal import PortalCmd
from wrappers.ptregs_wrap import get_pt_regs_wrapper
import functools
from collections import defaultdict
from hyper.consts import HYPER_OP as hop
import inspect

__all__ = [
    "Uprobes"
]


[docs] class Uprobes(Plugin): """ Uprobes Plugin ============== Provides an interface for registering and handling user-space probes (uprobes). """ def __init__(self): self.outdir = self.get_arg("outdir") self.projdir = self.get_arg("proj_dir") if self.get_arg_bool("verbose"): self.logger.setLevel("DEBUG") # Maps probe_id to (callback_handle, is_method, read_only, original_func, injection_config) self._hooks: Dict[int, tuple] = {} self._hook_info = {} self._pending_uprobes = [] # Mappings for unregistering self._handle_to_probe_ids: Dict[Callable, List[int]] = defaultdict(list) self._func_to_probe_ids: Dict[Callable, List[int]] = defaultdict(list) self._name_to_probe_ids: Dict[str, List[int]] = defaultdict(list) self.portal = plugins.portal self.portal.register_interrupt_handler( "uprobes", self._uprobe_interrupt_handler) self.panda.hypercall(iconsts.IGLOO_HYP_UPROBE_ENTER)( self._uprobe_enter_handler) self.panda.hypercall(iconsts.IGLOO_HYP_UPROBE_RETURN)( self._uprobe_return_handler) self._uprobe_event = self.plugins.portal.wrap(self._uprobe_event) def _resolve_callback(self, f, is_method, hook_ptr): if is_method and hasattr(f, '__qualname__') and '.' in f.__qualname__: class_name = f.__qualname__.split('.')[0] method_name = f.__qualname__.split('.')[-1] try: instance = getattr(plugins, class_name) if instance and hasattr(instance, method_name): bound_method = getattr(instance, method_name) if hook_ptr in self._hooks: # Update the callback but preserve metadata _, _, read_only, original_func, injection = self._hooks[hook_ptr] self._hooks[hook_ptr] = ( bound_method, False, read_only, original_func, injection) return bound_method except AttributeError: pass return f def _analyze_signature(self, func): """ Analyze signature to determine argument injection based on "sugar" rules. Context Indices: 0: is_enter (bool) 1: tgid_pid (u64) 2: cpu (Any) Rules: - Kwargs matching 'is_enter', 'tgid_pid'are explicitly bound. - If **kwargs exists, all context is passed. - Remaining Positional Args logic: - 1 arg: (is_enter) - 2 args: (is_enter, tgid_pid) """ try: sig = inspect.signature(func) params = list(sig.parameters.values()) except Exception: return [], {} # Skip 'self' if method if params and params[0].name == 'self': params = params[1:] # Skip 'regs' (always first) if params: params = params[1:] pos_indices = [] kw_indices = {} # Context Mapping CTX_ENTER = 0 CTX_TGID_PID = 1 # Temporary list of positionals to assign standard meanings to positional_candidates = [] # Pre-identify parameter names to avoid double-injection param_names = {p.name for p in params} for p in params: if p.kind == p.VAR_KEYWORD: # ONLY inject if the context names aren't already explicitly handled if 'is_enter' not in param_names: kw_indices['is_enter'] = CTX_ENTER if 'tgid_pid' not in param_names: kw_indices['tgid_pid'] = CTX_TGID_PID continue if p.kind == p.VAR_POSITIONAL: continue # Explicit Name Matching mapped_idx = -1 if p.name == 'is_enter': mapped_idx = CTX_ENTER elif p.name == 'tgid_pid': mapped_idx = CTX_TGID_PID if mapped_idx != -1: # Explicit match found if p.kind == p.KEYWORD_ONLY: kw_indices[p.name] = mapped_idx else: # It's positional-capable, but we treated it by name. # We add it to positionals list but pre-filled. # Actually, simplistic approach: explicit names are satisfied. # We just need to handle the "unnamed" positionals. pos_indices.append(mapped_idx) else: # Candidate for Sugar assignment if p.kind in (p.POSITIONAL_ONLY, p.POSITIONAL_OR_KEYWORD): # Store index in pos_indices to patch later positional_candidates.append(len(pos_indices)) pos_indices.append(None) # Placeholder # Apply Sugar Rules to unnamed positionals count = len(positional_candidates) if count == 1: # 1 Arg -> is_enter idx = positional_candidates[0] pos_indices[idx] = CTX_ENTER elif count == 2: # 2 Args -> is_enter, tgid_pid idx0 = positional_candidates[0] idx1 = positional_candidates[1] pos_indices[idx0] = CTX_ENTER pos_indices[idx1] = CTX_TGID_PID else: # Fallback: fill with None or maybe assume is_enter, tgid_pid order? # For now, safe default is None to avoid injection errors pass # Clean up any None entries (shouldn't happen with valid logic above unless >2 positionals) pos_indices = [x if x is not None else -1 for x in pos_indices] return pos_indices, kw_indices def _uprobe_event(self, cpu: Any, is_enter: bool) -> Any: arg = self.panda.arch.get_arg(cpu, 2, convention="syscall") sce = plugins.kffi.read_type_panda(cpu, arg, "portal_event") hook_id = sce.id if hook_id not in self._hooks: return f, is_method, read_only, _, injection = self._hooks[hook_id] ptregs_addr = sce.regs.address pt_regs_raw = plugins.kffi.read_type_panda(cpu, ptregs_addr, "pt_regs") pt_regs = get_pt_regs_wrapper(self.panda, pt_regs_raw) original_bytes = None if not read_only: original_bytes = bytes(pt_regs_raw) fn_to_call = f if not is_method else self._resolve_callback( f, is_method, hook_id) if fn_to_call: tgid_pid = (sce.tgid << 32) | sce.tid # Context Map: 0->is_enter, 1->tgid_pid ctx_values = (is_enter, tgid_pid) pos_ids, kw_ids = injection args = [] for i in pos_ids: if i >= 0: args.append(ctx_values[i]) else: args.append(None) # Should not happen with 1 or 2 args kwargs = {} if kw_ids: kwargs = {k: ctx_values[i] for k, i in kw_ids.items()} fn_ret = fn_to_call(pt_regs, *args, **kwargs) if isinstance(fn_ret, Iterator): fn_ret = yield from fn_ret else: return if not read_only: new_bytes = bytes(pt_regs_raw) if original_bytes != new_bytes: plugins.mem.write_bytes_panda(cpu, ptregs_addr, new_bytes) return fn_ret def _uprobe_enter_handler(self, cpu: Any) -> None: """ Entry handler for uprobes. Parameters ---------- cpu : Any CPU context. Returns ------- None """ self._uprobe_event(cpu, True) def _uprobe_return_handler(self, cpu: Any) -> None: """ Return handler for uprobes. Parameters ---------- cpu : Any CPU context. Returns ------- None """ self._uprobe_event(cpu, False)
[docs] @plugins.live_image.fs_init def on_fs_init(self): self.portal.queue_interrupt("uprobes")
def _uprobe_interrupt_handler(self) -> bool: """ Handle interrupts for pending uprobe registrations and unregistrations. """ if not self._pending_uprobes: return False pending_uprobes = self._pending_uprobes[:] self._pending_uprobes = [] while pending_uprobes: item = pending_uprobes.pop(0) # Handle unregister if isinstance(item, tuple) and item[0] == 'unregister': _, probe_id = item yield PortalCmd(hop.HYPER_OP_UNREGISTER_UPROBE, addr=probe_id) continue # Handle register uprobe_config, handle = item original_func = getattr(handle, '_original_func', handle) probe_id = yield from self._register_uprobe(uprobe_config) if probe_id: is_method = uprobe_config.get("is_method", False) read_only = uprobe_config.get("read_only", False) # [PATCH] Analyze signature injection = self._analyze_signature(original_func) self._hooks[probe_id] = ( handle, is_method, read_only, original_func, injection) self._hook_info[probe_id] = uprobe_config # Populate mappings self._handle_to_probe_ids[handle].append(probe_id) self._func_to_probe_ids[original_func].append(probe_id) func_name = getattr(original_func, "__name__", None) if func_name: self._name_to_probe_ids[func_name].append(probe_id) self.logger.debug( f"Registered uprobe ID {probe_id} for {uprobe_config['path']}:{uprobe_config.get('offset', 0):#x}") else: self.logger.error("Failed to register uprobe") return False def _register_uprobe(self, config: Dict[str, Any]) -> Iterator[Optional[int]]: on_enter = config.get("on_enter", True) on_return = config.get("on_return", False) if on_enter and on_return: probe_type = portal_type.PORTAL_UPROBE_TYPE_BOTH elif on_enter: probe_type = portal_type.PORTAL_UPROBE_TYPE_ENTRY elif on_return: probe_type = portal_type.PORTAL_UPROBE_TYPE_RETURN else: return None init_data = { "path": config["path"].encode('latin-1'), "offset": config["offset"], "type": probe_type, "pid": config.get("pid_filter") if config.get("pid_filter") is not None else 0xffffffff, "comm": config["process_filter"].encode('latin-1') if config.get("process_filter") else b"" } # 1. Allocate and initialize the struct in one call reg = plugins.kffi.new("uprobe_registration", init_data) # 2. Extract the native bytes representation directly reg_bytes = bytes(reg) result = yield PortalCmd(hop.HYPER_OP_REGISTER_UPROBE, reg.offset, len(reg_bytes), None, reg_bytes) if result is None: self.logger.error( f"Failed to register uprobe at {config['path']}:{config['offset']:#x}") return None probe_id = result self.logger.debug( f"Uprobe successfully registered with ID: {probe_id}") return result def _cleanup_probe_maps(self, probe_id: int): if probe_id in self._hooks: handle, _, _, original_func, _ = self._hooks[probe_id] if handle in self._handle_to_probe_ids: if probe_id in self._handle_to_probe_ids[handle]: self._handle_to_probe_ids[handle].remove(probe_id) if original_func in self._func_to_probe_ids: if probe_id in self._func_to_probe_ids[original_func]: self._func_to_probe_ids[original_func].remove(probe_id) name = getattr(original_func, "__name__", None) if name and name in self._name_to_probe_ids: if probe_id in self._name_to_probe_ids[name]: self._name_to_probe_ids[name].remove(probe_id) del self._hooks[probe_id] if probe_id in self._hook_info: del self._hook_info[probe_id]
[docs] def uprobe( self, path: Optional[str] = None, symbol: Union[str, int] = None, address: Optional[int] = None, base_addr: Optional[int] = None, process_filter: Optional[str] = None, on_enter: bool = True, on_return: bool = False, pid_filter: Optional[int] = None, read_only: bool = False, fail_register_ok: bool = False ) -> Callable[[Callable], Callable]: """ Decorator to register a uprobe. Can register by symbol name, file offset, or virtual address. Parameters ---------- path : Optional[str] Path to the executable or library file (can include wildcards). Required if 'address' is used. If None (and using 'symbol'), matches all libraries containing the symbol. symbol : Union[str, int] Symbol name (string) or file offset (integer). address : Optional[int] Virtual address to hook. If provided, overrides 'symbol'. base_addr : Optional[int] If provided with 'address', offset is calculated as address - base_addr. If None, attempts to resolve 'address' using ELF headers. process_filter : Optional[str] Process name to filter events. on_enter : bool Trigger on function entry (default: True). on_return : bool Trigger on function return (default: False). pid_filter : Optional[int] PID to filter events for a specific process. read_only: bool fail_register_ok : bool If True, silently return if symbol not found. Returns ------- Callable[[Callable], Callable] Decorator function that registers the uprobe. """ def _register_decorator(uprobe_configs): def decorator(func): # Wrapper to act as a unique handle @functools.wraps(func) def wrapper(*args, **kwargs): return func(*args, **kwargs) wrapper._original_func = func is_method = hasattr(func, '__self__') or ( hasattr(func, '__qualname__') and '.' in func.__qualname__) for uprobe_config in uprobe_configs: uprobe_config["callback"] = func uprobe_config["is_method"] = is_method uprobe_config["read_only"] = read_only # Store wrapper instead of raw func self._pending_uprobes.append((uprobe_config, wrapper)) if plugins.live_image.fs_generated: self.portal.queue_interrupt("uprobes") return wrapper return decorator def _no_op_decorator(func): return func base_config = { 'process_filter': process_filter, 'on_enter': on_enter, 'on_return': on_return, 'pid_filter': pid_filter, 'read_only': read_only } # --- CASE A: ADDRESS PROVIDED --- if address is not None: if path is None: self.logger.error( "Path must be specified when using 'address'.") return _no_op_decorator offset = plugins.symbols.resolve_addr( path, address, base_addr=base_addr) if offset is None: if fail_register_ok: return _no_op_decorator self.logger.error( f"Could not resolve address {address:#x} in {path} to a file offset. " "Defaulting to offset 0." ) offset = 0 cfg = base_config.copy() cfg.update({ "path": path, "offset": offset, "symbol": f"addr_{address:#x}" }) return _register_decorator([cfg]) # --- CASE B: SEARCH EVERYWHERE (path is None) --- if path is None: if isinstance(symbol, int): self.logger.error("If path is None, symbol must be a string.") return _no_op_decorator if symbol is None: self.logger.error("Must specify symbol or address.") return _no_op_decorator matching_libs = plugins.symbols.find_all(symbol) if not matching_libs: if fail_register_ok: return _no_op_decorator self.logger.warning( f"Symbol '{symbol}' not found in any library.") return _no_op_decorator uprobe_configs = [] for lib_path, offset in matching_libs: cfg = base_config.copy() cfg.update( {"path": lib_path, "offset": offset, "symbol": symbol}) uprobe_configs.append(cfg) return _register_decorator(uprobe_configs) # --- CASE C: SPECIFIC PATH (Symbol or Offset) --- if symbol is None: self.logger.error("Must specify symbol or address.") return _no_op_decorator if isinstance(symbol, int): offset = symbol # Check for potential Virtual Address confusion if path and '*' not in path: try: fs = plugins.static_fs size = fs.get_size(path) if size is not None: if offset >= size: self.logger.warning( f"Offset {offset:#x} exceeds file size ({size:#x}) of {path}. " "Attempting to resolve as virtual address..." ) resolved = plugins.symbols.resolve_addr( path, offset) if resolved is not None: self.logger.info( f"Resolved address {offset:#x} -> file offset {resolved:#x}") offset = resolved else: self.logger.error( f"Could not resolve address {offset:#x} to a valid file offset. Defaulting to offset 0.") offset = 0 except Exception as e: self.logger.debug( f"Could not verify offset vs file size: {e}") symbol_name = f"offset_{offset:#x}" resolved_path = path else: symbol_name = symbol resolved_path, offset = plugins.symbols.lookup(path, symbol) if offset is None: if fail_register_ok: return _no_op_decorator self.logger.warning( f"Symbol '{symbol}' not found in '{path}'. Defaulting to offset 0.") offset = 0 resolved_path = path cfg = base_config.copy() cfg.update( {"path": resolved_path, "offset": offset, "symbol": symbol_name}) return _register_decorator([cfg])
[docs] def uretprobe( self, path: Optional[str] = None, symbol: Union[str, int] = None, address: Optional[int] = None, base_addr: Optional[int] = None, **kwargs ) -> Callable: kwargs['on_enter'] = False kwargs['on_return'] = True return self.uprobe(path, symbol, address=address, base_addr=base_addr, **kwargs)
[docs] def unregister(self, target: Union[Callable, str]): """ Unregister a uprobe by handle, function, or name. Args: target: The handle (returned by decorator), function, or name of the uprobe to unregister. """ probe_ids = [] # 1. Try by handle (wrapper) if target in self._handle_to_probe_ids: probe_ids.extend(self._handle_to_probe_ids[target]) # 2. Try by original function func = getattr(target, '_original_func', target) if func in self._func_to_probe_ids: for pid in self._func_to_probe_ids[func]: if pid not in probe_ids: probe_ids.append(pid) # Check if target is a bound method, if so, check its __func__ if hasattr(target, '__func__') and target.__func__ in self._func_to_probe_ids: for pid in self._func_to_probe_ids[target.__func__]: if pid not in probe_ids: probe_ids.append(pid) # 3. Try by name if isinstance(target, str): if target in self._name_to_probe_ids: for pid in self._name_to_probe_ids[target]: if pid not in probe_ids: probe_ids.append(pid) if not probe_ids: self.logger.warning(f"No uprobes found for target {target}") return for pid in probe_ids: self._cleanup_probe_maps(pid) self._pending_uprobes.append(('unregister', pid)) self.portal.queue_interrupt("uprobes")