Source code for pengutils.utils.cli_db

"""
Database CLI
============

This script consolidates various database query utilities into a single command-line interface.
It allows querying execution events, file descriptor usage, file reads/writes, syscalls, and unique tasks (process names) from the Penguin RemoteCtrl Plugin database.

Example usage
-------------

.. code-block:: bash

    # Query tasks
    cli_db.py tasks --results ./results/latest

    # Query execs
    cli_db.py execs --procname myproc --fd 3

    # Query syscalls
    cli_db.py syscalls --errors

    # Query reads/writes
    cli_db.py reads --filename config.txt
    cli_db.py writes --fd 1

    # Query unique FDs
    cli_db.py fds --follow

Options
-------
Common options:

- ``--results``: Path to results folder (default: ``./results/latest/``)
- ``--output``: Output file (default: ``/dev/stdout``)

See individual commands for specific filters and options.
"""

import click
from sqlalchemy import create_engine, func
from sqlalchemy.orm import Session
from rich import print as rprint
from time import sleep
from os.path import join, exists

from pengutils.events import Event, Exec, Read, Syscall, Write
from pengutils.utils.util_base import wrapper, get_default_results_path

# --- Filter Helpers ---


def exec_filter(sess, procname, fd, filename):
    """Build a query over ``Exec`` rows, narrowed by the supplied filters.

    Each filter is applied only when its argument is truthy.

    :param sess: Object whose ``query()`` method creates the base query
        (presumably a SQLAlchemy ``Session`` -- confirm against the caller).
    :param procname: Substring that ``Exec.procname`` must contain.
    :param fd: Value that ``Exec.fd`` must equal.
    :param filename: Substring that ``Exec.fname`` must contain.
    :return: The resulting query object (not yet executed here).
    """
    criteria = []
    if procname:
        criteria.append(Exec.procname.contains(procname))
    if fd:
        criteria.append(Exec.fd == fd)
    if filename:
        criteria.append(Exec.fname.contains(filename))

    selected = sess.query(Exec)
    for criterion in criteria:
        selected = selected.filter(criterion)
    return selected
def read_filter(sess, procname, fd, filename):
    """Build a query over ``Read`` rows, narrowed by the supplied filters.

    Each filter is applied only when its argument is truthy.

    :param sess: Object whose ``query()`` method creates the base query
        (presumably a SQLAlchemy ``Session`` -- confirm against the caller).
    :param procname: Substring that ``Read.procname`` must contain.
    :param fd: Value that ``Read.fd`` must equal.
    :param filename: Substring that ``Read.fname`` must contain.
    :return: The resulting query object (not yet executed here).
    """
    criteria = []
    if procname:
        criteria.append(Read.procname.contains(procname))
    if fd:
        criteria.append(Read.fd == fd)
    if filename:
        criteria.append(Read.fname.contains(filename))

    selected = sess.query(Read)
    for criterion in criteria:
        selected = selected.filter(criterion)
    return selected
def write_filter(sess, procname, fd, filename):
    """Build a query over ``Write`` rows, narrowed by the supplied filters.

    Each filter is applied only when its argument is truthy.

    :param sess: Object whose ``query()`` method creates the base query
        (presumably a SQLAlchemy ``Session`` -- confirm against the caller).
    :param procname: Substring that ``Write.procname`` must contain.
    :param fd: Value that ``Write.fd`` must equal.
    :param filename: Substring that ``Write.fname`` must contain.
    :return: The resulting query object (not yet executed here).
    """
    criteria = []
    if procname:
        criteria.append(Write.procname.contains(procname))
    if fd:
        criteria.append(Write.fd == fd)
    if filename:
        criteria.append(Write.fname.contains(filename))

    selected = sess.query(Write)
    for criterion in criteria:
        selected = selected.filter(criterion)
    return selected
