Source code for AutoRPE.UtilsWorkflow.BinaryTree

from logging import root
from os.path import join


[docs] class BinaryTree: """ Represents a binary tree of jobs for an experiment, where each job can have various statuses. Jobs are processed based on their statuses and are moved through different stages in the workflow. - PENDING: The job has been created and is ready to be launched. - RUNNING: The job is currently running. - SUCCESS: A job that has been completed with successful results and asserted. - FAILED: A job that has been completed with unsuccessful results and cannot be split. - SUSPENDED: A job that has been completed with unsuccessful results, that has been split, and depends on the results of descendant jobs. Initializes the binary tree with a root job, accuracy test, local folder, and a maximum number of running jobs. Parameters: root (Job): The root job of the binary tree, which starts the experiment. accuracy_test (object): The accuracy test to evaluate the job results. local_folder (str): The local folder where the experiment data is stored. max_running_jobs (int, optional): The maximum number of jobs allowed to run concurrently (default is 100). """ def __init__(self, root: 'Job', accuracy_test: object, local_folder: str, max_running_jobs: int=100): # Save the seed job, i.e. the seed of the father of the binary tree self.root = root self.loop = 0 self.max_id = 0 # Initialize the different lists of Binary jobs self.pending = [] self.running = [] self.success = [] self.failed = [] self.suspended = [] self.disinherited = [] self.local_folder = local_folder self.checkpoint_folder = join(local_folder, "checkpoints") # Initialize a dictionary with links to all the lists self.__dict = {"PENDING": self.pending, "RUNNING": self.running, "SUCCESS": self.success, "FAILED": self.failed, "SUSPENDED": self.suspended, "DISINHERITED": self.disinherited, } # Set the number of maximum jobs running at the same time self.accuracy_test = accuracy_test self.max_running_jobs = max_running_jobs # Set checkpoint name and put seed job to pending self.pending.append(self.root) # self.checkpoint_name = "checkpoint_%s.pkl" % self.root.hash
[docs] def sort_pending(self, lower_first: bool=True): """ Sorts the pending jobs based on their levels, either prioritizing lower or higher levels. Parameters: lower_first (bool, optional): If True, jobs are sorted by ascending level; otherwise, by descending level. Returns: None """ if lower_first: self.pending.sort(key=lambda _x: _x.level) else: self.pending.sort(key=lambda _x: - _x.level)
[docs] def all(self): """ Returns a list of all jobs across different statuses (excluding disinherited jobs). Returns: list: A combined list of jobs from all statuses. """ self.sort_pending() return self.running + self.pending + self.success + self.failed + self.suspended
def __getitem__(self, item: str): """ Allows direct access to the job lists by their status name (e.g., "PENDING", "RUNNING", etc.). Parameters: item (str): The status of the jobs to access (e.g., "PENDING"). Returns: list: The list of jobs with the specified status. """ return self.__dict[item]
[docs] def step(self): """ Loops through all the jobs, checks their status, and updates their status accordingly. Returns: int: The number of jobs that had their status changed. """ step_changes = 0 for job in self.all(): status = job.status if status == "PENDING": if len(self.running) < self.max_running_jobs: # Check the job is already in queue, if not submit the job self.check_pending(job) elif status == "RUNNING": # Check the job has finished self.check_running(job) elif status == "SUSPENDED": self.check_children(job) # Register the change if any if status != job.status: step_changes += 1 # Create a mermaid graph for the new status # job4 = [j for j in self.all() if j.incremental_id==4] # job3 = [j for j in self.all() if j.incremental_id == 3] # if job4: # job4[0].graph() # if job3: # job3[0].graph() self.root.graph() # for job in self.pending: # self.checkpoint(job.incremental_id) self.loop += step_changes # In case the status of any of the jobs from the experiment changed, print the status. if step_changes: self.print_status() return step_changes
[docs] def check_pending(self, job: 'Job'): """ Checks if a job is already in the queue. If not, it is submitted for execution. Parameters: job (Job): The job to be checked and potentially started. Returns: None """ # Check the job is already in queue, if not submit the job job.run_job() job.status = "RUNNING" self.pending.remove(job) self.running.append(job)
[docs] def check_running(self, job: 'Job'): """ Checks if a running job has finished and updates its status accordingly. Parameters: job (Job): The job to be checked. Returns: None """ remote_status = job.remote_status if remote_status == "COMPLETED": # job.retrials = 0 # Passes on the remote directory and the communicator so as to retrieve pkl result file # Create plots # self.create_plots() result = job.get_result(self.accuracy_test) if result == "SUCCESS" : self.move_to_success(job) print("Job %s %s terminated with success" % (job.incremental_id, job.hash)) elif result == "FAIL": self.fail(job) else: print("WHATTHEHELL") # self.create_plots() # plot_result.plot_result( # input_file=self._result, # limits=self.accuracy_test.limits, # ensemble_members_path=self.accuracy_test.members_path, # output_folder=self.local_folder + "/plot/plot_Test" + str(self.hash) # ) elif remote_status in ["FAILED", "TIMEOUT", "NODE_FAIL"]: # Fail for numerical reasons its ok if self.accuracy_test.fail_for_numerical_reason(remote_rundir=job.remote_rundir): self.fail(job) elif remote_status == "NODE_FAIL": job.job_id = None # Remove entry from dictionary if the job is stored if job.hash in job.analysis_status: job.analysis_status.pop(job.hash) # Re-run the job print("Node where job %s was running failed, resubmitting job " % job.remote_rundir) self.move_to_pending(job) elif job.retrials < job.max_retrials and len(job.id_reduced_precision) > 1: # Reset job_id job.job_id = None # Remove entry from dictionary if the job is stored if job.hash in job.analysis_status: job.analysis_status.pop(job.hash) # Re-run the job print("job %s failed for unknown reasons, resubmitting job " % job.remote_rundir) self.move_to_pending(job) job.retrials += 1 else: if job.retrials > 0: print("Job failed for unknown reasons, was resubmitted %i times" % job.retrials) self.fail(job) elif remote_status == 'CANCELLED+': job.retrials += 1 self.fail(job) elif remote_status == "RUNNING": pass elif remote_status == "PENDING": pass else: raise AssertionError("Unhandled status %s" % remote_status)
[docs] def move_to_success(self, job: 'Job'): """ Moves a job to the SUCCESS list after it has completed successfully. Parameters: job (Job): The job to be moved to success. Returns: None """ self[job.status].remove(job) self.success.append(job) job.status = "SUCCESS" job.update_parent_set()
[docs] def move_to_failed(self, job: 'Job'): """ Moves a job to the FAILED list after it has failed. Parameters: job (Job): The job to be moved to failed. Returns: None """ self[job.status].remove(job) self.failed.append(job) job.status = "FAILED" if not job.reshuffle: job.update_parent_set() job.print_info_job("failed")
[docs] def move_to_pending(self, job: 'Job'): """ Moves a job to the PENDING list, indicating it is ready to be re-executed. Parameters: job (Job): The job to be moved to pending. Returns: None """ # Case of a new job move to pending if job in self[job.status]: self[job.status].remove(job) self.pending.append(job) job.status = "PENDING" job.print_info_job("added to pending")
[docs] def move_to_suspended(self, job: 'Job'): """ Moves a job to the SUSPENDED list, indicating it has failed and needs further evaluation. Parameters: job (Job): The job to be moved to suspended. Returns: None """ self[job.status].remove(job) self.suspended.append(job) job.status = "SUSPENDED" job.print_info_job("suspended")
[docs] def fail(self, job: 'Job'): """ Handles the failure of a job, either by resubmitting it, subdividing it into children, or marking it as failed. Parameters: job (Job): The job that has failed. Returns: None """ import AutoRPE.UtilsRPE.Error as Error import AutoRPE.UtilsWorkflow.ExceptionManager as ExceptionManager # The job is already a try to save some var from banned list, put it to fail if job.reshuffle: self.move_to_failed(job) return if not job.child: try: # If the job can be subdivided: spawn children job.spawn_children() self.move_to_suspended(job) for ch in job.child: if ch not in self.all(): self.move_to_pending(ch) except Error.ClusterCantBeDivided: # This is a leaf, we cannot subdivide the set, move to failed self.move_to_failed(job) else: # Either both child failed or father failed while the child were ok, check why and decide what to do ExceptionManager.resolve_exception(job, self)
# exception_manager.divide_and_force(self, queue) # queue.root.graph(queue.disinherited)
[docs] def check_children(self, job: 'Job'): """ Checks if all child jobs of a given job have finished. If all children are finished, updates the job status accordingly. Parameters: job (Job): The job whose children are being checked. Returns: None """ # Check if all the child jobs have finished all_finished = all([c.status == "FAILED" or c.status == "SUCCESS" for c in job.child]) if all_finished: # The initial reshuffled job is being analyzed if job.reshuffle and (job.parent is None or not job.parent.reshuffle): self.manage_reshuffled_job(job) return # for child in job.child: # # Get the banned variables from all the child # job.banned_variables += child.banned_variables # # # In case the child job failed, include its analysis variables in the banned variables # if child.status == "FAILED": # self.move_to_failed(child) # # Keep unique values only # job.banned_variables = list(set(job.banned_variables)) # job.id_reduced_precision = [var for var in job.id_reduced_precision if var not in job.banned_variables] if not job.id_reduced_precision: # The does not need to be rerun, both child have failed and all variables are double self.move_to_success(job) else: self.move_to_pending(job)
[docs] def manage_reshuffled_job(self, job: 'Job'): """ Manages a reshuffled job by determining whether it can be re-executed based on its child jobs' statuses. Parameters: job (Job): The reshuffled job to be managed. Returns: None """ import AutoRPE.UtilsWorkflow.ExceptionManager as ExceptionManager # The failed child must be discarded, since it contains the combination of prole that fails child = [c for c in job.child if not c.status == "FAILED"] if len(child) == 0: ExceptionManager.resolve_exception(job, self) return max_len = max([len(c.id_reduced_precision) for c in child]) for c in child: if len(c.id_reduced_precision) == max_len: job.id_reduced_precision = c.id_reduced_precision job.banned_variables = c.banned_variables job.child = job.reshuffle # First, move to failed the job self.move_to_failed(c.child_number) # Then ban his variables in his ancestors up to this job job.fail_child(c.child_number) for i_job in self.all(): if i_job.hash == c.child_number: for var_id in i_job.id_reduced_precision: self.ban_variable(job, var_id) # There can be more than one, the recursive function will take care of banning identical jobs break self.move_to_success(job) return
[docs] def ban_variable(self, job: 'Job', var_id: str): """ Bans a variable from being used in a job or its children if it is part of the reduced precision set. Parameters: job (Job): The job where the variable is banned. var_id (str): The ID of the variable to be banned. Returns: None """ for c in job.child: if var_id in c.id_reduced_precision: c.id_reduced_precision.remove(var_id) c.banned_variables.append(var_id) if not c.id_reduced_precision: self.move_to_failed(c) self.ban_variable(c, var_id) return
[docs] def print_status(self): """ Prints the current status of all jobs in the experiment, showing the number of jobs in each list. Returns: None """ for key in self.__dict.keys(): print("\t%18s: %4i" % (key, len(self[key])))
[docs] def checkpoint(self, incremental_id: int): """ Saves a checkpoint of the experiment at the current status. Parameters: incremental_id (int): The ID of the job at which to save the checkpoint. Returns: None """ import pickle import sys from os.path import isfile sys.setrecursionlimit(8000) if not incremental_id % 10 == 0 and incremental_id >= 15: return if incremental_id > self.max_id: self.max_id = incremental_id filename = "checkpoint_at_job_%s_d.pkl" % incremental_id else: filename = "checkpoint_at_job_%s_u.pkl" % incremental_id checkpoint_name = self.checkpoint_folder + "/%s" % filename if isfile(checkpoint_name): return print("Saving checkpoint!", end="\r") all_jobs = self.all() + self.disinherited # Remove communicator before saving it, since it throws the TypeError: can't pickle _thread.lock objects communicator = self.accuracy_test.communicator self.accuracy_test.communicator = None for job in all_jobs: job.communicator = None # Dump object if does not exist # pickle.dump(self, open(checkpoint_name, "wb")) # Restore the communicator self.accuracy_test.communicator = communicator for job in all_jobs: job.communicator = communicator print("Checkpoint saved at %s!" % filename)
def __hash__(self): """ Returns the hash of the experiment, which is the same as the hash of the root job. Returns: int: The hash of the experiment. """ return self.root.hash