"""
.. include:: /docs/syscalls.md
:parser: myst_parser.sphinx_
"""
from penguin import plugins, Plugin
import json
from typing import Dict, List, Any, Callable, Optional, Iterator, Union
from hyper.consts import value_filter_type as vft
from hyper.consts import igloo_hypercall_constants as iconsts
from hyper.consts import igloo_base_hypercalls as bconsts
from hyper.portal import PortalCmd
from wrappers.ptregs_wrap import get_pt_regs_wrapper
import functools
from collections import defaultdict
from hyper.consts import HYPER_OP as hop
# Names exported by a star-import of this module. SyscallPrototype is
# internal and therefore intentionally not listed here.
__all__ = [
    "ValueFilter",
    "Syscalls"
]
class SyscallEvent:
    """
    Thin wrapper around a ``syscall_event`` struct instance.

    The NUL-terminated ``syscall_name`` field is decoded once, at
    construction time, and exposed as ``name``. Every other attribute read
    or write is forwarded to the wrapped instance, and ``bytes(event)``
    returns the raw bytes of the wrapped instance.

    Attributes
    ----------
    name : str
        Decoded syscall name (text before the first NUL byte).
    """
    __slots__ = ('_sce', 'name')

    def __init__(self, sce):
        self._sce = sce
        # bytes() on the array field yields the whole fixed-size buffer;
        # keep only what precedes the first NUL terminator.
        raw_bytes = bytes(sce.syscall_name)
        self.name = raw_bytes.split(b'\x00', 1)[0].decode('utf-8', errors='replace')

    def __getattr__(self, attr):
        # __getattr__ only runs when normal lookup failed. If one of our own
        # slots is missing (object built via __new__, copy, unpickle, ...),
        # forwarding would look up self._sce again and recurse forever, so
        # report the attribute as missing instead.
        if attr in SyscallEvent.__slots__:
            raise AttributeError(attr)
        # Everything else (e.g. event.orig_x0) comes from the wrapped struct.
        return getattr(self._sce, attr)

    def __bytes__(self) -> bytes:
        # Raw bytes of the wrapped struct, used when writing it back to memory.
        return bytes(self._sce)

    def __setattr__(self, attr, value):
        if attr in self.__slots__:
            # Wrapper-owned attribute: store it locally.
            object.__setattr__(self, attr, value)
        else:
            # Struct field: forward the write to the wrapped instance.
            setattr(self._sce, attr, value)

    @property
    def size(self) -> int:
        # Size reported by the wrapped instance's type definition.
        return self._sce._instance_type_def.size
