Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,18 @@ All notable changes to this project will be documented in this file.
- Helm: Allow Pod `priorityClassName` to be configured ([#890]).
- Add experimental support for Kafka KRaft mode ([#889]).
- Add experimental support for Kafka `4.1.0` ([#889]).
- Add `prometheus.io/path|port|scheme` annotations to metrics service ([#897]).

### Changed

- Deprecate support for Kafka `3.7.2` ([#892]).
- BREAKING: The `<cluster>-<role>-<rolegroup>` rolegroup service was replaced with a `<cluster>-<role>-<rolegroup>-headless`
and `<cluster>-<role>-<rolegroup>-metrics` rolegroup service ([#897]).

[#889]: https://github.com/stackabletech/kafka-operator/pull/889
[#890]: https://github.com/stackabletech/kafka-operator/pull/890
[#892]: https://github.com/stackabletech/kafka-operator/pull/892
[#897]: https://github.com/stackabletech/kafka-operator/pull/897

## [25.7.0] - 2025-07-23

Expand Down
2 changes: 1 addition & 1 deletion rust/operator-binary/src/config/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ pub fn controller_kafka_container_command(
fn to_listeners(port: u16) -> String {
// The environment variables are set in the statefulset of the controller
format!(
"{listener_name}://$POD_NAME.$ROLEGROUP_REF.$NAMESPACE.svc.$CLUSTER_DOMAIN:{port}",
"{listener_name}://$POD_NAME.$ROLEGROUP_HEADLESS_SERVICE_NAME.$NAMESPACE.svc.$CLUSTER_DOMAIN:{port}",
listener_name = KafkaListenerName::Controller
)
}
Expand Down
73 changes: 50 additions & 23 deletions rust/operator-binary/src/crd/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ use std::{
};

use snafu::{OptionExt, Snafu};
use stackable_operator::{kube::ResourceExt, utils::cluster_info::KubernetesClusterInfo};
use stackable_operator::{
kube::ResourceExt, role_utils::RoleGroupRef, utils::cluster_info::KubernetesClusterInfo,
};
use strum::{EnumDiscriminants, EnumString};

use crate::crd::{STACKABLE_LISTENER_BROKER_DIR, security::KafkaTlsSecurity, v1alpha1};
Expand Down Expand Up @@ -170,10 +172,14 @@ impl Display for KafkaListener {
pub fn get_kafka_listener_config(
kafka: &v1alpha1::KafkaCluster,
kafka_security: &KafkaTlsSecurity,
object_name: &str,
rolegroup_ref: &RoleGroupRef<v1alpha1::KafkaCluster>,
cluster_info: &KubernetesClusterInfo,
) -> Result<KafkaListenerConfig, KafkaListenerError> {
let pod_fqdn = pod_fqdn(kafka, object_name, cluster_info)?;
let pod_fqdn = pod_fqdn(
kafka,
&rolegroup_ref.rolegroup_headless_service_name(),
cluster_info,
)?;
let mut listeners = vec![];
let mut advertised_listeners = vec![];
let mut listener_security_protocol_map: BTreeMap<KafkaListenerName, KafkaListenerProtocol> =
Expand Down Expand Up @@ -334,12 +340,11 @@ pub fn node_port_cmd(directory: &str, port_name: &str) -> String {

pub fn pod_fqdn(
kafka: &v1alpha1::KafkaCluster,
object_name: &str,
sts_service_name: &str,
cluster_info: &KubernetesClusterInfo,
) -> Result<String, KafkaListenerError> {
Ok(format!(
"$POD_NAME.{object_name}.{namespace}.svc.{cluster_domain}",
object_name = object_name,
"$POD_NAME.{sts_service_name}.{namespace}.svc.{cluster_domain}",
namespace = kafka.namespace().context(ObjectHasNoNamespaceSnafu)?,
cluster_domain = cluster_info.cluster_domain
))
Expand All @@ -354,7 +359,7 @@ mod tests {
};

use super::*;
use crate::crd::authentication::ResolvedAuthenticationClasses;
use crate::crd::{authentication::ResolvedAuthenticationClasses, role::KafkaRole};

fn default_cluster_info() -> KubernetesClusterInfo {
KubernetesClusterInfo {
Expand All @@ -364,9 +369,6 @@ mod tests {

#[test]
fn test_get_kafka_listeners_config() {
let object_name = "simple-kafka-broker-default";
let cluster_info = default_cluster_info();

let kafka_cluster = r#"
apiVersion: kafka.stackable.tech/v1alpha1
kind: KafkaCluster
Expand Down Expand Up @@ -400,9 +402,12 @@ mod tests {
"internalTls".to_string(),
Some("tls".to_string()),
);

let cluster_info = default_cluster_info();
// "simple-kafka-broker-default"
let rolegroup_ref = kafka.rolegroup_ref(&KafkaRole::Broker, "default");
let config =
get_kafka_listener_config(&kafka, &kafka_security, object_name, &cluster_info).unwrap();
get_kafka_listener_config(&kafka, &kafka_security, &rolegroup_ref, &cluster_info)
.unwrap();

assert_eq!(
config.listeners(),
Expand All @@ -428,7 +433,12 @@ mod tests {
kafka_security.client_port_name()
),
internal_name = KafkaListenerName::Internal,
internal_host = pod_fqdn(&kafka, object_name, &cluster_info).unwrap(),
internal_host = pod_fqdn(
&kafka,
&rolegroup_ref.rolegroup_headless_service_name(),
&cluster_info
)
.unwrap(),
internal_port = kafka_security.internal_port(),
)
);
Expand All @@ -454,7 +464,8 @@ mod tests {
Some("tls".to_string()),
);
let config =
get_kafka_listener_config(&kafka, &kafka_security, object_name, &cluster_info).unwrap();
get_kafka_listener_config(&kafka, &kafka_security, &rolegroup_ref, &cluster_info)
.unwrap();

assert_eq!(
config.listeners(),
Expand All @@ -480,7 +491,12 @@ mod tests {
kafka_security.client_port_name()
),
internal_name = KafkaListenerName::Internal,
internal_host = pod_fqdn(&kafka, object_name, &cluster_info).unwrap(),
internal_host = pod_fqdn(
&kafka,
&rolegroup_ref.rolegroup_headless_service_name(),
&cluster_info
)
.unwrap(),
internal_port = kafka_security.internal_port(),
)
);
Expand All @@ -505,7 +521,8 @@ mod tests {
);

let config =
get_kafka_listener_config(&kafka, &kafka_security, object_name, &cluster_info).unwrap();
get_kafka_listener_config(&kafka, &kafka_security, &rolegroup_ref, &cluster_info)
.unwrap();

assert_eq!(
config.listeners(),
Expand All @@ -531,7 +548,12 @@ mod tests {
kafka_security.client_port_name()
),
internal_name = KafkaListenerName::Internal,
internal_host = pod_fqdn(&kafka, object_name, &cluster_info).unwrap(),
internal_host = pod_fqdn(
&kafka,
&rolegroup_ref.rolegroup_headless_service_name(),
&cluster_info
)
.unwrap(),
internal_port = kafka_security.internal_port(),
)
);
Expand All @@ -552,9 +574,6 @@ mod tests {

#[test]
fn test_get_kafka_kerberos_listeners_config() {
let object_name = "simple-kafka-broker-default";
let cluster_info = default_cluster_info();

let kafka_cluster = r#"
apiVersion: kafka.stackable.tech/v1alpha1
kind: KafkaCluster
Expand Down Expand Up @@ -587,9 +606,12 @@ mod tests {
"tls".to_string(),
Some("tls".to_string()),
);

let cluster_info = default_cluster_info();
// "simple-kafka-broker-default"
let rolegroup_ref = kafka.rolegroup_ref(&KafkaRole::Broker, "default");
let config =
get_kafka_listener_config(&kafka, &kafka_security, object_name, &cluster_info).unwrap();
get_kafka_listener_config(&kafka, &kafka_security, &rolegroup_ref, &cluster_info)
.unwrap();

assert_eq!(
config.listeners(),
Expand Down Expand Up @@ -618,7 +640,12 @@ mod tests {
kafka_security.client_port_name()
),
internal_name = KafkaListenerName::Internal,
internal_host = pod_fqdn(&kafka, object_name, &cluster_info).unwrap(),
internal_host = pod_fqdn(
&kafka,
&rolegroup_ref.rolegroup_headless_service_name(),
&cluster_info
)
.unwrap(),
internal_port = kafka_security.internal_port(),
bootstrap_name = KafkaListenerName::Bootstrap,
bootstrap_host = node_address_cmd(STACKABLE_LISTENER_BROKER_DIR),
Expand Down
7 changes: 5 additions & 2 deletions rust/operator-binary/src/crd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,9 @@ impl v1alpha1::KafkaCluster {
for replica in 0..replicas {
pod_descriptors.push(KafkaPodDescriptor {
namespace: namespace.clone(),
role_group_service_name: rolegroup_ref.object_name(),
role_group_service_name: rolegroup_ref
.rolegroup_headless_service_name(),
role_group_statefulset_name: rolegroup_ref.object_name(),
replica,
cluster_domain: cluster_info.cluster_domain.clone(),
node_id: node_id_hash_offset + u32::from(replica),
Expand Down Expand Up @@ -341,6 +343,7 @@ impl v1alpha1::KafkaCluster {
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct KafkaPodDescriptor {
namespace: String,
role_group_statefulset_name: String,
role_group_service_name: String,
replica: u16,
cluster_domain: DomainName,
Expand All @@ -361,7 +364,7 @@ impl KafkaPodDescriptor {
}

pub fn pod_name(&self) -> String {
format!("{}-{}", self.role_group_service_name, self.replica)
format!("{}-{}", self.role_group_statefulset_name, self.replica)
}

/// Build the Kraft voter String
Expand Down
22 changes: 18 additions & 4 deletions rust/operator-binary/src/kafka_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ use crate::{
resource::{
configmap::build_rolegroup_config_map,
listener::build_broker_rolegroup_bootstrap_listener,
service::build_rolegroup_service,
service::{build_rolegroup_headless_service, build_rolegroup_metrics_service},
statefulset::{build_broker_rolegroup_statefulset, build_controller_rolegroup_statefulset},
},
};
Expand Down Expand Up @@ -347,8 +347,16 @@ pub async fn reconcile_kafka(
.merged_config(kafka, &rolegroup_ref.role_group)
.context(FailedToResolveConfigSnafu)?;

let rg_service =
build_rolegroup_service(kafka, &resolved_product_image, &rolegroup_ref)
let rg_headless_service = build_rolegroup_headless_service(
kafka,
&resolved_product_image,
&rolegroup_ref,
&kafka_security,
)
.context(BuildServiceSnafu)?;

let rg_metrics_service =
build_rolegroup_metrics_service(kafka, &resolved_product_image, &rolegroup_ref)
.context(BuildServiceSnafu)?;

let rg_configmap = build_rolegroup_config_map(
Expand Down Expand Up @@ -407,7 +415,13 @@ pub async fn reconcile_kafka(
}

cluster_resources
.add(client, rg_service)
.add(client, rg_headless_service)
.await
.with_context(|_| ApplyRoleGroupServiceSnafu {
rolegroup: rolegroup_ref.clone(),
})?;
cluster_resources
.add(client, rg_metrics_service)
.await
.with_context(|_| ApplyRoleGroupServiceSnafu {
rolegroup: rolegroup_ref.clone(),
Expand Down
Loading
Loading