diff --git a/smdebug/core/hook.py b/smdebug/core/hook.py
index cb3a46d2b..3e2e80ec5 100644
--- a/smdebug/core/hook.py
+++ b/smdebug/core/hook.py
@@ -136,7 +136,7 @@ def __init__(
         self.reduction_config = reduction_config
         self.include_regex = include_regex
         self.collection_manager = collection_manager
-        self.collection_manager.set_num_workers(self.get_num_workers())
+        self.collection_manager.set_num_workers(self._get_num_workers())
         self.init_step = init_step

         self.logger = logger
@@ -228,11 +228,11 @@ def __repr__(self):
         )

     @abstractmethod
-    def get_worker_name(self):
+    def _get_worker_name(self):
         pass

     @abstractmethod
-    def get_num_workers(self):
+    def _get_num_workers(self):
         pass

     #### Save Manager methods ####
@@ -361,7 +361,7 @@ def _initialize_writers(self) -> None:
             return
         self.writer = FileWriter(trial_dir=self.out_dir, step=self.step, worker=self.worker)

-    def get_writers(self, tensor_name, tensor_ref=None) -> List[FileWriter]:
+    def _get_writers(self, tensor_name, tensor_ref=None) -> List[FileWriter]:
         """
         :param tensor_name:
         :param tensor_ref: used by TF
@@ -451,7 +451,7 @@ def set_mode(self, mode):
         self._collections_to_save_for_step = None

     def export_collections(self):
-        num_workers = self.get_num_workers()
+        num_workers = self._get_num_workers()
         if self.save_all_workers is False:
             if self.chief_worker != self.worker:
                 return
@@ -604,7 +604,7 @@ def _write_raw_tensor_simple(self, tensor_name, tensor_value, tensor_ref=None):
         numpy_tensor_value = self._make_numpy_array(tensor_value)
         this_size, this_shape = size_and_shape(numpy_tensor_value)
         if self.dry_run is False and this_size > 0:
-            writers = self.get_writers(tensor_name, tensor_ref=tensor_ref)
+            writers = self._get_writers(tensor_name, tensor_ref=tensor_ref)
             for writer in writers:
                 writer.write_tensor(
                     tdata=numpy_tensor_value,
@@ -709,11 +709,6 @@ def _make_numpy_array(tensor_value):
         :return: numpy ndarray
         """

-    def _set_collection_manager(self, coll_manager):
-        # used when creating hook from json config
-        # using this elsewhere may have unintended consequences
-        self.collection_manager = coll_manager
-
     def add_to_collection(self, collection_name, variable):
         self.collection_manager.get(collection_name).add(variable)

diff --git a/smdebug/mxnet/hook.py b/smdebug/mxnet/hook.py
index 66209c7a3..18b290893 100644
--- a/smdebug/mxnet/hook.py
+++ b/smdebug/mxnet/hook.py
@@ -61,10 +61,10 @@ def __init__(
         self.exported_model = False
         # Keep the set of blocks to which this hook is registered. The blocks include loss blocks as well.
         self.registered_blocks = set()
-        self.worker = self.get_worker_name()
+        self.worker = self._get_worker_name()
         set_hook(self)

-    def get_worker_name(self):
+    def _get_worker_name(self):
         try:
             import horovod.mxnet as hvd

@@ -74,7 +74,7 @@
             pass
         return CONFIG_DEFAULT_WORKER_NAME

-    def get_num_workers(self):
+    def _get_num_workers(self):
         try:
             import horovod.mxnet as hvd

@@ -91,17 +91,17 @@ def hook_from_config(cls, json_config_path=None):
     def _cleanup(self):
         # Write the gradients of the past step if the writer is still available.
         if self.writer is not None and self.last_block is not None:
-            self.log_params(self.last_block)
+            self._log_params(self.last_block)
         if self.exported_model is False:
             self._export_model()
         super()._cleanup()

-    def log_params(self, block):
+    def _log_params(self, block):
         params = block.collect_params().values()
         for param in params:
-            self.log_param(param)
+            self._log_param(param)

-    def log_param(self, param):
+    def _log_param(self, param):
         self._save_for_tensor(tensor_name=param.name, tensor_value=param.data(param.list_ctx()[0]))
         # If Gradient for this param is available
         if param.grad_req != "null":
@@ -127,7 +127,7 @@ def forward_pre_hook(self, block, inputs):
         if self.writer is not None:
             # Write the params and gradients of the
             # past step if the writer is still available.
-            self.log_params(block)
+            self._log_params(block)
             self._close_writers()
         self._close_tb_writer()

@@ -206,6 +206,10 @@ def _is_recursive_needed(self):
         return len(extra_coll) != 0

     def register_hook(self, block):
+        # for compatibility with ZCC patches which call this
+        self.register_block(block)
+
+    def register_block(self, block):
         """
         This function registers the forward hook. If user wants to register the hook
         for every child in the given block, then the function calls "apply" API for
diff --git a/smdebug/pytorch/hook.py b/smdebug/pytorch/hook.py
index 366ec85b5..1999698cd 100644
--- a/smdebug/pytorch/hook.py
+++ b/smdebug/pytorch/hook.py
@@ -55,10 +55,10 @@ def __init__(
         self.has_registered_module = False
         self.has_registered_loss_module = False

-        self.worker = self.get_worker_name()
+        self.worker = self._get_worker_name()
         set_hook(self)

-    def get_num_workers(self):
+    def _get_num_workers(self):
         """Check horovod and torch.distributed."""
         # Try torch.distributed
         # torch.distributed is empty on Mac on Torch <= 1.2
@@ -76,7 +76,7 @@
         # Return default
         return 1

-    def get_worker_name(self):
+    def _get_worker_name(self):
         """Check horovod and torch.distributed."""
         # Try torch.distributed
         # torch.distributed is empty on Mac on Torch <= 1.2
@@ -108,7 +108,7 @@ def hook_from_config(cls, json_config_path=None):
         """
         return create_hook_from_json_config(cls, json_config_path=json_config_path)

-    def log_params(self, module):
+    def _log_params(self, module):
         module_name = module._get_name()
         params = module.named_parameters()
         for name, param in params:
@@ -149,7 +149,7 @@ def forward_pre_hook(self, module, inputs):
         if self._get_collections_to_save_for_step():
             self._initialize_writers()

-        self.log_params(module)
+        self._log_params(module)

         if self.last_saved_step is not None and not self.exported_collections:
             self.export_collections()
@@ -200,11 +200,15 @@ def _backward_apply(self, module):
             pname = module._get_name() + "_" + name
             param.register_hook(self.backward_hook(pname))

-    def closure_for_registering_forward_hook(self, module):
+    def _closure_for_registering_forward_hook(self, module):
         """Lambda functions don't work here."""
         module.register_forward_hook(self.forward_hook)

     def register_hook(self, module):
+        # for compatibility with ZCC patches which call this
+        self.register_module(module)
+
+    def register_module(self, module):
         """
         This function registers the forward hook. If user wants to register the hook
         for every child in the given block, then the function calls "apply" API for
@@ -228,7 +232,7 @@ def register_hook(self, module):

         # Set `self.forward_hook` as a callback for each submodule/layer.
         # `module.apply(fn)` calls fn for each submodule in module.children()
-        module.apply(self.closure_for_registering_forward_hook)
+        module.apply(self._closure_for_registering_forward_hook)

         # Capture the gradient for each parameter in the net
         self._backward_apply(module)
diff --git a/smdebug/tensorflow/base_hook.py b/smdebug/tensorflow/base_hook.py
index 069108e1e..59838f7ca 100644
--- a/smdebug/tensorflow/base_hook.py
+++ b/smdebug/tensorflow/base_hook.py
@@ -98,7 +98,7 @@ def __init__(
     def hook_from_config(cls, json_config_path=None):
         return create_hook_from_json_config(cls, json_config_path=json_config_path)

-    def get_distribution_strategy(self) -> TFDistributionStrategy:
+    def _get_distribution_strategy(self) -> TFDistributionStrategy:
         try:
             import horovod.tensorflow as hvd

@@ -120,7 +120,7 @@ def get_distribution_strategy(self) -> TFDistributionStrategy:

         return TFDistributionStrategy.UNSUPPORTED

-    def get_worker_name(self) -> str:
+    def _get_worker_name(self) -> str:
         """
         This function returns the name of the worker based on
         the distribution strategy.
@@ -146,7 +146,7 @@ def get_worker_name(self) -> str:
         return CONFIG_DEFAULT_WORKER_NAME

     def export_collections(self):
-        num_workers = self.get_num_workers()
+        num_workers = self._get_num_workers()
         if self.save_all_workers is False:
             num_workers = 1
             if (
@@ -168,7 +168,7 @@ def export_collections(self):
         collection_file_name = f"{self.worker}_collections.json"
         self.collection_manager.export(self.out_dir, collection_file_name)

-    def get_num_workers(self):
+    def _get_num_workers(self):
         try:
             import horovod.tensorflow as hvd

@@ -194,7 +194,7 @@ def _add_to_device_map(self, tensor):
         if tensor.device and "CPU" not in tensor.device and tensor.device not in self.device_map:
             self.device_map[tensor.device] = serialize_tf_device(tensor.device)

-    def get_writers(self, tensor_name, tensor_ref) -> List[FileWriter]:
+    def _get_writers(self, tensor_name, tensor_ref) -> List[FileWriter]:
         """
         For tensors generated during distributed tf jobs, we map the tensor to a writer
         with its device attribute.
@@ -379,7 +379,8 @@ def save_scalar(self, name, value, searchable=False):
         save_scalar() not supported on Tensorflow
         """
         self.logger.warning(
-            "save_scalar not supported on Tensorflow. Add the scalar to searchable_scalars collection instead."
+            "save_scalar not supported on Tensorflow. "
+            "Add the scalar to scalars or searchable_scalars collection instead."
         )
         return

diff --git a/smdebug/tensorflow/keras.py b/smdebug/tensorflow/keras.py
index db3fc8a9b..af1ff5b73 100644
--- a/smdebug/tensorflow/keras.py
+++ b/smdebug/tensorflow/keras.py
@@ -78,7 +78,7 @@ def _is_not_supported(self):
         ):
             self.logger.info("Disabling SMDebug as it does not support eager mode")
             self._hook_supported = False
-        elif self.get_distribution_strategy() == TFDistributionStrategy.MIRRORED_STRATEGY:
+        elif self._get_distribution_strategy() == TFDistributionStrategy.MIRRORED_STRATEGY:
             try:
                 from tensorflow.python.keras.distribute.distributed_training_utils import (
                     get_distributed_model,
@@ -90,7 +90,7 @@ def _is_not_supported(self):
                     "with TensorFlow version <1.14"
                 )
                 self._hook_supported = False
-        elif self.get_distribution_strategy() == TFDistributionStrategy.UNSUPPORTED:
+        elif self._get_distribution_strategy() == TFDistributionStrategy.UNSUPPORTED:
             self.logger.info(
                 f"Disabling SMDebug as it does not support " f"{tf.distribute.get_strategy()}"
             )
@@ -430,8 +430,8 @@ def on_epoch_end(self, batch, logs=None):

     def _on_any_mode_begin(self, mode):
         if self._is_not_supported():
             return
-        self.distribution_strategy = self.get_distribution_strategy()
-        self.worker = self.get_worker_name()
+        self.distribution_strategy = self._get_distribution_strategy()
+        self.worker = self._get_worker_name()
         self.graph = tf.get_default_graph()
         self.set_mode(mode)
diff --git a/smdebug/tensorflow/session.py b/smdebug/tensorflow/session.py
index 8b4d4c5a1..4bdb0223c 100644
--- a/smdebug/tensorflow/session.py
+++ b/smdebug/tensorflow/session.py
@@ -196,7 +196,7 @@ def _add_weights_and_biases(self):
     def _is_not_supported(self):
         if self._hook_supported is None:
             self._hook_supported = True
-            if self.get_distribution_strategy() == TFDistributionStrategy.MIRRORED_STRATEGY:
+            if self._get_distribution_strategy() == TFDistributionStrategy.MIRRORED_STRATEGY:
                 from packaging import version

                 if version.parse(tf.__version__) < version.parse("1.14.0"):
@@ -207,7 +207,7 @@ def _is_not_supported(self):
                         "Disabling SMDebug as it does not support mirrored strategy"
                         "with TensorFlow version <1.14"
                     )
-            elif self.get_distribution_strategy() == TFDistributionStrategy.UNSUPPORTED:
+            elif self._get_distribution_strategy() == TFDistributionStrategy.UNSUPPORTED:
                 self.logger.info(
                     f"Disabling SMDebug as it does not support " f"{tf.distribute.get_strategy()}"
                 )
@@ -231,8 +231,8 @@ def begin(self):
         # todo: use global step from TF instead of tornasole steps
         # todo: handle multiple graphs in the model

-        self.worker = self.get_worker_name()
-        self.distribution_strategy = self.get_distribution_strategy()
+        self.worker = self._get_worker_name()
+        self.distribution_strategy = self._get_distribution_strategy()
         self.graph = tf.get_default_graph()

         self._add_weights_and_biases()
diff --git a/smdebug/trials/local_trial.py b/smdebug/trials/local_trial.py
index 70dafe318..936fbcfec 100644
--- a/smdebug/trials/local_trial.py
+++ b/smdebug/trials/local_trial.py
@@ -34,22 +34,18 @@ def __init__(
         self.index_reader = LocalIndexReader(self.path)
         self.logger.info(f"Loading trial {name} at path {self.trial_dir}")
         self._load_collections()
-        self.load_tensors()
+        self._load_tensors()

-    def get_collection_files(self) -> list:
+    def _get_collection_files(self) -> list:
         return list_files_in_directory(get_path_to_collections(self.path))

     def _load_tensors_from_index_tensors(self, index_tensors_dict):
         for tname in index_tensors_dict:
             for step, itds in index_tensors_dict[tname].items():
                 for worker in itds:
-                    self.add_tensor(int(step), worker, itds[worker]["tensor_location"])
+                    self._add_tensor(int(step), worker, itds[worker]["tensor_location"])

-    def read_collections(self, collection_files):
+    def _read_collections(self, collection_files):
         first_collection_file = collection_files[0]  # First Collection File
         self.collection_manager = CollectionManager.load(first_collection_file)
         self.num_workers = self.collection_manager.get_num_workers()
-
-    def get_tensors(self, tname_steps_dict, should_regex_match=False):
-        # now we do not need to do anything since we read the full event file
-        pass
diff --git a/smdebug/trials/s3_trial.py b/smdebug/trials/s3_trial.py
index 97e900494..c0f32f5a7 100644
--- a/smdebug/trials/s3_trial.py
+++ b/smdebug/trials/s3_trial.py
@@ -46,9 +46,9 @@ def __init__(
         self.index_reader = S3IndexReader(self.path)
         self.s3_handler = S3Handler()
         self._load_collections()
-        self.load_tensors()
+        self._load_tensors()

-    def get_collection_files(self) -> list:
+    def _get_collection_files(self) -> list:
         collection_files, _ = list_s3_objects(
             self.bucket_name,
             get_path_to_collections(self.prefix_name),
@@ -61,9 +61,9 @@ def _load_tensors_from_index_tensors(self, index_tensors_dict):
         for tname in index_tensors_dict:
             for step, itds in index_tensors_dict[tname].items():
                 for worker in itds:
-                    self.add_tensor(int(step), worker, itds[worker]["tensor_location"])
+                    self._add_tensor(int(step), worker, itds[worker]["tensor_location"])

-    def read_collections(self, collection_files):
+    def _read_collections(self, collection_files):
         first_collection_file = collection_files[0]  # First Collection File
         key = os.path.join(first_collection_file)
         collections_req = ReadObjectRequest(self._get_s3_location(key))
@@ -72,10 +72,5 @@
         self.collection_manager = CollectionManager.load_from_string(obj_data)
         self.num_workers = self.collection_manager.get_num_workers()

-    def get_tensors(self, tname_steps_dict, should_regex_match=False):
-        # to be used when getting selective tensors from S3
-        # now we do not need to do anything since we read the full event file from S3
-        pass
-
     def _get_s3_location(self, obj):
         return "s3://" + self.bucket_name + "/" + obj
diff --git a/smdebug/trials/trial.py b/smdebug/trials/trial.py
index 228387a54..7d6fa0f41 100644
--- a/smdebug/trials/trial.py
+++ b/smdebug/trials/trial.py
@@ -119,11 +119,11 @@ def __repr__(self):
         )

     @abstractmethod
-    def read_collections(self, collection_files):
+    def _read_collections(self, collection_files):
         pass

     @abstractmethod
-    def get_collection_files(self):
+    def _get_collection_files(self):
         pass

     def _load_collections(self):
@@ -133,7 +133,7 @@ def _load_collections(self):
         def _fetch():
             nonlocal collection_files
             nonlocal num_times_before_warning
-            collection_files = self.get_collection_files()
+            collection_files = self._get_collection_files()
            num_times_before_warning -= 1

             if num_times_before_warning < 0:
@@ -161,7 +161,7 @@ def _wait_for_all_collection_files():

         _fetch()
         _wait_for_first_collection_file()
-        self.read_collections(collection_files)
+        self._read_collections(collection_files)
         _wait_for_all_collection_files()

     @abstractmethod
@@ -278,7 +278,7 @@ def _populate_mode_to_tensor_name_map(self, tensor: TensorLocation) -> None:
             self.mode_to_tensors_map[tensor.mode] = set()
         self.mode_to_tensors_map[tensor.mode].add(tensor.tensorname)

-    def add_tensor(self, step_num, worker, tensor_object: TensorLocation):
+    def _add_tensor(self, step_num, worker, tensor_object: TensorLocation):
         to = tensor_object
         # self.worker_set.add(worker)
         if REDUCTIONS_PREFIX in to.tensorname:
@@ -522,7 +522,7 @@ def has_passed_step(self, step, mode=ModeKeys.GLOBAL) -> StepState:
             return StepState.UNAVAILABLE
         return StepState.NOT_YET_AVAILABLE

-    def load_tensors(self):
+    def _load_tensors(self):
         if self.index_mode:
             self._load_tensors_from_index_files()

diff --git a/smdebug/xgboost/hook.py b/smdebug/xgboost/hook.py
index a2a1c97af..e53cfdbb8 100644
--- a/smdebug/xgboost/hook.py
+++ b/smdebug/xgboost/hook.py
@@ -105,17 +105,17 @@ def __init__(
         self.hyperparameters = hyperparameters
         self.train_data = self._validate_data(train_data)
         self.validation_data = self._validate_data(validation_data)
-        self.worker = self.get_worker_name()
+        self.worker = self._get_worker_name()
         self._full_shap_values = None
         set_hook(self)

     def __call__(self, env: CallbackEnv) -> None:
         self._callback(env)

-    def get_num_workers(self):
+    def _get_num_workers(self):
         return xgb.rabit.get_world_size()

-    def get_worker_name(self):
+    def _get_worker_name(self):
         return "worker_{}".format(xgb.rabit.get_rank())

     @classmethod
diff --git a/tests/core/test_hook_save_scalar.py b/tests/core/test_hook_save_scalar.py
index 4772a78ff..16e14052e 100644
--- a/tests/core/test_hook_save_scalar.py
+++ b/tests/core/test_hook_save_scalar.py
@@ -64,7 +64,7 @@ def forward(self, x):

     model = Net().to(torch.device("cpu"))
     criterion = nn.NLLLoss()
-    hook.register_hook(model)
+    hook.register_module(model)
     if register_loss:
         hook.register_loss(criterion)
     model.train()
@@ -105,12 +105,12 @@ def simple_mx_model(hook, steps=10, register_loss=False):
         mxnn.Dense(10),
     )
     net.initialize(init=init.Xavier(), ctx=mx.cpu())
-    hook.register_hook(net)
+    hook.register_block(net)

     train_loss = 0.0
     softmax_cross_entropy = gluon.loss.SoftmaxCrossEntropyLoss()
     if register_loss:
-        hook.register_hook(softmax_cross_entropy)
+        hook.register_block(softmax_cross_entropy)
     trainer = gluon.Trainer(net.collect_params(), "sgd", {"learning_rate": 0.1})

     hook.save_scalar("mx_before_train", 1, searchable=False)
diff --git a/tests/pytorch/test_collection.py b/tests/pytorch/test_collection.py
index 48cfe65df..d5a8dd4ea 100644
--- a/tests/pytorch/test_collection.py
+++ b/tests/pytorch/test_collection.py
@@ -28,7 +28,7 @@ def test_collection_add(hook=None, out_dir=None):
         hook_created = True

     model = Net().to(torch.device("cpu"))
-    hook.register_hook(model)
+    hook.register_module(model)
     optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
     train(model, hook, torch.device("cpu"), optimizer, num_steps=10)
     tr = create_trial(out_dir)
diff --git a/tests/pytorch/test_distributed_training.py b/tests/pytorch/test_distributed_training.py
index 3f71ebcc1..031cda791 100644
--- a/tests/pytorch/test_distributed_training.py
+++ b/tests/pytorch/test_distributed_training.py
@@ -79,7 +79,7 @@ def run(rank, size, include_workers="one", num_epochs=10, batch_size=128, num_ba
         include_workers=include_workers,
     )

-    hook.register_hook(model)
+    hook.register_module(model)

     for epoch in range(num_epochs):
         epoch_loss = 0.0
@@ -94,7 +94,7 @@ def run(rank, size, include_workers="one", num_epochs=10, batch_size=128, num_ba
             optimizer.step()
         # print(f"Rank {dist.get_rank()}, epoch {epoch}: {epoch_loss / num_batches}")

-    assert hook.get_worker_name() == f"worker_{dist.get_rank()}"
+    assert hook._get_worker_name() == f"worker_{dist.get_rank()}"
     # Race condition here where both workers attempt to move
     # /tmp/{out_dir}/END_OF_JOB.ts to {out_dir}/END_OF_JOB.ts
     try:
@@ -155,11 +155,11 @@ def test_run_net_single_process():
     hook = smd.Hook(
         out_dir=out_dir, save_config=smd.SaveConfig(save_steps=[0, 1, 5]), save_all=True
     )
-    hook.register_hook(model)
+    hook.register_module(model)
     train(model=model, device=device, optimizer=optimizer)
     hook._cleanup()

-    assert hook.get_worker_name() == "worker_0"
+    assert hook._get_worker_name() == "worker_0"

     trial = create_trial(path=out_dir)
     assert len(trial.workers()) == 1, f"trial.workers() = {trial.workers()}"
diff --git a/tests/pytorch/test_loss.py b/tests/pytorch/test_loss.py
index 63f579d0a..437598651 100644
--- a/tests/pytorch/test_loss.py
+++ b/tests/pytorch/test_loss.py
@@ -44,7 +44,7 @@ def create_net_and_train(out_dir, n_steps, use_loss_module=False, use_loss_funct
     criterion = nn.CrossEntropyLoss()
     hook = smd.Hook(out_dir=out_dir, save_config=smd.SaveConfig(save_interval=1))

-    hook.register_hook(net)
+    hook.register_module(net)
     if use_loss_module:
         hook.register_loss(criterion)

diff --git a/tests/pytorch/test_modes.py b/tests/pytorch/test_modes.py
index f9fb6f4bd..52e6bd782 100644
--- a/tests/pytorch/test_modes.py
+++ b/tests/pytorch/test_modes.py
@@ -105,7 +105,7 @@ def helper_test_modes(hook=None, out_dir="./test_output/test_hook_modes/"):
             ],
         )

-    hook.register_hook(model)
+    hook.register_module(model)
     optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
     hook.set_mode(mode=modes.TRAIN)
     train(model, device, optimizer, num_steps=10, save_steps=save_steps)
diff --git a/tests/pytorch/test_reduce_config.py b/tests/pytorch/test_reduce_config.py
index 31f83047e..051a7f1cf 100644
--- a/tests/pytorch/test_reduce_config.py
+++ b/tests/pytorch/test_reduce_config.py
@@ -45,7 +45,7 @@ def test_reduce_config(hook=None, out_dir=None):
         hook_created = True

     model = Net().to(torch.device("cpu"))
-    hook.register_hook(model)
+    hook.register_module(model)
     optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
     train(model, hook, torch.device("cpu"), optimizer, num_steps=10)

diff --git a/tests/zero_code_change/pytorch_integration_tests.py b/tests/zero_code_change/pytorch_integration_tests.py
index e5647626e..4fa75803a 100644
--- a/tests/zero_code_change/pytorch_integration_tests.py
+++ b/tests/zero_code_change/pytorch_integration_tests.py
@@ -33,7 +33,7 @@ def test_pytorch(script_mode: bool, use_loss_module=False):

         if script_mode:
             hook = smd.Hook(out_dir=sim.out_dir)
-            hook.register_hook(net)
+            hook.register_module(net)
             hook.register_loss(criterion)

         for i, data in enumerate(trainloader, 0):
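
# Usage sketch (illustrative only, not part of the patch above): after this
# rename the public registration entry points are `register_module` on the
# PyTorch hook and `register_block` on the MXNet hook; `register_hook`
# remains only as a thin backward-compatibility alias for ZCC patches that
# still call it. The out_dir below is a hypothetical example path; the Hook
# and SaveConfig arguments mirror the tests touched in this diff.
import torch.nn as nn

import smdebug.pytorch as smd

model = nn.Sequential(nn.Linear(10, 10), nn.ReLU(), nn.Linear(10, 2))
hook = smd.Hook(out_dir="/tmp/smdebug_example", save_config=smd.SaveConfig(save_interval=1))
hook.register_module(model)  # previously: hook.register_hook(model)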