@@ -60,13 +60,15 @@ type jobProvider struct {
60
60
}
61
61
62
62
type Job struct {
63
- file * os.File
64
- inode inodeID
65
- sourceID pipeline.SourceID // some value to distinguish jobs with same inode
66
- filename string
67
- symlink string
68
- curOffset int64 // offset to not call Seek() everytime
69
- tail []byte // some data of a new line read by worker, to not seek backwards to read from line start
63
+ file * os.File
64
+ mimeType string
65
+ isCompressed bool
66
+ inode inodeID
67
+ sourceID pipeline.SourceID // some value to distinguish jobs with same inode
68
+ filename string
69
+ symlink string
70
+ curOffset int64 // offset to not call Seek() everytime
71
+ tail []byte // some data of a new line read by worker, to not seek backwards to read from line start
70
72
71
73
ignoreEventsLE uint64 // events with seq id less or equal than this should be ignored in terms offset commitment
72
74
lastEventSeq uint64
@@ -83,10 +85,15 @@ type Job struct {
83
85
mu * sync.Mutex
84
86
}
85
87
86
- func (j * Job ) seek (offset int64 , whence int , hint string ) int64 {
87
- n , err := j .file .Seek (offset , whence )
88
- if err != nil {
89
- logger .Infof ("file seek error hint=%s, name=%s, err=%s" , hint , j .filename , err .Error ())
88
+ func (j * Job ) seek (offset int64 , whence int , hint string ) (n int64 ) {
89
+ var err error
90
+ if ! j .isCompressed {
91
+ n , err = j .file .Seek (offset , whence )
92
+ if err != nil {
93
+ logger .Infof ("file seek error hint=%s, name=%s, err=%s" , hint , j .filename , err .Error ())
94
+ }
95
+ } else {
96
+ n = 0
90
97
}
91
98
j .curOffset = n
92
99
@@ -354,6 +361,10 @@ func (jp *jobProvider) checkFileWasTruncated(job *Job, size int64) {
354
361
}
355
362
}
356
363
364
// isCompressed reports whether mimeType names a compressed file format.
// The only value treated as compressed is exactly "application/x-lz4";
// the comparison is case-sensitive and does not strip MIME parameters.
func isCompressed(mimeType string) bool {
	switch mimeType {
	case "application/x-lz4":
		return true
	default:
		return false
	}
}
367
+
357
368
func (jp * jobProvider ) addJob (file * os.File , stat os.FileInfo , filename string , symlink string ) {
358
369
sourceID := sourceIDByStat (stat , symlink )
359
370
@@ -370,12 +381,16 @@ func (jp *jobProvider) addJob(file *os.File, stat os.FileInfo, filename string,
370
381
}
371
382
372
383
inode := getInode (stat )
384
+ mimeType := getMimeType (filename )
385
+
373
386
job := & Job {
374
- file : file ,
375
- inode : inode ,
376
- filename : filename ,
377
- symlink : symlink ,
378
- sourceID : sourceID ,
387
+ file : file ,
388
+ isCompressed : isCompressed (mimeType ),
389
+ mimeType : mimeType ,
390
+ inode : inode ,
391
+ filename : filename ,
392
+ symlink : symlink ,
393
+ sourceID : sourceID ,
379
394
380
395
isVirgin : true ,
381
396
isDone : true ,
@@ -746,6 +761,24 @@ func (jp *jobProvider) deleteJobAndUnlock(job *Job) {
746
761
}
747
762
}
748
763
764
+ func (jp * jobProvider ) deleteJob (job * Job ) {
765
+ if ! job .isDone {
766
+ jp .logger .Panicf ("can't delete job, it isn't done: %d:%s" , job .sourceID , job .filename )
767
+ }
768
+ sourceID := job .sourceID
769
+ filename := job .filename
770
+
771
+ jp .jobsMu .Lock ()
772
+ delete (jp .jobs , sourceID )
773
+ c := jp .jobsDone .Dec ()
774
+ jp .jobsMu .Unlock ()
775
+
776
+ jp .logger .Infof ("job %d:%s deleted" , job .sourceID , filename )
777
+ if c < 0 {
778
+ jp .logger .Panicf ("done jobs counter less than zero" )
779
+ }
780
+ }
781
+
749
782
// getInode returns the inode number recorded in stat's underlying
// *syscall.Stat_t.
//
// The type assertion is unchecked: it panics if stat.Sys() yields anything
// other than a *syscall.Stat_t.
func getInode(stat os.FileInfo) inodeID {
	sysStat := stat.Sys().(*syscall.Stat_t)
	return inodeID(sysStat.Ino)
}
0 commit comments