$ stackablectl stacklet list

include::partial$instance-hint.adoc[]

== Inspect the data in Kafka

Kafka is an event streaming platform used to stream data in near real-time. All messages written to and read from Kafka
are organized into dedicated queues called topics. The test data is put into topics called `stations` and
`measurements`. The records are produced (written) by the test data generator and afterwards consumed (read) by Druid
in the same order they were created.

To interact with Kafka, you will use the client scripts shipped with the Kafka image. Kafka uses mutual TLS, so clients
wanting to connect to Kafka must present a valid TLS certificate. The easiest way to obtain one is to shell into the
`kafka-broker-default-0` Pod, as we will do in the following section for demonstration purposes. For a production setup,
you should instead spin up a dedicated Pod, provisioned with a certificate, that acts as a Kafka client, rather than
shelling into the Kafka Pod.
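
For reference, a mutual-TLS client configuration such as the `client.properties` file passed to the commands below
typically contains settings along the following lines. This is only a sketch: the paths and passwords are illustrative
placeholders, not the demo's actual values.

[source,properties]
----
# Illustrative mutual-TLS client settings; real paths/passwords differ.
security.protocol=SSL
ssl.keystore.location=/path/to/client-keystore.p12
ssl.keystore.password=<keystore-password>
ssl.keystore.type=PKCS12
ssl.truststore.location=/path/to/truststore.p12
ssl.truststore.password=<truststore-password>
ssl.truststore.type=PKCS12
----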

=== List the available Topics

You can execute a command on the Kafka broker to list the available topics as follows:

[source,console]
----
$ kubectl exec kafka-broker-default-0 -c kafka -- \
  /stackable/kafka/bin/kafka-topics.sh \
  --describe \
  --bootstrap-server kafka-broker-default-headless.default.svc.cluster.local:9093 \
  --command-config /stackable/config/client.properties
...
Topic: measurements TopicId: w9qYb3GaTvCMZj4G8pkPPQ PartitionCount: 8 ReplicationFactor: 1 Configs: min.insync.replicas=1,segment.bytes=100000000,retention.bytes=900000000
  Topic: measurements Partition: 0 Leader: 1243966388 Replicas: 1243966388 Isr: 1243966388 Elr: LastKnownElr:
  Topic: measurements Partition: 1 Leader: 1243966388 Replicas: 1243966388 Isr: 1243966388 Elr: LastKnownElr:
  Topic: measurements Partition: 2 Leader: 1243966388 Replicas: 1243966388 Isr: 1243966388 Elr: LastKnownElr:
  Topic: measurements Partition: 3 Leader: 1243966388 Replicas: 1243966388 Isr: 1243966388 Elr: LastKnownElr:
  Topic: measurements Partition: 4 Leader: 1243966388 Replicas: 1243966388 Isr: 1243966388 Elr: LastKnownElr:
  Topic: measurements Partition: 5 Leader: 1243966388 Replicas: 1243966388 Isr: 1243966388 Elr: LastKnownElr:
  Topic: measurements Partition: 6 Leader: 1243966388 Replicas: 1243966388 Isr: 1243966388 Elr: LastKnownElr:
  Topic: measurements Partition: 7 Leader: 1243966388 Replicas: 1243966388 Isr: 1243966388 Elr: LastKnownElr:
Topic: stations TopicId: QkKmvOagQkG4QbeS0IZ_Tg PartitionCount: 8 ReplicationFactor: 1 Configs: min.insync.replicas=1,segment.bytes=100000000,retention.bytes=900000000
  Topic: stations Partition: 0 Leader: 1243966388 Replicas: 1243966388 Isr: 1243966388 Elr: LastKnownElr:
  Topic: stations Partition: 1 Leader: 1243966388 Replicas: 1243966388 Isr: 1243966388 Elr: LastKnownElr:
  Topic: stations Partition: 2 Leader: 1243966388 Replicas: 1243966388 Isr: 1243966388 Elr: LastKnownElr:
  Topic: stations Partition: 3 Leader: 1243966388 Replicas: 1243966388 Isr: 1243966388 Elr: LastKnownElr:
  Topic: stations Partition: 4 Leader: 1243966388 Replicas: 1243966388 Isr: 1243966388 Elr: LastKnownElr:
  Topic: stations Partition: 5 Leader: 1243966388 Replicas: 1243966388 Isr: 1243966388 Elr: LastKnownElr:
  Topic: stations Partition: 6 Leader: 1243966388 Replicas: 1243966388 Isr: 1243966388 Elr: LastKnownElr:
  Topic: stations Partition: 7 Leader: 1243966388 Replicas: 1243966388 Isr: 1243966388 Elr: LastKnownElr:
----

You can see that Kafka consists of one broker, and the topics `stations` and `measurements` have been created with eight
partitions each.

=== Show Sample Records

To see some records sent to Kafka, run the following commands. You can change the number of records printed via the
`--max-messages` parameter.

[source,console]
----
$ kubectl exec kafka-broker-default-0 -c kafka -- \
  /stackable/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server kafka-broker-default-headless.default.svc.cluster.local:9093 \
  --consumer.config /stackable/config/client.properties \
  --topic stations \
  --offset earliest \
  --partition 0 \
  --max-messages 2
----

Below is an example of the output of two records:

[source,json]
----
{
  "uuid": "47174d8f-1b8e-4599-8a59-b580dd55bc87",
  "number": 48900237,
  "shortname": "EITZE",
  "longname": "EITZE",
  "km": 9.56,
  "agency": "VERDEN",
  "longitude": 9.2767694354,
  "latitude": 52.9040654474,
  "water": {
    "shortname": "ALLER",
    "longname": "ALLER"
  }
}
{
  "uuid": "5aaed954-de4e-4528-8f65-f3f530bc8325",
  "number": 48900204,
  "shortname": "RETHEM",
  "longname": "RETHEM",
  "km": 34.22,
  "agency": "VERDEN",
  "longitude": 9.3828408101,
  "latitude": 52.7890975921,
  "water": {
    "shortname": "ALLER",
    "longname": "ALLER"
  }
}
----

[source,console]
----
$ kubectl exec kafka-broker-default-0 -c kafka -- \
  /stackable/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server kafka-broker-default-headless.default.svc.cluster.local:9093 \
  --consumer.config /stackable/config/client.properties \
  --topic measurements \
  --offset earliest \
  --partition 0 \
  --max-messages 3
----

Below is an example of the output of three records:

[source,json]
----
{
  "timestamp": 1658151900000,
  "value": 221,
  "station_uuid": "47174d8f-1b8e-4599-8a59-b580dd55bc87"
}
{
  "timestamp": 1658152800000,
  "value": 220,
  "station_uuid": "47174d8f-1b8e-4599-8a59-b580dd55bc87"
}
{
  "timestamp": 1658153700000,
  "value": 220,
  "station_uuid": "47174d8f-1b8e-4599-8a59-b580dd55bc87"
}
----
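
The `timestamp` field in these records is an epoch value in milliseconds. As a quick sanity check (a sketch, not part of
the demo), you can convert it to a human-readable UTC time, for example in Python:

```python
from datetime import datetime, timezone

# One measurement record as consumed from the `measurements` topic above.
record = {
    "timestamp": 1658151900000,  # epoch milliseconds
    "value": 221,
    "station_uuid": "47174d8f-1b8e-4599-8a59-b580dd55bc87",
}

# Divide by 1000 to convert milliseconds to seconds before parsing.
ts = datetime.fromtimestamp(record["timestamp"] / 1000, tz=timezone.utc)
print(ts.isoformat())  # 2022-07-18T13:45:00+00:00
```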

The records in the two topics contain only the data that is needed: the measurement records reference their measuring
station via the `station_uuid` field. The relationship is illustrated below.

image::nifi-kafka-druid-water-level-data/topics.png[]

The data is split across two topics for performance reasons. A more straightforward solution would be to use a single
topic and produce records like the following:

[source,json]
----
{
  "uuid": "47174d8f-1b8e-4599-8a59-b580dd55bc87",
  "number": 48900237,
  "shortname": "EITZE",
  "longname": "EITZE",
  "km": 9.56,
  "agency": "VERDEN",
  "longitude": 9.2767694354,
  "latitude": 52.9040654474,
  "water": {
    "shortname": "ALLER",
    "longname": "ALLER"
  },
  "timestamp": 1658151900000,
  "value": 221
}
----

Notice the last two attributes, which differ from the previously shown `stations` records. The obvious downside of this
approach is that every measurement (millions of them) would have to contain all the data known about the station it was
measured at. This leads to transmitting and storing duplicated information, e.g. the longitude of a station, resulting
in increased network traffic and storage usage. The solution is to send the station data and the measurement data
separately, linking measurements to their station via the station's uuid. This process is called data normalization.
The downside is that when analyzing the data, you need to combine the records from multiple tables in Druid (`stations`
and `measurements`).
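
The normalization trade-off described above can be sketched in a few lines of Python (illustrative only; the demo
performs this join in Druid, not in Python): station metadata is stored once, and measurements reference it via
`station_uuid`.

```python
# Station metadata stored once, keyed by uuid (values taken from the records above).
stations = {
    "47174d8f-1b8e-4599-8a59-b580dd55bc87": {"shortname": "EITZE", "longname": "EITZE"},
}

# Measurements carry only the foreign key, not the full station record.
measurements = [
    {"timestamp": 1658151900000, "value": 221,
     "station_uuid": "47174d8f-1b8e-4599-8a59-b580dd55bc87"},
    {"timestamp": 1658152800000, "value": 220,
     "station_uuid": "47174d8f-1b8e-4599-8a59-b580dd55bc87"},
]

# Joining at analysis time reconstructs the denormalized view when needed.
joined = [{**m, **stations[m["station_uuid"]]} for m in measurements]
print(joined[0]["shortname"])  # EITZE
```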

If you are interested in how many records have been produced to the Kafka topic so far, use the following command. It
prints the latest offset of every partition of the topic, which here equals the number of records produced to that
partition so far.

[source,console]
----
$ kubectl exec kafka-broker-default-0 -c kafka -- \
  /stackable/kafka/bin/kafka-get-offsets.sh \
  --bootstrap-server kafka-broker-default-headless.default.svc.cluster.local:9093 \
  --command-config /stackable/config/client.properties \
  --topic measurements
...
measurements:0:1366665
measurements:1:1364930
measurements:2:1395607
measurements:3:1390762
measurements:4:1368829
measurements:5:1362539
measurements:6:1344362
measurements:7:1369651
----

Summing the offsets of all `8` partitions, we end up with ~ 11 million records produced so far.
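
The total can be recomputed from the command output; here is a small Python sketch (the offsets are the snapshot shown
above and will have grown in the meantime):

```python
# Latest offsets per partition, as printed by kafka-get-offsets.sh
# in the format "topic:partition:offset".
output = """measurements:0:1366665
measurements:1:1364930
measurements:2:1395607
measurements:3:1390762
measurements:4:1368829
measurements:5:1362539
measurements:6:1344362
measurements:7:1369651"""

# The latest offset of a partition equals the number of records produced to it.
total = sum(int(line.rsplit(":", 1)[1]) for line in output.splitlines())
print(f"{total:,}")  # 10,963,345
```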

To inspect the last produced records, use the following command. Here, we consume the last three records from partition
`0` of the `measurements` topic.

[source,console]
----
$ kubectl exec kafka-broker-default-0 -c kafka -- \
  /stackable/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server kafka-broker-default-headless.default.svc.cluster.local:9093 \
  --consumer.config /stackable/config/client.properties \
  --topic measurements \
  --offset latest \
  --partition 0 \
  --max-messages 3
...
{"timestamp":"2025-10-21T11:00:00+02:00","value":369.54,"station_uuid":"5cdc6555-87d7-4fcd-834d-cbbe24c9d08b"}
{"timestamp":"2025-10-21T11:15:00+02:00","value":369.54,"station_uuid":"5cdc6555-87d7-4fcd-834d-cbbe24c9d08b"}
{"timestamp":"2025-10-21T11:00:00+02:00","value":8.0,"station_uuid":"7deedc21-2878-40cc-ab47-f6da0d9002f1"}
----
== NiFi

NiFi fetches water-level data from the internet and ingests it into Kafka in real time. This demo includes a workflow