Source code for pyplugins.loggers.rw_logger

"""
Read/Write Logger Plugin
========================

This plugin records read and write system call events to the penguin database. It hooks into the system call
return events for ``read`` and ``write``, extracts relevant details such as file descriptor, buffer content, and
process name, and stores them as ``Read`` and ``Write`` events in the database.

Purpose
-------

- Monitors file descriptor read and write operations in the guest.
- Records buffer contents, file descriptor names, and process names for each event.
- Enables later analysis of file I/O activity and data flow.

Usage
-----

Simply add this plugin by name to your config.

The plugin extracts relevant fields and stores them in the database using the ``Read`` and ``Write`` event types.
"""

from penguin import plugins, Plugin
from pengutils.events import Read, Write

syscalls = plugins.syscalls


[docs] class RWLog(Plugin): """ Plugin for logging read and write system call events to the database. Hooks into system call return events and records them as `Read` and `Write` events. """ def __init__(self) -> None: """ Initialize the RWLog plugin. - Sets up the output directory and database reference. - Registers hooks for `on_sys_write_return` and `on_sys_read_return` syscalls. **Returns:** None """ self.outdir = self.get_arg("outdir") self.DB = plugins.DB self.cbs = [] self.enable()
[docs] def enable(self): self.cbs.append(syscalls.syscall("on_sys_write_return")(self.write)) self.cbs.append(syscalls.syscall("on_sys_read_return")(self.read))
[docs] def disable(self): cbs = self.cbs self.cbs = [] for cb in cbs: syscalls.unregister(cb)
[docs] def write(self, regs, proto, syscall, fd, buf, count) -> None: """ Callback for handling write syscall return events. **Parameters:** - `regs`: CPU registers at the time of the syscall. - `proto`: Protocol or plugin-specific context. - `syscall`: Syscall number or identifier. - `fd`: File descriptor being written to. - `buf`: Buffer address containing data written. - `count`: Number of bytes written. Reads the buffer content, resolves the file descriptor name and process name, and records the event in the database as a `Write` event. **Returns:** None """ rv = syscall.retval if rv < 0: rv = count if rv <= 0: return data = yield from plugins.mem.read_bytes(buf, size=rv) signed_fd = int(self.panda.ffi.cast("target_long", fd)) fname = (yield from plugins.OSI.get_fd_name(fd)) or "?" args = yield from plugins.OSI.get_args() if args: procname = args[0] else: procname = "[???]" self.DB.add_event(Write, { "procname": procname, "fd": signed_fd, "fname": fname, "buffer": data, } )
[docs] def read(self, regs, proto, syscall, fd, buf, count) -> None: """ Callback for handling read syscall return events. **Parameters:** - `proto`: Protocol or plugin-specific context. - `syscall`: Syscall number or identifier. - `fd`: File descriptor being read from. - `buf`: Buffer address containing data read. - `count`: Number of bytes read. Reads the buffer content, resolves the file descriptor name and process name, and records the event in the database as a `Read` event. **Returns:** None """ rv = syscall.retval if rv < 0: rv = count if rv <= 0: return data = yield from plugins.mem.read_bytes(buf, rv) signed_fd = int(self.panda.ffi.cast("target_long", fd)) fname = (yield from plugins.OSI.get_fd_name(fd)) or "?" # Get name of FD, if it's valid signed_fd = int(self.panda.ffi.cast("target_long", fd)) args = yield from plugins.osi.get_args() if args: procname = args[0] else: procname = "[???]" self.DB.add_event(Read, { "procname": procname, "fd": signed_fd, "fname": fname, "buffer": data, } )