import os
import shutil
import csv
import re
import statistics
import math
from collections import Counter
from typing import List
from copy import deepcopy
from concurrent.futures import ThreadPoolExecutor, as_completed
from penguin.common import getColoredLogger, yaml, frozenset_to_dict, dict_to_frozenset
from penguin.penguin_config import dump_config, hash_yaml_config, load_config, load_unpatched_config
from .manager import PandaRunner, calculate_score
[docs]
def calculate_entropy(buffer: bytes) -> float:
    """Return the Shannon entropy, in bits per byte, of the byte distribution in *buffer*.

    An empty buffer yields 0.0 (no observations, the accumulation loop never runs).
    """
    total = len(buffer)
    result = 0.0
    # Accumulate -sum(p * log2(p)) over every distinct byte value observed
    for occurrences in Counter(buffer).values():
        p = occurrences / total
        result -= p * math.log2(p)
    return result
# Minimization targets accepted by PatchMinimizer (validated in __init__).
VALID_TARGETS = ["webserver_start", "coverage", "network_traffic", "network_entropy"]
[docs]
class PatchMinimizer():
def __init__(self, proj_dir, config_path, output_dir, timeout,
             max_iters, nworkers, verbose, minimization_target="webserver_start"):
    """
    Set up a patch-minimization session.

    Args:
        proj_dir: project directory containing the config and patches
        config_path: path to the base config yaml
        output_dir: where run artifacts and generated configs are written
        timeout: per-run timeout handed to PandaRunner
        max_iters: cap on total runs across the whole search
        nworkers: thread pool size for parallel runs
        verbose: enables DEBUG logging and verbose runs
        minimization_target: one of VALID_TARGETS; controls which patches are
            forced on/off and how run viability is judged

    Raises:
        ValueError: if minimization_target is not in VALID_TARGETS
    """
    if minimization_target not in VALID_TARGETS:
        raise ValueError(f"Invalid minimization target {minimization_target}. Must be one of {VALID_TARGETS}")
    # NOTE(review): "patch_minmizer" typo kept as-is; renaming the logger could
    # break existing log-filter configuration.
    self.logger = getColoredLogger("penguin.patch_minmizer")
    self.proj_dir = (proj_dir + "/") if not proj_dir.endswith("/") else proj_dir
    self.output_dir = output_dir
    self.timeout = timeout
    self.max_iters = max_iters
    self.nworkers = nworkers
    self.verbose = verbose
    self.minimization_target = minimization_target
    self.patches_to_test = list()
    self.binary_search = True
    self.dynamic_patch_dir = os.path.join(self.proj_dir, "dynamic_patches")
    self.data_baseline = dict()
    base_config = load_unpatched_config(config_path)
    self.original_config = base_config
    self.base_config = deepcopy(base_config)
    self.run_count = 0
    self.scores = dict()  # run_index -> score
    self.runmap = dict()  # run_index -> patchset
    # TODO: use FICD to set timeout if timeout parameter. Warn if FICD not reached
    # add an FICD option to run until FICD (which might have to do the baseline single-threaded)
    # Gather all the candidate patches and our base config to include patches we need for exploration
    self.base_config["patches"] = list()
    # We have 3 patches to pick between: manual, single_shot, and auto_explore.
    # We never want manual. But we might want either single_shot or auto_explore depending on self.minimization_target
    # If we're set to www_start, we want to take single_shot, else we want auto_explore
    ignore_patches = ["manual"]
    required_patches = ["base", "lib_inject.core"]
    if self.minimization_target == "webserver_start":
        ignore_patches.extend(["auto_explore", "single_shot"])
        this_required = "single_shot_ficd"
    else:
        # BUG FIX: was list.append("single_shot_ficd", "single_shot"), which
        # raises TypeError (append takes exactly one argument).
        ignore_patches.extend(["single_shot_ficd", "single_shot"])
        this_required = "auto_explore"
    required_patches.append(this_required)
    for patch in base_config["patches"]:
        if any(patch.endswith(f"{x}.yaml") for x in ignore_patches):
            self.logger.info(f"Ignoring {patch} to support automated minimization")
            # BUG FIX: without this continue, "ignored" patches still fell
            # through into the required/test buckets below.
            continue
        if any(patch.endswith(f"/{x}.yaml") for x in required_patches):
            # Patches we just leave *always* enabled: base, auto_explore and lib_inject.core
            self.base_config["patches"].append(patch)
        else:
            # Patches we want to test. Will never include manual or single_shot (since we filtered at start of loop)
            self.patches_to_test.append(patch)
    # Ensure we have static_patches/auto_explore.yaml and NOT single_shot.yaml
    if not any([patch.endswith(f"/{this_required}.yaml") for patch in self.base_config["patches"]]):
        self.logger.warning(f"Adding {this_required} patch to supported automated exploration to guide minimization")
        # Ensure static_patches dir is in at least one of the patches
        assert (any([patch.startswith("static_patches") for patch in self.patches_to_test])), "No static_patches dir in patches - not sure how to add auto_explore"
        self.base_config["patches"].append(f"static_patches/{this_required}.yaml")
    # Patches can override options in previous patches
    # this can cause serious issues with our minimization algorithm since we assume patches are orthogonal
    self.original_patched_config = load_config(self.proj_dir, config_path)
    dump_config(self.original_patched_config, os.path.join(output_dir, "patched_config.yaml"))
    self.remove_shadowed_options()
    self.split_overlapping_patches()
    self.run_base = os.path.join(output_dir, "runs")
    os.makedirs(self.run_base, exist_ok=True)
    dump_config(self.base_config, os.path.join(output_dir, "base_config.yaml"))
    self.logger.setLevel("DEBUG" if verbose else "INFO")
    self.logger.info(f"Loaded {len(self.patches_to_test)} patches to test")
    self.logger.debug(f"Candidate patches: {self.patches_to_test}")
