# Source code for pyplugins.apis.syscalls

"""
.. include:: /docs/syscalls.md
   :parser: myst_parser.sphinx_
"""

from penguin import plugins, Plugin
import json
from typing import Dict, List, Any, Callable, Optional, Iterator, Union
from hyper.consts import value_filter_type as vft
from hyper.consts import igloo_hypercall_constants as iconsts
from hyper.consts import igloo_base_hypercalls as bconsts
from hyper.portal import PortalCmd
from wrappers.ptregs_wrap import get_pt_regs_wrapper
import functools
from collections import defaultdict
from hyper.consts import HYPER_OP as hop

# allows us to not include SyscallPrototype which is internal
__all__ = [
    "ValueFilter",
    "Syscalls"
]


class SyscallEvent:
    """
    Thin proxy around a dwarffi ``syscall_event`` instance.

    The wrapper decodes the syscall name once at construction time and
    forwards every other attribute read/write to the wrapped instance, so
    callers can use ``event.<field>`` exactly as they would on the raw object.

    Attributes
    ----------
    name : str
        Syscall name decoded from the ``syscall_name`` field (bytes up to the
        first NUL, decoded as UTF-8 with replacement of invalid sequences).
    """
    __slots__ = ('_sce', 'name')

    def __init__(self, sce):
        self._sce = sce

        # bytes() on the array field yields the raw buffer; the C string ends
        # at the first NUL byte, anything after it is padding/garbage.
        raw_bytes = bytes(sce.syscall_name)
        self.name = raw_bytes.split(b'\x00', 1)[0].decode('utf-8', errors='replace')

    def __getattr__(self, attr):
        # __getattr__ only runs after normal lookup failed. If that happens for
        # one of the wrapper's own slots, the slot is simply unset (e.g. the
        # instance was created through __new__ by copy/pickle). Forwarding to
        # self._sce in that state would re-enter __getattr__ forever, so fail
        # with a regular AttributeError instead.
        if attr in SyscallEvent.__slots__:
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute {attr!r}")
        # Transparently pass through any other field access to the wrapped
        # dwarffi instance.
        return getattr(self._sce, attr)

    def __bytes__(self) -> bytes:
        # Raw bytes of the wrapped instance, used for writing back to memory.
        return bytes(self._sce)

    def __setattr__(self, attr, value):
        # If the attribute belongs to the wrapper, set it locally
        if attr in self.__slots__:
            object.__setattr__(self, attr, value)
        else:
            # Otherwise, forward the write to the underlying dwarffi instance
            setattr(self._sce, attr, value)

    @property
    def size(self) -> int:
        # Expose the size from the underlying dwarffi BoundTypeInstance
        return self._sce._instance_type_def.size


