-
Notifications
You must be signed in to change notification settings - Fork 480
timely-util: no nonsense timely/async bridge #14630
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
fd69010
to
be0c305
Compare
cf16ed3
to
88f26f0
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I need to read this more closely and ask more questions, i will reread it tomorrow morning
|
||
/// An async Waker that activates a specific operator when woken and marks the task as ready | ||
struct TimelyWaker { | ||
activator: SyncActivator, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
my understanding is that if Waker
didnt require Send
then this could be a normal activator?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, exactly. We do some trickery in the future were we also carry around a normal activator plus the thread id that it belongs to and dispatch to the right one based on thread id. This is what gloomio does
src/timely-util/src/builder_async.rs
Outdated
let connection = | ||
vec![Antichain::from_elem(Default::default()); self.builder.shape().inputs()]; | ||
self.new_output_connection(connection) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just call self.builder.new_output()
, to make it more clear that for output we just forward on
src/timely-util/src/builder_async.rs
Outdated
// The timely operator needs to be kept alive if the task is pending or if there | ||
// are live input handles, perhaps in an another executor |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if the logic future is dead, then what happens if their are live input handles? nothing will be able to read them, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It was supposed to support a usecase where you've sent the handles to another executor but the original operator has terminated. I removed the input_handle
logic and now just panic if a handle is used after the operator has died
88f26f0
to
9f17445
Compare
let consumed_part_cap = cap.delayed_for_output(cap.time(), consumed_part_port); | ||
let mut consumed_part_session = | ||
consumed_part_output_handle.session(&consumed_part_cap); | ||
drop(cap); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this strictly required?
new_ts_upper | ||
); | ||
|
||
let cap = cap_set.delayed(cap_set.first().unwrap()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
similarly, is this just to avoid holding onto the cap as we wait for the advance
future to complete?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok I think this is reasonable, but 3 things need to be documented:
- Why do we need
shared_frontiers
? Its so we can hold frontierref's across await points, i think - Why are the transmute's okay? Link to the tokio docs
- Also followup: make a miri test if that even possible at all
- How the reactor works
As well as:
- follow-up on where we actually need
SeqCst
- and I have a few more nits i added!
/// The index of this handle into the shared frontiers vector | ||
index: usize, | ||
/// Reference to the reactor queue of this input handle where Wakers can be registered | ||
reactor_registry: Weak<RefCell<Vec<Waker>>>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
name this the same as registered_wakers
Also, we should document why it needs to be a Weak
9f17445
to
f54fb16
Compare
Signed-off-by: Petros Angelatos <[email protected]>
f54fb16
to
fa8a719
Compare
Motivation
This PR provides a new async/timely integration that as far as I can tell has no weird invariants that the user needs to now about and so it's hopefully much harder to misuse than the previous
build_async
method.The piece that makes this possible is the
AsyncInputHandle
type that correctly wires up asyncWakers
and a manualFuture
implementation that allows users to naturally callinput.next().await
on a handle and/or safely use these future in aselect!
loop.The handle's
next()
method behaves like an async iterator, resolving toNone
when there are no more notifications to be received, and gives out an enum ofEvent::Data(capability, data)
when there is data in the input orEvent::Progress(frontier)
when the frontier has advanced.On top of the generic
builder_async::OperatorBuilder
this PR also implementsOperator::unary_async
,Operator::binary_async
,Operator::sink_async
and a freestanding functionoperator::source_async
.