[docs]
@staticmethod
def lists_overlap(list1, list2):
overlap = list()
for item in list1:
if item in list2:
overlap.append(item)
return overlap
[docs]
@staticmethod
def dicts_overlap(dict1, dict2):
"""
Returns a dict of the overlapping keys and values
"""
overlap = dict()
for key in dict1:
if key in dict2:
if isinstance(dict1[key], dict) and isinstance(dict2[key], dict):
sub_overlap = PatchMinimizer.dicts_overlap(dict1[key], dict2[key])
if sub_overlap:
overlap[key] = sub_overlap
elif isinstance(dict1[key], list) and isinstance(dict2[key], list):
sub_overlap = PatchMinimizer.lists_overlap(dict1[key], dict2[key])
if sub_overlap:
overlap[key] = sub_overlap
elif dict1[key] == dict2[key]:
overlap[key] = dict1[key]
return overlap
[docs]
@staticmethod
def diff_dicts(dict1, dict2):
"""
Returns the difference of dict1 - dict2
i.e., the keys and values that are in dict1 not in dict2
"""
diff = dict()
for key in dict1:
if key in dict2:
if isinstance(dict1[key], dict) and isinstance(dict2[key], dict):
sub_diff = PatchMinimizer.diff_dicts(dict1[key], dict2[key])
if sub_diff:
diff[key] = sub_diff
elif isinstance(dict1[key], list) and isinstance(dict2[key], list):
new_list = [item for item in dict1[key] if item not in dict2[key]]
if new_list:
diff[key] = new_list
elif dict1[key] != dict2[key]:
diff[key] = dict1[key]
else:
diff[key] = dict1[key]
return diff
[docs]
@staticmethod
def filter_conflicts(final_dict, patch, path=""):
"""
Filter out any keys that are in the final_dict but with a different value
"""
filtered_patch = {}
conflicts = []
for key, value in patch.items():
current_path = f"{path}.{key}" if path else key
if key in final_dict:
if isinstance(value, dict) and isinstance(final_dict[key], dict):
nested_filtered, nested_conflicts = PatchMinimizer.filter_conflicts(final_dict[key], value, current_path)
if nested_filtered:
filtered_patch[key] = nested_filtered
conflicts.extend(nested_conflicts)
elif isinstance(value, list) and isinstance(final_dict[key], list):
templist = [item for item in value if item in final_dict[key]]
if templist:
filtered_patch[key] = templist
else:
if final_dict[key] != value:
conflicts.append((current_path, value, final_dict[key]))
else:
filtered_patch[key] = value
else:
filtered_patch[key] = value
return filtered_patch, conflicts
[docs]
def remove_shadowed_options(self):
    """
    Walk through each patch and remove any options that the unpatched config would overwrite;
    we remove the old patch and generate a new one.

    Each candidate patch is compared against the fully-patched config
    (self.original_patched_config): options whose values conflict with the
    final config are "shadowed". A patch with conflicts is dropped from
    patches_to_test and, if any options survive, replaced by a dynamically
    generated patch containing only the unshadowed options.
    """
    # Iterate a copy since we mutate self.patches_to_test inside the loop
    for patch in deepcopy(self.patches_to_test):
        # load the patch
        with open(os.path.join(self.proj_dir, patch), "r") as f:
            loaded_patch = yaml.load(f, Loader=yaml.FullLoader)
        # get the differences between the patched and unpatched config
        new_patch, conflicts = self.__class__.filter_conflicts(self.original_patched_config, loaded_patch)
        if conflicts:
            self.patches_to_test.remove(patch)
            if new_patch:
                os.makedirs(self.dynamic_patch_dir, exist_ok=True)
                # Config-hash suffix keeps regenerated patch filenames unique
                new_patch_path = os.path.join(self.dynamic_patch_dir,
                                              f"unshadow_{patch.replace('/', '_')}_{hash_yaml_config(new_patch)[-6:]}.yaml")
                with open(new_patch_path, "w") as f:
                    yaml.dump(new_patch, f)
                # Store the path relative to the project dir, like the originals
                self.patches_to_test.append(new_patch_path.replace(self.proj_dir, ""))
                self.logger.info(f"Patch {patch} had options shadowed by final config, created a new patch without those options: {new_patch_path}")
            else:
                self.logger.info(f"Patch {patch} had all options shadowed by final config, patch removed")
                self.logger.debug(f"conflicts were in the following options: {conflicts}")
        else:
            self.logger.info(f"Patch {patch} is free of options shadowed by final config")
