# Source code for penguin.config_patchers

"""
penguin.config_patchers
=======================

Configuration patch generation utilities for the Penguin emulation environment.

This module provides classes and helpers for generating configuration patches,
handling static and dynamic pseudofiles, network devices, library injection,
NVRAM defaults, and other config modifications.
"""

import os
import re
import stat
import subprocess
import tarfile
from elftools.common.exceptions import ELFError
from elftools.elf.elffile import ELFFile

from abc import ABC, abstractmethod
from collections import defaultdict
from pathlib import Path

from penguin import getColoredLogger
from .arch import arch_filter, arch_end
from .defaults import (
    default_init_script,
    default_lib_aliases,
    default_netdevs,
    default_plugins,
    expert_knowledge_pseudofiles,
    default_libinject_string_introspection,
    static_dir as STATIC_DIR
)
from .utils import get_arch_subdir

logger = getColoredLogger("penguin.config_patchers")

RESOURCES: str = os.path.join(os.path.dirname(os.path.dirname(__file__)), "resources")


[docs] class PatchGenerator(ABC): def __init__(self) -> None: self.enabled: bool = True self.patch_name: str | None = None
[docs] @abstractmethod def generate(self, patches: dict) -> dict | None: """ Generate a patch dictionary. :param patches: Existing patches dictionary. :type patches: dict :return: Patch dictionary or None. :rtype: dict or None """ raise NotImplementedError("Subclasses should implement this method")
class TarHelper:
    """Collection of static methods to help find files in a tar archive.

    All helpers trim the leading '.' from member names (archive entries are
    stored as ``./path``), so returned paths look like ``/etc/hosts``.
    """

    @staticmethod
    def get_all_members(tarfile_path: str):
        """Return every :class:`tarfile.TarInfo` member of the archive."""
        with tarfile.open(tarfile_path, "r") as tar:
            return tar.getmembers()

    @staticmethod
    def get_other_members(tarfile_path: str) -> set[str]:
        """Return paths of members that are neither regular files nor
        directories - devices, symlinks, fifos, etc.

        BUG FIX: the original tested ``member.isdir`` (the bound method object,
        which is always truthy) instead of calling it, so this always returned
        an empty set.
        """
        with tarfile.open(tarfile_path, "r") as tar:
            return {
                member.name[1:]
                for member in tar.getmembers()
                if not member.isfile() and not member.isdir()
            }

    @staticmethod
    def get_symlink_members(tarfile_path: str) -> dict[str, str]:
        """Map each symlink member's path to its link target.

        Added because :class:`GenerateMissingDirs` calls this helper; it was
        missing from this class.
        """
        with tarfile.open(tarfile_path, "r") as tar:
            return {
                member.name[1:]: member.linkname
                for member in tar.getmembers()
                if member.issym()
            }

    @staticmethod
    def get_directory_members(tarfile_path: str) -> set[str]:
        """Return all directory paths, including every parent directory of
        each directory member (e.g. ``/a/b`` also yields ``/a``)."""
        with tarfile.open(tarfile_path, "r") as tar:
            results = {member.name[1:] for member in tar.getmembers() if member.isdir()}
        # Recursively add all parent directories of each result
        for r in list(results):
            parts = r.split("/")
            for i in range(len(parts)):
                results.add("/".join(parts[: i + 1]))
        return results

    @staticmethod
    def get_file_members(tarfile_path: str) -> set[str]:
        """Return the paths of all regular-file members."""
        with tarfile.open(tarfile_path, "r") as tar:
            return {member.name[1:] for member in tar.getmembers() if member.isfile()}
