
Commit b2c6c33

precommit
1 parent 4cf3027 commit b2c6c33

File tree

3 files changed: +149 -40 lines

examples/python/remote_storage_example/README.md

Lines changed: 4 additions & 4 deletions
@@ -113,16 +113,16 @@ Remote reads are implemented as a read from storage followed by a network write.
 
 Remote writes are implemented as a read from network following by a storage write.
 
-### Pipelining
+### Pipelining
 
-To improve performance of the remote storage server, we can pipeline operations to network and storage. This pipelining allows multiple threads to handle each request. However, in order to maintain correctness, the order of network and storage must happen in order for each individual remote storage operation. To do this, we implemented a simple pipelining scheme.
+To improve performance of the remote storage server, we can pipeline operations to network and storage. This pipelining allows multiple threads to handle each request. However, in order to maintain correctness, the order of network and storage must happen in order for each individual remote storage operation. To do this, we implemented a simple pipelining scheme.
 
 ![Remote Operation Pipelines](storage_pipelines.png)
 
 ### Performance Tips
 
 For high-speed storage and network hardware, you may need to tweak performance with a couple of environment variables.
 
-First, for optimal GDS performance, ensure you are using the GDS_MT backend with default concurrency. Additionally, you can use the cufile options described in the [GDS README](https://github.com/ai-dynamo/nixl/blob/main/src/plugins/cuda_gds/README.md).
+First, for optimal GDS performance, ensure you are using the GDS_MT backend with default concurrency. Additionally, you can use the cufile options described in the [GDS README](https://github.com/ai-dynamo/nixl/blob/main/src/plugins/cuda_gds/README.md).
 
-On the network side, remote reads from VRAM to DRAM can be limited by UCX rail selection. This can be tweaked by setting UCX_MAX_RMA_RAILS=1. However, with larger batch or message sizes, this might limit bandwidth and a higher number of rails might be needed.
+On the network side, remote reads from VRAM to DRAM can be limited by UCX rail selection. This can be tweaked by setting UCX_MAX_RMA_RAILS=1. However, with larger batch or message sizes, this might limit bandwidth and a higher number of rails might be needed.
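
To illustrate the README's tuning tips in code, here is a minimal sketch (not from the repository; it assumes the UCX_* variables are picked up from the process environment when the agent's UCX backend initializes, so they must be set before the agent is created):

    import os

    # Hedged tuning sketch for the performance tips above (illustrative only).
    # UCX reads UCX_* variables at initialization, so set them before the nixl
    # agent (and its UCX backend) is created in this process.
    os.environ.setdefault("UCX_MAX_RMA_RAILS", "1")  # single rail for VRAM->DRAM remote reads

    # The GDS_MT backend is selected per transfer in this example, e.g.:
    #   execute_transfer(agent, mem_descs, file_descs, agent.name, "READ", ["GDS_MT"])
    # cufile-level options are described in the GDS README linked above.

With larger batch or message sizes, the same variable can be raised again if a single rail limits bandwidth.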

examples/python/remote_storage_example/nixl_p2p_storage_example.py

Lines changed: 141 additions & 35 deletions
@@ -28,15 +28,21 @@
 logger = get_logger(__name__)
 
 
-def execute_transfer(my_agent, local_descs, remote_descs, remote_name, operation, use_backends = []):
+def execute_transfer(
+    my_agent, local_descs, remote_descs, remote_name, operation, use_backends=[]
+):
 
-    handle = my_agent.initialize_xfer(operation, local_descs, remote_descs, remote_name, backends=use_backends)
+    handle = my_agent.initialize_xfer(
+        operation, local_descs, remote_descs, remote_name, backends=use_backends
+    )
     my_agent.transfer(handle)
     nsu.wait_for_transfer(my_agent, handle)
     my_agent.release_xfer_handle(handle)
 
 
-def remote_storage_transfer(my_agent, my_mem_descs, operation, remote_agent_name, iterations):
+def remote_storage_transfer(
+    my_agent, my_mem_descs, operation, remote_agent_name, iterations
+):
     """Initiate remote memory transfer."""
     if operation != "READ" and operation != "WRITE":
         logger.error("Invalid operation, exiting")
@@ -88,62 +94,145 @@ def connect_to_agents(my_agent, agents_file):
 
     return target_agents
 
-def pipeline_reads(my_agent, req_agent, my_mem_descs, my_file_descs, sent_descs, iterations):
+
+def pipeline_reads(
+    my_agent, req_agent, my_mem_descs, my_file_descs, sent_descs, iterations
+):
 
     with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
         n = 0
         s = 0
         futures = []
 
         while n < iterations and s < iterations:
-
+
             if s == 0:
-                futures.append(executor.submit(execute_transfer, my_agent, my_mem_descs, my_file_descs, my_agent.name, "READ"))
-                s+=1
+                futures.append(
+                    executor.submit(
+                        execute_transfer,
+                        my_agent,
+                        my_mem_descs,
+                        my_file_descs,
+                        my_agent.name,
+                        "READ",
+                    )
+                )
+                s += 1
                 continue
 
             if s == iterations:
-                futures.append(executor.submit(execute_transfer, my_agent, my_mem_descs, sent_descs, req_agent, "WRITE"))
-                n+=1
+                futures.append(
+                    executor.submit(
+                        execute_transfer,
+                        my_agent,
+                        my_mem_descs,
+                        sent_descs,
+                        req_agent,
+                        "WRITE",
+                    )
+                )
+                n += 1
                 continue
 
             # Do two storage and network in parallel
-            futures.append(executor.submit(execute_transfer, my_agent, my_mem_descs, my_file_descs, my_agent.name, "READ"))
-            futures.append(executor.submit(execute_transfer, my_agent, my_mem_descs, sent_descs, req_agent, "WRITE"))
-            s+=1
-            n+=1
+            futures.append(
+                executor.submit(
+                    execute_transfer,
+                    my_agent,
+                    my_mem_descs,
+                    my_file_descs,
+                    my_agent.name,
+                    "READ",
+                )
+            )
+            futures.append(
+                executor.submit(
+                    execute_transfer,
+                    my_agent,
+                    my_mem_descs,
+                    sent_descs,
+                    req_agent,
+                    "WRITE",
+                )
+            )
+            s += 1
+            n += 1
 
-        _, not_done = concurrent.futures.wait(futures, return_when=concurrent.futures.ALL_COMPLETED)
+        _, not_done = concurrent.futures.wait(
+            futures, return_when=concurrent.futures.ALL_COMPLETED
+        )
         assert not not_done
 
-def pipeline_writes(my_agent, req_agent, my_mem_descs, my_file_descs, sent_descs, iterations):
+
+def pipeline_writes(
+    my_agent, req_agent, my_mem_descs, my_file_descs, sent_descs, iterations
+):
 
     with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
         n = 0
         s = 0
         futures = []
 
         while n < iterations and s < iterations:
-
+
             if s == 0:
-                futures.append(executor.submit(execute_transfer, my_agent, my_mem_descs, sent_descs, req_agent, "READ"))
-                s+=1
+                futures.append(
+                    executor.submit(
+                        execute_transfer,
+                        my_agent,
+                        my_mem_descs,
+                        sent_descs,
+                        req_agent,
+                        "READ",
+                    )
+                )
+                s += 1
                 continue
 
             if s == iterations:
-                futures.append(executor.submit(execute_transfer, my_agent, my_mem_descs, my_file_descs, my_agent.name, "WRITE"))
-                n+=1
+                futures.append(
+                    executor.submit(
+                        execute_transfer,
+                        my_agent,
+                        my_mem_descs,
+                        my_file_descs,
+                        my_agent.name,
+                        "WRITE",
+                    )
+                )
+                n += 1
                 continue
 
             # Do two storage and network in parallel
-            futures.append(executor.submit(execute_transfer, my_agent, my_mem_descs, sent_descs, req_agent, "READ"))
-            futures.append(executor.submit(execute_transfer, my_agent, my_mem_descs, my_file_descs, my_agent.name, "WRITE"))
-            s+=1
-            n+=1
+            futures.append(
+                executor.submit(
+                    execute_transfer,
+                    my_agent,
+                    my_mem_descs,
+                    sent_descs,
+                    req_agent,
+                    "READ",
+                )
+            )
+            futures.append(
+                executor.submit(
+                    execute_transfer,
+                    my_agent,
+                    my_mem_descs,
+                    my_file_descs,
+                    my_agent.name,
+                    "WRITE",
+                )
+            )
+            s += 1
+            n += 1
 
-        _, not_done = concurrent.futures.wait(futures, return_when=concurrent.futures.ALL_COMPLETED)
+        _, not_done = concurrent.futures.wait(
+            futures, return_when=concurrent.futures.ALL_COMPLETED
+        )
         assert not not_done
 
+
 def handle_remote_transfer_request(my_agent, my_mem_descs, my_file_descs):
     """Handle remote memory and storage transfers as target."""
     # Wait for initiator to send list of memory descriptors
@@ -171,29 +260,36 @@ def handle_remote_transfer_request(my_agent, my_mem_descs, my_file_descs):
     sent_descs = my_agent.deserialize_descs(recv_msg[8:])
 
     if operation == "READ":
-        pipeline_reads(my_agent, req_agent, my_mem_descs, my_file_descs, sent_descs, iterations)
+        pipeline_reads(
+            my_agent, req_agent, my_mem_descs, my_file_descs, sent_descs, iterations
+        )
     elif operation == "WRITE":
-        pipeline_writes(my_agent, req_agent, my_mem_descs, my_file_descs, sent_descs, iterations)
-
+        pipeline_writes(
+            my_agent, req_agent, my_mem_descs, my_file_descs, sent_descs, iterations
+        )
+
     # Send completion notification to initiator
     my_agent.send_notif(req_agent, b"COMPLETE")
 
-def run_client(my_agent, nixl_mem_reg_descs, nixl_file_reg_descs, agents_file, iterations):
+
+def run_client(
+    my_agent, nixl_mem_reg_descs, nixl_file_reg_descs, agents_file, iterations
+):
     logger.info("Client initialized, ready for local transfer test...")
 
     # For sample purposes, write to and then read from local storage
     logger.info("Starting local transfer test...")
 
     start_time = time.time()
 
-    for i in range (1, iterations):
+    for i in range(1, iterations):
         execute_transfer(
             my_agent,
             nixl_mem_reg_descs.trim(),
             nixl_file_reg_descs.trim(),
             my_agent.name,
             "WRITE",
-            ["GDS_MT"]
+            ["GDS_MT"],
         )
 
     end_time = time.time()
@@ -204,14 +300,14 @@ def run_client(my_agent, nixl_mem_reg_descs, nixl_file_reg_descs, agents_file, iterations):
 
     start_time = time.time()
 
-    for i in range (1, iterations):
+    for i in range(1, iterations):
         execute_transfer(
             my_agent,
             nixl_mem_reg_descs.trim(),
             nixl_file_reg_descs.trim(),
             my_agent.name,
             "READ",
-            ["GDS_MT"]
+            ["GDS_MT"],
         )
 
     end_time = time.time()
@@ -245,6 +341,7 @@ def run_storage_server(my_agent, nixl_mem_reg_descs, nixl_file_reg_descs):
         my_agent, nixl_mem_reg_descs.trim(), nixl_file_reg_descs.trim()
     )
 
+
 if __name__ == "__main__":
     parser = nsu.get_base_parser()
     parser.add_argument(
@@ -266,7 +363,12 @@ def run_storage_server(my_agent, nixl_mem_reg_descs, nixl_file_reg_descs):
         type=str,
         help="File containing list of target agents (only needed for client)",
     )
-    parser.add_argument("--iterations", type=int, default=100, help="Number of iterations for each transfer")
+    parser.add_argument(
+        "--iterations",
+        type=int,
+        default=100,
+        help="Number of iterations for each transfer",
+    )
     args = parser.parse_args()
 
     mem = "DRAM"
@@ -290,7 +392,11 @@ def run_storage_server(my_agent, nixl_mem_reg_descs, nixl_file_reg_descs):
            parser.error("--agents_file is required when role is client")
        try:
            run_client(
-                my_agent, nixl_mem_reg_descs, nixl_file_reg_descs, args.agents_file, args.iterations
+                my_agent,
+                nixl_mem_reg_descs,
+                nixl_file_reg_descs,
+                args.agents_file,
+                args.iterations,
            )
        finally:
            nsu.cleanup_resources(
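
pipeline_reads and pipeline_writes in the diff above differ only in which leg of each request touches storage and which touches the network. As a rough sketch of the same pipelining scheme (illustrative only, not part of this commit; it assumes the execute_transfer helper and the concurrent.futures import already present in this module), the shared structure could be factored into one helper:

    def pipeline_transfers(my_agent, my_mem_descs, first, second, iterations):
        # Hypothetical consolidation of pipeline_reads / pipeline_writes.
        # `first` and `second` are (remote_descs, remote_name, operation) tuples
        # describing the two legs of one remote storage request.
        with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:

            def submit(leg):
                remote_descs, remote_name, operation = leg
                return executor.submit(
                    execute_transfer,
                    my_agent,
                    my_mem_descs,
                    remote_descs,
                    remote_name,
                    operation,
                )

            futures = [submit(first)]  # prime the pipeline with the first leg
            for _ in range(iterations - 1):
                # Overlap the next request's first leg with the previous
                # request's second leg, keeping both workers busy.
                futures.append(submit(first))
                futures.append(submit(second))
            futures.append(submit(second))  # drain the last request's second leg

            _, not_done = concurrent.futures.wait(futures)
            assert not not_done

Under that sketch, a remote read would pass first=(my_file_descs, my_agent.name, "READ") and second=(sent_descs, req_agent, "WRITE"), while a remote write would swap the legs to a network READ followed by a storage WRITE.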

examples/python/remote_storage_example/nixl_storage_utils/common.py

Lines changed: 4 additions & 1 deletion
@@ -20,6 +20,7 @@
 
 import argparse
 import os
+
 import torch
 
 import nixl._utils as nixl_utils
@@ -76,7 +77,9 @@ def setup_memory_and_files(agent, batch_size, buf_size, fileprefix, mem="DRAM"):
         my_mem_list.append(nixl_utils.malloc_passthru(buf_size))
         nixl_mem_reg_list.append((my_mem_list[-1], buf_size, 0, str(i)))
 
-        my_file_list.append(os.open(f"{fileprefix}_{i}", os.O_RDWR | os.O_CREAT | os.O_DIRECT))
+        my_file_list.append(
+            os.open(f"{fileprefix}_{i}", os.O_RDWR | os.O_CREAT | os.O_DIRECT)
+        )
         nixl_file_reg_list.append((0, buf_size, my_file_list[-1], str(i)))
 
     nixl_mem_reg_descs = agent.register_memory(nixl_mem_reg_list, mem)
