diff --git a/doc/user/content/release-notes.md b/doc/user/content/release-notes.md
index 61b974ecb29db..abd92b22d59dc 100644
--- a/doc/user/content/release-notes.md
+++ b/doc/user/content/release-notes.md
@@ -49,7 +49,7 @@ Wrap your release notes at the 80 character mark.
 {{% version-header v0.7.2 %}}
 
 - Record Kafka Consumer metrics in the `mz_kafka_consumer_statistics` system
-  table.
+  table. Metrics collection is enabled by default for all Kafka sources.
 
 - Add the [`jsonb_object_agg`](/sql/functions/jsonb_object_agg) function to
   aggregate rows into a JSON object.
diff --git a/src/sql/src/kafka_util.rs b/src/sql/src/kafka_util.rs
index e04b9d964f126..7d1fa00e4e2db 100644
--- a/src/sql/src/kafka_util.rs
+++ b/src/sql/src/kafka_util.rs
@@ -36,11 +36,12 @@ enum ValType {
 
 // Describes Kafka cluster configurations users can supply using `CREATE
 // SOURCE...WITH (option_list)`.
-// TODO(sploiselle): Support overriding keys, default values.
+// TODO(sploiselle): Support overriding keys.
 struct Config {
     name: &'static str,
     val_type: ValType,
     transform: fn(String) -> String,
+    default: Option<String>,
 }
 
 impl Config {
@@ -49,6 +50,7 @@ impl Config {
             name,
             val_type,
             transform: convert::identity,
+            default: None,
         }
     }
 
@@ -69,6 +71,12 @@ impl Config {
         self
     }
 
+    // Sets the default value used when the user omits this option.
+    fn set_default(mut self, d: Option<String>) -> Self {
+        self.default = d;
+        self
+    }
+
     // Get the appropriate String to use as the Kafka config key.
     fn get_key(&self) -> String {
         self.name.replace("_", ".")
@@ -105,7 +113,12 @@ fn extract(
                 Ok(v) => v,
                 Err(e) => bail!("Invalid WITH option {}={}: {}", config.name, v, e),
             },
-            None => continue,
+            None => match &config.default {
+                Some(v) => v.to_string(),
+                None => {
+                    continue;
+                }
+            },
         };
         out.insert(config.get_key(), value);
     }
@@ -134,7 +147,10 @@ pub fn extract_config(
             // The range of values comes from `statistics.interval.ms` in
             // https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
             ValType::Number(0, 86_400_000),
-        ),
+        )
+        .set_default(Some(
+            chrono::Duration::seconds(1).num_milliseconds().to_string(),
+        )),
         Config::new(
             "topic_metadata_refresh_interval_ms",
             // The range of values comes from `topic.metadata.refresh.interval.ms` in
diff --git a/test/catalog-compat/catcompatck/catcompatck b/test/catalog-compat/catcompatck/catcompatck
index e90ed0495eb70..512a1941a11ee 100755
--- a/test/catalog-compat/catcompatck/catcompatck
+++ b/test/catalog-compat/catcompatck/catcompatck
@@ -135,6 +135,42 @@ testdrive --no-reset <<'EOF'
 > CREATE MATERIALIZED VIEW v AS WITH u AS (SELECT * FROM real_time_src) SELECT * from u;
 EOF
 
+say "creating Kafka source with metrics disabled"
+testdrive --no-reset <<'EOF'
+# Copy the schema from above; the golden010 catalog doesn't support statistics_interval_ms.
+$ set schema={
+    "type": "record",
+    "name": "envelope",
+    "fields": [
+      {
+        "name": "before",
+        "type": [
+          {
+            "name": "row",
+            "type": "record",
+            "fields": [
+              {"name": "a", "type": "long"},
+              {"name": "b", "type": "long"}
+            ]
+          },
+          "null"
+        ]
+      },
+      { "name": "after", "type": ["row", "null"] }
+    ]
+  }
+
+> CREATE SOURCE real_time_src_no_stats
+  FROM KAFKA BROKER '${testdrive.kafka-addr}' TOPIC 'testdrive-real-time-${testdrive.seed}'
+  WITH (statistics_interval_ms = 0)
+  FORMAT AVRO USING SCHEMA '${schema}'
+  ENVELOPE DEBEZIUM
+
+> CREATE MATERIALIZED VIEW real_time_no_stats AS
+  SELECT *, concat(a::text, CAST(b AS text)) AS c
+  FROM real_time_src_no_stats
+EOF
+
 say "killing materialized-golden071"
 kill_materialized
 
@@ -164,4 +200,12 @@ a b
 2 1
 3 1
 1 2
+
+# Kafka metrics for real_time_src should now be enabled.
+# The count should be 2 because there are two materialized views on real_time_src.
+# If real_time_src_no_stats were also emitting stats, there would be 3 rows.
+> SELECT count(*) FROM mz_kafka_consumer_statistics;
+count
+-----
+2
 EOF
diff --git a/test/testdrive/avro-sources.td b/test/testdrive/avro-sources.td
index 222c10a4c3558..1e75ad4a8362c 100644
--- a/test/testdrive/avro-sources.td
+++ b/test/testdrive/avro-sources.td
@@ -299,7 +299,7 @@ $ kafka-ingest format=avro topic=non-dbz-data-varying-partition schema=${non-dbz
 # Erroneously adds start_offsets for non-existent partitions.
 > CREATE MATERIALIZED SOURCE non_dbz_data_varying_partition_2
   FROM KAFKA BROKER '${testdrive.kafka-addr}' TOPIC 'testdrive-non-dbz-data-varying-partition-${testdrive.seed}'
-  WITH (start_offset=[0,1], statistics_interval_ms = 1000)
+  WITH (start_offset=[0,1])
   FORMAT AVRO USING SCHEMA '${non-dbz-schema}'
   ENVELOPE NONE
 
@@ -333,15 +333,24 @@
 a b
 5 6
 9 10
 
-# There should be two partitions per consumer
+# There should be two partitions for the last created source/consumer (non_dbz_data_varying_partition_2)
 > SELECT count(*) FROM mz_kafka_consumer_statistics GROUP BY consumer_name;
 count
 -----
+1
+1
+1
+1
+2
+2
+2
+2
+2
 2
 
 > CREATE MATERIALIZED SOURCE non_dbz_data_varying_partition_3
   FROM KAFKA BROKER '${testdrive.kafka-addr}' TOPIC 'testdrive-non-dbz-data-varying-partition-${testdrive.seed}'
-  WITH (start_offset=[1,1], statistics_interval_ms = 1000)
+  WITH (start_offset=[1,1])
   FORMAT AVRO USING SCHEMA '${non-dbz-schema}'
   ENVELOPE NONE
 
@@ -358,10 +367,19 @@
 a b
 9 10
 11 12
 
-# There should be metrics for 3 partitions per consumer
+# There should be three partitions for the last three sources/consumers (non_dbz_data_varying_partition_[123])
 > SELECT count(*) FROM mz_kafka_consumer_statistics GROUP BY consumer_name;
 count
 -----
+1
+1
+1
+1
+2
+2
+2
+2
+3
 3
 3
diff --git a/test/testdrive/kafka-stats.td b/test/testdrive/kafka-stats.td
index c46cf43213e73..82c56c9da5606 100644
--- a/test/testdrive/kafka-stats.td
+++ b/test/testdrive/kafka-stats.td
@@ -22,7 +22,6 @@ $ kafka-create-topic topic=data
 
 > CREATE SOURCE data
   FROM KAFKA BROKER '${testdrive.kafka-addr}' TOPIC 'testdrive-data-${testdrive.seed}'
-  WITH (statistics_interval_ms = 1000)
   FORMAT AVRO USING SCHEMA '${schema}'
 
 > CREATE MATERIALIZED VIEW test1 AS
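
Reviewer note: below is a minimal, runnable sketch of the default-value plumbing this patch adds to `kafka_util.rs`. It is an illustration, not the PR's exact code: `ValType` validation, the SQL `Value` type, and error handling are omitted, and the `main` function is hypothetical, existing only to exercise the two behaviors the tests above rely on — an unset option falls back to its default, while an explicit setting (e.g. `WITH (statistics_interval_ms = 0)`) still overrides it.

```rust
use std::collections::BTreeMap;

// Simplified stand-in for the patch's `Config`: just a name and an
// optional default, with the same builder-style `set_default`.
struct Config {
    name: &'static str,
    default: Option<String>,
}

impl Config {
    fn new(name: &'static str) -> Self {
        Config {
            name,
            default: None,
        }
    }

    // Mirrors the patch: stores the value used when the option is unset.
    fn set_default(mut self, d: Option<String>) -> Self {
        self.default = d;
        self
    }

    // `statistics_interval_ms` -> `statistics.interval.ms`, as in the patch.
    fn get_key(&self) -> String {
        self.name.replace("_", ".")
    }
}

// Simplified `extract`: a user-supplied WITH option wins; otherwise the
// default is used; options with neither are skipped entirely.
fn extract(
    input: &mut BTreeMap<String, String>,
    configs: &[Config],
) -> BTreeMap<String, String> {
    let mut out = BTreeMap::new();
    for config in configs {
        let value = match input.remove(config.name) {
            Some(v) => v,
            None => match &config.default {
                Some(v) => v.to_string(),
                None => continue,
            },
        };
        out.insert(config.get_key(), value);
    }
    out
}

fn main() {
    // "1000" stands in for chrono::Duration::seconds(1).num_milliseconds().
    let configs = vec![
        Config::new("statistics_interval_ms").set_default(Some("1000".into())),
        Config::new("topic_metadata_refresh_interval_ms"),
    ];

    // No WITH options: the statistics interval still appears (new behavior),
    // while the option without a default is skipped (old behavior).
    let out = extract(&mut BTreeMap::new(), &configs);
    assert_eq!(out.get("statistics.interval.ms"), Some(&"1000".to_string()));
    assert!(!out.contains_key("topic.metadata.refresh.interval.ms"));

    // An explicit value overrides the default, which is how the catcompatck
    // test disables metrics with statistics_interval_ms = 0.
    let mut opts: BTreeMap<String, String> =
        BTreeMap::from([("statistics_interval_ms".to_string(), "0".to_string())]);
    let out = extract(&mut opts, &configs);
    assert_eq!(out.get("statistics.interval.ms"), Some(&"0".to_string()));
}
```

Returning `Self` from `set_default` is what lets `extract_config` chain `.set_default(...)` directly onto `Config::new(...)` in the patch.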