diff --git a/smdebug/core/hook.py b/smdebug/core/hook.py
index a16f2ed48..0e0ac9dc4 100644
--- a/smdebug/core/hook.py
+++ b/smdebug/core/hook.py
@@ -47,9 +47,10 @@ class ScalarCache(object):
-    def __init__(self, scalar_name, scalar_val, sm_metric, write_tb, write_event):
+    def __init__(self, scalar_name, scalar_val, mode, sm_metric, write_tb, write_event):
         self.name = scalar_name
         self.value = scalar_val
+        self.mode = mode
         self.sm_metric = sm_metric
         self.write_tb = write_tb
         self.write_event = write_event
@@ -442,6 +443,10 @@ def _increment_step(self):
         self.step += 1
         self.mode_steps[self.mode] += 1
+
+        # Keep the GLOBAL step count in sync irrespective of the current mode
+        if self.mode != ModeKeys.GLOBAL:
+            self.mode_steps[ModeKeys.GLOBAL] = self.step
         self._collections_to_save_for_step = None

     def _write_state(self):
@@ -566,12 +571,15 @@ def _write_scalars(self):
         for scalar_obj in self.scalar_cache:
             scalar_name = scalar_obj.name
             scalar_val = scalar_obj.value
+            scalar_mode = scalar_obj.mode
             sm_metric = scalar_obj.sm_metric
             write_tb = scalar_obj.write_tb
             write_event = scalar_obj.write_event
             if self.metrics_writer and sm_metric:
                 self.metrics_writer.log_metric(
-                    scalar_name, scalar_val, iteration_number=self.mode_steps[self.mode]
+                    scalar_name + "_" + scalar_mode.name,
+                    scalar_val,
+                    iteration_number=self.mode_steps[scalar_mode],
                 )
             if write_tb:
                 tb_writer = self._maybe_get_tb_writer()
@@ -598,7 +606,7 @@ def save_scalar(self, name, value, sm_metric=False):
         val = self._make_numpy_array(value)
         if val.size != 1:
             raise TypeError(f"{name} has non scalar value of type: {type(value)}")
-        scalar_obj = ScalarCache(name, val, sm_metric=True, write_tb=True, write_event=True)
+        scalar_obj = ScalarCache(name, val, self.mode, sm_metric, write_tb=True, write_event=True)
         self.scalar_cache.append(scalar_obj)

     def _write_raw_tensor(self, tensor_name, tensor_value, save_collections, tensor_ref=None):
@@ -659,7 +667,12 @@ def _save_for_tensor(self, tensor_name, tensor_value, check_before_write=True):
             # Always log loss to Minerva
             tensor_val = np.mean(np_val)
             scalar_obj = ScalarCache(
-                tensor_name, tensor_val, sm_metric=True, write_tb=False, write_event=False
+                tensor_name,
+                tensor_val,
+                self.mode,
+                sm_metric=True,
+                write_tb=False,
+                write_event=False,
             )
             self.scalar_cache.append(scalar_obj)

diff --git a/smdebug/mxnet/collection.py b/smdebug/mxnet/collection.py
index fb8ebf24a..6685a732f 100644
--- a/smdebug/mxnet/collection.py
+++ b/smdebug/mxnet/collection.py
@@ -24,7 +24,7 @@ def _register_default_collections(self):
         self.get(CollectionKeys.WEIGHTS).include("^(?!gradient).*weight")
         self.get(CollectionKeys.BIASES).include("^(?!gradient).*bias")
         self.get(CollectionKeys.GRADIENTS).include("^gradient")
-        self.get(CollectionKeys.LOSSES).include(".*loss")
+        self.get(CollectionKeys.LOSSES).include(".*loss._(?!input).*output")

     def create_collection(self, name):
         super().create_collection(name, cls=Collection)
diff --git a/smdebug/pytorch/collection.py b/smdebug/pytorch/collection.py
index 5876faa54..fa9bcc74c 100644
--- a/smdebug/pytorch/collection.py
+++ b/smdebug/pytorch/collection.py
@@ -39,7 +39,7 @@ def _register_default_collections(self):
         self.get(CollectionKeys.WEIGHTS).include("^(?!gradient).*weight")
         self.get(CollectionKeys.BIASES).include("^(?!gradient).*bias")
         self.get(CollectionKeys.GRADIENTS).include("^gradient")
-        self.get(CollectionKeys.LOSSES).include("[Ll]oss")
+        self.get(CollectionKeys.LOSSES).include("[Ll]oss_(?!input).*output")

     def create_collection(self, name):
         super().create_collection(name, cls=Collection)
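Reviewer note (not part of the patch): the new default LOSSES regexes above use a negative lookahead so that only a loss block's output tensors are collected and its input tensors are skipped, which is what the test changes further down assert. A minimal sanity check of the two patterns, with illustrative tensor names and assuming unanchored matching anywhere in the name (the existing "[Ll]oss" pattern already relies on this, since names such as "CrossEntropyLoss_output_0" do not start with the pattern):

```python
import re

# New default include patterns for the LOSSES collection (from the hunks above).
PT_LOSSES_REGEX = r"[Ll]oss_(?!input).*output"
MX_LOSSES_REGEX = r".*loss._(?!input).*output"

# Illustrative tensor names, as a registered loss module (PyTorch) or
# loss block (MXNet Gluon) might emit them.
pt_names = ["CrossEntropyLoss_input_0", "CrossEntropyLoss_input_1", "CrossEntropyLoss_output_0"]
mx_names = ["softmaxcrossentropyloss0_input_0", "softmaxcrossentropyloss0_output_0"]

# Only the output tensors should be picked up by the losses collection.
assert [n for n in pt_names if re.search(PT_LOSSES_REGEX, n)] == ["CrossEntropyLoss_output_0"]
assert [n for n in mx_names if re.search(MX_LOSSES_REGEX, n)] == ["softmaxcrossentropyloss0_output_0"]
```

The updated tests in tests/mxnet/test_hook_loss_collection.py and tests/pytorch/test_loss.py check the same behavior end to end.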
diff --git a/tests/core/test_hook_save_scalar.py b/tests/core/test_hook_save_scalar.py
index 3103aadc6..daf779784 100644
--- a/tests/core/test_hook_save_scalar.py
+++ b/tests/core/test_hook_save_scalar.py
@@ -67,6 +67,9 @@ def forward(self, x):
     hook.register_module(model)
     if register_loss:
         hook.register_loss(criterion)
+
+    hook.save_scalar("pt_num_steps", steps, sm_metric=True)
+
     model.train()
     optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
@@ -82,11 +85,23 @@ def forward(self, x):
             loss = criterion(output, target)
         else:
             loss = F.nll_loss(output, target)
-        hook.save_scalar("pt_train_loss", loss.item(), sm_metric=True)
         loss.backward()
         optimizer.step()
     hook.save_scalar("pt_after_train", 1, sm_metric=False)
+    model.eval()
+    hook.set_mode(ModeKeys.EVAL)
+    with torch.no_grad():
+        for i in range(steps):
+            batch_size = 32
+            data, target = torch.rand(batch_size, 1, 28, 28), torch.rand(batch_size).long()
+            data, target = data.to("cpu"), target.to("cpu")
+            output = model(data)
+            if register_loss:
+                loss = criterion(output, target)
+            else:
+                loss = F.nll_loss(output, target)
+


 def simple_mx_model(hook, steps=10, register_loss=False):
     """
@@ -111,6 +126,9 @@ def simple_mx_model(hook, steps=10, register_loss=False):
     softmax_cross_entropy = gluon.loss.SoftmaxCrossEntropyLoss()
     if register_loss:
         hook.register_block(softmax_cross_entropy)
+
+    hook.save_scalar("mx_num_steps", steps, sm_metric=True)
+
     trainer = gluon.Trainer(net.collect_params(), "sgd", {"learning_rate": 0.1})

     hook.save_scalar("mx_before_train", 1, sm_metric=False)
@@ -127,9 +145,16 @@ def simple_mx_model(hook, steps=10, register_loss=False):
         trainer.step(batch_size)
         # calculate training metrics
         train_loss += loss.mean().asscalar()
-        hook.save_scalar("mx_train_loss", loss.mean().asscalar(), sm_metric=True)
     hook.save_scalar("mx_after_train", 1, sm_metric=False)
+    hook.set_mode(ModeKeys.EVAL)
+    for i in range(steps):
+        batch_size = 32
+        data, target = mx.random.randn(batch_size, 1, 28, 28), mx.random.randn(batch_size)
+        data = data.as_in_context(mx.cpu(0))
+        val_output = net(data)
+        loss = softmax_cross_entropy(val_output, target)
+


 def simple_tf_model(hook, steps=10, lr=0.4):
     """
@@ -164,7 +189,10 @@ def simple_tf_model(hook, steps=10, lr=0.4):
     hooks = [hook]

     hook.set_mode(ModeKeys.TRAIN)
-    model.fit(x_train, y_train, epochs=steps, steps_per_epoch=steps, callbacks=hooks, verbose=0)
+    model.fit(x_train, y_train, epochs=1, steps_per_epoch=steps, callbacks=hooks, verbose=0)
+
+    hook.set_mode(ModeKeys.EVAL)
+    model.evaluate(x_test, y_test, steps=10, callbacks=hooks, verbose=0)


 def delete_local_trials(local_trials):
@@ -172,27 +200,16 @@ def delete_local_trials(local_trials):
         shutil.rmtree(trial)


-def check_trials(out_dir, save_steps, coll_name, saved_scalars=None):
-    """
-    Create trial to check if non-scalar data is written as per save config and
-    check whether all the scalars written through save_scalar have been saved.
- """ - trial = create_trial(path=out_dir, name="test output") - assert trial - tensor_list = set(trial.tensor_names()) & set(trial.tensor_names(collection=coll_name)) - for tname in tensor_list: - if tname not in saved_scalars: - assert len(trial.tensor(tname).steps()) == len(save_steps) - scalar_list = trial.tensor_names(regex="^scalar") - if scalar_list: - assert len(set(saved_scalars) & set(scalar_list)) == len(saved_scalars) +# need this to seek to the right file offset for test output verification +metrics_file_position = 0 -def check_metrics_file(saved_scalars): +def check_metrics_file(save_steps, saved_scalars=None): """ Check the SageMaker metrics file to ensure that all the scalars saved using save_scalar(sm_metrics=True) or mentioned through SM_METRICS collections, have been saved. """ + global metrics_file_position if is_sagemaker_job(): METRICS_DIR = os.environ.get(DEFAULT_SAGEMAKER_METRICS_PATH) if not METRICS_DIR: @@ -200,12 +217,90 @@ def check_metrics_file(saved_scalars): return file_name = "{}/{}.json".format(METRICS_DIR, str(os.getpid())) scalarnames = set() + + import collections + + train_metric = collections.defaultdict(list) + eval_metric = collections.defaultdict(list) + with open(file_name) as fp: + # since SM metrics expects all metrics to be written in 1 file, seeking to + # the right offset for the purpose of this test - so that the metrics logged in + # the corresponding test are verified + fp.seek(metrics_file_position) for line in fp: data = json.loads(line) - scalarnames.add(data["MetricName"]) + assert data["IterationNumber"] != -1 # iteration number should not be -1 + metric_name = data["MetricName"] + if "TRAIN" in metric_name: + train_metric[metric_name].append(data["IterationNumber"]) + scalarnames.add(metric_name.rstrip("_TRAIN")) + elif "EVAL" in metric_name: + eval_metric[metric_name].append(data["IterationNumber"]) + scalarnames.add(metric_name.rstrip("_EVAL")) + else: + scalarnames.add( + metric_name.rstrip("_GLOBAL") + ) # check the scalar saved using save_scalar() + metrics_file_position = fp.tell() assert scalarnames - assert len(set(saved_scalars) & set(scalarnames)) > 0 + + if saved_scalars: + assert len(set(saved_scalars) & set(scalarnames)) > 0 + + # check if all metrics have been written at the expected step number + for train_data in train_metric: + assert len(set(save_steps["TRAIN"]) & set(train_metric[train_data])) == len( + save_steps["TRAIN"] + ) + for eval_data in eval_metric: + assert len(set(save_steps["EVAL"]) & set(eval_metric[eval_data])) == len( + save_steps["EVAL"] + ) + + +def check_trials(out_dir, save_steps, saved_scalars=None): + """ + Create trial to check if non-scalar data is written as per save config and + check whether all the scalars written through save_scalar have been saved. 
+ """ + trial = create_trial(path=out_dir, name="test output") + assert trial + tensor_list = trial.tensor_names() + for tname in tensor_list: + if tname not in saved_scalars: + train_steps = trial.tensor(tname).steps(mode=ModeKeys.TRAIN) + eval_steps = trial.tensor(tname).steps(mode=ModeKeys.EVAL) + + # check if all tensors have been saved according to save steps + assert len(set(save_steps["TRAIN"]) & set(train_steps)) == len(save_steps["TRAIN"]) + if eval_steps: # need this check for bias and gradients + assert len(set(save_steps["EVAL"]) & set(eval_steps)) == len(save_steps["EVAL"]) + scalar_list = trial.tensor_names(regex="^scalar") + if scalar_list: + assert len(set(saved_scalars) & set(scalar_list)) == len(saved_scalars) + + +def verify_files(out_dir, save_config, saved_scalars=None): + """ + Analyze the tensors saved and verify that metrics are stored correctly in the + SM metrics json file + """ + + # Retrieve save_step for verification in the trial and the JSON file + save_config_train_steps = save_config.get_save_config(ModeKeys.TRAIN).save_steps + if not save_config_train_steps: + save_interval = save_config.get_save_config(ModeKeys.TRAIN).save_interval + save_config_train_steps = [i for i in range(0, 10, save_interval)] + save_config_eval_steps = save_config.get_save_config(ModeKeys.EVAL).save_steps + if not save_config_eval_steps: + save_interval = save_config.get_save_config(ModeKeys.EVAL).save_interval + save_config_eval_steps = [i for i in range(0, 10, save_interval)] + + save_steps = {"TRAIN": save_config_train_steps, "EVAL": save_config_eval_steps} + + check_trials(out_dir, save_steps, saved_scalars) + check_metrics_file(save_steps, saved_scalars) def helper_pytorch_tests(collection, register_loss, save_config): @@ -214,21 +309,18 @@ def helper_pytorch_tests(collection, register_loss, save_config): run_id = "trial_" + coll_name + "-" + datetime.now().strftime("%Y%m%d-%H%M%S%f") trial_dir = os.path.join(SMDEBUG_PT_HOOK_TESTS_DIR, run_id) - hook = PT_Hook(out_dir=trial_dir, include_collections=[coll_name], export_tensorboard=True) - - coll = hook.get_collection(coll_name) - coll.save_config = save_config - save_steps = save_config.get_save_config(ModeKeys.TRAIN).save_steps - if not save_steps: - save_interval = save_config.get_save_config(ModeKeys.TRAIN).save_interval - save_steps = [i for i in range(0, 10, save_interval)] + hook = PT_Hook( + out_dir=trial_dir, + include_collections=[coll_name], + save_config=save_config, + export_tensorboard=True, + ) simple_pt_model(hook, register_loss=register_loss) hook.close() - saved_scalars = ["scalar/pt_before_train", "scalar/pt_train_loss", "scalar/pt_after_train"] - check_trials(trial_dir, save_steps, coll_name, saved_scalars) - check_metrics_file(saved_scalars) + saved_scalars = ["scalar/pt_num_steps", "scalar/pt_before_train", "scalar/pt_after_train"] + verify_files(trial_dir, save_config, saved_scalars) @pytest.mark.parametrize("collection", [("all", ".*"), ("scalars", "^scalar")]) @@ -240,6 +332,7 @@ def helper_pytorch_tests(collection, register_loss, save_config): { ModeKeys.TRAIN: SaveConfigMode(save_interval=2), ModeKeys.GLOBAL: SaveConfigMode(save_interval=3), + ModeKeys.EVAL: SaveConfigMode(save_interval=1), } ), ], @@ -256,20 +349,18 @@ def helper_mxnet_tests(collection, register_loss, save_config): run_id = "trial_" + coll_name + "-" + datetime.now().strftime("%Y%m%d-%H%M%S%f") trial_dir = os.path.join(SMDEBUG_MX_HOOK_TESTS_DIR, run_id) - hook = MX_Hook(out_dir=trial_dir, include_collections=[coll_name], 
-    coll = hook.get_collection(coll_name)
-    coll.save_config = save_config
-    save_steps = save_config.get_save_config(ModeKeys.TRAIN).save_steps
-    if not save_steps:
-        save_interval = save_config.get_save_config(ModeKeys.TRAIN).save_interval
-        save_steps = [i for i in range(0, 10, save_interval)]
+    hook = MX_Hook(
+        out_dir=trial_dir,
+        include_collections=[coll_name],
+        save_config=save_config,
+        export_tensorboard=True,
+    )

     simple_mx_model(hook, register_loss=register_loss)
     hook.close()

-    saved_scalars = ["scalar/mx_before_train", "scalar/mx_train_loss", "scalar/mx_after_train"]
-    check_trials(trial_dir, save_steps, coll_name, saved_scalars)
-    check_metrics_file(saved_scalars)
+    saved_scalars = ["scalar/mx_num_steps", "scalar/mx_before_train", "scalar/mx_after_train"]
+    verify_files(trial_dir, save_config, saved_scalars)


 @pytest.mark.parametrize("collection", [("all", ".*"), ("scalars", "^scalar")])
@@ -281,6 +372,7 @@ def helper_mxnet_tests(collection, register_loss, save_config):
             {
                 ModeKeys.TRAIN: SaveConfigMode(save_interval=2),
                 ModeKeys.GLOBAL: SaveConfigMode(save_interval=3),
+                ModeKeys.EVAL: SaveConfigMode(save_interval=1),
             }
         ),
     ],
@@ -297,25 +389,34 @@ def helper_tensorflow_tests(collection, save_config):
     run_id = "trial_" + coll_name + "-" + datetime.now().strftime("%Y%m%d-%H%M%S%f")
     trial_dir = os.path.join(SMDEBUG_TF_HOOK_TESTS_DIR, run_id)

-    hook = TF_Hook(out_dir=trial_dir, include_collections=[coll_name], export_tensorboard=True)
-    coll = hook.get_collection(coll_name)
-    coll.save_config = save_config
-    save_steps = save_config.get_save_config(ModeKeys.TRAIN).save_steps
-    if not save_steps:
-        save_interval = save_config.get_save_config(ModeKeys.TRAIN).save_interval
-        save_steps = [i for i in range(0, 10, save_interval)]
+    hook = TF_Hook(
+        out_dir=trial_dir,
+        include_collections=[coll_name],
+        save_config=save_config,
+        export_tensorboard=True,
+    )

     simple_tf_model(hook)
     hook.close()

     saved_scalars = ["loss"]
-    check_trials(trial_dir, save_steps, coll_name, saved_scalars)
-    check_metrics_file(saved_scalars)
+    verify_files(trial_dir, save_config, saved_scalars)


-@pytest.mark.slow  # 1:30
-def test_tf_save_scalar():
-    save_config = SaveConfig(save_steps=[0, 2, 4, 6, 8])
-    collection = ("sm_metrics", "loss")
+@pytest.mark.parametrize("collection", [("all", ".*"), ("sm_metrics", "loss")])
+@pytest.mark.parametrize(
+    "save_config",
+    [
+        SaveConfig(save_steps=[0, 2, 4, 6, 8]),
+        SaveConfig(
+            {
+                ModeKeys.TRAIN: SaveConfigMode(save_interval=2),
+                ModeKeys.GLOBAL: SaveConfigMode(save_interval=3),
+                ModeKeys.EVAL: SaveConfigMode(save_interval=1),
+            }
+        ),
+    ],
+)
+def test_tf_save_scalar(collection, save_config):
     helper_tensorflow_tests(collection, save_config)
     delete_local_trials([SMDEBUG_TF_HOOK_TESTS_DIR])
diff --git a/tests/mxnet/test_hook_loss_collection.py b/tests/mxnet/test_hook_loss_collection.py
index 95307c316..1a21b64f7 100644
--- a/tests/mxnet/test_hook_loss_collection.py
+++ b/tests/mxnet/test_hook_loss_collection.py
@@ -33,6 +33,9 @@ def test_loss_collection_default():
     loss_val = loss_tensor.value(step_num=1)
     assert len(loss_val) > 0
+    # Assert that we are not logging the inputs to the loss block.
+    input_loss_tensors = tr.tensor_names(regex=".*loss._input*")
+    assert len(input_loss_tensors) == 0

     shutil.rmtree(out_dir)
diff --git a/tests/pytorch/test_loss.py b/tests/pytorch/test_loss.py
index e140319ca..f09822961 100644
--- a/tests/pytorch/test_loss.py
+++ b/tests/pytorch/test_loss.py
@@ -102,9 +102,9 @@ def test_register_loss_module(out_dir):
     loss_coll = trial.collection("losses")
     loss_tensor = trial.tensor("CrossEntropyLoss_output_0")

-    # Capture ['CrossEntropyLoss_input_0', 'CrossEntropyLoss_input_1', 'CrossEntropyLoss_output_0']
-    assert len(trial.tensor_names()) == 3
-    assert len(loss_coll.tensor_names) == 3
+    # Capture ['CrossEntropyLoss_output_0']
+    assert len(trial.tensor_names()) == 1
+    assert len(loss_coll.tensor_names) == 1

     # Loss should be logged for all the steps since passed `available_steps = range(n_steps)`
     assert len(trial.steps()) == n_steps
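Reviewer note (not part of the patch): a minimal standalone sketch of the metric-naming scheme these changes introduce. _write_scalars() now appends the mode name to the metric name and reports the iteration number from the scalar's own mode, which is what the updated check_metrics_file() parses back out via the _TRAIN/_EVAL/_GLOBAL suffixes. The ModeKeys class and the record layout below are simplified stand-ins, not the real smdebug classes or the exact SageMaker metrics schema:

```python
from enum import Enum


class ModeKeys(Enum):  # simplified stand-in for smdebug.core.modes.ModeKeys
    TRAIN = 0
    EVAL = 1
    GLOBAL = 2


def metric_record(scalar_name, scalar_val, scalar_mode, mode_steps):
    """Mirror of the naming scheme used when logging to the SageMaker metrics writer."""
    return {
        # The mode suffix keeps TRAIN and EVAL series of the same scalar from colliding.
        "MetricName": scalar_name + "_" + scalar_mode.name,
        "Value": scalar_val,
        # The iteration number comes from the scalar's own mode step count,
        # not from whichever mode the hook happens to be in when flushing.
        "IterationNumber": mode_steps[scalar_mode],
    }


mode_steps = {ModeKeys.TRAIN: 8, ModeKeys.EVAL: 3, ModeKeys.GLOBAL: 11}
print(metric_record("loss", 0.42, ModeKeys.EVAL, mode_steps))
# -> {'MetricName': 'loss_EVAL', 'Value': 0.42, 'IterationNumber': 3}
```

Recording the mode on each ScalarCache entry is what lets scalars cached during EVAL be flushed later with EVAL step numbers, even if the hook has since switched to another mode.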