# Source code for penguin.static_analyses

"""
penguin.static_analyses
=======================

Static analysis utilities for the Penguin emulation environment.

This module provides classes and helpers for analyzing extracted filesystems.
"""

import os
import re
import stat
import struct
from subprocess import check_output, CalledProcessError, STDOUT, PIPE, SubprocessError

from abc import ABC
from elftools.common.exceptions import ELFError, ELFParseError
from elftools.elf.elffile import ELFFile
from elftools.elf.sections import SymbolTableSection
from collections import Counter
from pathlib import Path
from penguin import getColoredLogger
from penguin.utils import get_available_kernel_versions
from penguin.defaults import DEFAULT_KERNEL
import tempfile
import subprocess

from .arch import arch_filter, arch_end
logger = getColoredLogger("penguin.static_analyses")


[docs] class FileSystemHelper:
[docs] @staticmethod def find_regex( target_regex: re.Pattern, extract_root: str, ignore: list | tuple | None = None ) -> dict: """ Search the filesystem for matches to a regex pattern using ripgrep. :param target_regex: Compiled regex pattern to match. :param extract_root: Root directory to search. :param ignore: Optional list/tuple of matches to ignore. :return: Dict of {match: {"count": int, "files": [str]}} """ results = {} if not ignore: ignore = tuple() elif isinstance(ignore, list): ignore = tuple(ignore) pattern_str = target_regex.pattern extract_path_str = str(extract_root) try: # Get list of files containing matches file_list_output = check_output( f"rg --files-with-matches -a '{pattern_str}' '{extract_path_str}'", stderr=PIPE, shell=True, ) # Process each file with Python's regex to extract actual matches if file_list_output: for filepath in file_list_output.decode().splitlines(): if not os.path.isfile(filepath) or os.path.islink(filepath): continue # open the file and read the content try: with open(filepath, "r", encoding="utf-8", errors="replace") as f: content = f.read() except Exception as e: logger.warning(f"failed to read file {filepath}: {e}") continue # apply regex pattern to find matches matches = target_regex.findall(content) for match in matches: if match in ignore: continue if match not in results: results[match] = {"count": 0, "files": set()} results[match]["count"] += 1 results[match]["files"].add(filepath) except (SubprocessError, FileNotFoundError) as e: if e.returncode == 1: return {} else: logger.warning(f"Failed to run ripgrep: {e} - falling back to pure Python regex") return FileSystemHelper._find_regex_python(target_regex, extract_root, ignore) return results
@staticmethod def _find_regex_python( target_regex: re.Pattern, extract_root: str, ignore: list | None = None ) -> dict: """ Fallback implementation using Python's built-in regex. :param target_regex: Compiled regex pattern to match. :param extract_root: Root directory to search. :param ignore: Optional list of matches to ignore. :return: Dict of {match: {"count": int, "files": [str]}} """ results = {} if not ignore: ignore = [] # iterate through each file in the extracted root directory for root, dirs, files in os.walk(extract_root): for filename in files: filepath = os.path.join(root, filename) # skip our files in the "./igloo" path if filepath.startswith(os.path.join(extract_root, "igloo")): continue # skip non-regular files if `only_files` is true if not os.path.isfile(filepath) or os.path.islink(filepath): continue # open the file and read the content try: with open(filepath, "r", encoding="utf-8", errors="replace") as f: content = f.read() except Exception as e: logger.warning(f"failed to read file {filepath}: {e}") continue # apply regex pattern to find matches matches = target_regex.findall(content) for match in matches: if match in ignore: continue if match not in results: results[match] = {"count": 0, "files": set()} results[match]["count"] += 1 results[match]["files"].add(filepath) return results
class StaticAnalysis(ABC):
    """
    Abstract base class for static analyses.

    Concrete analyses override :meth:`run`; the base implementations are
    deliberate no-ops.
    """

    def __init__(self) -> None:
        """Initialize the static analysis (no state by default)."""

    def run(self, extract_dir: str, prior_results: dict) -> None:
        """
        Run the static analysis.

        :param extract_dir: Directory containing extracted filesystem.
        :param prior_results: Results from previous analyses.
        """