class ValueFilter:
    """
    ValueFilter
    ===========
    Represents a complex value filter for syscall arguments or return values.

    Instances are normally built through the named constructors
    (``exact``, ``range``, ``bitmask_set``, ``string_contains``, ...), each of
    which fills in only the fields relevant to its filter type.

    Attributes
    ----------
    filter_type : int
        The type of filter.
    value : int
        The value for the filter.
    min_value : int
        The minimum value for range filters.
    max_value : int
        The maximum value for range filters.
    bitmask : int
        The bitmask for bitmask filters.
    pattern : bytes or None
        NUL-terminated pattern for string filters, ``None`` otherwise.
    """

    __slots__ = ('filter_type', 'value', 'min_value',
                 'max_value', 'bitmask', 'pattern')

    def __init__(self,
                 filter_type: int = vft.SYSCALLS_HC_FILTER_EXACT,
                 value: int = 0,
                 min_value: int = 0,
                 max_value: int = 0,
                 bitmask: int = 0,
                 pattern: Optional[bytes] = None) -> None:
        """
        Initialize a ValueFilter.

        Parameters
        ----------
        filter_type : int
            The type of filter.
        value : int
            The value for the filter.
        min_value : int
            The minimum value for range filters.
        max_value : int
            The maximum value for range filters.
        bitmask : int
            The bitmask for bitmask filters.
        pattern : bytes, optional
            Already-encoded pattern for string filters.
        """
        self.filter_type = filter_type
        self.value = value
        self.min_value = min_value
        self.max_value = max_value
        self.bitmask = bitmask
        self.pattern = pattern

    @classmethod
    def _encode_pattern(cls, pattern: str) -> bytes:
        """
        Encode a string pattern as UTF-8 bytes with a trailing NUL.

        ``None`` is passed through unchanged.
        """
        # The trailing NUL keeps the buffer usable as a C string.
        return None if pattern is None else pattern.encode('utf-8') + b'\x00'

    @classmethod
    def _from_value(cls, kind: int, value: int) -> "ValueFilter":
        """Build a filter of type ``kind`` that compares against ``value``."""
        return cls(filter_type=kind, value=value)

    @classmethod
    def _from_bitmask(cls, kind: int, bitmask: int) -> "ValueFilter":
        """Build a filter of type ``kind`` that tests ``bitmask``."""
        return cls(filter_type=kind, bitmask=bitmask)

    @classmethod
    def _from_pattern(cls, kind: int, pattern: str) -> "ValueFilter":
        """Build a string filter of type ``kind`` from a text ``pattern``."""
        return cls(filter_type=kind, pattern=cls._encode_pattern(pattern))

    @classmethod
    def exact(cls, value: int) -> "ValueFilter":
        """
        Create an exact match filter.

        Parameters
        ----------
        value : int
            The value to match.

        Returns
        -------
        ValueFilter
            An exact match filter.
        """
        return cls._from_value(vft.SYSCALLS_HC_FILTER_EXACT, value)

    @classmethod
    def greater(cls, value: int) -> "ValueFilter":
        """
        Create a greater than filter.

        Parameters
        ----------
        value : int
            The value to compare.

        Returns
        -------
        ValueFilter
            A greater than filter.
        """
        return cls._from_value(vft.SYSCALLS_HC_FILTER_GREATER, value)

    @classmethod
    def greater_equal(cls, value: int) -> "ValueFilter":
        """
        Create a greater than or equal filter.

        Parameters
        ----------
        value : int
            The value to compare.

        Returns
        -------
        ValueFilter
            A greater than or equal filter.
        """
        return cls._from_value(vft.SYSCALLS_HC_FILTER_GREATER_EQUAL, value)

    @classmethod
    def less(cls, value: int) -> "ValueFilter":
        """
        Create a less than filter.

        Parameters
        ----------
        value : int
            The value to compare.

        Returns
        -------
        ValueFilter
            A less than filter.
        """
        return cls._from_value(vft.SYSCALLS_HC_FILTER_LESS, value)

    @classmethod
    def less_equal(cls, value: int) -> "ValueFilter":
        """
        Create a less than or equal filter.

        Parameters
        ----------
        value : int
            The value to compare.

        Returns
        -------
        ValueFilter
            A less than or equal filter.
        """
        return cls._from_value(vft.SYSCALLS_HC_FILTER_LESS_EQUAL, value)

    @classmethod
    def not_equal(cls, value: int) -> "ValueFilter":
        """
        Create a not equal filter.

        Parameters
        ----------
        value : int
            The value to compare.

        Returns
        -------
        ValueFilter
            A not equal filter.
        """
        return cls._from_value(vft.SYSCALLS_HC_FILTER_NOT_EQUAL, value)

    @classmethod
    def range(cls, min_value: int, max_value: int) -> "ValueFilter":
        """
        Create a range filter.

        Parameters
        ----------
        min_value : int
            The minimum value.
        max_value : int
            The maximum value.

        Returns
        -------
        ValueFilter
            A range filter.
        """
        return cls(
            filter_type=vft.SYSCALLS_HC_FILTER_RANGE,
            min_value=min_value,
            max_value=max_value,
        )

    @classmethod
    def success(cls) -> "ValueFilter":
        """
        Create a success filter (>= 0).

        Returns
        -------
        ValueFilter
            A success filter.
        """
        return cls(filter_type=vft.SYSCALLS_HC_FILTER_SUCCESS)

    @classmethod
    def error(cls) -> "ValueFilter":
        """
        Create an error filter (< 0).

        Returns
        -------
        ValueFilter
            An error filter.
        """
        return cls(filter_type=vft.SYSCALLS_HC_FILTER_ERROR)

    @classmethod
    def bitmask_set(cls, bitmask: int) -> "ValueFilter":
        """
        Create a bitmask set filter.

        Parameters
        ----------
        bitmask : int
            The bitmask to set.

        Returns
        -------
        ValueFilter
            A bitmask set filter.
        """
        return cls._from_bitmask(vft.SYSCALLS_HC_FILTER_BITMASK_SET, bitmask)

    @classmethod
    def bitmask_clear(cls, bitmask: int) -> "ValueFilter":
        """
        Create a bitmask clear filter.

        Parameters
        ----------
        bitmask : int
            The bitmask to clear.

        Returns
        -------
        ValueFilter
            A bitmask clear filter.
        """
        return cls._from_bitmask(vft.SYSCALLS_HC_FILTER_BITMASK_CLEAR, bitmask)

    @classmethod
    def string_exact(cls, pattern: str) -> "ValueFilter":
        """
        Create an exact string match filter.
        """
        return cls._from_pattern(vft.SYSCALLS_HC_FILTER_STR_EXACT, pattern)

    @classmethod
    def string_contains(cls, pattern: str) -> "ValueFilter":
        """
        Create a string contains filter.
        """
        return cls._from_pattern(vft.SYSCALLS_HC_FILTER_STR_CONTAINS, pattern)

    @classmethod
    def string_startswith(cls, pattern: str) -> "ValueFilter":
        """
        Create a string starts-with filter.
        """
        return cls._from_pattern(vft.SYSCALLS_HC_FILTER_STR_STARTSWITH, pattern)

    @classmethod
    def string_endswith(cls, pattern: str) -> "ValueFilter":
        """
        Create a string ends-with filter.
        """
        return cls._from_pattern(vft.SYSCALLS_HC_FILTER_STR_ENDSWITH, pattern)


