Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion src/adapter/src/flags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,30 @@
use std::time::Duration;

use mz_compute_client::protocol::command::ComputeParameters;
use mz_compute_types::dataflows::YieldSpec;
use mz_orchestrator::scheduling_config::{ServiceSchedulingConfig, ServiceTopologySpreadConfig};
use mz_ore::cast::CastFrom;
use mz_ore::error::ErrorExt;
use mz_persist_client::cfg::{PersistParameters, RetryParameters};
use mz_service::params::GrpcClientParameters;
use mz_sql::session::vars::SystemVars;
use mz_sql::session::vars::{SystemVars, DEFAULT_LINEAR_JOIN_YIELDING};
use mz_storage_types::parameters::{
StorageMaxInflightBytesConfig, StorageParameters, UpsertAutoSpillConfig,
};
use mz_tracing::params::TracingParameters;

/// Return the current compute configuration, derived from the system configuration.
pub fn compute_config(config: &SystemVars) -> ComputeParameters {
let linear_join_yielding = config.linear_join_yielding();
let linear_join_yielding = parse_yield_spec(linear_join_yielding).unwrap_or_else(|| {
tracing::error!("invalid `linear_join_yielding` config: {linear_join_yielding}");
parse_yield_spec(&DEFAULT_LINEAR_JOIN_YIELDING).expect("default is valid")
});

ComputeParameters {
max_result_size: Some(config.max_result_size()),
dataflow_max_inflight_bytes: Some(config.dataflow_max_inflight_bytes()),
linear_join_yielding: Some(linear_join_yielding),
enable_mz_join_core: Some(config.enable_mz_join_core()),
enable_jemalloc_profiling: Some(config.enable_jemalloc_profiling()),
enable_specialized_arrangements: Some(config.enable_specialized_arrangements()),
Expand All @@ -35,6 +43,22 @@ pub fn compute_config(config: &SystemVars) -> ComputeParameters {
}
}

fn parse_yield_spec(s: &str) -> Option<YieldSpec> {
let parts: Vec<_> = s.split(':').collect();
match &parts[..] {
["work", amount] => {
let amount = amount.parse().ok()?;
Some(YieldSpec::ByWork(amount))
}
["time", millis] => {
let millis = millis.parse().ok()?;
let duration = Duration::from_millis(millis);
Some(YieldSpec::ByTime(duration))
}
_ => None,
}
}

/// Return the current storage configuration, derived from the system configuration.
pub fn storage_config(config: &SystemVars) -> StorageParameters {
StorageParameters {
Expand Down
1 change: 1 addition & 0 deletions src/compute-client/src/protocol/command.proto
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,5 @@ message ProtoComputeParameters {
mz_service.params.ProtoGrpcClientParameters grpc_client = 6;
optional bool enable_jemalloc_profiling = 7;
optional bool enable_specialized_arrangements = 8;
mz_compute_types.dataflows.ProtoYieldSpec linear_join_yielding = 9;
}
10 changes: 9 additions & 1 deletion src/compute-client/src/protocol/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
//! Compute protocol commands.

use mz_cluster_client::client::{ClusterStartupEpoch, TimelyConfig, TryIntoTimelyConfig};
use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::dataflows::{DataflowDescription, YieldSpec};
use mz_expr::RowSetFinishing;
use mz_ore::tracing::OpenTelemetryContext;
use mz_persist_client::cfg::PersistParameters;
Expand Down Expand Up @@ -359,6 +359,8 @@ pub struct ComputeParameters {
pub max_result_size: Option<u32>,
/// The maximum number of in-flight bytes emitted by persist_sources feeding dataflows.
pub dataflow_max_inflight_bytes: Option<usize>,
/// The yielding behavior with which linear joins should be rendered.
pub linear_join_yielding: Option<YieldSpec>,
/// Whether rendering should use `mz_join_core` rather than DD's `JoinCore::join_core`.
pub enable_mz_join_core: Option<bool>,
/// Whether to activate jemalloc heap profiling.
Expand All @@ -379,6 +381,7 @@ impl ComputeParameters {
let ComputeParameters {
max_result_size,
dataflow_max_inflight_bytes,
linear_join_yielding,
enable_mz_join_core,
enable_jemalloc_profiling,
enable_specialized_arrangements,
Expand All @@ -393,6 +396,9 @@ impl ComputeParameters {
if dataflow_max_inflight_bytes.is_some() {
self.dataflow_max_inflight_bytes = dataflow_max_inflight_bytes;
}
if linear_join_yielding.is_some() {
self.linear_join_yielding = linear_join_yielding;
}
if enable_mz_join_core.is_some() {
self.enable_mz_join_core = enable_mz_join_core;
}
Expand Down Expand Up @@ -420,6 +426,7 @@ impl RustType<ProtoComputeParameters> for ComputeParameters {
ProtoComputeParameters {
max_result_size: self.max_result_size.into_proto(),
dataflow_max_inflight_bytes: self.dataflow_max_inflight_bytes.into_proto(),
linear_join_yielding: self.linear_join_yielding.into_proto(),
enable_mz_join_core: self.enable_mz_join_core.into_proto(),
enable_jemalloc_profiling: self.enable_jemalloc_profiling.into_proto(),
enable_specialized_arrangements: self.enable_specialized_arrangements.into_proto(),
Expand All @@ -433,6 +440,7 @@ impl RustType<ProtoComputeParameters> for ComputeParameters {
Ok(Self {
max_result_size: proto.max_result_size.into_rust()?,
dataflow_max_inflight_bytes: proto.dataflow_max_inflight_bytes.into_rust()?,
linear_join_yielding: proto.linear_join_yielding.into_rust()?,
enable_mz_join_core: proto.enable_mz_join_core.into_rust()?,
enable_jemalloc_profiling: proto.enable_jemalloc_profiling.into_rust()?,
enable_specialized_arrangements: proto.enable_specialized_arrangements.into_rust()?,
Expand Down
8 changes: 8 additions & 0 deletions src/compute-types/src/dataflows.proto
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import "compute-types/src/plan.proto";
import "compute-types/src/sinks.proto";
import "compute-types/src/sources.proto";
import "expr/src/scalar.proto";
import "proto/src/proto.proto";
import "repr/src/antichain.proto";
import "repr/src/global_id.proto";
import "repr/src/relation_and_scalar.proto";
Expand Down Expand Up @@ -63,3 +64,10 @@ message ProtoBuildDesc {
mz_repr.global_id.ProtoGlobalId id = 1;
plan.ProtoPlan plan = 2;
}

message ProtoYieldSpec {
oneof kind {
uint64 by_work = 1;
mz_proto.ProtoDuration by_time = 2;
}
}
33 changes: 33 additions & 0 deletions src/compute-types/src/dataflows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
//! Types for describing dataflows.

use std::collections::{BTreeMap, BTreeSet};
use std::time::Duration;

use mz_expr::{CollectionPlan, MirRelationExpr, MirScalarExpr, OptimizedMirRelationExpr};
use mz_proto::{IntoRustIfSome, ProtoMapEntry, ProtoType, RustType, TryFromProtoError};
Expand Down Expand Up @@ -640,6 +641,38 @@ impl RustType<ProtoBuildDesc> for BuildDesc<crate::plan::Plan> {
}
}

/// Specification of a dataflow operator's yielding behavior.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
pub enum YieldSpec {
ByWork(usize),
ByTime(Duration),
}

impl RustType<ProtoYieldSpec> for YieldSpec {
fn into_proto(&self) -> ProtoYieldSpec {
use proto_yield_spec::Kind;

let kind = match *self {
Self::ByWork(w) => Kind::ByWork(w.into_proto()),
Self::ByTime(t) => Kind::ByTime(t.into_proto()),
};
ProtoYieldSpec { kind: Some(kind) }
}

fn from_proto(proto: ProtoYieldSpec) -> Result<Self, TryFromProtoError> {
use proto_yield_spec::Kind;

let Some(kind) = proto.kind else {
return Err(TryFromProtoError::missing_field("ProtoYieldSpec::kind"));
};
let spec = match kind {
Kind::ByWork(w) => Self::ByWork(w.into_rust()?),
Kind::ByTime(t) => Self::ByTime(t.into_rust()?),
};
Ok(spec)
}
}

#[cfg(test)]
mod tests {
use mz_proto::protobuf_roundtrip;
Expand Down
14 changes: 9 additions & 5 deletions src/compute/src/compute_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use crate::arrangement::manager::{SpecializedTraceHandle, TraceBundle, TraceMana
use crate::logging;
use crate::logging::compute::ComputeEvent;
use crate::metrics::ComputeMetrics;
use crate::render::LinearJoinImpl;
use crate::render::{LinearJoinImpl, LinearJoinSpec};
use crate::server::ResponseSender;
use crate::typedefs::TraceRowHandle;

Expand Down Expand Up @@ -84,8 +84,8 @@ pub struct ComputeState {
max_result_size: u32,
/// Maximum number of in-flight bytes emitted by persist_sources feeding dataflows.
pub dataflow_max_inflight_bytes: usize,
/// Implementation to use for rendering linear joins.
pub linear_join_impl: LinearJoinImpl,
/// Specification for rendering linear joins.
pub linear_join_spec: LinearJoinSpec,
/// Metrics for this replica.
pub metrics: ComputeMetrics,
/// A process-global handle to tracing configuration.
Expand Down Expand Up @@ -116,7 +116,7 @@ impl ComputeState {
command_history,
max_result_size: u32::MAX,
dataflow_max_inflight_bytes: usize::MAX,
linear_join_impl: Default::default(),
linear_join_spec: Default::default(),
metrics,
tracing_handle,
enable_specialized_arrangements: Default::default(),
Expand Down Expand Up @@ -198,6 +198,7 @@ impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> {
let ComputeParameters {
max_result_size,
dataflow_max_inflight_bytes,
linear_join_yielding,
enable_mz_join_core,
enable_jemalloc_profiling,
enable_specialized_arrangements,
Expand All @@ -212,11 +213,14 @@ impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> {
if let Some(v) = dataflow_max_inflight_bytes {
self.compute_state.dataflow_max_inflight_bytes = v;
}
if let Some(v) = linear_join_yielding {
self.compute_state.linear_join_spec.yielding = v;
}
if let Some(v) = enable_specialized_arrangements {
self.compute_state.enable_specialized_arrangements = v;
}
if let Some(v) = enable_mz_join_core {
self.compute_state.linear_join_impl = match v {
self.compute_state.linear_join_spec.implementation = match v {
false => LinearJoinImpl::DifferentialDataflow,
true => LinearJoinImpl::Materialize,
};
Expand Down
8 changes: 4 additions & 4 deletions src/compute/src/render/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use timely::progress::{Antichain, Timestamp};
use crate::arrangement::manager::SpecializedTraceHandle;
use crate::extensions::arrange::{KeyCollection, MzArrange};
use crate::render::errors::ErrorLogger;
use crate::render::join::LinearJoinImpl;
use crate::render::join::LinearJoinSpec;
use crate::typedefs::{ErrSpine, RowSpine, TraceErrHandle, TraceRowHandle};

// Local type definition to avoid the horror in signatures.
Expand Down Expand Up @@ -98,8 +98,8 @@ where
pub bindings: BTreeMap<Id, CollectionBundle<S, T>>,
/// A token that operators can probe to know whether the dataflow is shutting down.
pub(super) shutdown_token: ShutdownToken,
/// The implementation to use for rendering linear joins.
pub(super) linear_join_impl: LinearJoinImpl,
/// Specification for rendering linear joins.
pub(super) linear_join_spec: LinearJoinSpec,
pub(super) enable_specialized_arrangements: bool,
}

Expand Down Expand Up @@ -127,7 +127,7 @@ where
until: dataflow.until.clone(),
bindings: BTreeMap::new(),
shutdown_token: Default::default(),
linear_join_impl: Default::default(),
linear_join_spec: Default::default(),
enable_specialized_arrangements: Default::default(),
}
}
Expand Down
Loading