2 changes: 1 addition & 1 deletion go.mod
@@ -31,6 +31,7 @@ require (
	github.com/klauspost/compress v1.17.8
	github.com/minio/minio-go v6.0.14+incompatible
	github.com/ozontech/insane-json v0.1.9
	github.com/pierrec/lz4/v4 v4.1.21
	github.com/prometheus/client_golang v1.16.0
	github.com/prometheus/procfs v0.10.1
	github.com/rjeczalik/notify v0.9.3
@@ -118,7 +119,6 @@ require (
	github.com/onsi/ginkgo v1.16.5 // indirect
	github.com/opencontainers/runtime-spec v1.0.2 // indirect
	github.com/pascaldekloe/name v1.0.1 // indirect
	github.com/pierrec/lz4/v4 v4.1.21 // indirect
	github.com/pmezard/go-difflib v1.0.0 // indirect
	github.com/prometheus/client_model v0.3.0 // indirect
	github.com/prometheus/common v0.42.0 // indirect
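
With worker.go now importing github.com/pierrec/lz4/v4 directly, the dependency moves from the indirect block to the main require block; the version is unchanged.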
47 changes: 31 additions & 16 deletions plugin/input/file/provider.go
@@ -60,13 +60,15 @@ type jobProvider struct {
}

type Job struct {
	file     *os.File
	inode    inodeID
	sourceID pipeline.SourceID // some value to distinguish jobs with same inode
	filename string
	symlink  string
	curOffset int64 // offset to not call Seek() every time
	tail     []byte // some data of a new line read by worker, to not seek backwards to read from line start
	file         *os.File
	mimeType     string
	isCompressed bool
	inode        inodeID
	sourceID     pipeline.SourceID // some value to distinguish jobs with same inode
	filename     string
	symlink      string
	curOffset    int64  // offset to not call Seek() every time
	tail         []byte // some data of a new line read by worker, to not seek backwards to read from line start

	ignoreEventsLE uint64 // events with seq id less than or equal to this should be ignored in terms of offset commitment
	lastEventSeq   uint64
@@ -83,10 +85,15 @@ type Job struct {
	mu *sync.Mutex
}

func (j *Job) seek(offset int64, whence int, hint string) int64 {
	n, err := j.file.Seek(offset, whence)
	if err != nil {
		logger.Infof("file seek error hint=%s, name=%s, err=%s", hint, j.filename, err.Error())
func (j *Job) seek(offset int64, whence int, hint string) (n int64) {
	var err error
	if !j.isCompressed {
		n, err = j.file.Seek(offset, whence)
		if err != nil {
			logger.Infof("file seek error hint=%s, name=%s, err=%s", hint, j.filename, err.Error())
		}
	} else {
		// compressed streams are not seekable: report offset 0 and let the
		// worker reach the saved offset by reading forward
		n = 0
	}
	j.curOffset = n
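
Compressed jobs always report offset 0 from seek(): an lz4 stream is not random-access on its decompressed view, so the worker instead reaches a saved offset by reading forward and discarding data, as sketched after the worker.go changes below.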

@@ -354,6 +361,10 @@ func (jp *jobProvider) checkFileWasTruncated(job *Job, size int64) {
	}
}

func isCompressed(mimeType string) bool {
	return mimeType == "application/x-lz4"
}

func (jp *jobProvider) addJob(file *os.File, stat os.FileInfo, filename string, symlink string) {
	sourceID := sourceIDByStat(stat, symlink)

@@ -370,12 +381,16 @@ func (jp *jobProvider) addJob(file *os.File, stat os.FileInfo, filename string,
	}

	inode := getInode(stat)
	mimeType := getMimeType(filename)

	job := &Job{
		file:     file,
		inode:    inode,
		filename: filename,
		symlink:  symlink,
		sourceID: sourceID,
		file:         file,
		isCompressed: isCompressed(mimeType),
		mimeType:     mimeType,
		inode:        inode,
		filename:     filename,
		symlink:      symlink,
		sourceID:     sourceID,

		isVirgin: true,
		isDone:   true,
78 changes: 71 additions & 7 deletions plugin/input/file/worker.go
@@ -3,12 +3,16 @@
import (
	"bytes"
	"io"
	"mime"
	"os"
	"os/exec"
	"path/filepath"
	"strings"

	"github.com/ozontech/file.d/pipeline"
	"github.com/ozontech/file.d/pipeline/metadata"
	k8s_meta "github.com/ozontech/file.d/plugin/input/k8s/meta"
	"github.com/pierrec/lz4/v4"

	"go.uber.org/zap"
)
@@ -86,12 +90,35 @@
			}
		}

		var reader io.Reader
		if job.mimeType == "application/x-lz4" {
			if isFileBeingWritten(file.Name()) {
				logger.Error("lz4 file is still being written to, skipping", zap.String("filename", file.Name()))
				break
			}
			lz4Reader := lz4.NewReader(file)
			// lz4 streams cannot be seeked: skip forward to the first saved
			// offset by reading and discarding decompressed data
			if len(offsets) > 0 {
				for lastOffset+int64(readBufferSize) < offsets[0].Offset {
					n, err := lz4Reader.Read(readBuf)
					if err != nil {
						if err == io.EOF {
							break // end of file reached
						}
						logger.Error("cannot read lz4 file", zap.String("filename", file.Name()), zap.Error(err))
						break
					}
					lastOffset += int64(n)
				}
			}
			reader = lz4Reader
		} else {
			reader = file
		}

		// append the data of the old work, this happens when the event was not completely written to the file
		// for example: {"level": "info", "message": "some...
		// the end of the message can be added later and will be read in this iteration
		accumBuf = append(accumBuf[:0], job.tail...)
		for {
			n, err := file.Read(readBuf)
			n, err := reader.Read(readBuf)
			controller.IncReadOps()
			// if we read to end of file it's time to check truncation etc and process next job
			if err == io.EOF || n == 0 {
@@ -173,22 +200,59 @@
	}
}
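
The skip-ahead loop in work() above is the compressed counterpart of seek(). A minimal standalone sketch of the same idea, not part of the PR (the file name and saved offset are invented, and io.CopyN stands in for the manual read loop):

package main

import (
	"fmt"
	"io"
	"os"

	"github.com/pierrec/lz4/v4"
)

func main() {
	f, err := os.Open("app.log.lz4") // hypothetical input file
	if err != nil {
		panic(err)
	}
	defer f.Close()

	// An lz4 stream offers no Seek on the decompressed view; a saved offset
	// can only be reached by reading and discarding decompressed bytes.
	zr := lz4.NewReader(f)
	const savedOffset = 4096
	if _, err := io.CopyN(io.Discard, zr, savedOffset); err != nil && err != io.EOF {
		panic(err)
	}

	// zr now yields decompressed data starting at savedOffset.
	buf := make([]byte, 64)
	n, _ := zr.Read(buf)
	fmt.Printf("read %d bytes after skipping\n", n)
}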

func getMimeType(filename string) string {
	ext := filepath.Ext(filename)
	mimeType := mime.TypeByExtension(ext)
	if mimeType == "" {
		mimeType = "application/octet-stream"
	}

	return mimeType
}
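
A caveat with this helper: mime.TypeByExtension consults the platform's MIME tables, which are not guaranteed to contain an entry for ".lz4"; on such hosts the helper falls back to "application/octet-stream" and the file would be treated as uncompressed. A small check, with an explicit registration as one possible guard (the registration is an assumption, not part of the PR):

package main

import (
	"fmt"
	"mime"
)

func main() {
	// May print "" on hosts whose MIME tables have no ".lz4" entry.
	fmt.Println(mime.TypeByExtension(".lz4"))

	// Hypothetical guard: register the mapping so detection is deterministic.
	if err := mime.AddExtensionType(".lz4", "application/x-lz4"); err != nil {
		panic(err)
	}
	fmt.Println(mime.TypeByExtension(".lz4")) // application/x-lz4
}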

func isFileBeingWritten(filePath string) bool {
	// Run the lsof command to list open file descriptors for the file
	cmd := exec.Command("lsof", filePath)
	output, err := cmd.Output()
	if err != nil {
		return false // error running lsof, or no process has the file open
	}

	// Check the FD column of each output line for write access
	lines := strings.Split(string(output), "\n")
	for _, line := range lines {
		fields := strings.Fields(line)
		// a 'w' (write) or 'u' (read/write) suffix in the FD column means write access
		if len(fields) >= 4 && strings.ContainsAny(fields[3], "wu") {
			return true // file is being written to
		}
	}

	return false // file is not being written to
}
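
The FD-column check can be exercised against a representative line of lsof output (the sample line below is invented; real output varies by platform):

package main

import (
	"fmt"
	"strings"
)

func main() {
	// Typical lsof columns: COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
	line := "file.d  4242 root  3w  REG  259,2  1048576  1337 /var/log/app.log.lz4"
	fields := strings.Fields(line)
	// The 4th field is the descriptor; a 'w' or 'u' suffix indicates write access.
	fmt.Println(len(fields) >= 4 && strings.ContainsAny(fields[3], "wu")) // true
}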

func (w *worker) processEOF(file *os.File, job *Job, jobProvider *jobProvider, totalOffset int64) error {
	stat, err := file.Stat()
	if err != nil {
		return err
	}

	// files truncated from time to time, after logs from file was processed.
	// Position > stat.Size() means that data was truncated and
	// caret pointer must be moved to start of file.
	if totalOffset > stat.Size() {
		jobProvider.truncateJob(job)
	if !job.isCompressed {
		// files are truncated from time to time after their logs have been processed.
		// Position > stat.Size() means the data was truncated and
		// the caret must be moved to the start of the file.
		if totalOffset > stat.Size() {
			jobProvider.truncateJob(job)
		}
	}

	// Mark the job as done till new lines appear.
	jobProvider.doneJob(job)

	if job.isCompressed {
		job.mu.Lock()
		file.Close()

[GitHub Actions / lint, check failure on line 252 in plugin/input/file/worker.go: Error return value of `file.Close` is not checked (errcheck)]
		jobProvider.deleteJobAndUnlock(job)
	}

	return nil
}
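
Note the asymmetry at EOF: a plain-file job is only marked done and can be resumed when the file grows, while a compressed job is closed and deleted outright, since a fully read lz4 archive is not expected to grow. Handling the error returned by file.Close here would also resolve the errcheck failure flagged above.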