class ArchId(StaticAnalysis):
    """
    Identify the most common architecture in the extracted filesystem.
    """

    def run(self, extracted_fs: str, prior_results: dict) -> str:
        '''
        Count architectures to identify most common. If both 32 and 64 bit
        binaries from the most common architecture are present, prefer 64-bit.
        Raise an error if architecture cannot be determined or is unsupported.

        :param extracted_fs: Path to extracted filesystem.
        :param prior_results: Results from previous analyses.
        :return: Most common architecture string.
        :raises ValueError: If unable to determine architecture.
        '''
        arch_counts = {32: Counter(), 64: Counter(), "unknown": 0}

        for dirpath, _, filenames in os.walk(extracted_fs):
            for fname in filenames:
                candidate = os.path.join(dirpath, fname)
                if not (
                    os.path.isfile(candidate)
                    and not os.path.islink(candidate)
                    and self._binary_filter(extracted_fs, candidate)
                ):
                    continue
                logger.debug(f"Checking architecture in {candidate}")
                with open(candidate, "rb") as f:
                    # Only real ELF binaries count toward the tally.
                    if f.read(4) != b"\x7fELF":
                        continue
                    f.seek(0)
                    try:
                        ef = ELFFile(f)
                    except ELFError as e:
                        logger.warning(f"Failed to parse ELF file {candidate}: {e}. Ignoring")
                        continue
                    info = arch_filter(ef)
                    if info.bits is None or info.arch is None:
                        arch_counts["unknown"] += 1
                    else:
                        arch_counts[info.bits][info.arch] += 1

        # Some firmwares ship left-over x86/x86_64 binaries from the build
        # process that never run in the guest. If any non-intel architecture
        # is present alongside intel, drop the intel counts entirely.
        intel_archs = ("intel", "intel64")
        seen_archs = list(arch_counts[32]) + list(arch_counts[64])
        if any(a in intel_archs for a in seen_archs) and any(
            a not in intel_archs for a in seen_archs
        ):
            # Counter.__delitem__ is a no-op for missing keys.
            del arch_counts[32]["intel"]
            del arch_counts[64]["intel64"]

        # Prefer the most common 64-bit architecture, then fall back to 32-bit.
        for bits in (64, 32):
            top = arch_counts[bits].most_common(1)
            if top:
                best, best_count = top[0]
                break
        else:
            raise ValueError("Failed to determine architecture of filesystem")

        # If "unknown" dominates the best known arch, dump debug counts and bail.
        if arch_counts["unknown"] > best_count:
            for arch, count in arch_counts[32].items():
                logger.info(f"32-bit arch {arch} has {count} files")
            for arch, count in arch_counts[64].items():
                logger.info(f"64-bit arch {arch} has {count} files")
            logger.info(f"Unknown architecture count: {arch_counts['unknown']}")
            raise ValueError("Failed to determine architecture of filesystem")

        logger.debug(f"Identified architecture: {best}")
        return best

    @staticmethod
    def _binary_filter(fsbase: str, name: str) -> bool:
        """
        Filter for binary files of interest.

        :param fsbase: Base directory.
        :param name: File path.
        :return: True if file is a relevant binary.
        """
        # Anything under the usual binary directories is interesting.
        for base in ("sbin", "bin", "usr/sbin", "usr/bin"):
            if name.startswith(os.path.join(fsbase, base)):
                return True
        # Otherwise: shared libraries, kernel modules, or busybox.
        if name.endswith((".so", ".ko", "busybox")):
            return True
        return ".so." in name
class InitFinder(StaticAnalysis):
    '''
    Find potential init scripts and binaries in an extracted filesystem.
    '''

    def run(self, filesystem_root_path: str, prior_results: dict) -> list[str]:
        '''
        Search the filesystem for binaries that might be init scripts.

        :param filesystem_root_path: Root path of extracted filesystem.
        :param prior_results: Results from previous analyses.
        :return: Ranked list of init script paths (guest-absolute, e.g. "/sbin/init").
        '''
        inits = []
        # Walk through the filesystem root and find potential init scripts.
        for root, dirs, files in os.walk(filesystem_root_path):
            for filename in files:
                filepath = os.path.join(root, filename)
                if self._is_init_script(filepath, filesystem_root_path):
                    inits.append("/" + os.path.relpath(filepath, filesystem_root_path))

        # Sort inits by length, shortest to longest.
        inits.sort(key=len)

        # Deprecated: kernel_inits. Filesystem extraction used to analyze the
        # kernel binary for a built-in init= argument (written to init.txt in
        # the output directory) and prioritize those entries ahead of the
        # filesystem-derived ones. We do not currently do this or have a way
        # to pass that information through.

        # Now rank our init options, using the same ranking as Firmadyne/FirmAE
        # where a few specific inits are prioritized, then fallback to others.
        target_inits = ["preinit", "init", "rcS"]
        # If any of these are in our init list, move them to the front but
        # maintain this order (i.e., preinit goes before /init so loop backwards).
        for potential in target_inits[::-1]:
            try:
                idx = [x.split("/")[-1] for x in inits].index(potential)
            except ValueError:
                # No match
                continue
            # Move to front
            match = inits.pop(idx)
            inits.insert(0, match)

        # Remove entries longer than 32 characters.
        inits = [i for i in inits if len(i) <= 32]

        # Final pass to ensure all inits are executable. Strip the leading "/"
        # so the path is relative to our extract dir.
        # BUGFIX: this previously used i[:1] (just the leading "/"), which made
        # os.path.join collapse to "/" so the stat always checked the host root
        # and always passed; i[1:] checks the actual candidate file.
        inits = [
            i for i in inits
            if os.stat(os.path.join(filesystem_root_path, i[1:])).st_mode
            & (stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)
        ]
        return inits

    @staticmethod
    def _is_init_script(filepath: str, fsroot: str) -> bool:
        '''
        Determine if a file is a potential init script.

        :param filepath: Path to file.
        :param fsroot: Filesystem root.
        :return: True if file is a potential init script.
        '''
        # Never report our own injected files.
        if filepath.startswith("./igloo"):
            return False
        if not os.path.isfile(filepath) and not os.path.islink(filepath):
            return False
        name = os.path.basename(filepath)
        if any(x in name for x in ["init", "start"]) and not any(
            x in name for x in ["inittab", "telinit", "initd"]
        ):
            # If 'start' is in the name, ensure it's not part of "restart" or "startup".
            if "start" in name and not re.search(r"[\W_\-\.]start[\W_\-\.]", name):
                return False
            # Handle symlinks: make sure the link target exists.
            if os.path.islink(filepath):
                link_target = os.readlink(filepath)
                if os.path.isabs(link_target):
                    # The "./" prefix makes the absolute guest path relative so
                    # os.path.join anchors it under fsroot instead of discarding fsroot.
                    result = os.path.join(fsroot, "./" + link_target)
                else:
                    result = os.path.join(os.path.dirname(filepath), link_target)
                if not os.path.exists(result):
                    logger.warning(
                        f"Potential init '{filepath}' is a symlink to '{link_target}' which does not exist in the filesystem"
                    )
                    return False
            # If 'init' is in the name, ensure it's not named `.init`.
            if "init" in name and name.endswith(".init"):
                return False
            # Check if the file is executable.
            if os.path.isfile(filepath) and os.stat(filepath).st_mode & (stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH):
                return True
        elif "rcS" in name:
            if os.path.isfile(filepath) and os.stat(filepath).st_mode & (stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH):
                return True
        return False
