"""
HookLogger Plugin
=================
The HookLogger plugin provides a high-level, "easy mode" interface for dynamic binary instrumentation in Penguin.
It allows users to register uprobes and syscall hooks using simple, format-string-style "actions" without writing custom callback code.
It handles the complexities of:
- Architecture-specific argument retrieval (calling conventions).
- Memory dereferencing (reading strings, pointers, buffers).
- Endianness conversion (automatically detected).
- Context management (pairing entry/exit probes to capture return values).
Action String Format
--------------------
The core of HookLogger is the **Action String**, which defines what data to capture and how to format it.
**Basic Syntax:**
.. code-block:: text
[bp|break] [print] func_name(arg_fmt1, arg_fmt2, ...) [= ret_fmt1, ...]
- **`func_name`**: The name of the function or syscall (e.g., `malloc`, `sys_read`).
- **`(...)`**: A comma-separated list of formats for the function arguments (captured at Entry).
- **`= ...`**: (Optional) A comma-separated list of formats for the return value(s) (captured at Exit).
- **`bp` or `break`**: If present, triggers a Python breakpoint (PDB) when the hook is hit; the hit itself is not written to the log.
**Format Specifiers:**
| Specifier | Description | Data Source |
| :--- | :--- | :--- |
| ``%d`` / ``%i`` | Signed Integer | Register/Stack Value |
| ``%u`` | Unsigned Integer | Register/Stack Value |
| ``%x`` / ``%X`` | Hexadecimal | Register/Stack Value |
| ``%p`` | Pointer (0x...) | Register/Stack Value |
| ``%s`` | String (char*) | Dereferences pointer at **Entry** (reads null-term string) |
| ``%c`` | Character | Lower 8 bits of value |
| ``%b`` | Boolean | True/False |
| ``%fd`` | File Descriptor | Resolves FD to filename (via OSI) |
| ``%proc`` | Process Name | Current process name (via OSI) |
| ``%x64``, ``%u32``, ``%i16``... | Memory Dump | Reads a value of the given bit-width (8, 16, 32, 64) from the pointer; ``x`` = hex, ``u`` = unsigned, ``i`` = signed |
**Deferred Resolution (:out):**
By default, pointer types like ``%s`` are dereferenced immediately at function entry. However, for output parameters (buffers filled by the function), you need to capture the pointer at entry but read the memory at exit.
Append ``:out`` to any format specifier to defer resolution:
- ``%s:out``: Capture char* at entry, read string at exit.
- ``%x64:out``: Capture void* at entry, read 64 bits at exit.
**Example:** Hooking `read(fd, buf, len)` to see the data read:
.. code-block:: text
sys_read(%fd, %s:out, %d) = %d
Arguments & Filters
-------------------
When registering a hook (via Python or RemoteCtrl), the following arguments control scope and output:
- **`pid_filter`** *(int)*: Only trigger if the current PID matches this value.
- **`process_filter`** *(str)*: Only trigger if the current process name matches this string.
- **`logfile`** *(str)*: If provided, appends output to this file in the results directory. If omitted, output goes to the standard Penguin logger.
Example Usage
-------------
**Via CLI (using cli_breakpoint.py):**
.. code-block:: bash
# Basic input tracing
cli_breakpoint.py uprobe --path /lib/libc.so.6 --symbol strlen --action "print %s"
# Capture output buffer (Deferred)
cli_breakpoint.py syscall --name sys_read --action "print %fd, %s:out, %d = %d"
# Filter by PID and log to file
cli_breakpoint.py uprobe --path /bin/mybin --symbol do_work --action "print %x, %d" --pid 1234 --output work.log
**Via Python API:**
.. code-block:: python
plugins.load_plugin('hooklogger')
# Register a hook
plugins.hooklogger.register_uprobe(
path="/usr/bin/wget",
symbol="connect",
action_str="print(%fd, %p) = %d",
process_filter="wget",
logfile="connections.log"
)
"""
from penguin import Plugin, plugins
import os
import pdb
import re
import struct
from collections import defaultdict
# Module-level shortcuts to the Penguin plugins used below:
# - uprobes / syscalls: register and unregister the hooks.
# - mem: reads strings and raw bytes at a pointer value.
# - osi: supplies the current process name and file-descriptor names.
uprobes = plugins.uprobes
syscalls = plugins.syscalls
mem = plugins.mem
osi = plugins.osi
[docs]
class HookLogger(Plugin):
    """
    HookLogger Plugin
    =================
    The "easy mode" for dynamic instrumentation.
    Transforms simple action strings (e.g., 'print(%s, %d)') into
    uprobes and syscall hooks.

    Features:
    - Parses action strings to determine what data to capture.
    - Automatically handles memory resolution (dereferencing pointers) at entry.
    - **Deferred Resolution**: Use '%s:out' to capture a pointer at entry but read
      the string at return (useful for output buffers like in read()).
      Syscall hooks are entry-only, so ':out' is stripped and ignored there.
    - Manages per-hook, per-thread context stacks to pair the arguments seen at
      entry with the return value seen at exit ('func() = ret').

    Every ``register_*`` call returns an integer hook ID; ``list_hooks``,
    ``disable_hook`` and ``disable_all`` operate on those IDs.
    """
