Skip to content

Commit f139bc5

Browse files
committed
in_systemd: allow a parser to be specified as part of the systemd unit
Similar to how we use kubernetes annotations to determine a parser, this uses custom fields in systemd units to configure a parser per systemd unit. In the unit file this is configured as: ``` [Service] ... LogExtraFields=FLUENT_BIT_PARSER=logfmt ``` Signed-off-by: Dennis Kaarsemaker <[email protected]>
1 parent e039c56 commit f139bc5

File tree

1 file changed

+94
-0
lines changed

1 file changed

+94
-0
lines changed

plugins/in_systemd/systemd.c

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
#include <fluent-bit/flb_input_plugin.h>
2222
#include <fluent-bit/flb_config.h>
2323
#include <fluent-bit/flb_time.h>
24+
#include <fluent-bit/flb_parser.h>
25+
#include <fluent-bit/flb_log_event_decoder.h>
2426

2527
#include "systemd_config.h"
2628
#include "systemd_db.h"
@@ -70,6 +72,59 @@ static int tag_compose(const char *tag, const char *unit_name,
7072
return 0;
7173
}
7274

75+
static int flb_systemd_repack_map(struct flb_log_event_encoder *encoder,
76+
char *data,
77+
size_t data_size)
78+
{
79+
msgpack_unpacked source_map;
80+
size_t offset;
81+
int result;
82+
size_t index;
83+
msgpack_object value;
84+
msgpack_object key;
85+
86+
result = FLB_EVENT_ENCODER_SUCCESS;
87+
88+
if (data_size > 0) {
89+
msgpack_unpacked_init(&source_map);
90+
91+
offset = 0;
92+
result = msgpack_unpack_next(&source_map,
93+
data,
94+
data_size,
95+
&offset);
96+
97+
if (result == MSGPACK_UNPACK_SUCCESS) {
98+
result = FLB_EVENT_ENCODER_SUCCESS;
99+
}
100+
else {
101+
result = FLB_EVENT_DECODER_ERROR_DESERIALIZATION_FAILURE;
102+
}
103+
104+
for (index = 0;
105+
index < source_map.data.via.map.size &&
106+
result == FLB_EVENT_ENCODER_SUCCESS;
107+
index++) {
108+
key = source_map.data.via.map.ptr[index].key;
109+
value = source_map.data.via.map.ptr[index].val;
110+
111+
result = flb_log_event_encoder_append_body_msgpack_object(
112+
encoder,
113+
&key);
114+
115+
if (result == FLB_EVENT_ENCODER_SUCCESS) {
116+
result = flb_log_event_encoder_append_body_msgpack_object(
117+
encoder,
118+
&value);
119+
}
120+
}
121+
122+
msgpack_unpacked_destroy(&source_map);
123+
}
124+
125+
return result;
126+
}
127+
73128
static int in_systemd_collect(struct flb_input_instance *ins,
74129
struct flb_config *config, void *in_context)
75130
{
@@ -84,11 +139,14 @@ static int in_systemd_collect(struct flb_input_instance *ins,
84139
long nsec;
85140
uint64_t usec;
86141
size_t length;
142+
size_t plength;
87143
size_t threshold;
144+
char *name;
88145
const char *sep;
89146
const char *key;
90147
const char *val;
91148
char *buf = NULL;
149+
void *pbuf = NULL;
92150
#ifdef FLB_HAVE_SQLDB
93151
char *cursor = NULL;
94152
#endif
@@ -100,6 +158,7 @@ static int in_systemd_collect(struct flb_input_instance *ins,
100158
const void *data;
101159
struct flb_systemd_config *ctx = in_context;
102160
struct flb_time tm;
161+
struct flb_parser *parser;
103162

104163
/* Restricted by mem_buf_limit */
105164
if (flb_input_buf_paused(ins) == FLB_TRUE) {
@@ -134,6 +193,8 @@ static int in_systemd_collect(struct flb_input_instance *ins,
134193
}
135194

136195
while ((ret_j = sd_journal_next(ctx->j)) > 0) {
196+
/* Reset dynamic parser */
197+
parser = NULL;
137198
/* If the tag is composed dynamically, gather the Systemd Unit name */
138199
if (ctx->dynamic_tag) {
139200
ret = sd_journal_get_data(ctx->j, "_SYSTEMD_UNIT", &data, &length);
@@ -154,6 +215,17 @@ static int in_systemd_collect(struct flb_input_instance *ins,
154215
tag_len = ctx->ins->tag_len;
155216
}
156217

218+
/* Find the parser, if specified */
219+
ret = sd_journal_get_data(ctx->j, "FLUENT_BIT_PARSER", &data, &length);
220+
if (ret == 0) {
221+
name = flb_strndup((const char *)(data+18), length-18);
222+
parser = flb_parser_get(name, config);
223+
if (!parser) {
224+
flb_plg_error(ctx->ins, "no such parser: '%s'", name);
225+
}
226+
free(name);
227+
}
228+
157229
if (last_tag_len == 0) {
158230
strncpy(last_tag, tag, tag_len);
159231
last_tag_len = tag_len;
@@ -219,6 +291,28 @@ static int in_systemd_collect(struct flb_input_instance *ins,
219291

220292
len = (sep - key);
221293

294+
if (strncmp(key, "FLUENT_BIT_PARSER", len) == 0) {
295+
continue;
296+
}
297+
298+
/* If this is the message, apply the parser if any is specified */
299+
if (parser && strncmp(key, "MESSAGE", len) == 0) {
300+
val = sep + 1;
301+
len = length - (sep - key) - 1;
302+
ret = flb_parser_do(parser, val, len, &pbuf, &plength, &tm);
303+
if (ret != -1) {
304+
ret = flb_systemd_repack_map(ctx->log_encoder, pbuf, plength);
305+
flb_free(pbuf);
306+
continue;
307+
}
308+
/*
309+
* If the parser failed, reset the return code
310+
* to append the unparsed message as normal
311+
*/
312+
len = sep - key;
313+
ret = FLB_EVENT_ENCODER_SUCCESS;
314+
}
315+
222316
if (ret == FLB_EVENT_ENCODER_SUCCESS) {
223317
ret = flb_log_event_encoder_append_body_string_length(
224318
ctx->log_encoder, len);

0 commit comments

Comments
 (0)