Skip to content

Commit 4b0c8e7

Browse files
authored
Merge pull request #32114 from JuliaLang/jb/toomanytasks
avoid error when Task multiq is full
2 parents 8f9d356 + 169550a commit 4b0c8e7

File tree

3 files changed

+27
-15
lines changed

3 files changed

+27
-15
lines changed

base/task.jl

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -419,7 +419,12 @@ function enq_work(t::Task)
419419
push!(Workqueues[tid], t)
420420
else
421421
tid = 0
422-
ccall(:jl_enqueue_task, Cvoid, (Any,), t)
422+
if ccall(:jl_enqueue_task, Cint, (Any,), t) != 0
423+
# if multiq is full, give to a random thread (TODO fix)
424+
tid = mod(time_ns() % Int, Threads.nthreads()) + 1
425+
ccall(:jl_set_task_tid, Cvoid, (Any, Cint), t, tid-1)
426+
push!(Workqueues[tid], t)
427+
end
423428
end
424429
ccall(:jl_wakeup_thread, Cvoid, (Int16,), (tid - 1) % Int16)
425430
return t

src/partr.c

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,10 @@ typedef struct taskheap_tag {
4646

4747
/* multiqueue parameters */
4848
static const int16_t heap_d = 8;
49-
static const int heap_c = 4;
49+
static const int heap_c = 16;
5050

5151
/* size of each heap */
52-
static const int tasks_per_heap = 8192; // TODO: this should be smaller by default, but growable!
52+
static const int tasks_per_heap = 16384; // TODO: this should be smaller by default, but growable!
5353

5454
/* the multiqueue's heaps */
5555
static taskheap_t *heaps;
@@ -117,7 +117,7 @@ static inline int multiq_insert(jl_task_t *task, int16_t priority)
117117

118118
if (heaps[rn].ntasks >= tasks_per_heap) {
119119
jl_mutex_unlock_nogc(&heaps[rn].lock);
120-
jl_error("multiq insertion failed, increase #tasks per heap");
120+
// multiq insertion failed, increase #tasks per heap
121121
return -1;
122122
}
123123

@@ -287,9 +287,11 @@ void jl_threadfun(void *arg)
287287

288288

289289
// enqueue the specified task for execution
290-
JL_DLLEXPORT void jl_enqueue_task(jl_task_t *task)
290+
JL_DLLEXPORT int jl_enqueue_task(jl_task_t *task)
291291
{
292-
multiq_insert(task, task->prio);
292+
if (multiq_insert(task, task->prio) == -1)
293+
return 1;
294+
return 0;
293295
}
294296

295297

@@ -372,17 +374,22 @@ JL_DLLEXPORT void jl_wakeup_thread(int16_t tid)
372374
}
373375

374376

377+
JL_DLLEXPORT void jl_set_task_tid(jl_task_t *task, int tid)
378+
{
379+
// Try to acquire the lock on this task.
380+
// If this fails, we'll check for that error later (in jl_switchto).
381+
if (jl_atomic_load_acquire(&task->tid) != tid) {
382+
jl_atomic_compare_exchange(&task->tid, -1, tid);
383+
}
384+
}
385+
375386
// get the next runnable task from the multiq
376387
static jl_task_t *get_next_task(jl_value_t *getsticky)
377388
{
378389
jl_task_t *task = (jl_task_t*)jl_apply(&getsticky, 1);
379390
if (jl_typeis(task, jl_task_type)) {
380391
int self = jl_get_ptls_states()->tid;
381-
// try to acquire the lock on this task now
382-
// we'll check this error later (in yieldto)
383-
if (jl_atomic_load_acquire(&task->tid) != self) {
384-
jl_atomic_compare_exchange(&task->tid, -1, self);
385-
}
392+
jl_set_task_tid(task, self);
386393
return task;
387394
}
388395
#ifdef JULIA_ENABLE_THREADING

src/task.c

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -151,14 +151,14 @@ void JL_NORETURN jl_finish_task(jl_task_t *t, jl_value_t *resultval JL_MAYBE_UNR
151151
{
152152
jl_ptls_t ptls = jl_get_ptls_states();
153153
JL_SIGATOMIC_BEGIN();
154+
t->result = resultval;
155+
jl_gc_wb(t, t->result);
154156
if (t->exception != jl_nothing)
155-
t->state = failed_sym;
157+
jl_atomic_store_release(&t->state, failed_sym);
156158
else
157-
t->state = done_sym;
159+
jl_atomic_store_release(&t->state, done_sym);
158160
if (t->copy_stack) // early free of stkbuf
159161
t->stkbuf = NULL;
160-
t->result = resultval;
161-
jl_gc_wb(t, t->result);
162162
// ensure that state is cleared
163163
ptls->in_finalizer = 0;
164164
ptls->in_pure_callback = 0;

0 commit comments

Comments
 (0)