def __init__(self):
    """Initialise hook bookkeeping and cache target properties from PANDA."""
    # Hook registry; IDs are handed out sequentially starting at 1.
    self.hooks_by_id = {}
    self.next_hook_id = 1

    # Arguments saved at entry, waiting for the matching return:
    # { hook_id: { tgid_pid: [ [arg1_str, arg2_str], ... ] } }
    self.call_stacks = defaultdict(lambda: defaultdict(list))

    self.outdir = self.get_arg("outdir")

    word_bits = self.panda.bits
    self.arch_bits = word_bits
    self.ptr_mask = (1 << word_bits) - 1

    # struct byte-order prefix: '>' only when PANDA explicitly reports a
    # big-endian target, '<' otherwise (including when the attribute is absent).
    is_big_endian = getattr(self.panda, 'endianness', None) == 'big'
    self.endian_fmt = '>' if is_big_endian else '<'
[docs]
def list_hooks(self):
    """
    Summarise every registered hook.

    Returns:
        list[dict]: One dict per hook with the keys ``id``, ``type``,
        ``action`` and ``logfile``; ``target`` and ``filters`` are included
        only when the hook record carries ``target_desc`` / ``filters``.
    """
    # (key in the stored record, key in the returned summary)
    optional_fields = (('target_desc', 'target'), ('filters', 'filters'))

    summaries = []
    for hid, record in self.hooks_by_id.items():
        entry = dict(
            id=hid,
            type=record.get('type', '?'),
            action=record.get('raw_action', '?'),
            logfile=record.get('logfile'),
        )
        for stored_key, public_key in optional_fields:
            if stored_key in record:
                entry[public_key] = record[stored_key]
        summaries.append(entry)
    return summaries
[docs]
def disable_hook(self, hook_id):
    """
    Remove a single hook.

    Args:
        hook_id: ID returned by one of the ``register_*`` methods.

    Returns:
        bool: Always ``True`` when the hook existed.

    Raises:
        ValueError: If ``hook_id`` is not a registered hook.
    """
    is_known = hook_id in self.hooks_by_id
    if not is_known:
        raise ValueError(f"Hook ID {hook_id} not found")

    self.logger.info(f"Disabling hook {hook_id}")
    self._unregister_hook(hook_id)
    return True
[docs]
def disable_all(self):
    """
    Remove every registered hook and drop all saved call context.

    Returns:
        int: Number of hooks that were registered when the call started.
    """
    total = len(self.hooks_by_id)
    self.logger.info(f"Disabling all {total} hooks")

    # Snapshot the IDs first: _unregister_hook mutates hooks_by_id.
    for pending_id in tuple(self.hooks_by_id):
        self._unregister_hook(pending_id)

    self.call_stacks.clear()
    return total
