Merged

19 commits
27da88d
Initial commit (via PkgTemplates)
NHDaly Feb 13, 2022
4c66820
Add CI.yml - test with singlethreaded and multithreaded
NHDaly Feb 14, 2022
ca1710a
Finish implementing MultiThreadedCache, and add stress test.
NHDaly Feb 14, 2022
14d7450
Free the Futures once they're no longer needed, to prevent leaks
NHDaly Feb 14, 2022
790d468
Fix CI.yml syntax for environment variables
NHDaly Feb 14, 2022
1da361e
Add julia "1" to build config
NHDaly Feb 14, 2022
9a3209b
Add MacOS and Windows to build config
NHDaly Feb 14, 2022
7b593d6
Add constructor that provides pre-computed values to the base_cache
NHDaly Feb 14, 2022
de48129
Merge pull request #2 from NHDaly/nhd-constructors
NHDaly Feb 14, 2022
5696600
Gracefully handle exceptions thrown during `get!()` functions
NHDaly Feb 14, 2022
f503eaa
Add benchmark test measuring parallel scaling.
NHDaly Feb 14, 2022
18ba084
Merge pull request #3 from NHDaly/nhd-exception-handling
NHDaly Feb 14, 2022
0faa198
Fix lazy construction of Dicts, per guidance from Julia Base
NHDaly Feb 15, 2022
11e1289
Merge pull request #4 from NHDaly/nhd-benchmark-parallel-scaling
NHDaly Feb 15, 2022
55d3727
Merge pull request #5 from NHDaly/nhd-fix-lazy-construction
NHDaly Feb 15, 2022
2c950eb
Fix thread-safety violation by accidentally not locking the base cach…
NHDaly Feb 15, 2022
38e664d
Fix concurrency-safety by not holding the get!() on the thread-local …
NHDaly Feb 15, 2022
76a48d5
Use per-thread locks to make invariant to Task migration.
NHDaly Feb 15, 2022
24cb6f2
Merge pull request #6 from NHDaly/nhd-concurrency-safety-fixes
NHDaly Feb 17, 2022
49 changes: 49 additions & 0 deletions .github/workflows/CI.yml
@@ -0,0 +1,49 @@
name: CI
on:
  pull_request:
  push:
    branches:
      - master
    tags: '*'
jobs:
  test:
    name: Julia ${{ matrix.version }} - ${{ matrix.os }} - ${{ matrix.arch }} - ${{ github.event_name }}
    runs-on: ${{ matrix.os }}
    strategy:
      fail-fast: false
      matrix:
        version:
          - '1.3'
          - '1'
          - 'nightly'
        os:
          - ubuntu-latest
          - macOS-latest
          - windows-latest
        arch:
          - x64
        env:
          - JULIA_NUM_THREADS=1
          - JULIA_NUM_THREADS=6
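        # Testing under both a single thread and multiple threads exercises the
        # thread-local fast path as well as the shared-cache coordination path.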
    steps:
      - uses: actions/checkout@v2
      - uses: julia-actions/setup-julia@v1
        with:
          version: ${{ matrix.version }}
          arch: ${{ matrix.arch }}
      - uses: actions/cache@v1
        env:
          cache-name: cache-artifacts
        with:
          path: ~/.julia/artifacts
          key: ${{ runner.os }}-test-${{ env.cache-name }}-${{ hashFiles('**/Project.toml') }}
          restore-keys: |
            ${{ runner.os }}-test-${{ env.cache-name }}-
            ${{ runner.os }}-test-
            ${{ runner.os }}-
      - uses: julia-actions/julia-buildpkg@v1
      - uses: julia-actions/julia-runtest@v1
      - uses: julia-actions/julia-processcoverage@v1
      - uses: codecov/codecov-action@v1
        with:
          file: lcov.info
16 changes: 16 additions & 0 deletions .github/workflows/CompatHelper.yml
@@ -0,0 +1,16 @@
name: CompatHelper
on:
  schedule:
    - cron: 0 0 * * *
  workflow_dispatch:
jobs:
  CompatHelper:
    runs-on: ubuntu-latest
    steps:
      - name: Pkg.add("CompatHelper")
        run: julia -e 'using Pkg; Pkg.add("CompatHelper")'
      - name: CompatHelper.main()
        env:
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
          COMPATHELPER_PRIV: ${{ secrets.DOCUMENTER_KEY }}
        run: julia -e 'using CompatHelper; CompatHelper.main()'
15 changes: 15 additions & 0 deletions .github/workflows/TagBot.yml
@@ -0,0 +1,15 @@
name: TagBot
on:
  issue_comment:
    types:
      - created
  workflow_dispatch:
jobs:
  TagBot:
    if: github.event_name == 'workflow_dispatch' || github.actor == 'JuliaTagBot'
    runs-on: ubuntu-latest
    steps:
      - uses: JuliaRegistries/TagBot@v1
        with:
          token: ${{ secrets.GITHUB_TOKEN }}
          ssh: ${{ secrets.DOCUMENTER_KEY }}
1 change: 1 addition & 0 deletions .gitignore
@@ -0,0 +1 @@
/Manifest.toml
21 changes: 21 additions & 0 deletions LICENSE
@@ -0,0 +1,21 @@
MIT License

Copyright (c) 2022 Nathan Daly <[email protected]> and contributors

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
13 changes: 13 additions & 0 deletions Project.toml
@@ -0,0 +1,13 @@
name = "MultiThreadedCaches"
uuid = "18056a9e-ed0c-4ef3-9ad7-376a7fb08032"
authors = ["Nathan Daly <[email protected]> and contributors"]
version = "0.1.0"

[compat]
julia = "1.3"

[extras]
Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"

[targets]
test = ["Test"]
1 change: 1 addition & 0 deletions README.md
@@ -0,0 +1 @@
# MultiThreadedCaches
232 changes: 232 additions & 0 deletions src/MultiThreadedCaches.jl
@@ -0,0 +1,232 @@
module MultiThreadedCaches

using Base: @lock

export MultiThreadedCache, init_cache!

"""
    MultiThreadedCache{K,V}()
    MultiThreadedCache{K,V}([initial_kv_values])

`MultiThreadedCache{K,V}()` constructs a fast, thread-safe cache.

This cache stores key => value pairs produced by a deterministic computation. The only
API into the cache is `get!()`: you look up a key, and if it is not present, you supply
a function that produces the value, which is then added to the cache.

Accesses to the cache will look first in the per-thread cache, and then fall back to the
shared thread-safe cache. Concurrent misses to the same key in the shared cache will
coordinate, so that only one Task will perform the computation for that value, and the
other Task(s) will block.

# Examples:
```julia
julia> cache = MultiThreadedCache{Int, Int}(Dict(1=>2, 2=>3))
MultiThreadedCache{Int64, Int64}(Dict(2 => 3, 1 => 2))

julia> init_cache!(cache)
MultiThreadedCache{Int64, Int64}(Dict(2 => 3, 1 => 2))

julia> get!(cache, 2) do
           2+1
       end
3

julia> get!(cache, 5) do
           5+1
       end
6

julia> get!(cache, 5) do
           5+10
       end
6
```
"""
struct MultiThreadedCache{K,V}
    thread_caches::Vector{Dict{K,V}}
    thread_locks::Vector{ReentrantLock}

    base_cache::Dict{K,V}  # Guarded by: base_cache_lock
    base_cache_lock::ReentrantLock
    base_cache_futures::Dict{K,Channel{V}}  # Guarded by: base_cache_lock

    function MultiThreadedCache{K,V}() where {K,V}
        base_cache = Dict{K,V}()

        return MultiThreadedCache{K,V}(base_cache)
    end

    function MultiThreadedCache{K,V}(base_cache::Dict) where {K,V}
        thread_caches = Dict{K,V}[]
        thread_locks = ReentrantLock[]

        base_cache_lock = ReentrantLock()
        base_cache_futures = Dict{K,Channel{V}}()

        return new(thread_caches, thread_locks, base_cache, base_cache_lock,
                   base_cache_futures)
    end
end
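# Design note: reads hit the per-thread Dicts without touching the shared lock; only
# misses reach `base_cache`, whose lock also guards the in-flight futures table.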

"""
init_cache!(cache::MultiThreadedCache{K,V})

This function must be called inside a Module's `__init__()` method, to ensure that the
number of threads are set *at runtime, not precompilation time*.

!!! note
NOTE: This function is *not thread safe*, it must not be called concurrently with any other
code that touches the cache. This should only be called once, during module initialization.
"""
function init_cache!(cache::MultiThreadedCache{K,V}) where {K,V}
    # Statically resize the vectors, but wait to lazily create the dictionaries when
    # first requested, so that each object is allocated on the thread that will consume
    # it. (This follows the guidance from Julia Base.)
    resize!(cache.thread_caches, Threads.nthreads())
    resize!(cache.thread_locks, Threads.nthreads())
    return cache
end
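# A minimal usage sketch (the module name `MyApp` and the function `expensive` are
# hypothetical, not part of this package): define the cache as a module-level const,
# and size it from `__init__` so the per-thread vectors match the runtime thread count.
#
#     module MyApp
#         using MultiThreadedCaches
#         const CACHE = MultiThreadedCache{String, Int}()
#         __init__() = init_cache!(CACHE)
#         expensive(key::String) = get!(() -> length(key), CACHE, key)
#     end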

function Base.show(io::IO, cache::MultiThreadedCache{K,V}) where {K,V}
    print(io, "$(MultiThreadedCache{K,V})(", cache.base_cache, ")")
end

# Based upon the thread-safe Global RNG implementation in the Random stdlib:
# https://github.com/JuliaLang/julia/blob/e4fcdf5b04fd9751ce48b0afc700330475b42443/stdlib/Random/src/RNGs.jl#L369-L385
# Get or lazily construct the per-thread cache when first requested.
function _thread_cache(mtcache::MultiThreadedCache, tid)
    length(mtcache.thread_caches) >= Threads.nthreads() || _thread_cache_length_assert()
    if @inbounds isassigned(mtcache.thread_caches, tid)
        @inbounds cache = mtcache.thread_caches[tid]
    else
        # We copy the base cache into every thread cache, so that any precomputed values
        # can be shared without having to wait for a cache miss.
        cache = copy(mtcache.base_cache)
        @inbounds mtcache.thread_caches[tid] = cache
    end
    return cache
end
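# Note: `resize!` in `init_cache!` grows the vectors with #undef slots, which is why the
# lazy-construction branches here test `isassigned` rather than comparing to a sentinel.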
@noinline _thread_cache_length_assert() = @assert false "** Must call `init_cache!(cache)` in your Module's __init__()! - length(cache.thread_caches) < Threads.nthreads() "
function _thread_lock(cache::MultiThreadedCache, tid)
    length(cache.thread_locks) >= Threads.nthreads() || _thread_cache_length_assert()
    if @inbounds isassigned(cache.thread_locks, tid)
        @inbounds lock = cache.thread_locks[tid]
    else
        lock = eltype(cache.thread_locks)()
        @inbounds cache.thread_locks[tid] = lock
    end
    return lock
end


const CACHE_MISS = :__MultiThreadedCaches_key_not_found__
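# The sentinel lets `get(dict, key, CACHE_MISS)` distinguish "key absent" from a
# legitimately stored value in a single lookup, without a separate haskey() call.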

function Base.get!(func::Base.Callable, cache::MultiThreadedCache{K,V}, key) where {K,V}
    # If the thread-local cache has the value, we can return immediately.
    # We store tcache in a local variable, so that even if the Task migrates threads, we
    # are still operating on the same initial cache object.
    tid = Threads.threadid()
    tcache = _thread_cache(cache, tid)
    tlock = _thread_lock(cache, tid)
    # We have to lock during access to the thread-local dict, because the Task may
    # migrate to another thread before we are done, in which case we really could be
    # mutating the dict in parallel. Most of the time this lock should have zero
    # contention, though, since it is only held during get() and setindex!().
    Base.@lock tlock begin
        thread_local_cached_value_or_miss = get(tcache, key, CACHE_MISS)
        if thread_local_cached_value_or_miss !== CACHE_MISS
            return thread_local_cached_value_or_miss::V
        end
    end
    # If not, we need to check the base cache.
    # When we're done, call this function to store the result:
    @inline function _store_result!(v::V; test_haskey::Bool)
        # Set the value into the thread-local cache for the supplied key.
        # Note that we perform two separate get() and setindex!() calls, so the dict may
        # have been mutated by another task in between.
        # TODO: For 100% concurrency-safety, we may want a single lock spanning both the
        # get() above and the setindex!() here. It's probably fine without it, but needs
        # considering. Currently this relies on get() and setindex!() having no yields.
        Base.@lock tlock begin
            if test_haskey
                if !haskey(tcache, key)
                    setindex!(tcache, v, key)
                end
            else
                setindex!(tcache, v, key)
            end
        end
    end

    # Even though we're using thread-local caches, we still need to lock during
    # construction to prevent multiple tasks redundantly constructing the same object,
    # and to avoid thread-safety violations due to Tasks migrating threads.
    # NOTE that we only grab the lock if the key doesn't exist, so the mutex contention
    # is not on the critical path for most accesses. :)
    is_first_task = false
    local future  # used only if the base_cache doesn't have the key
    # We lock the mutex, but only for a short, *constant-time* duration, to grab the
    # future for this key, or to create the future if it doesn't exist.
    @lock cache.base_cache_lock begin
        value_or_miss = get(cache.base_cache, key, CACHE_MISS)
        if value_or_miss !== CACHE_MISS
            return value_or_miss::V
        end
        future = get!(cache.base_cache_futures, key) do
            is_first_task = true
            Channel{V}(1)
        end
    end
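    # At this point, exactly one Task (the one that just created the Channel) proceeds
    # to compute the value; all other Tasks fall through to `fetch(future)` below.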
    if is_first_task
        v = try
            func()
        catch e
            # In the case of an exception, we abort the current computation of this
            # key/value pair, and throw the exception onto the future, so that all
            # pending tasks will see the exception as well.
            #
            # NOTE: we could also cache the exception and rethrow it from now on, but
            # this would make interactive development difficult, since once you fixed
            # the error, you'd have to clear out your cache. So instead, we just rethrow
            # the exception and don't cache anything, so that you can fix the error and
            # continue on. (This means that throwing exceptions remains expensive.)

            # close(::Channel, ::Exception) requires an Exception object, so if the user
            # threw a non-Exception, we convert it to one here.
            e isa Exception || (e = ErrorException("Non-exception object thrown during get!(): $e"))
            close(future, e)
            # As below, the future isn't needed once this returns.
            @lock cache.base_cache_lock begin
                delete!(cache.base_cache_futures, key)
            end
            rethrow(e)
        end
        # Finally, lock again for a *constant time* to insert the computed value into
        # the shared cache, so that we can free the Channel and future get!()s can read
        # from the shared base_cache.
        @lock cache.base_cache_lock begin
            cache.base_cache[key] = v
            # We no longer need the Future, since all future requests will see the key
            # in the base_cache. (Other Tasks may still hold a reference, but it will
            # be GC'd once they have all completed.)
            delete!(cache.base_cache_futures, key)
        end
        # Store the result in this thread-local dictionary.
        _store_result!(v, test_haskey=false)
        # Return v to any other Tasks that were blocking on this key.
        put!(future, v)
        return v
    else
        # Block on the future until the first task that asked for this key finishes
        # computing a value for it.
        v = fetch(future)
        # Store the result in our original thread-local cache (if another Task hasn't
        # set it already).
        _store_result!(v, test_haskey=true)
        return v
    end
end


end # module
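To make the coordination concrete, here is a small self-contained sketch (not part of
the diff above; the key `42`, the `sleep`, and the atomic counter are illustrative
assumptions) showing that concurrent misses on one key run the computation exactly once:

```julia
using MultiThreadedCaches

const CACHE = MultiThreadedCache{Int, Int}()
init_cache!(CACHE)  # in a package, this call would live in the module's __init__()

const CALLS = Threads.Atomic{Int}(0)

function cached_double(cache)
    return get!(cache, 42) do
        Threads.atomic_add!(CALLS, 1)
        sleep(0.1)  # stand-in for an expensive, deterministic computation
        42 * 2
    end
end

# 100 tasks miss on the same key at once; the futures mechanism ensures the
# computation runs only once, while the other tasks block on its result.
tasks = [Threads.@spawn cached_double(CACHE) for _ in 1:100]

@assert all(==(84), fetch.(tasks))
@assert CALLS[] == 1  # the do-block ran exactly once
```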