Skip to content

Commit 7a87522

Browse files
authored
Dataloader analysis for PyTorch (#64)
* Adding the functions to get the dataloader events for pytorch * Adding the training script and notebook for dataloader analysis * Fixed the timeconversion from timestamp to UTC and fixed the local reader for system tracefiles. * Updating the dataloader analysis notebook * Updated the notebook with analysis for batch processing. * Updated notebook to display python profiler stats. * Updated the notebook with documenation and layout * Updated the notebook to have static contents * Updating the notebook to handle absence of traceevents * FIxed the tracevents as per the current format and added notebook for triggering the pytorch training jobs * Moved the analysis functions from notebook to a class * Updated the utility functions to retrieve the dataloader events * Added the test scripts for horovod and distributed training * Adding a script that uses dummy custom dataloader * Addressed the review comments * Updated the utility code and added a training script that uses custom datasets * Added hyper parameteres for custom dataset training.
1 parent 918c2b3 commit 7a87522

16 files changed

+1773
-104
lines changed
Lines changed: 320 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,320 @@
1+
#!/usr/bin/env python
2+
# coding: utf-8
3+
4+
# Standard Library
5+
# In[1]:
6+
import argparse
7+
import logging
8+
import os
9+
from functools import partial
10+
from typing import List, Optional, Tuple, Union
11+
12+
# Third Party
13+
import numpy as np
14+
import pandas as pd
15+
import torch
16+
import torch.nn as nn
17+
from PIL import Image
18+
from sklearn.metrics import roc_auc_score
19+
from torch import Tensor
20+
from torch.autograd import Variable
21+
from torch.optim import Adam
22+
from torch.utils.data import DataLoader, Dataset
23+
from torchvision import transforms
24+
from torchvision.models.resnet import BasicBlock, ResNet
25+
26+
# First Party
27+
from smdebug.profiler.utils import str2bool
28+
from smdebug.pytorch import get_hook
29+
30+
DATA_FOLDER = "/home/ubuntu/pytorch_tests/histo"
31+
LABELS = f"{DATA_FOLDER}/train_labels.csv"
32+
TRAIN_IMAGES_FOLDER = f"{DATA_FOLDER}/train"
33+
USE_GPU = torch.cuda.is_available()
34+
logging.basicConfig(level="INFO")
35+
logger = logging.getLogger()
36+
37+
38+
def read_labels(path_to_file: str) -> pd.DataFrame:
39+
labels = pd.read_csv(path_to_file)
40+
return labels
41+
42+
43+
def format_labels_for_dataset(labels: pd.DataFrame) -> np.array:
44+
return labels["label"].values.reshape(-1, 1)
45+
46+
47+
def format_path_to_images_for_dataset(labels: pd.DataFrame, path: str) -> List:
48+
return [os.path.join(path, f"{f}.tif") for f in labels["id"].values]
49+
50+
51+
def train_valid_split(df: pd.DataFrame) -> Tuple:
52+
limit_df = 50000
53+
df = df.sample(n=df.shape[0])
54+
df = df.iloc[:limit_df]
55+
split = 40000
56+
train = df.iloc[:split]
57+
valid = df.iloc[:split]
58+
return train, valid
59+
60+
61+
class MainDataset(Dataset):
62+
def __init__(self, x_dataset: Dataset, y_dataset: Dataset, x_tfms: Optional = None):
63+
self.x_dataset = x_dataset
64+
self.y_dataset = y_dataset
65+
self.x_tfms = x_tfms
66+
67+
def __len__(self) -> int:
68+
return self.x_dataset.__len__()
69+
70+
def __getitem__(self, index: int) -> Tuple:
71+
x = self.x_dataset[index]
72+
y = self.y_dataset[index]
73+
if self.x_tfms is not None:
74+
x = self.x_tfms(x)
75+
return x, y
76+
77+
78+
class ImageDataset(Dataset):
79+
def __init__(self, paths_to_imgs: List):
80+
self.paths_to_imgs = paths_to_imgs
81+
82+
def __len__(self) -> int:
83+
return len(self.paths_to_imgs)
84+
85+
def __getitem__(self, index: int) -> Image.Image:
86+
img = Image.open(self.paths_to_imgs[index])
87+
return img
88+
89+
90+
class LabelDataset(Dataset):
91+
def __init__(self, labels: List):
92+
self.labels = labels
93+
94+
def __len__(self) -> int:
95+
return len(self.labels)
96+
97+
def __getitem__(self, index: int) -> int:
98+
return self.labels[index]
99+
100+
101+
def to_gpu(tensor):
102+
return tensor.cuda() if USE_GPU else tensor
103+
104+
105+
def T(tensor):
106+
if not torch.is_tensor(tensor):
107+
tensor = torch.FloatTensor(tensor)
108+
else:
109+
tensor = tensor.type(torch.FloatTensor)
110+
if USE_GPU:
111+
tensor = to_gpu(tensor)
112+
return tensor
113+
114+
115+
def predict(model, dataloader):
116+
model.eval()
117+
y_true, y_hat = [], []
118+
with torch.no_grad():
119+
for x, y in dataloader:
120+
x = Variable(T(x))
121+
y = Variable(T(y))
122+
output = model(x)
123+
y_true.append(to_numpy(y))
124+
y_hat.append(to_numpy(output))
125+
return y_true, y_hat
126+
127+
128+
def iteration_trigger(iteration, every_x_iterations):
129+
if every_x_iterations == 1:
130+
return True
131+
elif iteration > 0 and iteration % every_x_iterations == 0:
132+
return True
133+
else:
134+
return False
135+
136+
137+
def init_triggers(step=1, valid=10, train=10):
138+
do_step_trigger = partial(iteration_trigger, every_x_iterations=step)
139+
valid_loss_trigger = partial(iteration_trigger, every_x_iterations=valid)
140+
train_loss_trigger = partial(iteration_trigger, every_x_iterations=train)
141+
return do_step_trigger, valid_loss_trigger, train_loss_trigger
142+
143+
144+
def auc_writer(y_true, y_hat, iteration):
145+
try:
146+
auc = roc_auc_score(np.vstack(y_true), np.vstack(y_hat))
147+
except:
148+
auc = -1
149+
logger.info(f"iteration: {iteration}, auc: {auc}")
150+
151+
152+
def create_resnet9_model(output_dim: int = 1) -> nn.Module:
153+
model = ResNet(BasicBlock, [1, 1, 1, 1])
154+
in_features = model.fc.in_features
155+
model.avgpool = nn.AdaptiveAvgPool2d(1)
156+
model.fc = nn.Linear(in_features, output_dim)
157+
model = to_gpu(model)
158+
return model
159+
160+
161+
def to_numpy(tensor: Union[Tensor, Image.Image, np.array]) -> np.ndarray:
162+
if type(tensor) == np.array or type(tensor) == np.ndarray:
163+
return np.array(tensor)
164+
elif type(tensor) == Image.Image:
165+
return np.array(tensor)
166+
elif type(tensor) == Tensor:
167+
return tensor.cpu().detach().numpy()
168+
else:
169+
msg = "Input parameter is not of a valid datatype."
170+
raise ValueError(msg)
171+
172+
173+
def train_one_epoch(
174+
model,
175+
train_dataloader,
176+
valid_dataloader,
177+
loss,
178+
optimizer,
179+
do_step_trigger,
180+
valid_loss_trigger,
181+
train_loss_trigger,
182+
):
183+
model.train()
184+
y_true_train, y_hat_train = [], []
185+
for iteration, (x, y) in enumerate(train_dataloader):
186+
x = Variable(T(x), requires_grad=True)
187+
y = Variable(T(y), requires_grad=True)
188+
output = model(x)
189+
y_true_train.append(to_numpy(y))
190+
y_hat_train.append(to_numpy(output))
191+
loss_values = loss(output, y)
192+
loss_values.backward()
193+
if do_step_trigger(iteration):
194+
optimizer.step()
195+
optimizer.zero_grad()
196+
if train_loss_trigger(iteration):
197+
auc_writer(y_true_train, y_hat_train, iteration)
198+
y_true_train, y_hat_train = [], []
199+
if valid_loss_trigger(iteration):
200+
y_true, y_hat = predict(model, valid_dataloader)
201+
auc_writer(y_true, y_hat, iteration)
202+
return
203+
204+
205+
def train(
206+
model,
207+
train_dataloader,
208+
valid_dataloader,
209+
loss,
210+
optimizer,
211+
do_step_trigger,
212+
valid_loss_trigger,
213+
train_loss_trigger,
214+
epoch,
215+
):
216+
print(f"Training the model for {epoch}")
217+
for i in range(epoch):
218+
train_one_epoch(
219+
model,
220+
train_dataloader,
221+
valid_dataloader,
222+
loss,
223+
optimizer,
224+
do_step_trigger,
225+
valid_loss_trigger,
226+
train_loss_trigger,
227+
)
228+
return
229+
230+
231+
def main():
232+
parser = argparse.ArgumentParser(description="Training with histopathologic data")
233+
parser.add_argument("--batch_size", type=int, default=512)
234+
parser.add_argument("--epoch", type=int, default=1)
235+
parser.add_argument("--gpu", type=str2bool, default=True)
236+
parser.add_argument("--workers", type=int, default=4)
237+
parser.add_argument("--pin_memory", type=str2bool, default=True)
238+
parser.add_argument("--data_folder", type=str, default="/opt/ml/input/data/training")
239+
240+
args = parser.parse_args()
241+
242+
global DATA_FOLDER
243+
DATA_FOLDER = args.data_folder
244+
global LABELS
245+
LABELS = f"{DATA_FOLDER}/train_labels.csv"
246+
global TRAIN_IMAGES_FOLDER
247+
TRAIN_IMAGES_FOLDER = f"{DATA_FOLDER}/train"
248+
global USE_GPU
249+
if args.gpu:
250+
USE_GPU = torch.cuda.is_available()
251+
else:
252+
USE_GPU = False
253+
254+
hook = get_hook(create_if_not_exists=True)
255+
labels = read_labels(LABELS)
256+
train, valid = train_valid_split(labels)
257+
258+
train_labels = format_labels_for_dataset(train)
259+
valid_labels = format_labels_for_dataset(valid)
260+
261+
train_images = format_path_to_images_for_dataset(train, TRAIN_IMAGES_FOLDER)
262+
valid_images = format_path_to_images_for_dataset(valid, TRAIN_IMAGES_FOLDER)
263+
264+
train_images_dataset = ImageDataset(train_images)
265+
valid_images_dataset = ImageDataset(valid_images)
266+
train_labels_dataset = LabelDataset(train_labels)
267+
valid_labels_dataset = LabelDataset(valid_labels)
268+
269+
x_tfms = transforms.Compose(
270+
[
271+
transforms.ToTensor(),
272+
transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
273+
]
274+
)
275+
276+
train_dataset = MainDataset(train_images_dataset, train_labels_dataset, x_tfms)
277+
valid_dataset = MainDataset(valid_images_dataset, valid_labels_dataset, x_tfms)
278+
279+
shuffle = True
280+
batch_size = args.batch_size
281+
num_workers = args.workers
282+
283+
train_dataloader = DataLoader(
284+
train_dataset,
285+
batch_size=batch_size,
286+
shuffle=shuffle,
287+
num_workers=num_workers,
288+
pin_memory=args.pin_memory,
289+
)
290+
valid_dataloader = DataLoader(
291+
valid_dataset,
292+
batch_size=batch_size,
293+
shuffle=shuffle,
294+
num_workers=num_workers,
295+
pin_memory=args.pin_memory,
296+
)
297+
298+
resnet9 = create_resnet9_model(output_dim=1)
299+
300+
lr = 1e-3
301+
optimizer = Adam(resnet9.parameters(), lr=lr)
302+
loss = nn.BCEWithLogitsLoss()
303+
304+
do_step_trigger, valid_loss_trigger, train_loss_trigger = init_triggers(1, 20, 10)
305+
306+
train_one_epoch(
307+
resnet9,
308+
train_dataloader,
309+
valid_dataloader,
310+
loss,
311+
optimizer,
312+
do_step_trigger,
313+
valid_loss_trigger,
314+
train_loss_trigger,
315+
)
316+
print(f"Training complete.")
317+
318+
319+
if __name__ == "__main__":
320+
main()
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
# Standard Library
2+
import argparse
3+
import os
4+
import subprocess
5+
import sys
6+
from distutils.util import strtobool
7+
8+
# Third Party
9+
from torch.cuda import device_count
10+
11+
HOROVOD_PYTORCH_TEST_MNIST_SCRIPT = "./horovod_mnist.py"
12+
13+
14+
HOROVOD_MNIST_SCRIPT_NAME = "horovod_mnist.py"
15+
16+
17+
def launch_horovod_job(script_file_path, script_args, num_workers, smprofile_path, mode):
18+
command = ["mpirun", "-np", str(num_workers)] + [sys.executable, script_file_path] + script_args
19+
env_dict = os.environ.copy()
20+
env_dict["HOROVOD_TIMELINE"] = f"{smprofile_path}"
21+
if mode == "cpu":
22+
env_dict["CUDA_VISIBLE_DEVICES"] = "-1"
23+
subprocess.check_call(command, env=env_dict)
24+
25+
26+
def main():
27+
parser = argparse.ArgumentParser(description="Launch horovod test")
28+
parser.add_argument("--script", type=str, default=HOROVOD_PYTORCH_TEST_MNIST_SCRIPT)
29+
parser.add_argument("--batch_size", type=int, default=128)
30+
parser.add_argument("--epoch", type=int, default=5)
31+
parser.add_argument("--gpu", type=strtobool, default=1)
32+
parser.add_argument("--profile_path", type=str, default="./hvd_timeline.json")
33+
parser.add_argument("--model", type=str, default="resnet50")
34+
opt = parser.parse_args()
35+
36+
if opt.gpu == 1:
37+
mode = "gpu"
38+
else:
39+
mode = "cpu"
40+
num_workers = 1 if bool(device_count()) is False else device_count()
41+
print(f"Number of workers = {num_workers}")
42+
mode_args = []
43+
if mode == "cpu":
44+
mode_args += ["--use_only_cpu", "true"]
45+
mode_args += [
46+
"--epochs",
47+
str(opt.epoch),
48+
"--batch_size",
49+
str(opt.batch_size),
50+
"--model",
51+
str(opt.model),
52+
]
53+
launch_horovod_job(
54+
script_file_path=opt.script,
55+
script_args=mode_args,
56+
num_workers=num_workers,
57+
smprofile_path=opt.profile_path,
58+
mode=mode,
59+
)
60+
61+
62+
if __name__ == "__main__":
63+
main()

0 commit comments

Comments
 (0)