diff --git a/docs/api.md b/docs/api.md index 778cf3e46..3490b14a5 100644 --- a/docs/api.md +++ b/docs/api.md @@ -96,6 +96,7 @@ include_workers include_regex reductions save_raw_tensor +save_shape save_interval save_steps start_step diff --git a/smdebug/core/hook.py b/smdebug/core/hook.py index 10577fed6..758782620 100644 --- a/smdebug/core/hook.py +++ b/smdebug/core/hook.py @@ -46,7 +46,7 @@ size_and_shape, validate_custom_tensor_value, ) -from smdebug.core.writer import FileWriter +from smdebug.core.writer import FileWriter, ShapeWriter from smdebug.exceptions import InvalidCollectionConfiguration try: @@ -222,7 +222,7 @@ def __init__( self.mode = ModeKeys.GLOBAL self.mode_steps = {ModeKeys.GLOBAL: init_step} self.writer = None - + self.shape_writer = None if is_sagemaker_job() and SageMakerFileMetricsWriter is not None: self.metrics_writer = SageMakerFileMetricsWriter() else: @@ -343,6 +343,12 @@ def _get_collections_to_save_for_step(self) -> Set["Collection"]: ) return self._collections_to_save_for_step + def _saving_shapes_in_step(self) -> bool: + for coll in self._get_collections_to_save_for_step(): + if coll.reduction_config.save_shape is True: + return True + return False + def _get_collections_with_tensor(self, tensor_name) -> Set["Collection"]: self._assert_prep() # for tf this will be prepopulated in check_and_add_tensor @@ -404,6 +410,17 @@ def _prepare_collections(self): self.prepared_collections = True #### End of Save Manager methods #### + @staticmethod + def _close_given_writer_map(writer_dict): + # Delete all the dist training writers + to_delete_writers = [] + for key, writer in writer_dict.items(): + # close calls flush + writer.close() + to_delete_writers.append(key) + + for key in to_delete_writers: + del writer_dict[key] def _close_writers(self) -> None: if self.dry_run: @@ -417,16 +434,11 @@ def _close_writers(self) -> None: self.writer.close() self.writer = None - to_delete_writers = [] + self._close_given_writer_map(self.tb_writers) - # Delete all the tb writers - for mode, writer in self.tb_writers.items(): - if writer is not None: - writer.flush() - writer.close() - to_delete_writers.append(mode) - for mode in to_delete_writers: - del self.tb_writers[mode] + if self.shape_writer is not None: + self.shape_writer.close() + self.shape_writer = None def _initialize_writers(self, only_initialize_if_missing=False) -> None: # Function is overridden in smdebug/tensorflow/base_hook.py @@ -454,9 +466,24 @@ def _initialize_writers(self, only_initialize_if_missing=False) -> None: if self.save_all_workers is False: if self.worker != self.chief_worker: return + self.writer = FileWriter(trial_dir=self.out_dir, step=self.step, worker=self.worker) - def _get_writers(self, tensor_name, tensor_ref=None) -> List[FileWriter]: + if self._saving_shapes_in_step(): + self.shape_writer = ShapeWriter( + trial_dir=self.out_dir, + step=self.step, + worker=self.worker, + index_writer=self.writer.index_writer, + ) + + def _get_single_process_writers(self, shape_writers=False) -> List[FileWriter]: + if shape_writers is False: + return [self.writer] if self.writer else [] + else: + return [self.shape_writer] if self.shape_writer else [] + + def _get_writers(self, tensor_name, tensor_ref=None, shape_writers=False) -> List[FileWriter]: """ :param tensor_name: :param tensor_ref: used by TF @@ -464,7 +491,7 @@ def _get_writers(self, tensor_name, tensor_ref=None) -> List[FileWriter]: """ if self.save_all_workers is False and self.worker != self.chief_worker: return [] - return [self.writer] if 
self.writer else [] + return self._get_single_process_writers(shape_writers) def _maybe_get_tb_writer(self) -> Optional[FileWriter]: """ Returns a FileWriter object if `hook.tensorboard_dir` has been specified, else None. @@ -726,6 +753,28 @@ def _write_raw_tensor(self, tensor_name, tensor_value, save_collections, tensor_ self._write_raw_tensor_simple(tensor_name, tensor_value, tensor_ref=tensor_ref) break + def _write_shape(self, tensor_name, tensor_value, save_collections, tensor_ref=None): + shape_writers = self._get_writers(tensor_name, tensor_ref=tensor_ref, shape_writers=True) + for s_col in save_collections: + reduction_config = s_col.reduction_config + if self.dry_run is False and reduction_config.save_shape is True: + numpy_tensor_value = self._make_numpy_array(tensor_value) + this_size, this_shape = size_and_shape(numpy_tensor_value) + if tensor_ref is not None and tensor_ref.tf_obj is not None: + original_name = tensor_ref.tf_obj.name + else: + original_name = None + + for writer in shape_writers: + writer.write_shape( + tensor_name, + this_shape, + self.mode, + self.mode_steps[self.mode], + original_name=original_name, + ) + break + def _write_raw_tensor_simple(self, tensor_name, tensor_value, tensor_ref=None, timestamp=None): # tensor_ref is used by TF # todo: if fp16, check perf of saving as fp16 in proto vs as fp32 @@ -805,6 +854,9 @@ def _write_for_tensor(self, tensor_name, tensor_value, save_collections, tensor_ :param save_collections: list of collections which are being saved for this step """ self._log_save(tensor_name, save_collections) + + self._write_shape(tensor_name, tensor_value, save_collections, tensor_ref=tensor_ref) + # write reductions defined for collections this tensor may be part of self._write_reductions(tensor_name, tensor_value, save_collections, tensor_ref=tensor_ref) diff --git a/smdebug/core/index_reader.py b/smdebug/core/index_reader.py index 3b23e1671..2c182d0d0 100644 --- a/smdebug/core/index_reader.py +++ b/smdebug/core/index_reader.py @@ -16,7 +16,7 @@ MISSING_EVENT_FILE_RETRY_LIMIT, MISSING_EVENT_FILE_RETRY_LIMIT_KEY, ) -from smdebug.core.locations import IndexFileLocationUtils, TensorLocation +from smdebug.core.locations import IndexFileLocationUtils, TensorLocation, TensorShape from smdebug.core.logger import get_logger from smdebug.core.modes import ModeKeys from smdebug.core.s3_utils import list_s3_objects @@ -120,12 +120,22 @@ def fetch_tensor_value(self, tensor_location: TensorLocation): def list_event_files(self, start_after_prefix): pass - @abstractmethod def load_tensor_data_from_index_files( self, start_after_key=None, range_steps=None ) -> Tuple[Dict[str, Dict[int, Dict[str, TensorLocation]]], str]: """Return a triply nested dict referring to tensor data.""" + responses, steps, last_index_token, workers = self.read_index_files( + start_after_key, range_steps + ) + + tensor_data = {} + for step, response, worker in zip(steps, responses, workers): + tensor_data = self._update_tensors_from_json( + tensor_data, step, response, self.path, worker + ) + return tensor_data, last_index_token + @abstractmethod def _is_event_file_present(self, file_name) -> bool: pass @@ -203,8 +213,10 @@ def _validate(index_dict): raise IndexReaderException("meta section is not present") if len(index_dict["meta"]) == 0: raise IndexReaderException("meta section is empty") - if "tensor_payload" not in index_dict: - raise IndexReaderException("tensor_payload section is not present") + if "tensor_payload" not in index_dict and "shape_payload" not in index_dict: + 
raise IndexReaderException( + "neither tensor_payload nor shape_payload sections are present" + ) def _update_tensors_from_json( self, index_tensors_dict, step, response: bytes, path, worker @@ -233,28 +245,41 @@ def _update_tensors_from_json( mode = index_meta["mode"] mode = ModeKeys[mode.strip()] mode_step = index_meta["mode_step"] - event_file_name = os.path.join(path, index_meta["event_file_name"]) - tensors = index_dict["tensor_payload"] - for tensor in tensors: - tensor_name = tensor["tensorname"] - start_idx = tensor["start_idx"] - length = tensor["length"] - tensor_location = TensorLocation( - tensor_name, mode, mode_step, event_file_name, start_idx, length, worker - ) + + to_update_index_dict = [] + + if "tensor_payload" in index_dict and len(index_dict["tensor_payload"]): + event_file_name = os.path.join(path, index_meta["event_file_name"]) + for tensor in index_dict["tensor_payload"]: + tensor_name = tensor["tensorname"] + start_idx = tensor["start_idx"] + length = tensor["length"] + tensor_location = TensorLocation( + tensor_name, mode, mode_step, event_file_name, start_idx, length, worker + ) + to_update_index_dict.append((tensor_name, step, tensor_location)) + + if "shape_payload" in index_dict and len(index_dict["shape_payload"]): + for tensor in index_dict["shape_payload"]: + tensor_name = tensor["tensorname"] + original_name = tensor["originalname"] + shape = tensor["shape"] + ts = TensorShape(tensor_name, mode, mode_step, shape, original_name) + to_update_index_dict.append((tensor_name, step, ts)) + + for tu in to_update_index_dict: + tensor_name, step, obj = tu + if isinstance(obj, TensorLocation): + obj_dict = {"tensor_location": obj} + elif isinstance(obj, TensorShape): + obj_dict = {"tensor_shape": obj} if tensor_name in index_tensors_dict: if step in index_tensors_dict[tensor_name]: - index_tensors_dict[tensor_name][step].update( - {worker: {"tensor_location": tensor_location}} - ) + index_tensors_dict[tensor_name][step].update({worker: obj_dict}) else: - index_tensors_dict[tensor_name].update( - {step: {worker: {"tensor_location": tensor_location}}} - ) + index_tensors_dict[tensor_name].update({step: {worker: obj_dict}}) else: - index_tensors_dict[tensor_name] = { - step: {worker: {"tensor_location": tensor_location}} - } + index_tensors_dict[tensor_name] = {step: {worker: obj_dict}} return index_tensors_dict @@ -285,22 +310,6 @@ def fetch_tensor_value(self, tensor_location: TensorLocation) -> np.ndarray: tensor_name, step, tensor_data, mode, mode_step = tensor_tuple return tensor_data - def load_tensor_data_from_index_files( - self, start_after_key=None, range_steps=None - ) -> Tuple[Dict[str, Dict[int, Dict[str, TensorLocation]]], str]: - """Return a triply nested dict referring to tensor data.""" - - responses, steps, last_index_token, workers = self.read_index_files( - start_after_key, range_steps - ) - - tensor_data = {} - for step, response, worker in zip(steps, responses, workers): - tensor_data = self._update_tensors_from_json( - tensor_data, step, response, self.path, worker - ) - return tensor_data, last_index_token - def read_index_files( self, start_after_key: str, range_steps=None ) -> Tuple[List[bytes], list, str, List[str]]: @@ -398,21 +407,6 @@ def fetch_tensor_value(self, tensor_location: TensorLocation) -> np.ndarray: tensor_name, step, tensor_data, mode, mode_step = tensor_tuple return tensor_data - def load_tensor_data_from_index_files( - self, start_after_key=None, range_steps=None - ) -> Tuple[Dict[str, Dict[int, Dict[str, TensorLocation]]], 
str]: - """Return a triply nested dict referring to tensor data.""" - - responses, steps, last_index_token, workers = self.read_index_files( - start_after_key, range_steps - ) - tensor_data = {} - for step, response, worker in zip(steps, responses, workers): - tensor_data = self._update_tensors_from_json( - tensor_data, step, response, self.path, worker - ) - return tensor_data, last_index_token - def read_index_files( self, start_after_key: str, range_steps=None ) -> Tuple[List[bytes], list, str, List[str]]: diff --git a/smdebug/core/locations.py b/smdebug/core/locations.py index af703e514..9712f58a2 100644 --- a/smdebug/core/locations.py +++ b/smdebug/core/locations.py @@ -24,6 +24,20 @@ def to_dict(self): return {"tensorname": self.tensorname, "start_idx": self.start_idx, "length": self.length} +class TensorShape: + def __init__(self, name, mode, mode_step, shape, original_name=None): + if original_name is None: + original_name = name + self.name = name + self.original_name = original_name + self.mode = mode + self.mode_step = mode_step + self.shape = tuple(shape) + + def to_dict(self): + return {"tensorname": self.name, "originalname": self.original_name, "shape": self.shape} + + STEP_NUMBER_FORMATTING_LENGTH = "012" diff --git a/smdebug/core/reduction_config.py b/smdebug/core/reduction_config.py index 9a24ff6d6..1fa6121b6 100644 --- a/smdebug/core/reduction_config.py +++ b/smdebug/core/reduction_config.py @@ -3,12 +3,23 @@ from typing import Any, Dict # First Party +from smdebug.core.logger import get_logger from smdebug.core.utils import split +logger = get_logger() + + ALLOWED_REDUCTIONS = ["min", "max", "mean", "std", "variance", "sum", "prod"] ALLOWED_NORMS = ["l1", "l2"] REDUCTION_CONFIG_VERSION_NUM = "v0" -ALLOWED_PARAMS = ["reductions", "abs_reductions", "norms", "abs_norms", "save_raw_tensor"] +ALLOWED_PARAMS = [ + "reductions", + "abs_reductions", + "norms", + "abs_norms", + "save_raw_tensor", + "save_shape", +] class ReductionConfig: @@ -49,12 +60,14 @@ def __init__( norms=None, abs_norms=None, save_raw_tensor=False, + save_shape=False, ): self.reductions = reductions if reductions is not None else [] self.abs_reductions = abs_reductions if abs_reductions is not None else [] self.norms = norms if norms is not None else [] self.abs_norms = abs_norms if abs_norms is not None else [] self.save_raw_tensor = save_raw_tensor + self.save_shape = save_shape ## DO NOT REMOVE, if you add anything here, please make sure that _check & from_json is updated accordingly self._check() @@ -75,6 +88,8 @@ def _check(self): raise ValueError("abs_norms can only be one of " + ",".join(ALLOWED_NORMS)) if not isinstance(self.save_raw_tensor, bool): raise ValueError(f"save_raw_tensor={self.save_raw_tensor} must be a boolean") + if not isinstance(self.save_shape, bool): + raise ValueError(f"save_shape={self.save_shape} must be a boolean") @classmethod def from_dict(cls, params: Dict[str, Any]) -> "ReductionConfig": @@ -83,7 +98,7 @@ def from_dict(cls, params: Dict[str, Any]) -> "ReductionConfig": return None if not isinstance(params, dict): raise ValueError(f"params={params} must be dict") - + save_shape = params.get("save_shape", False) save_raw_tensor = params.get("save_raw_tensor", False) # Parse comma-separated string into array all_reductions = split(params.get("reductions", "")) @@ -108,6 +123,7 @@ def from_dict(cls, params: Dict[str, Any]) -> "ReductionConfig": norms=norms, abs_norms=abs_norms, save_raw_tensor=save_raw_tensor, + save_shape=save_shape, ) @classmethod @@ -116,7 +132,6 @@ def 
from_json(cls, json_str: str) -> "ReductionConfig":
         return cls.from_dict(d)
 
     def to_json_dict(self) -> Dict[str, Any]:
-        save_raw_tensor = self.save_raw_tensor
         # Convert reductions from various arrays into single comma-separated string
         all_reductions = []
         for red in self.reductions:
@@ -129,7 +144,11 @@
             all_reductions.append(f"abs_{red}_norm")
         all_reductions_str = ",".join(all_reductions)
         # Return the dict
-        return {"save_raw_tensor": save_raw_tensor, "reductions": all_reductions_str}
+        return {
+            "save_raw_tensor": self.save_raw_tensor,
+            "reductions": all_reductions_str,
+            "save_shape": self.save_shape,
+        }
 
     def to_json(self) -> str:
         return json.dumps(self.to_json_dict())
@@ -144,10 +163,11 @@ def __eq__(self, other):
             and self.norms == other.norms
             and self.abs_norms == other.abs_norms
             and self.save_raw_tensor == other.save_raw_tensor
+            and self.save_shape == other.save_shape
         )
 
     def __repr__(self):
         return (
             f"<class ReductionConfig: reductions={self.reductions}, "
-            f"abs_reductions={self.abs_reductions}, norms={self.norms}, abs_norms={self.abs_norms}>"
+            f"abs_reductions={self.abs_reductions}, norms={self.norms}, abs_norms={self.abs_norms}, save_shape={self.save_shape}, save_raw_tensor={self.save_raw_tensor}>"
         )
diff --git a/smdebug/core/tensor.py b/smdebug/core/tensor.py
index c268c48fc..de52f7858 100644
--- a/smdebug/core/tensor.py
+++ b/smdebug/core/tensor.py
@@ -10,6 +10,7 @@
 from smdebug.exceptions import (
     InvalidWorker,
     NoMoreData,
+    ShapeUnavailableForStep,
     StepNotYetAvailable,
     StepUnavailable,
     TensorUnavailableForStep,
@@ -62,6 +63,16 @@ def set_step_location(self, step_num, worker, location):
         s = self._steps[step_num][worker]
         s.location = location
 
+    def set_step_shape(self, step_num, worker, shape):
+        step = Step(step_num, shape=shape)
+        if step_num not in self._steps:
+            self._steps[step_num] = {worker: step}
+        elif worker not in self._steps[step_num]:
+            self._steps[step_num].update({worker: step})
+
+        s = self._steps[step_num][worker]
+        s.shape = shape
+
     def set_step_reduction_value(self, step_num, worker, red_name, abs, red_value):
         if step_num not in self._steps:
             s = Step(step_num)
@@ -88,10 +99,11 @@ def step(self, step_num):
 class Step:
     """Contains the step number, value, location, and reduction values/locations."""
 
-    def __init__(self, step_num, value=None, location=None):
+    def __init__(self, step_num, value=None, location=None, shape=None):
         self.step_num = step_num
         self.value = value
         self.location = location
+        self.shape = shape
 
         # mapping from (red_name, abs) to value
         self._reduction_values = {}
@@ -126,6 +138,9 @@ class Tensor:
     def __init__(self, name, trial, cache):
         self._mode_steps = {}
         self.name = name
+        # smdebug modifies the names of some tensors to be more descriptive;
+        # in those cases the original name is stored here
+        self.original_name = None
         self.trial = trial
         self.cache = cache
 
@@ -264,6 +279,16 @@ def value(self, step_num, mode=ModeKeys.GLOBAL, worker=None):
             has_reductions = has_reduction_locations or has_reduction_values
             raise TensorUnavailableForStep(self.name, step_num, mode, has_reductions)
 
+    def shape(self, step_num, mode=ModeKeys.GLOBAL, worker=None):
+        s = self._step(step_num=step_num, mode=mode, worker=worker)
+        if s.shape is not None:
+            return s.shape
+        try:
+            value = self.value(step_num, mode, worker)
+            return value.shape
+        except TensorUnavailableForStep:
+            raise ShapeUnavailableForStep(self.name, step_num, mode)
+
     def reduction_values(self, step_num, mode=ModeKeys.GLOBAL, worker=None):
         s = self._step(step_num=step_num, mode=mode, worker=worker)
         if s is not None:
@@ -334,9 +359,13 @@ def _create_mode_step(self, mode, mode_step):
         if mode not in
self._mode_steps: self._mode_steps[mode] = ModeSteps(mode) - def add_step(self, mode, mode_step, worker, location): + def add_step(self, mode, mode_step, worker, tensor_location, tensor_shape): self._create_mode_step(mode, mode_step) - self._mode_steps[mode].set_step_location(mode_step, worker, location) + if tensor_location is not None: + self._mode_steps[mode].set_step_location(mode_step, worker, tensor_location) + if tensor_shape is not None: + self._mode_steps[mode].set_step_shape(mode_step, worker, tensor_shape.shape) + self.original_name = tensor_shape.original_name def add_reduction_step(self, mode, mode_step, worker, red_name, abs, red_location): self._create_mode_step(mode, mode_step) diff --git a/smdebug/core/tfevent/index_file_writer.py b/smdebug/core/tfevent/index_file_writer.py index 80cb54b68..886c7760e 100644 --- a/smdebug/core/tfevent/index_file_writer.py +++ b/smdebug/core/tfevent/index_file_writer.py @@ -13,6 +13,7 @@ def __init__(self, file_path): self.file_path = file_path self.index_payload = [] self.index_meta = {} + self.shape_payload = [] self.writer = None def __exit__(self): @@ -28,7 +29,7 @@ def _init_writer(self): def add_index(self, tensorlocation): if not self.writer: self._init_writer() - if not self.index_meta: + if not self.index_meta or not "event_file_name" in self.index_meta: self.index_meta = { "mode": tensorlocation.mode, "mode_step": tensorlocation.mode_step, @@ -36,6 +37,13 @@ def add_index(self, tensorlocation): } self.index_payload.append(tensorlocation.to_dict()) + def add_shape(self, tensorshape): + if not self.writer: + self._init_writer() + if not self.index_meta: + self.index_meta = {"mode": tensorshape.mode, "mode_step": tensorshape.mode_step} + self.shape_payload.append(tensorshape.to_dict()) + def flush(self): """Flushes the event string to file.""" if not self.writer: @@ -44,33 +52,45 @@ def flush(self): raise ValueError( f"Cannot write empty index_meta={self.index_meta} to file {self.file_path}" ) - if not self.index_payload: + if not self.index_payload and not self.shape_payload: raise ValueError( - f"Cannot write empty index_payload={self.index_payload} to file {self.file_path}" + f"Cannot write empty payload: index_payload={self.index_payload}, shape_payload={self.shape_payload} to file {self.file_path}" ) - index = Index(meta=self.index_meta, tensor_payload=self.index_payload) + index = Index( + meta=self.index_meta, + tensor_payload=self.index_payload, + shape_payload=self.shape_payload, + ) self.writer.write(index.to_json()) self.writer.flush() self.index_meta = {} - self.index_payload = {} + self.index_payload = [] + self.shape_payload = [] def close(self): """Closes the record writer.""" if self.writer is not None: - if self.index_meta and self.index_payload: + if self.index_meta and (self.index_payload or self.shape_payload): self.flush() self.writer.close() self.writer = None class Index: - def __init__(self, meta=None, tensor_payload=None): + def __init__(self, meta=None, tensor_payload=None, shape_payload=None): self.meta = meta self.tensor_payload = tensor_payload + self.shape_payload = shape_payload def to_json(self): - return json.dumps({"meta": self.meta, "tensor_payload": self.tensor_payload}) + return json.dumps( + { + "meta": self.meta, + "tensor_payload": self.tensor_payload, + "shape_payload": self.shape_payload, + } + ) class EventWithIndex(object): diff --git a/smdebug/core/writer.py b/smdebug/core/writer.py index a342cf433..3966da82b 100644 --- a/smdebug/core/writer.py +++ b/smdebug/core/writer.py @@ -16,6 +16,9 @@ 
# under the License. """APIs for logging data in the event file.""" +# Standard Library +from typing import Tuple + # First Party from smdebug.core.modes import MODE_PLUGIN_NAME, MODE_STEP_PLUGIN_NAME from smdebug.core.tfevent.event_file_writer import EventFileWriter @@ -31,14 +34,64 @@ from smdebug.core.tfevent.util import make_tensor_proto # Local -from .locations import IndexFileLocationUtils, TensorboardFileLocation, TensorFileLocation +from .locations import ( + IndexFileLocationUtils, + TensorboardFileLocation, + TensorFileLocation, + TensorShape, +) from .logger import get_logger from .modes import ModeKeys logger = get_logger() -class FileWriter: +class BaseWriter: + def __init__(self, trial_dir, worker, step=0, mode=ModeKeys.GLOBAL): + self.trial_dir = trial_dir + self.step = step + self.worker = worker + if worker is None: + assert False, "Worker should not be none. Check worker name initialization" + self.mode = mode + self._writer = None + self._index_writer = None + + def name(self): + return self._writer.name() + + def __enter__(self): + """Make usable with "with" statement.""" + return self + + def __exit__(self, unused_type, unused_value, unused_traceback): + """Make usable with "with" statement.""" + self.close() + + def flush(self): + """Flushes the event file to disk. + Call this method to make sure that all pending events have been written to disk. + """ + self._writer.flush() + # don't flush index writer as we only want to flush on close + + @classmethod + def create_index_writer(cls, trial_dir, worker, step): + el = TensorFileLocation(step_num=step, worker_name=worker) + event_file_path = el.get_file_location(trial_dir=trial_dir) + index_file_path = IndexFileLocationUtils.get_index_key_for_step(trial_dir, step, worker) + return IndexWriter(index_file_path) + + @property + def index_writer(self): + return self._index_writer + + @index_writer.setter + def index_writer(self, iw): + self._index_writer = iw + + +class FileWriter(BaseWriter): def __init__( self, trial_dir, @@ -50,6 +103,7 @@ def __init__( flush_secs=120, verbose=False, write_checksum=False, + index_writer=None, ): """Creates a `FileWriter` and an file. On construction the summary writer creates a new event file in `trial_dir`. @@ -71,19 +125,16 @@ def __init__( verbose : bool Determines whether to print logging messages. """ - self.trial_dir = trial_dir - self.step = step - self.worker = worker - if worker is None: - assert False, "Worker should not be none. 
Check worker name initialization" - self.mode = mode + super(FileWriter, self).__init__(trial_dir, worker, step, mode) if wtype == "events": + if index_writer is None: + self.index_writer = self.create_index_writer( + trial_dir=trial_dir, worker=worker, step=step + ) + else: + self.index_writer = index_writer el = TensorFileLocation(step_num=self.step, worker_name=self.worker) event_file_path = el.get_file_location(trial_dir=self.trial_dir) - index_file_path = IndexFileLocationUtils.get_index_key_for_step( - self.trial_dir, self.step, self.worker - ) - self.index_writer = IndexWriter(index_file_path) elif wtype == "tensorboard": el = TensorboardFileLocation( step_num=self.step, worker_name=self.worker, mode=self.mode @@ -103,14 +154,6 @@ def __init__( ) self._default_bins = _get_default_bins() - def __enter__(self): - """Make usable with "with" statement.""" - return self - - def __exit__(self, unused_type, unused_value, unused_traceback): - """Make usable with "with" statement.""" - self.close() - @staticmethod def _get_metadata(mode, mode_step): sm2 = SummaryMetadata.PluginData(plugin_name=MODE_STEP_PLUGIN_NAME, content=str(mode_step)) @@ -187,13 +230,6 @@ def write_scalar_summary(self, name, value, global_step, timestamp: float = None s = scalar_summary(name, value) self._writer.write_summary(s, global_step, timestamp=timestamp) - def flush(self): - """Flushes the event file to disk. - Call this method to make sure that all pending events have been written to disk. - """ - self._writer.flush() - # don't flush index writer as we only want to flush on close - def close(self): """Flushes the event file to disk and close the file. Call this method when you do not need the summary writer anymore. @@ -202,9 +238,6 @@ def close(self): if self.index_writer is not None: self.index_writer.close() - def name(self): - return self._writer.name() - @staticmethod def _check_mode_step(mode, mode_step, global_step): if mode_step is None: @@ -216,3 +249,24 @@ def _check_mode_step(mode, mode_step, global_step): ex_str = "mode can be one of " + ", ".join(mode_keys) raise ValueError(ex_str) return mode, mode_step + + +class ShapeWriter(BaseWriter): + def __init__(self, trial_dir, worker, index_writer, step=0, mode=ModeKeys.GLOBAL): + super(ShapeWriter, self).__init__(trial_dir, worker, step, mode) + self._index_writer = index_writer + + def write_shape( + self, name, shape: Tuple[int], mode=ModeKeys.GLOBAL, mode_step=None, original_name=None + ): + self._index_writer.add_shape( + TensorShape(name, mode.name, mode_step, shape, original_name=original_name) + ) + + def flush(self): + self._index_writer.flush() + + def close(self): + """Flushes the event file to disk and close the file. + """ + self._index_writer.close() diff --git a/smdebug/exceptions.py b/smdebug/exceptions.py index 6d917e6bf..e2ed43da9 100644 --- a/smdebug/exceptions.py +++ b/smdebug/exceptions.py @@ -68,6 +68,21 @@ def __str__(self): return msg +class ShapeUnavailableForStep(Exception): + def __init__(self, tname, step, mode=modes.GLOBAL): + self.step = step + self.mode = mode + self.tname = tname + + def __str__(self): + msg = ( + "Shape for tensor {} is not available for step {} " + "with mode {} as it was not saved." 
+ "".format(self.tname, self.step, self.mode.name) + ) + return msg + + class TensorUnavailable(Exception): def __init__(self, tname): self.tname = tname diff --git a/smdebug/tensorflow/base_hook.py b/smdebug/tensorflow/base_hook.py index a8cc9f679..ad6557ade 100644 --- a/smdebug/tensorflow/base_hook.py +++ b/smdebug/tensorflow/base_hook.py @@ -16,7 +16,7 @@ from smdebug.core.reductions import get_numpy_reduction, get_reduction_tensor_name from smdebug.core.tfevent.util import make_numpy_array from smdebug.core.utils import serialize_tf_device -from smdebug.core.writer import FileWriter +from smdebug.core.writer import FileWriter, ShapeWriter # Local from .collection import CollectionKeys, CollectionManager @@ -86,6 +86,7 @@ def __init__( Example -> /job:worker/replica:0/task:1/device:GPU:0 : _job-worker_replica-0_task-1_device-GPU-0""" self.device_map = {} self.writer_map = {} + self.shape_writer_map = {} # This will be None if the var wasn't set, i.e. not param server self.tf_config_json = load_tf_config_json(os.getenv("TF_CONFIG")) self._hook_supported = None @@ -261,7 +262,7 @@ def _set_chief_worker(self): elif self.distribution_strategy == TFDistributionStrategy.UNSUPPORTED: raise NotImplementedError - def _get_writers(self, tensor_name, tensor_ref) -> List[FileWriter]: + def _get_writers(self, tensor_name, tensor_ref, shape_writers=False) -> List[FileWriter]: """ For tensors generated during distributed tf jobs, we map the tensor to a writer with its device attribute. @@ -277,8 +278,8 @@ def _get_writers(self, tensor_name, tensor_ref) -> List[FileWriter]: TFDistributionStrategy.PARAMETER_SERVER, TFDistributionStrategy.HOROVOD, ]: - if (self.save_all_workers is True or self.worker == self.chief_worker) and self.writer: - return [self.writer] + if self.save_all_workers is True or self.worker == self.chief_worker: + return self._get_single_process_writers(shape_writers) elif self.distribution_strategy == TFDistributionStrategy.MIRRORED: if len(self.device_map): # else is for metrics in Keras @@ -289,17 +290,25 @@ def _get_writers(self, tensor_name, tensor_ref) -> List[FileWriter]: # if device str is empty or cpu in worker if not bool(worker) or "CPU" in worker: if self.save_all_workers: - return list(self.writer_map.values()) + if shape_writers is False: + return list(self.writer_map.values()) + else: + return list(self.shape_writer_map.values()) else: - return [self.writer_map[self.device_map[self.chief_worker]]] + if shape_writers is False: + return [self.writer_map[self.device_map[self.chief_worker]]] + else: + return [self.shape_writer_map[self.device_map[self.chief_worker]]] elif self.save_all_workers or worker == self.chief_worker: - return [self.writer_map[self.device_map[worker]]] - elif self.writer: + if shape_writers is False: + return [self.writer_map[self.device_map[worker]]] + else: + return [self.shape_writer_map[self.device_map[worker]]] + else: # training on CPU when all device strings have cpu - return [self.writer] + return self._get_single_process_writers(shape_writers) elif self.distribution_strategy == TFDistributionStrategy.NONE: - if self.writer: - return [self.writer] + return self._get_single_process_writers(shape_writers) else: raise NotImplementedError # when self.writer is None, returns empty list @@ -320,6 +329,13 @@ def _initialize_writers(self, only_initialize_if_missing=False) -> None: self.writer = FileWriter( trial_dir=self.out_dir, step=self.step, worker=self.worker ) + if self._saving_shapes_in_step(): + self.shape_writer = ShapeWriter( + 
trial_dir=self.out_dir, + step=self.step, + worker=self.worker, + index_writer=self.writer.index_writer, + ) elif self.distribution_strategy == TFDistributionStrategy.MIRRORED: if len(self.device_map): for device, device_string in self.device_map.items(): @@ -329,15 +345,37 @@ def _initialize_writers(self, only_initialize_if_missing=False) -> None: self.writer_map[device_string] = FileWriter( trial_dir=self.out_dir, step=self.step, worker=device_string ) + if self._saving_shapes_in_step(): + self.shape_writer_map[device_string] = ShapeWriter( + trial_dir=self.out_dir, + step=self.step, + worker=self.worker, + index_writer=self.writer_map[device_string].index_writer, + ) else: # training on CPU when all device strings have cpu if self.writer is None or only_initialize_if_missing is False: self.writer = FileWriter( trial_dir=self.out_dir, step=self.step, worker=self.worker ) + if self._saving_shapes_in_step(): + self.shape_writer = ShapeWriter( + trial_dir=self.out_dir, + step=self.step, + worker=self.worker, + index_writer=self.writer.index_writer, + ) + elif self.distribution_strategy == TFDistributionStrategy.NONE: if self.writer is None or only_initialize_if_missing is False: self.writer = FileWriter(trial_dir=self.out_dir, step=self.step, worker=self.worker) + if self._saving_shapes_in_step(): + self.shape_writer = ShapeWriter( + trial_dir=self.out_dir, + step=self.step, + worker=self.worker, + index_writer=self.writer.index_writer, + ) else: raise NotImplementedError @@ -353,25 +391,13 @@ def _close_writers(self) -> None: self.writer.close() self.writer = None - # Delete all the dist training writers - to_delete_writers = [] - for device, writer in self.writer_map.items(): - writer.flush() - writer.close() - to_delete_writers.append(device) - - for device in to_delete_writers: - del self.writer_map[device] - - to_delete_writers = [] - # Delete all the tb writers - for mode, writer in self.tb_writers.items(): - if writer is not None: - writer.flush() - writer.close() - to_delete_writers.append(mode) - for mode in to_delete_writers: - del self.tb_writers[mode] + self._close_given_writer_map(self.writer_map) + self._close_given_writer_map(self.shape_writer_map) + self._close_given_writer_map(self.tb_writers) + + if self.shape_writer is not None: + self.shape_writer.close() + self.shape_writer = None def _export_model(self): tb_writer = self._maybe_get_tb_writer() diff --git a/smdebug/trials/local_trial.py b/smdebug/trials/local_trial.py index 0fa4e76ba..272acff83 100644 --- a/smdebug/trials/local_trial.py +++ b/smdebug/trials/local_trial.py @@ -34,17 +34,11 @@ def __init__( self.index_reader = LocalIndexReader(self.path) self.logger.info(f"Loading trial {name} at path {self.trial_dir}") self._load_collections() - self._load_tensors() + self.refresh_data() def _get_collection_files(self) -> list: return list_collection_files_in_directory(get_path_to_collections(self.path)) - def _load_tensors_from_index_tensors(self, index_tensors_dict): - for tname in index_tensors_dict: - for step, itds in index_tensors_dict[tname].items(): - for worker in itds: - self._add_tensor(int(step), worker, itds[worker]["tensor_location"]) - def _read_collections(self, collection_files): first_collection_file = collection_files[0] # First Collection File self.collection_manager = CollectionManager.load(first_collection_file) diff --git a/smdebug/trials/s3_trial.py b/smdebug/trials/s3_trial.py index 0a3cb6389..374ecdfad 100644 --- a/smdebug/trials/s3_trial.py +++ b/smdebug/trials/s3_trial.py @@ -45,7 +45,7 @@ def 
__init__( self.path = "s3://" + os.path.join(self.bucket_name, self.prefix_name) self.index_reader = S3IndexReader(self.path) self._load_collections() - self._load_tensors() + self.refresh_data() def _get_collection_files(self) -> list: collection_files, _ = list_s3_objects( @@ -56,12 +56,6 @@ def _get_collection_files(self) -> list: ) return collection_files - def _load_tensors_from_index_tensors(self, index_tensors_dict): - for tname in index_tensors_dict: - for step, itds in index_tensors_dict[tname].items(): - for worker in itds: - self._add_tensor(int(step), worker, itds[worker]["tensor_location"]) - def _read_collections(self, collection_files): first_collection_file = collection_files[0] # First Collection File key = os.path.join(first_collection_file) diff --git a/smdebug/trials/trial.py b/smdebug/trials/trial.py index 5008ef6a7..c1c8d9c84 100644 --- a/smdebug/trials/trial.py +++ b/smdebug/trials/trial.py @@ -14,7 +14,7 @@ TRAINING_END_DELAY_REFRESH_DEFAULT, TRAINING_END_DELAY_REFRESH_KEY, ) -from smdebug.core.locations import IndexFileLocationUtils, TensorLocation +from smdebug.core.locations import IndexFileLocationUtils, TensorLocation, TensorShape from smdebug.core.logger import get_logger from smdebug.core.modes import ModeKeys from smdebug.core.reductions import REDUCTIONS_PREFIX, reverse_reduction_tensor_name @@ -190,7 +190,7 @@ def maybe_refresh(self, name=None): retry_count = 2 while retry_count > 0: if name is None: - self.refresh_tensors() + self.refresh_data() else: self.refresh_tensor(name) if retry_count > 1: @@ -215,7 +215,7 @@ def maybe_refresh(self, name=None): def refresh_tensor(self, tname, steps=None): # for now we load all tensors at once - self.refresh_tensors() + self.refresh_data() def tensor(self, tname): # will not show tensor if it was not written yet @@ -231,14 +231,14 @@ def has_tensor(self, tname): self.maybe_refresh(tname) return tname in self._tensors - def _populate_step_dict(self, tensor_object, step_num): - if tensor_object.mode != ModeKeys.GLOBAL: - if tensor_object.mode not in self._mode_to_global: - self._mode_to_global[tensor_object.mode] = {} - if tensor_object.mode_step not in self._mode_to_global[tensor_object.mode]: - self._mode_to_global[tensor_object.mode][tensor_object.mode_step] = int(step_num) + def _populate_step_dict(self, mode, mode_step, step_num): + if mode != ModeKeys.GLOBAL: + if mode not in self._mode_to_global: + self._mode_to_global[mode] = {} + if mode_step not in self._mode_to_global[mode]: + self._mode_to_global[mode][mode_step] = int(step_num) if step_num not in self._global_to_mode: - self._global_to_mode[step_num] = (tensor_object.mode, tensor_object.mode_step) + self._global_to_mode[step_num] = (mode, mode_step) def _populate_workers_for_global_step(self, step, worker) -> None: """ @@ -263,7 +263,7 @@ def _populate_workers_for_global_step(self, step, worker) -> None: self.last_complete_step = step self.logger.debug(f"Populating last completing step to: {step}") - def _populate_global_step_to_tensor_name_map(self, tensor: TensorLocation, step_num) -> None: + def _populate_global_step_to_tensor_name_map(self, tensorname: str, step_num) -> None: """ The self.global_step_to_tensors_map dictionary holds a mapping of step number and a set of all the tensor names that have been written for the step. 
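Note: the `trial.py` hunks that follow rework `_add_tensor` so that a worker's entry in the index dict can carry either a `TensorLocation` (the tensor's bytes were written to an event file) or a `TensorShape` (only the shape was recorded). A minimal sketch of the structure that `_update_tensors_from_json` builds and `_load_tensors_from_index_tensors` consumes — the tensor name, shape, byte offsets, and file path below are illustrative, not taken from this diff:

```python
from smdebug.core.locations import TensorLocation, TensorShape
from smdebug.core.modes import ModeKeys

# Triply nested mapping: tensor name -> global step -> worker -> payload.
# Each worker entry holds exactly one of the two keys, depending on whether
# that step wrote the tensor's data or only its shape (save_shape=True).
index_tensors_dict = {
    "conv0_weight": {
        0: {
            "worker_0": {
                "tensor_location": TensorLocation(
                    "conv0_weight",
                    ModeKeys.TRAIN,
                    0,
                    "/tmp/trial/events/000000000000/000000000000_worker_0.tfevents",  # illustrative path
                    0,     # start_idx into the event file
                    1600,  # length in bytes
                    "worker_0",
                )
            },
            "worker_1": {
                "tensor_shape": TensorShape("conv0_weight", ModeKeys.TRAIN, 0, (20, 1, 5, 5))
            },
        }
    }
}
```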
@@ -274,47 +274,67 @@ def _populate_global_step_to_tensor_name_map(self, tensor: TensorLocation, step_ """ if step_num not in self.global_step_to_tensors_map: self.global_step_to_tensors_map[step_num] = set() - self.global_step_to_tensors_map[step_num].add(tensor.tensorname) + self.global_step_to_tensors_map[step_num].add(tensorname) - def _populate_mode_to_tensor_name_map(self, tensor: TensorLocation) -> None: + def _populate_mode_to_tensor_name_map(self, tensorname, mode) -> None: """ The self.mode_to_tensors_map dictionary holds a mapping of mode and a set of all the tensor names that have been written for the mode. :param tensor: :return: """ - if tensor.mode != ModeKeys.GLOBAL: - if tensor.mode not in self.mode_to_tensors_map: - self.mode_to_tensors_map[tensor.mode] = set() - self.mode_to_tensors_map[tensor.mode].add(tensor.tensorname) + if mode != ModeKeys.GLOBAL: + if mode not in self.mode_to_tensors_map: + self.mode_to_tensors_map[mode] = set() + self.mode_to_tensors_map[mode].add(tensorname) - def _add_tensor(self, step_num, worker, tensor_object: TensorLocation): + def _load_tensors_from_index_tensors(self, index_tensors_dict): + for tname in index_tensors_dict: + for step, itds in index_tensors_dict[tname].items(): + for worker in itds: + self._add_tensor( + int(step), + worker, + itds[worker].get("tensor_location", None), + itds[worker].get("tensor_shape", None), + ) + + def _add_tensor( + self, step_num, worker, tensor_location: TensorLocation, tensor_shape: TensorShape + ): is_reduction = False - if REDUCTIONS_PREFIX in tensor_object.tensorname: - tname, red_name, abs = reverse_reduction_tensor_name(tensor_object.tensorname) - tensor_object.tensorname = tname - is_reduction = True + if tensor_location is not None: + tensorname = tensor_location.tensorname + mode = tensor_location.mode + mode_step = tensor_location.mode_step + elif tensor_shape is not None: + tensorname = tensor_shape.name + mode = tensor_shape.mode + mode_step = tensor_shape.mode_step else: - tname = tensor_object.tensorname + raise RuntimeError("both tensor_location and tensor_shape can't be None") - if tname not in self._tensors: - tensor = Tensor(tname, trial=self, cache=self.cache) - self._tensors[tname] = tensor + if REDUCTIONS_PREFIX in tensorname: + tensorname, red_name, abs = reverse_reduction_tensor_name(tensorname) + is_reduction = True - tensor = self._tensors[tname] + if tensorname not in self._tensors: + tensor = Tensor(tensorname, trial=self, cache=self.cache) + self._tensors[tensorname] = tensor + + tensor = self._tensors[tensorname] if is_reduction: - tensor.add_reduction_step( - tensor_object.mode, tensor_object.mode_step, worker, red_name, abs, tensor_object - ) + tensor.add_reduction_step(mode, mode_step, worker, red_name, abs, tensor_location) else: - tensor.add_step(tensor_object.mode, tensor_object.mode_step, worker, tensor_object) + # shape can only be passed for actual tensor, not reductions + tensor.add_step(mode, mode_step, worker, tensor_location, tensor_shape) - self._populate_step_dict(tensor_object, step_num) - self._populate_global_step_to_tensor_name_map(tensor_object, step_num) + self._populate_step_dict(mode, mode_step, step_num) + self._populate_global_step_to_tensor_name_map(tensorname, step_num) self._populate_workers_for_global_step(step_num, worker) - self._populate_mode_to_tensor_name_map(tensor_object) + self._populate_mode_to_tensor_name_map(tensorname, mode) def _tensors_matching_regex(self, regex_list) -> set: matched_tensornames = set() @@ -560,10 +580,6 @@ def 
has_passed_step(self, step, mode=ModeKeys.GLOBAL) -> StepState: return StepState.UNAVAILABLE return StepState.NOT_YET_AVAILABLE - def _load_tensors(self): - if self.index_mode: - self._load_tensors_from_index_files() - def _update_last_index_token(self, new_index_token: str) -> None: """ This function updates the last_index_token in the following scenarios: @@ -625,15 +641,7 @@ def _update_last_index_token(self, new_index_token: str) -> None: f"Updating last_complete_step to: {self.last_complete_step}. " ) - def _load_tensors_from_index_files(self): - self.index_tensors_dict, new_index_token = self.index_reader.load_tensor_data_from_index_files( - start_after_key=self.last_index_token, range_steps=self.range_steps - ) - self._load_tensors_from_index_tensors(self.index_tensors_dict) - if new_index_token: # new index token can be None if there are no new index files - self._update_last_index_token(new_index_token) - - def refresh_tensors(self): + def refresh_data(self): # TODO if job finished if self.index_mode: index_tensors_dict, new_index_token = self.index_reader.load_tensor_data_from_index_files( diff --git a/tests/mxnet/test_hook_reduce_config.py b/tests/mxnet/test_hook_reduce_config.py index 245a16a80..46476414d 100644 --- a/tests/mxnet/test_hook_reduce_config.py +++ b/tests/mxnet/test_hook_reduce_config.py @@ -2,6 +2,9 @@ import shutil from datetime import datetime +# Third Party +from tests.utils import verify_shapes + # First Party from smdebug.mxnet import ReductionConfig, SaveConfig from smdebug.mxnet.hook import Hook as t_hook @@ -24,6 +27,7 @@ def test_save_config(hook=None, out_dir=None): hook = t_hook( out_dir=out_dir, save_config=global_save_config, + save_all=True, include_collections=[ "weights", "biases", @@ -82,6 +86,22 @@ def test_save_config(hook=None, out_dir=None): shutil.rmtree(out_dir) +def test_save_shapes(out_dir): + global_reduce_config = ReductionConfig(save_shape=True) + global_save_config = SaveConfig(save_steps=[0, 1]) + + hook = t_hook( + out_dir=out_dir, + save_config=global_save_config, + save_all=True, + reduction_config=global_reduce_config, + ) + run_mnist_gluon_model(hook=hook, num_steps_train=5) + verify_shapes(out_dir, 0) + verify_shapes(out_dir, 1) + shutil.rmtree(out_dir) + + def test_save_config_hook_from_json(): from smdebug.core.json_config import CONFIG_FILE_PATH_ENV_STR import os diff --git a/tests/pytorch/test_json_configs/test_hook_save_shape.json b/tests/pytorch/test_json_configs/test_hook_save_shape.json new file mode 100644 index 000000000..166051af4 --- /dev/null +++ b/tests/pytorch/test_json_configs/test_hook_save_shape.json @@ -0,0 +1,9 @@ +{ + "S3Path": "s3://kjndjknd_bucket/prefix", + "LocalPath": "/tmp/test_output/test_hook_save_shape/jsonloading", + "HookParameters": { + "save_all": true, + "save_shape": true, + "save_steps": "0,1" + } + } diff --git a/tests/pytorch/test_reduce_config.py b/tests/pytorch/test_reduce_config.py index 230e0410c..de97b30da 100644 --- a/tests/pytorch/test_reduce_config.py +++ b/tests/pytorch/test_reduce_config.py @@ -5,7 +5,10 @@ # Third Party import torch +import torch.nn as nn +import torch.nn.functional as F import torch.optim as optim +from tests.utils import verify_shapes # First Party from smdebug.pytorch import ReductionConfig, SaveConfig @@ -86,6 +89,85 @@ def test_reduce_config(hook=None, out_dir=None): shutil.rmtree(out_dir) +def test_save_shapes(hook=None, out_dir=None): + class ChildA(nn.Module): + def __init__(self): + super(ChildA, self).__init__() + self.child2 = ChildB() + self.relu0 = 
nn.ReLU() + + def forward(self, x): + return self.relu0(self.child2(x)) + + class ChildB(nn.Module): + def __init__(self): + super(ChildB, self).__init__() + self.conv1 = nn.Conv2d(1, 20, 5, 1) + + def forward(self, x): + return self.conv1(x) + + class NestedNet(nn.Module): + def __init__(self): + super(NestedNet, self).__init__() + self.child1 = ChildA() + self.max_pool = nn.MaxPool2d(2, stride=2) + self.conv2 = nn.Conv2d(20, 50, 5, 1) + relu_module = nn.ReLU() + self.relu1 = nn.ReLU() + self.max_pool2 = nn.MaxPool2d(2, stride=2) + self.fc1 = nn.Linear(4 * 4 * 50, 500) + self.relu2 = nn.ReLU() + self.fc2 = nn.Linear(500, 10) + + def forward(self, x): + x = self.child1(x) + x = self.max_pool(x) + x = self.relu1(self.conv2(x)) + x = self.max_pool2(x) + x = x.view(-1, 4 * 4 * 50) + x = self.relu2(self.fc1(x)) + x = self.fc2(x) + return F.log_softmax(x, dim=1) + + hook_created = False + if hook is None: + global_reduce_config = ReductionConfig(save_shape=True) + global_save_config = SaveConfig(save_steps=[0]) + + run_id = "trial_" + datetime.now().strftime("%Y%m%d-%H%M%S%f") + out_dir = "/tmp/" + run_id + hook = t_hook( + out_dir=out_dir, + save_config=global_save_config, + save_all=True, + reduction_config=global_reduce_config, + ) + hook_created = True + + model = NestedNet().to(torch.device("cpu")) + hook.register_module(model) + optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9) + train(model, hook, torch.device("cpu"), optimizer, num_steps=10) + # different versions seem to output different number of loss tensors + verify_shapes(out_dir, 0) + if hook_created: + shutil.rmtree(out_dir) + + +def test_save_shapes_json(): + from smdebug.core.json_config import CONFIG_FILE_PATH_ENV_STR + + out_dir = "/tmp/test_output/test_hook_save_shape/jsonloading" + shutil.rmtree(out_dir, True) + os.environ[ + CONFIG_FILE_PATH_ENV_STR + ] = "tests/pytorch/test_json_configs/test_hook_save_shape.json" + hook = t_hook.create_from_json_file() + test_save_shapes(hook=hook, out_dir=out_dir) + shutil.rmtree(out_dir, True) + + # Test creating hook by loading the json file with reduction configs. 
def test_reduce_config_with_json(): from smdebug.core.json_config import CONFIG_FILE_PATH_ENV_STR diff --git a/tests/tensorflow/hooks/test_estimator_modes.py b/tests/tensorflow/hooks/test_estimator_modes.py index e7bd40945..b7de19d6e 100644 --- a/tests/tensorflow/hooks/test_estimator_modes.py +++ b/tests/tensorflow/hooks/test_estimator_modes.py @@ -18,6 +18,7 @@ import pytest import tensorflow as tf from tests.analysis.utils import delete_s3_prefix +from tests.utils import verify_shapes # First Party import smdebug.tensorflow as smd @@ -30,10 +31,12 @@ def help_test_mnist( path, save_config=None, + reduction_config=None, hook=None, set_modes=True, num_steps=10, num_eval_steps=None, + save_all=False, steps=None, include_collections=None, ): @@ -125,7 +128,11 @@ def cnn_model_fn(features, labels, mode): if include_collections is None: include_collections = ["weights", "gradients", "default", "losses"] hook = smd.SessionHook( - out_dir=trial_dir, save_config=save_config, include_collections=include_collections + out_dir=trial_dir, + save_config=save_config, + include_collections=include_collections, + save_all=save_all, + reduction_config=reduction_config, ) if num_eval_steps is None: @@ -187,6 +194,29 @@ def test_mnist(out_dir, on_s3=False): helper_test_mnist_trial(out_dir) +@pytest.mark.slow # 0:02 to run +def test_mnist_shapes(out_dir, on_s3=False): + if on_s3: + run_id = "trial_" + datetime.now().strftime("%Y%m%d-%H%M%S%f") + bucket = "smdebug-testing" + prefix = "outputs/hooks/estimator_modes/" + run_id + out_dir = f"s3://{bucket}/{prefix}" + help_test_mnist( + out_dir, + save_all=True, + save_config=smd.SaveConfig(save_steps=[0]), + num_steps=1, + steps=None, + reduction_config=smd.ReductionConfig(save_shape=True), + ) + verify_shapes(out_dir, 0) + + +@pytest.mark.slow # 0:02 to run +def test_mnist_shapes_s3(out_dir): + test_mnist_shapes(out_dir, on_s3=True) + + @pytest.mark.slow # 0:02 to run def test_mnist_local_json(out_dir, monkeypatch): monkeypatch.setenv( diff --git a/tests/tensorflow/hooks/test_reductions.py b/tests/tensorflow/hooks/test_reductions.py index 4fde66a49..e009f4565 100644 --- a/tests/tensorflow/hooks/test_reductions.py +++ b/tests/tensorflow/hooks/test_reductions.py @@ -1,5 +1,8 @@ # Standard Library +# Third Party +from tests.utils import verify_shapes + # First Party import smdebug.tensorflow as smd from smdebug.core.json_config import CONFIG_FILE_PATH_ENV_STR @@ -57,6 +60,19 @@ def test_reductions(out_dir, save_raw_tensor=False): helper_test_reductions(out_dir, hook, save_raw_tensor) +def test_shapes(out_dir, save_raw_tensor=False): + pre_test_clean_up() + rdnc = smd.ReductionConfig(save_shape=True, save_raw_tensor=save_raw_tensor) + hook = smd.SessionHook( + out_dir=out_dir, + save_config=smd.SaveConfig(save_interval=1), + reduction_config=rdnc, + include_collections=["weights", "gradients", "losses"], + ) + simple_model(hook) + verify_shapes(out_dir, 0) + + def test_reductions_with_raw_tensor(out_dir): test_reductions(out_dir, save_raw_tensor=True) diff --git a/tests/tensorflow/keras/test_keras.py b/tests/tensorflow/keras/test_keras.py index 2fcb379f6..bfd5e7cc7 100644 --- a/tests/tensorflow/keras/test_keras.py +++ b/tests/tensorflow/keras/test_keras.py @@ -5,6 +5,7 @@ import pytest import tensorflow as tf from tests.tensorflow.utils import create_trial_fast_refresh +from tests.utils import verify_shapes # First Party from smdebug.core.access_layer import has_training_ended @@ -224,6 +225,20 @@ def test_tf_keras(out_dir): exhaustive_check(out_dir, True) 
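The test added below drives this end to end through `tf.keras`. For orientation, here is a condensed usage sketch of the feature as exercised by these tests — the output path is illustrative, and `verify_shapes` in `tests/utils.py` performs the same reads with assertions:

```python
import smdebug.tensorflow as smd
from smdebug.exceptions import ShapeUnavailableForStep
from smdebug.trials import create_trial

# Write side: record only shapes (no values, no reductions) for all tensors at step 0.
hook = smd.KerasHook(
    out_dir="/tmp/shape_demo",  # illustrative path
    save_all=True,
    save_config=smd.SaveConfig(save_steps=[0]),
    reduction_config=smd.ReductionConfig(save_shape=True),
)
# model.fit(x, y, callbacks=[hook]) ...

# Read side: shapes come back as tuples through the new Tensor.shape() API;
# a step with neither shape nor value raises ShapeUnavailableForStep.
trial = create_trial("/tmp/shape_demo")
for tname in trial.tensor_names(step=0):
    try:
        print(tname, trial.tensor(tname).shape(0))
    except ShapeUnavailableForStep:
        pass
```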
+@pytest.mark.slow # 0:07 to run +def test_tf_keras_shapes(out_dir): + train_model( + out_dir, + save_all=True, + reduction_config=ReductionConfig(save_shape=True), + use_tf_keras=True, + save_config=SaveConfig(save_steps=[0, 10]), + eager=False, + steps=["train", "eval", "predict", "train"], + ) + verify_shapes(out_dir, 0) + + @pytest.mark.slow # 0:03 to run def test_tf_keras_non_keras_opt(out_dir): include_collections = [ diff --git a/tests/tensorflow2/test_keras.py b/tests/tensorflow2/test_keras.py index 4323b5284..3df832404 100644 --- a/tests/tensorflow2/test_keras.py +++ b/tests/tensorflow2/test_keras.py @@ -16,7 +16,7 @@ from tests.constants import TEST_DATASET_S3_PATH from tests.tensorflow2.utils import is_tf_2_2, is_tf_2_3 from tests.tensorflow.utils import create_trial_fast_refresh -from tests.utils import use_s3_datasets +from tests.utils import use_s3_datasets, verify_shapes # First Party import smdebug.tensorflow as smd @@ -183,6 +183,18 @@ def helper_keras_gradtape( hook.close() +def test_keras_gradtape_shapes(out_dir): + hook = smd.KerasHook( + out_dir=out_dir, + save_all=True, + save_config=SaveConfig(save_steps=[0]), + reduction_config=ReductionConfig(save_shape=True), + ) + helper_keras_gradtape(trial_dir=out_dir, hook=hook) + verify_shapes(out_dir, 0) + verify_shapes(out_dir, 500) + + @pytest.mark.skip_if_non_eager @pytest.mark.slow @pytest.mark.parametrize("saveall", [True, False]) @@ -453,6 +465,18 @@ def test_keras_fit(out_dir, tf_eager_mode, saveall): assert trial.tensor(tname).value(0) is not None +def test_keras_fit_shapes(out_dir): + hook = smd.KerasHook( + out_dir=out_dir, + save_all=True, + save_config=SaveConfig(save_steps=[0]), + reduction_config=ReductionConfig(save_shape=True), + ) + helper_keras_fit(trial_dir=out_dir, hook=hook) + print(create_trial_fast_refresh(out_dir).tensor_names(step=0)) + verify_shapes(out_dir, 0) + + @pytest.mark.slow def test_base_reductions(out_dir, tf_eager_mode): helper_keras_fit( diff --git a/tests/tensorflow2/test_keras_mirrored.py b/tests/tensorflow2/test_keras_mirrored.py index d857218ab..3f0bafe4f 100644 --- a/tests/tensorflow2/test_keras_mirrored.py +++ b/tests/tensorflow2/test_keras_mirrored.py @@ -13,6 +13,7 @@ from tests.core.utils import verify_files from tests.tensorflow2.utils import is_tf_2_2, is_tf_2_3 from tests.tensorflow.utils import create_trial_fast_refresh +from tests.utils import verify_shapes # First Party import smdebug.tensorflow as smd @@ -290,6 +291,20 @@ def test_save_all(out_dir, tf_eager_mode, workers): verify_files(out_dir, save_config, saved_scalars) +@pytest.mark.slow +def test_shapes(out_dir, tf_eager_mode): + strategy, _ = train_model( + out_dir, + save_all=True, + save_config=SaveConfig(save_steps=[0]), + reduction_config=ReductionConfig(save_shape=True), + steps=["train"], + eager=tf_eager_mode, + ) + multiworker = strategy.num_replicas_in_sync > 1 + verify_shapes(out_dir, 0, multiworker=multiworker) + + @pytest.mark.slow def test_base_reductions(out_dir, tf_eager_mode): train_model( diff --git a/tests/utils.py b/tests/utils.py index d5db2a8ba..af827f264 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -5,16 +5,20 @@ # Third Party import boto3 +import numpy as np from tests.constants import TEST_DATASET_S3_PATH +from tests.tensorflow.utils import create_trial_fast_refresh # First Party from smdebug.core.config_constants import ( CONFIG_FILE_PATH_ENV_STR, DEFAULT_SAGEMAKER_OUTDIR, DEFAULT_SAGEMAKER_TENSORBOARD_PATH, + DEFAULT_WORKER_NAME, TENSORBOARD_CONFIG_FILE_PATH_ENV_STR, ) from 
smdebug.core.utils import is_s3, remove_file_if_exists
+from smdebug.exceptions import TensorUnavailableForStep
 
 
 def use_s3_datasets():
@@ -27,6 +31,46 @@
     return False
 
 
+def is_scalar(x):
+    if isinstance(x, list):
+        return len(x) == 1
+    elif isinstance(x, np.ndarray):
+        # saved scalars come back as 0-d or single-element arrays
+        return x.size == 1
+    return False
+
+
+def verify_shapes(out_dir, step_num, multiworker=False):
+    trial = create_trial_fast_refresh(out_dir)
+    for tname in trial.tensor_names(step=step_num):
+        tensor = trial.tensor(tname)
+        if multiworker is False:
+            assert isinstance(tensor.shape(step_num), tuple), (tname, tensor.shape(step_num))
+            try:
+                if not is_scalar(tensor.value(step_num)):
+                    # the test saved no values except scalars (which bypass the
+                    # reduction config), so this should raise the exception below
+                    assert False
+            except TensorUnavailableForStep:
+                pass
+        else:
+            workers = tensor.workers(step_num)
+            assert len(workers) > 1
+            for w in workers:
+                try:
+                    if not is_scalar(tensor.value(step_num, worker=w)):
+                        # the test saved no values, so this should raise the exception below
+                        assert False
+                except TensorUnavailableForStep:
+                    pass
+
+                assert isinstance(tensor.shape(step_num, worker=w), tuple), (
+                    tname,
+                    w,
+                    tensor.shape(step_num, worker=w),
+                )
+
+
 class SagemakerSimulator(object):
     """
     Creates an environment variable pointing to a JSON config file, and creates the config file.
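For reference, this is roughly what an index file contains for a shapes-only step after these changes, assembled from `IndexWriter.add_shape` and `Index.to_json` above (the tensor name, original name, and shape are made up). `tensor_payload` stays empty and `meta` omits `event_file_name` because no event file was written — which is why `_validate` in `index_reader.py` now accepts either payload section:

```python
import json

# Hypothetical index-file contents for a step saved with save_shape=True.
index_json = {
    "meta": {"mode": "TRAIN", "mode_step": 0},
    "tensor_payload": [],
    "shape_payload": [
        {"tensorname": "conv0_weight", "originalname": "conv0/weight:0", "shape": [20, 1, 5, 5]}
    ],
}
print(json.dumps(index_json))
```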