diff --git a/go.mod b/go.mod
index bd9a42820..c9981d4a9 100644
--- a/go.mod
+++ b/go.mod
@@ -31,6 +31,7 @@ require (
 	github.com/klauspost/compress v1.17.8
 	github.com/minio/minio-go v6.0.14+incompatible
 	github.com/ozontech/insane-json v0.1.9
+	github.com/pierrec/lz4/v4 v4.1.21
 	github.com/prometheus/client_golang v1.16.0
 	github.com/prometheus/procfs v0.10.1
 	github.com/rjeczalik/notify v0.9.3
@@ -118,7 +119,6 @@ require (
 	github.com/onsi/ginkgo v1.16.5 // indirect
 	github.com/opencontainers/runtime-spec v1.0.2 // indirect
 	github.com/pascaldekloe/name v1.0.1 // indirect
-	github.com/pierrec/lz4/v4 v4.1.21 // indirect
 	github.com/pmezard/go-difflib v1.0.0 // indirect
 	github.com/prometheus/client_model v0.3.0 // indirect
 	github.com/prometheus/common v0.42.0 // indirect
diff --git a/plugin/input/file/provider.go b/plugin/input/file/provider.go
index f68669a6b..91483b2da 100644
--- a/plugin/input/file/provider.go
+++ b/plugin/input/file/provider.go
@@ -60,13 +60,15 @@ type jobProvider struct {
 }
 
 type Job struct {
-	file      *os.File
-	inode     inodeID
-	sourceID  pipeline.SourceID // some value to distinguish jobs with same inode
-	filename  string
-	symlink   string
-	curOffset int64  // offset to not call Seek() everytime
-	tail      []byte // some data of a new line read by worker, to not seek backwards to read from line start
+	file         *os.File
+	mimeType     string
+	isCompressed bool
+	inode        inodeID
+	sourceID     pipeline.SourceID // some value to distinguish jobs with same inode
+	filename     string
+	symlink      string
+	curOffset    int64  // offset to not call Seek() every time
+	tail         []byte // some data of a new line read by worker, to not seek backwards to read from line start
 
 	ignoreEventsLE uint64 // events with seq id less or equal than this should be ignored in terms offset commitment
 	lastEventSeq   uint64
@@ -83,10 +85,15 @@ type Job struct {
 	mu *sync.Mutex
 }
 
-func (j *Job) seek(offset int64, whence int, hint string) int64 {
-	n, err := j.file.Seek(offset, whence)
-	if err != nil {
-		logger.Infof("file seek error hint=%s, name=%s, err=%s", hint, j.filename, err.Error())
+func (j *Job) seek(offset int64, whence int, hint string) (n int64) {
+	var err error
+	// compressed files are decoded sequentially, so there is nothing to seek:
+	// n keeps its zero value and the job is read from the start
+	if !j.isCompressed {
+		n, err = j.file.Seek(offset, whence)
+		if err != nil {
+			logger.Infof("file seek error hint=%s, name=%s, err=%s", hint, j.filename, err.Error())
+		}
 	}
 	j.curOffset = n
 
@@ -354,6 +361,10 @@ func (jp *jobProvider) checkFileWasTruncated(job *Job, size int64) {
 	}
 }
 
+func isCompressed(mimeType string) bool {
+	return mimeType == "application/x-lz4"
+}
+
 func (jp *jobProvider) addJob(file *os.File, stat os.FileInfo, filename string, symlink string) {
 	sourceID := sourceIDByStat(stat, symlink)
 
@@ -370,12 +381,16 @@ func (jp *jobProvider) addJob(file *os.File, stat os.FileInfo, filename string,
 	}
 
 	inode := getInode(stat)
+	mimeType := getMimeType(filename)
+
 	job := &Job{
-		file:     file,
-		inode:    inode,
-		filename: filename,
-		symlink:  symlink,
-		sourceID: sourceID,
+		file:         file,
+		isCompressed: isCompressed(mimeType),
+		mimeType:     mimeType,
+		inode:        inode,
+		filename:     filename,
+		symlink:      symlink,
+		sourceID:     sourceID,
 
 		isVirgin: true,
 		isDone:   true,
diff --git a/plugin/input/file/worker.go b/plugin/input/file/worker.go
index 38dcd9568..c381fd4a5 100644
--- a/plugin/input/file/worker.go
+++ b/plugin/input/file/worker.go
@@ -3,12 +3,16 @@ package file
 import (
 	"bytes"
 	"io"
+	"mime"
 	"os"
+	"os/exec"
+	"path/filepath"
"strings" "github.com/ozontech/file.d/pipeline" "github.com/ozontech/file.d/pipeline/metadata" k8s_meta "github.com/ozontech/file.d/plugin/input/k8s/meta" + "github.com/pierrec/lz4/v4" "go.uber.org/zap" ) @@ -86,12 +90,35 @@ func (w *worker) work(controller inputer, jobProvider *jobProvider, readBufferSi } } + var reader io.Reader + if job.mimeType == "application/x-lz4" { + if isNotFileBeingWritten(file.Name()) { + logger.Error("cannot lock file", zap.String("filename", file.Name())) + break + } + lz4Reader := lz4.NewReader(file) + if len(offsets) > 0 { + for lastOffset+int64(readBufferSize) < offsets[0].Offset { + n, err := lz4Reader.Read(readBuf) + if err != nil { + if err == io.EOF { + break // End of file reached + } + } + lastOffset += int64(n) + } + } + reader = lz4Reader + } else { + reader = file + } + // append the data of the old work, this happens when the event was not completely written to the file // for example: {"level": "info", "message": "some... // the end of the message can be added later and will be read in this iteration accumBuf = append(accumBuf[:0], job.tail...) for { - n, err := file.Read(readBuf) + n, err := reader.Read(readBuf) controller.IncReadOps() // if we read to end of file it's time to check truncation etc and process next job if err == io.EOF || n == 0 { @@ -173,22 +200,59 @@ func (w *worker) work(controller inputer, jobProvider *jobProvider, readBufferSi } } +func getMimeType(filename string) string { + ext := filepath.Ext(filename) + mimeType := mime.TypeByExtension(ext) + if mimeType == "" { + mimeType = "application/octet-stream" + } + + return mimeType +} + +func isNotFileBeingWritten(filePath string) bool { + // Run the lsof command to check open file descriptors + cmd := exec.Command("lsof", filePath) + output, err := cmd.Output() + if err != nil { + return false // Error running lsof + } + + // Check the output for write access + lines := strings.Split(string(output), "\n") + for _, line := range lines { + // Check if the line contains 'w' indicating write access + if strings.Contains(line, "w") { + return true // File is being written to + } + } + + return false // File is not being written to +} + func (w *worker) processEOF(file *os.File, job *Job, jobProvider *jobProvider, totalOffset int64) error { stat, err := file.Stat() if err != nil { return err } - // files truncated from time to time, after logs from file was processed. - // Position > stat.Size() means that data was truncated and - // caret pointer must be moved to start of file. - if totalOffset > stat.Size() { - jobProvider.truncateJob(job) + if !job.isCompressed { + // files truncated from time to time, after logs from file was processed. + // Position > stat.Size() means that data was truncated and + // caret pointer must be moved to start of file. + if totalOffset > stat.Size() { + jobProvider.truncateJob(job) + } } - // Mark job as done till new lines has appeared. jobProvider.doneJob(job) + if job.isCompressed { + job.mu.Lock() + file.Close() + jobProvider.deleteJobAndUnlock(job) + } + return nil }