[docs] class FileHelper:
[docs] @staticmethod def find_executables(tmp_dir: str, target_dirs: set[str] | None = None): if not target_dirs: target_dirs = {"/"} for root, _, files in os.walk(tmp_dir): # Exclude the '/igloo' path if "/igloo" in root: continue for file in files: file_path = Path(root) / file # Check if the file is executable and in one of the target directories if ( file_path.is_file() and os.access(file_path, os.X_OK) and any(str(file_path).endswith(d) for d in target_dirs) ): yield file_path
[docs] @staticmethod def find_strings_in_file(file_path: str, pattern: str) -> list[str]: result = subprocess.run(["strings", file_path], capture_output=True, text=True) return [line for line in result.stdout.splitlines() if re.search(pattern, line)]
[docs] @staticmethod def find_shell_scripts(tmp_dir: str): for root, _, files in os.walk(tmp_dir): # Exclude the '/igloo' path if "/igloo" in root: continue for file in files: file_path = Path(root) / file # Check if the file is executable and in one of the target directories if ( file_path.is_file() and os.access(file_path, os.X_OK) and str(file_path).endswith(".sh") ): yield file_path
[docs] @staticmethod def exists(tmp_dir: str, target: str) -> bool: """ Check if the target exists within the extracted filesystem in tmp_dir, handling symlinks correctly. :param tmp_dir: The root of the extracted filesystem (e.g., /tmp/extracted) :type tmp_dir: str :param target: The target path to check (e.g., /foo/zoo) :type target: str :return: True if the target exists within tmp_dir, False otherwise :rtype: bool """ assert target.startswith("/") assert os.path.exists(tmp_dir) # Strip the leading slash from the target to work with relative paths target = target[1:] # Remove leading '/' parts = target.split("/") # Initialize path traversal from tmp_dir current_path = tmp_dir for part in parts: next_path = os.path.join(current_path, part) if os.path.islink(next_path): # Resolve symlink resolved = os.readlink(next_path) # If symlink is absolute, restart from tmp_dir if resolved.startswith("/"): current_path = os.path.realpath(os.path.join(tmp_dir, resolved[1:])) else: # Resolve relative symlink against the current path current_path = os.path.realpath(os.path.join(current_path, resolved)) else: # Move one level deeper in the path current_path = next_path # If the resolved path doesn't exist at any point, return False if not os.path.exists(current_path): return False # Final check: Ensure the fully resolved path exists return os.path.exists(current_path)
class NvramHelper:
    """Helpers for locating and parsing default NVRAM data in a firmware
    filesystem."""

    @staticmethod
    def _get_default_nvram_values() -> dict[str, str]:
        """
        Default nvram values from Firmadyne and FirmAE.

        :return: Dictionary of default NVRAM values.
        :rtype: dict[str, str]
        """
        nvram = {
            "console_loglevel": "7",
            "restore_defaults": "1",
            "sku_name": "",
            "wla_wlanstate": "",
            "lan_if": "br0",
            "lan_ipaddr": "192.168.0.50",
            "lan_bipaddr": "192.168.0.255",
            "lan_netmask": "255.255.255.0",
            "time_zone": "PST8PDT",
            "wan_hwaddr_def": "01:23:45:67:89:ab",
            "wan_ifname": "eth0",
            "lan_ifnames": "eth1 eth2 eth3 eth4",
            "ethConver": "1",
            "lan_proto": "dhcp",
            "wan_ipaddr": "0.0.0.0",
            "wan_netmask": "255.255.255.0",
            "wanif": "eth0",
            "time_zone_x": "0",
            "rip_multicast": "0",
            "bs_trustedip_enable": "0",
            "et0macaddr": "01:23:45:67:89:ab",
            "filter_rule_tbl": "",
            "pppoe2_schedule_config": "127:0:0:23:59",
            "schedule_config": "127:0:0:23:59",
            "access_control_mode": "0",
            "fwpt_df_count": "0",
            "static_if_status": "1",
            "www_relocation": "",
        }

        # Helper to add FirmAE's indexed default entries
        def _add_firmae_for_entries(config_dict, pattern, value, start, end):
            for index in range(start, end + 1):
                config_dict[pattern % index] = value

        # TODO: do we want a config toggle for these entries separately from
        # the other defaults?
        # NOTE(review): the "[email protected]" fragment below looks like
        # email-obfuscation damage from HTML extraction - verify against the
        # FirmAE source.
        _add_firmae_for_entries(
            nvram,
            "usb_info_dev%d",
            "A200396E0402FF83@[email protected]@U@1@USB_Storage;U:;0;0@",
            0,
            101,
        )
        _add_firmae_for_entries(nvram, "wla_ap_isolate_%d", "", 1, 5)
        _add_firmae_for_entries(nvram, "wlg_ap_isolate_%d", "", 1, 5)
        _add_firmae_for_entries(nvram, "wlg_allow_access_%d", "", 1, 5)
        _add_firmae_for_entries(nvram, "%d:macaddr", "01:23:45:67:89:ab", 0, 3)
        _add_firmae_for_entries(nvram, "lan%d_ifnames", "", 1, 10)
        return nvram

    @staticmethod
    def parse_nvram_file(path: str, f) -> dict:
        """
        Parse a NVRAM file and return key-value pairs.

        Tries two layouts - NUL-separated ``key=value`` pairs and
        newline-separated pairs - and returns whichever parse produced more
        entries, provided it found more than five. Otherwise returns {}.

        :param path: Path to NVRAM file.
        :type path: str
        :param f: File object.
        :return: Dictionary of key-value pairs (bytes keys and values).
        :rtype: dict
        """
        file_content = f.read()
        results_null: dict = {}
        results_lines: dict = {}

        # First pass: NUL-separated entries (drop the trailing empty split)
        for pair in file_content.split(b"\x00")[:-1]:
            try:
                key, val = pair.split(b"=", 1)
                if key.startswith(b"#"):
                    continue  # comment entry
                results_null[key] = val
            except ValueError:
                logger.warning(f"could not process default nvram file {path} for {pair}")

        # Second pass: newline-separated entries
        for line in file_content.split(b"\n"):
            if line.startswith(b"#") or b"=" not in line:
                continue
            key, val = line.split(b"=", 1)
            results_lines[key] = val

        # Pick the parse with more hits; require >5 entries for confidence
        if len(results_null) > 5 and len(results_null) > len(results_lines):
            return results_null
        if len(results_lines) > 5 and len(results_lines) > len(results_null):
            return results_lines
        return {}

    @staticmethod
    def nvram_config_analysis(fs_path: str, full_path: bool = True) -> dict[str, str]:
        """Scan the filesystem for well-known default-NVRAM files and merge
        their key/value pairs.

        The candidate paths are notionally ordered: an earlier file's value is
        not clobbered by a later one, but keys from every parseable file are
        consumed. When ``full_path`` is False, any file whose basename matches
        a known candidate is parsed instead of only the exact paths.
        """
        nvram_paths = [
            "./var/etc/nvram.default",
            "./etc/nvram.default",
            "./etc/nvram.conf",
            "./etc/nvram.deft",
            "./etc/nvram.update",
            "./etc/wlan/nvram_params",
            "./etc/system_nvram_defaults",
            "./image/mnt/nvram_ap.default",
            "./etc_ro/Wireless/RT2860AP/RT2860_default_vlan",
            "./etc_ro/Wireless/RT2860AP/RT2860_default_novlan",
            "./image/mnt/nvram_whp.default",
            "./image/mnt/nvram_rt.default",
            "./image/mnt/nvram_rpt.default",
            "./image/mnt/nvram.default",
        ]
        nvram_basenames = {os.path.basename(x) for x in nvram_paths}
        path_nvrams: dict[str, str] = {}

        # XXX: Should we store the source filename somewhere? Maybe move this
        # to a static analysis that emits more verbose data, only some of
        # which becomes a config patch.
        if full_path:
            # Check only the exact candidate paths
            for path in nvram_paths:
                abs_path = os.path.join(fs_path, path.lstrip("/"))
                if not os.path.exists(abs_path):
                    continue
                with open(abs_path, "rb") as f:
                    for k, v in NvramHelper.parse_nvram_file(path, f).items():
                        path_nvrams[k.decode()] = v.decode()
        else:
            # Check every file for a matching basename
            for root, _, files in os.walk(fs_path):
                for file in files:
                    abs_path = os.path.join(root, file)
                    rel_path = "./" + os.path.relpath(abs_path, fs_path)
                    if rel_path in nvram_paths:
                        # Exact match - already handled by the full_path pass
                        continue
                    if any(file == fname for fname in nvram_basenames):
                        with open(abs_path, "rb") as f:
                            for k, v in NvramHelper.parse_nvram_file(rel_path, f).items():
                                path_nvrams[k.decode()] = v.decode()
        return path_nvrams
