Merged
Changes from all commits
30 commits
93fb82a
Log basic metrics instead of extended statistics
cirego Mar 17, 2021
dc50905
Merge branch 'main' into chris/system-table-kafka-metrics
cirego Mar 17, 2021
ee80890
Add mz_kafka_statistics built-in log / source
cirego Mar 17, 2021
23c3970
Remove change for debugging
cirego Mar 18, 2021
0388f52
Put consumer in the builtin log name
cirego Mar 18, 2021
8945be4
Tracking kafka consumer statistics now works
cirego Mar 18, 2021
f73c6f6
Fixup test_persistence
cirego Mar 18, 2021
045293d
Merge branch 'main' into chris/system-table-kafka-metrics
cirego Mar 22, 2021
9f34b96
Log per-partition kafka consumer metrics
cirego Mar 22, 2021
f09fe85
Merge branch 'main' into chris/system-table-kafka-metrics
cirego Mar 22, 2021
e6fe16a
Record kafka consumer metrics by default
cirego Mar 22, 2021
dbec829
Merge branch 'main' into chris/system-table-kafka-metrics
cirego Mar 22, 2021
351837c
Merge branch 'chris/system-table-kafka-metrics' into chris/default_st…
cirego Mar 22, 2021
3713dbe
Derive default values for stats object
cirego Mar 22, 2021
1c38647
Merge branch 'main' into chris/system-table-kafka-metrics
cirego Mar 22, 2021
80d07f9
Merge branch 'chris/system-table-kafka-metrics' into chris/default_st…
cirego Mar 22, 2021
77d3c5e
Remove commented out log line
cirego Mar 22, 2021
2b94899
Merge branch 'chris/system-table-kafka-metrics' into chris/default_st…
cirego Mar 23, 2021
73165d1
Merge branch 'main' into chris/default_statistics_interval
cirego Mar 23, 2021
e8afcea
Use chrono instead of hardcoded string
cirego Mar 23, 2021
7fd2ba4
Remove completed TODO :)
cirego Mar 23, 2021
ab36134
Use Option<String> instead of lambda
cirego Mar 23, 2021
50f4712
Merge branch 'main' into chris/default_statistics_interval
cirego Mar 24, 2021
33350b1
Remove unneeded use statements
cirego Mar 24, 2021
1b45481
Bump timeout to see if test passes
cirego Mar 24, 2021
80f3100
Another debugging commit for CI
cirego Mar 24, 2021
9c8fe7c
Merge branch 'main' into chris/default_statistics_interval
cirego Mar 25, 2021
3f94cb1
Update release comment / merge main
cirego Mar 25, 2021
478628d
Remove debugging commit
cirego Mar 25, 2021
2736dbc
Add upgrade test for kafka stats interval
cirego Mar 25, 2021
2 changes: 1 addition & 1 deletion doc/user/content/release-notes.md
@@ -49,7 +49,7 @@ Wrap your release notes at the 80 character mark.
{{% version-header v0.7.2 %}}

- Record Kafka Consumer metrics in the `mz_kafka_consumer_statistics` system
table.
table. Enabled by default for all Kafka sources.

- Add the [`jsonb_object_agg`](/sql/functions/jsonb_object_agg) function to
aggregate rows into a JSON object.
22 changes: 19 additions & 3 deletions src/sql/src/kafka_util.rs
@@ -36,11 +36,12 @@ enum ValType {

// Describes Kafka cluster configurations users can supply using `CREATE
// SOURCE...WITH (option_list)`.
// TODO(sploiselle): Support overriding keys, default values.
// TODO(sploiselle): Support overriding keys.
struct Config {
name: &'static str,
val_type: ValType,
transform: fn(String) -> String,
default: Option<String>,
}

impl Config {
@@ -49,6 +50,7 @@ impl Config {
name,
val_type,
transform: convert::identity,
default: None,
}
}

@@ -69,6 +71,12 @@ impl Config {
self
}

// Sets a default value to use when this configuration option is not specified
fn set_default(mut self, d: Option<String>) -> Self {
self.default = d;
self
}

// Get the appropriate String to use as the Kafka config key.
fn get_key(&self) -> String {
self.name.replace("_", ".")
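
For orientation, here is a minimal, self-contained sketch of the builder pattern this hunk extends. The names mirror the diff, but the struct is simplified and is not the real sql crate definition (the real `Config` also carries `val_type` and `transform`):

```rust
// Simplified sketch, assuming only the fields relevant to this diff:
// a Config that carries an optional default, set via a builder method.
struct Config {
    name: &'static str,
    default: Option<String>,
}

impl Config {
    fn new(name: &'static str) -> Self {
        Config { name, default: None }
    }

    // Mirrors the set_default added in this diff.
    fn set_default(mut self, d: Option<String>) -> Self {
        self.default = d;
        self
    }

    // WITH option names use underscores; Kafka config keys use dots.
    fn get_key(&self) -> String {
        self.name.replace("_", ".")
    }
}

fn main() {
    let cfg = Config::new("statistics_interval_ms").set_default(Some("1000".into()));
    assert_eq!(cfg.get_key(), "statistics.interval.ms");
    assert_eq!(cfg.default.as_deref(), Some("1000"));
}
```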
@@ -105,7 +113,12 @@ fn extract(
Ok(v) => v,
Err(e) => bail!("Invalid WITH option {}={}: {}", config.name, v, e),
},
None => continue,
None => match &config.default {
Some(v) => v.to_string(),
None => {
continue;
}
},
};
out.insert(config.get_key(), value);
}
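
The control flow of this hunk is easier to see outside the diff. A hedged sketch of the new fallback, with simplified types and omitting the validation and transform steps the real `extract` performs:

```rust
use std::collections::BTreeMap;

// Sketch only: a missing WITH option now falls back to the Config's
// default instead of being skipped entirely.
fn extract(
    with_options: &BTreeMap<String, String>,
    configs: &[(&'static str, Option<String>)], // (name, default)
) -> BTreeMap<String, String> {
    let mut out = BTreeMap::new();
    for (name, default) in configs {
        let value = match with_options.get(*name) {
            Some(v) => v.clone(),
            None => match default {
                Some(v) => v.clone(),
                None => continue, // no user value, no default: omit the key
            },
        };
        out.insert(name.replace("_", "."), value);
    }
    out
}

fn main() {
    let user = BTreeMap::new(); // user supplied no WITH options
    let configs = [("statistics_interval_ms", Some("1000".to_string()))];
    let out = extract(&user, &configs);
    assert_eq!(
        out.get("statistics.interval.ms").map(String::as_str),
        Some("1000")
    );
}
```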
@@ -134,7 +147,10 @@ pub fn extract_config(
// The range of values comes from `statistics.interval.ms` in
// https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
ValType::Number(0, 86_400_000),
),
)
.set_default(Some(
chrono::Duration::seconds(1).num_milliseconds().to_string(),
)),
Contributor comment:

(this seems like a rather roundabout way to spell "1000", but leave it if you feel it is clearer as is)
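
To the reviewer's point, the chrono expression is just a longhand spelling of the literal. A quick check, assuming the chrono crate is available as a dependency (as the diff implies):

```rust
fn main() {
    // The default above, spelled both ways.
    let via_chrono = chrono::Duration::seconds(1).num_milliseconds().to_string();
    assert_eq!(via_chrono, "1000");
}
```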

Config::new(
"topic_metadata_refresh_interval_ms",
// The range of values comes from `topic.metadata.refresh.interval.ms` in
44 changes: 44 additions & 0 deletions test/catalog-compat/catcompatck/catcompatck
@@ -135,6 +135,42 @@ testdrive --no-reset <<'EOF'
> CREATE MATERIALIZED VIEW v AS WITH u AS (SELECT * FROM real_time_src) SELECT * from u;
EOF

say "creating Kafka source with metrics disabled"
testdrive --no-reset <<'EOF'
# Schema copied from above; the golden010 catalog doesn't support statistics_interval_ms
$ set schema={
"type": "record",
"name": "envelope",
"fields": [
{
"name": "before",
"type": [
{
"name": "row",
"type": "record",
"fields": [
{"name": "a", "type": "long"},
{"name": "b", "type": "long"}
]
},
"null"
]
},
{ "name": "after", "type": ["row", "null"] }
]
}

> CREATE SOURCE real_time_src_no_stats
FROM KAFKA BROKER '${testdrive.kafka-addr}' TOPIC 'testdrive-real-time-${testdrive.seed}'
WITH (statistics_interval_ms = 0)
FORMAT AVRO USING SCHEMA '${schema}'
ENVELOPE DEBEZIUM

> CREATE MATERIALIZED VIEW real_time_no_stats AS
SELECT *, concat(a::text, CAST(b AS text)) AS c
FROM real_time_src_no_stats
EOF

say "killing materialized-golden071"
kill_materialized

@@ -164,4 +200,12 @@ a b
2 1
3 1
1 2

# Kafka metrics for the real_time_src should be enabled now
# Count should be 2 because there are two materialized views on real_time_src
# If real_time_src_no_stats were also emitting stats, there would be 3 rows
> SELECT count(*) FROM mz_kafka_consumer_statistics;
count
-----
2
EOF
26 changes: 22 additions & 4 deletions test/testdrive/avro-sources.td
@@ -299,7 +299,7 @@ $ kafka-ingest format=avro topic=non-dbz-data-varying-partition schema=${non-dbz
# Erroneously adds start_offsets for non-existent partitions.
> CREATE MATERIALIZED SOURCE non_dbz_data_varying_partition_2
FROM KAFKA BROKER '${testdrive.kafka-addr}' TOPIC 'testdrive-non-dbz-data-varying-partition-${testdrive.seed}'
WITH (start_offset=[0,1], statistics_interval_ms = 1000)
WITH (start_offset=[0,1])
FORMAT AVRO USING SCHEMA '${non-dbz-schema}'
ENVELOPE NONE

@@ -333,15 +333,24 @@ a b
5 6
9 10

# There should be two partitions per consumer
# There should be two partitions for the last created source / consumer (non_dbz_data_varying_partition_2)
> SELECT count(*) FROM mz_kafka_consumer_statistics GROUP BY consumer_name;
count
-----
1
1
1
1
2
2
2
2
2
2

> CREATE MATERIALIZED SOURCE non_dbz_data_varying_partition_3
FROM KAFKA BROKER '${testdrive.kafka-addr}' TOPIC 'testdrive-non-dbz-data-varying-partition-${testdrive.seed}'
WITH (start_offset=[1,1], statistics_interval_ms = 1000)
WITH (start_offset=[1,1])
FORMAT AVRO USING SCHEMA '${non-dbz-schema}'
ENVELOPE NONE

@@ -358,10 +367,19 @@ a b
9 10
11 12

# There should be metrics for 3 partitions per consumer
# There should be three partitions for the last three sources / consumers (non_dbz_data_varying_partition_[123])
> SELECT count(*) FROM mz_kafka_consumer_statistics GROUP BY consumer_name;
count
-----
1
1
1
1
2
2
2
2
3
3
3

1 change: 0 additions & 1 deletion test/testdrive/kafka-stats.td
@@ -22,7 +22,6 @@ $ kafka-create-topic topic=data

> CREATE SOURCE data
FROM KAFKA BROKER '${testdrive.kafka-addr}' TOPIC 'testdrive-data-${testdrive.seed}'
WITH (statistics_interval_ms = 1000)
FORMAT AVRO USING SCHEMA '${schema}'

> CREATE MATERIALIZED VIEW test1 AS