Skip to content

Commit 92b472e

Browse files
d4l3ktimocafe
authored and committed
ProcessGroupGloo: support lazy_init (pytorch#150801)
This adds lazy initialization support to ProcessGroupGloo via the `TORCH_GLOO_LAZY_INIT` environment variable or via `create_device(..., lazy_init=True)`. This is still a draft PR, as there's one race condition when doing coalesced operations that needs to be fixed upstream in Gloo first. Depends on pytorch/gloo#427 landing first. This also updates the gloo submodule to include the required changes. Test plan: added lazy init test variants ``` pytest -v test/distributed/test_c10d_gloo.py -k Lazy ``` Pull Request resolved: pytorch#150801 Approved by: https://github.com/fduwjj
1 parent 63cc5c3 commit 92b472e

File tree

10 files changed

+119
-53
lines changed

10 files changed

+119
-53
lines changed

docs/source/distributed.rst

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,13 @@ The machine with rank 0 will be used to set up all connections.
284284
This is the default method, meaning that ``init_method`` does not have to be specified (or
285285
can be ``env://``).
286286

287+
Improving initialization time
288+
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
289+
290+
* ``TORCH_GLOO_LAZY_INIT`` - establishes connections on demand rather than
291+
using a full mesh which can greatly improve initialization time for non all2all
292+
operations.
293+
287294
Post-Initialization
288295
-------------------
289296

test/distributed/test_c10d_gloo.py

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
requires_gloo,
4747
simple_sparse_reduce_tests,
4848
skip_if_lt_x_gpu,
49+
skip_if_win32,
4950
verify_ddp_error_logged,
5051
)
5152
from torch.testing._internal.common_utils import (
@@ -219,6 +220,8 @@ def test_default_store_timeout_gloo(self):
219220

220221

221222
class ProcessGroupGlooTest(MultiProcessTestCase):
223+
lazy_init = False
224+
222225
def _create_process_group_gloo(self, store, rank, world_size, opts):
223226
pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, opts)
224227
dist.barrier(group=pg)
@@ -231,7 +234,7 @@ def setUp(self):
231234
def opts(self, threads=2):
232235
opts = c10d.ProcessGroupGloo._Options()
233236
opts._timeout = 50.0
234-
opts._devices = [create_device(interface=LOOPBACK)]
237+
opts._devices = [create_device(interface=LOOPBACK, lazy_init=self.lazy_init)]
235238
opts._threads = threads
236239
return opts
237240

@@ -241,8 +244,8 @@ def test_multi_device_constructor(self):
241244
opts = c10d.ProcessGroupGloo._Options()
242245
opts._timeout = 5.0
243246
opts._devices = [
244-
create_device(interface=LOOPBACK),
245-
create_device(interface=LOOPBACK),
247+
create_device(interface=LOOPBACK, lazy_init=self.lazy_init),
248+
create_device(interface=LOOPBACK, lazy_init=self.lazy_init),
246249
]
247250
pg = self._create_process_group_gloo(store, self.rank, self.world_size, opts)
248251

@@ -2334,6 +2337,19 @@ def test_forward_backward_optimizer(self):
23342337
optimizer.step()
23352338

23362339

2340+
@skip_if_win32()
2341+
class ProcessGroupGlooLazyInitTest(ProcessGroupGlooTest):
2342+
lazy_init = True
2343+
2344+
def setUp(self):
2345+
os.environ["TORCH_GLOO_LAZY_INIT"] = "1"
2346+
super().setUp()
2347+
2348+
def tearDown(self) -> None:
2349+
del os.environ["TORCH_GLOO_LAZY_INIT"]
2350+
return super().tearDown()
2351+
2352+
23372353
class CommTest(test_c10d_common.AbstractCommTest, MultiProcessTestCase):
23382354
@property
23392355
def device(self):

third_party/gloo

Submodule gloo updated from e348db9 to c610704

torch/_C/_distributed_c10d.pyi

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -570,9 +570,9 @@ class ProcessGroupGloo(Backend):
570570
timeout: timedelta,
571571
) -> None: ...
572572
@staticmethod
573-
def create_device(hostname="", interface="") -> Device: ...
573+
def create_device(hostname="", interface="", lazy_init=None) -> Device: ...
574574
@staticmethod
575-
def create_default_device() -> Device: ...
575+
def create_default_device(lazy_init=None) -> Device: ...
576576
def _set_default_timeout(self, timeout) -> None: ...
577577

578578
class _ProcessGroupWrapper(Backend):

