
Commit 2cecf8b

gnak-yar authored and andrzej-stencel committed
[chore][pkg/stanza] Fixed broken benchmarks (open-telemetry#43190)
#### Description

This is another fix for open-telemetry#43044; probably one more PR will be needed. This PR addresses two of the benchmark failures in `pkg/stanza/adapter`:

```
--- FAIL: BenchmarkReadLine
    receiver_test.go:285:
        Error Trace: /Users/ray.kang/github.com/gnak-yar/opentelemetry-collector-contrib/pkg/stanza/adapter/receiver_test.go:285
        Error:       Received unexpected error:
                     '' unsupported type 'file_input'
        Test:        BenchmarkReadLine
--- FAIL: BenchmarkParseAndMap
    receiver_test.go:369:
        Error Trace: /Users/ray.kang/github.com/gnak-yar/opentelemetry-collector-contrib/pkg/stanza/adapter/receiver_test.go:369
        Error:       Received unexpected error:
                     unsupported type 'file_input'
        Test:        BenchmarkParseAndMap
```

**BenchmarkReadLine**

The `file_input` operator must be registered in the global operator registry before the pipeline is built. Registration happens when the file input operator package is imported, but `receiver_test.go` does not import that package, so the lookup fails with the error above.

I've removed this benchmark rather than fixing it, for the following reasons:

- As far as I understand, this benchmark measures the `file_input` operator's performance, but other benchmarks already do almost the same thing:
  - [file.BenchmarkReadExistingLogs](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/7e4bf11279e5b454980626237ccd1417b7e0e26a/pkg/stanza/operator/input/file/benchmark_test.go#L23)
  - [fileconsumer.BenchmarkFileInput](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/7e4bf11279e5b454980626237ccd1417b7e0e26a/pkg/stanza/fileconsumer/benchmark_test.go#L31)
- The benchmark itself also looks a bit odd to me: the `b.N` loop (`for i := 0; i < b.N; i++ {`) is used to generate logs, not to run the code being measured.

**BenchmarkParseAndMap**

There are two reasons for the failure:

- The `file_input` package is not imported here either.
- Unmarshaling the YAML configs (line 369 before this change) does not work well, because these configs are not meant to be loaded from YAML directly. For example, the following structure is part of `fileconsumer.Config`. The YAML decoder does not understand the embedded fields by default (an extra tag such as `yaml:",inline"` is probably required).

```go
type Config struct {
	matcher.Criteria `mapstructure:",squash"`
	attrs.Resolver   `mapstructure:",squash"`
	...
}
```

As far as I understand, `BenchmarkParseAndMap` measures the performance of `regex_parser`'s severity mapping. So I've moved the benchmark to `regex_parser` and made it focus on the operator's own performance.

I'm not familiar with the full history and background. Please let me know if you have any concerns or suggestions!

#### Link to tracking issue

Fixes open-telemetry#43044

#### Testing

```
# From the repository root:
cd pkg/stanza/adapter
go test -bench='^Benchmark(ReadLine|ParseAndMap)$'

# For testing the benchmark moved to the regex parser:
cd ../operator/parser/regex
go test -bench='^BenchmarkProcessBatch'
```

---------

Co-authored-by: Andrzej Stencel <[email protected]>
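The `unsupported type 'file_input'` error comes from the common Go pattern where a package registers itself in a global map from its `init()` function, so the registration only happens if something imports that package. Below is a minimal, self-contained sketch of that pattern; `Register`, `Lookup`, `Builder`, and the `registry` map are hypothetical names for illustration, not stanza's actual API. In the real test, a blank import such as `import _ "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/file"` would be the usual way to trigger that package's `init()`.

```go
package main

import "fmt"

// Builder is a hypothetical stand-in for an operator config that can build an operator.
type Builder interface {
	Type() string
}

// registry is a hypothetical global registry mapping operator type names to constructors.
var registry = map[string]func() Builder{}

// Register adds a constructor to the registry. In this sketch, operator
// packages are assumed to call it from their init() functions.
func Register(operatorType string, newBuilder func() Builder) {
	registry[operatorType] = newBuilder
}

// Lookup returns a fresh builder for operatorType, or an error shaped like
// the one reported by the failing benchmarks.
func Lookup(operatorType string) (Builder, error) {
	newBuilder, ok := registry[operatorType]
	if !ok {
		return nil, fmt.Errorf("unsupported type '%s'", operatorType)
	}
	return newBuilder(), nil
}

type regexParserConfig struct{}

func (regexParserConfig) Type() string { return "regex_parser" }

// init simulates an imported operator package registering itself.
// Nothing registers "file_input", just as nothing did when the file
// input package was not imported by the test.
func init() {
	Register("regex_parser", func() Builder { return regexParserConfig{} })
}

func main() {
	for _, t := range []string{"regex_parser", "file_input"} {
		if b, err := Lookup(t); err != nil {
			fmt.Println("error:", err)
		} else {
			fmt.Println("built:", b.Type())
		}
	}
}
```

Running it prints `built: regex_parser` followed by `error: unsupported type 'file_input'`, which is the same symptom the benchmarks hit.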
1 parent 4386067 commit 2cecf8b


2 files changed (+50, -148 lines)


pkg/stanza/adapter/receiver_test.go

Lines changed: 0 additions & 148 deletions
```diff
@@ -7,8 +7,6 @@ import (
 	"context"
 	"fmt"
 	"math"
-	"os"
-	"path/filepath"
 	"sync/atomic"
 	"testing"
 	"time"
@@ -17,13 +15,11 @@ import (
 	"github.com/stretchr/testify/require"
 	"go.opentelemetry.io/collector/component"
 	"go.opentelemetry.io/collector/component/componenttest"
-	"go.opentelemetry.io/collector/confmap/confmaptest"
 	"go.opentelemetry.io/collector/consumer"
 	"go.opentelemetry.io/collector/consumer/consumertest"
 	"go.opentelemetry.io/collector/pdata/plog"
 	"go.opentelemetry.io/collector/receiver/receiverhelper"
 	"go.opentelemetry.io/collector/receiver/receivertest"
-	"gopkg.in/yaml.v3"
 
 	"github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage/storagetest"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/consumerretry"
@@ -259,150 +255,6 @@ func benchmarkReceiver(b *testing.B, logsPerIteration int, batchingInput, batchi
 	require.NoError(b, rcv.Shutdown(b.Context()))
 }
 
-func BenchmarkReadLine(b *testing.B) {
-	receivedAllLogs := make(chan struct{})
-	filePath := filepath.Join(b.TempDir(), "bench.log")
-
-	pipelineYaml := fmt.Sprintf(`
-pipeline:
-  type: file_input
-  include:
-    - %s
-  start_at: beginning`,
-		filePath)
-
-	confmapFilePath := filepath.Join(b.TempDir(), "conf.yaml")
-	require.NoError(b, os.WriteFile(confmapFilePath, []byte(pipelineYaml), 0o600))
-
-	testConfMaps, err := confmaptest.LoadConf(confmapFilePath)
-	require.NoError(b, err)
-
-	conf, err := testConfMaps.Sub("pipeline")
-	require.NoError(b, err)
-	require.NotNil(b, conf)
-
-	operatorCfg := operator.Config{}
-	require.NoError(b, conf.Unmarshal(&operatorCfg))
-
-	operatorCfgs := []operator.Config{operatorCfg}
-
-	storageClient := storagetest.NewInMemoryClient(
-		component.KindReceiver,
-		component.MustNewID("foolog"),
-		"test",
-	)
-
-	obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopSettings(component.MustNewType("foolog"))})
-	require.NoError(b, err)
-
-	mockConsumer := &testConsumer{
-		receivedAllLogs: receivedAllLogs,
-		expectedLogs:    uint32(b.N),
-		receivedLogs:    atomic.Uint32{},
-	}
-	rcv := &receiver{
-		consumer:      mockConsumer,
-		obsrecv:       obsrecv,
-		storageClient: storageClient,
-	}
-
-	set := componenttest.NewNopTelemetrySettings()
-	emitter := helper.NewBatchingLogEmitter(set, rcv.consumeEntries)
-	defer func() {
-		require.NoError(b, emitter.Stop())
-	}()
-
-	pipe, err := pipeline.Config{
-		Operators:     operatorCfgs,
-		DefaultOutput: emitter,
-	}.Build(set)
-	require.NoError(b, err)
-
-	rcv.pipe = pipe
-	rcv.set = set
-	rcv.emitter = emitter
-
-	// Populate the file that will be consumed
-	file, err := os.OpenFile(filePath, os.O_RDWR|os.O_CREATE, 0o666)
-	require.NoError(b, err)
-	for i := 0; i < b.N; i++ {
-		_, err := file.WriteString("testlog\n")
-		require.NoError(b, err)
-	}
-
-	// Run the actual benchmark
-	b.ResetTimer()
-	require.NoError(b, rcv.Start(b.Context(), nil))
-
-	<-receivedAllLogs
-
-	require.NoError(b, rcv.Shutdown(b.Context()))
-}
-
-func BenchmarkParseAndMap(b *testing.B) {
-	filePath := filepath.Join(b.TempDir(), "bench.log")
-
-	fileInputYaml := fmt.Sprintf(`
-- type: file_input
-  include:
-    - %s
-  start_at: beginning`, filePath)
-
-	regexParserYaml := `
-- type: regex_parser
-  regex: '(?P<remote_host>[^\s]+) - (?P<remote_user>[^\s]+) \[(?P<timestamp>[^\]]+)\] "(?P<http_method>[A-Z]+) (?P<path>[^\s]+)[^"]+" (?P<http_status>\d+) (?P<bytes_sent>[^\s]+)'
-  timestamp:
-    parse_from: timestamp
-    layout: '%d/%b/%Y:%H:%M:%S %z'
-  severity:
-    parse_from: http_status
-    preserve: true
-    mapping:
-      critical: 5xx
-      error: 4xx
-      info: 3xx
-      debug: 2xx`
-
-	pipelineYaml := fmt.Sprintf("%s%s", fileInputYaml, regexParserYaml)
-
-	var operatorCfgs []operator.Config
-	require.NoError(b, yaml.Unmarshal([]byte(pipelineYaml), &operatorCfgs))
-
-	set := componenttest.NewNopTelemetrySettings()
-	emitter := helper.NewBatchingLogEmitter(set, func(_ context.Context, entries []*entry.Entry) {
-		for _, e := range entries {
-			convert(e)
-		}
-	})
-	defer func() {
-		require.NoError(b, emitter.Stop())
-	}()
-
-	pipe, err := pipeline.Config{
-		Operators:     operatorCfgs,
-		DefaultOutput: emitter,
-	}.Build(set)
-	require.NoError(b, err)
-
-	// Populate the file that will be consumed
-	file, err := os.OpenFile(filePath, os.O_RDWR|os.O_CREATE, 0o666)
-	require.NoError(b, err)
-	for i := 0; i < b.N; i++ {
-		_, err := fmt.Fprintf(file, "10.33.121.119 - - [11/Aug/2020:00:00:00 -0400] \"GET /index.html HTTP/1.1\" 404 %d\n", i%1000)
-		require.NoError(b, err)
-	}
-
-	storageClient := storagetest.NewInMemoryClient(
-		component.KindReceiver,
-		component.MustNewID("foolog"),
-		"test",
-	)
-
-	// Run the actual benchmark
-	b.ResetTimer()
-	require.NoError(b, pipe.Start(storageClient))
-}
-
 const testInputOperatorTypeStr = "test_input"
 
 type testInputBuilder struct {
```
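In the removed `BenchmarkReadLine`, `b.N` only controlled how many lines were written to the input file, and the timed region was a single `rcv.Start` plus a wait. The conventional shape keeps setup out of the timed region and runs the measured call inside the `b.N` loop. The sketch below shows that shape using only the standard library; `countFields` is a hypothetical stand-in for the code under test, and `testing.Benchmark` is used so the example runs outside `go test`.

```go
package main

import (
	"fmt"
	"strings"
	"testing"
)

// countFields is a hypothetical stand-in for the code being measured
// (for example, an operator processing one log line).
func countFields(line string) int {
	return len(strings.Fields(line))
}

// benchmarkCountFields builds its inputs once, resets the timer, and then
// calls the measured function exactly b.N times.
func benchmarkCountFields(b *testing.B) {
	// Setup: the number of inputs does not depend on b.N.
	lines := make([]string, 1000)
	for i := range lines {
		lines[i] = fmt.Sprintf("10.33.121.119 - - [11/Aug/2020:00:00:00 -0400] \"GET /index.html HTTP/1.1\" 404 %d", i)
	}
	b.ResetTimer() // exclude the setup above from the measurement

	total := 0
	for i := 0; i < b.N; i++ {
		// Accumulate results so the call cannot be optimized away.
		total += countFields(lines[i%len(lines)])
	}
	if total == 0 {
		b.Fatal("unexpected zero field count")
	}
}

func main() {
	result := testing.Benchmark(benchmarkCountFields)
	fmt.Printf("%d iterations, %d ns/op\n", result.N, result.NsPerOp())
}
```

The relocated regex benchmark uses Go 1.24's `for b.Loop()` instead of an explicit `b.N` loop; `b.Loop` likewise excludes setup done before the loop from the measurement.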
Lines changed: 50 additions & 0 deletions
```diff
@@ -0,0 +1,50 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package regex
+
+import (
+	"fmt"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+	"go.opentelemetry.io/collector/component/componenttest"
+
+	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
+)
+
+func BenchmarkProcessBatch(b *testing.B) {
+	b.Run("SeverityMapping", func(b *testing.B) {
+		config := NewConfig()
+		config.OnError = helper.SendOnError
+		config.Regex = `(?P<remote_host>[^\s]+) - (?P<remote_user>[^\s]+) \[(?P<timestamp>[^\]]+)\] "(?P<http_method>[A-Z]+) (?P<path>[^\s]+) [^"]+" (?P<http_status>\d+) (?P<bytes_sent>[^\s]+)`
+		config.TimeParser = &helper.TimeParser{
+			ParseFrom:  func() *entry.Field { f := entry.NewAttributeField("timestamp"); return &f }(),
+			Layout:     "%d/%b/%Y:%H:%M:%S %z",
+			LayoutType: helper.StrptimeKey,
+		}
+		config.SeverityConfig = &helper.SeverityConfig{
+			ParseFrom: func() *entry.Field { f := entry.NewAttributeField("http_status"); return &f }(),
+			Mapping: map[string]any{
+				"critical": "5xx",
+				"error":    "4xx",
+				"info":     "3xx",
+				"debug":    "2xx",
+			},
+		}
+
+		op, err := config.Build(componenttest.NewNopTelemetrySettings())
+		require.NoError(b, err)
+
+		entries := make([]*entry.Entry, 1000)
+		for i := range entries {
+			entries[i] = entry.New()
+			entries[i].Body = fmt.Sprintf("10.33.121.119 - - [11/Aug/2020:00:00:00 -0400] \"GET /index.html HTTP/1.1\" 404 %d\n", i%1000)
+		}
+
+		for b.Loop() {
+			require.NoError(b, op.ProcessBatch(b.Context(), entries))
+		}
+	})
+}
```
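Per entry, the relocated benchmark exercises a regex match with named capture groups, timestamp parsing, and a mapping of `http_status` to a severity through the `5xx`/`4xx`/`3xx`/`2xx` ranges. As a rough, standard-library-only sketch of what the match and the mapping compute (timestamp parsing omitted), under the assumption that each `Nxx` key covers the status codes N00 to N99: `parseFields` and `severityFor` are hypothetical helpers, not stanza's implementation.

```go
package main

import (
	"fmt"
	"regexp"
	"strconv"
)

// accessLog is the same pattern configured in BenchmarkProcessBatch.
var accessLog = regexp.MustCompile(`(?P<remote_host>[^\s]+) - (?P<remote_user>[^\s]+) \[(?P<timestamp>[^\]]+)\] "(?P<http_method>[A-Z]+) (?P<path>[^\s]+) [^"]+" (?P<http_status>\d+) (?P<bytes_sent>[^\s]+)`)

// parseFields returns the named capture groups of line, or nil if it does not match.
func parseFields(line string) map[string]string {
	m := accessLog.FindStringSubmatch(line)
	if m == nil {
		return nil
	}
	fields := make(map[string]string)
	for i, name := range accessLog.SubexpNames() {
		if name != "" {
			fields[name] = m[i]
		}
	}
	return fields
}

// severityFor is a hypothetical mapping from an HTTP status code to a
// severity name, assuming "5xx" means 500-599 and so on.
func severityFor(status string) string {
	code, err := strconv.Atoi(status)
	if err != nil {
		return ""
	}
	switch code / 100 {
	case 5:
		return "critical"
	case 4:
		return "error"
	case 3:
		return "info"
	case 2:
		return "debug"
	default:
		return "" // unmapped
	}
}

func main() {
	line := `10.33.121.119 - - [11/Aug/2020:00:00:00 -0400] "GET /index.html HTTP/1.1" 404 7`
	fields := parseFields(line)
	fmt.Println(fields["http_status"], severityFor(fields["http_status"]))
}
```

For the benchmark's sample line, `parseFields` extracts `http_status` as `404`, which `severityFor` maps to `error`, so the program prints `404 error`.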
