petrosagg
Contributor

@petrosagg petrosagg commented Sep 5, 2022

Motivation

This PR provides a new async/timely integration that, as far as I can tell, has no surprising invariants that the user needs to know about, so it should be much harder to misuse than the previous build_async method.

The piece that makes this possible is the AsyncInputHandle type, which correctly wires up async Wakers, together with a manual Future implementation that lets users naturally call input.next().await on a handle and/or safely use these futures in a select! loop.

The handle's next() method behaves like an async iterator: it resolves to None when there are no more notifications to be received, and otherwise yields an Event enum, either Event::Data(capability, data) when there is data in the input or Event::Progress(frontier) when the frontier has advanced.
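As a rough illustration of that consumption pattern, here is a minimal std-only sketch. It does not use timely or the PR's actual types: `ToyInputHandle`, `Next`, `block_on`, and `drain` are hypothetical stand-ins, the capability is replaced by a plain `u64` timestamp, and the queue is pre-filled so the future never returns `Pending`.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Stand-in for the event enum described above. In the PR, `Data` carries a
/// capability; here a plain timestamp plays that role (assumption).
#[derive(Debug)]
enum Event {
    Data(u64, Vec<String>),
    Progress(Vec<u64>),
}

/// Toy input handle backed by a pre-filled queue (hypothetical, not the PR's type).
struct ToyInputHandle {
    queue: VecDeque<Event>,
}

/// Manually implemented future returned by `next()`.
struct Next<'a> {
    handle: &'a mut ToyInputHandle,
}

impl Future for Next<'_> {
    type Output = Option<Event>;

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        // The queue is pre-filled and closed, so this toy is always ready.
        // A real handle would register `_cx.waker()` and return `Pending`.
        Poll::Ready(self.handle.queue.pop_front())
    }
}

impl ToyInputHandle {
    fn next(&mut self) -> Next<'_> {
        Next { handle: self }
    }
}

/// Waker that does nothing; enough for futures that never return `Pending`.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Minimal polling executor for this example only.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

/// Drains the handle the way an operator body might, logging each event.
async fn drain(mut input: ToyInputHandle) -> Vec<String> {
    let mut log = Vec::new();
    while let Some(event) = input.next().await {
        match event {
            Event::Data(time, data) => log.push(format!("data@{time}: {data:?}")),
            Event::Progress(frontier) => log.push(format!("frontier: {frontier:?}")),
        }
    }
    log
}

fn sample_log() -> Vec<String> {
    let input = ToyInputHandle {
        queue: VecDeque::from(vec![
            Event::Data(0, vec!["a".to_string()]),
            Event::Progress(vec![1]),
        ]),
    };
    block_on(drain(input))
}

fn main() {
    for line in sample_log() {
        println!("{line}");
    }
}
```

The `while let Some(event) = input.next().await` loop is the shape the description refers to; the loop ends when `next()` resolves to `None`.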

On top of the generic builder_async::OperatorBuilder this PR also implements Operator::unary_async, Operator::binary_async, Operator::sink_async and a freestanding function operator::source_async.

@petrosagg petrosagg force-pushed the timely-async-bridge branch 3 times, most recently from fd69010 to be0c305 Compare September 5, 2022 11:34
@petrosagg petrosagg marked this pull request as ready for review September 5, 2022 11:37
@petrosagg petrosagg requested a review from aljoscha September 5, 2022 11:37
@petrosagg petrosagg force-pushed the timely-async-bridge branch 5 times, most recently from cf16ed3 to 88f26f0 Compare September 7, 2022 13:50
Contributor

@guswynn guswynn left a comment

I need to read this more closely and ask more questions; I will reread it tomorrow morning.


/// An async Waker that activates a specific operator when woken and marks the task as ready
struct TimelyWaker {
activator: SyncActivator,
Contributor

My understanding is that if Waker didn't require Send, then this could be a normal activator?

Contributor Author

Yes, exactly. We could do some trickery in the future where we also carry around a normal activator plus the thread id that it belongs to, and dispatch to the right one based on thread id. This is what glommio does.
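To make the Send constraint in this exchange concrete, here is a small std-only sketch of a waker that marks a task as ready and then activates an operator. `ToyActivator` is a hypothetical stand-in for timely's `SyncActivator` that only counts activations, and `ToyWaker` is not the PR's `TimelyWaker`; the point is only that a `Waker` built from an `Arc` needs state that is `Send + Sync`, because it may be woken from another thread.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Wake, Waker};

/// Hypothetical stand-in for a thread-safe activator: it only counts activations.
#[derive(Default)]
struct ToyActivator {
    activations: AtomicUsize,
}

impl ToyActivator {
    fn activate(&self) {
        self.activations.fetch_add(1, Ordering::SeqCst);
    }
}

/// Waker state: flag the task as ready, then ask for the operator to be run.
struct ToyWaker {
    activator: Arc<ToyActivator>,
    task_ready: AtomicBool,
}

impl Wake for ToyWaker {
    fn wake(self: Arc<Self>) {
        self.wake_by_ref();
    }

    fn wake_by_ref(self: &Arc<Self>) {
        self.task_ready.store(true, Ordering::SeqCst);
        self.activator.activate();
    }
}

/// Wakes the task from a different thread and reports (task_ready, activations).
fn wake_once() -> (bool, usize) {
    let activator = Arc::new(ToyActivator::default());
    let state = Arc::new(ToyWaker {
        activator: Arc::clone(&activator),
        task_ready: AtomicBool::new(false),
    });
    // `Waker::from(Arc<W>)` requires `W: Send + Sync`, so every field must be
    // thread-safe; a thread-local activator would not compile here.
    let waker = Waker::from(Arc::clone(&state));
    std::thread::spawn(move || waker.wake()).join().unwrap();
    (
        state.task_ready.load(Ordering::SeqCst),
        activator.activations.load(Ordering::SeqCst),
    )
}

fn main() {
    let (ready, activations) = wake_once();
    println!("ready = {ready}, activations = {activations}");
}
```

The sketch uses `SeqCst` everywhere for simplicity; whether weaker orderings would suffice is a separate question.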

Comment on lines 227 to 229
let connection =
vec![Antichain::from_elem(Default::default()); self.builder.shape().inputs()];
self.new_output_connection(connection)
Contributor

just call self.builder.new_output(), to make it more clear that for output we just forward on

Comment on lines 299 to 300
// The timely operator needs to be kept alive if the task is pending or if there
// are live input handles, perhaps in another executor
Contributor

If the logic future is dead, then what happens if there are live input handles? Nothing will be able to read them, right?

Contributor Author

It was supposed to support a use case where you've sent the handles to another executor but the original operator has terminated. I removed the input_handle logic and now just panic if a handle is used after the operator has died.

let consumed_part_cap = cap.delayed_for_output(cap.time(), consumed_part_port);
let mut consumed_part_session =
consumed_part_output_handle.session(&consumed_part_cap);
drop(cap);
Contributor

is this strictly required?

new_ts_upper
);

let cap = cap_set.delayed(cap_set.first().unwrap());
Contributor

similarly, is this just to avoid holding onto the cap as we wait for the advance future to complete?

Contributor

@guswynn guswynn left a comment

Ok I think this is reasonable, but 3 things need to be documented:

  • Why do we need shared_frontiers? It's so we can hold frontier references across await points, I think
  • Why are the transmutes okay? Link to the tokio docs
    • Also follow-up: make a miri test, if that's even possible at all
  • How the reactor works

As well as:

  • follow-up on where we actually need SeqCst
  • and I have a few more nits I added!

/// The index of this handle into the shared frontiers vector
index: usize,
/// Reference to the reactor queue of this input handle where Wakers can be registered
reactor_registry: Weak<RefCell<Vec<Waker>>>,
Contributor

name this the same as registered_wakers

Also, we should document why it needs to be a Weak
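The thread does not state why the field is a Weak. One plausible reading, offered here only as an assumption, is that a weak reference lets a handle notice that the operator owning the registry has been dropped, instead of keeping the registry alive. The sketch below shows that mechanism with std types only; `ToyHandle` is hypothetical, and string labels stand in for the `Waker` values to keep the example short.

```rust
use std::cell::RefCell;
use std::rc::{Rc, Weak};

/// Hypothetical handle holding only a weak reference to the operator's registry.
/// String labels stand in for the registered `Waker`s (assumption for brevity).
struct ToyHandle {
    registered_wakers: Weak<RefCell<Vec<&'static str>>>,
}

impl ToyHandle {
    /// Registers a label; returns false once the owning operator is gone.
    fn register(&self, label: &'static str) -> bool {
        match self.registered_wakers.upgrade() {
            Some(registry) => {
                registry.borrow_mut().push(label);
                true
            }
            // The strong reference was dropped, so the handle can detect that
            // the operator has terminated instead of keeping its state alive.
            None => false,
        }
    }
}

/// Returns (registered while alive, registry length, registered after drop).
fn demo() -> (bool, usize, bool) {
    let operator_side = Rc::new(RefCell::new(Vec::new()));
    let handle = ToyHandle {
        registered_wakers: Rc::downgrade(&operator_side),
    };
    let while_alive = handle.register("task-a");
    let len = operator_side.borrow().len();
    drop(operator_side); // the operator terminates
    let after_drop = handle.register("task-b");
    (while_alive, len, after_drop)
}

fn main() {
    println!("{:?}", demo());
}
```

With a strong `Rc` in the handle, the second `register` call would silently succeed against a registry nobody reads anymore.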

@petrosagg petrosagg merged commit c3a5ba7 into MaterializeInc:main Sep 12, 2022
@petrosagg petrosagg deleted the timely-async-bridge branch September 12, 2022 16:06