
Conversation

@specture724
Collaborator

resolve #23
Add a reusable pin memory buffer. In the current implementation, only one checkpoint can use the shared pin memory at a time, and the pin memory buffer shape is fixed to the shape used on first allocation; it cannot be modified later.

Copilot AI left a comment


Pull Request Overview

This PR implements a reusable pin memory buffer mechanism for checkpoints to reduce memory allocation overhead. The shared memory pool allows checkpoints to reuse the same pinned memory buffers sequentially, with the constraint that only one checkpoint can use the shared pool at a time and the buffer shape is fixed on first allocation.

Key changes:

  • Added use_shared_memory_pool parameter to register_checkpoint() for opt-in shared memory usage
  • Introduced tracking of current shared memory pool user via _current_shared_memory_pool_user
  • Modified _register_checkpoint() to accept and reuse existing pin memory buffers when provided
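The single-user constraint and fixed-on-first-use buffer shape described above can be sketched as a tiny standalone class. This is an illustrative model only (plain `bytearray` buffers stand in for pinned tensors, and the class/method names are assumptions, not the PR's actual API):

```python
class SharedPinnedPool:
    """Sketch of a single-user shared buffer pool with lazily fixed shapes."""

    def __init__(self):
        self._buffers = None      # allocated on first registration, sizes fixed after
        self._current_user = ""   # checkpoint currently holding the pool

    def register(self, checkpoint_name, payloads):
        # Only one checkpoint may hold the shared pool at a time.
        if self._current_user:
            raise RuntimeError(f"pool already in use by {self._current_user}")
        if self._buffers is None:
            # First use fixes the buffer sizes for the pool's lifetime.
            self._buffers = [bytearray(len(p)) for p in payloads]
        for buf, payload in zip(self._buffers, payloads):
            if len(buf) != len(payload):
                raise ValueError("buffer size was fixed on first allocation")
            buf[:] = payload      # reuse existing buffers instead of reallocating
        self._current_user = checkpoint_name

    def unregister(self, checkpoint_name):
        if checkpoint_name == self._current_user:
            self._current_user = ""   # buffers stay allocated for the next user
```

A second `register()` while the pool is held raises, and after `unregister()` the same buffers are reused by the next checkpoint without reallocation.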

Reviewed Changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 7 comments.

File Description
checkpoint_engine/ps.py Implements shared pin memory pool infrastructure with registration/unregistration logic and helper method for memory pool access
tests/test_pin_memory.py Adds test coverage for shared memory pool registration, unregistration, and conflict handling scenarios


Comment on lines +873 to 894
        self._memory_pool[self.shared_memory_pool_name] = _register_checkpoint(
            files=files or [],
            named_tensors=named_tensors or {},
            rank=self._rank,
            shared_pin_memory=self._memory_pool[self.shared_memory_pool_name],
        )
        self._current_shared_memory_pool_user = checkpoint_name
    else:
        assert checkpoint_name not in self._memory_pool, (
            f"checkpoint {checkpoint_name} already registered"
        )
        self._memory_pool[checkpoint_name] = _register_checkpoint(
            files=files or [], named_tensors=named_tensors or {}, rank=self._rank
        )
    if self._p2p_store is not None:
        self._register_parameters_to_p2p_store(checkpoint_name)
except Exception:

Copilot AI Nov 19, 2025


There's a resource leak when registration of a shared memory pool checkpoint fails. If an exception occurs after setting self._current_shared_memory_pool_user = checkpoint_name (line 879) but before the checkpoint is fully registered, the exception handler at line 895 calls self.unregister_checkpoint(checkpoint_name). However, for shared memory pool users, unregister_checkpoint only clears _current_shared_memory_pool_user without cleaning up the shared memory pool itself, which may have been partially modified.

Consider resetting _current_shared_memory_pool_user to an empty string in the exception handler before calling unregister_checkpoint, or handle shared memory pool cleanup differently in error paths.
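The suggested fix can be modeled with a minimal registry that resets the user marker before the cleanup call. The field names mirror the review, but the implementation is an illustrative sketch, not the PR's code:

```python
class PoolRegistry:
    """Sketch of the error-handling pattern suggested in the review."""

    def __init__(self):
        self._memory_pool = {}
        self._current_shared_memory_pool_user = ""

    def unregister_checkpoint(self, checkpoint_name):
        # Shared-pool users only clear the marker; the pool itself is kept.
        if checkpoint_name == self._current_shared_memory_pool_user:
            self._current_shared_memory_pool_user = ""
            return
        self._memory_pool.pop(checkpoint_name, None)

    def register_checkpoint(self, checkpoint_name, register_fn,
                            use_shared_memory_pool=False):
        try:
            if use_shared_memory_pool:
                self._current_shared_memory_pool_user = checkpoint_name
                self._memory_pool["shared"] = register_fn()
            else:
                self._memory_pool[checkpoint_name] = register_fn()
        except Exception:
            if use_shared_memory_pool:
                # Reset the marker *before* unregistering, so a partially
                # registered shared-pool user does not leak the pool slot.
                self._current_shared_memory_pool_user = ""
            self.unregister_checkpoint(checkpoint_name)
            raise
```

With this ordering, a registration failure leaves `_current_shared_memory_pool_user` empty, so the next checkpoint can acquire the shared pool.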

Comment on lines 855 to +926
if self._p2p_store is not None:
    num_unregistered = self._unregister_parameters_from_p2p_store(checkpoint_name)
    logger.info(
        f"[rank{self._rank}] unregister {num_unregistered} parameters from p2p store for checkpoint {checkpoint_name}"
    )
if checkpoint_name == self._current_shared_memory_pool_user:
    logger.info(
        f"[rank{self._rank}] unregister shared memory pool from p2p store, skip unregistering from memory pool"
    )
    self._current_shared_memory_pool_user = ""
    return

Copilot AI Nov 19, 2025


When unregistering a checkpoint using the shared memory pool, the p2p_store is unregistered (lines 912-915) before checking if it's the shared memory pool user (line 916). This means that _unregister_parameters_from_p2p_store is called with the checkpoint name, which internally calls _get_memory_pool(checkpoint_name).

However, _get_memory_pool will correctly return the shared memory pool for the current user. The issue is that the p2p_store keys are generated using the pattern f"memory_pool_{checkpoint_name}_{idx}", so each checkpoint using the shared pool would have different p2p_store keys, even though they share the same underlying memory buffers. This could lead to p2p_store inconsistencies where multiple sets of keys point to the same memory, or failed unregistration attempts.

Suggested change
-if self._p2p_store is not None:
-    num_unregistered = self._unregister_parameters_from_p2p_store(checkpoint_name)
-    logger.info(
-        f"[rank{self._rank}] unregister {num_unregistered} parameters from p2p store for checkpoint {checkpoint_name}"
-    )
-if checkpoint_name == self._current_shared_memory_pool_user:
-    logger.info(
-        f"[rank{self._rank}] unregister shared memory pool from p2p store, skip unregistering from memory pool"
-    )
-    self._current_shared_memory_pool_user = ""
-    return
+if checkpoint_name == self._current_shared_memory_pool_user:
+    logger.info(
+        f"[rank{self._rank}] unregister shared memory pool from p2p store, skip unregistering from memory pool"
+    )
+    self._current_shared_memory_pool_user = ""
+    return
+if self._p2p_store is not None:
+    num_unregistered = self._unregister_parameters_from_p2p_store(checkpoint_name)
+    logger.info(
+        f"[rank{self._rank}] unregister {num_unregistered} parameters from p2p store for checkpoint {checkpoint_name}"
+    )

@specture724 force-pushed the feat/reuse_pin_memory branch from 6071dc8 to 71af910 on November 19, 2025 08:21


Development

Successfully merging this pull request may close these issues.

Support reusing pin_memory when register_checkpoint
