-
This should "just work" for you:

```julia
using Dagger, DTables, DataFrames

# starting from a DataFrame to make things simple
x = DataFrame(; a=1:10_000, b=1:10_000)

# create 10 partitions
d = DTable(x, 1000)

results = []
for i in 1:(length(d.chunks) - 1)
    # run on the current + next partition and calculate the average of column :a
    out = Dagger.@spawn ((c1, c2) -> sum(c1.a .+ c2.a) / 2)(d.chunks[i], d.chunks[i + 1])
    # in an ideal world, the return is an object, not a scalar (eg, a DataFrame), hence the generic `push!`
    push!(results, out)
end
fetch.(results)
```
What you're trying to do looks a lot like what the DTables.jl internals look like, so feel free to have a look at how things are done there for more inspiration. We should probably handle windowed functions differently. I suggest you try OnlineStats with this to skip the manual work. Something like this: https://github.com/krynju/mgr-benchmarks/blob/6f536976fb78750e46f1a4f06b9239e6c443faea/dtable/scripts/dtable_full_scenario_stages.jl#L56-L62

Out of core (so disk caching) can be enabled through …
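As for the OnlineStats suggestion, here is a minimal sketch (an assumption about the shape of the linked code, not a copy of it; `df_i` and `df_next` are placeholders for the tables behind two neighbouring chunks):

```julia
using OnlineStats

# fit a running mean of column :a on each of the two neighbouring partitions
o1 = fit!(Mean(), df_i.a)
o2 = fit!(Mean(), df_next.a)

# merge the per-chunk statistics into one windowed (two-chunk) statistic
merge!(o1, o2)
value(o1)   # the windowed mean of :a
```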
-
One thing to beware of is that …
For multithreading, I would put a lock around it. For distributed, I would instead use something like …
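For reference, a minimal sketch of the lock idea (assuming the spawning loop itself were run on multiple threads; `d` and the closure come from the snippet above):

```julia
results = []
lk = ReentrantLock()

# guard the shared `results` vector while several threads push into it
Threads.@threads for i in 1:(length(d.chunks) - 1)
    out = Dagger.@spawn ((c1, c2) -> sum(c1.a .+ c2.a) / 2)(d.chunks[i], d.chunks[i + 1])
    lock(lk) do
        push!(results, out)
    end
end

fetch.(results)
```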
-
Oooops, there should be no … there. Fixed and edited the previous comment:

```julia
using Dagger, DTables, DataFrames

# starting from a DataFrame to make things simple
x = DataFrame(; a=1:10_000, b=1:10_000)

# create 10 partitions
d = DTable(x, 1000)

results = []
for i in 1:(length(d.chunks) - 1)
    # run on the current + next partition and calculate the average of column :a
    out = Dagger.@spawn ((c1, c2) -> sum(c1.a .+ c2.a) / 2)(d.chunks[i], d.chunks[i + 1])
    # in an ideal world, the return is an object, not a scalar (eg, a DataFrame), hence the generic `push!`
    push!(results, out)
end
fetch.(results)
```
-
Thank you both for your comments! The original MWE is working great now :) I'm ultimately looking for a …

Note: I'm using the latest DTables and Dagger versions on main (to have access to …).

My questions are below (hopefully they will be helpful for others as well):
Q1: Eg, "I want to sum up column `a`" (if `reduce` didn't exist...); a rough sketch is included at the end of this comment.
Q2: This code runs, but the files will be created in the … I suspect this is a bug that comes from here. I think the fix might be: …
I'm happy to open an issue/PR, but I wanted to confirm that I'm not using it wrong.
Q3: Eg, … (references the code in Q2)
Thank you :)
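Regarding Q1, a minimal sketch of what the manual, no-`reduce` route could look like (an assumption building on the chunk pattern from the replies above, not an official DTables API):

```julia
# `d` is the DTable from the snippets above; one Dagger task per chunk
# computes a partial sum of column :a, and the caller combines the partials
partials = [Dagger.@spawn (c -> sum(c.a))(chunk) for chunk in d.chunks]
total = sum(fetch.(partials))
```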
-
First of all, thank you again for creating these packages!
I have a use case where I need to process a large number of tables in a windowed query (think tables saved by the hour and I need two-hour averages).
I discussed with @jpsamaroo at JCon that it would be best to process it by simply operating on the chunks underpinning the DTable, but I've been struggling to produce a working example (see the MWE below).
I get an error about indexing: "ERROR: ArgumentError: invalid index: EagerThunk (running) of type Dagger.EagerThunk"
Or an error about Chunk: "ERROR: type Chunk has no field a"
I probably wrongly assumed that wrapping it in `Dagger.@spawn` would delay execution until it's ready to run (ie, that the inner function would never see the `Chunk`).
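As a guess (these are assumptions, not necessarily the original code), the two errors quoted above typically come from patterns like these:

```julia
# 1) "type Chunk has no field a": Dagger.@spawn only delays the outermost call,
#    so `d.chunks[1].a` is still evaluated eagerly on the raw Chunk wrapper
out = Dagger.@spawn sum(d.chunks[1].a .+ d.chunks[2].a)

# 2) "invalid index: EagerThunk (running)": using an unfetched task result as an index
t = Dagger.@spawn length(d.chunks)
d.chunks[t]
```

The closure pattern shown in the replies above avoids the first issue, because chunks passed as arguments to `Dagger.@spawn` are unwrapped before the task body runs.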
MWE:
Thank you!
EDIT:
The bonus question would be how to run it out-of-core, as presented by Julian (with the deserialize/serialize arguments)?
I couldn't find that API on the main branch / in the docs.
I suspect it might not be released yet, right?