[docs]
class ValueFilter:
    """
    ValueFilter
    ===========
    Describes one filter applied to a syscall argument or return value.

    Instances are normally built with the classmethod factories
    (``exact``, ``range``, ``string_contains``, ...).

    Attributes
    ----------
    filter_type : int
        The type of filter (a ``SYSCALLS_HC_FILTER_*`` constant).
    value : int
        The value for the comparison filters.
    min_value : int
        The minimum value for range filters.
    max_value : int
        The maximum value for range filters.
    bitmask : int
        The bitmask for bitmask filters.
    pattern : bytes or None
        NUL-terminated byte pattern for the string filters, else None.
    """
    __slots__ = ('filter_type', 'value', 'min_value', 'max_value', 'bitmask', 'pattern')

    def __init__(self, filter_type: int = vft.SYSCALLS_HC_FILTER_EXACT, value: int = 0,
                 min_value: int = 0, max_value: int = 0, bitmask: int = 0,
                 pattern: Optional[bytes] = None) -> None:
        """
        Initialize a ValueFilter.

        Parameters
        ----------
        filter_type : int
            The type of filter.
        value : int
            The value for the filter.
        min_value : int
            The minimum value for range filters.
        max_value : int
            The maximum value for range filters.
        bitmask : int
            The bitmask for bitmask filters.
        pattern : bytes, optional
            Already-encoded pattern for string filters; stored as given.
        """
        self.filter_type = filter_type
        self.value = value
        self.min_value = min_value
        self.max_value = max_value
        self.bitmask = bitmask
        self.pattern = pattern

    def __repr__(self) -> str:
        """Return a debugging representation listing every field."""
        return (f"<ValueFilter type={self.filter_type!r} value={self.value!r} "
                f"min={self.min_value!r} max={self.max_value!r} "
                f"bitmask={self.bitmask!r} pattern={self.pattern!r}>")

    @classmethod
    def _encode_pattern(cls, pattern: Optional[Union[str, bytes]]) -> Optional[bytes]:
        """
        Encode a pattern to bytes with a trailing NUL terminator.

        Parameters
        ----------
        pattern : str, bytes or None
            Text is encoded as UTF-8; bytes-like input is used verbatim.

        Returns
        -------
        bytes or None
            The NUL-terminated pattern, or None when ``pattern`` is None.
        """
        if pattern is None:
            return None
        if isinstance(pattern, (bytes, bytearray)):
            raw = bytes(pattern)
        else:
            raw = pattern.encode('utf-8')
        # Always NUL-terminate so the pattern is a valid C string.
        return raw + b'\x00'

    @classmethod
    def exact(cls, value: int) -> "ValueFilter":
        """
        Create an exact match filter.

        Parameters
        ----------
        value : int
            The value to match.

        Returns
        -------
        ValueFilter
            An exact match filter.
        """
        return cls(filter_type=vft.SYSCALLS_HC_FILTER_EXACT, value=value)

    @classmethod
    def greater(cls, value: int) -> "ValueFilter":
        """
        Create a greater than filter.

        Parameters
        ----------
        value : int
            The value to compare.

        Returns
        -------
        ValueFilter
            A greater than filter.
        """
        return cls(filter_type=vft.SYSCALLS_HC_FILTER_GREATER, value=value)

    @classmethod
    def greater_equal(cls, value: int) -> "ValueFilter":
        """
        Create a greater than or equal filter.

        Parameters
        ----------
        value : int
            The value to compare.

        Returns
        -------
        ValueFilter
            A greater than or equal filter.
        """
        return cls(filter_type=vft.SYSCALLS_HC_FILTER_GREATER_EQUAL, value=value)

    @classmethod
    def less(cls, value: int) -> "ValueFilter":
        """
        Create a less than filter.

        Parameters
        ----------
        value : int
            The value to compare.

        Returns
        -------
        ValueFilter
            A less than filter.
        """
        return cls(filter_type=vft.SYSCALLS_HC_FILTER_LESS, value=value)

    @classmethod
    def less_equal(cls, value: int) -> "ValueFilter":
        """
        Create a less than or equal filter.

        Parameters
        ----------
        value : int
            The value to compare.

        Returns
        -------
        ValueFilter
            A less than or equal filter.
        """
        return cls(filter_type=vft.SYSCALLS_HC_FILTER_LESS_EQUAL, value=value)

    @classmethod
    def not_equal(cls, value: int) -> "ValueFilter":
        """
        Create a not equal filter.

        Parameters
        ----------
        value : int
            The value to compare.

        Returns
        -------
        ValueFilter
            A not equal filter.
        """
        return cls(filter_type=vft.SYSCALLS_HC_FILTER_NOT_EQUAL, value=value)

    @classmethod
    def range(cls, min_value: int, max_value: int) -> "ValueFilter":
        """
        Create a range filter.

        Parameters
        ----------
        min_value : int
            The minimum value.
        max_value : int
            The maximum value.

        Returns
        -------
        ValueFilter
            A range filter.
        """
        return cls(filter_type=vft.SYSCALLS_HC_FILTER_RANGE,
                   min_value=min_value, max_value=max_value)

    @classmethod
    def success(cls) -> "ValueFilter":
        """
        Create a success filter (>= 0).

        Returns
        -------
        ValueFilter
            A success filter.
        """
        return cls(filter_type=vft.SYSCALLS_HC_FILTER_SUCCESS)

    @classmethod
    def error(cls) -> "ValueFilter":
        """
        Create an error filter (< 0).

        Returns
        -------
        ValueFilter
            An error filter.
        """
        return cls(filter_type=vft.SYSCALLS_HC_FILTER_ERROR)

    @classmethod
    def bitmask_set(cls, bitmask: int) -> "ValueFilter":
        """
        Create a bitmask set filter.

        Parameters
        ----------
        bitmask : int
            The bitmask to set.

        Returns
        -------
        ValueFilter
            A bitmask set filter.
        """
        return cls(filter_type=vft.SYSCALLS_HC_FILTER_BITMASK_SET,
                   bitmask=bitmask)

    @classmethod
    def bitmask_clear(cls, bitmask: int) -> "ValueFilter":
        """
        Create a bitmask clear filter.

        Parameters
        ----------
        bitmask : int
            The bitmask to clear.

        Returns
        -------
        ValueFilter
            A bitmask clear filter.
        """
        return cls(filter_type=vft.SYSCALLS_HC_FILTER_BITMASK_CLEAR,
                   bitmask=bitmask)

    @classmethod
    def string_exact(cls, pattern: Union[str, bytes]) -> "ValueFilter":
        """
        Create an exact string match filter.

        Parameters
        ----------
        pattern : str or bytes
            The string to match; it is NUL-terminated before being stored.

        Returns
        -------
        ValueFilter
            An exact string match filter.
        """
        return cls(filter_type=vft.SYSCALLS_HC_FILTER_STR_EXACT,
                   pattern=cls._encode_pattern(pattern))

    @classmethod
    def string_contains(cls, pattern: Union[str, bytes]) -> "ValueFilter":
        """
        Create a string contains filter.

        Parameters
        ----------
        pattern : str or bytes
            The substring to look for; NUL-terminated before being stored.

        Returns
        -------
        ValueFilter
            A string contains filter.
        """
        return cls(filter_type=vft.SYSCALLS_HC_FILTER_STR_CONTAINS,
                   pattern=cls._encode_pattern(pattern))

    @classmethod
    def string_startswith(cls, pattern: Union[str, bytes]) -> "ValueFilter":
        """
        Create a string starts-with filter.

        Parameters
        ----------
        pattern : str or bytes
            The required prefix; NUL-terminated before being stored.

        Returns
        -------
        ValueFilter
            A string starts-with filter.
        """
        return cls(filter_type=vft.SYSCALLS_HC_FILTER_STR_STARTSWITH,
                   pattern=cls._encode_pattern(pattern))

    @classmethod
    def string_endswith(cls, pattern: Union[str, bytes]) -> "ValueFilter":
        """
        Create a string ends-with filter.

        Parameters
        ----------
        pattern : str or bytes
            The required suffix; NUL-terminated before being stored.

        Returns
        -------
        ValueFilter
            A string ends-with filter.
        """
        return cls(filter_type=vft.SYSCALLS_HC_FILTER_STR_ENDSWITH,
                   pattern=cls._encode_pattern(pattern))
