Skip to content

Commit 63f43ba

Browse files
committed
deal with remaining race conditions in these APIs
The jl_live_tasks API now reports tasks from all threads, instead of only Tasks first started by the current thread. There is a new abstraction called mtarraylist which adds functionality to small_arraylist (it is layout-compatible). In particular, it makes it safe for another thread to observe the content of the list concurrently with any mutations.
1 parent 53b7b8e commit 63f43ba

File tree

8 files changed

+177
-52
lines changed

8 files changed

+177
-52
lines changed

src/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ endif
4343
SRCS := \
4444
jltypes gf typemap smallintset ast builtins module interpreter symbol \
4545
dlload sys init task array staticdata toplevel jl_uv datatype \
46-
simplevector runtime_intrinsics precompile jloptions \
46+
simplevector runtime_intrinsics precompile jloptions mtarraylist \
4747
threading partr stackwalk gc gc-debug gc-pages gc-stacks gc-alloc-profiler method \
4848
jlapi signal-handling safepoint timing subtype rtutils gc-heap-snapshot \
4949
crc32c APInt-C processor ircode opaque_closure codegen-stubs coverage runtime_ccall

src/gc-stacks.c

Lines changed: 54 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ static void _jl_free_stack(jl_ptls_t ptls, void *stkbuf, size_t bufsz)
116116
if (bufsz <= pool_sizes[JL_N_STACK_POOLS - 1]) {
117117
unsigned pool_id = select_pool(bufsz);
118118
if (pool_sizes[pool_id] == bufsz) {
119-
arraylist_push(&ptls->heap.free_stacks[pool_id], stkbuf);
119+
small_arraylist_push(&ptls->heap.free_stacks[pool_id], stkbuf);
120120
return;
121121
}
122122
}
@@ -145,7 +145,7 @@ void jl_release_task_stack(jl_ptls_t ptls, jl_task_t *task)
145145
#ifdef _COMPILER_ASAN_ENABLED_
146146
__asan_unpoison_stack_memory((uintptr_t)stkbuf, bufsz);
147147
#endif
148-
arraylist_push(&ptls->heap.free_stacks[pool_id], stkbuf);
148+
small_arraylist_push(&ptls->heap.free_stacks[pool_id], stkbuf);
149149
}
150150
}
151151
}
@@ -160,9 +160,9 @@ JL_DLLEXPORT void *jl_malloc_stack(size_t *bufsz, jl_task_t *owner) JL_NOTSAFEPO
160160
if (ssize <= pool_sizes[JL_N_STACK_POOLS - 1]) {
161161
unsigned pool_id = select_pool(ssize);
162162
ssize = pool_sizes[pool_id];
163-
arraylist_t *pool = &ptls->heap.free_stacks[pool_id];
163+
small_arraylist_t *pool = &ptls->heap.free_stacks[pool_id];
164164
if (pool->len > 0) {
165-
stk = arraylist_pop(pool);
165+
stk = small_arraylist_pop(pool);
166166
}
167167
}
168168
else {
@@ -181,8 +181,8 @@ JL_DLLEXPORT void *jl_malloc_stack(size_t *bufsz, jl_task_t *owner) JL_NOTSAFEPO
181181
}
182182
*bufsz = ssize;
183183
if (owner) {
184-
arraylist_t *live_tasks = &ptls->heap.live_tasks;
185-
arraylist_push(live_tasks, owner);
184+
small_arraylist_t *live_tasks = &ptls->heap.live_tasks;
185+
mtarraylist_push(live_tasks, owner);
186186
}
187187
return stk;
188188
}
@@ -206,7 +206,7 @@ void sweep_stack_pools(void)
206206

207207
// free half of stacks that remain unused since last sweep
208208
for (int p = 0; p < JL_N_STACK_POOLS; p++) {
209-
arraylist_t *al = &ptls2->heap.free_stacks[p];
209+
small_arraylist_t *al = &ptls2->heap.free_stacks[p];
210210
size_t n_to_free;
211211
if (al->len > MIN_STACK_MAPPINGS_PER_POOL) {
212212
n_to_free = al->len / 2;
@@ -217,12 +217,12 @@ void sweep_stack_pools(void)
217217
n_to_free = 0;
218218
}
219219
for (int n = 0; n < n_to_free; n++) {
220-
void *stk = arraylist_pop(al);
220+
void *stk = small_arraylist_pop(al);
221221
free_stack(stk, pool_sizes[p]);
222222
}
223223
}
224224