class SyscallPrototype:
    """
    SyscallPrototype
    ================
    Represents syscall metadata, including argument types and names.

    This is an internal class used by the Syscalls plugin to store syscall
    information.

    Attributes
    ----------
    name : str
        The syscall name.
    types : list
        List of argument types.
    names : list
        List of argument names.
    args : list
        List of argument type/name pairs, exactly as passed in (may be None).
    unknown_args : bool
        Whether the arguments are unknown.
    nargs : int
        Number of arguments.
    """

    __slots__ = ('name', 'types', 'names', 'args', 'unknown_args', 'nargs')

    def __init__(self, name: str, args: Optional[List[Any]] = None,
                 unknown_args: bool = False) -> None:
        """
        Initialize a SyscallPrototype.

        Parameters
        ----------
        name : str
            The syscall name.
        args : list, optional
            List of argument type/name pairs.
        unknown_args : bool
            Whether the arguments are unknown.
        """
        # The name is the primary identifier.
        self.name = name
        self.args = args
        self.unknown_args = unknown_args

        # Split the (type, name) pairs into two parallel lists.
        arg_types: List[Any] = []
        arg_names: List[Any] = []
        for arg_type, arg_name in (args or ()):
            arg_types.append(arg_type)
            arg_names.append(arg_name)
        self.types = arg_types
        self.names = arg_names
        self.nargs = len(arg_types)

    def __repr__(self) -> str:
        """
        Return a string representation of the SyscallPrototype.

        Returns
        -------
        str
            String representation.
        """
        return f"<SyscallPrototype name='{self.name}' nargs={self.nargs}>"