class SyscallPrototype:
    """
    SyscallPrototype
    ================
    Internal record describing one syscall: its name and its arguments.

    Attributes
    ----------
    name : str
        The syscall name.
    types : list
        Argument types, in order.
    names : list
        Argument names, in order.
    args : list or None
        The type/name pairs exactly as passed to the constructor.
    unknown_args : bool
        Whether the arguments are unknown.
    nargs : int
        Number of arguments (length of ``types``).
    """
    __slots__ = ('name', 'types', 'names', 'args', 'unknown_args', 'nargs')

    def __init__(self, name: str,
                 args: Optional[List[Any]] = None, unknown_args: bool = False) -> None:
        """
        Build a prototype from a name and optional type/name pairs.

        Parameters
        ----------
        name : str
            The syscall name.
        args : list, optional
            Sequence of two-item (type, name) entries.
        unknown_args : bool
            Whether the arguments are unknown.
        """
        arg_types = []
        arg_names = []
        # A missing or empty argument list simply leaves both lists empty.
        for arg_type, arg_name in (args or ()):
            arg_types.append(arg_type)
            arg_names.append(arg_name)

        self.name = name
        self.args = args
        self.unknown_args = unknown_args
        self.types = arg_types
        self.names = arg_names
        self.nargs = len(arg_types)

    def __repr__(self) -> str:
        """
        Describe the prototype by name and argument count.

        Returns
        -------
        str
            String representation.
        """
        return "<SyscallPrototype name='{}' nargs={}>".format(self.name, self.nargs)
[docs]
class Syscalls(Plugin):
"""
Syscalls Plugin
===============
Provides an interface to monitor and intercept system calls.
Uses the portal's interrupt mechanism to handle registration and event delivery.
"""
def __init__(self) -> None:
    """
    Set up hook bookkeeping and register the plugin's handlers.

    Creates the (initially empty) lookup tables and caches, registers the
    portal interrupt handler under the name "syscalls", attaches the
    setup/enter/return hypercall handlers, and finally replaces
    ``_syscall_event`` with its ``plugins.portal.wrap``-ped version.
    """
    self.outdir = self.get_arg("outdir")

    # hook_ptr -> (on_all, handle, is_method, read_only, original_func)
    self._hooks: Dict[int, tuple] = {}
    # hook_ptr -> the hook configuration dict it was registered with
    self._hook_info = {}

    # Reverse lookups used when unregistering:
    #   wrapper handle -> hook_ptr
    #   original function -> [hook_ptrs]
    #   function name -> [hook_ptrs]
    self._handle_to_hook_ptr: Dict[Callable, int] = {}
    self._func_to_hook_ptrs: Dict[Callable, List[int]] = defaultdict(list)
    self._name_to_hook_ptrs: Dict[str, List[int]] = defaultdict(list)

    # cleaned syscall name -> prototype
    self._syscall_info_table: Dict[str, SyscallPrototype] = {}
    # raw (uncleaned) syscall name -> prototype; spares repeated cleaning
    self._raw_name_proto_cache: Dict[str, SyscallPrototype] = {}
    # hook_ptr -> prototype; fast path for hooks bound to one syscall
    self._hook_proto_cache: Dict[int, SyscallPrototype] = {}

    # Hook (un)registration requests are processed from the portal interrupt.
    plugins.portal.register_interrupt_handler(
        "syscalls", self._syscall_interrupt_handler)

    # Hypercalls: syscall definition setup, then enter/return events.
    register_hypercall = self.panda.hypercall
    register_hypercall(bconsts.IGLOO_HYP_SETUP_SYSCALL)(
        self._setup_syscall_handler)
    register_hypercall(iconsts.IGLOO_HYP_SYSCALL_ENTER)(
        self._syscall_enter_event)
    register_hypercall(iconsts.IGLOO_HYP_SYSCALL_RETURN)(
        self._syscall_return_event)

    # Queue of pending register/unregister requests.
    self._pending_hooks = []

    self._syscall_event = plugins.portal.wrap(self._syscall_event)