[docs]
def register_uprobe(self, path, symbol, action_str, pid_filter=None, process_filter=None, logfile=None):
    """
    Attach a uprobe (and, when needed, a return probe) driven by an action string.

    Args:
        path: Path of the binary or library to probe.
        symbol: Symbol name, or an address given as an int-like string
            (parsed with ``int(symbol, 0)``); anything that does not parse
            is passed through unchanged.
        action_str: Action string. A single ``plugin.method`` entry on
            either side of ``=`` replaces the built-in printing for that
            side with the named plugin method.
        pid_filter: Optional PID forwarded to the uprobes plugin.
        process_filter: Optional process name forwarded to the uprobes plugin.
        logfile: Optional file name (relative to the results directory) to
            append output to; the plugin logger is used when omitted.

    Returns:
        int: ID of the new hook.

    Raises:
        ValueError: If ``path`` or ``symbol`` is empty.
        Exception: Whatever the uprobes plugin raises while attaching; the
            half-registered hook is removed before re-raising.
    """
    if not path or not symbol:
        raise ValueError("Missing path or symbol")
    try:
        target_val = int(symbol, 0)
    except (ValueError, TypeError):
        target_val = symbol

    hook_id = self.next_hook_id
    self.next_hook_id += 1

    is_break = 'break' in action_str or 'bp' in action_str
    arg_fmts, ret_fmts = self._parse_print_formats(action_str)

    # A lone "plugin.method" entry delegates that side to the plugin method.
    entry_method = None
    if arg_fmts and len(arg_fmts) == 1:
        entry_method = self._resolve_plugin_method(arg_fmts[0])
        if entry_method:
            arg_fmts = []
    exit_method = None
    if ret_fmts and len(ret_fmts) == 1:
        exit_method = self._resolve_plugin_method(ret_fmts[0])
        if exit_method:
            ret_fmts = []

    # Deferred arguments (e.g. %s:out) can only be read at return time.
    has_deferred = any(f.endswith(':out') for f in arg_fmts)

    # A return probe is required if:
    # 1. return values are printed (an '=' was present in the action),
    # 2. a custom exit method is used, or
    # 3. some argument is deferred and must be read at exit.
    is_retprobe = (ret_fmts is not None) or (exit_method is not None) or has_deferred
    if ret_fmts is None:
        ret_fmts = []

    prefix = f"{os.path.basename(path)}:{target_val}"

    # Describe every active filter so list_hooks() does not hide one.
    filter_parts = []
    if pid_filter is not None:
        filter_parts.append(f"pid={pid_filter}")
    if process_filter:
        filter_parts.append(f"process={process_filter}")

    hook_data = {
        'type': 'uretprobe' if is_retprobe else 'uprobe',
        'raw_action': action_str,
        'target_desc': f"{path}:{symbol}",
        'filters': ", ".join(filter_parts),
        'logfile': logfile,
        'handles': []
    }
    self.hooks_by_id[hook_id] = hook_data

    def entry_handler(regs, **kwargs):
        if hook_id not in self.hooks_by_id:
            return
        if entry_method:
            yield from entry_method(regs)
            return

        vals = []
        resolved_args = []
        if arg_fmts:
            # %proc does not consume a function argument.
            count = sum(1 for f in arg_fmts if f != '%proc')
            vals = yield from regs.get_args_portal(count, convention='userland')
            # With a return probe, ':out' entries stay unresolved until exit.
            resolved_args = yield from self._resolve_values(arg_fmts, vals, allow_defer=is_retprobe)

        if not is_retprobe:
            # Entry-only hook: print immediately.
            self._log_action(resolved_args, [], prefix, is_break, logfile)
        elif arg_fmts:
            # Park the arguments until the matching return, keyed by the
            # tgid_pid value supplied by the uprobes plugin.
            tgid_pid = kwargs.get('tgid_pid', 0)
            self.call_stacks[hook_id][tgid_pid].append(resolved_args)

    def exit_handler(regs, **kwargs):
        if hook_id not in self.hooks_by_id:
            return

        saved_args = []
        if arg_fmts:
            tgid_pid = kwargs.get('tgid_pid', 0)
            # .get() avoids creating empty defaultdict entries on lookup.
            per_thread = self.call_stacks.get(hook_id)
            stack = per_thread.get(tgid_pid) if per_thread is not None else None
            if stack:
                raw_saved = stack.pop()
                if not stack:
                    # Drop drained stacks so finished threads do not accumulate.
                    del per_thread[tgid_pid]
                for arg in raw_saved:
                    # Deferred items are ("__DEFERRED__", fmt, ptr) tuples.
                    if isinstance(arg, tuple) and len(arg) == 3 and arg[0] == "__DEFERRED__":
                        _, fmt, ptr = arg
                        # Read the memory now that the callee has filled it.
                        val = yield from self._format_value(fmt, ptr)
                        saved_args.append(val)
                    else:
                        saved_args.append(arg)
            else:
                saved_args = ["<lost-context>"] * len(arg_fmts)

        if exit_method:
            yield from exit_method(regs)
            return

        ret_vals = []
        if ret_fmts:
            try:
                retval = regs.get_retval()
            except Exception:
                # Best effort: report 0 when the return value cannot be read.
                retval = 0
            # Only one return value exists; extra formats receive None.
            ret_vals = [retval] + [None] * (len(ret_fmts) - 1)
        # Return values are always resolved immediately at exit.
        resolved_rets = yield from self._resolve_values(ret_fmts, ret_vals, allow_defer=False)
        self._log_action(saved_args, resolved_rets, prefix, is_break, logfile, is_ret=True)

    def combined_handler(regs, is_enter=True, **kwargs):
        if is_enter:
            yield from entry_handler(regs, **kwargs)
        else:
            yield from exit_handler(regs, **kwargs)

    needs_entry = (not is_retprobe) or len(arg_fmts) > 0 or (entry_method is not None)
    needs_exit = is_retprobe

    def attach(handler, on_enter, on_return):
        handle = uprobes.uprobe(
            path=path, symbol=target_val,
            process_filter=process_filter,
            pid_filter=pid_filter,
            on_enter=on_enter, on_return=on_return
        )(handler)
        hook_data['handles'].append(('uprobe', handle))

    try:
        if needs_entry and needs_exit:
            # Register ONCE when both directions are needed.
            attach(combined_handler, True, True)
        elif needs_entry:
            attach(entry_handler, True, False)
        else:
            attach(exit_handler, False, True)
    except Exception:
        # Do not leave a phantom hook behind when attaching fails.
        self._unregister_hook(hook_id)
        raise

    target_log = logfile if logfile else "Logger"
    self.logger.info(
        f"HookLogger: Attached at {hook_data['target_desc']} -> {target_log}")
    return hook_id