225-
arraylist_t *live_tasks = &ptls2->heap.live_tasks;
225+
small_arraylist_t *live_tasks = &ptls2->heap.live_tasks;
226226
size_t n = 0;
227227
size_t ndel = 0;
228228
size_t l = live_tasks->len;
@@ -265,24 +265,52 @@ void sweep_stack_pools(void)
265265

266266
JL_DLLEXPORT jl_array_t *jl_live_tasks(void)
267267
{
268-
jl_task_t *ct = jl_current_task;
269-
jl_ptls_t ptls = ct->ptls;
270-
arraylist_t *live_tasks = &ptls->heap.live_tasks;
271-
size_t i, j, l;
272-
jl_array_t *a;
273-
do {
274-
l = live_tasks->len;
275-
a = jl_alloc_vec_any(l + 1); // may gc, changing the number of tasks
276-
} while (l + 1 < live_tasks->len);
277-
l = live_tasks->len;
278-
void **lst = live_tasks->items;
279-
j = 0;
280-
((void**)jl_array_data(a))[j++] = ptls->root_task;
281-
for (i = 0; i < l; i++) {
282-
if (((jl_task_t*)lst[i])->stkbuf != NULL)
283-
((void**)jl_array_data(a))[j++] = lst[i];
268+
size_t nthreads = jl_atomic_load_acquire(&jl_n_threads);
269+
jl_ptls_t *allstates = jl_atomic_load_relaxed(&jl_all_tls_states);
270+
size_t l = 0; // l is not reset on restart, so we keep getting more aggressive at making a big enough list every time it fails
271+
restart:
272+
for (size_t i = 0; i < nthreads; i++) {
273+
// skip GC threads since they don't have tasks
274+
if (gc_first_tid <= i && i < gc_first_tid + jl_n_gcthreads) {
275+
continue;
276+
}
277+
jl_ptls_t ptls2 = allstates[i];
278+
if (ptls2 == NULL)
279+
continue;
280+
small_arraylist_t *live_tasks = &ptls2->heap.live_tasks;
281+
size_t n = mtarraylist_length(live_tasks);
282+
l += n + (ptls2->root_task->stkbuf != NULL);
283+
}
284+
l += l / 20; // add 5% for margin of estimation error
285+
jl_array_t *a = jl_alloc_vec_any(l); // may gc, changing the number of tasks and forcing us to reload everything
286+
nthreads = jl_atomic_load_acquire(&jl_n_threads);
287+
allstates = jl_atomic_load_relaxed(&jl_all_tls_states);
288+
size_t j = 0;
289+
for (size_t i = 0; i < nthreads; i++) {
290+
// skip GC threads since they don't have tasks
291+
if (gc_first_tid <= i && i < gc_first_tid + jl_n_gcthreads) {
292+
continue;
293+
}
294+
jl_ptls_t ptls2 = allstates[i];
295+
if (ptls2 == NULL)
296+
continue;
297+
jl_task_t *t = ptls2->root_task;
298+
if (t->stkbuf != NULL) {
299+
if (j == l)
300+
goto restart;
301+
((void**)jl_array_data(a))[j++] = t;
302+
}
303+
small_arraylist_t *live_tasks = &ptls2->heap.live_tasks;
304+
size_t n = mtarraylist_length(live_tasks);
305+
for (size_t i = 0; i < n; i++) {
306+
jl_task_t *t = (jl_task_t*)mtarraylist_get(live_tasks, i);
307+
if (t->stkbuf != NULL) {
308+
if (j == l)
309+
goto restart;
310+
((void**)jl_array_data(a))[j++] = t;
311+
}
312+
}
284313
}
285-
l = jl_array_len(a);
286314
if (j < l) {
287315
JL_GC_PUSH1(&a);
288316
jl_array_del_end(a, l - j);

src/gc.c

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -954,7 +954,7 @@ JL_DLLEXPORT jl_weakref_t *jl_gc_new_weakref_th(jl_ptls_t ptls,
954954
jl_weakref_t *wr = (jl_weakref_t*)jl_gc_alloc(ptls, sizeof(void*),
955955
jl_weakref_type);
956956
wr->value = value; // NOTE: wb not needed here
957-
arraylist_push(&ptls->heap.weak_refs, wr);
957+
small_arraylist_push(&ptls->heap.weak_refs, wr);
958958
return wr;
959959
}
960960

@@ -3551,8 +3551,10 @@ void jl_init_thread_heap(jl_ptls_t ptls)
35513551
p[i].freelist = NULL;
35523552
p[i].newpages = NULL;
35533553
}
3554-
arraylist_new(&heap->weak_refs, 0);
3555-
arraylist_new(&heap->live_tasks, 0);
3554+
small_arraylist_new(&heap->weak_refs, 0);
3555+
small_arraylist_new(&heap->live_tasks, 0);
3556+
for (int i = 0; i < JL_N_STACK_POOLS; i++)
3557+
small_arraylist_new(&heap->free_stacks[i], 0);
35563558
heap->mallocarrays = NULL;
35573559
heap->mafreelist = NULL;
35583560
heap->big_objects = NULL;

src/julia.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1054,6 +1054,11 @@ JL_DLLEXPORT void *jl_gc_managed_realloc(void *d, size_t sz, size_t oldsz,
10541054
int isaligned, jl_value_t *owner);
10551055
JL_DLLEXPORT void jl_gc_safepoint(void);
10561056

1057+
void *mtarraylist_get(small_arraylist_t *_a, size_t idx) JL_NOTSAFEPOINT;
1058+
size_t mtarraylist_length(small_arraylist_t *_a) JL_NOTSAFEPOINT;
1059+
void mtarraylist_add(small_arraylist_t *_a, void *elt, size_t idx) JL_NOTSAFEPOINT;
1060+
void mtarraylist_push(small_arraylist_t *_a, void *elt) JL_NOTSAFEPOINT;
1061+
10571062
// object accessors -----------------------------------------------------------
10581063

10591064
#define jl_svec_len(t) (((jl_svec_t*)(t))->length)

src/julia_threads.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -140,10 +140,10 @@ typedef struct {
140140

141141
typedef struct {
142142
// variable for tracking weak references
143-
arraylist_t weak_refs;
143+
small_arraylist_t weak_refs;
144144
// live tasks started on this thread
145145
// that are holding onto a stack from the pool
146-
arraylist_t live_tasks;
146+
small_arraylist_t live_tasks;
147147

148148
// variables for tracking malloc'd arrays
149149
struct _mallocarray_t *mallocarrays;
@@ -170,7 +170,7 @@ typedef struct {
170170
jl_gc_pool_t norm_pools[JL_GC_N_POOLS];
171171

172172
#define JL_N_STACK_POOLS 16
173-
arraylist_t free_stacks[JL_N_STACK_POOLS];
173+
small_arraylist_t free_stacks[JL_N_STACK_POOLS];
174174
} jl_thread_heap_t;
175175

176176
typedef struct {

src/mtarraylist.c

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
// This file is a part of Julia. License is MIT: https://julialang.org/license
2+
3+
#include "julia.h"
4+
#include "julia_internal.h"
5+
#include "julia_assert.h"
6+
7+
#ifdef __cplusplus
8+
extern "C" {
9+
#endif
10+
11+
// this file provides some alternate API functions for small_arraylist (push and add)
12+
// which can be safely observed from other threads concurrently
13+
// only a single writer thread is permitted at a time (or writers must hold a mutex)
14+
// but there can be any number of observers
15+
16+
typedef struct {
17+
_Atomic(uint32_t) len;
18+
uint32_t max;
19+
_Atomic(_Atomic(void*)*) items;
20+
_Atomic(void*) _space[SMALL_AL_N_INLINE];
21+
} small_mtarraylist_t;
22+
23+
// change capacity to at least newlen
24+
static void mtarraylist_resizeto(small_mtarraylist_t *a, size_t len, size_t newlen)
25+
{
26+
size_t max = a->max;
27+
if (newlen > max) {
28+
size_t nm = max * 2;
29+
if (nm == 0)
30+
nm = 1;
31+
while (newlen > nm)
32+
nm *= 2;
33+
void **olditems = (void**)a->items;
34+
void *p = calloc_s(nm * sizeof(*a->items));
35+
memcpy(p, (void*)a->items, len * sizeof(*a->items));
36+
jl_atomic_store_release(&a->items, (_Atomic(void*)*)p);
37+
a->max = nm;
38+
if (olditems != (void*)&a->_space[0]) {
39+
jl_task_t *ct = jl_current_task;
40+
jl_gc_add_quiescent(ct->ptls, olditems, free);
41+
}
42+
}
43+
}
44+
45+
// single-threaded
46+
void mtarraylist_push(small_arraylist_t *_a, void *elt)
47+
{
48+
small_mtarraylist_t *a = (small_mtarraylist_t*)_a;
49+
size_t len = jl_atomic_load_relaxed(&a->len);
50+
mtarraylist_resizeto(a, len, len + 1);
51+
jl_atomic_store_release(&jl_atomic_load_relaxed(&a->items)[len], elt);
52+
jl_atomic_store_release(&a->len, len + 1);
53+
}
54+
55+
// single-threaded
56+
void mtarraylist_add(small_arraylist_t *_a, void *elt, size_t idx)
57+
{
58+
small_mtarraylist_t *a = (small_mtarraylist_t*)_a;
59+
size_t len = jl_atomic_load_relaxed(&a->len);
60+
mtarraylist_resizeto(a, len, idx + 1);
61+
jl_atomic_store_release(&jl_atomic_load_relaxed(&a->items)[idx], elt);
62+
if (jl_atomic_load_relaxed(&a->len) < idx + 1)
63+
jl_atomic_store_release(&a->len, idx + 1);
64+
}
65+
66+
// concurrent-safe
67+
size_t mtarraylist_length(small_arraylist_t *_a)
68+
{
69+
small_mtarraylist_t *a = (small_mtarraylist_t*)_a;
70+
return jl_atomic_load_relaxed(&a->len);
71+
}
72+
73+
// concurrent-safe
74+
void *mtarraylist_get(small_arraylist_t *_a, size_t idx)
75+
{
76+
small_mtarraylist_t *a = (small_mtarraylist_t*)_a;
77+
size_t len = jl_atomic_load_acquire(&a->len);
78+
if (idx >= len)
79+
return NULL;
80+
return jl_atomic_load_relaxed(&jl_atomic_load_relaxed(&a->items)[idx]);
81+
}

src/stackwalk.c

Lines changed: 25 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1127,7 +1127,7 @@ JL_DLLEXPORT void jlbacktrace(void) JL_NOTSAFEPOINT
11271127
}
11281128
}
11291129

1130-
// Print backtrace for specified task
1130+
// Print backtrace for specified task to jl_safe_printf stderr
11311131
JL_DLLEXPORT void jlbacktracet(jl_task_t *t) JL_NOTSAFEPOINT
11321132
{
11331133
jl_task_t *ct = jl_current_task;
@@ -1147,9 +1147,7 @@ JL_DLLEXPORT void jl_print_backtrace(void) JL_NOTSAFEPOINT
11471147

11481148
extern int gc_first_tid;
11491149

1150-
// Print backtraces for all live tasks, for all threads.
1151-
// WARNING: this is dangerous and can crash if used outside of gdb, if
1152-
// all of Julia's threads are not stopped!
1150+
// Print backtraces for all live tasks, for all threads, to jl_safe_printf stderr
11531151
JL_DLLEXPORT void jl_print_task_backtraces(int show_done) JL_NOTSAFEPOINT
11541152
{
11551153
size_t nthreads = jl_atomic_load_acquire(&jl_n_threads);
@@ -1160,24 +1158,33 @@ JL_DLLEXPORT void jl_print_task_backtraces(int show_done) JL_NOTSAFEPOINT
11601158
continue;
11611159
}
11621160
jl_ptls_t ptls2 = allstates[i];
1163-
arraylist_t *live_tasks = &ptls2->heap.live_tasks;
1164-
size_t n = live_tasks->len;
1161+
if (ptls2 == NULL)
1162+
continue;
1163+
small_arraylist_t *live_tasks = &ptls2->heap.live_tasks;
1164+
size_t n = mtarraylist_length(live_tasks);
1165+
jl_task_t *t = ptls2->root_task;
1166+
int t_state = jl_atomic_load_relaxed(&t->_state);
11651167
jl_safe_printf("==== Thread %d created %zu live tasks\n",
1166-
ptls2->tid + 1, n + 1);
1167-
jl_safe_printf(" ---- Root task (%p)\n", ptls2->root_task);
1168-
jl_safe_printf(" (sticky: %d, started: %d, state: %d, tid: %d)\n",
1169-
ptls2->root_task->sticky, ptls2->root_task->started,
1170-
jl_atomic_load_relaxed(&ptls2->root_task->_state),
1171-
jl_atomic_load_relaxed(&ptls2->root_task->tid) + 1);
1172-
jlbacktracet(ptls2->root_task);
1168+
ptls2->tid + 1, n + (t_state != JL_TASK_STATE_DONE));
1169+
if (show_done || t_state != JL_TASK_STATE_DONE) {
1170+
jl_safe_printf(" ---- Root task (%p)\n", ptls2->root_task);
1171+
jl_safe_printf(" (sticky: %d, started: %d, state: %d, tid: %d)\n",
1172+
t->sticky, t->started, t_state,
1173+
jl_atomic_load_relaxed(&t->tid) + 1);
1174+
if (t->stkbuf != NULL)
1175+
jlbacktracet(t);
1176+
else
1177+
jl_safe_printf(" no stack\n");
1178+
jl_safe_printf(" ---- End root task\n");
1179+
}
11731180

1174-
void **lst = live_tasks->items;
1175-
for (size_t j = 0; j < live_tasks->len; j++) {
1176-
jl_task_t *t = (jl_task_t *)lst[j];
1181+
for (size_t j = 0; j < n; j++) {
1182+
jl_task_t *t = (jl_task_t*)mtarraylist_get(live_tasks, j);
1183+
if (t == NULL)
1184+
continue;
11771185
int t_state = jl_atomic_load_relaxed(&t->_state);
1178-
if (!show_done && t_state == JL_TASK_STATE_DONE) {
1186+
if (!show_done && t_state == JL_TASK_STATE_DONE)
11791187
continue;
1180-
}
11811188
jl_safe_printf(" ---- Task %zu (%p)\n", j + 1, t);
11821189
// n.b. this information might not be consistent with the stack printing after it, since it could start running or change tid, etc.
11831190
jl_safe_printf(" (sticky: %d, started: %d, state: %d, tid: %d)\n",

src/threading.c

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -314,6 +314,8 @@ static uv_mutex_t tls_lock; // controls write-access to these variables:
314314
_Atomic(jl_ptls_t*) jl_all_tls_states JL_GLOBALLY_ROOTED;
315315
int jl_all_tls_states_size;
316316
static uv_cond_t cond;
317+
// concurrent reads are permitted, using the same pattern as mtarraylist
318+
// it is implemented separately because direct use of jl_all_tls_states is already widespread
317319

318320
// return calling thread's ID
319321
JL_DLLEXPORT int16_t jl_threadid(void)
@@ -382,10 +384,10 @@ jl_ptls_t jl_init_threadtls(int16_t tid)
382384
uv_cond_init(&ptls->wake_signal);
383385

384386
uv_mutex_lock(&tls_lock);
385-
jl_ptls_t *allstates = jl_atomic_load_relaxed(&jl_all_tls_states);
386387
if (tid == -1)
387388
tid = jl_atomic_load_relaxed(&jl_n_threads);
388389
ptls->tid = tid;
390+
jl_ptls_t *allstates = jl_atomic_load_relaxed(&jl_all_tls_states);
389391
if (jl_all_tls_states_size <= tid) {
390392
int i, newsize = jl_all_tls_states_size + tid + 2;
391393
jl_ptls_t *newpptls = (jl_ptls_t*)calloc(newsize, sizeof(jl_ptls_t));

0 commit comments

Comments
 (0)