def _setup_syscall_handler(self, cpu: int) -> None:
    """
    Record one syscall definition announced by the guest.

    Reads a JSON string from the address in syscall argument 1. The JSON
    object must contain ``"name"``; ``"args"`` is either the string
    ``"unknown"`` or a list of type/name pairs. The definition is stored
    under the cleaned syscall name, except that an unknown-args definition
    never replaces an entry that already exists.

    Parameters
    ----------
    cpu : int
        CPU id.

    Returns
    -------
    None
    """
    reg1 = self.panda.arch.get_arg(cpu, 1, convention="syscall")
    json_str = plugins.mem.read_str_panda(cpu, reg1)
    syscall_data = json.loads(json_str)

    name = syscall_data["name"]
    clean_name = self._clean_syscall_name(name)

    unknown_args = syscall_data.get("args", "") == "unknown"
    args = [] if unknown_args else (syscall_data.get("args") or [])

    if unknown_args and clean_name in self._syscall_info_table:
        # Keep the existing, better-described entry; nothing was registered,
        # so do not log a registration either.
        self.logger.debug(
            f"Syscall {name} is a compat syscall, skipping it for a better match")
        return

    self._syscall_info_table[clean_name] = SyscallPrototype(
        name=f'sys_{clean_name}',
        args=args,
        unknown_args=unknown_args
    )
    # Both caches hold prototypes resolved through the table, so either may
    # now point at a superseded entry (e.g. a generic placeholder).
    self._raw_name_proto_cache.clear()
    self._hook_proto_cache.clear()
    self.logger.debug(
        f"Registered syscall {name} (cleaned: {clean_name})")