torch/csrc/distributed/c10d/GlooDeviceFactory.cpp

Lines changed: 30 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,14 @@ C10_DEFINE_SHARED_REGISTRY_WITHOUT_WARNING(
3939
GlooDeviceRegistry,
4040
::gloo::transport::Device,
4141
const std::string& /* interface */,
42-
const std::string& /* hostname */)
42+
const std::string& /* hostname */,
43+
bool /* lazyInit */)
4344

4445
#if GLOO_HAVE_TRANSPORT_TCP
4546
static std::shared_ptr<::gloo::transport::Device> makeTCPDevice(
4647
const std::string& interfaceName,
47-
const std::string& hostname) {
48+
const std::string& hostname,
49+
bool lazyInit) {
4850
TORCH_CHECK(
4951
!interfaceName.empty() || !hostname.empty(),
5052
"GlooDeviceFactory::makeTCPDevice(): interface or hostname "
@@ -56,7 +58,11 @@ static std::shared_ptr<::gloo::transport::Device> makeTCPDevice(
5658
} else {
5759
attr.hostname = hostname;
5860
}
59-
return ::gloo::transport::tcp::CreateDevice(attr);
61+
if (lazyInit) {
62+
return ::gloo::transport::tcp::CreateLazyDevice(attr);
63+
} else {
64+
return ::gloo::transport::tcp::CreateDevice(attr);
65+
}
6066
}
6167

6268
// Registry priority is per key identifier. We register TCP to `LINUX` for
@@ -69,12 +75,15 @@ C10_REGISTER_CREATOR(GlooDeviceRegistry, TCP, makeTCPDevice)
6975
#if GLOO_HAVE_TRANSPORT_TCP_TLS
7076
static std::shared_ptr<::gloo::transport::Device> makeTCPTLSDevice(
7177
const std::string& interface,
72-
const std::string& hostname) {
78+
const std::string& hostname,
79+
bool lazyInit) {
7380
TORCH_CHECK(
7481
!interface.empty() || !hostname.empty(),
7582
"GlooDeviceFactory::makeTCPTLSDevice(): interface or hostname "
7683
"can't be empty");
7784

85+
TORCH_CHECK(!lazyInit, "TCP_TLS transport does not support lazy init");
86+
7887
::gloo::transport::tcp::attr attr;
7988
if (!interface.empty()) {
8089
attr.iface = interface;
@@ -105,12 +114,15 @@ C10_REGISTER_CREATOR(GlooDeviceRegistry, TCP_TLS, makeTCPTLSDevice)
105114
#if GLOO_HAVE_TRANSPORT_UV
106115
static std::shared_ptr<::gloo::transport::Device> makeUVDevice(
107116
const std::string& interfaceName,
108-
const std::string& hostname) {
117+
const std::string& hostname,
118+
bool lazyInit) {
109119
TORCH_CHECK(
110120
!interfaceName.empty() || !hostname.empty(),
111121
"GlooDeviceFactory::makeUVDevice(): interface or hostname "
112122
"can't be empty");
113123

124+
TORCH_CHECK(!lazyInit, "UV transport does not support lazy init");
125+
114126
::gloo::transport::uv::attr attr;
115127
if (!interfaceName.empty()) {
116128
attr.iface = interfaceName;
@@ -131,41 +143,45 @@ C10_REGISTER_CREATOR(GlooDeviceRegistry, UV, makeUVDevice)
131143
namespace {
132144
std::shared_ptr<::gloo::transport::Device> makeGlooDevice(
133145
const std::string& interfaceName,
134-
const std::string& hostName) {
146+
const std::string& hostName,
147+
bool lazyInit) {
135148
static auto transportName = c10::utils::get_env("GLOO_DEVICE_TRANSPORT");
136149
if (transportName.has_value()) {
137150
return GlooDeviceRegistry()->Create(
138-
transportName.value().c_str(), interfaceName, hostName);
151+
transportName.value().c_str(), interfaceName, hostName, lazyInit);
139152
}
140153

141154
#ifdef __linux__
142-
return GlooDeviceRegistry()->Create("LINUX", interfaceName, hostName);
155+
return GlooDeviceRegistry()->Create(
156+
"LINUX", interfaceName, hostName, lazyInit);
143157
#endif
144158

145159
#ifdef __APPLE__
146-
return GlooDeviceRegistry()->Create("APPLE", interfaceName, hostName);
160+
return GlooDeviceRegistry()->Create(
161+
"APPLE", interfaceName, hostName, lazyInit);
147162
#endif
148163

149164
#ifdef _WIN32
150-
return GlooDeviceRegistry()->Create("WIN32", interfaceName, hostName);
165+
return GlooDeviceRegistry()->Create(
166+
"WIN32", interfaceName, hostName, lazyInit);
151167
#endif
152168

153169
return nullptr;
154170
}
155171
} // anonymous namespace
156172

157173
std::shared_ptr<::gloo::transport::Device> GlooDeviceFactory::
158-
makeDeviceForInterface(const std::string& interfaceName) {
159-
auto device = makeGlooDevice(interfaceName, "");
174+
makeDeviceForInterface(const std::string& interfaceName, bool lazyInit) {
175+
auto device = makeGlooDevice(interfaceName, "", lazyInit);
160176
if (!device) {
161177
TORCH_CHECK(false, "makeDeviceForInterface(): unsupported gloo device");
162178
}
163179
return device;
164180
}
165181

166182
std::shared_ptr<::gloo::transport::Device> GlooDeviceFactory::
167-
makeDeviceForHostname(const std::string& hostname) {
168-
auto device = makeGlooDevice("", hostname);
183+
makeDeviceForHostname(const std::string& hostname, bool lazyInit) {
184+
auto device = makeGlooDevice("", hostname, lazyInit);
169185
if (!device) {
170186
TORCH_CHECK(false, "makeDeviceForHostname(): unsupported gloo device");
171187
}

torch/csrc/distributed/c10d/GlooDeviceFactory.hpp

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,18 +14,21 @@ class TORCH_API GlooDeviceFactory {
1414
public:
1515
// Create new device instance for specific interface.
1616
static std::shared_ptr<::gloo::transport::Device> makeDeviceForInterface(
17-
const std::string& interface);
17+
const std::string& interface,
18+
bool lazyInit);
1819

1920
// Create new device instance for specific hostname or address.
2021
static std::shared_ptr<::gloo::transport::Device> makeDeviceForHostname(
21-
const std::string& hostname);
22+
const std::string& hostname,
23+
bool lazyInit);
2224
};
2325

2426
TORCH_DECLARE_SHARED_REGISTRY(
2527
GlooDeviceRegistry,
2628
::gloo::transport::Device,
2729
const std::string&, /* interface */
28-
const std::string& /* hostname */);
30+
const std::string&, /* hostname */
31+
bool /* lazyInit */);
2932

3033
} // namespace c10d
3134

torch/csrc/distributed/c10d/ProcessGroupGloo.cpp

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -415,6 +415,10 @@ const auto kLoopbackAddress = "127.0.0.1";
415415

416416
} // namespace
417417

418+
bool getDefaultGlooLazyInit() {
419+
return ::c10d::getCvarBool(TORCH_GLOO_LAZY_INIT, false);
420+
}
421+
418422
// static
419423
void ProcessGroupGloo::AsyncWork::execute(
420424
const c10::intrusive_ptr<AsyncWork>& work) {
@@ -687,23 +691,24 @@ bool doesHostnameResolveToUsableAddress(const std::string& hostname) {
687691
} // namespace
688692

689693
std::shared_ptr<::gloo::transport::Device> ProcessGroupGloo::
690-
createDeviceForInterface(const std::string& interface_name) {
691-
return ::c10d::GlooDeviceFactory::makeDeviceForInterface(interface_name);
694+
createDeviceForInterface(const std::string& interface_name, bool lazyInit) {
695+
return ::c10d::GlooDeviceFactory::makeDeviceForInterface(
696+
interface_name, lazyInit);
692697
}
693698

694699
std::shared_ptr<::gloo::transport::Device> ProcessGroupGloo::
695-
createDeviceForHostname(const std::string& hostname) {
700+
createDeviceForHostname(const std::string& hostname, bool lazyInit) {
696701
TORCH_CHECK(
697702
doesHostnameResolveToUsableAddress(hostname),
698703
"Cannot resolve ",
699704
hostname,
700705
" to a (local) address");
701-
return ::c10d::GlooDeviceFactory::makeDeviceForHostname(hostname);
706+
return ::c10d::GlooDeviceFactory::makeDeviceForHostname(hostname, lazyInit);
702707
}
703708

704709
#if defined(__linux__) || defined(_WIN32)
705710
std::shared_ptr<::gloo::transport::Device> ProcessGroupGloo::
706-
createDefaultDevice() {
711+
createDefaultDevice(bool lazyInit) {
707712
// Use the hostname to resolve the network address to
708713
// use. Note: if the hostname does not resolve to an address (e.g.
709714
// because of misconfigured /etc/hosts file), this will not work.
@@ -716,21 +721,22 @@ std::shared_ptr<::gloo::transport::Device> ProcessGroupGloo::
716721

717722
// Use this machine's hostname if it resolves to an address.
718723
if (doesHostnameResolveToUsableAddress(hostname.data())) {
719-
return ::c10d::GlooDeviceFactory::makeDeviceForHostname(hostname.data());
724+
return ::c10d::GlooDeviceFactory::makeDeviceForHostname(
725+
hostname.data(), lazyInit);
720726
}
721727

722728
// Otherwise, use the loopback address.
723729
TORCH_WARN_ONCE(
724730
"Unable to resolve hostname to a (local) address. ",
725731
"Using the loopback address as fallback. ",
726732
"Manually set the network interface to bind to with GLOO_SOCKET_IFNAME.");
727-
return createDeviceForHostname(kLoopbackAddress);
733+
return createDeviceForHostname(kLoopbackAddress, lazyInit);
728734
}
729735
#endif
730736

731737
#ifdef __APPLE__
732738
std::shared_ptr<::gloo::transport::Device> ProcessGroupGloo::
733-
createDefaultDevice() {
739+
createDefaultDevice(bool lazyInit) {
734740
// Use the hostname to resolve the network address to
735741
// use. Note: if the hostname does not resolve to an address (e.g.
736742
// because of misconfigured /etc/hosts file), this will not work.
@@ -743,15 +749,16 @@ std::shared_ptr<::gloo::transport::Device> ProcessGroupGloo::
743749

744750
// Use this machine's hostname if it resolves to an address.
745751
if (doesHostnameResolveToUsableAddress(hostname.get())) {
746-
return ::c10d::GlooDeviceFactory::makeDeviceForHostname(hostname.get());
752+
return ::c10d::GlooDeviceFactory::makeDeviceForHostname(
753+
hostname.get(), lazyInit);
747754
}
748755

749756
// Otherwise, use the loopback address.
750757
TORCH_WARN_ONCE(
751758
"Unable to resolve hostname to a (local) address. ",
752759
"Using the loopback address as fallback. ",
753760
"Manually set the network interface to bind to with GLOO_SOCKET_IFNAME.");
754-
return createDeviceForHostname(kLoopbackAddress);
761+
return createDeviceForHostname(kLoopbackAddress, lazyInit);
755762
}
756763
#endif
757764

torch/csrc/distributed/c10d/ProcessGroupGloo.hpp

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,13 @@ namespace c10d {
2828

2929
constexpr const char* GLOO_BACKEND_NAME = "gloo";
3030

31+
// Control whether or not connections are established in a full mesh or lazily
32+
// as needed.
33+
static std::vector<std::string> TORCH_GLOO_LAZY_INIT = {"TORCH_GLOO_LAZY_INIT"};
34+
35+
// Returns default value for lazyInit.
36+
bool TORCH_API getDefaultGlooLazyInit();
37+
3138
// ProcessGroupGloo implements Gloo bindings for c10d.
3239
//
3340
// All functions on this class are expected to be called in the same
@@ -244,24 +251,20 @@ class TORCH_API ProcessGroupGloo : public Backend {
244251

245252
// Create new device instance for specific interface.
246253
static std::shared_ptr<::gloo::transport::Device> createDeviceForInterface(
247-
const std::string& interface);
254+
const std::string& interface,
255+
bool lazyInit = false);
248256

249257
// Create new device instance for specific hostname or address.
250258
static std::shared_ptr<::gloo::transport::Device> createDeviceForHostname(
251-
const std::string& hostname);
259+
const std::string& hostname,
260+
bool lazyInit = false);
252261

253262
// Create new device instance.
254263
// It tries to resolve this machine's hostname and bind to that address.
255264
// If that fails (i.e. the hostname doesn't resolve to an address), it
256265
// falls back to binding to the loopback address.
257-
static std::shared_ptr<::gloo::transport::Device> createDefaultDevice();
258-
259-
// Create ProcessGroupGloo instance.
260-
static c10::intrusive_ptr<ProcessGroupGloo> createProcessGroupGloo(
261-
const c10::intrusive_ptr<Store>& store,
262-
int rank,
263-
int size,
264-
std::chrono::milliseconds timeout);
266+
static std::shared_ptr<::gloo::transport::Device> createDefaultDevice(
267+
bool lazyInit = false);
265268

266269
explicit ProcessGroupGloo(
267270
const c10::intrusive_ptr<Store>& store,

0 commit comments

Comments
 (0)