Skip to content

Commit 1b8f074

Browse files
committed
Reland "IO: tie lifetime of handle field to container (#43218)"
Reverts a400a24 (#43924). Fix lifetime issues with the original attempt. We do not need to prevent it from being GC-finalized: only to make sure it does not conflict with a concurrent uv_close callback.
1 parent f10ba9c commit 1b8f074

File tree

10 files changed

+145
-128
lines changed

10 files changed

+145
-128
lines changed

base/asyncevent.jl

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,9 @@ Use [`isopen`](@ref) to check whether it is still active.
1414
This provides an implicit acquire & release memory ordering between the sending and waiting threads.
1515
"""
1616
mutable struct AsyncCondition
17-
handle::Ptr{Cvoid}
17+
@atomic handle::Ptr{Cvoid}
1818
cond::ThreadSynchronizer
19-
isopen::Bool
19+
@atomic isopen::Bool
2020
@atomic set::Bool
2121

2222
function AsyncCondition()
@@ -77,9 +77,9 @@ once. When the timer is closed (by [`close`](@ref)) waiting tasks are woken with
7777
7878
"""
7979
mutable struct Timer
80-
handle::Ptr{Cvoid}
80+
@atomic handle::Ptr{Cvoid}
8181
cond::ThreadSynchronizer
82-
isopen::Bool
82+
@atomic isopen::Bool
8383
@atomic set::Bool
8484

8585
function Timer(timeout::Real; interval::Real = 0.0)
@@ -149,12 +149,12 @@ function wait(t::Union{Timer, AsyncCondition})
149149
end
150150

151151

152-
isopen(t::Union{Timer, AsyncCondition}) = t.isopen
152+
isopen(t::Union{Timer, AsyncCondition}) = t.isopen && t.handle != C_NULL
153153

154154
function close(t::Union{Timer, AsyncCondition})
155155
iolock_begin()
156-
if t.handle != C_NULL && isopen(t)
157-
t.isopen = false
156+
if isopen(t)
157+
@atomic :monotonic t.isopen = false
158158
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), t)
159159
end
160160
iolock_end()
@@ -166,12 +166,12 @@ function uvfinalize(t::Union{Timer, AsyncCondition})
166166
lock(t.cond)
167167
try
168168
if t.handle != C_NULL
169-
disassociate_julia_struct(t.handle) # not going to call the usual close hooks
169+
disassociate_julia_struct(t.handle) # not going to call the usual close hooks anymore
170170
if t.isopen
171-
t.isopen = false
172-
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), t)
171+
@atomic :monotonic t.isopen = false
172+
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), t.handle)
173173
end
174-
t.handle = C_NULL
174+
@atomic :monotonic t.handle = C_NULL
175175
notify(t.cond, false)
176176
end
177177
finally
@@ -184,9 +184,9 @@ end
184184
function _uv_hook_close(t::Union{Timer, AsyncCondition})
185185
lock(t.cond)
186186
try
187-
t.isopen = false
188-
t.handle = C_NULL
189-
notify(t.cond, t.set)
187+
@atomic :monotonic t.isopen = false
188+
Libc.free(@atomicswap :monotonic t.handle = C_NULL)
189+
notify(t.cond, false)
190190
finally
191191
unlock(t.cond)
192192
end

base/libuv.jl

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,11 @@ function preserve_handle(x)
6161
end
6262
function unpreserve_handle(x)
6363
lock(preserve_handle_lock)
64-
v = uvhandles[x]::Int
65-
if v == 1
64+
v = get(uvhandles, x, 0)::Int
65+
if v == 0
66+
unlock(preserve_handle_lock)
67+
error("unbalanced call to unpreserve_handle for $(typeof(x))")
68+
elseif v == 1
6669
pop!(uvhandles, x)
6770
else
6871
uvhandles[x] = v - 1

base/process.jl

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ function uv_return_spawn(p::Ptr{Cvoid}, exit_status::Int64, termsignal::Int32)
5656
proc = unsafe_pointer_to_objref(data)::Process
5757
proc.exitcode = exit_status
5858
proc.termsignal = termsignal
59-
disassociate_julia_struct(proc) # ensure that data field is set to C_NULL
59+
disassociate_julia_struct(proc.handle) # ensure that data field is set to C_NULL
6060
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), proc.handle)
6161
proc.handle = C_NULL
6262
lock(proc.exitnotify)
@@ -70,7 +70,7 @@ end
7070

7171
# called when the libuv handle is destroyed
7272
function _uv_hook_close(proc::Process)
73-
proc.handle = C_NULL
73+
Libc.free(@atomicswap :not_atomic proc.handle = C_NULL)
7474
nothing
7575
end
7676

