# This file contains the definition of a Binary Search Job, which its an extension of a Job class.
# The extension includes several methods that are useful to do a precision analysis using a binary search algorithm.
from AutoRPE.UtilsWorkflow.RemoteManager import RemoteManager
import AutoRPE.UtilsRPE.Error as Error
from numpy import var
# Extend it
[docs]
class Job(RemoteManager):
"""
Represents a binary search job for precision analysis, extending the RemoteManager.
"""
def __init__(self,
id_reduced_precision: list,
forced_ids: list,
banned_variables,
analysis_variables: list,
reduced_precision_level: int,
communicator: 'SSH',
vault: 'Vault',
template: str,
local_folder: str,
result_filename: str,
counter: 'Counter',
analysis_status: str
):
"""
Initializes a Job instance with analysis and precision-related parameters.
Parameters:
id_reduced_precision (list): IDs of variables in reduced precision.
forced_ids (list): IDs of variables with forced precision.
banned_variables (list): Variables excluded from analysis.
analysis_variables (list): Variables involved in the analysis.
reduced_precision_level (int): Current precision level.
communicator (SSH): Facilitates remote communication.
vault (Vault): Stores and retrieves variables.
template (str): Job template.
local_folder (str): Local storage path.
result_filename (str): Name of the result file.
counter (Counter): Tracks the job count.
analysis_status (str): Current status of the analysis.
"""
super().__init__(id_reduced_precision,
forced_ids,
reduced_precision_level,
analysis_variables,
communicator,
vault,
template,
result_filename,
counter,
local_folder=local_folder,
analysis_status=analysis_status)
# This class extends the classical job putting some extra variables that define the relations
self.parent = None
self.child = []
self.child_batch_list = []
self.child_number = None
self.batch_size = 4
self.reshuffle = False
self.level = None
# Other things that need to be saved
self.banned_variables = banned_variables[:]
self.local_folder = local_folder
self.max_retrials = 1
self.batch_counter = 0
self.counter = counter
self.plot_id = 0
self.last_graph = ""
[docs]
def get_variables_reduced_precision(self):
"""
Retrieves variables under reduced precision from the vault.
Returns:
list: Variables with reduced precision.
"""
# Uses the vault to get variables with reduced precision being analyzed from their ids
return [self.vault.get_variable_by_id(_id) for _id in self.id_reduced_precision]
[docs]
def get_cluster_id(self):
"""
Determines the cluster IDs of variables under reduced precision.
Returns:
list: Unique cluster IDs, empty if no clusters exist.
"""
var_reduced_precision = self.get_variables_reduced_precision()
cluster_id = [var.cluster_id for var in var_reduced_precision]
# Some variables do not belong to any cluster
if -1 in cluster_id:
return []
# Just clusters in this child
else:
return list(set(cluster_id))
[docs]
def has_cluster(self):
"""
Checks whether the job contains clustered variables.
Returns:
bool: True if clusters exist, False otherwise.
"""
if self.get_cluster_id():
return True
else:
return False
[docs]
def print_info_job(self, status: str):
"""
Prints job details with the given status.
Parameters:
status (str): Job status for logging.
"""
print("Job %s %s %s before computing fitness" % (self.incremental_id, self.hash, status))
[docs]
def ancestors(self):
"""
Retrieves all ancestor jobs of the current job.
Returns:
list[Job]: List of ancestor jobs.
"""
to_return = []
cv = self
while cv.parent is not None:
to_return.append(cv.parent)
cv = cv.parent
return list(to_return)
[docs]
def update_parent_set(self):
"""
Updates the parent job with banned and reduced precision variables.
"""
# Root job has not parent to update
if self.parent is None:
return
# Propagate the bad variables to the parent
banned_variables = self.banned_variables[:]
# If the job failed, also analyzed variables are bad
if self.status == "FAILED":
banned_variables += self.id_reduced_precision[:]
# Get the paren job
parent = self.parent
# Banning one of the good children would add twice the banned variables
parent.banned_variables += [b for b in banned_variables if b not in parent.banned_variables]
# Remove bad variables from analyzed set
parent.id_reduced_precision = [i for i in parent.id_reduced_precision if i not in parent.banned_variables]
[docs]
def descendants(self):
"""
Retrieves all descendant jobs recursively.
Returns:
list[Job]: Descendants of the current job.
"""
# Using a recursive approach to obtain the full list of descendants
if not self.child:
return []
else:
# Getting the two lists of descendants of the two child and merging them in a single list
child_descendants = sum([child.descendants() for child in self.child], [])
# Returning the child plus their descendants
return self.child[:] + child_descendants
[docs]
def create_child(self, _id_subset, _index):
"""
Creates a child job with a subset of variables.
Parameters:
_id_subset (list): Subset of reduced precision IDs.
_index (int): Child index in the hierarchy.
Returns:
Job: Newly created child job.
"""
child = Job(id_reduced_precision=_id_subset,
forced_ids=self.forced_ids,
analysis_variables=self.analysis_variables,
banned_variables=self.banned_variables,
reduced_precision_level=self.reduced_precision_level,
communicator=self.communicator,
vault=self.vault,
template=self.template,
local_folder=self.local_folder,
result_filename=self.result_filename,
counter=self.counter,
analysis_status=self.analysis_status
)
child.parent = self
child.child_number = _index
child.level = self.level + 1
return child
[docs]
def spawn_children(self):
"""
Spawns children jobs by dividing variable clusters.
"""
# Creates set for the two child
_id_subset = self.divide_set_cluster()
for index, id_subset in enumerate(_id_subset):
child = self.create_child(id_subset, index)
self.child.append(child)
[docs]
def fail_child(self, bad_child: 'Job'):
"""
Marks a child job as failed and propagates failure to its descendants.
Parameters:
bad_child (Job): The failed child job.
"""
if not self.child:
return
else:
for child in self.child:
if child == bad_child or child == bad_child.parent:
return
else:
bad_var = bad_child.banned_variables + bad_child.id_reduced_precision
common_id = [i for i in bad_var if i in child.id_reduced_precision]
if common_id:
child.id_reduced_precision = [i for i in child.id_reduced_precision if i not in common_id]
child.banned_variables += common_id
child.fail_child(bad_child)
[docs]
def find_child_set(self, analysis_set_dict: dict):
"""
Finds and stores active children in the analysis set dictionary.
Parameters:
analysis_set_dict (dict): Dictionary to store active children.
"""
for child in self.child:
if child.status != 'FAILED' and child.id_reduced_precision:
analysis_set_dict[child.hash] = child
child.find_child_set(analysis_set_dict)
[docs]
def create_children_batch(self):
"""
Groups child jobs into batches for submission, ordered by banned variables.
"""
# Save old child structure, that will be destroyed in creating new children
self.reshuffle = self.child[:]
# Get the list of all children nodes
child_analysis_set_dict = {}
self.find_child_set(child_analysis_set_dict)
# Sort the dictionary in order of length of variables analyzed
child_analysis_set_list = sorted(child_analysis_set_dict.values(), key=lambda x: len(x.id_reduced_precision))
# Compute number of batches needed
n_batches = len(child_analysis_set_list) // self.batch_size
if len(child_analysis_set_list) % self.batch_size:
n_batches += 1
# Create the structure with right size
self.child_batch_list = [[] for i in range(n_batches)]
# Fill the structure
for idx, child in enumerate(child_analysis_set_list):
batch_index = idx // self.batch_size
id_subset = [i for i in self.id_reduced_precision if i not in child.id_reduced_precision]
banned_variables = self.banned_variables + child.id_reduced_precision
child = self.create_child(id_subset, child)
child.banned_variables = banned_variables
child.reshuffle = True
self.child_batch_list[batch_index].append(child)
[docs]
def divide_set_module(self, variables: list):
"""
Divides a set of variables into two groups based on module and routine.
Parameters:
variables (list): Variables to be divided.
Returns:
tuple: Two subsets of variable IDs.
"""
import AutoRPE.UtilsRPE.Error as Error
if len(variables) == 1:
raise Error.ClusterCantBeDivided("This cluster can't be subdivided anymore")
# Dividing by modules
modules = [var.procedure.module.name for var in variables]
# It is necessary to sort the modules otherwise the order can be different in different runs
# leading to different groups with different hashes, which prevents reproducibility
unique_modules = list(sorted(set(modules)))
if len(unique_modules) > 1:
d = {}
for mod in unique_modules:
d[mod] = modules.count(mod)
d = {k: v for k, v in sorted(d.items(), key=lambda item: - item[1])}
g1 = {"mod": [], "val": 0}
g2 = {"mod": [], "val": 0}
for key, value in d.items():
if g1["val"] < g2["val"]:
g1["val"] += value
g1["mod"].append(key)
else:
g2["val"] += value
g2["mod"].append(key)
set1 = [v.id for v in variables if v.procedure.module.name in g1["mod"]]
set2 = [v.id for v in variables if v.procedure.module.name in g2["mod"]]
return set1, set2
else:
# Separate by routines
routines = [var.procedure.name for var in variables]
unique_routines = list(sorted(set(routines)))
if len(unique_routines) > 1:
d = {}
for routine in unique_routines:
d[routine] = routines.count(routine)
d = {k: v for k, v in sorted(d.items(), key=lambda item: - item[1])}
g1 = {"mod": [], "val": 0}
g2 = {"mod": [], "val": 0}
for key, value in d.items():
if g1["val"] < g2["val"]:
g1["val"] += value
g1["mod"].append(key)
else:
g2["val"] += value
g2["mod"].append(key)
set1 = [v.id for v in variables if v.procedure.name in g1["mod"]]
set2 = [v.id for v in variables if v.procedure.name in g2["mod"]]
return set1, set2
else:
# Just separate in two halves
# assert len(self.id_reduced_precision) > 1
set1 = []
set2 = self.id_reduced_precision[:]
for i in range(int(len(self.id_reduced_precision) / 2)):
set1.append(set2.pop(0))
return set1, set2
[docs]
def divide_set_cluster(self):
"""
Divides variables into subsets based on clusters and module hierarchy.
Returns:
tuple: Two subsets of variable IDs.
"""
# Just one variable left, no possibility to further subdivide: fail
if len(self.id_reduced_precision) == 1:
raise Error.ClusterCantBeDivided("This cluster can't be subdivided anymore")
# Retrieve variables being analyzed using the id_reduced_precision
var_reduced_precision = self.get_variables_reduced_precision()
# Get their cluster ids
cluster_id = [var.cluster_id for var in var_reduced_precision]
# Separate clusters from the rest of the variables (only applies in the first iteration of the tree)
if -1 in cluster_id and len(set(cluster_id)) > 1:
set1 = [var.id for var in var_reduced_precision if var.cluster_id == -1]
set2 = [var.id for var in var_reduced_precision if var.cluster_id != -1]
# We are dealing with clusters
elif -1 not in cluster_id:
# It is necessary to sort the clusters otherwise the order can be different in different runs
# leading to different groups with different hashes, which prevents reproducibility
cluster_id = list(set(cluster_id))
cluster_id.sort()
# Just one cluster: fail
if len(cluster_id) == 1:
raise Error.ClusterCantBeDivided("This cluster can't be subdivided anymore")
# Two cluster left: divide into two branches
elif len(cluster_id) == 2:
set1 = [var.id for var in var_reduced_precision if var.cluster_id == cluster_id[0]]
set2 = [var.id for var in var_reduced_precision if var.cluster_id == cluster_id[1]]
# Various clusters: distribute the clusters on the branches in a balanced way
else:
set1 = []
set2 = [var for var in var_reduced_precision if var.cluster_id != cluster_id[-1]]
while len(cluster_id) > 0 and len(set1) < len(set2):
c_id = cluster_id.pop()
set1.extend([v for v in var_reduced_precision if v.cluster_id == c_id])
set2 = [v for v in set2 if v.cluster_id != c_id]
set1 = [v.id for v in set1]
set2 = [v.id for v in set2]
# 4 - Just independent variables, either global or local: divide by module/subroutine
else:
n_main = len([v for v in var_reduced_precision if v.procedure.name == "main"])
if n_main != 0 and n_main != len(var_reduced_precision):
set1 = [v.id for v in var_reduced_precision if v.procedure.name == "main"]
set2 = [v.id for v in var_reduced_precision if v.procedure.name != "main"]
else:
set1, set2 = self.divide_set_module(var_reduced_precision)
return set1, set2
[docs]
def divide_function_level(self, variables: list):
"""
Divides variables into subsets based on function levels.
Parameters:
variables (list): Variables to be divided.
Returns:
list: List of subsets of variable IDs.
"""
sets = []
levels = list(set([v.procedure.level for v in variables if v.procedure.name != 'main']))
main_variables = [v.id for v in variables if v.procedure.name == 'main']
if len(main_variables):
sets.append(main_variables)
if len(levels) == 1 or len(main_variables) == len(variables):
return self.divide_set_module(variables)
for l in levels:
sets.append([v.id for v in variables if v.procedure.name != 'main' and v.procedure.level == l])
return sets
[docs]
def kind_of_exception(self):
"""
Categorizes the exception type when merging sets fails. Types of exception:
- No exception: It isn't a failed job
- Intra-routine: All variables belong to same routine
- Intra-module: All variables belong to same module, but not routine
- Inter-module: The variables belong to different modules.
Returns:
str: Type of exception ('IntraRoutine', 'IntraModule', 'InterModule', etc.).
"""
if not self.child:
return "TreeLeafFail"
child_1, child_2 = self.child
# We'll focus only on the id of reduced precision
ch1_variables = [self.vault.get_variable_by_id(_id) for _id in child_1.id_reduced_precision]
ch2_variables = [self.vault.get_variable_by_id(_id) for _id in child_2.id_reduced_precision]
ch1_routines = set([v.procedure for v in ch1_variables])
ch2_routines = set([v.procedure for v in ch2_variables])
if len(ch1_routines.union(ch2_routines)) == 1:
return "IntraRoutine"
ch1_modules = set([v.procedure.module for v in ch1_variables])
ch2_modules = set([v.procedure.module for v in ch2_variables])
if len(ch1_modules.union(ch2_modules)) == 1:
return "IntraModule"
return "InterModule"
[docs]
def graph(self):
"""
Generates a graph representation of the job and its descendants.
"""
members = self.descendants()
def style(status, incremental_id):
if status == "SUCCESS":
color = "#00b300"
elif status == "FAILED":
color = "#e60000"
elif status == "SUSPENDED":
color = "#ffff1a"
else:
color = "#aaa"
return "style %s fill:%s\n" % (incremental_id, color)
# return "style %s fill:%s\n" % (fix_identifier(node.identifier()), color)
graph = "graph TD\n"
graph += style(self.status, self.incremental_id)
max_else_status = 0
for member in members:
len_id_reduced_precision = len(member.id_reduced_precision)
len_banned_var = len(member.banned_variables)
len_forced_var = len(member.forced_ids)
ID = member.incremental_id
if len_id_reduced_precision == 0:
if len_banned_var:
node = "%s --> %s[ " + str(ID) + " : *+" + str(len_banned_var) + "]\n"
else:
node = "%s --> %s[*]\n"
else:
if len_banned_var:
node = "%s --> %s[ " + str(ID) + " : " + str(len_id_reduced_precision) + "+" + str(
len_banned_var) + "]\n"
elif len_forced_var:
node = "%s --> %s[ " + str(ID) + " : " + str(len_id_reduced_precision) + "+ _" + str(
len_forced_var) + "]\n"
else:
node = "%s --> %s[" + str(ID) + " : " + str(len_id_reduced_precision) + "]\n"
graph += node % (member.parent.incremental_id, member.incremental_id)
graph += style(member.status, member.incremental_id)
if member.status != "SUCCESS" and member.status != "FAILED" and member.status != "SUSPENDED" \
and member.incremental_id >= max_else_status:
max_else_status = member.incremental_id
import calendar
import time
# ts = calendar.timegm(time.gmtime())
# print(self.plot_id)
# mermaid_graph_file = open(
# self.local_folder + "/mermaid/plot_" + str(ts) + "last_run_id_" + str(max_else_status) + ".txt", "w")
if graph != self.last_graph:
mermaid_graph_file = open(
self.local_folder + "/mermaid/"+str(self.incremental_id)+"/plot_" + '{:0>4}'.format(self.plot_id) + ".txt", "w")
mermaid_graph_file.write(graph)
mermaid_graph_file.close()
self.plot_id += 1
self.last_graph = graph
[docs]
def plot_variables(self, var_name: str):
"""
Plots a variable's values over time and compares to error limits.
Parameters:
var_name (str): Name of the variable to plot.
Returns:
None | Exception: Returns None if successful, otherwise raises exceptions.
"""
import matplotlib.pyplot as plt
import pickle as pkl
import numpy as np
hash_folder_name = self.hash
limit_file_path = f'{self.local_folder}/analysis_configuration_files/limit_file.pkl'
simulation_rmse_path = f'{self.remote_rundir}/simulation_RMSE.pkl'
# Open limit file from local folder
try:
with open(limit_file_path, 'rb') as file:
limit_file_data = pkl.load(file)
except FileNotFoundError:
print(f'File in {limit_file_path} does not exist.')
return None
except Exception as e:
print(f'An error occurred while opening the file: {e}')
return None
# Open simulation result from remote folder:
buffer_size = 3 * 1024 * 1024
try:
with self.communicator.sftp.open(simulation_rmse_path, 'rb', bufsize=buffer_size) as f:
simulation_rmse_data = pkl.load(f)
except FileNotFoundError:
print(f'File in {simulation_rmse_path} does not exist.')
return None
except Exception as e:
print(f'An error occurred while opening the file: {e}')
return None
# Limit-file values:
values_limitfile = None
for grid_name in limit_file_data.keys():
if var_name in limit_file_data[grid_name].keys():
values_limitfile = limit_file_data[grid_name][var_name]
break
if values_limitfile is None:
print(f'variable "{var_name}" could not be found in file "{limit_file_path}"')
return None
bottom, top, mean = values_limitfile[0], values_limitfile[1], values_limitfile[2]
# Simulation RMSE values:
values_simulation = None
for grid_name in simulation_rmse_data.keys():
if var_name in simulation_rmse_data[grid_name].keys():
values_simulation = simulation_rmse_data[grid_name][var_name]
break
if values_simulation is None:
print(f'variable "{var_name}" could not be found in file "{simulation_rmse_path}"')
return None
# Start plotting
fig, ax = plt.subplots()
# TODO
time = np.linspace(0, 1, len(mean))
# Plotting the mean values
ax.plot(time, mean, label='Ensemble mean error', color='lightblue', linestyle='dashed')
# Plotting the error spread
ax.fill_between(time, bottom, top, color='lightblue', edgecolor='none', alpha=0.5, label='Ensemble error spread')
# Plotting the simulation values
ax.plot(time, values_simulation, label='Simulation RMSE', color='brown')
# Adding labels, legend and title
title = f'RMSE of variable {var_name} over time vs the limits set by the limit-file.'
ax.set_xlabel('time axis')
ax.set_ylabel(f'{var_name}')
ax.set_title(title)
ax.legend(loc='best')
# Save the plot
image_name = f'var_{var_name}_job_{self.hash}.png'
plt.savefig(image_name, bbox_inches='tight', pad_inches=0.25)