class KernelVersionFinder(StaticAnalysis):
    """
    Find and select the best kernel version from extracted filesystem.
    """

    @staticmethod
    def is_kernel_version(name: str) -> bool:
        """
        Check if a string matches a kernel version pattern.

        :param name: Version string.
        :return: True if matches kernel version pattern (e.g. "4.14.221" or "4.14.221-rc1").
        """
        return bool(re.match(r"^\d+\.\d+\.\d+(-[\w\.]+)?$", name))

    @staticmethod
    def select_best_kernel(kernel_versions: set[str]) -> str:
        """
        Select the most recent kernel version and match to available kernels.

        :param kernel_versions: Iterable of kernel version strings.
        :return: Best matching kernel version string.
        """
        if not kernel_versions:
            return DEFAULT_KERNEL

        def as_tuple(version: str) -> tuple[int, ...]:
            # "4.14.221-rc1" -> (4, 14, 221); non-numeric tokens are skipped.
            numeric = version.split("-", 1)[0]
            return tuple(int(tok) for tok in numeric.split(".") if tok.isdigit())

        # Pick the newest version seen in the guest filesystem.
        newest = max(kernel_versions, key=as_tuple)
        guest_version = as_tuple(newest)
        guest_major = guest_version[0] if guest_version else None

        available_versions = get_available_kernel_versions()

        def distance(candidate: tuple[int, ...]) -> int:
            # Component-wise absolute difference, padding the shorter tuple with zeros.
            width = max(len(candidate), len(guest_version))
            lhs = candidate + (0,) * (width - len(candidate))
            rhs = guest_version + (0,) * (width - len(guest_version))
            return sum(abs(a - b) for a, b in zip(lhs, rhs))

        # Prefer kernels sharing the guest's major version; otherwise pick the
        # globally closest available kernel.
        same_major = [v for v in available_versions if v[0] == guest_major]
        pool = same_major if same_major else available_versions
        best = min(pool, key=distance)
        return ".".join(str(part) for part in best)

    def run(self, extract_dir: str, prior_results: dict) -> dict[str, list[str] | str]:
        """
        Run kernel version analysis.

        :param extract_dir: Directory containing extracted filesystem.
        :param prior_results: Results from previous analyses.
        :return: Dict with potential and selected kernel versions.
        """
        # Kernel modules live in /lib/modules/<version>/, so the top-level
        # directory names there are our candidate version strings.
        modules_path = os.path.join(extract_dir, "lib/modules")
        candidates = set()
        if os.path.exists(modules_path):
            candidates = {
                entry
                for entry in os.listdir(modules_path)
                if os.path.isdir(os.path.join(modules_path, entry))
                and self.is_kernel_version(entry)
            }

        return {
            "potential_kernels": sorted(candidates),
            "selected_kernel": self.select_best_kernel(candidates),
        }
class EnvFinder(StaticAnalysis):
    """
    Identify potential environment variables and their values in the filesystem.
    """

    # Variable names we never care about when scraping /proc/cmdline consumers.
    BORING_VARS: list[str] = ["TERM"]

    def run(self, extract_dir: str, prior_results: dict) -> dict[str, list | None]:
        """
        Find environment variables and their possible values.

        :param extract_dir: Directory containing extracted filesystem.
        :param prior_results: Results from previous analyses (must contain 'InitFinder').
        :return: Dict of environment variable names to possible values (None if unknown).
        """
        # Seed with our own knobs: igloo_task_size (three magic task-size
        # values) and igloo_init, whose candidates come from InitFinder.
        potential_env = {
            "igloo_task_size": [0xBF000000, 0x7F000000, 0x3F000000],
            "igloo_init": prior_results['InitFinder'],
        }

        # Shell scripts that read /proc/cmdline reveal which variable names
        # the guest actually consumes.
        key_pattern = re.compile(r"\/proc\/cmdline.*?([A-Za-z0-9_]+)=", re.MULTILINE)
        candidate_keys = FileSystemHelper.find_regex(
            key_pattern, extract_dir, ignore=self.BORING_VARS
        ).keys()

        # For each key, scrape the filesystem for literal KEY=value assignments.
        for key in candidate_keys:
            value_pattern = re.compile(key + r"=([A-Za-z0-9_]+)", re.MULTILINE)
            found_values = FileSystemHelper.find_regex(
                value_pattern, extract_dir, ignore=self.BORING_VARS
            ).keys()
            potential_env[key] = list(found_values) if len(found_values) else None

        return potential_env
