Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions filebeat/input/journald/environment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,7 @@
return
}

cleanTestName := strings.Replace(t.Name(), "\\", "_", -1)

f, err := os.CreateTemp(folder, cleanTestName+"-*")
f, err := os.CreateTemp(folder, "Filebeat-Test-Journald"+"-*")
if err != nil {
t.Logf("cannot create file for error logs: %s", err)
return
Expand Down Expand Up @@ -182,7 +180,7 @@
}

// waitUntilEventCount waits until total count events arrive to the client.
func (e *inputTestingEnvironment) waitUntilEventsPublished(published int) {

Check failure on line 183 in filebeat/input/journald/environment_test.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

func (*inputTestingEnvironment).waitUntilEventsPublished is unused (unused)
e.t.Helper()
msg := strings.Builder{}
require.Eventually(e.t, func() bool {
Expand Down
13 changes: 1 addition & 12 deletions filebeat/input/journald/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,18 +509,7 @@ func TestDoubleStarCanBeUsed(t *testing.T) {
}

env.startInput(ctx, inp)
// Wait for at least 11 events, this means more than one journal file
// has been read and ingested.
//
// When many small journal files are ingested, the journalctl process
// may exit before the input has fully read its stdout, which makes us
// discard the last few lines/entries.
//
// We still correctly track the cursor of events published to the output,
// however the cursor returned by journalctl on this set of handcrafted
// journal files leads to us skipping some events.
// See https://github.com/elastic/beats/issues/46904.
env.waitUntilEventsPublished(11)
env.waitUntilEventCount(len(srcFiles) * 10)
}

func decompress(t *testing.T, namegz string) string {
Expand Down
6 changes: 3 additions & 3 deletions filebeat/input/journald/pkg/journalctl/jctlmock_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 3 additions & 7 deletions filebeat/input/journald/pkg/journalctl/journalctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,24 +169,20 @@ func (j *journalctl) Kill() error {
// journalctl finished returning all data and exited successfully, if journalctl
// exited unexpectedly, then `err` is non-nil, `finished` is false and an empty
// byte array is returned.
func (j *journalctl) Next(cancel input.Canceler) ([]byte, bool, error) {
func (j *journalctl) Next(cancel input.Canceler) ([]byte, error) {
select {
case <-cancel.Done():
return []byte{}, false, ErrCancelled
return []byte{}, ErrCancelled
case d, open := <-j.dataChan:
if !open {
// Wait for the process to exit, so we can read the exit code.
j.waitDone.Wait()
if j.cmd.ProcessState.ExitCode() == 0 {
return []byte{}, true, nil
}
return []byte{},
false,
fmt.Errorf(
"no more data to read, journalctl exited unexpectedly, exit code: %d",
j.cmd.ProcessState.ExitCode())
}

return d, false, nil
return d, nil
}
}
171 changes: 60 additions & 111 deletions filebeat/input/journald/pkg/journalctl/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,20 +66,10 @@ type Jctl interface {
//
// If cancel is cancelled, Next returns a zero value JournalEntry
// and ErrCancelled.
//
// If finished is true, then journalctl returned all messages
// and exited successfully
Next(input.Canceler) (data []byte, finished bool, err error)
Next(input.Canceler) (data []byte, err error)
Kill() error
}

type readerState uint8

const (
readingOldEntriesState readerState = iota
followingState
)

// Reader reads entries from journald by calling `jouranlctl`
// and reading its output.
//
Expand All @@ -103,10 +93,10 @@ type Reader struct {
// like the message filters, format, etc
args []string

// firstRunArgs are the arguments used in the first call to
// extraArgs are the arguments used in the first call to
// journalctl that will be replaced by the cursor argument
// once data has been ingested
firstRunArgs []string
extraArgs []string

// cursor is the jornalctl cursor, it is also stored in Filebeat's registry
cursor string
Expand All @@ -117,29 +107,31 @@ type Reader struct {
jctlFactory JctlFactory

backoff backoff.Backoff
state readerState
}

// handleSeekAndCursor returns the correct arguments for seek and cursor.
// If there is a cursor, only the cursor is used, seek is ignored.
// If there is no cursor, then seek is used
// The bool parameter indicates whether there might be messages from
// the previous boots
func handleSeekAndCursor(mode SeekMode, since time.Duration, cursor string) ([]string, bool) {
func handleSeekAndCursor(mode SeekMode, since time.Duration, cursor string) []string {
if cursor != "" {
return []string{"--after-cursor", cursor}, true
return []string{"--after-cursor", cursor, "--boot", "all"}
}

switch mode {
case SeekSince:
return []string{"--since", time.Now().Add(since).Format(sinceTimeFormat)}, true
return []string{
"--since", time.Now().Add(since).Format(sinceTimeFormat),
"--boot", "all",
}
case SeekTail:
return []string{"--since", "now"}, false
return []string{"--since", "now"}
case SeekHead:
return []string{"--no-tail"}, true
return []string{"--no-tail", "--boot", "all"}
default:
// That should never happen
return []string{}, false
return []string{}
}
}

Expand Down Expand Up @@ -183,7 +175,7 @@ func New(

logger = logger.Named("reader")

args := []string{"--utc", "--output=json", "--no-pager", "--all"}
args := []string{"--utc", "--output=json", "--no-pager", "--all", "--follow"}

if file != "" && file != localSystemJournalID {

Expand Down Expand Up @@ -220,28 +212,20 @@ func New(
args = append(args, "--facility", fmt.Sprintf("%d", facility))
}

firstRunArgs, prevBoots := handleSeekAndCursor(mode, since, cursor)
state := readingOldEntriesState // Initial state
if !prevBoots {
state = followingState
}
extraArgs := handleSeekAndCursor(mode, since, cursor)

r := Reader{
logger: logger,
jctlLogger: logger.Named("journalctl-runner"),

args: args,
firstRunArgs: firstRunArgs,

state: state,
cursor: cursor,

logger: logger,
jctlLogger: logger.Named("journalctl-runner"),
args: args,
extraArgs: extraArgs,
cursor: cursor,
canceler: canceler,
jctlFactory: newJctl,
backoff: backoff.NewExpBackoff(canceler.Done(), 100*time.Millisecond, 2*time.Second),
}

if err := r.newJctl(firstRunArgs...); err != nil {
if err := r.newJctl(extraArgs...); err != nil {
return &Reader{}, err
}

Expand Down Expand Up @@ -274,94 +258,59 @@ func (r *Reader) Close() error {
// journalctl restarting it as necessary with a backoff strategy. It either
// returns a valid journald entry or ErrCancelled when the input is cancelled.
func (r *Reader) next(cancel input.Canceler) ([]byte, error) {
msg, finished, err := r.jctl.Next(cancel)
msg, err := r.jctl.Next(cancel)

// Check if the input has been cancelled
select {
case <-cancel.Done():
if cancel.Err() != nil {
// The caller is responsible for calling Reader.Close to terminate
// journalctl. Cancelling this canceller only means this Next call was
// cancelled. Because the input has been cancelled, we ignore the message
// and any error it might have returned.
return nil, ErrCancelled
default:
// Three options:
// - Journalctl finished reading messages from previous boots
// successfully, restart it with --follow flag.
// - Error, journalctl exited with an error, restart it in the same
// mode it was running.
// - No error, skip the default block and go parse the message

var extraArgs []string
var restart bool

// First of all: handle the error, if any
if err != nil {
r.logger.Warnf("reader error: '%s', restarting...", err)
restart = true

if r.cursor == "" && r.state == readingOldEntriesState {
// Corner case: journalctl exited with an error before reading the
// 1st message. This means we don't have a cursor and need to restart
// it with the initial arguments.
extraArgs = append(extraArgs, r.firstRunArgs...)
} else if r.cursor != "" {
// There is a cursor, so just append it to our arguments
extraArgs = append(extraArgs, "--after-cursor", r.cursor)

// Last, but not least, add "--follow" if we're in following mode
if r.state == followingState {
extraArgs = append(extraArgs, "--follow")
}
}

// Handle backoff
//
// If the last restart (if any) was more than 5s ago,
// recreate the backoff and do not wait.
// We recreate the backoff so r.backoff.Last().IsZero()
// will return true next time it's called making us to
// wait in case jouranlctl crashes in less than 5s.
if !r.backoff.Last().IsZero() && time.Since(r.backoff.Last()) > 5*time.Second {
r.backoff = backoff.NewExpBackoff(cancel.Done(), 100*time.Millisecond, 2*time.Second)
} else {
r.backoff.Wait()
}
}

// If journalctl finished reading the messages from previous boots
// and exited successfully
if finished {
restart = true
extraArgs = append(extraArgs, "--follow")
if r.cursor != "" {
// If there is a cursor, only use the cursor and the follow argument
extraArgs = append(extraArgs, "--after-cursor", r.cursor)
} else {
// If there is no cursor, it means the first successfully run
// did not return any event, so we have to restart with the
// --follow and all the initial args.
}

extraArgs = append(extraArgs, r.firstRunArgs...)
}
// Two options:
// - No error, return the message
// - Error, journalctl exited with an error, restart with
// backoff if necessary.
if err == nil {
return msg, nil
}
r.logger.Warnf("reader error: '%s', restarting...", err)

r.state = followingState
r.logger.Info("finished reading journal entries from all boots, restarting journalctl with follow flag")
}
// Handle backoff
//
// If the last restart (if any) was more than 5s ago,
// recreate the backoff and do not wait.
// We recreate the backoff so r.backoff.Last().IsZero()
// will return true next time it's called making us to
// wait in case jouranlctl crashes in less than 5s.
if !r.backoff.Last().IsZero() && time.Since(r.backoff.Last()) > 5*time.Second {
r.backoff = backoff.NewExpBackoff(cancel.Done(), 100*time.Millisecond, 2*time.Second)
} else {
r.backoff.Wait()
}

// Restart journalctl if needed
if restart {
if err := r.newJctl(extraArgs...); err != nil {
// If we cannot restart journalct, there is nothing we can do.
return nil, fmt.Errorf("cannot restart journalctl: %w", err)
}
var extraArgs []string
// Corner case: journalctl exited with an error before reading the
// 1st message. This means we don't have a cursor and need to restart
// it with the initial arguments.
if r.cursor == "" {
extraArgs = r.extraArgs
} else {
// We have a cursor, set it instead of the other options that select
// where in the journal to start reading because they are incompatible
// with setting the cursor.
extraArgs = []string{"--after-cursor", r.cursor}
}

// Return an empty message and wait for the caller to call us again
return nil, ErrRestarting
}
if err := r.newJctl(extraArgs...); err != nil {
// If we cannot restart journalct, there is nothing we can do.
return nil, fmt.Errorf("cannot restart journalctl: %w", err)
}

return msg, nil
// Return an empty message and wait for the caller to call us again
return nil, ErrRestarting
}

// Next returns the next journal entry. If there is no entry available
Expand Down
12 changes: 6 additions & 6 deletions filebeat/input/journald/pkg/journalctl/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@
for idx, rawEvent := range testCases {
t.Run(fmt.Sprintf("test %d", idx), func(t *testing.T) {
mock := JctlMock{
NextFunc: func(canceler input.Canceler) ([]byte, bool, error) {
return rawEvent, false, nil
NextFunc: func(canceler input.Canceler) ([]byte, error) {
return rawEvent, nil
},
}
r := Reader{
Expand All @@ -71,12 +71,12 @@
var jdEvent []byte

func TestRestartsJournalctlOnError(t *testing.T) {
logp.DevelopmentSetup(logp.ToObserverOutput())

Check failure on line 74 in filebeat/input/journald/pkg/journalctl/reader_test.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

SA1019: logp.DevelopmentSetup is deprecated: Prefer using localized loggers. Use logp.NewDevelopmentLogger. (staticcheck)
ctx := context.Background()

mock := JctlMock{
NextFunc: func(canceler input.Canceler) ([]byte, bool, error) {
return jdEvent, false, errors.New("journalctl exited with code 42")
NextFunc: func(canceler input.Canceler) ([]byte, error) {
return jdEvent, errors.New("journalctl exited with code 42")
},
}

Expand All @@ -93,8 +93,8 @@

// If calls have been made, change the Next function to always succeed
// and return it
mock.NextFunc = func(canceler input.Canceler) ([]byte, bool, error) {
return jdEvent, false, nil
mock.NextFunc = func(canceler input.Canceler) ([]byte, error) {
return jdEvent, nil
}

return &mock, nil
Expand Down
6 changes: 0 additions & 6 deletions filebeat/tests/integration/journald_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,6 @@ func TestJournaldInputRunsAndRecoversFromJournalctlFailures(t *testing.T) {

filebeat.WriteConfigFile(yamlCfg)
filebeat.Start()
// On a normal execution we run journalclt twice, the first time to read all messages from the
// previous boot until 'now' and the second one with the --follow flag that should keep on running.
filebeat.WaitForLogs("journalctl started with PID", 10*time.Second, "journalctl did not start")
filebeat.WaitForLogs("journalctl started with PID", 10*time.Second, "journalctl did not start")

pidLine := filebeat.GetLastLogLine("journalctl started with PID")
Expand Down Expand Up @@ -98,9 +95,6 @@ func TestJournaldInputDoesNotDuplicateData(t *testing.T) {

filebeat.WriteConfigFile(yamlCfg)
filebeat.Start()
// On a normal execution we run journalclt twice, the first time to read all messages from the
// previous boot until 'now' and the second one with the --follow flag that should keep on running.
filebeat.WaitForLogs("journalctl started with PID", 10*time.Second, "journalctl did not start")
filebeat.WaitForLogs("journalctl started with PID", 10*time.Second, "journalctl did not start")

pidLine := filebeat.GetLastLogLine("journalctl started with PID")
Expand Down
Loading
Loading