def _clean_syscall_name(self, name: str) -> str:
"""
Clean a syscall name.
Removes various prefixes and architecture-specific parts.
Parameters
----------
name : str
The syscall name.
Returns
-------
str
Cleaned syscall name.
"""
if name.startswith("compat_"):
name = name[7:]
# First remove sys_ prefix if present
if name.startswith("sys_"):
name = name[4:]
# Then remove any remaining leading underscores
name = name.lstrip("_")
return name
def _syscall_interrupt_handler(self) -> bool:
"""
Handle interrupts from the portal for syscall hook registration.
Returns
-------
bool
True if more hooks are pending, False otherwise.
"""
# Always ensure we yield at least once so the function returns a proper
# generator
if not self._pending_hooks:
self.logger.debug("No pending syscall hooks to register")
# Yield a no-op operation to ensure this is always an iterator
return False
pending_hooks = self._pending_hooks[:]
self._pending_hooks = []
while pending_hooks:
# Take one item from the queue
item = pending_hooks.pop(0)
# Check if this is an unregister request
if isinstance(item, tuple) and item[0] == 'unregister':
_, hook_ptr = item
yield PortalCmd(hop.HYPER_OP_UNREGISTER_SYSCALL_HOOK, addr=hook_ptr)
else:
hook_config, handle = item
# handle is the wrapper created in the decorator
original_func = getattr(handle, '_original_func', handle)
# Register the syscall hook
hook_ptr = yield from self._register_syscall_hook(hook_config)
on_all = hook_config.get("on_all", False)
is_method = hook_config.get("is_method", False)
read_only = hook_config.get("read_only", False)
# Store 5-tuple (on_all, handle, is_method, read_only, original_func)
self._hooks[hook_ptr] = (
on_all, handle, is_method, read_only, original_func)
# 1. Map specific handle (fun1, fun2) to hook_ptr
self._handle_to_hook_ptr[handle] = hook_ptr
# 2. Map original function to list of hook_ptrs
self._func_to_hook_ptrs[original_func].append(hook_ptr)
# 3. Map name to list of hook_ptrs
func_name = getattr(original_func, "__name__", None)
if func_name:
self._name_to_hook_ptrs[func_name].append(hook_ptr)
def _get_proto(self, cpu: int, sce: Any, on_all: bool) -> SyscallPrototype:
    """
    Resolve the prototype describing the syscall behind an event.

    Lookup order: per-hook cache (skipped for global hooks), raw-name
    cache, the info table by cleaned name, the info table with everything
    up to the first underscore removed, and finally a freshly created
    six-argument generic prototype that is also stored in the table.

    Parameters
    ----------
    cpu : int
        CPU id (not used by the lookup).
    sce : Any
        Syscall event object; ``name`` and ``hook.address`` are read.
    on_all : bool
        True for a global hook, which bypasses the per-hook cache.

    Returns
    -------
    SyscallPrototype
        The prototype for the syscall.
    """
    per_hook = not on_all

    # Tier 1: a non-global hook maps 1:1 to a prototype.
    if per_hook:
        hook_ptr = sce.hook.address
        if hook_ptr in self._hook_proto_cache:
            return self._hook_proto_cache[hook_ptr]

    name = sce.name

    # Tier 2: raw event name, so the name is not cleaned again.
    if name in self._raw_name_proto_cache:
        proto = self._raw_name_proto_cache[name]
    else:
        cleaned_name = self._clean_syscall_name(name)
        table = self._syscall_info_table
        proto = table.get(cleaned_name)

        # Retry without what may be an architecture prefix.
        if not proto and '_' in cleaned_name:
            arch_stripped_name = cleaned_name.split('_', 1)[1]
            proto = table.get(arch_stripped_name)
            if proto:
                self.logger.debug(
                    f"Found syscall after removing architecture prefix: {cleaned_name} → {arch_stripped_name}")

        # Last resort: remember a generic placeholder.
        if not proto:
            generic_args = [("int", f"unknown{i+1}") for i in range(6)]
            proto = SyscallPrototype(name=f'sys_{cleaned_name}', args=generic_args)
            table[cleaned_name] = proto
            self.logger.debug(
                f"Syscall {name} not registered {cleaned_name=}, created generic prototype with {len(generic_args)} args")

        self._raw_name_proto_cache[name] = proto

    # Promote to tier 1 so the next event for this hook is a single lookup.
    if per_hook:
        self._hook_proto_cache[sce.hook.address] = proto
    return proto
def _resolve_syscall_callback(self, f, is_method, hook_ptr=None):
"""
Resolve and bind the function or method for a syscall event.
If a method is resolved, update self._hooks[hook_ptr] to store the bound method.
Returns the callable (already bound if needed), or None on error.
Parameters
----------
f : callable
Function or method.
is_method : bool
Whether it's a method.
hook_ptr : optional
Hook pointer.
Returns
-------
callable or None
The resolved callable.
"""
if is_method and hasattr(f, '__qualname__') and '.' in f.__qualname__:
class_name = f.__qualname__.split('.')[0]
method_name = f.__qualname__.split('.')[-1]
try:
instance = getattr(plugins, class_name)
if instance and hasattr(instance, method_name):
bound_method = getattr(instance, method_name)
# Update cache for fast-path next time
if hook_ptr is not None and hook_ptr in self._hooks:
on_all, _, _, read_only, original_func = self._hooks[hook_ptr]
self._hooks[hook_ptr] = (
on_all, bound_method, False, read_only, original_func)
return bound_method
else:
self.logger.error(
f"Could not find method {method_name} on instance")
return None
except AttributeError:
self.logger.error(
f"Could not find instance for class {class_name}")
return None
return f
def _syscall_enter_event(self, cpu: int) -> None:
"""
Handler for syscall entry events from the hypervisor.
Parameters
----------
cpu : int
CPU id.
Returns
-------
None
"""
self._syscall_event(cpu, is_enter=True)
def _syscall_return_event(self, cpu: int) -> None:
"""
Handler for syscall return events from the hypervisor.
Parameters
----------
cpu : int
CPU id.
Returns
-------
None
"""
self._syscall_event(cpu, is_enter=False)
def _syscall_event(self, cpu: int, is_enter: Optional[bool] = None) -> Any:
    """
    Dispatch one syscall event to the callback registered for its hook.

    This is a generator (it uses ``yield from`` for guest reads/writes).
    The event struct is read from the address in syscall argument 1; if
    its hook pointer is unknown the event is ignored. The callback gets
    ``(pt_regs, proto, sce)`` and, for hooks that are not global and have
    ``argc > 0``, the individual syscall arguments as well. Unless the
    hook is read-only, the event struct is written back to the guest when
    the callback changed its bytes.

    Parameters
    ----------
    cpu : int
        CPU id.
    is_enter : bool, optional
        True if entry event, False if return event (not consulted here).

    Returns
    -------
    Any
        The result of the callback (the generator's return value), or
        None when no callback ran.
    """
    event_addr = self.panda.arch.get_arg(cpu, 1, convention="syscall")
    sce = SyscallEvent(plugins.kffi.read_type_panda(cpu, event_addr, "syscall_event"))

    hook_ptr = sce.hook.address
    if hook_ptr not in self._hooks:
        return

    # Entry layout: (on_all, handle, is_method, read_only, original_func)
    on_all, callback, is_method, read_only = self._hooks[hook_ptr][:4]

    # Snapshot the event so a modification by the callback can be detected.
    before = None if read_only else bytes(sce)

    proto = self._get_proto(cpu, sce, on_all)

    raw_regs = yield from plugins.kffi.read_type(sce.regs.address, "pt_regs")
    pt_regs = get_pt_regs_wrapper(self.panda, raw_regs)

    call_args = (pt_regs, proto, sce)
    if not (on_all or proto is None or sce.argc == 0):
        call_args += tuple(sce.args[i] for i in range(sce.argc))

    if is_method:
        # Slow path: bind the method (and cache it) before calling.
        callback = self._resolve_syscall_callback(callback, is_method, hook_ptr)
        if not callback:
            return

    result = callback(*call_args)
    if isinstance(result, Iterator):
        # Generator callbacks are driven to completion; keep their return value.
        result = yield from result

    if not read_only:
        after = bytes(sce)
        if before != after:
            yield from plugins.mem.write_bytes(event_addr, after)
    return result