class BasePatch(PatchGenerator):
    """Generate base config for static_files and default plugins."""

    # Placeholder init path used when static analysis found no init candidates
    UNKNOWN_INIT: str = "UNKNOWN_FIX_ME"

    def __init__(self, arch_info: str, inits: list, kernel_versions: dict) -> None:
        self.patch_name = "base"
        self.enabled = True
        self.set_arch_info(arch_info)
        self.kernel_versions = kernel_versions
        if inits:
            # Candidates are pre-ranked; take the best one
            self.igloo_init = inits[0]
        else:
            self.igloo_init = self.UNKNOWN_INIT
            logger.warning("Failed to find any init programs - config will need manual refinement")

    def set_arch_info(self, arch_identified: str) -> None:
        '''
        Set architecture info for config patch.

        :param arch_identified: Identified architecture string.
        :type arch_identified: str
        '''
        # TODO: should we allow a config to be generated for an unsupported
        # architecture? For example, what if we're wrong and a user wants to
        # customize this.
        arch, endian = arch_end(arch_identified)
        if arch is None:
            raise NotImplementedError(f"Architecture {arch_identified} not supported ({arch}, {endian})")

        # Map architecture names to config-schema valid names
        if arch in ("aarch64", "intel64", "loongarch64", "riscv64", "powerpc"):
            self.arch_name = arch
        elif arch == "powerpc64":
            # powerpc64el -> powerpc64le for config schema; big-endian -> powerpc64
            self.arch_name = "powerpc64le" if endian == "el" else "powerpc64"
        else:
            # Architectures like mips carry an endianness suffix
            self.arch_name = arch + endian

        self.arch_dir = get_arch_subdir({"core": {"arch": self.arch_name}})

        # Directory the prebuilt dynamic libraries live in
        dylib_map = {"aarch64": "arm64", "intel64": "x86_64", "loongarch64": "loongarch"}
        if arch_identified in dylib_map:
            self.dylib_dir = dylib_map[arch_identified]
        elif "powerpc" in self.arch_name:
            # dylibs are built with short names
            self.dylib_dir = self.arch_name.replace("powerpc", "ppc")
        else:
            self.dylib_dir = self.arch_dir

    def generate(self, patches: dict) -> dict:
        """Produce the base patch: arch/kernel/init, quiet secondary serial
        pseudofiles, the /igloo static file tree, and the default plugins."""
        # Serial device major/minor differs by platform: arm uses ttyAMA
        # (major 204) while mips/x86/loongarch use ttyS (major 4, minor 65)
        # and powerpc uses hvc1 (major 229, minor 1).
        if "mips" in self.arch_name or self.arch_name in ("intel64", "loongarch64"):
            igloo_serial_major, igloo_serial_minor = 4, 65
        elif "powerpc" in self.arch_name:
            igloo_serial_major, igloo_serial_minor = 229, 1
        else:
            # armel/aarch64 and any unrecognized arch fall back to ttyAMA
            igloo_serial_major, igloo_serial_minor = 204, 65

        def _quiet_tty() -> dict:
            # Null-device model: reads return zeros, writes are discarded and
            # every ioctl reports success. Ensures the guest can't interfere
            # with our 2nd serial console.
            return {
                "read": {"model": "zero"},
                "write": {"model": "discard"},
                "ioctl": {"*": {"model": "return_const", "val": 0}},
            }

        result = {
            "core": {
                "arch": self.arch_name,
                "kernel": self.kernel_versions["selected_kernel"],
            },
            "env": {
                "igloo_init": self.igloo_init,
            },
            "pseudofiles": {
                "/dev/ttyS1": _quiet_tty(),
                "/dev/ttyAMA1": _quiet_tty(),
            },
            "static_files": {
                "/igloo/init": {
                    "type": "inline_file",
                    "contents": default_init_script,
                    "mode": 0o111,
                },
                "/igloo/utils/sh": {
                    "type": "symlink",
                    "target": "/igloo/utils/busybox",
                },
                "/igloo/utils/sleep": {
                    "type": "symlink",
                    "target": "/igloo/utils/busybox",
                },
                # ltrace prototype files. They go in /igloo/ltrace because
                # /igloo is treated as ltrace's /usr/share, and the files
                # normally live in /usr/share/ltrace.
                "/igloo/ltrace/*": {
                    "type": "host_file",
                    "mode": 0o444,
                    "host_path": os.path.join(STATIC_DIR, "ltrace", "*"),
                },
                # Dynamic libraries
                "/igloo/dylibs/*": {
                    "type": "host_file",
                    "mode": 0o755,
                    "host_path": os.path.join(STATIC_DIR, "dylibs", self.dylib_dir or self.arch_dir, "*"),
                },
                # Startup scripts
                "/igloo/source.d/*": {
                    "type": "host_file",
                    "mode": 0o755,
                    "host_path": os.path.join(RESOURCES, "source.d", "*"),
                },
                "/igloo/serial": {
                    "type": "dev",
                    "devtype": "char",
                    "major": igloo_serial_major,
                    "minor": igloo_serial_minor,
                    "mode": 0o666,
                },
            },
            "plugins": default_plugins,
        }

        # Always add our guest utility scripts into static files...
        guest_scripts_dir = os.path.join(STATIC_DIR, "guest-utils", "scripts")
        for script in os.listdir(guest_scripts_dir):
            result["static_files"][f"/igloo/utils/{script}"] = {
                "type": "host_file",
                "host_path": f"{guest_scripts_dir}/{script}",
                "mode": 0o755,
            }
        # ...plus the architecture-specific prebuilt binaries
        result["static_files"]["/igloo/utils/*"] = {
            "type": "host_file",
            "host_path": f"{STATIC_DIR}/{self.arch_dir}/*",
            "mode": 0o755,
        }
        return result
class RootShell(PatchGenerator):
    """Add root shell."""

    def __init__(self) -> None:
        self.patch_name = "root_shell"
        self.enabled = False

    def generate(self, patches: dict) -> dict:
        # NOTE(review): despite the class name, this emits root_shell=False
        # (and the patch is disabled by default) - confirm that is intended.
        return {"core": {"root_shell": False}}
class DynamicExploration(PatchGenerator):
    '''
    We are dynamically evaluating and refining a configuration. We need to
    collect data programatically. Disable root shell, enable coverage-tracking
    and nmap for coverage generation. Enable VPN so nmap has something to talk
    to. Ideally this will also be paired with ShimBusybox to get shell-level
    instrumentation.
    '''

    def __init__(self) -> None:
        self.patch_name = "auto_explore"
        self.enabled = False

    def generate(self, patches: dict) -> dict:
        plugin_settings = {
            "nmap": {"enabled": True},
            "vpn": {"enabled": True, "log": True},
            "netbinds": {"enabled": True, "shutdown_on_www": False},
        }
        return {
            "core": {"root_shell": False},
            "plugins": plugin_settings,
        }
class SingleShotFICD(PatchGenerator):
    '''
    We are doing a single-shot, automated evaluation. Disable root shell, but
    keep VPN on and measure FICD.
    '''

    def __init__(self) -> None:
        self.patch_name = "single_shot_ficd"
        self.enabled = False

    def generate(self, patches: dict) -> dict:
        # FICD or www success results in shutdown
        plugin_settings = {
            "nmap": {"enabled": False},
            "vpn": {"enabled": True},
            "netbinds": {"enabled": True, "shutdown_on_www": False},
            "ficd": {"enabled": True, "stop_on_if": True},
            "fetch_web": {"enabled": True, "shutdown_after_www": True},
        }
        return {
            "core": {"root_shell": False},
            "plugins": plugin_settings,
        }
class SingleShot(PatchGenerator):
    '''
    We are doing a single-shot, automated evaluation. Disable root shell,
    leave coverage/nmap, but keep VPN on and use fetch_web to collect
    responses.
    '''

    def __init__(self) -> None:
        self.patch_name = "single_shot"
        self.enabled = False

    def generate(self, patches: dict) -> dict:
        # netbinds must not shut down on www: fetch_web does the shutdown
        plugin_settings = {
            "nmap": {"enabled": False},
            "vpn": {"enabled": True},
            "netbinds": {"enabled": True, "shutdown_on_www": False},
            "fetch_web": {"enabled": True, "shutdown_after_www": True},
        }
        return {
            "core": {"root_shell": False},
            "plugins": plugin_settings,
        }