[docs]
def register_syscall(self, name, action_str, pid_filter=None, process_filter=None, logfile=None):
    """
    Attach an entry-only syscall hook driven by an action string.

    Only the argument side of the action is used: return formats are parsed
    but ignored, and ':out' modifiers are stripped because nothing is
    deferred to syscall exit in this mode.

    Args:
        name: Syscall name or pattern, forwarded to the syscalls plugin.
        action_str: Action string. A single ``plugin.method`` entry replaces
            the built-in printing with that plugin method.
        pid_filter: Optional PID forwarded to the syscalls plugin.
        process_filter: Optional process name, forwarded as ``comm_filter``.
        logfile: Optional file name (relative to the results directory) to
            append output to; the plugin logger is used when omitted.

    Returns:
        int: ID of the new hook.

    Raises:
        Exception: Whatever the syscalls plugin raises while attaching; the
            half-registered hook is removed before re-raising.
    """
    hook_id = self.next_hook_id
    self.next_hook_id += 1

    # Accept both documented spellings, exactly like register_uprobe.
    is_break = 'break' in action_str or 'bp' in action_str
    arg_fmts, _ = self._parse_print_formats(action_str)

    syscall_method = None
    if arg_fmts and len(arg_fmts) == 1:
        syscall_method = self._resolve_plugin_method(arg_fmts[0])

    prefix = f"syscall:{name}"

    filter_parts = []
    if pid_filter is not None:
        filter_parts.append(f"pid={pid_filter}")
    if process_filter:
        filter_parts.append(f"process={process_filter}")

    # 'target_desc' and 'filters' let list_hooks() describe syscall hooks
    # the same way it describes uprobes.
    self.hooks_by_id[hook_id] = {
        'type': 'syscall',
        'raw_action': action_str,
        'name': name,
        'target_desc': name,
        'filters': ", ".join(filter_parts),
        'logfile': logfile,
        'handles': []
    }

    def handler(regs, proto, sc, *args):
        if hook_id not in self.hooks_by_id:
            return
        if syscall_method:
            yield from syscall_method(regs, proto, sc, *args)
            return

        vals = []
        if arg_fmts:
            # %proc does not consume a syscall argument.
            count = sum(1 for f in arg_fmts if f != '%proc')
            vals = yield from regs.get_args_portal(count, 'syscall')
        # Syscalls in this mode are entry-only, so allow_defer=False.
        resolved_args = yield from self._resolve_values(arg_fmts, vals, allow_defer=False)
        self._log_action(resolved_args, [], prefix, is_break, logfile)

    try:
        handle = syscalls.syscall(
            name_or_pattern=name,
            comm_filter=process_filter,
            pid_filter=pid_filter
        )(handler)
    except Exception:
        # Do not leave a phantom hook behind when attaching fails.
        self._unregister_hook(hook_id)
        raise
    self.hooks_by_id[hook_id]['handles'].append(('syscall', handle))

    target_log = logfile if logfile else "Logger"
    self.logger.info(
        f"HookLogger: Syscall attached at {name} -> {target_log}")
    return hook_id