def _register_syscall_hook(
        self, hook_config: Dict[str, Any]) -> Iterator[int]:
    """
    Build a ``struct syscall_hook`` from a hook configuration and register it.

    Generator: it yields portal operations (guest buffer copies for string
    patterns, then the registration command) and returns the value the
    registration command produced, which is also used as the key under
    which ``hook_config`` is remembered in ``self._hook_info``.

    Parameters
    ----------
    hook_config : dict
        Hook configuration dictionary.

    Returns
    -------
    Iterator[int]
        Generator whose return value is the hook pointer.
    """
    def _filter_fields(enabled, filter_type, value=0, min_value=0, max_value=0, bitmask=0):
        """Assemble the fields shared by every enabled/disabled filter dict."""
        return {
            "enabled": enabled,
            "type": filter_type,
            "value": value,
            "min_value": min_value,
            "max_value": max_value,
            "bitmask": bitmask
        }

    def _parse_filter(f):
        """Translate None, a ValueFilter, a number or a str into a filter init dict."""
        if f is None:
            return _filter_fields(False, vft.SYSCALLS_HC_FILTER_EXACT)

        # Matched by class name rather than isinstance.
        if getattr(f, "__class__", None) and f.__class__.__name__ == 'ValueFilter':
            res = _filter_fields(True, f.filter_type, f.value,
                                 f.min_value, f.max_value, f.bitmask)
            if getattr(f, 'pattern', None):
                # The reported length never counts a trailing NUL terminator.
                plen = len(f.pattern)
                if f.pattern.endswith(b'\x00'):
                    plen -= 1
                res["pattern_len"] = plen
                res["pattern"] = yield from plugins.mem.copy_buf_guest(f.pattern)
            return res

        if isinstance(f, (int, float)):
            return _filter_fields(True, vft.SYSCALLS_HC_FILTER_EXACT, int(f))

        if isinstance(f, str):
            # NOTE(review): a bare string is registered as a starts-with
            # filter, not an exact match - confirm this is intended.
            encoded = f.encode('utf-8')
            res = _filter_fields(True, vft.SYSCALLS_HC_FILTER_STR_STARTSWITH)
            res["pattern_len"] = len(encoded)
            res["pattern"] = yield from plugins.mem.copy_buf_guest(encoded + b'\x00')
            return res

        # Anything else yields a disabled filter.
        return {"enabled": False}

    raw_arg_filters = hook_config.get("arg_filters", []) or []
    arg_filters_init = []
    for slot in range(6):  # IGLOO_SYSCALL_MAXARGS
        requested = raw_arg_filters[slot] if slot < len(raw_arg_filters) else None
        parsed = yield from _parse_filter(requested)
        arg_filters_init.append(parsed)

    retval_filter_init = yield from _parse_filter(hook_config.get("retval_filter", None))

    procname = hook_config.get("procname", None)
    pid_filter = hook_config.get("pid_filter", None)

    init_data = {
        "enabled": hook_config.get("enabled", True),
        "on_enter": hook_config.get("on_enter", False),
        "on_return": hook_config.get("on_return", False),
        "on_all": hook_config.get("on_all", False),
        "name": hook_config.get("name", ""),
        "comm_filter_enabled": procname is not None,
        "comm_filter": procname or "",
        "pid_filter_enabled": pid_filter is not None,
        "filter_pid": 0 if pid_filter is None else pid_filter,
        # Nested structs and arrays are passed as lists of dicts
        "arg_filters": arg_filters_init,
        "retval_filter": retval_filter_init
    }

    # Allocate and initialise the struct, then serialise it for the portal.
    hook_struct = plugins.kffi.new("struct syscall_hook", init_data)
    payload = bytes(hook_struct)

    result = yield PortalCmd(hop.HYPER_OP_REGISTER_SYSCALL_HOOK, size=len(payload), data=payload)
    self._hook_info[result] = hook_config
    return result