class ManualInteract(PatchGenerator):
    '''
    Interactive for manual exploration. Enable root shell, enable vpn.
    Do not terminate on www bind.
    '''

    def __init__(self) -> None:
        self.patch_name = "manual"
        self.enabled = True

    def generate(self, patches: dict) -> dict:
        plugin_settings = {
            "nmap": {"enabled": False},
            "vpn": {"enabled": True},
            "netbinds": {"enabled": True, "shutdown_on_www": False},
        }
        return {
            "core": {"root_shell": True},
            "plugins": plugin_settings,
        }
class NetdevsDefault(PatchGenerator):
    '''
    Add list of default network device names.
    '''

    def __init__(self) -> None:
        self.enabled = True
        self.patch_name = "netdevs.default"

    def generate(self, patches: dict) -> dict:
        return {"netdevs": default_netdevs}
class NetdevsTailored(PatchGenerator):
    '''
    Add list of network device names observed in static analysis.
    '''

    def __init__(self, netdevs: dict) -> None:
        self.enabled = True
        self.patch_name = "netdevs.dynamic"
        # Mapping of analysis source -> iterable of device names
        self.netdevs = netdevs

    def generate(self, patches: dict) -> dict | None:
        if not self.netdevs:
            return None
        # Union the device names seen across all sources
        observed: set = set()
        for devs in self.netdevs.values():
            observed.update(devs)
        if observed:
            return {"netdevs": sorted(observed)}
        return None
class PseudofilesExpert(PatchGenerator):
    '''
    Fixed set of pseudofile models from FirmAE.
    '''

    def __init__(self) -> None:
        self.enabled = True
        self.patch_name = "pseudofiles.expert_knowledge"

    def generate(self, patches: dict) -> dict:
        return {"pseudofiles": expert_knowledge_pseudofiles}
class LibInjectStringIntrospection(PatchGenerator):
    '''
    Add LibInject aliases for string introspection (e.g., for comparison
    detection). For each method we see in the filesystem that's in our list
    of shim targets, add the shim.
    '''

    def __init__(self, library_info: dict) -> None:
        self.enabled = True
        self.patch_name = 'lib_inject.string_introspection'
        self.library_info = library_info

    def generate(self, patches: dict) -> dict:
        # Only shim symbols that are actually exported by some library
        aliases = {
            sym: default_libinject_string_introspection[sym]
            for exported_syms in self.library_info.get("symbols", {}).values()
            for sym in exported_syms
            if sym in default_libinject_string_introspection
        }
        return {'lib_inject': {'aliases': aliases}}
class LibInjectTailoredAliases(PatchGenerator):
    '''
    Set default aliases in libinject based on library analysis. If one of the
    defaults is present in a library, we'll add it to the libinject alias
    list.
    '''

    def __init__(self, library_info: dict) -> None:
        self.enabled = True
        self.patch_name = 'lib_inject.dynamic_models'
        self.library_info = library_info
        # nvram-ish symbols we saw but have no model for (reported, not shimmed)
        self.unmodeled = set()

    def generate(self, patches: dict) -> dict | None:
        aliases = {}
        # Only copy values from our defaults if we see that same symbol exported
        for exported_syms in self.library_info.get("symbols", {}).values():
            for sym in exported_syms:
                if sym in default_lib_aliases:
                    aliases[sym] = default_lib_aliases[sym]
                elif "nvram" in sym:
                    self.unmodeled.add(sym)
        if self.unmodeled:
            logger.info(f"Detected {len(self.unmodeled)} unmodeled symbols around nvram. You may wish to create libinject models for these:")
            for sym in self.unmodeled:
                logger.info(f"\t{sym}")
        if aliases:
            return {'lib_inject': {'aliases': aliases}}
        return None
class LibInjectFixedAliases(PatchGenerator):
    '''
    Set all aliases in libinject from our defaults.
    '''

    def __init__(self) -> None:
        self.enabled = False
        self.patch_name = 'lib_inject.fixed_models'

    def generate(self, patches: dict) -> dict:
        return {'lib_inject': {'aliases': default_lib_aliases}}
""" class LibInjectJITAliases(PatchGenerator): ''' For nvram methods that we don't have shims for, try throwing some defaults based on symbol names. This is probably going to break things but could be interesting ''' def __init__(self, library_info): self.enabled = True self.patch_name = 'lib_inject.jit_models' self.library_info = library_info self.unmodeled = set() def generate(self, patches): aliases = {} # Only copy values from our defaults if we see that same symbol exported for _, exported_syms in self.library_info.get("symbols", {}).items(): for sym in exported_syms: if "nvram" in sym and sym not in default_lib_aliases: if "_get" in sym: target = "libinject_nvram_get" elif "_set" in sym: target = "libinject_nvram_get" else: target = "libinject_ret_0" aliases[sym] = target logger.info(f"\tJIT mapping {sym} -> {target}") if len(aliases): return {'lib_inject': {'aliases': aliases}} """
class ForceWWW(PatchGenerator):
    '''
    This is a hacky FirmAE approach to identify webservers and just start
    them. Unsurprisingly, it increases the rate of web servers starting.
    We'll export this into our static files section so we could later decide
    to try it. We'll enable this by default here.
    '''

    def __init__(self, fs_path: str) -> None:
        self.enabled = False
        self.patch_name = 'force_www'
        self.fs_path = fs_path

    def generate(self, patches: dict) -> dict | None:
        # Map between filename and command that launches it
        file2cmd = {
            "./etc/init.d/uhttpd": "/etc/init.d/uhttpd start",
            "./usr/bin/httpd": "/usr/bin/httpd",
            "./usr/sbin/httpd": "/usr/sbin/httpd",
            "./bin/goahead": "/bin/goahead",
            "./bin/alphapd": "/bin/alphapd",
            "./bin/boa": "/bin/boa",
            "./usr/sbin/lighttpd": "/usr/sbin/lighttpd -f /etc/lighttpd/lighttpd.conf",
        }

        # lighttpd is only usable when its config file is present
        have_lighttpd_conf = os.path.isfile(
            os.path.join(self.fs_path, "./etc/lighttpd/lighttpd.conf")
        )

        www_cmds = []
        www_paths = []
        for file, cmd in file2cmd.items():
            if not os.path.isfile(os.path.join(self.fs_path, file)):
                continue
            if file == "./usr/sbin/lighttpd" and not have_lighttpd_conf:
                continue
            www_cmds.append(cmd)
            www_paths.append(file)

        if not www_cmds:
            return None

        # Build a shell script that (re)starts each identified webserver in a
        # loop after an initial settle period.
        # NOTE(review): exact whitespace of this script was lost in
        # extraction; layout below reconstructed - confirm against upstream.
        cmd_str = """#!/igloo/utils/sh
/igloo/utils/busybox sleep 120
while true; do
"""
        for cmd in www_cmds:
            cmd_str += f"""
if ! (/igloo/utils/busybox ps | /igloo/utils/busybox grep -v grep | /igloo/utils/busybox grep -sqi "{cmd}"); then
    {cmd} &
fi
"""
        cmd_str += """
/igloo/utils/busybox sleep 30
done
"""
        return {
            "core": {"force_www": True},
            "static_files": {
                "/igloo/utils/www_cmds": {
                    "type": "inline_file",
                    "contents": cmd_str,
                    "mode": 0o755,
                }
            },
        }
