Skip to content

Commit 761d0b6

Browse files
committed
Port #1146 & #1147 to deque::Injector and queue::SegQueue
1 parent 8144fbb commit 761d0b6

File tree

4 files changed

+77
-24
lines changed

4 files changed

+77
-24
lines changed

crossbeam-deque/src/deque.rs

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use std::alloc::{alloc_zeroed, handle_alloc_error, Layout};
12
use std::boxed::Box;
23
use std::cell::{Cell, UnsafeCell};
34
use std::cmp;
@@ -1203,11 +1204,6 @@ struct Slot<T> {
12031204
}
12041205

12051206
impl<T> Slot<T> {
1206-
const UNINIT: Self = Self {
1207-
task: UnsafeCell::new(MaybeUninit::uninit()),
1208-
state: AtomicUsize::new(0),
1209-
};
1210-
12111207
/// Waits until a task is written into the slot.
12121208
fn wait_write(&self) {
12131209
let backoff = Backoff::new();
@@ -1229,12 +1225,27 @@ struct Block<T> {
12291225
}
12301226

12311227
impl<T> Block<T> {
1232-
/// Creates an empty block that starts at `start_index`.
1233-
fn new() -> Block<T> {
1234-
Self {
1235-
next: AtomicPtr::new(ptr::null_mut()),
1236-
slots: [Slot::UNINIT; BLOCK_CAP],
1228+
/// Creates an empty block.
1229+
fn new() -> Box<Self> {
1230+
let layout = Layout::new::<Self>();
1231+
assert!(
1232+
layout.size() != 0,
1233+
"Block should never be zero-sized, as it has an AtomicPtr field"
1234+
);
1235+
// SAFETY: layout is not zero-sized
1236+
let ptr = unsafe { alloc_zeroed(layout) };
1237+
// Handle allocation failure
1238+
if ptr.is_null() {
1239+
handle_alloc_error(layout)
12371240
}
1241+
// SAFETY: This is safe because:
1242+
// [1] `Block::next` (AtomicPtr) may be safely zero initialized.
1243+
// [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4].
1244+
// [3] `Slot::task` (UnsafeCell) may be safely zero initialized because it
1245+
// holds a MaybeUninit.
1246+
// [4] `Slot::state` (AtomicUsize) may be safely zero initialized.
1247+
// TODO: unsafe { Box::new_zeroed().assume_init() }
1248+
unsafe { Box::from_raw(ptr.cast()) }
12381249
}
12391250

12401251
/// Waits until the next pointer is set.
@@ -1313,7 +1324,7 @@ unsafe impl<T: Send> Sync for Injector<T> {}
13131324

13141325
impl<T> Default for Injector<T> {
13151326
fn default() -> Self {
1316-
let block = Box::into_raw(Box::new(Block::<T>::new()));
1327+
let block = Box::into_raw(Block::<T>::new());
13171328
Self {
13181329
head: CachePadded::new(Position {
13191330
block: AtomicPtr::new(block),
@@ -1374,7 +1385,7 @@ impl<T> Injector<T> {
13741385
// If we're going to have to install the next block, allocate it in advance in order to
13751386
// make the wait for other threads as short as possible.
13761387
if offset + 1 == BLOCK_CAP && next_block.is_none() {
1377-
next_block = Some(Box::new(Block::<T>::new()));
1388+
next_block = Some(Block::<T>::new());
13781389
}
13791390

13801391
let new_tail = tail + (1 << SHIFT);

crossbeam-deque/tests/injector.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -373,3 +373,19 @@ fn destructors() {
373373
}
374374
}
375375
}
376+
377+
// If `Block` is created on the stack, the array of slots will multiply this `BigStruct` and
378+
// probably overflow the thread stack. It's now directly created on the heap to avoid this.
379+
#[test]
380+
fn stack_overflow() {
381+
const N: usize = 32_768;
382+
struct BigStruct {
383+
_data: [u8; N],
384+
}
385+
386+
let q = Injector::new();
387+
388+
q.push(BigStruct { _data: [0u8; N] });
389+
390+
while !matches!(q.steal(), Empty) {}
391+
}

crossbeam-queue/src/seg_queue.rs

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use alloc::alloc::{alloc_zeroed, handle_alloc_error, Layout};
12
use alloc::boxed::Box;
23
use core::cell::UnsafeCell;
34
use core::fmt;
@@ -36,11 +37,6 @@ struct Slot<T> {
3637
}
3738

3839
impl<T> Slot<T> {
39-
const UNINIT: Self = Self {
40-
value: UnsafeCell::new(MaybeUninit::uninit()),
41-
state: AtomicUsize::new(0),
42-
};
43-
4440
/// Waits until a value is written into the slot.
4541
fn wait_write(&self) {
4642
let backoff = Backoff::new();
@@ -62,12 +58,27 @@ struct Block<T> {
6258
}
6359

6460
impl<T> Block<T> {
65-
/// Creates an empty block that starts at `start_index`.
66-
fn new() -> Block<T> {
67-
Self {
68-
next: AtomicPtr::new(ptr::null_mut()),
69-
slots: [Slot::UNINIT; BLOCK_CAP],
61+
/// Creates an empty block.
62+
fn new() -> Box<Self> {
63+
let layout = Layout::new::<Self>();
64+
assert!(
65+
layout.size() != 0,
66+
"Block should never be zero-sized, as it has an AtomicPtr field"
67+
);
68+
// SAFETY: layout is not zero-sized
69+
let ptr = unsafe { alloc_zeroed(layout) };
70+
// Handle allocation failure
71+
if ptr.is_null() {
72+
handle_alloc_error(layout)
7073
}
74+
// SAFETY: This is safe because:
75+
// [1] `Block::next` (AtomicPtr) may be safely zero initialized.
76+
// [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4].
77+
// [3] `Slot::value` (UnsafeCell) may be safely zero initialized because it
78+
// holds a MaybeUninit.
79+
// [4] `Slot::state` (AtomicUsize) may be safely zero initialized.
80+
// TODO: unsafe { Box::new_zeroed().assume_init() }
81+
unsafe { Box::from_raw(ptr.cast()) }
7182
}
7283

7384
/// Waits until the next pointer is set.
@@ -209,12 +220,12 @@ impl<T> SegQueue<T> {
209220
// If we're going to have to install the next block, allocate it in advance in order to
210221
// make the wait for other threads as short as possible.
211222
if offset + 1 == BLOCK_CAP && next_block.is_none() {
212-
next_block = Some(Box::new(Block::<T>::new()));
223+
next_block = Some(Block::<T>::new());
213224
}
214225

215226
// If this is the first push operation, we need to allocate the first block.
216227
if block.is_null() {
217-
let new = Box::into_raw(Box::new(Block::<T>::new()));
228+
let new = Box::into_raw(Block::<T>::new());
218229

219230
if self
220231
.tail

crossbeam-queue/tests/seg_queue.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,3 +193,18 @@ fn into_iter_drop() {
193193
assert_eq!(i, j);
194194
}
195195
}
196+
197+
// If `Block` is created on the stack, the array of slots will multiply this `BigStruct` and
198+
// probably overflow the thread stack. It's now directly created on the heap to avoid this.
199+
#[test]
200+
fn stack_overflow() {
201+
const N: usize = 32_768;
202+
struct BigStruct {
203+
_data: [u8; N],
204+
}
205+
206+
let q = SegQueue::new();
207+
q.push(BigStruct { _data: [0u8; N] });
208+
209+
for _data in q.into_iter() {}
210+
}

0 commit comments

Comments
 (0)