Source code for pengutils.events.types

"""
Event Types
===========

This module defines the main event types for the penguin event database, each as a subclass of `Event`.
These types represent different kinds of system events (read, write, syscall, exec) and are mapped to
corresponding tables in the database using SQLAlchemy ORM.

Example usage
-------------

.. code-block:: python

    from pengutils.events.types import Read, Write, Syscall, Exec

Classes
-------

- Read: Represents a file read event.
- Write: Represents a file write event.
- Syscall: Represents a syscall event with arguments and return value.
- Exec: Represents an exec event (process execution).

Each class provides a ``__str__`` method for human-readable representation.

Table Structure
---------------

Each event type is mapped to its own table and linked to the base ``event`` table via a foreign key.

"""

from .base import Event
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column
from sqlalchemy import ForeignKey
from typing import Optional


[docs] class Read(Event): """ Read Event ========== Represents a file read event. Attributes ---------- id : int Primary key, foreign key to event.id. fd : int File descriptor read from. fname : str Name of the file read. buffer : Optional[bytes] Contents read from the file. """ __tablename__ = "read" id: Mapped[int] = mapped_column(ForeignKey("event.id"), primary_key=True) fd: Mapped[int] fname: Mapped[str] buffer: Mapped[Optional[bytes]] __mapper_args__ = { "polymorphic_identity": "read", } def __str__(self): """ Return a human-readable string representation of the read event. Returns ------- str String representation. """ buf = repr(self.buffer) if self.buffer is not None else "" return f'read({self.fd}, {self.fname}, "{buf}")'
[docs] class Write(Event): """ Write Event =========== Represents a file write event. Attributes ---------- id : int Primary key, foreign key to event.id. fd : int File descriptor written to. fname : Optional[str] Name of the file written. buffer : Optional[bytes] Contents written to the file. """ __tablename__ = "write" id: Mapped[int] = mapped_column(ForeignKey("event.id"), primary_key=True) fd: Mapped[int] fname: Mapped[Optional[str]] buffer: Mapped[Optional[bytes]] __mapper_args__ = { "polymorphic_identity": "write", } def __str__(self): """ Return a human-readable string representation of the write event. Returns ------- str String representation. """ buf = repr(self.buffer) if self.buffer is not None else "" return f'write({self.fd}, {self.fname}, "{buf}")'
[docs] class Syscall(Event): """ Syscall Event ============= Represents a syscall event, including arguments and return value. Attributes ---------- id : int Primary key, foreign key to event.id. name : str Name of the syscall. retno : Optional[int] Return value of the syscall. retno_repr : Optional[str] String representation of the return value. arg0-arg5 : Optional[int] Argument values. arg0_repr-arg5_repr : Optional[str] String representations of arguments. """ __tablename__ = "syscall" id: Mapped[int] = mapped_column(ForeignKey("event.id"), primary_key=True) name: Mapped[str] retno: Mapped[Optional[int]] retno_repr: Mapped[Optional[str]] arg0: Mapped[Optional[int]] arg0_repr: Mapped[Optional[str]] arg1: Mapped[Optional[int]] arg1_repr: Mapped[Optional[str]] arg2: Mapped[Optional[int]] arg2_repr: Mapped[Optional[str]] arg3: Mapped[Optional[int]] arg3_repr: Mapped[Optional[str]] arg4: Mapped[Optional[int]] arg4_repr: Mapped[Optional[str]] arg5: Mapped[Optional[int]] arg5_repr: Mapped[Optional[str]] __mapper_args__ = { "polymorphic_identity": "syscall", } def __str__(self): """ Return a human-readable string representation of the syscall event. Returns ------- str String representation. """ args = [] for i in range(6): arg, arg_repr = getattr(self, f"arg{i}"), getattr(self, f"arg{i}_repr") if arg is not None: args.append(f"{arg_repr}({arg:#x})") return f"{self.name}({', '.join(args)}) = {self.retno}({self.retno_repr})"
[docs] class Exec(Event): """ Exec Event ========== Represents a process execution (exec) event. Attributes ---------- id : int Primary key, foreign key to event.id. calltree : str Call tree information. argc : str Argument count. argv : str Argument values. envp : str Environment variables. euid : int Effective user ID. egid : int Effective group ID. """ __tablename__ = "exec" id: Mapped[int] = mapped_column(ForeignKey("event.id"), primary_key=True) calltree: Mapped[str] argc: Mapped[str] argv: Mapped[str] envp: Mapped[str] euid: Mapped[int] egid: Mapped[int] __mapper_args__ = { "polymorphic_identity": "exec", } def __str__(self): """ Return a human-readable string representation of the exec event. Returns ------- str String representation. """ return f'Exec: "{self.argv}" {self.calltree}'