class GenerateMissingDirs(PatchGenerator):
    '''
    Examine the fs archive to identify missing directories.

    We ignore the extracted filesystem because we want to ensure symlinks
    are handled correctly.
    '''

    # Directories firmware commonly expects to exist at boot
    TARGET_DIRECTORIES: list[str] = [
        "/proc",
        "/etc_ro",
        "/tmp",
        "/var",
        "/run",
        "/sys",
        "/root",
        "/tmp/var",
        "/tmp/media",
        "/tmp/etc",
        "/tmp/var/run",
        "/tmp/home",
        "/tmp/home/root",
        "/tmp/mnt",
        "/tmp/opt",
        "/tmp/www",
        "/var/run",
        "/var/lock",
        "/usr/bin",
        "/usr/sbin",
    ]

    def __init__(self, archive_path: str, archive_files: list) -> None:
        self.patch_name = "static.missing_dirs"
        self.enabled = True
        self.archive_path = archive_path
        # Member names are stored as './path'; trim the leading '.'
        self.archive_files = {member.name[1:] for member in archive_files}

    @staticmethod
    def _resolve_path(d: str, symlinks: dict, depth: int = 0) -> str:
        """Rewrite *d* through the archive's symlink map (longest prefix
        first), recursing until no symlink applies or a loop is detected."""
        parts = d.split("/")
        for i in range(len(parts), 1, -1):
            sub_path = "/".join(parts[:i])
            if sub_path in symlinks:
                if depth > 10 or d == symlinks[sub_path]:
                    logger.warning(f"Symlink loop detected for {d}")
                    return d
                return GenerateMissingDirs._resolve_path(
                    d.replace(sub_path, symlinks[sub_path], 1), symlinks, depth=depth + 1
                )
        if not d.startswith("/"):
            d = "/" + d
        if d in symlinks:
            # We resolved a symlink to another symlink, need to recurse
            # XXX: What if our resolved path contains a symlink earlier in the path TODO
            if depth > 10 or d == symlinks[d]:
                logger.warning(f"Symlink loop detected for {d}")
                return d
            return GenerateMissingDirs._resolve_path(symlinks[d], symlinks, depth=depth + 1)
        return d

    def generate(self, patches: dict) -> dict:
        """Emit static_files 'dir' entries for each target directory that is
        missing from the archive (and from every existing patch)."""
        # Operate on the archive so symlinks behave as expected
        symlinks = TarHelper.get_symlink_members(self.archive_path)
        result = defaultdict(dict)
        for d in self.TARGET_DIRECTORIES:
            # Handle symlinks: if /tmp is a symlink to /asdf, creating
            # /tmp/var really means creating /asdf/var
            resolved_path = self._resolve_path(d, symlinks)

            # Try handling ../s by normalizing; skip if any remain
            if ".." in resolved_path.split("/"):
                resolved_path = os.path.normpath(resolved_path)
                if ".." in resolved_path.split("/"):
                    logger.debug("Skipping directory with .. in path: " + resolved_path)
                    continue

            while resolved_path.endswith("/"):
                resolved_path = resolved_path[:-1]
            # Skip anything that resolves to the filesystem root
            if resolved_path == ".":
                continue
            # Guestfs gets mad if there's a /. in the path
            if resolved_path.endswith("/."):
                resolved_path = resolved_path[:-2]

            # BUG FIX: the original used `continue` inside this scan loop,
            # which only advanced the *inner* loop - directories under
            # symlinked parents were logged as skipped but still added.
            parts = resolved_path.split("/")
            symlinked_parent = None
            for i in range(1, len(parts)):
                parent = "/".join(parts[:i])
                if parent in symlinks:
                    symlinked_parent = parent
                    break
            if symlinked_parent is not None:
                logger.debug(
                    f"Skipping {resolved_path} because parent {symlinked_parent} is a symlink"
                )
                continue

            # Clean up the path
            while "/./" in resolved_path:
                resolved_path = resolved_path.replace("/./", "/")
            while "//" in resolved_path:
                resolved_path = resolved_path.replace("//", "/")
            while resolved_path.endswith("/"):
                resolved_path = resolved_path[:-1]

            # If this path is in the archive OR any existing patches, skip.
            # Note we're ignoring the enabled flag of patches.
            if resolved_path in self.archive_files:
                continue
            if any(resolved_path in p[0].get('static_files', {}).keys() for p in patches.values()):
                continue

            # Add the path and any missing parents
            path_parts = resolved_path.split("/")
            for i in range(1, len(path_parts) + 1):
                subdir = "/".join(path_parts[:i])
                if subdir not in self.archive_files:
                    result['static_files'][subdir] = {
                        "type": "dir",
                        "mode": 0o755,
                    }
        return result
class GenerateReferencedDirs(PatchGenerator):
    '''
    FirmAE "Boot mitigation": find path strings in binaries, make their
    directories if they don't already exist.
    '''

    def __init__(self, extract_dir):
        self.patch_name = "static.binary_paths"
        self.enabled = True
        self.extract_dir = extract_dir

    def generate(self, patches: dict) -> dict:
        result = defaultdict(dict)
        search_dirs = {"/bin", "/sbin", "/usr/bin", "/usr/sbin"}
        for binary in FileHelper.find_executables(self.extract_dir, search_dirs):
            # Unique strings in the binary that look like /var, /etc or /tmp paths
            candidates = set(
                FileHelper.find_strings_in_file(binary, "^(/var|/etc|/tmp)(.+)([^\\/]+)$")
            )
            for dest in candidates:
                # printf format strings aren't real directories to create.
                # Not sure what /tmp/services is or where we got that from?
                if any(marker in dest for marker in ("%s", "%c", "%d", "/tmp/services")):
                    continue
                result["static_files"][dest] = {
                    "type": "dir",
                    "mode": 0o755,
                }
        return result
