75 changes: 72 additions & 3 deletions multiprocess_functions.py
@@ -110,7 +110,7 @@ def update_queue(result):

    return error_files

-def create_process_list(files, MIN_MEM_PER_PROCESS, MIN_CORES_PER_PROCESS, LOAD_BALANCE_FACTOR):
+def create_process_list(num_instances, MIN_MEM_PER_PROCESS, MIN_CORES_PER_PROCESS, LOAD_BALANCE_FACTOR):
    total_cores = get_total_cores()
    numa_nodes = get_numa_nodes()
    cores_per_numa = total_cores//numa_nodes
@@ -128,7 +128,7 @@ def create_process_list(files, MIN_MEM_PER_PROCESS, MIN_CORES_PER_PROCESS, LOAD_BALANCE_FACTOR):
    # Core_max
    cm = cores_per_numa // MIN_CORES_PER_PROCESS

-    pn = len(files) // numa_nodes
+    pn = num_instances // numa_nodes
    # Load_balance_max
    lbm = max(pn//LOAD_BALANCE_FACTOR, 1)
    print("Memory max {}, Core max {}, Load Balance max {}".format(mm, cm, lbm))
@@ -164,4 +164,73 @@ def start_process_list(files, max_processes_list, bash_subprocess):
            continue

        files = returned_files
-    return files
+    return files
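Both start_process_list above and the new multiprocess_models below schedule work with the same free-slot pattern: a list of slot indices acts as a semaphore, the apply_async callback returns a slot when a worker exits, and the dispatch loop spins while no slot is free. A self-contained sketch of that pattern (simplified; names are illustrative, not from this PR):

import multiprocessing as mp
import time

def run(slot, item):
    return slot, item  # stand-in for bash_subprocess

def schedule(items, max_processes):
    pool = mp.Pool(processes=max_processes)
    free = list(range(max_processes))
    def release(result):
        free.append(result[0])  # the finished worker's slot comes back via the callback
    for n, item in enumerate(items):
        slot = free.pop(0)
        pool.apply_async(run, args=(slot, item), callback=release)
        while not free and n + 1 < len(items):
            time.sleep(0.05)  # block dispatch until a slot frees up
    pool.close()
    pool.join()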


def multiprocess_models(files, max_processes_list, model_list, num_multimer_predictions_per_model, bash_subprocess):

    files = sorted(files, key=os.path.getsize, reverse=True)
    total_cores = get_total_cores()
    combo = []
    for file in files:
        for model in model_list:
            for i in range(num_multimer_predictions_per_model):
                combo.append((file, model, i))

    for max_processes in max_processes_list:
        if len(combo) == 0:
            break
        if len(combo) < (3*max_processes)//4:
            continue
        os.environ["OMP_NUM_THREADS"] = str(total_cores//max_processes)
        print("Number of OMP Threads = {}, for {} instances".format(os.environ.get('OMP_NUM_THREADS'), max_processes))

        cores_per_process = total_cores // max_processes
        pool = mp.Pool(processes=max_processes)

        queue = [i for i in range(max_processes)]
        core_list, numa_nodes = get_core_list(cores_per_process)

        error_combo = []
        def update_queue(result):
            print(result)
            index = core_list.index(result[4][0])
            queue.append(index // cores_per_process)
            if (result[0] != 0):
                error_combo.append((result[1], result[2], result[3]))

        print(len(combo))
        results = [None] * len(combo)

        i = 0
        for c in combo:
            file_path = c[0]
            model_name = c[1]
            prediction_id = c[2]
            random_seed = 10*(i%len(model_list)) + prediction_id # Random seed is set to 0, 10, 20, 30, 40 for each model

            process_num = queue.pop(0)

            if max_processes < numa_nodes:
                if max_processes == 1:
                    if numa_nodes > 1:
                        mem = '0-{}'.format(numa_nodes-1)
                    else:
                        mem = '0'
                else:
                    mem = '{}-{}'.format(str(process_num * (numa_nodes//max_processes)), str(((process_num + 1) * (numa_nodes//max_processes)) - 1))
            else:
                mem = str(process_num//(max_processes//numa_nodes))

            results[i] = pool.apply_async(bash_subprocess, args=(file_path, model_name, random_seed, mem, core_list[process_num*cores_per_process: (process_num+1)*cores_per_process]), callback = update_queue)
            i += 1
            while len(queue) == 0 and i < len(combo):
                time.sleep(0.05)
        pool.close()
        pool.join()

        print("Following protein combos couldn't be processed with {} instances: {}".format(max_processes, error_combo))
        combo = error_combo

    return combo
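The mem computation above packs three NUMA-binding cases into nested conditionals. The sketch below mirrors that exact branching as a standalone function, with worked outputs for illustration (the node counts are assumed examples):

def mem_bind(process_num, max_processes, numa_nodes):
    """Mirror of the membind logic in multiprocess_models, for illustration."""
    if max_processes < numa_nodes:
        if max_processes == 1:
            # A single process spans every NUMA node.
            return '0-{}'.format(numa_nodes - 1) if numa_nodes > 1 else '0'
        # Each process gets a contiguous block of NUMA nodes.
        span = numa_nodes // max_processes
        return '{}-{}'.format(process_num * span, (process_num + 1) * span - 1)
    # Several processes share one NUMA node.
    return str(process_num // (max_processes // numa_nodes))

# e.g. with 4 NUMA nodes:
#   mem_bind(0, 1, 4) -> '0-3'   (one process, all nodes)
#   mem_bind(1, 2, 4) -> '2-3'   (two processes, two nodes each)
#   mem_bind(5, 8, 4) -> '2'     (eight processes, two per node)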
6 changes: 3 additions & 3 deletions run_amber.py
@@ -87,7 +87,7 @@ def amber_relax(
    print(model_list)
    for model_name in model_list:
        for i in range(num_prediction_per_model):
-            result_output_path = os.path.join(output_dir, f'result_{model_name}_pred_{i}.pkl')
+            result_output_path = os.path.join(output_dir, f'result_{model_name}_pred_{(FLAGS.random_seed + i) % 5}.pkl')
            with open(result_output_path, 'rb') as f:
                prediction_result = pickle.load(f)
            prediction_result = jax.tree_map(
@@ -108,14 +108,14 @@ def amber_relax(
            print('### post-adjust: amber-relax')
            relaxed_pdbs = {}
            t_0 = time.time()
-            timmer_name = 'amberrelax_%s_from_%s_pred_%s' % (fasta_name, model_name, str(i))
+            timmer_name = 'amberrelax_%s_from_%s_pred_%s' % (fasta_name, model_name, str((FLAGS.random_seed + i) % 5))
            timmer.add_timmer(timmer_name)
            t1_amber = time.time()
            relaxed_pdb_str, _, _ = amber_relaxer.process(prot=unrelaxed_protein)
            t2_amber = time.time()
            print(' # [TIME] amber process =', (t2_amber-t1_amber), 'sec')
            relaxed_pdbs[model_name] = relaxed_pdb_str
-            f_relaxed_output = os.path.join(output_dir, f'relaxed_{model_name}_pred_{i}.pdb')
+            f_relaxed_output = os.path.join(output_dir, f'relaxed_{model_name}_pred_{(FLAGS.random_seed + i) % 5}.pdb')
            with open(f_relaxed_output, 'w') as h:
                h.write(relaxed_pdb_str)
            timings[f'relax_{model_name}'] = time.time() - t_0
4 changes: 2 additions & 2 deletions run_modelinfer_pytorch_jit_multimer.py
@@ -142,8 +142,8 @@ def main(argv):
        fp_timmer = os.path.join(FLAGS.output_dir, f'timmers_{model_name}.txt')
        h_timmer = Timmers(fp_timmer)
        for i in range(num_prediction_per_model):
-            model_runners[f'{model_name}_pred_{i}'] = RunModel(
-                model_config, root_params, h_timmer, FLAGS.random_seed)
+            model_runners[f'{model_name}_pred_{(FLAGS.random_seed + i) % 5}'] = RunModel(
+                model_config, root_params, h_timmer, FLAGS.random_seed + i)
    for fasta_name in fasta_names:
        run_model_inference(
            fasta_name,
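With these two changes, inference and relaxation agree on the prediction index: both derive it as `(FLAGS.random_seed + i) % 5`, so run_amber.py reads the same `result_*_pred_*.pkl` files that run_modelinfer_pytorch_jit_multimer.py writes. A worked illustration (the seed values are assumed; the modulus 5 presumably matches the five-model seed scheme noted in multiprocess_functions.py):

# Assuming random_seed=10 and num_prediction_per_model=2:
for i in range(2):
    seed = 10 + i             # seed handed to RunModel
    pred_id = (10 + i) % 5    # index used in result_/relaxed_ file names
    # i=0 -> seed 10, pred_id 0; i=1 -> seed 11, pred_id 1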
105 changes: 105 additions & 0 deletions run_multimodel_infer_multimer.py
@@ -0,0 +1,105 @@
import subprocess
import os
import time
import multiprocess_functions as mpf
from datetime import datetime
d = datetime.now()
timestamp = "inference_multimer_%04d%02d%02d%02d%02d" % (d.year, d.month, d.day, d.hour, d.minute)

from absl import app
from absl import flags
from absl import logging

flags.DEFINE_string('root_condaenv', None, 'conda environment directory path')
flags.DEFINE_string('root_home', None, 'home directory')
flags.DEFINE_string('input_dir', None, 'root directory holding all .fa files')
flags.DEFINE_string('output_dir', None, 'Path to a directory that will store the results.')
flags.DEFINE_string('model_names', None, 'Names of models to use')
flags.DEFINE_integer('AF2_BF16', 1, 'Set to 0 for FP32 precision run.')
flags.DEFINE_integer('num_multimer_predictions_per_model', 1, 'How many '
                     'predictions (each with a different random seed) will be '
                     'generated per model. E.g. if this is 2 and there are 5 '
                     'models then there will be 10 predictions per input. '
                     'Note: this FLAG only applies in multimer mode')
FLAGS = flags.FLAGS

script = "python run_modelinfer_pytorch_jit_multimer.py"
base_fold_cmd = "/usr/bin/time -v {} \
--fasta_paths={} \
--output_dir={} \
--model_names={} \
--root_params={} \
--random_seed={} \
--num_multimer_predictions_per_model={}"

def bash_subprocess(file_path, model_name, random_seed, mem, core_list):
    """Starts a new bash subprocess and pins it to the specified cores."""
    out_dir = FLAGS.output_dir
    root_params = FLAGS.root_home + "/weights/extracted/"
    log_dir = FLAGS.root_home + "/logs/" + str(timestamp) + "/"
    os.makedirs(log_dir, exist_ok=True)
    # num_multimer_predictions_per_model = FLAGS.num_multimer_predictions_per_model
    num_multimer_predictions_per_model = 1
    h_cores = mpf.get_total_cores()
    # Choose a different random seed for each model
    command = base_fold_cmd.format(script, file_path, out_dir, model_name, root_params, random_seed, num_multimer_predictions_per_model)
    numactl_args = ["numactl", "-m", mem, "-C", ",".join(["-".join([str(core_list[0]), str(core_list[-1])]), "-".join([str(core_list[0] + h_cores), str(core_list[-1] + h_cores)])]), command]
    # numactl_args = ["numactl", "-m", mem, "-C", "-".join([str(core_list[0]), str(core_list[-1])]), command]

    print(" ".join(numactl_args))
    with open(log_dir + 'inference_log_' + os.path.basename(file_path) + "_" + model_name + "_" + str(random_seed) + '.txt', 'w') as f:
        try:
            process = subprocess.call(" ".join(numactl_args), shell=True, universal_newlines=True, stdout=f, stderr=f)
        except Exception as e:
            print('exception for', os.path.basename(file_path), e)
            process = -1  # flag the failure so the scheduler records this combo
    return (process, file_path, model_name, mem, core_list)

def main(argv):
    """The main function."""
    t1 = time.time()

    if len(argv) > 1:
        raise app.UsageError('Too many command-line arguments.')

    # root_condaenv=FLAGS.root_condaenv
    input_dir = FLAGS.input_dir

    os.environ["TF_ENABLE_ONEDNN_OPTS"] = "1"
    os.environ["MALLOC_CONF"] = "oversize_threshold:1,background_thread:true,metadata_thp:auto,dirty_decay_ms:-1,muzzy_decay_ms:-1"
    os.environ["USE_OPENMP"] = "1"
    os.environ["USE_AVX512"] = "1"
    os.environ["IPEX_ONEDNN_LAYOUT"] = "1"
    os.environ["PYTORCH_TENSOREXPR"] = "0"
    os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
    os.environ["AF2_BF16"] = str(FLAGS.AF2_BF16)

    directory = input_dir

    # Get the list of files in the directory.
    files = os.listdir(directory)
    for i, file in enumerate(files):
        files[i] = os.path.join(directory, file)

    model_list = FLAGS.model_names.strip('[]').split(',')

    MIN_MEM_PER_PROCESS=32*1024 # 32 GB
    MIN_CORES_PER_PROCESS=8
    LOAD_BALANCE_FACTOR=4

    num_instances = len(files) * len(model_list) * FLAGS.num_multimer_predictions_per_model
    max_processes_list = mpf.create_process_list(num_instances, MIN_MEM_PER_PROCESS, MIN_CORES_PER_PROCESS, LOAD_BALANCE_FACTOR)
    error_combo = mpf.multiprocess_models(files, max_processes_list, model_list, FLAGS.num_multimer_predictions_per_model, bash_subprocess)

    print("Following protein combinations couldn't be processed: {}".format(error_combo))
    t2 = time.time()
    print('### Total inference time: %d sec' % (t2-t1))


if __name__ == "__main__":
    flags.mark_flags_as_required([
        'root_home',
        'input_dir',
        'output_dir',
        'model_names'
    ])
    app.run(main)
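For reference, the `-C` argument assembled in bash_subprocess above pins each instance to its physical cores plus their hyperthread siblings, which sit `h_cores` logical IDs higher. A sketch of the string it produces, with assumed values:

# Assuming 8 cores per process, core_list = [0..7], and 128 physical cores
# (so the sibling logical cores start at ID 128):
core_list = list(range(8))
h_cores = 128
cpu_arg = ",".join([
    "-".join([str(core_list[0]), str(core_list[-1])]),                      # "0-7"
    "-".join([str(core_list[0] + h_cores), str(core_list[-1] + h_cores)]),  # "128-135"
])
print(cpu_arg)  # -> 0-7,128-135  (full pinning: numactl -m <mem> -C 0-7,128-135 <command>)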
106 changes: 106 additions & 0 deletions run_multimodel_relax.py
@@ -0,0 +1,106 @@
import subprocess
import os
import time
import multiprocess_functions as mpf
from datetime import datetime
d = datetime.now()
timestamp = "relax_%04d%02d%02d%02d%02d" % (d.year, d.month, d.day, d.hour, d.minute)

from absl import app
from absl import flags
from absl import logging

flags.DEFINE_string('root_home', None, 'home directory')
flags.DEFINE_string('input_dir', None, 'root directory holding all .fa files')
flags.DEFINE_string('output_dir', None, 'Path to a directory that will store the results.')
flags.DEFINE_string('model_names', None, 'Names of models to use')
flags.DEFINE_integer('num_multimer_predictions_per_model', 1, 'How many '
                     'predictions (each with a different random seed) will be '
                     'generated per model. E.g. if this is 2 and there are 5 '
                     'models then there will be 10 predictions per input. '
                     'Note: this FLAG only applies in multimer mode')
flags.DEFINE_enum('model_preset', 'monomer',
                  ['monomer', 'monomer_casp14', 'monomer_ptm', 'multimer'],
                  'Choose preset model configuration - the monomer model, '
                  'the monomer model with extra ensembling, monomer model with '
                  'pTM head, or multimer model')
FLAGS = flags.FLAGS

script = "python run_amber.py"
base_fold_cmd = "/usr/bin/time -v {} \
--fasta_paths {} \
--output_dir {} \
--model_names={} \
--model_preset={} \
--random_seed={} \
--num_multimer_predictions_per_model={} \
"

def bash_subprocess(file_path, model_name, random_seed, mem, core_list):
    """Starts a new bash subprocess and pins it to the specified cores."""
    out_dir = FLAGS.output_dir
    root_params = FLAGS.root_home + "/weights/extracted/"
    log_dir = FLAGS.root_home + "/logs/" + str(timestamp) + "/"
    os.makedirs(log_dir, exist_ok=True)
    # number_multimer_predictions_per_model = FLAGS.num_multimer_predictions_per_model
    number_multimer_predictions_per_model = 1
    model_preset = FLAGS.model_preset

    command = base_fold_cmd.format(script, file_path, out_dir, model_name, model_preset, random_seed, number_multimer_predictions_per_model)
    numactl_args = ["numactl", "-m", mem, "-C", "-".join([str(core_list[0]), str(core_list[-1])]), command]

    print(" ".join(numactl_args))
    with open(log_dir + 'relax_log_' + os.path.basename(file_path) + "_" + model_name + '_' + str(random_seed) + '.txt', 'w') as f:
        try:
            process = subprocess.call(" ".join(numactl_args), shell=True, universal_newlines=True, stdout=f, stderr=f)
        except Exception as e:
            print('exception for', os.path.basename(file_path), e)
            process = -1  # flag the failure so the scheduler records this combo
    return (process, file_path, model_name, mem, core_list)


def main(argv):
    """The main function."""
    t1 = time.time()

    if len(argv) > 1:
        raise app.UsageError('Too many command-line arguments.')

    input_dir = FLAGS.input_dir

    os.environ["USE_OPENMP"] = "1"

    directory = input_dir
    total_cores = mpf.get_total_cores()
    print("Total cores: ", total_cores)
    print("Total memory: {} MB ".format(mpf.check_available_memory()))

    # Get the list of files in the directory.
    files = os.listdir(directory)
    for i, file in enumerate(files):
        files[i] = os.path.join(directory, file)

    model_list = FLAGS.model_names.strip('[]').split(',')

    MIN_MEM_PER_PROCESS=16*1024 # 16 GB
    MIN_CORES_PER_PROCESS=1
    LOAD_BALANCE_FACTOR=1

    num_instances = len(files) * len(model_list) * FLAGS.num_multimer_predictions_per_model
    max_processes_list = mpf.create_process_list(num_instances, MIN_MEM_PER_PROCESS, MIN_CORES_PER_PROCESS, LOAD_BALANCE_FACTOR)
    print("max_processes_list", max_processes_list)
    error_combo = mpf.multiprocess_models(files, max_processes_list, model_list, FLAGS.num_multimer_predictions_per_model, bash_subprocess)

    print("Following protein combinations couldn't be processed: {}".format(error_combo))
    t2 = time.time()
    print('### Total relaxation time: %d sec' % (t2-t1))


if __name__ == "__main__":
    flags.mark_flags_as_required([
        'root_home',
        'input_dir',
        'output_dir',
        'model_names',
        'model_preset',
    ])
    app.run(main)
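A hypothetical invocation of this script (all paths and model names below are placeholders, not values from this PR; the bracketed model_names form matches the strip('[]').split(',') parsing above and is quoted to avoid shell globbing):

python run_multimodel_relax.py \
    --root_home=/home/user/alphafold \
    --input_dir=/home/user/alphafold/fastas \
    --output_dir=/home/user/alphafold/results \
    --model_names='[model_1_multimer_v3,model_2_multimer_v3]' \
    --model_preset=multimer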
6 changes: 4 additions & 2 deletions run_multiprocess_infer.py
@@ -35,7 +35,8 @@ def bash_subprocess(file_path, mem, core_list):
    model_names=FLAGS.model_names

    command = base_fold_cmd.format(script, file_path, out_dir, model_names, root_params)
-    numactl_args = ["numactl", "-m", mem, "-C", "-".join([str(core_list[0]), str(core_list[-1])]), command]
+    numactl_args = ["numactl", "-m", mem, "-C", "-".join([str(core_list[0]), str(core_list[-1])]) + "," + "-".join([str(core_list[0] + 128), str(core_list[-1] + 128)]), command]
+    # numactl_args = ["numactl", "-m", mem, "-C", "-".join([str(core_list[0]), str(core_list[-1])]), command]

    print(" ".join(numactl_args))
    with open(log_dir + 'inference_log_' + os.path.basename(file_path) + '.txt', 'w') as f:
@@ -77,7 +78,8 @@ def main(argv):
    MIN_CORES_PER_PROCESS=8
    LOAD_BALANCE_FACTOR=4

-    max_processes_list = mpf.create_process_list(files, MIN_MEM_PER_PROCESS, MIN_CORES_PER_PROCESS, LOAD_BALANCE_FACTOR)
+    num_instances = len(files)
+    max_processes_list = mpf.create_process_list(num_instances, MIN_MEM_PER_PROCESS, MIN_CORES_PER_PROCESS, LOAD_BALANCE_FACTOR)
    files = mpf.start_process_list(files, max_processes_list, bash_subprocess)

    print("Following protein files couldn't be processed")
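One observation on the hunk above: the hyperthread-sibling offset is hardcoded as 128 here, whereas run_multimodel_infer_multimer.py derives it from mpf.get_total_cores(), so this form assumes a 128-physical-core host. A sketch of the derived variant (assuming get_total_cores() returns the physical core count, as its use in that file implies):

import multiprocess_functions as mpf  # assumed importable, as in this script

def cpu_arg(core_list):
    # Pin the given physical cores plus their hyperthread siblings,
    # deriving the sibling offset instead of hardcoding 128.
    h_cores = mpf.get_total_cores()
    return ("-".join([str(core_list[0]), str(core_list[-1])])
            + "," +
            "-".join([str(core_list[0] + h_cores), str(core_list[-1] + h_cores)]))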
3 changes: 2 additions & 1 deletion run_multiprocess_infer_multimer.py
@@ -89,7 +89,8 @@ def main(argv):
    MIN_CORES_PER_PROCESS=8
    LOAD_BALANCE_FACTOR=4

-    max_processes_list = mpf.create_process_list(files, MIN_MEM_PER_PROCESS, MIN_CORES_PER_PROCESS, LOAD_BALANCE_FACTOR)
+    num_instances = len(files)
+    max_processes_list = mpf.create_process_list(num_instances, MIN_MEM_PER_PROCESS, MIN_CORES_PER_PROCESS, LOAD_BALANCE_FACTOR)
    files = mpf.start_process_list(files, max_processes_list, bash_subprocess)

    print("Following protein files couldn't be processed")
3 changes: 2 additions & 1 deletion run_multiprocess_pre.py
@@ -82,7 +82,8 @@ def main(argv):
    MIN_CORES_PER_PROCESS=4
    LOAD_BALANCE_FACTOR=1

-    max_processes_list = mpf.create_process_list(files, MIN_MEM_PER_PROCESS, MIN_CORES_PER_PROCESS, LOAD_BALANCE_FACTOR)
+    num_instances = len(files)
+    max_processes_list = mpf.create_process_list(num_instances, MIN_MEM_PER_PROCESS, MIN_CORES_PER_PROCESS, LOAD_BALANCE_FACTOR)
    files = mpf.start_process_list(files, max_processes_list, bash_subprocess)

    print("Following protein files couldn't be processed")
3 changes: 2 additions & 1 deletion run_multiprocess_pre_multimer.py
@@ -82,7 +82,8 @@ def main(argv):
    MIN_CORES_PER_PROCESS=4
    LOAD_BALANCE_FACTOR=1

-    max_processes_list = mpf.create_process_list(files, MIN_MEM_PER_PROCESS, MIN_CORES_PER_PROCESS, LOAD_BALANCE_FACTOR)
+    num_instances = len(files)
+    max_processes_list = mpf.create_process_list(num_instances, MIN_MEM_PER_PROCESS, MIN_CORES_PER_PROCESS, LOAD_BALANCE_FACTOR)
    files = mpf.start_process_list(files, max_processes_list, bash_subprocess)

    print("Following protein files couldn't be processed")