[docs]
def split_overlapping_patches(self):
    """
    If we have overlapping patches, we attempt to preserve orthogonality by splitting them,
    ensuring that each unique configuration option is in only one patch.
    However, in a real config only the last option is considered. So we should throw away
    options that are not the last one.

    Overlapping patches are removed from self.patches_to_test and replaced by:
      * one generated patch holding the shared (overlapping) options, and
      * one "diff" patch per original patch holding its remaining options.
    """
    # PERF FIX: load each candidate patch once up front; previously both
    # configs were re-loaded from disk for every (patch, other) pair, i.e.
    # O(n^2) file reads. Assumes load_unpatched_config is a pure file read.
    loaded = {p: load_unpatched_config(os.path.join(self.proj_dir, p))
              for p in self.patches_to_test}
    overlapping = dict()
    for patch in self.patches_to_test:
        for other in self.patches_to_test:
            if patch != other:
                overlap = PatchMinimizer.dicts_overlap(loaded[patch], loaded[other])
                if overlap:
                    # Key on the overlap content so three-way (or wider)
                    # overlaps group together under one entry
                    overlap_key = dict_to_frozenset(overlap)
                    if overlap_key not in overlapping:
                        overlapping[overlap_key] = set()
                    overlapping[overlap_key].update({patch, other})
    if overlapping:
        self.logger.info("Overlapping patches detected")
        os.makedirs(self.dynamic_patch_dir, exist_ok=True)
        for overlap, patches in overlapping.items():
            overlap_dict = frozenset_to_dict(overlap)
            new_patch_path = os.path.join(self.dynamic_patch_dir,
                                          f"overlap_{'_'.join(overlap_dict.keys())}_{hash_yaml_config(overlap_dict)[-6:]}.yaml")
            # The shared options become their own patch; each original patch
            # is reduced to its non-shared remainder below
            new_patches = [(new_patch_path, frozenset_to_dict(overlap))]
            self.logger.info(f"Option {overlap_dict} is in multiple patches: {patches}")
            for old_patch in patches:
                old_patch_path = os.path.join(self.proj_dir, old_patch)
                old_patch_config = load_unpatched_config(old_patch_path)
                new_patch = deepcopy(old_patch_config)
                diff = PatchMinimizer.diff_dicts(new_patch, frozenset_to_dict(overlap))
                # Create the diff patch in dynamic dir still (originally used original dir, but would be misleading for static
                diff_path = os.path.join(self.dynamic_patch_dir,
                                         f"diff_{hash_yaml_config(diff)[-6:]}_{os.path.basename(old_patch_path)}")
                if diff:
                    # we might've emptied a patch
                    new_patches.append((diff_path, diff))
                self.logger.debug(f"Removing {old_patch} from consideration due to overlap")
                if old_patch in self.patches_to_test:
                    # We may have already removed it from a three-way conflict
                    self.patches_to_test.remove(old_patch)
            for path, new_patch in new_patches:
                self.logger.info(f"Creating new patch {path} to preserve orthoganality")
                with open(path, "w") as f:
                    yaml.dump(new_patch, f)
                self.patches_to_test.append(path.replace(self.proj_dir, ""))
