diff --git a/.github/workflows/semantic.yml b/.github/workflows/semantic.yml index f4a66dd44e73f..d901308a303e0 100644 --- a/.github/workflows/semantic.yml +++ b/.github/workflows/semantic.yml @@ -159,6 +159,7 @@ jobs: kubernetes_logs source logstash source mongodb_metrics source + mqtt source nats source new source nginx_metrics source diff --git a/changelog.d/23670_support_many_mqtt_source_topics.enhancement.md b/changelog.d/23670_support_many_mqtt_source_topics.enhancement.md new file mode 100644 index 0000000000000..9cc08b4ad6072 --- /dev/null +++ b/changelog.d/23670_support_many_mqtt_source_topics.enhancement.md @@ -0,0 +1,3 @@ +The `mqtt` source config field `topic` can now be a list of mqtt topic strings instead of just a string. If a list is provided, the `mqtt` source client will subscribe to all the topics. + +authors: december1981 diff --git a/src/sources/mqtt/config.rs b/src/sources/mqtt/config.rs index 50fba4d18883b..6731c380be070 100644 --- a/src/sources/mqtt/config.rs +++ b/src/sources/mqtt/config.rs @@ -20,7 +20,7 @@ use crate::{ TlsSnafu, }, config::{SourceConfig, SourceContext, SourceOutput}, - serde::{default_decoding, default_framing_message_based}, + serde::{OneOrMany, default_decoding, default_framing_message_based}, }; use super::source::MqttSource; @@ -34,11 +34,11 @@ pub struct MqttSourceConfig { #[serde(flatten)] pub common: MqttCommonConfig, - /// MQTT topic from which messages are to be read. + /// MQTT topic or topics from which messages are to be read. #[configurable(derived)] #[serde(default = "default_topic")] #[derivative(Default(value = "default_topic()"))] - pub topic: String, + pub topic: OneOrMany, #[configurable(derived)] #[serde(default = "default_framing_message_based")] @@ -65,8 +65,8 @@ pub struct MqttSourceConfig { pub topic_key: OptionalValuePath, } -fn default_topic() -> String { - "vector".to_owned() +fn default_topic() -> OneOrMany { + OneOrMany::One("vector".into()) } fn default_topic_key() -> OptionalValuePath { diff --git a/src/sources/mqtt/integration_tests.rs b/src/sources/mqtt/integration_tests.rs index 52c3a2e0c00ee..0a48747f6a1ae 100644 --- a/src/sources/mqtt/integration_tests.rs +++ b/src/sources/mqtt/integration_tests.rs @@ -2,6 +2,7 @@ #![cfg(test)] use crate::common::mqtt::MqttCommonConfig; +use crate::serde::OneOrMany; use crate::test_util::trace_init; use crate::test_util::{components::SOURCE_TAGS, random_lines_with_stream, random_string}; use rumqttc::{AsyncClient, MqttOptions, QoS}; @@ -58,13 +59,13 @@ async fn get_mqtt_client() -> AsyncClient { } #[tokio::test] -async fn mqtt_happy() { +async fn mqtt_one_topic_happy() { trace_init(); let topic = "source-test"; // We always want new client ID. If it were stable, subsequent tests could receive data sent in previous runs. let client_id = format!("sourceTest{}", random_string(6)); let num_events = 10; - let (input, _events) = random_lines_with_stream(100, num_events, None); + let (input, ..) = random_lines_with_stream(100, num_events, None); assert_source_compliance(&SOURCE_TAGS, async { let common = MqttCommonConfig { @@ -76,7 +77,7 @@ async fn mqtt_happy() { let config = MqttSourceConfig { common, - topic: topic.to_owned(), + topic: OneOrMany::One(topic.to_owned()), ..MqttSourceConfig::default() }; @@ -115,3 +116,69 @@ async fn mqtt_happy() { }) .await; } + +#[tokio::test] +async fn mqtt_many_topics_happy() { + trace_init(); + let topic_prefix_1 = "source-prefix-1"; + let topic_prefix_2 = "source-prefix-2"; + // We always want new client ID. If it were stable, subsequent tests could receive data sent in previous runs. + let client_id = format!("sourceTest{}", random_string(6)); + let num_events = 10; + let (input_1, ..) = random_lines_with_stream(100, num_events, None); + let (input_2, ..) = random_lines_with_stream(100, num_events, None); + + assert_source_compliance(&SOURCE_TAGS, async { + let common = MqttCommonConfig { + host: mqtt_broker_address(), + port: mqtt_broker_port(), + client_id: Some(client_id), + ..Default::default() + }; + + let config = MqttSourceConfig { + common, + topic: OneOrMany::Many(vec![ + format!("{topic_prefix_1}/#"), + format!("{topic_prefix_2}/#"), + ]), + ..MqttSourceConfig::default() + }; + + let (tx, rx) = SourceSender::new_test(); + tokio::spawn(async move { + config + .build(SourceContext::new_test(tx, None)) + .await + .unwrap() + .await + .unwrap() + }); + + tokio::time::sleep(Duration::from_millis(100)).await; + + let client = get_mqtt_client().await; + send_test_events(&client, &format!("{topic_prefix_1}/test"), &input_1).await; + send_test_events(&client, &format!("{topic_prefix_2}/test"), &input_2).await; + + let mut expected_messages: HashSet<_> = + input_1.into_iter().chain(input_2.into_iter()).collect(); + + let events: Vec = timeout(Duration::from_secs(2), rx.take(num_events * 2).collect()) + .await + .unwrap(); + + for event in events { + let message = event + .as_log() + .get(log_schema().message_key_target_path().unwrap()) + .unwrap() + .to_string_lossy(); + if !expected_messages.remove(message.as_ref()) { + panic!("Received unexpected message: {message:?}"); + } + } + assert!(expected_messages.is_empty()); + }) + .await; +} diff --git a/src/sources/mqtt/source.rs b/src/sources/mqtt/source.rs index 1a70b22ed5208..39cbfbade459c 100644 --- a/src/sources/mqtt/source.rs +++ b/src/sources/mqtt/source.rs @@ -9,11 +9,12 @@ use crate::{ event::BatchNotifier, event::Event, internal_events::{EndpointBytesReceived, StreamClosedError}, + serde::OneOrMany, shutdown::ShutdownSignal, sources::mqtt::MqttSourceConfig, sources::util, }; -use rumqttc::{Event as MqttEvent, Incoming, Publish, QoS}; +use rumqttc::{Event as MqttEvent, Incoming, Publish, QoS, SubscribeFilter}; use vector_lib::config::LegacyKey; use vector_lib::lookup::path; @@ -42,10 +43,25 @@ impl MqttSource { pub async fn run(self, mut out: SourceSender, shutdown: ShutdownSignal) -> Result<(), ()> { let (client, mut connection) = self.connector.connect(); - client - .subscribe(&self.config.topic, QoS::AtLeastOnce) - .await - .map_err(|_| ())?; + match &self.config.topic { + OneOrMany::One(topic) => { + client + .subscribe(topic, QoS::AtLeastOnce) + .await + .map_err(|_| ())?; + } + OneOrMany::Many(topics) => { + client + .subscribe_many( + topics + .iter() + .cloned() + .map(|topic| SubscribeFilter::new(topic, QoS::AtLeastOnce)), + ) + .await + .map_err(|_| ())?; + } + } loop { tokio::select! { diff --git a/website/cue/reference/components/sources/generated/mqtt.cue b/website/cue/reference/components/sources/generated/mqtt.cue index d5d20644c185e..ba6001172a7e1 100644 --- a/website/cue/reference/components/sources/generated/mqtt.cue +++ b/website/cue/reference/components/sources/generated/mqtt.cue @@ -578,7 +578,7 @@ generated: components: sources: mqtt: configuration: { } } topic: { - description: "MQTT topic from which messages are to be read." + description: "MQTT topic or topics from which messages are to be read." required: false type: string: default: "vector" }