diff --git a/multiprocess_functions.py b/multiprocess_functions.py
index 128bdae..ad9edca 100644
--- a/multiprocess_functions.py
+++ b/multiprocess_functions.py
@@ -110,7 +110,7 @@ def update_queue(result):
     return error_files
 
 
-def create_process_list(files, MIN_MEM_PER_PROCESS, MIN_CORES_PER_PROCESS, LOAD_BALANCE_FACTOR):
+def create_process_list(num_instances, MIN_MEM_PER_PROCESS, MIN_CORES_PER_PROCESS, LOAD_BALANCE_FACTOR):
     total_cores = get_total_cores()
     numa_nodes = get_numa_nodes()
     cores_per_numa = total_cores//numa_nodes
@@ -128,7 +128,7 @@ def create_process_list(files, MIN_MEM_PER_PROCESS, MIN_CORES_PER_PROCESS, LOAD_
 
     # Core_max
     cm = cores_per_numa // MIN_CORES_PER_PROCESS
-    pn = len(files) // numa_nodes
+    pn = num_instances // numa_nodes
     # Load_balance_max
     lbm = max(pn//LOAD_BALANCE_FACTOR, 1)
     print("Memory max {}, Core max {}, Load Balance max {}".format(mm, cm, lbm))
@@ -164,4 +164,106 @@ def start_process_list(files, max_processes_list, bash_subprocess):
             continue
         files = returned_files
 
-    return files
\ No newline at end of file
+    return files
+
+
+def multiprocess_models(files, max_processes_list, model_list, num_multimer_predictions_per_model, bash_subprocess):
+    """Run bash_subprocess once per (file, model, prediction) combination.
+
+    Combinations are scheduled largest input file first.  Each entry of
+    max_processes_list is tried in turn as the pool size; combinations that
+    fail at one pool size are retried at the next one.
+
+    Args:
+        files: Paths of the input files.
+        max_processes_list: Pool sizes to try, in order.
+        model_list: Model names to run for every file.
+        num_multimer_predictions_per_model: Predictions per (file, model).
+        bash_subprocess: Callable taking (file_path, model_name, random_seed,
+            mem, core_list) and returning a tuple whose item 0 is the exit
+            status (0 on success) and whose item 4 is the core list it was
+            given.
+
+    Returns:
+        The (file, model, prediction_id) combinations not processed successfully.
+    """
+    files = sorted(files, key=os.path.getsize, reverse=True)
+    total_cores = get_total_cores()
+    combo = []
+    for file in files:
+        for model in model_list:
+            for i in range(num_multimer_predictions_per_model):
+                combo.append((file, model, i))
+
+    for max_processes in max_processes_list:
+        if len(combo) == 0:
+            break
+        if len(combo) < (3*max_processes)//4:
+            continue
+        os.environ["OMP_NUM_THREADS"] = str(total_cores//max_processes)
+        print("Number of OMP Threads = {}, for {} instances".format(os.environ.get('OMP_NUM_THREADS'), max_processes))
+
+        cores_per_process = total_cores // max_processes
+        pool = mp.Pool(processes=max_processes)
+
+        queue = [i for i in range(max_processes)]
+        core_list, numa_nodes = get_core_list(cores_per_process)
+
+        error_combo = []
+
+        def update_queue(result, combo_item):
+            # Runs in the parent once a task finishes: free its slot and
+            # remember the exact combination if it exited non-zero.
+            print(result)
+            index = core_list.index(result[4][0])
+            queue.append(index // cores_per_process)
+            if (result[0] != 0):
+                error_combo.append(combo_item)
+
+        def release_slot(error, combo_item, slot):
+            # Runs in the parent when bash_subprocess itself raised; without
+            # this the slot would never return and the loop below would spin.
+            print(error)
+            queue.append(slot)
+            error_combo.append(combo_item)
+
+        print(len(combo))
+        results = [None] * len(combo)
+
+        i = 0
+        for c in combo:
+            file_path = c[0]
+            model_name = c[1]
+            prediction_id = c[2]
+            # Seed base is 0, 10, 20, ... per model; derive it from the model's
+            # position so it is stable across files, predictions and retries.
+            random_seed = 10*model_list.index(model_name) + prediction_id
+
+            process_num = queue.pop(0)
+
+            if max_processes < numa_nodes:
+                if max_processes == 1:
+                    if numa_nodes > 1:
+                        mem = '0-{}'.format(numa_nodes-1)
+                    else:
+                        mem = '0'
+                else:
+                    mem = '{}-{}'.format(str(process_num * (numa_nodes//max_processes)), str(((process_num + 1) * (numa_nodes//max_processes)) - 1))
+            else:
+                mem = str(process_num//(max_processes//numa_nodes))
+
+            results[i] = pool.apply_async(
+                bash_subprocess,
+                args=(file_path, model_name, random_seed, mem, core_list[process_num*cores_per_process: (process_num+1)*cores_per_process]),
+                callback=lambda result, c=c: update_queue(result, c),
+                error_callback=lambda error, c=c, slot=process_num: release_slot(error, c, slot))
+            i += 1
+            while len(queue) == 0 and i < len(combo):
+                time.sleep(0.05)
+        pool.close()
+        pool.join()
+
+        print("Following protein combos couldn't be processed with {} instances: {}".format(max_processes, error_combo))
+        combo = error_combo
+
+    return combo
diff --git a/run_amber.py b/run_amber.py
index 338d55a..bdb3080 100644
--- a/run_amber.py
+++ b/run_amber.py
@@ -87,7 +87,7 @@ def amber_relax(
   print(model_list)
   for model_name in model_list:
     for i in range(num_prediction_per_model):
-      result_output_path = os.path.join(output_dir, f'result_{model_name}_pred_{i}.pkl')
+      result_output_path = os.path.join(output_dir, f'result_{model_name}_pred_{(FLAGS.random_seed + i) % 5}.pkl')
       with open(result_output_path, 'rb') as f:
         prediction_result = pickle.load(f)
       prediction_result = jax.tree_map(
@@ -108,14 +108,14 @@ def amber_relax(
       print('### post-adjust: amber-relax')
       relaxed_pdbs = {}
       t_0 = time.time()
-      timmer_name = 'amberrelax_%s_from_%s_pred_%s' % (fasta_name, model_name, str(i))
+      timmer_name = 'amberrelax_%s_from_%s_pred_%s' % (fasta_name, model_name, str((FLAGS.random_seed + i) % 5))
       timmer.add_timmer(timmer_name)
       t1_amber = time.time()
       relaxed_pdb_str, _, _ = amber_relaxer.process(prot=unrelaxed_protein)
       t2_amber = time.time()
       print(' # [TIME] amber process =', (t2_amber-t1_amber),'sec')
       relaxed_pdbs[model_name] = relaxed_pdb_str
-      f_relaxed_output = os.path.join(output_dir, f'relaxed_{model_name}_pred_{i}.pdb')
+      f_relaxed_output = os.path.join(output_dir, f'relaxed_{model_name}_pred_{(FLAGS.random_seed + i) % 5}.pdb')
       with open(f_relaxed_output, 'w') as h:
         h.write(relaxed_pdb_str)
       timings[f'relax_{model_name}'] = time.time() - t_0
diff --git a/run_modelinfer_pytorch_jit_multimer.py b/run_modelinfer_pytorch_jit_multimer.py
index b1ed6b1..47c264e 100644
--- a/run_modelinfer_pytorch_jit_multimer.py
+++ b/run_modelinfer_pytorch_jit_multimer.py
@@ -142,8 +142,8 @@ def main(argv):
     fp_timmer = os.path.join(FLAGS.output_dir, f'timmers_{model_name}.txt')
     h_timmer = Timmers(fp_timmer)
     for i in range(num_prediction_per_model):
-      model_runners[f'{model_name}_pred_{i}'] = RunModel(
-        model_config, root_params, h_timmer, FLAGS.random_seed)
+      model_runners[f'{model_name}_pred_{(FLAGS.random_seed + i) % 5}'] = RunModel(
+        model_config, root_params, h_timmer, FLAGS.random_seed + i)
   for fasta_name in fasta_names:
     run_model_inference(
       fasta_name,
diff --git a/run_multimodel_infer_multimer.py b/run_multimodel_infer_multimer.py
new file mode 100644
index 0000000..85ca3d6
--- /dev/null
+++ b/run_multimodel_infer_multimer.py
@@ -0,0 +1,114 @@
+import subprocess
+import os
+import time
+import multiprocess_functions as mpf
+from datetime import datetime
+d = datetime.now()
+timestamp = "inference_multimer_%04d%02d%02d%02d%02d" % (d.year, d.month, d.day, d.hour, d.minute)
+
+from absl import app
+from absl import flags
+from absl import logging
+
+flags.DEFINE_string('root_condaenv', None, 'conda environment directory path')
+flags.DEFINE_string('root_home', None, 'home directory')
+flags.DEFINE_string('input_dir', None, 'root directory holding all .fa files')
+flags.DEFINE_string('output_dir', None, 'Path to a directory that will store the results.')
+flags.DEFINE_string('model_names', None, 'Names of models to use')
+flags.DEFINE_integer('AF2_BF16', 1, 'Set to 0 for FP32 precision run.')
+flags.DEFINE_integer('num_multimer_predictions_per_model', 1, 'How many '
+                     'predictions (each with a different random seed) will be '
+                     'generated per model. E.g. if this is 2 and there are 5 '
+                     'models then there will be 10 predictions per input. '
+                     'Note: this FLAG only applies in multimer mode')
+FLAGS = flags.FLAGS
+
+script = "python run_modelinfer_pytorch_jit_multimer.py"
+base_fold_cmd = "/usr/bin/time -v {} \
+        --fasta_paths={} \
+        --output_dir={} \
+        --model_names={} \
+        --root_params={} \
+        --random_seed={} \
+        --num_multimer_predictions_per_model={}"
+
+def bash_subprocess(file_path, model_name, random_seed, mem, core_list):
+    """Run inference for one (file, model, seed) combination under numactl.
+
+    The command is bound to memory node(s) `mem` and to the cores in
+    core_list plus the same range offset by mpf.get_total_cores().  Its
+    output is written to a per-task log under <root_home>/logs/<timestamp>/.
+
+    Returns (exit_status, file_path, model_name, mem, core_list);
+    exit_status is non-zero if the command failed or could not be started.
+    """
+    out_dir = FLAGS.output_dir
+    root_params = FLAGS.root_home + "/weights/extracted/"
+    log_dir = FLAGS.root_home + "/logs/" + str(timestamp) + "/"
+    os.makedirs(log_dir, exist_ok=True)
+    # Each task runs exactly one prediction, so the per-model FLAG is not forwarded here.
+    num_multimer_predictions_per_model = 1
+    h_cores = mpf.get_total_cores()
+    # random_seed is supplied by the caller for this task.
+    command = base_fold_cmd.format(script, file_path, out_dir, model_name, root_params, random_seed, num_multimer_predictions_per_model)
+    numactl_args = ["numactl", "-m", mem, "-C", ",".join(["-".join([str(core_list[0]), str(core_list[-1])]), "-".join([str(core_list[0] + h_cores), str(core_list[-1] + h_cores)])]), command]
+    # The second -C range is the first one shifted by h_cores; drop it to pin only to core_list.
+
+    print(" ".join(numactl_args))
+    with open(log_dir + 'inference_log_' + os.path.basename(file_path) + "_" + model_name + "_" + str(random_seed) + '.txt', 'w') as f:
+        try:
+            process = subprocess.call(" ".join(numactl_args), shell=True, universal_newlines=True, stdout=f, stderr=f)
+        except Exception as e:
+            print('exception for', os.path.basename(file_path), e)
+            process = 1  # report a failed launch as a non-zero exit status
+    return (process, file_path, model_name, mem, core_list)
+
+def main(argv):
+    """The main function."""
+    t1 = time.time()
+
+    if len(argv) > 1:
+        raise app.UsageError('Too many command-line arguments.')
+
+    # root_condaenv=FLAGS.root_condaenv
+    input_dir = FLAGS.input_dir
+
+    os.environ["TF_ENABLE_ONEDNN_OPTS"] = "1"
+    os.environ["MALLOC_CONF"] = "oversize_threshold:1,background_thread:true,metadata_thp:auto,dirty_decay_ms:-1,muzzy_decay_ms:-1"
+    os.environ["USE_OPENMP"] = "1"
+    os.environ["USE_AVX512"] = "1"
+    os.environ["IPEX_ONEDNN_LAYOUT"] = "1"
+    os.environ["PYTORCH_TENSOREXPR"] = "0"
+    os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
+    os.environ["AF2_BF16"] = str(FLAGS.AF2_BF16)
+
+    directory = input_dir
+
+    # Get the list of files in the directory.
+    files = os.listdir(directory)
+    for i, file in enumerate(files):
+        files[i] = os.path.join(directory, file)
+
+    model_list = [name.strip() for name in FLAGS.model_names.strip('[]').split(',')]
+
+    MIN_MEM_PER_PROCESS=32*1024 # 32 GB
+    MIN_CORES_PER_PROCESS=8
+    LOAD_BALANCE_FACTOR=4
+
+    num_instances = len(files) * len(model_list) * FLAGS.num_multimer_predictions_per_model
+    max_processes_list = mpf.create_process_list(num_instances, MIN_MEM_PER_PROCESS, MIN_CORES_PER_PROCESS, LOAD_BALANCE_FACTOR)
+    error_combo = mpf.multiprocess_models(files, max_processes_list, model_list, FLAGS.num_multimer_predictions_per_model, bash_subprocess)
+
+    print("Following protein combination couldn't be processed {} ".format(error_combo))
+    t2 = time.time()
+    print('### Total inference time: %d sec' % (t2-t1))
+
+
+if __name__ == "__main__":
+    flags.mark_flags_as_required([
+        'root_home',
+        'input_dir',
+        'output_dir',
+        'model_names'
+    ])
+    app.run(main)
diff --git a/run_multimodel_relax.py b/run_multimodel_relax.py
new file mode 100644
index 0000000..c9a085a
--- /dev/null
+++ b/run_multimodel_relax.py
@@ -0,0 +1,114 @@
+import subprocess
+import os
+import time
+import multiprocess_functions as mpf
+from datetime import datetime
+d = datetime.now()
+timestamp = "relax_%04d%02d%02d%02d%02d" % (d.year, d.month, d.day, d.hour, d.minute)
+
+from absl import app
+from absl import flags
+from absl import logging
+
+flags.DEFINE_string('root_home', None, 'home directory')
+flags.DEFINE_string('input_dir', None, 'root directory holding all .fa files')
+flags.DEFINE_string('output_dir', None, 'Path to a directory that will store the results.')
+flags.DEFINE_string('model_names', None, 'Names of models to use')
+flags.DEFINE_integer('num_multimer_predictions_per_model', 1, 'How many '
+                     'predictions (each with a different random seed) will be '
+                     'generated per model. E.g. if this is 2 and there are 5 '
+                     'models then there will be 10 predictions per input. '
+                     'Note: this FLAG only applies in multimer mode')
+flags.DEFINE_enum('model_preset', 'monomer',
+                  ['monomer', 'monomer_casp14', 'monomer_ptm', 'multimer'],
+                  'Choose preset model configuration - the monomer model, '
+                  'the monomer model with extra ensembling, monomer model with '
+                  'pTM head, or multimer model')
+FLAGS = flags.FLAGS
+
+script = "python run_amber.py"
+base_fold_cmd = "/usr/bin/time -v {} \
+        --fasta_paths {} \
+        --output_dir {} \
+        --model_names={} \
+        --model_preset={} \
+        --random_seed={} \
+        --num_multimer_predictions_per_model={} \
+        "
+
+def bash_subprocess(file_path, model_name, random_seed, mem, core_list):
+    """Run Amber relaxation for one (file, model, seed) under numactl.
+
+    The command is bound to memory node(s) `mem` and to the core range
+    core_list[0]-core_list[-1].  Its output is written to a per-task log
+    under <root_home>/logs/<timestamp>/.
+
+    Returns (exit_status, file_path, model_name, mem, core_list);
+    exit_status is non-zero if the command failed or could not be started.
+    """
+    out_dir = FLAGS.output_dir
+    log_dir = FLAGS.root_home + "/logs/" + str(timestamp) + "/"
+    os.makedirs(log_dir, exist_ok=True)
+    # Each task relaxes exactly one prediction, so the per-model FLAG is not forwarded here.
+    number_multimer_predictions_per_model = 1
+    model_preset = FLAGS.model_preset
+
+    command = base_fold_cmd.format(script, file_path, out_dir, model_name, model_preset, random_seed, number_multimer_predictions_per_model)
+    numactl_args = ["numactl", "-m", mem, "-C", "-".join([str(core_list[0]), str(core_list[-1])]), command]
+
+    print(" ".join(numactl_args))
+    with open(log_dir + 'relax_log_' + os.path.basename(file_path) + "_" + model_name + '_' + str(random_seed) + '.txt', 'w') as f:
+        try:
+            process = subprocess.call(" ".join(numactl_args), shell=True, universal_newlines=True, stdout=f, stderr=f)
+        except Exception as e:
+            print('exception for', os.path.basename(file_path), e)
+            process = 1  # report a failed launch as a non-zero exit status
+    return (process, file_path, model_name, mem, core_list)
+
+
+def main(argv):
+    """The main function."""
+    t1 = time.time()
+
+    if len(argv) > 1:
+        raise app.UsageError('Too many command-line arguments.')
+
+    input_dir = FLAGS.input_dir
+
+    os.environ["USE_OPENMP"] = "1"
+
+    directory = input_dir
+    total_cores = mpf.get_total_cores()
+    print("Total cores: ", total_cores)
+    print("Total memory: {} MB ".format(mpf.check_available_memory()))
+
+    # Get the list of files in the directory.
+    files = os.listdir(directory)
+    for i, file in enumerate(files):
+        files[i] = os.path.join(directory, file)
+
+    model_list = [name.strip() for name in FLAGS.model_names.strip('[]').split(',')]
+
+    MIN_MEM_PER_PROCESS=16*1024 # 16 GB
+    MIN_CORES_PER_PROCESS=1
+    LOAD_BALANCE_FACTOR=1
+
+    num_instances = len(files) * len(model_list) * FLAGS.num_multimer_predictions_per_model
+    max_processes_list = mpf.create_process_list(num_instances, MIN_MEM_PER_PROCESS, MIN_CORES_PER_PROCESS, LOAD_BALANCE_FACTOR)
+    print("max_processes_list", max_processes_list)
+    error_combo = mpf.multiprocess_models(files, max_processes_list, model_list, FLAGS.num_multimer_predictions_per_model, bash_subprocess)
+
+    print("Following protein combination couldn't be processed {} ".format(error_combo))
+    t2 = time.time()
+    print('### Total Relaxation time: %d sec' % (t2-t1))
+
+
+if __name__ == "__main__":
+    flags.mark_flags_as_required([
+        'root_home',
+        'input_dir',
+        'output_dir',
+        'model_names',
+        'model_preset',
+    ])
+    app.run(main)
diff --git a/run_multiprocess_infer.py b/run_multiprocess_infer.py
index d82c95e..9b6b865 100644
--- a/run_multiprocess_infer.py
+++ b/run_multiprocess_infer.py
@@ -35,7 +35,8 @@ def bash_subprocess(file_path, mem, core_list):
     model_names=FLAGS.model_names
 
     command = base_fold_cmd.format(script, file_path, out_dir, model_names, root_params)
-    numactl_args = ["numactl", "-m", mem, "-C", "-".join([str(core_list[0]), str(core_list[-1])]), command]
+    numactl_args = ["numactl", "-m", mem, "-C", "-".join([str(core_list[0]), str(core_list[-1])]) + "," + "-".join([str(core_list[0] + mpf.get_total_cores()), str(core_list[-1] + mpf.get_total_cores())]), command]
+    # The second -C range is the first one shifted by mpf.get_total_cores(); drop it to pin only to core_list.
 
     print(" ".join(numactl_args))
     with open(log_dir + 'inference_log_' + os.path.basename(file_path) + '.txt', 'w') as f:
@@ -77,7 +78,8 @@ def main(argv):
     MIN_CORES_PER_PROCESS=8
     LOAD_BALANCE_FACTOR=4
 
-    max_processes_list = mpf.create_process_list(files, MIN_MEM_PER_PROCESS, MIN_CORES_PER_PROCESS, LOAD_BALANCE_FACTOR)
+    num_instances = len(files)
+    max_processes_list = mpf.create_process_list(num_instances, MIN_MEM_PER_PROCESS, MIN_CORES_PER_PROCESS, LOAD_BALANCE_FACTOR)
     files = mpf.start_process_list(files, max_processes_list, bash_subprocess)
 
     print("Following protein files couldn't be processed")
diff --git a/run_multiprocess_infer_multimer.py b/run_multiprocess_infer_multimer.py
index 52b8619..487ec89 100644
--- a/run_multiprocess_infer_multimer.py
+++ b/run_multiprocess_infer_multimer.py
@@ -89,7 +89,8 @@ def main(argv):
     MIN_CORES_PER_PROCESS=8
     LOAD_BALANCE_FACTOR=4
 
-    max_processes_list = mpf.create_process_list(files, MIN_MEM_PER_PROCESS, MIN_CORES_PER_PROCESS, LOAD_BALANCE_FACTOR)
+    num_instances = len(files)
+    max_processes_list = mpf.create_process_list(num_instances, MIN_MEM_PER_PROCESS, MIN_CORES_PER_PROCESS, LOAD_BALANCE_FACTOR)
     files = mpf.start_process_list(files, max_processes_list, bash_subprocess)
 
     print("Following protein files couldn't be processed")
diff --git a/run_multiprocess_pre.py b/run_multiprocess_pre.py
index 772f0db..6833545 100644
--- a/run_multiprocess_pre.py
+++ b/run_multiprocess_pre.py
@@ -82,7 +82,8 @@ def main(argv):
     MIN_CORES_PER_PROCESS=4
     LOAD_BALANCE_FACTOR=1
 
-    max_processes_list = mpf.create_process_list(files, MIN_MEM_PER_PROCESS, MIN_CORES_PER_PROCESS, LOAD_BALANCE_FACTOR)
+    num_instances = len(files)
+    max_processes_list = mpf.create_process_list(num_instances, MIN_MEM_PER_PROCESS, MIN_CORES_PER_PROCESS, LOAD_BALANCE_FACTOR)
     files = mpf.start_process_list(files, max_processes_list, bash_subprocess)
 
     print("Following protein files couldn't be processed")
diff --git a/run_multiprocess_pre_multimer.py b/run_multiprocess_pre_multimer.py
index b655e75..5ef56c5 100644
--- a/run_multiprocess_pre_multimer.py
+++ b/run_multiprocess_pre_multimer.py
@@ -82,7 +82,8 @@ def main(argv):
     MIN_CORES_PER_PROCESS=4
     LOAD_BALANCE_FACTOR=1
 
-    max_processes_list = mpf.create_process_list(files, MIN_MEM_PER_PROCESS, MIN_CORES_PER_PROCESS, LOAD_BALANCE_FACTOR)
+    num_instances = len(files)
+    max_processes_list = mpf.create_process_list(num_instances, MIN_MEM_PER_PROCESS, MIN_CORES_PER_PROCESS, LOAD_BALANCE_FACTOR)
     files = mpf.start_process_list(files, max_processes_list, bash_subprocess)
 
     print("Following protein files couldn't be processed")
diff --git a/run_multiprocess_relax.py b/run_multiprocess_relax.py
index 45fc969..b78248b 100644
--- a/run_multiprocess_relax.py
+++ b/run_multiprocess_relax.py
@@ -89,11 +89,11 @@ def main(argv):
     MIN_CORES_PER_PROCESS=1
     LOAD_BALANCE_FACTOR=1
 
-    max_processes_list = mpf.create_process_list(files, MIN_MEM_PER_PROCESS, MIN_CORES_PER_PROCESS, LOAD_BALANCE_FACTOR)
+    num_instances = len(files)
+    max_processes_list = mpf.create_process_list(num_instances, MIN_MEM_PER_PROCESS, MIN_CORES_PER_PROCESS, LOAD_BALANCE_FACTOR)
     files = mpf.start_process_list(files, max_processes_list, bash_subprocess)
 
-    print("Following protein files couldn't be processed")
-    print(files)
+    print("Following protein files couldn't be processed {}".format(files))
     t2 = time.time()
     print('### Total Relaxation time: %d sec' % (t2-t1))
 
diff --git a/tpp-pytorch-extension b/tpp-pytorch-extension
index b767a59..f991237 160000
--- a/tpp-pytorch-extension
+++ b/tpp-pytorch-extension
@@ -1 +1 @@
-Subproject commit b767a59d9c0700e52a5ca028c68af6404516d6b2
+Subproject commit f991237d89b7082e6069a612aae28ffcdde85225