Skip to content

Commit 4b48aab

Browse files
committed
[chore] - finalize tracker's implementation
1 parent f036dd9 commit 4b48aab

File tree

4 files changed (+117 additions, −19 deletions)

pkg/stanza/fileconsumer/config.go

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -63,6 +63,7 @@ func NewConfig() *Config {
6363
MaxLogSize: reader.DefaultMaxLogSize,
6464
Encoding: defaultEncoding,
6565
FlushPeriod: reader.DefaultFlushPeriod,
66+
PollsToArchive: 0,
6667
Resolver: attrs.Resolver{
6768
IncludeFileName: true,
6869
},
@@ -184,6 +185,7 @@ func (c Config) Build(set component.TelemetrySettings, emit emit.Callback, opts
184185
maxBatches: c.MaxBatches,
185186
telemetryBuilder: telemetryBuilder,
186187
noTracking: o.noTracking,
188+
pollsToArchive: c.PollsToArchive,
187189
}, nil
188190
}
189191

pkg/stanza/fileconsumer/file.go

Lines changed: 55 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -204,34 +204,78 @@ func (m *Manager) makeFingerprint(path string) (*fingerprint.Fingerprint, *os.Fi
204204
// discarding any that have a duplicate fingerprint to other files that have already
205205
// been read this polling interval
206206
func (m *Manager) makeReaders(ctx context.Context, paths []string) {
207+
var unmatchedFiles []*os.File
208+
var unmatchedFingerprints []*fingerprint.Fingerprint
209+
207210
for _, path := range paths {
208211
fp, file := m.makeFingerprint(path)
209-
if fp == nil {
212+
if fp == nil || m.excludeDuplicate(fp, file) {
213+
continue
214+
}
215+
216+
r, err := m.newReader(ctx, file, fp)
217+
if err != nil {
218+
m.set.Logger.Error("Failed to create reader", zap.Error(err))
210219
continue
211220
}
212221

213-
// Exclude duplicate paths with the same content. This can happen when files are
214-
// being rotated with copy/truncate strategy. (After copy, prior to truncate.)
215-
if r := m.tracker.GetCurrentFile(fp); r != nil {
216-
m.set.Logger.Debug("Skipping duplicate file", zap.String("path", file.Name()))
217-
// re-add the reader as Match() removes duplicates
222+
if r != nil {
223+
m.telemetryBuilder.FileconsumerOpenFiles.Add(ctx, 1)
218224
m.tracker.Add(r)
219-
if err := file.Close(); err != nil {
220-
m.set.Logger.Debug("problem closing file", zap.Error(err))
221-
}
222225
continue
223226
}
227+
// aggregate unmatched fingerprint and file to match it against archive
228+
unmatchedFingerprints = append(unmatchedFingerprints, fp)
229+
unmatchedFiles = append(unmatchedFiles, file)
230+
}
224231

225-
r, err := m.newReader(ctx, file, fp)
232+
m.processUnmatchedFiles(ctx, unmatchedFiles, unmatchedFingerprints)
233+
}
234+
235+
func (m *Manager) processUnmatchedFiles(ctx context.Context, files []*os.File, fingerprints []*fingerprint.Fingerprint) {
236+
metadataFromArchive := m.tracker.FindFiles(fingerprints)
237+
238+
for i, metadata := range metadataFromArchive {
239+
file, fp := files[i], fingerprints[i]
240+
if m.excludeDuplicate(fp, file) {
241+
continue
242+
}
243+
244+
r, err := m.createReader(file, fp, metadata)
226245
if err != nil {
227246
m.set.Logger.Error("Failed to create reader", zap.Error(err))
228247
continue
229248
}
230249

250+
m.telemetryBuilder.FileconsumerOpenFiles.Add(ctx, 1)
231251
m.tracker.Add(r)
232252
}
233253
}
234254

255+
func (m *Manager) createReader(file *os.File, fp *fingerprint.Fingerprint, metadata *reader.Metadata) (*reader.Reader, error) {
256+
if metadata != nil {
257+
return m.readerFactory.NewReaderFromMetadata(file, metadata)
258+
}
259+
260+
m.set.Logger.Info("Started watching file", zap.String("path", file.Name()))
261+
return m.readerFactory.NewReader(file, fp)
262+
}
263+
264+
func (m *Manager) excludeDuplicate(fp *fingerprint.Fingerprint, file *os.File) bool {
265+
// shouldExclude return true if duplicate path is found with the same content and closes the duplicate.
266+
// This can happen when files are being rotated with copy/truncate strategy. (After copy, prior to truncate.)
267+
if r := m.tracker.GetCurrentFile(fp); r != nil {
268+
m.set.Logger.Debug("Skipping duplicate file", zap.String("path", file.Name()))
269+
// re-add the reader as Match() removes duplicates
270+
m.tracker.Add(r)
271+
if err := file.Close(); err != nil {
272+
m.set.Logger.Debug("problem closing file", zap.Error(err))
273+
}
274+
return true
275+
}
276+
return false
277+
}
278+
235279
func (m *Manager) newReader(ctx context.Context, file *os.File, fp *fingerprint.Fingerprint) (*reader.Reader, error) {
236280
// Check previous poll cycle for match
237281
if oldReader := m.tracker.GetOpenFile(fp); oldReader != nil {
@@ -261,14 +305,7 @@ func (m *Manager) newReader(ctx context.Context, file *os.File, fp *fingerprint.
261305
return r, nil
262306
}
263307

264-
// If we don't match any previously known files, create a new reader from scratch
265-
m.set.Logger.Info("Started watching file", zap.String("path", file.Name()))
266-
r, err := m.readerFactory.NewReader(file, fp)
267-
if err != nil {
268-
return nil, err
269-
}
270-
m.telemetryBuilder.FileconsumerOpenFiles.Add(ctx, 1)
271-
return r, nil
308+
return nil, nil
272309
}
273310

274311
func (m *Manager) instantiateTracker(ctx context.Context, persister operator.Persister) {

pkg/stanza/fileconsumer/file_test.go

Lines changed: 51 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1655,3 +1655,54 @@ func TestIncludeFileRecordNumberWithHeaderConfiguredButMissing(t *testing.T) {
16551655
attrs.LogFileRecordNumber: int64(1),
16561656
})
16571657
}
1658+
1659+
func TestArchive(t *testing.T) {
1660+
t.Parallel()
1661+
persister := testutil.NewUnscopedMockPersister()
1662+
1663+
tempDir := t.TempDir()
1664+
cfg := NewConfig().includeDir(tempDir)
1665+
cfg.StartAt = "beginning"
1666+
cfg.PollsToArchive = 100
1667+
cfg.PollInterval = 100 * time.Millisecond
1668+
1669+
temp := filetest.OpenTempWithPattern(t, tempDir, "file.log")
1670+
filetest.WriteString(t, temp, "testlog1\n")
1671+
1672+
operator, sink := testManager(t, cfg)
1673+
operator.Start(persister)
1674+
defer func() {
1675+
require.NoError(t, operator.Stop())
1676+
}()
1677+
1678+
sink.ExpectCall(t, []byte("testlog1"), map[string]any{
1679+
attrs.LogFileName: filepath.Base(temp.Name()),
1680+
})
1681+
1682+
os.Remove(temp.Name())
1683+
1684+
time.Sleep(500 * time.Millisecond)
1685+
1686+
temp = filetest.OpenTempWithPattern(t, tempDir, "file.log")
1687+
filetest.WriteString(t, temp, "testlog1\n")
1688+
filetest.WriteString(t, temp, "testlog2\n")
1689+
1690+
sink.ExpectCall(t, []byte("testlog2"), map[string]any{
1691+
attrs.LogFileName: filepath.Base(temp.Name()),
1692+
})
1693+
1694+
os.Remove(temp.Name())
1695+
1696+
time.Sleep(500 * time.Millisecond)
1697+
1698+
temp = filetest.OpenTempWithPattern(t, tempDir, "file.log")
1699+
filetest.WriteString(t, temp, "testlog1\n")
1700+
filetest.WriteString(t, temp, "testlog2\n")
1701+
filetest.WriteString(t, temp, "testlog3\n")
1702+
filetest.WriteString(t, temp, "testlog4\n")
1703+
1704+
log3 := emit.Token{Body: []byte("testlog3"), Attributes: map[string]any{attrs.LogFileName: filepath.Base(temp.Name())}}
1705+
log4 := emit.Token{Body: []byte("testlog4"), Attributes: map[string]any{attrs.LogFileName: filepath.Base(temp.Name())}}
1706+
1707+
sink.ExpectCalls(t, log3, log4)
1708+
}

pkg/stanza/fileconsumer/internal/tracker/tracker.go

Lines changed: 9 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -327,6 +327,11 @@ func (t *fileTracker) FindFiles(fps []*fingerprint.Fingerprint) []*reader.Metada
327327
nextIndex := t.archiveIndex
328328
matchedMetadata := make([]*reader.Metadata, len(fps))
329329

330+
if !t.archiveEnabled() {
331+
// we make an early exit if archiving is disabled. This would create new readers from scratch
332+
// we return an array of "nil"s to make things easier while creating readers
333+
return matchedMetadata
334+
}
330335
// continue executing the loop until either all records are matched or all archive sets have been processed.
331336
for i := 0; i < t.pollsToArchive; i++ {
332337
// Update the mostRecentIndex
@@ -420,7 +425,10 @@ func (t *noStateTracker) EndPoll() {}
420425

421426
// TotalReaders always reports zero for noStateTracker.
func (t *noStateTracker) TotalReaders() int {
	return 0
}
422427

423-
func (t *noStateTracker) FindFiles([]*fingerprint.Fingerprint) []*reader.Metadata { return nil }
428+
func (t *noStateTracker) FindFiles(fps []*fingerprint.Fingerprint) []*reader.Metadata {
429+
// we return an array of "nil"s to make things easier while creating readers
430+
return make([]*reader.Metadata, len(fps))
431+
}
424432

425433
func encodeIndex(val int) []byte {
426434
var buf bytes.Buffer

0 commit comments

Comments
 (0)