Source code for pyplugins.apis.unwind

"""
Unwind Plugin (unwind.py)
=========================

Advanced stack unwinding using DWARF CFI and Heuristics with Capstone-powered
instruction validation for multi-architecture support.

Dependencies:
- capstone (pip install capstone) -- optional; without it call-site validation is skipped
- pyelftools (pip install pyelftools)
- penguin.plugins.*
- wrappers.ptregs_wrap
"""

import bisect
from typing import Optional, List, Dict, Any, Generator, Tuple

# External Libs
try:
    from capstone import *
    from capstone.arm import *
    from capstone.arm64 import *
    from capstone.mips import *
    from capstone.ppc import *
    from capstone.x86 import *
    from capstone.riscv import *
    HAVE_CAPSTONE = True
except ImportError:
    HAVE_CAPSTONE = False

from elftools.elf.elffile import ELFFile
from elftools.dwarf.callframe import FDE
from penguin import Plugin, plugins
from wrappers.ptregs_wrap import PtRegsWrapper


class ArchInfo:
    """
    Static description of one target architecture used by the unwinder.

    Attributes:
        name: Short architecture family name (e.g. "x86_64", "arm").
        ptr_size: Pointer width in bytes.
        endian: Byte order string, as accepted by ``int.from_bytes``.
        dwarf_map: Dict with the DWARF register numbers stored under the
            keys "sp_reg", "ra_reg" and "fp_reg".
        cs_arch: Capstone architecture constant, or None when unsupported.
        cs_mode: Capstone mode constant, or None when unsupported.
        call_offset: Distance from a return address back to its call
            instruction.
    """

    def __init__(self, name, ptr_size, endian, dwarf_map, cs_arch, cs_mode, call_offset):
        # Identity and data layout.
        self.name, self.ptr_size, self.endian = name, ptr_size, endian
        # DWARF register numbering for this ABI.
        self.dwarf_map = dwarf_map
        # Disassembler configuration.
        self.cs_arch, self.cs_mode = cs_arch, cs_mode
        # Distance from Return Address to Call Instr
        self.call_offset = call_offset
# DWARF register mappings (standard ABI numbering) used when creating new
# register states. They are needed to interpret DWARF rules, not to build the
# initial state.
X86_64_MAP = {"sp_reg": 7, "ra_reg": 16, "fp_reg": 6}
ARM_MAP = {"sp_reg": 13, "ra_reg": 14, "fp_reg": 11}  # R13=SP, R14=LR, R11=FP
ARM64_MAP = {"sp_reg": 31, "ra_reg": 30, "fp_reg": 29}  # X29=FP, X30=LR
MIPS_MAP = {"sp_reg": 29, "ra_reg": 31, "fp_reg": 30}  # $29=SP, $31=RA, $30=FP
PPC_MAP = {"sp_reg": 1, "ra_reg": 65, "fp_reg": 31}
RISCV_MAP = {"sp_reg": 2, "ra_reg": 1, "fp_reg": 8}  # x2=SP, x1=RA, x8=FP/s0

# Capstone (arch, mode) per configuration key. The Capstone import at the top
# of the file is optional, so the CS_* names may only be touched when that
# import succeeded; otherwise every architecture falls back to (None, None)
# and the module still loads.
_NO_CAPSTONE = (None, None)
if HAVE_CAPSTONE:
    _CS_TARGETS = {
        "intel64": (CS_ARCH_X86, CS_MODE_64),
        "armel": (CS_ARCH_ARM, CS_MODE_ARM),
        "aarch64": (CS_ARCH_ARM64, CS_MODE_LITTLE_ENDIAN),
        "mipsel": (CS_ARCH_MIPS, CS_MODE_MIPS32 + CS_MODE_LITTLE_ENDIAN),
        "mipseb": (CS_ARCH_MIPS, CS_MODE_MIPS32 + CS_MODE_BIG_ENDIAN),
        "mips64el": (CS_ARCH_MIPS, CS_MODE_MIPS64 + CS_MODE_LITTLE_ENDIAN),
        "mips64eb": (CS_ARCH_MIPS, CS_MODE_MIPS64 + CS_MODE_BIG_ENDIAN),
        "powerpc": (CS_ARCH_PPC, CS_MODE_32 + CS_MODE_BIG_ENDIAN),
        "powerpc64": (CS_ARCH_PPC, CS_MODE_64 + CS_MODE_BIG_ENDIAN),
        "powerpc64le": (CS_ARCH_PPC, CS_MODE_64 + CS_MODE_LITTLE_ENDIAN),
        "riscv64": (CS_ARCH_RISCV, CS_MODE_RISCV64),
    }
