Skip to content

Commit 4215b87

Browse files
authored
Merge pull request #54 from JuliaConcurrent/dict-hist
Add divide-and-conquer histogram benchmark
2 parents 7c3d94b + c5edb15 commit 4215b87

File tree

6 files changed

+237
-37
lines changed

6 files changed

+237
-37
lines changed

benchmark/ConcurrentCollectionsBenchmarks/src/bench_dict_histogram.jl

Lines changed: 64 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,12 @@ using BenchmarkTools
55
using ConcurrentCollections
66

77
function generate(; datasize = 2^19, nkeys = datasize)
8+
ints = rand(1:nkeys, datasize)
89
lastkey = string(nkeys)
910
prefix = suffix = ""
1011
# prefix = "9" ^ 30 # adding works for hashing and comparison
1112
# suffix = "0" ^ 30 # adding works for hashing (but not for comparison)
12-
ks = prefix .* string.(1:nkeys; pad = length(lastkey)) .* suffix
13-
data = rand(ks, datasize)
13+
data = prefix .* string.(ints; pad = length(lastkey)) .* suffix
1414
return data
1515
end
1616

@@ -46,6 +46,30 @@ function hist_parallel!(dict::ConcurrentDict, data; ntasks = Threads.nthreads())
4646
return dict
4747
end
4848

49+
function hist_dac_impl(data, chunk_starts, basesize)
50+
if length(chunk_starts) == 0
51+
return Dict{String,Int}()
52+
elseif length(chunk_starts) == 1
53+
i = @inbounds chunk_starts[begin]
54+
chunk = @inbounds data[i:min(i + basesize - 1, end)]
55+
return hist_seq!(Dict{String,Int}(), chunk)
56+
else
57+
h = length(chunk_starts) ÷ 2
58+
left_chunk = @view chunk_starts[begin:begin+h-1]
59+
right_chunk = @view chunk_starts[begin+h:end]
60+
task = Threads.@spawn hist_dac_impl(data, right_chunk, basesize)
61+
left = hist_dac_impl(data, left_chunk, basesize)
62+
right = fetch(task)::typeof(left)
63+
return mergewith!(+, left, right)
64+
end
65+
end
66+
67+
function hist_parallel_dac(data; ntasks = Threads.nthreads())
68+
basesize = cld(length(data), ntasks)
69+
chunk_starts = firstindex(data):basesize:lastindex(data)
70+
return hist_dac_impl(data, chunk_starts, basesize)
71+
end
72+
4973
function default_ntasks_list()
5074
ntasks_list = [Threads.nthreads()]
5175
if Threads.nthreads() > 2
@@ -56,30 +80,50 @@ end
5680

5781
const CACHE = Ref{Any}()
5882

59-
function setup(; ntasks_list = default_ntasks_list())
60-
CACHE[] = data = generate()
83+
function setup(;
84+
ntasks_list = default_ntasks_list(),
85+
datasize = 2^19,
86+
nkeys_list = [datasize],
87+
)
88+
CACHE[] = data = Dict(0 => generate(; nkeys = 1)) # dummy data for allocation
89+
empty!(data)
6190
T = typeof(data)
6291

