Skip to content

Commit 1111e18

Browse files
committed
fixup branch for upstream
1 parent a37dc45 commit 1111e18

File tree

10 files changed

+32
-91
lines changed

10 files changed

+32
-91
lines changed

examples/python/remote_storage_example/README.md

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,3 +108,21 @@ The system automatically selects the best available storage backend:
108108
1. Initiator sends memory descriptors to target
109109
2. Target performs storage-to-memory or memory-to-storage operations
110110
3. Data is transferred between initiator and target memory
111+
112+
Remote reads are implemented as a read from storage followed by a network write.
113+
114+
Remote writes are implemented as a read from the network followed by a storage write.
115+
116+
### Pipelining
117+
118+
To improve performance of the remote storage server, we can pipeline operations to network and storage. This pipelining allows multiple threads to handle each request. However, to maintain correctness, the network and storage steps of each individual remote storage operation must still execute in order. To do this, we implemented a simple pipelining scheme.
119+
120+
![Remote Operation Pipelines](storage_pipelines.png)
121+
122+
### Performance Tips
123+
124+
For high-speed storage and network hardware, you may need to tune performance with a couple of environment variables.
125+
126+
First, for optimal GDS performance, ensure you are using the GDS_MT backend with default concurrency. Additionally, you can use the cufile options described in the [GDS README](https://github.com/ai-dynamo/nixl/blob/main/src/plugins/cuda_gds/README.md).
127+
128+
On the network side, remote reads from VRAM to DRAM can be limited by UCX rail selection. This can be tuned by setting UCX_MAX_RMA_RAILS=1. However, with larger batch or message sizes, a single rail might limit bandwidth, and a higher number of rails might be needed.

examples/python/remote_storage_example/agent_file.in

Lines changed: 0 additions & 1 deletion
This file was deleted.

examples/python/remote_storage_example/client_command.sh

Lines changed: 0 additions & 14 deletions
This file was deleted.

examples/python/remote_storage_example/cufile.json

Lines changed: 0 additions & 24 deletions
This file was deleted.

examples/python/remote_storage_example/myfio.fio

Lines changed: 0 additions & 15 deletions
This file was deleted.

examples/python/remote_storage_example/nixl_p2p_storage_example.py

Lines changed: 14 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ def execute_transfer(my_agent, local_descs, remote_descs, remote_name, operation
3636
my_agent.release_xfer_handle(handle)
3737

3838

39-
def remote_storage_transfer(my_agent, my_mem_descs, operation, remote_agent_name):
39+
def remote_storage_transfer(my_agent, my_mem_descs, operation, remote_agent_name, iterations):
4040
"""Initiate remote memory transfer."""
4141
if operation != "READ" and operation != "WRITE":
4242
logger.error("Invalid operation, exiting")
@@ -47,14 +47,14 @@ def remote_storage_transfer(my_agent, my_mem_descs, operation, remote_agent_name
4747
else:
4848
operation = b"READ"
4949

50-
iterations = b"0100"
50+
iterations_str = bytes(f"{iterations:04d}", "utf-8")
5151
# Send the descriptors that you want to read into or write from
5252
logger.info(f"Sending {operation} request to {remote_agent_name}")
5353
test_descs_str = my_agent.get_serialized_descs(my_mem_descs)
5454

5555
start_time = time.time()
5656

57-
my_agent.send_notif(remote_agent_name, operation + iterations + test_descs_str)
57+
my_agent.send_notif(remote_agent_name, operation + iterations_str + test_descs_str)
5858

5959
while not my_agent.check_remote_xfer_done(remote_agent_name, b"COMPLETE"):
6060
continue
@@ -113,8 +113,7 @@ def pipeline_reads(my_agent, req_agent, my_mem_descs, my_file_descs, sent_descs,
113113
s+=1
114114
n+=1
115115

116-
117-
done, not_done = concurrent.futures.wait(futures, return_when=concurrent.futures.ALL_COMPLETED)
116+
_, not_done = concurrent.futures.wait(futures, return_when=concurrent.futures.ALL_COMPLETED)
118117
assert not not_done
119118

120119
def pipeline_writes(my_agent, req_agent, my_mem_descs, my_file_descs, sent_descs, iterations):
@@ -142,17 +141,14 @@ def pipeline_writes(my_agent, req_agent, my_mem_descs, my_file_descs, sent_descs
142141
s+=1
143142
n+=1
144143

145-
146-
done, not_done = concurrent.futures.wait(futures, return_when=concurrent.futures.ALL_COMPLETED)
144+
_, not_done = concurrent.futures.wait(futures, return_when=concurrent.futures.ALL_COMPLETED)
147145
assert not not_done
148146

149147
def handle_remote_transfer_request(my_agent, my_mem_descs, my_file_descs):
150148
"""Handle remote memory and storage transfers as target."""
151149
# Wait for initiator to send list of memory descriptors
152150
notifs = my_agent.get_new_notifs()
153151

154-
# logger.info("Waiting for a remote transfer request...")
155-
156152
while len(notifs) == 0:
157153
notifs = my_agent.get_new_notifs()
158154

@@ -182,52 +178,47 @@ def handle_remote_transfer_request(my_agent, my_mem_descs, my_file_descs):
182178
# Send completion notification to initiator
183179
my_agent.send_notif(req_agent, b"COMPLETE")
184180

185-
# logger.info("One transfer test complete.")
186-
187-
188-
def run_client(my_agent, nixl_mem_reg_descs, nixl_file_reg_descs, agents_file):
181+
def run_client(my_agent, nixl_mem_reg_descs, nixl_file_reg_descs, agents_file, iterations):
189182
logger.info("Client initialized, ready for local transfer test...")
190183

191184
# For sample purposes, write to and then read from local storage
192185
logger.info("Starting local transfer test...")
193186

194187
start_time = time.time()
195188

196-
for i in range (1, 100):
189+
for i in range (1, iterations):
197190
execute_transfer(
198191
my_agent,
199192
nixl_mem_reg_descs.trim(),
200193
nixl_file_reg_descs.trim(),
201194
my_agent.name,
202195
"WRITE",
203-
# ["POSIX"]
204196
["GDS_MT"]
205197
)
206198

207199
end_time = time.time()
208200

209201
elapsed = end_time - start_time
210202

211-
logger.info(f"Time for 100 WRITE iterations: {elapsed} seconds")
203+
logger.info(f"Time for {iterations} WRITE iterations: {elapsed} seconds")
212204

213205
start_time = time.time()
214206

215-
for i in range (1, 100):
207+
for i in range (1, iterations):
216208
execute_transfer(
217209
my_agent,
218210
nixl_mem_reg_descs.trim(),
219211
nixl_file_reg_descs.trim(),
220212
my_agent.name,
221213
"READ",
222-
# ["POSIX"]
223214
["GDS_MT"]
224215
)
225216

226217
end_time = time.time()
227218

228219
elapsed = end_time - start_time
229220

230-
logger.info(f"Time for 100 READ iterations: {elapsed} seconds")
221+
logger.info(f"Time for {iterations} READ iterations: {elapsed} seconds")
231222

232223
logger.info("Local transfer test complete")
233224

@@ -238,10 +229,10 @@ def run_client(my_agent, nixl_mem_reg_descs, nixl_file_reg_descs, agents_file):
238229
# For sample purposes, write to and then read from each target agent
239230
for target_agent in target_agents:
240231
remote_storage_transfer(
241-
my_agent, nixl_mem_reg_descs.trim(), "WRITE", target_agent
232+
my_agent, nixl_mem_reg_descs.trim(), "WRITE", target_agent, iterations
242233
)
243234
remote_storage_transfer(
244-
my_agent, nixl_mem_reg_descs.trim(), "READ", target_agent
235+
my_agent, nixl_mem_reg_descs.trim(), "READ", target_agent, iterations
245236
)
246237

247238
logger.info("Remote transfer test complete")
@@ -254,7 +245,6 @@ def run_storage_server(my_agent, nixl_mem_reg_descs, nixl_file_reg_descs):
254245
my_agent, nixl_mem_reg_descs.trim(), nixl_file_reg_descs.trim()
255246
)
256247

257-
258248
if __name__ == "__main__":
259249
parser = nsu.get_base_parser()
260250
parser.add_argument(
@@ -276,6 +266,7 @@ def run_storage_server(my_agent, nixl_mem_reg_descs, nixl_file_reg_descs):
276266
type=str,
277267
help="File containing list of target agents (only needed for client)",
278268
)
269+
parser.add_argument("--iterations", type=int, default=100, help="Number of iterations for each transfer")
279270
args = parser.parse_args()
280271

281272
mem = "DRAM"
@@ -299,7 +290,7 @@ def run_storage_server(my_agent, nixl_mem_reg_descs, nixl_file_reg_descs):
299290
parser.error("--agents_file is required when role is client")
300291
try:
301292
run_client(
302-
my_agent, nixl_mem_reg_descs, nixl_file_reg_descs, args.agents_file
293+
my_agent, nixl_mem_reg_descs, nixl_file_reg_descs, args.agents_file, args.iterations
303294
)
304295
finally:
305296
nsu.cleanup_resources(
Binary file not shown.
Binary file not shown.

examples/python/remote_storage_example/server_command.sh

Lines changed: 0 additions & 14 deletions
This file was deleted.
345 KB
Loading

0 commit comments

Comments
 (0)