@seveas
Contributor

@seveas seveas commented Nov 8, 2024

Similar to how we use kubernetes annotations to determine a parser, this
uses custom fields in systemd units to configure a parser per systemd
unit.

In the unit file this is configured as:

[Service]
...
LogExtraFields=FLUENT_BIT_PARSER=logfmt

Enter [N/A] in the box, if an item is not applicable to your change.

Testing
Before we can approve your change, please submit the following in a comment:

  • Example configuration file for the change
  • Debug log output from testing the change
  • Attached Valgrind output that shows no leaks or memory corruption was found

If this is a change to packaging of containers or native binaries then please confirm it works for all targets.

  • [N/A] Run local packaging test showing all targets (including any new ones) build.
  • [N/A] Set ok-package-test label to test for all targets (requires maintainer to do).

Documentation

  • Documentation required for this feature

Backporting

  • [N/A] Backport to latest stable release.

Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.

@seveas
Contributor Author

seveas commented Nov 8, 2024

Example config:

[SERVICE]
    flush           1
    daemon          Off
    log_level       info
    parsers_file    /etc/fluent-bit/parsers.conf
    http_server     On
    http_listen     127.0.0.1
    http_port       2020
    storage.metrics on

[INPUT]
    name  systemd
    tag   journald.*
    strip_underscores On
    alias journald
    db /tmp/fluent-debug.db
    Systemd_Filter _SYSTEMD_UNIT=metroplex.service

[OUTPUT]
    name  stdout
    match *
    alias debug

The systemd unit of the service whose logs we now parse:

[Unit]
Description=metroplex
After=network.target

[Service]
User=git
Group=git
WorkingDirectory=/data/mail-replies
ExecStart=[redacted]
Restart=always
LogExtraFields=FLUENT_BIT_PARSER=logfmt

[Install]
WantedBy=multi-user.target

Log output showing that MESSAGE is parsed and the parsed fields logged:

$ sudo build/bin/fluent-bit -c fluent-bit.conf
Fluent Bit v3.2.0
* Copyright (C) 2015-2024 The Fluent Bit Authors
* Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
* https://fluentbit.io

______ _                  _    ______ _ _           _____  _____
|  ___| |                | |   | ___ (_) |         |____ |/ __  \
| |_  | |_   _  ___ _ __ | |_  | |_/ /_| |_  __   __   / /`' / /'
|  _| | | | | |/ _ \ '_ \| __| | ___ \ | __| \ \ / /   \ \  / /
| |   | | |_| |  __/ | | | |_  | |_/ / | |_   \ V /.___/ /./ /___
\_|   |_|\__,_|\___|_| |_|\__| \____/|_|\__|   \_/ \____(_)_____/


[2024/11/08 00:21:44] [ info] [fluent bit] version=3.2.0, commit=5277337894, pid=1916253
[2024/11/08 00:21:44] [ info] [storage] ver=1.5.2, type=memory, sync=normal, checksum=off, max_chunks_up=128
[2024/11/08 00:21:44] [ info] [simd    ] disabled
[2024/11/08 00:21:44] [ info] [cmetrics] version=0.9.9
[2024/11/08 00:21:44] [ info] [ctraces ] version=0.5.7
[2024/11/08 00:21:44] [ info] [input:systemd:journald] initializing
[2024/11/08 00:21:44] [ info] [input:systemd:journald] storage_strategy='memory' (memory only)
[2024/11/08 00:21:44] [ info] [input:systemd:journald] seek_cursor=s=66627c2174d44809afcb48277ff970bc;i=951... OK
[2024/11/08 00:21:44] [ info] [output:stdout:debug] worker #0 started
[2024/11/08 00:21:44] [ info] [http_server] listen iface=127.0.0.1 tcp_port=2020
[2024/11/08 00:21:44] [ info] [sp] stream processor started
[0] journald.metroplex.service: [[1731054102.527313000, {}], {"BOOT_ID"=>"c682e3c02a264e16a841c85a1b640321", "MACHINE_ID"=>"97f6a5230d6b4c7f82404d364aee69be", "HOSTNAME"=>"[redacted in pr comment]", "PRIORITY"=>"6", "CAP_EFFECTIVE"=>"0", "SELINUX_CONTEXT"=>"unconfined
", "TRANSPORT"=>"stdout", "STREAM_ID"=>"63153e5e2d6c463c93b013e3ed23f0bf", "SYSLOG_FACILITY"=>"3", "SYSLOG_IDENTIFIER"=>"rake", "app"=>"metroplex", "environment"=>"staging", "log"=>"[redacted in pr comment]", "level"=>"debug", "PID"=>"1864387", "UID"=>"500", "GID"=>"500", "COMM"=>"ruby", "EXE"=>"/usr/share/rbenv/versions/3.2.2/bin/ruby", "CMDLINE"=>"[redacted in pr comment]", "SYSTEMD_CGROUP"=>"/system.slice/metroplex.service", "SYSTEMD_UNIT"=>"metroplex.service", "SYSTEMD_SLICE"=>"system.slice", "SYSTEMD_INVOCATION_ID"=>"3d15e21185d441dcb9bb41a03c4a32b5"}]

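For readers unfamiliar with logfmt, here is a minimal sketch (in Python, not the plugin's C) of how a logfmt line could turn into the separate `app`, `environment`, `log` and `level` keys seen in the record above. The `parse_logfmt` helper and the sample message are hypothetical: the real MESSAGE is redacted in this PR, and Fluent Bit's own logfmt parser may treat quoting and bare keys differently.

```python
import shlex


def parse_logfmt(message: str) -> dict:
    """Split a logfmt line into key/value pairs.

    In this sketch a bare key with no '=' is stored with the value True.
    """
    fields = {}
    # shlex.split honours double quotes, so log="a b" stays one token.
    for token in shlex.split(message):
        key, sep, value = token.partition("=")
        fields[key] = value if sep else True
    return fields


# Hypothetical message; the actual MESSAGE is redacted in the PR output.
sample = 'app=metroplex environment=staging level=debug log="queue drained"'
print(parse_logfmt(sample))
```

Using `shlex` keeps the sketch short, but it applies shell quoting rules, so a message with an unmatched quote would raise `ValueError`. A production parser would need its own tokenizer.
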
@seveas
Contributor Author

seveas commented Nov 8, 2024

Valgrind is happy:

$ sudo valgrind --leak-check=yes build/bin/fluent-bit -c fluent-bit.conf
==1920220== Memcheck, a memory error detector
==1920220== Copyright (C) 2002-2017, and GNU GPL'd, by Julian Seward et al.
==1920220== Using Valgrind-3.15.0 and LibVEX; rerun with -h for copyright info
==1920220== Command: build/bin/fluent-bit -c fluent-bit.conf
==1920220==
Fluent Bit v3.2.0
* Copyright (C) 2015-2024 The Fluent Bit Authors
* Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
* https://fluentbit.io

______ _                  _    ______ _ _           _____  _____
|  ___| |                | |   | ___ (_) |         |____ |/ __  \
| |_  | |_   _  ___ _ __ | |_  | |_/ /_| |_  __   __   / /`' / /'
|  _| | | | | |/ _ \ '_ \| __| | ___ \ | __| \ \ / /   \ \  / /
| |   | | |_| |  __/ | | | |_  | |_/ / | |_   \ V /.___/ /./ /___
\_|   |_|\__,_|\___|_| |_|\__| \____/|_|\__|   \_/ \____(_)_____/


[2024/11/08 01:16:10] [ info] [fluent bit] version=3.2.0, commit=5277337894, pid=1920220
[2024/11/08 01:16:10] [ info] [storage] ver=1.5.2, type=memory, sync=normal, checksum=off, max_chunks_up=128
[2024/11/08 01:16:10] [ info] [simd    ] disabled
[2024/11/08 01:16:10] [ info] [cmetrics] version=0.9.9
[2024/11/08 01:16:10] [ info] [ctraces ] version=0.5.7
[2024/11/08 01:16:10] [ info] [input:systemd:journald] initializing
[2024/11/08 01:16:10] [ info] [input:systemd:journald] storage_strategy='memory' (memory only)
[2024/11/08 01:16:11] [ info] [input:systemd:journald] seek_cursor=s=66627c2174d44809afcb48277ff970bc;i=952... OK
[2024/11/08 01:16:11] [ info] [output:stdout:debug] worker #0 started
[2024/11/08 01:16:11] [ info] [http_server] listen iface=127.0.0.1 tcp_port=2020
[2024/11/08 01:16:11] [ info] [sp] stream processor started
==1920220== Warning: client switching stacks?  SP change: 0x3b3175b8 --> 0x61486c0
==1920220==          to suppress, use: --max-stackframe=891088632 or greater
==1920220== Warning: client switching stacks?  SP change: 0x61485a8 --> 0x3b3175b8
==1920220==          to suppress, use: --max-stackframe=891088912 or greater
==1920220== Warning: client switching stacks?  SP change: 0x3b3175b8 --> 0x61485a8
==1920220==          to suppress, use: --max-stackframe=891088912 or greater
==1920220==          further instances of this message will not be shown.
[0] journald.metroplex.service: [[1731057365.842950000, {}], {"BOOT_ID"=>"c682e3c02a264e16a841c85a1b640321", "MACHINE_ID"=>"97f6a5230d6b4c7f82404d364aee69be", [...further logs redacted for pr comment...]}]
[1] journald.metroplex.service: [[1731057370.848281000, {}], [...further logs redacted for pr comment...]}]
^C[2024/11/08 01:16:14] [engine] caught signal (SIGINT)
[2024/11/08 01:16:14] [ warn] [engine] service will shutdown in max 5 seconds
[2024/11/08 01:16:14] [ info] [input] pausing journald
[2024/11/08 01:16:14] [ info] [engine] service has stopped (0 pending tasks)
[2024/11/08 01:16:14] [ info] [input] pausing journald
[2024/11/08 01:16:14] [ info] [output:stdout:debug] thread worker #0 stopping...
[2024/11/08 01:16:14] [ info] [output:stdout:debug] thread worker #0 stopped
==1920220==
==1920220== HEAP SUMMARY:
==1920220==     in use at exit: 0 bytes in 0 blocks
==1920220==   total heap usage: 11,937 allocs, 11,937 frees, 1,941,754 bytes allocated
==1920220==
==1920220== All heap blocks were freed -- no leaks are possible
==1920220==
==1920220== For lists of detected and suppressed errors, rerun with: -s
==1920220== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)

@niedbalski
Collaborator

@seveas I am trying to understand the use case here. Can you expand a bit on the problem you are trying to solve?

@seveas
Contributor Author

seveas commented Nov 21, 2024

@niedbalski the trigger for this is our desire to move to yaml based configs, which are incompatible with the way we build our configuration. We currently have a base config (not yaml) deployed to all hosts. It includes a systemd input and a strategically placed @INCLUDE, so that per application or server we can drop in filters that set up parsing for specific systemd units.

This is impossible to do in yaml configs, as its inclusion system only supports including full pipelines. So we took inspiration from the kubernetes filter, where a single input and a single filter are enough because parsing is driven by metadata attached to pods. With the strategy we came up with, a systemd unit specifies the parser to use, which removes the need for a separate filter per systemd service and for these includes.
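
To illustrate the idea, here is a minimal sketch (in Python, not the plugin's C) of choosing a parser per journal entry from a field carried by the entry itself, so one input serves every unit. The `PARSERS` registry, `select_parser`, `parse_entry` and the unit names are hypothetical. What the plugin does when the named parser does not exist is not stated in this thread, so falling back to the raw message is an assumption.

```python
import json

# Hypothetical registry of parsers by name. In Fluent Bit the names would
# come from the parsers file, which is not modelled here.
PARSERS = {
    "json": json.loads,
    # Naive key=value stand-in; it does not handle quoting or bare keys.
    "kv": lambda text: dict(pair.split("=", 1) for pair in text.split()),
}


def select_parser(entry: dict, field: str = "FLUENT_BIT_PARSER"):
    """Return the parser named by the entry's extra field, or None."""
    return PARSERS.get(entry.get(field))


def parse_entry(entry: dict):
    """Parse MESSAGE with the unit's chosen parser, else return it unchanged."""
    parser = select_parser(entry)
    message = entry.get("MESSAGE", "")
    return parser(message) if parser else message


entries = [
    {"_SYSTEMD_UNIT": "a.service", "FLUENT_BIT_PARSER": "json",
     "MESSAGE": '{"level": "debug"}'},
    {"_SYSTEMD_UNIT": "b.service", "FLUENT_BIT_PARSER": "kv",
     "MESSAGE": "level=info app=b"},
    {"_SYSTEMD_UNIT": "c.service", "MESSAGE": "plain text"},
]
for entry in entries:
    print(entry["_SYSTEMD_UNIT"], parse_entry(entry))
```

The point of the design is that the choice of parser travels with the log entry, so adding a new service only requires a line in its unit file and no change to the shared Fluent Bit config.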

Similar to how we use kubernetes annotations to determine a parser, this
uses custom fields in systemd units to configure a parser per systemd
unit.

In the unit file this is configured as:

```
[Service]
...
LogExtraFields=FLUENT_BIT_PARSER=logfmt
```

Signed-off-by: Dennis Kaarsemaker <[email protected]>
@github-actions
Contributor

github-actions bot commented Sep 7, 2025

This PR is stale because it has been open 45 days with no activity. Remove stale label or comment or this will be closed in 10 days.

@github-actions github-actions bot added the Stale label Sep 7, 2025
@lecaros
Contributor

lecaros commented Sep 10, 2025

Hi. Can you elaborate on your use case?
Can you clarify what you mean by this?

This is impossible to do in yaml configs, as its inclusion system only supports including full pipelines.

@eschabell
Contributor

@seveas can you address both the conflicts and questions asked by reviewers?

@github-actions github-actions bot removed the Stale label Oct 24, 2025
nourdouf added a commit to nourdouf/fluent-bit that referenced this pull request Nov 19, 2025
This merge integrates the parser functionality from PR fluent#9567 with the
current master branch's cfl_kvlist-based implementation.

Changes:
- Added flb_systemd_repack_map() function to repack parsed msgpack data
- Added parser lookup from FLUENT_BIT_PARSER systemd journal field
- Modified systemd_enumerate_data_store() to accept and apply parser
- Parser is applied to MESSAGE field when specified
- FLUENT_BIT_PARSER field is excluded from output
- Parsed data is repacked and integrated into the log encoder

The parser allows per-systemd-unit log parsing configuration via:
  [Service]
  LogExtraFields=FLUENT_BIT_PARSER=logfmt

Co-authored-by: Dennis Kaarsemaker <[email protected]>
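
The repacking described in the change list above can be sketched as follows. This is a Python model of the behaviour, not the C code of `flb_systemd_repack_map()`. The `repack` helper is hypothetical, and treating `strip_underscores` as removing a single leading underscore is an assumption based on the example output earlier in this thread.

```python
from typing import Optional


def repack(entry: dict, parsed: Optional[dict],
           strip_underscores: bool = True) -> dict:
    """Build the output record for one journal entry.

    - FLUENT_BIT_PARSER only selects the parser and is dropped.
    - If parsing succeeded, the parsed fields take the place of MESSAGE.
    - Other fields are copied, optionally without their leading underscore.
    """
    record = {}
    for key, value in entry.items():
        if key == "FLUENT_BIT_PARSER":
            continue
        if key == "MESSAGE" and parsed is not None:
            record.update(parsed)
            continue
        if strip_underscores and key.startswith("_"):
            key = key[1:]
        record[key] = value
    return record


entry = {
    "_SYSTEMD_UNIT": "metroplex.service",
    "FLUENT_BIT_PARSER": "logfmt",
    "MESSAGE": "level=debug",
    "PRIORITY": "6",
}
print(repack(entry, {"level": "debug"}))
```

If a parsed key collides with a journal field, whichever is written later wins in this sketch. How the plugin resolves such collisions is not covered in this thread.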