class GenerateShellMounts(PatchGenerator):
    """
    Ensure we have /mnt/* directories referenced by shell scripts.
    """

    def __init__(self, extract_dir, existing):
        self.patch_name = "static.shell_script_mounts"
        self.extract_dir = extract_dir
        self.enabled = True
        # Archive member names are './path'; trim the leading '.'
        self.existing = {member.name[1:] for member in existing}

    def generate(self, patches: dict) -> dict:
        result = defaultdict(dict)
        for script in FileHelper.find_shell_scripts(self.extract_dir):
            for dest in set(FileHelper.find_strings_in_file(script, "^/mnt/[a-zA-Z0-9._/]+$")):
                if not dest.endswith("/"):
                    # We saw a file reference; create its containing directory
                    dest = os.path.dirname(dest)
                # Skip if already in the filesystem or any existing patch
                if dest in self.existing:
                    continue
                if any(dest in p[0].get('static_files', {}).keys() for p in patches.values()):
                    continue
                # Resolve symlinks too (more correct than the checks above);
                # don't clobber directories that actually exist
                if FileHelper.exists(self.extract_dir, dest):
                    continue
                result['static_files'][dest] = {
                    "type": "dir",
                    "mode": 0o755,
                }
        return result
class GenerateMissingFiles(PatchGenerator):
    '''
    Ensure we have /bin/sh, /etc/TZ, /var/run/nvramd.pid, and localhost in
    /etc/hosts.
    '''

    def __init__(self, extract_dir: str) -> None:
        self.patch_name = "static.missing_files"
        self.enabled = True
        self.extract_dir = extract_dir

    def generate(self, patches: dict) -> dict:
        result = defaultdict(dict)
        # Firmadyne/FirmAE mitigation: ensure these files always exist.
        # Including /bin/sh here means we add it *only if missing* as a
        # symlink to /igloo/utils/busybox (similar to how an existing
        # /bin/sh can be shimmed to busybox elsewhere).
        model = {
            "/bin/sh": {
                "type": "symlink",
                "target": "/igloo/utils/busybox"
            },
            # Set timezone to EST
            "/etc/TZ": {
                "type": "inline_file",
                "contents": "EST5EDT",
                "mode": 0o755,
            },
            # Needed for Ralink and D-Link
            # See https://github.com/firmadyne/libnvram/blob/e33692277d475d61a03e0772efeef5c829872f34/nvram.c#L189
            "/var/run/nvramd.pid": {
                "type": "inline_file",
                "contents": "",
                "mode": 0o644,
            },
        }
        for fname, data in model.items():
            if not os.path.isfile(os.path.join(self.extract_dir, fname[1:])):
                result['static_files'][fname] = data

        # Ensure /etc/hosts has a localhost entry
        hosts_path = os.path.join(self.extract_dir, "etc/hosts")
        hosts = ""
        if os.path.isfile(hosts_path):
            with open(hosts_path, "r") as f:
                hosts = f.read()
        # Whitespace-tolerant match for an existing localhost line
        if not re.search(r"^127\.0\.0\.1\s+localhost\s*$", hosts, re.MULTILINE):
            if len(hosts) and not hosts.endswith("\n"):
                hosts += "\n"
            hosts += "127.0.0.1 localhost\n"
            result["static_files"]["/etc/hosts"] = {
                "type": "inline_file",
                "contents": hosts,
                "mode": 0o755,
            }
        return result
class DeleteFiles(PatchGenerator):
    '''
    Delete some files we don't want.
    '''

    def __init__(self, extract_dir: str) -> None:
        self.patch_name = "static.delete_files"
        self.enabled = True
        self.extract_dir = extract_dir

    def generate(self, patches: dict) -> dict:
        result = defaultdict(dict)
        # securetty is general (limits shell access); sys_resetbutton is a
        # firmware-specific hack from FirmAE.
        # TODO: does securetty matter if our root shell is disabled?
        for target in ("/etc/securetty", "/etc/scripts/sys_resetbutton"):
            if os.path.isfile(os.path.join(self.extract_dir, target[1:])):
                result["static_files"][target] = {"type": "delete"}
        return result
class LinksysHack(PatchGenerator):
    '''
    Linksys specific hack from FirmAE with pseudofile model.
    '''

    def __init__(self, extract_dir: str) -> None:
        self.patch_name = "pseudofiles.linksys"
        self.enabled = True
        self.extract_dir = extract_dir

    def generate(self, patches: dict) -> dict:
        result = defaultdict(dict)
        # TODO: FirmAE-derived changes like this should likely be disabled by
        # default; baking them into the initial config removes them from our
        # search.
        markers = ["/bin/gpio", "/usr/lib/libcm.so", "/usr/lib/libshared.so"]
        if all(os.path.isfile(os.path.join(self.extract_dir, m[1:])) for m in markers):
            result["pseudofiles"]["/dev/gpio/in"] = {
                "read": {
                    "model": "return_const",
                    "val": 0xFFFFFFFF,
                }
            }
        return result
class KernelModules(PatchGenerator):
    """
    Create a symlink from the guest kernel module path to our kernel's module
    path (i.e., /lib/modules/1.2.0-custom -> /lib/modules/4.10.0).
    """

    def __init__(self, extract_dir: str, kernel_version: dict) -> None:
        self.patch_name = "static.kernel_modules"
        self.enabled = True
        self.extract_dir = extract_dir
        # Expected to carry a "selected_kernel" version string
        self.kernel_version = kernel_version

    @staticmethod
    def is_kernel_version(name: str) -> bool:
        """Return True if *name* matches a typical kernel version pattern
        (a.b.c with an optional -suffix)."""
        return re.match(r"^\d+\.\d+\.\d+(-[\w\.]+)?$", name) is not None

    # Always use a.b.c format for the symlink target
    @staticmethod
    def pad_kernel_version(ver: str) -> str:
        """Drop any -suffix from *ver* and right-pad with zeros to a.b.c
        (e.g. "4.10" -> "4.10.0")."""
        base = ver.split("-", 1)[0]
        tokens = base.split(".")
        while len(tokens) < 3:
            tokens.append("0")
        return ".".join(tokens)

    @staticmethod
    def _select_kernel(candidates: set[str]) -> str | None:
        """
        Pick one kernel version from *candidates*.

        Candidates are sorted first so the choice is deterministic — the
        original implementation iterated the raw set, making the selected
        kernel depend on hash ordering when multiple versions were present.

        Preference order: names with both '.' and a '-suffix' (more specific),
        then names with a '.', then the first candidate.
        """
        ordered = sorted(candidates)
        if not ordered:
            return None
        if len(ordered) == 1:
            return ordered[0]
        # Prioritize version names matching more complex patterns with dashes
        for name in ordered:
            if "." in name and "-" in name:
                return name
        # Fallback to a simpler version matching pattern
        for name in ordered:
            if "." in name:
                return name
        # Fallback to picking the first one (could improve this further).
        # NOTE: unreachable in practice since is_kernel_version guarantees '.'
        logger.warning(
            "Multiple kernel versions look valid (TODO improve selection logic, grabbing first)"
        )
        logger.warning(ordered)
        return ordered[0]

    def generate(self, patches: dict) -> dict:
        """
        Identify the original kernel version under lib/modules and emit a
        symlink from our selected kernel's module dir to the guest's.

        :param patches: Existing patches dictionary (unused).
        :return: Patch dict, possibly with a ``static_files`` symlink entry.
        """
        result = defaultdict(dict)
        potential_kernels: set[str] = set()

        # Only look at top-level directories in <extract_dir>/lib/modules
        # whose names look like kernel versions
        modules_path = os.path.join(self.extract_dir, "lib/modules")
        if os.path.exists(modules_path):
            for entry in os.listdir(modules_path):
                if os.path.isdir(os.path.join(modules_path, entry)) and \
                        self.is_kernel_version(entry):
                    potential_kernels.add(entry)

        kernel_version = self._select_kernel(potential_kernels)

        if kernel_version:
            # We have a kernel version, add the symlink to our config
            padded_selected = self.pad_kernel_version(self.kernel_version["selected_kernel"])
            padded_target = self.pad_kernel_version(kernel_version)
            result["static_files"][f"/lib/modules/{padded_selected}"] = {
                "type": "symlink",
                "target": f"/lib/modules/{padded_target}",
            }
        return result
