From c178a9ef304b31df2428dfef21432f0e87a0798a Mon Sep 17 00:00:00 2001 From: Dongxu Yang <78518666+dongxuy04@users.noreply.github.com> Date: Sat, 7 Jun 2025 16:54:27 +0800 Subject: [PATCH 01/13] add gdrcopy Signed-off-by: Dongxu Yang <78518666+dongxuy04@users.noreply.github.com> --- cpp/tensorrt_llm/runtime/CMakeLists.txt | 1 + .../runtime/moeLoadBalancer/gdrwrap.cpp | 219 +++++++++++++ .../runtime/moeLoadBalancer/gdrwrap.h | 292 ++++++++++++++++++ cpp/tests/unit_tests/runtime/CMakeLists.txt | 1 + cpp/tests/unit_tests/runtime/gdrcopyTest.cpp | 150 +++++++++ 5 files changed, 663 insertions(+) create mode 100644 cpp/tensorrt_llm/runtime/moeLoadBalancer/gdrwrap.cpp create mode 100644 cpp/tensorrt_llm/runtime/moeLoadBalancer/gdrwrap.h create mode 100644 cpp/tests/unit_tests/runtime/gdrcopyTest.cpp diff --git a/cpp/tensorrt_llm/runtime/CMakeLists.txt b/cpp/tensorrt_llm/runtime/CMakeLists.txt index a4d436934fe..a66c1266059 100644 --- a/cpp/tensorrt_llm/runtime/CMakeLists.txt +++ b/cpp/tensorrt_llm/runtime/CMakeLists.txt @@ -43,6 +43,7 @@ set(SRCS ipcNvlsMemory.cpp mcastDeviceMemory.cpp memoryCounters.cpp + moeLoadBalancer/gdrwrap.cpp moeLoadBalancer/moeLoadBalancer.cpp moeLoadBalancer/topologyDetector.cpp ncclCommunicator.cpp diff --git a/cpp/tensorrt_llm/runtime/moeLoadBalancer/gdrwrap.cpp b/cpp/tensorrt_llm/runtime/moeLoadBalancer/gdrwrap.cpp new file mode 100644 index 00000000000..d8324af1005 --- /dev/null +++ b/cpp/tensorrt_llm/runtime/moeLoadBalancer/gdrwrap.cpp @@ -0,0 +1,219 @@ +/************************************************************************* + * Copyright (c) 2020-2021, NVIDIA CORPORATION. All rights reserved. + * + * See LICENSE.txt for license information + ************************************************************************/ + +#ifndef _WIN32 + +#include "gdrwrap.h" +#include "tensorrt_llm/common/assert.h" +#include "tensorrt_llm/common/logger.h" + +#include +#include + +namespace tensorrt_llm +{ +namespace runtime +{ +namespace gdrcopy +{ + +// Function pointers assigned from dlopen() +static gdr_t (*gdr_internal_open)(void); +static int (*gdr_internal_close)(gdr_t g); +static int (*gdr_internal_pin_buffer)( + gdr_t g, unsigned long addr, size_t size, uint64_t p2p_token, uint32_t va_space, gdr_mh_t* handle); +static int (*gdr_internal_unpin_buffer)(gdr_t g, gdr_mh_t handle); +static int (*gdr_internal_get_info)(gdr_t g, gdr_mh_t handle, gdr_info_t* info); +static int (*gdr_internal_map)(gdr_t g, gdr_mh_t handle, void** va, size_t size); +static int (*gdr_internal_unmap)(gdr_t g, gdr_mh_t handle, void* va, size_t size); +static void (*gdr_internal_runtime_get_version)(int* major, int* minor); +static void (*gdr_internal_driver_get_version)(gdr_t g, int* major, int* minor); +static int (*gdr_internal_copy_to_mapping)(gdr_mh_t handle, void* map_d_ptr, void const* h_ptr, size_t size); +static int (*gdr_internal_copy_from_mapping)(gdr_mh_t handle, void* h_ptr, void const* map_d_ptr, size_t size); + +static pthread_mutex_t gGdrLock = PTHREAD_MUTEX_INITIALIZER; +static bool gGdrInitialized = false; +static void* gGdrApiHandle = nullptr; + +#define GDRAPI_LIBNAME "libgdrapi.so" + +#define LOAD_SYM(handle, symbol, funcptr) \ + do \ + { \ + *(void**) (&(funcptr)) = dlsym(handle, symbol); \ + if ((funcptr) == NULL) \ + { \ + TLLM_LOG_WARNING("dlsym failed on %s - %s", symbol, dlerror()); \ + dlclose(handle); \ + gGdrApiHandle = nullptr; \ + gInitStatus = false; \ + return; \ + } \ + } while (0) + +static pthread_once_t gInitOnceControl = PTHREAD_ONCE_INIT; +static bool gInitStatus 
= false; + +static void initialize_internal() +{ + gGdrApiHandle = dlopen(GDRAPI_LIBNAME, RTLD_NOW); + if (!gGdrApiHandle) + { + TLLM_LOG_INFO("Failed to open %s. GDRCopy support is disabled.", GDRAPI_LIBNAME); + gInitStatus = false; + return; + } + + LOAD_SYM(gGdrApiHandle, "gdr_open", gdr_internal_open); + LOAD_SYM(gGdrApiHandle, "gdr_close", gdr_internal_close); + LOAD_SYM(gGdrApiHandle, "gdr_pin_buffer", gdr_internal_pin_buffer); + LOAD_SYM(gGdrApiHandle, "gdr_unpin_buffer", gdr_internal_unpin_buffer); + LOAD_SYM(gGdrApiHandle, "gdr_get_info", gdr_internal_get_info); + LOAD_SYM(gGdrApiHandle, "gdr_map", gdr_internal_map); + LOAD_SYM(gGdrApiHandle, "gdr_unmap", gdr_internal_unmap); + LOAD_SYM(gGdrApiHandle, "gdr_runtime_get_version", gdr_internal_runtime_get_version); + LOAD_SYM(gGdrApiHandle, "gdr_driver_get_version", gdr_internal_driver_get_version); + LOAD_SYM(gGdrApiHandle, "gdr_copy_to_mapping", gdr_internal_copy_to_mapping); + LOAD_SYM(gGdrApiHandle, "gdr_copy_from_mapping", gdr_internal_copy_from_mapping); + + gdr_t g = gdr_internal_open(); + if (g == nullptr) + { + TLLM_LOG_WARNING("gdr_open failed. GDRCopy support is disabled."); + dlclose(gGdrApiHandle); + gGdrApiHandle = nullptr; + gInitStatus = false; + return; + } + + int libMajor, libMinor, drvMajor, drvMinor; + gdr_internal_runtime_get_version(&libMajor, &libMinor); + gdr_internal_driver_get_version(g, &drvMajor, &drvMinor); + gdr_internal_close(g); + + if (libMajor < 2 || (libMajor == 2 && libMinor < 1) || drvMajor < 2 || (drvMajor == 2 && drvMinor < 1)) + { + TLLM_LOG_WARNING( + "GDRCopy library version (%d.%d) or driver version (%d.%d) is too old. Required >= 2.1. GDRCopy support " + "is disabled.", + libMajor, libMinor, drvMajor, drvMinor); + dlclose(gGdrApiHandle); + gGdrApiHandle = nullptr; + gInitStatus = false; + return; + } + + TLLM_LOG_INFO("GDRCopy enabled library %d.%d driver %d.%d", libMajor, libMinor, drvMajor, drvMinor); + gInitStatus = true; + gGdrInitialized = true; +} + +bool initialize() +{ + pthread_once(&gInitOnceControl, initialize_internal); + return gInitStatus; +} + +bool isInitialized() +{ + return gGdrInitialized; +} + +#define CHECK_INITIALIZED() \ + TLLM_CHECK_WITH_INFO(gGdrInitialized, "GDRCopy library is not initialized. 
Call gdrcopy::initialize() first.") + +#define GDRLOCKCALL(cmd) \ + [&] \ + { \ + pthread_mutex_lock(&gGdrLock); \ + auto ret = (cmd); \ + pthread_mutex_unlock(&gGdrLock); \ + return ret; \ + }() + +gdr_t open() +{ + CHECK_INITIALIZED(); + return gdr_internal_open(); +} + +int close(gdr_t g) +{ + CHECK_INITIALIZED(); + return gdr_internal_close(g); +} + +int pin_buffer(gdr_t g, unsigned long addr, size_t size, uint64_t p2p_token, uint32_t va_space, gdr_mh_t* handle) +{ + CHECK_INITIALIZED(); + return GDRLOCKCALL(gdr_internal_pin_buffer(g, addr, size, p2p_token, va_space, handle)); +} + +int unpin_buffer(gdr_t g, gdr_mh_t handle) +{ + CHECK_INITIALIZED(); + return GDRLOCKCALL(gdr_internal_unpin_buffer(g, handle)); +} + +int get_info(gdr_t g, gdr_mh_t handle, gdr_info_t* info) +{ + CHECK_INITIALIZED(); + return GDRLOCKCALL(gdr_internal_get_info(g, handle, info)); +} + +int map(gdr_t g, gdr_mh_t handle, void** va, size_t size) +{ + CHECK_INITIALIZED(); + return GDRLOCKCALL(gdr_internal_map(g, handle, va, size)); +} + +int unmap(gdr_t g, gdr_mh_t handle, void* va, size_t size) +{ + CHECK_INITIALIZED(); + return GDRLOCKCALL(gdr_internal_unmap(g, handle, va, size)); +} + +void runtime_get_version(int* major, int* minor) +{ + CHECK_INITIALIZED(); + gdr_internal_runtime_get_version(major, minor); +} + +void driver_get_version(gdr_t g, int* major, int* minor) +{ + CHECK_INITIALIZED(); + gdr_internal_driver_get_version(g, major, minor); +} + +int copy_to_mapping(gdr_mh_t handle, void* map_d_ptr, void const* h_ptr, size_t size) +{ + CHECK_INITIALIZED(); + return GDRLOCKCALL(gdr_internal_copy_to_mapping(handle, map_d_ptr, h_ptr, size)); +} + +int copy_from_mapping(gdr_mh_t handle, void* h_ptr, void const* map_d_ptr, size_t size) +{ + CHECK_INITIALIZED(); + return GDRLOCKCALL(gdr_internal_copy_from_mapping(handle, h_ptr, map_d_ptr, size)); +} + +void gdrCudaFree(GdrMemDesc* memDesc, gdr_t handle) +{ + CHECK_INITIALIZED(); + if (memDesc) + { + unmap(handle, memDesc->gdrMh, memDesc->gdrMap, memDesc->gdrMapSize); + unpin_buffer(handle, memDesc->gdrMh); + TLLM_CUDA_CHECK(cudaFree(memDesc->gdrDeviceMem)); + delete memDesc; + } +} + +} // namespace gdrcopy +} // namespace runtime +} // namespace tensorrt_llm + +#endif // _WIN32 diff --git a/cpp/tensorrt_llm/runtime/moeLoadBalancer/gdrwrap.h b/cpp/tensorrt_llm/runtime/moeLoadBalancer/gdrwrap.h new file mode 100644 index 00000000000..4591f0f7916 --- /dev/null +++ b/cpp/tensorrt_llm/runtime/moeLoadBalancer/gdrwrap.h @@ -0,0 +1,292 @@ +#pragma once + +#ifdef _WIN32 + +#include "tensorrt_llm/common/assert.h" +#include "tensorrt_llm/common/logger.h" +#include +#include + +// Dummy types for Windows to allow compilation. +struct gdr; +typedef struct gdr* gdr_t; + +typedef struct gdr_mh_s +{ + unsigned long h; +} gdr_mh_t; + +struct gdr_info +{ +}; +typedef struct gdr_info gdr_info_t; + +namespace tensorrt_llm +{ +namespace runtime +{ +namespace gdrcopy +{ + +struct GdrMemDesc +{ +}; + +// On Windows, GDRCopy is not supported. These are stub implementations. 
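+// initialize() and isInitialized() simply report that support is unavailable; the remaining stubs throw when called.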
+inline bool initialize() +{ + TLLM_LOG_INFO("GDRCopy is not supported on Windows."); + return false; +} + +inline bool isInitialized() +{ + return false; +} + +#define GDRCOPY_UNSUPPORTED() TLLM_THROW("GDRCopy is not supported on Windows") + +inline gdr_t open() +{ + GDRCOPY_UNSUPPORTED(); + return nullptr; +} + +inline int close(gdr_t /*g*/) +{ + GDRCOPY_UNSUPPORTED(); + return -1; +} + +inline int pin_buffer(gdr_t /*g*/, unsigned long /*addr*/, size_t /*size*/, uint64_t /*p2p_token*/, + uint32_t /*va_space*/, gdr_mh_t* /*handle*/) +{ + GDRCOPY_UNSUPPORTED(); + return -1; +} + +inline int unpin_buffer(gdr_t /*g*/, gdr_mh_t /*handle*/) +{ + GDRCOPY_UNSUPPORTED(); + return -1; +} + +inline int get_info(gdr_t /*g*/, gdr_mh_t /*handle*/, gdr_info_t* /*info*/) +{ + GDRCOPY_UNSUPPORTED(); + return -1; +} + +inline int map(gdr_t /*g*/, gdr_mh_t /*handle*/, void** /*va*/, size_t /*size*/) +{ + GDRCOPY_UNSUPPORTED(); + return -1; +} + +inline int unmap(gdr_t /*g*/, gdr_mh_t /*handle*/, void* /*va*/, size_t /*size*/) +{ + GDRCOPY_UNSUPPORTED(); + return -1; +} + +inline void runtime_get_version(int* /*major*/, int* /*minor*/) +{ + GDRCOPY_UNSUPPORTED(); +} + +inline void driver_get_version(gdr_t /*g*/, int* /*major*/, int* /*minor*/) +{ + GDRCOPY_UNSUPPORTED(); +} + +inline int copy_to_mapping(gdr_mh_t /*handle*/, void* /*map_d_ptr*/, void const* /*h_ptr*/, size_t /*size*/) +{ + GDRCOPY_UNSUPPORTED(); + return -1; +} + +inline int copy_from_mapping(gdr_mh_t /*handle*/, void* /*h_ptr*/, void const* /*map_d_ptr*/, size_t /*size*/) +{ + GDRCOPY_UNSUPPORTED(); + return -1; +} + +template +void gdrCudaMalloc(T** /*ptr*/, T** /*devPtr*/, size_t /*nelem*/, GdrMemDesc** /*memDesc*/, gdr_t /*handle*/) +{ + GDRCOPY_UNSUPPORTED(); +} + +inline void gdrCudaFree(GdrMemDesc* /*memDesc*/, gdr_t /*handle*/) +{ + GDRCOPY_UNSUPPORTED(); +} + +} // namespace gdrcopy +} // namespace runtime +} // namespace tensorrt_llm + +#else // NOT _WIN32 + +#include "tensorrt_llm/common/assert.h" +#include "tensorrt_llm/common/cudaUtils.h" +#include "tensorrt_llm/common/logger.h" +#include +#include +#include + +// These definitions are from gdrapi.h to avoid a direct dependency on the header. +#define GPU_PAGE_SHIFT 16 +#define GPU_PAGE_SIZE (1UL << GPU_PAGE_SHIFT) +#define GPU_PAGE_OFFSET (GPU_PAGE_SIZE - 1) +#define GPU_PAGE_MASK (~GPU_PAGE_OFFSET) + +struct gdr; +typedef struct gdr* gdr_t; + +typedef struct gdr_mh_s +{ + unsigned long h; +} gdr_mh_t; + +struct gdr_info +{ + uint64_t va; + uint64_t mapped_size; + uint32_t page_size; + uint64_t tm_cycles; + uint32_t cycles_per_ms; + unsigned mapped : 1; + unsigned wc_mapping : 1; +}; +typedef struct gdr_info gdr_info_t; + +namespace tensorrt_llm +{ +namespace runtime +{ +namespace gdrcopy +{ + +// This is required as the GDR memory is mapped WC +#if !defined(__NVCC__) +#if defined(__PPC__) +static inline void wc_store_fence(void) +{ + asm volatile("sync"); +} +#elif defined(__x86_64__) +#include + +static inline void wc_store_fence(void) +{ + _mm_sfence(); +} +#elif defined(__aarch64__) +#ifdef __cplusplus +#include + +static inline void wc_store_fence(void) +{ + std::atomic_thread_fence(std::memory_order_release); +} +#else +#include + +static inline void wc_store_fence(void) +{ + atomic_thread_fence(memory_order_release); +} +#endif +#endif +#endif + +// Initializes the GDRCopy library by dynamically loading it. +// This function is thread-safe. +// Returns true on success, false on failure. 
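+// Subsequent calls are cheap: the library is loaded and version-checked only once and the cached result is returned.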
+bool initialize(); + +// Returns true if the GDRCopy library has been successfully initialized. +bool isInitialized(); + +// All functions below are wrappers around the GDRCopy library functions. +// They are thread-safe. +// Before calling any of these functions, ensure the library is initialized. + +gdr_t open(); +int close(gdr_t g); +int pin_buffer(gdr_t g, unsigned long addr, size_t size, uint64_t p2p_token, uint32_t va_space, gdr_mh_t* handle); +int unpin_buffer(gdr_t g, gdr_mh_t handle); +int get_info(gdr_t g, gdr_mh_t handle, gdr_info_t* info); +int map(gdr_t g, gdr_mh_t handle, void** va, size_t size); +int unmap(gdr_t g, gdr_mh_t handle, void* va, size_t size); +void runtime_get_version(int* major, int* minor); +void driver_get_version(gdr_t g, int* major, int* minor); +int copy_to_mapping(gdr_mh_t handle, void* map_d_ptr, void const* h_ptr, size_t size); +int copy_from_mapping(gdr_mh_t handle, void* h_ptr, void const* map_d_ptr, size_t size); + +// --- GDRCopy Memory Management Helpers --- + +struct GdrMemDesc +{ + void* gdrDeviceMem; + void* gdrMap; + size_t gdrOffset; + size_t gdrMapSize; + gdr_mh_t gdrMh; +}; + +// Allocates memory that can be used with GDRCopy. +// Throws an exception on failure. +template +void gdrCudaMalloc(T** ptr, T** devPtr, size_t nelem, GdrMemDesc** memDesc, gdr_t handle) +{ + TLLM_CHECK_WITH_INFO(isInitialized(), "GDRCopy library is not initialized."); + gdr_info_t info; + size_t mapSize; + gdr_mh_t mh; + char* devMem; + void* gdrMap; + + mapSize = sizeof(T) * nelem; + + // GDRCOPY Pinned buffer has to be a minimum of a GPU_PAGE_SIZE + size_t alignedMapSize = (mapSize + GPU_PAGE_SIZE - 1) & GPU_PAGE_MASK; + if (alignedMapSize == 0 && mapSize > 0) + { + alignedMapSize = GPU_PAGE_SIZE; + } + TLLM_CUDA_CHECK(cudaMalloc(&devMem, alignedMapSize + GPU_PAGE_SIZE - 1)); + uint64_t alignedAddr = ((uint64_t) devMem + GPU_PAGE_OFFSET) & GPU_PAGE_MASK; + size_t align = alignedAddr - (uint64_t) devMem; + + TLLM_CHECK_WITH_INFO(pin_buffer(handle, alignedAddr, alignedMapSize, 0, 0, &mh) == 0, "GDR pin_buffer failed"); + TLLM_CHECK_WITH_INFO(map(handle, mh, &gdrMap, alignedMapSize) == 0, "GDR map failed"); + TLLM_CHECK_WITH_INFO(get_info(handle, mh, &info) == 0, "GDR get_info failed"); + + ssize_t off = info.va - alignedAddr; + + *memDesc = new GdrMemDesc(); + (*memDesc)->gdrDeviceMem = devMem; + (*memDesc)->gdrMap = gdrMap; + (*memDesc)->gdrMapSize = alignedMapSize; + (*memDesc)->gdrOffset = off + align; + (*memDesc)->gdrMh = mh; + + *ptr = (T*) ((char*) gdrMap + off); + if (devPtr) + *devPtr = (T*) (devMem + off + align); + + TLLM_LOG_DEBUG("GDRCOPY : allocated devMem %p gdrMap %p offset %lx mh %lx mapSize %zu at %p", + (*memDesc)->gdrDeviceMem, (*memDesc)->gdrMap, (*memDesc)->gdrOffset, (*memDesc)->gdrMh.h, + (*memDesc)->gdrMapSize, *ptr); +} + +// Frees memory allocated with gdrCudaMalloc. 
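+// Unmaps and unpins the GDR mapping, frees the underlying cudaMalloc allocation, and deletes the descriptor.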
+void gdrCudaFree(GdrMemDesc* memDesc, gdr_t handle); + +} // namespace gdrcopy +} // namespace runtime +} // namespace tensorrt_llm + +#endif // _WIN32 diff --git a/cpp/tests/unit_tests/runtime/CMakeLists.txt b/cpp/tests/unit_tests/runtime/CMakeLists.txt index ad6884ee762..511783b9c8d 100644 --- a/cpp/tests/unit_tests/runtime/CMakeLists.txt +++ b/cpp/tests/unit_tests/runtime/CMakeLists.txt @@ -12,6 +12,7 @@ add_gtest(bufferManagerTest bufferManagerTest.cpp) add_gtest(cudaMemPoolTest cudaMemPoolTest.cpp) add_gtest(decodingLayerWorkspaceTest decodingLayerWorkspaceTest.cpp) +add_gtest(gdrcopyTest gdrcopyTest.cpp) add_gtest(iBufferTest iBufferTest.cpp) add_gtest(iTensorTest iTensorTest.cpp) add_gtest(loraCacheTest loraCacheTest.cpp) diff --git a/cpp/tests/unit_tests/runtime/gdrcopyTest.cpp b/cpp/tests/unit_tests/runtime/gdrcopyTest.cpp new file mode 100644 index 00000000000..5ff14d4b0ac --- /dev/null +++ b/cpp/tests/unit_tests/runtime/gdrcopyTest.cpp @@ -0,0 +1,150 @@ +#include +#include +#include +#include + +#include "tensorrt_llm/common/cudaUtils.h" +#include "tensorrt_llm/common/logger.h" +#include "tensorrt_llm/runtime/moeLoadBalancer/gdrwrap.h" + +// Skip all tests on Windows +#ifndef _WIN32 + +namespace tensorrt_llm::runtime::gdrcopy +{ + +namespace +{ + +class GdrCopyTest : public ::testing::Test +{ +protected: + void SetUp() override + { + // Try to initialize GDRCopy + gdrCopyInitialized = gdrcopy::initialize(); + if (!gdrCopyInitialized) + { + GTEST_SKIP() << "GDRCopy library not found or failed to initialize. Skipping all tests."; + } + + gdrHandle = gdrcopy::open(); + ASSERT_NE(gdrHandle, nullptr) << "gdr_open() failed after successful initialization."; + } + + void TearDown() override + { + if (gdrCopyInitialized && gdrHandle) + { + ASSERT_EQ(gdrcopy::close(gdrHandle), 0); + } + } + + bool gdrCopyInitialized = false; + gdr_t gdrHandle = nullptr; + const size_t TEST_SIZE = 4096; // Use a page size for testing +}; + +TEST_F(GdrCopyTest, GetVersionInfo) +{ + int libMajor = 0, libMinor = 0; + gdrcopy::runtime_get_version(&libMajor, &libMinor); + TLLM_LOG_INFO("GDRCopy library version: %d.%d", libMajor, libMinor); + EXPECT_GE(libMajor, 2); + + int drvMajor = 0, drvMinor = 0; + gdrcopy::driver_get_version(gdrHandle, &drvMajor, &drvMinor); + TLLM_LOG_INFO("GDRCopy driver version: %d.%d", drvMajor, drvMinor); + EXPECT_GE(drvMajor, 2); +} + +TEST_F(GdrCopyTest, RawApiLifecycle) +{ + // 1. Allocate aligned device memory + char* d_buffer; + TLLM_CUDA_CHECK(cudaMalloc(&d_buffer, TEST_SIZE + GPU_PAGE_SIZE)); + unsigned long d_addr_aligned = ((unsigned long) d_buffer + GPU_PAGE_OFFSET) & GPU_PAGE_MASK; + + // 2. Pin buffer + gdr_mh_t mh; + ASSERT_EQ(gdrcopy::pin_buffer(gdrHandle, d_addr_aligned, TEST_SIZE, 0, 0, &mh), 0); + + // 3. Get Info before map + gdr_info_t info; + ASSERT_EQ(gdrcopy::get_info(gdrHandle, mh, &info), 0); + EXPECT_EQ(info.mapped, 0); + EXPECT_EQ(info.va, d_addr_aligned); + + // 4. Map buffer + void* map_ptr = nullptr; + ASSERT_EQ(gdrcopy::map(gdrHandle, mh, &map_ptr, TEST_SIZE), 0); + ASSERT_NE(map_ptr, nullptr); + + // 5. Get Info after map + ASSERT_EQ(gdrcopy::get_info(gdrHandle, mh, &info), 0); + EXPECT_EQ(info.mapped, 1); + EXPECT_TRUE(info.wc_mapping); + + // 6. 
Test copy functions + std::vector h_src(TEST_SIZE); + std::vector h_dst(TEST_SIZE, 0); + for (size_t i = 0; i < TEST_SIZE; ++i) + { + h_src[i] = static_cast(i % 256); + } + + // Host -> Mapped GPU memory + ASSERT_EQ(gdrcopy::copy_to_mapping(mh, map_ptr, h_src.data(), TEST_SIZE), 0); + + // Mapped GPU memory -> Host + ASSERT_EQ(gdrcopy::copy_from_mapping(mh, h_dst.data(), map_ptr, TEST_SIZE), 0); + + // Verify + EXPECT_EQ(memcmp(h_src.data(), h_dst.data(), TEST_SIZE), 0); + + // 7. Unmap buffer + ASSERT_EQ(gdrcopy::unmap(gdrHandle, mh, map_ptr, TEST_SIZE), 0); + + // 8. Unpin buffer + ASSERT_EQ(gdrcopy::unpin_buffer(gdrHandle, mh), 0); + + // 9. Free device memory + TLLM_CUDA_CHECK(cudaFree(d_buffer)); +} + +TEST_F(GdrCopyTest, HelperLifecycle) +{ + char* gdr_ptr = nullptr; + char* dev_ptr = nullptr; + GdrMemDesc* mem_desc = nullptr; + + // 1. Allocate using helper + ASSERT_NO_THROW(gdrcopy::gdrCudaMalloc(&gdr_ptr, &dev_ptr, TEST_SIZE, &mem_desc, gdrHandle)); + ASSERT_NE(gdr_ptr, nullptr); + ASSERT_NE(dev_ptr, nullptr); + ASSERT_NE(mem_desc, nullptr); + + // 2. Prepare host data and copy to GDR mapped pointer + std::vector h_src(TEST_SIZE); + for (size_t i = 0; i < TEST_SIZE; ++i) + { + h_src[i] = static_cast((i + 1) % 256); + } + memcpy(gdr_ptr, h_src.data(), TEST_SIZE); + + // 3. IMPORTANT: Flush WC buffer to ensure data is written to the device + gdrcopy::wc_store_fence(); + + // 4. Verify by copying back from the *device* pointer + std::vector h_dst(TEST_SIZE, 0); + TLLM_CUDA_CHECK(cudaMemcpy(h_dst.data(), dev_ptr, TEST_SIZE, cudaMemcpyDeviceToHost)); + EXPECT_EQ(memcmp(h_src.data(), h_dst.data(), TEST_SIZE), 0); + + // 5. Free using helper + ASSERT_NO_THROW(gdrcopy::gdrCudaFree(mem_desc, gdrHandle)); +} + +} // namespace +} // namespace tensorrt_llm::runtime::gdrcopy + +#endif // _WIN32 From 5fb06487efa7930076bb94af6e6c0775cad00c36 Mon Sep 17 00:00:00 2001 From: Dongxu Yang <78518666+dongxuy04@users.noreply.github.com> Date: Mon, 9 Jun 2025 15:27:52 +0800 Subject: [PATCH 02/13] add get gpu memory numa id interface Signed-off-by: Dongxu Yang <78518666+dongxuy04@users.noreply.github.com> --- .../moeLoadBalancer/topologyDetector.cpp | 57 +++++++++---------- .../moeLoadBalancer/topologyDetector.h | 9 ++- 2 files changed, 34 insertions(+), 32 deletions(-) diff --git a/cpp/tensorrt_llm/runtime/moeLoadBalancer/topologyDetector.cpp b/cpp/tensorrt_llm/runtime/moeLoadBalancer/topologyDetector.cpp index 01ee8297f77..8f3685827d5 100644 --- a/cpp/tensorrt_llm/runtime/moeLoadBalancer/topologyDetector.cpp +++ b/cpp/tensorrt_llm/runtime/moeLoadBalancer/topologyDetector.cpp @@ -166,12 +166,14 @@ void TopologyDetector::detectGpuTopology() { return; } - mGpuToNumaMap.clear(); // Clear before re-populating - mNumaToGpuMap.clear(); // Clear before re-populating + mGpuToNumaMap.clear(); // Clear before re-populating + mGpuMemoryToNumaMap.clear(); // Clear before re-populating + mNumaToGpuMap.clear(); // Clear before re-populating for (int deviceId = 0; deviceId < deviceCount; ++deviceId) { - int numaNode = 0; // Default NUMA node + int numaNode = 0; // Default NUMA node + int numaMemoryNode = -1; // Default Memory NUMA node #ifdef __linux__ if (numa_available() != -1) @@ -214,9 +216,16 @@ void TopologyDetector::detectGpuTopology() // libnuma not available, default GPU to NUMA node 0 numaNode = 0; } + int hasMemoryNumaConfig = 0; + TLLM_CUDA_CHECK(cudaDeviceGetAttribute(&hasMemoryNumaConfig, cudaDevAttrNumaConfig, deviceId)); + if (hasMemoryNumaConfig == cudaDeviceNumaConfigNumaNode) + { + 
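+            // On C2C-coupled systems such as GB200, the GPU memory is exposed as a host NUMA node; record its id.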
TLLM_CUDA_CHECK(cudaDeviceGetAttribute(&numaMemoryNode, cudaDevAttrNumaId, deviceId)); + } #endif mGpuToNumaMap[deviceId] = numaNode; + mGpuMemoryToNumaMap[deviceId] = numaMemoryNode; mNumaToGpuMap[numaNode].push_back(deviceId); } } @@ -392,10 +401,7 @@ int TopologyDetector::getCurrentGpuNumaCpuCount() int TopologyDetector::getCurrentGpuNumaId() { int currentDevice = -1; - if (cudaGetDevice(¤tDevice) != cudaSuccess) - { - return -1; // Indicate error or no CUDA device context - } + TLLM_CUDA_CHECK(cudaGetDevice(¤tDevice)); auto it = mGpuToNumaMap.find(currentDevice); if (it != mGpuToNumaMap.end()) @@ -406,6 +412,21 @@ int TopologyDetector::getCurrentGpuNumaId() return 0; } +int TopologyDetector::getCurrentGpuMemoryNumaId() +{ + int currentDevice = -1; + TLLM_CUDA_CHECK(cudaGetDevice(¤tDevice)); + + auto it = mGpuMemoryToNumaMap.find(currentDevice); + if (it != mGpuMemoryToNumaMap.end()) + { + return it->second; + } + TLLM_LOG_WARNING( + "NUMA node for current GPU Memory %d not found in map. Defaulting to node -1 (No Memory Node).", currentDevice); + return -1; +} + int TopologyDetector::getGpuCountUnderNuma(int numaId) { auto it = mNumaToGpuMap.find(numaId); @@ -421,26 +442,4 @@ std::string TopologyDetector::getCpuArchitecture() return mCpuArchitecture; } -bool TopologyDetector::canSupportHostNativeAtomics() -{ - int currentDevice = -1; - if (cudaGetDevice(¤tDevice) != cudaSuccess) - { - TLLM_LOG_WARNING("Failed to get current CUDA device for atomic support check."); - return false; - } - - int hostNativeAtomicSupported = 0; - cudaError_t err - = cudaDeviceGetAttribute(&hostNativeAtomicSupported, cudaDevAttrHostNativeAtomicSupported, currentDevice); - - if (err != cudaSuccess) - { - TLLM_LOG_WARNING("Failed to get cudaDevAttrHostNativeAtomicSupported for device %d. Error: %s", currentDevice, - cudaGetErrorString(err)); - return false; - } - return static_cast(hostNativeAtomicSupported); -} - } // namespace tensorrt_llm::runtime diff --git a/cpp/tensorrt_llm/runtime/moeLoadBalancer/topologyDetector.h b/cpp/tensorrt_llm/runtime/moeLoadBalancer/topologyDetector.h index f1cd279dbbb..1ae18f77504 100644 --- a/cpp/tensorrt_llm/runtime/moeLoadBalancer/topologyDetector.h +++ b/cpp/tensorrt_llm/runtime/moeLoadBalancer/topologyDetector.h @@ -55,6 +55,11 @@ class TopologyDetector // Returns 0 as a default or -1 on error. int getCurrentGpuNumaId(); + // Returns the ID of the NUMA node that current GPU's memory is assigned. + // GPUs using C2C link with CPU may have assigned NUMA ID for its memory, like GB200. + // Returns -1 if it doesn't have NUMA ID. + int getCurrentGpuMemoryNumaId(); + // Returns the number of GPUs associated with the given NUMA node ID. int getGpuCountUnderNuma(int numaId); @@ -67,9 +72,6 @@ class TopologyDetector // Returns the detected CPU architecture (e.g., "x86_64", "aarch64"). std::string getCpuArchitecture(); - // Checks if the current CUDA device and host system support native atomic operations. 
- bool canSupportHostNativeAtomics(); - #ifdef __linux__ // Getters for precomputed CPU affinity masks const struct bitmask* getStrictCpuMaskForGpu(int gpuId) const; @@ -85,6 +87,7 @@ class TopologyDetector // Member variables std::map mGpuToNumaMap; // GPU ID -> NUMA Node ID + std::map mGpuMemoryToNumaMap; // GPU ID -> Memory NUMA Node ID std::map> mNumaToGpuMap; // NUMA Node ID -> List of GPU IDs std::map mNumaToCpuCountMap; // NUMA Node ID -> CPU Core Count std::string mCpuArchitecture; From ef03b4ee6b0cb69e030eee2e3f63788f2a22bada Mon Sep 17 00:00:00 2001 From: Dongxu Yang <78518666+dongxuy04@users.noreply.github.com> Date: Wed, 11 Jun 2025 13:57:23 +0800 Subject: [PATCH 03/13] add PCIe support and use NUMA allocation for GB200 Signed-off-by: Dongxu Yang <78518666+dongxuy04@users.noreply.github.com> --- cpp/tensorrt_llm/runtime/CMakeLists.txt | 1 + .../runtime/moeLoadBalancer/gdrwrap.cpp | 40 +++ .../runtime/moeLoadBalancer/gdrwrap.h | 46 +--- .../hostAccessibleDeviceAllocator.cpp | 182 ++++++++++++++ .../hostAccessibleDeviceAllocator.h | 154 ++++++++++++ .../moeLoadBalancer/moeLoadBalancer.cpp | 90 +++++-- .../runtime/moeLoadBalancer/moeLoadBalancer.h | 10 +- .../moeLoadBalancer/topologyDetector.cpp | 17 ++ .../moeLoadBalancer/topologyDetector.h | 6 + cpp/tensorrt_llm/thop/moeLoadBalanceOp.cpp | 35 ++- cpp/tests/unit_tests/runtime/CMakeLists.txt | 2 + cpp/tests/unit_tests/runtime/gdrcopyTest.cpp | 3 +- .../hostAccessibleDeviceAllocatorTest.cpp | 236 ++++++++++++++++++ .../modules/fused_moe/moe_load_balancer.py | 27 +- .../_torch/modules/fused_moe/quantization.py | 100 +++++--- 15 files changed, 809 insertions(+), 140 deletions(-) create mode 100644 cpp/tensorrt_llm/runtime/moeLoadBalancer/hostAccessibleDeviceAllocator.cpp create mode 100644 cpp/tensorrt_llm/runtime/moeLoadBalancer/hostAccessibleDeviceAllocator.h create mode 100644 cpp/tests/unit_tests/runtime/hostAccessibleDeviceAllocatorTest.cpp diff --git a/cpp/tensorrt_llm/runtime/CMakeLists.txt b/cpp/tensorrt_llm/runtime/CMakeLists.txt index a66c1266059..c517f98cabc 100644 --- a/cpp/tensorrt_llm/runtime/CMakeLists.txt +++ b/cpp/tensorrt_llm/runtime/CMakeLists.txt @@ -44,6 +44,7 @@ set(SRCS mcastDeviceMemory.cpp memoryCounters.cpp moeLoadBalancer/gdrwrap.cpp + moeLoadBalancer/hostAccessibleDeviceAllocator.cpp moeLoadBalancer/moeLoadBalancer.cpp moeLoadBalancer/topologyDetector.cpp ncclCommunicator.cpp diff --git a/cpp/tensorrt_llm/runtime/moeLoadBalancer/gdrwrap.cpp b/cpp/tensorrt_llm/runtime/moeLoadBalancer/gdrwrap.cpp index d8324af1005..c648afeda89 100644 --- a/cpp/tensorrt_llm/runtime/moeLoadBalancer/gdrwrap.cpp +++ b/cpp/tensorrt_llm/runtime/moeLoadBalancer/gdrwrap.cpp @@ -200,6 +200,46 @@ int copy_from_mapping(gdr_mh_t handle, void* h_ptr, void const* map_d_ptr, size_ return GDRLOCKCALL(gdr_internal_copy_from_mapping(handle, h_ptr, map_d_ptr, size)); } +void gdrCudaMalloc(void** ptr, void** devPtr, size_t mapSize, GdrMemDesc** memDesc, gdr_t handle) +{ + TLLM_CHECK_WITH_INFO(isInitialized(), "GDRCopy library is not initialized."); + gdr_info_t info; + gdr_mh_t mh; + char* devMem; + void* gdrMap; + + // GDRCOPY Pinned buffer has to be a minimum of a GPU_PAGE_SIZE + size_t alignedMapSize = (mapSize + GPU_PAGE_SIZE - 1) & GPU_PAGE_MASK; + if (alignedMapSize == 0 && mapSize > 0) + { + alignedMapSize = GPU_PAGE_SIZE; + } + TLLM_CUDA_CHECK(cudaMalloc(&devMem, alignedMapSize + GPU_PAGE_SIZE - 1)); + uint64_t alignedAddr = ((uint64_t) devMem + GPU_PAGE_OFFSET) & GPU_PAGE_MASK; + size_t align = alignedAddr - (uint64_t) devMem; + + 
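+    // Pin the page-aligned device range with the GDR driver, map it into the host address space, and query the mapping info.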
TLLM_CHECK_WITH_INFO(pin_buffer(handle, alignedAddr, alignedMapSize, 0, 0, &mh) == 0, "GDR pin_buffer failed"); + TLLM_CHECK_WITH_INFO(map(handle, mh, &gdrMap, alignedMapSize) == 0, "GDR map failed"); + TLLM_CHECK_WITH_INFO(get_info(handle, mh, &info) == 0, "GDR get_info failed"); + + ssize_t off = info.va - alignedAddr; + + *memDesc = new GdrMemDesc(); + (*memDesc)->gdrDeviceMem = devMem; + (*memDesc)->gdrMap = gdrMap; + (*memDesc)->gdrMapSize = alignedMapSize; + (*memDesc)->gdrOffset = off + align; + (*memDesc)->gdrMh = mh; + + *ptr = (void*) ((char*) gdrMap + off); + if (devPtr) + *devPtr = (void*) (devMem + off + align); + + TLLM_LOG_DEBUG("GDRCOPY : allocated devMem %p gdrMap %p offset %lx mh %lx mapSize %zu at %p", + (*memDesc)->gdrDeviceMem, (*memDesc)->gdrMap, (*memDesc)->gdrOffset, (*memDesc)->gdrMh.h, + (*memDesc)->gdrMapSize, *ptr); +} + void gdrCudaFree(GdrMemDesc* memDesc, gdr_t handle) { CHECK_INITIALIZED(); diff --git a/cpp/tensorrt_llm/runtime/moeLoadBalancer/gdrwrap.h b/cpp/tensorrt_llm/runtime/moeLoadBalancer/gdrwrap.h index 4591f0f7916..265d089bd9f 100644 --- a/cpp/tensorrt_llm/runtime/moeLoadBalancer/gdrwrap.h +++ b/cpp/tensorrt_llm/runtime/moeLoadBalancer/gdrwrap.h @@ -131,6 +131,7 @@ inline void gdrCudaFree(GdrMemDesc* /*memDesc*/, gdr_t /*handle*/) #include "tensorrt_llm/common/assert.h" #include "tensorrt_llm/common/cudaUtils.h" #include "tensorrt_llm/common/logger.h" +#include #include #include #include @@ -237,50 +238,7 @@ struct GdrMemDesc }; // Allocates memory that can be used with GDRCopy. -// Throws an exception on failure. -template -void gdrCudaMalloc(T** ptr, T** devPtr, size_t nelem, GdrMemDesc** memDesc, gdr_t handle) -{ - TLLM_CHECK_WITH_INFO(isInitialized(), "GDRCopy library is not initialized."); - gdr_info_t info; - size_t mapSize; - gdr_mh_t mh; - char* devMem; - void* gdrMap; - - mapSize = sizeof(T) * nelem; - - // GDRCOPY Pinned buffer has to be a minimum of a GPU_PAGE_SIZE - size_t alignedMapSize = (mapSize + GPU_PAGE_SIZE - 1) & GPU_PAGE_MASK; - if (alignedMapSize == 0 && mapSize > 0) - { - alignedMapSize = GPU_PAGE_SIZE; - } - TLLM_CUDA_CHECK(cudaMalloc(&devMem, alignedMapSize + GPU_PAGE_SIZE - 1)); - uint64_t alignedAddr = ((uint64_t) devMem + GPU_PAGE_OFFSET) & GPU_PAGE_MASK; - size_t align = alignedAddr - (uint64_t) devMem; - - TLLM_CHECK_WITH_INFO(pin_buffer(handle, alignedAddr, alignedMapSize, 0, 0, &mh) == 0, "GDR pin_buffer failed"); - TLLM_CHECK_WITH_INFO(map(handle, mh, &gdrMap, alignedMapSize) == 0, "GDR map failed"); - TLLM_CHECK_WITH_INFO(get_info(handle, mh, &info) == 0, "GDR get_info failed"); - - ssize_t off = info.va - alignedAddr; - - *memDesc = new GdrMemDesc(); - (*memDesc)->gdrDeviceMem = devMem; - (*memDesc)->gdrMap = gdrMap; - (*memDesc)->gdrMapSize = alignedMapSize; - (*memDesc)->gdrOffset = off + align; - (*memDesc)->gdrMh = mh; - - *ptr = (T*) ((char*) gdrMap + off); - if (devPtr) - *devPtr = (T*) (devMem + off + align); - - TLLM_LOG_DEBUG("GDRCOPY : allocated devMem %p gdrMap %p offset %lx mh %lx mapSize %zu at %p", - (*memDesc)->gdrDeviceMem, (*memDesc)->gdrMap, (*memDesc)->gdrOffset, (*memDesc)->gdrMh.h, - (*memDesc)->gdrMapSize, *ptr); -} +void gdrCudaMalloc(void** ptr, void** devPtr, size_t mapSize, GdrMemDesc** memDesc, gdr_t handle); // Frees memory allocated with gdrCudaMalloc. 
void gdrCudaFree(GdrMemDesc* memDesc, gdr_t handle); diff --git a/cpp/tensorrt_llm/runtime/moeLoadBalancer/hostAccessibleDeviceAllocator.cpp b/cpp/tensorrt_llm/runtime/moeLoadBalancer/hostAccessibleDeviceAllocator.cpp new file mode 100644 index 00000000000..979a210afe9 --- /dev/null +++ b/cpp/tensorrt_llm/runtime/moeLoadBalancer/hostAccessibleDeviceAllocator.cpp @@ -0,0 +1,182 @@ +/* + * Copyright (c) 2022-2024, NVIDIA CORPORATION. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include "gdrwrap.h" +#include "hostAccessibleDeviceAllocator.h" +#include "topologyDetector.h" + +#include "tensorrt_llm/common/cudaUtils.h" +#include "tensorrt_llm/common/logger.h" + +namespace tensorrt_llm::runtime +{ + +void HostAccessibleDeviceAllocator::init() +{ + TLLM_CHECK(mIsInited == false); + TLLM_CUDA_CHECK(cudaGetDevice(&mDevId)); + mGpuMemNumaId = TopologyDetector::getInstance().getCurrentGpuMemoryNumaId(); + if (mGpuMemNumaId < 0) + { + // We only use GDRCopy when there is no NUMA node for GPU memory. + bool gdrCopyInitedSuccess = true; + if (!tensorrt_llm::runtime::gdrcopy::isInitialized() && !tensorrt_llm::runtime::gdrcopy::initialize()) + { + gdrCopyInitedSuccess = false; + } + if (gdrCopyInitedSuccess) + { + mGdrHandle = tensorrt_llm::runtime::gdrcopy::open(); + } + } + mIsInited = true; +} + +void HostAccessibleDeviceAllocator::shutdown() +{ + if (mIsInited == false) + { + return; + } + // We should close GDRCopy handle in the last MoeLoadBalancer, + // But there might be some allocated memory not freed, so we can't close GDRCopy handle. + // So for now, we don't close GDRCopy handle. 
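+    // The disabled block below shows the cleanup that would otherwise run here.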
+#if 0 + if (mGdrHandle != nullptr) { + tensorrt_llm::runtime::gdrcopy::close(mGdrHandle); + mGdrHandle = nullptr; + } +#endif + mIsInited = false; +} + +HostAccessibleDeviceAllocator& HostAccessibleDeviceAllocator::getInstance() +{ + static HostAccessibleDeviceAllocator instance; + return instance; +} + +void HostAccessibleDeviceAllocator::IncRefCount() +{ + std::lock_guard lock(mRefMutex); + if (mLoadBalancerCount == 0) + { + init(); + } + mLoadBalancerCount++; +} + +void HostAccessibleDeviceAllocator::DecRefCount() +{ + std::lock_guard lock(mRefMutex); + mLoadBalancerCount--; + if (mLoadBalancerCount == 0) + { + shutdown(); + } +} + +void HostAccessibleDeviceAllocator::recordAllocation( + void* devPtr, size_t memorySize, void* hostPtr, gdrcopy::GdrMemDesc* memDesc) +{ + std::lock_guard lock(mAllocationsMutex); + mAllocations[devPtr] = {memorySize, hostPtr, memDesc}; +} + +void* HostAccessibleDeviceAllocator::getHostPtr(void* devPtr) +{ + std::lock_guard lock(mAllocationsMutex); + if (mAllocations.empty()) + { + return nullptr; + } + + auto it = mAllocations.upper_bound(devPtr); + if (it == mAllocations.begin()) + { + return nullptr; + } + + --it; + + void* recordedDevPtr = it->first; + auto const& allocationInfo = it->second; + size_t recordedSize = allocationInfo.size; + void* recordedHostPtr = allocationInfo.hostPtr; + + auto pDev = static_cast(devPtr); + auto pRecordedDev = static_cast(recordedDevPtr); + + if (pDev >= pRecordedDev && pDev < (pRecordedDev + recordedSize)) + { + ptrdiff_t offset = pDev - pRecordedDev; + return static_cast(recordedHostPtr) + offset; + } + + return nullptr; +} + +void* HostAccessibleDeviceAllocator::allocate(size_t memorySize) +{ + int currentDevId = -1; + TLLM_CUDA_CHECK(cudaGetDevice(¤tDevId)); + TLLM_CHECK_WITH_INFO(currentDevId == mDevId, + "HostAccessibleDeviceAllocator is not initialized for the current device, currentDevId=%d, mDevId=%d", + currentDevId, mDevId); + TLLM_CHECK_WITH_INFO(isSupported(), "HostAccessibleDeviceAllocator is not supported on the current system."); + void* devPtr = nullptr; + void* hostPtr = nullptr; + gdrcopy::GdrMemDesc* memDesc = nullptr; + if (mGpuMemNumaId >= 0) + { + devPtr = TopologyDetector::getInstance().allocateCurrentGpuNumaMemory(memorySize); + hostPtr = devPtr; + } + else + { + gdrcopy::gdrCudaMalloc(&hostPtr, &devPtr, memorySize, &memDesc, mGdrHandle); + } + recordAllocation(devPtr, memorySize, hostPtr, memDesc); + return devPtr; +} + +void HostAccessibleDeviceAllocator::free(void* ptr) +{ + std::lock_guard lock(mAllocationsMutex); + auto it = mAllocations.find(ptr); + if (it != mAllocations.end()) + { + auto const& allocInfo = it->second; + if (allocInfo.memDesc) + { + gdrcopy::gdrCudaFree(allocInfo.memDesc, mGdrHandle); + } + else + { + TopologyDetector::getInstance().freeCurrentGpuNumaMemory(it->first, allocInfo.size); + } + mAllocations.erase(it); + } + else + { + TLLM_LOG_WARNING("Attempted to free a pointer that was not allocated by HostAccessibleDeviceAllocator."); + } +} + +} // namespace tensorrt_llm::runtime diff --git a/cpp/tensorrt_llm/runtime/moeLoadBalancer/hostAccessibleDeviceAllocator.h b/cpp/tensorrt_llm/runtime/moeLoadBalancer/hostAccessibleDeviceAllocator.h new file mode 100644 index 00000000000..679df172b26 --- /dev/null +++ b/cpp/tensorrt_llm/runtime/moeLoadBalancer/hostAccessibleDeviceAllocator.h @@ -0,0 +1,154 @@ +/* + * Copyright (c) 2022-2024, NVIDIA CORPORATION. All rights reserved. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +#include "gdrwrap.h" +#include "topologyDetector.h" + +namespace tensorrt_llm::runtime +{ + +class MoeLoadBalancer; + +namespace unit_tests +{ +class HostAccessibleDeviceAllocatorTest; +} + +class HostAccessibleDeviceAllocator +{ +public: + // Delete the copy constructor and copy assignment operator to prevent cloning. + HostAccessibleDeviceAllocator(HostAccessibleDeviceAllocator const&) = delete; + void operator=(HostAccessibleDeviceAllocator const&) = delete; + + /** + * @brief Get the single instance of the HostAccessibleDeviceAllocator. + * + * @return HostAccessibleDeviceAllocator& Reference to the singleton instance. + */ + static HostAccessibleDeviceAllocator& getInstance(); + + /** + * @brief Allocate host accessible memory on the device. + * + * @param memorySize The size of the memory to allocate. + * @return void* Pointer to the allocated memory. + */ + void* allocate(size_t memorySize); + + /** + * @brief Free the allocated memory. + * + * @param ptr Pointer to the memory to free. + */ + void free(void* ptr); + + /** + * @brief Get the host-accessible pointer for a given device pointer. + * + * @param devPtr The device pointer to look up. It can be a pointer inside a recorded allocation. + * @return void* The corresponding host-accessible pointer, or nullptr if not found. + */ + void* getHostPtr(void* devPtr); + + /** + * @brief Check if host accessible memory is supported on the current system. + * @note This function should be called after IncRefCount() by some MoeLoadBalancer. + * + * @return bool True if host accessible memory is supported, false otherwise. + */ + bool isSupported() const + { + return mGpuMemNumaId >= 0 || mGdrHandle != nullptr; + } + +private: + struct AllocationInfo + { + size_t size; + void* hostPtr; + gdrcopy::GdrMemDesc* memDesc; + }; + + /** + * @brief Private constructor to prevent direct instantiation. + * + * Initialization logic for the allocator (like initializing GDRCopy) + * can be placed here. + */ + HostAccessibleDeviceAllocator() = default; + + /** + * @brief Initialize the allocator. + */ + void init(); + + /** + * @brief Shutdown the allocator. + */ + void shutdown(); + + friend class tensorrt_llm::runtime::MoeLoadBalancer; + friend class tensorrt_llm::runtime::unit_tests::HostAccessibleDeviceAllocatorTest; + + /** + * @brief Increment the reference count of the load balancer. + * This Allocator is shared by multiple MoeLoadBalancers, so we need to + * increment the reference count when a new MoeLoadBalancer is created. + * They may share the same GDR handle. + */ + void IncRefCount(); + + /** + * @brief Decrement the reference count of the load balancer. + * This Allocator is shared by multiple MoeLoadBalancers, so we need to + * decrement the reference count when a MoeLoadBalancer is destroyed. + * If the reference count is 0, we need to close the GDR handle. 
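+     * Note: the current shutdown() implementation deliberately keeps the GDR handle open (see the comment in the .cpp).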
+ */ + void DecRefCount(); + + /** + * @brief Record a device memory allocation and its corresponding host-accessible pointer. + * + * @param devPtr The device pointer of the allocated memory. + * @param memorySize The size of the allocated memory. + * @param hostPtr The corresponding host-accessible pointer. + * @param memDesc Optional GDR memory descriptor if allocated with GDRCopy. + */ + void recordAllocation(void* devPtr, size_t memorySize, void* hostPtr, gdrcopy::GdrMemDesc* memDesc = nullptr); + + // if GPU memory has NUMA id, then CPU can direct access that. We should use this. + int mGpuMemNumaId = -1; + // if Not, we should use GDRCopy + gdr_t mGdrHandle = nullptr; + + int mDevId = -1; + + bool mIsInited = false; + std::mutex mRefMutex; + int mLoadBalancerCount = 0; + + std::mutex mAllocationsMutex; + std::map mAllocations; +}; + +} // namespace tensorrt_llm::runtime diff --git a/cpp/tensorrt_llm/runtime/moeLoadBalancer/moeLoadBalancer.cpp b/cpp/tensorrt_llm/runtime/moeLoadBalancer/moeLoadBalancer.cpp index 292cf1110c2..67c1988a0de 100644 --- a/cpp/tensorrt_llm/runtime/moeLoadBalancer/moeLoadBalancer.cpp +++ b/cpp/tensorrt_llm/runtime/moeLoadBalancer/moeLoadBalancer.cpp @@ -15,6 +15,7 @@ */ #include "moeLoadBalancer.h" +#include "hostAccessibleDeviceAllocator.h" #include "tensorrt_llm/common/assert.h" #include "tensorrt_llm/common/cudaUtils.h" #include "tensorrt_llm/kernels/moeLoadBalance/moeLoadBalanceKernels.h" @@ -289,9 +290,9 @@ void freeStatisticInfo(tensorrt_llm::kernels::MoeLoadBalanceStatisticInfo* stati } void allocatePlacementInfo(tensorrt_llm::kernels::MoeLoadBalanceMetaInfo const& metaInfo, - tensorrt_llm::kernels::MoePlacementInfo* placementInfo, bool isCpu = false, bool useManaged = false) + tensorrt_llm::kernels::MoePlacementInfo* placementInfo, bool isCpu = false) { - auto allocFn = [isCpu, useManaged](void** ptr, size_t size) + auto allocFn = [isCpu](void** ptr, size_t size) { if (isCpu) { @@ -299,15 +300,9 @@ void allocatePlacementInfo(tensorrt_llm::kernels::MoeLoadBalanceMetaInfo const& } else { - if (useManaged) + if (HostAccessibleDeviceAllocator::getInstance().isSupported()) { - TLLM_CUDA_CHECK(cudaMallocManaged(ptr, size)); - int cur_dev; - TLLM_CUDA_CHECK(cudaGetDevice(&cur_dev)); - TLLM_CUDA_CHECK(cudaMemAdvise(*ptr, size, cudaMemAdviseSetPreferredLocation, cur_dev)); - TLLM_CUDA_CHECK(cudaMemAdvise(*ptr, size, cudaMemAdviseSetAccessedBy, cur_dev)); - TLLM_CUDA_CHECK(cudaMemAdvise(*ptr, size, cudaMemAdviseSetAccessedBy, cudaCpuDeviceId)); - TLLM_CUDA_CHECK(cudaMemset(*ptr, 0, size)); + *ptr = HostAccessibleDeviceAllocator::getInstance().allocate(size); return cudaSuccess; } else @@ -334,7 +329,15 @@ void freePlacementInfo(tensorrt_llm::kernels::MoePlacementInfo* placementInfo, b } else { - return cudaFree(ptr); + if (HostAccessibleDeviceAllocator::getInstance().isSupported()) + { + HostAccessibleDeviceAllocator::getInstance().free(ptr); + return cudaSuccess; + } + else + { + return cudaFree(ptr); + } } }; TLLM_CUDA_CHECK(freeFn(placementInfo->expertReplicaCount)); @@ -342,6 +345,26 @@ void freePlacementInfo(tensorrt_llm::kernels::MoePlacementInfo* placementInfo, b TLLM_CUDA_CHECK(freeFn(placementInfo->globalSlotIds)); } +void getHostAccessibleGpuPlacement(tensorrt_llm::kernels::MoePlacementInfo const* placementInfoGpuAccess, + tensorrt_llm::kernels::MoePlacementInfo* placementInfoHostAccess) +{ + if (HostAccessibleDeviceAllocator::getInstance().isSupported()) + { + placementInfoHostAccess->expertReplicaStartOffset = static_cast( + 
HostAccessibleDeviceAllocator::getInstance().getHostPtr(placementInfoGpuAccess->expertReplicaStartOffset)); + placementInfoHostAccess->expertReplicaCount = static_cast( + HostAccessibleDeviceAllocator::getInstance().getHostPtr(placementInfoGpuAccess->expertReplicaCount)); + placementInfoHostAccess->globalSlotIds = static_cast( + HostAccessibleDeviceAllocator::getInstance().getHostPtr(placementInfoGpuAccess->globalSlotIds)); + } + else + { + placementInfoHostAccess->expertReplicaStartOffset = nullptr; + placementInfoHostAccess->expertReplicaCount = nullptr; + placementInfoHostAccess->globalSlotIds = nullptr; + } +} + tensorrt_llm::kernels::MoeLoadBalanceSingleLayerSignal* allocateSingleLayerSignal() { tensorrt_llm::kernels::MoeLoadBalanceSingleLayerSignal* ptr = nullptr; @@ -377,7 +400,7 @@ SingleLayerMoeLoadBalancer::~SingleLayerMoeLoadBalancer() void SingleLayerMoeLoadBalancer::addSingleWeightSlot(int localSlotId, std::string const& name, MoeWeight weightSlot) { - mWeightUpdater->addSingleWeightSlot(localSlotId, name, weightSlot); + mWeightUpdater->addSingleWeightSlot(localSlotId, name, weightSlot, mMoeLoadBalancer->mUseGpuMemcpy); } void SingleLayerMoeLoadBalancer::addSingleHostWeight(int expertId, std::string const& name, MoeWeight hostWeight) @@ -420,7 +443,8 @@ void SingleLayerMoeLoadBalancer::createResources() } allocatePlacementInfo(mMetaInfo, &mCpuPlacementInfo.placementInfoForGPU, true); - allocatePlacementInfo(mMetaInfo, &mGpuPlacement, false, true); + allocatePlacementInfo(mMetaInfo, &mGpuPlacementGpuAccess, false); + getHostAccessibleGpuPlacement(&mGpuPlacementGpuAccess, &mGpuPlacementHostAccess); mSingleLayerSignal = allocateSingleLayerSignal(); TLLM_CUDA_CHECK(cudaEventCreate(&mUpdateWeightsDoneEvent)); @@ -432,7 +456,7 @@ void SingleLayerMoeLoadBalancer::destroyResources() mWeightUpdater.reset(); freeStatisticInfo(&mStatisticInfo); freePlacementInfo(&mCpuPlacementInfo.placementInfoForGPU, true); - freePlacementInfo(&mGpuPlacement, false); + freePlacementInfo(&mGpuPlacementGpuAccess, false); freeSingleLayerSignal(mSingleLayerSignal); TLLM_CUDA_CHECK(cudaEventDestroy(mUpdateWeightsDoneEvent)); } @@ -499,14 +523,15 @@ cudaStream_t SingleLayerMoeLoadBalancer::getStream() const void SingleLayerMoeLoadBalancer::copyPlacementInfoToGpu() { cudaStream_t stream = mMoeLoadBalancer->mStream; - TLLM_CUDA_CHECK( - cudaMemcpyAsync(mGpuPlacement.expertReplicaCount, mCpuPlacementInfo.placementInfoForGPU.expertReplicaCount, - sizeof(int) * mMetaInfo.expertCount, cudaMemcpyHostToDevice, stream)); - TLLM_CUDA_CHECK(cudaMemcpyAsync(mGpuPlacement.expertReplicaStartOffset, + TLLM_CUDA_CHECK(cudaMemcpyAsync(mGpuPlacementGpuAccess.expertReplicaCount, + mCpuPlacementInfo.placementInfoForGPU.expertReplicaCount, sizeof(int) * mMetaInfo.expertCount, + cudaMemcpyHostToDevice, stream)); + TLLM_CUDA_CHECK(cudaMemcpyAsync(mGpuPlacementGpuAccess.expertReplicaStartOffset, mCpuPlacementInfo.placementInfoForGPU.expertReplicaStartOffset, sizeof(int) * mMetaInfo.expertCount, cudaMemcpyHostToDevice, stream)); - TLLM_CUDA_CHECK(cudaMemcpyAsync(mGpuPlacement.globalSlotIds, mCpuPlacementInfo.placementInfoForGPU.globalSlotIds, - sizeof(int) * mMetaInfo.epSize * mMetaInfo.slotCountPerRank, cudaMemcpyHostToDevice, stream)); + TLLM_CUDA_CHECK( + cudaMemcpyAsync(mGpuPlacementGpuAccess.globalSlotIds, mCpuPlacementInfo.placementInfoForGPU.globalSlotIds, + sizeof(int) * mMetaInfo.epSize * mMetaInfo.slotCountPerRank, cudaMemcpyHostToDevice, stream)); mCpuPlacementInfo.rankExpertIds.swap(mCpuPlacementInfo.oldRankExpertIds); 
for (int i = 0; i < mMetaInfo.epSize; ++i) { @@ -518,11 +543,11 @@ void SingleLayerMoeLoadBalancer::copyPlacementInfoToGpu() void SingleLayerMoeLoadBalancer::copyPlacementInfoToGpuByCpu() { - memcpy(mGpuPlacement.expertReplicaCount, mCpuPlacementInfo.placementInfoForGPU.expertReplicaCount, + memcpy(mGpuPlacementHostAccess.expertReplicaCount, mCpuPlacementInfo.placementInfoForGPU.expertReplicaCount, sizeof(int) * mMetaInfo.expertCount); - memcpy(mGpuPlacement.expertReplicaStartOffset, mCpuPlacementInfo.placementInfoForGPU.expertReplicaStartOffset, - sizeof(int) * mMetaInfo.expertCount); - memcpy(mGpuPlacement.globalSlotIds, mCpuPlacementInfo.placementInfoForGPU.globalSlotIds, + memcpy(mGpuPlacementHostAccess.expertReplicaStartOffset, + mCpuPlacementInfo.placementInfoForGPU.expertReplicaStartOffset, sizeof(int) * mMetaInfo.expertCount); + memcpy(mGpuPlacementHostAccess.globalSlotIds, mCpuPlacementInfo.placementInfoForGPU.globalSlotIds, sizeof(int) * mMetaInfo.epSize * mMetaInfo.slotCountPerRank); mCpuPlacementInfo.rankExpertIds.swap(mCpuPlacementInfo.oldRankExpertIds); for (int i = 0; i < mMetaInfo.epSize; ++i) @@ -574,7 +599,7 @@ MoeWeightUpdaterBase::MoeWeightUpdaterBase( } void MoeWeightUpdaterBase::addSingleWeightSlot( - int localSlotId, std::string const& name, tensorrt_llm::runtime::MoeWeight weightSlot) + int localSlotId, std::string const& name, tensorrt_llm::runtime::MoeWeight weightSlot, bool gpuAccess) { TLLM_CHECK_WITH_INFO(mWeightSlotsFinalized == false, "Cannot add slots after finalize"); TLLM_CHECK_WITH_INFO(localSlotId >= 0 && localSlotId < mMetaInfo.slotCountPerRank, @@ -589,6 +614,12 @@ void MoeWeightUpdaterBase::addSingleWeightSlot( } TLLM_CHECK_WITH_INFO(mWeightSlots[name][localSlotId].mWeightPtr == nullptr, "localSlotId=%d, name=%s already added.", localSlotId, name.c_str()); + if (!gpuAccess) + { + // if using cpu copy, change to host accessible pointer. 
+ weightSlot.mWeightPtr + = tensorrt_llm::runtime::HostAccessibleDeviceAllocator::getInstance().getHostPtr(weightSlot.mWeightPtr); + } mWeightSlots[name][localSlotId] = weightSlot; } @@ -808,6 +839,9 @@ MoeLoadBalancer::MoeLoadBalancer(int epRank, int epSize, int layerUpdatesPerIter int currentGpuNumaId = topologyDetector.getCurrentGpuNumaId(); int numaCpuCount = topologyDetector.getCurrentGpuNumaCpuCount(); int numaGpuCount = topologyDetector.getGpuCountUnderNuma(currentGpuNumaId); + HostAccessibleDeviceAllocator::getInstance().IncRefCount(); + TLLM_CHECK_WITH_INFO(HostAccessibleDeviceAllocator::getInstance().isSupported(), + "HostAccessibleDeviceAllocator is not supported on current platform, please install gdrcopy(gdrdrv)."); TLLM_CHECK_WITH_INFO( numaCpuCount > 0 && numaGpuCount > 0, "numaCpuCount=%d, numaGpuCount=%d", numaCpuCount, numaGpuCount); int cpuCountPerGpu = std::max(1, numaCpuCount / numaGpuCount); @@ -837,7 +871,11 @@ MoeLoadBalancer::MoeLoadBalancer(int epRank, int epSize, int layerUpdatesPerIter mMultiThreadWorker.reset(new MultiThreadWorker(numCopyThreads)); } -MoeLoadBalancer::~MoeLoadBalancer() {} +MoeLoadBalancer::~MoeLoadBalancer() +{ + shutdown(); + HostAccessibleDeviceAllocator::getInstance().DecRefCount(); +} std::shared_ptr MoeLoadBalancer::AddLayer(int expertCount, int topK, int slotCountPerRank) { diff --git a/cpp/tensorrt_llm/runtime/moeLoadBalancer/moeLoadBalancer.h b/cpp/tensorrt_llm/runtime/moeLoadBalancer/moeLoadBalancer.h index 4c77963c69b..18960ec347e 100644 --- a/cpp/tensorrt_llm/runtime/moeLoadBalancer/moeLoadBalancer.h +++ b/cpp/tensorrt_llm/runtime/moeLoadBalancer/moeLoadBalancer.h @@ -79,7 +79,7 @@ class MoeWeightUpdaterBase virtual ~MoeWeightUpdaterBase() {} - void addSingleWeightSlot(int localSlotId, std::string const& name, MoeWeight weightSlot); + void addSingleWeightSlot(int localSlotId, std::string const& name, MoeWeight weightSlot, bool gpuAccess); virtual void addSingleHostWeight(int expertId, std::string const& name, MoeWeight hostWeight) = 0; virtual void finalizeWeights(); virtual void updateWeights(MoePlacementCpuInfo const* placementCpuInfo, int rank = 0, int size = 1) = 0; @@ -155,6 +155,11 @@ class SingleLayerMoeLoadBalancer return &mCpuPlacementInfo; } + tensorrt_llm::kernels::MoePlacementInfo getGpuPlacementInfo() + { + return mGpuPlacementGpuAccess; + } + tensorrt_llm::kernels::MoeLoadBalanceSingleLayerSignal* getSignal() { return mSingleLayerSignal; @@ -200,7 +205,8 @@ class SingleLayerMoeLoadBalancer tensorrt_llm::kernels::MoeLoadBalanceMetaInfo mMetaInfo; tensorrt_llm::kernels::MoeLoadBalanceStatisticInfo mStatisticInfo; MoePlacementCpuInfo mCpuPlacementInfo; - tensorrt_llm::kernels::MoePlacementInfo mGpuPlacement; + tensorrt_llm::kernels::MoePlacementInfo mGpuPlacementHostAccess; + tensorrt_llm::kernels::MoePlacementInfo mGpuPlacementGpuAccess; tensorrt_llm::kernels::MoeLoadBalanceSingleLayerSignal* mSingleLayerSignal = nullptr; std::mutex mUpdateWeightsMutex; diff --git a/cpp/tensorrt_llm/runtime/moeLoadBalancer/topologyDetector.cpp b/cpp/tensorrt_llm/runtime/moeLoadBalancer/topologyDetector.cpp index 8f3685827d5..f24c403de72 100644 --- a/cpp/tensorrt_llm/runtime/moeLoadBalancer/topologyDetector.cpp +++ b/cpp/tensorrt_llm/runtime/moeLoadBalancer/topologyDetector.cpp @@ -437,6 +437,23 @@ int TopologyDetector::getGpuCountUnderNuma(int numaId) return 0; } +void* TopologyDetector::allocateCurrentGpuNumaMemory(size_t memorySize) +{ + int currentDevice = -1; + TLLM_CUDA_CHECK(cudaGetDevice(¤tDevice)); + int numaId = 
getCurrentGpuMemoryNumaId(); + TLLM_CHECK_WITH_INFO(numaId >= 0, "Current GPU memory has no NUMA ID. Cannot allocate memory."); + void* ptr = numa_alloc_onnode(memorySize, numaId); + TLLM_CUDA_CHECK(cudaHostRegister(ptr, memorySize, cudaHostRegisterDefault)); + return ptr; +} + +void TopologyDetector::freeCurrentGpuNumaMemory(void* ptr, size_t memorySize) +{ + TLLM_CUDA_CHECK(cudaHostUnregister(ptr)); + numa_free(ptr, memorySize); +} + std::string TopologyDetector::getCpuArchitecture() { return mCpuArchitecture; diff --git a/cpp/tensorrt_llm/runtime/moeLoadBalancer/topologyDetector.h b/cpp/tensorrt_llm/runtime/moeLoadBalancer/topologyDetector.h index 1ae18f77504..b2730adb073 100644 --- a/cpp/tensorrt_llm/runtime/moeLoadBalancer/topologyDetector.h +++ b/cpp/tensorrt_llm/runtime/moeLoadBalancer/topologyDetector.h @@ -69,6 +69,12 @@ class TopologyDetector return getGpuCountUnderNuma(getCurrentGpuNumaId()); } + // Returns a pointer to a memory region on the current GPU's NUMA node. + void* allocateCurrentGpuNumaMemory(size_t memorySize); + + // Frees a memory region allocated by allocateCurrentGpuNumaMemory. + void freeCurrentGpuNumaMemory(void* ptr, size_t memorySize); + // Returns the detected CPU architecture (e.g., "x86_64", "aarch64"). std::string getCpuArchitecture(); diff --git a/cpp/tensorrt_llm/thop/moeLoadBalanceOp.cpp b/cpp/tensorrt_llm/thop/moeLoadBalanceOp.cpp index e694249105b..8724a860495 100644 --- a/cpp/tensorrt_llm/thop/moeLoadBalanceOp.cpp +++ b/cpp/tensorrt_llm/thop/moeLoadBalanceOp.cpp @@ -27,6 +27,7 @@ #include #include "tensorrt_llm/kernels/moeLoadBalance/moeLoadBalanceKernels.h" +#include "tensorrt_llm/runtime/moeLoadBalancer/hostAccessibleDeviceAllocator.h" #include "tensorrt_llm/runtime/moeLoadBalancer/moeLoadBalancer.h" namespace torch_ext @@ -159,38 +160,34 @@ torch::Tensor moeLoadBalanceRouting( auto tokenRoutedSlotIds = torch::empty_like(tokenSelectedExperts); - tensorrt_llm::kernels::moeComputeRouteDevice(metaInfo, loadBalancer->getPlacementCpuInfo()->placementInfoForGPU, + tensorrt_llm::kernels::moeComputeRouteDevice(metaInfo, loadBalancer->getGpuPlacementInfo(), tokenSelectedExperts.data_ptr(), tokenRoutedSlotIds.data_ptr(), tokenCount, offsetByEpRank, stream); return tokenRoutedSlotIds; } -void migrateToManaged(at::Tensor& tensor) +void migrateToHostAccessible(at::Tensor& tensor) { TORCH_CHECK(tensor.device().is_cuda(), "only support CUDA Tensor"); + TLLM_CHECK_WITH_INFO(tensorrt_llm::runtime::HostAccessibleDeviceAllocator::getInstance().isSupported(), + "host accessible allocator is not supported on system, please install GDRCopy."); + // 1) compute total bytes size_t byte_size = tensor.numel() * tensor.element_size(); - // 2) allocate UVM - void* managed_ptr = nullptr; - cudaError_t err = cudaMallocManaged(&managed_ptr, byte_size); - TORCH_CHECK(err == cudaSuccess, "cudaMallocManaged failed"); - - // 3) advise to place on current GPU - int cur_dev; - TLLM_CUDA_CHECK(cudaGetDevice(&cur_dev)); - TLLM_CUDA_CHECK(cudaMemAdvise(managed_ptr, byte_size, cudaMemAdviseSetPreferredLocation, cur_dev)); - TLLM_CUDA_CHECK(cudaMemAdvise(managed_ptr, byte_size, cudaMemAdviseSetAccessedBy, cur_dev)); - TLLM_CUDA_CHECK(cudaMemAdvise(managed_ptr, byte_size, cudaMemAdviseSetAccessedBy, cudaCpuDeviceId)); + // 2) allocate host accessible memory + void* devPtr = tensorrt_llm::runtime::HostAccessibleDeviceAllocator::getInstance().allocate(byte_size); - // 4) copy old data to UVM - TLLM_CUDA_CHECK(cudaMemcpy(managed_ptr, tensor.data_ptr(), byte_size, cudaMemcpyDeviceToDevice)); + 
// 3) copy old data to new memory + TLLM_CUDA_CHECK(cudaMemcpy(devPtr, tensor.data_ptr(), byte_size, cudaMemcpyDeviceToDevice)); - // 5) use new DataPtr/StorageImpl to construct storage + // 4) use new DataPtr/StorageImpl to construct storage // here managed_ptr is data,and also context,use cudaFree as deleter c10::DataPtr dp( - managed_ptr, managed_ptr, [](void* ptr) { cudaFree(ptr); }, tensor.device()); + devPtr, devPtr, + [](void* ptr) { tensorrt_llm::runtime::HostAccessibleDeviceAllocator::getInstance().free(ptr); }, + tensor.device()); auto allocator = c10::GetAllocator(tensor.device().type()); auto storage_impl = c10::make_intrusive(c10::StorageImpl::use_byte_size_t(), byte_size, std::move(dp), allocator, @@ -275,10 +272,10 @@ TORCH_LIBRARY_IMPL(trtllm, CUDA, m) TORCH_LIBRARY_FRAGMENT(trtllm, m) { - m.def("migrate_to_managed(Tensor tensor) -> ()"); + m.def("migrate_to_host_accessible(Tensor tensor) -> ()"); } TORCH_LIBRARY_IMPL(trtllm, CUDA, m) { - m.impl("migrate_to_managed", &torch_ext::migrateToManaged); + m.impl("migrate_to_host_accessible", &torch_ext::migrateToHostAccessible); } diff --git a/cpp/tests/unit_tests/runtime/CMakeLists.txt b/cpp/tests/unit_tests/runtime/CMakeLists.txt index 511783b9c8d..898ce395a39 100644 --- a/cpp/tests/unit_tests/runtime/CMakeLists.txt +++ b/cpp/tests/unit_tests/runtime/CMakeLists.txt @@ -13,6 +13,8 @@ add_gtest(bufferManagerTest bufferManagerTest.cpp) add_gtest(cudaMemPoolTest cudaMemPoolTest.cpp) add_gtest(decodingLayerWorkspaceTest decodingLayerWorkspaceTest.cpp) add_gtest(gdrcopyTest gdrcopyTest.cpp) +add_gtest(hostAccessibleDeviceAllocatorTest + hostAccessibleDeviceAllocatorTest.cpp) add_gtest(iBufferTest iBufferTest.cpp) add_gtest(iTensorTest iTensorTest.cpp) add_gtest(loraCacheTest loraCacheTest.cpp) diff --git a/cpp/tests/unit_tests/runtime/gdrcopyTest.cpp b/cpp/tests/unit_tests/runtime/gdrcopyTest.cpp index 5ff14d4b0ac..ff67512cc31 100644 --- a/cpp/tests/unit_tests/runtime/gdrcopyTest.cpp +++ b/cpp/tests/unit_tests/runtime/gdrcopyTest.cpp @@ -119,7 +119,8 @@ TEST_F(GdrCopyTest, HelperLifecycle) GdrMemDesc* mem_desc = nullptr; // 1. Allocate using helper - ASSERT_NO_THROW(gdrcopy::gdrCudaMalloc(&gdr_ptr, &dev_ptr, TEST_SIZE, &mem_desc, gdrHandle)); + ASSERT_NO_THROW(gdrcopy::gdrCudaMalloc( + reinterpret_cast(&gdr_ptr), reinterpret_cast(&dev_ptr), TEST_SIZE, &mem_desc, gdrHandle)); ASSERT_NE(gdr_ptr, nullptr); ASSERT_NE(dev_ptr, nullptr); ASSERT_NE(mem_desc, nullptr); diff --git a/cpp/tests/unit_tests/runtime/hostAccessibleDeviceAllocatorTest.cpp b/cpp/tests/unit_tests/runtime/hostAccessibleDeviceAllocatorTest.cpp new file mode 100644 index 00000000000..c8638fde34d --- /dev/null +++ b/cpp/tests/unit_tests/runtime/hostAccessibleDeviceAllocatorTest.cpp @@ -0,0 +1,236 @@ +/* + * Copyright (c) 2022-2024, NVIDIA CORPORATION. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "tensorrt_llm/runtime/moeLoadBalancer/hostAccessibleDeviceAllocator.h" +#include "tensorrt_llm/common/cudaUtils.h" +#include +#include +#include +#include + +namespace tensorrt_llm::runtime::unit_tests +{ + +// Kernel to verify data written by the host on the device. +// It checks if each element in the array devPtr has the expected value. +__global__ void verifyDataOnDevice(int const* devPtr, int size, bool* result) +{ + bool success = true; + for (int i = 0; i < size; ++i) + { + if (devPtr[i] != i) + { + success = false; + break; + } + } + *result = success; +} + +// Kernel to write data to device memory from the device. +// This is used to test if host can read data written by the device. +__global__ void writeDataOnDevice(int* devPtr, int size) +{ + for (int i = 0; i < size; ++i) + { + devPtr[i] = size - i; + } +} + +class HostAccessibleDeviceAllocatorTest : public ::testing::Test +{ +protected: + // SetUp is called before each test case. + void SetUp() override + { + // Get the allocator instance, which is a singleton. + allocator = &HostAccessibleDeviceAllocator::getInstance(); + // The allocator is initialized on the first IncRefCount call. + // This is to simulate a component (like MoeLoadBalancer) starting to use the allocator. + allocator->IncRefCount(); + } + + // TearDown is called after each test case. + void TearDown() override + { + // Decrement the reference count. + // The allocator will be shut down when the count reaches zero. + allocator->DecRefCount(); + } + + HostAccessibleDeviceAllocator* allocator; +}; + +// Test case to check the basic allocation and free functionality. +TEST_F(HostAccessibleDeviceAllocatorTest, AllocationAndFree) +{ + // Skip the test if host-accessible memory is not supported on the current system. + if (!allocator->isSupported()) + { + GTEST_SKIP() << "Host accessible memory is not supported on this system."; + } + + constexpr size_t allocSize = 1024; + void* devPtr = allocator->allocate(allocSize); + ASSERT_NE(devPtr, nullptr); + + // Free the allocated memory. + allocator->free(devPtr); + + // Test freeing a pointer that was not allocated by this allocator. + // This should not cause a crash but should be handled gracefully (e.g., by logging a warning). + int* dummyPtr = nullptr; + TLLM_CUDA_CHECK(cudaMalloc(&dummyPtr, 16)); + allocator->free(dummyPtr); + TLLM_CUDA_CHECK(cudaFree(dummyPtr)); +} + +// Test case to verify data access between the host and the device. +TEST_F(HostAccessibleDeviceAllocatorTest, HostDeviceDataAccess) +{ + if (!allocator->isSupported()) + { + GTEST_SKIP() << "Host accessible memory is not supported on this system."; + } + + constexpr size_t numInts = 256; + constexpr size_t allocSize = numInts * sizeof(int); + + void* devPtr = allocator->allocate(allocSize); + ASSERT_NE(devPtr, nullptr); + + void* hostPtr = allocator->getHostPtr(devPtr); + ASSERT_NE(hostPtr, nullptr); + + // 1. Write from the host, then read and verify from the device. + int* hostIntPtr = static_cast(hostPtr); + for (size_t i = 0; i < numInts; ++i) + { + hostIntPtr[i] = i; + } + + // Use a CUDA kernel to verify the data on the device. 
+ bool deviceVerificationResult = false; + bool* d_result; + TLLM_CUDA_CHECK(cudaMalloc(&d_result, sizeof(bool))); + verifyDataOnDevice<<<1, 1>>>(static_cast(devPtr), numInts, d_result); + TLLM_CUDA_CHECK(cudaDeviceSynchronize()); + TLLM_CUDA_CHECK(cudaMemcpy(&deviceVerificationResult, d_result, sizeof(bool), cudaMemcpyDeviceToHost)); + TLLM_CUDA_CHECK(cudaFree(d_result)); + EXPECT_TRUE(deviceVerificationResult); + + // 2. Write from the device, then read and verify from the host. + writeDataOnDevice<<<1, 1>>>(static_cast(devPtr), numInts); + TLLM_CUDA_CHECK(cudaDeviceSynchronize()); + + for (size_t i = 0; i < numInts; ++i) + { + EXPECT_EQ(hostIntPtr[i], numInts - i); + } + + allocator->free(devPtr); +} + +// Test getting a host pointer for an offset device pointer. +TEST_F(HostAccessibleDeviceAllocatorTest, GetHostPtrForOffset) +{ + if (!allocator->isSupported()) + { + GTEST_SKIP() << "Host accessible memory is not supported on this system."; + } + + constexpr size_t allocSize = 1024; + void* baseDevPtr = allocator->allocate(allocSize); + ASSERT_NE(baseDevPtr, nullptr); + + void* baseHostPtr = allocator->getHostPtr(baseDevPtr); + ASSERT_NE(baseHostPtr, nullptr); + + // Check a pointer with an offset within the allocation. + ptrdiff_t offset = 128; + void* offsetDevPtr = static_cast(baseDevPtr) + offset; + void* offsetHostPtr = allocator->getHostPtr(offsetDevPtr); + + ASSERT_NE(offsetHostPtr, nullptr); + EXPECT_EQ(offsetHostPtr, static_cast(baseHostPtr) + offset); + + // Check a pointer to the last byte of the allocation. + offset = allocSize - 1; + offsetDevPtr = static_cast(baseDevPtr) + offset; + offsetHostPtr = allocator->getHostPtr(offsetDevPtr); + ASSERT_NE(offsetHostPtr, nullptr); + EXPECT_EQ(offsetHostPtr, static_cast(baseHostPtr) + offset); + + // Check a pointer just outside the allocation boundary. It should not be found. + void* outsideDevPtr = static_cast(baseDevPtr) + allocSize; + void* outsideHostPtr = allocator->getHostPtr(outsideDevPtr); + EXPECT_EQ(outsideHostPtr, nullptr); + + allocator->free(baseDevPtr); +} + +// Test multiple allocations and frees. +TEST_F(HostAccessibleDeviceAllocatorTest, MultipleAllocations) +{ + if (!allocator->isSupported()) + { + GTEST_SKIP() << "Host accessible memory is not supported on this system."; + } + + constexpr int numAllocs = 10; + constexpr size_t baseSize = 64; + std::vector devPtrs(numAllocs); + std::vector sizes(numAllocs); + + // Allocate multiple blocks of memory. + for (int i = 0; i < numAllocs; ++i) + { + sizes[i] = baseSize * (i + 1); + devPtrs[i] = allocator->allocate(sizes[i]); + ASSERT_NE(devPtrs[i], nullptr); + } + + // Verify each allocation by writing and reading a value. + for (int i = 0; i < numAllocs; ++i) + { + void* hostPtr = allocator->getHostPtr(devPtrs[i]); + ASSERT_NE(hostPtr, nullptr); + // Do a small write/read test. + static_cast(hostPtr)[0] = static_cast(i); + EXPECT_EQ(static_cast(hostPtr)[0], static_cast(i)); + } + + // Free all allocated blocks. + for (int i = 0; i < numAllocs; ++i) + { + allocator->free(devPtrs[i]); + } +} + +// Test that getHostPtr returns nullptr for a pointer that was not allocated +// by this allocator. +TEST_F(HostAccessibleDeviceAllocatorTest, GetHostPtrForUnallocated) +{ + if (!allocator->isSupported()) + { + GTEST_SKIP() << "Host accessible memory is not supported on this system."; + } + // Use an arbitrary pointer value. 
+ void* devPtr = reinterpret_cast(0xDEADBEEF); + EXPECT_EQ(allocator->getHostPtr(devPtr), nullptr); +} + +} // namespace tensorrt_llm::runtime::unit_tests diff --git a/tensorrt_llm/_torch/modules/fused_moe/moe_load_balancer.py b/tensorrt_llm/_torch/modules/fused_moe/moe_load_balancer.py index d6db1b30f6a..4dcd85113e5 100644 --- a/tensorrt_llm/_torch/modules/fused_moe/moe_load_balancer.py +++ b/tensorrt_llm/_torch/modules/fused_moe/moe_load_balancer.py @@ -293,7 +293,7 @@ def __init__( layer_id, expert_count, shared_mpi_comm) if self.updates_enabled else None self.register_weight_fns = [] - self.to_fix_weight_fns = [] + self.to_migrate_weight_fns = [] shared_rank = shared_mpi_comm.Get_rank() shared_size = shared_mpi_comm.Get_size() @@ -399,11 +399,11 @@ def set_initial_weight_assignments(self, self.single_layer_load_balancer_impl.set_initial_weight_assignments( initial_weight_assignments) - def add_to_fix_weight_fn(self, - fn: Callable, - args: Tuple, - kwargs: Dict = {}): - self.to_fix_weight_fns.append((fn, args, kwargs)) + def add_to_migrate_weight_fn(self, + fn: Callable, + args: Tuple, + kwargs: Dict = {}): + self.to_migrate_weight_fns.append((fn, args, kwargs)) def add_register_weight_fn(self, fn: Callable, @@ -415,17 +415,18 @@ def add_register_weight_fn(self, """ self.register_weight_fns.append((fn, args, kwargs)) - def fix_tensor(self, wt: torch.Tensor): - torch.ops.trtllm.migrate_to_managed(wt) + def make_tensor_host_accessible(self, wt: torch.Tensor): + torch.ops.trtllm.migrate_to_host_accessible(wt) + torch.cuda.empty_cache() def register_weight_slots_after_to_cuda(self): """ Register weights after model has been moved to cuda, should be invoked after model.to("cuda") and before finalize_model. """ - for fn, args, kwargs in self.to_fix_weight_fns: + for fn, args, kwargs in self.to_migrate_weight_fns: fn(*args, **kwargs) - self.to_fix_weight_fns = [] + self.to_migrate_weight_fns = [] for fn, args, kwargs in self.register_weight_fns: fn(*args, **kwargs) @@ -665,6 +666,7 @@ def __init__(self, layer_updates_per_iter: The number of layers to update per iteration shared_memory_base_name: Shared memory base name """ + self.is_shutdown = True self.ep_rank = ep_rank self.ep_size = ep_size self.layer_updates_per_iter = layer_updates_per_iter @@ -889,8 +891,9 @@ def maybe_create_moe_load_balancer( model_config.moe_load_balancer.setup(ep_rank=ep_rank, ep_size=ep_size) if model_config.moe_load_balancer.layer_updates_per_iter > 0: # TODO: remove this when supported. - cpu_arch = platform.machine().lower() - assert cpu_arch == 'aarch64', "online load balancer only support aarch64, e.g. GB200 now, x86 coming soon." + # cpu_arch = platform.machine().lower() + # assert cpu_arch == 'aarch64', "online load balancer only support aarch64, e.g. GB200 now, x86 coming soon." 
+ pass moe_load_balancer = MoeLoadBalancer( ep_rank=ep_rank, diff --git a/tensorrt_llm/_torch/modules/fused_moe/quantization.py b/tensorrt_llm/_torch/modules/fused_moe/quantization.py index b821b693b61..e25b99cb788 100644 --- a/tensorrt_llm/_torch/modules/fused_moe/quantization.py +++ b/tensorrt_llm/_torch/modules/fused_moe/quantization.py @@ -479,43 +479,71 @@ def get_quant_scales(self, module: torch.nn.Module, slot_start, 0, slot_start, slot_end - slot_start), ) - def load_quant_scales(self, module: torch.nn.Module, weights: Dict): - device = torch.device("cuda") - - all_w3_scales = [ - load_weight_shard(weights[f"{expert_id}.w3.weight_scale_inv"], - module.tp_size, - module.tp_rank, - TensorParallelMode.COLUMN, - device=device) - for expert_id in module.initial_local_expert_ids - ] - - all_w1_scales = [ - load_weight_shard(weights[f"{expert_id}.w1.weight_scale_inv"], - module.tp_size, - module.tp_rank, - TensorParallelMode.COLUMN, - device=device) - for expert_id in module.initial_local_expert_ids - ] - - w3_w1_scales = torch.cat( - [torch.stack(all_w3_scales), - torch.stack(all_w1_scales)], dim=-2) - module.w3_w1_weight_scaling_factor.data.copy_(w3_w1_scales) - - all_w2_scales = [ - load_weight_shard(weights[f"{expert_id}.w2.weight_scale_inv"], - module.tp_size, - module.tp_rank, - TensorParallelMode.ROW, - device=device) - for expert_id in module.initial_local_expert_ids - ] + def load_expert_all_weight_scale_fp8_block_scale( + self, module: torch.nn.Module, weights: Dict, + load_expert_ids: List[int], dst_w3_w1_weight_scale: torch.Tensor, + dst_w2_weight_scale: torch.Tensor, device): + for local_slot_id, expert_id in enumerate(load_expert_ids): + w3_scale = load_weight_shard( + weights[f"{expert_id}.w3.weight_scale_inv"], + module.tp_size, + module.tp_rank, + TensorParallelMode.COLUMN, + device=device) + dst_w3_w1_weight_scale[local_slot_id][:dst_w3_w1_weight_scale. + shape[-2] // + 2].copy_(w3_scale) + w1_scale = load_weight_shard( + weights[f"{expert_id}.w1.weight_scale_inv"], + module.tp_size, + module.tp_rank, + TensorParallelMode.COLUMN, + device=device) + dst_w3_w1_weight_scale[local_slot_id][dst_w3_w1_weight_scale. 
+ shape[-2] // + 2:].copy_(w1_scale) + w2_scale = load_weight_shard( + weights[f"{expert_id}.w2.weight_scale_inv"], + module.tp_size, + module.tp_rank, + TensorParallelMode.ROW, + device=device) + dst_w2_weight_scale[local_slot_id].copy_(w2_scale) - w2_scales = torch.stack(all_w2_scales) - module.w2_weight_scaling_factor.data.copy_(w2_scales) + def load_quant_scales(self, module: torch.nn.Module, weights: Dict): + self.load_expert_all_weight_scale_fp8_block_scale( + module, + weights, + module.initial_local_expert_ids, + module.w3_w1_weight_scaling_factor.data, + module.w2_weight_scaling_factor.data, + device=torch.device("cuda")) + if self.need_load_shared_weights(module): + local_shared_load_expert_ids = module.layer_load_balancer.get_load_expert_ids( + ) + local_shared_w3_w1_scale_tensors = torch.empty( + (len(local_shared_load_expert_ids), ) + + module.w3_w1_weight_scaling_factor.data.shape[1:], + dtype=module.w3_w1_weight_scaling_factor.data.dtype, + device='cpu') + local_shared_w2_scale_tensors = torch.empty( + (len(local_shared_load_expert_ids), ) + + module.w2_weight_scaling_factor.data.shape[1:], + dtype=module.w2_weight_scaling_factor.data.dtype, + device='cpu') + self.load_expert_all_weight_scale_fp8_block_scale( + module, + weights, + local_shared_load_expert_ids, + local_shared_w3_w1_scale_tensors, + local_shared_w2_scale_tensors, + device=torch.device("cpu")) + module.register_all_parameter_slot_and_to_fix_weight_fns({ + 'w3_w1_weight_scaling_factor': + local_shared_w3_w1_scale_tensors, + 'w2_weight_scaling_factor': + local_shared_w2_scale_tensors, + }) class WInt4AFP8FusedMoEMethod(FusedMoEMethodBase): From 5ac7c47c95843dd06d15c5341a0ff806f18a733a Mon Sep 17 00:00:00 2001 From: Dongxu Yang <78518666+dongxuy04@users.noreply.github.com> Date: Sat, 14 Jun 2025 22:35:00 +0800 Subject: [PATCH 04/13] rename hostAccessibleDeviceAllocatorTest Signed-off-by: Dongxu Yang <78518666+dongxuy04@users.noreply.github.com> --- cpp/tests/unit_tests/runtime/CMakeLists.txt | 2 +- ...ceAllocatorTest.cpp => hostAccessibleDeviceAllocatorTest.cu} | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) rename cpp/tests/unit_tests/runtime/{hostAccessibleDeviceAllocatorTest.cpp => hostAccessibleDeviceAllocatorTest.cu} (100%) diff --git a/cpp/tests/unit_tests/runtime/CMakeLists.txt b/cpp/tests/unit_tests/runtime/CMakeLists.txt index 898ce395a39..25ec2ab1b5e 100644 --- a/cpp/tests/unit_tests/runtime/CMakeLists.txt +++ b/cpp/tests/unit_tests/runtime/CMakeLists.txt @@ -14,7 +14,7 @@ add_gtest(cudaMemPoolTest cudaMemPoolTest.cpp) add_gtest(decodingLayerWorkspaceTest decodingLayerWorkspaceTest.cpp) add_gtest(gdrcopyTest gdrcopyTest.cpp) add_gtest(hostAccessibleDeviceAllocatorTest - hostAccessibleDeviceAllocatorTest.cpp) + hostAccessibleDeviceAllocatorTest.cu) add_gtest(iBufferTest iBufferTest.cpp) add_gtest(iTensorTest iTensorTest.cpp) add_gtest(loraCacheTest loraCacheTest.cpp) diff --git a/cpp/tests/unit_tests/runtime/hostAccessibleDeviceAllocatorTest.cpp b/cpp/tests/unit_tests/runtime/hostAccessibleDeviceAllocatorTest.cu similarity index 100% rename from cpp/tests/unit_tests/runtime/hostAccessibleDeviceAllocatorTest.cpp rename to cpp/tests/unit_tests/runtime/hostAccessibleDeviceAllocatorTest.cu index c8638fde34d..d35f8a21c34 100644 --- a/cpp/tests/unit_tests/runtime/hostAccessibleDeviceAllocatorTest.cpp +++ b/cpp/tests/unit_tests/runtime/hostAccessibleDeviceAllocatorTest.cu @@ -14,8 +14,8 @@ * limitations under the License. 
*/ -#include "tensorrt_llm/runtime/moeLoadBalancer/hostAccessibleDeviceAllocator.h" #include "tensorrt_llm/common/cudaUtils.h" +#include "tensorrt_llm/runtime/moeLoadBalancer/hostAccessibleDeviceAllocator.h" #include #include #include From 7de16d3f7a32110310dd56772a371c8703983405 Mon Sep 17 00:00:00 2001 From: Dongxu Yang <78518666+dongxuy04@users.noreply.github.com> Date: Mon, 16 Jun 2025 13:49:45 +0800 Subject: [PATCH 05/13] remove unused import to fix format check Signed-off-by: Dongxu Yang <78518666+dongxuy04@users.noreply.github.com> --- tensorrt_llm/_torch/modules/fused_moe/moe_load_balancer.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tensorrt_llm/_torch/modules/fused_moe/moe_load_balancer.py b/tensorrt_llm/_torch/modules/fused_moe/moe_load_balancer.py index 4dcd85113e5..2533118391b 100644 --- a/tensorrt_llm/_torch/modules/fused_moe/moe_load_balancer.py +++ b/tensorrt_llm/_torch/modules/fused_moe/moe_load_balancer.py @@ -1,4 +1,3 @@ -import platform import threading from contextlib import nullcontext from multiprocessing import resource_tracker, shared_memory From cc9f551a4860f8eb38b59a36cc646fbc53f23e47 Mon Sep 17 00:00:00 2001 From: Dongxu Yang <78518666+dongxuy04@users.noreply.github.com> Date: Wed, 18 Jun 2025 19:18:29 +0800 Subject: [PATCH 06/13] fix mWorkerThreadStopped flag Signed-off-by: Dongxu Yang <78518666+dongxuy04@users.noreply.github.com> --- cpp/tensorrt_llm/runtime/moeLoadBalancer/moeLoadBalancer.cpp | 5 +---- cpp/tensorrt_llm/runtime/moeLoadBalancer/moeLoadBalancer.h | 2 +- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/cpp/tensorrt_llm/runtime/moeLoadBalancer/moeLoadBalancer.cpp b/cpp/tensorrt_llm/runtime/moeLoadBalancer/moeLoadBalancer.cpp index 67c1988a0de..0298f3975e5 100644 --- a/cpp/tensorrt_llm/runtime/moeLoadBalancer/moeLoadBalancer.cpp +++ b/cpp/tensorrt_llm/runtime/moeLoadBalancer/moeLoadBalancer.cpp @@ -921,14 +921,11 @@ void MoeLoadBalancer::finalizeModel() } if (mLayerUpdatesPerIter > 0) { + mWorkerThreadStopped = false; mMultiThreadWorker->start(); generateUpdatePlan(); startThreads(); } - else - { - mWorkerThreadStopped = true; - } mModelFinalized = true; } diff --git a/cpp/tensorrt_llm/runtime/moeLoadBalancer/moeLoadBalancer.h b/cpp/tensorrt_llm/runtime/moeLoadBalancer/moeLoadBalancer.h index 18960ec347e..b0faa0fa977 100644 --- a/cpp/tensorrt_llm/runtime/moeLoadBalancer/moeLoadBalancer.h +++ b/cpp/tensorrt_llm/runtime/moeLoadBalancer/moeLoadBalancer.h @@ -293,7 +293,7 @@ class MoeLoadBalancer void workerThread(); std::mutex mWorkerThreadMutex; std::condition_variable mWorkerThreadCondition; - bool mWorkerThreadStopped = false; + bool mWorkerThreadStopped = true; int64_t mWarmUpUntilIter = -1; // we use a separate thread to compute and update weights to avoid possible blocking for next layer due to slow From e9d20a59ef00c8e06ac612b8fc1451a912bdf29f Mon Sep 17 00:00:00 2001 From: Dongxu Yang <78518666+dongxuy04@users.noreply.github.com> Date: Wed, 18 Jun 2025 19:21:06 +0800 Subject: [PATCH 07/13] fix gdrwarp license Signed-off-by: Dongxu Yang <78518666+dongxuy04@users.noreply.github.com> --- .../runtime/moeLoadBalancer/gdrwrap.cpp | 18 ++++++++++++++---- .../runtime/moeLoadBalancer/gdrwrap.h | 16 ++++++++++++++++ 2 files changed, 30 insertions(+), 4 deletions(-) diff --git a/cpp/tensorrt_llm/runtime/moeLoadBalancer/gdrwrap.cpp b/cpp/tensorrt_llm/runtime/moeLoadBalancer/gdrwrap.cpp index c648afeda89..71f951ef786 100644 --- a/cpp/tensorrt_llm/runtime/moeLoadBalancer/gdrwrap.cpp +++ 
b/cpp/tensorrt_llm/runtime/moeLoadBalancer/gdrwrap.cpp @@ -1,8 +1,18 @@ -/************************************************************************* - * Copyright (c) 2020-2021, NVIDIA CORPORATION. All rights reserved. +/* + * Copyright (c) 2022-2024, NVIDIA CORPORATION. All rights reserved. * - * See LICENSE.txt for license information - ************************************************************************/ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ #ifndef _WIN32 diff --git a/cpp/tensorrt_llm/runtime/moeLoadBalancer/gdrwrap.h b/cpp/tensorrt_llm/runtime/moeLoadBalancer/gdrwrap.h index 265d089bd9f..6c683aa7bd9 100644 --- a/cpp/tensorrt_llm/runtime/moeLoadBalancer/gdrwrap.h +++ b/cpp/tensorrt_llm/runtime/moeLoadBalancer/gdrwrap.h @@ -1,3 +1,19 @@ +/* + * Copyright (c) 2022-2024, NVIDIA CORPORATION. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + #pragma once #ifdef _WIN32 From be83cde4369555bdad78e508fd51e507ecb6184c Mon Sep 17 00:00:00 2001 From: Dongxu Yang <78518666+dongxuy04@users.noreply.github.com> Date: Fri, 20 Jun 2025 14:32:25 +0800 Subject: [PATCH 08/13] fix rebase issues Signed-off-by: Dongxu Yang <78518666+dongxuy04@users.noreply.github.com> --- tensorrt_llm/_torch/modules/fused_moe/fused_moe_wide_ep.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tensorrt_llm/_torch/modules/fused_moe/fused_moe_wide_ep.py b/tensorrt_llm/_torch/modules/fused_moe/fused_moe_wide_ep.py index 05bf4edd771..b322ba13c4e 100755 --- a/tensorrt_llm/_torch/modules/fused_moe/fused_moe_wide_ep.py +++ b/tensorrt_llm/_torch/modules/fused_moe/fused_moe_wide_ep.py @@ -440,6 +440,8 @@ def forward_chunk( x_row = x.shape[0] x_col = x.shape[1] sf_swizzle = True + x_row = x.shape[0] + x_col = x.shape[1] if self.has_any_quant: if self.has_fp8_qdq: x, _ = torch.ops.tensorrt_llm.static_quantize_e4m3_per_tensor( @@ -895,7 +897,7 @@ def register_to_fix_weight_fn(self, weight_name: str): ), f'weight {weight_name} should be a is_contiguous, shape={weight_tensor.shape}, strides={weight_tensor.is_contiguous()}' assert weight_tensor.numel() * weight_tensor.element_size() == weight_tensor.untyped_storage().size(),\ f'weight {weight_name} shape={weight_tensor.shape} storage_size = {weight_tensor.untyped_storage().size()}, numel={weight_tensor.numel()}, eltsize={weight_tensor.element_size()}, dtype={weight_tensor.dtype}' - self.layer_load_balancer.fix_tensor(weight_tensor) + self.layer_load_balancer.make_tensor_host_accessible(weight_tensor) param.data = weight_tensor def register_all_parameter_slot_and_to_fix_weight_fns( @@ -912,7 +914,7 @@ def register_all_parameter_slot_and_to_fix_weight_fns( self.register_parameter_weight_slot_fn, (weight_name, local_slot_id)) for weight_name in weight_and_tensor_dict: - self.layer_load_balancer.add_to_fix_weight_fn( + self.layer_load_balancer.add_to_migrate_weight_fn( self.register_to_fix_weight_fn, (weight_name, )) local_shared_load_expert_ids = self.layer_load_balancer.get_load_expert_ids( From 370a25a12c90c66c4127ef0f5bd58d736c6149d2 Mon Sep 17 00:00:00 2001 From: Dongxu Yang <78518666+dongxuy04@users.noreply.github.com> Date: Fri, 20 Jun 2025 15:34:51 +0800 Subject: [PATCH 09/13] add support check and update tests Signed-off-by: Dongxu Yang <78518666+dongxuy04@users.noreply.github.com> --- .../pybind/runtime/moeBindings.cpp | 4 ++ .../hostAccessibleDeviceAllocator.cpp | 45 ++++++++++++++++++- .../hostAccessibleDeviceAllocator.h | 20 ++++----- .../_torch/modules/test_moe_load_balancer.py | 4 ++ tests/unittest/bindings/test_bindings_moe.py | 3 +- 5 files changed, 62 insertions(+), 14 deletions(-) diff --git a/cpp/tensorrt_llm/pybind/runtime/moeBindings.cpp b/cpp/tensorrt_llm/pybind/runtime/moeBindings.cpp index 593c706b747..d6ee3b944ef 100644 --- a/cpp/tensorrt_llm/pybind/runtime/moeBindings.cpp +++ b/cpp/tensorrt_llm/pybind/runtime/moeBindings.cpp @@ -16,6 +16,7 @@ */ #include "moeBindings.h" +#include "tensorrt_llm/runtime/moeLoadBalancer/hostAccessibleDeviceAllocator.h" #include "tensorrt_llm/runtime/moeLoadBalancer/moeLoadBalancer.h" #include #include @@ -111,6 +112,9 @@ void initMoeBindings(pybind11::module_& m) .def("end_iter", &tr::MoeLoadBalancer::endIter, py::arg("iter_id"), "End the iteration with the given ID") .def("shutdown", &tr::MoeLoadBalancer::shutdown, "Shutdown the load balancer and clean up resources"); + m.def("is_host_accessible_device_memory_supported", 
&tr::HostAccessibleDeviceAllocator::isSupported, + "If current system support host accessible device memory"); + // Bind do_replication function for testing m.def("do_replication", &pyDoReplication, py::arg("meta_info"), py::arg("expert_load_factor"), py::arg("cpu_placement"), "Do replication"); diff --git a/cpp/tensorrt_llm/runtime/moeLoadBalancer/hostAccessibleDeviceAllocator.cpp b/cpp/tensorrt_llm/runtime/moeLoadBalancer/hostAccessibleDeviceAllocator.cpp index 979a210afe9..18df72e0099 100644 --- a/cpp/tensorrt_llm/runtime/moeLoadBalancer/hostAccessibleDeviceAllocator.cpp +++ b/cpp/tensorrt_llm/runtime/moeLoadBalancer/hostAccessibleDeviceAllocator.cpp @@ -15,6 +15,7 @@ */ #include +#include #include #include "gdrwrap.h" @@ -27,9 +28,35 @@ namespace tensorrt_llm::runtime { +bool HostAccessibleDeviceAllocator::mAllowManagedFallback = false; + +bool HostAccessibleDeviceAllocator::isSupported() +{ + if (TopologyDetector::getInstance().getCurrentGpuMemoryNumaId() >= 0) + { + // we are on systems that GPU memory is also a NUMA node. + return true; + } + if (!tensorrt_llm::runtime::gdrcopy::isInitialized() && !tensorrt_llm::runtime::gdrcopy::initialize()) + { + // system don't support GDRCopy. + return mAllowManagedFallback; + } + return true; +} + void HostAccessibleDeviceAllocator::init() { TLLM_CHECK(mIsInited == false); + + if (getenv("TLLM_HOST_ACCESSIBLE_ALLOW_MANAGED_FALLBACK") != nullptr) + { + if (std::string(getenv("TLLM_HOST_ACCESSIBLE_ALLOW_MANAGED_FALLBACK")) == "1") + { + mAllowManagedFallback = true; + } + } + TLLM_CUDA_CHECK(cudaGetDevice(&mDevId)); mGpuMemNumaId = TopologyDetector::getInstance().getCurrentGpuMemoryNumaId(); if (mGpuMemNumaId < 0) @@ -148,10 +175,18 @@ void* HostAccessibleDeviceAllocator::allocate(size_t memorySize) devPtr = TopologyDetector::getInstance().allocateCurrentGpuNumaMemory(memorySize); hostPtr = devPtr; } - else + else if (mGdrHandle) { gdrcopy::gdrCudaMalloc(&hostPtr, &devPtr, memorySize, &memDesc, mGdrHandle); } + else + { + TLLM_CHECK_WITH_INFO( + mAllowManagedFallback, "HostAccessibleDeviceAllocator is not supported on the current system."); + TLLM_CUDA_CHECK(cudaMallocManaged(&devPtr, memorySize)); + TLLM_CUDA_CHECK(cudaMemAdvise(devPtr, memorySize, cudaMemAdviseSetPreferredLocation, currentDevId)); + hostPtr = devPtr; + } recordAllocation(devPtr, memorySize, hostPtr, memDesc); return devPtr; } @@ -167,10 +202,16 @@ void HostAccessibleDeviceAllocator::free(void* ptr) { gdrcopy::gdrCudaFree(allocInfo.memDesc, mGdrHandle); } - else + else if (mGpuMemNumaId >= 0) { TopologyDetector::getInstance().freeCurrentGpuNumaMemory(it->first, allocInfo.size); } + else + { + TLLM_CHECK_WITH_INFO( + mAllowManagedFallback, "HostAccessibleDeviceAllocator is not supported on the current system."); + TLLM_CUDA_CHECK(cudaFree(ptr)); + } mAllocations.erase(it); } else diff --git a/cpp/tensorrt_llm/runtime/moeLoadBalancer/hostAccessibleDeviceAllocator.h b/cpp/tensorrt_llm/runtime/moeLoadBalancer/hostAccessibleDeviceAllocator.h index 679df172b26..a4843acb22a 100644 --- a/cpp/tensorrt_llm/runtime/moeLoadBalancer/hostAccessibleDeviceAllocator.h +++ b/cpp/tensorrt_llm/runtime/moeLoadBalancer/hostAccessibleDeviceAllocator.h @@ -47,10 +47,17 @@ class HostAccessibleDeviceAllocator */ static HostAccessibleDeviceAllocator& getInstance(); + /** + * @brief check if host accessible device is supported for current GPU. + * @return true if supported else false. + */ + static bool isSupported(); + /** * @brief Allocate host accessible memory on the device. 
* * @param memorySize The size of the memory to allocate. + * @param allowManagedFallback Whether allow fall back to managed memory if not supported. * @return void* Pointer to the allocated memory. */ void* allocate(size_t memorySize); @@ -70,17 +77,6 @@ class HostAccessibleDeviceAllocator */ void* getHostPtr(void* devPtr); - /** - * @brief Check if host accessible memory is supported on the current system. - * @note This function should be called after IncRefCount() by some MoeLoadBalancer. - * - * @return bool True if host accessible memory is supported, false otherwise. - */ - bool isSupported() const - { - return mGpuMemNumaId >= 0 || mGdrHandle != nullptr; - } - private: struct AllocationInfo { @@ -149,6 +145,8 @@ class HostAccessibleDeviceAllocator std::mutex mAllocationsMutex; std::map mAllocations; + + static bool mAllowManagedFallback; }; } // namespace tensorrt_llm::runtime diff --git a/tests/unittest/_torch/modules/test_moe_load_balancer.py b/tests/unittest/_torch/modules/test_moe_load_balancer.py index a49c1a4eaea..caad4764844 100644 --- a/tests/unittest/_torch/modules/test_moe_load_balancer.py +++ b/tests/unittest/_torch/modules/test_moe_load_balancer.py @@ -1,3 +1,4 @@ +import os import unittest from unittest.mock import MagicMock, patch @@ -14,6 +15,9 @@ class TestMoeLoadBalancer(unittest.TestCase): Test cases for the MoeLoadBalancer class. """ + def setUp(self): + os.environ["TLLM_HOST_ACCESSIBLE_ALLOW_MANAGED_FALLBACK"] = "1" + @patch('tensorrt_llm.bindings.internal.runtime.MoeLoadBalancer') def test_moe_load_balancer_init(self, mock_load_balancer_impl): """Test initialization of MoeLoadBalancer.""" diff --git a/tests/unittest/bindings/test_bindings_moe.py b/tests/unittest/bindings/test_bindings_moe.py index 58b7482e302..81c391a0bee 100644 --- a/tests/unittest/bindings/test_bindings_moe.py +++ b/tests/unittest/bindings/test_bindings_moe.py @@ -13,7 +13,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import os import unittest import numpy as np @@ -26,6 +26,7 @@ class TestMoePythonBindings(unittest.TestCase): def setUp(self): + os.environ["TLLM_HOST_ACCESSIBLE_ALLOW_MANAGED_FALLBACK"] = "1" torch.cuda.set_device(0) # Common test parameters self.expert_count = 8 From 4aa85d2e860404dc28d547ade0f406901cb81455 Mon Sep 17 00:00:00 2001 From: Dongxu Yang <78518666+dongxuy04@users.noreply.github.com> Date: Fri, 20 Jun 2025 17:17:42 +0800 Subject: [PATCH 10/13] using gddrcopy memcpy Signed-off-by: Dongxu Yang <78518666+dongxuy04@users.noreply.github.com> --- .../hostAccessibleDeviceAllocator.cpp | 81 +++++++++++++++---- .../hostAccessibleDeviceAllocator.h | 30 ++++++- .../moeLoadBalancer/moeLoadBalancer.cpp | 15 ++-- 3 files changed, 101 insertions(+), 25 deletions(-) diff --git a/cpp/tensorrt_llm/runtime/moeLoadBalancer/hostAccessibleDeviceAllocator.cpp b/cpp/tensorrt_llm/runtime/moeLoadBalancer/hostAccessibleDeviceAllocator.cpp index 18df72e0099..6d6e8ab21fc 100644 --- a/cpp/tensorrt_llm/runtime/moeLoadBalancer/hostAccessibleDeviceAllocator.cpp +++ b/cpp/tensorrt_llm/runtime/moeLoadBalancer/hostAccessibleDeviceAllocator.cpp @@ -14,6 +14,8 @@ * limitations under the License. 
*/ +#include + #include #include #include @@ -122,28 +124,55 @@ void HostAccessibleDeviceAllocator::DecRefCount() void HostAccessibleDeviceAllocator::recordAllocation( void* devPtr, size_t memorySize, void* hostPtr, gdrcopy::GdrMemDesc* memDesc) { - std::lock_guard lock(mAllocationsMutex); - mAllocations[devPtr] = {memorySize, hostPtr, memDesc}; + std::unique_lock lock(mAllocationsMutex); + mDeviceAllocations[devPtr] = {memorySize, hostPtr, devPtr, memDesc}; + mHostAllocations[hostPtr] = {memorySize, hostPtr, devPtr, memDesc}; } -void* HostAccessibleDeviceAllocator::getHostPtr(void* devPtr) +HostAccessibleDeviceAllocator::AllocationInfo HostAccessibleDeviceAllocator::getAllocationInfoFromHostPtr( + void const* hostPtr) { - std::lock_guard lock(mAllocationsMutex); - if (mAllocations.empty()) + std::shared_lock lock(mAllocationsMutex); + if (mHostAllocations.empty()) { - return nullptr; + return HostAccessibleDeviceAllocator::AllocationInfo{0, nullptr, nullptr, nullptr}; } - - auto it = mAllocations.upper_bound(devPtr); - if (it == mAllocations.begin()) + auto it = mHostAllocations.upper_bound(hostPtr); + if (it == mHostAllocations.begin()) { - return nullptr; + return HostAccessibleDeviceAllocator::AllocationInfo{0, nullptr, nullptr, nullptr}; + ; } + --it; + return it->second; +} +HostAccessibleDeviceAllocator::AllocationInfo HostAccessibleDeviceAllocator::getAllocationInfoFromDevPtr( + void const* devPtr) +{ + std::shared_lock lock(mAllocationsMutex); + if (mDeviceAllocations.empty()) + { + return HostAccessibleDeviceAllocator::AllocationInfo{0, nullptr, nullptr, nullptr}; + } + auto it = mDeviceAllocations.upper_bound(devPtr); + if (it == mDeviceAllocations.begin()) + { + return HostAccessibleDeviceAllocator::AllocationInfo{0, nullptr, nullptr, nullptr}; + ; + } --it; + return it->second; +} - void* recordedDevPtr = it->first; - auto const& allocationInfo = it->second; +void* HostAccessibleDeviceAllocator::getHostPtr(void* devPtr) +{ + auto allocationInfo = getAllocationInfoFromDevPtr(devPtr); + if (allocationInfo.devPtr == nullptr) + { + return nullptr; + } + void* recordedDevPtr = allocationInfo.devPtr; size_t recordedSize = allocationInfo.size; void* recordedHostPtr = allocationInfo.hostPtr; @@ -159,6 +188,21 @@ void* HostAccessibleDeviceAllocator::getHostPtr(void* devPtr) return nullptr; } +void HostAccessibleDeviceAllocator::memcpyToDevice(void* dst, void const* src, size_t size) +{ + if (mGdrHandle != nullptr) + { + auto allocationInfo = getAllocationInfoFromHostPtr(dst); + TLLM_CHECK(allocationInfo.hostPtr != nullptr); + TLLM_CHECK(allocationInfo.memDesc != nullptr); + tensorrt_llm::runtime::gdrcopy::copy_to_mapping(allocationInfo.memDesc->gdrMh, dst, src, size); + } + else + { + memcpy(dst, src, size); + } +} + void* HostAccessibleDeviceAllocator::allocate(size_t memorySize) { int currentDevId = -1; @@ -193,9 +237,9 @@ void* HostAccessibleDeviceAllocator::allocate(size_t memorySize) void HostAccessibleDeviceAllocator::free(void* ptr) { - std::lock_guard lock(mAllocationsMutex); - auto it = mAllocations.find(ptr); - if (it != mAllocations.end()) + std::unique_lock lock(mAllocationsMutex); + auto it = mDeviceAllocations.find(ptr); + if (it != mDeviceAllocations.end()) { auto const& allocInfo = it->second; if (allocInfo.memDesc) @@ -204,7 +248,7 @@ void HostAccessibleDeviceAllocator::free(void* ptr) } else if (mGpuMemNumaId >= 0) { - TopologyDetector::getInstance().freeCurrentGpuNumaMemory(it->first, allocInfo.size); + 
TopologyDetector::getInstance().freeCurrentGpuNumaMemory(const_cast(it->first), allocInfo.size); } else { @@ -212,7 +256,10 @@ void HostAccessibleDeviceAllocator::free(void* ptr) mAllowManagedFallback, "HostAccessibleDeviceAllocator is not supported on the current system."); TLLM_CUDA_CHECK(cudaFree(ptr)); } - mAllocations.erase(it); + void* hostPtr = it->second.hostPtr; + TLLM_CHECK_WITH_INFO(mHostAllocations.count(hostPtr) == 1, "host pointer not recorded."); + mDeviceAllocations.erase(it); + mHostAllocations.erase(hostPtr); } else { diff --git a/cpp/tensorrt_llm/runtime/moeLoadBalancer/hostAccessibleDeviceAllocator.h b/cpp/tensorrt_llm/runtime/moeLoadBalancer/hostAccessibleDeviceAllocator.h index a4843acb22a..0cff7de9568 100644 --- a/cpp/tensorrt_llm/runtime/moeLoadBalancer/hostAccessibleDeviceAllocator.h +++ b/cpp/tensorrt_llm/runtime/moeLoadBalancer/hostAccessibleDeviceAllocator.h @@ -18,6 +18,7 @@ #include #include +#include #include #include "gdrwrap.h" @@ -77,11 +78,21 @@ class HostAccessibleDeviceAllocator */ void* getHostPtr(void* devPtr); + /** + * @brief memcpyToDevice, use memcpy or GDRCopy + * + * @param dst : the dst pointer, should be host accessible + * @param src : the src pointer, should be on host + * @param size : copy size + */ + void memcpyToDevice(void* dst, void const* src, size_t size); + private: struct AllocationInfo { size_t size; void* hostPtr; + void* devPtr; gdrcopy::GdrMemDesc* memDesc; }; @@ -132,6 +143,20 @@ class HostAccessibleDeviceAllocator */ void recordAllocation(void* devPtr, size_t memorySize, void* hostPtr, gdrcopy::GdrMemDesc* memDesc = nullptr); + /** + * @brief Get Allocation information from host pointer + * + * @param The host accessible pointer + */ + AllocationInfo getAllocationInfoFromHostPtr(void const* hostPtr); + + /** + * @brief Get Allocation information from device pointer + * + * @param The device accessible pointer + */ + AllocationInfo getAllocationInfoFromDevPtr(void const* devPtr); + // if GPU memory has NUMA id, then CPU can direct access that. We should use this. 
int mGpuMemNumaId = -1; // if Not, we should use GDRCopy @@ -143,8 +168,9 @@ class HostAccessibleDeviceAllocator std::mutex mRefMutex; int mLoadBalancerCount = 0; - std::mutex mAllocationsMutex; - std::map mAllocations; + std::shared_mutex mAllocationsMutex; + std::map mDeviceAllocations; + std::map mHostAllocations; static bool mAllowManagedFallback; }; diff --git a/cpp/tensorrt_llm/runtime/moeLoadBalancer/moeLoadBalancer.cpp b/cpp/tensorrt_llm/runtime/moeLoadBalancer/moeLoadBalancer.cpp index 0298f3975e5..8685ba08d15 100644 --- a/cpp/tensorrt_llm/runtime/moeLoadBalancer/moeLoadBalancer.cpp +++ b/cpp/tensorrt_llm/runtime/moeLoadBalancer/moeLoadBalancer.cpp @@ -543,11 +543,12 @@ void SingleLayerMoeLoadBalancer::copyPlacementInfoToGpu() void SingleLayerMoeLoadBalancer::copyPlacementInfoToGpuByCpu() { - memcpy(mGpuPlacementHostAccess.expertReplicaCount, mCpuPlacementInfo.placementInfoForGPU.expertReplicaCount, - sizeof(int) * mMetaInfo.expertCount); - memcpy(mGpuPlacementHostAccess.expertReplicaStartOffset, + HostAccessibleDeviceAllocator::getInstance().memcpyToDevice(mGpuPlacementHostAccess.expertReplicaCount, + mCpuPlacementInfo.placementInfoForGPU.expertReplicaCount, sizeof(int) * mMetaInfo.expertCount); + HostAccessibleDeviceAllocator::getInstance().memcpyToDevice(mGpuPlacementHostAccess.expertReplicaStartOffset, mCpuPlacementInfo.placementInfoForGPU.expertReplicaStartOffset, sizeof(int) * mMetaInfo.expertCount); - memcpy(mGpuPlacementHostAccess.globalSlotIds, mCpuPlacementInfo.placementInfoForGPU.globalSlotIds, + HostAccessibleDeviceAllocator::getInstance().memcpyToDevice(mGpuPlacementHostAccess.globalSlotIds, + mCpuPlacementInfo.placementInfoForGPU.globalSlotIds, sizeof(int) * mMetaInfo.epSize * mMetaInfo.slotCountPerRank); mCpuPlacementInfo.rankExpertIds.swap(mCpuPlacementInfo.oldRankExpertIds); for (int i = 0; i < mMetaInfo.epSize; ++i) @@ -763,14 +764,16 @@ void HostMemoryMoeWeightUpdater::copyWeightsCpu(MoeWeight const& src, MoeWeight size_t threadCopyCount = fullCopyCount / size; for (size_t i = rank * threadCopyCount; i < (rank + 1) * threadCopyCount; i++) { - memcpy(dstPtr + i * dstPitch, srcPtr + i * srcPitch, singleCopySize); + HostAccessibleDeviceAllocator::getInstance().memcpyToDevice( + dstPtr + i * dstPitch, srcPtr + i * srcPitch, singleCopySize); } size_t threadStartOffset = rank * singleCopySize / size; size_t threadEndOffset = (rank + 1) * singleCopySize / size; size_t threadCopySize = threadEndOffset - threadStartOffset; for (size_t i = fullCopyCount; i < copyCount && threadCopySize > 0; i++) { - memcpy(dstPtr + i * dstPitch + threadStartOffset, srcPtr + i * srcPitch + threadStartOffset, threadCopySize); + HostAccessibleDeviceAllocator::getInstance().memcpyToDevice( + dstPtr + i * dstPitch + threadStartOffset, srcPtr + i * srcPitch + threadStartOffset, threadCopySize); } } From 0d919a4f8ddc94539e9f1c850898e9b3b5d0ca07 Mon Sep 17 00:00:00 2001 From: Dongxu Yang <78518666+dongxuy04@users.noreply.github.com> Date: Fri, 20 Jun 2025 23:44:42 +0800 Subject: [PATCH 11/13] fix unittest Signed-off-by: Dongxu Yang <78518666+dongxuy04@users.noreply.github.com> --- cpp/tensorrt_llm/runtime/moeLoadBalancer/moeLoadBalancer.cpp | 2 +- cpp/tests/runtime/moeLoadBalancerTest.cpp | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/cpp/tensorrt_llm/runtime/moeLoadBalancer/moeLoadBalancer.cpp b/cpp/tensorrt_llm/runtime/moeLoadBalancer/moeLoadBalancer.cpp index 8685ba08d15..bf4faeca046 100644 --- a/cpp/tensorrt_llm/runtime/moeLoadBalancer/moeLoadBalancer.cpp +++ 
b/cpp/tensorrt_llm/runtime/moeLoadBalancer/moeLoadBalancer.cpp @@ -843,7 +843,7 @@ MoeLoadBalancer::MoeLoadBalancer(int epRank, int epSize, int layerUpdatesPerIter int numaCpuCount = topologyDetector.getCurrentGpuNumaCpuCount(); int numaGpuCount = topologyDetector.getGpuCountUnderNuma(currentGpuNumaId); HostAccessibleDeviceAllocator::getInstance().IncRefCount(); - TLLM_CHECK_WITH_INFO(HostAccessibleDeviceAllocator::getInstance().isSupported(), + TLLM_CHECK_WITH_INFO(layerUpdatesPerIter == 0 || HostAccessibleDeviceAllocator::getInstance().isSupported(), "HostAccessibleDeviceAllocator is not supported on current platform, please install gdrcopy(gdrdrv)."); TLLM_CHECK_WITH_INFO( numaCpuCount > 0 && numaGpuCount > 0, "numaCpuCount=%d, numaGpuCount=%d", numaCpuCount, numaGpuCount); diff --git a/cpp/tests/runtime/moeLoadBalancerTest.cpp b/cpp/tests/runtime/moeLoadBalancerTest.cpp index 739891c9f35..a7d4058a82f 100644 --- a/cpp/tests/runtime/moeLoadBalancerTest.cpp +++ b/cpp/tests/runtime/moeLoadBalancerTest.cpp @@ -16,6 +16,8 @@ #include +#include + #include "tensorrt_llm/common/cudaUtils.h" #include "tensorrt_llm/kernels/moeLoadBalance/moeLoadBalanceKernels.h" #include "tensorrt_llm/runtime/moeLoadBalancer/moeLoadBalancer.h" @@ -314,6 +316,7 @@ class MoeLoadBalancerTest : public ::testing::TestWithParam(param.epRank, param.epSize, param.layerUpdatesPerIter); From e041100b25087823b728514e8ff3589215a6812c Mon Sep 17 00:00:00 2001 From: Dongxu Yang <78518666+dongxuy04@users.noreply.github.com> Date: Sun, 22 Jun 2025 15:20:41 +0800 Subject: [PATCH 12/13] use fused allgather for non alltoall case Signed-off-by: Dongxu Yang <78518666+dongxuy04@users.noreply.github.com> --- .../modules/fused_moe/fused_moe_wide_ep.py | 37 ++++++++++++------- .../modules/fused_moe/moe_load_balancer.py | 33 ++++++++++++----- 2 files changed, 47 insertions(+), 23 deletions(-) diff --git a/tensorrt_llm/_torch/modules/fused_moe/fused_moe_wide_ep.py b/tensorrt_llm/_torch/modules/fused_moe/fused_moe_wide_ep.py index b322ba13c4e..0167eef4806 100755 --- a/tensorrt_llm/_torch/modules/fused_moe/fused_moe_wide_ep.py +++ b/tensorrt_llm/_torch/modules/fused_moe/fused_moe_wide_ep.py @@ -364,10 +364,12 @@ def forward_chunk( loadbalancer_local_statistic_info = None gathered_loadbalancer_local_statistic_info = None + token_selected_experts_for_statistic = None if self.layer_load_balancer is None: token_selected_slots = token_selected_experts else: - if not self.layer_load_balancer.is_static_routing(): + if not self.layer_load_balancer.is_static_routing( + ) and self.enable_alltoall: self.layer_load_balancer.local_statistic( token_selected_experts, is_first_stage=is_first_call, @@ -376,9 +378,12 @@ def forward_chunk( token_selected_experts, self.use_dp) if not self.layer_load_balancer.is_static_routing(): # split into two part to get possible overlap with load balancer routing - if is_last_call: - loadbalancer_local_statistic_info = self.layer_load_balancer.get_local_statistic_tensor( - ) + if self.enable_alltoall: + if is_last_call: + loadbalancer_local_statistic_info = self.layer_load_balancer.get_local_statistic_tensor( + ) + else: + token_selected_experts_for_statistic = token_selected_experts # If load balancer is disabled, the statistics are collected from expert IDs. # If load balancer is enabled, the statistics are collected from expert slot IDs. 
@@ -475,31 +480,35 @@ def forward_chunk( if self.use_dp and self.parallel_size > 1 and not disable_fp4_allgather( ) and not self.enable_alltoall: - x, x_sf, token_selected_slots, token_final_scales = allgather( + x, x_sf, token_selected_slots, token_final_scales, gathered_token_selected_experts_for_statistic = allgather( [ x, x_sf, token_selected_slots, token_final_scales, + token_selected_experts_for_statistic, ], self.mapping, dim=0, sizes=None if use_dp_padding else all_rank_num_tokens) - # use separate allgather since doesn't have sizes, can be optimized but in allgather path it is OK - if is_last_call and loadbalancer_local_statistic_info is not None: - gathered_loadbalancer_local_statistic_info = allgather( - loadbalancer_local_statistic_info, self.mapping, dim=0) # Fp4 gemm has extra scaling factor if x_sf is not None: x_sf = reswizzle_sf(x_sf, x_row, x_col, self.scaling_vector_size) if self.layer_load_balancer and not self.layer_load_balancer.is_static_routing( - ) and is_last_call: - gathered_loadbalancer_local_statistic_info = gathered_loadbalancer_local_statistic_info.view( - (self.mapping.moe_ep_size, self.num_experts)) - self.layer_load_balancer.update_statistic( - gathered_loadbalancer_local_statistic_info) + ): + if self.enable_alltoall: + if is_last_call: + gathered_loadbalancer_local_statistic_info = gathered_loadbalancer_local_statistic_info.view( + (self.mapping.moe_ep_size, self.num_experts)) + self.layer_load_balancer.update_statistic( + gathered_loadbalancer_local_statistic_info) + else: + self.layer_load_balancer.statistic( + gathered_token_selected_experts_for_statistic, + is_first_stage=is_first_call, + is_last_stage=is_last_call) if self.smart_router and not cutlass_min_latency_mode: ep_size = self.cluster_size diff --git a/tensorrt_llm/_torch/modules/fused_moe/moe_load_balancer.py b/tensorrt_llm/_torch/modules/fused_moe/moe_load_balancer.py index 2533118391b..fff9ed60481 100644 --- a/tensorrt_llm/_torch/modules/fused_moe/moe_load_balancer.py +++ b/tensorrt_llm/_torch/modules/fused_moe/moe_load_balancer.py @@ -488,16 +488,16 @@ def set_cpu_stage(self): assert self.statistic_event is not None assert self.statistic_stream is not None # wait statistic update done - self.statistic_event.wait() - self.statistic_event = None - self.statistic_stream = None current_stream_event = torch.cuda.Event() current_stream_event.record(torch.cuda.current_stream()) with torch.cuda.stream(self.cudagraph_stream): + self.statistic_event.wait() current_stream_event.wait() torch.ops.trtllm.moe_load_balance_set_cpu_stage( self.single_layer_load_balancer_ptr) self.cudagraph_event.record(self.cudagraph_stream) + self.statistic_event = None + self.statistic_stream = None else: torch.ops.trtllm.moe_load_balance_set_cpu_stage( self.single_layer_load_balancer_ptr) @@ -523,10 +523,24 @@ def statistic(self, gathered_raw_expert_ids: torch.Tensor, """ if self.updates_enabled: assert isinstance(self.statistic_flag_tensor, torch.Tensor) - torch.ops.trtllm.moe_load_balance_statistic( - gathered_raw_expert_ids, self.statistic_flag_tensor, - self.single_layer_load_balancer_ptr, is_first_stage, - is_last_stage) + if is_graph_capturing(): + if is_first_stage: + self.statistic_event = torch.cuda.Event() + self.statistic_stream = torch.cuda.Stream() + current_stream_event = torch.cuda.Event() + current_stream_event.record(torch.cuda.current_stream()) + with torch.cuda.stream(self.statistic_stream): + current_stream_event.wait() + torch.ops.trtllm.moe_load_balance_statistic( + gathered_raw_expert_ids, 
self.statistic_flag_tensor, + self.single_layer_load_balancer_ptr, is_first_stage, + is_last_stage) + self.statistic_event.record() + else: + torch.ops.trtllm.moe_load_balance_statistic( + gathered_raw_expert_ids, self.statistic_flag_tensor, + self.single_layer_load_balancer_ptr, is_first_stage, + is_last_stage) def local_statistic(self, local_raw_expert_ids: torch.Tensor, is_first_stage: bool, is_last_stage: bool): @@ -547,8 +561,9 @@ def local_statistic(self, local_raw_expert_ids: torch.Tensor, dtype=torch.int32, device=torch.device('cuda')) if is_graph_capturing(): - self.statistic_event = torch.cuda.Event() - self.statistic_stream = torch.cuda.Stream() + if is_first_stage: + self.statistic_event = torch.cuda.Event() + self.statistic_stream = torch.cuda.Stream() current_stream_event = torch.cuda.Event() current_stream_event.record(torch.cuda.current_stream()) with torch.cuda.stream(self.statistic_stream): From cc3a320d4deb0b23b93c815f8fce261337b1c46a Mon Sep 17 00:00:00 2001 From: Dongxu Yang <78518666+dongxuy04@users.noreply.github.com> Date: Wed, 25 Jun 2025 08:40:14 +0800 Subject: [PATCH 13/13] fix rebase for fused_moe_wide_ep.py Signed-off-by: Dongxu Yang <78518666+dongxuy04@users.noreply.github.com> --- tensorrt_llm/_torch/modules/fused_moe/fused_moe_wide_ep.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tensorrt_llm/_torch/modules/fused_moe/fused_moe_wide_ep.py b/tensorrt_llm/_torch/modules/fused_moe/fused_moe_wide_ep.py index 0167eef4806..6d54e451ee4 100755 --- a/tensorrt_llm/_torch/modules/fused_moe/fused_moe_wide_ep.py +++ b/tensorrt_llm/_torch/modules/fused_moe/fused_moe_wide_ep.py @@ -445,8 +445,6 @@ def forward_chunk( x_row = x.shape[0] x_col = x.shape[1] sf_swizzle = True - x_row = x.shape[0] - x_col = x.shape[1] if self.has_any_quant: if self.has_fp8_qdq: x, _ = torch.ops.tensorrt_llm.static_quantize_e4m3_per_tensor(
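
As a quick way to exercise the allocator added by this series outside the unit tests, here is a minimal C++ sketch. It is illustrative only, not part of the patches: the class name, namespace, header path, and member signatures follow the hostAccessibleDeviceAllocator.h changes shown above, while the standalone main(), the device selection, and the staging buffer are assumptions made for the example.

// Illustrative sketch only: exercises the API introduced by this patch series.
#include "tensorrt_llm/runtime/moeLoadBalancer/hostAccessibleDeviceAllocator.h"

#include <cuda_runtime.h>

#include <cstddef>
#include <vector>

int main()
{
    using tensorrt_llm::runtime::HostAccessibleDeviceAllocator;

    cudaSetDevice(0); // assumption: single-GPU example

    auto& allocator = HostAccessibleDeviceAllocator::getInstance();
    if (!allocator.isSupported())
    {
        // Neither NUMA-attached GPU memory nor GDRCopy (nor the managed fallback) is available.
        return 0;
    }

    allocator.IncRefCount(); // the first IncRefCount initializes the allocator

    constexpr size_t kBytes = 1024;
    void* devPtr = allocator.allocate(kBytes);    // device pointer usable by CUDA kernels
    void* hostPtr = allocator.getHostPtr(devPtr); // CPU-visible view of the same allocation

    // Host-to-device writes go through memcpyToDevice so that GDRCopy-backed
    // mappings use gdr_copy_to_mapping rather than a plain memcpy.
    std::vector<char> staging(kBytes, 0x2a);
    allocator.memcpyToDevice(hostPtr, staging.data(), staging.size());

    allocator.free(devPtr);
    allocator.DecRefCount(); // the allocator shuts down when the reference count reaches zero
    return 0;
}

Per the patches above, the backing storage differs by platform: on systems where GPU memory is a NUMA node the allocation comes from that node, otherwise a GDRCopy mapping is used, and cudaMallocManaged is used only as a fallback when TLLM_HOST_ACCESSIBLE_ALLOW_MANAGED_FALLBACK=1 is set.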