UtilsWorkflow

AutoRPE.UtilsWorkflow.BinaryTree

class AutoRPE.UtilsWorkflow.BinaryTree.BinaryTree(root: Job, accuracy_test: object, local_folder: str, max_running_jobs: int = 100)[source]

Bases: object

Represents a binary tree of jobs for an experiment, where each job can have various statuses. Jobs are processed based on their statuses and are moved through different stages in the workflow.

  • PENDING: The job has been created and is ready to be launched.

  • RUNNING: The job is currently running.

  • SUCCESS: The job has completed with successful results, which have been asserted.

  • FAILED: The job has completed with unsuccessful results and cannot be split.

  • SUSPENDED: The job has completed with unsuccessful results, has been split, and depends on the results of its descendant jobs.
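The status model above can be pictured as one list per status, with a job moving from list to list as it progresses. The following is a minimal sketch of that idea, not the AutoRPE implementation: the Status enum, the StatusBoard class, and its add and move methods are hypothetical names introduced only for illustration.

```python
from enum import Enum


class Status(Enum):
    """The five job statuses described above."""
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    SUCCESS = "SUCCESS"
    FAILED = "FAILED"
    SUSPENDED = "SUSPENDED"


class StatusBoard:
    """Hypothetical container keeping one list of jobs per status."""

    def __init__(self):
        self.jobs = {status: [] for status in Status}

    def add(self, job, status=Status.PENDING):
        self.jobs[status].append(job)

    def move(self, job, new_status):
        # Remove the job from whichever list currently holds it, then re-file it.
        for members in self.jobs.values():
            if job in members:
                members.remove(job)
        self.jobs[new_status].append(job)

    def all(self):
        # Combined view over every status list.
        return [job for members in self.jobs.values() for job in members]


board = StatusBoard()
board.add("job-0")
board.move("job-0", Status.RUNNING)
board.move("job-0", Status.SUSPENDED)
```

Plain strings stand in for jobs here; any object that supports equality comparison would work the same way.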

Initializes the binary tree with a root job, accuracy test, local folder, and a maximum number of running jobs.

Parameters:
  • root (Job) – The root job of the binary tree, which starts the experiment.

  • accuracy_test (object) – The accuracy test to evaluate the job results.

  • local_folder (str) – The local folder where the experiment data is stored.

  • max_running_jobs (int, optional) – The maximum number of jobs allowed to run concurrently (default is 100).

all()[source]

Returns a list of all jobs across different statuses (excluding disinherited jobs).

Returns:

A combined list of jobs from all statuses.

Return type:

list

ban_variable(job: Job, var_id: str)[source]

Bans a variable from being used in a job or its children if it is part of the reduced precision set.

Parameters:
  • job (Job) – The job where the variable is banned.

  • var_id (str) – The ID of the variable to be banned.

Returns:

None

check_children(job: Job)[source]

Checks if all child jobs of a given job have finished. If all children are finished, updates the job status accordingly.

Parameters:

job (Job) – The job whose children are being checked.

Returns:

None

check_pending(job: Job)[source]

Checks if a job is already in the queue. If not, it is submitted for execution.

Parameters:

job (Job) – The job to be checked and potentially started.

Returns:

None

check_running(job: Job)[source]

Checks if a running job has finished and updates its status accordingly.

Parameters:

job (Job) – The job to be checked.

Returns:

None

checkpoint(incremental_id: int)[source]

Saves a checkpoint of the experiment at the current status.

Parameters:

incremental_id (int) – The ID of the job at which to save the checkpoint.

Returns:

None

fail(job: Job)[source]

Handles the failure of a job, either by resubmitting it, subdividing it into children, or marking it as failed.

Parameters:

job (Job) – The job that has failed.

Returns:

None

manage_reshuffled_job(job: Job)[source]

Manages a reshuffled job by determining whether it can be re-executed based on its child jobs’ statuses.

Parameters:

job (Job) – The reshuffled job to be managed.

Returns:

None

move_to_failed(job: Job)[source]

Moves a job to the FAILED list after it has failed.

Parameters:

job (Job) – The job to be moved to failed.

Returns:

None

move_to_pending(job: Job)[source]

Moves a job to the PENDING list, indicating it is ready to be re-executed.

Parameters:

job (Job) – The job to be moved to pending.

Returns:

None

move_to_success(job: Job)[source]

Moves a job to the SUCCESS list after it has completed successfully.

Parameters:

job (Job) – The job to be moved to success.

Returns:

None

move_to_suspended(job: Job)[source]

Moves a job to the SUSPENDED list, indicating it has failed and needs further evaluation.

Parameters:

job (Job) – The job to be moved to suspended.

Returns:

None

print_status()[source]

Prints the current status of all jobs in the experiment, showing the number of jobs in each list.

Returns:

None

sort_pending(lower_first: bool = True)[source]

Sorts the pending jobs based on their levels, either prioritizing lower or higher levels.

Parameters:

lower_first (bool, optional) – If True, jobs are sorted by ascending level; otherwise, by descending level.

Returns:

None
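As an illustration of the ordering that lower_first controls, the sketch below sorts a list in place by a level attribute. The PendingJob dataclass and the standalone function are hypothetical stand-ins; how BinaryTree stores its pending jobs or computes levels is not shown in this reference.

```python
from dataclasses import dataclass


@dataclass
class PendingJob:
    """Hypothetical stand-in for a job; only the level matters here."""
    name: str
    level: int


def sort_pending(pending, lower_first=True):
    # Ascending level when lower_first is True, descending otherwise.
    pending.sort(key=lambda job: job.level, reverse=not lower_first)


queue = [PendingJob("c", 2), PendingJob("a", 0), PendingJob("b", 1)]
sort_pending(queue)
ascending = [job.name for job in queue]
sort_pending(queue, lower_first=False)
descending = [job.name for job in queue]
```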

step()[source]

Loops through all the jobs, checks their status, and updates their status accordingly.

Returns:

The number of jobs that had their status changed.

Return type:

int
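Because step() reports how many jobs changed status, a driver can use the return value to decide when to wait before polling again. The loop below is only a sketch of that pattern under assumptions: FakeTree, its finished() helper, and the drive function are hypothetical, and the real termination condition used by AutoRPE may differ.

```python
import time


class FakeTree:
    """Hypothetical stand-in that reports a scripted number of changes per step."""

    def __init__(self, changes_per_step):
        self._script = list(changes_per_step)

    def step(self):
        # Mimics the documented contract: return how many jobs changed status.
        return self._script.pop(0) if self._script else 0

    def finished(self):
        # Hypothetical helper: the script is exhausted, so nothing is left to do.
        return not self._script


def drive(tree, poll_seconds=0.0):
    total_changes = 0
    while not tree.finished():
        changed = tree.step()
        total_changes += changed
        if changed == 0:
            # Nothing moved this round; wait before polling again.
            time.sleep(poll_seconds)
    return total_changes


total = drive(FakeTree([2, 0, 1]))
```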

AutoRPE.UtilsWorkflow.BinaryTreeSearch

class AutoRPE.UtilsWorkflow.BinaryTreeSearch.BinaryTreeSearch(communicator: SSH, local_folder: str, analysis_status: dict, job_template: str, vault: Vault, original_precision_level: int, reduced_precision_level: int, accuracy_test: object, max_running_jobs: int, output_filename: str, experiment_name: str = 'BinaryTreeSearch')[source]

Bases: object

Initializes the experiment with given parameters and sets up necessary components.

Parameters:
  • communicator (SSH) – The communicator object used to manage communication between the local and remote machines.

  • local_folder (str) – The local directory to store data.

  • analysis_status (dict) – Dictionary used for tracking the experiment’s progress (status of each job of the analysis).

  • job_template (str) – Filename (path) of template used to generate jobs for the analysis.

  • vault (Vault) – The vault containing variables used for the analysis.

  • original_precision_level (int) – The precision level to use in the original run (dp=52, sp=23, hp=10).

  • reduced_precision_level (int) – The precision level for the reduced precision run (dp=52, sp=23, hp=10).

  • accuracy_test (object) – Object containing the accuracy test to validate results.

  • max_running_jobs (int) – The maximum number of jobs that can run simultaneously.

  • output_filename (str) – The name of the output file where results will be saved.

  • experiment_name (str, optional) – The name of the experiment. Defaults to “BinaryTreeSearch”.
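The precision levels listed above (dp=52, sp=23, hp=10) match the number of explicit significand bits in double, single, and half precision. To give a feel for what a lower level means numerically, the sketch below rounds a Python float to a given number of significand bits. It is an illustration only: the function name and the dictionary are hypothetical, exponent range limits are ignored, and it is not how AutoRPE or the underlying emulator applies reduced precision.

```python
import math

# Explicit significand bits for the three levels named in the parameter list.
SIGNIFICAND_BITS = {"dp": 52, "sp": 23, "hp": 10}


def round_to_bits(x, bits):
    """Round x to `bits` explicit significand bits (exponent range not limited)."""
    if x == 0.0 or not math.isfinite(x):
        return x
    # x == mantissa * 2**exponent with 0.5 <= |mantissa| < 1
    mantissa, exponent = math.frexp(x)
    # One extra bit accounts for the leading bit that frexp keeps in the mantissa.
    scale = 2 ** (bits + 1)
    return math.ldexp(round(mantissa * scale) / scale, exponent)


full = round_to_bits(0.1, SIGNIFICAND_BITS["dp"])
half = round_to_bits(0.1, SIGNIFICAND_BITS["hp"])
```

At 52 bits the value is returned unchanged, while at 10 bits the relative error stays below 2**-10.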

Runs the binary tree search to evaluate the precision of variables.

Parameters:
  • id_forced_var (list, optional) – List of variable IDs that are forced into the analysis.

  • id_banned_var (list, optional) – List of variable IDs that are excluded from the analysis.

Returns:

The root job of the binary tree after the search is complete.

Return type:

Job

initial_check(forced_id: list = [], banned_id: list = [])[source]

Performs an initial test run with original precision to ensure the accuracy test works.

Parameters:
  • forced_id (list, optional) – List of variable IDs that are forced into the test.

  • banned_id (list, optional) – List of variable IDs that are excluded from the test.

Returns:

None

Raises:

AssertionError – If the initial test does not pass.

print_root_configuration()[source]

Prints the configuration of the root job to the specified output file.

recover_checkpoint()[source]

Recovers the analysis state from a previously saved checkpoint.

Returns:

The binary tree object after recovery from the checkpoint.

Return type:

BinaryTree

root_job_succeeded()[source]

Checks if the root job of the binary tree search has successfully completed.

Returns:

True if the root job succeeded, otherwise False.

Return type:

bool

Raises:

AssertionError – If the root job failed.

setup(id_reduced_precision: list, forced_ids: list, id_banned_var: list)[source]

Sets up the experiment driver in preparation for starting the analysis.

Parameters:
  • id_reduced_precision (list) – List of variable IDs to use with reduced precision.

  • forced_ids (list) – List of variable IDs that must be included in the analysis.

  • id_banned_var (list) – List of variable IDs that should be kept in original precision.

Returns:

None

update_banned_variables()[source]

Updates the list of banned variables (to be kept at original precision) and writes them to a new file.

class AutoRPE.UtilsWorkflow.BinaryTreeSearch.Counter[source]

Bases: object

A simple counter class to track increments.

up(how_much: str = 1)[source]

Increment the counter by the specified amount and return the previous count.

Parameters:

how_much (int, optional) – The amount by which to increment the counter (default is 1).

Returns:

The previous value of the counter before the increment.

Return type:

int
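The "return the previous count" behaviour makes the counter convenient for handing out consecutive IDs. A minimal sketch of that contract follows; the class name SketchCounter, the count attribute, and the starting value of 0 are assumptions, not details taken from the AutoRPE source.

```python
class SketchCounter:
    """Hypothetical counter that returns the value held before each increment."""

    def __init__(self):
        self.count = 0  # assumed starting value

    def up(self, how_much=1):
        previous = self.count
        self.count += how_much
        return previous


counter = SketchCounter()
first = counter.up()
second = counter.up(3)
third = counter.up()
```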

class AutoRPE.UtilsWorkflow.BinaryTreeSearch.GracefulKiller[source]

Bases: object

Allows the user to stop an analysis at any point, creating a pause_checkpoint.pkl file that can later be used to restart the analysis from the same point.

It uses the signal library to catch signals.

exit_gracefully(signum: int, frame)[source]

Handle termination signals by setting the kill flag and printing a message.

Parameters:
  • signum (int) – The signal number received.

  • frame (frame object) – The current stack frame (unused in this implementation).

kill_now = False
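The flag-based pattern described here is a common way to stop a long-running loop cleanly. The sketch below shows it under assumptions: the class name is hypothetical, the choice of SIGINT and SIGTERM is a guess about which signals are caught, and the checkpoint writing itself is left out.

```python
import signal


class GracefulKillerSketch:
    """Hypothetical flag-setting signal handler."""

    kill_now = False

    def __init__(self):
        try:
            signal.signal(signal.SIGINT, self.exit_gracefully)
            signal.signal(signal.SIGTERM, self.exit_gracefully)
        except ValueError:
            # signal.signal can only be called from the main thread.
            pass

    def exit_gracefully(self, signum, frame):
        print(f"Received signal {signum}; stopping after the current step.")
        self.kill_now = True


killer = GracefulKillerSketch()
steps_done = 0
while not killer.kill_now:
    steps_done += 1
    if steps_done == 3:
        # Simulate a termination request instead of waiting for a real signal.
        killer.exit_gracefully(signal.SIGTERM, None)
```

The main loop only checks the flag between steps, so the work in progress is never interrupted halfway.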

AutoRPE.UtilsWorkflow.Communicator

class AutoRPE.UtilsWorkflow.Communicator.SSH(user: str, host: str, remote_scratch: str = '')[source]

Bases: object

Initializes an SSH connection and sets up SFTP.

Parameters:
  • user (str) – The username for SSH authentication.

  • host (str) – The host address of the remote server.

  • remote_scratch (str, optional) – Path to the remote scratch directory. Defaults to “”.

connect()[source]

Establishes an SSH connection to the remote host.

Returns:

True if the connection is established, False otherwise.

Return type:

bool

Raises:

IOError – If the connection cannot be established.

execute(command: str)[source]

Executes a command on the remote server.

Parameters:

command (str) – The command to execute.

Returns:

A tuple of file-like objects for the command’s stdin, stdout, and stderr.

Return type:

(stdin, stdout, stderr)

get(remote_path: str, local_path: str)[source]

Downloads a file from the remote server to the local machine.

Parameters:
  • remote_path (str) – The remote file path.

  • local_path (str) – The local destination file path.

init_transport()[source]

Initializes the SFTP transport for file transfers.

is_remote_file(path: str)[source]

Checks if a given path on the remote server is a file.

Parameters:

path (str) – The remote file path to check.

Returns:

True if the path is a file, False otherwise.

Return type:

bool

list_dir(dir_path: str)[source]

Lists the contents of a remote directory.

Parameters:

dir_path (str) – The remote directory path to list.

Returns:

A list of file and directory names in the remote directory.

Return type:

list[str]

put(local_path: str, remote_path: str)[source]

Uploads a file from the local machine to the remote server.

Parameters:
  • local_path (str) – The local file path.

  • remote_path (str) – The remote destination file path.

write_file(text: str, remote_path: str)[source]

Writes text content to a file on the remote server.

Parameters:
  • text (str) – The text to write to the file.

  • remote_path (str) – The remote file path to write the text.

AutoRPE.UtilsWorkflow.Communicator.mkdir_p(sftp: SFTPClient, remote_directory: str)[source]

Recursively creates directories on the remote server if they do not exist.

Parameters:
  • sftp (SFTPClient) – The SFTP client used for communication with the remote server.

  • remote_directory (str) – The remote directory path to create.

Returns:

True if any directories were created, False otherwise.

Return type:

bool
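One way to obtain the documented behaviour (create missing parents, report whether anything was created) is to walk the path component by component. The sketch below assumes only that the sftp object offers stat(path), raising IOError for a missing path, and mkdir(path), as Paramiko's SFTPClient does. The function name and the in-memory FakeSFTP are hypothetical, and the actual AutoRPE implementation may work differently.

```python
import posixpath


def mkdir_p_sketch(sftp, remote_directory):
    """Create remote_directory and any missing parents; return True if any were created."""
    created = False
    current = "/" if remote_directory.startswith("/") else ""
    for part in remote_directory.split("/"):
        if not part:
            continue
        current = posixpath.join(current, part)
        try:
            sftp.stat(current)  # raises IOError if the path does not exist
        except IOError:
            sftp.mkdir(current)
            created = True
    return created


class FakeSFTP:
    """In-memory stand-in exposing the two calls the sketch needs."""

    def __init__(self):
        self.dirs = {"/scratch"}

    def stat(self, path):
        if path not in self.dirs:
            raise IOError(path)
        return path

    def mkdir(self, path):
        self.dirs.add(path)


fake = FakeSFTP()
first = mkdir_p_sketch(fake, "/scratch/run/job_1")
second = mkdir_p_sketch(fake, "/scratch/run/job_1")
```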

AutoRPE.UtilsWorkflow.Communicator.remote_isdir(attr)[source]

Checks if the given attribute corresponds to a directory.

Parameters:

attr (stat_result) – The file attributes of the remote file/directory.

Returns:

True if the attribute corresponds to a directory, False otherwise.

Return type:

bool
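A check of this kind is usually a one-liner on the st_mode field. The sketch below assumes the attribute object exposes st_mode, as os.stat_result and Paramiko's SFTPAttributes do; the function name and the SimpleNamespace test objects are hypothetical.

```python
import stat
from types import SimpleNamespace


def remote_isdir_sketch(attr):
    # st_mode carries the file type bits; S_ISDIR tests for a directory.
    return stat.S_ISDIR(attr.st_mode)


directory_attr = SimpleNamespace(st_mode=stat.S_IFDIR | 0o755)
file_attr = SimpleNamespace(st_mode=stat.S_IFREG | 0o644)
```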

AutoRPE.UtilsWorkflow.ExceptionManager

Different ways of handling exceptions.

AutoRPE.UtilsWorkflow.ExceptionManager.children_fail_test(analyzed_job: Job, binary_tree: BinaryTree)[source]

Handles failure of children jobs by separating them into batches.

Parameters:
  • analyzed_job (Job) – The job being analyzed.

  • binary_tree (BinaryTree) – The binary tree managing the job states.

Returns:

None

AutoRPE.UtilsWorkflow.ExceptionManager.children_submit_batch(analyzed_job: Job, binary_tree: BinaryTree)[source]

Submits a batch of children jobs for analysis.

Parameters:
  • analyzed_job (Job) – The job whose children are being submitted.

  • binary_tree (BinaryTree) – The binary tree managing job states.

Returns:

None

AutoRPE.UtilsWorkflow.ExceptionManager.choose_child(analyzed_job: Job)[source]

Selects the child job for further analysis based on variable restrictions.

Parameters:

analyzed_job (Job) – The parent job with children to analyze.

Returns:

The selected success_child and failed_child.

Return type:

tuple

AutoRPE.UtilsWorkflow.ExceptionManager.divide_and_force(analyzed_job: Job, binary_tree: BinaryTree)[source]

Handles exceptions by dividing and forcing variables for analyzed jobs.

Parameters:
  • analyzed_job (Job) – The job being analyzed.

  • binary_tree (BinaryTree) – The binary tree managing the job states.

Returns:

None

AutoRPE.UtilsWorkflow.ExceptionManager.resolve_exception(analyzed_job: Job, binary_tree: BinaryTree)[source]

Resolves exceptions during job analysis.

Parameters:
  • analyzed_job (Job) – The job that encountered an exception.

  • binary_tree (BinaryTree) – The binary tree managing job states.

Returns:

None

AutoRPE.UtilsWorkflow.Job

class AutoRPE.UtilsWorkflow.Job.Job(id_reduced_precision: list, forced_ids: list, banned_variables, analysis_variables: list, reduced_precision_level: int, communicator: SSH, vault: Vault, template: str, local_folder: str, result_filename: str, counter: Counter, analysis_status: str)[source]

Bases: RemoteManager

Represents a binary search job for precision analysis, extending the RemoteManager.

ancestors()[source]

Retrieves all ancestor jobs of the current job.

Returns:

List of ancestor jobs.

Return type:

list[Job]

create_child(_id_subset, _index)[source]

Creates a child job with a subset of variables.

Parameters:
  • _id_subset (list) – Subset of reduced precision IDs.

  • _index (int) – Child index in the hierarchy.

Returns:

Newly created child job.

Return type:

Job

create_children_batch()[source]

Groups child jobs into batches for submission, ordered by banned variables.

descendants()[source]

Retrieves all descendant jobs recursively.

Returns:

Descendants of the current job.

Return type:

list[Job]
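The two traversals, ancestors() and descendants(), can be sketched on a bare tree node. TreeNode, its parent and children attributes, and the ordering of the returned lists are assumptions made for illustration; the Job class may store its links and order its results differently.

```python
class TreeNode:
    """Hypothetical node with a parent link and a list of children."""

    def __init__(self, name, parent=None):
        self.name = name
        self.parent = parent
        self.children = []
        if parent is not None:
            parent.children.append(self)

    def ancestors(self):
        # Walk the parent links up to the root, nearest ancestor first.
        found = []
        node = self.parent
        while node is not None:
            found.append(node)
            node = node.parent
        return found

    def descendants(self):
        # Depth-first: each child, followed by that child's own descendants.
        found = []
        for child in self.children:
            found.append(child)
            found.extend(child.descendants())
        return found


root = TreeNode("root")
left = TreeNode("left", root)
right = TreeNode("right", root)
leaf = TreeNode("leaf", left)
```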

divide_function_level(variables: list)[source]

Divides variables into subsets based on function levels.

Parameters:

variables (list) – Variables to be divided.

Returns:

List of subsets of variable IDs.

Return type:

list

divide_set_cluster()[source]

Divides variables into subsets based on clusters and module hierarchy.

Returns:

Two subsets of variable IDs.

Return type:

tuple

divide_set_module(variables: list)[source]

Divides a set of variables into two groups based on module and routine.

Parameters:

variables (list) – Variables to be divided.

Returns:

Two subsets of variable IDs.

Return type:

tuple

fail_child(bad_child: Job)[source]

Marks a child job as failed and propagates failure to its descendants.

Parameters:

bad_child (Job) – The failed child job.

find_child_set(analysis_set_dict: dict)[source]

Finds and stores active children in the analysis set dictionary.

Parameters:

analysis_set_dict (dict) – Dictionary to store active children.

get_cluster_id()[source]

Determines the cluster IDs of variables under reduced precision.

Returns:

Unique cluster IDs, empty if no clusters exist.

Return type:

list

get_variables_reduced_precision()[source]

Retrieves variables under reduced precision from the vault.

Returns:

Variables with reduced precision.

Return type:

list

graph()[source]

Generates a graph representation of the job and its descendants.

has_cluster()[source]

Checks whether the job contains clustered variables.

Returns:

True if clusters exist, False otherwise.

Return type:

bool

kind_of_exception()[source]

Categorizes the type of exception raised when merging sets fails. Types of exception:

  • No exception: The job is not a failed job.

  • Intra-routine: All variables belong to the same routine.

  • Intra-module: All variables belong to the same module, but not to the same routine.

  • Inter-module: The variables belong to different modules.

Returns:

Type of exception (‘IntraRoutine’, ‘IntraModule’, ‘InterModule’, etc.).

Return type:

str
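The three failure categories can be illustrated with a small classifier. This is a sketch under assumptions: variables are represented as hypothetical (module, routine) pairs, the function name is invented, and the "no exception" case (which depends on the job's status) is not covered.

```python
def classify_variable_set(variables):
    """Classify an iterable of (module, routine) pairs, a hypothetical representation."""
    modules = {module for module, _ in variables}
    routines = {(module, routine) for module, routine in variables}
    if len(modules) > 1:
        return "InterModule"
    if len(routines) > 1:
        return "IntraModule"
    return "IntraRoutine"


same_routine = classify_variable_set([("dynmod", "step"), ("dynmod", "step")])
same_module = classify_variable_set([("dynmod", "step"), ("dynmod", "init")])
different_modules = classify_variable_set([("dynmod", "step"), ("icemod", "step")])
```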

plot_variables(var_name: str)[source]

Plots a variable’s values over time and compares to error limits.

Parameters:

var_name (str) – Name of the variable to plot.

Returns:

None if successful; otherwise an exception is raised.

Return type:

None | Exception

print_info_job(status: str)[source]

Prints job details with the given status.

Parameters:

status (str) – Job status for logging.

spawn_children()[source]

Spawns children jobs by dividing variable clusters.

update_parent_set()[source]

Updates the parent job with banned and reduced precision variables.

AutoRPE.UtilsWorkflow.RemoteManager

class AutoRPE.UtilsWorkflow.RemoteManager.RemoteManager(id_reduced_precision: list, forced_ids: list, reduced_precision_level: list, analysis_variables: list, communicator: SSH, vault: Vault, template: str, result_filename: str, counter: Counter, local_folder: str, analysis_status: str)[source]

Bases: object

Initializes the RemoteManager with the given parameters.

Parameters:
  • id_reduced_precision (list) – List of reduced precision variable IDs.

  • forced_ids (list) – List of forced variable IDs, if any.

  • reduced_precision_level (int) – Precision level for the variables.

  • analysis_variables (list) – List of analysis variables.

  • communicator (SSH) – The communicator object for remote operations.

  • vault (Vault) – Vault containing variable data.

  • template (str) – Path to the job template.

  • result_filename (str) – Filename for the results.

  • counter (Counter) – Counter object for incrementing job IDs.

  • local_folder (str) – Local directory for storing job data.

  • analysis_status (dict) – Dictionary containing the status of various analyses.

check_running()[source]

Checks if the job is currently running on the remote scheduler.

Returns:

The current status of the job (e.g., “RUNNING”, “PENDING”).

Return type:

str

check_submitted()[source]

Checks if the job has already been submitted to the remote scheduler.

Returns:

The job’s current status (e.g., “PENDING” or “TO_SUBMIT”).

Return type:

str

evaluate_simulation(accuracy_test: object)[source]

Evaluates the accuracy of the simulation using the provided accuracy test.

Parameters:

accuracy_test (object) – The accuracy test to evaluate the simulation.

Returns:

The result of the accuracy test evaluation.

Return type:

str

generate_jobscript()[source]

Generates a job script based on the template and current job parameters.

Returns:

A job script ready for submission to the scheduler.

Return type:

str

generate_namelist()[source]

Generates a namelist for the variables being analyzed, setting their precision according to the reduced precision level.

Returns:

A string representing the generated namelist.

Return type:

str

get_result(accuracy_test: object)[source]

Retrieves the result of the simulation, either from the dictionary or from the remote system.

Parameters:

accuracy_test (object) – The accuracy test to evaluate the result.

Returns:

The simulation result, either from the dictionary or from a remote file.

Return type:

str

property hash

Generates a unique hash based on the list of variable IDs and reduced precision level.

Returns:

A hash string representing the unique identifier.

Return type:

str
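An identifier of this kind can be built by hashing a canonical string made from the variable IDs and the precision level. The sketch below is one possible construction, not the one AutoRPE uses: the function name, the choice of SHA-256, and the decision to sort the IDs so that ordering does not matter are all assumptions.

```python
import hashlib


def job_hash(variable_ids, precision_level):
    # Sort the IDs so the same set always maps to the same hash, whatever the order.
    payload = ",".join(str(v) for v in sorted(variable_ids)) + f"|{precision_level}"
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


same_a = job_hash([3, 1, 2], 10)
same_b = job_hash([1, 2, 3], 10)
other_level = job_hash([1, 2, 3], 23)
```

A stable identifier like this lets two jobs with the same variable set and precision level be recognised as equivalent.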

job_status()[source]

Retrieves the current status of the job based on its ID.

Returns:

The current status of the job (e.g., “COMPLETED”, “PENDING”).

Return type:

str

property remote_status

Retrieves the remote status of the job.

Returns:

Current job status.

Return type:

str

run_job(force: bool = False)[source]

Submits the job to the remote scheduler if it has not been submitted or if forced.

Parameters:

force (bool) – If True, forces the submission of the job even if it has already been submitted.

Returns:

The status of the job after attempting to run.

Return type:

str

Raises:

ExceptionNotManaged – If the job status is unknown.

property status

Retrieves the current status of the job.

Returns:

Current job status.

Return type:

str

submit_job(check_low=True)[source]

Submits the job to the scheduler and returns the job ID.

Parameters:

check_low (bool) – If True, checks if the job is queued and handles any low-level errors.

Returns:

The job ID assigned by the scheduler.

Return type:

str

update_parameters()[source]

Updates the parameters used for remote job submission, including paths and job-related attributes.

property variable_set

Returns the set of variable IDs, including forced IDs if provided.

Returns:

List of variable IDs, including forced IDs.

Return type:

list

variables()[source]

Retrieves the variables corresponding to the variable IDs in the variable set.

Returns:

List of variables corresponding to the IDs in the variable set.

Return type:

list