[docs]
def run_config(self, patchset, run_index):
    """
    Run a single configuration (base config + *patchset*) in runs/<run_index>.

    Returns (run_index, score) where score is the dict from calculate_score;
    also writes the summed score to totalscore.txt in the run dir.
    Raises RuntimeError if PandaRunner fails (after logging the error).
    Runs in parallel with other invocations - only touches its own run_dir.
    """
    # BUG FIX: removed dead `score = {"empty": 0.0}` initializer - it was
    # always overwritten by calculate_score or abandoned by the re-raise.
    run_dir = os.path.join(self.run_base, str(run_index))
    if os.path.isdir(run_dir):
        # Stale output from a previous invocation - start fresh
        shutil.rmtree(run_dir)
    os.makedirs(run_dir)
    self.logger.info(f"Starting run {run_index} in {run_dir} with patches:\n\t" + "\n\t".join(patchset))
    new_config = deepcopy(self.base_config)
    new_config["patches"].extend(patchset)
    conf_yaml = os.path.join(run_dir, "config.yaml")
    dump_config(new_config, conf_yaml)
    out_dir = os.path.join(run_dir, "output")
    os.makedirs(out_dir, exist_ok=True)
    try:
        PandaRunner().run(conf_yaml, self.proj_dir, out_dir,
                          timeout=self.timeout, verbose=self.verbose)
    except RuntimeError as e:
        # Log for visibility, then propagate so run_configs can bail out
        self.logger.error(f"Could not run {run_dir}: {e}")
        raise
    score = calculate_score(out_dir)
    with open(os.path.join(run_dir, "totalscore.txt"), "w") as f:
        total = float(sum(score.values()))
        f.write(f"{total}\n")
    return run_index, score
[docs]
def run_configs(self, patchsets: List[str]):
with ThreadPoolExecutor(max_workers=self.nworkers) as executor:
futures = []
for patchset in patchsets:
self.runmap[self.run_count] = deepcopy(patchset)
futures.append(executor.submit(self.run_config, patchset, self.run_count))
self.run_count += 1
# Wait for all the submitted tasks to complete
for future in as_completed(futures):
try:
res = future.result() # Optionally handle exceptions here
if res is None:
self.logger.error("Thread returned None??")
continue
index, score = res
if score is None:
# Error, need to retry. Add the patchset back to the queue
self.logger.error(f"Error running patchset {patchset}. Retrying")
self.patches_to_test.append(patchset)
continue
self.scores[index] = score
except Exception as e:
print(f"Thread raised an exception: {e}")
self.logger.exception(e) # Show the full traceback
# Bail
executor.shutdown(wait=False)
break
[docs]
def calculate_network_data(self, run_index):
    '''
    From a directory, calculate the bytes sent, received, and entropy of received from the guest
    Consume vpn_{ip}_port files for data amounts (as csv) and vpn_response_{ip}_port files for entropy.
    Note that ip could be ipv6 with []s. Colons in names are replaced with underscores (weird for ipv6)

    Returns a dict mapping port -> {'to_guest': [...], 'from_guest': [...],
    'entropy': float, 'first_length': int, 'first_entropy': float}, or None
    when the run produced no output at all or never finished (no ".ran" marker).
    '''
    output_dir = os.path.join(self.run_base, str(run_index), "output")
    total_data = dict()
    # NOTE(review): the ipv6 alternatives look wrong - e.g. \[[0-9a-f]\]+
    # matches repeated single-char brackets like "[a][b]", not a full
    # bracketed address, and underscores (the docstring says colons are
    # replaced by underscores) are not in the character class at all.
    # The port is always captured as group(2). Verify against the actual
    # vpn/web log filenames before relying on ipv6 matches.
    pattern = re.compile(r"vpn_(?:([0-9\.]+)|\[[0-9a-f]\]+)_(\d+)")
    pattern2 = re.compile(r"vpn_response_(?:([0-9\.]*)|\[[0-9a-f]\]).*_(\d+)")
    pattern3 = re.compile(r"web_(?:([0-9\.]*)|\[[0-9a-f]\]).*_(\d+)")
    if len(os.listdir(output_dir)) == 0:
        self.logger.error(f"Run {run_index} produced no output. This is unexpected")
        return
    if not os.path.isfile(os.path.join(output_dir, ".ran")):
        self.logger.error(f"Run {run_index} did not complete. This is unexpected")
        return
    # Template record for a port we haven't seen yet
    default_data = {'to_guest': [], 'from_guest': [], 'entropy': 0.0,  # Populated based on vpn logs
                    'first_length': 0, 'first_entropy': 0.0}  # Populated based on fetch_web logs
    for file in os.listdir(output_dir):
        # and extract the port number:
        if m := pattern.match(file):
            # Looking at a log of data amounts
            sublist = ([], [])
            port = int(m.group(2))
            file_path = os.path.join(output_dir, file)
            with open(file_path, 'r', newline='') as csvfile:
                reader = csv.reader(csvfile)
                for row in reader:
                    try:
                        # Presumably column 0 is bytes to guest and column 1 is
                        # bytes from guest - TODO confirm against vpn log writer.
                        # NOTE(review): a row with fewer than two columns raises
                        # IndexError, which is NOT caught here (only ValueError).
                        sublist[0].append(int(row[0]))
                        sublist[1].append(int(row[1]))
                    except ValueError as e:
                        # Skip malformed rows but keep processing the file
                        self.logger.error(f"Error reading {file_path}: {e}")
                        self.logger.error(f"Row: {row}")
                        continue
            if port not in total_data:
                total_data[port] = deepcopy(default_data)
            total_data[port]['to_guest'].extend(sublist[0])
            total_data[port]['from_guest'].extend(sublist[1])
        elif m := pattern2.match(file):
            # Looking at a response file
            # Calculate the entropy of the response by reading the whole file
            port = int(m.group(2))
            if port not in total_data:
                total_data[port] = deepcopy(default_data)
            file_path = os.path.join(output_dir, file)
            with open(file_path, "rb") as f:
                entropy = calculate_entropy(f.read())
                self.logger.debug(f"Run {run_index} port {port} has entropy: {entropy:.02f}")
                total_data[port]['entropy'] = entropy
        elif m := pattern3.match(file):
            # Looking at fetch_web log
            port = int(m.group(2))
            if port not in total_data:
                total_data[port] = deepcopy(default_data)
            # Populate length and update entropy
            with open(os.path.join(output_dir, file), "rb") as f:
                data = f.read()
                total_data[port]['first_entropy'] = calculate_entropy(data)
                total_data[port]['first_length'] = len(data)
    return total_data
