Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions python/ray/_raylet.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ cdef extern from "Python.h":
ctypedef struct CPyThreadState "PyThreadState":
int recursion_limit
int recursion_remaining
int c_recursion_remaining

# From Include/ceveal.h#67
int Py_GetRecursionLimit()
Expand Down
66 changes: 52 additions & 14 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -679,29 +679,63 @@ def compute_task_id(ObjectRef object_ref):


cdef increase_recursion_limit():
"""Double the recusion limit if current depth is close to the limit"""
"""
Ray does some weird things with asio fibers and asyncio to run asyncio actors.
This results in the Python interpreter thinking there's a lot of recursion depth,
so we need to increase the limit when we start getting close.

0x30C0000 is Python 3.12
On 3.12, when recursion depth increases, c_recursion_remaining will decrease,
and that's what's actually compared to raise a RecursionError. So increasing
it by 1000 when it drops below 1000 will keep us from raising the RecursionError.
https://github.com/python/cpython/blob/bfb9e2f4a4e690099ec2ec53c08b90f4d64fde36/Python/pystate.c#L1353
0x30B00A4 is Python 3.11
On 3.11, the recursion depth can be calculated with recursion_limit - recursion_remaining.
We can get the current limit with Py_GetRecursionLimit and set it with Py_SetRecursionLimit.
We'll double the limit when there's less than 500 remaining.
On older versions
There's simply a recursion_depth variable and we'll increase the max the same
way we do for 3.11.
"""
cdef:
CPyThreadState * s = <CPyThreadState *> PyThreadState_Get()
int current_limit = Py_GetRecursionLimit()
int new_limit = current_limit * 2
cdef extern from *:
"""
#if PY_VERSION_HEX >= 0x30C0000
#define CURRENT_DEPTH(x) ((x)->py_recursion_limit - (x)->py_recursion_remaining)
bool IncreaseRecursionLimitIfNeeded(PyThreadState *x) {
if (x->c_recursion_remaining < 1000) {
x->c_recursion_remaining += 1000;
return true;
}
return false;
}
#elif PY_VERSION_HEX >= 0x30B00A4
#define CURRENT_DEPTH(x) ((x)->recursion_limit - (x)->recursion_remaining)
bool IncreaseRecursionLimitIfNeeded(PyThreadState *x) {
int current_limit = Py_GetRecursionLimit();
int current_depth = x->recursion_limit - x->recursion_remaining;
if (current_limit - current_depth < 500) {
Py_SetRecursionLimit(current_limit * 2);
return true;
}
return false;
}
#else
#define CURRENT_DEPTH(x) ((x)->recursion_depth)
bool IncreaseRecursionLimitIfNeeded(PyThreadState *x) {
int current_limit = Py_GetRecursionLimit();
if (current_limit - x->recursion_depth < 500) {
Py_SetRecursionLimit(current_limit * 2);
return true;
}
return false;
}
#endif
"""
int CURRENT_DEPTH(CPyThreadState *x)
c_bool IncreaseRecursionLimitIfNeeded(CPyThreadState *x)

CPyThreadState * s = <CPyThreadState *> PyThreadState_Get()
c_bool increased_recursion_limit = IncreaseRecursionLimitIfNeeded(s)

int current_depth = CURRENT_DEPTH(s)
if current_limit - current_depth < 500:
Py_SetRecursionLimit(new_limit)
logger.debug("Increasing Python recursion limit to {} "
"current recursion depth is {}.".format(
new_limit, current_depth))
if increased_recursion_limit:
logger.debug("Increased Python recursion limit")


