diff --git a/CHANGELOG.md b/CHANGELOG.md index 8610e718..d35d682a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,14 +9,18 @@ All notable changes to this project will be documented in this file. - Helm: Allow Pod `priorityClassName` to be configured ([#890]). - Add experimental support for Kafka KRaft mode ([#889]). - Add experimental support for Kafka `4.1.0` ([#889]). +- Add `prometheus.io/path|port|scheme` annotations to metrics service ([#897]). ### Changed - Deprecate support for Kafka `3.7.2` ([#892]). +- BREAKING: The `<name>-<role>-<rolegroup>` rolegroup service was replaced with a `<name>-<role>-<rolegroup>-headless` + and `<name>-<role>-<rolegroup>-metrics` rolegroup service ([#897]). [#889]: https://github.com/stackabletech/kafka-operator/pull/889 [#890]: https://github.com/stackabletech/kafka-operator/pull/890 [#892]: https://github.com/stackabletech/kafka-operator/pull/892 +[#897]: https://github.com/stackabletech/kafka-operator/pull/897 ## [25.7.0] - 2025-07-23 diff --git a/rust/operator-binary/src/config/command.rs b/rust/operator-binary/src/config/command.rs index 3c25a8be..95281baa 100644 --- a/rust/operator-binary/src/config/command.rs +++ b/rust/operator-binary/src/config/command.rs @@ -229,7 +229,7 @@ pub fn controller_kafka_container_command( fn to_listeners(port: u16) -> String { // The environment variables are set in the statefulset of the controller format!( - "{listener_name}://$POD_NAME.$ROLEGROUP_REF.$NAMESPACE.svc.$CLUSTER_DOMAIN:{port}", + "{listener_name}://$POD_NAME.$ROLEGROUP_HEADLESS_SERVICE_NAME.$NAMESPACE.svc.$CLUSTER_DOMAIN:{port}", listener_name = KafkaListenerName::Controller ) } diff --git a/rust/operator-binary/src/crd/listener.rs b/rust/operator-binary/src/crd/listener.rs index 7ba96b54..ed2b2100 100644 --- a/rust/operator-binary/src/crd/listener.rs +++ b/rust/operator-binary/src/crd/listener.rs @@ -4,7 +4,9 @@ use std::{ }; use snafu::{OptionExt, Snafu}; -use stackable_operator::{kube::ResourceExt, utils::cluster_info::KubernetesClusterInfo}; +use 
stackable_operator::{ + kube::ResourceExt, role_utils::RoleGroupRef, utils::cluster_info::KubernetesClusterInfo, +}; use strum::{EnumDiscriminants, EnumString}; use crate::crd::{STACKABLE_LISTENER_BROKER_DIR, security::KafkaTlsSecurity, v1alpha1}; @@ -170,10 +172,14 @@ impl Display for KafkaListener { pub fn get_kafka_listener_config( kafka: &v1alpha1::KafkaCluster, kafka_security: &KafkaTlsSecurity, - object_name: &str, + rolegroup_ref: &RoleGroupRef, cluster_info: &KubernetesClusterInfo, ) -> Result { - let pod_fqdn = pod_fqdn(kafka, object_name, cluster_info)?; + let pod_fqdn = pod_fqdn( + kafka, + &rolegroup_ref.rolegroup_headless_service_name(), + cluster_info, + )?; let mut listeners = vec![]; let mut advertised_listeners = vec![]; let mut listener_security_protocol_map: BTreeMap = @@ -334,12 +340,11 @@ pub fn node_port_cmd(directory: &str, port_name: &str) -> String { pub fn pod_fqdn( kafka: &v1alpha1::KafkaCluster, - object_name: &str, + sts_service_name: &str, cluster_info: &KubernetesClusterInfo, ) -> Result { Ok(format!( - "$POD_NAME.{object_name}.{namespace}.svc.{cluster_domain}", - object_name = object_name, + "$POD_NAME.{sts_service_name}.{namespace}.svc.{cluster_domain}", namespace = kafka.namespace().context(ObjectHasNoNamespaceSnafu)?, cluster_domain = cluster_info.cluster_domain )) @@ -354,7 +359,7 @@ mod tests { }; use super::*; - use crate::crd::authentication::ResolvedAuthenticationClasses; + use crate::crd::{authentication::ResolvedAuthenticationClasses, role::KafkaRole}; fn default_cluster_info() -> KubernetesClusterInfo { KubernetesClusterInfo { @@ -364,9 +369,6 @@ mod tests { #[test] fn test_get_kafka_listeners_config() { - let object_name = "simple-kafka-broker-default"; - let cluster_info = default_cluster_info(); - let kafka_cluster = r#" apiVersion: kafka.stackable.tech/v1alpha1 kind: KafkaCluster @@ -400,9 +402,12 @@ mod tests { "internalTls".to_string(), Some("tls".to_string()), ); - + let cluster_info = default_cluster_info(); + // 
"simple-kafka-broker-default" + let rolegroup_ref = kafka.rolegroup_ref(&KafkaRole::Broker, "default"); let config = - get_kafka_listener_config(&kafka, &kafka_security, object_name, &cluster_info).unwrap(); + get_kafka_listener_config(&kafka, &kafka_security, &rolegroup_ref, &cluster_info) + .unwrap(); assert_eq!( config.listeners(), @@ -428,7 +433,12 @@ mod tests { kafka_security.client_port_name() ), internal_name = KafkaListenerName::Internal, - internal_host = pod_fqdn(&kafka, object_name, &cluster_info).unwrap(), + internal_host = pod_fqdn( + &kafka, + &rolegroup_ref.rolegroup_headless_service_name(), + &cluster_info + ) + .unwrap(), internal_port = kafka_security.internal_port(), ) ); @@ -454,7 +464,8 @@ mod tests { Some("tls".to_string()), ); let config = - get_kafka_listener_config(&kafka, &kafka_security, object_name, &cluster_info).unwrap(); + get_kafka_listener_config(&kafka, &kafka_security, &rolegroup_ref, &cluster_info) + .unwrap(); assert_eq!( config.listeners(), @@ -480,7 +491,12 @@ mod tests { kafka_security.client_port_name() ), internal_name = KafkaListenerName::Internal, - internal_host = pod_fqdn(&kafka, object_name, &cluster_info).unwrap(), + internal_host = pod_fqdn( + &kafka, + &rolegroup_ref.rolegroup_headless_service_name(), + &cluster_info + ) + .unwrap(), internal_port = kafka_security.internal_port(), ) ); @@ -505,7 +521,8 @@ mod tests { ); let config = - get_kafka_listener_config(&kafka, &kafka_security, object_name, &cluster_info).unwrap(); + get_kafka_listener_config(&kafka, &kafka_security, &rolegroup_ref, &cluster_info) + .unwrap(); assert_eq!( config.listeners(), @@ -531,7 +548,12 @@ mod tests { kafka_security.client_port_name() ), internal_name = KafkaListenerName::Internal, - internal_host = pod_fqdn(&kafka, object_name, &cluster_info).unwrap(), + internal_host = pod_fqdn( + &kafka, + &rolegroup_ref.rolegroup_headless_service_name(), + &cluster_info + ) + .unwrap(), internal_port = kafka_security.internal_port(), ) ); @@ -552,9 
+574,6 @@ mod tests { #[test] fn test_get_kafka_kerberos_listeners_config() { - let object_name = "simple-kafka-broker-default"; - let cluster_info = default_cluster_info(); - let kafka_cluster = r#" apiVersion: kafka.stackable.tech/v1alpha1 kind: KafkaCluster @@ -587,9 +606,12 @@ mod tests { "tls".to_string(), Some("tls".to_string()), ); - + let cluster_info = default_cluster_info(); + // "simple-kafka-broker-default" + let rolegroup_ref = kafka.rolegroup_ref(&KafkaRole::Broker, "default"); let config = - get_kafka_listener_config(&kafka, &kafka_security, object_name, &cluster_info).unwrap(); + get_kafka_listener_config(&kafka, &kafka_security, &rolegroup_ref, &cluster_info) + .unwrap(); assert_eq!( config.listeners(), @@ -618,7 +640,12 @@ mod tests { kafka_security.client_port_name() ), internal_name = KafkaListenerName::Internal, - internal_host = pod_fqdn(&kafka, object_name, &cluster_info).unwrap(), + internal_host = pod_fqdn( + &kafka, + &rolegroup_ref.rolegroup_headless_service_name(), + &cluster_info + ) + .unwrap(), internal_port = kafka_security.internal_port(), bootstrap_name = KafkaListenerName::Bootstrap, bootstrap_host = node_address_cmd(STACKABLE_LISTENER_BROKER_DIR), diff --git a/rust/operator-binary/src/crd/mod.rs b/rust/operator-binary/src/crd/mod.rs index 9cb50b1f..9748388c 100644 --- a/rust/operator-binary/src/crd/mod.rs +++ b/rust/operator-binary/src/crd/mod.rs @@ -294,7 +294,9 @@ impl v1alpha1::KafkaCluster { for replica in 0..replicas { pod_descriptors.push(KafkaPodDescriptor { namespace: namespace.clone(), - role_group_service_name: rolegroup_ref.object_name(), + role_group_service_name: rolegroup_ref + .rolegroup_headless_service_name(), + role_group_statefulset_name: rolegroup_ref.object_name(), replica, cluster_domain: cluster_info.cluster_domain.clone(), node_id: node_id_hash_offset + u32::from(replica), @@ -341,6 +343,7 @@ impl v1alpha1::KafkaCluster { #[derive(Debug, PartialEq, Eq, PartialOrd, Ord)] pub struct KafkaPodDescriptor { 
namespace: String, + role_group_statefulset_name: String, role_group_service_name: String, replica: u16, cluster_domain: DomainName, @@ -361,7 +364,7 @@ impl KafkaPodDescriptor { } pub fn pod_name(&self) -> String { - format!("{}-{}", self.role_group_service_name, self.replica) + format!("{}-{}", self.role_group_statefulset_name, self.replica) } /// Build the Kraft voter String diff --git a/rust/operator-binary/src/kafka_controller.rs b/rust/operator-binary/src/kafka_controller.rs index 47b701a7..fbce9c2a 100644 --- a/rust/operator-binary/src/kafka_controller.rs +++ b/rust/operator-binary/src/kafka_controller.rs @@ -49,7 +49,7 @@ use crate::{ resource::{ configmap::build_rolegroup_config_map, listener::build_broker_rolegroup_bootstrap_listener, - service::build_rolegroup_service, + service::{build_rolegroup_headless_service, build_rolegroup_metrics_service}, statefulset::{build_broker_rolegroup_statefulset, build_controller_rolegroup_statefulset}, }, }; @@ -347,8 +347,16 @@ pub async fn reconcile_kafka( .merged_config(kafka, &rolegroup_ref.role_group) .context(FailedToResolveConfigSnafu)?; - let rg_service = - build_rolegroup_service(kafka, &resolved_product_image, &rolegroup_ref) + let rg_headless_service = build_rolegroup_headless_service( + kafka, + &resolved_product_image, + &rolegroup_ref, + &kafka_security, + ) + .context(BuildServiceSnafu)?; + + let rg_metrics_service = + build_rolegroup_metrics_service(kafka, &resolved_product_image, &rolegroup_ref) .context(BuildServiceSnafu)?; let rg_configmap = build_rolegroup_config_map( @@ -407,7 +415,13 @@ pub async fn reconcile_kafka( } cluster_resources - .add(client, rg_service) + .add(client, rg_headless_service) + .await + .with_context(|_| ApplyRoleGroupServiceSnafu { + rolegroup: rolegroup_ref.clone(), + })?; + cluster_resources + .add(client, rg_metrics_service) .await .with_context(|_| ApplyRoleGroupServiceSnafu { rolegroup: rolegroup_ref.clone(), diff --git a/rust/operator-binary/src/resource/service.rs 
b/rust/operator-binary/src/resource/service.rs index d9c7c7c9..5e3e0b56 100644 --- a/rust/operator-binary/src/resource/service.rs +++ b/rust/operator-binary/src/resource/service.rs @@ -2,13 +2,13 @@ use snafu::{ResultExt, Snafu}; use stackable_operator::{ builder::meta::ObjectMetaBuilder, commons::product_image_selection::ResolvedProductImage, - k8s_openapi::api::core::v1::{Service, ServiceSpec}, - kvp::{Label, Labels}, + k8s_openapi::api::core::v1::{Service, ServicePort, ServiceSpec}, + kvp::{Annotations, Labels}, role_utils::RoleGroupRef, }; use crate::{ - crd::{APP_NAME, v1alpha1}, + crd::{APP_NAME, METRICS_PORT, METRICS_PORT_NAME, security::KafkaTlsSecurity, v1alpha1}, kafka_controller::KAFKA_CONTROLLER_NAME, utils::build_recommended_labels, }; @@ -34,15 +34,16 @@ pub enum Error { /// The rolegroup [`Service`] is a headless service that allows direct access to the instances of a certain rolegroup /// /// This is mostly useful for internal communication between peers, or for clients that perform client-side load balancing. -pub fn build_rolegroup_service( +pub fn build_rolegroup_headless_service( kafka: &v1alpha1::KafkaCluster, resolved_product_image: &ResolvedProductImage, rolegroup: &RoleGroupRef, + kafka_security: &KafkaTlsSecurity, ) -> Result { Ok(Service { metadata: ObjectMetaBuilder::new() .name_and_namespace(kafka) - .name(rolegroup.object_name()) + .name(rolegroup.rolegroup_headless_service_name()) .ownerreference_from_resource(kafka, None, Some(true)) .context(ObjectMissingMetadataForOwnerRefSnafu)? .with_recommended_labels(build_recommended_labels( @@ -53,10 +54,10 @@ pub fn build_rolegroup_service( &rolegroup.role_group, )) .context(MetadataBuildSnafu)? - .with_label(Label::try_from(("prometheus.io/scrape", "true")).context(LabelBuildSnafu)?) 
.build(), spec: Some(ServiceSpec { cluster_ip: Some("None".to_string()), + ports: Some(headless_ports(kafka_security)), selector: Some( Labels::role_group_selector( kafka, @@ -73,3 +74,87 @@ pub fn build_rolegroup_service( status: None, }) } + +/// The rolegroup metrics [`Service`] is a service that exposes metrics and a prometheus scraping label +pub fn build_rolegroup_metrics_service( + kafka: &v1alpha1::KafkaCluster, + resolved_product_image: &ResolvedProductImage, + rolegroup: &RoleGroupRef, +) -> Result { + let metrics_service = Service { + metadata: ObjectMetaBuilder::new() + .name_and_namespace(kafka) + .name(rolegroup.rolegroup_metrics_service_name()) + .ownerreference_from_resource(kafka, None, Some(true)) + .context(ObjectMissingMetadataForOwnerRefSnafu)? + .with_recommended_labels(build_recommended_labels( + kafka, + KAFKA_CONTROLLER_NAME, + &resolved_product_image.app_version_label_value, + &rolegroup.role, + &rolegroup.role_group, + )) + .context(MetadataBuildSnafu)? + .with_labels(prometheus_labels()) + .with_annotations(prometheus_annotations()) + .build(), + spec: Some(ServiceSpec { + // Internal communication does not need to be exposed + type_: Some("ClusterIP".to_string()), + cluster_ip: Some("None".to_string()), + ports: Some(metrics_ports()), + selector: Some( + Labels::role_group_selector( + kafka, + APP_NAME, + &rolegroup.role, + &rolegroup.role_group, + ) + .context(LabelBuildSnafu)? 
+ .into(), + ), + publish_not_ready_addresses: Some(true), + ..ServiceSpec::default() + }), + status: None, + }; + Ok(metrics_service) +} + +fn metrics_ports() -> Vec<ServicePort> { + vec![ServicePort { + name: Some(METRICS_PORT_NAME.to_string()), + port: METRICS_PORT.into(), + protocol: Some("TCP".to_string()), + ..ServicePort::default() + }] +} + +fn headless_ports(kafka_security: &KafkaTlsSecurity) -> Vec<ServicePort> { + vec![ServicePort { + name: Some(kafka_security.client_port_name().into()), + port: kafka_security.client_port().into(), + protocol: Some("TCP".to_string()), + ..ServicePort::default() + }] +} + +/// Common labels for Prometheus +fn prometheus_labels() -> Labels { + Labels::try_from([("prometheus.io/scrape", "true")]).expect("should be a valid label") +} + +/// Common annotations for Prometheus +/// +/// These annotations can be used in a ServiceMonitor. +/// +/// see also <https://prometheus-operator.dev/docs/api-reference/api/#monitoring.coreos.com/v1.ServiceMonitor> +fn prometheus_annotations() -> Annotations { + Annotations::try_from([ + ("prometheus.io/path".to_owned(), "/metrics".to_owned()), + ("prometheus.io/port".to_owned(), METRICS_PORT.to_string()), + ("prometheus.io/scheme".to_owned(), "http".to_owned()), + ("prometheus.io/scrape".to_owned(), "true".to_owned()), + ]) + .expect("should be valid annotations") +} diff --git a/rust/operator-binary/src/resource/statefulset.rs b/rust/operator-binary/src/resource/statefulset.rs index b5b758ec..182c1839 100644 --- a/rust/operator-binary/src/resource/statefulset.rs +++ b/rust/operator-binary/src/resource/statefulset.rs @@ -161,7 +161,7 @@ pub enum Error { /// The broker rolegroup [`StatefulSet`] runs the rolegroup, as configured by the administrator. /// /// The [`Pod`](`stackable_operator::k8s_openapi::api::core::v1::Pod`)s are accessible through the corresponding -/// [`Service`](`stackable_operator::k8s_openapi::api::core::v1::Service`) from [`build_rolegroup_service`](`crate::resource::service::build_rolegroup_service`). 
+/// [`Service`](`stackable_operator::k8s_openapi::api::core::v1::Service`) from [`build_rolegroup_headless_service`](`crate::resource::service::build_rolegroup_headless_service`). #[allow(clippy::too_many_arguments)] pub fn build_broker_rolegroup_statefulset( kafka: &v1alpha1::KafkaCluster, @@ -285,13 +285,9 @@ pub fn build_broker_rolegroup_statefulset( ..EnvVar::default() }); - let kafka_listeners = get_kafka_listener_config( - kafka, - kafka_security, - &rolegroup_ref.object_name(), - cluster_info, - ) - .context(InvalidKafkaListenersSnafu)?; + let kafka_listeners = + get_kafka_listener_config(kafka, kafka_security, rolegroup_ref, cluster_info) + .context(InvalidKafkaListenersSnafu)?; let cluster_id = kafka.cluster_id().context(ClusterIdMissingSnafu)?; @@ -547,7 +543,7 @@ pub fn build_broker_rolegroup_statefulset( ), ..LabelSelector::default() }, - service_name: Some(rolegroup_ref.object_name()), + service_name: Some(rolegroup_ref.rolegroup_headless_service_name()), template: pod_template, volume_claim_templates: Some(pvcs), ..StatefulSetSpec::default() @@ -621,8 +617,8 @@ pub fn build_controller_rolegroup_statefulset( }); env.push(EnvVar { - name: "ROLEGROUP_REF".to_string(), - value: Some(rolegroup_ref.object_name()), + name: "ROLEGROUP_HEADLESS_SERVICE_NAME".to_string(), + value: Some(rolegroup_ref.rolegroup_headless_service_name()), ..EnvVar::default() }); @@ -632,13 +628,9 @@ pub fn build_controller_rolegroup_statefulset( ..EnvVar::default() }); - let kafka_listeners = get_kafka_listener_config( - kafka, - kafka_security, - &rolegroup_ref.object_name(), - cluster_info, - ) - .context(InvalidKafkaListenersSnafu)?; + let kafka_listeners = + get_kafka_listener_config(kafka, kafka_security, rolegroup_ref, cluster_info) + .context(InvalidKafkaListenersSnafu)?; cb_kafka .image_from_product_image(resolved_product_image) @@ -864,7 +856,7 @@ pub fn build_controller_rolegroup_statefulset( ), ..LabelSelector::default() }, - service_name: Some(rolegroup_ref.object_name()), + 
service_name: Some(rolegroup_ref.rolegroup_headless_service_name()), template: pod_template, volume_claim_templates: Some(merged_config.resources().storage.build_pvcs()), ..StatefulSetSpec::default() diff --git a/tests/templates/kuttl/smoke-kraft/30-assert.yaml.j2 b/tests/templates/kuttl/smoke-kraft/30-assert.yaml.j2 index 02f55756..5bd04442 100644 --- a/tests/templates/kuttl/smoke-kraft/30-assert.yaml.j2 +++ b/tests/templates/kuttl/smoke-kraft/30-assert.yaml.j2 @@ -42,3 +42,113 @@ metadata: status: readyReplicas: 1 replicas: 1 +--- +apiVersion: v1 +kind: Service +metadata: + name: test-kafka-broker-default-headless +spec: + ports: + - name: kafka-tls + port: 9093 + protocol: TCP + targetPort: 9093 +--- +apiVersion: v1 +kind: Service +metadata: + name: test-kafka-broker-default-metrics +spec: + ports: + - name: metrics + port: 9606 + protocol: TCP + targetPort: 9606 +--- +apiVersion: v1 +kind: Service +metadata: + name: test-kafka-broker-automatic-log-config-headless +spec: + ports: + - name: kafka-tls + port: 9093 + protocol: TCP + targetPort: 9093 +--- +apiVersion: v1 +kind: Service +metadata: + name: test-kafka-broker-automatic-log-config-metrics +spec: + ports: + - name: metrics + port: 9606 + protocol: TCP + targetPort: 9606 +--- +apiVersion: v1 +kind: Service +metadata: + name: test-kafka-broker-custom-log-config-headless +spec: + ports: + - name: kafka-tls + port: 9093 + protocol: TCP + targetPort: 9093 +--- +apiVersion: v1 +kind: Service +metadata: + name: test-kafka-broker-custom-log-config-metrics +spec: + ports: + - name: metrics + port: 9606 + protocol: TCP + targetPort: 9606 +--- +apiVersion: v1 +kind: Service +metadata: + name: test-kafka-controller-automatic-log-config-headless +spec: + ports: + - name: kafka-tls + port: 9093 + protocol: TCP + targetPort: 9093 +--- +apiVersion: v1 +kind: Service +metadata: + name: test-kafka-controller-automatic-log-config-metrics +spec: + ports: + - name: metrics + port: 9606 + protocol: TCP + targetPort: 9606 +--- 
+apiVersion: v1 +kind: Service +metadata: + name: test-kafka-controller-custom-log-config-headless +spec: + ports: + - name: kafka-tls + port: 9093 + protocol: TCP + targetPort: 9093 +--- +apiVersion: v1 +kind: Service +metadata: + name: test-kafka-controller-custom-log-config-metrics +spec: + ports: + - name: metrics + port: 9606 + protocol: TCP + targetPort: 9606 diff --git a/tests/templates/kuttl/smoke-kraft/metrics.py b/tests/templates/kuttl/smoke-kraft/metrics.py index 7c9f8027..1f39540d 100644 --- a/tests/templates/kuttl/smoke-kraft/metrics.py +++ b/tests/templates/kuttl/smoke-kraft/metrics.py @@ -12,7 +12,9 @@ stream=sys.stdout, ) - http_code = requests.get("http://test-kafka-broker-default:9606").status_code + http_code = requests.get( + "http://test-kafka-broker-default-metrics:9606/metrics" + ).status_code if http_code != 200: result = 1 diff --git a/tests/templates/kuttl/smoke/metrics.py b/tests/templates/kuttl/smoke/metrics.py index 7c9f8027..ad0c75e0 100644 --- a/tests/templates/kuttl/smoke/metrics.py +++ b/tests/templates/kuttl/smoke/metrics.py @@ -12,7 +12,9 @@ stream=sys.stdout, ) - http_code = requests.get("http://test-kafka-broker-default:9606").status_code + http_code = requests.get( + "http://test-kafka-broker-default-metrics:9606/metrics" + ).status_code if http_code != 200: result = 1 diff --git a/tests/templates/kuttl/smoke/test_heap.sh b/tests/templates/kuttl/smoke/test_heap.sh index cd76d42a..28004bc4 100755 --- a/tests/templates/kuttl/smoke/test_heap.sh +++ b/tests/templates/kuttl/smoke/test_heap.sh @@ -4,12 +4,12 @@ # 2Gi * 0.8 -> 1638 EXPECTED_HEAP="-Xmx1638m -Xms1638m" -# Check if ZK_SERVER_HEAP is set to the correct calculated value +# Check if KAFKA_HEAP_OPTS is set to the correct calculated value if [[ $KAFKA_HEAP_OPTS == "$EXPECTED_HEAP" ]] then echo "[SUCCESS] KAFKA_HEAP_OPTS set to $EXPECTED_HEAP" else - echo "[ERROR] KAFKA_HEAP_OPTS not set or set with wrong value: $ZK_SERVER_HEAP" + echo "[ERROR] KAFKA_HEAP_OPTS not set or set with 
wrong value: $KAFKA_HEAP_OPTS" exit 1 fi