Source code for penguin.genetic

import os
import sys
import random
import logging
import shutil
import yaml
import traceback
from penguin import getColoredLogger
from threading import RLock
from concurrent.futures import ThreadPoolExecutor
from queue import Queue
from copy import deepcopy

from .common import get_inits_from_proj, patch_config, frozenset_to_dict, dict_to_frozenset
from penguin.penguin_config import dump_config, load_config, load_unpatched_config, hash_yaml_config
from .utils import AtomicCounter, hash_image_inputs
from .manager import PandaRunner, calculate_score
from .graphs import Failure, Mitigation

from .search_utils import ConfigSearch


from dataclasses import dataclass
from typing import Tuple, List, Set

"""
Genetic Algorithm Configuration Search

Overall idea:
- We have a population of configurations
- A configuration is a chromosome
- Each set of potential mitigations for a given failure is a gene
- The fitness function is the health score
- We'll support all the usual GA operations: crossover, mutation, selection
"""






[docs] def create_init_gene(base_config, proj_dir, patch_dir): """ Based on add_init_options_to_graph in manager.py A config needs to have an ['env']['igloo_init'] in order to do anything useful. We might have a single option already set or we might have multiple options stored proj_dir/static/InitFinder.yaml (based on static analysis). If we have no value set and no potential values, we raise an error. Otherwise we'll create a fake failure for "init" and a mitigation node to add each of the init options. Then we'll create configs with each init. This means we'll start our search with multiple configuration options to explore. If an igloo_init is set in the initial config, we'll assume that's right and leave it alone. """ # Hack igloo_inits as a gene # But only if we don't have igloo_init set and have multiple # potential values # Add a fake failure init_mitigations = set() if len(base_config["env"].get("igloo_init", [])) == 0: init_options = get_inits_from_proj(proj_dir) if len(init_options) == 0: raise RuntimeError( "No potential init binaries identified and none could be found" ) for init in init_options: # First add mitigation: patch_fname = f"init{init.replace('/', '_')}.yaml" patch_path = os.path.join(patch_dir, patch_fname) with open(patch_path, "w") as f: yaml.dump({"env": {"igloo_init": init}}, f) this_init_mit = dict_to_frozenset({'patches': [patch_path]}) init_mitigations.add(this_init_mit) else: # If we already have an init defined, we'll just use that patch_fname = "init_init.yaml" patch_path = os.path.join(patch_dir, patch_fname) with open(patch_path, "w") as f: yaml.dump({"env": {"igloo_init": init}}, f) mit = dict_to_frozenset({'patches': [patch_path]}) init_mitigations.add(mit) return MitigationAlleleSet("init", frozenset(init_mitigations))
# This is absolutely horrible, but thing we need it for the bolt on approach to testing out this new search # Note: this does not support keys that were *removed)
[docs] def diff_configs(new, old): diff = {} for k in new: if k not in old: diff[k] = new[k] elif isinstance(new[k], dict) and isinstance(old[k], dict): nested_diff = diff_configs(new[k], old[k]) if nested_diff: # Only include if there's a difference diff[k] = nested_diff elif new[k] != old[k]: diff[k] = new[k] return diff
[docs] @dataclass(frozen=True, eq=True) class MitigationPatch: """ This class represents a single mitigation and the patch to apply """ failure_name: str patch: frozenset # probably not frozen or do we do a new set?
[docs] @dataclass(frozen=True, eq=True) class MitigationAlleleSet: """ This class contains a set of mitigations we know about for a given failure In the biology analogy, each mitigation would be an allele """ failure_name: str mitigations: frozenset # probably not frozen or do we do a new set?
[docs] class GenePool: """ The gene pool tracks the set of possible mitigations for a given failure """ def __init__(self): self.genes = dict() # TODO look into the data structure for indexing failures here
[docs] def update(self, new_gene): if new_gene is None: # nop return failure_name = new_gene.failure_name if isinstance(new_gene, MitigationAlleleSet): new_mits = frozenset(new_gene.mitigations) elif isinstance(new_gene, MitigationPatch): new_mits = frozenset([new_gene.patch]) elif isinstance(new_gene, Mitigation): new_mits = frozenset([dict_to_frozenset(new_gene.patch)]) else: raise (RuntimeError(f"Unexpected datatype for new_gene {new_gene}: {type(new_gene)}")) if failure_name not in self.genes: self.genes[failure_name] = new_mits else: current_mitigations = self.genes[failure_name] self.genes[failure_name] = frozenset(current_mitigations.union(new_mits))
[docs] def get_mitigations(self, failure): if isinstance(failure, Failure): failure_name = self.failure_to_genename(failure) elif isinstance(failure, str): failure_name = failure else: print(RuntimeError(f"Unexpected datatype for failure {failure}: {type(failure)}")) return self.genes.get(failure_name, frozenset())
[docs] def failure_to_genename(failure: Failure): # HACK: interfaces return overly unique names so they wouldn't be considered the same gene return failure.friendly_name
[docs] def get_names(self): return self.genes.keys()
[docs] class ConfigChromosome: # At the end of the day, each "chromosome" is going to be a configuration. Which is set of mitigations # We are using a frozenset since each chromosome will be unique genes: frozenset[MitigationPatch] hash: int def __init__(self, parent: 'ConfigChromosome' = None, new_gene=None): self.genes = frozenset() if parent is None: parent = self # weird """ elif not new_gene.failure_name.startswith("init"): try: assert len(parent.genes) > 0, "If there's a parent and we aren't adding init, it must have genes" except AssertionError as e: print("Debugging ConfigChromosome constructor") print("Parent genes: ", parent.genes) print("new_gene: ", new_gene) import IPython; IPython.embed() """ if isinstance(new_gene, Mitigation): new_gene = MitigationPatch(new_gene.failure_name, dict_to_frozenset(new_gene.patch)) if isinstance(new_gene, MitigationPatch): old_genes = parent.genes # faded and with lots of holes if old_gene := parent.get_mitigation(new_gene.failure_name): # If we had this gene, replace it with the new one. This is for mutations old_genes = old_genes.difference({old_gene}) self.genes = frozenset(old_genes.union({new_gene})) elif isinstance(new_gene, frozenset): self.genes = new_gene # cache the unsigned hash of genes, not sure how much this actually buys us self.hash = hash(self.genes) & (1 << sys.hash_info.width) - 1 def __str__(self): return f"{self.hash:016x}"
[docs] def to_dict(self): config_dict = {} # TODO: should we do some validation to make sure our config doesn't have conflicting options? for m in self.genes: sub_dict = frozenset_to_dict(m.patch) for k, v in sub_dict.items(): if k in config_dict: if isinstance(config_dict[k], list): config_dict[k].extend(v) elif isinstance(config_dict[k], dict): config_dict[k].update(v) else: raise (RuntimeError(f"Unexpected type for config key {k}: {type(config_dict[k])}")) else: config_dict[k] = v return config_dict
[docs] def get_mitigation(self, failure_name): if failure_name is None: # happens with learning configs return None assert (isinstance(failure_name, str)), "Failure name must be a string" for m in self.genes: if m.failure_name == failure_name: return m return None
[docs] def mitigations_to_str(self): return "\n".join(f"{m.failure_name}: {m.patch}" for m in self.genes)
[docs] class ConfigPopulation(ConfigSearch): def __init__(self, proj_dir, base_config, run_base, logger, pop_size, timeout, max_runs, nelites=1, verbose=False): self.proj_dir = proj_dir self.base_config = deepcopy(base_config) # store the base configuration, assumes it is immutable self.chromosomes = set() self.attempted_configs = set() # store the configurations we've already tried - do we need this with the cache? self.pool = GenePool() # store the pool of all possible mitigations self.work_queue = Queue() # store the work queue of configurations to run self.run_index = AtomicCounter(-1) # store the run index, start at -1 since we increment before using self.logger = logger self.run_base = run_base self.lock = RLock() # lock for shared state self.fitnesses = dict() # cache the fitness of each configuration self.pop_size = pop_size self.parents = [] self.timeout = timeout self.nelites = nelites # number of elites to keep in the next generation self.verbose = verbose self.max_runs = max_runs # This is where the biology breaks down a bit. We'll add a new chromosome to the population based on an observed failure
[docs] def extend_genome(self, parent: ConfigChromosome, new_gene: MitigationAlleleSet): # First, check to see if we have existing mitigations for this failure old_mitigations = self.pool.get_mitigations(new_gene.failure_name) self.pool.update(new_gene) # our pool contains all possible mitigations for a given failure # If we have new mitigations, add them as children to this config # In the biology analogy, these things would be alleles for m in new_gene.mitigations.difference(old_mitigations): self.chromosomes.add(ConfigChromosome(parent, MitigationPatch(new_gene.failure_name, m)))
[docs] def run_configs(self, nthreads: int, chromosomes: Set[ConfigChromosome]): results = {} """ # single-threaded debug mode: for c in chromosomes: self.run_index.increment() failures, scores = self.run_config(c, self.run_index.get()) results[c.hash] = {"failures": failures, "scores": scores, "fitness": float(sum(scores.values()))} """ with ThreadPoolExecutor(max_workers=nthreads) as executor: for tid in range(nthreads): try: executor.submit(self.run_worker, id=tid, results=results) except Exception as e: self.logger.error(f"Error in run_worker: {e}") raise e self.create_work_queue(nthreads, chromosomes) self.join_workers(nthreads) return results
[docs] def run_worker(self, id: int, results: dict ): while True: # A unit of work is a configuration to run along with the run index self.logger.info(f"[thread {id}]: Getting work from queue") task = self.work_queue.get() if task is None: self.logger.info(f"[thread {id}]: Got task None from queue, exiting") self.work_queue.task_done() break try: config, run_index = task if run_index < self.max_runs: self.logger.info(f"[thread {id}]: Running config {config} with run index {run_index}") failures, scores = self.run_config(config, run_index) with self.lock: results[config.hash] = {"failures": failures, "scores": scores, "fitness": float(sum(scores.values()))} else: self.logger.info(f"[thread {id}]: Skipping config {config} with run index {run_index}") except Exception as e: self.logger.error(f"[thread {id}]: Error running config {config} with run index {run_index}: {e}") self.work_queue.task_done() traceback.print_exc() self.work_queue.task_done() self.logger.info(f"[thread {id}]: Finished config {config} with run index {run_index}")
[docs] def create_work_queue(self, nworkers: int, chromosomes: Set[ConfigChromosome]): for chromosome in chromosomes: self.work_queue.put((chromosome, self.run_index.increment())) for _ in range(nworkers): self.work_queue.put(None)
[docs] def join_workers(self, nworkers=1): self.logger.info("Joining workers...") self.work_queue.join() self.logger.info("Joined workers")
[docs] def get_full_config(self, config: ConfigChromosome): """ Given a configuration, return the full configuration (base_config+config) """ combined_config = deepcopy(self.base_config) # FIXME: we only do patches now! Is that good? # we apply our configs last so they should override the base config... combined_config['patches'].extend(config.to_dict()['patches']) return combined_config
[docs] def get_patched_config(self, config: ConfigChromosome): patched_config = self.get_full_config(config) for p in patched_config.get("patches", []): # kinda funny how the names wound up... p_yaml = load_unpatched_config(os.path.join(self.proj_dir, p)) patched_config = patch_config(self.logger, patched_config, p_yaml) return patched_config
[docs] def run_config(self, config: ConfigChromosome, run_index: int) -> Tuple[List[Failure], float]: """ Careful! This function is run in parallel and should not modify any shared state. This is very dirty - we return the failure type from graph.py, not our own Failure class We also tack on the base config, so watch out for that. """ failures = [] # TODO: should we track only new failures? score = {"empty": 0.0} if not self.get_fitness(config): self.logger.info(f"Skipping config {config} since it has already been tried") return failures, {"previous_run": self.get_fitness(config)} run_dir = os.path.join(self.run_base, str(run_index)) if os.path.isdir(run_dir): # Remove it shutil.rmtree(run_dir) os.makedirs(run_dir) self.logger.info(f"Running config {config} in {run_dir}") # Write config to disk full_config = self.get_full_config(config) self.logger.info(f"config {config} uses patches: {full_config.get('patches', [])}") dump_config(full_config, os.path.join(run_dir, "config.yaml")) # Run the configuration conf_yaml = os.path.join(run_dir, "config.yaml") self.logger.debug(f"Config {config} has image_inputs hash {hash_image_inputs(self.proj_dir, full_config)}") out_dir = os.path.join(run_dir, "output") os.makedirs(out_dir, exist_ok=True) # Stash genes in the run directory with open(os.path.join(run_dir, "genes.txt"), "w") as f: output = f"Config: {config.hash}\n" f.write(output+config.mitigations_to_str()) try: PandaRunner().run(conf_yaml, self.proj_dir, out_dir, timeout=self.timeout, verbose=self.verbose) except RuntimeError as e: # Uh oh, we got an error while running. Warn and continue self.logger.error(f"Could not run {run_dir}: {e}") return [], 0, None # Now, get the score and failures score = calculate_score(out_dir) with open(os.path.join(run_dir, "fitness.txt"), "w") as f: total = float(sum(score.values())) f.write(f"{total}\n") # Failure analysis needs a config with the patches expanded patched_config = load_config(self.proj_dir, conf_yaml) failures = self.analyze_failures(patched_config, run_dir) return failures, score
[docs] def record_fitness(self, config: ConfigChromosome, fitness): if config.hash not in self.fitnesses: self.fitnesses[config.hash] = fitness else: # If we decide to really hold on to an elite, could wind up here # But with the large number of possibilities we shouldn't end up here self.logger.info(f"Double record! Fitness for {config.hash} already recorded")
[docs] def get_fitness(self, config: ConfigChromosome): return self.fitnesses.get(config.hash, False)
[docs] def config_in_pop(self, config: ConfigChromosome): return config.hash in [c.hash for c in self.chromosomes]
[docs] def selection(self, nparents): """ A ranked based selection. We'll choose nparents to move on to the next generation By default, hold on to the highest fitness config (nelites=1) """ self.parents = [] # get current population, sorted by fitness, higher is better (e.g., 6/6 is better than 5/6) if len(self.chromosomes) == 0: sorted_configs = [] else: sorted_configs = sorted(self.chromosomes, key=lambda c: self.get_fitness(c)) # probability of a selecting each config, weighted rank based selection n = len(sorted_configs) norm = n*(n+1)/2 probs = [(i+1)/norm for i in range(0, n)] # select nparents based on the probabilities # first, if we have duplicates, we'll just add them if n < nparents: self.parents = sorted_configs if self.nelites > 0: elites = sorted_configs[-self.nelites:n] self.parents.extend(elites) while len(self.parents) < nparents: r = random.random() for i in range(n): if r < probs[i]: # only allow duplicates if our starting population is too small # they very well may get reduced in the crossover step, but we want this functionality # should our approach to crossover change if sorted_configs[i] not in self.parents or n < nparents: self.parents.append(sorted_configs[i]) break self.logger.info(f"Selected {len(self.parents)} parents from {n} configs") self.logger.debug(f"parents:\n {self.print_chromosomes(self.parents)}")
[docs] def crossover(self, p1_prob=0.5): """ Perform crossover on the parents to create new children, choose from p1 with probability p1_prob """ # Pass elites through unscathed self.chromosomes = set(self.parents[:self.nelites]) self.parents = self.parents[self.nelites:] # cross over neighbors in the parent list for i in range(0, len(self.parents), 2): try: p1 = self.parents[i] p2 = self.parents[i+1] except IndexError: # lazily handle odd number of parents break child = ConfigChromosome() for gene in self.pool.get_names(): # get the set of known mitigations for this gene # cross over the two parents, grabbing a mitigation from each if random.random() < p1_prob: m1 = p1.get_mitigation(gene) if m1: child = ConfigChromosome(child, m1) else: m2 = p2.get_mitigation(gene) if m2: child = ConfigChromosome(child, m2) else: m2 = p2.get_mitigation(gene) if m2: child = ConfigChromosome(child, m2) else: m1 = p1.get_mitigation(gene) if m1: child = ConfigChromosome(child, m1) # Only add this child if it is unique and has not been tried before # Since the population is a set, we don't have to check for duplicates if not self.get_fitness(child) and len(child.genes) > 0: try: assert (child.get_mitigation("init_init")), "Missing init gene" if not self.config_in_pop(child): # Unfortunately, using sets didn't get us this diversity automagically self.chromosomes.add(child) except AssertionError as e: self.logger.error(f"Error in crossover: {e}") self.logger.info(f"Population size after crossover is: {len(self.chromosomes)}") self.logger.debug(f"population:\n{self.print_chromosomes(self.chromosomes)}")
[docs] def mutation(self, nmuts=1): """ Perform mutation on the population, we are going to be a bit more aggressive with mutation than traditional GAs Our mutation here is essentially "try a different mitigation". Elites will get modified here """ # At this point, we have parents and a starter population with children from crossover, make new configs until are population is full failed = 0 while len(self.chromosomes) < self.pop_size: # Originally did a growing configchromosome, but that was buggy and inefficient genes = random.choice(self.parents).genes gene_names = [g.failure_name for g in genes] # First, make sure we have all genes in this chromosome (with a possibility of no mitigation) # this allows us to get a bunch of diversity from learning configs print(f"Gene names: {gene_names}") print(f"Pool names: {self.pool.get_names()}") for g in self.pool.get_names(): if g not in gene_names: self.logger.debug(f"Adding missing gene {g}") choices = self.pool.get_mitigations(g) if not g.startswith("init"): # init is special, don't allow that to be None. otherwise maybe "no mitigation" is what we'd like? choices = choices.union({None}) m = random.choice(list(choices)) if m and m is not None: genes = genes.union({MitigationPatch(g, m)}) self.logger.debug(f"Mutating genes {genes}") for i in range(nmuts): gene = random.choice(list(self.pool.get_names())) self.logger.debug(f"Mutating gene {gene}") choices = self.pool.get_mitigations(gene) assert choices, f"No choices for gene {gene}" m = random.choice(list(choices)) self.logger.debug(f"Mutation: {m}") self.logger.debug(f"Before mutation: {genes}") if gene in [g.failure_name for g in genes]: # if we already have this gene, remove it genes = genes.difference({g for g in genes if g.failure_name == gene}) child = ConfigChromosome(None, genes.union({MitigationPatch(gene, m)})) self.logger.debug(f"After mutation: {child.genes}") if child and not self.get_fitness(child) and not self.config_in_pop(child): # If we have exhausted our configuration space, this will cause an infinite loop... self.chromosomes.add(child) failed = 0 else: failed += 1 if failed > 100: self.logger.warn("Failed to generate a new configuration 100 times in a row, giving up") break self.logger.info(f"Population size after mutation is: {len(self.chromosomes)}") self.logger.debug(f"population:\n{self.print_chromosomes(self.chromosomes)}")
[docs] def print_chromosomes(self, chromosomes): return "\n".join([f"Chromosome {c}:\n{c.mitigations_to_str()}\n" for c in chromosomes])
[docs] def main(): import argparse if len(sys.argv) < 3: print(f"Usage: {sys.argv[0]} <config> <outdir>") sys.exit(1) parser = argparse.ArgumentParser() parser.add_argument( "config", type=str, help="Path to the configuration file", ) parser.add_argument( "outdir", type=str, help="Path to the output directory", ) parser.add_argument( "--niters", type=int, default=100, help="Number of iterations to run. Default is 100.", ) parser.add_argument( "--nworkers", type=int, default=4, help="Number of workers to run in parallel. Default is 4", ) parser.add_argument( "--timeout", type=int, default=300, help="Timeout in seconds. Default is 300", ) parser.add_argument( "--nmuts", type=int, default=1, help="Number of mutations to perform per iteration. Default is 1", ) args = parser.parse_args() ga_search(os.path.dirname(args.config), args.config, args.outdir, args.timeout, args.niters, args.nworkers, args.nmuts)
if __name__ == "__main__": main()