def _unregister_hook(self, hook_id):
    """
    Detach every handle of a hook and forget its bookkeeping.

    Unknown IDs are ignored. A failure while detaching one handle is logged
    and does not stop the remaining handles or the cleanup.
    """
    record = self.hooks_by_id.get(hook_id)
    if hook_id not in self.hooks_by_id:
        return

    for kind, handle in record.get('handles', []):
        try:
            if kind == 'uprobe':
                uprobes.unregister(handle)
            elif kind == 'syscall':
                syscalls.unregister(handle)
        except Exception as e:
            self.logger.error(
                f"Error unregistering {kind} hook {hook_id}: {e}")

    del self.hooks_by_id[hook_id]
    # Saved entry arguments for this hook are no longer useful.
    self.call_stacks.pop(hook_id, None)
# --- Helpers ---
def _resolve_plugin_method(self, name):
    """
    Look up ``"plugin.method"`` on the global ``plugins`` object.

    Returns:
        The attribute named ``method`` on ``plugins.<plugin>``, or ``None``
        when ``name`` is not exactly two dot-separated parts, or when the
        plugin or the attribute does not exist.
    """
    # Exactly one dot is required: "plugin.method".
    if name.count('.') != 1:
        return None

    try:
        plugin_name, method_name = name.split('.')
        if not hasattr(plugins, plugin_name):
            return None
        plugin_obj = getattr(plugins, plugin_name)
        if not hasattr(plugin_obj, method_name):
            return None
        return getattr(plugin_obj, method_name)
    except AttributeError:
        return None
def _parse_print_formats(self, action_str):
    """
    Split an action string into argument formats and return formats.

    Accepted shape: ``[bp|break] [print] [func_name](fmt, ...) [= fmt, ...]``.
    The leading ``bp``/``break`` and ``print`` keywords, an optional
    ``func_name(`` prefix and the parentheses are stripped; what remains is
    split on the first ``=`` and then on commas. A bare ``plugin.method``
    entry is kept intact so the caller can resolve it.

    Args:
        action_str: The raw action string.

    Returns:
        tuple: ``(arg_fmts, ret_fmts)``. ``arg_fmts`` is always a list;
        ``ret_fmts`` is a list when the action contains ``=`` and ``None``
        otherwise.
    """
    body = re.sub(
        r'^\s*'
        r'(?:(?:bp|break)(?:\s+|$))?'     # optional breakpoint keyword
        r'(?:print\b\s*)?'                # optional print keyword
        r'(?:(?:[A-Za-z_]\w*\s*)?\()?',   # optional "func_name(" or bare "("
        '', action_str, flags=re.IGNORECASE)

    def split_formats(chunk):
        # Drop the closing parenthesis of THIS chunk; stripping only at the
        # very end of the action would leave "%p)" in "(%fd, %p) = %d".
        chunk = chunk.strip().rstrip(')')
        return [piece.strip() for piece in chunk.split(',') if piece.strip()]

    args_part, has_ret, ret_part = body.partition('=')
    if not has_ret:
        return split_formats(args_part), None
    return split_formats(args_part), split_formats(ret_part)
def _resolve_values(self, fmts, vals, allow_defer=False):
    """
    Turn raw values into display strings, one per format (generator).

    ``%proc`` takes the current process name from ``osi`` and consumes no
    value. Every other format consumes the next entry of ``vals``; when
    ``vals`` runs out, ``"?"`` is emitted. A format ending in ``:out`` is
    returned as a ``("__DEFERRED__", fmt, val)`` tuple when ``allow_defer``
    is true; otherwise the suffix is dropped and the value is resolved now.

    Returns (as the generator's return value):
        list: Formatted strings and/or deferred tuples, in format order.
    """
    results = []
    cursor = 0
    for spec in fmts:
        wants_out = spec.endswith(':out')
        if wants_out:
            # The suffix is removed whether or not deferral is possible.
            spec = spec[:-4]
        defer_this = wants_out and allow_defer

        if spec == '%proc':
            try:
                pname = yield from osi.get_proc_name()
                results.append(pname)
            except Exception:
                results.append("?")
            continue

        if cursor >= len(vals):
            # More formats than captured values.
            results.append("?")
            continue

        raw = vals[cursor]
        cursor += 1
        if defer_this:
            # Keep format and value for resolution at function exit.
            results.append(("__DEFERRED__", spec, raw))
        else:
            text = yield from self._format_value(spec, raw)
            results.append(text)
    return results