class PseudofileFinder(StaticAnalysis):
    """
    Find device and proc pseudofiles in the extracted filesystem.

    Scrapes guest files for /dev/* and /proc/* references and filters them
    down to the entries the emulation environment does not already provide.
    """

    # /dev entries our environment already creates; references to these are
    # filtered out of the results.
    IGLOO_ADDED_DEVICES: list[str] = [
        "autofs", "btrfs-control", "cfs0", "cfs1", "cfs2", "cfs3", "cfs4",
        "console", "cpu_dma_latency", "full", "fuse", "input", "kmsg",
        "loop-control", "loop0", "loop1", "loop2", "loop3", "loop4", "loop5",
        "loop6", "loop7", "mem", "memory_bandwidth", "mice", "net",
        "network_latency", "network_throughput", "null", "port", "ppp",
        "psaux", "ptmx", "pts",
        "ptyp0", "ptyp1", "ptyp2", "ptyp3", "ptyp4", "ptyp5", "ptyp6",
        "ptyp7", "ptyp8", "ptyp9", "ptypa", "ptypb", "ptypc", "ptypd",
        "ptype", "ptypf",
        "ram", "ram0", "ram1", "ram10", "ram11", "ram12", "ram13", "ram14",
        "ram15", "ram2", "ram3", "ram4", "ram5", "ram6", "ram7", "ram8",
        "ram9",
        "random", "root", "tty",
        "tty0", "tty1", "tty10", "tty11", "tty12", "tty13", "tty14", "tty15",
        "tty16", "tty17", "tty18", "tty19", "tty2", "tty20", "tty21",
        "tty22", "tty23", "tty24", "tty25", "tty26", "tty27", "tty28",
        "tty29", "tty3", "tty30", "tty31", "tty32", "tty33", "tty34",
        "tty35", "tty36", "tty37", "tty38", "tty39", "tty4", "tty40",
        "tty41", "tty42", "tty43", "tty44", "tty45", "tty46", "tty47",
        "tty48", "tty49", "tty5", "tty50", "tty51", "tty52", "tty53",
        "tty54", "tty55", "tty56", "tty57", "tty58", "tty59", "tty6",
        "tty60", "tty61", "tty62", "tty63", "tty7", "tty8", "tty9",
        "ttyS0", "ttyS1", "ttyS2", "ttyS3",
        "ttyp0", "ttyp1", "ttyp2", "ttyp3", "ttyp4", "ttyp5", "ttyp6",
        "ttyp7", "ttyp8", "ttyp9", "ttypa", "ttypb", "ttypc", "ttypd",
        "ttype", "ttypf",
        "tun", "urandom", "vcs", "vcs1", "vcsa", "vcsa1", "vda",
        "vga_arbiter", "vsock", "zero",
        "root", "pts",  # Added in init
        "ttyAMA0", "ttyAMA1",  # ARM
        "stdin", "stdout", "stderr",  # Symlinks to /proc/self/fd/X
    ]

    # /proc entries our environment already provides. Extended at __init__
    # time with the contents of resources/proc_sys.txt.
    IGLOO_PROCFS: list[str] = [
        "buddyinfo", "cgroups", "cmdline", "config.gz", "consoles",
        "cpuinfo", "crypto", "devices", "diskstats", "execdomains", "fb",
        "filesystems", "interrupts", "iomem", "ioports", "kallsyms",
        "key-users", "keys", "kmsg", "kpagecount", "kpageflags", "loadavg",
        "locks", "meminfo", "misc", "modules", "mounts",
        "mtd",  # We might shadow this later intentionally, but not by default
        "net", "pagetypeinfo", "partitions",
        "penguin_net",  # This is custom and unique but we shouldn't ever shadow it
        "sched_debug", "slabinfo", "softirqs", "stat", "swaps",
        "sysrq-trigger", "thread-self", "timer_list", "uptime", "version",
        "vmallocinfo", "vmstat", "zoneinfo",
        # Directories
        "bus",
        "bus/pci",
        "bus/pci/00",
        "bus/pci/00/00.0",
        "bus/pci/00/0a.0",
        # NOTE(review): the entries below contain a trailing space inside the
        # string literal (e.g. "bus/pci/00/0a.1 "); as written they can never
        # match a scraped /proc path in _filter_files — confirm whether the
        # whitespace is intentional or a typo.
        "bus/pci/00/0a.1 ",
        "bus/pci/00/0a.2 ",
        "bus/pci/00/0a.3 ",
        "bus/pci/00/0b.0 ",
        "bus/pci/00/12.0 ",
        "bus/pci/00/13.0 ",
        "bus/pci/00/14.0 ",
        "bus/pci/devices ",
        "bus/input",
        "bus/input/devices",
        "bus/input/handlers",
        "cpu",
        "cpu/alignment",
        "driver",
        "driver/rtc",
        "fs",
        "fs/afs",
        "fs/afs/cells",
        "fs/afs/rootcell",
        "fs/ext4",
        "fs/f2fs",
        "fs/jbd2",
        "fs/nfsd",
        "fs/lockd",
        "fs/lockd/nlm_end_grace",
        "fs/nfsfs",
        "fs/nfsfs/servers",
        "fs/nfsfs/volumes",
        # Sys is special, loaded dynamically
        # sysvipc, driver (empty), scsi, tty, sys (big), irq (numbers), bus, fs
        "sysvipc/shm",
        "sysvipc/sem",
        "sysvipc/msg",
        "scsi/device_info",
        "scsi/scsi",
        "tty/drivers",
        "tty/ldisc",
        "tty/driver",
        "tty/driver/serial",
        "tty/ldisc",
    ]

    # Directories that we want to just ignore entirely - don't create any entries
    # within these directories. IRQs and device-tree are related to the emulated CPU;
    # self and PID are related to the process itself and dynamically created.
    PROC_IGNORE: list[str] = ["irq", "self", "PID", "device-tree", "net", "vmcore"]

    def __init__(self) -> None:
        """
        Initialize PseudofileFinder and load additional procfs entries.

        NOTE(review): this appends to the *class-level* IGLOO_PROCFS list, so
        entries accumulate if multiple instances are created — confirm that a
        single instantiation per process is intended.
        """
        # Load ../resources/proc_sys.txt, add each line to IGLOO_PROCFS
        resources = os.path.join(os.path.dirname(os.path.dirname(__file__)), "resources")
        with open(os.path.join(resources, "proc_sys.txt"), "r") as f:
            for line in f.readlines():
                self.IGLOO_PROCFS.append(line.strip())

    def _filter_files(
        self, extract_dir: str, pattern: re.Pattern, ignore_list: list[str], remove_list: list[str]
    ) -> list[str]:
        """
        Filter files in a directory based on regex, ignore, and remove lists.

        :param extract_dir: Directory to search.
        :param pattern: Regex pattern to match.
        :param ignore_list: List of path prefixes to ignore entirely.
        :param remove_list: List of exact matches to remove.
        :return: Filtered list of file paths (directories with kept children dropped).
        """
        # Find all files matching the pattern
        found_files = list(FileSystemHelper.find_regex(pattern, extract_dir).keys())

        # Apply ignore filters: these are paths we'll ignore entirely.
        # for-else: the else clause runs only when no ignore entry matched.
        filtered_files = []
        for x in found_files:
            for f in ignore_list:
                if x == f or x.startswith(f + "/"):
                    break
            else:
                filtered_files.append(x)

        # Remove items from remove_list (like IGLOO_ADDED_DEVICES or IGLOO_PROCFS)
        for f in remove_list:
            if f in filtered_files:
                filtered_files.remove(f)

        # Remove directories that have subpaths: if "a/b" survived, drop "a".
        directories_to_remove = {
            "/".join(k.split("/")[:i + 1])  # get parent directories
            for k in filtered_files
            for i in range(len(k.split("/")[:-1]))  # only consider parent parts
        }
        return [k for k in filtered_files if k not in directories_to_remove]
