import random
import threading
from .graphs import Failure, Mitigation
from .utils import get_mitigation_providers
from typing import List
import os
from copy import deepcopy
class MABWeightedSet:
    '''
    This class stores failures and their potential solutions. Each failure maps to a set of
    candidate solutions, and each solution is modeled with a Beta distribution for Thompson
    Sampling. We provide a probabilistic selection mechanism that picks one solution per failure
    by sampling from those distributions, and after observing the result of a run with the
    selected failure-solution pairs we update each solution's Beta distribution accordingly.
    This class is thread-safe and can be used in a multi-threaded environment.
    '''
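    # Illustrative usage sketch (hypothetical failure/solution names, not from a real run):
    #
    #   mab = MABWeightedSet()
    #   mab.add_failure("missing_device")
    #   mab.add_solution("missing_device", "stub_driver")
    #   mab.add_solution("missing_device", "ignore_access")
    #   picks = mab.probabilistic_mitigation_selection()  # [(failure_name, solution), ...]
    #   # ... run the configuration with the selected mitigations, compute final_score ...
    #   mab.report_result(picks, final_score=0.75)  # shift the winners' Beta distributions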
def __init__(self, alpha=5, beta=5):
# Store failures as a dictionary with potential solutions
self.failures = {} # (failure_name -> {"solutions": [{"solution": str, "alpha": float, "beta": float}]})
self.alpha_init = alpha # Initial alpha value for the Beta distribution
self.beta_init = beta # Initial beta value for the Beta distribution
self.observed_scores = [] # Track all observed scores
        self.selections = []  # Track past selections so we avoid re-running identical configurations
        self.learning_queue = {}  # failure name -> {"patches": [...], "exclusive": provider}
self.already_learned = set()
self.lock = threading.Lock()
def add_failure(self, failure_name, allow_none=True):
"""Add a failure."""
        with self.lock:
            if failure_name not in self.failures:
                self.failures[failure_name] = {"solutions": []}
            else:
                raise ValueError(f"Failure '{failure_name}' already exists.")
        # Call add_solution outside the lock: it acquires self.lock itself, and
        # threading.Lock is not reentrant.
        if allow_none:
            self.add_solution(failure_name, None)
def queue_learning(self, failure_name, mitigation, config, exclusive):
"""
On a run we observed a failure that produced an exclusive mitigation - in other words, an analysis
has requested we do a special (and expensive) run just so we can learn more about potential solutions to the failure.
For now we'll do it ASAP if it's a new mitigation, otherwise we'll ignore
"""
with self.lock:
if failure_name in self.already_learned or failure_name in self.learning_queue:
# Already learned or queued
return
self.learning_queue[failure_name] = {
"patches": deepcopy(config['patches']) + [mitigation],
"exclusive": exclusive,
}
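        # Illustrative shape of a queued entry (hypothetical values):
        #   self.learning_queue["failure_x"] = {
        #       "patches": [...existing config patches..., new_mitigation],
        #       "exclusive": "provider_name",
        #   }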
def add_solution(self, failure_name, solution, exclusive=None):
"""Add a potential solution to an existing failure."""
with self.lock:
if failure_name not in self.failures:
raise ValueError(f"Failure '{failure_name}' does not exist. Add it first.")
if exclusive is not None and not isinstance(exclusive, str):
raise ValueError(f"Exclusive must be None or the name of a failure, not {exclusive} in {failure_name} -> {solution}")
if solution not in [x["solution"] for x in self.failures[failure_name]["solutions"]]:
self.failures[failure_name]["solutions"].append({
"solution": solution,
"alpha": self.alpha_init, # Beta distribution alpha (success count)
"beta": self.beta_init, # Beta distribution beta (failure count)
"exclusive": exclusive
})
def probabilistic_mitigation_selection(self):
"""Select independent failures to mitigate and pick one of their solutions."""
with self.lock:
# If we have any entries in self.learning_queue, we'll process them first
            if self.learning_queue:
                # Return an exact list of patches to run, as [(failure_name, patch), ...]
failure_name = next(iter(self.learning_queue.keys()))
self.already_learned.add(failure_name)
soln = self.learning_queue.pop(failure_name)
print("Selected exclusive configuration:", failure_name, soln['patches'][-1])
return [(f"exclusive_{failure_name}_{soln['exclusive']}", patch) for patch in soln['patches']]
        for _ in range(1000):  # Limit to 1,000 tries so we eventually give up if every combination repeats
selected_failures = [] # (failure_name, solution)
have_exclusive = False
epsilon = 0.05 # 5% chance to explore at random within each failure
with self.lock:
# TODO: should we order failures randomly here to ensure we don't bias towards early exclusive choices?
for failure_name, failure_data in self.failures.items():
# print("Selecting solution for:", failure_name)
if not failure_data["solutions"]:
print("\tNo solutions available for:", failure_name)
continue
# With probability epsilon, explore a random solution
soln = None
if random.random() < epsilon:
# print("\tSelecting random solution for:", failure_name)
soln = self._select_solution_random(failure_name, can_be_exclusive=not have_exclusive) # returns (solution, exclusive)
if not soln:
# print("\tSelecting Thompson Sampled solution for:", failure_name)
# If not randomly picking (or if random failed)
# Select one solution for the chosen failure using Thompson Sampling
soln = self._select_solution(failure_name, can_be_exclusive=not have_exclusive) # returns (solution, exclusive)
if soln is not None and soln[0] is not None:
# print("\tSelected solution:", soln[0])
# print("\tExclusive:", soln[1])
selected_failures.append((failure_name, soln[0])) # (failure_name, solution)
                        assert (soln[1] is not False)  # Sanity check - exclusive was previously a bool but is now Optional[str]
have_exclusive |= (soln[1] is not None)
if selected_failures not in self.selections:
self.selections.append(selected_failures)
return selected_failures
print("Failed to find any solutions?")
    def _select_solution_random(self, failure_name, can_be_exclusive=True):
        """Select a solution for a given failure uniformly at random (epsilon-exploration)."""
solutions = [x for x in self.failures[failure_name]["solutions"]
if not x["exclusive"] or can_be_exclusive]
# print("\tPotential solutions for", failure_name, ":")
# for s in solutions:
# print("\t\t", s)
if solutions:
soln = random.choice(solutions)
if soln and soln["solution"]:
return soln["solution"], soln["exclusive"]
        return None  # Also reached if random.choice picked the None (do-nothing) solution; caller falls back to Thompson Sampling
def _select_solution(self, failure_name, can_be_exclusive=True):
"""Select a solution for a given failure using Thompson Sampling."""
solutions = [x for x in self.failures[failure_name]["solutions"]
if not x["exclusive"] or can_be_exclusive]
# print("\tPotential solutions for", failure_name, ":")
# for s in solutions:
# print("\t", s)
if solutions:
            import numpy as np  # Lazy import: numpy is only required when sampling actually happens
# Use Thompson Sampling by sampling from Beta(alpha, beta) for each solution
sampled_weights = [np.random.beta(sol["alpha"], sol["beta"]) for sol in solutions]
solution_idx = sampled_weights.index(max(sampled_weights)) # Choose the solution with the highest sample
selected_solution = solutions[solution_idx]["solution"]
is_exclusive = solutions[solution_idx]["exclusive"]
return selected_solution, is_exclusive
# print("No valid solutions found for", failure_name)
# print(self.failures[failure_name]["solutions"])
# print()
return None
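    # Illustrative Thompson Sampling draw (made-up counts): a solution at Beta(alpha=9, beta=3)
    # has mean 9 / (9 + 3) = 0.75 and will usually out-sample one at Beta(4, 8) (mean ~0.33),
    # but the random draws still let the weaker arm win occasionally, preserving exploration:
    #
    #   np.random.beta(9, 3)  # e.g. 0.81
    #   np.random.beta(4, 8)  # e.g. 0.41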
def report_result(self, selected_failures, final_score):
"""
Update the Beta distribution for the selected solution based on the observed result.
"""
# Before we update the average score or our alphas/betas, we need to see if this is
# the result of an exclusive run - it will be if every failure_name starts with "exclusive_"
if all(failure_name.startswith("exclusive_") for failure_name, _ in selected_failures):
# We're in a learning mode - just ignore the result
print("\tIgnoring run, was exclusive")
return
with self.lock:
self.observed_scores.append(final_score)
avg_score = sum(self.observed_scores) / len(self.observed_scores)
for failure_name, selected_solution in selected_failures:
if failure_name.startswith("exclusive_"):
raise ValueError
solution_idx = next(i for i, s in enumerate(self.failures[failure_name]["solutions"])
if s["solution"] == selected_solution)
solution_data = self.failures[failure_name]["solutions"][solution_idx]
# Update distributions based on final score
                decay_factor = 0.9  # Damp each update so a single run can't swing a distribution too far
                weight = decay_factor * self.weighted_likelihood(final_score, avg_score)
if final_score > avg_score: # Success case
solution_data["alpha"] += weight # Weighted update
else: # Failure case
solution_data["beta"] += weight # Weighted update
def weighted_likelihood(self, final_score, avg_score):
"""Calculate a weighted likelihood for updating based on score deviation."""
weight = abs(final_score - avg_score) # More deviation = more weight
return min(1.0, weight) # Cap weight at 1.0 to avoid excessive adjustments
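    # Worked example of the update rule (made-up numbers): with final_score=0.9 and avg_score=0.5,
    # weighted_likelihood returns min(1.0, |0.9 - 0.5|) = 0.4, so each selected solution gets
    # alpha += 0.9 * 0.4 = 0.36 (a "success" update, since final_score > avg_score).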
def __str__(self):
"""Custom string representation to show failures and their solutions"""
output = ""
for failure, details in self.failures.items():
output += f"Failure: {failure}\n"
for sol in details["solutions"]:
output += f" - Solution: {sol['solution']} (Alpha: {sol['alpha']:.02f}, Beta: {sol['beta']:.02f}" + (" Exclusive" if sol['exclusive'] else "") + ")\n"
return output
class ConfigSearch:
"""
This class contains logic that would be shared across various configuration search algorithms.
"""
def __init__(self):
# We expect children to set up their own logger
pass
def find_mitigations(self, failure: Failure, config) -> List[Mitigation]:
results = []
# Lookup the plugin that can handle this failure
analysis = get_mitigation_providers(config)[failure.type]
for m in analysis.get_potential_mitigations(config, failure) or []:
if not isinstance(m, Mitigation):
raise TypeError(
f"Plugin {analysis.ANALYSIS_TYPE} returned a non-Mitigation object {m}"
)
results.append(m)
return results
def analyze_failures(self, config, run_dir, exclusive=None):
"""
After we run a configuration, do our post-run analysis of failures.
Run each PyPlugin that has a PenguinAnalysis implemented. Ask each to
identify failures.
"""
fails = [] # (id, type, {data})
output_dir = os.path.join(run_dir, "output")
mitigation_providers = get_mitigation_providers(config)
# For an exclusive config, only query the exclusive provider
if exclusive is not None:
if exclusive not in mitigation_providers:
raise ValueError(
f"Cannot use exclusive {exclusive} as it's not a mitigation provider"
)
mitigation_providers = {
exclusive: mitigation_providers[exclusive]
}
for plugin_name, analysis in mitigation_providers.items():
try:
failures = analysis.parse_failures(output_dir)
            except Exception as e:
                self.logger.error(e)
                raise
for failure in failures or []:
if not isinstance(failure, Failure):
raise TypeError(
f"Plugin {plugin_name} returned a non-Failure object {failure}"
)
fails.append(failure)
        # We might return duplicate failures; the caller is expected to deduplicate.
return fails
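# Hedged sketch of how ConfigSearch and MABWeightedSet might compose (assumes a Failure can be
# keyed by str(failure); `config` and `run_dir` are placeholders, not a tested workflow):
#
#   search = ConfigSearch()
#   mab = MABWeightedSet()
#   for failure in search.analyze_failures(config, run_dir):
#       if str(failure) not in mab.failures:
#           mab.add_failure(str(failure))
#       for mitigation in search.find_mitigations(failure, config):
#           mab.add_solution(str(failure), mitigation)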
if __name__ == "__main__":
# Unit testing
def generate_ground_truth(N=5, M=5, min_weight=-100, max_weight=100, scale=1.2):
"""
Generate synthetic ground truth for testing. Create N distinct failures with between 1 and M solutions each.
Each solution has a weight from scale**(min_weight to max_weight).
"""
ground_truth = {}
# Add N failures with a random number 2, M solutions with random weights
N = 5
M = 5
for i in range(N):
ground_truth[f"failure{i}"] = {f"solution{j}": round((scale)**random.randint(min_weight, max_weight), 2) for j in range(random.randint(1, M))}
for f in ground_truth.keys():
ground_truth[f][None] = 0
return ground_truth
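    # Example output shape (weights are random; these numbers are purely illustrative):
    #   {"failure0": {"solution0": 3.11, "solution1": 0.02, None: 0},
    #    "failure1": {"solution0": 0.40, None: 0}, ...}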
def create_synthetic_test_data(mab, ground_truth):
"""
Populate the MABWeightedSet instance with synthetic test data.
"""
# Add failures from the ground truth
for failure_name, failure_data in ground_truth.items():
mab.add_failure(failure_name)
# Add possible solutions for each failure with fixed initial weights
for solution in failure_data.keys():
mab.add_solution(failure_name, solution) # Solutions are initially equal
def simulate_iterations(mab, ground_truth, iterations=100):
"""
Run multiple iterations to simulate the selection and update process.
"""
for idx in range(iterations):
# Select failures and their solutions probabilistically
selected_failures = mab.probabilistic_mitigation_selection()
# Calculate a synthetic "final score" based on the ground truth.
# If the selected solution matches the ground truth preferred solution, assign a high score.
if not selected_failures:
break
# Calculate the final score based on the ground truth
final_score = 0
for failure, solution in selected_failures:
final_score += ground_truth[failure][solution]
            final_score /= sum(len(v) for v in ground_truth.values())  # Scale by total solution count (not a true [0, 1] normalization given the weight range)
# Report the result to update the Beta distributions
mab.report_result(selected_failures, final_score)
print(f"Iteration {idx} selects " + ", ".join([f"{k}={v}" for (k, v) in selected_failures]) + f" with score {final_score:.02f}")
def main():
# Instantiate the MABWeightedSet class
mab = MABWeightedSet()
# Generate synthetic ground truth
ground_truth = generate_ground_truth()
# Create synthetic test data in the instance
create_synthetic_test_data(mab, ground_truth)
# Run the synthetic test with multiple iterations
simulate_iterations(mab, ground_truth, iterations=500)
# Print the final state of the failures and solutions
print("========= RESULTS ========")
print(mab)
best = {} # failure -> best
for fail, solns in ground_truth.items():
best[fail] = max(solns, key=lambda x: solns[x])
print(f"Failure: {fail}")
for soln, value in solns.items():
print(f" - {soln}: {value}")
# Get best results
# If we had picked the best value for each failure, what would our total score be?
best_weight = sum(ground_truth[failure][best[failure]] for failure in ground_truth)
found_weight = 0
for failure, failure_data in mab.failures.items():
# Select the best solution from our MAB solution by alpha / (alpha + beta). Could also just use alpha?
best_soln = max(failure_data["solutions"], key=lambda x: x["alpha"] / (x["alpha"] + x["beta"]))
found_weight += ground_truth[failure][best_soln["solution"]]
print(f"For {failure} best identified solution is {best_soln['solution']}")
print(f"Best possible score: {best_weight}")
print(f"MAB solution: {found_weight}")
percent_diff = (found_weight - best_weight) / best_weight
print(f"% difference: {100 * percent_diff:.02f}%")
        if percent_diff > -0.1:  # Pass if we're within 10% of the best possible score
print("PASS")
else:
print("FAIL")
main()