# Python class that controls the submission and status tracking of a remote job, plus a few auxiliary capabilities.
class RemoteManager:
    """
    Control the submission and status tracking of one remote analysis job.

    A job is the set of variable IDs in ``variable_set`` run at
    ``reduced_precision_level``. It is identified by ``hash``, which also names its
    remote run directory, job script and namelist. The scheduler is driven through
    Slurm commands (``squeue``, ``sacct``, ``sbatch``) executed by ``communicator``.
    """

    # Values accepted by the ``status`` setter. Meaning of the states:
    # PENDING: the job has been created and is ready to be launched.
    #     A PENDING job can only have SUSPENDED ancestors and SUCCESS or FAILED descendants.
    # RUNNING: the job is running.
    #     A RUNNING job can only have SUSPENDED ancestors and SUCCESS or FAILED descendants.
    # SUCCESS: the job completed with successful results, which have been asserted.
    #     A SUCCESS job cannot have both descendants FAILED.
    # FAILED: the job completed with unsuccessful results and cannot be split
    #     (it already was, or it is a single-variable set).
    # SUSPENDED: the job completed with unsuccessful results, has been split, and
    #     depends on the results of its descendant jobs.
    #     Ancestors of a SUSPENDED job must be SUSPENDED, and at least one direct
    #     descendant should be different from FAILED or COMPLETED.
    _POSSIBLE_STATUS = ("PENDING", "RUNNING", "ASSERTION_PENDING", "SUCCESS", "FAILED", "SUSPENDED")

    # Scratch directory retried by ``evaluate_simulation`` when evaluating the
    # results under the configured scratch raises IOError.
    # NOTE(review): user-specific hard-coded path; consider making it configurable.
    FALLBACK_REMOTE_SCRATCH = '/gpfs/scratch/bsc32/bsc32402/a5x6/'

    def __init__(self, id_reduced_precision: list,
                 forced_ids: list,
                 reduced_precision_level: int,
                 analysis_variables: list,
                 communicator: 'SSH',
                 vault: 'Vault',
                 template: str,
                 result_filename: str,
                 counter: 'Counter', local_folder: str, analysis_status: dict):
        """
        Initialize the manager, derive the remote paths and build the job script.

        Parameters:
            id_reduced_precision (list): IDs of the variables whose precision is reduced.
            forced_ids (list): Extra variable IDs forced into the set, if any (copied).
            reduced_precision_level (int): Precision written for the selected variables.
            analysis_variables (list): All variables written to the namelist.
            communicator (SSH): Remote-operation object; must provide ``remote_scratch``,
                ``execute``, ``write_file`` and ``sftp``.
            vault (Vault): Vault containing variable data.
            template (str): Path to the local job-script template.
            result_filename (str): Filename for the results.
            counter (Counter): Gives the incremental job ID via ``up()``; if falsy the ID is 0.
            local_folder (str): Local directory for storing job data.
            analysis_status (dict): Maps job hash -> [job_id, status, result, retrials].
        """
        self.vault = vault
        self.id_reduced_precision = id_reduced_precision
        self.forced_ids = [] if not forced_ids else forced_ids[:]
        self.analysis_variables = analysis_variables
        self.incremental_id = counter.up() if counter else 0
        self.communicator = communicator
        self.reduced_precision_level = reduced_precision_level
        self.job_name = "%i-%s" % (self.incremental_id, self.hash)
        self.num_proc = 48
        self.remote_scratch = self.communicator.remote_scratch
        self.remote_logdir = "%s/analysis/LOGS" % self.remote_scratch
        self.remote_namelist_dir = "%s/Namelists" % self.remote_logdir
        self.remote_rundir = "%s/analysis/%s" % (self.remote_scratch, self.hash)
        self.job_id_filename = "%s/job_id.txt" % self.remote_rundir
        self.local_folder = local_folder
        self.remote_jobscript_path = "%s/%s.cmd" % (self.remote_logdir, self.hash)
        self.remote_namelist = "%s/%s" % (self.remote_namelist_dir, self.hash)
        self.template = template
        self.result_filename = result_filename
        # Needs template, remote paths and job_name, all assigned above.
        self.jobscript = self.generate_jobscript()
        self.elapsed_time = None
        self.job_id = None
        self.retrials = 0
        self._result = None
        self.analysis_status = analysis_status
        # The default status is PENDING
        self._status = "PENDING"

    @property
    def variable_set(self):
        """
        Return the job's variable IDs: ``id_reduced_precision`` followed by
        ``forced_ids`` when there are any.

        Without forced IDs the returned list *is* ``self.id_reduced_precision``
        (not a copy).
        """
        if self.forced_ids:
            return self.id_reduced_precision + self.forced_ids
        return self.id_reduced_precision

    @property
    def hash(self):
        """
        Return an md5 hex digest identifying this job.

        The digest covers every ID of ``variable_set`` in ascending order, each
        suffixed with ``_<reduced_precision_level>``. The result therefore does not
        depend on the order in which the IDs were given.

        Returns:
            str: The hexadecimal digest.
        """
        import hashlib
        # sorted() works on a copy, so the caller-owned ID lists are never reordered,
        # and the concatenation with forced IDs is sorted as well.
        string_to_hash = "".join(
            "%s_%s" % (i_id, self.reduced_precision_level) for i_id in sorted(self.variable_set))
        return hashlib.md5(string_to_hash.encode()).hexdigest()

    def variables(self):
        """
        Return the vault variables for the distinct IDs in ``variable_set``.

        Returns:
            list: One ``vault.get_variable_by_id`` result per distinct ID.
        """
        return [self.vault.get_variable_by_id(_x) for _x in set(self.variable_set)]

    @property
    def remote_status(self):
        """
        Return the job status as computed by ``job_status`` (may run remote commands).

        Returns:
            str: Current job status.
        """
        return self.job_status()

    @property
    def status(self):
        """
        Return the locally tracked job status (one of ``_POSSIBLE_STATUS``).

        Returns:
            str: Current job status.
        """
        return self._status

    @status.setter
    def status(self, new_value: str):
        """
        Set the locally tracked job status.

        Parameters:
            new_value (str): The new status; must be one of ``_POSSIBLE_STATUS``.
        Raises:
            AssertionError: If ``new_value`` is not a valid job status.
        """
        if new_value not in self._POSSIBLE_STATUS:
            raise AssertionError("Unknown status")
        self._status = new_value

    def generate_jobscript(self):
        """
        Build the job script by filling the placeholders of the template file.

        Replaces %REMOTE_PATH%, %RUNHASH%, %JOBNAME%, %LOGDIR% and %NAMELIST%.

        Returns:
            str: The job script text ready for submission to the scheduler.
        """
        with open(self.template) as f:
            job = f.read()
        job = job.replace("%REMOTE_PATH%", self.remote_scratch)
        job = job.replace("%RUNHASH%", self.hash)
        job = job.replace("%JOBNAME%", self.job_name)
        job = job.replace("%LOGDIR%", self.remote_logdir)
        job = job.replace("%NAMELIST%", self.remote_namelist)
        return job

    def check_submitted(self):
        """
        Check whether a job with this job's hash is already in the Slurm queue.

        Every row of ``squeue --format="%i %j"`` after the header is scanned. Job
        names without '-' are ignored. A name matches when the part after its first
        '-' equals that of ``self.job_name`` (the hash); the leading incremental ID
        may differ from one run to the next. On a match ``self.job_id`` is set.

        Returns:
            str: "PENDING" if the job is found in the queue, otherwise "TO_SUBMIT".
        """
        command = 'squeue --format="%i %j"'
        _stdin, stdout, _stderr = self.communicator.execute(command)
        output = stdout.read().decode()
        own_hash = self.job_name.split("-")[1]
        # The first line is the squeue header.
        for line in output.split("\n")[1:]:
            # Split only once: job names may contain spaces.
            fields = line.strip().split(None, 1)
            if len(fields) < 2:
                continue
            job_id, job_name = fields
            # Take into account just the jobs related to the analysis
            if '-' not in job_name:
                continue
            if job_name.split("-")[1] == own_hash:
                self.job_id = job_id
                return "PENDING"
        return "TO_SUBMIT"

    def check_running(self):
        """
        Query ``sacct`` for the state of ``self.job_id``.

        Returns:
            str: The first word of the first data row (e.g. "RUNNING", "COMPLETED",
            or "CANCELLED+"). If sacct printed only its header, returns "COMPLETED".
            For any other output without a data row, returns "PENDING".
        """
        # TODO: To use this tool with other schedulers, it might be convenient to define it somewhere else.
        scheduler_status_command = 'sacct --format="State,Elapsed"'
        command = '%s --job %s --user=$USER' % (scheduler_status_command, self.job_id)
        _stdin, stdout, _stderr = self.communicator.execute(command)
        output = stdout.read().decode()
        # Line 0 is the column header, line 1 the dashed separator, line 2 the job row.
        try:
            return output.split("\n")[2].split()[0]
        except IndexError:
            # Workaround for jobs run by other users on MN4 (sacct on MN4 won't allow
            # reading other users' job status): only the header is printed. Tokens are
            # compared instead of exact text so that column padding does not matter.
            if output.split() == ["State", "Elapsed", "----------", "----------"]:
                return "COMPLETED"
            return "PENDING"

    def generate_namelist(self):
        """
        Build the precision namelist for all analysis variables.

        Variables are listed by ascending ID. Variables in ``variable_set`` get
        ``reduced_precision_level``; all others keep their own ``precision``. The
        ``analysis_variables`` list itself is not reordered.

        Returns:
            str: The namelist text.
        """
        line_text = "emulator_variable_precisions(%d) = %d\t! Variable:%24s\tRoutine:%22s\tModule:%16s\n"
        namelist = ["! namelist variable precisions\n&precisions\n"]
        for v in sorted(self.analysis_variables, key=lambda x: x.id):
            v_precision = self.reduced_precision_level if v.id in self.variable_set else v.precision
            namelist.append(
                line_text % (v.id, v_precision, v.name, v.procedure.name, v.procedure.module.name))
        namelist.append("/\n")
        return "".join(namelist)

    def run_job(self, force: bool = False):
        """
        Submit the job if it is not yet submitted (or if forced), else report its state.

        Parameters:
            force (bool): Submit even if the job was already submitted.
        Returns:
            str: The scheduler state for "COMPLETED", "TIMEOUT", "FAILED", "NODE_FAIL"
            and "CANCELLED+". Returns "RUNNING" when the job has just been submitted,
            is running, or is queued.
        Raises:
            ExceptionNotManaged: If the remote job status is not recognised.
        """
        job_status = self.remote_status
        if force or job_status == "TO_SUBMIT":
            print("Submitting job with incremental id %s " % self.incremental_id)
            self.job_id = self.submit_job()
            print("Job %s successfully submitted with job_id %s" % (self.incremental_id, self.job_id))
            return "RUNNING"
        if job_status in ["COMPLETED", "TIMEOUT", "FAILED", "NODE_FAIL", "CANCELLED+"]:
            return job_status
        if job_status == "RUNNING":
            print("Job %s with job_id %s is running" % (self.incremental_id, self.job_id))
            return "RUNNING"
        if job_status == "PENDING":
            print("Job %s was already in queue with job_id %s" % (self.incremental_id, self.job_id))
            return "RUNNING"
        import AutoRPE.UtilsRPE.Error as Error
        raise Error.ExceptionNotManaged("Unknown job status")

    def submit_job(self, check_low=True):
        """
        Write the namelist and job script remotely, submit with ``sbatch``, return the job ID.

        ``self.job_id`` is set to the new ID.

        Parameters:
            check_low (bool): If True, poll every 5 s until ``check_running`` stops
                reporting "COMPLETED", i.e. until sacct knows the new job.
        Returns:
            str: The job ID assigned by the scheduler.
        Raises:
            RuntimeError: If the job ID cannot be found in the sbatch output.
        """
        import re
        import time
        self.communicator.write_file(self.generate_namelist(), self.remote_namelist)
        self.communicator.write_file(self.jobscript, self.remote_jobscript_path)
        # TODO: To use this tool with other schedulers, it might be convenient to define it somewhere else.
        scheduler_submit_command = "sbatch"
        command = "%s %s" % (scheduler_submit_command, self.remote_jobscript_path)
        _stdin, stdout, _stderr = self.communicator.execute(command)
        output = stdout.read().decode()
        match = re.search(r"Submitted batch job +([0-9]+)", output)
        if match is None:
            raise RuntimeError("Could not read the job id from the %s output: %r"
                               % (scheduler_submit_command, output))
        job_id = match.group(1)
        # check_running() queries self.job_id, so it must hold the NEW id before polling.
        self.job_id = job_id
        if check_low:
            # After the MN4 update, jobs failed due to the slowness of SLURM: wait while
            # sacct does not know the job yet (check_running reports header-only output
            # as "COMPLETED").
            # NOTE(review): a job that genuinely completes within the first poll would
            # keep this loop spinning.
            while self.check_running() == "COMPLETED":
                time.sleep(5)
        return job_id

    def job_status(self):
        """
        Return the current job status.

        Refreshes the hash-dependent parameters first. If ``analysis_status`` has an
        entry for this hash, ``job_id`` and ``retrials`` are loaded from it, and a
        stored "COMPLETED" is returned directly. Otherwise the queue is checked when
        no job ID is known, and sacct is checked when one is.

        Returns:
            str: "COMPLETED", a ``check_submitted`` result or a ``check_running`` result.
        """
        self.update_parameters()
        try:
            self.job_id, job_status, _, self.retrials = self.analysis_status[self.hash]
        except KeyError:
            pass
        else:
            if job_status == "COMPLETED":
                return job_status
        if self.job_id is None:
            return self.check_submitted()
        return self.check_running()

    def evaluate_simulation(self, accuracy_test: object):
        """
        Evaluate the run directory with ``accuracy_test.evaluate_success``.

        On IOError, ``remote_scratch`` and ``remote_rundir`` are switched to
        ``FALLBACK_REMOTE_SCRATCH`` and the evaluation is retried once.

        Parameters:
            accuracy_test (object): Object providing ``evaluate_success(remote_rundir=...)``.
        Returns:
            The value returned by ``evaluate_success``.
        """
        try:
            return accuracy_test.evaluate_success(remote_rundir=self.remote_rundir)
        except IOError:
            self.remote_scratch = self.FALLBACK_REMOTE_SCRATCH
            self.remote_rundir = "%s/analysis/%s" % (self.remote_scratch, self.hash)
            return accuracy_test.evaluate_success(remote_rundir=self.remote_rundir)

    def get_result(self, accuracy_test: object):
        """
        Return the simulation result of this job.

        A job with ``incremental_id > 1000`` whose ``parent`` has
        ``incremental_id == 1`` is evaluated directly with ``accuracy_test``.
        Otherwise the result cached in ``analysis_status`` is used. If there is no
        cached result, ``<job_id>_RESULT.txt`` is read from the remote run directory
        and cached. A result of "UNKNOWN" is re-evaluated with ``accuracy_test``.

        Parameters:
            accuracy_test (object): The accuracy test used to evaluate the simulation.
        Returns:
            str: The result text. If the result file could not be read, a string
            starting with "ERROR:".
        """
        # NOTE(review): ``parent`` is never assigned in this class and is presumably
        # attached by the caller. Jobs without it skip this shortcut instead of
        # raising AttributeError.
        parent = getattr(self, "parent", None)
        if self.incremental_id > 1000 and parent is not None and parent.incremental_id == 1:
            return self.evaluate_simulation(accuracy_test)
        try:
            _, _, job_result, _ = self.analysis_status[self.hash]
        except KeyError:
            job_result = self._read_remote_result()
        # If the simulation was running when the analysis status dict was created
        if job_result == "UNKNOWN":
            job_result = self.evaluate_simulation(accuracy_test)
        return job_result

    def _read_remote_result(self):
        """
        Read ``<job_id>_RESULT.txt`` from the run directory and cache it in ``analysis_status``.

        Failures are reported as an "ERROR: ..." string rather than raised.
        """
        filename_to_evaluate = "%s_RESULT.txt" % self.job_id
        try:
            with self.communicator.sftp.open(self.remote_rundir + "/" + filename_to_evaluate, "r") as result_file:
                job_result = result_file.read().decode()
            # Add the result to the dictionary for future queries
            self.analysis_status[self.hash] = [self.job_id, self.status, job_result, self.retrials]
        except IOError as e:
            # File not found, permissions, or other IO errors
            job_result = "ERROR: Could not read result file - " + str(e)
        except Exception as e:
            # Deliberate best-effort: any other failure is reported as a result string.
            job_result = "ERROR: An unexpected error occurred - " + str(e)
        return job_result

    def update_parameters(self):
        """
        Rebuild the hash-dependent remote paths, job name and job script.

        Nothing changes unless the run directory derived from the current
        ``remote_scratch`` and ``hash`` differs from ``remote_rundir``. If it
        differs, the paths and job name are rebuilt, the job script is regenerated,
        and ``job_id`` is reset to None.
        """
        job_hash = self.hash
        rundir = "%s/analysis/%s" % (self.remote_scratch, job_hash)
        if rundir == self.remote_rundir:
            return
        self.remote_rundir = rundir
        self.job_name = "%i-%s" % (self.incremental_id, job_hash)
        self.remote_jobscript_path = "%s/%s.cmd" % (self.remote_logdir, job_hash)
        self.remote_namelist = "%s/%s" % (self.remote_namelist_dir, job_hash)
        self.jobscript = self.generate_jobscript()
        self.job_id_filename = "%s/job_id.txt" % self.remote_rundir
        # Reset job_id: it belonged to the previous run directory.
        self.job_id = None