[docs]
def verify_net_traffic(self, run_index):
'''
Given the results of a run, determine if it's still viable based on network traffic as compared to the baseline
'''
assert (self.data_baseline), "Baseline data not established"
target_ports = [80, 443]
total_data = self.calculate_network_data(run_index)
# Look at the bytes received from the guest
for port, data in total_data.items():
if len(target_ports) and port not in target_ports:
continue # Skip non-target ports
if port not in self.data_baseline.keys():
self.logger.warning(f"On run {run_index}, port {port} produces traffic not seen in baseline. Ignoring")
continue
if self.data_baseline[port]['first_length'] == 0:
# if we baseline didn't respond, we shouldn't expect one
continue
# IF ENTROPY - how does the entropy of the response compare to the baseline? We want it to be similar
# First assert that the baseline has entropy
if self.minimization_target == "network_entropy":
assert 'entropy' in self.data_baseline[port], f"Baseline data for port {port} does not have entropy"
if 'entropy' in data:
self.logger.info(f"Run {run_index} port {port} has entropy {data['entropy']:.02f}, baseline: {self.data_baseline[port]['entropy']:.02f}")
if data['entropy'] < 0.95 * self.data_baseline[port]['entropy']:
self.logger.info(f"Run {run_index} is not viable based on entropy of response on port {port}. Got {data['entropy']:.02f} vs baseline: {self.data_baseline[port]['entropy']:.02f}")
return False
elif self.minimization_target == "network_traffic":
# IF WEB - how does the mean bytes received from the guest compare to the baseline? We want it to be similar
perc = 90
port_percentile = PatchMinimizer.percentile(data['from_guest'], perc)
baseline_percentile = PatchMinimizer.percentile(self.data_baseline[port]['from_guest'], perc)
mean = statistics.mean(data['from_guest'])
baseline_mean = statistics.mean(self.data_baseline[port]['from_guest'])
self.logger.info(f"{perc}th percentile for port {port}: {port_percentile}, baseline: {baseline_percentile}")
self.logger.info(f"mean for port {port}: {mean}, baseline: {baseline_mean}")
if mean < 0.95 * baseline_mean:
self.logger.info(f"Run {run_index} is not viable based on mean bytes received from guest on port {port}")
return False
elif self.minimization_target == "webserver_start":
# We just want the webserver response to be non-empty
if data['first_length'] == 0:
self.logger.info(f"Run {run_index} is not viable based on 0-byte first response length on port {port}")
return False
self.logger.info(f"Run {run_index} has a non-empty first response on {port}: {data['first_length']} bytes with entropy {data['first_entropy']:.02f}")
else:
raise ValueError(f"Unknown minimization target {self.minimization_target}")
return True
[docs]
def verify_coverage(self, run_index):
'''
Given the results of a run, determine if it's still viable based on coverage as compared to the baseline
'''
# First, see if we dropped the number of network binds
if self.scores[run_index]["bound_sockets"] < self.scores[0]["bound_sockets"]:
self.logger.info(f"Run {run_index} has fewer bound sockets than baseline. Not viable")
return False
# Run 0 is always our baseline
baseline = self.scores[0]["blocks_covered"]
# Then, is overall health within 95% of the baseline?
# our_score = sum(self.scores[run_index].values())
our_score = self.scores[run_index]["blocks_covered"]
fraction_baseline = our_score / baseline
self.logger.info(f"Blocks covered for {run_index}: {our_score} (baseline: {baseline}), our_score/baseline: {fraction_baseline}")
if fraction_baseline > 1.10:
self.logger.warning(f"Run {run_index} has more blocks >=10% more blocks covered than baseline. Are you sure baseline is optimized?")
return fraction_baseline >= 0.95
[docs]
def verify_www_started(self, run_index):
'''
Check netbinds log to ensure we saw a webserver bind
'''
run_dir = os.path.join(self.run_base, str(run_index))
with open(os.path.join(*[run_dir, "output", "netbinds_summary.csv"])) as f:
reader = csv.DictReader(f)
netbinds = [row for row in reader]
www_start = any([row['bound_www'] == 'True' for row in netbinds])
if not www_start:
self.logger.info(f"Run {run_index} did not start a webserver. Not viable")
return www_start
[docs]
def config_still_viable(self, run_index):
'''
Compare the results from this run to our baseline. Determine if it's still viable.
If not, we return False, indicating that this config is not valid by our minimization target.
'''
failure = False
if self.minimization_target == "webserver_start":
# If no www start, it's not viable
failure |= not self.verify_www_started(run_index)
if not failure and self.minimization_target in ["webserver_start", "network_entropy", "network_traffic"]:
# If we're doing network stuff, we need to check the network data
failure |= not self.verify_net_traffic(run_index)
if not failure and self.minimization_target == "coverage":
failure |= self.verify_coverage(run_index)
return not failure
[docs]
@staticmethod
def percentile(data, percentile):
data.sort()
k = (len(data) - 1) * (percentile / 100)
f = math.floor(k)
c = math.ceil(k)
if f == c:
return data[int(k)]
d0 = data[int(f)] * (c - k)
d1 = data[int(c)] * (k - f)
return d0 + d1
[docs]
def get_best_patchset(self):
"""
If we don't assume orthoganality, we have to run every combination of patches
Then we could get the best one this way
"""
best = (0, self.original_config["patches"])
for index, score in self.scores.items():
if self.config_still_viable(index) and len(self.runmap[index]) < len(best):
best = (index, self.runmap[index])
return best
[docs]
def establish_baseline(self):
    '''
    For our very first run, we'll establish the baseline.
    First we'll validate that our provided config meets our expectations, or raise an exception if it fails
    IF minimization target is webserver_start:
      - it must already start a webserver - otherwise we can't minimize
    IF minimization target is coverage
      - it must produce coverage information (e.g., it should typically have auto_explore patch)
      - actual limitation: it should have the vpn, nmap, coverage plugins
    IF minimization target is network_traffic:
      - It must generate network traffic
      - actual requirements: should have the vpn and nmap plugins
    After validating the config, run the baseline and do an initial static minimization by removing any pseudofiles
    that aren't ever used. Split these into new patches and drop them from patches_to_test
    Finally, update self.patches_to_test
    '''
    # Check in self.original_patched_config that we have the necessary plugins
    # (no default case needed: __init__ already validated against VALID_TARGETS)
    match self.minimization_target:
        case 'coverage':
            # Check for the necessary plugins: vpn, nmap, coverage in self.original_patched_config['plugins'] keys
            required_plugins = ['vpn', 'nmap', 'coverage']
        case 'network_traffic':
            required_plugins = ['vpn', 'nmap']
        case 'network_entropy':
            required_plugins = ['vpn', 'nmap']
        case 'webserver_start':
            required_plugins = []
    for required_plugin in required_plugins:
        if required_plugin not in self.original_patched_config['plugins']:
            raise ValueError(f"Config does not have the required plugin: {required_plugin} for search mode {self.minimization_target}")
    if (self.minimization_target in ['network_traffic', 'network_entropy']) and not self.original_patched_config.get('plugins', {}).get('vpn', {}).get('log', False):
        raise ValueError(f"Config does not have plugins.vpn.log set, required for search mode {self.minimization_target}")
    assert (self.run_count == 0), f"Establish baseline should be run first not {self.run_count}"
    # The baseline run includes every candidate patch
    patchset = self.patches_to_test
    _, score = self.run_config(patchset, 0)
    self.run_count += 1  # Bump run_count so we don't re-run baseline
    # Score for baseline goes in self.scores (coverage) and data_baseline stores network data (entropy, bytes)
    self.scores[0] = score
    self.data_baseline = self.calculate_network_data(0)
    self.logger.debug(f"data_baseline: {self.data_baseline}")
    assert (self.data_baseline), "Baseline data not established"
    # NOTE(review): return value ignored - presumably run for its log output;
    # a non-viable baseline is caught by the explicit checks below. Confirm.
    self.config_still_viable(0)
    # Output is in run_dir
    run_dir = os.path.join(self.run_base, "0")
    # Check netbinds_summary.csv to see if webserver (or other services start)
    with open(os.path.join(*[run_dir, "output", "netbinds_summary.csv"])) as f:
        reader = csv.DictReader(f)
        netbinds = [row for row in reader]
        www_start = any([row['bound_www'] == 'True' for row in netbinds])
        any_network_starts = len(netbinds) > 0
    if not any_network_starts:
        # No matter what you're doing, something should start a network service
        raise ValueError(f"Baseline config does not start any network services - invalid for mode {self.minimization_target}")
    if (self.minimization_target == 'webserver_start') and not www_start:
        # If you're minimizing on webserver start and your baseline doesn't start a webserver, that's a problem
        raise ValueError(f"Baseline config does not start a webserver - invalid for mode {self.minimization_target}")
    if (self.minimization_target in ['network_traffic', 'network_entropy']):
        if not self.data_baseline:
            # Check that baseline total_data isn't empty
            raise ValueError(f"Baseline config empty - no network traffic generated. Invalid for mode {self.minimization_target}")
        if not self.original_patched_config.get('plugins', {}).get('vpn', {}).get('log', False):
            # Network VPN must be enabled
            raise ValueError(f"Baseline config does not generate network traffic as plugins.vpn.log is not set - invalid for mode {self.minimization_target}")
        if sum([sum(data['from_guest']) for data in self.data_baseline.values()]) == 0:
            # Need network response
            raise ValueError(f"Baseline run had no network responses. Invalid for mode {self.minimization_target}")
        if self.minimization_target == 'network_entropy':
            # If data was non-zero I think entropy must also be non-zero - probably redundant
            if sum([data['entropy'] for data in self.data_baseline.values()]) == 0:
                raise ValueError(f"Baseline run had no network entropy. Invalid for mode {self.minimization_target}")
    # Great - we have a valid baseline. Now let's figure out if any pseudofile patches are irrelevant.
    # Look at output/pseudofiles_modeled.yaml - the keys here are the pseudofiles that were modeled, everything else is irrelevant
    with open(os.path.join(*[run_dir, "output", "pseudofiles_modeled.yaml"])) as f:
        pseudofiles = yaml.safe_load(f)
        relevant_pseudofiles = set(pseudofiles.keys())
    # Now look at our patches providing pseudofiles. Split them into two groups: those that are relevant and aren't
    for patch in list(self.patches_to_test):
        with open(os.path.join(self.proj_dir, patch)) as f:
            patch_config = yaml.safe_load(f)
        # NOTE(review): the default is a list but the value is indexed like a
        # mapping below (pseudofiles[pf]) - harmless since the empty default is
        # skipped, but the 'pseudofiles' entry is presumably a dict. Confirm.
        pseudofiles = patch_config.get('pseudofiles', [])
        if not len(pseudofiles):
            # No pseudofiles
            continue
        # Does this contain any pseudofiles that *aren't* relevant?
        irrelevant_pseudofiles = set(pseudofiles) - relevant_pseudofiles
        if not len(irrelevant_pseudofiles):
            # Everything was relevant
            continue
        # We have irrelevant pseudofiles. Split them into a new (disabled) patch.
        # Split relevant ones into a new (enabled) patch. Drop the original patch
        # First create the new patch with only the relevant pseudofiles
        new_patch = deepcopy(patch_config)
        new_patch['pseudofiles'] = {pf: pseudofiles[pf] for pf in pseudofiles if pf not in irrelevant_pseudofiles}
        # Remove old patch from our queue
        self.patches_to_test.remove(patch)
        if len(new_patch['pseudofiles']) or len(new_patch.keys()) > 1:
            # The new patch is doing something (either relevant pseudofiles or other actions)
            os.makedirs(self.dynamic_patch_dir, exist_ok=True)
            new_patch_path = os.path.join(self.dynamic_patch_dir, f"relevant_pseudofiles_{hash_yaml_config(new_patch)[-6:]}_{os.path.basename(patch)}")
            with open(new_patch_path, "w") as f:
                yaml.dump(new_patch, f)
            self.patches_to_test.append(new_patch_path.replace(self.proj_dir, ""))
            self.logger.info(f"Splitting patch {patch} into {new_patch_path} to remove {len(irrelevant_pseudofiles)} irrelevant pseudofiles")
        else:
            self.logger.info(f"Removing patch {patch} entirely as it only provides {len(irrelevant_pseudofiles)} irrelevant pseudofiles")
