@@ -26,7 +26,7 @@ spec:
echo 'Waiting for all nifi instances to be ready'
kubectl wait --for=condition=ready --timeout=30m pod -l app.kubernetes.io/name=nifi,app.kubernetes.io/instance=nifi
- name: wait-for-kafka-topics
image: oci.stackable.tech/sdp/kafka:3.9.1-stackable0.0.0-dev
image: oci.stackable.tech/sdp/kafka:4.1.0-stackable0.0.0-dev
command:
- bash
- -euo
Binary file not shown.
97 changes: 0 additions & 97 deletions docs/modules/demos/pages/nifi-kafka-druid-earthquake-data.adoc
@@ -86,103 +86,6 @@ $ stackablectl stacklet list

include::partial$instance-hint.adoc[]

== Inspect the data in Kafka

Kafka is an event streaming platform for streaming data in near real-time.
All messages written to and read from Kafka are organized in dedicated queues called topics.
The test data is put into a topic called `earthquakes`.
The records are produced (written) by the test data generator and afterwards consumed (read) by Druid in the same order in which they were created.

As Kafka has no web interface, you must use a Kafka client like {kcat}[kcat].
Kafka uses mutual TLS, so clients wanting to connect to Kafka must present a valid TLS certificate.
The easiest way to obtain one is to shell into the `kafka-broker-default-0` Pod, as we will do in the following section for demonstration purposes.
For a production setup, you should instead spin up a dedicated Pod provisioned with a certificate that acts as a Kafka client, rather than shelling into the Kafka Pod.
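
Such a dedicated client Pod could, for example, run the Kafka image (which already ships `kcat` under `/stackable/kcat`) and mount a client certificate from the `tls` SecretClass via the secret-operator.
The following manifest is only a rough sketch under these assumptions; the Pod name `kafka-client` and the exact volume layout are illustrative and not part of the demo:

[source,yaml]
----
---
apiVersion: v1
kind: Pod
metadata:
  name: kafka-client # hypothetical name, not created by the demo
spec:
  containers:
    - name: kcat
      # The Kafka image is reused here because it already contains /stackable/kcat
      image: oci.stackable.tech/sdp/kafka:4.1.0-stackable0.0.0-dev
      command: ["sleep", "infinity"]
      volumeMounts:
        - name: tls-kcat
          mountPath: /stackable/tls-kcat
  volumes:
    # Assumes the secret-operator and the "tls" SecretClass used in this demo;
    # the volume then provides tls.crt, tls.key and ca.crt in PEM format.
    - name: tls-kcat
      ephemeral:
        volumeClaimTemplate:
          metadata:
            annotations:
              secrets.stackable.tech/class: tls
          spec:
            storageClassName: secrets.stackable.tech
            accessModes:
              - ReadWriteOnce
            resources:
              requests:
                storage: "1"
----

The `kcat` commands shown below could then be run from such a Pod against the broker's TLS port instead of via `kubectl exec` into the broker Pod.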

=== List the available Topics

You can execute a command on the Kafka broker to list the available topics as follows:

// In the following commands the kcat-prober container instead of the kafka container is used to send requests to Kafka.
// This is necessary because kcat cannot use key- and truststore files with empty passwords, which are mounted here to the kafka container.
// However, the kcat-prober container has TLS certificates mounted, which can be used by kcat to connect to Kafka.
[source,console]
----
$ kubectl exec -it kafka-broker-default-0 -c kcat-prober -- /bin/bash -c "/stackable/kcat -b localhost:9093 -X security.protocol=SSL -X ssl.key.location=/stackable/tls-kcat/tls.key -X ssl.certificate.location=/stackable/tls-kcat/tls.crt -X ssl.ca.location=/stackable/tls-kcat/ca.crt -L"
Metadata for all topics (from broker -1: ssl://localhost:9093/bootstrap):
1 brokers:
broker 1001 at 172.19.0.4:32321 (controller)
1 topics:
topic "earthquakes" with 8 partitions:
partition 0, leader 1001, replicas: 1001, isrs: 1001
partition 1, leader 1001, replicas: 1001, isrs: 1001
partition 2, leader 1001, replicas: 1001, isrs: 1001
partition 3, leader 1001, replicas: 1001, isrs: 1001
partition 4, leader 1001, replicas: 1001, isrs: 1001
partition 5, leader 1001, replicas: 1001, isrs: 1001
partition 6, leader 1001, replicas: 1001, isrs: 1001
partition 7, leader 1001, replicas: 1001, isrs: 1001
----

You can see that Kafka consists of one broker, and the topic `earthquakes` with eight partitions has been created. To
see some records sent to Kafka, run the following command. You can change the number of records to print via the `-c`
parameter.

[source,console]
----
$ kubectl exec -it kafka-broker-default-0 -c kcat-prober -- /bin/bash -c "/stackable/kcat -b localhost:9093 -X security.protocol=SSL -X ssl.key.location=/stackable/tls-kcat/tls.key -X ssl.certificate.location=/stackable/tls-kcat/tls.crt -X ssl.ca.location=/stackable/tls-kcat/ca.crt -C -t earthquakes -c 1"
----

Below is an example of the output of one record:

[source,json]
----
{
"time":"1950-02-07T10:37:29.240Z",
"latitude":45.949,
"longitude":151.59,
"depth":35.0,
"mag":5.94,
"magType":"mw",
"nst":null,
"gap":null,
"dmin":null,
"rms":null,
"net":"iscgem",
"id":"iscgem895202",
"updated":"2022-04-26T18:23:38.377Z",
"place":"Kuril Islands",
"type":"earthquake",
"horizontalError":null,
"depthError":12.6,
"magError":0.55,
"magNst":null,
"status":"reviewed",
"locationSource":"iscgem",
"magSource":"iscgem"
}
----

If you are interested in how many records have been produced to the Kafka topic so far, use the following command.
It will print the last records produced to the topic, formatted with the pattern specified in the `-f` parameter.
The given pattern prints some metadata of each record.

[source,console]
----
$ kubectl exec -it kafka-broker-default-0 -c kcat-prober -- /bin/bash -c "/stackable/kcat -b localhost:9093 -X security.protocol=SSL -X ssl.key.location=/stackable/tls-kcat/tls.key -X ssl.certificate.location=/stackable/tls-kcat/tls.crt -X ssl.ca.location=/stackable/tls-kcat/ca.crt -C -t earthquakes -o -8 -c 8 -f 'Topic %t / Partition %p / Offset: %o / Timestamp: %T\n'"
Topic earthquakes / Partition 0 / Offset: 378859 / Timestamp: 1752584024936
Topic earthquakes / Partition 0 / Offset: 378860 / Timestamp: 1752584024936
Topic earthquakes / Partition 0 / Offset: 378861 / Timestamp: 1752584024936
Topic earthquakes / Partition 0 / Offset: 378862 / Timestamp: 1752584024936
Topic earthquakes / Partition 0 / Offset: 378863 / Timestamp: 1752584024936
Topic earthquakes / Partition 0 / Offset: 378864 / Timestamp: 1752584024936
Topic earthquakes / Partition 0 / Offset: 378865 / Timestamp: 1752584024936
Topic earthquakes / Partition 0 / Offset: 378866 / Timestamp: 1752584024936
----

If you calculate `379,000` records * `8` partitions, you end up with ~ 3,032,000 records.
The output also shows that the last record was produced at the timestamp `1752584024936`, which translates to `Tuesday, 15 July 2025 14:53:44.936 GMT+02:00`
(using e.g. the command `date -d @1752584024`).
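
For reference, the conversion can be checked directly from a shell (printed here in UTC; a sketch, the exact output format depends on your `date` implementation and locale):

[source,console]
----
$ date -u -d @1752584024
Tue Jul 15 12:53:44 UTC 2025
----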

== NiFi

NiFi is used to fetch earthquake data from the internet and ingest it into Kafka.
176 changes: 0 additions & 176 deletions docs/modules/demos/pages/nifi-kafka-druid-water-level-data.adoc
@@ -92,182 +92,6 @@ $ stackablectl stacklet list

include::partial$instance-hint.adoc[]

== Inspect the data in Kafka

Kafka is an event streaming platform for streaming data in near real-time. All messages written to and read from Kafka
are organized in dedicated queues called topics. The test data is put into the topics `stations` and `measurements`. The records
are produced (written) by the test data generator and consumed (read) by Druid afterwards in the same order they were
created.

As Kafka has no web interface, you must use a Kafka client like {kcat}[kcat]. Kafka uses mutual TLS, so clients
wanting to connect to Kafka must present a valid TLS certificate. The easiest way to obtain one is to shell into the
`kafka-broker-default-0` Pod, as we will do in the following section for demonstration purposes. For a production setup,
you should instead spin up a dedicated Pod provisioned with a certificate that acts as a Kafka client, rather than
shelling into the Kafka Pod.
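
If you have provisioned such a dedicated client Pod (for example one named `kafka-client` with a certificate from the
`tls` SecretClass mounted under `/stackable/tls-kcat`), you could run the same `kcat` commands from it against the
broker's bootstrap address. Both the Pod name and the `kafka-broker-default-bootstrap` Service name below are
assumptions for illustration, not resources created by the demo:

[source,console]
----
$ kubectl exec -it kafka-client -- /stackable/kcat -b kafka-broker-default-bootstrap:9093 -X security.protocol=SSL -X ssl.key.location=/stackable/tls-kcat/tls.key -X ssl.certificate.location=/stackable/tls-kcat/tls.crt -X ssl.ca.location=/stackable/tls-kcat/ca.crt -L
----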

=== List the available Topics

You can execute a command on the Kafka broker to list the available topics as follows:

// In the following commands the kcat-prober container instead of the kafka container is used to send requests to Kafka.
// This is necessary because kcat cannot use key- and truststore files with empty passwords, which are mounted here to the kafka container.
// However, the kcat-prober container has TLS certificates mounted, which can be used by kcat to connect to Kafka.
[source,console]
----
$ kubectl exec -it kafka-broker-default-0 -c kcat-prober -- /bin/bash -c "/stackable/kcat -b localhost:9093 -X security.protocol=SSL -X ssl.key.location=/stackable/tls-kcat/tls.key -X ssl.certificate.location=/stackable/tls-kcat/tls.crt -X ssl.ca.location=/stackable/tls-kcat/ca.crt -L"
Metadata for all topics (from broker -1: ssl://localhost:9093/bootstrap):
1 brokers:
broker 1001 at 172.19.0.3:31041 (controller)
2 topics:
topic "stations" with 8 partitions:
partition 0, leader 1001, replicas: 1001, isrs: 1001
partition 1, leader 1001, replicas: 1001, isrs: 1001
partition 2, leader 1001, replicas: 1001, isrs: 1001
partition 3, leader 1001, replicas: 1001, isrs: 1001
partition 4, leader 1001, replicas: 1001, isrs: 1001
partition 5, leader 1001, replicas: 1001, isrs: 1001
partition 6, leader 1001, replicas: 1001, isrs: 1001
partition 7, leader 1001, replicas: 1001, isrs: 1001
topic "measurements" with 8 partitions:
partition 0, leader 1001, replicas: 1001, isrs: 1001
partition 1, leader 1001, replicas: 1001, isrs: 1001
partition 2, leader 1001, replicas: 1001, isrs: 1001
partition 3, leader 1001, replicas: 1001, isrs: 1001
partition 4, leader 1001, replicas: 1001, isrs: 1001
partition 5, leader 1001, replicas: 1001, isrs: 1001
partition 6, leader 1001, replicas: 1001, isrs: 1001
partition 7, leader 1001, replicas: 1001, isrs: 1001
----

You can see that Kafka consists of one broker, and the topics `stations` and `measurements` have been created with eight
partitions each.

=== Show Sample Records

To see some records sent to Kafka, run the following commands. You can change the number of records to
print via the `-c` parameter.

[source,console]
----
$ kubectl exec -it kafka-broker-default-0 -c kcat-prober -- /bin/bash -c "/stackable/kcat -b localhost:9093 -X security.protocol=SSL -X ssl.key.location=/stackable/tls-kcat/tls.key -X ssl.certificate.location=/stackable/tls-kcat/tls.crt -X ssl.ca.location=/stackable/tls-kcat/ca.crt -C -t stations -c 2"
----

Below is an example of the output of two records:

[source,json]
----
{
"uuid": "47174d8f-1b8e-4599-8a59-b580dd55bc87",
"number": 48900237,
"shortname": "EITZE",
"longname": "EITZE",
"km": 9.56,
"agency": "VERDEN",
"longitude": 9.2767694354,
"latitude": 52.9040654474,
"water": {
"shortname": "ALLER",
"longname": "ALLER"
}
}
{
"uuid": "5aaed954-de4e-4528-8f65-f3f530bc8325",
"number": 48900204,
"shortname": "RETHEM",
"longname": "RETHEM",
"km": 34.22,
"agency": "VERDEN",
"longitude": 9.3828408101,
"latitude": 52.7890975921,
"water": {
"shortname": "ALLER",
"longname": "ALLER"
}
}
----

[source,console]
----
$ kubectl exec -it kafka-broker-default-0 -c kcat-prober -- /bin/bash -c "/stackable/kcat -b localhost:9093 -X security.protocol=SSL -X ssl.key.location=/stackable/tls-kcat/tls.key -X ssl.certificate.location=/stackable/tls-kcat/tls.crt -X ssl.ca.location=/stackable/tls-kcat/ca.crt -C -t measurements -c 3"
----

Below is an example of the output of three records:

[source,json]
----
{
"timestamp": 1658151900000,
"value": 221,
"station_uuid": "47174d8f-1b8e-4599-8a59-b580dd55bc87"
}
{
"timestamp": 1658152800000,
"value": 220,
"station_uuid": "47174d8f-1b8e-4599-8a59-b580dd55bc87"
}
{
"timestamp": 1658153700000,
"value": 220,
"station_uuid": "47174d8f-1b8e-4599-8a59-b580dd55bc87"
}
----

The records in the two topics contain only the data that is needed. Each measurement record contains a `station_uuid` that
references the measuring station. The relationship is illustrated below.

image::nifi-kafka-druid-water-level-data/topics.png[]

The reason for splitting the data up into two different topics is improved performance. A more straightforward
solution would be to use a single topic and produce records like the following:

[source,json]
----
{
"uuid": "47174d8f-1b8e-4599-8a59-b580dd55bc87",
"number": 48900237,
"shortname": "EITZE",
"longname": "EITZE",
"km": 9.56,
"agency": "VERDEN",
"longitude": 9.2767694354,
"latitude": 52.9040654474,
"water": {
"shortname": "ALLER",
"longname": "ALLER"
},
"timestamp": 1658151900000,
"value": 221
}
----

Notice the last two attributes, which differ from the previously shown `stations` records. The obvious downside is that
every measurement (many millions of them) has to contain all the data known about the station it was measured at. This
often leads to transmitting and storing duplicated information, e.g. the longitude of a station, resulting in increased
network traffic and storage usage. The solution is to send only the data that belongs to a station or only the data
that belongs to a measurement. This process is called data normalization. The downside is that, when analyzing the
data, you need to combine the records from multiple tables in Druid (`stations` and `measurements`).
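
To give an idea of what such a combination could look like, below is a hypothetical Druid SQL query that joins the two
datasources on the station UUID. It assumes the datasources are named `measurements` and `stations` and that the column
names match the JSON fields shown above:

[source,sql]
----
-- Hypothetical query: the latest ten measurements enriched with station metadata.
-- Assumes datasources named "measurements" and "stations" with the fields shown above.
SELECT
  s.longname   AS station,
  s.longitude,
  s.latitude,
  m.__time     AS measured_at,
  m."value"    AS water_level
FROM measurements AS m
JOIN stations     AS s
  ON m.station_uuid = s.uuid
ORDER BY m.__time DESC
LIMIT 10;
----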

If you are interested in how many records have been produced to the Kafka topic so far, use the following command. It
will print the last record produced to the topic partition, formatted with the pattern specified in the `-f` parameter.
The given pattern will print some metadata of the record.

[source,console]
----
$ kubectl exec -it kafka-broker-default-0 -c kcat-prober -- /bin/bash -c "/stackable/kcat -b localhost:9093 -X security.protocol=SSL -X ssl.key.location=/stackable/tls-kcat/tls.key -X ssl.certificate.location=/stackable/tls-kcat/tls.crt -X ssl.ca.location=/stackable/tls-kcat/ca.crt -C -t measurements -o -8 -c 8 -f 'Topic %t / Partition %p / Offset: %o / Timestamp: %T\n'"
Topic measurements / Partition 0 / Offset: 1324098 / Timestamp: 1680606104652
Topic measurements / Partition 1 / Offset: 1346816 / Timestamp: 1680606100462
Topic measurements / Partition 2 / Offset: 1339363 / Timestamp: 1680606100461
Topic measurements / Partition 3 / Offset: 1352787 / Timestamp: 1680606104652
Topic measurements / Partition 4 / Offset: 1330144 / Timestamp: 1680606098368
Topic measurements / Partition 5 / Offset: 1340226 / Timestamp: 1680606104652
Topic measurements / Partition 6 / Offset: 1320125 / Timestamp: 1680606100462
Topic measurements / Partition 7 / Offset: 1317719 / Timestamp: 1680606098368
----

If you calculate `1,324,098` records * `8` partitions, you end up with ~ 10,592,784 records. The output also shows that
the last measurement record was produced at the timestamp `1680606104652`, translating to
`Tue 4 Apr 13:01:44 CEST 2023` (using the command `date -d @1680606104`).

== NiFi

NiFi fetches water-level data from the internet and ingests it into Kafka in real time. This demo includes a workflow
15 changes: 5 additions & 10 deletions stacks/data-lakehouse-iceberg-trino-spark/kafka.yaml
@@ -7,9 +7,12 @@ spec:
image:
productVersion: 3.9.1
clusterConfig:
zookeeperConfigMapName: kafka-znode
authentication:
- authenticationClass: kafka-client-tls
controllers:
roleGroups:
default:
replicas: 1
brokers:
config:
resources:
@@ -25,7 +28,7 @@ spec:
default:
replicas: 5
configOverrides:
server.properties:
broker.properties:
num.partitions: "27"
log.segment.bytes: "50000000" # 0.5GB
log.retention.bytes: "2000000000" # 2 GB. Should keep between 2.0 and 2.5GB
@@ -38,11 +41,3 @@ spec:
provider:
tls:
clientCertSecretClass: tls
---
apiVersion: zookeeper.stackable.tech/v1alpha1
kind: ZookeeperZnode
metadata:
name: kafka-znode
spec:
clusterRef:
name: zookeeper
17 changes: 6 additions & 11 deletions stacks/nifi-kafka-druid-superset-s3/kafka.yaml
@@ -1,23 +1,18 @@
---
apiVersion: zookeeper.stackable.tech/v1alpha1
kind: ZookeeperZnode
metadata:
name: kafka-znode
spec:
clusterRef:
name: zookeeper
---
apiVersion: kafka.stackable.tech/v1alpha1
kind: KafkaCluster
metadata:
name: kafka
spec:
image:
productVersion: 3.9.1
productVersion: 4.1.0
clusterConfig:
zookeeperConfigMapName: kafka-znode
authentication:
- authenticationClass: kafka-client-tls
controllers:
roleGroups:
default:
replicas: 1
brokers:
config:
bootstrapListenerClass: external-stable
@@ -35,7 +30,7 @@ spec:
default:
replicas: 1
configOverrides:
server.properties:
broker.properties:
num.partitions: "8"
# We have
# 1 brokers