[docs]
@staticmethod
def _parse_syscall_pattern(name_or_pattern, on_enter, on_return):
    """
    Split a hook name/pattern into its registration parameters.

    Recognised ``on_`` patterns:

    * ``on_all_sys_enter`` / ``on_all_sys_return``
    * ``on_unknown_sys_enter`` / ``on_unknown_sys_return``
    * ``on_sys_NAME_enter`` / ``on_sys_NAME_return`` (NAME may contain
      underscores)
    * ``on_sys_NAME`` (no suffix): hooks NAME on entry

    Any other string, including an ``on_`` string that matches none of the
    forms above, is used verbatim as the syscall name. The plain names
    ``"all"`` and ``"unknown"`` select the global / unknown hooks.

    Returns
    -------
    tuple
        ``(syscall_name, on_all, on_unknown, hook_on_enter, hook_on_return)``
    """
    on_all = False
    on_unknown = False
    hook_on_enter = on_enter
    hook_on_return = on_return

    if name_or_pattern and isinstance(name_or_pattern, str):
        # Default: the input is the syscall name; the patterns below override.
        syscall_name = name_or_pattern
        parts = name_or_pattern.split('_')
        if parts[0] == "on" and len(parts) >= 3:
            scope = parts[1]
            if scope in ("all", "unknown") and len(parts) >= 4 and parts[2] == "sys":
                on_all = scope == "all"
                on_unknown = scope == "unknown"
                syscall_name = ""
                hook_on_enter = parts[3] == "enter"
                hook_on_return = parts[3] == "return"
            elif scope == "sys":
                if len(parts) >= 4 and parts[-1] in ("enter", "return"):
                    # Everything between "sys" and the suffix is the name.
                    hook_on_enter = parts[-1] == "enter"
                    hook_on_return = parts[-1] == "return"
                    syscall_name = "_".join(parts[2:-1])
                else:
                    # No enter/return suffix: the rest is the name, hook entry.
                    syscall_name = "_".join(parts[2:])
                    hook_on_enter = True
                    hook_on_return = False
    else:
        syscall_name = name_or_pattern if name_or_pattern else ""

    if syscall_name == "all":
        on_all = True
        syscall_name = ""
    elif syscall_name == "unknown":
        on_unknown = True
        syscall_name = ""

    # Default to entry if neither direction was selected.
    if not (hook_on_enter or hook_on_return):
        hook_on_enter = True

    return syscall_name, on_all, on_unknown, hook_on_enter, hook_on_return

