diff --git a/demos/data-lakehouse-iceberg-trino-spark/create-spark-ingestion-job.yaml b/demos/data-lakehouse-iceberg-trino-spark/create-spark-ingestion-job.yaml index 2990b0bd..8751e9f6 100644 --- a/demos/data-lakehouse-iceberg-trino-spark/create-spark-ingestion-job.yaml +++ b/demos/data-lakehouse-iceberg-trino-spark/create-spark-ingestion-job.yaml @@ -26,7 +26,7 @@ spec: echo 'Waiting for all nifi instances to be ready' kubectl wait --for=condition=ready --timeout=30m pod -l app.kubernetes.io/name=nifi,app.kubernetes.io/instance=nifi - name: wait-for-kafka-topics - image: oci.stackable.tech/sdp/kafka:3.9.1-stackable0.0.0-dev + image: oci.stackable.tech/sdp/kafka:4.1.0-stackable0.0.0-dev command: - bash - -euo diff --git a/docs/modules/demos/images/nifi-kafka-druid-water-level-data/topics.png b/docs/modules/demos/images/nifi-kafka-druid-water-level-data/topics.png deleted file mode 100644 index ee9107d3..00000000 Binary files a/docs/modules/demos/images/nifi-kafka-druid-water-level-data/topics.png and /dev/null differ diff --git a/docs/modules/demos/pages/nifi-kafka-druid-earthquake-data.adoc b/docs/modules/demos/pages/nifi-kafka-druid-earthquake-data.adoc index 128da163..0dce404f 100644 --- a/docs/modules/demos/pages/nifi-kafka-druid-earthquake-data.adoc +++ b/docs/modules/demos/pages/nifi-kafka-druid-earthquake-data.adoc @@ -86,103 +86,6 @@ $ stackablectl stacklet list include::partial$instance-hint.adoc[] -== Inspect the data in Kafka - -Kafka is an event streaming platform to stream the data in near real-time. -All the messages put in and read from Kafka are structured in dedicated queues called topics. -The test data will be put into a topic called earthquakes. -The records are produced (written) by the test data generator and consumed (read) by Druid afterwards in the same order they were created. - -As Kafka has no web interface, you must use a Kafka client like {kcat}[kcat]. 
-Kafka uses mutual TLS, so clients wanting to connect to Kafka must present a valid TLS certificate. -The easiest way to obtain this is to shell into the `kafka-broker-default-0` Pod, as we will do in the following section for demonstration purposes. -For a production setup, you should spin up a dedicated Pod provisioned with a certificate acting as a Kafka client instead of shell-ing into the Kafka Pod. - -=== List the available Topics - -You can execute a command on the Kafka broker to list the available topics as follows: - -// In the following commands the kcat-prober container instead of the kafka container is used to send requests to Kafka. -// This is necessary because kcat cannot use key- and truststore files with empty passwords, which are mounted here to the kafka container. -// However, the kcat-prober container has TLS certificates mounted, which can be used by kcat to connect to Kafka. -[source,console] ----- -$ kubectl exec -it kafka-broker-default-0 -c kcat-prober -- /bin/bash -c "/stackable/kcat -b localhost:9093 -X security.protocol=SSL -X ssl.key.location=/stackable/tls-kcat/tls.key -X ssl.certificate.location=/stackable/tls-kcat/tls.crt -X ssl.ca.location=/stackable/tls-kcat/ca.crt -L" -Metadata for all topics (from broker -1: ssl://localhost:9093/bootstrap): - 1 brokers: - broker 1001 at 172.19.0.4:32321 (controller) - 1 topics: - topic "earthquakes" with 8 partitions: - partition 0, leader 1001, replicas: 1001, isrs: 1001 - partition 1, leader 1001, replicas: 1001, isrs: 1001 - partition 2, leader 1001, replicas: 1001, isrs: 1001 - partition 3, leader 1001, replicas: 1001, isrs: 1001 - partition 4, leader 1001, replicas: 1001, isrs: 1001 - partition 5, leader 1001, replicas: 1001, isrs: 1001 - partition 6, leader 1001, replicas: 1001, isrs: 1001 - partition 7, leader 1001, replicas: 1001, isrs: 1001 ----- - -You can see that Kafka consists of one broker, and the topic `earthquakes` with eight partitions has been created. 
To -see some records sent to Kafka, run the following command. You can change the number of records to print via the `-c` -parameter. - -[source,console] ----- -$ kubectl exec -it kafka-broker-default-0 -c kcat-prober -- /bin/bash -c "/stackable/kcat -b localhost:9093 -X security.protocol=SSL -X ssl.key.location=/stackable/tls-kcat/tls.key -X ssl.certificate.location=/stackable/tls-kcat/tls.crt -X ssl.ca.location=/stackable/tls-kcat/ca.crt -C -t earthquakes -c 1" ----- - -Below is an example of the output of one record: - -[source,json] ----- - { - "time":"1950-02-07T10:37:29.240Z", - "latitude":45.949, - "longitude":151.59, - "depth":35.0, - "mag":5.94, - "magType":"mw", - "nst":null, - "gap":null, - "dmin":null, - "rms":null, - "net":"iscgem", - "id":"iscgem895202", - "updated":"2022-04-26T18:23:38.377Z", - "place":"Kuril Islands", - "type":"earthquake", - "horizontalError":null, - "depthError":12.6, - "magError":0.55, - "magNst":null, - "status":"reviewed", - "locationSource":"iscgem", - "magSource":"iscgem" -} ----- - -If you are interested in how many records have been produced to the Kafka topic so far, use the following command. -It will print the last record produced to the topic partition, formatted with the pattern specified in the `-f` parameter. -The given pattern will print some metadata of the record. 
- -[source,console] ----- -$ kubectl exec -it kafka-broker-default-0 -c kcat-prober -- /bin/bash -c "/stackable/kcat -b localhost:9093 -X security.protocol=SSL -X ssl.key.location=/stackable/tls-kcat/tls.key -X ssl.certificate.location=/stackable/tls-kcat/tls.crt -X ssl.ca.location=/stackable/tls-kcat/ca.crt -C -t earthquakes -o -8 -c 8 -f 'Topic %t / Partition %p / Offset: %o / Timestamp: %T\n'" -Topic earthquakes / Partition 0 / Offset: 378859 / Timestamp: 1752584024936 -Topic earthquakes / Partition 0 / Offset: 378860 / Timestamp: 1752584024936 -Topic earthquakes / Partition 0 / Offset: 378861 / Timestamp: 1752584024936 -Topic earthquakes / Partition 0 / Offset: 378862 / Timestamp: 1752584024936 -Topic earthquakes / Partition 0 / Offset: 378863 / Timestamp: 1752584024936 -Topic earthquakes / Partition 0 / Offset: 378864 / Timestamp: 1752584024936 -Topic earthquakes / Partition 0 / Offset: 378865 / Timestamp: 1752584024936 -Topic earthquakes / Partition 0 / Offset: 378866 / Timestamp: 1752584024936 ----- - -If you calculate `379,000` records * `8` partitions, you end up with ~ 3,032,000 records. -The output also shows that the last measurement record was produced at the timestamp `1752584024936`, which translates to `Tuesday, 15 July 2025 14:53:44.936 GMT+02:00` -(using e.g. the command `date -d @1752584024`). - == NiFi NiFi is used to fetch earthquake data from the internet and ingest it into Kafka. diff --git a/docs/modules/demos/pages/nifi-kafka-druid-water-level-data.adoc b/docs/modules/demos/pages/nifi-kafka-druid-water-level-data.adoc index c308761b..14e1b0bb 100644 --- a/docs/modules/demos/pages/nifi-kafka-druid-water-level-data.adoc +++ b/docs/modules/demos/pages/nifi-kafka-druid-water-level-data.adoc @@ -92,182 +92,6 @@ $ stackablectl stacklet list include::partial$instance-hint.adoc[] -== Inspect the data in Kafka - -Kafka is an event streaming platform to stream the data in near real-time. 
All the messages put in and read from Kafka -are structured in dedicated queues called topics. The test data will be put into topics called stations and measurements. The records -are produced (put in) by the test data generator and consumed (read) by Druid afterwards in the same order they were -created. - -As Kafka has no web interface, you must use a Kafka client like {kcat}[kcat]. Kafka uses mutual TLS, so clients -wanting to connect to Kafka must present a valid TLS certificate. The easiest way to obtain this is to shell into the -`kafka-broker-default-0` Pod, as we will do in the following section for demonstration purposes. For a production setup, -you should spin up a dedicated Pod provisioned with a certificate acting as a Kafka client instead of shell-ing into the -Kafka Pod. - -=== List the available Topics - -You can execute a command on the Kafka broker to list the available topics as follows: - -// In the following commands the kcat-prober container instead of the kafka container is used to send requests to Kafka. -// This is necessary because kcat cannot use key- and truststore files with empty passwords, which are mounted here to the kafka container. -// However, the kcat-prober container has TLS certificates mounted, which can be used by kcat to connect to Kafka. 
-[source,console] ----- -$ kubectl exec -it kafka-broker-default-0 -c kcat-prober -- /bin/bash -c "/stackable/kcat -b localhost:9093 -X security.protocol=SSL -X ssl.key.location=/stackable/tls-kcat/tls.key -X ssl.certificate.location=/stackable/tls-kcat/tls.crt -X ssl.ca.location=/stackable/tls-kcat/ca.crt -L" -Metadata for all topics (from broker -1: ssl://localhost:9093/bootstrap): - 1 brokers: - broker 1001 at 172.19.0.3:31041 (controller) - 2 topics: - topic "stations" with 8 partitions: - partition 0, leader 1001, replicas: 1001, isrs: 1001 - partition 1, leader 1001, replicas: 1001, isrs: 1001 - partition 2, leader 1001, replicas: 1001, isrs: 1001 - partition 3, leader 1001, replicas: 1001, isrs: 1001 - partition 4, leader 1001, replicas: 1001, isrs: 1001 - partition 5, leader 1001, replicas: 1001, isrs: 1001 - partition 6, leader 1001, replicas: 1001, isrs: 1001 - partition 7, leader 1001, replicas: 1001, isrs: 1001 - topic "measurements" with 8 partitions: - partition 0, leader 1001, replicas: 1001, isrs: 1001 - partition 1, leader 1001, replicas: 1001, isrs: 1001 - partition 2, leader 1001, replicas: 1001, isrs: 1001 - partition 3, leader 1001, replicas: 1001, isrs: 1001 - partition 4, leader 1001, replicas: 1001, isrs: 1001 - partition 5, leader 1001, replicas: 1001, isrs: 1001 - partition 6, leader 1001, replicas: 1001, isrs: 1001 - partition 7, leader 1001, replicas: 1001, isrs: 1001 ----- - -You can see that Kafka consists of one broker, and the topics `stations` and `measurements` have been created with eight -partitions each. - -=== Show Sample Records - -To see some records sent to Kafka, run the following commands. You can change the number of records to -print via the `-c` parameter. 
- -[source,console] ----- -$ kubectl exec -it kafka-broker-default-0 -c kcat-prober -- /bin/bash -c "/stackable/kcat -b localhost:9093 -X security.protocol=SSL -X ssl.key.location=/stackable/tls-kcat/tls.key -X ssl.certificate.location=/stackable/tls-kcat/tls.crt -X ssl.ca.location=/stackable/tls-kcat/ca.crt -C -t stations -c 2" ----- - -Below is an example of the output of two records: - -[source,json] ----- -{ - "uuid": "47174d8f-1b8e-4599-8a59-b580dd55bc87", - "number": 48900237, - "shortname": "EITZE", - "longname": "EITZE", - "km": 9.56, - "agency": "VERDEN", - "longitude": 9.2767694354, - "latitude": 52.9040654474, - "water": { - "shortname": "ALLER", - "longname": "ALLER" - } -} -{ - "uuid": "5aaed954-de4e-4528-8f65-f3f530bc8325", - "number": 48900204, - "shortname": "RETHEM", - "longname": "RETHEM", - "km": 34.22, - "agency": "VERDEN", - "longitude": 9.3828408101, - "latitude": 52.7890975921, - "water": { - "shortname": "ALLER", - "longname": "ALLER" - } -} ----- - -[source,console] ----- -$ kubectl exec -it kafka-broker-default-0 -c kcat-prober -- /bin/bash -c "/stackable/kcat -b localhost:9093 -X security.protocol=SSL -X ssl.key.location=/stackable/tls-kcat/tls.key -X ssl.certificate.location=/stackable/tls-kcat/tls.crt -X ssl.ca.location=/stackable/tls-kcat/ca.crt -C -t measurements -c 3" ----- - -Below is an example of the output of three records: - -[source,json] ----- -{ - "timestamp": 1658151900000, - "value": 221, - "station_uuid": "47174d8f-1b8e-4599-8a59-b580dd55bc87" -} -{ - "timestamp": 1658152800000, - "value": 220, - "station_uuid": "47174d8f-1b8e-4599-8a59-b580dd55bc87" -} -{ - "timestamp": 1658153700000, - "value": 220, - "station_uuid": "47174d8f-1b8e-4599-8a59-b580dd55bc87" -} ----- - -The records of the two topics only contain the needed data. The measurement records contain a `station_uuid` for the -measuring station. The relationship is illustrated below. 
- -image::nifi-kafka-druid-water-level-data/topics.png[] - -The reason for splitting the data up into two different topics is the improved performance. One more straightforward -solution would be to use a single topic and produce records like the following: - -[source,json] ----- -{ - "uuid": "47174d8f-1b8e-4599-8a59-b580dd55bc87", - "number": 48900237, - "shortname": "EITZE", - "longname": "EITZE", - "km": 9.56, - "agency": "VERDEN", - "longitude": 9.2767694354, - "latitude": 52.9040654474, - "water": { - "shortname": "ALLER", - "longname": "ALLER" - }, - "timestamp": 1658151900000, - "value": 221 -} ----- - -Notice the two last attributes that differ from the previously shown `stations` records. The obvious downside is that -every measurement (multiple millions of it) has to contain all the data known about the station it was measured at. This -often leads to transmitting and storing duplicated information, e.g., the longitude of a station, resulting in increased -network traffic and storage usage. The solution is only to send a station's known/needed data or measurement data. This -process is called data normalization. The downside is that when analyzing the data, you need to combine the records from -multiple tables in Druid (`stations` and `measurements`). - -If you are interested in how many records have been produced to the Kafka topic so far, use the following command. It -will print the last record produced to the topic partition, formatted with the pattern specified in the `-f` parameter. -The given pattern will print some metadata of the record. 
- -[source,console] ----- -$ kubectl exec -it kafka-broker-default-0 -c kcat-prober -- /bin/bash -c "/stackable/kcat -b localhost:9093 -X security.protocol=SSL -X ssl.key.location=/stackable/tls-kcat/tls.key -X ssl.certificate.location=/stackable/tls-kcat/tls.crt -X ssl.ca.location=/stackable/tls-kcat/ca.crt -C -t measurements -o -8 -c 8 -f 'Topic %t / Partition %p / Offset: %o / Timestamp: %T\n'" -Topic measurements / Partition 0 / Offset: 1324098 / Timestamp: 1680606104652 -Topic measurements / Partition 1 / Offset: 1346816 / Timestamp: 1680606100462 -Topic measurements / Partition 2 / Offset: 1339363 / Timestamp: 1680606100461 -Topic measurements / Partition 3 / Offset: 1352787 / Timestamp: 1680606104652 -Topic measurements / Partition 4 / Offset: 1330144 / Timestamp: 1680606098368 -Topic measurements / Partition 5 / Offset: 1340226 / Timestamp: 1680606104652 -Topic measurements / Partition 6 / Offset: 1320125 / Timestamp: 1680606100462 -Topic measurements / Partition 7 / Offset: 1317719 / Timestamp: 1680606098368 ----- - -If you calculate `1,324,098` records * `8` partitions, you end up with ~ 10,592,784 records. The output also shows that -the last measurement record was produced at the timestamp `1680606104652`, translating to -`Di 4. Apr 13:01:44 CEST 2023` (using the command `date -d @1680606104`). - == NiFi NiFi fetches water-level data from the internet and ingests it into Kafka in real time. 
This demo includes a workflow diff --git a/stacks/data-lakehouse-iceberg-trino-spark/kafka.yaml b/stacks/data-lakehouse-iceberg-trino-spark/kafka.yaml index ce2af1aa..4cae3791 100644 --- a/stacks/data-lakehouse-iceberg-trino-spark/kafka.yaml +++ b/stacks/data-lakehouse-iceberg-trino-spark/kafka.yaml @@ -7,9 +7,12 @@ spec: image: productVersion: 3.9.1 clusterConfig: - zookeeperConfigMapName: kafka-znode authentication: - authenticationClass: kafka-client-tls + controllers: + roleGroups: + default: + replicas: 1 brokers: config: resources: @@ -25,7 +28,7 @@ spec: default: replicas: 5 configOverrides: - server.properties: + broker.properties: num.partitions: "27" log.segment.bytes: "50000000" # 0.5GB log.retention.bytes: "2000000000" # 2 GB. Should keep between 2.0 and 2.5GB @@ -38,11 +41,3 @@ spec: provider: tls: clientCertSecretClass: tls ---- -apiVersion: zookeeper.stackable.tech/v1alpha1 -kind: ZookeeperZnode -metadata: - name: kafka-znode -spec: - clusterRef: - name: zookeeper diff --git a/stacks/nifi-kafka-druid-superset-s3/kafka.yaml b/stacks/nifi-kafka-druid-superset-s3/kafka.yaml index 3ce5fe7c..1d7dbce8 100644 --- a/stacks/nifi-kafka-druid-superset-s3/kafka.yaml +++ b/stacks/nifi-kafka-druid-superset-s3/kafka.yaml @@ -1,23 +1,18 @@ --- -apiVersion: zookeeper.stackable.tech/v1alpha1 -kind: ZookeeperZnode -metadata: - name: kafka-znode -spec: - clusterRef: - name: zookeeper ---- apiVersion: kafka.stackable.tech/v1alpha1 kind: KafkaCluster metadata: name: kafka spec: image: - productVersion: 3.9.1 + productVersion: 4.1.0 clusterConfig: - zookeeperConfigMapName: kafka-znode authentication: - authenticationClass: kafka-client-tls + controllers: + roleGroups: + default: + replicas: 1 brokers: config: bootstrapListenerClass: external-stable @@ -35,7 +30,7 @@ spec: default: replicas: 1 configOverrides: - server.properties: + broker.properties: num.partitions: "8" # We have # 1 brokers
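Because the hunks above arrive flattened, the net effect on `stacks/nifi-kafka-druid-superset-s3/kafka.yaml` is easier to see assembled. The sketch below is a hedged reconstruction from the `+` and context lines only: the diff does not preserve indentation, fields outside the hunks (such as the rest of the `brokers.config` block) are elided, and the placement of the new `controllers` role as a sibling of `brokers` under `spec` is an assumption based on the KRaft migration implied by the removed `zookeeperConfigMapName` and `ZookeeperZnode` resource.

```yaml
---
apiVersion: kafka.stackable.tech/v1alpha1
kind: KafkaCluster
metadata:
  name: kafka
spec:
  image:
    productVersion: 4.1.0           # bumped from 3.9.1
  clusterConfig:
    # zookeeperConfigMapName is gone: the cluster now runs in KRaft mode,
    # which is why the companion ZookeeperZnode resource was deleted too.
    authentication:
      - authenticationClass: kafka-client-tls
  controllers:                      # new KRaft controller role (placement assumed)
    roleGroups:
      default:
        replicas: 1
  brokers:
    config:
      bootstrapListenerClass: external-stable
      # ... remaining broker config unchanged (outside the hunks)
    roleGroups:
      default:
        replicas: 1
    configOverrides:
      broker.properties:            # renamed from server.properties for KRaft
        num.partitions: "8"
```

The `stacks/data-lakehouse-iceberg-trino-spark/kafka.yaml` hunk follows the same shape (ZooKeeper references removed, a one-replica `controllers` role group added, and the `configOverrides` key renamed to `broker.properties`), while its context lines show `productVersion: 3.9.1` left as-is.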