[docs] def run(self, extract_dir: str, prior_results: dict) -> dict[str, list[str]]: """ Run pseudofile analysis. :param extract_dir: Directory containing extracted filesystem. :param prior_results: Results from previous analyses. :return: Dict with lists of device and proc files. """ # Regex patterns for dev and proc files dev_pattern = re.compile(r"/dev/([a-zA-Z0-9_/]+)", re.MULTILINE) proc_pattern = re.compile(r"/proc/([a-zA-Z0-9_/]+)", re.MULTILINE) # Filter device files dev_files = self._filter_files( extract_dir, dev_pattern, [], self.IGLOO_ADDED_DEVICES ) # Filter proc files, applying PROC_IGNORE and IGLOO_PROCFS proc_files = self._filter_files( extract_dir, proc_pattern, self.PROC_IGNORE, self.IGLOO_PROCFS ) # Return dev and proc files in the appropriate format return { "dev": [f"/dev/{x}" for x in dev_files], "proc": [f"/proc/{x}" for x in proc_files], }
@staticmethod def _get_devfiles_in_fs(extracted_dir: str) -> list[str]: """ Get all device files in extracted_dir/dev. :param extracted_dir: Directory containing extracted filesystem. :return: List of device file paths. """ dev_dir = os.path.join(extracted_dir, "dev") results = [] if os.path.exists(dev_dir): for root, _, files in os.walk(dev_dir): for f in files: relative_path = os.path.join("/dev", os.path.relpath(os.path.join(root, f), dev_dir)) results.append(relative_path) return results
class InterfaceFinder(StaticAnalysis):
    """
    Identify network interfaces in the filesystem.
    """

    def run(self, extract_dir: str, prior_results: dict) -> dict[str, list[str]] | None:
        """
        Find network interfaces using sysfs and command references.

        :param extract_dir: Directory containing extracted filesystem.
        :param prior_results: Results from previous analyses.
        :return: Dict of interfaces found via sysfs and commands, or None if
                 nothing was found.
        """
        # Interfaces referenced via sysfs paths, minus virtual/default ones.
        sysfs_pattern = re.compile(r"/sys/class/net/([a-zA-Z0-9_]+)", re.MULTILINE)
        sys_net_ifaces = [
            name
            for name in FileSystemHelper.find_regex(sysfs_pattern, extract_dir).keys()
            if not name.startswith("veth") and not name.startswith("br") and name != "lo"
        ]

        # Interfaces referenced as arguments to standard networking commands
        # (ifconfig, ip, ifup/ifdown, ethtool, route, iwconfig, netstat, ss).
        interface_regex = r"([a-zA-Z0-9][a-zA-Z0-9_-]{2,15})"
        command_patterns = [
            re.compile(rf"ifconfig\s+{interface_regex}"),
            re.compile(rf"ip\s+(?:addr|link|route|add|set|show)\s+{interface_regex}"),
            re.compile(rf"if(?:up|down)\s+{interface_regex}"),
            re.compile(rf"ethtool\s+{interface_regex}"),
            re.compile(rf"route\s+(?:add|del)\s+{interface_regex}"),
            re.compile(rf"iwconfig\s+{interface_regex}"),
            re.compile(rf"netstat\s+-r\s+{interface_regex}"),
            re.compile(rf"ss\s+-i\s+{interface_regex}"),
        ]
        interfaces = set()
        for pat in command_patterns:
            interfaces.update(FileSystemHelper.find_regex(pat, extract_dir).keys())

        # Drop command keywords (substring match), virtual/default interfaces,
        # and bare numbers that the loose regex may have captured.
        bad_prefixes = ["veth", "br"]
        bad_vals = ["lo", "set", "add", "del", "route", "show", "addr", "link",
                    "up", "down", "flush", "help", "default"]
        interfaces = [
            iface
            for iface in interfaces
            if not any(bad in iface for bad in bad_vals)
            and not any(iface.startswith(pfx) for pfx in bad_prefixes)
            and not iface.isnumeric()
        ]

        result = {}
        if len(sys_net_ifaces):
            result["sysfs"] = list(sys_net_ifaces)
        if len(interfaces):
            result["commands"] = list(interfaces)
        if len(result):
            return result
