Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
1d0e520
docs: archive
VihasMakwana Jan 7, 2025
0b7f793
Merge branch 'main' into archive-docs
VihasMakwana Feb 25, 2025
4b48aab
[chore] - finalize tracker's implementation
VihasMakwana Feb 28, 2025
41dc51c
Merge branch 'main' into tracker-final
VihasMakwana Feb 28, 2025
c9a985e
lint
VihasMakwana Feb 28, 2025
d1d9099
Merge branch 'main' into tracker-final
VihasMakwana Feb 28, 2025
400479d
Merge branch 'main' into archive-docs
VihasMakwana Feb 28, 2025
9a102d0
skip on windows
VihasMakwana Feb 28, 2025
ec2bafb
Merge branch 'main' into tracker-final
VihasMakwana Mar 4, 2025
fcc86db
Merge branch 'main' into tracker-final
VihasMakwana Mar 4, 2025
e075279
Merge branch 'main' into tracker-final
VihasMakwana Mar 7, 2025
0eb153a
Merge branch 'main' into tracker-final
VihasMakwana Mar 7, 2025
e94be1e
Merge branch 'main' into tracker-final
VihasMakwana Mar 10, 2025
ddb279c
add readme and changelog
VihasMakwana Mar 10, 2025
2323f1d
Merge branch 'main' into tracker-final
VihasMakwana Mar 17, 2025
da29633
Merge branch 'main' into tracker-final
VihasMakwana Mar 17, 2025
07a6651
Merge branch 'main' into tracker-final
VihasMakwana Mar 18, 2025
f98fecc
Merge branch 'main' into tracker-final
VihasMakwana Mar 26, 2025
fa8c854
Merge branch 'main' into tracker-final
VihasMakwana Mar 28, 2025
cecb53e
nits
VihasMakwana Apr 1, 2025
34f9015
Merge branch 'archive-docs' into tracker-final
VihasMakwana Apr 1, 2025
a0e83d3
Merge branch 'main' into tracker-final
VihasMakwana Apr 1, 2025
3b65789
comments
VihasMakwana Apr 6, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
[chore] - finalize tracker's implementation
  • Loading branch information
VihasMakwana committed Feb 28, 2025
commit 4b48aab1e52a20d87cc0cc00363e85c8e0c2026f
2 changes: 2 additions & 0 deletions pkg/stanza/fileconsumer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func NewConfig() *Config {
MaxLogSize: reader.DefaultMaxLogSize,
Encoding: defaultEncoding,
FlushPeriod: reader.DefaultFlushPeriod,
PollsToArchive: 0,
Resolver: attrs.Resolver{
IncludeFileName: true,
},
Expand Down Expand Up @@ -184,6 +185,7 @@ func (c Config) Build(set component.TelemetrySettings, emit emit.Callback, opts
maxBatches: c.MaxBatches,
telemetryBuilder: telemetryBuilder,
noTracking: o.noTracking,
pollsToArchive: c.PollsToArchive,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will always be 0 because in line 92 the field PollsToArchive is not mapped to a configuration setting. Is this intended?

PollsToArchive int `mapstructure:"-"` // TODO: activate this config once archiving is set up

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@andrzej-stencel I've added documentation and enabled the config in line 92. Can you take a look when you have a moment?

}, nil
}