cdef CObjectLocationPtrToDict(CObjectLocation* c_object_location):
Expand Down Expand Up @@ -2462,6 +2496,10 @@ cdef CRayStatus task_execution_handler(
if hasattr(e, "unexpected_error_traceback"):
msg += (f" {e.unexpected_error_traceback}")
return CRayStatus.UnexpectedSystemExit(msg)
except Exception as e:
msg = "Unexpected exception raised in task execution handler: {}".format(e)
logger.error(msg)
return CRayStatus.UnexpectedSystemExit(msg)

return CRayStatus.OK()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,3 +176,7 @@ def send_multiple_tensors(
communicator_metadata.dst_rank,
communicator_metadata.communicator_name,
)

@staticmethod
def garbage_collect(tensor_transport_meta: CollectiveTransportMetadata):
pass
16 changes: 14 additions & 2 deletions python/ray/experimental/collective/nixl_tensor_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ def extract_tensor_transport_metadata(
device = None
tensor_meta = []
if gpu_object:
serialized_descs, agent_meta = nixl_backend.get_nixl_metadata(gpu_object)
reg_descs, serialized_descs, agent_meta = nixl_backend.get_nixl_metadata(
gpu_object
)
# We assume all tensors in one GPU object have the same device type.
device = gpu_object[0].device
for t in gpu_object:
Expand All @@ -64,10 +66,11 @@ def extract_tensor_transport_metadata(
)
tensor_meta.append((t.shape, t.dtype))
else:
serialized_descs, agent_meta = None, None
reg_descs, serialized_descs, agent_meta = None, None, None
return NixlTransportMetadata(
tensor_meta=tensor_meta,
tensor_device=device,
nixl_reg_descs=reg_descs,
nixl_serialized_descs=serialized_descs,
nixl_agent_meta=agent_meta,
)
Expand Down Expand Up @@ -150,3 +153,12 @@ def send_multiple_tensors(
raise NotImplementedError(
"NIXL transport does not support send_multiple_tensors, since it is a one-sided transport."
)

@staticmethod
def garbage_collect(tensor_transport_meta: NixlTransportMetadata):
from ray.util.collective.collective import get_group_handle
from ray.util.collective.collective_group.nixl_backend import NixlBackend

descs = tensor_transport_meta.nixl_reg_descs
nixl_backend: NixlBackend = get_group_handle(NIXL_GROUP_NAME)
nixl_backend.deregister_memory(descs)
10 changes: 10 additions & 0 deletions python/ray/experimental/collective/tensor_transport_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,3 +126,13 @@ def send_multiple_tensors(
tensors: The tensors to send.
communicator_metadata: The communicator metadata for the send/recv operation.
"""

@staticmethod
@abstractmethod
def garbage_collect(tensor_transport_meta: TensorTransportMetadata):
"""
Garbage collect for the tensor transport after the GPU object is freed.

Args:
tensor_transport_meta: The tensor transport metadata.
"""
Original file line number Diff line number Diff line change
Expand Up @@ -531,17 +531,22 @@ def get_gpu_object(

def free_object_primary_copy(self, object_id: str):
"""
Free the primary copy of the GPU object.
Free the primary copy of the GPU object. Expected to be idempotent when called from
free_actor_object_callback because the primary copy holder should always only have one ref
in the deque.
"""
from ray.experimental.gpu_object_manager.gpu_object_store import (
__ray_free__,
)

try:
src_actor = self.managed_gpu_object_metadata[object_id].src_actor
src_actor.__ray_call__.options(
concurrency_group="_ray_system", max_task_retries=-1
).remote(__ray_free__, object_id)
gpu_object_meta = self.managed_gpu_object_metadata[object_id]
src_actor = gpu_object_meta.src_actor
tensor_transport_backend = gpu_object_meta.tensor_transport_backend
tensor_transport_meta = gpu_object_meta.tensor_transport_meta
src_actor.__ray_call__.options(concurrency_group="_ray_system").remote(
__ray_free__, object_id, tensor_transport_backend, tensor_transport_meta
)
except Exception as e:
logger.error(
"Something went wrong while freeing the RDT object!", exc_info=e
Expand Down
17 changes: 12 additions & 5 deletions python/ray/experimental/gpu_object_manager/gpu_object_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,20 @@ def __ray_recv__(
gpu_object_store.add_object(obj_id, tensors)


def __ray_free__(self, obj_id: str):
"""
Called on the primary copy holder. Note that the primary copy holder should always only have one ref
in the gpu object store.
"""
def __ray_free__(
self,
obj_id: str,
tensor_transport_backend: Backend,
tensor_transport_meta: TensorTransportMetadata,
):
try:
from ray._private.worker import global_worker
from ray.experimental.collective import get_tensor_transport_manager

tensor_transport_manager = get_tensor_transport_manager(
tensor_transport_backend
)
tensor_transport_manager.garbage_collect(tensor_transport_meta)

gpu_object_store = global_worker.gpu_object_manager.gpu_object_store
gpu_object_store.pop_object(obj_id)
Expand Down
12 changes: 9 additions & 3 deletions python/ray/util/collective/collective_group/nixl_backend.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import time
from typing import TYPE_CHECKING, List, Tuple
from typing import TYPE_CHECKING, Any, List, Tuple

from nixl._api import nixl_agent, nixl_agent_config

Expand Down Expand Up @@ -87,9 +87,11 @@ def recv(
break

nixl_agent.release_xfer_handle(xfer_handle)
nixl_agent.deregister_memory(local_descs)
nixl_agent.remove_remote_agent(remote_name)

def get_nixl_metadata(self, tensors: List["torch.Tensor"]) -> Tuple[bytes, bytes]:
def get_nixl_metadata(
self, tensors: List["torch.Tensor"]
) -> Tuple[Any, bytes, bytes]:
"""Get NIXL metadata for a set of tensors.

Args:
Expand All @@ -104,6 +106,10 @@ def get_nixl_metadata(self, tensors: List["torch.Tensor"]) -> Tuple[bytes, bytes
reg_descs = nixl_agent.register_memory(tensors)
xfer_descs = reg_descs.trim()
return (
reg_descs,
nixl_agent.get_serialized_descs(xfer_descs),
nixl_agent.get_agent_metadata(),
)

def deregister_memory(self, descs: Any):
self._nixl_agent.deregister_memory(descs)
3 changes: 2 additions & 1 deletion python/ray/util/collective/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from dataclasses import dataclass
from datetime import timedelta
from enum import Enum
from typing import TYPE_CHECKING, List, Optional, Tuple
from typing import TYPE_CHECKING, Any, List, Optional, Tuple

from numpy import int32

Expand Down Expand Up @@ -80,6 +80,7 @@ class NixlTransportMetadata(TensorTransportMetadata):
nixl_agent_meta: The additional metadata of the remote NIXL agent.
"""

nixl_reg_descs: Optional[Any] = None
nixl_serialized_descs: Optional[bytes] = None
nixl_agent_meta: Optional[bytes] = None

Expand Down