class ClusterCollector(StaticAnalysis):
    '''
    Collect summary statistics for the filesystem to help identify clusters.
    '''

    def run(self, extract_dir: str, prior_results: dict) -> dict[str, list[str]]:
        """
        Collect basename and hash of every executable file.

        :param extract_dir: Directory containing extracted filesystem.
        :param prior_results: Results from previous analyses.
        :return: Dict with lists of files, executables, and hashes.
        """
        all_files = set()
        executables = set()
        executable_hashes = set()
        for root, _, files in os.walk(extract_dir):
            for f in files:
                file_path = os.path.join(root, f)
                if not os.path.isfile(file_path):
                    continue
                all_files.add(os.path.basename(f))
                if os.access(file_path, os.X_OK):
                    executables.add(os.path.basename(f))
                    hash_value = self.compute_file_hash(file_path)
                    if hash_value:
                        executable_hashes.add(hash_value)
        return {
            'files': list(all_files),
            'executables': list(executables),
            'executable_hashes': list(executable_hashes)
        }

    @staticmethod
    def compute_file_hash(file_path: str) -> str | None:
        """
        Compute SHA256 hash of a file.

        :param file_path: Path to file.
        :return: Hex digest string or None on failure.
        """
        # Hash in-process with hashlib instead of shelling out to sha256sum:
        # this avoids a fork/exec per file and works even when the sha256sum
        # binary is absent, while producing the identical hex digest.
        import hashlib
        try:
            digest = hashlib.sha256()
            with open(file_path, "rb") as f:
                # Read in 1 MiB chunks to keep memory bounded on large binaries.
                for chunk in iter(lambda: f.read(1 << 20), b""):
                    digest.update(chunk)
            return digest.hexdigest()
        except OSError as e:
            logger.debug(f"Failed to hash file {file_path}: {e}")
            return None