6392
suite = BenchmarkGroup()
64-
suite["base-seq"] = @benchmarkable(
65-
# Base.Dict, sequential
66-
hist_seq!(dict, CACHE[]::$T),
67-
setup = (dict = Dict{String,Int}()),
68-
evals = 1,
69-
)
70-
suite["cdict-seq"] = @benchmarkable(
71-
# ConcurrentDict, sequential
72-
hist_seq!(dict, CACHE[]::$T),
73-
setup = (dict = ConcurrentDict{String,Int}()),
74-
evals = 1,
75-
)
76-
for ntasks in ntasks_list
77-
suite["cdict-ntasks=$ntasks"] = @benchmarkable(
78-
# ConcurrentDict, parallel
79-
hist_parallel!(dict, CACHE[]::$T; ntasks = $ntasks),
93+
for nkeys in nkeys_list
94+
data[nkeys] = generate(; datasize = datasize, nkeys = nkeys)
95+
96+
s0 = suite["nkeys=$nkeys"] = BenchmarkGroup()
97+
98+
sbs = s0["alg=:base_seq"] = BenchmarkGroup()
99+
sbs["ntasks=1"] = @benchmarkable(
100+
# Base.Dict, sequential
101+
hist_seq!(dict, (CACHE[]::$T)[$nkeys]),
102+
setup = (dict = Dict{String,Int}()),
103+
evals = 1,
104+
)
105+
scs = s0["alg=:cdict_seq"] = BenchmarkGroup()
106+
scs["ntasks=1"] = @benchmarkable(
107+
# ConcurrentDict, sequential
108+
hist_seq!(dict, (CACHE[]::$T)[$nkeys]),
80109
setup = (dict = ConcurrentDict{String,Int}()),
81110
evals = 1,
82111
)
112+
sbp = s0["alg=:base_par"] = BenchmarkGroup()
113+
scp = s0["alg=:cdict_par"] = BenchmarkGroup()
114+
for ntasks in ntasks_list
115+
sbp["ntasks=$ntasks"] = @benchmarkable(
116+
# Base.Dict, parallel
117+
hist_parallel_dac((CACHE[]::$T)[$nkeys]; ntasks = $ntasks),
118+
evals = 1,
119+
)
120+
scp["ntasks=$ntasks"] = @benchmarkable(
121+
# ConcurrentDict, parallel
122+
hist_parallel!(dict, (CACHE[]::$T)[$nkeys]; ntasks = $ntasks),
123+
setup = (dict = ConcurrentDict{String,Int}()),
124+
evals = 1,
125+
)
126+
end
83127
end
84128
return suite
85129
end
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
/backup
2+
/build
3+
/tmp

benchmark/dict_histogram/Makefile

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
JULIA = julia1.7
2+
JULIA_CMD ?= $(JULIA) --color=yes --startup-file=no
3+
4+
export JULIA_PROJECT = $(shell pwd)/../../test/ConcurrentCollectionsTests
5+
# export JULIA_LOAD_PATH = @
6+
7+
.PHONY: benchmark clean backup
8+
9+
BUILD = build
10+
11+
benchmark: $(BUILD)/results.json
12+
13+
$(BUILD)/results.json:
14+
$(JULIA_CMD) -t16 run.jl
15+
16+
clean:
17+
rm -fv $(BUILD)/*.json
18+
19+
backup:
20+
test -e $(BUILD)/results.json
21+
mkdir -pv backup
22+
rm -rf tmp/backup
23+
mkdir -pv tmp/backup/build
24+
mv $(BUILD)/* tmp/backup/build/
25+
mv tmp/backup backup/backup-$$(date +%Y-%m-%d-%H%M%S)

benchmark/dict_histogram/plot.jl

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
import BenchmarkConfigSweeps
2+
import BenchmarkTools
3+
using DataFrames
4+
using FileIO
5+
using JSON
6+
using Statistics
7+
using VegaLite
8+
9+
results = only(BenchmarkTools.load(joinpath(@__DIR__, "build/results.json")))
10+
df_raw = DataFrame(BenchmarkConfigSweeps.flattable(results))
11+
12+
access_param = "Access density"
13+
14+
begin
15+
df = select(df_raw, Not(:trial))
16+
df = select(df, Not(r"JULIA_.*"))
17+
df[:, :ms] = map(t -> mean(t).time, df_raw.trial) ./ 1e6
18+
df[:, :Implementation] = map(df.alg) do alg
19+
if alg === :base_seq || alg == :base_par
20+
Symbol("Base.Dict + Divide-and-Conquer")
21+
elseif alg === :cdict_seq || alg === :cdict_par
22+
:ConcurrentDict
23+
else
24+
error("unknown alg = ", alg)
25+
end
26+
end
27+
datasize = 2^19
28+
df[:, access_param] = datasize ./ df.nkeys
29+
df
30+
end
31+
#-
32+
33+
df_speedup = combine(groupby(df, Not([:ms, :ntasks, :alg, :Implementation]))) do g
34+
baseline = only(g.ms[g.alg.===:base_seq])
35+
hcat(g, DataFrame((; speedup = baseline ./ g.ms)))
36+
end
37+
#-
38+
39+
function parallel_algorithms(df)
40+
idx = df.alg .∈ Ref((:base_par, :cdict_par))
41+
return df[idx, :]
42+
end
43+
44+
plt = @vlplot(
45+
facet = {column = {field = :Implementation}},
46+
spec = {
47+
layer = [
48+
{
49+
# :line,
50+
mark = {:line, point = true},
51+
encoding = {
52+
x = {:ntasks, type = :quantitative, title = "Number of Tasks"},
53+
y = {
54+
:speedup,
55+
type = :quantitative,
56+
title = "Speedup wrt sequential program",
57+
},
58+
color = {field = access_param, type = :ordinal},
59+
},
60+
},
61+
{mark = :rule, encoding = {y = {datum = 1}}},
62+
],
63+
},
64+
data = parallel_algorithms(df_speedup),
65+
)
66+
67+
save(joinpath(@__DIR__, "build/results.png"), plt)
68+
save(joinpath(@__DIR__, "build/results.svg"), plt)
69+
70+
plt

benchmark/dict_histogram/run.jl

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
import BenchmarkTools
2+
import ConcurrentCollectionsBenchmarks
3+
import JSON
4+
5+
function git_info(dir = @__DIR__)
6+
git(cmd) = strip(read(setenv(`git $cmd`; dir), String))
7+
return (;
8+
revision = git(`rev-parse HEAD`),
9+
status = git(`status --short --untracked-files=no --porcelain`),
10+
)
11+
end
12+
13+
function julia_info()
14+
return (
15+
version = string(VERSION),
16+
git = (
17+
commit = Base.GIT_VERSION_INFO.commit,
18+
branch = Base.GIT_VERSION_INFO.branch,
19+
),
20+
is_debugbuild = ccall(:jl_is_debugbuild, Cint, ()) != 0,
21+
libllvm_version = string(Base.libllvm_version),
22+
Sys = (
23+
WORD_SIZE = Sys.WORD_SIZE,
24+
JIT = Sys.JIT,
25+
# CPU_NAME = Sys.CPU_NAME,
26+
# CPU_THREADS = Sys.CPU_THREADS,
27+
),
28+
env = Dict(k => v for (k, v) in ENV if startswith(k, "JULIA_")),
29+
)
30+
end
31+
32+
function main(args = ARGS)
33+
output = get(args, 1, joinpath(@__DIR__, "build", "results.json"))
34+
mkpath(dirname(output))
35+
36+
info = (; git = git_info(), julia = julia_info())
37+
open(joinpath(dirname(output), "info.json"), write = true) do io
38+
JSON.print(io, info)
39+
end
40+
41+
suite = ConcurrentCollectionsBenchmarks.BenchDictHistogram.setup(
42+
ntasks_list = 1:Threads.nthreads(),
43+
nkeys_list = [2^13, 2^16, 2^19, 2^25],
44+
)
45+
results = run(suite; verbose = true)
46+
BenchmarkTools.save(output, results)
47+
return results
48+
end
49+
50+
if abspath(PROGRAM_FILE) == @__FILE__
51+
main()
52+
end

test/ConcurrentCollectionsTests/src/test_bench_dict_histogram.jl

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,20 @@ module TestBenchDictHistogram
22

33
using ConcurrentCollections
44
using ConcurrentCollectionsBenchmarks.BenchDictHistogram:
5-
default_ntasks_list, generate, hist_parallel!, hist_seq!
5+
default_ntasks_list, generate, hist_parallel!, hist_parallel_dac, hist_seq!
66
using Test
77

8+
function diffvalues(actual, expected)
9+
diffs = []
10+
for (key, ve) in expected
11+
va = actual[key]
12+
if ve != va
13+
push!(diffs, (; key, actual = va, expected = ve))
14+
end
15+
end
16+
return diffs
17+
end
18+
819
function test()
920
datasize_list = [10, 2^5, 2^10, 2^20]
1021
fulldata = generate(datasize = datasize_list[end])
@@ -20,14 +31,7 @@ function test(data)
2031
cdseq = hist_seq!(ConcurrentDict{String,Int}(), data)
2132
@test sort(collect(setdiff(keys(dbase), keys(cdseq)))) == []
2233
@test sort(collect(setdiff(keys(cdseq), keys(dbase)))) == []
23-
diffvalues = []
24-
for (key, expected) in dbase
25-
actual = cdseq[key]
26-
if actual != expected
27-
push!(diffvalues, (; key, actual, expected))
28-
end
29-
end
30-
@test diffvalues == []
34+
@test diffvalues(cdseq, dbase) == []
3135
@test Dict(cdseq) == dbase
3236
end
3337
@testset for ntasks in default_ntasks_list()
@@ -39,16 +43,18 @@ function test(data)
3943
=#
4044
@test sort(collect(setdiff(keys(dbase), keys(cdpar)))) == []
4145
@test sort(collect(setdiff(keys(cdpar), keys(dbase)))) == []
42-
diffvalues = []
43-
for (key, expected) in dbase
44-
actual = cdpar[key]
45-
if actual != expected
46-
push!(diffvalues, (; key, actual, expected))
47-
end
48-
end
49-
@test diffvalues == []
46+
@test diffvalues(cdpar, dbase) == []
5047
@test Dict(cdpar) == dbase
5148
end
49+
@testset "dac" begin
50+
@testset for ntasks in default_ntasks_list()
51+
dpar = hist_parallel_dac(data; ntasks = ntasks)
52+
@test sort(collect(setdiff(keys(dbase), keys(dpar)))) == []
53+
@test sort(collect(setdiff(keys(dpar), keys(dbase)))) == []
54+
@test diffvalues(dpar, dbase) == []
55+
@test dpar == dbase
56+
end
57+
end
5258
end
5359

5460
end # module

0 commit comments

Comments
 (0)