Implement MultiThreadedCache.jl #1
Merged
Commits (19, all authored by NHDaly; changes from all commits are shown below):

- 27da88d  Initial commit (via PkgTemplates)
- 4c66820  Add CI.yml - test with singlethreaded and multithreaded
- ca1710a  Finish implementing MultiThreadedCache, and add stress test.
- 14d7450  Free the Futures once they're no longer needed, to prevent leaks
- 790d468  Fix CI.yml syntax for environment variables
- 1da361e  Add julia "1" to build config
- 9a3209b  Add MacOS and Windows to build config
- 7b593d6  Add constructor that provides pre-computed values to the base_cache
- de48129  Merge pull request #2 from NHDaly/nhd-constructors
- 5696600  Gracefully handle exceptions thrown during `get!()` functions
- f503eaa  Add benchmark test measuring parallel scaling.
- 18ba084  Merge pull request #3 from NHDaly/nhd-exception-handling
- 0faa198  Fix lazy construction of Dicts, per guidance from Julia Base
- 11e1289  Merge pull request #4 from NHDaly/nhd-benchmark-parallel-scaling
- 55d3727  Merge pull request #5 from NHDaly/nhd-fix-lazy-construction
- 2c950eb  Fix thread-safety violation by accidentally not locking the base cach…
- 38e664d  Fix concurrency-safety by not holding the get!() on the thread-local …
- 76a48d5  Use per-thread locks to make invariant to Task migration.
- 24cb6f2  Merge pull request #6 from NHDaly/nhd-concurrency-safety-fixes
New file: CI.yml (GitHub Actions CI workflow)
@@ -0,0 +1,49 @@
name: CI
on:
  pull_request:
  push:
    branches:
      - master
    tags: '*'
jobs:
  test:
    name: Julia ${{ matrix.version }} - ${{ matrix.os }} - ${{ matrix.arch }} - ${{ github.event_name }}
    runs-on: ${{ matrix.os }}
    strategy:
      fail-fast: false
      matrix:
        version:
          - '1.3'
          - '1'
          - 'nightly'
        os:
          - ubuntu-latest
          - macOS-latest
          - windows-latest
        arch:
          - x64
        env:
          - JULIA_NUM_THREADS=1
          - JULIA_NUM_THREADS=6
    steps:
      - uses: actions/checkout@v2
      - uses: julia-actions/setup-julia@v1
        with:
          version: ${{ matrix.version }}
          arch: ${{ matrix.arch }}
      - uses: actions/cache@v1
        env:
          cache-name: cache-artifacts
        with:
          path: ~/.julia/artifacts
          key: ${{ runner.os }}-test-${{ env.cache-name }}-${{ hashFiles('**/Project.toml') }}
          restore-keys: |
            ${{ runner.os }}-test-${{ env.cache-name }}-
            ${{ runner.os }}-test-
            ${{ runner.os }}-
      - uses: julia-actions/julia-buildpkg@v1
      - uses: julia-actions/julia-runtest@v1
      - uses: julia-actions/julia-processcoverage@v1
      - uses: codecov/codecov-action@v1
        with:
          file: lcov.info
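The matrix above runs the test suite under both `JULIA_NUM_THREADS=1` and `JULIA_NUM_THREADS=6`, so the single-threaded and multi-threaded code paths are both exercised. A rough sketch of mirroring that locally (not part of this PR; it assumes Julia 1.6+ for `addenv` and that it is run from the package directory):

```julia
# Sketch only: launch a fresh Julia process per thread count, since
# JULIA_NUM_THREADS must be set before the Julia process starts.
for nthreads in (1, 6)
    cmd = `$(Base.julia_cmd()) --project=. -e 'using Pkg; Pkg.test()'`
    # addenv merges the variable into the inherited environment.
    run(addenv(cmd, "JULIA_NUM_THREADS" => string(nthreads)))
end
```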
New file: GitHub Actions workflow "CompatHelper"
@@ -0,0 +1,16 @@
name: CompatHelper
on:
  schedule:
    - cron: 0 0 * * *
  workflow_dispatch:
jobs:
  CompatHelper:
    runs-on: ubuntu-latest
    steps:
      - name: Pkg.add("CompatHelper")
        run: julia -e 'using Pkg; Pkg.add("CompatHelper")'
      - name: CompatHelper.main()
        env:
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
          COMPATHELPER_PRIV: ${{ secrets.DOCUMENTER_KEY }}
        run: julia -e 'using CompatHelper; CompatHelper.main()'
New file: GitHub Actions workflow "TagBot"
@@ -0,0 +1,15 @@
name: TagBot
on:
  issue_comment:
    types:
      - created
  workflow_dispatch:
jobs:
  TagBot:
    if: github.event_name == 'workflow_dispatch' || github.actor == 'JuliaTagBot'
    runs-on: ubuntu-latest
    steps:
      - uses: JuliaRegistries/TagBot@v1
        with:
          token: ${{ secrets.GITHUB_TOKEN }}
          ssh: ${{ secrets.DOCUMENTER_KEY }}
New file: .gitignore
@@ -0,0 +1 @@
/Manifest.toml
New file: LICENSE (MIT)
@@ -0,0 +1,21 @@
MIT License

Copyright (c) 2022 Nathan Daly <[email protected]> and contributors

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
New file: Project.toml
@@ -0,0 +1,13 @@
name = "MultiThreadedCaches"
uuid = "18056a9e-ed0c-4ef3-9ad7-376a7fb08032"
authors = ["Nathan Daly <[email protected]> and contributors"]
version = "0.1.0"

[compat]
julia = "1.3"

[extras]
Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"

[targets]
test = ["Test"]
New file: README.md
@@ -0,0 +1 @@
# MultiThreadedCaches
New file: src/MultiThreadedCaches.jl (package source)
@@ -0,0 +1,232 @@
module MultiThreadedCaches

using Base: @lock

export MultiThreadedCache, init_cache!

"""
    MultiThreadedCache{K,V}()
    MultiThreadedCache{K,V}([initial_kv_values])

`MultiThreadedCache{K,V}()` constructs a fast, thread-safe cache.

This cache stores k=>v pairs that cache a deterministic computation. The only API into the
cache is `get!()`: you can look up a key, and if it is not available, you can produce a
value which will be added to the cache.

Accesses to the cache will look first in the per-thread cache, and then fall back to the
shared thread-safe cache. Concurrent misses to the same key in the shared cache will
coordinate, so that only one Task will perform the computation for that value, and the
other Task(s) will block.

# Examples:
```julia
julia> cache = MultiThreadedCache{Int, Int}(Dict(1=>2, 2=>3))
MultiThreadedCache{Int64, Int64}(Dict(2 => 3, 1 => 2))

julia> init_cache!(cache)
MultiThreadedCache{Int64, Int64}(Dict(2 => 3, 1 => 2))

julia> get!(cache, 2) do
           2+1
       end
3

julia> get!(cache, 5) do
           5+1
       end
6

julia> get!(cache, 5) do
           5+10
       end
6
```
"""
struct MultiThreadedCache{K,V}
    thread_caches::Vector{Dict{K,V}}
    thread_locks::Vector{ReentrantLock}

    base_cache::Dict{K,V}  # Guarded by: base_cache_lock
    base_cache_lock::ReentrantLock
    base_cache_futures::Dict{K,Channel{V}}  # Guarded by: base_cache_lock

    function MultiThreadedCache{K,V}() where {K,V}
        base_cache = Dict{K,V}()

        return MultiThreadedCache{K,V}(base_cache)
    end

    function MultiThreadedCache{K,V}(base_cache::Dict) where {K,V}
        thread_caches = Dict{K,V}[]
        thread_locks = ReentrantLock[]

        base_cache_lock = ReentrantLock()
        base_cache_futures = Dict{K,Channel{V}}()

        return new(thread_caches, thread_locks, base_cache, base_cache_lock, base_cache_futures)
    end
end

"""
    init_cache!(cache::MultiThreadedCache{K,V})

This function must be called inside a Module's `__init__()` method, to ensure that the
number of threads is set *at runtime, not precompilation time*.

!!! note
    NOTE: This function is *not thread safe*; it must not be called concurrently with any other
    code that touches the cache. This should only be called once, during module initialization.
"""
function init_cache!(cache::MultiThreadedCache{K,V}) where {K,V}
    # Statically resize the vector, but wait to lazily create the dictionaries when
    # requested, so that the object will be allocated on the thread that will consume it.
    # (This follows the guidance from Julia Base.)
    resize!(cache.thread_caches, Threads.nthreads())
    resize!(cache.thread_locks, Threads.nthreads())
    return cache
end

function Base.show(io::IO, cache::MultiThreadedCache{K,V}) where {K,V}
    print(io, "$(MultiThreadedCache{K,V})(", cache.base_cache, ")")
end

# Based upon the thread-safe Global RNG implementation in the Random stdlib:
# https://github.com/JuliaLang/julia/blob/e4fcdf5b04fd9751ce48b0afc700330475b42443/stdlib/Random/src/RNGs.jl#L369-L385
# Get or lazily construct the per-thread cache when first requested.
function _thread_cache(mtcache::MultiThreadedCache, tid)
    length(mtcache.thread_caches) >= Threads.nthreads() || _thread_cache_length_assert()
    if @inbounds isassigned(mtcache.thread_caches, tid)
        @inbounds cache = mtcache.thread_caches[tid]
    else
        # We copy the base cache to all the thread caches, so that any precomputed values
        # can be shared without having to wait for a cache miss.
        cache = copy(mtcache.base_cache)
        @inbounds mtcache.thread_caches[tid] = cache
    end
    return cache
end
@noinline _thread_cache_length_assert() = @assert false "** Must call `init_cache!(cache)` in your Module's __init__()! - length(cache.thread_caches) < Threads.nthreads()"
function _thread_lock(cache::MultiThreadedCache, tid)
    length(cache.thread_locks) >= Threads.nthreads() || _thread_cache_length_assert()
    if @inbounds isassigned(cache.thread_locks, tid)
        @inbounds lock = cache.thread_locks[tid]
    else
        lock = eltype(cache.thread_locks)()
        @inbounds cache.thread_locks[tid] = lock
    end
    return lock
end
@noinline _thread_cache_length_assert() = @assert false "** Must call `init_cache!(cache)` in your Module's __init__()! - length(cache.thread_caches) < Threads.nthreads()"


const CACHE_MISS = :__MultiThreadedCaches_key_not_found__

function Base.get!(func::Base.Callable, cache::MultiThreadedCache{K,V}, key) where {K,V}
    # If the thread-local cache has the value, we can return immediately.
    # We store tcache in a local variable, so that even if the Task migrates Threads, we are
    # still operating on the same initial cache object.
    tid = Threads.threadid()
    tcache = _thread_cache(cache, tid)
    tlock = _thread_lock(cache, tid)
    # We have to lock during access to the thread-local dict, because it's possible that the
    # Task may migrate to another thread by the end, and we really might be mutating the
    # dict in parallel. But most of the time this lock should have 0 contention, since it's
    # only held during get() and set!().
    Base.@lock tlock begin
        thread_local_cached_value_or_miss = get(tcache, key, CACHE_MISS)
        if thread_local_cached_value_or_miss !== CACHE_MISS
            return thread_local_cached_value_or_miss::V
        end
    end
    # If not, we need to check the base cache.
    # When we're done, call this function to set the result.
    @inline function _store_result!(v::V; test_haskey::Bool)
        # Set the value into the thread-local cache for the supplied key.
        # Note that we must perform two separate get() and setindex!() calls, for
        # concurrency-safety, in case the dict has been mutated by another task in between.
        # TODO: For 100% concurrency-safety, we maybe want to lock around the get() above
        # and the setindex!() here.. it's probably fine without it, but needs considering.
        # Currently this is relying on get() and setindex!() having no yields.
        Base.@lock tlock begin
            if test_haskey
                if !haskey(tcache, key)
                    setindex!(tcache, v, key)
                end
            else
                setindex!(tcache, v, key)
            end
        end
    end

    # Even though we're using Thread-local caches, we still need to lock during
    # construction to prevent multiple tasks redundantly constructing the same object,
    # and potential thread safety violations due to Tasks migrating threads.
    # NOTE that we only grab the lock if the key doesn't exist, so the mutex contention
    # is not on the critical path for most accesses. :)
    is_first_task = false
    local future  # used only if the base_cache doesn't have the key
    # We lock the mutex, but for only a short, *constant time* duration, to grab the
    # future for this key, or to create the future if it doesn't exist.
    @lock cache.base_cache_lock begin
        value_or_miss = get(cache.base_cache, key, CACHE_MISS)
        if value_or_miss !== CACHE_MISS
            return value_or_miss::V
        end
        future = get!(cache.base_cache_futures, key) do
            is_first_task = true
            Channel{V}(1)
        end
    end
    if is_first_task
        v = try
            func()
        catch e
            # In the case of an exception, we abort the current computation of this
            # key/value pair, and throw the exception onto the future, so that all
            # pending tasks will see the exception as well.
            #
            # NOTE: we could also cache the exception and throw it from now on, but this
            # would make interactive development difficult, since once you fix the
            # error, you'd have to clear out your cache. So instead, we just rethrow the
            # exception and don't cache anything, so that you can fix the exception and
            # continue on. (This means that throwing exceptions remains expensive.)

            # close(::Channel, ::Exception) requires an Exception object, so if the user
            # threw a non-Exception, we convert it to one, here.
            e isa Exception || (e = ErrorException("Non-exception object thrown during get!(): $e"))
            close(future, e)
            # As below, the future isn't needed once this key's computation is abandoned.
            @lock cache.base_cache_lock begin
                delete!(cache.base_cache_futures, key)
            end
            rethrow(e)
        end
        # Finally, lock again for a *constant time* to insert the computed value into
        # the shared cache, so that we can free the Channel and future gets can read
        # from the shared base_cache.
        @lock cache.base_cache_lock begin
            cache.base_cache[key] = v
            # We no longer need the Future, since all future requests will see the key
            # in the base_cache. (Other Tasks may still hold a reference, but it will
            # be GC'd once they have all completed.)
            delete!(cache.base_cache_futures, key)
        end
        # Store the result in this thread-local dictionary.
        _store_result!(v, test_haskey=false)
        # Return v to any other Tasks that were blocking on this key.
        put!(future, v)
        return v
    else
        # Block on the future until the first task that asked for this key finishes
        # computing a value for it.
        v = fetch(future)
        # Store the result in our original thread-local cache (if another Task hasn't
        # set it already).
        _store_result!(v, test_haskey=true)
        return v
    end
end


end # module
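For reference, here is a minimal usage sketch of the API introduced by this PR, based on the docstrings above. It is not part of the diff; the module name `MyPkg`, the `String => Int` key/value types, and the `expensive_length` workload are hypothetical stand-ins.

```julia
# Hypothetical downstream package, for illustration only (not part of this PR).
module MyPkg

using MultiThreadedCaches

# One cache per process, constructed at top level.
const CACHE = MultiThreadedCache{String, Int}()

function __init__()
    # Per the init_cache! docstring: size the per-thread slots at *runtime*,
    # so they match the actual thread count, not the precompilation thread count.
    init_cache!(CACHE)
end

expensive_length(key::String) = (sleep(0.1); length(key))  # stand-in for a real computation

function cached_length(key::String)
    # Concurrent misses on the same key coordinate: only one Task runs the do-block,
    # and any other Tasks block until the computed value is published.
    return get!(CACHE, key) do
        expensive_length(key)
    end
end

end # module
```

Under this sketch, after the first lookup of a given key, subsequent lookups on the same thread are served from that thread's local `Dict` without touching the shared lock.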