Expand Down
73 changes: 55 additions & 18 deletions pkg/stanza/fileconsumer/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,34 +204,78 @@ func (m *Manager) makeFingerprint(path string) (*fingerprint.Fingerprint, *os.Fi
// discarding any that have a duplicate fingerprint to other files that have already
// been read this polling interval
func (m *Manager) makeReaders(ctx context.Context, paths []string) {
var unmatchedFiles []*os.File
var unmatchedFingerprints []*fingerprint.Fingerprint

for _, path := range paths {
fp, file := m.makeFingerprint(path)
if fp == nil {
if fp == nil || m.excludeDuplicate(fp, file) {
continue
}

r, err := m.newReader(ctx, file, fp)
if err != nil {
m.set.Logger.Error("Failed to create reader", zap.Error(err))
continue
}

// Exclude duplicate paths with the same content. This can happen when files are
// being rotated with copy/truncate strategy. (After copy, prior to truncate.)
if r := m.tracker.GetCurrentFile(fp); r != nil {
m.set.Logger.Debug("Skipping duplicate file", zap.String("path", file.Name()))
// re-add the reader as Match() removes duplicates
if r != nil {
m.telemetryBuilder.FileconsumerOpenFiles.Add(ctx, 1)
m.tracker.Add(r)
if err := file.Close(); err != nil {
m.set.Logger.Debug("problem closing file", zap.Error(err))
}
continue
}
// aggregate unmatched fingerprint and file to match it against archive
unmatchedFingerprints = append(unmatchedFingerprints, fp)
unmatchedFiles = append(unmatchedFiles, file)
}

r, err := m.newReader(ctx, file, fp)
m.processUnmatchedFiles(ctx, unmatchedFiles, unmatchedFingerprints)
}

// processUnmatchedFiles matches the given fingerprints against the archive and
// builds a reader for every file that is not a duplicate of one already seen
// this poll cycle. A file whose fingerprint was found in the archive resumes
// from its stored metadata; all others start reading from scratch.
func (m *Manager) processUnmatchedFiles(ctx context.Context, files []*os.File, fingerprints []*fingerprint.Fingerprint) {
	archivedMetadata := m.tracker.FindFiles(fingerprints)

	for idx, md := range archivedMetadata {
		f, fprint := files[idx], fingerprints[idx]
		// Skip (and close) files whose content duplicates a file already tracked.
		if m.excludeDuplicate(fprint, f) {
			continue
		}
		rdr, err := m.createReader(f, fprint, md)
		if err != nil {
			m.set.Logger.Error("Failed to create reader", zap.Error(err))
			continue
		}
		m.tracker.Add(rdr)
		m.telemetryBuilder.FileconsumerOpenFiles.Add(ctx, 1)
	}
}

// createReader builds a reader for file. When archived metadata is available,
// the reader resumes from that metadata; otherwise a brand-new reader is
// created and the file is logged as newly watched.
func (m *Manager) createReader(file *os.File, fp *fingerprint.Fingerprint, metadata *reader.Metadata) (*reader.Reader, error) {
	if metadata == nil {
		// No archived state exists for this fingerprint: start fresh.
		m.set.Logger.Info("Started watching file", zap.String("path", file.Name()))
		return m.readerFactory.NewReader(file, fp)
	}
	return m.readerFactory.NewReaderFromMetadata(file, metadata)
}

// excludeDuplicate returns true when another file with the same fingerprint is
// already tracked in the current poll cycle, closing the duplicate handle.
// This can happen when files are being rotated with the copy/truncate
// strategy. (After copy, prior to truncate.)
func (m *Manager) excludeDuplicate(fp *fingerprint.Fingerprint, file *os.File) bool {
	if r := m.tracker.GetCurrentFile(fp); r != nil {
		m.set.Logger.Debug("Skipping duplicate file", zap.String("path", file.Name()))
		// re-add the reader because Match() removes duplicates from the tracker
		m.tracker.Add(r)
		// close our handle to the duplicate; failure is best-effort and only logged
		if err := file.Close(); err != nil {
			m.set.Logger.Debug("problem closing file", zap.Error(err))
		}
		return true
	}
	return false
}

func (m *Manager) newReader(ctx context.Context, file *os.File, fp *fingerprint.Fingerprint) (*reader.Reader, error) {
// Check previous poll cycle for match
if oldReader := m.tracker.GetOpenFile(fp); oldReader != nil {
Expand Down Expand Up @@ -261,14 +305,7 @@ func (m *Manager) newReader(ctx context.Context, file *os.File, fp *fingerprint.
return r, nil
}

// If we don't match any previously known files, create a new reader from scratch
m.set.Logger.Info("Started watching file", zap.String("path", file.Name()))
r, err := m.readerFactory.NewReader(file, fp)
if err != nil {
return nil, err
}
m.telemetryBuilder.FileconsumerOpenFiles.Add(ctx, 1)
return r, nil
return nil, nil
}

func (m *Manager) instantiateTracker(ctx context.Context, persister operator.Persister) {
Expand Down
51 changes: 51 additions & 0 deletions pkg/stanza/fileconsumer/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1655,3 +1655,54 @@ func TestIncludeFileRecordNumberWithHeaderConfiguredButMissing(t *testing.T) {
attrs.LogFileRecordNumber: int64(1),
})
}

// TestArchive verifies that the fileconsumer restores reader offsets from the
// archive: a removed file's state is archived after a poll, and when a file
// with the same content prefix reappears, only content beyond the archived
// offset is emitted.
func TestArchive(t *testing.T) {
	t.Parallel()
	persister := testutil.NewUnscopedMockPersister()

	tempDir := t.TempDir()
	cfg := NewConfig().includeDir(tempDir)
	cfg.StartAt = "beginning"
	cfg.PollsToArchive = 100
	cfg.PollInterval = 100 * time.Millisecond

	temp := filetest.OpenTempWithPattern(t, tempDir, "file.log")
	filetest.WriteString(t, temp, "testlog1\n")

	operator, sink := testManager(t, cfg)
	require.NoError(t, operator.Start(persister))
	defer func() {
		require.NoError(t, operator.Stop())
	}()

	sink.ExpectCall(t, []byte("testlog1"), map[string]any{
		attrs.LogFileName: filepath.Base(temp.Name()),
	})

	// Best-effort removal; the error is deliberately ignored because removal of
	// an open file can fail on some platforms (e.g. Windows).
	_ = os.Remove(temp.Name())

	// Allow at least one poll cycle so the removed file's state rolls into the archive.
	time.Sleep(500 * time.Millisecond)

	temp = filetest.OpenTempWithPattern(t, tempDir, "file.log")
	filetest.WriteString(t, temp, "testlog1\n")
	filetest.WriteString(t, temp, "testlog2\n")

	// The offset for "testlog1" is restored from the archive, so only "testlog2" is emitted.
	sink.ExpectCall(t, []byte("testlog2"), map[string]any{
		attrs.LogFileName: filepath.Base(temp.Name()),
	})

	_ = os.Remove(temp.Name())

	time.Sleep(500 * time.Millisecond)

	temp = filetest.OpenTempWithPattern(t, tempDir, "file.log")
	filetest.WriteString(t, temp, "testlog1\n")
	filetest.WriteString(t, temp, "testlog2\n")
	filetest.WriteString(t, temp, "testlog3\n")
	filetest.WriteString(t, temp, "testlog4\n")

	// Offsets for the first two lines are archived, so only the new lines appear.
	log3 := emit.Token{Body: []byte("testlog3"), Attributes: map[string]any{attrs.LogFileName: filepath.Base(temp.Name())}}
	log4 := emit.Token{Body: []byte("testlog4"), Attributes: map[string]any{attrs.LogFileName: filepath.Base(temp.Name())}}

	sink.ExpectCalls(t, log3, log4)
}
10 changes: 9 additions & 1 deletion pkg/stanza/fileconsumer/internal/tracker/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,11 @@ func (t *fileTracker) FindFiles(fps []*fingerprint.Fingerprint) []*reader.Metada
nextIndex := t.archiveIndex
matchedMetadata := make([]*reader.Metadata, len(fps))

if !t.archiveEnabled() {
// we make an early exit if archiving is disabled. This would create new readers from scratch
// we return an array of "nil"s to make things easier while creating readers
return matchedMetadata
}
// continue executing the loop until either all records are matched or all archive sets have been processed.
for i := 0; i < t.pollsToArchive; i++ {
// Update the mostRecentIndex
Expand Down Expand Up @@ -420,7 +425,10 @@ func (t *noStateTracker) EndPoll() {}

func (t *noStateTracker) TotalReaders() int { return 0 }

func (t *noStateTracker) FindFiles([]*fingerprint.Fingerprint) []*reader.Metadata { return nil }
// FindFiles implements the tracker interface for the stateless tracker. Since
// no archive exists, it returns one nil metadata entry per input fingerprint
// so callers can index the result uniformly while creating readers.
func (t *noStateTracker) FindFiles(fps []*fingerprint.Fingerprint) []*reader.Metadata {
	matched := make([]*reader.Metadata, len(fps))
	return matched
}

func encodeIndex(val int) []byte {
var buf bytes.Buffer
Expand Down
Loading