Merged
21 changes: 13 additions & 8 deletions config/_default/menus/main.en.yaml
@@ -5674,46 +5674,51 @@ menu:
   parent: observability_pipelines_destinations
   identifier: observability_pipelines_http_client
   weight: 1112
+- name: Kafka
+  url: observability_pipelines/destinations/kafka/
+  parent: observability_pipelines_destinations
+  identifier: observability_pipelines_kafka
+  weight: 1113
 - name: Microsoft Sentinel
   identifier: observability_pipelines_microsoft_sentinel
   url: /observability_pipelines/destinations/microsoft_sentinel/
   parent: observability_pipelines_destinations
-  weight: 1113
+  weight: 1114
 - name: New Relic
   identifier: observability_pipelines_new_relic
   url: /observability_pipelines/destinations/new_relic/
   parent: observability_pipelines_destinations
-  weight: 1114
+  weight: 1115
 - name: OpenSearch
   url: observability_pipelines/destinations/opensearch
   parent: observability_pipelines_destinations
   identifier: observability_pipelines_opensearch
-  weight: 1115
+  weight: 1116
 - name: SentinelOne
   url: observability_pipelines/destinations/sentinelone
   parent: observability_pipelines_destinations
   identifier: observability_pipelines_sentinelone
-  weight: 1116
+  weight: 1117
 - name: Socket
   url: observability_pipelines/destinations/socket
   parent: observability_pipelines_destinations
   identifier: observability_pipelines_socket
-  weight: 1117
+  weight: 1118
 - name: Splunk HEC
   url: observability_pipelines/destinations/splunk_hec
   parent: observability_pipelines_destinations
   identifier: observability_pipelines_splunk_hec
-  weight: 1118
+  weight: 1119
 - name: Sumo Logic Hosted Collector
   url: observability_pipelines/destinations/sumo_logic_hosted_collector
   parent: observability_pipelines_destinations
   identifier: observability_pipelines_sumo_logic_hosted_collector
-  weight: 1119
+  weight: 1120
 - name: Syslog
   url: observability_pipelines/destinations/syslog
   parent: observability_pipelines_destinations
   identifier: observability_pipelines_syslog
-  weight: 1120
+  weight: 1121
 - name: Environment Variables
   url: observability_pipelines/environment_variables/
   parent: observability_pipelines
@@ -145,47 +145,11 @@ See the [Observability Pipelines Metrics][8] for a full list of available health

#### Component metrics

-Monitor the health of your Pub/Sub destination with the following key metrics:
-
-`pipelines.component_sent_events_total`
-: Events successfully delivered.
-
-`pipelines.component_discarded_events_total`
-: Events dropped.
-
-`pipelines.component_errors_total`
-: Errors in the destination component.
-
-`pipelines.component_sent_event_bytes_total`
-: Total event bytes sent.
-
-`pipelines.utilization`
-: Worker resource usage.
+{{% observability_pipelines/metrics/component %}}

#### Buffer metrics (when buffering is enabled)

-Track buffer behavior with these additional metrics:
-
-`pipelines.buffer_events`
-: Number of events currently in the buffer.
-
-`pipelines.buffer_byte_size`
-: Current buffer size in bytes.
-
-`pipelines.buffer_received_events_total`
-: Total events added to the buffer.
-
-`pipelines.buffer_received_event_bytes_total`
-: Total bytes added to the buffer.
-
-`pipelines.buffer_sent_events_total`
-: Total events successfully flushed from the buffer.
-
-`pipelines.buffer_sent_event_bytes_total`
-: Total bytes successfully flushed from the buffer.
-
-`pipelines.buffer_discarded_events_total`
-: Events discarded from the buffer (for example, due to overflow).
+{{% observability_pipelines/metrics/buffer %}}

### Event batching

Expand Down
140 changes: 140 additions & 0 deletions content/en/observability_pipelines/destinations/kafka.md
@@ -0,0 +1,140 @@
---
title: Kafka Destination
disable_toc: false
---

## Overview

Use Observability Pipelines' Kafka destination to send logs to Kafka topics.

### When to use this destination

Common scenarios when you might use this destination:
- To route logs to the following destinations:
  - [ClickHouse][1]: An open-source, column-oriented database management system used for analyzing large volumes of logs.
  - [Snowflake][2]: A data warehouse used for storage and querying.
    - Snowflake's Kafka connector ingests logs into its platform.
  - [Databricks][3]: A data lakehouse for analytics and storage.
  - [Azure Event Hubs][4]: A data ingestion and processing service in the Microsoft Azure ecosystem.
- To route data to Kafka and use the Kafka Connect ecosystem.
- To process and normalize your data with Observability Pipelines before routing to Apache Spark with Kafka to analyze data and run machine learning workloads.

## Setup

Set up the Kafka destination and its environment variables when you [set up a pipeline][5]. The information below is configured in the pipelines UI.

### Set up the destination

1. Enter the name of the topic you want to send logs to.
1. In the **Encoding** dropdown menu, select either `JSON` or `Raw message` as the output format.

#### Optional settings

##### Enable TLS

Toggle the switch to enable **TLS**. The following certificate and key files are required.<br>**Note**: All file paths are relative to the configuration data directory, which is `/var/lib/observability-pipelines-worker/config/` by default. See [Advanced Configurations][6] for more information. Each file must be owned by the `observability-pipelines-worker` user and group, or at least be readable by that user or group.
- `Server Certificate Path`: The path to the certificate file that has been signed by your Certificate Authority (CA) Root File in DER or PEM (X.509).
- `CA Certificate Path`: The path to the certificate file that is your Certificate Authority (CA) Root File in DER or PEM (X.509).
- `Private Key Path`: The path to the `.key` private key file that belongs to your Server Certificate Path in DER or PEM (PKCS#8) format.
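For example, on a Linux host you might set the required ownership and permissions like this (the certificate file names are hypothetical; substitute your own paths):

```shell
# Hypothetical certificate file names under the default configuration
# data directory; adjust to match your own certificates.
CONF_DIR=/var/lib/observability-pipelines-worker/config

sudo chown observability-pipelines-worker:observability-pipelines-worker \
  "$CONF_DIR/kafka_server.crt" "$CONF_DIR/kafka_ca.crt" "$CONF_DIR/kafka_server.key"

# Readable by the owner and group only, so the private key is not world-readable.
sudo chmod 640 "$CONF_DIR/kafka_server.crt" "$CONF_DIR/kafka_ca.crt" "$CONF_DIR/kafka_server.key"
```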

##### Enable SASL authentication

1. Toggle the switch to enable **SASL Authentication**.
1. Select the mechanism (**PLAIN**, **SCRAM-SHA-256**, or **SCRAM-SHA-512**) in the dropdown menu.

##### Enable compression

1. Toggle the switch to **Enable Compression**.
1. In the **Compression Algorithm** dropdown menu, select a compression algorithm (**gzip**, **zstd**, **lz4**, or **snappy**).
1. (Optional) Select a **Compression Level** in the dropdown menu. If the level is not specified, the algorithm's default level is used.
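To get a feel for the size-versus-CPU trade-off, here is a local sketch (using Python's standard-library `gzip`, not Worker code) showing how a repetitive log payload shrinks, and how the level changes the output size:

```python
import gzip

# A repetitive JSON log payload, similar in shape to what a pipeline might send.
payload = b'{"level":"info","msg":"request served","status":200}\n' * 200

fast = gzip.compress(payload, compresslevel=1)  # less CPU, larger output
best = gzip.compress(payload, compresslevel=9)  # more CPU, smaller output

print(len(payload), len(fast), len(best))
```

Log data compresses well at every level because it is highly repetitive; zstd, lz4, and snappy make the same trade at different points on the speed/size curve.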

##### Buffering options (Preview)

Toggle the switch to enable **Buffering Options** ({{< tooltip glossary="preview" case="title" >}}).<br>**Note**: Contact your account manager to request access to the Preview.
- If disabled (default): Up to 500 events are buffered before flush.
- If enabled:
  1. Select the buffer type you want to set.
     - **Memory**: Fast, but limited by available RAM.
     - **Disk**: Durable, and survives restarts.
  1. Enter the buffer size and select the unit (maximum capacity in MB or GB).

##### Advanced options

Click **Advanced** if you want to set any of the following fields:

1. **Message Key Field**: Specify which log field contains the message key for partitioning, grouping, and ordering.
1. **Headers Key**: Specify which log field contains your Kafka headers. If left blank, no headers are written.
1. **Message Timeout (ms)**: Local message timeout, in milliseconds. Default is `300,000 ms`.
1. **Socket Timeout (ms)**: Default timeout, in milliseconds, for network requests. Default is `60,000 ms`.
1. **Rate Limit Events**: The maximum number of requests the Kafka client can send within the rate limit time window. Default is no rate limit.
1. **Rate Limit Time Window (secs)**: The time window used for the rate limit option.
- This setting has no effect if the rate limit for events is not set.
- Default is `1 second` if **Rate Limit Events** is set, but **Rate Limit Time Window** is not set.
1. To add additional [librdkafka options](#librdkafka-options), click **Add Option** and select an option in the dropdown menu.
1. Enter a value for that option.
1. Check your values against the [librdkafka documentation][7] to make sure they have the correct type and are within the set range.
1. Click **Add Option** to add another librdkafka option.
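For context on the **Message Key Field** setting: Kafka chooses a partition by hashing the message key, so all logs that share a key land on the same partition and keep their relative order. The sketch below illustrates the idea with a stand-in hash (librdkafka's default partitioner actually uses murmur2, and the partition count here is hypothetical):

```python
import zlib

NUM_PARTITIONS = 6  # hypothetical partition count for the topic

def partition_for(key: bytes) -> int:
    # Stand-in for the default partitioner: a deterministic hash of the
    # key, modulo the number of partitions.
    return zlib.crc32(key) % NUM_PARTITIONS

# Logs that share a key always map to the same partition, which is what
# preserves per-key ordering.
print(partition_for(b"host-a") == partition_for(b"host-a"))
```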

### Set environment variables

#### Kafka bootstrap servers
- The host and port of the Kafka bootstrap servers.
- This is the bootstrap server that the client uses to connect to the Kafka cluster and discover all the other hosts in the cluster. The host and port must be entered in the format of `host:port`, such as `10.14.22.123:9092`. If there is more than one server, use commas to separate them.
- Stored as the environment variable: `DD_OP_DESTINATION_KAFKA_BOOTSTRAP_SERVERS`.
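For example, a sketch of setting the variable for a three-broker cluster (the addresses are hypothetical):

```shell
# Hypothetical broker addresses; list every bootstrap server as host:port,
# separated by commas.
export DD_OP_DESTINATION_KAFKA_BOOTSTRAP_SERVERS="10.14.22.123:9092,10.14.22.124:9092,10.14.22.125:9092"
echo "$DD_OP_DESTINATION_KAFKA_BOOTSTRAP_SERVERS"
```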

#### TLS (when enabled)

- If TLS is enabled, the Kafka TLS passphrase is needed.
- Stored as the environment variable: `DD_OP_DESTINATION_KAFKA_KEY_PASS`.

#### SASL (when enabled)

- Kafka SASL username
- Stored as the environment variable: `DD_OP_DESTINATION_KAFKA_SASL_USERNAME`.
- Kafka SASL password
- Stored as the environment variable: `DD_OP_DESTINATION_KAFKA_SASL_PASSWORD`.

## librdkafka options

These are the available librdkafka options:

- client.id
- queue.buffering.max_messages
- transactional.id
- enable.idempotence
- acks

See the [librdkafka documentation][7] for more information and to ensure your values have the correct type and are within range.

## How the destination works

See the [Observability Pipelines Metrics][8] for a full list of available health metrics.

### Worker health metrics

#### Component metrics

{{% observability_pipelines/metrics/component %}}

#### Buffer metrics (when buffering is enabled)

{{% observability_pipelines/metrics/buffer %}}

### Event batching

A batch of events is flushed when one of these parameters is met. See [event batching][9] for more information.

| Max Events | Max Bytes | Timeout (seconds) |
|----------------|-----------------|---------------------|
| 10,000 | 1,000,000 | 1 |
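A minimal sketch of this flush rule (illustrative only, not Worker source): a batch is emitted as soon as any one of the three thresholds is reached.

```python
MAX_EVENTS = 10_000
MAX_BYTES = 1_000_000
TIMEOUT_S = 1.0

class Batcher:
    """Flushes when the event count, byte size, or batch age threshold is hit."""

    def __init__(self, now: float = 0.0):
        self.events: list[bytes] = []
        self.byte_size = 0
        self.started = now

    def add(self, event: bytes, now: float):
        """Add an event; return the flushed batch if a threshold was met, else None."""
        self.events.append(event)
        self.byte_size += len(event)
        if (len(self.events) >= MAX_EVENTS
                or self.byte_size >= MAX_BYTES
                or now - self.started >= TIMEOUT_S):
            return self.flush(now)
        return None

    def flush(self, now: float):
        batch = self.events
        self.events, self.byte_size, self.started = [], 0, now
        return batch
```

A single 1,000,000-byte event flushes immediately on the byte threshold, while small events accumulate until the count or the one-second timeout trips.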

[1]: https://clickhouse.com/docs/engines/table-engines/integrations/kafka
[2]: https://docs.snowflake.com/en/user-guide/kafka-connector
[3]: https://docs.databricks.com/aws/en/connect/streaming/kafka
[4]: https://learn.microsoft.com/en-us/azure/event-hubs/azure-event-hubs-apache-kafka-overview
[5]: https://app.datadoghq.com/observability-pipelines
[6]: /observability_pipelines/advanced_configurations/
[7]: https://docs.confluent.io/platform/current/clients/librdkafka/html/md_CONFIGURATION.html
[8]: /observability_pipelines/monitoring/metrics/
[9]: /observability_pipelines/destinations/#event-batching
22 changes: 22 additions & 0 deletions layouts/shortcodes/observability_pipelines/metrics/buffer.en.md
@@ -0,0 +1,22 @@
Track buffer behavior with these additional metrics:

`pipelines.buffer_events`
: Number of events currently in the buffer.

`pipelines.buffer_byte_size`
: Current buffer size in bytes.

`pipelines.buffer_received_events_total`
: Total events added to the buffer.

`pipelines.buffer_received_event_bytes_total`
: Total bytes added to the buffer.

`pipelines.buffer_sent_events_total`
: Total events successfully flushed from the buffer.

`pipelines.buffer_sent_event_bytes_total`
: Total bytes successfully flushed from the buffer.

`pipelines.buffer_discarded_events_total`
: Events discarded from the buffer (for example, due to overflow).
@@ -0,0 +1,16 @@
Monitor the health of your destination with the following key metrics:

`pipelines.component_sent_events_total`
: Events successfully delivered.

`pipelines.component_discarded_events_total`
: Events dropped.

`pipelines.component_errors_total`
: Errors in the destination component.

`pipelines.component_sent_event_bytes_total`
: Total event bytes sent.

`pipelines.utilization`
: Worker resource usage.