def syscall(
    self,
    name_or_pattern: Optional[str] = None,
    on_enter: Optional[bool] = None,
    on_return: Optional[bool] = None,
    comm_filter: Optional[str] = None,
    arg_filters: Optional[List[Any]] = None,
    pid_filter: Optional[int] = None,
    retval_filter: Optional[Union[ValueFilter, str, int]] = None,
    enabled: bool = True,
    read_only: bool = False
) -> Callable:
    """
    Decorator for registering syscall callbacks.

    The hook is not registered immediately: its configuration is queued
    and a "syscalls" portal interrupt is requested.

    Parameters
    ----------
    name_or_pattern : str, optional
        Syscall name, ``"all"``, ``"unknown"``, or an ``on_...`` pattern
        such as ``on_sys_read_enter`` or ``on_all_sys_return``.
    on_enter : bool, optional
        Register for entry events (overridden by an ``on_...`` pattern).
    on_return : bool, optional
        Register for return events (overridden by an ``on_...`` pattern).
    comm_filter : str, optional
        Process name filter.
    arg_filters : list, optional
        Argument filters.
    pid_filter : int, optional
        PID filter.
    retval_filter : any, optional
        Return value filter.
    enabled : bool
        Whether the hook is enabled.
    read_only : bool
        Set to True when the callback never modifies the event; the
        before/after comparison and the write-back are then skipped.

    Returns
    -------
    Callable
        Decorator that returns a wrapper around the callback; the wrapper
        is the handle accepted by ``unregister``.
    """
    def decorator(func):
        (syscall_name, on_all, on_unknown,
         hook_on_enter, hook_on_return) = self._parse_syscall_pattern(
            name_or_pattern, on_enter, on_return)

        # A dotted qualname normally means "defined in a class body". A
        # function defined directly inside another function also has a
        # dotted qualname ("outer.<locals>.cb") but is a plain callable and
        # must not go through method resolution.
        qualname = getattr(func, '__qualname__', None) or ""
        scope = qualname.split('.')
        defined_in_function = len(scope) >= 2 and scope[-2] == '<locals>'
        is_method = hasattr(func, '__self__') or (
            '.' in qualname and not defined_in_function)

        hook_config = {
            "name": syscall_name,
            "on_enter": hook_on_enter,
            "on_return": hook_on_return,
            "on_all": on_all,
            "on_unknown": on_unknown,
            "procname": comm_filter,
            "arg_filters": arg_filters,
            "enabled": enabled,
            "callback": func,
            "decorator": decorator,
            "pid_filter": pid_filter,
            "retval_filter": retval_filter,
            "is_method": is_method,
            "read_only": read_only,
        }

        # Each registration gets its own wrapper so it can be unregistered
        # individually even when the same function is hooked several times.
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            return func(*args, **kwargs)

        wrapper._original_func = func

        self._pending_hooks.append((hook_config, wrapper))
        plugins.portal.queue_interrupt("syscalls")
        return wrapper

    return decorator
def _queue_unregister(self, hook_ptr: int):
    """
    Queue an unregister request for ``hook_ptr`` and request a "syscalls"
    portal interrupt so the queue gets processed.
    """
    request = ('unregister', hook_ptr)
    self._pending_hooks.append(request)
    plugins.portal.queue_interrupt("syscalls")
def _cleanup_hook_maps(self, hook_ptr: int):
"""Clean up local maps for a specific hook pointer"""
if hook_ptr not in self._hooks:
return
# Unpack tuple (on_all, handle, is_method, read_only, original_func)
entry = self._hooks[hook_ptr]
handle = entry[1]
original_func = entry[4]
# 1. Remove from handle map
if handle in self._handle_to_hook_ptr:
del self._handle_to_hook_ptr[handle]
# 2. Remove from func map list
if original_func in self._func_to_hook_ptrs:
if hook_ptr in self._func_to_hook_ptrs[original_func]:
self._func_to_hook_ptrs[original_func].remove(hook_ptr)
if not self._func_to_hook_ptrs[original_func]:
del self._func_to_hook_ptrs[original_func]
# 3. Remove from name map list
func_name = getattr(original_func, "__name__", None)
if func_name and func_name in self._name_to_hook_ptrs:
if hook_ptr in self._name_to_hook_ptrs[func_name]:
self._name_to_hook_ptrs[func_name].remove(hook_ptr)
if not self._name_to_hook_ptrs[func_name]:
del self._name_to_hook_ptrs[func_name]
# 4. Remove from main hooks map
del self._hooks[hook_ptr]
# Queue the kernel unregistration
self._queue_unregister(hook_ptr)
[docs]
def unregister(self, target: Any) -> None:
"""
Unregister syscall hook(s).
Can accept:
1. A specific handle returned by syscall() (unregisters only that hook).
2. The original function (unregisters ALL hooks for that function).
3. A string name (unregisters ALL hooks with that name).
Parameters
----------
target : callable or str
The handle, function, or name to unregister.
"""
targets_found = []
# Case 1: Specific Handle (Result of decorator)
if callable(target) and target in self._handle_to_hook_ptr:
targets_found.append(self._handle_to_hook_ptr[target])
# Case 2: Original Function (e.g., self.callback)
# Check direct match
elif callable(target) and target in self._func_to_hook_ptrs:
targets_found.extend(self._func_to_hook_ptrs[target])
# Check __func__ (unbound method vs bound method match)
elif callable(target) and hasattr(target, '__func__') and target.__func__ in self._func_to_hook_ptrs:
targets_found.extend(self._func_to_hook_ptrs[target.__func__])
# Case 3: Name
elif isinstance(target, str) and target in self._name_to_hook_ptrs:
targets_found.extend(self._name_to_hook_ptrs[target])
if not targets_found:
self.logger.warning(
f"Could not find hook to unregister for {target}")
return
# Unregister all found matches
for ptr in targets_found:
self._cleanup_hook_maps(ptr)