[docs]
def run(self):
    """
    Execute the minimization search and write the minimized config.

    Phase 1 (binary search): repeatedly split patches_to_test in half and keep
    whichever half is still viable. Falls back to slow mode when both halves
    pass, both halves fail, or binary search is disabled.
    Phase 2 (slow mode): leave-one-out - run once per patch with that patch
    removed, and drop every patch whose removal keeps the run viable.

    Returns the final list of required patches; also dumps the minimized
    config to minimized.yaml in the project dir unless that file exists.
    """
    # First, establish a baseline score by running the base config.
    # We're going to do a binary search, so might as well run the other two halves as well
    # It is assumed by other code that run 0 is the baseline
    self.logger.info("Establishing baseline")
    self.establish_baseline()
    slow_mode = not self.binary_search
    patchsets = []  # Start with an empty list, we'll add our halves later
    # Now, do the binary search. This is fairly optimistic, but can save a bunch of time when it works
    if not slow_mode:
        while self.run_count < self.max_iters:
            self.logger.info(f"{len(self.patches_to_test)} patches remaining")
            self.logger.debug(f"Patches to test: {self.patches_to_test}")
            # XXX we used to special case the baseline as a 1-of-3 vs normally 1-of-2.
            # Now we just run the baseline before starting
            first_half_index = self.run_count
            second_half_index = self.run_count + 1
            patchsets.append(self.patches_to_test[:len(self.patches_to_test)//2])
            patchsets.append(self.patches_to_test[len(self.patches_to_test)//2:])
            self.run_configs(patchsets)
            # We need to record run 0's stats
            if not self.data_baseline:
                raise ValueError("Baseline data not established")
            first_half = self.config_still_viable(first_half_index)
            second_half = self.config_still_viable(second_half_index)
            if first_half and second_half:
                self.logger.warning("Config bisection had both halves pass. Either half is a valid solution? Testing in slow mode")
                slow_mode = True
                patchsets.clear()
                break
            elif not first_half and not second_half:
                self.logger.info("Config bisection had both halves fail. Time to move slowly")
                slow_mode = True
                patchsets.clear()
                break
            elif first_half:
                self.logger.info("First half passed, second half failed. Considering first half")
                self.patches_to_test = deepcopy(self.runmap[first_half_index])
            else:
                self.logger.info("First half failed, second half passed. Considering second half")
                self.patches_to_test = deepcopy(self.runmap[second_half_index])
            # Reset for the next bisection round
            patchsets.clear()
            if len(self.patches_to_test) == 1:
                self.logger.info("Only one patch left. Stopping")
                break
    if slow_mode:
        # Assuming orthoganality of patches, we'll generate a config without each patch
        # Greater than 2 since if we have two left binary search would've tested them both
        if len(self.patches_to_test) > 2 or not self.binary_search:
            run_tracker = dict()  # run index -> the single patch withheld in that run
            for i, patch in enumerate(self.patches_to_test, start=self.run_count):
                if i >= self.max_iters:
                    self.logger.info("Hit max iterations. Stopping")
                    break
                patchset = deepcopy(self.patches_to_test)
                patchset.remove(patch)
                patchsets.append(patchset)
                run_tracker[i] = patch
            self.run_configs(patchsets)
            for i, patch in run_tracker.items():
                if self.config_still_viable(i):
                    self.logger.info(f"After running {i} removing {patch} from consideration, appears unnecessary")
                    self.patches_to_test.remove(patch)
                else:
                    self.logger.info(f"Keeping {patch} since run {i} was not viable without it")
    output_file = os.path.join(self.proj_dir, "minimized.yaml")
    # TODO: force overwrite of this when --force
    if not os.path.exists(output_file):
        self.logger.info(f"Writing minimized config to {output_file} (note: this may include auto_explore.yaml)")
        self.base_config["patches"].extend(self.patches_to_test)
        with open(output_file, "w") as f:
            yaml.dump(self.base_config, f)
    else:
        self.logger.info(f"Config already exists at {output_file}, not overwriting")
    return self.patches_to_test
[docs]
def minimize(proj_dir, config_path, output_dir, timeout, max_iters=1000,
             nworkers=1, verbose=False):
    """Run patch minimization over a project and print the surviving patches."""
    minimizer = PatchMinimizer(proj_dir, config_path, output_dir, timeout,
                               max_iters, nworkers, verbose)
    minimizer.run()
    print(f"{len(minimizer.patches_to_test)} required patches: {minimizer.patches_to_test}")