@@ -204,34 +204,78 @@ func (m *Manager) makeFingerprint(path string) (*fingerprint.Fingerprint, *os.Fi
204
204
// discarding any that have a duplicate fingerprint to other files that have already
205
205
// been read this polling interval
206
206
func (m * Manager ) makeReaders (ctx context.Context , paths []string ) {
// For each path: fingerprint it, skip duplicates, try to obtain a reader via newReader, and
// collect the files for which newReader returned no reader so they can be handled together
// by processUnmatchedFiles after the loop.
207
+ var unmatchedFiles []* os.File
208
+ var unmatchedFingerprints []* fingerprint.Fingerprint
209
+
207
210
for _ , path := range paths {
208
211
fp , file := m .makeFingerprint (path )
209
- if fp == nil {
// Skip paths with no fingerprint, and paths excludeDuplicate reports as duplicates
// (excludeDuplicate closes the file itself in that case).
212
+ if fp == nil || m .excludeDuplicate (fp , file ) {
213
+ continue
214
+ }
215
+
216
+ r , err := m .newReader (ctx , file , fp )
// NOTE(review): file is not closed on the error path in the visible code - confirm that
// newReader (only partly shown in this diff) closes it when it fails.
217
+ if err != nil {
218
+ m .set .Logger .Error ("Failed to create reader" , zap .Error (err ))
210
219
continue
211
220
}
212
221
213
- // Exclude duplicate paths with the same content. This can happen when files are
214
- // being rotated with copy/truncate strategy. (After copy, prior to truncate.)
215
- if r := m .tracker .GetCurrentFile (fp ); r != nil {
216
- m .set .Logger .Debug ("Skipping duplicate file" , zap .String ("path" , file .Name ()))
217
- // re-add the reader as Match() removes duplicates
// NOTE(review): the removed tail of newReader incremented FileconsumerOpenFiles itself; the
// counter is now incremented here for every non-nil reader newReader returns. Confirm the
// earlier return paths of newReader (not fully visible here) do not count the file as well.
222
+ if r != nil {
223
+ m .telemetryBuilder .FileconsumerOpenFiles .Add (ctx , 1 )
218
224
m .tracker .Add (r )
219
- if err := file .Close (); err != nil {
220
- m .set .Logger .Debug ("problem closing file" , zap .Error (err ))
221
- }
222
225
continue
223
226
}
227
+ // newReader returned no reader: collect the fingerprint and its file, at the same index, so they can be matched against the archive.
228
+ unmatchedFingerprints = append (unmatchedFingerprints , fp )
229
+ unmatchedFiles = append (unmatchedFiles , file )
230
+ }
224
231
225
- r , err := m .newReader (ctx , file , fp )
232
+ m .processUnmatchedFiles (ctx , unmatchedFiles , unmatchedFingerprints )
233
+ }
234
+
235
// processUnmatchedFiles passes the given fingerprints to m.tracker.FindFiles and, for each
// returned metadata entry, creates a reader for the file at the same index via createReader,
// increments FileconsumerOpenFiles and adds the reader to the tracker. Files reported as
// duplicates by excludeDuplicate are skipped; reader-creation errors are logged and skipped.
// NOTE(review): files[i] and fingerprints[i] assume FindFiles returns exactly one entry per
// fingerprint, in the same order - confirm against the tracker implementation.
+ func (m * Manager ) processUnmatchedFiles (ctx context.Context , files []* os.File , fingerprints []* fingerprint.Fingerprint ) {
236
+ metadataFromArchive := m .tracker .FindFiles (fingerprints )
237
+
238
+ for i , metadata := range metadataFromArchive {
239
+ file , fp := files [i ], fingerprints [i ]
// Duplicates are checked again here; presumably a reader added earlier in this loop can
// carry the same fingerprint - confirm.
240
+ if m .excludeDuplicate (fp , file ) {
241
+ continue
242
+ }
243
+
244
+ r , err := m .createReader (file , fp , metadata )
// NOTE(review): file is not closed on this error path in the visible code - confirm the
// reader factory closes it when it fails.
226
245
if err != nil {
227
246
m .set .Logger .Error ("Failed to create reader" , zap .Error (err ))
228
247
continue
229
248
}
230
249
250
+ m .telemetryBuilder .FileconsumerOpenFiles .Add (ctx , 1 )
231
251
m .tracker .Add (r )
232
252
}
233
253
}
234
254
255
// createReader builds a reader for file. When metadata is non-nil the reader is built from it
// with readerFactory.NewReaderFromMetadata; otherwise "Started watching file" is logged and a
// new reader is built from the fingerprint with readerFactory.NewReader. The factory's reader
// and error are returned unchanged. This function does not touch the open-files counter.
+ func (m * Manager ) createReader (file * os.File , fp * fingerprint.Fingerprint , metadata * reader.Metadata ) (* reader.Reader , error ) {
256
+ if metadata != nil {
257
+ return m .readerFactory .NewReaderFromMetadata (file , metadata )
258
+ }
259
+
// No metadata was supplied for this file, so start a reader from scratch.
260
+ m .set .Logger .Info ("Started watching file" , zap .String ("path" , file .Name ()))
261
+ return m .readerFactory .NewReader (file , fp )
262
+ }
263
+
264
// excludeDuplicate reports whether m.tracker.GetCurrentFile(fp) returns a reader. When it does,
// that reader is added back to the tracker, file is closed (a close error is only logged at
// debug level) and true is returned. Otherwise file is left open and false is returned.
+ func (m * Manager ) excludeDuplicate (fp * fingerprint.Fingerprint , file * os.File ) bool {
265
+ // excludeDuplicate returns true if a duplicate path with the same content is found, and closes the duplicate.
266
+ // This can happen when files are being rotated with copy/truncate strategy. (After copy, prior to truncate.)
267
+ if r := m .tracker .GetCurrentFile (fp ); r != nil {
268
+ m .set .Logger .Debug ("Skipping duplicate file" , zap .String ("path" , file .Name ()))
269
+ // re-add the reader as Match() removes duplicates
270
+ m .tracker .Add (r )
271
+ if err := file .Close (); err != nil {
272
+ m .set .Logger .Debug ("problem closing file" , zap .Error (err ))
273
+ }
274
+ return true
275
+ }
276
+ return false
277
+ }
278
+
235
279
func (m * Manager ) newReader (ctx context.Context , file * os.File , fp * fingerprint.Fingerprint ) (* reader.Reader , error ) {
236
280
// Check previous poll cycle for match
237
281
if oldReader := m .tracker .GetOpenFile (fp ); oldReader != nil {
@@ -261,14 +305,7 @@ func (m *Manager) newReader(ctx context.Context, file *os.File, fp *fingerprint.
261
305
return r , nil
262
306
}
263
307
264
- // If we don't match any previously known files, create a new reader from scratch
265
- m .set .Logger .Info ("Started watching file" , zap .String ("path" , file .Name ()))
266
- r , err := m .readerFactory .NewReader (file , fp )
267
- if err != nil {
268
- return nil , err
269
- }
270
- m .telemetryBuilder .FileconsumerOpenFiles .Add (ctx , 1 )
271
- return r , nil
308
+ return nil , nil
272
309
}
273
310
274
311
func (m * Manager ) instantiateTracker (ctx context.Context , persister operator.Persister ) {
0 commit comments