Source code for pyplugins.apis.fs

"""
FS API Plugin
=============

This module provides the FS plugin for the penguin framework, enabling interaction with the guest filesystem.
It exposes methods for reading, writing, and querying files and directories in the guest, and can be used
by other plugins to perform filesystem operations.

Features
--------

- Read files from the guest filesystem.
- Write data to files in the guest.
- Execute programs in the guest environment.
- Supports chunked operations for large files.
- Abstracts guest filesystem access for analysis and automation.

Example Usage
-------------

::

    from penguin import plugins

    # Read a file from the guest
    content = yield from plugins.fs.read_file("/etc/passwd")

    # Write to a file in the guest
    yield from plugins.fs.write_file("/tmp/test.txt", "hello world")

    # Execute a program in the guest
    yield from plugins.fs.exec_program("/bin/ls", argv=["ls", "-l", "/"])
"""

from penguin import Plugin
from hyper.consts import HYPER_OP as hop
from hyper.portal import PortalCmd


[docs] class FS(Plugin): """ FS Plugin ========= Provides methods for interacting with the guest filesystem, including reading, writing, and listing files and directories. Methods ------- read_file Read a file from the guest filesystem. write_file Write data to a file in the guest filesystem. exec_program Execute a program in the guest environment. Note ---- All methods are generated and their signatures and types are enforced. """
[docs] def read_file(self, fname: str, size: int = None, offset: int = 0) -> bytes: """ Read a file from the guest filesystem. Parameters ---------- fname : str Path to the file in the guest. size : int, optional Size limit. If None, reads entire file. offset : int, optional Offset in bytes where to start reading (default: 0). Returns ------- bytes The file data as bytes. Raises ------ Exception If the file cannot be read. Reads the specified file from the guest filesystem, optionally limiting the read to a specific size and offset. If `size` is not specified, the entire file is read in chunks. > **Note:** This method is generated and type-checked. """ fname_bytes = fname.encode('latin-1')[:255] + b'\0' rsize = self.plugins.portal.regions_size # Handle the case where we want to read a specific amount if size is not None: # If size is small enough, do a single read if size <= rsize - 1: data = yield PortalCmd(hop.HYPER_OP_READ_FILE, offset, size, None, fname_bytes) return data # For larger sizes, read in chunks all_data = b"" current_offset = offset bytes_remaining = size while bytes_remaining > 0: chunk_size = min(rsize - 1, bytes_remaining) self.logger.debug( f"Reading file chunk: {fname}, offset={current_offset}, size={chunk_size}") chunk = yield PortalCmd(hop.HYPER_OP_READ_FILE, current_offset, chunk_size, None, fname_bytes) if not chunk: self.logger.debug( f"No data returned at offset {current_offset}, stopping read") break all_data += chunk current_offset += len(chunk) bytes_remaining -= len(chunk) # If we got less data than requested, we've reached EOF if len(chunk) < chunk_size: self.logger.debug( f"Reached EOF at offset {current_offset} (requested {chunk_size}, got {len(chunk)})") break return all_data # If size is not specified, read the entire file in chunks all_data = b"" current_offset = offset chunk_size = rsize - 1 while True: self.logger.debug( f"Reading file chunk: {fname}, offset={current_offset}, size={chunk_size}") chunk = yield PortalCmd(hop.HYPER_OP_READ_FILE, current_offset, chunk_size, None, fname_bytes) if not chunk: self.logger.debug( f"No data returned at offset {current_offset}, stopping read") break all_data += chunk current_offset += len(chunk) # If we got less data than requested, we've reached EOF if len(chunk) < chunk_size: self.logger.debug( f"Reached EOF at offset {current_offset} (requested {chunk_size}, got {len(chunk)})") break return all_data
[docs] def write_file(self, fname: str, data: bytes | str, offset: int = 0) -> int: """ Write data to a file in the guest filesystem. Parameters ---------- fname : str Path to the file in the guest. data : bytes or str Data to write to the file. offset : int, optional Offset in bytes where to start writing (default: 0). Returns ------- int Number of bytes written. Raises ------ Exception If the file cannot be written. Overwrites the file if it exists, or creates it if it does not. Handles large writes in chunks. > **Note:** This method is generated and type-checked. """ # Convert string data to bytes if necessary if isinstance(data, str): data = data.encode('latin-1') fname_bytes = fname.encode('latin-1')[:255] + b'\0' rsize = self.plugins.portal.regions_size # Calculate the maximum data size that can fit in one region max_data_size = rsize - len(fname_bytes) # If data is small enough, do a single write if len(data) <= max_data_size: self.logger.debug( f"Writing {len(data)} bytes to file {fname} at offset {offset}") bytes_written = yield PortalCmd(hop.HYPER_OP_WRITE_FILE, offset, len(data), None, fname_bytes + data) return bytes_written # For larger files, write in chunks total_bytes = 0 current_offset = offset current_pos = 0 while current_pos < len(data): # Calculate maximum chunk size to fit in memory region, considering # filename length max_chunk = max_data_size - 16 # Add safety margin chunk_size = min(max_chunk, len(data) - current_pos) self.logger.debug( f"Writing file chunk: {fname}, offset={current_offset}, size={chunk_size}") chunk = data[current_pos:current_pos + chunk_size] bytes_written = yield PortalCmd(hop.HYPER_OP_WRITE_FILE, current_offset, len(chunk), None, fname_bytes + chunk) if not bytes_written: self.logger.error( f"Failed to write chunk at offset {current_offset}") break total_bytes += bytes_written current_offset += bytes_written current_pos += chunk_size # If we couldn't write the full chunk, stop if bytes_written < chunk_size: self.logger.debug( f"Partial write: wrote {bytes_written} of {chunk_size} bytes") break self.logger.debug(f"Total bytes written to file: {total_bytes}") return total_bytes
[docs] def exec_program( self, exe_path: str = None, argv: list[str] = None, envp: dict[str, str] = None, wait: bool = False ) -> int: """ Execute a program in the guest environment. Parameters ---------- exe_path : str, optional Path to executable. If not provided, uses `argv[0]`. argv : list of str, optional List of arguments (including program name as first arg). envp : dict of str, optional Dictionary of environment variables. wait : bool, optional Whether to wait for program to complete. Returns ------- int Return code from execution. Raises ------ Exception If the program cannot be executed. Executes a program in the guest using the kernel's `call_usermodehelper` function. Optionally waits for completion. > **Note:** This method is generated and type-checked. """ if not exe_path: exe_path = argv[0] self.logger.debug( f"exec_program called: exe_path={exe_path}, wait={wait}") # Prepare the data buffer using a list of bytes objects data_parts = [] # Add executable path (null-terminated) data_parts.append(exe_path.encode('latin-1') + b'\0') # Add argv (null-separated, double-null terminated) if argv: for arg in argv: data_parts.append(arg.encode('latin-1') + b'\0') data_parts.append(b'\0') # Double null termination # Add environment variables (null-separated, double-null terminated) if envp: for key, value in envp.items(): env_string = f"{key}={value}" data_parts.append(env_string.encode('latin-1') + b'\0') data_parts.append(b'\0') # Double null termination data_parts.append(b'\0') # Just null termination # Convert the list to a single bytes object data = b''.join(data_parts) # Call the kernel with the prepared data # The wait mode is passed in header.addr field result = yield PortalCmd(hop.HYPER_OP_EXEC, wait, len(data), None, data) self.logger.debug(f"exec_program result: {result}") return result