class LibrarySymbols(StaticAnalysis):
    """
    Examine libraries in the filesystem for NVRAM keys and exported symbols.

    Uses pyelftools to find definitions for NVRAM_KEYS variables and tracks
    exported function names.
    """

    # Symbol names whose data sections are expected to hold NVRAM
    # key/value pointer tables.
    NVRAM_KEYS: list[str] = ["Nvrams", "router_defaults"]

    def run(self, extract_dir: str, prior_results: dict) -> dict[str, dict] | None:
        """
        Analyze libraries for NVRAM keys and symbols.

        :param extract_dir: Directory containing extracted filesystem.
        :param prior_results: Results from previous analyses (must contain 'ArchId').
        :return: Dict with nvram values and symbol paths, or None when the
                 architecture/endianness could not be determined.
        """
        self.extract_dir = extract_dir
        self.archend = arch_end(prior_results['ArchId'])
        if any(x is None for x in self.archend):
            self.enabled = False
            # Route through the module logger (was a bare print) so the
            # warning lands with every other diagnostic from this module.
            logger.warning(
                f"Unknown architecture/endianness: {self.archend}. Cannot run NVRAM recovery Static Analysis"
            )
            return

        symbols = {}
        nvram = {}
        sym_paths = {}  # path -> symbol names

        # Now let's examine each extracted shared library.
        for root, _, files in os.walk(self.extract_dir):
            for file in files:
                file_path = Path(root) / file
                if file_path.is_file() and \
                        (str(file_path).endswith(".so") or ".so." in str(file_path)):
                    try:
                        found_nvram, found_syms = self._analyze_library(file_path, self.archend)
                    except Exception as e:
                        logger.error(
                            f"Unhandled exception in _analyze_library for {file_path}: {e}"
                        )
                        continue
                    tmpless_path = str(file_path).replace(str(self.extract_dir), "")
                    sym_paths[tmpless_path] = found_syms
                    for symname, offset in found_syms.items():
                        symbols[(tmpless_path, symname)] = offset
                    for key, value in found_nvram.items():
                        # Handle case of value coming from an ar archive, where
                        # keys are prefixed "archive:member:key".
                        nvram_key = key.rsplit(":", 1)[-1]
                        nvram[(tmpless_path, nvram_key)] = value

        # Re-key the raw data as: library path -> nvram key -> nvram value.
        nvram_values = {}
        for (path, key), value in nvram.items():
            if path not in nvram_values:
                nvram_values[path] = {}
            if key is not None and len(key) and value is not None:
                nvram_values[path][key] = value

        # nvram is keyed filepath -> nvram key -> nvram value; downstream can
        # generate patches for each possible non-conflicting source.
        return {'nvram': nvram_values, 'symbols': sym_paths}
    @staticmethod
    def _find_symbol_address(
        elffile: ELFFile, symbol_name: str
    ) -> tuple[int | None, int | str | None]:
        """
        Find the address and section index of a symbol in an ELF file.

        :param elffile: ELFFile object.
        :param symbol_name: Name of the symbol.
        :return: Tuple of (address, section_index) or (None, None) when not found.
        """
        try:
            symbol_tables = [
                s for s in elffile.iter_sections() if isinstance(s, SymbolTableSection)
            ]
        except ELFParseError:
            # Malformed section headers: treat as "symbol not found".
            return None, None
        for section in symbol_tables:
            # get_symbol_by_name returns a list of matching symbols (or None);
            # take the first match.
            if symbol := section.get_symbol_by_name(symbol_name):
                symbol = symbol[0]
                return (
                    symbol["st_value"],
                    symbol["st_shndx"],
                )  # Return symbol address and section index
        return None, None

    @staticmethod
    def _get_string_from_address(
        elffile: ELFFile, address: int, is_64: bool = False, is_eb: bool = False
    ) -> str | None:
        """
        Get a NUL-terminated UTF-8 string at a virtual address in an ELF file.

        :param elffile: ELFFile object.
        :param address: Virtual address to read the string from.
        :param is_64: True if 64-bit ELF. NOTE(review): currently unused here.
        :param is_eb: True if big-endian. NOTE(review): currently unused here.
        :return: Decoded string or None.
        """
        # Locate the section whose [sh_addr, sh_addr + size) range covers the
        # address, then read bytes up to the first NUL terminator.
        for section in elffile.iter_sections():
            start_addr = section["sh_addr"]
            end_addr = start_addr + section.data_size
            if start_addr <= address < end_addr:
                offset_within_section = address - start_addr
                data = section.data()[offset_within_section:]
                str_end = data.find(b"\x00")
                if str_end != -1:
                    try:
                        return data[:str_end].decode("utf-8")
                    except UnicodeDecodeError:
                        # Not printable text; fall through and return None.
                        # print(f"Failed to decode string: {data[:str_end]}")
                        pass
        return None

    @staticmethod
    def _is_elf(filename: str) -> bool:
        """
        Check if a file is an ELF binary (by magic bytes).

        :param filename: Path to file.
        :return: True if ELF, False otherwise (including unreadable files).
        """
        try:
            with open(filename, "rb") as f:
                magic = f.read(4)
            return magic == b"\x7fELF"
        except IOError:
            return False
    @staticmethod
    def get_nvram_info(
        elf_path: str, archend: str
    ) -> dict[str, str | None]:
        """
        Extract NVRAM key-value pairs from an ELF file.

        Locates one of the NVRAM_KEYS table symbols and walks the table as an
        array of (key_ptr, value_ptr, NULL) pointer triples, resolving each
        pointer to a string.

        :param elf_path: Path to ELF file.
        :param archend: Architecture/endianness string ("eb" substring means
                        big-endian, "64" substring means 64-bit pointers).
        :return: Dict of NVRAM key-value pairs (value may be None).
        """
        nvram_data = {}
        is_eb = "eb" in archend
        is_64 = "64" in archend
        with open(elf_path, "rb") as f:
            try:
                elffile = ELFFile(f)
            except ELFError:
                # elftools failed to parse our file. If it's actually an ELF, warn
                if LibrarySymbols._is_elf(elf_path):
                    logger.warning(
                        f"Failed to parse {elf_path} as an ELF file when analyzing libraries"
                    )
                return nvram_data

            # Check for nvram keys
            for nvram_key in LibrarySymbols.NVRAM_KEYS:
                address, section_index = LibrarySymbols._find_symbol_address(elffile, nvram_key)
                if address is None:
                    continue
                if section_index == "SHN_UNDEF":
                    # This is a common case for shared libraries, it means
                    # the symbol is defined in another library?
                    continue
                try:
                    section = elffile.get_section(section_index)
                except TypeError:
                    logger.warning(
                        f"Failed to get section {section_index} for symbol {nvram_key} in {elf_path} when analyzing libraries"
                    )
                    continue
                data = section.data()
                start_addr = section["sh_addr"]
                # Byte offset of the table within the section's data.
                offset = address - start_addr
                pointer_size = 8 if is_64 else 4
                unpack_format = f"{'>' if is_eb else '<'}{'Q' if is_64 else 'I'}"
                # We expect key_ptr, value_ptr, NULL, ...
                # note that we could have key_ptr, NULL, NULL
                # end when we get a NULL key
                fail_count = 0
                while offset + (pointer_size * 3) < len(data):
                    # Read one (key_ptr, value_ptr, pad) triple.
                    ptrs = [
                        struct.unpack(
                            unpack_format,
                            data[
                                offset + i * pointer_size: offset + (i + 1) * pointer_size
                            ],
                        )[0]
                        for i in range(3)
                    ]
                    if ptrs[0] != 0:
                        key = LibrarySymbols._get_string_from_address(elffile, ptrs[0], is_64, is_eb)
                        val = LibrarySymbols._get_string_from_address(elffile, ptrs[1], is_64, is_eb)
                        # Reject keys containing whitespace/path/markup chars or
                        # starting with a digit — those indicate we've walked
                        # past the real table into unrelated data.
                        if (
                            key
                            and not any([x in key for x in ' /\t\n\r<>"'])
                            and not key[0].isnumeric()
                        ):
                            fail_count = 0
                            if key not in nvram_data:
                                nvram_data[key] = val
                        else:
                            fail_count += 1
                    else:
                        # Should we break here?
                        # For now let's just keep going (be sure to keep offset increment below)
                        # so we're more likely to find additional keys - might get false positives though
                        pass
                    if fail_count > 5:
                        # Probably just outside of the table?
                        break
                    offset += pointer_size * 3
        return nvram_data