class ShimBinaries:
    """
    Identify binaries in the guest FS that we want to shim and add symlinks
    to go from guest bin -> igloo bin into our config.
    """

    def __init__(self, files):
        # Tar members (tarfile.TarInfo objects) describing the guest FS
        self.files = files

    def make_shims(self, shim_targets: dict[str, str]) -> dict:
        """
        Build ``static_files`` shim entries for every executable guest file
        (or symlink) whose basename appears in *shim_targets*.

        :param shim_targets: basename -> replacement under /igloo/utils/
        :return: Patch dict with ``static_files`` shim entries.
        :raises ValueError: if the input archive already contains /igloo/utils
        """
        result = defaultdict(dict)
        executable_bits = stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
        for fname in self.files:
            # Tar member names are "./path"; drop exactly the one leading "."
            # to make an absolute guest path. removeprefix (not lstrip) so a
            # name such as "./..data" or "..foo" isn't over-trimmed: lstrip
            # strips *every* leading '.' character.
            path = fname.path.removeprefix('.')
            basename = os.path.basename(path)

            if path.startswith("/igloo/utils/"):
                raise ValueError(
                    "Unexpected /igloo/utils present in input filesystem archive"
                )

            # It's a guest file/symlink. Only shim regular files or symlinks
            # with at least one execute bit set.
            if not (fname.isfile() or fname.issym()) or not fname.mode & executable_bits:
                # Skip if it's not a file or non-executable
                continue

            # Is the current file one we want to shim?
            if basename in shim_targets:
                logger.debug(f"making shim for {basename}, full path: {path}, fname.path: {fname.path}")
                result["static_files"][path] = {
                    "type": "shim",
                    "target": f"/igloo/utils/{shim_targets[basename]}",
                }
        return result
class ShimStopBins(PatchGenerator):
    """Shim power-control binaries so the guest cannot stop the emulation."""

    def __init__(self, files: list) -> None:
        ShimBinaries.__init__(self, files)
        self.patch_name = "static.shims.stop_bins"
        self.enabled = True

    def generate(self, patches: dict) -> dict:
        """Redirect reboot/halt to a stub that just exits with status 0."""
        targets = {name: "exit0.sh" for name in ("reboot", "halt")}
        return self.make_shims(targets)
class ShimNoModules(PatchGenerator):
    """Shim insmod so guest kernel-module loads become successful no-ops."""

    def __init__(self, files: list) -> None:
        ShimBinaries.__init__(self, files)
        self.patch_name = "static.shims.no_modules"
        self.enabled = True

    def generate(self, patches: dict) -> dict:
        """Replace insmod with a stub that exits with status 0."""
        return self.make_shims({"insmod": "exit0.sh"})
class ShimBusybox(PatchGenerator):
    """Shim guest shells with our known-good busybox/bash (disabled by default)."""

    def __init__(self, files: list) -> None:
        ShimBinaries.__init__(self, files)
        self.patch_name = "static.shims.busybox"
        self.enabled = False  # opt-in only

    def generate(self, patches: dict) -> dict:
        """Redirect ash/sh to our busybox and bash to our bash."""
        shell_map = {
            "ash": "busybox",
            "sh": "busybox",
            "bash": "bash",
        }
        return self.make_shims(shell_map)
class ShimCrypto(PatchGenerator):
    """Shim key-generation tools and ship pre-generated keys (disabled by default)."""

    def __init__(self, files: list) -> None:
        ShimBinaries.__init__(self, files)
        self.patch_name = "static.shims.crypto"
        self.enabled = False  # opt-in only

    def generate(self, patches: dict) -> dict | None:
        """
        Shim openssl/ssh-keygen; when anything was shimmed, also copy our
        static key material into /igloo/keys/.

        :return: Patch dict, or None when there was nothing to shim.
        """
        shims = self.make_shims({
            "openssl": "openssl",
            "ssh-keygen": "ssh-keygen"
        })
        if not shims.get("static_files"):
            # Nothing to shim, don't add the key copy
            return None
        shims["static_files"]["/igloo/keys/*"] = {
            "type": "host_file",
            "mode": 0o444,
            "host_path": os.path.join(RESOURCES, "static_keys", "*"),
        }
        return shims
class ShimFwEnv(PatchGenerator):
    """
    Replace fw_printenv/getenv/setenv with hypercall based alternatives.

    Work in progress; needs testing, so instantiation is disabled.
    """

    def __init__(self, files: list) -> None:
        # Refuse construction until this shim has been validated; the setup
        # below is deliberately unreachable for now.
        raise NotImplementedError("Untested shim type")
        ShimBinaries.__init__(self, files)
        self.patch_name = "static.shims.fw_env"

    def generate(self, patches: dict) -> dict:
        """Map all three fw_* env tools onto the fw_printenv replacement."""
        return self.make_shims({
            "fw_printenv": "fw_printenv",
            "fw_getenv": "fw_printenv",
            "fw_setenv": "fw_printenv",
        })
class NvramLibraryRecovery(PatchGenerator):
    """
    During static analysis the LibrarySymbols class collected key->value
    mappings from libraries exporting some common nvram defaults symbols
    ("Nvrams", "router_defaults") - add these to our nvram config if we have
    any.

    TODO: if we find multiple nvram source files here, we should generate
    multiple patches. Then we should consider these during search. For now we
    just take non-conflicting values from largest to smallest source files.
    More realistic might be to try each file individually.
    """

    def __init__(self, library_info):
        self.library_info = library_info
        self.patch_name = "nvram.01_library"
        self.enabled = True

    def generate(self, patches: dict) -> dict | None:
        """
        Merge nvram defaults from all recovered sources; first writer wins.

        Sources map source filename -> key -> value. They are visited from
        most keys to fewest so larger files take priority on conflicts.

        :return: ``{'nvram': merged}`` or None when there is nothing to add.
        """
        sources = self.library_info.get("nvram", {})
        if not sources:
            return None
        merged = {}
        for _source, mapping in sorted(
            sources.items(), key=lambda item: len(item[1]), reverse=True
        ):
            for key, value in mapping.items():
                merged.setdefault(key, value)
        return {'nvram': merged} if merged else None