def _log_action(self, arg_strs, ret_strs, prefix, is_break, logfile=None, is_ret=False):
    """
    Emit one hook hit, or drop into PDB for breakpoint hooks.

    Args:
        arg_strs: Formatted argument strings.
        ret_strs: Formatted return-value strings (used only when ``is_ret``).
        prefix: Text placed before the parenthesised argument list.
        is_break: When true, log a warning, call ``pdb.set_trace()`` and
            return without writing the hit.
        logfile: File name under ``self.outdir`` to append to; the plugin
            logger is used when empty.
        is_ret: Append ``" = <returns>"`` to the message.
    """
    if is_break:
        self.logger.warning(f"Dynamic Breakpoint: {prefix}")
        pdb.set_trace()
        return

    call_text = f"{prefix}({', '.join(arg_strs)})"
    msg = f"{call_text} = {', '.join(ret_strs)}" if is_ret else call_text

    if not logfile:
        self.logger.info(msg)
        return

    path = os.path.join(self.outdir, logfile)
    try:
        with open(path, "a") as f:
            f.write(msg + "\n")
    except Exception as e:
        # A failed write is reported but never interrupts the hook.
        self.logger.error(f"Failed to write to {path}: {e}")
def _to_signed(self, val, bits=None):
    """
    Reinterpret ``val`` as a two's-complement integer of ``bits`` width.

    Args:
        val: Non-negative integer value.
        bits: Width in bits; falls back to ``self.arch_bits`` when falsy.

    Returns:
        int: ``val`` minus ``2**bits`` when the sign bit is set, else ``val``.
    """
    width = bits or self.arch_bits
    sign_bit = 1 << (width - 1)
    if not val & sign_bit:
        return val
    return val - (sign_bit << 1)
def _format_value(self, fmt, val):
    """
    Render one captured value according to a format specifier (generator).

    Plain specifiers (``%d %i %u %x %X %p %c %b``) format ``val`` itself.
    ``%fd`` asks ``osi`` for the descriptor's name, ``%s`` reads a string at
    ``val`` through ``mem``, and ``%<u|i|x><8|16|32|64>`` reads that many
    bits at ``val`` and decodes them with ``self.endian_fmt``. Lookup and
    read failures produce a placeholder instead of raising.

    Returns (as the generator's return value):
        str: The formatted text; ``"nil"`` for ``None`` and ``"<hex>(?)"``
        for an unrecognised specifier.
    """
    if val is None:
        return "nil"

    # Specifiers that only need the value itself.
    plain = {
        '%d': lambda v: str(self._to_signed(v)),
        '%i': lambda v: str(self._to_signed(v)),
        '%u': lambda v: str(v),
        '%x': lambda v: f"{v:x}",
        '%X': lambda v: f"{v:X}",
        '%p': lambda v: f"0x{v:0{self.arch_bits//4}x}",
        # Printable ASCII is shown as-is, everything else as '.'.
        '%c': lambda v: chr(v & 0xFF) if 32 <= (v & 0xFF) <= 126 else '.',
        '%b': lambda v: str(bool(v)),
    }
    render = plain.get(fmt)
    if render is not None:
        return render(val)

    if fmt == '%fd':
        try:
            name = yield from osi.get_fd_name(val)
            return f"{val}({name or '?'})"
        except Exception:
            return f"{val}(?)"

    if fmt == '%s':
        try:
            text = yield from mem.read_str(val)
            return f'"{text}"'
        except Exception:
            return f"<bad-str:{val:x}>"

    # Memory dump: type letter followed by a bit width.
    spec = re.match(r'%(?P<t>[uix])(?P<b>8|16|32|64)', fmt)
    if spec is None:
        return f"{val:x}(?)"

    kind = spec.group('t')
    nbytes = int(spec.group('b')) // 8
    try:
        raw = yield from mem.read_bytes(val, nbytes)
        code = {1: 'B', 2: 'H', 4: 'I', 8: 'Q'}[nbytes]
        if kind == 'i':
            # Lower-case struct codes decode as signed.
            code = code.lower()
        number = struct.unpack(f"{self.endian_fmt}{code}", raw)[0]
        return f"{number:x}" if kind == 'x' else str(number)
    except Exception:
        return f"<bad-mem:{val:x}>"