Skip to content

Commit 09fe663

Browse files
authored
Add ShardedReadIndicator (#3)
1 parent 8c85c4f commit 09fe663

File tree

4 files changed

+58
-18
lines changed

4 files changed

+58
-18
lines changed

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ jobs:
1818
nthreads:
1919
- '1'
2020
- '2'
21-
- '8'
21+
- '4'
2222
fail-fast: false
2323
name: Test Julia ${{ matrix.julia-version }} nthreads=${{ matrix.nthreads }}
2424
steps:

src/LeftRight.jl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,5 +18,6 @@ include("core.jl")
1818
end # module Internal
1919

2020
const Guard = Internal.Guard
21+
# const SimpleGuard = Internal.SimpleGuard
2122

2223
end # baremodule LeftRight

src/core.jl

Lines changed: 45 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
###
2+
### ReadIndicator (single counter)
3+
###
4+
15
# TODO: maybe use "NO_WAITERS_MASK" so that depart can use `iszero`
26
const HAS_WAITER_MASK = ~(typemax(UInt) >> 1)
37

@@ -102,37 +106,65 @@ end
102106
# used for establishing the happens-before edge required for non-atomic store/load of
103107
# `ind.waiter` field.
104108

109+
###
110+
### ShardedReadIndicator
111+
###
112+
113+
struct ShardedReadIndicator
114+
indicators::Vector{ReadIndicator}
115+
end
116+
117+
ShardedReadIndicator() =
118+
ShardedReadIndicator([ReadIndicator() for _ in 1:Threads.nthreads()])
119+
120+
arrive!(ind::ShardedReadIndicator) = arrive!(ind.indicators[Threads.threadid()])
121+
122+
function wait_empty(ind::ShardedReadIndicator; options...)
123+
for sub in ind.indicators
124+
wait_empty(sub; options...)
125+
end
126+
end
127+
128+
###
129+
### LeftRight.Guard
130+
###
131+
105132
@enum LeftOrRight LEFT_READABLE RIGHT_READABLE
106133

107134
flip(x::LeftOrRight) = x == LEFT_READABLE ? RIGHT_READABLE : LEFT_READABLE
108135

109136
abstract type AbstractReadWriteGuard end # TODO: Move it to ConcurrentUtils
110137

111-
mutable struct Guard{Data} <: AbstractReadWriteGuard
138+
mutable struct GenericGuard{Data,Indicator} <: AbstractReadWriteGuard
112139
@const left::Data
113140
@const right::Data
114141
@atomic versionindex::Int
115142
@atomic leftright::LeftOrRight
116-
@const indicators::NTuple{2,ReadIndicator}
143+
@const indicators::NTuple{2,Indicator}
117144
@const lock::ReentrantLock
118145

119-
global function _Guard(left, right)
146+
global function _Guard(left, right, Indicator)
120147
right = right::typeof(left)
121-
indicators = (ReadIndicator(), ReadIndicator())
148+
indicators = (Indicator(), Indicator())
122149
lock = ReentrantLock()
123-
return new{typeof(left)}(left, right, 1, LEFT_READABLE, indicators, lock)
150+
D = typeof(left)
151+
I = typeof(indicators[1])
152+
return new{D,I}(left, right, 1, LEFT_READABLE, indicators, lock)
124153
end
125154
end
126155

127-
function Guard{Data}(f = Data) where {Data}
156+
const Guard{Data} = GenericGuard{Data,ShardedReadIndicator}
157+
const SimpleGuard{Data} = GenericGuard{Data,ReadIndicator}
158+
159+
function GenericGuard{Data,Indicator}(f = Data) where {Data,Indicator}
128160
left = f()::Data
129161
right = f()::Data
130-
return _Guard(left, right)::Guard{Data}
162+
return _Guard(left, right, Indicator)::GenericGuard{Data}
131163
end
132164

133-
Guard(f) = _Guard(f(), f())
165+
GenericGuard{<:Any,Indicator}(f) where {Indicator} = _Guard(f(), f(), Indicator)
134166

135-
function acquire_read(g::Guard)
167+
function acquire_read(g::GenericGuard)
136168
versionindex = @atomic :monotonic g.versionindex # [^monotonic_versionindex]
137169
token = arrive!(g.indicators[versionindex])
138170

@@ -143,12 +175,12 @@ end
143175
# [^monotonic_versionindex]: Since `g.versionindex` is just a "performance hint," it can be
144176
# loaded using `:monotonic`.
145177

146-
function release_read(::Guard, token)
178+
function release_read(::GenericGuard, token)
147179
depart!(token)
148180
return
149181
end
150182

151-
function LeftRight.guarding_read(f, g::Guard)
183+
function LeftRight.guarding_read(f, g::GenericGuard)
152184
token, data = acquire_read(g)
153185
try
154186
return f(data)
@@ -157,7 +189,7 @@ function LeftRight.guarding_read(f, g::Guard)
157189
end
158190
end
159191

160-
function LeftRight.guarding(f!, g::Guard)
192+
function LeftRight.guarding(f!, g::GenericGuard)
161193
lock(g.lock)
162194
try
163195
# No need to use `:acquire` since the lock already has ordered the access:
@@ -179,7 +211,7 @@ end
179211
# [^seq_cst_state_leftright]: Some accesses to `ind.state` and `g.leftright` must have
180212
# sequential consistent ordering. See discussion below for more details.
181213

182-
function toggle_and_wait(g::Guard)
214+
function toggle_and_wait(g::GenericGuard)
183215
prev = @atomic :monotonic g.versionindex # [^monotonic_versionindex]
184216
next = mod1(prev + 1, 2)
185217
wait_empty(g.indicators[next]) # [^w1]

test/LeftRightTests/src/test_core.jl

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
module TestCore
22

33
using LeftRight
4+
using LeftRight.Internal: SimpleGuard
45
using Test
56

6-
function test_simple()
7-
g = LeftRight.Guard{Dict{Symbol,Int}}()
7+
function check_serial(Guard)
8+
g = Guard{Dict{Symbol,Int}}()
89
LeftRight.guarding(g) do dict
910
dict[:a] = 111
1011
end
@@ -25,8 +26,11 @@ function test_simple()
2526
end == 333
2627
end
2728

28-
function test_concurrency(; ntasks = Threads.nthreads(), ntries = 100_000)
29-
g = LeftRight.Guard() do
29+
test_serial() = check_serial(LeftRight.Guard)
30+
test_serial_simple() = check_serial(SimpleGuard)
31+
32+
function check_concurrency(Guard; ntasks = Threads.nthreads(), ntries = 10_000_000)
33+
g = Guard() do
3034
Dict(:a => 0, :b => 2)
3135
end
3236

@@ -59,4 +63,7 @@ function test_concurrency(; ntasks = Threads.nthreads(), ntries = 100_000)
5963
@test ok[]
6064
end
6165

66+
test_concurrency(; options...) = check_concurrency(LeftRight.Guard; options...)
67+
test_concurrency_simple(; options...) = check_concurrency(SimpleGuard; options...)
68+
6269
end # module

0 commit comments

Comments
 (0)