Source code for pyplugins.apis.symbols

"""
Symbols Plugin (symbols.py) for Penguin
=======================================

This module provides the Symbols plugin for the Penguin framework, serving as a robust,
centralized service for resolving binary symbols to file offsets. It allows other plugins
and scripts to locate functions and variables within guest executables and shared libraries,
even in challenging scenarios like stripped binaries or non-standard architectures.

Features
--------

- **Robust Forward Lookup:** Resolves symbol names to file offsets using a tiered strategy:
  1. Pre-computed JSON cache (fastest).
  2. Native ``nm`` utility (static and dynamic tables).
  3. PyELFtools (section parsing).
  4. Manual PT_DYNAMIC segment parsing (handles "sstripped" binaries with no section headers).
  5. ``readelf`` fallbacks, including MIPS GOT scraping for embedded targets.
- **Reverse Resolution:** Maps file offsets back to the nearest symbol name (useful for stack trace generation).
- **Address Resolution:** Maps virtual addresses to file offsets (useful for handling raw addresses provided by users).
- **Introspection:** Methods to list, filter, and bulk-load symbols for specific binaries.
- **Architecture Aware:** Automatically handles absolute addressing (ET_EXEC) vs relative offsets (ET_DYN) and architecture-specific symbol tables.

Example Usage
-------------

.. code-block:: python

    from penguin import plugins

    # 1. Forward Lookup: Get the binary path and file offset for a function
    #    (Useful for placing hooks or uprobes)
    path, offset = plugins.Symbols.lookup("/usr/bin/httpd", "httpGetEnv")
    if offset:
        print(f"Function located at offset {hex(offset)} in {path}")

    # 2. Address Resolution: Convert a virtual address to a file offset
    offset = plugins.Symbols.resolve_addr("/usr/bin/httpd", 0x400500)

    # 3. Reverse Lookup: Identify a function from a crash/instruction pointer
    name, dist = plugins.Symbols.resolve_offset("/usr/lib/libc.so.0", 0x12345)

Purpose
-------

The Symbols plugin bridges the gap between high-level analysis names (functions) and low-level
binary offsets required for instrumentation.
"""

import os
import lzma
import stat
import shutil
import subprocess
import tempfile
import struct
import cxxfilt
from typing import Dict, Any, Optional, Tuple, List

from elftools.elf.elffile import ELFFile
from elftools.elf.sections import SymbolTableSection
from elftools.common.exceptions import ELFError

from penguin import Plugin, plugins