def syscall_filter(sess, procname, syscall, errors):
    """Build a query over ``Syscall`` rows, narrowed by the supplied filters.

    Each filter is applied only when its argument is truthy.

    :param sess: Object whose ``query()`` method creates the base query
        (presumably a SQLAlchemy ``Session`` -- confirm against the caller).
    :param procname: Substring that ``Syscall.procname`` must contain.
    :param syscall: Syscall name; ``"sys_"`` is prepended when the value does
        not already start with it, then ``Syscall.name`` must contain it.
    :param errors: When truthy, keep only rows whose ``retno`` is negative.
    :return: The resulting query object (not yet executed here).
    """
    criteria = []
    if procname:
        criteria.append(Syscall.procname.contains(procname))
    if syscall:
        needle = syscall if syscall.startswith("sys_") else "sys_" + syscall
        criteria.append(Syscall.name.contains(needle))
    if errors:
        criteria.append(Syscall.retno < 0)

    selected = sess.query(Syscall)
    for criterion in criteria:
        selected = selected.filter(criterion)
    return selected
# --- CLI Group ---


# NOTE: click shows each command's docstring as its ``--help`` text, so the
# docstrings below are kept short; implementation notes live in ``#`` comments.
@click.group(name="db")
def db_cli():
    """Database query commands."""
    pass


# --- Commands ---


@db_cli.command()
@click.option("--results", default=get_default_results_path(), help="Path to results folder")
@click.option("--procname", default=None, help="Process name to filter for (looks for substring)")
@click.option("--follow", default=False, help="Show latest results as they appear", is_flag=True)
@click.option("--fd", default=None, help="Filter for file descriptor")
@click.option("--filename", default=None, help="Filter for file name")
@click.option("--output", default="/dev/stdout", help="Output to file instead of stdout")
def execs(results, procname, follow, fd, filename, output):
    """Query execution events."""
    # The process name is only printed when the user did not filter on it.
    print_procname = procname is None
    args = (procname, fd, filename)
    wrapper(results, output, print_procname, follow, exec_filter, args)


@db_cli.command()
@click.option("--results", default=get_default_results_path(), help="Path to results folder")
@click.option("--procname", default=None, help="Process name to filter for (looks for substring)")
@click.option("--follow", default=False, help="Show latest results as they appear", is_flag=True)
@click.option("--fd", default=None, help="file descriptor number to filter")
@click.option("--output", default="/dev/stdout", help="Output to file instead of stdout")
def fds(results, procname, follow, fd, output):
    """Query file descriptor write events (unique combinations)."""
    # Prints each distinct (procname, fd, fname) combination found in Write
    # rows. With --follow, polls once per second and prints only combinations
    # from rows newer than the highest id already processed.
    db_path = join(results, "plugins.db")
    if not exists(db_path):
        rprint(f"[red]Failed to find db at {db_path}. Check your --results[/red]")
        return

    engine = create_engine(f"sqlite:///{db_path}")

    # Use built-in print for files to avoid ANSI codes, rprint for stdout
    printer = rprint if output == "/dev/stdout" else print

    # The process name column is shown only when no --procname filter is given.
    print_procname = not procname

    with open(output, "w") as f:
        with Session(engine) as sess:
            # -1 is the sentinel for "no lower bound on Write.id".
            highest_id = -1

            if follow:
                # An aggregate over zero rows still yields one row, holding
                # NULL/None; guard the value (not just the row) so an empty
                # events table does not raise TypeError on ``None - 4``.
                row = sess.execute(func.max(Event.id)).first()
                if row is not None and row[0] is not None:
                    # Back off by 4 ids so the most recent few events are
                    # still eligible on the first pass.
                    highest_id = row[0] - 4

            while True:
                query = sess.query(Write)
                if procname:
                    query = query.filter(Write.procname.contains(procname))
                if highest_id != -1:
                    query = query.filter(Write.id > highest_id)
                if fd:
                    query = query.filter(Write.fd == fd)

                seen = set()
                for event in query.all():
                    seen.add((event.procname, event.fd, event.fname))
                    # Advance for every row so the next poll only fetches
                    # rows that have not been processed yet.
                    highest_id = max(highest_id, event.id)

                for e in sorted(seen):
                    if print_procname:
                        printer(f"({e[0]}) {e[1]} {e[2]}", file=f)
                    else:
                        printer(f"{e[1]} {e[2]}", file=f)

                # Flush to ensure output appears immediately when following
                f.flush()

                if not follow:
                    break
                sleep(1)


