Closed

23 commits
1d0e520
docs: archive
VihasMakwana Jan 7, 2025
0b7f793
Merge branch 'main' into archive-docs
VihasMakwana Feb 25, 2025
4b48aab
[chore] - finalize tracker's implementation
VihasMakwana Feb 28, 2025
41dc51c
Merge branch 'main' into tracker-final
VihasMakwana Feb 28, 2025
c9a985e
lint
VihasMakwana Feb 28, 2025
d1d9099
Merge branch 'main' into tracker-final
VihasMakwana Feb 28, 2025
400479d
Merge branch 'main' into archive-docs
VihasMakwana Feb 28, 2025
9a102d0
skip on windows
VihasMakwana Feb 28, 2025
ec2bafb
Merge branch 'main' into tracker-final
VihasMakwana Mar 4, 2025
fcc86db
Merge branch 'main' into tracker-final
VihasMakwana Mar 4, 2025
e075279
Merge branch 'main' into tracker-final
VihasMakwana Mar 7, 2025
0eb153a
Merge branch 'main' into tracker-final
VihasMakwana Mar 7, 2025
e94be1e
Merge branch 'main' into tracker-final
VihasMakwana Mar 10, 2025
ddb279c
add readme and changelog
VihasMakwana Mar 10, 2025
2323f1d
Merge branch 'main' into tracker-final
VihasMakwana Mar 17, 2025
da29633
Merge branch 'main' into tracker-final
VihasMakwana Mar 17, 2025
07a6651
Merge branch 'main' into tracker-final
VihasMakwana Mar 18, 2025
f98fecc
Merge branch 'main' into tracker-final
VihasMakwana Mar 26, 2025
fa8c854
Merge branch 'main' into tracker-final
VihasMakwana Mar 28, 2025
cecb53e
nits
VihasMakwana Apr 1, 2025
34f9015
Merge branch 'archive-docs' into tracker-final
VihasMakwana Apr 1, 2025
a0e83d3
Merge branch 'main' into tracker-final
VihasMakwana Apr 1, 2025
3b65789
comments
VihasMakwana Apr 6, 2025
27 changes: 27 additions & 0 deletions .chloggen/fileconsumer-archive.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: filelogreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add a new feature called "archive" to store offsets older than 3 poll cycles on disk.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [38056]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
4 changes: 3 additions & 1 deletion pkg/stanza/fileconsumer/config.go
@@ -63,6 +63,7 @@ func NewConfig() *Config {
MaxLogSize: reader.DefaultMaxLogSize,
Encoding: defaultEncoding,
FlushPeriod: reader.DefaultFlushPeriod,
PollsToArchive: 0,
Resolver: attrs.Resolver{
IncludeFileName: true,
},
@@ -88,7 +89,7 @@ type Config struct {
DeleteAfterRead bool `mapstructure:"delete_after_read,omitempty"`
IncludeFileRecordNumber bool `mapstructure:"include_file_record_number,omitempty"`
Compression string `mapstructure:"compression,omitempty"`
PollsToArchive int `mapstructure:"-"` // TODO: activate this config once archiving is set up
PollsToArchive int `mapstructure:"polls_to_archive,omitempty"`
AcquireFSLock bool `mapstructure:"acquire_fs_lock,omitempty"`
}

@@ -184,6 +185,7 @@ func (c Config) Build(set component.TelemetrySettings, emit emit.Callback, opts
maxBatches: c.MaxBatches,
telemetryBuilder: telemetryBuilder,
noTracking: o.noTracking,
pollsToArchive: c.PollsToArchive,
Member:

> This will always be 0 because in line 92 the field PollsToArchive is not mapped to a configuration setting. Is this intended?
>
> PollsToArchive int `mapstructure:"-"` // TODO: activate this config once archiving is set up

Contributor (author):

> @andrzej-stencel I've added documentation and enabled the config in line 92. Can you take a look when you have a moment?

}, nil
}

172 changes: 172 additions & 0 deletions pkg/stanza/fileconsumer/design/archive.md
@@ -0,0 +1,172 @@
# File archiving
Member:

> This document is full of typos and inconsistent formatting. Please take a pass through to clean it up.


With archiving enabled, file offsets older than three poll cycles are stored on disk rather than being discarded. This feature enables fileconsumer to remember files for a longer period while keeping memory usage bounded.

## Settings exposed for archiving

1. `polls_to_archive`
   - This setting controls the number of poll cycles to archive (beyond the in-memory three-poll-cycle limit).
   - If you set `polls_to_archive` to 10, fileconsumer will store up to 10 poll cycles on disk (see the example configuration below).
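
A minimal sketch of how this could look in a collector configuration. This is illustrative only: it assumes the `filelog` receiver exposing fileconsumer's settings and a storage extension (here `file_storage`) so that archived offsets can be persisted:

```yaml
extensions:
  file_storage:
    directory: /var/lib/otelcol/storage  # assumed storage location

receivers:
  filelog:
    include: [/var/log/app/*.log]
    start_at: beginning
    storage: file_storage
    # Keep offsets for up to 10 poll cycles on disk,
    # in addition to the 3 cycles held in memory.
    polls_to_archive: 10
```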


## How does archiving work?

- We store offsets older than three poll cycles on disk. If we use `polls_to_archive: 10`, the on-disk structure looks like the following:
![on-disk](images/on-disk.png)
- Once we hit the limit of `polls_to_archive` poll cycles, we roll over and overwrite the oldest data. The on-disk structure acts as a ring buffer.
- We retain a total of 13 poll cycles: 3 in memory and 10 on disk.

Basic terminology before we proceed further:
1. `archiveIndex`: the on-disk position where the next data will be written.
2. `polls_to_archive`: the number of poll cycles to archive, i.e. the maximum size of the on-disk ring buffer (see the index-arithmetic sketch below).
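
Because Go's `%` operator keeps the sign of the dividend, stepping backwards through the ring buffer needs an adjusted modulo. A minimal sketch of the wrap-around arithmetic (function names are illustrative, not the actual implementation):

```go
package main

import "fmt"

// nextIndex advances the write position, wrapping at the buffer size.
func nextIndex(i, pollsToArchive int) int {
	return (i + 1) % pollsToArchive
}

// prevIndex steps from more recent to less recent entries. Adding
// pollsToArchive keeps the dividend non-negative, since Go's % operator
// would otherwise return a negative result for i == 0.
func prevIndex(i, pollsToArchive int) int {
	return (i - 1 + pollsToArchive) % pollsToArchive
}

func main() {
	const pollsToArchive = 10
	fmt.Println(nextIndex(9, pollsToArchive)) // 0: rolls over to the start
	fmt.Println(prevIndex(0, pollsToArchive)) // 9: wraps around backwards
}
```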

### How does reading from archiving work?

During reader creation, we group all the new (or unmatched) files and try to find a match in the archive. At a high level, this consists of the following steps (see the sketch after this list):
1. We start from the most recently written index in the archive and load the data from it.
2. If we have no unmatched files left, we exit the loop.
3. We loop through the unmatched files and cross-reference each file's fingerprint against the archived data.
   a. If a match is found, we update the offset for the file.
4. We move to the next most recent index and continue from step 2.
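
A minimal sketch of this matching loop. The types here (`Fingerprint`, `Metadata`, the in-memory `storage` slice) are illustrative stand-ins for the tracker's real persisted archive, not the actual implementation:

```go
package archive

// Illustrative stand-ins for the tracker's real types.
type Fingerprint struct{ FirstBytes []byte }
type Metadata struct{ Offset int64 }

type entry struct {
	fp *Fingerprint
	md *Metadata
}

// findInArchive walks the ring buffer from the most recently written slot
// to the least recent one, returning archived metadata for each unmatched
// fingerprint (nil where no match is found).
func findInArchive(storage [][]entry, archiveIndex int, unmatched []*Fingerprint) []*Metadata {
	n := len(storage)
	results := make([]*Metadata, len(unmatched))
	remaining := len(unmatched)
	for step := 0; step < n && remaining > 0; step++ {
		// archiveIndex points at the next write slot, so the most
		// recent data lives one position behind it.
		idx := (archiveIndex - 1 - step + 2*n) % n
		if storage[idx] == nil {
			break // partially filled archive: nothing older exists
		}
		for i, fp := range unmatched {
			if results[i] != nil {
				continue // already matched in a more recent slot
			}
			for _, e := range storage[idx] {
				if string(e.fp.FirstBytes) == string(fp.FirstBytes) { // simplified match
					results[i] = e.md
					remaining--
					break
				}
			}
		}
	}
	return results
}
```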

Let's take a few examples to understand this:

- Consider the following structure:
![read-1](images/read-1.png)
- Here, we have stored data for the previous eight poll cycles (3 poll cycles in memory + 5 on disk).
- When we enter reading mode, we first read data from the most recently written index.
- The most recent data is stored at `archiveIndex-1`, because `archiveIndex` points to the position where the next data will be written.
- After evaluating the data at this index, we move to the next most recent index.
- We continue this process until one of the following conditions is met:
  - We have no unmatched files left.
  - We have read through the entire archive.
  - We encounter an empty value. This can happen if the archive is partially filled.
- In the above diagram, once we reach the beginning of the archive (i.e. index `0`), we roll over and proceed to the next most recent index. In this case, that is index `9`, which contains no data.
- Let's take one more example, where we have overwritten older data:
![read-2](images/read-2.png)
- Here, the archive is completely filled and we have rolled over, overwriting older data.
- `archiveIndex` points to `4`, i.e. the least recent data.
- We first load the most recent data (i.e. at `archiveIndex-1`) and try to match offsets against it.
- Once we evaluate the data at this index, we move to the previous index, continuing until we have read through the entire archive.

### How does writing to archive work?

Writing to the archive is rather simple:

- At the end of each poll cycle, instead of purging the readers older than 3 cycles, we move those oldest readers to the archive.
- We write the data at `archiveIndex` and increment the index (see the sketch after this list). Consider the following image:
![write](images/write.png)
- Before the poll cycle, `archiveIndex` points to the slot after `5`.
- At the end of the poll cycle, we write the data at `archiveIndex` and increment the index.
- After the cycle, the on-disk structure looks like the one on the right.
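
A minimal sketch of the write path, reusing the illustrative `entry` type from the reading sketch above (the real implementation persists through the operator's storage client rather than an in-memory slice):

```go
// writeToArchive stores the readers evicted from memory into the ring
// buffer slot pointed to by archiveIndex, then advances the index,
// wrapping at the end so that the oldest slot is overwritten next.
func writeToArchive(storage [][]entry, archiveIndex int, evicted []entry) (newArchiveIndex int) {
	storage[archiveIndex] = evicted
	return (archiveIndex + 1) % len(storage)
}
```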

## Archive restoration

Archive restoration is an important step if the user changes the `polls_to_archive` setting. This section explains how changing this setting affects the underlying disk structure across collector runs.

There are two cases to consider:
1. `polls_to_archive` has increased. In other words, the new archive will be larger than the old one.
2. `polls_to_archive` has decreased. In other words, the archive size has shrunk.

### Case 1: `polls_to_archive` has increased
This case is straightforward.

Consider the following image:

![grown](images/grown-1.png)

The previous archive size was `10` and it was later changed to `15`. We simply move `archiveIndex` to the next free slot. In this case, the next available slot is at index `10` (see the sketch below).
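
A sketch of this case, again with the illustrative `entry` type, assuming the archive is reloaded into a larger in-memory slice and that the old archive was full, as in the image:

```go
// growArchive copies the old ring buffer into a larger one. Existing
// slots keep their positions; archiveIndex moves to the first free slot.
func growArchive(old [][]entry, newSize int) (grown [][]entry, archiveIndex int) {
	grown = make([][]entry, newSize)
	copy(grown, old)
	return grown, len(old) // the next write lands right after the old data
}
```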

### Case 2: `polls_to_archive` has decreased

There are different sub-cases to consider.

#### Case 2.1: Most recently written index is in bounds w.r.t. new `polls_to_archive`

*Scenario 1: The most recently written index is in bounds and we have overwritten the data at least once*

![case-3](images/case-3.png)
The following configuration applies in this case:
- the previous `polls_to_archive` was `10`
- the new `polls_to_archive` is `7`
- the most recently written index is `4` (pointing to data `14`)
- `t.archiveIndex`, i.e. the least recently written index, is `6`

Here, we can see that the most recently written index (i.e. `4`) is in bounds w.r.t. the new `polls_to_archive` (i.e. `7`). In other words, `most recently written index < new polls_to_archive`.

We now need to construct a new, smaller archive with the 7 most recent elements.
These elements are (from most recent to least recent):

```14, 13, 12, 11, 10, 9, 8```

We do this in the following manner:
- The elements to the left of `archiveIndex` will always be included in the new archive, so we don't touch them.
- We then take the remaining elements and reconstruct the archive.
- The number of remaining elements equals `new polls_to_archive - archiveIndex`.
- In the above image, there are five elements to the left of `archiveIndex`, and we always include them.
- We take the two most recent elements from the right side and include them in the archive, discarding the rest.

Pseudocode:
```go
if storage[archiveIndex] == nil {
	// we'll talk about this condition in scenario 2
	return
}
// The "+ previous_polls_to_archive" terms keep the dividend non-negative,
// since Go's % operator preserves the sign of the dividend.
most_recent_index := (t.archiveIndex - 1 + previous_polls_to_archive) % previous_polls_to_archive // index 5 in the image above
least_recent_index := (most_recent_index - new_polls_to_archive + previous_polls_to_archive) % previous_polls_to_archive // index 8 in the image above

for i := 0; i < new_polls_to_archive-archiveIndex; i++ {
	storage[archiveIndex+i] = storage[least_recent_index] // rewrite onto the left side of storage
	least_recent_index = (least_recent_index + 1) % previous_polls_to_archive // wrap around if needed
}
// archiveIndex remains unchanged in this case, as it already points at the least recently written data.
```

*Scenario 2: Most recently written index is in bounds and we have not overwritten the data*

![case-4](images/case-4.png)

The following configuration applies in this case:
- the previous `polls_to_archive` was `10`
- the new `polls_to_archive` is `6`
- the most recently written index is `5` (pointing to data `14`)
- `t.archiveIndex`, i.e. the least recently written index, is `6`

If the slot pointed to by `archiveIndex` is nil, it means we haven't rolled over: the following slots are empty and we don't need to perform any swapping.
In the pseudocode above, the first condition handles this scenario.

#### Case 2.2: Most recently written index is out of bounds or at bounds w.r.t. new `polls_to_archive`

*Scenario 1: Most recently written index is out of bounds*

![case-2](images/case-2.png)

The following configuration applies in this case:
- the previous `polls_to_archive` was `10`
- the new `polls_to_archive` is `5`
- the most recently written index is `9`
- `t.archiveIndex`, i.e. the least recently written index, is `0`

Here, we can see that the most recently written index (i.e. `9`) is out of bounds w.r.t. the new `polls_to_archive` (i.e. `5`). In other words, `most recently written index > new polls_to_archive`.

We take the five (because the new `polls_to_archive` is `5`) most recently written elements and construct a new, smaller archive.
Pseudocode:

```go
// Adjusted modulo again, to keep the dividend non-negative.
most_recent_index := (t.archiveIndex - 1 + previous_polls_to_archive) % previous_polls_to_archive // index 9 in the image above
least_recent_index := (most_recent_index - new_polls_to_archive + previous_polls_to_archive) % previous_polls_to_archive // index 4 in the image above

for i := 0; i < new_polls_to_archive; i++ {
	storage[i] = storage[least_recent_index] // rewrite from the beginning of storage
	least_recent_index = (least_recent_index + 1) % previous_polls_to_archive // wrap around if needed
}
archiveIndex = 0 // point archiveIndex at the least recently written data
```

The new archive is represented by the lower list in the image above.

*Scenario 2: Most recently written index is at the bounds*

![case-1](images/case-1.png)

The pseudocode remains the same and the same steps are performed.
(The diff also adds nine binary image files under `pkg/stanza/fileconsumer/design/images/` — on-disk.png, read-1.png, read-2.png, write.png, grown-1.png, and case-1.png through case-4.png — which cannot be rendered in this view.)
83 changes: 66 additions & 17 deletions pkg/stanza/fileconsumer/file.go
@@ -204,21 +204,16 @@ func (m *Manager) makeFingerprint(path string) (*fingerprint.Fingerprint, *os.Fi
// discarding any that have a duplicate fingerprint to other files that have already
// been read this polling interval
func (m *Manager) makeReaders(ctx context.Context, paths []string) {
var unmatchedFiles []*os.File
var unmatchedFingerprints []*fingerprint.Fingerprint

for _, path := range paths {
fp, file := m.makeFingerprint(path)
if fp == nil {
continue
}

// Exclude duplicate paths with the same content. This can happen when files are
// being rotated with copy/truncate strategy. (After copy, prior to truncate.)
if r := m.tracker.GetCurrentFile(fp); r != nil {
m.set.Logger.Debug("Skipping duplicate file", zap.String("path", file.Name()))
// re-add the reader as Match() removes duplicates
m.tracker.Add(r)
if err := file.Close(); err != nil {
m.set.Logger.Debug("problem closing file", zap.Error(err))
}
if m.excludeDuplicate(fp, file) {
continue
}

@@ -228,10 +223,69 @@ func (m *Manager) makeReaders(ctx context.Context, paths []string) {
continue
}

if r != nil {
m.tracker.Add(r)
continue
}
// aggregate unmatched fingerprints and files to match against the archive
unmatchedFingerprints = append(unmatchedFingerprints, fp)
unmatchedFiles = append(unmatchedFiles, file)
}

m.processUnmatchedFiles(ctx, unmatchedFiles, unmatchedFingerprints)
}

func (m *Manager) processUnmatchedFiles(ctx context.Context, files []*os.File, fingerprints []*fingerprint.Fingerprint) {
// processUnmatchedFiles accepts a list of unmatched files and their corresponding fingerprints
// and looks for a match in the archive.
// If a match is found, it creates a reader based on the known metadata.
// Otherwise, it creates a new reader from scratch.

metadataFromArchive := m.tracker.FindFiles(fingerprints)

for i, metadata := range metadataFromArchive {
file, fp := files[i], fingerprints[i]
if m.excludeDuplicate(fp, file) {
continue
}

var r *reader.Reader
var err error

if metadata != nil {
// matching metadata was found in the archive; create a new reader from it.
r, err = m.readerFactory.NewReaderFromMetadata(file, metadata)
} else {
// If we don't match any previously known files, create a new reader from scratch.
m.set.Logger.Info("Started watching file", zap.String("path", file.Name()))
r, err = m.readerFactory.NewReader(file, fp)
}

if err != nil {
m.set.Logger.Error("Failed to create reader", zap.Error(err))
continue
}

m.telemetryBuilder.FileconsumerOpenFiles.Add(ctx, 1)
m.tracker.Add(r)
}
}

func (m *Manager) excludeDuplicate(fp *fingerprint.Fingerprint, file *os.File) bool {
// excludeDuplicate returns true if a duplicate path with the same content is found, and closes the duplicate file.
// This can happen when files are being rotated with copy/truncate strategy. (After copy, prior to truncate.)
if r := m.tracker.GetCurrentFile(fp); r != nil {
m.set.Logger.Debug("Skipping duplicate file", zap.String("path", file.Name()))
// re-add the reader as Match() removes duplicates
m.tracker.Add(r)
if err := file.Close(); err != nil {
m.set.Logger.Debug("problem closing file", zap.Error(err))
}
return true
}
return false
}

func (m *Manager) newReader(ctx context.Context, file *os.File, fp *fingerprint.Fingerprint) (*reader.Reader, error) {
// Check previous poll cycle for match
if oldReader := m.tracker.GetOpenFile(fp); oldReader != nil {
@@ -261,14 +315,9 @@ func (m *Manager) newReader(ctx context.Context, file *os.File, fp *fingerprint.
return r, nil
}

// If we don't match any previously known files, create a new reader from scratch
m.set.Logger.Info("Started watching file", zap.String("path", file.Name()))
r, err := m.readerFactory.NewReader(file, fp)
if err != nil {
return nil, err
}
m.telemetryBuilder.FileconsumerOpenFiles.Add(ctx, 1)
return r, nil
// The file was not found in the tracker.
// We'll create readers for such files after matching against the archive, in processUnmatchedFiles().
return nil, nil
}

func (m *Manager) instantiateTracker(ctx context.Context, persister operator.Persister) {
51 changes: 51 additions & 0 deletions pkg/stanza/fileconsumer/file_test.go
@@ -1580,3 +1580,54 @@ func TestReadGzipCompressedLogsFromEnd(t *testing.T) {
operator.poll(context.TODO())
sink.ExpectToken(t, []byte("testlog4"))
}

func TestArchive(t *testing.T) {
t.Parallel()
persister := testutil.NewUnscopedMockPersister()

tempDir := t.TempDir()
cfg := NewConfig().includeDir(tempDir)
cfg.StartAt = "beginning"
cfg.PollsToArchive = 100
cfg.PollInterval = 100 * time.Millisecond

temp := filetest.OpenTempWithPattern(t, tempDir, "file.log")
filetest.WriteString(t, temp, "testlog1\n")

operator, sink := testManager(t, cfg)
require.NoError(t, operator.Start(persister))
defer func() {
require.NoError(t, operator.Stop())
}()

sink.ExpectCall(t, []byte("testlog1"), map[string]any{
attrs.LogFileName: filepath.Base(temp.Name()),
})

os.Remove(temp.Name())

time.Sleep(500 * time.Millisecond)

temp = filetest.OpenTempWithPattern(t, tempDir, "file.log")
filetest.WriteString(t, temp, "testlog1\n")
filetest.WriteString(t, temp, "testlog2\n")

sink.ExpectCall(t, []byte("testlog2"), map[string]any{
attrs.LogFileName: filepath.Base(temp.Name()),
})

os.Remove(temp.Name())

time.Sleep(500 * time.Millisecond)

temp = filetest.OpenTempWithPattern(t, tempDir, "file.log")
filetest.WriteString(t, temp, "testlog1\n")
filetest.WriteString(t, temp, "testlog2\n")
filetest.WriteString(t, temp, "testlog3\n")
filetest.WriteString(t, temp, "testlog4\n")

log3 := emit.Token{Body: []byte("testlog3"), Attributes: map[string]any{attrs.LogFileName: filepath.Base(temp.Name())}}
log4 := emit.Token{Body: []byte("testlog4"), Attributes: map[string]any{attrs.LogFileName: filepath.Base(temp.Name())}}

sink.ExpectCalls(t, log3, log4)
}