diff --git a/src/adapter/src/flags.rs b/src/adapter/src/flags.rs index ae5f0dc3e225b..60d3ee3ce4cd8 100644 --- a/src/adapter/src/flags.rs +++ b/src/adapter/src/flags.rs @@ -10,12 +10,13 @@ use std::time::Duration; use mz_compute_client::protocol::command::ComputeParameters; +use mz_compute_types::dataflows::YieldSpec; use mz_orchestrator::scheduling_config::{ServiceSchedulingConfig, ServiceTopologySpreadConfig}; use mz_ore::cast::CastFrom; use mz_ore::error::ErrorExt; use mz_persist_client::cfg::{PersistParameters, RetryParameters}; use mz_service::params::GrpcClientParameters; -use mz_sql::session::vars::SystemVars; +use mz_sql::session::vars::{SystemVars, DEFAULT_LINEAR_JOIN_YIELDING}; use mz_storage_types::parameters::{ StorageMaxInflightBytesConfig, StorageParameters, UpsertAutoSpillConfig, }; @@ -23,9 +24,16 @@ use mz_tracing::params::TracingParameters; /// Return the current compute configuration, derived from the system configuration. pub fn compute_config(config: &SystemVars) -> ComputeParameters { + let linear_join_yielding = config.linear_join_yielding(); + let linear_join_yielding = parse_yield_spec(linear_join_yielding).unwrap_or_else(|| { + tracing::error!("invalid `linear_join_yielding` config: {linear_join_yielding}"); + parse_yield_spec(&DEFAULT_LINEAR_JOIN_YIELDING).expect("default is valid") + }); + ComputeParameters { max_result_size: Some(config.max_result_size()), dataflow_max_inflight_bytes: Some(config.dataflow_max_inflight_bytes()), + linear_join_yielding: Some(linear_join_yielding), enable_mz_join_core: Some(config.enable_mz_join_core()), enable_jemalloc_profiling: Some(config.enable_jemalloc_profiling()), enable_specialized_arrangements: Some(config.enable_specialized_arrangements()), @@ -35,6 +43,22 @@ pub fn compute_config(config: &SystemVars) -> ComputeParameters { } } +fn parse_yield_spec(s: &str) -> Option { + let parts: Vec<_> = s.split(':').collect(); + match &parts[..] { + ["work", amount] => { + let amount = amount.parse().ok()?; + Some(YieldSpec::ByWork(amount)) + } + ["time", millis] => { + let millis = millis.parse().ok()?; + let duration = Duration::from_millis(millis); + Some(YieldSpec::ByTime(duration)) + } + _ => None, + } +} + /// Return the current storage configuration, derived from the system configuration. pub fn storage_config(config: &SystemVars) -> StorageParameters { StorageParameters { diff --git a/src/compute-client/src/protocol/command.proto b/src/compute-client/src/protocol/command.proto index 758ca21082d8a..91396e6a535c9 100644 --- a/src/compute-client/src/protocol/command.proto +++ b/src/compute-client/src/protocol/command.proto @@ -70,4 +70,5 @@ message ProtoComputeParameters { mz_service.params.ProtoGrpcClientParameters grpc_client = 6; optional bool enable_jemalloc_profiling = 7; optional bool enable_specialized_arrangements = 8; + mz_compute_types.dataflows.ProtoYieldSpec linear_join_yielding = 9; } diff --git a/src/compute-client/src/protocol/command.rs b/src/compute-client/src/protocol/command.rs index c1573c2d1a8de..e9a5922d97946 100644 --- a/src/compute-client/src/protocol/command.rs +++ b/src/compute-client/src/protocol/command.rs @@ -10,7 +10,7 @@ //! Compute protocol commands. use mz_cluster_client::client::{ClusterStartupEpoch, TimelyConfig, TryIntoTimelyConfig}; -use mz_compute_types::dataflows::DataflowDescription; +use mz_compute_types::dataflows::{DataflowDescription, YieldSpec}; use mz_expr::RowSetFinishing; use mz_ore::tracing::OpenTelemetryContext; use mz_persist_client::cfg::PersistParameters; @@ -359,6 +359,8 @@ pub struct ComputeParameters { pub max_result_size: Option, /// The maximum number of in-flight bytes emitted by persist_sources feeding dataflows. pub dataflow_max_inflight_bytes: Option, + /// The yielding behavior with which linear joins should be rendered. + pub linear_join_yielding: Option, /// Whether rendering should use `mz_join_core` rather than DD's `JoinCore::join_core`. pub enable_mz_join_core: Option, /// Whether to activate jemalloc heap profiling. @@ -379,6 +381,7 @@ impl ComputeParameters { let ComputeParameters { max_result_size, dataflow_max_inflight_bytes, + linear_join_yielding, enable_mz_join_core, enable_jemalloc_profiling, enable_specialized_arrangements, @@ -393,6 +396,9 @@ impl ComputeParameters { if dataflow_max_inflight_bytes.is_some() { self.dataflow_max_inflight_bytes = dataflow_max_inflight_bytes; } + if linear_join_yielding.is_some() { + self.linear_join_yielding = linear_join_yielding; + } if enable_mz_join_core.is_some() { self.enable_mz_join_core = enable_mz_join_core; } @@ -420,6 +426,7 @@ impl RustType for ComputeParameters { ProtoComputeParameters { max_result_size: self.max_result_size.into_proto(), dataflow_max_inflight_bytes: self.dataflow_max_inflight_bytes.into_proto(), + linear_join_yielding: self.linear_join_yielding.into_proto(), enable_mz_join_core: self.enable_mz_join_core.into_proto(), enable_jemalloc_profiling: self.enable_jemalloc_profiling.into_proto(), enable_specialized_arrangements: self.enable_specialized_arrangements.into_proto(), @@ -433,6 +440,7 @@ impl RustType for ComputeParameters { Ok(Self { max_result_size: proto.max_result_size.into_rust()?, dataflow_max_inflight_bytes: proto.dataflow_max_inflight_bytes.into_rust()?, + linear_join_yielding: proto.linear_join_yielding.into_rust()?, enable_mz_join_core: proto.enable_mz_join_core.into_rust()?, enable_jemalloc_profiling: proto.enable_jemalloc_profiling.into_rust()?, enable_specialized_arrangements: proto.enable_specialized_arrangements.into_rust()?, diff --git a/src/compute-types/src/dataflows.proto b/src/compute-types/src/dataflows.proto index 2d9ff2cec9501..776690c6c8865 100644 --- a/src/compute-types/src/dataflows.proto +++ b/src/compute-types/src/dataflows.proto @@ -13,6 +13,7 @@ import "compute-types/src/plan.proto"; import "compute-types/src/sinks.proto"; import "compute-types/src/sources.proto"; import "expr/src/scalar.proto"; +import "proto/src/proto.proto"; import "repr/src/antichain.proto"; import "repr/src/global_id.proto"; import "repr/src/relation_and_scalar.proto"; @@ -63,3 +64,10 @@ message ProtoBuildDesc { mz_repr.global_id.ProtoGlobalId id = 1; plan.ProtoPlan plan = 2; } + +message ProtoYieldSpec { + oneof kind { + uint64 by_work = 1; + mz_proto.ProtoDuration by_time = 2; + } +} diff --git a/src/compute-types/src/dataflows.rs b/src/compute-types/src/dataflows.rs index 2f449c0f9fd7d..7a1c748d2c897 100644 --- a/src/compute-types/src/dataflows.rs +++ b/src/compute-types/src/dataflows.rs @@ -10,6 +10,7 @@ //! Types for describing dataflows. use std::collections::{BTreeMap, BTreeSet}; +use std::time::Duration; use mz_expr::{CollectionPlan, MirRelationExpr, MirScalarExpr, OptimizedMirRelationExpr}; use mz_proto::{IntoRustIfSome, ProtoMapEntry, ProtoType, RustType, TryFromProtoError}; @@ -640,6 +641,38 @@ impl RustType for BuildDesc { } } +/// Specification of a dataflow operator's yielding behavior. +#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize, Arbitrary)] +pub enum YieldSpec { + ByWork(usize), + ByTime(Duration), +} + +impl RustType for YieldSpec { + fn into_proto(&self) -> ProtoYieldSpec { + use proto_yield_spec::Kind; + + let kind = match *self { + Self::ByWork(w) => Kind::ByWork(w.into_proto()), + Self::ByTime(t) => Kind::ByTime(t.into_proto()), + }; + ProtoYieldSpec { kind: Some(kind) } + } + + fn from_proto(proto: ProtoYieldSpec) -> Result { + use proto_yield_spec::Kind; + + let Some(kind) = proto.kind else { + return Err(TryFromProtoError::missing_field("ProtoYieldSpec::kind")); + }; + let spec = match kind { + Kind::ByWork(w) => Self::ByWork(w.into_rust()?), + Kind::ByTime(t) => Self::ByTime(t.into_rust()?), + }; + Ok(spec) + } +} + #[cfg(test)] mod tests { use mz_proto::protobuf_roundtrip; diff --git a/src/compute/src/compute_state.rs b/src/compute/src/compute_state.rs index e32742c2b96e4..f8360a3b16335 100644 --- a/src/compute/src/compute_state.rs +++ b/src/compute/src/compute_state.rs @@ -44,7 +44,7 @@ use crate::arrangement::manager::{SpecializedTraceHandle, TraceBundle, TraceMana use crate::logging; use crate::logging::compute::ComputeEvent; use crate::metrics::ComputeMetrics; -use crate::render::LinearJoinImpl; +use crate::render::{LinearJoinImpl, LinearJoinSpec}; use crate::server::ResponseSender; use crate::typedefs::TraceRowHandle; @@ -84,8 +84,8 @@ pub struct ComputeState { max_result_size: u32, /// Maximum number of in-flight bytes emitted by persist_sources feeding dataflows. pub dataflow_max_inflight_bytes: usize, - /// Implementation to use for rendering linear joins. - pub linear_join_impl: LinearJoinImpl, + /// Specification for rendering linear joins. + pub linear_join_spec: LinearJoinSpec, /// Metrics for this replica. pub metrics: ComputeMetrics, /// A process-global handle to tracing configuration. @@ -116,7 +116,7 @@ impl ComputeState { command_history, max_result_size: u32::MAX, dataflow_max_inflight_bytes: usize::MAX, - linear_join_impl: Default::default(), + linear_join_spec: Default::default(), metrics, tracing_handle, enable_specialized_arrangements: Default::default(), @@ -198,6 +198,7 @@ impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> { let ComputeParameters { max_result_size, dataflow_max_inflight_bytes, + linear_join_yielding, enable_mz_join_core, enable_jemalloc_profiling, enable_specialized_arrangements, @@ -212,11 +213,14 @@ impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> { if let Some(v) = dataflow_max_inflight_bytes { self.compute_state.dataflow_max_inflight_bytes = v; } + if let Some(v) = linear_join_yielding { + self.compute_state.linear_join_spec.yielding = v; + } if let Some(v) = enable_specialized_arrangements { self.compute_state.enable_specialized_arrangements = v; } if let Some(v) = enable_mz_join_core { - self.compute_state.linear_join_impl = match v { + self.compute_state.linear_join_spec.implementation = match v { false => LinearJoinImpl::DifferentialDataflow, true => LinearJoinImpl::Materialize, }; diff --git a/src/compute/src/render/context.rs b/src/compute/src/render/context.rs index 2155357c68ec1..b1733976a3811 100644 --- a/src/compute/src/render/context.rs +++ b/src/compute/src/render/context.rs @@ -42,7 +42,7 @@ use timely::progress::{Antichain, Timestamp}; use crate::arrangement::manager::SpecializedTraceHandle; use crate::extensions::arrange::{KeyCollection, MzArrange}; use crate::render::errors::ErrorLogger; -use crate::render::join::LinearJoinImpl; +use crate::render::join::LinearJoinSpec; use crate::typedefs::{ErrSpine, RowSpine, TraceErrHandle, TraceRowHandle}; // Local type definition to avoid the horror in signatures. @@ -98,8 +98,8 @@ where pub bindings: BTreeMap>, /// A token that operators can probe to know whether the dataflow is shutting down. pub(super) shutdown_token: ShutdownToken, - /// The implementation to use for rendering linear joins. - pub(super) linear_join_impl: LinearJoinImpl, + /// Specification for rendering linear joins. + pub(super) linear_join_spec: LinearJoinSpec, pub(super) enable_specialized_arrangements: bool, } @@ -127,7 +127,7 @@ where until: dataflow.until.clone(), bindings: BTreeMap::new(), shutdown_token: Default::default(), - linear_join_impl: Default::default(), + linear_join_spec: Default::default(), enable_specialized_arrangements: Default::default(), } } diff --git a/src/compute/src/render/join/linear_join.rs b/src/compute/src/render/join/linear_join.rs index 5c123eb062412..b4db82877daca 100644 --- a/src/compute/src/render/join/linear_join.rs +++ b/src/compute/src/render/join/linear_join.rs @@ -11,10 +11,13 @@ //! //! Consult [LinearJoinPlan] documentation for details. +use std::time::Instant; + use differential_dataflow::lattice::Lattice; use differential_dataflow::operators::arrange::arrangement::Arranged; use differential_dataflow::trace::TraceReader; use differential_dataflow::{AsCollection, Collection, Data}; +use mz_compute_types::dataflows::YieldSpec; use mz_compute_types::plan::join::linear_join::{LinearJoinPlan, LinearStagePlan}; use mz_compute_types::plan::join::JoinClosure; use mz_repr::fixed_length::IntoRowByTypes; @@ -36,16 +39,36 @@ use crate::typedefs::RowSpine; /// Available linear join implementations. /// /// See the `mz_join_core` module docs for our rationale for providing two join implementations. -#[derive(Clone, Copy, Default)] +#[derive(Clone, Copy)] pub enum LinearJoinImpl { - #[default] Materialize, DifferentialDataflow, } -impl LinearJoinImpl { - /// Run this join implementation on the provided arrangements. - fn run( +/// Specification of how linear joins are to be executed. +/// +/// Note that currently `yielding` only affects the `Materialize` join implementation, as the DD +/// join doesn't allow configuring its yielding behavior. Merging [#390] would fix this. +/// +/// [#390]: https://github.com/TimelyDataflow/differential-dataflow/pull/390 +#[derive(Clone, Copy)] +pub struct LinearJoinSpec { + pub implementation: LinearJoinImpl, + pub yielding: YieldSpec, +} + +impl Default for LinearJoinSpec { + fn default() -> Self { + Self { + implementation: LinearJoinImpl::Materialize, + yielding: YieldSpec::ByWork(1_000_000), + } + } +} + +impl LinearJoinSpec { + /// Render a join operator according to this specification. + fn render( &self, arranged1: &Arranged, arranged2: &Arranged, @@ -63,11 +86,21 @@ impl LinearJoinImpl { V1: Data, V2: Data, { - match self { - Self::DifferentialDataflow => { + use LinearJoinImpl::*; + use YieldSpec::*; + + match (self.implementation, self.yielding) { + (DifferentialDataflow, _) => { differential_dataflow::operators::JoinCore::join_core(arranged1, arranged2, result) } - Self::Materialize => mz_join_core(arranged1, arranged2, result), + (Materialize, ByWork(limit)) => { + let yield_fn = move |_start, work| work >= limit; + mz_join_core(arranged1, arranged2, result, yield_fn) + } + (Materialize, ByTime(limit)) => { + let yield_fn = move |start: Instant, _work| start.elapsed() >= limit; + mz_join_core(arranged1, arranged2, result, yield_fn) + } } } } @@ -160,7 +193,7 @@ where // Different variants of `joined` implement this differently, // and the logic is centralized there. let stream = differential_join( - self.linear_join_impl, + self.linear_join_spec, joined, inputs[stage_plan.lookup_relation].enter_region(inner), stage_plan, @@ -212,7 +245,7 @@ where /// version of the join of previous inputs. This is split into its own method /// to enable reuse of code with different types of `prev_keyed`. fn differential_join( - join_impl: LinearJoinImpl, + join_spec: LinearJoinSpec, mut joined: JoinedFlavor, lookup_relation: CollectionBundle, LinearStagePlan { @@ -273,14 +306,14 @@ where JoinedFlavor::Local(local) => match arrangement { ArrangementFlavor::Local(oks, errs1) => { let (oks, errs2) = - dispatch_differential_join_inner_local_local(join_impl, local, oks, closure); + dispatch_differential_join_inner_local_local(join_spec, local, oks, closure); errors.push(errs1.as_collection(|k, _v| k.clone())); errors.extend(errs2); oks } ArrangementFlavor::Trace(_gid, oks, errs1) => { let (oks, errs2) = - dispatch_differential_join_inner_local_trace(join_impl, local, oks, closure); + dispatch_differential_join_inner_local_trace(join_spec, local, oks, closure); errors.push(errs1.as_collection(|k, _v| k.clone())); errors.extend(errs2); oks @@ -289,14 +322,14 @@ where JoinedFlavor::Trace(trace) => match arrangement { ArrangementFlavor::Local(oks, errs1) => { let (oks, errs2) = - dispatch_differential_join_inner_trace_local(join_impl, trace, oks, closure); + dispatch_differential_join_inner_trace_local(join_spec, trace, oks, closure); errors.push(errs1.as_collection(|k, _v| k.clone())); errors.extend(errs2); oks } ArrangementFlavor::Trace(_gid, oks, errs1) => { let (oks, errs2) = - dispatch_differential_join_inner_trace_trace(join_impl, trace, oks, closure); + dispatch_differential_join_inner_trace_trace(join_spec, trace, oks, closure); errors.push(errs1.as_collection(|k, _v| k.clone())); errors.extend(errs2); oks @@ -307,7 +340,7 @@ where /// Dispatches valid combinations of arrangements where the type-specialized keys match. fn dispatch_differential_join_inner_local_local( - join_impl: LinearJoinImpl, + join_spec: LinearJoinSpec, prev_keyed: SpecializedArrangement, next_input: SpecializedArrangement, closure: JoinClosure, @@ -329,7 +362,7 @@ where .zip(next_key_types.iter()) .all(|(c1, c2)| c1.scalar_type == c2.scalar_type)); differential_join_inner( - join_impl, + join_spec, prev_keyed, next_input, Some(prev_key_types), @@ -341,7 +374,7 @@ where ( SpecializedArrangement::RowRow(prev_keyed), SpecializedArrangement::RowRow(next_input), - ) => differential_join_inner(join_impl, prev_keyed, next_input, None, None, None, closure), + ) => differential_join_inner(join_spec, prev_keyed, next_input, None, None, None, closure), (SpecializedArrangement::Bytes9Row(key_types, _), SpecializedArrangement::RowRow(_)) => { panic!( "Invalid combination of type specializations: key types differ! \ @@ -361,7 +394,7 @@ where /// Dispatches valid combinations of arrangement-trace where the type-specialized keys match. fn dispatch_differential_join_inner_local_trace( - join_impl: LinearJoinImpl, + join_spec: LinearJoinSpec, prev_keyed: SpecializedArrangement, next_input: SpecializedArrangementImport, closure: JoinClosure, @@ -384,7 +417,7 @@ where .zip(next_key_types.iter()) .all(|(c1, c2)| c1.scalar_type == c2.scalar_type)); differential_join_inner( - join_impl, + join_spec, prev_keyed, next_input, Some(prev_key_types), @@ -396,7 +429,7 @@ where ( SpecializedArrangement::RowRow(prev_keyed), SpecializedArrangementImport::RowRow(next_input), - ) => differential_join_inner(join_impl, prev_keyed, next_input, None, None, None, closure), + ) => differential_join_inner(join_spec, prev_keyed, next_input, None, None, None, closure), ( SpecializedArrangement::Bytes9Row(key_types, _), SpecializedArrangementImport::RowRow(_), @@ -418,7 +451,7 @@ where /// Dispatches valid combinations of trace-arrangement where the type-specialized keys match. fn dispatch_differential_join_inner_trace_local( - join_impl: LinearJoinImpl, + join_spec: LinearJoinSpec, prev_keyed: SpecializedArrangementImport, next_input: SpecializedArrangement, closure: JoinClosure, @@ -441,7 +474,7 @@ where .zip(next_key_types.iter()) .all(|(c1, c2)| c1.scalar_type == c2.scalar_type)); differential_join_inner( - join_impl, + join_spec, prev_keyed, next_input, Some(prev_key_types), @@ -453,7 +486,7 @@ where ( SpecializedArrangementImport::RowRow(prev_keyed), SpecializedArrangement::RowRow(next_input), - ) => differential_join_inner(join_impl, prev_keyed, next_input, None, None, None, closure), + ) => differential_join_inner(join_spec, prev_keyed, next_input, None, None, None, closure), ( SpecializedArrangementImport::Bytes9Row(key_types, _), SpecializedArrangement::RowRow(_), @@ -475,7 +508,7 @@ where /// Dispatches valid combinations of trace-arrangement where the type-specialized keys match. fn dispatch_differential_join_inner_trace_trace( - join_impl: LinearJoinImpl, + join_spec: LinearJoinSpec, prev_keyed: SpecializedArrangementImport, next_input: SpecializedArrangementImport, closure: JoinClosure, @@ -498,7 +531,7 @@ where .zip(next_key_types.iter()) .all(|(c1, c2)| c1.scalar_type == c2.scalar_type)); differential_join_inner( - join_impl, + join_spec, prev_keyed, next_input, Some(prev_key_types), @@ -510,7 +543,7 @@ where ( SpecializedArrangementImport::RowRow(prev_keyed), SpecializedArrangementImport::RowRow(next_input), - ) => differential_join_inner(join_impl, prev_keyed, next_input, None, None, None, closure), + ) => differential_join_inner(join_spec, prev_keyed, next_input, None, None, None, closure), ( SpecializedArrangementImport::Bytes9Row(key_types, _), SpecializedArrangementImport::RowRow(_), @@ -537,7 +570,7 @@ where /// The return type includes an optional error collection, which may be /// `None` if we can determine that `closure` cannot error. fn differential_join_inner( - join_impl: LinearJoinImpl, + join_spec: LinearJoinSpec, prev_keyed: Arranged, next_input: Arranged, key_types: Option>, @@ -567,8 +600,8 @@ where let mut new_buf = Row::default(); if closure.could_error() { - let (oks, err) = join_impl - .run(&prev_keyed, &next_input, move |key, old, new| { + let (oks, err) = join_spec + .render(&prev_keyed, &next_input, move |key, old, new| { let key = key.into_row(&mut key_buf, key_types.as_deref()); let old = old.into_row(&mut old_buf, prev_types.as_deref()); let new = new.into_row(&mut new_buf, next_types.as_deref()); @@ -591,7 +624,7 @@ where (oks.as_collection(), Some(err.as_collection())) } else { - let oks = join_impl.run(&prev_keyed, &next_input, move |key, old, new| { + let oks = join_spec.render(&prev_keyed, &next_input, move |key, old, new| { let key = key.into_row(&mut key_buf, key_types.as_deref()); let old = old.into_row(&mut old_buf, prev_types.as_deref()); let new = new.into_row(&mut new_buf, next_types.as_deref()); diff --git a/src/compute/src/render/join/mod.rs b/src/compute/src/render/join/mod.rs index cf4a113e6bd5f..1dcb67b57f179 100644 --- a/src/compute/src/render/join/mod.rs +++ b/src/compute/src/render/join/mod.rs @@ -15,4 +15,4 @@ mod delta_join; mod linear_join; mod mz_join_core; -pub use linear_join::LinearJoinImpl; +pub use linear_join::{LinearJoinImpl, LinearJoinSpec}; diff --git a/src/compute/src/render/join/mz_join_core.rs b/src/compute/src/render/join/mz_join_core.rs index 633095d5a65fd..95e3f6398e630 100644 --- a/src/compute/src/render/join/mz_join_core.rs +++ b/src/compute/src/render/join/mz_join_core.rs @@ -38,6 +38,7 @@ use std::cmp::Ordering; use std::collections::VecDeque; +use std::time::Instant; use differential_dataflow::consolidation::consolidate_updates; use differential_dataflow::difference::Multiply; @@ -61,10 +62,11 @@ use timely::PartialOrder; /// Each matching pair of records `(key, val1)` and `(key, val2)` are subjected to the `result` function, /// which produces something implementing `IntoIterator`, where the output collection will have an entry for /// every value returned by the iterator. -pub(super) fn mz_join_core( +pub(super) fn mz_join_core( arranged1: &Arranged, arranged2: &Arranged, mut result: L, + yield_fn: YFn, ) -> Collection where G: Scope, @@ -77,6 +79,7 @@ where K: Data, V1: Data, V2: Data, + YFn: Fn(Instant, usize) -> bool + 'static, { let mut trace1 = arranged1.trace.clone(); let mut trace2 = arranged2.trace.clone(); @@ -278,24 +281,30 @@ where // input must scan all batches from the other input). // Perform some amount of outstanding work. - let mut fuel = 1_000_000; - while !todo1.is_empty() && fuel > 0 { - todo1 - .front_mut() - .unwrap() - .work(output, &mut result, &mut fuel); + let start_time = Instant::now(); + let mut work = 0; + while !todo1.is_empty() && !yield_fn(start_time, work) { + todo1.front_mut().unwrap().work( + output, + &mut result, + |w| yield_fn(start_time, w), + &mut work, + ); if !todo1.front().unwrap().work_remains() { todo1.pop_front(); } } // Perform some amount of outstanding work. - let mut fuel = 1_000_000; - while !todo2.is_empty() && fuel > 0 { - todo2 - .front_mut() - .unwrap() - .work(output, &mut result, &mut fuel); + let start_time = Instant::now(); + let mut work = 0; + while !todo2.is_empty() && !yield_fn(start_time, work) { + todo2.front_mut().unwrap().work( + output, + &mut result, + |w| yield_fn(start_time, w), + &mut work, + ); if !todo2.front().unwrap().work_remains() { todo2.pop_front(); } @@ -405,14 +414,16 @@ where } /// Process keys until at least `fuel` output tuples produced, or the work is exhausted. - fn work( + fn work( &mut self, output: &mut OutputHandle>, mut result: L, - fuel: &mut usize, + yield_fn: YFn, + work: &mut usize, ) where I: IntoIterator, L: FnMut(&C1::Key, &C1::Val, &C2::Val) -> I, + YFn: Fn(usize) -> bool, { let meet = self.capability.time(); @@ -443,7 +454,7 @@ where Ordering::Less => cursor1.seek_key(storage1, cursor2.key(storage2)), Ordering::Greater => cursor2.seek_key(storage2, cursor1.key(storage1)), Ordering::Equal => { - // Populate `temp` with the results, as long as fuel remains. + // Populate `temp` with the results, until we should yield. let key = cursor2.key(storage2); while let Some(val1) = cursor1.get_val(storage1) { while let Some(val2) = cursor2.get_val(storage2) { @@ -463,14 +474,14 @@ where cursor1.step_val(storage1); cursor2.rewind_vals(storage2); - *fuel = fuel.saturating_sub(temp.len()); + *work = work.saturating_add(temp.len()); - if *fuel == 0 { - // The fuel is exhausted, so we should yield. Returning here is only - // allowed because we leave the cursors in a state that will let us - // pick up the work correctly on the next invocation. - *fuel += flush(temp, &mut session); - if *fuel == 0 { + if yield_fn(*work) { + // Returning here is only allowed because we leave the cursors in a + // state that will let us pick up the work correctly on the next + // invocation. + *work -= flush(temp, &mut session); + if yield_fn(*work) { return; } } @@ -483,7 +494,7 @@ where } if !temp.is_empty() { - *fuel += flush(temp, &mut session); + *work -= flush(temp, &mut session); } // We only get here after having iterated through all keys. diff --git a/src/compute/src/render/mod.rs b/src/compute/src/render/mod.rs index 171854a300a7b..cbea84e29ceb5 100644 --- a/src/compute/src/render/mod.rs +++ b/src/compute/src/render/mod.rs @@ -150,7 +150,7 @@ mod threshold; mod top_k; pub use context::CollectionBundle; -pub use join::LinearJoinImpl; +pub use join::{LinearJoinImpl, LinearJoinSpec}; /// Assemble the "compute" side of a dataflow, i.e. all but the sources. /// @@ -289,7 +289,7 @@ pub fn build_compute_dataflow( if recursive { scope.clone().iterative::, _, _>(|region| { let mut context = Context::for_dataflow_in(&dataflow, region.clone()); - context.linear_join_impl = compute_state.linear_join_impl; + context.linear_join_spec = compute_state.linear_join_spec; context.enable_specialized_arrangements = compute_state.enable_specialized_arrangements; @@ -351,7 +351,7 @@ pub fn build_compute_dataflow( } else { scope.clone().region_named(&build_name, |region| { let mut context = Context::for_dataflow_in(&dataflow, region.clone()); - context.linear_join_impl = compute_state.linear_join_impl; + context.linear_join_spec = compute_state.linear_join_spec; context.enable_specialized_arrangements = compute_state.enable_specialized_arrangements; diff --git a/src/sql/src/session/vars.rs b/src/sql/src/session/vars.rs index 3291174243970..9aa15738189e3 100644 --- a/src/sql/src/session/vars.rs +++ b/src/sql/src/session/vars.rs @@ -1296,6 +1296,16 @@ const ENABLE_MZ_JOIN_CORE: ServerVar = ServerVar { internal: true, }; +pub static DEFAULT_LINEAR_JOIN_YIELDING: Lazy = Lazy::new(|| "work:1000000".into()); +static LINEAR_JOIN_YIELDING: Lazy> = Lazy::new(|| ServerVar { + name: UncasedStr::new("linear_join_yielding"), + value: &DEFAULT_LINEAR_JOIN_YIELDING, + description: + "The yielding behavior compute rendering should apply for linear join operators. Either \ + 'work:' or 'time:'.", + internal: true, +}); + pub const ENABLE_DEFAULT_CONNECTION_VALIDATION: ServerVar = ServerVar { name: UncasedStr::new("enable_default_connection_validation"), value: &true, @@ -2471,6 +2481,7 @@ impl SystemVars { .with_var(&KEEP_N_SOURCE_STATUS_HISTORY_ENTRIES) .with_var(&KEEP_N_SINK_STATUS_HISTORY_ENTRIES) .with_var(&ENABLE_MZ_JOIN_CORE) + .with_var(&LINEAR_JOIN_YIELDING) .with_var(&ENABLE_STORAGE_SHARD_FINALIZATION) .with_var(&ENABLE_CONSOLIDATE_AFTER_UNION_NEGATE) .with_var(&ENABLE_SPECIALIZED_ARRANGEMENTS) @@ -3128,6 +3139,11 @@ impl SystemVars { *self.expect_value(&ENABLE_MZ_JOIN_CORE) } + /// Returns the `linear_join_yielding` configuration parameter. + pub fn linear_join_yielding(&self) -> &String { + self.expect_value(&LINEAR_JOIN_YIELDING) + } + /// Returns the `enable_storage_shard_finalization` configuration parameter. pub fn enable_storage_shard_finalization(&self) -> bool { *self.expect_value(&ENABLE_STORAGE_SHARD_FINALIZATION) @@ -4797,6 +4813,7 @@ pub fn is_tracing_var(name: &str) -> bool { pub fn is_compute_config_var(name: &str) -> bool { name == MAX_RESULT_SIZE.name() || name == DATAFLOW_MAX_INFLIGHT_BYTES.name() + || name == LINEAR_JOIN_YIELDING.name() || name == ENABLE_MZ_JOIN_CORE.name() || name == ENABLE_JEMALLOC_PROFILING.name() || name == ENABLE_SPECIALIZED_ARRANGEMENTS.name()