-
Notifications
You must be signed in to change notification settings - Fork 67
feat: reuse pin_memory when registering checkpoint #56
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR implements a reusable pin memory buffer mechanism for checkpoints to reduce memory allocation overhead. The shared memory pool allows checkpoints to reuse the same pinned memory buffers sequentially, with the constraint that only one checkpoint can use the shared pool at a time and the buffer shape is fixed on first allocation.
Key changes:
- Added
use_shared_memory_poolparameter toregister_checkpoint()for opt-in shared memory usage - Introduced tracking of current shared memory pool user via
_current_shared_memory_pool_user - Modified
_register_checkpoint()to accept and reuse existing pin memory buffers when provided
Reviewed Changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| checkpoint_engine/ps.py | Implements shared pin memory pool infrastructure with registration/unregistration logic and helper method for memory pool access |
| tests/test_pin_memory.py | Adds test coverage for shared memory pool registration, unregistration, and conflict handling scenarios |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| self._memory_pool[self.shared_memory_pool_name] = _register_checkpoint( | ||
| files=files or [], | ||
| named_tensors=named_tensors or {}, | ||
| rank=self._rank, | ||
| shared_pin_memory=self._memory_pool[self.shared_memory_pool_name], | ||
| ) | ||
| self._current_shared_memory_pool_user = checkpoint_name | ||
| else: | ||
| assert checkpoint_name not in self._memory_pool, ( | ||
| f"checkpoint {checkpoint_name} already registered" | ||
| ) | ||
| self._memory_pool[checkpoint_name] = _register_checkpoint( | ||
| files=files or [], named_tensors=named_tensors or {}, rank=self._rank | ||
| ) | ||
| if self._p2p_store is not None: | ||
| self._register_parameters_to_p2p_store(checkpoint_name) | ||
| except Exception: |
Copilot
AI
Nov 19, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's a resource leak when registration of a shared memory pool checkpoint fails. If an exception occurs after setting self._current_shared_memory_pool_user = checkpoint_name (line 879) but before the checkpoint is fully registered, the exception handler at line 895 calls self.unregister_checkpoint(checkpoint_name). However, for shared memory pool users, unregister_checkpoint only clears _current_shared_memory_pool_user without cleaning up the shared memory pool itself, which may have been partially modified.
Consider resetting _current_shared_memory_pool_user to empty string in the exception handler before calling unregister_checkpoint, or handle shared memory pool cleanup differently in error cases.
| if self._p2p_store is not None: | ||
| num_unregistered = self._unregister_parameters_from_p2p_store(checkpoint_name) | ||
| logger.info( | ||
| f"[rank{self._rank}] unregister {num_unregistered} parameters from p2p store for checkpoint {checkpoint_name}" | ||
| ) | ||
| if checkpoint_name == self._current_shared_memory_pool_user: | ||
| logger.info( | ||
| f"[rank{self._rank}] unregister shared memory pool from p2p store, skip unregistering from memory pool" | ||
| ) | ||
| self._current_shared_memory_pool_user = "" | ||
| return |
Copilot
AI
Nov 19, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When unregistering a checkpoint using the shared memory pool, the p2p_store is unregistered (lines 912-915) before checking if it's the shared memory pool user (line 916). This means that _unregister_parameters_from_p2p_store is called with the checkpoint name, which internally calls _get_memory_pool(checkpoint_name).
However, _get_memory_pool will correctly return the shared memory pool for the current user. The issue is that the p2p_store keys are generated using the pattern f"memory_pool_{checkpoint_name}_{idx}", so each checkpoint using the shared pool would have different p2p_store keys, even though they share the same underlying memory buffers. This could lead to p2p_store inconsistencies where multiple sets of keys point to the same memory, or failed unregistration attempts.
| if self._p2p_store is not None: | |
| num_unregistered = self._unregister_parameters_from_p2p_store(checkpoint_name) | |
| logger.info( | |
| f"[rank{self._rank}] unregister {num_unregistered} parameters from p2p store for checkpoint {checkpoint_name}" | |
| ) | |
| if checkpoint_name == self._current_shared_memory_pool_user: | |
| logger.info( | |
| f"[rank{self._rank}] unregister shared memory pool from p2p store, skip unregistering from memory pool" | |
| ) | |
| self._current_shared_memory_pool_user = "" | |
| return | |
| if checkpoint_name == self._current_shared_memory_pool_user: | |
| logger.info( | |
| f"[rank{self._rank}] unregister shared memory pool from p2p store, skip unregistering from memory pool" | |
| ) | |
| self._current_shared_memory_pool_user = "" | |
| return | |
| if self._p2p_store is not None: | |
| num_unregistered = self._unregister_parameters_from_p2p_store(checkpoint_name) | |
| logger.info( | |
| f"[rank{self._rank}] unregister {num_unregistered} parameters from p2p store for checkpoint {checkpoint_name}" | |
| ) |
6071dc8 to
71af910
Compare
resolve #23
Add a reusable pin memory buffer. In the current implementation, Only one checkpoint is able to use the shared pin memory at the same time. And the pin memory buffer shape is fixed to the shape when it is used for the first time, which cannot be modified latter.