Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/semantic.yml
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ jobs:
kubernetes_logs source
logstash source
mongodb_metrics source
mqtt source
nats source
new source
nginx_metrics source
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
The `mqtt` source config field `topic` can now be a list of mqtt topic strings instead of just a string. If a list is provided, the `mqtt` source client will subscribe to all the topics.

authors: december1981
10 changes: 5 additions & 5 deletions src/sources/mqtt/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::{
TlsSnafu,
},
config::{SourceConfig, SourceContext, SourceOutput},
serde::{default_decoding, default_framing_message_based},
serde::{OneOrMany, default_decoding, default_framing_message_based},
};

use super::source::MqttSource;
Expand All @@ -34,11 +34,11 @@ pub struct MqttSourceConfig {
#[serde(flatten)]
pub common: MqttCommonConfig,

/// MQTT topic from which messages are to be read.
/// MQTT topic or topics from which messages are to be read.
#[configurable(derived)]
#[serde(default = "default_topic")]
#[derivative(Default(value = "default_topic()"))]
pub topic: String,
pub topic: OneOrMany<String>,

#[configurable(derived)]
#[serde(default = "default_framing_message_based")]
Expand All @@ -65,8 +65,8 @@ pub struct MqttSourceConfig {
pub topic_key: OptionalValuePath,
}

fn default_topic() -> String {
"vector".to_owned()
fn default_topic() -> OneOrMany<String> {
OneOrMany::One("vector".into())
}

fn default_topic_key() -> OptionalValuePath {
Expand Down
73 changes: 70 additions & 3 deletions src/sources/mqtt/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#![cfg(test)]

use crate::common::mqtt::MqttCommonConfig;
use crate::serde::OneOrMany;
use crate::test_util::trace_init;
use crate::test_util::{components::SOURCE_TAGS, random_lines_with_stream, random_string};
use rumqttc::{AsyncClient, MqttOptions, QoS};
Expand Down Expand Up @@ -58,13 +59,13 @@ async fn get_mqtt_client() -> AsyncClient {
}

#[tokio::test]
async fn mqtt_happy() {
async fn mqtt_one_topic_happy() {
trace_init();
let topic = "source-test";
// We always want new client ID. If it were stable, subsequent tests could receive data sent in previous runs.
let client_id = format!("sourceTest{}", random_string(6));
let num_events = 10;
let (input, _events) = random_lines_with_stream(100, num_events, None);
let (input, ..) = random_lines_with_stream(100, num_events, None);

assert_source_compliance(&SOURCE_TAGS, async {
let common = MqttCommonConfig {
Expand All @@ -76,7 +77,7 @@ async fn mqtt_happy() {

let config = MqttSourceConfig {
common,
topic: topic.to_owned(),
topic: OneOrMany::One(topic.to_owned()),
..MqttSourceConfig::default()
};

Expand Down Expand Up @@ -115,3 +116,69 @@ async fn mqtt_happy() {
})
.await;
}

#[tokio::test]
async fn mqtt_many_topics_happy() {
trace_init();
let topic_prefix_1 = "source-prefix-1";
let topic_prefix_2 = "source-prefix-2";
// We always want new client ID. If it were stable, subsequent tests could receive data sent in previous runs.
let client_id = format!("sourceTest{}", random_string(6));
let num_events = 10;
let (input_1, ..) = random_lines_with_stream(100, num_events, None);
let (input_2, ..) = random_lines_with_stream(100, num_events, None);

assert_source_compliance(&SOURCE_TAGS, async {
let common = MqttCommonConfig {
host: mqtt_broker_address(),
port: mqtt_broker_port(),
client_id: Some(client_id),
..Default::default()
};

let config = MqttSourceConfig {
common,
topic: OneOrMany::Many(vec![
format!("{topic_prefix_1}/#"),
format!("{topic_prefix_2}/#"),
]),
..MqttSourceConfig::default()
};

let (tx, rx) = SourceSender::new_test();
tokio::spawn(async move {
config
.build(SourceContext::new_test(tx, None))
.await
.unwrap()
.await
.unwrap()
});

tokio::time::sleep(Duration::from_millis(100)).await;

let client = get_mqtt_client().await;
send_test_events(&client, &format!("{topic_prefix_1}/test"), &input_1).await;
send_test_events(&client, &format!("{topic_prefix_2}/test"), &input_2).await;

let mut expected_messages: HashSet<_> =
input_1.into_iter().chain(input_2.into_iter()).collect();

let events: Vec<Event> = timeout(Duration::from_secs(2), rx.take(num_events * 2).collect())
.await
.unwrap();

for event in events {
let message = event
.as_log()
.get(log_schema().message_key_target_path().unwrap())
.unwrap()
.to_string_lossy();
if !expected_messages.remove(message.as_ref()) {
panic!("Received unexpected message: {message:?}");
}
}
assert!(expected_messages.is_empty());
})
.await;
}
26 changes: 21 additions & 5 deletions src/sources/mqtt/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ use crate::{
event::BatchNotifier,
event::Event,
internal_events::{EndpointBytesReceived, StreamClosedError},
serde::OneOrMany,
shutdown::ShutdownSignal,
sources::mqtt::MqttSourceConfig,
sources::util,
};
use rumqttc::{Event as MqttEvent, Incoming, Publish, QoS};
use rumqttc::{Event as MqttEvent, Incoming, Publish, QoS, SubscribeFilter};
use vector_lib::config::LegacyKey;
use vector_lib::lookup::path;

Expand Down Expand Up @@ -42,10 +43,25 @@ impl MqttSource {
pub async fn run(self, mut out: SourceSender, shutdown: ShutdownSignal) -> Result<(), ()> {
let (client, mut connection) = self.connector.connect();

client
.subscribe(&self.config.topic, QoS::AtLeastOnce)
.await
.map_err(|_| ())?;
match &self.config.topic {
OneOrMany::One(topic) => {
client
.subscribe(topic, QoS::AtLeastOnce)
.await
.map_err(|_| ())?;
}
OneOrMany::Many(topics) => {
client
.subscribe_many(
topics
.iter()
.cloned()
.map(|topic| SubscribeFilter::new(topic, QoS::AtLeastOnce)),
)
.await
.map_err(|_| ())?;
}
}

loop {
tokio::select! {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,7 @@ generated: components: sources: mqtt: configuration: {
}
}
topic: {
description: "MQTT topic from which messages are to be read."
description: "MQTT topic or topics from which messages are to be read."
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
description: "MQTT topic or topics from which messages are to be read."
description: "MQTT topic(s) from which messages are read."

Copy link
Author

@december1981 december1981 Sep 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't mind either way. I wrote it as topic or topics to make it a bit clearer that it could be specified as one or many (not just a single item of many for the former case)

required: false
type: string: default: "vector"
}
Expand Down
Loading