from __future__ import annotations
from tqdm import tqdm
import AutoRPE.UtilsRPE.BasicFunctions as BasicFunctions
import AutoRPE.UtilsRPE.SourceManager as SourceManager
import AutoRPE.UtilsRPE.MaskCreator as MaskCreator
import AutoRPE.UtilsRPE.Getter as Getter
import AutoRPE.UtilsRPE.NatureDeterminer as NatureDeterminer
import AutoRPE.UtilsRPE.CallManager as CallManager
import AutoRPE.UtilsRPE.Inserter as Inserter
import AutoRPE.UtilsRPE.VariablePrecision as VariablePrecision
import AutoRPE.UtilsRPE.Error as Error
import AutoRPE.UtilsRPE.RegexPattern as RegexPattern
import AutoRPE.UtilsRPE.Classes.Vault as Vault
import AutoRPE.UtilsRPE.Classes.Module as Module
import AutoRPE.UtilsRPE.Classes.Procedure as Procedure
from AutoRPE.UtilsRPE.postproc.Formatter import format_fortran_content
from AutoRPE.UtilsRPE.logger import clean_line, logger
from AutoRPE.UtilsRPE.Classes.Subprogram import SubRoutine
import re
from pathlib import Path
# Hardcoded list, this should be solved
list_of_intrinsics_2fix = ["max", "maxloc", "maxval", "min", "minloc", "minval", "epsilon",
"isnan", "count", "abs", "mod", "reshape",
"atanh", "sqrt", "log",
"sum",
"erfc",
"merge",
"dsqrt", "cmplx", "transfer",
"matmul", "dot_product",
"huge"]
[docs]
def Implement_RPE(path_to_input_sources: str, path_to_output_sources: str, blacklist: list[str]) -> ImplementRPE.vault:
"""
Implements the Reduced Precision Emulator (RPE) for the source code located in `path_to_input_sources`
and stores the modified sources in `path_to_output_sources`. Additionally, a database (referred to as
vault) is created to store detailed information about the source code, including functions, subroutines,
modules, variables and their precisions, and all corresponding dependencies.
Parameters:
path_to_input_sources: The directory of the original pre-processed sources.
path_to_output_sources: The directory where modified sources will be saved.
blacklist: list of files that should not be modified by the tool.
Returns:
Vault: Database with all the information of the code.
"""
# Changes all the real declaration for RPE declaration
implementer = ImplementRPE(path_to_input_sources, path_to_output_sources, blacklist,
extension=".f90")
# Replaces real declaration with RPE declaration and save files to output sources
implementer.replace_real_with_RPE_declaration()
# Fix the problem the changes implied
implementer.fix_sources()
# Insert precision specification into sources
Inserter.add_sbits_to_rpe_variables(implementer.vault)
# Add the read_precisions and rpe_emulator use statement into all files (should be accessible for all modules)
Inserter.insert_use_statement(implementer.vault)
# Save module txt files to disk
implementer.save_files()
# Use fortran formatter with the output files
implementer.format_output_files()
# Create the read_precisions.f90 module
Inserter.create_read_precisions_module(path_to_output_sources, implementer.vault)
return implementer.vault
[docs]
class ImplementRPE:
"""
A class to implement the Reduced Precision Emulator (RPE) for Fortran source code.
This class facilitates modifying Fortran source files to use reduced precision
emulation, making necessary adjustments to variable declarations, function and
subroutine calls, interfaces, and more. It also integrates a vault for managing
metadata about the code, such as variables, subprograms, and modules.
Attributes:
output_path (str): The directory where modified sources will be saved.
blacklist (list): List of filenames or modules to exclude from modification.
vault (Vault): A database-like object containing detailed information about
the code structure and dependencies.
"""
def __init__(self, path_to_input_sources: str, path_to_output_sources: str, blacklist: list[str], extension=".f90"):
self.output_path = path_to_output_sources
self.blacklist = blacklist
# Parse the sources and obtain all info in the output sources
self.vault = Vault.create_database(path_to_input_sources, extension)
[docs]
def save_files(self):
"""
Saves all modified source files to the output directory specified during initialization.
This method iterates through all modules in the vault and writes their updated
content to disk.
"""
for module in self.vault.modules.values():
module.save_file(self.output_path)
[docs]
def replace_real_with_RPE_declaration(self):
"""
Replaces declarations of real variables with RPE variables in the source files.
This method identifies real variable declarations and ensures they are
converted to `type(rpe_var)` where applicable. It also propagates changes
to dependent variables while considering constraints such as module
blacklists and variable usage in external subprograms.
"""
def ban_condition(variable):
if variable.is_parameter:
return True
if variable.is_used_in_external is not None:
# There are external subprograms which have an interface for mixed call
# if variable.is_used_in_external.lower() not in SourceManager.allowed_externals:
if variable.is_used_in_external not in SourceManager.allowed_externals:
return True
# Black list all the variables that belong to black listed modules
if variable.module.filename in self.blacklist:
return True
return False
var_found = 0
# Stores the variables that can't be changed
unchanged_variables_hash = []
for v in self.vault.variables:
if v.type in VariablePrecision.real_id:
if ban_condition(v):
unchanged_variables_hash.append(v.hash_id)
# Now propagate their dependency
for hash_id in unchanged_variables_hash:
var = self.vault.get_variable_from_hash(hash_id)
for additional_var_hash in var.same_as:
if var.module.filename in self.blacklist:
# Do not propagate if procedure has an interface for mixed or ought to have one
if var.procedure.name[-3:] in ['_dp', '_sp'] or var.procedure.name in ['sum3x3_2d',
'sum3x3_3d',
'iom_get_123d',
'set_grid_bounds']:
continue
# This loop could be simplified, but would cause a mem error
if additional_var_hash not in unchanged_variables_hash:
unchanged_variables_hash.append(additional_var_hash)
for module in self.vault.modules.values():
if module.filename in self.blacklist:
continue
# print("\rReplacing real variables %i/%i %20s" % (index + 1, n_of_file, source_file.filename), end="")
# Replace real declarations with type(rpe_var)
# Look at each line for REAL variable declarations and replace it with type(rpe_var
for _index, line in enumerate(module.lines):
_real_with_precision = re.search(RegexPattern.real_with_precision_declaration, line, re.I)
if not _real_with_precision:
# No real declared in line
continue
# Get the variable name
var_name = re.search(r'(::|FUNCTION)\s*\b(\w+)\b', line).group(2)
procedure_name = module.line_info[_index].block.name
# Retrieve the var object
var_obj = self.vault.get_variable_by_info(var_name, procedure_name, module.name)
# Update this function using module.line_info[_index].block.name
# if not BasicFunctions.is_in_reals_blacklist(self.real_blacklist, line, source_file.module_name,
# dt_name):
# Replace original precision if allowed
if var_obj.hash_id not in unchanged_variables_hash:
var_obj.type = 'rpe_var'
var_found += 1
module.lines[_index] = line.replace(_real_with_precision.group(1), "type(rpe_var)")
[docs]
def fix_sources(self):
"""
Applies a series of fixes to the source files to integrate RPE.
This includes adjusting parameter declarations, function and subroutine calls,
read/write statements, interfaces, and pointer assignments to ensure compatibility
with RPE.
"""
n_of_modules = len(self.vault.modules)
# This fixes all modules at a time, execute it just once: Remove indistinguishable interfaces
self.fix_interfaces()
for module in tqdm(self.vault.modules.values(), desc="Fixing modules"):
# Fix all RPE variables declared with assignation
self.fix_parameter_declaration(module)
# Remove keyword PURE from function declarations
self.fix_pure_functions(module)
# Process intrinsic calls
self.fix_intrinsic_functions(module)
# Fix calls to type constructor with RPE components
self.fix_derived_type_constructor(module)
# Process read and write calls
self.fix_read_and_write_calls(module)
# Fix pointer assignations
self.fix_pointer_assignations(module)
self.fix_subprogram_calls()
[docs]
def fix_pure_functions(self, module:Module):
"""
Removes the `pure` attribute from function declarations in a given module.
Parameters:
module (Module): The module object containing the lines of source code to modify.
"""
for _index, line in enumerate(module.lines):
if re.search(RegexPattern.pure_function_declaration, line, flags=re.IGNORECASE):
module.lines[_index] = re.sub(RegexPattern.pure_function_declaration, "function ", line,
flags=re.IGNORECASE)
[docs]
def fix_parameter_declaration(self, module:Module):
"""
Adjusts parameter declarations in a module to comply with RPE requirements.
This includes converting parameter declarations with assignments to use
the RPE constructor where necessary.
Parameters:
module (Module): The module object containing the lines of source code to modify.
"""
# If there's an assignation whe have to convert everything to an rpe constructor call
for _index, line in enumerate(module.lines):
if re.search(RegexPattern.rpe_parameter_declaration, line, flags=re.I):
arguments = line.split("::")[1]
assignation = arguments.split("=")[1].strip()
content_of_declaration = self._find_kind_of_parameter_declaration(assignation,
module.line_info[_index].block)
# The argument need to be converted to real
if not content_of_declaration == "is_a_number":
# Take the real part of the rpe expression, to be passed to the RPE constructor
assignation_real_part = self.get_assignation_real_part(module.line_info[_index].block, assignation,
line,
content_of_declaration)
# Substituting the assignation with its real part in line
line = line.replace(assignation, assignation_real_part)
assignation = assignation_real_part
# Add the rpe constructor to assignation in line
module.lines[_index] = Inserter.add_rpe_type_constructor(assignation, line)
# There are some variables that are assigned values via the 'data' statement, for which we need a
# different pattern:
elif re.search(RegexPattern.data_statement_assignation, line, flags=re.I):
arguments = re.search(RegexPattern.data_statement_assignation, line, flags=re.I).group(1)
# assignation = line.split(arguments)[1].strip() # This is commented because we do not need it for now
# Assignation via `data`` declaration only accepts numbers (no operations, no using other variables),
# so we can assume that the assignation part will be a number. We only need to check if the variable
# is a `rpe_var` type variable or not. First, we get the index of this var in vault.variables:
indx = next((indx_ for indx_, var_ in enumerate(self.vault.variables) if var_.name == arguments), None)
if indx is None:
raise Exception(f"Could not found var {arguments}.")
# Then, by accessing to vault.variables[index] we get the type of variable:
if self.vault.variables[indx].type == 'rpe_var':
module.lines[_index] = f"{line.split(arguments)[0]}{arguments}%val{line.split(arguments)[1]}"
[docs]
def get_assignation_real_part(self, procedure: Procedure, argument: str, type_of_exception: str):
"""
Extracts the real part of an RPE variable or expression during an assignment.
Parameters:
procedure: The procedure containing the assignment.
argument: The argument being assigned.
line: The source code line containing the assignment.
type_of_exception: The type of special handling needed for the assignment
(e.g., intrinsic, operation, etc.).
Returns:
str: The modified assignment string with the real part extracted.
"""
# The assignation is made with a simple RPE variable
if type_of_exception == "rpe_var":
# Take the real part of the RPE var
return argument + "%val"
elif type_of_exception == "has_operation":
# add %val member to rpe variable among the operands
return self.get_operation_real_part(argument, procedure)
elif type_of_exception == 'has_intrinsics':
# Check if the intrinsic argument is an rpe var, and add the %val where needed
return self.get_intrinsic_real_part(argument, procedure)
else:
raise Error.ExceptionNotManaged("This %s is not managed yet" % type_of_exception)
[docs]
def fix_interfaces(self):
"""
Fixes indistinguishable interfaces by resolving conflicts caused by
substituting `sp/dp` real types with `rpe_var`.
This ensures that module procedures are properly updated to match
the expected interfaces.
"""
all_interfaces = self.vault.interfaces_dictionary
interfaces_to_fix = [interface for interface in all_interfaces
if len(all_interfaces[interface].key) != len(all_interfaces[interface].subprogram)]
if interfaces_to_fix:
for interface in interfaces_to_fix:
interface = all_interfaces[interface]
interface_to_remove = [n.name for n in interface.subprogram if n.name not in interface.key.values()]
for interface_name in interface_to_remove:
for index, line in enumerate(interface.module.lines):
# Matches the name of the interface
match = re.search(r'MODULE PROCEDURE .*(\b%s\b)' % interface_name, line)
if match:
# Remove the offending module procedure, keeps the rest
interface.module.lines[index] = "MODULE PROCEDURE " + \
",".join([i.strip() for i in
line.split("MODULE PROCEDURE")[1].split(",")
if not i.strip() == interface_name])
if interface.module.lines[index].strip() == "MODULE PROCEDURE":
interface.module.lines[index] = ""
break
if not match:
raise Error.ExceptionNotManaged("An indistinguishable interface was not fixed!")
[docs]
def fix_derived_type_constructor(self, module: Module):
"""
Adjusts calls to derived type constructors in the given module to be compatible with RPE.
This method ensures that arguments passed to derived type constructors
match the expected type, performing conversions if necessary.
Parameters:
module: The module object containing the lines of source code to modify.
"""
for line_index, line in enumerate(module.lines):
called_derived_type = self._call_to_derived_type_constructor(line, module.module_name)
if called_derived_type and not called_derived_type.is_external:
# Find call to type constructor
constructor_call = BasicFunctions.close_brackets(
line[re.search(r'\b%s\b' % called_derived_type.name, line, re.I).start():])
called_arguments = CallManager.find_call_arguments(constructor_call)
if not called_arguments:
continue
fixed_constructor_call = constructor_call
for argument_index, argument in enumerate(called_arguments):
argument_type = Getter.get_type_of_contents(argument, module.line_info[line_index].block,
self.vault)
dummy_argument = called_derived_type.get_member_by_position(argument_index)
if not dummy_argument:
raise AssertionError("Variable not found in position: %i" % argument_index)
if VariablePrecision.type_are_equivalent(argument_type, dummy_argument.type):
continue
elif argument_type in VariablePrecision.real_id and dummy_argument.type == "rpe_var":
fixed_constructor_call = Inserter.add_rpe_to_argument(argument, fixed_constructor_call)
else:
raise AssertionError("Types do not coincide and the case is not considered %s %s" % (
argument_type, dummy_argument.type))
module.lines[line_index] = line.replace(constructor_call, fixed_constructor_call)
def _fix_read_calls(self, module):
for line_index, line in enumerate(module.lines):
matches = re.search(RegexPattern.call_to_read, BasicFunctions.remove_if_condition(line), re.I)
if matches:
read_call = BasicFunctions.close_brackets(matches.groups()[0])
# Remove the read call
read_arguments = BasicFunctions.remove_if_condition(line)[matches.start(1):].replace(read_call, "")
# Continue just if arguments follow the call
if not read_arguments:
continue
replacement = read_arguments.replace(" ", "")
# Add parenthesis because the read() doesn't use them
arguments = CallManager.find_call_arguments("(" + replacement + ")")
# Further separate arguments. Used in case of loops READ(inum,*) ( tpifl (jfl), jfl=1, jpnrstflo)
list_arguments = []
index_to_remove = []
for argument_index, argument in enumerate(arguments):
if BasicFunctions.is_all_inside_parenthesis(argument):
index_to_remove.append(argument_index)
argument = CallManager.find_call_arguments(argument)
list_arguments.extend(argument)
# Remove the original argument
for i in index_to_remove[::-1]:
del arguments[i]
# Add the split argument
arguments.extend([i for i in list_arguments])
# Check the argument type and replace just if is an rpe
for argument_index, argument in enumerate(arguments):
argument_type = Getter.get_type_of_contents(argument, procedure=module.line_info[line_index].block,
vault=self.vault)
if argument_type == "rpe_var":
replacement = Inserter.replace_variable_exact_match(argument, argument + "%val", replacement)
# Rebuilt the line
# self.lines[line_index] = read_call + " " + read_arguments
module.lines[line_index] = line.replace(read_arguments, replacement)
def _fix_write_calls(self, module: Module):
for l_index, line in enumerate(module.lines):
write_statement = re.search(RegexPattern.call_to_function_name % "write", line, re.I)
if write_statement:
wc = BasicFunctions.close_brackets(line[write_statement.start():])
# Separate write statement from arguments
split_index = line.index(wc) + len(wc)
write_statement, arguments = line[:split_index], line[split_index:]
if not arguments.strip():
# There is no argument to fix
continue
# Transform the string in a list of arguments
arguments = CallManager.find_call_arguments("(" + arguments + ")")
for argument_index, argument in enumerate(arguments):
# Prepare array for possible match
hardcoded_array = []
if NatureDeterminer.is_hardcoded_array(argument, hardcoded_array):
subst = False
for el_index, el in enumerate(hardcoded_array):
argument_type = Getter.get_type_of_contents(el, module.line_info[l_index].block, self.vault)
if argument_type == "rpe_var":
subst = True
hardcoded_array[el_index] = "real( %s )" % el
if subst:
arguments[argument_index] = "(/ " + ", ".join(hardcoded_array) + " /)"
elif re.match(r"\(", argument):
# For calls like WRITE(numout,9402) (ilci(ji,jj),ilcj(ji,jj),ji=il1,il2)
array_elements = CallManager.find_call_arguments(argument)
for el_index, el in enumerate(array_elements):
argument_type = Getter.get_type_of_contents(el, module.line_info[l_index].block, self.vault)
if argument_type == "rpe_var":
array_elements[el_index] = "real( %s )" % el
arguments[argument_index] = "( " + ", ".join(array_elements) + " )"
else:
argument_type = Getter.get_type_of_contents(argument, module.line_info[l_index].block,
self.vault)
if argument_type == "rpe_var":
arguments[argument_index] = "real( %s )" % argument
module.lines[l_index] = write_statement + " " + ", ".join(arguments)
[docs]
def fix_read_and_write_calls(self, module: Module):
"""
Adjusts read and write calls in a module to handle RPE variables correctly.
Parameters:
module (Module): The module object containing the lines of source code to modify.
"""
self._fix_read_calls(module)
self._fix_write_calls(module)
[docs]
def fix_pointer_assignations(self, module: Module):
"""
Adjusts pointer assignments in the given module to be compatible with RPE variables.
This method ensures that assignments between pointers and targets of
different types (e.g., `rpe_var` and standard types) are handled correctly.
Parameters:
module (Module): The module object containing the lines of source code to modify.
"""
for line_index, line in enumerate(module.lines):
#TODO: The new timing function uses some pointing structures that are difficult to handle. For now we will just ignore it.
# However this error clause is probably too broad.
try:
assignation = Getter.get_pointer_assignment(line, self.vault, module.line_info[line_index].block)
except Error.VariableNotFound as err:
logger.error(err)
assignation = None
if assignation:
ptr, target = assignation[0]
if 'rpe_var' in [ptr.type, target.type] and ptr.type != target.type:
module.lines[line_index] = line.replace(target.name, "%s%%val" % target.name)
raise Error.ExceptionNotManaged("Why is this happening? Are dependency propagated correctly?")
[docs]
def fix_intrinsic_functions(self, module: Module):
"""
Adjusts calls to intrinsic functions in the given module to handle RPE variables.
This involves identifying intrinsic function calls, determining whether
their arguments are RPE variables, and applying the appropriate transformations
to ensure compatibility.
Parameters:
module (Module): The module object containing the lines of source code to modify.
"""
# Fix all the intrinsics function that are not overloaded in the RPE library
for _index, _line in enumerate(module.lines):
# No need to fix intrinsics in declaration, they have already been fixed
if re.search(RegexPattern.variable_declaration, _line, re.I):
continue
# Cycle through the matches, that can be more than one in a line
replacements = []
pieces_to_replace = []
# Mask literals to avoid false positive
mask_line = MaskCreator.mask_literal(_line)
matches_start_list = [i.start(1) for i in re.finditer(RegexPattern.call_to_function, mask_line, re.I)
if i.group(1).lower() in list_of_intrinsics_2fix]
for start in matches_start_list[::-1]:
# Save the piece that need change
pieces_to_replace.append(_line[start:].strip())
# Current index
i_match = len(pieces_to_replace) - 1
# Get intrinsic call
pieces_to_replace[i_match] = BasicFunctions.close_brackets(pieces_to_replace[i_match])
replacements.append(pieces_to_replace[i_match])
_line = _line[:start] + _line[start:].replace(pieces_to_replace[i_match],
"ALREADY_FIXED_" + str(i_match))
# Split the calls argument
arguments = CallManager.find_call_arguments(pieces_to_replace[i_match])
for arg in arguments:
if arg.count("ALREADY_FIXED_"):
if re.match(r"ALREADY_FIXED_\d+", arg):
continue
split_elements = []
if NatureDeterminer.has_operations(arg, split_elements):
# Split elements, removing the arguments already fixed, adding the others to the loop
arguments.extend([el for el in split_elements if not el.count("ALREADY_FIXED_")])
continue
if NatureDeterminer.is_logical_comparison(arg):
# For example: mask = iszij1 == ALREADY_FIXED_0
continue
try:
# Case in which an intrinsic is used as indexing of an array
Getter.get_variable(arg, module.line_info[_index].block, self.vault)
except Error.VariableNotFound:
continue
# raise Error.ExceptionNotManaged("Check this intrinsic please")
argument_type = Getter.get_type_of_contents(arg, module.line_info[_index].block, self.vault)
if argument_type != 'rpe_var':
continue
if re.match(RegexPattern.variable, arg):
replacements[i_match] = Inserter.add_val_to_argument(arg, replacements[i_match])
else:
replacements[i_match] = Inserter.add_real_to_argument(arg, replacements[i_match],
VariablePrecision.real_id["wp"])
for i_match, (piece_to_replace, replacement) in reversed(
list(enumerate(zip(pieces_to_replace, replacements)))):
# Starting from the end, and substituting just first match we avoid re-substituting matches
_line = _line.replace("ALREADY_FIXED_" + str(i_match), replacement, 1)
module.lines[_index] = _line
def _fix_external_call(self, external_call, line):
temporal_call = BasicFunctions.remove_if_condition(line)
called_arguments = external_call.arguments
# In case the procedure is not found, consider it external
for argument in called_arguments:
if argument.type == "rpe_var":
# If it is not a single variable a cast is safe, it must be intent IN
if NatureDeterminer.has_operations(argument.name):
temporal_call = Inserter.add_real_to_argument(argument.name, temporal_call,
VariablePrecision.real_id["dp"])
else:
# Here we add the real member of the RPE type
temporal_call = Inserter.add_rpe_real_member_to_argument(argument.name, temporal_call)
return temporal_call.strip()
def _fix_arguments(self, subprogram_call):
original_line = subprogram_call.line_info.line
called_arguments = CallManager.find_call_arguments(subprogram_call.call)
if not subprogram_call.subprogram.is_external:
for index, (called_arg, arg, dummy_a) in enumerate(zip(called_arguments, subprogram_call.arguments, subprogram_call.dummy_arguments)):
# This argument is not a real, don't need to check
if arg.type not in list(VariablePrecision.real_id.values()) + ['rpe_var']:
continue
# This argument is coherent
if VariablePrecision.type_are_equivalent(arg.type, dummy_a.type):
continue
# Inconsistency between used argument and what the call expects
if dummy_a.intent == "in":
if dummy_a.type == "rpe_var" and arg.type in VariablePrecision.real_id:
called_arguments[index] = Inserter.add_rpe_to_argument(arg.name, called_arg, dummy_a.is_pointer)
elif dummy_a.type in VariablePrecision.real_id and arg.type == "rpe_var":
if dummy_a.is_pointer and arg.variable.is_pointer:
print('call %s', subprogram_call.call)
# raise Error.ExceptionNotManaged("This dependency should have been be considered")
# %val member is dp by def, so we can use the val member only if the dummy is dp
if arg.variable is not None and not dummy_a.type == VariablePrecision.real_id['sp']:
called_arguments[index] = Inserter.add_val_to_argument(arg.name, called_arg)
# In all other cases, a cast is the only solution
else:
called_arguments[index] = Inserter.add_real_to_argument(arg.name, called_arguments[index],
VariablePrecision.real_id['dp'])
# The value of the argument can be modified by the call, if its an rpe pass the val
else:
if arg.type == "rpe_var" and dummy_a.type in VariablePrecision.real_id:
called_arguments[index] = Inserter.add_val_to_argument(arg.name, called_arg)
else:
raise Error.ExceptionNotManaged("Here we should use temporal variables")
# Create the new call
# Initialize fixed_call_name with the default value
fixed_call_name = subprogram_call.name
# TODO: The following section was only needed to fix a bug with a subprogram_call with a function glob_sum
# which was replacing the interface call with the call to the matching function instead.
# Handle the edge case in which the call used the interface name and it differs from the subprogram_call
# Check if the subprogram has an interface and if the interface name is different from the subprogram call name
has_interface = subprogram_call.subprogram.interface is not None
interface_name_differs = has_interface and subprogram_call.subprogram.interface.name != subprogram_call.name
if has_interface:
interface_name = subprogram_call.subprogram.interface.name
if interface_name_differs:
# Check if the interface name is present in the original line (case insensitive)
interface_in_line = re.search(r'\b' + re.escape(interface_name) + r'\b', original_line, re.IGNORECASE) is not None
if interface_in_line:
fixed_call_name = interface_name
fixed_call = f"{fixed_call_name}({', '.join(called_arguments)})"
else:
fixed_call = self._fix_external_call(subprogram_call, subprogram_call.call)
if isinstance(subprogram_call.subprogram, SubRoutine): fixed_call = f" {fixed_call}"
# Nothing needed to be fixed
if subprogram_call.call == fixed_call:
return
module = subprogram_call.block.module
index = subprogram_call.line_info.line_number
module.lines[index] = original_line.replace(subprogram_call.call, fixed_call)
[docs]
def fix_subprogram_calls(self):
"""
Ensures consistency between subprogram calls and their arguments.
This includes adjusting arguments passed to subprograms based on their
expected types and modifying the calls as needed.
"""
# Check if call and arguments are coherent, fix where not
for subprogram_calls in self.vault.dictionary_of_calls.values():
for call in subprogram_calls:
# Refresh arguments, that may have changed type due to previous fixes
call.update_actual_arguments(self.vault)
self._fix_arguments(call)
def _find_kind_of_parameter_declaration(self, argument, procedure):
"""Return the nature of the parameter declaration. Depending on the type, the fix will be different"""
type_of_exception = None
# These are real, but need some manipulation before adding the RPE constructor
if argument.count(","):
if argument.count("(/") or argument.count("RESHAPE([") or re.search(r"= *\[.*\]", argument):
type_of_exception = "is_a_number"
elif argument.count("rpe_var("):
type_of_exception = "already_fixed"
else:
# Get the type of content
content_type = Getter.get_type_of_contents(argument, procedure, self.vault)
# Check for intrinsic first: they return a real, but their argument need to be fixed if its an RPE
if NatureDeterminer.has_intrinsics(argument):
type_of_exception = 'has_intrinsics'
# If it doesn't have intrinsic and is real or integer, then is a number: just add and RPE constructor
elif content_type in list(VariablePrecision.real_id) + ['integer']:
return "is_a_number"
# Need to split operand, find rpe_variable and take their real part
elif NatureDeterminer.has_operations(argument):
type_of_exception = 'has_operation'
# Just a %val will be added
else:
type_of_exception = "rpe_var"
if not type_of_exception:
print(argument)
pass
# raise Error.ExceptionNotManaged("Unknown type of parameter declaration")
return type_of_exception
[docs]
def get_operation_real_part(self, argument: str, procedure: Procedure):
"""
Extracts the real parts of RPE variables involved in an operation.
This method processes operations containing RPE variables and ensures
that their real parts are correctly referenced.
Parameters:
argument (str): The expression containing the operation.
procedure (Procedure): The procedure where the operation is defined.
Returns:
str: The operation with the real parts of RPE variables substituted.
"""
# Get the elements in operation
split_elements = []
NatureDeterminer.has_operations(argument, split_elements)
argument_real_part = argument
for element in split_elements:
# If is an RPE get the real part
element = element.strip()
if Getter.get_type_of_contents(element, procedure, self.vault) == 'rpe_var':
element_real_part = " %s%%val " % element
# In case of intrinsic we need to fix the argument, because it has to be real
elif NatureDeterminer.has_intrinsics(element):
element_real_part = self.get_intrinsic_real_part(element, procedure)
else:
continue
argument_real_part = Inserter.replace_variable_exact_match(element, element_real_part,
argument_real_part)
return argument_real_part
[docs]
def get_intrinsic_real_part(self, string: str, procedure: Procedure):
"""
Adjusts the arguments of intrinsic function calls to handle RPE variables.
This method ensures that RPE variables used as arguments in intrinsic
functions have their real parts correctly referenced.
Parameters:
string: The intrinsic function call as a string.
procedure: The procedure where the function call is defined.
Returns:
str: The modified function call with the real parts of RPE variables substituted.
"""
# We need to fix ALL intrinsic, because their overload is not available at declaration time
intrinsic_argument = MaskCreator.mask_intrinsics(string)
rpe_element = []
for arg in intrinsic_argument:
if Getter.get_type_of_contents(arg, procedure, self.vault) == 'rpe_var':
# Get the real part of the RPE variable
split_elements = []
if NatureDeterminer.has_operations(arg, split_elements):
# List rpe element involved in the operation
rpe_element.extend([el for el in split_elements if
Getter.get_type_of_contents(el, procedure, self.vault) == "rpe_var"])
else:
rpe_element.append(arg)
for el in rpe_element:
string = Inserter.replace_variable_exact_match(el, el + "%val", string)
return string
def _call_to_derived_type_constructor(self, line, module_name):
line = MaskCreator.remove_literals(line)
matches = re.finditer(RegexPattern.assignation_to_DT, line, flags=re.I)
if matches:
for m in matches:
fun_name = m.groups()[0]
try:
derived_type = self.vault.get_derived_type_constructor(fun_name)
if derived_type:
return derived_type
except Error.DerivedTypeNotFound:
pass
return None