else:
    _CS_TARGETS = {}

# key -> (name, ptr_size, endian, dwarf_map, call_offset)
_ARCH_TABLE = {
    "intel64": ("x86_64", 8, "little", X86_64_MAP, 5),
    "armel": ("arm", 4, "little", ARM_MAP, 4),
    "aarch64": ("arm64", 8, "little", ARM64_MAP, 4),
    "mipsel": ("mips", 4, "little", MIPS_MAP, 8),
    "mipseb": ("mips", 4, "big", MIPS_MAP, 8),
    "mips64el": ("mips64", 8, "little", MIPS_MAP, 8),
    "mips64eb": ("mips64", 8, "big", MIPS_MAP, 8),
    "powerpc": ("ppc", 4, "big", PPC_MAP, 4),
    "powerpc64": ("ppc64", 8, "big", PPC_MAP, 4),
    "powerpc64le": ("ppc64", 8, "little", PPC_MAP, 4),
    "riscv64": ("riscv", 8, "little", RISCV_MAP, 4),
    # No Capstone target is configured for loongarch64.
    "loongarch64": ("loongarch", 8, "little",
                    {"sp_reg": 3, "ra_reg": 1, "fp_reg": 22}, 4),
}

CONFIGS = {
    key: ArchInfo(name, ptr_size, endian, dwarf_map,
                  *_CS_TARGETS.get(key, _NO_CAPSTONE), call_offset)
    for key, (name, ptr_size, endian, dwarf_map, call_offset) in _ARCH_TABLE.items()
}
class StackUnwinder(Plugin):
    """
    Multi-architecture stack unwinder plugin.

    For every frame the unwinder tries, in order: DWARF CFI rules, frame
    pointer chain walking, and a heuristic stack scan whose candidates are
    validated with Capstone when it is available.
    """

    def __init__(self):
        self._elf_cache = {}          # path -> ELFFile
        self._fde_cache = {}          # path -> (sorted start addrs, [(start, end, FDE)])
        self._arch_info = None
        self._reverse_sym_cache = {}  # path -> sorted [(offset, name)]
        self._is_pie_cache = {}       # path -> bool
        self._md = None               # Capstone disassembler, created lazily
        self.logger.setLevel("DEBUG")
        if not HAVE_CAPSTONE:
            self.logger.warning(
                "Capstone not found. Heuristic unwinding will be severely limited.")

    def _init_capstone(self, arch: ArchInfo):
        """Create the Capstone disassembler for ``arch`` once, if possible."""
        if not HAVE_CAPSTONE or not arch.cs_arch:
            return
        if self._md:
            return
        try:
            self._md = Cs(arch.cs_arch, arch.cs_mode)
            self._md.detail = True
        except Exception as e:
            self.logger.error(f"Failed to init Capstone for {arch.name}: {e}")
            self._md = None

    def _get_arch_info(self) -> ArchInfo:
        """
        Return the ArchInfo for the configured architecture.

        The architecture is read from ``conf["core"]["arch"]``; a missing or
        unknown value falls back to "intel64". The result is cached.
        """
        if self._arch_info:
            return self._arch_info
        conf = self.get_arg("conf")
        arch_str = conf.get("core", {}).get("arch") if conf else "intel64"
        if arch_str not in CONFIGS:
            self.logger.warning(
                f"Unknown architecture {arch_str!r}; falling back to intel64")
            arch_str = "intel64"
        info = CONFIGS[arch_str]
        self._init_capstone(info)
        # Remember the result so the lookup above runs only once.
        self._arch_info = info
        return info

    def _get_elf_for_mapping(self, path: str) -> Optional[ELFFile]:
        """
        Open ``path`` through ``plugins.static_fs`` and parse it as an ELF.

        Successful results are cached, and the PIE flag for the file is
        recorded in ``_is_pie_cache``. Returns None when the file cannot be
        opened or parsed.
        """
        if path in self._elf_cache:
            return self._elf_cache[path]
        try:
            f = plugins.static_fs.open(path)
            if not f:
                return None
            # The handle is left open on purpose: ELFFile reads from it lazily.
            elf = ELFFile(f)
            self._elf_cache[path] = elf
            try:
                self._is_pie_cache[path] = (elf.header.e_type == 'ET_DYN')
            except Exception:
                self._is_pie_cache[path] = True
            return elf
        except Exception as e:
            self.logger.debug(f"Could not load ELF for {path}: {e}")
            return None

    def _load_symbol_offsets(self, path: str) -> List[Tuple[int, str]]:
        """Return the sorted (offset, name) pairs for ``path``, or [] on failure."""
        try:
            sym_db = plugins.symbols._load_symbols_db()
            if sym_db and path in sym_db:
                return sorted(
                    (off, name) for name, off in sym_db[path].items()
                    if isinstance(off, int))
        except Exception as e:
            self.logger.debug(f"Symbol database unavailable for {path}: {e}")
        return []

    def _prepare_reverse_lookup(self, path: str):
        """Build and cache the address -> symbol table for ``path``."""
        if path in self._reverse_sym_cache:
            return
        offsets = self._load_symbol_offsets(path)
        if not offsets:
            # Presumably a lookup makes the symbols plugin load this file's
            # symbols; retry the database afterwards.
            try:
                plugins.symbols.lookup(path, "__force_load__")
            except Exception as e:
                self.logger.debug(f"Forced symbol load failed for {path}: {e}")
            else:
                offsets = self._load_symbol_offsets(path)
        self._reverse_sym_cache[path] = offsets

    def _resolve_symbol(self, path: str, address: int) -> Tuple[str, int]:
        """
        Map ``address`` to the nearest preceding symbol of ``path``.

        Returns:
            (symbol_name, offset_into_symbol), or ("unknown", 0) when no
            symbol starts at or below ``address`` within 0x10000 bytes.
        """
        self._prepare_reverse_lookup(path)
        sorted_syms = self._reverse_sym_cache.get(path, [])
        if not sorted_syms:
            return "unknown", 0
        # The 1-tuple (address + 1,) sorts before every (address + 1, name),
        # so idx is the first entry that starts strictly above ``address``.
        # A symbol starting exactly at ``address`` is therefore matched.
        idx = bisect.bisect_left(sorted_syms, (address + 1,))
        if idx == 0:
            return "unknown", 0
        sym_addr, sym_name = sorted_syms[idx - 1]
        diff = address - sym_addr
        if diff > 0x10000:
            return "unknown", 0
        return sym_name, diff

    def _normalize_mappings(self, raw_mappings: Any) -> Tuple[List[Any], Dict[str, int]]:
        """
        Sort mappings by base address and attribute nearby anonymous ones.

        An unnamed, "[anonymous]" or "/dev/zero" mapping that starts less than
        0x2000 bytes after the end of the previous named mapping takes that
        mapping's name (the mapping object is modified in place).

        Returns:
            (sorted mappings, dict of name -> lowest base address).
        """
        maps = list(raw_mappings)
        self.logger.debug(f"Normalizing {len(maps)} mappings...")
        sorted_maps = sorted(maps, key=lambda x: x.base)
        normalized = []
        lib_bases = {}
        last_named_map = None
        for m in sorted_maps:
            name = m.name
            if (not name or name == "[anonymous]" or name == "/dev/zero") and last_named_map:
                if m.base - (last_named_map.base + last_named_map.size) < 0x2000:
                    m.name = last_named_map.name
                    name = m.name
            if name and not name.startswith("[") and name != "/dev/zero":
                last_named_map = m
                if name not in lib_bases:
                    lib_bases[name] = m.base
                else:
                    lib_bases[name] = min(lib_bases[name], m.base)
            normalized.append(m)
        return normalized, lib_bases

    def unwind(self, regs: PtRegsWrapper) -> Generator[Any, None, List[Dict[str, Any]]]:
        """
        Unwind the stack starting from the register state in ``regs``.

        This is a generator to be driven with ``yield from``; its return value
        is the list of frames. Each frame is a dict with the keys "depth",
        "ip", "sp", "module", "symbol", "repr" and "offset". Every frame except
        the last also has "method" ("dwarf", "frame_pointer" or "heuristic"),
        naming how the next frame was derived from it. At most 64 frames are
        produced.
        """
        self.logger.info("--- Starting Multi-Arch Unwind ---")
        frames = []
        arch = self._get_arch_info()
        dwarf_map = arch.dwarf_map

        # Initial state from the wrapper.
        current_ip = regs.get_pc()
        current_sp = regs.get_sp()

        # Best-effort frame pointer retrieval through the wrapper's aliases.
        current_fp = None
        if "fp" in regs.REG_NAMES:
            current_fp = regs.get_register("fp")
        elif "ebp" in regs.REG_NAMES:
            current_fp = regs.get_register("ebp")
        elif "s0" in regs.REG_NAMES:
            current_fp = regs.get_register("s0")  # RISC-V FP
        elif "r11" in regs.REG_NAMES and "arm" in arch.name:
            current_fp = regs.get_register("r11")  # ARM FP

        # DWARF state: register number -> value.
        current_dwarf_regs = {}
        if dwarf_map:
            current_dwarf_regs[dwarf_map["sp_reg"]] = current_sp
            # Populate generics.
            for i in range(32):
                val = regs.get_register(f"r{i}")
                if val is not None:
                    current_dwarf_regs[i] = val
            # CFA rules are often FP-relative and leaf frames keep the return
            # address in a register, so seed both when the generic names above
            # did not already provide them.
            if current_fp is not None:
                current_dwarf_regs.setdefault(dwarf_map["fp_reg"], current_fp)
            for alias in ("lr", "ra"):
                if alias in regs.REG_NAMES:
                    val = regs.get_register(alias)
                    if val is not None:
                        current_dwarf_regs.setdefault(dwarf_map["ra_reg"], val)
                    break

        raw_mappings = yield from plugins.osi.get_mappings()
        sorted_maps, lib_bases = self._normalize_mappings(raw_mappings)

        visited_sps = {current_sp}

        for depth in range(64):
            # --- Symbol Resolution ---
            sym_name = "unknown"
            sym_diff = 0
            module_name = "unknown"
            map_offset = 0

            mapping = next(
                (m for m in sorted_maps if m.base <= current_ip < m.base + m.size), None)
            if mapping:
                module_name = mapping.name
                map_offset = current_ip - mapping.base

                if module_name and not module_name.startswith("["):
                    # Load the ELF first: that is what fills _is_pie_cache, so
                    # without it the first frame of a non-PIE module would be
                    # treated as PIE.
                    self._get_elf_for_mapping(module_name)
                    is_pie = self._is_pie_cache.get(module_name, True)
                    lookup = current_ip
                    if is_pie:
                        if module_name in lib_bases:
                            lookup = current_ip - lib_bases[module_name]
                        else:
                            lookup = current_ip - mapping.base
                    sym_name, sym_diff = self._resolve_symbol(module_name, lookup)

            repr_str = f"{current_ip:#x}"
            if sym_name != "unknown":
                repr_str = f"{module_name}!{sym_name}+{sym_diff:#x}"

            frames.append({
                "depth": depth,
                "ip": current_ip,
                "sp": current_sp,
                "module": module_name,
                "symbol": sym_name,
                "repr": repr_str,
                "offset": map_offset,
            })

            # --- Next State Determination ---
            # Tuple (next_ip, next_sp, next_fp, next_dwarf_regs)
            next_state = None
            method = "failed"

            # 1. DWARF CFI
            if dwarf_map:
                try:
                    dwarf_res = yield from self._unwind_frame_dwarf(
                        current_ip, current_dwarf_regs, mapping, arch)
                    if dwarf_res:
                        d_ret, d_regs = dwarf_res
                        d_sp = d_regs.get(dwarf_map["sp_reg"], current_sp)
                        d_fp = d_regs.get(dwarf_map["fp_reg"], current_fp)
                        next_state = (d_ret, d_sp, d_fp, d_regs)
                        method = "dwarf"
                except Exception as e:
                    # Best effort: fall through to the other strategies.
                    self.logger.debug(f"DWARF unwind failed at {current_ip:#x}: {e}")

            # 2. Frame Pointer
            if not next_state and current_fp and current_fp > 0:
                if abs(current_fp - current_sp) < 0x100000:
                    fp_res = yield from self._unwind_frame_fp(current_fp, sorted_maps, arch)
                    if fp_res:
                        f_ret, f_sp, f_fp = fp_res
                        # Most register context is lost here. Only the
                        # recovered FP is carried over, so FP-relative CFA
                        # rules can resume DWARF unwinding. f_sp is an
                        # estimate and is deliberately not fed to DWARF.
                        f_regs = {dwarf_map["fp_reg"]: f_fp} if dwarf_map else {}
                        next_state = (f_ret, f_sp, f_fp, f_regs)
                        method = "frame_pointer"

            # 3. Heuristic
            if not next_state:
                func_start_addr = (
                    current_ip - sym_diff) if sym_name != "unknown" else None
                h_res = yield from self._unwind_frame_heuristic(
                    current_sp, current_ip, sorted_maps, arch, func_start_addr)
                if h_res:
                    h_ret, h_sp = h_res
                    # Heuristic doesn't recover FP or full regs.
                    next_state = (h_ret, h_sp, current_fp, {})
                    method = "heuristic"

            if not next_state:
                break

            current_ip, current_sp, current_fp, current_dwarf_regs = next_state
            frames[-1]["method"] = method

            if current_sp in visited_sps:
                self.logger.warning(f" Loop detected at SP {current_sp:#x}")
                break
            visited_sps.add(current_sp)

            if current_ip == 0:
                break

        return frames

    def _read_pointer(self, addr, arch):
        """Generator: read one pointer-sized integer at ``addr``, or None."""
        data = yield from plugins.mem.read_bytes(addr, arch.ptr_size)
        if not data:
            return None
        return int.from_bytes(data, byteorder=arch.endian)

    def _find_fde(self, path, elf, rel_pc):
        """
        Return the FDE of ``path`` covering ``rel_pc``, or None.

        All FDEs from .debug_frame and .eh_frame are parsed once per file and
        kept sorted by start address, so later lookups are a binary search.
        """
        index = self._fde_cache.get(path)
        if index is None:
            dwarf = elf.get_dwarf_info()
            ranges = []
            for has_cfi, get_entries in ((dwarf.has_CFI, dwarf.CFI_entries),
                                         (dwarf.has_EH_CFI, dwarf.EH_CFI_entries)):
                try:
                    if not has_cfi():
                        continue
                    for entry in get_entries():
                        if isinstance(entry, FDE):
                            start = entry.header.initial_location
                            ranges.append(
                                (start, start + entry.header.address_range, entry))
                except Exception as e:
                    self.logger.debug(f"Failed to parse CFI of {path}: {e}")
            # Sort on the addresses only; FDE objects are not orderable.
            ranges.sort(key=lambda r: (r[0], r[1]))
            index = ([r[0] for r in ranges], ranges)
            self._fde_cache[path] = index

        starts, ranges = index
        pos = bisect.bisect_right(starts, rel_pc) - 1
        if pos < 0:
            return None
        start, end, fde = ranges[pos]
        return fde if start <= rel_pc < end else None

    def _apply_register_rules(self, row, cfa, regs_dict, arch):
        """
        Generator: compute the caller's registers from one decoded CFI row.

        ``row`` is a pyelftools table row: a dict with 'pc', 'cfa' and one
        RegisterRule per DWARF register number. Registers whose value cannot
        be recovered are removed from the result.
        """
        new_regs = dict(regs_dict)
        for reg, rule in row.items():
            if not isinstance(reg, int):
                continue  # the 'pc' and 'cfa' entries
            kind = rule.type
            if kind == 'SAME_VALUE':
                continue
            if kind == 'OFFSET':
                value = yield from self._read_pointer(cfa + rule.arg, arch)
            elif kind == 'VAL_OFFSET':
                value = cfa + rule.arg
            elif kind == 'REGISTER':
                value = regs_dict.get(rule.arg)
            else:
                # UNDEFINED, expressions and architectural rules are not
                # evaluated here.
                value = None
            if value is None:
                new_regs.pop(reg, None)
            else:
                new_regs[reg] = value
        return new_regs

    def _unwind_frame_dwarf(self, pc, regs_dict, mapping, arch):
        """
        Generator: unwind one frame using DWARF call frame information.

        Args:
            pc: Current instruction pointer.
            regs_dict: DWARF register number -> value for the current frame.
            mapping: Memory mapping containing ``pc`` (may be None).
            arch: ArchInfo of the target.

        Returns:
            (return_address, caller_register_dict), or None when no usable
            CFI is available or the return address resolves to 0.
        """
        if not mapping or not mapping.name or mapping.name.startswith("["):
            return None

        elf = self._get_elf_for_mapping(mapping.name)
        if not elf or not elf.has_dwarf_info():
            return None

        # NOTE(review): this is a file offset, while FDE ranges are virtual
        # addresses; the two coincide only when the segment's file offset
        # equals its vaddr -- confirm for the targets in use.
        rel_pc = pc - mapping.base + mapping.offset

        fde = self._find_fde(mapping.name, elf, rel_pc)
        if fde is None:
            return None

        # pyelftools returns a DecodedCallFrameTable; its ``table`` attribute
        # holds the rows, ordered by increasing pc.
        table = fde.get_decoded().table
        row = next((r for r in reversed(table) if r["pc"] <= rel_pc), None)
        if row is None:
            return None

        # Calculate CFA. Expression-based CFAs and unknown base registers
        # cannot be evaluated here, so let the caller try another strategy.
        cfa_rule = row.get("cfa")
        if cfa_rule is None or cfa_rule.reg is None:
            return None
        base = regs_dict.get(cfa_rule.reg)
        if base is None:
            return None
        cfa = base + (cfa_rule.offset or 0)

        new_regs = yield from self._apply_register_rules(row, cfa, regs_dict, arch)

        # With no rule for the return address register, it keeps the value it
        # has in the current frame.
        ret_addr = new_regs.get(arch.dwarf_map["ra_reg"]) or 0
        # The caller's stack pointer is the CFA.
        new_regs[arch.dwarf_map["sp_reg"]] = cfa

        if ret_addr == 0:
            return None

        return ret_addr, new_regs

    def _unwind_frame_fp(self, fp, mappings, arch):
        """
        Generator: standard RBP/FP chain walking.

        Reads the saved frame pointer at [fp] and the return address at
        [fp + ptr_size].

        Returns:
            (ret_addr, new_sp, new_fp), or None when memory is unreadable, the
            return address is not in an executable mapping, or the saved frame
            pointer does not lie above ``fp``.
        """
        try:
            next_fp = yield from self._read_pointer(fp, arch)
            ra = yield from self._read_pointer(fp + arch.ptr_size, arch)
            if next_fp is None or ra is None:
                return None

            # Sanity check: the return address must point into executable memory.
            ra_map = next((m for m in mappings if m.base <= ra < m.base + m.size), None)
            if ra_map is None or not ra_map.exec:
                return None

            # Caller frames live at higher addresses.
            if next_fp <= fp:
                return None

            # SP for the next frame is usually just above where RA was saved.
            new_sp = fp + (arch.ptr_size * 2)
            return ra, new_sp, next_fp
        except Exception as e:
            self.logger.debug(f"Frame pointer unwind failed at {fp:#x}: {e}")
            return None

    def _validate_call_capstone(self, addr: int, data: bytes,
                                func_start: Optional[int]) -> Tuple[bool, Optional[bool]]:
        """
        Check whether the instruction at ``addr`` is a call.

        Returns:
            (is_call, linkage). ``linkage`` is True when an immediate operand
            equals ``func_start``, False when an immediate target is more than
            0x100000 bytes away from ``addr``, and None when it cannot be
            decided. Without a disassembler the result is (True, None).
        """
        if not self._md:
            return True, None

        try:
            insn = next(self._md.disasm(data, addr), None)
            if insn is None:
                return False, False

            if not (insn.group(CS_GRP_CALL) or insn.group(CS_GRP_BRANCH_RELATIVE)):
                return False, False

            if func_start is None:
                return True, None

            for op in insn.operands:
                if op.type == CS_OP_IMM:
                    target = op.imm
                    if target == func_start:
                        return True, True
                    if abs(target - addr) > 0x100000:
                        return True, False
            return True, None
        except Exception:
            return False, False

    def _x86_call_precedes(self, ret_addr):
        """
        Generator: report whether a call instruction ends exactly at ``ret_addr``.

        x86 instructions have variable length, so every start offset in the 16
        bytes before ``ret_addr`` is tried. Without a disassembler the
        candidate is accepted, matching ``_validate_call_capstone``.
        """
        if not self._md:
            return True
        scan_start = ret_addr - 16
        window = yield from plugins.mem.read_bytes(scan_start, 16)
        if not window:
            return False
        for offset in range(14, 0, -1):
            insn = next(self._md.disasm(window[offset:], scan_start + offset), None)
            if insn is None:
                continue
            if scan_start + offset + insn.size == ret_addr and insn.group(CS_GRP_CALL):
                return True
        return False

    def _unwind_frame_heuristic(self, sp, current_ip, mappings, arch, current_func_start):
        """
        Generator: scan the stack for a plausible return address.

        Reads 1024 bytes at ``sp`` and returns the first pointer-sized word
        that lies in an executable mapping and is preceded by a call
        instruction.

        Returns:
            (return_address, new_sp), or None when nothing qualifies.
        """
        stack_data = yield from plugins.mem.read_bytes(sp, 1024)
        if not stack_data:
            return None

        ptr_size = arch.ptr_size
        endian = arch.endian
        is_x86 = "x86" in arch.name

        # Step through the stack looking for pointers; a trailing partial word
        # is ignored.
        for i in range(0, len(stack_data) - ptr_size + 1, ptr_size):
            possible_ra = int.from_bytes(stack_data[i:i + ptr_size], byteorder=endian)

            # Filter 1: basic constraints. The alignment test is skipped on
            # x86, where instructions are unaligned and return addresses are
            # frequently odd.
            if possible_ra < 0x1000:
                continue
            if not is_x86 and possible_ra & 1:
                continue
            if possible_ra == current_ip:
                continue

            # Filter 2: executable memory.
            target_map = next(
                (m for m in mappings if m.base <= possible_ra < m.base + m.size), None)
            if not target_map or not target_map.exec:
                continue

            # Filter 3: call site validation.
            is_valid_call = False
            if is_x86:
                # Backwards scan for variable-length instructions.
                is_valid_call = yield from self._x86_call_precedes(possible_ra)
            else:
                # Fixed distance between the call and its return address.
                call_site_addr = possible_ra - arch.call_offset
                instr_bytes = yield from plugins.mem.read_bytes(call_site_addr, 4)
                if instr_bytes:
                    is_call, linkage = self._validate_call_capstone(
                        call_site_addr, instr_bytes, current_func_start)
                    # If we explicitly failed linkage (ghost frame), skip.
                    if linkage is False:
                        continue
                    is_valid_call = is_call

            if is_valid_call:
                new_sp = sp + i + ptr_size
                return possible_ra, new_sp

        return None