Source code for pengutils.utils.util_base

"""
Utility Base Functions
======================

This module provides utility functions for wrapping and executing filtered queries on event databases.
It is designed to be used by CLI scripts for querying and outputting event data, supporting flexible
filtering and output options.

Example usage
-------------

.. code-block:: python

    from pengutils.utils.util_base import wrapper

    def my_filter(sess, arg1, arg2):
        # ...filter logic...
        return query

    wrapper(results_path, output_path, print_procname, follow, my_filter, (arg1, arg2))

Functions
---------

- wrapper: Handles session setup, filtering, and output for event queries.

"""

from sqlalchemy.orm import Session
from sqlalchemy import func, create_engine
from rich import print as rprint
from rich.markup import escape
from time import sleep
from os.path import join, exists
from pengutils.events import Event


[docs] def get_default_results_path(): """ Get the default results path. Returns ------- str Default results path. """ import os project_dir = os.getenv("PENGUIN_PROJECT_DIR") if project_dir: return os.path.join(project_dir, "results/latest") # Fall back to current directory (old default) return "./results"
[docs] def wrapper(results, output, print_procname, follow, filter_func, args): """ Wrapper function to execute a filtered query and output results. Parameters ---------- results : str Path to results folder. output : str Output file path (default: ``/dev/stdout``). print_procname : bool Whether to print the process name in output. follow : bool Whether to show latest results as they appear. filter_func : callable Function to filter the query. Should accept (sess, ``*args``). args : tuple Arguments to pass to the filter function. Returns ------- None """ db_path = join(results, "plugins.db") if not exists(db_path): print(f"Failed to find db at {db_path}. Check your --results") return engine = create_engine(f"sqlite:///{db_path}") with open(output, "w") as f: with Session(engine) as sess: highest_id = -1 # only pretty print if we are printing to stdout if output == "/dev/stdout": printer = lambda *args, **kwargs: rprint(*(escape(str(arg)) for arg in args), **kwargs) # noqa: E731 else: printer = print # in follow mode we print the last 4 events and then continue from there if follow: if id_num := sess.execute(func.max(Event.id)).first(): highest_id = id_num[0] - 4 while True: query = filter_func(sess, *args) if highest_id != -1: query = query.filter(Event.id > highest_id) for event in query.all(): if print_procname: printer(f"({event.procname}) {event}", file=f) else: printer(event, file=f) highest_id = max(highest_id, event.id) if not follow: break else: sleep(1)