Source code for pyplugins.analysis.ficd

import time
from os import path
from penguin import Plugin, plugins
import Levenshtein as lv


class FICD(Plugin):
    """
    FICD metric based on Pandawan, see
    https://github.com/BUseclab/Pandawan/blob/main/plugins/pandawan/ficd.py

    The goal here is to produce a faithful representation of that metric in
    PENGUIN.

    "To this end, FICD considers that a firmware image reached Ifin if no
    previously unseen (i.e., unique) tasks are launched within tf seconds.
    We refer to tf as the time frame parameter"

    "In our re-hosting experiments we use three (Py)PANDA plugins (coverage,
    syscalls_logger, and SyscallToKmodTracer) along with the FICD plugin,
    which results in the optimal tf = 220sec and tf = 300sec"

    Every ``execve`` is turned into a command-line string. A command line is
    "unique" when its Levenshtein ratio against every previously seen unique
    command line is below 0.5. Ifin is declared when a non-unique exec shows
    up more than ``time_frame`` seconds after the last unique one. Results
    are written to ``<outdir>/ficd.yaml`` by :meth:`uninit`.
    """

    def __init__(self):
        """Read plugin arguments, initialise state and hook ``execve``."""
        self.time_frame = 300  # set up as arg at some point
        self.init_time = time.time()
        self.boot_time = self.init_time
        # command line -> [seconds since init_time, "Unique" | "Not Unique"]
        self.unique_proc_times = {}
        self.seen_procs = set()
        self.prev_time = time.time()
        self.ifin_reached = False
        # Only meaningful once ifin_reached is True; defined here so the
        # attribute always exists.
        self.ifin_time = None
        self.measured_tf = 0
        # Start at init_time rather than 0 so that `last_proc_start` in the
        # report is not a huge negative number when no exec was ever seen.
        # on_exec always overwrites this on the first (necessarily unique)
        # exec before reading it.
        self.last_proc_time = self.init_time
        self.outfile = path.join(self.get_arg("outdir"), "ficd.yaml")
        self.stop_on_if = self.get_arg_bool("stop_on_if")
        self.logger.info("Loading FICD plugin")

        @plugins.syscalls.syscall("on_sys_execve_enter")
        def ficd_execve(regs, proto, syscall, fname_ptr, argv_ptr, envp):
            """Rebuild the exec'd command line from guest memory and record it."""
            cpu = self.panda.get_cpu()
            try:
                fname = plugins.mem.read_str_panda(cpu, fname_ptr)
            except ValueError:
                # Without a file name there is nothing to record.
                return

            # Now get args
            try:
                # Reads 96 bytes as a pointer list; how many argv entries
                # that covers presumably depends on guest pointer size.
                argv_buf = self.panda.virtual_memory_read(
                    cpu, argv_ptr, 96, fmt="ptrlist")
            except ValueError:
                # argv is unreadable: fall back to the file name alone.
                self.on_exec(fname)
                return

            # Read each argument pointer into argv list (NULL terminates it).
            argv = []
            for ptr in argv_buf:
                if ptr == 0:
                    break
                try:
                    argv.append(plugins.mem.read_str_panda(cpu, ptr))
                except ValueError:
                    argv.append(f"(error: 0x{ptr:x})")
            self.on_exec(fname + " " + " ".join(argv))

    def reset(self):
        """Clear the Ifin flag and restart the reference clock.

        Only ``ifin_reached`` and ``init_time`` are touched; the set of seen
        processes and ``last_proc_time`` are kept as they are.
        """
        self.ifin_reached = False
        self.init_time = time.time()

    def on_exec(self, newproc):
        """Record an exec and update the Ifin state.

        Args:
            newproc: command line of the process being executed.
        """
        self.prev_time = time.time()
        elapsed = self.prev_time - self.init_time

        # A command line similar enough to any known one is not unique.
        is_known = any(
            lv.ratio(proc, newproc) >= 0.5 for proc in self.seen_procs
        )
        if is_known:
            self.unique_proc_times[newproc] = [round(elapsed, 2), "Not Unique"]
            measured_tf = self.prev_time - self.last_proc_time
            # NOTE(review): last_proc_time only advances on unique execs, so
            # once Ifin is reached every later non-unique exec re-enters this
            # branch and overwrites measured_tf / ifin_time. Kept as-is.
            if measured_tf > self.time_frame:
                self.ifin_reached = True
                self.measured_tf = measured_tf
                self.ifin_time = elapsed
                self.logger.info(
                    f"FICD Ifin reached on exec occuring {elapsed} after start")
                if self.stop_on_if:
                    self.logger.warning("Stopping on Ifin reached...")
                    self.panda.end_analysis()
            return

        if self.ifin_reached:
            self.logger.warning(
                f"Warning! FICD Ifin reached but new exec {newproc} was seen at time {elapsed}. System is likely being exercised.")
        self.last_proc_time = self.prev_time
        self.seen_procs.add(newproc)
        self.unique_proc_times[newproc] = [round(elapsed, 2), "Unique"]

    def uninit(self):
        """Write the FICD summary to ``self.outfile`` as ``key: value`` lines.

        ``ifin_time`` is only emitted when Ifin was reached.
        """
        now = time.time()
        report = [f"ifin_reached: {self.ifin_reached}\n"]
        if self.ifin_reached:
            report.append(f"ifin_time: {self.ifin_time}\n")
        report.extend([
            f"boot_time: {self.boot_time}\n",
            f"init_time: {self.init_time}\n",
            f"measured_tf: {self.measured_tf}\n",
            f"last_proc_start: {self.last_proc_time - self.init_time}\n",
            f"total_execution_time: {now - self.init_time}\n",
            f"time_past_tf: {self.measured_tf - self.time_frame}\n",
        ])
        with open(self.outfile, "w", encoding="utf-8") as f:
            f.writelines(report)