@db_cli.command()
@click.option("--results", default=get_default_results_path(), help="Path to results folder")
@click.option("--procname", default=None, help="Process name to filter for (looks for substring)")
@click.option("--follow", default=False, help="Show latest results as they appear", is_flag=True)
@click.option("--fd", default=None, help="Filter for file descriptor")
@click.option("--filename", default=None, help="Filter for file name")
@click.option("--output", default="/dev/stdout", help="Output to file instead of stdout")
def reads(results, procname, follow, fd, filename, output):
    """Query file read events."""
    # The process name is only printed when the user did not filter on it.
    print_procname = procname is None
    args = (procname, fd, filename)
    wrapper(results, output, print_procname, follow, read_filter, args)


@db_cli.command()
@click.option("--results", default=get_default_results_path(), help="Path to results folder")
@click.option("--procname", default=None, help="Process name to filter for (looks for substring)")
@click.option("--syscall", default=None, help="Syscall name to filter for (looks for substring)")
@click.option("--errors", default=False, help="Just show syscalls that returned an error", is_flag=True)
@click.option("--follow", default=False, help="Show latest results as they appear", is_flag=True)
@click.option("--output", default="/dev/stdout", help="Output to file instead of stdout")
def syscalls(results, procname, syscall, errors, follow, output):
    """Query syscall events."""
    # The process name is only printed when the user did not filter on it.
    print_procname = procname is None
    args = (procname, syscall, errors)
    wrapper(results, output, print_procname, follow, syscall_filter, args)


@db_cli.command()
@click.option("--results", default=get_default_results_path(), help="Path to results folder")
@click.option("--output", default="/dev/stdout", help="Output to file instead of stdout")
def tasks(results, output):
    """Query and list unique process names (tasks)."""
    db_path = join(results, "plugins.db")
    if not exists(db_path):
        rprint(f"[red]Failed to find db at {db_path}. Check your --results[/red]")
        return

    engine = create_engine(f"sqlite:///{db_path}")

    # Use built-in print for files to avoid ANSI codes, rprint for stdout
    printer = rprint if output == "/dev/stdout" else print

    with open(output, "w") as f:
        with Session(engine) as sess:
            query = sess.query(Event.procname).distinct().order_by(Event.id)
            # NOTE(review): DISTINCT combined with ORDER BY on a column that
            # is not selected may let repeated names through, depending on
            # how the statement is rendered -- confirm. De-duplicate here
            # (keeping first-seen order) so the output is unique regardless.
            printed = set()
            for event in query.all():
                name = event.procname
                if name in printed:
                    continue
                printed.add(name)
                printer(name, file=f)


@db_cli.command()
@click.option("--results", default=get_default_results_path(), help="Path to results folder")
@click.option("--procname", default=None, help="Process name to filter for (looks for substring)")
@click.option("--follow", default=False, help="Show latest results as they appear", is_flag=True)
@click.option("--fd", default=None, help="Filter for file descriptor")
@click.option("--filename", default=None, help="Filter for file name")
@click.option("--output", default="/dev/stdout", help="Output to file instead of stdout")
def writes(results, procname, follow, fd, filename, output):
    """Query file write events."""
    # The process name is only printed when the user did not filter on it.
    print_procname = procname is None
    args = (procname, fd, filename)
    wrapper(results, output, print_procname, follow, write_filter, args)


if __name__ == "__main__":
    db_cli()