class Syscalls(Plugin):
    """
    Syscalls Plugin
    ===============
    Provides an interface to monitor and intercept system calls.

    Uses the portal's interrupt mechanism to handle registration and event
    delivery.
    """

    def __init__(self) -> None:
        """
        Initialize the Syscalls plugin.

        Registers with portal and sets up syscall metadata.
        """
        self.outdir = self.get_arg("outdir")

        # Maps hook pointers to 5-tuples:
        # (on_all, handle, is_method, read_only, original_func)
        self._hooks: Dict[int, tuple] = {}
        # hook pointer -> hook configuration dict used to register it
        self._hook_info = {}

        # Reverse mappings used by unregister()
        # specific handle (decorator result) -> hook_ptr
        self._handle_to_hook_ptr: Dict[Callable, int] = {}
        # original func -> [hook_ptrs]
        self._func_to_hook_ptrs: Dict[Callable, List[int]] = defaultdict(list)
        # function name -> [hook_ptrs]
        self._name_to_hook_ptrs: Dict[str, List[int]] = defaultdict(list)

        # Syscall type information, keyed by cleaned syscall name
        self._syscall_info_table: Dict[str, SyscallPrototype] = {}

        # Raw name cache (name as reported by the event -> prototype);
        # avoids re-cleaning strings for global hooks
        self._raw_name_proto_cache: Dict[str, SyscallPrototype] = {}

        # Hook pointer cache (hook_ptr -> prototype); fast path for
        # hooks bound to a single syscall
        self._hook_proto_cache: Dict[int, SyscallPrototype] = {}

        # Register with portal's interrupt handler system
        plugins.portal.register_interrupt_handler(
            "syscalls", self._syscall_interrupt_handler)

        # Register handlers for syscall setup and events
        self.panda.hypercall(bconsts.IGLOO_HYP_SETUP_SYSCALL)(
            self._setup_syscall_handler)

        # Register syscall enter/return hypercalls
        self.panda.hypercall(iconsts.IGLOO_HYP_SYSCALL_ENTER)(
            self._syscall_enter_event)
        self.panda.hypercall(iconsts.IGLOO_HYP_SYSCALL_RETURN)(
            self._syscall_return_event)

        # Queue of pending hook registrations / unregistrations, drained by
        # _syscall_interrupt_handler
        self._pending_hooks = []

        self._syscall_event = plugins.portal.wrap(self._syscall_event)

    def _setup_syscall_handler(self, cpu: int) -> None:
        """
        Handler for setting up syscall definitions.

        Reads a JSON description of one syscall from guest memory and stores
        the resulting prototype under its cleaned name.

        Parameters
        ----------
        cpu : int
            CPU id.

        Returns
        -------
        None
        """
        reg1 = self.panda.arch.get_arg(cpu, 1, convention="syscall")

        # Read the JSON string at reg1
        json_str = plugins.mem.read_str_panda(cpu, reg1)
        syscall_data = json.loads(json_str)

        # Clean the syscall name
        name = syscall_data["name"]
        clean_name = self._clean_syscall_name(name)

        # "args" is either the literal string "unknown" or a list of pairs
        unknown_args = syscall_data.get("args", "") == "unknown"
        if unknown_args:
            args = []
        else:
            args = syscall_data.get("args") or []

        sysinfo = SyscallPrototype(
            name=f'sys_{clean_name}',
            args=args,
            unknown_args=unknown_args
        )

        # Never let an "unknown args" definition replace one already stored
        if unknown_args and clean_name in self._syscall_info_table:
            self.logger.debug(
                f"Syscall {name} is a compat syscall, skipping it for a better match")
        else:
            self._syscall_info_table[clean_name] = sysinfo
            # Both prototype caches are derived from the table, so a changed
            # definition must invalidate them; otherwise hooks keep serving
            # the superseded (possibly generic) prototype.
            self._raw_name_proto_cache.clear()
            self._hook_proto_cache.clear()
            self.logger.debug(
                f"Registered syscall {name} (cleaned: {clean_name})")

    def _clean_syscall_name(self, name: str) -> str:
        """
        Clean a syscall name.

        Strips a leading ``compat_``, then a leading ``sys_``, then any
        remaining leading underscores.

        Parameters
        ----------
        name : str
            The syscall name.

        Returns
        -------
        str
            Cleaned syscall name.
        """
        if name.startswith("compat_"):
            name = name[7:]
        # First remove sys_ prefix if present
        if name.startswith("sys_"):
            name = name[4:]
        # Then remove any remaining leading underscores
        return name.lstrip("_")

    def _syscall_interrupt_handler(self) -> Iterator[Any]:
        """
        Handle interrupts from the portal for syscall hook registration.

        This is a generator: it yields portal commands for every queued
        registration / unregistration and records the resulting hook
        pointers in the local lookup maps.

        Returns
        -------
        Iterator
            Generator of portal commands. Its return value is ``False`` when
            nothing was pending and ``None`` otherwise.
        """
        if not self._pending_hooks:
            self.logger.debug("No pending syscall hooks to register")
            return False

        # Snapshot and reset the queue so requests made while we are yielding
        # land in a fresh list and are handled by the next interrupt.
        pending_hooks = self._pending_hooks[:]
        self._pending_hooks = []

        for item in pending_hooks:
            # Unregister requests are ('unregister', hook_ptr) tuples
            if isinstance(item, tuple) and item[0] == 'unregister':
                _, hook_ptr = item
                yield PortalCmd(hop.HYPER_OP_UNREGISTER_SYSCALL_HOOK, addr=hook_ptr)
                continue

            # handle is the wrapper created in the decorator
            hook_config, handle = item
            original_func = getattr(handle, '_original_func', handle)

            # Register the syscall hook
            hook_ptr = yield from self._register_syscall_hook(hook_config)

            # Drop any prototype cached for an earlier hook at this pointer
            # value, in case the pointer is being reused.
            self._hook_proto_cache.pop(hook_ptr, None)

            on_all = hook_config.get("on_all", False)
            is_method = hook_config.get("is_method", False)
            read_only = hook_config.get("read_only", False)

            # Store 5-tuple (on_all, handle, is_method, read_only, original_func)
            self._hooks[hook_ptr] = (
                on_all, handle, is_method, read_only, original_func)

            # 1. Map specific handle to hook_ptr
            self._handle_to_hook_ptr[handle] = hook_ptr
            # 2. Map original function to list of hook_ptrs
            self._func_to_hook_ptrs[original_func].append(hook_ptr)
            # 3. Map name to list of hook_ptrs
            func_name = getattr(original_func, "__name__", None)
            if func_name:
                self._name_to_hook_ptrs[func_name].append(hook_ptr)

    def _get_proto(self, cpu: int, sce: Any, on_all: bool) -> SyscallPrototype:
        """
        Get the syscall prototype for a given event.

        Lookup order: hook-pointer cache (specific hooks only), raw-name
        cache, the definition table by cleaned name, the table again with the
        text up to the first underscore removed, and finally a freshly
        created generic six-argument prototype.

        Parameters
        ----------
        cpu : int
            CPU id.
        sce : Any
            Syscall event object.
        on_all : bool
            Whether this is a global hook (determines caching strategy).

        Returns
        -------
        SyscallPrototype
            The prototype for the syscall.
        """
        # Tier 1 cache - hook pointer (specific hooks only)
        if not on_all:
            hook_ptr = sce.hook.address
            if hook_ptr in self._hook_proto_cache:
                return self._hook_proto_cache[hook_ptr]

        # Get syscall name from the event
        name = sce.name

        # Tier 2 cache - raw name (avoids repeated string cleaning)
        if name in self._raw_name_proto_cache:
            proto = self._raw_name_proto_cache[name]
            # Found via name inside a specific hook: upgrade to Tier 1
            if not on_all:
                self._hook_proto_cache[sce.hook.address] = proto
            return proto

        cleaned_name = self._clean_syscall_name(name)
        proto = self._syscall_info_table.get(cleaned_name)

        # If not found, try removing everything up to and including the
        # first underscore (treated as an architecture prefix)
        if not proto:
            if '_' in cleaned_name:
                arch_stripped_name = cleaned_name.split('_', 1)[1]
                proto = self._syscall_info_table.get(arch_stripped_name)
                if proto:
                    self.logger.debug(
                        f"Found syscall after removing architecture prefix: {cleaned_name} → {arch_stripped_name}")

        # If still not found, create a new generic prototype with unknown args
        if not proto:
            generic_args = [("int", f"unknown{i+1}") for i in range(6)]
            proto = SyscallPrototype(name=f'sys_{cleaned_name}', args=generic_args)
            self._syscall_info_table[cleaned_name] = proto
            self.logger.debug(
                f"Syscall {name} not registered {cleaned_name=}, created generic prototype with {len(generic_args)} args")

        # Update caches
        self._raw_name_proto_cache[name] = proto
        if not on_all:
            self._hook_proto_cache[sce.hook.address] = proto
        return proto

    def _resolve_syscall_callback(self, f, is_method, hook_ptr=None):
        """
        Resolve and bind the function or method for a syscall event.

        For a method, the first component of ``__qualname__`` is looked up as
        an attribute of ``plugins`` and the last component as an attribute of
        that instance. On success ``self._hooks[hook_ptr]`` is rewritten to
        hold the bound method with ``is_method`` cleared, so later events
        take the fast path.

        Parameters
        ----------
        f : callable
            Function or method.
        is_method : bool
            Whether it's a method.
        hook_ptr : optional
            Hook pointer.

        Returns
        -------
        callable or None
            The resolved callable, or None if resolution failed.
        """
        if is_method and hasattr(f, '__qualname__') and '.' in f.__qualname__:
            class_name = f.__qualname__.split('.')[0]
            method_name = f.__qualname__.split('.')[-1]
            try:
                instance = getattr(plugins, class_name)
                if instance and hasattr(instance, method_name):
                    bound_method = getattr(instance, method_name)
                    # Update cache for fast-path next time
                    if hook_ptr is not None and hook_ptr in self._hooks:
                        on_all, _, _, read_only, original_func = self._hooks[hook_ptr]
                        self._hooks[hook_ptr] = (
                            on_all, bound_method, False, read_only, original_func)
                    return bound_method
                else:
                    self.logger.error(
                        f"Could not find method {method_name} on instance")
                    return None
            except AttributeError:
                self.logger.error(
                    f"Could not find instance for class {class_name}")
                return None
        return f

    def _syscall_enter_event(self, cpu: int) -> None:
        """
        Handler for syscall entry events from the hypervisor.

        Parameters
        ----------
        cpu : int
            CPU id.

        Returns
        -------
        None
        """
        self._syscall_event(cpu, is_enter=True)

    def _syscall_return_event(self, cpu: int) -> None:
        """
        Handler for syscall return events from the hypervisor.

        Parameters
        ----------
        cpu : int
            CPU id.

        Returns
        -------
        None
        """
        self._syscall_event(cpu, is_enter=False)

    def _syscall_event(self, cpu: int, is_enter: Optional[bool] = None) -> Any:
        """
        Handle a syscall event, dispatching to the registered callback.

        This is a generator function; ``__init__`` replaces the attribute
        with ``plugins.portal.wrap(...)`` of it.

        Parameters
        ----------
        cpu : int
            CPU id.
        is_enter : bool, optional
            True if entry event, False if return event.

        Returns
        -------
        Any
            The result of the callback.
        """
        arg = self.panda.arch.get_arg(cpu, 1, convention="syscall")

        # Get the event object (no bytes serialization yet)
        sce = SyscallEvent(plugins.kffi.read_type_panda(cpu, arg, "syscall_event"))
        hook_ptr = sce.hook.address
        if hook_ptr not in self._hooks:
            return

        # _hooks stores (on_all, handle, is_method, read_only, original_func)
        entry = self._hooks[hook_ptr]
        on_all, f, is_method, read_only = entry[0], entry[1], entry[2], entry[3]

        # Snapshot the event so we can detect modifications by the callback
        original = None
        if not read_only:
            original = bytes(sce)

        proto = self._get_proto(cpu, sce, on_all)

        pt_regs_raw = yield from plugins.kffi.read_type(sce.regs.address, "pt_regs")
        pt_regs = get_pt_regs_wrapper(self.panda, pt_regs_raw)

        # Global hooks receive only (regs, proto, event); specific hooks
        # additionally receive the syscall arguments positionally.
        if on_all or proto is None or sce.argc == 0:
            args = (pt_regs, proto, sce)
        else:
            sysargs = [sce.args[i] for i in range(sce.argc)]
            args = (pt_regs, proto, sce, *sysargs)

        if not is_method:
            # Fast path: already bound or standard function
            result = f(*args)
        else:
            # Slow path: resolve, bind, cache
            fn_to_call = self._resolve_syscall_callback(f, is_method, hook_ptr)
            if fn_to_call:
                result = fn_to_call(*args)
            else:
                return

        # Callbacks may themselves be generators issuing portal commands
        if isinstance(result, Iterator):
            result = yield from result

        # Write back only if the callback changed the event
        if not read_only:
            new = bytes(sce)
            if original != new:
                yield from plugins.mem.write_bytes(arg, new)
        return result

    def _register_syscall_hook(
            self, hook_config: Dict[str, Any]) -> Iterator[int]:
        """
        Register a syscall hook with the kernel using the portal.

        Builds a ``struct syscall_hook`` from ``hook_config``, sends it via a
        ``HYPER_OP_REGISTER_SYSCALL_HOOK`` portal command and records the
        configuration under the returned hook pointer.

        Parameters
        ----------
        hook_config : dict
            Hook configuration dictionary.

        Returns
        -------
        Iterator[int]
            Generator whose return value is the hook pointer.
        """
        def _parse_filter(f):
            """Translate a ValueFilter object or primitive into a dwarffi init dict."""
            if f is None:
                return {
                    "enabled": False,
                    "type": vft.SYSCALLS_HC_FILTER_EXACT,
                    "value": 0,
                    "min_value": 0,
                    "max_value": 0,
                    "bitmask": 0
                }
            # Matched by class name rather than isinstance()
            if getattr(f, "__class__", None) and f.__class__.__name__ == 'ValueFilter':
                res = {
                    "enabled": True,
                    "type": f.filter_type,
                    "value": f.value,
                    "min_value": f.min_value,
                    "max_value": f.max_value,
                    "bitmask": f.bitmask
                }
                if getattr(f, 'pattern', None):
                    plen = len(f.pattern)
                    # Exclude the null terminator from the pattern length consistently
                    if f.pattern.endswith(b'\x00'):
                        plen -= 1
                    res["pattern_len"] = plen
                    res["pattern"] = yield from plugins.mem.copy_buf_guest(f.pattern)
                return res
            elif isinstance(f, (int, float)):
                return {
                    "enabled": True,
                    "type": vft.SYSCALLS_HC_FILTER_EXACT,
                    "value": int(f),
                    "min_value": 0,
                    "max_value": 0,
                    "bitmask": 0
                }
            elif isinstance(f, str):
                # NOTE(review): a bare string is registered as a STARTSWITH
                # filter, not an exact match - confirm this is intended.
                encoded = f.encode('utf-8')
                plen = len(encoded)
                encoded += b'\x00'
                return {
                    "enabled": True,
                    "type": vft.SYSCALLS_HC_FILTER_STR_STARTSWITH,
                    "value": 0,
                    "min_value": 0,
                    "max_value": 0,
                    "bitmask": 0,
                    "pattern_len": plen,
                    "pattern": (yield from plugins.mem.copy_buf_guest(encoded))
                }
            # Unsupported filter type: leave the slot disabled
            return {"enabled": False}

        raw_arg_filters = hook_config.get("arg_filters", []) or []
        arg_filters_init = []
        for i in range(6):  # IGLOO_SYSCALL_MAXARGS
            f_val = raw_arg_filters[i] if i < len(raw_arg_filters) else None
            arg_filters_init.append((yield from _parse_filter(f_val)))

        procname = hook_config.get("procname", None)
        pid_filter = hook_config.get("pid_filter", None)

        # NOTE(review): hook_config["on_unknown"] is not copied into
        # init_data - confirm whether the struct has a matching field.
        init_data = {
            "enabled": hook_config.get("enabled", True),
            "on_enter": hook_config.get("on_enter", False),
            "on_return": hook_config.get("on_return", False),
            "on_all": hook_config.get("on_all", False),
            "name": hook_config.get("name", ""),
            "comm_filter_enabled": procname is not None,
            "comm_filter": procname or "",
            "pid_filter_enabled": pid_filter is not None,
            "filter_pid": pid_filter if pid_filter is not None else 0,
            # Nested structs and arrays are passed as lists of dicts
            "arg_filters": arg_filters_init,
            "retval_filter": (yield from _parse_filter(hook_config.get("retval_filter", None)))
        }

        # Allocate and initialize the struct in one call
        sch = plugins.kffi.new("struct syscall_hook", init_data)
        as_bytes = bytes(sch)

        # Send to kernel via portal
        result = yield PortalCmd(hop.HYPER_OP_REGISTER_SYSCALL_HOOK,
                                 size=len(as_bytes), data=as_bytes)
        self._hook_info[result] = hook_config
        return result

    def syscall(
        self,
        name_or_pattern: Optional[str] = None,
        on_enter: Optional[bool] = None,
        on_return: Optional[bool] = None,
        comm_filter: Optional[str] = None,
        arg_filters: Optional[List[Any]] = None,
        pid_filter: Optional[int] = None,
        retval_filter: Optional[Union[ValueFilter, str, int]] = None,
        enabled: bool = True,
        read_only: bool = False
    ) -> Callable:
        """
        Decorator for registering syscall callbacks.

        ``name_or_pattern`` is either a plain syscall name, the special names
        ``"all"`` / ``"unknown"``, or an hsyscall-style pattern:
        ``on_sys_NAME_enter``, ``on_sys_NAME_return``,
        ``on_all_sys_enter|return`` or ``on_unknown_sys_enter|return``.
        A recognized pattern overrides ``on_enter`` / ``on_return``. Any
        string that matches none of the patterns is used as the syscall name.

        Parameters
        ----------
        name_or_pattern : str, optional
            Syscall name or pattern.
        on_enter : bool, optional
            Register for entry events.
        on_return : bool, optional
            Register for return events.
        comm_filter : str, optional
            Process name filter.
        arg_filters : list, optional
            Argument filters.
        pid_filter : int, optional
            PID filter.
        retval_filter : any, optional
            Return value filter.
        enabled : bool
            Whether the hook is enabled.
        read_only : bool
            Set to True when the callback does not modify the event; this
            skips the snapshot and write-back done around the callback.

        Returns
        -------
        Callable
            Decorator function. The decorator returns a wrapper that can be
            passed to ``unregister`` as a handle.
        """
        def decorator(func):
            syscall_name = ""
            on_all = False
            on_unknown = False
            hook_on_enter = on_enter
            hook_on_return = on_return

            if name_or_pattern and isinstance(name_or_pattern, str):
                # Default: treat the whole string as a plain syscall name.
                # The hsyscall-style branches below override this when they
                # match, so an unrecognized "on_..." string can never end up
                # registering a hook with an empty name.
                syscall_name = name_or_pattern
                parts = name_or_pattern.split('_')
                if name_or_pattern.startswith("on_") and len(parts) >= 4:
                    if parts[1] == "all" and parts[2] == "sys":
                        # Format: on_all_sys_enter/return
                        on_all = True
                        syscall_name = ""
                        hook_on_enter = parts[3] == "enter"
                        hook_on_return = parts[3] == "return"
                    elif parts[1] == "unknown" and parts[2] == "sys":
                        # Format: on_unknown_sys_enter/return
                        on_unknown = True
                        syscall_name = ""
                        hook_on_enter = parts[3] == "enter"
                        hook_on_return = parts[3] == "return"
                    elif parts[1] == "sys":
                        # Format: on_sys_NAME_enter/return. The syscall name
                        # itself may contain underscores, so only the last
                        # part is inspected for enter/return.
                        last_part = parts[-1]
                        if last_part in ["enter", "return"]:
                            hook_on_enter = last_part == "enter"
                            hook_on_return = last_part == "return"
                            syscall_name = "_".join(parts[2:-1])
                        else:
                            # No enter/return suffix: everything after
                            # "on_sys_" is the name, hooked on entry
                            syscall_name = "_".join(parts[2:])
                            hook_on_enter = True
                            hook_on_return = False
            else:
                # Use the provided values for name, on_enter, on_return
                syscall_name = name_or_pattern if name_or_pattern else ""

            if syscall_name == "all":
                on_all = True
                syscall_name = ""
            elif syscall_name == "unknown":
                on_unknown = True
                syscall_name = ""

            # Default to entry if neither is specified
            if not (hook_on_enter or hook_on_return):
                hook_on_enter = True

            # A callback needs late binding through its plugin instance when
            # it is a bound method, or a plain function defined in a class
            # body (dotted qualname). Functions nested inside another
            # function also have a dotted qualname, but it contains
            # "<locals>"; they are ordinary callables and must be invoked
            # directly, otherwise resolution fails and they are never called.
            qualname = getattr(func, '__qualname__', '')
            is_method = hasattr(func, '__self__') or (
                '.' in qualname and '<locals>' not in qualname)

            hook_config = {
                "name": syscall_name,
                "on_enter": hook_on_enter,
                "on_return": hook_on_return,
                "on_all": on_all,
                "on_unknown": on_unknown,
                "procname": comm_filter,  # Use comm_filter instead of procname
                "arg_filters": arg_filters,  # Now supports complex filtering
                "enabled": enabled,
                "callback": func,
                "decorator": decorator,
                "pid_filter": pid_filter,
                "retval_filter": retval_filter,  # Now supports complex filtering
                "is_method": is_method,  # Store method detection result
                "read_only": read_only,  # Store read_only flag
            }

            # Create a unique wrapper handle for this registration
            @functools.wraps(func)
            def wrapper(*args, **kwargs):
                return func(*args, **kwargs)

            # Attach original function to wrapper for lookup
            wrapper._original_func = func

            # Add to pending hooks and queue interrupt
            self._pending_hooks.append((hook_config, wrapper))
            plugins.portal.queue_interrupt("syscalls")
            return wrapper

        return decorator

    def _queue_unregister(self, hook_ptr: int):
        """Queue an unregister request for ``hook_ptr`` and raise the portal interrupt."""
        self._pending_hooks.append(('unregister', hook_ptr))
        plugins.portal.queue_interrupt("syscalls")

    def _cleanup_hook_maps(self, hook_ptr: int):
        """
        Remove ``hook_ptr`` from all local lookup maps and queue its kernel
        unregistration. Does nothing if the pointer is not registered.
        """
        if hook_ptr not in self._hooks:
            return

        # Unpack tuple (on_all, handle, is_method, read_only, original_func)
        entry = self._hooks[hook_ptr]
        handle = entry[1]
        original_func = entry[4]

        # 1. Remove from handle map
        if handle in self._handle_to_hook_ptr:
            del self._handle_to_hook_ptr[handle]

        # 2. Remove from func map list
        if original_func in self._func_to_hook_ptrs:
            if hook_ptr in self._func_to_hook_ptrs[original_func]:
                self._func_to_hook_ptrs[original_func].remove(hook_ptr)
            if not self._func_to_hook_ptrs[original_func]:
                del self._func_to_hook_ptrs[original_func]

        # 3. Remove from name map list
        func_name = getattr(original_func, "__name__", None)
        if func_name and func_name in self._name_to_hook_ptrs:
            if hook_ptr in self._name_to_hook_ptrs[func_name]:
                self._name_to_hook_ptrs[func_name].remove(hook_ptr)
            if not self._name_to_hook_ptrs[func_name]:
                del self._name_to_hook_ptrs[func_name]

        # 4. Remove from main hooks map
        del self._hooks[hook_ptr]

        # 5. Forget the prototype cached for this pointer so a later hook
        # that receives the same pointer value does not inherit it.
        self._hook_proto_cache.pop(hook_ptr, None)

        # Queue the kernel unregistration
        self._queue_unregister(hook_ptr)

    def unregister(self, target: Any) -> None:
        """
        Unregister syscall hook(s).

        Can accept:

        1. A specific handle returned by syscall() (unregisters only that hook).
        2. The original function (unregisters ALL hooks for that function).
        3. A string name (unregisters ALL hooks with that name).

        Only hooks whose registration has already completed are found; a
        warning is logged when nothing matches.

        Parameters
        ----------
        target : callable or str
            The handle, function, or name to unregister.
        """
        targets_found = []

        # Case 1: Specific handle (result of decorator)
        if callable(target) and target in self._handle_to_hook_ptr:
            targets_found.append(self._handle_to_hook_ptr[target])
        # Case 2: Original function (e.g., self.callback) - direct match
        elif callable(target) and target in self._func_to_hook_ptrs:
            targets_found.extend(self._func_to_hook_ptrs[target])
        # Case 2b: bound method whose underlying function was registered
        elif callable(target) and hasattr(target, '__func__') and target.__func__ in self._func_to_hook_ptrs:
            targets_found.extend(self._func_to_hook_ptrs[target.__func__])
        # Case 3: Name
        elif isinstance(target, str) and target in self._name_to_hook_ptrs:
            targets_found.extend(self._name_to_hook_ptrs[target])

        if not targets_found:
            self.logger.warning(
                f"Could not find hook to unregister for {target}")
            return

        # targets_found is a copy, so the maps can be mutated while iterating
        for ptr in targets_found:
            self._cleanup_hook_maps(ptr)