116 changes: 73 additions & 43 deletions arc/scheduler.py
@@ -12,6 +12,7 @@

import numpy as np
from typing import TYPE_CHECKING, List, Optional, Tuple, Union
from concurrent.futures import ThreadPoolExecutor, as_completed

import arc.parser.parser as parser
from arc import plotter
@@ -131,6 +132,7 @@ class Scheduler(object):
species_list (list): Contains input :ref:`ARCSpecies <species>` objects (both wells and TSs).
rxn_list (list): Contains input :ref:`ARCReaction <reaction>` objects.
project_directory (str): Folder path for the project: the input file path or ARC/Projects/project-name.
conformer_gen_nprocs (int, optional): The number of processes to use for non-TS conformer generation. Defaults to 1 (serial).
composite_method (str, optional): A composite method to use.
conformer_opt_level (Union[str, dict], optional): The level of theory to use for conformer comparisons.
conformer_sp_level (Union[str, dict], optional): The level of theory to use for conformer sp jobs.
@@ -171,6 +173,7 @@ class Scheduler(object):

Attributes:
project (str): The project's name. Used for naming the working directory.
conformer_gen_nprocs (int): The number of processes to use for non-TS conformer generation.
servers (list): A list of servers used for the present project.
species_list (list): Contains input :ref:`ARCSpecies <species>` objects (both species and TSs).
species_dict (dict): Keys are labels, values are :ref:`ARCSpecies <species>` objects.
@@ -232,6 +235,7 @@ def __init__(self,
ess_settings: dict,
species_list: list,
project_directory: str,
conformer_gen_nprocs: Optional[int] = 1,
composite_method: Optional[Level] = None,
conformer_opt_level: Optional[Level] = None,
conformer_sp_level: Optional[Level] = None,
@@ -297,6 +301,7 @@ def __init__(self,
self.output_multi_spc = dict()
self.report_e_elect = report_e_elect
self.skip_nmd = skip_nmd
self.conformer_gen_nprocs = int(conformer_gen_nprocs or default_job_settings.get('conformer_gen_nprocs', 1))

self.species_dict, self.rxn_dict = dict(), dict()
for species in self.species_list:
@@ -1040,6 +1045,26 @@ def end_job(self, job: 'JobAdapter',
self.save_restart_dict()
return True

def _generate_conformers_for_label(self, label: str) -> None:
"""
A helper function to generate conformers for a given species label (used internally).

Args:
label (str): The species label.
"""
species = self.species_dict[label]
if species.force_field == 'cheap':
# Just embed in RDKit and use MMFF94s for opt and energies.
if species.initial_xyz is None:
species.initial_xyz = species.get_xyz()
else:
n_confs = self.n_confs if species.multi_species is None else 1
species.generate_conformers(
n_confs=n_confs,
e_confs=self.e_confs,
plot_path=os.path.join(self.project_directory, 'output', 'Species',
label, 'geometry', 'conformers'))

def _run_a_job(self,
job: 'JobAdapter',
label: str,
@@ -1088,52 +1113,57 @@ def run_conformer_jobs(self, labels: Optional[List[str]] = None):
If ``None``, conformer jobs will be spawned for all species in self.species_list.
"""
labels_to_consider = labels if labels is not None else [spc.label for spc in self.species_list]
log_info_printed = False
pending_non_ts_generation: List[str] = []

for label in labels_to_consider:
if not self.species_dict[label].is_ts and not self.output[label]['job_types']['opt'] \
and 'opt' not in self.job_dict[label] and 'composite' not in self.job_dict[label] \
and all([e is None for e in self.species_dict[label].conformer_energies]) \
and self.species_dict[label].number_of_atoms > 1 and not self.output[label]['paths']['geo'] \
and self.species_dict[label].yml_path is None and not self.output[label]['convergence'] \
and (self.job_types['conf_opt'] and label not in self.dont_gen_confs
or self.species_dict[label].get_xyz(generate=False) is None):
# This is not a TS, opt (/composite) did not converge nor is it running, and conformer energies were
# not set. Also, either 'conf_opt' are set to True in job_types (and it's not in dont_gen_confs),
# or they are set to False (or it's in dont_gen_confs), but the species has no 3D information.
# Generate conformers.
if not log_info_printed:
logger.info('\nStarting (non-TS) species conformational analysis...\n')
log_info_printed = True
if self.species_dict[label].force_field == 'cheap':
# Just embed in RDKit and use MMFF94s for opt and energies.
if self.species_dict[label].initial_xyz is None:
self.species_dict[label].initial_xyz = self.species_dict[label].get_xyz()
else:
# Run the combinatorial method w/o fitting a force field.
n_confs = self.n_confs if self.species_dict[label].multi_species is None else 1
self.species_dict[label].generate_conformers(
n_confs=n_confs,
e_confs=self.e_confs,
plot_path=os.path.join(self.project_directory, 'output', 'Species',
label, 'geometry', 'conformers'))
self.process_conformers(label)
# TSs:
elif self.species_dict[label].is_ts \
and self.species_dict[label].tsg_spawned \
and not self.species_dict[label].ts_conf_spawned \
and all([tsg.success is not None for tsg in self.species_dict[label].ts_guesses]) \
and any([tsg.success for tsg in self.species_dict[label].ts_guesses]):
# This is a TS Species for which all TSGs were spawned, but conformers haven't been spawned,
# and all tsg.success flags contain a value (either ``True`` or ``False``), so they are done.
# We're ready to spawn conformer jobs for this TS Species
# Non-TS species needing conformer generation:
if (not self.species_dict[label].is_ts
and not self.output[label]['job_types']['opt']
and 'opt' not in self.job_dict[label]
and 'composite' not in self.job_dict[label]
and all([e is None for e in self.species_dict[label].conformer_energies])
and self.species_dict[label].number_of_atoms > 1
and not self.output[label]['paths']['geo']
and self.species_dict[label].yml_path is None
and not self.output[label]['convergence']
and ((self.job_types['conf_opt'] and label not in self.dont_gen_confs)
or self.species_dict[label].get_xyz(generate=False) is None)):
pending_non_ts_generation.append(label)
elif (self.species_dict[label].is_ts
and self.species_dict[label].tsg_spawned
and not self.species_dict[label].ts_conf_spawned
and all([tsg.success is not None for tsg in self.species_dict[label].ts_guesses])
and any([tsg.success for tsg in self.species_dict[label].ts_guesses])):
self.run_ts_conformer_jobs(label=label)
self.species_dict[label].ts_conf_spawned = True
if label in self.dont_gen_confs \
and (self.species_dict[label].initial_xyz is not None
or self.species_dict[label].final_xyz is not None
or len(self.species_dict[label].conformers)) \
and not self.species_dict[label].is_ts:
# The species was defined with xyzs.
if (label in self.dont_gen_confs
and (self.species_dict[label].initial_xyz is not None
or self.species_dict[label].final_xyz is not None
or len(self.species_dict[label].conformers))
and not self.species_dict[label].is_ts):
self.process_conformers(label)

if pending_non_ts_generation:
logger.info('\nStarting (non-TS) species conformational analysis...\n')
if self.conformer_gen_nprocs > 1 and len(pending_non_ts_generation) > 1:
# Parallel generation
with ThreadPoolExecutor(max_workers=self.conformer_gen_nprocs) as executor:
futures = {
executor.submit(self._generate_conformers_for_label, label): label
for label in pending_non_ts_generation
}
for future in as_completed(futures):
label_done = futures[future]
try:
future.result()
except Exception as e:
logger.error(f'Error generating conformers for species {label_done}: {e}')
else:
# Serial generation
for label in pending_non_ts_generation:
self._generate_conformers_for_label(label)

for label in pending_non_ts_generation:
self.process_conformers(label)

def run_ts_conformer_jobs(self, label: str):
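The parallel branch above fans work out to a thread pool and collects results with as_completed, so a failure for one species is logged without aborting the remaining labels. Below is a minimal, self-contained sketch of that same fan-out/fan-in pattern; the worker function and labels are hypothetical stand-ins for Scheduler._generate_conformers_for_label and the pending species labels. Threads (rather than processes) are presumably sufficient here because most of the conformer-embedding work happens in compiled extensions that release the GIL, though the option name keeps the "nprocs" wording.

```python
# Illustrative sketch only: the worker and labels are hypothetical stand-ins.
from concurrent.futures import ThreadPoolExecutor, as_completed


def generate_for_label(label: str) -> str:
    """Stand-in for Scheduler._generate_conformers_for_label (which mutates the species in place)."""
    if label == 'bad_species':
        raise ValueError('conformer generation failed')
    return label


labels, nprocs = ['spc_1', 'spc_2', 'bad_species'], 2
with ThreadPoolExecutor(max_workers=nprocs) as executor:
    futures = {executor.submit(generate_for_label, label): label for label in labels}
    for future in as_completed(futures):
        label_done = futures[future]
        try:
            future.result()
        except Exception as exc:
            # Mirrors the scheduler: report the failure and continue with the remaining labels.
            print(f'Error generating conformers for species {label_done}: {exc}')
```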
112 changes: 112 additions & 0 deletions arc/scheduler_test.py
@@ -5,9 +5,11 @@
This module contains unit tests for the arc.scheduler module
"""

import time
import unittest
import os
import shutil
from unittest.mock import patch

import arc.parser.parser as parser
from arc.checks.ts import check_ts
@@ -24,6 +26,29 @@

default_levels_of_theory = settings['default_levels_of_theory']

# Helpers for the parallel conformer-generation tests
def fake_generate_conformers(species_self, n_confs=10, e_confs=5, plot_path=None):
"""Stand-in for ARCSpecies.generate_conformers.
It both *creates a dummy conformer* and *records the label* on a function attribute."""
species_self.conformers = [{
"symbols": ("H", "H"),
"isotopes": (1, 1),
"coords": ((0.0, 0.0, 0.0), (0.0, 0.0, 0.74)),
}]
species_self.conformer_energies = [0.0]
fake_generate_conformers.called.add(species_self.label)


def fake_process_conformers(sched_self, lbl):
"""Stand-in for Scheduler.process_conformers."""
fake_process_conformers.called.add(lbl)


# Simple per-test state holders on the functions themselves.
fake_generate_conformers.called = set()
fake_process_conformers.called = set()


class TestScheduler(unittest.TestCase):
"""
@@ -757,6 +782,93 @@ def test_add_label_to_unique_species_labels(self):
self.assertEqual(unique_label, 'new_species_15_1')
self.assertEqual(self.sched2.unique_species_labels, ['methylamine', 'C2H6', 'CtripCO', 'new_species_15', 'new_species_15_0', 'new_species_15_1'])

def test_run_conformer_jobs_calls_process_conformers_for_dont_gen_confs(self):
"""
If a non-TS is in dont_gen_confs and the species has initial/final xyz or existing conformers,
run_conformer_jobs should call process_conformers(label) for that species.
"""
label = "spc_dont_gen_confs"
xyz = {
'symbols': ("H", "H"),
'isotopes': (1, 1),
'coords': ((0.0, 0.0, 0.0), (0.0, 0.0, 0.74)),
}
spc = ARCSpecies(label=label, smiles='[H][H]', xyz=xyz)
spc.number_of_atoms = 2
spc.conformers = [xyz]
spc.conformer_energies = [None]

job_types = {'conf_opt': True, 'conf_sp': False, 'opt': False, 'fine': False, 'freq': False,
'sp': False, 'rotors': False, 'orbitals': False, 'lennard_jones': False, 'bde': False, 'irc': False, 'tsg': False}
sched = Scheduler(project='test_run_conformer_jobs_dont_gen_confs',
ess_settings=self.ess_settings,
species_list=[spc],
project_directory=os.path.join(ARC_PATH, 'Projects', 'arc_project_for_testing_delete_after_usage3'),
testing=True,
job_types=job_types)
sched.dont_gen_confs = [label]

with patch.object(Scheduler, "process_conformers", autospec=True) as mock_proc:
sched.run_conformer_jobs(labels=[label])
mock_proc.assert_called_once()
# First arg is Scheduler (self), second is label
called_args = mock_proc.call_args[0]
self.assertEqual(called_args[1], label)

def test_sched_calls_real_generate_conformers(self):
"""Directly calling _generate_conformers_for_label should populate the species' conformers."""
spc = ARCSpecies(label="spc_real_gen_confs", multiplicity=1, charge=0, smiles='CC')
sched = Scheduler(
project='test_sched_calls_real_generate_conformers',
ess_settings=self.ess_settings,
species_list=[spc],
project_directory=os.path.join(ARC_PATH, 'Projects', 'arc_project_for_testing_delete_after_usage3'),
testing=True,
job_types={'conf_opt': True, 'conf_sp': False, 'opt': False, 'fine': False, 'freq': False,
'sp': False, 'rotors': False, 'orbitals': False, 'lennard_jones': False, 'bde': False, 'irc': False, 'tsg': False},
)

sched._generate_conformers_for_label(label=spc.label)
self.assertGreater(len(spc.conformers), 0, "Real conformer generation did not populate spc.conformers")

def test_run_conformer_jobs_generates_and_processes_for_multiple_species(self):
"""
For multiple non-TS species needing conformer generation, verify that:
- ARCSpecies.generate_conformers is invoked for each species
- Scheduler.process_conformers is subsequently called for each label
Also verify both serial (nprocs=1) and parallel (nprocs>1) code paths.
"""
job_types = {
'conf_opt': True, 'conf_sp': False, 'opt': False, 'fine': False, 'freq': False,
'sp': False, 'rotors': False, 'orbitals': False, 'lennard_jones': False,
'bde': False, 'irc': False, 'tsg': False
}
time_taken = {}
for nprocs in (1, 2):
with self.subTest(nprocs=nprocs):
start_time = time.time()
fake_generate_conformers.called.clear()
fake_process_conformers.called.clear()
spc1 = ARCSpecies(label="spc_multi_1", multiplicity=1, charge=0, smiles='CCO')
spc2 = ARCSpecies(label="spc_multi_2", multiplicity=1, charge=0, smiles='CCN')

sched = Scheduler(
project='test_run_conformer_jobs_multiple_species',
ess_settings=self.ess_settings,
species_list=[
spc1,
spc2,
],
project_directory=os.path.join(ARC_PATH, 'Projects', 'arc_project_for_testing_delete_after_usage3'),
testing=True,
job_types=job_types,
conformer_gen_nprocs=nprocs,
)

with patch.object(ARCSpecies, 'generate_conformers', new=fake_generate_conformers), \
patch.object(Scheduler, 'process_conformers', new=fake_process_conformers):
sched.run_conformer_jobs(labels=[spc1.label, spc2.label])
time_taken[nprocs] = time.time() - start_time
# Both species should be generated and processed in both the serial and parallel code paths.
self.assertEqual(fake_generate_conformers.called, {spc1.label, spc2.label})
self.assertEqual(fake_process_conformers.called, {spc1.label, spc2.label})

print(f"Time taken: {time_taken}")

@classmethod
def tearDownClass(cls):
"""
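As a usage note, here is a hedged sketch of enabling the new option outside the test suite, mirroring the keyword arguments the tests pass to Scheduler. The project name, ESS settings, and species are placeholders, the import paths are assumed to match the test module, and arguments not shown rely on their defaults.

```python
import os

from arc.common import ARC_PATH
from arc.scheduler import Scheduler
from arc.species.species import ARCSpecies

# Placeholder species and ESS settings for illustration only.
species_list = [ARCSpecies(label='ethanol', smiles='CCO'),
                ARCSpecies(label='ethylamine', smiles='CCN')]

sched = Scheduler(project='parallel_conf_demo',
                  ess_settings={'local': ['gaussian']},  # hypothetical ESS mapping
                  species_list=species_list,
                  project_directory=os.path.join(ARC_PATH, 'Projects', 'parallel_conf_demo'),
                  conformer_gen_nprocs=4,  # use up to 4 worker threads for non-TS conformer generation
                  testing=True)            # as in the unit tests, avoids spawning real ESS jobs

sched.run_conformer_jobs(labels=[spc.label for spc in species_list])
```

With conformer_gen_nprocs left at its default of 1 (or taken from default_job_settings), generation stays serial and behavior matches the previous implementation.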