Source code for penguin.graph_search

import time
import shutil
import glob
import os
import csv
from threading import Thread
from typing import List, Tuple
from copy import deepcopy

from penguin import getColoredLogger
from .common import yaml, get_inits_from_proj
from .graphs import Configuration, ConfigurationManager, Failure, Mitigation
from penguin.penguin_config import dump_config, hash_yaml_config, load_config
from .utils import AtomicCounter, get_mitigation_providers
from .manager import PandaRunner, calculate_score

WWW_ONLY = False  # To simplify large scale evaluations - should we bail early if we see a webserver start?

# Cache output to avoid re-running previously seen configs.
# This is only useful in debugging where we'll rerun our scripts
# multiple times. In production we'll only run a search once per FW
# since nothing's configurable at runtime.
CACHE_SUPPORT = False
cache_dir = "/cache"
caches = {}  # config hash -> output directory with .run.


[docs] class Worker: def __init__( self, global_state, config_manager, proj_dir, run_base, max_iters, run_index, timeout, active_worker_count, verbose=False, thread_id=None, logger=None, ): self.global_state = global_state self.config_manager = config_manager self.proj_dir = proj_dir self.run_base = run_base self.max_iters = max_iters self.run_index = run_index self.active_worker_count = active_worker_count self.thread_id = thread_id self.timeout = timeout self.verbose = verbose self.logger = logger or getColoredLogger( f"mgr{self.thread_id if self.thread_id is not None else ''}.run.{self.run_index.get()}" )
[docs] def run(self): while self.max_iters == -1 or self.run_index.get() < self.max_iters: self.active_worker_count.increment() try: config = self.config_manager.run_exploration_cycle( self.run_config_f, self.find_mitigations_f, self.find_new_configs_f, logger=self.logger, ) except Exception as e: self.logger.error(f"Error in run_exploration_cycle: {e}") raise e finally: self.active_worker_count.decrement() if config is None: time.sleep(1) # If all workers are waiting, that means we're done if self.active_worker_count.get() == 0: self.logger.info("All workers waiting, exiting") return
[docs] def find_new_configs_f( self, failure: Failure, mitigation: Mitigation, parent_config: Configuration ) -> List[Configuration]: """ Given a failure and mitigation, find new configurations to explore """ providers = get_mitigation_providers(parent_config.info) if failure.type not in providers: self.logger.warning(f"No mitigation provider for {failure.id} - ignoring") return [] results = [] # print(f"Asking {failure.type} to mitigate {failure} with {mitigation}") # XXX: would this ever want to return more than one config? For now it's a list... for c in ( providers[failure.type].implement_mitigation( parent_config, failure, mitigation ) or [] ): if not isinstance(c, Configuration): raise TypeError( f"Plugin {failure.type} returned a non-Configuration object {c}" ) if c == parent_config: self.logger.error( f"Plugin {failure.type} returned the parent config {c} as a new config Ignoring" ) continue # If the mitigation has the 'exclusive' property AND already has a child, we skip? # if mitigation.exclusive and len(self.config_manager.graph.descendants(mitigation.gid)): # print(f"Skipping {c} as {mitigation} is exclusive and previously used") # continue results.append(c) return results
[docs] def find_mitigations_f( self, failure: Failure, config: Configuration ) -> List[Mitigation]: results = [] # Lookup the plugin that can handle this failure analysis = get_mitigation_providers(config.info)[failure.type] # print(f"Get potential mitigation for {failure} from {analysis}") for m in analysis.get_potential_mitigations(config.info, failure) or []: if not isinstance(m, Mitigation): raise TypeError( f"Plugin {analysis.ANALYSIS_TYPE} returned a non-Mitigation object {m}" ) # self.logger.info(f"Plugin {analysis.ANALYSIS_TYPE} suggests mitigation {m}") results.append(m) return results
[docs] def run_config_f(self, config: Configuration) -> Tuple[List[Failure], float]: """ Run a given configuration, collect details of it's failures and calculate score """ self.run_idx = self.run_index.increment() run_dir = os.path.join(self.run_base, str(self.run_idx)) if os.path.isdir(run_dir): # Remove it shutil.rmtree(run_dir) os.makedirs(run_dir) # Write config to disk combined_config = deepcopy(config.info) combined_config["core"] = self.global_state.info dump_config(combined_config, os.path.join(run_dir, "config.yaml")) # Dump string representation of graph with open(os.path.join(run_dir, "graph.txt"), "w") as f: f.write(self.config_manager.stringify_state()) with open(os.path.join(run_dir, "full_graph.txt"), "w") as f: f.write(self.config_manager.stringify_state2()) if parent_cc := self.config_manager.graph.get_parent_config(config): parent_failure = self.config_manager.graph.get_parent_failure(config) parent_mitigation = self.config_manager.graph.get_parent_mitigation(config) self.logger.debug( f"Derived from {parent_cc} with {parent_mitigation} to fix {parent_failure}" ) # Record parent in output directory: with open(os.path.join(run_dir, "parent.txt"), "w") as f: f.write(f"parent={parent_cc.run_idx}\n") with open(os.path.join(run_dir, "parent_mitigation.txt"), "w") as f: f.write(f"{parent_mitigation.type}\n") f.write(f"{parent_mitigation.exclusive}\n") f.write(f"{parent_mitigation.info}\n") else: self.logger.debug("Root config") # DEBUG: save graph to disk # if len(self.config_manager.graph.graph.nodes) < 50: # self.config_manager.graph.create_png(os.path.join(run_dir, "graph.png")) # Dump pickle of graph every 5 # if self.run_idx % 5 == 0: # self.config_manager.graph.save_graph(os.path.join(run_dir, "graph.pkl")) # *** EMULATE TARGET *** # Run emulation `n_config_tests` times n_config_tests = 1 truncated = 0 # How much did we shorten the execution by (as an optimization) # Number of sections less than our allowed timeout conf_yaml = os.path.join(run_dir, "config.yaml") do_run = True if CACHE_SUPPORT: # We have cache support. Hash our *whole config* and see if it's in the global cache # If so, set out_dir to that and skip run. Otherwise do run. XXX: Not hash_yaml_config # because that ignores the core section data = yaml.safe_load(open(conf_yaml)) config_hash = hash_yaml_config(data) if config_hash in caches: # Copy the cache directory to our run_dir # Only works for n_config_tests = 1 assert n_config_tests == 1, f"{n_config_tests} != 1: Unsupported" # Check if we have a .ran file in the cache directory and .config if os.path.isfile( os.path.join(*[caches[config_hash], "output", ".ran"]) ) and os.path.isfile(os.path.join(caches[config_hash], "config.yaml")): # print(f"Cache hit for {config_hash}") do_run = False shutil.copytree( os.path.join(caches[config_hash], "output"), os.path.join(run_dir, "output"), ) else: print(f"\tInvalid cache hit {config_hash} - ignore") # Delete the cache entry shutil.rmtree(caches[config_hash]) if do_run: # config_depth = self.config_manager.graph.get_config_depth(config) for config_idx in range(n_config_tests): out_dir = os.path.join( run_dir, "output" + (str(config_idx) if config_idx > 0 else "") ) os.makedirs(out_dir, exist_ok=True) try: # self._subprocess_panda_run(conf_yaml, run_dir, out_dir) PandaRunner().run( conf_yaml, self.proj_dir, out_dir, timeout=self.timeout, verbose=self.verbose ) except RuntimeError as e: # Uh oh, we got an error while running. Warn and continue self.logger.error(f"Could not run {run_dir}: {e}") return [], 0, None if CACHE_SUPPORT: # We had a cache miss. Add this config into hash if os.path.isfile(os.path.join(run_dir, "output", ".ran")): this_cache_dir = os.path.join(cache_dir, config_hash) if os.path.isdir(this_cache_dir): print( f"ERROR: Cache directory already exists, this should never happen. {run_dir} has hash {config_hash}" ) else: shutil.copytree(run_dir, this_cache_dir) caches[config_hash] = this_cache_dir else: print("Not caching config as it did not produce a .ran file") # if we have an exclusive config, treat score as 0 scores = self.find_best_score( run_dir, self.run_idx, n_config_tests, config.exclusive is not None ) failures = self.analyze_failures(run_dir, config, n_config_tests, logger=self.logger) if not len(failures) and truncated > 0: # We saw no failures, but we also were running with a shortened execution. Make a fake failure that # We'll mitigate in core failures.append(Failure("truncation", "core", {"truncated": truncated})) # Record details of failures into output directory # Failures are a list of Failure objects. Above output dir because it's not a part of # our real dynamic analysis, more meta-info with open(os.path.join(run_dir, "failures.yaml"), "w") as f: yaml.dump([fail.to_dict() for fail in failures], f) # XXX Better score aggregation? Can we use dynamic weights or something? final_score = sum(scores.values()) # Compare to parent score, if we had a parent if parent_cc: parent_score = parent_cc.health_score with open(os.path.join(run_dir, "score_delta.txt"), "w") as f: f.write(f"{final_score-parent_score:.02f}") # Config option to bail after we first see a webserver start if WWW_ONLY: # Check if netbinds.csv contains a bind for port 80. If so we'll terminate early # If this config was non-exclusive (i.e., it was a normal run, no symex/dynval), we're done! if not config.exclusive and os.path.isfile( os.path.join(run_dir, "output", "netbinds.csv") ): with open(os.path.join(run_dir, "output", "netbinds.csv"), "r") as f: reader = csv.DictReader(f) for row in reader: if row["guest_port"] == "80": self.logger.info( f"Terminating early due to port 80 bind in {run_dir}" ) for _ in range(self.max_iters): self.run_index.increment() # There must be a better way to do this break return failures, final_score, self.run_idx
[docs] def find_best_score(self, run_dir, run_idx, n_config_tests, is_exclusive): """ Look acrous our `n_config_tests` runs. Calculate the maximal score for each score type our various metrics. Note n_config_tests is 1 for now. Later we might increase depending on expected non-determinism. """ best_scores = {} # For each key, maximal score across all runs for config_idx in range(n_config_tests): these_scores = calculate_score( os.path.join( run_dir, f"output{config_idx}" if config_idx > 0 else "output" ), have_console=not self.global_state.info["show_output"], ) for score_name, score in these_scores.items(): if score_name not in best_scores or score > best_scores[score_name]: best_scores[score_name] = score if is_exclusive: # Exclusive configs get a fixed score of 0, they're an intermediate analysis best_scores = {k: 0 for k in best_scores} # Report scores and save to disk # self.logger.info(f"scores: {[f'{k[:4]}:{v}' for k, v in best_scores.items()]}") with open(os.path.join(run_dir, "scores.txt"), "w") as f: f.write("score_type,score\n") for k, v in best_scores.items(): f.write(f"{k},{v:.02f}\n") # Write a single score to disk with open(os.path.join(run_dir, "score.txt"), "w") as f: total_score = sum(best_scores.values()) f.write(f"{total_score:.02f}\n") return best_scores
[docs] @staticmethod def analyze_failures(run_dir, node, n_config_tests, logger=None): """ After we run a configuration, do our post-run analysis of failures. Run each PyPlugin that has a PenguinAnalysis implemented. Have each identify failures. Staticmethod with logger arg so we can call from other search algos too. Maybe worth refactoring into manager or something more generic? """ fails = [] # (id, type, {data}) for config_idx in range(n_config_tests): output_dir = os.path.join( run_dir, f"output{config_idx}" if config_idx > 0 else "output" ) mitigation_providers = get_mitigation_providers(node.info) # For an exclusive config, only query the exclusive provider if node.exclusive is not None: if node.exclusive not in mitigation_providers: raise ValueError( f"Cannot use exclusive {node.info['exclusive']} as it's not a mitigation provider" ) mitigation_providers = { node.exclusive: mitigation_providers[node.exclusive] } for plugin_name, analysis in mitigation_providers.items(): try: failures = analysis.parse_failures(output_dir) except Exception as e: logger.error(e) raise e # if len(failures): # logger.info(f"Plugin {plugin_name} reports {len(failures)} failures: {failures}") for failure in failures or []: if not isinstance(failure, Failure): raise TypeError( f"Plugin {plugin_name} returned a non-Failure object {failure}" ) fails.append(failure) # We might have duplicate failures, but that's okay, caller will dedup? return fails
[docs] class GlobalState: def __init__(self, proj_dir, output_dir, base_config): self.proj_dir = proj_dir # show_output is False unless we're told otherwise show_output = ( base_config["core"]["show_output"] if "show_output" in base_config["core"] else False ) # root_shell is True unless we're told otherwise root_shell = ( base_config["core"]["root_shell"] if "root_shell" in base_config["core"] else True ) self.info = { "arch": base_config["core"]["arch"], "fs": base_config["core"]["fs"], "kernel": base_config["core"]["kernel"], "show_output": show_output, "root_shell": root_shell, "version": base_config["core"]["version"], } del base_config["core"] # Nobody should use base, ask us instead! if not os.path.isfile(os.path.join(proj_dir, self.info["fs"])): raise ValueError(f"Base filesystem archive not found: {self.info['fs']}") # Static analysis *must* have found some inits, otherwise we can't even start execution! inits_path = os.path.join(*[proj_dir, "static", "InitFinder.yaml"]) if os.path.isfile(inits_path): with open(inits_path, "r") as f: self.inits = yaml.safe_load(f) if not self.inits: raise RuntimeError( f"No potential inits found during static analysis: {inits_path} is empty." )
[docs] def add_init_options_to_graph(config_manager, proj_dir, base_config): """ A config needs to have an ['env']['igloo_init'] in order to do anything useful. We might have a single option already set or we might have multiple options stored proj_dir/static/InitFinder.yaml (based on static analysis). If we have no value set and no potential values, we raise an error. Otherwise we'll create a fake failure for "init" and a mitigation node to add each of the init options. Then we'll create configs with each init and add the necessary graph edges. This means we'll start our search with multiple configuration options (nodes) to explore. If an igloo_init is set in the initial config, we'll assume that's right and leave it alone. """ # Hack igloo_inits into graph as a failure and mitigation. # But only if we don't have igloo_init set and have multiple # potential values if len(base_config.info["env"].get("igloo_init", [])) == 0: init_options = get_inits_from_proj(proj_dir) if len(init_options) == 0: raise RuntimeError( "No potential init binaries identified and none could be found" ) base = config_manager.graph.get_node(base_config.gid) assert base is not None, f"BUG: base config {base_config} not in configuration" if base.run: raise RuntimeError( f"Base config {base_config} already ran, cannot add init options" ) base.run = True # Add a fake failure init_fail = Failure("init", "init", {"inits": init_options}) config_manager.graph.add_node(init_fail) # Connect back to baseline config_manager.graph.add_edge(base_config, init_fail) # Now for each, add mitigations and new config for init in init_options: # First add mitigation: this_init_mit = Mitigation(f"init_{init}", "init", {"init": init}) config_manager.graph.add_node(this_init_mit) # Connect failure to mitigation config_manager.graph.add_edge(init_fail, this_init_mit, unknown=True) # Next add new config conf_info = deepcopy(base_config.info) conf_info["env"]["igloo_init"] = init new_config = Configuration(init, conf_info) config_manager.graph.add_node(new_config) # Connect new config to mitigation and parent config config_manager.graph.add_edge(base_config, new_config, delta=f"init={init}") config_manager.graph.add_edge(this_init_mit, new_config)
[docs] def report_best_results(best_idx, best_output, output_dir): with open(os.path.join(output_dir, "best.txt"), "w") as f: f.write(str(best_idx)) # Now let's examine the best run's netbinds to report on network binds netbinds = os.path.join(best_output, "netbinds.csv") # parse csv - headers are procname,ipvn,domain,guest_ip,guest_port net_procnames = set() net_count = 0 if netbinds is not None: with open(netbinds, "r") as f: reader = csv.DictReader(f) for row in reader: net_procnames.add(row["procname"]) net_count += 1 else: net_count = 0 with open(os.path.join(output_dir, "result"), "w") as f: if net_count == 0: f.write("false\n") # No network binds else: f.write( f"{len(net_procnames)} unique processes bound to {net_count} network sockets\n" )
[docs] def main(): import sys if len(sys.argv) < 3: print(f"Usage: {sys.argv[0]} <proj_dir> <config> <outdir>") sys.exit(1) config = load_config(sys.argv[1], sys.argv[2]) graph_search(sys.argv[1], config, sys.argv[3])
if __name__ == "__main__": main()