@@ -607,10 +607,10 @@ Get the child process ID, if it still exists.
607607
This function requires at least Julia 1.1.
608608
"""
609609
function Libc.getpid(p::Process)
610-
# TODO: due to threading, this method is no longer synchronized with the user application
610+
# TODO: due to threading, this method is only weakly synchronized with the user application
611611
iolock_begin()
612612
ppid = Int32(0)
613-
if p.handle != C_NULL
613+
if p.handle != C_NULL # e.g. process_running
614614
ppid = ccall(:jl_uv_process_pid, Int32, (Ptr{Cvoid},), p.handle)
615615
end
616616
iolock_end()

base/stream.jl

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -377,7 +377,7 @@ if OS_HANDLE != RawFD
377377
end
378378

379379
function isopen(x::Union{LibuvStream, LibuvServer})
380-
if x.status == StatusUninit || x.status == StatusInit
380+
if x.status == StatusUninit || x.status == StatusInit || x.handle === C_NULL
381381
throw(ArgumentError("$x is not initialized"))
382382
end
383383
return x.status != StatusClosed
@@ -496,34 +496,37 @@ end
496496

497497
function close(stream::Union{LibuvStream, LibuvServer})
498498
iolock_begin()
499-
should_wait = false
500499
if stream.status == StatusInit
501500
ccall(:jl_forceclose_uv, Cvoid, (Ptr{Cvoid},), stream.handle)
502501
stream.status = StatusClosing
503502
elseif isopen(stream)
504-
should_wait = uv_handle_data(stream) != C_NULL
505503
if stream.status != StatusClosing
506504
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), stream.handle)
507505
stream.status = StatusClosing
508506
end
509507
end
510508
iolock_end()
511-
should_wait && wait_close(stream)
509+
wait_close(stream)
512510
nothing
513511
end
514512

515513
function uvfinalize(uv::Union{LibuvStream, LibuvServer})
516-
uv.handle == C_NULL && return
517514
iolock_begin()
518515
if uv.handle != C_NULL
519-
disassociate_julia_struct(uv.handle) # not going to call the usual close hooks
520-
if uv.status != StatusUninit
521-
close(uv)
522-
else
516+
disassociate_julia_struct(uv.handle) # not going to call the usual close hooks (so preserve_handle is not needed)
517+
if uv.status == StatusUninit
518+
Libc.free(uv.handle)
519+
elseif uv.status == StatusInit
520+
ccall(:jl_forceclose_uv, Cvoid, (Ptr{Cvoid},), uv.handle)
521+
elseif isopen(uv)
522+
if uv.status != StatusClosing
523+
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), uv.handle)
524+
end
525+
elseif uv.status == StatusClosed
523526
Libc.free(uv.handle)
524527
end
525-
uv.status = StatusClosed
526528
uv.handle = C_NULL
529+
uv.status = StatusClosed
527530
end
528531
iolock_end()
529532
nothing
@@ -713,7 +716,6 @@ end
713716
function _uv_hook_close(uv::Union{LibuvStream, LibuvServer})
714717
lock(uv.cond)
715718
try
716-
uv.handle = C_NULL
717719
uv.status = StatusClosed
718720
# notify any listeners that exist on this libuv stream type
719721
notify(uv.cond)

src/init.c

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -165,8 +165,7 @@ static void jl_close_item_atexit(uv_handle_t *handle)
165165
switch(handle->type) {
166166
case UV_PROCESS:
167167
// cause Julia to forget about the Process object
168-
if (handle->data)
169-
jl_uv_call_close_callback((jl_value_t*)handle->data);
168+
handle->data = NULL;
170169
// and make libuv think it is already dead
171170
((uv_process_t*)handle)->pid = 0;
172171
// fall-through

src/jl_uv.c

Lines changed: 27 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -77,14 +77,16 @@ JL_DLLEXPORT void jl_iolock_end(void)
7777
}
7878

7979

80-
void jl_uv_call_close_callback(jl_value_t *val)
80+
static void jl_uv_call_close_callback(jl_value_t *val)
8181
{
82-
jl_value_t *args[2];
82+
jl_value_t **args;
83+
JL_GC_PUSHARGS(args, 2); // val is "rooted" in the finalizer list only right now
8384
args[0] = jl_get_global(jl_base_relative_to(((jl_datatype_t*)jl_typeof(val))->name->module),
8485
jl_symbol("_uv_hook_close")); // topmod(typeof(val))._uv_hook_close
8586
args[1] = val;
8687
assert(args[0]);
8788
jl_apply(args, 2); // TODO: wrap in try-catch?
89+
JL_GC_POP();
8890
}
8991

9092
static void jl_uv_closeHandle(uv_handle_t *handle)
@@ -105,6 +107,7 @@ static void jl_uv_closeHandle(uv_handle_t *handle)
105107
ct->world_age = jl_atomic_load_acquire(&jl_world_counter);
106108
jl_uv_call_close_callback((jl_value_t*)handle->data);
107109
ct->world_age = last_age;
110+
return;
108111
}
109112
if (handle == (uv_handle_t*)&signal_async)
110113
return;
@@ -125,6 +128,10 @@ static void jl_uv_flush_close_callback(uv_write_t *req, int status)
125128
free(req);
126129
return;
127130
}
131+
if (uv_is_closing((uv_handle_t*)stream)) { // avoid double-close on the stream
132+
free(req);
133+
return;
134+
}
128135
if (status == 0 && uv_is_writable(stream) && stream->write_queue_size != 0) {
129136
// new data was written, wait for it to flush too
130137
uv_buf_t buf;
@@ -134,12 +141,9 @@ static void jl_uv_flush_close_callback(uv_write_t *req, int status)
134141
if (uv_write(req, stream, &buf, 1, (uv_write_cb)jl_uv_flush_close_callback) == 0)
135142
return; // success
136143
}
137-
if (!uv_is_closing((uv_handle_t*)stream)) { // avoid double-close on the stream
138-
if (stream->type == UV_TTY)
139-
uv_tty_set_mode((uv_tty_t*)stream, UV_TTY_MODE_NORMAL);
140-
uv_close((uv_handle_t*)stream, &jl_uv_closeHandle);
141-
}
142-
free(req);
144+
if (stream->type == UV_TTY)
145+
uv_tty_set_mode((uv_tty_t*)stream, UV_TTY_MODE_NORMAL);
146+
uv_close((uv_handle_t*)stream, &jl_uv_closeHandle);
143147
}
144148

145149
static void uv_flush_callback(uv_write_t *req, int status)
@@ -224,47 +228,42 @@ static void jl_proc_exit_cleanup_cb(uv_process_t *process, int64_t exit_status,
224228

225229
JL_DLLEXPORT void jl_close_uv(uv_handle_t *handle)
226230
{
231+
JL_UV_LOCK();
227232
if (handle->type == UV_PROCESS && ((uv_process_t*)handle)->pid != 0) {
228233
// take ownership of this handle,
229234
// so we can waitpid for the resource to exit and avoid leaving zombies
230235
assert(handle->data == NULL); // make sure Julia has forgotten about it already
231236
((uv_process_t*)handle)->exit_cb = jl_proc_exit_cleanup_cb;
232-
return;
237+
uv_unref(handle);
233238
}
234-
JL_UV_LOCK();
235-
if (handle->type == UV_FILE) {
239+
else if (handle->type == UV_FILE) {
236240
uv_fs_t req;
237241
jl_uv_file_t *fd = (jl_uv_file_t*)handle;
238242
if ((ssize_t)fd->file != -1) {
239243
uv_fs_close(handle->loop, &req, fd->file, NULL);
240244
fd->file = (uv_os_fd_t)(ssize_t)-1;
241245
}
242246
jl_uv_closeHandle(handle); // synchronous (ok since the callback is known to not interact with any global state)
243-
JL_UV_UNLOCK();
244-
return;
245-
}
246-
247-
if (handle->type == UV_NAMED_PIPE || handle->type == UV_TCP || handle->type == UV_TTY) {
248-
uv_write_t *req = (uv_write_t*)malloc_s(sizeof(uv_write_t));
249-
req->handle = (uv_stream_t*)handle;
250-
jl_uv_flush_close_callback(req, 0);
251-
JL_UV_UNLOCK();
252-
return;
253247
}
254-
255-
// avoid double-closing the stream
256-
if (!uv_is_closing(handle)) {
257-
uv_close(handle, &jl_uv_closeHandle);
248+
else if (!uv_is_closing(handle)) { // avoid double-closing the stream
249+
if (handle->type == UV_NAMED_PIPE || handle->type == UV_TCP || handle->type == UV_TTY) {
250+
// flush the stream write-queue first
251+
uv_write_t *req = (uv_write_t*)malloc_s(sizeof(uv_write_t));
252+
req->handle = (uv_stream_t*)handle;
253+
jl_uv_flush_close_callback(req, 0);
254+
}
255+
else {
256+
uv_close(handle, &jl_uv_closeHandle);
257+
}
258258
}
259259
JL_UV_UNLOCK();
260260
}
261261

262262
JL_DLLEXPORT void jl_forceclose_uv(uv_handle_t *handle)
263263
{
264-
// avoid double-closing the stream
265-
if (!uv_is_closing(handle)) {
264+
if (!uv_is_closing(handle)) { // avoid double-closing the stream
266265
JL_UV_LOCK();
267-
if (!uv_is_closing(handle)) {
266+
if (!uv_is_closing(handle)) { // double-check
268267
uv_close(handle, &jl_uv_closeHandle);
269268
}
270269
JL_UV_UNLOCK();

src/julia_internal.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -568,7 +568,6 @@ JL_DLLEXPORT jl_fptr_args_t jl_get_builtin_fptr(jl_value_t *b);
568568

569569
extern uv_loop_t *jl_io_loop;
570570
void jl_uv_flush(uv_stream_t *stream);
571-
void jl_uv_call_close_callback(jl_value_t *val);
572571

573572
typedef struct jl_typeenv_t {
574573
jl_tvar_t *var;

0 commit comments

Comments
 (0)