Skip to content

Commit cec3b02

Browse files
committed
Fix rate limit internal counter
Signed-off-by: Israel Blancas <[email protected]>
1 parent 3351cc6 commit cec3b02

14 files changed

+954
-94
lines changed

.chloggen/40811.yaml

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: bug_fix
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: coralogixexporter
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Fix rate limit error count reset
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [40811]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: |
19+
Previously, the rate limit error count was not reset after a successful request.
20+
21+
Also, a message is now logged when the rate limit is triggered.
22+
23+
# If your change doesn't affect end users or the exported elements of any package,
24+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
25+
# Optional: The change log or logs in which this entry should be included.
26+
# e.g. '[user]' or '[user, api]'
27+
# Include 'user' if the change is relevant to end users.
28+
# Include 'api' if there is a change to a library API.
29+
# Default: '[user]'
30+
change_logs: []
Lines changed: 246 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,246 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package coralogixexporter
5+
6+
import (
7+
"context"
8+
"os"
9+
"testing"
10+
"time"
11+
12+
"github.com/stretchr/testify/assert"
13+
"github.com/stretchr/testify/require"
14+
"go.opentelemetry.io/collector/component/componenttest"
15+
"go.opentelemetry.io/collector/config/configopaque"
16+
"go.opentelemetry.io/collector/exporter/exportertest"
17+
"go.opentelemetry.io/collector/pdata/pcommon"
18+
"go.opentelemetry.io/collector/pdata/ptrace"
19+
)
20+
21+
func Test_E2E_RateLimit_HappyPath(t *testing.T) {
22+
privateKey := os.Getenv("CORALOGIX_PRIVATE_KEY")
23+
if privateKey == "" {
24+
t.Skip("Skipping E2E test: CORALOGIX_PRIVATE_KEY not set")
25+
}
26+
27+
cfg := &Config{
28+
Domain: "eu2.coralogix.com",
29+
PrivateKey: configopaque.String(privateKey),
30+
RateLimiter: RateLimiterConfig{
31+
Enabled: true,
32+
Threshold: 5,
33+
Duration: 30 * time.Second,
34+
},
35+
AppName: "e2e-test-app",
36+
SubSystem: "e2e-test-subsystem",
37+
}
38+
39+
t.Log("Creating exporter")
40+
exp, err := newTracesExporter(cfg, exportertest.NewNopSettings(exportertest.NopType))
41+
require.NoError(t, err)
42+
43+
t.Log("Starting exporter")
44+
err = exp.start(context.Background(), componenttest.NewNopHost())
45+
require.NoError(t, err)
46+
defer func() {
47+
t.Log("Shutting down exporter")
48+
err = exp.shutdown(context.Background())
49+
require.NoError(t, err)
50+
}()
51+
52+
start := time.Now()
53+
54+
t.Log("Sending data for 5 mins")
55+
count := 0
56+
traces := generateTestTraces()
57+
for time.Since(start) < 5*time.Minute {
58+
err = exp.pushTraces(context.Background(), traces)
59+
require.NoError(t, err)
60+
if count%10 == 0 {
61+
t.Logf("Sending traces %d", count)
62+
}
63+
count++
64+
}
65+
}
66+
67+
func Test_E2E_InvalidKeyRateLimit(t *testing.T) {
68+
privateKey := os.Getenv("CORALOGIX_PRIVATE_KEY_LOW_QUOTA")
69+
if privateKey == "" {
70+
t.Skip("Skipping E2E test: CORALOGIX_PRIVATE_KEY_LOW_QUOTA not set")
71+
}
72+
73+
cfg := &Config{
74+
Domain: "eu2.coralogix.com",
75+
PrivateKey: configopaque.String("invalid-key"),
76+
RateLimiter: RateLimiterConfig{
77+
Enabled: true,
78+
Threshold: 5,
79+
Duration: 30 * time.Second,
80+
},
81+
AppName: "e2e-test-app",
82+
SubSystem: "e2e-test-subsystem",
83+
}
84+
85+
t.Log("Creating exporter")
86+
exp, err := newTracesExporter(cfg, exportertest.NewNopSettings(exportertest.NopType))
87+
require.NoError(t, err)
88+
89+
t.Log("Starting exporter")
90+
err = exp.start(context.Background(), componenttest.NewNopHost())
91+
require.NoError(t, err)
92+
defer func() {
93+
t.Log("Shutting down exporter")
94+
err = exp.shutdown(context.Background())
95+
require.NoError(t, err)
96+
}()
97+
98+
t.Log("Phase 1: Sending data until rate limiter is activated")
99+
rateLimited := false
100+
errorCount := 0
101+
rateLimitedCount := 0
102+
103+
for !rateLimited {
104+
traces := generateTestTraces()
105+
err = exp.pushTraces(context.Background(), traces)
106+
if err != nil {
107+
errorCount++
108+
if exp.rateError.isRateLimited() {
109+
rateLimitedCount++
110+
rateLimited = true
111+
t.Logf("Rate limiter activated! Error count: %d, Rate limited count: %d", errorCount, rateLimitedCount)
112+
} else {
113+
t.Logf("Received error (not rate limited yet). Error count: %d", errorCount)
114+
}
115+
}
116+
}
117+
118+
require.True(t, rateLimited, "Rate limiter should have been activated")
119+
120+
t.Log("Phase 2: trying to send data during rate limit period (but will be rate limited)")
121+
rateLimitStart := time.Now()
122+
rateLimitEnabled := false
123+
124+
count := 0
125+
for time.Since(rateLimitStart) < cfg.RateLimiter.Duration {
126+
traces := generateTestTraces()
127+
err = exp.pushTraces(context.Background(), traces)
128+
if err != nil && exp.rateError.isRateLimited() {
129+
if count%5000 == 0 { // Do not spam the logs
130+
t.Logf("Tried to send data but limited for %s", cfg.RateLimiter.Duration-time.Since(*exp.rateError.timestamp.Load()))
131+
}
132+
rateLimitEnabled = true
133+
} else if exp.rateError.isRateLimited() {
134+
t.Fatalf("Should have returned error")
135+
}
136+
count++
137+
}
138+
139+
require.True(t, rateLimitEnabled, "Should have received rate limit errors")
140+
141+
t.Log("Phase 3: Verifying we can try to send data again")
142+
assert.Eventually(t, func() bool {
143+
return exp.canSend()
144+
}, 2*time.Minute, 100*time.Millisecond, "Should be able to send data after rate limiter duration")
145+
}
146+
147+
func Test_E2E_LowQuotaRateLimit(t *testing.T) {
148+
privateKey := os.Getenv("CORALOGIX_PRIVATE_KEY_LOW_QUOTA")
149+
if privateKey == "" {
150+
t.Skip("Skipping E2E test: CORALOGIX_PRIVATE_KEY_LOW_QUOTA not set")
151+
}
152+
153+
cfg := &Config{
154+
Domain: "eu2.coralogix.com",
155+
PrivateKey: configopaque.String(privateKey),
156+
RateLimiter: RateLimiterConfig{
157+
Enabled: true,
158+
Threshold: 5,
159+
Duration: 30 * time.Second,
160+
},
161+
AppName: "e2e-test-app",
162+
SubSystem: "e2e-test-subsystem",
163+
}
164+
165+
t.Log("Creating exporter")
166+
exp, err := newTracesExporter(cfg, exportertest.NewNopSettings(exportertest.NopType))
167+
require.NoError(t, err)
168+
169+
t.Log("Starting exporter")
170+
err = exp.start(context.Background(), componenttest.NewNopHost())
171+
require.NoError(t, err)
172+
defer func() {
173+
t.Log("Shutting down exporter")
174+
err = exp.shutdown(context.Background())
175+
require.NoError(t, err)
176+
}()
177+
178+
t.Log("Phase 1: Sending data until rate limiter is activated")
179+
rateLimited := false
180+
errorCount := 0
181+
rateLimitedCount := 0
182+
183+
for !rateLimited {
184+
traces := generateTestTraces()
185+
err = exp.pushTraces(context.Background(), traces)
186+
if err != nil {
187+
errorCount++
188+
if exp.rateError.isRateLimited() {
189+
rateLimitedCount++
190+
rateLimited = true
191+
t.Logf("Rate limiter activated! Error count: %d, Rate limited count: %d", errorCount, rateLimitedCount)
192+
} else {
193+
t.Logf("Received error (not rate limited yet). Error count: %d", errorCount)
194+
}
195+
}
196+
}
197+
198+
require.True(t, rateLimited, "Rate limiter should have been activated")
199+
200+
t.Log("Phase 2: trying to send data during rate limit period (but will be rate limited)")
201+
rateLimitEnabled := false
202+
203+
count := 0
204+
for time.Since(*exp.rateError.timestamp.Load()) < cfg.RateLimiter.Duration {
205+
traces := generateTestTraces()
206+
err = exp.pushTraces(context.Background(), traces)
207+
if err != nil && exp.rateError.isRateLimited() {
208+
if count%5000 == 0 { // Do not spam the logs
209+
t.Logf("Tried to send data but limited for %s", cfg.RateLimiter.Duration-time.Since(*exp.rateError.timestamp.Load()))
210+
}
211+
rateLimitEnabled = true
212+
} else if exp.rateError.isRateLimited() {
213+
t.Fatalf("Should have returned error")
214+
}
215+
count++
216+
}
217+
218+
require.True(t, rateLimitEnabled, "Should have received rate limit errors")
219+
220+
t.Log("Phase 3: Verifying we can try to send data again")
221+
assert.Eventually(t, func() bool {
222+
return exp.canSend()
223+
}, 2*time.Minute, 100*time.Millisecond, "Should be able to send data after rate limiter duration")
224+
}
225+
226+
func generateTestTraces() ptrace.Traces {
227+
traces := ptrace.NewTraces()
228+
resourceSpans := traces.ResourceSpans()
229+
230+
for i := 0; i < 5000; i++ {
231+
rs := resourceSpans.AppendEmpty()
232+
resource := rs.Resource()
233+
resource.Attributes().PutStr("service.name", "e2e-test-service")
234+
resource.Attributes().PutStr("environment", "test")
235+
scopeSpans := rs.ScopeSpans().AppendEmpty()
236+
span := scopeSpans.Spans().AppendEmpty()
237+
span.SetTraceID(pcommon.TraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}))
238+
span.SetSpanID(pcommon.SpanID([8]byte{1, 2, 3, 4, 5, 6, 7, 8}))
239+
span.SetName("test-span")
240+
span.SetKind(ptrace.SpanKindServer)
241+
span.SetStartTimestamp(pcommon.NewTimestampFromTime(time.Now()))
242+
span.SetEndTimestamp(pcommon.NewTimestampFromTime(time.Now().Add(time.Second)))
243+
}
244+
245+
return traces
246+
}

exporter/coralogixexporter/logs_client.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ func (e *logsExporter) pushLogs(ctx context.Context, ld plog.Logs) error {
7070
)
7171
}
7272

73+
e.rateError.errorCount.Store(0)
7374
return nil
7475
}
7576

0 commit comments

Comments
 (0)