[docs] class Symbols(Plugin): """ Symbols Plugin ============== A central service for resolving symbol names to file offsets within the guest. """ def __init__(self): self.projdir = self.get_arg("proj_dir") if self.get_arg_bool("verbose"): self.logger.setLevel("DEBUG") self.libsymbols_path = os.path.join( self.projdir, "static", "LibrarySymbols.json.xz") self.nm_path = shutil.which("nm") if not self.nm_path: self.logger.warning( "Host 'nm' utility not found. Symbol lookup will be slower.") self._symbols_cache: Optional[Dict[str, Any]] = None self._symbols_loaded = False
[docs] def load_symbols(self, path: str) -> Dict[str, int]: """ Force-loads symbols for a binary into the cache. Useful for pre-warming the cache or debugging what symbols are detected. """ # 1. Resolve Path resolved_path = path if '*' not in path: resolved_path = self._resolve_staticfs_symlink(path) # 2. Check if already cached db = self._load_symbols_db() if db and resolved_path in db: return db[resolved_path] # 3. Trigger a dummy lookup to force population # We pass a dummy symbol that likely doesn't exist to trigger the full scan logic # logic inside _scan_nm and _scan_file_fallback populates the cache for ALL symbols found self.lookup(path, "__FORCE_LOAD_TRIGGER__") # 4. Return the now-populated cache db = self._load_symbols_db() return db.get(resolved_path, {})
[docs] def resolve_offset(self, path: str, offset: int) -> Optional[Tuple[str, int]]: """ Reverse lookup: Given a file offset, find the nearest preceding symbol. Returns (SymbolName, DistanceFromStart) Example: resolve_offset("/bin/httpd", 0x4005) -> ("main", 5) """ symbols = self.load_symbols(path) if not symbols: return None best_symbol = None min_dist = float('inf') # Linear scan is fast enough for symbol tables (usually <10k entries) for name, sym_offset in symbols.items(): if sym_offset <= offset: dist = offset - sym_offset if dist < min_dist: min_dist = dist best_symbol = name if best_symbol: return best_symbol, min_dist return None
[docs] def resolve_addr(self, path: str, vaddr: int, base_addr: Optional[int] = None) -> Optional[int]: """ Resolves a virtual address to a file offset. If `base_addr` is provided, the offset is calculated as `vaddr - base_addr`. Otherwise, it attempts to map the virtual address to a file offset using ELF segments. If that fails, it retries by assuming common base addresses (e.g., 0x400000) and checking if the adjusted address maps to a segment or valid file offset. Returns the file offset or None if resolution fails. """ # 1. Explicit Base Address if base_addr is not None: return vaddr - base_addr resolved_path = path if '*' not in path: resolved_path = self._resolve_staticfs_symlink(path) fs = plugins.static_fs f = None try: f = fs.open(resolved_path) if not f: return None # 2. ELF Segment Analysis try: elffile = ELFFile(f) is_exec = elffile.header['e_type'] == 'ET_EXEC' segments = [] image_base = 0 for segment in elffile.iter_segments(): if segment['p_type'] == 'PT_LOAD': seg_info = { 'vaddr': segment['p_vaddr'], 'memsz': segment['p_memsz'], 'offset': segment['p_offset'] } segments.append(seg_info) if image_base == 0 or seg_info['vaddr'] < image_base: image_base = seg_info['vaddr'] # A. Try Direct Mapping offset = self._vaddr_to_file_offset_optimized( segments, vaddr, is_exec, image_base) if offset is not None: return offset # B. Try Common Bases # Common bases: Linux 64-bit ET_EXEC (0x400000), Ghidra PIE default (0x100000), # ARM/MIPS/Older (0x10000), Linux 32-bit (0x8048000). common_bases = [0x400000, 0x100000, 0x10000, 0x8048000] for base in common_bases: adjusted = vaddr - base if adjusted < 0: continue # Check if adjusted fits in ELF segments offset = self._vaddr_to_file_offset_optimized( segments, adjusted, is_exec, image_base) if offset is not None: self.logger.debug( f"Resolved {vaddr:#x} using common base {base:#x} -> {offset:#x}") return offset except ELFError as e: # Not an ELF or parse error, fall through to raw size check self.logger.error( f"ELF parsing failed for {resolved_path}: {e}") # 3. Fallback: Raw File Size Check # If ELF parsing failed or no segments matched, check if a common base adjustment # yields a valid raw offset within the file. try: # We need file size. f is already open. f.seek(0, 2) file_size = f.tell() common_bases = [0x400000, 0x100000, 0x10000, 0x8048000] for base in common_bases: adjusted = vaddr - base if 0 <= adjusted < file_size: self.logger.debug( f"Resolved {vaddr:#x} using common base {base:#x} -> raw offset {adjusted:#x}") return adjusted except Exception as e: self.logger.error( f"Raw file fallback failed for {resolved_path}: {e}") except Exception as e: self.logger.error( f"Unexpected error resolving addr {vaddr:#x} in {resolved_path}: {e}") pass return None
[docs] def list_symbols(self, path: str, filter_str: Optional[str] = None) -> List[str]: """ Returns a list of all symbol names found in the binary. Optional filter_str performs a substring match. """ symbols = self.load_symbols(path) if not symbols: return [] if filter_str: return [name for name in symbols.keys() if filter_str in name] return list(symbols.keys())
[docs] def lookup(self, path: str, symbol: str) -> Tuple[Optional[str], Optional[int]]: """ Resolve a symbol name to a specific library path and file offset. Parameters ---------- path : str Path to binary (supports wildcards like "*/libc.so*"). symbol : str Symbol name to look up. Returns ------- Tuple[str, int] or (None, None) """ # 1. Resolve Symlinks resolved_path = path if '*' not in path: resolved_path = self._resolve_staticfs_symlink(path) # 2. Check JSON/Runtime Database (Fastest) res_path, res_offset = self._scan_json(resolved_path, symbol) if res_offset is not None: return res_path, res_offset # Retry with original path if resolved was different if resolved_path != path: res_path, res_offset = self._scan_json(path, symbol) if res_offset is not None: return res_path, res_offset # 3. Native `nm` Lookup (Fast) if self.nm_path and '*' not in resolved_path: res_path, res_offset = self._scan_nm(resolved_path, symbol) if res_offset is not None: return res_path, res_offset # 4. Fallback: PyELFtools / Manual Raw / Readelf (Kitchen Sink) if '*' not in resolved_path: self.logger.debug( f"[Fallback] Attempting robust fallback for {resolved_path}") return self._scan_file_fallback(resolved_path, symbol) return None, None
[docs] def find_all(self, symbol: str) -> List[Tuple[str, int]]: """ Search for a symbol in ALL known libraries in the database. Parameters ---------- symbol : str Symbol name to look up. Returns ------- List[Tuple[str, int]] A list of (library_path, file_offset) for every occurrence of the symbol. """ results = [] db = self._load_symbols_db() if not db: return results # Pre-calculate demangled name once demangled_target = None if symbol.startswith('_Z'): try: demangled_target = cxxfilt.demangle(symbol) except Exception: pass for lib_path, lib_symbols in db.items(): offset = self._resolve_symbol_in_dict( lib_symbols, symbol, demangled_target) if offset is not None: results.append((lib_path, offset)) return results
[docs] def get_offset(self, path: str, symbol: str) -> Optional[int]: _, offset = self.lookup(path, symbol) return offset
# ------------------------------------------------------------------------- # Internal Logic # ------------------------------------------------------------------------- def _load_symbols_db(self) -> Dict[str, Any]: if self._symbols_loaded: return self._symbols_cache self._symbols_loaded = True self._symbols_cache = {} if os.path.exists(self.libsymbols_path): try: with lzma.open(self.libsymbols_path, 'rt', encoding='utf-8') as f: import ujson as json data = json.load(f) self._symbols_cache = data.get("symbols", {}) self.logger.info( f"Loaded symbols DB from {self.libsymbols_path}") except Exception as e: self.logger.warning(f"Failed to load symbols DB: {e}") return self._symbols_cache def _update_cache(self, path: str, new_symbols: Dict[str, int]): """Adds newly discovered symbols for a path to the runtime cache.""" db = self._load_symbols_db() if db is not None: if path not in db or len(new_symbols) > len(db[path]): self.logger.debug( f"[Cache] Updating cache for {path} with {len(new_symbols)} symbols") db[path] = new_symbols def _scan_json(self, path: str, symbol: str) -> Tuple[Optional[str], Optional[int]]: db = self._load_symbols_db() if not db: return None, None demangled_target = None if symbol.startswith('_Z'): try: demangled_target = cxxfilt.demangle(symbol) except Exception: pass if '*' in path: pattern = path.replace('*', '') for lib_path, lib_symbols in db.items(): if pattern in lib_path or pattern in os.path.basename(lib_path): offset = self._resolve_symbol_in_dict( lib_symbols, symbol, demangled_target) if offset is not None: return lib_path, offset return None, None norm_path = os.path.basename(path) for lib_path, lib_symbols in db.items(): lib_basename = os.path.basename(lib_path) is_match = (path == lib_path or norm_path == lib_basename or (norm_path.rstrip('-') and lib_basename.startswith(norm_path.rstrip('-')))) if is_match: offset = self._resolve_symbol_in_dict( lib_symbols, symbol, demangled_target) if offset is not None: return lib_path, offset return None, None def _resolve_symbol_in_dict(self, symbols_dict: Dict[str, int], target: str, demangled_target: Optional[str]) -> Optional[int]: if target in symbols_dict: return symbols_dict[target] if demangled_target: for name, offset in symbols_dict.items(): if name.startswith('_Z'): if cxxfilt.demangle(name) == demangled_target: return offset return None def _run_nm_command(self, cmd: List[str]) -> Dict[str, int]: symbols = {} try: # self.logger.debug(f"[NM] Running: {' '.join(cmd)}") proc = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, text=True ) for line in proc.stdout: parts = line.split() if len(parts) >= 3 and parts[1].upper() != 'U': try: sym_name = parts[0] vaddr = int(parts[2], 16) symbols[sym_name] = vaddr except ValueError: pass proc.terminate() if hasattr(proc, 'wait'): proc.wait() except Exception: pass return symbols def _scan_nm(self, path: str, symbol: str) -> Tuple[Optional[str], Optional[int]]: host_path = None is_temp = False try: fs = plugins.static_fs f = fs.open(path) if not f: return None, None fd, host_path = tempfile.mkstemp() os.close(fd) is_temp = True with open(host_path, 'wb') as tf: shutil.copyfileobj(f, tf) f.close() # Attempt 1: Static cmd = [self.nm_path, '-P', host_path] new_symbols = self._run_nm_command(cmd) # Attempt 2: Dynamic if not new_symbols: cmd = [self.nm_path, '-D', '-P', host_path] new_symbols = self._run_nm_command(cmd) if new_symbols: with open(host_path, 'rb') as tf: elffile = ELFFile(tf) is_exec = elffile.header['e_type'] == 'ET_EXEC' image_base = 0 segments = [] for segment in elffile.iter_segments(): if segment['p_type'] == 'PT_LOAD': seg_info = { 'vaddr': segment['p_vaddr'], 'memsz': segment['p_memsz'], 'offset': segment['p_offset'] } segments.append(seg_info) if image_base == 0 or seg_info['vaddr'] < image_base: image_base = seg_info['vaddr'] converted_symbols = {} for s_name, s_vaddr in new_symbols.items(): offset = self._vaddr_to_file_offset_optimized( segments, s_vaddr, is_exec, image_base) if offset is not None: converted_symbols[s_name] = offset self._update_cache(path, converted_symbols) if symbol in converted_symbols: return path, converted_symbols[symbol] except Exception as e: self.logger.debug(f"nm lookup failed for {path}: {e}") finally: if is_temp and host_path and os.path.exists(host_path): try: os.unlink(host_path) except OSError: pass return None, None def _scan_file_fallback(self, path: str, symbol: str) -> Tuple[Optional[str], Optional[int]]: lookup_path = path if path.startswith("/") else "/" + path f = None new_symbols = {} found_offset = None segments = [] is_exec = False image_base = 0 try: fs = plugins.static_fs f = fs.open(lookup_path) if not f: return None, None # 1. PREP: Read ELF Segments try: elffile = ELFFile(f) is_exec = elffile.header['e_type'] == 'ET_EXEC' for segment in elffile.iter_segments(): if segment['p_type'] == 'PT_LOAD': seg_info = { 'vaddr': segment['p_vaddr'], 'memsz': segment['p_memsz'], 'offset': segment['p_offset'] } segments.append(seg_info) if image_base == 0 or seg_info['vaddr'] < image_base: image_base = seg_info['vaddr'] # Strategy A: PyELFtools (Sections) - Good for standard binaries for section in elffile.iter_sections(): if isinstance(section, SymbolTableSection): for sym in section.iter_symbols(): if sym['st_shndx'] != 'SHN_UNDEF': s_name = sym.name if s_name: new_symbols[s_name] = sym['st_value'] except ELFError: pass except Exception: pass # Strategy B: Manual Raw Dynamic Parsing - Good for sstrip (no sections) if not new_symbols: self.logger.debug( f"[Fallback-Manual] Scanning raw PT_DYNAMIC for {path}...") new_symbols = self._scan_raw_dynamic_symbols( f, elffile, segments) # Strategy C: Readelf Subprocess - The "Final Boss" if not new_symbols: f.close() f = None tf_fd, tf_path = tempfile.mkstemp() os.close(tf_fd) try: with fs.open(lookup_path) as src, open(tf_path, 'wb') as dst: shutil.copyfileobj(src, dst) readelf_path = shutil.which("readelf") if readelf_path: # C1: Standard Symbols cmd = [readelf_path, "--symbols", "-W", tf_path] new_symbols = self._parse_readelf_output(cmd) # C2: Dynamic Symbols if not new_symbols: cmd = [readelf_path, "--dyn-syms", "-W", tf_path] new_symbols = self._parse_readelf_output(cmd) # C3: MIPS GOT (Crucial for MIPS executables) if not new_symbols: cmd = [readelf_path, "-A", "-W", tf_path] new_symbols = self._parse_readelf_mips_got(cmd) finally: if os.path.exists(tf_path): os.unlink(tf_path) # Process Results final_symbols = {} for s_name, s_vaddr in new_symbols.items(): offset = self._vaddr_to_file_offset_optimized( segments, s_vaddr, is_exec, image_base) if offset is not None: final_symbols[s_name] = offset if s_name == symbol: found_offset = offset if final_symbols: self._update_cache(lookup_path, final_symbols) if found_offset is not None: return lookup_path, found_offset except Exception as e: self.logger.error(f"Fallback lookup error: {e}") finally: if f: f.close() return None, None def _scan_raw_dynamic_symbols(self, f, elffile, segments) -> Dict[str, int]: """ Manually parses the PT_DYNAMIC segment to find the Symbol Table and String Table. """ symbols = {} try: dyn_segment = None for seg in elffile.iter_segments(): if seg['p_type'] == 'PT_DYNAMIC': dyn_segment = seg break if not dyn_segment: return {} dt_symtab = None dt_strtab = None dt_strsz = 0 dt_syment = 0 dt_mips_symtabno = 0 for tag in dyn_segment.iter_tags(): if tag.entry.d_tag == 'DT_SYMTAB': dt_symtab = tag.entry.d_val elif tag.entry.d_tag == 'DT_STRTAB': dt_strtab = tag.entry.d_val elif tag.entry.d_tag == 'DT_STRSZ': dt_strsz = tag.entry.d_val elif tag.entry.d_tag == 'DT_SYMENT': dt_syment = tag.entry.d_val elif tag.entry.d_tag == 'DT_MIPS_SYMTABNO': dt_mips_symtabno = tag.entry.d_val if not dt_symtab or not dt_strtab: return {} is_64 = elffile.elfclass == 64 is_little = elffile.little_endian endian_char = '<' if is_little else '>' if is_64: fmt = endian_char + 'IBBHQQ' entry_size = 24 else: fmt = endian_char + 'IIIBBH' entry_size = 16 if dt_syment > 0: entry_size = dt_syment symtab_offset = self._vaddr_to_file_offset_optimized( segments, dt_symtab, False, 0) strtab_offset = self._vaddr_to_file_offset_optimized( segments, dt_strtab, False, 0) if symtab_offset is None or strtab_offset is None: return {} num_symbols = dt_mips_symtabno if dt_mips_symtabno > 0 else 10000 f.seek(strtab_offset) string_table_data = f.read(dt_strsz) f.seek(symtab_offset) for _ in range(num_symbols): raw_bytes = f.read(entry_size) if len(raw_bytes) < entry_size: break parts = struct.unpack(fmt, raw_bytes) if is_64: st_name_idx = parts[0] st_value = parts[4] else: st_name_idx = parts[0] st_value = parts[1] if st_name_idx == 0 or st_name_idx >= len(string_table_data): continue end_idx = string_table_data.find(b'\0', st_name_idx) if end_idx == -1: continue try: sym_name = string_table_data[st_name_idx:end_idx].decode( 'utf-8', errors='ignore') if sym_name: symbols[sym_name] = st_value except Exception: pass except Exception as e: self.logger.debug(f"[Fallback-Manual] Failed: {e}") return symbols def _parse_readelf_output(self, cmd: List[str]) -> Dict[str, int]: symbols = {} try: proc = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, text=True ) for line in proc.stdout: parts = line.split() if len(parts) >= 8 and parts[1].strip() != "Value": try: if parts[6] != "UND": val_hex = parts[1] sym_name = parts[7] if '@' in sym_name: sym_name = sym_name.split('@')[0] if '[' in sym_name: sym_name = sym_name.split('[')[0] symbols[sym_name] = int(val_hex, 16) except (ValueError, IndexError): pass proc.wait() except Exception: pass return symbols def _parse_readelf_mips_got(self, cmd: List[str]) -> Dict[str, int]: """Scrapes MIPS GOT entries by looking for the distinct '(gp)' signature.""" symbols = {} try: proc = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, text=True ) for line in proc.stdout: line = line.strip() if not line: continue parts = line.split() if len(parts) > 2 and '(gp)' in parts[1]: if "Lazy resolver" in line or "Module pointer" in line: continue if len(parts) < 5: # Filter out Local entries continue try: vaddr_hex = parts[2] sym_name = parts[-1] if sym_name == vaddr_hex: continue symbols[sym_name] = int(vaddr_hex, 16) except (ValueError, IndexError): pass proc.wait() except Exception: pass return symbols def _resolve_staticfs_symlink(self, path: str) -> str: fs = plugins.static_fs current_path = path if path.startswith("/") else "/" + path visited = set() while True: if current_path in visited: return path visited.add(current_path) try: mount_source = getattr(fs, "_fs", None) if not mount_source: return current_path file_info = mount_source.lookup(current_path) if not file_info: return current_path if stat.S_ISLNK(file_info.mode): link_target = file_info.linkname if link_target.startswith("/"): current_path = link_target else: parent_dir = os.path.dirname(current_path) current_path = os.path.normpath( os.path.join(parent_dir, link_target)) else: return current_path except Exception: return path def _vaddr_to_file_offset_optimized(self, segments: list, vaddr: int, is_exec: bool = False, image_base: int = 0) -> Optional[int]: if not segments: return None if is_exec and image_base == 0: min_vaddr = min(s['vaddr'] for s in segments) if min_vaddr > 0: image_base = min_vaddr for seg in segments: if seg['vaddr'] <= vaddr < (seg['vaddr'] + seg['memsz']): return seg['offset'] + (vaddr - seg['vaddr']) if is_exec and image_base > 0 and vaddr >= image_base: return vaddr - image_base return None