class NvramConfigRecovery(PatchGenerator):
    """
    Search for files that contain nvram keys and values to populate NVRAM
    defaults (strict full-path matching).
    """

    def __init__(self, extract_dir: str) -> None:
        self.extract_dir = extract_dir
        self.patch_name = "nvram.02_config_paths"
        self.enabled = True

    def generate(self, patches: dict) -> dict | None:
        """Run strict (full-path) nvram config analysis; None if nothing found."""
        found = NvramHelper.nvram_config_analysis(self.extract_dir, True)
        return {'nvram': found} if found else None
class NvramConfigRecoveryWild(PatchGenerator):
    """
    Search for files that contain nvram keys and values to populate NVRAM
    defaults. This version relaxes the search to allow for basename matches
    instead of full path matches.
    """

    def __init__(self, extract_dir: str) -> None:
        self.extract_dir = extract_dir
        self.patch_name = "nvram.03_config_paths_basename"
        self.enabled = True

    def generate(self, patches: dict) -> dict | None:
        """Run relaxed (basename-match) nvram config analysis; None if empty."""
        found = NvramHelper.nvram_config_analysis(self.extract_dir, False)
        return {'nvram': found} if found else None
class NvramDefaults(PatchGenerator):
    """
    Add default nvram values from Firmadyne and FirmAE.
    """

    def __init__(self) -> None:
        self.patch_name = "nvram.04_defaults"
        self.enabled = True

    def generate(self, patches: dict) -> dict | None:
        """Return the stock Firmadyne/FirmAE nvram defaults, or None if empty."""
        defaults = NvramHelper._get_default_nvram_values()
        return {'nvram': defaults} if defaults else None
class NvramFirmAEFileSpecific(PatchGenerator):
    """
    Apply FW-specific nvram patches based on presence of hardcoded strings in
    files, from FirmAE.

    Each target file maps to (nvram key, nvram value) pairs: when the key's
    string literal is found in the file's bytes, that nvram default is set.
    """

    FIRMAE_TARGETS: dict[str, list[tuple[str, str]]] = {
        "./sbin/rc": [("ipv6_6to4_lan_ip", "2002:7f00:0001::")],
        "./lib/libacos_shared.so": [("time_zone_x", "0")],
        "./sbin/acos_service": [("rip_enable", "0")],
        "./usr/sbin/httpd": [
            ("rip_multicast", "0"),
            ("bs_trustedip_enable", "0"),
            ("filter_rule_tbl", ""),
        ],
    }

    def __init__(self, fs_path: str) -> None:
        self.fs_path = fs_path
        self.patch_name = "nvram.05_firmae_file_specific"
        # Match sibling patchers; PatchGenerator.__init__ is never called so
        # 'enabled' would otherwise be missing entirely.
        self.enabled = True

    def generate(self, patches: dict) -> dict | None:
        """
        For each FirmAE target file present in the FS, set nvram defaults for
        every (key, value) pair whose key string appears in the file.

        Fixes vs. the original implementation:
        - targets start with "./" so ``key[1:]`` left a leading "/" and
          ``os.path.join`` then discarded ``fs_path`` entirely (absolute
          second component), probing the *host* filesystem instead;
        - ``f.read()`` was called inside the query loop, returning b"" after
          the first iteration so later queries never matched;
        - the result stored file-path -> key rather than key -> value.

        :param patches: Existing patches dictionary (unused).
        :return: ``{'nvram': {...}}`` or None when no string matched.
        """
        result = {}
        # TODO: Should we be operating on an archive to better handle symlinks?
        for target, pairs in self.FIRMAE_TARGETS.items():
            # "./sbin/rc" -> "sbin/rc" so the join stays inside fs_path
            host_path = os.path.join(self.fs_path, target.removeprefix("./"))
            if not os.path.isfile(host_path):
                continue
            try:
                # Read once; re-reading per query would yield b"" after EOF
                with open(host_path, "rb") as f:
                    contents = f.read()
                for nvram_key, nvram_value in pairs:
                    if nvram_key.encode() in contents:
                        result[nvram_key] = nvram_value
            except Exception as e:
                # Not sure what kind of errors we could encounter here, missing files? perms?
                logger.error(f"Failed to read {target} for nvram key check: {e}")
        if result:
            return {'nvram': result}
class PseudofilesTailored(PatchGenerator):
    """
    For all missing pseudofiles we saw referenced during static analysis, try
    adding them with a default model.
    """

    def __init__(self, pseudofiles: dict) -> None:
        self.patch_name = "pseudofiles.dynamic"
        # Mapping of section name (e.g. 'dev', 'proc') -> list of file paths
        self.pseudofiles = pseudofiles
        self.enabled = True

    def generate(self, patches: dict) -> dict | None:
        """
        Emit a default zero-read/discard-write model for each referenced
        pseudofile; /dev entries also get a catch-all IOCTL model.

        :param patches: Existing patches dictionary (unused).
        :return: ``{'pseudofiles': {...}}`` or None when nothing was modeled.
        """
        results = {}
        mtd_count = 0
        for section, file_names in self.pseudofiles.items():
            for path in file_names:
                # TODO: do we want to make placeholders for MTD or not?
                if section == 'dev' and path.startswith("/dev/mtd"):
                    continue

                if path.endswith("/"):
                    # We don't want to treat a directory as a pseudofile;
                    # instead add a placeholder into the directory, which
                    # ensures the directory is created.
                    # XXX: hyperfs doesn't allow userspace to create files in
                    # these directories yet
                    # https://github.com/rehosting/hyperfs/issues/20
                    path += ".placeholder"

                entry = {
                    'read': {"model": "zero"},
                    'write': {"model": "discard"},
                }

                if section == "dev":
                    # /dev files get a default IOCTL model
                    entry['ioctl'] = {
                        '*': {"model": "return_const", "val": 0}
                    }
                    if path.startswith("/dev/mtd"):
                        # MTD devices get a name (shows up in /proc/mtd).
                        # Note 'uboot' probably isn't right, but we need
                        # something. NOTE(review): unreachable while the
                        # /dev/mtd skip above is active; kept for when it is
                        # removed.
                        entry['name'] = f"uboot.{mtd_count}"
                        mtd_count += 1

                results[path] = entry

        return {'pseudofiles': results} if results else None