diff --git a/.chloggen/fileconsumer-archive.yaml b/.chloggen/fileconsumer-archive.yaml
new file mode 100644
index 0000000000000..3f739abbe006c
--- /dev/null
+++ b/.chloggen/fileconsumer-archive.yaml
@@ -0,0 +1,27 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: filelogreceiver
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Add a new feature called "archive" to store offsets older than 3 poll cycles on disk.
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [38056]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: []
diff --git a/pkg/stanza/fileconsumer/config.go b/pkg/stanza/fileconsumer/config.go
index e4744c04772b6..fb20e4c6409d6 100644
--- a/pkg/stanza/fileconsumer/config.go
+++ b/pkg/stanza/fileconsumer/config.go
@@ -63,6 +63,7 @@ func NewConfig() *Config {
 		MaxLogSize:     reader.DefaultMaxLogSize,
 		Encoding:       defaultEncoding,
 		FlushPeriod:    reader.DefaultFlushPeriod,
+		PollsToArchive: 0,
 		Resolver: attrs.Resolver{
 			IncludeFileName: true,
 		},
@@ -88,7 +89,7 @@ type Config struct {
 	DeleteAfterRead         bool   `mapstructure:"delete_after_read,omitempty"`
 	IncludeFileRecordNumber bool   `mapstructure:"include_file_record_number,omitempty"`
 	Compression             string `mapstructure:"compression,omitempty"`
-	PollsToArchive          int    `mapstructure:"-"` // TODO: activate this config once archiving is set up
+	PollsToArchive          int    `mapstructure:"polls_to_archive,omitempty"`
 	AcquireFSLock           bool   `mapstructure:"acquire_fs_lock,omitempty"`
 }
@@ -184,6 +185,7 @@ func (c Config) Build(set component.TelemetrySettings, emit emit.Callback, opts
 		maxBatches:       c.MaxBatches,
 		telemetryBuilder: telemetryBuilder,
 		noTracking:       o.noTracking,
+		pollsToArchive:   c.PollsToArchive,
 	}, nil
 }
diff --git a/pkg/stanza/fileconsumer/design/archive.md b/pkg/stanza/fileconsumer/design/archive.md
new file mode 100644
index 0000000000000..6382a0289b5e6
--- /dev/null
+++ b/pkg/stanza/fileconsumer/design/archive.md
@@ -0,0 +1,172 @@
+# File archiving
+
+With archiving enabled, file offsets older than three poll cycles are stored on disk rather than being discarded. This feature enables fileconsumer to remember files for a longer period while keeping memory usage bounded.
+
+## Settings exposed for archiving
+
+1. `polls_to_archive`
+   - This setting controls the number of poll cycles to archive (above the in-memory three poll cycle limit); see the sample configuration below.
+   - If you set `polls_to_archive` to 10, fileconsumer will store up to 10 poll cycles on disk.
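+
+A minimal collector configuration enabling archiving might look like the following sketch. The `file_storage` extension and the paths here are illustrative assumptions; archiving persists offsets through whichever storage extension the receiver is configured with.
+
+```yaml
+receivers:
+  filelog:
+    include: [/var/log/app/*.log]
+    # archived offsets are persisted via the configured storage extension
+    storage: file_storage
+    # keep up to 10 poll cycles on disk, in addition to the 3 held in memory
+    polls_to_archive: 10
+
+extensions:
+  file_storage:
+    directory: /var/lib/otelcol/storage
+```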
+
+## How does archiving work?
+
+- We store the offsets older than three poll cycles on disk. If we use `polls_to_archive: 10`, the on-disk structure looks like the following:
+![on-disk](images/on-disk.png)
+  - Once we hit the limit of `polls_to_archive` poll cycles, we roll over and overwrite the oldest data. The on-disk structure represents a ring buffer.
+  - We retain a total of 13 poll cycles: 3 cycles in memory and 10 cycles on disk.
+
+Basic terminology before we proceed further:
+1. `archiveIndex`: the on-disk position where the next data will be written.
+2. `polls_to_archive`: the number of poll cycles to archive, i.e. the maximum size of the on-disk ring buffer.
+
+### How does reading from the archive work?
+
+During reader creation, we group all the new (or unmatched) files and try to find a match in the archive. At a high level, this consists of the following steps:
+1. We start from the most recently written index in the archive and load the data from it.
+2. If we don't have any unmatched files, we exit the loop.
+3. We loop through all the unmatched files and cross-reference each file's fingerprint against the archived data.
+   a. If a match is found, we update the offset for the file.
+4. We move to the next most recent index and continue from step 2.
+
+Let's take a few examples to understand this:
+
+- Consider the following structure:
+![read-1](images/read-1.png)
+  - Here, we have stored data for the previous eight poll cycles (3 poll cycles in memory + 5 on disk).
+  - When we enter the reading mode, we first read data from the most recently written index.
+  - The most recent data is stored at `archiveIndex-1`, because `archiveIndex` points to the position where the next data will be written.
+  - After evaluating data at this index, we move to the next most recent index.
+  - We continue this process until one of the following conditions is met:
+    - We have no unmatched files left.
+    - We have read through the entire archive.
+    - We encounter an empty value. This can happen if the archive is partially filled.
+  - In the above diagram, once we reach the beginning of the archive (i.e. index `0`), we roll over and proceed to the next most recent index. In this case, that is index `9`, which contains no data.
+- Let's take one more example where we have overwritten older data:
+![read-2](images/read-2.png)
+  - Here, the archive is completely filled and we have rolled over, overwriting older data.
+  - `archiveIndex` points to `4`, i.e. the least recent data.
+  - We first load the most recent data (at `archiveIndex-1`) and try to match offsets against it.
+  - Once we evaluate data from this index, we move to the previous index, and we continue this process until we have read through the entire archive.
+
+### How does writing to the archive work?
+
+Writing to the archive is rather simple (see the sketch below):
+
+- At the end of each poll cycle, instead of purging the readers older than 3 cycles, we move the oldest readers to the archive.
+- We write data at `archiveIndex` and increment the index. Consider the following image:
+![write](images/write.png)
+  - Before the poll cycle, `archiveIndex` points to the slot after `5`.
+  - At the end of the poll cycle, we write the data at `archiveIndex` and increment the index.
+  - After the cycle, the on-disk structure looks like the one on the right.
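+
+The sketch below illustrates the ring-buffer bookkeeping described in the reading and writing sections above. It is a simplified illustration with assumed names (`archive`, `metadata`, `write`, and `match` are hypothetical); the actual tracker persists metadata through a storage extension rather than holding it in a slice.
+
+```go
+// archive is a fixed-size ring buffer of per-poll-cycle metadata.
+type archive struct {
+	slots []*metadata // len(slots) == polls_to_archive
+	index int         // archiveIndex: the next write position
+}
+
+type metadata struct {
+	fingerprint []byte
+	offset      int64
+}
+
+// write stores one poll cycle's metadata, overwriting the oldest entry once full.
+func (a *archive) write(m *metadata) {
+	a.slots[a.index] = m
+	a.index = (a.index + 1) % len(a.slots)
+}
+
+// match walks backwards from the most recently written slot, stopping at an
+// empty slot (partially filled archive) or after one full revolution.
+func (a *archive) match(matches func(*metadata) bool) *metadata {
+	for i := 1; i <= len(a.slots); i++ {
+		idx := (a.index - i + len(a.slots)) % len(a.slots) // +len avoids Go's negative remainder
+		m := a.slots[idx]
+		if m == nil {
+			return nil
+		}
+		if matches(m) {
+			return m
+		}
+	}
+	return nil
+}
+```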
+
+## Archive restoration
+
+Archive restoration is an important step if the user changes the `polls_to_archive` setting. This section explains how changing this setting impacts the underlying disk structure after a collector run.
+
+There are two cases to consider:
+1. When `polls_to_archive` has increased. In other words, the new archive is larger than the old one.
+2. When `polls_to_archive` has decreased. In other words, the archive size has shrunk.
+
+### Case 1: `polls_to_archive` has increased
+
+This case is straightforward.
+
+Consider the following image:
+
+![grown](images/grown-1.png)
+
+The previous archive size was `10` and it was later changed to `15`. We just move `archiveIndex` to the next free slot. In this case, the next available slot is at index `10`.
+
+### Case 2: `polls_to_archive` has decreased
+
+There are different sub-cases to consider.
+
+#### Case 2.1: Most recently written index is in bounds w.r.t. the new `polls_to_archive`
+
+*Scenario 1: Most recently written index is in bounds and we have overwritten the data at least once*
+
+![case-3](images/case-3.png)
+
+The following configuration applies to this case:
+- previous `polls_to_archive` was `10`
+- new `polls_to_archive` is `7`
+- most recently written index is `4` (pointing to data `14`)
+- `t.archiveIndex`, i.e. the least recently written index, is `5`
+
+Here, we can see that the most recently written index (i.e. `4`) is in bounds w.r.t. the new `polls_to_archive` (i.e. `7`). In other words, `most recently written index < new polls_to_archive`.
+
+We now need to construct a new, smaller archive with the 7 most recent elements.
+These elements are (from most recent to least recent):
+
+```14, 13, 12, 11, 10, 9, 8```
+
+We do this in the following manner:
+- The elements on the left of `archiveIndex` will always be included in the new archive. Hence, we don't touch them.
+- We then take the remaining elements and reconstruct the archive.
+  - The number of remaining elements to copy is `new polls_to_archive - archiveIndex`.
+  - In the above image, there are five elements on the left of `archiveIndex` and we will always include them.
+  - We take the two most recent elements from the right side and include them in the archive, discarding the rest.
+
+Pseudocode:
+```go
+if storage[archiveIndex] == nil {
+	// we'll talk about this condition in scenario 2
+	return
+}
+// note: the modulo here is the mathematical one; in Go, add previous_polls_to_archive
+// before applying % so that the result is not negative
+most_recent_index := (archiveIndex - 1) % previous_polls_to_archive // index 4 in the above image
+least_recent_index := (most_recent_index - new_polls_to_archive + 1) % previous_polls_to_archive // index 8 in the above image
+
+for i := 0; i < new_polls_to_archive-archiveIndex; i++ {
+	storage[archiveIndex+i] = storage[least_recent_index] // rewrite on the left side of storage
+	least_recent_index = (least_recent_index + 1) % previous_polls_to_archive
+}
+// archiveIndex remains unchanged in this case, as it's already pointing at the least recently written data.
+```
+
+*Scenario 2: Most recently written index is in bounds and we have not overwritten the data*
+
+![case-4](images/case-4.png)
+
+The following configuration applies to this case:
+- previous `polls_to_archive` was `10`
+- new `polls_to_archive` is `6`
+- most recently written index is `5` (pointing to data `14`)
+- `t.archiveIndex`, i.e. the next write position, is `6` (this slot is empty)
+
+If the slot pointed to by `archiveIndex` is nil, it means that we haven't rolled over and the following slots are empty, so we don't need to perform any swapping.
+The first condition in the above pseudocode handles this scenario.
+
+#### Case 2.2: Most recently written index is out of bounds or at the bounds w.r.t. the new `polls_to_archive`
+
+*Scenario 1: Most recently written index is out of bounds*
+
+![case-2](images/case-2.png)
+
+The following configuration applies to this case:
+- previous `polls_to_archive` was `10`
+- new `polls_to_archive` is `5`
+- most recently written index is `9`
+- `t.archiveIndex`, i.e. the least recently written index, is `0`
+
+Here, we can see that the most recently written index (i.e. `9`) is out of bounds w.r.t. the new `polls_to_archive` (i.e. `5`). In other words, `most recently written index > new polls_to_archive`.
+
+We take the five (because the new `polls_to_archive` is `5`) most recently written elements and construct a new, smaller archive.
+Pseudocode:
+
+```go
+// note: the modulo here is the mathematical one; in Go, add previous_polls_to_archive
+// before applying % so that the result is not negative
+most_recent_index := (archiveIndex - 1) % previous_polls_to_archive // index 9 in the above image
+least_recent_index := (most_recent_index - new_polls_to_archive + 1) % previous_polls_to_archive // index 5 in the above image
+
+for i := 0; i < new_polls_to_archive; i++ {
+	storage[i] = storage[least_recent_index] // rewrite from the beginning of storage
+	least_recent_index = (least_recent_index + 1) % previous_polls_to_archive
+}
+archiveIndex = 0 // point archiveIndex at the least recently written data
+```
+
+The new archive is represented by the lower list in the image above.
+
+*Scenario 2: Most recently written index is at the bounds*
+
+![case-1](images/case-1.png)
+
+The pseudocode remains the same and the same steps are performed.
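+
+To make the restoration logic concrete, the following self-contained program applies the two shrink cases above to plain integers. `restoreArchive` is a hypothetical helper for illustration only and assumes the preconditions of each documented case; the real implementation operates on persisted reader metadata.
+
+```go
+package main
+
+import "fmt"
+
+// restoreArchive shrinks a ring buffer from len(storage) slots down to newSize
+// slots, keeping the newSize most recent entries. archiveIndex is the next
+// write position. It returns the resized storage and the new archiveIndex.
+func restoreArchive(storage []*int, archiveIndex, newSize int) ([]*int, int) {
+	prev := len(storage)
+	if storage[archiveIndex] == nil {
+		// Case 2.1, scenario 2: never rolled over, the following slots are empty
+		return storage[:newSize], archiveIndex
+	}
+	mostRecent := ((archiveIndex-1)%prev + prev) % prev
+	leastRecent := ((mostRecent-newSize+1)%prev + prev) % prev
+	if mostRecent < newSize {
+		// Case 2.1, scenario 1: keep the entries left of archiveIndex,
+		// fill the remaining slots from the right-hand side
+		for i := 0; i < newSize-archiveIndex; i++ {
+			storage[archiveIndex+i] = storage[leastRecent]
+			leastRecent = (leastRecent + 1) % prev
+		}
+		return storage[:newSize], archiveIndex % newSize
+	}
+	// Case 2.2: rewrite the newSize most recent entries from the beginning
+	for i := 0; i < newSize; i++ {
+		storage[i] = storage[leastRecent]
+		leastRecent = (leastRecent + 1) % prev
+	}
+	return storage[:newSize], 0
+}
+
+func main() {
+	// Case 2.2 example from above: indices 0..9 hold data 0..9, archiveIndex is 0
+	storage := make([]*int, 10)
+	for i := range storage {
+		v := i
+		storage[i] = &v
+	}
+	resized, idx := restoreArchive(storage, 0, 5)
+	for _, v := range resized {
+		fmt.Print(*v, " ") // prints: 5 6 7 8 9
+	}
+	fmt.Println("archiveIndex:", idx) // archiveIndex: 0
+}
+```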
diff --git a/pkg/stanza/fileconsumer/design.md b/pkg/stanza/fileconsumer/design/design.md
similarity index 100%
rename from pkg/stanza/fileconsumer/design.md
rename to pkg/stanza/fileconsumer/design/design.md
diff --git a/pkg/stanza/fileconsumer/design/images/case-1.png b/pkg/stanza/fileconsumer/design/images/case-1.png
new file mode 100644
index 0000000000000..c5f70bd8c20b5
Binary files /dev/null and b/pkg/stanza/fileconsumer/design/images/case-1.png differ
diff --git a/pkg/stanza/fileconsumer/design/images/case-2.png b/pkg/stanza/fileconsumer/design/images/case-2.png
new file mode 100644
index 0000000000000..7c250799db1cc
Binary files /dev/null and b/pkg/stanza/fileconsumer/design/images/case-2.png differ
diff --git a/pkg/stanza/fileconsumer/design/images/case-3.png b/pkg/stanza/fileconsumer/design/images/case-3.png
new file mode 100644
index 0000000000000..e6391009008ac
Binary files /dev/null and b/pkg/stanza/fileconsumer/design/images/case-3.png differ
diff --git a/pkg/stanza/fileconsumer/design/images/case-4.png b/pkg/stanza/fileconsumer/design/images/case-4.png
new file mode 100644
index 0000000000000..f327ec038f948
Binary files /dev/null and b/pkg/stanza/fileconsumer/design/images/case-4.png differ
diff --git a/pkg/stanza/fileconsumer/design/images/grown-1.png b/pkg/stanza/fileconsumer/design/images/grown-1.png
new file mode 100644
index 0000000000000..ad50ebd6b4105
Binary files /dev/null and b/pkg/stanza/fileconsumer/design/images/grown-1.png differ
diff --git a/pkg/stanza/fileconsumer/design/images/on-disk.png b/pkg/stanza/fileconsumer/design/images/on-disk.png
new file mode 100644
index 0000000000000..e2b02ced60fb8
Binary files /dev/null and b/pkg/stanza/fileconsumer/design/images/on-disk.png differ
diff --git a/pkg/stanza/fileconsumer/design/images/read-1.png b/pkg/stanza/fileconsumer/design/images/read-1.png
new file mode 100644
index 0000000000000..621c23b4f730b
Binary files /dev/null and b/pkg/stanza/fileconsumer/design/images/read-1.png differ
diff --git a/pkg/stanza/fileconsumer/design/images/read-2.png b/pkg/stanza/fileconsumer/design/images/read-2.png
new file mode 100644
index 0000000000000..ae801964b9445
Binary files /dev/null and b/pkg/stanza/fileconsumer/design/images/read-2.png differ
diff --git a/pkg/stanza/fileconsumer/design/images/write.png b/pkg/stanza/fileconsumer/design/images/write.png
new file mode 100644
index 0000000000000..82267f281af90
Binary files /dev/null and b/pkg/stanza/fileconsumer/design/images/write.png differ
diff --git a/pkg/stanza/fileconsumer/file.go b/pkg/stanza/fileconsumer/file.go
index e8119e36a7a08..2845f36134f1a 100644
--- a/pkg/stanza/fileconsumer/file.go
+++ b/pkg/stanza/fileconsumer/file.go
@@ -204,21 +204,16 @@ func (m *Manager) makeFingerprint(path string) (*fingerprint.Fingerprint, *os.File)
 // discarding any that have a duplicate fingerprint to other files that have already
 // been read this polling interval
 func (m *Manager) makeReaders(ctx context.Context, paths []string) {
+	var unmatchedFiles []*os.File
+	var unmatchedFingerprints []*fingerprint.Fingerprint
+
 	for _, path := range paths {
 		fp, file := m.makeFingerprint(path)
 		if fp == nil {
 			continue
 		}
-		// Exclude duplicate paths with the same content. This can happen when files are
-		// being rotated with copy/truncate strategy. (After copy, prior to truncate.)
-		if r := m.tracker.GetCurrentFile(fp); r != nil {
-			m.set.Logger.Debug("Skipping duplicate file", zap.String("path", file.Name()))
-			// re-add the reader as Match() removes duplicates
-			m.tracker.Add(r)
-			if err := file.Close(); err != nil {
-				m.set.Logger.Debug("problem closing file", zap.Error(err))
-			}
+		if m.excludeDuplicate(fp, file) {
 			continue
 		}
@@ -228,10 +223,69 @@ func (m *Manager) makeReaders(ctx context.Context, paths []string) {
 			continue
 		}
 
+		if r != nil {
+			m.tracker.Add(r)
+			continue
+		}
+		// aggregate unmatched fingerprints and files to match them against the archive
+		unmatchedFingerprints = append(unmatchedFingerprints, fp)
+		unmatchedFiles = append(unmatchedFiles, file)
+	}
+
+	m.processUnmatchedFiles(ctx, unmatchedFiles, unmatchedFingerprints)
+}
+
+func (m *Manager) processUnmatchedFiles(ctx context.Context, files []*os.File, fingerprints []*fingerprint.Fingerprint) {
+	// processUnmatchedFiles accepts a list of unmatched files and their corresponding fingerprints
+	// and looks for a match in the archive.
+	// If a match is found, it creates a reader based on the known metadata.
+	// Otherwise, it creates a new reader from scratch.
+	metadataFromArchive := m.tracker.FindFiles(fingerprints)
+
+	for i, metadata := range metadataFromArchive {
+		file, fp := files[i], fingerprints[i]
+		if m.excludeDuplicate(fp, file) {
+			continue
+		}
+
+		var r *reader.Reader
+		var err error
+
+		if metadata != nil {
+			// matching metadata was found in the archive; create a new reader from this metadata
+			r, err = m.readerFactory.NewReaderFromMetadata(file, metadata)
+		} else {
+			// if we don't match any previously known files, create a new reader from scratch
+			m.set.Logger.Info("Started watching file", zap.String("path", file.Name()))
+			r, err = m.readerFactory.NewReader(file, fp)
+		}
+
+		if err != nil {
+			m.set.Logger.Error("Failed to create reader", zap.Error(err))
+			continue
+		}
+
+		m.telemetryBuilder.FileconsumerOpenFiles.Add(ctx, 1)
 		m.tracker.Add(r)
 	}
 }
 
+func (m *Manager) excludeDuplicate(fp *fingerprint.Fingerprint, file *os.File) bool {
+	// excludeDuplicate returns true if a duplicate path with the same content is found, and closes the duplicate file.
+	// This can happen when files are being rotated with copy/truncate strategy. (After copy, prior to truncate.)
+	if r := m.tracker.GetCurrentFile(fp); r != nil {
+		m.set.Logger.Debug("Skipping duplicate file", zap.String("path", file.Name()))
+		// re-add the reader as Match() removes duplicates
+		m.tracker.Add(r)
+		if err := file.Close(); err != nil {
+			m.set.Logger.Debug("problem closing file", zap.Error(err))
+		}
+		return true
+	}
+	return false
+}
+
 func (m *Manager) newReader(ctx context.Context, file *os.File, fp *fingerprint.Fingerprint) (*reader.Reader, error) {
 	// Check previous poll cycle for match
 	if oldReader := m.tracker.GetOpenFile(fp); oldReader != nil {
@@ -261,14 +315,9 @@ func (m *Manager) newReader(ctx context.Context, file *os.File, fp *fingerprint.Fingerprint) (*reader.Reader, error)
 		return r, nil
 	}
 
-	// If we don't match any previously known files, create a new reader from scratch
-	m.set.Logger.Info("Started watching file", zap.String("path", file.Name()))
-	r, err := m.readerFactory.NewReader(file, fp)
-	if err != nil {
-		return nil, err
-	}
-	m.telemetryBuilder.FileconsumerOpenFiles.Add(ctx, 1)
-	return r, nil
+	// the file was not found in the tracker;
+	// we'll create readers for such files after matching against the archive, in processUnmatchedFiles()
+	return nil, nil
 }
 
 func (m *Manager) instantiateTracker(ctx context.Context, persister operator.Persister) {
diff --git a/pkg/stanza/fileconsumer/file_test.go b/pkg/stanza/fileconsumer/file_test.go
index a657026d8b2ed..ab39f8bbb5f5d 100644
--- a/pkg/stanza/fileconsumer/file_test.go
+++ b/pkg/stanza/fileconsumer/file_test.go
@@ -1580,3 +1580,54 @@ func TestReadGzipCompressedLogsFromEnd(t *testing.T) {
 	operator.poll(context.TODO())
 	sink.ExpectToken(t, []byte("testlog4"))
 }
+
+func TestArchive(t *testing.T) {
+	t.Parallel()
+	persister := testutil.NewUnscopedMockPersister()
+
+	tempDir := t.TempDir()
+	cfg := NewConfig().includeDir(tempDir)
+	cfg.StartAt = "beginning"
+	cfg.PollsToArchive = 100
+	cfg.PollInterval = 100 * time.Millisecond
+
+	temp := filetest.OpenTempWithPattern(t, tempDir, "file.log")
+	filetest.WriteString(t, temp, "testlog1\n")
+
+	operator, sink := testManager(t, cfg)
+	require.NoError(t, operator.Start(persister))
+	defer func() {
+		require.NoError(t, operator.Stop())
+	}()
+
+	sink.ExpectCall(t, []byte("testlog1"), map[string]any{
+		attrs.LogFileName: filepath.Base(temp.Name()),
+	})
+
+	os.Remove(temp.Name())
+
+	time.Sleep(500 * time.Millisecond)
+
+	temp = filetest.OpenTempWithPattern(t, tempDir, "file.log")
+	filetest.WriteString(t, temp, "testlog1\n")
+	filetest.WriteString(t, temp, "testlog2\n")
+
+	sink.ExpectCall(t, []byte("testlog2"), map[string]any{
+		attrs.LogFileName: filepath.Base(temp.Name()),
+	})
+
+	os.Remove(temp.Name())
+
+	time.Sleep(500 * time.Millisecond)
+
+	temp = filetest.OpenTempWithPattern(t, tempDir, "file.log")
+	filetest.WriteString(t, temp, "testlog1\n")
+	filetest.WriteString(t, temp, "testlog2\n")
+	filetest.WriteString(t, temp, "testlog3\n")
+	filetest.WriteString(t, temp, "testlog4\n")
+
+	log3 := emit.Token{Body: []byte("testlog3"), Attributes: map[string]any{attrs.LogFileName: filepath.Base(temp.Name())}}
+	log4 := emit.Token{Body: []byte("testlog4"), Attributes: map[string]any{attrs.LogFileName: filepath.Base(temp.Name())}}
+
+	sink.ExpectCalls(t, log3, log4)
+}
diff --git a/pkg/stanza/fileconsumer/internal/tracker/tracker.go b/pkg/stanza/fileconsumer/internal/tracker/tracker.go
index c7e6e897f0e3a..82f96a43d854a 100644
--- a/pkg/stanza/fileconsumer/internal/tracker/tracker.go
+++ b/pkg/stanza/fileconsumer/internal/tracker/tracker.go
@@ -327,6 +327,11 @@ func (t *fileTracker) FindFiles(fps []*fingerprint.Fingerprint) []*reader.Metadata {
 	nextIndex := t.archiveIndex
 	matchedMetadata := make([]*reader.Metadata, len(fps))
 
+	if !t.archiveEnabled() {
+		// we make an early exit if archiving is disabled; this will create new readers from scratch.
+		// we return an array of "nil"s to make things easier while creating readers
+		return matchedMetadata
+	}
 	// continue executing the loop until either all records are matched or all archive sets have been processed.
 	for i := 0; i < t.pollsToArchive; i++ {
 		// Update the mostRecentIndex
@@ -420,7 +425,10 @@ func (t *noStateTracker) EndPoll() {}
 
 func (t *noStateTracker) TotalReaders() int { return 0 }
 
-func (t *noStateTracker) FindFiles([]*fingerprint.Fingerprint) []*reader.Metadata { return nil }
+func (t *noStateTracker) FindFiles(fps []*fingerprint.Fingerprint) []*reader.Metadata {
+	// we return an array of "nil"s to make things easier while creating readers
+	return make([]*reader.Metadata, len(fps))
+}
 
 func encodeIndex(val int) []byte {
 	var buf bytes.Buffer
diff --git a/receiver/filelogreceiver/README.md b/receiver/filelogreceiver/README.md
index 4ff813736f437..8441070abd8a2 100644
--- a/receiver/filelogreceiver/README.md
+++ b/receiver/filelogreceiver/README.md
@@ -63,6 +63,7 @@ Tails and parses logs from files.
 | `ordering_criteria.sort_by.format`    |             | Relevant if `sort_type` is set to `timestamp`. Defines the strptime format of the timestamp being sorted. |
 | `ordering_criteria.sort_by.ascending` |             | Sort direction |
 | `compression`                         |             | Indicate the compression format of input files. If set accordingly, files will be read using a reader that uncompresses the file before scanning its content. Options are `` or `gzip` |
+| `polls_to_archive`                    | unset (`0`) | Enables [archiving](../../pkg/stanza/fileconsumer/design/archive.md). With archiving enabled, file offsets older than three poll cycles are stored on disk rather than being discarded |
 
 Note that _by default_, no logs will be read from a file that is not actively being written to because `start_at` defaults to `end`.
@@ -211,7 +212,7 @@ For additional resiliency, see [Fault tolerant log collection example](../../exa
 Here is some of the information the file log receiver stores:
 - The number of files it is currently tracking (`knownFiles`).
 - For each file being tracked:
-  - The [fingerprint](../../pkg/stanza/fileconsumer/design.md#fingerprints) of the file (`Fingerprint.first_bytes`).
+  - The [fingerprint](../../pkg/stanza/fileconsumer/design/design.md#fingerprints) of the file (`Fingerprint.first_bytes`).
   - The byte offset from the start of the file, indicating the position in the file from where the file log receiver continues reading the file (`Offset`).
   - An arbitrary set of file attributes, such as the name of the file (`FileAttributes`).