@staticmethod def _analyze_library( elf_path: str, archend: str ) -> tuple[dict, dict]: """ Analyze a single library for exported tables and function names. :param elf_path: Path to library file. :param archend: Architecture/endianness info. :return: Tuple of (nvram_data, symbols). """ symbols = {} # Symbol name -> relative(?) address nvram_data = {} # key -> value (may be empty string) # Check if the file is an ar archive try: with open(elf_path, 'rb') as f: archive = f.read(8) == b"!<arch>\n" if archive: with tempfile.TemporaryDirectory() as temp_dir: subprocess.run(["ar", "x", elf_path], cwd=temp_dir, check=True) for obj_file in os.listdir(temp_dir): obj_path = os.path.join(temp_dir, obj_file) found_nvram, found_syms = LibrarySymbols._analyze_library(obj_path, archend) archive_key = f"{os.path.basename(elf_path)}:{obj_file}" symbols.update({f"{archive_key}:{k}": v for k, v in found_syms.items()}) nvram_data.update({f"{archive_key}:{k}": v for k, v in found_nvram.items()}) return nvram_data, symbols except CalledProcessError as e: logger.error(f"Error processing archive {elf_path}: {e.output.decode('utf-8', errors='ignore')}") # Handle ELF files try: if nm_out := check_output(["nm", "-D", "--defined-only", elf_path], stderr=STDOUT): for line in nm_out.decode("utf8", errors="ignore").split("\n"): if line: parts = line.split() if len(parts) == 3: addr, _, name = parts if '@' in name: name = name.split("@")[0] addr = int(addr, 16) if addr != 0: symbols[name] = addr elif line.strip().endswith("no symbols"): continue else: logger.warning(f"Unexpected nm output format: {line}") except CalledProcessError as e: if LibrarySymbols._is_elf(elf_path): logger.error(f"Error running nm on {elf_path}: {e.output.decode('utf-8', errors='ignore')}") return nvram_data, symbols if any(sym in symbols for sym in LibrarySymbols.NVRAM_KEYS): nvram_data = LibrarySymbols.get_nvram_info(elf_path, archend) return nvram_data, symbols